#include #include #include #include #include #include #include "stellar.h" #include "file_scan.h" #include "packet_io.h" #include "lock_free_queue.h" #include "packet_io_dumpfile.h" #define MAX_PACKET_QUEUE_SIZE (4096 * 1000) struct packet_io_dumpfile { uint8_t nr_threads; char dumpfile_dir[256]; pcap_t *pcap; struct lock_free_queue *queue[MAX_THREAD_NUM]; struct packet_io_stat stat; uint64_t io_thread_need_exit; uint64_t io_thread_is_runing; }; struct pcap_pkt { char *data; int len; }; /****************************************************************************** * Private API ******************************************************************************/ static void pcap_handle(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes) { struct packet_io_dumpfile *handle = (struct packet_io_dumpfile *)user; // copy packet data to new memory struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)calloc(1, sizeof(struct pcap_pkt) + h->caplen); if (pcap_pkt == NULL) { PACKET_IO_LOG_ERROR("unable to alloc packet"); return; } pcap_pkt->data = (char *)pcap_pkt + sizeof(struct pcap_pkt); pcap_pkt->len = h->caplen; memcpy((char *)pcap_pkt->data, bytes, h->caplen); // calculate packet hash struct packet pkt; packet_parse(&pkt, pcap_pkt->data, pcap_pkt->len); uint64_t hash = packet_get_hash(&pkt, LDBC_METHOD_HASH_INT_IP_AND_EXT_IP, 0); // push packet to queue struct lock_free_queue *queue = handle->queue[hash % handle->nr_threads]; lock_free_queue_push(queue, pcap_pkt); if (ATOMIC_READ(&handle->io_thread_need_exit)) { PACKET_IO_LOG_STATE("dumpfile io thread need exit"); pcap_breakloop(handle->pcap); } } static int dumpfile_handle(const char *file, void *arg) { char resolved_path[256]; char pcap_errbuf[PCAP_ERRBUF_SIZE]; struct packet_io_dumpfile *handle = (struct packet_io_dumpfile *)arg; realpath(file, resolved_path); PACKET_IO_LOG_STATE("dumpfile %s in-processing", resolved_path) handle->pcap = pcap_open_offline(file, pcap_errbuf); if (handle->pcap == NULL) { PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf); return -1; } pcap_loop(handle->pcap, -1, pcap_handle, (u_char *)handle); pcap_close(handle->pcap); PACKET_IO_LOG_STATE("dumpfile %s processed", resolved_path) return 0; } static void *dumpfile_thread_cycle(void *arg) { struct packet_io_dumpfile *handle = (struct packet_io_dumpfile *)arg; ATOMIC_SET(&handle->io_thread_is_runing, 1); PACKET_IO_LOG_STATE("dumpfile io thread is running"); file_scan(handle->dumpfile_dir, dumpfile_handle, arg); PACKET_IO_LOG_STATE("dumpfile io thread is exiting"); ATOMIC_SET(&handle->io_thread_is_runing, 0); return NULL; } /****************************************************************************** * Public API ******************************************************************************/ struct packet_io_dumpfile *packet_io_dumpfile_new(struct packet_io_dumpfile_options *opts) { pthread_t tid; struct packet_io_dumpfile *handle = (struct packet_io_dumpfile *)calloc(1, sizeof(struct packet_io_dumpfile)); if (handle == NULL) { PACKET_IO_LOG_ERROR("unable to allocate memory for packet_io_dumpfile"); return NULL; } handle->nr_threads = opts->nr_threads; strncpy(handle->dumpfile_dir, opts->dumpfile_dir, sizeof(handle->dumpfile_dir)); for (uint16_t i = 0; i < handle->nr_threads; i++) { handle->queue[i] = lock_free_queue_new(MAX_PACKET_QUEUE_SIZE); if (handle->queue[i] == NULL) { PACKET_IO_LOG_ERROR("unable to create packet queue"); goto error_out; } } if (pthread_create(&tid, NULL, dumpfile_thread_cycle, (void *)handle) != 0) { PACKET_IO_LOG_ERROR("unable to create packet io thread"); goto error_out; } return handle; error_out: packet_io_dumpfile_free(handle); return NULL; } void packet_io_dumpfile_free(struct packet_io_dumpfile *handle) { if (handle) { ATOMIC_SET(&handle->io_thread_need_exit, 1); while (ATOMIC_READ(&handle->io_thread_is_runing)) { usleep(1000); } for (uint16_t i = 0; i < handle->nr_threads; i++) { lock_free_queue_free(handle->queue[i]); } free(handle); handle = NULL; } } struct packet_io_stat *packet_io_dumpfile_get_stat(struct packet_io_dumpfile *handle) { return &handle->stat; } int packet_io_dumpfile_init(struct packet_io_dumpfile *handle, uint16_t thread_id) { return 0; } int packet_io_dumpfile_ingress(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts) { struct lock_free_queue *queue = handle->queue[thread_id]; struct pcap_pkt *pcap_pkt = NULL; int nr_parsed = 0; for (int i = 0; i < nr_pkts; i++) { lock_free_queue_pop(queue, (void **)&pcap_pkt); if (pcap_pkt == NULL) { break; } else { ATOMIC_ADD(&handle->stat.rx_pkts, 1); ATOMIC_ADD(&handle->stat.rx_bytes, pcap_pkt->len); struct packet *pkt = &pkts[nr_parsed++]; memset(pkt, 0, sizeof(struct packet)); packet_parse(pkt, pcap_pkt->data, pcap_pkt->len); packet_set_user_data(pkt, pcap_pkt); packet_set_type(pkt, PACKET_TYPE_DATA); packet_set_action(pkt, PACKET_ACTION_FORWARD); } } return nr_parsed; } // pkts from packet_io_dumpfile_ingress void packet_io_dumpfile_egress(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts) { struct packet *pkt = NULL; for (int i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; ATOMIC_ADD(&handle->stat.tx_pkts, 1); ATOMIC_ADD(&handle->stat.tx_bytes, packet_get_len(pkt)); struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_user_data(pkt); assert(pcap_pkt != NULL); free(pcap_pkt); } } // pkts from packet_io_dumpfile_ingress void packet_io_dumpfile_drop(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts) { struct packet *pkt = NULL; for (int i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; ATOMIC_ADD(&handle->stat.drop_pkts, 1); ATOMIC_ADD(&handle->stat.drop_bytes, packet_get_len(pkt)); struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_user_data(pkt); assert(pcap_pkt != NULL); free(pcap_pkt); } } // pkts build by packet_new void packet_io_dumpfile_inject(struct packet_io_dumpfile *handle, uint16_t thread_id, struct packet *pkts, int nr_pkts) { struct packet *pkt = NULL; for (int i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; ATOMIC_ADD(&handle->stat.inject_pkts, 1); ATOMIC_ADD(&handle->stat.inject_bytes, packet_get_len(pkt)); assert(packet_get_user_data(pkt) == NULL); packet_free(pkt); } }