This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/common/src/tfe_resource.cpp

472 lines
16 KiB
C++
Raw Normal View History

#include <MESA/MESA_prof_load.h>
2023-03-30 19:39:18 +08:00
#include <MESA/maat.h>
#include <cjson/cJSON.h>
#include <tfe_kafka_logger.h>
2023-04-25 16:17:35 +08:00
#include <tfe_fieldstat.h>
#include <tfe_proxy.h>
#include <tfe_resource.h>
#define MAAT_INPUT_JSON 0
#define MAAT_INPUT_REDIS 1
#define MAAT_INPUT_FILE 2
static int scan_table_id[__SCAN_COMMON_TABLE_MAX];
2023-03-30 19:39:18 +08:00
static struct maat *static_maat = NULL;
static tfe_kafka_logger_t *kafka_logger = NULL;
2023-04-25 16:17:35 +08:00
static struct tfe_fieldstat_metric_t *dynamic_fieldstat = NULL;
static char *device_id = NULL;
static char *effective_device_tag=NULL;
2023-04-25 16:17:35 +08:00
static struct tfe_fieldstat_metric_t *create_fieldstat_instance(const char *profile, const char *section, int max_thread, void *logger)
{
int cycle=0;
unsigned short telegraf_port=0;
char telegraf_ip[TFE_STRING_MAX]={0};
char app_name[TFE_STRING_MAX]={0};
struct tfe_fieldstat_metric_t *dynamic_fieldstat=NULL;
MESA_load_profile_short_nodef(profile, section, "telegraf_port", (short *)&(telegraf_port));
MESA_load_profile_string_nodef(profile, section, "telegraf_ip", telegraf_ip, sizeof(telegraf_ip));
MESA_load_profile_string_def(profile, section, "app_name", app_name, sizeof(app_name), "metric");
MESA_load_profile_int_def(profile, section, "cycle", &cycle, 1000);
dynamic_fieldstat = tfe_fieldstat_metric_create(telegraf_ip, telegraf_port, app_name, cycle, max_thread, logger);
if (dynamic_fieldstat == NULL)
{
TFE_LOG_ERROR(logger, "tfe fieldstat init failed, error to create fieldstat metric.");
return NULL;
}
TFE_LOG_INFO(logger, "tfe fieldstat telegraf_ip : %s", telegraf_ip);
TFE_LOG_INFO(logger, "tfe fieldstat telegraf_port : %d", telegraf_port);
TFE_LOG_INFO(logger, "tfe fieldstat app_name : %s", app_name);
TFE_LOG_INFO(logger, "tfe fieldstat cycle : %d", cycle);
return dynamic_fieldstat;
}
2023-03-30 19:39:18 +08:00
static struct maat *create_maat_feather(const char *instance_name, const char *profile, const char *section, int max_thread, void *logger)
{
2023-03-30 19:39:18 +08:00
struct maat *target=NULL;
int input_mode = 0, maat_stat_on = 0, maat_perf_on = 0;
2023-03-30 19:39:18 +08:00
int ret = 0, effect_interval = 60, log_level=0;
char table_info[TFE_STRING_MAX] = {0}, inc_cfg_dir[TFE_STRING_MAX] = {0}, ful_cfg_dir[TFE_STRING_MAX] = {0};
char redis_server[TFE_STRING_MAX] = {0};
char redis_port_range[TFE_STRING_MAX] = {0};
char accept_tags[TFE_STRING_MAX] = {0};
char accept_path[TFE_PATH_MAX] = {0};
int redis_port_begin = 0, redis_port_end = 0;
int redis_port_select = 0;
int redis_db_idx = 0;
int deferred_load_on = 0;
char json_cfg_file[TFE_STRING_MAX] = {0}, maat_stat_file[TFE_STRING_MAX] = {0};
MESA_load_profile_int_def(profile, section, "maat_input_mode", &(input_mode), 0);
MESA_load_profile_int_def(profile, section, "stat_switch", &(maat_stat_on), 1);
MESA_load_profile_int_def(profile, section, "perf_switch", &(maat_perf_on), 1);
MESA_load_profile_string_def(profile, section, "table_info", table_info, sizeof(table_info), "");
MESA_load_profile_string_def(profile, section, "accept_path", accept_path, sizeof(accept_path), "");
MESA_load_profile_string_def(profile, section, "json_cfg_file", json_cfg_file, sizeof(json_cfg_file), "");
MESA_load_profile_string_def(profile, section, "maat_redis_server", redis_server, sizeof(redis_server), "");
MESA_load_profile_string_def(profile, section, "maat_redis_port_range", redis_port_range, sizeof(redis_server), "6379");
MESA_load_profile_int_def(profile, section, "maat_redis_db_index", &(redis_db_idx), 0);
MESA_load_profile_string_def(profile, section, "inc_cfg_dir", inc_cfg_dir, sizeof(inc_cfg_dir), "");
MESA_load_profile_string_def(profile, section, "full_cfg_dir", ful_cfg_dir, sizeof(ful_cfg_dir), "");
MESA_load_profile_string_def(profile, section, "stat_file", maat_stat_file, sizeof(maat_stat_file), "");
MESA_load_profile_int_def(profile, section, "effect_interval_s", &(effect_interval), 60);
MESA_load_profile_int_def(profile, section, "deferred_load_on", &(deferred_load_on), 0);
2023-03-30 19:39:18 +08:00
MESA_load_profile_int_def(profile, section, "log_level", &(log_level), LOG_LEVEL_FATAL);
effect_interval *= 1000; //convert s to ms
2023-03-30 19:39:18 +08:00
struct maat_options *opts = maat_options_new();
maat_options_set_logger(opts, "log/maat.log", (enum log_level)log_level);
maat_options_set_instance_name(opts, instance_name);
maat_options_set_caller_thread_number(opts, max_thread);
switch (input_mode)
{
case MAAT_INPUT_JSON:
if (!strlen(json_cfg_file))
{
TFE_LOG_ERROR(logger, "Invalid json_cfg_file, MAAT init failed.");
goto error_out;
}
maat_options_set_json_file(opts, json_cfg_file);
break;
case MAAT_INPUT_REDIS:
if (!strlen(redis_server))
{
TFE_LOG_ERROR(logger, "Invalid maat_redis_server, MAAT init failed.");
goto error_out;
}
ret = sscanf(redis_port_range, "%d-%d", &redis_port_begin, &redis_port_end);
if (ret == 1)
{
redis_port_select = redis_port_begin;
}
else if (ret == 2)
{
srand(time(NULL));
redis_port_select = redis_port_begin + rand() % (redis_port_end - redis_port_begin);
}
else
{
TFE_LOG_ERROR(logger, "Invalid redis port range %s, MAAT init failed.", redis_port_range);
goto error_out;
}
maat_options_set_redis(opts, redis_server, redis_port_select, redis_db_idx);
break;
case MAAT_INPUT_FILE:
if (!strlen(ful_cfg_dir))
{
TFE_LOG_ERROR(logger, "Invalid ful_cfg_dir, MAAT init failed.");
goto error_out;
}
if (!strlen(inc_cfg_dir))
{
TFE_LOG_ERROR(logger, "Invalid inc_cfg_dir, MAAT init failed.");
goto error_out;
}
maat_options_set_iris(opts, ful_cfg_dir, inc_cfg_dir);
break;
default:
TFE_LOG_ERROR(logger, "Invalid MAAT Input Mode: %d.", input_mode);
goto error_out;
break;
}
2023-03-30 19:39:18 +08:00
maat_options_set_foreign_cont_dir(opts, "./pangu_files");
if (maat_stat_on)
{
maat_options_set_stat_on(opts);
maat_options_set_stat_file(opts, maat_stat_file);
2023-03-30 19:39:18 +08:00
if (maat_perf_on)
{
maat_options_set_perf_on(opts);
}
}
if (deferred_load_on)
{
2023-03-30 19:39:18 +08:00
maat_options_set_deferred_load_on(opts);
}
2023-03-30 19:39:18 +08:00
if (strlen(accept_path) > 0)
{
MESA_load_profile_string_def(accept_path, "maat", "ACCEPT_TAGS", accept_tags, sizeof(accept_tags), "{\"tags\":[{\"tag\":\"device_id\",\"value\":\"device_1\"}]}");
2023-03-30 19:39:18 +08:00
maat_options_set_accept_tags(opts, accept_tags);
TFE_LOG_INFO(logger, "tfe accept tags : %s", accept_tags);
}
2023-03-30 19:39:18 +08:00
target = maat_new(opts, table_info);
if (!target)
{
TFE_LOG_ERROR(logger, "%s MAAT init failed.", __FUNCTION__);
goto error_out;
}
2023-03-30 19:39:18 +08:00
maat_options_free(opts);
return target;
error_out:
2023-03-30 19:39:18 +08:00
maat_options_free(opts);
return NULL;
}
static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char *section, void *logger)
{
int enable = 0, vsystem_id = 0;
char nic_name[TFE_SYMBOL_MAX] = {0};
char brokerlist[TFE_STRING_MAX] = {0};
char logger_topic[TFE_STRING_MAX] = {0};
char bucket_topic[TFE_STRING_MAX] = {0};
char sasl_username[TFE_STRING_MAX] = {0};
char sasl_passwd[TFE_STRING_MAX] = {0};
tfe_kafka_logger_t *kafka_logger = NULL;
MESA_load_profile_int_def(profile, section, "enable", &enable, 1);
MESA_load_profile_int_def(profile, section, "VSYSTEM_ID", &vsystem_id, 1);
MESA_load_profile_string_def(profile, section, "NIC_NAME", nic_name, sizeof(nic_name), "eth0");
MESA_load_profile_string_def(profile, section, "KAFKA_BROKERLIST", brokerlist, sizeof(brokerlist), "");
MESA_load_profile_string_def(profile, section, "LOGGER_SEND_TOPIC", logger_topic, sizeof(logger_topic), "PROXY-EVENT");
MESA_load_profile_string_def(profile, section, "FILE_BUCKET_TOPIC", bucket_topic, sizeof(bucket_topic), "TRAFFIC-FILE-STREAM-RECORD");
MESA_load_profile_string_def(profile, section, "SASL_USERNAME", sasl_username, sizeof(sasl_username), "");
MESA_load_profile_string_def(profile, section, "SASL_PASSWD", sasl_passwd, sizeof(sasl_passwd), "");
if (!strlen(brokerlist))
{
TFE_LOG_ERROR(logger, "tfe kafka init failed, no brokerlist in profile %s section %s.", profile, section);
return NULL;
}
2023-12-19 14:23:55 +08:00
kafka_logger = tfe_kafka_logger_create(enable, nic_name, brokerlist, sasl_username, sasl_passwd, logger);
if (kafka_logger == NULL)
{
TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create kafka logger.");
return NULL;
}
2023-12-19 14:23:55 +08:00
int ret = tfe_kafka_logger_topic_new(kafka_logger, logger_topic, TOPIC_LOGGER, logger);
if(ret < 0)
{
return NULL;
}
ret = tfe_kafka_logger_topic_new(kafka_logger, bucket_topic, TOPIC_BUCKET, logger);
if(ret < 0)
{
return NULL;
}
kafka_logger->t_vsys_id=vsystem_id;
TFE_LOG_INFO(logger, "tfe kafka logger : %s", enable ? "ENABLE" : "DISABLE");
TFE_LOG_INFO(logger, "tfe kafka vsystem id : %d", vsystem_id);
TFE_LOG_INFO(logger, "tfe logger kafka topic : %s", logger_topic);
TFE_LOG_INFO(logger, "tfe bucket kafka topic : %s", bucket_topic);
TFE_LOG_INFO(logger, "tfe kafka brokerlist : %s", brokerlist);
if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0)
{
TFE_LOG_INFO(logger, "tfe kafka sasl_username : %s", sasl_username);
TFE_LOG_INFO(logger, "tfe kafka sasl_passwd : %s", sasl_passwd);
}
return kafka_logger;
}
static char *cerate_device_id(const char *profile, const char *section, void *logger)
{
int ret = -1;
size_t device_id_size = 0;
char *tsg_sn_file = NULL, *device_id;
const char *device_def_id = "DFT2201925000001";
cJSON *json = NULL, *item = NULL;
char device_id_filepath[TFE_STRING_MAX] = {0};
ret = MESA_load_profile_string_def(profile, section, "device_id_filepath", device_id_filepath, sizeof(device_id_filepath), NULL);
if (ret < 0)
{
TFE_LOG_ERROR(logger, "Invalid device parameter: device_id_filepath not existed in profile %s section %s.", profile, section);
goto finish;
}
tsg_sn_file = tfe_read_file(device_id_filepath, &device_id_size);
if (tsg_sn_file == NULL)
{
TFE_LOG_ERROR(logger, "Invalid device parameter: device sn file not existed.");
goto finish;
}
json = cJSON_Parse(tsg_sn_file);
if (json == NULL)
{
TFE_LOG_ERROR(logger, "Invalid device parameter: %s invalid json format", tsg_sn_file);
goto finish;
}
item = cJSON_GetObjectItem(json, "sn");
if (unlikely(!item || !cJSON_IsString(item)))
{
TFE_LOG_ERROR(logger, "Invalid device parameter: %s invalid json format", tsg_sn_file);
goto finish;
}
device_id = tfe_strdup(item->valuestring);
if(tsg_sn_file)
{
FREE(&tsg_sn_file);
}
cJSON_Delete(json);
TFE_LOG_INFO(logger, "tfe device id : %s", device_id);
return device_id;
finish:
TFE_LOG_INFO(logger, "tfe use default device id : %s", device_def_id);
if (json)
{
cJSON_Delete(json);
}
if(tsg_sn_file)
{
FREE(&tsg_sn_file);
}
return (char *)device_def_id;
}
static char* create_effective_device_tag(const char *profile, const char *section, void *logger)
{
char *effective_device_tag=NULL;
char accept_path[TFE_PATH_MAX] = {0}, accept_tags[TFE_STRING_MAX] = {0};
MESA_load_profile_string_def(profile, section, "accept_path", accept_path, sizeof(accept_path), "");
if(strlen(accept_path) > 0)
{
MESA_load_profile_string_def(accept_path, "maat", "ACCEPT_TAGS", accept_tags, sizeof(accept_tags), "");
}
if(strlen(accept_tags) <= 0)
{
return NULL;
}
effective_device_tag = tfe_strdup(accept_tags);
TFE_LOG_INFO(logger, "tfe device tag : %s", effective_device_tag);
return effective_device_tag;
}
void app_dict_table_new_cb(const char *table_name, int table_id, const char* key, const char* table_line, void **ad, long argl, void* argp)
{
int ret=0;
size_t offset=0, len=0;
char *app_id_str=NULL, *group_id_str=NULL;
struct app_id_dict *app_dict=ALLOC(struct app_id_dict, 1);
ret = maat_helper_read_column(table_line, 1, &offset, &len);
if(ret >= 0)
{
app_id_str=ALLOC(char, len+1);
memcpy(app_id_str, table_line+offset, len);
app_dict->app_id=atoi(app_id_str);
FREE(&app_id_str);
}
ret = maat_helper_read_column(table_line, 18, &offset, &len);
if(ret >= 0)
{
group_id_str=ALLOC(char, len+1);
memcpy(group_id_str, table_line+offset, len);
app_dict->group_id=atoll(group_id_str);
FREE(&group_id_str);
}
app_dict->ref_cnt=1;
pthread_mutex_init(&(app_dict->lock), NULL);
*ad=app_dict;
return;
}
void app_dict_table_free_cb(int table_id, void **ad, long argl, void* argp)
{
if(*ad==NULL)
{
return;
}
struct app_id_dict *app_dict=(struct app_id_dict *)(*ad);
pthread_mutex_lock(&(app_dict->lock));
app_dict->ref_cnt--;
if(app_dict->ref_cnt>0)
{
pthread_mutex_unlock(&(app_dict->lock));
return;
}
pthread_mutex_unlock(&(app_dict->lock));
pthread_mutex_destroy(&(app_dict->lock));
FREE(&app_dict);
*ad=NULL;
return;
}
void app_id_dict_free(struct app_id_dict *app_dict)
{
app_dict_table_free_cb(0, (void **)&app_dict, 0, NULL);
}
void app_dict_table_dup_cb(int table_id, void **to, void **from, long argl, void* argp)
{
struct app_id_dict *app_dict=(struct app_id_dict *)(*from);
pthread_mutex_lock(&(app_dict->lock));
app_dict->ref_cnt++;
pthread_mutex_unlock(&(app_dict->lock));
*to=app_dict;
return;
}
static int maat_common_table_init()
{
const char * table_name[__SCAN_COMMON_TABLE_MAX];
table_name[PXY_CTRL_SOURCE_IP] = "ATTR_SOURCE_IP";
table_name[PXY_CTRL_DESTINATION_IP]="ATTR_DESTINATION_IP";
table_name[PXY_CTRL_INTERNAL_IP] = "ATTR_INTERNAL_IP";
table_name[PXY_CTRL_EXTERNAL_IP] = "ATTR_EXTERNAL_IP";
table_name[PXY_CTRL_SOURCE_PORT] = "ATTR_SOURCE_PORT";
table_name[PXY_CTRL_DESTINATION_PORT] = "ATTR_DESTINATION_PORT";
table_name[PXY_CTRL_INTERNAL_PORT] = "ATTR_INTERNAL_PORT";
table_name[PXY_CTRL_EXTERNAL_PORT] = "ATTR_EXTERNAL_PORT";
table_name[PXY_CTRL_IP_PROTOCOL] = "ATTR_IP_PROTOCOL";
table_name[PXY_CTRL_SOURCE_ASN] = "ATTR_SOURCE_ASN";
table_name[PXY_CTRL_DESTINATION_ASN]="ATTR_DESTINATION_ASN";
table_name[PXY_CTRL_SOURCE_LOCATION] = "ATTR_SOURCE_LOCATION";
table_name[PXY_CTRL_DESTINATION_LOCATION] = "ATTR_DESTINATION_LOCATION";
table_name[PXY_CTRL_SUBSCRIBER_ID] = "ATTR_SUBSCRIBER_ID";
table_name[PXY_CTRL_APP_ID_DICT] = "APP_ID_DICT";
for (int i = 0; i < __SCAN_COMMON_TABLE_MAX; i++)
{
scan_table_id[i] = maat_get_table_id(static_maat, table_name[i]);
if (scan_table_id[i] < 0)
{
TFE_LOG_ERROR(g_default_logger, "Maat table %s register failed.", table_name[i]);
return -1;
}
}
maat_plugin_table_ex_schema_register(static_maat, "APP_ID_DICT", app_dict_table_new_cb, app_dict_table_free_cb, app_dict_table_dup_cb, 0, NULL);
return 0;
}
int tfe_bussiness_resouce_init()
{
const char *profile_path = "./conf/tfe/tfe.conf";
unsigned int thread_num = tfe_proxy_get_work_thread_count();
static_maat = create_maat_feather("static", profile_path, "MAAT", thread_num, g_default_logger);
if (!static_maat)
{
return -1;
}
kafka_logger = create_kafka_logger(profile_path, "kafka", g_default_logger);
if (!kafka_logger)
{
return -1;
}
2023-04-25 16:17:35 +08:00
dynamic_fieldstat = create_fieldstat_instance(profile_path, "proxy_hits", thread_num, g_default_logger);
if(!dynamic_fieldstat)
{
return -1;
}
2023-04-25 16:17:35 +08:00
device_id = cerate_device_id(profile_path, "kafka", g_default_logger);
effective_device_tag = create_effective_device_tag(profile_path, "MAAT", g_default_logger);
if (maat_common_table_init())
{
return -1;
}
return 0;
}
void *tfe_bussiness_resouce_get(enum RESOURCE_TYPE type)
{
switch (type)
{
case STATIC_MAAT:
return static_maat;
case KAFKA_LOGGER:
return kafka_logger;
case DEVICE_ID:
return device_id;
case EFFECTIVE_DEVICE_TAG:
return effective_device_tag;
2023-04-25 16:17:35 +08:00
case DYNAMIC_FIELDSTAT:
return dynamic_fieldstat;
default:
return NULL;
}
}
int tfe_bussiness_tableid_get(enum scan_common_table type)
{
return scan_table_id[type];
}