feature: TSG-21853 Refactoring TFE Kafka infrastructure

This commit is contained in:
luwenpeng
2024-07-19 18:20:04 +08:00
parent 88a7a8c5c4
commit 2045d517ca
25 changed files with 484 additions and 662 deletions

View File

@@ -1,12 +1,12 @@
#include <cjson/cJSON.h>
#include <MESA/MESA_prof_load.h>
#include <tfe_kafka_logger.h>
#include <tfe_utils.h>
#include <tfe_resource.h>
#include <event2/event.h>
#include <event.h>
#include "kafka.h"
#include "mpack.h"
#include "tsg_proxy_logger.h"
@@ -19,15 +19,12 @@ struct proxy_logger
{
int entry_id;
unsigned int en_sendlog;
const char *device_id;
const char *effective_device_tag;
void* local_logger;
unsigned long long send_cnt;
unsigned long long random_drop;
unsigned long long user_abort;
char local_log_path[TFE_STRING_MAX];
tfe_kafka_logger_t *kafka_logger;
struct cache_evbase_instance * log_file_upload_instance;
};
@@ -59,7 +56,6 @@ void get_http_body_uuid(char *uuid)
size_t file_bucket_upload_once(struct proxy_logger* handle, char *uuid, struct evbuffer *http_body)
{
int kafka_status=0;
mpack_writer_t writer;
char *mpack_data=NULL, *data=NULL;
size_t mpack_size=0, datalen=0;
@@ -94,11 +90,7 @@ size_t file_bucket_upload_once(struct proxy_logger* handle, char *uuid, struct e
{
TFE_LOG_ERROR(handle->local_logger, "Mpack writer destroy is error(%s), uuid: %s", mpack_error_to_string(errorno), uuid);
}
kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_BUCKET, mpack_data, mpack_size);
if(kafka_status<0)
{
TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error()));
}
kafka_send(tfe_get_kafka_handle(), TOPIC_FILE_STREAM, mpack_data, mpack_size);
free(mpack_data);
mpack_data = NULL;
@@ -112,29 +104,10 @@ struct proxy_logger* proxy_log_handle_create(const char* profile, const char* se
struct proxy_logger* instance=ALLOC(struct proxy_logger,1);
instance->local_logger=local_logger;
TFE_LOG_INFO(local_logger,"Tsg-Pxy log is inititating from %s section %s.", profile, section);
MESA_load_profile_uint_def(profile, section, "en_sendlog", &instance->en_sendlog, 1);
TFE_LOG_INFO(local_logger, "Tsg-Pxy sendlog : %s", instance->en_sendlog ? "ENABLE" : "DISABLE");
if (!instance->en_sendlog)
{
return instance;
}
instance->device_id = (const char *)tfe_bussiness_resouce_get(DEVICE_ID);
instance->effective_device_tag = (const char *)tfe_bussiness_resouce_get(EFFECTIVE_DEVICE_TAG);
instance->kafka_logger = (tfe_kafka_logger_t *)tfe_bussiness_resouce_get(KAFKA_LOGGER);
if (instance->kafka_logger && !instance->kafka_logger->enable)
{
TFE_LOG_ERROR(local_logger, "Tsg-Pxy sendlog ENABLE, but tfe kafka logger DISABLED.");
goto error_out;
}
return instance;
error_out:
free(instance);
return NULL;
}
static int get_ip_client_geolocation(struct tfe_cmsg * cmsg, cJSON *per_hit_obj)
@@ -193,7 +166,6 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
const char* tmp_val=NULL;
cJSON *common_obj=NULL, *per_hit_obj=NULL;
char* log_payload=NULL;
int kafka_status=0;
int send_cnt=0;
struct timeval cur_time;
char src_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0};
@@ -305,17 +277,17 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
cJSON_AddStringToObject(common_obj, "ip_protocol", "tcp");
cJSON_AddNumberToObject(common_obj, "out_link_id", 0);
cJSON_AddNumberToObject(common_obj, "in_link_id", 0);
cJSON_AddStringToObject(common_obj, "sled_ip", handle->kafka_logger->local_ip_str);
cJSON_AddNumberToObject(common_obj, "t_vsys_id", handle->kafka_logger->t_vsys_id);
cJSON_AddStringToObject(common_obj, "device_id", handle->device_id);
cJSON_AddStringToObject(common_obj, "sled_ip", tfe_get_sled_ip());
cJSON_AddNumberToObject(common_obj, "t_vsys_id", tfe_get_vsys_id());
cJSON_AddStringToObject(common_obj, "device_id", tfe_get_device_id());
cJSON_AddNumberToObject(common_obj, "sent_bytes", c2s_byte_num);
cJSON_AddNumberToObject(common_obj, "received_bytes", s2c_byte_num);
cJSON_AddStringToObject(common_obj, "http_url", http->req->req_spec.url);
proxy_add_host_to_object(common_obj, http->req->req_spec.host);
if(handle->effective_device_tag)
if (tfe_get_device_tag())
{
cJSON_AddStringToObject(common_obj, "device_tag", handle->effective_device_tag);
cJSON_AddStringToObject(common_obj, "device_tag", tfe_get_device_tag());
}
for(size_t i=0;i<sizeof(req_fields)/sizeof(struct json_spec);i++)
@@ -459,14 +431,12 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload);
kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_LOGGER, log_payload, strlen(log_payload));
if (kafka_send(tfe_get_kafka_handle(), TOPIC_PROXY_EVENT, log_payload, strlen(log_payload)) == 0)
{
send_cnt++;
}
free(log_payload);
cJSON_Delete(per_hit_obj);
if(kafka_status<0)
{
TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error()));
}
send_cnt++;
}
cJSON_Delete(common_obj);