diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 22ba14b..0a5f5ee 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,5 +1,6 @@ add_subdirectory(timestamp) add_subdirectory(tuple) add_subdirectory(packet) +add_subdirectory(dupkt) add_subdirectory(session) add_subdirectory(stellar) \ No newline at end of file diff --git a/src/dupkt/CMakeLists.txt b/src/dupkt/CMakeLists.txt new file mode 100644 index 0000000..1dc6ad5 --- /dev/null +++ b/src/dupkt/CMakeLists.txt @@ -0,0 +1,12 @@ +############################################################################### +# dupkt_filter +############################################################################### + +add_library(dupkt_filter dupkt_filter.cpp) +target_include_directories(dupkt_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/dupkt) +target_include_directories(dupkt_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/packet) +target_include_directories(dupkt_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/timestamp) +target_include_directories(dupkt_filter PUBLIC ${CMAKE_SOURCE_DIR}/deps/dablooms) +target_link_libraries(dupkt_filter packet timestamp dablooms) + +add_subdirectory(test) \ No newline at end of file diff --git a/src/dupkt/dupkt_filter.cpp b/src/dupkt/dupkt_filter.cpp new file mode 100644 index 0000000..ee2bef2 --- /dev/null +++ b/src/dupkt/dupkt_filter.cpp @@ -0,0 +1,145 @@ +#include + +#include "timestamp.h" +#include "dablooms.h" +#include "tcp_helpers.h" +#include "ipv4_helpers.h" +#include "dupkt_filter.h" + +struct packet_identify +{ + // TCP + uint32_t seq; + uint32_t ack; + uint16_t sport; + uint16_t dport; + uint16_t l4_checksum; + + // IPv4 + uint16_t ip_id; + uint32_t ip_src; + uint32_t ip_dst; +} __attribute__((__packed__)); + +struct dupkt_filter +{ + uint8_t enable; + unsigned int capacity; + double error_rate; + int timeout_s; + + struct expiry_dablooms_handle *handle; +}; + +// return 0: success +// reutrn -1: error +static inline int packet_get_identify(const struct packet *packet, struct packet_identify *key) +{ + const struct layer_record *ipv4_layer = packet_get_innermost_layer(packet, LAYER_TYPE_IPV4); + if (ipv4_layer == NULL) + { + return -1; + } + const struct layer_record *tcp_layer = packet_get_innermost_layer(packet, LAYER_TYPE_TCP); + if (tcp_layer == NULL) + { + return -1; + } + + memset(key, 0, sizeof(struct packet_identify)); + + const struct ip *iphdr = (const struct ip *)ipv4_layer->hdr_ptr; + key->ip_id = ipv4_hdr_get_ipid(iphdr); + key->ip_src = ipv4_hdr_get_src(iphdr); + key->ip_dst = ipv4_hdr_get_dst(iphdr); + + const struct tcphdr *tcphdr = (const struct tcphdr *)tcp_layer->hdr_ptr; + key->seq = tcp_hdr_get_seq(tcphdr); + key->ack = tcp_hdr_get_ack(tcphdr); + key->sport = tcp_hdr_get_sport(tcphdr); + key->dport = tcp_hdr_get_dport(tcphdr); + key->l4_checksum = tcp_hdr_get_checksum(tcphdr); + + return 0; +} + +struct dupkt_filter *dupkt_filter_create(uint8_t enable, unsigned int capacity, double error_rate, int timeout_s) +{ + struct dupkt_filter *filter = (struct dupkt_filter *)calloc(1, sizeof(struct dupkt_filter)); + if (filter == NULL) + { + return NULL; + } + + filter->enable = enable; + filter->capacity = capacity; + filter->error_rate = error_rate; + filter->timeout_s = timeout_s; + + if (filter->enable == 0) + { + return filter; + } + + filter->handle = expiry_dablooms_init(filter->capacity, filter->error_rate, timestamp_get_sec(), filter->timeout_s); + if (filter->handle == NULL) + { + free(filter); + return NULL; + } + + return filter; +} + +void dupkt_filter_destroy(struct dupkt_filter *filter) +{ + if (filter) + { + if (filter->handle) + { + expiry_dablooms_destroy(filter->handle); + filter->handle = NULL; + } + free(filter); + filter = NULL; + } +} + +// return 1: found +// reutrn 0: no found +int dupkt_filter_search(struct dupkt_filter *filter, const struct packet *packet) +{ + if (filter->enable == 0) + { + return 0; + } + + struct packet_identify identify; + if (packet_get_identify(packet, &identify) == -1) + { + return 0; + } + + if (expiry_dablooms_search(filter->handle, (const char *)&identify, sizeof(struct packet_identify), timestamp_get_sec()) == 1) + { + return 1; + } + + return 0; +} + +void dupkt_filter_add(struct dupkt_filter *filter, const struct packet *packet) +{ + if (filter->enable == 0) + { + return; + } + + struct packet_identify identify; + if (packet_get_identify(packet, &identify) == -1) + { + return; + } + + expiry_dablooms_add(filter->handle, (const char *)&identify, sizeof(struct packet_identify), timestamp_get_sec()); +} diff --git a/src/dupkt/dupkt_filter.h b/src/dupkt/dupkt_filter.h new file mode 100644 index 0000000..d690297 --- /dev/null +++ b/src/dupkt/dupkt_filter.h @@ -0,0 +1,23 @@ +#ifndef _DUPKT_FILTER_H +#define _DUPKT_FILTER_H + +#ifdef __cpluscplus +extern "C" +{ +#endif + +#include "packet.h" + +struct dupkt_filter *dupkt_filter_create(uint8_t enable, unsigned int capacity, double error_rate, int timeout_s); +void dupkt_filter_destroy(struct dupkt_filter *filter); + +// return 1: found +// reutrn 0: no found +int dupkt_filter_search(struct dupkt_filter *filter, const struct packet *packet); +void dupkt_filter_add(struct dupkt_filter *filter, const struct packet *packet); + +#ifdef __cpluscplus +} +#endif + +#endif diff --git a/src/dupkt/test/CMakeLists.txt b/src/dupkt/test/CMakeLists.txt new file mode 100644 index 0000000..62c4ae8 --- /dev/null +++ b/src/dupkt/test/CMakeLists.txt @@ -0,0 +1,9 @@ +############################################################################### +# gtest +############################################################################### + +add_executable(gtest_dupkt_filter gtest_dupkt_filter.cpp) +target_link_libraries(gtest_dupkt_filter dupkt_filter gtest) + +include(GoogleTest) +gtest_discover_tests(gtest_dupkt_filter) \ No newline at end of file diff --git a/src/dupkt/test/gtest_dupkt_filter.cpp b/src/dupkt/test/gtest_dupkt_filter.cpp new file mode 100644 index 0000000..2182109 --- /dev/null +++ b/src/dupkt/test/gtest_dupkt_filter.cpp @@ -0,0 +1,117 @@ +#include + +#include "dupkt_filter.h" +#include "timestamp.h" + +/****************************************************************************** + * [Protocols in frame: eth:ethertype:ip:ipv6:tcp] + ****************************************************************************** + * + * Frame 1: 106 bytes on wire (848 bits), 106 bytes captured (848 bits) + * Ethernet II, Src: JuniperN_45:88:29 (2c:6b:f5:45:88:29), Dst: JuniperN_2a:a2:00 (5c:5e:ab:2a:a2:00) + * Destination: JuniperN_2a:a2:00 (5c:5e:ab:2a:a2:00) + * Source: JuniperN_45:88:29 (2c:6b:f5:45:88:29) + * Type: IPv4 (0x0800) + * Internet Protocol Version 4, Src: 210.77.88.163, Dst: 59.66.4.50 + * 0100 .... = Version: 4 + * .... 0101 = Header Length: 20 bytes (5) + * Differentiated Services Field: 0x00 (DSCP: CS0, ECN: Not-ECT) + * Total Length: 92 + * Identification: 0x0b4d (2893) + * 000. .... = Flags: 0x0 + * ...0 0000 0000 0000 = Fragment Offset: 0 + * Time to Live: 59 + * Protocol: IPv6 (41) + * Header Checksum: 0x09c8 [validation disabled] + * [Header checksum status: Unverified] + * Source Address: 210.77.88.163 + * Destination Address: 59.66.4.50 + * Internet Protocol Version 6, Src: 2001:da8:200:900e:200:5efe:d24d:58a3, Dst: 2600:140e:6::1702:1058 + * 0110 .... = Version: 6 + * .... 0000 0000 .... .... .... .... .... = Traffic Class: 0x00 (DSCP: CS0, ECN: Not-ECT) + * .... 0000 0000 0000 0000 0000 = Flow Label: 0x00000 + * Payload Length: 32 + * Next Header: TCP (6) + * Hop Limit: 64 + * Source Address: 2001:da8:200:900e:200:5efe:d24d:58a3 + * Destination Address: 2600:140e:6::1702:1058 + * [Source ISATAP IPv4: 210.77.88.163] + * Transmission Control Protocol, Src Port: 52556, Dst Port: 80, Seq: 0, Len: 0 + * Source Port: 52556 + * Destination Port: 80 + * [Stream index: 0] + * [Conversation completeness: Complete, WITH_DATA (31)] + * [TCP Segment Len: 0] + * Sequence Number: 0 (relative sequence number) + * Sequence Number (raw): 2172673142 + * [Next Sequence Number: 1 (relative sequence number)] + * Acknowledgment Number: 0 + * Acknowledgment number (raw): 0 + * 1000 .... = Header Length: 32 bytes (8) + * Flags: 0x002 (SYN) + * Window: 8192 + * [Calculated window size: 8192] + * Checksum: 0xf757 [unverified] + * [Checksum Status: Unverified] + * Urgent Pointer: 0 + * Options: (12 bytes), Maximum segment size, No-Operation (NOP), Window scale, No-Operation (NOP), No-Operation (NOP), SACK permitted + * [Timestamps] + */ + +unsigned char data[] = { + 0x5c, 0x5e, 0xab, 0x2a, 0xa2, 0x00, 0x2c, 0x6b, 0xf5, 0x45, 0x88, 0x29, 0x08, 0x00, 0x45, 0x00, 0x00, 0x5c, 0x0b, 0x4d, 0x00, 0x00, 0x3b, 0x29, 0x09, 0xc8, + 0xd2, 0x4d, 0x58, 0xa3, 0x3b, 0x42, 0x04, 0x32, 0x60, 0x00, 0x00, 0x00, 0x00, 0x20, 0x06, 0x40, 0x20, 0x01, 0x0d, 0xa8, 0x02, 0x00, 0x90, 0x0e, 0x02, 0x00, + 0x5e, 0xfe, 0xd2, 0x4d, 0x58, 0xa3, 0x26, 0x00, 0x14, 0x0e, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17, 0x02, 0x10, 0x58, 0xcd, 0x4c, 0x00, 0x50, + 0x81, 0x80, 0x5c, 0x76, 0x00, 0x00, 0x00, 0x00, 0x80, 0x02, 0x20, 0x00, 0xf7, 0x57, 0x00, 0x00, 0x02, 0x04, 0x04, 0xc4, 0x01, 0x03, 0x03, 0x08, 0x01, 0x01, + 0x04, 0x02}; + +struct config +{ + uint8_t enable; + unsigned int capacity; + double error_rate; + int timeout_s; +} config = { + .enable = 1, + .capacity = 1000000, + .error_rate = 0.00001, + .timeout_s = 10, +}; + +TEST(DUPKT_FILTER, TEST) +{ + timestamp_update(); + + struct packet pkt; + packet_parse(&pkt, (const char *)data, sizeof(data)); + + struct dupkt_filter *filter = dupkt_filter_create(config.enable, config.capacity, config.error_rate, config.timeout_s); + EXPECT_TRUE(filter != nullptr); + + EXPECT_TRUE(dupkt_filter_search(filter, &pkt) == 0); // no found + dupkt_filter_add(filter, &pkt); // add + + for (int i = 0; i < 12; i++) + { + timestamp_update(); + + if (i < config.timeout_s) + { + EXPECT_TRUE(dupkt_filter_search(filter, &pkt) == 1); // found + } + else + { + EXPECT_TRUE(dupkt_filter_search(filter, &pkt) == 0); // no found + } + sleep(1); + printf("sleep[%02d] 1s\n", i); + } + + dupkt_filter_destroy(filter); +} + +int main(int argc, char **argv) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/stellar/CMakeLists.txt b/src/stellar/CMakeLists.txt index d826933..9e86b39 100644 --- a/src/stellar/CMakeLists.txt +++ b/src/stellar/CMakeLists.txt @@ -2,7 +2,7 @@ add_executable(stellar stellar.cpp) target_include_directories(stellar PUBLIC ${CMAKE_SOURCE_DIR}/src/packet) target_include_directories(stellar PUBLIC ${CMAKE_SOURCE_DIR}/src/session) -target_include_directories(stellar PUBLIC ${CMAKE_SOURCE_DIR}/src/timestamp) -target_link_libraries(stellar session_manager pthread) +target_include_directories(stellar PUBLIC ${CMAKE_SOURCE_DIR}/src/dupkt) +target_link_libraries(stellar session_manager dupkt_filter pthread) install(TARGETS stellar RUNTIME DESTINATION bin COMPONENT Program) \ No newline at end of file diff --git a/src/stellar/stellar.cpp b/src/stellar/stellar.cpp index 865ac7d..6fa6bc9 100644 --- a/src/stellar/stellar.cpp +++ b/src/stellar/stellar.cpp @@ -10,6 +10,7 @@ #include "packet.h" #include "timestamp.h" #include "session_manager.h" +#include "dupkt_filter.h" #ifndef STELLAR_LOG_ERROR #define STELLAR_LOG_ERROR(format, ...) \ @@ -30,23 +31,41 @@ struct thread_ctx uint64_t need_exit; uint64_t is_runing; struct session_manager *sess_mgr; + struct dupkt_filter *dupkt_filter; }; struct stellar_ctx { uint64_t need_exit; uint16_t max_worker_num; + + // session manager uint64_t sess_mgr_max_session_num; uint64_t sess_mgr_timeout_ms_toclsoing; uint64_t sess_mgr_timeout_ms_toclosed; + + // duplicated packet filter + uint8_t dupkt_filter_enable; + unsigned int dupkt_filter_capacity; + double dupkt_filter_error_rate; + int dupkt_filter_timeout_s; + + // thread struct thread_ctx thread_ctx[128]; } g_stellar_ctx = { .need_exit = 0, .max_worker_num = 1, + + // session manager .sess_mgr_max_session_num = 1000000, .sess_mgr_timeout_ms_toclsoing = 1000, .sess_mgr_timeout_ms_toclosed = 10000, -}; + + // duplicated packet filter + .dupkt_filter_enable = 1, + .dupkt_filter_capacity = 1000000, + .dupkt_filter_error_rate = 0.0001, + .dupkt_filter_timeout_s = 10}; // TODO static int recv_packet(const char **data) @@ -98,11 +117,15 @@ static void thread_ctx_init(struct stellar_ctx *ctx) thd_ctx->index = i; thd_ctx->need_exit = 0; thd_ctx->is_runing = 0; + // session manager thd_ctx->sess_mgr = session_manager_create(ctx->sess_mgr_max_session_num); assert(thd_ctx->sess_mgr != NULL); session_manager_set_session_eventcb(thd_ctx->sess_mgr, plugin_dispatch, thd_ctx); session_manager_set_timeout_toclosing(thd_ctx->sess_mgr, ctx->sess_mgr_timeout_ms_toclsoing); session_manager_set_timeout_toclosed(thd_ctx->sess_mgr, ctx->sess_mgr_timeout_ms_toclosed); + // duplicated packet filter + thd_ctx->dupkt_filter = dupkt_filter_create(ctx->dupkt_filter_enable, ctx->dupkt_filter_capacity, ctx->dupkt_filter_error_rate, ctx->dupkt_filter_timeout_s); + assert(thd_ctx->dupkt_filter != NULL); } } @@ -126,6 +149,7 @@ static void *thread_cycle(void *arg) struct session *sess = NULL; struct thread_ctx *thd_ctx = (struct thread_ctx *)arg; struct session_manager *sess_mgr = thd_ctx->sess_mgr; + struct dupkt_filter *dupkt_filter = thd_ctx->dupkt_filter; char thread_name[16]; ATOMIC_SET(&thd_ctx->is_runing, 1); @@ -141,7 +165,12 @@ static void *thread_cycle(void *arg) // parse packet packet_parse(&pkt, data, len); - // TODO duplicated packet filter + // duplicated packet filter + if (dupkt_filter_search(dupkt_filter, &pkt) == 0) + { + STELLAR_LOG_DEBUG("duplicated packet, forward it"); + goto fast_forward; + } // update session sess = session_manager_update(sess_mgr, &pkt); @@ -149,7 +178,11 @@ static void *thread_cycle(void *arg) { goto fast_forward; } + + // TODO session synchronization + session_manager_dispatch(sess_mgr, sess); + dupkt_filter_add(dupkt_filter, &pkt); fast_forward: // TODO send packet