#pragma GCC diagnostic ignored "-Wunused-parameter" #include #include "mq/mq_internal.h" #define TOPIC_NAME_MAX 512 #if 0 void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct plugin_manager_schema *plug_mgr) { SCOPED_TRACE("whitebox test intrisic metadata"); EXPECT_TRUE(plug_mgr!=NULL); EXPECT_EQ(plug_mgr->st, st); //load spec null EXPECT_TRUE(plug_mgr->plugin_load_specs_array==NULL); //session exdata schema null EXPECT_TRUE(plug_mgr->exdata_schema!=NULL); //stellar mq schema null EXPECT_TRUE(plug_mgr->stellar_mq_schema_array==NULL); //registered plugin array null EXPECT_TRUE(plug_mgr->registered_polling_plugin_array==NULL); EXPECT_TRUE(plug_mgr->registered_packet_plugin_array==NULL); EXPECT_TRUE(plug_mgr->per_thread_data!=NULL); int thread_num=stellar_get_worker_thread_num(st); for(int i=0; iper_thread_data[i].exdata_array==NULL); EXPECT_TRUE(plug_mgr->per_thread_data[i].dealth_letter_queue==NULL); for(int j=0; jper_thread_data[i].priority_mq[j]==NULL); } } #endif /******************************************* * TEST MQ SCHEMA * *******************************************/ TEST(mq_schema, new_and_free) { struct mq_schema *s = mq_schema_new(); EXPECT_TRUE(s!=NULL); mq_schema_free(s); } void mock_msg_free(void *msg, void *msg_free_arg){} void mock_overwrite_msg_free(void *msg, void *msg_free_arg){} TEST(mq_schema, mq_topic_create_and_update) { struct mq_schema *s = mq_schema_new(); EXPECT_TRUE(s!=NULL); const char *topic_name="PACKET_TOPIC"; EXPECT_EQ(mq_schema_get_topic_id(s, topic_name), -1); // illegal topic_name int topic_id = mq_schema_create_topic(s, topic_name, NULL, NULL, mock_msg_free, s); EXPECT_GE(topic_id, 0); struct mq_topic *topic = NULL; { SCOPED_TRACE("White-box test, check mq_schema internal "); topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); EXPECT_EQ(topic->free_cb, (void *)mock_msg_free); EXPECT_EQ(topic->free_cb_arg, s); EXPECT_EQ(topic->topic_id, topic_id); EXPECT_STREQ(topic->topic_name, topic_name); } EXPECT_EQ(mq_schema_get_topic_id(s, topic_name), topic_id); EXPECT_EQ(mq_schema_create_topic(s, topic_name, NULL, NULL, mock_overwrite_msg_free, s), -1); // duplicate create, return error { SCOPED_TRACE("White-box test, check stellar internal schema"); topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); EXPECT_EQ(topic->free_cb, (void *)mock_msg_free); EXPECT_EQ(topic->free_cb_arg, s); EXPECT_EQ(topic->topic_id, topic_id); EXPECT_STREQ(topic->topic_name, topic_name); } EXPECT_EQ(mq_schema_update_topic(s, topic_id, NULL, NULL, mock_overwrite_msg_free, s), 0); { SCOPED_TRACE("White-box test, check stellar internal schema"); topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); EXPECT_EQ(topic->free_cb, (void *)mock_overwrite_msg_free); EXPECT_EQ(topic->free_cb_arg, s); EXPECT_EQ(topic->topic_id, topic_id); EXPECT_STREQ(topic->topic_name, topic_name); EXPECT_EQ(utarray_len(s->topic_array), 1); } EXPECT_EQ(mq_schema_destroy_topic(s, 10), -1); // illgeal topic_id EXPECT_EQ(mq_schema_destroy_topic(s, topic_id), 1); EXPECT_EQ(mq_schema_destroy_topic(s, topic_id), 0); // duplicate destroy, return 0; { SCOPED_TRACE("White-box test, check stellar internal schema"); EXPECT_EQ(utarray_len(s->topic_array), 1); // destory won't delete the topic schema } mq_schema_free(s); } void test_mock_on_packet_msg(int topic_id, void *msg, void *plugin_env){} void test_mock_overwrite_on_packet_msg(int topic_id, void *msg, void *plugin_env){} TEST(mq_schema, subscribe) { struct mq_schema *s = mq_schema_new(); EXPECT_TRUE(s!=NULL); const char *topic_name="PACKET_TOPIC"; int topic_id=mq_schema_create_topic(s, topic_name, NULL, NULL, mock_msg_free, s); EXPECT_GE(topic_id, 0); EXPECT_EQ(mq_schema_subscribe(s, 10, test_mock_on_packet_msg, s),-1);//illgeal topic_id EXPECT_EQ(mq_schema_subscribe(s, topic_id, test_mock_on_packet_msg, s),0); EXPECT_EQ(mq_schema_subscribe(s, topic_id, test_mock_overwrite_on_packet_msg, s),0);//duplicate subscribe struct mq_topic *topic; { SCOPED_TRACE("White-box test, check stellar internal schema"); topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); EXPECT_EQ(topic->free_cb, (void *)mock_msg_free); EXPECT_EQ(topic->free_cb_arg, s); EXPECT_EQ(topic->topic_id, topic_id); EXPECT_STREQ(topic->topic_name, topic_name); } EXPECT_EQ(topic->subscriber_cnt, 2); EXPECT_EQ(topic->subscribers->msg_cb, (void *)test_mock_on_packet_msg); EXPECT_EQ(topic->subscribers->next->msg_cb, (void *)test_mock_overwrite_on_packet_msg); mq_schema_free(s); } /******************************************* * TEST MQ RUNTIME * *******************************************/ TEST(mq_runtime, new_and_free) { struct mq_schema *s = mq_schema_new(); EXPECT_TRUE(s!=NULL); struct mq_runtime *rt = mq_runtime_new(s); EXPECT_TRUE(rt!=NULL); mq_runtime_free(rt); mq_schema_free(s); } #define PACKET_PROTO_PLUGIN_NUM 128 #define PACKET_EXDATA_NUM 2 #define PACKET_TOPIC_NUM 2 #define PACKET_MQ_SUB_NUM 2 struct packet_plugin_env { struct mq_schema *s; struct mq_runtime *rt; int basic_on_packet_called; int proto_filter_plugin_id[PACKET_PROTO_PLUGIN_NUM]; int proto_filter_plugin_called[PACKET_PROTO_PLUGIN_NUM]; int exdata_set_on_packet_called; int exdata_get_on_packet_called; unsigned int packet_exdata_idx[PACKET_EXDATA_NUM]; int exdata_free_called[PACKET_EXDATA_NUM]; unsigned int packet_topic_id[PACKET_TOPIC_NUM]; unsigned int packet_mq_sub_plugin_id[PACKET_MQ_SUB_NUM]; int msg_pub_cnt; int msg_sub_cnt; int msg_free_cnt; }; struct packet_message { unsigned char ip_proto; int topic_in; int topic_out; }; static void test_packet_msg_free_cb_func(void *msg, void *msg_free_arg) { struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; env->msg_free_cnt+=1; return; } static void test_mq_on_packet_topic_msg(int topic_id, void *msg, void *plugin_env) { struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; EXPECT_TRUE(env!=NULL); env->msg_sub_cnt+=1; return; } static void test_mq_on_packet_in_out(int topic_id, void *msg, void *plugin_env) { struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; EXPECT_TRUE(env!=NULL); int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); for(int i=0; irt, env->packet_topic_id[i], msg), 0); env->msg_pub_cnt+=1; } return; } TEST(plugin_manager, basic_pub_sub) { struct mq_schema *s = mq_schema_new(); EXPECT_TRUE(s!=NULL); struct packet_plugin_env env; memset(&env, 0, sizeof(struct packet_plugin_env)); env.s=s; char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); for(int i=0; itopic_array, env.packet_topic_id[i]); EXPECT_EQ(topic->free_cb, test_packet_msg_free_cb_func); EXPECT_EQ(topic->free_cb_arg, &env); EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); EXPECT_STREQ(topic->topic_name, topic_name[i]); } } { SCOPED_TRACE("White-box test, check stellar internal schema"); EXPECT_EQ(utarray_len(s->topic_array), topic_id_num); } int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); for (int i = 0; i < topic_sub_num; i++) { for(int j = 0; j < topic_id_num; j++) { EXPECT_EQ(mq_schema_subscribe(s, env.packet_topic_id[j], test_mq_on_packet_topic_msg, &env), 0); } } int packet_in_topic_id=mq_schema_create_topic(s, "PACKET_IN", NULL, NULL, NULL, &env); int packet_out_topic_id=mq_schema_create_topic(s, "PACKET_OUT", NULL, NULL, NULL, &env); mq_schema_subscribe(s, packet_in_topic_id, test_mq_on_packet_in_out, &env); mq_schema_subscribe(s, packet_out_topic_id, test_mq_on_packet_in_out, &env); struct packet_message pkt={6, packet_in_topic_id, packet_out_topic_id}; struct mq_runtime *rt = mq_runtime_new(s); EXPECT_TRUE(rt!=NULL); env.rt=rt; int N_packet=10; for (int i = 0; i < N_packet; i++) { mq_runtime_publish_message(rt, packet_in_topic_id, &pkt); mq_runtime_dispatch(rt); mq_runtime_publish_message(rt, packet_out_topic_id, &pkt); mq_runtime_dispatch(rt); } EXPECT_EQ(N_packet*2*topic_id_num, env.msg_pub_cnt); EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); mq_runtime_free(rt); mq_schema_free(s); } /********************************************** * MQ RUNTIME WITH DISPATCH * **********************************************/ struct session_message { int id; int cnt; }; struct session_mq_test_env { struct mq_schema *s; struct mq_runtime *rt; int N_session; struct session_message sess[1024]; int intrinsc_tcp_input_topic_id; int basic_on_tcp_called; int sess_dispatch_called; int test_mq_sub_called; int sess_msg_free_called; }; #define TOPIC_TCP "TCP" typedef void on_session_msg_cb_func(int topic_id, struct session_message *sess, void *module_ctx); static void pesudo_on_msg_dispatch(int topic_id, void *msg, on_msg_cb_func* on_msg_cb, void *on_msg_cb_arg, void *dispatch_arg) { on_session_msg_cb_func *session_cb = (on_session_msg_cb_func *)on_msg_cb; struct session_message *sess=(struct session_message *)msg; EXPECT_TRUE(dispatch_arg==NULL); session_cb(topic_id, sess, on_msg_cb_arg); struct session_mq_test_env *env=(struct session_mq_test_env *)on_msg_cb_arg; env->sess_dispatch_called+=1; } static void pesudo_tcp_session_msg_free(void *msg, void *msg_free_arg) { struct session_mq_test_env *env=(struct session_mq_test_env *)msg_free_arg; env->sess_msg_free_called+=1; } static int pesudo_tcp_session_subscribe(struct session_mq_test_env *env, on_session_msg_cb_func *on_session_cb) { int topic_id=mq_schema_get_topic_id(env->s, TOPIC_TCP); if(topic_id<0) { topic_id=mq_schema_create_topic(env->s, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, pesudo_tcp_session_msg_free, env); } return mq_schema_subscribe(env->s, topic_id, (on_msg_cb_func *)on_session_cb, env); } static void test_basic_on_tcp_session(int topic_id, struct session_message *sess, void *plugin_env) { struct session_mq_test_env *env = (struct session_mq_test_env *)plugin_env; EXPECT_TRUE(env!=NULL); if(sess) { env->basic_on_tcp_called+=1; } return; } TEST(mq_runtime, sub_with_dispatch_cb) { struct session_mq_test_env env; memset(&env, 0, sizeof(env)); env.N_session=10; env.s=mq_schema_new(); EXPECT_EQ(pesudo_tcp_session_subscribe(&env, test_basic_on_tcp_session), 0); env.intrinsc_tcp_input_topic_id=mq_schema_get_topic_id(env.s, TOPIC_TCP); env.rt=mq_runtime_new(env.s); for(int i=0; i msg_free_cnt+=1; FREE(msg); return; } static void overlimit_sub_on_packet_msg(int topic_id, const void *msg, void *plugin_env) { struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; EXPECT_TRUE(env!=NULL); env->msg_sub_cnt+=1; return; } static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env) { struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; EXPECT_TRUE(env!=NULL); //EXPECT_EQ(pkt->ip_proto, ip_protocol); int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); unsigned int cnt=0; int *msg; for(int i=0; iplug_mgr->max_message_dispatch; j++) { msg=CALLOC(int, 1); *msg=cnt; int pub_ret=stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[i], msg); if(cnt < env->plug_mgr->max_message_dispatch) { ASSERT_EQ(pub_ret, 0); env->msg_pub_cnt+=1; } else { ASSERT_EQ(pub_ret, -1); } if(pub_ret!=0)FREE(msg); cnt+=1; } } return; } //TODO: test case mq for overlimit TEST(plugin_manager, packet_plugins_pub_overlimit) { struct stellar st={0}; struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); unsigned char ip_proto=6; struct packet_plugin_env env; memset(&env, 0, sizeof(struct packet_plugin_env)); env.plug_mgr=plug_mgr; char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); for(int i=0; istellar_mq_schema_array, env.packet_topic_id[i]); EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func); EXPECT_EQ(topic->free_cb_arg, &env); EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); EXPECT_STREQ(topic->topic_name, topic_name[i]); } } { SCOPED_TRACE("White-box test, check stellar internal schema"); EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); } int pub_plugin_id=stellar_plugin_register(&st, overlimit_pub_on_packet, NULL,&env); EXPECT_GE(pub_plugin_id, 0); int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); for (int i = 0; i < topic_sub_num; i++) { env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL, &env);// empty on_packet is ok EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0); for(int j = 0; j < topic_id_num; j++) { EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0); } } { SCOPED_TRACE("White-box test, check stellar internal schema"); EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1); } struct packet pkt={&st, IPv4, ip_proto}; int N_packet=10; for (int i = 0; i < N_packet; i++) { plugin_manager_on_packet_input(plug_mgr, &pkt); plugin_manager_on_packet_output(plug_mgr, &pkt); } plugin_manager_exit(plug_mgr); EXPECT_EQ(N_packet*MAX_MSG_PER_STAGE, env.msg_pub_cnt); EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); } //TODO: test case, mq priority //test dettach session static void test_session_mq_priority_plugin_1_on_msg(int topic_id, const void *msg, void *plugin_env) { struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; env->plugin_id_1_called+=1; if(topic_id == env->intrinsc_tcp_topic_id) { struct session *sess = (struct session *)msg; struct test_session_called_ctx *ctx = (struct test_session_called_ctx *)session_exdata_get(sess, env->exdata_ctx_1_id); if (ctx == NULL) { ctx = CALLOC(struct test_session_called_ctx, 1); session_exdata_set(sess, env->exdata_ctx_1_id, ctx); } ctx->called+=1; EXPECT_EQ(ctx->called%3, 1);// intrinsc msg has high priority EXPECT_EQ(stellar_mq_publish_message_with_priority(env->plug_mgr->st, env->test_mq_topic_id, (void *)(long)env->plugin_id_1, STELLAR_MQ_PRIORITY_LOW), 0); } if(topic_id == env->test_mq_topic_id) { if(ctx->called%3 == 2) { EXPECT_EQ((int)(long)msg, env->plugin_id_2); } if(ctx->called%3 == 0) { EXPECT_EQ((int )(long)msg, env->plugin_id_1); } } return; } static void test_session_mq_priority_plugin_2_on_msg(int topic_id, const void *msg, void *plugin_env) { struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; env->plugin_id_2_called+=1; if(topic_id == env->intrinsc_tcp_topic_id) { struct session *sess = (struct session *)msg; struct test_session_called_ctx *ctx = (struct test_session_called_ctx *)session_exdata_get(sess, env->exdata_ctx_2_id); if (ctx == NULL) { ctx = CALLOC(struct test_session_called_ctx, 1); session_exdata_set(sess, env->exdata_ctx_2_id, ctx); } ctx->called+=1; EXPECT_EQ(ctx->called % 3, 1); // publish msg has normal priority EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, (void *)(long)env->plugin_id_2), 0); } if(topic_id == env->test_mq_topic_id) { if(ctx->called%3 == 2) { EXPECT_EQ((int)(long)msg, env->plugin_id_2); } if(ctx->called%3 == 0) { EXPECT_EQ((int)(long)msg, env->plugin_id_1); } } return; } TEST(plugin_manager, test_session_mq_priority) { struct stellar st={0}; struct session_plugin_env env; memset(&env, 0, sizeof(struct session_plugin_env)); // pesudo init stage struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); // plugin manager register plugin int plugin_id_1=stellar_plugin_register(&st, 0,NULL, NULL, &env); EXPECT_GE(plugin_id_1,0); int plugin_id_2=stellar_plugin_register(&st, 0, NULL, NULL, &env); EXPECT_GE(plugin_id_2,0); env.plugin_id_1=plugin_id_1; env.plugin_id_2=plugin_id_2; env.exdata_ctx_1_id=stellar_exdata_new_index(&st, "SESSION_CTX_1", stellar_exdata_free_default, &env) ; env.exdata_ctx_2_id=stellar_exdata_new_index(&st, "SESSION_CTX_2", stellar_exdata_free_default, &env) ; env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0); env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_PRIORITY_TOPIC", NULL, &env); EXPECT_GE(env.test_mq_topic_id, 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0); EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0); // pesudo packet and session env.plug_mgr=plug_mgr; env.N_per_session_pkt_cnt=10; env.N_session=10; struct packet pkt={&st, TCP, 6}; struct session sess[env.N_session]; memset(&sess, 0, sizeof(sess)); // pesudo running stage for(int i=0; i < env.N_session; i++) { sess[i].state=SESSION_STATE_OPENING; sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr); sess[i].type=SESSION_TYPE_TCP; } for (int j = 0; j < env.N_per_session_pkt_cnt; j++) { plugin_manager_on_packet_input(plug_mgr, &pkt); for (int i = 0; i < env.N_session; i++) { sess[i].sess_pkt_cnt+=1; sess[i].state=SESSION_STATE_ACTIVE; stellar_mq_publish_message(&st, env.intrinsc_tcp_topic_id, &sess[i]); } plugin_manager_on_packet_output(plug_mgr, &pkt); } for(int i=0; i < env.N_session; i++) { sess[i].state=SESSION_STATE_CLOSING; session_exdata_runtime_free(sess[i].session_exdat_rt); } // pesudo exit stage plugin_manager_exit(plug_mgr); // each session publish TCP TOPIC per_session_pkt_cnt+1, and SESSION_PRIORITY_TOPIC 2*(msg per_session_pkt_cnt+1) EXPECT_EQ(env.plugin_id_1_called,env.N_session*((env.N_per_session_pkt_cnt)*3)); EXPECT_EQ(env.plugin_id_2_called,env.N_session*((env.N_per_session_pkt_cnt)*3)); } #endif /********************************************** * GTEST MAIN * **********************************************/ int main(int argc, char ** argv) { int ret=0; ::testing::InitGoogleTest(&argc, argv); ret=RUN_ALL_TESTS(); return ret; }