diff --git a/decoders/lpi_plus/lpip_module.c b/decoders/lpi_plus/lpip_module.c index 2b801c5..6bffeec 100644 --- a/decoders/lpi_plus/lpip_module.c +++ b/decoders/lpi_plus/lpip_module.c @@ -286,9 +286,13 @@ static int lpi_plus_detect(struct lpi_plus_detect_context *ctx, struct lpi_plus_ return new_appid; } -static void lpi_plus_on_session(struct session *sess, struct packet *pkt, void *args) +static void lpi_plus_on_session(struct session *sess, enum session_state state, struct packet *pkt, void *args) { - if(pkt==NULL)return; + if (state == SESSION_STATE_CLOSED) + { + assert(pkt == NULL); + return; + } struct lpi_plus_env *env=(struct lpi_plus_env *)args; struct lpi_plus_exdata *exdata = (struct lpi_plus_exdata *)session_get_exdata(sess, env->lpip_session_exdata_idx); if(exdata==NULL) diff --git a/include/stellar/session.h b/include/stellar/session.h index aff1974..4710dfa 100644 --- a/include/stellar/session.h +++ b/include/stellar/session.h @@ -151,15 +151,15 @@ struct session_manager; struct session_manager *stellar_module_get_session_manager(struct stellar_module_manager *mod_mgr); int session_manager_new_session_exdata_index(struct session_manager *sess_mgr, const char *name, exdata_free *func, void *arg); -typedef void on_session_free_callback(struct session *sess, void *args); -typedef void on_session_packet_callback(struct session *sess, struct packet *pkt, void *args); -typedef void on_tcp_stream_callback(struct session *sess, const char *tcp_payload, uint32_t tcp_payload_len, void *args); +// When the state is SESSION_STATE_CLOSED, the packet is NULL, and the session will be destroyed. +typedef void on_session_message_callback(struct session *sess, enum session_state state, struct packet *pkt, void *args); +// When the state is SESSION_STATE_CLOSED, the tcp_payload is NULL, and the session will be destroyed. +typedef void on_tcp_payload_callback(struct session *sess, enum session_state state, const char *tcp_payload, uint32_t tcp_payload_len, void *args); -int session_manager_subscribe_free(struct session_manager *sess_mgr, on_session_free_callback *cb, void *args); -int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args); -int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args); -int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args); -int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_stream_callback *cb, void *args); +int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args); +int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args); +int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args); +int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_payload_callback *cb, void *args); #ifdef __cplusplus } diff --git a/infra/session_manager/session_internal.h b/infra/session_manager/session_internal.h index b117ba0..51413dc 100644 --- a/infra/session_manager/session_internal.h +++ b/infra/session_manager/session_internal.h @@ -21,9 +21,8 @@ extern "C" struct tcp_half { struct tcp_reassembly *tcp_reass; - struct tcp_segment in_order; // current packet in order segment - uint32_t in_order_ref; // reference count of current packet in order segment - + struct tcp_segment inorder_seg; // current packet in order segment + uint32_t inorder_seg_consumed; uint32_t seq; // current packet sequence number uint32_t ack; // current packet ack number uint16_t len; // current packet payload length @@ -48,6 +47,7 @@ struct session uint64_t timestamps[MAX_TIMESTAMP]; // realtime msec struct tcp_half tcp_halfs[MAX_FLOW_TYPE]; struct timeout timeout; + struct tcp_segment empty_seg; TAILQ_ENTRY(session) lru_tqe; TAILQ_ENTRY(session) free_tqe; TAILQ_ENTRY(session) evicte_tqe; diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c index 3aa9af4..7fd0805 100644 --- a/infra/session_manager/session_manager.c +++ b/infra/session_manager/session_manager.c @@ -21,7 +21,6 @@ struct session_manager_schema int pkt_exdata_idx; - int topic_id_free; int topic_id_tcp; int topic_id_udp; int topic_id_ctrl_pkt; @@ -40,28 +39,25 @@ struct session_manager * callback ******************************************************************************/ -static void on_session_free_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args) -{ - struct session *sess = (struct session *)msg; - ((on_session_free_callback *)(void *)cb)(sess, cb_args); -} - -static void on_session_packet_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args) +static void on_session_message_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args) { struct session *sess = (struct session *)msg; struct packet *pkt = (struct packet *)session_get0_current_packet(sess); + enum session_state state = session_get_current_state(sess); - ((on_session_packet_callback *)(void *)cb)(sess, pkt, cb_args); + ((on_session_message_callback *)(void *)cb)(sess, state, pkt, cb_args); } -static void on_tcp_stream_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args) +static void on_tcp_payload_message_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args) { struct tcp_segment *seg = (struct tcp_segment *)msg; + struct session *sess = (struct session *)seg->user_data; + enum session_state state = session_get_current_state(sess); - ((on_tcp_stream_callback *)(void *)cb)(seg->user_data, seg->data, seg->len, cb_args); + ((on_tcp_payload_callback *)(void *)cb)(sess, state, seg->data, seg->len, cb_args); } -static void on_tcp_stream_free(void *msg, void *args) +static void on_tcp_payload_message_free(void *msg, void *args) { struct tcp_segment *seg = (struct tcp_segment *)msg; struct session *sess = (struct session *)seg->user_data; @@ -69,29 +65,31 @@ static void on_tcp_stream_free(void *msg, void *args) session_free_tcp_segment(sess, seg); } -static void on_session_free(void *msg, void *args) +static void on_session_message_free(void *msg, void *args) { struct session *sess = (struct session *)msg; - struct session_manager *sess_mgr = (struct session_manager *)args; - struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr; - int thread_id = stellar_module_manager_get_thread_id(mod_mgr); - struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id]; - char buffer[4096] = {0}; - session_to_str(sess, 0, buffer, sizeof(buffer)); - SESSION_MANAGER_LOG_INFO("session free: %s", buffer); + if (session_get_current_state(sess) == SESSION_STATE_CLOSED) + { + struct session_manager *sess_mgr = (struct session_manager *)args; + int thread_id = stellar_module_manager_get_thread_id(sess_mgr->mod_mgr); + struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id]; - struct exdata_runtime *exdata_rt = (struct exdata_runtime *)session_get_user_data(sess); - exdata_runtime_free(exdata_rt); - session_manager_runtime_free_session(sess_mgr_rt, sess); + char buffer[4096] = {0}; + session_to_str(sess, 0, buffer, sizeof(buffer)); + SESSION_MANAGER_LOG_INFO("session free: %s", buffer); + + struct exdata_runtime *exdata_rt = (struct exdata_runtime *)session_get_user_data(sess); + exdata_runtime_free(exdata_rt); + session_manager_runtime_free_session(sess_mgr_rt, sess); + } } static void on_packet_forward(struct packet *pkt, enum packet_stage stage, void *args) { struct session_manager *sess_mgr = (struct session_manager *)args; - struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr; - int thread_id = stellar_module_manager_get_thread_id(mod_mgr); - struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr); + int thread_id = stellar_module_manager_get_thread_id(sess_mgr->mod_mgr); + struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(sess_mgr->mod_mgr); struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id]; /* @@ -181,8 +179,7 @@ fast_path: static void on_packet_output(struct packet *pkt, enum packet_stage stage, void *args) { struct session_manager *sess_mgr = (struct session_manager *)args; - struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr; - int thread_id = stellar_module_manager_get_thread_id(mod_mgr); + int thread_id = stellar_module_manager_get_thread_id(sess_mgr->mod_mgr); struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id]; struct session *sess = (struct session *)packet_get_exdata(pkt, sess_mgr->schema->pkt_exdata_idx); @@ -227,24 +224,44 @@ static void on_packet_output(struct packet *pkt, enum packet_stage stage, void * } } -static void on_polling(struct stellar_module_manager *mod_mgr, void *args) +static void publish_session_closed_message_batch(struct session_manager *sess_mgr, uint16_t thread_id, uint64_t now_ms) { - uint64_t now_ms = clock_get_real_time_ms(); + struct session *sess = NULL; struct session *cleaned[MAX_CLEANED_SESS] = {NULL}; - struct session_manager *sess_mgr = (struct session_manager *)args; - int thread_id = stellar_module_manager_get_thread_id(mod_mgr); - struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr); struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id]; + struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(sess_mgr->mod_mgr); uint64_t used = session_manager_runtime_clean_session(sess_mgr_rt, now_ms, cleaned, MAX_CLEANED_SESS); for (uint64_t i = 0; i < used; i++) { - mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_free, cleaned[i]); - } + sess = cleaned[i]; + assert(session_get_current_state(sess) == SESSION_STATE_CLOSED); + session_set_current_packet(sess, NULL); + session_set_flow_type(sess, FLOW_TYPE_NONE); + if (session_get_type(sess) == SESSION_TYPE_TCP) + { + mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_tcp_stream, &sess->empty_seg); + mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_ctrl_pkt, sess); + mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_tcp, sess); + } + else + { + mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_ctrl_pkt, sess); + mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_udp, sess); + } + } +} + +static void on_polling(struct stellar_module_manager *mod_mgr, void *args) +{ + uint64_t now_ms = clock_get_real_time_ms(); + struct session_manager *sess_mgr = (struct session_manager *)args; + int thread_id = stellar_module_manager_get_thread_id(mod_mgr); + + publish_session_closed_message_batch(sess_mgr, thread_id, now_ms); // TODO // ouput stat to fs4 - // session_manager_runtime_print_stat(sess_mgr_rt); } /****************************************************************************** @@ -257,7 +274,6 @@ void session_manager_schema_free(struct session_manager_schema *sess_mgr_schema) { if (sess_mgr_schema->mq) { - mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_free); mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_tcp); mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_udp); mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_ctrl_pkt); @@ -270,14 +286,14 @@ void session_manager_schema_free(struct session_manager_schema *sess_mgr_schema) } } -struct session_manager_schema *session_manager_schema_new(struct packet_manager *pkt_mgr, struct mq_schema *mq, void *subscribe_args) +struct session_manager_schema *session_manager_schema_new(struct packet_manager *pkt_mgr, struct mq_schema *mq, void *sess_mgr) { - if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, subscribe_args)) + if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, sess_mgr)) { SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_FORWARD"); return NULL; } - if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, subscribe_args)) + if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, sess_mgr)) { SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_OUTPUT"); return NULL; @@ -305,31 +321,34 @@ struct session_manager_schema *session_manager_schema_new(struct packet_manager goto error_out; } - sess_mgr_schema->topic_id_free = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_FREE", &on_session_free_dispatch, NULL, &on_session_free, subscribe_args); - if (sess_mgr_schema->topic_id_free == -1) - { - SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_FREE"); - goto error_out; - } - sess_mgr_schema->topic_id_tcp = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_TCP", &on_session_packet_dispatch, NULL, NULL, NULL); + /* + * Publish session closed messages to multiple topics. + * Each topic has its own session message free callback. + * To prevent the same session from being freeed multiple times, + * only TCP/UDP topics register session message free callbacks, + * and other topics do not register session message callbacks; + * + * Restriction: MQ ensures that the session message free order is consistent with the publishing order + */ + sess_mgr_schema->topic_id_tcp = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_TCP", &on_session_message_dispatch, NULL, &on_session_message_free, sess_mgr); if (sess_mgr_schema->topic_id_tcp == -1) { SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_FREE"); goto error_out; } - sess_mgr_schema->topic_id_udp = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_UDP", &on_session_packet_dispatch, NULL, NULL, NULL); + sess_mgr_schema->topic_id_udp = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_UDP", &on_session_message_dispatch, NULL, &on_session_message_free, sess_mgr); if (sess_mgr_schema->topic_id_udp == -1) { SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_UDP"); goto error_out; } - sess_mgr_schema->topic_id_ctrl_pkt = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_CTRL_PKT", &on_session_packet_dispatch, NULL, NULL, NULL); + sess_mgr_schema->topic_id_ctrl_pkt = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_CTRL_PKT", &on_session_message_dispatch, NULL, NULL, NULL); if (sess_mgr_schema->topic_id_ctrl_pkt == -1) { SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_CTRL_PKT"); goto error_out; } - sess_mgr_schema->topic_id_tcp_stream = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_TCP_STREAM", &on_tcp_stream_dispatch, NULL, &on_tcp_stream_free, NULL); + sess_mgr_schema->topic_id_tcp_stream = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_TCP_STREAM", &on_tcp_payload_message_dispatch, NULL, &on_tcp_payload_message_free, sess_mgr); if (sess_mgr_schema->topic_id_tcp_stream == -1) { SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_TCP_STREAM"); @@ -397,15 +416,7 @@ int session_manager_new_session_exdata_index(struct session_manager *sess_mgr, c return exdata_schema_new_index(sess_mgr->schema->exdata, name, func, arg); } -int session_manager_subscribe_free(struct session_manager *sess_mgr, on_session_free_callback *cb, void *args) -{ - assert(sess_mgr); - assert(cb); - - return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_free, (on_msg_cb_func *)(void *)cb, args); -} - -int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args) +int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args) { assert(sess_mgr); assert(cb); @@ -413,7 +424,7 @@ int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_p return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_tcp, (on_msg_cb_func *)(void *)cb, args); } -int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args) +int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args) { assert(sess_mgr); assert(cb); @@ -421,7 +432,7 @@ int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_p return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_udp, (on_msg_cb_func *)(void *)cb, args); } -int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args) +int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args) { assert(sess_mgr); assert(cb); @@ -429,7 +440,7 @@ int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, o return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_ctrl_pkt, (on_msg_cb_func *)(void *)cb, args); } -int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_stream_callback *cb, void *args) +int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_payload_callback *cb, void *args) { assert(sess_mgr); assert(cb); @@ -462,8 +473,7 @@ void session_manager_clean(struct session_manager *sess_mgr, uint16_t thread_id) assert(sess_mgr); assert(thread_id < sess_mgr->cfg->thread_num); - struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr; - struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr); + struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(sess_mgr->mod_mgr); struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id]; if (sess_mgr_rt == NULL) { @@ -473,13 +483,9 @@ void session_manager_clean(struct session_manager *sess_mgr, uint16_t thread_id) struct session_manager_stat *stat = session_manager_runtime_get_stat(sess_mgr_rt); while (stat->tcp_sess_used || stat->udp_sess_used) { - struct session *cleaned[MAX_CLEANED_SESS] = {NULL}; - uint64_t used = session_manager_runtime_clean_session(sess_mgr_rt, UINT64_MAX, cleaned, MAX_CLEANED_SESS); - for (uint64_t i = 0; i < used; i++) - { - mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_free, cleaned[i]); - mq_runtime_dispatch(mq_rt); - } + publish_session_closed_message_batch(sess_mgr, thread_id, UINT64_MAX); + // here we need to dispatch the message to ensure that the session is cleaned up + mq_runtime_dispatch(mq_rt); } SESSION_MANAGER_LOG_INFO("runtime: %p, idx: %d, will be cleaned", sess_mgr_rt, thread_id); diff --git a/infra/session_manager/session_manager_runtime.c b/infra/session_manager/session_manager_runtime.c index 9c4f123..97fbb28 100644 --- a/infra/session_manager/session_manager_runtime.c +++ b/infra/session_manager/session_manager_runtime.c @@ -243,9 +243,9 @@ static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct sessi session_inc_stat(sess, type, STAT_TCP_PAYLOADS_INORDER, len); sess_mgr_rt->stat.tcp_segs_inorder++; - half->in_order.data = tcp_layer->pld_ptr; - half->in_order.len = len; - half->in_order_ref = 0; + half->inorder_seg.data = tcp_layer->pld_ptr; + half->inorder_seg.len = len; + half->inorder_seg_consumed = 0; } return; } @@ -284,9 +284,9 @@ static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct sessi session_inc_stat(sess, type, STAT_TCP_PAYLOADS_INORDER, len); sess_mgr_rt->stat.tcp_segs_inorder++; - half->in_order.data = tcp_layer->pld_ptr; - half->in_order.len = len; - half->in_order_ref = 0; + half->inorder_seg.data = tcp_layer->pld_ptr; + half->inorder_seg.len = len; + half->inorder_seg_consumed = 0; tcp_reassembly_inc_recv_next(half->tcp_reass, len); } // retransmission diff --git a/infra/session_manager/session_utils.c b/infra/session_manager/session_utils.c index 630cf59..801ddd4 100644 --- a/infra/session_manager/session_utils.c +++ b/infra/session_manager/session_utils.c @@ -5,6 +5,9 @@ void session_init(struct session *sess) { memset(sess, 0, sizeof(struct session)); + sess->empty_seg.data = NULL; + sess->empty_seg.len = 0; + sess->empty_seg.user_data = sess; } void session_set_id(struct session *sess, uint64_t id) @@ -205,12 +208,12 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess) enum flow_type type = session_get_flow_type(sess); struct tcp_half *half = &sess->tcp_halfs[type]; - if (half->in_order.data != NULL && half->in_order.len > 0 && half->in_order_ref == 0) + if (half->inorder_seg.data != NULL && half->inorder_seg.len > 0 && !half->inorder_seg_consumed) { sess->sess_mgr_stat->tcp_segs_consumed++; - half->in_order_ref++; - half->in_order.user_data = sess; - return &half->in_order; + half->inorder_seg_consumed = 1; + half->inorder_seg.user_data = sess; + return &half->inorder_seg; } else { @@ -236,15 +239,23 @@ void session_free_tcp_segment(struct session *sess, struct tcp_segment *seg) return; } + // empty segment for end of session + if (seg == &sess->empty_seg) + { + return; + } + enum flow_type type = session_get_flow_type(sess); struct tcp_half *half = &sess->tcp_halfs[type]; - if (seg == &half->in_order) + // in order segment + if (seg == &half->inorder_seg) { - half->in_order.data = NULL; - half->in_order.len = 0; + half->inorder_seg.data = NULL; + half->inorder_seg.len = 0; return; } + // tcp reassembly segment else { session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RELEASED, 1); diff --git a/infra/version.map b/infra/version.map index 9f1ee3a..9941f31 100644 --- a/infra/version.map +++ b/infra/version.map @@ -24,10 +24,6 @@ global: packet_manager_claim_packet; packet_manager_schedule_packet; - exdata_*; - mq_*; - stellar_module_*; - session_is_symmetric; session_has_duplicate_traffic; session_get_type; @@ -44,6 +40,19 @@ global: session_set_discard; session_get_exdata; session_set_exdata; + session_manager_on_init; + session_manager_on_exit; + session_manager_on_thread_init; + session_manager_on_thread_exit; + session_manager_new_session_exdata_index; + session_manager_subscribe_tcp; + session_manager_subscribe_udp; + session_manager_subscribe_control_packet; + session_manager_subscribe_tcp_stream; + + exdata_*; + mq_*; + stellar_module_*; stellar_new; stellar_run; @@ -58,17 +67,6 @@ global: log_print; log_check_level; - session_manager_on_init; - session_manager_on_exit; - session_manager_on_thread_init; - session_manager_on_thread_exit; - session_manager_new_session_exdata_index; - session_manager_subscribe_free; - session_manager_subscribe_tcp; - session_manager_subscribe_udp; - session_manager_subscribe_control_packet; - session_manager_subscribe_tcp_stream; - http_message_*; http_decoder_init; http_decoder_exit; diff --git a/test/lpi_plus/gtest_lpip_module.c b/test/lpi_plus/gtest_lpip_module.c index abcbe61..49382a8 100644 --- a/test/lpi_plus/gtest_lpip_module.c +++ b/test/lpi_plus/gtest_lpip_module.c @@ -130,9 +130,14 @@ static void gtest_lpip_on_appid_msg(struct session *sess, enum APPID_ORIGIN orig return; } -static void on_session(struct session *sess, struct packet *pkt, void *args) +static void on_session(struct session *sess, enum session_state state, struct packet *pkt, void *args) { - if(sess==NULL || pkt==NULL || args==NULL)return; + if (state == SESSION_STATE_CLOSED) + { + assert(pkt == NULL); + return; + } + struct test_lpip_env *env = (struct test_lpip_env *)args; if (session_get_current_state(sess) == SESSION_STATE_OPENING) { diff --git a/test/session_debugger/session_debugger.c b/test/session_debugger/session_debugger.c index d75aede..710b288 100644 --- a/test/session_debugger/session_debugger.c +++ b/test/session_debugger/session_debugger.c @@ -168,7 +168,7 @@ static void session_debugger_exdata_free_callback(int idx, void *ex_ptr, void *a session_debugger_exdata_free((struct session_debugger_exdata *)ex_ptr); } -static void on_session_free(struct session *sess, void *arg) +static void on_session_closed(struct session *sess, void *arg) { struct session_debugger *dbg = (struct session_debugger *)arg; struct session_debugger_exdata *exdata = (struct session_debugger_exdata *)session_get_exdata(sess, dbg->sess_exdata_idx); @@ -201,9 +201,17 @@ static void on_session_free(struct session *sess, void *arg) session_debugger_log(exdata->dbg->fd, "session %lu %s statistics:\n%s", session_get_id(exdata->sess), session_get0_readable_addr(exdata->sess), buff); } -static void on_session_packet(struct session *sess, struct packet *pkt, void *arg) +static void on_session_message(struct session *sess, enum session_state state, struct packet *pkt, void *arg) { struct session_debugger *dbg = (struct session_debugger *)arg; + + if (state == SESSION_STATE_CLOSED) + { + on_session_closed(sess, dbg); + assert(pkt == NULL); + return; + } + int is_ctrl = packet_is_ctrl(pkt); char buff[PATH_MAX]; @@ -256,10 +264,17 @@ static void on_session_packet(struct session *sess, struct packet *pkt, void *ar pthread_spin_unlock(&dbg->lock); } -static void on_tcp_stream(struct session *sess, const char *tcp_payload, uint32_t tcp_payload_len, void *arg) +static void on_tcp_payload_message(struct session *sess, enum session_state state, const char *tcp_payload, uint32_t tcp_payload_len, void *arg) { struct session_debugger *dbg = (struct session_debugger *)arg; + if (state == SESSION_STATE_CLOSED) + { + assert(tcp_payload == NULL); + assert(tcp_payload_len == 0); + return; + } + char buff[PATH_MAX]; enum flow_type flow = session_get_flow_type(sess); assert(flow == FLOW_TYPE_C2S || flow == FLOW_TYPE_S2C); @@ -292,10 +307,16 @@ static void on_tcp_stream(struct session *sess, const char *tcp_payload, uint32_ pthread_spin_unlock(&dbg->lock); } -static void on_udp_payload(struct session *sess, struct packet *pkt, void *arg) +static void on_udp_payload_message(struct session *sess, enum session_state state, struct packet *pkt, void *arg) { struct session_debugger *dbg = (struct session_debugger *)arg; + if (state == SESSION_STATE_CLOSED) + { + assert(pkt == NULL); + return; + } + char buff[PATH_MAX]; enum flow_type flow = session_get_flow_type(sess); assert(flow == FLOW_TYPE_C2S || flow == FLOW_TYPE_S2C); @@ -373,32 +394,27 @@ static struct session_debugger *session_debugger_new(struct session_manager *ses goto error_out; } - if (session_manager_subscribe_free(sess_mgr, on_session_free, dbg) == -1) - { - session_debugger_log(STDERR_FILENO, "subscribe free failed\n"); - goto error_out; - } - if (session_manager_subscribe_tcp(sess_mgr, on_session_packet, dbg) == -1) + if (session_manager_subscribe_tcp(sess_mgr, on_session_message, dbg) == -1) { session_debugger_log(STDERR_FILENO, "subscribe tcp failed\n"); goto error_out; } - if (session_manager_subscribe_udp(sess_mgr, on_session_packet, dbg) == -1) + if (session_manager_subscribe_udp(sess_mgr, on_session_message, dbg) == -1) { session_debugger_log(STDERR_FILENO, "subscribe udp failed\n"); goto error_out; } - if (session_manager_subscribe_control_packet(sess_mgr, on_session_packet, dbg) == -1) + if (session_manager_subscribe_control_packet(sess_mgr, on_session_message, dbg) == -1) { session_debugger_log(STDERR_FILENO, "subscribe control packet failed\n"); goto error_out; } - if (session_manager_subscribe_tcp_stream(sess_mgr, on_tcp_stream, dbg) == -1) + if (session_manager_subscribe_tcp_stream(sess_mgr, on_tcp_payload_message, dbg) == -1) { session_debugger_log(STDERR_FILENO, "subscribe tcp stream failed\n"); goto error_out; } - if (session_manager_subscribe_udp(sess_mgr, on_udp_payload, dbg) == -1) + if (session_manager_subscribe_udp(sess_mgr, on_udp_payload_message, dbg) == -1) { session_debugger_log(STDERR_FILENO, "subscribe udp failed\n"); goto error_out;