This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
stellar-stellar/infra/mq/test/gtest_mq_main.cpp

632 lines
19 KiB
C++
Raw Normal View History

2024-09-13 19:04:59 +08:00
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <gtest/gtest.h>
#include "mq/mq_internal.h"
2024-09-19 15:58:39 +08:00
#include "stellar/utils.h"
2024-09-13 19:04:59 +08:00
#define TOPIC_NAME_MAX 512
/*******************************************
* TEST MQ SCHEMA *
*******************************************/
2024-09-13 19:04:59 +08:00
TEST(mq_schema, new_and_free) {
2024-09-13 19:04:59 +08:00
struct mq_schema *s = mq_schema_new();
EXPECT_TRUE(s!=NULL);
mq_schema_free(s);
}
void mock_msg_free(void *msg, void *msg_free_arg){}
void mock_overwrite_msg_free(void *msg, void *msg_free_arg){}
TEST(mq_schema, mq_topic_create_and_update) {
struct mq_schema *s = mq_schema_new();
EXPECT_TRUE(s!=NULL);
const char *topic_name="PACKET_TOPIC";
EXPECT_EQ(mq_schema_get_topic_id(s, topic_name), -1); // illegal topic_name
int topic_id = mq_schema_create_topic(s, topic_name, NULL, NULL, mock_msg_free, s);
EXPECT_GE(topic_id, 0);
struct mq_topic *topic = NULL;
{
SCOPED_TRACE("White-box test, check mq_schema internal ");
topic =
(struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
EXPECT_EQ(topic->free_cb, (void *)mock_msg_free);
EXPECT_EQ(topic->free_cb_arg, s);
EXPECT_EQ(topic->topic_id, topic_id);
EXPECT_STREQ(topic->topic_name, topic_name);
}
EXPECT_EQ(mq_schema_get_topic_id(s, topic_name), topic_id);
EXPECT_EQ(mq_schema_create_topic(s, topic_name, NULL, NULL, mock_overwrite_msg_free, s), -1); // duplicate create, return error
{
SCOPED_TRACE("White-box test, check stellar internal schema");
topic =
(struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
EXPECT_EQ(topic->free_cb, (void *)mock_msg_free);
EXPECT_EQ(topic->free_cb_arg, s);
EXPECT_EQ(topic->topic_id, topic_id);
EXPECT_STREQ(topic->topic_name, topic_name);
}
EXPECT_EQ(mq_schema_update_topic(s, topic_id, NULL, NULL, mock_overwrite_msg_free, s), 0);
{
SCOPED_TRACE("White-box test, check stellar internal schema");
topic =
(struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
EXPECT_EQ(topic->free_cb, (void *)mock_overwrite_msg_free);
EXPECT_EQ(topic->free_cb_arg, s);
EXPECT_EQ(topic->topic_id, topic_id);
EXPECT_STREQ(topic->topic_name, topic_name);
EXPECT_EQ(utarray_len(s->topic_array), 1);
}
EXPECT_EQ(mq_schema_destroy_topic(s, 10), -1); // illgeal topic_id
EXPECT_EQ(mq_schema_destroy_topic(s, topic_id), 1);
EXPECT_EQ(mq_schema_destroy_topic(s, topic_id), 0); // duplicate destroy, return 0;
{
SCOPED_TRACE("White-box test, check stellar internal schema");
EXPECT_EQ(utarray_len(s->topic_array), 1); // destory won't delete the topic schema
}
mq_schema_free(s);
}
void test_mock_on_packet_msg(int topic_id, void *msg, void *sub_arg){}
void test_mock_overwrite_on_packet_msg(int topic_id, void *msg, void *sub_arg){}
2024-09-13 19:04:59 +08:00
TEST(mq_schema, subscribe) {
struct mq_schema *s = mq_schema_new();
EXPECT_TRUE(s!=NULL);
const char *topic_name="PACKET_TOPIC";
int topic_id=mq_schema_create_topic(s, topic_name, NULL, NULL, mock_msg_free, s);
EXPECT_GE(topic_id, 0);
EXPECT_EQ(mq_schema_subscribe(s, 10, test_mock_on_packet_msg, s),-1);//illgeal topic_id
EXPECT_EQ(mq_schema_subscribe(s, topic_id, test_mock_on_packet_msg, s),0);
EXPECT_EQ(mq_schema_subscribe(s, topic_id, test_mock_overwrite_on_packet_msg, s),0);//duplicate subscribe
struct mq_topic *topic;
{
SCOPED_TRACE("White-box test, check stellar internal schema");
topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
EXPECT_EQ(topic->free_cb, (void *)mock_msg_free);
EXPECT_EQ(topic->free_cb_arg, s);
EXPECT_EQ(topic->topic_id, topic_id);
EXPECT_STREQ(topic->topic_name, topic_name);
}
EXPECT_EQ(topic->subscriber_cnt, 2);
EXPECT_EQ(topic->subscribers->msg_cb, (void *)test_mock_on_packet_msg);
EXPECT_EQ(topic->subscribers->next->msg_cb, (void *)test_mock_overwrite_on_packet_msg);
mq_schema_free(s);
}
/*******************************************
* TEST MQ RUNTIME *
*******************************************/
2024-09-13 19:04:59 +08:00
TEST(mq_runtime, new_and_free) {
2024-09-13 19:04:59 +08:00
struct mq_schema *s = mq_schema_new();
EXPECT_TRUE(s!=NULL);
2024-09-13 19:04:59 +08:00
struct mq_runtime *rt = mq_runtime_new(s);
EXPECT_TRUE(rt!=NULL);
2024-09-13 19:04:59 +08:00
mq_runtime_free(rt);
mq_schema_free(s);
2024-09-13 19:04:59 +08:00
}
#define PACKET_EXDATA_NUM 2
#define PACKET_TOPIC_NUM 2
#define PACKET_MQ_SUB_NUM 2
struct mock_packet_mq_env
2024-09-13 19:04:59 +08:00
{
struct mq_schema *s;
struct mq_runtime *rt;
2024-09-13 19:04:59 +08:00
int basic_on_packet_called;
int exdata_set_on_packet_called;
int exdata_get_on_packet_called;
unsigned int packet_exdata_idx[PACKET_EXDATA_NUM];
int exdata_free_called[PACKET_EXDATA_NUM];
unsigned int packet_topic_id[PACKET_TOPIC_NUM];
unsigned int packet_mq_sub_module_id[PACKET_MQ_SUB_NUM];
2024-09-13 19:04:59 +08:00
int msg_pub_cnt;
int msg_sub_cnt;
int msg_free_cnt;
};
struct mock_packet_message
2024-09-13 19:04:59 +08:00
{
unsigned char ip_proto;
int topic_in;
int topic_out;
2024-09-13 19:04:59 +08:00
};
static void test_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
{
struct mock_packet_mq_env *env = (struct mock_packet_mq_env *)msg_free_arg;
2024-09-13 19:04:59 +08:00
env->msg_free_cnt+=1;
return;
}
static void test_mq_on_packet_topic_msg(int topic_id, void *msg, void *sub_arg)
2024-09-13 19:04:59 +08:00
{
struct mock_packet_mq_env *env = (struct mock_packet_mq_env *)sub_arg;
2024-09-13 19:04:59 +08:00
EXPECT_TRUE(env!=NULL);
env->msg_sub_cnt+=1;
return;
}
static void test_mq_on_packet_in_out(int topic_id, void *msg, void *sub_arg)
2024-09-13 19:04:59 +08:00
{
struct mock_packet_mq_env *env = (struct mock_packet_mq_env *)sub_arg;
2024-09-13 19:04:59 +08:00
EXPECT_TRUE(env!=NULL);
int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0]));
for(int i=0; i<topic_id_num; i++)
{
EXPECT_EQ(mq_runtime_publish_message(env->rt, env->packet_topic_id[i], msg), 0);
2024-09-13 19:04:59 +08:00
env->msg_pub_cnt+=1;
}
return;
}
TEST(mq_runtime, basic_pub_sub) {
2024-09-13 19:04:59 +08:00
struct mq_schema *s = mq_schema_new();
EXPECT_TRUE(s!=NULL);
2024-09-13 19:04:59 +08:00
struct mock_packet_mq_env env;
memset(&env, 0, sizeof(struct mock_packet_mq_env));
env.s=s;
2024-09-13 19:04:59 +08:00
char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX];
int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0]));
2024-09-13 19:04:59 +08:00
for(int i=0; i<topic_id_num; i++)
{
sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
env.packet_topic_id[i]=mq_schema_create_topic(s, topic_name[i], NULL, NULL, test_packet_msg_free_cb_func, &env);
2024-09-13 19:04:59 +08:00
EXPECT_GE(env.packet_topic_id[i], 0);
{
2024-09-13 19:04:59 +08:00
SCOPED_TRACE("White-box test, check stellar internal schema");
struct mq_topic *topic = (struct mq_topic *)utarray_eltptr(
s->topic_array, env.packet_topic_id[i]);
2024-09-13 19:04:59 +08:00
EXPECT_EQ(topic->free_cb, test_packet_msg_free_cb_func);
EXPECT_EQ(topic->free_cb_arg, &env);
EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]);
EXPECT_STREQ(topic->topic_name, topic_name[i]);
}
}
{
SCOPED_TRACE("White-box test, check stellar internal schema");
EXPECT_EQ(utarray_len(s->topic_array), topic_id_num);
2024-09-13 19:04:59 +08:00
}
int topic_sub_num=(int)(sizeof(env.packet_mq_sub_module_id) / sizeof(env.packet_mq_sub_module_id[0]));
2024-09-13 19:04:59 +08:00
for (int i = 0; i < topic_sub_num; i++)
{
for(int j = 0; j < topic_id_num; j++)
{
EXPECT_EQ(mq_schema_subscribe(s, env.packet_topic_id[j], test_mq_on_packet_topic_msg, &env), 0);
2024-09-13 19:04:59 +08:00
}
}
int packet_in_topic_id=mq_schema_create_topic(s, "PACKET_IN", NULL, NULL, NULL, &env);
int packet_out_topic_id=mq_schema_create_topic(s, "PACKET_OUT", NULL, NULL, NULL, &env);
2024-09-13 19:04:59 +08:00
mq_schema_subscribe(s, packet_in_topic_id, test_mq_on_packet_in_out, &env);
mq_schema_subscribe(s, packet_out_topic_id, test_mq_on_packet_in_out, &env);
struct mock_packet_message pkt={6, packet_in_topic_id, packet_out_topic_id};
struct mq_runtime *rt = mq_runtime_new(s);
EXPECT_TRUE(rt!=NULL);
env.rt=rt;
2024-09-13 19:04:59 +08:00
int N_packet=10;
for (int i = 0; i < N_packet; i++)
{
mq_runtime_publish_message(rt, packet_in_topic_id, &pkt);
mq_runtime_dispatch(rt);
mq_runtime_publish_message(rt, packet_out_topic_id, &pkt);
mq_runtime_dispatch(rt);
2024-09-13 19:04:59 +08:00
}
EXPECT_EQ(N_packet*2*topic_id_num, env.msg_pub_cnt);
2024-09-13 19:04:59 +08:00
EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt);
EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
mq_runtime_free(rt);
mq_schema_free(s);
2024-09-13 19:04:59 +08:00
}
/**********************************************
* MQ RUNTIME WITH DISPATCH *
**********************************************/
struct mock_session_message
{
int id;
int cnt;
};
struct mock_session_mq_env
{
struct mq_schema *s;
struct mq_runtime *rt;
int N_session;
struct mock_session_message sess[1024];
int intrinsc_tcp_input_topic_id;
int basic_on_tcp_called;
int sess_dispatch_called;
int test_mq_sub_called;
int sess_msg_free_called;
};
#define TOPIC_TCP "TCP"
typedef void mock_on_session_msg_cb_func(int topic_id, struct mock_session_message *sess, void *module_ctx);
static void mock_on_msg_dispatch(int topic_id,
void *msg,
on_msg_cb_func* on_msg_cb,
void *on_msg_cb_arg,
void *dispatch_arg)
{
mock_on_session_msg_cb_func *session_cb = (mock_on_session_msg_cb_func *)on_msg_cb;
struct mock_session_message *sess=(struct mock_session_message *)msg;
EXPECT_TRUE(dispatch_arg==NULL);
session_cb(topic_id, sess, on_msg_cb_arg);
struct mock_session_mq_env *env=(struct mock_session_mq_env *)on_msg_cb_arg;
env->sess_dispatch_called+=1;
}
static void mock_tcp_session_msg_free(void *msg, void *msg_free_arg)
{
struct mock_session_mq_env *env=(struct mock_session_mq_env *)msg_free_arg;
env->sess_msg_free_called+=1;
}
static int mock_tcp_session_subscribe(struct mock_session_mq_env *env, mock_on_session_msg_cb_func *on_session_cb)
{
int topic_id=mq_schema_get_topic_id(env->s, TOPIC_TCP);
if(topic_id<0)
{
topic_id=mq_schema_create_topic(env->s, TOPIC_TCP, mock_on_msg_dispatch, NULL, mock_tcp_session_msg_free, env);
}
return mq_schema_subscribe(env->s, topic_id, (on_msg_cb_func *)on_session_cb, env);
}
static void test_basic_on_tcp_session(int topic_id, struct mock_session_message *sess, void *sub_arg)
{
struct mock_session_mq_env *env = (struct mock_session_mq_env *)sub_arg;
EXPECT_TRUE(env!=NULL);
if(sess)
{
env->basic_on_tcp_called+=1;
}
return;
}
TEST(mq_runtime, sub_with_dispatch_cb) {
struct mock_session_mq_env env;
memset(&env, 0, sizeof(env));
env.N_session=10;
env.s=mq_schema_new();
EXPECT_EQ(mock_tcp_session_subscribe(&env, test_basic_on_tcp_session), 0);
env.intrinsc_tcp_input_topic_id=mq_schema_get_topic_id(env.s, TOPIC_TCP);
env.rt=mq_runtime_new(env.s);
for(int i=0; i <env.N_session; i++)
{
env.sess[i].id=i;
mq_runtime_publish_message(env.rt, env.intrinsc_tcp_input_topic_id, &env.sess[i]);
env.sess[i].cnt+=1;
mq_runtime_dispatch(env.rt);
}
mq_runtime_free(env.rt);
mq_schema_free(env.s);
EXPECT_EQ(env.basic_on_tcp_called, env.N_session);
EXPECT_EQ(env.sess_dispatch_called, env.N_session);
EXPECT_EQ(env.sess_msg_free_called, env.N_session);
}
#if 0
2024-09-19 15:58:39 +08:00
//TODO: test case mq for overlimit
2024-09-13 19:04:59 +08:00
static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
{
struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
env->msg_free_cnt+=1;
FREE(msg);
return;
}
static void overlimit_sub_on_packet_msg(int topic_id, const void *msg, void *plugin_env)
{
struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
EXPECT_TRUE(env!=NULL);
env->msg_sub_cnt+=1;
return;
}
static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env)
{
struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
EXPECT_TRUE(env!=NULL);
//EXPECT_EQ(pkt->ip_proto, ip_protocol);
int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0]));
unsigned int cnt=0;
int *msg;
for(int i=0; i<topic_id_num; i++)
{
for(unsigned int j=0; j < env->plug_mgr->max_message_dispatch; j++)
{
msg=CALLOC(int, 1);
*msg=cnt;
int pub_ret=stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[i], msg);
if(cnt < env->plug_mgr->max_message_dispatch)
{
ASSERT_EQ(pub_ret, 0);
env->msg_pub_cnt+=1;
}
else
{
ASSERT_EQ(pub_ret, -1);
}
if(pub_ret!=0)FREE(msg);
cnt+=1;
}
}
return;
}
TEST(mq_runtime, packet_plugins_pub_overlimit) {
2024-09-13 19:04:59 +08:00
struct stellar st={0};
struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
unsigned char ip_proto=6;
struct packet_plugin_env env;
memset(&env, 0, sizeof(struct packet_plugin_env));
env.plug_mgr=plug_mgr;
char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX];
int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0]));
for(int i=0; i<topic_id_num; i++)
{
sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], NULL,NULL,overlimit_packet_msg_free_cb_func, &env);
EXPECT_GE(env.packet_topic_id[i], 0);
{
SCOPED_TRACE("White-box test, check stellar internal schema");
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(
plug_mgr->stellar_mq_schema_array, env.packet_topic_id[i]);
EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func);
EXPECT_EQ(topic->free_cb_arg, &env);
EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]);
EXPECT_STREQ(topic->topic_name, topic_name[i]);
}
}
{
SCOPED_TRACE("White-box test, check stellar internal schema");
EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM);
}
int pub_plugin_id=stellar_plugin_register(&st, overlimit_pub_on_packet, NULL,&env);
EXPECT_GE(pub_plugin_id, 0);
int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
for (int i = 0; i < topic_sub_num; i++)
{
env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL, &env);// empty on_packet is ok
EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0);
for(int j = 0; j < topic_id_num; j++)
{
EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0);
}
}
{
SCOPED_TRACE("White-box test, check stellar internal schema");
EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1);
}
struct packet pkt={&st, IPv4, ip_proto};
int N_packet=10;
for (int i = 0; i < N_packet; i++)
{
plugin_manager_on_packet_input(plug_mgr, &pkt);
plugin_manager_on_packet_output(plug_mgr, &pkt);
}
plugin_manager_exit(plug_mgr);
EXPECT_EQ(N_packet*MAX_MSG_PER_STAGE, env.msg_pub_cnt);
EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt);
EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
}
2024-09-19 15:58:39 +08:00
#endif
struct test_priority_mq_env
{
struct mq_schema *s;
struct mq_runtime *rt;
int N_round;
int current_round;
int tcp_topic_id;
int test_mq_topic_id;
int plugin_id_1_called;
int plugin_id_2_called;
};
2024-09-13 19:04:59 +08:00
//test dettach session
2024-09-19 15:58:39 +08:00
static void test_session_mq_priority_plugin_1_on_msg(int topic_id, void *msg, void *plugin_env)
2024-09-13 19:04:59 +08:00
{
2024-09-19 15:58:39 +08:00
struct test_priority_mq_env *env = (struct test_priority_mq_env *)plugin_env;
2024-09-13 19:04:59 +08:00
env->plugin_id_1_called+=1;
2024-09-19 15:58:39 +08:00
if(topic_id == env->tcp_topic_id)
2024-09-13 19:04:59 +08:00
{
2024-09-19 15:58:39 +08:00
EXPECT_EQ(env->plugin_id_1_called%3, 1);// tcp msg has high priority
if((long)msg%2==0)
2024-09-13 19:04:59 +08:00
{
2024-09-19 15:58:39 +08:00
EXPECT_EQ(mq_runtime_publish_message_with_priority(env->rt, env->test_mq_topic_id, (void *)(long)1, STELLAR_MQ_PRIORITY_LOW), 0);
2024-09-13 19:04:59 +08:00
}
2024-09-19 15:58:39 +08:00
else
2024-09-13 19:04:59 +08:00
{
2024-09-19 15:58:39 +08:00
EXPECT_EQ(mq_runtime_publish_message_with_priority(env->rt, env->test_mq_topic_id, (void *)(long)1, STELLAR_MQ_PRIORITY_HIGH), 0);
2024-09-13 19:04:59 +08:00
}
}
2024-09-19 15:58:39 +08:00
if(topic_id == env->test_mq_topic_id)
{
if (env->current_round % 2 == 0)
{
if (env->plugin_id_1_called % 3 == 2)
{
EXPECT_EQ((int)(long)msg, 2); // msg 2 has normal priority
}
if (env->plugin_id_1_called % 3 == 0)
{
EXPECT_EQ((int)(long)msg, 1); // msg 1 has low priority
}
}
else
{
if (env->plugin_id_1_called % 3 == 2)
{
EXPECT_EQ((int)(long)msg, 1); // msg 2 has normal priority
}
if (env->plugin_id_1_called % 3 == 0)
{
EXPECT_EQ((int)(long)msg, 2); // msg 1 has low priority
}
}
}
2024-09-13 19:04:59 +08:00
return;
}
2024-09-19 15:58:39 +08:00
static void test_session_mq_priority_plugin_2_on_msg(int topic_id, void *msg, void *plugin_env)
2024-09-13 19:04:59 +08:00
{
2024-09-19 15:58:39 +08:00
struct test_priority_mq_env *env = (struct test_priority_mq_env *)plugin_env;
2024-09-13 19:04:59 +08:00
env->plugin_id_2_called+=1;
2024-09-19 15:58:39 +08:00
if(topic_id == env->tcp_topic_id)
2024-09-13 19:04:59 +08:00
{
2024-09-19 15:58:39 +08:00
EXPECT_EQ(env->plugin_id_2_called % 3, 1);
// tcp msg has normal priority
EXPECT_EQ(mq_runtime_publish_message(env->rt, env->test_mq_topic_id, (void *)(long)2), 0);
2024-09-13 19:04:59 +08:00
}
if(topic_id == env->test_mq_topic_id)
{
2024-09-19 15:58:39 +08:00
if (env->current_round % 2 == 0)
{
if (env->plugin_id_2_called % 3 == 2)
{
EXPECT_EQ((int)(long)msg, 2); // msg 2 has normal priority
}
if (env->plugin_id_2_called % 3 == 0)
{
EXPECT_EQ((int)(long)msg, 1); // msg 1 has low priority
}
}
else
{
if (env->plugin_id_2_called % 3 == 2)
{
EXPECT_EQ((int)(long)msg, 1); // msg 2 has normal priority
}
if (env->plugin_id_2_called % 3 == 0)
{
EXPECT_EQ((int)(long)msg, 2); // msg 1 has low priority
}
}
}
2024-09-13 19:04:59 +08:00
return;
}
2024-09-19 15:58:39 +08:00
TEST(mq_runtime, basic_mq_priority) {
2024-09-13 19:04:59 +08:00
2024-09-19 15:58:39 +08:00
struct test_priority_mq_env env={};
2024-09-13 19:04:59 +08:00
2024-09-19 15:58:39 +08:00
env.s=mq_schema_new();
2024-09-13 19:04:59 +08:00
2024-09-19 15:58:39 +08:00
env.tcp_topic_id=mq_schema_create_topic(env.s, TOPIC_TCP, NULL, NULL, NULL, &env);
EXPECT_GE(env.tcp_topic_id, 0);
EXPECT_EQ(mq_schema_subscribe(env.s, env.tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, &env), 0);
EXPECT_EQ(mq_schema_subscribe(env.s, env.tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, &env), 0);
2024-09-13 19:04:59 +08:00
2024-09-19 15:58:39 +08:00
env.test_mq_topic_id=mq_schema_create_topic(env.s, "SESSION_PRIORITY_TOPIC", NULL, NULL, NULL, &env);
2024-09-13 19:04:59 +08:00
EXPECT_GE(env.test_mq_topic_id, 0);
2024-09-19 15:58:39 +08:00
EXPECT_EQ(mq_schema_subscribe(env.s, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, &env), 0);
EXPECT_EQ(mq_schema_subscribe(env.s, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, &env), 0);
2024-09-13 19:04:59 +08:00
// mock packet and session
2024-09-13 19:04:59 +08:00
2024-09-19 15:58:39 +08:00
env.rt=mq_runtime_new(env.s);
env.N_round=10;
2024-09-13 19:04:59 +08:00
2024-09-19 15:58:39 +08:00
for (int i = 0; i < env.N_round; i++)
2024-09-13 19:04:59 +08:00
{
2024-09-19 15:58:39 +08:00
env.current_round=i;
mq_runtime_publish_message(env.rt, env.tcp_topic_id, (void *)(long)i);
mq_runtime_dispatch(env.rt);
2024-09-13 19:04:59 +08:00
}
2024-09-19 15:58:39 +08:00
mq_runtime_free(env.rt);
mq_schema_free(env.s);
2024-09-13 19:04:59 +08:00
2024-09-19 15:58:39 +08:00
// publish TCP TOPIC N_round, and SESSION_PRIORITY_TOPIC*2
EXPECT_EQ(env.plugin_id_1_called,env.N_round*3);
EXPECT_EQ(env.plugin_id_2_called,env.N_round*3);
2024-09-13 19:04:59 +08:00
}
/**********************************************
* GTEST MAIN *
**********************************************/
int main(int argc, char ** argv)
{
int ret=0;
::testing::InitGoogleTest(&argc, argv);
ret=RUN_ALL_TESTS();
return ret;
}