diff --git a/infra/plugin_manager/plugin_manager.c b/infra/plugin_manager/plugin_manager.c index 439869f..4d65257 100644 --- a/infra/plugin_manager/plugin_manager.c +++ b/infra/plugin_manager/plugin_manager.c @@ -10,6 +10,24 @@ #include "packet_private.h" #include "session_private.h" +#include + +void stellar_per_stage_message_counter_incby(struct plugin_manager_schema *plug_mgr, int tid, long long increment) +{ + plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=increment; +} + +void stellar_per_stage_message_counter_set(struct plugin_manager_schema *plug_mgr, int tid, long long increment) +{ + plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=increment; +} + +bool stellar_per_stage_message_counter_overlimt(struct plugin_manager_schema *plug_mgr, int tid) +{ + if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return true; + return false; +} + UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL}; static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num) @@ -548,8 +566,7 @@ int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, v if(plug_mgr==NULL || plug_mgr->stellar_mq_schema_array == NULL)return -1; int tid = stellar_get_current_thread_index(); - if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt == -1)return -1; - if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return -1; + if(stellar_per_stage_message_counter_overlimt(plug_mgr, tid)==true)return -1; unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array); if (len <= (unsigned int)topic_id)return -1; @@ -559,7 +576,7 @@ int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, v msg->header.priority = priority; msg->body = data; DL_APPEND(plug_mgr->per_thread_data[tid].priority_mq[priority], msg); - plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=1; + stellar_per_stage_message_counter_incby(plug_mgr, tid, 1); return 0; } @@ -634,8 +651,7 @@ static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, str struct registered_plugin_schema *p=NULL; int tid=stellar_get_current_thread_index(); - //TODO : provide public api to reset pub_msg_cnt - plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=0;//reset pub_msg_cnt + stellar_per_stage_message_counter_set(plug_mgr, tid, 0); while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p))) { if(p->on_packet[in_out]) @@ -657,7 +673,7 @@ void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, str if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT); int tid=stellar_get_current_thread_index(); - plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=-1;//disable packet message publish + stellar_per_stage_message_counter_set(plug_mgr, tid, -1); stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr->stellar_mq_schema_array); per_thread_packet_exdata_arrary_clean(plug_mgr); diff --git a/infra/plugin_manager/plugin_manager_interna.h b/infra/plugin_manager/plugin_manager_interna.h index ae82573..10aae73 100644 --- a/infra/plugin_manager/plugin_manager_interna.h +++ b/infra/plugin_manager/plugin_manager_interna.h @@ -13,7 +13,6 @@ extern "C" #include "uthash/utarray.h" - struct stellar_message; struct per_thread_exdata_array @@ -26,10 +25,9 @@ struct plugin_manager_per_thread_data struct per_thread_exdata_array per_thread_pkt_exdata_array; struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list struct stellar_message *dealth_letter_queue;// dlq list - long long pub_packet_msg_cnt; + unsigned long long pub_packet_msg_cnt; }; - struct plugin_manager_schema { struct stellar *st; @@ -44,6 +42,7 @@ struct plugin_manager_schema struct plugin_manager_per_thread_data *per_thread_data; }__attribute__((aligned(sizeof(void*)))); + enum plugin_exdata_state { INIT, ACTIVE, EXIT }; diff --git a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp b/infra/plugin_manager/test/plugin_manager_gtest_main.cpp index fe49514..1492871 100644 --- a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp +++ b/infra/plugin_manager/test/plugin_manager_gtest_main.cpp @@ -44,7 +44,7 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p * TEST PLUGIN MANAGER INIT & EXIT * ***********************************/ -//TODO: test plugin_specs_load +//TODO: test case, plugin_specs_load TEST(plugin_manager_init, init_with_null_toml) { @@ -1033,7 +1033,7 @@ TEST(plugin_manager, session_plugin_on_tcp) { } -// TODO: message pub overlimit test case +//TODO: test case, message pub overlimit #if 0 static void *test_overlimit_pub_session_ctx_new(struct session *sess, void *plugin_env) {