🦄 refactor(plug_mgr): per stage msg counter encap

This commit is contained in:
yangwei
2024-09-06 14:01:23 +08:00
parent 9cb50f81fb
commit 3de8bbdabc
3 changed files with 26 additions and 11 deletions

View File

@@ -10,6 +10,24 @@
#include "packet_private.h" #include "packet_private.h"
#include "session_private.h" #include "session_private.h"
#include <stdbool.h>
void stellar_per_stage_message_counter_incby(struct plugin_manager_schema *plug_mgr, int tid, long long increment)
{
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=increment;
}
void stellar_per_stage_message_counter_set(struct plugin_manager_schema *plug_mgr, int tid, long long increment)
{
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=increment;
}
bool stellar_per_stage_message_counter_overlimt(struct plugin_manager_schema *plug_mgr, int tid)
{
if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return true;
return false;
}
UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL}; UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num) static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num)
@@ -548,8 +566,7 @@ int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, v
if(plug_mgr==NULL || plug_mgr->stellar_mq_schema_array == NULL)return -1; if(plug_mgr==NULL || plug_mgr->stellar_mq_schema_array == NULL)return -1;
int tid = stellar_get_current_thread_index(); int tid = stellar_get_current_thread_index();
if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt == -1)return -1; if(stellar_per_stage_message_counter_overlimt(plug_mgr, tid)==true)return -1;
if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return -1;
unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array); unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
if (len <= (unsigned int)topic_id)return -1; if (len <= (unsigned int)topic_id)return -1;
@@ -559,7 +576,7 @@ int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, v
msg->header.priority = priority; msg->header.priority = priority;
msg->body = data; msg->body = data;
DL_APPEND(plug_mgr->per_thread_data[tid].priority_mq[priority], msg); DL_APPEND(plug_mgr->per_thread_data[tid].priority_mq[priority], msg);
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=1; stellar_per_stage_message_counter_incby(plug_mgr, tid, 1);
return 0; return 0;
} }
@@ -634,8 +651,7 @@ static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, str
struct registered_plugin_schema *p=NULL; struct registered_plugin_schema *p=NULL;
int tid=stellar_get_current_thread_index(); int tid=stellar_get_current_thread_index();
//TODO : provide public api to reset pub_msg_cnt stellar_per_stage_message_counter_set(plug_mgr, tid, 0);
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=0;//reset pub_msg_cnt
while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p))) while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
{ {
if(p->on_packet[in_out]) if(p->on_packet[in_out])
@@ -657,7 +673,7 @@ void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, str
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT); plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT);
int tid=stellar_get_current_thread_index(); int tid=stellar_get_current_thread_index();
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=-1;//disable packet message publish stellar_per_stage_message_counter_set(plug_mgr, tid, -1);
stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue, stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue,
plug_mgr->stellar_mq_schema_array); plug_mgr->stellar_mq_schema_array);
per_thread_packet_exdata_arrary_clean(plug_mgr); per_thread_packet_exdata_arrary_clean(plug_mgr);

View File

@@ -13,7 +13,6 @@ extern "C"
#include "uthash/utarray.h" #include "uthash/utarray.h"
struct stellar_message; struct stellar_message;
struct per_thread_exdata_array struct per_thread_exdata_array
@@ -26,10 +25,9 @@ struct plugin_manager_per_thread_data
struct per_thread_exdata_array per_thread_pkt_exdata_array; struct per_thread_exdata_array per_thread_pkt_exdata_array;
struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list
struct stellar_message *dealth_letter_queue;// dlq list struct stellar_message *dealth_letter_queue;// dlq list
long long pub_packet_msg_cnt; unsigned long long pub_packet_msg_cnt;
}; };
struct plugin_manager_schema struct plugin_manager_schema
{ {
struct stellar *st; struct stellar *st;
@@ -44,6 +42,7 @@ struct plugin_manager_schema
struct plugin_manager_per_thread_data *per_thread_data; struct plugin_manager_per_thread_data *per_thread_data;
}__attribute__((aligned(sizeof(void*)))); }__attribute__((aligned(sizeof(void*))));
enum plugin_exdata_state enum plugin_exdata_state
{ INIT, ACTIVE, EXIT }; { INIT, ACTIVE, EXIT };

View File

@@ -44,7 +44,7 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p
* TEST PLUGIN MANAGER INIT & EXIT * * TEST PLUGIN MANAGER INIT & EXIT *
***********************************/ ***********************************/
//TODO: test plugin_specs_load //TODO: test case, plugin_specs_load
TEST(plugin_manager_init, init_with_null_toml) { TEST(plugin_manager_init, init_with_null_toml) {
@@ -1033,7 +1033,7 @@ TEST(plugin_manager, session_plugin_on_tcp) {
} }
// TODO: message pub overlimit test case //TODO: test case, message pub overlimit
#if 0 #if 0
static void *test_overlimit_pub_session_ctx_new(struct session *sess, void *plugin_env) static void *test_overlimit_pub_session_ctx_new(struct session *sess, void *plugin_env)
{ {