TSG-10275: kafka缺少topic时触发发送日志降级机制,仅丢弃本topic的日志,不影响别的topic日志发送
This commit is contained in:
@@ -1,7 +1,6 @@
|
|||||||
#TYPE:1:UCHAR,2:USHORT,3:ULONG,4:ULOG,5:USTRING,6:FILE,7:UBASE64,8:PACKET
|
#TYPE:1:UCHAR,2:USHORT,3:ULONG,4:ULOG,5:USTRING,6:FILE,7:UBASE64,8:PACKET
|
||||||
#TYPE TOPIC SERVICE
|
#TYPE TOPIC SERVICE
|
||||||
TOPIC SECURITY-EVENT 0
|
TOPIC SECURITY-EVENT 0
|
||||||
TOPIC SECURITY-EVENT 1
|
|
||||||
TOPIC SESSION-RECORD 2
|
TOPIC SESSION-RECORD 2
|
||||||
TOPIC INTERNAL-RTP-RECORD 4
|
TOPIC INTERNAL-RTP-RECORD 4
|
||||||
TOPIC VOIP-RECORD 5
|
TOPIC VOIP-RECORD 5
|
||||||
|
|||||||
@@ -66,14 +66,6 @@ id2field_t g_tsg_fs2_field[TSG_FS2_MAX]={{0, TSG_FS2_TCP_LINKS, "tcp_links"},
|
|||||||
{0, TSG_FS2_HIT_SHARE, "hit_share"},
|
{0, TSG_FS2_HIT_SHARE, "hit_share"},
|
||||||
{0, TSG_FS2_INTERCEPT, "intercept"},
|
{0, TSG_FS2_INTERCEPT, "intercept"},
|
||||||
{0, TSG_FS2_EXCLUSION, "exclusion"},
|
{0, TSG_FS2_EXCLUSION, "exclusion"},
|
||||||
{0, TSG_FS2_SUCCESS_LOG, "success_log"},
|
|
||||||
{0, TSG_FS2_FAILED_LOG, "failed_log"},
|
|
||||||
{0, TSG_FS2_DROP_LOG, "drop_log"},
|
|
||||||
{0, TSG_FS2_ABORT_ALLOW, "abort_allow"},
|
|
||||||
{0, TSG_FS2_ABORT_DENY, "abort_deny"},
|
|
||||||
{0, TSG_FS2_ABORT_MONITOR, "abort_monitor"},
|
|
||||||
{0, TSG_FS2_ABORT_INTERCEPT, "abort_intercept"},
|
|
||||||
{0, TSG_FS2_ABORT_UNKNOWN, "abort_unknown"},
|
|
||||||
{0, TSG_FS2_APP_DPKT_RESULT, "D_result"},
|
{0, TSG_FS2_APP_DPKT_RESULT, "D_result"},
|
||||||
{0, TSG_FS2_APP_Q_RESULT, "Q_result"},
|
{0, TSG_FS2_APP_Q_RESULT, "Q_result"},
|
||||||
{0, TSG_FS2_APP_USER_RESULT, "U_result"},
|
{0, TSG_FS2_APP_USER_RESULT, "U_result"},
|
||||||
@@ -84,19 +76,11 @@ id2field_t g_tsg_fs2_field[TSG_FS2_MAX]={{0, TSG_FS2_TCP_LINKS, "tcp_links"},
|
|||||||
{0, TSG_FS2_MIRRORED_BYTE_SUCCESS, "mirror_byte_suc"},
|
{0, TSG_FS2_MIRRORED_BYTE_SUCCESS, "mirror_byte_suc"},
|
||||||
{0, TSG_FS2_MIRRORED_PKT_FAILED, "mirror_pkt_fai"},
|
{0, TSG_FS2_MIRRORED_PKT_FAILED, "mirror_pkt_fai"},
|
||||||
{0, TSG_FS2_MIRRORED_BYTE_FAILED, "mirror_byte_fai"},
|
{0, TSG_FS2_MIRRORED_BYTE_FAILED, "mirror_byte_fai"},
|
||||||
{0, TSG_FS2_DDOS_SUCCESS_LOG, "ddos_suc_log"},
|
|
||||||
{0, TSG_FS2_DDOS_FAILED_LOG, "ddos_fai_log"},
|
|
||||||
{0, TSG_FS2_SET_TIMOUT_SUCCESS, "set_timeout_suc"},
|
{0, TSG_FS2_SET_TIMOUT_SUCCESS, "set_timeout_suc"},
|
||||||
{0, TSG_FS2_SET_TIMOUT_FAILED, "set_timeout_fai"},
|
{0, TSG_FS2_SET_TIMOUT_FAILED, "set_timeout_fai"},
|
||||||
{0, TSG_FS2_CREATE_LOG_HANDLE, "create_log_cnt"},
|
|
||||||
{0, TSG_FS2_DUP_LOG_HANDLE, "dup_log_cnt"},
|
|
||||||
{0, TSG_FS2_APPEND_LOG_HANDLE, "append_log_cnt"},
|
|
||||||
{0, TSG_FS2_FREE_LOG_HANDLE, "free_log_cnt"},
|
|
||||||
{0, TSG_FS2_FREE_RAPID_SIZE, "free_rapid_size"},
|
|
||||||
{0, TSG_FS2_FREE_RAPID_CAPACITY, "free_rapid_capacity"},
|
|
||||||
{0, TSG_FS2_SUCESS_TAMPER, "tamper_sucess"},
|
{0, TSG_FS2_SUCESS_TAMPER, "tamper_sucess"},
|
||||||
{0, TSG_FS2_TAMPER_FAILED_PLOAD_LESS_4, "tamper_nopload"},
|
{0, TSG_FS2_TAMPER_FAILED_PLOAD_LESS_4, "tamper_nopload"},
|
||||||
{0, TSG_FS2_TAMPER_FAILED_NOSWOP, "tamper_noswop"}
|
{0, TSG_FS2_TAMPER_FAILED_NOSWAP, "tamper_noswap"}
|
||||||
};
|
};
|
||||||
|
|
||||||
id2field_t g_tsg_proto_name2id[PROTO_MAX]={{PROTO_UNKONWN, 0, "unknown"},
|
id2field_t g_tsg_proto_name2id[PROTO_MAX]={{PROTO_UNKONWN, 0, "unknown"},
|
||||||
@@ -2283,14 +2267,6 @@ extern "C" int TSG_MASTER_INIT()
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
g_tsg_log_instance=tsg_sendlog_init(tsg_conffile);
|
|
||||||
if(g_tsg_log_instance==NULL)
|
|
||||||
{
|
|
||||||
MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_FATAL, "INIT_SENDLOG", "tsg_sendlog_init failed ...");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
g_tsg_log_instance->session_attribute_project_id=g_tsg_para.session_attribute_project_id;
|
|
||||||
|
|
||||||
MESA_load_profile_int_def(tsg_conffile, "FIELD_STAT", "CYCLE", &cycle, 30);
|
MESA_load_profile_int_def(tsg_conffile, "FIELD_STAT", "CYCLE", &cycle, 30);
|
||||||
MESA_load_profile_short_nodef(tsg_conffile, "FIELD_STAT","TELEGRAF_PORT", (short *)&(fs_server_port));
|
MESA_load_profile_short_nodef(tsg_conffile, "FIELD_STAT","TELEGRAF_PORT", (short *)&(fs_server_port));
|
||||||
MESA_load_profile_string_nodef(tsg_conffile,"FIELD_STAT","TELEGRAF_IP",fs_server_ip, sizeof(fs_server_ip));
|
MESA_load_profile_string_nodef(tsg_conffile,"FIELD_STAT","TELEGRAF_IP",fs_server_ip, sizeof(fs_server_ip));
|
||||||
@@ -2325,20 +2301,16 @@ extern "C" int TSG_MASTER_INIT()
|
|||||||
{
|
{
|
||||||
g_tsg_para.fs2_field_id[i]=FS_register(g_tsg_para.fs2_handle, FS_STYLE_FIELD, FS_CALC_SPEED, g_tsg_fs2_field[i].name);
|
g_tsg_para.fs2_field_id[i]=FS_register(g_tsg_para.fs2_handle, FS_STYLE_FIELD, FS_CALC_SPEED, g_tsg_fs2_field[i].name);
|
||||||
}
|
}
|
||||||
|
|
||||||
int thread_num=get_thread_count();
|
|
||||||
for(i=0; i<thread_num && g_tsg_log_instance!=NULL; i++)
|
|
||||||
{
|
|
||||||
snprintf(buff, sizeof(buff), "send_log_percent_%02d", i);
|
|
||||||
g_tsg_log_instance->fs_status_ids[i]=FS_register(g_tsg_para.fs2_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, buff);
|
|
||||||
}
|
|
||||||
|
|
||||||
FS_start(g_tsg_para.fs2_handle);
|
g_tsg_log_instance=tsg_sendlog_init(tsg_conffile, g_tsg_para.fs2_handle);
|
||||||
|
if(g_tsg_log_instance==NULL)
|
||||||
for(i=0; i<thread_num; i++)
|
|
||||||
{
|
{
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_log_instance->fs_status_ids[i], 0, FS_OP_SET, g_tsg_log_instance->send_log_percent[i]);
|
MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_FATAL, "INIT_SENDLOG", "tsg_sendlog_init failed ...");
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
g_tsg_log_instance->session_attribute_project_id=g_tsg_para.session_attribute_project_id;
|
||||||
|
|
||||||
|
FS_start(g_tsg_para.fs2_handle);
|
||||||
|
|
||||||
ret=tsg_statistic_init(tsg_conffile, g_tsg_para.logger);
|
ret=tsg_statistic_init(tsg_conffile, g_tsg_para.logger);
|
||||||
if(ret<0)
|
if(ret<0)
|
||||||
|
|||||||
@@ -92,7 +92,6 @@ enum MASTER_TABLE{
|
|||||||
TABLE_MAX
|
TABLE_MAX
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
enum TSG_FS2_TYPE{
|
enum TSG_FS2_TYPE{
|
||||||
TSG_FS2_TCP_LINKS=0,
|
TSG_FS2_TCP_LINKS=0,
|
||||||
TSG_FS2_UDP_LINKS,
|
TSG_FS2_UDP_LINKS,
|
||||||
@@ -101,14 +100,6 @@ enum TSG_FS2_TYPE{
|
|||||||
TSG_FS2_HIT_SHARE,
|
TSG_FS2_HIT_SHARE,
|
||||||
TSG_FS2_INTERCEPT,
|
TSG_FS2_INTERCEPT,
|
||||||
TSG_FS2_EXCLUSION,
|
TSG_FS2_EXCLUSION,
|
||||||
TSG_FS2_SUCCESS_LOG,
|
|
||||||
TSG_FS2_FAILED_LOG,
|
|
||||||
TSG_FS2_DROP_LOG,
|
|
||||||
TSG_FS2_ABORT_ALLOW,
|
|
||||||
TSG_FS2_ABORT_DENY,
|
|
||||||
TSG_FS2_ABORT_MONITOR,
|
|
||||||
TSG_FS2_ABORT_INTERCEPT,
|
|
||||||
TSG_FS2_ABORT_UNKNOWN,
|
|
||||||
TSG_FS2_APP_DPKT_RESULT,
|
TSG_FS2_APP_DPKT_RESULT,
|
||||||
TSG_FS2_APP_Q_RESULT,
|
TSG_FS2_APP_Q_RESULT,
|
||||||
TSG_FS2_APP_USER_RESULT,
|
TSG_FS2_APP_USER_RESULT,
|
||||||
@@ -119,19 +110,11 @@ enum TSG_FS2_TYPE{
|
|||||||
TSG_FS2_MIRRORED_BYTE_SUCCESS,
|
TSG_FS2_MIRRORED_BYTE_SUCCESS,
|
||||||
TSG_FS2_MIRRORED_PKT_FAILED,
|
TSG_FS2_MIRRORED_PKT_FAILED,
|
||||||
TSG_FS2_MIRRORED_BYTE_FAILED,
|
TSG_FS2_MIRRORED_BYTE_FAILED,
|
||||||
TSG_FS2_DDOS_SUCCESS_LOG,
|
|
||||||
TSG_FS2_DDOS_FAILED_LOG,
|
|
||||||
TSG_FS2_SET_TIMOUT_SUCCESS,
|
TSG_FS2_SET_TIMOUT_SUCCESS,
|
||||||
TSG_FS2_SET_TIMOUT_FAILED,
|
TSG_FS2_SET_TIMOUT_FAILED,
|
||||||
TSG_FS2_CREATE_LOG_HANDLE,
|
|
||||||
TSG_FS2_DUP_LOG_HANDLE,
|
|
||||||
TSG_FS2_APPEND_LOG_HANDLE,
|
|
||||||
TSG_FS2_FREE_LOG_HANDLE,
|
|
||||||
TSG_FS2_FREE_RAPID_SIZE,
|
|
||||||
TSG_FS2_FREE_RAPID_CAPACITY,
|
|
||||||
TSG_FS2_SUCESS_TAMPER,
|
TSG_FS2_SUCESS_TAMPER,
|
||||||
TSG_FS2_TAMPER_FAILED_PLOAD_LESS_4,
|
TSG_FS2_TAMPER_FAILED_PLOAD_LESS_4,
|
||||||
TSG_FS2_TAMPER_FAILED_NOSWOP,
|
TSG_FS2_TAMPER_FAILED_NOSWAP,
|
||||||
TSG_FS2_MAX
|
TSG_FS2_MAX
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -41,6 +41,21 @@ struct TLD_handle_t
|
|||||||
Document *document;
|
Document *document;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
id2field_t g_log_fs2_field[LOG_FS2_TYPE_MAX]={
|
||||||
|
{0, LOG_FS2_ABORT_ALLOW, "abort_allow"},
|
||||||
|
{0, LOG_FS2_ABORT_DENY, "abort_deny"},
|
||||||
|
{0, LOG_FS2_ABORT_MONITOR, "abort_monitor"},
|
||||||
|
{0, LOG_FS2_ABORT_INTERCEPT, "abort_intercept"},
|
||||||
|
{0, LOG_FS2_ABORT_UNKNOWN, "abort_unknown"},
|
||||||
|
{0, LOG_FS2_CREATE_LOG_HANDLE, "create_log_cnt"},
|
||||||
|
{0, LOG_FS2_DUP_LOG_HANDLE, "dup_log_cnt"},
|
||||||
|
{0, LOG_FS2_APPEND_LOG_HANDLE, "append_log_cnt"},
|
||||||
|
{0, LOG_FS2_FREE_LOG_HANDLE, "free_log_cnt"},
|
||||||
|
{0, LOG_FS2_FREE_RAPID_SIZE, "free_rapid_size"},
|
||||||
|
{0, LOG_FS2_FREE_RAPID_CAPACITY, "free_rapid_capacity"}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
const id2field_t tld_type[TLD_TYPE_MAX]={{TLD_TYPE_UNKNOWN, TLD_TYPE_UNKNOWN, "UNKOWN"},
|
const id2field_t tld_type[TLD_TYPE_MAX]={{TLD_TYPE_UNKNOWN, TLD_TYPE_UNKNOWN, "UNKOWN"},
|
||||||
{TLD_TYPE_LONG, TLD_TYPE_LONG, "LONG"},
|
{TLD_TYPE_LONG, TLD_TYPE_LONG, "LONG"},
|
||||||
{TLD_TYPE_STRING, TLD_TYPE_STRING, "STRING"},
|
{TLD_TYPE_STRING, TLD_TYPE_STRING, "STRING"},
|
||||||
@@ -92,6 +107,101 @@ static void add_str_member(struct TLD_handle_t *_handle, Value *object, const ch
|
|||||||
object->AddMember(temp_key, temp_val, _handle->document->GetAllocator());
|
object->AddMember(temp_key, temp_val, _handle->document->GetAllocator());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int register_topic(struct tsg_log_instance_t *instance, struct topic_stat *topic)
|
||||||
|
{
|
||||||
|
rd_kafka_topic_conf_t *topic_conf;
|
||||||
|
struct tsg_log_instance_t *_instance=(struct tsg_log_instance_t *)instance;
|
||||||
|
|
||||||
|
topic_conf=rd_kafka_topic_conf_new();
|
||||||
|
topic->status=1;
|
||||||
|
topic->topic_rkt=(rd_kafka_topic_t *)calloc(1, sizeof(rd_kafka_topic_t*));
|
||||||
|
topic->topic_rkt=rd_kafka_topic_new(_instance->kafka_handle, topic->name, topic_conf);
|
||||||
|
|
||||||
|
int thread_num=get_thread_count();
|
||||||
|
topic->drop_start=(struct timespec *)calloc(thread_num, sizeof(struct timespec));
|
||||||
|
topic->send_log_percent=(int *)calloc(thread_num, sizeof(int));
|
||||||
|
|
||||||
|
for(int i=0; i<thread_num; i++)
|
||||||
|
{
|
||||||
|
topic->send_log_percent[i]=100;
|
||||||
|
clock_gettime(CLOCK_REALTIME, &(topic->drop_start[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
topic->fs2_line_id=FS_register(_instance->fs2_handle, FS_STYLE_LINE, FS_CALC_SPEED, topic->name);
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int update_percent(struct tsg_log_instance_t *_instance, int service_id, enum LOG_COLUMN_STATUS column, int thread_id)
|
||||||
|
{
|
||||||
|
struct timespec cur_time;
|
||||||
|
struct topic_stat *topic=(struct topic_stat *)&(_instance->service2topic[service_id]);
|
||||||
|
|
||||||
|
clock_gettime(CLOCK_REALTIME, &cur_time);
|
||||||
|
|
||||||
|
switch(column)
|
||||||
|
{
|
||||||
|
case LOG_COLUMN_STATUS_SUCCESS:
|
||||||
|
FS_operate(_instance->fs2_handle, topic->fs2_line_id, _instance->fs2_column_id[column], FS_OP_ADD, 1);
|
||||||
|
FS_operate(_instance->fs2_handle, topic->fs2_line_id, _instance->fs2_column_id[column+1], FS_OP_ADD, 1);
|
||||||
|
|
||||||
|
FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_column_id[column], FS_OP_ADD, 1);
|
||||||
|
FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_column_id[column+1], FS_OP_ADD, 1);
|
||||||
|
break;
|
||||||
|
case LOG_COLUMN_STATUS_FAIL:
|
||||||
|
FS_operate(_instance->fs2_handle, topic->fs2_line_id, _instance->fs2_column_id[column], FS_OP_ADD, 1);
|
||||||
|
FS_operate(_instance->fs2_handle, topic->fs2_line_id, _instance->fs2_column_id[column+1], FS_OP_ADD, 1);
|
||||||
|
|
||||||
|
FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_column_id[column], FS_OP_ADD, 1);
|
||||||
|
FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_column_id[column+1], FS_OP_ADD, 1);
|
||||||
|
if(cur_time.tv_sec - topic->drop_start[thread_id].tv_sec>=1)
|
||||||
|
{
|
||||||
|
topic->send_log_percent[thread_id]/=2;
|
||||||
|
topic->drop_start[thread_id]=cur_time;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case LOG_COLUMN_STATUS_DROP:
|
||||||
|
if((cur_time.tv_nsec%100) > topic->send_log_percent[thread_id])
|
||||||
|
{
|
||||||
|
FS_operate(_instance->fs2_handle, topic->fs2_line_id, _instance->fs2_column_id[column], FS_OP_ADD, 1);
|
||||||
|
FS_operate(_instance->fs2_handle, topic->fs2_line_id, _instance->fs2_column_id[column+1], FS_OP_ADD, 1);
|
||||||
|
|
||||||
|
FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_column_id[column], FS_OP_ADD, 1);
|
||||||
|
FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_column_id[column+1], FS_OP_ADD, 1);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case LOG_COLUMN_STATUS_MAX:
|
||||||
|
if(topic->send_log_percent[thread_id]>=100)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if((cur_time.tv_sec - topic->drop_start[thread_id].tv_sec) >= _instance->recovery_interval)
|
||||||
|
{
|
||||||
|
topic->send_log_percent[thread_id]++;
|
||||||
|
topic->drop_start[thread_id]=cur_time;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct tsg_log_instance_t *get_log_instance(void)
|
||||||
|
{
|
||||||
|
if(g_tsg_log_instance!=NULL)
|
||||||
|
{
|
||||||
|
return g_tsg_log_instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static int is_tunnels(struct streaminfo *a_stream)
|
static int is_tunnels(struct streaminfo *a_stream)
|
||||||
{
|
{
|
||||||
const struct streaminfo *ptmp = a_stream;
|
const struct streaminfo *ptmp = a_stream;
|
||||||
@@ -857,37 +967,39 @@ static int action2fs_id(int action)
|
|||||||
switch(action)
|
switch(action)
|
||||||
{
|
{
|
||||||
case TSG_ACTION_DENY:
|
case TSG_ACTION_DENY:
|
||||||
return TSG_FS2_ABORT_DENY;
|
return LOG_FS2_ABORT_DENY;
|
||||||
break;
|
break;
|
||||||
case TSG_ACTION_BYPASS:
|
case TSG_ACTION_BYPASS:
|
||||||
return TSG_FS2_ABORT_ALLOW;
|
return LOG_FS2_ABORT_ALLOW;
|
||||||
break;
|
break;
|
||||||
case TSG_ACTION_MONITOR:
|
case TSG_ACTION_MONITOR:
|
||||||
return TSG_FS2_ABORT_MONITOR;
|
return LOG_FS2_ABORT_MONITOR;
|
||||||
break;
|
break;
|
||||||
case TSG_ACTION_INTERCEPT:
|
case TSG_ACTION_INTERCEPT:
|
||||||
return TSG_FS2_ABORT_INTERCEPT;
|
return LOG_FS2_ABORT_INTERCEPT;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
return TSG_FS2_ABORT_UNKNOWN;
|
return LOG_FS2_ABORT_UNKNOWN;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSG_FS2_ABORT_UNKNOWN;
|
return LOG_FS2_ABORT_UNKNOWN;
|
||||||
}
|
}
|
||||||
|
|
||||||
int TLD_cancel(struct TLD_handle_t *handle)
|
int TLD_cancel(struct TLD_handle_t *handle)
|
||||||
{
|
{
|
||||||
long long length=0;
|
|
||||||
if (handle != NULL)
|
if (handle != NULL)
|
||||||
{
|
{
|
||||||
if (handle->document != NULL)
|
if (handle->document != NULL)
|
||||||
{
|
{
|
||||||
|
long long length=0;
|
||||||
|
struct tsg_log_instance_t *_instance=get_log_instance();
|
||||||
|
|
||||||
length=handle->document->GetAllocator().Size();
|
length=handle->document->GetAllocator().Size();
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_FREE_RAPID_SIZE], 0, FS_OP_ADD, length);
|
FS_operate(_instance->fs2_handle, _instance->fs2_field_id[LOG_FS2_FREE_RAPID_SIZE], 0, FS_OP_ADD, length);
|
||||||
|
|
||||||
length=handle->document->GetAllocator().Capacity();
|
length=handle->document->GetAllocator().Capacity();
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_FREE_RAPID_CAPACITY], 0, FS_OP_ADD, length);
|
FS_operate(_instance->fs2_handle, _instance->fs2_field_id[LOG_FS2_FREE_RAPID_CAPACITY], 0, FS_OP_ADD, length);
|
||||||
|
|
||||||
delete handle->document;
|
delete handle->document;
|
||||||
handle->document = NULL;
|
handle->document = NULL;
|
||||||
@@ -895,7 +1007,7 @@ int TLD_cancel(struct TLD_handle_t *handle)
|
|||||||
delete handle->valueAllocator;
|
delete handle->valueAllocator;
|
||||||
handle->valueAllocator=NULL;
|
handle->valueAllocator=NULL;
|
||||||
|
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_FREE_LOG_HANDLE], 0, FS_OP_ADD, 1);
|
FS_operate(_instance->fs2_handle, _instance->fs2_field_id[LOG_FS2_FREE_LOG_HANDLE], 0, FS_OP_ADD, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
free(handle);
|
free(handle);
|
||||||
@@ -961,7 +1073,9 @@ int TLD_append(struct TLD_handle_t *handle, char *key, void *value, TLD_TYPE typ
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_APPEND_LOG_HANDLE], 0, FS_OP_ADD, 1);
|
struct tsg_log_instance_t *_instance=get_log_instance();
|
||||||
|
|
||||||
|
FS_operate(_instance->fs2_handle, _instance->fs2_field_id[LOG_FS2_APPEND_LOG_HANDLE], 0, FS_OP_ADD, 1);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@@ -1012,8 +1126,9 @@ struct TLD_handle_t *TLD_duplicate(struct TLD_handle_t *handle)
|
|||||||
//_handle->document->SetObject();
|
//_handle->document->SetObject();
|
||||||
|
|
||||||
_handle->document->CopyFrom(*handle->document, _handle->document->GetAllocator());
|
_handle->document->CopyFrom(*handle->document, _handle->document->GetAllocator());
|
||||||
|
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DUP_LOG_HANDLE], 0, FS_OP_ADD, 1);
|
struct tsg_log_instance_t *_instance=get_log_instance();
|
||||||
|
FS_operate(_instance->fs2_handle, _instance->fs2_field_id[LOG_FS2_DUP_LOG_HANDLE], 0, FS_OP_ADD, 1);
|
||||||
|
|
||||||
return _handle;
|
return _handle;
|
||||||
}
|
}
|
||||||
@@ -1029,7 +1144,8 @@ struct TLD_handle_t *TLD_create(int thread_id)
|
|||||||
_handle->document = new Document(_handle->valueAllocator);
|
_handle->document = new Document(_handle->valueAllocator);
|
||||||
_handle->document->SetObject();
|
_handle->document->SetObject();
|
||||||
|
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_CREATE_LOG_HANDLE], 0, FS_OP_ADD, 1);
|
struct tsg_log_instance_t *_instance=get_log_instance();
|
||||||
|
FS_operate(_instance->fs2_handle, _instance->fs2_field_id[LOG_FS2_CREATE_LOG_HANDLE], 0, FS_OP_ADD, 1);
|
||||||
|
|
||||||
return _handle;
|
return _handle;
|
||||||
}
|
}
|
||||||
@@ -1440,15 +1556,15 @@ int TLD_append_streaminfo(struct tsg_log_instance_t *instance, struct TLD_handle
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t **service2topic, int *max_service)
|
int load_log_common_field(const char *filename, id2field_t *id2field, struct topic_stat **service2topic, int *max_service)
|
||||||
{
|
{
|
||||||
int i=0;
|
int i=0,flag=0;
|
||||||
int ret=0,id=0;
|
int ret=0,id=0;
|
||||||
FILE *fp=NULL;
|
FILE *fp=NULL;
|
||||||
char line[1024]={0};
|
char line[1024]={0};
|
||||||
char field_name[64]={0};
|
char field_name[64]={0};
|
||||||
char type_name[32]={0};
|
char type_name[32]={0};
|
||||||
id2field_t *_service2topic=NULL;
|
struct topic_stat *_service2topic=NULL;
|
||||||
|
|
||||||
fp=fopen(filename, "r");
|
fp=fopen(filename, "r");
|
||||||
if(fp==NULL)
|
if(fp==NULL)
|
||||||
@@ -1481,13 +1597,14 @@ int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t
|
|||||||
id2field[id].type = tld_type[i].type;
|
id2field[id].type = tld_type[i].type;
|
||||||
id2field[id].id = id;
|
id2field[id].id = id;
|
||||||
memcpy(id2field[id].name, field_name, strlen(field_name));
|
memcpy(id2field[id].name, field_name, strlen(field_name));
|
||||||
|
flag=1;
|
||||||
break;
|
break;
|
||||||
case TLD_TYPE_TOPIC:
|
case TLD_TYPE_TOPIC:
|
||||||
if(_service2topic==NULL)
|
if(_service2topic==NULL)
|
||||||
{
|
{
|
||||||
_service2topic=(id2field_t *)calloc(1, sizeof(id2field_t)*(id+1));
|
_service2topic=(struct topic_stat *)calloc(1, sizeof(struct topic_stat)*(id+1));
|
||||||
_service2topic[id].type = TLD_TYPE_MAX;
|
_service2topic[id].type = TLD_TYPE_MAX;
|
||||||
_service2topic[id].id = id;
|
//_service2topic[id].id = id;
|
||||||
memcpy(_service2topic[id].name, field_name, strlen(field_name));
|
memcpy(_service2topic[id].name, field_name, strlen(field_name));
|
||||||
|
|
||||||
*max_service=id+1;
|
*max_service=id+1;
|
||||||
@@ -1496,26 +1613,33 @@ int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t
|
|||||||
{
|
{
|
||||||
if(*max_service<=id)
|
if(*max_service<=id)
|
||||||
{
|
{
|
||||||
_service2topic=(id2field_t *)realloc(_service2topic, sizeof(id2field_t)*(id+1));
|
_service2topic=(struct topic_stat *)realloc(_service2topic, sizeof(struct topic_stat)*(id+1));
|
||||||
memset(&_service2topic[id], 0, sizeof(id2field_t));
|
memset(&_service2topic[id], 0, sizeof(struct topic_stat));
|
||||||
_service2topic[id].type = TLD_TYPE_MAX;
|
_service2topic[id].type = TLD_TYPE_MAX;
|
||||||
_service2topic[id].id = id;
|
//_service2topic[id].id = id;
|
||||||
memcpy(_service2topic[id].name, field_name, strlen(field_name));
|
memcpy(_service2topic[id].name, field_name, strlen(field_name));
|
||||||
|
|
||||||
*max_service=id+1;
|
*max_service=id+1;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
memset(&_service2topic[id], 0, sizeof(id2field_t));
|
memset(&_service2topic[id], 0, sizeof(struct topic_stat));
|
||||||
_service2topic[id].type = TLD_TYPE_MAX;
|
_service2topic[id].type = TLD_TYPE_MAX;
|
||||||
_service2topic[id].id = id;
|
//_service2topic[id].id = id;
|
||||||
memcpy(_service2topic[id].name, field_name, strlen(field_name));
|
memcpy(_service2topic[id].name, field_name, strlen(field_name));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
flag=1;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(flag==1)
|
||||||
|
{
|
||||||
|
flag=0;
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
memset(line, 0, sizeof(line));
|
memset(line, 0, sizeof(line));
|
||||||
@@ -1526,13 +1650,13 @@ int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t
|
|||||||
|
|
||||||
if(service2topic!=NULL)
|
if(service2topic!=NULL)
|
||||||
{
|
{
|
||||||
*service2topic=_service2topic;
|
(*service2topic)=_service2topic;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
|
struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile, screen_stat_handle_t fs2_handle)
|
||||||
{
|
{
|
||||||
int i=0,ret=0;
|
int i=0,ret=0;
|
||||||
char label_buff[128]={0};
|
char label_buff[128]={0};
|
||||||
@@ -1540,20 +1664,25 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
|
|||||||
char kafka_errstr[1024]={0};
|
char kafka_errstr[1024]={0};
|
||||||
unsigned int local_ip_nr=0;
|
unsigned int local_ip_nr=0;
|
||||||
rd_kafka_conf_t *rdkafka_conf = NULL;
|
rd_kafka_conf_t *rdkafka_conf = NULL;
|
||||||
rd_kafka_topic_conf_t *topic_conf;
|
|
||||||
struct tsg_log_instance_t *_instance=NULL;
|
struct tsg_log_instance_t *_instance=NULL;
|
||||||
|
|
||||||
_instance=(struct tsg_log_instance_t *)calloc(1, sizeof(struct tsg_log_instance_t));
|
_instance=(struct tsg_log_instance_t *)calloc(1, sizeof(struct tsg_log_instance_t));
|
||||||
|
_instance->fs2_handle=fs2_handle;
|
||||||
int thread_num=get_thread_count();
|
|
||||||
_instance->drop_start=(struct timespec *)calloc(1, sizeof(struct timespec)*thread_num);
|
|
||||||
_instance->fs_status_ids=(int *)calloc(1, sizeof(int)*thread_num);
|
|
||||||
_instance->send_log_percent=(int *)calloc(1, sizeof(int)*thread_num);
|
|
||||||
|
|
||||||
for(i=0;i<thread_num; i++)
|
for(i=0; i<LOG_FS2_TYPE_MAX; i++)
|
||||||
{
|
{
|
||||||
_instance->send_log_percent[i]=100;
|
_instance->fs2_field_id[i]=FS_register(_instance->fs2_handle, FS_STYLE_FIELD, FS_CALC_SPEED, g_log_fs2_field[i].name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_instance->fs2_column_id[LOG_COLUMN_STATUS_SUCCESS]=FS_register(_instance->fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, "T_success_log");
|
||||||
|
_instance->fs2_column_id[LOG_COLUMN_STATUS_FAIL]=FS_register(_instance->fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, "T_fail_log");
|
||||||
|
_instance->fs2_column_id[LOG_COLUMN_STATUS_DROP]=FS_register(_instance->fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, "T_drop_log");
|
||||||
|
|
||||||
|
_instance->fs2_column_id[LOG_COLUMN_STATUS_SUCCESS_S]=FS_register(_instance->fs2_handle, FS_STYLE_COLUMN, FS_CALC_SPEED, "success_log/s");
|
||||||
|
_instance->fs2_column_id[LOG_COLUMN_STATUS_FAIL_S]=FS_register(_instance->fs2_handle, FS_STYLE_COLUMN, FS_CALC_SPEED, "fail_log/s");
|
||||||
|
_instance->fs2_column_id[LOG_COLUMN_STATUS_DROP_S]=FS_register(_instance->fs2_handle, FS_STYLE_COLUMN, FS_CALC_SPEED, "drop_log/s");
|
||||||
|
|
||||||
|
_instance->sum_line_id=FS_register(_instance->fs2_handle, FS_STYLE_LINE, FS_CALC_SPEED, "SUM");
|
||||||
|
|
||||||
MESA_load_profile_int_def(conffile, "TSG_LOG", "LOG_LEVEL",&(_instance->level), 30);
|
MESA_load_profile_int_def(conffile, "TSG_LOG", "LOG_LEVEL",&(_instance->level), 30);
|
||||||
MESA_load_profile_string_def(conffile, "TSG_LOG", "LOG_PATH", _instance->log_path, sizeof(_instance->log_path), "./tsglog/tsglog");
|
MESA_load_profile_string_def(conffile, "TSG_LOG", "LOG_PATH", _instance->log_path, sizeof(_instance->log_path), "./tsglog/tsglog");
|
||||||
@@ -1665,14 +1794,16 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
|
|||||||
|
|
||||||
if(_instance->service2topic!=NULL)
|
if(_instance->service2topic!=NULL)
|
||||||
{
|
{
|
||||||
_instance->topic_rkt=(rd_kafka_topic_t **)calloc(1, (_instance->max_service)*sizeof(rd_kafka_topic_t*));
|
|
||||||
|
|
||||||
for(i=0; i<_instance->max_service; i++)
|
for(i=0; i<_instance->max_service; i++)
|
||||||
{
|
{
|
||||||
if(_instance->service2topic[i].type==TLD_TYPE_MAX)
|
if(_instance->service2topic[i].type==TLD_TYPE_MAX && strlen(_instance->service2topic[i].name)>0)
|
||||||
{
|
{
|
||||||
topic_conf=rd_kafka_topic_conf_new();
|
register_topic(_instance, &( _instance->service2topic[i]));
|
||||||
_instance->topic_rkt[_instance->service2topic[i].id]=rd_kafka_topic_new(_instance->kafka_handle, _instance->service2topic[i].name, topic_conf);
|
}
|
||||||
|
|
||||||
|
if(i==1)
|
||||||
|
{
|
||||||
|
memcpy(&(_instance->service2topic[i]), &(_instance->service2topic[0]), sizeof(struct topic_stat)); // service id of security event is 0 and 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1699,20 +1830,32 @@ void tsg_sendlog_destroy(struct tsg_log_instance_t * instance)
|
|||||||
{
|
{
|
||||||
for(int i=0; i<instance->max_service; i++)
|
for(int i=0; i<instance->max_service; i++)
|
||||||
{
|
{
|
||||||
if(instance->topic_rkt[i]==NULL)
|
if(instance->service2topic[i].type!=TLD_TYPE_MAX || i==1) //i=1 equal i=0, service id of security event is 0 and 1
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(instance->service2topic[i].topic_rkt!=NULL)
|
||||||
|
{
|
||||||
|
rd_kafka_topic_destroy(instance->service2topic[i].topic_rkt);
|
||||||
|
}
|
||||||
|
|
||||||
rd_kafka_topic_destroy(instance->topic_rkt[i]);
|
if(instance->service2topic[i].drop_start!=NULL)
|
||||||
|
{
|
||||||
|
free(instance->service2topic[i].drop_start);
|
||||||
|
instance->service2topic[i].drop_start=NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(instance->service2topic[i].send_log_percent!=NULL)
|
||||||
|
{
|
||||||
|
free(instance->service2topic[i].send_log_percent);
|
||||||
|
instance->service2topic[i].send_log_percent=NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//rd_kafka_destroy_flags(instance->kafka_handle, 4);
|
//rd_kafka_destroy_flags(instance->kafka_handle, 4);
|
||||||
rd_kafka_destroy(instance->kafka_handle);
|
rd_kafka_destroy(instance->kafka_handle);
|
||||||
|
|
||||||
free(instance->topic_rkt);
|
|
||||||
instance->topic_rkt=NULL;
|
|
||||||
|
|
||||||
free(instance->service2topic);
|
free(instance->service2topic);
|
||||||
instance->service2topic=NULL;
|
instance->service2topic=NULL;
|
||||||
}
|
}
|
||||||
@@ -1721,15 +1864,6 @@ void tsg_sendlog_destroy(struct tsg_log_instance_t * instance)
|
|||||||
MESA_destroy_runtime_log_handle(instance->logger);
|
MESA_destroy_runtime_log_handle(instance->logger);
|
||||||
instance->logger=NULL;
|
instance->logger=NULL;
|
||||||
|
|
||||||
free(instance->drop_start);
|
|
||||||
instance->drop_start=NULL;
|
|
||||||
|
|
||||||
free(instance->fs_status_ids);
|
|
||||||
instance->fs_status_ids=NULL;
|
|
||||||
|
|
||||||
free(instance->send_log_percent);
|
|
||||||
instance->send_log_percent=NULL;
|
|
||||||
|
|
||||||
free(instance);
|
free(instance);
|
||||||
instance=NULL;
|
instance=NULL;
|
||||||
/*
|
/*
|
||||||
@@ -1751,10 +1885,8 @@ void tsg_sendlog_destroy(struct tsg_log_instance_t * instance)
|
|||||||
|
|
||||||
int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handle, tsg_log_t *log_msg, int thread_id)
|
int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handle, tsg_log_t *log_msg, int thread_id)
|
||||||
{
|
{
|
||||||
int fs_id=0;
|
int fs_id=0,ret=0;
|
||||||
int i=0,status=0;
|
int i=0,repeat_cnt=0;
|
||||||
int repeat_cnt=0;
|
|
||||||
struct timespec cur_time;
|
|
||||||
int policy_id[MAX_RESULT_NUM]={0};
|
int policy_id[MAX_RESULT_NUM]={0};
|
||||||
struct TLD_handle_t *_handle=handle;
|
struct TLD_handle_t *_handle=handle;
|
||||||
struct tsg_log_instance_t *_instance=instance;
|
struct tsg_log_instance_t *_instance=instance;
|
||||||
@@ -1769,7 +1901,7 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
|
|||||||
if(_instance->mode==CLOSE)
|
if(_instance->mode==CLOSE)
|
||||||
{
|
{
|
||||||
TLD_cancel(handle);
|
TLD_cancel(handle);
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DROP_LOG], 0, FS_OP_ADD, 1);
|
FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_field_id[LOG_COLUMN_STATUS_DROP], FS_OP_ADD, 1);
|
||||||
MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, "TSG_SEND_LOG", "Disable tsg_send_log.");
|
MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, "TSG_SEND_LOG", "Disable tsg_send_log.");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@@ -1807,19 +1939,18 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
clock_gettime(CLOCK_REALTIME, &cur_time);
|
ret=update_percent(_instance, log_msg->result[i].service_id, LOG_COLUMN_STATUS_DROP, thread_id);
|
||||||
if((cur_time.tv_nsec%100)>_instance->send_log_percent[thread_id])
|
if(ret==1)
|
||||||
{
|
{
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DROP_LOG], 0, FS_OP_ADD, 1);
|
|
||||||
MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO,
|
MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO,
|
||||||
"TSG_SEND_LOG",
|
"TSG_SEND_LOG",
|
||||||
"tsg drop log:cfg_id=%d service=%d send_log_percent: %d addr=%s",
|
"tsg drop log:cfg_id=%d service=%d send_log_percent: %d addr=%s",
|
||||||
log_msg->result[i].config_id,
|
log_msg->result[i].config_id,
|
||||||
log_msg->result[i].service_id,
|
log_msg->result[i].service_id,
|
||||||
_instance->send_log_percent[thread_id],
|
_instance->service2topic[log_msg->result[i].service_id].send_log_percent[thread_id],
|
||||||
(log_msg->a_stream==NULL ? "" : PRINTADDR(log_msg->a_stream,_instance->level))
|
(log_msg->a_stream==NULL ? "" : PRINTADDR(log_msg->a_stream,_instance->level))
|
||||||
);
|
);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch(log_msg->result[i].do_log)
|
switch(log_msg->result[i].do_log)
|
||||||
@@ -1834,7 +1965,7 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
|
|||||||
);
|
);
|
||||||
|
|
||||||
fs_id=action2fs_id((int)log_msg->result[i].action);
|
fs_id=action2fs_id((int)log_msg->result[i].action);
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[fs_id], 0, FS_OP_ADD, 1);
|
FS_operate(_instance->fs2_handle, _instance->fs2_field_id[fs_id], 0, FS_OP_ADD, 1);
|
||||||
continue;
|
continue;
|
||||||
break;
|
break;
|
||||||
case LOG_ALL:
|
case LOG_ALL:
|
||||||
@@ -1871,42 +2002,7 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
|
|||||||
Writer<StringBuffer> writer(sb);
|
Writer<StringBuffer> writer(sb);
|
||||||
_handle->document->Accept(writer);
|
_handle->document->Accept(writer);
|
||||||
|
|
||||||
status=rd_kafka_produce(_instance->topic_rkt[log_msg->result[i].service_id], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)sb.GetString(), sb.GetSize(), NULL, 0, NULL);
|
tsg_send_payload(_instance, log_msg->result[i].service_id, (char *)sb.GetString(), sb.GetSize(), thread_id);
|
||||||
if(status<0)
|
|
||||||
{
|
|
||||||
clock_gettime(CLOCK_REALTIME, &cur_time);
|
|
||||||
if(cur_time.tv_sec - _instance->drop_start[thread_id].tv_sec>=1)
|
|
||||||
{
|
|
||||||
_instance->send_log_percent[thread_id]/=2;
|
|
||||||
clock_gettime(CLOCK_REALTIME, &_instance->drop_start[thread_id]);
|
|
||||||
FS_operate(g_tsg_para.fs2_handle, _instance->fs_status_ids[thread_id], 0, FS_OP_SET, _instance->send_log_percent[thread_id]);
|
|
||||||
}
|
|
||||||
|
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_FAILED_LOG], 0, FS_OP_ADD, 1);
|
|
||||||
|
|
||||||
MESA_handle_runtime_log(_instance->logger,
|
|
||||||
RLOG_LV_INFO,
|
|
||||||
"TSG_SEND_LOG",
|
|
||||||
"tsg_send_log to kafka is error of %s(%s), status: %d, topic: %s payload: %s",
|
|
||||||
rd_kafka_err2name(rd_kafka_last_error()),
|
|
||||||
rd_kafka_err2str(rd_kafka_last_error()),
|
|
||||||
status,
|
|
||||||
_instance->service2topic[log_msg->result[i].service_id].name,
|
|
||||||
sb.GetString()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
MESA_handle_runtime_log(_instance->logger,
|
|
||||||
RLOG_LV_DEBUG,
|
|
||||||
"TSG_SEND_LOG",
|
|
||||||
"log send successfully %s: %s",
|
|
||||||
_instance->service2topic[log_msg->result[i].service_id].name,
|
|
||||||
sb.GetString()
|
|
||||||
);
|
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_SUCCESS_LOG], 0, FS_OP_ADD, 1);
|
|
||||||
FS_operate(g_tsg_para.fs2_handle, _instance->fs_status_ids[thread_id], 0, FS_OP_SET, _instance->send_log_percent[thread_id]);
|
|
||||||
}
|
|
||||||
|
|
||||||
TLD_delete(_handle, _instance->id2field[LOG_COMMON_POLICY_ID].name);
|
TLD_delete(_handle, _instance->id2field[LOG_COMMON_POLICY_ID].name);
|
||||||
TLD_delete(_handle, _instance->id2field[LOG_COMMON_SERVICE].name);
|
TLD_delete(_handle, _instance->id2field[LOG_COMMON_SERVICE].name);
|
||||||
@@ -1917,48 +2013,25 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
|
|||||||
|
|
||||||
TLD_cancel(handle);
|
TLD_cancel(handle);
|
||||||
|
|
||||||
if(_instance->send_log_percent[thread_id]<100)
|
|
||||||
{
|
|
||||||
clock_gettime(CLOCK_REALTIME, &cur_time);
|
|
||||||
if(cur_time.tv_sec - _instance->drop_start[thread_id].tv_sec>=_instance->recovery_interval)
|
|
||||||
{
|
|
||||||
_instance->send_log_percent[thread_id]++;
|
|
||||||
_instance->drop_start[thread_id].tv_sec=cur_time.tv_sec;
|
|
||||||
FS_operate(g_tsg_para.fs2_handle, _instance->fs_status_ids[thread_id], 0, FS_OP_SET, _instance->send_log_percent[thread_id]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsg_register_topic(struct tsg_log_instance_t *instance, char *topic_name)
|
int tsg_register_topic(struct tsg_log_instance_t *instance, char *topic_name)
|
||||||
{
|
{
|
||||||
rd_kafka_topic_conf_t *topic_conf;
|
|
||||||
struct tsg_log_instance_t *_instance=(struct tsg_log_instance_t *)instance;
|
struct tsg_log_instance_t *_instance=(struct tsg_log_instance_t *)instance;
|
||||||
if(_instance==NULL || _instance->mode==CLOSE)
|
if(_instance==NULL || _instance->mode==CLOSE || topic_name==NULL || _instance->kafka_handle!=NULL)
|
||||||
{
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(topic_name!=NULL && _instance->kafka_handle!=NULL)
|
|
||||||
{
|
|
||||||
_instance->service2topic=(id2field_t *)realloc(_instance->service2topic, (_instance->max_service+1)*sizeof(id2field_t));
|
|
||||||
_instance->service2topic[_instance->max_service].id=_instance->max_service;
|
|
||||||
_instance->service2topic[_instance->max_service].type=TLD_TYPE_MAX;
|
|
||||||
memset(_instance->service2topic[_instance->max_service].name, 0, MAX_STRING_LEN);
|
|
||||||
memcpy(_instance->service2topic[_instance->max_service].name, topic_name, MIN(MAX_STRING_LEN-1, strlen(topic_name)));
|
|
||||||
|
|
||||||
_instance->topic_rkt=(rd_kafka_topic_t **)realloc(_instance->topic_rkt, (_instance->max_service+1)*sizeof(rd_kafka_topic_t*));
|
|
||||||
topic_conf=rd_kafka_topic_conf_new();
|
|
||||||
_instance->topic_rkt[_instance->max_service]=rd_kafka_topic_new(_instance->kafka_handle, topic_name, topic_conf);
|
|
||||||
|
|
||||||
_instance->max_service++;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
{
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_instance->service2topic=(struct topic_stat *)realloc(_instance->service2topic, (_instance->max_service+1)*sizeof(struct topic_stat));
|
||||||
|
_instance->service2topic[_instance->max_service].type=TLD_TYPE_MAX;
|
||||||
|
memset(_instance->service2topic[_instance->max_service].name, 0, MAX_STRING_LEN);
|
||||||
|
memcpy(_instance->service2topic[_instance->max_service].name, topic_name, MIN(MAX_STRING_LEN-1, strlen(topic_name)));
|
||||||
|
|
||||||
|
register_topic(_instance, &(_instance->service2topic[_instance->max_service]));
|
||||||
|
_instance->max_service++;
|
||||||
|
|
||||||
return (_instance->max_service-1);
|
return (_instance->max_service-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1972,30 +2045,38 @@ int tsg_send_payload(struct tsg_log_instance_t *instance, int topic_id, char *pa
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(payload==NULL || payload_len<=0 || topic_id<0 || _instance->topic_rkt==NULL)
|
if(payload==NULL || payload_len<=0 || topic_id<0 || _instance->service2topic[topic_id].topic_rkt==NULL)
|
||||||
{
|
{
|
||||||
|
MESA_handle_runtime_log(_instance->logger,
|
||||||
|
RLOG_LV_INFO,
|
||||||
|
"TSG_SEND_LOG",
|
||||||
|
"tsg_send_log to kafka is error (payload==NULL || payload_len<=0 || topic_id<0 || _instance->service2topic[topic_id].topic_rkt==NULL), topic: %s",
|
||||||
|
_instance->service2topic[topic_id].name
|
||||||
|
);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
status=rd_kafka_produce(_instance->topic_rkt[topic_id], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, payload, payload_len, NULL, 0, NULL);
|
status=rd_kafka_produce(_instance->service2topic[topic_id].topic_rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, payload, payload_len, NULL, 0, NULL);
|
||||||
if(status<0)
|
if(status<0)
|
||||||
{
|
{
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DDOS_FAILED_LOG], 0, FS_OP_ADD, 1);
|
update_percent(_instance, topic_id, LOG_COLUMN_STATUS_FAIL, thread_id);
|
||||||
|
|
||||||
MESA_handle_runtime_log(_instance->logger,
|
MESA_handle_runtime_log(_instance->logger,
|
||||||
RLOG_LV_INFO,
|
RLOG_LV_INFO,
|
||||||
"TSG_SEND_LOG",
|
"TSG_SEND_LOG",
|
||||||
"tsg_send_log to kafka is error of %s(%s), status: %d, topic: %s",
|
"tsg_send_log to kafka is error of code: %d %s(%s), status: %d, topic: %s %s",
|
||||||
|
rd_kafka_last_error(),
|
||||||
rd_kafka_err2name(rd_kafka_last_error()),
|
rd_kafka_err2name(rd_kafka_last_error()),
|
||||||
rd_kafka_err2str(rd_kafka_last_error()),
|
rd_kafka_err2str(rd_kafka_last_error()),
|
||||||
status,
|
status,
|
||||||
_instance->service2topic[topic_id].name
|
_instance->service2topic[topic_id].name,
|
||||||
|
payload
|
||||||
);
|
);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
update_percent(_instance, topic_id, LOG_COLUMN_STATUS_SUCCESS, thread_id);
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_DDOS_SUCCESS_LOG], 0, FS_OP_ADD, 1);
|
update_percent(_instance, topic_id, LOG_COLUMN_STATUS_MAX, thread_id);
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -128,6 +128,33 @@ typedef enum _tsg_log_field_id
|
|||||||
LOG_COMMON_MAX
|
LOG_COMMON_MAX
|
||||||
}tsg_log_field_id_t;
|
}tsg_log_field_id_t;
|
||||||
|
|
||||||
|
enum LOG_COLUMN_STATUS
|
||||||
|
{
|
||||||
|
LOG_COLUMN_STATUS_SUCCESS=0,
|
||||||
|
LOG_COLUMN_STATUS_SUCCESS_S,
|
||||||
|
LOG_COLUMN_STATUS_FAIL,
|
||||||
|
LOG_COLUMN_STATUS_FAIL_S,
|
||||||
|
LOG_COLUMN_STATUS_DROP,
|
||||||
|
LOG_COLUMN_STATUS_DROP_S,
|
||||||
|
LOG_COLUMN_STATUS_MAX
|
||||||
|
};
|
||||||
|
|
||||||
|
enum LOG_FS2_TYPE{
|
||||||
|
LOG_FS2_ABORT_ALLOW,
|
||||||
|
LOG_FS2_ABORT_DENY,
|
||||||
|
LOG_FS2_ABORT_MONITOR,
|
||||||
|
LOG_FS2_ABORT_INTERCEPT,
|
||||||
|
LOG_FS2_ABORT_UNKNOWN,
|
||||||
|
LOG_FS2_CREATE_LOG_HANDLE,
|
||||||
|
LOG_FS2_DUP_LOG_HANDLE,
|
||||||
|
LOG_FS2_APPEND_LOG_HANDLE,
|
||||||
|
LOG_FS2_FREE_LOG_HANDLE,
|
||||||
|
LOG_FS2_FREE_RAPID_SIZE,
|
||||||
|
LOG_FS2_FREE_RAPID_CAPACITY,
|
||||||
|
LOG_FS2_TYPE_MAX
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
typedef struct _id2field
|
typedef struct _id2field
|
||||||
{
|
{
|
||||||
int type;
|
int type;
|
||||||
@@ -135,6 +162,17 @@ typedef struct _id2field
|
|||||||
char name[MAX_STRING_LEN];
|
char name[MAX_STRING_LEN];
|
||||||
}id2field_t;
|
}id2field_t;
|
||||||
|
|
||||||
|
struct topic_stat
|
||||||
|
{
|
||||||
|
int status;
|
||||||
|
int type;
|
||||||
|
int fs2_line_id;
|
||||||
|
int *send_log_percent;
|
||||||
|
char name[MAX_STRING_LEN];
|
||||||
|
struct timespec *drop_start;
|
||||||
|
rd_kafka_topic_t *topic_rkt;
|
||||||
|
};
|
||||||
|
|
||||||
struct tsg_log_instance_t
|
struct tsg_log_instance_t
|
||||||
{
|
{
|
||||||
int mode;
|
int mode;
|
||||||
@@ -151,9 +189,9 @@ struct tsg_log_instance_t
|
|||||||
int mac_linkinfo_project_id;
|
int mac_linkinfo_project_id;
|
||||||
int nat_c2s_linkinfo_project_id;
|
int nat_c2s_linkinfo_project_id;
|
||||||
int nat_s2c_linkinfo_project_id;
|
int nat_s2c_linkinfo_project_id;
|
||||||
int *send_log_percent;
|
int sum_line_id;
|
||||||
int *fs_status_ids;
|
int fs2_column_id[LOG_COLUMN_STATUS_MAX];
|
||||||
struct timespec *drop_start;
|
int fs2_field_id[LOG_FS2_TYPE_MAX];
|
||||||
char l7_unknown_name[MAX_STRING_LEN];
|
char l7_unknown_name[MAX_STRING_LEN];
|
||||||
char log_path[MAX_STRING_LEN*2];
|
char log_path[MAX_STRING_LEN*2];
|
||||||
char tcp_label[MAX_STRING_LEN];
|
char tcp_label[MAX_STRING_LEN];
|
||||||
@@ -168,14 +206,14 @@ struct tsg_log_instance_t
|
|||||||
char local_ip_str[MAX_IPV4_LEN];
|
char local_ip_str[MAX_IPV4_LEN];
|
||||||
char l7_proto_id_file[MAX_STRING_LEN*4];
|
char l7_proto_id_file[MAX_STRING_LEN*4];
|
||||||
id2field_t id2field[LOG_COMMON_MAX];
|
id2field_t id2field[LOG_COMMON_MAX];
|
||||||
rd_kafka_topic_t **topic_rkt;
|
|
||||||
rd_kafka_t *kafka_handle;
|
rd_kafka_t *kafka_handle;
|
||||||
id2field_t *service2topic;
|
struct topic_stat *service2topic;
|
||||||
|
screen_stat_handle_t fs2_handle;
|
||||||
void *logger;
|
void *logger;
|
||||||
};
|
};
|
||||||
|
|
||||||
char *log_field_id2name(struct tsg_log_instance_t *instance, tsg_log_field_id_t id);
|
char *log_field_id2name(struct tsg_log_instance_t *instance, tsg_log_field_id_t id);
|
||||||
struct tsg_log_instance_t *tsg_sendlog_init(const char *filename);
|
struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile, screen_stat_handle_t fs2_handle);
|
||||||
void tsg_sendlog_destroy(struct tsg_log_instance_t * instance);
|
void tsg_sendlog_destroy(struct tsg_log_instance_t * instance);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@@ -32,7 +32,7 @@
|
|||||||
#define IPV6_UDP_PALYLOAD_START_INDEX 48 //ipv6_len(40) + udp_len(8)
|
#define IPV6_UDP_PALYLOAD_START_INDEX 48 //ipv6_len(40) + udp_len(8)
|
||||||
#define IPV6_IP_PAYLOAD_INDEX 4 //ipv6_payload_index(4)
|
#define IPV6_IP_PAYLOAD_INDEX 4 //ipv6_payload_index(4)
|
||||||
|
|
||||||
int swop_payload2byte(char *str, int endlen)
|
int swap_payload2byte(char *str, int endlen)
|
||||||
{
|
{
|
||||||
int i = 0;
|
int i = 0;
|
||||||
int j = 0;
|
int j = 0;
|
||||||
@@ -88,7 +88,7 @@ int send_tamper_xxx(const struct streaminfo *a_stream, long *tamper_count, const
|
|||||||
}
|
}
|
||||||
|
|
||||||
memcpy(tamper_buf, p_trans_payload, trans_layload_len);
|
memcpy(tamper_buf, p_trans_payload, trans_layload_len);
|
||||||
tamper_index = swop_payload2byte(tamper_buf, trans_layload_len);
|
tamper_index = swap_payload2byte(tamper_buf, trans_layload_len);
|
||||||
if(tamper_index > 0 ){
|
if(tamper_index > 0 ){
|
||||||
if(0 == tsg_send_inject_packet(a_stream, SIO_DEFAULT, tamper_buf, trans_layload_len, a_stream->routedir)){
|
if(0 == tsg_send_inject_packet(a_stream, SIO_DEFAULT, tamper_buf, trans_layload_len, a_stream->routedir)){
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_SUCESS_TAMPER], 0, FS_OP_ADD, 1);
|
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_SUCESS_TAMPER], 0, FS_OP_ADD, 1);
|
||||||
@@ -108,7 +108,7 @@ int send_tamper_xxx(const struct streaminfo *a_stream, long *tamper_count, const
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_TAMPER_FAILED_NOSWOP], 0, FS_OP_ADD, 1);
|
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_TAMPER_FAILED_NOSWAP], 0, FS_OP_ADD, 1);
|
||||||
MESA_handle_runtime_log(g_tsg_para.logger,
|
MESA_handle_runtime_log(g_tsg_para.logger,
|
||||||
RLOG_LV_DEBUG,
|
RLOG_LV_DEBUG,
|
||||||
__FUNCTION__,
|
__FUNCTION__,
|
||||||
|
|||||||
Reference in New Issue
Block a user