🦄 refactor(plugin_manager): refactor intrinsic topic name

This commit is contained in:
yangwei
2024-08-28 19:58:28 +08:00
parent 283d591e6b
commit b2647a5a75
10 changed files with 156 additions and 164 deletions

View File

@@ -13,17 +13,14 @@
UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
// TODO: set scratch_sess to per_thread_data
__thread struct session *per_thread_scratch_sess;
inline static void plugin_manager_scratch_session_set(struct session *sess)
inline static void plugin_manager_scratch_session_set(struct plugin_manager_schema *plug_mgr, int tid, struct session *sess)
{
per_thread_scratch_sess = sess;
plug_mgr->per_thread_data[tid].thread_scratch_session = sess;
}
inline static struct session *plugin_manager_scratch_session_get()
inline static struct session *plugin_manager_scratch_session_get(struct plugin_manager_schema *plug_mgr, int tid)
{
return per_thread_scratch_sess;
return plug_mgr->per_thread_data[tid].thread_scratch_session;
}
@@ -85,19 +82,19 @@ PLUGIN_SPEC_LOAD_ERROR:
return NULL;
}
static struct plugin_manger_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st)
static struct plugin_manager_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st)
{
if(st == NULL)return NULL;
int thread_num=stellar_get_worker_thread_num(st);
struct plugin_manger_per_thread_data *per_thread_data = CALLOC(struct plugin_manger_per_thread_data, thread_num);
struct plugin_manager_per_thread_data *per_thread_data = CALLOC(struct plugin_manager_per_thread_data, thread_num);
return per_thread_data;
}
static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_data *per_thread_data, struct stellar *st)
static void plugin_manager_per_thread_data_free(struct plugin_manager_per_thread_data *per_thread_data, struct stellar *st)
{
if(per_thread_data == NULL || st == NULL)return;
int thread_num=stellar_get_worker_thread_num(st);
struct plugin_manger_per_thread_data *p_data;
struct plugin_manager_per_thread_data *p_data;
for (int i = 0; i < thread_num; i++)
{
p_data=per_thread_data+i;
@@ -109,7 +106,8 @@ static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_
static void tcp_stream_msg_free_fn(void *msg, void *msg_free_arg __attribute__((unused)))
{
struct session *cur_sess = plugin_manager_scratch_session_get();
struct plugin_manager_schema *plug_mgr=(struct plugin_manager_schema *)msg_free_arg;
struct session *cur_sess = plugin_manager_scratch_session_get(plug_mgr, stellar_get_current_thread_index());
if(msg && cur_sess)session_free_tcp_segment(cur_sess, (struct tcp_segment *)msg);
}
@@ -122,6 +120,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
return NULL;
}
struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1);
//TODO: set max_message_dispatch as parameter
plug_mgr->max_message_dispatch=MAX_MSG_PER_DISPATCH;
if(spec_num > 0)
{
@@ -133,10 +132,11 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
stellar_set_plugin_manger(st, plug_mgr);
plug_mgr->tcp_topic_id=stellar_mq_create_topic(st, TOPIC_TCP, NULL, NULL);
plug_mgr->tcp_stream_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_STREAM, tcp_stream_msg_free_fn, NULL);
plug_mgr->udp_topic_id=stellar_mq_create_topic(st, TOPIC_UDP, NULL, NULL);
plug_mgr->egress_topic_id=stellar_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL);
plug_mgr->tcp_input_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_INPUT, NULL, NULL);
plug_mgr->tcp_output_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_OUTPUT, NULL, NULL);
plug_mgr->tcp_stream_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_STREAM, tcp_stream_msg_free_fn, plug_mgr);
plug_mgr->udp_input_topic_id=stellar_mq_create_topic(st, TOPIC_UDP_INPUT, NULL, NULL);
plug_mgr->udp_output_topic_id=stellar_mq_create_topic(st, TOPIC_UDP_OUTPUT, NULL, NULL);
plug_mgr->control_packet_topic_id=stellar_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL);
for(int i = 0; i < spec_num; i++)
@@ -895,7 +895,7 @@ int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, p
return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index + PACKET_PULGIN_ID_BASE
}
void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
{
if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
struct registered_packet_plugin_schema *p=NULL;
@@ -919,7 +919,7 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st
return;
}
void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
{
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
int tid=stellar_get_current_thread_index();
@@ -992,40 +992,15 @@ int stellar_session_plugin_register(struct stellar *st,
return (utarray_len(plug_mgr->registered_session_plugin_array)-1);// return session plugin_id, equals to session plugin arrary index
}
void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt)
void plugin_manager_on_session_input(struct session *sess, struct packet *pkt)
{
if(sess==NULL)return;
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt==NULL)return;
#if 0
int topic_id = -1;
//FIXME: get topic and tcp data by stellar api
switch (packet_get_type(pkt))
{
case TCP:
topic_id=plug_mgr_rt->plug_mgr->tcp_topic_id;
break;
case TCP_STREAM:
topic_id=plug_mgr_rt->plug_mgr->tcp_stream_topic_id;
break;
case UDP:
topic_id=plug_mgr_rt->plug_mgr->udp_topic_id;
break;
case CONTROL:
topic_id=plug_mgr_rt->plug_mgr->control_packet_topic_id;
break;
default:
break;
}
plug_mgr_rt->pub_session_msg_cnt=0;
session_mq_publish_message_with_priority(sess, topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
int tid=stellar_get_current_thread_index();
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
#endif
plug_mgr_rt->pub_session_msg_cnt=0;
plugin_manager_scratch_session_set(sess);
plugin_manager_scratch_session_set(plug_mgr_rt->plug_mgr, tid, sess);
struct tcp_segment *seg;
enum session_type type = session_get_type(sess);
@@ -1038,14 +1013,14 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt)
switch (type)
{
case SESSION_TYPE_TCP:
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_input_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
while ((seg = session_get_tcp_segment(sess)) != NULL)
{
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id, (void *)seg, STELLAR_MQ_PRIORITY_HIGH);
}
break;
case SESSION_TYPE_UDP:
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_input_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
break;
default:
assert(0);
@@ -1053,27 +1028,36 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt)
}
}
//TODO: check TCP topic active subscirber num, if 0, return disable assembler state, to reduce tcp reassemble overhead
int tid=stellar_get_current_thread_index();
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
plugin_manager_scratch_session_set(NULL);
plugin_manager_scratch_session_set(plug_mgr_rt->plug_mgr, tid, NULL);
return;
}
void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt)
void plugin_manager_on_session_output(struct session *sess, struct packet *pkt)
{
if(sess==NULL)return;
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt==NULL)return;
plugin_manager_scratch_session_set(sess);
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt, STELLAR_MQ_PRIORITY_HIGH);
if(unlikely(packet_is_ctrl(pkt)))return;
int tid=stellar_get_current_thread_index();
plugin_manager_scratch_session_set(plug_mgr_rt->plug_mgr, tid, sess);
switch (session_get_type(sess))
{
case SESSION_TYPE_TCP:
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_output_topic_id ,pkt, STELLAR_MQ_PRIORITY_HIGH);
break;
case SESSION_TYPE_UDP:
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_output_topic_id ,pkt, STELLAR_MQ_PRIORITY_HIGH);
break;
default:
assert(0);
break;
}
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish
stellar_mq_free(&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array);
plugin_manager_scratch_session_set(NULL);
plugin_manager_scratch_session_set(plug_mgr_rt->plug_mgr, tid, NULL);
return;
}
@@ -1086,11 +1070,11 @@ void plugin_manager_on_session_closing(struct session *sess)
switch (session_get_type(sess))
{
case SESSION_TYPE_TCP:
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH);
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_input_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH);
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id , NULL, STELLAR_MQ_PRIORITY_HIGH);
break;
case SESSION_TYPE_UDP:
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH);
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_input_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH);
break;
default:
break;