From c2e9275a9e56f7d0567fc2512d5cab67932d39da Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Mon, 18 Nov 2024 18:33:19 +0800 Subject: [PATCH] remove session_manager_subscribe_xxx() API; convert session closing events to pseudo packets when a session is closed, use the packet manager to create a pseudo packet, set the session to be closed as packet Exdata, and schedule it to the packet forwarding stage. when the pseudo packet free, the session will be free. --- CMakeLists.txt | 1 + conf/stellar.toml | 1 + decoders/CMakeLists.txt | 2 +- include/stellar/packet.h | 2 + include/stellar/session.h | 11 +- infra/CMakeLists.txt | 2 +- infra/packet_io/packet_io.c | 2 +- infra/packet_io/pcap_io.c | 28 +- infra/packet_io/test/CMakeLists.txt | 3 +- infra/packet_io/test/conf/pcap_io.toml | 1 + infra/packet_manager/packet_manager.c | 25 +- infra/packet_manager/test/CMakeLists.txt | 3 +- infra/session_manager/session_internal.h | 3 +- infra/session_manager/session_manager.c | 416 +++++++------------- infra/session_manager/session_manager_rte.c | 51 --- infra/session_manager/session_manager_rte.h | 1 - infra/session_manager/session_utils.c | 11 - infra/session_manager/test/CMakeLists.txt | 4 +- infra/stellar_core.c | 2 + infra/tcp_reassembly/tcp_reassembly.h | 1 - infra/utils_internal.h | 2 +- infra/version.map | 8 +- scripts/stat_format.sh | 1 + test/CMakeLists.txt | 4 +- test/monitor/CMakeLists.txt | 3 +- 25 files changed, 210 insertions(+), 378 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 9d28378..cb3d02a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -89,4 +89,5 @@ add_subdirectory(tools) add_subdirectory(test) install(DIRECTORY DESTINATION log COMPONENT PROGRAM) +install(DIRECTORY DESTINATION metrics COMPONENT PROGRAM) install(DIRECTORY DESTINATION module COMPONENT PROGRAM) \ No newline at end of file diff --git a/conf/stellar.toml b/conf/stellar.toml index fdd7ba9..20a8b91 100644 --- a/conf/stellar.toml +++ b/conf/stellar.toml @@ -7,6 +7,7 @@ dev_symbol = "nf_0_fw" pcap_path = "/tmp/test.pcap" pcap_done_exit = 1 # range: [0, 1] + pcap_queue_size = 1 # range: [1, 4294967295] thread_num = 1 # range: [1, 256] cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12] idle_yield_ms = 900 # range: [0, 60000] (ms) diff --git a/decoders/CMakeLists.txt b/decoders/CMakeLists.txt index 7946822..0875131 100644 --- a/decoders/CMakeLists.txt +++ b/decoders/CMakeLists.txt @@ -1,4 +1,4 @@ -add_subdirectory(lpi_plus) +#add_subdirectory(lpi_plus) #add_subdirectory(http) #add_subdirectory(socks) #add_subdirectory(stratum) diff --git a/include/stellar/packet.h b/include/stellar/packet.h index c20e1b5..ec49227 100644 --- a/include/stellar/packet.h +++ b/include/stellar/packet.h @@ -227,6 +227,8 @@ struct packet *packet_manager_build_udp_packet(struct packet_manager *pkt_mgr, c const char *udp_payload, uint16_t udp_payload_len); struct packet *packet_manager_build_l3_packet(struct packet_manager *pkt_mgr, const struct packet *origin_pkt, uint8_t ip_proto, const char *l3_payload, uint16_t l3_payload_len); +struct packet *packet_manager_dup_packet(struct packet_manager *pkt_mgr, const struct packet *origin_pkt); +void packet_manager_free_packet(struct packet_manager *pkt_mgr, struct packet *pkt); #ifdef __cplusplus } diff --git a/include/stellar/session.h b/include/stellar/session.h index 68a69c1..a5dae67 100644 --- a/include/stellar/session.h +++ b/include/stellar/session.h @@ -130,7 +130,6 @@ int session_has_duplicate_traffic(const struct session *sess); enum session_type session_get_type(const struct session *sess); enum session_state session_get_current_state(const struct session *sess); -const struct packet *session_get_current_packet(const struct session *sess); enum closing_reason session_get_closing_reason(const struct session *sess); enum session_direction session_get_direction(const struct session *sess); @@ -152,15 +151,7 @@ struct session_manager; struct session_manager *module_to_session_manager(struct module *mod); int session_manager_new_session_exdata_index(struct session_manager *sess_mgr, const char *name, exdata_free *func, void *arg); -// When the state is SESSION_STATE_CLOSED, the packet is NULL, and the session will be destroyed. -typedef void on_session_message_callback(struct session *sess, enum session_state state, struct packet *pkt, void *args); -// When the state is SESSION_STATE_CLOSED, the tcp_payload is NULL, and the session will be destroyed. -typedef void on_tcp_payload_callback(struct session *sess, enum session_state state, const char *tcp_payload, uint32_t tcp_payload_len, void *args); - -int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args); -int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args); -int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args); -int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_payload_callback *cb, void *args); +struct session *packet_exdata_to_session(struct packet *pkt); #ifdef __cplusplus } diff --git a/infra/CMakeLists.txt b/infra/CMakeLists.txt index fc0ad21..e76a214 100644 --- a/infra/CMakeLists.txt +++ b/infra/CMakeLists.txt @@ -1,6 +1,6 @@ set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager monitor) set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml ringbuf) -set(DECODERS lpi_plus) +#set(DECODERS lpi_plus) set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS}) set(LIBS fieldstat4) diff --git a/infra/packet_io/packet_io.c b/infra/packet_io/packet_io.c index e0a7e1c..533ce8b 100644 --- a/infra/packet_io/packet_io.c +++ b/infra/packet_io/packet_io.c @@ -161,7 +161,7 @@ struct packet_io *packet_io_new(const char *toml_file) PACKET_IO_LOG_ERROR("failed to create fieldstat_easy"); goto error_out; } - if (fieldstat_easy_enable_auto_output(pkt_io->fs, "packet_io.fs4", 2) != 0) + if (fieldstat_easy_enable_auto_output(pkt_io->fs, "metrics/packet_io.json", 2) != 0) { PACKET_IO_LOG_ERROR("failed to enable auto output for fieldstat_easy"); goto error_out; diff --git a/infra/packet_io/pcap_io.c b/infra/packet_io/pcap_io.c index 555b59a..1fc9c6d 100644 --- a/infra/packet_io/pcap_io.c +++ b/infra/packet_io/pcap_io.c @@ -3,6 +3,7 @@ #include #include #include +#include #include "pcap_io.h" #include "packet_dump.h" @@ -11,8 +12,6 @@ #include "packet_internal.h" #include "utils_internal.h" -#define RING_BUFFER_MAX_SIZE (4096 * 1000) - struct pcap_pkt { char *data; @@ -24,8 +23,9 @@ struct pcap_io_cfg { char mode[16]; // pcapfile, pcaplist char pcap_path[PATH_MAX]; - uint64_t pcap_done_exit; // range [0, 1] - uint64_t thread_num; // range [1, MAX_THREAD_NUM] + uint64_t pcap_done_exit; // range [0, 1] + uint64_t pcap_queue_size; // range [1, 4294967295] + uint64_t thread_num; // range [1, MAX_THREAD_NUM] // packet pool uint64_t capacity; // range: [1, 4294967295] @@ -140,6 +140,7 @@ static struct pcap_io_cfg *pcap_io_cfg_new(const char *toml_file) ret += load_toml_str_config(toml_file, "packet_io.mode", cfg->mode); ret += load_toml_str_config(toml_file, "packet_io.pcap_path", cfg->pcap_path); ret += load_toml_integer_config(toml_file, "packet_io.pcap_done_exit", &cfg->pcap_done_exit, 0, 1); + ret += load_toml_integer_config(toml_file, "packet_io.pcap_queue_size", &cfg->pcap_queue_size, 1, 4294967295); ret += load_toml_integer_config(toml_file, "packet_io.thread_num", &cfg->thread_num, 1, MAX_THREAD_NUM); ret += load_toml_integer_config(toml_file, "packet_io.packet_pool.capacity", &cfg->capacity, 1, 4294967295); if (strcmp(cfg->mode, "pcapfile") != 0 && strcmp(cfg->mode, "pcaplist") != 0) @@ -249,12 +250,23 @@ static int pcap_io_handler(struct pcap_io *pcap_io, const char *pcap_file) static int all_packet_consumed(struct pcap_io *pcap_io) { uint64_t consumed_pkts = 0; + uint64_t total_tx_pkts = 0; + uint64_t total_dropped_pkts = 0; + uint64_t total_injected_pkts = 0; + uint64_t total_user_freed_pkts = 0; + uint64_t read_pcap_pkts = ATOMIC_READ(&pcap_io->read_pcap_pkts); for (uint16_t i = 0; i < pcap_io->cfg->thread_num; i++) { - consumed_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_rx); + total_tx_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_tx); + total_dropped_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_dropped); + total_injected_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_injected); + total_user_freed_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_user_freed); } - if (consumed_pkts == read_pcap_pkts) + + consumed_pkts = total_tx_pkts + total_dropped_pkts + total_user_freed_pkts - total_injected_pkts; + + if (consumed_pkts >= read_pcap_pkts) { return 1; } @@ -269,6 +281,8 @@ static void *pcap_io_thread(void *arg) struct pcap_io *pcap_io = (struct pcap_io *)arg; __thread_local_logger = pcap_io->logger; + prctl(PR_SET_NAME, "stellar:pcap", NULL, NULL, NULL); + ATOMIC_SET(&pcap_io->io_thread_is_runing, 1); PACKET_IO_LOG_FATAL("pcap io thread is running"); @@ -373,7 +387,7 @@ void *pcap_io_new(const char *toml_file) for (uint16_t i = 0; i < pcap_io->cfg->thread_num; i++) { - pcap_io->ring[i] = ring_buffer_new(RING_BUFFER_MAX_SIZE); + pcap_io->ring[i] = ring_buffer_new(pcap_io->cfg->pcap_queue_size); if (pcap_io->ring[i] == NULL) { PACKET_IO_LOG_ERROR("unable to create ring buffer"); diff --git a/infra/packet_io/test/CMakeLists.txt b/infra/packet_io/test/CMakeLists.txt index 740dd40..be675ac 100644 --- a/infra/packet_io/test/CMakeLists.txt +++ b/infra/packet_io/test/CMakeLists.txt @@ -5,4 +5,5 @@ include(GoogleTest) gtest_discover_tests(gtest_packet_io) file(COPY ./conf/ DESTINATION ./conf/) -file(COPY ./pcap/ DESTINATION ./pcap/) \ No newline at end of file +file(COPY ./pcap/ DESTINATION ./pcap/) +file(COPY ./metrics/ DESTINATION ./metrics/) \ No newline at end of file diff --git a/infra/packet_io/test/conf/pcap_io.toml b/infra/packet_io/test/conf/pcap_io.toml index 3bf7cfc..35943f7 100644 --- a/infra/packet_io/test/conf/pcap_io.toml +++ b/infra/packet_io/test/conf/pcap_io.toml @@ -4,6 +4,7 @@ dev_symbol = "nf_0_fw" pcap_path = "./pcap/IPv4_frags_UDP.pcap" pcap_done_exit = 1 # range: [0, 1] + pcap_queue_size = 1024 # range: [1, 4294967295] thread_num = 1 # range: [1, 256] cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12] idle_yield_ms = 900 # range: [0, 60000] (ms) diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c index bb4a0c1..e776a85 100644 --- a/infra/packet_manager/packet_manager.c +++ b/infra/packet_manager/packet_manager.c @@ -227,7 +227,7 @@ struct packet_manager *packet_manager_new(struct mq_schema *mq_sche, uint16_t th PACKET_MANAGER_LOG_ERROR("failed to create fieldstat_easy"); goto error_out; } - if (fieldstat_easy_enable_auto_output(pkt_mgr->fs, "packet_manager.fs4", 2) != 0) + if (fieldstat_easy_enable_auto_output(pkt_mgr->fs, "metrics/packet_manager.json", 2) != 0) { PACKET_MANAGER_LOG_ERROR("failed to enable auto output for fieldstat_easy"); goto error_out; @@ -477,6 +477,20 @@ struct packet *packet_manager_build_l3_packet(struct packet_manager *pkt_mgr, co return pkt; } +struct packet *packet_manager_dup_packet(struct packet_manager *pkt_mgr, const struct packet *origin_pkt) +{ + struct packet *pkt = packet_dup(origin_pkt); + if (pkt == NULL) + { + return NULL; + } + + struct exdata_runtime *ex_rte = exdata_runtime_new(pkt_mgr->sche->ex_sche); + packet_set_user_data(pkt, ex_rte); + + return pkt; +} + void packet_manager_free_packet(struct packet_manager *pkt_mgr __attribute__((unused)), struct packet *pkt) { if (pkt) @@ -581,8 +595,9 @@ struct module *packet_manager_on_thread_init(struct module_manager *mod_mgr, int void packet_manager_on_thread_exit(struct module_manager *mod_mgr __attribute__((unused)), int thread_id, struct module *mod) { struct packet_manager *pkt_mgr = module_get_ctx(mod); - assert(pkt_mgr); - assert(thread_id < pkt_mgr->thread_num); - - packet_manager_clean(pkt_mgr, thread_id); + if (pkt_mgr) + { + assert(thread_id < pkt_mgr->thread_num); + packet_manager_clean(pkt_mgr, thread_id); + } } \ No newline at end of file diff --git a/infra/packet_manager/test/CMakeLists.txt b/infra/packet_manager/test/CMakeLists.txt index 184d9fc..916e1d5 100644 --- a/infra/packet_manager/test/CMakeLists.txt +++ b/infra/packet_manager/test/CMakeLists.txt @@ -84,4 +84,5 @@ gtest_discover_tests(gtest_packet_ldbc) gtest_discover_tests(gtest_packet_pool) gtest_discover_tests(gtest_packet_manager) -file(COPY ../../../conf/ DESTINATION ./conf/) \ No newline at end of file +file(COPY ../../../conf/ DESTINATION ./conf/) +file(COPY ./metrics/ DESTINATION ./metrics/) \ No newline at end of file diff --git a/infra/session_manager/session_internal.h b/infra/session_manager/session_internal.h index 2201315..ec8a0b8 100644 --- a/infra/session_manager/session_internal.h +++ b/infra/session_manager/session_internal.h @@ -48,7 +48,6 @@ struct session uint64_t timestamps[MAX_TIMESTAMP]; // realtime msec struct tcp_half tcp_halfs[MAX_FLOW_TYPE]; struct timeout timeout; - struct tcp_segment empty_seg; TAILQ_ENTRY(session) lru_tqe; TAILQ_ENTRY(session) free_tqe; TAILQ_ENTRY(session) evc_tqe; @@ -120,7 +119,7 @@ void session_set_first_packet(struct session *sess, enum flow_type type, const s // const struct packet *session_get_first_packet(const struct session *sess, enum flow_type type); void session_set_current_packet(struct session *sess, const struct packet *pkt); -// const struct packet *session_get_current_packet(const struct session *sess); +const struct packet *session_get_current_packet(const struct session *sess); // int session_is_symmetric(const struct session *sess, unsigned char *flag); diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c index 4358a4d..96d421c 100644 --- a/infra/session_manager/session_manager.c +++ b/infra/session_manager/session_manager.c @@ -2,6 +2,7 @@ #include "utils_internal.h" #include "session_internal.h" +#include "session_manager.h" #include "session_manager_log.h" #include "session_manager_cfg.h" #include "session_manager_rte.h" @@ -12,89 +13,36 @@ #pragma GCC diagnostic ignored "-Wunused-parameter" -struct session_manager_sche -{ - int pkt_ex_id; - int sess_msg_id_tcp; - int sess_msg_id_udp; - int sess_msg_id_ctrl; - int sess_msg_id_stream; - - struct mq_schema *mq_sche; - struct exdata_schema *ex_sche; -}; - struct session_manager { + int pkt_ex_to_get_sess; + int pkt_ex_to_free_sess; + struct exdata_schema *ex_sche; + int stat_idx[SESS_MGR_STAT_MAX]; struct fieldstat_easy *fs; struct session_manager_cfg *cfg; - struct session_manager_sche *sche; struct session_manager_rte *rte[MAX_THREAD_NUM]; - struct mq_runtime *mq[MAX_THREAD_NUM]; struct module_manager *mod_mgr; + struct packet_manager *pkt_mgr; }; +__thread int __thread_pkt_ex_to_get_sess = 0; + /****************************************************************************** * session manager sche ******************************************************************************/ -static void clean_closed_session(struct session_manager *sess_mgr, uint16_t thread_id, uint64_t now_ms) +static void free_session(int idx, void *ex_ptr, void *arg) { - struct session *sess = NULL; - struct session *cleaned[CLEAN_SESSION_BURST] = {NULL}; - struct mq_runtime *mq_rte = sess_mgr->mq[thread_id]; - struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id]; + struct session *sess = (struct session *)ex_ptr; + struct session_manager *sess_mgr = (struct session_manager *)arg; + assert(idx == sess_mgr->pkt_ex_to_free_sess); - uint64_t used = session_manager_rte_clean_session(sess_mgr_rte, now_ms, cleaned, CLEAN_SESSION_BURST); - for (uint64_t i = 0; i < used; i++) + if (sess && session_get_current_state(sess) == SESSION_STATE_CLOSED) { - sess = cleaned[i]; - assert(session_get_current_state(sess) == SESSION_STATE_CLOSED); - session_set_current_packet(sess, NULL); - session_set_flow_type(sess, FLOW_TYPE_NONE); - - if (session_get_type(sess) == SESSION_TYPE_TCP) - { - mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_stream, &sess->empty_seg); - mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_ctrl, sess); - mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_tcp, sess); - } - else - { - mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_ctrl, sess); - mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_udp, sess); - } - } -} - -static void on_sess_msg_dispatch(int sess_msg_id, void *msg, on_msg_cb_func *msg_cb, void *msg_cb_args, void *dispatch_args) -{ - struct session *sess = (struct session *)msg; - struct packet *pkt = (struct packet *)session_get_current_packet(sess); - enum session_state state = session_get_current_state(sess); - - ((on_session_message_callback *)(void *)msg_cb)(sess, state, pkt, msg_cb_args); -} - -static void on_tcp_payload_msg_dispatch(int sess_msg_id, void *msg, on_msg_cb_func *msg_cb, void *msg_cb_args, void *dispatch_args) -{ - struct tcp_segment *seg = (struct tcp_segment *)msg; - struct session *sess = (struct session *)seg->user_data; - enum session_state state = session_get_current_state(sess); - - ((on_tcp_payload_callback *)(void *)msg_cb)(sess, state, seg->data, seg->len, msg_cb_args); -} - -static void on_sess_msg_free(void *msg, void *args) -{ - struct session *sess = (struct session *)msg; - - if (session_get_current_state(sess) == SESSION_STATE_CLOSED) - { - struct session_manager *sess_mgr = (struct session_manager *)args; int thread_id = module_manager_get_thread_id(sess_mgr->mod_mgr); - struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id]; + struct session_manager_rte *sess_mgr_rte = session_manager_get_rte(sess_mgr, thread_id); char buffer[4096] = {0}; session_to_str(sess, 0, buffer, sizeof(buffer)); @@ -106,20 +54,34 @@ static void on_sess_msg_free(void *msg, void *args) } } -static void on_tcp_payload_msg_free(void *msg, void *args) +static void notify_sess_closed_by_pseudo_pkt(struct session_manager *sess_mgr, int thread_id, struct session *sess) { - struct tcp_segment *seg = (struct tcp_segment *)msg; - struct session *sess = (struct session *)seg->user_data; + struct packet *pseudo = NULL; + struct packet_manager *pkt_mgr = sess_mgr->pkt_mgr; + assert(session_get_current_state(sess) == SESSION_STATE_CLOSED); - session_free_tcp_segment(sess, seg); + if (session_get_first_packet(sess, FLOW_TYPE_C2S)) + { + pseudo = packet_manager_dup_packet(pkt_mgr, session_get_first_packet(sess, FLOW_TYPE_C2S)); + } + else + { + pseudo = packet_manager_dup_packet(pkt_mgr, session_get_first_packet(sess, FLOW_TYPE_S2C)); + } + + assert(pseudo); + packet_set_type(pseudo, PACKET_TYPE_PSEUDO); + packet_set_action(pseudo, PACKET_ACTION_DROP); + packet_set_exdata(pseudo, sess_mgr->pkt_ex_to_free_sess, sess); + packet_manager_schedule_packet(pkt_mgr, thread_id, pseudo, PACKET_STAGE_FORWARD); + SESSION_MANAGER_LOG_INFO("notify session %lu %s closed by pseudo packet: %p", session_get_id(sess), session_get_readable_addr(sess), pseudo); } static void on_packet_forward(struct packet *pkt, enum packet_stage stage, void *args) { struct session_manager *sess_mgr = (struct session_manager *)args; int thread_id = module_manager_get_thread_id(sess_mgr->mod_mgr); - struct mq_runtime *mq_rte = sess_mgr->mq[thread_id]; - struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id]; + struct session_manager_rte *sess_mgr_rte = session_manager_get_rte(sess_mgr, thread_id); /* * We use the system's real time instead of monotonic time for the following reasons: @@ -132,86 +94,62 @@ static void on_packet_forward(struct packet *pkt, enum packet_stage stage, void uint64_t now_ms = clock_get_real_time_ms(); struct tuple6 key; - struct tcp_segment *seg = NULL; struct session *sess = session_manager_rte_lookup_session_by_packet(sess_mgr_rte, pkt); if (sess == NULL) { - if (packet_get_type(pkt) == PACKET_TYPE_PSEUDO) + if (packet_get_type(pkt) == PACKET_TYPE_RAW) { - goto fast_path; - } - - sess = session_manager_rte_new_session(sess_mgr_rte, pkt, now_ms); - if (sess == NULL) - { - goto fast_path; + sess = session_manager_rte_new_session(sess_mgr_rte, pkt, now_ms); + if (sess) + { + session_set_user_data(sess, exdata_runtime_new(sess_mgr->ex_sche)); + } } else { - session_set_user_data(sess, exdata_runtime_new(sess_mgr->sche->ex_sche)); - goto slow_path; + // TODO new session by pseudo packet } } else { if (packet_get_type(pkt) == PACKET_TYPE_PSEUDO) { - goto ctrl_path; - } - - if (session_manager_rte_update_session(sess_mgr_rte, sess, pkt, now_ms) == -1) - { - goto fast_path; + session_set_current_packet(sess, pkt); + packet_get_innermost_tuple6(pkt, &key); + if (tuple6_cmp(session_get_tuple6(sess), &key) == 0) + { + session_set_flow_type(sess, FLOW_TYPE_C2S); + sess->stats[FLOW_TYPE_C2S][STAT_PSEUDO_PACKETS_RECEIVED]++; + sess->stats[FLOW_TYPE_C2S][STAT_PSEUDO_BYTES_RECEIVED] += packet_get_raw_len(pkt); + } + else + { + session_set_flow_type(sess, FLOW_TYPE_S2C); + sess->stats[FLOW_TYPE_S2C][STAT_PSEUDO_PACKETS_RECEIVED]++; + sess->stats[FLOW_TYPE_S2C][STAT_PSEUDO_BYTES_RECEIVED] += packet_get_raw_len(pkt); + } } else { - goto slow_path; + session_manager_rte_update_session(sess_mgr_rte, sess, pkt, now_ms); } } -ctrl_path: - session_set_current_packet(sess, pkt); - packet_get_innermost_tuple6(pkt, &key); - if (tuple6_cmp(session_get_tuple6(sess), &key) == 0) - { - session_set_flow_type(sess, FLOW_TYPE_C2S); - } - else - { - session_set_flow_type(sess, FLOW_TYPE_S2C); - } - packet_set_exdata(pkt, sess_mgr->sche->pkt_ex_id, sess); - mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_ctrl, sess); - return; + packet_set_exdata(pkt, sess_mgr->pkt_ex_to_get_sess, sess); -slow_path: - if (session_get_type(sess) == SESSION_TYPE_TCP) + while ((sess = session_manager_rte_get_evicted_session(sess_mgr_rte))) { - mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_tcp, sess); - while ((seg = session_get_tcp_segment(sess))) - { - mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_stream, seg); - } + notify_sess_closed_by_pseudo_pkt(sess_mgr, thread_id, sess); } - else - { - mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_udp, sess); - } - packet_set_exdata(pkt, sess_mgr->sche->pkt_ex_id, sess); - return; - -fast_path: - packet_set_exdata(pkt, sess_mgr->sche->pkt_ex_id, NULL); - return; } static void on_packet_output(struct packet *pkt, enum packet_stage stage, void *args) { struct session_manager *sess_mgr = (struct session_manager *)args; int thread_id = module_manager_get_thread_id(sess_mgr->mod_mgr); - struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id]; + struct session_manager_rte *sess_mgr_rte = session_manager_get_rte(sess_mgr, thread_id); - struct session *sess = (struct session *)packet_get_exdata(pkt, sess_mgr->sche->pkt_ex_id); + struct session *sess = (struct session *)packet_get_exdata(pkt, sess_mgr->pkt_ex_to_get_sess); if (sess) { struct tuple6 key; @@ -255,18 +193,35 @@ static void on_packet_output(struct packet *pkt, enum packet_stage stage, void * static void on_polling(struct module_manager *mod_mgr, void *args) { + struct session *sess = NULL; uint64_t now_ms = clock_get_real_time_ms(); struct session_manager *sess_mgr = (struct session_manager *)args; int thread_id = module_manager_get_thread_id(mod_mgr); - struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id]; - struct session_manager_stat *sess_mgr_stat = session_manager_rte_get_stat(sess_mgr_rte); + struct session_manager_rte *sess_mgr_rte = session_manager_get_rte(sess_mgr, thread_id); - clean_closed_session(sess_mgr, thread_id, now_ms); + static __thread uint64_t last_clean_expired_sess_ts = 0; + if (now_ms - last_clean_expired_sess_ts > sess_mgr->cfg->expire_period_ms) + { + for (uint64_t i = 0; i < sess_mgr->cfg->expire_batch_max; i++) + { + sess = session_manager_rte_get_expired_session(sess_mgr_rte, now_ms); + if (sess) + { + notify_sess_closed_by_pseudo_pkt(sess_mgr, thread_id, sess); + } + else + { + break; + } + } + last_clean_expired_sess_ts = now_ms; + } static __thread uint64_t last_sync_stat_ms = 0; static __thread struct session_manager_stat last_stat = {0}; if (now_ms - last_sync_stat_ms >= SYNC_STAT_INTERVAL_MS) { + struct session_manager_stat *sess_mgr_stat = session_manager_rte_get_stat(sess_mgr_rte); for (int i = 0; i < SESS_MGR_STAT_MAX; i++) { uint64_t val = session_manager_stat_get(sess_mgr_stat, i) - session_manager_stat_get(&last_stat, i); @@ -277,100 +232,6 @@ static void on_polling(struct module_manager *mod_mgr, void *args) } } -static void session_manager_sche_free(struct session_manager_sche *sess_mgr_schema) -{ - if (sess_mgr_schema) - { - if (sess_mgr_schema->mq_sche) - { - mq_schema_destroy_topic(sess_mgr_schema->mq_sche, sess_mgr_schema->sess_msg_id_tcp); - mq_schema_destroy_topic(sess_mgr_schema->mq_sche, sess_mgr_schema->sess_msg_id_udp); - mq_schema_destroy_topic(sess_mgr_schema->mq_sche, sess_mgr_schema->sess_msg_id_ctrl); - mq_schema_destroy_topic(sess_mgr_schema->mq_sche, sess_mgr_schema->sess_msg_id_stream); - } - exdata_schema_free(sess_mgr_schema->ex_sche); - - free(sess_mgr_schema); - sess_mgr_schema = NULL; - } -} - -static struct session_manager_sche *session_manager_sche_new(struct packet_manager *pkt_mgr, struct mq_schema *mq_sche, void *sess_mgr) -{ - if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, sess_mgr)) - { - SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_FORWARD"); - return NULL; - } - if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, sess_mgr)) - { - SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_OUTPUT"); - return NULL; - } - - struct session_manager_sche *sess_mgr_schema = calloc(1, sizeof(struct session_manager_sche)); - if (sess_mgr_schema == NULL) - { - SESSION_MANAGER_LOG_ERROR("failed to allocate memory for session_manager_sche"); - return NULL; - } - - sess_mgr_schema->ex_sche = exdata_schema_new(); - if (sess_mgr_schema->ex_sche == NULL) - { - SESSION_MANAGER_LOG_ERROR("failed to create exdata_schema"); - goto error_out; - } - - sess_mgr_schema->mq_sche = mq_sche; - sess_mgr_schema->pkt_ex_id = packet_manager_new_packet_exdata_index(pkt_mgr, "session_manager", NULL, NULL); - if (sess_mgr_schema->pkt_ex_id == -1) - { - SESSION_MANAGER_LOG_ERROR("failed to create packet exdata index"); - goto error_out; - } - - /* - * Publish session closed messages to multiple topics. - * Each topic has its own session message free callback. - * To prevent the same session from being freeed multiple times, - * only TCP/UDP topics register session message free callbacks, - * and other topics do not register session message callbacks; - * - * Restriction: MQ ensures that the session message free order is consistent with the publishing order - */ - sess_mgr_schema->sess_msg_id_tcp = mq_schema_create_topic(sess_mgr_schema->mq_sche, "SESSION_MESSAGE_TCP", &on_sess_msg_dispatch, NULL, &on_sess_msg_free, sess_mgr); - if (sess_mgr_schema->sess_msg_id_tcp == -1) - { - SESSION_MANAGER_LOG_ERROR("failed to create topic SESSION_MESSAGE_FREE"); - goto error_out; - } - sess_mgr_schema->sess_msg_id_udp = mq_schema_create_topic(sess_mgr_schema->mq_sche, "SESSION_MESSAGE_UDP", &on_sess_msg_dispatch, NULL, &on_sess_msg_free, sess_mgr); - if (sess_mgr_schema->sess_msg_id_udp == -1) - { - SESSION_MANAGER_LOG_ERROR("failed to create topic SESSION_MESSAGE_UDP"); - goto error_out; - } - sess_mgr_schema->sess_msg_id_ctrl = mq_schema_create_topic(sess_mgr_schema->mq_sche, "SESSION_MESSAGE_CTRL_PKT", &on_sess_msg_dispatch, NULL, NULL, NULL); - if (sess_mgr_schema->sess_msg_id_ctrl == -1) - { - SESSION_MANAGER_LOG_ERROR("failed to create topic SESSION_MESSAGE_CTRL_PKT"); - goto error_out; - } - sess_mgr_schema->sess_msg_id_stream = mq_schema_create_topic(sess_mgr_schema->mq_sche, "SESSION_MESSAGE_TCP_STREAM", &on_tcp_payload_msg_dispatch, NULL, &on_tcp_payload_msg_free, sess_mgr); - if (sess_mgr_schema->sess_msg_id_stream == -1) - { - SESSION_MANAGER_LOG_ERROR("failed to create topic SESSION_MESSAGE_TCP_STREAM"); - goto error_out; - } - - return sess_mgr_schema; - -error_out: - session_manager_sche_free(sess_mgr_schema); - return NULL; -} - /****************************************************************************** * session manager ******************************************************************************/ @@ -379,9 +240,9 @@ void session_manager_free(struct session_manager *sess_mgr) { if (sess_mgr) { - if (sess_mgr->sche) + if (sess_mgr->ex_sche) { - session_manager_sche_free(sess_mgr->sche); + exdata_schema_free(sess_mgr->ex_sche); } if (sess_mgr->fs) { @@ -395,7 +256,7 @@ void session_manager_free(struct session_manager *sess_mgr) } } -static struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, struct mq_schema *mq_schema, const char *toml_file) +static struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, const char *toml_file) { struct session_manager *sess_mgr = calloc(1, sizeof(struct session_manager)); if (sess_mgr == NULL) @@ -422,18 +283,45 @@ static struct session_manager *session_manager_new(struct packet_manager *pkt_mg { sess_mgr->stat_idx[i] = fieldstat_easy_register_counter(sess_mgr->fs, sess_mgr_stat_str[i]); } - if (fieldstat_easy_enable_auto_output(sess_mgr->fs, "session_manager.fs4", 2) != 0) + if (fieldstat_easy_enable_auto_output(sess_mgr->fs, "metrics/session_manager.json", 2) != 0) { SESSION_MANAGER_LOG_ERROR("failed to enable auto output"); goto error_out; } - sess_mgr->sche = session_manager_sche_new(pkt_mgr, mq_schema, sess_mgr); - if (sess_mgr->sche == NULL) + if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, sess_mgr)) { + SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_FORWARD"); + goto error_out; + } + if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, sess_mgr)) + { + SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_OUTPUT"); goto error_out; } + sess_mgr->ex_sche = exdata_schema_new(); + if (sess_mgr->ex_sche == NULL) + { + SESSION_MANAGER_LOG_ERROR("failed to create exdata_schema"); + goto error_out; + } + + sess_mgr->pkt_ex_to_get_sess = packet_manager_new_packet_exdata_index(pkt_mgr, "pkt_ex_key_for_get_sess", NULL, NULL); + if (sess_mgr->pkt_ex_to_get_sess == -1) + { + SESSION_MANAGER_LOG_ERROR("failed to create packet exdata index"); + goto error_out; + } + + sess_mgr->pkt_ex_to_free_sess = packet_manager_new_packet_exdata_index(pkt_mgr, "pkt_ex_key_for_free_sess", free_session, sess_mgr); + if (sess_mgr->pkt_ex_to_free_sess == -1) + { + SESSION_MANAGER_LOG_ERROR("failed to create packet exdata index"); + goto error_out; + } + + sess_mgr->pkt_mgr = pkt_mgr; return sess_mgr; error_out: @@ -446,44 +334,15 @@ int session_manager_new_session_exdata_index(struct session_manager *sess_mgr, c assert(sess_mgr); assert(name); assert(func); - return exdata_schema_new_index(sess_mgr->sche->ex_sche, name, func, arg); + return exdata_schema_new_index(sess_mgr->ex_sche, name, func, arg); } -int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args) -{ - assert(sess_mgr); - assert(cb); - return mq_schema_subscribe(sess_mgr->sche->mq_sche, sess_mgr->sche->sess_msg_id_tcp, (on_msg_cb_func *)(void *)cb, args); -} - -int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args) -{ - assert(sess_mgr); - assert(cb); - return mq_schema_subscribe(sess_mgr->sche->mq_sche, sess_mgr->sche->sess_msg_id_udp, (on_msg_cb_func *)(void *)cb, args); -} - -int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args) -{ - assert(sess_mgr); - assert(cb); - return mq_schema_subscribe(sess_mgr->sche->mq_sche, sess_mgr->sche->sess_msg_id_ctrl, (on_msg_cb_func *)(void *)cb, args); -} - -int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_payload_callback *cb, void *args) -{ - assert(sess_mgr); - assert(cb); - return mq_schema_subscribe(sess_mgr->sche->mq_sche, sess_mgr->sche->sess_msg_id_stream, (on_msg_cb_func *)(void *)cb, args); -} - -int session_manager_init(struct session_manager *sess_mgr, uint16_t thread_id, struct mq_runtime *mq_rte) +int session_manager_init(struct session_manager *sess_mgr, uint16_t thread_id) { assert(sess_mgr); uint64_t now_ms = clock_get_real_time_ms(); sess_mgr->cfg->session_id_seed = sess_mgr->cfg->instance_id << 8 | thread_id; - sess_mgr->mq[thread_id] = mq_rte; sess_mgr->rte[thread_id] = session_manager_rte_new(sess_mgr->cfg, now_ms); if (sess_mgr->rte[thread_id] == NULL) { @@ -500,22 +359,26 @@ void session_manager_clean(struct session_manager *sess_mgr, uint16_t thread_id) { assert(sess_mgr); - struct mq_runtime *mq_rte = sess_mgr->mq[thread_id]; - if (sess_mgr->rte[thread_id] == NULL) + struct session_manager_rte *rte = session_manager_get_rte(sess_mgr, thread_id); + + if (rte == NULL) { return; } - struct session_manager_stat *stat = session_manager_rte_get_stat(sess_mgr->rte[thread_id]); + struct session *sess = NULL; + struct session_manager_stat *stat = session_manager_rte_get_stat(rte); + while (stat->tcp_sess_used || stat->udp_sess_used) { - clean_closed_session(sess_mgr, thread_id, UINT64_MAX); - // here we need to dispatch the message to ensure that the session is cleaned up - mq_runtime_dispatch(mq_rte); + while ((sess = session_manager_rte_get_expired_session(rte, UINT64_MAX))) + { + exdata_runtime_free((struct exdata_runtime *)session_get_user_data(sess)); + session_manager_rte_free_session(rte, sess); + } } - session_manager_rte_free(sess_mgr->rte[thread_id]); - sess_mgr->rte[thread_id] = NULL; + session_manager_rte_free(rte); } /****************************************************************************** @@ -535,12 +398,10 @@ struct module *session_manager_on_init(struct module_manager *mod_mgr) struct module *pkt_mgr_mod = module_manager_get_module(mod_mgr, PACKET_MANAGER_MODULE_NAME); struct packet_manager *pkt_mgr = module_to_packet_manager(pkt_mgr_mod); assert(pkt_mgr); - struct mq_schema *mq_sche = module_manager_get_mq_schema(mod_mgr); - assert(mq_sche); const char *toml_file = module_manager_get_toml_path(mod_mgr); assert(toml_file); - struct session_manager *sess_mgr = session_manager_new(pkt_mgr, mq_sche, toml_file); + struct session_manager *sess_mgr = session_manager_new(pkt_mgr, toml_file); if (sess_mgr == NULL) { return NULL; @@ -575,10 +436,9 @@ struct module *session_manager_on_thread_init(struct module_manager *mod_mgr, in { struct session_manager *sess_mgr = module_get_ctx(mod); assert(sess_mgr); - struct mq_runtime *mq_rte = module_manager_get_mq_runtime(mod_mgr); - assert(mq_rte); - if (session_manager_init(sess_mgr, thread_id, mq_rte) != 0) + __thread_pkt_ex_to_get_sess = sess_mgr->pkt_ex_to_get_sess; + if (session_manager_init(sess_mgr, thread_id) != 0) { SESSION_MANAGER_LOG_ERROR("failed to int session_manager_init"); return NULL; @@ -592,10 +452,11 @@ struct module *session_manager_on_thread_init(struct module_manager *mod_mgr, in void session_manager_on_thread_exit(struct module_manager *mod_mgr, int thread_id, struct module *mod) { struct session_manager *sess_mgr = module_get_ctx(mod); - assert(sess_mgr); - assert(thread_id < (int)sess_mgr->cfg->thread_num); - - session_manager_clean(sess_mgr, thread_id); + if (sess_mgr) + { + assert(thread_id < (int)sess_mgr->cfg->thread_num); + session_manager_clean(sess_mgr, thread_id); + } } struct session_manager_rte *session_manager_get_rte(struct session_manager *sess_mgr, uint16_t thread_id) @@ -609,4 +470,9 @@ struct session_manager_cfg *session_manager_get_cfg(struct session_manager *sess { assert(sess_mgr); return sess_mgr->cfg; +} + +struct session *packet_exdata_to_session(struct packet *pkt) +{ + return (struct session *)packet_get_exdata(pkt, __thread_pkt_ex_to_get_sess); } \ No newline at end of file diff --git a/infra/session_manager/session_manager_rte.c b/infra/session_manager/session_manager_rte.c index 2c534a1..c3eb668 100644 --- a/infra/session_manager/session_manager_rte.c +++ b/infra/session_manager/session_manager_rte.c @@ -34,7 +34,6 @@ struct session_manager_rte // only used for session_set_discard() or session_manager_rte_record_duplicated_packet(), because the function is called by plugin and has no time input. uint64_t now_ms; - uint64_t last_clean_expired_sess_ts; struct snowflake *sf; }; @@ -810,7 +809,6 @@ struct session_manager_rte *session_manager_rte_new(const struct session_manager TAILQ_INIT(&sess_mgr_rte->evc_list); session_transition_init(); sess_mgr_rte->now_ms = now_ms; - sess_mgr_rte->last_clean_expired_sess_ts = now_ms; return sess_mgr_rte; @@ -1064,55 +1062,6 @@ struct session *session_manager_rte_get_evicted_session(struct session_manager_r return sess; } -uint64_t session_manager_rte_clean_session(struct session_manager_rte *sess_mgr_rte, uint64_t now_ms, struct session *cleaned_sess_ptr[], uint64_t array_size) -{ - sess_mgr_rte->now_ms = now_ms; - struct session *sess = NULL; - uint64_t cleaned_sess_num = 0; - uint64_t expired_sess_num = 0; - - uint8_t expired_sess_canbe_clean = 0; - if (now_ms - sess_mgr_rte->last_clean_expired_sess_ts >= sess_mgr_rte->cfg.expire_period_ms || - now_ms == UINT64_MAX) - { - expired_sess_canbe_clean = 1; - } - - for (uint64_t i = 0; i < array_size; i++) - { - // frist clean evicted session - sess = session_manager_rte_get_evicted_session(sess_mgr_rte); - if (sess) - { - cleaned_sess_ptr[cleaned_sess_num++] = sess; - } - // then clean expired session - else - { - if (expired_sess_canbe_clean && expired_sess_num < sess_mgr_rte->cfg.expire_batch_max) - { - sess_mgr_rte->last_clean_expired_sess_ts = now_ms; - sess = session_manager_rte_get_expired_session(sess_mgr_rte, now_ms); - if (sess) - { - cleaned_sess_ptr[cleaned_sess_num++] = sess; - expired_sess_num++; - } - else - { - break; - } - } - else - { - break; - } - } - } - - return cleaned_sess_num; -} - uint64_t session_manager_rte_scan_session(struct session_manager_rte *sess_mgr_rte, const struct session_filter *filter, uint64_t mached_sess_id[], uint64_t array_size) { uint64_t capacity = 0; diff --git a/infra/session_manager/session_manager_rte.h b/infra/session_manager/session_manager_rte.h index 8adae88..9a3e9e5 100644 --- a/infra/session_manager/session_manager_rte.h +++ b/infra/session_manager/session_manager_rte.h @@ -43,7 +43,6 @@ int session_manager_rte_update_session(struct session_manager_rte *sess_mgr_rte, struct session *session_manager_rte_get_expired_session(struct session_manager_rte *sess_mgr_rte, uint64_t now_ms); struct session *session_manager_rte_get_evicted_session(struct session_manager_rte *sess_mgr_rte); -uint64_t session_manager_rte_clean_session(struct session_manager_rte *sess_mgr_rte, uint64_t now_ms, struct session *cleaned_sess_ptr[], uint64_t array_size); uint64_t session_manager_rte_scan_session(struct session_manager_rte *sess_mgr_rte, const struct session_filter *filter, uint64_t mached_sess_id[], uint64_t array_size); void session_manager_rte_record_duplicated_packet(struct session_manager_rte *sess_mgr_rte, const struct packet *pkt); diff --git a/infra/session_manager/session_utils.c b/infra/session_manager/session_utils.c index fae6e77..be2d392 100644 --- a/infra/session_manager/session_utils.c +++ b/infra/session_manager/session_utils.c @@ -5,9 +5,6 @@ void session_init(struct session *sess) { memset(sess, 0, sizeof(struct session)); - sess->empty_seg.data = NULL; - sess->empty_seg.len = 0; - sess->empty_seg.user_data = sess; } void session_set_id(struct session *sess, uint64_t id) @@ -212,7 +209,6 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess) { sess->sess_mgr_stat->tcp_segs_consumed++; half->inorder_seg_consumed = 1; - half->inorder_seg.user_data = sess; return &half->inorder_seg; } else @@ -226,7 +222,6 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess) // TODO sess->sess_mgr_stat->tcp_segs_consumed++; sess->sess_mgr_stat->tcp_segs_reordered++; - seg->user_data = sess; } return seg; } @@ -239,12 +234,6 @@ void session_free_tcp_segment(struct session *sess, struct tcp_segment *seg) return; } - // empty segment for end of session - if (seg == &sess->empty_seg) - { - return; - } - enum flow_type type = session_get_flow_type(sess); struct tcp_half *half = &sess->tcp_halfs[type]; diff --git a/infra/session_manager/test/CMakeLists.txt b/infra/session_manager/test/CMakeLists.txt index e0e7ede..2b346d9 100644 --- a/infra/session_manager/test/CMakeLists.txt +++ b/infra/session_manager/test/CMakeLists.txt @@ -149,4 +149,6 @@ gtest_discover_tests(gtest_session_transition) gtest_discover_tests(gtest_sess_mgr_tcp_reassembly) gtest_discover_tests(gtest_sess_mgr_scan) -gtest_discover_tests(gtest_case_tcp_fast_open) \ No newline at end of file +gtest_discover_tests(gtest_case_tcp_fast_open) + +file(COPY ./metrics/ DESTINATION ./metrics/) \ No newline at end of file diff --git a/infra/stellar_core.c b/infra/stellar_core.c index bdb730e..6272241 100644 --- a/infra/stellar_core.c +++ b/infra/stellar_core.c @@ -95,6 +95,8 @@ static void *worker_thread(void *arg) } } + CORE_LOG_FATAL("worker thread %d cleaning", thread_id); + module_manager_unregister_thread(mod_mgr, thread_id); mq_runtime_free(mq_rt); diff --git a/infra/tcp_reassembly/tcp_reassembly.h b/infra/tcp_reassembly/tcp_reassembly.h index 87660c4..50e369b 100644 --- a/infra/tcp_reassembly/tcp_reassembly.h +++ b/infra/tcp_reassembly/tcp_reassembly.h @@ -12,7 +12,6 @@ struct tcp_segment { uint32_t len; const void *data; - void *user_data; }; struct tcp_segment *tcp_segment_new(uint32_t seq, const void *data, uint32_t len); diff --git a/infra/utils_internal.h b/infra/utils_internal.h index c026418..b2deb07 100644 --- a/infra/utils_internal.h +++ b/infra/utils_internal.h @@ -17,7 +17,7 @@ extern "C" #define RX_BURST_MAX 32 #define MAX_THREAD_NUM 256 // limit by snowflake -#define SYNC_STAT_INTERVAL_MS 1000 +#define SYNC_STAT_INTERVAL_MS 1 // TODO #define ATOMIC_INC(x) __atomic_fetch_add(x, 1, __ATOMIC_RELAXED) #define ATOMIC_DEC(x) __atomic_fetch_sub(x, 1, __ATOMIC_RELAXED) diff --git a/infra/version.map b/infra/version.map index 8560564..47a20b9 100644 --- a/infra/version.map +++ b/infra/version.map @@ -25,12 +25,13 @@ global: packet_manager_build_tcp_packet; packet_manager_build_udp_packet; packet_manager_build_l3_packet; + packet_manager_dup_packet; + packet_manager_free_packet; session_is_symmetric; session_has_duplicate_traffic; session_get_type; session_get_current_state; - session_get_current_packet; session_get_closing_reason; session_get_direction; session_get_flow_type; @@ -47,10 +48,7 @@ global: session_manager_on_thread_init; session_manager_on_thread_exit; session_manager_new_session_exdata_index; - session_manager_subscribe_tcp; - session_manager_subscribe_udp; - session_manager_subscribe_control_packet; - session_manager_subscribe_tcp_stream; + packet_exdata_to_session; session_monitor_on_init; session_monitor_on_exit; diff --git a/scripts/stat_format.sh b/scripts/stat_format.sh index 999e839..05bcf08 100644 --- a/scripts/stat_format.sh +++ b/scripts/stat_format.sh @@ -6,4 +6,5 @@ if [ $# -ne 1 ]; then fi f4_json_file=$1 +# python3 -m pip install prettytable jinja2 /opt/MESA/bin/fieldstat_exporter.py local -j $f4_json_file -l --clear-screen diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 11d1abc..5f93a2e 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,7 +1,7 @@ #add_subdirectory(packet_inject) add_subdirectory(packet_tool) -add_subdirectory(session_debugger) -add_subdirectory(lpi_plus) +#add_subdirectory(session_debugger) +#add_subdirectory(lpi_plus) #add_subdirectory(decoders/http) #add_subdirectory(decoders/socks) #add_subdirectory(decoders/stratum) diff --git a/test/monitor/CMakeLists.txt b/test/monitor/CMakeLists.txt index 5f447da..8b5ca5f 100644 --- a/test/monitor/CMakeLists.txt +++ b/test/monitor/CMakeLists.txt @@ -26,7 +26,8 @@ foreach(tfile ${MONITOR_TEST_FILE}) endforeach() set(MONITOR_TEST_RUN_DIR ${CMAKE_CURRENT_BINARY_DIR}) -add_test(NAME MONITOR_ENV_SETUP COMMAND sh -c "mkdir -p ${MONITOR_TEST_RUN_DIR}/conf && +add_test(NAME MONITOR_ENV_SETUP COMMAND sh -c "mkdir -p ${MONITOR_TEST_RUN_DIR}/metrics && + mkdir -p ${MONITOR_TEST_RUN_DIR}/conf && mkdir -p ${MONITOR_TEST_RUN_DIR}/plugin && mkdir -p ${MONITOR_TEST_RUN_DIR}/log && mkdir -p ${MONITOR_TEST_RUN_DIR}/pcap &&