diff --git a/decoders/glimpse_detector/app_l7_protocol.cpp b/decoders/glimpse_detector/app_l7_protocol.cpp index 4a303bc..12750af 100644 --- a/decoders/glimpse_detector/app_l7_protocol.cpp +++ b/decoders/glimpse_detector/app_l7_protocol.cpp @@ -810,8 +810,8 @@ extern "C" void * APP_GLIMPSE_DETECTOR_LOAD(struct stellar *st) { goto INIT_ERROR; } - glimpse_detector_env->tcp_topic_id=stellar_mq_get_topic_id(st, TOPIC_TCP); - glimpse_detector_env->udp_topic_id=stellar_mq_get_topic_id(st, TOPIC_UDP); + glimpse_detector_env->tcp_topic_id=stellar_mq_get_topic_id(st, TOPIC_TCP_INPUT); + glimpse_detector_env->udp_topic_id=stellar_mq_get_topic_id(st, TOPIC_UDP_INPUT); if(glimpse_detector_env->tcp_topic_id < 0 || glimpse_detector_env->udp_topic_id < 0) { perror("get tcp or udp topic id failed\n"); diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h index 99fc74c..1e2e899 100644 --- a/include/stellar/stellar.h +++ b/include/stellar/stellar.h @@ -18,12 +18,14 @@ typedef void plugin_on_unload_func(void *plugin_env); typedef void *session_ctx_new_func(struct session *sess, void *plugin_env); typedef void session_ctx_free_func(struct session *sess, void *session_ctx, void *plugin_env); -#define TOPIC_TCP "TCP" #define TOPIC_TCP_STREAM "TCP_STREAM" -#define TOPIC_UDP "UDP" -#define TOPIC_EGRESS "EGRESS" #define TOPIC_CONTROL_PACKET "CONTROL_PACKET" +#define TOPIC_TCP_INPUT "TCP_INPUT" +#define TOPIC_TCP_OUTPUT "TCP_OUTPUT" +#define TOPIC_UDP_INPUT "UDP_INPUT" +#define TOPIC_UDP_OUTPUT "UDP_OUTPUT" + //return session plugin_id int stellar_session_plugin_register(struct stellar *st, session_ctx_new_func session_ctx_new, diff --git a/infra/core/stellar_core.c b/infra/core/stellar_core.c index a2558e8..c6c9400 100644 --- a/infra/core/stellar_core.c +++ b/infra/core/stellar_core.c @@ -249,7 +249,7 @@ static void *work_thread(void *arg) defraged_pkt = NULL; pkt = &packets[i]; - plugin_manager_on_packet_ingress(plug_mgr, pkt); + plugin_manager_on_packet_input(plug_mgr, pkt); if (packet_is_fragment(pkt)) { defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now_ms); @@ -260,7 +260,7 @@ static void *work_thread(void *arg) else { pkt = defraged_pkt; - plugin_manager_on_packet_ingress(plug_mgr, defraged_pkt); + plugin_manager_on_packet_input(plug_mgr, defraged_pkt); } } @@ -286,18 +286,18 @@ static void *work_thread(void *arg) { packet_set_session_id(pkt, session_get_id(sess)); } - plugin_manager_on_session_ingress(sess, pkt); + plugin_manager_on_session_input(sess, pkt); fast_path: - plugin_manager_on_session_egress(sess, pkt); + plugin_manager_on_session_output(sess, pkt); if (pkt == defraged_pkt) { - plugin_manager_on_packet_egress(plug_mgr, defraged_pkt); - plugin_manager_on_packet_egress(plug_mgr, &packets[i]); + plugin_manager_on_packet_output(plug_mgr, defraged_pkt); + plugin_manager_on_packet_output(plug_mgr, &packets[i]); } else { - plugin_manager_on_packet_egress(plug_mgr, pkt); + plugin_manager_on_packet_output(plug_mgr, pkt); } if (sess && session_get_current_state(sess) == SESSION_STATE_DISCARD) diff --git a/infra/plugin_manager/plugin_manager.c b/infra/plugin_manager/plugin_manager.c index 1417fd9..37f6fe9 100644 --- a/infra/plugin_manager/plugin_manager.c +++ b/infra/plugin_manager/plugin_manager.c @@ -13,17 +13,14 @@ UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL}; -// TODO: set scratch_sess to per_thread_data -__thread struct session *per_thread_scratch_sess; - -inline static void plugin_manager_scratch_session_set(struct session *sess) +inline static void plugin_manager_scratch_session_set(struct plugin_manager_schema *plug_mgr, int tid, struct session *sess) { - per_thread_scratch_sess = sess; + plug_mgr->per_thread_data[tid].thread_scratch_session = sess; } -inline static struct session *plugin_manager_scratch_session_get() +inline static struct session *plugin_manager_scratch_session_get(struct plugin_manager_schema *plug_mgr, int tid) { - return per_thread_scratch_sess; + return plug_mgr->per_thread_data[tid].thread_scratch_session; } @@ -85,19 +82,19 @@ PLUGIN_SPEC_LOAD_ERROR: return NULL; } -static struct plugin_manger_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st) +static struct plugin_manager_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st) { if(st == NULL)return NULL; int thread_num=stellar_get_worker_thread_num(st); - struct plugin_manger_per_thread_data *per_thread_data = CALLOC(struct plugin_manger_per_thread_data, thread_num); + struct plugin_manager_per_thread_data *per_thread_data = CALLOC(struct plugin_manager_per_thread_data, thread_num); return per_thread_data; } -static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_data *per_thread_data, struct stellar *st) +static void plugin_manager_per_thread_data_free(struct plugin_manager_per_thread_data *per_thread_data, struct stellar *st) { if(per_thread_data == NULL || st == NULL)return; int thread_num=stellar_get_worker_thread_num(st); - struct plugin_manger_per_thread_data *p_data; + struct plugin_manager_per_thread_data *p_data; for (int i = 0; i < thread_num; i++) { p_data=per_thread_data+i; @@ -109,7 +106,8 @@ static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_ static void tcp_stream_msg_free_fn(void *msg, void *msg_free_arg __attribute__((unused))) { - struct session *cur_sess = plugin_manager_scratch_session_get(); + struct plugin_manager_schema *plug_mgr=(struct plugin_manager_schema *)msg_free_arg; + struct session *cur_sess = plugin_manager_scratch_session_get(plug_mgr, stellar_get_current_thread_index()); if(msg && cur_sess)session_free_tcp_segment(cur_sess, (struct tcp_segment *)msg); } @@ -122,6 +120,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char return NULL; } struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1); + //TODO: set max_message_dispatch as parameter plug_mgr->max_message_dispatch=MAX_MSG_PER_DISPATCH; if(spec_num > 0) { @@ -133,10 +132,11 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char stellar_set_plugin_manger(st, plug_mgr); - plug_mgr->tcp_topic_id=stellar_mq_create_topic(st, TOPIC_TCP, NULL, NULL); - plug_mgr->tcp_stream_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_STREAM, tcp_stream_msg_free_fn, NULL); - plug_mgr->udp_topic_id=stellar_mq_create_topic(st, TOPIC_UDP, NULL, NULL); - plug_mgr->egress_topic_id=stellar_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL); + plug_mgr->tcp_input_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_INPUT, NULL, NULL); + plug_mgr->tcp_output_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_OUTPUT, NULL, NULL); + plug_mgr->tcp_stream_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_STREAM, tcp_stream_msg_free_fn, plug_mgr); + plug_mgr->udp_input_topic_id=stellar_mq_create_topic(st, TOPIC_UDP_INPUT, NULL, NULL); + plug_mgr->udp_output_topic_id=stellar_mq_create_topic(st, TOPIC_UDP_OUTPUT, NULL, NULL); plug_mgr->control_packet_topic_id=stellar_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL); for(int i = 0; i < spec_num; i++) @@ -895,7 +895,7 @@ int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, p return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index + PACKET_PULGIN_ID_BASE } -void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, struct packet *pkt) +void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt) { if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; struct registered_packet_plugin_schema *p=NULL; @@ -919,7 +919,7 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st return; } -void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt) +void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt) { if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; int tid=stellar_get_current_thread_index(); @@ -992,40 +992,15 @@ int stellar_session_plugin_register(struct stellar *st, return (utarray_len(plug_mgr->registered_session_plugin_array)-1);// return session plugin_id, equals to session plugin arrary index } -void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt) +void plugin_manager_on_session_input(struct session *sess, struct packet *pkt) { if(sess==NULL)return; struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); if(plug_mgr_rt==NULL)return; -#if 0 - int topic_id = -1; - //FIXME: get topic and tcp data by stellar api - switch (packet_get_type(pkt)) - { - case TCP: - topic_id=plug_mgr_rt->plug_mgr->tcp_topic_id; - break; - case TCP_STREAM: - topic_id=plug_mgr_rt->plug_mgr->tcp_stream_topic_id; - break; - case UDP: - topic_id=plug_mgr_rt->plug_mgr->udp_topic_id; - break; - case CONTROL: - topic_id=plug_mgr_rt->plug_mgr->control_packet_topic_id; - break; - default: - break; - } plug_mgr_rt->pub_session_msg_cnt=0; - session_mq_publish_message_with_priority(sess, topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH); int tid=stellar_get_current_thread_index(); - stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt); -#endif - - plug_mgr_rt->pub_session_msg_cnt=0; - plugin_manager_scratch_session_set(sess); + plugin_manager_scratch_session_set(plug_mgr_rt->plug_mgr, tid, sess); struct tcp_segment *seg; enum session_type type = session_get_type(sess); @@ -1038,14 +1013,14 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt) switch (type) { case SESSION_TYPE_TCP: - session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH); + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_input_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH); while ((seg = session_get_tcp_segment(sess)) != NULL) { session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id, (void *)seg, STELLAR_MQ_PRIORITY_HIGH); } break; case SESSION_TYPE_UDP: - session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH); + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_input_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH); break; default: assert(0); @@ -1053,27 +1028,36 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt) } } //TODO: check TCP topic active subscirber num, if 0, return disable assembler state, to reduce tcp reassemble overhead - int tid=stellar_get_current_thread_index(); stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt); - plugin_manager_scratch_session_set(NULL); - - + plugin_manager_scratch_session_set(plug_mgr_rt->plug_mgr, tid, NULL); return; } -void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt) +void plugin_manager_on_session_output(struct session *sess, struct packet *pkt) { if(sess==NULL)return; struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); if(plug_mgr_rt==NULL)return; - plugin_manager_scratch_session_set(sess); - session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt, STELLAR_MQ_PRIORITY_HIGH); + if(unlikely(packet_is_ctrl(pkt)))return; int tid=stellar_get_current_thread_index(); + plugin_manager_scratch_session_set(plug_mgr_rt->plug_mgr, tid, sess); + switch (session_get_type(sess)) + { + case SESSION_TYPE_TCP: + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_output_topic_id ,pkt, STELLAR_MQ_PRIORITY_HIGH); + break; + case SESSION_TYPE_UDP: + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_output_topic_id ,pkt, STELLAR_MQ_PRIORITY_HIGH); + break; + default: + assert(0); + break; + } stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt); plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish stellar_mq_free(&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array); - plugin_manager_scratch_session_set(NULL); + plugin_manager_scratch_session_set(plug_mgr_rt->plug_mgr, tid, NULL); return; } @@ -1086,11 +1070,11 @@ void plugin_manager_on_session_closing(struct session *sess) switch (session_get_type(sess)) { case SESSION_TYPE_TCP: - session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH); + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_input_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH); session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id , NULL, STELLAR_MQ_PRIORITY_HIGH); break; case SESSION_TYPE_UDP: - session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH); + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_input_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH); break; default: break; diff --git a/infra/plugin_manager/plugin_manager.h b/infra/plugin_manager/plugin_manager.h index 1de5878..1c16ba3 100644 --- a/infra/plugin_manager/plugin_manager.h +++ b/infra/plugin_manager/plugin_manager.h @@ -13,14 +13,14 @@ struct plugin_manager_runtime; struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path); void plugin_manager_exit(struct plugin_manager_schema *plug_mgr); -void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, struct packet *pkt); -void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt); +void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt); +void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt); //return polling work state, 0: idle, 1: working int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr); //publish and dispatch session msg(msg, pkt) on session_mq -void plugin_manager_on_session_ingress(struct session *sess,struct packet *pkt); -void plugin_manager_on_session_egress(struct session *sess,struct packet *pkt); +void plugin_manager_on_session_input(struct session *sess,struct packet *pkt); +void plugin_manager_on_session_output(struct session *sess,struct packet *pkt); void plugin_manager_on_session_closing(struct session *sess); struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess); diff --git a/infra/plugin_manager/plugin_manager_interna.h b/infra/plugin_manager/plugin_manager_interna.h index 07e462d..eb075bf 100644 --- a/infra/plugin_manager/plugin_manager_interna.h +++ b/infra/plugin_manager/plugin_manager_interna.h @@ -14,23 +14,24 @@ extern "C" #include "bitmap/bitmap.h" #include "uthash/utarray.h" + +struct stellar_message; + struct per_thread_exdata_array { struct stellar_exdata *exdata_array; }; -struct stellar_message; - -struct plugin_manger_per_thread_data +struct plugin_manager_per_thread_data { struct per_thread_exdata_array per_thread_pkt_exdata_array; struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list struct stellar_message *dealth_letter_queue;// dlq list long long pub_packet_msg_cnt; + struct session *thread_scratch_session; }; - struct plugin_manager_schema { struct stellar *st; @@ -43,13 +44,14 @@ struct plugin_manager_schema int stellar_mq_topic_num; int packet_topic_subscriber_num; int session_topic_subscriber_num; - int tcp_topic_id; + int tcp_input_topic_id; + int tcp_output_topic_id; int tcp_stream_topic_id; - int udp_topic_id; - int egress_topic_id; + int udp_input_topic_id; + int udp_output_topic_id; int control_packet_topic_id; int max_message_dispatch; - struct plugin_manger_per_thread_data *per_thread_data; + struct plugin_manager_per_thread_data *per_thread_data; }__attribute__((aligned(sizeof(void*)))); enum plugin_exdata_state diff --git a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp b/infra/plugin_manager/test/plugin_manager_gtest_main.cpp index 44cb3d4..5c35743 100644 --- a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp +++ b/infra/plugin_manager/test/plugin_manager_gtest_main.cpp @@ -5,7 +5,7 @@ #include "plugin_manager_gtest_mock.h" -#define STELLAR_INTRINSIC_TOPIC_NUM 5 +#define STELLAR_INTRINSIC_TOPIC_NUM 6 #define TOPIC_NAME_MAX 512 void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct plugin_manager_schema *plug_mgr) @@ -33,26 +33,30 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p int intrinsic_topic_num=utarray_len(plug_mgr->stellar_mq_schema_array); EXPECT_EQ(plug_mgr->stellar_mq_topic_num, intrinsic_topic_num);//TCP,UDP,TCP_STREAM,EGRESS,CONTROL - struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->tcp_topic_id); - EXPECT_STREQ(topic->topic_name, TOPIC_TCP); + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->tcp_input_topic_id); + EXPECT_STREQ(topic->topic_name, TOPIC_TCP_INPUT); + + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->tcp_output_topic_id); + EXPECT_STREQ(topic->topic_name, TOPIC_TCP_OUTPUT); topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->tcp_stream_topic_id); EXPECT_STREQ(topic->topic_name, TOPIC_TCP_STREAM); - topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->udp_topic_id); - EXPECT_STREQ(topic->topic_name, TOPIC_UDP); + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->udp_input_topic_id); + EXPECT_STREQ(topic->topic_name, TOPIC_UDP_INPUT); - topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->egress_topic_id); - EXPECT_STREQ(topic->topic_name, TOPIC_EGRESS); + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->udp_output_topic_id); + EXPECT_STREQ(topic->topic_name, TOPIC_UDP_OUTPUT); topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->control_packet_topic_id); EXPECT_STREQ(topic->topic_name, TOPIC_CONTROL_PACKET); //intrinsic topic - EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_TCP), 0); + EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_TCP_INPUT), 0); + EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_UDP_INPUT), 0); + EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_TCP_OUTPUT), 0); + EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_UDP_OUTPUT), 0); EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_TCP_STREAM), 0); - EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_UDP), 0); - EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_EGRESS), 0); EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_CONTROL_PACKET), 0); EXPECT_TRUE(plug_mgr->per_thread_data!=NULL); @@ -277,8 +281,8 @@ TEST(plugin_manager, packet_plugin_illegal_exdata) { } struct packet pkt={&st, IPv4, ip_proto}; - plugin_manager_on_packet_ingress(plug_mgr, &pkt); - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); plugin_manager_exit(plug_mgr); @@ -329,8 +333,8 @@ TEST(plugin_manager, packet_plugins_with_proto_filter) { for (int i = 0; i < proto_filter_plugin_num; i++) { pkt.ip_proto = i; - plugin_manager_on_packet_ingress(plug_mgr, &pkt); - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } } plugin_manager_exit(plug_mgr); @@ -455,8 +459,8 @@ TEST(plugin_manager, packet_plugins_share_exdata) { for(int i=0; i < N_packet; i++) { - plugin_manager_on_packet_ingress(plug_mgr, &pkt); - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } plugin_manager_exit(plug_mgr); @@ -560,8 +564,8 @@ TEST(plugin_manager, packet_plugins_mq_pub_sub) { int N_packet=10; for (int i = 0; i < N_packet; i++) { - plugin_manager_on_packet_ingress(plug_mgr, &pkt); - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } plugin_manager_exit(plug_mgr); @@ -679,8 +683,8 @@ TEST(plugin_manager, packet_plugins_pub_overlimit) { int N_packet=10; for (int i = 0; i < N_packet; i++) { - plugin_manager_on_packet_ingress(plug_mgr, &pkt); - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } plugin_manager_exit(plug_mgr); @@ -747,8 +751,8 @@ TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) { struct packet pkt={&st, IPv4, ip_proto}; - plugin_manager_on_packet_ingress(plug_mgr, &pkt); - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); plugin_manager_exit(plug_mgr); @@ -959,16 +963,16 @@ TEST(plugin_manager, no_plugin_register_runtime) { for (int j = 0; j < env.N_per_session_pkt_cnt; j++) { - plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); for (int i = 0; i < env.N_session; i++) { sess[i].sess_pkt_cnt+=1; - plugin_manager_on_session_ingress(&sess[i], &pkt); - plugin_manager_on_session_egress(&sess[i], &pkt); + plugin_manager_on_session_input(&sess[i], &pkt); + plugin_manager_on_session_output(&sess[i], &pkt); } - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } @@ -1065,11 +1069,11 @@ TEST(plugin_manager, session_plugin_on_intrinsic_ingress_egress) { int plugin_id=stellar_session_plugin_register(&st, test_basic_session_ctx_new, test_basic_session_ctx_free, &env); EXPECT_GE(plugin_id, 0); - env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP_INPUT); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_basic_on_session_ingress, plugin_id), 0); - env.intrinsc_egress_topic_id=stellar_mq_get_topic_id(&st, TOPIC_EGRESS); + env.intrinsc_egress_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP_OUTPUT); EXPECT_GE(env.intrinsc_egress_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_egress_topic_id, test_basic_on_session_ingress, plugin_id), 0);// Intentional error EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_egress_topic_id, test_basic_on_session_egress, plugin_id), 0); @@ -1090,15 +1094,15 @@ TEST(plugin_manager, session_plugin_on_intrinsic_ingress_egress) { for (int j = 0; j < env.N_per_session_pkt_cnt; j++) { - plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); for (int i = 0; i < env.N_session; i++) { - plugin_manager_on_session_ingress(&sess[i], &pkt); - plugin_manager_on_session_egress(&sess[i], &pkt); + plugin_manager_on_session_input(&sess[i], &pkt); + plugin_manager_on_session_output(&sess[i], &pkt); } - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } for(int i=0; i < env.N_session; i++) @@ -1211,7 +1215,7 @@ TEST(plugin_manager, session_plugin_ignore_on_ctx_new_sub_other_msg) { env.test_mq_pub_plugin_id=stellar_session_plugin_register(&st, test_mq_pub_session_ctx_new, test_mq_pub_session_ctx_free, &env); EXPECT_GE(env.test_mq_pub_plugin_id, 0); - env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP_INPUT); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_mq_pub_on_session, env.test_mq_pub_plugin_id), 0); @@ -1235,15 +1239,15 @@ TEST(plugin_manager, session_plugin_ignore_on_ctx_new_sub_other_msg) { for (int j = 0; j < env.N_per_session_pkt_cnt; j++) { - plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); for (int i = 0; i < env.N_session; i++) { - plugin_manager_on_session_ingress(&sess[i], &pkt); - plugin_manager_on_session_egress(&sess[i], &pkt); + plugin_manager_on_session_input(&sess[i], &pkt); + plugin_manager_on_session_output(&sess[i], &pkt); } - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } @@ -1379,7 +1383,7 @@ TEST(plugin_manager, session_plugin_pub_msg_overlimt) { env.test_mq_pub_plugin_id=stellar_session_plugin_register(&st, test_overlimit_pub_session_ctx_new, test_overlimit_pub_session_ctx_free, &env); EXPECT_GE(env.test_mq_pub_plugin_id, 0); - env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP_INPUT); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_overlimit_pub_on_session, env.test_mq_pub_plugin_id), 0); @@ -1403,15 +1407,15 @@ TEST(plugin_manager, session_plugin_pub_msg_overlimt) { for (int j = 0; j < env.N_per_session_pkt_cnt; j++) { - plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); for (int i = 0; i < env.N_session; i++) { - plugin_manager_on_session_ingress(&sess[i], &pkt); - plugin_manager_on_session_egress(&sess[i], &pkt); + plugin_manager_on_session_input(&sess[i], &pkt); + plugin_manager_on_session_output(&sess[i], &pkt); } - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } for(int i=0; i < env.N_session; i++) @@ -1491,7 +1495,7 @@ TEST(plugin_manager, session_plugin_on_ctx_new_then_dettach) { int plugin_id=stellar_session_plugin_register(&st, test_dettach_session_ctx_new, test_dettach_session_ctx_free, &env); EXPECT_GE(plugin_id,0); - env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP_INPUT); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_dettach_on_session, plugin_id), 0); @@ -1514,16 +1518,16 @@ TEST(plugin_manager, session_plugin_on_ctx_new_then_dettach) { for (int j = 0; j < env.N_per_session_pkt_cnt; j++) { - plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); for (int i = 0; i < env.N_session; i++) { sess[i].sess_pkt_cnt+=1; - plugin_manager_on_session_ingress(&sess[i], &pkt); - plugin_manager_on_session_egress(&sess[i], &pkt); + plugin_manager_on_session_input(&sess[i], &pkt); + plugin_manager_on_session_output(&sess[i], &pkt); } - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } for(int i=0; i < env.N_session; i++) @@ -1578,7 +1582,7 @@ TEST(plugin_manager, session_plugin_pub_on_ctx_free) { int plugin_id=stellar_session_plugin_register(&st, test_invalid_pub_msg_session_ctx_new, test_invalid_pub_msg_session_ctx_free, &env); EXPECT_GE(plugin_id,0); - env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP_INPUT); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_invalid_pub_msg_on_session, plugin_id), 0); @@ -1606,16 +1610,16 @@ TEST(plugin_manager, session_plugin_pub_on_ctx_free) { for (int j = 0; j < env.N_per_session_pkt_cnt; j++) { - plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); for (int i = 0; i < env.N_session; i++) { sess[i].sess_pkt_cnt+=1; - plugin_manager_on_session_ingress(&sess[i], &pkt); - plugin_manager_on_session_egress(&sess[i], &pkt); + plugin_manager_on_session_input(&sess[i], &pkt); + plugin_manager_on_session_output(&sess[i], &pkt); } - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } for(int i=0; i < env.N_session; i++) @@ -1696,7 +1700,7 @@ TEST(plugin_manager, session_plugin_pub_msg_on_closing) { int plugin_id=stellar_session_plugin_register(&st, test_session_closing_ctx_new, test_session_closing_ctx_free, &env); EXPECT_GE(plugin_id,0); - env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP_INPUT); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_closing_on_intrisic_msg, plugin_id), 0); @@ -1725,17 +1729,17 @@ TEST(plugin_manager, session_plugin_pub_msg_on_closing) { for (int j = 0; j < env.N_per_session_pkt_cnt; j++) { - plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); for (int i = 0; i < env.N_session; i++) { sess[i].sess_pkt_cnt+=1; sess[i].state=SESSION_STATE_ACTIVE; - plugin_manager_on_session_ingress(&sess[i], &pkt); - plugin_manager_on_session_egress(&sess[i], &pkt); + plugin_manager_on_session_input(&sess[i], &pkt); + plugin_manager_on_session_output(&sess[i], &pkt); } - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } for(int i=0; i < env.N_session; i++) @@ -1825,7 +1829,7 @@ TEST(plugin_manager, test_session_mq_topic_is_active) { env.plugin_id_1=plugin_id_1; env.plugin_id_2=plugin_id_2; - env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP_INPUT); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_1_on_msg, plugin_id_1), 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_2_on_msg, plugin_id_2), 0); @@ -1851,17 +1855,17 @@ TEST(plugin_manager, test_session_mq_topic_is_active) { for (int j = 0; j < env.N_per_session_pkt_cnt; j++) { - plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); for (int i = 0; i < env.N_session; i++) { sess[i].sess_pkt_cnt+=1; sess[i].state=SESSION_STATE_ACTIVE; - plugin_manager_on_session_ingress(&sess[i], &pkt); - plugin_manager_on_session_egress(&sess[i], &pkt); + plugin_manager_on_session_input(&sess[i], &pkt); + plugin_manager_on_session_output(&sess[i], &pkt); } - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } for(int i=0; i < env.N_session; i++) @@ -1933,7 +1937,7 @@ TEST(plugin_manager, test_session_dettach) { env.plugin_id_1=plugin_id_1; env.plugin_id_2=plugin_id_2; - env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP_INPUT); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_1_on_msg, plugin_id_1), 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_2_on_msg, plugin_id_2), 0); @@ -1959,17 +1963,17 @@ TEST(plugin_manager, test_session_dettach) { for (int j = 0; j < env.N_per_session_pkt_cnt; j++) { - plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); for (int i = 0; i < env.N_session; i++) { sess[i].sess_pkt_cnt+=1; sess[i].state=SESSION_STATE_ACTIVE; - plugin_manager_on_session_ingress(&sess[i], &pkt); - plugin_manager_on_session_egress(&sess[i], &pkt); + plugin_manager_on_session_input(&sess[i], &pkt); + plugin_manager_on_session_output(&sess[i], &pkt); } - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } for(int i=0; i < env.N_session; i++) @@ -2066,7 +2070,7 @@ TEST(plugin_manager, test_session_mq_priority) { env.plugin_id_1=plugin_id_1; env.plugin_id_2=plugin_id_2; - env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP_INPUT); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0); @@ -2097,17 +2101,17 @@ TEST(plugin_manager, test_session_mq_priority) { for (int j = 0; j < env.N_per_session_pkt_cnt; j++) { - plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); for (int i = 0; i < env.N_session; i++) { sess[i].sess_pkt_cnt+=1; sess[i].state=SESSION_STATE_ACTIVE; - plugin_manager_on_session_ingress(&sess[i], &pkt); - plugin_manager_on_session_egress(&sess[i], &pkt); + plugin_manager_on_session_input(&sess[i], &pkt); + plugin_manager_on_session_output(&sess[i], &pkt); } - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } for(int i=0; i < env.N_session; i++) @@ -2154,7 +2158,7 @@ TEST(plugin_manager, session_exdata_free_pub_msg) { env.plugin_id_1=stellar_session_plugin_register(&st, NULL, NULL, &env); EXPECT_GE(env.plugin_id_1,0); - env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP_INPUT); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_exdata_free_pub_msg_on_session, env.plugin_id_1), 0); @@ -2182,16 +2186,16 @@ TEST(plugin_manager, session_exdata_free_pub_msg) { for (int j = 0; j < env.N_per_session_pkt_cnt; j++) { - plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_input(plug_mgr, &pkt); for (int i = 0; i < env.N_session; i++) { sess[i].sess_pkt_cnt+=1; - plugin_manager_on_session_ingress(&sess[i], &pkt); - plugin_manager_on_session_egress(&sess[i], &pkt); + plugin_manager_on_session_input(&sess[i], &pkt); + plugin_manager_on_session_output(&sess[i], &pkt); } - plugin_manager_on_packet_egress(plug_mgr, &pkt); + plugin_manager_on_packet_output(plug_mgr, &pkt); } for(int i=0; i < env.N_session; i++) diff --git a/test/debug_plugin/debug_plugin.c b/test/debug_plugin/debug_plugin.c index 5456b57..951eea6 100644 --- a/test/debug_plugin/debug_plugin.c +++ b/test/debug_plugin/debug_plugin.c @@ -271,8 +271,8 @@ void *debug_plugin_init(struct stellar *st) ctx->st = st; ctx->sess_exdata_idx = stellar_exdata_new_index(st, "DEBUG_PLUGIN_SESS_EXDATA", stellar_exdata_free_default, NULL); ctx->sess_plug_id = stellar_session_plugin_register(st, on_sess_new, on_sess_free, ctx); - ctx->udp_topic_id = stellar_mq_get_topic_id(st, TOPIC_UDP); - ctx->tcp_topic_id = stellar_mq_get_topic_id(st, TOPIC_TCP); + ctx->udp_topic_id = stellar_mq_get_topic_id(st, TOPIC_UDP_INPUT); + ctx->tcp_topic_id = stellar_mq_get_topic_id(st, TOPIC_TCP_INPUT); ctx->tcp_stream_topic_id = stellar_mq_get_topic_id(st, TOPIC_TCP_STREAM); stellar_session_mq_subscribe(st, ctx->udp_topic_id, on_sess_udp_msg, ctx->sess_plug_id); diff --git a/test/glimpse_detector/gtest_glimpse_detector_plugin.cpp b/test/glimpse_detector/gtest_glimpse_detector_plugin.cpp index e28c14e..4677630 100644 --- a/test/glimpse_detector/gtest_glimpse_detector_plugin.cpp +++ b/test/glimpse_detector/gtest_glimpse_detector_plugin.cpp @@ -167,8 +167,8 @@ extern "C" void *GLIMPSE_DETECTOR_TEST_PLUG_LOAD(struct stellar *st) exit(-1); } - int tcp_topic_id=stellar_mq_get_topic_id(st, TOPIC_TCP); - int udp_topic_id=stellar_mq_get_topic_id(st, TOPIC_UDP); + int tcp_topic_id=stellar_mq_get_topic_id(st, TOPIC_TCP_INPUT); + int udp_topic_id=stellar_mq_get_topic_id(st, TOPIC_UDP_INPUT); if(tcp_topic_id < 0 || udp_topic_id < 0) { perror("GLIMPSE_DETECTOR_TEST get tcp or udp topic id failed\n"); diff --git a/test/packet_inject/packet_inject.c b/test/packet_inject/packet_inject.c index 8b20f7f..6a24311 100644 --- a/test/packet_inject/packet_inject.c +++ b/test/packet_inject/packet_inject.c @@ -557,8 +557,8 @@ void *packet_inject_init(struct stellar *st) print_config(ctx->logger, &ctx->config); ctx->sess_plug_id = stellar_session_plugin_register(st, on_sess_new, on_sess_free, ctx); - ctx->tcp_topic_id = stellar_mq_get_topic_id(st, TOPIC_TCP); - ctx->udp_topic_id = stellar_mq_get_topic_id(st, TOPIC_UDP); + ctx->tcp_topic_id = stellar_mq_get_topic_id(st, TOPIC_TCP_INPUT); + ctx->udp_topic_id = stellar_mq_get_topic_id(st, TOPIC_UDP_OUTPUT); stellar_session_mq_subscribe(st, ctx->tcp_topic_id, on_sess_msg, ctx->sess_plug_id); stellar_session_mq_subscribe(st, ctx->udp_topic_id, on_sess_msg, ctx->sess_plug_id);