This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/plugin/business/traffic-mirror/src/entry.cpp

748 lines
25 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <assert.h>
#include <cjson/cJSON.h>
#include <tfe_stream.h>
#include <tfe_plugin.h>
#include <tfe_proxy.h>
#include <traffic_mirror.h>
#include <MESA/MESA_prof_load.h>
/* Ethernet broadcast address (ff:ff:ff:ff:ff:ff); used in this file as the
 * destination address of every mirror target. */
const static struct ether_addr ether_addr_broadcast{0xff,0xff,0xff,0xff, 0xff, 0xff};
/* Per-stream plugin context. It is allocated in traffic_mirror_on_open_cb(),
 * handed to the framework through *pme, and freed in traffic_mirror_on_close_cb(). */
struct traffic_mirror_me
{
/* Reference on the mirror profile row selected for this stream; NULL when the
 * policy has no profile. Released in traffic_mirror_on_close_cb(). */
struct profile_table_ex_data * profile_ex_data;
/* Rebuild context created by traffic_mirror_rebuild_create() for this stream. */
struct traffic_mirror_rebuild * rebuild_ctx;
/* Number of bytes already mirrored in each direction, kept so that data seen
 * again after a DEFER is not mirrored twice.
 * TODO: is size_t wide enough for a TCP stream offset? */
size_t downstream_rx_offset_mirrored;
size_t upstream_tx_offset_mirrored;
};
/* The single plugin instance: static storage plus the pointer through which
 * every callback in this file reaches it. */
struct traffic_mirror_instance __g_traffic_mirror_instance;
struct traffic_mirror_instance * g_traffic_mirror_instance = &__g_traffic_mirror_instance;
/* Drop one reference on a policy row; the row is freed together with the
 * last reference. `object` must not be NULL. */
void policy_table_ex_data_free(struct policy_table_ex_data * object)
{
    if (__sync_sub_and_fetch(&object->atomic_refcnt, 1) != 0)
    {
        /* Somebody else still holds the row. */
        return;
    }

    free(object);
}
/* MAAT "dup" callback for the policy table: hands out the same object with
 * one more reference instead of copying it. A NULL source yields a NULL copy. */
void policy_table_ex_data_dup_cb(int table_id, MAAT_PLUGIN_EX_DATA * to,
    MAAT_PLUGIN_EX_DATA * from, long argl, void * argp)
{
    struct policy_table_ex_data * shared = (struct policy_table_ex_data *)*from;

    if (shared != NULL)
    {
        __sync_add_and_fetch(&shared->atomic_refcnt, 1);
    }

    *to = (void *)shared;
}
/* MAAT "free" callback for the policy table: releases the reference stored
 * in *ad, if there is one. */
void policy_table_ex_data_free_cb(int table_id, MAAT_PLUGIN_EX_DATA * ad, long argl, void * argp)
{
    struct policy_table_ex_data * held = (struct policy_table_ex_data *)*ad;
    if (!held)
    {
        return;
    }

    policy_table_ex_data_free(held);
}
void policy_table_ex_data_new_cb(int table_id, const char * key, const char * table_line,
MAAT_PLUGIN_EX_DATA * ad, long argl, void * argp)
{
struct traffic_mirror_instance * instance = (struct traffic_mirror_instance *) argp;
assert(instance != nullptr && instance->logger != nullptr);
char * str_json = NULL;
cJSON * json_root = NULL;
cJSON * json_subroot = NULL;
cJSON * json_item = NULL;
struct policy_table_ex_data * ex_data = NULL;
size_t user_region_offset;
size_t user_region_len;
int result = Maat_helper_read_column(table_line, 7, &user_region_offset, &user_region_len);
if (unlikely(result < 0))
{
TFE_LOG_ERROR(instance->logger, "Failed at get policy table's user region.");
goto ignore;
}
str_json = ALLOC(char, user_region_len + 1);
memcpy(str_json, table_line + user_region_offset, user_region_len);
json_root = cJSON_Parse(str_json);
if (unlikely(!json_root))
{
TFE_LOG_ERROR(instance->logger, "failed at parsing user region as JSON format.");
goto ignore;
}
json_item=cJSON_GetObjectItem(json_root, "protocol");
if (unlikely(!json_item || !cJSON_IsString(json_item)))
{
TFE_LOG_ERROR(instance->logger, "invalid JSON, protocol not existed or invalid type.");
goto ignore;
}
if(0!=strcasecmp(json_item->valuestring, "SSL")&& 0!=strcasecmp(json_item->valuestring, "HTTP"))
{
goto out;
}
json_subroot = cJSON_GetObjectItem(json_root, "traffic_mirror");
if (unlikely(!json_subroot))
{
TFE_LOG_ERROR(instance->logger, "invalid format, traffic_mirror is not defined.");
goto ignore;
}
ex_data = ALLOC(struct policy_table_ex_data, 1);
ex_data->atomic_refcnt = 1;
ex_data->enable = 0;
ex_data->profile_id = 0;
ex_data->is_profile_set = 0;
json_item = cJSON_GetObjectItem(json_subroot, "enable");
if (unlikely(!json_item || !cJSON_IsNumber(json_item)))
{
TFE_LOG_ERROR(instance->logger, "invalid JSON, traffic_mirror->enable not existed or invalid type.");
goto ignore;
}
ex_data->enable = json_item->valueint;
if (!ex_data->enable)
{
goto success;
}
json_item = cJSON_GetObjectItem(json_subroot, "mirror_profile");
if (unlikely(!json_item || !cJSON_IsNumber(json_item)))
{
TFE_LOG_DEBUG(instance->logger, "traffic_mirror->mirror_profile not existed, user default vlan id :%d.", instance->default_vlan_id);
ex_data->is_profile_set = 0;
ex_data->profile_id = 0;
}
else
{
ex_data->is_profile_set = 1;
ex_data->profile_id = json_item->valueint;
}
success:
TFE_LOG_DEBUG(instance->logger, "traffic mirror policy, key %s: enable = %d, profile = %d",
key, ex_data->enable, ex_data->profile_id);
*ad = ex_data;
ex_data = nullptr;
goto out;
ignore:
TFE_LOG_ERROR(instance->logger, "table line in TSG_SECURITY_COMPILE ignored %s: %s", key, table_line);
goto out;
out:
if (ex_data) policy_table_ex_data_free(ex_data);
if (json_root) cJSON_Delete(json_root);
if (str_json) free(str_json);
}
/* Drop one reference on a mirror profile row. With the last reference the
 * row is destroyed, including the `vlans` and `ether_addrs` arrays that
 * profile_table_ex_data_new_cb() allocates with calloc(); freeing only the
 * struct would leak both arrays.
 *
 * `object` must not be NULL.
 *
 * NOTE(review): a row that is released before its arrays were allocated must
 * carry NULL in both pointers. This assumes ALLOC() zero-initialises, which
 * the JSON string handling in this file already depends on - confirm. */
void profile_table_ex_data_free(struct profile_table_ex_data * object)
{
    if (__sync_sub_and_fetch(&object->atomic_refcnt, 1) != 0)
    {
        return;
    }

    free(object->vlans);
    free(object->ether_addrs);
    free(object);
}
/* MAAT "dup" callback for the profile table: the copy is the same object
 * with its reference count raised by one. A NULL source yields a NULL copy. */
void profile_table_ex_data_dup_cb(int table_id, MAAT_PLUGIN_EX_DATA * to,
    MAAT_PLUGIN_EX_DATA * from, long argl, void * argp)
{
    struct profile_table_ex_data * profile = (struct profile_table_ex_data *)*from;

    if (!profile)
    {
        *to = NULL;
        return;
    }

    __sync_add_and_fetch(&profile->atomic_refcnt, 1);
    *to = (void *)profile;
}
/* MAAT "free" callback for the profile table: releases the reference held
 * in *ad when it is not NULL. */
void profile_table_ex_data_free_cb(int table_id, MAAT_PLUGIN_EX_DATA * ad, long argl, void * argp)
{
    struct profile_table_ex_data * profile = (struct profile_table_ex_data *)*ad;
    if (profile == NULL)
    {
        return;
    }

    profile_table_ex_data_free(profile);
}
void profile_table_ex_data_new_cb(int table_id, const char * key, const char * table_line,
MAAT_PLUGIN_EX_DATA * ad, long argl, void * argp)
{
struct traffic_mirror_instance * instance = (struct traffic_mirror_instance *) argp;
assert(instance != nullptr && instance->logger != nullptr);
char * str_json = NULL;
cJSON * json_root = NULL;
cJSON * element = NULL;
unsigned int iter = 0;
struct profile_table_ex_data * ex_data = NULL;
size_t addr_list_offset;
size_t addr_list_len;
int result = Maat_helper_read_column(table_line, 3, &addr_list_offset, &addr_list_len);
if (unlikely(result < 0))
{
TFE_LOG_ERROR(instance->logger, "Failed at get profile table's addrlist.");
goto ignore;
}
str_json = ALLOC(char, addr_list_len + 1);
memcpy(str_json, table_line + addr_list_offset, addr_list_len);
json_root = cJSON_Parse(str_json);
if (unlikely(!json_root))
{
TFE_LOG_ERROR(instance->logger, "failed at parsing addrlist as JSON format.");
goto ignore;
}
ex_data = ALLOC(struct profile_table_ex_data, 1);
ex_data->atomic_refcnt = 1;
ex_data->rewrite_mac = 0;
ex_data->rewrite_vlan = 0;
if (unlikely(!cJSON_IsArray(json_root)))
{
TFE_LOG_ERROR(instance->logger, "invalid JSON, mirror_profile->vlan is not a array, %s.", str_json);
goto ignore;
}
ex_data->nr_targets = cJSON_GetArraySize(json_root);
ex_data->vlans = (unsigned int *)calloc(ex_data->nr_targets, sizeof(unsigned int));
ex_data->ether_addrs = (struct ether_addr *)calloc(ex_data->nr_targets, sizeof(struct ether_addr));
cJSON_ArrayForEach(element, json_root)
{
if (unlikely(!cJSON_IsNumber(element)))
{
TFE_LOG_ERROR(instance->logger, "invalid JSON, elements in mirror_profile->vlan is not a number, %s.", str_json);
goto ignore;
}
unsigned int vlan_in_number = element->valueint;
if (unlikely(vlan_in_number <= 0 || vlan_in_number > 4094))
{
TFE_LOG_ERROR(instance->logger, "invalid JSON, vlan id must between 1 and 4094.");
goto ignore;
}
TFE_LOG_DEBUG(instance->logger, "traffic mirror profile %s: vlan id[%d] %d", key, iter, vlan_in_number);
ex_data->rewrite_vlan = 1;
ex_data->vlans[iter] = vlan_in_number;
ex_data->ether_addrs[iter] = ether_addr_broadcast;
iter++;
}
assert(iter == ex_data->nr_targets);
*ad = (void *)ex_data;
ex_data = nullptr;
TFE_LOG_DEBUG(instance->logger, "traffic mirror profile %s: %s", key, str_json);
goto out;
ignore:
TFE_LOG_ERROR(instance->logger, "table line in TSG_PROFILE_TRAFFIC_MIRROR ignored %s: %s", key, table_line);
goto out;
out:
if (ex_data)
{
profile_table_ex_data_free(ex_data);
}
if (str_json)
{
free(str_json);
}
if (json_root)
{
cJSON_Delete(json_root);
}
}
/* Values accepted for the "maat_input_mode" configuration key; they select
 * where maat_feather_create_with_override() loads the rules from. */
#define MAAT_INPUT_JSON 0
#define MAAT_INPUT_REDIS 1
#define MAAT_INPUT_FILE 2
/* Create and initialise a MAAT feather from a configuration file.
 *
 * All settings are read from `section` of `profile`; afterwards "table_info"
 * and "stat_file" are re-read from `override_section`, falling back to the
 * values of `section` when the override does not define them.
 *
 * instance_name     name given to the MAAT instance
 * profile           path of the configuration file
 * section           section holding the shared MAAT settings
 * override_section  section holding the per-plugin overrides
 * max_thread        thread count passed to Maat_feather()
 * logger            logger handle used by MAAT and for error reports
 *
 * Returns the initialised feather, or NULL on failure. */
static Maat_feather_t maat_feather_create_with_override(const char * instance_name,
    const char * profile, const char * section, const char * override_section,
    unsigned int max_thread, void * logger)
{
    Maat_feather_t target = NULL;
    int input_mode = 0, maat_stat_on = 0, maat_perf_on = 0;
    int ret = 0, scan_detail = 0, effect_interval = 60;
    char table_info[TFE_STRING_MAX] = {0}, inc_cfg_dir[TFE_STRING_MAX] = {0}, ful_cfg_dir[TFE_STRING_MAX] = {0};
    char redis_server[TFE_STRING_MAX] = {0};
    char redis_port_range[TFE_STRING_MAX] = {0};
    char accept_tags[TFE_STRING_MAX] = {0};
    int redis_port_begin = 0, redis_port_end = 0;
    int redis_port_select = 0, deferred_load_on = 0;
    int redis_port_span = 0;
    int redis_db_idx = 0;
    char json_cfg_file[TFE_STRING_MAX] = {0};
    char maat_stat_file[TFE_STRING_MAX] = {0};

    MESA_load_profile_int_def(profile, section, "maat_input_mode", &(input_mode), 0);
    MESA_load_profile_int_def(profile, section, "stat_switch", &(maat_stat_on), 1);
    MESA_load_profile_int_def(profile, section, "perf_switch", &(maat_perf_on), 1);
    MESA_load_profile_string_def(profile, section, "table_info", table_info, sizeof(table_info), "");
    MESA_load_profile_string_def(profile, section, "accept_tags", accept_tags, sizeof(accept_tags), "");
    MESA_load_profile_string_def(profile, section, "json_cfg_file", json_cfg_file, sizeof(json_cfg_file), "");
    MESA_load_profile_string_def(profile, section, "maat_redis_server", redis_server, sizeof(redis_server), "");
    /* The size passed must be that of the destination buffer. */
    MESA_load_profile_string_def(profile, section, "maat_redis_port_range", redis_port_range,
        sizeof(redis_port_range), "6379");
    MESA_load_profile_int_def(profile, section, "deferred_load_on", &(deferred_load_on), 0);

    /* The port is either a single number or a "begin-end" range from which one port is picked at random. */
    ret = sscanf(redis_port_range, "%d-%d", &redis_port_begin, &redis_port_end);
    if (ret == 1)
    {
        redis_port_select = redis_port_begin;
    }
    else if (ret == 2)
    {
        srand(time(NULL));
        redis_port_select = redis_port_begin;

        /* A degenerate range ("6379-6379") or a reversed one would otherwise be a
         * modulo by zero or by a negative value; use the first port in that case. */
        redis_port_span = redis_port_end - redis_port_begin;
        if (redis_port_span > 0)
        {
            redis_port_select += rand() % redis_port_span;
        }
    }
    else
    {
        TFE_LOG_ERROR(logger, "Invalid redis port range %s, MAAT init failed.", redis_port_range);
    }

    MESA_load_profile_int_def(profile, section, "maat_redis_db_index", &(redis_db_idx), 0);
    MESA_load_profile_string_def(profile, section, "inc_cfg_dir", inc_cfg_dir, sizeof(inc_cfg_dir), "");
    MESA_load_profile_string_def(profile, section, "full_cfg_dir", ful_cfg_dir, sizeof(ful_cfg_dir), "");
    MESA_load_profile_string_def(profile, section, "stat_file", maat_stat_file, sizeof(maat_stat_file), "");
    MESA_load_profile_int_def(profile, section, "effect_interval_s", &(effect_interval), 60);

    /* Only override the table info */
    MESA_load_profile_string_def(profile, override_section, "table_info", table_info, sizeof(table_info), table_info);
    MESA_load_profile_string_def(profile, override_section, "stat_file", maat_stat_file, sizeof(maat_stat_file), maat_stat_file);

    effect_interval *= 1000;//convert s to ms

    assert(strlen(inc_cfg_dir) != 0 || strlen(ful_cfg_dir) != 0 || strlen(redis_server) != 0
        || strlen(json_cfg_file) != 0);

    target = Maat_feather(max_thread, table_info, logger);
    if (!target)
    {
        /* Nothing to configure or to burn without a feather. */
        TFE_LOG_ERROR(logger, "%s MAAT feather creation failed.", __FUNCTION__);
        return NULL;
    }

    Maat_set_feather_opt(target, MAAT_OPT_INSTANCE_NAME, instance_name, strlen(instance_name) + 1);

    switch (input_mode)
    {
    case MAAT_INPUT_JSON:
        Maat_set_feather_opt(target, MAAT_OPT_JSON_FILE_PATH, json_cfg_file, strlen(json_cfg_file) + 1);
        break;
    case MAAT_INPUT_REDIS:
        Maat_set_feather_opt(target, MAAT_OPT_REDIS_IP, redis_server, strlen(redis_server) + 1);
        Maat_set_feather_opt(target, MAAT_OPT_REDIS_PORT, &redis_port_select, sizeof(redis_port_select));
        Maat_set_feather_opt(target, MAAT_OPT_REDIS_INDEX, &redis_db_idx, sizeof(redis_db_idx));
        break;
    case MAAT_INPUT_FILE:
        Maat_set_feather_opt(target, MAAT_OPT_FULL_CFG_DIR, ful_cfg_dir, strlen(ful_cfg_dir) + 1);
        Maat_set_feather_opt(target, MAAT_OPT_INC_CFG_DIR, inc_cfg_dir, strlen(inc_cfg_dir) + 1);
        break;
    default:
        TFE_LOG_ERROR(logger, "Invalid MAAT Input Mode: %d.", input_mode);
        goto error_out;
    }

    if (maat_stat_on)
    {
        Maat_set_feather_opt(target, MAAT_OPT_STAT_FILE_PATH, maat_stat_file, strlen(maat_stat_file) + 1);
        Maat_set_feather_opt(target, MAAT_OPT_STAT_ON, NULL, 0);

        /* Performance statistics are only enabled together with the statistics output. */
        if (maat_perf_on)
        {
            Maat_set_feather_opt(target, MAAT_OPT_PERF_ON, NULL, 0);
        }
    }

    Maat_set_feather_opt(target, MAAT_OPT_DEFERRED_LOAD, &deferred_load_on, sizeof(deferred_load_on));
    Maat_set_feather_opt(target, MAAT_OPT_EFFECT_INVERVAL_MS, &effect_interval, sizeof(effect_interval));
    Maat_set_feather_opt(target, MAAT_OPT_SCAN_DETAIL, &scan_detail, sizeof(scan_detail));

    if (strlen(accept_tags) > 0)
    {
        Maat_set_feather_opt(target, MAAT_OPT_ACCEPT_TAGS, &accept_tags, sizeof(accept_tags));
    }

    ret = Maat_initiate_feather(target);
    if (ret < 0)
    {
        TFE_LOG_ERROR(logger, "%s MAAT init failed.", __FUNCTION__);
        goto error_out;
    }

    return target;

error_out:
    Maat_burn_feather(target);
    return NULL;
}
static int traffic_mirror_ethdev_init(struct traffic_mirror_instance * instance)
{
char str_ethdev[TFE_SYMBOL_MAX] = {0};
const char * profile = "./conf/tfe/tfe.conf";
int ret = MESA_load_profile_string_nodef(profile, "traffic_mirror", "device",
str_ethdev, sizeof(str_ethdev));
if (ret < 0)
{
TFE_LOG_ERROR(instance->logger, "failed at reading conffile, "
"[traffic_mirror]device is not defined.");
return -1;
}
MESA_load_profile_uint_def(profile, "traffic_mirror", "default_vlan_id", &(instance->default_vlan_id), 0);
unsigned int device_type;
MESA_load_profile_uint_def(profile, "traffic_mirror", "type", &device_type, TRAFFIC_MIRROR_ETHDEV_AF_PACKET);
if (device_type == TRAFFIC_MIRROR_ETHDEV_AF_PACKET)
{
instance->ethdev = traffic_mirror_ethdev_pcap_create(str_ethdev, instance->logger);
}
else if(device_type == TRAFFIC_MIRROR_ETHDEV_MARSIO)
{
instance->ethdev = traffic_mirror_ethdev_mr4_create(str_ethdev,
tfe_proxy_get_work_thread_count(), instance->logger);
}
else
{
TFE_LOG_ERROR(instance->logger, "invalid traffic mirror device type, [traffic_mirror]type = %d", device_type);
return -2;
}
if (!instance->ethdev)
{
TFE_LOG_ERROR(instance->logger, "failed at traffic mirror device init. ");
return -3;
}
return 0;
}
int traffic_mirror_init(struct tfe_proxy * proxy)
{
int result = 0;
struct traffic_mirror_instance * instance = g_traffic_mirror_instance;
instance->logger = tfe_proxy_get_error_logger();
/* Using PANGU-HTTP's profile */
MESA_load_profile_uint_def("./conf/tfe/tfe.conf", "traffic_mirror", "enable", &(instance->enable), 1);
if (!instance->enable)
{
TFE_LOG_INFO(instance->logger, "traffic_mirror is disabled.");
return 0;
}
/* INIT DECRYPT MIRROR INSTANCE */
instance->nr_threads = tfe_proxy_get_work_thread_count();
/* MAAT Feather, the configuration is same with pangu-http */
instance->maat_feather = maat_feather_create_with_override(
"traffic-mirror", "./conf/tfe/tfe.conf",
"maat", "traffic_mirror", instance->nr_threads, instance->logger);
if (unlikely(!instance->maat_feather))
{
TFE_LOG_ERROR(instance->logger, "failed at creating maat feather.");
goto errout;
}
/* REGISTER MAAT FEATHER */
instance->policy_table_id = Maat_table_register(instance->maat_feather, "TSG_SECURITY_COMPILE");
if (unlikely(instance->policy_table_id < 0))
{
TFE_LOG_ERROR(instance->logger, "failed at register table TSG_SECURITY_COMPILE, ret = %d",
instance->policy_table_id); goto errout;
}
instance->profile_table_id = Maat_table_register(instance->maat_feather, "TSG_PROFILE_TRAFFIC_MIRROR");
if (unlikely(instance->profile_table_id < 0))
{
TFE_LOG_ERROR(instance->logger, "failed at register table TSG_PROFILE_TRAFFIC_MIRROR, ret = %d",
instance->profile_table_id); goto errout;
}
result = Maat_plugin_EX_register(instance->maat_feather, instance->policy_table_id,
policy_table_ex_data_new_cb, policy_table_ex_data_free_cb, policy_table_ex_data_dup_cb,
nullptr, 0, instance);
if(unlikely(result < 0))
{
TFE_LOG_ERROR(instance->logger, "failed at Maat_plugin_EX_register(TSG_SECURITY_COMPILE), "
"table_id = %d, ret = %d", instance->policy_table_id, result);
goto errout;
}
result = Maat_plugin_EX_register(instance->maat_feather, instance->profile_table_id,
profile_table_ex_data_new_cb, profile_table_ex_data_free_cb, profile_table_ex_data_dup_cb,
nullptr, 0, instance);
if (unlikely(result < 0))
{
TFE_LOG_ERROR(instance->logger, "failed at Maat_plugin_EX_register(TSG_PROFILE_TRAFFIC_MIRROR), "
"table_id = %d, ret = %d", instance->policy_table_id, result);
}
if (traffic_mirror_ethdev_init(instance) < 0)
{
goto errout;
}
return 0;
errout:
return -1;
}
/* All-zero MAC; a MAC read from the cmsg that equals it is treated as "not set". */
const static ether_addr zero_mac = {0};
/* Fallback MAC addresses (00:01:02:03:04:05 and 00:05:04:03:02:01) used when
 * the cmsg carries no usable source / destination MAC. */
const static unsigned char default_src_mac[6] = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05};
const static unsigned char default_dst_mac[6] = {0x00, 0x05, 0x04, 0x03, 0x02, 0x01};
/* Stream-open callback.
 *
 * Looks up the policy of the stream (policy id taken from the cmsg). When
 * mirroring is enabled for it, a mirror target is chosen - a random entry of
 * the policy's profile, or the default VLAN when the policy names no profile -
 * and a per-stream context is stored in *pme. Otherwise the plugin detaches
 * from the stream.
 *
 * Reference handling: the policy row is needed only while the target is being
 * built and is released on every path. The profile row is kept in the
 * per-stream context and released by the close callback.
 *
 * Always returns ACTION_FORWARD_DATA. */
int traffic_mirror_on_open_cb(const struct tfe_stream * stream, unsigned int thread_id,
    enum tfe_conn_dir dir, void ** pme)
{
    /* Firstly, fetch destination address of traffic mirror */
    struct traffic_mirror_me * me = NULL;
    struct traffic_mirror_instance * instance = g_traffic_mirror_instance;
    assert(instance != NULL);

    if (!instance->enable)
    {
        return ACTION_FORWARD_DATA;
    }

    struct tfe_cmsg * cmsg = tfe_stream_get0_cmsg(stream);
    assert(cmsg != NULL);

    /* All locals are declared before the first goto so that no jump crosses
     * an initialisation. */
    unsigned int target_id;
    struct traffic_mirror_rebuild_target * rebuild_target = NULL;

    char str_policy_id[TFE_SYMBOL_MAX] = {0};
    char str_profile_id[TFE_SYMBOL_MAX] = {0};

    unsigned int opt_val;
    uint16_t opt_out_size;

    struct policy_table_ex_data * policy_ex_data = NULL;
    struct profile_table_ex_data * profile_ex_data = NULL;

    struct ether_addr c_ether_addr = {};
    struct ether_addr s_ether_addr = {};

    int ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_ID, (unsigned char *) &opt_val, sizeof(opt_val), &opt_out_size);
    if (ret < 0)
    {
        TFE_LOG_ERROR(instance->logger, "failed at getting policy id from cmsg, detach the stream.");
        goto detach;
    }

    snprintf(str_policy_id, sizeof(str_policy_id), "%u", opt_val);
    policy_ex_data = (struct policy_table_ex_data *) Maat_plugin_get_EX_data(instance->maat_feather,
        instance->policy_table_id, str_policy_id);

    if (!policy_ex_data || !policy_ex_data->enable)
    {
        goto detach;
    }

    /* A missing or all-zero MAC is replaced by a fixed placeholder address. */
    ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_SRC_MAC, (unsigned char *) &c_ether_addr,
        sizeof(c_ether_addr), &opt_out_size);
    if (ret < 0 || memcmp(&c_ether_addr, &zero_mac, sizeof(c_ether_addr)) == 0)
    {
        TFE_LOG_ERROR(instance->logger, "failed at getting source mac address, use default src mac: 00:01:02:03:04:05");
        memcpy(&c_ether_addr, &default_src_mac, sizeof(c_ether_addr));
    }

    ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_DST_MAC, (unsigned char *) &s_ether_addr,
        sizeof(s_ether_addr), &opt_out_size);
    if (ret < 0 || memcmp(&s_ether_addr, &zero_mac, sizeof(s_ether_addr)) == 0)
    {
        TFE_LOG_ERROR(instance->logger, "failed at getting dest mac address, use default dest mac: 00:05:04:03:02:01");
        memcpy(&s_ether_addr, &default_dst_mac, sizeof(s_ether_addr));
    }

    rebuild_target = ALLOC(struct traffic_mirror_rebuild_target, 1);
    if (policy_ex_data->is_profile_set)
    {
        snprintf(str_profile_id, sizeof(str_profile_id), "%u", policy_ex_data->profile_id);
        profile_ex_data = (struct profile_table_ex_data *)Maat_plugin_get_EX_data(instance->maat_feather,
            instance->profile_table_id, str_profile_id);

        if (!profile_ex_data)
        {
            TFE_LOG_ERROR(instance->logger, "failed at getting policy %s's profile, profile id = %s, "
                "detach the stream",
                str_policy_id, str_profile_id);
            goto detach;
        }

        /* A profile without targets cannot be used; the modulo below would divide by zero. */
        if (profile_ex_data->nr_targets == 0)
        {
            TFE_LOG_ERROR(instance->logger, "policy %s's profile has no mirror target, profile id = %s, "
                "detach the stream",
                str_policy_id, str_profile_id);
            goto detach;
        }

        /* Spread the streams over the targets of the profile. */
        target_id = random() % profile_ex_data->nr_targets;
        rebuild_target->vlan_tci = profile_ex_data->vlans[target_id];
        rebuild_target->ether_addr = profile_ex_data->ether_addrs[target_id];
        rebuild_target->rewrite_as_target_mac = profile_ex_data->rewrite_mac;
        rebuild_target->rewrite_as_target_vlan = profile_ex_data->rewrite_vlan;
    }
    else
    {
        rebuild_target->vlan_tci = instance->default_vlan_id;
        rebuild_target->ether_addr = ether_addr_broadcast;
        rebuild_target->rewrite_as_target_mac = 0;
        rebuild_target->rewrite_as_target_vlan = 1;
    }

    /* The policy row is not used past this point. Release the reference taken
     * by the lookup, otherwise every mirrored stream leaks one. */
    policy_table_ex_data_free(policy_ex_data);
    policy_ex_data = NULL;

    me = ALLOC(struct traffic_mirror_me, 1);
    me->rebuild_ctx = traffic_mirror_rebuild_create(stream->addr, &c_ether_addr, &s_ether_addr,
        rebuild_target, instance->ethdev);

    me->profile_ex_data = profile_ex_data;
    *pme = (void *) me;

    /* the ownership is transfer to struct me and rebuild_target */
    profile_ex_data = NULL;
    rebuild_target = NULL;

    traffic_mirror_rebuild_handshake(me->rebuild_ctx, thread_id);
    return ACTION_FORWARD_DATA;

detach:
    free(me);

    if (policy_ex_data)
    {
        policy_table_ex_data_free(policy_ex_data);
    }

    if (profile_ex_data)
    {
        profile_table_ex_data_free(profile_ex_data);
    }

    free(rebuild_target);

    tfe_stream_detach(stream);
    return ACTION_FORWARD_DATA;
}
/* Stream-data callback: mirrors the part of `data` that has not been
 * mirrored yet for the given direction.
 *
 * The callback may be handed bytes it has already seen (data replayed after
 * a DEFER). The per-direction counter in the stream context records how far
 * mirroring got, so only the tail beyond that point is mirrored.
 *
 * Always returns ACTION_FORWARD_DATA; the plugin detaches from the stream
 * when the rx offset cannot be fetched. */
enum tfe_stream_action traffic_mirror_on_data_cb(const struct tfe_stream * stream, unsigned int thread_id,
    enum tfe_conn_dir dir, const unsigned char * data, size_t len, void ** pme)
{
    struct traffic_mirror_instance * instance = g_traffic_mirror_instance;
    if (!instance->enable)
    {
        return ACTION_FORWARD_DATA;
    }

    struct traffic_mirror_me * me = (struct traffic_mirror_me *)(*pme);
    const int from_downstream = (dir == CONN_DIR_DOWNSTREAM);

    /* Select the counter and the offset query that belong to this direction. */
    size_t * mirrored_counter = from_downstream
        ? &me->downstream_rx_offset_mirrored
        : &me->upstream_tx_offset_mirrored;
    enum tfe_stream_info offset_query = from_downstream
        ? INFO_FROM_DOWNSTREAM_RX_OFFSET
        : INFO_FROM_UPSTREAM_RX_OFFSET;

    const size_t already_mirrored = *mirrored_counter;

    /* Stream offset of the first byte of `data`. */
    size_t chunk_offset;
    int ret = tfe_stream_info_get(stream, offset_query, &chunk_offset, sizeof(chunk_offset));
    if (unlikely(ret < 0))
    {
        TFE_STREAM_LOG_ERROR(stream, "Failed at fetch rx offset, detached.");
        tfe_stream_detach(stream);
        return ACTION_FORWARD_DATA;
    }

    /* The mirrored position must lie inside [chunk_offset, chunk_offset + len];
     * anything else would mean a hole in the mirrored stream. */
    assert(already_mirrored >= chunk_offset);
    assert(chunk_offset + len >= already_mirrored);

    const size_t skip_len = already_mirrored - chunk_offset;
    const size_t fresh_len = len - skip_len;

    /* Everything in this chunk was mirrored before (DEFER state). */
    if (fresh_len == 0)
    {
        return ACTION_FORWARD_DATA;
    }

    *mirrored_counter += fresh_len;
    traffic_mirror_rebuild_data(me->rebuild_ctx, thread_id, (const char *) (data + skip_len),
        (size_t) fresh_len, dir);

    return ACTION_FORWARD_DATA;
}
void traffic_mirror_on_close_cb(const struct tfe_stream * stream, unsigned int thread_id,
enum tfe_stream_close_reason reason, void ** pme)
{
struct traffic_mirror_instance * instance = g_traffic_mirror_instance;
if (!instance->enable)
{
return;
}
struct traffic_mirror_me * me = (struct traffic_mirror_me *)(*pme);
traffic_mirror_rebuild_farewell(me->rebuild_ctx, thread_id);
traffic_mirror_rebuild_destroy(me->rebuild_ctx);
if (me->profile_ex_data)
{
profile_table_ex_data_free(me->profile_ex_data);
}
free(me);
*pme = NULL;
}
/* Plugin shutdown hook; intentionally a no-op. */
void traffic_mirror_deinit(struct tfe_proxy * proxy)
{
}
/* Plugin descriptor: binds the callbacks defined in this file to the TFE
 * plugin interface under the symbol "traffic_mirror", as a business plugin. */
struct tfe_plugin traffic_mirror_plugin_desc =
{
.symbol= "traffic_mirror",
.type = TFE_PLUGIN_TYPE_BUSINESS,
.on_init = traffic_mirror_init,
.on_deinit = traffic_mirror_deinit,
.on_open = traffic_mirror_on_open_cb,
.on_data = traffic_mirror_on_data_cb,
.on_close = traffic_mirror_on_close_cb
};
/* Registers the descriptor with the plugin framework. */
TFE_PLUGIN_REGISTER(traffic_mirror, traffic_mirror_plugin_desc)