增加和tfe之间的保活

This commit is contained in:
崔一鸣
2019-06-17 20:52:22 +08:00
parent a03bebbfb9
commit 0da2c833fe
11 changed files with 359 additions and 26 deletions

301
entry/src/tfe_mgr.cpp Normal file
View File

@@ -0,0 +1,301 @@
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include "kni_utils.h"
#include "tfe_mgr.h"
struct tfe_mgr{
pthread_rwlock_t rwlock;
int tfe_alive_node_list[TFE_COUNT_MAX];
int tfe_alive_node_count;
int tfe_node_count;
int keepalive_switch;
void *logger;
};
struct thread_tfe_keepalive_args{
struct tfe_mgr* mgr;
int keepalive_idle;
int keepalive_intvl;
int keepalive_cnt;
uint32_t listen_ip;
int listen_port;
int tfe_id;
uint32_t tfe_ipaddr;
};
enum tfe_mgr_errno{
TFE_MGR_NOT_EXISTED = -1,
TFE_MGR_HAS_EXISTED = -2,
TFE_MGR_EXCEED_MAX_COUNT = -3,
};
static char* tfe_mgr_errmsg_get(enum tfe_mgr_errno _errno){
switch(_errno){
case TFE_MGR_NOT_EXISTED:
return (char*)"tfe not existed";
case TFE_MGR_HAS_EXISTED:
return (char*)"tfe has existed";
case TFE_MGR_EXCEED_MAX_COUNT:
return (char*)"tfe exceed max count";
default:
return (char*)"unknown";
}
}
static int tfe_mgr_alive_node_del(struct tfe_mgr *mgr, int tfe_id){
pthread_rwlock_wrlock(&(mgr->rwlock));
int i, ret;
for(i = 0; i < mgr->tfe_alive_node_count; i++){
if(mgr->tfe_alive_node_list[i] == tfe_id){
break;
}
}
if(i == mgr->tfe_alive_node_count){
ret = TFE_MGR_NOT_EXISTED;
goto out;
}
for(int j = i; j < mgr->tfe_alive_node_count - 1; j++){
mgr->tfe_alive_node_list[j] = mgr->tfe_alive_node_list[j + 1];
}
ret = 0;
goto out;
out:
pthread_rwlock_unlock(&(mgr->rwlock));
return ret;
}
static int tfe_mgr_alive_node_add(struct tfe_mgr *mgr, int tfe_id){
pthread_rwlock_wrlock(&(mgr->rwlock));
int ret;
for(int i = 0; i < mgr->tfe_alive_node_count; i++){
if(mgr->tfe_alive_node_list[i] == tfe_id){
ret = TFE_MGR_HAS_EXISTED;
goto out;
}
}
if(mgr->tfe_alive_node_count == TFE_COUNT_MAX){
ret = TFE_MGR_EXCEED_MAX_COUNT;
goto out;
}
mgr->tfe_alive_node_list[mgr->tfe_alive_node_count] = tfe_id;
mgr->tfe_alive_node_count++;
ret = 0;
goto out;
out:
pthread_rwlock_unlock(&(mgr->rwlock));
return ret;
}
static void* thread_tfe_keepalive(void *args){
struct thread_tfe_keepalive_args *_args = (struct thread_tfe_keepalive_args*)args;
struct tfe_mgr *mgr = _args->mgr;
int keepalive_idle = _args->keepalive_idle;
int keepalive_intvl = _args->keepalive_intvl;
int keepalive_cnt = _args->keepalive_cnt;
uint32_t listen_ip = _args->listen_ip;
int listen_port = _args->listen_port;
int tfe_id = _args->tfe_id;
uint32_t tfe_ipaddr = _args->tfe_ipaddr;
void *logger = mgr->logger;
FREE(&args);
//create socket
struct sockaddr_in server_addr, client_addr;
socklen_t client_addr_len;
int ret;
int client_fd;
uint32_t client_ipaddr;
char client_ipaddr_str[INET_ADDRSTRLEN] = "";
char *errmsg = NULL;
char buff[BUFF_SIZE_MAX];
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd < 0){
KNI_LOG_ERROR(logger, "Failed at create tcp socket, errno is %d, errmsg is %s", errno, strerror(errno));
goto error_out;
}
//set socketopt
ret = setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (void *)&keepalive_idle, sizeof(keepalive_idle));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPIDLE, errno is %d, errmsg is %s", errno, strerror(errno));
goto error_out;
}
ret = setsockopt(sockfd, SOL_TCP, TCP_KEEPINTVL, (void *)&keepalive_intvl, sizeof(keepalive_intvl));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPINTVL, errno is %d, errmsg is %s", errno, strerror(errno));
goto error_out;
}
ret = setsockopt(sockfd, SOL_TCP, TCP_KEEPCNT, (void *)&keepalive_cnt, sizeof(keepalive_cnt));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPCNT, errno is %d, errmsg is %s", errno, strerror(errno));
goto error_out;
}
//bind
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET; // IPv4
server_addr.sin_addr.s_addr = listen_ip;
server_addr.sin_port = htons(listen_port);
ret = bind(sockfd, (const struct sockaddr *)&server_addr, sizeof(server_addr));
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at bind tcp socket, errno is %d, errmsg is %s", errno, strerror(errno));
goto error_out;
}
//listen
ret = listen(sockfd, 5);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at listen tcp socket, errno is %d, errmsg is %s", errno, strerror(errno));
goto error_out;
}
//accept
while(true){
client_fd = accept(sockfd, (struct sockaddr *)&client_addr, &client_addr_len);
if(client_fd < 0){
KNI_LOG_ERROR(logger, "Failed at accept, errno is %d, errmsg is %s", errno, strerror(errno));
continue;
}
client_ipaddr = client_addr.sin_addr.s_addr;
inet_ntop(AF_INET, &client_ipaddr, client_ipaddr_str, INET_ADDRSTRLEN);
if(client_ipaddr != tfe_ipaddr){
KNI_LOG_ERROR(logger, "Receive connection not from tfe%d, client addr is %s", tfe_id, client_ipaddr);
close(client_fd);
continue;
}
//succeed accpt: add alive node
ret = tfe_mgr_alive_node_add(mgr, tfe_id);
if(ret < 0){
errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret);
KNI_LOG_ERROR(logger, "Failed at add alive tfe node, tfe_id is %d, errmsg is %s", tfe_id, errmsg);
}
while(true){
ret = recv(client_fd, buff, sizeof(buff), 0);
if(ret < 0){
//recv fin/rst
if(errno == ECONNREFUSED){
break;
}
}
}
//recv fin: del alive node
close(client_fd);
ret = tfe_mgr_alive_node_del(mgr, tfe_id);
if(ret < 0){
errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret);
KNI_LOG_ERROR(logger, "Failed at del alive tfe node, tfe_id is %d, errmsg is %s", tfe_id, errmsg);
}
}
return NULL;
error_out:
if(sockfd > 0){
close(sockfd);
}
return NULL;
}
void tfe_mgr_destroy(struct tfe_mgr* mgr){
if(mgr != NULL){
pthread_rwlock_destroy(&(mgr->rwlock));
FREE(&mgr);
}
}
struct tfe_mgr* tfe_mgr_init(int tfe_node_count, const char* profile, void *logger){
struct tfe_mgr* mgr = ALLOC(struct tfe_mgr, 1);
mgr->logger = logger;
mgr->tfe_node_count = tfe_node_count;
int ret;
//load keepalive conf
char section[KNI_SYMBOL_MAX] = "tfe_mgr";
MESA_load_profile_int_def(profile, section, "keepalive_switch", &(mgr->keepalive_switch), 0);
KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n keepalive_switch: %d", section, mgr->keepalive_switch);
if(mgr->keepalive_switch == 0){
return mgr;
}
int keepalive_idle, keepalive_cnt, keepalive_intvl;
char keepalive_listen_eth[KNI_SYMBOL_MAX] = "";
uint32_t keepalive_listen_ip;
int keepalive_listen_port;
uint32_t tfe_ipaddr;
char tfe_ipaddr_str[INET_ADDRSTRLEN];
pthread_t thread_id = -1;
MESA_load_profile_int_def(profile, section, "keepalive_idle", &keepalive_idle, 2);
MESA_load_profile_int_def(profile, section, "keepalive_intvl", &keepalive_intvl, 1);
MESA_load_profile_int_def(profile, section, "keepalive_cnt", &keepalive_cnt, 3);
ret = MESA_load_profile_string_nodef(profile, section, "keepalive_listen_eth", keepalive_listen_eth, sizeof(keepalive_listen_eth));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: keepalive_listen_eth not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n keepalive_idle: %d\n keepalive_intvl: %d\n keepalive_cnt: %d\n keepalive_listen_eth: %s",
section, keepalive_idle, keepalive_intvl, keepalive_cnt, keepalive_listen_eth);
ret = kni_ipv4_addr_get_by_eth(keepalive_listen_eth, &keepalive_listen_ip);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at get bind ipv4 addr, eth is %s", keepalive_listen_eth);
goto error_out;
}
//rw_lock
ret = pthread_rwlock_init(&(mgr->rwlock), NULL);
if(ret < 0){
KNI_LOG_ERROR(logger, "Failed at pthread_rwlock_init, errno is %d, errmsg is %s", errno, strerror(errno));
goto error_out;
}
//create tfe_keepalive thread
for(int i = 0; i < tfe_node_count; i++){
snprintf(section, sizeof(section), "tfe%d", i);
ret = MESA_load_profile_int_nodef(profile, section, "keepalive_listen_port", &keepalive_listen_port);
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: keepalive_listen_port not set, profile is %s, section is %s", profile, section);
goto error_out;
}
ret = MESA_load_profile_string_nodef(profile, section, "ip_addr", tfe_ipaddr_str, sizeof(tfe_ipaddr_str));
if(ret < 0){
KNI_LOG_ERROR(logger, "MESA_prof_load: ip_addr not set, profile is %s, section is %s", profile, section);
goto error_out;
}
KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n keepalive_listen_port: %d\n ip_addr: %s", section, keepalive_listen_port, tfe_ipaddr_str);
ret = inet_pton(AF_INET, tfe_ipaddr_str, &tfe_ipaddr);
if(ret != 1){
KNI_LOG_ERROR(logger, "Failed at inet_pton, ret is %d, errno is %d, errmsg is %s, tfe_id is %d, ip_addr is %s",
ret, errno, strerror(errno), i, tfe_ipaddr_str);
goto error_out;
}
struct thread_tfe_keepalive_args *args = ALLOC(struct thread_tfe_keepalive_args, 1);
args->mgr = mgr;
args->keepalive_idle = keepalive_idle;
args->keepalive_intvl = keepalive_intvl;
args->keepalive_cnt = keepalive_cnt;
args->tfe_id = i;
args->listen_ip = keepalive_listen_ip;
args->listen_port = keepalive_listen_port;
args->tfe_ipaddr = tfe_ipaddr;
ret = pthread_create(&thread_id, NULL, thread_tfe_keepalive, (void *)args);
if(unlikely(ret != 0)){
KNI_LOG_ERROR(logger, "Failed at pthread_create, thread_func is thread_tfe_keepalive, errno is %d, errmsg is %s", errno, strerror(errno));
FREE(&args);
goto error_out;
}
}
return mgr;
error_out:
tfe_mgr_destroy(mgr);
return NULL;
}
int tfe_mgr_alive_node_get(struct tfe_mgr *mgr, int thread_seq){
if(mgr->keepalive_switch == 0){
return mgr->tfe_node_count > 0 ? thread_seq % mgr->tfe_node_count : -1;
}
pthread_rwlock_rdlock(&(mgr->rwlock));
int tfe_id = -1;
if(mgr->tfe_alive_node_count > 0){
tfe_id = thread_seq % mgr->tfe_alive_node_count;
}
pthread_rwlock_unlock(&(mgr->rwlock));
return tfe_id;
}