同一发送common_l7_protocol字段

This commit is contained in:
liuxueli
2020-11-14 15:50:44 +06:00
parent 39335c9543
commit 23f1e9238c
7 changed files with 280 additions and 14 deletions

View File

@@ -186,6 +186,9 @@ static int init_context(void **pme, tsg_protocol_t proto, struct Maat_rule_t *p_
*pme=dictator_malloc(thread_seq, sizeof(struct _master_context));
_context=(struct _master_context *)*pme;
_context->proto=proto;
_context->domain_len=0;
memset(_context->domain, 0, sizeof(_context->domain));
_context->hit_cnt=1;
_context->result=(struct Maat_rule_t *)dictator_malloc(thread_seq, sizeof(struct Maat_rule_t));
memcpy(_context->result, p_result, sizeof(struct Maat_rule_t));

View File

@@ -16,6 +16,7 @@
#include <MESA/MESA_prof_load.h>
#include <MESA/MESA_handle_logger.h>
#include "app_label.h"
#include "tsg_entry.h"
#include "tsg_send_log.h"
#include "tsg_send_log_internal.h"
@@ -150,6 +151,18 @@ char *log_field_id2name(struct tsg_log_instance_t *instance, tsg_log_field_id_t
return NULL;
}
char *tsg_l7_protocol_id2name(struct tsg_log_instance_t *instance, unsigned short id)
{
struct tsg_log_instance_t *_instance=instance;
if(_instance!=NULL && id>=MIN_L7_PROTO_ID && id<=MAX_L7_PROTO_ID)
{
return _instance->l7_proto_id2field[id].name;
}
return NULL;
}
static int set_common_sub_action(struct TLD_handle_t *handle, char *field_name, struct Maat_rule_t *p_result)
{
cJSON *item=NULL;
@@ -189,16 +202,33 @@ int set_common_field_from_label(struct tsg_log_instance_t *_instance, struct TLD
{
char buff[1024]={0};
char *l7_protocol=NULL;
struct _basic_proto_label *l7_proto_label=NULL;
struct _location_info_t *location=NULL;
struct _session_attribute_label_t *internal_label=NULL;
internal_label=(struct _session_attribute_label_t *)project_req_get_struct(a_stream, _instance->internal_project_id);
if(internal_label!=NULL)
{
l7_protocol=tsg_schema_index2string(internal_label->proto);
if(l7_protocol!=NULL)
{
TLD_append(_handle, _instance->id2field[LOG_COMMON_L7_PROTOCOL].name, (void *)l7_protocol, TLD_TYPE_STRING);
l7_proto_label=(struct _basic_proto_label *)project_req_get_struct(a_stream, _instance->l7_proto_project_id);
if(l7_proto_label!=NULL)
{
l7_protocol=tsg_l7_protocol_id2name(_instance, l7_proto_label->proto_id);
if(l7_protocol!=NULL)
{
TLD_append(_handle, _instance->id2field[LOG_COMMON_L7_PROTOCOL].name, (void *)l7_protocol, TLD_TYPE_STRING);
}
}
else
{
l7_protocol=tsg_schema_index2string(internal_label->proto);
if(l7_protocol!=NULL)
{
TLD_append(_handle, _instance->id2field[LOG_COMMON_L7_PROTOCOL].name, (void *)l7_protocol, TLD_TYPE_STRING);
}
else
{
TLD_append(_handle, _instance->id2field[LOG_COMMON_L7_PROTOCOL].name, (void *)"UNCATEGORIZED", TLD_TYPE_STRING);
}
}
TLD_append(_handle, _instance->id2field[LOG_COMMON_ESTABLISH_LATENCY_MS].name, (void *)internal_label->establish_latency_ms, TLD_TYPE_LONG);
@@ -234,6 +264,10 @@ int set_common_field_from_label(struct tsg_log_instance_t *_instance, struct TLD
TLD_append(_handle, _instance->id2field[LOG_SSL_JA3_FINGERPRINT].name, (void *)internal_label->ja3_fingerprint, TLD_TYPE_STRING);
}
}
else
{
TLD_append(_handle, _instance->id2field[LOG_COMMON_L7_PROTOCOL].name, (void *)"UNCATEGORIZED", TLD_TYPE_STRING);
}
return 0;
}
@@ -430,7 +464,7 @@ int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t
}
memset(type_name, 0, sizeof(type_name));
ret=sscanf(line, "%s %s %d", type_name, field_name, &id);
assert(ret==3 && id<LOG_COMMON_MAX);
assert(ret==3);
for(i=0; i<TLD_TYPE_MAX; i++)
{
@@ -487,7 +521,10 @@ int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t
fclose(fp);
fp=NULL;
*service2topic=_service2topic;
if(service2topic!=NULL)
{
*service2topic=_service2topic;
}
return 0;
}
@@ -495,6 +532,7 @@ int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t
struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
{
int i=0,ret=0;
int tmp_value=0;
char nic_name[32]={0};
char kafka_errstr[1024]={0};
unsigned int local_ip_nr=0;
@@ -543,6 +581,7 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
MESA_load_profile_string_def(conffile, "TSG_LOG", "TCP_LABEL", _instance->tcp_label, sizeof(_instance->tcp_label), "tcp_flow_stat");
MESA_load_profile_string_def(conffile, "TSG_LOG", "UDP_LABEL", _instance->udp_label, sizeof(_instance->udp_label), "udp_flow_stat");
MESA_load_profile_string_def(conffile, "TSG_LOG", "L7_PROTO_LABEL", _instance->l7_proto_label, sizeof(_instance->l7_proto_label), "BASIC_PROTO_LABEL");
_instance->tcp_flow_project_id=project_customer_register(_instance->tcp_label, "struct");
_instance->udp_flow_project_id=project_customer_register(_instance->udp_label, "struct");
@@ -557,7 +596,17 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
);
}
_instance->l7_proto_project_id=project_customer_register(_instance->l7_proto_label, "struct");
if(_instance->l7_proto_project_id<0)
{
MESA_handle_runtime_log(_instance->logger,
RLOG_LV_FATAL,
"L7_PROTO_LABEL",
"project_customer_register is error, l7_proto_label: %s, please check etc/project.conf",
_instance->l7_proto_label
);
}
MESA_load_profile_string_def(conffile, "TSG_LOG", "NIC_NAME", nic_name, sizeof(nic_name), "eth0");
ret=MESA_get_dev_ipv4(nic_name, (int *)&local_ip_nr);
if(ret<0)
@@ -571,7 +620,9 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
return NULL;
}
inet_ntop(AF_INET,&(local_ip_nr),_instance->local_ip_str,sizeof(_instance->local_ip_str));
MESA_load_profile_string_def(conffile, "TSG_LOG", "L7_PROTO_ID_FILE", _instance->l7_proto_id_file, sizeof(_instance->l7_proto_id_file), "./tsgconf/app_l7_proto_id.conf");
load_log_common_field(_instance->l7_proto_id_file, _instance->l7_proto_id2field, NULL, &tmp_value);
rdkafka_conf = rd_kafka_conf_new();
rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", _instance->send_queue_max_msg, kafka_errstr, sizeof(kafka_errstr));

View File

@@ -5,7 +5,8 @@
#include <MESA/cJSON.h>
#include <time.h>
#define MIN_L7_PROTO_ID 100
#define MAX_L7_PROTO_ID 150
#define MAX_IPV4_LEN 16
#define MAX_STRING_LEN 32
@@ -93,6 +94,7 @@ struct tsg_log_instance_t
int level;
int max_service;
int recovery_interval;
int l7_proto_project_id;
int internal_project_id;
int tcp_flow_project_id;
int udp_flow_project_id;
@@ -102,15 +104,18 @@ struct tsg_log_instance_t
char log_path[MAX_STRING_LEN*2];
char tcp_label[MAX_STRING_LEN];
char udp_label[MAX_STRING_LEN];
char l7_proto_label[MAX_STRING_LEN];
char common_field_file[MAX_STRING_LEN*4];
char broker_list[MAX_STRING_LEN*4];
char send_queue_max_msg[MAX_STRING_LEN];
char require_ack[MAX_STRING_LEN];
char refresh_interval_ms[MAX_STRING_LEN];
char local_ip_str[MAX_IPV4_LEN];
char l7_proto_id_file[MAX_STRING_LEN*4];
id2field_t id2field[LOG_COMMON_MAX];
rd_kafka_topic_t **topic_rkt;
id2field_t *service2topic;
id2field_t l7_proto_id2field[MAX_L7_PROTO_ID+1];
void *logger;
};