This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tsg-service-chaining-…/platform/src/packet_io.cpp

1465 lines
56 KiB
C++
Raw Normal View History

#include <assert.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
#include <netinet/ether.h>
#include <marsio.h>
#include <cjson/cJSON.h>
#include <MESA/MESA_prof_load.h>
#include "log.h"
#include "sce.h"
#include "utils.h"
#include "g_vxlan.h"
#include "sf_metrics.h"
#include "ctrl_packet.h"
#include "global_metrics.h"
/*
* add: vxlan_hdr
* del: marsio_buff_ctrlzone_reset()
* +----+ NF2SF +----+
* | |--------------------------->| |
* | | | |
* | |-------+ | |-------+
* | NF | | NF2NF (undo) | SF | | SF2SF (del old vxlan_hdr; add new vxlan_hdr)
* | |<------+ | |<------+
* | | | |
* | |<---------------------------| |
* | | SF2NF | |
* +---+ del: vxlan_hdr +----+
* add: session_id + route_ctx + sid
*/
/******************************************************************************
* Struct
******************************************************************************/
#define RX_BURST_MAX 128
struct config
{
int bypass_all_traffic;
int rx_burst_max;
char app_symbol[256];
char dev_endpoint[256];
char dev_nf_interface[256];
char dev_endpoint_src_ip[16];
char dev_endpoint_src_mac[32];
};
struct device
{
struct mr_vdev *mr_dev;
struct mr_sendpath *mr_path;
};
struct packet_io
{
int thread_num;
struct mr_instance *instance;
struct device dev_nf_interface;
struct device dev_endpoint;
struct config config;
};
enum raw_pkt_action
{
RAW_PKT_ERR_BYPASS,
RAW_PKT_HIT_BYPASS,
RAW_PKT_HIT_BLOCK,
2023-03-10 15:12:04 +08:00
RAW_PKT_HIT_STEERING,
RAW_PKT_HIT_MIRRORING,
};
enum inject_pkt_action
{
INJT_PKT_ERR_DROP,
INJT_PKT_MIRR_RX_DROP,
INJT_PKT_HIT_BLOCK,
INJT_PKT_HIT_FWD2SF, // forward to service function
INJT_PKT_HIT_FWD2NF, // forward to network function
};
struct metadata
{
uint64_t session_id;
char *raw_data;
int raw_len;
int dir_is_e2i;
int is_ctrl_pkt;
uint16_t l7_offset; // only control packet set l7_offset
int traffic_is_decrypted; // only raw packet set traffic_is_decrypted
struct sids sids;
struct route_ctx route_ctx;
};
/******************************************************************************
* API Declaration
******************************************************************************/
struct packet_io *packet_io_create(const char *profile, int thread_num);
void packet_io_destory(struct packet_io *handle);
int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, void *ctx);
int packet_io_polling_endpoint(struct packet_io *handle, int thread_seq, void *ctx);
// return 0 : success
// return -1 : error
static int packet_io_config(const char *profile, struct config *config);
// return 0 : success
// return -1 : error
static int packet_io_get_metadata(marsio_buff_t *tx_buff, struct metadata *meta);
// return 0 : success
// return -1 : error
static int packet_io_set_metadata(marsio_buff_t *tx_buff, struct metadata *meta);
static void packet_io_dump_metadata(marsio_buff_t *tx_buff, struct metadata *meta);
// return 0 : success
// return -1 : error
static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx);
// return : RAW_PKT_ERR_BYPASS
// return : RAW_PKT_HIT_BYPASS
// return : RAW_PKT_HIT_BLOCK
2023-03-10 15:12:04 +08:00
// reutrn : RAW_PKT_HIT_STEERING
// return : RAW_PKT_HIT_MIRRORING
static enum raw_pkt_action handle_raw_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx, int *action_bytes);
// return : INJT_PKT_ERR_DROP
// return : INJT_PKT_MIRR_RX_DROP
// return : INJT_PKT_HIT_BLOCK
// return : INJT_PKT_HIT_FWD2SF
// return : INJT_PKT_HIT_FWD2NF
static enum inject_pkt_action handle_inject_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx, int *action_bytes);
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
static int forward_packet_to_sf(struct packet_io *handle, marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, int thread_seq, void *ctx);
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
static int mirror_packet_to_sf(struct packet_io *handle, marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, int thread_seq, void *ctx);
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
static int forward_packet_to_nf(struct packet_io *handle, marsio_buff_t *rx_buff, struct metadata *meta, int thread_seq, void *ct);
// return 0 : success
// return -1 : error
static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx);
// return 0 : success
// return -1 : error
static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx);
// return 0 : success
// return -1 : error
static int handle_session_active(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx);
// return 0 : success
// return -1 : error
static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx);
static void session_value_free_cb(void *ctx);
// return 0 : not keepalive packet
// return 1 : is keepalive packet
static int is_downstream_keepalive_packet(marsio_buff_t *rx_buff);
// return 0 : not keepalive packet
// return 1 : is keepalive packet
static int is_upstream_keepalive_packet(marsio_buff_t *rx_buff);
/******************************************************************************
* API Definition
******************************************************************************/
struct packet_io *packet_io_create(const char *profile, int thread_num)
{
int opt = 1;
struct packet_io *handle = (struct packet_io *)calloc(1, sizeof(struct packet_io));
assert(handle != NULL);
handle->thread_num = thread_num;
if (packet_io_config(profile, &(handle->config)) != 0)
{
goto error_out;
}
handle->instance = marsio_create();
if (handle->instance == NULL)
{
LOG_ERROR("%s: unable to create marsio instance", LOG_TAG_PKTIO);
goto error_out;
}
if (marsio_option_set(handle->instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0)
{
LOG_ERROR("%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_TAG_PKTIO);
goto error_out;
}
if (marsio_init(handle->instance, handle->config.app_symbol) != 0)
{
LOG_ERROR("%s: unable to initialize marsio instance", LOG_TAG_PKTIO);
goto error_out;
}
// Netwrok Function Interface
handle->dev_nf_interface.mr_dev = marsio_open_device(handle->instance, handle->config.dev_nf_interface, handle->thread_num, handle->thread_num);
if (handle->dev_nf_interface.mr_dev == NULL)
{
LOG_ERROR("%s: unable to open device %s", LOG_TAG_PKTIO, handle->config.dev_nf_interface);
goto error_out;
}
handle->dev_nf_interface.mr_path = marsio_sendpath_create_by_vdev(handle->dev_nf_interface.mr_dev);
if (handle->dev_nf_interface.mr_path == NULL)
{
LOG_ERROR("%s: unable to create sendpath for device %s", LOG_TAG_PKTIO, handle->config.dev_nf_interface);
goto error_out;
}
// EndPoint Interface
handle->dev_endpoint.mr_dev = marsio_open_device(handle->instance, handle->config.dev_endpoint, handle->thread_num, handle->thread_num);
if (handle->dev_endpoint.mr_dev == NULL)
{
LOG_ERROR("%s: unable to open device %s", LOG_TAG_PKTIO, handle->config.dev_endpoint);
goto error_out;
}
handle->dev_endpoint.mr_path = marsio_sendpath_create_by_vdev(handle->dev_endpoint.mr_dev);
if (handle->dev_endpoint.mr_path == NULL)
{
LOG_ERROR("%s: unable to create sendpath for device %s", LOG_TAG_PKTIO, handle->config.dev_endpoint);
goto error_out;
}
if (strlen(handle->config.dev_endpoint_src_mac) == 0)
{
marsio_get_device_ether_addr(handle->dev_endpoint.mr_dev, handle->config.dev_endpoint_src_mac, sizeof(handle->config.dev_endpoint_src_mac));
LOG_DEBUG("%s: PACKET_IO->dev_endpoint_src_mac : %s (get from marsio api)", LOG_TAG_PKTIO, handle->config.dev_endpoint_src_mac);
}
return handle;
error_out:
packet_io_destory(handle);
return NULL;
}
void packet_io_destory(struct packet_io *handle)
{
if (handle)
{
if (handle->dev_nf_interface.mr_path)
{
marsio_sendpath_destory(handle->dev_nf_interface.mr_path);
handle->dev_nf_interface.mr_path = NULL;
}
if (handle->dev_nf_interface.mr_dev)
{
marsio_close_device(handle->dev_nf_interface.mr_dev);
handle->dev_nf_interface.mr_dev = NULL;
}
if (handle->dev_endpoint.mr_path)
{
marsio_sendpath_destory(handle->dev_endpoint.mr_path);
handle->dev_endpoint.mr_path = NULL;
}
if (handle->dev_endpoint.mr_dev)
{
marsio_close_device(handle->dev_endpoint.mr_dev);
handle->dev_endpoint.mr_dev = NULL;
}
if (handle->instance)
{
marsio_destory(handle->instance);
handle->instance = NULL;
}
free(handle);
handle = NULL;
}
}
// return n_packet_recv
int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, void *ctx)
{
struct thread_ctx *thread = (struct thread_ctx *)ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
marsio_buff_t *rx_buffs[RX_BURST_MAX];
// nr_recv <= rx_burst_max <= RX_BURST_MAX
int nr_recv = marsio_recv_burst(handle->dev_nf_interface.mr_dev, thread_seq, rx_buffs, handle->config.rx_burst_max);
if (nr_recv <= 0)
{
return 0;
}
if (handle->config.bypass_all_traffic == 1)
{
for (int j = 0; j < nr_recv; j++)
{
if (!marsio_buff_is_ctrlbuf(rx_buffs[j]))
{
int raw_len = marsio_buff_datalen(rx_buffs[j]);
2023-03-02 11:56:44 +08:00
throughput_metrics_inc(&g_metrics->raw_pkt_rx, 1, raw_len);
throughput_metrics_inc(&g_metrics->raw_pkt_tx, 1, raw_len);
}
}
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, rx_buffs, nr_recv);
return nr_recv;
}
for (int j = 0; j < nr_recv; j++)
{
marsio_buff_t *rx_buff = rx_buffs[j];
int raw_len = marsio_buff_datalen(rx_buff);
if (is_downstream_keepalive_packet(rx_buff))
{
2023-03-02 11:56:44 +08:00
throughput_metrics_inc(&g_metrics->downlink_keepalive_pkt_rx, 1, raw_len);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
continue;
}
if (marsio_buff_is_ctrlbuf(rx_buff))
{
handle_control_packet(handle, rx_buff, thread_seq, ctx);
2023-03-02 11:56:44 +08:00
throughput_metrics_inc(&g_metrics->ctrl_pkt_rx, 1, raw_len);
// all control packet need bypass
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
}
else
{
2023-03-02 11:56:44 +08:00
throughput_metrics_inc(&g_metrics->raw_pkt_rx, 1, raw_len);
int action_bytes = 0;
enum raw_pkt_action action = handle_raw_packet(handle, rx_buff, thread_seq, ctx, &action_bytes);
assert(action_bytes > 0);
switch (action)
{
case RAW_PKT_ERR_BYPASS:
2023-03-02 11:56:44 +08:00
throughput_metrics_inc(&g_metrics->raw_pkt_err_bypass, 1, action_bytes);
throughput_metrics_inc(&g_metrics->raw_pkt_tx, 1, raw_len);
break;
case RAW_PKT_HIT_BYPASS:
throughput_metrics_inc(&g_metrics->hit_bypass_policy, 1, action_bytes);
2023-03-02 11:56:44 +08:00
throughput_metrics_inc(&g_metrics->raw_pkt_tx, 1, raw_len);
break;
case RAW_PKT_HIT_BLOCK:
throughput_metrics_inc(&g_metrics->hit_block_policy, 1, action_bytes);
break;
2023-03-10 15:12:04 +08:00
case RAW_PKT_HIT_STEERING:
throughput_metrics_inc(&g_metrics->steering_tx, 1, action_bytes);
throughput_metrics_inc(&g_metrics->dev_endpoint_tx, 1, action_bytes);
break;
2023-03-10 15:12:04 +08:00
case RAW_PKT_HIT_MIRRORING:
throughput_metrics_inc(&g_metrics->raw_pkt_tx, 1, action_bytes);
break;
}
}
}
return nr_recv;
}
// return n_packet_recv
int packet_io_polling_endpoint(struct packet_io *handle, int thread_seq, void *ctx)
{
struct thread_ctx *thread = (struct thread_ctx *)ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
marsio_buff_t *rx_buffs[RX_BURST_MAX];
// nr_recv <= rx_burst_max <= RX_BURST_MAX
int nr_recv = marsio_recv_burst(handle->dev_endpoint.mr_dev, thread_seq, rx_buffs, handle->config.rx_burst_max);
if (nr_recv <= 0)
{
return 0;
}
if (handle->config.bypass_all_traffic == 1)
{
for (int j = 0; j < nr_recv; j++)
{
int raw_len = marsio_buff_datalen(rx_buffs[j]);
throughput_metrics_inc(&g_metrics->dev_endpoint_rx, 1, raw_len);
throughput_metrics_inc(&g_metrics->dev_endpoint_tx, 1, raw_len);
}
marsio_send_burst(handle->dev_endpoint.mr_path, thread_seq, rx_buffs, nr_recv);
return nr_recv;
}
for (int j = 0; j < nr_recv; j++)
{
marsio_buff_t *rx_buff = rx_buffs[j];
int data_len = marsio_buff_datalen(rx_buff);
if (is_upstream_keepalive_packet(rx_buff))
{
2023-03-02 11:56:44 +08:00
throughput_metrics_inc(&g_metrics->uplink_keepalive_pkt_rx, 1, data_len);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
continue;
}
throughput_metrics_inc(&g_metrics->dev_endpoint_rx, 1, data_len);
int action_bytes = 0;
enum inject_pkt_action action = handle_inject_packet(handle, rx_buff, thread_seq, ctx, &action_bytes);
assert(action_bytes > 0);
switch (action)
{
case INJT_PKT_ERR_DROP:
throughput_metrics_inc(&g_metrics->dev_endpoint_err_drop, 1, action_bytes);
break;
case INJT_PKT_MIRR_RX_DROP:
throughput_metrics_inc(&g_metrics->mirroring_rx_drop, 1, data_len); // use data_len
break;
case INJT_PKT_HIT_BLOCK:
throughput_metrics_inc(&g_metrics->steering_rx, 1, data_len); // use data_len
throughput_metrics_inc(&g_metrics->hit_block_policy, 1, action_bytes);
break;
case INJT_PKT_HIT_FWD2SF: // forward to next service function
throughput_metrics_inc(&g_metrics->steering_rx, 1, data_len); // use data_len
throughput_metrics_inc(&g_metrics->steering_tx, 1, action_bytes); // use action_bytes
throughput_metrics_inc(&g_metrics->dev_endpoint_tx, 1, action_bytes);
break;
case INJT_PKT_HIT_FWD2NF: // forward to network function
throughput_metrics_inc(&g_metrics->steering_rx, 1, data_len); // use data_len
2023-03-02 11:56:44 +08:00
throughput_metrics_inc(&g_metrics->raw_pkt_tx, 1, action_bytes);
break;
}
}
return nr_recv;
}
// return 0 : success
// return -1 : error
static int packet_io_config(const char *profile, struct config *config)
{
MESA_load_profile_int_def(profile, "PACKET_IO", "bypass_all_traffic", (int *)&(config->bypass_all_traffic), 0);
MESA_load_profile_int_def(profile, "PACKET_IO", "rx_burst_max", (int *)&(config->rx_burst_max), 1);
MESA_load_profile_string_nodef(profile, "PACKET_IO", "app_symbol", config->app_symbol, sizeof(config->app_symbol));
MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_endpoint", config->dev_endpoint, sizeof(config->dev_endpoint));
MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_nf_interface", config->dev_nf_interface, sizeof(config->dev_nf_interface));
MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_endpoint_src_ip", config->dev_endpoint_src_ip, sizeof(config->dev_endpoint_src_ip));
MESA_load_profile_string_nodef(profile, "PACKET_IO", "dev_endpoint_src_mac", config->dev_endpoint_src_mac, sizeof(config->dev_endpoint_src_mac));
if (config->rx_burst_max > RX_BURST_MAX)
{
LOG_ERROR("%s: invalid rx_burst_max, exceeds limit %d", LOG_TAG_PKTIO, RX_BURST_MAX);
return -1;
}
if (strlen(config->app_symbol) == 0)
{
LOG_ERROR("%s: invalid app_symbol in %s", LOG_TAG_PKTIO, profile);
return -1;
}
if (strlen(config->dev_endpoint) == 0)
{
LOG_ERROR("%s: invalid dev_endpoint in %s", LOG_TAG_PKTIO, profile);
return -1;
}
if (strlen(config->dev_nf_interface) == 0)
{
LOG_ERROR("%s: invalid dev_nf_interface in %s", LOG_TAG_PKTIO, profile);
return -1;
}
LOG_DEBUG("%s: PACKET_IO->bypass_all_traffic : %d", LOG_TAG_PKTIO, config->bypass_all_traffic);
LOG_DEBUG("%s: PACKET_IO->rx_burst_max : %d", LOG_TAG_PKTIO, config->rx_burst_max);
LOG_DEBUG("%s: PACKET_IO->app_symbol : %s", LOG_TAG_PKTIO, config->app_symbol);
LOG_DEBUG("%s: PACKET_IO->dev_endpoint : %s", LOG_TAG_PKTIO, config->dev_endpoint);
LOG_DEBUG("%s: PACKET_IO->dev_nf_interface : %s", LOG_TAG_PKTIO, config->dev_nf_interface);
LOG_DEBUG("%s: PACKET_IO->dev_endpoint_src_ip : %s", LOG_TAG_PKTIO, config->dev_endpoint_src_ip);
if (strlen(config->dev_endpoint_src_mac))
{
LOG_DEBUG("%s: PACKET_IO->dev_endpoint_src_mac : %s (get from configuration file)", LOG_TAG_PKTIO, config->dev_endpoint_src_mac);
}
return 0;
}
// return 0 : success
// return -1 : error
static int packet_io_get_metadata(marsio_buff_t *rx_buff, struct metadata *meta)
{
memset(meta, 0, sizeof(struct metadata));
if (marsio_buff_get_metadata(rx_buff, MR_BUFF_SESSION_ID, &(meta->session_id), sizeof(meta->session_id)) <= 0)
{
LOG_ERROR("%s: unable to get session_id from metadata", LOG_TAG_PKTIO);
return -1;
}
meta->raw_len = marsio_buff_datalen(rx_buff);
meta->raw_data = marsio_buff_mtod(rx_buff);
if (meta->raw_data == NULL || meta->raw_len == 0)
{
LOG_ERROR("%s: unable to get raw_data from metadata", LOG_TAG_PKTIO);
return -1;
}
// 1: E2I
// 0: I2E
if (marsio_buff_get_metadata(rx_buff, MR_BUFF_DIR, &(meta->dir_is_e2i), sizeof(meta->dir_is_e2i)) <= 0)
{
LOG_ERROR("%s: unable to get buff_dir from metadata", LOG_TAG_PKTIO);
return -1;
}
if (marsio_buff_is_ctrlbuf(rx_buff))
{
meta->is_ctrl_pkt = 1;
// only control packet set MR_BUFF_PAYLOAD_OFFSET
if (marsio_buff_get_metadata(rx_buff, MR_BUFF_PAYLOAD_OFFSET, &(meta->l7_offset), sizeof(meta->l7_offset)) <= 0)
{
LOG_ERROR("%s: unable to get l7_offset from metadata", LOG_TAG_PKTIO);
return -1;
}
}
else
{
meta->is_ctrl_pkt = 0;
// only raw packet set MR_IS_DECRYPTED
// TODO
#if 0
if (marsio_buff_get_metadata(rx_buff, MR_IS_DECRYPTED, &(meta->traffic_is_decrypted), sizeof(meta->traffic_is_decrypted)) <= 0)
{
LOG_ERROR("%s: unable to get traffic_is_decrypted from metadata", LOG_TAG_PKTIO);
return -1;
}
#endif
}
meta->route_ctx.len = marsio_buff_get_metadata(rx_buff, MR_BUFF_ROUTE_CTX, meta->route_ctx.data, sizeof(meta->route_ctx.data));
if (meta->route_ctx.len <= 0)
{
LOG_ERROR("%s: unable to get route_ctx from metadata", LOG_TAG_PKTIO);
return -1;
}
meta->sids.num = marsio_buff_get_sid_list(rx_buff, meta->sids.elems, sizeof(meta->sids.elems) / sizeof(meta->sids.elems[0]));
if (meta->sids.num < 0)
{
LOG_ERROR("%s: unable to get sid_list from metadata", LOG_TAG_PKTIO);
return -1;
}
return 0;
}
// return 0 : success
// return -1 : error
static int packet_io_set_metadata(marsio_buff_t *tx_buff, struct metadata *meta)
{
if (meta->session_id)
{
if (marsio_buff_set_metadata(tx_buff, MR_BUFF_SESSION_ID, &(meta->session_id), sizeof(meta->session_id)) != 0)
{
LOG_ERROR("%s: unable to set session_id for metadata", LOG_TAG_PKTIO);
return -1;
}
}
// 1: E2I
// 0: I2E
#if 0
// use MR_BUFF_ROUTE_CTX instead
if (marsio_buff_set_metadata(tx_buff, MR_BUFF_DIR, &(meta->dir_is_e2i), sizeof(meta->dir_is_e2i)) != 0)
{
LOG_ERROR("%s: unable to set buff_dir for metadata", LOG_TAG_PKTIO);
return -1;
}
#endif
if (meta->is_ctrl_pkt)
{
if (marsio_buff_set_metadata(tx_buff, MR_BUFF_PAYLOAD_OFFSET, &(meta->l7_offset), sizeof(meta->l7_offset)) != 0)
{
LOG_ERROR("%s: unable to set l7_offset for metadata", LOG_TAG_PKTIO);
return -1;
}
}
else
{
// TODO
#if 0
if (marsio_buff_set_metadata(tx_buff, MR_IS_DECRYPTED, &(meta->traffic_is_decrypted), sizeof(meta->traffic_is_decrypted)) != 0)
{
LOG_ERROR("%s: unable to set traffic_is_decrypted for metadata", LOG_TAG_PKTIO);
return -1;
}
#endif
}
if (meta->route_ctx.len > 0)
{
if (marsio_buff_set_metadata(tx_buff, MR_BUFF_ROUTE_CTX, meta->route_ctx.data, meta->route_ctx.len) != 0)
{
LOG_ERROR("%s: unable to set route_ctx for metadata", LOG_TAG_PKTIO);
return -1;
}
}
if (meta->sids.num > 0)
{
if (marsio_buff_set_sid_list(tx_buff, meta->sids.elems, meta->sids.num) != 0)
{
LOG_ERROR("%s: unable to set sid_list for metadata", LOG_TAG_PKTIO);
return -1;
}
}
return 0;
}
static void packet_io_dump_metadata(marsio_buff_t *tx_buff, struct metadata *meta)
{
LOG_DEBUG("%s: META={session_id: %lu, raw_len: %d, dir_is_e2i: %d, is_ctrl_pkt: %d, l7_offset: %d, traffic_is_decrypted: %d, sids_num: %d}", LOG_TAG_PKTIO, meta->session_id, meta->raw_len, meta->dir_is_e2i, meta->is_ctrl_pkt, meta->l7_offset, meta->traffic_is_decrypted, meta->sids.num);
}
// return 0 : success
// return -1 : error
static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx)
{
struct thread_ctx *thread = (struct thread_ctx *)ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
struct sce_ctx *sce_ctx = thread->ref_sce_ctx;
struct metadata meta;
if (packet_io_get_metadata(rx_buff, &meta) == -1)
{
LOG_ERROR("%s: unexpected control packet, unable to get metadata", LOG_TAG_PKTIO);
packet_io_dump_metadata(rx_buff, &meta);
2023-03-02 11:56:44 +08:00
__atomic_fetch_add(&g_metrics->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED);
return -1;
}
struct ctrl_pkt_parser ctrl_parser;
ctrl_packet_parser_init(&ctrl_parser);
if (ctrl_packet_parser_parse(&ctrl_parser, meta.raw_data + meta.l7_offset, meta.raw_len - meta.l7_offset) == -1)
{
LOG_ERROR("%s: unexpected control packet, unable to parse data", LOG_TAG_PKTIO);
2023-03-02 11:56:44 +08:00
__atomic_fetch_add(&g_metrics->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED);
return -1;
}
if (ctrl_parser.session_id != meta.session_id)
{
LOG_ERROR("%s: unexpected control packet, metadata's session %lu != control packet's session %lu", LOG_TAG_PKTIO, meta.session_id, ctrl_parser.session_id);
2023-03-02 11:56:44 +08:00
__atomic_fetch_add(&g_metrics->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED);
return -1;
}
if (sce_ctx->enable_debug)
{
LOG_INFO("%s: recv control packet, session %lu %s", LOG_TAG_PKTIO, ctrl_parser.session_id, session_state_to_string(ctrl_parser.state));
}
switch (ctrl_parser.state)
{
case SESSION_STATE_OPENING:
2023-03-02 11:56:44 +08:00
__atomic_fetch_add(&g_metrics->ctrl_pkt_opening_num, 1, __ATOMIC_RELAXED);
// when session opening, firewall not send policy id
// return handle_session_opening(&meta, &ctrl_parser, thread_seq, ctx);
break;
case SESSION_STATE_CLOSING:
2023-03-02 11:56:44 +08:00
__atomic_fetch_add(&g_metrics->ctrl_pkt_closing_num, 1, __ATOMIC_RELAXED);
return handle_session_closing(&meta, &ctrl_parser, thread_seq, ctx);
case SESSION_STATE_ACTIVE:
2023-03-02 11:56:44 +08:00
__atomic_fetch_add(&g_metrics->ctrl_pkt_active_num, 1, __ATOMIC_RELAXED);
return handle_session_active(&meta, &ctrl_parser, thread_seq, ctx);
case SESSION_STATE_RESETALL:
2023-03-02 11:56:44 +08:00
__atomic_fetch_add(&g_metrics->ctrl_pkt_resetall_num, 1, __ATOMIC_RELAXED);
return handle_session_resetall(&meta, &ctrl_parser, thread_seq, ctx);
default:
2023-03-02 11:56:44 +08:00
__atomic_fetch_add(&g_metrics->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED);
}
return 0;
}
// return : RAW_PKT_ERR_BYPASS
// return : RAW_PKT_HIT_BYPASS
// return : RAW_PKT_HIT_BLOCK
2023-03-10 15:12:04 +08:00
// reutrn : RAW_PKT_HIT_STEERING
// return : RAW_PKT_HIT_MIRRORING
static enum raw_pkt_action handle_raw_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx, int *action_bytes)
{
int nsend = 0;
struct thread_ctx *thread = (struct thread_ctx *)ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
struct sce_ctx *sce_ctx = thread->ref_sce_ctx;
int raw_len = marsio_buff_datalen(rx_buff);
*action_bytes = 0;
struct metadata meta;
if (packet_io_get_metadata(rx_buff, &meta) == -1)
{
LOG_ERROR("%s: unexpected raw packet, unable to get metadata, bypass !!!", LOG_TAG_PKTIO);
packet_io_dump_metadata(rx_buff, &meta);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
*action_bytes = raw_len;
return RAW_PKT_ERR_BYPASS;
}
struct session_node *node = session_table_search_by_id(thread->session_table, meta.session_id);
if (node == NULL)
{
LOG_ERROR("%s: unexpected raw packet, unable to find session %lu from session table, bypass !!!", LOG_TAG_PKTIO, meta.session_id);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
*action_bytes = raw_len;
return RAW_PKT_ERR_BYPASS;
}
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
if (sce_ctx->enable_debug)
{
struct addr_tuple4 inner_addr;
struct addr_tuple4 reverse_addr;
struct raw_pkt_parser raw_parser;
memset(&inner_addr, 0, sizeof(struct addr_tuple4));
memset(&reverse_addr, 0, sizeof(struct addr_tuple4));
raw_packet_parser_init(&raw_parser, 0, LAYER_TYPE_ALL, 8);
raw_packet_parser_parse(&raw_parser, (const void *)meta.raw_data, meta.raw_len);
raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_addr);
addr_tuple4_reverse(&inner_addr, &reverse_addr);
if (memcmp(&s_ctx->first_ctrl_pkt.tuple4, &inner_addr, sizeof(struct addr_tuple4)) != 0 && memcmp(&s_ctx->first_ctrl_pkt.tuple4, &reverse_addr, sizeof(struct addr_tuple4)) != 0)
{
char *addr_str = addr_tuple4_to_str(&inner_addr);
LOG_ERROR("%s: unexpected raw packet, session %lu expected address tuple4 to be %s, but now the packet's tuple4 is %s, bypass !!!", LOG_TAG_PKTIO, meta.session_id, s_ctx->first_ctrl_pkt.addr_string, addr_str);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
*action_bytes = raw_len;
free(addr_str);
return RAW_PKT_ERR_BYPASS;
}
}
// update sids
if (meta.dir_is_e2i)
{
sids_write_once(&(s_ctx->raw_pkt_e2i_sids), &(meta.sids));
if (route_ctx_is_empty(&s_ctx->raw_pkt_e2i_route_ctx))
{
route_ctx_copy(&s_ctx->raw_pkt_e2i_route_ctx, &meta.route_ctx);
}
}
else
{
sids_write_once(&(s_ctx->raw_pkt_i2e_sids), &(meta.sids));
if (route_ctx_is_empty(&s_ctx->raw_pkt_i2e_route_ctx))
{
route_ctx_copy(&s_ctx->raw_pkt_i2e_route_ctx, &meta.route_ctx);
}
}
// search chaining
struct selected_chaining *chaining = s_ctx->chaining;
if (chaining == NULL)
{
LOG_ERROR("%s: unexpected raw packet, session %lu %s misses policy, bypass !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
*action_bytes = raw_len;
return RAW_PKT_ERR_BYPASS;
}
2023-03-10 15:12:04 +08:00
int last_sf_is_action_bypass = 1;
for (int i = 0; i < chaining->chaining_used; i++)
{
struct selected_sf *node = &(chaining->chaining[i]);
LOG_INFO("%s: session %lu %s execute policy: %d -> sff_profile_id %d -> sf_profile_id %d -> sf_need_skip %d sf_action_reason : %s",
LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string, node->policy_id, node->sff_profile_id, node->sf_profile_id, node->sf_need_skip, session_action_reason_to_string(node->sf_action_reason));
if (node->sf_need_skip)
{
continue;
}
switch (node->sf_action)
{
case SESSION_ACTION_BYPASS:
// BYPASS CURRENT SF
2023-03-10 15:12:04 +08:00
last_sf_is_action_bypass = 1;
continue;
case SESSION_ACTION_BLOCK:
if (node->sff_forward_type == FORWARD_TYPE_STEERING)
{
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
}
else
{
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
throughput_metrics_inc(&g_metrics->raw_pkt_tx, 1, raw_len);
}
*action_bytes = raw_len;
return RAW_PKT_HIT_BLOCK;
case SESSION_ACTION_FORWARD:
if (node->sf_connectivity.method != PACKAGE_METHOD_VXLAN_G)
{
LOG_ERROR("%s: processing raw packets, session %lu %s requires encapsulation format not supported, bypass !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
*action_bytes = raw_len;
return RAW_PKT_ERR_BYPASS;
}
if (node->sff_forward_type == FORWARD_TYPE_STEERING)
{
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
nsend = forward_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx);
if (nsend > 0)
{
sf_metrics_inc(thread->sf_metrics, node->policy_id, node->sff_profile_id, node->sf_profile_id, 0, 0, 1, nsend);
throughput_metrics_inc(&node->tx, 1, nsend);
*action_bytes = nsend;
2023-03-10 15:12:04 +08:00
return RAW_PKT_HIT_STEERING;
}
else
{
LOG_ERROR("%s: processing raw packet, session %lu %s forwarding packet to service function failed, bypass !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
*action_bytes = raw_len;
return RAW_PKT_ERR_BYPASS;
}
}
else
{
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
nsend = mirror_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx);
if (nsend > 0)
{
sf_metrics_inc(thread->sf_metrics, node->policy_id, node->sff_profile_id, node->sf_profile_id, 0, 0, 1, nsend);
throughput_metrics_inc(&node->tx, 1, nsend);
throughput_metrics_inc(&g_metrics->mirroring_tx, 1, nsend);
throughput_metrics_inc(&g_metrics->dev_endpoint_tx, 1, nsend);
2023-03-10 15:12:04 +08:00
last_sf_is_action_bypass = 0;
continue;
}
else
{
LOG_ERROR("%s: processing raw packet, session %lu %s mirroring packet to service function failed, bypass !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
*action_bytes = raw_len;
return RAW_PKT_ERR_BYPASS;
}
}
default:
2023-03-10 15:12:04 +08:00
last_sf_is_action_bypass = 1;
continue;
}
}
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
*action_bytes = raw_len;
2023-03-10 15:12:04 +08:00
if (last_sf_is_action_bypass)
{
// BYPASS ALL SF or LAST SF IS MIRRORING
return RAW_PKT_HIT_BYPASS;
}
else
{
return RAW_PKT_HIT_MIRRORING;
}
}
// return : INJT_PKT_ERR_DROP
// return : INJT_PKT_MIRR_RX_DROP
// return : INJT_PKT_HIT_BLOCK
// return : INJT_PKT_HIT_FWD2SF
// return : INJT_PKT_HIT_FWD2NF
static enum inject_pkt_action handle_inject_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx, int *action_bytes)
{
int nsend = 0;
struct thread_ctx *thread = (struct thread_ctx *)ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
struct g_vxlan *g_vxlan_hdr = NULL;
int raw_len = marsio_buff_datalen(rx_buff);
char *raw_data = marsio_buff_mtod(rx_buff);
*action_bytes = 0;
if (g_vxlan_decode(&g_vxlan_hdr, raw_data, raw_len) == -1)
{
// LOG_ERROR("%s: unexpected inject packet, not a vxlan-encapsulated packet, drop !!!", LOG_TAG_PKTIO);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_ERR_DROP;
}
struct metadata meta;
memset(&meta, 0, sizeof(struct metadata));
meta.raw_data = (char *)g_vxlan_hdr + sizeof(struct g_vxlan);
meta.raw_len = raw_len - sizeof(struct ethhdr) - sizeof(struct ip) - sizeof(struct udp_hdr) - sizeof(struct g_vxlan);
meta.dir_is_e2i = g_vxlan_get_packet_dir(g_vxlan_hdr);
meta.traffic_is_decrypted = g_vxlan_get_traffic_type(g_vxlan_hdr);
meta.is_ctrl_pkt = 0;
meta.l7_offset = 0;
// meta.session_id set later
// meta.sids set later
int sf_index = g_vxlan_get_sf_index(g_vxlan_hdr);
struct addr_tuple4 inner_addr;
struct raw_pkt_parser raw_parser;
memset(&inner_addr, 0, sizeof(struct addr_tuple4));
raw_packet_parser_init(&raw_parser, 0, LAYER_TYPE_ALL, 8);
raw_packet_parser_parse(&raw_parser, (const void *)meta.raw_data, meta.raw_len);
raw_packet_parser_get_most_inner_tuple4(&raw_parser, &inner_addr);
struct session_node *node = session_table_search_by_addr(thread->session_table, &inner_addr);
if (node == NULL)
{
char *addr_string = addr_tuple4_to_str(&inner_addr);
LOG_ERROR("%s: unexpected inject packet, unable to find session %s from session table, drop !!!", LOG_TAG_PKTIO, addr_string);
free(addr_string);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_ERR_DROP;
}
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
meta.session_id = s_ctx->session_id;
if (meta.dir_is_e2i)
{
sids_copy(&meta.sids, &s_ctx->raw_pkt_e2i_sids);
route_ctx_copy(&meta.route_ctx, &s_ctx->raw_pkt_e2i_route_ctx);
}
else
{
sids_copy(&meta.sids, &s_ctx->raw_pkt_i2e_sids);
route_ctx_copy(&meta.route_ctx, &s_ctx->raw_pkt_i2e_route_ctx);
}
LOG_DEBUG("%s: session %lu get metadata from inject packet, META={raw_len: %d, dir_is_e2i: %d, traffic_is_decrypted: %d, sf_index: %d}", LOG_TAG_PKTIO, meta.session_id, meta.raw_len, meta.dir_is_e2i, meta.traffic_is_decrypted, sf_index);
struct selected_chaining *chaining = s_ctx->chaining;
if (chaining == NULL || sf_index < 0 || sf_index >= chaining->chaining_used)
{
LOG_ERROR("%s: unexpected inject packet, session %lu %s misses chaining index, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_ERR_DROP;
}
if (chaining->chaining[sf_index].sff_forward_type == FORWARD_TYPE_MIRRORING)
{
LOG_DEBUG("%s: unexpected inject packet, session %lu %s with sf_profile_id %d executes mirror and does not require reflow, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string, chaining->chaining[sf_index].sf_profile_id);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_MIRR_RX_DROP;
}
sf_metrics_inc(thread->sf_metrics, chaining->chaining[sf_index].policy_id, chaining->chaining[sf_index].sff_profile_id, chaining->chaining[sf_index].sf_profile_id, 1, raw_len, 0, 0);
throughput_metrics_inc(&chaining->chaining[sf_index].rx, 1, raw_len);
marsio_buff_adj(rx_buff, raw_len - meta.raw_len);
int next_sf_index;
for (next_sf_index = sf_index + 1; next_sf_index < chaining->chaining_used; next_sf_index++)
{
struct selected_sf *node = &(chaining->chaining[next_sf_index]);
LOG_INFO("%s: session %lu %s execute policy: %d -> sff_profile_id %d -> sf_profile_id %d -> sf_need_skip %d sf_action_reason : %s",
LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string, node->policy_id, node->sff_profile_id, node->sf_profile_id, node->sf_need_skip, session_action_reason_to_string(node->sf_action_reason));
if (node->sf_need_skip)
{
continue;
}
switch (node->sf_action)
{
case SESSION_ACTION_BYPASS:
// BYPASS CURRENT SF
continue;
case SESSION_ACTION_BLOCK:
if (node->sff_forward_type == FORWARD_TYPE_STEERING)
{
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_HIT_BLOCK;
}
else
{
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
int nsend = forward_packet_to_nf(handle, rx_buff, &meta, thread_seq, ctx);
if (nsend > 0)
{
*action_bytes = raw_len;
throughput_metrics_inc(&g_metrics->raw_pkt_tx, 1, nsend);
return INJT_PKT_HIT_BLOCK;
}
else
{
LOG_ERROR("%s: processing inject packet, session %lu %s forwarding packet to network function failed, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_ERR_DROP;
}
}
case SESSION_ACTION_FORWARD:
if (node->sf_connectivity.method != PACKAGE_METHOD_VXLAN_G)
{
LOG_ERROR("%s: processing inject packets, session %lu %s requires encapsulation format not supported, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_ERR_DROP;
}
if (node->sff_forward_type == FORWARD_TYPE_STEERING)
{
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
nsend = forward_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx);
if (nsend > 0)
{
sf_metrics_inc(thread->sf_metrics, node->policy_id, node->sff_profile_id, node->sf_profile_id, 0, 0, 1, nsend);
throughput_metrics_inc(&node->tx, 1, nsend);
*action_bytes = nsend;
return INJT_PKT_HIT_FWD2SF;
}
else
{
LOG_ERROR("%s: processing inject packet, session %lu %s forwarding packet to service function failed, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_ERR_DROP;
}
}
else
{
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
nsend = mirror_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx);
if (nsend > 0)
{
sf_metrics_inc(thread->sf_metrics, node->policy_id, node->sff_profile_id, node->sf_profile_id, 0, 0, 1, nsend);
throughput_metrics_inc(&node->tx, 1, nsend);
throughput_metrics_inc(&g_metrics->mirroring_tx, 1, nsend);
throughput_metrics_inc(&g_metrics->dev_endpoint_tx, 1, nsend);
continue;
}
else
{
LOG_ERROR("%s: processing inject packet, session %lu %s mirroring packet to service function failed, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_ERR_DROP;
}
}
default:
assert(0);
continue;
}
}
// the last sf need bypass or need skip
if (next_sf_index != chaining->chaining_used)
{
LOG_ERROR("%s: unexpected inject packet, session %lu %s using invalid chaining index, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_ERR_DROP;
}
else
{
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
int nsend = forward_packet_to_nf(handle, rx_buff, &meta, thread_seq, ctx);
if (nsend > 0)
{
*action_bytes = nsend;
return INJT_PKT_HIT_FWD2NF;
}
else
{
LOG_ERROR("%s: processing inject packet, session %lu %s forwarding packet to network function failed, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_ERR_DROP;
}
}
}
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
static int forward_packet_to_sf(struct packet_io *handle, marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, int thread_seq, void *ctx)
{
marsio_buff_ctrlzone_reset(rx_buff);
struct ethhdr *eth_hdr = (struct ethhdr *)marsio_buff_prepend(rx_buff, sizeof(struct ethhdr) + sizeof(struct ip) + sizeof(struct udp_hdr) + sizeof(struct g_vxlan));
struct ip *ip_hdr = (struct ip *)((char *)eth_hdr + sizeof(struct ethhdr));
struct udp_hdr *udp_hdr = (struct udp_hdr *)((char *)ip_hdr + sizeof(struct ip));
struct g_vxlan *g_vxlan_hdr = (struct g_vxlan *)((char *)udp_hdr + sizeof(struct udp_hdr));
LOG_DEBUG("%s: session %lu set metadata to inject packet, META={raw_len: %d, dir_is_e2i: %d, traffic_is_decrypted: %d, sf_index: %d}", LOG_TAG_PKTIO, meta->session_id, meta->raw_len, meta->dir_is_e2i, meta->traffic_is_decrypted, sf->sf_index);
memset(g_vxlan_hdr, 0, sizeof(struct g_vxlan));
g_vxlan_set_packet_dir(g_vxlan_hdr, meta->dir_is_e2i);
g_vxlan_set_sf_index(g_vxlan_hdr, sf->sf_index);
g_vxlan_set_traffic_type(g_vxlan_hdr, meta->traffic_is_decrypted);
build_ether_header(eth_hdr, ETH_P_IP, handle->config.dev_endpoint_src_mac, sf->sf_dst_mac);
build_ip_header(ip_hdr, IPPROTO_UDP, handle->config.dev_endpoint_src_ip, sf->sf_dst_ip, sizeof(struct udp_hdr) + sizeof(struct g_vxlan) + meta->raw_len);
build_udp_header((const char *)&ip_hdr->ip_src, 8, udp_hdr, meta->session_id % (65535 - 49152) + 49152, 4789, sizeof(struct g_vxlan) + meta->raw_len);
int raw_len = marsio_buff_datalen(rx_buff);
if (marsio_send_burst(handle->dev_endpoint.mr_path, thread_seq, &rx_buff, 1) != 0)
{
LOG_ERROR("%s: unable to send burst on device %s, thread_seq: %d", LOG_TAG_PKTIO, handle->config.dev_endpoint, thread_seq);
return -1;
}
return raw_len;
}
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
static int mirror_packet_to_sf(struct packet_io *handle, marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, int thread_seq, void *ctx)
{
marsio_buff_t *new_buff = NULL;
if (marsio_buff_malloc_global(handle->instance, &new_buff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0)
{
LOG_ERROR("%s: unable to malloc buff on marsio instance, thread_seq: %d", LOG_TAG_PKTIO, thread_seq);
return -1;
}
unsigned int raw_len = marsio_buff_datalen(rx_buff);
const char *raw_data = marsio_buff_mtod(rx_buff);
char *copy_ptr = marsio_buff_append(new_buff, raw_len);
memcpy(copy_ptr, raw_data, raw_len);
int nsend = forward_packet_to_sf(handle, new_buff, meta, sf, thread_seq, ctx);
if (nsend == -1)
{
marsio_buff_free(handle->instance, &new_buff, 1, 0, thread_seq);
}
return nsend;
}
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
static int forward_packet_to_nf(struct packet_io *handle, marsio_buff_t *rx_buff, struct metadata *meta, int thread_seq, void *ct)
{
marsio_buff_ctrlzone_reset(rx_buff);
if (packet_io_set_metadata(rx_buff, meta) != 0)
{
return -1;
}
int raw_len = marsio_buff_datalen(rx_buff);
if (marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1) != 0)
{
LOG_ERROR("%s: unable to send burst on device %s, thread_seq: %d", LOG_TAG_PKTIO, handle->config.dev_nf_interface, thread_seq);
return -1;
}
return raw_len;
}
// return 0 : success
// return -1 : error
static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
struct thread_ctx *thread = (struct thread_ctx *)ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
if (session_table_search_by_id(thread->session_table, meta->session_id))
{
return -1;
}
struct raw_pkt_parser raw_parser;
raw_packet_parser_init(&raw_parser, meta->session_id, LAYER_TYPE_ALL, 8);
const void *payload = raw_packet_parser_parse(&raw_parser, (const void *)meta->raw_data, meta->raw_len);
if ((char *)payload - (char *)&meta->raw_data != meta->l7_offset)
{
LOG_ERROR("%s: incorrect dataoffset in the control zone of session %lu", LOG_TAG_PKTIO, meta->session_id);
}
struct session_ctx *s_ctx = session_ctx_new();
s_ctx->ref_thread_ctx = thread;
fixed_num_array_init(&s_ctx->policy_ids);
s_ctx->session_id = meta->session_id;
s_ctx->first_ctrl_pkt.dir_is_e2i = meta->dir_is_e2i;
raw_packet_parser_get_most_inner_tuple4(&raw_parser, &(s_ctx->first_ctrl_pkt.tuple4));
s_ctx->first_ctrl_pkt.addr_string = addr_tuple4_to_str(&(s_ctx->first_ctrl_pkt.tuple4));
s_ctx->first_ctrl_pkt.header_data = strndup(meta->raw_data, meta->l7_offset);
s_ctx->first_ctrl_pkt.header_len = meta->l7_offset;
sids_copy(&s_ctx->first_ctrl_pkt.sids, &meta->sids);
route_ctx_copy(&s_ctx->first_ctrl_pkt.route_ctx, &meta->route_ctx);
s_ctx->chaining = selected_chaining_create(policy_enforce_max_chaining_size(thread->ref_enforcer));
LOG_INFO("%s: session %lu %s active first", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
for (int i = 0; i < parser->policy_id_num; i++)
{
int new_policy_id = parser->policy_ids[i];
if (fixed_num_array_exist_elem(&s_ctx->policy_ids, new_policy_id))
{
continue;
}
else
{
policy_enforce_select_chaining(s_ctx->chaining, thread->ref_enforcer, &raw_parser, new_policy_id, meta->dir_is_e2i, s_ctx);
selected_chaining_bref(s_ctx->chaining);
fixed_num_array_add_elem(&s_ctx->policy_ids, new_policy_id);
}
}
__atomic_fetch_add(&g_metrics->session_nums, 1, __ATOMIC_RELAXED);
session_table_insert(thread->session_table, s_ctx->session_id, &(s_ctx->first_ctrl_pkt.tuple4), s_ctx, session_value_free_cb);
return 0;
}
/*
{
"tsync": "1.0",
"session_id": "123456789",
"state": "active",
"method": "log_update",
"params": {
"sf_profile_ids": [
2,
3,
4,
5,
6,
7
]
}
}
*/
static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx)
{
struct thread_ctx *thread = (struct thread_ctx *)ctx;
struct sce_ctx *sce_ctx = thread->ref_sce_ctx;
struct packet_io *packet_io = thread->ref_io;
struct selected_chaining *chaining = s_ctx->chaining;
char buffer[32] = {0};
sprintf(buffer, "%lu", s_ctx->session_id);
cJSON *root = cJSON_CreateObject();
cJSON_AddStringToObject(root, "tsync", "1.0");
cJSON_AddStringToObject(root, "session_id", buffer);
cJSON_AddStringToObject(root, "state", "closing");
cJSON_AddStringToObject(root, "method", "log_update");
cJSON *sf_profile_ids = cJSON_CreateArray();
for (int i = 0; i < chaining->chaining_used; i++)
{
struct selected_sf *node = &(chaining->chaining[i]);
if (node->sf_need_skip == 0 && node->sf_action == SESSION_ACTION_FORWARD)
{
cJSON *id = cJSON_CreateNumber(node->sf_profile_id);
cJSON_AddItemToArray(sf_profile_ids, id);
}
}
cJSON *params = cJSON_CreateObject();
cJSON_AddItemToObject(params, "sf_profile_ids", sf_profile_ids);
cJSON_AddItemToObject(root, "params", params);
char *json_str = cJSON_PrintUnformatted(root);
LOG_INFO("%s: session %lu %s event log: %s", LOG_TAG_METRICS, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string, json_str);
marsio_buff_t *tx_buffs[1];
marsio_buff_malloc_device(packet_io->dev_nf_interface.mr_dev, tx_buffs, 1, 0, thread_seq);
char *dst = marsio_buff_append(tx_buffs[0], s_ctx->first_ctrl_pkt.header_len + strlen(json_str));
memcpy(dst, s_ctx->first_ctrl_pkt.header_data, s_ctx->first_ctrl_pkt.header_len);
memcpy(dst + s_ctx->first_ctrl_pkt.header_len, json_str, strlen(json_str));
struct metadata meta = {0};
meta.session_id = s_ctx->session_id;
meta.is_ctrl_pkt = 1;
meta.l7_offset = s_ctx->first_ctrl_pkt.header_len;
meta.sids.num = 1;
meta.sids.elems[0] = sce_ctx->firewall_sids;
route_ctx_copy(&meta.route_ctx, &s_ctx->first_ctrl_pkt.route_ctx);
packet_io_set_metadata(tx_buffs[0], &meta);
marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread_seq, tx_buffs, 1);
free(json_str);
cJSON_Delete(root);
}
// return 0 : success
// return -1 : error
static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
struct thread_ctx *thread = (struct thread_ctx *)ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id);
if (node)
{
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
LOG_INFO("%s: session %lu %s closing", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
struct selected_chaining *chaining = s_ctx->chaining;
for (int i = 0; i < chaining->chaining_used; i++)
{
struct selected_sf *node = &(chaining->chaining[i]);
LOG_INFO("%s: session %lu %s metrics log: policy %d sff_profile_id %d sf_profile_id %d sf_need_skip %d sf_action_reason %s rx_pkts %lu rx_bytes %lu tx_pkts %lu tx_bytes %lu", LOG_TAG_METRICS, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string, node->policy_id, node->sff_profile_id, node->sf_profile_id, node->sf_need_skip, session_action_reason_to_string(node->sf_action_reason), node->rx.n_pkts, node->rx.n_bytes, node->tx.n_pkts, node->tx.n_bytes);
}
send_event_log(s_ctx, thread_seq, ctx);
__atomic_fetch_add(&g_metrics->send_log, 1, __ATOMIC_RELAXED);
__atomic_fetch_sub(&g_metrics->session_nums, 1, __ATOMIC_RELAXED);
session_table_delete_by_id(thread->session_table, meta->session_id);
return 0;
}
return -1;
}
// return 0 : success
// return -1 : error
static int handle_session_active(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
struct thread_ctx *thread = (struct thread_ctx *)ctx;
2023-02-20 11:16:34 +08:00
// struct global_metrics *g_metrics = thread->ref_metrics;
struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id);
if (node)
{
struct raw_pkt_parser raw_parser;
raw_packet_parser_init(&raw_parser, meta->session_id, LAYER_TYPE_ALL, 8);
const void *payload = raw_packet_parser_parse(&raw_parser, (const void *)meta->raw_data, meta->raw_len);
if ((char *)payload - (char *)&meta->raw_data != meta->l7_offset)
{
LOG_ERROR("%s: incorrect dataoffset in the control zone of session %lu", LOG_TAG_PKTIO, meta->session_id);
}
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
LOG_INFO("%s: session %lu %s active again", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
for (int i = 0; i < parser->policy_id_num; i++)
{
int new_policy_id = parser->policy_ids[i];
if (fixed_num_array_exist_elem(&s_ctx->policy_ids, new_policy_id))
{
continue;
}
else
{
policy_enforce_select_chaining(s_ctx->chaining, thread->ref_enforcer, &raw_parser, new_policy_id, meta->dir_is_e2i, s_ctx);
selected_chaining_bref(s_ctx->chaining);
fixed_num_array_add_elem(&s_ctx->policy_ids, new_policy_id);
}
}
}
else
{
return handle_session_opening(meta, parser, thread_seq, ctx);
}
return 0;
}
// return 0 : success
// return -1 : error
static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
struct thread_ctx *thread = (struct thread_ctx *)ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
struct sce_ctx *sce_ctx = thread->ref_sce_ctx;
LOG_ERROR("%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id);
__atomic_fetch_and(&g_metrics->session_nums, 0, __ATOMIC_RELAXED);
for (int i = 0; i < sce_ctx->nr_worker_threads; i++)
{
struct thread_ctx *thread_ctx = &sce_ctx->work_threads[i];
__atomic_fetch_add(&thread_ctx->session_table_need_reset, 1, __ATOMIC_RELAXED);
}
return 0;
}
static void session_value_free_cb(void *ctx)
{
struct session_ctx *s_ctx = (struct session_ctx *)ctx;
session_ctx_free(s_ctx);
}
// return 0 : not keepalive packet
// return 1 : is keepalive packet
static int is_downstream_keepalive_packet(marsio_buff_t *rx_buff)
{
int raw_len = marsio_buff_datalen(rx_buff);
char *raw_data = marsio_buff_mtod(rx_buff);
if (raw_data == NULL || raw_len < (int)(sizeof(struct ethhdr)))
{
return 0;
}
struct ethhdr *eth_hdr = (struct ethhdr *)raw_data;
if (eth_hdr->h_proto == 0xAAAA)
{
return 1;
}
else
{
return 0;
}
}
// return 0 : not keepalive packet
// return 1 : is keepalive packet
static int is_upstream_keepalive_packet(marsio_buff_t *rx_buff)
{
int raw_len = marsio_buff_datalen(rx_buff);
char *raw_data = marsio_buff_mtod(rx_buff);
if (raw_data == NULL || raw_len < (int)(sizeof(struct ethhdr) + sizeof(struct ip) + sizeof(struct udp_hdr)))
{
return 0;
}
struct ethhdr *eth_hdr = (struct ethhdr *)raw_data;
if (eth_hdr->h_proto != htons(ETH_P_IP))
{
return 0;
}
struct ip *ip_hdr = (struct ip *)((char *)eth_hdr + sizeof(struct ethhdr));
if (ip_hdr->ip_p != IPPROTO_UDP)
{
return 0;
}
struct udp_hdr *udp_hdr = (struct udp_hdr *)((char *)ip_hdr + sizeof(struct ip));
if (udp_hdr->uh_dport != htons(3784))
{
return 0;
}
return 1;
}