From 8913a7d22eacccc639fe6ae2fd21f21b269b9b18 Mon Sep 17 00:00:00 2001 From: liuxueli Date: Fri, 15 Nov 2019 19:29:54 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=84=E5=88=99=E6=8E=92=E5=BA=8F=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=20=E7=BB=9F=E4=B8=80=E5=8F=91=E9=80=81=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- inc/tsg_entry.h | 45 ++++ inc/tsg_log_id.h | 41 ---- inc/tsg_send_log.h | 4 +- inc/tsg_types.h | 127 ++++------- src/tsg_entry.cpp | 279 ++++++++++++++++++++++- src/tsg_rule.cpp | 245 +++++++++++++++++++- src/tsg_send_log.cpp | 437 ++++++++++++++++++++++++++++++++---- src/tsg_send_log_internal.h | 56 ++++- 8 files changed, 1050 insertions(+), 184 deletions(-) diff --git a/inc/tsg_entry.h b/inc/tsg_entry.h index 8b13789..7d56227 100644 --- a/inc/tsg_entry.h +++ b/inc/tsg_entry.h @@ -1 +1,46 @@ +#ifndef __TSG_ENTRY_H__ +#define __TSG_ENTRY_H__ +#include +#include "tsg_types.h" + +#define FW_ACTION_UNKNOWN 0x00 +#define FW_ACTION_DROP 0x01 +#define FW_ACTION_RESET 0x02 +#define FW_ACTION_BLOCK 0x04 + +#define TSG_ACTION_NONE 0x00 +#define TSG_ACTION_MONITOR 0x01 +#define TSG_ACTION_INTERCEPT 0x02 +#define TSG_ACTION_DENY 0x10 +#define TSG_ACTION_MANIPULATE 0x30 +#define TSG_ACTION_BYPASS 0x80 + + +#define MAX_RESULT_NUM 8 +#define MAX_DOAMIN_LEN 2048 + +typedef struct _policy_priority_label +{ + tsg_protocol_t proto; //enum _tsg_protocol (tsg_types.h) + int domain_len; + int result_type; //enum _PULL_RESULT_TYPE (tsg_rule.h) + int result_num; + char domain[MAX_DOAMIN_LEN]; + Maat_rule_t result[MAX_RESULT_NUM]; +}policy_priority_label_t; + + +typedef struct _tsg_para +{ + int ip_addr_table_id; + int subscribe_id_table_id; + int priority_project_id; + void *logger; +}g_tsg_para_t; + + + +extern g_tsg_para_t g_tsg_para; + +#endif \ No newline at end of file diff --git a/inc/tsg_log_id.h b/inc/tsg_log_id.h index 0f5f2f5..177ebf8 100644 --- a/inc/tsg_log_id.h +++ b/inc/tsg_log_id.h @@ -85,47 +85,6 @@ #define CABOT_LOG_OPT_DNS_RR 78 -#define CABOT_LOG_OPT_SSL_VERSION 81 -#define CABOT_LOG_OPT_SSL_SNI 82 -#define CABOT_LOG_OPT_SSL_SAN 83 -#define CABOT_LOG_OPT_SSL_CN 84 -#define CABOT_LOG_OPT_FTP_URL 88 -#define CABOT_LOG_OPT_FTP_CONTENT 89 - -#define CABOT_LOG_OPT_VOIP_CALLING_ACCOUNT 90 -#define CABOT_LOG_OPT_VOIP_CALLED_ACCOUNT 91 -#define CABOT_LOG_OPT_VOIP_CALLING_NUMBER 92 -#define CABOT_LOG_OPT_VOIP_CALLED_NUMBER 93 - -#define CABOT_LOG_OPT_BGP_PACKET_TYPE 100 -#define CABOT_LOG_OPT_BGP_AS_NUM 101 -#define CABOT_LOG_OPT_BGP_ROUTE 102 - -#define CABOT_LOG_OPT_RADIUS_PACKET_TYPE 112 -#define CABOT_LOG_OPT_RADIUS_ACCOUNT 113 -#define CABOT_LOG_OPT_RADIUS_CALLBACK_NUMBER 114 -#define CABOT_LOG_OPT_RADIUS_CALLBACK_ID 115 -#define CABOT_LOG_OPT_RADIUS_CALLED_STATION_ID 116 -#define CABOT_LOG_OPT_RADIUS_CALLING_STATION_ID 117 -#define CABOT_LOG_OPT_RADIUS_ACCT_SESSION_ID 118 -#define CABOT_LOG_OPT_RADIUS_ACCT_MULTI_SESSION_ID 119 -#define CABOT_LOG_OPT_RADIUS_NAS_IP_ADDRESS 120 -#define CABOT_LOG_OPT_RADIUS_FRAMED_IP_ADDRESS 121 -#define CABOT_LOG_OPT_RADIUS_FRAMED_IP_NETMASK 122 -#define CABOT_LOG_OPT_RADIUS_SERVICE_TYPE 123 -#define CABOT_LOG_OPT_RADIUS_FRAMED_MTU 124 -#define CABOT_LOG_OPT_RADIUS_SESSION_TIMEOUT 125 -#define CABOT_LOG_OPT_RADIUS_IDLE_TIMEOUT 126 -#define CABOT_LOG_OPT_RADIUS_TERMINATION_CATION 127 -#define CABOT_LOG_OPT_RADIUS_PROXY_STATE 128 -#define CABOT_LOG_OPT_RADIUS_ACCT_STATUS_TYPE 129 -#define CABOT_LOG_OPT_RADIUS_ACCT_INPUT_OCTETS 130 -#define CABOT_LOG_OPT_RADIUS_ACCT_INPUT_PACKETS 131 -#define CABOT_LOG_OPT_RADIUS_ACCT_OUTPUT_OCTETS 132 -#define CABOT_LOG_OPT_RADIUS_ACCT_OUTPUT_PACKETS 133 -#define CABOT_LOG_OPT_RADIUS_ACCT_TERMINATE_CAUSE 134 -#define CABOT_LOG_OPT_RADIUS_ACCT_LINK_COUNT 135 -#define CABOT_LOG_OPT_RADIUS_ACCT_INTERIM_INTERVAL 136 #endif diff --git a/inc/tsg_send_log.h b/inc/tsg_send_log.h index 6758046..cb0f5d7 100644 --- a/inc/tsg_send_log.h +++ b/inc/tsg_send_log.h @@ -17,9 +17,11 @@ typedef struct _tsg_log typedef enum _tld_type { + TLD_TYPE_UNKNOWN=0, TLD_TYPE_LONG=1, TLD_TYPE_STRING, - TLD_TYPE_FILE + TLD_TYPE_FILE, + TLD_TYPE_MAX }TLD_TYPE; diff --git a/inc/tsg_types.h b/inc/tsg_types.h index 0967ef3..da32e5f 100644 --- a/inc/tsg_types.h +++ b/inc/tsg_types.h @@ -1,97 +1,52 @@ #ifndef __TSG_TYPES_H__ #define __TSG_TYPES_H__ -typedef enum _tsg_opt +typedef enum _tsg_log_field_id { - LOG_OPT_HTTP_URL=1, - LOG_OPT_HTTP_HOST, - LOG_OPT_HTTP_REQUEST_LINE, - LOG_OPT_HTTP_RESPONSE_LINE, - LOG_OPT_HTTP_REQUEST_HEADER, - LOG_OPT_HTTP_RESPONSE_HEADER, - LOG_OPT_HTTP_REQUEST_BODY, - LOG_OPT_HTTP_RESPONSE_BODY, - LOG_OPT_HTTP_PROXY_FLAG, - LOG_OPT_HTTP_SEQUENCE, - LOG_OPT_HTTP_SNAPSHOT, - LOG_OPT_HTTP_COOKIE, - LOG_OPT_HTTP_REFERER, - LOG_OPT_HTTP_USER_AGENT, - LOG_OPT_HTTP_CONTENT_LENGTH, - LOG_OPT_HTTP_CONTENT_TYPE, - LOG_OPT_HTTP_SET_COOKIE, - LOG_OPT_HTTP_VERSION, + LOG_COMMON_SERVICE, + LOG_COMMON_DIRECTION, + LOG_COMMON_L4_PROTOCOL, + LOG_COMMON_ADDRESS_TYPE, + LOG_COMMON_SCHAME_TYPE, + LOG_COMMON_POLICY_ID, + LOG_COMMON_USER_TAG, + LOG_COMMON_ACTION, + LOG_COMMON_USER_REGION, + LOG_COMMON_CLIENT_IP, + LOG_COMMON_CLIENT_PORT, + LOG_COMMON_ENTRANCE_ID, + LOG_COMMON_DEVICE_ID, + LOG_COMMON_LINK_ID, + LOG_COMMON_ISP, + LOG_COMMON_ENCAPSULATION, + LOG_COMMON_SLED_IP, + LOG_COMMON_SERVER_IP, + LOG_COMMON_SERVER_PORT, + LOG_COMMON_APP_LABEL, + LOG_COMMON_APP_ID, + LOG_COMMON_PROTOCOL_ID, + LOG_COMMON_C2S_PKT_NUM, + LOG_COMMON_S2C_PKT_NUM, + LOG_COMMON_C2S_BYTE_NUM, + LOG_COMMON_S2C_BYTE_NUM, + LOG_COMMON_START_TIME, + LOG_COMMON_END_TIME, + LOG_COMMON_CON_DURATION_MS, + LOG_COMMON_STREAM_DIR, + LOG_COMMON_ADDRESS_LIST, + LOG_COMMON_HAS_DUP_TRAFFIC, + LOG_COMMON_STREAM_ERROR, + LOG_COMMON_STREAM_TRACE_ID, + LOG_HTTP_HOST, + LOG_SSL_SNI, + LOG_COMMON_MAX +}tsg_log_field_id_t; - LOG_OPT_MAIL_PROTOCOL_TYPE, - LOG_OPT_MAIL_SENDER, - LOG_OPT_MAIL_RECEIVER, - LOG_OPT_MAIL_SUBJECT, - LOG_OPT_MAIL_CONTENT, - LOG_OPT_MAIL_ATTACHMENT_NAME, - LOG_OPT_MAIL_ATTACHMENT_CONTENT, - LOG_OPT_MAIL_EML_FILE, - LOG_OPT_MAIL_SNAPSHOT, - LOG_OPT_MAIL_SUBJECT_CHARSET, - - LOG_OPT_DNS_MESSAGE_ID, - LOG_OPT_DNS_QR, - LOG_OPT_DNS_OPCODE, - LOG_OPT_DNS_AA, - LOG_OPT_DNS_TC, - LOG_OPT_DNS_RD, - LOG_OPT_DNS_RA, - LOG_OPT_DNS_RCODE, - LOG_OPT_DNS_QDCOUNT, - LOG_OPT_DNS_ANCOUNT, - LOG_OPT_DNS_NSCOUNT, - LOG_OPT_DNS_ARCOUNT, - LOG_OPT_DNS_QNAME, - LOG_OPT_DNS_QTYPE, - LOG_OPT_DNS_QCLASS, - LOG_OPT_DNS_CNAME, - LOG_OPT_DNS_SUB, - LOG_OPT_DNS_RR, - - LOG_OPT_SSL_VERSION, - LOG_OPT_SSL_SNI, - LOG_OPT_SSL_SAN, - LOG_OPT_SSL_CN, - LOG_OPT_SSL_PINNINGST, - LOG_OPT_SSL_INTERCEPT_STATE, - LOG_OPT_SSL_SERVER_SIDE_LATENCY, - LOG_OPT_SSL_CLINET_SIDE_LATENCY, - LOG_OPT_SSL_SERVER_SIDE_VERSION, - LOG_OPT_SSL_CLIENT_SIDE_VERSION, - LOG_OPT_SSL_CERT_VERIFY, - LOG_OPT_SSL_ERROR, - LOG_OPT_SSL_CON_LATENCY_MS, - - LOG_OPT_FTP_URL, - LOG_OPT_FTP_CONTENT, - - LOG_OPT_BGP_TYPE, - LOG_OPT_BGP_AS_NUM, - LOG_OPT_BGP_ROUTE, - - LOG_OPT_VOIP_CALLING_ACCOUNT, - LOG_OPT_VOIP_CALLED_ACCOUNT, - LOG_OPT_VOIP_CALLING_NUMBER, - LOG_OPT_VOIP_CALLED_NUMBER, - - LOG_OPT_RADIUS_PACKET_TYPE, - LOG_OPT_RADIUS_NAS_IP, - LOG_OPT_RADIUS_FRAMED_IP, - LOG_OPT_RADIUS_ACCOUNT, - LOG_OPT_RADIUS_SEESION_TIMEOUT, - LOG_OPT_RADIUS_IDLE_TIMEOUT, - LOG_OPT_RADIUS_ACCT_STATUS_TYPE, - LOG_OPT_RADIUS_ACCT_TERMINATE_CAUSE, - LOG_OPT_MAX -}tsg_opt_t; typedef enum _tsg_protocol { - PROTO_IPv4, + PROTO_UNKONWN=0, + PROTO_IPv4=1, PROTO_IPv6, PROTO_TCP, PROTO_UDP, diff --git a/src/tsg_entry.cpp b/src/tsg_entry.cpp index 8b69083..ec658ab 100644 --- a/src/tsg_entry.cpp +++ b/src/tsg_entry.cpp @@ -1,6 +1,283 @@ #include #include #include +#include -char TSG_MASTER_VERSION_20191112=0; +#include +#include +#include + +#include "tsg_rule.h" +#include "tsg_entry.h" +#include "tsg_send_log.h" +#include "tsg_send_log_internal.h" + + + +char TSG_MASTER_VERSION_20191115=0; +const char *tsg_conffile="tsgconf/main.conf"; +g_tsg_para_t g_tsg_para; + +static void free_policy_label(int thread_seq, void *project_req_value) +{ + dictator_free(thread_seq, project_req_value); + project_req_value=NULL; +} + +#if 0 +static int is_ip_policy(Maat_rule_t *p_result, char *protocol, int len, int thread_seq) +{ + int ret=0; + cJSON *item=NULL; + char *service_defined=NULL; + cJSON *user_define_object=NULL; + + if(p_result->serv_def_len>MAX_SERVICE_DEFINE_LEN) + { + service_defined=dictator_malloc(thread_seq, p_result->serv_def_len+1); + ret=Maat_read_rule(g_tsg_maat_feather, p_result, MAAT_RULE_SERV_DEFINE, service_defined, p_result->serv_def_len+1); + assert(ret==p_result->serv_def_len+1); + + user_define_object=cJSON_Parse(service_defined); + } + else + { + user_define_object=cJSON_Parse(p_result->service_defined); + } + + if(user_define_object!=NULL) + { + item=cJSON_GetObjectItem(user_define_object, "protocol"); + if(item!=NULL && item->valuestring!=NULL) + { + memcpy(protocol, item->valuestring, (len>strlen(item->valuestring)) ? strlen(item->valuestring): len); + } + + item=cJSON_GetObjectItem(user_define_object, "method"); + if((item==NULL) || ((strncasecmp(item->valuestring, "http", strlen(item->valuestring)))!=0 && (strncasecmp(item->valuestring, "ssl", strlen(item->valuestring)))!=0)) + { + ret=1; + } + + cJSON_Delete(user_define_object); + user_define_object=NULL; + } + + if(service_defined!=NULL) + { + dictator_free(thread_seq, service_defined); + service_defined=NULL; + } + + return ret; +} +#endif +static Maat_rule_t *tsg_policy_decision_criteria(Maat_rule_t *result, int result_num) +{ + int i=0; + Maat_rule_t *p_result=NULL; + if(result==NULL || result_num<=0) + { + return NULL; + } + + p_result=&result[0]; + + for(i=1; ip_result->action) + { + p_result=&result[i]; + continue; + } + + if(result[i].action==p_result->action) + { + if(result[i].config_idconfig_id) + { + p_result=&result[i]; + } + } + } + + return p_result; +} + +extern "C" char TSG_MASTER_TCPALL_ENTRY(struct streaminfo *a_tcp, void **pme, int thread_seq,void *a_packet) +{ + int send_log=0; + int ret=0,hit_num=0,ip_policy=0; + int state=APP_STATE_DROPME; + scan_status_t mid=NULL; + char *domain_field_name=NULL; + char *schema_field_name=NULL; + Maat_rule_t *p_result=NULL; + Maat_rule_t *q_result=NULL; + tsg_log_t log_msg; + TLD_handle_t TLD_handle=NULL; + struct _identify_info identify_info; + Maat_rule_t all_result[MAX_RESULT_NUM]; + policy_priority_label_t *priority_label=NULL; + + switch(a_tcp->pktstate) + { + case OP_STATE_PENDING: + if((a_tcp->ptcpdetail->pdata==NULL) || (a_tcp->ptcpdetail->datalen<=0) || (a_tcp->dir==DIR_DOUBLE && a_tcp->curdir==DIR_S2C)) + { + return APP_STATE_GIVEME; + } + + ret=tsg_scan_nesting_addr(g_tsg_maat_feather, a_tcp, PROTO_MAX, &mid, all_result, MAX_RESULT_NUM-hit_num); + if(ret>0) + { + hit_num+=ret; + q_result=tsg_policy_decision_criteria(all_result, hit_num); + } + + + if(a_tcp->curdir==DIR_C2S) + { + memset(&identify_info, 0, sizeof(identify_info)); + + ret=tsg_scan_shared_policy(g_tsg_maat_feather, + a_tcp->ptcpdetail->pdata, + a_tcp->ptcpdetail->datalen, + all_result, + MAX_RESULT_NUM-hit_num, + &identify_info, + &mid, + g_tsg_para.logger, + thread_seq); + if(ret>0) + { + hit_num+=ret; + } + } + + p_result=tsg_policy_decision_criteria(all_result, hit_num); + + if(p_result!=NULL) + { + if(q_result!=NULL && (p_result==q_result)) + { + ip_policy=1; + send_log=1; + } + + switch(p_result->action) + { + case TSG_ACTION_DENY: + if(ip_policy==1) + { + MESA_kill_tcp(a_tcp, a_packet); + state|=APP_STATE_DROPPKT; + } + break; + case TSG_ACTION_MONITOR: + break; + case TSG_ACTION_BYPASS: + send_log=1; + state|=APP_STATE_DROPPKT; //TODO + break; + case TSG_ACTION_INTERCEPT: + priority_label=(policy_priority_label_t *)dictator_malloc(thread_seq, sizeof(policy_priority_label_t)); + + priority_label->result_num=1; + priority_label->domain_len=identify_info.domain_len; + memcpy(priority_label->domain, identify_info.domain, identify_info.domain_len); + memcpy(priority_label->result, p_result, sizeof(struct Maat_rule_t)); + + ret=project_req_add_struct(a_tcp, g_tsg_para.priority_project_id, (void *)priority_label); + if(ret<0) + { + free_policy_label(thread_seq, (void *)priority_label); + MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_FATAL, "PROJECT_ADD", "Add policy_priority_label failed ..."); + } + break; + case TSG_ACTION_NONE: + default: + assert(0); + break; + } + + if(send_log==1 && p_result->do_log>0) + { + TLD_handle=TLD_create(thread_seq); + if(priority_label->proto!=PROTO_UNKONWN) + { + schema_field_name=log_field_id2name(g_tsg_log_instance, LOG_COMMON_SCHAME_TYPE); + TLD_append(TLD_handle, schema_field_name, (void *)((priority_label->proto==PROTO_HTTP) ? "HTTP" : "SSL"), TLD_TYPE_LONG); + + domain_field_name=log_field_id2name(g_tsg_log_instance, ((priority_label->proto==PROTO_HTTP) ? LOG_HTTP_HOST : LOG_SSL_SNI)); + TLD_append(TLD_handle, domain_field_name, (void *)priority_label->domain, TLD_TYPE_STRING); + } + + log_msg.a_stream=a_tcp; + log_msg.result=p_result; + log_msg.result_num=1; + tsg_send_log(g_tsg_log_instance, TLD_handle, &log_msg, thread_seq); + } + } + break; + case OP_STATE_DATA: + case OP_STATE_CLOSE: + default: + break; + } + + return state; +} + + + +extern "C" int TSG_MASTER_INIT() +{ + int ret=0,level=30; + char log_path[128]={0}; + char label_buff[128]={0}; + + memset(&g_tsg_para, 0, sizeof(g_tsg_para)); + + MESA_load_profile_int_def(tsg_conffile, "SYSTEM","LOG_LEVEL", &level, 30); + MESA_load_profile_string_def(tsg_conffile, "SYSTEM","LOG_PATH", log_path, sizeof(log_path), NULL); + + g_tsg_para.logger=MESA_create_runtime_log_handle(log_path, level); + if(g_tsg_para.logger==NULL) + { + printf("MESA_create_runtime_log_handle failed ...\n"); + return -1; + } + + + MESA_load_profile_string_def(tsg_conffile, "SYSTEM", "POLICY_PRIORITY_LABEL", label_buff, sizeof(label_buff), "POLICY_PRIORITY"); + g_tsg_para.priority_project_id=project_producer_register(label_buff, PROJECT_VAL_TYPE_STRUCT, free_policy_label); + if(g_tsg_para.priority_project_id<0) + { + MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_FATAL, "PROJECT_REGISTER", "Register %s failed ...", label_buff); + return -1; + } + + ret=tsg_rule_init(tsg_conffile, g_tsg_para.logger); + if(ret<0) + { + MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_FATAL, "INIT_MAAT", "tsg_rule_init failed ..."); + return -1; + } + + g_tsg_log_instance=tsg_sendlog_init(tsg_conffile); + if(g_tsg_log_instance==NULL) + { + MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_FATAL, "INIT_SENDLOG", "tsg_sendlog_init failed ..."); + return -1; + } + + return 0; +} + + + +extern "C" int T2_MASTER_UNLOAD() +{ + return 0; +} diff --git a/src/tsg_rule.cpp b/src/tsg_rule.cpp index 639285e..7a1c296 100644 --- a/src/tsg_rule.cpp +++ b/src/tsg_rule.cpp @@ -1,18 +1,257 @@ #include #include #include +#include +#include + +#include +#include +#include #include "tsg_rule.h" +#include "tsg_entry.h" -extern Maat_feather_t g_tsg_maat_feather; +Maat_feather_t g_tsg_maat_feather; -int tsg_pull_policy_result(PULL_RESULT_TYPE pull_result_type, Maat_rule_t*result, int result_num) +#define MAX_PATH_LEN 1024 + +int tsg_rule_init(const char* conffile, void *logger) { + unsigned short redis_port = 0; + int ret=0,scan_detail=0,effect_interval=60; + const char* instance_name=NULL,*module="MAAT"; + int factor=0, redis_port_num=0,redis_index=0; + char redis_ip[16]={0}, effective_flag[1024]={0}; + int maat_mode=0,maat_stat_on=0,maat_perf_on=0,thread_max=0; + char ip_addr_table[32]={0},subscriber_id_table[32]={0}; + char json_cfg_file[MAX_PATH_LEN]={0},maat_stat_file[MAX_PATH_LEN]={0}; + char table_info[MAX_PATH_LEN]={0},inc_cfg_dir[MAX_PATH_LEN]={0},ful_cfg_dir[MAX_PATH_LEN]={0}; + + memset(effective_flag, 0, sizeof(effective_flag)); + MESA_load_profile_string_def(conffile,module,"EFFECTIVE_FLAG",effective_flag, sizeof(effective_flag),""); + + MESA_load_profile_int_def(conffile, module,"MAAT_MODE", &(maat_mode),0); + MESA_load_profile_int_def(conffile, module,"STAT_SWITCH", &(maat_stat_on),1); + MESA_load_profile_int_def(conffile, module,"PERF_SWITCH", &(maat_perf_on),1); + + MESA_load_profile_string_def(conffile,module,"TABLE_INFO",table_info, sizeof(table_info), ""); + MESA_load_profile_string_def(conffile,module,"STAT_FILE",maat_stat_file, sizeof(maat_stat_file), ""); + MESA_load_profile_int_def(conffile, module,"EFFECT_INTERVAL_S", &(effect_interval), 60); + effect_interval*=1000;//convert s to ms + + thread_max=get_thread_count(); + g_tsg_maat_feather=Maat_feather(thread_max, table_info, logger); + if(maat_mode==2) + { + MESA_load_profile_string_def(conffile,module,"REDIS_IP", redis_ip, sizeof(redis_ip),""); + MESA_load_profile_int_def(conffile, module,"REDIS_PORT_NUM", &(redis_port_num), 1); + MESA_load_profile_short_def(conffile, module,"REDIS_PORT", (short*)&(redis_port), 6379); + MESA_load_profile_int_def(conffile, module,"REDIS_INDEX", &redis_index, 0); + + if(strlen(effective_flag)!=0) + { + Maat_set_feather_opt(g_tsg_maat_feather,MAAT_OPT_ACCEPT_TAGS,effective_flag, strlen(effective_flag)+1); + } + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_EFFECT_INVERVAL_MS, &effect_interval, sizeof(effect_interval)); + + srand((unsigned int)time(NULL)); + factor = rand()%redis_port_num; + redis_port = redis_port+factor; + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_REDIS_IP, redis_ip, strlen(redis_ip)+1); + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_REDIS_PORT, (void *)&redis_port, sizeof(redis_port)); + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_STAT_FILE_PATH, maat_stat_file, strlen(maat_stat_file)+1); + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_STAT_ON, NULL, 0); + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_PERF_ON, NULL, 0); + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_REDIS_INDEX, &redis_index, sizeof(redis_index)); + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_SCAN_DETAIL, &scan_detail, sizeof(scan_detail)); + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_DEFERRED_LOAD, NULL,0); + } + else + { + if(strlen(effective_flag)!=0) + { + ret=Maat_set_feather_opt(g_tsg_maat_feather,MAAT_OPT_ACCEPT_TAGS,effective_flag, strlen(effective_flag)+1); + assert(ret>=0); + } + Maat_set_feather_opt(g_tsg_maat_feather,MAAT_OPT_INSTANCE_NAME,instance_name, strlen(instance_name)+1); + if(maat_mode==1) + { + MESA_load_profile_string_def(conffile,module,"JSON_CFG_FILE",json_cfg_file, sizeof(json_cfg_file),""); + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_JSON_FILE_PATH, json_cfg_file, strlen(json_cfg_file)+1); + } + else + { + MESA_load_profile_string_def(conffile,module,"INC_CFG_DIR",inc_cfg_dir, sizeof(inc_cfg_dir),""); + MESA_load_profile_string_def(conffile,module,"FULL_CFG_DIR",ful_cfg_dir, sizeof(ful_cfg_dir),""); + assert(strlen(inc_cfg_dir)!=0&&strlen(ful_cfg_dir)!=0); + + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_FULL_CFG_DIR, ful_cfg_dir, strlen(ful_cfg_dir)+1); + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_INC_CFG_DIR, inc_cfg_dir, strlen(inc_cfg_dir)+1); + } + if(maat_stat_on) + { + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_STAT_FILE_PATH, maat_stat_file, strlen(maat_stat_file)+1); + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_STAT_ON, NULL, 0); + if(maat_perf_on) + { + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_PERF_ON, NULL, 0); + } + } + + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_EFFECT_INVERVAL_MS, &effect_interval, sizeof(effect_interval)); + Maat_set_feather_opt(g_tsg_maat_feather, MAAT_OPT_SCAN_DETAIL, &scan_detail, sizeof(scan_detail)); + } + + ret=Maat_initiate_feather(g_tsg_maat_feather); + if(ret<0) + { + return -1; + } + + MESA_load_profile_string_def(conffile, module, "IP_ADDR_TABLE", ip_addr_table, sizeof(ip_addr_table), "TSG_OBJ_IP_ADDR"); + MESA_load_profile_string_def(conffile, module, "SUBSCRIBER_ID_TABLE", subscriber_id_table, sizeof(subscriber_id_table), "TSG_OBJ_SUBSCRIBER_ID"); + + g_tsg_para.ip_addr_table_id=Maat_table_register(g_tsg_maat_feather, ip_addr_table); + if(g_tsg_para.ip_addr_table_id<0) + { + MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_FATAL, "INIT_MAAT", "Register %s failed ...", ip_addr_table); + return -1; + } + + g_tsg_para.subscribe_id_table_id=Maat_table_register(g_tsg_maat_feather, subscriber_id_table); + if(g_tsg_para.subscribe_id_table_id<0) + { + MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_FATAL, "INIT_MAAT", "Register %s failed ...", subscriber_id_table); + return -1; + } + + ret=tsg_shared_table_init(conffile, g_tsg_maat_feather, logger); + if(ret<0) + { + MESA_handle_runtime_log(g_tsg_para.logger, RLOG_LV_FATAL, "INIT_MAAT", "tsg_shared_table_init failed ..."); + return -1; + } + + return 0; +} + + +int tsg_pull_policy_result(struct streaminfo *a_stream, PULL_RESULT_TYPE pull_result_type, Maat_rule_t*result, int result_num, struct _identify_info *identify_info) +{ + int num=0; + policy_priority_label_t *label=NULL; + + label=(policy_priority_label_t *)project_req_get_struct(a_stream, g_tsg_para.priority_project_id); + if(label!=NULL && result!=NULL && result_num>0 && identify_info!=NULL) + { + if(label->result_type==pull_result_type) + { + num=(label->result_num>result_num) ? result_num : label->result_num; + memcpy(result, label->result, num*sizeof(Maat_rule_t)); + + memcpy(identify_info->domain, label->domain, label->domain_len); + identify_info->domain_len=label->domain_len; + + identify_info->proto = label->proto; + + return num; + } + } + return 0; } int tsg_scan_nesting_addr(Maat_feather_t maat_feather, struct streaminfo *a_stream, tsg_protocol_t proto, scan_status_t *mid, Maat_rule_t*result, int result_num) { - return 0; + struct ipaddr t_addr; + struct ipaddr* p_addr = NULL; + char subscribe_id[64]={0}; + int hit_num=0,tans_proto=0; + int is_scan_addr=1, maat_ret=0,found_pos=0; + const struct streaminfo *cur_stream = a_stream; + + if(result == NULL || result_num <= 0 || a_stream == NULL || maat_feather == NULL) + { + return -1; + } + + do + { + if(cur_stream->addr.addrtype == __ADDR_TYPE_IP_PAIR_V4 || cur_stream->addr.addrtype == ADDR_TYPE_IPV4 || cur_stream->addr.addrtype == __ADDR_TYPE_IP_PAIR_V6 || cur_stream->addr.addrtype == ADDR_TYPE_IPV6) + { + is_scan_addr = 1; + if(cur_stream->addr.addrtype == __ADDR_TYPE_IP_PAIR_V4 || cur_stream->addr.addrtype == __ADDR_TYPE_IP_PAIR_V6) + { + memcpy(&t_addr, &cur_stream->addr, sizeof(t_addr)); + if(cur_stream->addr.addrtype == __ADDR_TYPE_IP_PAIR_V4) + t_addr.addrtype = ADDR_TYPE_IPV4; + else + t_addr.addrtype = ADDR_TYPE_IPV6; + p_addr = &t_addr; + } + else + { + p_addr = (struct ipaddr *)&cur_stream->addr; + } + } + else + { + is_scan_addr = 0; + p_addr = NULL; + } + + if(is_scan_addr==1 && p_addr!=NULL) + { + switch(cur_stream->type) + { + case STREAM_TYPE_TCP: + tans_proto=6; + break; + case STREAM_TYPE_UDP: + tans_proto=17; + break; + default: + tans_proto=255; + break; + } + + maat_ret=Maat_scan_proto_addr(maat_feather, + g_tsg_para.ip_addr_table_id, + p_addr, + tans_proto, + result+hit_num, + result_num-hit_num, + mid, + cur_stream->threadnum); + if(maat_ret > 0) + { + hit_num+=maat_ret; + } + } + + cur_stream = cur_stream->pfather; + + }while(cur_stream != NULL && hit_num < result_num); + + + if(hit_num < result_num) + { + maat_ret=Maat_full_scan_string(maat_feather, + g_tsg_para.subscribe_id_table_id, + CHARSET_GBK, + subscribe_id, + strlen(subscribe_id), + result+hit_num, + &found_pos, + result_num-hit_num, + mid, + a_stream->threadnum); + if(maat_ret > 0) + { + hit_num+=maat_ret; + } + } + + return hit_num; } diff --git a/src/tsg_send_log.cpp b/src/tsg_send_log.cpp index 7ae1681..636fb5d 100644 --- a/src/tsg_send_log.cpp +++ b/src/tsg_send_log.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -11,7 +12,6 @@ #include #include -#include #include #include @@ -19,7 +19,161 @@ #include "tsg_send_log.h" #include "tsg_send_log_internal.h" -tsg_logger_info_t tsg_logger_info; +char TSG_SEND_LOG_VERSION_20191115=0; +tsg_log_instance_t g_tsg_log_instance; + +const id2field_t tld_type[TLD_TYPE_MAX]={{TLD_TYPE_UNKNOWN, TLD_TYPE_UNKNOWN, "UNKOWN"}, + {TLD_TYPE_LONG, TLD_TYPE_LONG, "LONG"}, + {TLD_TYPE_STRING, TLD_TYPE_STRING, "STRING"}, + {TLD_TYPE_FILE, TLD_TYPE_FILE, "FILE"} + }; + +int TLD_cancel(TLD_handle_t handle) +{ + int thread_id=0; + struct _tld_handle *_handle=NULL; + if(handle!=NULL) + { + _handle=(struct _tld_handle *)handle; + cJSON_Delete(_handle->object); + _handle->object=NULL; + + thread_id=_handle->thread_id; + dictator_free(thread_id, handle); + + handle=NULL; + } + + return 0; +} + +int TLD_delete(TLD_handle_t handle, char *key) +{ + struct _tld_handle *_handle=(struct _tld_handle *)handle; + + if(_handle!=NULL && key!=NULL) + { + cJSON_DeleteItemFromObject(_handle->object, key); + } + + return 0; +} + +int TLD_append(TLD_handle_t handle, char *key, void *value, TLD_TYPE type) +{ + char buff[128]={0}; + struct _tld_handle *_handle=(struct _tld_handle *)handle; + + if(_handle==NULL || key==NULL || value==NULL) + { + return -1; + } + + switch(type) + { + case TLD_TYPE_LONG: + snprintf(buff, sizeof(buff), "%ld", *(long *)value); + cJSON_AddStringToObject(_handle->object, key, buff); + break; + case TLD_TYPE_FILE: + break; + case TLD_TYPE_STRING: + cJSON_AddStringToObject(_handle->object, key, (char *)value); + break; + default: + return -1; + break; + } + + + return 0; +} + +TLD_handle_t TLD_create(int thread_id) +{ + struct _tld_handle *_handle=(struct _tld_handle *)dictator_malloc(thread_id, sizeof(struct _tld_handle)); + _handle->thread_id = thread_id; + _handle->object = cJSON_CreateObject(); + + return (TLD_handle_t)_handle; +} + + +char *log_field_id2name(tsg_log_instance_t instance, tsg_log_field_id_t id) +{ + struct _tsg_log_instance *log_instance = (struct _tsg_log_instance *)instance; + if(log_instance!=NULL) + { + return log_instance->id2field[id].name; + } + + return NULL; +} + +int TLD_append_streaminfo(struct _tsg_log_instance *_instance, struct _tld_handle *_handle, struct streaminfo *a_stream) +{ + int ret=0,addr_type=0; + unsigned short tunnel_type=0; + char nest_addr_buf[1024]; + char *addr_proto=NULL; + int tunnel_type_size=sizeof(tunnel_type); + struct layer_addr_ipv4 *ipv4=NULL; + struct layer_addr_ipv6 *ipv6=NULL; + char server_ip[MAX_IPV4_LEN*8]={0}; + char client_ip[MAX_IPV4_LEN*8]={0}; + + switch(a_stream->addr.addrtype) + { + case ADDR_TYPE_IPV4: + case __ADDR_TYPE_IP_PAIR_V4: + ipv4=a_stream->addr.ipv4; + inet_ntop(AF_INET, (void *)&ipv4->saddr, client_ip, sizeof(client_ip)); + inet_ntop(AF_INET, (void *)&ipv4->daddr, server_ip, sizeof(server_ip)); + + addr_type=4; + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_SERVER_IP].name, (void *)server_ip, TLD_TYPE_STRING); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_CLIENT_IP].name, (void *)client_ip, TLD_TYPE_STRING); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_SERVER_PORT].name, (void *)&ipv4->dest, TLD_TYPE_LONG); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_CLIENT_PORT].name, (void *)&ipv4->source, TLD_TYPE_LONG); + break; + case ADDR_TYPE_IPV6: + case __ADDR_TYPE_IP_PAIR_V6: + ipv6=a_stream->addr.ipv6; + inet_ntop(AF_INET6, (void *)ipv6->saddr, client_ip, sizeof(client_ip)); + inet_ntop(AF_INET6, (void *)ipv6->daddr, server_ip, sizeof(server_ip)); + + addr_type=6; + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_SERVER_IP].name, (void *)server_ip, TLD_TYPE_STRING); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_CLIENT_IP].name, (void *)client_ip, TLD_TYPE_STRING); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_SERVER_PORT].name, (void *)&ipv6->dest, TLD_TYPE_LONG); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_CLIENT_PORT].name, (void *)&ipv6->source, TLD_TYPE_LONG); + break; + default: + break; + } + + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_STREAM_DIR].name, (void *)&a_stream->dir, TLD_TYPE_LONG); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_ADDRESS_TYPE].name, (void *)&addr_type, TLD_TYPE_LONG); + + + addr_proto=(char *)layer_addr_prefix_ntop(a_stream); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_L4_PROTOCOL].name, (void *)addr_proto, TLD_TYPE_STRING); + + + ret=MESA_get_stream_opt(a_stream, MSO_STREAM_TUNNEL_TYPE, &tunnel_type, &tunnel_type_size); + assert(ret==0); + if(tunnel_type==STREAM_TUNNLE_NON) + { + layer_addr_ntop_r(a_stream,nest_addr_buf, sizeof(nest_addr_buf)); + } + else + { + stream_addr_list_ntop(a_stream,nest_addr_buf, sizeof(nest_addr_buf)); + } + + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_ADDRESS_LIST].name, (void *)nest_addr_buf, TLD_TYPE_STRING); + return 0; +} static unsigned int get_ip_by_eth_name(const char *ifname) { @@ -46,65 +200,260 @@ error: return INADDR_NONE; } -int steaminfo2opt() +static int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t *service2topic) { - return 0; -} - - -int tsg_sendlog_init(char *filename) -{ - int ret=0; - unsigned int local_ip_nr=0; - char nic_name[32]; - - memset(&tsg_logger_info, 0, sizeof(tsg_logger_info)); - - MESA_load_profile_int_def(filename, "TSG_LOG", "MODE",&(tsg_logger_info.mode), 0); - MESA_load_profile_string_def(filename, "TSG_LOG", "FIELD_FILE", tsg_logger_info.field_file, sizeof(tsg_logger_info.field_file), NULL); - MESA_load_profile_string_def(filename, "TSG_LOG", "BROKER_LIST", tsg_logger_info.broker_list, sizeof(tsg_logger_info.broker_list), NULL); - - MESA_load_profile_int_def(filename, "TSG_LOG", "LEVEL",&(tsg_logger_info.level), 30); - MESA_load_profile_string_def(filename, "TSG_LOG", "LOG_PATH", tsg_logger_info.log_path, sizeof(tsg_logger_info.log_path), NULL); - - tsg_logger_info.logger=MESA_create_runtime_log_handle(tsg_logger_info.log_path, tsg_logger_info.level); - if(tsg_logger_info.logger==NULL) + int i=0; + int ret=0,id=0; + FILE *fp=NULL; + char line[1024]={0}; + char field_name[64]={0}; + char type_name[32]={0}; + + fp=fopen(filename, "r"); + if(fp==NULL) { - printf("MESA_create_runtime_log_handle failed ..., path: %s level: %d", tsg_logger_info.log_path, tsg_logger_info.level); + printf("Open %s failed ...", filename); return -1; } - tsg_logger_info.cabot_handle=cabot_sendlog_create(); - cabot_sendlog_set(tsg_logger_info.cabot_handle, SENDLOG_MODE,(void *)&(tsg_logger_info.mode)); - cabot_sendlog_set(tsg_logger_info.cabot_handle, CONFIG_FILE,(void *)tsg_logger_info.field_file); - cabot_sendlog_set(tsg_logger_info.cabot_handle, BROKER_LIST,(void *)&(tsg_logger_info.mode)); + memset(line, 0, sizeof(line)); - ret=cabot_sendlog_init(tsg_logger_info.cabot_handle, tsg_logger_info.logger); - if(ret<0) + while((fgets(line, sizeof(line), fp))!=NULL) { - MESA_handle_runtime_log(tsg_logger_info.logger, RLOG_LV_FATAL, "CABOT_INIT", "cabot_sendlog_init failed ..."); - return -2; + if(line[0]=='#' || line[0]=='\n' || line[0]=='\r' ||line[0]=='\0') + { + continue; + } + memset(type_name, 0, sizeof(type_name)); + ret=sscanf(line, "%s %s %d", type_name, field_name, &id); + assert(ret==3 && idmode), 0); + MESA_load_profile_string_def(conffile, "TSG_LOG", "COMMON_FIELD_FILE", _instance->common_field_file, sizeof(_instance->common_field_file), NULL); + MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", _instance->broker_list, sizeof(_instance->broker_list), NULL); + + MESA_load_profile_int_def(conffile, "TSG_LOG", "LEVEL",&(level), 30); + MESA_load_profile_string_def(conffile, "TSG_LOG", "LOG_PATH", log_path, sizeof(log_path), NULL); + + _instance->logger=MESA_create_runtime_log_handle(log_path, level); + if(_instance->logger==NULL) + { + printf("MESA_create_runtime_log_handle failed ..., path: %s level: %d", log_path, level); + return NULL; + } + + MESA_load_profile_string_def(conffile, "TSG_LOG", "NIC_NAME", nic_name, sizeof(nic_name), "eth0"); local_ip_nr=get_ip_by_eth_name(nic_name); if(local_ip_nr==INADDR_NONE) { - MESA_handle_runtime_log(tsg_logger_info.logger,RLOG_LV_FATAL, "GET_LOCAL_IP","get NIC_NAME: %s error.", nic_name); - return -3; + MESA_handle_runtime_log(_instance->logger,RLOG_LV_FATAL, "GET_LOCAL_IP","get NIC_NAME: %s error.", nic_name); + return NULL; } - inet_ntop(AF_INET,&(local_ip_nr),tsg_logger_info.local_ip_str,sizeof(tsg_logger_info.local_ip_str)); + inet_ntop(AF_INET,&(local_ip_nr),_instance->local_ip_str,sizeof(_instance->local_ip_str)); - //maat - + rdkafka_conf = rd_kafka_conf_new(); + rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000",kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "request.required.acks", "1", kafka_errstr, sizeof(kafka_errstr)); + if(!(kafka_handle=rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)))) + { + MESA_handle_runtime_log(_instance->logger, RLOG_LV_FATAL, "CABOT_INIT", "rd_kafka_new is error"); + return NULL; + } + + if(rd_kafka_brokers_add(kafka_handle, _instance->broker_list) == 0) + { + MESA_handle_runtime_log(_instance->logger, RLOG_LV_FATAL, "KAFKA_INIT", "rd_kafka_brokers_add is error, broker_list: %s", _instance->broker_list); + return NULL; + } + + MESA_load_profile_int_def(conffile, "TSG_LOG", "MAX_SERVICE",&(_instance->max_service), 0); + *(_instance->topic_rkt)=(rd_kafka_topic_t *)calloc(1, (1+_instance->max_service)*sizeof(rd_kafka_topic_t*)); + _instance->service2topic=(id2field_t *)calloc(1, (1+_instance->max_service)*sizeof(id2field_t)); + load_log_common_field(_instance->common_field_file, _instance->id2field, _instance->service2topic); + + for(i=0; i<_instance->max_service+1; i++) + { + if(_instance->service2topic[i].type==TLD_TYPE_MAX) + { + topic_conf=rd_kafka_topic_conf_new(); + _instance->topic_rkt[_instance->service2topic[i].id]=rd_kafka_topic_new(kafka_handle, _instance->service2topic[i].name, topic_conf); + } + } + + return (tsg_log_instance_t)_instance; +} + + +int tsg_send_log(tsg_log_instance_t instance, TLD_handle_t handle, tsg_log_t *log_msg, int thread_id) +{ + int i=0,status=0; + char *payload=NULL; + time_t cur_time; + struct vxlan_info vinfo; + int opt_val_len = sizeof(vinfo); + struct _tld_handle *_handle = (struct _tld_handle *)handle; + struct _tsg_log_instance *_instance = (struct _tsg_log_instance *)instance; + + if(_instance==NULL || _handle==NULL || log_msg==NULL) + { + MESA_handle_runtime_log(_instance->logger, RLOG_LV_FATAL, "TSG_SEND_LOG", " instance==NULL || TLD_handle==NULL || log_msg==NULL "); + return -1; + } + + if(_instance->mode==CLOSE) + { + MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, "TSG_SEND_LOG", "Disable tsg_send_log."); + return 0; + } + + //TODO + //common_user_tags + //common_user_region + //common_isp + //common_app_label + //common_app_id + //common_protocol_id + //common_has_dup_traffic + //common_stream_error + //common_stream_trace_id + + TLD_append_streaminfo(_instance, _handle, log_msg->a_stream); + + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_SLED_IP].name, (void *)(_instance->local_ip_str), TLD_TYPE_STRING); + + if(log_msg->a_stream!=NULL && log_msg->a_stream->ptcpdetail!=NULL) + { + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_START_TIME].name, (void *)(&log_msg->a_stream->ptcpdetail->createtime), TLD_TYPE_LONG); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_END_TIME].name, (void *)(&log_msg->a_stream->ptcpdetail->lastmtime), TLD_TYPE_LONG); + } + else + { + cur_time=time(NULL); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_START_TIME].name, (void *)&cur_time, TLD_TYPE_LONG); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_END_TIME].name, (void *)&cur_time, TLD_TYPE_LONG); + } + + + status=MESA_get_stream_opt(log_msg->a_stream, MSO_STREAM_VXLAN_INFO, &vinfo, &opt_val_len); + if(status < 0) + { + MESA_handle_runtime_log(_instance->logger, RLOG_LV_DEBUG, "TSG_SEND_LOG", "tsg log: get vxlan info error, tuple4: %s", printaddr(&log_msg->a_stream->addr, thread_id)); + } + else + { + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_LINK_ID].name, (void *)&vinfo.link_id, TLD_TYPE_LONG); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_DIRECTION].name, (void *)&vinfo.link_dir, TLD_TYPE_LONG); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_DEVICE_ID].name, (void *)&vinfo.dev_id, TLD_TYPE_LONG); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_ENTRANCE_ID].name, (void *)&vinfo.entrance_id, TLD_TYPE_LONG); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_ENCAPSULATION].name, (void *)&vinfo.encap_type, TLD_TYPE_LONG); + } + + for(i=0;iresult_num; i++) + { + switch(log_msg->result[i].do_log) + { + case LOG_ABORT: + MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, + "TSG_SEND_LOG", + "tsg abort log:cfg_id=%d service=%d addr=%s", + log_msg->result[i].config_id, + log_msg->result[i].service_id, + printaddr(&(log_msg->a_stream->addr), thread_id)); + + continue; + break; + case LOG_ALL: + break; + case LOG_NOFILE: + break; + default: + break; + } + + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_POLICY_ID].name, (void *)(&log_msg->result[i].config_id), TLD_TYPE_LONG); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_SERVICE].name, (void *)(&log_msg->result[i].service_id), TLD_TYPE_LONG); + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_ACTION].name, (void *)(&log_msg->result[i].action), TLD_TYPE_LONG); + + payload = cJSON_PrintUnformatted(_handle->object); + + status = rd_kafka_produce(_instance->topic_rkt[log_msg->result[i].service_id], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, payload, strlen(payload), NULL, 0, NULL); + + if(status < 0) + { + MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, "TSG_SEND_LOG", + "tsg_send_log to kafka is error, status: %d, topic: %s payload: %s", + status, _instance->service2topic[log_msg->result[i].service_id].name, payload); + } + else + { + MESA_handle_runtime_log(_instance->logger,RLOG_LV_INFO, "TSG_SEND_LOG", + "log send successfully %s: %s", _instance->service2topic[log_msg->result[i].service_id].name, payload); + } + + free(payload); + payload=NULL; + + TLD_delete((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_POLICY_ID].name); + TLD_delete((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_SERVICE].name); + TLD_delete((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_ACTION].name); + } + + + cJSON_Delete(_handle->object); + dictator_free(thread_id, handle); + handle=NULL; + return 0; } -void tsg_send_log(const tsg_log_t* log_msg, struct _opt_unit_t* log_opt, int opt_num, int thread_id) -{ - - -} diff --git a/src/tsg_send_log_internal.h b/src/tsg_send_log_internal.h index 02b5104..e18bd5d 100644 --- a/src/tsg_send_log_internal.h +++ b/src/tsg_send_log_internal.h @@ -1,17 +1,57 @@ #ifndef __TSG_SEND_LOG_INTERNAL_H__ #define __TSG_SEND_LOG_INTERNAL_H__ -typedef struct _tsg_logger_info +#include +#include + +#include "tsg_types.h" + + +#define MAX_IPV4_LEN 16 +#define MAX_STRING_LEN 32 + +enum _SEND_MODE +{ + CLOSE=0, + KAFKA=1, +}; + +enum _DO_LOG +{ + LOG_ABORT=0, + LOG_NOFILE=1, + LOG_ALL=2 +}; + +typedef struct _id2field +{ + TLD_TYPE type; + int id; + char name[MAX_STRING_LEN]; +}id2field_t; + +struct _tld_handle +{ + int thread_id; + cJSON *object; +}; + +struct _tsg_log_instance { int mode; - int level; + int max_service; void *logger; - void *cabot_handle; - char field_file[128]; - char broker_list[128]; - char log_path[128]; - char local_ip_str[16]; -}tsg_logger_info_t; + char common_field_file[MAX_STRING_LEN*4]; + char broker_list[MAX_STRING_LEN*4]; + char local_ip_str[MAX_IPV4_LEN]; + id2field_t id2field[LOG_COMMON_MAX]; + rd_kafka_topic_t **topic_rkt; + id2field_t *service2topic; +}; + + +char *log_field_id2name(tsg_log_instance_t instance, tsg_log_field_id_t id); +tsg_log_instance_t tsg_sendlog_init(const char *filename); #endif