This commit is contained in:
yangwei
2024-09-06 11:56:34 +08:00
parent b8b8bc1add
commit 9c12523c9d
4 changed files with 260 additions and 837 deletions

View File

@@ -149,14 +149,14 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array);
if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
if(plug_mgr->registered_plugin_array)
if(plug_mgr->registered_packet_plugin_array)
{
struct registered_plugin_schema *s = NULL;
while ((s = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_plugin_array, s)))
while ((s = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, s)))
{
if(s->registed_mq_subscriber_info)utarray_free(s->registed_mq_subscriber_info);
}
utarray_free(plug_mgr->registered_plugin_array);
utarray_free(plug_mgr->registered_packet_plugin_array);
}
plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
FREE(plug_mgr);
@@ -428,7 +428,7 @@ static void stellar_mq_dispatch_one_message(struct stellar_message *mq_elt)
if (sub_elt->plugin_msg_cb)
{
plugin_schema = (struct registered_plugin_schema *)utarray_eltptr(
plug_mgr->registered_plugin_array, (unsigned int)sub_elt->plugin_idx);
plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx);
if (plugin_schema)
{
//TODO: maybe need pub_plugin_env as dispatch_cb parameter
@@ -489,9 +489,9 @@ int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugi
int plugin_idx=plugin_id;
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr == NULL || plug_mgr->registered_plugin_array == NULL)return -1;
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1;
struct registered_plugin_schema *plugin_schema = (struct registered_plugin_schema *)utarray_eltptr(plug_mgr->registered_plugin_array, (unsigned)plugin_idx);
struct registered_plugin_schema *plugin_schema = (struct registered_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx);
if(plugin_schema==NULL)return -1;
unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
@@ -572,9 +572,10 @@ int stellar_mq_publish_message(struct stellar *st, int topic_id, void *data)
/*******************************
* PLUGIN MANAGER SESSION RUNTIME *
*******************************/
struct stellar_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr)
struct stellar_exdata *session_exdata_runtime_new(struct stellar *st)
{
struct stellar_exdata *exdata_rt = NULL;
struct plugin_manager_schema *plug_mgr=stellar_get_plugin_manager(st);
if(plug_mgr->stellar_exdata_schema_array==NULL)return NULL;
unsigned int len = utarray_len(plug_mgr->stellar_exdata_schema_array);
if(len > 0)
@@ -615,9 +616,9 @@ UT_icd registered_plugin_array_icd = {sizeof(struct registered_plugin_schema), N
int stellar_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr->registered_plugin_array == NULL)
if(plug_mgr->registered_packet_plugin_array == NULL)
{
utarray_new(plug_mgr->registered_plugin_array, &registered_plugin_array_icd);
utarray_new(plug_mgr->registered_packet_plugin_array, &registered_plugin_array_icd);
}
struct registered_plugin_schema packet_plugin_schema;
memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema));
@@ -625,13 +626,13 @@ int stellar_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_o
packet_plugin_schema.on_packet[PACKET_STAGE_INPUT] = on_packet_input;
packet_plugin_schema.on_packet[PACKET_STAGE_OUTPUT] = on_packet_output;
packet_plugin_schema.plugin_env = plugin_env;
utarray_push_back(plug_mgr->registered_plugin_array, &packet_plugin_schema);
return (utarray_len(plug_mgr->registered_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index
utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema);
return (utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index
}
static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt, enum packet_stage in_out)
{
if(plug_mgr==NULL || plug_mgr->registered_plugin_array == NULL || pkt == NULL)return;
if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
struct registered_plugin_schema *p=NULL;
//TODO: get innermost layer ip protocol by packet api
@@ -642,7 +643,7 @@ static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, str
int tid=stellar_get_current_thread_index();
//TODO : provide public api to reset pub_msg_cnt
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=0;//reset pub_msg_cnt
while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_plugin_array, p)))
while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
{
if(p->ip_protocol == ip_proto && p->on_packet[in_out])
{
@@ -660,7 +661,7 @@ void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, stru
void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
{
if(plug_mgr == NULL || plug_mgr->registered_plugin_array == NULL || pkt == NULL)return;
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT);
int tid=stellar_get_current_thread_index();
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=-1;//disable packet message publish

View File

@@ -19,7 +19,7 @@ void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, str
int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
struct stellar_exdata;
struct stellar_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr);
struct stellar_exdata *session_exdata_runtime_new(struct stellar *st);
void session_exdata_runtime_free(struct stellar_exdata *exdata_rt);
#ifdef __cplusplus

View File

@@ -27,7 +27,7 @@ struct plugin_manager_per_thread_data
struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list
struct stellar_message *dealth_letter_queue;// dlq list
long long pub_packet_msg_cnt;
struct session *thread_scratch_session;
long long pub_polling_msg_cnt;//TODO
};
@@ -37,7 +37,7 @@ struct plugin_manager_schema
UT_array *plugin_load_specs_array;
UT_array *stellar_exdata_schema_array;
UT_array *stellar_mq_schema_array;
UT_array *registered_plugin_array;
UT_array *registered_packet_plugin_array;
UT_array *registered_polling_plugin_array;
int stellar_mq_topic_num;
int mq_topic_subscriber_num;

File diff suppressed because it is too large Load Diff