TSG-7471 Proxy连接kafka时增加认证信息

This commit is contained in:
fengweihao
2021-08-19 16:24:19 +08:00
parent 2a729a48b3
commit c41a67ca2b
5 changed files with 46 additions and 6 deletions

View File

@@ -23,7 +23,8 @@ extern "C"
rd_kafka_topic_t *kafka_topic; rd_kafka_topic_t *kafka_topic;
} tfe_kafka_logger_t; } tfe_kafka_logger_t;
tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, void *local_logger); tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name,
const char *sasl_username, const char *sasl_passwd, void *local_logger);
void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger); void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger);
int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, const char *data, int len); int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, const char *data, int len);

View File

@@ -34,7 +34,7 @@ error:
return INADDR_NONE; return INADDR_NONE;
} }
static rd_kafka_t *create_kafka_handle(const char *brokerlist, void *local_logger) static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_username, const char *sasl_passwd, void *local_logger)
{ {
int ret; int ret;
char kafka_errstr[1024] = {0}; char kafka_errstr[1024] = {0};
@@ -65,6 +65,26 @@ static rd_kafka_t *create_kafka_handle(const char *brokerlist, void *local_logge
return NULL; return NULL;
} }
if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0)
{
rd_kafka_conf_set(rconf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
rd_kafka_conf_set(rconf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
ret = rd_kafka_conf_set(rconf, "sasl.username", sasl_username, kafka_errstr, sizeof(kafka_errstr));
if (ret != RD_KAFKA_CONF_OK)
{
TFE_LOG_ERROR(local_logger, "Error to set kafka \"sasl.username\", %s.", kafka_errstr);
rd_kafka_conf_destroy(rconf);
return NULL;
}
ret = rd_kafka_conf_set(rconf, "sasl.password", sasl_passwd, kafka_errstr, sizeof(kafka_errstr));
if (ret != RD_KAFKA_CONF_OK)
{
TFE_LOG_ERROR(local_logger, "Error to set kafka \"sasl.password\", %s.", kafka_errstr);
rd_kafka_conf_destroy(rconf);
return NULL;
}
}
//The conf object is freed by this function and must not be used or destroyed by the application sub-sequently. //The conf object is freed by this function and must not be used or destroyed by the application sub-sequently.
handle = rd_kafka_new(RD_KAFKA_PRODUCER, rconf, kafka_errstr, sizeof(kafka_errstr)); handle = rd_kafka_new(RD_KAFKA_PRODUCER, rconf, kafka_errstr, sizeof(kafka_errstr));
rconf = NULL; rconf = NULL;
@@ -84,7 +104,7 @@ static rd_kafka_t *create_kafka_handle(const char *brokerlist, void *local_logge
return handle; return handle;
} }
tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, void *local_logger) tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, const char *sasl_username, const char *sasl_passwd, void *local_logger)
{ {
tfe_kafka_logger_t *logger = (tfe_kafka_logger_t *)calloc(1, sizeof(tfe_kafka_logger_t)); tfe_kafka_logger_t *logger = (tfe_kafka_logger_t *)calloc(1, sizeof(tfe_kafka_logger_t));
if (!logger) if (!logger)
@@ -104,7 +124,7 @@ tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, co
inet_ntop(AF_INET, &(logger->local_ip_num), logger->local_ip_str, sizeof(logger->local_ip_str)); inet_ntop(AF_INET, &(logger->local_ip_num), logger->local_ip_str, sizeof(logger->local_ip_str));
strncpy(logger->broker_list, brokerlist, strlen(brokerlist)); strncpy(logger->broker_list, brokerlist, strlen(brokerlist));
logger->kafka_handle = create_kafka_handle(logger->broker_list, local_logger); logger->kafka_handle = create_kafka_handle(logger->broker_list, sasl_username, sasl_passwd, local_logger);
if (logger->kafka_handle == NULL) if (logger->kafka_handle == NULL)
{ {
TFE_LOG_ERROR(local_logger, "Error to creat kafka handler with brokerlist: %s.", logger->broker_list); TFE_LOG_ERROR(local_logger, "Error to creat kafka handler with brokerlist: %s.", logger->broker_list);

View File

@@ -159,12 +159,16 @@ static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char *
char nic_name[64] = {0}; char nic_name[64] = {0};
char brokerlist[TFE_STRING_MAX] = {0}; char brokerlist[TFE_STRING_MAX] = {0};
char topic_name[TFE_STRING_MAX] = {0}; char topic_name[TFE_STRING_MAX] = {0};
char sasl_username[TFE_STRING_MAX] = {0};
char sasl_passwd[TFE_STRING_MAX] = {0};
tfe_kafka_logger_t *kafka_logger = NULL; tfe_kafka_logger_t *kafka_logger = NULL;
MESA_load_profile_int_def(profile, section, "enable", &enable, 1); MESA_load_profile_int_def(profile, section, "enable", &enable, 1);
MESA_load_profile_string_def(profile, section, "NIC_NAME", nic_name, sizeof(nic_name), "eth0"); MESA_load_profile_string_def(profile, section, "NIC_NAME", nic_name, sizeof(nic_name), "eth0");
MESA_load_profile_string_def(profile, section, "KAFKA_BROKERLIST", brokerlist, sizeof(brokerlist), ""); MESA_load_profile_string_def(profile, section, "KAFKA_BROKERLIST", brokerlist, sizeof(brokerlist), "");
MESA_load_profile_string_def(profile, section, "KAFKA_TOPIC", topic_name, sizeof(topic_name), "POLICY-EVENT-LOG"); MESA_load_profile_string_def(profile, section, "KAFKA_TOPIC", topic_name, sizeof(topic_name), "POLICY-EVENT-LOG");
MESA_load_profile_string_def(profile, section, "SASL_USERNAME", sasl_username, sizeof(sasl_username), "");
MESA_load_profile_string_def(profile, section, "SASL_PASSWD", sasl_passwd, sizeof(sasl_passwd), "");
if (!strlen(brokerlist)) if (!strlen(brokerlist))
{ {
@@ -172,7 +176,7 @@ static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char *
return NULL; return NULL;
} }
kafka_logger = tfe_kafka_logger_create(enable, nic_name, brokerlist, topic_name, logger); kafka_logger = tfe_kafka_logger_create(enable, nic_name, brokerlist, topic_name, sasl_username, sasl_passwd, logger);
if (kafka_logger == NULL) if (kafka_logger == NULL)
{ {
TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create kafka logger."); TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create kafka logger.");
@@ -183,6 +187,12 @@ static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char *
TFE_LOG_INFO(logger, "tfe kafka topic : %s", topic_name); TFE_LOG_INFO(logger, "tfe kafka topic : %s", topic_name);
TFE_LOG_INFO(logger, "tfe kafka brokerlist : %s", brokerlist); TFE_LOG_INFO(logger, "tfe kafka brokerlist : %s", brokerlist);
if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0)
{
TFE_LOG_INFO(logger, "tfe kafka sasl_username : %s", sasl_username);
TFE_LOG_INFO(logger, "tfe kafka sasl_passwd : %s", sasl_passwd);
}
return kafka_logger; return kafka_logger;
} }

View File

@@ -90,6 +90,8 @@ mc_cache_enable=1
mc_cache_eth=eth0 mc_cache_eth=eth0
mc_cache_broker_list=192.168.40.224:9092 mc_cache_broker_list=192.168.40.224:9092
mc_cache_topic=PXY-EXCH-INTERMEDIA-CERT mc_cache_topic=PXY-EXCH-INTERMEDIA-CERT
sasl_username=admin
sasl_passwd=galaxy2019
[key_keeper] [key_keeper]
#Mode: debug - generate cert with ca_path, normal - generate cert with cert store #Mode: debug - generate cert with ca_path, normal - generate cert with cert store
@@ -159,6 +161,8 @@ enable=1
NIC_NAME=enp2s0 NIC_NAME=enp2s0
kafka_brokerlist=192.168.40.224:9092 kafka_brokerlist=192.168.40.224:9092
kafka_topic=PROXY-EVENT-LOG kafka_topic=PROXY-EVENT-LOG
sasl_username=admin
sasl_passwd=galaxy2019
device_id_filepath=/opt/tsg/etc/tsg_sn.json device_id_filepath=/opt/tsg/etc/tsg_sn.json
[maat] [maat]

View File

@@ -48,10 +48,15 @@ int ssl_mid_cert_kafka_logger_create(const char *profile, const char *section)
char nic_name[64] = {0}; char nic_name[64] = {0};
char broker_list[TFE_SYMBOL_MAX] = {0}; char broker_list[TFE_SYMBOL_MAX] = {0};
char topic_name[TFE_SYMBOL_MAX] = {0}; char topic_name[TFE_SYMBOL_MAX] = {0};
char sasl_username[TFE_STRING_MAX] = {0};
char sasl_passwd[TFE_STRING_MAX] = {0};
MESA_load_profile_int_def(profile, section, "mc_cache_enable", &enable, 0); MESA_load_profile_int_def(profile, section, "mc_cache_enable", &enable, 0);
MESA_load_profile_string_def(profile, section, "mc_cache_eth", nic_name, sizeof(nic_name), "eth0"); MESA_load_profile_string_def(profile, section, "mc_cache_eth", nic_name, sizeof(nic_name), "eth0");
MESA_load_profile_string_def(profile, section, "mc_cache_topic", topic_name, sizeof(topic_name), "PXY-EXCH-INTERMEDIA-CERT"); MESA_load_profile_string_def(profile, section, "mc_cache_topic", topic_name, sizeof(topic_name), "PXY-EXCH-INTERMEDIA-CERT");
MESA_load_profile_string_def(profile, section, "SASL_USERNAME", sasl_username, sizeof(sasl_username), "");
MESA_load_profile_string_def(profile, section, "SASL_PASSWD", sasl_passwd, sizeof(sasl_passwd), "");
if (!enable) if (!enable)
goto skip; goto skip;
if (MESA_load_profile_string_def(profile, section, "mc_cache_broker_list", broker_list, sizeof(broker_list), NULL) < 0) if (MESA_load_profile_string_def(profile, section, "mc_cache_broker_list", broker_list, sizeof(broker_list), NULL) < 0)
@@ -60,7 +65,7 @@ int ssl_mid_cert_kafka_logger_create(const char *profile, const char *section)
return -1; return -1;
} }
skip: skip:
g_kafka_logger = tfe_kafka_logger_create(enable, nic_name, broker_list, topic_name, g_default_logger); g_kafka_logger = tfe_kafka_logger_create(enable, nic_name, broker_list, topic_name, sasl_username, sasl_passwd, g_default_logger);
if (g_kafka_logger) if (g_kafka_logger)
return 0; return 0;
else else