This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-kni/entry/src/kni_entry.cpp
2019-06-14 21:40:04 +08:00

1821 lines
73 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "kni_utils.h"
#include "ssl_utils.h"
#include "marsio.h"
#include "kni_maat.h"
#include "MESA/http.h"
#include "kni_cmsg.h"
#include "uuid/uuid.h"
#include "cjson/cJSON.h"
#include "kni_send_logger.h"
#include <pthread.h>
#include <linux/if_ether.h>
extern int g_iThreadNum;
struct kni_handle *g_kni_handle = NULL;
struct kni_field_stat_handle *g_kni_fs_handle = NULL;
#define HTTP_PROJECT_NAME "kni_http_tag"
#define BURST_MAX 1
#define STREAM_TRACEID_LEN 37
#define TFE_COUNT_MAX 16
#define CALLER_SAPP 0
#define CALLER_TFE 1
enum kni_protocol{
KNI_PROTOCOL_UNKNOWN = 0,
KNI_PROTOCOL_SSL,
KNI_PROTOCOL_HTTP,
};
enum stream_error{
STREAM_ERROR_PENDING_NO_SYN = -1,
STREAM_ERROR_SINGLE_DIR = -2,
STREAM_ERROR_PROTOCOL_UNKNOWN = -3,
STREAM_ERROR_NO_SYN_ACK = -4,
STREAM_ERROR_INVALID_ACTION = -5,
STREAM_ERROR_NO_DATA = -6,
STREAM_ERROR_IPV4HDR_PARSE_FAIL = -7,
STREAM_ERROR_IPV6HDR_PARSE_FAIL = -8,
STREAM_ERROR_DUP_STREAM = -9,
STREAM_ERROR_EXCEED_MTU = -10,
};
struct http_project{
int host_len;
char host[KNI_DOMAIN_MAX];
};
struct pme_info{
addr_type_t addr_type;
int protocol;
int policy_id;
int maat_hit;
enum kni_action action;
int service;
struct kni_tcpopt_info *client_tcpopt;
struct kni_tcpopt_info *server_tcpopt;
uint16_t client_window;
uint16_t server_window;
int tfe_id;
pthread_mutex_t lock;
enum stream_error error;
char stream_traceid[STREAM_TRACEID_LEN];
//cjson check protocol
union{
char host[KNI_DOMAIN_MAX]; //http only
char sni[KNI_DOMAIN_MAX]; //ssl only
}domain;
//tfe_release = 1: tfe don't need pmeinfo
int tfe_release;
int sapp_release;
//kafka log
struct layer_addr *addr;
unsigned char dir;
uint64_t server_bytes;
uint64_t client_bytes;
uint64_t server_pkts;
uint64_t client_pkts;
struct timespec start_time;
struct timespec end_time;
uint64_t con_duration_ms;
//from tfe, kafka log
uint64_t intercept_state;
uint64_t pinningst; //defalut 0
uint64_t ssl_server_side_latency;
uint64_t ssl_client_side_latency;
char ssl_server_side_version[KNI_SYMBOL_MAX];
char ssl_client_side_version[KNI_SYMBOL_MAX];
uint64_t ssl_cert_verify;
char ssl_error[KNI_STRING_MAX];
};
struct wrapped_packet{
char data[KNI_MTU];
};
struct tcp_option_restore{
uint8_t kind;
uint8_t len;
uint16_t offset;
};
struct tfe_instance{
struct mr_vdev *dev_eth_handler;
struct mr_sendpath *dev_eth_sendpath;
char mac_addr[6];
};
struct kni_marsio_handle{
struct mr_instance *instance;
struct tfe_instance *tfe_instance_list[TFE_COUNT_MAX];
struct mr_vdev *dev_vxlan_handler;
struct mr_sendpath *dev_vxlan_sendpath;
char src_mac_addr[6];
};
struct protocol_identify_result{
int protocol;
char domain[KNI_DOMAIN_MAX];
int domain_len;
};
struct thread_tfe_data_receiver_args{
void *logger;
struct kni_marsio_handle *marsio_handle;
int thread_seq;
};
struct thread_tfe_cmsg_receiver_args{
void *logger;
char profile[KNI_SYMBOL_MAX];
};
struct kni_handle{
int http_project_id;
struct kni_marsio_handle *marsio_handle;
struct kni_maat_handle *maat_handle;
struct kni_send_logger *send_logger;
MESA_htable_handle traceid2pme_htable;
MESA_htable_handle keepalive_replay_htable;
int tfe_count;
int tfe_data_recv_thread_num;
uint32_t local_ipv4;
int keepalive_replay_switch;
void *local_logger;
};
struct traceid2pme_search_cb_args{
struct kni_cmsg *cmsg;
void *logger;
};
struct keepalive_replay_htable_value{
int has_replayed;
uint32_t first_data_len;
};
struct keepalive_replay_search_cb_args{
marsio_buff_t *rx_buff;
struct kni_marsio_handle *marsio_handle;
void *raw_packet;
addr_type_t addr_type;
int tfe_id;
int thread_seq;
};
static char* stream_errmsg_get(enum stream_error _errno){
switch(_errno){
case STREAM_ERROR_PENDING_NO_SYN:
return (char*)"penging not syn";
case STREAM_ERROR_SINGLE_DIR:
return (char*)"single dir";
case STREAM_ERROR_PROTOCOL_UNKNOWN:
return (char*)"protocol unknown";
case STREAM_ERROR_NO_SYN_ACK:
return (char*)"no syn/ack";
case STREAM_ERROR_INVALID_ACTION:
return (char*)"invalid aciton";
case STREAM_ERROR_NO_DATA:
return (char*)"no data";
case STREAM_ERROR_IPV4HDR_PARSE_FAIL:
return (char*)"ipv4 header parse fail";
case STREAM_ERROR_IPV6HDR_PARSE_FAIL:
return (char*)"ipv6 header parse fail";
case STREAM_ERROR_EXCEED_MTU:
return (char*)"exceed mtu(1500)";
default:
return (char*)"unknown error";
}
}
static void pme_info_destroy(void *data){
struct pme_info *pmeinfo = (struct pme_info *)data;
void *logger = g_kni_handle->local_logger;
if(pmeinfo != NULL){
//free client_tcpopt
if(pmeinfo->client_tcpopt != NULL){
FREE(&(pmeinfo->client_tcpopt));
}
//free server tcpopt
if(pmeinfo->server_tcpopt != NULL){
FREE(&(pmeinfo->server_tcpopt));
}
//free layer_addr
layer_addr_free(pmeinfo->addr);
pmeinfo->addr=NULL;
//free lock
pthread_mutex_destroy(&(pmeinfo->lock));
FREE(&pmeinfo);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_FREE], 0, FS_OP_ADD, 1);
}
else{
KNI_LOG_ERROR(logger, "Failed at pme_info_destroy, pmeinfo is null");
}
}
static struct pme_info* pme_info_new(const struct streaminfo *stream, int thread_seq){
void *logger = g_kni_handle->local_logger;
struct pme_info* pmeinfo = ALLOC(struct pme_info, 1);
pmeinfo->addr_type = (enum addr_type_t)stream->addr.addrtype;
pmeinfo->tfe_id = g_kni_handle->tfe_count > 0 ? thread_seq % g_kni_handle->tfe_count : -1;
uuid_t uu;
uuid_generate_random(uu);
uuid_unparse(uu, pmeinfo->stream_traceid);
pmeinfo->addr = layer_addr_dup(&(stream->addr));
clock_gettime(CLOCK_REALTIME, &(pmeinfo->start_time));
char stream_addr[KNI_SYMBOL_MAX] = "";
//init pme_lock
int ret = pthread_mutex_init(&(pmeinfo->lock), NULL);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at init pthread mutex, stream_traceid is %s", pmeinfo->stream_traceid);
goto error_out;
}
kni_stream_addr_trans(&(stream->addr), pmeinfo->addr_type, stream_addr, sizeof(stream_addr));
KNI_LOG_INFO(logger, "stream addr is %s, stream traceid is %s", stream_addr, pmeinfo->stream_traceid);
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_STM], 0, FS_OP_ADD, 1);
return pmeinfo;
error_out:
pme_info_destroy(pmeinfo);
return NULL;
}
static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){
//create cjson
cJSON *log_obj = cJSON_CreateObject();
//stream_traceid
cJSON_AddStringToObject(log_obj, "stream_traceid", pmeinfo->stream_traceid);
//policy_id
cJSON_AddNumberToObject(log_obj, "policy_id", pmeinfo->policy_id);
//action
cJSON_AddNumberToObject(log_obj, "action", pmeinfo->action);
//service
cJSON_AddNumberToObject(log_obj, "service", pmeinfo->service);
//start_time
cJSON_AddNumberToObject(log_obj, "start_time", pmeinfo->start_time.tv_sec);
//end_time
cJSON_AddNumberToObject(log_obj, "end_time", pmeinfo->end_time.tv_sec);
//con_duration_ms
cJSON_AddNumberToObject(log_obj, "con_duration_ms", (pmeinfo->end_time.tv_sec - pmeinfo->start_time.tv_sec) * 1000
+ (pmeinfo->end_time.tv_nsec - pmeinfo->start_time.tv_nsec) / 1000000);
//stream_info: addr_type, trans_proto, client_ip, client_port, server_ip, server_port
const struct layer_addr *addr = pmeinfo->addr;
char client_ip_str[INET6_ADDRSTRLEN] = "";
char server_ip_str[INET6_ADDRSTRLEN] = "";
switch(addr->addrtype){
case ADDR_TYPE_IPV4:
cJSON_AddNumberToObject(log_obj, "addr_type", 4);
inet_ntop(AF_INET, &addr->tuple4_v4->saddr, client_ip_str, sizeof(client_ip_str));
inet_ntop(AF_INET, &addr->tuple4_v4->daddr, server_ip_str, sizeof(server_ip_str));
cJSON_AddStringToObject(log_obj, "client_ip", client_ip_str);
cJSON_AddStringToObject(log_obj, "server_ip", server_ip_str);
cJSON_AddNumberToObject(log_obj, "client_port", ntohs(addr->tuple4_v4->source));
cJSON_AddNumberToObject(log_obj, "server_port", ntohs(addr->tuple4_v4->dest));
cJSON_AddStringToObject(log_obj, "trans_proto", "IPv4_TCP");
break;
case ADDR_TYPE_IPV6:
cJSON_AddNumberToObject(log_obj, "addr_type", 6);
inet_ntop(AF_INET6, &addr->tuple4_v6->saddr, client_ip_str, sizeof(client_ip_str));
inet_ntop(AF_INET6, &addr->tuple4_v6->daddr, server_ip_str, sizeof(server_ip_str));
cJSON_AddStringToObject(log_obj, "client_ip", client_ip_str);
cJSON_AddStringToObject(log_obj, "server_ip", server_ip_str);
cJSON_AddNumberToObject(log_obj, "client_port", ntohs(addr->tuple4_v6->source));
cJSON_AddNumberToObject(log_obj, "server_port", ntohs(addr->tuple4_v6->dest));
cJSON_AddStringToObject(log_obj, "trans_proto", "IPv6_TCP");
break;
default:
break;
}
//entrance_id: 0
cJSON_AddNumberToObject(log_obj, "entrance_id", 0);
//device_id: 0
cJSON_AddNumberToObject(log_obj, "device_id", 0);
//link_id: 0
cJSON_AddNumberToObject(log_obj, "link_id", 0);
//isp: null
cJSON_AddStringToObject(log_obj, "isp", "");
//encap_type: from sapp, 先填0
cJSON_AddNumberToObject(log_obj, "encap_type", 0);
//pinning state: from tfe
cJSON_AddNumberToObject(log_obj, "pinningst", pmeinfo->pinningst);
//intercept state: from tfe
cJSON_AddNumberToObject(log_obj, "intercept_state", pmeinfo->intercept_state);
//ssl upstream latency: from tfe
cJSON_AddNumberToObject(log_obj, "ssl_server_side_latency", pmeinfo->ssl_server_side_latency);
//ssl downstream latency: from tfe
cJSON_AddNumberToObject(log_obj, "ssl_client_side_latency", pmeinfo->ssl_client_side_latency);
//ssl upstream version: from tfe
cJSON_AddStringToObject(log_obj, "ssl_server_side_version", pmeinfo->ssl_server_side_version);
//ssl downstream version: from tfe
cJSON_AddStringToObject(log_obj, "ssl_client_side_version", pmeinfo->ssl_client_side_version);
//ssl cert verify
cJSON_AddNumberToObject(log_obj, "ssl_cert_verify", pmeinfo->ssl_cert_verify);
//direction: 0
cJSON_AddNumberToObject(log_obj, "direction", 0);
//stream_dir: from sapp
cJSON_AddNumberToObject(log_obj, "stream_dir", pmeinfo->dir);
//cap_ip: kni ip
char local_ipv4_str[INET6_ADDRSTRLEN];
inet_ntop(AF_INET, &(g_kni_handle->local_ipv4), local_ipv4_str, sizeof(local_ipv4_str));
cJSON_AddStringToObject(log_obj, "cap_ip", local_ipv4_str);
//addr_list
cJSON_AddStringToObject(log_obj, "addr_list", "");
//host: http_only
if(pmeinfo->protocol == KNI_PROTOCOL_HTTP){
cJSON_AddStringToObject(log_obj, "host", pmeinfo->domain.host);
}
//sni: ssl only
if(pmeinfo->protocol == KNI_PROTOCOL_SSL){
cJSON_AddStringToObject(log_obj, "sni", pmeinfo->domain.sni);
}
//c2s_pkt_num
cJSON_AddNumberToObject(log_obj, "c2s_pkt_num", pmeinfo->server_pkts);
//s2c_pkt_num
cJSON_AddNumberToObject(log_obj, "s2c_pkt_num", pmeinfo->client_pkts);
//c2s_byte_num
cJSON_AddNumberToObject(log_obj, "c2s_byte_num", pmeinfo->server_bytes);
//s2c_byte_num
cJSON_AddNumberToObject(log_obj, "s2c_byte_num", pmeinfo->client_bytes);
int ret = -1;
char *log_msg = cJSON_PrintUnformatted(log_obj);
cJSON_Delete(log_obj);
if(log_msg == NULL){
KNI_LOG_ERROR(local_logger, "Failed at cJSON_Print, stream_treaceid is %s", pmeinfo->stream_traceid);
goto error_out;
}
KNI_LOG_DEBUG(local_logger, "log_msg is %s\n", log_msg);
ret = kni_send_logger_sendlog(g_kni_handle->send_logger, log_msg, strlen(log_msg));
if(ret < 0){
KNI_LOG_ERROR(local_logger, "Failed at knisend_logger_sendlog, ret is %d, strem_traceid is %s",
ret, pmeinfo->stream_traceid);
goto error_out;
}
cJSON_free(log_msg);
return 0;
error_out:
if(log_msg != NULL){
cJSON_free(log_msg);
}
return -1;
}
static void judge_pme_destroy(struct pme_info *pmeinfo, int caller){
void *logger = g_kni_handle->local_logger;
if(pmeinfo != NULL){
void *logger = g_kni_handle->local_logger;
pthread_mutex_lock(&(pmeinfo->lock));
if(caller == CALLER_SAPP){
KNI_LOG_DEBUG(logger, "set sapp_release = 1, caller is %d, stream_trace_id is %s, thread id is %p",
caller, pmeinfo->stream_traceid, pthread_self());
pmeinfo->sapp_release = 1;
}
if(caller == CALLER_TFE){
KNI_LOG_DEBUG(logger, "set tfe_release = 1, caller is %d, stream_trace_id is %s, thread id is %p",
caller, pmeinfo->stream_traceid, pthread_self());
pmeinfo->tfe_release = 1;
}
if(pmeinfo->sapp_release == 1 && pmeinfo->tfe_release == 1){
//sendlog
int ret = sendlog_to_kafka(pmeinfo, logger);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at sendlog to kafka, stream traceid is %s", pmeinfo->stream_traceid);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_FAIL], 0, FS_OP_ADD, 1);
}
else{
KNI_LOG_DEBUG(logger, "Succeed sendlog to kafka, stream traceid is %s", pmeinfo->stream_traceid);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_SUCC], 0, FS_OP_ADD, 1);
}
//only intercetp stream need del htable
if(pmeinfo->action == KNI_ACTION_INTERCEPT){
//del traceid2pme htable
int key_size = strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid));
ret = MESA_htable_del(g_kni_handle->traceid2pme_htable, (const unsigned char *)pmeinfo->stream_traceid,
key_size, NULL);
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_htable: Failed at del, table is %s, key is %s, key_size is %d, ret is %d",
"traceid2pme_htable", pmeinfo->stream_traceid, key_size, ret);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_FAIL], 0, FS_OP_ADD, 1);
}
else{
KNI_LOG_DEBUG(logger, "MESA_htable: Succeed at del, table is %s, key is %s, key_size is %d",
"traceid2pme_htable", pmeinfo->stream_traceid, key_size);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC], 0, FS_OP_ADD, 1);
}
//del keepalive_replay_htable
char stream_addr[KNI_SYMBOL_MAX] = "";
kni_stream_addr_trans((const layer_addr*)(pmeinfo->addr), pmeinfo->addr_type, stream_addr, sizeof(stream_addr));
//c2s
struct stream_tuple4_v4 *c2s_key_v4 = NULL;
struct stream_tuple4_v6 *c2s_key_v6 = NULL;
key_size = 0;
if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
c2s_key_v6 = pmeinfo->addr->tuple4_v6;
key_size = sizeof(struct stream_tuple4_v6);
ret = MESA_htable_del(g_kni_handle->keepalive_replay_htable, (const unsigned char*)c2s_key_v6, key_size, NULL);
}
else{
c2s_key_v4 = pmeinfo->addr->tuple4_v4;
key_size = sizeof(struct stream_tuple4_v4);
ret = MESA_htable_del(g_kni_handle->keepalive_replay_htable, (const unsigned char*)c2s_key_v4, key_size, NULL);
}
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_htable: Failed at del, table is %s, stream addr is %s, dir is c2s, ret is %d",
"keepalive_replay_htable", stream_addr, ret);
}
else{
KNI_LOG_DEBUG(logger, "MESA_htable: Succeed at del, table is %s, stream addr is %s, dir is c2s",
"keepalive_replay_htable", stream_addr);
}
//s2c
if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
struct stream_tuple4_v6 s2c_key_v6;
memcpy(s2c_key_v6.saddr, c2s_key_v6->daddr, sizeof(s2c_key_v6.saddr));
memcpy(s2c_key_v6.daddr, c2s_key_v6->saddr, sizeof(s2c_key_v6.daddr));
s2c_key_v6.source = c2s_key_v6->dest;
s2c_key_v6.dest = c2s_key_v6->source;
key_size = sizeof(struct stream_tuple4_v6);
ret = MESA_htable_del(g_kni_handle->keepalive_replay_htable, (const unsigned char*)&s2c_key_v6,
key_size, NULL);
}
else{
struct stream_tuple4_v4 s2c_key_v4;
s2c_key_v4.saddr = c2s_key_v4->daddr;
s2c_key_v4.daddr = c2s_key_v4->saddr;
s2c_key_v4.source = c2s_key_v4->dest;
s2c_key_v4.dest = c2s_key_v4->source;
key_size = sizeof(struct stream_tuple4_v4);
ret = MESA_htable_del(g_kni_handle->keepalive_replay_htable, (const unsigned char*)&s2c_key_v4,
key_size, NULL);
}
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_htable: Failed at del, table is %s, stream addr is %s, dir is s2c, ret is %d",
"keepalive_replay_htable", stream_addr, ret);
}
else{
KNI_LOG_DEBUG(logger, "MESA_htable: Succeed at del, table is %s, stream addr is %s, dir is s2c",
"keepalive_replay_htable", stream_addr);
}
}
//free pme
pme_info_destroy(pmeinfo);
return;
}
KNI_LOG_DEBUG(logger, "can not destroy pmeinfo, sapp_release = %d, tfe_release = %d", pmeinfo->sapp_release, pmeinfo->tfe_release);
pthread_mutex_unlock(&(pmeinfo->lock));
}
else{
KNI_LOG_ERROR(logger, "Failed at judge_pme_info, pmeinfo is null");
}
}
static int protocol_identify(const struct streaminfo* stream, char *buf, int len, struct protocol_identify_result *result){
//http
struct http_project* project = (struct http_project*)project_req_get_struct(stream, g_kni_handle->http_project_id);
if(project != NULL){
result->protocol = KNI_PROTOCOL_HTTP;
result->domain_len = project->host_len;
strncpy(result->domain, project->host, strnlen(project->host, sizeof(result->domain) - 1));
return 0;
}
//ssl
enum chello_parse_result chello_status = CHELLO_PARSE_INVALID_FORMAT;
struct ssl_chello *chello = NULL;
chello = ssl_chello_parse((const unsigned char*)buf, len, &chello_status);
if(chello_status == CHELLO_PARSE_SUCCESS){
result->protocol = KNI_PROTOCOL_SSL;
if(chello->sni == NULL){
result->domain_len = 0;
}
else{
result->domain_len = strnlen(chello->sni, KNI_DOMAIN_MAX);
strncpy(result->domain, chello->sni, strnlen(chello->sni, sizeof(result->domain) - 1));
}
ssl_chello_free(chello);
return 0;
}
ssl_chello_free(chello);
result->protocol = KNI_PROTOCOL_UNKNOWN;
return 0;
}
static int wrapped_kni_cmsg_set(struct kni_cmsg *cmsg, uint16_t type, const unsigned char *value,
uint16_t size, char *stream_traceid){
void *logger = g_kni_handle->local_logger;
int ret = kni_cmsg_set(cmsg, type, value, size);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed set cmsg, type is %d, stream traceid is %s", type, stream_traceid);
}
return ret;
}
static unsigned char* kni_cmsg_serialize_header_new(struct pme_info *pmeinfo, struct pkt_info *pktinfo, uint16_t *len){
void *logger = g_kni_handle->local_logger;
uint16_t bufflen = 0, serialize_len = 0;
unsigned char *buff = NULL;
uint8_t protocol_type = pmeinfo->protocol == KNI_PROTOCOL_SSL ? 0x1 : 0x0;
struct kni_cmsg *cmsg = kni_cmsg_init();
int policy_id = -1;
char *trace_id = NULL;
uint32_t seq = pktinfo->tcphdr->seq;
uint32_t ack = pktinfo->tcphdr->ack_seq;
uint16_t client_mss = htons(pmeinfo->client_tcpopt->mss);
uint16_t server_mss = htons(pmeinfo->server_tcpopt->mss);
uint16_t client_window = htons(pmeinfo->client_window);
uint16_t server_window = htons(pmeinfo->server_window);
//seq
int ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SEQ, (const unsigned char*)&seq, 4, pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
//ack
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_ACK, (const unsigned char*)&ack, 4, pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
//client mss
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_CLIENT, (const unsigned char*)&client_mss, 2, pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
//server mss
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_SERVER, (const unsigned char*)&server_mss, 2, pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
//client wscale
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->wscale), 1, pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
//server wscale
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->wscale), 1, pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
//client sack
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->sack), 1, pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
//server sack
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->sack), 1, pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
//client timestamp
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->ts), 1, pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
//server timestamp
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->ts), 1, pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
//protocol
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (const unsigned char*)&protocol_type, 1, pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
//client window
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT, (const unsigned char*)&client_window, 2, pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
//server window
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_SERVER, (const unsigned char*)&server_window, 2, pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
//maat policy id
policy_id = pmeinfo->policy_id;
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_POLICY_ID, (const unsigned char*)&policy_id, sizeof(policy_id), pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
//stream trace id
trace_id = pmeinfo->stream_traceid;
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_STREAM_TRACE_ID, (const unsigned char*)trace_id,
strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid)), pmeinfo->stream_traceid);
if(ret < 0) goto error_out;
bufflen = kni_cmsg_serialize_size_get(cmsg);
buff = (unsigned char*)ALLOC(char, bufflen);
serialize_len = 0;
ret = kni_cmsg_serialize(cmsg, buff, bufflen, &serialize_len);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at serialize cmsg, ret is %d, stream traceid is %s",
ret, pmeinfo->stream_traceid);
goto error_out;
}
*len = serialize_len;
kni_cmsg_destroy(cmsg);
return buff;
error_out:
kni_cmsg_destroy(cmsg);
return NULL;
}
static char* add_cmsg_to_packet(struct pme_info *pmeinfo, struct pkt_info *pktinfo, int *len){
//tcp option: kind 88, len 4, control_info_len
char *new_pkt = (char*)ALLOC(struct wrapped_packet, 1);
int offset = 0;
//iphdr
if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
memcpy(new_pkt, (void*)pktinfo->iphdr.v6, pktinfo->iphdr_len);
}
else{
memcpy(new_pkt, (void*)pktinfo->iphdr.v4, pktinfo->iphdr_len);
}
offset += pktinfo->iphdr_len;
//tcphdr
struct tcphdr *tcphdr = (struct tcphdr*)(new_pkt + offset);
memcpy(new_pkt + offset, (void*)pktinfo->tcphdr, 20);
offset += 20;
tcphdr->doff = pktinfo->tcphdr->doff + 1;
struct tcp_option_restore *opt = ALLOC(struct tcp_option_restore, 1);
opt->kind = 88;
opt->len = 4;
opt->offset = htons(pktinfo->data_len);
memcpy(new_pkt + offset, (void*)opt, 4);
offset += 4;
memcpy(new_pkt + offset, (void*)((char*)pktinfo->tcphdr + 20), pktinfo->tcphdr_len - 20);
offset += pktinfo->tcphdr_len - 20;
//data
memcpy(new_pkt + offset, (void*)pktinfo->data, pktinfo->data_len);
offset += pktinfo->data_len;
//kni_cmsg_serialize_header
uint16_t header_len = 0;
unsigned char* header = kni_cmsg_serialize_header_new(pmeinfo, pktinfo, &header_len);
memcpy(new_pkt + offset, (void*)header, header_len);
offset += header_len;
FREE(&header);
//ipv6
if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
kni_ipv6_header_parse((void*)new_pkt, pktinfo);
pktinfo->iphdr.v6->ip6_ctlun.ip6_un1.ip6_un1_plen = htons(offset - sizeof(ip6_hdr));
pktinfo->tcphdr->check = 0;
pktinfo->tcphdr->check = kni_tcp_checksum_v6((void*)pktinfo->tcphdr,
offset - pktinfo->iphdr_len, pktinfo->iphdr.v6->ip6_src, pktinfo->iphdr.v6->ip6_dst);
}
else{
struct iphdr *iphdr = (struct iphdr*)new_pkt;
iphdr->tot_len = htons(offset);
//must set check = 0
iphdr->check = 0;
iphdr->check = kni_ip_checksum((void*)iphdr, pktinfo->iphdr_len);
//tcphdr: checkdum
tcphdr->check = 0;
tcphdr->check = kni_tcp_checksum((void*)tcphdr, offset - pktinfo->iphdr_len, iphdr->saddr, iphdr->daddr);
}
*len = offset;
return new_pkt;
}
static int send_to_tfe(struct kni_marsio_handle *handle, char *raw_data, uint16_t raw_len, int thread_seq, int tfe_id, addr_type_t addr_type){
void *logger = g_kni_handle->local_logger;
marsio_buff_t *tx_buffs[BURST_MAX];
struct mr_vdev *dev_eth_handler = handle->tfe_instance_list[tfe_id]->dev_eth_handler;
struct mr_sendpath *dev_eth_sendpath = handle->tfe_instance_list[tfe_id]->dev_eth_sendpath;
char *src_mac = handle->src_mac_addr;
char *dst_mac = handle->tfe_instance_list[tfe_id]->mac_addr;
//only send one packet, alloc_ret <= nr_send <= BURST_MAX
int nr_send = 1;
int alloc_ret = marsio_buff_malloc_device(dev_eth_handler, tx_buffs, nr_send, 0, thread_seq);
if (alloc_ret < 0){
KNI_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret is %d, thread_seq is %d",
alloc_ret, thread_seq);
return -1;
}
for(int i = 0; i < nr_send; i++){
char* dst_data = marsio_buff_append(tx_buffs[i], raw_len + 14);
//ethernet_header[14]
struct ethhdr *ether_hdr = (struct ethhdr*)dst_data;
memcpy(ether_hdr->h_dest, dst_mac, sizeof(ether_hdr->h_dest));
memcpy(ether_hdr->h_source, src_mac, sizeof(ether_hdr->h_source));
if(addr_type == ADDR_TYPE_IPV6){
ether_hdr->h_proto = htons(ETH_P_IPV6);
}
else{
ether_hdr->h_proto = htons(ETH_P_IP);
}
memcpy((char*)dst_data + sizeof(*ether_hdr), raw_data, raw_len);
}
marsio_send_burst(dev_eth_sendpath, thread_seq, tx_buffs, nr_send);
return 0;
}
static int wrapped_kni_header_parse(const void *a_packet, struct pme_info *pmeinfo, struct pkt_info *pktinfo){
void *logger = g_kni_handle->local_logger;
if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
int ret = kni_ipv6_header_parse(a_packet, pktinfo);
if(ret < 0){
char *errmsg = kni_ipv6_errmsg_get((enum kni_ipv6hdr_parse_error)ret);
KNI_LOG_ERROR(logger, "Failed at parse ipv6 header, bypass and dropme, errmsg is %s, stream treaceid is %s",
errmsg, pmeinfo->stream_traceid);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV6HDR_PARSE_FAIL], 0, FS_OP_ADD, 1);
pmeinfo->error = STREAM_ERROR_IPV6HDR_PARSE_FAIL;
return -1;
}
}
else{
int ret = kni_ipv4_header_parse(a_packet, pktinfo);
if(ret < 0){
char *errmsg = kni_ipv4_errmsg_get((enum kni_ipv4hdr_parse_error)ret);
KNI_LOG_ERROR(logger, "Failed at parse ipv4 header, bypass and dropme, errmsg is %s, stream treaceid is %s",
errmsg, pmeinfo->stream_traceid);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV4HDR_PARSE_FAIL], 0, FS_OP_ADD, 1);
pmeinfo->error = STREAM_ERROR_IPV4HDR_PARSE_FAIL;
return -1;
}
}
return 0;
}
static char pending_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, const void *a_packet){
void *logger = g_kni_handle->local_logger;
struct pkt_info pktinfo;
int ret = wrapped_kni_header_parse(a_packet, pmeinfo, &pktinfo);
if(ret < 0){
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
if(!pktinfo.tcphdr->syn){
//pending_opstate not syn, bypass and dropme
KNI_LOG_DEBUG(logger, "pending opstate: not syn, stream traceid is %s", pmeinfo->stream_traceid);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_SYN_EXP], 0, FS_OP_ADD, 1);
pmeinfo->error = STREAM_ERROR_PENDING_NO_SYN;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
pmeinfo->client_window = pktinfo.tcphdr->window;
pmeinfo->client_tcpopt = kni_get_tcpopt(pktinfo.tcphdr, pktinfo.tcphdr_len);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}
static int first_data_intercept(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, char *stream_addr, int thread_seq){
void *logger = g_kni_handle->local_logger;
struct keepalive_replay_htable_value *c2s_value = ALLOC(struct keepalive_replay_htable_value, 1);
c2s_value->first_data_len = pktinfo->data_len;
int key_size = 0, ret;
struct stream_tuple4_v4 *c2s_key_v4 = NULL;
struct stream_tuple4_v6 *c2s_key_v6 = NULL;
if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
c2s_key_v6 = stream->addr.tuple4_v6;
key_size = sizeof(*c2s_key_v6);
ret = MESA_htable_add(g_kni_handle->keepalive_replay_htable, (const unsigned char *)c2s_key_v6, key_size, (const void*)c2s_value);
}
else{
c2s_key_v4 = stream->addr.tuple4_v4;
key_size = sizeof(*c2s_key_v4);
ret = MESA_htable_add(g_kni_handle->keepalive_replay_htable, (const unsigned char *)c2s_key_v4, key_size, (const void*)c2s_value);
}
if(ret < 0){
//tfe not release, sapp release but not expire, so the same stream can not add, bypass and dropme
if(ret != MESA_HTABLE_RET_DUP_ITEM){
KNI_LOG_ERROR(logger, "MESA_htable: Failed at add, table is keepalive_replay_htable, "
"dir is c2s, key is %s, key_size is %d, ret is %d", stream_addr, key_size, ret);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_FAIL], 0, FS_OP_ADD, 1);
}
pmeinfo->error = STREAM_ERROR_DUP_STREAM;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
else{
KNI_LOG_DEBUG(logger, "MESA_htable: Succeed at add, table is keepalive_replay_htable, "
"dir is c2s, key is %s, key_size is %d", stream_addr, key_size);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_SUCC], 0, FS_OP_ADD, 1);
}
//s2c
struct keepalive_replay_htable_value *s2c_value = ALLOC(struct keepalive_replay_htable_value, 1);
if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
struct stream_tuple4_v6 s2c_key_v6;
key_size = sizeof(s2c_key_v6);
memcpy(s2c_key_v6.saddr, c2s_key_v6->daddr, sizeof(s2c_key_v6.saddr));
memcpy(s2c_key_v6.daddr, c2s_key_v6->saddr, sizeof(s2c_key_v6.daddr));
s2c_key_v6.source = c2s_key_v6->dest;
s2c_key_v6.dest = c2s_key_v6->source;
ret = MESA_htable_add(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&s2c_key_v6),
key_size, (const void*)s2c_value);
}
else{
struct stream_tuple4_v4 s2c_key_v4;
key_size = sizeof(s2c_key_v4);
s2c_key_v4.saddr = c2s_key_v4->daddr;
s2c_key_v4.daddr = c2s_key_v4->saddr;
s2c_key_v4.source = c2s_key_v4->dest;
s2c_key_v4.dest = c2s_key_v4->source;
ret = MESA_htable_add(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&s2c_key_v4),
key_size, (const void*)s2c_value);
}
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_htable: Failed at add, table is keepalive_replay_htable, "
"dir is s2c, key is %s, key_size is %d, ret is %d", stream_addr, key_size, ret);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_FAIL], 0, FS_OP_ADD, 1);
}
else{
KNI_LOG_DEBUG(logger, "MESA_htable: Succeed at add, table is keepalive_replay_htable, "
"dir is s2c, key is %s, key_size is %d", stream_addr, key_size);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_SUCC], 0, FS_OP_ADD, 1);
}
//only intercept: add to traceid2pme htable
key_size = strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid));
ret = MESA_htable_add(g_kni_handle->traceid2pme_htable, (const unsigned char *)(pmeinfo->stream_traceid),
key_size, (const void*)pmeinfo);
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_htable: Failed at add,"
"table is traceid2pme_htable, key is %s, ret is %d", pmeinfo->stream_traceid, ret);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL], 0, FS_OP_ADD, 1);
}
else{
KNI_LOG_DEBUG(logger, "MESA_htable: Succeed at add,"
"table is traceid2pme_htable, key is %s", pmeinfo->stream_traceid);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC], 0, FS_OP_ADD, 1);
}
//action = KNI_ACTION_INTERCEPT, sendto tfe
int len = 0;
char *buf = add_cmsg_to_packet(pmeinfo, pktinfo, &len);
ret = send_to_tfe(g_kni_handle->marsio_handle, buf, len, thread_seq, pmeinfo->tfe_id, pmeinfo->addr_type);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at send first packet to tfe%d, stream traceid is %s", pmeinfo->tfe_id, pmeinfo->stream_traceid);
}
else{
KNI_LOG_DEBUG(logger, "Succeed at send first packet to tfe%d, stream traceid is %s", pmeinfo->tfe_id, pmeinfo->stream_traceid);
}
FREE(&buf);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_STM], 0, FS_OP_ADD, 1);
return APP_STATE_DROPPKT | APP_STATE_GIVEME;
}
static char data_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, const void *a_packet, int thread_seq){
//pmeinfo->tfe_release = 1: intercept, tfe end first. so droppkt and dropme
if(pmeinfo->tfe_release == 1){
pmeinfo->server_bytes=stream->ptcpdetail->serverbytes;
pmeinfo->client_bytes=stream->ptcpdetail->clientbytes;
pmeinfo->server_pkts=stream->ptcpdetail->serverpktnum;
pmeinfo->client_pkts=stream->ptcpdetail->clientpktnum;
pmeinfo->dir=stream->dir;
return APP_STATE_DROPPKT | APP_STATE_DROPME;
}
void *logger = g_kni_handle->local_logger;
struct iphdr *ipv4_hdr = NULL;
struct ip6_hdr *ipv6_hdr = NULL;
uint16_t len = 0, ret;
char stream_addr[KNI_SYMBOL_MAX] = "";
kni_stream_addr_trans(&(stream->addr), pmeinfo->addr_type, stream_addr, sizeof(stream_addr));
//pmeinfo->action has only 3 value: KNI_ACTION_NONE KNI_ACTION_INTERCEPT KNI_ACTION_BYPASS
switch (pmeinfo->action){
case KNI_ACTION_NONE:
break;
case KNI_ACTION_INTERCEPT:
if(pmeinfo->addr_type == ADDR_TYPE_IPV6){
ipv6_hdr = (struct ip6_hdr*)a_packet;
len = ntohs(ipv6_hdr->ip6_ctlun.ip6_un1.ip6_un1_plen) + sizeof(struct ip6_hdr);
}
else{
ipv4_hdr = (struct iphdr*)a_packet;
len = ntohs(ipv4_hdr->tot_len);
}
ret = send_to_tfe(g_kni_handle->marsio_handle, (char*)a_packet, len, thread_seq, pmeinfo->tfe_id, pmeinfo->addr_type);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at send continue packet to tfe%d, stream traceid is %s", pmeinfo->tfe_id, pmeinfo->stream_traceid);
}
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_DROPPKT | APP_STATE_GIVEME;
case KNI_ACTION_BYPASS:
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
default:
assert(0);
break;
}
//parse ipv4/6 header
struct pkt_info pktinfo;
ret = wrapped_kni_header_parse(a_packet, pmeinfo, &pktinfo);
if(ret < 0){
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
//first data > 1500, bypass and dropme
if(pktinfo.ip_totlen > KNI_DEFAULT_MTU){
pmeinfo->error = STREAM_ERROR_EXCEED_MTU;
KNI_LOG_ERROR(logger, "first data packet exceed MTU(1500), bypass and dropme");
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_EXCEED_MTU], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
// syn/ack
if(pktinfo.tcphdr->syn && pktinfo.tcphdr->ack){
pmeinfo->server_window = pktinfo.tcphdr->window;
pmeinfo->server_tcpopt = kni_get_tcpopt(pktinfo.tcphdr, pktinfo.tcphdr_len);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}
//no data, maybe ack
if(pktinfo.data_len <= 0){
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}
//not double dir, bypass and dropme
if(stream->dir != DIR_DOUBLE){
KNI_LOG_INFO(logger, "dir is %d, bypass, stream addr is %s", stream->dir, stream_addr);
pmeinfo->error = STREAM_ERROR_SINGLE_DIR;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
struct protocol_identify_result protocol_identify_res;
memset(&protocol_identify_res, 0, sizeof(protocol_identify_res));
protocol_identify(stream, pktinfo.data, pktinfo.data_len, &protocol_identify_res);
pmeinfo->protocol = protocol_identify_res.protocol;
switch(pmeinfo->protocol){
//can not identify protocol from first data packet, bypass and dropme
case KNI_PROTOCOL_UNKNOWN:
KNI_LOG_INFO(logger, "Failed at protocol_identify, bypass and dropme, stream addr is %s\n",
pmeinfo->protocol, stream_addr);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_UNKNOWN_STM], 0, FS_OP_ADD, 1);
pmeinfo->error = STREAM_ERROR_PROTOCOL_UNKNOWN;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
case KNI_PROTOCOL_SSL:
strncpy(pmeinfo->domain.sni, protocol_identify_res.domain, strnlen(protocol_identify_res.domain, sizeof(pmeinfo->domain.sni) - 1));
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SSL_STM], 0, FS_OP_ADD, 1);
break;
case KNI_PROTOCOL_HTTP:
strncpy(pmeinfo->domain.host, protocol_identify_res.domain, strnlen(protocol_identify_res.domain, sizeof(pmeinfo->domain.host) - 1));
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_HTTP_STM], 0, FS_OP_ADD, 1);
break;
default:
break;
}
pmeinfo->action = intercept_policy_scan(g_kni_handle->maat_handle, (struct ipaddr*)(&stream->addr),
protocol_identify_res.domain, protocol_identify_res.domain_len,
thread_seq, &(pmeinfo->policy_id), &(pmeinfo->maat_hit));
//policy scan log
char *action_str = kni_maat_action_trans(pmeinfo->action);
KNI_LOG_DEBUG(logger, "intercept_policy_scan: %s, %s, policy_id = %d, action = %d(%s), maat_hit = %d, stream traceid is %s",
stream_addr, protocol_identify_res.domain, pmeinfo->policy_id, pmeinfo->action, action_str, pmeinfo->maat_hit, pmeinfo->stream_traceid);
//receive client hello, but no syn/ack, bypass and dropme
if(pmeinfo->client_tcpopt == NULL || pmeinfo->server_tcpopt == NULL){
KNI_LOG_ERROR(logger, "Failed at intercept, %s, %s, stream traceid is %s", pmeinfo->client_tcpopt == NULL ? "no syn" : "have syn",
pmeinfo->server_tcpopt == NULL ? "no syn/ack" : "have syn/ack", pmeinfo->stream_traceid);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_SA_EXP], 0, FS_OP_ADD, 1);
pmeinfo->error = STREAM_ERROR_NO_SYN_ACK;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
switch(pmeinfo->action){
case KNI_ACTION_BYPASS:
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
case KNI_ACTION_INTERCEPT:
return first_data_intercept(stream, pmeinfo, &pktinfo, stream_addr, thread_seq);
default:
//action != intercept && action != bypassbypass and dropme
KNI_LOG_ERROR(logger, "Action %d(%s) is invalid, bypass(dropme): policy_id is %d, stream addr is %s, domain is ",
pmeinfo->action, action_str, pmeinfo->policy_id, stream_addr, protocol_identify_res.domain);
pmeinfo->error = STREAM_ERROR_INVALID_ACTION;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
}
static char close_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, int thread_seq){
//close: a_packet = null, do not sendto tfe
clock_gettime(CLOCK_REALTIME, &(pmeinfo->end_time));
void *logger = g_kni_handle->local_logger;
pmeinfo->server_bytes=stream->ptcpdetail->serverbytes;
pmeinfo->client_bytes=stream->ptcpdetail->clientbytes;
pmeinfo->server_pkts=stream->ptcpdetail->serverpktnum;
pmeinfo->client_pkts=stream->ptcpdetail->clientpktnum;
pmeinfo->dir=stream->dir;
switch(pmeinfo->action){
case KNI_ACTION_INTERCEPT:
//reset clock: when sapp end, start clock
MESA_htable_search(g_kni_handle->traceid2pme_htable, (const unsigned char*)pmeinfo->stream_traceid,
strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid)));
return APP_STATE_DROPPKT | APP_STATE_DROPME;
case KNI_ACTION_BYPASS:
KNI_LOG_DEBUG(logger, "action is bypass, set tfe_release = 1, stream_trace_id is %s", pmeinfo->stream_traceid);
pmeinfo->tfe_release = 1;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
//stream has only syn, ack. no data.
default:
char *action_str = kni_maat_action_trans(pmeinfo->action);
pmeinfo->error = STREAM_ERROR_NO_DATA;
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STM_NO_DATA], 0, FS_OP_ADD, 1);
KNI_LOG_DEBUG(logger, "close_opstate: action %d(%s) is abnormal, stream_traceid is %s",
pmeinfo->action, action_str, pmeinfo->stream_traceid);
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
}
//from syn
extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, int thread_seq, const void* a_packet){
void *logger = g_kni_handle->local_logger;
int ret;
struct pme_info *pmeinfo = *(struct pme_info **)pme;
//TODO: ipv6
if(stream->addr.addrtype == ADDR_TYPE_IPV6){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV6_PKT], 0, FS_OP_ADD, 1);
//return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
/* a_packet == NULL && not op_state_close, continue
close: a_packet may be null, if a_packet = null, do not send to tfe
*/
if(a_packet == NULL && stream->pktstate != OP_STATE_CLOSE){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NULL_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}
switch(stream->pktstate){
case OP_STATE_PENDING:
*pme = pmeinfo = pme_info_new(stream, thread_seq);
if(pmeinfo == NULL){
KNI_LOG_ERROR(logger, "Failed at new pmeinfo");
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_NEW], 0, FS_OP_ADD, 1);
ret = pending_opstate(stream, pmeinfo, a_packet);
if(pmeinfo->error < 0){
goto error_out;
}
break;
case OP_STATE_DATA:
ret = data_opstate(stream, pmeinfo, a_packet, thread_seq);
//exception stream, dropme and destroy pmeinfo
if(pmeinfo->error < 0){
goto error_out;
}
break;
case OP_STATE_CLOSE:
//sapp stream close
ret = close_opstate(stream, pmeinfo, thread_seq);
if(pmeinfo->error < 0){
goto error_out;
}
break;
default:
ret = APP_STATE_FAWPKT | APP_STATE_GIVEME;
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_UNKNOWN_STATE_EXP], 0, FS_OP_ADD, 1);
KNI_LOG_ERROR(logger, "Unknown stream opstate %d, stream traceid is %s", stream->pktstate, pmeinfo->stream_traceid);
break;
}
//sapp release: bypass or intercept
if((ret & APP_STATE_DROPME)){
judge_pme_destroy(pmeinfo, CALLER_SAPP);
}
return ret;
//error out: no hash, no sendlog, just destroy_pme
error_out:
char *stream_errmsg = stream_errmsg_get(pmeinfo->error);
KNI_LOG_DEBUG(logger, "stream error is %s, bypass and dropme, stream traceid is %s", stream_errmsg, pmeinfo->stream_traceid);
if(pmeinfo != NULL){
pme_info_destroy(pmeinfo);
}
return ret;
}
void http_project_free(int thread_seq, void *project_req_value){
FREE(&project_req_value);
}
static int http_project_init(){
void *logger = g_kni_handle->local_logger;
int id = project_producer_register(HTTP_PROJECT_NAME, PROJECT_VAL_TYPE_STRUCT, http_project_free);
if(id < 0){
KNI_LOG_ERROR(logger, "Failed at project_producer_register, project name is %s, ret is %d", HTTP_PROJECT_NAME, id);
return -1;
}
id = project_customer_register(HTTP_PROJECT_NAME, PROJECT_VAL_TYPE_STRUCT);
if(id < 0){
KNI_LOG_ERROR(logger, "Failed at project_customer_register, project name is %s, ret is %d", HTTP_PROJECT_NAME, id);
return -1;
}
return id;
}
extern "C" char kni_http_entry(stSessionInfo* session_info, void **pme, int thread_seq, struct streaminfo *a_stream, const void *a_packet){
http_infor* http_info = (http_infor*)(session_info->app_info);
//only process first http session
if(http_info->http_session_seq != 1){
return PROT_STATE_DROPME;
}
if(session_info->prot_flag != HTTP_HOST){
return PROT_STATE_GIVEME;
}
int host_len = MIN(session_info->buflen, KNI_DEFAULT_MTU);
struct http_project* host_info = ALLOC(struct http_project, 1);
host_info->host_len = host_len;
memcpy(host_info->host, session_info->buf, host_len);
if(project_req_add_struct(a_stream, g_kni_handle->http_project_id, host_info) < 0){
FREE(&host_info);
host_info = NULL;
}
return PROT_STATE_DROPME;
}
static void kni_marsio_destroy(struct kni_marsio_handle *handle){
if(handle != NULL){
if(handle->instance != NULL){
marsio_destory(handle->instance);
}
for(int i = 0; i < TFE_COUNT_MAX; i++){
FREE(&handle->tfe_instance_list[i]);
}
}
FREE(&handle);
handle = NULL;
}
static void sendto_vxlan(marsio_buff_t *rx_buff, struct mr_sendpath *dev_vxlan_sendpath, int thread_seq){
//tag
struct mr_tunnat_ctrlzone mr_ctrlzone;
memset(&mr_ctrlzone, 0, sizeof(mr_ctrlzone));
mr_ctrlzone.action |= (TUNNAT_CZ_ACTION_ENCAP_INNER | TUNNAT_CZ_ACTION_ENCAP_OUTER);
marsio_buff_ctrlzone_set(rx_buff, 0, &mr_ctrlzone, sizeof(struct mr_tunnat_ctrlzone));
//send to vxlan, vxlan handler: recv: 0, send: 1, nr_burst must be 1
int nr_burst = 1;
marsio_send_burst_with_options(dev_vxlan_sendpath, thread_seq, &rx_buff, nr_burst, MARSIO_SEND_OPT_FAST);
}
static long keepalive_replay_search_cb(void *data, const uchar *key, uint size, void *user_args){
void *logger = g_kni_handle->local_logger;
struct keepalive_replay_search_cb_args *args = (struct keepalive_replay_search_cb_args*)user_args;
struct kni_marsio_handle *marsio_handle = args->marsio_handle;
marsio_buff_t *rx_buff = args->rx_buff;
int tfe_id = args->tfe_id;
int thread_seq = args->thread_seq;
if(data == NULL){
sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, thread_seq);
return 0;
}
struct keepalive_replay_htable_value *value = (struct keepalive_replay_htable_value*)data;
if(value->has_replayed == 1){
sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, thread_seq);
return 0;
}
//a_packet: window update
void *raw_packet = args->raw_packet;
char *replay_packet = NULL;
uint16_t tot_len = 0;
//ipv6
if(args->addr_type == ADDR_TYPE_IPV6){
struct pkt_info raw_pktinfo;
int ret = kni_ipv6_header_parse(raw_packet, &raw_pktinfo);
if(ret < 0){
char *errmsg = kni_ipv6_errmsg_get((enum kni_ipv6hdr_parse_error)ret);
KNI_LOG_ERROR(logger, "Failed at parse ipv6 header, send to vxlan, errmsg is %s", errmsg);
sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, thread_seq);
return 0;
}
tot_len = raw_pktinfo.ip_totlen;
replay_packet = ALLOC(char, tot_len);
memcpy(replay_packet, raw_packet, tot_len);
struct pkt_info replay_pktinfo;
kni_ipv6_header_parse(replay_packet, &replay_pktinfo);
replay_pktinfo.iphdr.v6->ip6_src = raw_pktinfo.iphdr.v6->ip6_dst;
replay_pktinfo.iphdr.v6->ip6_dst = raw_pktinfo.iphdr.v6->ip6_src;
replay_pktinfo.tcphdr->source = raw_pktinfo.tcphdr->dest;
replay_pktinfo.tcphdr->dest = raw_pktinfo.tcphdr->source;
replay_pktinfo.tcphdr->seq = htonl(ntohl(raw_pktinfo.tcphdr->ack_seq) + value->first_data_len);
replay_pktinfo.tcphdr->ack_seq = htonl(ntohl(raw_pktinfo.tcphdr->seq) + 1);
replay_pktinfo.tcphdr->check = 0;
replay_pktinfo.tcphdr->check = kni_tcp_checksum_v6((void*)replay_pktinfo.tcphdr,
tot_len - replay_pktinfo.iphdr_len, replay_pktinfo.iphdr.v6->ip6_src, replay_pktinfo.iphdr.v6->ip6_dst);
}
//ipv4
else{
struct iphdr *raw_packet_iphdr = (struct iphdr*)raw_packet;
tot_len = ntohs(raw_packet_iphdr->tot_len);
uint16_t iphdr_len = raw_packet_iphdr->ihl * 4;
struct tcphdr *raw_packet_tcphdr = (struct tcphdr*)((char*)raw_packet_iphdr + iphdr_len);
//replay packet
replay_packet = ALLOC(char, tot_len);
memcpy(replay_packet, raw_packet, tot_len);
struct iphdr *replay_packet_iphdr = (struct iphdr*)replay_packet;
struct tcphdr *replay_packet_tcphdr = (struct tcphdr*)((char*)replay_packet_iphdr + iphdr_len);
replay_packet_iphdr->saddr = raw_packet_iphdr->daddr;
replay_packet_iphdr->daddr = raw_packet_iphdr->saddr;
replay_packet_tcphdr->source = raw_packet_tcphdr->dest;
replay_packet_tcphdr->dest = raw_packet_tcphdr->source;
replay_packet_tcphdr->seq = htonl(ntohl(raw_packet_tcphdr->ack_seq) + value->first_data_len); //seq = ack + first_data_len
replay_packet_tcphdr->ack_seq = htonl(ntohl(raw_packet_tcphdr->seq) + 1); //ack = seq + 1
replay_packet_iphdr->check = 0;
replay_packet_iphdr->check = kni_ip_checksum((void*)replay_packet_iphdr, iphdr_len);
replay_packet_tcphdr->check = 0;
replay_packet_tcphdr->check = kni_tcp_checksum((void*)replay_packet_tcphdr, tot_len - iphdr_len,
replay_packet_iphdr->saddr, replay_packet_iphdr->daddr);
}
//send to tfe: thread_seq = g_iThreadNum
int ret = send_to_tfe(marsio_handle, replay_packet, tot_len, g_iThreadNum + thread_seq, tfe_id, args->addr_type);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at send keepalive replay packet to tfe");
}
value->has_replayed = 1;
marsio_buff_free(marsio_handle->instance, &rx_buff, 1, 0, 0);
FREE(&replay_packet);
return 0;
}
void* thread_tfe_data_receiver(void *args){
void *logger = g_kni_handle->local_logger;
struct thread_tfe_data_receiver_args *_args = (struct thread_tfe_data_receiver_args*)args;
struct kni_marsio_handle *marsio_handle = _args->marsio_handle;
int thread_seq = _args->thread_seq;
FREE(&args);
while(true){
//polling tfe
for(int i = 0; i < g_kni_handle->tfe_count; i++){
marsio_buff_t *rx_buffs[BURST_MAX];
int nr_burst = 1;
struct mr_vdev *dev_eth_handler = marsio_handle->tfe_instance_list[i]->dev_eth_handler;
//receive from tfe, nr_recv <= nr_burst <= BURST_MAX
int nr_recv = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buffs, nr_burst);
if(nr_recv <= 0){
continue;
}
if(g_kni_handle->keepalive_replay_switch == 1){
for(int i = 0; i < nr_recv; i++){
struct ethhdr *ether_hdr = (struct ethhdr*)marsio_buff_mtod(rx_buffs[i]);
if(ether_hdr->h_proto == htons(ETH_P_IP) || ether_hdr->h_proto == htons(ETH_P_IPV6)){
void *raw_packet = (char*)ether_hdr + sizeof(*ether_hdr);
long cb_ret = -1;
keepalive_replay_search_cb_args cb_args;
memset(&cb_args, 0, sizeof(cb_args));
cb_args.rx_buff = rx_buffs[i];
cb_args.marsio_handle = marsio_handle;
cb_args.tfe_id = i;
cb_args.thread_seq = thread_seq;
//ipv4
if(ether_hdr->h_proto == htons(ETH_P_IP)){
struct iphdr *iphdr = (struct iphdr*)raw_packet;
uint16_t iphdr_len = iphdr->ihl * 4;
struct tcphdr *tcphdr = (struct tcphdr*)((char*)iphdr + iphdr_len);
struct stream_tuple4_v4 key;
key.saddr = iphdr->saddr;
key.daddr = iphdr->daddr;
key.source = tcphdr->source;
key.dest = tcphdr->dest;
cb_args.addr_type = ADDR_TYPE_IPV4;
cb_args.raw_packet = raw_packet;
MESA_htable_search_cb(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&key),
sizeof(key), keepalive_replay_search_cb, &cb_args, &cb_ret);
}
//ipv6
else{
void *a_packet = (char*)ether_hdr + sizeof(*ether_hdr);
struct pkt_info pktinfo;
int ret = kni_ipv6_header_parse(a_packet, &pktinfo);
if(ret < 0){
char *errmsg = kni_ipv6_errmsg_get((enum kni_ipv6hdr_parse_error)ret);
KNI_LOG_ERROR(logger, "Failed at parse ipv6 header, send to vxlan, errmsg is %s", errmsg);
sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, thread_seq);
}
else{
struct stream_tuple4_v6 key;
memcpy(key.saddr, &(pktinfo.iphdr.v6->ip6_src), sizeof(*(key.saddr)));
memcpy(key.daddr, &(pktinfo.iphdr.v6->ip6_dst), sizeof(*(key.daddr)));
key.source = pktinfo.tcphdr->source;
key.dest = pktinfo.tcphdr->dest;
cb_args.addr_type = ADDR_TYPE_IPV6;
cb_args.raw_packet = raw_packet;
MESA_htable_search_cb(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&key),
sizeof(key), keepalive_replay_search_cb, &cb_args, &cb_ret);
}
}
}
else{
sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, thread_seq);
}
}
}
else{
for(int i = 0; i < nr_recv; i++){
sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, thread_seq);
}
}
}
}
return NULL;
}
static int wrapped_kni_cmsg_get(struct pme_info *pmeinfo, struct kni_cmsg *cmsg, uint16_t type,
uint16_t value_size_max, void *logger){
uint16_t value_size = 0;
unsigned char *value = NULL;
int ret = kni_cmsg_get(cmsg, type, &value_size, &value);
if(ret < 0){
if(ret == KNI_CMSG_INVALID_TYPE){
KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type is %d, ret is %d, stream traceid is %s",
type, ret, pmeinfo->stream_traceid);
}
return -1;
}
if(value_size > value_size_max){
KNI_LOG_ERROR(logger, "kni_cmsg_get: type is %d, size is %d, which should <= %d, stream traceid is %s",
type, value_size, value_size_max, pmeinfo->stream_traceid);
return -1;
}
switch(type)
{
case TFE_CMSG_SSL_INTERCEPT_STATE:
memcpy((char*)&(pmeinfo->intercept_state), value, value_size);
break;
case TFE_CMSG_SSL_UPSTREAM_LATENCY:
memcpy((char*)&(pmeinfo->ssl_server_side_latency), value, value_size);
break;
case TFE_CMSG_SSL_DOWNSTREAM_LATENCY:
memcpy((char*)&(pmeinfo->ssl_client_side_latency), value, value_size);
break;
case TFE_CMSG_SSL_UPSTREAM_VERSION:
memcpy(pmeinfo->ssl_server_side_version, value, value_size);
break;
case TFE_CMSG_SSL_DOWNSTREAM_VERSION:
memcpy(pmeinfo->ssl_client_side_version, value, value_size);
break;
case TFE_CMSG_SSL_PINNING_STATE:
memcpy((char*)&(pmeinfo->pinningst), value, value_size);
break;
case TFE_CMSG_SSL_CERT_VERIFY:
memcpy((char*)&(pmeinfo->ssl_cert_verify), value, value_size);
break;
case TFE_CMSG_SSL_ERROR:
memcpy((char*)&(pmeinfo->ssl_error), value, value_size);
break;
default:
break;
}
return 0;
}
static long traceid2pme_htable_search_cb(void *data, const uchar *key, uint size, void *user_args){
struct traceid2pme_search_cb_args *args = (struct traceid2pme_search_cb_args*)user_args;
void *logger = args->logger;
struct kni_cmsg *cmsg = args->cmsg;
struct pme_info *pmeinfo = (struct pme_info*)data;
if(pmeinfo != NULL){
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, sizeof(pmeinfo->intercept_state), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_UPSTREAM_LATENCY, sizeof(pmeinfo->ssl_server_side_latency), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_DOWNSTREAM_LATENCY, sizeof(pmeinfo->ssl_client_side_latency), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_UPSTREAM_VERSION, sizeof(pmeinfo->ssl_server_side_version) - 1, logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_DOWNSTREAM_VERSION, sizeof(pmeinfo->ssl_client_side_version) - 1, logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_PINNING_STATE, sizeof(pmeinfo->pinningst), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_CERT_VERIFY, sizeof(pmeinfo->ssl_cert_verify), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_ERROR, sizeof(pmeinfo->ssl_error), logger);
clock_gettime(CLOCK_REALTIME, &(pmeinfo->end_time));
KNI_LOG_INFO(logger, "recv cmsg from tfe, stream traceid is %s", pmeinfo->stream_traceid);
judge_pme_destroy(pmeinfo, CALLER_TFE);
}
kni_cmsg_destroy(cmsg);
return 0;
}
void* thread_tfe_cmsg_receiver(void *args){
struct thread_tfe_cmsg_receiver_args *_args = (struct thread_tfe_cmsg_receiver_args*)args;
const char *profile = _args->profile;
const char *section = "tfe_cmsg_receiver";
void *logger = _args->logger;
char listen_eth[INET_ADDRSTRLEN];
uint32_t listen_ip;
int listen_port = -1;
char buff[KNI_MTU];
int sockfd;
struct sockaddr_in server_addr, client_addr;
int ret = MESA_load_profile_string_nodef(profile, section, "listen_eth", listen_eth, sizeof(listen_eth));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: listen_eth not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_int_nodef(profile, section, "listen_port", &listen_port);
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: listen_port not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n listen_eth: %s\n listen_port: %d",
section, listen_eth, listen_port);
FREE(&args);
//create socket
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if(sockfd < 0){
KNI_LOG_ERROR(logger, "Failed at create udp socket, errno is %d, %s", errno, strerror(errno));
goto error_out;
}
memset(&server_addr, 0, sizeof(server_addr));
memset(&client_addr, 0, sizeof(client_addr));
ret = kni_ipv4_addr_get_by_eth(listen_eth, &listen_ip);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at get bind ipv4 addr, eth is %s", listen_eth);
goto error_out;
}
server_addr.sin_family = AF_INET; // IPv4
server_addr.sin_addr.s_addr = listen_ip;
server_addr.sin_port = htons(listen_port);
//bind
ret = bind(sockfd, (const struct sockaddr *)&server_addr, sizeof(server_addr));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at bind udp socket, errno is %d, %s", errno, strerror(errno));
goto error_out;
}
//receive
while(true){
socklen_t client_len = sizeof(client_addr);
int recv_len = recvfrom(sockfd, (char *)buff, sizeof(buff), MSG_WAITALL,
(struct sockaddr*)&client_addr, &client_len);
if(recv_len < 0){
KNI_LOG_ERROR(logger, "Failed at recv udp data, errno is %d, %s", errno, strerror(errno));
continue;
}
KNI_LOG_DEBUG(logger, "recv udp data: recv_len is %d\n", recv_len);
struct kni_cmsg *cmsg = NULL;
ret = kni_cmsg_deserialize((const unsigned char*)buff, recv_len, &cmsg);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at deserialize cmsg, ret is %d", ret);
continue;
}
//get stream_traceid
unsigned char *stream_traceid = NULL;
uint16_t value_size;
ret = kni_cmsg_get(cmsg, TFE_CMSG_STREAM_TRACE_ID, &value_size, &stream_traceid);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type is %d, ret is %d", TFE_CMSG_STREAM_TRACE_ID, ret);
continue;
}
//get pme
long cb_ret = -1;
struct traceid2pme_search_cb_args cb_args;
memset((void*)&cb_args, 0, sizeof(cb_args));
cb_args.cmsg = cmsg;
cb_args.logger = logger;
MESA_htable_search_cb(g_kni_handle->traceid2pme_htable, (const unsigned char *)stream_traceid,
value_size, traceid2pme_htable_search_cb, &cb_args, &cb_ret);
}
return NULL;
error_out:
if(sockfd >= 0){
close(sockfd);
}
return NULL;
}
static struct kni_marsio_handle* kni_marsio_init(const char* profile){
void *logger = g_kni_handle->local_logger;
const char* section = "marsio";
char appsym[KNI_SYMBOL_MAX];
char dev_vxlan_symbol[KNI_SYMBOL_MAX];
char src_mac_addr_str[KNI_SYMBOL_MAX];
unsigned int opt_value = 1;
int tfe_count;
struct mr_instance *mr_inst = NULL;
struct mr_vdev *dev_vxlan_handler = NULL;
struct mr_sendpath *dev_vxlan_sendpath = NULL;
struct mr_vdev *dev_eth_handler = NULL;
struct mr_sendpath *dev_eth_sendpath = NULL;
struct tfe_instance *tfe_inst = NULL;
struct kni_marsio_handle *handle = NULL;
int ret = MESA_load_profile_string_nodef(profile, section, "appsym", appsym, sizeof(appsym));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: appsym not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, section, "dev_vxlan_symbol", dev_vxlan_symbol, sizeof(dev_vxlan_symbol));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: dev_vxlan_symbol not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, section, "src_mac_addr", src_mac_addr_str, sizeof(src_mac_addr_str));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: src_mac_addr not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n appsym: %s\n dev_vxlan_symbol: %s\n src_mac_addr: %s",
section, appsym, dev_vxlan_symbol, src_mac_addr_str);
mr_inst = marsio_create();
if(mr_inst == NULL){
KNI_LOG_ERROR(logger, "Failed at create marsio instance");
goto error_out;
}
handle = ALLOC(struct kni_marsio_handle, 1);
handle->instance = mr_inst;
ret = sscanf(src_mac_addr_str, "%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx",
&(handle->src_mac_addr[0]), &(handle->src_mac_addr[1]),
&(handle->src_mac_addr[2]), &(handle->src_mac_addr[3]),
&(handle->src_mac_addr[4]), &(handle->src_mac_addr[5]));
if(ret != 6){
KNI_LOG_ERROR(logger, "MESA_prof_load: src_mac_addr is invalid, ret is %d, profile is %s, section is %s", ret, profile, section);
goto error_out;
}
marsio_option_set(mr_inst, MARSIO_OPT_EXIT_WHEN_ERR, &opt_value, sizeof(opt_value));
marsio_init(mr_inst, appsym);
tfe_count = g_kni_handle->tfe_count;
for(int i = 0; i < tfe_count; i++){
//load tfe conf
char _section[KNI_SYMBOL_MAX];
char mac_addr_str[KNI_SYMBOL_MAX];
char dev_eth_symbol[KNI_SYMBOL_MAX];
snprintf(_section, sizeof(_section), "tfe%d", i);
int ret = MESA_load_profile_string_nodef(profile, _section, "mac_addr", mac_addr_str, sizeof(mac_addr_str));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: mac_addr not set, profile is %s, section is %s", profile, _section);
goto error_out;
}
tfe_inst = ALLOC(struct tfe_instance, 1);
//ff:ee:dd:cc:bb:aa ---> 0xff 0xee 0xdd 0xcc 0xbb 0xaa
ret = sscanf(mac_addr_str, "%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx",
&tfe_inst->mac_addr[0], &tfe_inst->mac_addr[1],
&tfe_inst->mac_addr[2], &tfe_inst->mac_addr[3],
&tfe_inst->mac_addr[4], &tfe_inst->mac_addr[5]);
if(ret != 6){
KNI_LOG_ERROR(logger, "MESA_prof_load: mac_addr is invalid, ret is %d, profile is %s, section is %s", ret, profile, _section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, _section, "dev_eth_symbol", dev_eth_symbol, sizeof(dev_eth_symbol));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: dev_eth_symbol not set, profile is %s, section is %s", profile, _section);
goto error_out;
}
KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n mac_addr: %s\n dev_eth_symbol: %s",
_section, mac_addr_str, dev_eth_symbol);
//eth_handler receive thread = tfe_data_recv_thread_num, send thread = g_iThreadNum + tfe_data_recv_thread_num
dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, g_kni_handle->tfe_data_recv_thread_num, g_iThreadNum + g_kni_handle->tfe_data_recv_thread_num);
if(dev_eth_handler == NULL){
KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_eth_symbol);
goto error_out;
}
//sendpath
dev_eth_sendpath = marsio_sendpath_create_by_vdev(dev_eth_handler);
if(dev_eth_sendpath == NULL){
KNI_LOG_ERROR(logger, "Failed at create marsio sendpath, dev_symbol is %s", dev_eth_symbol);
goto error_out;
}
//tfe_instance
tfe_inst->dev_eth_handler = dev_eth_handler;
tfe_inst->dev_eth_sendpath = dev_eth_sendpath;
handle->tfe_instance_list[i] = tfe_inst;
}
//vxlan_handler: receive: 0, send: tfe_count
dev_vxlan_handler = marsio_open_device(mr_inst, dev_vxlan_symbol, 0, g_kni_handle->tfe_data_recv_thread_num);
if(dev_vxlan_handler == NULL){
KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_vxlan_symbol);
goto error_out;
}
handle->dev_vxlan_handler = dev_vxlan_handler;
//vxlan sendpath
dev_vxlan_sendpath = marsio_sendpath_create_by_vdev(dev_vxlan_handler);
if(dev_eth_sendpath == NULL){
KNI_LOG_ERROR(logger, "Failed at create marsio sendpath, dev_symbol is %s", dev_vxlan_symbol);
goto error_out;
}
handle->dev_vxlan_sendpath = dev_vxlan_sendpath;
//marsio_thread_init(mr_instance);
return handle;
error_out:
kni_marsio_destroy(handle);
return NULL;
}
static void fs_destroy(struct kni_field_stat_handle *fs_handle){
if(fs_handle != NULL){
FS_stop(&(fs_handle->handle));
}
FREE(&fs_handle);
}
static struct kni_field_stat_handle * fs_init(const char *profile){
void *logger = g_kni_handle->local_logger;
const char *section = "field_stat";
char stat_path[KNI_PATH_MAX];
struct kni_field_stat_handle *fs_handle = NULL;
screen_stat_handle_t handle = NULL;
const char *app_name = "fs2_kni";
int value = 0;
int ret = MESA_load_profile_string_nodef(profile, section, "stat_path", stat_path, sizeof(stat_path));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: stat_path not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n stat_path: %s\n", "field_stat", stat_path);
handle = FS_create_handle();
if(handle == NULL){
KNI_LOG_ERROR(logger, "Failed at create FS_create_handle");
goto error_out;
}
fs_handle = ALLOC(struct kni_field_stat_handle, 1);
fs_handle->handle = handle;
FS_set_para(handle, APP_NAME, app_name, strlen(app_name) + 1);
FS_set_para(handle, OUTPUT_DEVICE, stat_path, strlen(stat_path)+1);
value = 0;
FS_set_para(handle, FLUSH_BY_DATE, &value, sizeof(value));
value = 1;
FS_set_para(handle, PRINT_MODE, &value, sizeof(value));
value = 1;
FS_set_para(handle, CREATE_THREAD, &value, sizeof(value));
value = 5;
FS_set_para(handle, STAT_CYCLE, &value, sizeof(value));
value = 4096;
FS_set_para(handle, MAX_STAT_FIELD_NUM, &value, sizeof(value));
fs_handle = ALLOC(struct kni_field_stat_handle, 1);
fs_handle->handle = handle;
//fs_handle->fields[KNI_FIELD_TOT_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tot_pkt");
fs_handle->fields[KNI_FIELD_BYP_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_pkt");
fs_handle->fields[KNI_FIELD_INTCP_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_pkt");
fs_handle->fields[KNI_FIELD_IPV6_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ipv6_pkt");
fs_handle->fields[KNI_FIELD_NULL_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "null_pkt");
fs_handle->fields[KNI_FIELD_NO_SYN_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "no_syn_pkt");
fs_handle->fields[KNI_FIELD_UNKNOWN_STATE_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "unknown_state");
fs_handle->fields[KNI_FIELD_NO_SA_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "no_s/a_pkt");
//fs_handle->fields[KNI_FIELD_TOT_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tot_stm");
fs_handle->fields[KNI_FIELD_BYP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_stm");
fs_handle->fields[KNI_FIELD_INTCP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_stm");
fs_handle->fields[KNI_FIELD_SSL_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ssl_stm");
fs_handle->fields[KNI_FIELD_HTTP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "http_stm");
fs_handle->fields[KNI_FIELD_SENDLOG_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sendlog_succ");
fs_handle->fields[KNI_FIELD_SENDLOG_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sendlog_fail");
fs_handle->fields[KNI_FIELD_UNKNOWN_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "unknown_stm");
fs_handle->fields[KNI_FIELD_STM_NO_DATA] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "stm_no_data");
fs_handle->fields[KNI_FIELD_PME_NEW] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "pme_new");
fs_handle->fields[KNI_FIELD_PME_FREE] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "pme_free");
fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_add_succ");
fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_add_fail");
fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_del_succ");
fs_handle->fields[KNI_FIELD_ID2PME_DEL_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_del_fail");
fs_handle->fields[KNI_FIELD_IPV4HDR_PARSE_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ipv4hdr_parse_fail");
fs_handle->fields[KNI_FIELD_IPV6HDR_PARSE_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ipv6hdr_parse_fail");
fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "kaReplay_add_fail");
fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "kaReplay_add_succ");
fs_handle->fields[KNI_FIELD_EXCEED_MTU] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "exceed_mtu");
fs_handle->handle = handle;
FS_start(handle);
return fs_handle;
error_out:
fs_destroy(fs_handle);
return NULL;
}
extern "C" void kni_destroy(struct kni_handle *handle){
if(handle != NULL){
}
FREE(&handle);
handle = NULL;
}
//eliminate_type: 0:FIFO; 1:LRU
//ret: 1: the item can be eliminated; 0: the item can't be eliminated
static int traceid2pme_htable_expire_notify_cb(void *data, int eliminate_type){
struct pme_info *pmeinfo = (struct pme_info*)data;
if(pmeinfo->sapp_release == 1){
judge_pme_destroy(pmeinfo, CALLER_TFE);
}
return 0;
}
static void keepalive_replay_data_free_cb(void *data)
{
FREE(&data);
}
extern "C" int kni_init(){
const char *profile = "./conf/kni/kni.conf";
const char *section = "global";
//init logger
char log_path[KNI_PATH_MAX] = "";
int tfe_count = 0;
int tfe_data_recv_thread_num = -1;
char local_eth[KNI_SYMBOL_MAX] = "";
struct kni_send_logger *send_logger = NULL;
struct kni_field_stat_handle *fs_handle = NULL;
int id = -1;
void *local_logger = NULL;
int log_level = -1;
pthread_t thread_id = -1;
int keepalive_replay_switch = -1;
struct thread_tfe_cmsg_receiver_args *cmsg_receiver_args;
MESA_htable_handle traceid2pme_htable = NULL, keepalive_replay_htable = NULL;
int ret = MESA_load_profile_string_nodef(profile, section, "log_path", log_path, sizeof(log_path));
if(ret < 0){
printf("MESA_prof_load: log_path not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_int_nodef(profile, section, "log_level", &log_level);
if(ret < 0){
printf("MESA_prof_load: log_level not set, profile is %s, section is %s", profile, section);
goto error_out;
}
local_logger = MESA_create_runtime_log_handle(log_path, log_level);
if (unlikely(local_logger == NULL)){
printf("Failed at create logger: %s", log_path);
goto error_out;
}
ret = MESA_load_profile_int_nodef(profile, section, "tfe_count", &tfe_count);
if(ret < 0){
KNI_LOG_ERROR(local_logger, "MESA_prof_load: tfe_count not set, profile is %s, section is %s", profile, section);
goto error_out;
}
if(tfe_count > TFE_COUNT_MAX){
KNI_LOG_ERROR(local_logger, "tfe_count is %d, exceed the max_tfe_count %d", tfe_count, TFE_COUNT_MAX);
goto error_out;
}
if(tfe_count <= 0){
KNI_LOG_ERROR(local_logger, "tfe_count is %d, <= 0", tfe_count);
goto error_out;
}
ret = MESA_load_profile_int_def(profile, section, "tfe_data_recv_thread_num", &tfe_data_recv_thread_num, 1);
ret = MESA_load_profile_string_nodef(profile, section, "local_eth", local_eth, sizeof(local_eth));
if(ret < 0){
printf("MESA_prof_load: local_eth not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_int_def(profile, section, "keepalive_replay_switch", &keepalive_replay_switch, 1);
KNI_LOG_INFO(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_count: %d\n"
"tfe_data_recv_thread_num: %d\n local_eth: %s\n keepalive_replay_switch: %d",
section, log_path, log_level, tfe_count, tfe_data_recv_thread_num, local_eth, keepalive_replay_switch);
g_kni_handle = ALLOC(struct kni_handle, 1);
g_kni_handle->local_logger = local_logger;
g_kni_handle->tfe_count = tfe_count;
g_kni_handle->tfe_data_recv_thread_num = tfe_data_recv_thread_num;
g_kni_handle->keepalive_replay_switch = keepalive_replay_switch;
//init http_project
id = http_project_init();
if(id < 0){
KNI_LOG_ERROR(local_logger, "Failed at init http project, ret is %d", id);
goto error_out;
}
g_kni_handle->http_project_id = id;
//init marsio
g_kni_handle->marsio_handle = kni_marsio_init(profile);
if(g_kni_handle->marsio_handle == NULL){
KNI_LOG_ERROR(local_logger, "Failed at init marsio");
goto error_out;
}
//init maat
g_kni_handle->maat_handle = kni_maat_init(profile, local_logger);
if(g_kni_handle->maat_handle == NULL){
KNI_LOG_ERROR(local_logger, "Failed at init maat");
goto error_out;
}
//init_filedstat
fs_handle = fs_init(profile);
if(fs_handle == NULL){
KNI_LOG_ERROR(local_logger, "Failed at init field_stat");
goto error_out;
}
g_kni_fs_handle = fs_handle;
//init local_ipv4
ret = kni_ipv4_addr_get_by_eth(local_eth, &(g_kni_handle->local_ipv4));
if(ret < 0){
KNI_LOG_ERROR(local_logger, "Failed at get bind ipv4 addr, eth is %s", local_eth);
goto error_out;
}
//init kni_send_logger
send_logger = kni_send_logger_init(profile, local_logger);
if(send_logger == NULL){
KNI_LOG_ERROR(local_logger, "Failed at init kni_send_logger", local_eth);
goto error_out;
}
g_kni_handle->send_logger = send_logger;
//init traceid2pme_htable
traceid2pme_htable = kni_create_htable(profile, "traceid2pme_htable", NULL,
(void*)traceid2pme_htable_expire_notify_cb, local_logger);
if(traceid2pme_htable == NULL){
KNI_LOG_ERROR(local_logger, "Failed at create traceid2pme_htable");
goto error_out;
}
g_kni_handle->traceid2pme_htable = traceid2pme_htable;
//init keepalive_replay_htable
if(g_kni_handle->keepalive_replay_switch == 1){
keepalive_replay_htable = kni_create_htable(profile, "keepalive_replay_htable", (void*)keepalive_replay_data_free_cb,
NULL, local_logger);
if(keepalive_replay_htable == NULL){
KNI_LOG_ERROR(local_logger, "Failed at create keepalive_replay_htable");
goto error_out;
}
g_kni_handle->keepalive_replay_htable = keepalive_replay_htable;
}
//create thread_tfe_data_receiver
for(int i = 0; i < g_kni_handle->tfe_data_recv_thread_num; i++){
struct thread_tfe_data_receiver_args *args = ALLOC(struct thread_tfe_data_receiver_args, 1);
args->logger = local_logger;
args->marsio_handle = g_kni_handle->marsio_handle;
args->thread_seq = i;
int ret = pthread_create(&thread_id, NULL, thread_tfe_data_receiver, (void *)args);
if(unlikely(ret != 0)){
KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_data_receiver, ret is %d", ret);
FREE(&args);
goto error_out;
}
}
//create thread_tfe_cmsg_receiver
cmsg_receiver_args = ALLOC(struct thread_tfe_cmsg_receiver_args, 1);
cmsg_receiver_args->logger = local_logger;
strncpy(cmsg_receiver_args->profile, profile, strnlen(profile, sizeof(cmsg_receiver_args->profile) - 1));
ret = pthread_create(&thread_id, NULL, thread_tfe_cmsg_receiver, (void *)cmsg_receiver_args);
if(unlikely(ret != 0)){
KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_cmsg_receiver, ret is %d", ret);
FREE(&cmsg_receiver_args);
goto error_out;
}
return 0;
error_out:
kni_destroy(g_kni_handle);
return -1;
}