diff --git a/src/tsg_send_log.cpp b/src/tsg_send_log.cpp index 7a83299..434a7ff 100644 --- a/src/tsg_send_log.cpp +++ b/src/tsg_send_log.cpp @@ -19,7 +19,7 @@ #include "tsg_send_log.h" #include "tsg_send_log_internal.h" -char TSG_SEND_LOG_VERSION_20191120=0; +char TSG_SEND_LOG_VERSION_20191121=0; tsg_log_instance_t g_tsg_log_instance; const id2field_t tld_type[TLD_TYPE_MAX]={{TLD_TYPE_UNKNOWN, TLD_TYPE_UNKNOWN, "UNKOWN"}, @@ -48,7 +48,6 @@ unsigned long long tsg_get_stream_id(struct streaminfo * a_stream) int TLD_cancel(TLD_handle_t handle) { - int thread_id=0; struct _tld_handle *_handle=NULL; if(handle!=NULL) { @@ -58,9 +57,8 @@ int TLD_cancel(TLD_handle_t handle) cJSON_Delete(_handle->object); _handle->object=NULL; } - thread_id=_handle->thread_id; - dictator_free(thread_id, handle); - + + free(handle); handle=NULL; } @@ -109,7 +107,9 @@ int TLD_append(TLD_handle_t handle, char *key, void *value, TLD_TYPE type) TLD_handle_t TLD_create(int thread_id) { - struct _tld_handle *_handle=(struct _tld_handle *)dictator_malloc(thread_id, sizeof(struct _tld_handle)); + //struct _tld_handle *_handle=(struct _tld_handle *)dictator_malloc(thread_id, sizeof(struct _tld_handle)); + + struct _tld_handle *_handle=(struct _tld_handle *)calloc(1, sizeof(struct _tld_handle)); _handle->thread_id = thread_id; _handle->object = cJSON_CreateObject(); @@ -343,7 +343,7 @@ tsg_log_instance_t tsg_sendlog_init(const char *conffile) int tsg_send_log(tsg_log_instance_t instance, TLD_handle_t handle, tsg_log_t *log_msg, int thread_id) { - int i=0,status=0; + int i=0,ret=0,status=0; char *payload=NULL; time_t cur_time; struct vxlan_info vinfo; @@ -431,6 +431,34 @@ int tsg_send_log(tsg_log_instance_t instance, TLD_handle_t handle, tsg_log_t *lo TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_SERVICE].name, (void *)(long)(log_msg->result[i].service_id), TLD_TYPE_LONG); TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_ACTION].name, (void *)(long)((unsigned char)log_msg->result[i].action), TLD_TYPE_LONG); + if(log_msg->result[i].serv_def_len<128) + { + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_USER_REGION].name, (void *)(log_msg->result[i].service_defined), TLD_TYPE_STRING); + } + else + { + char *service_defined=(char *)dictator_malloc(thread_id, log_msg->result[i].serv_def_len+1); + ret=Maat_read_rule(g_tsg_maat_feather, &log_msg->result[i], MAAT_RULE_SERV_DEFINE, service_defined, log_msg->result[i].serv_def_len); + if(ret==log_msg->result[i].serv_def_len) + { + service_defined[log_msg->result[i].serv_def_len]='\0'; + TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_USER_REGION].name, (void *)(log_msg->result[i].service_defined), TLD_TYPE_STRING); + } + else + { + MESA_handle_runtime_log(_instance->logger, + RLOG_LV_FATAL, + "TSG_SEND_LOG", + "Fetch service_defined failed, policy_id: %d service: %d action: %d addr: %s", + log_msg->result[i].config_id, + log_msg->result[i].service_id, + log_msg->result[i].action, + printaddr(&log_msg->a_stream->addr, thread_id)); + } + dictator_free(thread_id, (void *)service_defined); + service_defined=NULL; + } + payload = cJSON_PrintUnformatted(_handle->object); status = rd_kafka_produce(_instance->topic_rkt[log_msg->result[i].service_id], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, payload, strlen(payload), NULL, 0, NULL); @@ -453,13 +481,16 @@ int tsg_send_log(tsg_log_instance_t instance, TLD_handle_t handle, tsg_log_t *lo TLD_delete((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_POLICY_ID].name); TLD_delete((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_SERVICE].name); TLD_delete((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_ACTION].name); + TLD_delete((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_USER_REGION].name); FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_LOG], 0, FS_OP_ADD, 1); } cJSON_Delete(_handle->object); - dictator_free(thread_id, handle); + _handle->object=NULL; + + free(handle); handle=NULL; return 0;