tango-tfe/plugin/business/pangu-http/src/pangu_logger.cpp

#include <cjson/cJSON.h>
#include <librdkafka/rdkafka.h>
#include <MESA/MESA_handle_logger.h>
#include <MESA/MESA_prof_load.h>
#include <assert.h>
#include <arpa/inet.h>
#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <net/if.h>
#include <pthread.h>
#include <errno.h>
#include <tfe_utils.h>
#include "pangu_logger.h"

struct json_spec
{
    const char *log_filed_name;
    enum tfe_http_std_field field_id;
};

struct pangu_logger
{
    char local_ip_str[TFE_SYMBOL_MAX];
    int entry_id;
    unsigned int local_ip_nr;
    void *global_logger;
    rd_kafka_t *kafka_handle;
    rd_kafka_topic_t *kafka_topic;
    pthread_mutex_t mutex;
    char brokerlist[TFE_STRING_MAX];
    const char *topic_name;
    void *local_logger;
    unsigned long long send_cnt;
    unsigned long long random_drop;
    unsigned long long user_abort;
    char local_log_path[TFE_STRING_MAX];
};

static unsigned int get_ip_by_eth_name(const char *ifname)
{
    int sockfd;
    struct ifreq ifr;
    unsigned int ip;

    sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (-1 == sockfd)
    {
        return INADDR_NONE;
    }
    memset(&ifr, 0, sizeof(ifr));
    strncpy(ifr.ifr_name, ifname, IFNAMSIZ - 1);
    if (ioctl(sockfd, SIOCGIFADDR, &ifr) < 0)
    {
        goto error;
    }
    ip = ((struct sockaddr_in *)&(ifr.ifr_addr))->sin_addr.s_addr;
    close(sockfd);
    return ip;
error:
    close(sockfd);
    return INADDR_NONE;
}

static rd_kafka_t *create_kafka_handle(const char *brokerlist)
{
    char kafka_errstr[1024];
    rd_kafka_t *handle = NULL;
    rd_kafka_conf_t *rdkafka_conf = NULL;

    rdkafka_conf = rd_kafka_conf_new();
    rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
    rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr));
    rd_kafka_conf_set(rdkafka_conf, "security.protocol", "MG", kafka_errstr, sizeof(kafka_errstr));
    /* The conf object is freed by rd_kafka_new() and must not be used or destroyed by the application subsequently. */
    handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr));
    rdkafka_conf = NULL;
    if (handle == NULL)
    {
        return NULL;
    }
    if (rd_kafka_brokers_add(handle, brokerlist) == 0)
    {
        rd_kafka_destroy(handle);
        return NULL;
    }
    return handle;
}

struct pangu_logger *pangu_log_handle_create(const char *profile, const char *section, void *local_logger)
{
    int ret = -1;
    char nic_name[64] = {0};
    struct pangu_logger *instance = ALLOC(struct pangu_logger, 1);

    instance->local_logger = local_logger;
    TFE_LOG_ERROR(local_logger, "Pangu log is initiating from %s section %s.", profile, section);

    MESA_load_profile_string_def(profile, section, "NIC_NAME", nic_name, sizeof(nic_name), "eth0");
    instance->local_ip_nr = get_ip_by_eth_name(nic_name);
    if (instance->local_ip_nr == INADDR_NONE)
    {
        TFE_LOG_ERROR(local_logger, "%s get NIC_NAME: %s error.", __FUNCTION__, nic_name);
        goto error_out;
    }
    inet_ntop(AF_INET, &(instance->local_ip_nr), instance->local_ip_str, sizeof(instance->local_ip_str));

    MESA_load_profile_int_def(profile, section, "ENTRANCE_ID", &(instance->entry_id), 0);
    ret = MESA_load_profile_string_def(profile, section, "KAFKA_BROKERLIST", instance->brokerlist, sizeof(instance->brokerlist), NULL);
    if (ret < 0)
    {
        TFE_LOG_ERROR(local_logger, "Pangu log init failed, no brokerlist in profile %s section %s.", profile, section);
        goto error_out;
    }

    instance->kafka_handle = create_kafka_handle(instance->brokerlist);
    if (instance->kafka_handle == NULL)
    {
        TFE_LOG_ERROR(local_logger, "Pangu log init failed. Cannot create kafka handle with brokerlist: %s.", instance->brokerlist);
        goto error_out;
    }
    instance->topic_name = "PXY-HTTP-LOG";
    instance->kafka_topic = rd_kafka_topic_new(instance->kafka_handle, instance->topic_name, NULL);
    pthread_mutex_init(&(instance->mutex), NULL);
    return instance;

error_out:
    free(instance);
    return NULL;
}

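/*
 * Illustrative profile section consumed by pangu_log_handle_create().
 * The keys (NIC_NAME, ENTRANCE_ID, KAFKA_BROKERLIST) come from the loader
 * calls above; the INI-style layout, the section name, and the concrete
 * values are assumptions for illustration, not taken from a real deployment:
 *
 *   [PANGU_HTTP_LOG]
 *   NIC_NAME = eth0
 *   ENTRANCE_ID = 1
 *   KAFKA_BROKERLIST = 10.0.0.1:9092,10.0.0.2:9092
 */
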
int pangu_send_log(struct pangu_logger *handle, const struct pangu_log *log_msg)
{
    const struct tfe_http_session *http = log_msg->http;
    const struct tfe_stream_addr *addr = log_msg->stream->addr;
    const char *tmp_val = NULL;
    cJSON *common_obj = NULL, *per_hit_obj = NULL;
    char *log_payload = NULL;
    int kafka_status = 0;
    int send_cnt = 0;
    time_t cur_time;
    char src_ip_str[MAX(INET6_ADDRSTRLEN, INET_ADDRSTRLEN)] = {0};
    char dst_ip_str[MAX(INET6_ADDRSTRLEN, INET_ADDRSTRLEN)] = {0};
    struct json_spec req_fields[] = { {"cookie", TFE_HTTP_COOKIE},
                                      {"referer", TFE_HTTP_REFERER},
                                      {"user_agent", TFE_HTTP_USER_AGENT} };
    struct json_spec resp_fields[] = { {"content_type", TFE_HTTP_CONT_TYPE},
                                       {"content_len", TFE_HTTP_CONT_LENGTH} };

    /* Fields shared by every hit of this HTTP session. */
    common_obj = cJSON_CreateObject();
    cur_time = time(NULL);
    cJSON_AddNumberToObject(common_obj, "found_time", cur_time);
    cJSON_AddNumberToObject(common_obj, "recv_time", cur_time);

    switch (addr->addrtype)
    {
        case TFE_ADDR_STREAM_TUPLE4_V4:
            cJSON_AddNumberToObject(common_obj, "addr_type", 4);
            inet_ntop(AF_INET, &addr->tuple4_v4->saddr, src_ip_str, sizeof(src_ip_str));
            inet_ntop(AF_INET, &addr->tuple4_v4->daddr, dst_ip_str, sizeof(dst_ip_str));
            cJSON_AddStringToObject(common_obj, "s_ip", src_ip_str);
            cJSON_AddStringToObject(common_obj, "d_ip", dst_ip_str);
            cJSON_AddNumberToObject(common_obj, "s_port", ntohs(addr->tuple4_v4->source));
            cJSON_AddNumberToObject(common_obj, "d_port", ntohs(addr->tuple4_v4->dest));
            cJSON_AddStringToObject(common_obj, "trans_proto", "IPv4_TCP");
            break;
        case TFE_ADDR_STREAM_TUPLE4_V6:
            cJSON_AddNumberToObject(common_obj, "addr_type", 6);
            inet_ntop(AF_INET6, &addr->tuple4_v6->saddr, src_ip_str, sizeof(src_ip_str));
            inet_ntop(AF_INET6, &addr->tuple4_v6->daddr, dst_ip_str, sizeof(dst_ip_str));
            cJSON_AddStringToObject(common_obj, "s_ip", src_ip_str);
            cJSON_AddStringToObject(common_obj, "d_ip", dst_ip_str);
            cJSON_AddNumberToObject(common_obj, "s_port", ntohs(addr->tuple4_v6->source));
            cJSON_AddNumberToObject(common_obj, "d_port", ntohs(addr->tuple4_v6->dest));
            cJSON_AddStringToObject(common_obj, "trans_proto", "IPv6_TCP");
            break;
        default:
            break;
    }

    cJSON_AddNumberToObject(common_obj, "direction", 0);  /* 0: inside->outside, 1: outside->inside; describes the CLIENT_IP side */
    cJSON_AddNumberToObject(common_obj, "stream_dir", 3); /* 1: c2s, 2: s2c, 3: both directions */
    cJSON_AddStringToObject(common_obj, "cap_ip", handle->local_ip_str);
    cJSON_AddNumberToObject(common_obj, "entrance_id", handle->entry_id);
    cJSON_AddNumberToObject(common_obj, "device_id", 0);
    cJSON_AddStringToObject(common_obj, "user_region", "null");
    cJSON_AddStringToObject(common_obj, "url", http->req->req_spec.url);

    /* Optional request headers: added only when present. */
    for (size_t i = 0; i < sizeof(req_fields) / sizeof(struct json_spec); i++)
    {
        tmp_val = tfe_http_std_field_read(http->req, req_fields[i].field_id);
        if (tmp_val != NULL)
        {
            cJSON_AddStringToObject(common_obj, req_fields[i].log_filed_name, tmp_val);
        }
    }
    /* Optional response headers: added only when a response was seen. */
    for (size_t i = 0; i < sizeof(resp_fields) / sizeof(struct json_spec) && http->resp != NULL; i++)
    {
        tmp_val = tfe_http_std_field_read(http->resp, resp_fields[i].field_id);
        if (tmp_val != NULL)
        {
            cJSON_AddStringToObject(common_obj, resp_fields[i].log_filed_name, tmp_val);
        }
    }

    /* Emit one Kafka message per rule hit that requested logging. */
    for (size_t i = 0; i < log_msg->result_num; i++)
    {
        if (log_msg->result[i].do_log == 0)
        {
            continue;
        }
        per_hit_obj = cJSON_Duplicate(common_obj, 1);
        cJSON_AddNumberToObject(per_hit_obj, "cfg_id", log_msg->result[i].config_id);
        cJSON_AddNumberToObject(per_hit_obj, "service", log_msg->result[i].service_id);
        log_payload = cJSON_Print(per_hit_obj);
        fprintf(stderr, "%s\n", log_payload);
        kafka_status = rd_kafka_produce(handle->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                                        log_payload, strlen(log_payload), NULL, 0, NULL);
        free(log_payload);
        cJSON_Delete(per_hit_obj);
        if (kafka_status < 0)
        {
            TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error()));
        }
        send_cnt++;
    }
    cJSON_Delete(common_obj);
    return send_cnt;
}
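
/*
 * Minimal usage sketch (disabled; not part of the build). It only shows the
 * call order implied by this file: create a handle from a profile section,
 * then call pangu_send_log() once per finished HTTP session. The profile
 * path, the section name, and the assumption that the caller already has a
 * fully populated struct pangu_log are illustrative, not taken from
 * pangu_logger.h.
 */
#if 0
static void pangu_usage_sketch(void *local_logger, const struct pangu_log *log_msg)
{
    struct pangu_logger *logger =
        pangu_log_handle_create("/etc/tango-tfe/pangu.conf", "PANGU_HTTP_LOG", local_logger);
    if (logger == NULL)
    {
        return; /* broker list missing or NIC lookup failed */
    }
    /* log_msg is assumed to be a record prepared by the pangu-http plugin. */
    int sent = pangu_send_log(logger, log_msg);
    TFE_LOG_ERROR(local_logger, "pangu_send_log produced %d message(s).", sent);
}
#endif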