TSG-1531 tfe 代码整理, 将多插件公用的基础代码移动到 tfe init 阶段

1.将 kafka 的初始化从 pangu init 阶段移动到 tfe init 阶段
	2.将 device id 的获取从 pangu init 阶段移动到 tfe init 阶段
	3.将 kafka 的配置项从 pangu.conf 移动到 tfe.conf
	4.将 maat  的配置项从 pangu.conf 移动到 tfe.conf
This commit is contained in:
luwenpeng
2020-06-24 16:40:53 +08:00
parent b4683daf32
commit 03d05dd73e
11 changed files with 223 additions and 192 deletions

View File

@@ -1,6 +1,8 @@
#include <tfe_utils.h>
#include <tfe_resource.h>
#include <tfe_proxy.h>
#include <tfe_kafka_logger.h>
#include <cjson/cJSON.h>
#include <MESA/Maat_rule.h>
#include <MESA/MESA_prof_load.h>
@@ -10,6 +12,8 @@
static Maat_feather_t static_maat = NULL;
static Maat_feather_t dynamic_maat = NULL;
static tfe_kafka_logger_t *kafka_logger = NULL;
static char *device_id = NULL;
static Maat_feather_t create_maat_feather(const char *instance_name, const char *profile, const char *section, int max_thread, void *logger)
{
@@ -135,9 +139,89 @@ error_out:
return NULL;
}
static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char *section, void *logger)
{
int enable = 0;
char nic_name[64] = {0};
char brokerlist[TFE_STRING_MAX] = {0};
char topic_name[TFE_STRING_MAX] = {0};
tfe_kafka_logger_t *kafka_logger = NULL;
MESA_load_profile_int_def(profile, section, "enable", &enable, 1);
MESA_load_profile_string_def(profile, section, "NIC_NAME", nic_name, sizeof(nic_name), "eth0");
MESA_load_profile_string_def(profile, section, "KAFKA_BROKERLIST", brokerlist, sizeof(brokerlist), "");
MESA_load_profile_string_def(profile, section, "KAFKA_TOPIC", topic_name, sizeof(topic_name), "POLICY-EVENT-LOG");
if (!strlen(brokerlist))
{
TFE_LOG_ERROR(logger, "tfe kafka init failed, no brokerlist in profile %s section %s.", profile, section);
return NULL;
}
kafka_logger = tfe_kafka_logger_create(enable, nic_name, brokerlist, topic_name, logger);
if (kafka_logger == NULL)
{
TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create kafka logger.");
return NULL;
}
TFE_LOG_INFO(logger, "tfe kafka logger : %s", enable ? "ENABLE" : "DISABLE");
TFE_LOG_INFO(logger, "tfe kafka topic : %s", topic_name);
TFE_LOG_INFO(logger, "tfe kafka brokerlist : %s", brokerlist);
return kafka_logger;
}
static char *cerate_device_id(const char *profile, const char *section, void *logger)
{
int ret = -1;
size_t device_id_size = 0;
char *tsg_sn_file = NULL, *device_id;
const char *device_def_id = "DFT2201925000001";
cJSON *json = NULL, *item = NULL;
char device_id_filepath[TFE_STRING_MAX] = {0};
ret = MESA_load_profile_string_def(profile, section, "device_id_filepath", device_id_filepath, sizeof(device_id_filepath), NULL);
if (ret < 0)
{
TFE_LOG_ERROR(logger, "Invalid device parameter: device_id_filepath not existed in profile %s section %s.", profile, section);
goto finish;
}
tsg_sn_file = tfe_read_file(device_id_filepath, &device_id_size);
if (tsg_sn_file == NULL)
{
TFE_LOG_ERROR(logger, "Invalid device parameter: device sn file not existed.");
goto finish;
}
json = cJSON_Parse(tsg_sn_file);
if (json == NULL)
{
TFE_LOG_ERROR(logger, "Invalid device parameter: %s invalid json format", tsg_sn_file);
goto finish;
}
item = cJSON_GetObjectItem(json, "sn");
if (unlikely(!item || !cJSON_IsString(item)))
{
TFE_LOG_ERROR(logger, "Invalid device parameter: %s invalid json format", tsg_sn_file);
goto finish;
}
device_id = tfe_strdup(item->valuestring);
cJSON_Delete(json);
TFE_LOG_INFO(logger, "tfe device id : %s", device_id);
return device_id;
finish:
TFE_LOG_INFO(logger, "tfe use default device id : %s", device_def_id);
if (json)
cJSON_Delete(json);
return (char *)device_def_id;
}
int tfe_bussiness_resouce_init()
{
const char *profile_path = "./conf/pangu/pangu_pxy.conf";
const char *profile_path = "./conf/tfe/tfe.conf";
unsigned int thread_num = tfe_proxy_get_work_thread_count();
static_maat = create_maat_feather("static", profile_path, "MAAT", thread_num, g_default_logger);
if (!static_maat)
@@ -151,19 +235,30 @@ int tfe_bussiness_resouce_init()
return -1;
}
kafka_logger = create_kafka_logger(profile_path, "kafka", g_default_logger);
if (!kafka_logger)
{
return -1;
}
device_id = cerate_device_id(profile_path, "kafka", g_default_logger);
return 0;
}
void *tfe_bussiness_resouce_get(enum RESOURCE_TYPE type)
{
if (type == STATIC_MAAT)
switch (type)
{
case STATIC_MAAT:
return static_maat;
}
if (type == DYNAMINC_MAAT)
{
case DYNAMINC_MAAT:
return dynamic_maat;
case KAFKA_LOGGER:
return kafka_logger;
case DEVICE_ID:
return device_id;
default:
return NULL;
}
return NULL;
}