This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tsg-service-chaining-…/platform/src/packet_io.cpp

1437 lines
50 KiB
C++
Raw Normal View History

#include <assert.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
#include <netinet/ether.h>
#include "mpack.h"
#include <MESA/MESA_prof_load.h>
#include "log.h"
#include "sce.h"
#include "utils.h"
#include "g_vxlan.h"
#include "packet_io.h"
#include "sf_metrics.h"
#include "ctrl_packet.h"
#include "global_metrics.h"
// presumably the upper bound on buffers fetched per receive burst.
// NOTE(review): not referenced in the visible part of this file — confirm usage.
#define RX_BURST_MAX 128
// Bit of the MR_BUFF_USER_0 user data that marks a packet as decrypted traffic.
#define MR_MASK_DECRYPTED 0x01
/******************************************************************************
* struct
******************************************************************************/
// Configuration values of the packet I/O layer.
struct config
{
// Traffic bypass mode evaluated in handle_raw_packet():
// 0 = disabled, 1 = bypass all traffic, 2 = bypass raw traffic, 3 = bypass decrypted traffic.
int bypass_traffic;
// presumably the number of packets fetched per receive burst (cf. RX_BURST_MAX) — TODO confirm
int rx_burst_max;
// presumably the application name registered with marsio — TODO confirm
char app_symbol[256];
// presumably the marsio device name of the endpoint (service function side) — TODO confirm
char dev_endpoint[256];
// presumably the marsio device name of the NF interface — TODO confirm
char dev_nf_interface[256];
// Source IP string used as outer source address when encapsulating packets sent to a service function.
char dev_endpoint_src_ip[16];
// Source MAC string used as outer source address when encapsulating packets sent to a service function.
char dev_endpoint_src_mac[32];
};
// One marsio device together with the path used to transmit on it.
struct device
{
// presumably the marsio virtual device handle used for receiving — TODO confirm
struct mr_vdev *mr_dev;
// Send path handed to marsio_send_burst() when transmitting on this device.
struct mr_sendpath *mr_path;
};
// Top-level packet I/O state: the marsio instance, both devices and the configuration.
struct packet_io
{
// presumably the number of worker threads using this object — TODO confirm
int thread_num;
// marsio instance; passed to marsio_buff_malloc_global() / marsio_buff_free().
struct mr_instance *instance;
// Device whose send path is used to inject packets back (see action_nf_inject()).
struct device dev_nf_interface;
// Device whose send path is used to send packets to service functions (see send_packet_to_sf()).
struct device dev_endpoint;
struct config config;
};
/******************************************************************************
* metadata
******************************************************************************/
// Fill *meta from the packet bytes and the per-buffer metadata of rx_buff.
// *meta is zeroed first, so every field that is not fetched below stays 0.
// return 0 : success
// return -1 : error (empty packet, or a required metadata item is missing)
int mbuff_get_metadata(marsio_buff_t *rx_buff, struct metadata *meta)
{
    memset(meta, 0, sizeof(struct metadata));

    // raw packet bytes
    meta->raw_len = marsio_buff_datalen(rx_buff);
    meta->raw_data = marsio_buff_mtod(rx_buff);
    if (meta->raw_data == NULL || meta->raw_len == 0)
    {
        LOG_ERROR("%s: unable to get raw_data from metadata", LOG_TAG_PKTIO);
        return -1;
    }

    // session id
    if (marsio_buff_get_metadata(rx_buff, MR_BUFF_SESSION_ID, &(meta->session_id), sizeof(meta->session_id)) <= 0)
    {
        LOG_ERROR("%s: unable to get session_id from metadata", LOG_TAG_PKTIO);
        return -1;
    }

    // direction: 1 means E2I, 0 means I2E
    if (marsio_buff_get_metadata(rx_buff, MR_BUFF_DIR, &(meta->is_e2i_dir), sizeof(meta->is_e2i_dir)) <= 0)
    {
        LOG_ERROR("%s: unable to get buff_dir from metadata", LOG_TAG_PKTIO);
        return -1;
    }

    // control packets carry the payload offset, data packets carry the decrypted flag
    meta->is_ctrl_pkt = marsio_buff_is_ctrlbuf(rx_buff) ? 1 : 0;
    if (meta->is_ctrl_pkt)
    {
        if (marsio_buff_get_metadata(rx_buff, MR_BUFF_PAYLOAD_OFFSET, &(meta->l7offset), sizeof(meta->l7offset)) <= 0)
        {
            LOG_ERROR("%s: unable to get l7offset from metadata", LOG_TAG_PKTIO);
            return -1;
        }
    }
    else
    {
        uint16_t user_bits = 0;
        if (marsio_buff_get_metadata(rx_buff, MR_BUFF_USER_0, &user_bits, sizeof(user_bits)) <= 0)
        {
            LOG_ERROR("%s: unable to get is_decrypted from metadata", LOG_TAG_PKTIO);
            return -1;
        }
        meta->is_decrypted = (user_bits & MR_MASK_DECRYPTED) ? 1 : 0;
    }

    // sid list (a negative count is an error, an empty list is accepted)
    meta->sids.num = marsio_buff_get_sid_list(rx_buff, meta->sids.elems, sizeof(meta->sids.elems) / sizeof(meta->sids.elems[0]));
    if (meta->sids.num < 0)
    {
        LOG_ERROR("%s: unable to get sid_list from metadata", LOG_TAG_PKTIO);
        return -1;
    }

    // route context (must be non-empty)
    meta->route_ctx.len = marsio_buff_get_metadata(rx_buff, MR_BUFF_ROUTE_CTX, meta->route_ctx.data, sizeof(meta->route_ctx.data));
    if (meta->route_ctx.len <= 0)
    {
        LOG_ERROR("%s: unable to get route_ctx from metadata", LOG_TAG_PKTIO);
        return -1;
    }

    return 0;
}
// Write the fields of *meta into the per-buffer metadata of tx_buff.
// Items that are empty in *meta (session_id == 0, no sids, no route_ctx) are not written.
// return 0 : success
// return -1 : error
int mbuff_set_metadata(marsio_buff_t *tx_buff, struct metadata *meta)
{
    if (meta->session_id && marsio_buff_set_metadata(tx_buff, MR_BUFF_SESSION_ID, &(meta->session_id), sizeof(meta->session_id)) != 0)
    {
        LOG_ERROR("%s: unable to set session_id for metadata", LOG_TAG_PKTIO);
        return -1;
    }

    // MR_BUFF_DIR is deliberately not written; MR_BUFF_ROUTE_CTX is set instead (see below)
    if (meta->is_ctrl_pkt)
    {
        marsio_buff_set_ctrlbuf(tx_buff);
        if (marsio_buff_set_metadata(tx_buff, MR_BUFF_PAYLOAD_OFFSET, &(meta->l7offset), sizeof(meta->l7offset)) != 0)
        {
            LOG_ERROR("%s: unable to set l7offset for metadata", LOG_TAG_PKTIO);
            return -1;
        }
    }
    else
    {
        // data packet: publish the decrypted flag through the user data word
        uint16_t user_bits = meta->is_decrypted ? MR_MASK_DECRYPTED : 0;
        if (marsio_buff_set_metadata(tx_buff, MR_BUFF_USER_0, &user_bits, sizeof(user_bits)) != 0)
        {
            LOG_ERROR("%s: unable to set is_decrypted for metadata", LOG_TAG_PKTIO);
            return -1;
        }
    }

    if (meta->sids.num > 0 && marsio_buff_set_sid_list(tx_buff, meta->sids.elems, meta->sids.num) != 0)
    {
        LOG_ERROR("%s: unable to set sid_list for metadata", LOG_TAG_PKTIO);
        return -1;
    }

    if (meta->route_ctx.len > 0 && marsio_buff_set_metadata(tx_buff, MR_BUFF_ROUTE_CTX, meta->route_ctx.data, meta->route_ctx.len) != 0)
    {
        LOG_ERROR("%s: unable to set route_ctx for metadata", LOG_TAG_PKTIO);
        return -1;
    }

    return 0;
}
// Remember the metadata of a received packet in the session.
// The session keeps one metadata slot per (raw/decrypted, e2i/i2e) combination:
// the first packet fills the whole slot, later packets only refresh its sids.
static void update_session_by_metadata(struct session_ctx *ctx, struct metadata *meta)
{
    struct metadata *slot = NULL;

    if (meta->is_e2i_dir)
    {
        slot = meta->is_decrypted ? ctx->decrypted_meta_e2i : ctx->raw_meta_e2i;
    }
    else
    {
        slot = meta->is_decrypted ? ctx->decrypted_meta_i2e : ctx->raw_meta_i2e;
    }

    if (metadata_is_empty(slot))
    {
        // first packet of this kind: take over the metadata
        metadata_shallow_copy(slot, meta);
    }
    else
    {
        // subsequent packets: only the sids are updated
        sids_copy(&slot->sids, &meta->sids);
    }
}
static void update_metadata_by_session(struct session_ctx *ctx, struct metadata *meta)
{
struct sids *sids = NULL;
struct route_ctx *route_ctx = NULL;
meta->session_id = ctx->session_id;
if (meta->is_e2i_dir)
{
if (meta->is_decrypted)
{
sids = &ctx->decrypted_meta_e2i->sids;
route_ctx = &ctx->decrypted_meta_e2i->route_ctx;
}
else
{
sids = &ctx->raw_meta_e2i->sids;
route_ctx = &ctx->raw_meta_e2i->route_ctx;
}
}
else
{
if (meta->is_decrypted)
{
sids = &ctx->decrypted_meta_i2e->sids;
route_ctx = &ctx->decrypted_meta_i2e->route_ctx;
}
else
{
sids = &ctx->raw_meta_i2e->sids;
route_ctx = &ctx->raw_meta_i2e->route_ctx;
}
}
sids_copy(&meta->sids, sids);
route_ctx_copy(&meta->route_ctx, route_ctx);
}
/******************************************************************************
* keepalive
******************************************************************************/
// A downlink keepalive is an Ethernet frame whose EtherType field is 0xAAAA.
// return 0 : not keepalive packet
// return 1 : is keepalive packet
static int is_downlink_keepalive_packet(marsio_buff_t *rx_buff)
{
    int frame_len = marsio_buff_datalen(rx_buff);
    char *frame = marsio_buff_mtod(rx_buff);

    // too short to hold an Ethernet header
    if (frame == NULL || frame_len < (int)(sizeof(struct ethhdr)))
    {
        return 0;
    }

    // 0xAAAA has identical bytes, so the comparison is independent of byte order
    struct ethhdr *eth_hdr = (struct ethhdr *)frame;
    return (eth_hdr->h_proto == 0xAAAA) ? 1 : 0;
}
// An uplink keepalive is an Ethernet/IPv4/UDP packet with destination port 3784.
// The UDP header is located through the IPv4 header length field (IHL), so packets
// that carry IP options are classified correctly instead of reading option bytes
// as a UDP header.
// return 0 : not keepalive packet
// return 1 : is keepalive packet
static int is_uplink_keepalive_packet(marsio_buff_t *rx_buff)
{
    int raw_len = marsio_buff_datalen(rx_buff);
    char *raw_data = marsio_buff_mtod(rx_buff);
    if (raw_data == NULL || raw_len < (int)(sizeof(struct ethhdr) + sizeof(struct ip) + sizeof(struct udp_hdr)))
    {
        return 0;
    }

    struct ethhdr *eth_hdr = (struct ethhdr *)raw_data;
    if (eth_hdr->h_proto != htons(ETH_P_IP))
    {
        return 0;
    }

    struct ip *ip_hdr = (struct ip *)((char *)eth_hdr + sizeof(struct ethhdr));
    if (ip_hdr->ip_p != IPPROTO_UDP)
    {
        return 0;
    }

    // ip_hl counts 32-bit words; anything below the fixed header size is malformed
    int ip_hdr_len = ip_hdr->ip_hl * 4;
    if (ip_hdr_len < (int)sizeof(struct ip))
    {
        return 0;
    }
    if (raw_len < (int)(sizeof(struct ethhdr) + sizeof(struct udp_hdr)) + ip_hdr_len)
    {
        return 0;
    }

    struct udp_hdr *udp_hdr = (struct udp_hdr *)((char *)ip_hdr + ip_hdr_len);
    if (udp_hdr->uh_dport != htons(3784))
    {
        return 0;
    }

    return 1;
}
/******************************************************************************
* search session ctx
******************************************************************************/
// return !NULL
// return NULL
static struct session_ctx *raw_packet_search_session(struct session_table *table, const char *raw_data, int raw_len, uint64_t session_id)
{
struct addr_tuple4 inner_addr;
struct addr_tuple4 reverse_addr;
struct raw_pkt_parser raw_parser;
memset(&inner_addr, 0, sizeof(struct addr_tuple4));
memset(&reverse_addr, 0, sizeof(struct addr_tuple4));
raw_packet_parser_init(&raw_parser, 0, LAYER_TYPE_ALL, 8);
raw_packet_parser_parse(&raw_parser, (const void *)raw_data, raw_len);
raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_addr);
addr_tuple4_reverse(&inner_addr, &reverse_addr);
struct session_node *node = session_table_search_by_id(table, session_id);
if (node == NULL)
{
return NULL;
}
struct session_ctx *session_ctx = (struct session_ctx *)node->value;
if (memcmp(&session_ctx->inner_tuple4, &inner_addr, sizeof(struct addr_tuple4)) != 0 && memcmp(&session_ctx->inner_tuple4, &reverse_addr, sizeof(struct addr_tuple4)) != 0)
{
char *addr_str = addr_tuple4_to_str(&inner_addr);
LOG_ERROR("%s: unexpected raw packet, session %lu expected address tuple4 is %s, but current packet's address tuple4 is %s, bypass !!!", LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr, addr_str);
free(addr_str);
return NULL;
}
return session_ctx;
}
// return !NULL
// return NULL
static struct session_ctx *inject_packet_search_session(struct session_table *table, const char *raw_data, int raw_len)
{
struct addr_tuple4 inner_addr;
struct raw_pkt_parser raw_parser;
memset(&inner_addr, 0, sizeof(struct addr_tuple4));
raw_packet_parser_init(&raw_parser, 0, LAYER_TYPE_ALL, 8);
raw_packet_parser_parse(&raw_parser, (const void *)raw_data, raw_len);
raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_addr);
struct session_node *node = session_table_search_by_addr(table, &inner_addr);
if (node == NULL)
{
char *addr_str = addr_tuple4_to_str(&inner_addr);
LOG_ERROR("%s: unexpected inject packet, unable to find session %s from session table, drop !!!", LOG_TAG_PKTIO, addr_str);
free(addr_str);
return NULL;
}
return (struct session_ctx *)node->value;
}
/******************************************************************************
* action bypass/block/forward
******************************************************************************/
// Write the outer Ethernet / IPv4 / UDP / g_vxlan headers into buffer.
// buffer must provide room for all four headers; the payload of payload_len bytes
// is expected to follow them directly and is not touched here.
// The g_vxlan header records packet direction, sf index and traffic type; the UDP
// source port is drawn with rand() from 49152..65534 and the destination port is 4789.
static void vxlan_encapsulate(char *buffer, const char *src_mac_str, const char *dst_mac_str, const char *src_ip_str, const char *dst_ip_str, int payload_len, int is_e2i, int is_decrypted, int sf_index)
{
    // header layout inside buffer
    char *cursor = buffer;
    struct ethhdr *eth_hdr = (struct ethhdr *)cursor;
    cursor += sizeof(struct ethhdr);
    struct ip *ip_hdr = (struct ip *)cursor;
    cursor += sizeof(struct ip);
    struct udp_hdr *udp_hdr = (struct udp_hdr *)cursor;
    cursor += sizeof(struct udp_hdr);
    struct g_vxlan *g_vxlan_hdr = (struct g_vxlan *)cursor;

    // innermost header first
    memset(g_vxlan_hdr, 0, sizeof(struct g_vxlan));
    g_vxlan_set_packet_dir(g_vxlan_hdr, is_e2i);
    g_vxlan_set_sf_index(g_vxlan_hdr, sf_index);
    g_vxlan_set_traffic_type(g_vxlan_hdr, is_decrypted);

    build_ether_header(eth_hdr, ETH_P_IP, src_mac_str, dst_mac_str);
    build_ip_header(ip_hdr, IPPROTO_UDP, src_ip_str, dst_ip_str, sizeof(struct udp_hdr) + sizeof(struct g_vxlan) + payload_len);
    build_udp_header((const char *)&ip_hdr->ip_src, 8, udp_hdr, rand() % (65535 - 49152) + 49152, 4789, sizeof(struct g_vxlan) + payload_len);
}
// Encapsulate rx_buff for the selected service function and transmit it on the
// endpoint device. Ownership of rx_buff passes to this function in every case.
// return: length of the transmitted buffer (including the prepended headers),
//         or 0 when the headers could not be prepended and the buffer was freed.
static int send_packet_to_sf(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
    struct packet_io *packet_io = thread_ctx->ref_io;
    int thread_index = thread_ctx->thread_index;
    const char *src_mac_str = packet_io->config.dev_endpoint_src_mac;
    const char *dst_mac_str = sf->sf_dst_mac;
    const char *src_ip_str = packet_io->config.dev_endpoint_src_ip;
    const char *dst_ip_str = sf->sf_dst_ip;
    int payload_len = meta->raw_len;
    int is_e2i = meta->is_e2i_dir;
    int is_decrypted = meta->is_decrypted;
    int sf_index = sf->sf_index;
    int prepend_len = 0;
    char *buffer = NULL;

    marsio_buff_ctrlzone_reset(rx_buff);

    switch (sf->sf_connectivity.method)
    {
    case PACKAGE_METHOD_VXLAN_G:
        prepend_len = sizeof(struct ethhdr) + sizeof(struct ip) + sizeof(struct udp_hdr) + sizeof(struct g_vxlan);
        buffer = marsio_buff_prepend(rx_buff, prepend_len);
        if (buffer == NULL)
        {
            // NOTE(review): assumes marsio_buff_prepend() reports failure with NULL — confirm.
            // Writing the headers through a NULL pointer would crash, so drop the packet instead.
            LOG_ERROR("%s: unable to prepend %d bytes for vxlan encapsulation, thread_index: %d", LOG_TAG_PKTIO, prepend_len, thread_index);
            marsio_buff_free(packet_io->instance, &rx_buff, 1, 0, thread_index);
            return 0;
        }
        vxlan_encapsulate(buffer, src_mac_str, dst_mac_str, src_ip_str, dst_ip_str, payload_len, is_e2i, is_decrypted, sf_index);
        break;
    case PACKAGE_METHOD_LAYER2_SWITCH:
        // TODO
        break;
    case PACKAGE_METHOD_LAYER3_SWITCH:
        // TODO
        break;
    default:
        break;
    }

    // the length must be read before the buffer is handed to marsio
    int nsend = marsio_buff_datalen(rx_buff);
    marsio_send_burst(packet_io->dev_endpoint.mr_path, thread_index, &rx_buff, 1);
    return nsend;
}
// forward declaration: action_err_bypass() and action_err_block() are used by action_nf_inject() and vice versa
static int action_nf_inject(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx);
// Error path "bypass": inject the packet back unchanged and, when something was
// sent, account it in the raw_pkt.error_bypass counter.
static void action_err_bypass(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
    struct global_metrics *metrics = thread_ctx->ref_metrics;
    int sent_bytes = action_nf_inject(rx_buff, meta, sf, thread_ctx);
    if (sent_bytes <= 0)
    {
        return;
    }
    throughput_metrics_inc(&(metrics->raw_pkt.error_bypass), 1, sent_bytes);
}
// Error path "block": account the packet in raw_pkt.error_block and free the buffer.
// meta and sf are not used.
static void action_err_block(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
    struct packet_io *io = thread_ctx->ref_io;
    struct global_metrics *metrics = thread_ctx->ref_metrics;
    int worker = thread_ctx->thread_index;

    // the length has to be sampled before the buffer is released
    int dropped_len = marsio_buff_datalen(rx_buff);
    throughput_metrics_inc(&(metrics->raw_pkt.error_block), 1, dropped_len);

    marsio_buff_free(io->instance, &rx_buff, 1, 0, worker);
}
// Inject a packet back through the NF interface device.
// The control zone is reset and rewritten from *meta before sending. When the
// metadata cannot be written the packet is blocked (freed) instead.
// return nsend : number of bytes handed to the device, 0 when the packet was blocked
static int action_nf_inject(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
    struct packet_io *io = thread_ctx->ref_io;
    struct global_metrics *metrics = thread_ctx->ref_metrics;
    int worker = thread_ctx->thread_index;

    marsio_buff_ctrlzone_reset(rx_buff);
    if (mbuff_set_metadata(rx_buff, meta) != 0)
    {
        action_err_block(rx_buff, meta, sf, thread_ctx);
        return 0;
    }

    // the length has to be sampled before the buffer is handed to marsio
    int sent_len = marsio_buff_datalen(rx_buff);
    marsio_send_burst(io->dev_nf_interface.mr_path, worker, &rx_buff, 1);
    throughput_metrics_inc(&(metrics->device.nf_tx), 1, sent_len);

    return sent_len;
}
// Mirroring + bypass: only account the packet in raw_pkt.mirr_bypass.
// The buffer is neither sent nor freed here; meta and sf are not used.
static void action_mirr_bypass(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
    struct global_metrics *metrics = thread_ctx->ref_metrics;
    int pkt_len = marsio_buff_datalen(rx_buff);

    throughput_metrics_inc(&(metrics->raw_pkt.mirr_bypass), 1, pkt_len);
}
// Mirroring + block: only account the packet in raw_pkt.mirr_block.
// The buffer is neither sent nor freed here; meta and sf are not used.
static void action_mirr_block(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
    struct global_metrics *metrics = thread_ctx->ref_metrics;
    int pkt_len = marsio_buff_datalen(rx_buff);

    throughput_metrics_inc(&(metrics->raw_pkt.mirr_block), 1, pkt_len);
}
// Mirroring + forward: send a copy of the packet to the service function.
// rx_buff itself is left untouched so the caller can keep processing it.
// If no buffer can be allocated, or the copy does not fit, the mirror is skipped.
static void action_mirr_forward(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
    struct global_metrics *g_metrics = thread_ctx->ref_metrics;
    struct packet_io *packet_io = thread_ctx->ref_io;
    int thread_index = thread_ctx->thread_index;
    int raw_len = marsio_buff_datalen(rx_buff);
    char *raw_data = marsio_buff_mtod(rx_buff);

    marsio_buff_t *new_buff = NULL;
    if (marsio_buff_malloc_global(packet_io->instance, &new_buff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0)
    {
        LOG_ERROR("%s: unable to malloc buff on marsio instance, thread_index: %d", LOG_TAG_PKTIO, thread_index);
        return;
    }

    char *copy_ptr = marsio_buff_append(new_buff, raw_len);
    if (copy_ptr == NULL)
    {
        // NOTE(review): assumes marsio_buff_append() reports failure with NULL — confirm.
        // Release the fresh buffer instead of copying through a NULL pointer.
        LOG_ERROR("%s: unable to append %d bytes to mirrored buff, thread_index: %d", LOG_TAG_PKTIO, raw_len, thread_index);
        marsio_buff_free(packet_io->instance, &new_buff, 1, 0, thread_index);
        return;
    }
    memcpy(copy_ptr, raw_data, raw_len);

    // send_packet_to_sf() takes ownership of the copy
    int nsend = send_packet_to_sf(new_buff, meta, sf, thread_ctx);
    throughput_metrics_inc(&(g_metrics->device.endpoint_tx), 1, nsend);
    throughput_metrics_inc(&(g_metrics->raw_pkt.mirr_tx), 1, raw_len);
    throughput_metrics_inc(&sf->tx, 1, nsend);
    sf_metrics_inc(thread_ctx->sf_metrics, sf->rule_vsys_id, sf->rule_id, sf->sff_profile_id, sf->sf_profile_id, 0, 0, 1, nsend);
}
// Steering + bypass: only account the packet in raw_pkt.stee_bypass.
// The buffer is neither sent nor freed here; meta and sf are not used.
static void action_stee_bypass(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
    struct global_metrics *metrics = thread_ctx->ref_metrics;
    int pkt_len = marsio_buff_datalen(rx_buff);

    throughput_metrics_inc(&(metrics->raw_pkt.stee_bypass), 1, pkt_len);
}
// Steering + block: account the packet in raw_pkt.stee_block and free the buffer.
// meta and sf are not used.
static void action_stee_block(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
    struct packet_io *io = thread_ctx->ref_io;
    struct global_metrics *metrics = thread_ctx->ref_metrics;
    int worker = thread_ctx->thread_index;

    // the length has to be sampled before the buffer is released
    int dropped_len = marsio_buff_datalen(rx_buff);
    throughput_metrics_inc(&(metrics->raw_pkt.stee_block), 1, dropped_len);

    marsio_buff_free(io->instance, &rx_buff, 1, 0, worker);
}
// Steering + forward: hand the packet itself (not a copy) to the service function
// and update the global, per-sf and sf_metrics transmit counters.
static void action_stee_forward(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
    struct global_metrics *metrics = thread_ctx->ref_metrics;

    // length before encapsulation; send_packet_to_sf() returns the length after it
    int inner_len = marsio_buff_datalen(rx_buff);
    int outer_len = send_packet_to_sf(rx_buff, meta, sf, thread_ctx);

    throughput_metrics_inc(&(metrics->device.endpoint_tx), 1, outer_len);
    throughput_metrics_inc(&(metrics->raw_pkt.stee_tx), 1, inner_len);
    throughput_metrics_inc(&sf->tx, 1, outer_len);
    sf_metrics_inc(thread_ctx->sf_metrics, sf->rule_vsys_id, sf->rule_id, sf->sff_profile_id, sf->sf_profile_id, 0, 0, 1, outer_len);
}
// Walk the service function chain starting at next_sf_index and apply each
// entry's action to the packet:
//   - skipped entries and BYPASS entries only count and move on to the next entry
//   - BLOCK ends the walk (steering: packet freed, mirroring: packet injected back)
//   - FORWARD with steering hands the packet to the sf and ends the walk,
//     FORWARD with mirroring sends a copy and moves on to the next entry
// When the walk reaches the end of the chain the packet is injected back.
// NOTE(review): if next_sf_index > chaining->chaining_used the loop body never runs
// and the final condition is false, so rx_buff is neither sent nor freed here —
// confirm that callers never pass such an index.
static void action_sf_chaining(struct thread_ctx *thread_ctx, struct session_ctx *session_ctx, struct selected_chaining *chaining, marsio_buff_t *rx_buff, struct metadata *meta, int next_sf_index)
{
    int sf_index;

    for (sf_index = next_sf_index; sf_index < chaining->chaining_used; sf_index++)
    {
        struct selected_sf *sf = &(chaining->chaining[sf_index]);

        LOG_INFO("%s: session: %lu %s execute chaining [%d/%d] rule_id: %lu, sff_profile_id: %d, sf_profile_id: %d, sf_need_skip: %d, sf_action_reason: %s, is_e2i: %d, is_decrypted: %d",
                 LOG_TAG_POLICY, session_ctx->session_id, session_ctx->session_addr, sf_index, chaining->chaining_used,
                 sf->rule_id, sf->sff_profile_id, sf->sf_profile_id, sf->sf_need_skip, action_reason_to_string(sf->sf_action_reason),
                 meta->is_e2i_dir, meta->is_decrypted);

        if (sf->sf_need_skip)
        {
            continue;
        }

        int is_steering = (sf->sff_forward_type == FORWARD_TYPE_STEERING);

        switch (sf->sf_action)
        {
        case SESSION_ACTION_BYPASS:
            if (is_steering)
            {
                action_stee_bypass(rx_buff, meta, sf, thread_ctx);
            }
            else
            {
                action_mirr_bypass(rx_buff, meta, sf, thread_ctx);
            }
            continue;

        case SESSION_ACTION_BLOCK:
            if (is_steering)
            {
                action_stee_block(rx_buff, meta, sf, thread_ctx);
                return;
            }
            // a blocked mirror does not affect the original traffic
            action_mirr_block(rx_buff, meta, sf, thread_ctx);
            action_nf_inject(rx_buff, meta, NULL, thread_ctx);
            return;

        case SESSION_ACTION_FORWARD:
            // only vxlan encapsulation is implemented
            if (sf->sf_connectivity.method != PACKAGE_METHOD_VXLAN_G)
            {
                LOG_ERROR("%s: processing packets, session %lu %s requires encapsulation format not supported, bypass !!!",
                          LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr);
                action_err_bypass(rx_buff, meta, sf, thread_ctx);
                return;
            }
            if (is_steering)
            {
                action_stee_forward(rx_buff, meta, sf, thread_ctx);
                return;
            }
            action_mirr_forward(rx_buff, meta, sf, thread_ctx);
            continue;

        default:
            // any other action: nothing to do for this entry
            continue;
        }
    }

    if (sf_index == chaining->chaining_used)
    {
        action_nf_inject(rx_buff, meta, NULL, thread_ctx);
    }
}
/******************************************************************************
* handle session status
******************************************************************************/
// Build a "log_update" control packet for the session and send it through the
// NF interface device.
// The packet consists of the first l7offset bytes of the stored control packet
// (session_ctx->ctrl_meta) followed by a msgpack map:
//   { "tsync": "2.0", "session_id": <u64>, "state": "active", "method": "log_update",
//     "params": { "sce": { "sf_profile_ids": [ <u32>, ... ] } } }
// sf_profile_ids lists every chain entry that is not skipped and has action FORWARD.
// return: number of bytes handed to the device, 0 on any failure
//         (msgpack encoding, buffer allocation, buffer growth or metadata write).
static int send_ctrl_packet(struct session_ctx *session_ctx, struct selected_chaining *chaining, struct thread_ctx *thread_ctx)
{
    struct sce_ctx *sce_ctx = thread_ctx->ref_sce_ctx;
    struct packet_io *packet_io = thread_ctx->ref_io;
    int thread_index = thread_ctx->thread_index;

    char *data = NULL;
    size_t size = 0;
    mpack_writer_t writer;
    mpack_writer_init_growable(&writer, &data, &size);

    mpack_build_map(&writer); // root begin

    // tsync
    mpack_write_cstr(&writer, "tsync");
    mpack_write_cstr(&writer, "2.0");

    // session_id
    mpack_write_cstr(&writer, "session_id");
    mpack_write_u64(&writer, session_ctx->session_id);

    // state
    mpack_write_cstr(&writer, "state");
    mpack_write_cstr(&writer, "active");

    // method
    mpack_write_cstr(&writer, "method");
    mpack_write_cstr(&writer, "log_update");

    // params
    {
        mpack_write_cstr(&writer, "params");
        mpack_build_map(&writer); // params value begin

        // sce
        {
            mpack_write_cstr(&writer, "sce");
            mpack_build_map(&writer); // sce value begin

            mpack_write_cstr(&writer, "sf_profile_ids");
            mpack_build_array(&writer);
            for (int i = 0; i < chaining->chaining_used; i++)
            {
                struct selected_sf *sf = &(chaining->chaining[i]);
                if (sf->sf_need_skip == 0 && sf->sf_action == SESSION_ACTION_FORWARD)
                {
                    mpack_write_u32(&writer, sf->sf_profile_id);
                }
            }
            mpack_complete_array(&writer);

            mpack_complete_map(&writer); // sce value end
        }

        mpack_complete_map(&writer); // params value end
    }

    mpack_complete_map(&writer); // root end

    // finish writing
    if (mpack_writer_destroy(&writer) != mpack_ok)
    {
        assert(0);
        free(data);
        return 0;
    }

    LOG_INFO("%s: session %lu %s send event log %zu bytes", LOG_TAG_METRICS, session_ctx->session_id, session_ctx->session_addr, size);

    char *raw_packet_header_data = session_ctx->ctrl_meta->raw_data;
    int raw_packet_header_len = session_ctx->ctrl_meta->l7offset;

    // allocation can fail; the same "< 0" convention is used in action_mirr_forward()
    marsio_buff_t *tx_buffs[1] = {NULL};
    if (marsio_buff_malloc_global(packet_io->instance, tx_buffs, 1, 0, thread_index) < 0)
    {
        LOG_ERROR("%s: unable to malloc buff on marsio instance, thread_index: %d", LOG_TAG_PKTIO, thread_index);
        free(data);
        return 0;
    }

    char *dst = marsio_buff_append(tx_buffs[0], raw_packet_header_len + size);
    if (dst == NULL)
    {
        // NOTE(review): assumes marsio_buff_append() reports failure with NULL — confirm.
        LOG_ERROR("%s: unable to append %zu bytes to control buff, thread_index: %d", LOG_TAG_PKTIO, raw_packet_header_len + size, thread_index);
        marsio_buff_free(packet_io->instance, tx_buffs, 1, 0, thread_index);
        free(data);
        return 0;
    }
    memcpy(dst, raw_packet_header_data, raw_packet_header_len);
    memcpy(dst + raw_packet_header_len, data, size);
    free(data);

    struct metadata meta = {0};
    meta.session_id = session_ctx->session_id;
    meta.l7offset = raw_packet_header_len;
    meta.is_ctrl_pkt = 1;
    meta.sids.num = 1;
    meta.sids.elems[0] = sce_ctx->firewall_sids;
    route_ctx_copy(&meta.route_ctx, &session_ctx->ctrl_meta->route_ctx);

    // do not send a control packet whose metadata is incomplete (mbuff_set_metadata logs the reason)
    if (mbuff_set_metadata(tx_buffs[0], &meta) != 0)
    {
        marsio_buff_free(packet_io->instance, tx_buffs, 1, 0, thread_index);
        return 0;
    }

    // the length must be read before the buffer is handed to marsio
    int nsend = marsio_buff_datalen(tx_buffs[0]);
    marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread_index, tx_buffs, 1);

    return nsend;
}
// Send one "log_update" control packet per non-empty chaining (raw first, then
// decrypted) when event logging is enabled, and account every packet that was sent.
// A NULL chaining is skipped, matching how dump_sf_metrics() and
// handle_raw_packet() treat a missing chaining.
static void send_event_log(struct session_ctx *session_ctx, struct thread_ctx *thread_ctx)
{
    struct sce_ctx *sce_ctx = thread_ctx->ref_sce_ctx;
    struct global_metrics *g_metrics = thread_ctx->ref_metrics;
    struct selected_chaining *chainings[2] = {
        session_ctx->chainings.chaining_raw,
        session_ctx->chainings.chaining_decrypted,
    };

    if (!sce_ctx->enable_send_log)
    {
        return;
    }

    for (int i = 0; i < 2; i++)
    {
        struct selected_chaining *chaining = chainings[i];
        if (chaining == NULL || chaining->chaining_used == 0)
        {
            continue;
        }

        int nsend = send_ctrl_packet(session_ctx, chaining, thread_ctx);
        if (nsend > 0)
        {
            ATOMIC_INC(&(g_metrics->sf_session.log));
            throughput_metrics_inc(&(g_metrics->ctrl_pkt.tx), 1, nsend);
            throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, nsend);
        }
    }
}
// Log the rx/tx counters of every entry of a chaining, one line per entry.
// tag names the kind of traffic in the log line; a NULL chaining is ignored.
static void dump_sf_metrics(struct session_ctx *session_ctx, struct selected_chaining *chaining, const char *tag)
{
    if (chaining == NULL)
    {
        return;
    }

    int idx = 0;
    while (idx < chaining->chaining_used)
    {
        struct selected_sf *sf = &(chaining->chaining[idx]);
        LOG_INFO("%s: session %lu %s %s metrics: rule_id %lu sff_profile_id %d sf_profile_id %d sf_need_skip %d sf_action_reason %s rx_pkts %lu rx_bytes %lu tx_pkts %lu tx_bytes %lu",
                 LOG_TAG_METRICS, session_ctx->session_id, session_ctx->session_addr, tag, sf->rule_id, sf->sff_profile_id, sf->sf_profile_id, sf->sf_need_skip, action_reason_to_string(sf->sf_action_reason), sf->rx.n_pkts, sf->rx.n_bytes, sf->tx.n_pkts, sf->tx.n_bytes);
        idx++;
    }
}
// Value destructor registered with session_table_insert(): releases a session_ctx.
static void session_value_free_cb(void *ctx)
{
    session_ctx_free((struct session_ctx *)ctx);
}
// Apply every rule id of the control packet that the session has not seen yet:
// select the chainings for the rule, call selected_chaining_bref() on both
// chainings and record the rule id in session_ctx->rule_ids.
// Rule ids already recorded for the session are skipped.
static void handle_policy_mutil_hits(struct policy_enforcer *enforcer, struct session_ctx *session_ctx, struct ctrl_pkt_parser *ctrl_parser, raw_pkt_parser *raw_parser, int is_e2i_dir)
{
    for (int idx = 0; idx < ctrl_parser->rule_id_num; idx++)
    {
        uint64_t hit_rule = ctrl_parser->rule_ids[idx];
        if (fixed_num_array_exist_elem(&session_ctx->rule_ids, hit_rule))
        {
            continue;
        }

        policy_enforce_select_chainings(enforcer, &session_ctx->chainings, session_ctx, raw_parser, hit_rule, is_e2i_dir);
        selected_chaining_bref(session_ctx->chainings.chaining_raw);
        selected_chaining_bref(session_ctx->chainings.chaining_decrypted);
        fixed_num_array_add_elem(&session_ctx->rule_ids, hit_rule);
    }
}
static void handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser *ctrl_parser, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct policy_enforcer *enforcer = thread_ctx->ref_enforcer;
struct session_table *session_table = thread_ctx->session_table;
int chaining_size = policy_enforce_chaining_size(enforcer);
#if 0
if (session_table_search_by_id(session_table, meta->session_id))
{
return ;
}
#endif
struct raw_pkt_parser raw_parser;
struct addr_tuple4 inner_tuple4;
raw_packet_parser_init(&raw_parser, meta->session_id, LAYER_TYPE_ALL, 8);
const void *payload = raw_packet_parser_parse(&raw_parser, (const void *)meta->raw_data, meta->raw_len);
raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_tuple4);
uint16_t real_offset = (char *)payload - meta->raw_data;
if (real_offset != meta->l7offset)
{
char *addr_str = addr_tuple4_to_str(&inner_tuple4);
LOG_ERROR("%s: incorrect dataoffset %d in the control zone of session %lu %s, the expect value is %d", LOG_TAG_PKTIO, meta->l7offset, meta->session_id, addr_str, real_offset);
free(addr_str);
}
struct session_ctx *session_ctx = session_ctx_new();
session_ctx->session_id = meta->session_id;
session_ctx->session_addr = addr_tuple4_to_str(&inner_tuple4);
addr_tuple4_copy(&session_ctx->inner_tuple4, &inner_tuple4);
metadata_deep_copy(session_ctx->ctrl_meta, meta);
session_ctx->chainings.chaining_raw = selected_chaining_create(chaining_size, session_ctx->session_id, session_ctx->session_addr);
session_ctx->chainings.chaining_decrypted = selected_chaining_create(chaining_size, session_ctx->session_id, session_ctx->session_addr);
session_ctx->ref_thread_ctx = thread_ctx;
LOG_INFO("%s: session %lu %s active first", LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr);
handle_policy_mutil_hits(enforcer, session_ctx, ctrl_parser, &raw_parser, meta->is_e2i_dir);
send_event_log(session_ctx, thread_ctx);
session_table_insert(session_table, session_ctx->session_id, &session_ctx->inner_tuple4, session_ctx, session_value_free_cb);
ATOMIC_INC(&(g_metrics->sf_session.num));
}
// Close a session: log the per-sf counters of both chainings, remove the session
// from this thread's table and decrement the session gauge.
// Unknown session ids are ignored; ctrl_parser is not used.
static void handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser *ctrl_parser, struct thread_ctx *thread_ctx)
{
    struct session_table *session_table = thread_ctx->session_table;
    struct global_metrics *g_metrics = thread_ctx->ref_metrics;

    struct session_node *node = session_table_search_by_id(session_table, meta->session_id);
    if (!node)
    {
        return;
    }

    struct session_ctx *closing = (struct session_ctx *)node->value;
    LOG_INFO("%s: session %lu %s closing", LOG_TAG_PKTIO, closing->session_id, closing->session_addr);

    // the counters must be dumped before the session is deleted
    dump_sf_metrics(closing, closing->chainings.chaining_raw, "raw_traffic");
    dump_sf_metrics(closing, closing->chainings.chaining_decrypted, "decrypted_traffic");

    session_table_delete_by_id(session_table, meta->session_id);
    ATOMIC_DEC(&(g_metrics->sf_session.num));
}
static void handle_session_active(struct metadata *meta, struct ctrl_pkt_parser *ctrl_parser, struct thread_ctx *thread_ctx)
{
struct session_table *session_table = thread_ctx->session_table;
struct policy_enforcer *enforcer = thread_ctx->ref_enforcer;
struct session_node *node = session_table_search_by_id(session_table, meta->session_id);
if (node)
{
struct session_ctx *session_ctx = (struct session_ctx *)node->value;
struct raw_pkt_parser raw_parser;
raw_packet_parser_init(&raw_parser, meta->session_id, LAYER_TYPE_ALL, 8);
const void *payload = raw_packet_parser_parse(&raw_parser, (const void *)meta->raw_data, meta->raw_len);
uint16_t real_offset = (char *)payload - meta->raw_data;
if (real_offset != meta->l7offset)
{
LOG_ERROR("%s: incorrect dataoffset %d in the control zone of session %lu %s, the expect value is %d", LOG_TAG_PKTIO, meta->l7offset, meta->session_id, session_ctx->session_addr, real_offset);
}
LOG_INFO("%s: session %lu %s active again", LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr);
handle_policy_mutil_hits(enforcer, session_ctx, ctrl_parser, &raw_parser, meta->is_e2i_dir);
send_event_log(session_ctx, thread_ctx);
}
else
{
handle_session_opening(meta, ctrl_parser, thread_ctx);
}
}
// Handle a "resetall" control packet: zero the global session gauge and bump the
// session_table_need_reset counter of every worker thread. The tables themselves
// are not touched here; ctrl_parser is not used.
static void handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser *ctrl_parser, struct thread_ctx *thread_ctx)
{
    struct sce_ctx *sce_ctx = thread_ctx->ref_sce_ctx;
    struct global_metrics *g_metrics = thread_ctx->ref_metrics;

    LOG_ERROR("%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id);
    ATOMIC_ZERO(&(g_metrics->sf_session.num));

    int worker = 0;
    while (worker < sce_ctx->nr_worker_threads)
    {
        struct thread_ctx *worker_ctx = &sce_ctx->work_threads[worker];
        ATOMIC_INC(&worker_ctx->session_table_need_reset);
        worker++;
    }
}
/******************************************************************************
* handle control/raw/inject packet
******************************************************************************/
// Parse a control packet and dispatch on the session state it announces.
// Any failure (missing metadata, payload offset beyond the packet, unparsable
// payload, session id mismatch, unknown state) increments ctrl_pkt.error.
// rx_buff is not sent or freed here.
static void handle_control_packet(marsio_buff_t *rx_buff, struct thread_ctx *thread_ctx)
{
    struct global_metrics *g_metrics = thread_ctx->ref_metrics;
    struct metadata meta;
    struct ctrl_pkt_parser ctrl_parser;

    if (mbuff_get_metadata(rx_buff, &meta) == -1)
    {
        LOG_ERROR("%s: unexpected control packet, unable to get metadata", LOG_TAG_PKTIO);
        goto error_ctrl_pkt;
    }

    // l7offset comes from the control zone; it must lie inside the packet before
    // it is used for pointer and length arithmetic below
    if (meta.l7offset > meta.raw_len)
    {
        LOG_ERROR("%s: unexpected control packet, l7offset exceeds packet length", LOG_TAG_PKTIO);
        goto error_ctrl_pkt;
    }

    ctrl_packet_parser_init(&ctrl_parser);
    if (ctrl_packet_parser_parse(&ctrl_parser, meta.raw_data + meta.l7offset, meta.raw_len - meta.l7offset) == -1)
    {
        LOG_ERROR("%s: unexpected control packet, unable to parse data", LOG_TAG_PKTIO);
        goto error_ctrl_pkt;
    }

    if (ctrl_parser.session_id != meta.session_id)
    {
        LOG_ERROR("%s: unexpected control packet, metadata's session %lu != control packet's session %lu", LOG_TAG_PKTIO, meta.session_id, ctrl_parser.session_id);
        goto error_ctrl_pkt;
    }

    switch (ctrl_parser.state)
    {
    case SESSION_STATE_OPENING:
        ATOMIC_INC(&(g_metrics->ctrl_pkt.opening));
        // when session opening, firewall not send policy id
        // return handle_session_opening(&meta, &ctrl_parser, ctx);
        break;
    case SESSION_STATE_CLOSING:
        ATOMIC_INC(&(g_metrics->ctrl_pkt.closing));
        handle_session_closing(&meta, &ctrl_parser, thread_ctx);
        break;
    case SESSION_STATE_ACTIVE:
        ATOMIC_INC(&(g_metrics->ctrl_pkt.active));
        handle_session_active(&meta, &ctrl_parser, thread_ctx);
        break;
    case SESSION_STATE_RESETALL:
        ATOMIC_INC(&(g_metrics->ctrl_pkt.resetall));
        handle_session_resetall(&meta, &ctrl_parser, thread_ctx);
        break;
    default:
        goto error_ctrl_pkt;
    }
    return;

error_ctrl_pkt:
    ATOMIC_INC(&(g_metrics->ctrl_pkt.error));
    return;
}
// Process a raw (data) packet: read its metadata, find its session, remember the
// metadata in the session and run the matching service function chain from the
// first entry. Every failure ends in the error-bypass path, which injects the
// packet back.
static void handle_raw_packet(marsio_buff_t *rx_buff, struct thread_ctx *thread_ctx)
{
    struct session_table *session_table = thread_ctx->session_table;
    struct global_metrics *g_metrics = thread_ctx->ref_metrics;
    struct metadata meta;
    struct session_ctx *session_ctx = NULL;
    struct selected_chaining *chaining = NULL;

    if (mbuff_get_metadata(rx_buff, &meta) == -1)
    {
        LOG_ERROR("%s: unexpected raw packet, unable to get metadata, bypass !!!", LOG_TAG_PKTIO);
        goto error_bypass;
    }

    // bypass_traffic:0 disable
    // bypass_traffic:1 bypass all traffic
    // bypass_traffic:2 bypass raw traffic
    // bypass_traffic:3 bypass decrypted traffic
    // NOTE(review): mode 1 is not checked in this function — presumably handled by the caller.
    if (unlikely(thread_ctx->ref_io->config.bypass_traffic == 2 && meta.is_decrypted == 0))
    {
        LOG_DEBUG("%s: session %lu bypass, enable raw traffic bypass !!!", LOG_TAG_PKTIO, meta.session_id);
        goto error_bypass;
    }

    if (unlikely(thread_ctx->ref_io->config.bypass_traffic == 3 && meta.is_decrypted == 1))
    {
        LOG_DEBUG("%s: session %lu bypass, enable decrypted traffic bypass !!!", LOG_TAG_PKTIO, meta.session_id);
        goto error_bypass;
    }

    session_ctx = raw_packet_search_session(session_table, meta.raw_data, meta.raw_len, meta.session_id);
    if (session_ctx == NULL)
    {
        throughput_metrics_inc(&(g_metrics->raw_pkt.miss_sess), 1, meta.raw_len);
        goto error_bypass;
    }

    update_session_by_metadata(session_ctx, &meta);

    // decrypted and raw traffic each have their own chain
    if (meta.is_decrypted == 1)
    {
        chaining = session_ctx->chainings.chaining_decrypted;
    }
    else
    {
        chaining = session_ctx->chainings.chaining_raw;
    }

    if (chaining == NULL)
    {
        LOG_ERROR("%s: unexpected raw packet, session %lu %s misses policy, bypass !!!", LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr);
        goto error_bypass;
    }

    action_sf_chaining(thread_ctx, session_ctx, chaining, rx_buff, &meta, 0);
    return;

error_bypass:
    action_err_bypass(rx_buff, &meta, NULL, thread_ctx);
}
// Handle one packet received from the endpoint device (traffic coming back
// from a service function, encapsulated with a g_vxlan header).
//
// On success the outer encapsulation is stripped from rx_buff and the inner
// packet is passed to action_sf_chaining() at chaining index sf_index + 1.
// On any failure (undecodable encapsulation, unknown session, invalid
// chaining index, or a mirroring SF that must not reflow) the packet is
// counted as an endpoint drop and passed to action_err_block().
static void handle_inject_packet(marsio_buff_t *rx_buff, struct thread_ctx *thread_ctx)
{
    struct session_table *session_table = thread_ctx->session_table;
    struct global_metrics *g_metrics = thread_ctx->ref_metrics;
    struct metadata meta;
    struct g_vxlan *g_vxlan_hdr = NULL;
    struct session_ctx *session_ctx = NULL;
    struct selected_chaining *chaining = NULL;
    int sf_index = 0;
    int raw_len = marsio_buff_datalen(rx_buff);
    char *raw_data = marsio_buff_mtod(rx_buff);

    // Zero the metadata before the first jump to error_block: that label
    // reads meta, so it must never be reached with meta uninitialized.
    memset(&meta, 0, sizeof(struct metadata));

    if (g_vxlan_decode(&g_vxlan_hdr, raw_data, raw_len) == -1)
    {
        goto error_block;
    }

    // The inner packet starts right after the g_vxlan header; its length is
    // the received length minus the fixed outer ether/ip/udp/g_vxlan headers.
    meta.raw_data = (char *)g_vxlan_hdr + sizeof(struct g_vxlan);
    meta.raw_len = raw_len - sizeof(struct ethhdr) - sizeof(struct ip) - sizeof(struct udp_hdr) - sizeof(struct g_vxlan);
    meta.l7offset = 0;
    meta.is_e2i_dir = g_vxlan_get_packet_dir(g_vxlan_hdr);
    meta.is_ctrl_pkt = 0;
    meta.is_decrypted = g_vxlan_get_traffic_type(g_vxlan_hdr);
    sf_index = g_vxlan_get_sf_index(g_vxlan_hdr);

    session_ctx = inject_packet_search_session(session_table, meta.raw_data, meta.raw_len);
    if (session_ctx == NULL)
    {
        goto error_block;
    }

    update_metadata_by_session(session_ctx, &meta);

    if (meta.is_decrypted == 1)
    {
        chaining = session_ctx->chainings.chaining_decrypted;
    }
    else
    {
        chaining = session_ctx->chainings.chaining_raw;
    }

    if (chaining == NULL || sf_index < 0 || sf_index >= chaining->chaining_used)
    {
        LOG_ERROR("%s: unexpected inject packet, session %lu %s misses chaining index, drop !!!",
                  LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr);
        goto error_block;
    }

    if (chaining->chaining[sf_index].sff_forward_type == FORWARD_TYPE_MIRRORING)
    {
        LOG_DEBUG("%s: unexpected inject packet, session %lu %s with sf_profile_id %d executes mirror and does not require reflow, drop !!!",
                  LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr, chaining->chaining[sf_index].sf_profile_id);
        throughput_metrics_inc(&(g_metrics->raw_pkt.mirr_rx_drop), 1, meta.raw_len);
        goto error_block;
    }
    else
    {
        struct selected_sf *sf = &(chaining->chaining[sf_index]);
        throughput_metrics_inc(&sf->rx, 1, raw_len);
        throughput_metrics_inc(&(g_metrics->raw_pkt.stee_rx), 1, meta.raw_len);
        sf_metrics_inc(thread_ctx->sf_metrics, sf->rule_vsys_id, sf->rule_id, sf->sff_profile_id, sf->sf_profile_id, 1, raw_len, 0, 0);
    }

    // Strip the outer encapsulation so rx_buff starts at the inner packet.
    marsio_buff_adj(rx_buff, raw_len - meta.raw_len);
    action_sf_chaining(thread_ctx, session_ctx, chaining, rx_buff, &meta, sf_index + 1);
    return;

error_block:
    throughput_metrics_inc(&(g_metrics->device.endpoint_drop), 1, raw_len);
    // meta.raw_data is only set once the encapsulation was decoded; without a
    // decoded header there is no known outer length to strip.
    // NOTE(review): assumes action_err_block() does not require the buffer to
    // be adjusted when decoding failed - confirm against its implementation.
    if (meta.raw_data != NULL)
    {
        marsio_buff_adj(rx_buff, raw_len - meta.raw_len);
    }
    action_err_block(rx_buff, &meta, NULL, thread_ctx);
}
/******************************************************************************
* packet io
******************************************************************************/
// Load the [PACKET_IO] section of `profile` into `config` and validate it.
// String fields that are absent from the profile are left as the caller
// provided them (the string loaders used here take no default value).
// return 0 : success
// return -1 : error
static int packet_io_config(const char *profile, struct config *config)
{
    // bypass_traffic:0 disable
    // bypass_traffic:1 bypass all traffic
    // bypass_traffic:2 bypass raw traffic
    // bypass_traffic:3 bypass decrypted traffic
    MESA_load_profile_int_def(profile, "PACKET_IO", "bypass_traffic", &(config->bypass_traffic), 0);
    MESA_load_profile_int_def(profile, "PACKET_IO", "rx_burst_max", &(config->rx_burst_max), 1);
    MESA_load_profile_string_nodef(profile, "PACKET_IO", "app_symbol", config->app_symbol, sizeof(config->app_symbol));
    MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_endpoint", config->dev_endpoint, sizeof(config->dev_endpoint));
    MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_nf_interface", config->dev_nf_interface, sizeof(config->dev_nf_interface));
    MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_endpoint_src_ip", config->dev_endpoint_src_ip, sizeof(config->dev_endpoint_src_ip));
    MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_endpoint_src_mac", config->dev_endpoint_src_mac, sizeof(config->dev_endpoint_src_mac));

    // rx_burst_max is used as the receive burst size for an on-stack array of
    // RX_BURST_MAX buffers, so it must stay within [1, RX_BURST_MAX].
    if (config->rx_burst_max <= 0)
    {
        LOG_ERROR("%s: invalid rx_burst_max %d, must be at least 1", LOG_TAG_PKTIO, config->rx_burst_max);
        return -1;
    }

    if (config->rx_burst_max > RX_BURST_MAX)
    {
        LOG_ERROR("%s: invalid rx_burst_max, exceeds limit %d", LOG_TAG_PKTIO, RX_BURST_MAX);
        return -1;
    }

    if (strlen(config->app_symbol) == 0)
    {
        LOG_ERROR("%s: invalid app_symbol in %s", LOG_TAG_PKTIO, profile);
        return -1;
    }

    if (strlen(config->dev_endpoint) == 0)
    {
        LOG_ERROR("%s: invalid dev_endpoint in %s", LOG_TAG_PKTIO, profile);
        return -1;
    }

    if (strlen(config->dev_nf_interface) == 0)
    {
        LOG_ERROR("%s: invalid dev_nf_interface in %s", LOG_TAG_PKTIO, profile);
        return -1;
    }

    LOG_DEBUG("%s: PACKET_IO->bypass_traffic : %d", LOG_TAG_PKTIO, config->bypass_traffic);
    LOG_DEBUG("%s: PACKET_IO->rx_burst_max : %d", LOG_TAG_PKTIO, config->rx_burst_max);
    LOG_DEBUG("%s: PACKET_IO->app_symbol : %s", LOG_TAG_PKTIO, config->app_symbol);
    LOG_DEBUG("%s: PACKET_IO->dev_endpoint : %s", LOG_TAG_PKTIO, config->dev_endpoint);
    LOG_DEBUG("%s: PACKET_IO->dev_nf_interface : %s", LOG_TAG_PKTIO, config->dev_nf_interface);
    LOG_DEBUG("%s: PACKET_IO->dev_endpoint_src_ip : %s", LOG_TAG_PKTIO, config->dev_endpoint_src_ip);

    // The source MAC is optional here; it is only logged when the profile set it.
    if (strlen(config->dev_endpoint_src_mac))
    {
        LOG_DEBUG("%s: PACKET_IO->dev_endpoint_src_mac : %s (get from configuration file)", LOG_TAG_PKTIO, config->dev_endpoint_src_mac);
    }

    return 0;
}
// Create a packet_io handle: load the PACKET_IO configuration from `profile`,
// create and initialise the marsio instance, then open the NF-interface and
// endpoint devices together with a sendpath for each.
//
// profile    : configuration file handed to packet_io_config()
// thread_num : stored in the handle and passed (twice) to marsio_open_device()
//              for both devices
// coremask   : CPU set applied through MARSIO_OPT_THREAD_MASK_IN_CPUSET
//
// return handle : success, release it with packet_io_destory()
// return NULL   : error, everything acquired so far has been released
struct packet_io *packet_io_create(const char *profile, int thread_num, cpu_set_t *coremask)
{
    int opt = 1;
    struct packet_io *handle = (struct packet_io *)calloc(1, sizeof(struct packet_io));
    // Checked explicitly rather than with assert(), which is compiled out
    // when NDEBUG is defined.
    if (handle == NULL)
    {
        LOG_ERROR("%s: unable to allocate packet io handle", LOG_TAG_PKTIO);
        return NULL;
    }

    handle->thread_num = thread_num;
    if (packet_io_config(profile, &(handle->config)) != 0)
    {
        goto error_out;
    }

    handle->instance = marsio_create();
    if (handle->instance == NULL)
    {
        LOG_ERROR("%s: unable to create marsio instance", LOG_TAG_PKTIO);
        goto error_out;
    }

    if (marsio_option_set(handle->instance, MARSIO_OPT_THREAD_MASK_IN_CPUSET, coremask, sizeof(cpu_set_t)) != 0)
    {
        LOG_ERROR("%s: unable to set MARSIO_OPT_THREAD_MASK_IN_CPUSET option for marsio instance", LOG_TAG_PKTIO);
        goto error_out;
    }

    if (marsio_option_set(handle->instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0)
    {
        LOG_ERROR("%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_TAG_PKTIO);
        goto error_out;
    }

    if (marsio_init(handle->instance, handle->config.app_symbol) != 0)
    {
        LOG_ERROR("%s: unable to initialize marsio instance", LOG_TAG_PKTIO);
        goto error_out;
    }

    handle->dev_nf_interface.mr_dev = marsio_open_device(handle->instance, handle->config.dev_nf_interface, handle->thread_num, handle->thread_num);
    if (handle->dev_nf_interface.mr_dev == NULL)
    {
        LOG_ERROR("%s: unable to open device %s", LOG_TAG_PKTIO, handle->config.dev_nf_interface);
        goto error_out;
    }

    handle->dev_nf_interface.mr_path = marsio_sendpath_create_by_vdev(handle->dev_nf_interface.mr_dev);
    if (handle->dev_nf_interface.mr_path == NULL)
    {
        LOG_ERROR("%s: unable to create sendpath for device %s", LOG_TAG_PKTIO, handle->config.dev_nf_interface);
        goto error_out;
    }

    handle->dev_endpoint.mr_dev = marsio_open_device(handle->instance, handle->config.dev_endpoint, handle->thread_num, handle->thread_num);
    if (handle->dev_endpoint.mr_dev == NULL)
    {
        LOG_ERROR("%s: unable to open device %s", LOG_TAG_PKTIO, handle->config.dev_endpoint);
        goto error_out;
    }

    handle->dev_endpoint.mr_path = marsio_sendpath_create_by_vdev(handle->dev_endpoint.mr_dev);
    if (handle->dev_endpoint.mr_path == NULL)
    {
        LOG_ERROR("%s: unable to create sendpath for device %s", LOG_TAG_PKTIO, handle->config.dev_endpoint);
        goto error_out;
    }

    // When the profile did not provide a source MAC, take it from the
    // endpoint device itself.
    if (strlen(handle->config.dev_endpoint_src_mac) == 0)
    {
        marsio_get_device_ether_addr(handle->dev_endpoint.mr_dev, handle->config.dev_endpoint_src_mac, sizeof(handle->config.dev_endpoint_src_mac));
        LOG_DEBUG("%s: PACKET_IO->dev_endpoint_src_mac : %s (get from marsio api)", LOG_TAG_PKTIO, handle->config.dev_endpoint_src_mac);
    }

    return handle;

error_out:
    // packet_io_destory() skips members that are still NULL, so it is safe to
    // call on a partially constructed handle.
    packet_io_destory(handle);
    return NULL;
}
void packet_io_destory(struct packet_io *handle)
{
if (handle)
{
if (handle->dev_nf_interface.mr_path)
{
marsio_sendpath_destory(handle->dev_nf_interface.mr_path);
handle->dev_nf_interface.mr_path = NULL;
}
if (handle->dev_nf_interface.mr_dev)
{
marsio_close_device(handle->dev_nf_interface.mr_dev);
handle->dev_nf_interface.mr_dev = NULL;
}
if (handle->dev_endpoint.mr_path)
{
marsio_sendpath_destory(handle->dev_endpoint.mr_path);
handle->dev_endpoint.mr_path = NULL;
}
if (handle->dev_endpoint.mr_dev)
{
marsio_close_device(handle->dev_endpoint.mr_dev);
handle->dev_endpoint.mr_dev = NULL;
}
if (handle->instance)
{
marsio_destory(handle->instance);
handle->instance = NULL;
}
free(handle);
handle = NULL;
}
}
// Per-thread initialisation of the marsio instance held by `handle`.
// return 0 : success
// return -1 : error (logged together with the calling thread's index)
int packet_io_thread_init(struct packet_io *handle, struct thread_ctx *thread_ctx)
{
    const int rc = marsio_thread_init(handle->instance);
    if (rc == 0)
    {
        return 0;
    }

    LOG_ERROR("%s: unable to init marsio thread %d", LOG_TAG_PKTIO, thread_ctx->thread_index);
    return -1;
}
// Call marsio_poll_wait() on both devices (NF interface and endpoint) for the
// calling thread's index, passing `timeout_ms` through unchanged.
void packet_io_thread_wait(struct packet_io *handle, struct thread_ctx *thread_ctx, int timeout_ms)
{
    struct mr_vdev *watched[2];

    watched[0] = handle->dev_nf_interface.mr_dev;
    watched[1] = handle->dev_endpoint.mr_dev;

    marsio_poll_wait(handle->instance, watched, 2, thread_ctx->thread_index, timeout_ms);
}
int packet_io_thread_polling_nf(struct packet_io *handle, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
int thread_index = thread_ctx->thread_index;
marsio_buff_t *rx_buffs[RX_BURST_MAX];
int nr_recv = marsio_recv_burst(handle->dev_nf_interface.mr_dev, thread_index, rx_buffs, handle->config.rx_burst_max);
if (nr_recv <= 0)
{
return 0;
}
if (handle->config.bypass_traffic == 1)
{
for (int j = 0; j < nr_recv; j++)
{
int raw_len = marsio_buff_datalen(rx_buffs[j]);
throughput_metrics_inc(&(g_metrics->device.nf_rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, raw_len);
}
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_index, rx_buffs, nr_recv);
return nr_recv;
}
for (int j = 0; j < nr_recv; j++)
{
marsio_buff_t *rx_buff = rx_buffs[j];
int raw_len = marsio_buff_datalen(rx_buff);
if (is_downlink_keepalive_packet(rx_buff))
{
throughput_metrics_inc(&(g_metrics->device.nf_rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->kee_pkt.downlink_rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->kee_pkt.downlink_tx), 1, raw_len);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_index, &rx_buff, 1);
}
else if (marsio_buff_is_ctrlbuf(rx_buff))
{
throughput_metrics_inc(&(g_metrics->device.nf_rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->ctrl_pkt.rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->ctrl_pkt.tx), 1, raw_len);
handle_control_packet(rx_buff, thread_ctx);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_index, &rx_buff, 1);
}
else
{
throughput_metrics_inc(&(g_metrics->device.nf_rx), 1, raw_len);
handle_raw_packet(rx_buff, thread_ctx);
}
}
return nr_recv;
}
int packet_io_thread_polling_endpoint(struct packet_io *handle, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
int thread_index = thread_ctx->thread_index;
marsio_buff_t *rx_buffs[RX_BURST_MAX];
int nr_recv = marsio_recv_burst(handle->dev_endpoint.mr_dev, thread_index, rx_buffs, handle->config.rx_burst_max);
if (nr_recv <= 0)
{
return 0;
}
if (handle->config.bypass_traffic == 1)
{
for (int j = 0; j < nr_recv; j++)
{
int raw_len = marsio_buff_datalen(rx_buffs[j]);
throughput_metrics_inc(&(g_metrics->device.endpoint_rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->device.endpoint_tx), 1, raw_len);
}
marsio_send_burst(handle->dev_endpoint.mr_path, thread_index, rx_buffs, nr_recv);
return nr_recv;
}
for (int j = 0; j < nr_recv; j++)
{
marsio_buff_t *rx_buff = rx_buffs[j];
int raw_len = marsio_buff_datalen(rx_buff);
if (is_uplink_keepalive_packet(rx_buff))
{
throughput_metrics_inc(&(g_metrics->device.endpoint_rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->kee_pkt.uplink_rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->kee_pkt.uplink_tx_drop), 1, raw_len);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_index);
}
else
{
throughput_metrics_inc(&(g_metrics->device.endpoint_rx), 1, raw_len);
handle_inject_packet(rx_buff, thread_ctx);
}
}
return nr_recv;
}
// Return the marsio instance held by `handle`, or NULL when `handle` is NULL.
struct mr_instance *packet_io_get_mr_instance(struct packet_io *handle)
{
    if (handle == NULL)
    {
        return NULL;
    }

    return handle->instance;
}