@@ -10,43 +10,10 @@
extern int g_iThreadNum ;
//APP_STATE_DROPME/GIVEME: 当前tcp会话的剩下包是否回调
//APP_STATE_FAWPKT/DROPPKT: 当前包是否丢弃or转发, 如果是丢弃, 当前包不会给后面的插件
//PROT_STATE_GIVEME/DROPME: 当前http会话的剩下包是否回调
//seq, ack 是当拿到client hello时传给秋秋, 取client hello的 seq, ack, 时间戳和sack没有解, 不用解, 只需要知道enable/disable即可
//TODO: 注意内存泄漏, ALLOC对应的FREE, 还有calloc
//函数加static
//统计syn/syn/ack个数, 流个数, pending not syn个数, not syn/ack个数, 单向流数量, 发往tfe的包数, 流数, 收到的包数, 流数 done
//多个tcpall插件, APP_STATE_DROPPKT, APP_STATE_FAWPKT? 有一个droppkt, 就不给后面的插件了
//一个tcp流中有多个http, ssl会话的情况, 只扫描第一个
//TODO: 统计增加多个tfe实例
//TODO: kafka写日志
//TODO: taobao穿了? ?
//TODO: 捕的包里面第一个client hello是没有控制信息的
//TODO: bypass之后继续统计, 不要dropme, dropme之后会立即调用close
//TODO:
/*
1. 对pme来说, 只有sapp和tfe都release之后才会释放内存
1.1. sapp有自己的超时淘汰, 所以肯定会release
1.2. 当收到tfe的cmsg信息时, set tfe_release = 1
1.3 从收到sapp的release信息之后开始计时(查一下hash表),超时之后 set tfe_release = 1
2. 对traceid2pme的hash表来说
2.1 收到tfe的cmsg信息之后需要查询, 查完就可以删掉了
2.2 hash表的超时淘汰函数:
sapp_release = 0表示计时未开始, 不能删除
sapp_release = 1表示已经收到sapp的release信息, 确定超时
*/
struct kni_handle * g_kni_handle = NULL ;
struct kni_field_stat_handle * g_kni_fs_handle = NULL ;
//int g_http_project_id;
//struct kni_marsio_handle *g_kni_marsio_handle;
//g_iThreadNum 为sapp线程数
# define HTTP_PROJECT_NAME "kni_http_tag"
# define BURST_MAX 1
# define STREAM_TRACE_ID_LEN 37
@@ -87,7 +54,7 @@ struct pme_info{
uint64_t con_duration ;
//from tfe, kafka log
int intercept_state ;
int pinningst ; //默认为0, 表示没有从tfe收到
int pinningst ; //defalut 0
uint64_t ssl_server_side_latency ;
uint64_t ssl_client_side_latency ;
char ssl_server_side_version [ KNI_SYMBOL_MAX ] ;
@@ -138,7 +105,6 @@ struct thread_tfe_cmsg_receiver_args{
char profile [ KNI_SYMBOL_MAX ] ;
} ;
//TODO: 有些字段可以不要
struct pkt_info {
struct iphdr * iphdr ;
int iphdr_len ;
@@ -179,7 +145,7 @@ static struct pme_info* pme_info_new(const struct streaminfo *stream, int thread
}
static int sendlog_to_kafka ( struct pme_info * pmeinfo , void * local_logger ) {
//创建 cjson对象
//create cjson
cJSON * log_obj = cJSON_CreateObject ( ) ;
//stream_trace_id
cJSON_AddStringToObject ( log_obj , " stream_trace_id " , pmeinfo - > stream_trace_id ) ;
@@ -251,7 +217,7 @@ static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){
cJSON_AddNumberToObject ( log_obj , " direction " , 0 ) ;
//stream_dir: from sapp
cJSON_AddNumberToObject ( log_obj , " stream_dir " , pmeinfo - > stream - > dir ) ;
//cap_ip: 从网卡名得到
//cap_ip: kni ip
char local_ipv4_str [ INET6_ADDRSTRLEN ] ;
inet_ntop ( AF_INET , & ( g_kni_handle - > local_ipv4 ) , local_ipv4_str , sizeof ( local_ipv4_str ) ) ;
cJSON_AddStringToObject ( log_obj , " cap_ip " , local_ipv4_str ) ;
@@ -281,14 +247,13 @@ static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){
if ( ret < 0 ) {
KNI_LOG_ERROR ( local_logger , " Failed at kni_send_logger_sendlog, ret is %d " , ret ) ;
goto error_out ;
//重试逻辑?
}
FREE ( & log_msg ) ;
cJSON_free ( log_msg ) ;
return 0 ;
error_out :
if ( log_msg ! = NULL ) {
FREE ( & log_msg ) ;
cJSON_free ( log_msg ) ;
}
return - 1 ;
}
@@ -316,16 +281,16 @@ static void pme_info_destroy(struct pme_info *pmeinfo){
}
static int protocol_identify ( const struct streaminfo * stream , char * buf , int len , struct protocol_identify_result * result ) {
//判断是 http
//http
struct http_project * project = ( struct http_project * ) project_req_get_struct ( stream , g_kni_handle - > http_project_id ) ;
if ( project ! = NULL ) {
result - > protocol = KNI_PROTOCOL_HTTP ;
result - > domain_len = project - > host_len ;
mem cpy( result - > domain , project - > host , result - > domain_len ) ;
strn cpy( result - > domain , project - > host , strnlen ( project - > host , sizeof ( result- > domain) ) ) ;
return 0 ;
}
//判断是 ssl
//ssl
enum chello_parse_result chello_status = CHELLO_PARSE_INVALID_FORMAT ;
struct ssl_chello * chello = NULL ;
chello = ssl_chello_parse ( ( const unsigned char * ) buf , len , & chello_status ) ;
@@ -336,7 +301,7 @@ static int protocol_identify(const struct streaminfo* stream, char *buf, int len
}
else {
result - > domain_len = strnlen ( chello - > sni , KNI_DOMAIN_MAX ) ;
mem cpy( result - > domain , chello - > sni , result - > domain_len ) ;
strn cpy( result - > domain , chello - > sni , strnlen ( chello - > sni , sizeof ( result- > domain) ) ) ;
}
ssl_chello_free ( chello ) ;
return 0 ;
@@ -458,8 +423,7 @@ static char* add_cmsg_to_packet(struct pme_info *pmeinfo, struct pkt_info *pktin
FREE ( & header ) ;
//iphdr: tot_len
iphdr - > tot_len = htons ( offset ) ;
//iphdr: checksum
//计算校验和之前一定要先置0
//must set check = 0
iphdr - > check = 0 ;
iphdr - > check = kni_ip_checksum ( ( void * ) iphdr , pktinfo - > iphdr_len ) ;
//tcphdr: checkdum
@@ -473,8 +437,6 @@ static int send_to_tfe(struct kni_marsio_handle *handle, char *raw_data, int raw
void * logger = g_kni_handle - > local_logger ;
marsio_buff_t * tx_buffs [ BURST_MAX ] ;
unsigned int ret = 1 ;
//TODO: marsio配置文件: 2500
//thread_seq实际上是网卡队列, 一个线程对应一个网卡队列, 并不是线程号和网卡队列号一一对应, 假设线程号是tid, 网卡队列为n, 那么tid % n就是网卡队列号
struct mr_vdev * dev_eth_handler = handle - > tfe_instance_list [ tfe_id ] - > dev_eth_handler ;
struct mr_sendpath * dev_eth_sendpath = handle - > tfe_instance_list [ tfe_id ] - > dev_eth_sendpath ;
char * src_mac = handle - > src_mac_addr ;
@@ -498,12 +460,11 @@ static int send_to_tfe(struct kni_marsio_handle *handle, char *raw_data, int raw
static char pending_opstate ( const struct streaminfo * stream , struct pme_info * pmeinfo , struct pkt_info * pktinfo ) {
void * logger = g_kni_handle - > local_logger ;
if ( ! pktinfo - > tcphdr - > syn ) {
//pending_opstate 不是 syn, bypass这个流
//pending_opstate not syn, bypass and dropme
KNI_LOG_ERROR ( logger , " pending opstate: not syn " ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_NO_SYN_EXP ] , 0 , FS_OP_ADD , 1 ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_BYP_STM ] , 0 , FS_OP_ADD , 1 ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_BYP_PKT ] , 0 , FS_OP_ADD , 1 ) ;
//异常情况, 不需要等tfe release, 直接释放
pmeinfo - > tfe_release = 1 ;
return APP_STATE_FAWPKT | APP_STATE_DROPME ;
}
@@ -512,7 +473,6 @@ static char pending_opstate(const struct streaminfo *stream, struct pme_info *pm
return APP_STATE_FAWPKT | APP_STATE_GIVEME ;
}
//TODO: 这一块逻辑需要和洋姐和秋秋讨论一下
static char data_opstate ( const struct streaminfo * stream , struct pme_info * pmeinfo , struct pkt_info * pktinfo , int thread_seq ) {
void * logger = g_kni_handle - > local_logger ;
char * buf = ( char * ) pktinfo - > iphdr ;
@@ -520,7 +480,7 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
char stream_addr [ KNI_SYMBOL_MAX ] = " " ;
int ret ;
kni_stream_addr_trans ( ( struct ipaddr * ) ( & stream - > addr ) , stream_addr , sizeof ( stream_addr ) ) ;
//保证 pmeinfo->action只有 KNI_ACTION_NONE, KNI_ACTION_INTERCEPT, KNI_ACTION_BYPASS三种情况
//pmeinfo->action has only 3 value: KNI_ACTION_NONE, KNI_ACTION_INTERCEPT, KNI_ACTION_BYPASS
switch ( pmeinfo - > action ) {
case KNI_ACTION_NONE :
break ;
@@ -529,9 +489,6 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
if ( ret < 0 ) {
KNI_LOG_ERROR ( logger , " Failed at send continue packet to tfe%d, stream_addr is %s " , pmeinfo - > tfe_id , stream_addr ) ;
}
else {
KNI_LOG_DEBUG ( logger , " Succeed at send continue packet to tfe%d, stream_addr is %s " , pmeinfo - > tfe_id , stream_addr ) ;
}
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_INTCP_PKT ] , 0 , FS_OP_ADD , 1 ) ;
return APP_STATE_DROPPKT | APP_STATE_GIVEME ;
case KNI_ACTION_BYPASS :
@@ -540,9 +497,7 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
default :
break ;
}
//TODO: client hello如果跨包怎么办? client hello后面一个包先到, 这个包该丢掉还是bypass
//此时 action = KNI_ACTION_UNKNOWN, 说明还没收到第一个数据包
// syn/ack包
// syn/ack
if ( pktinfo - > tcphdr - > syn & & pktinfo - > tcphdr - > ack ) {
pmeinfo - > server_tcpopt = kni_get_tcpopt ( pktinfo - > tcphdr , pktinfo - > tcphdr_len ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_BYP_PKT ] , 0 , FS_OP_ADD , 1 ) ;
@@ -552,7 +507,7 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_BYP_PKT ] , 0 , FS_OP_ADD , 1 ) ;
return APP_STATE_FAWPKT | APP_STATE_GIVEME ;
}
//单向流 , bypass and dropme
//not double dir , bypass and dropme
if ( stream - > dir ! = DIR_DOUBLE ) {
KNI_LOG_INFO ( logger , " dir is %d, bypass, stream addr is %s " , stream - > dir , stream_addr ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_BYP_STM ] , 0 , FS_OP_ADD , 1 ) ;
@@ -560,40 +515,40 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
pmeinfo - > tfe_release = 1 ;
return APP_STATE_FAWPKT | APP_STATE_DROPME ;
}
struct protocol_identify_result * result = ALLOC ( struct protocol_identify_result , 1 ) ;
protocol_identify( stream , pktinfo - > data , pktinfo - > data_len , result ) ;
pmeinfo - > protocol = result - > protocol ;
//第一个数据包: 如果从第一个数据包判断不出协议, bypass and dropme
if ( pmeinfo - > protocol = = KNI_PROTOCOL_UNKNOWN ) {
KNI_LOG_INFO ( logger , " Failed at protocol_identify, bypass and dropme, stream addr is %s \n " ,
struct protocol_identify_result protocol_identify_res;
memset ( & protocol_identify_res , 0 , sizeof ( struct protocol_identify_ result) ) ;
protocol_identify ( stream , pktinfo - > data , pktinfo - > data_len , & protocol_identify_res ) ;
pmeinfo - > protocol = protocol_identify_res . protocol ;
switch ( pmeinfo - > protocol ) {
//can not identify protocol from first data packet, bypass and dropme
case KNI_PROTOCOL_UNKNOWN :
KNI_LOG_INFO ( logger , " Failed at protocol_identify, bypass and dropme, stream addr is %s \n " ,
pmeinfo - > protocol , stream_addr ) ;
FREE ( & result ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_BYP_PKT ] , 0 , FS_OP_ADD , 1 ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_BYP _STM ] , 0 , FS_OP_ADD , 1 ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_UNKNOWN_STM ] , 0 , FS_OP_ADD , 1 ) ;
pmeinfo - > tfe_release = 1 ;
return APP_STATE_FAWPKT | APP_STATE_DROPME ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_BYP_PKT ] , 0 , FS_OP_ADD , 1 ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_BYP_STM ] , 0 , FS_OP_ADD , 1 ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_UNKNOWN _STM ] , 0 , FS_OP_ADD , 1 ) ;
pmeinfo - > tfe_release = 1 ;
return APP_STATE_FAWPKT | APP_STATE_DROPME ;
case KNI_PROTOCOL_SSL :
strncpy ( pmeinfo - > sni , protocol_identify_res . domain , strnlen ( protocol_identify_res . domain , sizeof ( pmeinfo - > sni ) ) ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_SSL_STM ] , 0 , FS_OP_ADD , 1 ) ;
break ;
case KNI_PROTOCOL_HTTP :
strncpy ( pmeinfo - > host , protocol_identify_res . domain , strnlen ( protocol_identify_res . domain , sizeof ( pmeinfo - > host ) ) ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_HTTP_STM ] , 0 , FS_OP_ADD , 1 ) ;
break ;
default :
break ;
}
//protocol = KNI_PROTOCOL_SSL/KNI_PROTOCOL_HTTP, 判断action, action返回值: KNI_ACTION_INTERCEPT/KNI_ACTION_BYPASS
if ( pmeinfo - > protocol = = KNI_PROTOCOL_SSL ) {
memcpy ( pmeinfo - > sni , result - > domain , result - > domain_len ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_SSL_STM ] , 0 , FS_OP_ADD , 1 ) ;
}
else if ( pmeinfo - > protocol = = KNI_PROTOCOL_HTTP ) {
memcpy ( pmeinfo - > host , result - > domain , result - > domain_len ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_HTTP_STM ] , 0 , FS_OP_ADD , 1 ) ;
}
pmeinfo - > action = intercept_policy_scan ( g_kni_handle - > maat_handle , ( struct ipaddr * ) ( & stream - > addr ) , result - > domain , result - > domain_len ,
pmeinfo - > action = intercept_policy_scan ( g_kni_handle - > maat_handle , ( struct ipaddr * ) ( & stream - > addr ) ,
protocol_identify_res . domain , protocol_identify_res . domain_len ,
thread_seq , & ( pmeinfo - > policy_id ) , & ( pmeinfo - > maat_hit ) ) ;
//输出maat拦截日志
char domain_str [ KNI_DOMAIN_MAX ] = " " ;
memcpy ( domain_str , result - > domain , result - > domain_len ) ;
//policy scan log
char action_str [ KNI_SYMBOL_MAX ] ;
kni_maat_action_trans ( pmeinfo - > action , action_str ) ;
KNI_LOG_DEBUG ( logger , " intercept_policy_scan: %s, %s, policy_id = %d, action = %d(%s), maat_hit = %d " ,
stream_addr , domain_str , pmeinfo - > policy_id , pmeinfo - > action , action_str , pmeinfo - > maat_hit ) ;
FREE ( & result ) ;
//TODO: 这块比较奇怪, 收到client hello, 但是没有syn/ack包, 直接bypass了
stream_addr , protocol_identify_res . domain, pmeinfo - > policy_id , pmeinfo - > action , action_str , pmeinfo - > maat_hit ) ;
//receive client hello, but no syn/ack, bypass and dropme
if ( pmeinfo - > client_tcpopt = = NULL | | pmeinfo - > server_tcpopt = = NULL ) {
KNI_LOG_ERROR ( logger , " Failed at intercept, %s, %s " , pmeinfo - > client_tcpopt = = NULL ? " no syn " : " " ,
pmeinfo - > server_tcpopt = = NULL ? " no syn/ack " : " " ) ;
@@ -609,22 +564,23 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_BYP_STM ] , 0 , FS_OP_ADD , 1 ) ;
return APP_STATE_FAWPKT | APP_STATE_GIVEME ;
case KNI_ACTION_INTERCEPT :
//action = KNI_ACTION_INTERCEPT, 带上控制信息发送给qq, 要修改ip, tcp的校验和
//action = KNI_ACTION_INTERCEPT, sendto tfe
buf = add_cmsg_to_packet ( pmeinfo , pktinfo , & len ) ;
ret = send_to_tfe ( g_kni_handle - > marsio_handle , buf , len , thread_seq , pmeinfo - > tfe_id ) ;
if ( ret < 0 ) {
KNI_LOG_ERROR ( logger , " Failed at send first packet to tfe%d, stream_trace_id is %s " , pmeinfo - > tfe_id , pmeinfo - > stream_trace_id ) ;
KNI_LOG_ERROR ( logger , " Failed at send first packet to tfe%d, stream addr is %s " , pmeinfo - > tfe_id , stream_addr ) ;
}
else {
KNI_LOG_DEBUG( logger, " Succeed at send first packet to tfe%d, stream_trace_id is %s " , pmeinfo - > tfe_id , pmeinfo - > stream_trace_id ) ;
// KNI_LOG_DEBUG( logger, " Succeed at send first packet to tfe%d, stream addr is %s", pmeinfo->tfe_id, stream_addr);
}
FREE ( & buf ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_INTCP_PKT ] , 0 , FS_OP_ADD , 1 ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_INTCP_STM ] , 0 , FS_OP_ADD , 1 ) ;
return APP_STATE_DROPPKT | APP_STATE_GIVEME ;
default :
//action非法 , bypass and dropme
KNI_LOG_ERROR ( logger , " Action %d is I nvalid, policy_id is %d, bypass(dropme) " , pmeinfo - > action , pmeinfo - > policy_id ) ;
//action != intercept && action != bypass , bypass and dropme
KNI_LOG_ERROR ( logger , " Action %d(%s) is i nvalid, bypass(dropme): policy_id is %d, stream addr is %s, domain is " ,
pmeinfo - > action , action_str , pmeinfo - > policy_id , stream_addr , protocol_identify_res . domain ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_BYP_PKT ] , 0 , FS_OP_ADD , 1 ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_BYP_STM ] , 0 , FS_OP_ADD , 1 ) ;
pmeinfo - > tfe_release = 1 ;
@@ -633,7 +589,7 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
}
static char close_opstate ( const struct streaminfo * stream , struct pme_info * pmeinfo , struct pkt_info * pktinfo , int thread_seq ) {
//close 数据也要发送给 tfe
//close: sendto tfe
void * logger = g_kni_handle - > local_logger ;
char * buf = ( char * ) pktinfo - > iphdr ;
char stream_addr [ KNI_SYMBOL_MAX ] = " " ;
@@ -642,14 +598,13 @@ static char close_opstate(const struct streaminfo *stream, struct pme_info *pmei
int ret ;
switch ( pmeinfo - > action ) {
case KNI_ACTION_INTERCEPT :
ret = send_to_tfe ( g_kni_handle - > marsio_handle , buf , len , thread_seq , pmeinfo - > tfe_id ) ;
ret = send_to_tfe ( g_kni_handle - > marsio_handle , buf , len , thread_seq , pmeinfo - > tfe_id ) ;
if ( ret < 0 ) {
KNI_LOG_ERROR ( logger , " Failed at send last packet to tfe%d, stream addr is %s " ,
pmeinfo - > tfe_id , stream_addr ) ;
}
else {
KNI_LOG_DEBUG( logger, " Succeed at send last packet to tfe%d, stream addr is %s" ,
pmeinfo - > tfe_id , stream_addr ) ;
// KNI_LOG_DEBUG( logger, " Succeed at send last packet to tfe%d, stream addr is %s", pmeinfo->tfe_id, stream_addr);
}
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_INTCP_PKT ] , 0 , FS_OP_ADD , 1 ) ;
return APP_STATE_DROPPKT | APP_STATE_DROPME ;
@@ -659,19 +614,17 @@ static char close_opstate(const struct streaminfo *stream, struct pme_info *pmei
}
}
//从syn包开始回调
//from syn
extern " C " char kni_tcpall_entry ( const struct streaminfo * stream , void * * pme , int thread_seq , const void * a_packet ) {
void * logger = g_kni_handle - > local_logger ;
//KNI_LOG_DEBUG(logger, "call kni_tcpall_entry");
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_TOT_PKT ] , 0 , FS_OP_ADD , 1 ) ;
//当前包bypass, 剩下包bypass
//TODO: ipv6暂时不处理, ipv6: 通过nexthdr链式寻找tcp头(IPPROTO_TCP)
//TODO: ipv6
if ( stream - > addr . addrtype = = ADDR_TYPE_IPV6 ) {
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_IPV6_PKT ] , 0 , FS_OP_ADD , 1 ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_BYP_PKT ] , 0 , FS_OP_ADD , 1 ) ;
return APP_STATE_FAWPKT | APP_STATE_DROPME ;
}
//a_packet == NULL, 不处理这个包
//a_packet == NULL, continue
if ( a_packet = = NULL ) {
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_NULL_PKT ] , 0 , FS_OP_ADD , 1 ) ;
FS_operate ( g_kni_fs_handle - > handle , g_kni_fs_handle - > fields [ KNI_FIELD_BYP_PKT ] , 0 , FS_OP_ADD , 1 ) ;
@@ -743,7 +696,7 @@ static int http_project_init(){
extern " C " char kni_http_entry ( stSessionInfo * session_info , void * * pme , int thread_seq , struct streaminfo * a_stream , const void * a_packet ) {
http_infor * http_info = ( http_infor * ) ( session_info - > app_info ) ;
//http_session_seq = 1表示只处理tcp链接中的第一个http会话
//only process first http session
if ( http_info - > http_session_seq ! = 1 ) {
return PROT_STATE_DROPME ;
}
@@ -762,7 +715,6 @@ extern "C" char kni_http_entry(stSessionInfo* session_info, void **pme, int thr
}
static void kni_marsio_destroy ( struct kni_marsio_handle * handle ) {
//TODO: dev_handler, dev_sendpath不需要free吗
if ( handle ! = NULL ) {
if ( handle - > instance ! = NULL ) {
marsio_destory ( handle - > instance ) ;
@@ -777,28 +729,26 @@ static void kni_marsio_destroy(struct kni_marsio_handle *handle){
void * thread_tfe_data_receiver ( void * args ) {
struct thread_tfe_data_receiver_args * _args = ( struct thread_tfe_data_receiver_args * ) args ;
//void *logger = _args->logger;
struct kni_marsio_handle * marsio_handle = _args - > marsio_handle ;
int tfe_id = _args - > tfe_id ;
struct mr_vdev * dev_eth_handler = marsio_handle - > tfe_instance_list [ tfe_id ] - > dev_eth_handler ;
FREE ( & args ) ;
marsio_buff_t * rx_buff [ BURST_MAX ] ;
int nr_burst = 1 ;
//实际上是网卡队列id
int thread_seq = 0 ;
while ( true ) {
//从 tfe上收
//receive from tfe
int ret = marsio_recv_burst ( dev_eth_handler , thread_seq , rx_buff , nr_burst ) ;
if ( ret < = 0 ) {
continue ;
}
//打上标签
//tag
struct mr_tunnat_ctrlzone mr_ctrlzone ;
mr_ctrlzone . action | = TUNNAT_CZ_ACTION_ENCAP_INNER | TUNNAT_CZ_ACTION_ENCAP_OUTER ;
for ( int i = 0 ; i < ret ; i + + ) {
marsio_buff_ctrlzone_set ( rx_buff [ i ] , 0 , & mr_ctrlzone , sizeof ( struct mr_tunnat_ctrlzone ) ) ;
}
//发送给 vxlan
//send to vxlan
marsio_send_burst_with_options ( marsio_handle - > dev_vxlan_sendpath , thread_seq , rx_buff , 1 , MARSIO_SEND_OPT_FAST ) ;
}
return NULL ;
@@ -1009,10 +959,8 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){
goto error_out ;
}
marsio_option_set ( mr_inst , MARSIO_OPT_EXIT_WHEN_ERR , & opt_value , sizeof ( opt_value ) ) ;
//uint64_t cpu_mask = 0x3c; //??
//marsio_option_set(handle->instance, MARSIO_OPT_THREAD_MASK, &cpu_mask, sizeof(cpu_mask));
marsio_init ( mr_inst , appsym ) ;
//eth_handler有一个线程收, g_iThreadNum个线程发
//eth_handler receive thread = 1, send thread = g_iThreadNum
tfe_count = g_kni_handle - > tfe_count ;
for ( int i = 0 ; i < tfe_count ; i + + ) {
//load tfe conf
@@ -1026,7 +974,7 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){
goto error_out ;
}
tfe_inst = ALLOC ( struct tfe_instance , 1 ) ;
//转化mac地址, ff:ee:dd:cc:bb:aa ---> 0xff 0xee 0xdd 0xcc 0xbb 0xaa
//ff:ee:dd:cc:bb:aa ---> 0xff 0xee 0xdd 0xcc 0xbb 0xaa
ret = sscanf ( mac_addr_str , " %02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx " ,
& tfe_inst - > mac_addr [ 0 ] , & tfe_inst - > mac_addr [ 1 ] ,
& tfe_inst - > mac_addr [ 2 ] , & tfe_inst - > mac_addr [ 3 ] ,
@@ -1059,7 +1007,7 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){
tfe_inst - > dev_eth_sendpath = dev_eth_sendpath ;
handle - > tfe_instance_list [ i ] = tfe_inst ;
}
//vxlan_handler有0个线程收, 1个线程发
//vxlan_handler: receive: 0 thread, send: 1
dev_vxlan_handler = marsio_open_device ( mr_inst , dev_vxlan_symbol , 0 , 1 ) ;
if ( dev_vxlan_handler = = NULL ) {
KNI_LOG_ERROR ( logger , " Failed at marsio_open_device, dev_symbol is %s " , dev_vxlan_symbol ) ;
@@ -1073,7 +1021,6 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){
goto error_out ;
}
handle - > dev_vxlan_sendpath = dev_vxlan_sendpath ;
//暂时不用调
//marsio_thread_init(mr_instance);
return handle ;
@@ -1244,7 +1191,7 @@ extern "C" int kni_init(){
goto error_out ;
}
//创建线程从tfe收包然后打上标签发送给vxlan_us er
//create thread_tfe_data_receiv er
for ( int i = 0 ; i < tfe_count ; i + + ) {
struct thread_tfe_data_receiver_args * args = ALLOC ( struct thread_tfe_data_receiver_args , 1 ) ;
args - > logger = local_logger ;
@@ -1258,10 +1205,10 @@ extern "C" int kni_init(){
}
}
//创建线程从 tfe收取 cmsg控制信息
//create thread_ tfe_ cmsg_receiver
cmsg_receiver_args = ALLOC ( struct thread_tfe_cmsg_receiver_args , 1 ) ;
cmsg_receiver_args - > logger = local_logger ;
mem cpy( cmsg_receiver_args - > profile , profile , strlen ( profile ) ) ;
strn cpy( cmsg_receiver_args - > profile , profile , strn len ( profile , sizeof ( cmsg_receiver_args - > profile ) )) ;
ret = pthread_create ( & thread_id , NULL , thread_tfe_cmsg_receiver , ( void * ) cmsg_receiver_args ) ;
if ( unlikely ( ret ! = 0 ) ) {
KNI_LOG_ERROR ( local_logger , " Failed at pthread_create, thread_func is thread_tfe_cmsg_receiver, ret is %d " , ret ) ;