#include "kni_utils.h"
#include "ssl_utils.h"
#include "marsio.h"
#include "kni_maat.h"
#include "MESA/http.h"
#include "kni_cmsg.h"
#include "uuid/uuid.h"
#include "cjson/cJSON.h"
#include "kni_send_logger.h"

extern int g_iThreadNum;

struct kni_handle *g_kni_handle = NULL;
struct kni_field_stat_handle *g_kni_fs_handle = NULL;

#define HTTP_PROJECT_NAME "kni_http_tag"
#define BURST_MAX 1
#define STREAM_TRACE_ID_LEN 37
#define TFE_COUNT_MAX 16

/* TODO:
    1. con_duration_ms: how to calculate
*/

enum kni_protocol{
    KNI_PROTOCOL_UNKNOWN = 0,
    KNI_PROTOCOL_SSL,
    KNI_PROTOCOL_HTTP,
};

//host captured by kni_http_entry and handed to kni_tcpall_entry through the stream project
struct http_project{
    int host_len;
    char host[KNI_DOMAIN_MAX];
};

//per-stream state, created in the pending state and released by pme_info_destroy
struct pme_info{
    int protocol;
    int policy_id;
    int maat_hit;
    enum kni_action action;
    int service;
    struct kni_tcpopt_info *client_tcpopt;
    struct kni_tcpopt_info *server_tcpopt;
    int tfe_id;
    void *logger;
    char stream_trace_id[STREAM_TRACE_ID_LEN];
    char host[KNI_DOMAIN_MAX]; //http only
    char sni[KNI_DOMAIN_MAX];  //ssl only
    //tfe_release = 1: tfe don't need pmeinfo
    int tfe_release;
    int sapp_release;
    //kafka log
    struct streaminfo *stream;
    time_t start_time;
    time_t end_time;
    uint64_t con_duration_ms;
    //from tfe, kafka log
    uint64_t intercept_state;
    uint64_t pinningst; //default 0
    uint64_t ssl_server_side_latency;
    uint64_t ssl_client_side_latency;
    char ssl_server_side_version[KNI_SYMBOL_MAX];
    char ssl_client_side_version[KNI_SYMBOL_MAX];
    uint64_t ssl_cert_verify;
    char ssl_error[KNI_STRING_MAX];
};

//scratch buffer for the rewritten first packet (original packet + restore option + cmsg)
struct wrapped_packet{
    char data[KNI_MTU];
};

//4-byte tcp option inserted into the first packet sent to tfe
struct tcp_option_restore{
    uint8_t kind;
    uint8_t len;
    uint16_t offset;
};

struct tfe_instance{
    struct mr_vdev *dev_eth_handler;
    struct mr_sendpath *dev_eth_sendpath;
    char mac_addr[6];
};

struct kni_marsio_handle{
    struct mr_instance *instance;
    struct tfe_instance *tfe_instance_list[TFE_COUNT_MAX];
    struct mr_vdev *dev_vxlan_handler;
    struct mr_sendpath *dev_vxlan_sendpath;
    char src_mac_addr[6];
};

struct protocol_identify_result{
    int protocol;
    char domain[KNI_DOMAIN_MAX];
    int domain_len;
};

struct thread_tfe_data_receiver_args{
    void *logger;
    struct kni_marsio_handle *marsio_handle;
    int tfe_id;
};

struct thread_tfe_cmsg_receiver_args{
    void *logger;
    char profile[KNI_SYMBOL_MAX];
};

//pointers and lengths into one ipv4/tcp packet handed over by sapp
struct pkt_info{
    struct iphdr *iphdr;
    int iphdr_len;
    int ip_totlen;
    struct tcphdr *tcphdr;
    int tcphdr_len;
    char *data;
    int data_len;
};

struct kni_handle{
    int http_project_id;
    struct kni_marsio_handle *marsio_handle;
    struct kni_maat_handle *maat_handle;
    struct kni_send_logger *send_logger;
    MESA_htable_handle traceid2pme_htable;
    int tfe_count;
    uint32_t local_ipv4;
    void *local_logger;
};

struct traceid2pme_search_cb_args{
    struct kni_cmsg *cmsg;
    void *logger;
};

/*
 * Allocate the per-stream state for a new stream.
 * Picks a tfe by thread_seq (-1 when no tfe is configured), generates the
 * stream trace id (uuid text, 36 chars + NUL) and records the start time.
 * NOTE(review): every other field is left as ALLOC returned it; the code below
 * relies on that being zero (tfe_release, sapp_release, tcpopt pointers) - confirm
 * ALLOC zero-fills.
 */
static struct pme_info* pme_info_new(const struct streaminfo *stream, int thread_seq, void *logger){
    struct pme_info* pmeinfo = ALLOC(struct pme_info, 1);
    pmeinfo->tfe_id = g_kni_handle->tfe_count > 0 ? thread_seq % g_kni_handle->tfe_count : -1;
    uuid_t uu;
    uuid_generate_random(uu);
    uuid_unparse(uu, pmeinfo->stream_trace_id);
    pmeinfo->stream = (struct streaminfo*)stream;
    pmeinfo->start_time = time(NULL);
    pmeinfo->logger = logger;
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_STM], 0, FS_OP_ADD, 1);
    return pmeinfo;
}

/*
 * Build the json record of one stream and hand it to the send logger.
 * Returns 0 on success, -1 when the json cannot be printed or the send fails.
 * NOTE(review): this reads pmeinfo->stream; verify the streaminfo is still
 * valid on every path that reaches pme_info_destroy.
 */
static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){
    //create cjson
    cJSON *log_obj = cJSON_CreateObject();
    //stream_trace_id
    cJSON_AddStringToObject(log_obj, "stream_trace_id", pmeinfo->stream_trace_id);
    //policy_id
    cJSON_AddNumberToObject(log_obj, "policy_id", pmeinfo->policy_id);
    //action
    cJSON_AddNumberToObject(log_obj, "action", pmeinfo->action);
    //service
    cJSON_AddNumberToObject(log_obj, "service", pmeinfo->service);
    //start_time
    cJSON_AddNumberToObject(log_obj, "start_time", pmeinfo->start_time);
    //end_time
    cJSON_AddNumberToObject(log_obj, "end_time", pmeinfo->end_time);
    //con_duration_ms
    cJSON_AddNumberToObject(log_obj, "con_duration_ms", (pmeinfo->end_time - pmeinfo->start_time) * 1000);
    //stream_info: addr_type, trans_proto, client_ip, client_port, server_ip, server_port
    const struct layer_addr *addr = &(pmeinfo->stream->addr);
    char client_ip_str[INET6_ADDRSTRLEN] = "";
    char server_ip_str[INET6_ADDRSTRLEN] = "";
    switch(addr->addrtype){
        case ADDR_TYPE_IPV4:
            cJSON_AddNumberToObject(log_obj, "addr_type", 4);
            inet_ntop(AF_INET, &addr->tuple4_v4->saddr, client_ip_str, sizeof(client_ip_str));
            inet_ntop(AF_INET, &addr->tuple4_v4->daddr, server_ip_str, sizeof(server_ip_str));
            cJSON_AddStringToObject(log_obj, "client_ip", client_ip_str);
            cJSON_AddStringToObject(log_obj, "server_ip", server_ip_str);
            cJSON_AddNumberToObject(log_obj, "client_port", ntohs(addr->tuple4_v4->source));
            cJSON_AddNumberToObject(log_obj, "server_port", ntohs(addr->tuple4_v4->dest));
            cJSON_AddStringToObject(log_obj, "trans_proto", "IPv4_TCP");
            break;
        case ADDR_TYPE_IPV6:
            cJSON_AddNumberToObject(log_obj, "addr_type", 6);
            inet_ntop(AF_INET6, &addr->tuple4_v6->saddr, client_ip_str, sizeof(client_ip_str));
            inet_ntop(AF_INET6, &addr->tuple4_v6->daddr, server_ip_str, sizeof(server_ip_str));
            cJSON_AddStringToObject(log_obj, "client_ip", client_ip_str);
            cJSON_AddStringToObject(log_obj, "server_ip", server_ip_str);
            cJSON_AddNumberToObject(log_obj, "client_port", ntohs(addr->tuple4_v6->source));
            cJSON_AddNumberToObject(log_obj, "server_port", ntohs(addr->tuple4_v6->dest));
            cJSON_AddStringToObject(log_obj, "trans_proto", "IPv6_TCP");
            break;
        default:
            break;
    }
    //entrance_id: 0
    cJSON_AddNumberToObject(log_obj, "entrance_id", 0);
    //device_id: 0
    cJSON_AddNumberToObject(log_obj, "device_id", 0);
    //link_id: 0
    cJSON_AddNumberToObject(log_obj, "link_id", 0);
    //isp: null
    cJSON_AddStringToObject(log_obj, "isp", "");
    //encap_type: from sapp, fill in 0 for now
    cJSON_AddNumberToObject(log_obj, "encap_type", 0);
    //pinning state: from tfe
    cJSON_AddNumberToObject(log_obj, "pinningst", pmeinfo->pinningst);
    //intercept state: from tfe
    cJSON_AddNumberToObject(log_obj, "intercept_state", pmeinfo->intercept_state);
    //ssl upstream latency: from tfe
    cJSON_AddNumberToObject(log_obj, "ssl_server_side_latency", pmeinfo->ssl_server_side_latency);
    //ssl downstream latency: from tfe
    cJSON_AddNumberToObject(log_obj, "ssl_client_side_latency", pmeinfo->ssl_client_side_latency);
    //ssl upstream version: from tfe
    cJSON_AddStringToObject(log_obj, "ssl_server_side_version", pmeinfo->ssl_server_side_version);
    //ssl downstream version: from tfe
    cJSON_AddStringToObject(log_obj, "ssl_client_side_version", pmeinfo->ssl_client_side_version);
    //ssl cert verify
    cJSON_AddNumberToObject(log_obj, "ssl_cert_verify", pmeinfo->ssl_cert_verify);
    //direction: 0
    cJSON_AddNumberToObject(log_obj, "direction", 0);
    //stream_dir: from sapp
    cJSON_AddNumberToObject(log_obj, "stream_dir", pmeinfo->stream->dir);
    //cap_ip: kni ip; start from "" so a failed inet_ntop never leaves the buffer uninitialised
    char local_ipv4_str[INET6_ADDRSTRLEN] = "";
    inet_ntop(AF_INET, &(g_kni_handle->local_ipv4), local_ipv4_str, sizeof(local_ipv4_str));
    cJSON_AddStringToObject(log_obj, "cap_ip", local_ipv4_str);
    //addr_list
    cJSON_AddStringToObject(log_obj, "addr_list", "");
    //host: http_only
    cJSON_AddStringToObject(log_obj, "host", pmeinfo->host);
    //sni: ssl only
    cJSON_AddStringToObject(log_obj, "sni", pmeinfo->sni);
    //c2s_pkt_num
    cJSON_AddNumberToObject(log_obj, "c2s_pkt_num", pmeinfo->stream->ptcpdetail->serverpktnum);
    //s2c_pkt_num
    cJSON_AddNumberToObject(log_obj, "s2c_pkt_num", pmeinfo->stream->ptcpdetail->clientpktnum);
    //c2s_byte_num
    cJSON_AddNumberToObject(log_obj, "c2s_byte_num", pmeinfo->stream->ptcpdetail->serverbytes);
    //s2c_byte_num
    cJSON_AddNumberToObject(log_obj, "s2c_byte_num", pmeinfo->stream->ptcpdetail->clientbytes);

    //the printed text is owned by us; the object is no longer needed once printed
    char *log_msg = cJSON_PrintUnformatted(log_obj);
    cJSON_Delete(log_obj);
    if(log_msg == NULL){
        KNI_LOG_ERROR(local_logger, "Failed at cJSON_Print");
        return -1;
    }
    KNI_LOG_DEBUG(local_logger, "log_msg is %s\n", log_msg);
    int ret = kni_send_logger_sendlog(g_kni_handle->send_logger, log_msg, strlen(log_msg));
    cJSON_free(log_msg);
    if(ret < 0){
        KNI_LOG_ERROR(local_logger, "Failed at kni_send_logger_sendlog, ret is %d", ret);
        return -1;
    }
    return 0;
}

/*
 * Release the per-stream state once BOTH sides have let go of it
 * (sapp_release == 1 and tfe_release == 1); otherwise only log and keep it.
 * The kafka log is sent right before the memory is freed.
 * A NULL pmeinfo is ignored.
 */
static void pme_info_destroy(struct pme_info *pmeinfo){
    //check before touching any member (the logger lives inside pmeinfo)
    if(pmeinfo == NULL){
        return;
    }
    void *logger = pmeinfo->logger;
    if(pmeinfo->sapp_release != 1 || pmeinfo->tfe_release != 1){
        KNI_LOG_DEBUG(logger, "can not free pmeinfo, sapp_release is %d, tfe_release is %d", pmeinfo->sapp_release, pmeinfo->tfe_release);
        return;
    }
    int ret = sendlog_to_kafka(pmeinfo, logger);
    if(ret < 0){
        KNI_LOG_ERROR(logger, "Failed at sendlog to kafka");
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_FAIL], 0, FS_OP_ADD, 1);
    }
    else{
        KNI_LOG_INFO(logger, "Succeed sendlog to kafka");
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_SUCC], 0, FS_OP_ADD, 1);
    }
    if(pmeinfo->client_tcpopt != NULL){
        FREE(&(pmeinfo->client_tcpopt));
    }
    if(pmeinfo->server_tcpopt != NULL){
        FREE(&(pmeinfo->server_tcpopt));
    }
    FREE(&pmeinfo);
}

/*
 * Classify the first payload packet of a stream.
 * HTTP is recognised through the host stored by kni_http_entry in the stream
 * project; otherwise the payload is parsed as a TLS client hello.
 * On return result->protocol is set and, for HTTP/SSL, result->domain holds a
 * NUL-terminated (possibly truncated) host/sni with domain_len equal to the
 * number of bytes actually stored. Always returns 0.
 */
static int protocol_identify(const struct streaminfo* stream, char *buf, int len, struct protocol_identify_result *result){
    //http
    struct http_project* project = (struct http_project*)project_req_get_struct(stream, g_kni_handle->http_project_id);
    if(project != NULL){
        size_t host_len = strnlen(project->host, sizeof(result->domain) - 1);
        result->protocol = KNI_PROTOCOL_HTTP;
        memcpy(result->domain, project->host, host_len);
        result->domain[host_len] = '\0';
        //never report more bytes than were copied into domain
        result->domain_len = (int)host_len;
        return 0;
    }
    //ssl
    enum chello_parse_result chello_status = CHELLO_PARSE_INVALID_FORMAT;
    struct ssl_chello *chello = ssl_chello_parse((const unsigned char*)buf, len, &chello_status);
    if(chello_status == CHELLO_PARSE_SUCCESS){
        result->protocol = KNI_PROTOCOL_SSL;
        result->domain_len = 0;
        if(chello != NULL && chello->sni != NULL){
            size_t sni_len = strnlen(chello->sni, sizeof(result->domain) - 1);
            memcpy(result->domain, chello->sni, sni_len);
            result->domain[sni_len] = '\0';
            result->domain_len = (int)sni_len;
        }
    }
    else{
        result->protocol = KNI_PROTOCOL_UNKNOWN;
    }
    ssl_chello_free(chello);
    return 0;
}

//kni_cmsg_set plus an error log on failure; returns what kni_cmsg_set returned
static int wrapped_kni_cmsg_set(struct kni_cmsg *cmsg, uint16_t type, const unsigned char *value, uint16_t size){
    void *logger = g_kni_handle->local_logger;
    int ret = kni_cmsg_set(cmsg, type, value, size);
    if(ret < 0){
        KNI_LOG_ERROR(logger, "Failed set cmsg, type is %d", type);
    }
    return ret;
}

/*
 * Serialize the control message that accompanies the first packet sent to tfe:
 * tcp restore info (seq/ack, mss, wscale, sack, timestamp of both sides),
 * protocol, policy id and stream trace id.
 * Requires pmeinfo->client_tcpopt and pmeinfo->server_tcpopt to be non-NULL.
 * Returns a buffer the caller must FREE and stores its length in *len;
 * returns NULL on any failure.
 */
static unsigned char* kni_cmsg_serialize_header_new(struct pme_info *pmeinfo, struct pkt_info *pktinfo, uint16_t *len){
    //all locals are declared before the first goto (c++ forbids jumping over initialisations)
    void *logger = g_kni_handle->local_logger;
    uint16_t bufflen = 0, serialize_len = 0;
    unsigned char *buff = NULL;
    uint8_t protocol_type = pmeinfo->protocol == KNI_PROTOCOL_SSL ? 0x1 : 0x0;
    struct kni_cmsg *cmsg = kni_cmsg_init();
    int policy_id = -1;
    char *trace_id = NULL;
    //seq/ack are forwarded exactly as they appear in the tcp header
    uint32_t seq = pktinfo->tcphdr->seq;
    uint32_t ack = pktinfo->tcphdr->ack_seq;
    uint16_t client_mss = htons(pmeinfo->client_tcpopt->mss);
    uint16_t server_mss = htons(pmeinfo->server_tcpopt->mss);
    int ret = -1;
    if(cmsg == NULL){
        KNI_LOG_ERROR(logger, "Failed at kni_cmsg_init");
        return NULL;
    }
    //seq
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SEQ, (const unsigned char*)&seq, 4);
    if(ret < 0) goto error_out;
    //ack
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_ACK, (const unsigned char*)&ack, 4);
    if(ret < 0) goto error_out;
    //client mss
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_CLIENT, (const unsigned char*)&client_mss, 2);
    if(ret < 0) goto error_out;
    //server mss
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_SERVER, (const unsigned char*)&server_mss, 2);
    if(ret < 0) goto error_out;
    //client wscale
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->wscale), 1);
    if(ret < 0) goto error_out;
    //server wscale
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->wscale), 1);
    if(ret < 0) goto error_out;
    //client sack
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->sack), 1);
    if(ret < 0) goto error_out;
    //server sack
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->sack), 1);
    if(ret < 0) goto error_out;
    //client timestamp
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->ts), 1);
    if(ret < 0) goto error_out;
    //server timestamp
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->ts), 1);
    if(ret < 0) goto error_out;
    //protocol
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (const unsigned char*)&protocol_type, 1);
    if(ret < 0) goto error_out;
    //maat policy id
    policy_id = pmeinfo->policy_id;
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_POLICY_ID, (const unsigned char*)&policy_id, sizeof(policy_id));
    if(ret < 0) goto error_out;
    //stream trace id
    trace_id = pmeinfo->stream_trace_id;
    ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_STREAM_TRACE_ID, (const unsigned char*)trace_id,
            strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id)));
    if(ret < 0) goto error_out;
    bufflen = kni_cmsg_serialize_size_get(cmsg);
    buff = (unsigned char*)ALLOC(char, bufflen);
    serialize_len = 0;
    ret = kni_cmsg_serialize(cmsg, buff, bufflen, &serialize_len);
    if(ret < 0){
        KNI_LOG_ERROR(logger, "Failed at serialize cmsg, ret is %d", ret);
        goto error_out;
    }
    *len = serialize_len;
    kni_cmsg_destroy(cmsg);
    return buff;

error_out:
    //buff is only non-NULL when serialization itself failed; release it instead of leaking
    if(buff != NULL){
        FREE(&buff);
    }
    kni_cmsg_destroy(cmsg);
    return NULL;
}

/*
 * Build the first packet sent to tfe:
 *   ip header | 20-byte tcp header | restore option (kind 88, len 4, payload length)
 *   | original tcp options | payload | serialized cmsg
 * with data offset, total length and both checksums recomputed.
 * Returns a buffer the caller must FREE and stores the packet length in *len.
 * Returns NULL when the tcp header cannot take 4 more option bytes, when the
 * result would not fit in struct wrapped_packet, or when the cmsg cannot be built.
 */
static char* add_cmsg_to_packet(struct pme_info *pmeinfo, struct pkt_info *pktinfo, int *len){
    void *logger = g_kni_handle->local_logger;
    const int tcp_fixed_len = 20;  //tcp header without options
    const int tcp_max_len = 60;    //data offset is a 4-bit count of 32-bit words
    const int restore_opt_len = 4;
    const int pkt_capacity = (int)sizeof(struct wrapped_packet);

    //the new option must fit, otherwise doff + 1 would wrap the 4-bit field
    if(pktinfo->tcphdr_len < tcp_fixed_len || pktinfo->tcphdr_len + restore_opt_len > tcp_max_len || pktinfo->data_len < 0){
        KNI_LOG_ERROR(logger, "Failed at add cmsg to packet: tcphdr_len is %d, data_len is %d", pktinfo->tcphdr_len, pktinfo->data_len);
        return NULL;
    }
    int base_len = pktinfo->iphdr_len + pktinfo->tcphdr_len + restore_opt_len + pktinfo->data_len;

    //kni_cmsg_serialize_header
    uint16_t header_len = 0;
    unsigned char *header = kni_cmsg_serialize_header_new(pmeinfo, pktinfo, &header_len);
    if(header == NULL){
        KNI_LOG_ERROR(logger, "Failed at add cmsg to packet: can not serialize cmsg header");
        return NULL;
    }
    if(base_len + (int)header_len > pkt_capacity){
        KNI_LOG_ERROR(logger, "Failed at add cmsg to packet: need %d bytes, buffer is %d bytes", base_len + (int)header_len, pkt_capacity);
        FREE(&header);
        return NULL;
    }

    char *new_pkt = (char*)ALLOC(struct wrapped_packet, 1);
    struct iphdr *iphdr = (struct iphdr*)new_pkt;
    int offset = 0;
    //iphdr
    memcpy(new_pkt, (void*)pktinfo->iphdr, pktinfo->iphdr_len);
    offset += pktinfo->iphdr_len;
    //tcphdr
    struct tcphdr *tcphdr = (struct tcphdr*)(new_pkt + offset);
    memcpy(new_pkt + offset, (void*)pktinfo->tcphdr, tcp_fixed_len);
    offset += tcp_fixed_len;
    tcphdr->doff = pktinfo->tcphdr->doff + 1;
    //tcp option: kind 88, len 4, control_info_len
    struct tcp_option_restore opt;
    memset(&opt, 0, sizeof(opt));
    opt.kind = 88;
    opt.len = 4;
    opt.offset = htons(pktinfo->data_len);
    memcpy(new_pkt + offset, (void*)&opt, restore_opt_len);
    offset += restore_opt_len;
    //original tcp options
    memcpy(new_pkt + offset, (void*)((char*)pktinfo->tcphdr + tcp_fixed_len), pktinfo->tcphdr_len - tcp_fixed_len);
    offset += pktinfo->tcphdr_len - tcp_fixed_len;
    //data
    memcpy(new_pkt + offset, (void*)pktinfo->data, pktinfo->data_len);
    offset += pktinfo->data_len;
    //cmsg
    memcpy(new_pkt + offset, (void*)header, header_len);
    offset += header_len;
    FREE(&header);
    //iphdr: tot_len
    iphdr->tot_len = htons(offset);
    //must set check = 0
    iphdr->check = 0;
    iphdr->check = kni_ip_checksum((void*)iphdr, pktinfo->iphdr_len);
    //tcphdr: checksum
    tcphdr->check = 0;
    tcphdr->check = kni_tcp_checksum((void*)tcphdr, offset - pktinfo->iphdr_len, iphdr->saddr, iphdr->daddr);
    *len = offset;
    return new_pkt;
}

/*
 * Send one ip packet to tfe number tfe_id, prefixed with a 14-byte ethernet
 * header (dst = tfe mac, src = local mac, ethertype 0x0800).
 * Returns 0 when the packet was handed to marsio, -1 when tfe_id does not name
 * a configured tfe or no marsio buffer could be allocated.
 */
static int send_to_tfe(struct kni_marsio_handle *handle, char *raw_data, int raw_len, int thread_seq, int tfe_id){
    void *logger = g_kni_handle->local_logger;
    //tfe_id is -1 when no tfe is configured (see pme_info_new)
    if(tfe_id < 0 || tfe_id >= TFE_COUNT_MAX || handle->tfe_instance_list[tfe_id] == NULL){
        KNI_LOG_ERROR(logger, "Failed at send to tfe, invalid tfe_id %d, thread_seq is %d", tfe_id, thread_seq);
        return -1;
    }
    struct tfe_instance *tfe = handle->tfe_instance_list[tfe_id];
    marsio_buff_t *tx_buffs[BURST_MAX];
    unsigned int nr_send = 1;
    int alloc_ret = marsio_buff_malloc_device(tfe->dev_eth_handler, tx_buffs, nr_send, 0, thread_seq);
    if(alloc_ret < 0){
        KNI_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret is %d, thread_seq is %d", alloc_ret, thread_seq);
        return -1;
    }
    char *dst_data = marsio_buff_append(tx_buffs[0], raw_len + 14);
    //ethernet_header[14]
    memcpy(dst_data, tfe->mac_addr, 6);
    memcpy(dst_data + 6, handle->src_mac_addr, 6);
    dst_data[12] = 0x08;
    dst_data[13] = 0x00;
    memcpy((char*)dst_data + 14, raw_data, raw_len);
    marsio_send_burst(tfe->dev_eth_sendpath, thread_seq, tx_buffs, nr_send);
    return 0;
}

/*
 * First callback of a stream. A stream that does not start with a syn is
 * bypassed and dropped; otherwise the client tcp options are remembered for a
 * later tcp restore and the packet is forwarded.
 */
static char pending_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo){
    void *logger = g_kni_handle->local_logger;
    if(!pktinfo->tcphdr->syn){
        //pending_opstate not syn, bypass and dropme
        KNI_LOG_ERROR(logger, "pending opstate: not syn");
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_SYN_EXP], 0, FS_OP_ADD, 1);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
        pmeinfo->tfe_release = 1;
        return APP_STATE_FAWPKT | APP_STATE_DROPME;
    }
    pmeinfo->client_tcpopt = kni_get_tcpopt(pktinfo->tcphdr, pktinfo->tcphdr_len);
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
    return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}

/*
 * Per-packet handler while the stream is open.
 * Once an action is decided, intercepted packets go to tfe and bypassed ones
 * are forwarded. Before that: the syn/ack provides the server tcp options,
 * empty packets are forwarded, and the first payload packet is used to
 * identify the protocol, scan the policy and - on intercept - send the
 * rewritten first packet (with cmsg) to tfe.
 * NOTE(review): the already-decided bypass path sets tfe_release = 1 while
 * returning GIVEME, so the following packet hits the tfe_release check and is
 * dropped (DROPPKT | DROPME). Confirm that this is intended.
 */
static char data_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, int thread_seq){
    //pmeinfo->tfe_release = 1: intercept, tfe end first. so droppkt and dropme
    if(pmeinfo->tfe_release == 1){
        return APP_STATE_DROPPKT | APP_STATE_DROPME;
    }
    void *logger = g_kni_handle->local_logger;
    char *raw_pkt = (char*)pktinfo->iphdr;
    int len = pktinfo->ip_totlen;
    char stream_addr[KNI_SYMBOL_MAX] = "";
    int ret;
    kni_stream_addr_trans((struct ipaddr*)(&stream->addr), stream_addr, sizeof(stream_addr));
    //pmeinfo->action has only 3 value: KNI_ACTION_NONE, KNI_ACTION_INTERCEPT, KNI_ACTION_BYPASS
    switch(pmeinfo->action){
        case KNI_ACTION_NONE:
            break;
        case KNI_ACTION_INTERCEPT:
            ret = send_to_tfe(g_kni_handle->marsio_handle, raw_pkt, len, thread_seq, pmeinfo->tfe_id);
            if(ret < 0){
                KNI_LOG_ERROR(logger, "Failed at send continue packet to tfe%d, stream_addr is %s", pmeinfo->tfe_id, stream_addr);
            }
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1);
            return APP_STATE_DROPPKT | APP_STATE_GIVEME;
        case KNI_ACTION_BYPASS:
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
            pmeinfo->tfe_release = 1;
            return APP_STATE_FAWPKT | APP_STATE_GIVEME;
        default:
            break;
    }
    // syn/ack
    if(pktinfo->tcphdr->syn && pktinfo->tcphdr->ack){
        pmeinfo->server_tcpopt = kni_get_tcpopt(pktinfo->tcphdr, pktinfo->tcphdr_len);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
        return APP_STATE_FAWPKT | APP_STATE_GIVEME;
    }
    //no payload yet, nothing to identify
    if(pktinfo->data_len <= 0){
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
        return APP_STATE_FAWPKT | APP_STATE_GIVEME;
    }
    //not double dir, bypass and dropme
    if(stream->dir != DIR_DOUBLE){
        KNI_LOG_INFO(logger, "dir is %d, bypass, stream addr is %s", stream->dir, stream_addr);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
        pmeinfo->tfe_release = 1;
        return APP_STATE_FAWPKT | APP_STATE_DROPME;
    }
    struct protocol_identify_result protocol_identify_res;
    memset(&protocol_identify_res, 0, sizeof(struct protocol_identify_result));
    protocol_identify(stream, pktinfo->data, pktinfo->data_len, &protocol_identify_res);
    pmeinfo->protocol = protocol_identify_res.protocol;
    switch(pmeinfo->protocol){
        //can not identify protocol from first data packet, bypass and dropme
        case KNI_PROTOCOL_UNKNOWN:
            //only one %s in the format: pass exactly one string argument
            KNI_LOG_INFO(logger, "Failed at protocol_identify, bypass and dropme, stream addr is %s\n", stream_addr);
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_UNKNOWN_STM], 0, FS_OP_ADD, 1);
            pmeinfo->tfe_release = 1;
            return APP_STATE_FAWPKT | APP_STATE_DROPME;
        case KNI_PROTOCOL_SSL:
            strncpy(pmeinfo->sni, protocol_identify_res.domain, strnlen(protocol_identify_res.domain, sizeof(pmeinfo->sni) - 1));
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SSL_STM], 0, FS_OP_ADD, 1);
            break;
        case KNI_PROTOCOL_HTTP:
            strncpy(pmeinfo->host, protocol_identify_res.domain, strnlen(protocol_identify_res.domain, sizeof(pmeinfo->host) - 1));
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_HTTP_STM], 0, FS_OP_ADD, 1);
            break;
        default:
            break;
    }
    pmeinfo->action = intercept_policy_scan(g_kni_handle->maat_handle, (struct ipaddr*)(&stream->addr),
            protocol_identify_res.domain, protocol_identify_res.domain_len,
            thread_seq, &(pmeinfo->policy_id), &(pmeinfo->maat_hit));
    //policy scan log
    char action_str[KNI_SYMBOL_MAX] = "";
    kni_maat_action_trans(pmeinfo->action, action_str);
    KNI_LOG_DEBUG(logger, "intercept_policy_scan: %s, %s, policy_id = %d, action = %d(%s), maat_hit = %d",
            stream_addr, protocol_identify_res.domain, pmeinfo->policy_id, pmeinfo->action, action_str, pmeinfo->maat_hit);
    //receive client hello, but no syn/ack, bypass and dropme
    if(pmeinfo->client_tcpopt == NULL || pmeinfo->server_tcpopt == NULL){
        KNI_LOG_ERROR(logger, "Failed at intercept, %s, %s", pmeinfo->client_tcpopt == NULL ? "no syn" : "",
                pmeinfo->server_tcpopt == NULL ? "no syn/ack" : "");
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_SA_EXP], 0, FS_OP_ADD, 1);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
        pmeinfo->tfe_release = 1;
        return APP_STATE_FAWPKT | APP_STATE_DROPME;
    }
    switch(pmeinfo->action){
        case KNI_ACTION_BYPASS:
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
            return APP_STATE_FAWPKT | APP_STATE_GIVEME;
        case KNI_ACTION_INTERCEPT:
        {
            //action = KNI_ACTION_INTERCEPT, sendto tfe
            char *first_pkt = add_cmsg_to_packet(pmeinfo, pktinfo, &len);
            if(first_pkt == NULL){
                //tfe can not restore the connection without the first packet: bypass and dropme
                KNI_LOG_ERROR(logger, "Failed at build first packet for tfe%d, bypass and dropme, stream addr is %s", pmeinfo->tfe_id, stream_addr);
                FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
                FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
                pmeinfo->tfe_release = 1;
                return APP_STATE_FAWPKT | APP_STATE_DROPME;
            }
            ret = send_to_tfe(g_kni_handle->marsio_handle, first_pkt, len, thread_seq, pmeinfo->tfe_id);
            if(ret < 0){
                KNI_LOG_ERROR(logger, "Failed at send first packet to tfe%d, stream addr is %s", pmeinfo->tfe_id, stream_addr);
            }
            FREE(&first_pkt);
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1);
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_STM], 0, FS_OP_ADD, 1);
            return APP_STATE_DROPPKT | APP_STATE_GIVEME;
        }
        default:
            //action != intercept && action != bypass,bypass and dropme
            KNI_LOG_ERROR(logger, "Action %d(%s) is invalid, bypass(dropme): policy_id is %d, stream addr is %s, domain is %s",
                    pmeinfo->action, action_str, pmeinfo->policy_id, stream_addr, protocol_identify_res.domain);
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
            pmeinfo->tfe_release = 1;
            return APP_STATE_FAWPKT | APP_STATE_DROPME;
    }
}

/*
 * Last callback of a stream. For an intercepted stream the closing packet is
 * also sent to tfe; every other stream just forwards it. Always asks sapp to
 * drop the plugin from the stream (DROPME).
 * NOTE(review): when tfe_release is already 1 the packet is dropped, which
 * also covers streams that were bypassed in data_opstate - confirm.
 */
static char close_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, int thread_seq){
    //pmeinfo->tfe_release = 1: intercept, tfe end first. so droppkt and dropme
    if(pmeinfo->tfe_release == 1){
        return APP_STATE_DROPPKT | APP_STATE_DROPME;
    }
    //close: also need to send to tfe
    pmeinfo->end_time = time(NULL);
    void *logger = g_kni_handle->local_logger;
    char *raw_pkt = (char*)pktinfo->iphdr;
    int len = pktinfo->ip_totlen;
    char stream_addr[KNI_SYMBOL_MAX] = "";
    kni_stream_addr_trans((struct ipaddr*)(&stream->addr), stream_addr, sizeof(stream_addr));

    if(pmeinfo->action != KNI_ACTION_INTERCEPT){
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
        return APP_STATE_FAWPKT | APP_STATE_DROPME;
    }
    int ret = send_to_tfe(g_kni_handle->marsio_handle, raw_pkt, len, thread_seq, pmeinfo->tfe_id);
    if(ret < 0){
        KNI_LOG_ERROR(logger, "Failed at send last packet to tfe%d, stream addr is %s", pmeinfo->tfe_id, stream_addr);
    }
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1);
    //reset clock: when sapp end, start clock
    MESA_htable_search(g_kni_handle->traceid2pme_htable, (const unsigned char*)pmeinfo->stream_trace_id,
            strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id)));
    return APP_STATE_DROPPKT | APP_STATE_DROPME;
}

/*
 * sapp tcpall entry, called for every packet of a tcp stream starting from the syn.
 * Dispatches on stream->pktstate to pending/data/close handling and, when the
 * handler returns DROPME, marks the sapp side released and tries to destroy
 * the per-stream state.
 * ipv6 streams and callbacks without a packet are forwarded untouched.
 */
extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, int thread_seq, const void* a_packet){
    void *logger = g_kni_handle->local_logger;
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_PKT], 0, FS_OP_ADD, 1);
    //TODO: ipv6
    if(stream->addr.addrtype == ADDR_TYPE_IPV6){
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV6_PKT], 0, FS_OP_ADD, 1);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
        return APP_STATE_FAWPKT | APP_STATE_DROPME;
    }
    //a_packet == NULL, continue
    if(a_packet == NULL){
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NULL_PKT], 0, FS_OP_ADD, 1);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
        return APP_STATE_FAWPKT | APP_STATE_GIVEME;
    }
    struct pme_info *pmeinfo = *(struct pme_info **)pme;
    //the pending callback may have returned early (no packet) without creating the
    //per-stream state; data/close can not work without it, so bypass and dropme
    if(pmeinfo == NULL && (stream->pktstate == OP_STATE_DATA || stream->pktstate == OP_STATE_CLOSE)){
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
        return APP_STATE_FAWPKT | APP_STATE_DROPME;
    }
    //pktinfo: only lives for this call, so keep it on the stack
    struct pkt_info pktinfo;
    memset(&pktinfo, 0, sizeof(pktinfo));
    pktinfo.iphdr = (struct iphdr*)a_packet;
    pktinfo.iphdr_len = pktinfo.iphdr->ihl * 4;
    pktinfo.ip_totlen = ntohs(pktinfo.iphdr->tot_len);
    pktinfo.tcphdr = (struct tcphdr*)((char*)pktinfo.iphdr + pktinfo.iphdr_len);
    pktinfo.tcphdr_len = pktinfo.tcphdr->doff * 4;
    pktinfo.data = (char*)pktinfo.tcphdr + pktinfo.tcphdr_len;
    pktinfo.data_len = pktinfo.ip_totlen - pktinfo.iphdr_len - pktinfo.tcphdr_len;
    int ret = APP_STATE_FAWPKT | APP_STATE_GIVEME;
    int key_size = 0;
    switch(stream->pktstate){
        case OP_STATE_PENDING:
            *pme = pmeinfo = pme_info_new(stream, thread_seq, logger);
            key_size = strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id));
            ret = MESA_htable_add(g_kni_handle->traceid2pme_htable, (const unsigned char *)(pmeinfo->stream_trace_id),
                    key_size, (const void*)pmeinfo);
            if(ret < 0){
                KNI_LOG_ERROR(logger, "MESA_htable: failed at MESA_htable_add,"
                        "table is traceid2pme_htable, key is %s", pmeinfo->stream_trace_id);
            }
            else{
                KNI_LOG_DEBUG(logger, "MESA_htable: succeed at MESA_htable_add, table is traceid2pme_htable, key is %s, key_size is %d",
                        pmeinfo->stream_trace_id, key_size);
            }
            ret = pending_opstate(stream, pmeinfo, &pktinfo);
            break;
        case OP_STATE_DATA:
            ret = data_opstate(stream, pmeinfo, &pktinfo, thread_seq);
            break;
        case OP_STATE_CLOSE:
            ret = close_opstate(stream, pmeinfo, &pktinfo, thread_seq);
            break;
        default:
            ret = APP_STATE_FAWPKT | APP_STATE_GIVEME;
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
            FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_UNKNOWN_STATE_EXP], 0, FS_OP_ADD, 1);
            KNI_LOG_ERROR(logger, "Unknown stream opstate %d", stream->pktstate);
            break;
    }
    if((ret & APP_STATE_DROPME) && pmeinfo != NULL){
        pmeinfo->sapp_release = 1;
        pme_info_destroy(pmeinfo);
        *pme = NULL;
    }
    return ret;
}

//free callback registered with the stream project for the http host value
void http_project_free(int thread_seq, void *project_req_value){
    FREE(&project_req_value);
}

/*
 * Register this plugin as producer and customer of the http host project.
 * Returns the id from project_customer_register, or -1 when either
 * registration fails.
 */
static int http_project_init(){
    void *logger = g_kni_handle->local_logger;
    int id = project_producer_register(HTTP_PROJECT_NAME, PROJECT_VAL_TYPE_STRUCT, http_project_free);
    if(id < 0){
        KNI_LOG_ERROR(logger, "Failed at project_producer_register, project name is %s, ret is %d", HTTP_PROJECT_NAME, id);
        return -1;
    }
    id = project_customer_register(HTTP_PROJECT_NAME, PROJECT_VAL_TYPE_STRUCT);
    if(id < 0){
        KNI_LOG_ERROR(logger, "Failed at project_customer_register, project name is %s, ret is %d", HTTP_PROJECT_NAME, id);
        return -1;
    }
    return id;
}

/*
 * http plugin entry: stores the Host of the first http session of a stream in
 * the stream project so that protocol_identify can read it.
 * The host is truncated to fit struct http_project and kept NUL-terminated.
 */
extern "C" char kni_http_entry(stSessionInfo* session_info, void **pme, int thread_seq, struct streaminfo *a_stream, const void *a_packet){
    http_infor* http_info = (http_infor*)(session_info->app_info);
    //only process first http session
    if(http_info->http_session_seq != 1){
        return PROT_STATE_DROPME;
    }
    if(session_info->prot_flag != HTTP_HOST){
        return PROT_STATE_GIVEME;
    }
    struct http_project* host_info = ALLOC(struct http_project, 1);
    int host_len = MIN(session_info->buflen, KNI_DEFAULT_MTU);
    //the destination is host[KNI_DOMAIN_MAX], not an mtu-sized buffer: clamp to it
    //and leave room for the terminator that strnlen-based readers rely on
    if(host_len > (int)sizeof(host_info->host) - 1){
        host_len = (int)sizeof(host_info->host) - 1;
    }
    if(host_len < 0 || session_info->buf == NULL){
        host_len = 0;
    }
    host_info->host_len = host_len;
    memcpy(host_info->host, session_info->buf, host_len);
    host_info->host[host_len] = '\0';
    if(project_req_add_struct(a_stream, g_kni_handle->http_project_id, host_info) < 0){
        FREE(&host_info);
        host_info = NULL;
    }
    return PROT_STATE_DROPME;
}

//destroy the marsio instance, free every tfe instance slot and the handle itself; NULL is accepted
static void kni_marsio_destroy(struct kni_marsio_handle *handle){
    if(handle != NULL){
        if(handle->instance != NULL){
            marsio_destory(handle->instance);
        }
        for(int i = 0; i < TFE_COUNT_MAX; i++){
            FREE(&handle->tfe_instance_list[i]);
        }
    }
    FREE(&handle);
    handle = NULL;
}

/*
 * Thread body: forever receive packets from one tfe device, tag them with the
 * tunnat control zone (encap inner + outer) and send them out on the vxlan path.
 * args is a heap-allocated struct thread_tfe_data_receiver_args, freed here.
 */
void* thread_tfe_data_receiver(void *args){
    struct thread_tfe_data_receiver_args *_args = (struct thread_tfe_data_receiver_args*)args;
    struct kni_marsio_handle *marsio_handle = _args->marsio_handle;
    int tfe_id = _args->tfe_id;
    struct mr_vdev *dev_eth_handler = marsio_handle->tfe_instance_list[tfe_id]->dev_eth_handler;
    FREE(&args);
    marsio_buff_t * rx_buff[BURST_MAX];
    int nr_burst = 1;
    int thread_seq = 0;
    while(true){
        //receive from tfe
        int ret = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buff, nr_burst);
        if(ret <= 0){
            continue;
        }
        //tag: start from an all-zero control zone so no stack garbage is sent along
        struct mr_tunnat_ctrlzone mr_ctrlzone;
        memset(&mr_ctrlzone, 0, sizeof(mr_ctrlzone));
        mr_ctrlzone.action |= TUNNAT_CZ_ACTION_ENCAP_INNER | TUNNAT_CZ_ACTION_ENCAP_OUTER;
        for(int i = 0; i < ret; i++){
            marsio_buff_ctrlzone_set(rx_buff[i], 0, &mr_ctrlzone, sizeof(struct mr_tunnat_ctrlzone));
        }
        //send to vxlan: forward exactly what was received
        marsio_send_burst_with_options(marsio_handle->dev_vxlan_sendpath, thread_seq, rx_buff, ret, MARSIO_SEND_OPT_FAST);
    }
    return NULL;
}

/*
 * Copy one field of a tfe cmsg into the matching pme_info member.
 * value_size_max is the largest size the destination accepts.
 * Returns 0 on success, -1 when the field is missing or too large
 * (only KNI_CMSG_INVALID_TYPE is logged for a failed get).
 */
static int wrapped_kni_cmsg_get(struct pme_info *pmeinfo, struct kni_cmsg *cmsg, uint16_t type, uint16_t value_size_max, void *logger){
    uint16_t value_size = 0;
    unsigned char *value = NULL;
    int ret = kni_cmsg_get(cmsg, type, &value_size, &value);
    if(ret < 0){
        if(ret == KNI_CMSG_INVALID_TYPE){
            KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type is %d, ret is %d", type, ret);
        }
        return -1;
    }
    if(value_size > value_size_max){
        KNI_LOG_ERROR(logger, "kni_cmsg_get: type is %d, size is %d, which should <= %d", type, value_size, value_size_max);
        return -1;
    }
    switch(type){
        case TFE_CMSG_SSL_INTERCEPT_STATE:
            memcpy((char*)&(pmeinfo->intercept_state), value, value_size);
            break;
        case TFE_CMSG_SSL_UPSTREAM_LATENCY:
            memcpy((char*)&(pmeinfo->ssl_server_side_latency), value, value_size);
            break;
        case TFE_CMSG_SSL_DOWNSTREAM_LATENCY:
            memcpy((char*)&(pmeinfo->ssl_client_side_latency), value, value_size);
            break;
        case TFE_CMSG_SSL_UPSTREAM_VERSION:
            memcpy(pmeinfo->ssl_server_side_version, value, value_size);
            break;
        case TFE_CMSG_SSL_DOWNSTREAM_VERSION:
            memcpy(pmeinfo->ssl_client_side_version, value, value_size);
            break;
        case TFE_CMSG_SSL_PINNING_STATE:
            memcpy((char*)&(pmeinfo->pinningst), value, value_size);
            break;
        case TFE_CMSG_SSL_CERT_VERIFY:
            memcpy((char*)&(pmeinfo->ssl_cert_verify), value, value_size);
            break;
        case TFE_CMSG_SSL_ERROR:
            memcpy((char*)&(pmeinfo->ssl_error), value, value_size);
            break;
        default:
            break;
    }
    return 0;
}

/*
 * htable search callback run when a cmsg arrives from tfe for a trace id.
 * Copies the ssl fields of the cmsg into the stream state, marks the tfe side
 * released, stamps end_time and removes the entry from traceid2pme_htable.
 * The cmsg passed in user_args is freed here whether or not an entry was found.
 * Always returns 0.
 */
static long traceid2pme_htable_search_cb(void *data, const uchar *key, uint size, void *user_args){
    struct traceid2pme_search_cb_args *args = (struct traceid2pme_search_cb_args*)user_args;
    void *logger = args->logger;
    struct kni_cmsg *cmsg = args->cmsg;
    struct pme_info *pmeinfo = (struct pme_info*)data;
    if(pmeinfo != NULL){
        wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, sizeof(pmeinfo->intercept_state), logger);
        wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_UPSTREAM_LATENCY, sizeof(pmeinfo->ssl_server_side_latency), logger);
        wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_DOWNSTREAM_LATENCY, sizeof(pmeinfo->ssl_client_side_latency), logger);
        //string fields keep one byte for the terminator
        wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_UPSTREAM_VERSION, sizeof(pmeinfo->ssl_server_side_version) - 1, logger);
        wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_DOWNSTREAM_VERSION, sizeof(pmeinfo->ssl_client_side_version) - 1, logger);
        wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_PINNING_STATE, sizeof(pmeinfo->pinningst), logger);
        wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_CERT_VERIFY, sizeof(pmeinfo->ssl_cert_verify), logger);
        wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_ERROR, sizeof(pmeinfo->ssl_error) - 1, logger);
        pmeinfo->tfe_release = 1;
        pmeinfo->end_time = time(NULL);
        int key_size = strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id));
        int ret = MESA_htable_del(g_kni_handle->traceid2pme_htable, (const unsigned char *)pmeinfo->stream_trace_id,
                key_size, NULL);
        if(ret < 0){
            KNI_LOG_ERROR(logger, "MESA_htable: failed at del, table is %s, key is %s, key_size is %d, ret is %d",
                    "traceid2pme_htable", pmeinfo->stream_trace_id, key_size, ret);
        }
        else{
            KNI_LOG_DEBUG(logger, "MESA_htable: succeed at del, table is %s, key is %s, key_size is %d",
                    "traceid2pme_htable", pmeinfo->stream_trace_id, key_size);
        }
    }
    //released exactly once, on every path
    FREE(&cmsg);
    return 0;
}

void* thread_tfe_cmsg_receiver(void *args){
    struct thread_tfe_cmsg_receiver_args *_args = (struct thread_tfe_cmsg_receiver_args*)args;
    const char *profile = _args->profile;
    const char *section = "tfe_cmsg_receiver";
    void *logger = _args->logger;
    char listen_eth[INET_ADDRSTRLEN];
    uint32_t listen_ip;
    int listen_port = -1;
    char buff[KNI_MTU];
    int sockfd;
    struct sockaddr_in server_addr, client_addr;
    int ret = MESA_load_profile_string_nodef(profile, section, "listen_eth", listen_eth, sizeof(listen_eth));
    if(ret < 0){
        KNI_LOG_ERROR(logger, "MESA_prof_load: listen_eth not set, profile is %s, section is %s", profile, section);
        goto error_out;
    }
    ret = MESA_load_profile_int_nodef(profile, section, "listen_port", &listen_port);
    if(ret < 0){
        KNI_LOG_ERROR(logger, "MESA_prof_load: listen_port not set, profile is %s, section is %s", profile, section);
        goto error_out;
    }
    KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n listen_eth: %s\n listen_port: %d", section, listen_eth, listen_port);
    FREE(&args);
    //create socket
    sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if(sockfd < 0){
        KNI_LOG_ERROR(logger, "Failed at create udp socket, errno is %d, %s", errno, strerror(errno));
        goto error_out;
    }
    memset(&server_addr, 0, sizeof(server_addr));
    memset(&client_addr, 0, sizeof(client_addr));
    ret = kni_ipv4_addr_get_by_eth(listen_eth, &listen_ip);
    if(ret < 0){
        KNI_LOG_ERROR(logger, "Failed at get bind ipv4 addr, eth is %s", listen_eth);
        goto error_out;
    }
    server_addr.sin_family = AF_INET; // IPv4
    server_addr.sin_addr.s_addr = listen_ip;
    server_addr.sin_port = htons(listen_port);
    //bind
    ret = bind(sockfd, (const struct sockaddr *)&server_addr, sizeof(server_addr));
    if(ret < 0){
        KNI_LOG_ERROR(logger, "Failed at bind udp socket, errno is %d, %s", errno, strerror(errno));
        goto error_out;
    }
    //receive
    while(true){
        socklen_t client_len = sizeof(client_addr);
        int recv_len = recvfrom(sockfd, (char *)buff, 
sizeof(buff), MSG_WAITALL, (struct sockaddr*)&client_addr, &client_len); if(recv_len < 0){ KNI_LOG_ERROR(logger, "Failed at recv udp data, errno is %d, %s", errno, strerror(errno)); continue; } KNI_LOG_DEBUG(logger, "recv udp data: recv_len is %d\n", recv_len); struct kni_cmsg *cmsg = NULL; ret = kni_cmsg_deserialize((const unsigned char*)buff, recv_len, &cmsg); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at deserialize cmsg, ret is %d", ret); continue; } //get stream_trace_id unsigned char *stream_trace_id = NULL; uint16_t value_size; ret = kni_cmsg_get(cmsg, TFE_CMSG_STREAM_TRACE_ID, &value_size, &stream_trace_id); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type is %d, ret is %d", TFE_CMSG_STREAM_TRACE_ID, ret); continue; } //get pme long cb_ret = -1; struct traceid2pme_search_cb_args cb_args; memset((void*)&cb_args, sizeof(cb_args), 0); cb_args.cmsg = cmsg; cb_args.logger = logger; MESA_htable_search_cb(g_kni_handle->traceid2pme_htable, (const unsigned char *)stream_trace_id, value_size, traceid2pme_htable_search_cb, &cb_args, &cb_ret); } return NULL; error_out: if(sockfd >= 0){ close(sockfd); } return NULL; } static struct kni_marsio_handle* kni_marsio_init(const char* profile){ void *logger = g_kni_handle->local_logger; const char* section = "marsio"; char appsym[KNI_SYMBOL_MAX]; char dev_vxlan_symbol[KNI_SYMBOL_MAX]; char src_mac_addr_str[KNI_SYMBOL_MAX]; unsigned int opt_value = 1; int tfe_count; struct mr_instance *mr_inst = NULL; struct mr_vdev *dev_vxlan_handler = NULL; struct mr_sendpath *dev_vxlan_sendpath = NULL; struct mr_vdev *dev_eth_handler = NULL; struct mr_sendpath *dev_eth_sendpath = NULL; struct tfe_instance *tfe_inst = NULL; struct kni_marsio_handle *handle = NULL; int ret = MESA_load_profile_string_nodef(profile, section, "appsym", appsym, sizeof(appsym)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: appsym not set, profile is %s, section is %s", profile, section); goto error_out; } ret = 
MESA_load_profile_string_nodef(profile, section, "dev_vxlan_symbol", dev_vxlan_symbol, sizeof(dev_vxlan_symbol)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: dev_vxlan_symbol not set, profile is %s, section is %s", profile, section); goto error_out; } ret = MESA_load_profile_string_nodef(profile, section, "src_mac_addr", src_mac_addr_str, sizeof(src_mac_addr_str)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: src_mac_addr not set, profile is %s, section is %s", profile, section); goto error_out; } KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n appsym: %s\n dev_vxlan_symbol: %s\n src_mac_addr: %s", section, appsym, dev_vxlan_symbol, src_mac_addr_str); mr_inst = marsio_create(); if(mr_inst == NULL){ KNI_LOG_ERROR(logger, "Failed at create marsio instance"); goto error_out; } handle = ALLOC(struct kni_marsio_handle, 1); handle->instance = mr_inst; ret = sscanf(src_mac_addr_str, "%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx", &(handle->src_mac_addr[0]), &(handle->src_mac_addr[1]), &(handle->src_mac_addr[2]), &(handle->src_mac_addr[3]), &(handle->src_mac_addr[4]), &(handle->src_mac_addr[5])); if(ret != 6){ KNI_LOG_ERROR(logger, "MESA_prof_load: src_mac_addr is invalid, ret is %d, profile is %s, section is %s", ret, profile, section); goto error_out; } marsio_option_set(mr_inst, MARSIO_OPT_EXIT_WHEN_ERR, &opt_value, sizeof(opt_value)); marsio_init(mr_inst, appsym); //eth_handler receive thread = 1, send thread = g_iThreadNum tfe_count = g_kni_handle->tfe_count; for(int i = 0; i < tfe_count; i++){ //load tfe conf char _section[KNI_SYMBOL_MAX]; char mac_addr_str[KNI_SYMBOL_MAX]; char dev_eth_symbol[KNI_SYMBOL_MAX]; snprintf(_section, sizeof(_section), "tfe%d", i); int ret = MESA_load_profile_string_nodef(profile, _section, "mac_addr", mac_addr_str, sizeof(mac_addr_str)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: mac_addr not set, profile is %s, section is %s", profile, _section); goto error_out; } tfe_inst = ALLOC(struct tfe_instance, 1); 
//ff:ee:dd:cc:bb:aa ---> 0xff 0xee 0xdd 0xcc 0xbb 0xaa ret = sscanf(mac_addr_str, "%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx", &tfe_inst->mac_addr[0], &tfe_inst->mac_addr[1], &tfe_inst->mac_addr[2], &tfe_inst->mac_addr[3], &tfe_inst->mac_addr[4], &tfe_inst->mac_addr[5]); if(ret != 6){ KNI_LOG_ERROR(logger, "MESA_prof_load: mac_addr is invalid, ret is %d, profile is %s, section is %s", ret, profile, _section); goto error_out; } ret = MESA_load_profile_string_nodef(profile, _section, "dev_eth_symbol", dev_eth_symbol, sizeof(dev_eth_symbol)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: dev_eth_symbol not set, profile is %s, section is %s", profile, _section); goto error_out; } KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n mac_addr: %s\n dev_eth_symbol: %s", _section, mac_addr_str, dev_eth_symbol); //handler dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, 1, g_iThreadNum); if(dev_eth_handler == NULL){ KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_eth_symbol); goto error_out; } //sendpath dev_eth_sendpath = marsio_sendpath_create_by_vdev(dev_eth_handler); if(dev_eth_sendpath == NULL){ KNI_LOG_ERROR(logger, "Failed at create marsio sendpath, dev_symbol is %s", dev_eth_symbol); goto error_out; } //tfe_instance tfe_inst->dev_eth_handler = dev_eth_handler; tfe_inst->dev_eth_sendpath = dev_eth_sendpath; handle->tfe_instance_list[i] = tfe_inst; } //vxlan_handler: receive: 0 thread, send: 1 dev_vxlan_handler = marsio_open_device(mr_inst, dev_vxlan_symbol, 0, 1); if(dev_vxlan_handler == NULL){ KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_vxlan_symbol); goto error_out; } handle->dev_vxlan_handler = dev_vxlan_handler; //vxlan sendpath dev_vxlan_sendpath = marsio_sendpath_create_by_vdev(dev_vxlan_handler); if(dev_eth_sendpath == NULL){ KNI_LOG_ERROR(logger, "Failed at create marsio sendpath, dev_symbol is %s", dev_vxlan_symbol); goto error_out; } handle->dev_vxlan_sendpath = dev_vxlan_sendpath; 
//marsio_thread_init(mr_instance); return handle; error_out: kni_marsio_destroy(handle); return NULL; } static void fs_destroy(struct kni_field_stat_handle *fs_handle){ if(fs_handle != NULL){ FS_stop(&(fs_handle->handle)); } FREE(&fs_handle); } static struct kni_field_stat_handle * fs_init(const char *profile){ void *logger = g_kni_handle->local_logger; const char *section = "field_stat"; char stat_path[KNI_PATH_MAX]; struct kni_field_stat_handle *fs_handle = NULL; screen_stat_handle_t handle = NULL; const char *app_name = "fs2_kni"; int value = 0; int ret = MESA_load_profile_string_nodef(profile, section, "stat_path", stat_path, sizeof(stat_path)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: stat_path not set, profile is %s, section is %s", profile, section); goto error_out; } KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n stat_path: %s\n", "field_stat", stat_path); handle = FS_create_handle(); if(handle == NULL){ KNI_LOG_ERROR(logger, "Failed at create FS_create_handle"); goto error_out; } fs_handle = ALLOC(struct kni_field_stat_handle, 1); fs_handle->handle = handle; FS_set_para(handle, APP_NAME, app_name, strlen(app_name) + 1); FS_set_para(handle, OUTPUT_DEVICE, stat_path, strlen(stat_path)+1); value = 0; FS_set_para(handle, FLUSH_BY_DATE, &value, sizeof(value)); value = 1; FS_set_para(handle, PRINT_MODE, &value, sizeof(value)); value = 1; FS_set_para(handle, CREATE_THREAD, &value, sizeof(value)); value = 5; FS_set_para(handle, STAT_CYCLE, &value, sizeof(value)); value = 4096; FS_set_para(handle, MAX_STAT_FIELD_NUM, &value, sizeof(value)); fs_handle = ALLOC(struct kni_field_stat_handle, 1); fs_handle->handle = handle; fs_handle->fields[KNI_FIELD_TOT_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tot_pkt"); fs_handle->fields[KNI_FIELD_BYP_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_pkt"); fs_handle->fields[KNI_FIELD_INTCP_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_pkt"); 
fs_handle->fields[KNI_FIELD_IPV6_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ipv6_pkt"); fs_handle->fields[KNI_FIELD_NULL_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "null_pkt"); fs_handle->fields[KNI_FIELD_NO_SYN_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "no_syn_pkt"); fs_handle->fields[KNI_FIELD_UNKNOWN_STATE_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "unknown_state"); fs_handle->fields[KNI_FIELD_NO_SA_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "no_s/a_pkt"); fs_handle->fields[KNI_FIELD_TOT_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tot_stm"); fs_handle->fields[KNI_FIELD_BYP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_stm"); fs_handle->fields[KNI_FIELD_INTCP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_stm"); fs_handle->fields[KNI_FIELD_SSL_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ssl_stm"); fs_handle->fields[KNI_FIELD_HTTP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "http_stm"); fs_handle->fields[KNI_FIELD_SENDLOG_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sendlog_succ"); fs_handle->fields[KNI_FIELD_SENDLOG_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sendlog_fail"); fs_handle->fields[KNI_FIELD_UNKNOWN_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "unknown_stm"); fs_handle->handle = handle; FS_start(handle); return fs_handle; error_out: fs_destroy(fs_handle); return NULL; } extern "C" void kni_destroy(struct kni_handle *handle){ if(handle != NULL){ } FREE(&handle); handle = NULL; } static void traceid2pme_htable_data_free_cb(void *data){ void *logger = g_kni_handle->local_logger; KNI_LOG_DEBUG(logger, "call traceid2pme_htable_data_free_cb"); struct pme_info *pmeinfo = (struct pme_info*)data; pme_info_destroy(pmeinfo); } //eliminate_type: 0:FIFO; 1:LRU //ret: 1: the item can be eliminated; 0: the item can't be eliminated static int 
traceid2pme_htable_expire_notify_cb(void *data, int eliminate_type){ void *logger = g_kni_handle->local_logger; struct pme_info *pmeinfo = (struct pme_info*)data; if(pmeinfo->sapp_release == 0){ KNI_LOG_DEBUG(logger, "Failed at eliminate pmeinfo, sapp_release is %d", pmeinfo->sapp_release); return 0; } KNI_LOG_DEBUG(logger, "Succeed at eliminate pmeinfo, sapp_release is %d", pmeinfo->sapp_release); pmeinfo->tfe_release = 1; return 1; } extern "C" int kni_init(){ const char *profile = "./conf/kni/kni.conf"; const char *section = "global"; //init logger char log_path[KNI_PATH_MAX] = ""; int tfe_count = 0; char local_eth[KNI_SYMBOL_MAX] = ""; struct kni_send_logger *send_logger = NULL; struct kni_field_stat_handle *fs_handle = NULL; int id = -1; void *local_logger = NULL; int log_level = -1; pthread_t thread_id = -1; struct thread_tfe_cmsg_receiver_args *cmsg_receiver_args; MESA_htable_handle traceid2pme_htable = NULL; int ret = MESA_load_profile_string_nodef(profile, section, "log_path", log_path, sizeof(log_path)); if(ret < 0){ printf("MESA_prof_load: log_path not set, profile is %s, section is %s", profile, section); goto error_out; } ret = MESA_load_profile_int_nodef(profile, section, "log_level", &log_level); if(ret < 0){ printf("MESA_prof_load: log_level not set, profile is %s, section is %s", profile, section); goto error_out; } local_logger = MESA_create_runtime_log_handle(log_path, log_level); if (unlikely(local_logger == NULL)){ printf("Failed at create logger: %s", log_path); goto error_out; } ret = MESA_load_profile_int_nodef(profile, section, "tfe_count", &tfe_count); if(ret < 0){ KNI_LOG_ERROR(local_logger, "MESA_prof_load: tfe_count not set, profile is %s, section is %s", profile, section); goto error_out; } if(tfe_count > TFE_COUNT_MAX){ KNI_LOG_ERROR(local_logger, "tfe_count is %d, exceed the max_tfe_count %d", tfe_count, TFE_COUNT_MAX); goto error_out; } if(tfe_count <= 0){ KNI_LOG_ERROR(local_logger, "tfe_count is %d, <= 0", tfe_count); goto 
error_out; } ret = MESA_load_profile_string_nodef(profile, section, "local_eth", local_eth, sizeof(local_eth)); if(ret < 0){ printf("MESA_prof_load: local_eth not set, profile is %s, section is %s", profile, section); goto error_out; } KNI_LOG_INFO(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_count: %d\n local_eth: %s", section, log_path, log_level, tfe_count, local_eth); g_kni_handle = ALLOC(struct kni_handle, 1); g_kni_handle->local_logger = local_logger; g_kni_handle->tfe_count = tfe_count; //init http_project id = http_project_init(); if(id < 0){ KNI_LOG_ERROR(local_logger, "Failed at init http project, ret is %d", id); goto error_out; } g_kni_handle->http_project_id = id; //init marsio g_kni_handle->marsio_handle = kni_marsio_init(profile); if(g_kni_handle->marsio_handle == NULL){ KNI_LOG_ERROR(local_logger, "Failed at init marsio"); goto error_out; } //create thread_tfe_data_receiver for(int i = 0; i < tfe_count; i++){ struct thread_tfe_data_receiver_args *args = ALLOC(struct thread_tfe_data_receiver_args, 1); args->logger = local_logger; args->marsio_handle = g_kni_handle->marsio_handle; args->tfe_id = i; int ret = pthread_create(&thread_id, NULL, thread_tfe_data_receiver, (void *)args); if(unlikely(ret != 0)){ KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_data_receiver, ret is %d", ret); FREE(&args); goto error_out; } } //create thread_tfe_cmsg_receiver cmsg_receiver_args = ALLOC(struct thread_tfe_cmsg_receiver_args, 1); cmsg_receiver_args->logger = local_logger; strncpy(cmsg_receiver_args->profile, profile, strnlen(profile, sizeof(cmsg_receiver_args->profile) - 1)); ret = pthread_create(&thread_id, NULL, thread_tfe_cmsg_receiver, (void *)cmsg_receiver_args); if(unlikely(ret != 0)){ KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_cmsg_receiver, ret is %d", ret); FREE(&cmsg_receiver_args); goto error_out; } //init maat g_kni_handle->maat_handle = 
kni_maat_init(profile, local_logger); if(g_kni_handle->maat_handle == NULL){ KNI_LOG_ERROR(local_logger, "Failed at init maat"); goto error_out; } //init_filedstat fs_handle = fs_init(profile); if(fs_handle == NULL){ KNI_LOG_ERROR(local_logger, "Failed at init field_stat"); goto error_out; } g_kni_fs_handle = fs_handle; //init local_ipv4 ret = kni_ipv4_addr_get_by_eth(local_eth, &(g_kni_handle->local_ipv4)); if(ret < 0){ KNI_LOG_ERROR(local_logger, "Failed at get bind ipv4 addr, eth is %s", local_eth); goto error_out; } //init kni_send_logger send_logger = kni_send_logger_init(profile, local_logger); if(send_logger == NULL){ KNI_LOG_ERROR(local_logger, "Failed at init kni_send_logger", local_eth); goto error_out; } g_kni_handle->send_logger = send_logger; //init traceid2pme_htable traceid2pme_htable = kni_create_htable(profile, "traceid2pme_htable", (void*)traceid2pme_htable_data_free_cb, (void*)traceid2pme_htable_expire_notify_cb, local_logger); if(traceid2pme_htable == NULL){ KNI_LOG_ERROR(local_logger, "Failed at create traceid2pme_htable"); goto error_out; } g_kni_handle->traceid2pme_htable = traceid2pme_htable; return 0; error_out: kni_destroy(g_kni_handle); return -1; }