From 22ba2e1d96671efd09119ede285b1786586daeb7 Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Wed, 18 Sep 2024 14:23:01 +0800 Subject: [PATCH] test(packet manager): re-schedule claimed packets --- include/CMakeLists.txt | 4 +- infra/packet_manager/packet_manager.c | 65 +++--- infra/packet_manager/packet_manager_private.h | 22 +- .../test/gtest_packet_manager.cpp | 213 ++++++++++++++---- 4 files changed, 224 insertions(+), 80 deletions(-) diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index 29dcfeb..0e982cb 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -2,5 +2,5 @@ install(FILES stellar/utils.h DESTINATION include/stellar/ COMPONENT LIBRARIES) install(FILES stellar/packet.h DESTINATION include/stellar/ COMPONENT LIBRARIES) install(FILES stellar/session.h DESTINATION include/stellar/ COMPONENT LIBRARIES) install(FILES stellar/stellar.h DESTINATION include/stellar/ COMPONENT LIBRARIES) -install(FILES stellar/stellar_mq.h DESTINATION include/stellar/ COMPONENT LIBRARIES) -install(FILES stellar/stellar_exdata.h DESTINATION include/stellar/ COMPONENT LIBRARIES) \ No newline at end of file +install(FILES stellar/mq.h DESTINATION include/stellar/ COMPONENT LIBRARIES) +install(FILES stellar/exdata.h DESTINATION include/stellar/ COMPONENT LIBRARIES) \ No newline at end of file diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c index 4cb92ca..3278236 100644 --- a/infra/packet_manager/packet_manager.c +++ b/infra/packet_manager/packet_manager.c @@ -27,8 +27,9 @@ struct packet_manager_runtime uint16_t idx; void *cb_args; on_packet_claimed_callback *claimed_cb; + enum packet_stage stage; struct mq_runtime *mq; - struct packet_queue queue[QUEUE_NUM_MAX]; + struct packet_queue queue[PACKET_QUEUE_MAX]; struct packet_manager_runtime_stat stat; }; @@ -58,7 +59,6 @@ const char *packet_stage_to_str(enum packet_stage stage) case PACKET_STAGE_POSTROUTING: return "postrouting"; default: - assert(0); return "unknown"; } } @@ -183,7 +183,7 @@ static void packet_manager_runtime_free(struct packet_manager_runtime *runtime) { if (runtime) { - for (int i = 0; i < QUEUE_NUM_MAX; i++) + for (int i = 0; i < PACKET_QUEUE_MAX; i++) { struct packet *pkt = NULL; while ((pkt = TAILQ_FIRST(&runtime->queue[i]))) @@ -210,7 +210,7 @@ static struct packet_manager_runtime *packet_manager_runtime_new(uint16_t idx) runtime->idx = idx; - for (int i = 0; i < QUEUE_NUM_MAX; i++) + for (int i = 0; i < PACKET_QUEUE_MAX; i++) { TAILQ_INIT(&runtime->queue[i]); } @@ -220,15 +220,19 @@ static struct packet_manager_runtime *packet_manager_runtime_new(uint16_t idx) void packet_manager_runtime_print_stat(struct packet_manager_runtime *runtime) { - PACKET_MANAGER_LOG_DEBUG("runtime[%d] => input_pkts: %lu, output_pkts: %lu, claim_pkts: %lu, schedule_pkts: %lu, queue_len: {pre_routing: %lu, input: %lu, forward: %lu, output: %lu, post_routing: %lu}", - runtime->idx, - runtime->stat.input_pkts, runtime->stat.output_pkts, - runtime->stat.claim_pkts, runtime->stat.schedule_pkts, - runtime->stat.queue_len[PACKET_STAGE_PREROUTING], - runtime->stat.queue_len[PACKET_STAGE_INPUT], - runtime->stat.queue_len[PACKET_STAGE_FORWARD], - runtime->stat.queue_len[PACKET_STAGE_OUTPUT], - runtime->stat.queue_len[PACKET_STAGE_POSTROUTING]); + PACKET_MANAGER_LOG_DEBUG("runtime[%d] current stage: %s, pkts_input: %lu, pkts_output: %lu", + runtime->idx, packet_stage_to_str(runtime->stage), + runtime->stat.total.pkts_input, runtime->stat.total.pkts_output); + for (int i = 0; i < PACKET_QUEUE_MAX; i++) + { + PACKET_MANAGER_LOG_DEBUG("runtime[%d] (%11s) queue stat => pkts_in: %lu, pkts_out: %lu, pkts_claim: %lu, pkts_schedule: %lu", + runtime->idx, + packet_stage_to_str(i), + runtime->stat.queue[i].pkts_in, + runtime->stat.queue[i].pkts_out, + runtime->stat.queue[i].pkts_claim, + runtime->stat.queue[i].pkts_schedule); + } } struct packet_manager_runtime_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *runtime) @@ -243,19 +247,19 @@ void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, stru void packet_manager_runtime_input(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt) { - pkt_mgr_rt->stat.input_pkts++; - pkt_mgr_rt->stat.queue_len[PACKET_STAGE_PREROUTING]++; + pkt_mgr_rt->stat.total.pkts_input++; + pkt_mgr_rt->stat.queue[PACKET_STAGE_PREROUTING].pkts_in++; TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[PACKET_STAGE_PREROUTING], pkt, stage_tqe); } struct packet *packet_manager_runtime_output(struct packet_manager_runtime *pkt_mgr_rt) { - struct packet *pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[QUEUE_NUM_MAX - 1]); + struct packet *pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[PACKET_STAGE_MAX]); if (pkt) { - pkt_mgr_rt->stat.output_pkts++; - pkt_mgr_rt->stat.queue_len[QUEUE_NUM_MAX - 1]--; - TAILQ_REMOVE(&pkt_mgr_rt->queue[QUEUE_NUM_MAX - 1], pkt, stage_tqe); + pkt_mgr_rt->stat.total.pkts_output++; + pkt_mgr_rt->stat.queue[PACKET_STAGE_MAX].pkts_out++; + TAILQ_REMOVE(&pkt_mgr_rt->queue[PACKET_STAGE_MAX], pkt, stage_tqe); } return pkt; } @@ -264,19 +268,20 @@ void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt) { for (int i = 0; i < PACKET_STAGE_MAX; i++) { + pkt_mgr_rt->stage = i; packet_manager_runtime_print_stat(pkt_mgr_rt); struct packet *pkt = NULL; - while ((pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[i]))) + while ((pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[pkt_mgr_rt->stage]))) { packet_set_claim(pkt, false); pkt_mgr_rt->claimed_cb = NULL; pkt_mgr_rt->cb_args = NULL; - TAILQ_REMOVE(&pkt_mgr_rt->queue[i], pkt, stage_tqe); - pkt_mgr_rt->stat.queue_len[i]--; + TAILQ_REMOVE(&pkt_mgr_rt->queue[pkt_mgr_rt->stage], pkt, stage_tqe); + pkt_mgr_rt->stat.queue[pkt_mgr_rt->stage].pkts_out++; - mq_runtime_publish_message(pkt_mgr_rt->mq, i, pkt); + mq_runtime_publish_message(pkt_mgr_rt->mq, pkt_mgr_rt->stage, pkt); mq_runtime_dispatch(pkt_mgr_rt->mq); if (packet_is_claim(pkt)) @@ -285,13 +290,15 @@ void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt) { pkt_mgr_rt->claimed_cb(pkt, pkt_mgr_rt->cb_args); } + packet_set_claim(pkt, false); continue; } - TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[i + 1], pkt, stage_tqe); - pkt_mgr_rt->stat.queue_len[i + 1]++; + TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[pkt_mgr_rt->stage + 1], pkt, stage_tqe); + pkt_mgr_rt->stat.queue[pkt_mgr_rt->stage + 1].pkts_in++; } } + pkt_mgr_rt->stage = -1; } int packet_manager_runtime_claim_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, on_packet_claimed_callback cb, void *args) @@ -306,7 +313,7 @@ int packet_manager_runtime_claim_packet(struct packet_manager_runtime *pkt_mgr_r pkt_mgr_rt->claimed_cb = cb; pkt_mgr_rt->cb_args = args; packet_set_claim(pkt, true); - pkt_mgr_rt->stat.claim_pkts++; + pkt_mgr_rt->stat.queue[pkt_mgr_rt->stage].pkts_claim++; return 0; } } @@ -320,8 +327,8 @@ void packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_m return; } - pkt_mgr_rt->stat.schedule_pkts++; - pkt_mgr_rt->stat.queue_len[stage]++; + pkt_mgr_rt->stat.queue[stage].pkts_schedule++; + pkt_mgr_rt->stat.queue[stage].pkts_in++; TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[stage], pkt, stage_tqe); } @@ -377,7 +384,7 @@ void packet_manager_free(struct packet_manager *pkt_mgr) { if (pkt_mgr->runtime[i]) { - packet_manager_runtime_print_stat(pkt_mgr->runtime[i]); + // packet_manager_runtime_print_stat(pkt_mgr->runtime[i]); packet_manager_runtime_free(pkt_mgr->runtime[i]); } } diff --git a/infra/packet_manager/packet_manager_private.h b/infra/packet_manager/packet_manager_private.h index a24fa7b..8e7b01a 100644 --- a/infra/packet_manager/packet_manager_private.h +++ b/infra/packet_manager/packet_manager_private.h @@ -7,6 +7,8 @@ extern "C" #include "stellar/packet_manager.h" +#define PACKET_QUEUE_MAX (PACKET_STAGE_MAX + 1) + struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, const char *toml_file); void packet_manager_free(struct packet_manager *pkt_mgr); @@ -19,16 +21,20 @@ void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt); * for gtest ******************************************************************************/ -#define QUEUE_NUM_MAX (PACKET_STAGE_MAX + 1) // the last queue is for egress struct packet_manager_runtime_stat { - uint64_t input_pkts; - uint64_t output_pkts; - - uint64_t claim_pkts; - uint64_t schedule_pkts; - - uint64_t queue_len[QUEUE_NUM_MAX]; + struct + { + uint64_t pkts_input; + uint64_t pkts_output; + } total; + struct + { + uint64_t pkts_in; // include the packets that are scheduled + uint64_t pkts_out; // include the packets that are claimed + uint64_t pkts_claim; + uint64_t pkts_schedule; + } queue[PACKET_QUEUE_MAX]; // the last queue is for sending packets }; const char *packet_stage_to_str(enum packet_stage stage); diff --git a/infra/packet_manager/test/gtest_packet_manager.cpp b/infra/packet_manager/test/gtest_packet_manager.cpp index e53b69d..99c2a90 100644 --- a/infra/packet_manager/test/gtest_packet_manager.cpp +++ b/infra/packet_manager/test/gtest_packet_manager.cpp @@ -67,16 +67,19 @@ unsigned char data[] = { 0x81, 0x80, 0x5c, 0x76, 0x00, 0x00, 0x00, 0x00, 0x80, 0x02, 0x20, 0x00, 0xf7, 0x57, 0x00, 0x00, 0x02, 0x04, 0x04, 0xc4, 0x01, 0x03, 0x03, 0x08, 0x01, 0x01, 0x04, 0x02}; -static void check_stat(struct packet_manager_runtime_stat *stat, uint64_t input_pkts, uint64_t output_pkts, uint64_t claim_pkts, uint64_t schedule_pkts) -{ - EXPECT_TRUE(stat->input_pkts == input_pkts); - EXPECT_TRUE(stat->output_pkts == output_pkts); - EXPECT_TRUE(stat->claim_pkts == claim_pkts); - EXPECT_TRUE(stat->schedule_pkts == schedule_pkts); +static struct packet_manager_runtime_stat init_stat = {}; - for (int i = 0; i < PACKET_STAGE_MAX; i++) +static void check_stat(struct packet_manager_runtime_stat *curr_stat, struct packet_manager_runtime_stat *expect_stat) +{ + EXPECT_TRUE(curr_stat->total.pkts_input == expect_stat->total.pkts_input); + EXPECT_TRUE(curr_stat->total.pkts_output == expect_stat->total.pkts_output); + + for (int i = 0; i < PACKET_QUEUE_MAX; i++) { - EXPECT_TRUE(stat->queue_len[i] == 0); + EXPECT_TRUE(curr_stat->queue[i].pkts_in == expect_stat->queue[i].pkts_in); + EXPECT_TRUE(curr_stat->queue[i].pkts_out == expect_stat->queue[i].pkts_out); + EXPECT_TRUE(curr_stat->queue[i].pkts_claim == expect_stat->queue[i].pkts_claim); + EXPECT_TRUE(curr_stat->queue[i].pkts_schedule == expect_stat->queue[i].pkts_schedule); } } @@ -111,7 +114,7 @@ static void on_packet_stage(enum packet_stage stage, struct packet *pkt, void *a count++; } -TEST(PACKET_MANAGER, SUBSCRIBER) +TEST(PACKET_MANAGER, SUBSCRIBER_PACKET_STAGE) { // global init struct mq_schema *mq_schema = mq_schema_new(); @@ -141,12 +144,24 @@ TEST(PACKET_MANAGER, SUBSCRIBER) packet_parse(&pkt, (const char *)data, sizeof(data)); packet_set_ctrl(&pkt, true); - check_stat(packet_manager_runtime_get_stat(runtime), 0, 0, 0, 0); + struct packet_manager_runtime_stat *curr_stat = packet_manager_runtime_get_stat(runtime); + check_stat(curr_stat, &init_stat); packet_manager_runtime_input(runtime, &pkt); packet_manager_runtime_dispatch(runtime); EXPECT_TRUE(packet_manager_runtime_output(runtime) == &pkt); EXPECT_TRUE(packet_manager_runtime_output(runtime) == NULL); - check_stat(packet_manager_runtime_get_stat(runtime), 1, 1, 0, 0); + struct packet_manager_runtime_stat expect_stat = { + .total = {.pkts_input = 1, .pkts_output = 1}, + .queue = { + [PACKET_STAGE_PREROUTING] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 0, .pkts_schedule = 0}, + [PACKET_STAGE_INPUT] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 0, .pkts_schedule = 0}, + [PACKET_STAGE_FORWARD] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 0, .pkts_schedule = 0}, + [PACKET_STAGE_OUTPUT] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 0, .pkts_schedule = 0}, + [PACKET_STAGE_POSTROUTING] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 0, .pkts_schedule = 0}, + [PACKET_STAGE_MAX] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 0, .pkts_schedule = 0}, + }, + }; + check_stat(curr_stat, &expect_stat); // per-thread free @@ -170,36 +185,37 @@ static void packet_claimed(struct packet *pkt, void *args) free(str); } -static void module_A_on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args) +static void on_packet_stage_claim_packet_success(enum packet_stage stage, struct packet *pkt, void *args) { struct packet_manager *pkt_mgr = (struct packet_manager *)args; struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0); EXPECT_TRUE(pkt_mgr_rt); - printf("module_A_on_packet_stage: %s claim packet success\n", packet_stage_to_str(stage)); + printf("on_packet_stage_claim_packet_success: %s\n", packet_stage_to_str(stage)); static int count = 0; EXPECT_TRUE(count == 0); EXPECT_TRUE(stage == PACKET_STAGE_PREROUTING); EXPECT_TRUE(packet_is_ctrl(pkt)); - EXPECT_TRUE(!packet_is_claim(pkt)); - packet_manager_runtime_claim_packet(pkt_mgr_rt, pkt, packet_claimed, strdup("hello")); + EXPECT_TRUE(!packet_is_claim(pkt)); // packet not claim + EXPECT_TRUE(packet_manager_runtime_claim_packet(pkt_mgr_rt, pkt, packet_claimed, strdup("hello")) == 0); // claim packet success count++; } -static void module_B_on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args) +static void on_packet_stage_claim_packet_failed(enum packet_stage stage, struct packet *pkt, void *args) { struct packet_manager *pkt_mgr = (struct packet_manager *)args; struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0); EXPECT_TRUE(pkt_mgr_rt); - printf("module_B_on_packet_stage: %s claim packet failed\n", packet_stage_to_str(stage)); + printf("on_packet_stage_claim_packet_failed: %s\n", packet_stage_to_str(stage)); static int count = 0; EXPECT_TRUE(count == 0); EXPECT_TRUE(stage == PACKET_STAGE_PREROUTING); EXPECT_TRUE(packet_is_ctrl(pkt)); - EXPECT_TRUE(packet_is_claim(pkt)); + EXPECT_TRUE(packet_is_claim(pkt)); // packet already claim + EXPECT_TRUE(packet_manager_runtime_claim_packet(pkt_mgr_rt, pkt, NULL, NULL) == -1); // claim packet failed count++; } @@ -216,17 +232,17 @@ TEST(PACKET_MANAGER, CLAIM_PACKET) EXPECT_TRUE(pkt_mgr); struct packet_manager_schema *schema = packet_manager_get_schema(pkt_mgr); EXPECT_TRUE(schema); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, module_A_on_packet_stage, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, module_A_on_packet_stage, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, module_A_on_packet_stage, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, module_A_on_packet_stage, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, module_A_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, on_packet_stage_claim_packet_success, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, on_packet_stage_claim_packet_success, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, on_packet_stage_claim_packet_success, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, on_packet_stage_claim_packet_success, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, on_packet_stage_claim_packet_success, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, module_B_on_packet_stage, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, module_B_on_packet_stage, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, module_B_on_packet_stage, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, module_B_on_packet_stage, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, module_B_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); // per-thread init struct packet_manager_runtime *runtime = packet_manager_get_runtime(pkt_mgr, 0); @@ -239,11 +255,23 @@ TEST(PACKET_MANAGER, CLAIM_PACKET) packet_parse(&pkt, (const char *)data, sizeof(data)); packet_set_ctrl(&pkt, true); - check_stat(packet_manager_runtime_get_stat(runtime), 0, 0, 0, 0); + struct packet_manager_runtime_stat *curr_stat = packet_manager_runtime_get_stat(runtime); + check_stat(curr_stat, &init_stat); packet_manager_runtime_input(runtime, &pkt); packet_manager_runtime_dispatch(runtime); EXPECT_TRUE(packet_manager_runtime_output(runtime) == NULL); - check_stat(packet_manager_runtime_get_stat(runtime), 1, 0, 1, 0); + struct packet_manager_runtime_stat expect_stat = { + .total = {.pkts_input = 1, .pkts_output = 0}, + .queue = { + [PACKET_STAGE_PREROUTING] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 1, .pkts_schedule = 0}, + [PACKET_STAGE_INPUT] = {.pkts_in = 0, .pkts_out = 0, .pkts_claim = 0, .pkts_schedule = 0}, + [PACKET_STAGE_FORWARD] = {.pkts_in = 0, .pkts_out = 0, .pkts_claim = 0, .pkts_schedule = 0}, + [PACKET_STAGE_OUTPUT] = {.pkts_in = 0, .pkts_out = 0, .pkts_claim = 0, .pkts_schedule = 0}, + [PACKET_STAGE_POSTROUTING] = {.pkts_in = 0, .pkts_out = 0, .pkts_claim = 0, .pkts_schedule = 0}, + [PACKET_STAGE_MAX] = {.pkts_in = 0, .pkts_out = 0, .pkts_claim = 0, .pkts_schedule = 0}, + }, + }; + check_stat(curr_stat, &expect_stat); // per-thread free @@ -257,13 +285,13 @@ TEST(PACKET_MANAGER, CLAIM_PACKET) #endif #if 1 -static void module_C_on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args) +static void on_packet_stage_schedule_packet(enum packet_stage stage, struct packet *pkt, void *args) { struct packet_manager *pkt_mgr = (struct packet_manager *)args; struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0); EXPECT_TRUE(pkt_mgr_rt); - printf("module_C_on_packet_stage: \"%s\" schedule packet %p\n", packet_stage_to_str(stage), pkt); + printf("on_packet_stage_schedule_packet: \"%s\" schedule packet %p\n", packet_stage_to_str(stage), pkt); EXPECT_TRUE(!packet_is_claim(pkt)); @@ -289,11 +317,11 @@ TEST(PACKET_MANAGER, SCHEDULE_PACKET) EXPECT_TRUE(pkt_mgr); struct packet_manager_schema *schema = packet_manager_get_schema(pkt_mgr); EXPECT_TRUE(schema); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, module_C_on_packet_stage, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, module_C_on_packet_stage, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, module_C_on_packet_stage, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, module_C_on_packet_stage, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, module_C_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, on_packet_stage_schedule_packet, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, on_packet_stage_schedule_packet, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, on_packet_stage_schedule_packet, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, on_packet_stage_schedule_packet, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, on_packet_stage_schedule_packet, pkt_mgr) == 0); // per-thread init struct packet_manager_runtime *runtime = packet_manager_get_runtime(pkt_mgr, 0); @@ -306,7 +334,8 @@ TEST(PACKET_MANAGER, SCHEDULE_PACKET) packet_parse(&pkt, (const char *)data, sizeof(data)); packet_set_ctrl(&pkt, true); - check_stat(packet_manager_runtime_get_stat(runtime), 0, 0, 0, 0); + struct packet_manager_runtime_stat *curr_stat = packet_manager_runtime_get_stat(runtime); + check_stat(curr_stat, &init_stat); packet_manager_runtime_input(runtime, &pkt); packet_manager_runtime_dispatch(runtime); @@ -319,7 +348,18 @@ TEST(PACKET_MANAGER, SCHEDULE_PACKET) } EXPECT_TRUE(packet_manager_runtime_output(runtime) == &pkt); EXPECT_TRUE(packet_manager_runtime_output(runtime) == NULL); - check_stat(packet_manager_runtime_get_stat(runtime), 1, 5, 0, 4); + struct packet_manager_runtime_stat expect_stat = { + .total = {.pkts_input = 1, .pkts_output = 5}, + .queue = { + [PACKET_STAGE_PREROUTING] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 0, .pkts_schedule = 0}, + [PACKET_STAGE_INPUT] = {.pkts_in = 2, .pkts_out = 2, .pkts_claim = 0, .pkts_schedule = 1}, + [PACKET_STAGE_FORWARD] = {.pkts_in = 3, .pkts_out = 3, .pkts_claim = 0, .pkts_schedule = 1}, + [PACKET_STAGE_OUTPUT] = {.pkts_in = 4, .pkts_out = 4, .pkts_claim = 0, .pkts_schedule = 1}, + [PACKET_STAGE_POSTROUTING] = {.pkts_in = 5, .pkts_out = 5, .pkts_claim = 0, .pkts_schedule = 1}, + [PACKET_STAGE_MAX] = {.pkts_in = 5, .pkts_out = 5, .pkts_claim = 0, .pkts_schedule = 0}, + }, + }; + check_stat(curr_stat, &expect_stat); // per-thread free @@ -333,9 +373,100 @@ TEST(PACKET_MANAGER, SCHEDULE_PACKET) #endif #if 1 -TEST(PACKET_MANAGER, CLAIM_AND_SCHEDULE_PACKET) +static void schedule_claimed_packet(struct packet *pkt, void *args) { - // TODO + struct packet_manager_runtime *pkt_mgr_rt = (struct packet_manager_runtime *)args; + printf("schedule_claimed_packet: %p\n", pkt); + EXPECT_TRUE(packet_is_ctrl(pkt)); + EXPECT_TRUE(packet_is_claim(pkt)); + + packet_manager_runtime_schedule_packet(pkt_mgr_rt, pkt, PACKET_STAGE_POSTROUTING); +} + +static void on_packet_stage_claim_packet_to_schedule(enum packet_stage stage, struct packet *pkt, void *args) +{ + struct packet_manager *pkt_mgr = (struct packet_manager *)args; + struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0); + EXPECT_TRUE(pkt_mgr_rt); + + printf("on_packet_stage_claim_packet_to_schedule: %s\n", packet_stage_to_str(stage)); + + static int count = 0; + EXPECT_TRUE(packet_is_ctrl(pkt)); + EXPECT_TRUE(!packet_is_claim(pkt)); + if (stage == PACKET_STAGE_PREROUTING) + { + EXPECT_TRUE(count == 0); // packet not claim + EXPECT_TRUE(packet_manager_runtime_claim_packet(pkt_mgr_rt, pkt, schedule_claimed_packet, pkt_mgr_rt) == 0); // claim packet success + } + else if (stage == PACKET_STAGE_POSTROUTING) + { + EXPECT_TRUE(count == 1); + EXPECT_TRUE(!packet_is_claim(pkt)); + } + else + { + EXPECT_TRUE(0); + } + count++; +} + +TEST(PACKET_MANAGER, SCHEDULE_CLAIMED_PACKET) +{ + // global init + struct mq_schema *mq_schema = mq_schema_new(); + EXPECT_TRUE(mq_schema); + struct mq_runtime *mq_rt = mq_runtime_new(mq_schema); + EXPECT_TRUE(mq_rt); + + // module init + struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, "./conf/stellar.toml"); + EXPECT_TRUE(pkt_mgr); + struct packet_manager_schema *schema = packet_manager_get_schema(pkt_mgr); + EXPECT_TRUE(schema); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); + + // per-thread init + struct packet_manager_runtime *runtime = packet_manager_get_runtime(pkt_mgr, 0); + EXPECT_TRUE(runtime); + packet_manager_runtime_init(runtime, mq_rt); + + // per-thread run + struct packet pkt; + memset(&pkt, 0, sizeof(pkt)); + packet_parse(&pkt, (const char *)data, sizeof(data)); + packet_set_ctrl(&pkt, true); + + struct packet_manager_runtime_stat *curr_stat = packet_manager_runtime_get_stat(runtime); + check_stat(curr_stat, &init_stat); + packet_manager_runtime_input(runtime, &pkt); + packet_manager_runtime_dispatch(runtime); + EXPECT_TRUE(packet_manager_runtime_output(runtime) == &pkt); + struct packet_manager_runtime_stat expect_stat = { + .total = {.pkts_input = 1, .pkts_output = 1}, + .queue = { + [PACKET_STAGE_PREROUTING] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 1, .pkts_schedule = 0}, + [PACKET_STAGE_INPUT] = {.pkts_in = 0, .pkts_out = 0, .pkts_claim = 0, .pkts_schedule = 0}, + [PACKET_STAGE_FORWARD] = {.pkts_in = 0, .pkts_out = 0, .pkts_claim = 0, .pkts_schedule = 0}, + [PACKET_STAGE_OUTPUT] = {.pkts_in = 0, .pkts_out = 0, .pkts_claim = 0, .pkts_schedule = 0}, + [PACKET_STAGE_POSTROUTING] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 0, .pkts_schedule = 1}, + [PACKET_STAGE_MAX] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 0, .pkts_schedule = 0}, + }, + }; + check_stat(curr_stat, &expect_stat); + + // per-thread free + + // module free + packet_manager_free(pkt_mgr); + + // global free + mq_runtime_free(mq_rt); + mq_schema_free(mq_schema); } #endif