From 455612fd0318a9332a3e1bc7d55d9e1f61fa5dc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B4=94=E4=B8=80=E9=B8=A3?= Date: Wed, 19 Jun 2019 12:23:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=8E=A5=E5=85=A5=E5=A4=9A?= =?UTF-8?q?=E5=8F=B0tfe=E6=97=B6=E7=9A=84bug=20=20=20=20*=20=E4=BF=AE?= =?UTF-8?q?=E5=A4=8Dkeepalive=5Freplay=5Fswitch=E5=85=B3=E9=97=AD=E4=BB=8D?= =?UTF-8?q?=E7=84=B6=E5=8A=A0=E5=85=A5hash=E8=A1=A8=E7=9A=84bug=20=20=20?= =?UTF-8?q?=20*=20=E4=BF=AE=E5=A4=8D=E5=A4=9A=E5=8F=B0tfe=E6=97=B6?= =?UTF-8?q?=E8=8E=B7=E5=BE=97tfe=5Fid=E4=B8=8D=E6=AD=A3=E7=A1=AE=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98=20=20=20=20*=20=E4=BF=AE=E6=94=B9tfe=5Fmgr?= =?UTF-8?q?=E7=9A=84=E7=BA=BF=E7=A8=8B=E6=A8=A1=E5=9E=8B,=E6=94=B9?= =?UTF-8?q?=E6=88=90accept=E4=B9=8B=E5=90=8E=E5=88=9B=E5=BB=BA=E6=96=B0?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=20=20=20=20*=20tfe=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E4=B8=AD=E5=A2=9E=E5=8A=A0enabled=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/include/kni_utils.h | 6 +- conf/kni.conf | 11 +- entry/src/kni_entry.cpp | 377 +++++++++++++++++++++---------------- entry/src/tfe_mgr.cpp | 273 +++++++++++++++++---------- 4 files changed, 394 insertions(+), 273 deletions(-) diff --git a/common/include/kni_utils.h b/common/include/kni_utils.h index 756b0a5..b9d58a1 100644 --- a/common/include/kni_utils.h +++ b/common/include/kni_utils.h @@ -61,9 +61,9 @@ struct kni_tcpopt_info{ }; //field_stat -#define KNI_FIELD_MAX 32 +#define KNI_FIELD_MAX 64 enum kni_field{ - KNI_FIELD_TOT_PKT, + KNI_FIELD_TOT_PKT = 0, KNI_FIELD_BYP_PKT, KNI_FIELD_INTCP_PKT, KNI_FIELD_IPV6_PKT, @@ -90,6 +90,8 @@ enum kni_field{ KNI_FIELD_IPV6HDR_PARSE_FAIL, KNI_FIELD_KEEPALIVE_REPLAY_ADD_SUCC, KNI_FIELD_KEEPALIVE_REPLAY_ADD_FAIL, + KNI_FIELD_KEEPALIVE_REPLAY_DEL_SUCC, + KNI_FIELD_KEEPALIVE_REPLAY_DEL_FAIL, KNI_FIELD_EXCEED_MTU, KNI_FIELD_SENDTO_TFE_FAIL, //KNI_FIELD_TFE_STATUS_BASE must be last diff --git a/conf/kni.conf b/conf/kni.conf index ad875a0..e6e136c 100644 --- a/conf/kni.conf +++ b/conf/kni.conf @@ -1,7 +1,7 @@ [global] log_path = ./log/kni/kni.log log_level = 10 -tfe_count = 1 +tfe_node_count = 1 local_eth = enp8s0 tfe_data_recv_thread_num = 8 #keepalive_replay: window update replay @@ -27,22 +27,22 @@ dev_vxlan_symbol = vxlan_user src_mac_addr = 00:0e:c6:d6:72:c1 [tfe0] +enabled = 1 mac_addr = fe:65:b7:03:50:bd dev_eth_symbol = ens1f5 ip_addr = -keepalive_listen_port = 2476 [tfe1] +enabled = 1 mac_addr = fe:65:b7:03:50:bd dev_eth_symbol = eth8 ip_addr = -keepalive_listen_port = 2477 [tfe2] +enabled = 1 mac_addr = fe:65:b7:03:50:bd dev_eth_symbol = eth9 ip_addr = -keepalive_listen_port = 2478 [field_stat] stat_path = ./fs2_kni.status @@ -85,4 +85,5 @@ keepalive_switch = 1 keepalive_idle = 2 keepalive_intvl = 1 keepalive_cnt = 3 -keepalive_listen_eth = \ No newline at end of file +keepalive_listen_eth = +keepalive_listen_port = 2476 \ No newline at end of file diff --git a/entry/src/kni_entry.cpp b/entry/src/kni_entry.cpp index 13e116e..7c13630 100644 --- a/entry/src/kni_entry.cpp +++ b/entry/src/kni_entry.cpp @@ -38,7 +38,7 @@ enum stream_error{ STREAM_ERROR_NO_DATA = -6, STREAM_ERROR_IPV4HDR_PARSE_FAIL = -7, STREAM_ERROR_IPV6HDR_PARSE_FAIL = -8, - STREAM_ERROR_DUP_STREAM = -9, + STREAM_ERROR_KA_REPLAY_ADD_FAIL = -9, STREAM_ERROR_EXCEED_MTU = -10, STREAM_ERROR_SENDTO_TFE_FAIL = -11, }; @@ -103,7 +103,8 @@ struct tcp_option_restore{ uint16_t offset; }; -struct tfe_instance{ +struct tfe_enabled_node{ + int tfe_id; struct mr_vdev *dev_eth_handler; struct mr_sendpath *dev_eth_sendpath; char mac_addr[6]; @@ -111,7 +112,8 @@ struct tfe_instance{ struct kni_marsio_handle{ struct mr_instance *instance; - struct tfe_instance *tfe_instance_list[TFE_COUNT_MAX]; + int tfe_enabled_node_count; + struct tfe_enabled_node tfe_enabled_nodes[TFE_COUNT_MAX]; struct mr_vdev *dev_vxlan_handler; struct mr_sendpath *dev_vxlan_sendpath; char src_mac_addr[6]; @@ -142,7 +144,6 @@ struct kni_handle{ struct kni_send_logger *send_logger; MESA_htable_handle traceid2pme_htable; MESA_htable_handle keepalive_replay_htable; - int tfe_count; int tfe_data_recv_thread_num; uint32_t local_ipv4; int keepalive_replay_switch; @@ -187,6 +188,8 @@ static char* stream_errmsg_get(enum stream_error _errno){ return (char*)"ipv4 header parse fail"; case STREAM_ERROR_IPV6HDR_PARSE_FAIL: return (char*)"ipv6 header parse fail"; + case STREAM_ERROR_KA_REPLAY_ADD_FAIL: + return (char*)"keepalive_replay_add_fail"; case STREAM_ERROR_EXCEED_MTU: return (char*)"exceed mtu(1500)"; case STREAM_ERROR_SENDTO_TFE_FAIL: @@ -369,6 +372,67 @@ error_out: return -1; } +static void keepalive_replay_htable_del(struct pme_info *pmeinfo){ + void *logger = g_kni_handle->local_logger; + int key_size = 0, ret; + char stream_addr[KNI_SYMBOL_MAX] = ""; + kni_stream_addr_trans((const layer_addr*)(pmeinfo->addr), pmeinfo->addr_type, stream_addr, sizeof(stream_addr)); + //c2s + struct stream_tuple4_v4 *c2s_key_v4 = NULL; + struct stream_tuple4_v6 *c2s_key_v6 = NULL; + key_size = 0; + if(pmeinfo->addr_type == ADDR_TYPE_IPV6){ + c2s_key_v6 = pmeinfo->addr->tuple4_v6; + key_size = sizeof(struct stream_tuple4_v6); + ret = MESA_htable_del(g_kni_handle->keepalive_replay_htable, (const unsigned char*)c2s_key_v6, key_size, NULL); + } + else{ + c2s_key_v4 = pmeinfo->addr->tuple4_v4; + key_size = sizeof(struct stream_tuple4_v4); + ret = MESA_htable_del(g_kni_handle->keepalive_replay_htable, (const unsigned char*)c2s_key_v4, key_size, NULL); + } + if(ret < 0){ + KNI_LOG_ERROR(logger, "MESA_htable: Failed at del, table is %s, stream addr is %s, dir is c2s, ret is %d", + "keepalive_replay_htable", stream_addr, ret); + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_DEL_FAIL], 0, FS_OP_ADD, 1); + } + else{ + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_DEL_SUCC], 0, FS_OP_ADD, 1); + //KNI_LOG_DEBUG(logger, "MESA_htable: Succeed at del, table is %s, stream addr is %s, dir is c2s", + // "keepalive_replay_htable", stream_addr); + } + //s2c + if(pmeinfo->addr_type == ADDR_TYPE_IPV6){ + struct stream_tuple4_v6 s2c_key_v6; + memcpy(s2c_key_v6.saddr, c2s_key_v6->daddr, sizeof(s2c_key_v6.saddr)); + memcpy(s2c_key_v6.daddr, c2s_key_v6->saddr, sizeof(s2c_key_v6.daddr)); + s2c_key_v6.source = c2s_key_v6->dest; + s2c_key_v6.dest = c2s_key_v6->source; + key_size = sizeof(struct stream_tuple4_v6); + ret = MESA_htable_del(g_kni_handle->keepalive_replay_htable, (const unsigned char*)&s2c_key_v6, + key_size, NULL); + } + else{ + struct stream_tuple4_v4 s2c_key_v4; + s2c_key_v4.saddr = c2s_key_v4->daddr; + s2c_key_v4.daddr = c2s_key_v4->saddr; + s2c_key_v4.source = c2s_key_v4->dest; + s2c_key_v4.dest = c2s_key_v4->source; + key_size = sizeof(struct stream_tuple4_v4); + ret = MESA_htable_del(g_kni_handle->keepalive_replay_htable, (const unsigned char*)&s2c_key_v4, + key_size, NULL); + } + if(ret < 0){ + KNI_LOG_ERROR(logger, "MESA_htable: Failed at del, table is %s, stream addr is %s, dir is s2c, ret is %d", + "keepalive_replay_htable", stream_addr, ret); + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_DEL_FAIL], 0, FS_OP_ADD, 1); + } + else{ + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_DEL_SUCC], 0, FS_OP_ADD, 1); + //KNI_LOG_DEBUG(logger, "MESA_htable: Succeed at del, table is %s, stream addr is %s, dir is s2c", + // "keepalive_replay_htable", stream_addr); + } +} static void judge_pme_destroy(struct pme_info *pmeinfo, int caller){ void *logger = g_kni_handle->local_logger; if(pmeinfo != NULL){ @@ -412,58 +476,8 @@ static void judge_pme_destroy(struct pme_info *pmeinfo, int caller){ FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC], 0, FS_OP_ADD, 1); } //del keepalive_replay_htable - char stream_addr[KNI_SYMBOL_MAX] = ""; - kni_stream_addr_trans((const layer_addr*)(pmeinfo->addr), pmeinfo->addr_type, stream_addr, sizeof(stream_addr)); - //c2s - struct stream_tuple4_v4 *c2s_key_v4 = NULL; - struct stream_tuple4_v6 *c2s_key_v6 = NULL; - key_size = 0; - if(pmeinfo->addr_type == ADDR_TYPE_IPV6){ - c2s_key_v6 = pmeinfo->addr->tuple4_v6; - key_size = sizeof(struct stream_tuple4_v6); - ret = MESA_htable_del(g_kni_handle->keepalive_replay_htable, (const unsigned char*)c2s_key_v6, key_size, NULL); - } - else{ - c2s_key_v4 = pmeinfo->addr->tuple4_v4; - key_size = sizeof(struct stream_tuple4_v4); - ret = MESA_htable_del(g_kni_handle->keepalive_replay_htable, (const unsigned char*)c2s_key_v4, key_size, NULL); - } - if(ret < 0){ - KNI_LOG_ERROR(logger, "MESA_htable: Failed at del, table is %s, stream addr is %s, dir is c2s, ret is %d", - "keepalive_replay_htable", stream_addr, ret); - } - else{ - //KNI_LOG_DEBUG(logger, "MESA_htable: Succeed at del, table is %s, stream addr is %s, dir is c2s", - // "keepalive_replay_htable", stream_addr); - } - //s2c - if(pmeinfo->addr_type == ADDR_TYPE_IPV6){ - struct stream_tuple4_v6 s2c_key_v6; - memcpy(s2c_key_v6.saddr, c2s_key_v6->daddr, sizeof(s2c_key_v6.saddr)); - memcpy(s2c_key_v6.daddr, c2s_key_v6->saddr, sizeof(s2c_key_v6.daddr)); - s2c_key_v6.source = c2s_key_v6->dest; - s2c_key_v6.dest = c2s_key_v6->source; - key_size = sizeof(struct stream_tuple4_v6); - ret = MESA_htable_del(g_kni_handle->keepalive_replay_htable, (const unsigned char*)&s2c_key_v6, - key_size, NULL); - } - else{ - struct stream_tuple4_v4 s2c_key_v4; - s2c_key_v4.saddr = c2s_key_v4->daddr; - s2c_key_v4.daddr = c2s_key_v4->saddr; - s2c_key_v4.source = c2s_key_v4->dest; - s2c_key_v4.dest = c2s_key_v4->source; - key_size = sizeof(struct stream_tuple4_v4); - ret = MESA_htable_del(g_kni_handle->keepalive_replay_htable, (const unsigned char*)&s2c_key_v4, - key_size, NULL); - } - if(ret < 0){ - KNI_LOG_ERROR(logger, "MESA_htable: Failed at del, table is %s, stream addr is %s, dir is s2c, ret is %d", - "keepalive_replay_htable", stream_addr, ret); - } - else{ - //KNI_LOG_DEBUG(logger, "MESA_htable: Succeed at del, table is %s, stream addr is %s, dir is s2c", - // "keepalive_replay_htable", stream_addr); + if(g_kni_handle->keepalive_replay_switch == 1){ + keepalive_replay_htable_del(pmeinfo); } } //free pme @@ -660,10 +674,21 @@ static char* add_cmsg_to_packet(struct pme_info *pmeinfo, struct pkt_info *pktin static int send_to_tfe(struct kni_marsio_handle *handle, char *raw_data, uint16_t raw_len, int thread_seq, int tfe_id, addr_type_t addr_type){ void *logger = g_kni_handle->local_logger; marsio_buff_t *tx_buffs[BURST_MAX]; - struct mr_vdev *dev_eth_handler = handle->tfe_instance_list[tfe_id]->dev_eth_handler; - struct mr_sendpath *dev_eth_sendpath = handle->tfe_instance_list[tfe_id]->dev_eth_sendpath; + int index = -1; + for(int i = 0; i < handle->tfe_enabled_node_count; i++){ + if(handle->tfe_enabled_nodes[i].tfe_id == tfe_id){ + index = i; + break; + } + } + if(index == -1){ + KNI_LOG_ERROR(logger, "tfd %d is disabled"); + return -1; + } + struct mr_vdev *dev_eth_handler = handle->tfe_enabled_nodes[index].dev_eth_handler; + struct mr_sendpath *dev_eth_sendpath = handle->tfe_enabled_nodes[index].dev_eth_sendpath; char *src_mac = handle->src_mac_addr; - char *dst_mac = handle->tfe_instance_list[tfe_id]->mac_addr; + char *dst_mac = handle->tfe_enabled_nodes[index].mac_addr; //only send one packet, alloc_ret <= nr_send <= BURST_MAX int nr_send = 1; int alloc_ret = marsio_buff_malloc_device(dev_eth_handler, tx_buffs, nr_send, 0, thread_seq); @@ -696,7 +721,7 @@ static int wrapped_kni_header_parse(const void *a_packet, struct pme_info *pmein int ret = kni_ipv6_header_parse(a_packet, pktinfo); if(ret < 0){ char *errmsg = kni_ipv6_errmsg_get((enum kni_ipv6hdr_parse_error)ret); - KNI_LOG_ERROR(logger, "Failed at parse ipv6 header, bypass and dropme, errmsg is %s, stream treaceid is %s", + KNI_LOG_ERROR(logger, "Failed at parse ipv6 header, errmsg is %s, stream treaceid is %s", errmsg, pmeinfo->stream_traceid); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV6HDR_PARSE_FAIL], 0, FS_OP_ADD, 1); pmeinfo->error = STREAM_ERROR_IPV6HDR_PARSE_FAIL; @@ -707,7 +732,7 @@ static int wrapped_kni_header_parse(const void *a_packet, struct pme_info *pmein int ret = kni_ipv4_header_parse(a_packet, pktinfo); if(ret < 0){ char *errmsg = kni_ipv4_errmsg_get((enum kni_ipv4hdr_parse_error)ret); - KNI_LOG_ERROR(logger, "Failed at parse ipv4 header, bypass and dropme, errmsg is %s, stream treaceid is %s", + KNI_LOG_ERROR(logger, "Failed at parse ipv4 header, errmsg is %s, stream treaceid is %s", errmsg, pmeinfo->stream_traceid); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV4HDR_PARSE_FAIL], 0, FS_OP_ADD, 1); pmeinfo->error = STREAM_ERROR_IPV4HDR_PARSE_FAIL; @@ -736,11 +761,11 @@ static char pending_opstate(const struct streaminfo *stream, struct pme_info *pm return APP_STATE_FAWPKT | APP_STATE_GIVEME; } -static int first_data_intercept(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, char *stream_addr, int thread_seq){ +int keepalive_replay_htable_add(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, char *stream_addr, int *sapp_ret){ void *logger = g_kni_handle->local_logger; + int key_size =0, ret; struct keepalive_replay_htable_value *c2s_value = ALLOC(struct keepalive_replay_htable_value, 1); c2s_value->first_data_len = pktinfo->data_len; - int key_size = 0, ret; struct stream_tuple4_v4 *c2s_key_v4 = NULL; struct stream_tuple4_v6 *c2s_key_v6 = NULL; if(pmeinfo->addr_type == ADDR_TYPE_IPV6){ @@ -758,10 +783,11 @@ static int first_data_intercept(const struct streaminfo *stream, struct pme_info if(ret != MESA_HTABLE_RET_DUP_ITEM){ KNI_LOG_ERROR(logger, "MESA_htable: Failed at add, table is keepalive_replay_htable, " "dir is c2s, key is %s, key_size is %d, ret is %d", stream_addr, key_size, ret); - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_FAIL], 0, FS_OP_ADD, 1); } - pmeinfo->error = STREAM_ERROR_DUP_STREAM; - return APP_STATE_FAWPKT | APP_STATE_DROPME; + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_FAIL], 0, FS_OP_ADD, 1); + pmeinfo->error = STREAM_ERROR_KA_REPLAY_ADD_FAIL; + *sapp_ret = APP_STATE_FAWPKT | APP_STATE_DROPME; + return -1; } else{ //KNI_LOG_DEBUG(logger, "MESA_htable: Succeed at add, table is keepalive_replay_htable, " @@ -800,7 +826,18 @@ static int first_data_intercept(const struct streaminfo *stream, struct pme_info // "dir is s2c, key is %s, key_size is %d", stream_addr, key_size); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_SUCC], 0, FS_OP_ADD, 1); } - + return 0; +} +static int first_data_intercept(const struct streaminfo *stream, struct pme_info *pmeinfo, struct pkt_info *pktinfo, char *stream_addr, int thread_seq){ + void *logger = g_kni_handle->local_logger; + int key_size =0, ret; + if(g_kni_handle->keepalive_replay_switch == 1){ + int sapp_ret; + ret = keepalive_replay_htable_add(stream, pmeinfo, pktinfo, stream_addr, &sapp_ret); + if(ret < 0){ + return sapp_ret; + } + } //only intercept: add to traceid2pme htable key_size = strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid)); ret = MESA_htable_add(g_kni_handle->traceid2pme_htable, (const unsigned char *)(pmeinfo->stream_traceid), @@ -1019,13 +1056,14 @@ extern "C" char kni_tcpall_entry(const struct streaminfo *stream, void** pme, in KNI_LOG_ERROR(logger, "Failed at new pmeinfo, bypass and dropme"); return APP_STATE_FAWPKT | APP_STATE_DROPME; } + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_NEW], 0, FS_OP_ADD, 1); pmeinfo->tfe_id = tfe_mgr_alive_node_get(g_kni_handle->_tfe_mgr, thread_seq); + printf("tfe_id is %d\n", pmeinfo->tfe_id); if(pmeinfo->tfe_id < 0){ KNI_LOG_ERROR(logger, "No alive tfe available, bypass and dropme"); pme_info_destroy(pmeinfo); return APP_STATE_FAWPKT | APP_STATE_DROPME; } - FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_PME_NEW], 0, FS_OP_ADD, 1); ret = pending_opstate(stream, pmeinfo, a_packet); if(pmeinfo->error < 0){ goto error_out; @@ -1111,9 +1149,6 @@ static void kni_marsio_destroy(struct kni_marsio_handle *handle){ if(handle->instance != NULL){ marsio_destory(handle->instance); } - for(int i = 0; i < TFE_COUNT_MAX; i++){ - FREE(&handle->tfe_instance_list[i]); - } } FREE(&handle); handle = NULL; @@ -1159,6 +1194,7 @@ static long keepalive_replay_search_cb(void *data, const uchar *key, uint size, if(ret < 0){ char *errmsg = kni_ipv6_errmsg_get((enum kni_ipv6hdr_parse_error)ret); KNI_LOG_ERROR(logger, "Failed at parse ipv6 header, send to vxlan, errmsg is %s", errmsg); + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV6HDR_PARSE_FAIL], 0, FS_OP_ADD, 1); sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, thread_seq); return 0; } @@ -1212,81 +1248,87 @@ static long keepalive_replay_search_cb(void *data, const uchar *key, uint size, return 0; } -static void* thread_tfe_data_receiver(void *args){ +void keepalive_replay_htable_search(struct kni_marsio_handle *marsio_handle, marsio_buff_t **rx_buffs, int nr_recv, int tfe_id, int thread_seq){ void *logger = g_kni_handle->local_logger; + for(int i = 0; i < nr_recv; i++){ + struct ethhdr *ether_hdr = (struct ethhdr*)marsio_buff_mtod(rx_buffs[i]); + if(ether_hdr->h_proto == htons(ETH_P_IP) || ether_hdr->h_proto == htons(ETH_P_IPV6)){ + void *raw_packet = (char*)ether_hdr + sizeof(*ether_hdr); + long cb_ret = -1; + keepalive_replay_search_cb_args cb_args; + memset(&cb_args, 0, sizeof(cb_args)); + cb_args.rx_buff = rx_buffs[i]; + cb_args.marsio_handle = marsio_handle; + cb_args.tfe_id = tfe_id; + cb_args.thread_seq = thread_seq; + //ipv4 + if(ether_hdr->h_proto == htons(ETH_P_IP)){ + struct iphdr *iphdr = (struct iphdr*)raw_packet; + uint16_t iphdr_len = iphdr->ihl * 4; + struct tcphdr *tcphdr = (struct tcphdr*)((char*)iphdr + iphdr_len); + struct stream_tuple4_v4 key; + key.saddr = iphdr->saddr; + key.daddr = iphdr->daddr; + key.source = tcphdr->source; + key.dest = tcphdr->dest; + cb_args.addr_type = ADDR_TYPE_IPV4; + cb_args.raw_packet = raw_packet; + MESA_htable_search_cb(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&key), + sizeof(key), keepalive_replay_search_cb, &cb_args, &cb_ret); + } + //ipv6 + else{ + void *a_packet = (char*)ether_hdr + sizeof(*ether_hdr); + struct pkt_info pktinfo; + int ret = kni_ipv6_header_parse(a_packet, &pktinfo); + if(ret < 0){ + char *errmsg = kni_ipv6_errmsg_get((enum kni_ipv6hdr_parse_error)ret); + KNI_LOG_ERROR(logger, "Failed at parse ipv6 header, send to vxlan, errmsg is %s", errmsg); + FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_IPV6HDR_PARSE_FAIL], 0, FS_OP_ADD, 1); + sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, thread_seq); + } + else{ + struct stream_tuple4_v6 key; + memcpy(key.saddr, &(pktinfo.iphdr.v6->ip6_src), sizeof(*(key.saddr))); + memcpy(key.daddr, &(pktinfo.iphdr.v6->ip6_dst), sizeof(*(key.daddr))); + key.source = pktinfo.tcphdr->source; + key.dest = pktinfo.tcphdr->dest; + cb_args.addr_type = ADDR_TYPE_IPV6; + cb_args.raw_packet = raw_packet; + MESA_htable_search_cb(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&key), + sizeof(key), keepalive_replay_search_cb, &cb_args, &cb_ret); + } + } + } + else{ + sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, thread_seq); + } + } +} + +static void* thread_tfe_data_receiver(void *args){ struct thread_tfe_data_receiver_args *_args = (struct thread_tfe_data_receiver_args*)args; struct kni_marsio_handle *marsio_handle = _args->marsio_handle; int thread_seq = _args->thread_seq; FREE(&args); while(true){ //polling tfe - for(int i = 0; i < g_kni_handle->tfe_count; i++){ + for(int i = 0; i < marsio_handle->tfe_enabled_node_count; i++){ marsio_buff_t *rx_buffs[BURST_MAX]; int nr_burst = 1; - struct mr_vdev *dev_eth_handler = marsio_handle->tfe_instance_list[i]->dev_eth_handler; + struct mr_vdev *dev_eth_handler = marsio_handle->tfe_enabled_nodes[i].dev_eth_handler; + int tfe_id = marsio_handle->tfe_enabled_nodes[i].tfe_id; //receive from tfe, nr_recv <= nr_burst <= BURST_MAX int nr_recv = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buffs, nr_burst); if(nr_recv <= 0){ continue; } if(g_kni_handle->keepalive_replay_switch == 1){ - for(int i = 0; i < nr_recv; i++){ - struct ethhdr *ether_hdr = (struct ethhdr*)marsio_buff_mtod(rx_buffs[i]); - if(ether_hdr->h_proto == htons(ETH_P_IP) || ether_hdr->h_proto == htons(ETH_P_IPV6)){ - void *raw_packet = (char*)ether_hdr + sizeof(*ether_hdr); - long cb_ret = -1; - keepalive_replay_search_cb_args cb_args; - memset(&cb_args, 0, sizeof(cb_args)); - cb_args.rx_buff = rx_buffs[i]; - cb_args.marsio_handle = marsio_handle; - cb_args.tfe_id = i; - cb_args.thread_seq = thread_seq; - //ipv4 - if(ether_hdr->h_proto == htons(ETH_P_IP)){ - struct iphdr *iphdr = (struct iphdr*)raw_packet; - uint16_t iphdr_len = iphdr->ihl * 4; - struct tcphdr *tcphdr = (struct tcphdr*)((char*)iphdr + iphdr_len); - struct stream_tuple4_v4 key; - key.saddr = iphdr->saddr; - key.daddr = iphdr->daddr; - key.source = tcphdr->source; - key.dest = tcphdr->dest; - cb_args.addr_type = ADDR_TYPE_IPV4; - cb_args.raw_packet = raw_packet; - MESA_htable_search_cb(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&key), - sizeof(key), keepalive_replay_search_cb, &cb_args, &cb_ret); - } - //ipv6 - else{ - void *a_packet = (char*)ether_hdr + sizeof(*ether_hdr); - struct pkt_info pktinfo; - int ret = kni_ipv6_header_parse(a_packet, &pktinfo); - if(ret < 0){ - char *errmsg = kni_ipv6_errmsg_get((enum kni_ipv6hdr_parse_error)ret); - KNI_LOG_ERROR(logger, "Failed at parse ipv6 header, send to vxlan, errmsg is %s", errmsg); - sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, thread_seq); - } - else{ - struct stream_tuple4_v6 key; - memcpy(key.saddr, &(pktinfo.iphdr.v6->ip6_src), sizeof(*(key.saddr))); - memcpy(key.daddr, &(pktinfo.iphdr.v6->ip6_dst), sizeof(*(key.daddr))); - key.source = pktinfo.tcphdr->source; - key.dest = pktinfo.tcphdr->dest; - cb_args.addr_type = ADDR_TYPE_IPV6; - cb_args.raw_packet = raw_packet; - MESA_htable_search_cb(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&key), - sizeof(key), keepalive_replay_search_cb, &cb_args, &cb_ret); - } - } - } - else{ - sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, thread_seq); - } - } + keepalive_replay_htable_search(marsio_handle, rx_buffs, nr_recv, tfe_id, thread_seq); } else{ - for(int i = 0; i < nr_recv; i++){ - sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, thread_seq); + for(int j = 0; j < nr_recv; j++){ + sendto_vxlan(rx_buffs[j], marsio_handle->dev_vxlan_sendpath, thread_seq); } } } @@ -1454,21 +1496,21 @@ error_out: return NULL; } -static struct kni_marsio_handle* kni_marsio_init(const char* profile){ +static struct kni_marsio_handle* kni_marsio_init(const char* profile, int tfe_node_count){ void *logger = g_kni_handle->local_logger; const char* section = "marsio"; char appsym[KNI_SYMBOL_MAX]; char dev_vxlan_symbol[KNI_SYMBOL_MAX]; char src_mac_addr_str[KNI_SYMBOL_MAX]; unsigned int opt_value = 1; - int tfe_count; + int tfe_node_enabled; struct mr_instance *mr_inst = NULL; struct mr_vdev *dev_vxlan_handler = NULL; struct mr_sendpath *dev_vxlan_sendpath = NULL; struct mr_vdev *dev_eth_handler = NULL; struct mr_sendpath *dev_eth_sendpath = NULL; - struct tfe_instance *tfe_inst = NULL; struct kni_marsio_handle *handle = NULL; + int j; int ret = MESA_load_profile_string_nodef(profile, section, "appsym", appsym, sizeof(appsym)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: appsym not set, profile is %s, section is %s", profile, section); @@ -1503,24 +1545,29 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){ } marsio_option_set(mr_inst, MARSIO_OPT_EXIT_WHEN_ERR, &opt_value, sizeof(opt_value)); marsio_init(mr_inst, appsym); - tfe_count = g_kni_handle->tfe_count; - for(int i = 0; i < tfe_count; i++){ + j = 0; + for(int i = 0; i < tfe_node_count; i++){ //load tfe conf char _section[KNI_SYMBOL_MAX]; char mac_addr_str[KNI_SYMBOL_MAX]; char dev_eth_symbol[KNI_SYMBOL_MAX]; snprintf(_section, sizeof(_section), "tfe%d", i); + MESA_load_profile_int_def(profile, _section, "enabled", &tfe_node_enabled, 1); + if(tfe_node_enabled != 1){ + continue; + } int ret = MESA_load_profile_string_nodef(profile, _section, "mac_addr", mac_addr_str, sizeof(mac_addr_str)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: mac_addr not set, profile is %s, section is %s", profile, _section); goto error_out; } - tfe_inst = ALLOC(struct tfe_instance, 1); + struct tfe_enabled_node tfe_node; + memset(&tfe_node, 0, sizeof(tfe_node)); //ff:ee:dd:cc:bb:aa ---> 0xff 0xee 0xdd 0xcc 0xbb 0xaa ret = sscanf(mac_addr_str, "%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx", - &tfe_inst->mac_addr[0], &tfe_inst->mac_addr[1], - &tfe_inst->mac_addr[2], &tfe_inst->mac_addr[3], - &tfe_inst->mac_addr[4], &tfe_inst->mac_addr[5]); + &(tfe_node.mac_addr[0]), &(tfe_node.mac_addr[1]), + &(tfe_node.mac_addr[2]), &(tfe_node.mac_addr[3]), + &(tfe_node.mac_addr[4]), &(tfe_node.mac_addr[5])); if(ret != 6){ KNI_LOG_ERROR(logger, "MESA_prof_load: mac_addr is invalid, ret is %d, profile is %s, section is %s", ret, profile, _section); goto error_out; @@ -1530,8 +1577,8 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){ KNI_LOG_ERROR(logger, "MESA_prof_load: dev_eth_symbol not set, profile is %s, section is %s", profile, _section); goto error_out; } - KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n mac_addr: %s\n dev_eth_symbol: %s", - _section, mac_addr_str, dev_eth_symbol); + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n enabled: %d, mac_addr: %s\n dev_eth_symbol: %s", + _section, tfe_node_enabled, mac_addr_str, dev_eth_symbol); //eth_handler receive thread = tfe_data_recv_thread_num, send thread = g_iThreadNum + tfe_data_recv_thread_num dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, g_kni_handle->tfe_data_recv_thread_num, g_iThreadNum + g_kni_handle->tfe_data_recv_thread_num); if(dev_eth_handler == NULL){ @@ -1544,12 +1591,14 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){ KNI_LOG_ERROR(logger, "Failed at create marsio sendpath, dev_symbol is %s", dev_eth_symbol); goto error_out; } - //tfe_instance - tfe_inst->dev_eth_handler = dev_eth_handler; - tfe_inst->dev_eth_sendpath = dev_eth_sendpath; - handle->tfe_instance_list[i] = tfe_inst; + //tfe_node + tfe_node.dev_eth_handler = dev_eth_handler; + tfe_node.dev_eth_sendpath = dev_eth_sendpath; + tfe_node.tfe_id = i; + handle->tfe_enabled_nodes[j++] = tfe_node; } - //vxlan_handler: receive: 0, send: tfe_count + handle->tfe_enabled_node_count = j; + //vxlan_handler: receive: 0, send: tfe_data_recv_thread_num dev_vxlan_handler = marsio_open_device(mr_inst, dev_vxlan_symbol, 0, g_kni_handle->tfe_data_recv_thread_num); if(dev_vxlan_handler == NULL){ KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_vxlan_symbol); @@ -1636,15 +1685,18 @@ static struct kni_field_stat_handle * fs_init(const char *profile){ fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_add_fail"); fs_handle->fields[KNI_FIELD_ID2PME_DEL_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_del_succ"); fs_handle->fields[KNI_FIELD_ID2PME_DEL_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "id2pme_del_fail"); - fs_handle->fields[KNI_FIELD_IPV4HDR_PARSE_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ipv4hdr_parse_fail"); - fs_handle->fields[KNI_FIELD_IPV6HDR_PARSE_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ipv6hdr_parse_fail"); - fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "kaReplay_add_fail"); - fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "kaReplay_add_succ"); + fs_handle->fields[KNI_FIELD_IPV4HDR_PARSE_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "v4_parse_fail"); + fs_handle->fields[KNI_FIELD_IPV6HDR_PARSE_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "v6_parse_fail"); + fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ka_add_fail"); + fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_ADD_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ka_add_succ"); + fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_DEL_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ka_del_fail"); + fs_handle->fields[KNI_FIELD_KEEPALIVE_REPLAY_DEL_SUCC] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "ka_del_succ"); fs_handle->fields[KNI_FIELD_EXCEED_MTU] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "exceed_mtu"); fs_handle->fields[KNI_FIELD_SENDTO_TFE_FAIL] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "sendto_tfe_fail"); - for(int i = 0; i < g_kni_handle->tfe_count; i++){ + for(int i = 0; i < g_kni_handle->marsio_handle->tfe_enabled_node_count; i++){ + int tfe_id = g_kni_handle->marsio_handle->tfe_enabled_nodes[i].tfe_id; char tfe_status[KNI_SYMBOL_MAX] = ""; - snprintf(tfe_status, sizeof(tfe_status), "tfe%d", i); + snprintf(tfe_status, sizeof(tfe_status), "tfe%d", tfe_id); fs_handle->fields[KNI_FIELD_TFE_STATUS_BASE + i] = FS_register(handle, FS_STYLE_FIELD, FS_CALC_CURRENT, tfe_status); } fs_handle->handle = handle; @@ -1686,7 +1738,7 @@ extern "C" int kni_init(){ const char *section = "global"; //init logger char log_path[KNI_PATH_MAX] = ""; - int tfe_count = 0; + int tfe_node_count = 0; int tfe_data_recv_thread_num = -1; char local_eth[KNI_SYMBOL_MAX] = ""; struct kni_send_logger *send_logger = NULL; @@ -1714,18 +1766,18 @@ extern "C" int kni_init(){ printf("Failed at create logger: %s", log_path); goto error_out; } - ret = MESA_load_profile_int_nodef(profile, section, "tfe_count", &tfe_count); + ret = MESA_load_profile_int_nodef(profile, section, "tfe_node_count", &tfe_node_count); if(ret < 0){ - KNI_LOG_ERROR(local_logger, "MESA_prof_load: tfe_count not set, profile is %s, section is %s", profile, section); + KNI_LOG_ERROR(local_logger, "MESA_prof_load: tfe_node_count not set, profile is %s, section is %s", profile, section); goto error_out; } - if(tfe_count > TFE_COUNT_MAX){ - KNI_LOG_ERROR(local_logger, "tfe_count is %d, exceed the max_tfe_count %d", tfe_count, TFE_COUNT_MAX); + if(tfe_node_count > TFE_COUNT_MAX){ + KNI_LOG_ERROR(local_logger, "tfe_node_count is %d, exceed the max_tfe_node_count %d", tfe_node_count, TFE_COUNT_MAX); goto error_out; } - if(tfe_count <= 0){ - KNI_LOG_ERROR(local_logger, "tfe_count is %d, <= 0", tfe_count); + if(tfe_node_count <= 0){ + KNI_LOG_ERROR(local_logger, "tfe_node_count is %d, <= 0", tfe_node_count); goto error_out; } ret = MESA_load_profile_int_def(profile, section, "tfe_data_recv_thread_num", &tfe_data_recv_thread_num, 1); @@ -1735,12 +1787,11 @@ extern "C" int kni_init(){ goto error_out; } ret = MESA_load_profile_int_def(profile, section, "keepalive_replay_switch", &keepalive_replay_switch, 1); - KNI_LOG_ERROR(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_count: %d\n" + KNI_LOG_ERROR(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_node_count: %d\n" "tfe_data_recv_thread_num: %d\n local_eth: %s\n keepalive_replay_switch: %d", - section, log_path, log_level, tfe_count, tfe_data_recv_thread_num, local_eth, keepalive_replay_switch); + section, log_path, log_level, tfe_node_count, tfe_data_recv_thread_num, local_eth, keepalive_replay_switch); g_kni_handle = ALLOC(struct kni_handle, 1); g_kni_handle->local_logger = local_logger; - g_kni_handle->tfe_count = tfe_count; g_kni_handle->tfe_data_recv_thread_num = tfe_data_recv_thread_num; g_kni_handle->keepalive_replay_switch = keepalive_replay_switch; @@ -1753,7 +1804,7 @@ extern "C" int kni_init(){ g_kni_handle->http_project_id = id; //init marsio - g_kni_handle->marsio_handle = kni_marsio_init(profile); + g_kni_handle->marsio_handle = kni_marsio_init(profile, tfe_node_count); if(g_kni_handle->marsio_handle == NULL){ KNI_LOG_ERROR(local_logger, "Failed at init marsio"); goto error_out; @@ -1810,7 +1861,7 @@ extern "C" int kni_init(){ } //init tfe_mgr - _tfe_mgr = tfe_mgr_init(tfe_count, profile, local_logger); + _tfe_mgr = tfe_mgr_init(tfe_node_count, profile, local_logger); if(_tfe_mgr == NULL){ KNI_LOG_ERROR(local_logger, "Failed at init tfe_mgr"); goto error_out; diff --git a/entry/src/tfe_mgr.cpp b/entry/src/tfe_mgr.cpp index 5c7f815..4755c4d 100644 --- a/entry/src/tfe_mgr.cpp +++ b/entry/src/tfe_mgr.cpp @@ -7,23 +7,35 @@ extern struct kni_field_stat_handle *g_kni_fs_handle; +struct tfe_node{ + int tfe_id; + uint32_t ipaddr; +}; + struct tfe_mgr{ pthread_rwlock_t rwlock; - int tfe_alive_node_list[TFE_COUNT_MAX]; + struct tfe_node tfe_enabled_nodes[TFE_COUNT_MAX]; + int tfe_enabled_node_count; + int tfe_alive_nodes[TFE_COUNT_MAX]; int tfe_alive_node_count; - int tfe_node_count; - int keepalive_switch; + int keepalive_switch; void *logger; }; -struct thread_tfe_keepalive_args{ +struct thread_tfe_keepalive_accept_args{ struct tfe_mgr* mgr; int sockfd; int keepalive_idle; int keepalive_intvl; int keepalive_cnt; + void *logger; +}; + +struct thread_tfe_keepalive_recv_args{ + struct tfe_mgr* mgr; + int client_fd; int tfe_id; - uint32_t tfe_ipaddr; + void *logger; }; enum tfe_mgr_errno{ @@ -49,7 +61,7 @@ static int tfe_mgr_alive_node_del(struct tfe_mgr *mgr, int tfe_id){ pthread_rwlock_wrlock(&(mgr->rwlock)); int i, ret; for(i = 0; i < mgr->tfe_alive_node_count; i++){ - if(mgr->tfe_alive_node_list[i] == tfe_id){ + if(mgr->tfe_alive_nodes[i] == tfe_id){ break; } } @@ -58,7 +70,7 @@ static int tfe_mgr_alive_node_del(struct tfe_mgr *mgr, int tfe_id){ goto out; } for(int j = i; j < mgr->tfe_alive_node_count - 1; j++){ - mgr->tfe_alive_node_list[j] = mgr->tfe_alive_node_list[j + 1]; + mgr->tfe_alive_nodes[j] = mgr->tfe_alive_nodes[j + 1]; } mgr->tfe_alive_node_count--; ret = 0; @@ -74,7 +86,7 @@ static int tfe_mgr_alive_node_add(struct tfe_mgr *mgr, int tfe_id){ pthread_rwlock_wrlock(&(mgr->rwlock)); int ret; for(int i = 0; i < mgr->tfe_alive_node_count; i++){ - if(mgr->tfe_alive_node_list[i] == tfe_id){ + if(mgr->tfe_alive_nodes[i] == tfe_id){ ret = TFE_MGR_HAS_EXISTED; goto out; } @@ -83,7 +95,7 @@ static int tfe_mgr_alive_node_add(struct tfe_mgr *mgr, int tfe_id){ ret = TFE_MGR_EXCEED_MAX_COUNT; goto out; } - mgr->tfe_alive_node_list[mgr->tfe_alive_node_count] = tfe_id; + mgr->tfe_alive_nodes[mgr->tfe_alive_node_count] = tfe_id; mgr->tfe_alive_node_count++; ret = 0; FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TFE_STATUS_BASE + tfe_id], 0, FS_OP_SET, 1); @@ -94,16 +106,61 @@ out: return ret; } +static void* thread_tfe_keepalive_recv(void *args){ + struct thread_tfe_keepalive_recv_args *_args = (struct thread_tfe_keepalive_recv_args*)args; + struct tfe_mgr *mgr = _args->mgr; + int client_fd = _args->client_fd; + int tfe_id = _args->tfe_id; + void *logger = _args->logger; + FREE(&args); + char buff[BUFF_SIZE_MAX]; + char *errmsg = NULL; + int ret; + while(true){ + ret = recv(client_fd, buff, sizeof(buff), 0); + if(ret == 0){ + KNI_LOG_ERROR(logger, "recv fin, del tfe alive node, tfe_id is %d", tfe_id); + break; + } + if(ret <= 0){ + if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK){ + KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, continue recv, tfe_id is %d", errno, strerror(errno), tfe_id); + continue; + } + if(errno == ETIMEDOUT){ + KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, del tfe alive node, tfe_id is %d", errno, strerror(errno), tfe_id); + break; + } + KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, error_out, tfe_id is %d", errno, strerror(errno), tfe_id); + goto error_out; + } + } + //recv fin: del alive node + close(client_fd); + ret = tfe_mgr_alive_node_del(mgr, tfe_id); + if(ret < 0){ + errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret); + KNI_LOG_ERROR(logger, "Failed at del tfe alive node, tfe_id is %d, errmsg is %s", tfe_id, errmsg); + } + else{ + KNI_LOG_ERROR(logger, "Succeed at del tfe alive node, tfe_id is %d", tfe_id); + } + return NULL; -static void* thread_tfe_keepalive(void *args){ - struct thread_tfe_keepalive_args *_args = (struct thread_tfe_keepalive_args*)args; +error_out: + if(client_fd > 0){ + close(client_fd); + } + return NULL; +} + +static void* thread_tfe_keepalive_accept(void *args){ + struct thread_tfe_keepalive_accept_args *_args = (struct thread_tfe_keepalive_accept_args*)args; struct tfe_mgr *mgr = _args->mgr; int sockfd = _args->sockfd; int keepalive_idle = _args->keepalive_idle; int keepalive_intvl = _args->keepalive_intvl; int keepalive_cnt = _args->keepalive_cnt; - int tfe_id = _args->tfe_id; - uint32_t tfe_ipaddr = _args->tfe_ipaddr; void *logger = mgr->logger; FREE(&args); //accept @@ -112,7 +169,8 @@ static void* thread_tfe_keepalive(void *args){ uint32_t client_ipaddr; char client_ipaddr_str[INET_ADDRSTRLEN] = ""; int flags, ret, client_fd; - char buff[BUFF_SIZE_MAX]; + pthread_t thread_id = -1; + int tfe_id = -1; char *errmsg = NULL; while(true){ client_fd = accept(sockfd, (struct sockaddr *)&client_addr, &client_addr_len); @@ -122,8 +180,14 @@ static void* thread_tfe_keepalive(void *args){ } client_ipaddr = client_addr.sin_addr.s_addr; inet_ntop(AF_INET, &client_ipaddr, client_ipaddr_str, INET_ADDRSTRLEN); - if(client_ipaddr != tfe_ipaddr){ - KNI_LOG_ERROR(logger, "Receive connection not from tfe%d, client addr is %s", tfe_id, client_ipaddr); + for(int i = 0; i < mgr->tfe_enabled_node_count; i++){ + if(client_ipaddr == mgr->tfe_enabled_nodes[i].ipaddr){ + tfe_id = i; + break; + } + } + if(tfe_id == -1){ + KNI_LOG_ERROR(logger, "Receive connection not from tfe, client addr is %s", client_ipaddr_str); close(client_fd); continue; } @@ -132,69 +196,54 @@ static void* thread_tfe_keepalive(void *args){ ret = setsockopt(client_fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at set socketopt SO_KEEPALIVE, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); + close(client_fd); + continue; } ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPIDLE, (void *)&keepalive_idle, sizeof(keepalive_idle)); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPIDLE, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); + close(client_fd); + continue; } ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPINTVL, (void *)&keepalive_intvl, sizeof(keepalive_intvl)); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPINTVL, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); + close(client_fd); + continue; } ret = setsockopt(client_fd, SOL_TCP, TCP_KEEPCNT, (void *)&keepalive_cnt, sizeof(keepalive_cnt)); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at set socketopt TCP_KEEPCNT, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); + close(client_fd); + continue; } - //succeed accpt: add alive node + //add alive node ret = tfe_mgr_alive_node_add(mgr, tfe_id); if(ret < 0){ errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret); KNI_LOG_ERROR(logger, "Failed at add tfe alive node, tfe_id is %d, errmsg is %s", tfe_id, errmsg); + close(client_fd); + continue; } else{ KNI_LOG_ERROR(logger, "Succeed at add tfe alive node, tfe_id is %d", tfe_id); } - while(true){ - ret = recv(client_fd, buff, sizeof(buff), 0); - if(ret == 0){ - KNI_LOG_ERROR(logger, "recv fin, del tfe alive node, tfe_id is %d", tfe_id); - break; - } - if(ret <= 0){ - if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK){ - KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, continue recv, tfe_id is %d", errno, strerror(errno), tfe_id); - continue; - } - if(errno == ETIMEDOUT){ - KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, del tfe alive node, tfe_id is %d", errno, strerror(errno), tfe_id); - break; - } - KNI_LOG_ERROR(logger, "recv error, errno is %d, errmsg is %s, error_out, tfe_id is %d", errno, strerror(errno), tfe_id); - goto error_out; - } - } - //recv fin: del alive node - close(client_fd); - ret = tfe_mgr_alive_node_del(mgr, tfe_id); - if(ret < 0){ - errmsg = tfe_mgr_errmsg_get((enum tfe_mgr_errno)ret); - KNI_LOG_ERROR(logger, "Failed at del tfe alive node, tfe_id is %d, errmsg is %s", tfe_id, errmsg); - } - else{ - KNI_LOG_ERROR(logger, "Succeed at del tfe alive node, tfe_id is %d", tfe_id); - } + //create thread_tfe_keepalive_recv + struct thread_tfe_keepalive_recv_args *recv_args = ALLOC(struct thread_tfe_keepalive_recv_args, 1); + recv_args->mgr = mgr; + recv_args->client_fd = client_fd; + recv_args->tfe_id = tfe_id; + recv_args->logger = logger; + ret = pthread_create(&thread_id, NULL, thread_tfe_keepalive_recv, (void *)recv_args); + if(unlikely(ret != 0)){ + KNI_LOG_ERROR(logger, "Failed at thread_tfe_keepalive_recv, thread_func is thread_tfe_keepalive_recv, errno is %d, errmsg is %s", errno, strerror(errno)); + FREE(&recv_args); + close(client_fd); + tfe_mgr_alive_node_del(mgr, tfe_id); + continue; + } } return NULL; - -error_out: - if(sockfd > 0){ - close(sockfd); - } - if(client_fd > 0){ - close(client_fd); - } - KNI_LOG_ERROR(logger, "thread_tfe_keepalive exited, tfe_id is %d", tfe_id); - return NULL; } void tfe_mgr_destroy(struct tfe_mgr* mgr){ @@ -204,20 +253,20 @@ void tfe_mgr_destroy(struct tfe_mgr* mgr){ } } -static int get_binded_sockfd(int tfe_id, uint32_t listen_ip, uint16_t listen_port, void *logger){ +static int get_binded_sockfd(uint32_t listen_ip, uint16_t listen_port, void *logger){ //create socket struct sockaddr_in server_addr; int ret; int sockfd = socket(AF_INET, SOCK_STREAM, 0); int flag; if(sockfd < 0){ - KNI_LOG_ERROR(logger, "Failed at create tcp socket, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); + KNI_LOG_ERROR(logger, "Failed at create tcp socket, errno is %d, errmsg is %s", errno, strerror(errno)); goto error_out; } flag = 1; ret = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flag, sizeof(flag)); if(ret < 0){ - KNI_LOG_ERROR(logger, "Failed at set socketopt SO_REUSEADDR, errno is %d, errmsg is %s, tfe_id is %d", errno, strerror(errno), tfe_id); + KNI_LOG_ERROR(logger, "Failed at set socketopt SO_REUSEADDR, errno is %d, errmsg is %s", errno, strerror(errno)); } //bind memset(&server_addr, 0, sizeof(server_addr)); @@ -226,15 +275,15 @@ static int get_binded_sockfd(int tfe_id, uint32_t listen_ip, uint16_t listen_por server_addr.sin_port = htons(listen_port); ret = bind(sockfd, (const struct sockaddr *)&server_addr, sizeof(server_addr)); if(ret < 0){ - KNI_LOG_ERROR(logger, "Failed at bind tcp socket, port is %d, errno is %d, errmsg is %s, tfe_id is %d", - listen_port, errno, strerror(errno), tfe_id); + KNI_LOG_ERROR(logger, "Failed at bind tcp socket, port is %d, errno is %d, errmsg is %s", + listen_port, errno, strerror(errno)); goto error_out; } //listen ret = listen(sockfd, 5); if(ret < 0){ - KNI_LOG_ERROR(logger, "Failed at listen tcp socket, errno is %d, errmsg is %s, tfe_id is %d, listen_port is %d", - errno, strerror(errno), tfe_id, listen_port); + KNI_LOG_ERROR(logger, "Failed at listen tcp socket, errno is %d, errmsg is %s, listen_port is %d", + errno, strerror(errno), listen_port); goto error_out; } return sockfd; @@ -249,7 +298,6 @@ error_out: struct tfe_mgr* tfe_mgr_init(int tfe_node_count, const char* profile, void *logger){ struct tfe_mgr* mgr = ALLOC(struct tfe_mgr, 1); mgr->logger = logger; - mgr->tfe_node_count = tfe_node_count; int ret; //load keepalive conf char section[KNI_SYMBOL_MAX] = "tfe_mgr"; @@ -262,9 +310,13 @@ struct tfe_mgr* tfe_mgr_init(int tfe_node_count, const char* profile, void *logg char keepalive_listen_eth[KNI_SYMBOL_MAX] = ""; uint32_t keepalive_listen_ip; int keepalive_listen_port; - uint32_t tfe_ipaddr; + uint32_t tfe_node_ipaddr; char tfe_ipaddr_str[INET_ADDRSTRLEN]; pthread_t thread_id = -1; + int sockfd; + int tfe_node_enabled; + int j; + struct thread_tfe_keepalive_accept_args *args = NULL; MESA_load_profile_int_def(profile, section, "keepalive_idle", &keepalive_idle, 2); MESA_load_profile_int_def(profile, section, "keepalive_intvl", &keepalive_intvl, 1); MESA_load_profile_int_def(profile, section, "keepalive_cnt", &keepalive_cnt, 3); @@ -273,58 +325,68 @@ struct tfe_mgr* tfe_mgr_init(int tfe_node_count, const char* profile, void *logg KNI_LOG_ERROR(logger, "MESA_prof_load: keepalive_listen_eth not set, profile is %s, section is %s", profile, section); goto error_out; } - KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n keepalive_idle: %d\n keepalive_intvl: %d\n keepalive_cnt: %d\n keepalive_listen_eth: %s", - section, keepalive_idle, keepalive_intvl, keepalive_cnt, keepalive_listen_eth); + ret = MESA_load_profile_int_nodef(profile, section, "keepalive_listen_port", &keepalive_listen_port); + if(ret < 0){ + KNI_LOG_ERROR(logger, "MESA_prof_load: keepalive_listen_port not set, profile is %s, section is %s", profile, section); + goto error_out; + } + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n keepalive_idle: %d\n keepalive_intvl: %d\n keepalive_cnt: %d\n keepalive_listen_eth: %s\n keepalive_listen_port: %d", + section, keepalive_idle, keepalive_intvl, keepalive_cnt, keepalive_listen_eth, keepalive_listen_port); ret = kni_ipv4_addr_get_by_eth(keepalive_listen_eth, &keepalive_listen_ip); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at get bind ipv4 addr, eth is %s", keepalive_listen_eth); goto error_out; } - //rw_lock - ret = pthread_rwlock_init(&(mgr->rwlock), NULL); - if(ret < 0){ - KNI_LOG_ERROR(logger, "Failed at pthread_rwlock_init, errno is %d, errmsg is %s", errno, strerror(errno)); - goto error_out; - } - //create tfe_keepalive thread + //load tfe_ipaddr + j = 0; for(int i = 0; i < tfe_node_count; i++){ snprintf(section, sizeof(section), "tfe%d", i); - ret = MESA_load_profile_int_nodef(profile, section, "keepalive_listen_port", &keepalive_listen_port); - if(ret < 0){ - KNI_LOG_ERROR(logger, "MESA_prof_load: keepalive_listen_port not set, profile is %s, section is %s", profile, section); - goto error_out; - } + MESA_load_profile_int_def(profile, section, "enabled", &tfe_node_enabled, 1); + if(tfe_node_enabled != 1){ + continue; + } ret = MESA_load_profile_string_nodef(profile, section, "ip_addr", tfe_ipaddr_str, sizeof(tfe_ipaddr_str)); if(ret < 0){ KNI_LOG_ERROR(logger, "MESA_prof_load: ip_addr not set, profile is %s, section is %s", profile, section); goto error_out; } - KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n keepalive_listen_port: %d\n ip_addr: %s", section, keepalive_listen_port, tfe_ipaddr_str); - ret = inet_pton(AF_INET, tfe_ipaddr_str, &tfe_ipaddr); + KNI_LOG_ERROR(logger, "MESA_prof_load, [%s]:\n ip_addr: %s", section, tfe_ipaddr_str); + ret = inet_pton(AF_INET, tfe_ipaddr_str, &tfe_node_ipaddr); if(ret != 1){ KNI_LOG_ERROR(logger, "Failed at inet_pton, ret is %d, errno is %d, errmsg is %s, tfe_id is %d, ip_addr is %s", ret, errno, strerror(errno), i, tfe_ipaddr_str); goto error_out; } - int sockfd = get_binded_sockfd(i, keepalive_listen_ip, keepalive_listen_port, logger); - if(sockfd < 0){ - KNI_LOG_ERROR(logger, "Failed at get binded sockfd, tfe_id is %d", i); - goto error_out; - } - struct thread_tfe_keepalive_args *args = ALLOC(struct thread_tfe_keepalive_args, 1); - args->mgr = mgr; - args->keepalive_idle = keepalive_idle; - args->sockfd = sockfd; - args->keepalive_intvl = keepalive_intvl; - args->keepalive_cnt = keepalive_cnt; - args->tfe_id = i; - args->tfe_ipaddr = tfe_ipaddr; - ret = pthread_create(&thread_id, NULL, thread_tfe_keepalive, (void *)args); - if(unlikely(ret != 0)){ - KNI_LOG_ERROR(logger, "Failed at pthread_create, thread_func is thread_tfe_keepalive, errno is %d, errmsg is %s", errno, strerror(errno)); - FREE(&args); - goto error_out; - } + mgr->tfe_enabled_nodes[j].tfe_id = i; + mgr->tfe_enabled_nodes[j].ipaddr = tfe_node_ipaddr; + j++; + } + mgr->tfe_enabled_node_count = j; + //init rw_lock + ret = pthread_rwlock_init(&(mgr->rwlock), NULL); + if(ret < 0){ + KNI_LOG_ERROR(logger, "Failed at pthread_rwlock_init, errno is %d, errmsg is %s", errno, strerror(errno)); + goto error_out; + } + //bind and listen + sockfd = get_binded_sockfd(keepalive_listen_ip, keepalive_listen_port, logger); + if(sockfd < 0){ + KNI_LOG_ERROR(logger, "Failed at get binded sockfd"); + goto error_out; + } + //create tfe_keepalive_accept thread + args = ALLOC(struct thread_tfe_keepalive_accept_args, 1); + args->mgr = mgr; + args->keepalive_idle = keepalive_idle; + args->sockfd = sockfd; + args->keepalive_intvl = keepalive_intvl; + args->keepalive_cnt = keepalive_cnt; + args->logger = logger; + ret = pthread_create(&thread_id, NULL, thread_tfe_keepalive_accept, (void *)args); + if(unlikely(ret != 0)){ + KNI_LOG_ERROR(logger, "Failed at pthread_create, thread_func is thread_tfe_keepalive_accept, errno is %d, errmsg is %s", errno, strerror(errno)); + FREE(&args); + goto error_out; } return mgr; @@ -334,13 +396,18 @@ error_out: } int tfe_mgr_alive_node_get(struct tfe_mgr *mgr, int thread_seq){ + int tfe_id = -1; if(mgr->keepalive_switch == 0){ - return mgr->tfe_node_count > 0 ? thread_seq % mgr->tfe_node_count : -1; + if(mgr->tfe_enabled_node_count > 0){ + int i = thread_seq % mgr->tfe_enabled_node_count; + tfe_id = mgr->tfe_enabled_nodes[i].tfe_id; + } + return tfe_id; } pthread_rwlock_rdlock(&(mgr->rwlock)); - int tfe_id = -1; if(mgr->tfe_alive_node_count > 0){ - tfe_id = thread_seq % mgr->tfe_alive_node_count; + int i = thread_seq % mgr->tfe_alive_node_count; + tfe_id = mgr->tfe_alive_nodes[i]; } pthread_rwlock_unlock(&(mgr->rwlock)); return tfe_id;