#include #include #include #include #include #include #include #include "log.h" #include "sce.h" #include "utils.h" #include "g_vxlan.h" #include "sf_metrics.h" #include "ctrl_packet.h" #include "global_metrics.h" /****************************************************************************** * struct ******************************************************************************/ #define RX_BURST_MAX 128 struct config { int bypass_all_traffic; int rx_burst_max; char app_symbol[256]; char dev_endpoint[256]; char dev_nf_interface[256]; char dev_endpoint_src_ip[16]; char dev_endpoint_src_mac[32]; }; struct device { struct mr_vdev *mr_dev; struct mr_sendpath *mr_path; }; struct packet_io { int thread_num; struct mr_instance *instance; struct device dev_nf_interface; struct device dev_endpoint; struct config config; }; /****************************************************************************** * metadata ******************************************************************************/ // return 0 : success // return -1 : error int mbuff_get_metadata(marsio_buff_t *rx_buff, struct metadata *meta) { memset(meta, 0, sizeof(struct metadata)); meta->raw_len = marsio_buff_datalen(rx_buff); meta->raw_data = marsio_buff_mtod(rx_buff); if (meta->raw_data == NULL || meta->raw_len == 0) { LOG_ERROR("%s: unable to get raw_data from metadata", LOG_TAG_PKTIO); return -1; } if (marsio_buff_get_metadata(rx_buff, MR_BUFF_SESSION_ID, &(meta->session_id), sizeof(meta->session_id)) <= 0) { LOG_ERROR("%s: unable to get session_id from metadata", LOG_TAG_PKTIO); return -1; } // 1: E2I // 0: I2E if (marsio_buff_get_metadata(rx_buff, MR_BUFF_DIR, &(meta->is_e2i_dir), sizeof(meta->is_e2i_dir)) <= 0) { LOG_ERROR("%s: unable to get buff_dir from metadata", LOG_TAG_PKTIO); return -1; } if (marsio_buff_is_ctrlbuf(rx_buff)) { meta->is_ctrl_pkt = 1; if (marsio_buff_get_metadata(rx_buff, MR_BUFF_PAYLOAD_OFFSET, &(meta->l7offset), sizeof(meta->l7offset)) <= 0) { LOG_ERROR("%s: unable to get l7offset from metadata", LOG_TAG_PKTIO); return -1; } } else { meta->is_ctrl_pkt = 0; #if 0 if (marsio_buff_get_metadata(rx_buff, MR_IS_DECRYPTED, &(meta->is_decrypted), sizeof(meta->is_decrypted)) <= 0) { LOG_ERROR("%s: unable to get is_decrypted from metadata", LOG_TAG_PKTIO); return -1; } #endif } meta->sids.num = marsio_buff_get_sid_list(rx_buff, meta->sids.elems, sizeof(meta->sids.elems) / sizeof(meta->sids.elems[0])); if (meta->sids.num < 0) { LOG_ERROR("%s: unable to get sid_list from metadata", LOG_TAG_PKTIO); return -1; } meta->route_ctx.len = marsio_buff_get_metadata(rx_buff, MR_BUFF_ROUTE_CTX, meta->route_ctx.data, sizeof(meta->route_ctx.data)); if (meta->route_ctx.len <= 0) { LOG_ERROR("%s: unable to get route_ctx from metadata", LOG_TAG_PKTIO); return -1; } return 0; } // return 0 : success // return -1 : error int mbuff_set_metadata(marsio_buff_t *tx_buff, struct metadata *meta) { if (meta->session_id) { if (marsio_buff_set_metadata(tx_buff, MR_BUFF_SESSION_ID, &(meta->session_id), sizeof(meta->session_id)) != 0) { LOG_ERROR("%s: unable to set session_id for metadata", LOG_TAG_PKTIO); return -1; } } // need't set MR_BUFF_DIR, set MR_BUFF_ROUTE_CTX instead if (meta->is_ctrl_pkt) { marsio_buff_set_ctrlbuf(tx_buff); if (marsio_buff_set_metadata(tx_buff, MR_BUFF_PAYLOAD_OFFSET, &(meta->l7offset), sizeof(meta->l7offset)) != 0) { LOG_ERROR("%s: unable to set l7offset for metadata", LOG_TAG_PKTIO); return -1; } } else { // TODO #if 0 if (marsio_buff_set_metadata(tx_buff, MR_IS_DECRYPTED, &(meta->is_decrypted), sizeof(meta->is_decrypted)) != 0) { LOG_ERROR("%s: unable to set is_decrypted for metadata", LOG_TAG_PKTIO); return -1; } #endif } if (meta->sids.num > 0) { if (marsio_buff_set_sid_list(tx_buff, meta->sids.elems, meta->sids.num) != 0) { LOG_ERROR("%s: unable to set sid_list for metadata", LOG_TAG_PKTIO); return -1; } } if (meta->route_ctx.len > 0) { if (marsio_buff_set_metadata(tx_buff, MR_BUFF_ROUTE_CTX, meta->route_ctx.data, meta->route_ctx.len) != 0) { LOG_ERROR("%s: unable to set route_ctx for metadata", LOG_TAG_PKTIO); return -1; } } return 0; } // return 0 : not keepalive packet // return 1 : is keepalive packet static int is_downlink_keepalive_packet(marsio_buff_t *rx_buff) { int raw_len = marsio_buff_datalen(rx_buff); char *raw_data = marsio_buff_mtod(rx_buff); if (raw_data == NULL || raw_len < (int)(sizeof(struct ethhdr))) { return 0; } struct ethhdr *eth_hdr = (struct ethhdr *)raw_data; if (eth_hdr->h_proto == 0xAAAA) { return 1; } else { return 0; } } // return 0 : not keepalive packet // return 1 : is keepalive packet static int is_uplink_keepalive_packet(marsio_buff_t *rx_buff) { int raw_len = marsio_buff_datalen(rx_buff); char *raw_data = marsio_buff_mtod(rx_buff); if (raw_data == NULL || raw_len < (int)(sizeof(struct ethhdr) + sizeof(struct ip) + sizeof(struct udp_hdr))) { return 0; } struct ethhdr *eth_hdr = (struct ethhdr *)raw_data; if (eth_hdr->h_proto != htons(ETH_P_IP)) { return 0; } struct ip *ip_hdr = (struct ip *)((char *)eth_hdr + sizeof(struct ethhdr)); if (ip_hdr->ip_p != IPPROTO_UDP) { return 0; } struct udp_hdr *udp_hdr = (struct udp_hdr *)((char *)ip_hdr + sizeof(struct ip)); if (udp_hdr->uh_dport != htons(3784)) { return 0; } return 1; } /****************************************************************************** * search session ctx ******************************************************************************/ // return !NULL // return NULL static struct session_ctx *raw_packet_search_session(struct session_table *table, const char *raw_data, int raw_len, uint64_t session_id) { struct addr_tuple4 inner_addr; struct addr_tuple4 reverse_addr; struct raw_pkt_parser raw_parser; memset(&inner_addr, 0, sizeof(struct addr_tuple4)); memset(&reverse_addr, 0, sizeof(struct addr_tuple4)); raw_packet_parser_init(&raw_parser, 0, LAYER_TYPE_ALL, 8); raw_packet_parser_parse(&raw_parser, (const void *)raw_data, raw_len); raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_addr); addr_tuple4_reverse(&inner_addr, &reverse_addr); struct session_node *node = session_table_search_by_id(table, session_id); if (node == NULL) { return NULL; } struct session_ctx *session_ctx = (struct session_ctx *)node->value; if (memcmp(&session_ctx->inner_tuple4, &inner_addr, sizeof(struct addr_tuple4)) != 0 && memcmp(&session_ctx->inner_tuple4, &reverse_addr, sizeof(struct addr_tuple4)) != 0) { char *addr_str = addr_tuple4_to_str(&inner_addr); LOG_ERROR("%s: unexpected raw packet, session %lu expected address tuple4 is %s, but current packet's address tuple4 is %s, bypass !!!", LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr, addr_str); free(addr_str); return NULL; } return session_ctx; } // return !NULL // return NULL static struct session_ctx *inject_packet_search_session(struct session_table *table, const char *raw_data, int raw_len) { struct addr_tuple4 inner_addr; struct raw_pkt_parser raw_parser; memset(&inner_addr, 0, sizeof(struct addr_tuple4)); raw_packet_parser_init(&raw_parser, 0, LAYER_TYPE_ALL, 8); raw_packet_parser_parse(&raw_parser, (const void *)raw_data, raw_len); raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_addr); struct session_node *node = session_table_search_by_addr(table, &inner_addr); if (node == NULL) { char *addr_str = addr_tuple4_to_str(&inner_addr); LOG_ERROR("%s: unexpected inject packet, unable to find session %s from session table, drop !!!", LOG_TAG_PKTIO, addr_str); free(addr_str); return NULL; } return (struct session_ctx *)node->value; } /****************************************************************************** * action bypass/block/forward ******************************************************************************/ static void vxlan_encapsulate(char *buffer, const char *src_mac_str, const char *dst_mac_str, const char *src_ip_str, const char *dst_ip_str, int payload_len, int is_e2i, int is_decrypted, int sf_index) { struct ethhdr *eth_hdr = (struct ethhdr *)buffer; struct ip *ip_hdr = (struct ip *)((char *)eth_hdr + sizeof(struct ethhdr)); struct udp_hdr *udp_hdr = (struct udp_hdr *)((char *)ip_hdr + sizeof(struct ip)); struct g_vxlan *g_vxlan_hdr = (struct g_vxlan *)((char *)udp_hdr + sizeof(struct udp_hdr)); memset(g_vxlan_hdr, 0, sizeof(struct g_vxlan)); g_vxlan_set_packet_dir(g_vxlan_hdr, is_e2i); g_vxlan_set_sf_index(g_vxlan_hdr, sf_index); g_vxlan_set_traffic_type(g_vxlan_hdr, is_decrypted); build_ether_header(eth_hdr, ETH_P_IP, src_mac_str, dst_mac_str); build_ip_header(ip_hdr, IPPROTO_UDP, src_ip_str, dst_ip_str, sizeof(struct udp_hdr) + sizeof(struct g_vxlan) + payload_len); build_udp_header((const char *)&ip_hdr->ip_src, 8, udp_hdr, rand() % (65535 - 49152) + 49152, 4789, sizeof(struct g_vxlan) + payload_len); } static int send_packet_to_sf(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx) { struct packet_io *packet_io = thread_ctx->ref_io; int thread_index = thread_ctx->thread_index; const char *src_mac_str = packet_io->config.dev_endpoint_src_mac; const char *dst_mac_str = sf->sf_dst_mac; const char *src_ip_str = packet_io->config.dev_endpoint_src_ip; const char *dst_ip_str = sf->sf_dst_ip; int payload_len = meta->raw_len; int is_e2i = meta->is_e2i_dir; int is_decrypted = meta->is_decrypted; int sf_index = sf->sf_index; int prepend_len = 0; char *buffer = NULL; marsio_buff_ctrlzone_reset(rx_buff); switch (sf->sf_connectivity.method) { case PACKAGE_METHOD_VXLAN_G: prepend_len = sizeof(struct ethhdr) + sizeof(struct ip) + sizeof(struct udp_hdr) + sizeof(struct g_vxlan); buffer = marsio_buff_prepend(rx_buff, prepend_len); vxlan_encapsulate(buffer, src_mac_str, dst_mac_str, src_ip_str, dst_ip_str, payload_len, is_e2i, is_decrypted, sf_index); break; case PACKAGE_METHOD_LAYER2_SWITCH: // TODO break; case PACKAGE_METHOD_LAYER3_SWITCH: // TODO break; default: break; } int nsend = marsio_buff_datalen(rx_buff); marsio_send_burst(packet_io->dev_endpoint.mr_path, thread_index, &rx_buff, 1); return nsend; } static int action_nf_inject(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx); static void action_err_bypass(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; int nsend = action_nf_inject(rx_buff, meta, sf, thread_ctx); if (nsend > 0) { throughput_metrics_inc(&(g_metrics->raw_pkt.error_bypass), 1, nsend); } } static void action_err_block(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; struct packet_io *packet_io = thread_ctx->ref_io; int thread_index = thread_ctx->thread_index; int raw_len = marsio_buff_datalen(rx_buff); throughput_metrics_inc(&(g_metrics->raw_pkt.error_block), 1, raw_len); marsio_buff_free(packet_io->instance, &rx_buff, 1, 0, thread_index); } // return nsend static int action_nf_inject(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; struct packet_io *packet_io = thread_ctx->ref_io; int thread_index = thread_ctx->thread_index; marsio_buff_ctrlzone_reset(rx_buff); if (mbuff_set_metadata(rx_buff, meta) != 0) { action_err_block(rx_buff, meta, sf, thread_ctx); return 0; } int raw_len = marsio_buff_datalen(rx_buff); marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread_index, &rx_buff, 1); throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, raw_len); return raw_len; } static void action_mirr_bypass(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; int raw_len = marsio_buff_datalen(rx_buff); throughput_metrics_inc(&(g_metrics->raw_pkt.mirr_bypass), 1, raw_len); } static void action_mirr_block(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; int raw_len = marsio_buff_datalen(rx_buff); throughput_metrics_inc(&(g_metrics->raw_pkt.mirr_block), 1, raw_len); } static void action_mirr_forward(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; struct packet_io *packet_io = thread_ctx->ref_io; int thread_index = thread_ctx->thread_index; int raw_len = marsio_buff_datalen(rx_buff); char *raw_data = marsio_buff_mtod(rx_buff); marsio_buff_t *new_buff = NULL; if (marsio_buff_malloc_global(packet_io->instance, &new_buff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0) { LOG_ERROR("%s: unable to malloc buff on marsio instance, thread_index: %d", LOG_TAG_PKTIO, thread_index); return; } char *copy_ptr = marsio_buff_append(new_buff, raw_len); memcpy(copy_ptr, raw_data, raw_len); int nsend = send_packet_to_sf(new_buff, meta, sf, thread_ctx); throughput_metrics_inc(&(g_metrics->device.endpoint_tx), 1, nsend); throughput_metrics_inc(&(g_metrics->raw_pkt.mirr_tx), 1, raw_len); throughput_metrics_inc(&sf->tx, 1, nsend); sf_metrics_inc(thread_ctx->sf_metrics, sf->policy_id, sf->sff_profile_id, sf->sf_profile_id, 0, 0, 1, nsend); } static void action_stee_bypass(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; int raw_len = marsio_buff_datalen(rx_buff); throughput_metrics_inc(&(g_metrics->raw_pkt.stee_bypass), 1, raw_len); } static void action_stee_block(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; struct packet_io *packet_io = thread_ctx->ref_io; int thread_index = thread_ctx->thread_index; int raw_len = marsio_buff_datalen(rx_buff); throughput_metrics_inc(&(g_metrics->raw_pkt.stee_block), 1, raw_len); marsio_buff_free(packet_io->instance, &rx_buff, 1, 0, thread_index); } static void action_stee_forward(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; int raw_len = marsio_buff_datalen(rx_buff); int nsend = send_packet_to_sf(rx_buff, meta, sf, thread_ctx); throughput_metrics_inc(&(g_metrics->device.endpoint_tx), 1, nsend); throughput_metrics_inc(&(g_metrics->raw_pkt.stee_tx), 1, raw_len); throughput_metrics_inc(&sf->tx, 1, nsend); sf_metrics_inc(thread_ctx->sf_metrics, sf->policy_id, sf->sff_profile_id, sf->sf_profile_id, 0, 0, 1, nsend); } static void action_sf_chaining(struct thread_ctx *thread_ctx, struct session_ctx *session_ctx, struct selected_chaining *chaining, marsio_buff_t *rx_buff, struct metadata *meta, int next_sf_index) { int sf_index; for (sf_index = next_sf_index; sf_index < chaining->chaining_used; sf_index++) { struct selected_sf *sf = &(chaining->chaining[sf_index]); LOG_INFO("%s: session: %lu %s execute chaining [%d/%d] policy_id: %d, sff_profile_id: %d, sf_profile_id: %d, sf_need_skip: %d, sf_action_reason: %s, is_e2i: %d, is_decrypted: %d", LOG_TAG_POLICY, session_ctx->session_id, session_ctx->session_addr, sf_index, chaining->chaining_used, sf->policy_id, sf->sff_profile_id, sf->sf_profile_id, sf->sf_need_skip, action_reason_to_string(sf->sf_action_reason), meta->is_e2i_dir, meta->is_decrypted); if (sf->sf_need_skip) { continue; } switch (sf->sf_action) { case SESSION_ACTION_BYPASS: if (sf->sff_forward_type == FORWARD_TYPE_STEERING) { action_stee_bypass(rx_buff, meta, sf, thread_ctx); continue; } else { action_mirr_bypass(rx_buff, meta, sf, thread_ctx); continue; } case SESSION_ACTION_BLOCK: if (sf->sff_forward_type == FORWARD_TYPE_STEERING) { action_stee_block(rx_buff, meta, sf, thread_ctx); return; } else { action_mirr_block(rx_buff, meta, sf, thread_ctx); return; } case SESSION_ACTION_FORWARD: if (sf->sf_connectivity.method != PACKAGE_METHOD_VXLAN_G) { LOG_ERROR("%s: processing packets, session %lu %s requires encapsulation format not supported, bypass !!!", LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr); action_err_bypass(rx_buff, meta, sf, thread_ctx); return; } if (sf->sff_forward_type == FORWARD_TYPE_STEERING) { action_stee_forward(rx_buff, meta, sf, thread_ctx); return; } else { action_mirr_forward(rx_buff, meta, sf, thread_ctx); continue; } } } if (sf_index == chaining->chaining_used) { action_nf_inject(rx_buff, meta, NULL, thread_ctx); } } /****************************************************************************** * handle session status ******************************************************************************/ static int send_event_log(struct session_ctx *session_ctx, struct selected_chaining *chaining, struct thread_ctx *thread_ctx) { struct sce_ctx *sce_ctx = thread_ctx->ref_sce_ctx; struct packet_io *packet_io = thread_ctx->ref_io; int thread_index = thread_ctx->thread_index; char buffer[32] = {0}; sprintf(buffer, "%lu", session_ctx->session_id); cJSON *root = cJSON_CreateObject(); cJSON_AddStringToObject(root, "tsync", "1.0"); cJSON_AddStringToObject(root, "session_id", buffer); cJSON_AddStringToObject(root, "state", "closing"); cJSON_AddStringToObject(root, "method", "log_update"); cJSON *sf_profile_ids = cJSON_CreateArray(); for (int i = 0; i < chaining->chaining_used; i++) { struct selected_sf *sf = &(chaining->chaining[i]); if (sf->sf_need_skip == 0 && sf->sf_action == SESSION_ACTION_FORWARD) { cJSON *id = cJSON_CreateNumber(sf->sf_profile_id); cJSON_AddItemToArray(sf_profile_ids, id); } } cJSON *params = cJSON_CreateObject(); cJSON_AddItemToObject(params, "sf_profile_ids", sf_profile_ids); cJSON_AddItemToObject(root, "params", params); char *json_str = cJSON_PrintUnformatted(root); LOG_INFO("%s: session %lu %s event log: %s", LOG_TAG_METRICS, session_ctx->session_id, session_ctx->session_addr, json_str); marsio_buff_t *tx_buffs[1]; char *raw_packet_header_data = session_ctx->ctrl_meta->raw_data; int raw_packet_header_len = session_ctx->ctrl_meta->l7offset; marsio_buff_malloc_global(packet_io->instance, tx_buffs, 1, 0, thread_index); char *dst = marsio_buff_append(tx_buffs[0], raw_packet_header_len + strlen(json_str)); memcpy(dst, raw_packet_header_data, raw_packet_header_len); memcpy(dst + raw_packet_header_len, json_str, strlen(json_str)); struct metadata meta = {0}; meta.session_id = session_ctx->session_id; meta.l7offset = raw_packet_header_len; meta.is_ctrl_pkt = 1; meta.sids.num = 1; meta.sids.elems[0] = sce_ctx->firewall_sids; route_ctx_copy(&meta.route_ctx, &session_ctx->ctrl_meta->route_ctx); mbuff_set_metadata(tx_buffs[0], &meta); int nsend = marsio_buff_datalen(tx_buffs[0]); marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread_index, tx_buffs, 1); free(json_str); cJSON_Delete(root); return nsend; } static void dump_event_log(struct session_ctx *session_ctx, struct selected_chaining *chaining, const char *tag) { if (chaining == NULL) { return; } for (int i = 0; i < chaining->chaining_used; i++) { struct selected_sf *sf = &(chaining->chaining[i]); LOG_INFO("%s: session %lu %s %s metrics log: policy %d sff_profile_id %d sf_profile_id %d sf_need_skip %d sf_action_reason %s rx_pkts %lu rx_bytes %lu tx_pkts %lu tx_bytes %lu", LOG_TAG_METRICS, session_ctx->session_id, session_ctx->session_addr, tag, sf->policy_id, sf->sff_profile_id, sf->sf_profile_id, sf->sf_need_skip, action_reason_to_string(sf->sf_action_reason), sf->rx.n_pkts, sf->rx.n_bytes, sf->tx.n_pkts, sf->tx.n_bytes); } } static void session_value_free_cb(void *ctx) { struct session_ctx *s_ctx = (struct session_ctx *)ctx; session_ctx_free(s_ctx); } static void handle_policy_mutil_hits(struct policy_enforcer *enforcer, struct session_ctx *session_ctx, struct ctrl_pkt_parser *ctrl_parser, raw_pkt_parser *raw_parser, int is_e2i_dir) { for (int i = 0; i < ctrl_parser->policy_id_num; i++) { int policy_id = ctrl_parser->policy_ids[i]; if (fixed_num_array_exist_elem(&session_ctx->policy_ids, policy_id)) { continue; } else { policy_enforce_select_chainings(enforcer, &session_ctx->chainings, session_ctx, raw_parser, policy_id, is_e2i_dir); selected_chaining_bref(session_ctx->chainings.chaining_raw); selected_chaining_bref(session_ctx->chainings.chaining_decrypted); fixed_num_array_add_elem(&session_ctx->policy_ids, policy_id); } } } static void handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser *ctrl_parser, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; struct policy_enforcer *enforcer = thread_ctx->ref_enforcer; struct session_table *session_table = thread_ctx->session_table; int chaining_size = policy_enforce_chaining_size(enforcer); #if 0 if (session_table_search_by_id(session_table, meta->session_id)) { return ; } #endif struct raw_pkt_parser raw_parser; struct addr_tuple4 inner_tuple4; raw_packet_parser_init(&raw_parser, meta->session_id, LAYER_TYPE_ALL, 8); const void *payload = raw_packet_parser_parse(&raw_parser, (const void *)meta->raw_data, meta->raw_len); raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_tuple4); uint16_t real_offset = (char *)payload - meta->raw_data; if (real_offset != meta->l7offset) { char *addr_str = addr_tuple4_to_str(&inner_tuple4); LOG_ERROR("%s: incorrect dataoffset %d in the control zone of session %lu %s, the expect value is %d", LOG_TAG_PKTIO, meta->l7offset, meta->session_id, addr_str, real_offset); free(addr_str); } struct session_ctx *session_ctx = session_ctx_new(); session_ctx->session_id = meta->session_id; session_ctx->session_addr = addr_tuple4_to_str(&inner_tuple4); addr_tuple4_copy(&session_ctx->inner_tuple4, &inner_tuple4); metadata_deep_copy(session_ctx->ctrl_meta, meta); session_ctx->chainings.chaining_raw = selected_chaining_create(chaining_size, session_ctx->session_id, session_ctx->session_addr); session_ctx->chainings.chaining_decrypted = selected_chaining_create(chaining_size, session_ctx->session_id, session_ctx->session_addr); session_ctx->ref_thread_ctx = thread_ctx; LOG_INFO("%s: session %lu %s active first", LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr); handle_policy_mutil_hits(enforcer, session_ctx, ctrl_parser, &raw_parser, meta->is_e2i_dir); session_table_insert(session_table, session_ctx->session_id, &session_ctx->inner_tuple4, session_ctx, session_value_free_cb); ATOMIC_INC(&(g_metrics->sf_session.num)); } static void handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser *ctrl_parser, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; struct session_table *session_table = thread_ctx->session_table; struct sce_ctx *sce_ctx = thread_ctx->ref_sce_ctx; int nsend = 0; struct session_node *node = session_table_search_by_id(session_table, meta->session_id); if (node) { struct session_ctx *s_ctx = (struct session_ctx *)node->value; LOG_INFO("%s: session %lu %s closing", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->session_addr); struct selected_chaining *chaining_raw = s_ctx->chainings.chaining_raw; dump_event_log(s_ctx, chaining_raw, "raw_traffic"); if (chaining_raw->chaining_used && sce_ctx->enable_send_log) { nsend = send_event_log(s_ctx, chaining_raw, thread_ctx); ATOMIC_INC(&(g_metrics->sf_session.log)); throughput_metrics_inc(&(g_metrics->ctrl_pkt.tx), 1, nsend); throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, nsend); } struct selected_chaining *chaining_decrypted = s_ctx->chainings.chaining_decrypted; dump_event_log(s_ctx, chaining_decrypted, "decrypted_traffic"); if (chaining_decrypted->chaining_used && sce_ctx->enable_send_log) { nsend = send_event_log(s_ctx, chaining_decrypted, thread_ctx); ATOMIC_INC(&(g_metrics->sf_session.log)); throughput_metrics_inc(&(g_metrics->ctrl_pkt.tx), 1, nsend); throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, nsend); } session_table_delete_by_id(session_table, meta->session_id); ATOMIC_DEC(&(g_metrics->sf_session.num)); } } static void handle_session_active(struct metadata *meta, struct ctrl_pkt_parser *ctrl_parser, struct thread_ctx *thread_ctx) { struct session_table *session_table = thread_ctx->session_table; struct policy_enforcer *enforcer = thread_ctx->ref_enforcer; struct session_node *node = session_table_search_by_id(session_table, meta->session_id); if (node) { struct session_ctx *session_ctx = (struct session_ctx *)node->value; struct raw_pkt_parser raw_parser; raw_packet_parser_init(&raw_parser, meta->session_id, LAYER_TYPE_ALL, 8); const void *payload = raw_packet_parser_parse(&raw_parser, (const void *)meta->raw_data, meta->raw_len); uint16_t real_offset = (char *)payload - meta->raw_data; if (real_offset != meta->l7offset) { LOG_ERROR("%s: incorrect dataoffset %d in the control zone of session %lu %s, the expect value is %d", LOG_TAG_PKTIO, meta->l7offset, meta->session_id, session_ctx->session_addr, real_offset); } LOG_INFO("%s: session %lu %s active again", LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr); handle_policy_mutil_hits(enforcer, session_ctx, ctrl_parser, &raw_parser, meta->is_e2i_dir); } else { handle_session_opening(meta, ctrl_parser, thread_ctx); } } static void handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser *ctrl_parser, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; struct sce_ctx *sce_ctx = thread_ctx->ref_sce_ctx; LOG_ERROR("%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id); ATOMIC_ZERO(&(g_metrics->sf_session.num)); for (int i = 0; i < sce_ctx->nr_worker_threads; i++) { struct thread_ctx *temp_ctx = &sce_ctx->work_threads[i]; ATOMIC_INC(&temp_ctx->session_table_need_reset); } } /****************************************************************************** * handle control/raw/inject packet ******************************************************************************/ static void handle_control_packet(marsio_buff_t *rx_buff, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; struct metadata meta; struct ctrl_pkt_parser ctrl_parser; if (mbuff_get_metadata(rx_buff, &meta) == -1) { LOG_ERROR("%s: unexpected control packet, unable to get metadata", LOG_TAG_PKTIO); goto error_ctrl_pkt; } ctrl_packet_parser_init(&ctrl_parser); if (ctrl_packet_parser_parse(&ctrl_parser, meta.raw_data + meta.l7offset, meta.raw_len - meta.l7offset) == -1) { LOG_ERROR("%s: unexpected control packet, unable to parse data", LOG_TAG_PKTIO); goto error_ctrl_pkt; } if (ctrl_parser.session_id != meta.session_id) { LOG_ERROR("%s: unexpected control packet, metadata's session %lu != control packet's session %lu", LOG_TAG_PKTIO, meta.session_id, ctrl_parser.session_id); goto error_ctrl_pkt; } switch (ctrl_parser.state) { case SESSION_STATE_OPENING: ATOMIC_INC(&(g_metrics->ctrl_pkt.opening)); // when session opening, firewall not send policy id // return handle_session_opening(&meta, &ctrl_parser, ctx); break; case SESSION_STATE_CLOSING: ATOMIC_INC(&(g_metrics->ctrl_pkt.closing)); handle_session_closing(&meta, &ctrl_parser, thread_ctx); break; case SESSION_STATE_ACTIVE: ATOMIC_INC(&(g_metrics->ctrl_pkt.active)); handle_session_active(&meta, &ctrl_parser, thread_ctx); break; case SESSION_STATE_RESETALL: ATOMIC_INC(&(g_metrics->ctrl_pkt.resetall)); handle_session_resetall(&meta, &ctrl_parser, thread_ctx); break; default: goto error_ctrl_pkt; } return; error_ctrl_pkt: ATOMIC_INC(&(g_metrics->ctrl_pkt.error)); return; } static void handle_raw_packet(marsio_buff_t *rx_buff, struct thread_ctx *thread_ctx) { struct session_table *session_table = thread_ctx->session_table; struct global_metrics *g_metrics = thread_ctx->ref_metrics; struct metadata meta; struct session_ctx *session_ctx = NULL; struct selected_chaining *chaining = NULL; if (mbuff_get_metadata(rx_buff, &meta) == -1) { LOG_ERROR("%s: unexpected raw packet, unable to get metadata, bypass !!!", LOG_TAG_PKTIO); goto error_bypass; } session_ctx = raw_packet_search_session(session_table, meta.raw_data, meta.raw_len, meta.session_id); if (session_ctx == NULL) { throughput_metrics_inc(&(g_metrics->raw_pkt.miss_sess), 1, meta.raw_len); goto error_bypass; } if (meta.is_e2i_dir) { if (metadata_is_empty(session_ctx->raw_meta_e2i)) { metadata_deep_copy(session_ctx->raw_meta_e2i, &meta); } } else { if (metadata_is_empty(session_ctx->raw_meta_i2e)) { metadata_deep_copy(session_ctx->raw_meta_i2e, &meta); } } if (meta.is_decrypted == 1) { chaining = session_ctx->chainings.chaining_decrypted; } else { chaining = session_ctx->chainings.chaining_raw; } if (chaining == NULL) { LOG_ERROR("%s: unexpected raw packet, session %lu %s misses policy, bypass !!!", LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr); goto error_bypass; } action_sf_chaining(thread_ctx, session_ctx, chaining, rx_buff, &meta, 0); return; error_bypass: action_err_bypass(rx_buff, &meta, NULL, thread_ctx); } static void handle_inject_packet(marsio_buff_t *rx_buff, struct thread_ctx *thread_ctx) { struct session_table *session_table = thread_ctx->session_table; struct global_metrics *g_metrics = thread_ctx->ref_metrics; struct metadata meta; struct g_vxlan *g_vxlan_hdr = NULL; struct session_ctx *session_ctx = NULL; struct selected_chaining *chaining = NULL; int sf_index = 0; int raw_len = marsio_buff_datalen(rx_buff); char *raw_data = marsio_buff_mtod(rx_buff); if (g_vxlan_decode(&g_vxlan_hdr, raw_data, raw_len) == -1) { goto error_block; } memset(&meta, 0, sizeof(struct metadata)); meta.raw_data = (char *)g_vxlan_hdr + sizeof(struct g_vxlan); meta.raw_len = raw_len - sizeof(struct ethhdr) - sizeof(struct ip) - sizeof(struct udp_hdr) - sizeof(struct g_vxlan); meta.l7offset = 0; meta.is_e2i_dir = g_vxlan_get_packet_dir(g_vxlan_hdr); meta.is_ctrl_pkt = 0; meta.is_decrypted = g_vxlan_get_traffic_type(g_vxlan_hdr); sf_index = g_vxlan_get_sf_index(g_vxlan_hdr); session_ctx = inject_packet_search_session(session_table, meta.raw_data, meta.raw_len); if (session_ctx == NULL) { goto error_block; } meta.session_id = session_ctx->session_id; if (meta.is_e2i_dir) { sids_copy(&meta.sids, &session_ctx->raw_meta_e2i->sids); route_ctx_copy(&meta.route_ctx, &session_ctx->raw_meta_e2i->route_ctx); } else { sids_copy(&meta.sids, &session_ctx->raw_meta_i2e->sids); route_ctx_copy(&meta.route_ctx, &session_ctx->raw_meta_i2e->route_ctx); } if (meta.is_decrypted == 1) { chaining = session_ctx->chainings.chaining_decrypted; } else { chaining = session_ctx->chainings.chaining_raw; } if (chaining == NULL || sf_index < 0 || sf_index >= chaining->chaining_used) { LOG_ERROR("%s: unexpected inject packet, session %lu %s misses chaining index, drop !!!", LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr); goto error_block; } if (chaining->chaining[sf_index].sff_forward_type == FORWARD_TYPE_MIRRORING) { LOG_DEBUG("%s: unexpected inject packet, session %lu %s with sf_profile_id %d executes mirror and does not require reflow, drop !!!", LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr, chaining->chaining[sf_index].sf_profile_id); throughput_metrics_inc(&(g_metrics->raw_pkt.mirr_rx_drop), 1, meta.raw_len); goto error_block; } else { struct selected_sf *sf = &(chaining->chaining[sf_index]); throughput_metrics_inc(&sf->rx, 1, raw_len); throughput_metrics_inc(&(g_metrics->raw_pkt.stee_rx), 1, meta.raw_len); sf_metrics_inc(thread_ctx->sf_metrics, sf->policy_id, sf->sff_profile_id, sf->sf_profile_id, 1, raw_len, 0, 0); } marsio_buff_adj(rx_buff, raw_len - meta.raw_len); action_sf_chaining(thread_ctx, session_ctx, chaining, rx_buff, &meta, sf_index + 1); return; error_block: throughput_metrics_inc(&(g_metrics->device.endpoint_drop), 1, raw_len); marsio_buff_adj(rx_buff, raw_len - meta.raw_len); action_err_block(rx_buff, &meta, NULL, thread_ctx); } /****************************************************************************** * packet io ******************************************************************************/ // return 0 : success // return -1 : error static int packet_io_config(const char *profile, struct config *config) { MESA_load_profile_int_def(profile, "PACKET_IO", "bypass_all_traffic", (int *)&(config->bypass_all_traffic), 0); MESA_load_profile_int_def(profile, "PACKET_IO", "rx_burst_max", (int *)&(config->rx_burst_max), 1); MESA_load_profile_string_nodef(profile, "PACKET_IO", "app_symbol", config->app_symbol, sizeof(config->app_symbol)); MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_endpoint", config->dev_endpoint, sizeof(config->dev_endpoint)); MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_nf_interface", config->dev_nf_interface, sizeof(config->dev_nf_interface)); MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_endpoint_src_ip", config->dev_endpoint_src_ip, sizeof(config->dev_endpoint_src_ip)); MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_endpoint_src_mac", config->dev_endpoint_src_mac, sizeof(config->dev_endpoint_src_mac)); if (config->rx_burst_max > RX_BURST_MAX) { LOG_ERROR("%s: invalid rx_burst_max, exceeds limit %d", LOG_TAG_PKTIO, RX_BURST_MAX); return -1; } if (strlen(config->app_symbol) == 0) { LOG_ERROR("%s: invalid app_symbol in %s", LOG_TAG_PKTIO, profile); return -1; } if (strlen(config->dev_endpoint) == 0) { LOG_ERROR("%s: invalid dev_endpoint in %s", LOG_TAG_PKTIO, profile); return -1; } if (strlen(config->dev_nf_interface) == 0) { LOG_ERROR("%s: invalid dev_nf_interface in %s", LOG_TAG_PKTIO, profile); return -1; } LOG_DEBUG("%s: PACKET_IO->bypass_all_traffic : %d", LOG_TAG_PKTIO, config->bypass_all_traffic); LOG_DEBUG("%s: PACKET_IO->rx_burst_max : %d", LOG_TAG_PKTIO, config->rx_burst_max); LOG_DEBUG("%s: PACKET_IO->app_symbol : %s", LOG_TAG_PKTIO, config->app_symbol); LOG_DEBUG("%s: PACKET_IO->dev_endpoint : %s", LOG_TAG_PKTIO, config->dev_endpoint); LOG_DEBUG("%s: PACKET_IO->dev_nf_interface : %s", LOG_TAG_PKTIO, config->dev_nf_interface); LOG_DEBUG("%s: PACKET_IO->dev_endpoint_src_ip : %s", LOG_TAG_PKTIO, config->dev_endpoint_src_ip); if (strlen(config->dev_endpoint_src_mac)) { LOG_DEBUG("%s: PACKET_IO->dev_endpoint_src_mac : %s (get from configuration file)", LOG_TAG_PKTIO, config->dev_endpoint_src_mac); } return 0; } struct packet_io *packet_io_create(const char *profile, int thread_num, cpu_set_t *coremask) { int opt = 1; struct packet_io *handle = (struct packet_io *)calloc(1, sizeof(struct packet_io)); assert(handle != NULL); handle->thread_num = thread_num; if (packet_io_config(profile, &(handle->config)) != 0) { goto error_out; } handle->instance = marsio_create(); if (handle->instance == NULL) { LOG_ERROR("%s: unable to create marsio instance", LOG_TAG_PKTIO); goto error_out; } if (marsio_option_set(handle->instance, MARSIO_OPT_THREAD_MASK_IN_CPUSET, coremask, sizeof(cpu_set_t)) != 0) { LOG_ERROR("%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_TAG_PKTIO); goto error_out; } if (marsio_option_set(handle->instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0) { LOG_ERROR("%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_TAG_PKTIO); goto error_out; } if (marsio_init(handle->instance, handle->config.app_symbol) != 0) { LOG_ERROR("%s: unable to initialize marsio instance", LOG_TAG_PKTIO); goto error_out; } handle->dev_nf_interface.mr_dev = marsio_open_device(handle->instance, handle->config.dev_nf_interface, handle->thread_num, handle->thread_num); if (handle->dev_nf_interface.mr_dev == NULL) { LOG_ERROR("%s: unable to open device %s", LOG_TAG_PKTIO, handle->config.dev_nf_interface); goto error_out; } handle->dev_nf_interface.mr_path = marsio_sendpath_create_by_vdev(handle->dev_nf_interface.mr_dev); if (handle->dev_nf_interface.mr_path == NULL) { LOG_ERROR("%s: unable to create sendpath for device %s", LOG_TAG_PKTIO, handle->config.dev_nf_interface); goto error_out; } handle->dev_endpoint.mr_dev = marsio_open_device(handle->instance, handle->config.dev_endpoint, handle->thread_num, handle->thread_num); if (handle->dev_endpoint.mr_dev == NULL) { LOG_ERROR("%s: unable to open device %s", LOG_TAG_PKTIO, handle->config.dev_endpoint); goto error_out; } handle->dev_endpoint.mr_path = marsio_sendpath_create_by_vdev(handle->dev_endpoint.mr_dev); if (handle->dev_endpoint.mr_path == NULL) { LOG_ERROR("%s: unable to create sendpath for device %s", LOG_TAG_PKTIO, handle->config.dev_endpoint); goto error_out; } if (strlen(handle->config.dev_endpoint_src_mac) == 0) { marsio_get_device_ether_addr(handle->dev_endpoint.mr_dev, handle->config.dev_endpoint_src_mac, sizeof(handle->config.dev_endpoint_src_mac)); LOG_DEBUG("%s: PACKET_IO->dev_endpoint_src_mac : %s (get from marsio api)", LOG_TAG_PKTIO, handle->config.dev_endpoint_src_mac); } return handle; error_out: packet_io_destory(handle); return NULL; } void packet_io_destory(struct packet_io *handle) { if (handle) { if (handle->dev_nf_interface.mr_path) { marsio_sendpath_destory(handle->dev_nf_interface.mr_path); handle->dev_nf_interface.mr_path = NULL; } if (handle->dev_nf_interface.mr_dev) { marsio_close_device(handle->dev_nf_interface.mr_dev); handle->dev_nf_interface.mr_dev = NULL; } if (handle->dev_endpoint.mr_path) { marsio_sendpath_destory(handle->dev_endpoint.mr_path); handle->dev_endpoint.mr_path = NULL; } if (handle->dev_endpoint.mr_dev) { marsio_close_device(handle->dev_endpoint.mr_dev); handle->dev_endpoint.mr_dev = NULL; } if (handle->instance) { marsio_destory(handle->instance); handle->instance = NULL; } free(handle); handle = NULL; } } int packet_io_thread_init(struct packet_io *handle, struct thread_ctx *thread_ctx) { if (marsio_thread_init(handle->instance) != 0) { LOG_ERROR("%s: unable to init marsio thread %d", LOG_TAG_PKTIO, thread_ctx->thread_index); return -1; } return 0; } void packet_io_thread_wait(struct packet_io *handle, struct thread_ctx *thread_ctx, int timeout_ms) { struct mr_vdev *vdevs[] = { handle->dev_nf_interface.mr_dev, handle->dev_endpoint.mr_dev}; marsio_poll_wait(handle->instance, vdevs, 2, thread_ctx->thread_index, timeout_ms); } int packet_io_thread_polling_nf(struct packet_io *handle, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; int thread_index = thread_ctx->thread_index; marsio_buff_t *rx_buffs[RX_BURST_MAX]; int nr_recv = marsio_recv_burst(handle->dev_nf_interface.mr_dev, thread_index, rx_buffs, handle->config.rx_burst_max); if (nr_recv <= 0) { return 0; } if (handle->config.bypass_all_traffic == 1) { for (int j = 0; j < nr_recv; j++) { int raw_len = marsio_buff_datalen(rx_buffs[j]); throughput_metrics_inc(&(g_metrics->device.nf_rx), 1, raw_len); throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, raw_len); } marsio_send_burst(handle->dev_nf_interface.mr_path, thread_index, rx_buffs, nr_recv); return nr_recv; } for (int j = 0; j < nr_recv; j++) { marsio_buff_t *rx_buff = rx_buffs[j]; int raw_len = marsio_buff_datalen(rx_buff); if (is_downlink_keepalive_packet(rx_buff)) { throughput_metrics_inc(&(g_metrics->device.nf_rx), 1, raw_len); throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, raw_len); throughput_metrics_inc(&(g_metrics->kee_pkt.downlink_rx), 1, raw_len); throughput_metrics_inc(&(g_metrics->kee_pkt.downlink_tx), 1, raw_len); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_index, &rx_buff, 1); } else if (marsio_buff_is_ctrlbuf(rx_buff)) { throughput_metrics_inc(&(g_metrics->device.nf_rx), 1, raw_len); throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, raw_len); throughput_metrics_inc(&(g_metrics->ctrl_pkt.rx), 1, raw_len); throughput_metrics_inc(&(g_metrics->ctrl_pkt.tx), 1, raw_len); handle_control_packet(rx_buff, thread_ctx); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_index, &rx_buff, 1); } else { throughput_metrics_inc(&(g_metrics->device.nf_rx), 1, raw_len); handle_raw_packet(rx_buff, thread_ctx); } } return nr_recv; } int packet_io_thread_polling_endpoint(struct packet_io *handle, struct thread_ctx *thread_ctx) { struct global_metrics *g_metrics = thread_ctx->ref_metrics; int thread_index = thread_ctx->thread_index; marsio_buff_t *rx_buffs[RX_BURST_MAX]; int nr_recv = marsio_recv_burst(handle->dev_endpoint.mr_dev, thread_index, rx_buffs, handle->config.rx_burst_max); if (nr_recv <= 0) { return 0; } if (handle->config.bypass_all_traffic == 1) { for (int j = 0; j < nr_recv; j++) { int raw_len = marsio_buff_datalen(rx_buffs[j]); throughput_metrics_inc(&(g_metrics->device.endpoint_rx), 1, raw_len); throughput_metrics_inc(&(g_metrics->device.endpoint_tx), 1, raw_len); } marsio_send_burst(handle->dev_endpoint.mr_path, thread_index, rx_buffs, nr_recv); return nr_recv; } for (int j = 0; j < nr_recv; j++) { marsio_buff_t *rx_buff = rx_buffs[j]; int raw_len = marsio_buff_datalen(rx_buff); if (is_uplink_keepalive_packet(rx_buff)) { throughput_metrics_inc(&(g_metrics->device.endpoint_rx), 1, raw_len); throughput_metrics_inc(&(g_metrics->kee_pkt.uplink_rx), 1, raw_len); throughput_metrics_inc(&(g_metrics->kee_pkt.uplink_tx_drop), 1, raw_len); marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_index); } else { throughput_metrics_inc(&(g_metrics->device.endpoint_rx), 1, raw_len); handle_inject_packet(rx_buff, thread_ctx); } } return nr_recv; } struct mr_instance *packet_io_get_mr_instance(struct packet_io *handle) { if (handle) { return handle->instance; } else { return NULL; } }