Refactored packet IO to use packet_manager_schedule_packet() instead of stellar_send_build_packet() to send user-built packets

This commit is contained in:
luwenpeng
2024-10-09 11:46:53 +08:00
parent 0f082d975e
commit 9e954386fd
9 changed files with 77 additions and 159 deletions

View File

@@ -20,9 +20,6 @@ int stellar_raw_packet_subscribe(struct stellar *st, plugin_on_packet_func *on_p
void stellar_emit_datapath_telemetry(struct packet *pkt, const char * module, const char *str); void stellar_emit_datapath_telemetry(struct packet *pkt, const char * module, const char *str);
// only send user build packet, can't send packet which come from network
// void stellar_send_build_packet(struct stellar *st, struct packet *pkt);
struct stellar *stellar_new(const char *toml_file); struct stellar *stellar_new(const char *toml_file);
void stellar_run(struct stellar *st); void stellar_run(struct stellar *st);
void stellar_free(struct stellar *st); void stellar_free(struct stellar *st);

View File

@@ -10,7 +10,7 @@
#include "packet_parser.h" #include "packet_parser.h"
#include "packet_internal.h" #include "packet_internal.h"
#define MARSIO_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "marsio", format, ##__VA_ARGS__) #define MARSIO_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "marsio io", format, ##__VA_ARGS__)
struct marsio_io struct marsio_io
{ {
@@ -329,7 +329,9 @@ uint16_t marsio_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts,
void marsio_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) void marsio_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{ {
int is_injected_packet = 0;
int len; int len;
char *ptr;
struct packet *pkt; struct packet *pkt;
marsio_buff_t *mbuff; marsio_buff_t *mbuff;
struct marsio_io *mr_io = (struct marsio_io *)handle; struct marsio_io *mr_io = (struct marsio_io *)handle;
@@ -337,17 +339,29 @@ void marsio_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint1
for (uint16_t i = 0; i < nr_pkts; i++) for (uint16_t i = 0; i < nr_pkts; i++)
{ {
is_injected_packet = 0;
pkt = &pkts[i]; pkt = &pkts[i];
len = packet_get_raw_len(pkt); len = packet_get_raw_len(pkt);
mbuff = (marsio_buff_t *)packet_get_origin_ctx(pkt);
if (mbuff == NULL)
{
if (marsio_buff_malloc_global(mr_io->mr_ins, &mbuff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0)
{
MARSIO_IO_LOG_ERROR("unable to allocate marsio buffer for inject packet");
continue;
}
ptr = marsio_buff_append(mbuff, len);
memcpy(ptr, packet_get_raw_data(pkt), len);
is_injected_packet = 1;
}
metadata_from_packet_to_mbuff(pkt, mbuff);
stat->pkts_tx++; stat->pkts_tx++;
stat->bytes_tx += len; stat->bytes_tx += len;
mbuff = (marsio_buff_t *)packet_get_origin_ctx(pkt); if (packet_is_ctrl(pkt))
assert(mbuff != NULL);
metadata_from_packet_to_mbuff(pkt, mbuff);
if (marsio_buff_is_ctrlbuf(mbuff))
{ {
stat->ctrl_pkts_tx++; stat->ctrl_pkts_tx++;
stat->ctrl_bytes_tx += len; stat->ctrl_bytes_tx += len;
@@ -358,8 +372,16 @@ void marsio_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint1
stat->raw_bytes_tx += len; stat->raw_bytes_tx += len;
} }
marsio_send_burst(mr_io->mr_path, thr_idx, &mbuff, 1); if (is_injected_packet)
packet_free(pkt); {
stat->pkts_injected++;
stat->bytes_injected += len;
marsio_send_burst_with_options(mr_io->mr_path, thr_idx, &mbuff, 1, MARSIO_SEND_OPT_REHASH);
}
else
{
marsio_send_burst(mr_io->mr_path, thr_idx, &mbuff, 1);
}
} }
} }
@@ -380,53 +402,9 @@ void marsio_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_
stat->bytes_dropped += packet_get_raw_len(pkt); stat->bytes_dropped += packet_get_raw_len(pkt);
marsio_buff_free(mr_io->mr_ins, &mbuff, 1, 0, thr_idx); marsio_buff_free(mr_io->mr_ins, &mbuff, 1, 0, thr_idx);
} }
packet_free(pkt);
} }
} }
uint16_t marsio_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
int len;
char *ptr;
uint16_t nr_inject = 0;
struct packet *pkt;
marsio_buff_t *mbuff;
struct marsio_io *mr_io = (struct marsio_io *)handle;
struct packet_io_stat *stat = &mr_io->stat[thr_idx];
for (uint16_t i = 0; i < nr_pkts; i++)
{
pkt = &pkts[i];
len = packet_get_raw_len(pkt);
if (marsio_buff_malloc_global(mr_io->mr_ins, &mbuff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0)
{
MARSIO_IO_LOG_ERROR("unable to allocate marsio buffer for inject packet");
continue;
}
stat->pkts_injected++;
stat->bytes_injected += len;
stat->raw_pkts_tx++;
stat->raw_bytes_tx += len;
stat->pkts_tx++;
stat->bytes_tx += len;
nr_inject++;
ptr = marsio_buff_append(mbuff, len);
memcpy(ptr, packet_get_raw_data(pkt), len);
metadata_from_packet_to_mbuff(pkt, mbuff);
marsio_send_burst_with_options(mr_io->mr_path, thr_idx, &mbuff, 1, MARSIO_SEND_OPT_REHASH);
packet_free(pkt);
}
return nr_inject;
}
void marsio_io_yield(void *handle, uint16_t thr_idx) void marsio_io_yield(void *handle, uint16_t thr_idx)
{ {
struct marsio_io *mr_io = (struct marsio_io *)handle; struct marsio_io *mr_io = (struct marsio_io *)handle;

View File

@@ -15,7 +15,6 @@ int marsio_io_init(void *handle, uint16_t thr_idx);
uint16_t marsio_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); uint16_t marsio_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void marsio_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void marsio_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void marsio_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void marsio_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
uint16_t marsio_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void marsio_io_yield(void *handle, uint16_t thr_idx); void marsio_io_yield(void *handle, uint16_t thr_idx);
struct packet_io_stat *marsio_io_stat(void *handle, uint16_t thr_idx); struct packet_io_stat *marsio_io_stat(void *handle, uint16_t thr_idx);

View File

@@ -23,7 +23,6 @@ struct packet_io
uint16_t (*ingress_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); uint16_t (*ingress_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void (*egress_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void (*egress_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void (*drop_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void (*drop_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
uint16_t (*inject_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void (*yield_func)(void *handle, uint16_t thr_idx); void (*yield_func)(void *handle, uint16_t thr_idx);
struct packet_io_stat *(*stat_func)(void *handle, uint16_t thr_idx); struct packet_io_stat *(*stat_func)(void *handle, uint16_t thr_idx);
}; };
@@ -261,7 +260,6 @@ struct packet_io *packet_io_new(const char *toml_file)
pkt_io->ingress_func = marsio_io_ingress; pkt_io->ingress_func = marsio_io_ingress;
pkt_io->egress_func = marsio_io_egress; pkt_io->egress_func = marsio_io_egress;
pkt_io->drop_func = marsio_io_drop; pkt_io->drop_func = marsio_io_drop;
pkt_io->inject_func = marsio_io_inject;
pkt_io->yield_func = marsio_io_yield; pkt_io->yield_func = marsio_io_yield;
pkt_io->stat_func = marsio_io_stat; pkt_io->stat_func = marsio_io_stat;
} }
@@ -274,7 +272,6 @@ struct packet_io *packet_io_new(const char *toml_file)
pkt_io->ingress_func = pcap_io_ingress; pkt_io->ingress_func = pcap_io_ingress;
pkt_io->egress_func = pcap_io_egress; pkt_io->egress_func = pcap_io_egress;
pkt_io->drop_func = pcap_io_drop; pkt_io->drop_func = pcap_io_drop;
pkt_io->inject_func = pcap_io_inject;
pkt_io->yield_func = pcap_io_yield; pkt_io->yield_func = pcap_io_yield;
pkt_io->stat_func = pcap_io_stat; pkt_io->stat_func = pcap_io_stat;
} }
@@ -331,11 +328,6 @@ void packet_io_drop(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *p
pkt_io->drop_func(pkt_io->handle, thr_idx, pkts, nr_pkts); pkt_io->drop_func(pkt_io->handle, thr_idx, pkts, nr_pkts);
} }
uint16_t packet_io_inject(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
return pkt_io->inject_func(pkt_io->handle, thr_idx, pkts, nr_pkts);
}
void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx) void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx)
{ {
pkt_io->yield_func(pkt_io->handle, thr_idx); pkt_io->yield_func(pkt_io->handle, thr_idx);

View File

@@ -74,7 +74,6 @@ int packet_io_init(struct packet_io *pkt_io, uint16_t thr_idx);
uint16_t packet_io_ingress(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); uint16_t packet_io_ingress(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void packet_io_egress(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void packet_io_egress(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void packet_io_drop(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void packet_io_drop(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
uint16_t packet_io_inject(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx); void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx);
struct packet_io_stat *packet_io_stat(struct packet_io *pkt_io, uint16_t thr_idx); struct packet_io_stat *packet_io_stat(struct packet_io *pkt_io, uint16_t thr_idx);

View File

@@ -407,10 +407,15 @@ uint16_t pcap_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts, ui
void pcap_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) void pcap_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{ {
int len; int len;
struct tuple6 tuple;
struct packet *pkt = NULL; struct packet *pkt = NULL;
struct pcap_io *pcap_io = (struct pcap_io *)handle; struct pcap_io *pcap_io = (struct pcap_io *)handle;
struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; struct packet_io_stat *stat = &pcap_io->stat[thr_idx];
char file[PATH_MAX] = {0};
char src_addr[INET6_ADDRSTRLEN] = {0};
char dst_addr[INET6_ADDRSTRLEN] = {0};
for (uint16_t i = 0; i < nr_pkts; i++) for (uint16_t i = 0; i < nr_pkts; i++)
{ {
pkt = &pkts[i]; pkt = &pkts[i];
@@ -419,15 +424,51 @@ void pcap_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_
stat->pkts_tx++; stat->pkts_tx++;
stat->bytes_tx += len; stat->bytes_tx += len;
stat->raw_pkts_tx++; if (packet_is_ctrl(pkt))
stat->raw_bytes_tx += len; {
stat->ctrl_pkts_tx++;
stat->ctrl_bytes_tx += len;
}
else
{
stat->raw_pkts_tx++;
stat->raw_bytes_tx += len;
}
struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_origin_ctx(pkt); struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_origin_ctx(pkt);
if (pcap_pkt) if (pcap_pkt)
{ {
free(pcap_pkt); free(pcap_pkt);
} }
packet_free(pkt); else
{
stat->pkts_injected++;
stat->bytes_injected += len;
memset(&tuple, 0, sizeof(struct tuple6));
packet_get_innermost_tuple6(pkt, &tuple);
if (tuple.addr_family == AF_INET)
{
inet_ntop(AF_INET, &tuple.src_addr.v4, src_addr, INET6_ADDRSTRLEN);
inet_ntop(AF_INET, &tuple.dst_addr.v4, dst_addr, INET6_ADDRSTRLEN);
}
else
{
inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr, INET6_ADDRSTRLEN);
inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr, INET6_ADDRSTRLEN);
}
snprintf(file, sizeof(file), "inject-%s:%u-%s:%u-%lu.pcap", src_addr, ntohs(tuple.src_port), dst_addr, ntohs(tuple.dst_port), stat->pkts_injected);
if (packet_dump_pcap(pkt, file) == -1)
{
PCAP_IO_LOG_ERROR("unable to dump pcap file: %s", file);
}
else
{
PCAP_IO_LOG_FATAL("dump inject packet: %s", file);
}
}
} }
} }
@@ -451,61 +492,6 @@ void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t
} }
} }
uint16_t pcap_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
uint16_t len;
struct tuple6 tuple;
struct packet *pkt = NULL;
struct pcap_io *pcap_io = (struct pcap_io *)handle;
struct packet_io_stat *stat = &pcap_io->stat[thr_idx];
char file[PATH_MAX] = {0};
char src_addr[INET6_ADDRSTRLEN] = {0};
char dst_addr[INET6_ADDRSTRLEN] = {0};
for (uint16_t i = 0; i < nr_pkts; i++)
{
pkt = &pkts[i];
len = packet_get_raw_len(pkt);
stat->pkts_injected++;
stat->bytes_injected += len;
stat->raw_pkts_tx++;
stat->raw_bytes_tx += len;
stat->pkts_tx++;
stat->bytes_tx += len;
memset(&tuple, 0, sizeof(struct tuple6));
packet_get_innermost_tuple6(pkt, &tuple);
if (tuple.addr_family == AF_INET)
{
inet_ntop(AF_INET, &tuple.src_addr.v4, src_addr, INET6_ADDRSTRLEN);
inet_ntop(AF_INET, &tuple.dst_addr.v4, dst_addr, INET6_ADDRSTRLEN);
}
else
{
inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr, INET6_ADDRSTRLEN);
inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr, INET6_ADDRSTRLEN);
}
snprintf(file, sizeof(file), "inject-%s:%u-%s:%u-%lu.pcap", src_addr, ntohs(tuple.src_port), dst_addr, ntohs(tuple.dst_port), stat->pkts_injected);
if (packet_dump_pcap(pkt, file) == -1)
{
PCAP_IO_LOG_ERROR("unable to dump pcap file: %s", file);
}
else
{
PCAP_IO_LOG_FATAL("dump inject packet: %s", file);
}
packet_free(pkt);
}
return nr_pkts;
}
void pcap_io_yield(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused))) void pcap_io_yield(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused)))
{ {
return; return;

View File

@@ -15,7 +15,6 @@ int pcap_io_init(void *handle, uint16_t thr_idx);
uint16_t pcap_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); uint16_t pcap_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void pcap_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void pcap_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
uint16_t pcap_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void pcap_io_yield(void *handle, uint16_t thr_idx); void pcap_io_yield(void *handle, uint16_t thr_idx);
struct packet_io_stat *pcap_io_stat(void *handle, uint16_t thr_idx); struct packet_io_stat *pcap_io_stat(void *handle, uint16_t thr_idx);

View File

@@ -100,10 +100,12 @@ static void *worker_thread(void *arg)
if (packet_get_action(pkt) == PACKET_ACTION_DROP) if (packet_get_action(pkt) == PACKET_ACTION_DROP)
{ {
packet_io_drop(pkt_io, thread_id, pkt, 1); packet_io_drop(pkt_io, thread_id, pkt, 1);
packet_free(pkt);
} }
else else
{ {
packet_io_egress(pkt_io, thread_id, pkt, 1); packet_io_egress(pkt_io, thread_id, pkt, 1);
packet_free(pkt);
} }
stellar_polling_dispatch(polling_mgr); stellar_polling_dispatch(polling_mgr);
} }
@@ -281,40 +283,6 @@ void stellar_reload_log_level(struct stellar *st)
} }
} }
/******************************************************************************
* Stellar Utility Function
******************************************************************************/
// TODO
#if 0
// only send user build packet, can't send packet which come from network
void stellar_send_build_packet(struct stellar *st, struct packet *pkt)
{
uint16_t thread_id = stellar_module_manager_get_thread_id(st->st.mod_mgr);
struct packet_io *pkt_io = st->st.pkt_io;
struct session_manager_runtime *sess_mgr_rt = st->st.threads[thread_id].sess_mgr_rt;
session_manager_runtime_record_duplicated_packet(sess_mgr_rt, pkt);
if (packet_is_claim(pkt))
{
PACKET_LOG_ERROR("packet has been claimed and cannot be released, please check the module arrangement order");
assert(0);
return;
}
if (packet_get_origin_ctx(pkt))
{
// TODO
abort();
packet_io_egress(pkt_io, thread_id, pkt, 1);
}
else
{
packet_io_inject(pkt_io, thread_id, pkt, 1);
}
}
#endif
struct logger *stellar_get_logger(struct stellar *st) struct logger *stellar_get_logger(struct stellar *st)
{ {
if (st) if (st)

View File

@@ -42,7 +42,7 @@ global:
stellar_session_plugin_dettach_current_session; stellar_session_plugin_dettach_current_session;
stellar_packet_plugin_register; stellar_packet_plugin_register;
stellar_polling_plugin_register; stellar_polling_plugin_register;
stellar_send_build_packet;
stellar_new; stellar_new;
stellar_run; stellar_run;
stellar_free; stellar_free;