TSG-935 重构 kafka log 接口
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
add_library(common src/tfe_utils.cpp src/tfe_types.cpp src/tfe_future.cpp src/tfe_http.cpp src/tfe_plugin.cpp src/tfe_rpc.cpp src/tfe_cmsg.cpp)
|
||||
add_library(common src/tfe_utils.cpp src/tfe_types.cpp src/tfe_future.cpp src/tfe_http.cpp src/tfe_plugin.cpp src/tfe_rpc.cpp src/tfe_cmsg.cpp src/tfe_kafka_logger.cpp)
|
||||
target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include)
|
||||
target_link_libraries(common PUBLIC libevent-static libevent-static-openssl libevent-static-pthreads)
|
||||
target_link_libraries(common PUBLIC MESA_handle_logger)
|
||||
|
||||
34
common/include/tfe_kafka_logger.h
Normal file
34
common/include/tfe_kafka_logger.h
Normal file
@@ -0,0 +1,34 @@
|
||||
#ifndef _TFE_KAFKA_LOGGER_H
|
||||
#define _TFE_KAFKA_LOGGER_H
|
||||
|
||||
#ifdef __cpluscplus
|
||||
extern "C"
|
||||
{
|
||||
#endif
|
||||
|
||||
#include <tfe_utils.h>
|
||||
#include <librdkafka/rdkafka.h>
|
||||
|
||||
typedef struct tfe_kafka_logger_s
|
||||
{
|
||||
int enable;
|
||||
|
||||
unsigned int local_ip_num;
|
||||
char local_ip_str[TFE_SYMBOL_MAX];
|
||||
|
||||
char topic_name[TFE_STRING_MAX];
|
||||
char broker_list[TFE_STRING_MAX];
|
||||
|
||||
rd_kafka_t *kafka_handle;
|
||||
rd_kafka_topic_t *kafka_topic;
|
||||
} tfe_kafka_logger_t;
|
||||
|
||||
tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, void *local_logger);
|
||||
void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger);
|
||||
int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, const char *data, int len);
|
||||
|
||||
#ifdef __cpluscplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
||||
149
common/src/tfe_kafka_logger.cpp
Normal file
149
common/src/tfe_kafka_logger.cpp
Normal file
@@ -0,0 +1,149 @@
|
||||
#include <sys/ioctl.h>
|
||||
#include <unistd.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <net/if.h>
|
||||
|
||||
#include <tfe_kafka_logger.h>
|
||||
|
||||
// return INADDR_NONE if error occur
|
||||
static unsigned int get_ip_by_eth_name(const char *ifname)
|
||||
{
|
||||
int sockfd = -1;
|
||||
struct ifreq ifr;
|
||||
unsigned int ip;
|
||||
|
||||
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
|
||||
if (-1 == sockfd)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
strcpy(ifr.ifr_name, ifname);
|
||||
if (ioctl(sockfd, SIOCGIFADDR, &ifr) == -1)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
close(sockfd);
|
||||
|
||||
ip = ((struct sockaddr_in *)&(ifr.ifr_addr))->sin_addr.s_addr;
|
||||
return ip;
|
||||
|
||||
error:
|
||||
if (sockfd > 0)
|
||||
close(sockfd);
|
||||
return INADDR_NONE;
|
||||
}
|
||||
|
||||
static rd_kafka_t *create_kafka_handle(const char *brokerlist, void *local_logger)
|
||||
{
|
||||
int ret;
|
||||
char kafka_errstr[1024] = {0};
|
||||
rd_kafka_t *handle = NULL;
|
||||
rd_kafka_conf_t *rconf = NULL;
|
||||
|
||||
rconf = rd_kafka_conf_new();
|
||||
|
||||
ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
{
|
||||
TFE_LOG_ERROR(local_logger, "Error to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr);
|
||||
rd_kafka_conf_destroy(rconf);
|
||||
return NULL;
|
||||
}
|
||||
ret = rd_kafka_conf_set(rconf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr));
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
{
|
||||
TFE_LOG_ERROR(local_logger, "Error to set kafka \"topic.metadata.refresh.interval.ms\", %s.", kafka_errstr);
|
||||
rd_kafka_conf_destroy(rconf);
|
||||
return NULL;
|
||||
}
|
||||
ret = rd_kafka_conf_set(rconf, "security.protocol", "plaintext", kafka_errstr, sizeof(kafka_errstr));
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
{
|
||||
TFE_LOG_ERROR(local_logger, "Error to set kafka \"security.protocol\", %s.", kafka_errstr);
|
||||
rd_kafka_conf_destroy(rconf);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
//The conf object is freed by this function and must not be used or destroyed by the application sub-sequently.
|
||||
handle = rd_kafka_new(RD_KAFKA_PRODUCER, rconf, kafka_errstr, sizeof(kafka_errstr));
|
||||
rconf = NULL;
|
||||
if (handle == NULL)
|
||||
{
|
||||
TFE_LOG_ERROR(local_logger, "Error to new kafka, %s.", kafka_errstr);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (rd_kafka_brokers_add(handle, brokerlist) == 0)
|
||||
{
|
||||
TFE_LOG_ERROR(local_logger, "Error to add kakfa bokers.");
|
||||
rd_kafka_destroy(handle);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return handle;
|
||||
}
|
||||
|
||||
tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, void *local_logger)
|
||||
{
|
||||
tfe_kafka_logger_t *logger = (tfe_kafka_logger_t *)calloc(1, sizeof(tfe_kafka_logger_t));
|
||||
if (!logger)
|
||||
return NULL;
|
||||
|
||||
logger->enable = enable;
|
||||
if (!logger->enable)
|
||||
return logger;
|
||||
|
||||
logger->local_ip_num = get_ip_by_eth_name(nic_name);
|
||||
if (logger->local_ip_num == INADDR_NONE)
|
||||
{
|
||||
TFE_LOG_ERROR(local_logger, "Error to get NIC_NAME: %s.", nic_name);
|
||||
free(logger);
|
||||
return NULL;
|
||||
}
|
||||
inet_ntop(AF_INET, &(logger->local_ip_num), logger->local_ip_str, sizeof(logger->local_ip_str));
|
||||
|
||||
strncpy(logger->broker_list, brokerlist, strlen(brokerlist));
|
||||
logger->kafka_handle = create_kafka_handle(logger->broker_list, local_logger);
|
||||
if (logger->kafka_handle == NULL)
|
||||
{
|
||||
TFE_LOG_ERROR(local_logger, "Error to creat kafka handler with brokerlist: %s.", logger->broker_list);
|
||||
free(logger);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
strncpy(logger->topic_name, topic_name, strlen(topic_name));
|
||||
logger->kafka_topic = rd_kafka_topic_new(logger->kafka_handle, logger->topic_name, NULL);
|
||||
if (logger->kafka_topic == NULL)
|
||||
{
|
||||
TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", logger->topic_name);
|
||||
rd_kafka_destroy(logger->kafka_handle);
|
||||
free(logger);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return logger;
|
||||
}
|
||||
|
||||
void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger)
|
||||
{
|
||||
if (logger)
|
||||
{
|
||||
if (logger->kafka_handle)
|
||||
rd_kafka_destroy(logger->kafka_handle);
|
||||
|
||||
if (logger->kafka_topic)
|
||||
rd_kafka_topic_destroy(logger->kafka_topic);
|
||||
|
||||
free(logger);
|
||||
logger = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, const char *data, int len)
|
||||
{
|
||||
if (logger && logger->enable)
|
||||
return rd_kafka_produce(logger->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)data, len, NULL, 0, NULL);
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
@@ -2,17 +2,11 @@
|
||||
// Created by lwp on 2019/10/16.
|
||||
//
|
||||
|
||||
#include "ssl_utils.h"
|
||||
#include "tfe_utils.h"
|
||||
|
||||
#include <assert.h>
|
||||
#include <unistd.h>
|
||||
#include <net/if.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <arpa/inet.h>
|
||||
|
||||
#include <cjson/cJSON.h>
|
||||
#include <librdkafka/rdkafka.h>
|
||||
|
||||
#include <ssl_utils.h>
|
||||
#include <tfe_kafka_logger.h>
|
||||
#include <MESA/MESA_prof_load.h>
|
||||
|
||||
typedef struct x509_object_st {
|
||||
@@ -26,133 +20,39 @@ typedef struct x509_object_st {
|
||||
} data;
|
||||
} X509_OBJECT;
|
||||
|
||||
typedef struct ssl_kafka_logger_s {
|
||||
int enable;
|
||||
static tfe_kafka_logger_t *g_kafka_logger = NULL;
|
||||
|
||||
char tfe_ip[TFE_SYMBOL_MAX];
|
||||
char topic_name[TFE_STRING_MAX];
|
||||
char broker_list[TFE_STRING_MAX];
|
||||
|
||||
rd_kafka_t *handle;
|
||||
rd_kafka_topic_t *topic;
|
||||
} ssl_kafka_logger_t;
|
||||
|
||||
static ssl_kafka_logger_t *g_kafka_logger = NULL;
|
||||
|
||||
static unsigned int get_ip_by_eth(const char *eth) {
|
||||
int sockfd = -1;
|
||||
unsigned int ip;
|
||||
struct ifreq ifr;
|
||||
|
||||
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
|
||||
if (-1 == sockfd) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
memset(&ifr, 0, sizeof(ifr));
|
||||
strcpy(ifr.ifr_name, eth);
|
||||
if (ioctl(sockfd, SIOCGIFADDR, &ifr) < 0) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
ip = ((struct sockaddr_in *)&(ifr.ifr_addr))->sin_addr.s_addr;
|
||||
|
||||
close(sockfd);
|
||||
return ip;
|
||||
|
||||
error:
|
||||
if (sockfd > 0)
|
||||
close(sockfd);
|
||||
return INADDR_NONE;
|
||||
}
|
||||
|
||||
static rd_kafka_t *create_kafka_handle(const char *broker_list) {
|
||||
char errstr[1024];
|
||||
rd_kafka_t *handle = NULL;
|
||||
rd_kafka_conf_t *conf = NULL;
|
||||
|
||||
conf = rd_kafka_conf_new();
|
||||
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", errstr, sizeof(errstr));
|
||||
rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000", errstr, sizeof(errstr));
|
||||
rd_kafka_conf_set(conf, "security.protocol", "MG", errstr, sizeof(errstr));
|
||||
|
||||
// The conf object is freed by this function and must not be used or destroyed by the application sub-sequently.
|
||||
handle = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
|
||||
conf = NULL;
|
||||
if (handle == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (rd_kafka_brokers_add(handle, broker_list) == 0) {
|
||||
rd_kafka_destroy(handle);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return handle;
|
||||
}
|
||||
|
||||
void ssl_mid_cert_kafka_logger_destory(void) {
|
||||
if (g_kafka_logger) {
|
||||
if (g_kafka_logger->handle) {
|
||||
free(g_kafka_logger->handle);
|
||||
}
|
||||
if (g_kafka_logger->topic) {
|
||||
free(g_kafka_logger->topic);
|
||||
}
|
||||
free(g_kafka_logger);
|
||||
}
|
||||
void ssl_mid_cert_kafka_logger_destory(void)
|
||||
{
|
||||
tfe_kafka_logger_destroy(g_kafka_logger);
|
||||
}
|
||||
|
||||
int ssl_mid_cert_kafka_logger_create(const char *profile, const char *section)
|
||||
{
|
||||
unsigned int ip;
|
||||
char eth[64] = {0};
|
||||
int enable = 0;
|
||||
char nic_name[64] = {0};
|
||||
char broker_list[TFE_SYMBOL_MAX] = {0};
|
||||
char topic_name[TFE_SYMBOL_MAX] = {0};
|
||||
const char *errstr = "SSL mid cert cache occer error, ";
|
||||
|
||||
g_kafka_logger = ALLOC(ssl_kafka_logger_t, 1);
|
||||
assert(g_kafka_logger);
|
||||
|
||||
MESA_load_profile_int_def(profile, section, "mc_cache_enable", &(g_kafka_logger->enable), 0);
|
||||
if (!g_kafka_logger->enable) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
MESA_load_profile_string_def(profile, section, "mc_cache_eth", eth, sizeof(eth), "eth0");
|
||||
ip = get_ip_by_eth(eth);
|
||||
if (ip == INADDR_NONE) {
|
||||
TFE_LOG_ERROR(g_default_logger, "%s, Fail to get ip by %s.", errstr, eth);
|
||||
goto error;
|
||||
}
|
||||
inet_ntop(AF_INET, &ip, g_kafka_logger->tfe_ip, sizeof(g_kafka_logger->tfe_ip));
|
||||
|
||||
if (MESA_load_profile_string_def(profile, section, "mc_cache_broker_list", g_kafka_logger->broker_list, sizeof(g_kafka_logger->broker_list), NULL) < 0) {
|
||||
MESA_load_profile_int_def(profile, section, "mc_cache_enable", &enable, 0);
|
||||
MESA_load_profile_string_def(profile, section, "mc_cache_eth", nic_name, sizeof(nic_name), "eth0");
|
||||
MESA_load_profile_string_def(profile, section, "mc_cache_topic", topic_name, sizeof(topic_name), "PXY-EXCH-INTERMEDIA-CERT");
|
||||
if (MESA_load_profile_string_def(profile, section, "mc_cache_broker_list", broker_list, sizeof(broker_list), NULL) < 0)
|
||||
{
|
||||
TFE_LOG_ERROR(g_default_logger, "%s, Fail to get mc_cache_broker_list in profile %s section %s.", errstr, profile, section);
|
||||
goto error;
|
||||
return -1;
|
||||
}
|
||||
|
||||
g_kafka_logger->handle = create_kafka_handle(g_kafka_logger->broker_list);
|
||||
if (g_kafka_logger->handle == NULL) {
|
||||
TFE_LOG_ERROR(g_default_logger, "%s, Fail to create kafka handle with broker list: %s.", errstr, g_kafka_logger->broker_list);
|
||||
goto error;
|
||||
}
|
||||
|
||||
MESA_load_profile_string_def(profile, section, "mc_cache_topic", g_kafka_logger->topic_name, sizeof(g_kafka_logger->topic_name), "PXY-EXCH-INTERMEDIA-CERT");
|
||||
g_kafka_logger->topic = rd_kafka_topic_new(g_kafka_logger->handle, g_kafka_logger->topic_name, NULL);
|
||||
if (g_kafka_logger->topic == NULL) {
|
||||
TFE_LOG_ERROR(g_default_logger, "%s, Fail to create kafka topic with broker list: %s.", errstr, g_kafka_logger->broker_list);
|
||||
goto error;
|
||||
}
|
||||
|
||||
g_kafka_logger = tfe_kafka_logger_create(enable, nic_name, broker_list, topic_name, g_default_logger);
|
||||
if (g_kafka_logger)
|
||||
return 0;
|
||||
|
||||
error:
|
||||
ssl_mid_cert_kafka_logger_destory();
|
||||
else
|
||||
return -1;
|
||||
}
|
||||
|
||||
void ssl_mid_cert_kafka_logger_send(const char *sni, const char *fingerprint, const char *cert)
|
||||
static void ssl_mid_cert_kafka_logger_send(const char *sni, const char *fingerprint, const char *cert)
|
||||
{
|
||||
if (g_kafka_logger == NULL || g_kafka_logger->enable == 0)
|
||||
if (g_kafka_logger->enable == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
@@ -164,11 +64,11 @@ void ssl_mid_cert_kafka_logger_send(const char *sni, const char *fingerprint, co
|
||||
cJSON_AddStringToObject(obj, "sni", sni);
|
||||
cJSON_AddStringToObject(obj, "fingerprint", fingerprint);
|
||||
cJSON_AddStringToObject(obj, "cert", cert);
|
||||
cJSON_AddStringToObject(obj, "tfe_ip", g_kafka_logger->tfe_ip);
|
||||
cJSON_AddStringToObject(obj, "tfe_ip", g_kafka_logger->local_ip_str);
|
||||
dup = cJSON_Duplicate(obj, 1);
|
||||
msg = cJSON_PrintUnformatted(dup);
|
||||
TFE_LOG_DEBUG(g_default_logger, "log to [%s] msg:%s", g_kafka_logger->topic_name, msg);
|
||||
rd_kafka_produce(g_kafka_logger->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, msg, strlen(msg), NULL, 0, NULL);
|
||||
tfe_kafka_logger_send(g_kafka_logger, msg, strlen(msg));
|
||||
|
||||
free(msg);
|
||||
cJSON_Delete(dup);
|
||||
|
||||
@@ -1,19 +1,6 @@
|
||||
#include <cjson/cJSON.h>
|
||||
#include <librdkafka/rdkafka.h>
|
||||
|
||||
#include <MESA/MESA_handle_logger.h>
|
||||
#include <MESA/MESA_prof_load.h>
|
||||
#include <assert.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <time.h>
|
||||
#include <stdio.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <net/if.h>
|
||||
#include <pthread.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include <tfe_utils.h>
|
||||
#include <tfe_kafka_logger.h>
|
||||
#include <cache_evbase_client.h>
|
||||
|
||||
#include "pangu_logger.h"
|
||||
@@ -25,27 +12,20 @@ struct json_spec
|
||||
};
|
||||
struct pangu_logger
|
||||
{
|
||||
char local_ip_str[TFE_SYMBOL_MAX];
|
||||
int entry_id;
|
||||
|
||||
unsigned int en_sendlog;
|
||||
unsigned int en_sendlog_meta;
|
||||
unsigned int en_sendlog_body;
|
||||
|
||||
unsigned int local_ip_nr;
|
||||
void* global_logger;
|
||||
rd_kafka_t *kafka_handle;
|
||||
rd_kafka_topic_t* kafka_topic;
|
||||
pthread_mutex_t mutex;
|
||||
char brokerlist[TFE_STRING_MAX];
|
||||
char topic_name[TFE_STRING_MAX];
|
||||
|
||||
void* local_logger;
|
||||
|
||||
unsigned long long send_cnt;
|
||||
unsigned long long random_drop;
|
||||
unsigned long long user_abort;
|
||||
char local_log_path[TFE_STRING_MAX];
|
||||
tfe_kafka_logger_t *kafka_logger;
|
||||
struct cache_evbase_instance * log_file_upload_instance;
|
||||
};
|
||||
|
||||
@@ -63,65 +43,12 @@ enum _log_action //Bigger action number is prior.
|
||||
__LG_ACTION_MAX
|
||||
};
|
||||
|
||||
static unsigned int get_ip_by_eth_name(const char *ifname)
|
||||
{
|
||||
int sockfd;
|
||||
struct ifreq ifr;
|
||||
unsigned int ip;
|
||||
|
||||
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
|
||||
if (-1 == sockfd)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
strcpy(ifr.ifr_name,ifname);
|
||||
if (ioctl(sockfd, SIOCGIFADDR, &ifr) < 0)
|
||||
{
|
||||
goto error;
|
||||
}
|
||||
|
||||
ip = ((struct sockaddr_in*)&(ifr.ifr_addr))->sin_addr.s_addr;
|
||||
close(sockfd);
|
||||
return ip;
|
||||
|
||||
error:
|
||||
close(sockfd);
|
||||
return INADDR_NONE;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static rd_kafka_t * create_kafka_handle(const char* brokerlist)
|
||||
{
|
||||
char kafka_errstr[1024];
|
||||
rd_kafka_t *handle=NULL;
|
||||
rd_kafka_conf_t *rdkafka_conf = NULL;
|
||||
|
||||
rdkafka_conf = rd_kafka_conf_new();
|
||||
rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
|
||||
rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000",kafka_errstr, sizeof(kafka_errstr));
|
||||
rd_kafka_conf_set(rdkafka_conf, "security.protocol", "MG", kafka_errstr, sizeof(kafka_errstr));
|
||||
|
||||
//The conf object is freed by this function and must not be used or destroyed by the application sub-sequently.
|
||||
handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr));
|
||||
rdkafka_conf=NULL;
|
||||
if (handle==NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
if (rd_kafka_brokers_add(handle, brokerlist) == 0)
|
||||
{
|
||||
rd_kafka_destroy(handle);
|
||||
return NULL;
|
||||
}
|
||||
return handle;
|
||||
}
|
||||
|
||||
struct pangu_logger* pangu_log_handle_create(const char* profile, const char* section, void* local_logger)
|
||||
{
|
||||
int ret=-1;
|
||||
char nic_name[64]={0};
|
||||
char brokerlist[TFE_STRING_MAX] = { 0 };
|
||||
char topic_name[TFE_STRING_MAX] = { 0 };
|
||||
struct tango_cache_parameter *log_file_upload_para=NULL;
|
||||
|
||||
struct pangu_logger* instance=ALLOC(struct pangu_logger,1);
|
||||
@@ -149,39 +76,27 @@ struct pangu_logger* pangu_log_handle_create(const char* profile, const char* s
|
||||
}
|
||||
|
||||
MESA_load_profile_string_def(profile, section, "NIC_NAME",nic_name,sizeof(nic_name),"eth0");
|
||||
instance->local_ip_nr=get_ip_by_eth_name(nic_name);
|
||||
if(instance->local_ip_nr==INADDR_NONE)
|
||||
{
|
||||
TFE_LOG_ERROR(local_logger, "%s get NIC_NAME: %s error.", __FUNCTION__, nic_name);
|
||||
goto error_out;
|
||||
}
|
||||
|
||||
inet_ntop(AF_INET,&(instance->local_ip_nr),instance->local_ip_str,sizeof(instance->local_ip_str));
|
||||
|
||||
MESA_load_profile_int_def(profile, section, "ENTRANCE_ID",&(instance->entry_id),0);
|
||||
ret=MESA_load_profile_string_def(profile, section,"KAFKA_BROKERLIST", instance->brokerlist, sizeof(instance->brokerlist), NULL);
|
||||
ret=MESA_load_profile_string_def(profile, section,"KAFKA_BROKERLIST", brokerlist, sizeof(brokerlist), NULL);
|
||||
if(ret<0)
|
||||
{
|
||||
TFE_LOG_ERROR(local_logger,"Pangu log init failed, no brokerlist in profile %s section %s.", profile, section);
|
||||
goto error_out;
|
||||
}
|
||||
MESA_load_profile_string_def(profile, section,"KAFKA_TOPIC", topic_name, sizeof(topic_name), "POLICY-EVENT-LOG");
|
||||
|
||||
instance->kafka_handle=create_kafka_handle(instance->brokerlist);
|
||||
if(instance->kafka_handle==NULL)
|
||||
TFE_LOG_INFO(local_logger, "Pangu kafka brokerlist : %s", brokerlist);
|
||||
TFE_LOG_INFO(local_logger, "Pangu kafka topic : %s", topic_name);
|
||||
|
||||
instance->kafka_logger = tfe_kafka_logger_create(instance->en_sendlog, nic_name, brokerlist, topic_name, local_logger);
|
||||
if (instance->kafka_logger)
|
||||
{
|
||||
TFE_LOG_ERROR(local_logger,"Pangu log init failed. Cannot create lafka handle with brokerlist: %s.", instance->brokerlist);
|
||||
TFE_LOG_ERROR(local_logger,"Pangu log init failed, error to create kafka logger.");
|
||||
goto error_out;
|
||||
}
|
||||
|
||||
MESA_load_profile_string_def(profile, section,"KAFKA_TOPIC", instance->topic_name, sizeof(instance->topic_name), "POLICY-EVENT-LOG");
|
||||
|
||||
TFE_LOG_INFO(local_logger, "Pangu kafka brokerlist : %s", instance->brokerlist);
|
||||
TFE_LOG_INFO(local_logger, "Pangu kafka topic : %s", instance->topic_name);
|
||||
|
||||
instance->kafka_topic = rd_kafka_topic_new(instance->kafka_handle,instance->topic_name, NULL);
|
||||
log_file_upload_para=cache_evbase_parameter_new(profile, section, local_logger);
|
||||
instance->log_file_upload_instance=cache_evbase_instance_new(log_file_upload_para, local_logger);
|
||||
pthread_mutex_init(&(instance->mutex), NULL);
|
||||
return instance;
|
||||
|
||||
error_out:
|
||||
@@ -290,7 +205,7 @@ int pangu_send_log(struct pangu_logger* handle, const struct pangu_log* log_msg)
|
||||
cJSON_AddNumberToObject(common_obj, "common_direction", 0); //0:域内->域外,1:域外->域内,描述的是CLIENT_IP信息
|
||||
cJSON_AddNumberToObject(common_obj, "common_link_id", 0);
|
||||
cJSON_AddNumberToObject(common_obj, "common_stream_dir", 3); //1:c2s, 2:s2c, 3:double
|
||||
cJSON_AddStringToObject(common_obj, "common_sled_ip", handle->local_ip_str);
|
||||
cJSON_AddStringToObject(common_obj, "common_sled_ip", handle->kafka_logger->local_ip_str);
|
||||
cJSON_AddNumberToObject(common_obj, "common_entrance_id", handle->entry_id);
|
||||
cJSON_AddNumberToObject(common_obj, "common_device_id", 0);
|
||||
cJSON_AddStringToObject(common_obj, "http_url", http->req->req_spec.url);
|
||||
@@ -398,8 +313,7 @@ int pangu_send_log(struct pangu_logger* handle, const struct pangu_log* log_msg)
|
||||
|
||||
TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload);
|
||||
|
||||
kafka_status = rd_kafka_produce(handle->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
|
||||
log_payload, strlen(log_payload), NULL, 0, NULL);
|
||||
kafka_status = tfe_kafka_logger_send(handle->kafka_logger, log_payload, strlen(log_payload));
|
||||
free(log_payload);
|
||||
cJSON_Delete(per_hit_obj);
|
||||
if(kafka_status<0)
|
||||
|
||||
Reference in New Issue
Block a user