test(packet manager): re-schedule claimed packets

This commit is contained in:
luwenpeng
2024-09-18 14:23:01 +08:00
parent 721d5d1466
commit 22ba2e1d96
4 changed files with 224 additions and 80 deletions

View File

@@ -27,8 +27,9 @@ struct packet_manager_runtime
uint16_t idx;
void *cb_args;
on_packet_claimed_callback *claimed_cb;
enum packet_stage stage;
struct mq_runtime *mq;
struct packet_queue queue[QUEUE_NUM_MAX];
struct packet_queue queue[PACKET_QUEUE_MAX];
struct packet_manager_runtime_stat stat;
};
@@ -58,7 +59,6 @@ const char *packet_stage_to_str(enum packet_stage stage)
case PACKET_STAGE_POSTROUTING:
return "postrouting";
default:
assert(0);
return "unknown";
}
}
@@ -183,7 +183,7 @@ static void packet_manager_runtime_free(struct packet_manager_runtime *runtime)
{
if (runtime)
{
for (int i = 0; i < QUEUE_NUM_MAX; i++)
for (int i = 0; i < PACKET_QUEUE_MAX; i++)
{
struct packet *pkt = NULL;
while ((pkt = TAILQ_FIRST(&runtime->queue[i])))
@@ -210,7 +210,7 @@ static struct packet_manager_runtime *packet_manager_runtime_new(uint16_t idx)
runtime->idx = idx;
for (int i = 0; i < QUEUE_NUM_MAX; i++)
for (int i = 0; i < PACKET_QUEUE_MAX; i++)
{
TAILQ_INIT(&runtime->queue[i]);
}
@@ -220,15 +220,19 @@ static struct packet_manager_runtime *packet_manager_runtime_new(uint16_t idx)
void packet_manager_runtime_print_stat(struct packet_manager_runtime *runtime)
{
PACKET_MANAGER_LOG_DEBUG("runtime[%d] => input_pkts: %lu, output_pkts: %lu, claim_pkts: %lu, schedule_pkts: %lu, queue_len: {pre_routing: %lu, input: %lu, forward: %lu, output: %lu, post_routing: %lu}",
runtime->idx,
runtime->stat.input_pkts, runtime->stat.output_pkts,
runtime->stat.claim_pkts, runtime->stat.schedule_pkts,
runtime->stat.queue_len[PACKET_STAGE_PREROUTING],
runtime->stat.queue_len[PACKET_STAGE_INPUT],
runtime->stat.queue_len[PACKET_STAGE_FORWARD],
runtime->stat.queue_len[PACKET_STAGE_OUTPUT],
runtime->stat.queue_len[PACKET_STAGE_POSTROUTING]);
PACKET_MANAGER_LOG_DEBUG("runtime[%d] current stage: %s, pkts_input: %lu, pkts_output: %lu",
runtime->idx, packet_stage_to_str(runtime->stage),
runtime->stat.total.pkts_input, runtime->stat.total.pkts_output);
for (int i = 0; i < PACKET_QUEUE_MAX; i++)
{
PACKET_MANAGER_LOG_DEBUG("runtime[%d] (%11s) queue stat => pkts_in: %lu, pkts_out: %lu, pkts_claim: %lu, pkts_schedule: %lu",
runtime->idx,
packet_stage_to_str(i),
runtime->stat.queue[i].pkts_in,
runtime->stat.queue[i].pkts_out,
runtime->stat.queue[i].pkts_claim,
runtime->stat.queue[i].pkts_schedule);
}
}
struct packet_manager_runtime_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *runtime)
@@ -243,19 +247,19 @@ void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, stru
void packet_manager_runtime_input(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt)
{
pkt_mgr_rt->stat.input_pkts++;
pkt_mgr_rt->stat.queue_len[PACKET_STAGE_PREROUTING]++;
pkt_mgr_rt->stat.total.pkts_input++;
pkt_mgr_rt->stat.queue[PACKET_STAGE_PREROUTING].pkts_in++;
TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[PACKET_STAGE_PREROUTING], pkt, stage_tqe);
}
struct packet *packet_manager_runtime_output(struct packet_manager_runtime *pkt_mgr_rt)
{
struct packet *pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[QUEUE_NUM_MAX - 1]);
struct packet *pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[PACKET_STAGE_MAX]);
if (pkt)
{
pkt_mgr_rt->stat.output_pkts++;
pkt_mgr_rt->stat.queue_len[QUEUE_NUM_MAX - 1]--;
TAILQ_REMOVE(&pkt_mgr_rt->queue[QUEUE_NUM_MAX - 1], pkt, stage_tqe);
pkt_mgr_rt->stat.total.pkts_output++;
pkt_mgr_rt->stat.queue[PACKET_STAGE_MAX].pkts_out++;
TAILQ_REMOVE(&pkt_mgr_rt->queue[PACKET_STAGE_MAX], pkt, stage_tqe);
}
return pkt;
}
@@ -264,19 +268,20 @@ void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt)
{
for (int i = 0; i < PACKET_STAGE_MAX; i++)
{
pkt_mgr_rt->stage = i;
packet_manager_runtime_print_stat(pkt_mgr_rt);
struct packet *pkt = NULL;
while ((pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[i])))
while ((pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[pkt_mgr_rt->stage])))
{
packet_set_claim(pkt, false);
pkt_mgr_rt->claimed_cb = NULL;
pkt_mgr_rt->cb_args = NULL;
TAILQ_REMOVE(&pkt_mgr_rt->queue[i], pkt, stage_tqe);
pkt_mgr_rt->stat.queue_len[i]--;
TAILQ_REMOVE(&pkt_mgr_rt->queue[pkt_mgr_rt->stage], pkt, stage_tqe);
pkt_mgr_rt->stat.queue[pkt_mgr_rt->stage].pkts_out++;
mq_runtime_publish_message(pkt_mgr_rt->mq, i, pkt);
mq_runtime_publish_message(pkt_mgr_rt->mq, pkt_mgr_rt->stage, pkt);
mq_runtime_dispatch(pkt_mgr_rt->mq);
if (packet_is_claim(pkt))
@@ -285,13 +290,15 @@ void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt)
{
pkt_mgr_rt->claimed_cb(pkt, pkt_mgr_rt->cb_args);
}
packet_set_claim(pkt, false);
continue;
}
TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[i + 1], pkt, stage_tqe);
pkt_mgr_rt->stat.queue_len[i + 1]++;
TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[pkt_mgr_rt->stage + 1], pkt, stage_tqe);
pkt_mgr_rt->stat.queue[pkt_mgr_rt->stage + 1].pkts_in++;
}
}
pkt_mgr_rt->stage = -1;
}
int packet_manager_runtime_claim_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, on_packet_claimed_callback cb, void *args)
@@ -306,7 +313,7 @@ int packet_manager_runtime_claim_packet(struct packet_manager_runtime *pkt_mgr_r
pkt_mgr_rt->claimed_cb = cb;
pkt_mgr_rt->cb_args = args;
packet_set_claim(pkt, true);
pkt_mgr_rt->stat.claim_pkts++;
pkt_mgr_rt->stat.queue[pkt_mgr_rt->stage].pkts_claim++;
return 0;
}
}
@@ -320,8 +327,8 @@ void packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_m
return;
}
pkt_mgr_rt->stat.schedule_pkts++;
pkt_mgr_rt->stat.queue_len[stage]++;
pkt_mgr_rt->stat.queue[stage].pkts_schedule++;
pkt_mgr_rt->stat.queue[stage].pkts_in++;
TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[stage], pkt, stage_tqe);
}
@@ -377,7 +384,7 @@ void packet_manager_free(struct packet_manager *pkt_mgr)
{
if (pkt_mgr->runtime[i])
{
packet_manager_runtime_print_stat(pkt_mgr->runtime[i]);
// packet_manager_runtime_print_stat(pkt_mgr->runtime[i]);
packet_manager_runtime_free(pkt_mgr->runtime[i]);
}
}