优化kafka句柄创建和TOPIC注册
This commit is contained in:
@@ -13,6 +13,7 @@ enum kafka_topic_type
|
|||||||
{
|
{
|
||||||
TOPIC_LOGGER,
|
TOPIC_LOGGER,
|
||||||
TOPIC_BUCKET,
|
TOPIC_BUCKET,
|
||||||
|
TOPIC_MC_CACHE,
|
||||||
TOPIC_MAX
|
TOPIC_MAX
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -31,10 +32,8 @@ typedef struct tfe_kafka_logger_s
|
|||||||
rd_kafka_topic_t *kafka_topic[TOPIC_MAX];
|
rd_kafka_topic_t *kafka_topic[TOPIC_MAX];
|
||||||
} tfe_kafka_logger_t;
|
} tfe_kafka_logger_t;
|
||||||
|
|
||||||
tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name,
|
tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *sasl_username, const char *sasl_passwd, void *local_logger);
|
||||||
const char *sasl_username, const char *sasl_passwd, void *local_logger);
|
int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_name, int topic_id, void *local_logger);
|
||||||
|
|
||||||
int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_name, void *local_logger);
|
|
||||||
void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger);
|
void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger);
|
||||||
int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, int topic_id, const char *data, int len);
|
int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, int topic_id, const char *data, int len);
|
||||||
|
|
||||||
|
|||||||
@@ -104,22 +104,24 @@ static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_
|
|||||||
return handle;
|
return handle;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_name, void *local_logger)
|
int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_name, int topic_id, void *local_logger)
|
||||||
{
|
{
|
||||||
strncpy(logger->topic_name[TOPIC_BUCKET], topic_name, sizeof(logger->topic_name[TOPIC_BUCKET])-1);
|
if(logger && logger->enable)
|
||||||
logger->kafka_topic[TOPIC_BUCKET] = rd_kafka_topic_new(logger->kafka_handle, topic_name, NULL);
|
{
|
||||||
if (logger->kafka_topic[TOPIC_BUCKET] == NULL)
|
strncpy(logger->topic_name[topic_id], topic_name, sizeof(logger->topic_name[topic_id])-1);
|
||||||
|
logger->kafka_topic[topic_id] = rd_kafka_topic_new(logger->kafka_handle, topic_name, NULL);
|
||||||
|
if (logger->kafka_topic[topic_id] == NULL)
|
||||||
{
|
{
|
||||||
TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", topic_name);
|
TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", topic_name);
|
||||||
rd_kafka_destroy(logger->kafka_handle);
|
rd_kafka_destroy(logger->kafka_handle);
|
||||||
free(logger);
|
free(logger);
|
||||||
return 0;
|
return -1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return 1;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, const char *sasl_username, const char *sasl_passwd, void *local_logger)
|
tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *sasl_username, const char *sasl_passwd, void *local_logger)
|
||||||
{
|
{
|
||||||
char *override_sled_ip=NULL;
|
char *override_sled_ip=NULL;
|
||||||
tfe_kafka_logger_t *logger = (tfe_kafka_logger_t *)calloc(1, sizeof(tfe_kafka_logger_t));
|
tfe_kafka_logger_t *logger = (tfe_kafka_logger_t *)calloc(1, sizeof(tfe_kafka_logger_t));
|
||||||
@@ -155,17 +157,6 @@ create_kafka:
|
|||||||
free(logger);
|
free(logger);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
strncpy(logger->topic_name[TOPIC_LOGGER], topic_name, sizeof(logger->topic_name[TOPIC_LOGGER])-1);
|
|
||||||
logger->kafka_topic[TOPIC_LOGGER] = rd_kafka_topic_new(logger->kafka_handle, topic_name, NULL);
|
|
||||||
if (logger->kafka_topic[TOPIC_LOGGER] == NULL)
|
|
||||||
{
|
|
||||||
TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", logger->topic_name[TOPIC_LOGGER]);
|
|
||||||
rd_kafka_destroy(logger->kafka_handle);
|
|
||||||
free(logger);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return logger;
|
return logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -200,14 +200,21 @@ static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char *
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
kafka_logger = tfe_kafka_logger_create(enable, nic_name, brokerlist, logger_topic, sasl_username, sasl_passwd, logger);
|
kafka_logger = tfe_kafka_logger_create(enable, nic_name, brokerlist, sasl_username, sasl_passwd, logger);
|
||||||
if (kafka_logger == NULL)
|
if (kafka_logger == NULL)
|
||||||
{
|
{
|
||||||
TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create kafka logger.");
|
TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create kafka logger.");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
int ret = tfe_kafka_logger_topic_new(kafka_logger, bucket_topic, logger);
|
|
||||||
if(ret == 0)
|
int ret = tfe_kafka_logger_topic_new(kafka_logger, logger_topic, TOPIC_LOGGER, logger);
|
||||||
|
if(ret < 0)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = tfe_kafka_logger_topic_new(kafka_logger, bucket_topic, TOPIC_BUCKET, logger);
|
||||||
|
if(ret < 0)
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -99,12 +99,6 @@ key_log_file=log/sslkeylog.log
|
|||||||
|
|
||||||
# mid cert cache
|
# mid cert cache
|
||||||
mc_cache_enable=1
|
mc_cache_enable=1
|
||||||
mc_vsystem_id=1
|
|
||||||
mc_cache_eth=eth0
|
|
||||||
mc_cache_broker_list=192.168.40.224:9092
|
|
||||||
mc_cache_topic=PXY-EXCH-INTERMEDIA-CERT
|
|
||||||
sasl_username=admin
|
|
||||||
sasl_passwd=galaxy2019
|
|
||||||
|
|
||||||
[key_keeper]
|
[key_keeper]
|
||||||
#Mode: debug - generate cert with ca_path, normal - generate cert with cert store
|
#Mode: debug - generate cert with ca_path, normal - generate cert with cert store
|
||||||
@@ -192,6 +186,7 @@ NIC_NAME=enp2s0
|
|||||||
kafka_brokerlist=192.168.40.224:9092
|
kafka_brokerlist=192.168.40.224:9092
|
||||||
logger_send_topic=PROXY-EVENT
|
logger_send_topic=PROXY-EVENT
|
||||||
file_bucket_topic=TRAFFIC-FILE-STREAM-RECORD
|
file_bucket_topic=TRAFFIC-FILE-STREAM-RECORD
|
||||||
|
mc_cache_topic=PXY-EXCH-INTERMEDIA-CERT
|
||||||
sasl_username=admin
|
sasl_username=admin
|
||||||
sasl_passwd=galaxy2019
|
sasl_passwd=galaxy2019
|
||||||
device_id_filepath=/opt/tsg/etc/tsg_sn.json
|
device_id_filepath=/opt/tsg/etc/tsg_sn.json
|
||||||
|
|||||||
@@ -7,7 +7,6 @@
|
|||||||
|
|
||||||
// return 0 for success, return -1 for failed
|
// return 0 for success, return -1 for failed
|
||||||
int ssl_mid_cert_kafka_logger_create(const char *profile, const char *section);
|
int ssl_mid_cert_kafka_logger_create(const char *profile, const char *section);
|
||||||
void ssl_mid_cert_kafka_logger_destory(void);
|
|
||||||
void ssl_fetch_trusted_cert_from_chain(STACK_OF(X509) *cert_chain, X509_STORE *trusted_store, const char *hostname);
|
void ssl_fetch_trusted_cert_from_chain(STACK_OF(X509) *cert_chain, X509_STORE *trusted_store, const char *hostname);
|
||||||
|
|
||||||
#endif //TFE_SSL_FETCH_CERT_H
|
#endif //TFE_SSL_FETCH_CERT_H
|
||||||
@@ -7,6 +7,7 @@
|
|||||||
|
|
||||||
#include <ssl_utils.h>
|
#include <ssl_utils.h>
|
||||||
#include <tfe_kafka_logger.h>
|
#include <tfe_kafka_logger.h>
|
||||||
|
#include <tfe_resource.h>
|
||||||
#include <MESA/MESA_prof_load.h>
|
#include <MESA/MESA_prof_load.h>
|
||||||
|
|
||||||
typedef struct x509_object_st
|
typedef struct x509_object_st
|
||||||
@@ -35,55 +36,45 @@ static char cert_type_desc[MAX_TYPE][64] = {
|
|||||||
{"Root certificate"},
|
{"Root certificate"},
|
||||||
};
|
};
|
||||||
|
|
||||||
static tfe_kafka_logger_t *g_kafka_logger = NULL;
|
struct ssl_mid_cert_ctx
|
||||||
|
|
||||||
void ssl_mid_cert_kafka_logger_destory(void)
|
|
||||||
{
|
{
|
||||||
tfe_kafka_logger_destroy(g_kafka_logger);
|
int enable;
|
||||||
}
|
tfe_kafka_logger_t *g_kafka_logger;
|
||||||
|
};
|
||||||
|
struct ssl_mid_cert_ctx mid_cert_ctx;
|
||||||
|
|
||||||
int ssl_mid_cert_kafka_logger_create(const char *profile, const char *section)
|
int ssl_mid_cert_kafka_logger_create(const char *profile, const char *section)
|
||||||
{
|
{
|
||||||
int enable = 0, vsystem_id = 0;
|
|
||||||
char nic_name[TFE_SYMBOL_MAX] = {0};
|
|
||||||
char broker_list[TFE_SYMBOL_MAX] = {0};
|
|
||||||
char topic_name[TFE_SYMBOL_MAX] = {0};
|
char topic_name[TFE_SYMBOL_MAX] = {0};
|
||||||
char sasl_username[TFE_STRING_MAX] = {0};
|
|
||||||
char sasl_passwd[TFE_STRING_MAX] = {0};
|
|
||||||
|
|
||||||
MESA_load_profile_int_def(profile, section, "mc_cache_enable", &enable, 0);
|
MESA_load_profile_int_def(profile, section, "mc_cache_enable", &mid_cert_ctx.enable, 0);
|
||||||
MESA_load_profile_int_def(profile, section, "mc_vsystem_id", &vsystem_id, 1);
|
MESA_load_profile_string_def(profile, "tfe", "mc_cache_topic", topic_name, sizeof(topic_name), "PXY-EXCH-INTERMEDIA-CERT");
|
||||||
MESA_load_profile_string_def(profile, section, "mc_cache_eth", nic_name, sizeof(nic_name), "eth0");
|
|
||||||
MESA_load_profile_string_def(profile, section, "mc_cache_topic", topic_name, sizeof(topic_name), "PXY-EXCH-INTERMEDIA-CERT");
|
|
||||||
MESA_load_profile_string_def(profile, section, "SASL_USERNAME", sasl_username, sizeof(sasl_username), "");
|
|
||||||
MESA_load_profile_string_def(profile, section, "SASL_PASSWD", sasl_passwd, sizeof(sasl_passwd), "");
|
|
||||||
|
|
||||||
if (!enable)
|
if(mid_cert_ctx.enable == 0)
|
||||||
goto skip;
|
|
||||||
if (MESA_load_profile_string_def(profile, section, "mc_cache_broker_list", broker_list, sizeof(broker_list), NULL) < 0)
|
|
||||||
{
|
{
|
||||||
TFE_LOG_ERROR(g_default_logger, "Fail to get mc_cache_broker_list in profile %s section %s.", profile, section);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
skip:
|
|
||||||
g_kafka_logger = tfe_kafka_logger_create(enable, nic_name, broker_list, topic_name, sasl_username, sasl_passwd, g_default_logger);
|
|
||||||
if (g_kafka_logger)
|
|
||||||
{
|
|
||||||
g_kafka_logger->t_vsys_id=vsystem_id;
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
else
|
|
||||||
|
mid_cert_ctx.g_kafka_logger = (tfe_kafka_logger_t *)tfe_bussiness_resouce_get(KAFKA_LOGGER);
|
||||||
|
if(!mid_cert_ctx.g_kafka_logger)
|
||||||
{
|
{
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
int ret = tfe_kafka_logger_topic_new(mid_cert_ctx.g_kafka_logger, topic_name, TOPIC_MC_CACHE, g_default_logger);
|
||||||
|
if(ret < 0)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void ssl_mid_cert_kafka_logger_send(const char *sni, const char *fingerprint, const char *cert)
|
static void ssl_mid_cert_kafka_logger_send(const char *sni, const char *fingerprint, const char *cert)
|
||||||
{
|
{
|
||||||
if (g_kafka_logger->enable == 0)
|
if (mid_cert_ctx.g_kafka_logger->enable == 0)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON *obj = NULL;
|
cJSON *obj = NULL;
|
||||||
cJSON *dup = NULL;
|
cJSON *dup = NULL;
|
||||||
char *msg = NULL;
|
char *msg = NULL;
|
||||||
@@ -91,13 +82,13 @@ static void ssl_mid_cert_kafka_logger_send(const char *sni, const char *fingerpr
|
|||||||
obj = cJSON_CreateObject();
|
obj = cJSON_CreateObject();
|
||||||
cJSON_AddStringToObject(obj, "sni", sni);
|
cJSON_AddStringToObject(obj, "sni", sni);
|
||||||
cJSON_AddStringToObject(obj, "fingerprint", fingerprint);
|
cJSON_AddStringToObject(obj, "fingerprint", fingerprint);
|
||||||
cJSON_AddNumberToObject(obj, "vsys_id", g_kafka_logger->t_vsys_id);
|
cJSON_AddNumberToObject(obj, "vsys_id", mid_cert_ctx.g_kafka_logger->t_vsys_id);
|
||||||
cJSON_AddStringToObject(obj, "cert", cert);
|
cJSON_AddStringToObject(obj, "cert", cert);
|
||||||
cJSON_AddStringToObject(obj, "tfe_ip", g_kafka_logger->local_ip_str);
|
cJSON_AddStringToObject(obj, "tfe_ip", mid_cert_ctx.g_kafka_logger->local_ip_str);
|
||||||
dup = cJSON_Duplicate(obj, 1);
|
dup = cJSON_Duplicate(obj, 1);
|
||||||
msg = cJSON_PrintUnformatted(dup);
|
msg = cJSON_PrintUnformatted(dup);
|
||||||
TFE_LOG_DEBUG(g_default_logger, "log to [%s] msg:%s", g_kafka_logger->topic_name[TOPIC_LOGGER], msg);
|
TFE_LOG_DEBUG(g_default_logger, "log to [%s] msg:%s", mid_cert_ctx.g_kafka_logger->topic_name[TOPIC_MC_CACHE], msg);
|
||||||
tfe_kafka_logger_send(g_kafka_logger, TOPIC_LOGGER, msg, strlen(msg));
|
tfe_kafka_logger_send(mid_cert_ctx.g_kafka_logger, TOPIC_MC_CACHE, msg, strlen(msg));
|
||||||
|
|
||||||
free(msg);
|
free(msg);
|
||||||
cJSON_Delete(dup);
|
cJSON_Delete(dup);
|
||||||
@@ -145,7 +136,7 @@ void ssl_fetch_trusted_cert_from_chain(STACK_OF(X509) * cert_chain, X509_STORE *
|
|||||||
char *fingerprint = NULL;
|
char *fingerprint = NULL;
|
||||||
X509 *cert = NULL;
|
X509 *cert = NULL;
|
||||||
X509_OBJECT *obj = NULL;
|
X509_OBJECT *obj = NULL;
|
||||||
if (!g_kafka_logger || !g_kafka_logger->enable)
|
if (!mid_cert_ctx.g_kafka_logger || !mid_cert_ctx.enable)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -206,7 +197,7 @@ void ssl_fetch_trusted_cert_from_chain(STACK_OF(X509) * cert_chain, X509_STORE *
|
|||||||
end:
|
end:
|
||||||
TFE_LOG_DEBUG(g_default_logger, "[dep:%d/%d] is %s, in_trusted_store:%d, sin:%s; subject:(%s); issuer:(%s); fingerprint:%s; cert:%s",
|
TFE_LOG_DEBUG(g_default_logger, "[dep:%d/%d] is %s, in_trusted_store:%d, sin:%s; subject:(%s); issuer:(%s); fingerprint:%s; cert:%s",
|
||||||
i, deep, cert_type_desc[type], in_store, (hostname ? hostname : "NULL"), (subj ? subj : "NULL"), (issuer ? issuer : "NULL"), (fingerprint ? fingerprint : "NULL"),
|
i, deep, cert_type_desc[type], in_store, (hostname ? hostname : "NULL"), (subj ? subj : "NULL"), (issuer ? issuer : "NULL"), (fingerprint ? fingerprint : "NULL"),
|
||||||
((pem && g_kafka_logger->enable == 0x10) ? pem : " ..."));
|
((pem && mid_cert_ctx.g_kafka_logger->enable == 0x10) ? pem : " ..."));
|
||||||
if (pem)
|
if (pem)
|
||||||
{
|
{
|
||||||
free(pem);
|
free(pem);
|
||||||
|
|||||||
Reference in New Issue
Block a user