#include #include #include #include #include // return INADDR_NONE if error occur static unsigned int get_ip_by_eth_name(const char *ifname) { int sockfd = -1; struct ifreq ifr; unsigned int ip; sockfd = socket(AF_INET, SOCK_DGRAM, 0); if (-1 == sockfd) { goto error; } strcpy(ifr.ifr_name, ifname); if (ioctl(sockfd, SIOCGIFADDR, &ifr) == -1) { goto error; } close(sockfd); ip = ((struct sockaddr_in *)&(ifr.ifr_addr))->sin_addr.s_addr; return ip; error: if (sockfd > 0) close(sockfd); return INADDR_NONE; } static rd_kafka_t *create_kafka_handle(const char *brokerlist, void *local_logger) { int ret; char kafka_errstr[1024] = {0}; rd_kafka_t *handle = NULL; rd_kafka_conf_t *rconf = NULL; rconf = rd_kafka_conf_new(); ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); if (ret != RD_KAFKA_CONF_OK) { TFE_LOG_ERROR(local_logger, "Error to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr); rd_kafka_conf_destroy(rconf); return NULL; } ret = rd_kafka_conf_set(rconf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr)); if (ret != RD_KAFKA_CONF_OK) { TFE_LOG_ERROR(local_logger, "Error to set kafka \"topic.metadata.refresh.interval.ms\", %s.", kafka_errstr); rd_kafka_conf_destroy(rconf); return NULL; } ret = rd_kafka_conf_set(rconf, "security.protocol", "plaintext", kafka_errstr, sizeof(kafka_errstr)); if (ret != RD_KAFKA_CONF_OK) { TFE_LOG_ERROR(local_logger, "Error to set kafka \"security.protocol\", %s.", kafka_errstr); rd_kafka_conf_destroy(rconf); return NULL; } //The conf object is freed by this function and must not be used or destroyed by the application sub-sequently. handle = rd_kafka_new(RD_KAFKA_PRODUCER, rconf, kafka_errstr, sizeof(kafka_errstr)); rconf = NULL; if (handle == NULL) { TFE_LOG_ERROR(local_logger, "Error to new kafka, %s.", kafka_errstr); return NULL; } if (rd_kafka_brokers_add(handle, brokerlist) == 0) { TFE_LOG_ERROR(local_logger, "Error to add kakfa bokers."); rd_kafka_destroy(handle); return NULL; } return handle; } tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, void *local_logger) { tfe_kafka_logger_t *logger = (tfe_kafka_logger_t *)calloc(1, sizeof(tfe_kafka_logger_t)); if (!logger) return NULL; logger->enable = enable; if (!logger->enable) return logger; logger->local_ip_num = get_ip_by_eth_name(nic_name); if (logger->local_ip_num == INADDR_NONE) { TFE_LOG_ERROR(local_logger, "Error to get NIC_NAME: %s.", nic_name); free(logger); return NULL; } inet_ntop(AF_INET, &(logger->local_ip_num), logger->local_ip_str, sizeof(logger->local_ip_str)); strncpy(logger->broker_list, brokerlist, strlen(brokerlist)); logger->kafka_handle = create_kafka_handle(logger->broker_list, local_logger); if (logger->kafka_handle == NULL) { TFE_LOG_ERROR(local_logger, "Error to creat kafka handler with brokerlist: %s.", logger->broker_list); free(logger); return NULL; } strncpy(logger->topic_name, topic_name, strlen(topic_name)); logger->kafka_topic = rd_kafka_topic_new(logger->kafka_handle, logger->topic_name, NULL); if (logger->kafka_topic == NULL) { TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", logger->topic_name); rd_kafka_destroy(logger->kafka_handle); free(logger); return NULL; } return logger; } void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger) { if (logger) { if (logger->kafka_handle) rd_kafka_destroy(logger->kafka_handle); if (logger->kafka_topic) rd_kafka_topic_destroy(logger->kafka_topic); free(logger); logger = NULL; } } int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, const char *data, int len) { if (logger && logger->enable) return rd_kafka_produce(logger->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)data, len, NULL, 0, NULL); else return 0; }