修复tfe保活功能相关的bug

* 修改thread_tfe_keepalive函数recv失败的处理逻辑
  * bind函数放到初始化线程,防止bind失败仍然继续运行
  * 增加SO_REUSEADDR选项保证SAPP挂掉后在TIME_WAIT状态下能够快速恢复
  * 增加tfe状态的fs2统计
This commit is contained in:
崔一鸣
2019-06-18 17:37:43 +08:00
parent 0da2c833fe
commit 7e3ae597c4
4 changed files with 119 additions and 61 deletions

View File

@@ -97,6 +97,7 @@ enum kni_field{
KNI_FIELD_KEEPALIVE_REPLAY_ADD_SUCC, KNI_FIELD_KEEPALIVE_REPLAY_ADD_SUCC,
KNI_FIELD_KEEPALIVE_REPLAY_ADD_FAIL, KNI_FIELD_KEEPALIVE_REPLAY_ADD_FAIL,
KNI_FIELD_EXCEED_MTU, KNI_FIELD_EXCEED_MTU,
KNI_FIELD_TFE_STATUS_BASE,
}; };
struct kni_field_stat_handle{ struct kni_field_stat_handle{

View File

@@ -29,16 +29,19 @@ src_mac_addr = 00:0e:c6:d6:72:c1
[tfe0] [tfe0]
mac_addr = fe:65:b7:03:50:bd mac_addr = fe:65:b7:03:50:bd
dev_eth_symbol = ens1f5 dev_eth_symbol = ens1f5
ip_addr =
keepalive_listen_port = 2476 keepalive_listen_port = 2476
[tfe1] [tfe1]
mac_addr = fe:65:b7:03:50:bd mac_addr = fe:65:b7:03:50:bd
dev_eth_symbol = eth8 dev_eth_symbol = eth8
ip_addr =
keepalive_listen_port = 2477 keepalive_listen_port = 2477
[tfe2] [tfe2]
mac_addr = fe:65:b7:03:50:bd mac_addr = fe:65:b7:03:50:bd
dev_eth_symbol = eth9 dev_eth_symbol = eth9
ip_addr =
keepalive_listen_port = 2478 keepalive_listen_port = 2478
[field_stat] [field_stat]
@@ -79,5 +82,7 @@ mho_eliminate_type = LRU
[tfe_mgr] [tfe_mgr]
keepalive_switch = 1 keepalive_switch = 1
keepalive_idle = 2
keepalive_intvl = 1
keepalive_cnt = 3
keepalive_listen_eth = keepalive_listen_eth =

View File

@@ -1632,6 +1632,11 @@ static struct kni_field_stat_handle * fs_init(const char *profile){
fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "kaReplay_add_fail"); fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "kaReplay_add_fail");
fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "kaReplay_add_succ"); fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "kaReplay_add_succ");
fs_handle->fields[KNI_FIELD_EXCEED_MTU] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "exceed_mtu"); fs_handle->fields[KNI_FIELD_EXCEED_MTU] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "exceed_mtu");
for(int i = 0; i < g_kni_handle->tfe_count; i++){
char tfe_status[KNI_SYMBOL_MAX] = "";
snprintf(tfe_status, sizeof(tfe_status), "tfe%d", i);
fs_handle->fields[KNI_FIELD_TFE_STATUS_BASE + i] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, tfe_status);
}
fs_handle->handle = handle; fs_handle->handle = handle;
FS_start(handle); FS_start(handle);
return fs_handle; return fs_handle;

View File

@@ -5,6 +5,8 @@
#include "kni_utils.h" #include "kni_utils.h"
#include "tfe_mgr.h" #include "tfe_mgr.h"
extern struct kni_field_stat_handle *g_kni_fs_handle;
struct tfe_mgr{ struct tfe_mgr{
pthread_rwlock_t rwlock; pthread_rwlock_t rwlock;
int tfe_alive_node_list[TFE_COUNT_MAX]; int tfe_alive_node_list[TFE_COUNT_MAX];
@@ -16,11 +18,10 @@ struct tfe_mgr{
struct thread_tfe_keepalive_args{ struct thread_tfe_keepalive_args{
struct tfe_mgr* mgr; struct tfe_mgr* mgr;
int sockfd;
int keepalive_idle; int keepalive_idle;
int keepalive_intvl; int keepalive_intvl;
int keepalive_cnt; int keepalive_cnt;
uint32_t listen_ip;
int listen_port;
int tfe_id; int tfe_id;
uint32_t tfe_ipaddr; uint32_t tfe_ipaddr;
}; };
@@ -59,7 +60,9 @@ static int tfe_mgr_alive_node_del(struct tfe_mgr *mgr, int tfe_id){
for(int j = i; j < mgr->tfe_alive_node_count - 1; j++){ for(int j = i; j < mgr->tfe_alive_node_count - 1; j++){
mgr->tfe_alive_node_list[j] = mgr->tfe_alive_node_list[j + 1]; mgr->tfe_alive_node_list[j] = mgr->tfe_alive_node_list[j + 1];
} }
mgr->tfe_alive_node_count--;
ret = 0; ret = 0;
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TFE_STATUS_BASE + tfe_id], 0, FS_OP_SET, 0);
goto out; goto out;
out: out:
@@ -83,6 +86,7 @@ static int tfe_mgr_alive_node_add(struct tfe_mgr *mgr, int tfe_id){
mgr->tfe_alive_node_list[mgr->tfe_alive_node_count] = tfe_id; mgr->tfe_alive_node_list[mgr->tfe_alive_node_count] = tfe_id;
mgr->tfe_alive_node_count++; mgr->tfe_alive_node_count++;
ret = 0; ret = 0;
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TFE_STATUS_BASE + tfe_id], 0, FS_OP_SET, 1);
goto out; goto out;
out: out:
@@ -94,68 +98,26 @@ out:
static void* thread_tfe_keepalive(void *args){ static void* thread_tfe_keepalive(void *args){
struct thread_tfe_keepalive_args *_args = (struct thread_tfe_keepalive_args*)args; struct thread_tfe_keepalive_args *_args = (struct thread_tfe_keepalive_args*)args;
struct tfe_mgr *mgr = _args->mgr; struct tfe_mgr *mgr = _args->mgr;
int sockfd = _args->sockfd;
int keepalive_idle = _args->keepalive_idle; int keepalive_idle = _args->keepalive_idle;
int keepalive_intvl = _args->keepalive_intvl; int keepalive_intvl = _args->keepalive_intvl;
int keepalive_cnt = _args->keepalive_cnt; int keepalive_cnt = _args->keepalive_cnt;
uint32_t listen_ip = _args->listen_ip;
int listen_port = _args->listen_port;
int tfe_id = _args->tfe_id; int tfe_id = _args->tfe_id;
uint32_t tfe_ipaddr = _args->tfe_ipaddr; uint32_t tfe_ipaddr = _args->tfe_ipaddr;
void *logger = mgr->logger; void *logger = mgr->logger;
FREE(&args); FREE(&args);
//create socket
struct sockaddr_in server_addr, client_addr;
socklen_t client_addr_len;
int ret;
int client_fd;
uint32_t client_ipaddr;
char client_ipaddr_str[INET_ADDRSTRLEN] = "";
char *errmsg = NULL;
char buff[BUFF_SIZE_MAX];
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd < 0){
KNI_LOG_ERROR(logger, "Failed at create tcp socket, errno is %d, errmsg is %s", errno, strerror(errno));
goto error_out;
}
//set socketopt
ret = setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (void *)&keepalive_idle, sizeof(keepalive_idle));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPIDLE, errno is %d, errmsg is %s", errno, strerror(errno));
goto error_out;
}
ret = setsockopt(sockfd, SOL_TCP, TCP_KEEPINTVL, (void *)&keepalive_intvl, sizeof(keepalive_intvl));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPINTVL, errno is %d, errmsg is %s", errno, strerror(errno));
goto error_out;
}
ret = setsockopt(sockfd, SOL_TCP, TCP_KEEPCNT, (void *)&keepalive_cnt, sizeof(keepalive_cnt));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPCNT, errno is %d, errmsg is %s", errno, strerror(errno));
goto error_out;
}
//bind
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET; // IPv4
server_addr.sin_addr.s_addr = listen_ip;
server_addr.sin_port = htons(listen_port);
ret = bind(sockfd, (const struct sockaddr *)&server_addr, sizeof(server_addr));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at bind tcp socket, errno is %d, errmsg is %s", errno, strerror(errno));
goto error_out;
}
//listen
ret = listen(sockfd, 5);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at listen tcp socket, errno is %d, errmsg is %s", errno, strerror(errno));
goto error_out;
}
//accept //accept
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
uint32_t client_ipaddr;
char client_ipaddr_str[INET_ADDRSTRLEN] = "";
int flags, ret, client_fd;
char buff[BUFF_SIZE_MAX];
char *errmsg = NULL;
while(true){ while(true){
client_fd = accept(sockfd, (struct sockaddr *)&client_addr, &client_addr_len); client_fd = accept(sockfd, (struct sockaddr *)&client_addr, &client_addr_len);
if(client_fd < 0){ if(client_fd < 0){
KNI_LOG_ERROR(logger, "Failed at accept, errno is %d, errmsg is %s", errno, strerror(errno)); KNI_LOG_ERROR(logger, "Failed at accept, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id);
continue; continue;
} }
client_ipaddr = client_addr.sin_addr.s_addr; client_ipaddr = client_addr.sin_addr.s_addr;
@@ -165,19 +127,50 @@ static void* thread_tfe_keepalive(void *args){
close(client_fd); close(client_fd);
continue; continue;
} }
//set socketopt keepalive
flags = 1;
ret = setsockopt(client_fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at set socketopt SO_KEEPALIVE, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id);
}
ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPIDLE, (void *)&keepalive_idle, sizeof(keepalive_idle));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPIDLE, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id);
}
ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPINTVL, (void *)&keepalive_intvl, sizeof(keepalive_intvl));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPINTVL, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id);
}
ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPCNT, (void *)&keepalive_cnt, sizeof(keepalive_cnt));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPCNT, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id);
}
//succeed accpt: add alive node //succeed accpt: add alive node
ret = tfe_mgr_alive_node_add(mgr, tfe_id); ret = tfe_mgr_alive_node_add(mgr, tfe_id);
if(ret < 0){ if(ret < 0){
errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret); errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret);
KNI_LOG_ERROR(logger, "Failed at add alive tfe node, tfe_id is %d, errmsg is %s", tfe_id, errmsg); KNI_LOG_ERROR(logger, "Failed at add tfe alive node, tfe_id is %d, errmsg is %s", tfe_id, errmsg);
}
else{
KNI_LOG_ERROR(logger, "Succeed at add tfe alive node, tfe_id is %d", tfe_id);
} }
while(true){ while(true){
ret = recv(client_fd, buff, sizeof(buff), 0); ret = recv(client_fd, buff, sizeof(buff), 0);
if(ret < 0){ if(ret == 0){
//recv fin/rst KNI_LOG_ERROR(logger, "recv fin, del tfe alive node, tfe_id is %d", tfe_id);
if(errno == ECONNREFUSED){ break;
}
if(ret <= 0){
if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK){
KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, continue recv, tfe_id is %d", errno, strerror(errno), tfe_id);
continue;
}
if(errno == ETIMEDOUT){
KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, del tfe alive node, tfe_id is %d", errno, strerror(errno), tfe_id);
break; break;
} }
KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, error_out, tfe_id is %d", errno, strerror(errno), tfe_id);
goto error_out;
} }
} }
//recv fin: del alive node //recv fin: del alive node
@@ -185,7 +178,10 @@ static void* thread_tfe_keepalive(void *args){
ret = tfe_mgr_alive_node_del(mgr, tfe_id); ret = tfe_mgr_alive_node_del(mgr, tfe_id);
if(ret < 0){ if(ret < 0){
errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret); errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret);
KNI_LOG_ERROR(logger, "Failed at del alive tfe node, tfe_id is %d, errmsg is %s", tfe_id, errmsg); KNI_LOG_ERROR(logger, "Failed at del tfe alive node, tfe_id is %d, errmsg is %s", tfe_id, errmsg);
}
else{
KNI_LOG_ERROR(logger, "Succeed at del tfe alive node, tfe_id is %d", tfe_id);
} }
} }
return NULL; return NULL;
@@ -194,6 +190,10 @@ error_out:
if(sockfd > 0){ if(sockfd > 0){
close(sockfd); close(sockfd);
} }
if(client_fd > 0){
close(client_fd);
}
KNI_LOG_ERROR(logger, "thread_tfe_keepalive exited, tfe_id is %d", tfe_id);
return NULL; return NULL;
} }
@@ -203,6 +203,49 @@ void tfe_mgr_destroy(struct tfe_mgr* mgr){
FREE(&mgr); FREE(&mgr);
} }
} }
//Create a TCP socket, bind it to listen_ip:listen_port and put it in listen state.
//  tfe_id      : tfe node index, used only to tag log messages
//  listen_ip   : IPv4 address stored unchanged into sin_addr.s_addr
//                (presumably already in network byte order - confirm with caller)
//  listen_port : TCP port in host byte order, converted with htons() here
//  logger      : handle handed to KNI_LOG_ERROR
//Returns the listening socket fd on success, -1 on failure (the socket is closed first).
static int get_binded_sockfd(int tfe_id, uint32_t listen_ip, uint16_t listen_port, void *logger){
    struct sockaddr_in server_addr;
    int ret;
    int flag = 1;
    //create socket
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0){
        KNI_LOG_ERROR(logger, "Failed at create tcp socket, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id);
        goto error_out;
    }
    //SO_REUSEADDR is best effort: a failure is logged and bind is still attempted
    ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flag, sizeof(flag));
    if(ret < 0){
        KNI_LOG_ERROR(logger, "Failed at set socketopt SO_REUSEADDR, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id);
    }
    //bind
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET; // IPv4
    server_addr.sin_addr.s_addr = listen_ip;
    server_addr.sin_port = htons(listen_port);
    ret = bind(sockfd, (const struct sockaddr *)&server_addr, sizeof(server_addr));
    if(ret < 0){
        KNI_LOG_ERROR(logger, "Failed at bind tcp socket, port is %d, errno is %d, errmsg is %s, tfe_id is %d",
            listen_port, errno, strerror(errno), tfe_id);
        goto error_out;
    }
    //listen
    ret = listen(sockfd, 5);
    if(ret < 0){
        KNI_LOG_ERROR(logger, "Failed at listen tcp socket, errno is %d, errmsg is %s, tfe_id is %d, listen_port is %d",
            errno, strerror(errno), tfe_id, listen_port);
        goto error_out;
    }
    return sockfd;

error_out:
    //0 is a valid descriptor; only a negative value means there is nothing to close
    if(sockfd >= 0){
        close(sockfd);
    }
    return -1;
}
struct tfe_mgr* tfe_mgr_init(int tfe_node_count, const char* profile, void *logger){ struct tfe_mgr* tfe_mgr_init(int tfe_node_count, const char* profile, void *logger){
struct tfe_mgr* mgr = ALLOC(struct tfe_mgr, 1); struct tfe_mgr* mgr = ALLOC(struct tfe_mgr, 1);
mgr->logger = logger; mgr->logger = logger;
@@ -263,14 +306,18 @@ struct tfe_mgr* tfe_mgr_init(int tfe_node_count, const char* profile, void *logg
ret, errno, strerror(errno), i, tfe_ipaddr_str); ret, errno, strerror(errno), i, tfe_ipaddr_str);
goto error_out; goto error_out;
} }
int sockfd = get_binded_sockfd(i, keepalive_listen_ip, keepalive_listen_port, logger);
if(sockfd < 0){
KNI_LOG_ERROR(logger, "Failed at get binded sockfd, tfe_id is %d", i);
goto error_out;
}
struct thread_tfe_keepalive_args *args = ALLOC(struct thread_tfe_keepalive_args, 1); struct thread_tfe_keepalive_args *args = ALLOC(struct thread_tfe_keepalive_args, 1);
args->mgr = mgr; args->mgr = mgr;
args->keepalive_idle = keepalive_idle; args->keepalive_idle = keepalive_idle;
args->sockfd = sockfd;
args->keepalive_intvl = keepalive_intvl; args->keepalive_intvl = keepalive_intvl;
args->keepalive_cnt = keepalive_cnt; args->keepalive_cnt = keepalive_cnt;
args->tfe_id = i; args->tfe_id = i;
args->listen_ip = keepalive_listen_ip;
args->listen_port = keepalive_listen_port;
args->tfe_ipaddr = tfe_ipaddr; args->tfe_ipaddr = tfe_ipaddr;
ret = pthread_create(&thread_id, NULL, thread_tfe_keepalive, (void *)args); ret = pthread_create(&thread_id, NULL, thread_tfe_keepalive, (void *)args);
if(unlikely(ret != 0)){ if(unlikely(ret != 0)){