feature(packet manager): support claim packt and add test case

This commit is contained in:
luwenpeng
2024-09-14 18:38:37 +08:00
parent f559d67b93
commit 721d5d1466
9 changed files with 308 additions and 101 deletions

9
deps/logger/log.c vendored
View File

@@ -284,7 +284,8 @@ int log_check_level(struct logger *logger, enum log_level level)
}
else
{
return 0;
// if logger is NULL, print to stderr
return 1;
}
}
@@ -328,6 +329,12 @@ void log_print(struct logger *logger, enum log_level level, const char *module,
// add end of line
p += snprintf(p, end - p, "\n");
if (logger == NULL)
{
fprintf(stderr, "%s", buf);
return;
}
if (logger->config.output == LOG_OUTPUT_STDERR || logger->config.output == LOG_OUTPUT_BOTH)
{
fprintf(stderr, "%s", buf);

View File

@@ -16,61 +16,37 @@ enum log_level
};
#define STELLAR_LOG_TRACE(logger, module, format, ...) \
if ((logger) == NULL) \
{ \
printf("TRACE: (%s): " format "\n", module, ##__VA_ARGS__); \
} \
else if (log_check_level((logger), LOG_TRACE)) \
if (log_check_level((logger), LOG_TRACE)) \
{ \
log_print((logger), LOG_TRACE, (module), (format), ##__VA_ARGS__); \
}
#define STELLAR_LOG_DEBUG(logger, module, format, ...) \
if ((logger) == NULL) \
{ \
printf("DEBUG: (%s): " format "\n", module, ##__VA_ARGS__); \
} \
else if (log_check_level((logger), LOG_DEBUG)) \
if (log_check_level((logger), LOG_DEBUG)) \
{ \
log_print((logger), LOG_DEBUG, (module), (format), ##__VA_ARGS__); \
}
#define STELLAR_LOG_INFO(logger, module, format, ...) \
if ((logger) == NULL) \
{ \
printf("INFO: (%s): " format "\n", module, ##__VA_ARGS__); \
} \
else if (log_check_level((logger), LOG_INFO)) \
if (log_check_level((logger), LOG_INFO)) \
{ \
log_print((logger), LOG_INFO, (module), (format), ##__VA_ARGS__); \
}
#define STELLAR_LOG_WARN(logger, module, format, ...) \
if ((logger) == NULL) \
{ \
printf("WARN: (%s): " format "\n", module, ##__VA_ARGS__); \
} \
else if (log_check_level((logger), LOG_WARN)) \
if (log_check_level((logger), LOG_WARN)) \
{ \
log_print((logger), LOG_WARN, (module), (format), ##__VA_ARGS__); \
}
#define STELLAR_LOG_ERROR(logger, module, format, ...) \
if ((logger) == NULL) \
{ \
printf("ERROR: (%s): " format "\n", module, ##__VA_ARGS__); \
} \
else if (log_check_level((logger), LOG_ERROR)) \
if (log_check_level((logger), LOG_ERROR)) \
{ \
log_print((logger), LOG_ERROR, (module), (format), ##__VA_ARGS__); \
}
#define STELLAR_LOG_FATAL(logger, module, format, ...) \
if ((logger) == NULL) \
{ \
printf("FATAL: (%s): " format "\n", module, ##__VA_ARGS__); \
} \
else if (log_check_level((logger), LOG_FATAL)) \
if (log_check_level((logger), LOG_FATAL)) \
{ \
log_print((logger), LOG_FATAL, (module), (format), ##__VA_ARGS__); \
}

View File

@@ -27,9 +27,11 @@ struct packet_manager_runtime *packet_manager_get_runtime(struct packet_manager
typedef void on_packet_stage_callback(enum packet_stage stage, struct packet *pkt, void *args);
int packet_manager_schema_add_subscriber(struct packet_manager_schema *pkt_mgr_schema, enum packet_stage stage, on_packet_stage_callback cb, void *args);
// take 只执行一次
// 同一 stage 后面的 msg 是否中断???不中断
void packet_manager_runtime_take_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt);
// if two modules claim the same packet at the same stage, the second 'claim' fails.
// return 0 on success
// return -1 on failure
typedef void on_packet_claimed_callback(struct packet *pkt, void *args);
int packet_manager_runtime_claim_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, on_packet_claimed_callback cb, void *args);
void packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage);
#ifdef __cplusplus

View File

@@ -16,17 +16,6 @@ struct packet_manager_config
uint16_t nr_worker_thread;
};
struct packet_manager_stat
{
uint64_t input_pkts;
uint64_t output_pkts;
uint64_t take_pkts;
uint64_t schedule_pkts;
uint64_t curr_queue_len[PACKET_STAGE_MAX];
};
struct packet_manager_schema
{
struct mq_schema *mq;
@@ -36,9 +25,11 @@ struct packet_manager_schema
struct packet_manager_runtime
{
uint16_t idx;
void *cb_args;
on_packet_claimed_callback *claimed_cb;
struct mq_runtime *mq;
struct packet_queue queue[PACKET_STAGE_MAX];
struct packet_manager_stat stat;
struct packet_queue queue[QUEUE_NUM_MAX];
struct packet_manager_runtime_stat stat;
};
struct packet_manager
@@ -192,7 +183,7 @@ static void packet_manager_runtime_free(struct packet_manager_runtime *runtime)
{
if (runtime)
{
for (int i = 0; i < PACKET_STAGE_MAX; i++)
for (int i = 0; i < QUEUE_NUM_MAX; i++)
{
struct packet *pkt = NULL;
while ((pkt = TAILQ_FIRST(&runtime->queue[i])))
@@ -219,7 +210,7 @@ static struct packet_manager_runtime *packet_manager_runtime_new(uint16_t idx)
runtime->idx = idx;
for (int i = 0; i < PACKET_STAGE_MAX; i++)
for (int i = 0; i < QUEUE_NUM_MAX; i++)
{
TAILQ_INIT(&runtime->queue[i]);
}
@@ -227,17 +218,22 @@ static struct packet_manager_runtime *packet_manager_runtime_new(uint16_t idx)
return runtime;
}
static void packet_manager_runtime_stat(struct packet_manager_runtime *runtime)
void packet_manager_runtime_print_stat(struct packet_manager_runtime *runtime)
{
PACKET_MANAGER_LOG_DEBUG("runtime[%d] => input_pkts: %lu, output_pkts: %lu, take_pkts: %lu, schedule_pkts: %lu, queue_len: {pre_routing: %lu, input: %lu, forward: %lu, output: %lu, post_routing: %lu}",
PACKET_MANAGER_LOG_DEBUG("runtime[%d] => input_pkts: %lu, output_pkts: %lu, claim_pkts: %lu, schedule_pkts: %lu, queue_len: {pre_routing: %lu, input: %lu, forward: %lu, output: %lu, post_routing: %lu}",
runtime->idx,
runtime->stat.input_pkts, runtime->stat.output_pkts,
runtime->stat.take_pkts, runtime->stat.schedule_pkts,
runtime->stat.curr_queue_len[PACKET_STAGE_PREROUTING],
runtime->stat.curr_queue_len[PACKET_STAGE_INPUT],
runtime->stat.curr_queue_len[PACKET_STAGE_FORWARD],
runtime->stat.curr_queue_len[PACKET_STAGE_OUTPUT],
runtime->stat.curr_queue_len[PACKET_STAGE_POSTROUTING]);
runtime->stat.claim_pkts, runtime->stat.schedule_pkts,
runtime->stat.queue_len[PACKET_STAGE_PREROUTING],
runtime->stat.queue_len[PACKET_STAGE_INPUT],
runtime->stat.queue_len[PACKET_STAGE_FORWARD],
runtime->stat.queue_len[PACKET_STAGE_OUTPUT],
runtime->stat.queue_len[PACKET_STAGE_POSTROUTING]);
}
struct packet_manager_runtime_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *runtime)
{
return &runtime->stat;
}
void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, struct mq_runtime *mq_rt)
@@ -248,18 +244,18 @@ void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, stru
void packet_manager_runtime_input(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt)
{
pkt_mgr_rt->stat.input_pkts++;
pkt_mgr_rt->stat.curr_queue_len[PACKET_STAGE_PREROUTING]++;
pkt_mgr_rt->stat.queue_len[PACKET_STAGE_PREROUTING]++;
TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[PACKET_STAGE_PREROUTING], pkt, stage_tqe);
}
struct packet *packet_manager_runtime_output(struct packet_manager_runtime *pkt_mgr_rt)
{
struct packet *pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[PACKET_STAGE_POSTROUTING]);
struct packet *pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[QUEUE_NUM_MAX - 1]);
if (pkt)
{
pkt_mgr_rt->stat.output_pkts++;
pkt_mgr_rt->stat.curr_queue_len[PACKET_STAGE_POSTROUTING]--;
TAILQ_REMOVE(&pkt_mgr_rt->queue[PACKET_STAGE_POSTROUTING], pkt, stage_tqe);
pkt_mgr_rt->stat.queue_len[QUEUE_NUM_MAX - 1]--;
TAILQ_REMOVE(&pkt_mgr_rt->queue[QUEUE_NUM_MAX - 1], pkt, stage_tqe);
}
return pkt;
}
@@ -268,47 +264,64 @@ void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt)
{
for (int i = 0; i < PACKET_STAGE_MAX; i++)
{
// packet_manager_runtime_stat(pkt_mgr_rt);
packet_manager_runtime_print_stat(pkt_mgr_rt);
struct packet *pkt = NULL;
while ((pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[i])))
{
packet_set_claim(pkt, false);
pkt_mgr_rt->claimed_cb = NULL;
pkt_mgr_rt->cb_args = NULL;
TAILQ_REMOVE(&pkt_mgr_rt->queue[i], pkt, stage_tqe);
pkt_mgr_rt->stat.curr_queue_len[i]--;
pkt_mgr_rt->stat.queue_len[i]--;
mq_runtime_publish_message(pkt_mgr_rt->mq, i, pkt);
mq_runtime_dispatch(pkt_mgr_rt->mq);
if (packet_is_stolen(pkt))
if (packet_is_claim(pkt))
{
if (pkt_mgr_rt->claimed_cb)
{
pkt_mgr_rt->claimed_cb(pkt, pkt_mgr_rt->cb_args);
}
continue;
}
if (i + 1 == PACKET_STAGE_MAX)
{
TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[i], pkt, stage_tqe);
pkt_mgr_rt->stat.curr_queue_len[i]++;
break;
}
else
{
TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[i + 1], pkt, stage_tqe);
pkt_mgr_rt->stat.curr_queue_len[i + 1]++;
}
TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[i + 1], pkt, stage_tqe);
pkt_mgr_rt->stat.queue_len[i + 1]++;
}
}
}
void packet_manager_runtime_take_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt)
int packet_manager_runtime_claim_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, on_packet_claimed_callback cb, void *args)
{
pkt_mgr_rt->stat.take_pkts++;
packet_set_stolen(pkt, true);
if (packet_is_claim(pkt))
{
PACKET_MANAGER_LOG_ERROR("packet is already claimed, cannot claim again");
return -1;
}
else
{
pkt_mgr_rt->claimed_cb = cb;
pkt_mgr_rt->cb_args = args;
packet_set_claim(pkt, true);
pkt_mgr_rt->stat.claim_pkts++;
return 0;
}
}
void packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage)
{
if (stage >= PACKET_STAGE_MAX)
{
PACKET_MANAGER_LOG_ERROR("invalid stage %d", stage);
assert(0);
return;
}
pkt_mgr_rt->stat.schedule_pkts++;
packet_set_stolen(pkt, false);
pkt_mgr_rt->stat.queue_len[stage]++;
TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[stage], pkt, stage_tqe);
}
@@ -364,7 +377,7 @@ void packet_manager_free(struct packet_manager *pkt_mgr)
{
if (pkt_mgr->runtime[i])
{
packet_manager_runtime_stat(pkt_mgr->runtime[i]);
packet_manager_runtime_print_stat(pkt_mgr->runtime[i]);
packet_manager_runtime_free(pkt_mgr->runtime[i]);
}
}

View File

@@ -14,8 +14,26 @@ void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, stru
void packet_manager_runtime_input(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt);
struct packet *packet_manager_runtime_output(struct packet_manager_runtime *pkt_mgr_rt);
void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt);
// for debug
/******************************************************************************
* for gtest
******************************************************************************/
#define QUEUE_NUM_MAX (PACKET_STAGE_MAX + 1) // the last queue is for egress
struct packet_manager_runtime_stat
{
uint64_t input_pkts;
uint64_t output_pkts;
uint64_t claim_pkts;
uint64_t schedule_pkts;
uint64_t queue_len[QUEUE_NUM_MAX];
};
const char *packet_stage_to_str(enum packet_stage stage);
void packet_manager_runtime_print_stat(struct packet_manager_runtime *runtime);
struct packet_manager_runtime_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *runtime);
#ifdef __cplusplus
}

View File

@@ -29,7 +29,7 @@ struct metadata
uint64_t domain;
uint16_t link_id;
bool is_ctrl;
bool is_stolen;
bool is_claim;
enum packet_direction direction;
enum packet_action action;
@@ -102,8 +102,8 @@ uint16_t packet_get_link_id(const struct packet *pkt);
void packet_set_ctrl(struct packet *pkt, bool ctrl);
bool packet_is_ctrl(const struct packet *pkt);
void packet_set_stolen(struct packet *pkt, bool stolen);
bool packet_is_stolen(const struct packet *pkt);
void packet_set_claim(struct packet *pkt, bool claim);
bool packet_is_claim(const struct packet *pkt);
void packet_set_direction(struct packet *pkt, enum packet_direction dir);

View File

@@ -1,3 +1,5 @@
#include <assert.h>
#include "tuple.h"
#include "uthash.h"
#include "log_private.h"
@@ -101,14 +103,14 @@ bool packet_is_ctrl(const struct packet *pkt)
return pkt->meta.is_ctrl;
}
void packet_set_stolen(struct packet *pkt, bool stolen)
void packet_set_claim(struct packet *pkt, bool claim)
{
pkt->meta.is_stolen = stolen;
pkt->meta.is_claim = claim;
}
bool packet_is_stolen(const struct packet *pkt)
bool packet_is_claim(const struct packet *pkt)
{
return pkt->meta.is_stolen;
return pkt->meta.is_claim;
}
void packet_set_direction(struct packet *pkt, enum packet_direction dir)
@@ -929,9 +931,19 @@ struct packet *packet_dup(const struct packet *pkt)
void packet_free(struct packet *pkt)
{
if (pkt && pkt->need_free)
if (pkt)
{
free((void *)pkt);
if (packet_is_claim(pkt))
{
PACKET_LOG_ERROR("packet has been claimed and cannot be released, please check the module arrangement order");
assert(0);
return;
}
if (pkt->need_free)
{
free((void *)pkt);
}
}
}

View File

@@ -67,6 +67,19 @@ unsigned char data[] = {
0x81, 0x80, 0x5c, 0x76, 0x00, 0x00, 0x00, 0x00, 0x80, 0x02, 0x20, 0x00, 0xf7, 0x57, 0x00, 0x00, 0x02, 0x04, 0x04, 0xc4, 0x01, 0x03, 0x03, 0x08, 0x01, 0x01,
0x04, 0x02};
static void check_stat(struct packet_manager_runtime_stat *stat, uint64_t input_pkts, uint64_t output_pkts, uint64_t claim_pkts, uint64_t schedule_pkts)
{
EXPECT_TRUE(stat->input_pkts == input_pkts);
EXPECT_TRUE(stat->output_pkts == output_pkts);
EXPECT_TRUE(stat->claim_pkts == claim_pkts);
EXPECT_TRUE(stat->schedule_pkts == schedule_pkts);
for (int i = 0; i < PACKET_STAGE_MAX; i++)
{
EXPECT_TRUE(stat->queue_len[i] == 0);
}
}
#if 1
TEST(PACKET_MANAGER, NEW_FREE)
{
@@ -86,20 +99,19 @@ TEST(PACKET_MANAGER, NEW_FREE)
}
#endif
#if 1
static void on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args)
{
printf("stage: %s\n", packet_stage_to_str(stage));
printf("on_packet_stage: %s\n", packet_stage_to_str(stage));
static int count = 0;
EXPECT_TRUE(count == stage);
count++;
EXPECT_TRUE(packet_is_ctrl(pkt));
EXPECT_TRUE(args == NULL);
count++;
}
#if 1
TEST(PACKET_MANAGER, NORNMAL)
TEST(PACKET_MANAGER, SUBSCRIBER)
{
// global init
struct mq_schema *mq_schema = mq_schema_new();
@@ -129,9 +141,12 @@ TEST(PACKET_MANAGER, NORNMAL)
packet_parse(&pkt, (const char *)data, sizeof(data));
packet_set_ctrl(&pkt, true);
check_stat(packet_manager_runtime_get_stat(runtime), 0, 0, 0, 0);
packet_manager_runtime_input(runtime, &pkt);
packet_manager_runtime_dispatch(runtime);
EXPECT_TRUE(packet_manager_runtime_output(runtime) == &pkt);
EXPECT_TRUE(packet_manager_runtime_output(runtime) == NULL);
check_stat(packet_manager_runtime_get_stat(runtime), 1, 1, 0, 0);
// per-thread free
@@ -145,18 +160,182 @@ TEST(PACKET_MANAGER, NORNMAL)
#endif
#if 1
TEST(PACKET_MANAGER, TAKE)
static void packet_claimed(struct packet *pkt, void *args)
{
// TODO
// packet_manager_runtime_take_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt);
char *str = (char *)args;
EXPECT_STREQ(str, "hello");
printf("packet_claimed: with ctx %s\n", str);
EXPECT_TRUE(packet_is_ctrl(pkt));
EXPECT_TRUE(packet_is_claim(pkt));
free(str);
}
static void module_A_on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args)
{
struct packet_manager *pkt_mgr = (struct packet_manager *)args;
struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0);
EXPECT_TRUE(pkt_mgr_rt);
printf("module_A_on_packet_stage: %s claim packet success\n", packet_stage_to_str(stage));
static int count = 0;
EXPECT_TRUE(count == 0);
EXPECT_TRUE(stage == PACKET_STAGE_PREROUTING);
EXPECT_TRUE(packet_is_ctrl(pkt));
EXPECT_TRUE(!packet_is_claim(pkt));
packet_manager_runtime_claim_packet(pkt_mgr_rt, pkt, packet_claimed, strdup("hello"));
count++;
}
static void module_B_on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args)
{
struct packet_manager *pkt_mgr = (struct packet_manager *)args;
struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0);
EXPECT_TRUE(pkt_mgr_rt);
printf("module_B_on_packet_stage: %s claim packet failed\n", packet_stage_to_str(stage));
static int count = 0;
EXPECT_TRUE(count == 0);
EXPECT_TRUE(stage == PACKET_STAGE_PREROUTING);
EXPECT_TRUE(packet_is_ctrl(pkt));
EXPECT_TRUE(packet_is_claim(pkt));
count++;
}
TEST(PACKET_MANAGER, CLAIM_PACKET)
{
// global init
struct mq_schema *mq_schema = mq_schema_new();
EXPECT_TRUE(mq_schema);
struct mq_runtime *mq_rt = mq_runtime_new(mq_schema);
EXPECT_TRUE(mq_rt);
// module init
struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, "./conf/stellar.toml");
EXPECT_TRUE(pkt_mgr);
struct packet_manager_schema *schema = packet_manager_get_schema(pkt_mgr);
EXPECT_TRUE(schema);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, module_A_on_packet_stage, pkt_mgr) == 0);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, module_A_on_packet_stage, pkt_mgr) == 0);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, module_A_on_packet_stage, pkt_mgr) == 0);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, module_A_on_packet_stage, pkt_mgr) == 0);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, module_A_on_packet_stage, pkt_mgr) == 0);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, module_B_on_packet_stage, pkt_mgr) == 0);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, module_B_on_packet_stage, pkt_mgr) == 0);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, module_B_on_packet_stage, pkt_mgr) == 0);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, module_B_on_packet_stage, pkt_mgr) == 0);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, module_B_on_packet_stage, pkt_mgr) == 0);
// per-thread init
struct packet_manager_runtime *runtime = packet_manager_get_runtime(pkt_mgr, 0);
EXPECT_TRUE(runtime);
packet_manager_runtime_init(runtime, mq_rt);
// per-thread run
struct packet pkt;
memset(&pkt, 0, sizeof(pkt));
packet_parse(&pkt, (const char *)data, sizeof(data));
packet_set_ctrl(&pkt, true);
check_stat(packet_manager_runtime_get_stat(runtime), 0, 0, 0, 0);
packet_manager_runtime_input(runtime, &pkt);
packet_manager_runtime_dispatch(runtime);
EXPECT_TRUE(packet_manager_runtime_output(runtime) == NULL);
check_stat(packet_manager_runtime_get_stat(runtime), 1, 0, 1, 0);
// per-thread free
// module free
packet_manager_free(pkt_mgr);
// global free
mq_runtime_free(mq_rt);
mq_schema_free(mq_schema);
}
#endif
#if 1
TEST(PACKET_MANAGER, SCHEDULE)
static void module_C_on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args)
{
struct packet_manager *pkt_mgr = (struct packet_manager *)args;
struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0);
EXPECT_TRUE(pkt_mgr_rt);
printf("module_C_on_packet_stage: \"%s\" schedule packet %p\n", packet_stage_to_str(stage), pkt);
EXPECT_TRUE(!packet_is_claim(pkt));
if (stage == PACKET_STAGE_PREROUTING)
{
packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_INPUT);
packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_FORWARD);
packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_OUTPUT);
packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_POSTROUTING);
}
}
TEST(PACKET_MANAGER, SCHEDULE_PACKET)
{
// global init
struct mq_schema *mq_schema = mq_schema_new();
EXPECT_TRUE(mq_schema);
struct mq_runtime *mq_rt = mq_runtime_new(mq_schema);
EXPECT_TRUE(mq_rt);
// module init
struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, "./conf/stellar.toml");
EXPECT_TRUE(pkt_mgr);
struct packet_manager_schema *schema = packet_manager_get_schema(pkt_mgr);
EXPECT_TRUE(schema);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, module_C_on_packet_stage, pkt_mgr) == 0);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, module_C_on_packet_stage, pkt_mgr) == 0);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, module_C_on_packet_stage, pkt_mgr) == 0);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, module_C_on_packet_stage, pkt_mgr) == 0);
EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, module_C_on_packet_stage, pkt_mgr) == 0);
// per-thread init
struct packet_manager_runtime *runtime = packet_manager_get_runtime(pkt_mgr, 0);
EXPECT_TRUE(runtime);
packet_manager_runtime_init(runtime, mq_rt);
// per-thread run
struct packet pkt;
memset(&pkt, 0, sizeof(pkt));
packet_parse(&pkt, (const char *)data, sizeof(data));
packet_set_ctrl(&pkt, true);
check_stat(packet_manager_runtime_get_stat(runtime), 0, 0, 0, 0);
packet_manager_runtime_input(runtime, &pkt);
packet_manager_runtime_dispatch(runtime);
struct packet *tmp = NULL;
for (int i = 0; i < 4; i++)
{
tmp = packet_manager_runtime_output(runtime);
EXPECT_TRUE(tmp);
packet_free(tmp);
}
EXPECT_TRUE(packet_manager_runtime_output(runtime) == &pkt);
EXPECT_TRUE(packet_manager_runtime_output(runtime) == NULL);
check_stat(packet_manager_runtime_get_stat(runtime), 1, 5, 0, 4);
// per-thread free
// module free
packet_manager_free(pkt_mgr);
// global free
mq_runtime_free(mq_rt);
mq_schema_free(mq_schema);
}
#endif
#if 1
TEST(PACKET_MANAGER, CLAIM_AND_SCHEDULE_PACKET)
{
// TODO
// packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage);
}
#endif

View File

@@ -233,7 +233,7 @@ void tcp_reassembly_inc_recv_next(struct tcp_reassembly *tcp_reass, uint32_t off
}
tcp_reass->recv_next = uint32_add(tcp_reass->recv_next, offset);
TCP_REASSEMBLY_LOG_DEBUG("tcp_reass %p inc recv_next %u to %lu", tcp_reass, offset, tcp_reass->recv_next);
TCP_REASSEMBLY_LOG_DEBUG("tcp_reass %p inc recv_next %u to %u", tcp_reass, offset, tcp_reass->recv_next);
}
void tcp_reassembly_set_recv_next(struct tcp_reassembly *tcp_reass, uint32_t seq)