修复接入多台tfe时的bug
* 修复keepalive_replay_switch关闭仍然加入hash表的bug * 修复多台tfe时获得tfe_id不正确的问题 * 修改tfe_mgr的线程模型,改成accept之后创建新线程 * tfe配置中增加enabled字段
This commit is contained in:
@@ -7,23 +7,35 @@
|
||||
|
||||
extern struct kni_field_stat_handle *g_kni_fs_handle;
|
||||
|
||||
struct tfe_node{
|
||||
int tfe_id;
|
||||
uint32_t ipaddr;
|
||||
};
|
||||
|
||||
struct tfe_mgr{
|
||||
pthread_rwlock_t rwlock;
|
||||
int tfe_alive_node_list[TFE_COUNT_MAX];
|
||||
struct tfe_node tfe_enabled_nodes[TFE_COUNT_MAX];
|
||||
int tfe_enabled_node_count;
|
||||
int tfe_alive_nodes[TFE_COUNT_MAX];
|
||||
int tfe_alive_node_count;
|
||||
int tfe_node_count;
|
||||
int keepalive_switch;
|
||||
int keepalive_switch;
|
||||
void *logger;
|
||||
};
|
||||
|
||||
struct thread_tfe_keepalive_args{
|
||||
struct thread_tfe_keepalive_accept_args{
|
||||
struct tfe_mgr* mgr;
|
||||
int sockfd;
|
||||
int keepalive_idle;
|
||||
int keepalive_intvl;
|
||||
int keepalive_cnt;
|
||||
void *logger;
|
||||
};
|
||||
|
||||
struct thread_tfe_keepalive_recv_args{
|
||||
struct tfe_mgr* mgr;
|
||||
int client_fd;
|
||||
int tfe_id;
|
||||
uint32_t tfe_ipaddr;
|
||||
void *logger;
|
||||
};
|
||||
|
||||
enum tfe_mgr_errno{
|
||||
@@ -49,7 +61,7 @@ static int tfe_mgr_alive_node_del(struct tfe_mgr *mgr, int tfe_id){
|
||||
pthread_rwlock_wrlock(&(mgr->rwlock));
|
||||
int i, ret;
|
||||
for(i = 0; i < mgr->tfe_alive_node_count; i++){
|
||||
if(mgr->tfe_alive_node_list[i] == tfe_id){
|
||||
if(mgr->tfe_alive_nodes[i] == tfe_id){
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -58,7 +70,7 @@ static int tfe_mgr_alive_node_del(struct tfe_mgr *mgr, int tfe_id){
|
||||
goto out;
|
||||
}
|
||||
for(int j = i; j < mgr->tfe_alive_node_count - 1; j++){
|
||||
mgr->tfe_alive_node_list[j] = mgr->tfe_alive_node_list[j + 1];
|
||||
mgr->tfe_alive_nodes[j] = mgr->tfe_alive_nodes[j + 1];
|
||||
}
|
||||
mgr->tfe_alive_node_count--;
|
||||
ret = 0;
|
||||
@@ -74,7 +86,7 @@ static int tfe_mgr_alive_node_add(struct tfe_mgr *mgr, int tfe_id){
|
||||
pthread_rwlock_wrlock(&(mgr->rwlock));
|
||||
int ret;
|
||||
for(int i = 0; i < mgr->tfe_alive_node_count; i++){
|
||||
if(mgr->tfe_alive_node_list[i] == tfe_id){
|
||||
if(mgr->tfe_alive_nodes[i] == tfe_id){
|
||||
ret = TFE_MGR_HAS_EXISTED;
|
||||
goto out;
|
||||
}
|
||||
@@ -83,7 +95,7 @@ static int tfe_mgr_alive_node_add(struct tfe_mgr *mgr, int tfe_id){
|
||||
ret = TFE_MGR_EXCEED_MAX_COUNT;
|
||||
goto out;
|
||||
}
|
||||
mgr->tfe_alive_node_list[mgr->tfe_alive_node_count] = tfe_id;
|
||||
mgr->tfe_alive_nodes[mgr->tfe_alive_node_count] = tfe_id;
|
||||
mgr->tfe_alive_node_count++;
|
||||
ret = 0;
|
||||
FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TFE_STATUS_BASE + tfe_id], 0, FS_OP_SET, 1);
|
||||
@@ -94,16 +106,61 @@ out:
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void* thread_tfe_keepalive_recv(void *args){
|
||||
struct thread_tfe_keepalive_recv_args *_args = (struct thread_tfe_keepalive_recv_args*)args;
|
||||
struct tfe_mgr *mgr = _args->mgr;
|
||||
int client_fd = _args->client_fd;
|
||||
int tfe_id = _args->tfe_id;
|
||||
void *logger = _args->logger;
|
||||
FREE(&args);
|
||||
char buff[BUFF_SIZE_MAX];
|
||||
char *errmsg = NULL;
|
||||
int ret;
|
||||
while(true){
|
||||
ret = recv(client_fd, buff, sizeof(buff), 0);
|
||||
if(ret == 0){
|
||||
KNI_LOG_ERROR(logger, "recv fin, del tfe alive node, tfe_id is %d", tfe_id);
|
||||
break;
|
||||
}
|
||||
if(ret <= 0){
|
||||
if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK){
|
||||
KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, continue recv, tfe_id is %d", errno, strerror(errno), tfe_id);
|
||||
continue;
|
||||
}
|
||||
if(errno == ETIMEDOUT){
|
||||
KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, del tfe alive node, tfe_id is %d", errno, strerror(errno), tfe_id);
|
||||
break;
|
||||
}
|
||||
KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, error_out, tfe_id is %d", errno, strerror(errno), tfe_id);
|
||||
goto error_out;
|
||||
}
|
||||
}
|
||||
//recv fin: del alive node
|
||||
close(client_fd);
|
||||
ret = tfe_mgr_alive_node_del(mgr, tfe_id);
|
||||
if(ret < 0){
|
||||
errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret);
|
||||
KNI_LOG_ERROR(logger, "Failed at del tfe alive node, tfe_id is %d, errmsg is %s", tfe_id, errmsg);
|
||||
}
|
||||
else{
|
||||
KNI_LOG_ERROR(logger, "Succeed at del tfe alive node, tfe_id is %d", tfe_id);
|
||||
}
|
||||
return NULL;
|
||||
|
||||
static void* thread_tfe_keepalive(void *args){
|
||||
struct thread_tfe_keepalive_args *_args = (struct thread_tfe_keepalive_args*)args;
|
||||
error_out:
|
||||
if(client_fd > 0){
|
||||
close(client_fd);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void* thread_tfe_keepalive_accept(void *args){
|
||||
struct thread_tfe_keepalive_accept_args *_args = (struct thread_tfe_keepalive_accept_args*)args;
|
||||
struct tfe_mgr *mgr = _args->mgr;
|
||||
int sockfd = _args->sockfd;
|
||||
int keepalive_idle = _args->keepalive_idle;
|
||||
int keepalive_intvl = _args->keepalive_intvl;
|
||||
int keepalive_cnt = _args->keepalive_cnt;
|
||||
int tfe_id = _args->tfe_id;
|
||||
uint32_t tfe_ipaddr = _args->tfe_ipaddr;
|
||||
void *logger = mgr->logger;
|
||||
FREE(&args);
|
||||
//accept
|
||||
@@ -112,7 +169,8 @@ static void* thread_tfe_keepalive(void *args){
|
||||
uint32_t client_ipaddr;
|
||||
char client_ipaddr_str[INET_ADDRSTRLEN] = "";
|
||||
int flags, ret, client_fd;
|
||||
char buff[BUFF_SIZE_MAX];
|
||||
pthread_t thread_id = -1;
|
||||
int tfe_id = -1;
|
||||
char *errmsg = NULL;
|
||||
while(true){
|
||||
client_fd = accept(sockfd, (struct sockaddr *)&client_addr, &client_addr_len);
|
||||
@@ -122,8 +180,14 @@ static void* thread_tfe_keepalive(void *args){
|
||||
}
|
||||
client_ipaddr = client_addr.sin_addr.s_addr;
|
||||
inet_ntop(AF_INET, &client_ipaddr, client_ipaddr_str, INET_ADDRSTRLEN);
|
||||
if(client_ipaddr != tfe_ipaddr){
|
||||
KNI_LOG_ERROR(logger, "Receive connection not from tfe%d, client addr is %s", tfe_id, client_ipaddr);
|
||||
for(int i = 0; i < mgr->tfe_enabled_node_count; i++){
|
||||
if(client_ipaddr == mgr->tfe_enabled_nodes[i].ipaddr){
|
||||
tfe_id = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(tfe_id == -1){
|
||||
KNI_LOG_ERROR(logger, "Receive connection not from tfe, client addr is %s", client_ipaddr_str);
|
||||
close(client_fd);
|
||||
continue;
|
||||
}
|
||||
@@ -132,69 +196,54 @@ static void* thread_tfe_keepalive(void *args){
|
||||
ret = setsockopt(client_fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at set socketopt SO_KEEPALIVE, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id);
|
||||
close(client_fd);
|
||||
continue;
|
||||
}
|
||||
ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPIDLE, (void *)&keepalive_idle, sizeof(keepalive_idle));
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPIDLE, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id);
|
||||
close(client_fd);
|
||||
continue;
|
||||
}
|
||||
ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPINTVL, (void *)&keepalive_intvl, sizeof(keepalive_intvl));
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPINTVL, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id);
|
||||
close(client_fd);
|
||||
continue;
|
||||
}
|
||||
ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPCNT, (void *)&keepalive_cnt, sizeof(keepalive_cnt));
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPCNT, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id);
|
||||
close(client_fd);
|
||||
continue;
|
||||
}
|
||||
//succeed accpt: add alive node
|
||||
//add alive node
|
||||
ret = tfe_mgr_alive_node_add(mgr, tfe_id);
|
||||
if(ret < 0){
|
||||
errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret);
|
||||
KNI_LOG_ERROR(logger, "Failed at add tfe alive node, tfe_id is %d, errmsg is %s", tfe_id, errmsg);
|
||||
close(client_fd);
|
||||
continue;
|
||||
}
|
||||
else{
|
||||
KNI_LOG_ERROR(logger, "Succeed at add tfe alive node, tfe_id is %d", tfe_id);
|
||||
}
|
||||
while(true){
|
||||
ret = recv(client_fd, buff, sizeof(buff), 0);
|
||||
if(ret == 0){
|
||||
KNI_LOG_ERROR(logger, "recv fin, del tfe alive node, tfe_id is %d", tfe_id);
|
||||
break;
|
||||
}
|
||||
if(ret <= 0){
|
||||
if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK){
|
||||
KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, continue recv, tfe_id is %d", errno, strerror(errno), tfe_id);
|
||||
continue;
|
||||
}
|
||||
if(errno == ETIMEDOUT){
|
||||
KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, del tfe alive node, tfe_id is %d", errno, strerror(errno), tfe_id);
|
||||
break;
|
||||
}
|
||||
KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, error_out, tfe_id is %d", errno, strerror(errno), tfe_id);
|
||||
goto error_out;
|
||||
}
|
||||
}
|
||||
//recv fin: del alive node
|
||||
close(client_fd);
|
||||
ret = tfe_mgr_alive_node_del(mgr, tfe_id);
|
||||
if(ret < 0){
|
||||
errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret);
|
||||
KNI_LOG_ERROR(logger, "Failed at del tfe alive node, tfe_id is %d, errmsg is %s", tfe_id, errmsg);
|
||||
}
|
||||
else{
|
||||
KNI_LOG_ERROR(logger, "Succeed at del tfe alive node, tfe_id is %d", tfe_id);
|
||||
}
|
||||
//create thread_tfe_keepalive_recv
|
||||
struct thread_tfe_keepalive_recv_args *recv_args = ALLOC(struct thread_tfe_keepalive_recv_args, 1);
|
||||
recv_args->mgr = mgr;
|
||||
recv_args->client_fd = client_fd;
|
||||
recv_args->tfe_id = tfe_id;
|
||||
recv_args->logger = logger;
|
||||
ret = pthread_create(&thread_id, NULL, thread_tfe_keepalive_recv, (void *)recv_args);
|
||||
if(unlikely(ret != 0)){
|
||||
KNI_LOG_ERROR(logger, "Failed at thread_tfe_keepalive_recv, thread_func is thread_tfe_keepalive_recv, errno is %d, errmsg is %s", errno, strerror(errno));
|
||||
FREE(&recv_args);
|
||||
close(client_fd);
|
||||
tfe_mgr_alive_node_del(mgr, tfe_id);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
|
||||
error_out:
|
||||
if(sockfd > 0){
|
||||
close(sockfd);
|
||||
}
|
||||
if(client_fd > 0){
|
||||
close(client_fd);
|
||||
}
|
||||
KNI_LOG_ERROR(logger, "thread_tfe_keepalive exited, tfe_id is %d", tfe_id);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void tfe_mgr_destroy(struct tfe_mgr* mgr){
|
||||
@@ -204,20 +253,20 @@ void tfe_mgr_destroy(struct tfe_mgr* mgr){
|
||||
}
|
||||
}
|
||||
|
||||
static int get_binded_sockfd(int tfe_id, uint32_t listen_ip, uint16_t listen_port, void *logger){
|
||||
static int get_binded_sockfd(uint32_t listen_ip, uint16_t listen_port, void *logger){
|
||||
//create socket
|
||||
struct sockaddr_in server_addr;
|
||||
int ret;
|
||||
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
int flag;
|
||||
if(sockfd < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at create tcp socket, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id);
|
||||
KNI_LOG_ERROR(logger, "Failed at create tcp socket, errno is %d, errmsg is %s", errno, strerror(errno));
|
||||
goto error_out;
|
||||
}
|
||||
flag = 1;
|
||||
ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flag, sizeof(flag));
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at set socketopt SO_REUSEADDR, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id);
|
||||
KNI_LOG_ERROR(logger, "Failed at set socketopt SO_REUSEADDR, errno is %d, errmsg is %s", errno, strerror(errno));
|
||||
}
|
||||
//bind
|
||||
memset(&server_addr, 0, sizeof(server_addr));
|
||||
@@ -226,15 +275,15 @@ static int get_binded_sockfd(int tfe_id, uint32_t listen_ip, uint16_t listen_por
|
||||
server_addr.sin_port = htons(listen_port);
|
||||
ret = bind(sockfd, (const struct sockaddr *)&server_addr, sizeof(server_addr));
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at bind tcp socket, port is %d, errno is %d, errmsg is %s, tfe_id is %d",
|
||||
listen_port, errno, strerror(errno), tfe_id);
|
||||
KNI_LOG_ERROR(logger, "Failed at bind tcp socket, port is %d, errno is %d, errmsg is %s",
|
||||
listen_port, errno, strerror(errno));
|
||||
goto error_out;
|
||||
}
|
||||
//listen
|
||||
ret = listen(sockfd, 5);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at listen tcp socket, errno is %d, errmsg is %s, tfe_id is %d, listen_port is %d",
|
||||
errno, strerror(errno), tfe_id, listen_port);
|
||||
KNI_LOG_ERROR(logger, "Failed at listen tcp socket, errno is %d, errmsg is %s, listen_port is %d",
|
||||
errno, strerror(errno), listen_port);
|
||||
goto error_out;
|
||||
}
|
||||
return sockfd;
|
||||
@@ -249,7 +298,6 @@ error_out:
|
||||
struct tfe_mgr* tfe_mgr_init(int tfe_node_count, const char* profile, void *logger){
|
||||
struct tfe_mgr* mgr = ALLOC(struct tfe_mgr, 1);
|
||||
mgr->logger = logger;
|
||||
mgr->tfe_node_count = tfe_node_count;
|
||||
int ret;
|
||||
//load keepalive conf
|
||||
char section[KNI_SYMBOL_MAX] = "tfe_mgr";
|
||||
@@ -262,9 +310,13 @@ struct tfe_mgr* tfe_mgr_init(int tfe_node_count, const char* profile, void *logg
|
||||
char keepalive_listen_eth[KNI_SYMBOL_MAX] = "";
|
||||
uint32_t keepalive_listen_ip;
|
||||
int keepalive_listen_port;
|
||||
uint32_t tfe_ipaddr;
|
||||
uint32_t tfe_node_ipaddr;
|
||||
char tfe_ipaddr_str[INET_ADDRSTRLEN];
|
||||
pthread_t thread_id = -1;
|
||||
int sockfd;
|
||||
int tfe_node_enabled;
|
||||
int j;
|
||||
struct thread_tfe_keepalive_accept_args *args = NULL;
|
||||
MESA_load_profile_int_def(profile, section, "keepalive_idle", &keepalive_idle, 2);
|
||||
MESA_load_profile_int_def(profile, section, "keepalive_intvl", &keepalive_intvl, 1);
|
||||
MESA_load_profile_int_def(profile, section, "keepalive_cnt", &keepalive_cnt, 3);
|
||||
@@ -273,58 +325,68 @@ struct tfe_mgr* tfe_mgr_init(int tfe_node_count, const char* profile, void *logg
|
||||
KNI_LOG_ERROR(logger, "MESA_prof_load: keepalive_listen_eth not set, profile is %s, section is %s", profile, section);
|
||||
goto error_out;
|
||||
}
|
||||
KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n keepalive_idle: %d\n keepalive_intvl: %d\n keepalive_cnt: %d\n keepalive_listen_eth: %s",
|
||||
section, keepalive_idle, keepalive_intvl, keepalive_cnt, keepalive_listen_eth);
|
||||
ret = MESA_load_profile_int_nodef(profile, section, "keepalive_listen_port", &keepalive_listen_port);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "MESA_prof_load: keepalive_listen_port not set, profile is %s, section is %s", profile, section);
|
||||
goto error_out;
|
||||
}
|
||||
KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n keepalive_idle: %d\n keepalive_intvl: %d\n keepalive_cnt: %d\n keepalive_listen_eth: %s\n keepalive_listen_port: %d",
|
||||
section, keepalive_idle, keepalive_intvl, keepalive_cnt, keepalive_listen_eth, keepalive_listen_port);
|
||||
ret = kni_ipv4_addr_get_by_eth(keepalive_listen_eth, &keepalive_listen_ip);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at get bind ipv4 addr, eth is %s", keepalive_listen_eth);
|
||||
goto error_out;
|
||||
}
|
||||
//rw_lock
|
||||
ret = pthread_rwlock_init(&(mgr->rwlock), NULL);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at pthread_rwlock_init, errno is %d, errmsg is %s", errno, strerror(errno));
|
||||
goto error_out;
|
||||
}
|
||||
//create tfe_keepalive thread
|
||||
//load tfe_ipaddr
|
||||
j = 0;
|
||||
for(int i = 0; i < tfe_node_count; i++){
|
||||
snprintf(section, sizeof(section), "tfe%d", i);
|
||||
ret = MESA_load_profile_int_nodef(profile, section, "keepalive_listen_port", &keepalive_listen_port);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "MESA_prof_load: keepalive_listen_port not set, profile is %s, section is %s", profile, section);
|
||||
goto error_out;
|
||||
}
|
||||
MESA_load_profile_int_def(profile, section, "enabled", &tfe_node_enabled, 1);
|
||||
if(tfe_node_enabled != 1){
|
||||
continue;
|
||||
}
|
||||
ret = MESA_load_profile_string_nodef(profile, section, "ip_addr", tfe_ipaddr_str, sizeof(tfe_ipaddr_str));
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "MESA_prof_load: ip_addr not set, profile is %s, section is %s", profile, section);
|
||||
goto error_out;
|
||||
}
|
||||
KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n keepalive_listen_port: %d\n ip_addr: %s", section, keepalive_listen_port, tfe_ipaddr_str);
|
||||
ret = inet_pton(AF_INET, tfe_ipaddr_str, &tfe_ipaddr);
|
||||
KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n ip_addr: %s", section, tfe_ipaddr_str);
|
||||
ret = inet_pton(AF_INET, tfe_ipaddr_str, &tfe_node_ipaddr);
|
||||
if(ret != 1){
|
||||
KNI_LOG_ERROR(logger, "Failed at inet_pton, ret is %d, errno is %d, errmsg is %s, tfe_id is %d, ip_addr is %s",
|
||||
ret, errno, strerror(errno), i, tfe_ipaddr_str);
|
||||
goto error_out;
|
||||
}
|
||||
int sockfd = get_binded_sockfd(i, keepalive_listen_ip, keepalive_listen_port, logger);
|
||||
if(sockfd < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at get binded sockfd, tfe_id is %d", i);
|
||||
goto error_out;
|
||||
}
|
||||
struct thread_tfe_keepalive_args *args = ALLOC(struct thread_tfe_keepalive_args, 1);
|
||||
args->mgr = mgr;
|
||||
args->keepalive_idle = keepalive_idle;
|
||||
args->sockfd = sockfd;
|
||||
args->keepalive_intvl = keepalive_intvl;
|
||||
args->keepalive_cnt = keepalive_cnt;
|
||||
args->tfe_id = i;
|
||||
args->tfe_ipaddr = tfe_ipaddr;
|
||||
ret = pthread_create(&thread_id, NULL, thread_tfe_keepalive, (void *)args);
|
||||
if(unlikely(ret != 0)){
|
||||
KNI_LOG_ERROR(logger, "Failed at pthread_create, thread_func is thread_tfe_keepalive, errno is %d, errmsg is %s", errno, strerror(errno));
|
||||
FREE(&args);
|
||||
goto error_out;
|
||||
}
|
||||
mgr->tfe_enabled_nodes[j].tfe_id = i;
|
||||
mgr->tfe_enabled_nodes[j].ipaddr = tfe_node_ipaddr;
|
||||
j++;
|
||||
}
|
||||
mgr->tfe_enabled_node_count = j;
|
||||
//init rw_lock
|
||||
ret = pthread_rwlock_init(&(mgr->rwlock), NULL);
|
||||
if(ret < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at pthread_rwlock_init, errno is %d, errmsg is %s", errno, strerror(errno));
|
||||
goto error_out;
|
||||
}
|
||||
//bind and listen
|
||||
sockfd = get_binded_sockfd(keepalive_listen_ip, keepalive_listen_port, logger);
|
||||
if(sockfd < 0){
|
||||
KNI_LOG_ERROR(logger, "Failed at get binded sockfd");
|
||||
goto error_out;
|
||||
}
|
||||
//create tfe_keepalive_accept thread
|
||||
args = ALLOC(struct thread_tfe_keepalive_accept_args, 1);
|
||||
args->mgr = mgr;
|
||||
args->keepalive_idle = keepalive_idle;
|
||||
args->sockfd = sockfd;
|
||||
args->keepalive_intvl = keepalive_intvl;
|
||||
args->keepalive_cnt = keepalive_cnt;
|
||||
args->logger = logger;
|
||||
ret = pthread_create(&thread_id, NULL, thread_tfe_keepalive_accept, (void *)args);
|
||||
if(unlikely(ret != 0)){
|
||||
KNI_LOG_ERROR(logger, "Failed at pthread_create, thread_func is thread_tfe_keepalive_accept, errno is %d, errmsg is %s", errno, strerror(errno));
|
||||
FREE(&args);
|
||||
goto error_out;
|
||||
}
|
||||
return mgr;
|
||||
|
||||
@@ -334,13 +396,18 @@ error_out:
|
||||
}
|
||||
|
||||
int tfe_mgr_alive_node_get(struct tfe_mgr *mgr, int thread_seq){
|
||||
int tfe_id = -1;
|
||||
if(mgr->keepalive_switch == 0){
|
||||
return mgr->tfe_node_count > 0 ? thread_seq % mgr->tfe_node_count : -1;
|
||||
if(mgr->tfe_enabled_node_count > 0){
|
||||
int i = thread_seq % mgr->tfe_enabled_node_count;
|
||||
tfe_id = mgr->tfe_enabled_nodes[i].tfe_id;
|
||||
}
|
||||
return tfe_id;
|
||||
}
|
||||
pthread_rwlock_rdlock(&(mgr->rwlock));
|
||||
int tfe_id = -1;
|
||||
if(mgr->tfe_alive_node_count > 0){
|
||||
tfe_id = thread_seq % mgr->tfe_alive_node_count;
|
||||
int i = thread_seq % mgr->tfe_alive_node_count;
|
||||
tfe_id = mgr->tfe_alive_nodes[i];
|
||||
}
|
||||
pthread_rwlock_unlock(&(mgr->rwlock));
|
||||
return tfe_id;
|
||||
|
||||
Reference in New Issue
Block a user