#include #include #include "stellar.h" #include "marsio.h" #include "packet_io.h" #include "packet_utils.h" #include "packet_io_marsio.h" struct packet_io_marsio { struct mr_instance *mr_ins; struct mr_vdev *mr_dev; struct mr_sendpath *mr_path; struct packet_io_stat stat; }; /****************************************************************************** * Private API ******************************************************************************/ static void metadata_to_packet(marsio_buff_t *mbuff, struct packet *pkt) { packet_set_io_ctx(pkt, mbuff); pkt->sid_used = marsio_buff_get_sid_list(mbuff, pkt->sid_list, MAX_SID_NUM); pkt->route_len = marsio_buff_get_metadata(mbuff, MR_BUFF_ROUTE_CTX, pkt->route_ctx, MAX_ROUTE_LEN); marsio_buff_get_metadata(mbuff, MR_BUFF_SESSION_ID, &(pkt->session_id), sizeof(pkt->session_id)); marsio_buff_get_metadata(mbuff, MR_BUFF_DIR, &(pkt->direction), sizeof(pkt->direction)); packet_set_type(pkt, marsio_buff_is_ctrlbuf(mbuff) ? PACKET_TYPE_CTRL : PACKET_TYPE_DATA); packet_set_action(pkt, PACKET_ACTION_FORWARD); } static void metadata_to_mbuff(marsio_buff_t *mbuff, struct packet *pkt) { marsio_buff_set_sid_list(mbuff, pkt->sid_list, pkt->sid_used); marsio_buff_set_metadata(mbuff, MR_BUFF_ROUTE_CTX, pkt->route_ctx, pkt->route_len); marsio_buff_set_metadata(mbuff, MR_BUFF_SESSION_ID, &(pkt->session_id), sizeof(pkt->session_id)); marsio_buff_set_metadata(mbuff, MR_BUFF_DIR, &(pkt->direction), sizeof(pkt->direction)); if (packet_get_type(pkt) == PACKET_TYPE_CTRL) { marsio_buff_set_ctrlbuf(mbuff); } } static int is_keepalive_packet(const char *data, int len) { if (data == NULL || len < (int)(sizeof(struct ethhdr))) { return 0; } struct ethhdr *eth_hdr = (struct ethhdr *)data; if (eth_hdr->h_proto == 0xAAAA) { return 1; } else { return 0; } } /****************************************************************************** * Public API ******************************************************************************/ struct packet_io_marsio *packet_io_marsio_new(struct packet_io_marsio_confg *config) { int opt = 1; cpu_set_t coremask; CPU_ZERO(&coremask); for (uint8_t i = 0; i < config->nr_threads; i++) { CPU_SET(config->cpu_mask[i], &coremask); } struct packet_io_marsio *handle = (struct packet_io_marsio *)calloc(1, sizeof(struct packet_io_marsio)); if (handle == NULL) { PACKET_IO_LOG_ERROR("unable to allocate memory for packet_io_marsio"); return NULL; } handle->mr_ins = marsio_create(); if (handle->mr_ins == NULL) { PACKET_IO_LOG_ERROR("unable to create marsio instance"); goto error_out; } marsio_option_set(handle->mr_ins, MARSIO_OPT_THREAD_MASK_IN_CPUSET, &coremask, sizeof(coremask)); marsio_option_set(handle->mr_ins, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)); if (marsio_init(handle->mr_ins, config->app_symbol) != 0) { PACKET_IO_LOG_ERROR("unable to init marsio instance"); goto error_out; } handle->mr_dev = marsio_open_device(handle->mr_ins, config->dev_symbol, config->nr_threads, config->nr_threads); if (handle->mr_dev == NULL) { PACKET_IO_LOG_ERROR("unable to open marsio device"); goto error_out; } handle->mr_path = marsio_sendpath_create_by_vdev(handle->mr_dev); if (handle->mr_path == NULL) { PACKET_IO_LOG_ERROR("unable to create marsio sendpath"); goto error_out; } return handle; error_out: packet_io_marsio_free(handle); return NULL; } void packet_io_marsio_free(struct packet_io_marsio *handle) { if (handle) { if (handle->mr_path) { marsio_sendpath_destory(handle->mr_path); handle->mr_path = NULL; } if (handle->mr_dev) { marsio_close_device(handle->mr_dev); handle->mr_dev = NULL; } if (handle->mr_ins) { marsio_destory(handle->mr_ins); handle->mr_ins = NULL; } free(handle); handle = NULL; } } struct packet_io_stat *packet_io_marsio_stat(struct packet_io_marsio *handle) { return &handle->stat; } int packet_io_marsio_init(struct packet_io_marsio *handle, uint16_t thread_id) { if (marsio_thread_init(handle->mr_ins) != 0) { PACKET_IO_LOG_ERROR("unable to init marsio thread"); return -1; } return 0; } int packet_io_marsio_recv(struct packet_io_marsio *handle, uint16_t thread_id, struct packet **pkt) { marsio_buff_t *rx_buff; marsio_buff_t *rx_buffs[1]; thread_local struct packet thd_pkt; retry: if (marsio_recv_burst(handle->mr_dev, thread_id, rx_buffs, 1) <= 0) { *pkt = NULL; return -1; } rx_buff = rx_buffs[0]; char *data = marsio_buff_mtod(rx_buff); int len = marsio_buff_datalen(rx_buff); ATOMIC_ADD(&handle->stat.rx_pkts, 1); ATOMIC_ADD(&handle->stat.rx_bytes, len); if (is_keepalive_packet(data, len)) { ATOMIC_ADD(&handle->stat.keepalive_pkts, 1); ATOMIC_ADD(&handle->stat.keepalive_bytes, len); ATOMIC_ADD(&handle->stat.tx_pkts, 1); ATOMIC_ADD(&handle->stat.tx_bytes, len); marsio_send_burst(handle->mr_path, thread_id, rx_buffs, 1); goto retry; } metadata_to_packet(rx_buff, &thd_pkt); packet_parse(&thd_pkt, data, len); *pkt = &thd_pkt; return 0; } void packet_io_marsio_send(struct packet_io_marsio *handle, uint16_t thread_id, struct packet *pkt) { marsio_buff_t *tx_buff = (marsio_buff_t *)packet_get_io_ctx(pkt); if (packet_get_action(pkt) == PACKET_ACTION_DROP) { if (tx_buff) { ATOMIC_ADD(&handle->stat.drop_pkts, 1); ATOMIC_ADD(&handle->stat.drop_bytes, packet_get_len(pkt)); marsio_buff_free(handle->mr_ins, &tx_buff, 1, 0, thread_id); } else { // do nothing } } else { if (tx_buff == NULL) { if (marsio_buff_malloc_global(handle->mr_ins, &tx_buff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0) { PACKET_IO_LOG_ERROR("unable to alloc tx buffer"); return; } ATOMIC_ADD(&handle->stat.inject_pkts, 1); ATOMIC_ADD(&handle->stat.inject_bytes, packet_get_len(pkt)); char *dst = marsio_buff_append(tx_buff, packet_get_len(pkt)); memcpy(dst, packet_get_data(pkt), packet_get_len(pkt)); } ATOMIC_ADD(&handle->stat.tx_pkts, 1); ATOMIC_ADD(&handle->stat.tx_bytes, packet_get_len(pkt)); metadata_to_mbuff(tx_buff, pkt); marsio_send_burst(handle->mr_path, thread_id, &tx_buff, 1); } packet_free(pkt); }