diff --git a/conf/sapp/kni/kni.conf b/conf/sapp/kni/kni.conf index d892a51..9a8506e 100644 --- a/conf/sapp/kni/kni.conf +++ b/conf/sapp/kni/kni.conf @@ -36,20 +36,9 @@ keepalive_idle = 2 keepalive_intvl = 1 keepalive_cnt = 3 -[send_logger] -switch = 0 -kafka_topic = SESSION-RECORD-LOG -#kafka_brokerlist = 192.168.10.119:9092,192.168.10.122:9092,192.168.10.123:9092 -kafka_brokerlist = 192.168.10.52:9092 - [marsio] appsym = knifw -[kafka] -queue.buffering.max.messages = 1000000 -topic.metadata.refresh.interval.ms = 600000 -security.protocol = MG - #128:bypass, 2: intercept [dup_traffic] switch = 1 diff --git a/entry/CMakeLists.txt b/entry/CMakeLists.txt index d3d584d..a0f7709 100644 --- a/entry/CMakeLists.txt +++ b/entry/CMakeLists.txt @@ -1,3 +1,3 @@ -add_library(kni SHARED src/kni_entry.cpp src/kni_send_logger.cpp src/tfe_mgr.cpp src/kni_tun.cpp) +add_library(kni SHARED src/kni_entry.cpp src/tfe_mgr.cpp src/kni_tun.cpp) target_include_directories(kni PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_link_libraries(kni common MESA_prof_load MESA_htable MESA_field_stat maatframe marsio uuid cjson rdkafka dabloom) \ No newline at end of file diff --git a/entry/include/kni_send_logger.h b/entry/include/kni_send_logger.h deleted file mode 100644 index df62127..0000000 --- a/entry/include/kni_send_logger.h +++ /dev/null @@ -1,5 +0,0 @@ -#pragma once -struct kni_send_logger; -struct kni_send_logger* kni_send_logger_init(const char *profile, void *logger); -void kni_send_logger_destroy(struct kni_send_logger *handle); -int kni_send_logger_sendlog(kni_send_logger *handle, char *log_msg, int log_msg_len); \ No newline at end of file diff --git a/entry/include/tsg_send_log.h b/entry/include/tsg_send_log.h new file mode 100644 index 0000000..b3a4b5c --- /dev/null +++ b/entry/include/tsg_send_log.h @@ -0,0 +1,38 @@ +#ifndef __TSG_SEND_LOG_H__ +#define __TSG_SEND_LOG_H__ + +#include + + +typedef struct _tsg_log +{ + int result_num; + Maat_rule_t *result; + struct streaminfo *a_stream; +}tsg_log_t; + +typedef enum _tld_type +{ + TLD_TYPE_UNKNOWN=0, + TLD_TYPE_LONG=1, + TLD_TYPE_STRING, + TLD_TYPE_FILE, + TLD_TYPE_MAX +}TLD_TYPE; + + +typedef void* TLD_handle_t; +typedef void* tsg_log_instance_t; + +extern tsg_log_instance_t g_tsg_log_instance; + +TLD_handle_t TLD_create(int thread_id); +int TLD_append(TLD_handle_t handle, char *key, void *value, TLD_TYPE type); +int TLD_cancel(TLD_handle_t handle); + +int tsg_send_log(tsg_log_instance_t instance, TLD_handle_t handle, tsg_log_t *log_msg, int thread_id); + +unsigned long long tsg_get_stream_id(struct streaminfo *a_stream); + + +#endif diff --git a/entry/src/kni_entry.cpp b/entry/src/kni_entry.cpp index acc4f59..f9f449c 100644 --- a/entry/src/kni_entry.cpp +++ b/entry/src/kni_entry.cpp @@ -8,18 +8,17 @@ bypass: drome: pme_new_fail: destroy_pme giveme: policy: destroy_pme + send_log dup_traffic: destroy_pme + send_log */ - +#define __STDC_FORMAT_MACROS #include "kni_utils.h" #include "marsio.h" #include "MESA/stream_inc/sapp_inject.h" #include "kni_cmsg.h" -#include "uuid/uuid.h" -#include "cjson/cJSON.h" -#include "kni_send_logger.h" #include #include +#include #include "tfe_mgr.h" #include "tsg_rule.h" +#include "tsg_send_log.h" //#include "tsg_rule.h" #ifdef __cplusplus @@ -55,20 +54,12 @@ enum intercept_error{ /* action 0x00: none - 0x01: monitor 0x02: intercept - 0x10: reject - 0x30: Manipulate - 0x60: steer 0x80: bypass */ enum kni_action{ KNI_ACTION_NONE = 0x00, - KNI_ACTION_MONITOR = 0x01, KNI_ACTION_INTERCEPT = 0x02, - KNI_ACTION_REJECT = 0x10, - KNI_ACTION_MANIPULATE = 0x30, - KNI_ACTION_STEER = 0x60, KNI_ACTION_BYPASS = 0x80 }; @@ -102,7 +93,7 @@ struct pme_info{ int tfe_id; pthread_mutex_t lock; enum intercept_error intcp_error; - char stream_traceid[STREAM_TRACEID_LEN]; + char stream_traceid[24]; //cjson check protocol union{ char host[MAX_DOAMIN_LEN]; //http only @@ -113,19 +104,12 @@ struct pme_info{ int tfe_release; int sapp_release; //kafka log - struct layer_addr *addr; - unsigned char dir; - uint64_t server_bytes; - uint64_t client_bytes; - uint64_t server_pkts; - uint64_t client_pkts; - - struct timespec start_time; - struct timespec end_time; - uint64_t con_duration_ms; + const struct streaminfo *stream; + int maat_result_num; + Maat_rule_t maat_result; //from tfe, kafka log - uint64_t intercept_state; - uint64_t pinningst; //defalut 0 + uint64_t ssl_intercept_state; + uint64_t ssl_pinningst; //defalut 0 uint64_t ssl_server_side_latency; uint64_t ssl_client_side_latency; char ssl_server_side_version[KNI_SYMBOL_MAX]; @@ -134,7 +118,7 @@ struct pme_info{ char ssl_error[KNI_STRING_MAX]; //for dup traffic detect - int has_dup_traffic; + uint64_t has_dup_traffic; int has_dup_syn; int has_dup_syn_ack; struct dup_traffic_dabloom_key *syn_packet; @@ -195,7 +179,6 @@ struct kni_handle{ struct kni_send_logger *send_logger; MESA_htable_handle traceid2pme_htable; struct per_thread_handle *threads_handle; - uint32_t local_ipv4; void *local_logger; struct tfe_mgr *_tfe_mgr; int thread_count; @@ -269,9 +252,6 @@ static void pme_info_destroy(void *data){ struct pme_info *pmeinfo = (struct pme_info *)data; void *logger = g_kni_handle->local_logger; if(pmeinfo != NULL){ - //free layer_addr - layer_addr_free(pmeinfo->addr); - pmeinfo->addr=NULL; //free lock pthread_mutex_destroy(&(pmeinfo->lock)); //free syn/syn_ack @@ -289,15 +269,11 @@ static void pme_info_destroy(void *data){ static int pme_info_init(struct pme_info *pmeinfo, const struct streaminfo *stream, int thread_seq){ void *logger = g_kni_handle->local_logger; + pmeinfo->stream = stream; pmeinfo->addr_type = (enum addr_type_t)stream->addr.addrtype; pmeinfo->ssl_cert_verify = -1; - //uuid_t uu; - //uuid_generate_random(uu); - //uuid_unparse(uu, pmeinfo->stream_traceid); - clock_gettime(CLOCK_REALTIME, &(pmeinfo->start_time)); - snprintf(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid), "%d%lld.%.9ld", - thread_seq, (long long)pmeinfo->start_time.tv_sec, pmeinfo->start_time.tv_nsec); - pmeinfo->addr = layer_addr_dup(&(stream->addr)); + uint64_t traceid = tsg_get_stream_id((struct streaminfo*)stream); + snprintf(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid), "%" PRIu64 , traceid); if(pmeinfo->addr_type == ADDR_TYPE_IPV6){ kni_addr_trans_v6(stream->addr.tuple4_v6, pmeinfo->stream_addr, sizeof(pmeinfo->stream_addr)); } @@ -313,155 +289,78 @@ static int pme_info_init(struct pme_info *pmeinfo, const struct streaminfo *stre return 0; } -static int log_generate(struct pme_info *pmeinfo, void *local_logger){ - //create cjson - cJSON *log_obj = cJSON_CreateObject(); - //stream_traceid - cJSON_AddStringToObject(log_obj, "stream_trace_id", pmeinfo->stream_traceid); - //policy_id - cJSON_AddNumberToObject(log_obj, "policy_id", pmeinfo->policy_id); - //action - cJSON_AddNumberToObject(log_obj, "action", pmeinfo->action); - //service - cJSON_AddNumberToObject(log_obj, "service", pmeinfo->service); - //start_time - cJSON_AddNumberToObject(log_obj, "start_time", pmeinfo->start_time.tv_sec); - if(pmeinfo->intcp_error >= 0){ - //end_time - cJSON_AddNumberToObject(log_obj, "end_time", pmeinfo->end_time.tv_sec); - //con_duration_ms - cJSON_AddNumberToObject(log_obj, "con_duration_ms", (pmeinfo->end_time.tv_sec - pmeinfo->start_time.tv_sec) * 1000 - + (pmeinfo->end_time.tv_nsec - pmeinfo->start_time.tv_nsec) / 1000000); - } - //stream_info: addr_type, trans_proto, client_ip, client_port, server_ip, server_port - const struct layer_addr *addr = pmeinfo->addr; - char client_ip_str[INET6_ADDRSTRLEN] = ""; - char server_ip_str[INET6_ADDRSTRLEN] = ""; - switch(addr->addrtype){ - case ADDR_TYPE_IPV4: - cJSON_AddNumberToObject(log_obj, "addr_type", 4); - inet_ntop(AF_INET, &(addr->tuple4_v4->saddr), client_ip_str, sizeof(client_ip_str)); - inet_ntop(AF_INET, &(addr->tuple4_v4->daddr), server_ip_str, sizeof(server_ip_str)); - cJSON_AddStringToObject(log_obj, "client_ip", client_ip_str); - cJSON_AddStringToObject(log_obj, "server_ip", server_ip_str); - cJSON_AddNumberToObject(log_obj, "client_port", ntohs(addr->tuple4_v4->source)); - cJSON_AddNumberToObject(log_obj, "server_port", ntohs(addr->tuple4_v4->dest)); - cJSON_AddStringToObject(log_obj, "trans_proto", "IPv4_TCP"); - break; - case ADDR_TYPE_IPV6: - cJSON_AddNumberToObject(log_obj, "addr_type", 6); - inet_ntop(AF_INET6, addr->tuple4_v6->saddr, client_ip_str, sizeof(client_ip_str)); - inet_ntop(AF_INET6, addr->tuple4_v6->daddr, server_ip_str, sizeof(server_ip_str)); - cJSON_AddStringToObject(log_obj, "client_ip", client_ip_str); - cJSON_AddStringToObject(log_obj, "server_ip", server_ip_str); - cJSON_AddNumberToObject(log_obj, "client_port", ntohs(addr->tuple4_v6->source)); - cJSON_AddNumberToObject(log_obj, "server_port", ntohs(addr->tuple4_v6->dest)); - cJSON_AddStringToObject(log_obj, "trans_proto", "IPv6_TCP"); - break; - default: - break; - } - //entrance_id: 0 - cJSON_AddNumberToObject(log_obj, "entrance_id", 0); - //device_id: 0 - cJSON_AddNumberToObject(log_obj, "device_id", 0); - //link_id: 0 - cJSON_AddNumberToObject(log_obj, "link_id", 0); - //isp: null - cJSON_AddStringToObject(log_obj, "isp", ""); - //encap_type: from sapp, 先填0 - cJSON_AddNumberToObject(log_obj, "encap_type", 0); - - //pinning state: from tfe - cJSON_AddNumberToObject(log_obj, "pinningst", pmeinfo->pinningst); - //intercept state: from tfe - cJSON_AddNumberToObject(log_obj, "intercept_state", pmeinfo->intercept_state); - //ssl upstream latency: from tfe - cJSON_AddNumberToObject(log_obj, "ssl_server_side_latency", pmeinfo->ssl_server_side_latency); - //ssl downstream latency: from tfe - cJSON_AddNumberToObject(log_obj, "ssl_client_side_latency", pmeinfo->ssl_client_side_latency); - //ssl upstream version: from tfe - cJSON_AddStringToObject(log_obj, "ssl_server_side_version", pmeinfo->ssl_server_side_version); - //ssl downstream version: from tfe - cJSON_AddStringToObject(log_obj, "ssl_client_side_version", pmeinfo->ssl_client_side_version); - //ssl cert verify - if(pmeinfo->ssl_cert_verify != -1){ - cJSON_AddNumberToObject(log_obj, "ssl_cert_verify", pmeinfo->ssl_cert_verify); - } - - //direction: 0 - cJSON_AddNumberToObject(log_obj, "direction", 0); - //stream_dir: from sapp - cJSON_AddNumberToObject(log_obj, "stream_dir", pmeinfo->dir); - //cap_ip: kni ip - char local_ipv4_str[INET6_ADDRSTRLEN]; - inet_ntop(AF_INET, &(g_kni_handle->local_ipv4), local_ipv4_str, sizeof(local_ipv4_str)); - cJSON_AddStringToObject(log_obj, "cap_ip", local_ipv4_str); - //addr_list - cJSON_AddStringToObject(log_obj, "addr_list", ""); - //host: http_only - if(pmeinfo->protocol == PROTO_HTTP){ - cJSON_AddStringToObject(log_obj, "host", pmeinfo->domain.host); - } - //sni: ssl only - if(pmeinfo->protocol == PROTO_SSL){ - cJSON_AddStringToObject(log_obj, "sni", pmeinfo->domain.sni); - } - //c2s_pkt_num - cJSON_AddNumberToObject(log_obj, "c2s_pkt_num", pmeinfo->server_pkts); - //s2c_pkt_num - cJSON_AddNumberToObject(log_obj, "s2c_pkt_num", pmeinfo->client_pkts); - //c2s_byte_num - cJSON_AddNumberToObject(log_obj, "c2s_byte_num", pmeinfo->server_bytes); - //s2c_byte_num - cJSON_AddNumberToObject(log_obj, "s2c_byte_num", pmeinfo->client_bytes); +/*keys: +common: common_has_dup_traffic, common_stream_error +http: http_host +ssl: ssl_sni, ssl_pinningst, ssl_intercept_state, ssl_server_side_latency, ssl_client_side_latency, ssl_server_side_version, ssl_client_side_version, + ssl_cert_verify +*/ +static int log_generate(struct pme_info *pmeinfo){ + void *local_logger = g_kni_handle->local_logger; + TLD_handle_t tld_handle = TLD_create(-1); + //common + //schema_type + TLD_append(tld_handle, (char*)"common_schema_type", (void*)(pmeinfo->protocol == PROTO_SSL ? "SSL" : "HTTP"), TLD_TYPE_STRING); //dup_traffic - cJSON_AddNumberToObject(log_obj, "has_dup_traffic", pmeinfo->has_dup_traffic); + TLD_append(tld_handle, (char*)"common_has_dup_traffic", (void*)pmeinfo->has_dup_traffic, TLD_TYPE_LONG); //intercept_error if(pmeinfo->intcp_error < 0){ char *stream_errmsg = stream_errmsg_session_record(pmeinfo->intcp_error); - cJSON_AddStringToObject(log_obj, "intercept_error", stream_errmsg); + TLD_append(tld_handle, (char*)"common_stream_error", (void*)stream_errmsg, TLD_TYPE_STRING); } - int ret = -1; - char *log_msg = cJSON_PrintUnformatted(log_obj); - cJSON_Delete(log_obj); - if(log_msg == NULL){ - KNI_LOG_ERROR(local_logger, "Failed at cJSON_Print, stream_treaceid = %s", pmeinfo->stream_traceid); - goto error_out; + + //ssl + if(pmeinfo->protocol == PROTO_SSL){ + TLD_append(tld_handle, (char*)"ssl_sni", (void*)pmeinfo->domain.sni, TLD_TYPE_STRING); + //pinning state: from tfe + TLD_append(tld_handle, (char*)"ssl_pinningst", (void*)pmeinfo->ssl_pinningst, TLD_TYPE_LONG); + //intercept state: from tfe + TLD_append(tld_handle, (char*)"ssl_intercept_state", (void*)pmeinfo->ssl_intercept_state, TLD_TYPE_LONG); + //ssl upstream latency: from tfe + TLD_append(tld_handle, (char*)"ssl_server_side_latency", (void*)pmeinfo->ssl_server_side_latency, TLD_TYPE_LONG); + //ssl downstream latency: from tfe + TLD_append(tld_handle, (char*)"ssl_client_side_latency", (void*)pmeinfo->ssl_client_side_latency, TLD_TYPE_LONG); + //ssl upstream version: from tfe + TLD_append(tld_handle, (char*)"ssl_server_side_version", (void*)pmeinfo->ssl_server_side_version, TLD_TYPE_STRING); + //ssl downstream version: from tfe + TLD_append(tld_handle, (char*)"ssl_client_side_version", (void*)pmeinfo->ssl_client_side_version, TLD_TYPE_STRING); + //ssl cert verify + if(pmeinfo->ssl_cert_verify != -1){ + TLD_append(tld_handle, (char*)"ssl_cert_verify", (void*)pmeinfo->ssl_cert_verify, TLD_TYPE_LONG); + } } - //local log - KNI_LOG_DEBUG(local_logger, "log_msg = %s\n", log_msg); - //sendto kafka - ret = kni_send_logger_sendlog(g_kni_handle->send_logger, log_msg, strlen(log_msg)); + //host + if(pmeinfo->protocol == PROTO_HTTP){ + TLD_append(tld_handle, (char*)"http_host", (void*)pmeinfo->domain.host, TLD_TYPE_STRING); + } + tsg_log_t log_msg; + memset(&log_msg, 0, sizeof(log_msg)); + log_msg.result_num = pmeinfo->maat_result_num; + log_msg.result = &(pmeinfo->maat_result); + log_msg.a_stream = (struct streaminfo*)pmeinfo->stream; + int ret = tsg_send_log(g_tsg_log_instance, tld_handle, &log_msg, -1); if(ret < 0){ FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_FAIL], 0, FS_OP_ADD, 1); - KNI_LOG_ERROR(local_logger, "Failed at sendlog_to_kafka, ret = %d, strem_traceid = %s", + KNI_LOG_ERROR(local_logger, "Failed at sendlog, ret = %d, strem_traceid = %s", ret, pmeinfo->stream_traceid); goto error_out; } FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_SUCC], 0, FS_OP_ADD, 1); - cJSON_free(log_msg); return 0; error_out: - if(log_msg != NULL){ - cJSON_free(log_msg); - } return -1; } -static void stream_destroy(struct pme_info *pmeinfo, int do_log){ +static void stream_destroy(struct pme_info *pmeinfo){ //sendlog void *logger = g_kni_handle->local_logger; - int ret; - if(do_log == 1){ - ret = log_generate(pmeinfo, logger); - if(ret < 0){ - KNI_LOG_ERROR(logger, "Failed at log_generate, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr); - } - else{ - KNI_LOG_DEBUG(logger, "Succeed at log_generate, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr); - } + int ret = log_generate(pmeinfo); + if(ret < 0){ + KNI_LOG_ERROR(logger, "Failed at log_generate, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr); + } + else{ + KNI_LOG_DEBUG(logger, "Succeed at log_generate, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr); } //free pme pme_info_destroy(pmeinfo); @@ -1142,27 +1041,15 @@ static int dabloom_search(struct pkt_info *pktinfo, int thread_seq){ /* action 0x00: none - 0x01: monitor 0x02: intercept - 0x10: reject - 0x30: Manipulate - 0x60: steer 0x80: bypass */ char* kni_maat_action_trans(enum kni_action action){ switch(action){ case 0x00: return (char*)"none"; - case 0x01: - return (char*)"monitor"; case 0x02: return (char*)"intercept"; - case 0x10: - return (char*)"reject"; - case 0x30: - return (char*)"manipulate"; - case 0x60: - return (char*)"steer"; case 0x80: return (char*)"bypass"; default: @@ -1233,11 +1120,10 @@ void next_data_intercept(struct pme_info *pmeinfo, const void *a_packet, struct char first_data_process(struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, int thread_seq){ //first data packet, get action void *logger = g_kni_handle->local_logger; - struct Maat_rule_t result; int maat_hit = 0; int ret = 0; struct _identify_info identify_info; - ret = tsg_pull_policy_result(stream, PULL_KNI_RESULT, &result, 1, &identify_info); + ret = tsg_pull_policy_result(stream, PULL_KNI_RESULT, &(pmeinfo->maat_result), 1, &identify_info); //ret == 0, bypass and dropme if(ret == 0){ pmeinfo->action = KNI_ACTION_NONE; @@ -1246,12 +1132,13 @@ char first_data_process(struct streaminfo *stream, struct pme_info *pmeinfo, str pmeinfo->stream_addr, (char*)&(pmeinfo->domain), maat_hit, pmeinfo->stream_traceid); } else{ + pmeinfo->maat_result_num = 1; pmeinfo->protocol = identify_info.proto; pmeinfo->domain_len = MIN(identify_info.domain_len, (int)sizeof(pmeinfo->domain) - 1); strncpy(pmeinfo->domain.sni, identify_info.domain, pmeinfo->domain_len); - pmeinfo->action = (enum kni_action)(result.action); - pmeinfo->policy_id = result.config_id; - pmeinfo->do_log = result.do_log; + pmeinfo->action = (enum kni_action)(pmeinfo->maat_result.action); + pmeinfo->policy_id = pmeinfo->maat_result.config_id; + pmeinfo->do_log = pmeinfo->maat_result.do_log; maat_hit = 1; char *action_str = kni_maat_action_trans(pmeinfo->action); KNI_LOG_INFO(logger, "intercept_policy_scan: %s, %s, maat_hit = %d, policy_id = %d, action = %d(%s), stream traceid = %s", @@ -1259,7 +1146,7 @@ char first_data_process(struct streaminfo *stream, struct pme_info *pmeinfo, str } switch(pmeinfo->action){ case KNI_ACTION_INTERCEPT: - pmeinfo->intercept_state = 1; + pmeinfo->ssl_intercept_state = 1; return first_data_intercept(stream, pmeinfo, pktinfo, thread_seq); default: //action != intercept,bypass and dropme @@ -1268,14 +1155,6 @@ char first_data_process(struct streaminfo *stream, struct pme_info *pmeinfo, str } static char data_opstate(struct streaminfo *stream, struct pme_info *pmeinfo, const void *a_packet, int thread_seq){ - //pmeinfo->tfe_release = 1: intercept, tfe end first. DO NOT droppkt and dropme - if(pmeinfo->tfe_release == 1){ - pmeinfo->server_bytes=stream->ptcpdetail->serverbytes; - pmeinfo->client_bytes=stream->ptcpdetail->clientbytes; - pmeinfo->server_pkts=stream->ptcpdetail->serverpktnum; - pmeinfo->client_pkts=stream->ptcpdetail->clientpktnum; - pmeinfo->dir=stream->dir; - } //parse ipv4/6 header struct pkt_info pktinfo; memset(&pktinfo, 0, sizeof(pktinfo)); @@ -1320,12 +1199,6 @@ static char data_opstate(struct streaminfo *stream, struct pme_info *pmeinfo, co static char close_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, int thread_seq){ //close: a_packet = null, do not sendto tfe - clock_gettime(CLOCK_REALTIME, &(pmeinfo->end_time)); - pmeinfo->server_bytes=stream->ptcpdetail->serverbytes; - pmeinfo->client_bytes=stream->ptcpdetail->clientbytes; - pmeinfo->server_pkts=stream->ptcpdetail->serverpktnum; - pmeinfo->client_pkts=stream->ptcpdetail->clientpktnum; - pmeinfo->dir=stream->dir; switch(pmeinfo->action){ case KNI_ACTION_INTERCEPT: //reset clock: when sapp end, start clock @@ -1418,23 +1291,23 @@ extern "C" char kni_tcpall_entry(struct streaminfo *stream, void** pme, int thre if((ret & APP_STATE_DROPME)){ if(pmeinfo->action != KNI_ACTION_INTERCEPT){ if(pmeinfo != NULL){ - stream_destroy(pmeinfo, 0); + stream_destroy(pmeinfo); } } else{ if(pmeinfo->intcp_error < 0){ - pmeinfo->intercept_state = 0; + pmeinfo->ssl_intercept_state = 0; FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_INTCPERR], 0, FS_OP_ADD, 1); if(pmeinfo != NULL){ //pmeinfo->policy_id = -1; - stream_destroy(pmeinfo, 1); + stream_destroy(pmeinfo); } } else{ can_destroy = judge_stream_can_destroy(pmeinfo, CALLER_SAPP); if(can_destroy == 1){ traceid2pme_htable_del(pmeinfo); - stream_destroy(pmeinfo, pmeinfo->do_log); + stream_destroy(pmeinfo); } } } @@ -1587,7 +1460,7 @@ static int wrapped_kni_cmsg_get(struct pme_info *pmeinfo, struct kni_cmsg *cmsg, switch(type) { case TFE_CMSG_SSL_INTERCEPT_STATE: - memcpy((char*)&(pmeinfo->intercept_state), value, value_size); + memcpy((char*)&(pmeinfo->ssl_intercept_state), value, value_size); break; case TFE_CMSG_SSL_UPSTREAM_LATENCY: memcpy((char*)&(pmeinfo->ssl_server_side_latency), value, value_size); @@ -1602,7 +1475,7 @@ static int wrapped_kni_cmsg_get(struct pme_info *pmeinfo, struct kni_cmsg *cmsg, memcpy(pmeinfo->ssl_client_side_version, value, value_size); break; case TFE_CMSG_SSL_PINNING_STATE: - memcpy((char*)&(pmeinfo->pinningst), value, value_size); + memcpy((char*)&(pmeinfo->ssl_pinningst), value, value_size); break; case TFE_CMSG_SSL_CERT_VERIFY: memcpy((char*)&(pmeinfo->ssl_cert_verify), value, value_size); @@ -1623,20 +1496,19 @@ static long traceid2pme_htable_search_cb(void *data, const uchar *key, uint size struct pme_info *pmeinfo = (struct pme_info*)data; int can_destroy; if(pmeinfo != NULL){ - wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, sizeof(pmeinfo->intercept_state), logger); + wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, sizeof(pmeinfo->ssl_intercept_state), logger); wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_UPSTREAM_LATENCY, sizeof(pmeinfo->ssl_server_side_latency), logger); wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_DOWNSTREAM_LATENCY, sizeof(pmeinfo->ssl_client_side_latency), logger); wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_UPSTREAM_VERSION, sizeof(pmeinfo->ssl_server_side_version) - 1, logger); wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_DOWNSTREAM_VERSION, sizeof(pmeinfo->ssl_client_side_version) - 1, logger); - wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_PINNING_STATE, sizeof(pmeinfo->pinningst), logger); + wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_PINNING_STATE, sizeof(pmeinfo->ssl_pinningst), logger); wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_CERT_VERIFY, sizeof(pmeinfo->ssl_cert_verify), logger); wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_ERROR, sizeof(pmeinfo->ssl_error), logger); - clock_gettime(CLOCK_REALTIME, &(pmeinfo->end_time)); KNI_LOG_DEBUG(logger, "recv cmsg from tfe, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr); can_destroy = judge_stream_can_destroy(pmeinfo, CALLER_TFE); if(can_destroy == 1){ traceid2pme_htable_del(pmeinfo); - stream_destroy(pmeinfo, pmeinfo->do_log); + stream_destroy(pmeinfo); } } kni_cmsg_destroy(cmsg); @@ -1976,7 +1848,7 @@ static int traceid2pme_htable_expire_notify_cb(void *data, int eliminate_type){ if(can_destroy == 1){ FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC], 0, FS_OP_ADD, 1); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_CNT], 0, FS_OP_ADD, -1); - stream_destroy(pmeinfo, pmeinfo->do_log); + stream_destroy(pmeinfo); return 1; } } @@ -2042,7 +1914,6 @@ extern "C" int kni_init(){ char log_path[KNI_PATH_MAX] = ""; int tfe_node_count = 1; char manage_eth[KNI_SYMBOL_MAX] = ""; - struct kni_send_logger *send_logger = NULL; struct kni_field_stat_handle *fs_handle = NULL; void *local_logger = NULL; int log_level = -1; @@ -2173,21 +2044,6 @@ extern "C" int kni_init(){ } g_kni_fs_handle = fs_handle; - //init local_ipv4 - ret = kni_ipv4_addr_get_by_eth(manage_eth, &(g_kni_handle->local_ipv4)); - if(ret < 0){ - KNI_LOG_ERROR(local_logger, "Failed at get bind ipv4 addr, eth = %s", manage_eth); - goto error_out; - } - - //init kni_send_logger - send_logger = kni_send_logger_init(profile, local_logger); - if(send_logger == NULL){ - KNI_LOG_ERROR(local_logger, "Failed at init kni_send_logger", manage_eth); - goto error_out; - } - g_kni_handle->send_logger = send_logger; - //init traceid2pme_htable struct kni_htable_opt opt; memset(&opt, 0, sizeof(opt)); diff --git a/entry/src/kni_send_logger.cpp b/entry/src/kni_send_logger.cpp deleted file mode 100644 index b43e2eb..0000000 --- a/entry/src/kni_send_logger.cpp +++ /dev/null @@ -1,152 +0,0 @@ -#include "kni_utils.h" -#include "kni_send_logger.h" -#include "librdkafka/rdkafka.h" - -struct kni_send_logger{ - int sendlog_switch; - rd_kafka_t *kafka_handle; - rd_kafka_topic_t *kafka_topic; - void *local_logger; -}; - -static rd_kafka_t* kafka_init(const char *profile, void *logger){ - rd_kafka_t *kafka_handle = NULL; - rd_kafka_conf_t *rdkafka_conf = NULL; - char kafka_errstr[1024]; - const char *section = "kafka"; - char queue_buffering_max_messages[KNI_SYMBOL_MAX] = ""; - char topic_metadata_refresh_interval_ms[KNI_SYMBOL_MAX] = ""; - char security_protocol[KNI_SYMBOL_MAX] = ""; - int ret = MESA_load_profile_string_nodef(profile, section, "queue.buffering.max.messages", - queue_buffering_max_messages, sizeof(queue_buffering_max_messages)); - if(ret < 0){ - KNI_LOG_ERROR(logger, "MESA_prof_load: queue.buffering.max.messages not set, profile is %s, section is %s", profile, section); - goto error_out; - } - ret = MESA_load_profile_string_nodef(profile, section, "topic.metadata.refresh.interval.ms", - topic_metadata_refresh_interval_ms, sizeof(topic_metadata_refresh_interval_ms)); - if(ret < 0){ - KNI_LOG_ERROR(logger, "MESA_prof_load: topic.metadata.refresh.interval.ms not set, profile is %s, section is %s", profile, section); - goto error_out; - } - ret = MESA_load_profile_string_nodef(profile, section, "security.protocol", security_protocol, sizeof(security_protocol)); - if(ret < 0){ - KNI_LOG_ERROR(logger, "MESA_prof_load: security.protocol not set, profile is %s, section is %s", profile, section); - goto error_out; - } - KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n queue.buffering.max.messages: %s\n topic.metadata.refresh.interval.ms: %s\n" - "security.protocol: %s", "kafka", queue_buffering_max_messages, topic_metadata_refresh_interval_ms, security_protocol); - - rdkafka_conf = rd_kafka_conf_new(); - rd_kafka_conf_set(rdkafka_conf, "queue.buffering.max.messages", queue_buffering_max_messages, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", topic_metadata_refresh_interval_ms, kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "security.protocol", security_protocol, kafka_errstr, sizeof(kafka_errstr)); - - //The conf object is freed by this function and must not be used or destroyed by the application sub-sequently. - kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, rdkafka_conf, kafka_errstr, sizeof(kafka_errstr)); - rdkafka_conf = NULL; - if(kafka_handle == NULL){ - goto error_out; - } - return kafka_handle; - -error_out: - if(rdkafka_conf != NULL){ - rd_kafka_conf_destroy(rdkafka_conf); - rdkafka_conf = NULL; - } - if(kafka_handle != NULL){ - rd_kafka_destroy(kafka_handle); - kafka_handle = NULL; - } - return NULL; -} - -void kni_send_logger_destroy(struct kni_send_logger *handle){ - if(handle != NULL){ - if(handle->kafka_topic != NULL){ - rd_kafka_topic_destroy(handle->kafka_topic); - handle->kafka_topic = NULL; - } - if(handle->kafka_handle != NULL){ - rd_kafka_destroy(handle->kafka_handle); - handle->kafka_handle = NULL; - } - FREE(&handle); - } -} - -struct kni_send_logger* kni_send_logger_init(const char *profile, void *local_logger){ - struct kni_send_logger *handle = NULL; - const char *section = "send_logger"; - int sendlog_switch = -1; - char kafka_topic[KNI_SYMBOL_MAX] = ""; - char kafka_brokerlist[KNI_SYMBOL_MAX] = ""; - rd_kafka_t *kafka_handle = NULL; - rd_kafka_topic_t *topic = NULL; - int ret = MESA_load_profile_int_nodef(profile, section, "switch", &sendlog_switch); - if(ret < 0){ - KNI_LOG_ERROR(local_logger, "MESA_prof_load: switch not set, profile is %s, section is %s", profile, section); - goto error_out; - } - ret = MESA_load_profile_string_nodef(profile, section, "kafka_topic", kafka_topic, sizeof(kafka_topic)); - if(ret < 0){ - KNI_LOG_ERROR(local_logger, "MESA_prof_load: kafka_topic not set, profile is %s, section is %s", profile, section); - goto error_out; - } - ret = MESA_load_profile_string_nodef(profile, section, "kafka_brokerlist", kafka_brokerlist, sizeof(kafka_brokerlist)); - if(ret < 0){ - KNI_LOG_ERROR(local_logger, "MESA_prof_load: kafka_brokerlist not set, profile is %s, section is %s", profile, section); - goto error_out; - } - KNI_LOG_ERROR(local_logger, "MESA_prof_load, [%s]:\n switch: %d\n kafka_topic: %s\n, kafka_brokerlist: %s", - section, sendlog_switch, kafka_topic, kafka_brokerlist); - handle = ALLOC(struct kni_send_logger, 1); - handle->local_logger = local_logger; - //sendlog_switch = 0, do not sendto kafka - if(sendlog_switch == 0){ - handle->sendlog_switch = 0; - return handle; - } - handle->sendlog_switch = 1; - //init kafka - kafka_handle = kafka_init(profile, local_logger); - if(kafka_handle == NULL){ - KNI_LOG_ERROR(local_logger, "Failed at init kafka"); - goto error_out; - } - handle->kafka_handle = kafka_handle; - //kafka_brokerlist - ret = rd_kafka_brokers_add(kafka_handle, kafka_brokerlist); - if(ret == 0){ - KNI_LOG_ERROR(local_logger, "Failed at add kafka_brokers"); - goto error_out; - } - //kafka topic - topic = rd_kafka_topic_new(kafka_handle, kafka_topic, NULL); - if(topic == NULL){ - KNI_LOG_ERROR(local_logger, "Failed at new kafka topic"); - goto error_out; - } - handle->kafka_topic = topic; - return handle; - -error_out: - kni_send_logger_destroy(handle); - return NULL; -} - -int kni_send_logger_sendlog(kni_send_logger *handle, char *log_msg, int log_msg_len){ - if(handle->sendlog_switch == 0){ - return 0; - } - void *logger = handle->local_logger; - //kafka produce - int kafka_status = rd_kafka_produce(handle->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, - log_msg, log_msg_len, NULL, 0, NULL); - if(kafka_status < 0){ - KNI_LOG_ERROR(logger, "Kafka: Failed to produce, error is %s", rd_kafka_err2name(rd_kafka_last_error())); - return -1; - } - return 0; -}