diff --git a/CMakeLists.txt b/CMakeLists.txt index f52825d..5fb072b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,7 +9,6 @@ set(CMAKE_C_STANDARD 11) set(CMAKE_POSITION_INDEPENDENT_CODE ON) set (CMAKE_CXX_FLAGS "-Wall") set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -lasan -fsanitize-recover=address -fsanitize=address -fno-omit-frame-pointer") - add_definitions(-D_GNU_SOURCE) if (CMAKE_BUILD_TYPE STREQUAL Debug) diff --git a/common/include/kni_utils.h b/common/include/kni_utils.h index b4f5e9e..e319fc1 100644 --- a/common/include/kni_utils.h +++ b/common/include/kni_utils.h @@ -1,4 +1,3 @@ -//TODO: 日志打印出文件名 + 行号 #pragma once #include #include @@ -60,7 +59,6 @@ do { \ #define KNI_DEFAULT_MSS 1460 #define KNI_DEFAULT_MTU 1500 #define KNI_MTU 3000 -//TODO: 网络序 struct kni_tcpopt_info{ uint16_t mss; uint8_t wscale; @@ -69,7 +67,7 @@ struct kni_tcpopt_info{ }; //field_stat -#define KNI_FIELD_MAX 24 +#define KNI_FIELD_MAX 32 enum kni_field{ KNI_FIELD_TOT_PKT, KNI_FIELD_BYP_PKT, @@ -87,6 +85,13 @@ enum kni_field{ KNI_FIELD_SENDLOG_SUCC, KNI_FIELD_SENDLOG_FAIL, KNI_FIELD_UNKNOWN_STM, + KNI_FIELD_STM_NO_DATA, + KNI_FIELD_PME_NEW, + KNI_FIELD_PME_FREE, + KNI_FIELD_ID2PME_ADD_SUCC, + KNI_FIELD_ID2PME_ADD_FAIL, + KNI_FIELD_ID2PME_DEL_SUCC, + KNI_FIELD_ID2PME_DEL_FAIL, }; struct kni_field_stat_handle{ diff --git a/common/src/kni_utils.cpp b/common/src/kni_utils.cpp index ae0e4a8..aa6f111 100644 --- a/common/src/kni_utils.cpp +++ b/common/src/kni_utils.cpp @@ -240,11 +240,14 @@ MESA_htable_handle kni_create_htable(const char *profile, const char *section, v { __wrapper_MESA_htable_set_opt(htable, MHO_ELIMIMINATE_TYPE, HASH_ELIMINATE_ALGO_FIFO, logger, section); } - - __wrapper_MESA_htable_set_opt(htable, MHO_CBFUN_DATA_FREE, - (void *)free_data_cb, sizeof(free_data_cb), logger, section); - __wrapper_MESA_htable_set_opt(htable, MHO_CBFUN_DATA_EXPIRE_NOTIFY, + if(free_data_cb != NULL){ + __wrapper_MESA_htable_set_opt(htable, MHO_CBFUN_DATA_FREE, + (void *)free_data_cb, sizeof(free_data_cb), logger, section); + } + if(expire_notify_cb != NULL){ + __wrapper_MESA_htable_set_opt(htable, MHO_CBFUN_DATA_EXPIRE_NOTIFY, (void *)expire_notify_cb, sizeof(free_data_cb), logger, section); + } int ret = MESA_htable_mature(htable); if(unlikely(ret != 0)) { diff --git a/entry/src/kni_entry.cpp b/entry/src/kni_entry.cpp index e78ef6c..c7c89e9 100644 --- a/entry/src/kni_entry.cpp +++ b/entry/src/kni_entry.cpp @@ -7,7 +7,8 @@ #include "uuid/uuid.h" #include "cjson/cJSON.h" #include "kni_send_logger.h" -#include +#include +#include extern int g_iThreadNum; @@ -17,15 +18,12 @@ struct kni_field_stat_handle *g_kni_fs_handle = NULL; #define HTTP_PROJECT_NAME "kni_http_tag" #define BURST_MAX 1 -#define STREAM_TRACE_ID_LEN 37 +#define stream_traceid_LEN 37 #define TFE_COUNT_MAX 16 +#define CALLER_SAPP 0 +#define CALLER_TFE 1 -/* TODO: -1. con_duration_ms: how to calculate -2. sapp return dropme之后还会继续调用close吗 -*/ - enum kni_protocol{ KNI_PROTOCOL_UNKNOWN = 0, KNI_PROTOCOL_SSL, @@ -38,6 +36,7 @@ enum stream_error{ STREAM_ERROR_PROTOCOL_UNKNOWN = -3, STREAM_ERROR_NO_SYN_ACK = -4, STREAM_ERROR_INVALID_ACTION = -5, + STREAM_ERROR_NO_DATA = -6, }; struct http_project{ @@ -58,9 +57,12 @@ struct pme_info{ int tfe_id; pthread_mutex_t lock; enum stream_error error; - char stream_trace_id[STREAM_TRACE_ID_LEN]; - char host[KNI_DOMAIN_MAX]; //http only - char sni[KNI_DOMAIN_MAX]; //ssl only + char stream_traceid[stream_traceid_LEN]; + //TODO: union, cjson check protocol + union{ + char host[KNI_DOMAIN_MAX]; //http only + char sni[KNI_DOMAIN_MAX]; //ssl only + }; //tfe_release = 1: tfe don't need pmeinfo int tfe_release; int sapp_release; @@ -172,6 +174,7 @@ static void pme_info_destroy(void *data){ //free lock pthread_mutex_destroy(&(pmeinfo->lock)); FREE(&pmeinfo); + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_FREE], 0, FS_OP_ADD, 1); } else{ KNI_LOG_ERROR(logger, "Failed at pme_info_destroy, pmeinfo is null"); @@ -184,16 +187,19 @@ static struct pme_info* pme_info_new(const struct streaminfo *stream, int thread pmeinfo->tfe_id = g_kni_handle->tfe_count > 0 ? thread_seq % g_kni_handle->tfe_count : -1; uuid_t uu; uuid_generate_random(uu); - uuid_unparse(uu, pmeinfo->stream_trace_id); + uuid_unparse(uu, pmeinfo->stream_traceid); pmeinfo->addr = layer_addr_dup(&(stream->addr)); pmeinfo->start_time = time(NULL); + char stream_addr[KNI_SYMBOL_MAX] = ""; //init pme_lock int ret = pthread_mutex_init(&(pmeinfo->lock), NULL); if(ret < 0){ - KNI_LOG_ERROR(logger, "Failed at init pthread mutex"); + KNI_LOG_ERROR(logger, "Failed at init pthread mutex, stream_traceid is %s", pmeinfo->stream_traceid); goto error_out; } - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_STM], 0, FS_OP_ADD, 1); + kni_stream_addr_trans((struct ipaddr*)(&stream->addr), stream_addr, sizeof(stream_addr)); + KNI_LOG_INFO(logger, "stream addr is %s, stream traceid is %s", stream_addr, pmeinfo->stream_traceid); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_STM], 0, FS_OP_ADD, 1); return pmeinfo; error_out: @@ -204,8 +210,8 @@ error_out: static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){ //create cjson cJSON *log_obj = cJSON_CreateObject(); - //stream_trace_id - cJSON_AddStringToObject(log_obj, "stream_trace_id", pmeinfo->stream_trace_id); + //stream_traceid + cJSON_AddStringToObject(log_obj, "stream_traceid", pmeinfo->stream_traceid); //policy_id cJSON_AddNumberToObject(log_obj, "policy_id", pmeinfo->policy_id); //action @@ -283,9 +289,13 @@ static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){ //addr_list cJSON_AddStringToObject(log_obj, "addr_list", ""); //host: http_only - cJSON_AddStringToObject(log_obj, "host", pmeinfo->host); + if(pmeinfo->protocol == KNI_PROTOCOL_HTTP){ + cJSON_AddStringToObject(log_obj, "host", pmeinfo->host); + } //sni: ssl only - cJSON_AddStringToObject(log_obj, "sni", pmeinfo->sni); + if(pmeinfo->protocol == KNI_PROTOCOL_SSL){ + cJSON_AddStringToObject(log_obj, "sni", pmeinfo->sni); + } //c2s_pkt_num cJSON_AddNumberToObject(log_obj, "c2s_pkt_num", pmeinfo->server_pkts); //s2c_pkt_num @@ -298,13 +308,14 @@ static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){ char *log_msg = cJSON_PrintUnformatted(log_obj); cJSON_Delete(log_obj); if(log_msg == NULL){ - KNI_LOG_ERROR(local_logger, "Failed at cJSON_Print"); + KNI_LOG_ERROR(local_logger, "Failed at cJSON_Print, stream_treaceid is %s", pmeinfo->stream_traceid); goto error_out; } KNI_LOG_DEBUG(local_logger, "log_msg is %s\n", log_msg); ret = kni_send_logger_sendlog(g_kni_handle->send_logger, log_msg, strlen(log_msg)); if(ret < 0){ - KNI_LOG_ERROR(local_logger, "Failed at kni_send_logger_sendlog, ret is %d", ret); + KNI_LOG_ERROR(local_logger, "Failed at kni_send_logger_sendlog, ret is %d, strem_traceid is %s", + ret, pmeinfo->stream_traceid); goto error_out; } cJSON_free(log_msg); @@ -316,33 +327,59 @@ error_out: } return -1; } - -//return 0: has been destroy -static int judge_pme_destroy(struct pme_info *pmeinfo){ + +static void judge_pme_destroy(struct pme_info *pmeinfo, int caller){ void *logger = g_kni_handle->local_logger; if(pmeinfo != NULL){ void *logger = g_kni_handle->local_logger; + pthread_mutex_lock(&(pmeinfo->lock)); + if(caller == CALLER_SAPP){ + KNI_LOG_DEBUG(logger, "set sapp_release = 1, caller is %d, stream_trace_id is %s, thread id is %p", + caller, pmeinfo->stream_traceid, pthread_self()); + pmeinfo->sapp_release = 1; + } + if(caller == CALLER_TFE){ + KNI_LOG_DEBUG(logger, "set tfe_release = 1, caller is %d, stream_trace_id is %s, thread id is %p", + caller, pmeinfo->stream_traceid, pthread_self()); + pmeinfo->tfe_release = 1; + } if(pmeinfo->sapp_release == 1 && pmeinfo->tfe_release == 1){ + //sendlog int ret = sendlog_to_kafka(pmeinfo, logger); if(ret < 0){ - KNI_LOG_ERROR(logger, "Failed at sendlog to kafka, stream_trace_id is %s", pmeinfo->stream_trace_id); + KNI_LOG_ERROR(logger, "Failed at sendlog to kafka, stream traceid is %s", pmeinfo->stream_traceid); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_FAIL], 0, FS_OP_ADD, 1); } else{ - KNI_LOG_DEBUG(logger, "Succeed sendlog to kafka, stream_trace_id is %s", pmeinfo->stream_trace_id); + KNI_LOG_DEBUG(logger, "Succeed sendlog to kafka, stream traceid is %s", pmeinfo->stream_traceid); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_SUCC], 0, FS_OP_ADD, 1); } + //only intercetp stream need del htable + if(pmeinfo->action == KNI_ACTION_INTERCEPT){ + int key_size = strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid)); + ret = MESA_htable_del(g_kni_handle->traceid2pme_htable, (const unsigned char *)pmeinfo->stream_traceid, + key_size, NULL); + if(ret < 0){ + KNI_LOG_ERROR(logger, "MESA_htable: failed at del, table is %s, key is %s, key_size is %d, ret is %d", + "traceid2pme_htable", pmeinfo->stream_traceid, key_size, ret); + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_FAIL], 0, FS_OP_ADD, 1); + } + else{ + KNI_LOG_DEBUG(logger, "MESA_htable: succeed at del, table is %s, key is %s, key_size is %d", + "traceid2pme_htable", pmeinfo->stream_traceid, key_size); + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC], 0, FS_OP_ADD, 1); + } + } + //free pme pme_info_destroy(pmeinfo); - return 0; - } - else{ - KNI_LOG_DEBUG(logger, "can not destroy pmeinfo, sapp_release = %d, tfe_release = %d", pmeinfo->sapp_release, pmeinfo->tfe_release); + return; } + KNI_LOG_DEBUG(logger, "can not destroy pmeinfo, sapp_release = %d, tfe_release = %d", pmeinfo->sapp_release, pmeinfo->tfe_release); + pthread_mutex_unlock(&(pmeinfo->lock)); } else{ KNI_LOG_ERROR(logger, "Failed at judge_pme_info, pmeinfo is null"); } - return -1; } static int protocol_identify(const struct streaminfo* stream, char *buf, int len, struct protocol_identify_result *result){ @@ -376,11 +413,12 @@ static int protocol_identify(const struct streaminfo* stream, char *buf, int len result->protocol = KNI_PROTOCOL_UNKNOWN; return 0; } -static int wrapped_kni_cmsg_set(struct kni_cmsg *cmsg, uint16_t type, const unsigned char *value, uint16_t size){ +static int wrapped_kni_cmsg_set(struct kni_cmsg *cmsg, uint16_t type, const unsigned char *value, + uint16_t size, char *stream_traceid){ void *logger = g_kni_handle->local_logger; int ret = kni_cmsg_set(cmsg, type, value, size); if(ret < 0){ - KNI_LOG_ERROR(logger, "Failed set cmsg, type is %d", type); + KNI_LOG_ERROR(logger, "Failed set cmsg, type is %d, stream traceid is %s", type, stream_traceid); } return ret; } @@ -400,52 +438,52 @@ static unsigned char* kni_cmsg_serialize_header_new(struct pme_info *pmeinfo, st uint16_t client_window = htons(pmeinfo->client_window); uint16_t server_window = htons(pmeinfo->server_window); //seq - int ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SEQ, (const unsigned char*)&seq, 4); + int ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SEQ, (const unsigned char*)&seq, 4, pmeinfo->stream_traceid); if(ret < 0) goto error_out; //ack - ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_ACK, (const unsigned char*)&ack, 4); + ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_ACK, (const unsigned char*)&ack, 4, pmeinfo->stream_traceid); if(ret < 0) goto error_out; //client mss - ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_CLIENT, (const unsigned char*)&client_mss, 2); + ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_CLIENT, (const unsigned char*)&client_mss, 2, pmeinfo->stream_traceid); if(ret < 0) goto error_out; //server mss - ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_SERVER, (const unsigned char*)&server_mss, 2); + ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_SERVER, (const unsigned char*)&server_mss, 2, pmeinfo->stream_traceid); if(ret < 0) goto error_out; //client wscale - ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->wscale), 1); + ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->wscale), 1, pmeinfo->stream_traceid); if(ret < 0) goto error_out; //server wscale - ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->wscale), 1); + ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->wscale), 1, pmeinfo->stream_traceid); if(ret < 0) goto error_out; //client sack - ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->sack), 1); + ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->sack), 1, pmeinfo->stream_traceid); if(ret < 0) goto error_out; //server sack - ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->sack), 1); + ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->sack), 1, pmeinfo->stream_traceid); if(ret < 0) goto error_out; //client timestamp - ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->ts), 1); + ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->ts), 1, pmeinfo->stream_traceid); if(ret < 0) goto error_out; //server timestamp - ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->ts), 1); + ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->ts), 1, pmeinfo->stream_traceid); if(ret < 0) goto error_out; //protocol - ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (const unsigned char*)&protocol_type, 1); + ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (const unsigned char*)&protocol_type, 1, pmeinfo->stream_traceid); if(ret < 0) goto error_out; //client window - ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT, (const unsigned char*)&client_window, 2); + ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT, (const unsigned char*)&client_window, 2, pmeinfo->stream_traceid); if(ret < 0) goto error_out; //server window - ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_SERVER, (const unsigned char*)&server_window, 2); + ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_SERVER, (const unsigned char*)&server_window, 2, pmeinfo->stream_traceid); if(ret < 0) goto error_out; //maat policy id policy_id = pmeinfo->policy_id; - ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_POLICY_ID, (const unsigned char*)&policy_id, sizeof(policy_id)); + ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_POLICY_ID, (const unsigned char*)&policy_id, sizeof(policy_id), pmeinfo->stream_traceid); if(ret < 0) goto error_out; //stream trace id - trace_id = pmeinfo->stream_trace_id; + trace_id = pmeinfo->stream_traceid; ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_STREAM_TRACE_ID, (const unsigned char*)trace_id, - strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id))); + strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid)), pmeinfo->stream_traceid); if(ret < 0) goto error_out; bufflen = kni_cmsg_serialize_size_get(cmsg); @@ -453,7 +491,8 @@ static unsigned char* kni_cmsg_serialize_header_new(struct pme_info *pmeinfo, st serialize_len = 0; ret = kni_cmsg_serialize(cmsg, buff, bufflen, &serialize_len); if(ret < 0){ - KNI_LOG_ERROR(logger, "Failed at serialize cmsg, ret is %d", ret); + KNI_LOG_ERROR(logger, "Failed at serialize cmsg, ret is %d, stream traceid is %s", + ret, pmeinfo->stream_traceid); goto error_out; } *len = serialize_len; @@ -510,24 +549,28 @@ static char* add_cmsg_to_packet(struct pme_info *pmeinfo, struct pkt_info *pktin static int send_to_tfe(struct kni_marsio_handle *handle, char *raw_data, int raw_len, int thread_seq, int tfe_id){ void *logger = g_kni_handle->local_logger; marsio_buff_t *tx_buffs[BURST_MAX]; - unsigned int ret = 1; struct mr_vdev *dev_eth_handler = handle->tfe_instance_list[tfe_id]->dev_eth_handler; struct mr_sendpath *dev_eth_sendpath = handle->tfe_instance_list[tfe_id]->dev_eth_sendpath; char *src_mac = handle->src_mac_addr; char *dst_mac = handle->tfe_instance_list[tfe_id]->mac_addr; - int alloc_ret = marsio_buff_malloc_device(dev_eth_handler, tx_buffs, ret, 0, thread_seq); + //only send one packet + int nr_send = 1; + int alloc_ret = marsio_buff_malloc_device(dev_eth_handler, tx_buffs, nr_send, 0, thread_seq); if (alloc_ret < 0){ - KNI_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret is %d, thread_seq is %d", ret, thread_seq); + KNI_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret is %d, thread_seq is %d", + alloc_ret, thread_seq); return -1; } - char* dst_data = marsio_buff_append(tx_buffs[0], raw_len + 14); - //ethernet_header[14] - memcpy(dst_data, dst_mac, 6); - memcpy(dst_data + 6, src_mac, 6); - dst_data[12] = 0x08; - dst_data[13] = 0x00; - memcpy((char*)dst_data + 14, raw_data, raw_len); - marsio_send_burst(dev_eth_sendpath, thread_seq, tx_buffs, ret); + for(int i = 0; i < nr_send; i++){ + char* dst_data = marsio_buff_append(tx_buffs[i], raw_len + 14); + //ethernet_header[14] + struct ethhdr *ether_hdr = (struct ethhdr*)dst_data; + memcpy(ether_hdr->h_dest, dst_mac, sizeof(ether_hdr->h_dest)); + memcpy(ether_hdr->h_source, src_mac, sizeof(ether_hdr->h_source)); + ether_hdr->h_proto = htons(ETH_P_IP); + memcpy((char*)dst_data + sizeof(*ether_hdr), raw_data, raw_len); + } + marsio_send_burst(dev_eth_sendpath, thread_seq, tx_buffs, nr_send); return 0; } @@ -535,16 +578,16 @@ static char pending_opstate(const struct streaminfo *stream, struct pme_info *pm void *logger = g_kni_handle->local_logger; if(!pktinfo->tcphdr->syn){ //pending_opstate not syn, bypass and dropme - KNI_LOG_ERROR(logger, "pending opstate: not syn"); + KNI_LOG_DEBUG(logger, "pending opstate: not syn, stream traceid is %s", pmeinfo->stream_traceid); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_SYN_EXP], 0, FS_OP_ADD, 1); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); pmeinfo->error = STREAM_ERROR_PENDING_NO_SYN; return APP_STATE_FAWPKT | APP_STATE_DROPME; } pmeinfo->client_window = pktinfo->tcphdr->window; pmeinfo->client_tcpopt = kni_get_tcpopt(pktinfo->tcphdr, pktinfo->tcphdr_len); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); return APP_STATE_FAWPKT | APP_STATE_GIVEME; } @@ -556,8 +599,8 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein void *logger = g_kni_handle->local_logger; char *buf = (char*)pktinfo->iphdr; int len = pktinfo->ip_totlen; - char stream_addr[KNI_SYMBOL_MAX] = ""; int ret; + char stream_addr[KNI_SYMBOL_MAX] = ""; kni_stream_addr_trans((struct ipaddr*)(&stream->addr), stream_addr, sizeof(stream_addr)); //pmeinfo->action has only 3 value: KNI_ACTION_NONE, KNI_ACTION_INTERCEPT, KNI_ACTION_BYPASS switch (pmeinfo->action){ @@ -566,7 +609,7 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein case KNI_ACTION_INTERCEPT: ret = send_to_tfe(g_kni_handle->marsio_handle, buf, len, thread_seq, pmeinfo->tfe_id); if(ret < 0){ - KNI_LOG_ERROR(logger, "Failed at send continue packet to tfe%d, stream_addr is %s", pmeinfo->tfe_id, stream_addr); + KNI_LOG_ERROR(logger, "Failed at send continue packet to tfe%d, stream traceid is %s", pmeinfo->tfe_id, pmeinfo->stream_traceid); } FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1); return APP_STATE_DROPPKT | APP_STATE_GIVEME; @@ -574,29 +617,31 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); return APP_STATE_FAWPKT | APP_STATE_GIVEME; default: + assert(0); break; } // syn/ack if(pktinfo->tcphdr->syn && pktinfo->tcphdr->ack){ pmeinfo->server_window = pktinfo->tcphdr->window; pmeinfo->server_tcpopt = kni_get_tcpopt(pktinfo->tcphdr, pktinfo->tcphdr_len); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); return APP_STATE_FAWPKT | APP_STATE_GIVEME; } + //no data, maybe ack if(pktinfo->data_len <= 0){ - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); return APP_STATE_FAWPKT | APP_STATE_GIVEME; } //not double dir, bypass and dropme if(stream->dir != DIR_DOUBLE){ KNI_LOG_INFO(logger, "dir is %d, bypass, stream addr is %s", stream->dir, stream_addr); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); pmeinfo->error = STREAM_ERROR_SINGLE_DIR; return APP_STATE_FAWPKT | APP_STATE_DROPME; } struct protocol_identify_result protocol_identify_res; - memset(&protocol_identify_res, 0, sizeof(struct protocol_identify_result)); + memset(&protocol_identify_res, 0, sizeof(protocol_identify_res)); protocol_identify(stream, pktinfo->data, pktinfo->data_len, &protocol_identify_res); pmeinfo->protocol = protocol_identify_res.protocol; switch(pmeinfo->protocol){ @@ -604,8 +649,8 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein case KNI_PROTOCOL_UNKNOWN: KNI_LOG_INFO(logger, "Failed at protocol_identify, bypass and dropme, stream addr is %s\n", pmeinfo->protocol, stream_addr); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_UNKNOWN_STM], 0, FS_OP_ADD, 1); pmeinfo->error = STREAM_ERROR_PROTOCOL_UNKNOWN; return APP_STATE_FAWPKT | APP_STATE_DROPME; @@ -630,28 +675,40 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein stream_addr, protocol_identify_res.domain, pmeinfo->policy_id, pmeinfo->action, action_str, pmeinfo->maat_hit); //receive client hello, but no syn/ack, bypass and dropme if(pmeinfo->client_tcpopt == NULL || pmeinfo->server_tcpopt == NULL){ - KNI_LOG_ERROR(logger, "Failed at intercept, %s, %s", pmeinfo->client_tcpopt == NULL ? "no syn" : "", - pmeinfo->server_tcpopt == NULL ? "no syn/ack" : ""); + KNI_LOG_ERROR(logger, "Failed at intercept, %s, %s, stream traceid is %s", pmeinfo->client_tcpopt == NULL ? "no syn" : "", + pmeinfo->server_tcpopt == NULL ? "no syn/ack" : "", pmeinfo->stream_traceid); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_SA_EXP], 0, FS_OP_ADD, 1); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); pmeinfo->error = STREAM_ERROR_NO_SYN_ACK; return APP_STATE_FAWPKT | APP_STATE_DROPME; } + int key_size; switch(pmeinfo->action){ case KNI_ACTION_BYPASS: FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); return APP_STATE_FAWPKT | APP_STATE_GIVEME; case KNI_ACTION_INTERCEPT: + //only intercept: add to hash table + key_size = strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid)); + ret = MESA_htable_add(g_kni_handle->traceid2pme_htable, (const unsigned char *)(pmeinfo->stream_traceid), + key_size, (const void*)pmeinfo); + if(ret < 0){ + KNI_LOG_ERROR(logger, "MESA_htable: failed at MESA_htable_add," + "table is traceid2pme_htable, key is %s", pmeinfo->stream_traceid); + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL], 0, FS_OP_ADD, 1); + } + else{ + KNI_LOG_DEBUG(logger, "MESA_htable: succeed at MESA_htable_add," + "table is traceid2pme_htable, key is %s", pmeinfo->stream_traceid); + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC], 0, FS_OP_ADD, 1); + } //action = KNI_ACTION_INTERCEPT, sendto tfe buf = add_cmsg_to_packet(pmeinfo, pktinfo, &len); ret = send_to_tfe(g_kni_handle->marsio_handle, buf, len, thread_seq, pmeinfo->tfe_id); if(ret < 0){ - KNI_LOG_ERROR(logger, "Failed at send first packet to tfe%d, stream addr is %s", pmeinfo->tfe_id, stream_addr); - } - else{ - //KNI_LOG_DEBUG(logger, "Succeed at send first packet to tfe%d, stream addr is %s", pmeinfo->tfe_id, stream_addr); + KNI_LOG_ERROR(logger, "Failed at send first packet to tfe%d, stream traceid is %s", pmeinfo->tfe_id, pmeinfo->stream_traceid); } FREE(&buf); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1); @@ -661,22 +718,17 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein //action != intercept && action != bypass,bypass and dropme KNI_LOG_ERROR(logger, "Action %d(%s) is invalid, bypass(dropme): policy_id is %d, stream addr is %s, domain is ", pmeinfo->action, action_str, pmeinfo->policy_id, stream_addr, protocol_identify_res.domain); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); pmeinfo->error = STREAM_ERROR_INVALID_ACTION; return APP_STATE_FAWPKT | APP_STATE_DROPME; } } static char close_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, int thread_seq){ - //close: also need to send to tfe + //close: a_packet = null, do not sendto tfe pmeinfo->end_time = time(NULL); void *logger = g_kni_handle->local_logger; - char *buf = (char*)pktinfo->iphdr; - char stream_addr[KNI_SYMBOL_MAX] = ""; - kni_stream_addr_trans((struct ipaddr*)(&stream->addr), stream_addr, sizeof(stream_addr)); - int len = pktinfo->ip_totlen; - int ret; pmeinfo->server_bytes=stream->ptcpdetail->serverbytes; pmeinfo->client_bytes=stream->ptcpdetail->clientbytes; pmeinfo->server_pkts=stream->ptcpdetail->serverpktnum; @@ -684,88 +736,69 @@ static char close_opstate(const struct streaminfo *stream, struct pme_info *pmei pmeinfo->dir=stream->dir; switch(pmeinfo->action){ case KNI_ACTION_INTERCEPT: - ret = send_to_tfe(g_kni_handle->marsio_handle, buf, len, thread_seq, pmeinfo->tfe_id); - if(ret < 0){ - KNI_LOG_ERROR(logger, "Failed at send last packet to tfe%d, stream addr is %s", - pmeinfo->tfe_id, stream_addr); - } - else{ - //KNI_LOG_DEBUG(logger, "Succeed at send last packet to tfe%d, stream addr is %s", pmeinfo->tfe_id, stream_addr); - } - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1); //reset clock: when sapp end, start clock - MESA_htable_search(g_kni_handle->traceid2pme_htable, (const unsigned char*)pmeinfo->stream_trace_id, - strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id))); + MESA_htable_search(g_kni_handle->traceid2pme_htable, (const unsigned char*)pmeinfo->stream_traceid, + strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid))); return APP_STATE_DROPPKT | APP_STATE_DROPME; case KNI_ACTION_BYPASS: + KNI_LOG_DEBUG(logger, "set tfe_release = 1, stream_trace_id is %s", pmeinfo->stream_traceid); pmeinfo->tfe_release = 1; - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); return APP_STATE_FAWPKT | APP_STATE_DROPME; - //will not happen, close has only 2 action: + //stream has only syn, ack. not data. do not send to tfe default: char action_str[KNI_SYMBOL_MAX]; kni_maat_action_trans(pmeinfo->action, action_str); - KNI_LOG_ERROR(logger, "close_opstate: action %d(%s) is abnormal", - pmeinfo->action, action_str); + pmeinfo->error = STREAM_ERROR_NO_DATA; + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STM_NO_DATA], 0, FS_OP_ADD, 1); + KNI_LOG_DEBUG(logger, "close_opstate: action %d(%s) is abnormal, stream_traceid is %s", + pmeinfo->action, action_str, pmeinfo->stream_traceid); return APP_STATE_FAWPKT | APP_STATE_DROPME; } } //from syn extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, int thread_seq, const void* a_packet){ + void *logger = g_kni_handle->local_logger; - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_PKT], 0, FS_OP_ADD, 1); - int ret, key_size; + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_PKT], 0, FS_OP_ADD, 1); + int ret; struct pme_info *pmeinfo = *(struct pme_info **)pme; //pktinfo struct pkt_info pktinfo; + memset(&pktinfo, 0, sizeof(pktinfo)); //TODO: ipv6 if(stream->addr.addrtype == ADDR_TYPE_IPV6){ FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV6_PKT], 0, FS_OP_ADD, 1); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); return APP_STATE_FAWPKT | APP_STATE_DROPME; } - //a_packet == NULL, continue - if(a_packet == NULL){ + + //a_packet == NULL && not op_state_close, continue + //close: a_packet may be null, if a_packet = null, do not send to tfe + if(a_packet == NULL && stream->pktstate != OP_STATE_CLOSE){ FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NULL_PKT], 0, FS_OP_ADD, 1); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); return APP_STATE_FAWPKT | APP_STATE_GIVEME; } - memset(&pktinfo, 0, sizeof(struct pkt_info)); - pktinfo.iphdr = (struct iphdr*)a_packet; - pktinfo.iphdr_len = pktinfo.iphdr->ihl * 4; - pktinfo.ip_totlen = ntohs(pktinfo.iphdr->tot_len); - pktinfo.tcphdr = (struct tcphdr*)((char*)pktinfo.iphdr + pktinfo.iphdr_len); - pktinfo.tcphdr_len = pktinfo.tcphdr->doff * 4; - pktinfo.data = (char*)pktinfo.tcphdr + pktinfo.tcphdr_len; - pktinfo.data_len = pktinfo.ip_totlen - pktinfo.iphdr_len - pktinfo.tcphdr_len; - - /* for debug - if(stream->pktstate == 2){ - printf("stream->pktstate is %d\n", stream->pktstate); - } - return APP_STATE_FAWPKT | APP_STATE_GIVEME; - */ + if(a_packet != NULL){ + pktinfo.iphdr = (struct iphdr*)a_packet; + pktinfo.iphdr_len = pktinfo.iphdr->ihl * 4; + pktinfo.ip_totlen = ntohs(pktinfo.iphdr->tot_len); + pktinfo.tcphdr = (struct tcphdr*)((char*)pktinfo.iphdr + pktinfo.iphdr_len); + pktinfo.tcphdr_len = pktinfo.tcphdr->doff * 4; + pktinfo.data = (char*)pktinfo.tcphdr + pktinfo.tcphdr_len; + pktinfo.data_len = pktinfo.ip_totlen - pktinfo.iphdr_len - pktinfo.tcphdr_len; + } + switch(stream->pktstate){ case OP_STATE_PENDING: - pmeinfo = pme_info_new(stream, thread_seq); + *pme = pmeinfo = pme_info_new(stream, thread_seq); if(pmeinfo == NULL){ KNI_LOG_ERROR(logger, "Failed at new pmeinfo"); return APP_STATE_FAWPKT | APP_STATE_DROPME; } - *pme = pmeinfo; - key_size = strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id)); - ret = MESA_htable_add(g_kni_handle->traceid2pme_htable, (const unsigned char *)(pmeinfo->stream_trace_id), - key_size, (const void*)pmeinfo); - if(ret < 0){ - KNI_LOG_ERROR(logger, "MESA_htable: failed at MESA_htable_add," - "table is traceid2pme_htable, key is %s", pmeinfo->stream_trace_id); - pme_info_destroy(pmeinfo); - return APP_STATE_FAWPKT | APP_STATE_DROPME; - } - //KNI_LOG_DEBUG(logger, "MESA_htable: succeed at MESA_htable_add, table is traceid2pme_htable, key is %s, key_size is %d", - // pmeinfo->stream_trace_id, key_size); + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_NEW], 0, FS_OP_ADD, 1); ret = pending_opstate(stream, pmeinfo, &pktinfo); if(pmeinfo->error < 0){ goto error_out; @@ -781,40 +814,28 @@ extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, in case OP_STATE_CLOSE: //sapp stream close ret = close_opstate(stream, pmeinfo, &pktinfo, thread_seq); + if(pmeinfo->error < 0){ + goto error_out; + } break; default: ret = APP_STATE_FAWPKT | APP_STATE_GIVEME; - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_UNKNOWN_STATE_EXP], 0, FS_OP_ADD, 1); - KNI_LOG_ERROR(logger, "Unknown stream opstate %d", stream->pktstate); + KNI_LOG_ERROR(logger, "Unknown stream opstate %d, stream traceid is %s", stream->pktstate, pmeinfo->stream_traceid); break; } - //close opstate or data_opstate(tfe_release = 1), need sendlog + //sapp release: bypass or intercept if((ret & APP_STATE_DROPME)){ - //lock - pthread_mutex_lock(&(pmeinfo->lock)); - pmeinfo->sapp_release = 1; - int ret = judge_pme_destroy(pmeinfo); - if(ret < 0){ - pthread_mutex_unlock(&(pmeinfo->lock)); - } + judge_pme_destroy(pmeinfo, CALLER_SAPP); } return ret; -//error: so del htable, pmeinfo destroy. do not sendlog +//error out: no hash, no sendlog, just destroy_pme error_out: - KNI_LOG_ERROR(logger, "stream error = %d, ret is %d", pmeinfo->error, ret); + KNI_LOG_DEBUG(logger, "stream error = %d, ret is %d, stream traceid is %s", pmeinfo->error, ret, pmeinfo->stream_traceid); if(pmeinfo != NULL){ - key_size = strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id)); - int ret1 = MESA_htable_del(g_kni_handle->traceid2pme_htable, (const unsigned char *)pmeinfo->stream_trace_id, key_size, pme_info_destroy); - if(ret1 < 0){ - KNI_LOG_ERROR(logger, "MESA_htable: failed at del, table is %s, key is %s, key_size is %d, ret is %d", - "traceid2pme_htable", pmeinfo->stream_trace_id, key_size, ret1); - } - else{ - KNI_LOG_DEBUG(logger, "MESA_htable: succeed at del, table is %s, key is %s, key_size is %d", - "traceid2pme_htable", pmeinfo->stream_trace_id, key_size); - } + pme_info_destroy(pmeinfo); } return ret; } @@ -882,34 +903,37 @@ void* thread_tfe_data_receiver(void *args){ int thread_seq = 0; while(true){ //receive from tfe - int ret = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buff, nr_burst); - if(ret <= 0){ + int nr_recv = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buff, nr_burst); + if(nr_recv <= 0){ continue; } //tag struct mr_tunnat_ctrlzone mr_ctrlzone; - mr_ctrlzone.action |= TUNNAT_CZ_ACTION_ENCAP_INNER | TUNNAT_CZ_ACTION_ENCAP_OUTER; - for(int i = 0; i < ret; i++){ + mr_ctrlzone.action |= (TUNNAT_CZ_ACTION_ENCAP_INNER | TUNNAT_CZ_ACTION_ENCAP_OUTER); + for(int i = 0; i < nr_recv; i++){ marsio_buff_ctrlzone_set(rx_buff[i], 0, &mr_ctrlzone, sizeof(struct mr_tunnat_ctrlzone)); } //send to vxlan - marsio_send_burst_with_options(marsio_handle->dev_vxlan_sendpath, thread_seq, rx_buff, 1, MARSIO_SEND_OPT_FAST); + marsio_send_burst_with_options(marsio_handle->dev_vxlan_sendpath, thread_seq, rx_buff, nr_recv, MARSIO_SEND_OPT_FAST); } return NULL; } -static int wrapped_kni_cmsg_get(struct pme_info *pmeinfo, struct kni_cmsg *cmsg, uint16_t type, uint16_t value_size_max, void *logger){ +static int wrapped_kni_cmsg_get(struct pme_info *pmeinfo, struct kni_cmsg *cmsg, uint16_t type, + uint16_t value_size_max, void *logger){ uint16_t value_size = 0; unsigned char *value = NULL; int ret = kni_cmsg_get(cmsg, type, &value_size, &value); if(ret < 0){ if(ret == KNI_CMSG_INVALID_TYPE){ - KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type is %d, ret is %d", type, ret); + KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type is %d, ret is %d, stream traceid is %s", + type, ret, pmeinfo->stream_traceid); } return -1; } if(value_size > value_size_max){ - KNI_LOG_ERROR(logger, "kni_cmsg_get: type is %d, size is %d, which should <= %d", type, value_size, value_size_max); + KNI_LOG_ERROR(logger, "kni_cmsg_get: type is %d, size is %d, which should <= %d, stream traceid is %s", + type, value_size, value_size_max, pmeinfo->stream_traceid); return -1; } switch(type) @@ -959,17 +983,8 @@ static long traceid2pme_htable_search_cb(void *data, const uchar *key, uint size wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_CERT_VERIFY, sizeof(pmeinfo->ssl_cert_verify), logger); wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_ERROR, sizeof(pmeinfo->ssl_error), logger); pmeinfo->end_time = time(NULL); - int key_size = strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id)); - int ret = MESA_htable_del(g_kni_handle->traceid2pme_htable, (const unsigned char *)pmeinfo->stream_trace_id, - key_size, NULL); - if(ret < 0){ - KNI_LOG_ERROR(logger, "MESA_htable: failed at del, table is %s, key is %s, key_size is %d, ret is %d", - "traceid2pme_htable", pmeinfo->stream_trace_id, key_size, ret); - } - else{ - KNI_LOG_DEBUG(logger, "MESA_htable: succeed at del, table is %s, key is %s, key_size is %d", - "traceid2pme_htable", pmeinfo->stream_trace_id, key_size); - } + KNI_LOG_INFO(logger, "recv cmsg from tfe, stream traceid is %s", pmeinfo->stream_traceid); + judge_pme_destroy(pmeinfo, CALLER_TFE); } kni_cmsg_destroy(cmsg); return 0; @@ -1037,10 +1052,10 @@ void* thread_tfe_cmsg_receiver(void *args){ KNI_LOG_ERROR(logger, "Failed at deserialize cmsg, ret is %d", ret); continue; } - //get stream_trace_id - unsigned char *stream_trace_id = NULL; + //get stream_traceid + unsigned char *stream_traceid = NULL; uint16_t value_size; - ret = kni_cmsg_get(cmsg, TFE_CMSG_STREAM_TRACE_ID, &value_size, &stream_trace_id); + ret = kni_cmsg_get(cmsg, TFE_CMSG_STREAM_TRACE_ID, &value_size, &stream_traceid); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type is %d, ret is %d", TFE_CMSG_STREAM_TRACE_ID, ret); continue; @@ -1051,7 +1066,7 @@ void* thread_tfe_cmsg_receiver(void *args){ memset((void*)&cb_args, 0, sizeof(cb_args)); cb_args.cmsg = cmsg; cb_args.logger = logger; - MESA_htable_search_cb(g_kni_handle->traceid2pme_htable, (const unsigned char *)stream_trace_id, + MESA_htable_search_cb(g_kni_handle->traceid2pme_htable, (const unsigned char *)stream_traceid, value_size, traceid2pme_htable_search_cb, &cb_args, &cb_ret); } return NULL; @@ -1223,7 +1238,7 @@ static struct kni_field_stat_handle * fs_init(const char *profile){ FS_set_para(handle, MAX_STAT_FIELD_NUM, &value, sizeof(value)); fs_handle = ALLOC(struct kni_field_stat_handle, 1); fs_handle->handle = handle; - fs_handle->fields[KNI_FIELD_TOT_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tot_pkt"); + //fs_handle->fields[KNI_FIELD_TOT_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tot_pkt"); fs_handle->fields[KNI_FIELD_BYP_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_pkt"); fs_handle->fields[KNI_FIELD_INTCP_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_pkt"); fs_handle->fields[KNI_FIELD_IPV6_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ipv6_pkt"); @@ -1231,7 +1246,7 @@ static struct kni_field_stat_handle * fs_init(const char *profile){ fs_handle->fields[KNI_FIELD_NO_SYN_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "no_syn_pkt"); fs_handle->fields[KNI_FIELD_UNKNOWN_STATE_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "unknown_state"); fs_handle->fields[KNI_FIELD_NO_SA_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "no_s/a_pkt"); - fs_handle->fields[KNI_FIELD_TOT_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tot_stm"); + //fs_handle->fields[KNI_FIELD_TOT_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tot_stm"); fs_handle->fields[KNI_FIELD_BYP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_stm"); fs_handle->fields[KNI_FIELD_INTCP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_stm"); fs_handle->fields[KNI_FIELD_SSL_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ssl_stm"); @@ -1239,6 +1254,13 @@ static struct kni_field_stat_handle * fs_init(const char *profile){ fs_handle->fields[KNI_FIELD_SENDLOG_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sendlog_succ"); fs_handle->fields[KNI_FIELD_SENDLOG_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sendlog_fail"); fs_handle->fields[KNI_FIELD_UNKNOWN_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "unknown_stm"); + fs_handle->fields[KNI_FIELD_STM_NO_DATA] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "stm_no_data"); + fs_handle->fields[KNI_FIELD_PME_NEW] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "pme_new"); + fs_handle->fields[KNI_FIELD_PME_FREE] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "pme_free"); + fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_add_succ"); + fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_add_fail"); + fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_del_succ"); + fs_handle->fields[KNI_FIELD_ID2PME_DEL_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_del_fail"); fs_handle->handle = handle; FS_start(handle); return fs_handle; @@ -1257,29 +1279,14 @@ extern "C" void kni_destroy(struct kni_handle *handle){ handle = NULL; } -static void traceid2pme_htable_data_free_cb(void *data){ - void *logger = g_kni_handle->local_logger; - KNI_LOG_DEBUG(logger, "call traceid2pme_htable_data_free_cb"); - struct pme_info *pmeinfo = (struct pme_info*)data; - pthread_mutex_lock(&(pmeinfo->lock)); - pmeinfo->tfe_release = 1; - int ret = judge_pme_destroy(pmeinfo); - if(ret < 0){ - pthread_mutex_unlock(&(pmeinfo->lock)); - } -} - //eliminate_type: 0:FIFO; 1:LRU //ret: 1: the item can be eliminated; 0: the item can't be eliminated static int traceid2pme_htable_expire_notify_cb(void *data, int eliminate_type){ - void *logger = g_kni_handle->local_logger; struct pme_info *pmeinfo = (struct pme_info*)data; - if(pmeinfo->sapp_release == 0){ - KNI_LOG_DEBUG(logger, "Failed at eliminate pmeinfo, sapp_release is %d", pmeinfo->sapp_release); - return 0; + if(pmeinfo->sapp_release == 1){ + judge_pme_destroy(pmeinfo, CALLER_TFE); } - KNI_LOG_DEBUG(logger, "Succeed at eliminate pmeinfo, sapp_release is %d", pmeinfo->sapp_release); - return 1; + return 0; } @@ -1408,7 +1415,7 @@ extern "C" int kni_init(){ g_kni_handle->send_logger = send_logger; //init traceid2pme_htable - traceid2pme_htable = kni_create_htable(profile, "traceid2pme_htable", (void*)traceid2pme_htable_data_free_cb, + traceid2pme_htable = kni_create_htable(profile, "traceid2pme_htable", NULL, (void*)traceid2pme_htable_expire_notify_cb, local_logger); if(traceid2pme_htable == NULL){ KNI_LOG_ERROR(local_logger, "Failed at create traceid2pme_htable");