diff --git a/src/core/stellar_core.cpp b/src/core/stellar_core.cpp index eef53c0..e9fd970 100644 --- a/src/core/stellar_core.cpp +++ b/src/core/stellar_core.cpp @@ -594,7 +594,7 @@ void stellar_run(struct stellar *st) usleep(1000); // 1ms // only available in dump file mode, automatically exits when all sessions have been released - if (packet_io_wait_exit(runtime->packet_io) && all_session_have_freed(runtime, config)) + if (packet_io_isbreak(runtime->packet_io) && all_session_have_freed(runtime, config)) { stellar_stat_output(runtime->stat); // flush stat STELLAR_LOG_STATE("all sessions have been released, notify threads to exit"); diff --git a/src/packet_io/CMakeLists.txt b/src/packet_io/CMakeLists.txt index b2e55fc..8f07fd7 100644 --- a/src/packet_io/CMakeLists.txt +++ b/src/packet_io/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(packet_io dumpfile_io.cpp marsio_io.cpp lock_free_queue.cpp packet_io.cpp) +add_library(packet_io dumpfile_io.cpp marsio_io.cpp packet_io.cpp) target_include_directories(packet_io PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_include_directories(packet_io PUBLIC ${CMAKE_SOURCE_DIR}/src/stellar) target_link_libraries(packet_io marsio pcap packet) \ No newline at end of file diff --git a/src/packet_io/dumpfile_io.cpp b/src/packet_io/dumpfile_io.cpp index d19585c..f8fc887 100644 --- a/src/packet_io/dumpfile_io.cpp +++ b/src/packet_io/dumpfile_io.cpp @@ -16,7 +16,6 @@ #include "packet_private.h" #include "packet_parser.h" #include "packet_dump.h" -#include "lock_free_queue.h" #define PACKET_IO_LOG_STATE(format, ...) LOG_STATE("dumpfile", format, ##__VA_ARGS__) #define PACKET_IO_LOG_ERROR(format, ...) LOG_ERROR("dumpfile", format, ##__VA_ARGS__) @@ -30,7 +29,7 @@ struct dumpfile_io char dumpfile_path[256]; pcap_t *pcap; - struct lock_free_queue *queue[MAX_THREAD_NUM]; + struct packet_queue *queue[MAX_THREAD_NUM]; struct packet_io_stat stat[MAX_THREAD_NUM]; uint64_t io_thread_need_exit; uint64_t io_thread_is_runing; @@ -44,10 +43,93 @@ struct pcap_pkt }; /****************************************************************************** - * Private API + * Private API -- queue ******************************************************************************/ -static void pcap_packet_handler(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes) +struct packet_queue +{ + uint64_t *queue; + uint32_t size; + uint32_t head; + uint32_t tail; +}; + +static struct packet_queue *packet_queue_new(uint32_t size) +{ + struct packet_queue *queue = (struct packet_queue *)calloc(1, sizeof(struct packet_queue)); + if (queue == NULL) + { + PACKET_IO_LOG_ERROR("unable to new packet queue"); + return NULL; + } + + queue->queue = (uint64_t *)calloc(size, sizeof(uint64_t)); + if (queue->queue == NULL) + { + PACKET_IO_LOG_ERROR("unable to new packet queue"); + free(queue); + return NULL; + } + + queue->size = size; + queue->head = 0; + queue->tail = 0; + + return queue; +} + +static void packet_queue_free(struct packet_queue *queue) +{ + if (queue == NULL) + { + return; + } + + if (queue->queue) + { + free(queue->queue); + queue->queue = NULL; + } + + free(queue); +} + +static int packet_queue_push(struct packet_queue *queue, void *data) +{ + if (__sync_val_compare_and_swap(&queue->queue[queue->tail], 0, data) != 0) + { + PACKET_IO_LOG_ERROR("packet queue is full, retry later"); + return -1; + } + + queue->tail = (queue->tail + 1) % queue->size; + return 0; +} + +static void packet_queue_pop(struct packet_queue *queue, void **data) +{ + uint64_t read = ATOMIC_READ(&queue->queue[queue->head]); + if (read == 0) + { + *data = NULL; + return; + } + __sync_val_compare_and_swap(&queue->queue[queue->head], read, 0); + *data = (void *)read; + queue->head = (queue->head + 1) % queue->size; +} + +static int packet_queue_isempty(struct packet_queue *queue) +{ + uint64_t read = ATOMIC_READ(&queue->queue[queue->head]); + return read == 0; +} + +/****************************************************************************** + * Private API -- utils + ******************************************************************************/ + +static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes) { struct dumpfile_io *handle = (struct dumpfile_io *)user; @@ -69,8 +151,8 @@ static void pcap_packet_handler(u_char *user, const struct pcap_pkthdr *h, const uint64_t hash = packet_ldbc_hash(&pkt, PKT_LDBC_METH_OUTERMOST_INT_EXT_IP, PACKET_DIRECTION_OUTGOING); // push packet to queue - struct lock_free_queue *queue = handle->queue[hash % handle->nr_threads]; - while (lock_free_queue_push(queue, pcap_pkt) == -1) + struct packet_queue *queue = handle->queue[hash % handle->nr_threads]; + while (packet_queue_push(queue, pcap_pkt) == -1) { if (ATOMIC_READ(&handle->io_thread_need_exit)) { @@ -103,7 +185,7 @@ static int dumpfile_handler(struct dumpfile_io *handle, const char *pcap_file) PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf); return -1; } - pcap_loop(handle->pcap, -1, pcap_packet_handler, (u_char *)handle); + pcap_loop(handle->pcap, -1, pcap_pkt_handler, (u_char *)handle); pcap_close(handle->pcap); PACKET_IO_LOG_STATE("dumpfile %s processed", resolved_path) @@ -115,7 +197,7 @@ static int all_packet_processed(struct dumpfile_io *handle) { for (uint16_t i = 0; i < handle->nr_threads; i++) { - if (!lock_free_queue_empty(handle->queue[i])) + if (!packet_queue_isempty(handle->queue[i])) { return 0; } @@ -213,7 +295,7 @@ struct dumpfile_io *dumpfile_io_new(const char *dumpfile_path, enum packet_io_mo for (uint16_t i = 0; i < handle->nr_threads; i++) { - handle->queue[i] = lock_free_queue_new(MAX_PACKET_QUEUE_SIZE); + handle->queue[i] = packet_queue_new(MAX_PACKET_QUEUE_SIZE); if (handle->queue[i] == NULL) { PACKET_IO_LOG_ERROR("unable to create packet queue"); @@ -249,7 +331,7 @@ void dumpfile_io_free(struct dumpfile_io *handle) { while (1) { - lock_free_queue_pop(handle->queue[i], (void **)&pcap_pkt); + packet_queue_pop(handle->queue[i], (void **)&pcap_pkt); if (pcap_pkt) { free(pcap_pkt); @@ -260,14 +342,14 @@ void dumpfile_io_free(struct dumpfile_io *handle) } } - lock_free_queue_free(handle->queue[i]); + packet_queue_free(handle->queue[i]); } free(handle); handle = NULL; } } -int dumpfile_io_wait_exit(struct dumpfile_io *handle) +int dumpfile_io_isbreak(struct dumpfile_io *handle) { return ATOMIC_READ(&handle->io_thread_wait_exit); } @@ -277,17 +359,17 @@ int dumpfile_io_init(struct dumpfile_io *handle __attribute__((unused)), uint16_ return 0; } -int dumpfile_io_ingress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts) +uint16_t dumpfile_io_ingress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { - struct lock_free_queue *queue = handle->queue[thr_idx]; + struct packet_queue *queue = handle->queue[thr_idx]; struct packet_io_stat *stat = &handle->stat[thr_idx]; struct pcap_pkt *pcap_pkt = NULL; struct packet *pkt; - int nr_parsed = 0; + uint16_t nr_parsed = 0; - for (int i = 0; i < nr_pkts; i++) + for (uint16_t i = 0; i < nr_pkts; i++) { - lock_free_queue_pop(queue, (void **)&pcap_pkt); + packet_queue_pop(queue, (void **)&pcap_pkt); if (pcap_pkt == NULL) { break; @@ -312,13 +394,13 @@ int dumpfile_io_ingress(struct dumpfile_io *handle, uint16_t thr_idx, struct pac return nr_parsed; } -void dumpfile_io_egress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts) +void dumpfile_io_egress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { int len; struct packet *pkt = NULL; struct packet_io_stat *stat = &handle->stat[thr_idx]; - for (int i = 0; i < nr_pkts; i++) + for (uint16_t i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; len = packet_get_raw_len(pkt); @@ -338,12 +420,12 @@ void dumpfile_io_egress(struct dumpfile_io *handle, uint16_t thr_idx, struct pac } } -void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts) +void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { struct packet *pkt = NULL; struct packet_io_stat *stat = &handle->stat[thr_idx]; - for (int i = 0; i < nr_pkts; i++) + for (uint16_t i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_origin_ctx(pkt); @@ -357,7 +439,7 @@ void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packe } } -int dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts) +uint16_t dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { uint16_t len; struct packet *pkt = NULL; @@ -368,7 +450,7 @@ int dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct pack char src_addr[INET6_ADDRSTRLEN] = {0}; char dst_addr[INET6_ADDRSTRLEN] = {0}; - for (int i = 0; i < nr_pkts; i++) + for (uint16_t i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; len = packet_get_raw_len(pkt); diff --git a/src/packet_io/dumpfile_io.h b/src/packet_io/dumpfile_io.h index 7f76ddb..25d6ccc 100644 --- a/src/packet_io/dumpfile_io.h +++ b/src/packet_io/dumpfile_io.h @@ -10,13 +10,13 @@ extern "C" struct dumpfile_io; struct dumpfile_io *dumpfile_io_new(const char *dumpfile_path, enum packet_io_mode mode, uint16_t nr_threads); void dumpfile_io_free(struct dumpfile_io *handle); -int dumpfile_io_wait_exit(struct dumpfile_io *handle); +int dumpfile_io_isbreak(struct dumpfile_io *handle); int dumpfile_io_init(struct dumpfile_io *handle, uint16_t thr_idx); -int dumpfile_io_ingress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts); -void dumpfile_io_egress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts); -void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts); -int dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts); +uint16_t dumpfile_io_ingress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void dumpfile_io_egress(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +uint16_t dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void dumpfile_io_yield(struct dumpfile_io *handle, uint16_t thr_idx, uint64_t timeout_ms); struct packet_io_stat *dumpfile_io_stat(struct dumpfile_io *handle, uint16_t thr_idx); diff --git a/src/packet_io/lock_free_queue.cpp b/src/packet_io/lock_free_queue.cpp deleted file mode 100644 index b3d60c1..0000000 --- a/src/packet_io/lock_free_queue.cpp +++ /dev/null @@ -1,90 +0,0 @@ -#include -#include -#include -#include - -#include "log.h" -#include "utils.h" -#include "lock_free_queue.h" - -#define LOCK_FREE_QUEUE_LOG_ERROR(format, ...) LOG_ERROR("lock free queue", format, ##__VA_ARGS__) -#define LOCK_FREE_QUEUE_LOG_DEBUG(format, ...) LOG_DEBUG("lock free queue", format, ##__VA_ARGS__) - -struct lock_free_queue -{ - uint64_t *queue; - uint32_t size; - uint32_t head; - uint32_t tail; -}; - -struct lock_free_queue *lock_free_queue_new(uint32_t size) -{ - struct lock_free_queue *queue = (struct lock_free_queue *)calloc(1, sizeof(struct lock_free_queue)); - if (queue == NULL) - { - LOCK_FREE_QUEUE_LOG_ERROR("unable to new lock free queue"); - return NULL; - } - - queue->queue = (uint64_t *)calloc(size, sizeof(uint64_t)); - if (queue->queue == NULL) - { - LOCK_FREE_QUEUE_LOG_ERROR("unable to new lock free queue"); - free(queue); - return NULL; - } - - queue->size = size; - queue->head = 0; - queue->tail = 0; - - return queue; -} - -void lock_free_queue_free(struct lock_free_queue *queue) -{ - if (queue == NULL) - { - return; - } - - if (queue->queue) - { - free(queue->queue); - queue->queue = NULL; - } - - free(queue); -} - -int lock_free_queue_push(struct lock_free_queue *queue, void *data) -{ - if (__sync_val_compare_and_swap(&queue->queue[queue->tail], 0, data) != 0) - { - LOCK_FREE_QUEUE_LOG_ERROR("lock free queue is full, retry later"); - return -1; - } - - queue->tail = (queue->tail + 1) % queue->size; - return 0; -} - -void lock_free_queue_pop(struct lock_free_queue *queue, void **data) -{ - uint64_t read = ATOMIC_READ(&queue->queue[queue->head]); - if (read == 0) - { - *data = NULL; - return; - } - __sync_val_compare_and_swap(&queue->queue[queue->head], read, 0); - *data = (void *)read; - queue->head = (queue->head + 1) % queue->size; -} - -int lock_free_queue_empty(struct lock_free_queue *queue) -{ - uint64_t read = ATOMIC_READ(&queue->queue[queue->head]); - return read == 0; -} \ No newline at end of file diff --git a/src/packet_io/lock_free_queue.h b/src/packet_io/lock_free_queue.h deleted file mode 100644 index 7acbc62..0000000 --- a/src/packet_io/lock_free_queue.h +++ /dev/null @@ -1,19 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" -{ -#endif - -struct lock_free_queue; - -struct lock_free_queue *lock_free_queue_new(uint32_t size); -void lock_free_queue_free(struct lock_free_queue *queue); -int lock_free_queue_empty(struct lock_free_queue *queue); - -int lock_free_queue_push(struct lock_free_queue *queue, void *data); -void lock_free_queue_pop(struct lock_free_queue *queue, void **data); - -#ifdef __cplusplus -} -#endif diff --git a/src/packet_io/marsio_io.cpp b/src/packet_io/marsio_io.cpp index 3fce3d8..89f8b26 100644 --- a/src/packet_io/marsio_io.cpp +++ b/src/packet_io/marsio_io.cpp @@ -256,21 +256,20 @@ int marsio_io_init(struct marsio_io *handle, uint16_t thr_idx __attribute__((unu return 0; } -int marsio_io_ingress(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts) +uint16_t marsio_io_ingress(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { struct packet *pkt; marsio_buff_t *mbuff; marsio_buff_t *rx_buffs[RX_BURST_MAX]; struct packet_io_stat *stat = &handle->stat[thr_idx]; - int nr_recv; - int nr_parsed = 0; + uint16_t nr_parsed = 0; int len; char *data; - nr_recv = marsio_recv_burst(handle->mr_dev, thr_idx, rx_buffs, MIN(RX_BURST_MAX, nr_pkts)); + int nr_recv = marsio_recv_burst(handle->mr_dev, thr_idx, rx_buffs, MIN(RX_BURST_MAX, nr_pkts)); if (nr_recv <= 0) { - return 0; + return nr_parsed; } for (int i = 0; i < nr_recv; i++) @@ -294,10 +293,9 @@ int marsio_io_ingress(struct marsio_io *handle, uint16_t thr_idx, struct packet continue; } - pkt = &pkts[nr_parsed]; + pkt = &pkts[nr_parsed++]; packet_parse(pkt, data, len); metadata_from_mbuff_to_packet(mbuff, pkt); - nr_parsed++; if (marsio_buff_is_ctrlbuf(mbuff)) { @@ -314,14 +312,14 @@ int marsio_io_ingress(struct marsio_io *handle, uint16_t thr_idx, struct packet return nr_parsed; } -void marsio_io_egress(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts) +void marsio_io_egress(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { struct packet *pkt; marsio_buff_t *mbuff; struct packet_io_stat *stat = &handle->stat[thr_idx]; int len; - for (int i = 0; i < nr_pkts; i++) + for (uint16_t i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; len = packet_get_raw_len(pkt); @@ -349,13 +347,13 @@ void marsio_io_egress(struct marsio_io *handle, uint16_t thr_idx, struct packet } } -void marsio_io_drop(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts) +void marsio_io_drop(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { struct packet *pkt; marsio_buff_t *mbuff; struct packet_io_stat *stat = &handle->stat[thr_idx]; - for (int i = 0; i < nr_pkts; i++) + for (uint16_t i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; mbuff = (marsio_buff_t *)packet_get_origin_ctx(pkt); @@ -369,16 +367,16 @@ void marsio_io_drop(struct marsio_io *handle, uint16_t thr_idx, struct packet *p } } -int marsio_io_inject(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts) +uint16_t marsio_io_inject(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { int len; - int nr_inject = 0; char *ptr; + uint16_t nr_inject = 0; struct packet *pkt; marsio_buff_t *mbuff; struct packet_io_stat *stat = &handle->stat[thr_idx]; - for (int i = 0; i < nr_pkts; i++) + for (uint16_t i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; len = packet_get_raw_len(pkt); diff --git a/src/packet_io/marsio_io.h b/src/packet_io/marsio_io.h index 0bd0ba5..45d6cd8 100644 --- a/src/packet_io/marsio_io.h +++ b/src/packet_io/marsio_io.h @@ -12,10 +12,10 @@ struct marsio_io *marsio_io_new(const char *app_symbol, const char *dev_symbol, void marsio_io_free(struct marsio_io *handle); int marsio_io_init(struct marsio_io *handle, uint16_t thr_idx); -int marsio_io_ingress(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts); -void marsio_io_egress(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts); -void marsio_io_drop(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts); -int marsio_io_inject(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, int nr_pkts); +uint16_t marsio_io_ingress(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void marsio_io_egress(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void marsio_io_drop(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +uint16_t marsio_io_inject(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void marsio_io_yield(struct marsio_io *handle, uint16_t thr_idx, uint64_t timeout_ms); struct packet_io_stat *marsio_io_stat(struct marsio_io *handle, uint16_t thr_idx); diff --git a/src/packet_io/packet_io.cpp b/src/packet_io/packet_io.cpp index b1049b7..b1fa2f3 100644 --- a/src/packet_io/packet_io.cpp +++ b/src/packet_io/packet_io.cpp @@ -57,7 +57,7 @@ void packet_io_free(struct packet_io *packet_io) } } -int packet_io_wait_exit(struct packet_io *packet_io) // used for dumpfile mode +int packet_io_isbreak(struct packet_io *packet_io) // used for dumpfile mode { if (likely(packet_io->mode == PACKET_IO_MARSIO)) { @@ -65,7 +65,7 @@ int packet_io_wait_exit(struct packet_io *packet_io) // used for dumpfile mode } else { - return dumpfile_io_wait_exit(packet_io->dumpfile); + return dumpfile_io_isbreak(packet_io->dumpfile); } } @@ -81,7 +81,7 @@ int packet_io_init(struct packet_io *packet_io, uint16_t thr_idx) } } -int packet_io_ingress(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, int nr_pkts) +uint16_t packet_io_ingress(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { if (likely(packet_io->mode == PACKET_IO_MARSIO)) { @@ -93,7 +93,7 @@ int packet_io_ingress(struct packet_io *packet_io, uint16_t thr_idx, struct pack } } -void packet_io_egress(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, int nr_pkts) +void packet_io_egress(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { if (likely(packet_io->mode == PACKET_IO_MARSIO)) { @@ -105,7 +105,7 @@ void packet_io_egress(struct packet_io *packet_io, uint16_t thr_idx, struct pack } } -void packet_io_drop(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, int nr_pkts) +void packet_io_drop(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { if (likely(packet_io->mode == PACKET_IO_MARSIO)) { @@ -117,7 +117,7 @@ void packet_io_drop(struct packet_io *packet_io, uint16_t thr_idx, struct packet } } -int packet_io_inject(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, int nr_pkts) +uint16_t packet_io_inject(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { if (likely(packet_io->mode == PACKET_IO_MARSIO)) { diff --git a/src/packet_io/packet_io.h b/src/packet_io/packet_io.h index ef1a16f..c04d79b 100644 --- a/src/packet_io/packet_io.h +++ b/src/packet_io/packet_io.h @@ -71,13 +71,13 @@ struct packet_io_options struct packet_io; struct packet_io *packet_io_new(struct packet_io_options *opts); void packet_io_free(struct packet_io *packet_io); -int packet_io_wait_exit(struct packet_io *packet_io); // used for dumpfile mode +int packet_io_isbreak(struct packet_io *packet_io); // used for dumpfile mode int packet_io_init(struct packet_io *packet_io, uint16_t thr_idx); -int packet_io_ingress(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, int nr_pkts); -void packet_io_egress(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, int nr_pkts); -void packet_io_drop(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, int nr_pkts); -int packet_io_inject(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, int nr_pkts); +uint16_t packet_io_ingress(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void packet_io_egress(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +void packet_io_drop(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); +uint16_t packet_io_inject(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void packet_io_yield(struct packet_io *packet_io, uint16_t thr_idx, uint64_t timeout_ms); struct packet_io_stat *packet_io_stat(struct packet_io *packet_io, uint16_t thr_idx);