#include #include #include #include #include "kni_utils.h" #include "tfe_mgr.h" extern struct kni_field_stat_handle *g_kni_fs_handle; struct tfe_node{ int tfe_id; uint32_t ipaddr; }; struct tfe_mgr{ pthread_rwlock_t rwlock; struct tfe_node tfe_enabled_nodes[TFE_COUNT_MAX]; int tfe_enabled_node_count; int tfe_alive_nodes[TFE_COUNT_MAX]; int tfe_alive_node_count; int keepalive_switch; void *logger; }; struct thread_tfe_keepalive_accept_args{ struct tfe_mgr* mgr; int sockfd; int keepalive_idle; int keepalive_intvl; int keepalive_cnt; void *logger; }; struct thread_tfe_keepalive_recv_args{ struct tfe_mgr* mgr; int client_fd; int tfe_id; void *logger; }; enum tfe_mgr_errno{ TFE_MGR_NOT_EXISTED = -1, TFE_MGR_HAS_EXISTED = -2, TFE_MGR_EXCEED_MAX_COUNT = -3, }; static char* tfe_mgr_errmsg_get(enum tfe_mgr_errno _errno){ switch(_errno){ case TFE_MGR_NOT_EXISTED: return (char*)"tfe not existed"; case TFE_MGR_HAS_EXISTED: return (char*)"tfe has existed"; case TFE_MGR_EXCEED_MAX_COUNT: return (char*)"tfe exceed max count"; default: return (char*)"unknown"; } } static int tfe_mgr_alive_node_del(struct tfe_mgr *mgr, int tfe_id){ pthread_rwlock_wrlock(&(mgr->rwlock)); int i, ret; for(i = 0; i < mgr->tfe_alive_node_count; i++){ if(mgr->tfe_alive_nodes[i] == tfe_id){ break; } } if(i == mgr->tfe_alive_node_count){ ret = TFE_MGR_NOT_EXISTED; goto out; } for(int j = i; j < mgr->tfe_alive_node_count - 1; j++){ mgr->tfe_alive_nodes[j] = mgr->tfe_alive_nodes[j + 1]; } mgr->tfe_alive_node_count--; ret = 0; FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TFE_STATUS_BASE + tfe_id], 0, FS_OP_SET, 0); goto out; out: pthread_rwlock_unlock(&(mgr->rwlock)); return ret; } static int tfe_mgr_alive_node_add(struct tfe_mgr *mgr, int tfe_id){ pthread_rwlock_wrlock(&(mgr->rwlock)); int ret; for(int i = 0; i < mgr->tfe_alive_node_count; i++){ if(mgr->tfe_alive_nodes[i] == tfe_id){ ret = TFE_MGR_HAS_EXISTED; goto out; } } if(mgr->tfe_alive_node_count == TFE_COUNT_MAX){ ret = TFE_MGR_EXCEED_MAX_COUNT; goto out; } mgr->tfe_alive_nodes[mgr->tfe_alive_node_count] = tfe_id; mgr->tfe_alive_node_count++; ret = 0; FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TFE_STATUS_BASE + tfe_id], 0, FS_OP_SET, 1); goto out; out: pthread_rwlock_unlock(&(mgr->rwlock)); return ret; } static void* thread_tfe_keepalive_recv(void *args){ struct thread_tfe_keepalive_recv_args *_args = (struct thread_tfe_keepalive_recv_args*)args; struct tfe_mgr *mgr = _args->mgr; int client_fd = _args->client_fd; int tfe_id = _args->tfe_id; void *logger = _args->logger; FREE(&args); char buff[BUFF_SIZE_MAX]; char *errmsg = NULL; int ret; while(true){ ret = recv(client_fd, buff, sizeof(buff), 0); if(ret == 0){ KNI_LOG_ERROR(logger, "recv fin, del tfe alive node, tfe_id is %d", tfe_id); break; } if(ret <= 0){ if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK){ KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, continue recv, tfe_id is %d", errno, strerror(errno), tfe_id); continue; } if(errno == ETIMEDOUT){ KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, del tfe alive node, tfe_id is %d", errno, strerror(errno), tfe_id); break; } KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, error_out, tfe_id is %d", errno, strerror(errno), tfe_id); goto error_out; } } //recv fin: del alive node close(client_fd); ret = tfe_mgr_alive_node_del(mgr, tfe_id); if(ret < 0){ errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret); KNI_LOG_ERROR(logger, "Failed at del tfe alive node, tfe_id is %d, errmsg is %s", tfe_id, errmsg); } else{ KNI_LOG_ERROR(logger, "Succeed at del tfe alive node, tfe_id is %d", tfe_id); } return NULL; error_out: if(client_fd > 0){ close(client_fd); } return NULL; } static void* thread_tfe_keepalive_accept(void *args){ struct thread_tfe_keepalive_accept_args *_args = (struct thread_tfe_keepalive_accept_args*)args; struct tfe_mgr *mgr = _args->mgr; int sockfd = _args->sockfd; int keepalive_idle = _args->keepalive_idle; int keepalive_intvl = _args->keepalive_intvl; int keepalive_cnt = _args->keepalive_cnt; void *logger = mgr->logger; FREE(&args); //accept struct sockaddr_in client_addr; socklen_t client_addr_len = sizeof(client_addr); uint32_t client_ipaddr; char client_ipaddr_str[INET_ADDRSTRLEN] = ""; int flags, ret, client_fd; pthread_t thread_id = -1; int tfe_id = -1; char *errmsg = NULL; while(true){ client_fd = accept(sockfd, (struct sockaddr *)&client_addr, &client_addr_len); if(client_fd < 0){ KNI_LOG_ERROR(logger, "Failed at accept, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); continue; } client_ipaddr = client_addr.sin_addr.s_addr; inet_ntop(AF_INET, &client_ipaddr, client_ipaddr_str, INET_ADDRSTRLEN); for(int i = 0; i < mgr->tfe_enabled_node_count; i++){ if(client_ipaddr == mgr->tfe_enabled_nodes[i].ipaddr){ tfe_id = i; break; } } if(tfe_id == -1){ KNI_LOG_ERROR(logger, "Receive connection not from tfe, client addr is %s", client_ipaddr_str); close(client_fd); continue; } //set socketopt keepalive flags = 1; ret = setsockopt(client_fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at set socketopt SO_KEEPALIVE, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); close(client_fd); continue; } ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPIDLE, (void *)&keepalive_idle, sizeof(keepalive_idle)); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPIDLE, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); close(client_fd); continue; } ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPINTVL, (void *)&keepalive_intvl, sizeof(keepalive_intvl)); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPINTVL, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); close(client_fd); continue; } ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPCNT, (void *)&keepalive_cnt, sizeof(keepalive_cnt)); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPCNT, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); close(client_fd); continue; } //add alive node ret = tfe_mgr_alive_node_add(mgr, tfe_id); if(ret < 0){ errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret); KNI_LOG_ERROR(logger, "Failed at add tfe alive node, tfe_id is %d, errmsg is %s", tfe_id, errmsg); close(client_fd); continue; } else{ KNI_LOG_ERROR(logger, "Succeed at add tfe alive node, tfe_id is %d", tfe_id); } //create thread_tfe_keepalive_recv struct thread_tfe_keepalive_recv_args *recv_args = ALLOC(struct thread_tfe_keepalive_recv_args, 1); recv_args->mgr = mgr; recv_args->client_fd = client_fd; recv_args->tfe_id = tfe_id; recv_args->logger = logger; ret = pthread_create(&thread_id, NULL, thread_tfe_keepalive_recv, (void *)recv_args); if(unlikely(ret != 0)){ KNI_LOG_ERROR(logger, "Failed at thread_tfe_keepalive_recv, thread_func is thread_tfe_keepalive_recv, errno is %d, errmsg is %s", errno, strerror(errno)); FREE(&recv_args); close(client_fd); tfe_mgr_alive_node_del(mgr, tfe_id); continue; } } return NULL; } void tfe_mgr_destroy(struct tfe_mgr* mgr){ if(mgr != NULL){ pthread_rwlock_destroy(&(mgr->rwlock)); FREE(&mgr); } } static int get_binded_sockfd(uint32_t listen_ip, uint16_t listen_port, void *logger){ //create socket struct sockaddr_in server_addr; int ret; int sockfd = socket(AF_INET, SOCK_STREAM, 0); int flag; if(sockfd < 0){ KNI_LOG_ERROR(logger, "Failed at create tcp socket, errno is %d, errmsg is %s", errno, strerror(errno)); goto error_out; } flag = 1; ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flag, sizeof(flag)); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at set socketopt SO_REUSEADDR, errno is %d, errmsg is %s", errno, strerror(errno)); } //bind memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; // IPv4 server_addr.sin_addr.s_addr = listen_ip; server_addr.sin_port = htons(listen_port); ret = bind(sockfd, (const struct sockaddr *)&server_addr, sizeof(server_addr)); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at bind tcp socket, port is %d, errno is %d, errmsg is %s", listen_port, errno, strerror(errno)); goto error_out; } //listen ret = listen(sockfd, 5); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at listen tcp socket, errno is %d, errmsg is %s, listen_port is %d", errno, strerror(errno), listen_port); goto error_out; } return sockfd; error_out: if(sockfd > 0){ close(sockfd); } return -1; } struct tfe_mgr* tfe_mgr_init(int tfe_node_count, const char* profile, void *logger){ struct tfe_mgr* mgr = ALLOC(struct tfe_mgr, 1); mgr->logger = logger; int ret; //load keepalive conf char section[KNI_SYMBOL_MAX] = "tfe_mgr"; MESA_load_profile_int_def(profile, section, "keepalive_switch", &(mgr->keepalive_switch), 0); KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n keepalive_switch: %d", section, mgr->keepalive_switch); if(mgr->keepalive_switch == 0){ return mgr; } int keepalive_idle, keepalive_cnt, keepalive_intvl; char keepalive_listen_eth[KNI_SYMBOL_MAX] = ""; uint32_t keepalive_listen_ip; int keepalive_listen_port; uint32_t tfe_node_ipaddr; char tfe_ipaddr_str[INET_ADDRSTRLEN]; pthread_t thread_id = -1; int sockfd; int tfe_node_enabled; int j; struct thread_tfe_keepalive_accept_args *args = NULL; MESA_load_profile_int_def(profile, section, "keepalive_idle", &keepalive_idle, 2); MESA_load_profile_int_def(profile, section, "keepalive_intvl", &keepalive_intvl, 1); MESA_load_profile_int_def(profile, section, "keepalive_cnt", &keepalive_cnt, 3); ret = MESA_load_profile_string_nodef(profile, section, "keepalive_listen_eth", keepalive_listen_eth, sizeof(keepalive_listen_eth)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: keepalive_listen_eth not set, profile is %s, section is %s", profile, section); goto error_out; } ret = MESA_load_profile_int_nodef(profile, section, "keepalive_listen_port", &keepalive_listen_port); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: keepalive_listen_port not set, profile is %s, section is %s", profile, section); goto error_out; } KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n keepalive_idle: %d\n keepalive_intvl: %d\n keepalive_cnt: %d\n keepalive_listen_eth: %s\n keepalive_listen_port: %d", section, keepalive_idle, keepalive_intvl, keepalive_cnt, keepalive_listen_eth, keepalive_listen_port); ret = kni_ipv4_addr_get_by_eth(keepalive_listen_eth, &keepalive_listen_ip); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at get bind ipv4 addr, eth is %s", keepalive_listen_eth); goto error_out; } //load tfe_ipaddr j = 0; for(int i = 0; i < tfe_node_count; i++){ snprintf(section, sizeof(section), "tfe%d", i); MESA_load_profile_int_def(profile, section, "enabled", &tfe_node_enabled, 1); if(tfe_node_enabled != 1){ continue; } ret = MESA_load_profile_string_nodef(profile, section, "ip_addr", tfe_ipaddr_str, sizeof(tfe_ipaddr_str)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: ip_addr not set, profile is %s, section is %s", profile, section); goto error_out; } KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n ip_addr: %s", section, tfe_ipaddr_str); ret = inet_pton(AF_INET, tfe_ipaddr_str, &tfe_node_ipaddr); if(ret != 1){ KNI_LOG_ERROR(logger, "Failed at inet_pton, ret is %d, errno is %d, errmsg is %s, tfe_id is %d, ip_addr is %s", ret, errno, strerror(errno), i, tfe_ipaddr_str); goto error_out; } mgr->tfe_enabled_nodes[j].tfe_id = i; mgr->tfe_enabled_nodes[j].ipaddr = tfe_node_ipaddr; j++; } mgr->tfe_enabled_node_count = j; //init rw_lock ret = pthread_rwlock_init(&(mgr->rwlock), NULL); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at pthread_rwlock_init, errno is %d, errmsg is %s", errno, strerror(errno)); goto error_out; } //bind and listen sockfd = get_binded_sockfd(keepalive_listen_ip, keepalive_listen_port, logger); if(sockfd < 0){ KNI_LOG_ERROR(logger, "Failed at get binded sockfd"); goto error_out; } //create tfe_keepalive_accept thread args = ALLOC(struct thread_tfe_keepalive_accept_args, 1); args->mgr = mgr; args->keepalive_idle = keepalive_idle; args->sockfd = sockfd; args->keepalive_intvl = keepalive_intvl; args->keepalive_cnt = keepalive_cnt; args->logger = logger; ret = pthread_create(&thread_id, NULL, thread_tfe_keepalive_accept, (void *)args); if(unlikely(ret != 0)){ KNI_LOG_ERROR(logger, "Failed at pthread_create, thread_func is thread_tfe_keepalive_accept, errno is %d, errmsg is %s", errno, strerror(errno)); FREE(&args); goto error_out; } return mgr; error_out: tfe_mgr_destroy(mgr); return NULL; } int tfe_mgr_alive_node_get(struct tfe_mgr *mgr, int thread_seq){ int tfe_id = -1; if(mgr->keepalive_switch == 0){ if(mgr->tfe_enabled_node_count > 0){ int i = thread_seq % mgr->tfe_enabled_node_count; tfe_id = mgr->tfe_enabled_nodes[i].tfe_id; } return tfe_id; } pthread_rwlock_rdlock(&(mgr->rwlock)); if(mgr->tfe_alive_node_count > 0){ int i = thread_seq % mgr->tfe_alive_node_count; tfe_id = mgr->tfe_alive_nodes[i]; } pthread_rwlock_unlock(&(mgr->rwlock)); return tfe_id; }