diff --git a/src/tsg_entry.cpp b/src/tsg_entry.cpp index fd50e60..9adc663 100644 --- a/src/tsg_entry.cpp +++ b/src/tsg_entry.cpp @@ -52,8 +52,9 @@ id2field_t g_tsg_fs2_field[TSG_FS2_MAX]={{TLD_TYPE_UNKNOWN, TSG_FS2_LINKS, "link {TLD_TYPE_UNKNOWN, TSG_FS2_HIT_ADDR, "hit_addr"}, {TLD_TYPE_UNKNOWN, TSG_FS2_HIT_SHARE, "hit_share"}, {TLD_TYPE_UNKNOWN, TSG_FS2_INTERCEPT, "intercept"}, - {TLD_TYPE_UNKNOWN, TSG_FS2_LOG, "log"}, - {TLD_TYPE_UNKNOWN, TSG_FS2_DENY, "deny"} + {TLD_TYPE_UNKNOWN, TSG_FS2_SUCCESS_LOG, "success_log"}, + {TLD_TYPE_UNKNOWN, TSG_FS2_FAILED_LOG, "failed_log"}, + {TLD_TYPE_UNKNOWN, TSG_FS2_DROP_LOG, "drop_log"} }; id2field_t g_tsg_proto_name2id[PROTO_MAX]={{TLD_TYPE_UNKNOWN, PROTO_UNKONWN, "unknown"}, @@ -1068,6 +1069,13 @@ extern "C" int TSG_MASTER_INIT() g_tsg_para.fs2_field_id[g_tsg_fs2_field[i].id]=FS_register(g_tsg_para.fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, g_tsg_fs2_field[i].name); } + char buff[32]={0}; + int thread_num=get_thread_count(); + for(i=0; ifs_status_ids[i]=FS_register(g_tsg_para.fs2_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, buff); + } FS_start(g_tsg_para.fs2_handle); diff --git a/src/tsg_entry.h b/src/tsg_entry.h index bbe774a..d50d1fa 100644 --- a/src/tsg_entry.h +++ b/src/tsg_entry.h @@ -50,8 +50,9 @@ enum TSG_FS2_TYPE{ TSG_FS2_HIT_ADDR, TSG_FS2_HIT_SHARE, TSG_FS2_INTERCEPT, - TSG_FS2_LOG, - TSG_FS2_DENY, + TSG_FS2_SUCCESS_LOG, + TSG_FS2_FAILED_LOG, + TSG_FS2_DROP_LOG, TSG_FS2_MAX }; diff --git a/src/tsg_send_log.cpp b/src/tsg_send_log.cpp index 24d8e5e..c21dbe9 100644 --- a/src/tsg_send_log.cpp +++ b/src/tsg_send_log.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -32,8 +33,26 @@ const id2field_t tld_type[TLD_TYPE_MAX]={{TLD_TYPE_UNKNOWN, TLD_TYPE_UNKNOWN, " extern "C" int MESA_get_dev_ipv4(const char *device, int *ip_add); +static void on_delivery(int errno, int thread_id) +{ + struct timespec cur_time; + struct tsg_log_instance_t *_instance=g_tsg_log_instance; + if(errno) + { + clock_gettime(CLOCK_REALTIME, &cur_time); + if(cur_time.tv_sec - _instance->drop_start[thread_id].tv_sec>=1) + { + _instance->send_log_percent[thread_id]/=2; + clock_gettime(CLOCK_REALTIME, &_instance->drop_start[thread_id]); + FS_operate(g_tsg_para.fs2_handle,_instance ->fs_status_ids[thread_id], 0, FS_OP_SET, _instance->send_log_percent[thread_id]); + } + } + +} + int is_multi_hit_same_policy(struct Maat_rule_t *result, int *policy_id, int *policy_id_num) { + return 0; int j=0; for(j=0;j<*policy_id_num;j++) @@ -460,6 +479,16 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) struct tsg_log_instance_t *_instance=NULL; _instance=(struct tsg_log_instance_t *)calloc(1, sizeof(struct tsg_log_instance_t)); + + int thread_num=get_thread_count(); + _instance->drop_start=(struct timespec *)calloc(1, sizeof(struct timespec)*thread_num); + _instance->fs_status_ids=(int *)calloc(1, sizeof(int)*thread_num); + _instance->send_log_percent=(int *)calloc(1, sizeof(int)*thread_num); + + for(i=0;isend_log_percent[i]=100; + } MESA_load_profile_int_def(conffile, "TSG_LOG", "LOG_LEVEL",&(level), 30); MESA_load_profile_string_def(conffile, "TSG_LOG", "LOG_PATH", log_path, sizeof(log_path), "./tsglog/tsglog"); @@ -477,9 +506,16 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) MESA_handle_runtime_log(_instance->logger, RLOG_LV_FATAL, "TSG_LOG", "Disable tsg_send_log"); return _instance; } + + MESA_load_profile_int_def(conffile, "TSG_LOG", "RECOVERY_INTERVEL_S", &(_instance->recovery_interval), 30); + MESA_load_profile_string_def(conffile, "TSG_LOG", "COMMON_FIELD_FILE", _instance->common_field_file, sizeof(_instance->common_field_file), NULL); MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", _instance->broker_list, sizeof(_instance->broker_list), NULL); + MESA_load_profile_string_def(conffile, "TSG_LOG", "SEND_QUEUE_MAX_MESSAGE", _instance->send_queue_max_msg, sizeof(_instance->send_queue_max_msg), "1000000"); + MESA_load_profile_string_def(conffile, "TSG_LOG", "REFRESH_INTERVAL_MS", _instance->refresh_interval_ms, sizeof(_instance->refresh_interval_ms), "600000"); + MESA_load_profile_string_def(conffile, "TSG_LOG", "REQUIRE_ACK", _instance->require_ack, sizeof(_instance->require_ack), "1"); + MESA_load_profile_string_def(conffile, "TSG_LOG", "TCP_LABEL", _instance->tcp_label, sizeof(_instance->tcp_label), "tcp_flow_stat"); MESA_load_profile_string_def(conffile, "TSG_LOG", "UDP_LABEL", _instance->udp_label, sizeof(_instance->udp_label), "udp_flow_stat"); @@ -512,10 +548,10 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) inet_ntop(AF_INET,&(local_ip_nr),_instance->local_ip_str,sizeof(_instance->local_ip_str)); - rdkafka_conf = rd_kafka_conf_new(); - rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000",kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "request.required.acks", "1", kafka_errstr, sizeof(kafka_errstr)); + rdkafka_conf = rd_kafka_conf_new(); + rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", _instance->send_queue_max_msg, kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", _instance->refresh_interval_ms, kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "request.required.acks", _instance->require_ack, kafka_errstr, sizeof(kafka_errstr)); if(!(kafka_handle=rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)))) { @@ -568,6 +604,7 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl int i=0,status=0; char *payload=NULL; int repeat_cnt=0; + struct timespec cur_time; int policy_id[MAX_RESULT_NUM]={0}; struct TLD_handle_t *_handle=handle; struct tsg_log_instance_t *_instance=instance; @@ -581,7 +618,8 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl if(_instance->mode==CLOSE) { - TLD_cancel(handle); + TLD_cancel(handle); + FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DROP_LOG], 0, FS_OP_ADD, 1); MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, "TSG_SEND_LOG", "Disable tsg_send_log."); return 0; } @@ -596,6 +634,13 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl { continue; } + + clock_gettime(CLOCK_MONOTONIC, &cur_time); + if((cur_time.tv_nsec%100)>_instance->send_log_percent[thread_id]) + { + FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DROP_LOG], 0, FS_OP_ADD, 1); + continue; + } switch(log_msg->result[i].do_log) { @@ -632,14 +677,35 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl if(status < 0) { - MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, "TSG_SEND_LOG", + //on_delivery(status, thread_id); + clock_gettime(CLOCK_REALTIME, &cur_time); + if(cur_time.tv_sec - _instance->drop_start[thread_id].tv_sec>=1) + { + _instance->send_log_percent[thread_id]/=2; + clock_gettime(CLOCK_REALTIME, &_instance->drop_start[thread_id]); + FS_operate(g_tsg_para.fs2_handle,_instance ->fs_status_ids[thread_id], 0, FS_OP_SET, _instance->send_log_percent[thread_id]); + } + + FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_FAILED_LOG], 0, FS_OP_ADD, 1); + + MESA_handle_runtime_log(_instance->logger, + RLOG_LV_INFO, + "TSG_SEND_LOG", "tsg_send_log to kafka is error, status: %d, topic: %s payload: %s", - status, _instance->service2topic[log_msg->result[i].service_id].name, payload); + status, + _instance->service2topic[log_msg->result[i].service_id].name, + payload + ); } else { - MESA_handle_runtime_log(_instance->logger,RLOG_LV_INFO, "TSG_SEND_LOG", - "log send successfully %s: %s", _instance->service2topic[log_msg->result[i].service_id].name, payload); + MESA_handle_runtime_log(_instance->logger, + RLOG_LV_DEBUG, + "TSG_SEND_LOG", + "log send successfully %s: %s", + _instance->service2topic[log_msg->result[i].service_id].name, + payload + ); } free(payload); @@ -649,12 +715,24 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl TLD_delete(_handle, _instance->id2field[LOG_COMMON_SERVICE].name); TLD_delete(_handle, _instance->id2field[LOG_COMMON_ACTION].name); TLD_delete(_handle, _instance->id2field[LOG_COMMON_SUB_ACTION].name); - - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_LOG], 0, FS_OP_ADD, 1); + + FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_SUCCESS_LOG], 0, FS_OP_ADD, 1); + FS_operate(g_tsg_para.fs2_handle,_instance ->fs_status_ids[thread_id], 0, FS_OP_SET, _instance->send_log_percent[thread_id]); } TLD_cancel(handle); + if(_instance->send_log_percent[thread_id]<100) + { + clock_gettime(CLOCK_REALTIME, &cur_time); + if(cur_time.tv_sec - _instance->drop_start[thread_id].tv_sec>=_instance->recovery_interval) + { + _instance->send_log_percent[thread_id]++; + _instance->drop_start[thread_id].tv_sec=cur_time.tv_sec; + FS_operate(g_tsg_para.fs2_handle,_instance ->fs_status_ids[thread_id], 0, FS_OP_SET, _instance->send_log_percent[thread_id]); + } + } + return 0; } diff --git a/src/tsg_send_log_internal.h b/src/tsg_send_log_internal.h index 93dff80..1c1b32d 100644 --- a/src/tsg_send_log_internal.h +++ b/src/tsg_send_log_internal.h @@ -3,6 +3,8 @@ #include #include +#include + #define MAX_IPV4_LEN 16 @@ -85,19 +87,26 @@ struct TLD_handle_t struct tsg_log_instance_t { int mode; - int max_service; + int max_service; + int recovery_interval; int internal_project_id; int tcp_flow_project_id; int udp_flow_project_id; - void *logger; + int *send_log_percent; + int *fs_status_ids; + struct timespec *drop_start; char tcp_label[MAX_STRING_LEN]; char udp_label[MAX_STRING_LEN]; char common_field_file[MAX_STRING_LEN*4]; char broker_list[MAX_STRING_LEN*4]; + char send_queue_max_msg[MAX_STRING_LEN]; + char require_ack[MAX_STRING_LEN]; + char refresh_interval_ms[MAX_STRING_LEN]; char local_ip_str[MAX_IPV4_LEN]; id2field_t id2field[LOG_COMMON_MAX]; rd_kafka_topic_t **topic_rkt; - id2field_t *service2topic; + id2field_t *service2topic; + void *logger; }; char *log_field_id2name(struct tsg_log_instance_t *instance, tsg_log_field_id_t id);