update stellar thread main loop

This commit is contained in:
luwenpeng
2024-03-08 18:10:38 +08:00
parent 734f6a5135
commit ee35a26a9d
14 changed files with 406 additions and 340 deletions

View File

@@ -9,8 +9,8 @@ typedef void *new_cb(void *options);
typedef void free_cb(void *handle);
typedef void *stat_cb(void *handle);
typedef int init_cb(void *handle, uint16_t thread_id);
typedef int recv_cb(void *handle, uint16_t thread_id, struct packet **pkt);
typedef void send_cb(void *handle, uint16_t thread_id, struct packet *pkt);
typedef int recv_cb(void *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts);
typedef void send_cb(void *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts);
struct packet_io
{
@@ -109,12 +109,12 @@ int packet_io_init(struct packet_io *handle, uint16_t thread_id)
return handle->on_init(handle->handle, thread_id);
}
int packet_io_recv(struct packet_io *handle, uint16_t thread_id, struct packet **pkt)
int packet_io_ingress(struct packet_io *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts)
{
return handle->on_recv(handle->handle, thread_id, pkt);
return handle->on_recv(handle->handle, thread_id, pkts, nr_pkts);
}
void packet_io_send(struct packet_io *handle, uint16_t thread_id, struct packet *pkt)
void packet_io_egress(struct packet_io *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts)
{
handle->on_send(handle->handle, thread_id, pkt);
handle->on_send(handle->handle, thread_id, pkts, nr_pkts);
}

View File

@@ -60,8 +60,8 @@ struct packet_io_stat *packet_io_get_stat(struct packet_io *handle);
// return 0 if success, -1 if failed
int packet_io_init(struct packet_io *handle, uint16_t thread_id);
int packet_io_recv(struct packet_io *handle, uint16_t thread_id, struct packet **pkt);
void packet_io_send(struct packet_io *handle, uint16_t thread_id, struct packet *pkt);
int packet_io_ingress(struct packet_io *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts);
void packet_io_egress(struct packet_io *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts);
#ifdef __cpluscplus
}

View File

@@ -157,35 +157,55 @@ int packet_io_dumpfile_init(struct packet_io_dumpfile *handle, uint16_t thread_i
return 0;
}
int packet_io_dumpfile_recv(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet **pkt)
int packet_io_dumpfile_recv(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts)
{
struct packet_queue *queue = handle->queue[thread_id];
struct packet *pkt = NULL;
int nr_parsed = 0;
packet_queue_pop(queue, pkt);
if (*pkt == NULL)
for (int i = 0; i < nr_pkts; i++)
{
return -1;
}
else
{
ATOMIC_ADD(&handle->stat.rx_pkts, 1);
ATOMIC_ADD(&handle->stat.rx_bytes, packet_get_len(*pkt));
return 0;
packet_queue_pop(queue, &pkt);
if (pkt == NULL)
{
break;
}
else
{
ATOMIC_ADD(&handle->stat.rx_pkts, 1);
ATOMIC_ADD(&handle->stat.rx_bytes, packet_get_len(pkt));
struct packet *temp = &pkts[nr_parsed++];
memset(temp, 0, sizeof(struct packet));
packet_parse(temp, pkt->data_ptr, pkt->data_len);
packet_set_io_ctx(temp, pkt);
packet_set_type(temp, PACKET_TYPE_DATA);
packet_set_action(temp, PACKET_ACTION_FORWARD);
}
}
return nr_parsed;
}
void packet_io_dumpfile_send(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkt)
void packet_io_dumpfile_send(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts)
{
if (packet_get_action(pkt) == PACKET_ACTION_DROP)
struct packet *pkt = NULL;
for (int i = 0; i < nr_pkts; i++)
{
ATOMIC_ADD(&handle->stat.drop_pkts, 1);
ATOMIC_ADD(&handle->stat.drop_bytes, packet_get_len(pkt));
}
else
{
ATOMIC_ADD(&handle->stat.tx_pkts, 1);
ATOMIC_ADD(&handle->stat.tx_bytes, packet_get_len(pkt));
}
pkt = &pkts[i];
packet_free(pkt);
if (packet_get_action(pkt) == PACKET_ACTION_DROP)
{
ATOMIC_ADD(&handle->stat.drop_pkts, 1);
ATOMIC_ADD(&handle->stat.drop_bytes, packet_get_len(pkt));
}
else
{
ATOMIC_ADD(&handle->stat.tx_pkts, 1);
ATOMIC_ADD(&handle->stat.tx_bytes, packet_get_len(pkt));
}
packet_free((struct packet *)packet_get_io_ctx(pkt));
packet_free(pkt);
}
}

View File

@@ -21,8 +21,8 @@ void packet_io_dumpfile_free(struct packet_io_dumpfile *handle);
struct packet_io_stat *packet_io_dumpfile_stat(struct packet_io_dumpfile *handle);
int packet_io_dumpfile_init(struct packet_io_dumpfile *handle, uint16_t thread_id);
int packet_io_dumpfile_recv(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet **pkt);
void packet_io_dumpfile_send(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkt);
int packet_io_dumpfile_recv(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts);
void packet_io_dumpfile_send(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts);
#ifdef __cpluscplus
}

View File

@@ -162,81 +162,92 @@ int packet_io_marsio_init(struct packet_io_marsio *handle, uint16_t thread_id)
return 0;
}
int packet_io_marsio_recv(struct packet_io_marsio *handle, uint16_t thread_id, struct packet **pkt)
int packet_io_marsio_recv(struct packet_io_marsio *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts)
{
marsio_buff_t *rx_buff;
marsio_buff_t *rx_buffs[1];
thread_local struct packet thd_pkt;
marsio_buff_t *rx_buffs[RX_BURST_MAX];
int nr_recv;
int nr_parsed = 0;
int raw_len;
char *raw_data;
retry:
if (marsio_recv_burst(handle->mr_dev, thread_id, rx_buffs, 1) <= 0)
nr_recv = marsio_recv_burst(handle->mr_dev, thread_id, rx_buffs, MIN(RX_BURST_MAX, nr_pkts));
if (nr_recv <= 0)
{
*pkt = NULL;
return -1;
return 0;
}
rx_buff = rx_buffs[0];
char *data = marsio_buff_mtod(rx_buff);
int len = marsio_buff_datalen(rx_buff);
ATOMIC_ADD(&handle->stat.rx_pkts, 1);
ATOMIC_ADD(&handle->stat.rx_bytes, len);
if (is_keepalive_packet(data, len))
for (int i = 0; i < nr_recv; i++)
{
ATOMIC_ADD(&handle->stat.keepalive_pkts, 1);
ATOMIC_ADD(&handle->stat.keepalive_bytes, len);
rx_buff = rx_buffs[i];
raw_data = marsio_buff_mtod(rx_buff);
raw_len = marsio_buff_datalen(rx_buff);
ATOMIC_ADD(&handle->stat.tx_pkts, 1);
ATOMIC_ADD(&handle->stat.tx_bytes, len);
marsio_send_burst(handle->mr_path, thread_id, rx_buffs, 1);
goto retry;
ATOMIC_ADD(&handle->stat.rx_pkts, 1);
ATOMIC_ADD(&handle->stat.rx_bytes, raw_len);
if (is_keepalive_packet(raw_data, raw_len))
{
ATOMIC_ADD(&handle->stat.keepalive_pkts, 1);
ATOMIC_ADD(&handle->stat.keepalive_bytes, raw_len);
ATOMIC_ADD(&handle->stat.tx_pkts, 1);
ATOMIC_ADD(&handle->stat.tx_bytes, raw_len);
marsio_send_burst(handle->mr_path, thread_id, &rx_buff, 1);
continue;
}
metadata_to_packet(rx_buff, &pkts[nr_parsed++]);
}
metadata_to_packet(rx_buff, &thd_pkt);
packet_parse(&thd_pkt, data, len);
*pkt = &thd_pkt;
return 0;
return nr_parsed;
}
void packet_io_marsio_send(struct packet_io_marsio *handle, uint16_t thread_id, struct packet *pkt)
void packet_io_marsio_send(struct packet_io_marsio *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts)
{
marsio_buff_t *tx_buff = (marsio_buff_t *)packet_get_io_ctx(pkt);
if (packet_get_action(pkt) == PACKET_ACTION_DROP)
struct packet *pkt;
marsio_buff_t *tx_buff;
for (int i = 0; i < nr_pkts; i++)
{
if (tx_buff)
pkt = &pkts[i];
tx_buff = (marsio_buff_t *)packet_get_io_ctx(pkt);
if (packet_get_action(pkt) == PACKET_ACTION_DROP)
{
ATOMIC_ADD(&handle->stat.drop_pkts, 1);
ATOMIC_ADD(&handle->stat.drop_bytes, packet_get_len(pkt));
marsio_buff_free(handle->mr_ins, &tx_buff, 1, 0, thread_id);
if (tx_buff)
{
ATOMIC_ADD(&handle->stat.drop_pkts, 1);
ATOMIC_ADD(&handle->stat.drop_bytes, packet_get_len(pkt));
marsio_buff_free(handle->mr_ins, &tx_buff, 1, 0, thread_id);
}
else
{
// do nothing
}
}
else
{
// do nothing
}
}
else
{
if (tx_buff == NULL)
{
if (marsio_buff_malloc_global(handle->mr_ins, &tx_buff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0)
if (tx_buff == NULL)
{
PACKET_IO_LOG_ERROR("unable to alloc tx buffer");
return;
}
ATOMIC_ADD(&handle->stat.inject_pkts, 1);
ATOMIC_ADD(&handle->stat.inject_bytes, packet_get_len(pkt));
if (marsio_buff_malloc_global(handle->mr_ins, &tx_buff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0)
{
PACKET_IO_LOG_ERROR("unable to alloc tx buffer");
goto fast_end;
}
ATOMIC_ADD(&handle->stat.inject_pkts, 1);
ATOMIC_ADD(&handle->stat.inject_bytes, packet_get_len(pkt));
char *dst = marsio_buff_append(tx_buff, packet_get_len(pkt));
memcpy(dst, packet_get_data(pkt), packet_get_len(pkt));
char *dst = marsio_buff_append(tx_buff, packet_get_len(pkt));
memcpy(dst, packet_get_data(pkt), packet_get_len(pkt));
}
ATOMIC_ADD(&handle->stat.tx_pkts, 1);
ATOMIC_ADD(&handle->stat.tx_bytes, packet_get_len(pkt));
metadata_to_mbuff(tx_buff, pkt);
marsio_send_burst(handle->mr_path, thread_id, &tx_buff, 1);
}
ATOMIC_ADD(&handle->stat.tx_pkts, 1);
ATOMIC_ADD(&handle->stat.tx_bytes, packet_get_len(pkt));
metadata_to_mbuff(tx_buff, pkt);
marsio_send_burst(handle->mr_path, thread_id, &tx_buff, 1);
fast_end:
packet_free(pkt);
}
packet_free(pkt);
}

View File

@@ -23,8 +23,8 @@ void packet_io_marsio_free(struct packet_io_marsio *handle);
struct packet_io_stat *packet_io_marsio_stat(struct packet_io_marsio *handle);
int packet_io_marsio_init(struct packet_io_marsio *handle, uint16_t thread_id);
int packet_io_marsio_recv(struct packet_io_marsio *handle, uint16_t thread_id, struct packet **pkt);
void packet_io_marsio_send(struct packet_io_marsio *handle, uint16_t thread_id, struct packet *pkt);
int packet_io_marsio_recv(struct packet_io_marsio *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts);
void packet_io_marsio_send(struct packet_io_marsio *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts);
#ifdef __cpluscplus
}