✨ feat(plugin manager api): packet plugin register with stage
This commit is contained in:
@@ -57,7 +57,7 @@ struct packet;
|
|||||||
typedef void plugin_on_packet_func(struct packet *pkt, unsigned char ip_protocol, void *plugin_env);
|
typedef void plugin_on_packet_func(struct packet *pkt, unsigned char ip_protocol, void *plugin_env);
|
||||||
|
|
||||||
//return packet plugin_id
|
//return packet plugin_id
|
||||||
int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet, void *plugin_env);
|
int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env);
|
||||||
|
|
||||||
|
|
||||||
//return polling work result, 0: no work, 1: work
|
//return polling work result, 0: no work, 1: work
|
||||||
|
|||||||
@@ -884,7 +884,7 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
|
|||||||
*********************************************/
|
*********************************************/
|
||||||
UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL};
|
UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL};
|
||||||
|
|
||||||
int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_cb, void *plugin_env)
|
int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env)
|
||||||
{
|
{
|
||||||
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
|
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
|
||||||
if(plug_mgr->registered_packet_plugin_array == NULL)
|
if(plug_mgr->registered_packet_plugin_array == NULL)
|
||||||
@@ -894,13 +894,14 @@ int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, p
|
|||||||
struct registered_packet_plugin_schema packet_plugin_schema;
|
struct registered_packet_plugin_schema packet_plugin_schema;
|
||||||
memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema));
|
memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema));
|
||||||
packet_plugin_schema.ip_protocol = ip_proto;
|
packet_plugin_schema.ip_protocol = ip_proto;
|
||||||
packet_plugin_schema.on_packet = on_packet_cb;
|
packet_plugin_schema.on_packet[PACKET_STAGE_INPUT] = on_packet_input;
|
||||||
|
packet_plugin_schema.on_packet[PACKET_STAGE_OUTPUT] = on_packet_output;
|
||||||
packet_plugin_schema.plugin_env = plugin_env;
|
packet_plugin_schema.plugin_env = plugin_env;
|
||||||
utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema);
|
utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema);
|
||||||
return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index + PACKET_PULGIN_ID_BASE
|
return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index + PACKET_PULGIN_ID_BASE
|
||||||
}
|
}
|
||||||
|
|
||||||
void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
|
static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt, enum packet_stage in_out)
|
||||||
{
|
{
|
||||||
if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
|
if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
|
||||||
struct registered_packet_plugin_schema *p=NULL;
|
struct registered_packet_plugin_schema *p=NULL;
|
||||||
@@ -915,20 +916,25 @@ void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, stru
|
|||||||
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=0;//reset pub_msg_cnt
|
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=0;//reset pub_msg_cnt
|
||||||
while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
|
while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
|
||||||
{
|
{
|
||||||
if(p->ip_protocol == ip_proto && p->on_packet)
|
if(p->ip_protocol == ip_proto && p->on_packet[in_out])
|
||||||
{
|
{
|
||||||
p->on_packet(pkt, ip_proto, p->plugin_env);
|
p->on_packet[in_out](pkt, ip_proto, p->plugin_env);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt);
|
stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
|
||||||
|
{
|
||||||
|
plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
|
void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
|
||||||
{
|
{
|
||||||
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
|
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
|
||||||
|
plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT);
|
||||||
int tid=stellar_get_current_thread_index();
|
int tid=stellar_get_current_thread_index();
|
||||||
stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt);
|
|
||||||
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=-1;//disable packet message publish
|
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=-1;//disable packet message publish
|
||||||
stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue,
|
stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue,
|
||||||
plug_mgr->stellar_mq_schema_array);
|
plug_mgr->stellar_mq_schema_array);
|
||||||
|
|||||||
@@ -141,10 +141,17 @@ struct plugin_manager_runtime
|
|||||||
int pub_session_msg_cnt;
|
int pub_session_msg_cnt;
|
||||||
}__attribute__((aligned(sizeof(void*))));
|
}__attribute__((aligned(sizeof(void*))));
|
||||||
|
|
||||||
|
enum packet_stage
|
||||||
|
{
|
||||||
|
PACKET_STAGE_INPUT=0,
|
||||||
|
PACKET_STAGE_OUTPUT,
|
||||||
|
PACKET_STAGE_MAX
|
||||||
|
};
|
||||||
|
|
||||||
struct registered_packet_plugin_schema
|
struct registered_packet_plugin_schema
|
||||||
{
|
{
|
||||||
char ip_protocol;
|
char ip_protocol;
|
||||||
plugin_on_packet_func *on_packet;
|
plugin_on_packet_func *on_packet[PACKET_STAGE_MAX];
|
||||||
void *plugin_env;
|
void *plugin_env;
|
||||||
UT_array *registed_packet_mq_subscriber_info;
|
UT_array *registed_packet_mq_subscriber_info;
|
||||||
}__attribute__((aligned(sizeof(void*))));
|
}__attribute__((aligned(sizeof(void*))));
|
||||||
|
|||||||
@@ -202,7 +202,7 @@ TEST(plugin_manager_init, packet_mq_subscribe) {
|
|||||||
EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal plugin_id
|
EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal plugin_id
|
||||||
EXPECT_EQ(stellar_packet_mq_subscribe(&st, 10, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal topic_id & plugin_id
|
EXPECT_EQ(stellar_packet_mq_subscribe(&st, 10, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal topic_id & plugin_id
|
||||||
|
|
||||||
int plugin_id=stellar_packet_plugin_register(&st, 6, NULL, &st);
|
int plugin_id=stellar_packet_plugin_register(&st, 6, NULL, NULL,&st);
|
||||||
EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE);
|
EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE);
|
||||||
|
|
||||||
EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, plugin_id),0);
|
EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, plugin_id),0);
|
||||||
@@ -271,7 +271,7 @@ TEST(plugin_manager, packet_plugin_illegal_exdata) {
|
|||||||
struct packet_plugin_env env;
|
struct packet_plugin_env env;
|
||||||
memset(&env, 0, sizeof(struct packet_plugin_env));
|
memset(&env, 0, sizeof(struct packet_plugin_env));
|
||||||
env.plug_mgr=plug_mgr;
|
env.plug_mgr=plug_mgr;
|
||||||
int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_basic_on_packet, &env);
|
int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_basic_on_packet, NULL,&env);
|
||||||
EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE);
|
EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE);
|
||||||
|
|
||||||
{
|
{
|
||||||
@@ -314,7 +314,7 @@ TEST(plugin_manager, packet_plugins_with_proto_filter) {
|
|||||||
int proto_filter_plugin_num=(int)(sizeof(env.proto_filter_plugin_id) / sizeof(env.proto_filter_plugin_id[0]));
|
int proto_filter_plugin_num=(int)(sizeof(env.proto_filter_plugin_id) / sizeof(env.proto_filter_plugin_id[0]));
|
||||||
for (int i = 0; i < proto_filter_plugin_num; i++)
|
for (int i = 0; i < proto_filter_plugin_num; i++)
|
||||||
{
|
{
|
||||||
env.proto_filter_plugin_id[i] = stellar_packet_plugin_register(&st, i, test_proto_filter_on_packet, &env);
|
env.proto_filter_plugin_id[i] = stellar_packet_plugin_register(&st, i, test_proto_filter_on_packet, NULL,&env);
|
||||||
EXPECT_GE(env.proto_filter_plugin_id[i], PACKET_PULGIN_ID_BASE);
|
EXPECT_GE(env.proto_filter_plugin_id[i], PACKET_PULGIN_ID_BASE);
|
||||||
|
|
||||||
|
|
||||||
@@ -442,10 +442,10 @@ TEST(plugin_manager, packet_plugins_share_exdata) {
|
|||||||
EXPECT_EQ(utarray_len(plug_mgr->stellar_exdata_schema_array), exdata_idx_len);
|
EXPECT_EQ(utarray_len(plug_mgr->stellar_exdata_schema_array), exdata_idx_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
int exdata_set_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_set_on_packet, &env);
|
int exdata_set_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_set_on_packet, NULL,&env);
|
||||||
EXPECT_GE(exdata_set_plugin_id, PACKET_PULGIN_ID_BASE);
|
EXPECT_GE(exdata_set_plugin_id, PACKET_PULGIN_ID_BASE);
|
||||||
|
|
||||||
int exdata_get_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_get_on_packet, &env);
|
int exdata_get_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_get_on_packet, NULL,&env);
|
||||||
EXPECT_GE(exdata_get_plugin_id, PACKET_PULGIN_ID_BASE);
|
EXPECT_GE(exdata_get_plugin_id, PACKET_PULGIN_ID_BASE);
|
||||||
|
|
||||||
{
|
{
|
||||||
@@ -539,14 +539,14 @@ TEST(plugin_manager, packet_plugins_mq_pub_sub) {
|
|||||||
EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM);
|
EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM);
|
||||||
}
|
}
|
||||||
|
|
||||||
int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_mq_pub_on_packet, &env);
|
int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_mq_pub_on_packet, NULL,&env);
|
||||||
EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE);
|
EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE);
|
||||||
|
|
||||||
int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
|
int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
|
||||||
|
|
||||||
for (int i = 0; i < topic_sub_num; i++)
|
for (int i = 0; i < topic_sub_num; i++)
|
||||||
{
|
{
|
||||||
env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, &env);// empty on_packet is ok
|
env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, NULL,&env);// empty on_packet is ok
|
||||||
EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE);
|
EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE);
|
||||||
for(int j = 0; j < topic_id_num; j++)
|
for(int j = 0; j < topic_id_num; j++)
|
||||||
{
|
{
|
||||||
@@ -658,14 +658,14 @@ TEST(plugin_manager, packet_plugins_pub_overlimit) {
|
|||||||
EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM);
|
EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM);
|
||||||
}
|
}
|
||||||
|
|
||||||
int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, overlimit_pub_on_packet, &env);
|
int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, overlimit_pub_on_packet, NULL,&env);
|
||||||
EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE);
|
EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE);
|
||||||
|
|
||||||
int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
|
int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
|
||||||
|
|
||||||
for (int i = 0; i < topic_sub_num; i++)
|
for (int i = 0; i < topic_sub_num; i++)
|
||||||
{
|
{
|
||||||
env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, &env);// empty on_packet is ok
|
env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, NULL, &env);// empty on_packet is ok
|
||||||
EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE);
|
EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE);
|
||||||
for(int j = 0; j < topic_id_num; j++)
|
for(int j = 0; j < topic_id_num; j++)
|
||||||
{
|
{
|
||||||
@@ -741,7 +741,7 @@ TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) {
|
|||||||
struct packet_plugin_env env;
|
struct packet_plugin_env env;
|
||||||
memset(&env, 0, sizeof(struct packet_plugin_env));
|
memset(&env, 0, sizeof(struct packet_plugin_env));
|
||||||
env.plug_mgr=plug_mgr;
|
env.plug_mgr=plug_mgr;
|
||||||
int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_free_pub_msg_on_packet, &env);
|
int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_free_pub_msg_on_packet, NULL,&env);
|
||||||
EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE);
|
EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE);
|
||||||
|
|
||||||
env.packet_exdata_idx[0]=stellar_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env);
|
env.packet_exdata_idx[0]=stellar_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env);
|
||||||
|
|||||||
@@ -51,6 +51,7 @@ global:
|
|||||||
session_set_discard;
|
session_set_discard;
|
||||||
|
|
||||||
stellar_session_plugin_register;
|
stellar_session_plugin_register;
|
||||||
|
stellar_session_plugin_register_with_hook;
|
||||||
stellar_session_plugin_dettach_current_session;
|
stellar_session_plugin_dettach_current_session;
|
||||||
stellar_packet_plugin_register;
|
stellar_packet_plugin_register;
|
||||||
stellar_polling_plugin_register;
|
stellar_polling_plugin_register;
|
||||||
|
|||||||
Reference in New Issue
Block a user