#include #include #include "marsio.h" #include "mars_io.h" #include "packet_pool.h" #include "packet_parser.h" #include "ip_reassembly.h" #include "log_internal.h" #include "utils_internal.h" #include "packet_internal.h" #define MARS_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "mars IO", format, ##__VA_ARGS__) #define MARS_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "mars IO", format, ##__VA_ARGS__) struct mars_io_cfg { char app_symbol[64]; char dev_symbol[64]; uint16_t thread_num; // range [1, MAX_THREAD_NUM] uint64_t cpu_mask[MAX_THREAD_NUM]; uint64_t idle_yield_ms; // range: [0, 6000] (ms) // packet pool uint64_t capacity; // range: [1, 4294967295] // ip reassembly uint64_t fail_action; // 0: bypass, 1: drop uint64_t timeout_ms; // range: [1, 60000] (ms) uint64_t frag_queue_num; // range: [1, 4294967295 uint64_t frag_queue_size; // range: [2, 65535] }; struct mars_io { struct mars_io_cfg *cfg; struct mr_instance *mr_ins; struct mr_vdev *mr_dev; struct mr_sendpath *mr_path; struct packet_pool *pool[MAX_THREAD_NUM]; struct packet_io_stat stat[MAX_THREAD_NUM]; struct ip_reassembly *ip_reass[MAX_THREAD_NUM]; }; /****************************************************************************** * Private API ******************************************************************************/ static struct mars_io_cfg *mars_io_cfg_new(const char *toml_file) { struct mars_io_cfg *cfg = (struct mars_io_cfg *)calloc(1, sizeof(struct mars_io_cfg)); if (cfg == NULL) { return NULL; } int ret = 0; int num = 0; ret += load_toml_str_config(toml_file, "packet_io.app_symbol", cfg->app_symbol); ret += load_toml_str_config(toml_file, "packet_io.dev_symbol", cfg->dev_symbol); ret += load_toml_integer_config(toml_file, "packet_io.thread_num", (uint64_t *)&cfg->thread_num, 1, MAX_THREAD_NUM); ret += load_toml_integer_config(toml_file, "packet_io.idle_yield_ms", &cfg->idle_yield_ms, 0, 60000); num = load_toml_array_config(toml_file, "packet_io.cpu_mask", cfg->cpu_mask, MAX_THREAD_NUM); ret += load_toml_integer_config(toml_file, "packet_io.packet_pool.capacity", &cfg->capacity, 1, 4294967295); ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.fail_action", &cfg->fail_action, 0, 1); ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.timeout_ms", &cfg->timeout_ms, 1, 60000); ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.frag_queue_num", &cfg->frag_queue_num, 1, 4294967295); ret += load_toml_integer_config(toml_file, "packet_io.ip_reassembly.frag_queue_size", &cfg->frag_queue_size, 2, 65535); if (ret != 0 || num != (int)cfg->thread_num) { free(cfg); return NULL; } else { return cfg; } } static void mars_io_cfg_free(struct mars_io_cfg *cfg) { if (cfg) { free(cfg); cfg = NULL; } } static void mars_io_cfg_print(const struct mars_io_cfg *cfg) { if (cfg) { MARS_IO_LOG_INFO("packet_io.app_symbol : %s", cfg->app_symbol); MARS_IO_LOG_INFO("packet_io.dev_symbol : %s", cfg->dev_symbol); MARS_IO_LOG_INFO("packet_io.idle_yield_ms : %lu", cfg->idle_yield_ms); MARS_IO_LOG_INFO("packet_io.thread_num : %lu", cfg->thread_num); for (uint64_t i = 0; i < cfg->thread_num; i++) { MARS_IO_LOG_INFO("packet_io.cpu_mask[%03d] : %d", i, cfg->cpu_mask[i]); } MARS_IO_LOG_INFO("packet_io.packet_pool.capacity : %lu", cfg->capacity); MARS_IO_LOG_INFO("packet_io.ip_reassembly.fail_action : %lu", cfg->fail_action); MARS_IO_LOG_INFO("packet_io.ip_reassembly.timeout_ms : %lu", cfg->timeout_ms); MARS_IO_LOG_INFO("packet_io.ip_reassembly.frag_queue_num : %lu", cfg->frag_queue_num); MARS_IO_LOG_INFO("packet_io.ip_reassembly.frag_queue_size : %lu", cfg->frag_queue_size); } } static void packet_set_metadata(struct packet *pkt, marsio_buff_t *mbuff) { struct route_ctx route_ctx = {}; route_ctx.used = marsio_buff_get_metadata(mbuff, MR_BUFF_ROUTE_CTX, &route_ctx.data, sizeof(route_ctx.data)); if (route_ctx.used > 0) { packet_set_route_ctx(pkt, &route_ctx); } else { MARS_IO_LOG_ERROR("failed to get route ctx"); } struct sids sids = {}; sids.used = marsio_buff_get_sid_list(mbuff, sids.sid, sizeof(sids.sid) / sizeof(sids.sid[0])); if (sids.used > 0) { packet_set_sids(pkt, &sids); } else { MARS_IO_LOG_ERROR("failed to get sids"); } uint64_t session_id = 0; if (marsio_buff_get_metadata(mbuff, MR_BUFF_SESSION_ID, &session_id, sizeof(session_id)) == sizeof(session_id)) { packet_set_session_id(pkt, session_id); } else { MARS_IO_LOG_ERROR("failed to get session id"); } // TODO #if 0 if (marsio_buff_get_metadata(mbuff, MR_BUFF_DOMAIN, &domain, sizeof(domain)) == sizeof(domain)) { packet_set_domain(pkt, domain); } else { MARS_IO_LOG_ERROR("failed to get domain id"); } #endif uint16_t link_id = 0; if (marsio_buff_get_metadata(mbuff, MR_BUFF_LINK_ID, &link_id, sizeof(link_id)) == sizeof(link_id)) { packet_set_link_id(pkt, link_id); } else { MARS_IO_LOG_ERROR("failed to get link id"); } packet_set_ctrl(pkt, marsio_buff_is_ctrlbuf(mbuff)); enum packet_direction direction = PACKET_DIRECTION_OUTGOING; if (marsio_buff_get_metadata(mbuff, MR_BUFF_DIR, &direction, sizeof(direction)) == sizeof(direction)) { packet_set_direction(pkt, direction); } else { MARS_IO_LOG_ERROR("failed to get direction"); } packet_set_action(pkt, PACKET_ACTION_FORWARD); // TODO const struct timeval tv = {}; packet_set_timeval(pkt, &tv); } static void mbuff_set_metadata(marsio_buff_t *mbuff, struct packet *pkt) { const struct route_ctx *route_ctx = packet_get_route_ctx(pkt); if (marsio_buff_set_metadata(mbuff, MR_BUFF_ROUTE_CTX, (void *)route_ctx->data, route_ctx->used) != 0) { MARS_IO_LOG_ERROR("failed to set route ctx"); } const struct sids *sids = packet_get_sids(pkt); if (marsio_buff_set_sid_list(mbuff, (sid_t *)sids->sid, sids->used) != 0) { MARS_IO_LOG_ERROR("failed to set sids"); } uint64_t session_id = packet_get_session_id(pkt); if (marsio_buff_set_metadata(mbuff, MR_BUFF_SESSION_ID, &session_id, sizeof(session_id)) != 0) { MARS_IO_LOG_ERROR("failed to set session id"); } // TODO #if 0 uint64_t domain = packet_get_domain(pkt); if (marsio_buff_set_metadata(mbuff, MR_BUFF_DOMAIN, &domain, sizeof(domain)) != 0) { MARS_IO_LOG_ERROR("failed to set domain id"); } #endif uint16_t link_id = packet_get_link_id(pkt); if (marsio_buff_set_metadata(mbuff, MR_BUFF_LINK_ID, &link_id, sizeof(link_id)) != 0) { MARS_IO_LOG_ERROR("failed to set link id"); } if (packet_is_ctrl(pkt)) { marsio_buff_set_ctrlbuf(mbuff); } enum packet_direction direction = packet_get_direction(pkt); if (marsio_buff_set_metadata(mbuff, MR_BUFF_DIR, &direction, sizeof(direction)) != 0) { MARS_IO_LOG_ERROR("failed to set direction"); } } static int is_keepalive_packet(const char *data, int len) { if (data == NULL || len < (int)(sizeof(struct ethhdr))) { return 0; } struct ethhdr *eth_hdr = (struct ethhdr *)data; if (eth_hdr->h_proto == 0xAAAA) { return 1; } else { return 0; } } static void origin_free_cb(struct packet *pkt, void *args) { struct mars_io *mars_io = (struct mars_io *)args; struct packet_origin *origin = packet_get_origin(pkt); marsio_buff_t *mbuff = (marsio_buff_t *)origin->ctx; struct packet_io_stat *stat = &mars_io->stat[origin->thr_idx]; struct packet_pool *pool = mars_io->pool[origin->thr_idx]; stat->pkts_user_freed++; stat->bytes_user_freed += packet_get_raw_len(pkt); marsio_buff_free(mars_io->mr_ins, &mbuff, 1, 0, origin->thr_idx); packet_pool_push(pool, pkt); } static struct packet *recv_packet(struct mars_io *mars_io, marsio_buff_t *mbuff, uint16_t thr_idx) { struct packet_io_stat *stat = &mars_io->stat[thr_idx]; struct ip_reassembly *ip_reass = mars_io->ip_reass[thr_idx]; struct packet_pool *pool = mars_io->pool[thr_idx]; int len = marsio_buff_datalen(mbuff); char *data = marsio_buff_mtod(mbuff); stat->pkts_rx++; stat->bytes_rx += len; if (is_keepalive_packet(data, len)) { stat->keep_alive_pkts++; stat->keep_alive_bytes += len; stat->pkts_tx++; stat->bytes_tx += len; marsio_send_burst(mars_io->mr_path, thr_idx, &mbuff, 1); return NULL; } else { struct packet *pkt = packet_pool_pop(pool); assert(pkt != NULL); struct packet_origin origin = { .type = ORIGIN_TYPE_MR, .ctx = mbuff, .cb = origin_free_cb, .args = mars_io, .thr_idx = thr_idx, }; packet_parse(pkt, data, len); packet_set_metadata(pkt, mbuff); packet_set_origin(pkt, &origin); if (packet_is_fragment(pkt)) { return ip_reassembly_defrag(ip_reass, pkt, clock_get_real_time_ms()); } else { return pkt; } } } static void send_packet(struct mars_io *mars_io, struct packet *pkt, uint16_t thr_idx) { marsio_buff_t *mbuff = NULL; struct packet_io_stat *stat = &mars_io->stat[thr_idx]; int len = packet_get_raw_len(pkt); struct packet_origin *origin = packet_get_origin(pkt); if (origin->type == ORIGIN_TYPE_MR) { mbuff = (marsio_buff_t *)origin->ctx; mbuff_set_metadata(mbuff, pkt); marsio_send_burst(mars_io->mr_path, thr_idx, &mbuff, 1); packet_pool_push(mars_io->pool[thr_idx], pkt); } else { if (marsio_buff_malloc_global(mars_io->mr_ins, &mbuff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0) { MARS_IO_LOG_ERROR("unable to allocate marsio buffer for inject packet"); packet_free(pkt); return; } else { stat->pkts_injected++; stat->bytes_injected += len; char *ptr = marsio_buff_append(mbuff, len); memcpy(ptr, packet_get_raw_data(pkt), len); mbuff_set_metadata(mbuff, pkt); marsio_send_burst_with_options(mars_io->mr_path, thr_idx, &mbuff, 1, MARSIO_SEND_OPT_REHASH); packet_free(pkt); } } stat->pkts_tx++; stat->bytes_tx += len; } static void drop_packet(struct mars_io *mars_io, struct packet *pkt, uint16_t thr_idx) { struct packet_io_stat *stat = &mars_io->stat[thr_idx]; int len = packet_get_raw_len(pkt); struct packet_origin *origin = packet_get_origin(pkt); stat->pkts_dropped++; stat->bytes_dropped += len; if (origin->type == ORIGIN_TYPE_MR) { marsio_buff_t *mbuff = (marsio_buff_t *)origin->ctx; marsio_buff_free(mars_io->mr_ins, &mbuff, 1, 0, thr_idx); packet_pool_push(mars_io->pool[thr_idx], pkt); } else { packet_free(pkt); } } /****************************************************************************** * Public API ******************************************************************************/ void *mars_io_new(const char *toml_file) { int opt = 1; cpu_set_t coremask; CPU_ZERO(&coremask); struct mars_io *mars_io = (struct mars_io *)calloc(1, sizeof(struct mars_io)); if (mars_io == NULL) { MARS_IO_LOG_ERROR("unable to allocate memory for mars_io"); return NULL; } mars_io->cfg = mars_io_cfg_new(toml_file); if (mars_io->cfg == NULL) { MARS_IO_LOG_ERROR("unable to create mars_io_cfg"); goto error_out; } mars_io_cfg_print(mars_io->cfg); for (uint16_t i = 0; i < mars_io->cfg->thread_num; i++) { CPU_SET(mars_io->cfg->cpu_mask[i], &coremask); } mars_io->mr_ins = marsio_create(); if (mars_io->mr_ins == NULL) { MARS_IO_LOG_ERROR("unable to create marsio instance"); goto error_out; } marsio_option_set(mars_io->mr_ins, MARSIO_OPT_THREAD_MASK_IN_CPUSET, &coremask, sizeof(cpu_set_t)); marsio_option_set(mars_io->mr_ins, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)); if (marsio_init(mars_io->mr_ins, mars_io->cfg->app_symbol) != 0) { MARS_IO_LOG_ERROR("unable to init marsio instance"); goto error_out; } mars_io->mr_dev = marsio_open_device(mars_io->mr_ins, mars_io->cfg->dev_symbol, mars_io->cfg->thread_num, mars_io->cfg->thread_num); if (mars_io->mr_dev == NULL) { MARS_IO_LOG_ERROR("unable to open marsio device"); goto error_out; } mars_io->mr_path = marsio_sendpath_create_by_vdev(mars_io->mr_dev); if (mars_io->mr_path == NULL) { MARS_IO_LOG_ERROR("unable to create marsio sendpath"); goto error_out; } for (uint64_t i = 0; i < mars_io->cfg->thread_num; i++) { mars_io->pool[i] = packet_pool_new(mars_io->cfg->capacity); if (mars_io->pool[i] == NULL) { MARS_IO_LOG_ERROR("unable to create packet pool"); goto error_out; } mars_io->ip_reass[i] = ip_reassembly_new(mars_io->cfg->timeout_ms, mars_io->cfg->frag_queue_num, mars_io->cfg->frag_queue_size); if (mars_io->ip_reass[i] == NULL) { MARS_IO_LOG_ERROR("unable to create ip reassembly"); goto error_out; } } return mars_io; error_out: mars_io_free(mars_io); return NULL; } void mars_io_free(void *handle) { struct mars_io *mars_io = (struct mars_io *)handle; if (mars_io) { for (uint64_t i = 0; i < mars_io->cfg->thread_num; i++) { ip_reassembly_free(mars_io->ip_reass[i]); packet_pool_free(mars_io->pool[i]); } if (mars_io->mr_path) { marsio_sendpath_destory(mars_io->mr_path); mars_io->mr_path = NULL; } if (mars_io->mr_dev) { marsio_close_device(mars_io->mr_dev); mars_io->mr_dev = NULL; } if (mars_io->mr_ins) { marsio_destory(mars_io->mr_ins); mars_io->mr_ins = NULL; } mars_io_cfg_free(mars_io->cfg); free(mars_io); mars_io = NULL; } } int mars_io_isbreak(void *handle __attribute__((unused))) { return 0; } int mars_io_init(void *handle, uint16_t thr_idx __attribute__((unused))) { struct mars_io *mars_io = (struct mars_io *)handle; if (marsio_thread_init(mars_io->mr_ins) != 0) { MARS_IO_LOG_ERROR("unable to init marsio thread"); return -1; } else { return 0; } } int mars_io_recv(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { struct packet *pkt = NULL; marsio_buff_t *mbuff = NULL; marsio_buff_t *mbuffs[RX_BURST_MAX]; struct mars_io *mars_io = (struct mars_io *)handle; int ret = 0; int nr_recv = marsio_recv_burst(mars_io->mr_dev, thr_idx, mbuffs, MIN(RX_BURST_MAX, nr_pkts)); for (int i = 0; i < nr_recv; i++) { mbuff = mbuffs[i]; pkt = recv_packet(mars_io, mbuff, thr_idx); if (pkt) { pkts[ret++] = pkt; } } return ret; } void mars_io_send(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { struct packet *frag = NULL; struct packet *pkt = NULL; struct mars_io *mars_io = (struct mars_io *)handle; for (int i = 0; i < nr_pkts; i++) { pkt = pkts[i]; if (packet_is_defraged(pkt)) { while ((frag = packet_pop_frag(pkt))) { send_packet(mars_io, frag, thr_idx); } packet_free(pkt); } else { send_packet(mars_io, pkt, thr_idx); } pkts[i] = NULL; } } void mars_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts[], int nr_pkts) { struct packet *pkt = NULL; struct packet *frag = NULL; struct mars_io *mars_io = (struct mars_io *)handle; for (int i = 0; i < nr_pkts; i++) { pkt = pkts[i]; if (packet_is_defraged(pkt)) { while ((frag = packet_pop_frag(pkt))) { drop_packet(mars_io, frag, thr_idx); } packet_free(pkt); } else { drop_packet(mars_io, pkt, thr_idx); } pkts[i] = NULL; } } void mars_io_yield(void *handle, uint16_t thr_idx) { struct mars_io *mars_io = (struct mars_io *)handle; struct mr_vdev *vdevs[1] = { mars_io->mr_dev, }; marsio_poll_wait(mars_io->mr_ins, vdevs, 1, thr_idx, mars_io->cfg->idle_yield_ms); } void mars_io_polling(void *handle, uint16_t thr_idx) { struct mars_io *mars_io = (struct mars_io *)handle; struct ip_reassembly *ip_reass = mars_io->ip_reass[thr_idx]; struct packet *pkt = NULL; uint64_t now_ms = clock_get_real_time_ms(); while ((pkt = ip_reassembly_clean(ip_reass, now_ms))) { if (mars_io->cfg->fail_action == 0) { send_packet(mars_io, pkt, thr_idx); } else { drop_packet(mars_io, pkt, thr_idx); } } // TODO // output stat } struct packet_io_stat *mars_io_stat(void *handle, uint16_t thr_idx) { struct mars_io *mars_io = (struct mars_io *)handle; return &mars_io->stat[thr_idx]; }