增加当kafka broker性能不足时丢日志的逻辑,确保不丢流量

This commit is contained in:
liuxueli
2020-09-01 11:35:49 +08:00
parent 08c8985d9d
commit 4229468e71
4 changed files with 114 additions and 18 deletions

View File

@@ -10,6 +10,7 @@
#include <sys/types.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <pthread.h>
#include <MESA/stream.h>
#include <MESA/MESA_prof_load.h>
@@ -32,8 +33,26 @@ const id2field_t tld_type[TLD_TYPE_MAX]={{TLD_TYPE_UNKNOWN, TLD_TYPE_UNKNOWN, "
extern "C" int MESA_get_dev_ipv4(const char *device, int *ip_add);
static void on_delivery(int errno, int thread_id)
{
struct timespec cur_time;
struct tsg_log_instance_t *_instance=g_tsg_log_instance;
if(errno)
{
clock_gettime(CLOCK_REALTIME, &cur_time);
if(cur_time.tv_sec - _instance->drop_start[thread_id].tv_sec>=1)
{
_instance->send_log_percent[thread_id]/=2;
clock_gettime(CLOCK_REALTIME, &_instance->drop_start[thread_id]);
FS_operate(g_tsg_para.fs2_handle,_instance ->fs_status_ids[thread_id], 0, FS_OP_SET, _instance->send_log_percent[thread_id]);
}
}
}
int is_multi_hit_same_policy(struct Maat_rule_t *result, int *policy_id, int *policy_id_num)
{
return 0;
int j=0;
for(j=0;j<*policy_id_num;j++)
@@ -460,6 +479,16 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
struct tsg_log_instance_t *_instance=NULL;
_instance=(struct tsg_log_instance_t *)calloc(1, sizeof(struct tsg_log_instance_t));
int thread_num=get_thread_count();
_instance->drop_start=(struct timespec *)calloc(1, sizeof(struct timespec)*thread_num);
_instance->fs_status_ids=(int *)calloc(1, sizeof(int)*thread_num);
_instance->send_log_percent=(int *)calloc(1, sizeof(int)*thread_num);
for(i=0;i<thread_num; i++)
{
_instance->send_log_percent[i]=100;
}
MESA_load_profile_int_def(conffile, "TSG_LOG", "LOG_LEVEL",&(level), 30);
MESA_load_profile_string_def(conffile, "TSG_LOG", "LOG_PATH", log_path, sizeof(log_path), "./tsglog/tsglog");
@@ -477,9 +506,16 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
MESA_handle_runtime_log(_instance->logger, RLOG_LV_FATAL, "TSG_LOG", "Disable tsg_send_log");
return _instance;
}
MESA_load_profile_int_def(conffile, "TSG_LOG", "RECOVERY_INTERVEL_S", &(_instance->recovery_interval), 30);
MESA_load_profile_string_def(conffile, "TSG_LOG", "COMMON_FIELD_FILE", _instance->common_field_file, sizeof(_instance->common_field_file), NULL);
MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", _instance->broker_list, sizeof(_instance->broker_list), NULL);
MESA_load_profile_string_def(conffile, "TSG_LOG", "SEND_QUEUE_MAX_MESSAGE", _instance->send_queue_max_msg, sizeof(_instance->send_queue_max_msg), "1000000");
MESA_load_profile_string_def(conffile, "TSG_LOG", "REFRESH_INTERVAL_MS", _instance->refresh_interval_ms, sizeof(_instance->refresh_interval_ms), "600000");
MESA_load_profile_string_def(conffile, "TSG_LOG", "REQUIRE_ACK", _instance->require_ack, sizeof(_instance->require_ack), "1");
MESA_load_profile_string_def(conffile, "TSG_LOG", "TCP_LABEL", _instance->tcp_label, sizeof(_instance->tcp_label), "tcp_flow_stat");
MESA_load_profile_string_def(conffile, "TSG_LOG", "UDP_LABEL", _instance->udp_label, sizeof(_instance->udp_label), "udp_flow_stat");
@@ -512,10 +548,10 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
inet_ntop(AF_INET,&(local_ip_nr),_instance->local_ip_str,sizeof(_instance->local_ip_str));
rdkafka_conf = rd_kafka_conf_new();
rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000",kafka_errstr, sizeof(kafka_errstr));
rd_kafka_conf_set(rdkafka_conf, "request.required.acks", "1", kafka_errstr, sizeof(kafka_errstr));
rdkafka_conf = rd_kafka_conf_new();
rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", _instance->send_queue_max_msg, kafka_errstr, sizeof(kafka_errstr));
rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", _instance->refresh_interval_ms, kafka_errstr, sizeof(kafka_errstr));
rd_kafka_conf_set(rdkafka_conf, "request.required.acks", _instance->require_ack, kafka_errstr, sizeof(kafka_errstr));
if(!(kafka_handle=rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr))))
{
@@ -568,6 +604,7 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
int i=0,status=0;
char *payload=NULL;
int repeat_cnt=0;
struct timespec cur_time;
int policy_id[MAX_RESULT_NUM]={0};
struct TLD_handle_t *_handle=handle;
struct tsg_log_instance_t *_instance=instance;
@@ -581,7 +618,8 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
if(_instance->mode==CLOSE)
{
TLD_cancel(handle);
TLD_cancel(handle);
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DROP_LOG], 0, FS_OP_ADD, 1);
MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, "TSG_SEND_LOG", "Disable tsg_send_log.");
return 0;
}
@@ -596,6 +634,13 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
{
continue;
}
clock_gettime(CLOCK_MONOTONIC, &cur_time);
if((cur_time.tv_nsec%100)>_instance->send_log_percent[thread_id])
{
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DROP_LOG], 0, FS_OP_ADD, 1);
continue;
}
switch(log_msg->result[i].do_log)
{
@@ -632,14 +677,35 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
if(status < 0)
{
MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, "TSG_SEND_LOG",
//on_delivery(status, thread_id);
clock_gettime(CLOCK_REALTIME, &cur_time);
if(cur_time.tv_sec - _instance->drop_start[thread_id].tv_sec>=1)
{
_instance->send_log_percent[thread_id]/=2;
clock_gettime(CLOCK_REALTIME, &_instance->drop_start[thread_id]);
FS_operate(g_tsg_para.fs2_handle,_instance ->fs_status_ids[thread_id], 0, FS_OP_SET, _instance->send_log_percent[thread_id]);
}
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_FAILED_LOG], 0, FS_OP_ADD, 1);
MESA_handle_runtime_log(_instance->logger,
RLOG_LV_INFO,
"TSG_SEND_LOG",
"tsg_send_log to kafka is error, status: %d, topic: %s payload: %s",
status, _instance->service2topic[log_msg->result[i].service_id].name, payload);
status,
_instance->service2topic[log_msg->result[i].service_id].name,
payload
);
}
else
{
MESA_handle_runtime_log(_instance->logger,RLOG_LV_INFO, "TSG_SEND_LOG",
"log send successfully %s: %s", _instance->service2topic[log_msg->result[i].service_id].name, payload);
MESA_handle_runtime_log(_instance->logger,
RLOG_LV_DEBUG,
"TSG_SEND_LOG",
"log send successfully %s: %s",
_instance->service2topic[log_msg->result[i].service_id].name,
payload
);
}
free(payload);
@@ -649,12 +715,24 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
TLD_delete(_handle, _instance->id2field[LOG_COMMON_SERVICE].name);
TLD_delete(_handle, _instance->id2field[LOG_COMMON_ACTION].name);
TLD_delete(_handle, _instance->id2field[LOG_COMMON_SUB_ACTION].name);
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_LOG], 0, FS_OP_ADD, 1);
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_SUCCESS_LOG], 0, FS_OP_ADD, 1);
FS_operate(g_tsg_para.fs2_handle,_instance ->fs_status_ids[thread_id], 0, FS_OP_SET, _instance->send_log_percent[thread_id]);
}
TLD_cancel(handle);
if(_instance->send_log_percent[thread_id]<100)
{
clock_gettime(CLOCK_REALTIME, &cur_time);
if(cur_time.tv_sec - _instance->drop_start[thread_id].tv_sec>=_instance->recovery_interval)
{
_instance->send_log_percent[thread_id]++;
_instance->drop_start[thread_id].tv_sec=cur_time.tv_sec;
FS_operate(g_tsg_para.fs2_handle,_instance ->fs_status_ids[thread_id], 0, FS_OP_SET, _instance->send_log_percent[thread_id]);
}
}
return 0;
}