feature: TSG-21852 service_chaining_rule_hits support fieldstat4

This commit is contained in:
luwenpeng
2024-07-18 15:37:20 +08:00
parent cc5a537940
commit 9e63902c0d
15 changed files with 337 additions and 174 deletions

View File

@@ -9,9 +9,6 @@
#define MAX_SYMBOL_LEN 128
#define KAFKA_LOG_ERROR(fmt, ...) LOG_ERROR("%s: " fmt, LOG_TAG_KAFKA, ##__VA_ARGS__)
#define KAFKA_LOG_DEBUG(fmt, ...) LOG_DEBUG("%s: " fmt, LOG_TAG_KAFKA, ##__VA_ARGS__)
struct config
{
char brokerlist[MAX_SYMBOL_LEN];
@@ -69,44 +66,44 @@ static struct per_producer_per_topic *per_producer_per_topic_new(const char *bro
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (!conf)
{
KAFKA_LOG_ERROR("failed to create kafka conf");
LOG_ERROR("%s: failed to create kafka conf", LOG_TAG_KAFKA);
goto error_out;
}
if (rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK)
{
KAFKA_LOG_ERROR("failed to set kafka queue.buffering.max.messages, %s", err_str);
LOG_ERROR("%s: failed to set kafka queue.buffering.max.messages, %s", LOG_TAG_KAFKA, err_str);
goto error_out;
}
if (rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000", err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK)
{
KAFKA_LOG_ERROR("failed to set kafka topic.metadata.refresh.interval.ms, %s", err_str);
LOG_ERROR("%s: failed to set kafka topic.metadata.refresh.interval.ms, %s", LOG_TAG_KAFKA, err_str);
goto error_out;
}
if (rd_kafka_conf_set(conf, "client.id", topic_name, err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK)
{
KAFKA_LOG_ERROR("failed to set kafka client.id, %s", err_str);
LOG_ERROR("%s: failed to set kafka client.id, %s", LOG_TAG_KAFKA, err_str);
goto error_out;
}
if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0)
{
if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK)
{
KAFKA_LOG_ERROR("failed to set kafka security.protocol, %s", err_str);
LOG_ERROR("%s: failed to set kafka security.protocol, %s", LOG_TAG_KAFKA, err_str);
goto error_out;
}
if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK)
{
KAFKA_LOG_ERROR("failed to set kafka sasl.mechanisms, %s", err_str);
LOG_ERROR("%s: failed to set kafka sasl.mechanisms, %s", LOG_TAG_KAFKA, err_str);
goto error_out;
}
if (rd_kafka_conf_set(conf, "sasl.username", sasl_username, err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK)
{
KAFKA_LOG_ERROR("failed to set kafka sasl.username, %s", err_str);
LOG_ERROR("%s: failed to set kafka sasl.username, %s", LOG_TAG_KAFKA, err_str);
goto error_out;
}
if (rd_kafka_conf_set(conf, "sasl.password", sasl_passwd, err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK)
{
KAFKA_LOG_ERROR("failed to set kafka sasl.password, %s", err_str);
LOG_ERROR("%s: failed to set kafka sasl.password, %s", LOG_TAG_KAFKA, err_str);
goto error_out;
}
}
@@ -114,7 +111,7 @@ static struct per_producer_per_topic *per_producer_per_topic_new(const char *bro
{
if (rd_kafka_conf_set(conf, "security.protocol", "plaintext", err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK)
{
KAFKA_LOG_ERROR("failed to set kafka security.protocol, %s", err_str);
LOG_ERROR("%s: failed to set kafka security.protocol, %s", LOG_TAG_KAFKA, err_str);
goto error_out;
}
}
@@ -124,20 +121,20 @@ static struct per_producer_per_topic *per_producer_per_topic_new(const char *bro
conf = NULL;
if (pppt->producer == NULL)
{
KAFKA_LOG_ERROR("failed to create kafka producer, %s", err_str);
LOG_ERROR("%s: failed to create kafka producer, %s", LOG_TAG_KAFKA, err_str);
goto error_out;
}
if (rd_kafka_brokers_add(pppt->producer, brokerlist) == 0)
{
KAFKA_LOG_ERROR("failed to add kafka brokers");
LOG_ERROR("%s: failed to add kafka brokers", LOG_TAG_KAFKA);
goto error_out;
}
pppt->topic = rd_kafka_topic_new(pppt->producer, topic_name, NULL);
if (pppt->topic == NULL)
{
KAFKA_LOG_ERROR("failed to create kafka topic: %s", topic_name);
LOG_ERROR("%s: failed to create kafka topic: %s", LOG_TAG_KAFKA, topic_name);
goto error_out;
}
@@ -173,7 +170,7 @@ struct kafka *kafka_create(const char *profile)
if (strlen(handle->cfg.brokerlist) == 0)
{
KAFKA_LOG_ERROR("brokerlist is empty");
LOG_ERROR("%s: brokerlist is empty", LOG_TAG_KAFKA);
goto error_out;
}
@@ -181,7 +178,7 @@ struct kafka *kafka_create(const char *profile)
{
if (strlen(handle->cfg.topic_name[i]) == 0)
{
KAFKA_LOG_ERROR("topic_name[%d] is empty", i);
LOG_ERROR("%s: topic_name[%d] is empty", LOG_TAG_KAFKA, i);
goto error_out;
}
}
@@ -217,11 +214,11 @@ void kafka_destroy(struct kafka *handle)
}
}
int kafka_send(struct kafka *handle, enum topic_idx idx, const void *data, int len)
int kafka_send(struct kafka *handle, enum topic_idx idx, const char *data, int len)
{
if (idx < 0 || idx >= MAX_TOPIC_NUM)
{
KAFKA_LOG_ERROR("invalid topic index: %d", idx);
LOG_ERROR("%s: invalid topic index: %d", LOG_TAG_KAFKA, idx);
return -1;
}
@@ -229,17 +226,18 @@ int kafka_send(struct kafka *handle, enum topic_idx idx, const void *data, int l
{
if (rd_kafka_produce(handle->pppt[idx]->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)data, len, NULL, 0, NULL) == -1)
{
KAFKA_LOG_ERROR("failed to produce message with topic [%d], %s", idx, rd_kafka_err2str(rd_kafka_last_error()));
LOG_ERROR("%s: failed to produce message with topic [%d], %s", LOG_TAG_KAFKA, idx, rd_kafka_err2str(rd_kafka_last_error()));
return -1;
}
else
{
LOG_DEBUG("%s: success to produce message with topic [%d], %s", LOG_TAG_KAFKA, idx, data);
return 0;
}
}
else
{
KAFKA_LOG_ERROR("topic %d not initialized", idx);
LOG_ERROR("%s: topic %d not initialized", LOG_TAG_KAFKA, idx);
return -1;
}
}