diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 6c811d7..69c0b8e 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(common src/tfe_utils.cpp src/tfe_types.cpp src/tfe_future.cpp src/tfe_http.cpp src/tfe_plugin.cpp src/tfe_rpc.cpp src/tfe_cmsg.cpp) +add_library(common src/tfe_utils.cpp src/tfe_types.cpp src/tfe_future.cpp src/tfe_http.cpp src/tfe_plugin.cpp src/tfe_rpc.cpp src/tfe_cmsg.cpp src/tfe_kafka_logger.cpp) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_link_libraries(common PUBLIC libevent-static libevent-static-openssl libevent-static-pthreads) target_link_libraries(common PUBLIC MESA_handle_logger) diff --git a/common/include/tfe_kafka_logger.h b/common/include/tfe_kafka_logger.h new file mode 100644 index 0000000..85cac25 --- /dev/null +++ b/common/include/tfe_kafka_logger.h @@ -0,0 +1,34 @@ +#ifndef _TFE_KAFKA_LOGGER_H +#define _TFE_KAFKA_LOGGER_H + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include <tfe_types.h> /* NOTE(review): include targets were stripped in this paste; reconstructed from usage (TFE_SYMBOL_MAX/TFE_STRING_MAX, rd_kafka_t) - verify against the original patch */ +#include <rdkafka.h> + + typedef struct tfe_kafka_logger_s + { + int enable; + + unsigned int local_ip_num; + char local_ip_str[TFE_SYMBOL_MAX]; + + char topic_name[TFE_STRING_MAX]; + char broker_list[TFE_STRING_MAX]; + + rd_kafka_t *kafka_handle; + rd_kafka_topic_t *kafka_topic; + } tfe_kafka_logger_t; + + tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, void *local_logger); + void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger); + int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, const char *data, int len); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/common/src/tfe_kafka_logger.cpp b/common/src/tfe_kafka_logger.cpp new file mode 100644 index 0000000..549f9a8 --- /dev/null +++ b/common/src/tfe_kafka_logger.cpp @@ -0,0 +1,149 @@ +#include <sys/ioctl.h> /* NOTE(review): include targets were stripped in this paste; reconstructed from usage (socket/ioctl/inet_ntop/close, TFE_LOG_ERROR) - verify against the original patch */ +#include <net/if.h> +#include <arpa/inet.h> +#include <tfe_utils.h> + +#include <tfe_kafka_logger.h> + +// return INADDR_NONE if an error occurs +static unsigned int get_ip_by_eth_name(const char
*ifname) +{ + int sockfd = -1; + struct ifreq ifr; + unsigned int ip; + + sockfd = socket(AF_INET, SOCK_DGRAM, 0); + if (-1 == sockfd) + { + goto error; + } + + memset(&ifr, 0, sizeof(ifr)); + snprintf(ifr.ifr_name, sizeof(ifr.ifr_name), "%s", ifname); + if (ioctl(sockfd, SIOCGIFADDR, &ifr) == -1) + { + goto error; + } + close(sockfd); + + ip = ((struct sockaddr_in *)&(ifr.ifr_addr))->sin_addr.s_addr; + return ip; + +error: + if (sockfd >= 0) + close(sockfd); + return INADDR_NONE; +} + +static rd_kafka_t *create_kafka_handle(const char *brokerlist, void *local_logger) +{ + int ret; + char kafka_errstr[1024] = {0}; + rd_kafka_t *handle = NULL; + rd_kafka_conf_t *rconf = NULL; + + rconf = rd_kafka_conf_new(); + + ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); + if (ret != RD_KAFKA_CONF_OK) + { + TFE_LOG_ERROR(local_logger, "Error to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr); + rd_kafka_conf_destroy(rconf); + return NULL; + } + ret = rd_kafka_conf_set(rconf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr)); + if (ret != RD_KAFKA_CONF_OK) + { + TFE_LOG_ERROR(local_logger, "Error to set kafka \"topic.metadata.refresh.interval.ms\", %s.", kafka_errstr); + rd_kafka_conf_destroy(rconf); + return NULL; + } + ret = rd_kafka_conf_set(rconf, "security.protocol", "plaintext", kafka_errstr, sizeof(kafka_errstr)); + if (ret != RD_KAFKA_CONF_OK) + { + TFE_LOG_ERROR(local_logger, "Error to set kafka \"security.protocol\", %s.", kafka_errstr); + rd_kafka_conf_destroy(rconf); + return NULL; + } + + //The conf object is freed by this function and must not be used or destroyed by the application subsequently.
+ handle = rd_kafka_new(RD_KAFKA_PRODUCER, rconf, kafka_errstr, sizeof(kafka_errstr)); + rconf = NULL; + if (handle == NULL) + { + TFE_LOG_ERROR(local_logger, "Error to new kafka, %s.", kafka_errstr); + return NULL; + } + + if (rd_kafka_brokers_add(handle, brokerlist) == 0) + { + TFE_LOG_ERROR(local_logger, "Error to add kafka brokers."); + rd_kafka_destroy(handle); + return NULL; + } + + return handle; +} + +tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, void *local_logger) +{ + tfe_kafka_logger_t *logger = (tfe_kafka_logger_t *)calloc(1, sizeof(tfe_kafka_logger_t)); + if (!logger) + return NULL; + + logger->enable = enable; + if (!logger->enable) + return logger; + + logger->local_ip_num = get_ip_by_eth_name(nic_name); + if (logger->local_ip_num == INADDR_NONE) + { + TFE_LOG_ERROR(local_logger, "Error to get NIC_NAME: %s.", nic_name); + free(logger); + return NULL; + } + inet_ntop(AF_INET, &(logger->local_ip_num), logger->local_ip_str, sizeof(logger->local_ip_str)); + + snprintf(logger->broker_list, sizeof(logger->broker_list), "%s", brokerlist); + logger->kafka_handle = create_kafka_handle(logger->broker_list, local_logger); + if (logger->kafka_handle == NULL) + { + TFE_LOG_ERROR(local_logger, "Error to create kafka handler with brokerlist: %s.", logger->broker_list); + free(logger); + return NULL; + } + + snprintf(logger->topic_name, sizeof(logger->topic_name), "%s", topic_name); + logger->kafka_topic = rd_kafka_topic_new(logger->kafka_handle, logger->topic_name, NULL); + if (logger->kafka_topic == NULL) + { + TFE_LOG_ERROR(local_logger, "Error to create kafka topic: %s.", logger->topic_name); + rd_kafka_destroy(logger->kafka_handle); + free(logger); + return NULL; + } + + return logger; +} + +void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger) +{ + if (logger) + { + /* topics must be destroyed before the handle that owns them */ + if (logger->kafka_topic) + rd_kafka_topic_destroy(logger->kafka_topic); + + if (logger->kafka_handle) + rd_kafka_destroy(logger->kafka_handle); + 
+ free(logger); + logger = NULL; + } +} + +int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, const char *data, int len) +{ + if (logger && logger->enable) + return rd_kafka_produce(logger->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)data, len, NULL, 0, NULL); + else + return 0; +} diff --git a/platform/src/ssl_fetch_cert.cpp b/platform/src/ssl_fetch_cert.cpp index 247e210..9dadcf6 100644 --- a/platform/src/ssl_fetch_cert.cpp +++ b/platform/src/ssl_fetch_cert.cpp @@ -2,157 +2,57 @@ // Created by lwp on 2019/10/16. // -#include "ssl_utils.h" -#include "tfe_utils.h" - #include -#include -#include -#include -#include - #include -#include + +#include +#include #include typedef struct x509_object_st { - /* one of the above types */ - X509_LOOKUP_TYPE type; - union { - char *ptr; - X509 *x509; - X509_CRL *crl; - EVP_PKEY *pkey; - } data; + /* one of the above types */ + X509_LOOKUP_TYPE type; + union { + char *ptr; + X509 *x509; + X509_CRL *crl; + EVP_PKEY *pkey; + } data; } X509_OBJECT; -typedef struct ssl_kafka_logger_s { - int enable; +static tfe_kafka_logger_t *g_kafka_logger = NULL; - char tfe_ip[TFE_SYMBOL_MAX]; - char topic_name[TFE_STRING_MAX]; - char broker_list[TFE_STRING_MAX]; - - rd_kafka_t *handle; - rd_kafka_topic_t *topic; -} ssl_kafka_logger_t; - -static ssl_kafka_logger_t *g_kafka_logger = NULL; - -static unsigned int get_ip_by_eth(const char *eth) { - int sockfd = -1; - unsigned int ip; - struct ifreq ifr; - - sockfd = socket(AF_INET, SOCK_DGRAM, 0); - if (-1 == sockfd) { - goto error; - } - - memset(&ifr, 0, sizeof(ifr)); - strcpy(ifr.ifr_name, eth); - if (ioctl(sockfd, SIOCGIFADDR, &ifr) < 0) { - goto error; - } - - ip = ((struct sockaddr_in *)&(ifr.ifr_addr))->sin_addr.s_addr; - - close(sockfd); - return ip; - -error: - if (sockfd > 0) - close(sockfd); - return INADDR_NONE; -} - -static rd_kafka_t *create_kafka_handle(const char *broker_list) { - char errstr[1024]; - rd_kafka_t *handle = NULL; - rd_kafka_conf_t *conf = NULL; - 
- conf = rd_kafka_conf_new(); - rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", errstr, sizeof(errstr)); - rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000", errstr, sizeof(errstr)); - rd_kafka_conf_set(conf, "security.protocol", "MG", errstr, sizeof(errstr)); - - // The conf object is freed by this function and must not be used or destroyed by the application sub-sequently. - handle = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); - conf = NULL; - if (handle == NULL) { - return NULL; - } - - if (rd_kafka_brokers_add(handle, broker_list) == 0) { - rd_kafka_destroy(handle); - return NULL; - } - - return handle; -} - -void ssl_mid_cert_kafka_logger_destory(void) { - if (g_kafka_logger) { - if (g_kafka_logger->handle) { - free(g_kafka_logger->handle); - } - if (g_kafka_logger->topic) { - free(g_kafka_logger->topic); - } - free(g_kafka_logger); - } +void ssl_mid_cert_kafka_logger_destory(void) +{ + tfe_kafka_logger_destroy(g_kafka_logger); + g_kafka_logger = NULL; } int ssl_mid_cert_kafka_logger_create(const char *profile, const char *section) { - unsigned int ip; - char eth[64] = {0}; - const char *errstr = "SSL mid cert cache occer error, "; + int enable = 0; + char nic_name[64] = {0}; + char broker_list[TFE_STRING_MAX] = {0}; + char topic_name[TFE_SYMBOL_MAX] = {0}; + const char *errstr = "SSL mid cert cache occur error, "; - g_kafka_logger = ALLOC(ssl_kafka_logger_t, 1); - assert(g_kafka_logger); - - MESA_load_profile_int_def(profile, section, "mc_cache_enable", &(g_kafka_logger->enable), 0); - if (!g_kafka_logger->enable) { - return 0; - } - - MESA_load_profile_string_def(profile, section, "mc_cache_eth", eth, sizeof(eth), "eth0"); - ip = get_ip_by_eth(eth); - if (ip == INADDR_NONE) { - TFE_LOG_ERROR(g_default_logger, "%s, Fail to get ip by %s.", errstr, eth); - goto error; - } - inet_ntop(AF_INET, &ip, g_kafka_logger->tfe_ip, sizeof(g_kafka_logger->tfe_ip)); - - if (MESA_load_profile_string_def(profile, section, 
"mc_cache_broker_list", g_kafka_logger->broker_list, sizeof(g_kafka_logger->broker_list), NULL) < 0) { + MESA_load_profile_int_def(profile, section, "mc_cache_enable", &enable, 0); + MESA_load_profile_string_def(profile, section, "mc_cache_eth", nic_name, sizeof(nic_name), "eth0"); + MESA_load_profile_string_def(profile, section, "mc_cache_topic", topic_name, sizeof(topic_name), "PXY-EXCH-INTERMEDIA-CERT"); + if (MESA_load_profile_string_def(profile, section, "mc_cache_broker_list", broker_list, sizeof(broker_list), NULL) < 0) + { TFE_LOG_ERROR(g_default_logger, "%s, Fail to get mc_cache_broker_list in profile %s section %s.", errstr, profile, section); - goto error; + return -1; } - - g_kafka_logger->handle = create_kafka_handle(g_kafka_logger->broker_list); - if (g_kafka_logger->handle == NULL) { - TFE_LOG_ERROR(g_default_logger, "%s, Fail to create kafka handle with broker list: %s.", errstr, g_kafka_logger->broker_list); - goto error; - } - - MESA_load_profile_string_def(profile, section, "mc_cache_topic", g_kafka_logger->topic_name, sizeof(g_kafka_logger->topic_name), "PXY-EXCH-INTERMEDIA-CERT"); - g_kafka_logger->topic = rd_kafka_topic_new(g_kafka_logger->handle, g_kafka_logger->topic_name, NULL); - if (g_kafka_logger->topic == NULL) { - TFE_LOG_ERROR(g_default_logger, "%s, Fail to create kafka topic with broker list: %s.", errstr, g_kafka_logger->broker_list); - goto error; - } - - return 0; - -error: - ssl_mid_cert_kafka_logger_destory(); - return -1; + g_kafka_logger = tfe_kafka_logger_create(enable, nic_name, broker_list, topic_name, g_default_logger); + if (g_kafka_logger) + return 0; + else + return -1; } -void ssl_mid_cert_kafka_logger_send(const char *sni, const char *fingerprint, const char *cert) +static void ssl_mid_cert_kafka_logger_send(const char *sni, const char *fingerprint, const char *cert) { - if (g_kafka_logger == NULL || g_kafka_logger->enable == 0) + if (g_kafka_logger == NULL || g_kafka_logger->enable == 0) { return; } @@ -164,11 +64,11 @@ void
ssl_mid_cert_kafka_logger_send(const char *sni, const char *fingerprint, co cJSON_AddStringToObject(obj, "sni", sni); cJSON_AddStringToObject(obj, "fingerprint", fingerprint); cJSON_AddStringToObject(obj, "cert", cert); - cJSON_AddStringToObject(obj, "tfe_ip", g_kafka_logger->tfe_ip); + cJSON_AddStringToObject(obj, "tfe_ip", g_kafka_logger->local_ip_str); dup = cJSON_Duplicate(obj, 1); msg = cJSON_PrintUnformatted(dup); TFE_LOG_DEBUG(g_default_logger, "log to [%s] msg:%s", g_kafka_logger->topic_name, msg); - rd_kafka_produce(g_kafka_logger->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, msg, strlen(msg), NULL, 0, NULL); + tfe_kafka_logger_send(g_kafka_logger, msg, strlen(msg)); free(msg); cJSON_Delete(dup); @@ -201,7 +101,7 @@ void ssl_fetch_trusted_cert_from_chain(STACK_OF(X509) * cert_chain, X509_STORE * obj->data.x509 = (X509 *)cert; // not in trusted store - if (X509_OBJECT_retrieve_match(X509_STORE_get0_objects(trusted_store), obj) == NULL) + if (X509_OBJECT_retrieve_match(X509_STORE_get0_objects(trusted_store), obj) == NULL) { ret = 0; } @@ -218,9 +118,9 @@ void ssl_fetch_trusted_cert_from_chain(STACK_OF(X509) * cert_chain, X509_STORE * pem = ssl_x509_to_pem(cert); TFE_LOG_DEBUG(g_default_logger, "[dep:%d/%d] in_trusted_store:%d, sin:%s; subject:(%s); issuer:(%s); fingerprint:%s; cert:%s", - i, deep, ret, (hostname ? hostname : "NULL"), (subj ? subj : "NULL"), (issuer ? issuer : "NULL"), (fingerprint ? fingerprint : "NULL"), - ((pem && g_kafka_logger->enable == 0x10) ? pem : " ...")); - + i, deep, ret, (hostname ? hostname : "NULL"), (subj ? subj : "NULL"), (issuer ? issuer : "NULL"), (fingerprint ? fingerprint : "NULL"), + ((pem && g_kafka_logger->enable == 0x10) ? 
pem : " ...")); + if (!ret && fingerprint && pem) { ssl_mid_cert_kafka_logger_send(hostname, fingerprint, pem); } diff --git a/plugin/business/pangu-http/src/pangu_logger.cpp b/plugin/business/pangu-http/src/pangu_logger.cpp index f3051ba..86dd01f 100644 --- a/plugin/business/pangu-http/src/pangu_logger.cpp +++ b/plugin/business/pangu-http/src/pangu_logger.cpp @@ -1,19 +1,6 @@ #include -#include - -#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include +#include #include #include "pangu_logger.h" @@ -25,27 +12,20 @@ struct json_spec }; struct pangu_logger { - char local_ip_str[TFE_SYMBOL_MAX]; int entry_id; unsigned int en_sendlog; unsigned int en_sendlog_meta; unsigned int en_sendlog_body; - unsigned int local_ip_nr; void* global_logger; - rd_kafka_t *kafka_handle; - rd_kafka_topic_t* kafka_topic; - pthread_mutex_t mutex; - char brokerlist[TFE_STRING_MAX]; - char topic_name[TFE_STRING_MAX]; - void* local_logger; unsigned long long send_cnt; unsigned long long random_drop; unsigned long long user_abort; char local_log_path[TFE_STRING_MAX]; + tfe_kafka_logger_t *kafka_logger; struct cache_evbase_instance * log_file_upload_instance; }; @@ -63,65 +43,12 @@ enum _log_action //Bigger action number is prior. 
__LG_ACTION_MAX }; -static unsigned int get_ip_by_eth_name(const char *ifname) -{ - int sockfd; - struct ifreq ifr; - unsigned int ip; - - sockfd = socket(AF_INET, SOCK_DGRAM, 0); - if (-1 == sockfd) - { - goto error; - } - - strcpy(ifr.ifr_name,ifname); - if (ioctl(sockfd, SIOCGIFADDR, &ifr) < 0) - { - goto error; - } - - ip = ((struct sockaddr_in*)&(ifr.ifr_addr))->sin_addr.s_addr; - close(sockfd); - return ip; - -error: - close(sockfd); - return INADDR_NONE; -} - - - -static rd_kafka_t * create_kafka_handle(const char* brokerlist) -{ - char kafka_errstr[1024]; - rd_kafka_t *handle=NULL; - rd_kafka_conf_t *rdkafka_conf = NULL; - - rdkafka_conf = rd_kafka_conf_new(); - rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000",kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "security.protocol", "MG", kafka_errstr, sizeof(kafka_errstr)); - - //The conf object is freed by this function and must not be used or destroyed by the application sub-sequently. 
- handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)); - rdkafka_conf=NULL; - if (handle==NULL) - { - return NULL; - } - if (rd_kafka_brokers_add(handle, brokerlist) == 0) - { - rd_kafka_destroy(handle); - return NULL; - } - return handle; -} - struct pangu_logger* pangu_log_handle_create(const char* profile, const char* section, void* local_logger) { int ret=-1; char nic_name[64]={0}; + char brokerlist[TFE_STRING_MAX] = { 0 }; + char topic_name[TFE_STRING_MAX] = { 0 }; struct tango_cache_parameter *log_file_upload_para=NULL; struct pangu_logger* instance=ALLOC(struct pangu_logger,1); @@ -149,39 +76,27 @@ struct pangu_logger* pangu_log_handle_create(const char* profile, const char* s } MESA_load_profile_string_def(profile, section, "NIC_NAME",nic_name,sizeof(nic_name),"eth0"); - instance->local_ip_nr=get_ip_by_eth_name(nic_name); - if(instance->local_ip_nr==INADDR_NONE) - { - TFE_LOG_ERROR(local_logger, "%s get NIC_NAME: %s error.", __FUNCTION__, nic_name); - goto error_out; - } - - inet_ntop(AF_INET,&(instance->local_ip_nr),instance->local_ip_str,sizeof(instance->local_ip_str)); - MESA_load_profile_int_def(profile, section, "ENTRANCE_ID",&(instance->entry_id),0); - ret=MESA_load_profile_string_def(profile, section,"KAFKA_BROKERLIST", instance->brokerlist, sizeof(instance->brokerlist), NULL); + ret=MESA_load_profile_string_def(profile, section,"KAFKA_BROKERLIST", brokerlist, sizeof(brokerlist), NULL); if(ret<0) { TFE_LOG_ERROR(local_logger,"Pangu log init failed, no brokerlist in profile %s section %s.", profile, section); goto error_out; } + MESA_load_profile_string_def(profile, section,"KAFKA_TOPIC", topic_name, sizeof(topic_name), "POLICY-EVENT-LOG"); - instance->kafka_handle=create_kafka_handle(instance->brokerlist); - if(instance->kafka_handle==NULL) + TFE_LOG_INFO(local_logger, "Pangu kafka brokerlist : %s", brokerlist); + TFE_LOG_INFO(local_logger, "Pangu kafka topic : %s", topic_name); + + instance->kafka_logger = 
tfe_kafka_logger_create(instance->en_sendlog, nic_name, brokerlist, topic_name, local_logger); + if (instance->kafka_logger == NULL) { - TFE_LOG_ERROR(local_logger,"Pangu log init failed. Cannot create lafka handle with brokerlist: %s.", instance->brokerlist); + TFE_LOG_ERROR(local_logger,"Pangu log init failed, error to create kafka logger."); goto error_out; } - MESA_load_profile_string_def(profile, section,"KAFKA_TOPIC", instance->topic_name, sizeof(instance->topic_name), "POLICY-EVENT-LOG"); - - TFE_LOG_INFO(local_logger, "Pangu kafka brokerlist : %s", instance->brokerlist); - TFE_LOG_INFO(local_logger, "Pangu kafka topic : %s", instance->topic_name); - - instance->kafka_topic = rd_kafka_topic_new(instance->kafka_handle,instance->topic_name, NULL); log_file_upload_para=cache_evbase_parameter_new(profile, section, local_logger); instance->log_file_upload_instance=cache_evbase_instance_new(log_file_upload_para, local_logger); - pthread_mutex_init(&(instance->mutex), NULL); return instance; error_out: @@ -290,7 +205,7 @@ int pangu_send_log(struct pangu_logger* handle, const struct pangu_log* log_msg) cJSON_AddNumberToObject(common_obj, "common_direction", 0); //0:域内->域外,1:域外->域内,描述的是CLIENT_IP信息 cJSON_AddNumberToObject(common_obj, "common_link_id", 0); cJSON_AddNumberToObject(common_obj, "common_stream_dir", 3); //1:c2s, 2:s2c, 3:double - cJSON_AddStringToObject(common_obj, "common_sled_ip", handle->local_ip_str); + cJSON_AddStringToObject(common_obj, "common_sled_ip", handle->kafka_logger->local_ip_str); cJSON_AddNumberToObject(common_obj, "common_entrance_id", handle->entry_id); cJSON_AddNumberToObject(common_obj, "common_device_id", 0); cJSON_AddStringToObject(common_obj, "http_url", http->req->req_spec.url); @@ -398,8 +313,7 @@ int pangu_send_log(struct pangu_logger* handle, const struct pangu_log* log_msg) TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload); - kafka_status = rd_kafka_produce(handle->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, -
log_payload, strlen(log_payload), NULL, 0, NULL); + kafka_status = tfe_kafka_logger_send(handle->kafka_logger, log_payload, strlen(log_payload)); free(log_payload); cJSON_Delete(per_hit_obj); if(kafka_status<0)