diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h index 66a09f1..2cce7db 100644 --- a/include/stellar/stellar.h +++ b/include/stellar/stellar.h @@ -57,7 +57,7 @@ struct packet; typedef void plugin_on_packet_func(struct packet *pkt, unsigned char ip_protocol, void *plugin_env); //return packet plugin_id -int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet, void *plugin_env); +int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env); //return polling work result, 0: no work, 1: work diff --git a/infra/plugin_manager/plugin_manager.c b/infra/plugin_manager/plugin_manager.c index 5c41e12..775dd8f 100644 --- a/infra/plugin_manager/plugin_manager.c +++ b/infra/plugin_manager/plugin_manager.c @@ -884,7 +884,7 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) *********************************************/ UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL}; -int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_cb, void *plugin_env) +int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env) { struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); if(plug_mgr->registered_packet_plugin_array == NULL) @@ -894,13 +894,14 @@ int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, p struct registered_packet_plugin_schema packet_plugin_schema; memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema)); packet_plugin_schema.ip_protocol = ip_proto; - packet_plugin_schema.on_packet = on_packet_cb; + packet_plugin_schema.on_packet[PACKET_STAGE_INPUT] = on_packet_input; + packet_plugin_schema.on_packet[PACKET_STAGE_OUTPUT] = on_packet_output; packet_plugin_schema.plugin_env = plugin_env; utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema); return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index + PACKET_PULGIN_ID_BASE } -void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt) +static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt, enum packet_stage in_out) { if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; struct registered_packet_plugin_schema *p=NULL; @@ -915,20 +916,25 @@ void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, stru plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=0;//reset pub_msg_cnt while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p))) { - if(p->ip_protocol == ip_proto && p->on_packet) + if(p->ip_protocol == ip_proto && p->on_packet[in_out]) { - p->on_packet(pkt, ip_proto, p->plugin_env); + p->on_packet[in_out](pkt, ip_proto, p->plugin_env); } } stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt); return; } +void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt) +{ + plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_INPUT); +} + void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt) { if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; + plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT); int tid=stellar_get_current_thread_index(); - stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt); plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=-1;//disable packet message publish stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr->stellar_mq_schema_array); diff --git a/infra/plugin_manager/plugin_manager_interna.h b/infra/plugin_manager/plugin_manager_interna.h index 44bc553..91233cf 100644 --- a/infra/plugin_manager/plugin_manager_interna.h +++ b/infra/plugin_manager/plugin_manager_interna.h @@ -141,10 +141,17 @@ struct plugin_manager_runtime int pub_session_msg_cnt; }__attribute__((aligned(sizeof(void*)))); +enum packet_stage +{ + PACKET_STAGE_INPUT=0, + PACKET_STAGE_OUTPUT, + PACKET_STAGE_MAX +}; + struct registered_packet_plugin_schema { char ip_protocol; - plugin_on_packet_func *on_packet; + plugin_on_packet_func *on_packet[PACKET_STAGE_MAX]; void *plugin_env; UT_array *registed_packet_mq_subscriber_info; }__attribute__((aligned(sizeof(void*)))); diff --git a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp b/infra/plugin_manager/test/plugin_manager_gtest_main.cpp index 6135f95..00f89c8 100644 --- a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp +++ b/infra/plugin_manager/test/plugin_manager_gtest_main.cpp @@ -202,7 +202,7 @@ TEST(plugin_manager_init, packet_mq_subscribe) { EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal plugin_id EXPECT_EQ(stellar_packet_mq_subscribe(&st, 10, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal topic_id & plugin_id - int plugin_id=stellar_packet_plugin_register(&st, 6, NULL, &st); + int plugin_id=stellar_packet_plugin_register(&st, 6, NULL, NULL,&st); EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE); EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, plugin_id),0); @@ -271,7 +271,7 @@ TEST(plugin_manager, packet_plugin_illegal_exdata) { struct packet_plugin_env env; memset(&env, 0, sizeof(struct packet_plugin_env)); env.plug_mgr=plug_mgr; - int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_basic_on_packet, &env); + int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_basic_on_packet, NULL,&env); EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE); { @@ -314,7 +314,7 @@ TEST(plugin_manager, packet_plugins_with_proto_filter) { int proto_filter_plugin_num=(int)(sizeof(env.proto_filter_plugin_id) / sizeof(env.proto_filter_plugin_id[0])); for (int i = 0; i < proto_filter_plugin_num; i++) { - env.proto_filter_plugin_id[i] = stellar_packet_plugin_register(&st, i, test_proto_filter_on_packet, &env); + env.proto_filter_plugin_id[i] = stellar_packet_plugin_register(&st, i, test_proto_filter_on_packet, NULL,&env); EXPECT_GE(env.proto_filter_plugin_id[i], PACKET_PULGIN_ID_BASE); @@ -442,10 +442,10 @@ TEST(plugin_manager, packet_plugins_share_exdata) { EXPECT_EQ(utarray_len(plug_mgr->stellar_exdata_schema_array), exdata_idx_len); } - int exdata_set_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_set_on_packet, &env); + int exdata_set_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_set_on_packet, NULL,&env); EXPECT_GE(exdata_set_plugin_id, PACKET_PULGIN_ID_BASE); - int exdata_get_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_get_on_packet, &env); + int exdata_get_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_get_on_packet, NULL,&env); EXPECT_GE(exdata_get_plugin_id, PACKET_PULGIN_ID_BASE); { @@ -539,14 +539,14 @@ TEST(plugin_manager, packet_plugins_mq_pub_sub) { EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); } - int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_mq_pub_on_packet, &env); + int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_mq_pub_on_packet, NULL,&env); EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE); int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); for (int i = 0; i < topic_sub_num; i++) { - env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, &env);// empty on_packet is ok + env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, NULL,&env);// empty on_packet is ok EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE); for(int j = 0; j < topic_id_num; j++) { @@ -658,14 +658,14 @@ TEST(plugin_manager, packet_plugins_pub_overlimit) { EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); } - int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, overlimit_pub_on_packet, &env); + int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, overlimit_pub_on_packet, NULL,&env); EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE); int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); for (int i = 0; i < topic_sub_num; i++) { - env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, &env);// empty on_packet is ok + env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, NULL, &env);// empty on_packet is ok EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE); for(int j = 0; j < topic_id_num; j++) { @@ -741,7 +741,7 @@ TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) { struct packet_plugin_env env; memset(&env, 0, sizeof(struct packet_plugin_env)); env.plug_mgr=plug_mgr; - int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_free_pub_msg_on_packet, &env); + int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_free_pub_msg_on_packet, NULL,&env); EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE); env.packet_exdata_idx[0]=stellar_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env); diff --git a/infra/version.map b/infra/version.map index 0488997..3723eb8 100644 --- a/infra/version.map +++ b/infra/version.map @@ -51,6 +51,7 @@ global: session_set_discard; stellar_session_plugin_register; + stellar_session_plugin_register_with_hook; stellar_session_plugin_dettach_current_session; stellar_packet_plugin_register; stellar_polling_plugin_register;