feat(stellar mq): add dispatch_cb per topic

This commit is contained in:
yangwei
2024-09-05 18:58:17 +08:00
parent a36865275d
commit b8b8bc1add
4 changed files with 119 additions and 125 deletions

View File

@@ -149,14 +149,14 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array);
if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
if(plug_mgr->registered_packet_plugin_array)
if(plug_mgr->registered_plugin_array)
{
struct registered_packet_plugin_schema *s = NULL;
while ((s = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, s)))
struct registered_plugin_schema *s = NULL;
while ((s = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_plugin_array, s)))
{
if(s->registed_packet_mq_subscriber_info)utarray_free(s->registed_packet_mq_subscriber_info);
if(s->registed_mq_subscriber_info)utarray_free(s->registed_mq_subscriber_info);
}
utarray_free(plug_mgr->registered_packet_plugin_array);
utarray_free(plug_mgr->registered_plugin_array);
}
plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
FREE(plug_mgr);
@@ -314,11 +314,7 @@ static void stellar_mq_topic_schema_copy(void *_dst, const void *_src)
{
struct stellar_mq_topic_schema *dst = (struct stellar_mq_topic_schema *)_dst,
*src = (struct stellar_mq_topic_schema *)_src;
dst->subscribers = src->subscribers;
dst->free_cb = src->free_cb;
dst->free_cb_arg = src->free_cb_arg;
dst->topic_id = src->topic_id;
dst->subscriber_cnt = src->subscriber_cnt;
memcpy(_dst, _src, sizeof(struct stellar_mq_topic_schema));
dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL;
}
@@ -351,7 +347,7 @@ int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name)
return -1;
}
int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array;
@@ -360,12 +356,13 @@ int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_c
if(len < (unsigned int)topic_id)return -1;
struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id);
if(t_schema == NULL)return -1;
t_schema->dispatch_cb=on_dispatch_cb;
t_schema->free_cb=msg_free_cb;
t_schema->free_cb_arg=msg_free_arg;
return 0;
}
int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr->stellar_mq_schema_array == NULL)
@@ -379,6 +376,7 @@ int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_
}
struct stellar_mq_topic_schema t_schema;
memset(&t_schema, 0, sizeof(struct stellar_mq_topic_schema));
t_schema.dispatch_cb=on_dispatch_cb;
t_schema.free_cb=msg_free_cb;
t_schema.topic_name=(char *)topic_name;
t_schema.topic_id=len;//topid_id equals arrary index
@@ -415,60 +413,12 @@ int stellar_mq_destroy_topic(struct stellar *st, int topic_id)
return 1; // success
}
UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL};
static int __stellar_mq_subscribe(struct plugin_manager_schema *plug_mgr, int topic_id, void *plugin_on_msg_cb, int plugin_idx, UT_array *registed_mq_subscriber_info)
{
if(plug_mgr == NULL || plug_mgr->stellar_mq_schema_array==NULL || registed_mq_subscriber_info == NULL)return -1;
unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
if(topic==NULL)return -1;
// if plugin already subscribe current topic, return 0
struct stellar_mq_subscriber_info *p=NULL;
while( (p=(struct stellar_mq_subscriber_info *)utarray_next(registed_mq_subscriber_info,p)))
{
if(p->topic_id==topic_id)
{
struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers;
int cnt=0;
while(tmp_subscriber)
{
if(cnt==p->subscriber_idx)
{
tmp_subscriber->plugin_msg_cb=plugin_on_msg_cb;
return 0;
}
cnt++;
tmp_subscriber=tmp_subscriber->next;
}
}
};
struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
new_subscriber->plugin_idx = plugin_idx;
new_subscriber->plugin_msg_cb = plugin_on_msg_cb;
DL_APPEND(topic->subscribers, new_subscriber);
struct stellar_mq_subscriber_info sub_info;
memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info));
sub_info.topic_id=topic_id;
sub_info.subscriber_idx=topic->subscriber_cnt;
utarray_push_back(registed_mq_subscriber_info, &sub_info);
topic->subscriber_cnt+=1;
plug_mgr->mq_topic_subscriber_num+=1;
return 0;
}
static void stellar_mq_dispatch_one_message(struct stellar_message *mq_elt)
{
struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)stellar_get_plugin_manager(mq_elt->st);
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
struct registered_packet_plugin_schema *packet_plugin_schema;
struct registered_plugin_schema *plugin_schema;
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,
(unsigned int)(mq_elt->header.topic_id));
if (topic)
@@ -477,11 +427,13 @@ static void stellar_mq_dispatch_one_message(struct stellar_message *mq_elt)
{
if (sub_elt->plugin_msg_cb)
{
packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(
plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx);
if (packet_plugin_schema)
plugin_schema = (struct registered_plugin_schema *)utarray_eltptr(
plug_mgr->registered_plugin_array, (unsigned int)sub_elt->plugin_idx);
if (plugin_schema)
{
sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, packet_plugin_schema->plugin_env);
//TODO: maybe need pub_plugin_env as dispatch_cb parameter
if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,mq_elt->body, sub_elt->plugin_msg_cb, plugin_schema->plugin_env);
else sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, plugin_schema->plugin_env);
}
}
}
@@ -529,9 +481,7 @@ static void stellar_mq_free(struct stellar_message **head, UT_array *mq_schema_a
}
}
/*******************************
* PACKET MQ *
*******************************/
UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL};
//return 0 if success, otherwise return -1.
int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id)
@@ -539,17 +489,57 @@ int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugi
int plugin_idx=plugin_id;
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1;
if(plug_mgr == NULL || plug_mgr->registered_plugin_array == NULL)return -1;
struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx);
if(packet_plugin_schema==NULL)return -1;
struct registered_plugin_schema *plugin_schema = (struct registered_plugin_schema *)utarray_eltptr(plug_mgr->registered_plugin_array, (unsigned)plugin_idx);
if(plugin_schema==NULL)return -1;
if(packet_plugin_schema->registed_packet_mq_subscriber_info==NULL)
unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
if(topic==NULL)return -1;
if(plugin_schema->registed_mq_subscriber_info==NULL)
{
utarray_new(packet_plugin_schema->registed_packet_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
utarray_new(plugin_schema->registed_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
}
return __stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_idx, packet_plugin_schema->registed_packet_mq_subscriber_info);
// if plugin already subscribe current topic, return 0
struct stellar_mq_subscriber_info *p=NULL;
while( (p=(struct stellar_mq_subscriber_info *)utarray_next(plugin_schema->registed_mq_subscriber_info,p)))
{
if(p->topic_id==topic_id)
{
struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers;
int cnt=0;
while(tmp_subscriber)
{
if(cnt==p->subscriber_idx)
{
tmp_subscriber->plugin_msg_cb=plugin_on_msg_cb;
return 0;
}
cnt++;
tmp_subscriber=tmp_subscriber->next;
}
}
};
struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
new_subscriber->plugin_idx = plugin_idx;
new_subscriber->plugin_msg_cb = plugin_on_msg_cb;
DL_APPEND(topic->subscribers, new_subscriber);
struct stellar_mq_subscriber_info sub_info;
memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info));
sub_info.topic_id=topic_id;
sub_info.subscriber_idx=topic->subscriber_cnt;
utarray_push_back(plugin_schema->registed_mq_subscriber_info, &sub_info);
topic->subscriber_cnt+=1;
plug_mgr->mq_topic_subscriber_num+=1;
return 0;
}
int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, void *data, enum stellar_mq_priority priority)
@@ -618,31 +608,31 @@ void session_exdata_runtime_free(struct stellar_exdata *exdata_rt)
/*********************************************
* PLUGIN MANAGER PACKET PLUGIN *
* PLUGIN MANAGER PLUGIN *
*********************************************/
UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL};
UT_icd registered_plugin_array_icd = {sizeof(struct registered_plugin_schema), NULL, NULL, NULL};
int stellar_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr->registered_packet_plugin_array == NULL)
if(plug_mgr->registered_plugin_array == NULL)
{
utarray_new(plug_mgr->registered_packet_plugin_array, &registered_packet_plugin_array_icd);
utarray_new(plug_mgr->registered_plugin_array, &registered_plugin_array_icd);
}
struct registered_packet_plugin_schema packet_plugin_schema;
struct registered_plugin_schema packet_plugin_schema;
memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema));
packet_plugin_schema.ip_protocol = ip_proto;
packet_plugin_schema.on_packet[PACKET_STAGE_INPUT] = on_packet_input;
packet_plugin_schema.on_packet[PACKET_STAGE_OUTPUT] = on_packet_output;
packet_plugin_schema.plugin_env = plugin_env;
utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema);
return (utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index
utarray_push_back(plug_mgr->registered_plugin_array, &packet_plugin_schema);
return (utarray_len(plug_mgr->registered_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index
}
static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt, enum packet_stage in_out)
{
if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
struct registered_packet_plugin_schema *p=NULL;
if(plug_mgr==NULL || plug_mgr->registered_plugin_array == NULL || pkt == NULL)return;
struct registered_plugin_schema *p=NULL;
//TODO: get innermost layer ip protocol by packet api
struct tuple6 t6;
@@ -652,7 +642,7 @@ static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, str
int tid=stellar_get_current_thread_index();
//TODO : provide public api to reset pub_msg_cnt
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=0;//reset pub_msg_cnt
while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_plugin_array, p)))
{
if(p->ip_protocol == ip_proto && p->on_packet[in_out])
{
@@ -670,7 +660,7 @@ void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, stru
void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
{
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
if(plug_mgr == NULL || plug_mgr->registered_plugin_array == NULL || pkt == NULL)return;
plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT);
int tid=stellar_get_current_thread_index();
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=-1;//disable packet message publish