修改packet为空的处理, 日志带上traceid, 增加统计日志, 修改pme释放的逻辑
This commit is contained in:
@@ -7,7 +7,8 @@
|
||||
#include "uuid/uuid.h"
|
||||
#include "cjson/cJSON.h"
|
||||
#include "kni_send_logger.h"
|
||||
#include<pthread.h>
|
||||
#include <pthread.h>
|
||||
#include <linux/if_ether.h>
|
||||
|
||||
extern int g_iThreadNum;
|
||||
|
||||
@@ -17,15 +18,12 @@ struct kni_field_stat_handle *g_kni_fs_handle = NULL;
|
||||
|
||||
#define HTTP_PROJECT_NAME "kni_http_tag"
|
||||
#define BURST_MAX 1
|
||||
#define STREAM_TRACE_ID_LEN 37
|
||||
#define stream_traceid_LEN 37
|
||||
#define TFE_COUNT_MAX 16
|
||||
#define CALLER_SAPP 0
|
||||
#define CALLER_TFE 1
|
||||
|
||||
|
||||
/* TODO:
|
||||
1. con_duration_ms: how to calculate
|
||||
2. sapp return dropme之后还会继续调用close吗
|
||||
*/
|
||||
|
||||
enum kni_protocol{
|
||||
KNI_PROTOCOL_UNKNOWN = 0,
|
||||
KNI_PROTOCOL_SSL,
|
||||
@@ -38,6 +36,7 @@ enum stream_error{
|
||||
STREAM_ERROR_PROTOCOL_UNKNOWN = -3,
|
||||
STREAM_ERROR_NO_SYN_ACK = -4,
|
||||
STREAM_ERROR_INVALID_ACTION = -5,
|
||||
STREAM_ERROR_NO_DATA = -6,
|
||||
};
|
||||
|
||||
struct http_project{
|
||||
@@ -58,9 +57,12 @@ struct pme_info{
|
||||
int tfe_id;
|
||||
pthread_mutex_t lock;
|
||||
enum stream_error error;
|
||||
char stream_trace_id[STREAM_TRACE_ID_LEN];
|
||||
char host[KNI_DOMAIN_MAX]; //http only
|
||||
char sni[KNI_DOMAIN_MAX]; //ssl only
|
||||
char stream_traceid[stream_traceid_LEN];
|
||||
//TODO: union, cjson check protocol
|
||||
union{
|
||||
char host[KNI_DOMAIN_MAX]; //http only
|
||||
char sni[KNI_DOMAIN_MAX]; //ssl only
|
||||
};
|
||||
//tfe_release = 1: tfe don't need pmeinfo
|
||||
int tfe_release;
|
||||
int sapp_release;
|
||||
@@ -172,6 +174,7 @@ static void pme_info_destroy(void *data){
|
||||
//free lock
|
||||
pthread_mutex_destroy(&(pmeinfo->lock));
|
||||
FREE(&pmeinfo);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_FREE], 0, FS_OP_ADD, 1);
|
||||
}
|
||||
else{
|
||||
KNI_LOG_ERROR(logger, "Failed at pme_info_destroy, pmeinfo is null");
|
||||
@@ -184,16 +187,19 @@ static struct pme_info* pme_info_new(const struct streaminfo *stream, int thread
|
||||
pmeinfo->tfe_id = g_kni_handle->tfe_count > 0 ? thread_seq % g_kni_handle->tfe_count : -1;
|
||||
uuid_t uu;
|
||||
uuid_generate_random(uu);
|
||||
uuid_unparse(uu, pmeinfo->stream_trace_id);
|
||||
uuid_unparse(uu, pmeinfo->stream_traceid);
|
||||
pmeinfo->addr = layer_addr_dup(&(stream->addr));
|
||||
pmeinfo->start_time = time(NULL);
|
||||
char stream_addr[KNI_SYMBOL_MAX] = "";
|
||||
//init pme_lock
|
||||
int ret = pthread_mutex_init(&(pmeinfo->lock), NULL);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at init pthread mutex");
|
||||
KNI_LOG_ERROR(logger, "Failed at init pthread mutex, stream_traceid is %s", pmeinfo->stream_traceid);
|
||||
goto error_out;
|
||||
}
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_STM], 0, FS_OP_ADD, 1);
|
||||
kni_stream_addr_trans((struct ipaddr*)(&stream->addr), stream_addr, sizeof(stream_addr));
|
||||
KNI_LOG_INFO(logger, "stream addr is %s, stream traceid is %s", stream_addr, pmeinfo->stream_traceid);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_STM], 0, FS_OP_ADD, 1);
|
||||
return pmeinfo;
|
||||
|
||||
error_out:
|
||||
@@ -204,8 +210,8 @@ error_out:
|
||||
static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){
|
||||
//create cjson
|
||||
cJSON *log_obj = cJSON_CreateObject();
|
||||
//stream_trace_id
|
||||
cJSON_AddStringToObject(log_obj, "stream_trace_id", pmeinfo->stream_trace_id);
|
||||
//stream_traceid
|
||||
cJSON_AddStringToObject(log_obj, "stream_traceid", pmeinfo->stream_traceid);
|
||||
//policy_id
|
||||
cJSON_AddNumberToObject(log_obj, "policy_id", pmeinfo->policy_id);
|
||||
//action
|
||||
@@ -283,9 +289,13 @@ static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){
|
||||
//addr_list
|
||||
cJSON_AddStringToObject(log_obj, "addr_list", "");
|
||||
//host: http_only
|
||||
cJSON_AddStringToObject(log_obj, "host", pmeinfo->host);
|
||||
if(pmeinfo->protocol == KNI_PROTOCOL_HTTP){
|
||||
cJSON_AddStringToObject(log_obj, "host", pmeinfo->host);
|
||||
}
|
||||
//sni: ssl only
|
||||
cJSON_AddStringToObject(log_obj, "sni", pmeinfo->sni);
|
||||
if(pmeinfo->protocol == KNI_PROTOCOL_SSL){
|
||||
cJSON_AddStringToObject(log_obj, "sni", pmeinfo->sni);
|
||||
}
|
||||
//c2s_pkt_num
|
||||
cJSON_AddNumberToObject(log_obj, "c2s_pkt_num", pmeinfo->server_pkts);
|
||||
//s2c_pkt_num
|
||||
@@ -298,13 +308,14 @@ static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){
|
||||
char *log_msg = cJSON_PrintUnformatted(log_obj);
|
||||
cJSON_Delete(log_obj);
|
||||
if(log_msg == NULL){
|
||||
KNI_LOG_ERROR(local_logger, "Failed at cJSON_Print");
|
||||
KNI_LOG_ERROR(local_logger, "Failed at cJSON_Print, stream_treaceid is %s", pmeinfo->stream_traceid);
|
||||
goto error_out;
|
||||
}
|
||||
KNI_LOG_DEBUG(local_logger, "log_msg is %s\n", log_msg);
|
||||
ret = kni_send_logger_sendlog(g_kni_handle->send_logger, log_msg, strlen(log_msg));
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(local_logger, "Failed at kni_send_logger_sendlog, ret is %d", ret);
|
||||
KNI_LOG_ERROR(local_logger, "Failed at kni_send_logger_sendlog, ret is %d, strem_traceid is %s",
|
||||
ret, pmeinfo->stream_traceid);
|
||||
goto error_out;
|
||||
}
|
||||
cJSON_free(log_msg);
|
||||
@@ -316,33 +327,59 @@ error_out:
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
//return 0: has been destroy
|
||||
static int judge_pme_destroy(struct pme_info *pmeinfo){
|
||||
|
||||
static void judge_pme_destroy(struct pme_info *pmeinfo, int caller){
|
||||
void *logger = g_kni_handle->local_logger;
|
||||
if(pmeinfo != NULL){
|
||||
void *logger = g_kni_handle->local_logger;
|
||||
pthread_mutex_lock(&(pmeinfo->lock));
|
||||
if(caller == CALLER_SAPP){
|
||||
KNI_LOG_DEBUG(logger, "set sapp_release = 1, caller is %d, stream_trace_id is %s, thread id is %p",
|
||||
caller, pmeinfo->stream_traceid, pthread_self());
|
||||
pmeinfo->sapp_release = 1;
|
||||
}
|
||||
if(caller == CALLER_TFE){
|
||||
KNI_LOG_DEBUG(logger, "set tfe_release = 1, caller is %d, stream_trace_id is %s, thread id is %p",
|
||||
caller, pmeinfo->stream_traceid, pthread_self());
|
||||
pmeinfo->tfe_release = 1;
|
||||
}
|
||||
if(pmeinfo->sapp_release == 1 && pmeinfo->tfe_release == 1){
|
||||
//sendlog
|
||||
int ret = sendlog_to_kafka(pmeinfo, logger);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at sendlog to kafka, stream_trace_id is %s", pmeinfo->stream_trace_id);
|
||||
KNI_LOG_ERROR(logger, "Failed at sendlog to kafka, stream traceid is %s", pmeinfo->stream_traceid);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_FAIL], 0, FS_OP_ADD, 1);
|
||||
}
|
||||
else{
|
||||
KNI_LOG_DEBUG(logger, "Succeed sendlog to kafka, stream_trace_id is %s", pmeinfo->stream_trace_id);
|
||||
KNI_LOG_DEBUG(logger, "Succeed sendlog to kafka, stream traceid is %s", pmeinfo->stream_traceid);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_SENDLOG_SUCC], 0, FS_OP_ADD, 1);
|
||||
}
|
||||
//only intercetp stream need del htable
|
||||
if(pmeinfo->action == KNI_ACTION_INTERCEPT){
|
||||
int key_size = strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid));
|
||||
ret = MESA_htable_del(g_kni_handle->traceid2pme_htable, (const unsigned char *)pmeinfo->stream_traceid,
|
||||
key_size, NULL);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "MESA_htable: failed at del, table is %s, key is %s, key_size is %d, ret is %d",
|
||||
"traceid2pme_htable", pmeinfo->stream_traceid, key_size, ret);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_FAIL], 0, FS_OP_ADD, 1);
|
||||
}
|
||||
else{
|
||||
KNI_LOG_DEBUG(logger, "MESA_htable: succeed at del, table is %s, key is %s, key_size is %d",
|
||||
"traceid2pme_htable", pmeinfo->stream_traceid, key_size);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC], 0, FS_OP_ADD, 1);
|
||||
}
|
||||
}
|
||||
//free pme
|
||||
pme_info_destroy(pmeinfo);
|
||||
return 0;
|
||||
}
|
||||
else{
|
||||
KNI_LOG_DEBUG(logger, "can not destroy pmeinfo, sapp_release = %d, tfe_release = %d", pmeinfo->sapp_release, pmeinfo->tfe_release);
|
||||
return;
|
||||
}
|
||||
KNI_LOG_DEBUG(logger, "can not destroy pmeinfo, sapp_release = %d, tfe_release = %d", pmeinfo->sapp_release, pmeinfo->tfe_release);
|
||||
pthread_mutex_unlock(&(pmeinfo->lock));
|
||||
}
|
||||
else{
|
||||
KNI_LOG_ERROR(logger, "Failed at judge_pme_info, pmeinfo is null");
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int protocol_identify(const struct streaminfo* stream, char *buf, int len, struct protocol_identify_result *result){
|
||||
@@ -376,11 +413,12 @@ static int protocol_identify(const struct streaminfo* stream, char *buf, int len
|
||||
result->protocol = KNI_PROTOCOL_UNKNOWN;
|
||||
return 0;
|
||||
}
|
||||
static int wrapped_kni_cmsg_set(struct kni_cmsg *cmsg, uint16_t type, const unsigned char *value, uint16_t size){
|
||||
static int wrapped_kni_cmsg_set(struct kni_cmsg *cmsg, uint16_t type, const unsigned char *value,
|
||||
uint16_t size, char *stream_traceid){
|
||||
void *logger = g_kni_handle->local_logger;
|
||||
int ret = kni_cmsg_set(cmsg, type, value, size);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed set cmsg, type is %d", type);
|
||||
KNI_LOG_ERROR(logger, "Failed set cmsg, type is %d, stream traceid is %s", type, stream_traceid);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@@ -400,52 +438,52 @@ static unsigned char* kni_cmsg_serialize_header_new(struct pme_info *pmeinfo, st
|
||||
uint16_t client_window = htons(pmeinfo->client_window);
|
||||
uint16_t server_window = htons(pmeinfo->server_window);
|
||||
//seq
|
||||
int ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SEQ, (const unsigned char*)&seq, 4);
|
||||
int ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SEQ, (const unsigned char*)&seq, 4, pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
//ack
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_ACK, (const unsigned char*)&ack, 4);
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_ACK, (const unsigned char*)&ack, 4, pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
//client mss
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_CLIENT, (const unsigned char*)&client_mss, 2);
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_CLIENT, (const unsigned char*)&client_mss, 2, pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
//server mss
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_SERVER, (const unsigned char*)&server_mss, 2);
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_MSS_SERVER, (const unsigned char*)&server_mss, 2, pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
//client wscale
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->wscale), 1);
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->wscale), 1, pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
//server wscale
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->wscale), 1);
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->wscale), 1, pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
//client sack
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->sack), 1);
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->sack), 1, pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
//server sack
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->sack), 1);
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_SACK_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->sack), 1, pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
//client timestamp
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->ts), 1);
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_CLIENT, (const unsigned char*)&(pmeinfo->client_tcpopt->ts), 1, pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
//server timestamp
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->ts), 1);
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_TS_SERVER, (const unsigned char*)&(pmeinfo->server_tcpopt->ts), 1, pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
//protocol
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (const unsigned char*)&protocol_type, 1);
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (const unsigned char*)&protocol_type, 1, pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
//client window
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT, (const unsigned char*)&client_window, 2);
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_CLIENT, (const unsigned char*)&client_window, 2, pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
//server window
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_SERVER, (const unsigned char*)&server_window, 2);
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_TCP_RESTORE_WINDOW_SERVER, (const unsigned char*)&server_window, 2, pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
//maat policy id
|
||||
policy_id = pmeinfo->policy_id;
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_POLICY_ID, (const unsigned char*)&policy_id, sizeof(policy_id));
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_POLICY_ID, (const unsigned char*)&policy_id, sizeof(policy_id), pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
//stream trace id
|
||||
trace_id = pmeinfo->stream_trace_id;
|
||||
trace_id = pmeinfo->stream_traceid;
|
||||
ret = wrapped_kni_cmsg_set(cmsg, TFE_CMSG_STREAM_TRACE_ID, (const unsigned char*)trace_id,
|
||||
strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id)));
|
||||
strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid)), pmeinfo->stream_traceid);
|
||||
if(ret < 0) goto error_out;
|
||||
|
||||
bufflen = kni_cmsg_serialize_size_get(cmsg);
|
||||
@@ -453,7 +491,8 @@ static unsigned char* kni_cmsg_serialize_header_new(struct pme_info *pmeinfo, st
|
||||
serialize_len = 0;
|
||||
ret = kni_cmsg_serialize(cmsg, buff, bufflen, &serialize_len);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at serialize cmsg, ret is %d", ret);
|
||||
KNI_LOG_ERROR(logger, "Failed at serialize cmsg, ret is %d, stream traceid is %s",
|
||||
ret, pmeinfo->stream_traceid);
|
||||
goto error_out;
|
||||
}
|
||||
*len = serialize_len;
|
||||
@@ -510,24 +549,28 @@ static char* add_cmsg_to_packet(struct pme_info *pmeinfo, struct pkt_info *pktin
|
||||
static int send_to_tfe(struct kni_marsio_handle *handle, char *raw_data, int raw_len, int thread_seq, int tfe_id){
|
||||
void *logger = g_kni_handle->local_logger;
|
||||
marsio_buff_t *tx_buffs[BURST_MAX];
|
||||
unsigned int ret = 1;
|
||||
struct mr_vdev *dev_eth_handler = handle->tfe_instance_list[tfe_id]->dev_eth_handler;
|
||||
struct mr_sendpath *dev_eth_sendpath = handle->tfe_instance_list[tfe_id]->dev_eth_sendpath;
|
||||
char *src_mac = handle->src_mac_addr;
|
||||
char *dst_mac = handle->tfe_instance_list[tfe_id]->mac_addr;
|
||||
int alloc_ret = marsio_buff_malloc_device(dev_eth_handler, tx_buffs, ret, 0, thread_seq);
|
||||
//only send one packet
|
||||
int nr_send = 1;
|
||||
int alloc_ret = marsio_buff_malloc_device(dev_eth_handler, tx_buffs, nr_send, 0, thread_seq);
|
||||
if (alloc_ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret is %d, thread_seq is %d", ret, thread_seq);
|
||||
KNI_LOG_ERROR(logger, "Failed at alloc marsio buffer, ret is %d, thread_seq is %d",
|
||||
alloc_ret, thread_seq);
|
||||
return -1;
|
||||
}
|
||||
char* dst_data = marsio_buff_append(tx_buffs[0], raw_len + 14);
|
||||
//ethernet_header[14]
|
||||
memcpy(dst_data, dst_mac, 6);
|
||||
memcpy(dst_data + 6, src_mac, 6);
|
||||
dst_data[12] = 0x08;
|
||||
dst_data[13] = 0x00;
|
||||
memcpy((char*)dst_data + 14, raw_data, raw_len);
|
||||
marsio_send_burst(dev_eth_sendpath, thread_seq, tx_buffs, ret);
|
||||
for(int i = 0; i < nr_send; i++){
|
||||
char* dst_data = marsio_buff_append(tx_buffs[i], raw_len + 14);
|
||||
//ethernet_header[14]
|
||||
struct ethhdr *ether_hdr = (struct ethhdr*)dst_data;
|
||||
memcpy(ether_hdr->h_dest, dst_mac, sizeof(ether_hdr->h_dest));
|
||||
memcpy(ether_hdr->h_source, src_mac, sizeof(ether_hdr->h_source));
|
||||
ether_hdr->h_proto = htons(ETH_P_IP);
|
||||
memcpy((char*)dst_data + sizeof(*ether_hdr), raw_data, raw_len);
|
||||
}
|
||||
marsio_send_burst(dev_eth_sendpath, thread_seq, tx_buffs, nr_send);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -535,16 +578,16 @@ static char pending_opstate(const struct streaminfo *stream, struct pme_info *pm
|
||||
void *logger = g_kni_handle->local_logger;
|
||||
if(!pktinfo->tcphdr->syn){
|
||||
//pending_opstate not syn, bypass and dropme
|
||||
KNI_LOG_ERROR(logger, "pending opstate: not syn");
|
||||
KNI_LOG_DEBUG(logger, "pending opstate: not syn, stream traceid is %s", pmeinfo->stream_traceid);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_SYN_EXP], 0, FS_OP_ADD, 1);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
pmeinfo->error = STREAM_ERROR_PENDING_NO_SYN;
|
||||
return APP_STATE_FAWPKT | APP_STATE_DROPME;
|
||||
}
|
||||
pmeinfo->client_window = pktinfo->tcphdr->window;
|
||||
pmeinfo->client_tcpopt = kni_get_tcpopt(pktinfo->tcphdr, pktinfo->tcphdr_len);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
|
||||
}
|
||||
|
||||
@@ -556,8 +599,8 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
|
||||
void *logger = g_kni_handle->local_logger;
|
||||
char *buf = (char*)pktinfo->iphdr;
|
||||
int len = pktinfo->ip_totlen;
|
||||
char stream_addr[KNI_SYMBOL_MAX] = "";
|
||||
int ret;
|
||||
char stream_addr[KNI_SYMBOL_MAX] = "";
|
||||
kni_stream_addr_trans((struct ipaddr*)(&stream->addr), stream_addr, sizeof(stream_addr));
|
||||
//pmeinfo->action has only 3 value: KNI_ACTION_NONE, KNI_ACTION_INTERCEPT, KNI_ACTION_BYPASS
|
||||
switch (pmeinfo->action){
|
||||
@@ -566,7 +609,7 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
|
||||
case KNI_ACTION_INTERCEPT:
|
||||
ret = send_to_tfe(g_kni_handle->marsio_handle, buf, len, thread_seq, pmeinfo->tfe_id);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at send continue packet to tfe%d, stream_addr is %s", pmeinfo->tfe_id, stream_addr);
|
||||
KNI_LOG_ERROR(logger, "Failed at send continue packet to tfe%d, stream traceid is %s", pmeinfo->tfe_id, pmeinfo->stream_traceid);
|
||||
}
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1);
|
||||
return APP_STATE_DROPPKT | APP_STATE_GIVEME;
|
||||
@@ -574,29 +617,31 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
|
||||
default:
|
||||
assert(0);
|
||||
break;
|
||||
}
|
||||
// syn/ack
|
||||
if(pktinfo->tcphdr->syn && pktinfo->tcphdr->ack){
|
||||
pmeinfo->server_window = pktinfo->tcphdr->window;
|
||||
pmeinfo->server_tcpopt = kni_get_tcpopt(pktinfo->tcphdr, pktinfo->tcphdr_len);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
|
||||
}
|
||||
//no data, maybe ack
|
||||
if(pktinfo->data_len <= 0){
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
|
||||
}
|
||||
//not double dir, bypass and dropme
|
||||
if(stream->dir != DIR_DOUBLE){
|
||||
KNI_LOG_INFO(logger, "dir is %d, bypass, stream addr is %s", stream->dir, stream_addr);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
pmeinfo->error = STREAM_ERROR_SINGLE_DIR;
|
||||
return APP_STATE_FAWPKT | APP_STATE_DROPME;
|
||||
}
|
||||
struct protocol_identify_result protocol_identify_res;
|
||||
memset(&protocol_identify_res, 0, sizeof(struct protocol_identify_result));
|
||||
memset(&protocol_identify_res, 0, sizeof(protocol_identify_res));
|
||||
protocol_identify(stream, pktinfo->data, pktinfo->data_len, &protocol_identify_res);
|
||||
pmeinfo->protocol = protocol_identify_res.protocol;
|
||||
switch(pmeinfo->protocol){
|
||||
@@ -604,8 +649,8 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
|
||||
case KNI_PROTOCOL_UNKNOWN:
|
||||
KNI_LOG_INFO(logger, "Failed at protocol_identify, bypass and dropme, stream addr is %s\n",
|
||||
pmeinfo->protocol, stream_addr);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_UNKNOWN_STM], 0, FS_OP_ADD, 1);
|
||||
pmeinfo->error = STREAM_ERROR_PROTOCOL_UNKNOWN;
|
||||
return APP_STATE_FAWPKT | APP_STATE_DROPME;
|
||||
@@ -630,28 +675,40 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
|
||||
stream_addr, protocol_identify_res.domain, pmeinfo->policy_id, pmeinfo->action, action_str, pmeinfo->maat_hit);
|
||||
//receive client hello, but no syn/ack, bypass and dropme
|
||||
if(pmeinfo->client_tcpopt == NULL || pmeinfo->server_tcpopt == NULL){
|
||||
KNI_LOG_ERROR(logger, "Failed at intercept, %s, %s", pmeinfo->client_tcpopt == NULL ? "no syn" : "",
|
||||
pmeinfo->server_tcpopt == NULL ? "no syn/ack" : "");
|
||||
KNI_LOG_ERROR(logger, "Failed at intercept, %s, %s, stream traceid is %s", pmeinfo->client_tcpopt == NULL ? "no syn" : "",
|
||||
pmeinfo->server_tcpopt == NULL ? "no syn/ack" : "", pmeinfo->stream_traceid);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NO_SA_EXP], 0, FS_OP_ADD, 1);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
|
||||
pmeinfo->error = STREAM_ERROR_NO_SYN_ACK;
|
||||
return APP_STATE_FAWPKT | APP_STATE_DROPME;
|
||||
}
|
||||
int key_size;
|
||||
switch(pmeinfo->action){
|
||||
case KNI_ACTION_BYPASS:
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
|
||||
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
|
||||
case KNI_ACTION_INTERCEPT:
|
||||
//only intercept: add to hash table
|
||||
key_size = strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid));
|
||||
ret = MESA_htable_add(g_kni_handle->traceid2pme_htable, (const unsigned char *)(pmeinfo->stream_traceid),
|
||||
key_size, (const void*)pmeinfo);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "MESA_htable: failed at MESA_htable_add,"
|
||||
"table is traceid2pme_htable, key is %s", pmeinfo->stream_traceid);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL], 0, FS_OP_ADD, 1);
|
||||
}
|
||||
else{
|
||||
KNI_LOG_DEBUG(logger, "MESA_htable: succeed at MESA_htable_add,"
|
||||
"table is traceid2pme_htable, key is %s", pmeinfo->stream_traceid);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC], 0, FS_OP_ADD, 1);
|
||||
}
|
||||
//action = KNI_ACTION_INTERCEPT, sendto tfe
|
||||
buf = add_cmsg_to_packet(pmeinfo, pktinfo, &len);
|
||||
ret = send_to_tfe(g_kni_handle->marsio_handle, buf, len, thread_seq, pmeinfo->tfe_id);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at send first packet to tfe%d, stream addr is %s", pmeinfo->tfe_id, stream_addr);
|
||||
}
|
||||
else{
|
||||
//KNI_LOG_DEBUG(logger, "Succeed at send first packet to tfe%d, stream addr is %s", pmeinfo->tfe_id, stream_addr);
|
||||
KNI_LOG_ERROR(logger, "Failed at send first packet to tfe%d, stream traceid is %s", pmeinfo->tfe_id, pmeinfo->stream_traceid);
|
||||
}
|
||||
FREE(&buf);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1);
|
||||
@@ -661,22 +718,17 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein
|
||||
//action != intercept && action != bypass,bypass and dropme
|
||||
KNI_LOG_ERROR(logger, "Action %d(%s) is invalid, bypass(dropme): policy_id is %d, stream addr is %s, domain is ",
|
||||
pmeinfo->action, action_str, pmeinfo->policy_id, stream_addr, protocol_identify_res.domain);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1);
|
||||
pmeinfo->error = STREAM_ERROR_INVALID_ACTION;
|
||||
return APP_STATE_FAWPKT | APP_STATE_DROPME;
|
||||
}
|
||||
}
|
||||
|
||||
static char close_opstate(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, int thread_seq){
|
||||
//close: also need to send to tfe
|
||||
//close: a_packet = null, do not sendto tfe
|
||||
pmeinfo->end_time = time(NULL);
|
||||
void *logger = g_kni_handle->local_logger;
|
||||
char *buf = (char*)pktinfo->iphdr;
|
||||
char stream_addr[KNI_SYMBOL_MAX] = "";
|
||||
kni_stream_addr_trans((struct ipaddr*)(&stream->addr), stream_addr, sizeof(stream_addr));
|
||||
int len = pktinfo->ip_totlen;
|
||||
int ret;
|
||||
pmeinfo->server_bytes=stream->ptcpdetail->serverbytes;
|
||||
pmeinfo->client_bytes=stream->ptcpdetail->clientbytes;
|
||||
pmeinfo->server_pkts=stream->ptcpdetail->serverpktnum;
|
||||
@@ -684,88 +736,69 @@ static char close_opstate(const struct streaminfo *stream, struct pme_info *pmei
|
||||
pmeinfo->dir=stream->dir;
|
||||
switch(pmeinfo->action){
|
||||
case KNI_ACTION_INTERCEPT:
|
||||
ret = send_to_tfe(g_kni_handle->marsio_handle, buf, len, thread_seq, pmeinfo->tfe_id);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at send last packet to tfe%d, stream addr is %s",
|
||||
pmeinfo->tfe_id, stream_addr);
|
||||
}
|
||||
else{
|
||||
//KNI_LOG_DEBUG(logger, "Succeed at send last packet to tfe%d, stream addr is %s", pmeinfo->tfe_id, stream_addr);
|
||||
}
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_INTCP_PKT], 0, FS_OP_ADD, 1);
|
||||
//reset clock: when sapp end, start clock
|
||||
MESA_htable_search(g_kni_handle->traceid2pme_htable, (const unsigned char*)pmeinfo->stream_trace_id,
|
||||
strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id)));
|
||||
MESA_htable_search(g_kni_handle->traceid2pme_htable, (const unsigned char*)pmeinfo->stream_traceid,
|
||||
strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid)));
|
||||
return APP_STATE_DROPPKT | APP_STATE_DROPME;
|
||||
case KNI_ACTION_BYPASS:
|
||||
KNI_LOG_DEBUG(logger, "set tfe_release = 1, stream_trace_id is %s", pmeinfo->stream_traceid);
|
||||
pmeinfo->tfe_release = 1;
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
return APP_STATE_FAWPKT | APP_STATE_DROPME;
|
||||
//will not happen, close has only 2 action:
|
||||
//stream has only syn, ack. not data. do not send to tfe
|
||||
default:
|
||||
char action_str[KNI_SYMBOL_MAX];
|
||||
kni_maat_action_trans(pmeinfo->action, action_str);
|
||||
KNI_LOG_ERROR(logger, "close_opstate: action %d(%s) is abnormal",
|
||||
pmeinfo->action, action_str);
|
||||
pmeinfo->error = STREAM_ERROR_NO_DATA;
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_STM_NO_DATA], 0, FS_OP_ADD, 1);
|
||||
KNI_LOG_DEBUG(logger, "close_opstate: action %d(%s) is abnormal, stream_traceid is %s",
|
||||
pmeinfo->action, action_str, pmeinfo->stream_traceid);
|
||||
return APP_STATE_FAWPKT | APP_STATE_DROPME;
|
||||
}
|
||||
}
|
||||
|
||||
//from syn
|
||||
extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, int thread_seq, const void* a_packet){
|
||||
|
||||
void *logger = g_kni_handle->local_logger;
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_PKT], 0, FS_OP_ADD, 1);
|
||||
int ret, key_size;
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_PKT], 0, FS_OP_ADD, 1);
|
||||
int ret;
|
||||
struct pme_info *pmeinfo = *(struct pme_info **)pme;
|
||||
//pktinfo
|
||||
struct pkt_info pktinfo;
|
||||
memset(&pktinfo, 0, sizeof(pktinfo));
|
||||
//TODO: ipv6
|
||||
if(stream->addr.addrtype == ADDR_TYPE_IPV6){
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV6_PKT], 0, FS_OP_ADD, 1);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
return APP_STATE_FAWPKT | APP_STATE_DROPME;
|
||||
}
|
||||
//a_packet == NULL, continue
|
||||
if(a_packet == NULL){
|
||||
|
||||
//a_packet == NULL && not op_state_close, continue
|
||||
//close: a_packet may be null, if a_packet = null, do not send to tfe
|
||||
if(a_packet == NULL && stream->pktstate != OP_STATE_CLOSE){
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_NULL_PKT], 0, FS_OP_ADD, 1);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
|
||||
}
|
||||
memset(&pktinfo, 0, sizeof(struct pkt_info));
|
||||
pktinfo.iphdr = (struct iphdr*)a_packet;
|
||||
pktinfo.iphdr_len = pktinfo.iphdr->ihl * 4;
|
||||
pktinfo.ip_totlen = ntohs(pktinfo.iphdr->tot_len);
|
||||
pktinfo.tcphdr = (struct tcphdr*)((char*)pktinfo.iphdr + pktinfo.iphdr_len);
|
||||
pktinfo.tcphdr_len = pktinfo.tcphdr->doff * 4;
|
||||
pktinfo.data = (char*)pktinfo.tcphdr + pktinfo.tcphdr_len;
|
||||
pktinfo.data_len = pktinfo.ip_totlen - pktinfo.iphdr_len - pktinfo.tcphdr_len;
|
||||
|
||||
/* for debug
|
||||
if(stream->pktstate == 2){
|
||||
printf("stream->pktstate is %d\n", stream->pktstate);
|
||||
}
|
||||
return APP_STATE_FAWPKT | APP_STATE_GIVEME;
|
||||
*/
|
||||
|
||||
if(a_packet != NULL){
|
||||
pktinfo.iphdr = (struct iphdr*)a_packet;
|
||||
pktinfo.iphdr_len = pktinfo.iphdr->ihl * 4;
|
||||
pktinfo.ip_totlen = ntohs(pktinfo.iphdr->tot_len);
|
||||
pktinfo.tcphdr = (struct tcphdr*)((char*)pktinfo.iphdr + pktinfo.iphdr_len);
|
||||
pktinfo.tcphdr_len = pktinfo.tcphdr->doff * 4;
|
||||
pktinfo.data = (char*)pktinfo.tcphdr + pktinfo.tcphdr_len;
|
||||
pktinfo.data_len = pktinfo.ip_totlen - pktinfo.iphdr_len - pktinfo.tcphdr_len;
|
||||
}
|
||||
|
||||
switch(stream->pktstate){
|
||||
case OP_STATE_PENDING:
|
||||
pmeinfo = pme_info_new(stream, thread_seq);
|
||||
*pme = pmeinfo = pme_info_new(stream, thread_seq);
|
||||
if(pmeinfo == NULL){
|
||||
KNI_LOG_ERROR(logger, "Failed at new pmeinfo");
|
||||
return APP_STATE_FAWPKT | APP_STATE_DROPME;
|
||||
}
|
||||
*pme = pmeinfo;
|
||||
key_size = strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id));
|
||||
ret = MESA_htable_add(g_kni_handle->traceid2pme_htable, (const unsigned char *)(pmeinfo->stream_trace_id),
|
||||
key_size, (const void*)pmeinfo);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "MESA_htable: failed at MESA_htable_add,"
|
||||
"table is traceid2pme_htable, key is %s", pmeinfo->stream_trace_id);
|
||||
pme_info_destroy(pmeinfo);
|
||||
return APP_STATE_FAWPKT | APP_STATE_DROPME;
|
||||
}
|
||||
//KNI_LOG_DEBUG(logger, "MESA_htable: succeed at MESA_htable_add, table is traceid2pme_htable, key is %s, key_size is %d",
|
||||
// pmeinfo->stream_trace_id, key_size);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_NEW], 0, FS_OP_ADD, 1);
|
||||
ret = pending_opstate(stream, pmeinfo, &pktinfo);
|
||||
if(pmeinfo->error < 0){
|
||||
goto error_out;
|
||||
@@ -781,40 +814,28 @@ extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, in
|
||||
case OP_STATE_CLOSE:
|
||||
//sapp stream close
|
||||
ret = close_opstate(stream, pmeinfo, &pktinfo, thread_seq);
|
||||
if(pmeinfo->error < 0){
|
||||
goto error_out;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
ret = APP_STATE_FAWPKT | APP_STATE_GIVEME;
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
//FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1);
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_UNKNOWN_STATE_EXP], 0, FS_OP_ADD, 1);
|
||||
KNI_LOG_ERROR(logger, "Unknown stream opstate %d", stream->pktstate);
|
||||
KNI_LOG_ERROR(logger, "Unknown stream opstate %d, stream traceid is %s", stream->pktstate, pmeinfo->stream_traceid);
|
||||
break;
|
||||
}
|
||||
//close opstate or data_opstate(tfe_release = 1), need sendlog
|
||||
//sapp release: bypass or intercept
|
||||
if((ret & APP_STATE_DROPME)){
|
||||
//lock
|
||||
pthread_mutex_lock(&(pmeinfo->lock));
|
||||
pmeinfo->sapp_release = 1;
|
||||
int ret = judge_pme_destroy(pmeinfo);
|
||||
if(ret < 0){
|
||||
pthread_mutex_unlock(&(pmeinfo->lock));
|
||||
}
|
||||
judge_pme_destroy(pmeinfo, CALLER_SAPP);
|
||||
}
|
||||
return ret;
|
||||
|
||||
//error: so del htable, pmeinfo destroy. do not sendlog
|
||||
//error out: no hash, no sendlog, just destroy_pme
|
||||
error_out:
|
||||
KNI_LOG_ERROR(logger, "stream error = %d, ret is %d", pmeinfo->error, ret);
|
||||
KNI_LOG_DEBUG(logger, "stream error = %d, ret is %d, stream traceid is %s", pmeinfo->error, ret, pmeinfo->stream_traceid);
|
||||
if(pmeinfo != NULL){
|
||||
key_size = strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id));
|
||||
int ret1 = MESA_htable_del(g_kni_handle->traceid2pme_htable, (const unsigned char *)pmeinfo->stream_trace_id, key_size, pme_info_destroy);
|
||||
if(ret1 < 0){
|
||||
KNI_LOG_ERROR(logger, "MESA_htable: failed at del, table is %s, key is %s, key_size is %d, ret is %d",
|
||||
"traceid2pme_htable", pmeinfo->stream_trace_id, key_size, ret1);
|
||||
}
|
||||
else{
|
||||
KNI_LOG_DEBUG(logger, "MESA_htable: succeed at del, table is %s, key is %s, key_size is %d",
|
||||
"traceid2pme_htable", pmeinfo->stream_trace_id, key_size);
|
||||
}
|
||||
pme_info_destroy(pmeinfo);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@@ -882,34 +903,37 @@ void* thread_tfe_data_receiver(void *args){
|
||||
int thread_seq = 0;
|
||||
while(true){
|
||||
//receive from tfe
|
||||
int ret = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buff, nr_burst);
|
||||
if(ret <= 0){
|
||||
int nr_recv = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buff, nr_burst);
|
||||
if(nr_recv <= 0){
|
||||
continue;
|
||||
}
|
||||
//tag
|
||||
struct mr_tunnat_ctrlzone mr_ctrlzone;
|
||||
mr_ctrlzone.action |= TUNNAT_CZ_ACTION_ENCAP_INNER | TUNNAT_CZ_ACTION_ENCAP_OUTER;
|
||||
for(int i = 0; i < ret; i++){
|
||||
mr_ctrlzone.action |= (TUNNAT_CZ_ACTION_ENCAP_INNER | TUNNAT_CZ_ACTION_ENCAP_OUTER);
|
||||
for(int i = 0; i < nr_recv; i++){
|
||||
marsio_buff_ctrlzone_set(rx_buff[i], 0, &mr_ctrlzone, sizeof(struct mr_tunnat_ctrlzone));
|
||||
}
|
||||
//send to vxlan
|
||||
marsio_send_burst_with_options(marsio_handle->dev_vxlan_sendpath, thread_seq, rx_buff, 1, MARSIO_SEND_OPT_FAST);
|
||||
marsio_send_burst_with_options(marsio_handle->dev_vxlan_sendpath, thread_seq, rx_buff, nr_recv, MARSIO_SEND_OPT_FAST);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int wrapped_kni_cmsg_get(struct pme_info *pmeinfo, struct kni_cmsg *cmsg, uint16_t type, uint16_t value_size_max, void *logger){
|
||||
static int wrapped_kni_cmsg_get(struct pme_info *pmeinfo, struct kni_cmsg *cmsg, uint16_t type,
|
||||
uint16_t value_size_max, void *logger){
|
||||
uint16_t value_size = 0;
|
||||
unsigned char *value = NULL;
|
||||
int ret = kni_cmsg_get(cmsg, type, &value_size, &value);
|
||||
if(ret < 0){
|
||||
if(ret == KNI_CMSG_INVALID_TYPE){
|
||||
KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type is %d, ret is %d", type, ret);
|
||||
KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type is %d, ret is %d, stream traceid is %s",
|
||||
type, ret, pmeinfo->stream_traceid);
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
if(value_size > value_size_max){
|
||||
KNI_LOG_ERROR(logger, "kni_cmsg_get: type is %d, size is %d, which should <= %d", type, value_size, value_size_max);
|
||||
KNI_LOG_ERROR(logger, "kni_cmsg_get: type is %d, size is %d, which should <= %d, stream traceid is %s",
|
||||
type, value_size, value_size_max, pmeinfo->stream_traceid);
|
||||
return -1;
|
||||
}
|
||||
switch(type)
|
||||
@@ -959,17 +983,8 @@ static long traceid2pme_htable_search_cb(void *data, const uchar *key, uint size
|
||||
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_CERT_VERIFY, sizeof(pmeinfo->ssl_cert_verify), logger);
|
||||
wrapped_kni_cmsg_get(pmeinfo, cmsg, TFE_CMSG_SSL_ERROR, sizeof(pmeinfo->ssl_error), logger);
|
||||
pmeinfo->end_time = time(NULL);
|
||||
int key_size = strnlen(pmeinfo->stream_trace_id, sizeof(pmeinfo->stream_trace_id));
|
||||
int ret = MESA_htable_del(g_kni_handle->traceid2pme_htable, (const unsigned char *)pmeinfo->stream_trace_id,
|
||||
key_size, NULL);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "MESA_htable: failed at del, table is %s, key is %s, key_size is %d, ret is %d",
|
||||
"traceid2pme_htable", pmeinfo->stream_trace_id, key_size, ret);
|
||||
}
|
||||
else{
|
||||
KNI_LOG_DEBUG(logger, "MESA_htable: succeed at del, table is %s, key is %s, key_size is %d",
|
||||
"traceid2pme_htable", pmeinfo->stream_trace_id, key_size);
|
||||
}
|
||||
KNI_LOG_INFO(logger, "recv cmsg from tfe, stream traceid is %s", pmeinfo->stream_traceid);
|
||||
judge_pme_destroy(pmeinfo, CALLER_TFE);
|
||||
}
|
||||
kni_cmsg_destroy(cmsg);
|
||||
return 0;
|
||||
@@ -1037,10 +1052,10 @@ void* thread_tfe_cmsg_receiver(void *args){
|
||||
KNI_LOG_ERROR(logger, "Failed at deserialize cmsg, ret is %d", ret);
|
||||
continue;
|
||||
}
|
||||
//get stream_trace_id
|
||||
unsigned char *stream_trace_id = NULL;
|
||||
//get stream_traceid
|
||||
unsigned char *stream_traceid = NULL;
|
||||
uint16_t value_size;
|
||||
ret = kni_cmsg_get(cmsg, TFE_CMSG_STREAM_TRACE_ID, &value_size, &stream_trace_id);
|
||||
ret = kni_cmsg_get(cmsg, TFE_CMSG_STREAM_TRACE_ID, &value_size, &stream_traceid);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at kni_cmsg_get: type is %d, ret is %d", TFE_CMSG_STREAM_TRACE_ID, ret);
|
||||
continue;
|
||||
@@ -1051,7 +1066,7 @@ void* thread_tfe_cmsg_receiver(void *args){
|
||||
memset((void*)&cb_args, 0, sizeof(cb_args));
|
||||
cb_args.cmsg = cmsg;
|
||||
cb_args.logger = logger;
|
||||
MESA_htable_search_cb(g_kni_handle->traceid2pme_htable, (const unsigned char *)stream_trace_id,
|
||||
MESA_htable_search_cb(g_kni_handle->traceid2pme_htable, (const unsigned char *)stream_traceid,
|
||||
value_size, traceid2pme_htable_search_cb, &cb_args, &cb_ret);
|
||||
}
|
||||
return NULL;
|
||||
@@ -1223,7 +1238,7 @@ static struct kni_field_stat_handle * fs_init(const char *profile){
|
||||
FS_set_para(handle, MAX_STAT_FIELD_NUM, &value, sizeof(value));
|
||||
fs_handle = ALLOC(struct kni_field_stat_handle, 1);
|
||||
fs_handle->handle = handle;
|
||||
fs_handle->fields[KNI_FIELD_TOT_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tot_pkt");
|
||||
//fs_handle->fields[KNI_FIELD_TOT_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tot_pkt");
|
||||
fs_handle->fields[KNI_FIELD_BYP_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_pkt");
|
||||
fs_handle->fields[KNI_FIELD_INTCP_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_pkt");
|
||||
fs_handle->fields[KNI_FIELD_IPV6_PKT] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ipv6_pkt");
|
||||
@@ -1231,7 +1246,7 @@ static struct kni_field_stat_handle * fs_init(const char *profile){
|
||||
fs_handle->fields[KNI_FIELD_NO_SYN_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "no_syn_pkt");
|
||||
fs_handle->fields[KNI_FIELD_UNKNOWN_STATE_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "unknown_state");
|
||||
fs_handle->fields[KNI_FIELD_NO_SA_EXP] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "no_s/a_pkt");
|
||||
fs_handle->fields[KNI_FIELD_TOT_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tot_stm");
|
||||
//fs_handle->fields[KNI_FIELD_TOT_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "tot_stm");
|
||||
fs_handle->fields[KNI_FIELD_BYP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "byp_stm");
|
||||
fs_handle->fields[KNI_FIELD_INTCP_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "intcp_stm");
|
||||
fs_handle->fields[KNI_FIELD_SSL_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ssl_stm");
|
||||
@@ -1239,6 +1254,13 @@ static struct kni_field_stat_handle * fs_init(const char *profile){
|
||||
fs_handle->fields[KNI_FIELD_SENDLOG_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sendlog_succ");
|
||||
fs_handle->fields[KNI_FIELD_SENDLOG_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sendlog_fail");
|
||||
fs_handle->fields[KNI_FIELD_UNKNOWN_STM] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "unknown_stm");
|
||||
fs_handle->fields[KNI_FIELD_STM_NO_DATA] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "stm_no_data");
|
||||
fs_handle->fields[KNI_FIELD_PME_NEW] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "pme_new");
|
||||
fs_handle->fields[KNI_FIELD_PME_FREE] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "pme_free");
|
||||
fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_add_succ");
|
||||
fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_add_fail");
|
||||
fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_del_succ");
|
||||
fs_handle->fields[KNI_FIELD_ID2PME_DEL_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_del_fail");
|
||||
fs_handle->handle = handle;
|
||||
FS_start(handle);
|
||||
return fs_handle;
|
||||
@@ -1257,29 +1279,14 @@ extern "C" void kni_destroy(struct kni_handle *handle){
|
||||
handle = NULL;
|
||||
}
|
||||
|
||||
static void traceid2pme_htable_data_free_cb(void *data){
|
||||
void *logger = g_kni_handle->local_logger;
|
||||
KNI_LOG_DEBUG(logger, "call traceid2pme_htable_data_free_cb");
|
||||
struct pme_info *pmeinfo = (struct pme_info*)data;
|
||||
pthread_mutex_lock(&(pmeinfo->lock));
|
||||
pmeinfo->tfe_release = 1;
|
||||
int ret = judge_pme_destroy(pmeinfo);
|
||||
if(ret < 0){
|
||||
pthread_mutex_unlock(&(pmeinfo->lock));
|
||||
}
|
||||
}
|
||||
|
||||
//eliminate_type: 0:FIFO; 1:LRU
|
||||
//ret: 1: the item can be eliminated; 0: the item can't be eliminated
|
||||
static int traceid2pme_htable_expire_notify_cb(void *data, int eliminate_type){
|
||||
void *logger = g_kni_handle->local_logger;
|
||||
struct pme_info *pmeinfo = (struct pme_info*)data;
|
||||
if(pmeinfo->sapp_release == 0){
|
||||
KNI_LOG_DEBUG(logger, "Failed at eliminate pmeinfo, sapp_release is %d", pmeinfo->sapp_release);
|
||||
return 0;
|
||||
if(pmeinfo->sapp_release == 1){
|
||||
judge_pme_destroy(pmeinfo, CALLER_TFE);
|
||||
}
|
||||
KNI_LOG_DEBUG(logger, "Succeed at eliminate pmeinfo, sapp_release is %d", pmeinfo->sapp_release);
|
||||
return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@@ -1408,7 +1415,7 @@ extern "C" int kni_init(){
|
||||
g_kni_handle->send_logger = send_logger;
|
||||
|
||||
//init traceid2pme_htable
|
||||
traceid2pme_htable = kni_create_htable(profile, "traceid2pme_htable", (void*)traceid2pme_htable_data_free_cb,
|
||||
traceid2pme_htable = kni_create_htable(profile, "traceid2pme_htable", NULL,
|
||||
(void*)traceid2pme_htable_expire_notify_cb, local_logger);
|
||||
if(traceid2pme_htable == NULL){
|
||||
KNI_LOG_ERROR(local_logger, "Failed at create traceid2pme_htable");
|
||||
|
||||
Reference in New Issue
Block a user