TSG-22348 feature: adapt maat support UUID

This commit is contained in:
luwenpeng
2024-09-23 16:50:09 +08:00
parent 7ef8e44bca
commit 5799de5299
60 changed files with 2504 additions and 1043 deletions

View File

@@ -11,6 +11,7 @@
struct config
{
int enable_debug;
char brokerlist[MAX_SYMBOL_LEN];
char sasl_username[MAX_SYMBOL_LEN];
char sasl_passwd[MAX_SYMBOL_LEN];
@@ -84,6 +85,14 @@ static struct per_producer_per_topic *per_producer_per_topic_new(const char *bro
LOG_ERROR("%s: failed to set kafka client.id, %s", LOG_TAG_KAFKA, err_str);
goto error_out;
}
// bootstrap.servers
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokerlist, err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK)
{
LOG_ERROR("%s: failed to set kafka bootstrap.servers, %s", LOG_TAG_KAFKA, err_str);
goto error_out;
}
if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0)
{
if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK)
@@ -125,12 +134,6 @@ static struct per_producer_per_topic *per_producer_per_topic_new(const char *bro
goto error_out;
}
if (rd_kafka_brokers_add(pppt->producer, brokerlist) == 0)
{
LOG_ERROR("%s: failed to add kafka brokers", LOG_TAG_KAFKA);
goto error_out;
}
pppt->topic = rd_kafka_topic_new(pppt->producer, topic_name, NULL);
if (pppt->topic == NULL)
{
@@ -163,6 +166,7 @@ struct kafka *kafka_create(const char *profile)
return NULL;
}
MESA_load_profile_int_def(profile, "kafka", "enable_debug", &handle->cfg.enable_debug, 0);
MESA_load_profile_string_def(profile, "kafka", "brokerlist", handle->cfg.brokerlist, sizeof(handle->cfg.brokerlist), "");
MESA_load_profile_string_def(profile, "kafka", "sasl_username", handle->cfg.sasl_username, sizeof(handle->cfg.sasl_username), "");
MESA_load_profile_string_def(profile, "kafka", "sasl_passwd", handle->cfg.sasl_passwd, sizeof(handle->cfg.sasl_passwd), "");
@@ -237,7 +241,10 @@ int kafka_send(struct kafka *handle, enum topic_idx idx, const char *data, int l
}
else
{
LOG_DEBUG("%s: success to produce message with topic [%d], %s", LOG_TAG_KAFKA, idx, data);
if (handle->cfg.enable_debug)
{
LOG_DEBUG("%s: success to produce message with topic [%d], %s", LOG_TAG_KAFKA, idx, data);
}
return 0;
}
}