diff --git a/src/dupkt_filter/CMakeLists.txt b/src/dupkt_filter/CMakeLists.txt index 70b1c4b..2a5586c 100644 --- a/src/dupkt_filter/CMakeLists.txt +++ b/src/dupkt_filter/CMakeLists.txt @@ -3,10 +3,7 @@ ############################################################################### add_library(dupkt_filter dupkt_filter.cpp) -target_include_directories(dupkt_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/dupkt_filter) -target_include_directories(dupkt_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/packet) -target_include_directories(dupkt_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/timestamp) -target_include_directories(dupkt_filter PUBLIC ${CMAKE_SOURCE_DIR}/deps/dablooms) +target_include_directories(dupkt_filter PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_link_libraries(dupkt_filter packet timestamp dablooms) add_subdirectory(test) \ No newline at end of file diff --git a/src/eviction_filter/CMakeLists.txt b/src/eviction_filter/CMakeLists.txt index 91c44b9..dad9e2d 100644 --- a/src/eviction_filter/CMakeLists.txt +++ b/src/eviction_filter/CMakeLists.txt @@ -3,10 +3,7 @@ ############################################################################### add_library(eviction_filter eviction_filter.cpp) -target_include_directories(eviction_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/eviction_filter) -target_include_directories(eviction_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/packet) -target_include_directories(eviction_filter PUBLIC ${CMAKE_SOURCE_DIR}/src/timestamp) -target_include_directories(eviction_filter PUBLIC ${CMAKE_SOURCE_DIR}/deps/dablooms) +target_include_directories(eviction_filter PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_link_libraries(eviction_filter packet timestamp dablooms) add_subdirectory(test) \ No newline at end of file diff --git a/src/packet/CMakeLists.txt b/src/packet/CMakeLists.txt index 754b177..0157926 100644 --- a/src/packet/CMakeLists.txt +++ b/src/packet/CMakeLists.txt @@ -3,8 +3,7 @@ ############################################################################### add_library(packet packet.cpp) -target_include_directories(packet PUBLIC ${CMAKE_SOURCE_DIR}/src/packet) -target_include_directories(packet PUBLIC ${CMAKE_SOURCE_DIR}/src/tuple) +target_include_directories(packet PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_include_directories(packet PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash) target_link_libraries(packet tuple) diff --git a/src/packet/test/CMakeLists.txt b/src/packet/test/CMakeLists.txt index dd9a0fe..43de4df 100644 --- a/src/packet/test/CMakeLists.txt +++ b/src/packet/test/CMakeLists.txt @@ -6,20 +6,16 @@ add_executable(gtest_packet gtest_packet.cpp) target_link_libraries(gtest_packet packet gtest) add_executable(gtest_udp_helpers gtest_udp_helpers.cpp) -target_include_directories(gtest_udp_helpers PUBLIC ${CMAKE_SOURCE_DIR}/src/packet) -target_link_libraries(gtest_udp_helpers gtest) +target_link_libraries(gtest_udp_helpers packet gtest) add_executable(gtest_tcp_helpers gtest_tcp_helpers.cpp) -target_include_directories(gtest_tcp_helpers PUBLIC ${CMAKE_SOURCE_DIR}/src/packet) -target_link_libraries(gtest_tcp_helpers gtest) +target_link_libraries(gtest_tcp_helpers packet gtest) add_executable(gtest_ipv4_helpers gtest_ipv4_helpers.cpp) -target_include_directories(gtest_ipv4_helpers PUBLIC ${CMAKE_SOURCE_DIR}/src/packet) -target_link_libraries(gtest_ipv4_helpers gtest) +target_link_libraries(gtest_ipv4_helpers packet gtest) add_executable(gtest_ipv6_helpers gtest_ipv6_helpers.cpp) -target_include_directories(gtest_ipv6_helpers PUBLIC ${CMAKE_SOURCE_DIR}/src/packet) -target_link_libraries(gtest_ipv6_helpers gtest) +target_link_libraries(gtest_ipv6_helpers packet gtest) add_executable(gtest_packet_helpers gtest_packet_helpers.cpp) target_link_libraries(gtest_packet_helpers packet gtest) diff --git a/src/session/CMakeLists.txt b/src/session/CMakeLists.txt index eea3647..fb475eb 100644 --- a/src/session/CMakeLists.txt +++ b/src/session/CMakeLists.txt @@ -10,12 +10,7 @@ add_library(session_manager session_queue.cpp session_manager.cpp ) -target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash) -target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/timeout) -target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/src/packet) -target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/src/session) -target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/src/timestamp) -target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/src/tuple) -target_link_libraries(session_manager timeout timestamp tuple packet) +target_include_directories(session_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR}) +target_link_libraries(session_manager timeout dupkt_filter eviction_filter) add_subdirectory(test) \ No newline at end of file diff --git a/src/session/session.cpp b/src/session/session.cpp index cdea80b..4117b4c 100644 --- a/src/session/session.cpp +++ b/src/session/session.cpp @@ -46,7 +46,7 @@ uint64_t session_get_id(const struct session *sess) } // session tuple6 -void session_set_tuple6(struct session *sess, struct tuple6 *tuple) +void session_set_tuple6(struct session *sess, const struct tuple6 *tuple) { memcpy(&sess->tuple, tuple, sizeof(struct tuple6)); } @@ -88,6 +88,17 @@ enum session_type session_get_type(const struct session *sess) return sess->type; } +// session dup traffic flag +void session_set_dup_traffic_flag(struct session *sess, enum dup_traffic_flag flag) +{ + sess->dup_flag = flag; +} + +enum dup_traffic_flag session_get_dup_traffic_flag(const struct session *sess) +{ + return sess->dup_flag; +} + // closing reasion void session_set_closing_reasion(struct session *sess, enum closing_reasion reasion) { @@ -349,14 +360,14 @@ static void tcp_ex_data_tostring(uint64_t ex_data, char *buffer, size_t buffer_l nused += snprintf(buffer + nused, buffer_len - nused, "TCP_SYNACK_RECVED "); } - if (ex_data & TCP_C2S_PAYLOAD_RECVED) + if (ex_data & TCP_C2S_DATA_RECVED) { - nused += snprintf(buffer + nused, buffer_len - nused, "TCP_C2S_PAYLOAD_RECVED "); + nused += snprintf(buffer + nused, buffer_len - nused, "TCP_C2S_DATA_RECVED "); } - if (ex_data & TCP_S2C_PAYLOAD_RECVED) + if (ex_data & TCP_S2C_DATA_RECVED) { - nused += snprintf(buffer + nused, buffer_len - nused, "TCP_S2C_PAYLOAD_RECVED "); + nused += snprintf(buffer + nused, buffer_len - nused, "TCP_S2C_DATA_RECVED "); } if (ex_data & TCP_C2S_FIN_RECVED) @@ -404,17 +415,17 @@ const char *session_closing_reasion_tostring(enum closing_reasion reasion) switch (reasion) { case CLOSING_BY_TIMEOUT: - return "CLOSING_BY_TIMEOUT"; + return "closing_by_timeout"; case CLOSING_BY_EVICTED: - return "CLOSING_BY_EVICTED"; + return "closing_by_evicted"; case CLOSING_BY_CLIENT_FIN: - return "CLOSING_BY_CLIENT_FIN"; + return "closing_by_client_fin"; case CLOSING_BY_CLIENT_RST: - return "CLOSING_BY_CLIENT_RST"; + return "closing_by_client_rst"; case CLOSING_BY_SERVER_FIN: - return "CLOSING_BY_SERVER_FIN"; + return "closing_by_server_fin"; case CLOSING_BY_SERVER_RST: - return "CLOSING_BY_SERVER_RST"; + return "closing_by_server_rst"; default: return "unknown"; } @@ -424,8 +435,6 @@ const char *session_state_tostring(enum session_state state) { switch (state) { - case SESSION_STATE_INIT: - return "init"; case SESSION_STATE_OPENING: return "opening"; case SESSION_STATE_ACTIVE: @@ -456,8 +465,6 @@ const char *session_dir_tostring(enum session_dir dir) { switch (dir) { - case SESSION_DIR_NONE: - return "none"; case SESSION_DIR_C2S: return "c2s"; case SESSION_DIR_S2C: diff --git a/src/session/session.h b/src/session/session.h index 309c9ce..74bbd93 100644 --- a/src/session/session.h +++ b/src/session/session.h @@ -13,11 +13,10 @@ extern "C" enum session_state { - SESSION_STATE_INIT = 0, - SESSION_STATE_OPENING, - SESSION_STATE_ACTIVE, - SESSION_STATE_CLOSING, - SESSION_STATE_CLOSED, + SESSION_STATE_OPENING = 0x1, + SESSION_STATE_ACTIVE = 0x2, + SESSION_STATE_CLOSING = 0x3, + SESSION_STATE_CLOSED = 0x4, }; enum session_type @@ -28,19 +27,25 @@ enum session_type enum session_dir { - SESSION_DIR_NONE = 0, - SESSION_DIR_C2S = 1, - SESSION_DIR_S2C = 2, + SESSION_DIR_NONE = 0x0, + SESSION_DIR_C2S = 0x1, + SESSION_DIR_S2C = 0x2, +}; + +enum dup_traffic_flag +{ + DUP_TRAFFIC_YES = 0x1, + DUP_TRAFFIC_NO = 0x2, }; enum closing_reasion { - CLOSING_BY_TIMEOUT = 1, - CLOSING_BY_EVICTED = 2, - CLOSING_BY_CLIENT_FIN = 3, - CLOSING_BY_CLIENT_RST = 4, - CLOSING_BY_SERVER_FIN = 5, - CLOSING_BY_SERVER_RST = 6, + CLOSING_BY_TIMEOUT = 0x1, + CLOSING_BY_EVICTED = 0x2, + CLOSING_BY_CLIENT_FIN = 0x3, + CLOSING_BY_CLIENT_RST = 0x4, + CLOSING_BY_SERVER_FIN = 0x5, + CLOSING_BY_SERVER_RST = 0x6, }; struct session; @@ -56,7 +61,7 @@ void session_set_id(struct session *sess, uint64_t id); uint64_t session_get_id(const struct session *sess); // session key -void session_set_tuple6(struct session *sess, struct tuple6 *tuple); +void session_set_tuple6(struct session *sess, const struct tuple6 *tuple); const struct tuple6 *session_get0_tuple6(const struct session *sess); void session_set_tuple6_dir(struct session *sess, enum session_dir dir); enum session_dir session_get_tuple6_dir(const struct session *sess); @@ -69,6 +74,10 @@ enum session_state session_get_state(const struct session *sess); void session_set_type(struct session *sess, enum session_type type); enum session_type session_get_type(const struct session *sess); +// session dup traffic flag +void session_set_dup_traffic_flag(struct session *sess, enum dup_traffic_flag flag); +enum dup_traffic_flag session_get_dup_traffic_flag(const struct session *sess); + // closing reasion void session_set_closing_reasion(struct session *sess, enum closing_reasion reasion); enum closing_reasion session_get_closing_reasion(const struct session *sess); diff --git a/src/session/session_manager.cpp b/src/session/session_manager.cpp index c30a8bd..d1d5234 100644 --- a/src/session/session_manager.cpp +++ b/src/session/session_manager.cpp @@ -2,56 +2,225 @@ #include #include "timestamp.h" -#include "session_manager.h" +#include "session_private.h" #include "session_pool.h" #include "session_table.h" #include "session_timer.h" #include "session_queue.h" -#include "session_private.h" -#include "packet_helpers.h" +#include "session_manager.h" #include "tcp_helpers.h" #include "udp_helpers.h" +#include "packet_helpers.h" +#include "dupkt_filter.h" +#include "eviction_filter.h" struct session_manager { struct session_pool *sess_pool; - struct session_table *sess_table; + struct session_table *tcp_sess_table; + struct session_table *udp_sess_table; struct session_timer *sess_timer; - struct session_queue *evicted_sess; + struct session_queue *sess_evicted_queue; + struct session_queue *sess_toclosed_queue; - // timeout config - uint64_t timeout_toclosing; - uint64_t timeout_toclosed; + struct dupkt_filter *tcp_dupkt_filter; + struct eviction_filter *udp_eviction_filter; + + struct session_manager_config config; + + /*************************************************************** + * session manager status + ***************************************************************/ // session number + uint64_t tcp_sess_num; uint64_t tcp_opening_sess_num; - uint64_t tcp_closing_sess_num; uint64_t tcp_active_sess_num; + uint64_t tcp_closing_sess_num; + uint64_t udp_sess_num; uint64_t udp_opening_sess_num; - uint64_t udp_closing_sess_num; uint64_t udp_active_sess_num; + uint64_t udp_closing_sess_num; + + // packet filter status + uint64_t npkts_miss_l4_proto; // fast forward + + uint64_t npkts_hit_tcp_evicted; // fast forward (miss session) + uint64_t npkts_hit_tcp_dupkt; // fast forward + uint64_t npkts_hit_tcp_discard; // drop + uint64_t npkts_hit_tcp_closing; // fast forward + + uint64_t npkts_hit_udp_evicted; // fast forward }; /****************************************************************************** * utils ******************************************************************************/ +// OK +// return 0: success +// return -1: invalid config +static int session_manager_check_config(struct session_manager_config *config) +{ + if (config == NULL) + { + SESSION_LOG_ERROR("invalid config"); + return -1; + } + + // max session number + if (config->max_tcp_session_num < 2) + { + SESSION_LOG_ERROR("invalid max tcp session number"); + return -1; + } + if (config->max_udp_session_num < 2) + { + SESSION_LOG_ERROR("invalid max udp session number"); + return -1; + } + + // TCP timeout config + if (config->tcp_timeout_init < 1 || config->tcp_timeout_init > 60) + { + SESSION_LOG_ERROR("invalid tcp timeout init, support range: 1-60"); + return -1; + } + if (config->tcp_timeout_handshake < 1 || config->tcp_timeout_handshake > 60) + { + SESSION_LOG_ERROR("invalid tcp timeout handshake, support range: 1-60"); + return -1; + } + if (config->tcp_timeout_data < 1 || config->tcp_timeout_data > 15999999) + { + SESSION_LOG_ERROR("invalid tcp timeout data, support range: 1-15,999,999"); + return -1; + } + if (config->tcp_timeout_half_closed < 1 || config->tcp_timeout_half_closed > 604800) + { + SESSION_LOG_ERROR("invalid tcp timeout half closed, support range: 1-604,800"); + return -1; + } + if (config->tcp_timeout_discard < 1 || config->tcp_timeout_discard > 15999999) + { + SESSION_LOG_ERROR("invalid tcp timeout discard, support range: 1-15,999,999"); + return -1; + } + + // UDP timeout config + if (config->udp_timeout_data < 1 || config->udp_timeout_data > 15999999) + { + SESSION_LOG_ERROR("invalid udp timeout data, support range: 1-15,999,999"); + return -1; + } + + // TCP duplicate packet filter config + if (config->tcp_dupkt_filter_enable != 0 && config->tcp_dupkt_filter_enable != 1) + { + SESSION_LOG_ERROR("invalid tcp dupkt filter enable, support range: 0-1"); + return -1; + } + if (config->tcp_dupkt_filter_enable) + { + if (config->tcp_dupkt_filter_capacity == 0) + { + SESSION_LOG_ERROR("invalid tcp dupkt filter capacity"); + return -1; + } + if (config->tcp_dupkt_filter_timeout < 1 || config->tcp_dupkt_filter_timeout > 60) + { + SESSION_LOG_ERROR("invalid tcp dupkt filter timeout, support range: 1-60"); + return -1; + } + if (config->tcp_dupkt_filter_error_rate < 0 || config->tcp_dupkt_filter_error_rate > 1) + { + SESSION_LOG_ERROR("invalid tcp dupkt filter error rate, support range: 0-1"); + return -1; + } + } + + // UDP eviction filter config + if (config->udp_eviction_filter_enable != 0 && config->udp_eviction_filter_enable != 1) + { + SESSION_LOG_ERROR("invalid udp eviction filter enable, support range: 0-1"); + return -1; + } + if (config->udp_eviction_filter_enable) + { + if (config->udp_eviction_filter_capacity == 0) + { + SESSION_LOG_ERROR("invalid udp eviction filter capacity"); + return -1; + } + if (config->udp_eviction_filter_timeout < 1 || config->udp_eviction_filter_timeout > 60) + { + SESSION_LOG_ERROR("invalid udp eviction filter timeout, support range: 1-60"); + return -1; + } + if (config->udp_eviction_filter_error_rate < 0 || config->udp_eviction_filter_error_rate > 1) + { + SESSION_LOG_ERROR("invalid udp eviction filter error rate, support range: 0-1"); + return -1; + } + } + + return 0; +} + // TODO -static uint64_t alloc_session_id(void) +static uint64_t session_manager_alloc_session_id(void) { return 0; } -static void metadata_ex_free_cb(struct session *sess, uint8_t idx, void *ex_ptr, void *arg) +// OK +// return 1: duplicate packet +// return 0: not duplicate packet +static int session_manager_filter_session_dupkt(struct session_manager *mgr, struct session *sess, const struct packet *pkt, enum session_dir curr_dir) { - if (ex_ptr) + if (curr_dir == SESSION_DIR_C2S) { - metadata_free((struct metadata *)ex_ptr); + if (session_get_c2s_packets(sess) < 3) + { + goto dupkt_fitler; + } + } + else if (curr_dir == SESSION_DIR_S2C) + { + if (session_get_s2c_packets(sess) < 3) + { + goto dupkt_fitler; + } + } + + if (session_get_dup_traffic_flag(sess) == DUP_TRAFFIC_YES) + { + goto dupkt_fitler; + } + else + { + return 0; + } + +dupkt_fitler: + if (dupkt_filter_lookup(mgr->tcp_dupkt_filter, pkt)) + { + return 1; + } + else + { + dupkt_filter_add(mgr->tcp_dupkt_filter, pkt); + return 0; } } -static void packet_ex_free_cb(struct session *sess, uint8_t idx, void *ex_ptr, void *arg) +/****************************************************************************** + * built-in ex data + ******************************************************************************/ + +// OK +static void session_free_packet_exdata(struct session *sess, uint8_t idx, void *ex_ptr, void *arg) { if (ex_ptr) { @@ -59,109 +228,143 @@ static void packet_ex_free_cb(struct session *sess, uint8_t idx, void *ex_ptr, v } } -/****************************************************************************** - * session manager counter - ******************************************************************************/ - -static void update_counter_on_opening(struct session_manager *mgr, struct session *sess) +// OK +static void session_free_pktmeta_exdata(struct session *sess, uint8_t idx, void *ex_ptr, void *arg) { - if (session_get_state(sess) == SESSION_STATE_INIT) + if (ex_ptr) { - if (session_get_type(sess) == SESSION_TYPE_TCP) + metadata_free((struct metadata *)ex_ptr); + } +} + +// OK +static void session_update_packet_exdata(struct session *sess, const struct packet *pkt, enum session_dir curr_dir) +{ + if (curr_dir == SESSION_DIR_C2S) + { + if (session_get0_ex_data(sess, c2s_1st_pkt_ex) == NULL) { - mgr->tcp_opening_sess_num++; + session_set_ex_data(sess, c2s_1st_pkt_ex, packet_dup(pkt)); } - else + } + else if (curr_dir == SESSION_DIR_S2C) + { + if (session_get0_ex_data(sess, s2c_1st_pkt_ex) == NULL) { - mgr->udp_opening_sess_num++; + session_set_ex_data(sess, s2c_1st_pkt_ex, packet_dup(pkt)); } } } -static void update_counter_on_active(struct session_manager *mgr, struct session *sess) +// OK +static void session_update_pktmeta_exdata(struct session *sess, const struct metadata *md, enum session_dir curr_dir) { - if (session_get_state(sess) == SESSION_STATE_OPENING) + if (curr_dir == SESSION_DIR_C2S) { - if (session_get_type(sess) == SESSION_TYPE_TCP) + if (session_get0_ex_data(sess, c2s_1st_md_ex) == NULL) { - mgr->tcp_opening_sess_num--; - mgr->tcp_active_sess_num++; + session_set_ex_data(sess, c2s_1st_md_ex, metadata_dup(md)); } - else + } + else if (curr_dir == SESSION_DIR_S2C) + { + if (session_get0_ex_data(sess, s2c_1st_md_ex) == NULL) { - mgr->udp_opening_sess_num--; - mgr->udp_active_sess_num++; + session_set_ex_data(sess, s2c_1st_md_ex, metadata_dup(md)); } } } -static void update_counter_on_closing(struct session_manager *mgr, struct session *sess) +// OK +static void session_update_tcp_exdata(struct session *sess, const struct layer_record *tcp_layer, enum session_dir curr_dir) { - if (session_get_state(sess) == SESSION_STATE_OPENING) + const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr; + uint64_t state = (uint64_t)session_get0_ex_data(sess, tcp_builtin_ex); + if (tcp_hdr_has_flag_syn(hdr)) { - if (session_get_type(sess) == SESSION_TYPE_TCP) + state |= (tcp_hdr_has_flag_ack(hdr) ? TCP_SYNACK_RECVED : TCP_SYN_RECVED); + } + if (tcp_hdr_has_flag_fin(hdr)) + { + if (curr_dir == SESSION_DIR_C2S) { - mgr->tcp_opening_sess_num--; - mgr->tcp_closing_sess_num++; + state |= TCP_C2S_FIN_RECVED; } - else + else if (curr_dir == SESSION_DIR_S2C) { - mgr->udp_opening_sess_num--; - mgr->udp_closing_sess_num++; + state |= TCP_S2C_FIN_RECVED; + } + } + if (tcp_hdr_has_flag_rst(hdr)) + { + if (curr_dir == SESSION_DIR_C2S) + { + state |= TCP_C2S_RST_RECVED; + } + else if (curr_dir == SESSION_DIR_S2C) + { + state |= TCP_S2C_RST_RECVED; + } + } + if (tcp_layer->pld_len > 0) + { + if (curr_dir == SESSION_DIR_C2S) + { + state |= TCP_C2S_DATA_RECVED; + } + else if (curr_dir == SESSION_DIR_S2C) + { + state |= TCP_S2C_DATA_RECVED; } - - return; } - if (session_get_state(sess) == SESSION_STATE_ACTIVE) - { - if (session_get_type(sess) == SESSION_TYPE_TCP) - { - mgr->tcp_active_sess_num--; - mgr->tcp_closing_sess_num++; - } - else - { - mgr->udp_active_sess_num--; - mgr->udp_closing_sess_num++; - } - - return; - } + session_set_ex_data(sess, tcp_builtin_ex, (void *)(state)); } -static void update_counter_on_closed(struct session_manager *mgr, struct session *sess) +// OK +static void session_update_udp_exdata(struct session *sess, const struct layer_record *udp_layer, enum session_dir curr_dir) { - if (session_get_state(sess) == SESSION_STATE_CLOSING) + const struct udphdr *hdr = (const struct udphdr *)udp_layer->hdr_ptr; + uint64_t state = (uint64_t)session_get0_ex_data(sess, udp_builtin_ex); + if (curr_dir == SESSION_DIR_C2S) { - if (session_get_type(sess) == SESSION_TYPE_TCP) - { - mgr->tcp_closing_sess_num--; - } - else - { - mgr->udp_closing_sess_num--; - } + state |= UDP_C2S_RECVED; } + else if (curr_dir == SESSION_DIR_S2C) + { + state |= UDP_S2C_RECVED; + } + session_set_ex_data(sess, udp_builtin_ex, (void *)(state)); } /****************************************************************************** * judge session direction ******************************************************************************/ +// OK static enum session_dir judge_direction_by_tuple6(const struct tuple6 *key) { + uint16_t src_port = ntohs(key->src_port); + uint16_t dst_port = ntohs(key->dst_port); + // big port is client - if (ntohs(key->src_port) > ntohs(key->dst_port)) + if (src_port > dst_port) { return SESSION_DIR_C2S; } - else + + else if (src_port < dst_port) { return SESSION_DIR_S2C; } + else + { + // if port is equal, first packet is C2S + return SESSION_DIR_C2S; + } } +// OK static enum session_dir judge_direction_by_session(const struct session *sess, const struct tuple6 *key) { if (tuple6_cmp(session_get0_tuple6(sess), key) == 0) @@ -170,374 +373,530 @@ static enum session_dir judge_direction_by_session(const struct session *sess, c } else { - if (session_get_tuple6_dir(sess) == SESSION_DIR_C2S) - { - return SESSION_DIR_S2C; - } - else - { - return SESSION_DIR_C2S; - } + return (session_get_tuple6_dir(sess) == SESSION_DIR_C2S ? SESSION_DIR_S2C : SESSION_DIR_C2S); } } /****************************************************************************** - * update session timer + * timeout callback ******************************************************************************/ -void session_manager_update_session_timer(struct session_manager *mgr, struct session *sess, session_expire_cb cb, uint64_t timeout_ms) +// OK +static void tcp_init_timeout_cb(struct session *sess, void *arg) { - session_timer_del_session(mgr->sess_timer, sess); - session_set_expirecb(sess, cb, mgr, timestamp_get_msec() + timeout_ms); - session_timer_add_session(mgr->sess_timer, sess); -} - -/****************************************************************************** - * expire callback - ******************************************************************************/ - -static void session_to_closed(struct session *sess, void *arg) -{ - SESSION_LOG_DEBUG("session %lu closing expire, free session", session_get_id(sess)); + SESSION_LOG_DEBUG("run tcp_init_timeout_cb on session %lu", session_get_id(sess)); struct session_manager *mgr = (struct session_manager *)arg; assert(mgr != NULL); - update_counter_on_closed(mgr, sess); - session_set_state(sess, SESSION_STATE_CLOSED); - session_set0_cur_pkt(sess, NULL); - session_set_cur_dir(sess, SESSION_DIR_NONE); - for (uint8_t i = 0; i < EX_DATA_MAX_COUNT; i++) - { - session_free_ex_data(sess, i); - } - session_table_delete_session(mgr->sess_table, session_get0_tuple6(sess)); - session_timer_del_session(mgr->sess_timer, sess); - session_pool_free(mgr->sess_pool, sess); + session_manager_update_session_on_closing(mgr, sess, CLOSING_BY_TIMEOUT); } -static void session_to_closing(struct session *sess, void *arg) +// OK +static void tcp_handshake_timeout_cb(struct session *sess, void *arg) { - SESSION_LOG_DEBUG("session %lu packet expire, trigger closing event", session_get_id(sess)); + SESSION_LOG_DEBUG("run tcp_handshake_timeout_cb on session %lu", session_get_id(sess)); struct session_manager *mgr = (struct session_manager *)arg; assert(mgr != NULL); - update_counter_on_closing(mgr, sess); - session_set_state(sess, SESSION_STATE_CLOSING); - session_manager_update_session_timer(mgr, sess, session_to_closed, mgr->timeout_toclosed); + session_manager_update_session_on_closing(mgr, sess, CLOSING_BY_TIMEOUT); +} + +// OK +static void tcp_data_timeout_cb(struct session *sess, void *arg) +{ + SESSION_LOG_DEBUG("run tcp_data_timeout_cb on session %lu", session_get_id(sess)); + struct session_manager *mgr = (struct session_manager *)arg; + assert(mgr != NULL); + + session_manager_update_session_on_closing(mgr, sess, CLOSING_BY_TIMEOUT); +} + +// OK +static void tcp_half_closed_timeout_cb(struct session *sess, void *arg) +{ + SESSION_LOG_DEBUG("run tcp_half_closed_timeout_cb on session %lu", session_get_id(sess)); + struct session_manager *mgr = (struct session_manager *)arg; + assert(mgr != NULL); + + session_manager_update_session_on_closing(mgr, sess, CLOSING_BY_TIMEOUT); +} + +// OK +static void udp_data_timeout_cb(struct session *sess, void *arg) +{ + SESSION_LOG_DEBUG("run udp_data_timeout_cb on session %lu", session_get_id(sess)); + struct session_manager *mgr = (struct session_manager *)arg; + assert(mgr != NULL); + + session_manager_update_session_on_closing(mgr, sess, CLOSING_BY_TIMEOUT); } /****************************************************************************** - * session ex data + * update session ******************************************************************************/ -static int tcp_need_closing(uint64_t state) +/* +on opening update session +[*] session_init +[*] session_set_id +[*] session_set_tuple6 +[*] session_set_tuple6_dir +[*] session_set_type +[*] session_set_create_time +[*] session_set_state + +on packet update session +[*] session_inc_c2s_metrics +[*] session_inc_s2c_metrics +[*] session_set_c2s_1st_pkt +[*] session_set_s2c_1st_pkt +[*] session_set_c2s_1st_pkt_md +[*] session_set_s2c_1st_pkt_md +[*] session_set0_cur_pkt +[*] session_set_cur_dir +[*] session_set_last_time + session_set_state + session_set_dup_traffic_flag + +on closing update session +[*] session_set_state +[*] session_set_closing_reasion +*/ + +// OK +static void session_manager_update_session_state(struct session_manager *mgr, struct session *sess, enum session_state state) { - if ((state & TCP_C2S_FIN_RECVED) && (state & TCP_S2C_FIN_RECVED)) + // session state not change + if (session_get_state(sess) == state) { - return 1; + return; } - if (state & TCP_C2S_RST_RECVED) + enum session_type type = session_get_type(sess); + if (type == SESSION_TYPE_TCP) { - return 1; + // handle old state + switch (session_get_state(sess)) + { + case SESSION_STATE_OPENING: + mgr->tcp_opening_sess_num--; + break; + case SESSION_STATE_ACTIVE: + mgr->tcp_active_sess_num--; + break; + case SESSION_STATE_CLOSING: + mgr->tcp_closing_sess_num--; + break; + case SESSION_STATE_CLOSED: + /* void */ + break; + default: + break; + } + + // handle new state + switch (state) + { + case SESSION_STATE_OPENING: + mgr->tcp_opening_sess_num++; + mgr->tcp_sess_num++; + break; + case SESSION_STATE_ACTIVE: + mgr->tcp_active_sess_num++; + break; + case SESSION_STATE_CLOSING: + mgr->tcp_closing_sess_num++; + break; + case SESSION_STATE_CLOSED: + mgr->tcp_sess_num--; + break; + default: + break; + } + } + else if (type == SESSION_TYPE_UDP) + { + // handle old state + switch (session_get_state(sess)) + { + case SESSION_STATE_OPENING: + mgr->udp_opening_sess_num--; + break; + case SESSION_STATE_ACTIVE: + mgr->udp_active_sess_num--; + break; + case SESSION_STATE_CLOSING: + mgr->udp_closing_sess_num--; + break; + case SESSION_STATE_CLOSED: + /* void */ + break; + default: + break; + } + + // handle new state + switch (state) + { + case SESSION_STATE_OPENING: + mgr->udp_opening_sess_num++; + mgr->udp_sess_num++; + break; + case SESSION_STATE_ACTIVE: + mgr->udp_active_sess_num++; + break; + case SESSION_STATE_CLOSING: + mgr->udp_closing_sess_num++; + break; + case SESSION_STATE_CLOSED: + mgr->udp_sess_num--; + break; + default: + break; + } } - if (state & TCP_S2C_RST_RECVED) - { - return 1; - } - - return 0; + session_set_state(sess, state); } -static int tcp_need_active(uint64_t state) +// OK +static inline void session_manager_update_session_on_opening(struct session_manager *mgr, struct session *sess, const struct tuple6 *key, enum session_dir curr_dir) { - if ((state & TCP_C2S_PAYLOAD_RECVED) || (state & TCP_S2C_PAYLOAD_RECVED)) + session_init(sess); + session_set_id(sess, session_manager_alloc_session_id()); + session_set_tuple6(sess, key); + session_set_tuple6_dir(sess, curr_dir); + if (key->ip_proto == IPPROTO_UDP) { - return 1; + session_set_type(sess, SESSION_TYPE_UDP); } - - return 0; + else if (key->ip_proto == IPPROTO_TCP) + { + session_set_type(sess, SESSION_TYPE_TCP); + } + session_set_create_time(sess, timestamp_get_msec()); + session_manager_update_session_state(mgr, sess, SESSION_STATE_OPENING); } -static void update_session_base(struct session *sess, const struct packet *pkt, enum session_dir curr_dir) +// OK +static inline void session_manager_update_session_on_packet(struct session_manager *mgr, struct session *sess, const struct packet *pkt, enum session_dir curr_dir) { uint64_t len = packet_get_raw_len(pkt); const struct metadata *md = packet_get0_metadata(pkt); if (curr_dir == SESSION_DIR_C2S) { session_inc_c2s_metrics(sess, 1, len); - if (session_get0_ex_data(sess, c2s_1st_md_ex) == NULL) - { - session_set_ex_data(sess, c2s_1st_md_ex, metadata_dup(md)); - } - if (session_get0_ex_data(sess, c2s_1st_pkt_ex) == NULL) - { - session_set_ex_data(sess, c2s_1st_pkt_ex, packet_dup(pkt)); - } } - else + else if (curr_dir == SESSION_DIR_S2C) { session_inc_s2c_metrics(sess, 1, len); - if (session_get0_ex_data(sess, s2c_1st_md_ex) == NULL) - { - session_set_ex_data(sess, s2c_1st_md_ex, metadata_dup(md)); - } - if (session_get0_ex_data(sess, s2c_1st_pkt_ex) == NULL) - { - session_set_ex_data(sess, s2c_1st_pkt_ex, packet_dup(pkt)); - } } - session_set_last_time(sess, timestamp_get_msec()); + session_update_packet_exdata(sess, pkt, curr_dir); + session_update_pktmeta_exdata(sess, md, curr_dir); session_set0_cur_pkt(sess, pkt); session_set_cur_dir(sess, curr_dir); + session_set_last_time(sess, timestamp_get_msec()); } -static void update_tcp_ex_data(struct session *sess, const struct packet *pkt, enum session_dir curr_dir) +// OK +static inline void session_manager_update_session_on_closing(struct session_manager *mgr, struct session *sess, enum closing_reasion reasion) { - const struct layer_record *tcp_layer = packet_get_innermost_layer(pkt, LAYER_TYPE_TCP); - const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr; - - uint64_t state = (uint64_t)session_get0_ex_data(sess, tcp_builtin_ex); - if (tcp_hdr_has_flag_rst(hdr)) - { - if (curr_dir == SESSION_DIR_C2S) - { - state |= TCP_C2S_RST_RECVED; - session_set_ex_data(sess, tcp_builtin_ex, (void *)(state)); - } - else - { - state |= TCP_S2C_RST_RECVED; - session_set_ex_data(sess, tcp_builtin_ex, (void *)(state)); - } - } - - if (tcp_hdr_has_flag_fin(hdr)) - { - if (curr_dir == SESSION_DIR_C2S) - { - state |= TCP_C2S_FIN_RECVED; - session_set_ex_data(sess, tcp_builtin_ex, (void *)(state)); - } - else - { - state |= TCP_S2C_FIN_RECVED; - session_set_ex_data(sess, tcp_builtin_ex, (void *)(state)); - } - } - - if (tcp_hdr_has_flag_syn(hdr)) - { - if (tcp_hdr_has_flag_ack(hdr)) - { - state |= TCP_SYNACK_RECVED; - session_set_ex_data(sess, tcp_builtin_ex, (void *)(state)); - } - else - { - state |= TCP_SYN_RECVED; - session_set_ex_data(sess, tcp_builtin_ex, (void *)(state)); - } - } - - if (tcp_layer->pld_len > 0) - { - if (curr_dir == SESSION_DIR_C2S) - { - state |= TCP_C2S_PAYLOAD_RECVED; - session_set_ex_data(sess, tcp_builtin_ex, (void *)(state)); - } - else - { - state |= TCP_S2C_PAYLOAD_RECVED; - session_set_ex_data(sess, tcp_builtin_ex, (void *)(state)); - } - } -} - -static void update_udp_ex_data(struct session *sess, const struct packet *pkt, enum session_dir curr_dir) -{ - uint64_t state = (uint64_t)session_get0_ex_data(sess, udp_builtin_ex); - if (curr_dir == SESSION_DIR_C2S) - { - session_set_ex_data(sess, udp_builtin_ex, (void *)(state | UDP_C2S_RECVED)); - } - else - { - session_set_ex_data(sess, udp_builtin_ex, (void *)(state | UDP_S2C_RECVED)); - } + session_manager_update_session_state(mgr, sess, SESSION_STATE_CLOSING); + session_set_closing_reasion(sess, reasion); + session_timer_del_session(mgr->sess_timer, sess); } /****************************************************************************** * handle session ******************************************************************************/ -// return 0: success -// return -1: tcp not syn packet, discard -static int handle_tcp_new_session(struct session_manager *mgr, struct tuple6 *key, struct session *sess, const struct packet *pkt) +// OK +static void session_manager_free_session(struct session_manager *mgr, struct session *sess) +{ + if (sess) + { + session_manager_update_session_state(mgr, sess, SESSION_STATE_CLOSED); + for (uint8_t i = 0; i < EX_DATA_MAX_COUNT; i++) + { + session_free_ex_data(sess, i); + } + session_table_del_session(mgr->tcp_sess_table, session_get0_tuple6(sess)); + session_timer_del_session(mgr->sess_timer, sess); + session_set0_cur_pkt(sess, NULL); + session_set_cur_dir(sess, SESSION_DIR_NONE); + session_pool_free(mgr->sess_pool, sess); + sess = NULL; + } +} + +// OK +static void session_manager_recycle_session(struct session_manager *mgr) +{ + while (1) + { + struct session *sess = session_queue_pop(mgr->sess_toclosed_queue); + if (sess == NULL) + { + break; + } + session_manager_free_session(mgr, sess); + } +} + +// OK +static void session_manager_evicte_session(struct session_manager *mgr, struct session *sess) +{ + session_manager_update_session_on_closing(mgr, sess, CLOSING_BY_EVICTED); + + session_table_del_session(mgr->tcp_sess_table, session_get0_tuple6(sess)); + session_queue_push(mgr->sess_evicted_queue, sess); + + if (session_get_type(sess) == SESSION_TYPE_UDP) + { + eviction_filter_add(mgr->udp_eviction_filter, session_get0_1st_pkt(sess)); + } +} + +// OK +static struct session *session_manager_handle_tcp_new_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key) { const struct layer_record *tcp_layer = packet_get_innermost_layer(pkt, LAYER_TYPE_TCP); if (tcp_layer == NULL) { - // not tcp packet, discard - return -1; + mgr->npkts_miss_l4_proto++; + return NULL; } + const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr; if (!tcp_hdr_has_flag_syn(hdr)) { - // not syn packet, discard - return -1; + mgr->npkts_hit_tcp_evicted++; + return NULL; } - enum session_dir curr_dir = SESSION_DIR_NONE; - - session_init(sess); - // syn packet - if (!tcp_hdr_has_flag_ack(hdr)) + if (mgr->tcp_sess_num == mgr->config.max_tcp_session_num - 1) { - curr_dir = SESSION_DIR_C2S; - session_set_ex_data(sess, tcp_builtin_ex, (void *)TCP_SYN_RECVED); - } - // syn ack packet - else - { - curr_dir = SESSION_DIR_S2C; - session_set_ex_data(sess, tcp_builtin_ex, (void *)TCP_SYNACK_RECVED); + struct session *evicted_sess = session_table_find_least_recently_unused_session(mgr->tcp_sess_table); + assert(evicted_sess); + session_manager_evicte_session(mgr, evicted_sess); } - session_set_id(sess, alloc_session_id()); - session_set_tuple6(sess, key); - session_set_tuple6_dir(sess, curr_dir); + struct session *sess = session_pool_alloc(mgr->sess_pool); + assert(sess); - session_set_type(sess, SESSION_TYPE_TCP); - update_counter_on_opening(mgr, sess); - session_set_state(sess, SESSION_STATE_OPENING); - - session_set_create_time(sess, timestamp_get_msec()); - update_session_base(sess, pkt, curr_dir); - - session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_toclosing); - - return 0; -} - -// always return 0 -static int handle_udp_new_session(struct session_manager *mgr, struct tuple6 *key, struct session *sess, const struct packet *pkt) -{ - enum session_dir curr_dir = judge_direction_by_tuple6(key); - - session_init(sess); - update_udp_ex_data(sess, pkt, curr_dir); - session_set_id(sess, alloc_session_id()); - session_set_tuple6(sess, key); - session_set_tuple6_dir(sess, curr_dir); - - /* - * when a UDP Session is created, the Opening and active events are triggered, - * (the plugin is called twice by the opening/active events in turn), - * and the state of the UDP session is directly switched to the active state. - */ - session_set_type(sess, SESSION_TYPE_UDP); - update_counter_on_opening(mgr, sess); - session_set_state(sess, SESSION_STATE_OPENING); - update_counter_on_active(mgr, sess); - session_set_state(sess, SESSION_STATE_ACTIVE); - - session_set_create_time(sess, timestamp_get_msec()); - update_session_base(sess, pkt, curr_dir); - - session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_toclosing); - - return 0; -} - -static void handle_tcp_old_session(struct session_manager *mgr, struct tuple6 *key, struct session *sess, const struct packet *pkt) -{ - enum session_dir curr_dir = judge_direction_by_session(sess, key); - update_tcp_ex_data(sess, pkt, curr_dir); - update_session_base(sess, pkt, curr_dir); - - uint64_t state = (uint64_t)session_get0_ex_data(sess, tcp_builtin_ex); - if (tcp_need_closing(state)) + enum session_dir curr_dir = tcp_hdr_has_flag_ack(hdr) ? SESSION_DIR_S2C : SESSION_DIR_C2S; + session_manager_update_session_on_opening(mgr, sess, key, curr_dir); + session_manager_update_session_on_packet(mgr, sess, pkt, curr_dir); + session_update_tcp_exdata(sess, tcp_layer, curr_dir); + if (tcp_hdr_has_flag_ack(hdr)) { - update_counter_on_closing(mgr, sess); - session_set_state(sess, SESSION_STATE_CLOSING); - session_manager_update_session_timer(mgr, sess, session_to_closed, mgr->timeout_toclosed); - return; - } - - if (tcp_need_active(state)) - { - update_counter_on_active(mgr, sess); - session_set_state(sess, SESSION_STATE_ACTIVE); - session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_toclosing); - return; - } -} - -static void handle_udp_old_session(struct session_manager *mgr, struct tuple6 *key, struct session *sess, const struct packet *pkt) -{ - enum session_dir curr_dir = judge_direction_by_session(sess, key); - update_udp_ex_data(sess, pkt, curr_dir); - update_session_base(sess, pkt, curr_dir); - - update_counter_on_active(mgr, sess); - session_set_state(sess, SESSION_STATE_ACTIVE); - session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_toclosing); -} - -// return 0: success -// return -1: tcp not syn packet, discard -static int handle_new_session(struct session_manager *mgr, struct tuple6 *key, struct session *sess, const struct packet *pkt) -{ - if (key->ip_proto == IPPROTO_TCP) - { - return handle_tcp_new_session(mgr, key, sess, pkt); + session_set_expirecb(sess, tcp_handshake_timeout_cb, mgr, timestamp_get_sec() + mgr->config.tcp_timeout_handshake); } else { - return handle_udp_new_session(mgr, key, sess, pkt); + session_set_expirecb(sess, tcp_init_timeout_cb, mgr, timestamp_get_sec() + mgr->config.tcp_timeout_init); } + session_timer_add_session(mgr->sess_timer, sess); + dupkt_filter_add(mgr->tcp_dupkt_filter, pkt); + + return sess; } -static void handle_old_session(struct session_manager *mgr, struct tuple6 *key, struct session *sess, const struct packet *pkt) +// OK +static struct session *session_manager_handle_tcp_old_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key) { if (session_get_state(sess) == SESSION_STATE_CLOSING) { - return; + mgr->npkts_hit_tcp_closing++; + return NULL; } - if (key->ip_proto == IPPROTO_TCP) + const struct layer_record *tcp_layer = packet_get_innermost_layer(pkt, LAYER_TYPE_TCP); + if (tcp_layer == NULL) { - handle_tcp_old_session(mgr, key, sess, pkt); + mgr->npkts_miss_l4_proto++; + return NULL; } - else + + enum session_dir curr_dir = judge_direction_by_session(sess, key); + if (session_manager_filter_session_dupkt(mgr, sess, pkt, curr_dir)) { - handle_udp_old_session(mgr, key, sess, pkt); + mgr->npkts_hit_tcp_dupkt++; + session_set_dup_traffic_flag(sess, DUP_TRAFFIC_YES); + return NULL; } + + uint64_t tcp_old_exdata = (uint64_t)session_get0_ex_data(sess, tcp_builtin_ex); + session_manager_update_session_on_packet(mgr, sess, pkt, curr_dir); + session_update_tcp_exdata(sess, tcp_layer, curr_dir); + uint64_t tcp_curr_exdata = (uint64_t)session_get0_ex_data(sess, tcp_builtin_ex); + + // update tcp session state & timeout callback + + // TCP SYN retransmission + if (tcp_old_exdata == TCP_SYN_RECVED && tcp_curr_exdata == TCP_SYN_RECVED) + { + session_manager_update_session_state(mgr, sess, SESSION_STATE_OPENING); + session_set_expirecb(sess, tcp_init_timeout_cb, mgr, timestamp_get_sec() + mgr->config.tcp_timeout_init); + session_timer_add_session(mgr->sess_timer, sess); + + return sess; + } + + // TCP SYNACK retransmission & TCP S2C asymmetric + if (tcp_old_exdata == TCP_SYNACK_RECVED && tcp_curr_exdata == TCP_SYNACK_RECVED) + { + session_manager_update_session_state(mgr, sess, SESSION_STATE_OPENING); + session_set_expirecb(sess, tcp_handshake_timeout_cb, mgr, timestamp_get_sec() + mgr->config.tcp_timeout_handshake); + session_timer_add_session(mgr->sess_timer, sess); + + return sess; + } + + // TCP handshake success + if (!(tcp_old_exdata & TCP_SYNACK_RECVED) && (tcp_curr_exdata & TCP_SYNACK_RECVED)) + { + session_manager_update_session_state(mgr, sess, SESSION_STATE_OPENING); + session_set_expirecb(sess, tcp_handshake_timeout_cb, mgr, timestamp_get_sec() + mgr->config.tcp_timeout_handshake); + session_timer_add_session(mgr->sess_timer, sess); + + return sess; + } + + // TCP established + if ((tcp_curr_exdata & TCP_C2S_DATA_RECVED) || (tcp_curr_exdata & TCP_S2C_DATA_RECVED)) + { + session_manager_update_session_state(mgr, sess, SESSION_STATE_ACTIVE); + session_set_expirecb(sess, tcp_data_timeout_cb, mgr, timestamp_get_sec() + mgr->config.tcp_timeout_data); + session_timer_add_session(mgr->sess_timer, sess); + + return sess; + } + + // TCP closing + if ((tcp_curr_exdata & TCP_C2S_RST_RECVED) || (tcp_curr_exdata & TCP_S2C_RST_RECVED) || ((tcp_curr_exdata & TCP_C2S_FIN_RECVED) && (tcp_curr_exdata & TCP_S2C_FIN_RECVED))) + { + if (tcp_curr_exdata & TCP_C2S_RST_RECVED) + { + session_set_closing_reasion(sess, CLOSING_BY_CLIENT_RST); + } + else if (tcp_curr_exdata & TCP_S2C_RST_RECVED) + { + session_set_closing_reasion(sess, CLOSING_BY_SERVER_RST); + } + session_manager_update_session_state(mgr, sess, SESSION_STATE_CLOSING); + + return sess; + } + + // TCP half closed + if ((tcp_curr_exdata & TCP_C2S_FIN_RECVED) || (tcp_curr_exdata & TCP_S2C_FIN_RECVED)) + { + if (tcp_curr_exdata & TCP_C2S_FIN_RECVED) + { + session_set_closing_reasion(sess, CLOSING_BY_CLIENT_FIN); + } + else if (tcp_curr_exdata & TCP_S2C_FIN_RECVED) + { + session_set_closing_reasion(sess, CLOSING_BY_SERVER_FIN); + } + // session still is active + session_manager_update_session_state(mgr, sess, SESSION_STATE_ACTIVE); + session_set_expirecb(sess, tcp_half_closed_timeout_cb, mgr, timestamp_get_sec() + mgr->config.tcp_timeout_half_closed); + session_timer_add_session(mgr->sess_timer, sess); + + return sess; + } + + assert(0); + session_set_expirecb(sess, tcp_data_timeout_cb, mgr, timestamp_get_sec() + mgr->config.tcp_timeout_data); + session_timer_add_session(mgr->sess_timer, sess); + + return sess; +} + +// OK +static struct session *session_manager_handle_udp_new_session(struct session_manager *mgr, const struct packet *pkt, const struct tuple6 *key) +{ + if (eviction_filter_lookup(mgr->udp_eviction_filter, pkt)) + { + mgr->npkts_hit_udp_evicted++; + return NULL; + } + + if (mgr->udp_sess_num == mgr->config.max_udp_session_num - 1) + { + struct session *evicted_sess = session_table_find_least_recently_unused_session(mgr->udp_sess_table); + assert(evicted_sess); + session_manager_evicte_session(mgr, evicted_sess); + } + + struct session *sess = session_pool_alloc(mgr->sess_pool); + assert(sess); + + enum session_dir curr_dir = judge_direction_by_tuple6(key); + session_manager_update_session_on_opening(mgr, sess, key, curr_dir); + session_manager_update_session_on_packet(mgr, sess, pkt, curr_dir); + session_manager_update_session_state(mgr, sess, SESSION_STATE_ACTIVE); // change session state to active + session_update_udp_exdata(sess, NULL, curr_dir); + session_set_expirecb(sess, udp_data_timeout_cb, mgr, timestamp_get_sec() + mgr->config.udp_timeout_data); + session_timer_add_session(mgr->sess_timer, sess); + + return sess; +} + +// OK +static struct session *session_manager_handle_udp_old_session(struct session_manager *mgr, struct session *sess, const struct packet *pkt, const struct tuple6 *key) +{ + if (session_get_state(sess) == SESSION_STATE_CLOSING) + { + mgr->npkts_hit_udp_evicted++; + return NULL; + } + + enum session_dir curr_dir = judge_direction_by_session(sess, key); + session_manager_update_session_on_packet(mgr, sess, pkt, curr_dir); + session_manager_update_session_state(mgr, sess, SESSION_STATE_ACTIVE); + session_update_udp_exdata(sess, NULL, curr_dir); + session_set_expirecb(sess, udp_data_timeout_cb, mgr, timestamp_get_sec() + mgr->config.udp_timeout_data); + session_timer_add_session(mgr->sess_timer, sess); + + return sess; } /****************************************************************************** * public API ******************************************************************************/ -struct session_manager *session_manager_create(uint64_t max_session_num) +// OK +struct session_manager *session_manager_create(struct session_manager_config *config) { + if (session_manager_check_config(config)) + { + return NULL; + } + struct session_manager *mgr = (struct session_manager *)calloc(1, sizeof(struct session_manager)); if (mgr == NULL) { return NULL; } - mgr->sess_pool = session_pool_create(max_session_num); + memcpy(&mgr->config, config, sizeof(struct session_manager_config)); + mgr->sess_pool = session_pool_create(mgr->config.max_tcp_session_num + mgr->config.max_udp_session_num); if (mgr->sess_pool == NULL) { goto error; } - mgr->sess_table = session_table_create(); - if (mgr->sess_table == NULL) + mgr->tcp_sess_table = session_table_create(); + if (mgr->tcp_sess_table == NULL) + { + goto error; + } + + mgr->udp_sess_table = session_table_create(); + if (mgr->udp_sess_table == NULL) { goto error; } @@ -548,29 +907,37 @@ struct session_manager *session_manager_create(uint64_t max_session_num) goto error; } - mgr->evicted_sess = session_queue_create(); - if (mgr->evicted_sess == NULL) + mgr->sess_evicted_queue = session_queue_create(); + if (mgr->sess_evicted_queue == NULL) { goto error; } - mgr->timeout_toclosed = 2 * 1000; - mgr->timeout_toclosing = 5 * 1000; + mgr->sess_toclosed_queue = session_queue_create(); + if (mgr->sess_toclosed_queue == NULL) + { + goto error; + } - mgr->tcp_opening_sess_num = 0; - mgr->tcp_closing_sess_num = 0; - mgr->tcp_active_sess_num = 0; + mgr->tcp_dupkt_filter = dupkt_filter_create(mgr->config.tcp_dupkt_filter_enable, mgr->config.tcp_dupkt_filter_capacity, mgr->config.tcp_dupkt_filter_error_rate, mgr->config.tcp_dupkt_filter_timeout); + if (mgr->tcp_dupkt_filter == NULL) + { + goto error; + } - mgr->udp_opening_sess_num = 0; - mgr->udp_closing_sess_num = 0; - mgr->udp_active_sess_num = 0; + mgr->udp_eviction_filter = eviction_filter_create(mgr->config.udp_eviction_filter_enable, mgr->config.udp_eviction_filter_capacity, mgr->config.udp_eviction_filter_error_rate, mgr->config.udp_eviction_filter_timeout); + if (mgr->udp_eviction_filter == NULL) + { + goto error; + } + // init builtin session ex data index tcp_builtin_ex = session_get_ex_new_index("tcp_builtin_ex", NULL, NULL); udp_builtin_ex = session_get_ex_new_index("udp_builtin_ex", NULL, NULL); - c2s_1st_md_ex = session_get_ex_new_index("c2s_1st_md_ex", metadata_ex_free_cb, NULL); - s2c_1st_md_ex = session_get_ex_new_index("s2c_1st_md_ex", metadata_ex_free_cb, NULL); - c2s_1st_pkt_ex = session_get_ex_new_index("c2s_1st_pkt_ex", packet_ex_free_cb, NULL); - s2c_1st_pkt_ex = session_get_ex_new_index("s2c_1st_pkt_ex", packet_ex_free_cb, NULL); + c2s_1st_md_ex = session_get_ex_new_index("c2s_1st_md_ex", session_free_pktmeta_exdata, NULL); + s2c_1st_md_ex = session_get_ex_new_index("s2c_1st_md_ex", session_free_pktmeta_exdata, NULL); + c2s_1st_pkt_ex = session_get_ex_new_index("c2s_1st_pkt_ex", session_free_packet_exdata, NULL); + s2c_1st_pkt_ex = session_get_ex_new_index("s2c_1st_pkt_ex", session_free_packet_exdata, NULL); return mgr; @@ -579,111 +946,143 @@ error: return NULL; } +// OK void session_manager_destroy(struct session_manager *mgr) { if (mgr) { - session_queue_destroy(mgr->evicted_sess); + eviction_filter_destroy(mgr->udp_eviction_filter); + dupkt_filter_destroy(mgr->tcp_dupkt_filter); + session_queue_destroy(mgr->sess_toclosed_queue); + session_queue_destroy(mgr->sess_evicted_queue); session_timer_destroy(mgr->sess_timer); - session_table_destroy(mgr->sess_table); + session_table_destroy(mgr->udp_sess_table); + session_table_destroy(mgr->tcp_sess_table); session_pool_destroy(mgr->sess_pool); free(mgr); mgr = NULL; } } -void session_manager_set_timeout_toclosing(struct session_manager *mgr, uint64_t timeout_ms) -{ - mgr->timeout_toclosing = timeout_ms; -} - -void session_manager_set_timeout_toclosed(struct session_manager *mgr, uint64_t timeout_ms) -{ - mgr->timeout_toclosed = timeout_ms; -} - -struct session *session_manager_lookup(struct session_manager *mgr, const struct packet *pkt) +// OK +// Only use the packet six-tuple to find the session, not update it +struct session *session_manager_lookup_sesssion(struct session_manager *mgr, const struct packet *pkt) { struct tuple6 key; - if (packet_get_innermost_tuple6(pkt, &key) == -1) + memset(&key, 0, sizeof(struct tuple6)); + if (packet_get_innermost_tuple6(pkt, &key)) { return NULL; } - return session_table_find_session(mgr->sess_table, &key); -} - -// return null: invalid tuple6 or tcp first packet is not syn -struct session *session_manager_update(struct session_manager *mgr, const struct packet *pkt) -{ - struct tuple6 key; - if (packet_get_innermost_tuple6(pkt, &key) == -1) + if (key.ip_proto == IPPROTO_UDP) + { + return session_table_find_session(mgr->udp_sess_table, &key); + } + else if (key.ip_proto == IPPROTO_TCP) + { + return session_table_find_session(mgr->tcp_sess_table, &key); + } + else { return NULL; } +} - struct session *sess = session_table_find_session(mgr->sess_table, &key); - if (sess == NULL) +// OK +/* + * Return NULL in the following cases: + * 1.not a TCP or UDP packet + * 2.TCP packet miss session but no syn packet seen + * 3.TCP duplicate packet + * 4.TCP discards packets + * 5.UDP evict packet + * pakcet will not update the session and needs to be fast forwarded + */ +struct session *session_manager_update_session(struct session_manager *mgr, const struct packet *pkt) +{ + assert(session_manager_get_evicted_session(mgr) == NULL); + + session_manager_recycle_session(mgr); + + struct tuple6 key; + memset(&key, 0, sizeof(struct tuple6)); + if (packet_get_innermost_tuple6(pkt, &key)) { - if (session_pool_get_count(mgr->sess_pool) == 1) + mgr->npkts_miss_l4_proto++; + return NULL; + } + + struct session *sess = NULL; + if (key.ip_proto == IPPROTO_UDP) + { + sess = session_table_find_session(mgr->udp_sess_table, &key); + if (sess) { - struct session *unused_sess = session_table_find_least_recently_unused_session(mgr->sess_table); - assert(unused_sess); - - update_counter_on_closing(mgr, unused_sess); - session_set_state(unused_sess, SESSION_STATE_CLOSING); - session_queue_push(mgr->evicted_sess, unused_sess); - session_manager_update_session_timer(mgr, unused_sess, session_to_closed, mgr->timeout_toclosed); - } - - sess = session_pool_alloc(mgr->sess_pool); - assert(sess != NULL); - - // return 0: success - // return -1: tcp not syn packet, discard - if (handle_new_session(mgr, &key, sess, pkt) == 0) - { - session_table_add_session(mgr->sess_table, &key, sess); + return session_manager_handle_udp_old_session(mgr, sess, pkt, &key); } else { - session_pool_free(mgr->sess_pool, sess); - return NULL; + return session_manager_handle_udp_new_session(mgr, pkt, &key); + } + } + else if (key.ip_proto == IPPROTO_TCP) + { + sess = session_table_find_session(mgr->tcp_sess_table, &key); + if (sess) + { + return session_manager_handle_tcp_old_session(mgr, sess, pkt, &key); + } + else + { + return session_manager_handle_tcp_new_session(mgr, pkt, &key); } } else { - handle_old_session(mgr, &key, sess, pkt); + return NULL; } - - return sess; } -struct session *session_manager_expire(struct session_manager *mgr) +// OK +struct session *session_manager_get_expired_session(struct session_manager *mgr) { + session_manager_recycle_session(mgr); + SESSION_LOG_DEBUG("current timestamp: %lu s", timestamp_get_sec()); struct session *sess = session_timer_expire_session(mgr->sess_timer, timestamp_get_msec()); if (sess == NULL) { - return NULL; - } - - session_run_expirecb(sess); - - if (session_get_state(sess) == SESSION_STATE_CLOSED) - { - return NULL; + session_run_expirecb(sess); + session_queue_push(mgr->sess_toclosed_queue, sess); } return sess; } -struct session *session_manager_evicte(struct session_manager *mgr) +// OK +struct session *session_manager_get_evicted_session(struct session_manager *mgr) { - return session_queue_pop(mgr->evicted_sess); + session_manager_recycle_session(mgr); + + struct session *sess = session_queue_pop(mgr->sess_evicted_queue); + if (sess) + { + session_queue_push(mgr->sess_toclosed_queue, sess); + } + + return sess; } -uint64_t session_manager_get_sessions(struct session_manager *mgr, enum session_type type, enum session_state state) +// OK +// return interval (seconds) to next required update, return 0 if no session +uint64_t session_manager_get_expire_interval(struct session_manager *mgr) +{ + return session_timer_next_expire_interval(mgr->sess_timer); +} + +// OK +uint64_t session_manager_get_session_number(struct session_manager *mgr, enum session_type type, enum session_state state) { if (type == SESSION_TYPE_TCP) { @@ -699,8 +1098,7 @@ uint64_t session_manager_get_sessions(struct session_manager *mgr, enum session_ return 0; } } - - if (type == SESSION_TYPE_UDP) + else if (type == SESSION_TYPE_UDP) { switch (state) { diff --git a/src/session/session_manager.h b/src/session/session_manager.h index eadb5a0..8e762ae 100644 --- a/src/session/session_manager.h +++ b/src/session/session_manager.h @@ -8,32 +8,65 @@ extern "C" #include "session.h" -// #define SESSION_LOG_ERROR(format, ...) void(0) #ifndef SESSION_LOG_ERROR #define SESSION_LOG_ERROR(format, ...) \ fprintf(stderr, "ERROR (session), " format "\n", ##__VA_ARGS__); #endif -// #define SESSION_LOG_DEBUG(format, ...) void(0) #ifndef SESSION_LOG_DEBUG #define SESSION_LOG_DEBUG(format, ...) \ fprintf(stderr, "DEBUG (session), " format "\n", ##__VA_ARGS__); #endif +struct session_manager_config +{ + // max session number + uint64_t max_tcp_session_num; + uint64_t max_udp_session_num; + + // TCP timeout + uint64_t tcp_timeout_init; // seconds, Range: 1-60 + uint64_t tcp_timeout_handshake; // seconds, Range: 1-60 + uint64_t tcp_timeout_data; // seconds, Range: 1-15,999,999 + uint64_t tcp_timeout_half_closed; // seconds, Range: 1-604,800 + uint64_t tcp_timeout_discard; // seconds, Range: 1-15,999,999 + + // UDP timeout + uint64_t udp_timeout_data; // seconds, Range: 1-15,999,999 + + // TCP duplicate packet filter + uint8_t tcp_dupkt_filter_enable; + uint64_t tcp_dupkt_filter_capacity; + uint64_t tcp_dupkt_filter_timeout; // seconds, Range: 1-60 + double tcp_dupkt_filter_error_rate; + + // UDP eviction filter + uint8_t udp_eviction_filter_enable; + uint64_t udp_eviction_filter_capacity; + uint64_t udp_eviction_filter_timeout; // seconds, Range: 1-60 + double udp_eviction_filter_error_rate; +}; + struct session_manager; -struct session_manager *session_manager_create(uint64_t max_session_num); +struct session_manager *session_manager_create(struct session_manager_config *config); void session_manager_destroy(struct session_manager *mgr); -void session_manager_set_timeout_toclosing(struct session_manager *mgr, uint64_t timeout_ms); -void session_manager_set_timeout_toclosed(struct session_manager *mgr, uint64_t timeout_ms); - -struct session *session_manager_lookup(struct session_manager *mgr, const struct packet *pkt); -// return null: invalid tuple6 or tcp first packet is not syn -struct session *session_manager_update(struct session_manager *mgr, const struct packet *pkt); -struct session *session_manager_expire(struct session_manager *mgr); -struct session *session_manager_evicte(struct session_manager *mgr); - -// for debug -uint64_t session_manager_get_sessions(struct session_manager *mgr, enum session_type type, enum session_state state); +// only use the packet six-tuple to find the session, not update it +struct session *session_manager_lookup_sesssion(struct session_manager *mgr, const struct packet *pkt); +/* + * Return NULL in the following cases: + * 1.not a TCP or UDP packet + * 2.TCP packet miss session but no syn packet seen + * 3.TCP duplicate packet + * 4.TCP discards packets + * 5.UDP evict packet + * pakcet will not update the session and needs to be fast forwarded + */ +struct session *session_manager_update_session(struct session_manager *mgr, const struct packet *pkt); +struct session *session_manager_get_expired_session(struct session_manager *mgr); +struct session *session_manager_get_evicted_session(struct session_manager *mgr); +// return interval (seconds) to next required update, return 0 if no session +uint64_t session_manager_get_expire_interval(struct session_manager *mgr); +uint64_t session_manager_get_session_number(struct session_manager *mgr, enum session_type type, enum session_state state); #ifdef __cpluscplus } diff --git a/src/session/session_private.h b/src/session/session_private.h index 18460a4..4e3dbd6 100644 --- a/src/session/session_private.h +++ b/src/session/session_private.h @@ -22,8 +22,8 @@ enum tcp_ex_data TCP_SYN_RECVED = 1 << 0, TCP_SYNACK_RECVED = 1 << 1, // ESTABLISHED - TCP_C2S_PAYLOAD_RECVED = 1 << 2, - TCP_S2C_PAYLOAD_RECVED = 1 << 3, + TCP_C2S_DATA_RECVED = 1 << 2, + TCP_S2C_DATA_RECVED = 1 << 3, // FIN TCP_C2S_FIN_RECVED = 1 << 4, TCP_S2C_FIN_RECVED = 1 << 5, @@ -49,6 +49,9 @@ struct session // session type enum session_type type; + // dup traffic flag + enum dup_traffic_flag dup_flag; + // closing reasion enum closing_reasion closing_reasion; diff --git a/src/session/session_table.cpp b/src/session/session_table.cpp index 48b72b4..2c47e33 100644 --- a/src/session/session_table.cpp +++ b/src/session/session_table.cpp @@ -172,7 +172,7 @@ int session_table_add_session(struct session_table *table, const struct tuple6 * return 0; } -void session_table_delete_session(struct session_table *table, const struct tuple6 *tuple) +void session_table_del_session(struct session_table *table, const struct tuple6 *tuple) { if (table == NULL) { diff --git a/src/session/session_table.h b/src/session/session_table.h index 4dd2900..d8e3b58 100644 --- a/src/session/session_table.h +++ b/src/session/session_table.h @@ -18,7 +18,7 @@ void session_table_set_freecb(struct session_table *table, session_free_cb free_ // return 0: success // return -1: failed int session_table_add_session(struct session_table *table, const struct tuple6 *tuple, struct session *sess); -void session_table_delete_session(struct session_table *table, const struct tuple6 *tuple); +void session_table_del_session(struct session_table *table, const struct tuple6 *tuple); struct session *session_table_find_session(struct session_table *table, const struct tuple6 *tuple); struct session *session_table_find_least_recently_unused_session(struct session_table *table); struct session *session_table_find_least_recently_used_session(struct session_table *table); diff --git a/src/session/session_timer.cpp b/src/session/session_timer.cpp index 1bb7f23..ae1c01a 100644 --- a/src/session/session_timer.cpp +++ b/src/session/session_timer.cpp @@ -67,3 +67,9 @@ struct session *session_timer_expire_session(struct session_timer *timer, uint64 struct session *sess = (struct session *)timeout->callback.arg; return sess; } + +// return interval to next required update, return 0 if no timer +uint64_t session_timer_next_expire_interval(struct session_timer *timer) +{ + return timeouts_timeout(timer->timeouts); +} \ No newline at end of file diff --git a/src/session/session_timer.h b/src/session/session_timer.h index 93690c5..21ffd10 100644 --- a/src/session/session_timer.h +++ b/src/session/session_timer.h @@ -18,6 +18,8 @@ void session_timer_del_session(struct session_timer *timer, struct session *sess * if return session, the session will be removed from timer. */ struct session *session_timer_expire_session(struct session_timer *timer, uint64_t abs_current_ts); +// return interval to next required update, return 0 if no timer +uint64_t session_timer_next_expire_interval(struct session_timer *timer); #ifdef __cpluscplus } diff --git a/src/session/test/gtest_session_manager.cpp b/src/session/test/gtest_session_manager.cpp index 207e7bb..24d7ae6 100644 --- a/src/session/test/gtest_session_manager.cpp +++ b/src/session/test/gtest_session_manager.cpp @@ -454,7 +454,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_C2S_PAYLOAD) EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess)); EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt); EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_C2S); - EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_C2S_PAYLOAD_RECVED)); + EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_C2S_DATA_RECVED)); // check session manager info EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0); @@ -582,7 +582,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_S2C_PAYLOAD) EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess)); EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt); EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_S2C); - EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_S2C_PAYLOAD_RECVED)); + EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_S2C_DATA_RECVED)); // check session manager info EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0); @@ -708,7 +708,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYNACK_AND_C2S_PAYLOAD) EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess)); EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt); EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_C2S); - EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED)); + EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED)); // check session manager info EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0); @@ -835,7 +835,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYNACK_AND_S2C_PAYLOAD) EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess)); EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt); EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_S2C); - EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYNACK_RECVED | TCP_S2C_PAYLOAD_RECVED)); + EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYNACK_RECVED | TCP_S2C_DATA_RECVED)); // check session manager info EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0); @@ -1043,7 +1043,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_SYNACK_AND_ACK_AND_C2S_PAYLOA EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess)); EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt); EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_C2S); - EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED)); + EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED)); // check session manager info EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0); @@ -1346,7 +1346,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM) EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess)); EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt); EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_C2S); - EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED)); + EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED)); // check session manager info EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0); @@ -1384,7 +1384,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM) EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess)); EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt); EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_S2C); - EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED)); + EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED)); // check session manager info EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0); @@ -1422,7 +1422,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM) EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess)); EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt); EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_S2C); - EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED | TCP_S2C_PAYLOAD_RECVED)); + EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED | TCP_S2C_DATA_RECVED)); // check session manager info EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0); @@ -1460,7 +1460,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM) EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess)); EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt); EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_S2C); - EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED | TCP_S2C_PAYLOAD_RECVED)); + EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED | TCP_S2C_DATA_RECVED)); // check session manager info EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0); @@ -1498,7 +1498,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM) EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess)); EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt); EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_C2S); - EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED | TCP_S2C_PAYLOAD_RECVED)); + EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED | TCP_S2C_DATA_RECVED)); // check session manager info EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0); @@ -1536,7 +1536,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM) EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess)); EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt); EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_C2S); - EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED | TCP_S2C_PAYLOAD_RECVED | TCP_C2S_FIN_RECVED)); + EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED | TCP_S2C_DATA_RECVED | TCP_C2S_FIN_RECVED)); // check session manager info EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0); @@ -1574,7 +1574,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM) EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess)); EXPECT_TRUE(session_get0_cur_pkt(sess) == &pkt); EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_S2C); - EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED | TCP_S2C_PAYLOAD_RECVED | TCP_C2S_FIN_RECVED | TCP_S2C_FIN_RECVED)); + EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED | TCP_S2C_DATA_RECVED | TCP_C2S_FIN_RECVED | TCP_S2C_FIN_RECVED)); // check session manager info EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0); @@ -1612,7 +1612,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM) EXPECT_TRUE(session_get_create_time(sess) == session_get_last_time(sess)); EXPECT_TRUE(session_get0_cur_pkt(sess) == NULL); EXPECT_TRUE(session_get_cur_dir(sess) == SESSION_DIR_NONE); - EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_PAYLOAD_RECVED | TCP_S2C_PAYLOAD_RECVED | TCP_C2S_FIN_RECVED | TCP_S2C_FIN_RECVED)); + EXPECT_TRUE((uint64_t)session_get0_ex_data(sess, tcp_builtin_ex) == (TCP_SYN_RECVED | TCP_SYNACK_RECVED | TCP_C2S_DATA_RECVED | TCP_S2C_DATA_RECVED | TCP_C2S_FIN_RECVED | TCP_S2C_FIN_RECVED)); // check session manager info EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_TCP, SESSION_STATE_OPENING) == 0); diff --git a/src/session/test/gtest_session_table.cpp b/src/session/test/gtest_session_table.cpp index 126077a..bd6c709 100644 --- a/src/session/test/gtest_session_table.cpp +++ b/src/session/test/gtest_session_table.cpp @@ -106,17 +106,17 @@ TEST(SESSION_TABLE, OP_SESSION) EXPECT_TRUE(session_table_find_session(sess_table, &reversed_tuple_3) == sess3); // Delete - session_table_delete_session(sess_table, &tuple_1); + session_table_del_session(sess_table, &tuple_1); EXPECT_TRUE(session_table_get_count(sess_table) == 2); EXPECT_TRUE(session_table_find_session(sess_table, &tuple_1) == NULL); EXPECT_TRUE(session_table_find_session(sess_table, &reversed_tuple_1) == NULL); - session_table_delete_session(sess_table, &reversed_tuple_2); + session_table_del_session(sess_table, &reversed_tuple_2); EXPECT_TRUE(session_table_get_count(sess_table) == 1); EXPECT_TRUE(session_table_find_session(sess_table, &tuple_2) == NULL); EXPECT_TRUE(session_table_find_session(sess_table, &reversed_tuple_2) == NULL); - session_table_delete_session(sess_table, &tuple_3); + session_table_del_session(sess_table, &tuple_3); EXPECT_TRUE(session_table_get_count(sess_table) == 0); EXPECT_TRUE(session_table_find_session(sess_table, &tuple_3) == NULL); EXPECT_TRUE(session_table_find_session(sess_table, &reversed_tuple_3) == NULL); @@ -176,15 +176,15 @@ TEST(SESSION_TABLE, FIND_OLDEST_NEWEST) // Delete Session - session_table_delete_session(sess_table, &tuple_1); + session_table_del_session(sess_table, &tuple_1); EXPECT_TRUE(session_table_find_least_recently_unused_session(sess_table) == sess2); EXPECT_TRUE(session_table_find_least_recently_used_session(sess_table) == sess3); - session_table_delete_session(sess_table, &tuple_2); + session_table_del_session(sess_table, &tuple_2); EXPECT_TRUE(session_table_find_least_recently_unused_session(sess_table) == sess3); EXPECT_TRUE(session_table_find_least_recently_used_session(sess_table) == sess3); - session_table_delete_session(sess_table, &tuple_3); + session_table_del_session(sess_table, &tuple_3); EXPECT_TRUE(session_table_find_least_recently_unused_session(sess_table) == NULL); EXPECT_TRUE(session_table_find_least_recently_used_session(sess_table) == NULL); diff --git a/src/stellar/CMakeLists.txt b/src/stellar/CMakeLists.txt index ee9495d..88c7ade 100644 --- a/src/stellar/CMakeLists.txt +++ b/src/stellar/CMakeLists.txt @@ -1,7 +1,4 @@ add_executable(stellar stellar.cpp) -target_link_libraries(stellar session_manager) -target_link_libraries(stellar dupkt_filter) -target_link_libraries(stellar eviction_filter) -target_link_libraries(stellar pthread) +target_link_libraries(stellar session_manager pthread) install(TARGETS stellar RUNTIME DESTINATION bin COMPONENT Program) \ No newline at end of file diff --git a/src/stellar/stellar.cpp b/src/stellar/stellar.cpp index 9fd10c1..86d209c 100644 --- a/src/stellar/stellar.cpp +++ b/src/stellar/stellar.cpp @@ -10,8 +10,6 @@ #include "packet.h" #include "timestamp.h" #include "session_manager.h" -#include "dupkt_filter.h" -#include "eviction_filter.h" #ifndef STELLAR_LOG_ERROR #define STELLAR_LOG_ERROR(format, ...) \ @@ -32,8 +30,6 @@ struct thread_ctx uint64_t need_exit; uint64_t is_runing; struct session_manager *sess_mgr; - struct dupkt_filter *dupkt_filter; - struct eviction_filter *eviction_filter; }; struct stellar_ctx @@ -41,48 +37,16 @@ struct stellar_ctx uint64_t need_exit; uint16_t max_worker_num; - // session manager - uint64_t sess_mgr_max_session_num; - uint64_t sess_mgr_timeout_ms_toclsoing; - uint64_t sess_mgr_timeout_ms_toclosed; + struct session_manager_config sess_mgr_cfg; - // duplicated packet filter - uint8_t dupkt_filter_enable; - unsigned int dupkt_filter_capacity; - double dupkt_filter_error_rate; - int dupkt_filter_timeout_s; - - // eviction filter - uint8_t eviction_filter_enable; - unsigned int eviction_filter_capacity; - double eviction_filter_error_rate; - int eviction_filter_timeout_s; - - // thread struct thread_ctx thread_ctx[128]; } g_stellar_ctx = { .need_exit = 0, .max_worker_num = 1, - // session manager - .sess_mgr_max_session_num = 1000000, - .sess_mgr_timeout_ms_toclsoing = 1000, - .sess_mgr_timeout_ms_toclosed = 10000, - - // duplicated packet filter - .dupkt_filter_enable = 1, - .dupkt_filter_capacity = 1000000, - .dupkt_filter_error_rate = 0.0001, - .dupkt_filter_timeout_s = 10, - - // eviction filter - .eviction_filter_enable = 1, - .eviction_filter_capacity = 1000000, - .eviction_filter_error_rate = 0.0001, - .eviction_filter_timeout_s = 10, + // TODO session manager config }; -// TODO static int recv_packet(const char **data) { static unsigned char packet_data[] = { @@ -96,6 +60,10 @@ static int recv_packet(const char **data) return sizeof(packet_data); } +static void send_packet(const char *data, uint16_t len) +{ +} + static void signal_handler(int signo) { if (signo == SIGINT) @@ -119,11 +87,6 @@ static void signal_handler(int signo) static void __session_dispatch(struct session *sess) { - if (sess == NULL) - { - return; - } - printf("\n"); printf("=> session dispatch: %p\n", sess); session_dump(sess); @@ -141,17 +104,9 @@ static void thread_ctx_init(struct stellar_ctx *ctx) thd_ctx->index = i; thd_ctx->need_exit = 0; thd_ctx->is_runing = 0; - // session manager - thd_ctx->sess_mgr = session_manager_create(ctx->sess_mgr_max_session_num); + + thd_ctx->sess_mgr = session_manager_create(&ctx->sess_mgr_cfg); assert(thd_ctx->sess_mgr != NULL); - session_manager_set_timeout_toclosing(thd_ctx->sess_mgr, ctx->sess_mgr_timeout_ms_toclsoing); - session_manager_set_timeout_toclosed(thd_ctx->sess_mgr, ctx->sess_mgr_timeout_ms_toclosed); - // duplicated packet filter - thd_ctx->dupkt_filter = dupkt_filter_create(ctx->dupkt_filter_enable, ctx->dupkt_filter_capacity, ctx->dupkt_filter_error_rate, ctx->dupkt_filter_timeout_s); - assert(thd_ctx->dupkt_filter != NULL); - // eviction filter - thd_ctx->eviction_filter = eviction_filter_create(ctx->eviction_filter_enable, ctx->eviction_filter_capacity, ctx->eviction_filter_error_rate, ctx->eviction_filter_timeout_s); - assert(thd_ctx->eviction_filter != NULL); } } @@ -175,7 +130,6 @@ static void *thread_cycle(void *arg) struct session *sess = NULL; struct thread_ctx *thd_ctx = (struct thread_ctx *)arg; struct session_manager *sess_mgr = thd_ctx->sess_mgr; - struct dupkt_filter *dupkt_filter = thd_ctx->dupkt_filter; char thread_name[16]; ATOMIC_SET(&thd_ctx->is_runing, 1); @@ -185,60 +139,38 @@ static void *thread_cycle(void *arg) while (ATOMIC_READ(&thd_ctx->need_exit) == 0) { - // TODO recv packet len = recv_packet(&data); + if (data == NULL) + { + goto poll_wait; + } - // parse packet packet_parse(&pkt, data, len); - - // duplicated packet filter - if (dupkt_filter_lookup(dupkt_filter, &pkt) == 0) - { - STELLAR_LOG_DEBUG("duplicated packet, forward it"); - goto fast_forward; - } - - // eviction filter - if (eviction_filter_lookup(thd_ctx->eviction_filter, &pkt) == 0) - { - STELLAR_LOG_DEBUG("eviction packet, forward it"); - goto fast_forward; - } - - // update session - sess = session_manager_update(sess_mgr, &pkt); + sess = session_manager_update_session(sess_mgr, &pkt); if (sess == NULL) { goto fast_forward; } - - // TODO session synchronization - __session_dispatch(sess); - dupkt_filter_add(dupkt_filter, &pkt); + + sess = session_manager_get_evicted_session(sess_mgr); + if (sess) + { + __session_dispatch(sess); + } fast_forward: - // TODO send packet + send_packet(data, len); - // dispatch expire session - sess = session_manager_expire(sess_mgr); - if (sess && session_get_type(sess) == SESSION_TYPE_UDP && session_get_state(sess) == SESSION_STATE_CLOSING) + sess = session_manager_get_expired_session(sess_mgr); + if (sess) { - const struct packet *sess_1st_pkt = session_get0_1st_pkt(sess); - eviction_filter_add(thd_ctx->eviction_filter, sess_1st_pkt); + __session_dispatch(sess); } - __session_dispatch(sess); + continue; - sess = session_manager_evicte(sess_mgr); - if (sess && session_get_type(sess) == SESSION_TYPE_UDP) - { - const struct packet *sess_1st_pkt = session_get0_1st_pkt(sess); - eviction_filter_add(thd_ctx->eviction_filter, sess_1st_pkt); - } - __session_dispatch(sess); - - // TODO get next timeout - sleep(1); + poll_wait: + sleep(session_manager_get_expire_interval(sess_mgr)); } ATOMIC_SET(&thd_ctx->is_runing, 0); @@ -255,18 +187,14 @@ int main(int argc, char **argv) // TODO init plugin - // register signal handler signal(SIGINT, signal_handler); signal(SIGQUIT, signal_handler); signal(SIGTERM, signal_handler); - // update timestamp timestamp_update(); - // init thread context thread_ctx_init(&g_stellar_ctx); - // create worker thread for (uint16_t i = 0; i < g_stellar_ctx.max_worker_num; i++) { struct thread_ctx *thd_ctx = &g_stellar_ctx.thread_ctx[i]; @@ -277,14 +205,12 @@ int main(int argc, char **argv) } } - // master thread update timestamp while (!g_stellar_ctx.need_exit) { timestamp_update(); sleep(1); } - // wait worker thread exit for (uint16_t i = 0; i < g_stellar_ctx.max_worker_num; i++) { struct thread_ctx *thd_ctx = &g_stellar_ctx.thread_ctx[i]; diff --git a/src/timestamp/CMakeLists.txt b/src/timestamp/CMakeLists.txt index 7886c77..41dd2bf 100644 --- a/src/timestamp/CMakeLists.txt +++ b/src/timestamp/CMakeLists.txt @@ -3,7 +3,7 @@ ############################################################################### add_library(timestamp timestamp.cpp) -target_include_directories(timestamp PUBLIC ${CMAKE_SOURCE_DIR}/src/timestamp) +target_include_directories(timestamp PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_link_libraries(timestamp) add_subdirectory(test) \ No newline at end of file diff --git a/src/tuple/CMakeLists.txt b/src/tuple/CMakeLists.txt index 0f7401b..77b5a2f 100644 --- a/src/tuple/CMakeLists.txt +++ b/src/tuple/CMakeLists.txt @@ -3,7 +3,7 @@ ############################################################################### add_library(tuple tuple.cpp) -target_include_directories(tuple PUBLIC ${CMAKE_SOURCE_DIR}/src/tuple) +target_include_directories(tuple PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_include_directories(tuple PUBLIC ${CMAKE_SOURCE_DIR}/src/crc32) target_link_libraries(tuple)