diff --git a/common/include/kni_utils.h b/common/include/kni_utils.h index dc583c8..b4f5e9e 100644 --- a/common/include/kni_utils.h +++ b/common/include/kni_utils.h @@ -22,6 +22,10 @@ #define KNI_SYMBOL_MAX 64 #define KNI_DOMAIN_MAX 256 +#ifndef MAX +#define MAX(a, b) (((a) > (b)) ? (a) : (b)) +#endif + #ifndef MIN #define MIN(a, b) (((a) < (b)) ? (a) : (b)) #endif diff --git a/common/src/kni_cmsg.cpp b/common/src/kni_cmsg.cpp index 0815c1b..ea8090b 100644 --- a/common/src/kni_cmsg.cpp +++ b/common/src/kni_cmsg.cpp @@ -157,7 +157,6 @@ int kni_cmsg_deserialize(const unsigned char *data, uint16_t len, struct kni_cms cmsg = ALLOC(struct kni_cmsg, 1); offset = sizeof(struct kni_cmsg_serialize_header); nr_tlvs = ntohs(header->nr_tlvs); - printf("nr_tlvs is %d\n", nr_tlvs); for(int i = 0; i < nr_tlvs; i++) { struct kni_cmsg_tlv *tlv = (struct kni_cmsg_tlv*)(data + offset); @@ -166,7 +165,6 @@ int kni_cmsg_deserialize(const unsigned char *data, uint16_t len, struct kni_cms goto error_out; } uint16_t type = ntohs(tlv->type); - printf("type = %d\n", type); uint16_t length = ntohs(tlv->length); if(length < sizeof(struct kni_cmsg_tlv) || offset + length > len) { diff --git a/entry/src/kni_entry.cpp b/entry/src/kni_entry.cpp index 5c2fa04..3b76535 100644 --- a/entry/src/kni_entry.cpp +++ b/entry/src/kni_entry.cpp @@ -19,6 +19,11 @@ struct kni_field_stat_handle *g_kni_fs_handle = NULL; #define STREAM_TRACE_ID_LEN 37 #define TFE_COUNT_MAX 16 + +/* TODO: +1. con_duration_ms: how to calculate +*/ + enum kni_protocol{ KNI_PROTOCOL_UNKNOWN = 0, KNI_PROTOCOL_SSL, @@ -41,17 +46,16 @@ struct pme_info{ int tfe_id; void *logger; char stream_trace_id[STREAM_TRACE_ID_LEN]; - union{ - char host[KNI_DOMAIN_MAX]; //http only - char sni[KNI_DOMAIN_MAX]; //ssl only - }; + char host[KNI_DOMAIN_MAX]; //http only + char sni[KNI_DOMAIN_MAX]; //ssl only //tfe_release = 1: tfe don't need pmeinfo int tfe_release; int sapp_release; //kafka log struct streaminfo *stream; time_t start_time; - uint64_t con_duration; + time_t end_time; + uint64_t con_duration_ms; //from tfe, kafka log uint64_t intercept_state; uint64_t pinningst; //defalut 0 @@ -158,7 +162,9 @@ static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){ //start_time cJSON_AddNumberToObject(log_obj, "start_time", pmeinfo->start_time); //end_time - cJSON_AddNumberToObject(log_obj, "end_time", time(NULL)); + cJSON_AddNumberToObject(log_obj, "end_time", pmeinfo->end_time); + //con_duration_ms + cJSON_AddNumberToObject(log_obj, "con_duration_ms", (pmeinfo->end_time - pmeinfo->start_time) * 1000); //stream_info: addr_type, trans_proto, client_ip, client_port, server_ip, server_port const struct layer_addr *addr = &(pmeinfo->stream->addr); char client_ip_str[INET6_ADDRSTRLEN] = ""; @@ -236,7 +242,7 @@ static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){ //s2c_byte_num cJSON_AddNumberToObject(log_obj, "s2c_byte_num", pmeinfo->stream->ptcpdetail->clientbytes); int ret = -1; - char *log_msg = cJSON_Print(log_obj); + char *log_msg = cJSON_PrintUnformatted(log_obj); cJSON_Delete(log_obj); if(log_msg == NULL){ KNI_LOG_ERROR(local_logger, "Failed at cJSON_Print"); @@ -278,6 +284,10 @@ static void pme_info_destroy(struct pme_info *pmeinfo){ } FREE(&pmeinfo); } + else{ + KNI_LOG_DEBUG(logger, "can not free pmeinfo, sapp_release is %d, tfe_release is %d", + pmeinfo->sapp_release, pmeinfo->tfe_release); + } } static int protocol_identify(const struct streaminfo* stream, char *buf, int len, struct protocol_identify_result *result){ @@ -286,7 +296,7 @@ static int protocol_identify(const struct streaminfo* stream, char *buf, int len if(project != NULL){ result->protocol = KNI_PROTOCOL_HTTP; result->domain_len = project->host_len; - strncpy(result->domain, project->host, strnlen(project->host, sizeof(result->domain))); + strncpy(result->domain, project->host, strnlen(project->host, sizeof(result->domain) - 1)); return 0; } @@ -301,7 +311,7 @@ static int protocol_identify(const struct streaminfo* stream, char *buf, int len } else{ result->domain_len = strnlen(chello->sni, KNI_DOMAIN_MAX); - strncpy(result->domain, chello->sni, strnlen(chello->sni, sizeof(result->domain))); + strncpy(result->domain, chello->sni, strnlen(chello->sni, sizeof(result->domain) - 1)); } ssl_chello_free(chello); return 0; @@ -371,7 +381,8 @@ static unsigned char* kni_cmsg_serialize_header_new(struct pme_info *pmeinfo, st if(ret < 0) goto error_out; //stream trace id trace_id = pmeinfo->stream_trace_id; - ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_STREAM_TRACE_ID, (const unsigned char*)trace_id, STREAM_TRACE_ID_LEN); + ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_STREAM_TRACE_ID, (const unsigned char*)trace_id, + strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id))); if(ret < 0) goto error_out; bufflen = kni_cmsg_serialize_size_get(cmsg); @@ -474,6 +485,10 @@ static char pending_opstate(const struct streaminfo *stream, struct pme_info *pm } static char data_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, int thread_seq){ + //pmeinfo->tfe_release = 1: intercept, tfe end first. so droppkt and dropme + if(pmeinfo->tfe_release == 1){ + return APP_STATE_DROPPKT | APP_STATE_DROPME; + } void *logger = g_kni_handle->local_logger; char *buf = (char*)pktinfo->iphdr; int len = pktinfo->ip_totlen; @@ -531,11 +546,11 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein pmeinfo->tfe_release = 1; return APP_STATE_FAWPKT | APP_STATE_DROPME; case KNI_PROTOCOL_SSL: - strncpy(pmeinfo->sni, protocol_identify_res.domain, strnlen(protocol_identify_res.domain, sizeof(pmeinfo->sni))); + strncpy(pmeinfo->sni, protocol_identify_res.domain, strnlen(protocol_identify_res.domain, sizeof(pmeinfo->sni) - 1)); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SSL_STM], 0, FS_OP_ADD, 1); break; case KNI_PROTOCOL_HTTP: - strncpy(pmeinfo->host, protocol_identify_res.domain, strnlen(protocol_identify_res.domain, sizeof(pmeinfo->host))); + strncpy(pmeinfo->host, protocol_identify_res.domain, strnlen(protocol_identify_res.domain, sizeof(pmeinfo->host) - 1)); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_HTTP_STM], 0, FS_OP_ADD, 1); break; default: @@ -590,7 +605,12 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein } static char close_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, int thread_seq){ - //close: sendto tfe + //pmeinfo->tfe_release = 1: intercept, tfe end first. so droppkt and dropme + if(pmeinfo->tfe_release == 1){ + return APP_STATE_DROPPKT | APP_STATE_DROPME; + } + //close: also need to send to tfe + pmeinfo->end_time = time(NULL); void *logger = g_kni_handle->local_logger; char *buf = (char*)pktinfo->iphdr; char stream_addr[KNI_SYMBOL_MAX] = ""; @@ -608,6 +628,9 @@ static char close_opstate(const struct streaminfo *stream, struct pme_info *pmei //KNI_LOG_DEBUG(logger, "Succeed at send last packet to tfe%d, stream addr is %s", pmeinfo->tfe_id, stream_addr); } FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1); + //reset clock: when sapp end, start clock + MESA_htable_search(g_kni_handle->traceid2pme_htable, (const unsigned char*)pmeinfo->stream_trace_id, + strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id))); return APP_STATE_DROPPKT | APP_STATE_DROPME; default: FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); @@ -646,7 +669,7 @@ extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, in switch(stream->pktstate){ case OP_STATE_PENDING: *pme = pmeinfo = pme_info_new(stream, thread_seq, logger); - key_size = strlen(pmeinfo->stream_trace_id); + key_size = strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id)); ret = MESA_htable_add(g_kni_handle->traceid2pme_htable, (const unsigned char *)(pmeinfo->stream_trace_id), key_size, (const void*)pmeinfo); if(ret < 0){ @@ -654,7 +677,7 @@ extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, in "table is traceid2pme_htable, key is %s", pmeinfo->stream_trace_id); } KNI_LOG_DEBUG(logger, "MESA_htable: succeed at MESA_htable_add, table is traceid2pme_htable, key is %s, key_size is %d", - key_size, pmeinfo->stream_trace_id); + pmeinfo->stream_trace_id, key_size); ret = pending_opstate(stream, pmeinfo, pktinfo); break; case OP_STATE_DATA: @@ -672,7 +695,6 @@ extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, in } FREE(&pktinfo); if((ret & APP_STATE_DROPME)){ - //sendlog_to_kafka(pmeinfo, logger); pmeinfo->sapp_release = 1; pme_info_destroy(pmeinfo); *pme = NULL; @@ -821,13 +843,18 @@ static long traceid2pme_htable_search_cb(void *data, const uchar *key, uint size wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_ERROR, sizeof(pmeinfo->ssl_error), logger); FREE(&cmsg); pmeinfo->tfe_release = 1; - int key_size = strlen(pmeinfo->stream_trace_id); + pmeinfo->end_time = time(NULL); + int key_size = strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id)); int ret = MESA_htable_del(g_kni_handle->traceid2pme_htable, (const unsigned char *)pmeinfo->stream_trace_id, key_size, NULL); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_htable: failed at del, table is %s, key is %s, key_size is %d, ret is %d", "traceid2pme_htable", pmeinfo->stream_trace_id, key_size, ret); } + else{ + KNI_LOG_DEBUG(logger, "MESA_htable: succeed at del, table is %s, key is %s, key_size is %d", + "traceid2pme_htable", pmeinfo->stream_trace_id, key_size); + } } FREE(&cmsg); return 0; @@ -910,7 +937,7 @@ void* thread_tfe_cmsg_receiver(void *args){ cb_args.cmsg = cmsg; cb_args.logger = logger; MESA_htable_search_cb(g_kni_handle->traceid2pme_htable, (const unsigned char *)stream_trace_id, - strlen((char*)stream_trace_id), traceid2pme_htable_search_cb, &cb_args, &cb_ret); + value_size, traceid2pme_htable_search_cb, &cb_args, &cb_ret); } return NULL; @@ -1116,6 +1143,8 @@ extern "C" void kni_destroy(struct kni_handle *handle){ } static void traceid2pme_htable_data_free_cb(void *data){ + void *logger = g_kni_handle->local_logger; + KNI_LOG_DEBUG(logger, "call traceid2pme_htable_data_free_cb"); struct pme_info *pmeinfo = (struct pme_info*)data; pme_info_destroy(pmeinfo); } @@ -1123,10 +1152,13 @@ static void traceid2pme_htable_data_free_cb(void *data){ //eliminate_type: 0:FIFO; 1:LRU //ret: 1: the item can be eliminated; 0: the item can't be eliminated static int traceid2pme_htable_expire_notify_cb(void *data, int eliminate_type){ + void *logger = g_kni_handle->local_logger; struct pme_info *pmeinfo = (struct pme_info*)data; if(pmeinfo->sapp_release == 0){ + KNI_LOG_DEBUG(logger, "Failed at eliminate pmeinfo, sapp_release is %d", pmeinfo->sapp_release); return 0; } + KNI_LOG_DEBUG(logger, "Succeed at eliminate pmeinfo, sapp_release is %d", pmeinfo->sapp_release); pmeinfo->tfe_release = 1; return 1; } @@ -1218,7 +1250,7 @@ extern "C" int kni_init(){ //create thread_tfe_cmsg_receiver cmsg_receiver_args = ALLOC(struct thread_tfe_cmsg_receiver_args, 1); cmsg_receiver_args->logger = local_logger; - strncpy(cmsg_receiver_args->profile, profile, strnlen(profile, sizeof(cmsg_receiver_args->profile))); + strncpy(cmsg_receiver_args->profile, profile, strnlen(profile, sizeof(cmsg_receiver_args->profile) - 1)); ret = pthread_create(&thread_id, NULL, thread_tfe_cmsg_receiver, (void *)cmsg_receiver_args); if(unlikely(ret != 0)){ KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_cmsg_receiver, ret is %d", ret);