/*
 * Stream teardown rules:
 *   intercept: destroy_pme + send_log + del traceid2pme + del tuple2stream
 *   bypass:
 *     dropme:
 *       pme_new_fail: destroy_pme
 *       no_tfe:       destroy_pme
 *       stream_error: destroy_pme + send_log
 *     giveme:
 *       policy:       destroy_pme + send_log
 *       dup_traffic:  destroy_pme + send_log
 */
#include "kni_utils.h"
#include "ssl_utils.h"
#include "marsio.h"
#include "kni_maat.h"
#include "MESA/http.h"
#include "MESA/stream_inc/sapp_inject.h"
#include "kni_cmsg.h"
#include "uuid/uuid.h"
#include "cjson/cJSON.h"
#include "kni_send_logger.h"
/* NOTE(review): the header name of this include was missing (bare `#include`).
 * <linux/if_ether.h> is assumed because struct ethhdr / ETH_P_IP / ETH_P_IPV6
 * are used in send_to_tfe() -- confirm against the original file. */
#include <linux/if_ether.h>
#include "tfe_mgr.h"
#include "dablooms.h"

struct kni_handle *g_kni_handle = NULL;
struct kni_field_stat_handle *g_kni_fs_handle = NULL;

#define HTTP_PROJECT_NAME "kni_http_tag"
#define BURST_MAX 1
#define STREAM_TRACEID_LEN 37
#define CALLER_SAPP 0
#define CALLER_TFE 1

enum kni_protocol{
    KNI_PROTOCOL_UNKNOWN = 0,
    KNI_PROTOCOL_SSL,
    KNI_PROTOCOL_HTTP,
};

enum stream_error{
    STREAM_ERROR_PENDING_NO_SYN = -1,
    STREAM_ERROR_SINGLE_DIR = -2,
    STREAM_ERROR_PROTOCOL_UNKNOWN = -3,
    STREAM_ERROR_NO_SYN_ACK = -4,
    STREAM_ERROR_INVALID_ACTION = -5,
    STREAM_ERROR_NO_DATA = -6,
    STREAM_ERROR_IPHDR_PARSE_FAIL = -7,
    STREAM_ERROR_EXCEED_MTU = -8,
    STREAM_ERROR_SENDTO_TFE_FAIL = -9,
    STREAM_ERROR_TUPLE2STM_ADD_FAIL = -10,
};

struct http_project{
    int host_len;
    char host[KNI_DOMAIN_MAX];
};

//must be zeroed (memset 0) before being filled: it is compared/hashed as raw bytes
struct dup_traffic_dabloom_key{
    union{
        struct stream_tuple4_v4 v4;
        struct stream_tuple4_v6 v6;
    }addr;
    uint16_t ipid;
    uint8_t ttl;
    uint32_t seq;
    uint32_t ack_seq;
    uint32_t timestamp;
};

struct pme_info{
    addr_type_t addr_type;
    int protocol;
    int do_log;
    int policy_id;
    int maat_hit;
    enum kni_action action;
    int service;
    struct kni_tcpopt_info *client_tcpopt;
    struct kni_tcpopt_info *server_tcpopt;
    uint16_t client_window;
    uint16_t server_window;
    int tfe_id;
    pthread_mutex_t lock;
    enum stream_error error;
    char stream_traceid[STREAM_TRACEID_LEN];
    //cjson: check protocol to know which member is valid
    union{
        char host[KNI_DOMAIN_MAX]; //http only
        char sni[KNI_DOMAIN_MAX];  //ssl only
    }domain;
    //tfe_release = 1: tfe doesn't need pmeinfo any more
    int tfe_release;
    int sapp_release;
    //kafka log
    struct layer_addr *addr;
    unsigned char dir;
    uint64_t server_bytes;
    uint64_t client_bytes;
    uint64_t server_pkts;
    uint64_t client_pkts;
    struct timespec start_time;
    struct timespec end_time;
    uint64_t con_duration_ms;
    //from tfe, kafka log
    uint64_t intercept_state;
    uint64_t pinningst; //default 0
    uint64_t ssl_server_side_latency;
    uint64_t ssl_client_side_latency;
    char ssl_server_side_version[KNI_SYMBOL_MAX];
    char ssl_client_side_version[KNI_SYMBOL_MAX];
    uint64_t ssl_cert_verify;
    char ssl_error[KNI_STRING_MAX];
    //for dup traffic detect
    int has_dup_traffic;
    int has_dup_syn;
    int has_dup_syn_ack;
    struct dup_traffic_dabloom_key *syn_packet;
    struct dup_traffic_dabloom_key *syn_ack_packet;
};

struct wrapped_packet{
    char data[KNI_MTU];
};

struct tcp_option_restore{
    uint8_t kind;
    uint8_t len;
    uint16_t offset;
};

struct tfe_enabled_node{
    int tfe_id;
    struct mr_vdev *dev_eth_handler;
    struct mr_sendpath *dev_eth_sendpath;
    char mac_addr[6];
};

struct kni_marsio_handle{
    struct mr_instance *instance;
    int tfe_enabled_node_count;
    struct tfe_enabled_node tfe_enabled_nodes[TFE_COUNT_MAX];
    char src_mac_addr[6];
};

struct protocol_identify_result{
    int protocol;
    char domain[KNI_DOMAIN_MAX];
    int domain_len;
};

struct thread_tfe_cmsg_receiver_args{
    void *logger;
    char profile[KNI_SYMBOL_MAX];
};

struct per_thread_handle{
    MESA_htable_handle tuple2stream_htable;
    struct expiry_dablooms_handle *dabloom_handle;
};

struct tuple2stream_htable_value{
    struct streaminfo *stream;
    struct pme_info *pmeinfo;
    int route_dir;
    int reversed;
};

struct kni_handle{
    int http_project_id;
    struct kni_marsio_handle *marsio_handle;
    struct kni_maat_handle *maat_handle;
    struct kni_send_logger *send_logger;
    MESA_htable_handle traceid2pme_htable;
    struct per_thread_handle *threads_handle;
    uint32_t local_ipv4;
    void *local_logger;
    struct tfe_mgr *_tfe_mgr;
    int thread_count;
    int dup_traffic_switch;
    int dup_traffic_action;
};

struct traceid2pme_search_cb_args{
    struct kni_cmsg *cmsg;
    void *logger;
};

/* Map a stream_error code to the text written into the "stream_error" log field.
 * Unrecognised codes yield "unknown error". */
static char* stream_errmsg_get(enum stream_error _errno){
    switch(_errno){
        case STREAM_ERROR_PENDING_NO_SYN:
            return (char*)"penging not syn";
        case STREAM_ERROR_SINGLE_DIR:
            return (char*)"single dir";
        case STREAM_ERROR_PROTOCOL_UNKNOWN:
            return (char*)"protocol unknown";
        case STREAM_ERROR_NO_SYN_ACK:
            return (char*)"no syn/ack";
        case STREAM_ERROR_INVALID_ACTION:
            return (char*)"invalid aciton";
        case STREAM_ERROR_NO_DATA:
            return (char*)"no data";
        case STREAM_ERROR_IPHDR_PARSE_FAIL:
            return (char*)"ip header parse fail";
        case STREAM_ERROR_EXCEED_MTU:
            return (char*)"exceed mtu(1500)";
        case STREAM_ERROR_SENDTO_TFE_FAIL:
            return (char*)"sendto_tfe_fail";
        case STREAM_ERROR_TUPLE2STM_ADD_FAIL:
            return (char*)"tuple2stm_add_fail";
        default:
            return (char*)"unknown error";
    }
}

/* Fill `key` from the packet: tcp seq/ack, tcp timestamp option value and the
 * address 4-tuple; for IPv4 also ttl and ip id. Fields that are not written
 * (ttl/ipid on IPv6, padding) keep whatever the caller put there, so the caller
 * must hand in a zeroed key. Always returns 0. */
static int dup_traffic_dabloom_key_get(struct pkt_info *pktinfo, struct dup_traffic_dabloom_key *key){
    struct tcphdr *tcphdr = pktinfo->tcphdr;
    key->seq = tcphdr->seq;
    key->ack_seq = tcphdr->ack_seq;
    struct kni_tcpopt_info* tcpopt = kni_get_tcpopt(tcphdr, pktinfo->tcphdr_len);
    //guard: do not dereference a failed option parse; timestamp then stays as provided by the caller
    if(tcpopt != NULL){
        key->timestamp = tcpopt->ts_value;
        FREE(&tcpopt);
    }
    //ipv6
    if(pktinfo->addr_type == ADDR_TYPE_IPV6){
        struct ip6_hdr *iphdr = pktinfo->iphdr.v6;
        memcpy(key->addr.v6.saddr, &(iphdr->ip6_src), sizeof(key->addr.v6.saddr));
        memcpy(key->addr.v6.daddr, &(iphdr->ip6_dst), sizeof(key->addr.v6.daddr));
        key->addr.v6.source = tcphdr->source;
        key->addr.v6.dest = tcphdr->dest;
    }
    //ipv4
    else{
        struct iphdr *iphdr = pktinfo->iphdr.v4;
        key->addr.v4.saddr = iphdr->saddr;
        key->addr.v4.daddr = iphdr->daddr;
        key->addr.v4.source = tcphdr->source;
        key->addr.v4.dest = tcphdr->dest;
        key->ttl = iphdr->ttl;
        key->ipid = iphdr->id;
    }
    return 0;
}

/* Release a pme_info and everything it owns (tcp options, duplicated address,
 * mutex, saved syn / syn-ack keys) and bump the PME_FREE counter.
 * Takes void* so it matches a generic "free value" callback shape. */
static void pme_info_destroy(void *data){
    struct pme_info *pmeinfo = (struct pme_info *)data;
    void *logger = g_kni_handle->local_logger;
    if(pmeinfo != NULL){
        //free client_tcpopt
        if(pmeinfo->client_tcpopt != NULL){
            FREE(&(pmeinfo->client_tcpopt));
        }
        //free server tcpopt
        if(pmeinfo->server_tcpopt != NULL){
            FREE(&(pmeinfo->server_tcpopt));
        }
        //free layer_addr
        layer_addr_free(pmeinfo->addr);
        pmeinfo->addr=NULL;
        //free lock
        pthread_mutex_destroy(&(pmeinfo->lock));
        //free syn/syn_ack
        FREE(&(pmeinfo->syn_packet));
        FREE(&(pmeinfo->syn_ack_packet));
        FREE(&pmeinfo);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_FREE], 0, FS_OP_ADD, 1);
    }
    else{
        KNI_LOG_ERROR(logger, "Failed at pme_info_destroy, pmeinfo = null");
    }
}

/* Allocate the per-stream context: records the address type, generates a random
 * UUID as trace id, duplicates the stream address, stamps start_time and
 * initialises the lock. Returns NULL (after destroying the partial object) when
 * the mutex cannot be initialised. `thread_seq` is currently unused. */
static struct pme_info* pme_info_new(const struct streaminfo *stream, int thread_seq){
    void *logger = g_kni_handle->local_logger;
    struct pme_info* pmeinfo = ALLOC(struct pme_info, 1);
    pmeinfo->addr_type = (enum addr_type_t)stream->addr.addrtype;
    uuid_t uu;
    uuid_generate_random(uu);
    uuid_unparse(uu, pmeinfo->stream_traceid);
    pmeinfo->addr = layer_addr_dup(&(stream->addr));
    clock_gettime(CLOCK_REALTIME, &(pmeinfo->start_time));
    char stream_addr[KNI_ADDR_MAX] = "";
    //init pme_lock; pthread_mutex_init reports failure with a non-zero (positive) error number
    int ret = pthread_mutex_init(&(pmeinfo->lock), NULL);
    if(ret != 0){
        KNI_LOG_ERROR(logger, "Failed at init pthread mutex, stream_traceid = %s", pmeinfo->stream_traceid);
        goto error_out;
    }
    if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
        kni_addr_trans_v6(stream->addr.tuple4_v6, stream_addr, sizeof(stream_addr));
    }
    else{
        kni_addr_trans_v4(stream->addr.tuple4_v4, stream_addr, sizeof(stream_addr));
    }
    KNI_LOG_INFO(logger, "stream addr = %s, stream traceid = %s", stream_addr, pmeinfo->stream_traceid);
    return pmeinfo;

error_out:
    pme_info_destroy(pmeinfo);
    return NULL;
}

/* Build the JSON session log for one stream and hand it to the kafka send logger.
 * Returns 0 when the log was sent, -1 when printing the JSON or sending failed;
 * the SENDLOG_SUCC / SENDLOG_FAIL counters are updated accordingly. */
static int log_generate(struct pme_info *pmeinfo, void *local_logger){
    //create cjson
    cJSON *log_obj = cJSON_CreateObject();
    //stream_traceid
    cJSON_AddStringToObject(log_obj, "stream_traceid", pmeinfo->stream_traceid);
    //policy_id
    cJSON_AddNumberToObject(log_obj, "policy_id", pmeinfo->policy_id);
    //action
    cJSON_AddNumberToObject(log_obj, "action", pmeinfo->action);
    //service
    cJSON_AddNumberToObject(log_obj, "service", pmeinfo->service);
    //start_time
    cJSON_AddNumberToObject(log_obj, "start_time", pmeinfo->start_time.tv_sec);
    //end_time
    cJSON_AddNumberToObject(log_obj, "end_time", pmeinfo->end_time.tv_sec);
    //con_duration_ms
    cJSON_AddNumberToObject(log_obj, "con_duration_ms",
            (pmeinfo->end_time.tv_sec - pmeinfo->start_time.tv_sec) * 1000
            + (pmeinfo->end_time.tv_nsec - pmeinfo->start_time.tv_nsec) / 1000000);
    //stream_info: addr_type, trans_proto, client_ip, client_port, server_ip, server_port
    const struct layer_addr *addr = pmeinfo->addr;
    char client_ip_str[INET6_ADDRSTRLEN] = "";
    char server_ip_str[INET6_ADDRSTRLEN] = "";
    switch(addr->addrtype){
        case ADDR_TYPE_IPV4:
            cJSON_AddNumberToObject(log_obj, "addr_type", 4);
            inet_ntop(AF_INET, &addr->tuple4_v4->saddr, client_ip_str, sizeof(client_ip_str));
            inet_ntop(AF_INET, &addr->tuple4_v4->daddr, server_ip_str, sizeof(server_ip_str));
            cJSON_AddStringToObject(log_obj, "client_ip", client_ip_str);
            cJSON_AddStringToObject(log_obj, "server_ip", server_ip_str);
            cJSON_AddNumberToObject(log_obj, "client_port", ntohs(addr->tuple4_v4->source));
            cJSON_AddNumberToObject(log_obj, "server_port", ntohs(addr->tuple4_v4->dest));
            cJSON_AddStringToObject(log_obj, "trans_proto", "IPv4_TCP");
            break;
        case ADDR_TYPE_IPV6:
            cJSON_AddNumberToObject(log_obj, "addr_type", 6);
            inet_ntop(AF_INET6, &addr->tuple4_v6->saddr, client_ip_str, sizeof(client_ip_str));
            inet_ntop(AF_INET6, &addr->tuple4_v6->daddr, server_ip_str, sizeof(server_ip_str));
            cJSON_AddStringToObject(log_obj, "client_ip", client_ip_str);
            cJSON_AddStringToObject(log_obj, "server_ip", server_ip_str);
            cJSON_AddNumberToObject(log_obj, "client_port", ntohs(addr->tuple4_v6->source));
            cJSON_AddNumberToObject(log_obj, "server_port", ntohs(addr->tuple4_v6->dest));
            cJSON_AddStringToObject(log_obj, "trans_proto", "IPv6_TCP");
            break;
        default:
            break;
    }
    //entrance_id: 0
    cJSON_AddNumberToObject(log_obj, "entrance_id", 0);
    //device_id: 0
    cJSON_AddNumberToObject(log_obj, "device_id", 0);
    //link_id: 0
    cJSON_AddNumberToObject(log_obj, "link_id", 0);
    //isp: null
    cJSON_AddStringToObject(log_obj, "isp", "");
    //encap_type: from sapp, filled with 0 for now
    cJSON_AddNumberToObject(log_obj, "encap_type", 0);
    //pinning state: from tfe
    cJSON_AddNumberToObject(log_obj, "pinningst", pmeinfo->pinningst);
    //intercept state: from tfe
    cJSON_AddNumberToObject(log_obj, "intercept_state", pmeinfo->intercept_state);
    //ssl upstream latency: from tfe
    cJSON_AddNumberToObject(log_obj, "ssl_server_side_latency", pmeinfo->ssl_server_side_latency);
    //ssl downstream latency: from tfe
    cJSON_AddNumberToObject(log_obj, "ssl_client_side_latency", pmeinfo->ssl_client_side_latency);
    //ssl upstream version: from tfe
    cJSON_AddStringToObject(log_obj, "ssl_server_side_version", pmeinfo->ssl_server_side_version);
    //ssl downstream version: from tfe
    cJSON_AddStringToObject(log_obj, "ssl_client_side_version", pmeinfo->ssl_client_side_version);
    //ssl cert verify
    cJSON_AddNumberToObject(log_obj, "ssl_cert_verify", pmeinfo->ssl_cert_verify);
    //direction: 0
    cJSON_AddNumberToObject(log_obj, "direction", 0);
    //stream_dir: from sapp
    cJSON_AddNumberToObject(log_obj, "stream_dir", pmeinfo->dir);
    //cap_ip: kni ip (initialised so a failed inet_ntop logs an empty string, not stack garbage)
    char local_ipv4_str[INET6_ADDRSTRLEN] = "";
    inet_ntop(AF_INET, &(g_kni_handle->local_ipv4), local_ipv4_str, sizeof(local_ipv4_str));
    cJSON_AddStringToObject(log_obj, "cap_ip", local_ipv4_str);
    //addr_list
    cJSON_AddStringToObject(log_obj, "addr_list", "");
    //host: http_only
    if(pmeinfo->protocol == KNI_PROTOCOL_HTTP){
        cJSON_AddStringToObject(log_obj, "host", pmeinfo->domain.host);
    }
    //sni: ssl only
    if(pmeinfo->protocol == KNI_PROTOCOL_SSL){
        cJSON_AddStringToObject(log_obj, "sni", pmeinfo->domain.sni);
    }
    //c2s_pkt_num
    cJSON_AddNumberToObject(log_obj, "c2s_pkt_num", pmeinfo->server_pkts);
    //s2c_pkt_num
    cJSON_AddNumberToObject(log_obj, "s2c_pkt_num", pmeinfo->client_pkts);
    //c2s_byte_num
    cJSON_AddNumberToObject(log_obj, "c2s_byte_num", pmeinfo->server_bytes);
    //s2c_byte_num
    cJSON_AddNumberToObject(log_obj, "s2c_byte_num", pmeinfo->client_bytes);
    //dup_traffic
    cJSON_AddNumberToObject(log_obj, "has_dup_traffic", pmeinfo->has_dup_traffic);
    //stream_error
    if(pmeinfo->error < 0){
        char *stream_errmsg = stream_errmsg_get(pmeinfo->error);
        cJSON_AddStringToObject(log_obj, "stream_error", stream_errmsg);
    }
    int ret = -1;
    char *log_msg = cJSON_PrintUnformatted(log_obj);
    cJSON_Delete(log_obj);
    if(log_msg == NULL){
        KNI_LOG_ERROR(local_logger, "Failed at cJSON_Print, stream_treaceid = %s", pmeinfo->stream_traceid);
        goto error_out;
    }
    //local log
    KNI_LOG_DEBUG(local_logger, "log_msg = %s\n", log_msg);
    //sendto kafka
    ret = kni_send_logger_sendlog(g_kni_handle->send_logger, log_msg, strlen(log_msg));
    if(ret < 0){
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_FAIL], 0, FS_OP_ADD, 1);
        KNI_LOG_ERROR(local_logger, "Failed at sendlog_to_kafka, ret = %d, strem_traceid = %s", ret, pmeinfo->stream_traceid);
        goto error_out;
    }
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_SUCC], 0, FS_OP_ADD, 1);
    cJSON_free(log_msg);
    return 0;

error_out:
    if(log_msg != NULL){
        cJSON_free(log_msg);
    }
    return -1;
}

/* Final teardown of a stream: optionally emit the session log (do_log == 1),
 * update the intercept byte/stream counters, then free the pme_info. */
static void stream_destroy(struct pme_info *pmeinfo, int do_log){
    //sendlog
    void *logger = g_kni_handle->local_logger;
    int ret;
    if(do_log == 1){
        ret = log_generate(pmeinfo, logger);
        if(ret < 0){
            KNI_LOG_ERROR(logger, "Failed at log_generate, stream traceid = %s", pmeinfo->stream_traceid);
        }
        else{
            KNI_LOG_DEBUG(logger, "Succeed at log_generate, stream traceid = %s", pmeinfo->stream_traceid);
        }
    }
    //intercept traffic stat
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_KNI_INTCP_BYTES], 0, FS_OP_ADD,
            pmeinfo->server_bytes + pmeinfo->client_bytes);
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_KNI_INTCP_STM], 0, FS_OP_ADD, 1);
    if(pmeinfo->intercept_state == 1){
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TFE_INTCP_BYTES], 0, FS_OP_ADD,
                pmeinfo->server_bytes + pmeinfo->client_bytes);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TFE_INTCP_STM], 0, FS_OP_ADD, 1);
    }
    //free pme
    pme_info_destroy(pmeinfo);
}

/* Record, under the pme lock, that `caller` (CALLER_SAPP or CALLER_TFE) has
 * released the stream. Returns 1 once both sides have released it, else 0
 * (also 0, with an error log, when pmeinfo is NULL). */
static int judge_stream_can_destroy(struct pme_info *pmeinfo, int caller){
    void *logger = g_kni_handle->local_logger;
    int can_destroy = 0;
    if(pmeinfo != NULL){
        pthread_mutex_lock(&(pmeinfo->lock));
        if(caller == CALLER_SAPP){
            pmeinfo->sapp_release = 1;
        }
        if(caller == CALLER_TFE){
            pmeinfo->tfe_release = 1;
        }
        if(pmeinfo->sapp_release == 1 && pmeinfo->tfe_release == 1){
            can_destroy = 1;
        }
        pthread_mutex_unlock(&(pmeinfo->lock));
    }
    else{
        KNI_LOG_ERROR(logger, "Failed at judge_stream_can_destroy, pmeinfo = null");
    }
    return can_destroy;
}

/* Classify the stream as HTTP (a project struct with the host is attached to the
 * stream), SSL (buf parses as a client hello) or unknown, and copy the
 * host / SNI into result->domain. The copied domain is always NUL-terminated
 * and truncated to fit result->domain. Always returns 0. */
static int protocol_identify(const struct streaminfo* stream, char *buf, int len, struct protocol_identify_result *result){
    //http
    struct http_project* project = (struct http_project*)project_req_get_struct(stream, g_kni_handle->http_project_id);
    if(project != NULL){
        size_t copy_len = strnlen(project->host, sizeof(result->domain) - 1);
        result->protocol = KNI_PROTOCOL_HTTP;
        result->domain_len = project->host_len;
        memcpy(result->domain, project->host, copy_len);
        //strncpy with n == strlen never wrote the terminator; do it explicitly
        result->domain[copy_len] = '\0';
        return 0;
    }
    //ssl
    enum chello_parse_result chello_status = CHELLO_PARSE_INVALID_FORMAT;
    struct ssl_chello *chello = NULL;
    chello = ssl_chello_parse((const unsigned char*)buf, len, &chello_status);
    if(chello_status == CHELLO_PARSE_SUCCESS && chello != NULL){
        result->protocol = KNI_PROTOCOL_SSL;
        if(chello->sni == NULL){
            result->domain_len = 0;
        }
        else{
            size_t copy_len = strnlen(chello->sni, sizeof(result->domain) - 1);
            memcpy(result->domain, chello->sni, copy_len);
            result->domain[copy_len] = '\0';
            //report the length actually stored, not the untruncated one
            result->domain_len = (int)copy_len;
        }
        ssl_chello_free(chello);
        return 0;
    }
    if(chello != NULL){
        ssl_chello_free(chello);
    }
    result->protocol = KNI_PROTOCOL_UNKNOWN;
    return 0;
}

/* kni_cmsg_set() plus an error log carrying the stream trace id. Returns kni_cmsg_set()'s result. */
static int wrapped_kni_cmsg_set(struct kni_cmsg *cmsg, uint16_t type, const unsigned char *value, uint16_t size, char *stream_traceid){
    void *logger = g_kni_handle->local_logger;
    int ret = kni_cmsg_set(cmsg, type, value, size);
    if(ret < 0){
        KNI_LOG_ERROR(logger, "Failed set cmsg, type = %d, stream traceid = %s", type, stream_traceid);
    }
    return ret;
}

/* Serialize the control message sent to TFE with the first data packet:
 * seq/ack of that packet, both sides' mss / wscale / sack / timestamp flags,
 * windows, protocol, policy id and trace id.
 * Returns a buffer the caller must release with FREE() and stores its size in
 * *len; returns NULL on any failure (nothing is leaked in that case). */
static unsigned char* kni_cmsg_serialize_header_new(struct pme_info *pmeinfo, struct pkt_info *pktinfo, uint16_t *len){
    void *logger = g_kni_handle->local_logger;
    //both option sets are dereferenced below; refuse to build the header without them
    if(pmeinfo->client_tcpopt == NULL || pmeinfo->server_tcpopt == NULL){
        KNI_LOG_ERROR(logger, "Failed at serialize cmsg, tcpopt = null, stream traceid = %s", pmeinfo->stream_traceid);
        return NULL;
    }
    uint16_t bufflen = 0, serialize_len = 0;
    unsigned char *buff = NULL;
    uint8_t protocol_type = pmeinfo->protocol == KNI_PROTOCOL_SSL ? 0x1 : 0x0;
    struct kni_cmsg *cmsg = kni_cmsg_init();
    int policy_id = -1;
    char *trace_id = NULL;
    uint32_t seq = pktinfo->tcphdr->seq;
    uint32_t ack = pktinfo->tcphdr->ack_seq;
    uint16_t client_mss = htons(pmeinfo->client_tcpopt->mss);
    uint16_t server_mss = htons(pmeinfo->server_tcpopt->mss);
    uint16_t client_window = htons(pmeinfo->client_window);
    uint16_t server_window = htons(pmeinfo->server_window);
    if(cmsg == NULL){
        KNI_LOG_ERROR(logger, "Failed at kni_cmsg_init, stream traceid = %s", pmeinfo->stream_traceid);
        return NULL;
    }
    //seq
    int ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SEQ, (const unsigned char*)&seq, 4, pmeinfo->stream_traceid);
    if(ret < 0) goto error_out;
    //ack
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_ACK, (const unsigned char*)&ack, 4, pmeinfo->stream_traceid);
    if(ret < 0) goto error_out;
    //client mss
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_CLIENT, (const unsigned char*)&client_mss, 2, pmeinfo->stream_traceid);
    if(ret < 0) goto error_out;
    //server mss
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_SERVER, (const unsigned char*)&server_mss, 2, pmeinfo->stream_traceid);
    if(ret < 0) goto error_out;
    //wscale is only sent when both sides set it
    if(pmeinfo->client_tcpopt->wscale_set && pmeinfo->server_tcpopt->wscale_set){
        //client wscale
        ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->wscale), 1, pmeinfo->stream_traceid);
        if(ret < 0) goto error_out;
        //server wscale
        ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->wscale), 1, pmeinfo->stream_traceid);
        if(ret < 0) goto error_out;
    }
    //client sack
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->sack), 1, pmeinfo->stream_traceid);
    if(ret < 0) goto error_out;
    //server sack
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->sack), 1, pmeinfo->stream_traceid);
    if(ret < 0) goto error_out;
    //client timestamp
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->ts_set), 1, pmeinfo->stream_traceid);
    if(ret < 0) goto error_out;
    //server timestamp
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->ts_set), 1, pmeinfo->stream_traceid);
    if(ret < 0) goto error_out;
    //protocol
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (const unsigned char*)&protocol_type, 1, pmeinfo->stream_traceid);
    if(ret < 0) goto error_out;
    //client window
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT, (const unsigned char*)&client_window, 2, pmeinfo->stream_traceid);
    if(ret < 0) goto error_out;
    //server window
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_SERVER, (const unsigned char*)&server_window, 2, pmeinfo->stream_traceid);
    if(ret < 0) goto error_out;
    //maat policy id
    policy_id = pmeinfo->policy_id;
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_POLICY_ID, (const unsigned char*)&policy_id, sizeof(policy_id), pmeinfo->stream_traceid);
    if(ret < 0) goto error_out;
    //stream trace id
    trace_id = pmeinfo->stream_traceid;
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_STREAM_TRACE_ID, (const unsigned char*)trace_id,
            strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid)), pmeinfo->stream_traceid);
    if(ret < 0) goto error_out;
    bufflen = kni_cmsg_serialize_size_get(cmsg);
    buff = (unsigned char*)ALLOC(char, bufflen);
    serialize_len = 0;
    ret = kni_cmsg_serialize(cmsg, buff, bufflen, &serialize_len);
    if(ret < 0){
        KNI_LOG_ERROR(logger, "Failed at serialize cmsg, ret = %d, stream traceid = %s", ret, pmeinfo->stream_traceid);
        goto error_out;
    }
    *len = serialize_len;
    kni_cmsg_destroy(cmsg);
    return buff;

error_out:
    //buff is only non-NULL when serialization itself failed; release it so it is not leaked
    if(buff != NULL){
        FREE(&buff);
    }
    kni_cmsg_destroy(cmsg);
    return NULL;
}

/* Build the packet sent to TFE for the first data packet:
 *   ip header | tcp fixed header (doff + 1) | restore option (kind 88, len 4,
 *   offset = payload length) | original tcp options | payload | serialized cmsg
 * then fix up the ip length and the ip/tcp checksums.
 * Returns a wrapped_packet-sized buffer the caller must FREE() and stores the
 * packet length in *len. Returns NULL when the control header cannot be built,
 * the result would not fit the buffer / the tcp header, or (ipv6) the rebuilt
 * packet cannot be parsed. The caller's pktinfo is not modified. */
static char* add_cmsg_to_packet(struct pme_info *pmeinfo, struct pkt_info *pktinfo, int *len){
    void *logger = g_kni_handle->local_logger;
    const int tcp_fixed_hdr_len = 20;
    const int restore_opt_len = 4;
    const int tcp_hdr_len_max = 60;
    //serialize first so the total size is known before anything is copied
    uint16_t header_len = 0;
    unsigned char* header = kni_cmsg_serialize_header_new(pmeinfo, pktinfo, &header_len);
    if(header == NULL){
        KNI_LOG_ERROR(logger, "Failed at add_cmsg_to_packet, cmsg header = null, stream traceid = %s", pmeinfo->stream_traceid);
        return NULL;
    }
    int total_len = (int)pktinfo->iphdr_len + (int)pktinfo->tcphdr_len + restore_opt_len
                    + (int)pktinfo->data_len + (int)header_len;
    if((int)pktinfo->tcphdr_len < tcp_fixed_hdr_len
            || (int)pktinfo->tcphdr_len + restore_opt_len > tcp_hdr_len_max
            || total_len > (int)sizeof(struct wrapped_packet)){
        KNI_LOG_ERROR(logger, "Failed at add_cmsg_to_packet, tcphdr_len = %d, total_len = %d, stream traceid = %s",
                (int)pktinfo->tcphdr_len, total_len, pmeinfo->stream_traceid);
        FREE(&header);
        return NULL;
    }
    char *new_pkt = (char*)ALLOC(struct wrapped_packet, 1);
    int offset = 0;
    //iphdr
    if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
        memcpy(new_pkt, (void*)pktinfo->iphdr.v6, pktinfo->iphdr_len);
    }
    else{
        memcpy(new_pkt, (void*)pktinfo->iphdr.v4, pktinfo->iphdr_len);
    }
    offset += pktinfo->iphdr_len;
    //tcphdr: fixed part, with the data offset grown by one 32-bit word for the new option
    struct tcphdr *tcphdr = (struct tcphdr*)(new_pkt + offset);
    memcpy(new_pkt + offset, (void*)pktinfo->tcphdr, tcp_fixed_hdr_len);
    offset += tcp_fixed_hdr_len;
    tcphdr->doff = pktinfo->tcphdr->doff + 1;
    //tcp option: kind 88, len 4, control_info offset (on the stack: nothing to free)
    struct tcp_option_restore opt;
    opt.kind = 88;
    opt.len = 4;
    opt.offset = htons(pktinfo->data_len);
    memcpy(new_pkt + offset, (void*)&opt, restore_opt_len);
    offset += restore_opt_len;
    //original tcp options
    memcpy(new_pkt + offset, (void*)((char*)pktinfo->tcphdr + tcp_fixed_hdr_len), pktinfo->tcphdr_len - tcp_fixed_hdr_len);
    offset += pktinfo->tcphdr_len - tcp_fixed_hdr_len;
    //data
    memcpy(new_pkt + offset, (void*)pktinfo->data, pktinfo->data_len);
    offset += pktinfo->data_len;
    //kni_cmsg_serialize_header
    memcpy(new_pkt + offset, (void*)header, header_len);
    offset += header_len;
    FREE(&header);
    //ipv6
    if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
        //parse into a local pkt_info so the caller's pktinfo never points into new_pkt
        struct pkt_info new_pktinfo;
        if(kni_ipv6_header_parse((void*)new_pkt, &new_pktinfo) < 0){
            KNI_LOG_ERROR(logger, "Failed at add_cmsg_to_packet, parse new ipv6 packet fail, stream traceid = %s", pmeinfo->stream_traceid);
            FREE(&new_pkt);
            return NULL;
        }
        new_pktinfo.iphdr.v6->ip6_ctlun.ip6_un1.ip6_un1_plen = htons(offset - sizeof(struct ip6_hdr));
        new_pktinfo.tcphdr->check = 0;
        new_pktinfo.tcphdr->check = kni_tcp_checksum_v6((void*)new_pktinfo.tcphdr, offset - new_pktinfo.iphdr_len,
                new_pktinfo.iphdr.v6->ip6_src, new_pktinfo.iphdr.v6->ip6_dst);
    }
    else{
        struct iphdr *iphdr = (struct iphdr*)new_pkt;
        iphdr->tot_len = htons(offset);
        //must set check = 0 before computing the checksum
        iphdr->check = 0;
        iphdr->check = kni_ip_checksum((void*)iphdr, pktinfo->iphdr_len);
        //tcphdr: checksum
        tcphdr->check = 0;
        tcphdr->check = kni_tcp_checksum((void*)tcphdr, offset - pktinfo->iphdr_len, iphdr->saddr, iphdr->daddr);
    }
    *len = offset;
    return new_pkt;
}

/* Prepend an ethernet header (src = local mac, dst = mac of the TFE node with
 * id `tfe_id`) to raw_data and send it through that node's marsio send path.
 * Returns 0 on success, -1 when the TFE node is not enabled or no marsio
 * buffer could be allocated. */
static int send_to_tfe(struct kni_marsio_handle *handle, char *raw_data, uint16_t raw_len, int thread_seq, int tfe_id, addr_type_t addr_type){
    void *logger = g_kni_handle->local_logger;
    marsio_buff_t *tx_buffs[BURST_MAX];
    int index = -1;
    for(int i = 0; i < handle->tfe_enabled_node_count; i++){
        if(handle->tfe_enabled_nodes[i].tfe_id == tfe_id){
            index = i;
            break;
        }
    }
    if(index == -1){
        //the format has a %d: it needs its argument
        KNI_LOG_ERROR(logger, "tfd %d = disabled", tfe_id);
        return -1;
    }
    struct mr_vdev *dev_eth_handler = handle->tfe_enabled_nodes[index].dev_eth_handler;
    struct mr_sendpath *dev_eth_sendpath = handle->tfe_enabled_nodes[index].dev_eth_sendpath;
    char *src_mac = handle->src_mac_addr;
    char *dst_mac = handle->tfe_enabled_nodes[index].mac_addr;
    //only send one packet, alloc_ret <= nr_send <= BURST_MAX
    int nr_send = 1;
    int alloc_ret = marsio_buff_malloc_device(dev_eth_handler, tx_buffs, nr_send, 0, thread_seq);
    if (alloc_ret < 0){
        KNI_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret = %d, thread_seq = %d", alloc_ret, thread_seq);
        return -1;
    }
    for(int i = 0; i < nr_send; i++){
        //NOTE(review): the result of marsio_buff_append is not checked; confirm it cannot return NULL here
        char* dst_data = marsio_buff_append(tx_buffs[i], raw_len + 14);
        //ethernet_header[14]
        struct ethhdr *ether_hdr = (struct ethhdr*)dst_data;
        memcpy(ether_hdr->h_dest, dst_mac, sizeof(ether_hdr->h_dest));
        memcpy(ether_hdr->h_source, src_mac, sizeof(ether_hdr->h_source));
        if(addr_type == ADDR_TYPE_IPV6){
            ether_hdr->h_proto = htons(ETH_P_IPV6);
        }
        else{
            ether_hdr->h_proto = htons(ETH_P_IP);
        }
        memcpy((char*)dst_data + sizeof(*ether_hdr), raw_data, raw_len);
    }
    marsio_send_burst(dev_eth_sendpath, thread_seq, tx_buffs, nr_send);
    return 0;
}

/* Parse a_packet as ipv4 or ipv6 (chosen by pmeinfo->addr_type) into pktinfo.
 * On failure logs the parser's error text, bumps the matching
 * *HDR_PARSE_FAIL counter and returns -1; returns 0 on success. */
static int wrapped_kni_header_parse(const void *a_packet, struct pme_info *pmeinfo, struct pkt_info *pktinfo){
    void *logger = g_kni_handle->local_logger;
    if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
        int ret = kni_ipv6_header_parse(a_packet, pktinfo);
        if(ret < 0){
            char *errmsg = kni_ipv6_errmsg_get((enum kni_ipv6hdr_parse_error)ret);
            KNI_LOG_DEBUG(logger, "Stream error: failed at parse ipv6 header, errmsg = %s, stream treaceid = %s", errmsg, pmeinfo->stream_traceid);
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV6HDR_PARSE_FAIL], 0, FS_OP_ADD, 1);
            return -1;
        }
    }
    else{
        int ret = kni_ipv4_header_parse(a_packet, pktinfo);
        if(ret < 0){
            char *errmsg = kni_ipv4_errmsg_get((enum kni_ipv4hdr_parse_error)ret);
            KNI_LOG_ERROR(logger, "Stream error: failed at parse ipv4 header, errmsg = %s, stream treaceid = %s", errmsg, pmeinfo->stream_traceid);
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV4HDR_PARSE_FAIL], 0, FS_OP_ADD, 1);
            return -1;
        }
    }
    return 0;
}

/* Direction-independent ipv4 table key from a packet: the numerically smaller
 * address (as stored in the header) goes to saddr. *reversed is 1 when the
 * packet's source/destination were swapped to build the key. Always returns 0. */
static int tuple2stream_htable_key_get_v4_by_packet(struct pkt_info *pktinfo, struct stream_tuple4_v4 *key, int *reversed){
    if(pktinfo->iphdr.v4->saddr < pktinfo->iphdr.v4->daddr){
        key->saddr = pktinfo->iphdr.v4->saddr;
        key->daddr = pktinfo->iphdr.v4->daddr;
        key->source = pktinfo->tcphdr->source;
        key->dest = pktinfo->tcphdr->dest;
        *reversed = 0;
    }
    else{
        key->saddr = pktinfo->iphdr.v4->daddr;
        key->daddr = pktinfo->iphdr.v4->saddr;
        key->source = pktinfo->tcphdr->dest;
        key->dest = pktinfo->tcphdr->source;
        *reversed = 1;
    }
    return 0;
}

/* ipv6 counterpart of tuple2stream_htable_key_get_v4_by_packet: addresses are
 * ordered with memcmp. Always returns 0. */
static int tuple2stream_htable_key_get_v6_by_packet(struct pkt_info *pktinfo, struct stream_tuple4_v6 *key, int *reversed){
    if(memcmp((void*)&(pktinfo->iphdr.v6->ip6_src), (void*)&(pktinfo->iphdr.v6->ip6_dst), sizeof(key->saddr)) < 0){
        memcpy(key->saddr, &(pktinfo->iphdr.v6->ip6_src), sizeof(key->saddr));
        memcpy(key->daddr, &(pktinfo->iphdr.v6->ip6_dst), sizeof(key->daddr));
        key->source = pktinfo->tcphdr->source;
        key->dest = pktinfo->tcphdr->dest;
        *reversed = 0;
    }
    else{
        memcpy(key->saddr, &(pktinfo->iphdr.v6->ip6_dst), sizeof(key->saddr));
        memcpy(key->daddr, &(pktinfo->iphdr.v6->ip6_src), sizeof(key->daddr));
        key->source = pktinfo->tcphdr->dest;
        key->dest = pktinfo->tcphdr->source;
        *reversed = 1;
    }
    return 0;
}

/* Same key as tuple2stream_htable_key_get_v4_by_packet, built from the stream's
 * own ipv4 4-tuple. Always returns 0. */
static int tuple2stream_htable_key_get_v4_by_stream(const struct streaminfo *stream, struct stream_tuple4_v4 *key, int *reversed){
    if(stream->addr.tuple4_v4->saddr < stream->addr.tuple4_v4->daddr){
        key->saddr = stream->addr.tuple4_v4->saddr;
        key->daddr = stream->addr.tuple4_v4->daddr;
        key->source = stream->addr.tuple4_v4->source;
        key->dest = stream->addr.tuple4_v4->dest;
        *reversed = 0;
    }
    else{
        key->saddr = stream->addr.tuple4_v4->daddr;
        key->daddr = stream->addr.tuple4_v4->saddr;
        key->source = stream->addr.tuple4_v4->dest;
        key->dest = stream->addr.tuple4_v4->source;
        *reversed = 1;
    }
    return 0;
}

/* Same key as tuple2stream_htable_key_get_v6_by_packet, built from the stream's
 * own ipv6 4-tuple. Always returns 0. */
static int tuple2stream_htable_key_get_v6_by_stream(const struct streaminfo *stream, struct stream_tuple4_v6 *key, int *reversed){
    if(memcmp(stream->addr.tuple4_v6->saddr, stream->addr.tuple4_v6->daddr, sizeof(key->saddr)) < 0){
        memcpy(key->saddr, stream->addr.tuple4_v6->saddr, sizeof(key->saddr));
        memcpy(key->daddr, stream->addr.tuple4_v6->daddr, sizeof(key->daddr));
        key->source = stream->addr.tuple4_v6->source;
        key->dest = stream->addr.tuple4_v6->dest;
        *reversed = 0;
    }
    else{
        memcpy(key->saddr, stream->addr.tuple4_v6->daddr, sizeof(key->saddr));
        memcpy(key->daddr, stream->addr.tuple4_v6->saddr, sizeof(key->daddr));
        key->source = stream->addr.tuple4_v6->dest;
        key->dest = stream->addr.tuple4_v6->source;
        *reversed = 1;
    }
    return 0;
}

/* Insert {stream, pmeinfo, route dir, reversed} into the per-thread
 * tuple2stream table under the packet's direction-independent key.
 * Returns MESA_htable_add()'s result (< 0 on failure). When the add fails the
 * value was never stored in the table, so it is freed here. */
static int tuple2stream_htable_add(MESA_htable_handle tuple2stream_htable, addr_type_t addr_type, struct pkt_info *pktinfo, struct streaminfo *stream, struct pme_info *pmeinfo){
    void *logger = g_kni_handle->local_logger;
    int ret;
    struct tuple2stream_htable_value *value = ALLOC(struct tuple2stream_htable_value, 1);
    value->stream = stream;
    value->pmeinfo = pmeinfo;
    value->route_dir = stream->routedir;
    //ipv6
    if(addr_type == ADDR_TYPE_IPV6){
        struct stream_tuple4_v6 key;
        tuple2stream_htable_key_get_v6_by_packet(pktinfo, &key, &(value->reversed));
        ret = MESA_htable_add(tuple2stream_htable, (const unsigned char *)&key, sizeof(key), (const void*)value);
        if(ret < 0){
            char key_str[KNI_ADDR_MAX];
            kni_addr_trans_v6(&key, key_str, sizeof(key_str));
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_ADD_FAIL], 0, FS_OP_ADD, 1);
            KNI_LOG_ERROR(logger, "MESA_htable: Failed at add, table = tuple2stream_htable, key = %s, key_size = %d, ret = %d",
                    key_str, (int)sizeof(key), ret);
        }
        else{
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_ADD_SUCC], 0, FS_OP_ADD, 1);
        }
    }
    //ipv4
    else{
        struct stream_tuple4_v4 key;
        tuple2stream_htable_key_get_v4_by_packet(pktinfo, &key, &(value->reversed));
        ret = MESA_htable_add(tuple2stream_htable, (const unsigned char *)&key, sizeof(key), (const void*)value);
        if(ret < 0){
            char key_str[KNI_ADDR_MAX];
            kni_addr_trans_v4(&key, key_str, sizeof(key_str));
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_ADD_FAIL], 0, FS_OP_ADD, 1);
            KNI_LOG_ERROR(logger, "MESA_htable: Failed at add, table = tuple2stream_htable, key = %s, key_size = %d, ret = %d",
                    key_str, (int)sizeof(key), ret);
        }
        else{
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_ADD_SUCC], 0, FS_OP_ADD, 1);
        }
    }
    if(ret < 0){
        //not owned by the table: release it instead of leaking
        FREE(&value);
    }
    return ret;
}

/* Handle the first packet of a stream. The packet must parse and be a SYN,
 * otherwise the stream error is recorded and the stream is dropped
 * (FAWPKT | DROPME). For a SYN: remember its dup-detection key (flagging a
 * byte-identical repeated SYN), the client window and the client tcp options,
 * and keep receiving the stream (FAWPKT | GIVEME). `thread_seq` is unused. */
static char pending_opstate(struct streaminfo *stream, struct pme_info *pmeinfo, const void *a_packet, int thread_seq){
    void *logger = g_kni_handle->local_logger;
    struct pkt_info pktinfo;
    int ret = wrapped_kni_header_parse(a_packet, pmeinfo, &pktinfo);
    if(ret < 0){
        pmeinfo->error = STREAM_ERROR_IPHDR_PARSE_FAIL;
        return APP_STATE_FAWPKT | APP_STATE_DROPME;
    }
    if(!pktinfo.tcphdr->syn){
        //pending_opstate not syn, bypass and dropme
        KNI_LOG_DEBUG(logger, "Stream error: pending opstate, not syn, stream traceid = %s", pmeinfo->stream_traceid);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_SYN], 0, FS_OP_ADD, 1);
        pmeinfo->error = STREAM_ERROR_PENDING_NO_SYN;
        return APP_STATE_FAWPKT | APP_STATE_DROPME;
    }
    //dup traffic detect
    if(g_kni_handle->dup_traffic_switch == 1){
        struct dup_traffic_dabloom_key *syn_packet = ALLOC(struct dup_traffic_dabloom_key, 1);
        dup_traffic_dabloom_key_get(&pktinfo, syn_packet);
        if(pmeinfo->syn_packet != NULL){
            //a syn was already seen: identical key means duplicated traffic
            if(memcmp(pmeinfo->syn_packet, syn_packet, sizeof(*syn_packet)) == 0){
                pmeinfo->has_dup_syn = 1;
            }
            FREE(&(pmeinfo->syn_packet));
        }
        pmeinfo->syn_packet = syn_packet;
    }
    pmeinfo->client_window = ntohs(pktinfo.tcphdr->window);
    //a repeated syn must not leak the options saved from the previous one
    if(pmeinfo->client_tcpopt != NULL){
        FREE(&(pmeinfo->client_tcpopt));
    }
    pmeinfo->client_tcpopt = kni_get_tcpopt(pktinfo.tcphdr, pktinfo.tcphdr_len);
    return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}

/* Register pmeinfo in the global traceid2pme table under its trace id and
 * update the ID2PME_ADD_* counters. Returns MESA_htable_add()'s result. */
static int traceid2pme_htable_add(struct pme_info *pmeinfo){
    void *logger = g_kni_handle->local_logger;
    int key_size =0, ret;
    key_size = strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid));
    ret = MESA_htable_add(g_kni_handle->traceid2pme_htable, (const unsigned char *)(pmeinfo->stream_traceid),
            key_size, (const void*)pmeinfo);
    if(ret < 0){
        KNI_LOG_ERROR(logger, "MESA_htable: Failed at add,"
                "table = traceid2pme_htable, key = %s, ret = %d", pmeinfo->stream_traceid, ret);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL], 0, FS_OP_ADD, 1);
    }
    else{
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC], 0, FS_OP_ADD, 1);
    }
    return ret;
}

/* Remove the stream's entry from a tuple2stream table, using the key derived
 * from the stream's own 4-tuple. Exactly one of the TUPLE2STM_DEL_FAIL /
 * TUPLE2STM_DEL_SUCC counters is bumped. Returns MESA_htable_del()'s result. */
int tuple2stream_htable_del(MESA_htable_handle handle, const struct streaminfo *stream){
    void *logger = g_kni_handle->local_logger;
    int reversed = 0, ret;
    //ipv6
    if(stream->addr.addrtype == ADDR_TYPE_IPV6){
        struct stream_tuple4_v6 key;
        tuple2stream_htable_key_get_v6_by_stream(stream, &key, &reversed);
        ret = MESA_htable_del(handle, (const unsigned char *)(&key), sizeof(key), NULL);
        if(ret < 0){
            char key_str[KNI_ADDR_MAX];
            kni_addr_trans_v6(&key, key_str, sizeof(key_str));
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_DEL_FAIL], 0, FS_OP_ADD, 1);
            KNI_LOG_ERROR(logger, "MESA_htable: Failed at del, table = %s, key = %s, key_size = %d, ret = %d",
                    "tuple2stream_htable", key_str, (int)sizeof(key), ret);
        }
        else{
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_DEL_SUCC], 0, FS_OP_ADD, 1);
        }
    }
    //ipv4
    else{
        struct stream_tuple4_v4 key;
        tuple2stream_htable_key_get_v4_by_stream(stream, &key, &reversed);
        ret = MESA_htable_del(handle, (const unsigned char *)(&key), sizeof(key), NULL);
        if(ret < 0){
            char key_str[KNI_ADDR_MAX];
            kni_addr_trans_v4(&key, key_str, sizeof(key_str));
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_DEL_FAIL], 0, FS_OP_ADD, 1);
            KNI_LOG_ERROR(logger, "MESA_htable: Failed at del, table = %s, key = %s, key_size = %d, ret = %d",
                    "tuple2stream_htable", key_str, (int)sizeof(key), ret);
        }
        else{
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_DEL_SUCC], 0, FS_OP_ADD, 1);
        }
    }
    return ret;
}

/* Remove pmeinfo from the traceid2pme table. Only done for intercepted streams
 * (action == KNI_ACTION_INTERCEPT); updates the ID2PME_DEL_* counters. */
static void traceid2pme_htable_del(struct pme_info *pmeinfo){
    //del traceid2pme htable
    if(pmeinfo->action == KNI_ACTION_INTERCEPT){
        void *logger = g_kni_handle->local_logger;
        int key_size = strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid));
        int ret;
        ret = MESA_htable_del(g_kni_handle->traceid2pme_htable, (const unsigned char *)pmeinfo->stream_traceid, key_size, NULL);
        if(ret < 0){
            KNI_LOG_ERROR(logger, "MESA_htable: Failed at del, table = %s, key = %s, key_size = %d, ret = %d",
                    "traceid2pme_htable", pmeinfo->stream_traceid, key_size, ret);
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_FAIL], 0, FS_OP_ADD, 1);
        }
        else{
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC], 0, FS_OP_ADD, 1);
        }
    }
}

/* Start intercepting a stream on its first data packet: register it in the
 * tuple2stream and traceid2pme tables, then send the packet (with the control
 * message appended) to the selected TFE.
 * Returns DROPPKT | GIVEME once the packet was handed to TFE. When the table add,
 * the packet build or the send fails, the stream error is recorded, the table
 * entries added so far are removed and FAWPKT | DROPME is returned.
 * `stream_addr` is unused. */
static int first_data_intercept(struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, char *stream_addr, int thread_seq){
    void *logger = g_kni_handle->local_logger;
    int ret;
    //only intercept: add to tuple2stream_htable
    ret = tuple2stream_htable_add(g_kni_handle->threads_handle[thread_seq].tuple2stream_htable, pmeinfo->addr_type, pktinfo, stream, pmeinfo);
    if(ret < 0){
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_TUPLE2STM_ADD_FAIL], 0, FS_OP_ADD, 1);
        KNI_LOG_DEBUG(logger, "Stream error: tuple2stm add fail, stream traceid = %s", pmeinfo->stream_traceid);
        pmeinfo->error = STREAM_ERROR_TUPLE2STM_ADD_FAIL;
        return APP_STATE_FAWPKT | APP_STATE_DROPME;
    }
    //only intercept: add to traceid2pme htable
    //NOTE(review): the result is ignored; a failed add is only logged/counted inside the helper -- confirm that is intended
    traceid2pme_htable_add(pmeinfo);
    //action = KNI_ACTION_INTERCEPT, sendto tfe
    int len = 0;
    char *buff = add_cmsg_to_packet(pmeinfo, pktinfo, &len);
    //a packet that could not be built is handled like a failed send
    ret = -1;
    if(buff != NULL){
        ret = send_to_tfe(g_kni_handle->marsio_handle, buff, len, thread_seq, pmeinfo->tfe_id, pmeinfo->addr_type);
        FREE(&buff);
    }
    if(ret < 0){
        KNI_LOG_DEBUG(logger, "Stream error: failed at send first packet to tfe%d, stream traceid = %s", pmeinfo->tfe_id, pmeinfo->stream_traceid);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDTO_TFE_FAIL], 0, FS_OP_ADD, 1);
        pmeinfo->error = STREAM_ERROR_SENDTO_TFE_FAIL;
        tuple2stream_htable_del(g_kni_handle->threads_handle[thread_seq].tuple2stream_htable, stream);
        traceid2pme_htable_del(pmeinfo);
        return APP_STATE_FAWPKT | APP_STATE_DROPME;
    }
    KNI_LOG_DEBUG(logger, "Succeed at send first packet to tfe%d, stream traceid = %s", pmeinfo->tfe_id, pmeinfo->stream_traceid);
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_STM], 0, FS_OP_ADD, 1);
    return APP_STATE_DROPPKT | APP_STATE_GIVEME;
}

static int dabloom_search(struct pkt_info *pktinfo, int thread_seq){
    void *logger = g_kni_handle->local_logger;
    struct dup_traffic_dabloom_key bloom_key;
    memset(&bloom_key, 0, sizeof(bloom_key));
    dup_traffic_dabloom_key_get(pktinfo, &bloom_key);
    int ret = expiry_dablooms_search(g_kni_handle->threads_handle[thread_seq].dabloom_handle, (const char*)&bloom_key, sizeof(bloom_key));
    //ret = 1, = dup packet, bypass the packet
if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at expiry_dablooms_search, errmsg = %s", expiry_dablooms_errno_trans((enum expiry_dablooms_errno)ret)); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BLOOM_SEARCH_FAIL], 0, FS_OP_ADD, 1); } FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BLOOM_SEARCH_SUCC], 0, FS_OP_ADD, 1); uint64_t count = 0; expiry_dablooms_element_count_get(g_kni_handle->threads_handle[thread_seq].dabloom_handle, &count); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->line_ids[0], g_kni_fs_handle->column_ids[thread_seq], FS_OP_SET, count); return ret; } static int dabloom_add(struct pkt_info *pktinfo, int thread_seq){ void *logger = g_kni_handle->local_logger; struct dup_traffic_dabloom_key bloom_key; memset(&bloom_key, 0, sizeof(bloom_key)); dup_traffic_dabloom_key_get(pktinfo, &bloom_key); int ret = expiry_dablooms_add(g_kni_handle->threads_handle[thread_seq].dabloom_handle, (const char*)&bloom_key, sizeof(bloom_key)); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at expiry_dablooms_add, errmsg = %s", expiry_dablooms_errno_trans((enum expiry_dablooms_errno)ret)); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BLOOM_ADD_FAIL], 0, FS_OP_ADD, 1); } FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BLOOM_ADD_SUCC], 0, FS_OP_ADD, 1); uint64_t count = 0; expiry_dablooms_element_count_get(g_kni_handle->threads_handle[thread_seq].dabloom_handle, &count); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->line_ids[0], g_kni_fs_handle->column_ids[thread_seq], FS_OP_SET, count); return ret; } static char data_opstate(struct streaminfo *stream, struct pme_info *pmeinfo, const void *a_packet, int thread_seq){ //pmeinfo->tfe_release = 1: intercept, tfe end first. 
DO NOT droppkt and dropme if(pmeinfo->tfe_release == 1){ pmeinfo->server_bytes=stream->ptcpdetail->serverbytes; pmeinfo->client_bytes=stream->ptcpdetail->clientbytes; pmeinfo->server_pkts=stream->ptcpdetail->serverpktnum; pmeinfo->client_pkts=stream->ptcpdetail->clientpktnum; pmeinfo->dir=stream->dir; #if 0 return APP_STATE_DROPPKT | APP_STATE_DROPME; #endif } void *logger = g_kni_handle->local_logger; struct iphdr *ipv4_hdr = NULL; struct ip6_hdr *ipv6_hdr = NULL; uint16_t len = 0, ret; char stream_addr[KNI_SYMBOL_MAX] = ""; if(pmeinfo->addr_type == ADDR_TYPE_IPV6){ kni_addr_trans_v6(stream->addr.tuple4_v6, stream_addr, sizeof(stream_addr)); } else{ kni_addr_trans_v4(stream->addr.tuple4_v4, stream_addr, sizeof(stream_addr)); } //parse ipv4/6 header struct pkt_info pktinfo; ret = wrapped_kni_header_parse(a_packet, pmeinfo, &pktinfo); if(ret < 0){ pmeinfo->error = STREAM_ERROR_IPHDR_PARSE_FAIL; return APP_STATE_FAWPKT | APP_STATE_DROPME; } //pmeinfo->action has only 3 value: KNI_ACTION_NONE, KNI_ACTION_INTERCEPT, KNI_ACTION_BYPASS switch (pmeinfo->action){ case KNI_ACTION_NONE: break; case KNI_ACTION_INTERCEPT: //search dabloom if(g_kni_handle->dup_traffic_switch == 1){ if(pmeinfo->has_dup_traffic == 1){ //ret = 1, = dup packet, bypass the packet ret = dabloom_search(&pktinfo, thread_seq); if(ret == 1){ return APP_STATE_FAWPKT | APP_STATE_GIVEME; } } } if(pmeinfo->addr_type == ADDR_TYPE_IPV6){ ipv6_hdr = (struct ip6_hdr*)a_packet; len = ntohs(ipv6_hdr->ip6_ctlun.ip6_un1.ip6_un1_plen) + sizeof(struct ip6_hdr); } else{ ipv4_hdr = (struct iphdr*)a_packet; len = ntohs(ipv4_hdr->tot_len); } ret = send_to_tfe(g_kni_handle->marsio_handle, (char*)a_packet, len, thread_seq, pmeinfo->tfe_id, pmeinfo->addr_type); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at send continue packet to tfe%d, stream traceid = %s", pmeinfo->tfe_id, pmeinfo->stream_traceid); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDTO_TFE_FAIL], 0, FS_OP_ADD, 1); } return 
APP_STATE_DROPPKT | APP_STATE_GIVEME; case KNI_ACTION_BYPASS: return APP_STATE_FAWPKT | APP_STATE_GIVEME; default: assert(0); break; } //first data > 1500, bypass and dropme if(pktinfo.ip_totlen > KNI_DEFAULT_MTU){ pmeinfo->error = STREAM_ERROR_EXCEED_MTU; KNI_LOG_DEBUG(logger, "Stream error: first data packet exceed MTU(1500), stream traceid = %s", pmeinfo->stream_traceid); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_EXCEED_MTU], 0, FS_OP_ADD, 1); return APP_STATE_FAWPKT | APP_STATE_DROPME; } // syn/ack if(pktinfo.tcphdr->syn && pktinfo.tcphdr->ack){ pmeinfo->server_window = ntohs(pktinfo.tcphdr->window); pmeinfo->server_tcpopt = kni_get_tcpopt(pktinfo.tcphdr, pktinfo.tcphdr_len); //dup traffic detect if(g_kni_handle->dup_traffic_switch == 1){ if(pmeinfo->syn_ack_packet == NULL){ struct dup_traffic_dabloom_key *syn_ack_packet = ALLOC(struct dup_traffic_dabloom_key, 1); dup_traffic_dabloom_key_get(&pktinfo, syn_ack_packet); pmeinfo->syn_ack_packet = syn_ack_packet; } else{ struct dup_traffic_dabloom_key *syn_ack_packet = ALLOC(struct dup_traffic_dabloom_key, 1); dup_traffic_dabloom_key_get(&pktinfo, syn_ack_packet); if(memcmp(pmeinfo->syn_ack_packet, syn_ack_packet, sizeof(*syn_ack_packet)) == 0){ pmeinfo->has_dup_syn_ack = 1; } FREE(&(pmeinfo->syn_ack_packet)); pmeinfo->syn_ack_packet = syn_ack_packet; } } return APP_STATE_FAWPKT | APP_STATE_GIVEME; } //no data, maybe ack if(pktinfo.data_len <= 0){ return APP_STATE_FAWPKT | APP_STATE_GIVEME; } //not double dir, bypass and dropme if(stream->dir != DIR_DOUBLE){ KNI_LOG_DEBUG(logger, "Stream error: single dir = %d, stream traceid = %s", stream->dir, pmeinfo->stream_traceid); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SINGLE_DIR], 0, FS_OP_ADD, 1); pmeinfo->error = STREAM_ERROR_SINGLE_DIR; return APP_STATE_FAWPKT | APP_STATE_DROPME; } struct protocol_identify_result protocol_identify_res; memset(&protocol_identify_res, 0, sizeof(protocol_identify_res)); 
protocol_identify(stream, pktinfo.data, pktinfo.data_len, &protocol_identify_res); pmeinfo->protocol = protocol_identify_res.protocol; switch(pmeinfo->protocol){ //can not identify protocol from first data packet, bypass and dropme case KNI_PROTOCOL_UNKNOWN: KNI_LOG_DEBUG(logger, "Stream error: failed at protocol_identify, stream traceid = %s", pmeinfo->stream_traceid); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PROTO_UNKNOWN], 0, FS_OP_ADD, 1); pmeinfo->error = STREAM_ERROR_PROTOCOL_UNKNOWN; return APP_STATE_FAWPKT | APP_STATE_DROPME; case KNI_PROTOCOL_SSL: strncpy(pmeinfo->domain.sni, protocol_identify_res.domain, strnlen(protocol_identify_res.domain, sizeof(pmeinfo->domain.sni) - 1)); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SSL_STM], 0, FS_OP_ADD, 1); break; case KNI_PROTOCOL_HTTP: strncpy(pmeinfo->domain.host, protocol_identify_res.domain, strnlen(protocol_identify_res.domain, sizeof(pmeinfo->domain.host) - 1)); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_HTTP_STM], 0, FS_OP_ADD, 1); break; default: break; } //receive client hello, but no syn/ack, bypass and dropme if(pmeinfo->client_tcpopt == NULL || pmeinfo->server_tcpopt == NULL){ KNI_LOG_DEBUG(logger, "Stream error: %s, %s, stream traceid = %s", pmeinfo->client_tcpopt == NULL ? "no syn" : "have syn", pmeinfo->server_tcpopt == NULL ? 
"no syn/ack" : "have syn/ack", pmeinfo->stream_traceid); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_SA], 0, FS_OP_ADD, 1); pmeinfo->error = STREAM_ERROR_NO_SYN_ACK; return APP_STATE_FAWPKT | APP_STATE_DROPME; } //dup_traffic_check if(g_kni_handle->dup_traffic_switch == 1){ //has dup traffic if(pmeinfo->has_dup_syn == 1 || pmeinfo->has_dup_syn_ack == 1){ pmeinfo->has_dup_traffic = 1; } if(pmeinfo->has_dup_traffic == 1){ FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_DUP_TFC_STM], 0, FS_OP_ADD, 1); if(g_kni_handle->dup_traffic_action == KNI_ACTION_BYPASS){ pmeinfo->action = KNI_ACTION_BYPASS; pmeinfo->intercept_state=0; FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM_DUP_TFC], 0, FS_OP_ADD, 1); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); return APP_STATE_FAWPKT | APP_STATE_GIVEME; //GIVEME: for session record } } } pmeinfo->action = intercept_policy_scan(g_kni_handle->maat_handle, (struct ipaddr*)(&stream->addr), protocol_identify_res.domain, protocol_identify_res.domain_len, thread_seq, &(pmeinfo->policy_id), &(pmeinfo->do_log), &(pmeinfo->maat_hit)); //policy scan log char *action_str = kni_maat_action_trans(pmeinfo->action); KNI_LOG_INFO(logger, "intercept_policy_scan: %s, %s, policy_id = %d, action = %d(%s), maat_hit = %d, stream traceid = %s", stream_addr, protocol_identify_res.domain, pmeinfo->policy_id, pmeinfo->action, action_str, pmeinfo->maat_hit, pmeinfo->stream_traceid); switch(pmeinfo->action){ case KNI_ACTION_BYPASS: FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM_POLICY], 0, FS_OP_ADD, 1); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); pmeinfo->intercept_state=0; return APP_STATE_FAWPKT | APP_STATE_GIVEME; //GIVEME: for session record case KNI_ACTION_INTERCEPT: pmeinfo->intercept_state=1; return first_data_intercept(stream, pmeinfo, 
&pktinfo, stream_addr, thread_seq); default: //action != intercept && action != bypass,bypass and dropme KNI_LOG_DEBUG(logger, "Stream error: action %d(%s) = invalid: policy_id = %d, stream traceid = %s, domain = ", pmeinfo->action, action_str, pmeinfo->policy_id, pmeinfo->stream_traceid, protocol_identify_res.domain); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ACTION_INVALID], 0, FS_OP_ADD, 1); pmeinfo->error = STREAM_ERROR_INVALID_ACTION; return APP_STATE_FAWPKT | APP_STATE_DROPME; } } static char close_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, int thread_seq){ //close: a_packet = null, do not sendto tfe clock_gettime(CLOCK_REALTIME, &(pmeinfo->end_time)); void *logger = g_kni_handle->local_logger; pmeinfo->server_bytes=stream->ptcpdetail->serverbytes; pmeinfo->client_bytes=stream->ptcpdetail->clientbytes; pmeinfo->server_pkts=stream->ptcpdetail->serverpktnum; pmeinfo->client_pkts=stream->ptcpdetail->clientpktnum; pmeinfo->dir=stream->dir; switch(pmeinfo->action){ case KNI_ACTION_INTERCEPT: //reset clock: when sapp end, start clock MESA_htable_search(g_kni_handle->traceid2pme_htable, (const unsigned char*)pmeinfo->stream_traceid, strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid))); tuple2stream_htable_del(g_kni_handle->threads_handle[thread_seq].tuple2stream_htable, stream); return APP_STATE_DROPPKT | APP_STATE_DROPME; case KNI_ACTION_BYPASS: //KNI_LOG_DEBUG(logger, "action = bypass, set tfe_release = 1, stream_trace_id = %s", pmeinfo->stream_traceid); pmeinfo->tfe_release = 1; return APP_STATE_FAWPKT | APP_STATE_DROPME; //stream has only syn, ack. no data. 
default: char *action_str = kni_maat_action_trans(pmeinfo->action); pmeinfo->error = STREAM_ERROR_NO_DATA; FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_DATA], 0, FS_OP_ADD, 1); KNI_LOG_DEBUG(logger, "Stream error: close_opstate, action %d(%s) = abnormal, stream_traceid = %s", pmeinfo->action, action_str, pmeinfo->stream_traceid); return APP_STATE_FAWPKT | APP_STATE_DROPME; } } //from syn extern "C" char kni_tcpall_entry(struct streaminfo *stream, void** pme, int thread_seq, const void* a_packet){ void *logger = g_kni_handle->local_logger; int ret; int can_destroy; struct pme_info *pmeinfo = *(struct pme_info **)pme; if(stream->addr.addrtype == ADDR_TYPE_IPV6){ FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV6_STM], 0, FS_OP_ADD, 1); } else{ FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV4_STM], 0, FS_OP_ADD, 1); } /* a_packet == NULL && not op_state_close, continue close: a_packet may be null, if a_packet = null, do not send to tfe */ if(a_packet == NULL && stream->pktstate != OP_STATE_CLOSE){ FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NULL_PKT], 0, FS_OP_ADD, 1); return APP_STATE_FAWPKT | APP_STATE_GIVEME; } switch(stream->pktstate){ case OP_STATE_PENDING: *pme = pmeinfo = pme_info_new(stream, thread_seq); if(pmeinfo == NULL){ KNI_LOG_ERROR(logger, "Failed at new pmeinfo, bypass and dropme"); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM_PME_NEW_FAIL], 0, FS_OP_ADD, 1); return APP_STATE_FAWPKT | APP_STATE_DROPME; } FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_NEW_SUCC], 0, FS_OP_ADD, 1); pmeinfo->tfe_id = tfe_mgr_alive_node_get(g_kni_handle->_tfe_mgr, thread_seq); //printf("tfe_id = %d\n", pmeinfo->tfe_id); if(pmeinfo->tfe_id < 0){ KNI_LOG_ERROR(logger, "No alive tfe available, bypass and dropme"); 
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM_NO_TFE], 0, FS_OP_ADD, 1); pme_info_destroy(pmeinfo); return APP_STATE_FAWPKT | APP_STATE_DROPME; } ret = pending_opstate(stream, pmeinfo, a_packet, thread_seq); if(pmeinfo->error < 0){ goto error_out; } break; case OP_STATE_DATA: ret = data_opstate(stream, pmeinfo, a_packet, thread_seq); //exception stream, dropme and destroy pmeinfo if(pmeinfo->error < 0){ goto error_out; } break; case OP_STATE_CLOSE: //sapp stream close ret = close_opstate(stream, pmeinfo, thread_seq); if(pmeinfo->error < 0){ goto error_out; } break; default: ret = APP_STATE_FAWPKT | APP_STATE_GIVEME; FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STATE_UNKNOWN], 0, FS_OP_ADD, 1); KNI_LOG_ERROR(logger, "Unknown stream opstate %d, stream traceid = %s", stream->pktstate, pmeinfo->stream_traceid); break; } //sapp release: bypass or intercept if((ret & APP_STATE_DROPME)){ can_destroy = judge_stream_can_destroy(pmeinfo, CALLER_SAPP); if(can_destroy == 1){ if(pmeinfo->action == KNI_ACTION_INTERCEPT){ traceid2pme_htable_del(pmeinfo); } stream_destroy(pmeinfo, pmeinfo->do_log); } } return ret; //error out: stream error, send log and destroy_pme, do not need to del htable error_out: FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM_ERR], 0, FS_OP_ADD, 1); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); if(pmeinfo != NULL){ stream_destroy(pmeinfo, 1); } return ret; } void http_project_free(int thread_seq, void *project_req_value){ FREE(&project_req_value); } static int http_project_init(){ void *logger = g_kni_handle->local_logger; int id = project_producer_register(HTTP_PROJECT_NAME, PROJECT_VAL_TYPE_STRUCT, http_project_free); if(id < 0){ KNI_LOG_ERROR(logger, "Failed at project_producer_register, project name = %s, ret = %d", 
HTTP_PROJECT_NAME, id); return -1; } id = project_customer_register(HTTP_PROJECT_NAME, PROJECT_VAL_TYPE_STRUCT); if(id < 0){ KNI_LOG_ERROR(logger, "Failed at project_customer_register, project name = %s, ret = %d", HTTP_PROJECT_NAME, id); return -1; } return id; } extern "C" char kni_http_entry(stSessionInfo* session_info, void **pme, int thread_seq, struct streaminfo *a_stream, const void *a_packet){ http_infor* http_info = (http_infor*)(session_info->app_info); //only process first http session if(http_info->http_session_seq != 1){ return PROT_STATE_DROPME; } if(session_info->prot_flag != HTTP_HOST){ return PROT_STATE_GIVEME; } int host_len = MIN(session_info->buflen, KNI_DEFAULT_MTU); struct http_project* host_info = ALLOC(struct http_project, 1); host_info->host_len = host_len; memcpy(host_info->host, session_info->buf, host_len); if(project_req_add_struct(a_stream, g_kni_handle->http_project_id, host_info) < 0){ FREE(&host_info); host_info = NULL; } return PROT_STATE_DROPME; } static void kni_marsio_destroy(struct kni_marsio_handle *handle){ if(handle != NULL){ if(handle->instance != NULL){ marsio_destory(handle->instance); } } FREE(&handle); handle = NULL; } int tuple2stream_htable_search(MESA_htable_handle handle, struct ethhdr *ether_hdr, int thread_seq){ void *logger = g_kni_handle->local_logger; if(ether_hdr->h_proto != htons(ETH_P_IP) && ether_hdr->h_proto != htons(ETH_P_IPV6)){ return -1; } void *raw_packet = (char*)ether_hdr + sizeof(*ether_hdr); tuple2stream_htable_value *value = NULL; struct pkt_info pktinfo; int reversed = 0, ret; char key_str[KNI_ADDR_MAX]; //ipv6 if(ether_hdr->h_proto == htons(ETH_P_IPV6)){ ret = kni_ipv6_header_parse(raw_packet, &pktinfo); if(ret < 0){ char *errmsg = kni_ipv6_errmsg_get((enum kni_ipv6hdr_parse_error)ret); KNI_LOG_ERROR(logger, "failed at parse ipv6 header, errmsg = %s", errmsg); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV6HDR_PARSE_FAIL], 0, FS_OP_ADD, 1); return -1; } struct 
stream_tuple4_v6 key; kni_addr_trans_v6(&key, key_str, sizeof(key_str)); tuple2stream_htable_key_get_v6_by_packet(&pktinfo, &key, &reversed); value = (tuple2stream_htable_value*)MESA_htable_search(handle, (const unsigned char*)(&key), sizeof(key)); } //ipv4 else{ ret = kni_ipv4_header_parse(raw_packet, &pktinfo); if(ret < 0){ char *errmsg = kni_ipv4_errmsg_get((enum kni_ipv4hdr_parse_error)ret); KNI_LOG_ERROR(logger, "failed at parse ipv4 header, errmsg = %s", errmsg); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV4HDR_PARSE_FAIL], 0, FS_OP_ADD, 1); return -1; } struct stream_tuple4_v4 key; kni_addr_trans_v4(&key, key_str, sizeof(key_str)); tuple2stream_htable_key_get_v4_by_packet(&pktinfo, &key, &reversed); value = (tuple2stream_htable_value*)MESA_htable_search(handle, (const unsigned char*)(&key), sizeof(key)); } if(value == NULL){ KNI_LOG_ERROR(logger, "MESA_htable: failed at search, key = %s", key_str); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_SEARCH_FAIL], 0, FS_OP_ADD, 1); return -1; } FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_SEARCH_SUCC], 0, FS_OP_ADD, 1); unsigned char dir = value->stream->routedir; if(reversed == value->reversed){ dir = MESA_dir_reverse(value->stream->routedir); } ret = sapp_inject_pkt(value->stream, SIO_EXCLUDE_THIS_LAYER_HDR, raw_packet, pktinfo.ip_totlen, dir); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at sapp_inject_pkt, stream addr = %s", key_str); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SAPP_INJECT_FAIL], 0, FS_OP_ADD, 1); return -1; } FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SAPP_INJECT_SUCC], 0, FS_OP_ADD, 1); //add to dabloom if(g_kni_handle->dup_traffic_switch == 1){ if(value->pmeinfo->has_dup_traffic == 1){ ret = dabloom_add(&pktinfo, thread_seq); if(ret < 0){ return -1; } } } return 0; } extern "C" char kni_polling_all_entry(const struct streaminfo *stream, void** 
pme, int thread_seq, const void* a_packet){ MESA_htable_handle tuple2stream_htable = g_kni_handle->threads_handle[thread_seq].tuple2stream_htable; //polling tfe for(int i = 0; i < g_kni_handle->marsio_handle->tfe_enabled_node_count; i++){ marsio_buff_t *rx_buffs[BURST_MAX]; int nr_burst = 1; struct mr_vdev *dev_eth_handler = g_kni_handle->marsio_handle->tfe_enabled_nodes[i].dev_eth_handler; //receive from tfe, nr_recv <= nr_burst <= BURST_MAX int nr_recv = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buffs, nr_burst); if(nr_recv <= 0){ continue; } for(int j = 0; j < nr_recv; j++){ struct ethhdr *ether_hdr = (struct ethhdr*)marsio_buff_mtod(rx_buffs[i]); tuple2stream_htable_search(tuple2stream_htable, ether_hdr, thread_seq); } } return 0; } static int wrapped_kni_cmsg_get(struct pme_info *pmeinfo, struct kni_cmsg *cmsg, uint16_t type, uint16_t value_size_max, void *logger){ uint16_t value_size = 0; unsigned char *value = NULL; int ret = kni_cmsg_get(cmsg, type, &value_size, &value); if(ret < 0){ if(ret == KNI_CMSG_INVALID_TYPE){ KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type = %d, ret = %d, stream traceid = %s", type, ret, pmeinfo->stream_traceid); } return -1; } if(value_size > value_size_max){ KNI_LOG_ERROR(logger, "kni_cmsg_get: type = %d, size = %d, which should <= %d, stream traceid = %s", type, value_size, value_size_max, pmeinfo->stream_traceid); return -1; } switch(type) { case TFE_CMSG_SSL_INTERCEPT_STATE: memcpy((char*)&(pmeinfo->intercept_state), value, value_size); break; case TFE_CMSG_SSL_UPSTREAM_LATENCY: memcpy((char*)&(pmeinfo->ssl_server_side_latency), value, value_size); break; case TFE_CMSG_SSL_DOWNSTREAM_LATENCY: memcpy((char*)&(pmeinfo->ssl_client_side_latency), value, value_size); break; case TFE_CMSG_SSL_UPSTREAM_VERSION: memcpy(pmeinfo->ssl_server_side_version, value, value_size); break; case TFE_CMSG_SSL_DOWNSTREAM_VERSION: memcpy(pmeinfo->ssl_client_side_version, value, value_size); break; case TFE_CMSG_SSL_PINNING_STATE: 
memcpy((char*)&(pmeinfo->pinningst), value, value_size); break; case TFE_CMSG_SSL_CERT_VERIFY: memcpy((char*)&(pmeinfo->ssl_cert_verify), value, value_size); break; case TFE_CMSG_SSL_ERROR: memcpy((char*)&(pmeinfo->ssl_error), value, value_size); break; default: break; } return 0; } static long traceid2pme_htable_search_cb(void *data, const uchar *key, uint size, void *user_args){ struct traceid2pme_search_cb_args *args = (struct traceid2pme_search_cb_args*)user_args; void *logger = args->logger; struct kni_cmsg *cmsg = args->cmsg; struct pme_info *pmeinfo = (struct pme_info*)data; int can_destroy; if(pmeinfo != NULL){ wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, sizeof(pmeinfo->intercept_state), logger); wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_UPSTREAM_LATENCY, sizeof(pmeinfo->ssl_server_side_latency), logger); wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_DOWNSTREAM_LATENCY, sizeof(pmeinfo->ssl_client_side_latency), logger); wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_UPSTREAM_VERSION, sizeof(pmeinfo->ssl_server_side_version) - 1, logger); wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_DOWNSTREAM_VERSION, sizeof(pmeinfo->ssl_client_side_version) - 1, logger); wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_PINNING_STATE, sizeof(pmeinfo->pinningst), logger); wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_CERT_VERIFY, sizeof(pmeinfo->ssl_cert_verify), logger); wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_ERROR, sizeof(pmeinfo->ssl_error), logger); clock_gettime(CLOCK_REALTIME, &(pmeinfo->end_time)); KNI_LOG_DEBUG(logger, "recv cmsg from tfe, stream traceid = %s", pmeinfo->stream_traceid); can_destroy = judge_stream_can_destroy(pmeinfo, CALLER_TFE); if(can_destroy == 1){ traceid2pme_htable_del(pmeinfo); stream_destroy(pmeinfo, pmeinfo->do_log); } } kni_cmsg_destroy(cmsg); return 0; } static void* thread_tfe_cmsg_receiver(void *args){ struct thread_tfe_cmsg_receiver_args *_args = (struct 
thread_tfe_cmsg_receiver_args*)args; const char *profile = _args->profile; const char *section = "tfe_cmsg_receiver"; void *logger = _args->logger; char listen_eth[KNI_SYMBOL_MAX]; uint32_t listen_ip; int listen_port = -1; char buff[KNI_MTU]; int sockfd = 0; struct sockaddr_in server_addr, client_addr; int ret = MESA_load_profile_string_nodef(profile, section, "listen_eth", listen_eth, sizeof(listen_eth)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: listen_eth not set, profile = %s, section = %s", profile, section); goto error_out; } ret = MESA_load_profile_int_nodef(profile, section, "listen_port", &listen_port); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: listen_port not set, profile = %s, section = %s", profile, section); goto error_out; } KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n listen_eth: %s\n listen_port: %d", section, listen_eth, listen_port); FREE(&args); //create socket sockfd = socket(AF_INET, SOCK_DGRAM, 0); if(sockfd < 0){ KNI_LOG_ERROR(logger, "Failed at create udp socket, errno = %d, %s", errno, strerror(errno)); goto error_out; } memset(&server_addr, 0, sizeof(server_addr)); memset(&client_addr, 0, sizeof(client_addr)); ret = kni_ipv4_addr_get_by_eth(listen_eth, &listen_ip); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at get bind ipv4 addr, eth = %s", listen_eth); goto error_out; } server_addr.sin_family = AF_INET; // IPv4 server_addr.sin_addr.s_addr = listen_ip; server_addr.sin_port = htons(listen_port); //bind ret = bind(sockfd, (const struct sockaddr *)&server_addr, sizeof(server_addr)); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at bind udp socket, errno = %d, %s", errno, strerror(errno)); goto error_out; } //receive while(true){ socklen_t client_len = sizeof(client_addr); int recv_len = recvfrom(sockfd, (char *)buff, sizeof(buff), MSG_WAITALL, (struct sockaddr*)&client_addr, &client_len); if(recv_len < 0){ KNI_LOG_ERROR(logger, "Failed at recv udp data, errno = %d, %s", errno, strerror(errno)); continue; } 
//KNI_LOG_DEBUG(logger, "recv udp data: recv_len = %d\n", recv_len); struct kni_cmsg *cmsg = NULL; ret = kni_cmsg_deserialize((const unsigned char*)buff, recv_len, &cmsg); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at deserialize cmsg, ret = %d", ret); continue; } //get stream_traceid unsigned char *stream_traceid = NULL; uint16_t value_size; ret = kni_cmsg_get(cmsg, TFE_CMSG_STREAM_TRACE_ID, &value_size, &stream_traceid); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type = %d, ret = %d", TFE_CMSG_STREAM_TRACE_ID, ret); continue; } //get pme long cb_ret = -1; struct traceid2pme_search_cb_args cb_args; memset((void*)&cb_args, 0, sizeof(cb_args)); cb_args.cmsg = cmsg; cb_args.logger = logger; MESA_htable_search_cb(g_kni_handle->traceid2pme_htable, (const unsigned char *)stream_traceid, value_size, traceid2pme_htable_search_cb, &cb_args, &cb_ret); } return NULL; error_out: if(sockfd >= 0){ close(sockfd); } return NULL; } static struct kni_marsio_handle* kni_marsio_init(const char* profile, int tfe_node_count){ void *logger = g_kni_handle->local_logger; const char* section = "marsio"; char appsym[KNI_SYMBOL_MAX]; char src_mac_addr_str[KNI_SYMBOL_MAX]; unsigned int opt_value = 1; int tfe_node_enabled; struct mr_instance *mr_inst = NULL; struct mr_vdev *dev_eth_handler = NULL; struct mr_sendpath *dev_eth_sendpath = NULL; struct kni_marsio_handle *handle = NULL; int j; int ret = MESA_load_profile_string_nodef(profile, section, "appsym", appsym, sizeof(appsym)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: appsym not set, profile = %s, section = %s", profile, section); goto error_out; } ret = MESA_load_profile_string_nodef(profile, section, "src_mac_addr", src_mac_addr_str, sizeof(src_mac_addr_str)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: src_mac_addr not set, profile = %s, section = %s", profile, section); goto error_out; } KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n appsym: %s\n src_mac_addr: %s", section, appsym, src_mac_addr_str); 
mr_inst = marsio_create(); if(mr_inst == NULL){ KNI_LOG_ERROR(logger, "Failed at create marsio instance"); goto error_out; } handle = ALLOC(struct kni_marsio_handle, 1); handle->instance = mr_inst; ret = sscanf(src_mac_addr_str, "%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx", &(handle->src_mac_addr[0]), &(handle->src_mac_addr[1]), &(handle->src_mac_addr[2]), &(handle->src_mac_addr[3]), &(handle->src_mac_addr[4]), &(handle->src_mac_addr[5])); if(ret != 6){ KNI_LOG_ERROR(logger, "MESA_prof_load: src_mac_addr = invalid, ret = %d, profile = %s, section = %s", ret, profile, section); goto error_out; } marsio_option_set(mr_inst, MARSIO_OPT_EXIT_WHEN_ERR, &opt_value, sizeof(opt_value)); marsio_init(mr_inst, appsym); j = 0; for(int i = 0; i < tfe_node_count; i++){ //load tfe conf char _section[KNI_SYMBOL_MAX]; char mac_addr_str[KNI_SYMBOL_MAX]; char dev_eth_symbol[KNI_SYMBOL_MAX]; snprintf(_section, sizeof(_section), "tfe%d", i); MESA_load_profile_int_def(profile, _section, "enabled", &tfe_node_enabled, 1); if(tfe_node_enabled != 1){ continue; } int ret = MESA_load_profile_string_nodef(profile, _section, "mac_addr", mac_addr_str, sizeof(mac_addr_str)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: mac_addr not set, profile = %s, section = %s", profile, _section); goto error_out; } struct tfe_enabled_node tfe_node; memset(&tfe_node, 0, sizeof(tfe_node)); //ff:ee:dd:cc:bb:aa ---> 0xff 0xee 0xdd 0xcc 0xbb 0xaa ret = sscanf(mac_addr_str, "%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx", &(tfe_node.mac_addr[0]), &(tfe_node.mac_addr[1]), &(tfe_node.mac_addr[2]), &(tfe_node.mac_addr[3]), &(tfe_node.mac_addr[4]), &(tfe_node.mac_addr[5])); if(ret != 6){ KNI_LOG_ERROR(logger, "MESA_prof_load: mac_addr = invalid, ret = %d, profile = %s, section = %s", ret, profile, _section); goto error_out; } ret = MESA_load_profile_string_nodef(profile, _section, "dev_eth_symbol", dev_eth_symbol, sizeof(dev_eth_symbol)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: dev_eth_symbol not set, 
profile = %s, section = %s", profile, _section); goto error_out; } KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n enabled: %d, mac_addr: %s\n dev_eth_symbol: %s", _section, tfe_node_enabled, mac_addr_str, dev_eth_symbol); //eth_handler receive thread = thread_count, send thread = thread_count dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, g_kni_handle->thread_count, g_kni_handle->thread_count); if(dev_eth_handler == NULL){ KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol = %s", dev_eth_symbol); goto error_out; } //sendpath dev_eth_sendpath = marsio_sendpath_create_by_vdev(dev_eth_handler); if(dev_eth_sendpath == NULL){ KNI_LOG_ERROR(logger, "Failed at create marsio sendpath, dev_symbol = %s", dev_eth_symbol); goto error_out; } //tfe_node tfe_node.dev_eth_handler = dev_eth_handler; tfe_node.dev_eth_sendpath = dev_eth_sendpath; tfe_node.tfe_id = i; handle->tfe_enabled_nodes[j++] = tfe_node; } handle->tfe_enabled_node_count = j; //marsio_thread_init(mr_instance); return handle; error_out: kni_marsio_destroy(handle); return NULL; } static void fs_destroy(struct kni_field_stat_handle *fs_handle){ if(fs_handle != NULL){ FS_stop(&(fs_handle->handle)); } FREE(&fs_handle); } static struct kni_field_stat_handle * fs_init(const char *profile){ void *logger = g_kni_handle->local_logger; const char *section = "field_stat"; char stat_path[KNI_PATH_MAX]; struct kni_field_stat_handle *fs_handle = NULL; screen_stat_handle_t handle = NULL; const char *app_name = "fs2_kni"; int value = 0; int ret = MESA_load_profile_string_nodef(profile, section, "stat_path", stat_path, sizeof(stat_path)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: stat_path not set, profile = %s, section = %s", profile, section); goto error_out; } KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n stat_path: %s\n", "field_stat", stat_path); handle = FS_create_handle(); if(handle == NULL){ KNI_LOG_ERROR(logger, "Failed at create FS_create_handle"); goto error_out; } fs_handle = 
ALLOC(struct kni_field_stat_handle, 1); fs_handle->handle = handle; FS_set_para(handle, APP_NAME, app_name, strlen(app_name) + 1); FS_set_para(handle, OUTPUT_DEVICE, stat_path, strlen(stat_path)+1); value = 0; FS_set_para(handle, FLUSH_BY_DATE, &value, sizeof(value)); value = 1; FS_set_para(handle, PRINT_MODE, &value, sizeof(value)); value = 1; FS_set_para(handle, CREATE_THREAD, &value, sizeof(value)); value = 5; FS_set_para(handle, STAT_CYCLE, &value, sizeof(value)); value = 4096; FS_set_para(handle, MAX_STAT_FIELD_NUM, &value, sizeof(value)); fs_handle = ALLOC(struct kni_field_stat_handle, 1); fs_handle->handle = handle; fs_handle->fields[KNI_FIELD_INTCP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_stm"); fs_handle->fields[KNI_FIELD_BYP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_stm"); fs_handle->fields[KNI_FIELD_BYP_STM_POLICY] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_policy"); fs_handle->fields[KNI_FIELD_BYP_STM_PME_NEW_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_pme_new_F"); fs_handle->fields[KNI_FIELD_BYP_STM_NO_TFE] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_no_tfe"); fs_handle->fields[KNI_FIELD_BYP_STM_ERR] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_stm_err"); fs_handle->fields[KNI_FIELD_BYP_STM_DUP_TFC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_dup_tfc"); fs_handle->fields[KNI_FIELD_STATE_UNKNOWN] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "state_unknow"); fs_handle->fields[KNI_FIELD_DUP_TFC_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "dup_tfc_stm"); //stream error fs_handle->fields[KNI_FIELD_NO_SYN] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "err_no_syn"); fs_handle->fields[KNI_FIELD_SINGLE_DIR] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "err_sig_dir"); fs_handle->fields[KNI_FIELD_PROTO_UNKNOWN] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "err_pro_unknow"); 
fs_handle->fields[KNI_FIELD_NO_SA] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "err_no_s/a"); fs_handle->fields[KNI_FIELD_ACTION_INVALID] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "err_act_invaid"); fs_handle->fields[KNI_FIELD_NO_DATA] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "err_no_data"); fs_handle->fields[KNI_FIELD_IPV4HDR_PARSE_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "err_v4_parse"); fs_handle->fields[KNI_FIELD_IPV6HDR_PARSE_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "err_v6_parse"); fs_handle->fields[KNI_FIELD_EXCEED_MTU] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "err_exced_mtu"); fs_handle->fields[KNI_FIELD_SENDTO_TFE_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "err_sdtfe_F"); fs_handle->fields[KNI_FIELD_STMERR_TUPLE2STM_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "errTup2stmAddF"); //others fs_handle->fields[KNI_FIELD_NULL_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "null_pkt"); fs_handle->fields[KNI_FIELD_IPV4_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ipv4_stm"); fs_handle->fields[KNI_FIELD_IPV6_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ipv6_stm"); fs_handle->fields[KNI_FIELD_SSL_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ssl_stm"); fs_handle->fields[KNI_FIELD_HTTP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "http_stm"); fs_handle->fields[KNI_FIELD_SENDLOG_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sendlog_S"); fs_handle->fields[KNI_FIELD_SENDLOG_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sendlog_F"); fs_handle->fields[KNI_FIELD_PME_NEW_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "pme_new"); fs_handle->fields[KNI_FIELD_PME_FREE] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "pme_free"); //intercept traffic stat fs_handle->fields[KNI_FIELD_KNI_INTCP_BYTES] = FS_register(handle, 
FS_STYLE_FIELD, FS_CALC_CURRENT, "kni_intcp_B"); fs_handle->fields[KNI_FIELD_TFE_INTCP_BYTES] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tfe_intcp_B"); fs_handle->fields[KNI_FIELD_KNI_INTCP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "kni_intcp_stm"); fs_handle->fields[KNI_FIELD_TFE_INTCP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tfe_intcp_stm"); //htable fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_add_S"); fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_add_F"); fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_del_S"); fs_handle->fields[KNI_FIELD_ID2PME_DEL_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_del_F"); fs_handle->fields[KNI_FIELD_TUPLE2STM_ADD_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tup2stm_add_S"); fs_handle->fields[KNI_FIELD_TUPLE2STM_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tup2stm_add_F"); fs_handle->fields[KNI_FIELD_TUPLE2STM_DEL_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tup2stm_del_S"); fs_handle->fields[KNI_FIELD_TUPLE2STM_DEL_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tup2stm_del_F"); fs_handle->fields[KNI_FIELD_TUPLE2STM_SEARCH_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tup2stm_srch_S"); fs_handle->fields[KNI_FIELD_TUPLE2STM_SEARCH_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tup2stm_srch_F"); fs_handle->fields[KNI_FIELD_SAPP_INJECT_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sapp_inject_S"); fs_handle->fields[KNI_FIELD_SAPP_INJECT_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sapp_inject_F"); fs_handle->fields[KNI_FIELD_BLOOM_SEARCH_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "bloom_srch_S"); 
fs_handle->fields[KNI_FIELD_BLOOM_SEARCH_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "bloom_srch_F"); fs_handle->fields[KNI_FIELD_BLOOM_ADD_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "bloom_add_S"); fs_handle->fields[KNI_FIELD_BLOOM_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "bloom_add_F"); for(int i = 0; i < g_kni_handle->marsio_handle->tfe_enabled_node_count; i++){ int tfe_id = g_kni_handle->marsio_handle->tfe_enabled_nodes[i].tfe_id; char tfe_status[KNI_SYMBOL_MAX] = ""; snprintf(tfe_status, sizeof(tfe_status), "tfe%d", tfe_id); fs_handle->fields[KNI_FIELD_TFE_STATUS_BASE + i] = FS_register(handle, FS_STYLE_STATUS, FS_CALC_CURRENT, tfe_status); } //table fs_handle->column_cnt = g_kni_handle->thread_count; char buff[KNI_PATH_MAX]; for(int i = 0; i < fs_handle->column_cnt; i++){ snprintf(buff, sizeof(buff), "tid%d", i); fs_handle->column_ids[i] = FS_register(handle, FS_STYLE_COLUMN, FS_CALC_SPEED, buff); } snprintf(buff, sizeof(buff), "bloom_cnt"); fs_handle->line_ids[0] = FS_register(handle, FS_STYLE_LINE, FS_CALC_SPEED, buff); fs_handle->handle = handle; FS_start(handle); return fs_handle; error_out: fs_destroy(fs_handle); return NULL; } extern "C" void kni_destroy(struct kni_handle *handle){ if(handle != NULL){ } FREE(&handle); handle = NULL; } //eliminate_type: 0:FIFO; 1:LRU //ret: 1: the item can be eliminated; 0: the item can't be eliminated static int traceid2pme_htable_expire_notify_cb(void *data, int eliminate_type){ struct pme_info *pmeinfo = (struct pme_info*)data; int can_destroy; if(pmeinfo->sapp_release == 1){ can_destroy = judge_stream_can_destroy(pmeinfo, CALLER_TFE); if(can_destroy == 1){ FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC], 0, FS_OP_ADD, 1); stream_destroy(pmeinfo, pmeinfo->do_log); return 1; } } return 0; } static void tuple2stream_htable_data_free_cb(void *data){ FREE(&data); } int dup_traffic_dabloom_init(const char *profile, void 
*logger){ const char *section = "dup_traffic"; MESA_load_profile_int_def(profile, section, "switch", &(g_kni_handle->dup_traffic_switch), 0); KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n switch: %d", g_kni_handle->dup_traffic_switch); if(g_kni_handle->dup_traffic_switch == 1){ unsigned int capacity = 0; char error_rate_str[KNI_SYMBOL_MAX]; double error_rate = 0.05; int expiry_time = 0; MESA_load_profile_int_def(profile, section, "action", &(g_kni_handle->dup_traffic_action), KNI_ACTION_BYPASS); MESA_load_profile_uint_def(profile, section, "capacity", &capacity, 1000000); MESA_load_profile_string_def(profile, section, "error_rate", error_rate_str, sizeof(error_rate_str), "0.05"); MESA_load_profile_int_def(profile, section, "expiry_time", &expiry_time, 30); KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n action: %d\n capacity: %d\n error_rate: %s\n expiry_time: %d", section, capacity, error_rate_str, expiry_time); error_rate = atof(error_rate_str); for(int i = 0; i < g_kni_handle->thread_count; i++){ struct expiry_dablooms_handle* dabloom_handle = expiry_dablooms_init(capacity, error_rate, expiry_time); if(dabloom_handle == NULL){ KNI_LOG_ERROR(logger, "Failed at expiry_dablooms_init, capacity = %d," "error_rate = %lf, expire_time = %d", capacity, error_rate, expiry_time); return -1; } g_kni_handle->threads_handle[i].dabloom_handle = dabloom_handle; } return 0; } return 0; } extern "C" int kni_init(){ char *kni_git_verison = (char*)KNI_GIT_VERSION; const char *profile = "./etc/kni/kni.conf"; const char *section = "global"; //init logger char log_path[KNI_PATH_MAX] = ""; int tfe_node_count = 0; char manage_eth[KNI_SYMBOL_MAX] = ""; struct kni_send_logger *send_logger = NULL; struct kni_field_stat_handle *fs_handle = NULL; int id = -1; void *local_logger = NULL; int log_level = -1; pthread_t thread_id = -1; struct thread_tfe_cmsg_receiver_args *cmsg_receiver_args; MESA_htable_handle traceid2pme_htable = NULL; struct tfe_mgr *_tfe_mgr = NULL; int ret = 
MESA_load_profile_string_nodef(profile, section, "log_path", log_path, sizeof(log_path)); if(ret < 0){ printf("MESA_prof_load: log_path not set, profile = %s, section = %s", profile, section); goto error_out; } ret = MESA_load_profile_int_nodef(profile, section, "log_level", &log_level); if(ret < 0){ printf("MESA_prof_load: log_level not set, profile = %s, section = %s", profile, section); goto error_out; } local_logger = MESA_create_runtime_log_handle(log_path, log_level); if (unlikely(local_logger == NULL)){ printf("Failed at create logger: %s", log_path); goto error_out; } //kni_git_log KNI_LOG_ERROR(local_logger, "----------kni version = %s-----------", kni_git_verison); ret = MESA_load_profile_int_nodef(profile, section, "tfe_node_count", &tfe_node_count); if(ret < 0){ KNI_LOG_ERROR(local_logger, "MESA_prof_load: tfe_node_count not set, profile = %s, section = %s", profile, section); goto error_out; } if(tfe_node_count > TFE_COUNT_MAX){ KNI_LOG_ERROR(local_logger, "tfe_node_count = %d, exceed the max_tfe_node_count %d", tfe_node_count, TFE_COUNT_MAX); goto error_out; } if(tfe_node_count <= 0){ KNI_LOG_ERROR(local_logger, "tfe_node_count = %d, <= 0", tfe_node_count); goto error_out; } ret = MESA_load_profile_string_nodef(profile, section, "manage_eth", manage_eth, sizeof(manage_eth)); if(ret < 0){ printf("MESA_prof_load: manage_eth not set, profile = %s, section = %s", profile, section); goto error_out; } KNI_LOG_ERROR(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_node_count: %d\n manage_eth: %s", section, log_path, log_level, tfe_node_count, manage_eth); g_kni_handle = ALLOC(struct kni_handle, 1); g_kni_handle->local_logger = local_logger; //init http_project id = http_project_init(); if(id < 0){ KNI_LOG_ERROR(local_logger, "Failed at init http project, ret = %d", id); goto error_out; } g_kni_handle->http_project_id = id; // get thread count g_kni_handle->thread_count = get_thread_count(); if(g_kni_handle->thread_count <= 0){ 
KNI_LOG_ERROR(local_logger, "Failed at get_thread_count, ret = %d"); goto error_out; } //init marsio g_kni_handle->marsio_handle = kni_marsio_init(profile, tfe_node_count); if(g_kni_handle->marsio_handle == NULL){ KNI_LOG_ERROR(local_logger, "Failed at init marsio"); goto error_out; } //init maat g_kni_handle->maat_handle = kni_maat_init(profile, local_logger, g_kni_handle->thread_count); if(g_kni_handle->maat_handle == NULL){ KNI_LOG_ERROR(local_logger, "Failed at init maat"); goto error_out; } //init_filedstat fs_handle = fs_init(profile); if(fs_handle == NULL){ KNI_LOG_ERROR(local_logger, "Failed at init field_stat"); goto error_out; } g_kni_fs_handle = fs_handle; //init local_ipv4 ret = kni_ipv4_addr_get_by_eth(manage_eth, &(g_kni_handle->local_ipv4)); if(ret < 0){ KNI_LOG_ERROR(local_logger, "Failed at get bind ipv4 addr, eth = %s", manage_eth); goto error_out; } //init kni_send_logger send_logger = kni_send_logger_init(profile, local_logger); if(send_logger == NULL){ KNI_LOG_ERROR(local_logger, "Failed at init kni_send_logger", manage_eth); goto error_out; } g_kni_handle->send_logger = send_logger; //init traceid2pme_htable traceid2pme_htable = kni_create_htable(profile, "traceid2pme_htable", NULL, (void*)traceid2pme_htable_expire_notify_cb, local_logger); if(traceid2pme_htable == NULL){ KNI_LOG_ERROR(local_logger, "Failed at create traceid2pme_htable"); goto error_out; } g_kni_handle->traceid2pme_htable = traceid2pme_htable; //init tuple2stream_htable g_kni_handle->threads_handle = ALLOC(struct per_thread_handle, g_kni_handle->thread_count); for(int i = 0; i < g_kni_handle->thread_count; i++){ MESA_htable_handle tuple2stream_htable = kni_create_htable(profile, "tuple2stream_htable", (void*)tuple2stream_htable_data_free_cb, NULL, local_logger); if(tuple2stream_htable == NULL){ KNI_LOG_ERROR(local_logger, "Failed at kni_create_htable, table = tuple2stream_htable"); goto error_out; } g_kni_handle->threads_handle[i].tuple2stream_htable = tuple2stream_htable; } 
//init dabloom_handle ret = dup_traffic_dabloom_init(profile, local_logger); if(ret < 0){ goto error_out; } //init tfe_mgr _tfe_mgr = tfe_mgr_init(tfe_node_count, profile, local_logger); if(_tfe_mgr == NULL){ KNI_LOG_ERROR(local_logger, "Failed at init tfe_mgr"); goto error_out; } g_kni_handle->_tfe_mgr = _tfe_mgr; //create thread_tfe_cmsg_receiver cmsg_receiver_args = ALLOC(struct thread_tfe_cmsg_receiver_args, 1); cmsg_receiver_args->logger = local_logger; strncpy(cmsg_receiver_args->profile, profile, strnlen(profile, sizeof(cmsg_receiver_args->profile) - 1)); ret = pthread_create(&thread_id, NULL, thread_tfe_cmsg_receiver, (void *)cmsg_receiver_args); if(unlikely(ret != 0)){ KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func = thread_tfe_cmsg_receiver, errno = %d, errmsg = %s", errno, strerror(errno)); FREE(&cmsg_receiver_args); goto error_out; } return 0; error_out: kni_destroy(g_kni_handle); return -1; }