不再数据处理线程将数据write到tun中,而是拷贝到队列中,由单独线程处理。本版本目前在性能测试过程中用于调试对比,目前不加入主版本中

This commit is contained in:
liuyang
2018-11-23 20:41:03 +08:00
parent 1528dbc104
commit 9117a85409
6 changed files with 178 additions and 33 deletions

View File

@@ -16,26 +16,32 @@
const char *g_kni_fs2_name[FS2_COLUMN_NUM] = const char *g_kni_fs2_name[FS2_COLUMN_NUM] =
{ {
"recv_from_sapp", "tcp/udp_entry",
"send_to_tun", "write_tun",
"read_from_tun", "read_tun",
"send_to_sapp", "send_masio",
"whitelist_ip", "whitelist_ip",
"whitelist_domain", "whitelist_domain",
"http_protocol", "http_protocol",
"ssl_protocol", "ssl_protocol",
"droppkt_other", "droppkt_other",
"client_hello_pkt", "client_hello",
"ssl_sni_pkt", "ssl_sni",
"drop_sapp_ipv6", "ipv6_option",
"drop_tun_htable", "not_in_htable",
"tcprepair_total", "tcprepair_total",
"tcprepair_error", "tcprepair_error",
"send_tcprepair_succ", "send_fds_succ",
"send_tcprepair_error", "send_fds_error",
"pengding", "pengding",
"close_timeout", "close_timeout",
"close_fin" "close_fin",
"add_lqueue_succ",
"add_lqueue_err",
"get_lqueue_succ",
"get_lqueue_err",
"write_tun_succ",
"write_tun_err"
}; };

View File

@@ -7,7 +7,7 @@
#endif #endif
#define FS2_COLUMN_NUM 20 #define FS2_COLUMN_NUM 26
#define FS2_APPNAME "KNI" #define FS2_APPNAME "KNI"
@@ -33,6 +33,12 @@ enum kni_FS_COLUME
FS2_COLUME_PENDING, FS2_COLUME_PENDING,
FS2_COLUME_CLOSE_TIMEOUT, FS2_COLUME_CLOSE_TIMEOUT,
FS2_COLUME_CLOSE_FIN, FS2_COLUME_CLOSE_FIN,
COLUME_ADD_LQUEUE_SUCC,
COLUME_ADD_LQUEUE_ERR,
COLUME_GET_LQUEUE_SUCC,
COLUME_GET_LQUEUE_ERR,
COLUME_WRITE_TUN_SUCC,
COLUME_WRITE_TUN_ERR
}; };

View File

@@ -160,7 +160,7 @@ int kni_sendfds_domain()
datainfo_len = sizeof(datainfo); datainfo_len = sizeof(datainfo);
ret=MESA_lqueue_get_tail(g_kni_structinfo.lqueue_for_domain,&datainfo,&datainfo_len); ret=MESA_lqueue_get_tail(g_kni_structinfo.lqueue_send_fds,&datainfo,&datainfo_len);
if(ret==MESA_QUEUE_RET_QEMPTY) if(ret==MESA_QUEUE_RET_QEMPTY)
{ {
continue; continue;
@@ -411,8 +411,8 @@ int tun_read_data(int fd,char* recv_buf,int max_buflen)
FD_SET(fd, &alive_readfd); FD_SET(fd, &alive_readfd);
max_fd = fd; max_fd = fd;
// ret = select(max_fd + 1, &alive_readfd, NULL, NULL, &timeout); ret = select(max_fd + 1, &alive_readfd, NULL, NULL, &timeout);
ret = select(max_fd + 1, &alive_readfd, NULL, NULL, NULL); // ret = select(max_fd + 1, &alive_readfd, NULL, NULL, NULL);
if (ret < 0) if (ret < 0)
{ {
// MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL, "keep_alive_action function", "select function errno %d is %s!", errno, strerror(errno)); // MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL, "keep_alive_action function", "select function errno %d is %s!", errno, strerror(errno));
@@ -434,6 +434,74 @@ int tun_read_data(int fd,char* recv_buf,int max_buflen)
} }
int kni_add_lqueue(int addrtype,int thread_seq,char* send_buf,int send_buflen)
{
int ret = 0;
struct kni_lqueue_writetun datainfo;
datainfo.addr_type = addrtype;
datainfo.buflen = send_buflen;
datainfo.buf = (char*)malloc(send_buflen);
memcpy(datainfo.buf,send_buf,send_buflen);
ret=MESA_lqueue_join_head(g_kni_structinfo.lqueue_write_tun[thread_seq],(void*)&datainfo,sizeof(datainfo));
if(ret <0)
{
kni_filestate2_set(thread_seq,COLUME_ADD_LQUEUE_ERR,0,1);
MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_SENDFD,"kni_add_lqueue() error,ret:%d",ret);
return -1;
}
kni_filestate2_set(thread_seq,COLUME_ADD_LQUEUE_SUCC,0,1);
return 0;
}
char tun_write_data(int fd,char* send_buf,int send_buflen,int thread_seq)
{
char ret=APP_STATE_DROPPKT|APP_STATE_GIVEME;
int succ_sendlen=0;
struct timespec start, end;
long elapse=0;
clock_gettime(CLOCK_MONOTONIC, &start);
succ_sendlen = write(fd, send_buf,send_buflen);
if(succ_sendlen<0)
{
kni_filestate2_set(thread_seq,COLUME_WRITE_TUN_ERR,0,1);
kni_filestate2_set(thread_seq,FS2_COLUME_DROPPKT,0,1);
MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_WRITETUN,"write() error %d, %s",errno,strerror(errno));
ret=APP_STATE_DROPPKT|APP_STATE_DROPME;
}
else if(succ_sendlen<send_buflen)
{
kni_filestate2_set(thread_seq,COLUME_WRITE_TUN_ERR,0,1);
MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_WRITETUN,"succ_sendlen is %d,send_buflen is %d",succ_sendlen,send_buflen);
}
else
{
kni_filestate2_set(thread_seq,COLUME_WRITE_TUN_SUCC,0,1);
}
kni_filestate2_set(thread_seq,FS2_COLUME_WRITE,0,1);
clock_gettime(CLOCK_MONOTONIC, &end);
elapse=(end.tv_sec-start.tv_sec)*1000000+(end.tv_nsec-start.tv_nsec)/1000;
FS_operate(g_kni_fs2_info.handler, g_kni_fs2_info.metric_tun_write, 0, FS_OP_SET, elapse);
return ret;
}
/*
char tun_write_data(int fd,char* send_buf,int send_buflen,struct streaminfo* pstream,int thread_seq) char tun_write_data(int fd,char* send_buf,int send_buflen,struct streaminfo* pstream,int thread_seq)
{ {
char ret=APP_STATE_DROPPKT|APP_STATE_GIVEME; char ret=APP_STATE_DROPPKT|APP_STATE_GIVEME;
@@ -466,7 +534,7 @@ char tun_write_data(int fd,char* send_buf,int send_buflen,struct streaminfo* pst
return ret; return ret;
} }
*/
int tun_write_data_v6(int fd,char* send_buf,int send_buflen) int tun_write_data_v6(int fd,char* send_buf,int send_buflen)
@@ -665,7 +733,10 @@ int kni_keepalive_replay_v6(struct stream_tuple4_v6* ipv6_addr,int iprever_flag,
sendpacket_do_checksum((unsigned char*)sendbuf,IPPROTO_TCP,htons(ipv6_hdr->ip6_payload_len)); sendpacket_do_checksum((unsigned char*)sendbuf,IPPROTO_TCP,htons(ipv6_hdr->ip6_payload_len));
// sendpacket_do_checksum((unsigned char*)sendbuf,IPPROTO_IP,sizeof(struct kni_ipv6_hdr)); // sendpacket_do_checksum((unsigned char*)sendbuf,IPPROTO_IP,sizeof(struct kni_ipv6_hdr));
tun_write_data(g_kni_comminfo.fd_tun[thread_seq],sendbuf,iplen,NULL,thread_seq); // tun_write_data(g_kni_comminfo.fd_tun[thread_seq],sendbuf,iplen,NULL,thread_seq);
tun_write_data(g_kni_comminfo.fd_tun[thread_seq],sendbuf,iplen,thread_seq);
kni_log_debug(RLOG_LV_DEBUG,(char*)"win_update",a_packet,(char*)"recv tcp_repair windows update,and replay"); kni_log_debug(RLOG_LV_DEBUG,(char*)"win_update",a_packet,(char*)"recv tcp_repair windows update,and replay");
@@ -725,7 +796,8 @@ int kni_keepalive_replay(struct stream_tuple4_v4* ipv4_addr,int iprever_flag,str
sendpacket_do_checksum((unsigned char*)sendbuf,IPPROTO_TCP,(iplen-4*(iphdr->ip_hl))); sendpacket_do_checksum((unsigned char*)sendbuf,IPPROTO_TCP,(iplen-4*(iphdr->ip_hl)));
sendpacket_do_checksum((unsigned char*)sendbuf,IPPROTO_IP,sizeof(struct ip)); sendpacket_do_checksum((unsigned char*)sendbuf,IPPROTO_IP,sizeof(struct ip));
tun_write_data(g_kni_comminfo.fd_tun[thread_seq],sendbuf,iplen,NULL,thread_seq); // tun_write_data(g_kni_comminfo.fd_tun[thread_seq],sendbuf,iplen,NULL,thread_seq);
tun_write_data(g_kni_comminfo.fd_tun[thread_seq],sendbuf,iplen,thread_seq);
kni_log_debug(RLOG_LV_DEBUG,(char*)"win_update",a_packet,(char*)"recv tcp_repair windows update,and replay"); kni_log_debug(RLOG_LV_DEBUG,(char*)"win_update",a_packet,(char*)"recv tcp_repair windows update,and replay");
@@ -742,12 +814,6 @@ int kni_keepalive_replay(struct stream_tuple4_v4* ipv4_addr,int iprever_flag,str
long kni_readtun_htable_cb_v6(void* data,const unsigned char* key,unsigned int size,void* user_arg) long kni_readtun_htable_cb_v6(void* data,const unsigned char* key,unsigned int size,void* user_arg)
{ {
long result=0; long result=0;
@@ -955,6 +1021,7 @@ return:
*********************************************************************************************************************/ *********************************************************************************************************************/
void* kni_read_tun(void* arg) void* kni_read_tun(void* arg)
{ {
int ret = 0;
int thread_seq=*(int*)arg; int thread_seq=*(int*)arg;
int recv_len=0; int recv_len=0;
char recv_buf[KNI_MAX_BUFLEN] = {0}; char recv_buf[KNI_MAX_BUFLEN] = {0};
@@ -965,6 +1032,9 @@ void* kni_read_tun(void* arg)
sapp_get_platform_opt(SPO_INDEPENDENT_THREAD_ID,&sendpkt_threadid,&sendpkt_threadid_len); sapp_get_platform_opt(SPO_INDEPENDENT_THREAD_ID,&sendpkt_threadid,&sendpkt_threadid_len);
struct timespec start, end; struct timespec start, end;
struct kni_lqueue_writetun datainfo;
long datainfo_len = sizeof(datainfo);
while(1) while(1)
{ {
if(g_kni_comminfo.kni_mode_cur==KNI_MODE_BYPASS) if(g_kni_comminfo.kni_mode_cur==KNI_MODE_BYPASS)
@@ -972,7 +1042,31 @@ void* kni_read_tun(void* arg)
sleep(KNI_USLEEP_TIME); sleep(KNI_USLEEP_TIME);
continue; continue;
} }
//write to run
datainfo_len = sizeof(datainfo);
ret=MESA_lqueue_get_tail(g_kni_structinfo.lqueue_write_tun[thread_seq],&datainfo,&datainfo_len);
if(ret==MESA_QUEUE_RET_QEMPTY)
{
}
else if(ret<0)
{
kni_filestate2_set(thread_seq,COLUME_GET_LQUEUE_ERR,0,1);
MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_READTUN,"MESA_lqueue_try_get_tail() error!ret:%d,datalen:%d\n",ret,datainfo_len);
}
else
{
kni_filestate2_set(thread_seq,COLUME_GET_LQUEUE_SUCC,0,1);
tun_write_data(g_kni_comminfo.fd_tun[thread_seq],datainfo.buf,datainfo.buflen,thread_seq);
free(datainfo.buf);
datainfo.buf = NULL;
}
//end
//read from tun
clock_gettime(CLOCK_MONOTONIC, &start); clock_gettime(CLOCK_MONOTONIC, &start);
recv_len=0; recv_len=0;
memset(recv_buf,0,KNI_MAX_BUFLEN); memset(recv_buf,0,KNI_MAX_BUFLEN);
@@ -997,6 +1091,7 @@ void* kni_read_tun(void* arg)
FS_operate(g_kni_fs2_info.handler, g_kni_fs2_info.metric_forward, 0, FS_OP_SET, elapse); FS_operate(g_kni_fs2_info.handler, g_kni_fs2_info.metric_forward, 0, FS_OP_SET, elapse);
} }
//end
} }
return 0; return 0;
@@ -1360,7 +1455,7 @@ int tcp_repair_process(const struct streaminfo* pstream,const void* a_packet,str
datainfo.keyring = pmeinfo->keyring_id; datainfo.keyring = pmeinfo->keyring_id;
ret=MESA_lqueue_join_head(g_kni_structinfo.lqueue_for_domain,(void*)&datainfo,sizeof(datainfo)); ret=MESA_lqueue_join_head(g_kni_structinfo.lqueue_send_fds,(void*)&datainfo,sizeof(datainfo));
if(ret <0) if(ret <0)
{ {
kni_filestate2_set(pstream->threadnum,FS2_COLUME_TCPREPAIR_ERROR,0,2); kni_filestate2_set(pstream->threadnum,FS2_COLUME_TCPREPAIR_ERROR,0,2);

View File

@@ -5,7 +5,8 @@
#define KNI_SENDFD_NUM 2 #define KNI_SENDFD_NUM 2
int kni_send_fds(int socket, int *fds, int n,int protocol); int kni_send_fds(int socket, int *fds, int n,int protocol);
char tun_write_data(int fd,char* send_buf,int send_buflen,struct streaminfo* pstream,int thread_seq); //char tun_write_data(int fd,char* send_buf,int send_buflen,struct streaminfo* pstream,int thread_seq);
char tun_write_data(int fd,char* send_buf,int send_buflen,int thread_seq);
int init_domain_fd(); int init_domain_fd();
@@ -13,6 +14,7 @@ int init_kni_domain();
int init_kni_tun(); int init_kni_tun();
void* kni_read_tun(void* arg); void* kni_read_tun(void* arg);
int kni_add_lqueue(int addrtype,int thread_seq,char* send_buf,int send_buflen);
int tcp_repair_process(const struct streaminfo* pstream,const void* a_packet,struct kni_pme_info* pmeinfo,int protocol); int tcp_repair_process(const struct streaminfo* pstream,const void* a_packet,struct kni_pme_info* pmeinfo,int protocol);

View File

@@ -10,7 +10,7 @@
int g_kni_version_VERSION_20181122_htable_add; int g_kni_version_VERSION_20181123_test;
struct kni_var_comm g_kni_comminfo; struct kni_var_comm g_kni_comminfo;
struct kni_var_struct g_kni_structinfo; struct kni_var_struct g_kni_structinfo;
@@ -548,7 +548,8 @@ char kni_pending_opstate(const struct streaminfo* pstream,struct kni_pme_info* p
ret=kni_first_tcpdata(pstream,a_packet,pmeinfo,data,datalen); ret=kni_first_tcpdata(pstream,a_packet,pmeinfo,data,datalen);
if((pmeinfo->protocol==KNI_FLAG_HTTP) ||(pmeinfo->protocol==KNI_FLAG_SSL)) if((pmeinfo->protocol==KNI_FLAG_HTTP) ||(pmeinfo->protocol==KNI_FLAG_SSL))
{ {
ret=tun_write_data(g_kni_comminfo.fd_tun[thread_seq],(char*)ipv4_hdr,iplen,(struct streaminfo*)pstream,thread_seq); kni_add_lqueue(ADDR_TYPE_IPV4,thread_seq,(char*)ipv4_hdr,iplen);
// ret=tun_write_data(g_kni_comminfo.fd_tun[thread_seq],(char*)ipv4_hdr,iplen,(struct streaminfo*)pstream,thread_seq);
} }
} }
#ifndef KNI_DEBUG_TCPREPAIR #ifndef KNI_DEBUG_TCPREPAIR
@@ -632,7 +633,8 @@ char kni_data_opstate(const struct streaminfo* pstream,struct kni_pme_info* pmei
if((pmeinfo->action == KNI_ACTION_MONITOR) && ((pmeinfo->protocol==KNI_FLAG_HTTP)||(pmeinfo->protocol==KNI_FLAG_SSL))) if((pmeinfo->action == KNI_ACTION_MONITOR) && ((pmeinfo->protocol==KNI_FLAG_HTTP)||(pmeinfo->protocol==KNI_FLAG_SSL)))
{ {
ret=tun_write_data(g_kni_comminfo.fd_tun[thread_seq],(char*)a_packet,iplen,(struct streaminfo*)pstream,thread_seq); kni_add_lqueue(ADDR_TYPE_IPV4,thread_seq,(char*)a_packet,iplen);
// ret=tun_write_data(g_kni_comminfo.fd_tun[thread_seq],(char*)a_packet,iplen,(struct streaminfo*)pstream,thread_seq);
} }
else if(pmeinfo->action == KNI_ACTION_RATELIMIT) else if(pmeinfo->action == KNI_ACTION_RATELIMIT)
{ {
@@ -1279,6 +1281,31 @@ int init_kni_sendpkt()
} }
int kni_init_lqueue()
{
int i=0;
g_kni_structinfo.lqueue_send_fds=MESA_lqueue_create(KNI_THREAD_SAFE,KNI_LQUEUE_MAXNUM);
if(g_kni_structinfo.lqueue_send_fds==NULL)
{
MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_INIT,"MESA_lqueue_create() error for lqueue_send_fds,action:%s",KNI_ACTION_EXIT);
return -1;
}
for(i=0;i<g_iThreadNum;i++)
{
g_kni_structinfo.lqueue_write_tun[i] = MESA_lqueue_create(0,KNI_LQUEUE_MAXNUM);
if(g_kni_structinfo.lqueue_write_tun[i] == NULL)
{
MESA_handle_runtime_log(g_kni_comminfo.logger, RLOG_LV_FATAL,KNI_MODULE_INIT,"MESA_lqueue_create() error for lqueue_write_tun,thread_num:%d,action:%s",i,KNI_ACTION_EXIT);
return -1;
}
}
return 0;
}
extern "C" char kni_init() extern "C" char kni_init()
{ {
int ret=0; int ret=0;
@@ -1320,12 +1347,13 @@ extern "C" char kni_init()
return -1; return -1;
} }
g_kni_structinfo.lqueue_for_domain=MESA_lqueue_create(KNI_THREAD_SAFE,KNI_LQUEUE_MAXNUM);
if(g_kni_structinfo.lqueue_for_domain==NULL) ret = kni_init_lqueue();
if(ret<0)
{ {
printf("MESA_lqueue_create() error!\n");
return -1; return -1;
} }
ret = init_kni_tun(); ret = init_kni_tun();

View File

@@ -220,6 +220,13 @@ struct kni_lqueue_datainfo
int keyring; int keyring;
}; };
struct kni_lqueue_writetun
{
int addr_type;
int buflen;
char* buf;
};
enum kni_flag enum kni_flag
@@ -298,7 +305,8 @@ struct kni_var_struct
MESA_htable_handle htable_to_tun_v4; MESA_htable_handle htable_to_tun_v4;
MESA_htable_handle htable_to_tun_v6; MESA_htable_handle htable_to_tun_v6;
MESA_htable_handle htable_to_io_v6; MESA_htable_handle htable_to_io_v6;
MESA_lqueue_head lqueue_for_domain; MESA_lqueue_head lqueue_send_fds;
MESA_lqueue_head lqueue_write_tun[KNI_MAX_THREADNUM];
}; };
//maat //maat