This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-kni/entry/src/kni_entry.cpp
2019-06-04 13:25:44 +08:00

1321 lines
52 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "kni_utils.h"
#include "ssl_utils.h"
#include "marsio.h"
#include "kni_maat.h"
#include "MESA/http.h"
#include "kni_cmsg.h"
#include "uuid/uuid.h"
#include "cjson/cJSON.h"
#include "kni_send_logger.h"
extern int g_iThreadNum;
//APP_STATE_DROPME/GIVEME: 当前tcp会话的剩下包是否回调
//APP_STATE_FAWPKT/DROPPKT: 当前包是否丢弃or转发如果是丢弃当前包不会给后面的插件
//PROT_STATE_GIVEME/DROPME: 当前http会话的剩下包是否回调
//seq, ack 是当拿到client hello时传给秋秋取client hello的 seq, ack, 时间戳和sack没有解, 不用解只需要知道enable/disable即可
//TODO: 注意内存泄漏ALLOC对应的FREE, 还有calloc
//函数加static
//统计syn/syn/ack个数流个数 pending not syn个数, not syn/ack个数, 单向流数量, 发往tfe的包数流数收到的包数流数 done
//多个tcpall插件APP_STATE_DROPPKT, APP_STATE_FAWPKT? 有一个droppkt就不给后面的插件了
//一个tcp流中有多个httpssl会话的情况只扫描第一个
//TODO: 统计增加多个tfe实例
//TODO: kafka写日志
//TODO: taobao穿了
//TODO: 捕的包里面第一个client hello是没有控制信息的
//TODO: bypass之后继续统计不要dropme, dropme之后会立即调用close
//TODO:
/*
1. 对pme来说只有sapp和tfe都release之后才会释放内存
1.1. sapp有自己的超时淘汰所以肯定会release
1.2. 当收到tfe的cmsg信息时, set tfe_release = 1
1.3 从收到sapp的release信息之后开始计时(查一下hash表),超时之后 set tfe_release = 1
2. 对traceid2pme的hash表来说
2.1 收到tfe的cmsg信息之后需要查询查完就可以删掉了
2.2 hash表的超时淘汰函数:
sapp_release = 0表示计时未开始不能删除
sapp_release = 1表示已经收到sapp的release信息, 确定超时
*/
struct kni_handle *g_kni_handle = NULL;
struct kni_field_stat_handle *g_kni_fs_handle = NULL;
//int g_http_project_id;
//struct kni_marsio_handle *g_kni_marsio_handle;
//g_iThreadNum 为sapp线程数
#define HTTP_PROJECT_NAME "kni_http_tag"
#define BURST_MAX 1
#define STREAM_TRACE_ID_LEN 37
#define TFE_COUNT_MAX 16
enum kni_protocol{
KNI_PROTOCOL_UNKNOWN = 0,
KNI_PROTOCOL_SSL,
KNI_PROTOCOL_HTTP,
};
struct http_project{
int host_len;
char host[KNI_DOMAIN_MAX];
};
struct pme_info{
int protocol;
int policy_id;
int action;
int service;
struct kni_tcpopt_info *client_tcpopt;
struct kni_tcpopt_info *server_tcpopt;
int tfe_id;
void *logger;
char stream_trace_id[STREAM_TRACE_ID_LEN];
char host[KNI_DOMAIN_MAX]; //http only
char sni[KNI_DOMAIN_MAX]; //ssl only
//tfe_release = 1: tfe don't need pmeinfo
int tfe_release;
int sapp_release;
//kafka log
struct streaminfo *stream;
time_t start_time;
uint64_t con_duration;
//from tfe, kafka log
int intercept_state;
int pinningst; //默认为0, 表示没有从tfe收到
uint64_t ssl_server_side_latency;
uint64_t ssl_client_side_latency;
char ssl_server_side_version[KNI_SYMBOL_MAX];
char ssl_client_side_version[KNI_SYMBOL_MAX];
int ssl_cert_verify;
char ssl_error[KNI_STRING_MAX];
};
struct wrapped_packet{
char data[KNI_MTU];
};
struct tcp_option_restore{
uint8_t kind;
uint8_t len;
uint16_t offset;
};
struct tfe_instance{
struct mr_vdev *dev_eth_handler;
struct mr_sendpath *dev_eth_sendpath;
char mac_addr[6];
};
struct kni_marsio_handle{
struct mr_instance *instance;
struct tfe_instance *tfe_instance_list[TFE_COUNT_MAX];
struct mr_vdev *dev_vxlan_handler;
struct mr_sendpath *dev_vxlan_sendpath;
char src_mac_addr[6];
};
struct protocol_identify_result{
int protocol;
char domain[KNI_DOMAIN_MAX];
int domain_len;
};
struct thread_tfe_data_receiver_args{
void *logger;
struct kni_marsio_handle *marsio_handle;
int tfe_id;
};
struct thread_tfe_cmsg_receiver_args{
void *logger;
char profile[KNI_SYMBOL_MAX];
};
//TODO: 有些字段可以不要
struct pkt_info{
struct iphdr *iphdr;
int iphdr_len;
int ip_totlen;
struct tcphdr *tcphdr;
int tcphdr_len;
char *data;
int data_len;
};
struct kni_handle{
int http_project_id;
struct kni_marsio_handle *marsio_handle;
struct kni_maat_handle *maat_handle;
struct kni_send_logger *send_logger;
MESA_htable_handle traceid2pme_htable;
int tfe_count;
uint32_t local_ipv4;
void *local_logger;
};
struct traceid2pme_search_cb_args{
struct kni_cmsg *cmsg;
void *logger;
};
static struct pme_info* pme_info_new(const struct streaminfo *stream, int thread_seq, void *logger){
struct pme_info* pmeinfo = ALLOC(struct pme_info, 1);
pmeinfo->tfe_id = g_kni_handle->tfe_count > 0 ? thread_seq % g_kni_handle->tfe_count : -1;
uuid_t uu;
uuid_generate_random(uu);
uuid_unparse(uu, pmeinfo->stream_trace_id);
pmeinfo->stream = (struct streaminfo*)stream;
pmeinfo->start_time = time(NULL);
pmeinfo->logger = logger;
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_STM], 0, FS_OP_ADD, 1);
return pmeinfo;
}
static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){
//创建cjson对象
cJSON *log_obj = cJSON_CreateObject();
//stream_trace_id
cJSON_AddStringToObject(log_obj, "stream_trace_id", pmeinfo->stream_trace_id);
//policy_id
cJSON_AddNumberToObject(log_obj, "policy_id", pmeinfo->policy_id);
//action
cJSON_AddNumberToObject(log_obj, "action", pmeinfo->action);
//service
cJSON_AddNumberToObject(log_obj, "service", pmeinfo->service);
//start_time
cJSON_AddNumberToObject(log_obj, "start_time", pmeinfo->start_time);
//end_time
cJSON_AddNumberToObject(log_obj, "end_time", time(NULL));
//stream_info: addr_type, trans_proto, client_ip, client_port, server_ip, server_port
const struct layer_addr *addr = &(pmeinfo->stream->addr);
char client_ip_str[INET6_ADDRSTRLEN] = "";
char server_ip_str[INET6_ADDRSTRLEN] = "";
switch(addr->addrtype){
case ADDR_TYPE_IPV4:
cJSON_AddNumberToObject(log_obj, "addr_type", 4);
inet_ntop(AF_INET, &addr->tuple4_v4->saddr, client_ip_str, sizeof(client_ip_str));
inet_ntop(AF_INET, &addr->tuple4_v4->daddr, server_ip_str, sizeof(server_ip_str));
cJSON_AddStringToObject(log_obj, "client_ip", client_ip_str);
cJSON_AddStringToObject(log_obj, "server_ip", server_ip_str);
cJSON_AddNumberToObject(log_obj, "client_port", ntohs(addr->tuple4_v4->source));
cJSON_AddNumberToObject(log_obj, "server_port", ntohs(addr->tuple4_v4->dest));
cJSON_AddStringToObject(log_obj, "trans_proto", "IPv4_TCP");
break;
case ADDR_TYPE_IPV6:
cJSON_AddNumberToObject(log_obj, "addr_type", 6);
inet_ntop(AF_INET6, &addr->tuple4_v6->saddr, client_ip_str, sizeof(client_ip_str));
inet_ntop(AF_INET6, &addr->tuple4_v6->daddr, server_ip_str, sizeof(server_ip_str));
cJSON_AddStringToObject(log_obj, "client_ip", client_ip_str);
cJSON_AddStringToObject(log_obj, "server_ip", server_ip_str);
cJSON_AddNumberToObject(log_obj, "client_port", ntohs(addr->tuple4_v6->source));
cJSON_AddNumberToObject(log_obj, "server_port", ntohs(addr->tuple4_v6->dest));
cJSON_AddStringToObject(log_obj, "trans_proto", "IPv6_TCP");
break;
default:
break;
}
//entrance_id: 0
cJSON_AddNumberToObject(log_obj, "entrance_id", 0);
//device_id: 0
cJSON_AddNumberToObject(log_obj, "device_id", 0);
//link_id: 0
cJSON_AddNumberToObject(log_obj, "link_id", 0);
//isp: null
cJSON_AddStringToObject(log_obj, "isp", "");
//encap_type: from sapp, 先填0
cJSON_AddNumberToObject(log_obj, "encap_type", 0);
//pinning state: from tfe
cJSON_AddNumberToObject(log_obj, "pinningst", pmeinfo->pinningst);
//intercept state: from tfe
cJSON_AddNumberToObject(log_obj, "intercept_state", pmeinfo->intercept_state);
//ssl upstream latency: from tfe
cJSON_AddNumberToObject(log_obj, "ssl_server_side_latency", pmeinfo->ssl_server_side_latency);
//ssl downstream latency: from tfe
cJSON_AddNumberToObject(log_obj, "ssl_client_side_latency", pmeinfo->ssl_client_side_latency);
//ssl upstream version: from tfe
cJSON_AddStringToObject(log_obj, "ssl_server_side_version", pmeinfo->ssl_server_side_version);
//ssl downstream version: from tfe
cJSON_AddStringToObject(log_obj, "ssl_client_side_version", pmeinfo->ssl_client_side_version);
//ssl cert verify
cJSON_AddNumberToObject(log_obj, "ssl_cert_verify", pmeinfo->ssl_cert_verify);
//direction: 0
cJSON_AddNumberToObject(log_obj, "direction", 0);
//stream_dir: from sapp
cJSON_AddNumberToObject(log_obj, "stream_dir", pmeinfo->stream->dir);
//cap_ip: 从网卡名得到
char local_ipv4_str[INET6_ADDRSTRLEN];
inet_ntop(AF_INET, &(g_kni_handle->local_ipv4), local_ipv4_str, sizeof(local_ipv4_str));
cJSON_AddStringToObject(log_obj, "cap_ip", local_ipv4_str);
//addr_list
cJSON_AddStringToObject(log_obj, "addr_list", "");
//host: http_only
cJSON_AddStringToObject(log_obj, "host", pmeinfo->host);
//sni: ssl only
cJSON_AddStringToObject(log_obj, "sni", pmeinfo->sni);
//c2s_pkt_num
cJSON_AddNumberToObject(log_obj, "c2s_pkt_num", pmeinfo->stream->ptcpdetail->serverpktnum);
//s2c_pkt_num
cJSON_AddNumberToObject(log_obj, "s2c_pkt_num", pmeinfo->stream->ptcpdetail->clientpktnum);
//c2s_byte_num
cJSON_AddNumberToObject(log_obj, "c2s_byte_num", pmeinfo->stream->ptcpdetail->serverbytes);
//s2c_byte_num
cJSON_AddNumberToObject(log_obj, "s2c_byte_num", pmeinfo->stream->ptcpdetail->clientbytes);
int ret = -1;
char *log_msg = cJSON_Print(log_obj);
cJSON_Delete(log_obj);
if(log_msg == NULL){
KNI_LOG_ERROR(local_logger, "Failed at cJSON_Print");
goto error_out;
}
KNI_LOG_DEBUG(local_logger, "log_msg is %s\n", log_msg);
ret = kni_send_logger_sendlog(g_kni_handle->send_logger, log_msg, strlen(log_msg));
if(ret < 0){
KNI_LOG_ERROR(local_logger, "Failed at kni_send_logger_sendlog, ret is %d", ret);
goto error_out;
//重试逻辑?
}
FREE(&log_msg);
return 0;
error_out:
if(log_msg != NULL){
FREE(&log_msg);
}
return -1;
}
static void pme_info_destroy(struct pme_info *pmeinfo){
void *logger = pmeinfo->logger;
if(pmeinfo != NULL && pmeinfo->sapp_release == 1 && pmeinfo->tfe_release == 1){
int ret = sendlog_to_kafka(pmeinfo, logger);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at sendlog to kafka");
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_FAIL], 0, FS_OP_ADD, 1);
}
else{
KNI_LOG_INFO(logger, "Succeed sendlog to kafka");
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_SUCC], 0, FS_OP_ADD, 1);
}
if(pmeinfo->client_tcpopt != NULL){
FREE(&(pmeinfo->client_tcpopt));
}
if(pmeinfo->server_tcpopt != NULL){
FREE(&(pmeinfo->server_tcpopt));
}
FREE(&pmeinfo);
}
}
static int protocol_identify(const struct streaminfo* stream, char *buf, int len, struct protocol_identify_result *result){
//判断是http
struct http_project* project = (struct http_project*)project_req_get_struct(stream, g_kni_handle->http_project_id);
if(project != NULL){
result->protocol = KNI_PROTOCOL_HTTP;
result->domain_len = project->host_len;
memcpy(result->domain, project->host, result->domain_len);
return 0;
}
//判断是ssl
enum chello_parse_result chello_status = CHELLO_PARSE_INVALID_FORMAT;
struct ssl_chello *chello = NULL;
chello = ssl_chello_parse((const unsigned char*)buf, len, &chello_status);
if(chello_status == CHELLO_PARSE_SUCCESS){
result->protocol = KNI_PROTOCOL_SSL;
if(chello->sni == NULL){
result->domain_len = 0;
}
else{
result->domain_len = strnlen(chello->sni, KNI_DOMAIN_MAX);
memcpy(result->domain, chello->sni, result->domain_len);
}
ssl_chello_free(chello);
return 0;
}
ssl_chello_free(chello);
result->protocol = KNI_PROTOCOL_UNKNOWN;
return 0;
}
static int wrapped_kni_cmsg_set(struct kni_cmsg *cmsg, uint16_t type, const unsigned char *value, uint16_t size){
void *logger = g_kni_handle->local_logger;
int ret = kni_cmsg_set(cmsg, type, value, size);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed set cmsg, type is %d", type);
}
return ret;
}
static unsigned char* kni_cmsg_serialize_header_new(struct pme_info *pmeinfo, struct pkt_info *pktinfo, uint16_t *len){
void *logger = g_kni_handle->local_logger;
uint16_t bufflen = 0, serialize_len = 0;
unsigned char *buff = NULL;
uint8_t protocol_type = pmeinfo->protocol == KNI_PROTOCOL_SSL ? 0x1 : 0x0;
struct kni_cmsg *cmsg = kni_cmsg_init();
int policy_id = -1;
char *trace_id = NULL;
uint32_t seq = pktinfo->tcphdr->seq;
uint32_t ack = pktinfo->tcphdr->ack_seq;
uint16_t client_mss = htons(pmeinfo->client_tcpopt->mss);
uint16_t server_mss = htons(pmeinfo->server_tcpopt->mss);
//seq
int ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SEQ, (const unsigned char*)&seq, 4);
if(ret < 0) goto error_out;
//ack
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_ACK, (const unsigned char*)&ack, 4);
if(ret < 0) goto error_out;
//client mss
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_CLIENT, (const unsigned char*)&client_mss, 2);
if(ret < 0) goto error_out;
//server mss
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_SERVER, (const unsigned char*)&server_mss, 2);
if(ret < 0) goto error_out;
//client wscale
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->wscale), 1);
if(ret < 0) goto error_out;
//server wscale
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->wscale), 1);
if(ret < 0) goto error_out;
//client sack
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->sack), 1);
if(ret < 0) goto error_out;
//server sack
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->sack), 1);
if(ret < 0) goto error_out;
//client timestamp
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->ts), 1);
if(ret < 0) goto error_out;
//server timestamp
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->ts), 1);
if(ret < 0) goto error_out;
//protocol
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (const unsigned char*)&protocol_type, 1);
if(ret < 0) goto error_out;
//maat policy id
policy_id = pmeinfo->policy_id;
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_POLICY_ID, (const unsigned char*)&policy_id, sizeof(policy_id));
if(ret < 0) goto error_out;
//stream trace id
trace_id = pmeinfo->stream_trace_id;
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_STREAM_TRACE_ID, (const unsigned char*)trace_id, STREAM_TRACE_ID_LEN);
if(ret < 0) goto error_out;
bufflen = kni_cmsg_serialize_size_get(cmsg);
buff = (unsigned char*)ALLOC(char, bufflen);
serialize_len = 0;
ret = kni_cmsg_serialize(cmsg, buff, bufflen, &serialize_len);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at serialize cmsg, ret is %d", ret);
goto error_out;
}
*len = serialize_len;
kni_cmsg_destroy(cmsg);
return buff;
error_out:
kni_cmsg_destroy(cmsg);
return NULL;
}
static char* add_cmsg_to_packet(struct pme_info *pmeinfo, struct pkt_info *pktinfo, int *len){
//tcp option: kind 88, len 4, control_info_len
char *new_pkt = (char*)ALLOC(struct wrapped_packet, 1);
struct iphdr *iphdr = (struct iphdr*)new_pkt;
int offset = 0;
//iphdr
memcpy(new_pkt, (void*)pktinfo->iphdr, pktinfo->iphdr_len);
offset += pktinfo->iphdr_len;
//tcphdr
struct tcphdr *tcphdr = (struct tcphdr*)(new_pkt + offset);
memcpy(new_pkt + offset, (void*)pktinfo->tcphdr, 20);
offset += 20;
tcphdr->doff = pktinfo->tcphdr->doff + 1;
struct tcp_option_restore *opt = ALLOC(struct tcp_option_restore, 1);
opt->kind = 88;
opt->len = 4;
opt->offset = htons(pktinfo->data_len);
memcpy(new_pkt + offset, (void*)opt, 4);
offset += 4;
memcpy(new_pkt + offset, (void*)((char*)pktinfo->tcphdr + 20), pktinfo->tcphdr_len - 20);
offset += pktinfo->tcphdr_len - 20;
//data
memcpy(new_pkt + offset, (void*)pktinfo->data, pktinfo->data_len);
offset += pktinfo->data_len;
//kni_cmsg_serialize_header
uint16_t header_len = 0;
unsigned char* header = kni_cmsg_serialize_header_new(pmeinfo, pktinfo, &header_len);
memcpy(new_pkt + offset, (void*)header, header_len);
offset += header_len;
FREE(&header);
//iphdr: tot_len
iphdr->tot_len = htons(offset);
//iphdr: checksum
//计算校验和之前一定要先置0
iphdr->check = 0;
iphdr->check = kni_ip_checksum((void*)iphdr, pktinfo->iphdr_len);
//tcphdr: checkdum
tcphdr->check = 0;
tcphdr->check = kni_tcp_checksum((void*)tcphdr, offset - pktinfo->iphdr_len, iphdr->saddr, iphdr->daddr);
*len = offset;
return new_pkt;
}
static int send_to_tfe(struct kni_marsio_handle *handle, char *raw_data, int raw_len, int thread_seq, int tfe_id){
void *logger = g_kni_handle->local_logger;
marsio_buff_t *tx_buffs[BURST_MAX];
unsigned int ret = 1;
//TODO: marsio配置文件: 2500
//thread_seq实际上是网卡队列一个线程对应一个网卡队列, 并不是线程号和网卡队列号一一对应假设线程号是tid网卡队列为n那么tid % n就是网卡队列号
struct mr_vdev *dev_eth_handler = handle->tfe_instance_list[tfe_id]->dev_eth_handler;
struct mr_sendpath *dev_eth_sendpath = handle->tfe_instance_list[tfe_id]->dev_eth_sendpath;
char *src_mac = handle->src_mac_addr;
char *dst_mac = handle->tfe_instance_list[tfe_id]->mac_addr;
int alloc_ret = marsio_buff_malloc_device(dev_eth_handler, tx_buffs, ret, 0, thread_seq);
if (alloc_ret < 0){
KNI_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret is %d, thread_seq is %d", ret, thread_seq);
return -1;
}
char* dst_data = marsio_buff_append(tx_buffs[0], raw_len + 14);
//ethernet_header[14]
memcpy(dst_data, dst_mac, 6);
memcpy(dst_data + 6, src_mac, 6);
dst_data[12] = 0x08;
dst_data[13] = 0x00;
memcpy((char*)dst_data + 14, raw_data, raw_len);
marsio_send_burst(dev_eth_sendpath, thread_seq, tx_buffs, ret);
return 0;
}
static char pending_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo){
void *logger = g_kni_handle->local_logger;
if(!pktinfo->tcphdr->syn){
//pending_opstate 不是syn, bypass这个流
KNI_LOG_ERROR(logger, "pending opstate: not syn");
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_SYN_EXP], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
//异常情况不需要等tfe release, 直接释放
pmeinfo->tfe_release = 1;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
pmeinfo->client_tcpopt = kni_get_tcpopt(pktinfo->tcphdr, pktinfo->tcphdr_len);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}
static int get_action(struct ipaddr *addr, char *domain, int domain_len, int thread_seq, int *policy_id){
//return KNI_ACTION_INTERCEPT;
int action = kni_maat_scan_ip(g_kni_handle->maat_handle, addr, thread_seq, policy_id);
if(action == KNI_ACTION_BYPASS){
return action;
}
if(domain_len != 0){
action = kni_maat_scan_domain(g_kni_handle->maat_handle, domain, domain_len, thread_seq, policy_id);
}
return action;
}
//TODO: 这一块逻辑需要和洋姐和秋秋讨论一下
static char data_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, int thread_seq){
void *logger = g_kni_handle->local_logger;
char *buf = (char*)pktinfo->iphdr;
int len = pktinfo->ip_totlen;
char stream_addr[KNI_SYMBOL_MAX] = "";
int ret;
kni_stream_addr_trans((struct ipaddr*)(&stream->addr), stream_addr, sizeof(stream_addr));
//保证pmeinfo->action只有KNI_ACTION_NONE KNI_ACTION_INTERCEPT KNI_ACTION_BYPASS三种情况
switch (pmeinfo->action){
case KNI_ACTION_NONE:
break;
case KNI_ACTION_INTERCEPT:
ret = send_to_tfe(g_kni_handle->marsio_handle, buf, len, thread_seq, pmeinfo->tfe_id);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at send continue packet to tfe%d, stream_addr is %s", pmeinfo->tfe_id, stream_addr);
}
else{
KNI_LOG_DEBUG(logger, "Succeed at send continue packet to tfe%d, stream_addr is %s", pmeinfo->tfe_id, stream_addr);
}
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_DROPPKT | APP_STATE_GIVEME;
case KNI_ACTION_BYPASS:
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
default:
break;
}
//TODO: client hello如果跨包怎么办client hello后面一个包先到这个包该丢掉还是bypass
//此时 action = KNI_ACTION_UNKNOWN, 说明还没收到第一个数据包
// syn/ack包
if(pktinfo->tcphdr->syn && pktinfo->tcphdr->ack){
pmeinfo->server_tcpopt = kni_get_tcpopt(pktinfo->tcphdr, pktinfo->tcphdr_len);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}
if(pktinfo->data_len <= 0){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}
//单向流, bypass and dropme
if(stream->dir != DIR_DOUBLE){
KNI_LOG_INFO(logger, "dir is %d, bypass, stream addr is %s", stream->dir, stream_addr);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
pmeinfo->tfe_release = 1;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
struct protocol_identify_result *result = ALLOC(struct protocol_identify_result, 1);
protocol_identify(stream, pktinfo->data, pktinfo->data_len, result);
pmeinfo->protocol = result->protocol;
//第一个数据包: 如果从第一个数据包判断不出协议, bypass and dropme
if(pmeinfo->protocol == KNI_PROTOCOL_UNKNOWN){
KNI_LOG_INFO(logger, "Failed at protocol_identify, bypass and dropme, stream addr is %s\n",
pmeinfo->protocol, stream_addr);
FREE(&result);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_UNKNOWN_STM], 0, FS_OP_ADD, 1);
pmeinfo->tfe_release = 1;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
//protocol = KNI_PROTOCOL_SSL/KNI_PROTOCOL_HTTP, 判断action, action返回值: KNI_ACTION_INTERCEPT/KNI_ACTION_BYPASS
if(pmeinfo->protocol == KNI_PROTOCOL_SSL){
memcpy(pmeinfo->sni, result->domain, result->domain_len);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SSL_STM], 0, FS_OP_ADD, 1);
}
if(pmeinfo->protocol == KNI_PROTOCOL_HTTP){
memcpy(pmeinfo->host, result->domain, result->domain_len);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_HTTP_STM], 0, FS_OP_ADD, 1);
}
pmeinfo->action = get_action((struct ipaddr*)(&stream->addr), result->domain, result->domain_len, thread_seq, &(pmeinfo->policy_id));
//输出maat拦截日志
char domain_str[KNI_DOMAIN_MAX] = "";
memcpy(domain_str, result->domain, result->domain_len);
KNI_LOG_DEBUG(logger, "get_action: %s, %s, policy_id = %d, action = %s",
stream_addr, domain_str, pmeinfo->policy_id, pmeinfo->action == KNI_ACTION_BYPASS ? "bypass" : "intercept");
FREE(&result);
//TODO: 这块比较奇怪, 收到client hello, 但是没有syn/ack包, 直接bypass了
if(pmeinfo->client_tcpopt == NULL || pmeinfo->server_tcpopt == NULL){
KNI_LOG_ERROR(logger, "Failed at intercept, %s, %s", pmeinfo->client_tcpopt == NULL ? "no syn" : "",
pmeinfo->server_tcpopt == NULL ? "no syn/ack" : "");
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_SA_EXP], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
pmeinfo->tfe_release = 1;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
switch(pmeinfo->action){
case KNI_ACTION_BYPASS:
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
case KNI_ACTION_INTERCEPT:
//action = KNI_ACTION_INTERCEPT, 带上控制信息发送给qq, 要修改ip, tcp的校验和
buf = add_cmsg_to_packet(pmeinfo, pktinfo, &len);
ret = send_to_tfe(g_kni_handle->marsio_handle, buf, len, thread_seq, pmeinfo->tfe_id);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at send first packet to tfe%d, stream_trace_id is %s", pmeinfo->tfe_id, pmeinfo->stream_trace_id);
}
else{
KNI_LOG_DEBUG(logger, "Succeed at send first packet to tfe%d, stream_trace_id is %s", pmeinfo->tfe_id, pmeinfo->stream_trace_id);
}
FREE(&buf);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_STM], 0, FS_OP_ADD, 1);
return APP_STATE_DROPPKT | APP_STATE_GIVEME;
default:
//action非法bypass and dropme
KNI_LOG_ERROR(logger, "Action %d is Invalid, policy_id is %d, bypass(dropme)", pmeinfo->action, pmeinfo->policy_id);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
pmeinfo->tfe_release = 1;
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
}
static char close_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, int thread_seq){
//close 数据也要发送给tfe
void *logger = g_kni_handle->local_logger;
char *buf = (char*)pktinfo->iphdr;
char stream_addr[KNI_SYMBOL_MAX] = "";
kni_stream_addr_trans((struct ipaddr*)(&stream->addr), stream_addr, sizeof(stream_addr));
int len = pktinfo->ip_totlen;
int ret;
switch(pmeinfo->action){
case KNI_ACTION_INTERCEPT:
ret =send_to_tfe(g_kni_handle->marsio_handle, buf, len, thread_seq, pmeinfo->tfe_id);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at send last packet to tfe%d, stream addr is %s",
pmeinfo->tfe_id, stream_addr);
}
else{
KNI_LOG_DEBUG(logger, "Succeed at send last packet to tfe%d, stream addr is %s",
pmeinfo->tfe_id, stream_addr);
}
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_DROPPKT | APP_STATE_DROPME;
default:
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
}
//从syn包开始回调
extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, int thread_seq, const void* a_packet){
void *logger = g_kni_handle->local_logger;
//KNI_LOG_DEBUG(logger, "call kni_tcpall_entry");
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_PKT], 0, FS_OP_ADD, 1);
//当前包bypass, 剩下包bypass
//TODO: ipv6暂时不处理, ipv6: 通过nexthdr链式寻找tcp头(IPPROTO_TCP)
if(stream->addr.addrtype == ADDR_TYPE_IPV6){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV6_PKT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_DROPME;
}
//a_packet == NULL, 不处理这个包
if(a_packet == NULL){
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NULL_PKT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
}
struct pme_info *pmeinfo = *(struct pme_info **)pme;
//pktinfo
struct pkt_info *pktinfo = (struct pkt_info*)ALLOC(struct pkt_info, 1);
pktinfo->iphdr = (struct iphdr*)a_packet;
pktinfo->iphdr_len = pktinfo->iphdr->ihl * 4;
pktinfo->ip_totlen = ntohs(pktinfo->iphdr->tot_len);
pktinfo->tcphdr = (struct tcphdr*)((char*)pktinfo->iphdr + pktinfo->iphdr_len);
pktinfo->tcphdr_len = pktinfo->tcphdr->doff * 4;
pktinfo->data = (char*)pktinfo->tcphdr + pktinfo->tcphdr_len;
pktinfo->data_len = pktinfo->ip_totlen - pktinfo->iphdr_len - pktinfo->tcphdr_len;
int ret;
switch(stream->pktstate){
case OP_STATE_PENDING:
*pme = pmeinfo = pme_info_new(stream, thread_seq, logger);
ret = MESA_htable_add(g_kni_handle->traceid2pme_htable, (const unsigned char *)(pmeinfo->stream_trace_id),
strlen(pmeinfo->stream_trace_id), (const void*)pmeinfo);
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_htable: failed at MESA_htable_add,"
"table is traceid2pme_htable, key is %s", pmeinfo->stream_trace_id);
}
ret = pending_opstate(stream, pmeinfo, pktinfo);
break;
case OP_STATE_DATA:
ret = data_opstate(stream, pmeinfo, pktinfo, thread_seq);
break;
case OP_STATE_CLOSE:
ret = close_opstate(stream, pmeinfo, pktinfo, thread_seq);
break;
default:
ret = APP_STATE_FAWPKT | APP_STATE_GIVEME;
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_UNKNOWN_STATE_EXP], 0, FS_OP_ADD, 1);
KNI_LOG_ERROR(logger, "Unknown stream opstate %d", stream->pktstate);
break;
}
FREE(&pktinfo);
if((ret & APP_STATE_DROPME)){
//sendlog_to_kafka(pmeinfo, logger);
pmeinfo->sapp_release = 1;
pme_info_destroy(pmeinfo);
*pme = NULL;
}
return ret;
}
void http_project_free(int thread_seq, void *project_req_value){
FREE(&project_req_value);
}
static int http_project_init(){
void *logger = g_kni_handle->local_logger;
int id = project_producer_register(HTTP_PROJECT_NAME, PROJECT_VAL_TYPE_STRUCT, http_project_free);
if(id < 0){
KNI_LOG_ERROR(logger, "Failed at project_producer_register, project name is %s, ret is %d", HTTP_PROJECT_NAME, id);
return -1;
}
id = project_customer_register(HTTP_PROJECT_NAME, PROJECT_VAL_TYPE_STRUCT);
if(id < 0){
KNI_LOG_ERROR(logger, "Failed at project_customer_register, project name is %s, ret is %d", HTTP_PROJECT_NAME, id);
return -1;
}
return id;
}
extern "C" char kni_http_entry(stSessionInfo* session_info, void **pme, int thread_seq, struct streaminfo *a_stream, const void *a_packet){
http_infor* http_info = (http_infor*)(session_info->app_info);
//http_session_seq = 1表示只处理tcp链接中的第一个http会话
if(http_info->http_session_seq != 1){
return PROT_STATE_DROPME;
}
if(session_info->prot_flag != HTTP_HOST){
return PROT_STATE_GIVEME;
}
int host_len = MIN(session_info->buflen, KNI_DEFAULT_MTU);
struct http_project* host_info = ALLOC(struct http_project, 1);
host_info->host_len = host_len;
memcpy(host_info->host, session_info->buf, host_len);
if(project_req_add_struct(a_stream, g_kni_handle->http_project_id, host_info) < 0){
FREE(&host_info);
host_info = NULL;
}
return PROT_STATE_DROPME;
}
static void kni_marsio_destroy(struct kni_marsio_handle *handle){
//TODO: dev_handler, dev_sendpath不需要free吗
if(handle != NULL){
if(handle->instance != NULL){
marsio_destory(handle->instance);
}
for(int i = 0; i < TFE_COUNT_MAX; i++){
FREE(&handle->tfe_instance_list[i]);
}
}
FREE(&handle);
handle = NULL;
}
void* thread_tfe_data_receiver(void *args){
struct thread_tfe_data_receiver_args *_args = (struct thread_tfe_data_receiver_args*)args;
//void *logger = _args->logger;
struct kni_marsio_handle *marsio_handle = _args->marsio_handle;
int tfe_id = _args->tfe_id;
struct mr_vdev *dev_eth_handler = marsio_handle->tfe_instance_list[tfe_id]->dev_eth_handler;
FREE(&args);
marsio_buff_t * rx_buff[BURST_MAX];
int nr_burst = 1;
//实际上是网卡队列id
int thread_seq = 0;
while(true){
//从tfe上收
int ret = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buff, nr_burst);
if(ret <= 0){
continue;
}
//打上标签
struct mr_tunnat_ctrlzone mr_ctrlzone;
mr_ctrlzone.action |= TUNNAT_CZ_ACTION_ENCAP_INNER | TUNNAT_CZ_ACTION_ENCAP_OUTER;
for(int i = 0; i < ret; i++){
marsio_buff_ctrlzone_set(rx_buff[i], 0, &mr_ctrlzone, sizeof(struct mr_tunnat_ctrlzone));
}
//发送给vxlan
marsio_send_burst_with_options(marsio_handle->dev_vxlan_sendpath, thread_seq, rx_buff, 1, MARSIO_SEND_OPT_FAST);
}
return NULL;
}
static int wrapped_kni_cmsg_get(struct pme_info *pmeinfo, struct kni_cmsg *cmsg, uint16_t type, uint16_t value_size_max, void *logger){
uint16_t value_size = 0;
unsigned char *value = NULL;
int ret = kni_cmsg_get(cmsg, type, &value_size, &value);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type is %d, ret is %d", type, ret);
return -1;
}
if(value_size > value_size_max){
KNI_LOG_ERROR(logger, "kni_cmsg_get: type is %s, size is %d, which should <= %d", type, value_size, value_size_max);
return -1;
}
switch(type)
{
case TFE_CMSG_SSL_INTERCEPT_STATE:
memcpy((char*)&(pmeinfo->intercept_state), value, value_size);
break;
case TFE_CMSG_SSL_UPSTREAM_LATENCY:
memcpy((char*)&(pmeinfo->ssl_server_side_latency), value, value_size);
break;
case TFE_CMSG_SSL_DOWNSTREAM_LATENCY:
memcpy((char*)&(pmeinfo->ssl_client_side_latency), value, value_size);
break;
case TFE_CMSG_SSL_UPSTREAM_VERSION:
memcpy(pmeinfo->ssl_server_side_version, value, value_size);
break;
case TFE_CMSG_SSL_DOWNSTREAM_VERSION:
memcpy(pmeinfo->ssl_client_side_version, value, value_size);
break;
case TFE_CMSG_SSL_PINNING_STATE:
memcpy((char*)&(pmeinfo->pinningst), value, value_size);
break;
case TFE_CMSG_SSL_CERT_VERIFY:
memcpy((char*)&(pmeinfo->ssl_cert_verify), value, value_size);
break;
case TFE_CMSG_SSL_ERROR:
memcpy((char*)&(pmeinfo->ssl_error), value, value_size);
break;
default:
break;
}
return 0;
}
static long traceid2pme_htable_search_cb(void *data, const uchar *key, uint size, void *user_args){
struct traceid2pme_search_cb_args *args = (struct traceid2pme_search_cb_args*)user_args;
void *logger = args->logger;
struct kni_cmsg *cmsg = args->cmsg;
struct pme_info *pmeinfo = (struct pme_info*)data;
if(pmeinfo != NULL){
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, sizeof(pmeinfo->intercept_state), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_UPSTREAM_LATENCY, sizeof(pmeinfo->ssl_server_side_latency), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_DOWNSTREAM_LATENCY, sizeof(pmeinfo->ssl_client_side_latency), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_UPSTREAM_VERSION, sizeof(pmeinfo->ssl_server_side_version) - 1, logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_DOWNSTREAM_VERSION, sizeof(pmeinfo->ssl_client_side_version) - 1, logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_PINNING_STATE, sizeof(pmeinfo->pinningst), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_CERT_VERIFY, sizeof(pmeinfo->ssl_cert_verify), logger);
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_ERROR, sizeof(pmeinfo->ssl_error), logger);
pmeinfo->tfe_release = 1;
int ret = MESA_htable_del(g_kni_handle->traceid2pme_htable, (const unsigned char *)pmeinfo->stream_trace_id,
sizeof(pmeinfo->stream_trace_id), NULL);
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_htable: failed at del, table is %s, key is %s, ret is %d",
"traceid2pme_htable", pmeinfo->stream_trace_id, ret);
}
}
return 0;
}
void* thread_tfe_cmsg_receiver(void *args){
struct thread_tfe_cmsg_receiver_args *_args = (struct thread_tfe_cmsg_receiver_args*)args;
const char *profile = _args->profile;
const char *section = "tfe_cmsg_receiver";
void *logger = _args->logger;
char listen_eth[INET_ADDRSTRLEN];
uint32_t listen_ip;
int listen_port = -1;
char buff[KNI_MTU];
int sockfd;
struct sockaddr_in server_addr, client_addr;
int ret = MESA_load_profile_string_nodef(profile, section, "listen_eth", listen_eth, sizeof(listen_eth));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: listen_eth not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_int_nodef(profile, section, "listen_port", &listen_port);
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: listen_port not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n listen_eth: %s\n listen_port: %d",
section, listen_eth, listen_port);
FREE(&args);
//create socket
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if(sockfd < 0){
KNI_LOG_ERROR(logger, "Failed at create udp socket, errno is %d, %s", errno, strerror(errno));
goto error_out;
}
memset(&server_addr, 0, sizeof(server_addr));
memset(&client_addr, 0, sizeof(client_addr));
ret = kni_ipv4_addr_get_by_eth(listen_eth, &listen_ip);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at get bind ipv4 addr, eth is %s", listen_eth);
goto error_out;
}
server_addr.sin_family = AF_INET; // IPv4
server_addr.sin_addr.s_addr = listen_ip;
server_addr.sin_port = htons(listen_port);
//bind
ret = bind(sockfd, (const struct sockaddr *)&server_addr, sizeof(server_addr));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at bind udp socket, errno is %d, %s", errno, strerror(errno));
goto error_out;
}
//receive
while(true){
socklen_t client_len = sizeof(client_addr);
int recv_len = recvfrom(sockfd, (char *)buff, sizeof(buff), MSG_WAITALL,
(struct sockaddr*)&client_addr, &client_len);
if(recv_len < 0){
KNI_LOG_ERROR(logger, "Failed at recv udp data, errno is %d, %s", errno, strerror(errno));
continue;
}
KNI_LOG_DEBUG(logger, "recv udp data: recv_len is %d\n", recv_len);
struct kni_cmsg *cmsg = NULL;
ret = kni_cmsg_deserialize((const unsigned char*)buff, recv_len, &cmsg);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at deserialize cmsg, ret is %d", ret);
continue;
}
//get stream_trace_id
unsigned char *stream_trace_id = NULL;
uint16_t value_size;
ret = kni_cmsg_get(cmsg, TFE_CMSG_STREAM_TRACE_ID, &value_size, &stream_trace_id);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type is %d, ret is %d", TFE_CMSG_STREAM_TRACE_ID, ret);
continue;
}
//get pme
long cb_ret = -1;
struct traceid2pme_search_cb_args cb_args;
memset((void*)&cb_args, sizeof(cb_args), 0);
cb_args.cmsg = cmsg;
cb_args.logger = logger;
MESA_htable_search_cb(g_kni_handle->traceid2pme_htable, (const unsigned char *)stream_trace_id,
strlen((char*)stream_trace_id), traceid2pme_htable_search_cb, &cb_args, &cb_ret);
}
return NULL;
error_out:
if(sockfd >= 0){
close(sockfd);
}
return NULL;
}
static struct kni_marsio_handle* kni_marsio_init(const char* profile){
void *logger = g_kni_handle->local_logger;
const char* section = "marsio";
char appsym[KNI_SYMBOL_MAX];
char dev_vxlan_symbol[KNI_SYMBOL_MAX];
char src_mac_addr_str[KNI_SYMBOL_MAX];
unsigned int opt_value = 1;
int tfe_count;
struct mr_instance *mr_inst = NULL;
struct mr_vdev *dev_vxlan_handler = NULL;
struct mr_sendpath *dev_vxlan_sendpath = NULL;
struct mr_vdev *dev_eth_handler = NULL;
struct mr_sendpath *dev_eth_sendpath = NULL;
struct tfe_instance *tfe_inst = NULL;
struct kni_marsio_handle *handle = NULL;
int ret = MESA_load_profile_string_nodef(profile, section, "appsym", appsym, sizeof(appsym));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: appsym not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, section, "dev_vxlan_symbol", dev_vxlan_symbol, sizeof(dev_vxlan_symbol));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: dev_vxlan_symbol not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, section, "src_mac_addr", src_mac_addr_str, sizeof(src_mac_addr_str));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: src_mac_addr not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n appsym: %s\n dev_vxlan_symbol: %s\n src_mac_addr: %s",
section, appsym, dev_vxlan_symbol, src_mac_addr_str);
mr_inst = marsio_create();
if(mr_inst == NULL){
KNI_LOG_ERROR(logger, "Failed at create marsio instance");
goto error_out;
}
handle = ALLOC(struct kni_marsio_handle, 1);
handle->instance = mr_inst;
ret = sscanf(src_mac_addr_str, "%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx",
&(handle->src_mac_addr[0]), &(handle->src_mac_addr[1]),
&(handle->src_mac_addr[2]), &(handle->src_mac_addr[3]),
&(handle->src_mac_addr[4]), &(handle->src_mac_addr[5]));
if(ret != 6){
KNI_LOG_ERROR(logger, "MESA_prof_load: src_mac_addr is invalid, ret is %d, profile is %s, section is %s", ret, profile, section);
goto error_out;
}
marsio_option_set(mr_inst, MARSIO_OPT_EXIT_WHEN_ERR, &opt_value, sizeof(opt_value));
//uint64_t cpu_mask = 0x3c; //??
//marsio_option_set(handle->instance, MARSIO_OPT_THREAD_MASK, &cpu_mask, sizeof(cpu_mask));
marsio_init(mr_inst, appsym);
//eth_handler有一个线程收, g_iThreadNum个线程发
tfe_count = g_kni_handle->tfe_count;
for(int i = 0; i < tfe_count; i++){
//load tfe conf
char _section[KNI_SYMBOL_MAX];
char mac_addr_str[KNI_SYMBOL_MAX];
char dev_eth_symbol[KNI_SYMBOL_MAX];
snprintf(_section, sizeof(_section), "tfe%d", i);
int ret = MESA_load_profile_string_nodef(profile, _section, "mac_addr", mac_addr_str, sizeof(mac_addr_str));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: mac_addr not set, profile is %s, section is %s", profile, _section);
goto error_out;
}
tfe_inst = ALLOC(struct tfe_instance, 1);
//转化mac地址, ff:ee:dd:cc:bb:aa ---> 0xff 0xee 0xdd 0xcc 0xbb 0xaa
ret = sscanf(mac_addr_str, "%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx",
&tfe_inst->mac_addr[0], &tfe_inst->mac_addr[1],
&tfe_inst->mac_addr[2], &tfe_inst->mac_addr[3],
&tfe_inst->mac_addr[4], &tfe_inst->mac_addr[5]);
if(ret != 6){
KNI_LOG_ERROR(logger, "MESA_prof_load: mac_addr is invalid, ret is %d, profile is %s, section is %s", ret, profile, _section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, _section, "dev_eth_symbol", dev_eth_symbol, sizeof(dev_eth_symbol));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: dev_eth_symbol not set, profile is %s, section is %s", profile, _section);
goto error_out;
}
KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n mac_addr: %s\n dev_eth_symbol: %s",
_section, mac_addr_str, dev_eth_symbol);
//handler
dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, 1, g_iThreadNum);
if(dev_eth_handler == NULL){
KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_eth_symbol);
goto error_out;
}
//sendpath
dev_eth_sendpath = marsio_sendpath_create_by_vdev(dev_eth_handler);
if(dev_eth_sendpath == NULL){
KNI_LOG_ERROR(logger, "Failed at create marsio sendpath, dev_symbol is %s", dev_eth_symbol);
goto error_out;
}
//tfe_instance
tfe_inst->dev_eth_handler = dev_eth_handler;
tfe_inst->dev_eth_sendpath = dev_eth_sendpath;
handle->tfe_instance_list[i] = tfe_inst;
}
//vxlan_handler有0个线程收, 1个线程发
dev_vxlan_handler = marsio_open_device(mr_inst, dev_vxlan_symbol, 0, 1);
if(dev_vxlan_handler == NULL){
KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_vxlan_symbol);
goto error_out;
}
handle->dev_vxlan_handler = dev_vxlan_handler;
//vxlan sendpath
dev_vxlan_sendpath = marsio_sendpath_create_by_vdev(dev_vxlan_handler);
if(dev_eth_sendpath == NULL){
KNI_LOG_ERROR(logger, "Failed at create marsio sendpath, dev_symbol is %s", dev_vxlan_symbol);
goto error_out;
}
handle->dev_vxlan_sendpath = dev_vxlan_sendpath;
//暂时不用调
//marsio_thread_init(mr_instance);
return handle;
error_out:
kni_marsio_destroy(handle);
return NULL;
}
static void fs_destroy(struct kni_field_stat_handle *fs_handle){
if(fs_handle != NULL){
FS_stop(&(fs_handle->handle));
}
FREE(&fs_handle);
}
static struct kni_field_stat_handle * fs_init(const char *profile){
void *logger = g_kni_handle->local_logger;
const char *section = "field_stat";
char stat_path[KNI_PATH_MAX];
struct kni_field_stat_handle *fs_handle = NULL;
screen_stat_handle_t handle = NULL;
const char *app_name = "fs2_kni";
int value = 0;
int ret = MESA_load_profile_string_nodef(profile, section, "stat_path", stat_path, sizeof(stat_path));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: stat_path not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n stat_path: %s\n", "field_stat", stat_path);
handle = FS_create_handle();
if(handle == NULL){
KNI_LOG_ERROR(logger, "Failed at create FS_create_handle");
goto error_out;
}
fs_handle = ALLOC(struct kni_field_stat_handle, 1);
fs_handle->handle = handle;
FS_set_para(handle, APP_NAME, app_name, strlen(app_name) + 1);
FS_set_para(handle, OUTPUT_DEVICE, stat_path, strlen(stat_path)+1);
value = 0;
FS_set_para(handle, FLUSH_BY_DATE, &value, sizeof(value));
value = 1;
FS_set_para(handle, PRINT_MODE, &value, sizeof(value));
value = 1;
FS_set_para(handle, CREATE_THREAD, &value, sizeof(value));
value = 5;
FS_set_para(handle, STAT_CYCLE, &value, sizeof(value));
value = 4096;
FS_set_para(handle, MAX_STAT_FIELD_NUM, &value, sizeof(value));
fs_handle = ALLOC(struct kni_field_stat_handle, 1);
fs_handle->handle = handle;
fs_handle->fields[KNI_FIELD_TOT_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tot_pkt");
fs_handle->fields[KNI_FIELD_BYP_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_pkt");
fs_handle->fields[KNI_FIELD_INTCP_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_pkt");
fs_handle->fields[KNI_FIELD_IPV6_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ipv6_pkt");
fs_handle->fields[KNI_FIELD_NULL_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "null_pkt");
fs_handle->fields[KNI_FIELD_NO_SYN_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "no_syn_pkt");
fs_handle->fields[KNI_FIELD_UNKNOWN_STATE_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "unknown_state");
fs_handle->fields[KNI_FIELD_NO_SA_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "no_s/a_pkt");
fs_handle->fields[KNI_FIELD_TOT_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tot_stm");
fs_handle->fields[KNI_FIELD_BYP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_stm");
fs_handle->fields[KNI_FIELD_INTCP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_stm");
fs_handle->fields[KNI_FIELD_SSL_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ssl_stm");
fs_handle->fields[KNI_FIELD_HTTP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "http_stm");
fs_handle->fields[KNI_FIELD_SENDLOG_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sendlog_succ");
fs_handle->fields[KNI_FIELD_SENDLOG_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sendlog_fail");
fs_handle->fields[KNI_FIELD_UNKNOWN_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "unknown_stm");
fs_handle->handle = handle;
FS_start(handle);
return fs_handle;
error_out:
fs_destroy(fs_handle);
return NULL;
}
extern "C" void kni_destroy(struct kni_handle *handle){
if(handle != NULL){
}
FREE(&handle);
handle = NULL;
}
static void traceid2pme_htable_data_free_cb(void *data){
struct pme_info *pmeinfo = (struct pme_info*)data;
pme_info_destroy(pmeinfo);
}
//eliminate_type: 0:FIFO; 1:LRU
//ret: 1: the item can be eliminated; 0: the item can't be eliminated
static int traceid2pme_htable_expire_notify_cb(void *data, int eliminate_type){
struct pme_info *pmeinfo = (struct pme_info*)data;
if(pmeinfo->sapp_release == 0){
return 0;
}
pmeinfo->tfe_release = 1;
return 1;
}
extern "C" int kni_init(){
const char *profile = "./conf/kni/kni.conf";
const char *section = "global";
//init logger
char log_path[KNI_PATH_MAX] = "";
int tfe_count = 0;
char local_eth[KNI_SYMBOL_MAX] = "";
struct kni_send_logger *send_logger = NULL;
struct kni_field_stat_handle *fs_handle = NULL;
int id = -1;
void *local_logger = NULL;
int log_level = -1;
pthread_t thread_id = -1;
struct thread_tfe_cmsg_receiver_args *cmsg_receiver_args;
MESA_htable_handle traceid2pme_htable = NULL;
int ret = MESA_load_profile_string_nodef(profile, section, "log_path", log_path, sizeof(log_path));
if(ret < 0){
printf("MESA_prof_load: log_path not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_int_nodef(profile, section, "log_level", &log_level);
if(ret < 0){
printf("MESA_prof_load: log_level not set, profile is %s, section is %s", profile, section);
goto error_out;
}
local_logger = MESA_create_runtime_log_handle(log_path, log_level);
if (unlikely(local_logger == NULL)){
printf("Failed at create logger: %s", log_path);
goto error_out;
}
ret = MESA_load_profile_int_nodef(profile, section, "tfe_count", &tfe_count);
if(ret < 0){
KNI_LOG_ERROR(local_logger, "MESA_prof_load: tfe_count not set, profile is %s, section is %s", profile, section);
goto error_out;
}
if(tfe_count > TFE_COUNT_MAX){
KNI_LOG_ERROR(local_logger, "tfe_count is %d, exceed the max_tfe_count %d", tfe_count, TFE_COUNT_MAX);
goto error_out;
}
if(tfe_count <= 0){
KNI_LOG_ERROR(local_logger, "tfe_count is %d, <= 0", tfe_count);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, section, "local_eth", local_eth, sizeof(local_eth));
if(ret < 0){
printf("MESA_prof_load: local_eth not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_INFO(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_count: %d\n local_eth: %s",
section, log_path, log_level, tfe_count, local_eth);
g_kni_handle = ALLOC(struct kni_handle, 1);
g_kni_handle->local_logger = local_logger;
g_kni_handle->tfe_count = tfe_count;
//init http_project
id = http_project_init();
if(id < 0){
KNI_LOG_ERROR(local_logger, "Failed at init http project, ret is %d", id);
goto error_out;
}
g_kni_handle->http_project_id = id;
//init marsio
g_kni_handle->marsio_handle = kni_marsio_init(profile);
if(g_kni_handle->marsio_handle == NULL){
KNI_LOG_ERROR(local_logger, "Failed at init marsio");
goto error_out;
}
//创建线程从tfe收包然后打上标签发送给vxlan_user
for(int i = 0; i < tfe_count; i++){
struct thread_tfe_data_receiver_args *args = ALLOC(struct thread_tfe_data_receiver_args, 1);
args->logger = local_logger;
args->marsio_handle = g_kni_handle->marsio_handle;
args->tfe_id = i;
int ret = pthread_create(&thread_id, NULL, thread_tfe_data_receiver, (void *)args);
if(unlikely(ret != 0)){
KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_data_receiver, ret is %d", ret);
FREE(&args);
goto error_out;
}
}
//创建线程从tfe收取cmsg控制信息
cmsg_receiver_args = ALLOC(struct thread_tfe_cmsg_receiver_args, 1);
cmsg_receiver_args->logger = local_logger;
memcpy(cmsg_receiver_args->profile, profile, strlen(profile));
ret = pthread_create(&thread_id, NULL, thread_tfe_cmsg_receiver, (void *)cmsg_receiver_args);
if(unlikely(ret != 0)){
KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_cmsg_receiver, ret is %d", ret);
FREE(&cmsg_receiver_args);
goto error_out;
}
//init maat
g_kni_handle->maat_handle = kni_maat_init(profile, local_logger);
if(g_kni_handle->maat_handle == NULL){
KNI_LOG_ERROR(local_logger, "Failed at init maat");
goto error_out;
}
//init_filedstat
fs_handle = fs_init(profile);
if(fs_handle == NULL){
KNI_LOG_ERROR(local_logger, "Failed at init field_stat");
goto error_out;
}
g_kni_fs_handle = fs_handle;
//init local_ipv4
ret = kni_ipv4_addr_get_by_eth(local_eth, &(g_kni_handle->local_ipv4));
if(ret < 0){
KNI_LOG_ERROR(local_logger, "Failed at get bind ipv4 addr, eth is %s", local_eth);
goto error_out;
}
//init kni_send_logger
send_logger = kni_send_logger_init(profile, local_logger);
if(send_logger == NULL){
KNI_LOG_ERROR(local_logger, "Failed at init kni_send_logger", local_eth);
goto error_out;
}
g_kni_handle->send_logger = send_logger;
//init traceid2pme_htable
traceid2pme_htable = kni_create_htable(profile, "traceid2pme_htable", (void*)traceid2pme_htable_data_free_cb,
(void*)traceid2pme_htable_expire_notify_cb, local_logger);
if(traceid2pme_htable == NULL){
KNI_LOG_ERROR(local_logger, "Failed at create traceid2pme_htable");
goto error_out;
}
g_kni_handle->traceid2pme_htable = traceid2pme_htable;
return 0;
error_out:
kni_destroy(g_kni_handle);
return -1;
}