✨ feat(plugin_manager): add session plugin with hook
This commit is contained in:
@@ -18,6 +18,9 @@ typedef void plugin_on_unload_func(void *plugin_env);
|
|||||||
typedef void *session_ctx_new_func(struct session *sess, void *plugin_env);
|
typedef void *session_ctx_new_func(struct session *sess, void *plugin_env);
|
||||||
typedef void session_ctx_free_func(struct session *sess, void *session_ctx, void *plugin_env);
|
typedef void session_ctx_free_func(struct session *sess, void *session_ctx, void *plugin_env);
|
||||||
|
|
||||||
|
typedef void session_on_new_func(struct session *sess, void *session_ctx, void *plugin_env);
|
||||||
|
typedef void session_on_free_func(struct session *sess, void *session_ctx, void *plugin_env);
|
||||||
|
|
||||||
// INTRINSIC TOPIC
|
// INTRINSIC TOPIC
|
||||||
// TOPIC_TCP_STREAM on_msg need convert msg to (const struct tcp_segment *)
|
// TOPIC_TCP_STREAM on_msg need convert msg to (const struct tcp_segment *)
|
||||||
// TOPIC_UDP_INPUT/TOPIC_TCP_INPUT/TOPIC_UDP_OUTPUT/TOPIC_TCP_OUTPUT/TOPIC_CONTROL_PACKET on_msg need convert msg to (const struct packet *)
|
// TOPIC_UDP_INPUT/TOPIC_TCP_INPUT/TOPIC_UDP_OUTPUT/TOPIC_TCP_OUTPUT/TOPIC_CONTROL_PACKET on_msg need convert msg to (const struct packet *)
|
||||||
@@ -37,6 +40,13 @@ int stellar_session_plugin_register(struct stellar *st,
|
|||||||
session_ctx_free_func session_ctx_free,
|
session_ctx_free_func session_ctx_free,
|
||||||
void *plugin_env);
|
void *plugin_env);
|
||||||
|
|
||||||
|
int stellar_session_plugin_register_with_hook(struct stellar *st,
|
||||||
|
session_ctx_new_func session_ctx_new,
|
||||||
|
session_ctx_free_func session_ctx_free,
|
||||||
|
session_on_new_func session_on_new,
|
||||||
|
session_on_free_func session_on_free,
|
||||||
|
void *plugin_env);
|
||||||
|
|
||||||
void stellar_session_plugin_dettach_current_session(struct session *sess);
|
void stellar_session_plugin_dettach_current_session(struct session *sess);
|
||||||
|
|
||||||
struct tcp_segment;
|
struct tcp_segment;
|
||||||
|
|||||||
@@ -513,6 +513,28 @@ static int stellar_mq_subscribe(struct plugin_manager_schema *plug_mgr, int topi
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void plugin_manager_runtime_update_plugin_ctx(struct session *sess, struct registered_session_plugin_schema *session_plugin_schema, struct session_plugin_ctx_runtime *plugin_ctx_rt)
|
||||||
|
{
|
||||||
|
if(sess==NULL || session_plugin_schema == NULL || plugin_ctx_rt == NULL)return;
|
||||||
|
|
||||||
|
if (plugin_ctx_rt->state == INIT)
|
||||||
|
{
|
||||||
|
if (session_plugin_schema->on_ctx_new)
|
||||||
|
{
|
||||||
|
plugin_ctx_rt->plugin_ctx = session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
|
||||||
|
if (plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free)
|
||||||
|
{
|
||||||
|
session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
|
||||||
|
plugin_ctx_rt->plugin_ctx = NULL;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
plugin_ctx_rt->state = ACTIVE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void stellar_mq_dispatch_one_session_message(struct session *sess, struct stellar_message *mq_elt)
|
static void stellar_mq_dispatch_one_session_message(struct session *sess, struct stellar_message *mq_elt)
|
||||||
{
|
{
|
||||||
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
||||||
@@ -535,24 +557,7 @@ static void stellar_mq_dispatch_one_session_message(struct session *sess, struct
|
|||||||
plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx);
|
plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx);
|
||||||
if (session_plugin_schema)
|
if (session_plugin_schema)
|
||||||
{
|
{
|
||||||
if (plugin_ctx_rt->state == INIT)
|
plugin_manager_runtime_update_plugin_ctx(sess, session_plugin_schema, plugin_ctx_rt);
|
||||||
{
|
|
||||||
if (session_plugin_schema->on_ctx_new)
|
|
||||||
{
|
|
||||||
plugin_ctx_rt->plugin_ctx =
|
|
||||||
session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
|
|
||||||
if (plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free)
|
|
||||||
{
|
|
||||||
session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx,
|
|
||||||
session_plugin_schema->plugin_env);
|
|
||||||
plugin_ctx_rt->plugin_ctx = NULL;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
plugin_ctx_rt->state = ACTIVE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (sub_elt->sess_msg_cb &&
|
if (sub_elt->sess_msg_cb &&
|
||||||
bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) !=
|
bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) !=
|
||||||
0) // ctx_new maybe call detach, need check again
|
0) // ctx_new maybe call detach, need check again
|
||||||
@@ -973,9 +978,11 @@ int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
|
|||||||
*********************************************/
|
*********************************************/
|
||||||
UT_icd registered_session_plugin_schema_icd = {sizeof(struct registered_session_plugin_schema), NULL, NULL, NULL};
|
UT_icd registered_session_plugin_schema_icd = {sizeof(struct registered_session_plugin_schema), NULL, NULL, NULL};
|
||||||
|
|
||||||
int stellar_session_plugin_register(struct stellar *st,
|
int stellar_session_plugin_register_with_hook(struct stellar *st,
|
||||||
session_ctx_new_func session_ctx_new,
|
session_ctx_new_func session_ctx_new,
|
||||||
session_ctx_free_func session_ctx_free,
|
session_ctx_free_func session_ctx_free,
|
||||||
|
session_on_new_func session_on_new,
|
||||||
|
session_on_free_func session_on_free,
|
||||||
void *plugin_env)
|
void *plugin_env)
|
||||||
{
|
{
|
||||||
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
|
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
|
||||||
@@ -987,11 +994,21 @@ int stellar_session_plugin_register(struct stellar *st,
|
|||||||
memset(&session_plugin_schema, 0, sizeof(struct registered_session_plugin_schema));
|
memset(&session_plugin_schema, 0, sizeof(struct registered_session_plugin_schema));
|
||||||
session_plugin_schema.on_ctx_new = session_ctx_new;
|
session_plugin_schema.on_ctx_new = session_ctx_new;
|
||||||
session_plugin_schema.on_ctx_free = session_ctx_free;
|
session_plugin_schema.on_ctx_free = session_ctx_free;
|
||||||
|
session_plugin_schema.on_session_new = session_on_new;
|
||||||
|
session_plugin_schema.on_session_free = session_on_free;
|
||||||
session_plugin_schema.plugin_env = plugin_env;
|
session_plugin_schema.plugin_env = plugin_env;
|
||||||
utarray_push_back(plug_mgr->registered_session_plugin_array, &session_plugin_schema);
|
utarray_push_back(plug_mgr->registered_session_plugin_array, &session_plugin_schema);
|
||||||
return (utarray_len(plug_mgr->registered_session_plugin_array)-1);// return session plugin_id, equals to session plugin arrary index
|
return (utarray_len(plug_mgr->registered_session_plugin_array)-1);// return session plugin_id, equals to session plugin arrary index
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int stellar_session_plugin_register(struct stellar *st,
|
||||||
|
session_ctx_new_func session_ctx_new,
|
||||||
|
session_ctx_free_func session_ctx_free,
|
||||||
|
void *plugin_env)
|
||||||
|
{
|
||||||
|
return stellar_session_plugin_register_with_hook(st, session_ctx_new, session_ctx_free, NULL, NULL, plugin_env);
|
||||||
|
}
|
||||||
|
|
||||||
void plugin_manager_on_session_input(struct session *sess, struct packet *pkt)
|
void plugin_manager_on_session_input(struct session *sess, struct packet *pkt)
|
||||||
{
|
{
|
||||||
if(sess==NULL)return;
|
if(sess==NULL)return;
|
||||||
@@ -1064,30 +1081,54 @@ void plugin_manager_on_session_output(struct session *sess, struct packet *pkt)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void plugin_manager_on_session_closing(struct session *sess)
|
void plugin_manager_on_session_new(struct plugin_manager_schema *plug_mgr, struct session *sess)
|
||||||
|
{
|
||||||
|
if(sess==NULL)return;
|
||||||
|
if(plug_mgr->registered_session_plugin_array==NULL)return;
|
||||||
|
struct plugin_manager_runtime *plug_mgr_rt = plugin_manager_session_runtime_new(plug_mgr, sess);
|
||||||
|
session_set_user_data(sess, plug_mgr_rt);
|
||||||
|
|
||||||
|
struct registered_session_plugin_schema *s = NULL;
|
||||||
|
struct session_plugin_ctx_runtime *plugin_ctx_rt;
|
||||||
|
for(unsigned int i = 0; i < utarray_len(plug_mgr_rt->plug_mgr->registered_session_plugin_array); i++)
|
||||||
|
{
|
||||||
|
s = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, i);
|
||||||
|
if (s->on_session_new)
|
||||||
|
{
|
||||||
|
plugin_ctx_rt = (plug_mgr_rt->plugin_ctx_array + i);
|
||||||
|
plugin_manager_runtime_update_plugin_ctx(sess, s, plugin_ctx_rt);
|
||||||
|
s->on_session_new(sess, plugin_ctx_rt->plugin_ctx, s->plugin_env);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
void plugin_manager_on_session_free(struct session *sess)
|
||||||
{
|
{
|
||||||
if(sess==NULL)return;
|
if(sess==NULL)return;
|
||||||
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
||||||
if(plug_mgr_rt==NULL)return;
|
if(plug_mgr_rt==NULL)return;
|
||||||
|
if(plug_mgr_rt->plug_mgr->registered_session_plugin_array==NULL)return;
|
||||||
plug_mgr_rt->pub_session_msg_cnt=0;// reset pub_msg_cnt
|
plug_mgr_rt->pub_session_msg_cnt=0;// reset pub_msg_cnt
|
||||||
switch (session_get_type(sess))
|
|
||||||
{
|
struct registered_session_plugin_schema *s = NULL;
|
||||||
case SESSION_TYPE_TCP:
|
struct session_plugin_ctx_runtime *plugin_ctx_rt;
|
||||||
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_input_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH);
|
for(unsigned int i = 0; i < utarray_len(plug_mgr_rt->plug_mgr->registered_session_plugin_array); i++)
|
||||||
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id , NULL, STELLAR_MQ_PRIORITY_HIGH);
|
{
|
||||||
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_output_topic_id , NULL, STELLAR_MQ_PRIORITY_HIGH);
|
s = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, i);
|
||||||
break;
|
if (s->on_session_free)
|
||||||
case SESSION_TYPE_UDP:
|
{
|
||||||
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_input_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH);
|
plugin_ctx_rt = (plug_mgr_rt->plugin_ctx_array + i);
|
||||||
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_output_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH);
|
plugin_manager_runtime_update_plugin_ctx(sess, s, plugin_ctx_rt);
|
||||||
break;
|
s->on_session_free(sess, plugin_ctx_rt->plugin_ctx, s->plugin_env);
|
||||||
default:
|
}
|
||||||
break;
|
}
|
||||||
}
|
|
||||||
int tid=stellar_get_current_thread_index();
|
int tid=stellar_get_current_thread_index();
|
||||||
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, NULL);
|
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, NULL);
|
||||||
plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish
|
plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish
|
||||||
stellar_mq_free(&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array);
|
stellar_mq_free(&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array);
|
||||||
|
plugin_manager_session_runtime_free(plug_mgr_rt);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -21,10 +21,9 @@ int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
|
|||||||
//publish and dispatch session msg(msg, pkt) on session_mq
|
//publish and dispatch session msg(msg, pkt) on session_mq
|
||||||
void plugin_manager_on_session_input(struct session *sess,struct packet *pkt);
|
void plugin_manager_on_session_input(struct session *sess,struct packet *pkt);
|
||||||
void plugin_manager_on_session_output(struct session *sess,struct packet *pkt);
|
void plugin_manager_on_session_output(struct session *sess,struct packet *pkt);
|
||||||
void plugin_manager_on_session_closing(struct session *sess);
|
|
||||||
|
|
||||||
struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess);
|
void plugin_manager_on_session_free(struct session *sess);
|
||||||
void plugin_manager_session_runtime_free(struct plugin_manager_runtime *plug_mgr_rt);
|
void plugin_manager_on_session_new(struct plugin_manager_schema *plug_mgr, struct session *sess);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -165,6 +165,8 @@ struct registered_session_plugin_schema
|
|||||||
{
|
{
|
||||||
session_ctx_new_func *on_ctx_new;
|
session_ctx_new_func *on_ctx_new;
|
||||||
session_ctx_free_func *on_ctx_free;
|
session_ctx_free_func *on_ctx_free;
|
||||||
|
session_on_new_func *on_session_new;
|
||||||
|
session_on_free_func *on_session_free;
|
||||||
void *plugin_env;
|
void *plugin_env;
|
||||||
UT_array *registed_session_mq_subscriber_info;
|
UT_array *registed_session_mq_subscriber_info;
|
||||||
}__attribute__((aligned(sizeof(void*))));
|
}__attribute__((aligned(sizeof(void*))));
|
||||||
|
|||||||
@@ -957,7 +957,7 @@ TEST(plugin_manager, no_plugin_register_runtime) {
|
|||||||
// pesudo running stage
|
// pesudo running stage
|
||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
|
plugin_manager_on_session_new(plug_mgr, &sess[i]);
|
||||||
sess[i].type=SESSION_TYPE_TCP;
|
sess[i].type=SESSION_TYPE_TCP;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -978,8 +978,7 @@ TEST(plugin_manager, no_plugin_register_runtime) {
|
|||||||
|
|
||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
plugin_manager_on_session_closing(&sess[i]);
|
plugin_manager_on_session_free(&sess[i]);
|
||||||
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//exit stage
|
//exit stage
|
||||||
@@ -1091,7 +1090,7 @@ TEST(plugin_manager, session_plugin_on_intrinsic_ingress_egress) {
|
|||||||
|
|
||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
|
plugin_manager_on_session_new(plug_mgr, &sess[i]);
|
||||||
sess[i].type=SESSION_TYPE_TCP;
|
sess[i].type=SESSION_TYPE_TCP;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1110,8 +1109,7 @@ TEST(plugin_manager, session_plugin_on_intrinsic_ingress_egress) {
|
|||||||
|
|
||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
plugin_manager_on_session_closing(&sess[i]);
|
plugin_manager_on_session_free(&sess[i]);
|
||||||
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
plugin_manager_exit(plug_mgr);
|
plugin_manager_exit(plug_mgr);
|
||||||
@@ -1237,7 +1235,7 @@ TEST(plugin_manager, session_plugin_ignore_on_ctx_new_sub_other_msg) {
|
|||||||
|
|
||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
|
plugin_manager_on_session_new(plug_mgr, &sess[i]);
|
||||||
sess[i].type=SESSION_TYPE_TCP;
|
sess[i].type=SESSION_TYPE_TCP;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1257,8 +1255,7 @@ TEST(plugin_manager, session_plugin_ignore_on_ctx_new_sub_other_msg) {
|
|||||||
|
|
||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
plugin_manager_on_session_closing(&sess[i]);
|
plugin_manager_on_session_free(&sess[i]);
|
||||||
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
plugin_manager_exit(plug_mgr);
|
plugin_manager_exit(plug_mgr);
|
||||||
@@ -1405,7 +1402,7 @@ TEST(plugin_manager, session_plugin_pub_msg_overlimt) {
|
|||||||
|
|
||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
|
plugin_manager_on_session_new(plug_mgr, &sess[i]);
|
||||||
sess[i].type=SESSION_TYPE_TCP;
|
sess[i].type=SESSION_TYPE_TCP;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1424,8 +1421,7 @@ TEST(plugin_manager, session_plugin_pub_msg_overlimt) {
|
|||||||
|
|
||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
plugin_manager_on_session_closing(&sess[i]);
|
plugin_manager_on_session_free(&sess[i]);
|
||||||
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
plugin_manager_exit(plug_mgr);
|
plugin_manager_exit(plug_mgr);
|
||||||
@@ -1516,7 +1512,7 @@ TEST(plugin_manager, session_plugin_on_ctx_new_then_dettach) {
|
|||||||
|
|
||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
|
plugin_manager_on_session_new(plug_mgr, &sess[i]);
|
||||||
sess[i].type=SESSION_TYPE_TCP;
|
sess[i].type=SESSION_TYPE_TCP;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1536,8 +1532,7 @@ TEST(plugin_manager, session_plugin_on_ctx_new_then_dettach) {
|
|||||||
|
|
||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
plugin_manager_on_session_closing(&sess[i]);
|
plugin_manager_on_session_free(&sess[i]);
|
||||||
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
plugin_manager_exit(plug_mgr);
|
plugin_manager_exit(plug_mgr);
|
||||||
@@ -1559,7 +1554,7 @@ static void test_invalid_pub_msg_session_ctx_free(struct session *sess, void *se
|
|||||||
struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
|
struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
|
||||||
env->basic_ctx_free_called+=1;
|
env->basic_ctx_free_called+=1;
|
||||||
struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx;
|
struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx;
|
||||||
EXPECT_EQ(ctx->called,(env->N_per_session_pkt_cnt+1));
|
EXPECT_EQ(ctx->called,(env->N_per_session_pkt_cnt));
|
||||||
|
|
||||||
EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, ctx), -1);// illegal publish when session closing
|
EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, ctx), -1);// illegal publish when session closing
|
||||||
|
|
||||||
@@ -1608,7 +1603,7 @@ TEST(plugin_manager, session_plugin_pub_on_ctx_free) {
|
|||||||
// pesudo running stage
|
// pesudo running stage
|
||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
|
plugin_manager_on_session_new(plug_mgr, &sess[i]);
|
||||||
sess[i].type=SESSION_TYPE_TCP;
|
sess[i].type=SESSION_TYPE_TCP;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1628,8 +1623,7 @@ TEST(plugin_manager, session_plugin_pub_on_ctx_free) {
|
|||||||
|
|
||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
plugin_manager_on_session_closing(&sess[i]);
|
plugin_manager_on_session_free(&sess[i]);
|
||||||
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// pesudo exit stage
|
// pesudo exit stage
|
||||||
@@ -1665,20 +1659,25 @@ static void test_session_closing_ctx_free(struct session *sess, void *session_ct
|
|||||||
FREE(ctx);
|
FREE(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void test_session_closing_on_intrisic_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
|
|
||||||
|
static void test_session_closing_on_session_free(struct session *sess, void *per_session_ctx, void *plugin_env)
|
||||||
{
|
{
|
||||||
struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx;
|
struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx;
|
||||||
struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
|
struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
|
||||||
if(msg)ctx->pkt_called+=1;
|
|
||||||
if(sess->state==SESSION_STATE_CLOSING)
|
if(sess->state==SESSION_STATE_CLOSING)
|
||||||
{
|
{
|
||||||
EXPECT_EQ(msg,nullptr);
|
|
||||||
ctx->session_free_called+=1;
|
ctx->session_free_called+=1;
|
||||||
session_mq_publish_message(sess, env->test_mq_topic_id, env);
|
session_mq_publish_message(sess, env->test_mq_topic_id, env);
|
||||||
env->test_mq_pub_called+=1;
|
env->test_mq_pub_called+=1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void test_session_closing_on_intrisic_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
|
||||||
|
{
|
||||||
|
struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx;
|
||||||
|
if(msg)ctx->pkt_called+=1;
|
||||||
|
}
|
||||||
|
|
||||||
static void test_session_closing_on_userdefine_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
|
static void test_session_closing_on_userdefine_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
|
||||||
{
|
{
|
||||||
struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx;
|
struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx;
|
||||||
@@ -1701,7 +1700,7 @@ TEST(plugin_manager, session_plugin_pub_msg_on_closing) {
|
|||||||
|
|
||||||
// plugin manager register plugin
|
// plugin manager register plugin
|
||||||
|
|
||||||
int plugin_id=stellar_session_plugin_register(&st, test_session_closing_ctx_new, test_session_closing_ctx_free, &env);
|
int plugin_id=stellar_session_plugin_register_with_hook(&st, test_session_closing_ctx_new, test_session_closing_ctx_free, NULL, test_session_closing_on_session_free , &env);
|
||||||
EXPECT_GE(plugin_id,0);
|
EXPECT_GE(plugin_id,0);
|
||||||
|
|
||||||
env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP_INPUT);
|
env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP_INPUT);
|
||||||
@@ -1727,7 +1726,7 @@ TEST(plugin_manager, session_plugin_pub_msg_on_closing) {
|
|||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].state=SESSION_STATE_OPENING;
|
sess[i].state=SESSION_STATE_OPENING;
|
||||||
sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
|
plugin_manager_on_session_new(plug_mgr, &sess[i]);
|
||||||
sess[i].type=SESSION_TYPE_TCP;
|
sess[i].type=SESSION_TYPE_TCP;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1749,8 +1748,7 @@ TEST(plugin_manager, session_plugin_pub_msg_on_closing) {
|
|||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].state=SESSION_STATE_CLOSING;
|
sess[i].state=SESSION_STATE_CLOSING;
|
||||||
plugin_manager_on_session_closing(&sess[i]);
|
plugin_manager_on_session_free(&sess[i]);
|
||||||
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// pesudo exit stage
|
// pesudo exit stage
|
||||||
@@ -1853,7 +1851,7 @@ TEST(plugin_manager, test_session_mq_topic_is_active) {
|
|||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].state=SESSION_STATE_OPENING;
|
sess[i].state=SESSION_STATE_OPENING;
|
||||||
sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
|
plugin_manager_on_session_new(plug_mgr, &sess[i]);
|
||||||
sess[i].type=SESSION_TYPE_TCP;
|
sess[i].type=SESSION_TYPE_TCP;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1875,8 +1873,7 @@ TEST(plugin_manager, test_session_mq_topic_is_active) {
|
|||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].state=SESSION_STATE_CLOSING;
|
sess[i].state=SESSION_STATE_CLOSING;
|
||||||
plugin_manager_on_session_closing(&sess[i]);
|
plugin_manager_on_session_free(&sess[i]);
|
||||||
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// pesudo exit stage
|
// pesudo exit stage
|
||||||
@@ -1961,7 +1958,7 @@ TEST(plugin_manager, test_session_dettach) {
|
|||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].state=SESSION_STATE_OPENING;
|
sess[i].state=SESSION_STATE_OPENING;
|
||||||
sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
|
plugin_manager_on_session_new(plug_mgr, &sess[i]);
|
||||||
sess[i].type=SESSION_TYPE_TCP;
|
sess[i].type=SESSION_TYPE_TCP;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1983,8 +1980,7 @@ TEST(plugin_manager, test_session_dettach) {
|
|||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].state=SESSION_STATE_CLOSING;
|
sess[i].state=SESSION_STATE_CLOSING;
|
||||||
plugin_manager_on_session_closing(&sess[i]);
|
plugin_manager_on_session_free(&sess[i]);
|
||||||
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// pesudo exit stage
|
// pesudo exit stage
|
||||||
@@ -2099,7 +2095,7 @@ TEST(plugin_manager, test_session_mq_priority) {
|
|||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].state=SESSION_STATE_OPENING;
|
sess[i].state=SESSION_STATE_OPENING;
|
||||||
sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
|
plugin_manager_on_session_new(plug_mgr, &sess[i]);
|
||||||
sess[i].type=SESSION_TYPE_TCP;
|
sess[i].type=SESSION_TYPE_TCP;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2121,16 +2117,15 @@ TEST(plugin_manager, test_session_mq_priority) {
|
|||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].state=SESSION_STATE_CLOSING;
|
sess[i].state=SESSION_STATE_CLOSING;
|
||||||
plugin_manager_on_session_closing(&sess[i]);
|
plugin_manager_on_session_free(&sess[i]);
|
||||||
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// pesudo exit stage
|
// pesudo exit stage
|
||||||
plugin_manager_exit(plug_mgr);
|
plugin_manager_exit(plug_mgr);
|
||||||
|
|
||||||
// each session publish TCP TOPIC per_session_pkt_cnt+1, and SESSION_PRIORITY_TOPIC 2*(msg per_session_pkt_cnt+1)
|
// each session publish TCP TOPIC per_session_pkt_cnt+1, and SESSION_PRIORITY_TOPIC 2*(msg per_session_pkt_cnt+1)
|
||||||
EXPECT_EQ(env.plugin_id_1_called,env.N_session*((env.N_per_session_pkt_cnt+1)*3));
|
EXPECT_EQ(env.plugin_id_1_called,env.N_session*((env.N_per_session_pkt_cnt)*3));
|
||||||
EXPECT_EQ(env.plugin_id_2_called,env.N_session*((env.N_per_session_pkt_cnt+1)*3));
|
EXPECT_EQ(env.plugin_id_2_called,env.N_session*((env.N_per_session_pkt_cnt)*3));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2184,7 +2179,7 @@ TEST(plugin_manager, session_exdata_free_pub_msg) {
|
|||||||
// pesudo running stage
|
// pesudo running stage
|
||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
|
plugin_manager_on_session_new(plug_mgr, &sess[i]);
|
||||||
sess[i].type=SESSION_TYPE_TCP;
|
sess[i].type=SESSION_TYPE_TCP;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2204,8 +2199,7 @@ TEST(plugin_manager, session_exdata_free_pub_msg) {
|
|||||||
|
|
||||||
for(int i=0; i < env.N_session; i++)
|
for(int i=0; i < env.N_session; i++)
|
||||||
{
|
{
|
||||||
plugin_manager_on_session_closing(&sess[i]);
|
plugin_manager_on_session_free(&sess[i]);
|
||||||
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// pesudo exit stage
|
// pesudo exit stage
|
||||||
|
|||||||
@@ -103,16 +103,13 @@ static void update_stat(struct session *sess, struct packet *pkt)
|
|||||||
|
|
||||||
static inline void clean_session(struct session_manager *sess_mgr, uint64_t now_ms)
|
static inline void clean_session(struct session_manager *sess_mgr, uint64_t now_ms)
|
||||||
{
|
{
|
||||||
struct plugin_manager_runtime *plugin_ctx = NULL;
|
|
||||||
struct session *sess = NULL;
|
struct session *sess = NULL;
|
||||||
struct session *cleaned_sess[RX_BURST_MAX * 16];
|
struct session *cleaned_sess[RX_BURST_MAX * 16];
|
||||||
uint64_t nr_sess_cleaned = session_manager_clean_session(sess_mgr, now_ms, cleaned_sess, sizeof(cleaned_sess) / sizeof(cleaned_sess[0]));
|
uint64_t nr_sess_cleaned = session_manager_clean_session(sess_mgr, now_ms, cleaned_sess, sizeof(cleaned_sess) / sizeof(cleaned_sess[0]));
|
||||||
for (uint64_t j = 0; j < nr_sess_cleaned; j++)
|
for (uint64_t j = 0; j < nr_sess_cleaned; j++)
|
||||||
{
|
{
|
||||||
sess = cleaned_sess[j];
|
sess = cleaned_sess[j];
|
||||||
plugin_ctx = (struct plugin_manager_runtime *)session_get_user_data(sess);
|
plugin_manager_on_session_free(sess);
|
||||||
plugin_manager_on_session_closing(sess);
|
|
||||||
plugin_manager_session_runtime_free(plugin_ctx);
|
|
||||||
session_manager_free_session(sess_mgr, sess);
|
session_manager_free_session(sess_mgr, sess);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -122,7 +119,6 @@ static void *worker_thread(void *arg)
|
|||||||
int nr_pkt_received = 0;
|
int nr_pkt_received = 0;
|
||||||
uint64_t now_ms = 0;
|
uint64_t now_ms = 0;
|
||||||
char thd_name[16] = {0};
|
char thd_name[16] = {0};
|
||||||
void *plugin_ctx = NULL;
|
|
||||||
struct packet *pkt = NULL;
|
struct packet *pkt = NULL;
|
||||||
struct packet *defraged_pkt = NULL;
|
struct packet *defraged_pkt = NULL;
|
||||||
struct packet packets[RX_BURST_MAX];
|
struct packet packets[RX_BURST_MAX];
|
||||||
@@ -210,8 +206,7 @@ static void *worker_thread(void *arg)
|
|||||||
{
|
{
|
||||||
goto fast_path;
|
goto fast_path;
|
||||||
}
|
}
|
||||||
plugin_ctx = plugin_manager_session_runtime_new(plug_mgr, sess);
|
plugin_manager_on_session_new(plug_mgr, sess);
|
||||||
session_set_user_data(sess, plugin_ctx);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user