🧪 test(mq): add dispatch_cb test case
This commit is contained in:
@@ -301,6 +301,102 @@ TEST(plugin_manager, basic_pub_sub) {
|
|||||||
mq_schema_free(s);
|
mq_schema_free(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**********************************************
|
||||||
|
* MQ RUNTIME WITH DISPATCH *
|
||||||
|
**********************************************/
|
||||||
|
|
||||||
|
struct session
|
||||||
|
{
|
||||||
|
int id;
|
||||||
|
int cnt;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct session_mq_test_env
|
||||||
|
{
|
||||||
|
struct mq_schema *s;
|
||||||
|
struct mq_runtime *rt;
|
||||||
|
int N_session;
|
||||||
|
struct session sess[1024];
|
||||||
|
int intrinsc_tcp_input_topic_id;
|
||||||
|
int basic_on_tcp_called;
|
||||||
|
int sess_dispatch_called;
|
||||||
|
int test_mq_sub_called;
|
||||||
|
int sess_msg_free_called;
|
||||||
|
};
|
||||||
|
|
||||||
|
#define TOPIC_TCP "TCP"
|
||||||
|
typedef void on_session_msg_cb_func(int topic_id, struct session *sess, void *module_ctx);
|
||||||
|
|
||||||
|
static void pesudo_on_msg_dispatch(int topic_id,
|
||||||
|
void *msg,
|
||||||
|
on_msg_cb_func* on_msg_cb,
|
||||||
|
void *on_msg_cb_arg,
|
||||||
|
void *dispatch_arg)
|
||||||
|
{
|
||||||
|
on_session_msg_cb_func *session_cb = (on_session_msg_cb_func *)on_msg_cb;
|
||||||
|
struct session *sess=(struct session *)msg;
|
||||||
|
EXPECT_TRUE(dispatch_arg==NULL);
|
||||||
|
session_cb(topic_id, sess, on_msg_cb_arg);
|
||||||
|
struct session_mq_test_env *env=(struct session_mq_test_env *)on_msg_cb_arg;
|
||||||
|
env->sess_dispatch_called+=1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void pesudo_tcp_session_msg_free(void *msg, void *msg_free_arg)
|
||||||
|
{
|
||||||
|
struct session_mq_test_env *env=(struct session_mq_test_env *)msg_free_arg;
|
||||||
|
env->sess_msg_free_called+=1;
|
||||||
|
|
||||||
|
}
|
||||||
|
static int pesudo_tcp_session_subscribe(struct session_mq_test_env *env, on_session_msg_cb_func *on_session_cb)
|
||||||
|
{
|
||||||
|
int topic_id=mq_schema_get_topic_id(env->s, TOPIC_TCP);
|
||||||
|
if(topic_id<0)
|
||||||
|
{
|
||||||
|
topic_id=mq_schema_create_topic(env->s, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, pesudo_tcp_session_msg_free, env);
|
||||||
|
}
|
||||||
|
return mq_schema_subscribe(env->s, topic_id, (on_msg_cb_func *)on_session_cb, env);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void test_basic_on_tcp_session(int topic_id, struct session *sess, void *plugin_env)
|
||||||
|
{
|
||||||
|
struct session_mq_test_env *env = (struct session_mq_test_env *)plugin_env;
|
||||||
|
EXPECT_TRUE(env!=NULL);
|
||||||
|
if(sess)
|
||||||
|
{
|
||||||
|
env->basic_on_tcp_called+=1;
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(mq_runtime, sub_with_dispatch_cb) {
|
||||||
|
|
||||||
|
struct session_mq_test_env env;
|
||||||
|
memset(&env, 0, sizeof(env));
|
||||||
|
env.N_session=10;
|
||||||
|
|
||||||
|
env.s=mq_schema_new();
|
||||||
|
|
||||||
|
EXPECT_EQ(pesudo_tcp_session_subscribe(&env, test_basic_on_tcp_session), 0);
|
||||||
|
env.intrinsc_tcp_input_topic_id=mq_schema_get_topic_id(env.s, TOPIC_TCP);
|
||||||
|
|
||||||
|
env.rt=mq_runtime_new(env.s);
|
||||||
|
|
||||||
|
for(int i=0; i <env.N_session; i++)
|
||||||
|
{
|
||||||
|
env.sess[i].id=i;
|
||||||
|
mq_runtime_publish_message(env.rt, env.intrinsc_tcp_input_topic_id, &env.sess[i]);
|
||||||
|
env.sess[i].cnt+=1;
|
||||||
|
mq_runtime_dispatch(env.rt);
|
||||||
|
}
|
||||||
|
|
||||||
|
mq_runtime_free(env.rt);
|
||||||
|
mq_schema_free(env.s);
|
||||||
|
|
||||||
|
EXPECT_EQ(env.basic_on_tcp_called, env.N_session);
|
||||||
|
EXPECT_EQ(env.sess_dispatch_called, env.N_session);
|
||||||
|
EXPECT_EQ(env.sess_msg_free_called, env.N_session);
|
||||||
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
|
static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user