增加和tfe通信接口, 添加负载均衡功能等

This commit is contained in:
崔一鸣
2019-06-03 20:19:04 +08:00
parent 85aee8ba55
commit 1fa7a0673f
20 changed files with 1607 additions and 341 deletions

View File

@@ -0,0 +1,149 @@
#include "kni_utils.h"
#include "kni_send_logger.h"
#include "librdkafka/rdkafka.h"
struct kni_send_logger{
int sendlog_switch;
rd_kafka_t *kafka_handle;
rd_kafka_topic_t *kafka_topic;
void *local_logger;
};
static rd_kafka_t* kafka_init(const char *profile, void *logger){
rd_kafka_t *kafka_handle = NULL;
rd_kafka_conf_t *rdkafka_conf = NULL;
char kafka_errstr[1024];
const char *section = "kafka";
char queue_buffering_max_messages[KNI_SYMBOL_MAX] = "";
char topic_metadata_refresh_interval_ms[KNI_SYMBOL_MAX] = "";
char security_protocol[KNI_SYMBOL_MAX] = "";
int ret = MESA_load_profile_string_nodef(profile, section, "queue.buffering.max.messages",
queue_buffering_max_messages, sizeof(queue_buffering_max_messages));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: queue.buffering.max.messages not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, section, "topic.metadata.refresh.interval.ms",
topic_metadata_refresh_interval_ms, sizeof(topic_metadata_refresh_interval_ms));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: topic.metadata.refresh.interval.ms not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, section, "security.protocol", security_protocol, sizeof(security_protocol));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: security.protocol not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n queue.buffering.max.messages: %s\n topic.metadata.refresh.interval.ms: %s\n"
"security.protocol: %s", "kafka", queue_buffering_max_messages, topic_metadata_refresh_interval_ms, security_protocol);
rdkafka_conf = rd_kafka_conf_new();
rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", queue_buffering_max_messages, kafka_errstr, sizeof(kafka_errstr));
rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", topic_metadata_refresh_interval_ms, kafka_errstr, sizeof(kafka_errstr));
rd_kafka_conf_set(rdkafka_conf, "security.protocol", security_protocol, kafka_errstr, sizeof(kafka_errstr));
//The conf object is freed by this function and must not be used or destroyed by the application sub-sequently.
kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr));
rdkafka_conf = NULL;
if(kafka_handle == NULL){
goto error_out;
}
return kafka_handle;
error_out:
if(rdkafka_conf != NULL){
rd_kafka_conf_destroy(rdkafka_conf);
rdkafka_conf = NULL;
}
if(kafka_handle != NULL){
rd_kafka_destroy(kafka_handle);
kafka_handle = NULL;
}
return NULL;
}
void kni_send_logger_destroy(struct kni_send_logger *handle){
if(handle != NULL){
if(handle->kafka_topic != NULL){
rd_kafka_topic_destroy(handle->kafka_topic);
handle->kafka_topic = NULL;
}
if(handle->kafka_handle != NULL){
rd_kafka_destroy(handle->kafka_handle);
handle->kafka_handle = NULL;
}
FREE(&handle);
}
}
struct kni_send_logger* kni_send_logger_init(const char *profile, void *local_logger){
struct kni_send_logger *handle = NULL;
const char *section = "send_logger";
int sendlog_switch = -1;
char kafka_topic[KNI_SYMBOL_MAX] = "";
char kafka_brokerlist[KNI_SYMBOL_MAX] = "";
rd_kafka_t *kafka_handle = NULL;
rd_kafka_topic_t *topic = NULL;
int ret = MESA_load_profile_int_nodef(profile, section, "switch", &sendlog_switch);
if(ret < 0){
KNI_LOG_ERROR(local_logger, "MESA_prof_load: switch not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, section, "kafka_topic", kafka_topic, sizeof(kafka_topic));
if(ret < 0){
KNI_LOG_ERROR(local_logger, "MESA_prof_load: kafka_topic not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, section, "kafka_brokerlist", kafka_brokerlist, sizeof(kafka_brokerlist));
if(ret < 0){
KNI_LOG_ERROR(local_logger, "MESA_prof_load: kafka_brokerlist not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_INFO(local_logger, "MESA_prof_load, [%s]:\n switch: %d\n kafka_topic: %s\n, kafka_brokerlist: %s",
section, sendlog_switch, kafka_topic, kafka_brokerlist);
handle = ALLOC(struct kni_send_logger, 1);
handle->local_logger = local_logger;
//sendlog_switch = 0, 不发送日志给kafka
if(sendlog_switch == 0){
handle->sendlog_switch = 0;
return handle;
}
handle->sendlog_switch = 1;
//init kafka
kafka_handle = kafka_init(profile, local_logger);
if(kafka_handle == NULL){
KNI_LOG_ERROR(local_logger, "Failed at init kafka");
goto error_out;
}
handle->kafka_handle = kafka_handle;
//kafka_brokerlist
ret = rd_kafka_brokers_add(kafka_handle, kafka_brokerlist);
if(ret == 0){
KNI_LOG_ERROR(local_logger, "Failed at add kafka_brokers");
goto error_out;
}
//kafka topic
topic = rd_kafka_topic_new(kafka_handle, kafka_topic, NULL);
if(topic == NULL){
KNI_LOG_ERROR(local_logger, "Failed at new kafka topic");
goto error_out;
}
handle->kafka_topic = topic;
return handle;
error_out:
kni_send_logger_destroy(handle);
return NULL;
}
int kni_send_logger_sendlog(kni_send_logger *handle, char *log_msg, int log_msg_len){
void *logger = handle->local_logger;
//kafka produce
int kafka_status = rd_kafka_produce(handle->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
log_msg, log_msg_len, NULL, 0, NULL);
if(kafka_status < 0){
KNI_LOG_ERROR(logger, "Kafka: Failed to produce, error is %s", rd_kafka_err2name(rd_kafka_last_error()));
return -1;
}
return 0;
}