TSG-7510: tsg_master提供发送payload到kafka的接口

This commit is contained in:
刘学利
2021-08-24 05:28:08 +00:00
parent ea1875f229
commit e6e71dca4f
5 changed files with 83 additions and 5 deletions

View File

@@ -39,6 +39,10 @@ int TLD_cancel(struct TLD_handle_t *handle);
int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handle, tsg_log_t *log_msg, int thread_id); int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handle, tsg_log_t *log_msg, int thread_id);
//return topic_id; return >=0 if success,otherwise return -1;
int tsg_register_topic(struct tsg_log_instance_t *instance, char *topic_name);
int tsg_send_payload(struct tsg_log_instance_t *instance, int topic_id, char *payload, int payload_len, int thread_id);
unsigned long long tsg_get_stream_id(struct streaminfo *a_stream); unsigned long long tsg_get_stream_id(struct streaminfo *a_stream);
char *tsg_l7_protocol_id2name(unsigned int l7_protocol_id); char *tsg_l7_protocol_id2name(unsigned int l7_protocol_id);
unsigned int tsg_l7_protocol_name2id(const char *l7_protocol_name); unsigned int tsg_l7_protocol_name2id(const char *l7_protocol_name);

View File

@@ -75,7 +75,9 @@ id2field_t g_tsg_fs2_field[TSG_FS2_MAX]={{0, TSG_FS2_TCP_LINKS, "tcp_links"},
{0, TSG_FS2_MIRRORED_PKT_SUCCESS, "mirror_pkt_suc"}, {0, TSG_FS2_MIRRORED_PKT_SUCCESS, "mirror_pkt_suc"},
{0, TSG_FS2_MIRRORED_BYTE_SUCCESS, "mirror_byte_suc"}, {0, TSG_FS2_MIRRORED_BYTE_SUCCESS, "mirror_byte_suc"},
{0, TSG_FS2_MIRRORED_PKT_FAILED, "mirror_pkt_fai"}, {0, TSG_FS2_MIRRORED_PKT_FAILED, "mirror_pkt_fai"},
{0, TSG_FS2_MIRRORED_BYTE_FAILED, "mirror_byte_fai"} {0, TSG_FS2_MIRRORED_BYTE_FAILED, "mirror_byte_fai"},
{0, TSG_FS2_DDOS_SUCCESS_LOG, "ddos_suc_log"},
{0, TSG_FS2_DDOS_FAILED_LOG, "ddos_fai_log"}
}; };
id2field_t g_tsg_proto_name2id[PROTO_MAX]={{PROTO_UNKONWN, 0, "unknown"}, id2field_t g_tsg_proto_name2id[PROTO_MAX]={{PROTO_UNKONWN, 0, "unknown"},

View File

@@ -109,6 +109,8 @@ enum TSG_FS2_TYPE{
TSG_FS2_MIRRORED_BYTE_SUCCESS, TSG_FS2_MIRRORED_BYTE_SUCCESS,
TSG_FS2_MIRRORED_PKT_FAILED, TSG_FS2_MIRRORED_PKT_FAILED,
TSG_FS2_MIRRORED_BYTE_FAILED, TSG_FS2_MIRRORED_BYTE_FAILED,
TSG_FS2_DDOS_SUCCESS_LOG,
TSG_FS2_DDOS_FAILED_LOG,
TSG_FS2_MAX TSG_FS2_MAX
}; };
@@ -244,6 +246,7 @@ typedef struct tsg_para
char data_center[_MAX_TABLE_NAME_LEN]; char data_center[_MAX_TABLE_NAME_LEN];
char table_name[TABLE_MAX][_MAX_TABLE_NAME_LEN]; char table_name[TABLE_MAX][_MAX_TABLE_NAME_LEN];
void *logger; void *logger;
void *maat_logger;
struct reset_argv reset; struct reset_argv reset;
screen_stat_handle_t fs2_handle; screen_stat_handle_t fs2_handle;
struct l7_protocol *name_by_id; struct l7_protocol *name_by_id;

View File

@@ -1358,7 +1358,6 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
char nic_name[32]={0}; char nic_name[32]={0};
char kafka_errstr[1024]={0}; char kafka_errstr[1024]={0};
unsigned int local_ip_nr=0; unsigned int local_ip_nr=0;
rd_kafka_t *kafka_handle = NULL;
rd_kafka_conf_t *rdkafka_conf = NULL; rd_kafka_conf_t *rdkafka_conf = NULL;
rd_kafka_topic_conf_t *topic_conf; rd_kafka_topic_conf_t *topic_conf;
struct tsg_log_instance_t *_instance=NULL; struct tsg_log_instance_t *_instance=NULL;
@@ -1473,7 +1472,7 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
rd_kafka_conf_set(rdkafka_conf, "sasl.password", _instance->sasl_passwd, kafka_errstr, sizeof(kafka_errstr)); rd_kafka_conf_set(rdkafka_conf, "sasl.password", _instance->sasl_passwd, kafka_errstr, sizeof(kafka_errstr));
} }
if(!(kafka_handle=rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)))) if(!(_instance->kafka_handle=rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr))))
{ {
MESA_handle_runtime_log(_instance->logger, RLOG_LV_FATAL, "KAFKA_INIT", "rd_kafka_new is error"); MESA_handle_runtime_log(_instance->logger, RLOG_LV_FATAL, "KAFKA_INIT", "rd_kafka_new is error");
return NULL; return NULL;
@@ -1490,7 +1489,7 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
if(_instance->service2topic[i].type==TLD_TYPE_MAX) if(_instance->service2topic[i].type==TLD_TYPE_MAX)
{ {
topic_conf=rd_kafka_topic_conf_new(); topic_conf=rd_kafka_topic_conf_new();
_instance->topic_rkt[_instance->service2topic[i].id]=rd_kafka_topic_new(kafka_handle, _instance->service2topic[i].name, topic_conf); _instance->topic_rkt[_instance->service2topic[i].id]=rd_kafka_topic_new(_instance->kafka_handle, _instance->service2topic[i].name, topic_conf);
} }
} }
} }
@@ -1685,3 +1684,72 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
return 0; return 0;
} }
int tsg_register_topic(struct tsg_log_instance_t *instance, char *topic_name)
{
rd_kafka_topic_conf_t *topic_conf;
struct tsg_log_instance_t *_instance=(struct tsg_log_instance_t *)instance;
if(_instance->mode==CLOSE)
{
return 0;
}
if(topic_name!=NULL && _instance!=NULL && _instance->kafka_handle!=NULL)
{
_instance->service2topic=(id2field_t *)realloc(_instance->service2topic, (_instance->max_service+1)*sizeof(id2field_t));
_instance->service2topic[_instance->max_service].id=_instance->max_service;
_instance->service2topic[_instance->max_service].type=TLD_TYPE_MAX;
memset(_instance->service2topic[_instance->max_service].name, 0, MAX_STRING_LEN);
memcpy(_instance->service2topic[_instance->max_service].name, topic_name, MIN(MAX_STRING_LEN-1, strlen(topic_name)));
_instance->topic_rkt=(rd_kafka_topic_t **)realloc(_instance->topic_rkt, (_instance->max_service+1)*sizeof(rd_kafka_topic_t*));
topic_conf=rd_kafka_topic_conf_new();
_instance->topic_rkt[_instance->max_service]=rd_kafka_topic_new(_instance->kafka_handle, topic_name, topic_conf);
_instance->max_service++;
}
else
{
return -1;
}
return (_instance->max_service-1);
}
int tsg_send_payload(struct tsg_log_instance_t *instance, int topic_id, char *payload, int payload_len, int thread_id)
{
int status=0;
struct tsg_log_instance_t *_instance=instance;
if(_instance->mode==CLOSE)
{
return 0;
}
if(_instance==NULL || payload==NULL || payload_len<=0 || topic_id<0 || _instance->topic_rkt==NULL)
{
return -1;
}
status=rd_kafka_produce(_instance->topic_rkt[topic_id], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, payload, payload_len, NULL, 0, NULL);
if(status<0)
{
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DDOS_FAILED_LOG], 0, FS_OP_ADD, 1);
MESA_handle_runtime_log(_instance->logger,
RLOG_LV_INFO,
"TSG_SEND_LOG",
"tsg_send_log to kafka is error of %s(%s), status: %d, topic: %s",
rd_kafka_err2name(rd_kafka_last_error()),
rd_kafka_err2str(rd_kafka_last_error()),
status,
_instance->service2topic[topic_id].name
);
}
else
{
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DDOS_SUCCESS_LOG], 0, FS_OP_ADD, 1);
}
return 0;
}

View File

@@ -153,6 +153,7 @@ struct tsg_log_instance_t
char l7_proto_id_file[MAX_STRING_LEN*4]; char l7_proto_id_file[MAX_STRING_LEN*4];
id2field_t id2field[LOG_COMMON_MAX]; id2field_t id2field[LOG_COMMON_MAX];
rd_kafka_topic_t **topic_rkt; rd_kafka_topic_t **topic_rkt;
rd_kafka_t *kafka_handle;
id2field_t *service2topic; id2field_t *service2topic;
void *logger; void *logger;
}; };