This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/plugin/business/doh/src/logger.cpp
fengweihao d4dc6394ac TSG-12005 Proxy日志增加common_vsys_id字段
TSG-12080 中间证书缓存Kafka输出Json增加vsys_id字段
2022-09-23 15:34:50 +08:00

490 lines
22 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "logger.h"
/*
 * Pairs a JSON log key with the HTTP standard field whose value is written
 * under that key. doh_send_log() iterates over small tables of these entries
 * and reads each field with tfe_http_std_field_read().
 */
struct json_spec
{
const char *log_filed_name; /* JSON key used in the log record ("filed" is a long-standing misspelling of "field") */
enum tfe_http_std_field field_id; /* HTTP standard field to read for this key */
};
/*
 * Action codes for log records. A larger action value takes priority over a
 * smaller one. In this file only LG_ACTION_MANIPULATE is emitted (as the
 * "common_action" field in doh_send_log()); entries tagged N/A were already
 * marked that way by the original author.
 */
enum _log_action
{
LG_ACTION_NONE = 0x00,
LG_ACTION_MONIT = 0x01,
LG_ACTION_FORWARD = 0x02, /* N/A */
LG_ACTION_REJECT = 0x10,
LG_ACTION_DROP = 0x20, /* N/A */
LG_ACTION_MANIPULATE = 0x30,
LG_ACTION_RATELIMIT = 0x40, /* N/A */
LG_ACTION_LOOP = 0x60, /* N/A */
LG_ACTION_WHITELIST = 0x80,
__LG_ACTION_MAX /* sentinel: implicitly LG_ACTION_WHITELIST + 1 (0x81) */
};
/*
 * Serialise every resource record in dns_info into a JSON array and attach
 * that array to `object` under the key "rr".
 *
 * object   - JSON object that receives the "rr" array (takes ownership of it).
 * dns_info - parsed DNS message; rr_count entries of dns_info->rr are read.
 * dns_sec  - out: set to 2 as soon as a DS, RRSIG, NSEC, DNSKEY or NSEC3
 *            record is seen; otherwise left untouched.
 *
 * Returns 0 on success, -1 when an argument is NULL or the array cannot be
 * allocated.
 *
 * Records of type DNS_QTYPE_MAILB are skipped entirely (not emitted).
 */
static int get_rr_str2json(cJSON *object, dns_info_t *dns_info, int *dns_sec)
{
    int i = 0;
    char ip_str[128];
    dns_rr_t *dns_rr = NULL;
    cJSON *one_rr_object = NULL;
    cJSON *dns_rr_array = NULL;

    if (object == NULL || dns_info == NULL || dns_sec == NULL)
    {
        return -1;
    }

    dns_rr_array = cJSON_CreateArray();
    if (dns_rr_array == NULL)
    {
        return -1;
    }

    /* Same rr != NULL guard as the CNAME loop in add_dns_info_to_log(). */
    for (i = 0; i < dns_info->rr_count && dns_info->rr != NULL; i++)
    {
        one_rr_object = cJSON_CreateObject();
        if (one_rr_object == NULL)
        {
            continue;
        }
        dns_rr = &(dns_info->rr[i]);

        /* Never let a previous record's address (or stack garbage) leak into this one. */
        ip_str[0] = '\0';

        cJSON_AddStringToObject(one_rr_object, "name", (const char *)(dns_rr->name));
        cJSON_AddNumberToObject(one_rr_object, "type", dns_rr->type);
        if (dns_rr->type == DNS_TYPE_OPT)
        {
            /*
             * For OPT pseudo-records the class carries the UDP payload size and
             * the 32-bit TTL is split into: extended rcode (top 8 bits),
             * version (next 8 bits) and the flags/Z word (low 16 bits).
             */
            cJSON_AddNumberToObject(one_rr_object, "udp_payload", dns_rr->rr_class);
            cJSON_AddNumberToObject(one_rr_object, "rcode", (int)(dns_rr->ttl >> 24));
            cJSON_AddNumberToObject(one_rr_object, "version", (int)((dns_rr->ttl >> 16) & 0xFF));
            /* Bitwise mask; the previous logical && collapsed this to 0 or 1. */
            cJSON_AddNumberToObject(one_rr_object, "Z", (int)(dns_rr->ttl & 0xFFFF));
        }
        else
        {
            cJSON_AddNumberToObject(one_rr_object, "class", dns_rr->rr_class);
            cJSON_AddNumberToObject(one_rr_object, "ttl", dns_rr->ttl);
        }
        cJSON_AddNumberToObject(one_rr_object, "rdlength", dns_rr->rdlength);

        /* No rdata parsed: emit the header fields only. */
        if (dns_rr->rdata.a == NULL)
        {
            cJSON_AddItemToArray(dns_rr_array, one_rr_object);
            continue;
        }

        switch (dns_rr->type)
        {
        case DNS_TYPE_A:
            if (inet_ntop(AF_INET, (void *)(dns_rr->rdata.a), ip_str, sizeof(ip_str)) != NULL)
            {
                cJSON_AddStringToObject(one_rr_object, "a", ip_str);
            }
            break;
        case DNS_TYPE_NS:
            cJSON_AddStringToObject(one_rr_object, "ns", (const char *)(dns_rr->rdata.ns));
            break;
        case DNS_TYPE_MD:
            cJSON_AddStringToObject(one_rr_object, "md", (const char *)(dns_rr->rdata.md));
            break;
        case DNS_TYPE_MF:
            cJSON_AddStringToObject(one_rr_object, "mf", (const char *)(dns_rr->rdata.mf));
            break;
        case DNS_TYPE_CNAME:
            cJSON_AddStringToObject(one_rr_object, "cname", (const char *)(dns_rr->rdata.cname));
            break;
        case DNS_TYPE_SOA:
            cJSON_AddStringToObject(one_rr_object, "mname", (const char *)(dns_rr->rdata.soa.mname));
            cJSON_AddStringToObject(one_rr_object, "rname", (const char *)(dns_rr->rdata.soa.rname));
            cJSON_AddNumberToObject(one_rr_object, "serial", dns_rr->rdata.soa.serial);
            cJSON_AddNumberToObject(one_rr_object, "refresh", dns_rr->rdata.soa.refresh);
            cJSON_AddNumberToObject(one_rr_object, "retry", dns_rr->rdata.soa.retry);
            /* Was mistakenly written under the key "cname". */
            cJSON_AddNumberToObject(one_rr_object, "expire", dns_rr->rdata.soa.expire);
            cJSON_AddNumberToObject(one_rr_object, "minimum", dns_rr->rdata.soa.minimum);
            break;
        case DNS_TYPE_MB:
            cJSON_AddStringToObject(one_rr_object, "mb", (const char *)(dns_rr->rdata.mb));
            break;
        case DNS_TYPE_MG:
            cJSON_AddStringToObject(one_rr_object, "mg", (const char *)(dns_rr->rdata.mg));
            break;
        case DNS_TYPE_MR:
            cJSON_AddStringToObject(one_rr_object, "mr", (const char *)(dns_rr->rdata.mr));
            break;
        case DNS_TYPE_NULL:
            cJSON_AddNumberToObject(one_rr_object, "size", dns_rr->rdata.null.size);
            cJSON_AddStringToObject(one_rr_object, "null", (const char *)(dns_rr->rdata.null.null));
            break;
        case DNS_TYPE_WKS:
            /*
             * NOTE(review): the WKS address is never formatted into ip_str, so
             * "addr" is emitted as an empty string instead of the uninitialised
             * or stale buffer it used to expose. Fill it in from the WKS rdata
             * once the address member of the wks struct is confirmed.
             */
            cJSON_AddStringToObject(one_rr_object, "addr", ip_str);
            cJSON_AddNumberToObject(one_rr_object, "protocol", dns_rr->rdata.wks.protocol);
            cJSON_AddStringToObject(one_rr_object, "bitmap", (const char *)(dns_rr->rdata.wks.bitmap));
            cJSON_AddNumberToObject(one_rr_object, "size", dns_rr->rdata.wks.size);
            break;
        case DNS_TYPE_PTR:
            cJSON_AddStringToObject(one_rr_object, "ptr", (const char *)(dns_rr->rdata.ptr));
            break;
        case DNS_TYPE_HINFO:
            cJSON_AddStringToObject(one_rr_object, "cpu", (const char *)(dns_rr->rdata.hinfo.cpu));
            cJSON_AddStringToObject(one_rr_object, "os", (const char *)(dns_rr->rdata.hinfo.os));
            break;
        case DNS_TYPE_MINFO:
            cJSON_AddStringToObject(one_rr_object, "rmailbx", (const char *)(dns_rr->rdata.minfo.rmailbx));
            cJSON_AddStringToObject(one_rr_object, "emailbx", (const char *)(dns_rr->rdata.minfo.emailbx));
            break;
        case DNS_TYPE_MX:
            cJSON_AddStringToObject(one_rr_object, "exchange", (const char *)(dns_rr->rdata.mx.exchange));
            cJSON_AddNumberToObject(one_rr_object, "preference", dns_rr->rdata.mx.preference);
            break;
        case DNS_TYPE_TXT:
            cJSON_AddStringToObject(one_rr_object, "txt", (char *)(dns_rr->rdata.txt.txt));
            cJSON_AddNumberToObject(one_rr_object, "size", dns_rr->rdata.txt.size);
            break;
        case DNS_TYPE_RP:
            cJSON_AddStringToObject(one_rr_object, "mailbox", (char *)(dns_rr->rdata.rp.mailbox));
            cJSON_AddStringToObject(one_rr_object, "txt_rr", (char *)(dns_rr->rdata.rp.txt_rr));
            break;
        case DNS_TYPE_AAAA:
            if (inet_ntop(AF_INET6, dns_rr->rdata.aaaa, ip_str, sizeof(ip_str)) != NULL)
            {
                cJSON_AddStringToObject(one_rr_object, "aaaa", ip_str);
            }
            break;
        case DNS_TYPE_OPT:
            /* OPT fields were already emitted from the record header above. */
            break;
        case DNS_TYPE_DS:
            *dns_sec = 2;
            cJSON_AddNumberToObject(one_rr_object, "key_tag", dns_rr->rdata.ds.key_tag);
            cJSON_AddNumberToObject(one_rr_object, "algo", dns_rr->rdata.ds.algo);
            cJSON_AddNumberToObject(one_rr_object, "digest_type", dns_rr->rdata.ds.digest_type);
            cJSON_AddStringToObject(one_rr_object, "digest", (char *)(dns_rr->rdata.ds.digest));
            break;
        case DNS_TYPE_RRSIG:
            *dns_sec = 2;
            cJSON_AddNumberToObject(one_rr_object, "type_covered", dns_rr->rdata.rrsig.type_covered);
            cJSON_AddNumberToObject(one_rr_object, "algo", dns_rr->rdata.rrsig.algo);
            cJSON_AddNumberToObject(one_rr_object, "labels", dns_rr->rdata.rrsig.labels);
            cJSON_AddNumberToObject(one_rr_object, "original_ttl", dns_rr->rdata.rrsig.original_ttl);
            cJSON_AddNumberToObject(one_rr_object, "sig_expiration", dns_rr->rdata.rrsig.sig_expiration);
            cJSON_AddNumberToObject(one_rr_object, "sig_inception", dns_rr->rdata.rrsig.sig_inception);
            cJSON_AddNumberToObject(one_rr_object, "key_tag", dns_rr->rdata.rrsig.key_tag);
            cJSON_AddStringToObject(one_rr_object, "signer_name", (const char *)(dns_rr->rdata.rrsig.signer_name));
            cJSON_AddStringToObject(one_rr_object, "signature", (char *)(dns_rr->rdata.rrsig.signature));
            break;
        case DNS_TYPE_NSEC:
            *dns_sec = 2;
            cJSON_AddStringToObject(one_rr_object, "next_domain", (const char *)(dns_rr->rdata.nsec.next_domain));
            cJSON_AddStringToObject(one_rr_object, "type_bit_maps", (char *)(dns_rr->rdata.nsec.type_bit_maps));
            break;
        case DNS_TYPE_DNSKEY:
            *dns_sec = 2;
            cJSON_AddNumberToObject(one_rr_object, "flags", dns_rr->rdata.dnskey.flags);
            cJSON_AddNumberToObject(one_rr_object, "protocol", dns_rr->rdata.dnskey.protocol);
            cJSON_AddNumberToObject(one_rr_object, "algo", dns_rr->rdata.dnskey.algo);
            cJSON_AddStringToObject(one_rr_object, "public_key", (char *)(dns_rr->rdata.dnskey.public_key));
            break;
        case DNS_TYPE_NSEC3:
            *dns_sec = 2;
            cJSON_AddNumberToObject(one_rr_object, "hash_algo", dns_rr->rdata.nsec3.hash_algo);
            cJSON_AddNumberToObject(one_rr_object, "flags", dns_rr->rdata.nsec3.flags);
            cJSON_AddNumberToObject(one_rr_object, "iteration", dns_rr->rdata.nsec3.iteration);
            cJSON_AddNumberToObject(one_rr_object, "salt_len", dns_rr->rdata.nsec3.salt_len);
            cJSON_AddNumberToObject(one_rr_object, "hash_len", dns_rr->rdata.nsec3.hash_len);
            cJSON_AddStringToObject(one_rr_object, "salt_value", (char *)(dns_rr->rdata.nsec3.salt_value));
            cJSON_AddStringToObject(one_rr_object, "next_hash_owner", (char *)(dns_rr->rdata.nsec3.next_hash_owner));
            cJSON_AddStringToObject(one_rr_object, "type_bit_maps", (char *)(dns_rr->rdata.nsec3.type_bit_maps));
            break;
        case DNS_TYPE_NSEC3PARAM:
            /* Unlike the other DNSSEC record types, this one does not touch *dns_sec. */
            cJSON_AddNumberToObject(one_rr_object, "hash_algo", dns_rr->rdata.nsec3param.hash_algo);
            cJSON_AddNumberToObject(one_rr_object, "flags", dns_rr->rdata.nsec3param.flags);
            cJSON_AddNumberToObject(one_rr_object, "iteration", dns_rr->rdata.nsec3param.iteration);
            cJSON_AddNumberToObject(one_rr_object, "salt_len", dns_rr->rdata.nsec3param.salt_len);
            cJSON_AddStringToObject(one_rr_object, "salt_value", (char *)(dns_rr->rdata.nsec3param.salt_value));
            break;
        case DNS_QTYPE_MAILB:
            /*
             * MAILB records are dropped from the output. The object was never
             * attached to the array, so it must be released here or it leaks.
             */
            cJSON_Delete(one_rr_object);
            one_rr_object = NULL;
            continue;
        case DNS_QTYPE_AXFR:
        case DNS_QTYPE_MAILA:
        case DNS_QTYPE_ANY:
        default:
            break;
        }

        cJSON_AddItemToArray(dns_rr_array, one_rr_object);
    }

    cJSON_AddItemToObject(object, "rr", dns_rr_array);
    return 0;
}
/*
 * Append the DNS-specific fields of a DoH transaction to common_obj.
 *
 * Writes the header flags/counters ("doh_qr" ... "doh_arcount"), the question
 * ("doh_qname", "doh_qtype", "doh_qclass", only when qname is non-empty), the
 * list of CNAME targets as a JSON-encoded string ("doh_cname"), every resource
 * record as a JSON-encoded string ("doh_rr") and finally "doh_sub", which is 1
 * unless get_rr_str2json() raised it to 2.
 *
 * Does nothing when either argument is NULL.
 */
static void add_dns_info_to_log(cJSON *common_obj, dns_info_t *dns_info)
{
    int dns_sec = 1;
    char *cname_text = NULL;
    char *rr_text = NULL;
    cJSON *cname_array = NULL;
    cJSON *rr_holder = NULL;

    if (common_obj == NULL || dns_info == NULL)
    {
        return;
    }

    /* DNS header. */
    cJSON_AddNumberToObject(common_obj, "doh_qr", dns_info->hdr_info.qr);
    cJSON_AddNumberToObject(common_obj, "doh_aa", dns_info->hdr_info.aa);
    cJSON_AddNumberToObject(common_obj, "doh_message_id", dns_info->hdr_info.id);
    cJSON_AddNumberToObject(common_obj, "doh_opcode", dns_info->hdr_info.opcode);
    cJSON_AddNumberToObject(common_obj, "doh_ra", dns_info->hdr_info.ra);
    cJSON_AddNumberToObject(common_obj, "doh_rcode", dns_info->hdr_info.rcode);
    cJSON_AddNumberToObject(common_obj, "doh_rd", dns_info->hdr_info.rd);
    cJSON_AddNumberToObject(common_obj, "doh_tc", dns_info->hdr_info.tc);
    cJSON_AddNumberToObject(common_obj, "doh_qdcount", dns_info->hdr_info.qdcount);
    cJSON_AddNumberToObject(common_obj, "doh_ancount", dns_info->hdr_info.ancount);
    cJSON_AddNumberToObject(common_obj, "doh_nscount", dns_info->hdr_info.aucount);
    cJSON_AddNumberToObject(common_obj, "doh_arcount", dns_info->hdr_info.adcount);

    /* Question section, only when a name was parsed. */
    if ((strlen((char *)dns_info->query_question.qname)) > 0)
    {
        cJSON_AddStringToObject(common_obj, "doh_qname", (char *)dns_info->query_question.qname);
        cJSON_AddNumberToObject(common_obj, "doh_qtype", dns_info->query_question.qtype);
        cJSON_AddNumberToObject(common_obj, "doh_qclass", dns_info->query_question.qclass);
    }

    /* Collect every CNAME target; the list is logged as serialised JSON text. */
    cname_array = cJSON_CreateArray();
    if (cname_array != NULL)
    {
        for (int i = 0; i < dns_info->rr_count && dns_info->rr != NULL; i++)
        {
            const dns_rr_t *rr = &dns_info->rr[i];
            if (rr->type != DNS_TYPE_CNAME || rr->rdata.cname == NULL)
            {
                continue;
            }
            cJSON_AddItemToArray(cname_array, cJSON_CreateString((const char *)rr->rdata.cname));
        }

        /* An empty list serialises to "[]", so the text is never zero-length. */
        cname_text = cJSON_PrintUnformatted(cname_array);
        if (cname_text != NULL)
        {
            cJSON_AddStringToObject(common_obj, "doh_cname", cname_text);
            free(cname_text);
            cname_text = NULL;
        }
        cJSON_Delete(cname_array);
        cname_array = NULL;
    }

    /* Full resource-record dump, also logged as serialised JSON text. */
    rr_holder = cJSON_CreateObject();
    get_rr_str2json(rr_holder, dns_info, &dns_sec);
    rr_text = cJSON_PrintUnformatted(rr_holder);
    cJSON_AddStringToObject(common_obj, "doh_rr", rr_text);
    free(rr_text);
    rr_text = NULL;
    cJSON_Delete(rr_holder);
    rr_holder = NULL;

    cJSON_AddNumberToObject(common_obj, "doh_sub", dns_sec);
}
/*
 * Load the DoH logging settings from the "kafka" section of `profile` and bind
 * the shared device/Kafka resources into `conf`.
 *
 * Reads ENTRANCE_ID (default 0) into conf->entry_id and en_sendlog (default 1)
 * into conf->en_sendlog. When en_sendlog is 0 nothing else is touched.
 *
 * Returns 0 on success (including "logging disabled"), -1 when logging is
 * enabled but the shared Kafka logger is missing or disabled. A missing logger
 * has to be rejected here because doh_send_log() dereferences
 * conf->kafka_logger unconditionally once en_sendlog is set.
 */
int doh_kafka_init(const char *profile, struct doh_conf *conf)
{
    const char *section = "kafka";

    MESA_load_profile_int_def(profile, section, "ENTRANCE_ID", &(conf->entry_id), 0);
    MESA_load_profile_int_def(profile, section, "en_sendlog", &conf->en_sendlog, 1);
    if (!conf->en_sendlog)
    {
        return 0;
    }

    conf->device_id = (const char *)tfe_bussiness_resouce_get(DEVICE_ID);
    conf->effective_device_tag = (const char *)tfe_bussiness_resouce_get(EFFECTIVE_DEVICE_TAG);
    conf->kafka_logger = (tfe_kafka_logger_t *)tfe_bussiness_resouce_get(KAFKA_LOGGER);

    if (conf->kafka_logger == NULL)
    {
        TFE_LOG_ERROR(conf->local_logger, "Doh sendlog ENABLE, but tfe kafka logger is not available.");
        return -1;
    }
    if (!conf->kafka_logger->enable)
    {
        TFE_LOG_ERROR(conf->local_logger, "Doh sendlog ENABLE, but tfe kafka logger DISABLED.");
        return -1;
    }
    return 0;
}
/*
 * Build one JSON log record per matched policy that requests logging and send
 * it through the Kafka logger.
 *
 * handle - DoH configuration (en_sendlog, kafka_logger, device info).
 * http   - HTTP session carrying the DoH exchange; http->req / http->resp may
 *          be NULL, in which case the fields derived from them are omitted.
 * stream - underlying stream, used for addresses, byte counters and cmsg.
 * ctx    - per-session DoH context: matched rules, parsed DNS request and
 *          optional location/ASN strings.
 *
 * Returns the number of records successfully handed to Kafka. Returns 0 when
 * logging is disabled, when no Kafka logger is bound, or when the shared JSON
 * object cannot be allocated.
 */
int doh_send_log(struct doh_conf *handle, const struct tfe_http_session *http, const struct tfe_stream *stream, struct doh_ctx *ctx)
{
    Maat_rule_t *result = ctx->result;
    size_t result_num = ctx->result_num;
    dns_info_t *dns_info = ctx->doh_req;
    const struct tfe_stream_addr *addr = stream->addr;
    const char *tmp_val = NULL;
    cJSON *common_obj = NULL, *per_hit_obj = NULL;
    char *log_payload = NULL;
    int kafka_status = 0;
    int send_cnt = 0;
    time_t cur_time;
    char src_ip_str[MAX(INET6_ADDRSTRLEN, INET_ADDRSTRLEN)] = {0};
    char dst_ip_str[MAX(INET6_ADDRSTRLEN, INET_ADDRSTRLEN)] = {0};
    const char *app_proto[] = {"unkonw", "http1", "http2"};
    const size_t app_proto_num = sizeof(app_proto) / sizeof(app_proto[0]);
    size_t proto_idx = 0;
    struct json_spec req_fields[] = {{"doh_cookie", TFE_HTTP_COOKIE},
                                     {"doh_referer", TFE_HTTP_REFERER},
                                     {"doh_user_agent", TFE_HTTP_USER_AGENT}};
    struct json_spec resp_fields[] = {{"doh_content_type", TFE_HTTP_CONT_TYPE},
                                      {"doh_content_length", TFE_HTTP_CONT_LENGTH},
                                      {"doh_set_cookie", TFE_HTTP_SET_COOKIE}};

    /* The Kafka logger is dereferenced below, so a missing one means "cannot log". */
    if (!handle->en_sendlog || handle->kafka_logger == NULL)
    {
        return 0;
    }

    common_obj = cJSON_CreateObject();
    if (common_obj == NULL)
    {
        return 0;
    }

    cur_time = time(NULL);
    cJSON_AddNumberToObject(common_obj, "common_start_time", cur_time);
    cJSON_AddNumberToObject(common_obj, "common_end_time", cur_time);

    /* Any major version without a table entry is reported with the entry at index 0. */
    if (http->major_version > 0 && (size_t)http->major_version < app_proto_num)
    {
        proto_idx = (size_t)http->major_version;
    }
    cJSON_AddStringToObject(common_obj, "doh_version", app_proto[proto_idx]);
    cJSON_AddStringToObject(common_obj, "common_schema_type", "DoH");

    char opt_val[24] = { 0 };
    uint16_t opt_out_size = 0;
    unsigned int common_direction = 0;
    struct tfe_cmsg *cmsg = tfe_stream_get0_cmsg(stream);
    if (cmsg != NULL)
    {
        int ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_STREAM_TRACE_ID, (unsigned char *)opt_val, sizeof(opt_val), &opt_out_size);
        if (ret == 0)
        {
            /* The value is used as a C string; make sure it is terminated. */
            opt_val[sizeof(opt_val) - 1] = '\0';
            cJSON_AddStringToObject(common_obj, "common_stream_trace_id", opt_val);
        }
        ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_COMMON_DIRECTION, (unsigned char *)&common_direction, sizeof(common_direction), &opt_out_size);
        if (ret == 0)
        {
            /* 0: internal -> external, 1: external -> internal; describes the CLIENT_IP side. */
            cJSON_AddNumberToObject(common_obj, "common_direction", common_direction);
        }
    }

    if (http->req)
    {
        char *request_line = NULL;
        const struct tfe_http_req_spec *req_spec = &http->req->req_spec;
        /* On failure asprintf leaves the pointer indeterminate, so only use it on success. */
        if (asprintf(&request_line, "%s %s HTTP/%d.%d", http_std_method_to_string(req_spec->method), req_spec->url, http->major_version, http->minor_version) >= 0)
        {
            cJSON_AddStringToObject(common_obj, "doh_request_line", request_line);
            free(request_line);
        }
    }
    if (http->resp)
    {
        char *response_line = NULL;
        const struct tfe_http_resp_spec *resp_spec = &http->resp->resp_spec;
        /* NOTE(review): the reason phrase is always "OK", whatever resp_code is. */
        if (asprintf(&response_line, "HTTP/%d.%d %d OK", http->major_version, http->minor_version, resp_spec->resp_code) >= 0)
        {
            cJSON_AddStringToObject(common_obj, "doh_response_line", response_line);
            free(response_line);
        }
    }

    switch (addr->addrtype)
    {
    case TFE_ADDR_STREAM_TUPLE4_V4:
        cJSON_AddNumberToObject(common_obj, "common_address_type", 4);
        inet_ntop(AF_INET, &addr->tuple4_v4->saddr, src_ip_str, sizeof(src_ip_str));
        inet_ntop(AF_INET, &addr->tuple4_v4->daddr, dst_ip_str, sizeof(dst_ip_str));
        cJSON_AddStringToObject(common_obj, "common_client_ip", src_ip_str);
        cJSON_AddStringToObject(common_obj, "common_server_ip", dst_ip_str);
        cJSON_AddNumberToObject(common_obj, "common_client_port", ntohs(addr->tuple4_v4->source));
        cJSON_AddNumberToObject(common_obj, "common_server_port", ntohs(addr->tuple4_v4->dest));
        cJSON_AddStringToObject(common_obj, "common_l4_protocol", "IPv4_TCP");
        break;
    case TFE_ADDR_STREAM_TUPLE4_V6:
        cJSON_AddNumberToObject(common_obj, "common_address_type", 6);
        inet_ntop(AF_INET6, &addr->tuple4_v6->saddr, src_ip_str, sizeof(src_ip_str));
        inet_ntop(AF_INET6, &addr->tuple4_v6->daddr, dst_ip_str, sizeof(dst_ip_str));
        cJSON_AddStringToObject(common_obj, "common_client_ip", src_ip_str);
        cJSON_AddStringToObject(common_obj, "common_server_ip", dst_ip_str);
        cJSON_AddNumberToObject(common_obj, "common_client_port", ntohs(addr->tuple4_v6->source));
        cJSON_AddNumberToObject(common_obj, "common_server_port", ntohs(addr->tuple4_v6->dest));
        cJSON_AddStringToObject(common_obj, "common_l4_protocol", "IPv6_TCP");
        break;
    default:
        break;
    }

    size_t c2s_byte_num = 0, s2c_byte_num = 0;
    tfe_stream_info_get(stream, INFO_FROM_DOWNSTREAM_RX_OFFSET, &c2s_byte_num, sizeof(c2s_byte_num));
    tfe_stream_info_get(stream, INFO_FROM_UPSTREAM_RX_OFFSET, &s2c_byte_num, sizeof(s2c_byte_num));

    cJSON_AddNumberToObject(common_obj, "common_link_id", 0);
    cJSON_AddNumberToObject(common_obj, "common_stream_dir", 3); //1:c2s, 2:s2c, 3:double
    cJSON_AddStringToObject(common_obj, "common_sled_ip", handle->kafka_logger->local_ip_str);
    cJSON_AddNumberToObject(common_obj, "common_vsys_id", handle->kafka_logger->vsys_id);
    cJSON_AddNumberToObject(common_obj, "common_entrance_id", handle->entry_id);
    cJSON_AddStringToObject(common_obj, "common_device_id", handle->device_id);
    cJSON_AddNumberToObject(common_obj, "common_c2s_byte_num", c2s_byte_num);
    cJSON_AddNumberToObject(common_obj, "common_s2c_byte_num", s2c_byte_num);
    /* Same guard as for the request line above: the request half may be absent. */
    if (http->req != NULL)
    {
        cJSON_AddStringToObject(common_obj, "doh_url", http->req->req_spec.url);
        cJSON_AddStringToObject(common_obj, "doh_host", http->req->req_spec.host);
    }
    if (handle->effective_device_tag)
    {
        cJSON_AddStringToObject(common_obj, "common_device_tag", handle->effective_device_tag);
    }

    for (size_t i = 0; i < sizeof(req_fields) / sizeof(struct json_spec) && http->req != NULL; i++)
    {
        tmp_val = tfe_http_std_field_read(http->req, req_fields[i].field_id);
        if (tmp_val != NULL)
        {
            cJSON_AddStringToObject(common_obj, req_fields[i].log_filed_name, tmp_val);
        }
    }
    for (size_t i = 0; i < sizeof(resp_fields) / sizeof(struct json_spec) && http->resp != NULL; i++)
    {
        tmp_val = tfe_http_std_field_read(http->resp, resp_fields[i].field_id);
        if (tmp_val != NULL)
        {
            cJSON_AddStringToObject(common_obj, resp_fields[i].log_filed_name, tmp_val);
        }
    }

    if (ctx->location_client)
    {
        cJSON_AddStringToObject(common_obj, "common_client_location", ctx->location_client);
    }
    if (ctx->location_server)
    {
        cJSON_AddStringToObject(common_obj, "common_server_location", ctx->location_server);
    }
    if (ctx->asn_client)
    {
        cJSON_AddStringToObject(common_obj, "common_client_asn", ctx->asn_client);
    }
    if (ctx->asn_server)
    {
        cJSON_AddStringToObject(common_obj, "common_server_asn", ctx->asn_server);
    }

    add_dns_info_to_log(common_obj, dns_info);

    /* One record per matched rule: the shared fields plus the rule's own identifiers. */
    for (size_t i = 0; i < result_num; i++)
    {
        TFE_LOG_DEBUG(handle->local_logger, "URL: %s, policy_id: %d, service: %d, do_log:%d",
                      (http->req != NULL && http->req->req_spec.url != NULL) ? http->req->req_spec.url : "(null)",
                      result[i].config_id,
                      result[i].service_id,
                      result[i].do_log);
        if (result[i].do_log == 0)
        {
            continue;
        }

        per_hit_obj = cJSON_Duplicate(common_obj, 1);
        cJSON_AddNumberToObject(per_hit_obj, "common_policy_id", result[i].config_id);
        cJSON_AddNumberToObject(per_hit_obj, "common_service", result[i].service_id);
        cJSON_AddNumberToObject(per_hit_obj, "common_action", LG_ACTION_MANIPULATE);
        cJSON_AddStringToObject(per_hit_obj, "common_sub_action", "redirect");

        log_payload = cJSON_PrintUnformatted(per_hit_obj);
        cJSON_Delete(per_hit_obj);
        per_hit_obj = NULL;
        if (log_payload == NULL)
        {
            /* Duplication or serialisation failed; strlen(NULL) below would crash. */
            TFE_LOG_ERROR(handle->local_logger, "Doh log serialization failed, policy_id: %d", result[i].config_id);
            continue;
        }

        TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload);
        kafka_status = tfe_kafka_logger_send(handle->kafka_logger, log_payload, strlen(log_payload));
        free(log_payload);
        log_payload = NULL;

        if (kafka_status < 0)
        {
            TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error()));
        }
        else
        {
            send_cnt++;
        }
    }

    cJSON_Delete(common_obj);
    return send_cnt;
}