#include "kni_utils.h" #include "kni_send_logger.h" #include "librdkafka/rdkafka.h" struct kni_send_logger{ int sendlog_switch; rd_kafka_t *kafka_handle; rd_kafka_topic_t *kafka_topic; void *local_logger; }; static rd_kafka_t* kafka_init(const char *profile, void *logger){ rd_kafka_t *kafka_handle = NULL; rd_kafka_conf_t *rdkafka_conf = NULL; char kafka_errstr[1024]; const char *section = "kafka"; char queue_buffering_max_messages[KNI_SYMBOL_MAX] = ""; char topic_metadata_refresh_interval_ms[KNI_SYMBOL_MAX] = ""; char security_protocol[KNI_SYMBOL_MAX] = ""; int ret = MESA_load_profile_string_nodef(profile, section, "queue.buffering.max.messages", queue_buffering_max_messages, sizeof(queue_buffering_max_messages)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: queue.buffering.max.messages not set, profile is %s, section is %s", profile, section); goto error_out; } ret = MESA_load_profile_string_nodef(profile, section, "topic.metadata.refresh.interval.ms", topic_metadata_refresh_interval_ms, sizeof(topic_metadata_refresh_interval_ms)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: topic.metadata.refresh.interval.ms not set, profile is %s, section is %s", profile, section); goto error_out; } ret = MESA_load_profile_string_nodef(profile, section, "security.protocol", security_protocol, sizeof(security_protocol)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: security.protocol not set, profile is %s, section is %s", profile, section); goto error_out; } KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n queue.buffering.max.messages: %s\n topic.metadata.refresh.interval.ms: %s\n" "security.protocol: %s", "kafka", queue_buffering_max_messages, topic_metadata_refresh_interval_ms, security_protocol); rdkafka_conf = rd_kafka_conf_new(); rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", queue_buffering_max_messages, kafka_errstr, sizeof(kafka_errstr)); rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", topic_metadata_refresh_interval_ms, kafka_errstr, sizeof(kafka_errstr)); rd_kafka_conf_set(rdkafka_conf, "security.protocol", security_protocol, kafka_errstr, sizeof(kafka_errstr)); //The conf object is freed by this function and must not be used or destroyed by the application sub-sequently. kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)); rdkafka_conf = NULL; if(kafka_handle == NULL){ goto error_out; } return kafka_handle; error_out: if(rdkafka_conf != NULL){ rd_kafka_conf_destroy(rdkafka_conf); rdkafka_conf = NULL; } if(kafka_handle != NULL){ rd_kafka_destroy(kafka_handle); kafka_handle = NULL; } return NULL; } void kni_send_logger_destroy(struct kni_send_logger *handle){ if(handle != NULL){ if(handle->kafka_topic != NULL){ rd_kafka_topic_destroy(handle->kafka_topic); handle->kafka_topic = NULL; } if(handle->kafka_handle != NULL){ rd_kafka_destroy(handle->kafka_handle); handle->kafka_handle = NULL; } FREE(&handle); } } struct kni_send_logger* kni_send_logger_init(const char *profile, void *local_logger){ struct kni_send_logger *handle = NULL; const char *section = "send_logger"; int sendlog_switch = -1; char kafka_topic[KNI_SYMBOL_MAX] = ""; char kafka_brokerlist[KNI_SYMBOL_MAX] = ""; rd_kafka_t *kafka_handle = NULL; rd_kafka_topic_t *topic = NULL; int ret = MESA_load_profile_int_nodef(profile, section, "switch", &sendlog_switch); if(ret < 0){ KNI_LOG_ERROR(local_logger, "MESA_prof_load: switch not set, profile is %s, section is %s", profile, section); goto error_out; } ret = MESA_load_profile_string_nodef(profile, section, "kafka_topic", kafka_topic, sizeof(kafka_topic)); if(ret < 0){ KNI_LOG_ERROR(local_logger, "MESA_prof_load: kafka_topic not set, profile is %s, section is %s", profile, section); goto error_out; } ret = MESA_load_profile_string_nodef(profile, section, "kafka_brokerlist", kafka_brokerlist, sizeof(kafka_brokerlist)); if(ret < 0){ KNI_LOG_ERROR(local_logger, "MESA_prof_load: kafka_brokerlist not set, profile is %s, section is %s", profile, section); goto error_out; } KNI_LOG_ERROR(local_logger, "MESA_prof_load, [%s]:\n switch: %d\n kafka_topic: %s\n, kafka_brokerlist: %s", section, sendlog_switch, kafka_topic, kafka_brokerlist); handle = ALLOC(struct kni_send_logger, 1); handle->local_logger = local_logger; //sendlog_switch = 0, do not sendto kafka if(sendlog_switch == 0){ handle->sendlog_switch = 0; return handle; } handle->sendlog_switch = 1; //init kafka kafka_handle = kafka_init(profile, local_logger); if(kafka_handle == NULL){ KNI_LOG_ERROR(local_logger, "Failed at init kafka"); goto error_out; } handle->kafka_handle = kafka_handle; //kafka_brokerlist ret = rd_kafka_brokers_add(kafka_handle, kafka_brokerlist); if(ret == 0){ KNI_LOG_ERROR(local_logger, "Failed at add kafka_brokers"); goto error_out; } //kafka topic topic = rd_kafka_topic_new(kafka_handle, kafka_topic, NULL); if(topic == NULL){ KNI_LOG_ERROR(local_logger, "Failed at new kafka topic"); goto error_out; } handle->kafka_topic = topic; return handle; error_out: kni_send_logger_destroy(handle); return NULL; } int kni_send_logger_sendlog(kni_send_logger *handle, char *log_msg, int log_msg_len){ if(handle->sendlog_switch == 0){ return 0; } void *logger = handle->local_logger; //kafka produce int kafka_status = rd_kafka_produce(handle->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, log_msg, log_msg_len, NULL, 0, NULL); if(kafka_status < 0){ KNI_LOG_ERROR(logger, "Kafka: Failed to produce, error is %s", rd_kafka_err2name(rd_kafka_last_error())); return -1; } return 0; }