This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-kni/entry/src/kni_entry.cpp
2019-10-16 18:26:42 +08:00

2346 lines
94 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
intercept: destroy_pme + send_log + del traceid2pem + del tuple2stream
bypass: drome: pme_new_fail: destroy_pme
no_tfe: destroy_pme
stream_error: destroy_pme + send_log
giveme: policy: destroy_pme + send_log
dup_traffic: destroy_pme + send_log
*/
#include "kni_utils.h"
#include "ssl_utils.h"
#include "marsio.h"
#include "kni_maat.h"
#include "MESA/http.h"
#include "MESA/stream_inc/sapp_inject.h"
#include "kni_cmsg.h"
#include "uuid/uuid.h"
#include "cjson/cJSON.h"
#include "kni_send_logger.h"
#include <linux/if_ether.h>
#include <signal.h>
#include "tfe_mgr.h"
#ifdef __cplusplus
extern "C" {
#endif
#include "dablooms.h"
#ifdef __cplusplus
}
#endif
#include "kni_tun.h"
/* Process-wide plugin state. Both pointers start as NULL; the code that assigns them is outside this part of the file. */
struct kni_handle *g_kni_handle = NULL;
struct kni_field_stat_handle *g_kni_fs_handle = NULL;
/* Name of the HTTP project slot -- presumably used when obtaining http_project_id; confirm at the registration site. */
#define HTTP_PROJECT_NAME "kni_http_tag"
/* Capacity of the marsio tx buffer array used by send_to_tfe_normal_mode. */
#define BURST_MAX 1
/* Size in bytes of pme_info.stream_traceid (terminating NUL included). */
#define STREAM_TRACEID_LEN 37
/* Caller identifiers accepted by judge_stream_can_destroy. */
#define CALLER_SAPP 0
#define CALLER_TFE 1
/* Application protocol of a stream as reported by protocol_identify. */
enum kni_protocol{
KNI_PROTOCOL_UNKNOWN = 0, /* neither an HTTP project entry nor a parsable SSL ClientHello was found */
KNI_PROTOCOL_SSL,
KNI_PROTOCOL_HTTP,
};
/*
 * Error codes stored in pme_info.error. All values are negative; log_generate
 * treats error < 0 as "stream failed" and writes the text returned by
 * stream_errmsg_session_record into the "stream_error" log field.
 */
enum stream_error{
STREAM_ERROR_ASYM_ROUTING = -1,
STREAM_ERROR_NO_SYN = -2,
STREAM_ERROR_NO_SYN_ACK = -3,
STREAM_ERROR_NO_DATA = -4,
STREAM_ERROR_UNSUPPORTED_PROTOCOL = -5,
STREAM_ERROR_INVALID_IP_HDR = -6,
STREAM_ERROR_EXCEED_MTU = -7,
//internal errors: reported as "e_internal_1" .. "e_internal_7" in the session record
STREAM_ERROR_INVALID_ACTION = -8,
STREAM_ERROR_SENDTO_TFE_FAIL = -9,
STREAM_ERROR_TUPLE2STM_ADD_FAIL = -10,
STREAM_ERROR_NO_TFE = -11,
STREAM_ERROR_PME_INIT_FAIL= -12,
STREAM_ERROR_DUP_TRAFFIC = -13,
STREAM_ERROR_CMSG_ADD_FAIL = -14,
};
/* HTTP host information read by protocol_identify from the stream's project slot (g_kni_handle->http_project_id). */
struct http_project{
int host_len; /* copied into protocol_identify_result.domain_len */
char host[KNI_DOMAIN_MAX]; /* copied into protocol_identify_result.domain */
};
/*
 * Fingerprint of one TCP packet used for duplicate-traffic detection.
 * Must be zero-filled before dup_traffic_dabloom_key_get populates it:
 * keys are compared with memcmp over the whole object (see pending_opstate),
 * and the fill routine leaves ipid untouched for IPv6 packets.
 */
struct dup_traffic_dabloom_key{
union{
struct stream_tuple4_v4 v4;
struct stream_tuple4_v6 v6;
}addr;
uint16_t ipid; /* IPv4 identification field; not written for IPv6 */
uint32_t seq; /* copied from the TCP header without byte-order conversion */
uint32_t ack_seq; /* copied from the TCP header without byte-order conversion */
uint32_t timestamp; /* ts_value reported by kni_get_tcpopt */
};
/*
 * Per-stream state of the plugin. Initialised by pme_info_init, released by
 * pme_info_destroy, and serialised into the session log by log_generate.
 */
struct pme_info{
addr_type_t addr_type; /* taken from stream->addr.addrtype in pme_info_init */
char stream_addr[KNI_ADDR_MAX]; /* printable address, filled by kni_addr_trans_v4/v6 */
int protocol; /* compared against KNI_PROTOCOL_* when logging and when building the cmsg */
int do_log;
int policy_id; /* logged, and sent to TFE as TFE_CMSG_POLICY_ID */
int maat_hit;
enum kni_action action; /* logged as "action" */
int service; /* logged as "service" */
struct kni_tcpopt_info client_tcpopt; /* mss/wscale/sack/ts_set are forwarded to TFE */
struct kni_tcpopt_info server_tcpopt; /* mss/wscale/sack/ts_set are forwarded to TFE */
char has_syn;
char has_syn_ack;
uint16_t client_window; /* forwarded to TFE (converted with htons) */
uint16_t server_window; /* forwarded to TFE (converted with htons) */
int tfe_id;
pthread_mutex_t lock; /* guards sapp_release/tfe_release in judge_stream_can_destroy */
enum stream_error error; /* negative value => "stream_error" is logged instead of end_time/duration */
char stream_traceid[STREAM_TRACEID_LEN]; /* "<thread_seq><sec>.<nsec>" built in pme_info_init */
//which member is meaningful depends on protocol (checked when the JSON log is built)
union{
char host[KNI_DOMAIN_MAX]; //http only
char sni[KNI_DOMAIN_MAX]; //ssl only
}domain;
//tfe_release = 1: tfe no longer needs this pme_info
int tfe_release;
//sapp_release = 1: set when judge_stream_can_destroy is called with CALLER_SAPP
int sapp_release;
//kafka log
struct layer_addr *addr; /* copy made by layer_addr_dup; freed in pme_info_destroy */
unsigned char dir; /* logged as "stream_dir" */
uint64_t server_bytes; /* logged as "c2s_byte_num" */
uint64_t client_bytes; /* logged as "s2c_byte_num" */
uint64_t server_pkts; /* logged as "c2s_pkt_num" */
uint64_t client_pkts; /* logged as "s2c_pkt_num" */
struct timespec start_time; /* CLOCK_REALTIME at pme_info_init */
struct timespec end_time;
uint64_t con_duration_ms; /* NOTE: log_generate recomputes the duration from start_time/end_time and does not read this field */
//from tfe, kafka log
uint64_t intercept_state;
uint64_t pinningst; //default 0
uint64_t ssl_server_side_latency;
uint64_t ssl_client_side_latency;
char ssl_server_side_version[KNI_SYMBOL_MAX];
char ssl_client_side_version[KNI_SYMBOL_MAX];
int64_t ssl_cert_verify; /* -1 (set in pme_info_init) means "not available": the field is omitted from the log */
char ssl_error[KNI_STRING_MAX];
//for duplicate-traffic detection
int has_dup_traffic; /* logged as "has_dup_traffic" */
int has_dup_syn;
int has_dup_syn_ack;
struct dup_traffic_dabloom_key *syn_packet; /* heap-allocated; freed in pme_info_destroy */
struct dup_traffic_dabloom_key *syn_ack_packet; /* heap-allocated; freed in pme_info_destroy */
};
/* Fixed-size buffer (KNI_MTU bytes) in which add_cmsg_to_packet assembles the packet sent to TFE. */
struct wrapped_packet{
char data[KNI_MTU];
};
/* 4-byte TCP option that add_cmsg_to_packet inserts right after the fixed 20-byte TCP header. */
struct tcp_option_restore{
uint8_t kind; /* set to 88 */
uint8_t len; /* set to 4 */
uint16_t offset; /* htons(payload length): where the appended cmsg data starts, relative to the payload start */
};
/* marsio device and send path of one enabled TFE node; looked up by tfe_id in send_to_tfe_normal_mode. */
struct tfe_enabled_node{
int tfe_id;
struct mr_vdev *dev_eth_handler; /* device used to allocate tx buffers */
struct mr_sendpath *dev_eth_sendpath; /* path passed to marsio_send_burst */
};
/* marsio instance plus the table of enabled TFE nodes. */
struct kni_marsio_handle{
struct mr_instance *instance;
int tfe_enabled_node_count; /* number of valid entries in tfe_enabled_nodes */
struct tfe_enabled_node tfe_enabled_nodes[TFE_COUNT_MAX];
};
/* Output of protocol_identify. */
struct protocol_identify_result{
int protocol; /* one of enum kni_protocol */
char domain[KNI_DOMAIN_MAX]; /* HTTP host or SSL SNI */
int domain_len;
};
/* Argument bundle -- presumably for the thread that receives cmsg from TFE (its user is outside this part of the file). */
struct thread_tfe_cmsg_receiver_args{
void *logger;
char profile[KNI_SYMBOL_MAX];
};
/* Per-thread resources; g_kni_handle->threads_handle is indexed by thread_seq. */
struct per_thread_handle{
MESA_htable_handle tuple2stream_htable; /* normalised 4-tuple -> tuple2stream_htable_value */
struct expiry_dablooms_handle *dabloom_handle;
};
/* Value stored in tuple2stream_htable by tuple2stream_htable_add. */
struct tuple2stream_htable_value{
struct streaminfo *stream;
struct pme_info *pmeinfo;
int route_dir; /* stream->routedir at insertion time */
int reversed; /* 1 when the key was built with source and destination swapped */
};
/* Global plugin context, reachable through g_kni_handle. */
struct kni_handle{
int http_project_id; /* project id passed to project_req_get_struct in protocol_identify */
struct kni_marsio_handle *marsio_handle; /* used when deploy_mode is not KNI_DEPLOY_MODE_TUN */
struct kni_tun_handle *tun_handle; /* used when deploy_mode is KNI_DEPLOY_MODE_TUN */
struct kni_maat_handle *maat_handle;
struct kni_send_logger *send_logger; /* destination of the JSON session log */
MESA_htable_handle traceid2pme_htable;
struct per_thread_handle *threads_handle; /* indexed by thread_seq */
uint32_t local_ipv4; /* logged as "cap_ip" */
void *local_logger;
struct tfe_mgr *_tfe_mgr;
int thread_count;
int dup_traffic_switch; /* 1 enables duplicate-traffic detection in pending_opstate */
int dup_traffic_action;
enum kni_deploy_mode deploy_mode;
char src_mac_addr[6]; /* Ethernet source written by add_ether_header */
char dst_mac_addr[6]; /* Ethernet destination written by add_ether_header */
};
/* Argument bundle -- presumably for a traceid2pme_htable search callback (the callback is outside this part of the file). */
struct traceid2pme_search_cb_args{
struct kni_cmsg *cmsg;
void *logger;
};
/*
 * Translate a stream_error code into the short text written to the
 * "stream_error" field of the session record.
 *
 * Returns a pointer to a string literal (do not modify or free it);
 * codes that are not listed yield "unknown error".
 */
static char* stream_errmsg_session_record(enum stream_error _errno){
    static const struct{
        enum stream_error code;
        const char *text;
    } errmsg_table[] = {
        {STREAM_ERROR_ASYM_ROUTING,         "e_asym_routing"},
        {STREAM_ERROR_NO_SYN,               "e_no_syn"},
        {STREAM_ERROR_NO_SYN_ACK,           "e_no_synack"},
        {STREAM_ERROR_NO_DATA,              "e_no_data"},
        {STREAM_ERROR_UNSUPPORTED_PROTOCOL, "e_unsupported_protocol"},
        {STREAM_ERROR_INVALID_IP_HDR,       "e_invalid_ip_hdr"},
        {STREAM_ERROR_EXCEED_MTU,           "e_exceed_mtu"},
        {STREAM_ERROR_INVALID_ACTION,       "e_internal_1"},
        {STREAM_ERROR_SENDTO_TFE_FAIL,      "e_internal_2"},
        {STREAM_ERROR_TUPLE2STM_ADD_FAIL,   "e_internal_3"},
        {STREAM_ERROR_NO_TFE,               "e_internal_4"},
        {STREAM_ERROR_PME_INIT_FAIL,        "e_internal_5"},
        {STREAM_ERROR_DUP_TRAFFIC,          "e_internal_6"},
        {STREAM_ERROR_CMSG_ADD_FAIL,        "e_internal_7"},
    };
    const size_t entry_count = sizeof(errmsg_table) / sizeof(errmsg_table[0]);
    for(size_t i = 0; i < entry_count; i++){
        if(errmsg_table[i].code == _errno){
            return (char*)errmsg_table[i].text;
        }
    }
    return (char*)"unknown error";
}
static int dup_traffic_dabloom_key_get(struct pkt_info *pktinfo, struct dup_traffic_dabloom_key *key){
//ipv6
struct tcphdr *tcphdr = pktinfo->tcphdr;
key->seq = tcphdr->seq;
key->ack_seq = tcphdr->ack_seq;
struct kni_tcpopt_info tcpopt;
kni_get_tcpopt(&tcpopt, tcphdr, pktinfo->tcphdr_len);
key->timestamp = tcpopt.ts_value;
if(pktinfo->addr_type == ADDR_TYPE_IPV6){
struct ip6_hdr *iphdr = pktinfo->iphdr.v6;
memcpy(key->addr.v6.saddr, &(iphdr->ip6_src), sizeof(key->addr.v6.saddr));
memcpy(key->addr.v6.daddr, &(iphdr->ip6_dst), sizeof(key->addr.v6.daddr));
key->addr.v6.source = tcphdr->source;
key->addr.v6.dest = tcphdr->dest;
}
//ipv4
else{
struct iphdr *iphdr = pktinfo->iphdr.v4;
key->addr.v4.saddr = iphdr->saddr;
key->addr.v4.daddr = iphdr->daddr;
key->addr.v4.source = tcphdr->source;
key->addr.v4.dest = tcphdr->dest;
key->ipid = iphdr->id;
}
return 0;
}
/*
 * Release a pme_info and everything it owns: the duplicated layer address,
 * the mutex, the saved syn / syn-ack keys and finally the object itself.
 * Updates the PME_FREE and PME_CNT counters. A NULL argument is only logged.
 *
 * Takes void* so it can serve as a generic destructor callback.
 */
static void pme_info_destroy(void *data){
    struct pme_info *pme = (struct pme_info *)data;
    void *logger = g_kni_handle->local_logger;
    if(pme == NULL){
        KNI_LOG_ERROR(logger, "Failed at pme_info_destroy, pmeinfo = null");
        return;
    }
    //layer_addr copy
    layer_addr_free(pme->addr);
    pme->addr = NULL;
    //lock
    pthread_mutex_destroy(&(pme->lock));
    //saved syn / syn_ack keys, then the object itself
    FREE(&(pme->syn_packet));
    FREE(&(pme->syn_ack_packet));
    FREE(&pme);
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_FREE], 0, FS_OP_ADD, 1);
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_CNT], 0, FS_OP_ADD, -1);
}
static int pme_info_init(struct pme_info *pmeinfo, const struct streaminfo *stream, int thread_seq){
void *logger = g_kni_handle->local_logger;
pmeinfo->addr_type = (enum addr_type_t)stream->addr.addrtype;
pmeinfo->ssl_cert_verify = -1;
//uuid_t uu;
//uuid_generate_random(uu);
//uuid_unparse(uu, pmeinfo->stream_traceid);
clock_gettime(CLOCK_REALTIME, &(pmeinfo->start_time));
snprintf(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid), "%d%lld.%.9ld",
thread_seq, (long long)pmeinfo->start_time.tv_sec, pmeinfo->start_time.tv_nsec);
pmeinfo->addr = layer_addr_dup(&(stream->addr));
//init pme_lock
int ret = pthread_mutex_init(&(pmeinfo->lock), NULL);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at init pthread mutex, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr);
goto error_out;
}
if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
kni_addr_trans_v6(stream->addr.tuple4_v6, pmeinfo->stream_addr, sizeof(pmeinfo->stream_addr));
}
else{
kni_addr_trans_v4(stream->addr.tuple4_v4, pmeinfo->stream_addr, sizeof(pmeinfo->stream_addr));
}
//KNI_LOG_INFO(logger, "stream addr = %s, stream traceid = %s", pmeinfo->stream_addr, pmeinfo->stream_traceid);
return 0;
error_out:
return -1;
}
/*
 * Build the JSON session record for a stream and hand it to the send logger.
 *
 * pmeinfo      - stream state to serialise
 * local_logger - logger for local diagnostics
 *
 * Returns 0 when the record was sent, -1 when the JSON text could not be
 * produced or sending failed. The SENDLOG_SUCC / SENDLOG_FAIL counters are
 * updated according to the send result.
 *
 * The keys are added in a fixed order; keep it stable, since the serialised
 * text follows insertion order.
 */
static int log_generate(struct pme_info *pmeinfo, void *local_logger){
    //create cjson
    cJSON *log_obj = cJSON_CreateObject();
    //stream_traceid
    cJSON_AddStringToObject(log_obj, "stream_trace_id", pmeinfo->stream_traceid);
    //policy_id
    cJSON_AddNumberToObject(log_obj, "policy_id", pmeinfo->policy_id);
    //action
    cJSON_AddNumberToObject(log_obj, "action", pmeinfo->action);
    //service
    cJSON_AddNumberToObject(log_obj, "service", pmeinfo->service);
    //start_time
    cJSON_AddNumberToObject(log_obj, "start_time", pmeinfo->start_time.tv_sec);
    //end_time and duration are only reported for streams without an error
    if(pmeinfo->error >= 0){
        //end_time
        cJSON_AddNumberToObject(log_obj, "end_time", pmeinfo->end_time.tv_sec);
        //con_duration_ms
        cJSON_AddNumberToObject(log_obj, "con_duration_ms", (pmeinfo->end_time.tv_sec - pmeinfo->start_time.tv_sec) * 1000
            + (pmeinfo->end_time.tv_nsec - pmeinfo->start_time.tv_nsec) / 1000000);
    }
    //stream_info: addr_type, trans_proto, client_ip, client_port, server_ip, server_port
    const struct layer_addr *addr = pmeinfo->addr;
    char client_ip_str[INET6_ADDRSTRLEN] = "";
    char server_ip_str[INET6_ADDRSTRLEN] = "";
    //addr comes from layer_addr_dup, whose result is stored unchecked: skip the address fields if it is missing
    if(addr != NULL){
        switch(addr->addrtype){
            case ADDR_TYPE_IPV4:
                cJSON_AddNumberToObject(log_obj, "addr_type", 4);
                inet_ntop(AF_INET, &(addr->tuple4_v4->saddr), client_ip_str, sizeof(client_ip_str));
                inet_ntop(AF_INET, &(addr->tuple4_v4->daddr), server_ip_str, sizeof(server_ip_str));
                cJSON_AddStringToObject(log_obj, "client_ip", client_ip_str);
                cJSON_AddStringToObject(log_obj, "server_ip", server_ip_str);
                cJSON_AddNumberToObject(log_obj, "client_port", ntohs(addr->tuple4_v4->source));
                cJSON_AddNumberToObject(log_obj, "server_port", ntohs(addr->tuple4_v4->dest));
                cJSON_AddStringToObject(log_obj, "trans_proto", "IPv4_TCP");
                break;
            case ADDR_TYPE_IPV6:
                cJSON_AddNumberToObject(log_obj, "addr_type", 6);
                inet_ntop(AF_INET6, addr->tuple4_v6->saddr, client_ip_str, sizeof(client_ip_str));
                inet_ntop(AF_INET6, addr->tuple4_v6->daddr, server_ip_str, sizeof(server_ip_str));
                cJSON_AddStringToObject(log_obj, "client_ip", client_ip_str);
                cJSON_AddStringToObject(log_obj, "server_ip", server_ip_str);
                cJSON_AddNumberToObject(log_obj, "client_port", ntohs(addr->tuple4_v6->source));
                cJSON_AddNumberToObject(log_obj, "server_port", ntohs(addr->tuple4_v6->dest));
                cJSON_AddStringToObject(log_obj, "trans_proto", "IPv6_TCP");
                break;
            default:
                break;
        }
    }
    //entrance_id: 0
    cJSON_AddNumberToObject(log_obj, "entrance_id", 0);
    //device_id: 0
    cJSON_AddNumberToObject(log_obj, "device_id", 0);
    //link_id: 0
    cJSON_AddNumberToObject(log_obj, "link_id", 0);
    //isp: null
    cJSON_AddStringToObject(log_obj, "isp", "");
    //encap_type: should come from sapp, filled with 0 for now
    cJSON_AddNumberToObject(log_obj, "encap_type", 0);
    //pinning state: from tfe
    cJSON_AddNumberToObject(log_obj, "pinningst", pmeinfo->pinningst);
    //intercept state: from tfe
    cJSON_AddNumberToObject(log_obj, "intercept_state", pmeinfo->intercept_state);
    //ssl upstream latency: from tfe
    cJSON_AddNumberToObject(log_obj, "ssl_server_side_latency", pmeinfo->ssl_server_side_latency);
    //ssl downstream latency: from tfe
    cJSON_AddNumberToObject(log_obj, "ssl_client_side_latency", pmeinfo->ssl_client_side_latency);
    //ssl upstream version: from tfe
    cJSON_AddStringToObject(log_obj, "ssl_server_side_version", pmeinfo->ssl_server_side_version);
    //ssl downstream version: from tfe
    cJSON_AddStringToObject(log_obj, "ssl_client_side_version", pmeinfo->ssl_client_side_version);
    //ssl cert verify: -1 means "not available" and is not logged
    if(pmeinfo->ssl_cert_verify != -1){
        cJSON_AddNumberToObject(log_obj, "ssl_cert_verify", pmeinfo->ssl_cert_verify);
    }
    //direction: 0
    cJSON_AddNumberToObject(log_obj, "direction", 0);
    //stream_dir: from sapp
    cJSON_AddNumberToObject(log_obj, "stream_dir", pmeinfo->dir);
    //cap_ip: kni ip; pre-initialised so a failed inet_ntop logs an empty string instead of stack garbage
    char local_ipv4_str[INET6_ADDRSTRLEN] = "";
    inet_ntop(AF_INET, &(g_kni_handle->local_ipv4), local_ipv4_str, sizeof(local_ipv4_str));
    cJSON_AddStringToObject(log_obj, "cap_ip", local_ipv4_str);
    //addr_list
    cJSON_AddStringToObject(log_obj, "addr_list", "");
    //host: http_only
    if(pmeinfo->protocol == KNI_PROTOCOL_HTTP){
        cJSON_AddStringToObject(log_obj, "host", pmeinfo->domain.host);
    }
    //sni: ssl only
    if(pmeinfo->protocol == KNI_PROTOCOL_SSL){
        cJSON_AddStringToObject(log_obj, "sni", pmeinfo->domain.sni);
    }
    //c2s_pkt_num
    cJSON_AddNumberToObject(log_obj, "c2s_pkt_num", pmeinfo->server_pkts);
    //s2c_pkt_num
    cJSON_AddNumberToObject(log_obj, "s2c_pkt_num", pmeinfo->client_pkts);
    //c2s_byte_num
    cJSON_AddNumberToObject(log_obj, "c2s_byte_num", pmeinfo->server_bytes);
    //s2c_byte_num
    cJSON_AddNumberToObject(log_obj, "s2c_byte_num", pmeinfo->client_bytes);
    //dup_traffic
    cJSON_AddNumberToObject(log_obj, "has_dup_traffic", pmeinfo->has_dup_traffic);
    //stream_error
    if(pmeinfo->error < 0){
        char *stream_errmsg = stream_errmsg_session_record(pmeinfo->error);
        cJSON_AddStringToObject(log_obj, "stream_error", stream_errmsg);
    }
    //serialise; the object tree is no longer needed afterwards
    char *log_msg = cJSON_PrintUnformatted(log_obj);
    cJSON_Delete(log_obj);
    if(log_msg == NULL){
        KNI_LOG_ERROR(local_logger, "Failed at cJSON_Print, stream_treaceid = %s", pmeinfo->stream_traceid);
        return -1;
    }
    //local log
    KNI_LOG_DEBUG(local_logger, "log_msg = %s\n", log_msg);
    //sendto kafka; the text is released right after the call on every path
    int ret = kni_send_logger_sendlog(g_kni_handle->send_logger, log_msg, strlen(log_msg));
    cJSON_free(log_msg);
    if(ret < 0){
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_FAIL], 0, FS_OP_ADD, 1);
        KNI_LOG_ERROR(local_logger, "Failed at sendlog_to_kafka, ret = %d, strem_traceid = %s",
            ret, pmeinfo->stream_traceid);
        return -1;
    }
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_SUCC], 0, FS_OP_ADD, 1);
    return 0;
}
/*
 * Final teardown of a stream: optionally emit the session log (do_log == 1),
 * then free the pme_info. A log failure is reported but never prevents the
 * release.
 */
static void stream_destroy(struct pme_info *pmeinfo, int do_log){
    void *logger = g_kni_handle->local_logger;
    //sendlog
    if(do_log == 1){
        int log_ret = log_generate(pmeinfo, logger);
        if(log_ret >= 0){
            KNI_LOG_DEBUG(logger, "Succeed at log_generate, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr);
        }
        else{
            KNI_LOG_ERROR(logger, "Failed at log_generate, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr);
        }
    }
    //free pme
    pme_info_destroy(pmeinfo);
}
/*
 * Record that one owner (CALLER_SAPP or CALLER_TFE) has released the stream
 * and report whether both owners have now done so.
 *
 * Returns 1 when sapp_release and tfe_release are both 1, otherwise 0.
 * A NULL pmeinfo is logged and yields 0. The flags are updated and read
 * under pmeinfo->lock.
 */
static int judge_stream_can_destroy(struct pme_info *pmeinfo, int caller){
    void *logger = g_kni_handle->local_logger;
    if(pmeinfo == NULL){
        KNI_LOG_ERROR(logger, "Failed at judge_stream_can_destroy, pmeinfo = null");
        return 0;
    }
    int both_released;
    pthread_mutex_lock(&(pmeinfo->lock));
    switch(caller){
        case CALLER_SAPP:
            pmeinfo->sapp_release = 1;
            break;
        case CALLER_TFE:
            pmeinfo->tfe_release = 1;
            break;
        default:
            break;
    }
    both_released = (pmeinfo->sapp_release == 1 && pmeinfo->tfe_release == 1) ? 1 : 0;
    pthread_mutex_unlock(&(pmeinfo->lock));
    return both_released;
}
/*
 * Identify the application protocol of a stream and extract its domain.
 *
 * stream - stream whose HTTP project slot is consulted first
 * buf    - payload bytes tried as an SSL ClientHello when no HTTP project exists
 * len    - number of bytes in buf
 * result - receives protocol, domain and domain_len
 *
 * Always returns 0. result->domain is always NUL-terminated on return and
 * result->domain_len never exceeds the number of bytes stored in it. For
 * KNI_PROTOCOL_UNKNOWN (and for SSL without SNI) the domain is the empty
 * string with length 0.
 */
static int protocol_identify(const struct streaminfo* stream, char *buf, int len, struct protocol_identify_result *result){
    //room for the terminating NUL is always kept
    const size_t domain_cap = sizeof(result->domain) - 1;
    //http
    struct http_project* project = (struct http_project*)project_req_get_struct(stream, g_kni_handle->http_project_id);
    if(project != NULL){
        size_t copied = strnlen(project->host, domain_cap);
        memcpy(result->domain, project->host, copied);
        result->domain[copied] = '\0';
        result->protocol = KNI_PROTOCOL_HTTP;
        //host_len stays authoritative unless it is negative or points past what was actually copied
        result->domain_len = project->host_len;
        if(result->domain_len < 0 || (size_t)result->domain_len > copied){
            result->domain_len = (int)copied;
        }
        return 0;
    }
    //ssl
    enum chello_parse_result chello_status = CHELLO_PARSE_INVALID_FORMAT;
    struct ssl_chello *chello = ssl_chello_parse((const unsigned char*)buf, len, &chello_status);
    if(chello_status == CHELLO_PARSE_SUCCESS && chello != NULL){
        size_t copied = 0;
        if(chello->sni != NULL){
            copied = strnlen(chello->sni, domain_cap);
            memcpy(result->domain, chello->sni, copied);
        }
        result->domain[copied] = '\0';
        result->domain_len = (int)copied;
        result->protocol = KNI_PROTOCOL_SSL;
        ssl_chello_free(chello);
        return 0;
    }
    ssl_chello_free(chello);
    //unknown: do not leave stale domain data behind
    result->protocol = KNI_PROTOCOL_UNKNOWN;
    result->domain[0] = '\0';
    result->domain_len = 0;
    return 0;
}
/*
 * kni_cmsg_set with error logging: forwards all arguments, logs the field
 * type together with the stream identity when the call fails, and returns
 * the value of kni_cmsg_set unchanged.
 */
static int wrapped_kni_cmsg_set(struct kni_cmsg *cmsg, uint16_t type, const unsigned char *value, uint16_t size, struct pme_info *pmeinfo){
    void *logger = g_kni_handle->local_logger;
    const int rc = kni_cmsg_set(cmsg, type, value, size);
    if(rc >= 0){
        return rc;
    }
    KNI_LOG_ERROR(logger, "Failed set cmsg, type = %d, stream traceid = %s, stream addr = %s", type, pmeinfo->stream_traceid, pmeinfo->stream_addr);
    return rc;
}
/*
 * Collect everything TFE needs to take over the TCP connection into a cmsg
 * and serialise it.
 *
 * Fields, in the order they are added: seq, ack (both as found in the TCP
 * header), client/server MSS, client/server window scale (only when both
 * sides negotiated it), client/server SACK, client/server timestamp flag,
 * protocol (1 = SSL, 0 = anything else), client/server window, policy id,
 * trace id, source MAC and destination MAC taken from the raw packet.
 *
 * On success returns a heap buffer (caller frees) and stores its length in
 * *len. On any failure returns NULL and leaves *len untouched.
 */
static unsigned char* kni_cmsg_serialize_header_new(struct pme_info *pmeinfo, struct streaminfo *stream, struct pkt_info *pktinfo, uint16_t *len){
/* Store one cmsg field; any failure jumps to the shared cleanup label. */
#define CMSG_PUT_OR_FAIL(field_type, field_ptr, field_size) \
    do{ \
        if(wrapped_kni_cmsg_set(cmsg, (field_type), (const unsigned char*)(field_ptr), (field_size), pmeinfo) < 0){ \
            goto fail; \
        } \
    }while(0)
    //every initialised local is declared before the first goto
    void *logger = g_kni_handle->local_logger;
    struct kni_cmsg *cmsg = kni_cmsg_init();
    unsigned char *out = NULL;
    uint16_t out_cap = 0, out_len = 0;
    int rc = 0;
    char src_mac[6];
    char dst_mac[6];
    uint8_t protocol_type = (pmeinfo->protocol == KNI_PROTOCOL_SSL) ? 0x1 : 0x0;
    uint32_t seq = pktinfo->tcphdr->seq;
    uint32_t ack = pktinfo->tcphdr->ack_seq;
    uint16_t client_mss = htons(pmeinfo->client_tcpopt.mss);
    uint16_t server_mss = htons(pmeinfo->server_tcpopt.mss);
    uint16_t client_window = htons(pmeinfo->client_window);
    uint16_t server_window = htons(pmeinfo->server_window);
    int policy_id = pmeinfo->policy_id;

    //seq, ack
    CMSG_PUT_OR_FAIL(TFE_CMSG_TCP_RESTORE_SEQ, &seq, 4);
    CMSG_PUT_OR_FAIL(TFE_CMSG_TCP_RESTORE_ACK, &ack, 4);
    //client mss, server mss
    CMSG_PUT_OR_FAIL(TFE_CMSG_TCP_RESTORE_MSS_CLIENT, &client_mss, 2);
    CMSG_PUT_OR_FAIL(TFE_CMSG_TCP_RESTORE_MSS_SERVER, &server_mss, 2);
    //window scale is only sent when both sides set it
    if(pmeinfo->client_tcpopt.wscale_set && pmeinfo->server_tcpopt.wscale_set){
        CMSG_PUT_OR_FAIL(TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT, &(pmeinfo->client_tcpopt.wscale), 1);
        CMSG_PUT_OR_FAIL(TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, &(pmeinfo->server_tcpopt.wscale), 1);
    }
    //client sack, server sack
    CMSG_PUT_OR_FAIL(TFE_CMSG_TCP_RESTORE_SACK_CLIENT, &(pmeinfo->client_tcpopt.sack), 1);
    CMSG_PUT_OR_FAIL(TFE_CMSG_TCP_RESTORE_SACK_SERVER, &(pmeinfo->server_tcpopt.sack), 1);
    //client timestamp, server timestamp
    CMSG_PUT_OR_FAIL(TFE_CMSG_TCP_RESTORE_TS_CLIENT, &(pmeinfo->client_tcpopt.ts_set), 1);
    CMSG_PUT_OR_FAIL(TFE_CMSG_TCP_RESTORE_TS_SERVER, &(pmeinfo->server_tcpopt.ts_set), 1);
    //protocol
    CMSG_PUT_OR_FAIL(TFE_CMSG_TCP_RESTORE_PROTOCOL, &protocol_type, 1);
    //client window, server window
    CMSG_PUT_OR_FAIL(TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT, &client_window, 2);
    CMSG_PUT_OR_FAIL(TFE_CMSG_TCP_RESTORE_WINDOW_SERVER, &server_window, 2);
    //maat policy id
    CMSG_PUT_OR_FAIL(TFE_CMSG_POLICY_ID, &policy_id, sizeof(policy_id));
    //stream trace id (without the terminating NUL)
    CMSG_PUT_OR_FAIL(TFE_CMSG_STREAM_TRACE_ID, pmeinfo->stream_traceid,
        strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid)));

    //src mac
    rc = get_rawpkt_opt_from_streaminfo(stream, RAW_PKT_GET_VXLAN_OUTER_GDEV_MAC, src_mac);
    if(rc < 0){
        KNI_LOG_ERROR(logger, "Failed at get src mac from rawpkt, ret = %d", rc);
        goto fail;
    }
    KNI_LOG_DEBUG(logger, "Succeed at get src mac from rawpkt, addr = %02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx",
        src_mac[0], src_mac[1], src_mac[2], src_mac[3], src_mac[4], src_mac[5]);
    CMSG_PUT_OR_FAIL(TFE_CMSG_SRC_MAC, src_mac, sizeof(src_mac));

    //dst mac
    rc = get_rawpkt_opt_from_streaminfo(stream, RAW_PKT_GET_VXLAN_OUTER_LOCAL_MAC, dst_mac);
    if(rc < 0){
        KNI_LOG_ERROR(logger, "Failed at get dst mac from rawpkt, ret = %d", rc);
        goto fail;
    }
    KNI_LOG_DEBUG(logger, "Succeed at get dst mac from rawpkt, addr = %02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx",
        dst_mac[0], dst_mac[1], dst_mac[2], dst_mac[3], dst_mac[4], dst_mac[5]);
    CMSG_PUT_OR_FAIL(TFE_CMSG_DST_MAC, dst_mac, sizeof(dst_mac));

    //serialise into a buffer of exactly the reported size
    out_cap = kni_cmsg_serialize_size_get(cmsg);
    out = (unsigned char*)ALLOC(char, out_cap);
    rc = kni_cmsg_serialize(cmsg, out, out_cap, &out_len);
    if(rc < 0){
        KNI_LOG_ERROR(logger, "Failed at serialize cmsg, ret = %d, stream traceid = %s, stream addr = %s",
            rc, pmeinfo->stream_traceid, pmeinfo->stream_addr);
        goto fail;
    }
    *len = out_len;
    kni_cmsg_destroy(cmsg);
    return out;

fail:
    if(out != NULL){
        FREE(&out);
    }
    kni_cmsg_destroy(cmsg);
    return NULL;
#undef CMSG_PUT_OR_FAIL
}
static char* add_cmsg_to_packet(struct pme_info *pmeinfo, struct streaminfo *stream, struct pkt_info *pktinfo, int *len){
//tcp option: kind 88, len 4, control_info_len
char *new_pkt = (char*)ALLOC(struct wrapped_packet, 1);
int offset = 0;
//iphdr
if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
memcpy(new_pkt, (void*)pktinfo->iphdr.v6, pktinfo->iphdr_len);
}
else{
memcpy(new_pkt, (void*)pktinfo->iphdr.v4, pktinfo->iphdr_len);
}
offset += pktinfo->iphdr_len;
//tcphdr
struct tcphdr *tcphdr = (struct tcphdr*)(new_pkt + offset);
memcpy(new_pkt + offset, (void*)pktinfo->tcphdr, 20);
offset += 20;
tcphdr->doff = pktinfo->tcphdr->doff + 1;
struct tcp_option_restore *opt = ALLOC(struct tcp_option_restore, 1);
opt->kind = 88;
opt->len = 4;
opt->offset = htons(pktinfo->data_len);
memcpy(new_pkt + offset, (void*)opt, 4);
FREE(&opt);
offset += 4;
memcpy(new_pkt + offset, (void*)((char*)pktinfo->tcphdr + 20), pktinfo->tcphdr_len - 20);
offset += pktinfo->tcphdr_len - 20;
//data
memcpy(new_pkt + offset, (void*)pktinfo->data, pktinfo->data_len);
offset += pktinfo->data_len;
//kni_cmsg_serialize_header
uint16_t header_len = 0;
unsigned char* header = kni_cmsg_serialize_header_new(pmeinfo, stream, pktinfo, &header_len);
if(header == NULL){
goto error_out;
}
memcpy(new_pkt + offset, (void*)header, header_len);
offset += header_len;
FREE(&header);
//ipv6
if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
kni_ipv6_header_parse((void*)new_pkt, pktinfo);
pktinfo->iphdr.v6->ip6_ctlun.ip6_un1.ip6_un1_plen = htons(offset - sizeof(ip6_hdr));
pktinfo->tcphdr->check = 0;
pktinfo->tcphdr->check = kni_tcp_checksum_v6((void*)pktinfo->tcphdr,
offset - pktinfo->iphdr_len, pktinfo->iphdr.v6->ip6_src, pktinfo->iphdr.v6->ip6_dst);
}
else{
struct iphdr *iphdr = (struct iphdr*)new_pkt;
iphdr->tot_len = htons(offset);
//must set check = 0
iphdr->check = 0;
iphdr->check = kni_ip_checksum((void*)iphdr, pktinfo->iphdr_len);
//tcphdr: checkdum
tcphdr->check = 0;
tcphdr->check = kni_tcp_checksum((void*)tcphdr, offset - pktinfo->iphdr_len, iphdr->saddr, iphdr->daddr);
}
*len = offset;
return new_pkt;
error_out:
if(new_pkt != NULL){
FREE(&new_pkt);
}
return NULL;
}
/*
 * Write an Ethernet frame into dst_data: a header built from the configured
 * source/destination MAC addresses with the EtherType chosen by addr_type,
 * followed by raw_len bytes copied from raw_data.
 *
 * dst_data must provide room for sizeof(struct ethhdr) + raw_len bytes.
 * Always returns 0.
 */
static int add_ether_header(void *dst_data, void *raw_data, uint16_t raw_len, addr_type_t addr_type){
    struct ethhdr *eth = (struct ethhdr*)dst_data;
    char *payload = (char*)dst_data + sizeof(*eth);
    //ether_header[14]
    memcpy(eth->h_dest, g_kni_handle->dst_mac_addr, sizeof(eth->h_dest));
    memcpy(eth->h_source, g_kni_handle->src_mac_addr, sizeof(eth->h_source));
    eth->h_proto = (addr_type == ADDR_TYPE_IPV6) ? htons(ETH_P_IPV6) : htons(ETH_P_IP);
    memcpy(payload, raw_data, raw_len);
    return 0;
}
/*
 * Send one IP packet to the TFE node identified by tfe_id through marsio.
 *
 * raw_data/raw_len - IP packet to send (an Ethernet header is prepended)
 * thread_seq       - calling thread, passed through to marsio
 * tfe_id           - must match an entry of tfe_enabled_nodes
 * addr_type        - selects the EtherType
 *
 * Returns 0 after the packet was handed to marsio_send_burst, -1 when the
 * TFE node is not enabled or no buffer could be allocated.
 */
static int send_to_tfe_normal_mode(char *raw_data, uint16_t raw_len, int thread_seq, int tfe_id, addr_type_t addr_type){
    void *logger = g_kni_handle->local_logger;
    struct kni_marsio_handle *handle = g_kni_handle->marsio_handle;
    marsio_buff_t *tx_buffs[BURST_MAX];
    //look up the enabled node that belongs to tfe_id
    int index = -1;
    for(int i = 0; i < handle->tfe_enabled_node_count; i++){
        if(handle->tfe_enabled_nodes[i].tfe_id == tfe_id){
            index = i;
            break;
        }
    }
    if(index == -1){
        //the format string consumes one int: it must be supplied
        KNI_LOG_ERROR(logger, "tfd %d = disabled", tfe_id);
        return -1;
    }
    struct mr_vdev *dev_eth_handler = handle->tfe_enabled_nodes[index].dev_eth_handler;
    struct mr_sendpath *dev_eth_sendpath = handle->tfe_enabled_nodes[index].dev_eth_sendpath;
    //only send one packet, alloc_ret <= nr_send <= BURST_MAX
    int nr_send = 1;
    int alloc_ret = marsio_buff_malloc_device(dev_eth_handler, tx_buffs, nr_send, 0, thread_seq);
    if (alloc_ret < 0){
        KNI_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret = %d, thread_seq = %d",
            alloc_ret, thread_seq);
        return -1;
    }
    //NOTE(review): the result of marsio_buff_append is used unchecked -- confirm it cannot return NULL for this size
    for(int i = 0; i < nr_send; i++){
        char* dst_data = marsio_buff_append(tx_buffs[i], raw_len + sizeof(struct ethhdr));
        add_ether_header(dst_data, raw_data, raw_len, addr_type);
    }
    marsio_send_burst(dev_eth_sendpath, thread_seq, tx_buffs, nr_send);
    return 0;
}
/*
 * Send one IP packet to TFE through the tun device: an Ethernet header is
 * prepended and the resulting frame is written with kni_tun_write.
 *
 * Returns 0 on success, -1 when kni_tun_write fails.
 */
static int send_to_tfe_tun_mode(char *raw_data, uint16_t raw_len, addr_type_t addr_type){
    struct kni_tun_handle *handle = g_kni_handle->tun_handle;
    //size the buffer for the whole frame: a KNI_MTU-sized buffer overflows once raw_len + header exceeds KNI_MTU
    size_t frame_len = (size_t)raw_len + sizeof(struct ethhdr);
    char *dst_data = ALLOC(char, frame_len);
    add_ether_header(dst_data, raw_data, raw_len, addr_type);
    int ret = kni_tun_write(handle, dst_data, raw_len + sizeof(struct ethhdr));
    FREE(&dst_data);
    if(ret < 0){
        return -1;
    }
    return 0;
}
/*
 * Dispatch a packet to TFE according to the configured deploy mode:
 * KNI_DEPLOY_MODE_TUN writes to the tun device, every other mode goes
 * through marsio. Returns the result of the selected sender.
 */
static int send_to_tfe(char *raw_data, uint16_t raw_len, int thread_seq, int tfe_id, addr_type_t addr_type){
    const int deploy_mode = g_kni_handle->deploy_mode;
    if(deploy_mode == KNI_DEPLOY_MODE_TUN){
        return send_to_tfe_tun_mode(raw_data, raw_len, addr_type);
    }
    return send_to_tfe_normal_mode(raw_data, raw_len, thread_seq, tfe_id, addr_type);
}
/*
 * Parse the IP/TCP headers of a_packet into *pktinfo using the parser that
 * matches pmeinfo->addr_type, and log the parser's error text on failure
 * (DEBUG level for IPv6, ERROR level for IPv4).
 *
 * Returns 0 on success, -1 when parsing failed.
 */
static int wrapped_kni_header_parse(const void *a_packet, struct pme_info *pmeinfo, struct pkt_info *pktinfo){
    void *logger = g_kni_handle->local_logger;
    int parse_ret;
    if(pmeinfo->addr_type != ADDR_TYPE_IPV6){
        parse_ret = kni_ipv4_header_parse(a_packet, pktinfo);
        if(parse_ret >= 0){
            return 0;
        }
        char *errmsg = kni_ipv4_errmsg_get((enum kni_ipv4hdr_parse_error)parse_ret);
        KNI_LOG_ERROR(logger, "Stream error: failed at parse ipv4 header, errmsg = %s, stream treaceid = %s",
            errmsg, pmeinfo->stream_traceid);
        return -1;
    }
    parse_ret = kni_ipv6_header_parse(a_packet, pktinfo);
    if(parse_ret >= 0){
        return 0;
    }
    char *errmsg = kni_ipv6_errmsg_get((enum kni_ipv6hdr_parse_error)parse_ret);
    KNI_LOG_DEBUG(logger, "Stream error: failed at parse ipv6 header, errmsg = %s, stream treaceid = %s",
        errmsg, pmeinfo->stream_traceid);
    return -1;
}
static int tuple2stream_htable_key_get_v4_by_packet(struct pkt_info *pktinfo, struct stream_tuple4_v4 *key, int *reversed){
if(pktinfo->iphdr.v4->saddr < pktinfo->iphdr.v4->daddr){
key->saddr = pktinfo->iphdr.v4->saddr;
key->daddr = pktinfo->iphdr.v4->daddr;
key->source = pktinfo->tcphdr->source;
key->dest = pktinfo->tcphdr->dest;
*reversed = 0;
}
else{
key->saddr = pktinfo->iphdr.v4->daddr;
key->daddr = pktinfo->iphdr.v4->saddr;
key->source = pktinfo->tcphdr->dest;
key->dest = pktinfo->tcphdr->source;
*reversed = 1;
}
return 0;
}
static int tuple2stream_htable_key_get_v6_by_packet(struct pkt_info *pktinfo, struct stream_tuple4_v6 *key, int *reversed){
if(memcmp((void*)&(pktinfo->iphdr.v6->ip6_src), (void*)&(pktinfo->iphdr.v6->ip6_dst), sizeof(key->saddr)) < 0){
memcpy(key->saddr, &(pktinfo->iphdr.v6->ip6_src), sizeof(key->saddr));
memcpy(key->daddr, &(pktinfo->iphdr.v6->ip6_dst), sizeof(key->daddr));
key->source = pktinfo->tcphdr->source;
key->dest = pktinfo->tcphdr->dest;
*reversed = 0;
}
else{
memcpy(key->saddr, &(pktinfo->iphdr.v6->ip6_dst), sizeof(key->saddr));
memcpy(key->daddr, &(pktinfo->iphdr.v6->ip6_src), sizeof(key->daddr));
key->source = pktinfo->tcphdr->dest;
key->dest = pktinfo->tcphdr->source;
*reversed = 1;
}
return 0;
}
static int tuple2stream_htable_key_get_v4_by_stream(const struct streaminfo *stream, struct stream_tuple4_v4 *key, int *reversed){
if(stream->addr.tuple4_v4->saddr < stream->addr.tuple4_v4->daddr){
key->saddr = stream->addr.tuple4_v4->saddr;
key->daddr = stream->addr.tuple4_v4->daddr;
key->source = stream->addr.tuple4_v4->source;
key->dest = stream->addr.tuple4_v4->dest;
*reversed = 0;
}
else{
key->saddr = stream->addr.tuple4_v4->daddr;
key->daddr = stream->addr.tuple4_v4->saddr;
key->source = stream->addr.tuple4_v4->dest;
key->dest = stream->addr.tuple4_v4->source;
*reversed = 1;
}
return 0;
}
static int tuple2stream_htable_key_get_v6_by_stream(const struct streaminfo *stream, struct stream_tuple4_v6 *key, int *reversed){
if(memcmp(stream->addr.tuple4_v6->saddr, stream->addr.tuple4_v6->daddr, sizeof(key->saddr)) < 0){
memcpy(key->saddr, stream->addr.tuple4_v6->saddr, sizeof(key->saddr));
memcpy(key->daddr, stream->addr.tuple4_v6->daddr, sizeof(key->daddr));
key->source = stream->addr.tuple4_v6->source;
key->dest = stream->addr.tuple4_v6->dest;
*reversed = 0;
}
else{
memcpy(key->saddr, stream->addr.tuple4_v6->daddr, sizeof(key->saddr));
memcpy(key->daddr, stream->addr.tuple4_v6->saddr, sizeof(key->daddr));
key->source = stream->addr.tuple4_v6->dest;
key->dest = stream->addr.tuple4_v6->source;
*reversed = 1;
}
return 0;
}
/*
 * Register a stream in the calling thread's tuple2stream table so packets
 * can be mapped back to it by their normalised 4-tuple.
 *
 * addr_type  - ADDR_TYPE_IPV6 selects the IPv6 key, anything else IPv4
 * pktinfo    - parsed packet the key is derived from
 * stream     - stream to store (its routedir is recorded as route_dir)
 * pmeinfo    - per-stream state to store
 * thread_seq - index into g_kni_handle->threads_handle
 *
 * Returns the result of MESA_htable_add (< 0 on failure). On success the
 * table holds the allocated value; on failure the value is freed here and
 * the failure counter is incremented.
 */
static int tuple2stream_htable_add(addr_type_t addr_type, struct pkt_info *pktinfo,
    struct streaminfo *stream, struct pme_info *pmeinfo, int thread_seq){
    MESA_htable_handle tuple2stream_htable = g_kni_handle->threads_handle[thread_seq].tuple2stream_htable;
    void *logger = g_kni_handle->local_logger;
    int ret;
    //only filled in when the add fails, for the error log
    char key_str[KNI_ADDR_MAX] = "";
    int key_size = 0;
    struct tuple2stream_htable_value *value = ALLOC(struct tuple2stream_htable_value, 1);
    value->stream = stream;
    value->pmeinfo = pmeinfo;
    value->route_dir = stream->routedir;
    //ipv6
    if(addr_type == ADDR_TYPE_IPV6){
        struct stream_tuple4_v6 key;
        tuple2stream_htable_key_get_v6_by_packet(pktinfo, &key, &(value->reversed));
        ret = MESA_htable_add(tuple2stream_htable, (const unsigned char *)&key, sizeof(key), (const void*)value);
        if(ret < 0){
            kni_addr_trans_v6(&key, key_str, sizeof(key_str));
            key_size = (int)sizeof(key);
        }
    }
    //ipv4
    else{
        struct stream_tuple4_v4 key;
        tuple2stream_htable_key_get_v4_by_packet(pktinfo, &key, &(value->reversed));
        ret = MESA_htable_add(tuple2stream_htable, (const unsigned char *)&key, sizeof(key), (const void*)value);
        if(ret < 0){
            kni_addr_trans_v4(&key, key_str, sizeof(key_str));
            key_size = (int)sizeof(key);
        }
    }
    if(ret < 0){
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_ADD_FAIL], 0, FS_OP_ADD, 1);
        //key_size is passed as int so that it matches the %d conversion
        KNI_LOG_ERROR(logger, "MESA_htable: Failed at add, table = tuple2stream_htable, key = %s, key_size = %d, ret = %d",
            key_str, key_size, ret);
        //the table did not take the value: release it here, nobody else holds a reference
        FREE(&value);
    }
    else{
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_ADD_SUCC], 0, FS_OP_ADD, 1);
        //per-thread column, then the column at index thread_count
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->line_ids[1], g_kni_fs_handle->column_ids[thread_seq], FS_OP_ADD, 1);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->line_ids[1], g_kni_fs_handle->column_ids[g_kni_handle->thread_count], FS_OP_ADD, 1);
    }
    return ret;
}
static char pending_opstate(struct streaminfo *stream, struct pme_info *pmeinfo, const void *a_packet, int thread_seq){
void *logger = g_kni_handle->local_logger;
struct pkt_info pktinfo;
int ret = wrapped_kni_header_parse(a_packet, pmeinfo, &pktinfo);
if(ret < 0){
pmeinfo->error = STREAM_ERROR_INVALID_IP_HDR;
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_INVALID_IP_HDR], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
if(!pktinfo.tcphdr->syn){
//pending_opstate not syn, bypass and dropme
KNI_LOG_DEBUG(logger, "Stream error: pending opstate, not syn, stream traceid = %s, stream addr = %s",
pmeinfo->stream_traceid, pmeinfo->stream_addr);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_NO_SYN], 0, FS_OP_ADD, 1);
pmeinfo->error = STREAM_ERROR_NO_SYN;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
//dup traffic detect
if(g_kni_handle->dup_traffic_switch == 1){
if(pmeinfo->syn_packet == NULL){
struct dup_traffic_dabloom_key *syn_packet = ALLOC(struct dup_traffic_dabloom_key, 1);
dup_traffic_dabloom_key_get(&pktinfo, syn_packet);
pmeinfo->syn_packet = syn_packet;
}
else{
struct dup_traffic_dabloom_key *syn_packet = ALLOC(struct dup_traffic_dabloom_key, 1);
dup_traffic_dabloom_key_get(&pktinfo, syn_packet);
if(memcmp(pmeinfo->syn_packet, syn_packet, sizeof(*syn_packet)) == 0){
pmeinfo->has_dup_syn = 1;
}
FREE(&(pmeinfo->syn_packet));
pmeinfo->syn_packet = syn_packet;
}
}
pmeinfo->client_window = ntohs(pktinfo.tcphdr->window);
pmeinfo->has_syn = 1;
kni_get_tcpopt(&(pmeinfo->client_tcpopt), pktinfo.tcphdr, pktinfo.tcphdr_len);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}
/*
 * Insert pmeinfo into the global traceid2pme table, keyed by its trace id
 * string (length measured with strnlen, bounded by the field size).
 * Returns the MESA_htable_add() result; < 0 means the insert failed.
 */
static int traceid2pme_htable_add(struct pme_info *pmeinfo){
    void *logger = g_kni_handle->local_logger;
    int traceid_len = strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid));
    int add_ret = MESA_htable_add(g_kni_handle->traceid2pme_htable,
        (const unsigned char *)(pmeinfo->stream_traceid), traceid_len, (const void*)pmeinfo);
    if(add_ret >= 0){
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC], 0, FS_OP_ADD, 1);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_CNT], 0, FS_OP_ADD, 1);
        return add_ret;
    }
    KNI_LOG_ERROR(logger, "MESA_htable: Failed at add,"
        "table = traceid2pme_htable, key = %s, ret = %d", pmeinfo->stream_traceid, add_ret);
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL], 0, FS_OP_ADD, 1);
    return add_ret;
}
/*
 * Remove a stream's entry from the per-thread tuple2stream hash table.
 *
 * The key is rebuilt from the stream's own address (IPv6 or IPv4).
 * Returns the MESA_htable_del() result: < 0 on failure, otherwise success.
 * Success decrements the per-thread and total entry counters.
 */
int tuple2stream_htable_del(const struct streaminfo *stream, int thread_seq){
    MESA_htable_handle handle = g_kni_handle->threads_handle[thread_seq].tuple2stream_htable;
    void *logger = g_kni_handle->local_logger;
    char key_str[KNI_ADDR_MAX];
    int key_size = 0;
    int reversed = 0, ret;
    //ipv6
    if(stream->addr.addrtype == ADDR_TYPE_IPV6){
        struct stream_tuple4_v6 key;
        key_size = (int)sizeof(key);
        tuple2stream_htable_key_get_v6_by_stream(stream, &key, &reversed);
        ret = MESA_htable_del(handle, (const unsigned char *)(&key), sizeof(key), NULL);
        //the printable key is only needed for the error log
        if(ret < 0){
            kni_addr_trans_v6(&key, key_str, sizeof(key_str));
        }
    }
    //ipv4
    else{
        struct stream_tuple4_v4 key;
        key_size = (int)sizeof(key);
        tuple2stream_htable_key_get_v4_by_stream(stream, &key, &reversed);
        ret = MESA_htable_del(handle, (const unsigned char *)(&key), sizeof(key), NULL);
        if(ret < 0){
            kni_addr_trans_v4(&key, key_str, sizeof(key_str));
        }
    }
    if(ret < 0){
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_DEL_FAIL], 0, FS_OP_ADD, 1);
        //key_size is an int so it matches the %d conversion
        KNI_LOG_ERROR(logger, "MESA_htable: Failed at del, table = %s, key = %s, key_size = %d, ret = %d",
            "tuple2stream_htable", key_str, key_size, ret);
        return ret;
    }
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_DEL_SUCC], 0, FS_OP_ADD, 1);
    //per-thread counter, then the total kept in the extra column after the last thread
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->line_ids[1], g_kni_fs_handle->column_ids[thread_seq], FS_OP_ADD, -1);
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->line_ids[1], g_kni_fs_handle->column_ids[g_kni_handle->thread_count], FS_OP_ADD, -1);
    return ret;
}
/*
 * Remove pmeinfo from the global traceid2pme table.
 * Only streams whose action is KNI_ACTION_INTERCEPT are looked up; for any
 * other action the function does nothing.
 */
static void traceid2pme_htable_del(struct pme_info *pmeinfo){
    if(pmeinfo->action != KNI_ACTION_INTERCEPT){
        return;
    }
    void *logger = g_kni_handle->local_logger;
    int traceid_len = strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid));
    int del_ret = MESA_htable_del(g_kni_handle->traceid2pme_htable,
        (const unsigned char *)pmeinfo->stream_traceid, traceid_len, NULL);
    if(del_ret >= 0){
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC], 0, FS_OP_ADD, 1);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_CNT], 0, FS_OP_ADD, -1);
        return;
    }
    KNI_LOG_ERROR(logger, "MESA_htable: Failed at del, table = %s, key = %s, key_size = %d, ret = %d",
        "traceid2pme_htable", pmeinfo->stream_traceid, traceid_len, del_ret);
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_FAIL], 0, FS_OP_ADD, 1);
}
static int first_data_intercept(struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, char *stream_addr, int thread_seq){
//dup_traffic_check
if(g_kni_handle->dup_traffic_switch == 1){
//has dup traffic
if(pmeinfo->has_dup_syn == 1 || pmeinfo->has_dup_syn_ack == 1){
pmeinfo->has_dup_traffic = 1;
}
if(pmeinfo->has_dup_traffic == 1){
if(g_kni_handle->dup_traffic_action == KNI_ACTION_BYPASS){
KNI_LOG_DEBUG(g_kni_handle->local_logger, "Stream error: stream has dup traffic, dup_traffic_action = bypass, "
"stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_DUP_TRAFFIC], 0, FS_OP_ADD, 1);
pmeinfo->intercept_state=0;
pmeinfo->error = STREAM_ERROR_DUP_TRAFFIC;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
}
}
void *logger = g_kni_handle->local_logger;
int ret;
//only intercept: add to tuple2stream_htable
ret = tuple2stream_htable_add(pmeinfo->addr_type, pktinfo, stream, pmeinfo, thread_seq);
if(ret < 0){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_TUPLE2STM_ADD_FAIL], 0, FS_OP_ADD, 1);
KNI_LOG_DEBUG(logger, "Stream error: tuple2stm add fail, stream traceid = %s, stream addr = %s",
pmeinfo->stream_traceid, pmeinfo->stream_addr);
pmeinfo->intercept_state=0;
pmeinfo->error = STREAM_ERROR_TUPLE2STM_ADD_FAIL;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
//only intercept: add to traceid2pme htable
traceid2pme_htable_add(pmeinfo);
//action = KNI_ACTION_INTERCEPT, sendto tfe
int len = 0;
//add cmsg
char *buff = add_cmsg_to_packet(pmeinfo, stream, pktinfo, &len);
if(buff == NULL){
KNI_LOG_DEBUG(logger, "Stream error: failed at add cmsg to packet, stream traceid = %s, stream addr = %s",
pmeinfo->stream_traceid, pmeinfo->stream_addr);
pmeinfo->error = STREAM_ERROR_CMSG_ADD_FAIL;
pmeinfo->intercept_state=0;
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_CMSG_ADD_FAIL], 0, FS_OP_ADD, 1);
FREE(&buff);
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
//send to tfe
ret = send_to_tfe(buff, len, thread_seq, pmeinfo->tfe_id, pmeinfo->addr_type);
if(ret < 0){
KNI_LOG_DEBUG(logger, "Stream error: failed at send first packet to tfe%d, stream traceid = %s, stream addr = %s",
pmeinfo->tfe_id, pmeinfo->stream_traceid, pmeinfo->stream_addr);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_SENDTO_TFE_FAIL], 0, FS_OP_ADD, 1);
pmeinfo->intercept_state=0;
pmeinfo->error = STREAM_ERROR_SENDTO_TFE_FAIL;
FREE(&buff);
tuple2stream_htable_del(stream, thread_seq);
traceid2pme_htable_del(pmeinfo);
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
else{
KNI_LOG_DEBUG(logger, "Succeed at send first packet to tfe%d, stream traceid = %s, stream addr = %s",
pmeinfo->tfe_id, pmeinfo->stream_traceid, pmeinfo->stream_addr);
}
FREE(&buff);
//fs stat
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_STM], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_BYTE], 0, FS_OP_ADD, pktinfo->ip_totlen);
//ipv4 or ipv6
if(stream->addr.addrtype == ADDR_TYPE_IPV6){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV6_STM], 0, FS_OP_ADD, 1);
}
else{
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV4_STM], 0, FS_OP_ADD, 1);
}
//http or ssl
if(pmeinfo->protocol == KNI_PROTOCOL_SSL){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SSL_STM], 0, FS_OP_ADD, 1);
}
if(pmeinfo->protocol == KNI_PROTOCOL_HTTP){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_HTTP_STM], 0, FS_OP_ADD, 1);
}
//dup_traffic_stm
if(pmeinfo->has_dup_traffic == 1){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_DUP_TFC_STM], 0, FS_OP_ADD, 1);
KNI_LOG_DEBUG(logger, "stream has dup traffic, traceid = %s", pmeinfo->stream_traceid);
}
return APP_STATE_DROPPKT | APP_STATE_GIVEME;
}
static int dabloom_search(struct pkt_info *pktinfo, int thread_seq){
void *logger = g_kni_handle->local_logger;
struct dup_traffic_dabloom_key bloom_key;
memset(&bloom_key, 0, sizeof(bloom_key));
dup_traffic_dabloom_key_get(pktinfo, &bloom_key);
int ret = expiry_dablooms_search(g_kni_handle->threads_handle[thread_seq].dabloom_handle, (const char*)&bloom_key, sizeof(bloom_key));
//ret = 1, = dup packet, bypass the packet
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at expiry_dablooms_search, errmsg = %s", expiry_dablooms_errno_trans((enum expiry_dablooms_errno)ret));
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BLOOM_SEARCH_FAIL], 0, FS_OP_ADD, 1);
}
else{
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BLOOM_SEARCH_SUCC], 0, FS_OP_ADD, 1);
if(ret == 1){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BLOOM_HIT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_DUP_TFC_BYTE], 0, FS_OP_ADD, pktinfo->ip_totlen);
}
else{
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BLOOM_MISS], 0, FS_OP_ADD, 1);
}
}
uint64_t count = 0;
expiry_dablooms_element_count_get(g_kni_handle->threads_handle[thread_seq].dabloom_handle, &count);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->line_ids[0], g_kni_fs_handle->column_ids[thread_seq], FS_OP_SET, count);
return ret;
}
static int dabloom_add(struct pkt_info *pktinfo, int thread_seq){
void *logger = g_kni_handle->local_logger;
struct dup_traffic_dabloom_key bloom_key;
memset(&bloom_key, 0, sizeof(bloom_key));
dup_traffic_dabloom_key_get(pktinfo, &bloom_key);
int ret = expiry_dablooms_add(g_kni_handle->threads_handle[thread_seq].dabloom_handle, (const char*)&bloom_key, sizeof(bloom_key));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at expiry_dablooms_add, errmsg = %s", expiry_dablooms_errno_trans((enum expiry_dablooms_errno)ret));
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BLOOM_ADD_FAIL], 0, FS_OP_ADD, 1);
}
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BLOOM_ADD_SUCC], 0, FS_OP_ADD, 1);
uint64_t count = 0;
expiry_dablooms_element_count_get(g_kni_handle->threads_handle[thread_seq].dabloom_handle, &count);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->line_ids[0], g_kni_fs_handle->column_ids[thread_seq], FS_OP_SET, count);
return ret;
}
static char data_opstate(struct streaminfo *stream, struct pme_info *pmeinfo, const void *a_packet, int thread_seq){
//pmeinfo->tfe_release = 1: intercept, tfe end first. DO NOT droppkt and dropme
if(pmeinfo->tfe_release == 1){
pmeinfo->server_bytes=stream->ptcpdetail->serverbytes;
pmeinfo->client_bytes=stream->ptcpdetail->clientbytes;
pmeinfo->server_pkts=stream->ptcpdetail->serverpktnum;
pmeinfo->client_pkts=stream->ptcpdetail->clientpktnum;
pmeinfo->dir=stream->dir;
#if 0
return APP_STATE_DROPPKT | APP_STATE_DROPME;
#endif
}
void *logger = g_kni_handle->local_logger;
struct iphdr *ipv4_hdr = NULL;
struct ip6_hdr *ipv6_hdr = NULL;
uint16_t len = 0, ret;
char stream_addr[KNI_SYMBOL_MAX] = "";
if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
kni_addr_trans_v6(stream->addr.tuple4_v6, stream_addr, sizeof(stream_addr));
}
else{
kni_addr_trans_v4(stream->addr.tuple4_v4, stream_addr, sizeof(stream_addr));
}
//parse ipv4/6 header
struct pkt_info pktinfo;
ret = wrapped_kni_header_parse(a_packet, pmeinfo, &pktinfo);
if(ret < 0){
pmeinfo->error = STREAM_ERROR_INVALID_IP_HDR;
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_INVALID_IP_HDR], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
//pmeinfo->action has only 3 value: KNI_ACTION_NONE KNI_ACTION_INTERCEPT KNI_ACTION_BYPASS
if(pmeinfo->action != KNI_ACTION_NONE){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_READY_BYTE], 0, FS_OP_ADD, pktinfo.ip_totlen);
}
switch (pmeinfo->action){
case KNI_ACTION_NONE:
break;
case KNI_ACTION_INTERCEPT:
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TX_TFE_BYTE], 0, FS_OP_ADD, pktinfo.ip_totlen);
//search dabloom
if(g_kni_handle->dup_traffic_switch == 1){
if(pmeinfo->has_dup_traffic == 1){
//ret = 1, = dup packet, bypass the packet
ret = dabloom_search(&pktinfo, thread_seq);
if(ret == 1){
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}
}
}
if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
ipv6_hdr = (struct ip6_hdr*)a_packet;
len = ntohs(ipv6_hdr->ip6_ctlun.ip6_un1.ip6_un1_plen) + sizeof(struct ip6_hdr);
}
else{
ipv4_hdr = (struct iphdr*)a_packet;
len = ntohs(ipv4_hdr->tot_len);
}
ret = send_to_tfe((char*)a_packet, len, thread_seq, pmeinfo->tfe_id, pmeinfo->addr_type);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at send continue packet to tfe%d, stream traceid = %s, stream addr = %s",
pmeinfo->tfe_id, pmeinfo->stream_traceid, pmeinfo->stream_addr);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_SENDTO_TFE_FAIL], 0, FS_OP_ADD, 1);
}
else{
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_BYTE], 0, FS_OP_ADD, pktinfo.ip_totlen);
}
return APP_STATE_DROPPKT | APP_STATE_GIVEME;
case KNI_ACTION_BYPASS:
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
default:
assert(0);
break;
}
//first data > 1500, bypass and dropme
if(pktinfo.ip_totlen > KNI_DEFAULT_MTU){
pmeinfo->error = STREAM_ERROR_EXCEED_MTU;
KNI_LOG_DEBUG(logger, "Stream error: first data packet exceed MTU(1500), stream traceid = %s, stream addr = %s",
pmeinfo->stream_traceid, pmeinfo->stream_addr);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_EXCEED_MTU], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
// syn/ack
if(pktinfo.tcphdr->syn && pktinfo.tcphdr->ack){
pmeinfo->server_window = ntohs(pktinfo.tcphdr->window);
pmeinfo->has_syn_ack = 1;
kni_get_tcpopt(&(pmeinfo->server_tcpopt), pktinfo.tcphdr, pktinfo.tcphdr_len);
//dup traffic detect
if(g_kni_handle->dup_traffic_switch == 1){
if(pmeinfo->syn_ack_packet == NULL){
struct dup_traffic_dabloom_key *syn_ack_packet = ALLOC(struct dup_traffic_dabloom_key, 1);
dup_traffic_dabloom_key_get(&pktinfo, syn_ack_packet);
pmeinfo->syn_ack_packet = syn_ack_packet;
}
else{
struct dup_traffic_dabloom_key *syn_ack_packet = ALLOC(struct dup_traffic_dabloom_key, 1);
dup_traffic_dabloom_key_get(&pktinfo, syn_ack_packet);
if(memcmp(pmeinfo->syn_ack_packet, syn_ack_packet, sizeof(*syn_ack_packet)) == 0){
pmeinfo->has_dup_syn_ack = 1;
}
FREE(&(pmeinfo->syn_ack_packet));
pmeinfo->syn_ack_packet = syn_ack_packet;
}
}
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}
//no data, maybe ack
if(pktinfo.data_len <= 0){
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}
//not double dir, bypass and dropme
if(stream->dir != DIR_DOUBLE){
KNI_LOG_DEBUG(logger, "Stream error: asym routing, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_ASYM_ROUTING], 0, FS_OP_ADD, 1);
pmeinfo->error = STREAM_ERROR_ASYM_ROUTING;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
struct protocol_identify_result protocol_identify_res;
memset(&protocol_identify_res, 0, sizeof(protocol_identify_res));
protocol_identify(stream, pktinfo.data, pktinfo.data_len, &protocol_identify_res);
pmeinfo->protocol = protocol_identify_res.protocol;
switch(pmeinfo->protocol){
//can not identify protocol from first data packet, bypass and dropme
case KNI_PROTOCOL_UNKNOWN:
KNI_LOG_DEBUG(logger, "Stream error: failed at protocol_identify, stream traceid = %s, stream addr = %s",
pmeinfo->stream_traceid, pmeinfo->stream_addr);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_UNSUPPORTED_PROTOCOL], 0, FS_OP_ADD, 1);
pmeinfo->error = STREAM_ERROR_UNSUPPORTED_PROTOCOL;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
case KNI_PROTOCOL_SSL:
strncpy(pmeinfo->domain.sni, protocol_identify_res.domain, strnlen(protocol_identify_res.domain, sizeof(pmeinfo->domain.sni) - 1));
break;
case KNI_PROTOCOL_HTTP:
strncpy(pmeinfo->domain.host, protocol_identify_res.domain, strnlen(protocol_identify_res.domain, sizeof(pmeinfo->domain.host) - 1));
break;
default:
break;
}
//receive client hello, but no syn/ack, bypass and dropme
if(pmeinfo->has_syn == 0 || pmeinfo->has_syn_ack == 0){
KNI_LOG_DEBUG(logger, "Stream error: %s, %s, stream traceid = %s, stream addr = %s", pmeinfo->has_syn == 0 ? "no syn" : "have syn",
pmeinfo->has_syn_ack == 0 ? "no syn/ack" : "have syn/ack", pmeinfo->stream_traceid, pmeinfo->stream_addr);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_NO_SYN_ACK], 0, FS_OP_ADD, 1);
pmeinfo->error = STREAM_ERROR_NO_SYN_ACK;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
pmeinfo->action = intercept_policy_scan(g_kni_handle->maat_handle, (struct ipaddr*)(&stream->addr),
protocol_identify_res.domain, protocol_identify_res.domain_len,
thread_seq, &(pmeinfo->policy_id), &(pmeinfo->do_log), &(pmeinfo->maat_hit));
//policy scan log
char *action_str = kni_maat_action_trans(pmeinfo->action);
KNI_LOG_INFO(logger, "intercept_policy_scan: %s, %s, policy_id = %d, action = %d(%s), maat_hit = %d, stream traceid = %s, stream addr = %s",
stream_addr, protocol_identify_res.domain, pmeinfo->policy_id, pmeinfo->action, action_str, pmeinfo->maat_hit, pmeinfo->stream_traceid, pmeinfo->stream_addr);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_READY_STM], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_READY_BYTE], 0, FS_OP_ADD, pktinfo.ip_totlen);
switch(pmeinfo->action){
case KNI_ACTION_BYPASS:
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM_POLICY], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
pmeinfo->intercept_state=0;
return APP_STATE_FAWPKT | APP_STATE_GIVEME; //GIVEME: for session record
case KNI_ACTION_INTERCEPT:
pmeinfo->intercept_state=1;
return first_data_intercept(stream, pmeinfo, &pktinfo, stream_addr, thread_seq);
default:
//action != intercept && action != bypassbypass and dropme
KNI_LOG_DEBUG(logger, "Stream error: action %d(%s) = invalid: policy_id = %d, domain = %s, stream traceid = %s, stream addr = %s",
pmeinfo->action, action_str, pmeinfo->policy_id, protocol_identify_res.domain, pmeinfo->stream_traceid, pmeinfo->stream_addr);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_INVALID_ACTION], 0, FS_OP_ADD, 1);
pmeinfo->error = STREAM_ERROR_INVALID_ACTION;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
}
/*
 * Handle stream close (OP_STATE_CLOSE). No packet is sent to TFE here.
 *
 * Records the end time and final traffic counters, then acts on the stream's
 * decision:
 *   INTERCEPT - looks the pme up in traceid2pme (per the original note this
 *               resets the entry's clock) and removes the tuple2stream entry;
 *   BYPASS    - marks tfe_release, since no TFE side exists for the stream;
 *   otherwise - the stream closed before any decision (only syn/ack, no
 *               data): flagged as STREAM_ERROR_NO_DATA.
 */
static char close_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, int thread_seq){
    //close: a_packet = null, do not sendto tfe
    clock_gettime(CLOCK_REALTIME, &(pmeinfo->end_time));
    void *logger = g_kni_handle->local_logger;
    pmeinfo->dir = stream->dir;
    pmeinfo->client_pkts = stream->ptcpdetail->clientpktnum;
    pmeinfo->server_pkts = stream->ptcpdetail->serverpktnum;
    pmeinfo->client_bytes = stream->ptcpdetail->clientbytes;
    pmeinfo->server_bytes = stream->ptcpdetail->serverbytes;

    if(pmeinfo->action == KNI_ACTION_INTERCEPT){
        //reset clock: when sapp end, start clock
        MESA_htable_search(g_kni_handle->traceid2pme_htable, (const unsigned char*)pmeinfo->stream_traceid,
            strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid)));
        tuple2stream_htable_del(stream, thread_seq);
        return APP_STATE_DROPPKT | APP_STATE_DROPME;
    }
    if(pmeinfo->action == KNI_ACTION_BYPASS){
        pmeinfo->tfe_release = 1;
        return APP_STATE_FAWPKT | APP_STATE_DROPME;
    }
    //stream has only syn, ack. no data.
    char *action_name = kni_maat_action_trans(pmeinfo->action);
    pmeinfo->error = STREAM_ERROR_NO_DATA;
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_NO_DATA], 0, FS_OP_ADD, 1);
    KNI_LOG_DEBUG(logger, "Stream error: close_opstate, action %d(%s) = abnormal, stream traceid = %s, stream addr = %s",
        pmeinfo->action, action_name, pmeinfo->stream_traceid, pmeinfo->stream_addr);
    return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
//from syn
extern "C" char kni_tcpall_entry(struct streaminfo *stream, void** pme, int thread_seq, const void* a_packet){
void *logger = g_kni_handle->local_logger;
int ret;
int can_destroy;
struct pme_info *pmeinfo = *(struct pme_info **)pme;
/* a_packet == NULL && not op_state_close, continue
close: a_packet may be null, if a_packet = null, do not send to tfe
*/
if(a_packet == NULL && stream->pktstate != OP_STATE_CLOSE){
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NULL_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}
switch(stream->pktstate){
case OP_STATE_PENDING:
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STATE_PENDING], 0, FS_OP_ADD, 1);
pmeinfo = ALLOC(struct pme_info, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_NEW_SUCC], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_CNT], 0, FS_OP_ADD, 1);
*pme = pmeinfo;
//stream error: pme init fail
ret = pme_info_init(pmeinfo, stream, thread_seq);
if(ret < 0){
KNI_LOG_DEBUG(logger, "Stream error: fail at pme_info_init, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr);
pmeinfo->error = STREAM_ERROR_PME_INIT_FAIL;
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_PME_INIT_FAIL], 0, FS_OP_ADD, 1);
goto error_out;
}
//stream error: no tfe
pmeinfo->tfe_id = tfe_mgr_alive_node_get(g_kni_handle->_tfe_mgr, thread_seq);
if(pmeinfo->tfe_id < 0){
KNI_LOG_DEBUG(logger, "Stream error: no available tfe, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr);
pmeinfo->error = STREAM_ERROR_NO_TFE;
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STMERR_NO_TFE], 0, FS_OP_ADD, 1);
goto error_out;
}
ret = pending_opstate(stream, pmeinfo, a_packet, thread_seq);
if(pmeinfo->error < 0){
goto error_out;
}
break;
case OP_STATE_DATA:
ret = data_opstate(stream, pmeinfo, a_packet, thread_seq);
//exception stream, dropme and destroy pmeinfo
if(pmeinfo->error < 0){
goto error_out;
}
break;
case OP_STATE_CLOSE:
//sapp stream close
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STATE_CLOSE], 0, FS_OP_ADD, 1);
ret = close_opstate(stream, pmeinfo, thread_seq);
if(pmeinfo->error < 0){
goto error_out;
}
break;
default:
ret = APP_STATE_FAWPKT | APP_STATE_GIVEME;
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STATE_UNKNOWN], 0, FS_OP_ADD, 1);
KNI_LOG_ERROR(logger, "Unknown stream opstate %d, stream traceid = %s, stream addr = %s",
stream->pktstate, pmeinfo->stream_traceid, pmeinfo->stream_addr);
break;
}
//sapp release: bypass or intercept
if((ret & APP_STATE_DROPME)){
can_destroy = judge_stream_can_destroy(pmeinfo, CALLER_SAPP);
if(can_destroy == 1){
if(pmeinfo->action == KNI_ACTION_INTERCEPT){
traceid2pme_htable_del(pmeinfo);
}
stream_destroy(pmeinfo, pmeinfo->do_log);
}
}
return ret;
//error out: stream error, send log and destroy_pme, do not need to del htable
error_out:
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM_ERR], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
if(pmeinfo != NULL){
pmeinfo->policy_id = -1;
stream_destroy(pmeinfo, 1);
}
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
/*
 * Release callback registered with project_producer_register() for the HTTP
 * host record attached to a stream. thread_seq is not used.
 */
void http_project_free(int thread_seq, void *project_req_value){
    (void)thread_seq;
    FREE(&project_req_value);
}
/*
 * Register this plugin as both producer and customer of the HTTP project
 * record. Returns the id from project_customer_register() on success, or -1
 * if either registration fails.
 */
static int http_project_init(){
    void *logger = g_kni_handle->local_logger;

    int producer_id = project_producer_register(HTTP_PROJECT_NAME, PROJECT_VAL_TYPE_STRUCT, http_project_free);
    if(producer_id < 0){
        KNI_LOG_ERROR(logger, "Failed at project_producer_register, project name = %s, ret = %d", HTTP_PROJECT_NAME, producer_id);
        return -1;
    }

    int customer_id = project_customer_register(HTTP_PROJECT_NAME, PROJECT_VAL_TYPE_STRUCT);
    if(customer_id < 0){
        KNI_LOG_ERROR(logger, "Failed at project_customer_register, project name = %s, ret = %d", HTTP_PROJECT_NAME, customer_id);
        return -1;
    }
    return customer_id;
}
/*
 * HTTP plugin entry: capture the Host header of the first HTTP session of a
 * stream and attach it to the stream as a project record.
 *
 * Sessions other than the first are dropped; callbacks that are not the Host
 * field ask for more data. The host is copied with its length capped at
 * KNI_DEFAULT_MTU; if attaching the record fails the copy is freed.
 */
extern "C" char kni_http_entry(stSessionInfo* session_info, void **pme, int thread_seq, struct streaminfo *a_stream, const void *a_packet){
    http_infor* http_info = (http_infor*)(session_info->app_info);
    //only process first http session
    if(http_info->http_session_seq != 1){
        return PROT_STATE_DROPME;
    }
    //keep waiting until the Host field is delivered
    if(session_info->prot_flag != HTTP_HOST){
        return PROT_STATE_GIVEME;
    }
    int host_len = MIN(session_info->buflen, KNI_DEFAULT_MTU);
    struct http_project* host_record = ALLOC(struct http_project, 1);
    memcpy(host_record->host, session_info->buf, host_len);
    host_record->host_len = host_len;
    int attach_ret = project_req_add_struct(a_stream, g_kni_handle->http_project_id, host_record);
    if(attach_ret < 0){
        //the record was not attached, so it is still ours to release
        FREE(&host_record);
    }
    return PROT_STATE_DROPME;
}
/*
 * Tear down a marsio handle: destroy the marsio instance if one exists, then
 * release the handle itself. Accepts NULL.
 */
static void kni_marsio_destroy(struct kni_marsio_handle *handle){
    if(handle != NULL && handle->instance != NULL){
        marsio_destory(handle->instance);
    }
    FREE(&handle);
}
/*
 * Route a frame received from TFE back into its original stream.
 *
 * The IP packet behind the ethernet header is parsed, its 4-tuple is looked
 * up in the given tuple2stream table and, on a hit, the packet is injected
 * into the stored stream. The injection direction is the stored route
 * direction, reversed when the packet's key orientation differs from the one
 * recorded at insert time. For streams flagged with duplicate traffic the
 * packet fingerprint is also added to the bloom filter.
 *
 * Returns 0 on success, -1 on any failure (non-IP frame, parse error, table
 * miss, injection failure or bloom-filter add failure).
 */
int tuple2stream_htable_search(MESA_htable_handle handle, struct ethhdr *ether_hdr, int thread_seq){
    void *logger = g_kni_handle->local_logger;
    if(ether_hdr->h_proto != htons(ETH_P_IP) && ether_hdr->h_proto != htons(ETH_P_IPV6)){
        return -1;
    }
    void *raw_packet = (char*)ether_hdr + sizeof(*ether_hdr);
    tuple2stream_htable_value *value = NULL;
    struct pkt_info pktinfo;
    int reversed = 0, ret;
    int key_size = 0;
    char key_str[KNI_ADDR_MAX];
    //ipv6
    if(ether_hdr->h_proto == htons(ETH_P_IPV6)){
        ret = kni_ipv6_header_parse(raw_packet, &pktinfo);
        if(ret < 0){
            char *errmsg = kni_ipv6_errmsg_get((enum kni_ipv6hdr_parse_error)ret);
            KNI_LOG_ERROR(logger, "failed at parse ipv6 header, errmsg = %s", errmsg);
            return -1;
        }
        struct stream_tuple4_v6 key;
        key_size = sizeof(key);
        //build the key first: the printable form must be derived from a filled key
        tuple2stream_htable_key_get_v6_by_packet(&pktinfo, &key, &reversed);
        kni_addr_trans_v6(&key, key_str, sizeof(key_str));
        value = (tuple2stream_htable_value*)MESA_htable_search(handle, (const unsigned char*)(&key), key_size);
    }
    //ipv4
    else{
        ret = kni_ipv4_header_parse(raw_packet, &pktinfo);
        if(ret < 0){
            char *errmsg = kni_ipv4_errmsg_get((enum kni_ipv4hdr_parse_error)ret);
            KNI_LOG_ERROR(logger, "failed at parse ipv4 header, errmsg = %s", errmsg);
            return -1;
        }
        struct stream_tuple4_v4 key;
        key_size = sizeof(key);
        tuple2stream_htable_key_get_v4_by_packet(&pktinfo, &key, &reversed);
        kni_addr_trans_v4(&key, key_str, sizeof(key_str));
        value = (tuple2stream_htable_value*)MESA_htable_search(handle, (const unsigned char*)(&key), key_size);
    }
    if(value == NULL){
        KNI_LOG_DEBUG(logger, "MESA_htable: search not hit, table is tuple2stream_htable, key = %s, key_size = %d", key_str, key_size);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_SEARCH_MISS], 0, FS_OP_ADD, 1);
        return -1;
    }
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TUPLE2STM_SEARCH_HIT], 0, FS_OP_ADD, 1);
    //flip the direction when this packet's key orientation differs from the stored one
    unsigned char dir = value->route_dir;
    if(reversed != value->reversed){
        dir = MESA_dir_reverse(dir);
    }
    ret = sapp_inject_pkt(value->stream, SIO_EXCLUDE_THIS_LAYER_HDR, raw_packet, pktinfo.ip_totlen, dir);
    if(ret < 0){
        KNI_LOG_ERROR(logger, "Failed at sapp_inject_pkt, stream addr = %s", key_str);
        FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SAPP_INJECT_FAIL], 0, FS_OP_ADD, 1);
        return -1;
    }
    FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SAPP_INJECT_SUCC], 0, FS_OP_ADD, 1);
    //add to dabloom
    if(g_kni_handle->dup_traffic_switch == 1){
        if(value->pmeinfo->has_dup_traffic == 1){
            ret = dabloom_add(&pktinfo, thread_seq);
            if(ret < 0){
                return -1;
            }
        }
    }
    return 0;
}
extern "C" char kni_polling_all_entry(const struct streaminfo *stream, void** pme, int thread_seq, const void* a_packet){
void *logger = g_kni_handle->local_logger;
MESA_htable_handle tuple2stream_htable = g_kni_handle->threads_handle[thread_seq].tuple2stream_htable;
int flag = 0;
//normal mode
if(g_kni_handle->deploy_mode == KNI_DEPLOY_MODE_NORMAL){
//polling tfe
for(int i = 0; i < g_kni_handle->marsio_handle->tfe_enabled_node_count; i++){
marsio_buff_t *rx_buffs[BURST_MAX];
int nr_burst = 1;
struct mr_vdev *dev_eth_handler = g_kni_handle->marsio_handle->tfe_enabled_nodes[i].dev_eth_handler;
//receive from tfe, nr_recv <= nr_burst <= BURST_MAX
int nr_recv = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buffs, nr_burst);
if(nr_recv <= 0){
continue;
}
for(int j = 0; j < nr_recv; j++){
struct ethhdr *ether_hdr = (struct ethhdr*)marsio_buff_mtod(rx_buffs[j]);
tuple2stream_htable_search(tuple2stream_htable, ether_hdr, thread_seq);
flag = 1;
}
marsio_buff_free(g_kni_handle->marsio_handle->instance, rx_buffs, nr_recv, 0, 0);
}
}
//tun mode
else{
char buff[KNI_MTU];
int ret = kni_tun_read(g_kni_handle->tun_handle, buff, sizeof(buff));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at read from tun");
}
else{
if(ret > 0){
struct ethhdr *ether_hdr = (struct ethhdr*)buff;
tuple2stream_htable_search(tuple2stream_htable, ether_hdr, thread_seq);
flag = 1;
}
}
}
return flag;
}
/*
 * Fetch one typed value from a TFE control message and copy it into the
 * matching pmeinfo field.
 *
 * value_size_max is the capacity of the destination field; a larger value is
 * rejected. Returns 0 when the value was fetched (types without a matching
 * field are accepted and ignored), -1 if kni_cmsg_get() fails or the value is
 * too large. Only the KNI_CMSG_INVALID_TYPE failure is logged.
 */
static int wrapped_kni_cmsg_get(struct pme_info *pmeinfo, struct kni_cmsg *cmsg, uint16_t type,
        uint16_t value_size_max, void *logger){
    uint16_t value_size = 0;
    unsigned char *value = NULL;
    int get_ret = kni_cmsg_get(cmsg, type, &value_size, &value);
    if(get_ret < 0){
        if(get_ret == KNI_CMSG_INVALID_TYPE){
            KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type = %d, ret = %d, stream traceid = %s, stream addr = %s",
                type, get_ret, pmeinfo->stream_traceid, pmeinfo->stream_addr);
        }
        return -1;
    }
    if(value_size > value_size_max){
        KNI_LOG_ERROR(logger, "kni_cmsg_get: type = %d, size = %d, which should <= %d, stream traceid = %s, stream addr = %s",
            type, value_size, value_size_max, pmeinfo->stream_traceid, pmeinfo->stream_addr);
        return -1;
    }
    //pick the destination field for this message type, then copy once
    void *dest = NULL;
    switch(type)
    {
        case TFE_CMSG_SSL_INTERCEPT_STATE:
            dest = (char*)&(pmeinfo->intercept_state);
            break;
        case TFE_CMSG_SSL_UPSTREAM_LATENCY:
            dest = (char*)&(pmeinfo->ssl_server_side_latency);
            break;
        case TFE_CMSG_SSL_DOWNSTREAM_LATENCY:
            dest = (char*)&(pmeinfo->ssl_client_side_latency);
            break;
        case TFE_CMSG_SSL_UPSTREAM_VERSION:
            dest = pmeinfo->ssl_server_side_version;
            break;
        case TFE_CMSG_SSL_DOWNSTREAM_VERSION:
            dest = pmeinfo->ssl_client_side_version;
            break;
        case TFE_CMSG_SSL_PINNING_STATE:
            dest = (char*)&(pmeinfo->pinningst);
            break;
        case TFE_CMSG_SSL_CERT_VERIFY:
            dest = (char*)&(pmeinfo->ssl_cert_verify);
            break;
        case TFE_CMSG_SSL_ERROR:
            dest = (char*)&(pmeinfo->ssl_error);
            break;
        default:
            break;
    }
    if(dest != NULL){
        memcpy(dest, value, value_size);
    }
    return 0;
}
static long traceid2pme_htable_search_cb(void *data, const uchar *key, uint size, void *user_args){
struct traceid2pme_search_cb_args *args = (struct traceid2pme_search_cb_args*)user_args;
void *logger = args->logger;
struct kni_cmsg *cmsg = args->cmsg;
struct pme_info *pmeinfo = (struct pme_info*)data;
int can_destroy;
if(pmeinfo != NULL){
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, sizeof(pmeinfo->intercept_state), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_UPSTREAM_LATENCY, sizeof(pmeinfo->ssl_server_side_latency), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_DOWNSTREAM_LATENCY, sizeof(pmeinfo->ssl_client_side_latency), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_UPSTREAM_VERSION, sizeof(pmeinfo->ssl_server_side_version) - 1, logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_DOWNSTREAM_VERSION, sizeof(pmeinfo->ssl_client_side_version) - 1, logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_PINNING_STATE, sizeof(pmeinfo->pinningst), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_CERT_VERIFY, sizeof(pmeinfo->ssl_cert_verify), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_ERROR, sizeof(pmeinfo->ssl_error), logger);
clock_gettime(CLOCK_REALTIME, &(pmeinfo->end_time));
KNI_LOG_DEBUG(logger, "recv cmsg from tfe, stream traceid = %s, stream addr = %s", pmeinfo->stream_traceid, pmeinfo->stream_addr);
can_destroy = judge_stream_can_destroy(pmeinfo, CALLER_TFE);
if(can_destroy == 1){
traceid2pme_htable_del(pmeinfo);
stream_destroy(pmeinfo, pmeinfo->do_log);
}
}
kni_cmsg_destroy(cmsg);
return 0;
}
static void* thread_tfe_cmsg_receiver(void *args){
struct thread_tfe_cmsg_receiver_args *_args = (struct thread_tfe_cmsg_receiver_args*)args;
const char *profile = _args->profile;
const char *section = "tfe_cmsg_receiver";
void *logger = _args->logger;
char listen_eth[KNI_SYMBOL_MAX];
uint32_t listen_ip;
int listen_port = -1;
char buff[KNI_MTU];
int sockfd = 0;
struct sockaddr_in server_addr, client_addr;
int ret = MESA_load_profile_string_nodef(profile, section, "listen_eth", listen_eth, sizeof(listen_eth));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: listen_eth not set, profile = %s, section = %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_int_nodef(profile, section, "listen_port", &listen_port);
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: listen_port not set, profile = %s, section = %s", profile, section);
goto error_out;
}
KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n listen_eth: %s\n listen_port: %d",
section, listen_eth, listen_port);
FREE(&args);
//create socket
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if(sockfd < 0){
KNI_LOG_ERROR(logger, "Failed at create udp socket, errno = %d, %s", errno, strerror(errno));
goto error_out;
}
memset(&server_addr, 0, sizeof(server_addr));
memset(&client_addr, 0, sizeof(client_addr));
ret = kni_ipv4_addr_get_by_eth(listen_eth, &listen_ip);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at get bind ipv4 addr, eth = %s", listen_eth);
goto error_out;
}
server_addr.sin_family = AF_INET; // IPv4
server_addr.sin_addr.s_addr = listen_ip;
server_addr.sin_port = htons(listen_port);
//bind
ret = bind(sockfd, (const struct sockaddr *)&server_addr, sizeof(server_addr));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at bind udp socket, errno = %d, %s", errno, strerror(errno));
goto error_out;
}
//receive
while(true){
socklen_t client_len = sizeof(client_addr);
int recv_len = recvfrom(sockfd, (char *)buff, sizeof(buff), MSG_WAITALL,
(struct sockaddr*)&client_addr, &client_len);
if(recv_len < 0){
KNI_LOG_ERROR(logger, "Failed at recv udp data, errno = %d, %s", errno, strerror(errno));
continue;
}
//KNI_LOG_DEBUG(logger, "recv udp data: recv_len = %d\n", recv_len);
struct kni_cmsg *cmsg = NULL;
ret = kni_cmsg_deserialize((const unsigned char*)buff, recv_len, &cmsg);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at deserialize cmsg, ret = %d", ret);
continue;
}
//get stream_traceid
unsigned char *stream_traceid = NULL;
uint16_t value_size;
ret = kni_cmsg_get(cmsg, TFE_CMSG_STREAM_TRACE_ID, &value_size, &stream_traceid);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type = %d, ret = %d", TFE_CMSG_STREAM_TRACE_ID, ret);
continue;
}
//get pme
long cb_ret = -1;
struct traceid2pme_search_cb_args cb_args;
memset((void*)&cb_args, 0, sizeof(cb_args));
cb_args.cmsg = cmsg;
cb_args.logger = logger;
MESA_htable_search_cb(g_kni_handle->traceid2pme_htable, (const unsigned char *)stream_traceid,
value_size, traceid2pme_htable_search_cb, &cb_args, &cb_ret);
}
return NULL;
error_out:
if(sockfd >= 0){
close(sockfd);
}
return NULL;
}
static struct kni_marsio_handle* kni_marsio_init(const char* profile, int tfe_node_count){
void *logger = g_kni_handle->local_logger;
const char* section = "marsio";
char appsym[KNI_SYMBOL_MAX];
unsigned int opt_value = 1;
int tfe_node_enabled;
struct mr_instance *mr_inst = NULL;
struct mr_vdev *dev_eth_handler = NULL;
struct mr_sendpath *dev_eth_sendpath = NULL;
struct kni_marsio_handle *handle = NULL;
int j;
int ret = MESA_load_profile_string_nodef(profile, section, "appsym", appsym, sizeof(appsym));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: appsym not set, profile = %s, section = %s", profile, section);
goto error_out;
}
KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n appsym: %s", section, appsym);
mr_inst = marsio_create();
if(mr_inst == NULL){
KNI_LOG_ERROR(logger, "Failed at create marsio instance");
goto error_out;
}
handle = ALLOC(struct kni_marsio_handle, 1);
handle->instance = mr_inst;
marsio_option_set(mr_inst, MARSIO_OPT_EXIT_WHEN_ERR, &opt_value, sizeof(opt_value));
marsio_init(mr_inst, appsym);
j = 0;
for(int i = 0; i < tfe_node_count; i++){
//load tfe conf
char _section[KNI_SYMBOL_MAX];
char dev_eth_symbol[KNI_SYMBOL_MAX];
snprintf(_section, sizeof(_section), "tfe%d", i);
MESA_load_profile_int_def(profile, _section, "enabled", &tfe_node_enabled, 1);
if(tfe_node_enabled != 1){
continue;
}
struct tfe_enabled_node tfe_node;
memset(&tfe_node, 0, sizeof(tfe_node));
ret = MESA_load_profile_string_nodef(profile, _section, "dev_eth_symbol", dev_eth_symbol, sizeof(dev_eth_symbol));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: dev_eth_symbol not set, profile = %s, section = %s", profile, _section);
goto error_out;
}
KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n enabled: %d\n dev_eth_symbol: %s",
_section, tfe_node_enabled, dev_eth_symbol);
//eth_handler receive thread = thread_count, send thread = thread_count
dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, g_kni_handle->thread_count, g_kni_handle->thread_count);
if(dev_eth_handler == NULL){
KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol = %s", dev_eth_symbol);
goto error_out;
}
//sendpath
dev_eth_sendpath = marsio_sendpath_create_by_vdev(dev_eth_handler);
if(dev_eth_sendpath == NULL){
KNI_LOG_ERROR(logger, "Failed at create marsio sendpath, dev_symbol = %s", dev_eth_symbol);
goto error_out;
}
//tfe_node
tfe_node.dev_eth_handler = dev_eth_handler;
tfe_node.dev_eth_sendpath = dev_eth_sendpath;
tfe_node.tfe_id = i;
handle->tfe_enabled_nodes[j++] = tfe_node;
}
handle->tfe_enabled_node_count = j;
//marsio_thread_init(mr_instance);
return handle;
error_out:
kni_marsio_destroy(handle);
return NULL;
}
/*
 * Releases a field_stat wrapper: calls FS_stop on the wrapped handle when fs_handle is
 * not NULL, then passes fs_handle to FREE. A NULL fs_handle skips the FS_stop call.
 */
static void fs_destroy(struct kni_field_stat_handle *fs_handle){
    if(NULL != fs_handle){
        FS_stop(&fs_handle->handle);
    }
    FREE(&fs_handle);
}
/*
 * Creates and starts the field_stat handle and registers every kni field, column and line.
 * profile: config file; reads [field_stat] remote_switch, local_path and, when
 *          remote_switch == 1, remote_ip / remote_port (both mandatory in that case)
 * Uses g_kni_handle (logger, deploy_mode, marsio_handle, thread_count), so those must be set first.
 * return: the new kni_field_stat_handle, or NULL on failure
 * NOTE(review): every goto error_out happens before fs_handle is allocated, so fs_destroy(NULL)
 * does not call FS_stop on `handle` there - confirm whether that handle needs an explicit release.
 */
static struct kni_field_stat_handle * fs_init(const char *profile){
    void *logger = g_kni_handle->local_logger;
    const char *section = "field_stat";
    char local_path[KNI_PATH_MAX];
    struct kni_field_stat_handle *fs_handle = NULL;
    screen_stat_handle_t handle = NULL;
    const char *app_name = "fs2_kni";
    int value = 0, ret;
    int remote_switch = 0;
    char remote_ip[INET_ADDRSTRLEN];
    int remote_port;
    MESA_load_profile_int_def(profile, section, "remote_switch", &remote_switch, 0);
    MESA_load_profile_string_def(profile, section, "local_path", local_path, sizeof(local_path), "./fs2_kni.status");
    KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n remote_switch: %d\n local_path: %s", section, remote_switch, local_path);
    handle = FS_create_handle();
    if(handle == NULL){
        KNI_LOG_ERROR(logger, "Failed at create FS_create_handle");
        goto error_out;
    }
    if(remote_switch == 1){
        ret = MESA_load_profile_string_nodef(profile, section, "remote_ip", remote_ip, sizeof(remote_ip));
        if(ret < 0){
            KNI_LOG_ERROR(logger, "MESA_prof_load: remote_ip not set, profile is %s, section is %s", profile, section);
            goto error_out;
        }
        ret = MESA_load_profile_int_nodef(profile, section, "remote_port", &remote_port);
        if(ret < 0){
            KNI_LOG_ERROR(logger, "MESA_prof_load: remote_port not set, profile is %s, section is %s", profile, section);
            goto error_out;
        }
        KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n remote_ip: %s\n remote_port: %d", section, remote_ip, remote_port);
        FS_set_para(handle, STATS_SERVER_IP, remote_ip, strlen(remote_ip));
        FS_set_para(handle, STATS_SERVER_PORT, &remote_port, sizeof(remote_port));
        value=FS_OUTPUT_STATSD;
        FS_set_para(handle, STATS_FORMAT, &value, sizeof(value));
    }
    //single allocation of the wrapper (a second ALLOC used to overwrite and leak this one)
    fs_handle = ALLOC(struct kni_field_stat_handle, 1);
    fs_handle->handle = handle;
    FS_set_para(handle, APP_NAME, app_name, strlen(app_name) + 1);
    FS_set_para(handle, OUTPUT_DEVICE, local_path, strlen(local_path)+1);
    value = 0;
    FS_set_para(handle, FLUSH_BY_DATE, &value, sizeof(value));
    value = 1;
    FS_set_para(handle, PRINT_MODE, &value, sizeof(value));
    value = 1;
    FS_set_para(handle, CREATE_THREAD, &value, sizeof(value));
    value = 5;
    FS_set_para(handle, STAT_CYCLE, &value, sizeof(value));
    value = 4096;
    FS_set_para(handle, MAX_STAT_FIELD_NUM, &value, sizeof(value));
    //bypass stream
    fs_handle->fields[KNI_FIELD_BYP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_stm");
    fs_handle->fields[KNI_FIELD_BYP_STM_POLICY] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_policy");
    fs_handle->fields[KNI_FIELD_BYP_STM_ERR] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_stm_err");
    //stream error
    fs_handle->fields[KNI_FIELD_STMERR_ASYM_ROUTING] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_asym_route");
    fs_handle->fields[KNI_FIELD_STMERR_NO_SYN] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_no_syn");
    fs_handle->fields[KNI_FIELD_STMERR_NO_SYN_ACK] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_no_s/a");
    fs_handle->fields[KNI_FIELD_STMERR_NO_DATA] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_no_data");
    fs_handle->fields[KNI_FIELD_STMERR_UNSUPPORTED_PROTOCOL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_unspt_pro");
    fs_handle->fields[KNI_FIELD_STMERR_INVALID_IP_HDR] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_ip_hdr");
    fs_handle->fields[KNI_FIELD_STMERR_EXCEED_MTU] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_exc_mtu");
    //stream error: internal error
    fs_handle->fields[KNI_FIELD_STMERR_INVALID_ACTION] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_invaid_act");
    fs_handle->fields[KNI_FIELD_STMERR_SENDTO_TFE_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_tfe_tx");
    //NOTE(review): "e_tup2stm_add" is also the name of KNI_FIELD_TUPLE2STM_ADD_FAIL below - confirm the duplicate is intended
    fs_handle->fields[KNI_FIELD_STMERR_TUPLE2STM_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_tup2stm_add");
    fs_handle->fields[KNI_FIELD_STMERR_NO_TFE] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_no_tfe");
    fs_handle->fields[KNI_FIELD_STMERR_PME_INIT_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_pme_init");
    fs_handle->fields[KNI_FIELD_STMERR_DUP_TRAFFIC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_dup_tfc");
    fs_handle->fields[KNI_FIELD_STMERR_CMSG_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_cmsg_add");
    //intercept stream
    fs_handle->fields[KNI_FIELD_INTCP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_stm");
    fs_handle->fields[KNI_FIELD_INTCP_BYTE] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_B");
    fs_handle->fields[KNI_FIELD_IPV4_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ipv4_stm");
    fs_handle->fields[KNI_FIELD_IPV6_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ipv6_stm");
    fs_handle->fields[KNI_FIELD_SSL_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ssl_stm");
    fs_handle->fields[KNI_FIELD_HTTP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "http_stm");
    fs_handle->fields[KNI_FIELD_DUP_TFC_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "dup_tfc_stm");
    fs_handle->fields[KNI_FIELD_DUP_TFC_BYTE] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "dup_tfc_B");
    //intercept ready stream
    fs_handle->fields[KNI_FIELD_INTCP_READY_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_rdy_stm");
    fs_handle->fields[KNI_FIELD_INTCP_READY_BYTE] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_rdy_B");
    //pme
    fs_handle->fields[KNI_FIELD_PME_NEW_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "pme_new");
    fs_handle->fields[KNI_FIELD_PME_FREE] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "pme_free");
    fs_handle->fields[KNI_FIELD_PME_CNT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "pme_cnt");
    //errors
    fs_handle->fields[KNI_FIELD_SENDLOG_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_sendlog");
    fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_id2pme_add");
    fs_handle->fields[KNI_FIELD_ID2PME_DEL_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_id2pme_del");
    fs_handle->fields[KNI_FIELD_TUPLE2STM_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_tup2stm_add");
    fs_handle->fields[KNI_FIELD_TUPLE2STM_DEL_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_tup2stm_del");
    fs_handle->fields[KNI_FIELD_SAPP_INJECT_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_sapp_inject");
    fs_handle->fields[KNI_FIELD_BLOOM_SEARCH_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_bloom_srch");
    fs_handle->fields[KNI_FIELD_BLOOM_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "e_bloom_add");
    //htable
    fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_add_S");
    fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_del_S");
    fs_handle->fields[KNI_FIELD_ID2PME_CNT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_cnt");
    fs_handle->fields[KNI_FIELD_TUPLE2STM_ADD_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tup2stm_add_S");
    fs_handle->fields[KNI_FIELD_TUPLE2STM_DEL_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tup2stm_del_S");
    fs_handle->fields[KNI_FIELD_TUPLE2STM_SEARCH_HIT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tup2stm_hit");
    fs_handle->fields[KNI_FIELD_TUPLE2STM_SEARCH_MISS] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tup2stm_miss");
    //sendlog
    fs_handle->fields[KNI_FIELD_SENDLOG_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sendlog_S");
    //sapp_inject
    fs_handle->fields[KNI_FIELD_SAPP_INJECT_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sapp_inject_S");
    //dabloom
    fs_handle->fields[KNI_FIELD_BLOOM_HIT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "bloom_hit");
    fs_handle->fields[KNI_FIELD_BLOOM_MISS] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "bloom_miss");
    //one status field per enabled tfe node, named after its tfe_id
    if(g_kni_handle->deploy_mode == KNI_DEPLOY_MODE_NORMAL){
        for(int i = 0; i < g_kni_handle->marsio_handle->tfe_enabled_node_count; i++){
            int tfe_id = g_kni_handle->marsio_handle->tfe_enabled_nodes[i].tfe_id;
            char tfe_status[KNI_SYMBOL_MAX] = "";
            snprintf(tfe_status, sizeof(tfe_status), "tfe%d", tfe_id);
            fs_handle->fields[KNI_FIELD_TFE_STATUS_BASE + i] = FS_register(handle, FS_STYLE_STATUS, FS_CALC_CURRENT, tfe_status);
        }
    }
    //table: one column per thread plus a "Total" column
    char buff[KNI_PATH_MAX];
    for(int i = 0; i < g_kni_handle->thread_count; i++){
        snprintf(buff, sizeof(buff), "tid%d", i);
        fs_handle->column_ids[i] = FS_register(handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff);
    }
    snprintf(buff, sizeof(buff), "Total");
    fs_handle->column_ids[g_kni_handle->thread_count] = FS_register(handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff);
    //lines
    snprintf(buff, sizeof(buff), "bloom_cnt");
    fs_handle->line_ids[0] = FS_register(handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff);
    snprintf(buff, sizeof(buff), "tuple2stm_cnt");
    fs_handle->line_ids[1] = FS_register(handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff);
    FS_start(handle);
    return fs_handle;
error_out:
    fs_destroy(fs_handle);
    return NULL;
}
/*
 * Passes the kni_handle structure itself to FREE. Nothing reachable through the handle
 * is released here. handle is received by value, so the caller's pointer is left unchanged.
 */
extern "C" void kni_destroy(struct kni_handle *handle){
    FREE(&handle);
}
//eliminate_type: 0:FIFO; 1:LRU
//ret: 1: the item can be eliminated; 0: the item can't be eliminated
static int traceid2pme_htable_expire_notify_cb(void *data, int eliminate_type){
struct pme_info *pmeinfo = (struct pme_info*)data;
int can_destroy;
if(pmeinfo->sapp_release == 1){
can_destroy = judge_stream_can_destroy(pmeinfo, CALLER_TFE);
if(can_destroy == 1){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_CNT], 0, FS_OP_ADD, -1);
stream_destroy(pmeinfo, pmeinfo->do_log);
return 1;
}
}
return 0;
}
/*
 * Data-free callback passed to kni_create_htable for each tuple2stream_htable.
 * data: the value stored in the table item; it is handed to FREE.
 */
static void tuple2stream_htable_data_free_cb(void *data){
FREE(&data);
}
int dup_traffic_dabloom_init(const char *profile, void *logger){
const char *section = "dup_traffic";
MESA_load_profile_int_def(profile, section, "switch", &(g_kni_handle->dup_traffic_switch), 0);
KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n switch: %d", section, g_kni_handle->dup_traffic_switch);
if(g_kni_handle->dup_traffic_switch == 1){
unsigned int capacity = 0;
char error_rate_str[KNI_SYMBOL_MAX];
double error_rate = 0.05;
int expiry_time = 0;
MESA_load_profile_int_def(profile, section, "action", &(g_kni_handle->dup_traffic_action), KNI_ACTION_BYPASS);
MESA_load_profile_uint_def(profile, section, "capacity", &capacity, 1000000);
MESA_load_profile_string_def(profile, section, "error_rate", error_rate_str, sizeof(error_rate_str), "0.05");
MESA_load_profile_int_def(profile, section, "expiry_time", &expiry_time, 30);
KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n action: %d\n capacity: %d\n error_rate: %s\n expiry_time: %d",
section, g_kni_handle->dup_traffic_action, capacity, error_rate_str, expiry_time);
error_rate = atof(error_rate_str);
for(int i = 0; i < g_kni_handle->thread_count; i++){
struct expiry_dablooms_handle* dabloom_handle = expiry_dablooms_init(capacity, error_rate, expiry_time);
if(dabloom_handle == NULL){
KNI_LOG_ERROR(logger, "Failed at expiry_dablooms_init, capacity = %d,"
"error_rate = %lf, expire_time = %d", capacity, error_rate, expiry_time);
return -1;
}
g_kni_handle->threads_handle[i].dabloom_handle = dabloom_handle;
}
return 0;
}
return 0;
}
/*
void my_handler(int s){
printf("Caught signal %d\n",s);
exit(1);
}
int register_signal_handle(){
struct sigaction sigIntHandler;
sigIntHandler.sa_handler = my_handler;
sigemptyset(&sigIntHandler.sa_mask);
sigIntHandler.sa_flags = 0;
sigaction(SIGINT, &sigIntHandler, NULL);
return 0;
}
*/
/*
 * Plugin initialisation. Reads ./etc/kni/kni.conf ([global] and the sections used by the
 * sub-modules), allocates g_kni_handle and initialises, in this order: logger, deploy mode,
 * mac addresses, http project, thread count, marsio (normal mode) or tun (tun mode), maat,
 * field_stat, local ipv4, send logger, traceid2pme_htable, one tuple2stream_htable per thread,
 * dabloom, tfe_mgr, and finally starts the thread_tfe_cmsg_receiver thread.
 * return: 0 on success. On any failure the function does not return: it calls
 *         kni_destroy(g_kni_handle) and exit(0).
 * NOTE(review): the failure path exits with status 0 - confirm whether a non-zero status is wanted.
 */
extern "C" int kni_init(){
    //register_signal_handle();
    char *kni_git_verison = (char*)KNI_GIT_VERSION;
    const char *profile = "./etc/kni/kni.conf";
    const char *section = "global";
    //init logger
    char log_path[KNI_PATH_MAX] = "";
    int tfe_node_count = 1;
    char manage_eth[KNI_SYMBOL_MAX] = "";
    struct kni_send_logger *send_logger = NULL;
    struct kni_field_stat_handle *fs_handle = NULL;
    int id = -1;
    void *local_logger = NULL;
    int log_level = -1;
    pthread_t thread_id = -1;
    struct thread_tfe_cmsg_receiver_args *cmsg_receiver_args = NULL;
    MESA_htable_handle traceid2pme_htable = NULL;
    struct tfe_mgr *_tfe_mgr = NULL;
    //no logger exists yet, so these first failures are reported on stdout
    int ret = MESA_load_profile_string_nodef(profile, section, "log_path", log_path, sizeof(log_path));
    if(ret < 0){
        printf("MESA_prof_load: log_path not set, profile = %s, section = %s", profile, section);
        goto error_out;
    }
    ret = MESA_load_profile_int_nodef(profile, section, "log_level", &log_level);
    if(ret < 0){
        printf("MESA_prof_load: log_level not set, profile = %s, section = %s", profile, section);
        goto error_out;
    }
    local_logger = MESA_create_runtime_log_handle(log_path, log_level);
    if (unlikely(local_logger == NULL)){
        printf("Failed at create logger: %s", log_path);
        goto error_out;
    }
    g_kni_handle = ALLOC(struct kni_handle, 1);
    g_kni_handle->local_logger = local_logger;
    //kni_git_log
    KNI_LOG_ERROR(local_logger, "----------kni version = %s-----------", kni_git_verison);
    //deploy_mode: "tun" selects tun mode, anything else means normal mode
    char deploy_mode[KNI_SYMBOL_MAX];
    ret = MESA_load_profile_string_def(profile, section, "deploy_mode", deploy_mode, sizeof(deploy_mode), "normal");
    g_kni_handle->deploy_mode = KNI_DEPLOY_MODE_NORMAL;
    if(strcmp(deploy_mode, "tun") == 0){
        g_kni_handle->deploy_mode = KNI_DEPLOY_MODE_TUN;
    }
    //tfe_node_count is only read in normal mode and must be within 1..TFE_COUNT_MAX
    if(g_kni_handle->deploy_mode == KNI_DEPLOY_MODE_NORMAL){
        ret = MESA_load_profile_int_nodef(profile, section, "tfe_node_count", &tfe_node_count);
        if(ret < 0){
            KNI_LOG_ERROR(local_logger, "MESA_prof_load: tfe_node_count not set, profile = %s, section = %s", profile, section);
            goto error_out;
        }
        if(tfe_node_count > TFE_COUNT_MAX){
            KNI_LOG_ERROR(local_logger, "tfe_node_count = %d, exceed the max_tfe_node_count %d", tfe_node_count, TFE_COUNT_MAX);
            goto error_out;
        }
        if(tfe_node_count <= 0){
            KNI_LOG_ERROR(local_logger, "tfe_node_count = %d, <= 0", tfe_node_count);
            goto error_out;
        }
    }
    ret = MESA_load_profile_string_nodef(profile, section, "manage_eth", manage_eth, sizeof(manage_eth));
    if(ret < 0){
        //the logger exists by now, report through it like the neighbouring checks
        KNI_LOG_ERROR(local_logger, "MESA_prof_load: manage_eth not set, profile = %s, section = %s", profile, section);
        goto error_out;
    }
    char src_mac_addr_str[KNI_SYMBOL_MAX];
    char dst_mac_addr_str[KNI_SYMBOL_MAX];
    ret = MESA_load_profile_string_nodef(profile, section, "src_mac_addr", src_mac_addr_str, sizeof(src_mac_addr_str));
    if(ret < 0){
        KNI_LOG_ERROR(local_logger, "MESA_prof_load: src_mac_addr not set, profile = %s, section = %s", profile, section);
        goto error_out;
    }
    ret = MESA_load_profile_string_nodef(profile, section, "dst_mac_addr", dst_mac_addr_str, sizeof(dst_mac_addr_str));
    if(ret < 0){
        KNI_LOG_ERROR(local_logger, "MESA_prof_load: dst_mac_addr not set, profile = %s, section = %s", profile, section);
        goto error_out;
    }
    KNI_LOG_ERROR(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_node_count: %d\n manage_eth: %s\n deploy_mode: %s\n"
        "src_mac_addr: %s\n dst_mac_addr: %s", section, log_path, log_level, tfe_node_count, manage_eth, deploy_mode, src_mac_addr_str, dst_mac_addr_str);
    //ff:ee:dd:cc:bb:aa ---> 0xff 0xee 0xdd 0xcc 0xbb 0xaa
    ret = sscanf(src_mac_addr_str, "%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx",
        &(g_kni_handle->src_mac_addr[0]), &(g_kni_handle->src_mac_addr[1]),
        &(g_kni_handle->src_mac_addr[2]), &(g_kni_handle->src_mac_addr[3]),
        &(g_kni_handle->src_mac_addr[4]), &(g_kni_handle->src_mac_addr[5]));
    if(ret != 6){
        KNI_LOG_ERROR(local_logger, "MESA_prof_load: src_mac_addr = invalid, ret = %d, profile = %s, section = %s", ret, profile, section);
        goto error_out;
    }
    ret = sscanf(dst_mac_addr_str, "%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx",
        &(g_kni_handle->dst_mac_addr[0]), &(g_kni_handle->dst_mac_addr[1]),
        &(g_kni_handle->dst_mac_addr[2]), &(g_kni_handle->dst_mac_addr[3]),
        &(g_kni_handle->dst_mac_addr[4]), &(g_kni_handle->dst_mac_addr[5]));
    if(ret != 6){
        KNI_LOG_ERROR(local_logger, "MESA_prof_load: dst_mac_addr = invalid, ret = %d, profile = %s, section = %s", ret, profile, section);
        goto error_out;
    }
    //init http_project
    id = http_project_init();
    if(id < 0){
        KNI_LOG_ERROR(local_logger, "Failed at init http project, ret = %d", id);
        goto error_out;
    }
    g_kni_handle->http_project_id = id;
    // get thread count
    g_kni_handle->thread_count = get_thread_count();
    if(g_kni_handle->thread_count <= 0){
        //the format used to have %d with no matching argument
        KNI_LOG_ERROR(local_logger, "Failed at get_thread_count, ret = %d", g_kni_handle->thread_count);
        goto error_out;
    }
    //init marsio
    if(g_kni_handle->deploy_mode == KNI_DEPLOY_MODE_NORMAL){
        g_kni_handle->marsio_handle = kni_marsio_init(profile, tfe_node_count);
        if(g_kni_handle->marsio_handle == NULL){
            KNI_LOG_ERROR(local_logger, "Failed at init marsio");
            goto error_out;
        }
    }
    //init tun
    if(g_kni_handle->deploy_mode == KNI_DEPLOY_MODE_TUN){
        char tun_name[KNI_SYMBOL_MAX];
        ret = MESA_load_profile_string_nodef(profile, section, "tun_name", tun_name, sizeof(tun_name));
        if(ret < 0){
            KNI_LOG_ERROR(local_logger, "MESA_prof_load: tun_name not set, profile = %s, section = %s", profile, section);
            goto error_out;
        }
        KNI_LOG_ERROR(local_logger, "MESA_prof_load, [%s]:\n tun_name: %s", section, tun_name);
        g_kni_handle->tun_handle = kni_tun_init(tun_name, KNI_TUN_MODE_NOBLOCK, local_logger);
        if(g_kni_handle->tun_handle == NULL){
            KNI_LOG_ERROR(local_logger, "Failed at init kni_tun");
            goto error_out;
        }
    }
    //init maat
    g_kni_handle->maat_handle = kni_maat_init(profile, local_logger, g_kni_handle->thread_count);
    if(g_kni_handle->maat_handle == NULL){
        KNI_LOG_ERROR(local_logger, "Failed at init maat");
        goto error_out;
    }
    //init_filedstat
    fs_handle = fs_init(profile);
    if(fs_handle == NULL){
        KNI_LOG_ERROR(local_logger, "Failed at init field_stat");
        goto error_out;
    }
    g_kni_fs_handle = fs_handle;
    //init local_ipv4
    ret = kni_ipv4_addr_get_by_eth(manage_eth, &(g_kni_handle->local_ipv4));
    if(ret < 0){
        KNI_LOG_ERROR(local_logger, "Failed at get bind ipv4 addr, eth = %s", manage_eth);
        goto error_out;
    }
    //init kni_send_logger
    send_logger = kni_send_logger_init(profile, local_logger);
    if(send_logger == NULL){
        KNI_LOG_ERROR(local_logger, "Failed at init kni_send_logger");
        goto error_out;
    }
    g_kni_handle->send_logger = send_logger;
    //init traceid2pme_htable: no data-free callback, expiry goes through the notify callback
    traceid2pme_htable = kni_create_htable(profile, "traceid2pme_htable", NULL,
        (void*)traceid2pme_htable_expire_notify_cb, local_logger);
    if(traceid2pme_htable == NULL){
        KNI_LOG_ERROR(local_logger, "Failed at create traceid2pme_htable");
        goto error_out;
    }
    g_kni_handle->traceid2pme_htable = traceid2pme_htable;
    //init tuple2stream_htable: one table per thread
    g_kni_handle->threads_handle = ALLOC(struct per_thread_handle, g_kni_handle->thread_count);
    for(int i = 0; i < g_kni_handle->thread_count; i++){
        MESA_htable_handle tuple2stream_htable = kni_create_htable(profile, "tuple2stream_htable",
            (void*)tuple2stream_htable_data_free_cb, NULL, local_logger);
        if(tuple2stream_htable == NULL){
            KNI_LOG_ERROR(local_logger, "Failed at kni_create_htable, table = tuple2stream_htable");
            goto error_out;
        }
        g_kni_handle->threads_handle[i].tuple2stream_htable = tuple2stream_htable;
    }
    //init dabloom_handle
    ret = dup_traffic_dabloom_init(profile, local_logger);
    if(ret < 0){
        goto error_out;
    }
    //init tfe_mgr
    _tfe_mgr = tfe_mgr_init(tfe_node_count, profile, g_kni_handle->deploy_mode, local_logger);
    if(_tfe_mgr == NULL){
        KNI_LOG_ERROR(local_logger, "Failed at init tfe_mgr");
        goto error_out;
    }
    g_kni_handle->_tfe_mgr = _tfe_mgr;
    //create thread_tfe_cmsg_receiver; on success the thread owns and frees cmsg_receiver_args
    cmsg_receiver_args = ALLOC(struct thread_tfe_cmsg_receiver_args, 1);
    cmsg_receiver_args->logger = local_logger;
    //bounded copy that always writes the terminator
    snprintf(cmsg_receiver_args->profile, sizeof(cmsg_receiver_args->profile), "%s", profile);
    ret = pthread_create(&thread_id, NULL, thread_tfe_cmsg_receiver, (void *)cmsg_receiver_args);
    if(unlikely(ret != 0)){
        //pthread_create reports the error through its return value, not through errno
        KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func = thread_tfe_cmsg_receiver, errno = %d, errmsg = %s", ret, strerror(ret));
        FREE(&cmsg_receiver_args);
        goto error_out;
    }
    return 0;
error_out:
    kni_destroy(g_kni_handle);
    exit(0);
}