diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h index 2f103a3..9dfd711 100644 --- a/include/stellar/stellar.h +++ b/include/stellar/stellar.h @@ -20,9 +20,6 @@ int stellar_raw_packet_subscribe(struct stellar *st, plugin_on_packet_func *on_p void stellar_emit_datapath_telemetry(struct packet *pkt, const char * module, const char *str); -// only send user build packet, can't send packet which come from network -// void stellar_send_build_packet(struct stellar *st, struct packet *pkt); - struct stellar *stellar_new(const char *toml_file); void stellar_run(struct stellar *st); void stellar_free(struct stellar *st); diff --git a/infra/packet_io/marsio_io.c b/infra/packet_io/marsio_io.c index cad5ce1..cf0b6d8 100644 --- a/infra/packet_io/marsio_io.c +++ b/infra/packet_io/marsio_io.c @@ -10,7 +10,7 @@ #include "packet_parser.h" #include "packet_internal.h" -#define MARSIO_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "marsio", format, ##__VA_ARGS__) +#define MARSIO_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "marsio io", format, ##__VA_ARGS__) struct marsio_io { @@ -329,7 +329,9 @@ uint16_t marsio_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts, void marsio_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { + int is_injected_packet = 0; int len; + char *ptr; struct packet *pkt; marsio_buff_t *mbuff; struct marsio_io *mr_io = (struct marsio_io *)handle; @@ -337,17 +339,29 @@ void marsio_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint1 for (uint16_t i = 0; i < nr_pkts; i++) { + is_injected_packet = 0; pkt = &pkts[i]; len = packet_get_raw_len(pkt); + mbuff = (marsio_buff_t *)packet_get_origin_ctx(pkt); + if (mbuff == NULL) + { + if (marsio_buff_malloc_global(mr_io->mr_ins, &mbuff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0) + { + MARSIO_IO_LOG_ERROR("unable to allocate marsio buffer for inject packet"); + continue; + } + ptr = marsio_buff_append(mbuff, len); + memcpy(ptr, packet_get_raw_data(pkt), len); + + is_injected_packet = 1; + } + metadata_from_packet_to_mbuff(pkt, mbuff); + stat->pkts_tx++; stat->bytes_tx += len; - mbuff = (marsio_buff_t *)packet_get_origin_ctx(pkt); - assert(mbuff != NULL); - metadata_from_packet_to_mbuff(pkt, mbuff); - - if (marsio_buff_is_ctrlbuf(mbuff)) + if (packet_is_ctrl(pkt)) { stat->ctrl_pkts_tx++; stat->ctrl_bytes_tx += len; @@ -358,8 +372,16 @@ void marsio_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint1 stat->raw_bytes_tx += len; } - marsio_send_burst(mr_io->mr_path, thr_idx, &mbuff, 1); - packet_free(pkt); + if (is_injected_packet) + { + stat->pkts_injected++; + stat->bytes_injected += len; + marsio_send_burst_with_options(mr_io->mr_path, thr_idx, &mbuff, 1, MARSIO_SEND_OPT_REHASH); + } + else + { + marsio_send_burst(mr_io->mr_path, thr_idx, &mbuff, 1); + } } } @@ -380,53 +402,9 @@ void marsio_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_ stat->bytes_dropped += packet_get_raw_len(pkt); marsio_buff_free(mr_io->mr_ins, &mbuff, 1, 0, thr_idx); } - packet_free(pkt); } } -uint16_t marsio_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) -{ - int len; - char *ptr; - uint16_t nr_inject = 0; - struct packet *pkt; - marsio_buff_t *mbuff; - struct marsio_io *mr_io = (struct marsio_io *)handle; - struct packet_io_stat *stat = &mr_io->stat[thr_idx]; - - for (uint16_t i = 0; i < nr_pkts; i++) - { - pkt = &pkts[i]; - len = packet_get_raw_len(pkt); - - if (marsio_buff_malloc_global(mr_io->mr_ins, &mbuff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0) - { - MARSIO_IO_LOG_ERROR("unable to allocate marsio buffer for inject packet"); - continue; - } - - stat->pkts_injected++; - stat->bytes_injected += len; - - stat->raw_pkts_tx++; - stat->raw_bytes_tx += len; - - stat->pkts_tx++; - stat->bytes_tx += len; - - nr_inject++; - - ptr = marsio_buff_append(mbuff, len); - memcpy(ptr, packet_get_raw_data(pkt), len); - metadata_from_packet_to_mbuff(pkt, mbuff); - - marsio_send_burst_with_options(mr_io->mr_path, thr_idx, &mbuff, 1, MARSIO_SEND_OPT_REHASH); - packet_free(pkt); - } - - return nr_inject; -} - void marsio_io_yield(void *handle, uint16_t thr_idx) { struct marsio_io *mr_io = (struct marsio_io *)handle; diff --git a/infra/packet_io/marsio_io.h b/infra/packet_io/marsio_io.h index afbb530..ab2f37c 100644 --- a/infra/packet_io/marsio_io.h +++ b/infra/packet_io/marsio_io.h @@ -15,7 +15,6 @@ int marsio_io_init(void *handle, uint16_t thr_idx); uint16_t marsio_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void marsio_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void marsio_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -uint16_t marsio_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void marsio_io_yield(void *handle, uint16_t thr_idx); struct packet_io_stat *marsio_io_stat(void *handle, uint16_t thr_idx); diff --git a/infra/packet_io/packet_io.c b/infra/packet_io/packet_io.c index 45eda02..897f2c9 100644 --- a/infra/packet_io/packet_io.c +++ b/infra/packet_io/packet_io.c @@ -23,7 +23,6 @@ struct packet_io uint16_t (*ingress_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void (*egress_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void (*drop_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); - uint16_t (*inject_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void (*yield_func)(void *handle, uint16_t thr_idx); struct packet_io_stat *(*stat_func)(void *handle, uint16_t thr_idx); }; @@ -261,7 +260,6 @@ struct packet_io *packet_io_new(const char *toml_file) pkt_io->ingress_func = marsio_io_ingress; pkt_io->egress_func = marsio_io_egress; pkt_io->drop_func = marsio_io_drop; - pkt_io->inject_func = marsio_io_inject; pkt_io->yield_func = marsio_io_yield; pkt_io->stat_func = marsio_io_stat; } @@ -274,7 +272,6 @@ struct packet_io *packet_io_new(const char *toml_file) pkt_io->ingress_func = pcap_io_ingress; pkt_io->egress_func = pcap_io_egress; pkt_io->drop_func = pcap_io_drop; - pkt_io->inject_func = pcap_io_inject; pkt_io->yield_func = pcap_io_yield; pkt_io->stat_func = pcap_io_stat; } @@ -331,11 +328,6 @@ void packet_io_drop(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *p pkt_io->drop_func(pkt_io->handle, thr_idx, pkts, nr_pkts); } -uint16_t packet_io_inject(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) -{ - return pkt_io->inject_func(pkt_io->handle, thr_idx, pkts, nr_pkts); -} - void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx) { pkt_io->yield_func(pkt_io->handle, thr_idx); diff --git a/infra/packet_io/packet_io.h b/infra/packet_io/packet_io.h index d5b089b..efb9453 100644 --- a/infra/packet_io/packet_io.h +++ b/infra/packet_io/packet_io.h @@ -74,7 +74,6 @@ int packet_io_init(struct packet_io *pkt_io, uint16_t thr_idx); uint16_t packet_io_ingress(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void packet_io_egress(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void packet_io_drop(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -uint16_t packet_io_inject(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx); struct packet_io_stat *packet_io_stat(struct packet_io *pkt_io, uint16_t thr_idx); diff --git a/infra/packet_io/pcap_io.c b/infra/packet_io/pcap_io.c index 137b9da..00220c9 100644 --- a/infra/packet_io/pcap_io.c +++ b/infra/packet_io/pcap_io.c @@ -407,10 +407,15 @@ uint16_t pcap_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts, ui void pcap_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) { int len; + struct tuple6 tuple; struct packet *pkt = NULL; struct pcap_io *pcap_io = (struct pcap_io *)handle; struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; + char file[PATH_MAX] = {0}; + char src_addr[INET6_ADDRSTRLEN] = {0}; + char dst_addr[INET6_ADDRSTRLEN] = {0}; + for (uint16_t i = 0; i < nr_pkts; i++) { pkt = &pkts[i]; @@ -419,15 +424,51 @@ void pcap_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_ stat->pkts_tx++; stat->bytes_tx += len; - stat->raw_pkts_tx++; - stat->raw_bytes_tx += len; + if (packet_is_ctrl(pkt)) + { + stat->ctrl_pkts_tx++; + stat->ctrl_bytes_tx += len; + } + else + { + stat->raw_pkts_tx++; + stat->raw_bytes_tx += len; + } struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_origin_ctx(pkt); if (pcap_pkt) { free(pcap_pkt); } - packet_free(pkt); + else + { + stat->pkts_injected++; + stat->bytes_injected += len; + + memset(&tuple, 0, sizeof(struct tuple6)); + packet_get_innermost_tuple6(pkt, &tuple); + + if (tuple.addr_family == AF_INET) + { + inet_ntop(AF_INET, &tuple.src_addr.v4, src_addr, INET6_ADDRSTRLEN); + inet_ntop(AF_INET, &tuple.dst_addr.v4, dst_addr, INET6_ADDRSTRLEN); + } + else + { + inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr, INET6_ADDRSTRLEN); + inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr, INET6_ADDRSTRLEN); + } + snprintf(file, sizeof(file), "inject-%s:%u-%s:%u-%lu.pcap", src_addr, ntohs(tuple.src_port), dst_addr, ntohs(tuple.dst_port), stat->pkts_injected); + + if (packet_dump_pcap(pkt, file) == -1) + { + PCAP_IO_LOG_ERROR("unable to dump pcap file: %s", file); + } + else + { + PCAP_IO_LOG_FATAL("dump inject packet: %s", file); + } + } } } @@ -451,61 +492,6 @@ void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t } } -uint16_t pcap_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts) -{ - uint16_t len; - struct tuple6 tuple; - struct packet *pkt = NULL; - struct pcap_io *pcap_io = (struct pcap_io *)handle; - struct packet_io_stat *stat = &pcap_io->stat[thr_idx]; - - char file[PATH_MAX] = {0}; - char src_addr[INET6_ADDRSTRLEN] = {0}; - char dst_addr[INET6_ADDRSTRLEN] = {0}; - - for (uint16_t i = 0; i < nr_pkts; i++) - { - pkt = &pkts[i]; - len = packet_get_raw_len(pkt); - - stat->pkts_injected++; - stat->bytes_injected += len; - - stat->raw_pkts_tx++; - stat->raw_bytes_tx += len; - - stat->pkts_tx++; - stat->bytes_tx += len; - - memset(&tuple, 0, sizeof(struct tuple6)); - packet_get_innermost_tuple6(pkt, &tuple); - - if (tuple.addr_family == AF_INET) - { - inet_ntop(AF_INET, &tuple.src_addr.v4, src_addr, INET6_ADDRSTRLEN); - inet_ntop(AF_INET, &tuple.dst_addr.v4, dst_addr, INET6_ADDRSTRLEN); - } - else - { - inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr, INET6_ADDRSTRLEN); - inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr, INET6_ADDRSTRLEN); - } - snprintf(file, sizeof(file), "inject-%s:%u-%s:%u-%lu.pcap", src_addr, ntohs(tuple.src_port), dst_addr, ntohs(tuple.dst_port), stat->pkts_injected); - - if (packet_dump_pcap(pkt, file) == -1) - { - PCAP_IO_LOG_ERROR("unable to dump pcap file: %s", file); - } - else - { - PCAP_IO_LOG_FATAL("dump inject packet: %s", file); - } - packet_free(pkt); - } - - return nr_pkts; -} - void pcap_io_yield(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused))) { return; diff --git a/infra/packet_io/pcap_io.h b/infra/packet_io/pcap_io.h index 0bd03a8..6619f46 100644 --- a/infra/packet_io/pcap_io.h +++ b/infra/packet_io/pcap_io.h @@ -15,7 +15,6 @@ int pcap_io_init(void *handle, uint16_t thr_idx); uint16_t pcap_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void pcap_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); -uint16_t pcap_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts); void pcap_io_yield(void *handle, uint16_t thr_idx); struct packet_io_stat *pcap_io_stat(void *handle, uint16_t thr_idx); diff --git a/infra/stellar_core.c b/infra/stellar_core.c index b3e4eec..0d85a7e 100644 --- a/infra/stellar_core.c +++ b/infra/stellar_core.c @@ -100,10 +100,12 @@ static void *worker_thread(void *arg) if (packet_get_action(pkt) == PACKET_ACTION_DROP) { packet_io_drop(pkt_io, thread_id, pkt, 1); + packet_free(pkt); } else { packet_io_egress(pkt_io, thread_id, pkt, 1); + packet_free(pkt); } stellar_polling_dispatch(polling_mgr); } @@ -281,40 +283,6 @@ void stellar_reload_log_level(struct stellar *st) } } -/****************************************************************************** - * Stellar Utility Function - ******************************************************************************/ - -// TODO -#if 0 -// only send user build packet, can't send packet which come from network -void stellar_send_build_packet(struct stellar *st, struct packet *pkt) -{ - uint16_t thread_id = stellar_module_manager_get_thread_id(st->st.mod_mgr); - struct packet_io *pkt_io = st->st.pkt_io; - struct session_manager_runtime *sess_mgr_rt = st->st.threads[thread_id].sess_mgr_rt; - session_manager_runtime_record_duplicated_packet(sess_mgr_rt, pkt); - - if (packet_is_claim(pkt)) - { - PACKET_LOG_ERROR("packet has been claimed and cannot be released, please check the module arrangement order"); - assert(0); - return; - } - - if (packet_get_origin_ctx(pkt)) - { - // TODO - abort(); - packet_io_egress(pkt_io, thread_id, pkt, 1); - } - else - { - packet_io_inject(pkt_io, thread_id, pkt, 1); - } -} -#endif - struct logger *stellar_get_logger(struct stellar *st) { if (st) diff --git a/infra/version.map b/infra/version.map index e9008aa..a6163bc 100644 --- a/infra/version.map +++ b/infra/version.map @@ -42,7 +42,7 @@ global: stellar_session_plugin_dettach_current_session; stellar_packet_plugin_register; stellar_polling_plugin_register; - stellar_send_build_packet; + stellar_new; stellar_run; stellar_free;