2020-03-09 16:28:56 +08:00
|
|
|
#include <sys/ioctl.h>
|
|
|
|
|
#include <unistd.h>
|
|
|
|
|
#include <arpa/inet.h>
|
|
|
|
|
#include <net/if.h>
|
|
|
|
|
|
|
|
|
|
#include <tfe_kafka_logger.h>
|
|
|
|
|
|
|
|
|
|
/*
 * Look up the IPv4 address assigned to the network interface `ifname`.
 *
 * A temporary AF_INET datagram socket is opened and the address is queried
 * with the SIOCGIFADDR ioctl; the socket is closed before returning.
 *
 * Returns the address exactly as stored in sin_addr.s_addr, or INADDR_NONE
 * when `ifname` is NULL, does not fit into ifr_name, or the socket()/ioctl()
 * call fails.
 */
static unsigned int get_ip_by_eth_name(const char *ifname)
{
	int sockfd = -1;
	struct ifreq ifr;
	struct sockaddr_in addr;
	size_t name_len;

	if (ifname == NULL)
	{
		return INADDR_NONE;
	}

	// ifr_name is a fixed-size buffer: reject names that would not fit
	// (terminator included) instead of overflowing it.
	name_len = strlen(ifname);
	if (name_len >= sizeof(ifr.ifr_name))
	{
		return INADDR_NONE;
	}

	sockfd = socket(AF_INET, SOCK_DGRAM, 0);
	if (sockfd == -1)
	{
		return INADDR_NONE;
	}

	// Start from a fully zeroed request so no stack garbage is handed to the kernel.
	memset(&ifr, 0, sizeof(ifr));
	memcpy(ifr.ifr_name, ifname, name_len + 1);

	if (ioctl(sockfd, SIOCGIFADDR, &ifr) == -1)
	{
		// Any successfully opened descriptor (0 included) must be closed.
		close(sockfd);
		return INADDR_NONE;
	}
	close(sockfd);

	// Copy rather than cast so the generic sockaddr is never read through
	// a pointer of a different struct type.
	memcpy(&addr, &ifr.ifr_addr, sizeof(addr));
	return addr.sin_addr.s_addr;
}
|
|
|
|
|
|
2021-08-19 16:24:19 +08:00
|
|
|
static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_username, const char *sasl_passwd, void *local_logger)
|
2020-03-09 16:28:56 +08:00
|
|
|
{
|
|
|
|
|
int ret;
|
|
|
|
|
char kafka_errstr[1024] = {0};
|
|
|
|
|
rd_kafka_t *handle = NULL;
|
|
|
|
|
rd_kafka_conf_t *rconf = NULL;
|
|
|
|
|
|
|
|
|
|
rconf = rd_kafka_conf_new();
|
|
|
|
|
|
|
|
|
|
ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
|
|
|
|
|
if (ret != RD_KAFKA_CONF_OK)
|
|
|
|
|
{
|
|
|
|
|
TFE_LOG_ERROR(local_logger, "Error to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr);
|
|
|
|
|
rd_kafka_conf_destroy(rconf);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
ret = rd_kafka_conf_set(rconf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr));
|
|
|
|
|
if (ret != RD_KAFKA_CONF_OK)
|
|
|
|
|
{
|
|
|
|
|
TFE_LOG_ERROR(local_logger, "Error to set kafka \"topic.metadata.refresh.interval.ms\", %s.", kafka_errstr);
|
|
|
|
|
rd_kafka_conf_destroy(rconf);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
ret = rd_kafka_conf_set(rconf, "security.protocol", "plaintext", kafka_errstr, sizeof(kafka_errstr));
|
|
|
|
|
if (ret != RD_KAFKA_CONF_OK)
|
|
|
|
|
{
|
|
|
|
|
TFE_LOG_ERROR(local_logger, "Error to set kafka \"security.protocol\", %s.", kafka_errstr);
|
|
|
|
|
rd_kafka_conf_destroy(rconf);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
2021-08-19 16:24:19 +08:00
|
|
|
if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0)
|
|
|
|
|
{
|
|
|
|
|
rd_kafka_conf_set(rconf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
|
|
|
|
|
rd_kafka_conf_set(rconf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
|
|
|
|
|
ret = rd_kafka_conf_set(rconf, "sasl.username", sasl_username, kafka_errstr, sizeof(kafka_errstr));
|
|
|
|
|
if (ret != RD_KAFKA_CONF_OK)
|
|
|
|
|
{
|
|
|
|
|
TFE_LOG_ERROR(local_logger, "Error to set kafka \"sasl.username\", %s.", kafka_errstr);
|
|
|
|
|
rd_kafka_conf_destroy(rconf);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
ret = rd_kafka_conf_set(rconf, "sasl.password", sasl_passwd, kafka_errstr, sizeof(kafka_errstr));
|
|
|
|
|
if (ret != RD_KAFKA_CONF_OK)
|
|
|
|
|
{
|
|
|
|
|
TFE_LOG_ERROR(local_logger, "Error to set kafka \"sasl.password\", %s.", kafka_errstr);
|
|
|
|
|
rd_kafka_conf_destroy(rconf);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2020-03-09 16:28:56 +08:00
|
|
|
//The conf object is freed by this function and must not be used or destroyed by the application sub-sequently.
|
|
|
|
|
handle = rd_kafka_new(RD_KAFKA_PRODUCER, rconf, kafka_errstr, sizeof(kafka_errstr));
|
|
|
|
|
rconf = NULL;
|
|
|
|
|
if (handle == NULL)
|
|
|
|
|
{
|
|
|
|
|
TFE_LOG_ERROR(local_logger, "Error to new kafka, %s.", kafka_errstr);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (rd_kafka_brokers_add(handle, brokerlist) == 0)
|
|
|
|
|
{
|
|
|
|
|
TFE_LOG_ERROR(local_logger, "Error to add kakfa bokers.");
|
|
|
|
|
rd_kafka_destroy(handle);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return handle;
|
|
|
|
|
}
|
|
|
|
|
|
2021-08-19 16:24:19 +08:00
|
|
|
tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, const char *sasl_username, const char *sasl_passwd, void *local_logger)
|
2020-03-09 16:28:56 +08:00
|
|
|
{
|
|
|
|
|
tfe_kafka_logger_t *logger = (tfe_kafka_logger_t *)calloc(1, sizeof(tfe_kafka_logger_t));
|
|
|
|
|
if (!logger)
|
|
|
|
|
return NULL;
|
|
|
|
|
|
|
|
|
|
logger->enable = enable;
|
|
|
|
|
if (!logger->enable)
|
|
|
|
|
return logger;
|
|
|
|
|
|
|
|
|
|
logger->local_ip_num = get_ip_by_eth_name(nic_name);
|
|
|
|
|
if (logger->local_ip_num == INADDR_NONE)
|
|
|
|
|
{
|
|
|
|
|
TFE_LOG_ERROR(local_logger, "Error to get NIC_NAME: %s.", nic_name);
|
|
|
|
|
free(logger);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
inet_ntop(AF_INET, &(logger->local_ip_num), logger->local_ip_str, sizeof(logger->local_ip_str));
|
|
|
|
|
|
|
|
|
|
strncpy(logger->broker_list, brokerlist, strlen(brokerlist));
|
2021-08-19 16:24:19 +08:00
|
|
|
logger->kafka_handle = create_kafka_handle(logger->broker_list, sasl_username, sasl_passwd, local_logger);
|
2020-03-09 16:28:56 +08:00
|
|
|
if (logger->kafka_handle == NULL)
|
|
|
|
|
{
|
|
|
|
|
TFE_LOG_ERROR(local_logger, "Error to creat kafka handler with brokerlist: %s.", logger->broker_list);
|
|
|
|
|
free(logger);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
strncpy(logger->topic_name, topic_name, strlen(topic_name));
|
|
|
|
|
logger->kafka_topic = rd_kafka_topic_new(logger->kafka_handle, logger->topic_name, NULL);
|
|
|
|
|
if (logger->kafka_topic == NULL)
|
|
|
|
|
{
|
|
|
|
|
TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", logger->topic_name);
|
|
|
|
|
rd_kafka_destroy(logger->kafka_handle);
|
|
|
|
|
free(logger);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return logger;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
 * Release a logger returned by tfe_kafka_logger_create().
 *
 * Destroys the Kafka topic and handle when they are set, then frees the
 * logger itself. Passing NULL is a no-op. The caller's pointer is dangling
 * afterwards and must not be used again.
 */
void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger)
{
	if (logger == NULL)
	{
		return;
	}

	// The topic object was created from the handle, so it has to be
	// released before the handle itself is destroyed.
	if (logger->kafka_topic)
	{
		rd_kafka_topic_destroy(logger->kafka_topic);
		logger->kafka_topic = NULL;
	}

	if (logger->kafka_handle)
	{
		rd_kafka_destroy(logger->kafka_handle);
		logger->kafka_handle = NULL;
	}

	free(logger);
}
|
|
|
|
|
|
|
|
|
|
/*
 * Hand `len` bytes starting at `data` to the logger's Kafka topic.
 *
 * The message is produced with RD_KAFKA_PARTITION_UA and
 * RD_KAFKA_MSG_F_COPY, without a key or per-message opaque.
 *
 * Returns 0 without producing anything when `logger` is NULL or disabled;
 * otherwise returns whatever rd_kafka_produce() returns.
 */
int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, const char *data, int len)
{
	// Nothing to do for a missing or disabled logger.
	if (logger == NULL || !logger->enable)
	{
		return 0;
	}

	return rd_kafka_produce(logger->kafka_topic,
							RD_KAFKA_PARTITION_UA,
							RD_KAFKA_MSG_F_COPY,
							(void *)data, len,
							NULL, 0,
							NULL);
}
|