#pragma GCC diagnostic ignored "-Wunused-parameter" #include #include "mq/mq_internal.h" #include "stellar/utils.h" #define TOPIC_NAME_MAX 512 /******************************************* * TEST MQ SCHEMA * *******************************************/ TEST(mq_schema, new_and_free) { struct mq_schema *s = mq_schema_new(); EXPECT_TRUE(s!=NULL); mq_schema_free(s); } void mock_msg_free(void *msg, void *msg_free_arg){} void mock_overwrite_msg_free(void *msg, void *msg_free_arg){} TEST(mq_schema, mq_topic_create_and_update) { struct mq_schema *s = mq_schema_new(); EXPECT_TRUE(s!=NULL); const char *topic_name="PACKET_TOPIC"; EXPECT_EQ(mq_schema_get_topic_id(s, topic_name), -1); // illegal topic_name int topic_id = mq_schema_create_topic(s, topic_name, NULL, NULL, mock_msg_free, s); EXPECT_GE(topic_id, 0); struct mq_topic *topic = NULL; { SCOPED_TRACE("White-box test, check mq_schema internal "); topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); EXPECT_EQ(topic->free_cb, (void *)mock_msg_free); EXPECT_EQ(topic->free_cb_arg, s); EXPECT_EQ(topic->topic_id, topic_id); EXPECT_STREQ(topic->topic_name, topic_name); } EXPECT_EQ(mq_schema_get_topic_id(s, topic_name), topic_id); EXPECT_EQ(mq_schema_create_topic(s, topic_name, NULL, NULL, mock_overwrite_msg_free, s), -1); // duplicate create, return error { SCOPED_TRACE("White-box test, check stellar internal schema"); topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); EXPECT_EQ(topic->free_cb, (void *)mock_msg_free); EXPECT_EQ(topic->free_cb_arg, s); EXPECT_EQ(topic->topic_id, topic_id); EXPECT_STREQ(topic->topic_name, topic_name); } EXPECT_EQ(mq_schema_update_topic(s, topic_id, NULL, NULL, mock_overwrite_msg_free, s), 0); { SCOPED_TRACE("White-box test, check stellar internal schema"); topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); EXPECT_EQ(topic->free_cb, (void *)mock_overwrite_msg_free); EXPECT_EQ(topic->free_cb_arg, s); EXPECT_EQ(topic->topic_id, topic_id); EXPECT_STREQ(topic->topic_name, topic_name); EXPECT_EQ(utarray_len(s->topic_array), 1); } EXPECT_EQ(mq_schema_destroy_topic(s, 10), -1); // illgeal topic_id EXPECT_EQ(mq_schema_destroy_topic(s, topic_id), 1); EXPECT_EQ(mq_schema_destroy_topic(s, topic_id), 0); // duplicate destroy, return 0; { SCOPED_TRACE("White-box test, check stellar internal schema"); EXPECT_EQ(utarray_len(s->topic_array), 1); // destory won't delete the topic schema } mq_schema_free(s); } void test_mock_on_packet_msg(int topic_id, void *msg, void *sub_arg){} void test_mock_overwrite_on_packet_msg(int topic_id, void *msg, void *sub_arg){} TEST(mq_schema, subscribe) { struct mq_schema *s = mq_schema_new(); EXPECT_TRUE(s!=NULL); const char *topic_name="PACKET_TOPIC"; int topic_id=mq_schema_create_topic(s, topic_name, NULL, NULL, mock_msg_free, s); EXPECT_GE(topic_id, 0); EXPECT_EQ(mq_schema_subscribe(s, 10, test_mock_on_packet_msg, s),-1);//illgeal topic_id EXPECT_EQ(mq_schema_subscribe(s, topic_id, test_mock_on_packet_msg, s),0); EXPECT_EQ(mq_schema_subscribe(s, topic_id, test_mock_overwrite_on_packet_msg, s),0);//duplicate subscribe struct mq_topic *topic; { SCOPED_TRACE("White-box test, check stellar internal schema"); topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); EXPECT_EQ(topic->free_cb, (void *)mock_msg_free); EXPECT_EQ(topic->free_cb_arg, s); EXPECT_EQ(topic->topic_id, topic_id); EXPECT_STREQ(topic->topic_name, topic_name); } EXPECT_EQ(topic->subscriber_cnt, 2); EXPECT_EQ(topic->subscribers->msg_cb, (void *)test_mock_on_packet_msg); EXPECT_EQ(topic->subscribers->next->msg_cb, (void *)test_mock_overwrite_on_packet_msg); mq_schema_free(s); } /******************************************* * TEST MQ RUNTIME * *******************************************/ TEST(mq_runtime, new_and_free) { struct mq_schema *s = mq_schema_new(); EXPECT_TRUE(s!=NULL); struct mq_runtime *rt = mq_runtime_new(s); EXPECT_TRUE(rt!=NULL); mq_runtime_free(rt); mq_schema_free(s); } struct test_pub_and_clean_env { struct mq_schema *s; struct mq_runtime *rt; int N_round; int current_round; int topic_id; int on_msg_free_called; int on_msg_called; }; void test_pub_and_clean_free(void *msg, void *msg_free_arg) { struct test_pub_and_clean_env *env = (struct test_pub_and_clean_env *)msg_free_arg; env->on_msg_free_called+=1; FREE(msg); return; } void test_pub_and_clean_on_msg(int topic_id, void *msg, void *sub_arg) { struct test_pub_and_clean_env *env = (struct test_pub_and_clean_env *)sub_arg; env->on_msg_called+=1; return; } TEST(mq_runtime, pub_then_clean) { struct test_pub_and_clean_env env={}; env.s = mq_schema_new(); EXPECT_TRUE(env.s!=NULL); env.topic_id=mq_schema_create_topic(env.s,"TEST", NULL, NULL, test_pub_and_clean_free , &env); EXPECT_EQ(mq_schema_subscribe(env.s, env.topic_id, test_pub_and_clean_on_msg, &env), 0); env.N_round=10; env.rt=mq_runtime_new(env.s); EXPECT_TRUE(env.rt!=NULL); for(int i=0; imsg_free_cnt+=1; return; } static void test_mq_on_packet_topic_msg(int topic_id, void *msg, void *sub_arg) { struct mock_packet_mq_env *env = (struct mock_packet_mq_env *)sub_arg; EXPECT_TRUE(env!=NULL); env->msg_sub_cnt+=1; return; } static void test_mq_on_packet_in_out(int topic_id, void *msg, void *sub_arg) { struct mock_packet_mq_env *env = (struct mock_packet_mq_env *)sub_arg; EXPECT_TRUE(env!=NULL); int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); for(int i=0; irt, env->packet_topic_id[i], msg), 0); env->msg_pub_cnt+=1; } return; } TEST(mq_runtime, basic_pub_sub) { struct mq_schema *s = mq_schema_new(); EXPECT_TRUE(s!=NULL); struct mock_packet_mq_env env; memset(&env, 0, sizeof(struct mock_packet_mq_env)); env.s=s; char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); for(int i=0; itopic_array, env.packet_topic_id[i]); EXPECT_EQ(topic->free_cb, test_packet_msg_free_cb_func); EXPECT_EQ(topic->free_cb_arg, &env); EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); EXPECT_STREQ(topic->topic_name, topic_name[i]); } } { SCOPED_TRACE("White-box test, check stellar internal schema"); EXPECT_EQ(utarray_len(s->topic_array), topic_id_num); } int topic_sub_num=(int)(sizeof(env.packet_mq_sub_module_id) / sizeof(env.packet_mq_sub_module_id[0])); for (int i = 0; i < topic_sub_num; i++) { for(int j = 0; j < topic_id_num; j++) { EXPECT_EQ(mq_schema_subscribe(s, env.packet_topic_id[j], test_mq_on_packet_topic_msg, &env), 0); } } int packet_in_topic_id=mq_schema_create_topic(s, "PACKET_IN", NULL, NULL, NULL, &env); int packet_out_topic_id=mq_schema_create_topic(s, "PACKET_OUT", NULL, NULL, NULL, &env); mq_schema_subscribe(s, packet_in_topic_id, test_mq_on_packet_in_out, &env); mq_schema_subscribe(s, packet_out_topic_id, test_mq_on_packet_in_out, &env); struct mock_packet_message pkt={6, packet_in_topic_id, packet_out_topic_id}; struct mq_runtime *rt = mq_runtime_new(s); EXPECT_TRUE(rt!=NULL); env.rt=rt; int N_packet=10; for (int i = 0; i < N_packet; i++) { mq_runtime_publish_message(rt, packet_in_topic_id, &pkt); mq_runtime_dispatch(rt); mq_runtime_publish_message(rt, packet_out_topic_id, &pkt); mq_runtime_dispatch(rt); } mq_runtime_free(rt); mq_schema_free(s); EXPECT_EQ(N_packet*2*topic_id_num, env.msg_pub_cnt); EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); } struct test_pub_on_free_env { mq_schema *s; mq_runtime *rt; int N_round; int current_round; int topic_id; int on_msg_free_called; int on_msg_called; }; static void test_pub_on_msg_free(void *msg, void *msg_free_arg) { struct test_pub_on_free_env *env = (struct test_pub_on_free_env *)msg_free_arg; env->on_msg_free_called+=1; if(env->current_round==env->N_round-1 && (long)msg!=env->N_round) { EXPECT_EQ(mq_runtime_publish_message(env->rt, env->topic_id, (void *)(long)(env->N_round)), -1);//on message free, publish always failed } return; } static void test_pub_on_free_on_msg(int topic_id, void *msg, void *sub_arg) { struct test_pub_on_free_env *env = (struct test_pub_on_free_env *)sub_arg; env->on_msg_called+=1; if((int)(long)msg==env->N_round) { EXPECT_EQ(env->on_msg_called, env->N_round+1); EXPECT_EQ(env->current_round, env->N_round-1); } return; } TEST(mq_runtime, pub_on_msg_free) { struct test_pub_on_free_env env={}; env.s=mq_schema_new(); EXPECT_TRUE(env.s!=NULL); env.topic_id=mq_schema_create_topic(env.s,"TEST", NULL, NULL, test_pub_on_msg_free , &env); EXPECT_EQ(mq_schema_subscribe(env.s, env.topic_id, test_pub_on_free_on_msg, &env), 0); env.N_round=10; env.rt=mq_runtime_new(env.s); EXPECT_TRUE(env.rt!=NULL); for(int i=0; ion_msg_free_called+=1; return; } static void test_dispatch_on_msg(int topic_id, void *msg, void *sub_arg) { struct test_dispatch_on_msg_env *env = (struct test_dispatch_on_msg_env *)sub_arg; env->on_msg_called+=1; if(env->current_round==(long)msg) { EXPECT_EQ(mq_runtime_publish_message(env->rt, env->topic_id, (void*)(long)env->N_round), 0); } else { EXPECT_EQ((long)msg, env->N_round); } mq_runtime_dispatch(env->rt); return; } TEST(mq_runtime, call_dispatch_when_dispatch) { struct test_dispatch_on_msg_env env={}; env.s=mq_schema_new(); EXPECT_TRUE(env.s!=NULL); env.topic_id=mq_schema_create_topic(env.s,"TEST", NULL, NULL, test_dispatch_on_msg_free , &env); EXPECT_EQ(mq_schema_subscribe(env.s, env.topic_id, test_dispatch_on_msg, &env), 0); env.N_round=10; env.rt=mq_runtime_new(env.s); EXPECT_TRUE(env.rt!=NULL); for(int i=0; isess_dispatch_called+=1; } static void mock_tcp_session_msg_free(void *msg, void *msg_free_arg) { struct mock_session_mq_env *env=(struct mock_session_mq_env *)msg_free_arg; env->sess_msg_free_called+=1; } static int mock_tcp_session_subscribe(struct mock_session_mq_env *env, mock_on_session_msg_cb_func *on_session_cb) { int topic_id=mq_schema_get_topic_id(env->s, TOPIC_TCP); if(topic_id<0) { topic_id=mq_schema_create_topic(env->s, TOPIC_TCP, mock_on_msg_dispatch, NULL, mock_tcp_session_msg_free, env); } return mq_schema_subscribe(env->s, topic_id, (on_msg_cb_func *)on_session_cb, env); } static void test_basic_on_tcp_session(int topic_id, struct mock_session_message *sess, void *sub_arg) { struct mock_session_mq_env *env = (struct mock_session_mq_env *)sub_arg; EXPECT_TRUE(env!=NULL); if(sess) { env->basic_on_tcp_called+=1; } return; } TEST(mq_runtime, sub_with_dispatch_cb) { struct mock_session_mq_env env; memset(&env, 0, sizeof(env)); env.N_session=10; env.s=mq_schema_new(); EXPECT_EQ(mock_tcp_session_subscribe(&env, test_basic_on_tcp_session), 0); env.intrinsc_tcp_input_topic_id=mq_schema_get_topic_id(env.s, TOPIC_TCP); env.rt=mq_runtime_new(env.s); for(int i=0; i plugin_id_1_called+=1; if(topic_id == env->tcp_topic_id) { EXPECT_EQ(env->plugin_id_1_called%3, 1);// tcp msg has high priority if((long)msg%2==0) { EXPECT_EQ(mq_runtime_publish_message_with_priority(env->rt, env->test_mq_topic_id, (void *)(long)1, STELLAR_MQ_PRIORITY_LOW), 0); } else { EXPECT_EQ(mq_runtime_publish_message_with_priority(env->rt, env->test_mq_topic_id, (void *)(long)1, STELLAR_MQ_PRIORITY_HIGH), 0); } } if(topic_id == env->test_mq_topic_id) { if (env->current_round % 2 == 0) { if (env->plugin_id_1_called % 3 == 2) { EXPECT_EQ((int)(long)msg, 2); // msg 2 has normal priority } if (env->plugin_id_1_called % 3 == 0) { EXPECT_EQ((int)(long)msg, 1); // msg 1 has low priority } } else { if (env->plugin_id_1_called % 3 == 2) { EXPECT_EQ((int)(long)msg, 1); // msg 2 has normal priority } if (env->plugin_id_1_called % 3 == 0) { EXPECT_EQ((int)(long)msg, 2); // msg 1 has low priority } } } return; } static void test_session_mq_priority_plugin_2_on_msg(int topic_id, void *msg, void *plugin_env) { struct test_priority_mq_env *env = (struct test_priority_mq_env *)plugin_env; env->plugin_id_2_called+=1; if(topic_id == env->tcp_topic_id) { EXPECT_EQ(env->plugin_id_2_called % 3, 1); // tcp msg has normal priority EXPECT_EQ(mq_runtime_publish_message(env->rt, env->test_mq_topic_id, (void *)(long)2), 0); } if(topic_id == env->test_mq_topic_id) { if (env->current_round % 2 == 0) { if (env->plugin_id_2_called % 3 == 2) { EXPECT_EQ((int)(long)msg, 2); // msg 2 has normal priority } if (env->plugin_id_2_called % 3 == 0) { EXPECT_EQ((int)(long)msg, 1); // msg 1 has low priority } } else { if (env->plugin_id_2_called % 3 == 2) { EXPECT_EQ((int)(long)msg, 1); // msg 2 has normal priority } if (env->plugin_id_2_called % 3 == 0) { EXPECT_EQ((int)(long)msg, 2); // msg 1 has low priority } } } return; } TEST(mq_runtime, basic_mq_priority) { struct test_priority_mq_env env={}; env.s=mq_schema_new(); env.tcp_topic_id=mq_schema_create_topic(env.s, TOPIC_TCP, NULL, NULL, NULL, &env); EXPECT_GE(env.tcp_topic_id, 0); EXPECT_EQ(mq_schema_subscribe(env.s, env.tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, &env), 0); EXPECT_EQ(mq_schema_subscribe(env.s, env.tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, &env), 0); env.test_mq_topic_id=mq_schema_create_topic(env.s, "SESSION_PRIORITY_TOPIC", NULL, NULL, NULL, &env); EXPECT_GE(env.test_mq_topic_id, 0); EXPECT_EQ(mq_schema_subscribe(env.s, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, &env), 0); EXPECT_EQ(mq_schema_subscribe(env.s, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, &env), 0); // mock packet and session env.rt=mq_runtime_new(env.s); env.N_round=10; for (int i = 0; i < env.N_round; i++) { env.current_round=i; mq_runtime_publish_message(env.rt, env.tcp_topic_id, (void *)(long)i); mq_runtime_dispatch(env.rt); } mq_runtime_free(env.rt); mq_schema_free(env.s); // publish TCP TOPIC N_round, and SESSION_PRIORITY_TOPIC*2 EXPECT_EQ(env.plugin_id_1_called,env.N_round*3); EXPECT_EQ(env.plugin_id_2_called,env.N_round*3); } #if 0 //TODO: test case mq for overlimit static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg) { struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; env->msg_free_cnt+=1; FREE(msg); return; } static void overlimit_sub_on_packet_msg(int topic_id, const void *msg, void *plugin_env) { struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; EXPECT_TRUE(env!=NULL); env->msg_sub_cnt+=1; return; } static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env) { struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; EXPECT_TRUE(env!=NULL); //EXPECT_EQ(pkt->ip_proto, ip_protocol); int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); unsigned int cnt=0; int *msg; for(int i=0; iplug_mgr->max_message_dispatch; j++) { msg=CALLOC(int, 1); *msg=cnt; int pub_ret=stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[i], msg); if(cnt < env->plug_mgr->max_message_dispatch) { ASSERT_EQ(pub_ret, 0); env->msg_pub_cnt+=1; } else { ASSERT_EQ(pub_ret, -1); } if(pub_ret!=0)FREE(msg); cnt+=1; } } return; } TEST(mq_runtime, pub_overlimit) { struct stellar st={0}; struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE); whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); unsigned char ip_proto=6; struct packet_plugin_env env; memset(&env, 0, sizeof(struct packet_plugin_env)); env.plug_mgr=plug_mgr; char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); for(int i=0; istellar_mq_schema_array, env.packet_topic_id[i]); EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func); EXPECT_EQ(topic->free_cb_arg, &env); EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); EXPECT_STREQ(topic->topic_name, topic_name[i]); } } { SCOPED_TRACE("White-box test, check stellar internal schema"); EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); } int pub_plugin_id=stellar_plugin_register(&st, overlimit_pub_on_packet, NULL,&env); EXPECT_GE(pub_plugin_id, 0); int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); for (int i = 0; i < topic_sub_num; i++) { env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL, &env);// empty on_packet is ok EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0); for(int j = 0; j < topic_id_num; j++) { EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0); } } { SCOPED_TRACE("White-box test, check stellar internal schema"); EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1); } struct packet pkt={&st, IPv4, ip_proto}; int N_packet=10; for (int i = 0; i < N_packet; i++) { plugin_manager_on_packet_input(plug_mgr, &pkt); plugin_manager_on_packet_output(plug_mgr, &pkt); } plugin_manager_exit(plug_mgr); EXPECT_EQ(N_packet*MAX_MSG_PER_STAGE, env.msg_pub_cnt); EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); } #endif /********************************************** * GTEST MAIN * **********************************************/ int main(int argc, char ** argv) { int ret=0; ::testing::InitGoogleTest(&argc, argv); ret=RUN_ALL_TESTS(); return ret; }