OMPUB-965: 功能端支持对发送kafka的日志内容进行压缩
This commit is contained in:
@@ -2041,6 +2041,7 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
|
|||||||
MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", broker_list, sizeof(broker_list), NULL);
|
MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", broker_list, sizeof(broker_list), NULL);
|
||||||
MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_USERNAME", _instance->sasl_username, sizeof(_instance->sasl_username), ""); //admin
|
MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_USERNAME", _instance->sasl_username, sizeof(_instance->sasl_username), ""); //admin
|
||||||
MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_PASSWD", _instance->sasl_passwd, sizeof(_instance->sasl_passwd), "");
|
MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_PASSWD", _instance->sasl_passwd, sizeof(_instance->sasl_passwd), "");
|
||||||
|
MESA_load_profile_string_def(conffile, "TSG_LOG", "COMPRESSION_TYPE", _instance->compression, sizeof(_instance->compression), ""); //snappy
|
||||||
|
|
||||||
MESA_load_profile_string_def(conffile, "TSG_LOG", "SEND_QUEUE_MAX_MESSAGE", _instance->send_queue_max_msg, sizeof(_instance->send_queue_max_msg), "1000000");
|
MESA_load_profile_string_def(conffile, "TSG_LOG", "SEND_QUEUE_MAX_MESSAGE", _instance->send_queue_max_msg, sizeof(_instance->send_queue_max_msg), "1000000");
|
||||||
MESA_load_profile_string_def(conffile, "TSG_LOG", "REFRESH_INTERVAL_MS", _instance->refresh_interval_ms, sizeof(_instance->refresh_interval_ms), "600000");
|
MESA_load_profile_string_def(conffile, "TSG_LOG", "REFRESH_INTERVAL_MS", _instance->refresh_interval_ms, sizeof(_instance->refresh_interval_ms), "600000");
|
||||||
@@ -2089,6 +2090,11 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
|
|||||||
rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr));
|
rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr));
|
||||||
rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", broker_list, kafka_errstr, sizeof(kafka_errstr));
|
rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", broker_list, kafka_errstr, sizeof(kafka_errstr));
|
||||||
|
|
||||||
|
if(strlen(_instance->compression)>0)
|
||||||
|
{
|
||||||
|
rd_kafka_conf_set(rdkafka_conf, "compression.codec", _instance->compression, kafka_errstr, sizeof(kafka_errstr));
|
||||||
|
}
|
||||||
|
|
||||||
if(strlen(_instance->sasl_username)> 0 && strlen(_instance->sasl_passwd)>0)
|
if(strlen(_instance->sasl_username)> 0 && strlen(_instance->sasl_passwd)>0)
|
||||||
{
|
{
|
||||||
rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
|
rd_kafka_conf_set(rdkafka_conf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
|
||||||
|
|||||||
@@ -202,6 +202,7 @@ struct tsg_log_instance_t
|
|||||||
char udp_label[MAX_STRING_LEN32];
|
char udp_label[MAX_STRING_LEN32];
|
||||||
char sasl_username[MAX_STRING_LEN32];
|
char sasl_username[MAX_STRING_LEN32];
|
||||||
char sasl_passwd[MAX_STRING_LEN32];
|
char sasl_passwd[MAX_STRING_LEN32];
|
||||||
|
char compression[MAX_STRING_LEN32];
|
||||||
char send_queue_max_msg[MAX_STRING_LEN32];
|
char send_queue_max_msg[MAX_STRING_LEN32];
|
||||||
char require_ack[MAX_STRING_LEN32];
|
char require_ack[MAX_STRING_LEN32];
|
||||||
char refresh_interval_ms[MAX_STRING_LEN32];
|
char refresh_interval_ms[MAX_STRING_LEN32];
|
||||||
|
|||||||
Reference in New Issue
Block a user