TSG-15739:功能端支持输出IPFIX封装的UDP报文

This commit is contained in:
杨玉波
2023-09-01 08:50:24 +00:00
committed by 刘学利
parent fc4c49379f
commit 5c1e250c7a
9 changed files with 1281 additions and 32 deletions

View File

@@ -1045,7 +1045,7 @@ int TLD_cancel(struct TLD_handle_t *handle)
tsg_stat_log_handle_update(LOG_HANDLE_FREE_CNT, 1);
}
free(handle);
handle = NULL;
}
@@ -1106,7 +1106,6 @@ int TLD_append(struct TLD_handle_t *handle, char *key, void *value, TLD_TYPE typ
abort();
default:
return -1;
break;
}
tsg_stat_log_handle_update(LOG_HANDLE_APPEND_CNT, 1);
@@ -1168,7 +1167,7 @@ struct TLD_handle_t *TLD_duplicate(struct TLD_handle_t *handle)
struct TLD_handle_t *TLD_create(int thread_id)
{
if(g_tsg_log_instance->mode==CLOSE)
if(g_tsg_log_instance->mode==CLOSE_SEND_MODE)
{
return NULL;
}
@@ -1180,7 +1179,7 @@ struct TLD_handle_t *TLD_create(int thread_id)
_handle->document = new Document(_handle->valueAllocator);
_handle->document->SetObject();
tsg_stat_log_handle_update(LOG_HANDLE_CREATE_CNT, 1);
return _handle;
}
@@ -1987,6 +1986,37 @@ int log_common_fields_new(const char *filename, id2field_t *id2field, struct top
return 0;
}
static unsigned char tsg_send_mode_get(char *mode_str)
{
if (mode_str == NULL)
{
return KAFKA_SEND_MODE; // kafka is defualt
}
unsigned char mode = CLOSE_SEND_MODE;
if (strstr(mode_str, "close") != NULL)
{
return CLOSE_SEND_MODE;
}
if (strstr(mode_str, "kafka") != NULL)
{
mode |= KAFKA_SEND_MODE;
}
if (strstr(mode_str, "ipfix") != NULL)
{
mode |= IPFIX_SEND_MODE;
}
if (mode == CLOSE_SEND_MODE)
{
return KAFKA_SEND_MODE; // kafka is defualt
}
return mode;
}
struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
{
char override_sled_ip[32]={0};
@@ -1997,6 +2027,7 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
struct tsg_log_instance_t *_instance=NULL;
char common_field_file[128]={0};
char log_path[128]={0};
char send_mode[128] = {0};
_instance=(struct tsg_log_instance_t *)calloc(1, sizeof(struct tsg_log_instance_t));
@@ -2028,14 +2059,31 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
return NULL;
}
MESA_load_profile_int_def(conffile, "TSG_LOG", "MODE",&(_instance->mode), 0);
if(_instance->mode==CLOSE)
MESA_load_profile_string_def(conffile, "TSG_LOG", "MODE", send_mode, sizeof(send_mode), "kafka");
_instance->mode = tsg_send_mode_get(send_mode);
if(_instance->mode==CLOSE_SEND_MODE)
{
MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, "Disable tsg_send_log");
return _instance;
}
MESA_load_profile_int_def(conffile, "TSG_LOG", "RECOVERY_INTERVEL_S", &(_instance->recovery_interval), 30);
if (_instance->mode&IPFIX_SEND_MODE)
{
char ipfix_conf_path[128] = {0};
MESA_load_profile_string_def(conffile, "TSG_LOG", "IPFIX_EXPORTER_CONF", ipfix_conf_path, sizeof(ipfix_conf_path), "./tsgconf/ipfix_conf.json");
_instance->ipfix_instance = ipfix_exporter_instance_init(ipfix_conf_path, _instance->logger, get_thread_count());
if (_instance->ipfix_instance == NULL)
{
free(_instance);
_instance=NULL;
return NULL;
}
MESA_load_profile_int_def(conffile, "TSG_LOG", "IPFIX_TEMPLATE_INTERVAL_PKTS", &(_instance->ipfix_template_interval_pkts), 1000);
}
MESA_load_profile_int_def(conffile, "TSG_LOG", "RECOVERY_INTERVEL_S", &(_instance->recovery_interval), 30);
MESA_load_profile_string_def(conffile, "TSG_LOG", "COMMON_FIELD_FILE", common_field_file, sizeof(common_field_file), NULL);
MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", broker_list, sizeof(broker_list), NULL);
@@ -2136,7 +2184,7 @@ void tsg_sendlog_destroy(struct tsg_log_instance_t * instance)
return ;
}
if(instance->mode!=CLOSE)
if(instance->mode!=CLOSE_SEND_MODE)
{
for(int i=0; i<instance->max_service; i++)
{
@@ -2170,6 +2218,10 @@ void tsg_sendlog_destroy(struct tsg_log_instance_t * instance)
instance->service2topic=NULL;
}
if (instance->mode&IPFIX_SEND_MODE)
{
ipfix_exporter_destroy(instance->ipfix_instance);
}
MESA_destroy_runtime_log_handle(instance->logger);
instance->logger=NULL;
@@ -2179,6 +2231,48 @@ void tsg_sendlog_destroy(struct tsg_log_instance_t * instance)
return ;
}
static int tsg_send_ipfix_message(struct TLD_handle_t *_handle, int thread_id)
{
if (_handle == NULL)
{
return -1;
}
if (ipfix_message_get_current_sequence() % g_tsg_log_instance->ipfix_template_interval_pkts == 0)
{
ipfix_message_template_send(g_tsg_log_instance->ipfix_instance, thread_id);
}
Value::ConstMemberIterator schema_type = _handle->document->FindMember("common_schema_type");
if (schema_type == _handle->document->MemberEnd())
{
return -1;
}
struct ipfix_message* message = ipfix_message_new(g_tsg_log_instance->ipfix_instance, schema_type->value.GetString());
if (message == NULL)
{
return -1;
}
for (rapidjson::Value::ConstMemberIterator iter = _handle->document->MemberBegin(); iter != _handle->document->MemberEnd(); ++iter)
{
if (iter->value.GetType() == rapidjson::kStringType)
{
ipfix_message_append(message, iter->name.GetString(), iter->name.GetStringLength(), (char *)iter->value.GetString(), iter->value.GetStringLength());
}
else if (iter->value.GetType() == rapidjson::kNumberType)
{
int64_t value = iter->value.GetInt64();
ipfix_message_append(message, iter->name.GetString(), iter->name.GetStringLength(), (char *)&(value), sizeof(int64_t));
}
}
ipfix_message_send(g_tsg_log_instance->ipfix_instance, message, (uint16_t)thread_id);
ipfix_message_free(message);
return 0;
}
int send_log_by_type(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream, LOG_TYPE log_type, int thread_id)
{
int ret=update_percent(_instance, log_type, LOG_STATUS_DROP, thread_id);
@@ -2191,13 +2285,21 @@ int send_log_by_type(struct tsg_log_instance_t *_instance, struct TLD_handle_t *
(a_stream==NULL ? "" : printaddr(&(a_stream->addr), thread_id))
);
}
StringBuffer sb(0, 2048);
Writer<StringBuffer> writer(sb);
_handle->document->Accept(writer);
tsg_send_payload(_instance, log_type, (char *)sb.GetString(), sb.GetSize(), thread_id);
if (_instance->mode&KAFKA_SEND_MODE)
{
StringBuffer sb(0, 2048);
Writer<StringBuffer> writer(sb);
_handle->document->Accept(writer);
tsg_send_payload(_instance, log_type, (char *)sb.GetString(), sb.GetSize(), thread_id);
}
if (_instance->mode&IPFIX_SEND_MODE && log_type == LOG_TYPE_SESSION_RECORD)
{
tsg_send_ipfix_message(_handle, thread_id);
}
return 0;
}
@@ -2320,7 +2422,7 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
return -1;
}
if(_instance->mode==CLOSE)
if(_instance->mode==CLOSE_SEND_MODE)
{
TLD_cancel(_handle);
tsg_stat_sendlog_update(_instance->sum_stat_row_id, LOG_STATUS_DROP, 1);
@@ -2396,7 +2498,7 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
int tsg_register_topic(struct tsg_log_instance_t *instance, char *topic_name)
{
struct tsg_log_instance_t *_instance=(struct tsg_log_instance_t *)instance;
if(_instance==NULL || _instance->mode==CLOSE || topic_name==NULL || _instance->kafka_handle==NULL)
if(_instance==NULL || _instance->mode==CLOSE_SEND_MODE || topic_name==NULL || _instance->kafka_handle==NULL)
{
return -1;
}
@@ -2417,7 +2519,7 @@ int tsg_send_payload(struct tsg_log_instance_t *instance, int topic_id, char *pa
int status=0;
struct tsg_log_instance_t *_instance=instance;
if(_instance==NULL || _instance->mode==CLOSE)
if(_instance==NULL || _instance->mode==CLOSE_SEND_MODE)
{
return 0;
}