reactor packet I/O & duplicated packet filter & evicted session filter
This commit is contained in:
@@ -1,12 +1,14 @@
|
||||
#include <pcap/pcap.h>
|
||||
#include <pthread.h>
|
||||
#include <unistd.h>
|
||||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
#include <assert.h>
|
||||
|
||||
#include "stellar.h"
|
||||
#include "file_scan.h"
|
||||
#include "packet_io.h"
|
||||
#include "packet_utils.h"
|
||||
#include "packet_queue.h"
|
||||
#include "lock_free_queue.h"
|
||||
#include "packet_io_dumpfile.h"
|
||||
|
||||
#define MAX_PACKET_QUEUE_SIZE (4096 * 1000)
|
||||
@@ -17,12 +19,18 @@ struct packet_io_dumpfile
|
||||
char dumpfile_dir[256];
|
||||
|
||||
pcap_t *pcap;
|
||||
struct packet_queue *queue[MAX_THREAD_NUM];
|
||||
struct lock_free_queue *queue[MAX_THREAD_NUM];
|
||||
struct packet_io_stat stat;
|
||||
uint64_t io_thread_need_exit;
|
||||
uint64_t io_thread_is_runing;
|
||||
};
|
||||
|
||||
struct pcap_pkt
|
||||
{
|
||||
char *data;
|
||||
int len;
|
||||
};
|
||||
|
||||
/******************************************************************************
|
||||
* Private API
|
||||
******************************************************************************/
|
||||
@@ -31,19 +39,25 @@ static void pcap_handle(u_char *user, const struct pcap_pkthdr *h, const u_char
|
||||
{
|
||||
struct packet_io_dumpfile *handle = (struct packet_io_dumpfile *)user;
|
||||
|
||||
struct packet *pkt = packet_new(h->caplen);
|
||||
if (pkt == NULL)
|
||||
// copy packet data to new memory
|
||||
struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)calloc(1, sizeof(struct pcap_pkt) + h->caplen);
|
||||
if (pcap_pkt == NULL)
|
||||
{
|
||||
PACKET_IO_LOG_ERROR("unable to alloc packet");
|
||||
return;
|
||||
}
|
||||
pcap_pkt->data = (char *)pcap_pkt + sizeof(struct pcap_pkt);
|
||||
pcap_pkt->len = h->caplen;
|
||||
memcpy((char *)pcap_pkt->data, bytes, h->caplen);
|
||||
|
||||
memcpy((char *)pkt->data_ptr, bytes, h->caplen);
|
||||
packet_parse(pkt, pkt->data_ptr, h->caplen);
|
||||
// calculate packet hash
|
||||
struct packet pkt;
|
||||
packet_parse(&pkt, pcap_pkt->data, pcap_pkt->len);
|
||||
uint64_t hash = packet_get_hash(&pkt, LDBC_METHOD_HASH_INT_IP_AND_EXT_IP, 0);
|
||||
|
||||
uint64_t hash = packet_get_hash(pkt, LDBC_METHOD_HASH_INT_IP_AND_EXT_IP, 0);
|
||||
struct packet_queue *queue = handle->queue[hash % handle->nr_threads];
|
||||
packet_queue_push(queue, pkt);
|
||||
// push packet to queue
|
||||
struct lock_free_queue *queue = handle->queue[hash % handle->nr_threads];
|
||||
lock_free_queue_push(queue, pcap_pkt);
|
||||
|
||||
if (ATOMIC_READ(&handle->io_thread_need_exit))
|
||||
{
|
||||
@@ -56,7 +70,7 @@ static int dumpfile_handle(const char *file, void *arg)
|
||||
{
|
||||
struct packet_io_dumpfile *handle = (struct packet_io_dumpfile *)arg;
|
||||
|
||||
PACKET_IO_LOG_STATE("dumpfile %s inprocessing", file)
|
||||
PACKET_IO_LOG_STATE("dumpfile %s in-processing", file)
|
||||
|
||||
handle->pcap = pcap_open_offline(file, NULL);
|
||||
if (handle->pcap == NULL)
|
||||
@@ -91,7 +105,7 @@ static void *dumpfile_thread_cycle(void *arg)
|
||||
* Public API
|
||||
******************************************************************************/
|
||||
|
||||
struct packet_io_dumpfile *packet_io_dumpfile_new(struct packet_io_dumpfile_opts *opts)
|
||||
struct packet_io_dumpfile *packet_io_dumpfile_new(struct packet_io_dumpfile_options *opts)
|
||||
{
|
||||
pthread_t tid;
|
||||
struct packet_io_dumpfile *handle = (struct packet_io_dumpfile *)calloc(1, sizeof(struct packet_io_dumpfile));
|
||||
@@ -106,7 +120,7 @@ struct packet_io_dumpfile *packet_io_dumpfile_new(struct packet_io_dumpfile_opts
|
||||
|
||||
for (uint16_t i = 0; i < handle->nr_threads; i++)
|
||||
{
|
||||
handle->queue[i] = packet_queue_new(MAX_PACKET_QUEUE_SIZE);
|
||||
handle->queue[i] = lock_free_queue_new(MAX_PACKET_QUEUE_SIZE);
|
||||
if (handle->queue[i] == NULL)
|
||||
{
|
||||
PACKET_IO_LOG_ERROR("unable to create packet queue");
|
||||
@@ -140,14 +154,14 @@ void packet_io_dumpfile_free(struct packet_io_dumpfile *handle)
|
||||
|
||||
for (uint16_t i = 0; i < handle->nr_threads; i++)
|
||||
{
|
||||
packet_queue_free(handle->queue[i]);
|
||||
lock_free_queue_free(handle->queue[i]);
|
||||
}
|
||||
free(handle);
|
||||
handle = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
struct packet_io_stat *packet_io_dumpfile_stat(struct packet_io_dumpfile *handle)
|
||||
struct packet_io_stat *packet_io_dumpfile_get_stat(struct packet_io_dumpfile *handle)
|
||||
{
|
||||
return &handle->stat;
|
||||
}
|
||||
@@ -157,55 +171,84 @@ int packet_io_dumpfile_init(struct packet_io_dumpfile *handle, uint16_t thread_i
|
||||
return 0;
|
||||
}
|
||||
|
||||
int packet_io_dumpfile_recv(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts)
|
||||
int packet_io_dumpfile_ingress(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts)
|
||||
{
|
||||
struct packet_queue *queue = handle->queue[thread_id];
|
||||
struct packet *pkt = NULL;
|
||||
struct lock_free_queue *queue = handle->queue[thread_id];
|
||||
struct pcap_pkt *pcap_pkt = NULL;
|
||||
int nr_parsed = 0;
|
||||
|
||||
for (int i = 0; i < nr_pkts; i++)
|
||||
{
|
||||
packet_queue_pop(queue, &pkt);
|
||||
if (pkt == NULL)
|
||||
lock_free_queue_pop(queue, (void **)&pcap_pkt);
|
||||
if (pcap_pkt == NULL)
|
||||
{
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
ATOMIC_ADD(&handle->stat.rx_pkts, 1);
|
||||
ATOMIC_ADD(&handle->stat.rx_bytes, packet_get_len(pkt));
|
||||
ATOMIC_ADD(&handle->stat.rx_bytes, pcap_pkt->len);
|
||||
|
||||
struct packet *temp = &pkts[nr_parsed++];
|
||||
memset(temp, 0, sizeof(struct packet));
|
||||
packet_parse(temp, pkt->data_ptr, pkt->data_len);
|
||||
packet_set_io_ctx(temp, pkt);
|
||||
packet_set_type(temp, PACKET_TYPE_DATA);
|
||||
packet_set_action(temp, PACKET_ACTION_FORWARD);
|
||||
struct packet *pkt = &pkts[nr_parsed++];
|
||||
memset(pkt, 0, sizeof(struct packet));
|
||||
packet_parse(pkt, pcap_pkt->data, pcap_pkt->len);
|
||||
packet_set_user_data(pkt, pcap_pkt);
|
||||
packet_set_type(pkt, PACKET_TYPE_DATA);
|
||||
packet_set_action(pkt, PACKET_ACTION_FORWARD);
|
||||
}
|
||||
}
|
||||
|
||||
return nr_parsed;
|
||||
}
|
||||
|
||||
void packet_io_dumpfile_send(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts)
|
||||
// pkts from packet_io_dumpfile_ingress
|
||||
void packet_io_dumpfile_egress(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts)
|
||||
{
|
||||
struct packet *pkt = NULL;
|
||||
for (int i = 0; i < nr_pkts; i++)
|
||||
{
|
||||
pkt = &pkts[i];
|
||||
|
||||
if (packet_get_action(pkt) == PACKET_ACTION_DROP)
|
||||
{
|
||||
ATOMIC_ADD(&handle->stat.drop_pkts, 1);
|
||||
ATOMIC_ADD(&handle->stat.drop_bytes, packet_get_len(pkt));
|
||||
}
|
||||
else
|
||||
{
|
||||
ATOMIC_ADD(&handle->stat.tx_pkts, 1);
|
||||
ATOMIC_ADD(&handle->stat.tx_bytes, packet_get_len(pkt));
|
||||
}
|
||||
ATOMIC_ADD(&handle->stat.tx_pkts, 1);
|
||||
ATOMIC_ADD(&handle->stat.tx_bytes, packet_get_len(pkt));
|
||||
|
||||
struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_user_data(pkt);
|
||||
assert(pcap_pkt != NULL);
|
||||
free(pcap_pkt);
|
||||
}
|
||||
}
|
||||
|
||||
// pkts from packet_io_dumpfile_ingress
|
||||
void packet_io_dumpfile_drop(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts)
|
||||
{
|
||||
struct packet *pkt = NULL;
|
||||
for (int i = 0; i < nr_pkts; i++)
|
||||
{
|
||||
pkt = &pkts[i];
|
||||
|
||||
ATOMIC_ADD(&handle->stat.drop_pkts, 1);
|
||||
ATOMIC_ADD(&handle->stat.drop_bytes, packet_get_len(pkt));
|
||||
|
||||
struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_user_data(pkt);
|
||||
assert(pcap_pkt != NULL);
|
||||
free(pcap_pkt);
|
||||
}
|
||||
}
|
||||
|
||||
// pkts build by packet_new
|
||||
void packet_io_dumpfile_inject(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts)
|
||||
{
|
||||
struct packet *pkt = NULL;
|
||||
for (int i = 0; i < nr_pkts; i++)
|
||||
{
|
||||
pkt = &pkts[i];
|
||||
|
||||
ATOMIC_ADD(&handle->stat.inject_pkts, 1);
|
||||
ATOMIC_ADD(&handle->stat.inject_bytes, packet_get_len(pkt));
|
||||
|
||||
struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_user_data(pkt);
|
||||
assert(pcap_pkt == NULL);
|
||||
|
||||
packet_free((struct packet *)packet_get_io_ctx(pkt));
|
||||
packet_free(pkt);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user