日志接口调用总控提供的接口

This commit is contained in:
崔一鸣
2019-11-21 14:57:41 +08:00
parent 72e38dcdab
commit 08f6d8c428
6 changed files with 120 additions and 394 deletions

View File

@@ -36,20 +36,9 @@ keepalive_idle = 2
keepalive_intvl = 1
keepalive_cnt = 3
[send_logger]
switch = 0
kafka_topic = SESSION-RECORD-LOG
#kafka_brokerlist = 192.168.10.119:9092,192.168.10.122:9092,192.168.10.123:9092
kafka_brokerlist = 192.168.10.52:9092
[marsio]
appsym = knifw
[kafka]
queue.buffering.max.messages = 1000000
topic.metadata.refresh.interval.ms = 600000
security.protocol = MG
#128:bypass, 2: intercept
[dup_traffic]
switch = 1

View File

@@ -1,3 +1,3 @@
add_library(kni SHARED src/kni_entry.cpp src/kni_send_logger.cpp src/tfe_mgr.cpp src/kni_tun.cpp)
add_library(kni SHARED src/kni_entry.cpp src/tfe_mgr.cpp src/kni_tun.cpp)
target_include_directories(kni PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include)
target_link_libraries(kni common MESA_prof_load MESA_htable MESA_field_stat maatframe marsio uuid cjson rdkafka dabloom)

View File

@@ -1,5 +0,0 @@
#pragma once
struct kni_send_logger;
struct kni_send_logger* kni_send_logger_init(const char *profile, void *logger);
void kni_send_logger_destroy(struct kni_send_logger *handle);
int kni_send_logger_sendlog(kni_send_logger *handle, char *log_msg, int log_msg_len);

View File

@@ -0,0 +1,38 @@
#ifndef __TSG_SEND_LOG_H__
#define __TSG_SEND_LOG_H__
#include <MESA/Maat_rule.h>
typedef struct _tsg_log
{
int result_num;
Maat_rule_t *result;
struct streaminfo *a_stream;
}tsg_log_t;
typedef enum _tld_type
{
TLD_TYPE_UNKNOWN=0,
TLD_TYPE_LONG=1,
TLD_TYPE_STRING,
TLD_TYPE_FILE,
TLD_TYPE_MAX
}TLD_TYPE;
typedef void* TLD_handle_t;
typedef void* tsg_log_instance_t;
extern tsg_log_instance_t g_tsg_log_instance;
TLD_handle_t TLD_create(int thread_id);
int TLD_append(TLD_handle_t handle, char *key, void *value, TLD_TYPE type);
int TLD_cancel(TLD_handle_t handle);
int tsg_send_log(tsg_log_instance_t instance, TLD_handle_t handle, tsg_log_t *log_msg, int thread_id);
unsigned long long tsg_get_stream_id(struct streaminfo *a_stream);
#endif

View File

@@ -8,18 +8,17 @@ bypass: drome: pme_new_fail: destroy_pme
giveme: policy: destroy_pme + send_log
dup_traffic: destroy_pme + send_log
*/
#define __STDC_FORMAT_MACROS
#include "kni_utils.h"
#include "marsio.h"
#include "MESA/stream_inc/sapp_inject.h"
#include "kni_cmsg.h"
#include "uuid/uuid.h"
#include "cjson/cJSON.h"
#include "kni_send_logger.h"
#include <linux/if_ether.h>
#include <signal.h>
#include <inttypes.h>
#include "tfe_mgr.h"
#include "tsg_rule.h"
#include "tsg_send_log.h"
//#include "tsg_rule.h"
#ifdef __cplusplus
@@ -55,20 +54,12 @@ enum intercept_error{
/* action
0x00: none
0x01: monitor
0x02: intercept
0x10: reject
0x30: Manipulate
0x60: steer
0x80: bypass
*/
enum kni_action{
KNI_ACTION_NONE = 0x00,
KNI_ACTION_MONITOR = 0x01,
KNI_ACTION_INTERCEPT = 0x02,
KNI_ACTION_REJECT = 0x10,
KNI_ACTION_MANIPULATE = 0x30,
KNI_ACTION_STEER = 0x60,
KNI_ACTION_BYPASS = 0x80
};
@@ -102,7 +93,7 @@ struct pme_info{
int tfe_id;
pthread_mutex_t lock;
enum intercept_error intcp_error;
char stream_traceid[STREAM_TRACEID_LEN];
char stream_traceid[24];
//cjson check protocol
union{
char host[MAX_DOAMIN_LEN]; //http only
@@ -113,19 +104,12 @@ struct pme_info{
int tfe_release;
int sapp_release;
//kafka log
struct layer_addr *addr;
unsigned char dir;
uint64_t server_bytes;
uint64_t client_bytes;
uint64_t server_pkts;
uint64_t client_pkts;
struct timespec start_time;
struct timespec end_time;
uint64_t con_duration_ms;
const struct streaminfo *stream;
int maat_result_num;
Maat_rule_t maat_result;
//from tfe, kafka log
uint64_t intercept_state;
uint64_t pinningst; //defalut 0
uint64_t ssl_intercept_state;
uint64_t ssl_pinningst; //defalut 0
uint64_t ssl_server_side_latency;
uint64_t ssl_client_side_latency;
char ssl_server_side_version[KNI_SYMBOL_MAX];
@@ -134,7 +118,7 @@ struct pme_info{
char ssl_error[KNI_STRING_MAX];
//for dup traffic detect
int has_dup_traffic;
uint64_t has_dup_traffic;
int has_dup_syn;
int has_dup_syn_ack;
struct dup_traffic_dabloom_key *syn_packet;
@@ -195,7 +179,6 @@ struct kni_handle{
struct kni_send_logger *send_logger;
MESA_htable_handle traceid2pme_htable;
struct per_thread_handle *threads_handle;
uint32_t local_ipv4;
void *local_logger;
struct tfe_mgr *_tfe_mgr;
int thread_count;
@@ -269,9 +252,6 @@ static void pme_info_destroy(void *data){
struct pme_info *pmeinfo = (struct pme_info *)data;
void *logger = g_kni_handle->local_logger;
if(pmeinfo != NULL){
//free layer_addr
layer_addr_free(pmeinfo->addr);
pmeinfo->addr=NULL;
//free lock
pthread_mutex_destroy(&(pmeinfo->lock));
//free syn/syn_ack
@@ -289,15 +269,11 @@ static void pme_info_destroy(void *data){
static int pme_info_init(struct pme_info *pmeinfo, const struct streaminfo *stream, int thread_seq){
void *logger = g_kni_handle->local_logger;
pmeinfo->stream = stream;
pmeinfo->addr_type = (enum addr_type_t)stream->addr.addrtype;
pmeinfo->ssl_cert_verify = -1;
//uuid_t uu;
//uuid_generate_random(uu);
//uuid_unparse(uu, pmeinfo->stream_traceid);
clock_gettime(CLOCK_REALTIME, &(pmeinfo->start_time));
snprintf(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid), "%d%lld.%.9ld",
thread_seq, (long long)pmeinfo->start_time.tv_sec, pmeinfo->start_time.tv_nsec);
pmeinfo->addr = layer_addr_dup(&(stream->addr));
uint64_t traceid = tsg_get_stream_id((struct streaminfo*)stream);
snprintf(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid), "%" PRIu64 , traceid);
if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
kni_addr_trans_v6(stream->addr.tuple4_v6, pmeinfo->stream_addr, sizeof(pmeinfo->stream_addr));
}
@@ -313,156 +289,79 @@ static int pme_info_init(struct pme_info *pmeinfo, const struct streaminfo *stre
return 0;
}
static int log_generate(struct pme_info *pmeinfo, void *local_logger){
//create cjson
cJSON *log_obj = cJSON_CreateObject();
//stream_traceid
cJSON_AddStringToObject(log_obj, "stream_trace_id", pmeinfo->stream_traceid);
//policy_id
cJSON_AddNumberToObject(log_obj, "policy_id", pmeinfo->policy_id);
//action
cJSON_AddNumberToObject(log_obj, "action", pmeinfo->action);
//service
cJSON_AddNumberToObject(log_obj, "service", pmeinfo->service);
//start_time
cJSON_AddNumberToObject(log_obj, "start_time", pmeinfo->start_time.tv_sec);
if(pmeinfo->intcp_error >= 0){
//end_time
cJSON_AddNumberToObject(log_obj, "end_time", pmeinfo->end_time.tv_sec);
//con_duration_ms
cJSON_AddNumberToObject(log_obj, "con_duration_ms", (pmeinfo->end_time.tv_sec - pmeinfo->start_time.tv_sec) * 1000
+ (pmeinfo->end_time.tv_nsec - pmeinfo->start_time.tv_nsec) / 1000000);
}
//stream_info: addr_type, trans_proto, client_ip, client_port, server_ip, server_port
const struct layer_addr *addr = pmeinfo->addr;
char client_ip_str[INET6_ADDRSTRLEN] = "";
char server_ip_str[INET6_ADDRSTRLEN] = "";
switch(addr->addrtype){
case ADDR_TYPE_IPV4:
cJSON_AddNumberToObject(log_obj, "addr_type", 4);
inet_ntop(AF_INET, &(addr->tuple4_v4->saddr), client_ip_str, sizeof(client_ip_str));
inet_ntop(AF_INET, &(addr->tuple4_v4->daddr), server_ip_str, sizeof(server_ip_str));
cJSON_AddStringToObject(log_obj, "client_ip", client_ip_str);
cJSON_AddStringToObject(log_obj, "server_ip", server_ip_str);
cJSON_AddNumberToObject(log_obj, "client_port", ntohs(addr->tuple4_v4->source));
cJSON_AddNumberToObject(log_obj, "server_port", ntohs(addr->tuple4_v4->dest));
cJSON_AddStringToObject(log_obj, "trans_proto", "IPv4_TCP");
break;
case ADDR_TYPE_IPV6:
cJSON_AddNumberToObject(log_obj, "addr_type", 6);
inet_ntop(AF_INET6, addr->tuple4_v6->saddr, client_ip_str, sizeof(client_ip_str));
inet_ntop(AF_INET6, addr->tuple4_v6->daddr, server_ip_str, sizeof(server_ip_str));
cJSON_AddStringToObject(log_obj, "client_ip", client_ip_str);
cJSON_AddStringToObject(log_obj, "server_ip", server_ip_str);
cJSON_AddNumberToObject(log_obj, "client_port", ntohs(addr->tuple4_v6->source));
cJSON_AddNumberToObject(log_obj, "server_port", ntohs(addr->tuple4_v6->dest));
cJSON_AddStringToObject(log_obj, "trans_proto", "IPv6_TCP");
break;
default:
break;
}
//entrance_id: 0
cJSON_AddNumberToObject(log_obj, "entrance_id", 0);
//device_id: 0
cJSON_AddNumberToObject(log_obj, "device_id", 0);
//link_id: 0
cJSON_AddNumberToObject(log_obj, "link_id", 0);
//isp: null
cJSON_AddStringToObject(log_obj, "isp", "");
//encap_type: from sapp, 先填0
cJSON_AddNumberToObject(log_obj, "encap_type", 0);
//pinning state: from tfe
cJSON_AddNumberToObject(log_obj, "pinningst", pmeinfo->pinningst);
//intercept state: from tfe
cJSON_AddNumberToObject(log_obj, "intercept_state", pmeinfo->intercept_state);
//ssl upstream latency: from tfe
cJSON_AddNumberToObject(log_obj, "ssl_server_side_latency", pmeinfo->ssl_server_side_latency);
//ssl downstream latency: from tfe
cJSON_AddNumberToObject(log_obj, "ssl_client_side_latency", pmeinfo->ssl_client_side_latency);
//ssl upstream version: from tfe
cJSON_AddStringToObject(log_obj, "ssl_server_side_version", pmeinfo->ssl_server_side_version);
//ssl downstream version: from tfe
cJSON_AddStringToObject(log_obj, "ssl_client_side_version", pmeinfo->ssl_client_side_version);
//ssl cert verify
if(pmeinfo->ssl_cert_verify != -1){
cJSON_AddNumberToObject(log_obj, "ssl_cert_verify", pmeinfo->ssl_cert_verify);
}
//direction: 0
cJSON_AddNumberToObject(log_obj, "direction", 0);
//stream_dir: from sapp
cJSON_AddNumberToObject(log_obj, "stream_dir", pmeinfo->dir);
//cap_ip: kni ip
char local_ipv4_str[INET6_ADDRSTRLEN];
inet_ntop(AF_INET, &(g_kni_handle->local_ipv4), local_ipv4_str, sizeof(local_ipv4_str));
cJSON_AddStringToObject(log_obj, "cap_ip", local_ipv4_str);
//addr_list
cJSON_AddStringToObject(log_obj, "addr_list", "");
//host: http_only
if(pmeinfo->protocol == PROTO_HTTP){
cJSON_AddStringToObject(log_obj, "host", pmeinfo->domain.host);
}
//sni: ssl only
if(pmeinfo->protocol == PROTO_SSL){
cJSON_AddStringToObject(log_obj, "sni", pmeinfo->domain.sni);
}
//c2s_pkt_num
cJSON_AddNumberToObject(log_obj, "c2s_pkt_num", pmeinfo->server_pkts);
//s2c_pkt_num
cJSON_AddNumberToObject(log_obj, "s2c_pkt_num", pmeinfo->client_pkts);
//c2s_byte_num
cJSON_AddNumberToObject(log_obj, "c2s_byte_num", pmeinfo->server_bytes);
//s2c_byte_num
cJSON_AddNumberToObject(log_obj, "s2c_byte_num", pmeinfo->client_bytes);
/*keys:
common: common_has_dup_traffic, common_stream_error
http: http_host
ssl: ssl_sni, ssl_pinningst, ssl_intercept_state, ssl_server_side_latency, ssl_client_side_latency, ssl_server_side_version, ssl_client_side_version,
ssl_cert_verify
*/
static int log_generate(struct pme_info *pmeinfo){
void *local_logger = g_kni_handle->local_logger;
TLD_handle_t tld_handle = TLD_create(-1);
//common
//schema_type
TLD_append(tld_handle, (char*)"common_schema_type", (void*)(pmeinfo->protocol == PROTO_SSL ? "SSL" : "HTTP"), TLD_TYPE_STRING);
//dup_traffic
cJSON_AddNumberToObject(log_obj, "has_dup_traffic", pmeinfo->has_dup_traffic);
TLD_append(tld_handle, (char*)"common_has_dup_traffic", (void*)pmeinfo->has_dup_traffic, TLD_TYPE_LONG);
//intercept_error
if(pmeinfo->intcp_error < 0){
char *stream_errmsg = stream_errmsg_session_record(pmeinfo->intcp_error);
cJSON_AddStringToObject(log_obj, "intercept_error", stream_errmsg);
TLD_append(tld_handle, (char*)"common_stream_error", (void*)stream_errmsg, TLD_TYPE_STRING);
}
int ret = -1;
char *log_msg = cJSON_PrintUnformatted(log_obj);
cJSON_Delete(log_obj);
if(log_msg == NULL){
KNI_LOG_ERROR(local_logger, "Failed at cJSON_Print, stream_treaceid = %s", pmeinfo->stream_traceid);
goto error_out;
//ssl
if(pmeinfo->protocol == PROTO_SSL){
TLD_append(tld_handle, (char*)"ssl_sni", (void*)pmeinfo->domain.sni, TLD_TYPE_STRING);
//pinning state: from tfe
TLD_append(tld_handle, (char*)"ssl_pinningst", (void*)pmeinfo->ssl_pinningst, TLD_TYPE_LONG);
//intercept state: from tfe
TLD_append(tld_handle, (char*)"ssl_intercept_state", (void*)pmeinfo->ssl_intercept_state, TLD_TYPE_LONG);
//ssl upstream latency: from tfe
TLD_append(tld_handle, (char*)"ssl_server_side_latency", (void*)pmeinfo->ssl_server_side_latency, TLD_TYPE_LONG);
//ssl downstream latency: from tfe
TLD_append(tld_handle, (char*)"ssl_client_side_latency", (void*)pmeinfo->ssl_client_side_latency, TLD_TYPE_LONG);
//ssl upstream version: from tfe
TLD_append(tld_handle, (char*)"ssl_server_side_version", (void*)pmeinfo->ssl_server_side_version, TLD_TYPE_STRING);
//ssl downstream version: from tfe
TLD_append(tld_handle, (char*)"ssl_client_side_version", (void*)pmeinfo->ssl_client_side_version, TLD_TYPE_STRING);
//ssl cert verify
if(pmeinfo->ssl_cert_verify != -1){
TLD_append(tld_handle, (char*)"ssl_cert_verify", (void*)pmeinfo->ssl_cert_verify, TLD_TYPE_LONG);
}
//local log
KNI_LOG_DEBUG(local_logger, "log_msg = %s\n", log_msg);
//sendto kafka
ret = kni_send_logger_sendlog(g_kni_handle->send_logger, log_msg, strlen(log_msg));
}
//host
if(pmeinfo->protocol == PROTO_HTTP){
TLD_append(tld_handle, (char*)"http_host", (void*)pmeinfo->domain.host, TLD_TYPE_STRING);
}
tsg_log_t log_msg;
memset(&log_msg, 0, sizeof(log_msg));
log_msg.result_num = pmeinfo->maat_result_num;
log_msg.result = &(pmeinfo->maat_result);
log_msg.a_stream = (struct streaminfo*)pmeinfo->stream;
int ret = tsg_send_log(g_tsg_log_instance, tld_handle, &log_msg, -1);
if(ret < 0){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_FAIL], 0, FS_OP_ADD, 1);
KNI_LOG_ERROR(local_logger, "Failed at sendlog_to_kafka, ret = %d, strem_traceid = %s",
KNI_LOG_ERROR(local_logger, "Failed at sendlog, ret = %d, strem_traceid = %s",
ret, pmeinfo->stream_traceid);
goto error_out;
}
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_SUCC], 0, FS_OP_ADD, 1);
cJSON_free(log_msg);
return 0;
error_out:
if(log_msg != NULL){
cJSON_free(log_msg);
}
return -1;
}
static void stream_destroy(struct pme_info *pmeinfo, int do_log){
static void stream_destroy(struct pme_info *pmeinfo){
//sendlog
void *logger = g_kni_handle->local_logger;
int ret;
if(do_log == 1){
ret = log_generate(pmeinfo, logger);
int ret = log_generate(pmeinfo);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at log_generate, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr);
}
else{
KNI_LOG_DEBUG(logger, "Succeed at log_generate, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr);
}
}
//free pme
pme_info_destroy(pmeinfo);
}
@@ -1142,27 +1041,15 @@ static int dabloom_search(struct pkt_info *pktinfo, int thread_seq){
/* action
0x00: none
0x01: monitor
0x02: intercept
0x10: reject
0x30: Manipulate
0x60: steer
0x80: bypass
*/
char* kni_maat_action_trans(enum kni_action action){
switch(action){
case 0x00:
return (char*)"none";
case 0x01:
return (char*)"monitor";
case 0x02:
return (char*)"intercept";
case 0x10:
return (char*)"reject";
case 0x30:
return (char*)"manipulate";
case 0x60:
return (char*)"steer";
case 0x80:
return (char*)"bypass";
default:
@@ -1233,11 +1120,10 @@ void next_data_intercept(struct pme_info *pmeinfo, const void *a_packet, struct
char first_data_process(struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, int thread_seq){
//first data packet, get action
void *logger = g_kni_handle->local_logger;
struct Maat_rule_t result;
int maat_hit = 0;
int ret = 0;
struct _identify_info identify_info;
ret = tsg_pull_policy_result(stream, PULL_KNI_RESULT, &result, 1, &identify_info);
ret = tsg_pull_policy_result(stream, PULL_KNI_RESULT, &(pmeinfo->maat_result), 1, &identify_info);
//ret == 0, bypass and dropme
if(ret == 0){
pmeinfo->action = KNI_ACTION_NONE;
@@ -1246,12 +1132,13 @@ char first_data_process(struct streaminfo *stream, struct pme_info *pmeinfo, str
pmeinfo->stream_addr, (char*)&(pmeinfo->domain), maat_hit, pmeinfo->stream_traceid);
}
else{
pmeinfo->maat_result_num = 1;
pmeinfo->protocol = identify_info.proto;
pmeinfo->domain_len = MIN(identify_info.domain_len, (int)sizeof(pmeinfo->domain) - 1);
strncpy(pmeinfo->domain.sni, identify_info.domain, pmeinfo->domain_len);
pmeinfo->action = (enum kni_action)(result.action);
pmeinfo->policy_id = result.config_id;
pmeinfo->do_log = result.do_log;
pmeinfo->action = (enum kni_action)(pmeinfo->maat_result.action);
pmeinfo->policy_id = pmeinfo->maat_result.config_id;
pmeinfo->do_log = pmeinfo->maat_result.do_log;
maat_hit = 1;
char *action_str = kni_maat_action_trans(pmeinfo->action);
KNI_LOG_INFO(logger, "intercept_policy_scan: %s, %s, maat_hit = %d, policy_id = %d, action = %d(%s), stream traceid = %s",
@@ -1259,7 +1146,7 @@ char first_data_process(struct streaminfo *stream, struct pme_info *pmeinfo, str
}
switch(pmeinfo->action){
case KNI_ACTION_INTERCEPT:
pmeinfo->intercept_state = 1;
pmeinfo->ssl_intercept_state = 1;
return first_data_intercept(stream, pmeinfo, pktinfo, thread_seq);
default:
//action != interceptbypass and dropme
@@ -1268,14 +1155,6 @@ char first_data_process(struct streaminfo *stream, struct pme_info *pmeinfo, str
}
static char data_opstate(struct streaminfo *stream, struct pme_info *pmeinfo, const void *a_packet, int thread_seq){
//pmeinfo->tfe_release = 1: intercept, tfe end first. DO NOT droppkt and dropme
if(pmeinfo->tfe_release == 1){
pmeinfo->server_bytes=stream->ptcpdetail->serverbytes;
pmeinfo->client_bytes=stream->ptcpdetail->clientbytes;
pmeinfo->server_pkts=stream->ptcpdetail->serverpktnum;
pmeinfo->client_pkts=stream->ptcpdetail->clientpktnum;
pmeinfo->dir=stream->dir;
}
//parse ipv4/6 header
struct pkt_info pktinfo;
memset(&pktinfo, 0, sizeof(pktinfo));
@@ -1320,12 +1199,6 @@ static char data_opstate(struct streaminfo *stream, struct pme_info *pmeinfo, co
static char close_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, int thread_seq){
//close: a_packet = null, do not sendto tfe
clock_gettime(CLOCK_REALTIME, &(pmeinfo->end_time));
pmeinfo->server_bytes=stream->ptcpdetail->serverbytes;
pmeinfo->client_bytes=stream->ptcpdetail->clientbytes;
pmeinfo->server_pkts=stream->ptcpdetail->serverpktnum;
pmeinfo->client_pkts=stream->ptcpdetail->clientpktnum;
pmeinfo->dir=stream->dir;
switch(pmeinfo->action){
case KNI_ACTION_INTERCEPT:
//reset clock: when sapp end, start clock
@@ -1418,23 +1291,23 @@ extern "C" char kni_tcpall_entry(struct streaminfo *stream, void** pme, int thre
if((ret & APP_STATE_DROPME)){
if(pmeinfo->action != KNI_ACTION_INTERCEPT){
if(pmeinfo != NULL){
stream_destroy(pmeinfo, 0);
stream_destroy(pmeinfo);
}
}
else{
if(pmeinfo->intcp_error < 0){
pmeinfo->intercept_state = 0;
pmeinfo->ssl_intercept_state = 0;
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_INTCPERR], 0, FS_OP_ADD, 1);
if(pmeinfo != NULL){
//pmeinfo->policy_id = -1;
stream_destroy(pmeinfo, 1);
stream_destroy(pmeinfo);
}
}
else{
can_destroy = judge_stream_can_destroy(pmeinfo, CALLER_SAPP);
if(can_destroy == 1){
traceid2pme_htable_del(pmeinfo);
stream_destroy(pmeinfo, pmeinfo->do_log);
stream_destroy(pmeinfo);
}
}
}
@@ -1587,7 +1460,7 @@ static int wrapped_kni_cmsg_get(struct pme_info *pmeinfo, struct kni_cmsg *cmsg,
switch(type)
{
case TFE_CMSG_SSL_INTERCEPT_STATE:
memcpy((char*)&(pmeinfo->intercept_state), value, value_size);
memcpy((char*)&(pmeinfo->ssl_intercept_state), value, value_size);
break;
case TFE_CMSG_SSL_UPSTREAM_LATENCY:
memcpy((char*)&(pmeinfo->ssl_server_side_latency), value, value_size);
@@ -1602,7 +1475,7 @@ static int wrapped_kni_cmsg_get(struct pme_info *pmeinfo, struct kni_cmsg *cmsg,
memcpy(pmeinfo->ssl_client_side_version, value, value_size);
break;
case TFE_CMSG_SSL_PINNING_STATE:
memcpy((char*)&(pmeinfo->pinningst), value, value_size);
memcpy((char*)&(pmeinfo->ssl_pinningst), value, value_size);
break;
case TFE_CMSG_SSL_CERT_VERIFY:
memcpy((char*)&(pmeinfo->ssl_cert_verify), value, value_size);
@@ -1623,20 +1496,19 @@ static long traceid2pme_htable_search_cb(void *data, const uchar *key, uint size
struct pme_info *pmeinfo = (struct pme_info*)data;
int can_destroy;
if(pmeinfo != NULL){
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, sizeof(pmeinfo->intercept_state), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, sizeof(pmeinfo->ssl_intercept_state), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_UPSTREAM_LATENCY, sizeof(pmeinfo->ssl_server_side_latency), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_DOWNSTREAM_LATENCY, sizeof(pmeinfo->ssl_client_side_latency), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_UPSTREAM_VERSION, sizeof(pmeinfo->ssl_server_side_version) - 1, logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_DOWNSTREAM_VERSION, sizeof(pmeinfo->ssl_client_side_version) - 1, logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_PINNING_STATE, sizeof(pmeinfo->pinningst), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_PINNING_STATE, sizeof(pmeinfo->ssl_pinningst), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_CERT_VERIFY, sizeof(pmeinfo->ssl_cert_verify), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_ERROR, sizeof(pmeinfo->ssl_error), logger);
clock_gettime(CLOCK_REALTIME, &(pmeinfo->end_time));
KNI_LOG_DEBUG(logger, "recv cmsg from tfe, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr);
can_destroy = judge_stream_can_destroy(pmeinfo, CALLER_TFE);
if(can_destroy == 1){
traceid2pme_htable_del(pmeinfo);
stream_destroy(pmeinfo, pmeinfo->do_log);
stream_destroy(pmeinfo);
}
}
kni_cmsg_destroy(cmsg);
@@ -1976,7 +1848,7 @@ static int traceid2pme_htable_expire_notify_cb(void *data, int eliminate_type){
if(can_destroy == 1){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_CNT], 0, FS_OP_ADD, -1);
stream_destroy(pmeinfo, pmeinfo->do_log);
stream_destroy(pmeinfo);
return 1;
}
}
@@ -2042,7 +1914,6 @@ extern "C" int kni_init(){
char log_path[KNI_PATH_MAX] = "";
int tfe_node_count = 1;
char manage_eth[KNI_SYMBOL_MAX] = "";
struct kni_send_logger *send_logger = NULL;
struct kni_field_stat_handle *fs_handle = NULL;
void *local_logger = NULL;
int log_level = -1;
@@ -2173,21 +2044,6 @@ extern "C" int kni_init(){
}
g_kni_fs_handle = fs_handle;
//init local_ipv4
ret = kni_ipv4_addr_get_by_eth(manage_eth, &(g_kni_handle->local_ipv4));
if(ret < 0){
KNI_LOG_ERROR(local_logger, "Failed at get bind ipv4 addr, eth = %s", manage_eth);
goto error_out;
}
//init kni_send_logger
send_logger = kni_send_logger_init(profile, local_logger);
if(send_logger == NULL){
KNI_LOG_ERROR(local_logger, "Failed at init kni_send_logger", manage_eth);
goto error_out;
}
g_kni_handle->send_logger = send_logger;
//init traceid2pme_htable
struct kni_htable_opt opt;
memset(&opt, 0, sizeof(opt));

View File

@@ -1,152 +0,0 @@
#include "kni_utils.h"
#include "kni_send_logger.h"
#include "librdkafka/rdkafka.h"
struct kni_send_logger{
int sendlog_switch;
rd_kafka_t *kafka_handle;
rd_kafka_topic_t *kafka_topic;
void *local_logger;
};
static rd_kafka_t* kafka_init(const char *profile, void *logger){
rd_kafka_t *kafka_handle = NULL;
rd_kafka_conf_t *rdkafka_conf = NULL;
char kafka_errstr[1024];
const char *section = "kafka";
char queue_buffering_max_messages[KNI_SYMBOL_MAX] = "";
char topic_metadata_refresh_interval_ms[KNI_SYMBOL_MAX] = "";
char security_protocol[KNI_SYMBOL_MAX] = "";
int ret = MESA_load_profile_string_nodef(profile, section, "queue.buffering.max.messages",
queue_buffering_max_messages, sizeof(queue_buffering_max_messages));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: queue.buffering.max.messages not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, section, "topic.metadata.refresh.interval.ms",
topic_metadata_refresh_interval_ms, sizeof(topic_metadata_refresh_interval_ms));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: topic.metadata.refresh.interval.ms not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, section, "security.protocol", security_protocol, sizeof(security_protocol));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: security.protocol not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n queue.buffering.max.messages: %s\n topic.metadata.refresh.interval.ms: %s\n"
"security.protocol: %s", "kafka", queue_buffering_max_messages, topic_metadata_refresh_interval_ms, security_protocol);
rdkafka_conf = rd_kafka_conf_new();
rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", queue_buffering_max_messages, kafka_errstr, sizeof(kafka_errstr));
rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", topic_metadata_refresh_interval_ms, kafka_errstr, sizeof(kafka_errstr));
rd_kafka_conf_set(rdkafka_conf, "security.protocol", security_protocol, kafka_errstr, sizeof(kafka_errstr));
//The conf object is freed by this function and must not be used or destroyed by the application sub-sequently.
kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr));
rdkafka_conf = NULL;
if(kafka_handle == NULL){
goto error_out;
}
return kafka_handle;
error_out:
if(rdkafka_conf != NULL){
rd_kafka_conf_destroy(rdkafka_conf);
rdkafka_conf = NULL;
}
if(kafka_handle != NULL){
rd_kafka_destroy(kafka_handle);
kafka_handle = NULL;
}
return NULL;
}
void kni_send_logger_destroy(struct kni_send_logger *handle){
if(handle != NULL){
if(handle->kafka_topic != NULL){
rd_kafka_topic_destroy(handle->kafka_topic);
handle->kafka_topic = NULL;
}
if(handle->kafka_handle != NULL){
rd_kafka_destroy(handle->kafka_handle);
handle->kafka_handle = NULL;
}
FREE(&handle);
}
}
struct kni_send_logger* kni_send_logger_init(const char *profile, void *local_logger){
struct kni_send_logger *handle = NULL;
const char *section = "send_logger";
int sendlog_switch = -1;
char kafka_topic[KNI_SYMBOL_MAX] = "";
char kafka_brokerlist[KNI_SYMBOL_MAX] = "";
rd_kafka_t *kafka_handle = NULL;
rd_kafka_topic_t *topic = NULL;
int ret = MESA_load_profile_int_nodef(profile, section, "switch", &sendlog_switch);
if(ret < 0){
KNI_LOG_ERROR(local_logger, "MESA_prof_load: switch not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, section, "kafka_topic", kafka_topic, sizeof(kafka_topic));
if(ret < 0){
KNI_LOG_ERROR(local_logger, "MESA_prof_load: kafka_topic not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, section, "kafka_brokerlist", kafka_brokerlist, sizeof(kafka_brokerlist));
if(ret < 0){
KNI_LOG_ERROR(local_logger, "MESA_prof_load: kafka_brokerlist not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_ERROR(local_logger, "MESA_prof_load, [%s]:\n switch: %d\n kafka_topic: %s\n, kafka_brokerlist: %s",
section, sendlog_switch, kafka_topic, kafka_brokerlist);
handle = ALLOC(struct kni_send_logger, 1);
handle->local_logger = local_logger;
//sendlog_switch = 0, do not sendto kafka
if(sendlog_switch == 0){
handle->sendlog_switch = 0;
return handle;
}
handle->sendlog_switch = 1;
//init kafka
kafka_handle = kafka_init(profile, local_logger);
if(kafka_handle == NULL){
KNI_LOG_ERROR(local_logger, "Failed at init kafka");
goto error_out;
}
handle->kafka_handle = kafka_handle;
//kafka_brokerlist
ret = rd_kafka_brokers_add(kafka_handle, kafka_brokerlist);
if(ret == 0){
KNI_LOG_ERROR(local_logger, "Failed at add kafka_brokers");
goto error_out;
}
//kafka topic
topic = rd_kafka_topic_new(kafka_handle, kafka_topic, NULL);
if(topic == NULL){
KNI_LOG_ERROR(local_logger, "Failed at new kafka topic");
goto error_out;
}
handle->kafka_topic = topic;
return handle;
error_out:
kni_send_logger_destroy(handle);
return NULL;
}
int kni_send_logger_sendlog(kni_send_logger *handle, char *log_msg, int log_msg_len){
if(handle->sendlog_switch == 0){
return 0;
}
void *logger = handle->local_logger;
//kafka produce
int kafka_status = rd_kafka_produce(handle->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
log_msg, log_msg_len, NULL, 0, NULL);
if(kafka_status < 0){
KNI_LOG_ERROR(logger, "Kafka: Failed to produce, error is %s", rd_kafka_err2name(rd_kafka_last_error()));
return -1;
}
return 0;
}