This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
stellar-stellar/src/packet_io/packet_io_dumpfile.cpp

192 lines
5.0 KiB
C++
Raw Normal View History

#include <pcap/pcap.h>
#include <pthread.h>
#include <unistd.h>
#include "stellar.h"
#include "file_scan.h"
#include "packet_io.h"
#include "packet_utils.h"
#include "packet_queue.h"
#include "packet_io_dumpfile.h"
/* Size argument handed to packet_queue_create() for each per-thread queue.
 * NOTE(review): presumably a packet count rather than bytes -- confirm
 * against packet_queue.h. */
#define MAX_PACKET_QUEUE_SIZE (4096 * 1000)
/*
 * State of the dumpfile packet I/O backend: a reader thread decodes pcap
 * files and feeds the packets into per-thread queues.
 */
struct packet_io_dumpfile
{
    /* Number of entries of queue[] that are in use. */
    uint8_t nr_threads;
    /* Path given to file_scan() by the reader thread. */
    char dumpfile_dir[256];
    /* pcap handle of the file currently being read by the reader thread. */
    pcap_t *pcap;
    /* Packet queues; the receive function indexes this array by thread_id. */
    struct packet_queue *queue[MAX_THREAD_NUM];
    /* rx / tx / drop counters updated by the receive and send functions. */
    struct packet_io_stat stat;
    /* Set to 1 on destroy to ask the reader thread to stop. */
    uint64_t io_thread_need_exit;
    /* Set to 1 by the reader thread on entry and back to 0 on exit. */
    uint64_t io_thread_is_runing;
};
/******************************************************************************
* Private API
******************************************************************************/
/*
 * pcap_loop() callback: copies one captured frame into a newly allocated
 * packet, parses it and pushes it onto the queue selected by the packet hash.
 *
 * user  - the struct packet_io_dumpfile that was passed to pcap_loop()
 * h     - pcap record header; only caplen is read
 * bytes - captured frame data (caplen bytes are copied)
 *
 * After the push, the exit flag is polled and pcap_breakloop() is called
 * when the owner has requested shutdown.
 */
static void pcap_handle(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes)
{
    struct packet_io_dumpfile *io = (struct packet_io_dumpfile *)user;

    struct packet *frame = packet_new(h->caplen);
    if (frame == NULL)
    {
        PACKET_IO_LOG_ERROR("unable to alloc packet");
        return;
    }

    memcpy((char *)frame->data_ptr, bytes, h->caplen);
    packet_parse(frame, frame->data_ptr, h->caplen);

    /* Pick the destination queue from the packet hash. */
    uint64_t hash = packet_get_hash(frame, LDBC_METHOD_HASH_INT_IP_AND_EXT_IP, 0);
    struct packet_queue *target = io->queue[hash % io->nr_threads];
    packet_queue_push(target, frame);

    if (!ATOMIC_READ(&io->io_thread_need_exit))
    {
        return;
    }

    PACKET_IO_LOG_STATE("dumpfile io thread need exit");
    pcap_breakloop(io->pcap);
}
static int dumpfile_handle(const char *file, void *arg)
{
struct packet_io_dumpfile *handle = (struct packet_io_dumpfile *)arg;
PACKET_IO_LOG_STATE("dumpfile %s inprocessing", file)
handle->pcap = pcap_open_offline(file, NULL);
if (handle->pcap == NULL)
{
PACKET_IO_LOG_ERROR("unable to open pcap file: %s", file);
return -1;
}
pcap_loop(handle->pcap, -1, pcap_handle, (u_char *)handle);
pcap_close(handle->pcap);
PACKET_IO_LOG_STATE("dumpfile %s processed", file)
return 0;
}
/*
 * Entry point of the reader thread.
 *
 * Raises io_thread_is_runing, hands dumpfile_dir to file_scan() with
 * dumpfile_handle() as the per-file callback, then clears the flag again.
 *
 * arg - the struct packet_io_dumpfile to work on
 *
 * Always returns NULL.
 */
static void *dumpfile_thread_cycle(void *arg)
{
    struct packet_io_dumpfile *io = (struct packet_io_dumpfile *)arg;

    ATOMIC_SET(&io->io_thread_is_runing, 1);
    PACKET_IO_LOG_STATE("dumpfile io thread is running");

    file_scan(io->dumpfile_dir, dumpfile_handle, (void *)io);

    PACKET_IO_LOG_STATE("dumpfile io thread is exiting");
    ATOMIC_SET(&io->io_thread_is_runing, 0);

    return NULL;
}
/******************************************************************************
* Public API
******************************************************************************/
struct packet_io_dumpfile *packet_io_dumpfile_create(struct packet_io_dumpfile_confg *config)
{
pthread_t tid;
struct packet_io_dumpfile *handle = (struct packet_io_dumpfile *)calloc(1, sizeof(struct packet_io_dumpfile));
if (handle == NULL)
{
PACKET_IO_LOG_ERROR("unable to allocate memory for packet_io_dumpfile");
return NULL;
}
handle->nr_threads = config->nr_threads;
strncpy(handle->dumpfile_dir, config->dumpfile_dir, sizeof(handle->dumpfile_dir));
for (uint16_t i = 0; i < handle->nr_threads; i++)
{
handle->queue[i] = packet_queue_create(MAX_PACKET_QUEUE_SIZE);
if (handle->queue[i] == NULL)
{
PACKET_IO_LOG_ERROR("unable to create packet queue");
goto error_out;
}
}
if (pthread_create(&tid, NULL, dumpfile_thread_cycle, (void *)handle) != 0)
{
PACKET_IO_LOG_ERROR("unable to create packet io thread");
goto error_out;
}
return handle;
error_out:
packet_io_dumpfile_destory(handle);
return NULL;
}
/*
 * Stops the reader thread and releases everything owned by the handle.
 *
 * handle - instance to destroy; NULL is accepted and ignored
 *
 * Sets io_thread_need_exit, polls once per millisecond until the reader
 * thread has cleared io_thread_is_runing, then destroys the queues and
 * frees the handle. The handle must not be used afterwards.
 */
void packet_io_dumpfile_destory(struct packet_io_dumpfile *handle)
{
    if (handle == NULL)
    {
        return;
    }

    ATOMIC_SET(&handle->io_thread_need_exit, 1);
    while (ATOMIC_READ(&handle->io_thread_is_runing))
    {
        usleep(1000);
    }

    for (uint16_t i = 0; i < handle->nr_threads; i++)
    {
        /* When creation failed half way, the remaining slots are still
         * NULL; skip them instead of passing NULL on. */
        if (handle->queue[i] != NULL)
        {
            packet_queue_destory(handle->queue[i]);
            handle->queue[i] = NULL;
        }
    }

    free(handle);
}
/*
 * Gives access to the counters of this instance.
 *
 * Returns a pointer to the stat member embedded in handle; it is not a copy.
 */
struct packet_io_stat *packet_io_dumpfile_stat(struct packet_io_dumpfile *handle)
{
    struct packet_io_stat *counters = &handle->stat;

    return counters;
}
/*
 * Per-thread initialisation hook. This backend has nothing to set up, so
 * both arguments are ignored.
 *
 * Always returns 0.
 */
int packet_io_dumpfile_init(struct packet_io_dumpfile *handle, uint16_t thread_id)
{
    (void)handle;
    (void)thread_id;

    return 0;
}
int packet_io_dumpfile_recv(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet **pkt)
{
struct packet_queue *queue = handle->queue[thread_id];
packet_queue_pop(queue, pkt);
if (*pkt == NULL)
{
return -1;
}
else
{
ATOMIC_ADD(&handle->stat.rx_pkts, 1);
ATOMIC_ADD(&handle->stat.rx_bytes, packet_get_len(*pkt));
return 0;
}
}
/*
 * Accounts for a packet and frees it; nothing is actually transmitted here.
 *
 * handle    - dumpfile I/O instance whose counters are updated
 * thread_id - not used
 * pkt       - packet to account for; it is always freed before returning
 *
 * Packets whose action is PACKET_ACTION_DROP are added to the drop
 * counters, all others to the tx counters.
 */
void packet_io_dumpfile_send(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkt)
{
    if (packet_get_action(pkt) != PACKET_ACTION_DROP)
    {
        ATOMIC_ADD(&handle->stat.tx_pkts, 1);
        ATOMIC_ADD(&handle->stat.tx_bytes, packet_get_len(pkt));
    }
    else
    {
        ATOMIC_ADD(&handle->stat.drop_pkts, 1);
        ATOMIC_ADD(&handle->stat.drop_bytes, packet_get_len(pkt));
    }

    packet_free(pkt);
}