diff --git a/common/include/kni_utils.h b/common/include/kni_utils.h index 46f4b15..bd2f447 100644 --- a/common/include/kni_utils.h +++ b/common/include/kni_utils.h @@ -3,8 +3,9 @@ #include #include #include -#include +#include #include +#include #include #include #include "MESA/MESA_handle_logger.h" @@ -96,6 +97,7 @@ enum kni_field{ KNI_FIELD_KEEPALIVE_REPLAY_ADD_SUCC, KNI_FIELD_KEEPALIVE_REPLAY_ADD_FAIL, KNI_FIELD_EXCEED_MTU, + KNI_FIELD_TFE_STATUS_BASE, }; struct kni_field_stat_handle{ diff --git a/common/src/kni_utils.cpp b/common/src/kni_utils.cpp index a9f9f22..b3c05db 100644 --- a/common/src/kni_utils.cpp +++ b/common/src/kni_utils.cpp @@ -266,7 +266,7 @@ MESA_htable_handle kni_create_htable(const char *profile, const char *section, v MESA_load_profile_int_def(profile, section, "mho_hash_max_element_num", &mho_hash_max_element_num, 12345); MESA_load_profile_int_def(profile, section, "mho_expire_time", &mho_expire_time, 3600); MESA_load_profile_string_def(profile, section, "mho_eliminate_type", mho_eliminate_type, sizeof(mho_eliminate_type), "FIFO"); - KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n mho_screen_print_ctrl: %d\n mho_thread_safe: %d\n mho_mutex_num: %d\n" + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n mho_screen_print_ctrl: %d\n mho_thread_safe: %d\n mho_mutex_num: %d\n" "mho_hash_slot_size: %d\n mho_hash_max_element_num: %d\n mho_expire_time: %d\n mho_eliminate_type: %s\n", section, mho_screen_print_ctrl, mho_thread_safe, mho_mutex_num, mho_hash_slot_size, mho_hash_max_element_num, mho_expire_time, mho_eliminate_type); MESA_htable_handle htable = MESA_htable_born(); diff --git a/conf/kni.conf b/conf/kni.conf index 6e14f04..ad875a0 100644 --- a/conf/kni.conf +++ b/conf/kni.conf @@ -4,6 +4,7 @@ log_level = 10 tfe_count = 1 local_eth = enp8s0 tfe_data_recv_thread_num = 8 +#keepalive_replay: window update replay keepalive_replay_switch = 1 [maat] @@ -28,14 +29,20 @@ src_mac_addr = 00:0e:c6:d6:72:c1 [tfe0] mac_addr = fe:65:b7:03:50:bd dev_eth_symbol = ens1f5 +ip_addr = +keepalive_listen_port = 2476 [tfe1] mac_addr = fe:65:b7:03:50:bd dev_eth_symbol = eth8 +ip_addr = +keepalive_listen_port = 2477 [tfe2] mac_addr = fe:65:b7:03:50:bd dev_eth_symbol = eth9 +ip_addr = +keepalive_listen_port = 2478 [field_stat] stat_path = ./fs2_kni.status @@ -71,4 +78,11 @@ mho_hash_slot_size = 160000 mho_hash_max_element_num = 640000 #must be 0 mho_expire_time = 0 -mho_eliminate_type = LRU \ No newline at end of file +mho_eliminate_type = LRU + +[tfe_mgr] +keepalive_switch = 1 +keepalive_idle = 2 +keepalive_intvl = 1 +keepalive_cnt = 3 +keepalive_listen_eth = \ No newline at end of file diff --git a/entry/CMakeLists.txt b/entry/CMakeLists.txt index 76a7494..846543e 100644 --- a/entry/CMakeLists.txt +++ b/entry/CMakeLists.txt @@ -1,3 +1,3 @@ -add_library(kni SHARED src/kni_entry.cpp src/kni_maat.cpp src/kni_send_logger.cpp) +add_library(kni SHARED src/kni_entry.cpp src/kni_maat.cpp src/kni_send_logger.cpp src/tfe_mgr.cpp) target_include_directories(kni PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_link_libraries(kni common MESA_prof_load MESA_htable MESA_field_stat maatframe marsio uuid cjson rdkafka) \ No newline at end of file diff --git a/entry/include/kni_maat.h b/entry/include/kni_maat.h index 8096f81..98be2fc 100644 --- a/entry/include/kni_maat.h +++ b/entry/include/kni_maat.h @@ -1,4 +1,4 @@ - +#pragma once #define KNI_MAAT_READCONF_IRIS 0 #define KNI_MAAT_READCONF_JSON 1 #define KNI_MAAT_READCONF_REDIS 2 diff --git a/entry/include/kni_send_logger.h b/entry/include/kni_send_logger.h index f5750ad..df62127 100644 --- a/entry/include/kni_send_logger.h +++ b/entry/include/kni_send_logger.h @@ -1,4 +1,4 @@ - +#pragma once struct kni_send_logger; struct kni_send_logger* kni_send_logger_init(const char *profile, void *logger); void kni_send_logger_destroy(struct kni_send_logger *handle); diff --git a/entry/include/tfe_mgr.h b/entry/include/tfe_mgr.h new file mode 100644 index 0000000..ff08aec --- /dev/null +++ b/entry/include/tfe_mgr.h @@ -0,0 +1,8 @@ +#pragma once +#define TFE_COUNT_MAX 32 +#define BUFF_SIZE_MAX 1024 + +struct tfe_mgr; +struct tfe_mgr* tfe_mgr_init(int tfe_node_count, const char* profile, void *logger); +void tfe_mgr_destroy(struct tfe_mgr* mgr); +int tfe_mgr_alive_node_get(struct tfe_mgr *mgr, int thread_seq); \ No newline at end of file diff --git a/entry/src/kni_entry.cpp b/entry/src/kni_entry.cpp index b58b1b0..79845ef 100644 --- a/entry/src/kni_entry.cpp +++ b/entry/src/kni_entry.cpp @@ -7,8 +7,8 @@ #include "uuid/uuid.h" #include "cjson/cJSON.h" #include "kni_send_logger.h" -#include #include +#include "tfe_mgr.h" extern int g_iThreadNum; @@ -19,7 +19,6 @@ struct kni_field_stat_handle *g_kni_fs_handle = NULL; #define HTTP_PROJECT_NAME "kni_http_tag" #define BURST_MAX 1 #define STREAM_TRACEID_LEN 37 -#define TFE_COUNT_MAX 16 #define CALLER_SAPP 0 #define CALLER_TFE 1 @@ -147,6 +146,7 @@ struct kni_handle{ uint32_t local_ipv4; int keepalive_replay_switch; void *local_logger; + struct tfe_mgr *_tfe_mgr; }; struct traceid2pme_search_cb_args{ @@ -222,7 +222,6 @@ static struct pme_info* pme_info_new(const struct streaminfo *stream, int thread void *logger = g_kni_handle->local_logger; struct pme_info* pmeinfo = ALLOC(struct pme_info, 1); pmeinfo->addr_type = (enum addr_type_t)stream->addr.addrtype; - pmeinfo->tfe_id = g_kni_handle->tfe_count > 0 ? thread_seq % g_kni_handle->tfe_count : -1; uuid_t uu; uuid_generate_random(uu); uuid_unparse(uu, pmeinfo->stream_traceid); @@ -1009,7 +1008,13 @@ extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, in case OP_STATE_PENDING: *pme = pmeinfo = pme_info_new(stream, thread_seq); if(pmeinfo == NULL){ - KNI_LOG_ERROR(logger, "Failed at new pmeinfo"); + KNI_LOG_ERROR(logger, "Failed at new pmeinfo, bypass and dropme"); + return APP_STATE_FAWPKT | APP_STATE_DROPME; + } + pmeinfo->tfe_id = tfe_mgr_alive_node_get(g_kni_handle->_tfe_mgr, thread_seq); + if(pmeinfo->tfe_id < 0){ + KNI_LOG_ERROR(logger, "No alive tfe available, bypass and dropme"); + pme_info_destroy(pmeinfo); return APP_STATE_FAWPKT | APP_STATE_DROPME; } FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_NEW], 0, FS_OP_ADD, 1); @@ -1198,7 +1203,7 @@ static long keepalive_replay_search_cb(void *data, const uchar *key, uint size, return 0; } -void* thread_tfe_data_receiver(void *args){ +static void* thread_tfe_data_receiver(void *args){ void *logger = g_kni_handle->local_logger; struct thread_tfe_data_receiver_args *_args = (struct thread_tfe_data_receiver_args*)args; struct kni_marsio_handle *marsio_handle = _args->marsio_handle; @@ -1352,12 +1357,12 @@ static long traceid2pme_htable_search_cb(void *data, const uchar *key, uint size return 0; } -void* thread_tfe_cmsg_receiver(void *args){ +static void* thread_tfe_cmsg_receiver(void *args){ struct thread_tfe_cmsg_receiver_args *_args = (struct thread_tfe_cmsg_receiver_args*)args; const char *profile = _args->profile; const char *section = "tfe_cmsg_receiver"; void *logger = _args->logger; - char listen_eth[INET_ADDRSTRLEN]; + char listen_eth[KNI_SYMBOL_MAX]; uint32_t listen_ip; int listen_port = -1; char buff[KNI_MTU]; @@ -1373,7 +1378,7 @@ void* thread_tfe_cmsg_receiver(void *args){ KNI_LOG_ERROR(logger, "MESA_prof_load: listen_port not set, profile is %s, section is %s", profile, section); goto error_out; } - KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n listen_eth: %s\n listen_port: %d", + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n listen_eth: %s\n listen_port: %d", section, listen_eth, listen_port); FREE(&args); //create socket @@ -1470,7 +1475,7 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){ KNI_LOG_ERROR(logger, "MESA_prof_load: src_mac_addr not set, profile is %s, section is %s", profile, section); goto error_out; } - KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n appsym: %s\n dev_vxlan_symbol: %s\n src_mac_addr: %s", + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n appsym: %s\n dev_vxlan_symbol: %s\n src_mac_addr: %s", section, appsym, dev_vxlan_symbol, src_mac_addr_str); mr_inst = marsio_create(); if(mr_inst == NULL){ @@ -1516,7 +1521,7 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){ KNI_LOG_ERROR(logger, "MESA_prof_load: dev_eth_symbol not set, profile is %s, section is %s", profile, _section); goto error_out; } - KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n mac_addr: %s\n dev_eth_symbol: %s", + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n mac_addr: %s\n dev_eth_symbol: %s", _section, mac_addr_str, dev_eth_symbol); //eth_handler receive thread = tfe_data_recv_thread_num, send thread = g_iThreadNum + tfe_data_recv_thread_num dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, g_kni_handle->tfe_data_recv_thread_num, g_iThreadNum + g_kni_handle->tfe_data_recv_thread_num); @@ -1577,7 +1582,7 @@ static struct kni_field_stat_handle * fs_init(const char *profile){ KNI_LOG_ERROR(logger, "MESA_prof_load: stat_path not set, profile is %s, section is %s", profile, section); goto error_out; } - KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n stat_path: %s\n", "field_stat", stat_path); + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n stat_path: %s\n", "field_stat", stat_path); handle = FS_create_handle(); if(handle == NULL){ KNI_LOG_ERROR(logger, "Failed at create FS_create_handle"); @@ -1627,6 +1632,11 @@ static struct kni_field_stat_handle * fs_init(const char *profile){ fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "kaReplay_add_fail"); fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "kaReplay_add_succ"); fs_handle->fields[KNI_FIELD_EXCEED_MTU] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "exceed_mtu"); + for(int i = 0; i < g_kni_handle->tfe_count; i++){ + char tfe_status[KNI_SYMBOL_MAX] = ""; + snprintf(tfe_status, sizeof(tfe_status), "tfe%d", i); + fs_handle->fields[KNI_FIELD_TFE_STATUS_BASE + i] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, tfe_status); + } fs_handle->handle = handle; FS_start(handle); return fs_handle; @@ -1678,6 +1688,7 @@ extern "C" int kni_init(){ int keepalive_replay_switch = -1; struct thread_tfe_cmsg_receiver_args *cmsg_receiver_args; MESA_htable_handle traceid2pme_htable = NULL, keepalive_replay_htable = NULL; + struct tfe_mgr *_tfe_mgr = NULL; int ret = MESA_load_profile_string_nodef(profile, section, "log_path", log_path, sizeof(log_path)); if(ret < 0){ printf("MESA_prof_load: log_path not set, profile is %s, section is %s", profile, section); @@ -1788,6 +1799,14 @@ extern "C" int kni_init(){ g_kni_handle->keepalive_replay_htable = keepalive_replay_htable; } + //init tfe_mgr + _tfe_mgr = tfe_mgr_init(tfe_count, profile, local_logger); + if(_tfe_mgr == NULL){ + KNI_LOG_ERROR(local_logger, "Failed at init tfe_mgr"); + goto error_out; + } + g_kni_handle->_tfe_mgr = _tfe_mgr; + //create thread_tfe_data_receiver for(int i = 0; i < g_kni_handle->tfe_data_recv_thread_num; i++){ struct thread_tfe_data_receiver_args *args = ALLOC(struct thread_tfe_data_receiver_args, 1); @@ -1796,7 +1815,7 @@ extern "C" int kni_init(){ args->thread_seq = i; int ret = pthread_create(&thread_id, NULL, thread_tfe_data_receiver, (void *)args); if(unlikely(ret != 0)){ - KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_data_receiver, ret is %d", ret); + KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_data_receiver, errno is %d, errmsg is %s", errno, strerror(errno)); FREE(&args); goto error_out; } @@ -1808,7 +1827,7 @@ extern "C" int kni_init(){ strncpy(cmsg_receiver_args->profile, profile, strnlen(profile, sizeof(cmsg_receiver_args->profile) - 1)); ret = pthread_create(&thread_id, NULL, thread_tfe_cmsg_receiver, (void *)cmsg_receiver_args); if(unlikely(ret != 0)){ - KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_cmsg_receiver, ret is %d", ret); + KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_cmsg_receiver, errno is %d, errmsg is %s", errno, strerror(errno)); FREE(&cmsg_receiver_args); goto error_out; } diff --git a/entry/src/kni_maat.cpp b/entry/src/kni_maat.cpp index 75df092..267b115 100644 --- a/entry/src/kni_maat.cpp +++ b/entry/src/kni_maat.cpp @@ -30,7 +30,7 @@ void kni_maat_destroy(struct kni_maat_handle *handle){ void compile_ex_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_def_large, MAAT_RULE_EX_DATA* ad, long argl, void *argp){ void *logger = argp; - KNI_LOG_INFO(logger, "call compile_ex_param_new"); + KNI_LOG_DEBUG(logger, "call compile_ex_param_new"); if(rule->config_id == 0){ unsigned char action = (unsigned char)rule->action; g_maat_default_action = (enum kni_action)action; @@ -40,13 +40,13 @@ void compile_ex_param_new(int idx, const struct Maat_rule_t* rule, const char* s void compile_ex_param_free(int idx, const struct Maat_rule_t* rule, const char* srv_def_large, MAAT_RULE_EX_DATA* ad, long argl, void *argp){ void *logger = argp; - KNI_LOG_INFO(logger, "call compile_ex_param_free"); + KNI_LOG_DEBUG(logger, "call compile_ex_param_free"); return; } void compile_ex_param_dup(int idx, MAAT_RULE_EX_DATA *to, MAAT_RULE_EX_DATA *from, long argl, void *argp){ void *logger = argp; - KNI_LOG_INFO(logger, "call compile_ex_param_dup"); + KNI_LOG_DEBUG(logger, "call compile_ex_param_dup"); return; } @@ -95,7 +95,7 @@ struct kni_maat_handle* kni_maat_init(const char* profile, void *logger){ KNI_LOG_ERROR(logger, "MESA_prof_load: default_action not set, profile is %s, section is %s", profile, section); goto error_out; } - KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n readconf_mode: %d\n tableinfo_path: %s\n tablename_intercept_ip: %s\n tablename_intercept_domain: %s\n" + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n readconf_mode: %d\n tableinfo_path: %s\n tablename_intercept_ip: %s\n tablename_intercept_domain: %s\n" "compile_alias: %s\n default_action: %d", section, readconf_mode, tableinfo_path, tablename_intercept_ip, tablename_intercept_domain, compile_alias, g_maat_default_action); feather = Maat_feather(g_iThreadNum, tableinfo_path, logger); @@ -112,7 +112,7 @@ struct kni_maat_handle* kni_maat_init(const char* profile, void *logger){ KNI_LOG_ERROR(logger, "MESA_prof_load: maatjson_path not set, profile is %s, section is %s", profile, section); goto error_out; } - KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n maatjson_path: %s", section, maatjson_path); + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n maatjson_path: %s", section, maatjson_path); Maat_set_feather_opt(feather, MAAT_OPT_JSON_FILE_PATH, maatjson_path, strlen(maatjson_path)); break; case KNI_MAAT_READCONF_IRIS: @@ -133,7 +133,7 @@ struct kni_maat_handle* kni_maat_init(const char* profile, void *logger){ KNI_LOG_ERROR(logger, "MESA_prof_load: redis_index not set, profile is %s, section is %s", profile, section); goto error_out; } - KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n redis_ip: %s\n redis_port: %d\n redis_index: %d", + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n redis_ip: %s\n redis_port: %d\n redis_index: %d", section, redis_ip, redis_port, redis_index); Maat_set_feather_opt(feather, MAAT_OPT_REDIS_IP, (void*)redis_ip, strlen(redis_ip) + 1); Maat_set_feather_opt(feather, MAAT_OPT_REDIS_PORT, (void*)&redis_port, sizeof(redis_port)); diff --git a/entry/src/kni_send_logger.cpp b/entry/src/kni_send_logger.cpp index 03fbb41..3ad647c 100644 --- a/entry/src/kni_send_logger.cpp +++ b/entry/src/kni_send_logger.cpp @@ -34,7 +34,7 @@ static rd_kafka_t* kafka_init(const char *profile, void *logger){ KNI_LOG_ERROR(logger, "MESA_prof_load: security.protocol not set, profile is %s, section is %s", profile, section); goto error_out; } - KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n queue.buffering.max.messages: %s\n topic.metadata.refresh.interval.ms: %s\n" + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n queue.buffering.max.messages: %s\n topic.metadata.refresh.interval.ms: %s\n" "security.protocol: %s", "kafka", queue_buffering_max_messages, topic_metadata_refresh_interval_ms, security_protocol); rdkafka_conf = rd_kafka_conf_new(); diff --git a/entry/src/tfe_mgr.cpp b/entry/src/tfe_mgr.cpp new file mode 100644 index 0000000..5c7f815 --- /dev/null +++ b/entry/src/tfe_mgr.cpp @@ -0,0 +1,348 @@ +#include +#include +#include +#include +#include "kni_utils.h" +#include "tfe_mgr.h" + +extern struct kni_field_stat_handle *g_kni_fs_handle; + +struct tfe_mgr{ + pthread_rwlock_t rwlock; + int tfe_alive_node_list[TFE_COUNT_MAX]; + int tfe_alive_node_count; + int tfe_node_count; + int keepalive_switch; + void *logger; +}; + +struct thread_tfe_keepalive_args{ + struct tfe_mgr* mgr; + int sockfd; + int keepalive_idle; + int keepalive_intvl; + int keepalive_cnt; + int tfe_id; + uint32_t tfe_ipaddr; +}; + +enum tfe_mgr_errno{ + TFE_MGR_NOT_EXISTED = -1, + TFE_MGR_HAS_EXISTED = -2, + TFE_MGR_EXCEED_MAX_COUNT = -3, +}; + +static char* tfe_mgr_errmsg_get(enum tfe_mgr_errno _errno){ + switch(_errno){ + case TFE_MGR_NOT_EXISTED: + return (char*)"tfe not existed"; + case TFE_MGR_HAS_EXISTED: + return (char*)"tfe has existed"; + case TFE_MGR_EXCEED_MAX_COUNT: + return (char*)"tfe exceed max count"; + default: + return (char*)"unknown"; + } +} + +static int tfe_mgr_alive_node_del(struct tfe_mgr *mgr, int tfe_id){ + pthread_rwlock_wrlock(&(mgr->rwlock)); + int i, ret; + for(i = 0; i < mgr->tfe_alive_node_count; i++){ + if(mgr->tfe_alive_node_list[i] == tfe_id){ + break; + } + } + if(i == mgr->tfe_alive_node_count){ + ret = TFE_MGR_NOT_EXISTED; + goto out; + } + for(int j = i; j < mgr->tfe_alive_node_count - 1; j++){ + mgr->tfe_alive_node_list[j] = mgr->tfe_alive_node_list[j + 1]; + } + mgr->tfe_alive_node_count--; + ret = 0; + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TFE_STATUS_BASE + tfe_id], 0, FS_OP_SET, 0); + goto out; + +out: + pthread_rwlock_unlock(&(mgr->rwlock)); + return ret; +} + +static int tfe_mgr_alive_node_add(struct tfe_mgr *mgr, int tfe_id){ + pthread_rwlock_wrlock(&(mgr->rwlock)); + int ret; + for(int i = 0; i < mgr->tfe_alive_node_count; i++){ + if(mgr->tfe_alive_node_list[i] == tfe_id){ + ret = TFE_MGR_HAS_EXISTED; + goto out; + } + } + if(mgr->tfe_alive_node_count == TFE_COUNT_MAX){ + ret = TFE_MGR_EXCEED_MAX_COUNT; + goto out; + } + mgr->tfe_alive_node_list[mgr->tfe_alive_node_count] = tfe_id; + mgr->tfe_alive_node_count++; + ret = 0; + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TFE_STATUS_BASE + tfe_id], 0, FS_OP_SET, 1); + goto out; + +out: + pthread_rwlock_unlock(&(mgr->rwlock)); + return ret; +} + + +static void* thread_tfe_keepalive(void *args){ + struct thread_tfe_keepalive_args *_args = (struct thread_tfe_keepalive_args*)args; + struct tfe_mgr *mgr = _args->mgr; + int sockfd = _args->sockfd; + int keepalive_idle = _args->keepalive_idle; + int keepalive_intvl = _args->keepalive_intvl; + int keepalive_cnt = _args->keepalive_cnt; + int tfe_id = _args->tfe_id; + uint32_t tfe_ipaddr = _args->tfe_ipaddr; + void *logger = mgr->logger; + FREE(&args); + //accept + struct sockaddr_in client_addr; + socklen_t client_addr_len = sizeof(client_addr); + uint32_t client_ipaddr; + char client_ipaddr_str[INET_ADDRSTRLEN] = ""; + int flags, ret, client_fd; + char buff[BUFF_SIZE_MAX]; + char *errmsg = NULL; + while(true){ + client_fd = accept(sockfd, (struct sockaddr *)&client_addr, &client_addr_len); + if(client_fd < 0){ + KNI_LOG_ERROR(logger, "Failed at accept, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); + continue; + } + client_ipaddr = client_addr.sin_addr.s_addr; + inet_ntop(AF_INET, &client_ipaddr, client_ipaddr_str, INET_ADDRSTRLEN); + if(client_ipaddr != tfe_ipaddr){ + KNI_LOG_ERROR(logger, "Receive connection not from tfe%d, client addr is %s", tfe_id, client_ipaddr); + close(client_fd); + continue; + } + //set socketopt keepalive + flags = 1; + ret = setsockopt(client_fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); + if(ret < 0){ + KNI_LOG_ERROR(logger, "Failed at set socketopt SO_KEEPALIVE, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); + } + ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPIDLE, (void *)&keepalive_idle, sizeof(keepalive_idle)); + if(ret < 0){ + KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPIDLE, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); + } + ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPINTVL, (void *)&keepalive_intvl, sizeof(keepalive_intvl)); + if(ret < 0){ + KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPINTVL, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); + } + ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPCNT, (void *)&keepalive_cnt, sizeof(keepalive_cnt)); + if(ret < 0){ + KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPCNT, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); + } + //succeed accpt: add alive node + ret = tfe_mgr_alive_node_add(mgr, tfe_id); + if(ret < 0){ + errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret); + KNI_LOG_ERROR(logger, "Failed at add tfe alive node, tfe_id is %d, errmsg is %s", tfe_id, errmsg); + } + else{ + KNI_LOG_ERROR(logger, "Succeed at add tfe alive node, tfe_id is %d", tfe_id); + } + while(true){ + ret = recv(client_fd, buff, sizeof(buff), 0); + if(ret == 0){ + KNI_LOG_ERROR(logger, "recv fin, del tfe alive node, tfe_id is %d", tfe_id); + break; + } + if(ret <= 0){ + if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK){ + KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, continue recv, tfe_id is %d", errno, strerror(errno), tfe_id); + continue; + } + if(errno == ETIMEDOUT){ + KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, del tfe alive node, tfe_id is %d", errno, strerror(errno), tfe_id); + break; + } + KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, error_out, tfe_id is %d", errno, strerror(errno), tfe_id); + goto error_out; + } + } + //recv fin: del alive node + close(client_fd); + ret = tfe_mgr_alive_node_del(mgr, tfe_id); + if(ret < 0){ + errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret); + KNI_LOG_ERROR(logger, "Failed at del tfe alive node, tfe_id is %d, errmsg is %s", tfe_id, errmsg); + } + else{ + KNI_LOG_ERROR(logger, "Succeed at del tfe alive node, tfe_id is %d", tfe_id); + } + } + return NULL; + +error_out: + if(sockfd > 0){ + close(sockfd); + } + if(client_fd > 0){ + close(client_fd); + } + KNI_LOG_ERROR(logger, "thread_tfe_keepalive exited, tfe_id is %d", tfe_id); + return NULL; +} + +void tfe_mgr_destroy(struct tfe_mgr* mgr){ + if(mgr != NULL){ + pthread_rwlock_destroy(&(mgr->rwlock)); + FREE(&mgr); + } +} + +static int get_binded_sockfd(int tfe_id, uint32_t listen_ip, uint16_t listen_port, void *logger){ + //create socket + struct sockaddr_in server_addr; + int ret; + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + int flag; + if(sockfd < 0){ + KNI_LOG_ERROR(logger, "Failed at create tcp socket, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); + goto error_out; + } + flag = 1; + ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flag, sizeof(flag)); + if(ret < 0){ + KNI_LOG_ERROR(logger, "Failed at set socketopt SO_REUSEADDR, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); + } + //bind + memset(&server_addr, 0, sizeof(server_addr)); + server_addr.sin_family = AF_INET; // IPv4 + server_addr.sin_addr.s_addr = listen_ip; + server_addr.sin_port = htons(listen_port); + ret = bind(sockfd, (const struct sockaddr *)&server_addr, sizeof(server_addr)); + if(ret < 0){ + KNI_LOG_ERROR(logger, "Failed at bind tcp socket, port is %d, errno is %d, errmsg is %s, tfe_id is %d", + listen_port, errno, strerror(errno), tfe_id); + goto error_out; + } + //listen + ret = listen(sockfd, 5); + if(ret < 0){ + KNI_LOG_ERROR(logger, "Failed at listen tcp socket, errno is %d, errmsg is %s, tfe_id is %d, listen_port is %d", + errno, strerror(errno), tfe_id, listen_port); + goto error_out; + } + return sockfd; + +error_out: + if(sockfd > 0){ + close(sockfd); + } + return -1; +} + +struct tfe_mgr* tfe_mgr_init(int tfe_node_count, const char* profile, void *logger){ + struct tfe_mgr* mgr = ALLOC(struct tfe_mgr, 1); + mgr->logger = logger; + mgr->tfe_node_count = tfe_node_count; + int ret; + //load keepalive conf + char section[KNI_SYMBOL_MAX] = "tfe_mgr"; + MESA_load_profile_int_def(profile, section, "keepalive_switch", &(mgr->keepalive_switch), 0); + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n keepalive_switch: %d", section, mgr->keepalive_switch); + if(mgr->keepalive_switch == 0){ + return mgr; + } + int keepalive_idle, keepalive_cnt, keepalive_intvl; + char keepalive_listen_eth[KNI_SYMBOL_MAX] = ""; + uint32_t keepalive_listen_ip; + int keepalive_listen_port; + uint32_t tfe_ipaddr; + char tfe_ipaddr_str[INET_ADDRSTRLEN]; + pthread_t thread_id = -1; + MESA_load_profile_int_def(profile, section, "keepalive_idle", &keepalive_idle, 2); + MESA_load_profile_int_def(profile, section, "keepalive_intvl", &keepalive_intvl, 1); + MESA_load_profile_int_def(profile, section, "keepalive_cnt", &keepalive_cnt, 3); + ret = MESA_load_profile_string_nodef(profile, section, "keepalive_listen_eth", keepalive_listen_eth, sizeof(keepalive_listen_eth)); + if(ret < 0){ + KNI_LOG_ERROR(logger, "MESA_prof_load: keepalive_listen_eth not set, profile is %s, section is %s", profile, section); + goto error_out; + } + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n keepalive_idle: %d\n keepalive_intvl: %d\n keepalive_cnt: %d\n keepalive_listen_eth: %s", + section, keepalive_idle, keepalive_intvl, keepalive_cnt, keepalive_listen_eth); + ret = kni_ipv4_addr_get_by_eth(keepalive_listen_eth, &keepalive_listen_ip); + if(ret < 0){ + KNI_LOG_ERROR(logger, "Failed at get bind ipv4 addr, eth is %s", keepalive_listen_eth); + goto error_out; + } + //rw_lock + ret = pthread_rwlock_init(&(mgr->rwlock), NULL); + if(ret < 0){ + KNI_LOG_ERROR(logger, "Failed at pthread_rwlock_init, errno is %d, errmsg is %s", errno, strerror(errno)); + goto error_out; + } + //create tfe_keepalive thread + for(int i = 0; i < tfe_node_count; i++){ + snprintf(section, sizeof(section), "tfe%d", i); + ret = MESA_load_profile_int_nodef(profile, section, "keepalive_listen_port", &keepalive_listen_port); + if(ret < 0){ + KNI_LOG_ERROR(logger, "MESA_prof_load: keepalive_listen_port not set, profile is %s, section is %s", profile, section); + goto error_out; + } + ret = MESA_load_profile_string_nodef(profile, section, "ip_addr", tfe_ipaddr_str, sizeof(tfe_ipaddr_str)); + if(ret < 0){ + KNI_LOG_ERROR(logger, "MESA_prof_load: ip_addr not set, profile is %s, section is %s", profile, section); + goto error_out; + } + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n keepalive_listen_port: %d\n ip_addr: %s", section, keepalive_listen_port, tfe_ipaddr_str); + ret = inet_pton(AF_INET, tfe_ipaddr_str, &tfe_ipaddr); + if(ret != 1){ + KNI_LOG_ERROR(logger, "Failed at inet_pton, ret is %d, errno is %d, errmsg is %s, tfe_id is %d, ip_addr is %s", + ret, errno, strerror(errno), i, tfe_ipaddr_str); + goto error_out; + } + int sockfd = get_binded_sockfd(i, keepalive_listen_ip, keepalive_listen_port, logger); + if(sockfd < 0){ + KNI_LOG_ERROR(logger, "Failed at get binded sockfd, tfe_id is %d", i); + goto error_out; + } + struct thread_tfe_keepalive_args *args = ALLOC(struct thread_tfe_keepalive_args, 1); + args->mgr = mgr; + args->keepalive_idle = keepalive_idle; + args->sockfd = sockfd; + args->keepalive_intvl = keepalive_intvl; + args->keepalive_cnt = keepalive_cnt; + args->tfe_id = i; + args->tfe_ipaddr = tfe_ipaddr; + ret = pthread_create(&thread_id, NULL, thread_tfe_keepalive, (void *)args); + if(unlikely(ret != 0)){ + KNI_LOG_ERROR(logger, "Failed at pthread_create, thread_func is thread_tfe_keepalive, errno is %d, errmsg is %s", errno, strerror(errno)); + FREE(&args); + goto error_out; + } + } + return mgr; + +error_out: + tfe_mgr_destroy(mgr); + return NULL; +} + +int tfe_mgr_alive_node_get(struct tfe_mgr *mgr, int thread_seq){ + if(mgr->keepalive_switch == 0){ + return mgr->tfe_node_count > 0 ? thread_seq % mgr->tfe_node_count : -1; + } + pthread_rwlock_rdlock(&(mgr->rwlock)); + int tfe_id = -1; + if(mgr->tfe_alive_node_count > 0){ + tfe_id = thread_seq % mgr->tfe_alive_node_count; + } + pthread_rwlock_unlock(&(mgr->rwlock)); + return tfe_id; +} +