This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/common/src/tfe_resource.cpp
luwenpeng 444b9c7935 TSG-2489 DOH 支持 IP 归属地
* tfe maat IP 归属地回调表的注册从 pangu 移动到 tfe_resource 中
	* 修复 Pangu IP 归属地 SRC && DST 同时命中时 maat 计数的 bug
2020-07-15 11:30:28 +08:00

439 lines
16 KiB
C++

#include <tfe_utils.h>
#include <tfe_resource.h>
#include <tfe_proxy.h>
#include <tfe_kafka_logger.h>
#include <cjson/cJSON.h>
#include <MESA/Maat_rule.h>
#include <MESA/MESA_prof_load.h>
#define MAAT_INPUT_JSON 0
#define MAAT_INPUT_REDIS 1
#define MAAT_INPUT_FILE 2
/*
 * Descriptor of one Maat table used by this module.
 *
 * `id` is filled in by register_maat_table() with the value returned by
 * Maat_table_register(); the remaining members are static configuration.
 * The three callbacks are optional: a table whose callbacks are all NULL is
 * registered by name only.
 */
struct maat_table_info
{
	int id;                                /* table id returned by Maat_table_register() */
	const char *name;                      /* table name; always points at a string literal, hence const */
	Maat_plugin_EX_new_func_t *new_func;   /* callback handed to Maat_ip_plugin_EX_register() as new_func */
	Maat_plugin_EX_dup_func_t *dup_func;   /* callback handed to Maat_ip_plugin_EX_register() as dup_func */
	Maat_plugin_EX_free_func_t *free_func; /* callback handed to Maat_ip_plugin_EX_register() as free_func */
};
/* Process-wide resources created by tfe_bussiness_resouce_init() and handed
 * out by tfe_bussiness_resouce_get(). All of them stay NULL until init runs. */
static Maat_feather_t static_maat = NULL;       /* feather built from profile section "MAAT" */
static Maat_feather_t dynamic_maat = NULL;      /* feather built from profile section "DYNAMIC_MAAT" */
static tfe_kafka_logger_t *kafka_logger = NULL; /* logger built from profile section "kafka" */
static char *device_id = NULL;                  /* device serial number, or the built-in default id */
/*
 * Create and initialise one Maat feather from a profile section.
 *
 * instance_name  value set as MAAT_OPT_INSTANCE_NAME.
 * profile        path of the profile (ini) file to read.
 * section        section of `profile` holding the Maat settings.
 * max_thread     thread count forwarded to Maat_feather().
 * logger         logger handle used for error reporting and passed to Maat.
 *
 * The "maat_input_mode" key selects the configuration source
 * (MAAT_INPUT_JSON / MAAT_INPUT_REDIS / MAAT_INPUT_FILE).
 *
 * Returns the initialised feather, or NULL when a mandatory setting is
 * missing/invalid or Maat itself fails to initialise.
 */
static Maat_feather_t create_maat_feather(const char *instance_name, const char *profile, const char *section, int max_thread, void *logger)
{
	/* All locals are declared up front: the error paths use goto, and C++
	 * forbids jumping over an initialised declaration. */
	Maat_feather_t target;
	int input_mode = 0, maat_stat_on = 0, maat_perf_on = 0;
	int ret = 0, scan_detail = 0, effect_interval = 60;
	char table_info[TFE_STRING_MAX] = {0}, inc_cfg_dir[TFE_STRING_MAX] = {0}, ful_cfg_dir[TFE_STRING_MAX] = {0};
	char redis_server[TFE_STRING_MAX] = {0};
	char redis_port_range[TFE_STRING_MAX] = {0};
	char accept_tags[TFE_STRING_MAX] = {0};
	int redis_port_begin = 0, redis_port_end = 0;
	int redis_port_select = 0;
	int redis_port_span = 0;
	int redis_db_idx = 0;
	int deferred_load_on = 0;
	char json_cfg_file[TFE_STRING_MAX] = {0}, maat_stat_file[TFE_STRING_MAX] = {0};

	MESA_load_profile_int_def(profile, section, "maat_input_mode", &(input_mode), 0);
	MESA_load_profile_int_def(profile, section, "stat_switch", &(maat_stat_on), 1);
	MESA_load_profile_int_def(profile, section, "perf_switch", &(maat_perf_on), 1);
	MESA_load_profile_string_def(profile, section, "table_info", table_info, sizeof(table_info), "");
	MESA_load_profile_string_def(profile, section, "accept_tags", accept_tags, sizeof(accept_tags), "");
	MESA_load_profile_string_def(profile, section, "json_cfg_file", json_cfg_file, sizeof(json_cfg_file), "");
	MESA_load_profile_string_def(profile, section, "maat_redis_server", redis_server, sizeof(redis_server), "");
	/* The size must be that of the destination buffer itself. */
	MESA_load_profile_string_def(profile, section, "maat_redis_port_range", redis_port_range, sizeof(redis_port_range), "6379");
	MESA_load_profile_int_def(profile, section, "maat_redis_db_index", &(redis_db_idx), 0);
	MESA_load_profile_string_def(profile, section, "inc_cfg_dir", inc_cfg_dir, sizeof(inc_cfg_dir), "");
	MESA_load_profile_string_def(profile, section, "full_cfg_dir", ful_cfg_dir, sizeof(ful_cfg_dir), "");
	MESA_load_profile_string_def(profile, section, "stat_file", maat_stat_file, sizeof(maat_stat_file), "");
	MESA_load_profile_int_def(profile, section, "effect_interval_s", &(effect_interval), 60);
	MESA_load_profile_int_def(profile, section, "deferred_load_on", &(deferred_load_on), 0);
	effect_interval *= 1000; //convert s to ms

	target = Maat_feather(max_thread, table_info, logger);
	if (target == NULL)
	{
		/* Nothing to burn yet, so return directly instead of going through error_out. */
		TFE_LOG_ERROR(logger, "Maat_feather() failed for instance %s, MAAT init failed.", instance_name);
		return NULL;
	}
	Maat_set_feather_opt(target, MAAT_OPT_INSTANCE_NAME, instance_name, strlen(instance_name) + 1);

	switch (input_mode)
	{
	case MAAT_INPUT_JSON:
		if (!strlen(json_cfg_file))
		{
			TFE_LOG_ERROR(logger, "Invalid json_cfg_file, MAAT init failed.");
			goto error_out;
		}
		Maat_set_feather_opt(target, MAAT_OPT_JSON_FILE_PATH, json_cfg_file, strlen(json_cfg_file) + 1);
		break;
	case MAAT_INPUT_REDIS:
		if (!strlen(redis_server))
		{
			TFE_LOG_ERROR(logger, "Invalid maat_redis_server, MAAT init failed.");
			goto error_out;
		}
		/* Accepted forms: "PORT" or "BEGIN-END". */
		ret = sscanf(redis_port_range, "%d-%d", &redis_port_begin, &redis_port_end);
		if (ret == 1)
		{
			redis_port_select = redis_port_begin;
		}
		else if (ret == 2)
		{
			redis_port_span = redis_port_end - redis_port_begin;
			if (redis_port_span < 0)
			{
				/* A reversed range would make the selection below land outside the range. */
				TFE_LOG_ERROR(logger, "Invalid redis port range %s, MAAT init failed.", redis_port_range);
				goto error_out;
			}
			if (redis_port_span == 0)
			{
				/* "N-N": a single port; `rand() % 0` would be a division by zero. */
				redis_port_select = redis_port_begin;
			}
			else
			{
				/* NOTE(review): the end port itself is never selected (half-open
				 * range); kept as-is, confirm whether inclusive was intended. */
				srand(time(NULL));
				redis_port_select = redis_port_begin + rand() % redis_port_span;
			}
		}
		else
		{
			TFE_LOG_ERROR(logger, "Invalid redis port range %s, MAAT init failed.", redis_port_range);
			goto error_out;
		}
		Maat_set_feather_opt(target, MAAT_OPT_REDIS_IP, redis_server, strlen(redis_server) + 1);
		Maat_set_feather_opt(target, MAAT_OPT_REDIS_PORT, &redis_port_select, sizeof(redis_port_select));
		Maat_set_feather_opt(target, MAAT_OPT_REDIS_INDEX, &redis_db_idx, sizeof(redis_db_idx));
		break;
	case MAAT_INPUT_FILE:
		if (!strlen(ful_cfg_dir))
		{
			TFE_LOG_ERROR(logger, "Invalid ful_cfg_dir, MAAT init failed.");
			goto error_out;
		}
		if (!strlen(inc_cfg_dir))
		{
			TFE_LOG_ERROR(logger, "Invalid inc_cfg_dir, MAAT init failed.");
			goto error_out;
		}
		Maat_set_feather_opt(target, MAAT_OPT_FULL_CFG_DIR, ful_cfg_dir, strlen(ful_cfg_dir) + 1);
		Maat_set_feather_opt(target, MAAT_OPT_INC_CFG_DIR, inc_cfg_dir, strlen(inc_cfg_dir) + 1);
		break;
	default:
		TFE_LOG_ERROR(logger, "Invalid MAAT Input Mode: %d.", input_mode);
		goto error_out;
		break;
	}

	Maat_set_feather_opt(target, MAAT_OPT_FOREIGN_CONT_DIR, "./pangu_files", strlen("./pangu_files") + 1);
	if (maat_stat_on)
	{
		Maat_set_feather_opt(target, MAAT_OPT_STAT_FILE_PATH, maat_stat_file, strlen(maat_stat_file) + 1);
		Maat_set_feather_opt(target, MAAT_OPT_STAT_ON, NULL, 0);
		/* Perf counters are only switched on together with statistics. */
		if (maat_perf_on)
		{
			Maat_set_feather_opt(target, MAAT_OPT_PERF_ON, NULL, 0);
		}
	}
	Maat_set_feather_opt(target, MAAT_OPT_DEFERRED_LOAD, &deferred_load_on, sizeof(deferred_load_on));
	Maat_set_feather_opt(target, MAAT_OPT_EFFECT_INVERVAL_MS, &effect_interval, sizeof(effect_interval));
	Maat_set_feather_opt(target, MAAT_OPT_SCAN_DETAIL, &scan_detail, sizeof(scan_detail));
	if (strlen(accept_tags) > 0)
	{
		Maat_set_feather_opt(target, MAAT_OPT_ACCEPT_TAGS, &accept_tags, sizeof(accept_tags));
	}

	ret = Maat_initiate_feather(target);
	if (ret < 0)
	{
		TFE_LOG_ERROR(logger, "%s MAAT init failed.", __FUNCTION__);
		goto error_out;
	}
	return target;

error_out:
	Maat_burn_feather(target);
	return NULL;
}
/*
 * Build the kafka logger described by `section` of `profile`.
 *
 * Keys read: "enable" (default 1), "NIC_NAME" (default "eth0"),
 * "KAFKA_BROKERLIST" (mandatory) and "KAFKA_TOPIC" (default
 * "POLICY-EVENT-LOG").
 *
 * Returns the logger handle, or NULL when no broker list is configured or
 * tfe_kafka_logger_create() fails. Failures are reported through `logger`.
 */
static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char *section, void *logger)
{
	int is_enabled = 0;
	char nic[64] = {0};
	char brokers[TFE_STRING_MAX] = {0};
	char topic[TFE_STRING_MAX] = {0};

	MESA_load_profile_int_def(profile, section, "enable", &is_enabled, 1);
	MESA_load_profile_string_def(profile, section, "NIC_NAME", nic, sizeof(nic), "eth0");
	MESA_load_profile_string_def(profile, section, "KAFKA_BROKERLIST", brokers, sizeof(brokers), "");
	MESA_load_profile_string_def(profile, section, "KAFKA_TOPIC", topic, sizeof(topic), "POLICY-EVENT-LOG");

	/* The broker list has no usable default: refuse to continue without it. */
	if (brokers[0] == '\0')
	{
		TFE_LOG_ERROR(logger, "tfe kafka init failed, no brokerlist in profile %s section %s.", profile, section);
		return NULL;
	}

	tfe_kafka_logger_t *handle = tfe_kafka_logger_create(is_enabled, nic, brokers, topic, logger);
	if (!handle)
	{
		TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create kafka logger.");
		return NULL;
	}

	TFE_LOG_INFO(logger, "tfe kafka logger : %s", is_enabled ? "ENABLE" : "DISABLE");
	TFE_LOG_INFO(logger, "tfe kafka topic : %s", topic);
	TFE_LOG_INFO(logger, "tfe kafka brokerlist : %s", brokers);
	return handle;
}
static char *cerate_device_id(const char *profile, const char *section, void *logger)
{
int ret = -1;
size_t device_id_size = 0;
char *tsg_sn_file = NULL, *device_id;
const char *device_def_id = "DFT2201925000001";
cJSON *json = NULL, *item = NULL;
char device_id_filepath[TFE_STRING_MAX] = {0};
ret = MESA_load_profile_string_def(profile, section, "device_id_filepath", device_id_filepath, sizeof(device_id_filepath), NULL);
if (ret < 0)
{
TFE_LOG_ERROR(logger, "Invalid device parameter: device_id_filepath not existed in profile %s section %s.", profile, section);
goto finish;
}
tsg_sn_file = tfe_read_file(device_id_filepath, &device_id_size);
if (tsg_sn_file == NULL)
{
TFE_LOG_ERROR(logger, "Invalid device parameter: device sn file not existed.");
goto finish;
}
json = cJSON_Parse(tsg_sn_file);
if (json == NULL)
{
TFE_LOG_ERROR(logger, "Invalid device parameter: %s invalid json format", tsg_sn_file);
goto finish;
}
item = cJSON_GetObjectItem(json, "sn");
if (unlikely(!item || !cJSON_IsString(item)))
{
TFE_LOG_ERROR(logger, "Invalid device parameter: %s invalid json format", tsg_sn_file);
goto finish;
}
device_id = tfe_strdup(item->valuestring);
cJSON_Delete(json);
TFE_LOG_INFO(logger, "tfe device id : %s", device_id);
return device_id;
finish:
TFE_LOG_INFO(logger, "tfe use default device id : %s", device_def_id);
if (json)
cJSON_Delete(json);
return (char *)device_def_id;
}
/*
 * "new" callback for the IP ASN tables.
 *
 * Parses one table line of the form
 *   profile_id \t addr_type \t start_ip \t end_ip \t asn \t organization \t is_valid
 * and, on success, stores a freshly allocated struct ip_data_table (ref_cnt
 * 1, with `asn` and `organization` duplicated) into *ad. If the line does not
 * yield all 7 fields an error is logged and *ad is left untouched.
 *
 * table_id, key, argl and argp are not used.
 */
static void ip_asn_table_new_cb(int table_id, const char *key, const char *table_line, MAAT_PLUGIN_EX_DATA *ad, long argl, void *argp)
{
	int addr_type;
	int ret = 0, profile_id = 0, is_valid = 0;
	char start_ip[40], end_ip[40], asn[40] = {0};
	char organization[TFE_PATH_MAX];
	char line_format[64] = {0};

	/* Every %s conversion carries an explicit maximum width so an over-long
	 * field cannot overrun its stack buffer. The width for `organization`
	 * depends on TFE_PATH_MAX, so the format is assembled at run time; the
	 * 40-byte buffers use the literal width 39. */
	ret = snprintf(line_format, sizeof(line_format), "%%d\t%%d\t%%39s\t%%39s\t%%39s\t%%%zus\t%%d", sizeof(organization) - 1);
	if (ret < 0 || (size_t)ret >= sizeof(line_format))
	{
		TFE_LOG_ERROR(g_default_logger, "Policy table parse ip ASN failed, ret:%d, %s", ret, table_line);
		return;
	}

	ret = sscanf(table_line, line_format, &profile_id, &addr_type, start_ip, end_ip, asn, organization, &is_valid);
	if (ret != 7)
	{
		TFE_LOG_ERROR(g_default_logger, "Policy table parse ip ASN failed, ret:%d, %s", ret, table_line);
		return;
	}
	tfe_unescape(organization);

	struct ip_data_table *ip_asn = ALLOC(struct ip_data_table, 1);
	memset(ip_asn, 0, sizeof(struct ip_data_table));
	ip_asn->profile_id = profile_id;
	ip_asn->asn = tfe_strdup(asn);
	ip_asn->organization = tfe_strdup(organization);
	/* The creator holds the first reference; ip_table_free_cb() drops it. */
	ip_asn->ref_cnt = 1;
	pthread_mutex_init(&(ip_asn->lock), NULL);
	TFE_LOG_INFO(g_default_logger, "Policy table add success %d", profile_id);
	*ad = ip_asn;
}
/*
 * "new" callback for the IP location tables.
 *
 * Parses one tab-separated table line with 18 fields, in this order:
 *   profile_id, geoname_id, addr_type, start_ip, end_ip, latitude,
 *   longitude, coords, language, continent_abbr, continent_full,
 *   country_abbr, country_full, province_abbr, province_full, city_full,
 *   time_zone, is_valid
 * On success a freshly allocated struct ip_data_table (ref_cnt 1, with
 * country_full / province_full / city_full duplicated) is stored into *ad.
 * If the line does not yield all 18 fields an error is logged and *ad is
 * left untouched.
 *
 * table_id, key, argl and argp are not used.
 */
static void ip_location_table_new_cb(int table_id, const char *key, const char *table_line, MAAT_PLUGIN_EX_DATA *ad, long argl, void *argp)
{
	int ret = 0, profile_id = 0, is_valid = 0;
	int geoname_id = 0, addr_type = 0;
	double latitude, longitude, coords;
	char language[40], start_ip[40], end_ip[40];
	char continent_abbr[TFE_PATH_MAX], continent_full[TFE_PATH_MAX];
	char country_abbr[TFE_PATH_MAX], province_abbr[TFE_PATH_MAX], time_zone[TFE_PATH_MAX];
	char country_full[TFE_PATH_MAX], province_full[TFE_PATH_MAX], city_full[TFE_PATH_MAX];
	char line_format[256] = {0};
	/* All eight name buffers above are declared with TFE_PATH_MAX, so a
	 * single width is valid for each of them. */
	const size_t name_width = sizeof(continent_abbr) - 1;

	/* Every %s conversion carries an explicit maximum width so an over-long
	 * field cannot overrun its stack buffer. The TFE_PATH_MAX-sized widths
	 * are only known through sizeof, so the format is assembled at run time;
	 * the 40-byte buffers use the literal width 39. */
	ret = snprintf(line_format, sizeof(line_format),
				   "%%d\t%%d\t%%d\t%%39s\t%%39s\t%%lf\t%%lf\t%%lf\t%%39s"
				   "\t%%%zus\t%%%zus\t%%%zus\t%%%zus\t%%%zus\t%%%zus\t%%%zus\t%%%zus\t%%d",
				   name_width, name_width, name_width, name_width,
				   name_width, name_width, name_width, name_width);
	if (ret < 0 || (size_t)ret >= sizeof(line_format))
	{
		TFE_LOG_ERROR(g_default_logger, "Policy table parse ip location failed, ret:%d, %s", ret, table_line);
		return;
	}

	ret = sscanf(table_line, line_format, &profile_id, &geoname_id,
				 &addr_type, start_ip, end_ip, &latitude, &longitude, &coords, language,
				 continent_abbr, continent_full, country_abbr, country_full, province_abbr, province_full,
				 city_full, time_zone, &is_valid);
	if (ret != 18)
	{
		TFE_LOG_ERROR(g_default_logger, "Policy table parse ip location failed, ret:%d, %s", ret, table_line);
		return;
	}
	tfe_unescape(continent_full);
	tfe_unescape(country_full);
	tfe_unescape(province_full);
	tfe_unescape(city_full);

	struct ip_data_table *ip_location = ALLOC(struct ip_data_table, 1);
	memset(ip_location, 0, sizeof(struct ip_data_table));
	ip_location->profile_id = profile_id;
	ip_location->country_full = tfe_strdup(country_full);
	ip_location->province_full = tfe_strdup(province_full);
	ip_location->city_full = tfe_strdup(city_full);
	/* The creator holds the first reference; ip_table_free_cb() drops it. */
	ip_location->ref_cnt = 1;
	pthread_mutex_init(&(ip_location->lock), NULL);
	TFE_LOG_INFO(g_default_logger, "Policy table add success %d", profile_id);
	*ad = ip_location;
}
/*
 * "dup" callback shared by the IP ASN and IP location tables.
 *
 * No copy is made: the reference count of the entry in *from is incremented
 * under its lock and the same pointer is stored into *to. A NULL entry is
 * propagated as NULL, mirroring the NULL guard in ip_table_free_cb().
 *
 * table_id, argl and argp are not used.
 */
static void ip_table_dup_cb(int table_id, MAAT_PLUGIN_EX_DATA *to, MAAT_PLUGIN_EX_DATA *from, long argl, void *argp)
{
	struct ip_data_table *ip_asn = (struct ip_data_table *)(*from);
	if (ip_asn == NULL)
	{
		/* Nothing to reference; do not touch the lock of a missing entry. */
		*to = NULL;
		return;
	}
	pthread_mutex_lock(&(ip_asn->lock));
	ip_asn->ref_cnt++;
	pthread_mutex_unlock(&(ip_asn->lock));
	*to = ip_asn;
}
/*
 * "free" callback shared by the IP ASN and IP location tables.
 *
 * Drops one reference from the entry in *ad. While other references remain
 * the entry is kept and *ad is left as it is; when the count reaches zero the
 * lock is destroyed, every owned string and the entry itself are released,
 * and *ad is reset to NULL. A NULL *ad is a no-op.
 *
 * table_id, argl and argp are not used.
 */
static void ip_table_free_cb(int table_id, MAAT_PLUGIN_EX_DATA *ad, long argl, void *argp)
{
	struct ip_data_table *entry = (struct ip_data_table *)(*ad);
	if (entry == NULL)
	{
		return;
	}

	/* Decide under the lock whether this was the last reference. */
	pthread_mutex_lock(&(entry->lock));
	entry->ref_cnt--;
	const bool still_referenced = (entry->ref_cnt > 0);
	pthread_mutex_unlock(&(entry->lock));
	if (still_referenced)
	{
		return;
	}

	/* Last reference gone: tear the entry down. */
	pthread_mutex_destroy(&(entry->lock));
	if (entry->asn)
	{
		FREE(&entry->asn);
	}
	if (entry->organization)
	{
		FREE(&entry->organization);
	}
	if (entry->country_full)
	{
		FREE(&entry->country_full);
	}
	if (entry->province_full)
	{
		FREE(&entry->province_full);
	}
	if (entry->city_full)
	{
		FREE(&entry->city_full);
	}
	FREE(&entry);
	*ad = NULL;
}
void ip_table_free(struct ip_data_table *ip_asn)
{
ip_table_free_cb(0, (void **)&ip_asn, 0, NULL);
}
/*
 * Tables registered on the static Maat feather by register_maat_table().
 *
 * The array is indexed by enum TABLE_TYPE (see tfe_bussiness_tableid_get()),
 * so the order of the rows must follow the order of the enumerators named in
 * the per-row comments. The `id` column starts at 0 and is overwritten with
 * the value returned by Maat_table_register(). Rows that carry callbacks are
 * additionally registered through Maat_ip_plugin_EX_register(); rows with
 * three NULL callbacks are registered by name only.
 */
static struct maat_table_info maat_pub_tables[TABLE_TYPE_MAX] = {
// TABLE_IP_ASN_USER_DEFINED
{0, "TSG_IP_ASN_USER_DEFINED", ip_asn_table_new_cb, ip_table_dup_cb, ip_table_free_cb},
// TABLE_IP_ASN_BUILT_IN
{0, "TSG_IP_ASN_BUILT_IN", ip_asn_table_new_cb, ip_table_dup_cb, ip_table_free_cb},
// TABLE_IP_LOCATION_USER_DEFINED
{0, "TSG_IP_LOCATION_USER_DEFINED", ip_location_table_new_cb, ip_table_dup_cb, ip_table_free_cb},
// TABLE_IP_LOCATION_BUILT_IN
{0, "TSG_IP_LOCATION_BUILT_IN", ip_location_table_new_cb, ip_table_dup_cb, ip_table_free_cb},
// TABLE_SECURITY_SOURCE_ASN
{0, "TSG_SECURITY_SOURCE_ASN", NULL, NULL, NULL},
// TABLE_SECURITY_DESTINATION_ASN
{0, "TSG_SECURITY_DESTINATION_ASN", NULL, NULL, NULL},
// TABLE_SECURITY_SOURCE_LOCATION
{0, "TSG_SECURITY_SOURCE_LOCATION", NULL, NULL, NULL},
// TABLE_SECURITY_DESTINATION_LOCATION
{0, "TSG_SECURITY_DESTINATION_LOCATION", NULL, NULL, NULL}
};
static int register_maat_table()
{
for (int i = 0; i < TABLE_TYPE_MAX; i++)
{
maat_pub_tables[i].id = Maat_table_register(static_maat, maat_pub_tables[i].name);
if (maat_pub_tables[i].id < 0)
{
TFE_LOG_ERROR(g_default_logger, "Maat table %s register failed.", maat_pub_tables[i].name);
return -1;
}
if (maat_pub_tables[i].new_func || maat_pub_tables[i].dup_func || maat_pub_tables[i].free_func)
{
Maat_ip_plugin_EX_register(static_maat, maat_pub_tables[i].id, maat_pub_tables[i].new_func,
maat_pub_tables[i].free_func, maat_pub_tables[i].dup_func, 0, NULL);
}
}
return 0;
}
/*
 * Create the process-wide business resources from ./conf/tfe/tfe.conf, in
 * this order: static Maat feather (section "MAAT"), dynamic Maat feather
 * (section "DYNAMIC_MAAT"), kafka logger and device id (both section
 * "kafka"), then the Maat table registrations.
 *
 * Returns 0 on success, -1 at the first step that fails; resources created
 * by earlier steps are left in place in that case.
 */
int tfe_bussiness_resouce_init()
{
	const char *conf_path = "./conf/tfe/tfe.conf";
	unsigned int worker_cnt = tfe_proxy_get_work_thread_count();

	static_maat = create_maat_feather("static", conf_path, "MAAT", worker_cnt, g_default_logger);
	if (static_maat == NULL)
	{
		return -1;
	}

	dynamic_maat = create_maat_feather("dyn", conf_path, "DYNAMIC_MAAT", worker_cnt, g_default_logger);
	if (dynamic_maat == NULL)
	{
		return -1;
	}

	kafka_logger = create_kafka_logger(conf_path, "kafka", g_default_logger);
	if (kafka_logger == NULL)
	{
		return -1;
	}

	/* Never fails: falls back to a built-in default id. */
	device_id = cerate_device_id(conf_path, "kafka", g_default_logger);

	return (register_maat_table() != 0) ? -1 : 0;
}
/*
 * Return the process-wide resource selected by `type`.
 *
 * The result is the handle created by tfe_bussiness_resouce_init() (NULL if
 * init has not produced it), or NULL for an unknown `type`.
 */
void *tfe_bussiness_resouce_get(enum RESOURCE_TYPE type)
{
	void *resource = NULL;

	switch (type)
	{
	case STATIC_MAAT:
		resource = static_maat;
		break;
	case DYNAMINC_MAAT:
		resource = dynamic_maat;
		break;
	case KAFKA_LOGGER:
		resource = kafka_logger;
		break;
	case DEVICE_ID:
		resource = device_id;
		break;
	default:
		break;
	}
	return resource;
}
int tfe_bussiness_tableid_get(enum TABLE_TYPE type)
{
return maat_pub_tables[type].id;
}