diff --git a/bin/tsg_log_field.conf b/bin/tsg_log_field.conf index 6a49614..237e55e 100644 --- a/bin/tsg_log_field.conf +++ b/bin/tsg_log_field.conf @@ -1,7 +1,6 @@ #TYPE:1:UCHAR,2:USHORT,3:ULONG,4:ULOG,5:USTRING,6:FILE,7:UBASE64,8:PACKET #TYPE TOPIC SERVICE TOPIC SECURITY-EVENT 0 -TOPIC SECURITY-EVENT 1 TOPIC SESSION-RECORD 2 TOPIC INTERNAL-RTP-RECORD 4 TOPIC VOIP-RECORD 5 diff --git a/src/tsg_entry.cpp b/src/tsg_entry.cpp index c93ea33..e39a583 100644 --- a/src/tsg_entry.cpp +++ b/src/tsg_entry.cpp @@ -66,14 +66,6 @@ id2field_t g_tsg_fs2_field[TSG_FS2_MAX]={{0, TSG_FS2_TCP_LINKS, "tcp_links"}, {0, TSG_FS2_HIT_SHARE, "hit_share"}, {0, TSG_FS2_INTERCEPT, "intercept"}, {0, TSG_FS2_EXCLUSION, "exclusion"}, - {0, TSG_FS2_SUCCESS_LOG, "success_log"}, - {0, TSG_FS2_FAILED_LOG, "failed_log"}, - {0, TSG_FS2_DROP_LOG, "drop_log"}, - {0, TSG_FS2_ABORT_ALLOW, "abort_allow"}, - {0, TSG_FS2_ABORT_DENY, "abort_deny"}, - {0, TSG_FS2_ABORT_MONITOR, "abort_monitor"}, - {0, TSG_FS2_ABORT_INTERCEPT, "abort_intercept"}, - {0, TSG_FS2_ABORT_UNKNOWN, "abort_unknown"}, {0, TSG_FS2_APP_DPKT_RESULT, "D_result"}, {0, TSG_FS2_APP_Q_RESULT, "Q_result"}, {0, TSG_FS2_APP_USER_RESULT, "U_result"}, @@ -84,19 +76,11 @@ id2field_t g_tsg_fs2_field[TSG_FS2_MAX]={{0, TSG_FS2_TCP_LINKS, "tcp_links"}, {0, TSG_FS2_MIRRORED_BYTE_SUCCESS, "mirror_byte_suc"}, {0, TSG_FS2_MIRRORED_PKT_FAILED, "mirror_pkt_fai"}, {0, TSG_FS2_MIRRORED_BYTE_FAILED, "mirror_byte_fai"}, - {0, TSG_FS2_DDOS_SUCCESS_LOG, "ddos_suc_log"}, - {0, TSG_FS2_DDOS_FAILED_LOG, "ddos_fai_log"}, {0, TSG_FS2_SET_TIMOUT_SUCCESS, "set_timeout_suc"}, {0, TSG_FS2_SET_TIMOUT_FAILED, "set_timeout_fai"}, - {0, TSG_FS2_CREATE_LOG_HANDLE, "create_log_cnt"}, - {0, TSG_FS2_DUP_LOG_HANDLE, "dup_log_cnt"}, - {0, TSG_FS2_APPEND_LOG_HANDLE, "append_log_cnt"}, - {0, TSG_FS2_FREE_LOG_HANDLE, "free_log_cnt"}, - {0, TSG_FS2_FREE_RAPID_SIZE, "free_rapid_size"}, - {0, TSG_FS2_FREE_RAPID_CAPACITY, "free_rapid_capacity"}, {0, TSG_FS2_SUCESS_TAMPER, "tamper_sucess"}, {0, TSG_FS2_TAMPER_FAILED_PLOAD_LESS_4, "tamper_nopload"}, - {0, TSG_FS2_TAMPER_FAILED_NOSWOP, "tamper_noswop"} + {0, TSG_FS2_TAMPER_FAILED_NOSWAP, "tamper_noswap"} }; id2field_t g_tsg_proto_name2id[PROTO_MAX]={{PROTO_UNKONWN, 0, "unknown"}, @@ -2283,14 +2267,6 @@ extern "C" int TSG_MASTER_INIT() return -1; } - g_tsg_log_instance=tsg_sendlog_init(tsg_conffile); - if(g_tsg_log_instance==NULL) - { - MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_FATAL, "INIT_SENDLOG", "tsg_sendlog_init failed ..."); - return -1; - } - g_tsg_log_instance->session_attribute_project_id=g_tsg_para.session_attribute_project_id; - MESA_load_profile_int_def(tsg_conffile, "FIELD_STAT", "CYCLE", &cycle, 30); MESA_load_profile_short_nodef(tsg_conffile, "FIELD_STAT","TELEGRAF_PORT", (short *)&(fs_server_port)); MESA_load_profile_string_nodef(tsg_conffile,"FIELD_STAT","TELEGRAF_IP",fs_server_ip, sizeof(fs_server_ip)); @@ -2325,20 +2301,16 @@ extern "C" int TSG_MASTER_INIT() { g_tsg_para.fs2_field_id[i]=FS_register(g_tsg_para.fs2_handle, FS_STYLE_FIELD, FS_CALC_SPEED, g_tsg_fs2_field[i].name); } - - int thread_num=get_thread_count(); - for(i=0; ifs_status_ids[i]=FS_register(g_tsg_para.fs2_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, buff); - } - FS_start(g_tsg_para.fs2_handle); - - for(i=0; ifs_status_ids[i], 0, FS_OP_SET, g_tsg_log_instance->send_log_percent[i]); + MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_FATAL, "INIT_SENDLOG", "tsg_sendlog_init failed ..."); + return -1; } + g_tsg_log_instance->session_attribute_project_id=g_tsg_para.session_attribute_project_id; + + FS_start(g_tsg_para.fs2_handle); ret=tsg_statistic_init(tsg_conffile, g_tsg_para.logger); if(ret<0) diff --git a/src/tsg_entry.h b/src/tsg_entry.h index 7e33e42..3a6dfdc 100644 --- a/src/tsg_entry.h +++ b/src/tsg_entry.h @@ -92,7 +92,6 @@ enum MASTER_TABLE{ TABLE_MAX }; - enum TSG_FS2_TYPE{ TSG_FS2_TCP_LINKS=0, TSG_FS2_UDP_LINKS, @@ -101,14 +100,6 @@ enum TSG_FS2_TYPE{ TSG_FS2_HIT_SHARE, TSG_FS2_INTERCEPT, TSG_FS2_EXCLUSION, - TSG_FS2_SUCCESS_LOG, - TSG_FS2_FAILED_LOG, - TSG_FS2_DROP_LOG, - TSG_FS2_ABORT_ALLOW, - TSG_FS2_ABORT_DENY, - TSG_FS2_ABORT_MONITOR, - TSG_FS2_ABORT_INTERCEPT, - TSG_FS2_ABORT_UNKNOWN, TSG_FS2_APP_DPKT_RESULT, TSG_FS2_APP_Q_RESULT, TSG_FS2_APP_USER_RESULT, @@ -119,19 +110,11 @@ enum TSG_FS2_TYPE{ TSG_FS2_MIRRORED_BYTE_SUCCESS, TSG_FS2_MIRRORED_PKT_FAILED, TSG_FS2_MIRRORED_BYTE_FAILED, - TSG_FS2_DDOS_SUCCESS_LOG, - TSG_FS2_DDOS_FAILED_LOG, TSG_FS2_SET_TIMOUT_SUCCESS, TSG_FS2_SET_TIMOUT_FAILED, - TSG_FS2_CREATE_LOG_HANDLE, - TSG_FS2_DUP_LOG_HANDLE, - TSG_FS2_APPEND_LOG_HANDLE, - TSG_FS2_FREE_LOG_HANDLE, - TSG_FS2_FREE_RAPID_SIZE, - TSG_FS2_FREE_RAPID_CAPACITY, TSG_FS2_SUCESS_TAMPER, TSG_FS2_TAMPER_FAILED_PLOAD_LESS_4, - TSG_FS2_TAMPER_FAILED_NOSWOP, + TSG_FS2_TAMPER_FAILED_NOSWAP, TSG_FS2_MAX }; diff --git a/src/tsg_send_log.cpp b/src/tsg_send_log.cpp index 59ad2e1..231b83a 100644 --- a/src/tsg_send_log.cpp +++ b/src/tsg_send_log.cpp @@ -41,6 +41,21 @@ struct TLD_handle_t Document *document; }; +id2field_t g_log_fs2_field[LOG_FS2_TYPE_MAX]={ + {0, LOG_FS2_ABORT_ALLOW, "abort_allow"}, + {0, LOG_FS2_ABORT_DENY, "abort_deny"}, + {0, LOG_FS2_ABORT_MONITOR, "abort_monitor"}, + {0, LOG_FS2_ABORT_INTERCEPT, "abort_intercept"}, + {0, LOG_FS2_ABORT_UNKNOWN, "abort_unknown"}, + {0, LOG_FS2_CREATE_LOG_HANDLE, "create_log_cnt"}, + {0, LOG_FS2_DUP_LOG_HANDLE, "dup_log_cnt"}, + {0, LOG_FS2_APPEND_LOG_HANDLE, "append_log_cnt"}, + {0, LOG_FS2_FREE_LOG_HANDLE, "free_log_cnt"}, + {0, LOG_FS2_FREE_RAPID_SIZE, "free_rapid_size"}, + {0, LOG_FS2_FREE_RAPID_CAPACITY, "free_rapid_capacity"} + }; + + const id2field_t tld_type[TLD_TYPE_MAX]={{TLD_TYPE_UNKNOWN, TLD_TYPE_UNKNOWN, "UNKOWN"}, {TLD_TYPE_LONG, TLD_TYPE_LONG, "LONG"}, {TLD_TYPE_STRING, TLD_TYPE_STRING, "STRING"}, @@ -92,6 +107,101 @@ static void add_str_member(struct TLD_handle_t *_handle, Value *object, const ch object->AddMember(temp_key, temp_val, _handle->document->GetAllocator()); } + +static int register_topic(struct tsg_log_instance_t *instance, struct topic_stat *topic) +{ + rd_kafka_topic_conf_t *topic_conf; + struct tsg_log_instance_t *_instance=(struct tsg_log_instance_t *)instance; + + topic_conf=rd_kafka_topic_conf_new(); + topic->status=1; + topic->topic_rkt=(rd_kafka_topic_t *)calloc(1, sizeof(rd_kafka_topic_t*)); + topic->topic_rkt=rd_kafka_topic_new(_instance->kafka_handle, topic->name, topic_conf); + + int thread_num=get_thread_count(); + topic->drop_start=(struct timespec *)calloc(thread_num, sizeof(struct timespec)); + topic->send_log_percent=(int *)calloc(thread_num, sizeof(int)); + + for(int i=0; isend_log_percent[i]=100; + clock_gettime(CLOCK_REALTIME, &(topic->drop_start[i])); + } + + topic->fs2_line_id=FS_register(_instance->fs2_handle, FS_STYLE_LINE, FS_CALC_SPEED, topic->name); + + return 1; +} + + +static int update_percent(struct tsg_log_instance_t *_instance, int service_id, enum LOG_COLUMN_STATUS column, int thread_id) +{ + struct timespec cur_time; + struct topic_stat *topic=(struct topic_stat *)&(_instance->service2topic[service_id]); + + clock_gettime(CLOCK_REALTIME, &cur_time); + + switch(column) + { + case LOG_COLUMN_STATUS_SUCCESS: + FS_operate(_instance->fs2_handle, topic->fs2_line_id, _instance->fs2_column_id[column], FS_OP_ADD, 1); + FS_operate(_instance->fs2_handle, topic->fs2_line_id, _instance->fs2_column_id[column+1], FS_OP_ADD, 1); + + FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_column_id[column], FS_OP_ADD, 1); + FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_column_id[column+1], FS_OP_ADD, 1); + break; + case LOG_COLUMN_STATUS_FAIL: + FS_operate(_instance->fs2_handle, topic->fs2_line_id, _instance->fs2_column_id[column], FS_OP_ADD, 1); + FS_operate(_instance->fs2_handle, topic->fs2_line_id, _instance->fs2_column_id[column+1], FS_OP_ADD, 1); + + FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_column_id[column], FS_OP_ADD, 1); + FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_column_id[column+1], FS_OP_ADD, 1); + if(cur_time.tv_sec - topic->drop_start[thread_id].tv_sec>=1) + { + topic->send_log_percent[thread_id]/=2; + topic->drop_start[thread_id]=cur_time; + } + break; + case LOG_COLUMN_STATUS_DROP: + if((cur_time.tv_nsec%100) > topic->send_log_percent[thread_id]) + { + FS_operate(_instance->fs2_handle, topic->fs2_line_id, _instance->fs2_column_id[column], FS_OP_ADD, 1); + FS_operate(_instance->fs2_handle, topic->fs2_line_id, _instance->fs2_column_id[column+1], FS_OP_ADD, 1); + + FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_column_id[column], FS_OP_ADD, 1); + FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_column_id[column+1], FS_OP_ADD, 1); + return 1; + } + break; + case LOG_COLUMN_STATUS_MAX: + if(topic->send_log_percent[thread_id]>=100) + { + break; + } + + if((cur_time.tv_sec - topic->drop_start[thread_id].tv_sec) >= _instance->recovery_interval) + { + topic->send_log_percent[thread_id]++; + topic->drop_start[thread_id]=cur_time; + } + break; + default: + break; + } + + return 0; +} + +static struct tsg_log_instance_t *get_log_instance(void) +{ + if(g_tsg_log_instance!=NULL) + { + return g_tsg_log_instance; + } + + return NULL; +} + static int is_tunnels(struct streaminfo *a_stream) { const struct streaminfo *ptmp = a_stream; @@ -857,37 +967,39 @@ static int action2fs_id(int action) switch(action) { case TSG_ACTION_DENY: - return TSG_FS2_ABORT_DENY; + return LOG_FS2_ABORT_DENY; break; case TSG_ACTION_BYPASS: - return TSG_FS2_ABORT_ALLOW; + return LOG_FS2_ABORT_ALLOW; break; case TSG_ACTION_MONITOR: - return TSG_FS2_ABORT_MONITOR; + return LOG_FS2_ABORT_MONITOR; break; case TSG_ACTION_INTERCEPT: - return TSG_FS2_ABORT_INTERCEPT; + return LOG_FS2_ABORT_INTERCEPT; break; default: - return TSG_FS2_ABORT_UNKNOWN; + return LOG_FS2_ABORT_UNKNOWN; break; } - return TSG_FS2_ABORT_UNKNOWN; + return LOG_FS2_ABORT_UNKNOWN; } int TLD_cancel(struct TLD_handle_t *handle) { - long long length=0; if (handle != NULL) { if (handle->document != NULL) - { + { + long long length=0; + struct tsg_log_instance_t *_instance=get_log_instance(); + length=handle->document->GetAllocator().Size(); - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_FREE_RAPID_SIZE], 0, FS_OP_ADD, length); + FS_operate(_instance->fs2_handle, _instance->fs2_field_id[LOG_FS2_FREE_RAPID_SIZE], 0, FS_OP_ADD, length); length=handle->document->GetAllocator().Capacity(); - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_FREE_RAPID_CAPACITY], 0, FS_OP_ADD, length); + FS_operate(_instance->fs2_handle, _instance->fs2_field_id[LOG_FS2_FREE_RAPID_CAPACITY], 0, FS_OP_ADD, length); delete handle->document; handle->document = NULL; @@ -895,7 +1007,7 @@ int TLD_cancel(struct TLD_handle_t *handle) delete handle->valueAllocator; handle->valueAllocator=NULL; - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_FREE_LOG_HANDLE], 0, FS_OP_ADD, 1); + FS_operate(_instance->fs2_handle, _instance->fs2_field_id[LOG_FS2_FREE_LOG_HANDLE], 0, FS_OP_ADD, 1); } free(handle); @@ -961,7 +1073,9 @@ int TLD_append(struct TLD_handle_t *handle, char *key, void *value, TLD_TYPE typ break; } - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_APPEND_LOG_HANDLE], 0, FS_OP_ADD, 1); + struct tsg_log_instance_t *_instance=get_log_instance(); + + FS_operate(_instance->fs2_handle, _instance->fs2_field_id[LOG_FS2_APPEND_LOG_HANDLE], 0, FS_OP_ADD, 1); return 0; } @@ -1012,8 +1126,9 @@ struct TLD_handle_t *TLD_duplicate(struct TLD_handle_t *handle) //_handle->document->SetObject(); _handle->document->CopyFrom(*handle->document, _handle->document->GetAllocator()); - - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DUP_LOG_HANDLE], 0, FS_OP_ADD, 1); + + struct tsg_log_instance_t *_instance=get_log_instance(); + FS_operate(_instance->fs2_handle, _instance->fs2_field_id[LOG_FS2_DUP_LOG_HANDLE], 0, FS_OP_ADD, 1); return _handle; } @@ -1029,7 +1144,8 @@ struct TLD_handle_t *TLD_create(int thread_id) _handle->document = new Document(_handle->valueAllocator); _handle->document->SetObject(); - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_CREATE_LOG_HANDLE], 0, FS_OP_ADD, 1); + struct tsg_log_instance_t *_instance=get_log_instance(); + FS_operate(_instance->fs2_handle, _instance->fs2_field_id[LOG_FS2_CREATE_LOG_HANDLE], 0, FS_OP_ADD, 1); return _handle; } @@ -1440,15 +1556,15 @@ int TLD_append_streaminfo(struct tsg_log_instance_t *instance, struct TLD_handle return 0; } -int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t **service2topic, int *max_service) +int load_log_common_field(const char *filename, id2field_t *id2field, struct topic_stat **service2topic, int *max_service) { - int i=0; + int i=0,flag=0; int ret=0,id=0; FILE *fp=NULL; char line[1024]={0}; char field_name[64]={0}; char type_name[32]={0}; - id2field_t *_service2topic=NULL; + struct topic_stat *_service2topic=NULL; fp=fopen(filename, "r"); if(fp==NULL) @@ -1481,13 +1597,14 @@ int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t id2field[id].type = tld_type[i].type; id2field[id].id = id; memcpy(id2field[id].name, field_name, strlen(field_name)); + flag=1; break; case TLD_TYPE_TOPIC: if(_service2topic==NULL) { - _service2topic=(id2field_t *)calloc(1, sizeof(id2field_t)*(id+1)); + _service2topic=(struct topic_stat *)calloc(1, sizeof(struct topic_stat)*(id+1)); _service2topic[id].type = TLD_TYPE_MAX; - _service2topic[id].id = id; + //_service2topic[id].id = id; memcpy(_service2topic[id].name, field_name, strlen(field_name)); *max_service=id+1; @@ -1496,26 +1613,33 @@ int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t { if(*max_service<=id) { - _service2topic=(id2field_t *)realloc(_service2topic, sizeof(id2field_t)*(id+1)); - memset(&_service2topic[id], 0, sizeof(id2field_t)); + _service2topic=(struct topic_stat *)realloc(_service2topic, sizeof(struct topic_stat)*(id+1)); + memset(&_service2topic[id], 0, sizeof(struct topic_stat)); _service2topic[id].type = TLD_TYPE_MAX; - _service2topic[id].id = id; + //_service2topic[id].id = id; memcpy(_service2topic[id].name, field_name, strlen(field_name)); *max_service=id+1; } else { - memset(&_service2topic[id], 0, sizeof(id2field_t)); + memset(&_service2topic[id], 0, sizeof(struct topic_stat)); _service2topic[id].type = TLD_TYPE_MAX; - _service2topic[id].id = id; + //_service2topic[id].id = id; memcpy(_service2topic[id].name, field_name, strlen(field_name)); } } + flag=1; break; default: break; } + + if(flag==1) + { + flag=0; + break; + } } } memset(line, 0, sizeof(line)); @@ -1526,13 +1650,13 @@ int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t if(service2topic!=NULL) { - *service2topic=_service2topic; + (*service2topic)=_service2topic; } return 0; } -struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) +struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile, screen_stat_handle_t fs2_handle) { int i=0,ret=0; char label_buff[128]={0}; @@ -1540,20 +1664,25 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) char kafka_errstr[1024]={0}; unsigned int local_ip_nr=0; rd_kafka_conf_t *rdkafka_conf = NULL; - rd_kafka_topic_conf_t *topic_conf; struct tsg_log_instance_t *_instance=NULL; - _instance=(struct tsg_log_instance_t *)calloc(1, sizeof(struct tsg_log_instance_t)); - - int thread_num=get_thread_count(); - _instance->drop_start=(struct timespec *)calloc(1, sizeof(struct timespec)*thread_num); - _instance->fs_status_ids=(int *)calloc(1, sizeof(int)*thread_num); - _instance->send_log_percent=(int *)calloc(1, sizeof(int)*thread_num); + _instance=(struct tsg_log_instance_t *)calloc(1, sizeof(struct tsg_log_instance_t)); + _instance->fs2_handle=fs2_handle; - for(i=0;isend_log_percent[i]=100; + _instance->fs2_field_id[i]=FS_register(_instance->fs2_handle, FS_STYLE_FIELD, FS_CALC_SPEED, g_log_fs2_field[i].name); } + + _instance->fs2_column_id[LOG_COLUMN_STATUS_SUCCESS]=FS_register(_instance->fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, "T_success_log"); + _instance->fs2_column_id[LOG_COLUMN_STATUS_FAIL]=FS_register(_instance->fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, "T_fail_log"); + _instance->fs2_column_id[LOG_COLUMN_STATUS_DROP]=FS_register(_instance->fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, "T_drop_log"); + + _instance->fs2_column_id[LOG_COLUMN_STATUS_SUCCESS_S]=FS_register(_instance->fs2_handle, FS_STYLE_COLUMN, FS_CALC_SPEED, "success_log/s"); + _instance->fs2_column_id[LOG_COLUMN_STATUS_FAIL_S]=FS_register(_instance->fs2_handle, FS_STYLE_COLUMN, FS_CALC_SPEED, "fail_log/s"); + _instance->fs2_column_id[LOG_COLUMN_STATUS_DROP_S]=FS_register(_instance->fs2_handle, FS_STYLE_COLUMN, FS_CALC_SPEED, "drop_log/s"); + + _instance->sum_line_id=FS_register(_instance->fs2_handle, FS_STYLE_LINE, FS_CALC_SPEED, "SUM"); MESA_load_profile_int_def(conffile, "TSG_LOG", "LOG_LEVEL",&(_instance->level), 30); MESA_load_profile_string_def(conffile, "TSG_LOG", "LOG_PATH", _instance->log_path, sizeof(_instance->log_path), "./tsglog/tsglog"); @@ -1665,14 +1794,16 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) if(_instance->service2topic!=NULL) { - _instance->topic_rkt=(rd_kafka_topic_t **)calloc(1, (_instance->max_service)*sizeof(rd_kafka_topic_t*)); - for(i=0; i<_instance->max_service; i++) { - if(_instance->service2topic[i].type==TLD_TYPE_MAX) + if(_instance->service2topic[i].type==TLD_TYPE_MAX && strlen(_instance->service2topic[i].name)>0) { - topic_conf=rd_kafka_topic_conf_new(); - _instance->topic_rkt[_instance->service2topic[i].id]=rd_kafka_topic_new(_instance->kafka_handle, _instance->service2topic[i].name, topic_conf); + register_topic(_instance, &( _instance->service2topic[i])); + } + + if(i==1) + { + memcpy(&(_instance->service2topic[i]), &(_instance->service2topic[0]), sizeof(struct topic_stat)); // service id of security event is 0 and 1 } } } @@ -1699,20 +1830,32 @@ void tsg_sendlog_destroy(struct tsg_log_instance_t * instance) { for(int i=0; imax_service; i++) { - if(instance->topic_rkt[i]==NULL) - { + if(instance->service2topic[i].type!=TLD_TYPE_MAX || i==1) //i=1 equal i=0, service id of security event is 0 and 1 + { continue; } + + if(instance->service2topic[i].topic_rkt!=NULL) + { + rd_kafka_topic_destroy(instance->service2topic[i].topic_rkt); + } - rd_kafka_topic_destroy(instance->topic_rkt[i]); + if(instance->service2topic[i].drop_start!=NULL) + { + free(instance->service2topic[i].drop_start); + instance->service2topic[i].drop_start=NULL; + } + + if(instance->service2topic[i].send_log_percent!=NULL) + { + free(instance->service2topic[i].send_log_percent); + instance->service2topic[i].send_log_percent=NULL; + } } //rd_kafka_destroy_flags(instance->kafka_handle, 4); rd_kafka_destroy(instance->kafka_handle); - free(instance->topic_rkt); - instance->topic_rkt=NULL; - free(instance->service2topic); instance->service2topic=NULL; } @@ -1721,15 +1864,6 @@ void tsg_sendlog_destroy(struct tsg_log_instance_t * instance) MESA_destroy_runtime_log_handle(instance->logger); instance->logger=NULL; - free(instance->drop_start); - instance->drop_start=NULL; - - free(instance->fs_status_ids); - instance->fs_status_ids=NULL; - - free(instance->send_log_percent); - instance->send_log_percent=NULL; - free(instance); instance=NULL; /* @@ -1751,10 +1885,8 @@ void tsg_sendlog_destroy(struct tsg_log_instance_t * instance) int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handle, tsg_log_t *log_msg, int thread_id) { - int fs_id=0; - int i=0,status=0; - int repeat_cnt=0; - struct timespec cur_time; + int fs_id=0,ret=0; + int i=0,repeat_cnt=0; int policy_id[MAX_RESULT_NUM]={0}; struct TLD_handle_t *_handle=handle; struct tsg_log_instance_t *_instance=instance; @@ -1769,7 +1901,7 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl if(_instance->mode==CLOSE) { TLD_cancel(handle); - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DROP_LOG], 0, FS_OP_ADD, 1); + FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_field_id[LOG_COLUMN_STATUS_DROP], FS_OP_ADD, 1); MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, "TSG_SEND_LOG", "Disable tsg_send_log."); return 0; } @@ -1807,19 +1939,18 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl continue; } - clock_gettime(CLOCK_REALTIME, &cur_time); - if((cur_time.tv_nsec%100)>_instance->send_log_percent[thread_id]) - { - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DROP_LOG], 0, FS_OP_ADD, 1); + ret=update_percent(_instance, log_msg->result[i].service_id, LOG_COLUMN_STATUS_DROP, thread_id); + if(ret==1) + { MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, "TSG_SEND_LOG", "tsg drop log:cfg_id=%d service=%d send_log_percent: %d addr=%s", log_msg->result[i].config_id, log_msg->result[i].service_id, - _instance->send_log_percent[thread_id], + _instance->service2topic[log_msg->result[i].service_id].send_log_percent[thread_id], (log_msg->a_stream==NULL ? "" : PRINTADDR(log_msg->a_stream,_instance->level)) ); - continue; + continue; } switch(log_msg->result[i].do_log) @@ -1834,7 +1965,7 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl ); fs_id=action2fs_id((int)log_msg->result[i].action); - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[fs_id], 0, FS_OP_ADD, 1); + FS_operate(_instance->fs2_handle, _instance->fs2_field_id[fs_id], 0, FS_OP_ADD, 1); continue; break; case LOG_ALL: @@ -1871,42 +2002,7 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl Writer writer(sb); _handle->document->Accept(writer); - status=rd_kafka_produce(_instance->topic_rkt[log_msg->result[i].service_id], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)sb.GetString(), sb.GetSize(), NULL, 0, NULL); - if(status<0) - { - clock_gettime(CLOCK_REALTIME, &cur_time); - if(cur_time.tv_sec - _instance->drop_start[thread_id].tv_sec>=1) - { - _instance->send_log_percent[thread_id]/=2; - clock_gettime(CLOCK_REALTIME, &_instance->drop_start[thread_id]); - FS_operate(g_tsg_para.fs2_handle, _instance->fs_status_ids[thread_id], 0, FS_OP_SET, _instance->send_log_percent[thread_id]); - } - - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_FAILED_LOG], 0, FS_OP_ADD, 1); - - MESA_handle_runtime_log(_instance->logger, - RLOG_LV_INFO, - "TSG_SEND_LOG", - "tsg_send_log to kafka is error of %s(%s), status: %d, topic: %s payload: %s", - rd_kafka_err2name(rd_kafka_last_error()), - rd_kafka_err2str(rd_kafka_last_error()), - status, - _instance->service2topic[log_msg->result[i].service_id].name, - sb.GetString() - ); - } - else - { - MESA_handle_runtime_log(_instance->logger, - RLOG_LV_DEBUG, - "TSG_SEND_LOG", - "log send successfully %s: %s", - _instance->service2topic[log_msg->result[i].service_id].name, - sb.GetString() - ); - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_SUCCESS_LOG], 0, FS_OP_ADD, 1); - FS_operate(g_tsg_para.fs2_handle, _instance->fs_status_ids[thread_id], 0, FS_OP_SET, _instance->send_log_percent[thread_id]); - } + tsg_send_payload(_instance, log_msg->result[i].service_id, (char *)sb.GetString(), sb.GetSize(), thread_id); TLD_delete(_handle, _instance->id2field[LOG_COMMON_POLICY_ID].name); TLD_delete(_handle, _instance->id2field[LOG_COMMON_SERVICE].name); @@ -1917,48 +2013,25 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl TLD_cancel(handle); - if(_instance->send_log_percent[thread_id]<100) - { - clock_gettime(CLOCK_REALTIME, &cur_time); - if(cur_time.tv_sec - _instance->drop_start[thread_id].tv_sec>=_instance->recovery_interval) - { - _instance->send_log_percent[thread_id]++; - _instance->drop_start[thread_id].tv_sec=cur_time.tv_sec; - FS_operate(g_tsg_para.fs2_handle, _instance->fs_status_ids[thread_id], 0, FS_OP_SET, _instance->send_log_percent[thread_id]); - } - } - return 0; } int tsg_register_topic(struct tsg_log_instance_t *instance, char *topic_name) { - rd_kafka_topic_conf_t *topic_conf; struct tsg_log_instance_t *_instance=(struct tsg_log_instance_t *)instance; - if(_instance==NULL || _instance->mode==CLOSE) - { - return 0; - } - - if(topic_name!=NULL && _instance->kafka_handle!=NULL) - { - _instance->service2topic=(id2field_t *)realloc(_instance->service2topic, (_instance->max_service+1)*sizeof(id2field_t)); - _instance->service2topic[_instance->max_service].id=_instance->max_service; - _instance->service2topic[_instance->max_service].type=TLD_TYPE_MAX; - memset(_instance->service2topic[_instance->max_service].name, 0, MAX_STRING_LEN); - memcpy(_instance->service2topic[_instance->max_service].name, topic_name, MIN(MAX_STRING_LEN-1, strlen(topic_name))); - - _instance->topic_rkt=(rd_kafka_topic_t **)realloc(_instance->topic_rkt, (_instance->max_service+1)*sizeof(rd_kafka_topic_t*)); - topic_conf=rd_kafka_topic_conf_new(); - _instance->topic_rkt[_instance->max_service]=rd_kafka_topic_new(_instance->kafka_handle, topic_name, topic_conf); - - _instance->max_service++; - } - else + if(_instance==NULL || _instance->mode==CLOSE || topic_name==NULL || _instance->kafka_handle!=NULL) { return -1; } + _instance->service2topic=(struct topic_stat *)realloc(_instance->service2topic, (_instance->max_service+1)*sizeof(struct topic_stat)); + _instance->service2topic[_instance->max_service].type=TLD_TYPE_MAX; + memset(_instance->service2topic[_instance->max_service].name, 0, MAX_STRING_LEN); + memcpy(_instance->service2topic[_instance->max_service].name, topic_name, MIN(MAX_STRING_LEN-1, strlen(topic_name))); + + register_topic(_instance, &(_instance->service2topic[_instance->max_service])); + _instance->max_service++; + return (_instance->max_service-1); } @@ -1972,30 +2045,38 @@ int tsg_send_payload(struct tsg_log_instance_t *instance, int topic_id, char *pa return 0; } - if(payload==NULL || payload_len<=0 || topic_id<0 || _instance->topic_rkt==NULL) + if(payload==NULL || payload_len<=0 || topic_id<0 || _instance->service2topic[topic_id].topic_rkt==NULL) { + MESA_handle_runtime_log(_instance->logger, + RLOG_LV_INFO, + "TSG_SEND_LOG", + "tsg_send_log to kafka is error (payload==NULL || payload_len<=0 || topic_id<0 || _instance->service2topic[topic_id].topic_rkt==NULL), topic: %s", + _instance->service2topic[topic_id].name + ); return -1; } - status=rd_kafka_produce(_instance->topic_rkt[topic_id], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, payload, payload_len, NULL, 0, NULL); + status=rd_kafka_produce(_instance->service2topic[topic_id].topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, payload, payload_len, NULL, 0, NULL); if(status<0) { - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DDOS_FAILED_LOG], 0, FS_OP_ADD, 1); + update_percent(_instance, topic_id, LOG_COLUMN_STATUS_FAIL, thread_id); MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, "TSG_SEND_LOG", - "tsg_send_log to kafka is error of %s(%s), status: %d, topic: %s", + "tsg_send_log to kafka is error of code: %d %s(%s), status: %d, topic: %s %s", + rd_kafka_last_error(), rd_kafka_err2name(rd_kafka_last_error()), rd_kafka_err2str(rd_kafka_last_error()), status, - _instance->service2topic[topic_id].name + _instance->service2topic[topic_id].name, + payload ); + return -1; } - else - { - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DDOS_SUCCESS_LOG], 0, FS_OP_ADD, 1); - } + + update_percent(_instance, topic_id, LOG_COLUMN_STATUS_SUCCESS, thread_id); + update_percent(_instance, topic_id, LOG_COLUMN_STATUS_MAX, thread_id); return 0; } diff --git a/src/tsg_send_log_internal.h b/src/tsg_send_log_internal.h index bcb5b8c..2cf35fb 100644 --- a/src/tsg_send_log_internal.h +++ b/src/tsg_send_log_internal.h @@ -128,6 +128,33 @@ typedef enum _tsg_log_field_id LOG_COMMON_MAX }tsg_log_field_id_t; +enum LOG_COLUMN_STATUS +{ + LOG_COLUMN_STATUS_SUCCESS=0, + LOG_COLUMN_STATUS_SUCCESS_S, + LOG_COLUMN_STATUS_FAIL, + LOG_COLUMN_STATUS_FAIL_S, + LOG_COLUMN_STATUS_DROP, + LOG_COLUMN_STATUS_DROP_S, + LOG_COLUMN_STATUS_MAX +}; + +enum LOG_FS2_TYPE{ + LOG_FS2_ABORT_ALLOW, + LOG_FS2_ABORT_DENY, + LOG_FS2_ABORT_MONITOR, + LOG_FS2_ABORT_INTERCEPT, + LOG_FS2_ABORT_UNKNOWN, + LOG_FS2_CREATE_LOG_HANDLE, + LOG_FS2_DUP_LOG_HANDLE, + LOG_FS2_APPEND_LOG_HANDLE, + LOG_FS2_FREE_LOG_HANDLE, + LOG_FS2_FREE_RAPID_SIZE, + LOG_FS2_FREE_RAPID_CAPACITY, + LOG_FS2_TYPE_MAX +}; + + typedef struct _id2field { int type; @@ -135,6 +162,17 @@ typedef struct _id2field char name[MAX_STRING_LEN]; }id2field_t; +struct topic_stat +{ + int status; + int type; + int fs2_line_id; + int *send_log_percent; + char name[MAX_STRING_LEN]; + struct timespec *drop_start; + rd_kafka_topic_t *topic_rkt; +}; + struct tsg_log_instance_t { int mode; @@ -151,9 +189,9 @@ struct tsg_log_instance_t int mac_linkinfo_project_id; int nat_c2s_linkinfo_project_id; int nat_s2c_linkinfo_project_id; - int *send_log_percent; - int *fs_status_ids; - struct timespec *drop_start; + int sum_line_id; + int fs2_column_id[LOG_COLUMN_STATUS_MAX]; + int fs2_field_id[LOG_FS2_TYPE_MAX]; char l7_unknown_name[MAX_STRING_LEN]; char log_path[MAX_STRING_LEN*2]; char tcp_label[MAX_STRING_LEN]; @@ -168,14 +206,14 @@ struct tsg_log_instance_t char local_ip_str[MAX_IPV4_LEN]; char l7_proto_id_file[MAX_STRING_LEN*4]; id2field_t id2field[LOG_COMMON_MAX]; - rd_kafka_topic_t **topic_rkt; rd_kafka_t *kafka_handle; - id2field_t *service2topic; + struct topic_stat *service2topic; + screen_stat_handle_t fs2_handle; void *logger; }; char *log_field_id2name(struct tsg_log_instance_t *instance, tsg_log_field_id_t id); -struct tsg_log_instance_t *tsg_sendlog_init(const char *filename); +struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile, screen_stat_handle_t fs2_handle); void tsg_sendlog_destroy(struct tsg_log_instance_t * instance); #endif diff --git a/src/tsg_tamper.cpp b/src/tsg_tamper.cpp index 897e6dd..9ddd027 100644 --- a/src/tsg_tamper.cpp +++ b/src/tsg_tamper.cpp @@ -32,7 +32,7 @@ #define IPV6_UDP_PALYLOAD_START_INDEX 48 //ipv6_len(40) + udp_len(8) #define IPV6_IP_PAYLOAD_INDEX 4 //ipv6_payload_index(4) -int swop_payload2byte(char *str, int endlen) +int swap_payload2byte(char *str, int endlen) { int i = 0; int j = 0; @@ -88,7 +88,7 @@ int send_tamper_xxx(const struct streaminfo *a_stream, long *tamper_count, const } memcpy(tamper_buf, p_trans_payload, trans_layload_len); - tamper_index = swop_payload2byte(tamper_buf, trans_layload_len); + tamper_index = swap_payload2byte(tamper_buf, trans_layload_len); if(tamper_index > 0 ){ if(0 == tsg_send_inject_packet(a_stream, SIO_DEFAULT, tamper_buf, trans_layload_len, a_stream->routedir)){ FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_SUCESS_TAMPER], 0, FS_OP_ADD, 1); @@ -108,7 +108,7 @@ int send_tamper_xxx(const struct streaminfo *a_stream, long *tamper_count, const } } - FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_TAMPER_FAILED_NOSWOP], 0, FS_OP_ADD, 1); + FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_TAMPER_FAILED_NOSWAP], 0, FS_OP_ADD, 1); MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_DEBUG, __FUNCTION__,