diff --git a/conf/kni.conf b/conf/kni.conf index 46e4a9d..6e14f04 100644 --- a/conf/kni.conf +++ b/conf/kni.conf @@ -3,6 +3,7 @@ log_path = ./log/kni/kni.log log_level = 10 tfe_count = 1 local_eth = enp8s0 +tfe_data_recv_thread_num = 8 keepalive_replay_switch = 1 [maat] diff --git a/entry/src/kni_entry.cpp b/entry/src/kni_entry.cpp index 4aaa60a..41ea581 100644 --- a/entry/src/kni_entry.cpp +++ b/entry/src/kni_entry.cpp @@ -122,7 +122,7 @@ struct protocol_identify_result{ struct thread_tfe_data_receiver_args{ void *logger; struct kni_marsio_handle *marsio_handle; - int tfe_id; + int thread_seq; }; struct thread_tfe_cmsg_receiver_args{ @@ -148,6 +148,7 @@ struct kni_handle{ MESA_htable_handle traceid2pme_htable; MESA_htable_handle keepalive_replay_htable; int tfe_count; + int tfe_data_recv_thread_num; uint32_t local_ipv4; int keepalive_replay_switch; void *local_logger; @@ -168,6 +169,7 @@ struct keepalive_replay_search_cb_args{ struct kni_marsio_handle *marsio_handle; struct iphdr *raw_packet_iphdr; int tfe_id; + int thread_seq; }; static void pme_info_destroy(void *data){ @@ -1006,13 +1008,14 @@ static long keepalive_replay_search_cb(void *data, const uchar *key, uint size, struct kni_marsio_handle *marsio_handle = args->marsio_handle; marsio_buff_t *rx_buff = args->rx_buff; int tfe_id = args->tfe_id; + int thread_seq = args->thread_seq; if(data == NULL){ - sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, tfe_id); + sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, thread_seq); return 0; } struct keepalive_replay_htable_value *value = (struct keepalive_replay_htable_value*)data; if(value->has_replayed == 1){ - sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, tfe_id); + sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, thread_seq); return 0; } //raw_packet: window update @@ -1037,7 +1040,7 @@ static long keepalive_replay_search_cb(void *data, const uchar *key, uint size, replay_packet_tcphdr->check = kni_tcp_checksum((void*)replay_packet_tcphdr, tot_len - iphdr_len, replay_packet_iphdr->saddr, replay_packet_iphdr->daddr); //send to tfe: thread_seq = g_iThreadNum - int ret = send_to_tfe(marsio_handle, replay_packet, tot_len, g_iThreadNum, tfe_id); + int ret = send_to_tfe(marsio_handle, replay_packet, tot_len, g_iThreadNum + thread_seq, tfe_id); if(ret < 0){ KNI_LOG_ERROR(logger, "Failed at send keepalive replay packet to tfe"); } @@ -1049,50 +1052,52 @@ static long keepalive_replay_search_cb(void *data, const uchar *key, uint size, void* thread_tfe_data_receiver(void *args){ struct thread_tfe_data_receiver_args *_args = (struct thread_tfe_data_receiver_args*)args; struct kni_marsio_handle *marsio_handle = _args->marsio_handle; - int tfe_id = _args->tfe_id; - struct mr_vdev *dev_eth_handler = marsio_handle->tfe_instance_list[tfe_id]->dev_eth_handler; + int thread_seq = _args->thread_seq; FREE(&args); - marsio_buff_t *rx_buffs[BURST_MAX]; - int nr_burst = 1; - //eth_handler: recv: 1 - int thread_seq = 0; while(true){ - //receive from tfe, nr_recv <= nr_burst <= BURST_MAX - int nr_recv = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buffs, nr_burst); - if(nr_recv <= 0){ - continue; - } - if(g_kni_handle->keepalive_replay_switch == 1){ - for(int i = 0; i < nr_recv; i++){ - struct ethhdr *ether_hdr = (struct ethhdr*)marsio_buff_mtod(rx_buffs[i]); - if(ether_hdr->h_proto == htons(ETH_P_IP)){ - struct iphdr *iphdr = (struct iphdr*)((char*)ether_hdr + sizeof(*ether_hdr)); - int iphdr_len = iphdr->ihl * 4; - struct tcphdr *tcphdr = (struct tcphdr*)((char*)iphdr + iphdr_len); - struct stream_tuple4_v4 key; - key.saddr = iphdr->saddr; - key.daddr = iphdr->daddr; - key.source = tcphdr->source; - key.dest = tcphdr->dest; - int key_size = sizeof(key); - long cb_ret = -1; - keepalive_replay_search_cb_args cb_args; - memset(&cb_args, 0, sizeof(cb_args)); - cb_args.rx_buff = rx_buffs[i]; - cb_args.marsio_handle = marsio_handle; - cb_args.raw_packet_iphdr = iphdr; - cb_args.tfe_id = tfe_id; - MESA_htable_search_cb(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&key), - key_size, keepalive_replay_search_cb, &cb_args, &cb_ret); - } - else{ - sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, tfe_id); + //polling tfe + for(int i = 0; i < g_kni_handle->tfe_count; i++){ + marsio_buff_t *rx_buffs[BURST_MAX]; + int nr_burst = 1; + struct mr_vdev *dev_eth_handler = marsio_handle->tfe_instance_list[i]->dev_eth_handler; + //receive from tfe, nr_recv <= nr_burst <= BURST_MAX + int nr_recv = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buffs, nr_burst); + if(nr_recv <= 0){ + continue; + } + if(g_kni_handle->keepalive_replay_switch == 1){ + for(int i = 0; i < nr_recv; i++){ + struct ethhdr *ether_hdr = (struct ethhdr*)marsio_buff_mtod(rx_buffs[i]); + if(ether_hdr->h_proto == htons(ETH_P_IP)){ + struct iphdr *iphdr = (struct iphdr*)((char*)ether_hdr + sizeof(*ether_hdr)); + int iphdr_len = iphdr->ihl * 4; + struct tcphdr *tcphdr = (struct tcphdr*)((char*)iphdr + iphdr_len); + struct stream_tuple4_v4 key; + key.saddr = iphdr->saddr; + key.daddr = iphdr->daddr; + key.source = tcphdr->source; + key.dest = tcphdr->dest; + int key_size = sizeof(key); + long cb_ret = -1; + keepalive_replay_search_cb_args cb_args; + memset(&cb_args, 0, sizeof(cb_args)); + cb_args.rx_buff = rx_buffs[i]; + cb_args.marsio_handle = marsio_handle; + cb_args.raw_packet_iphdr = iphdr; + cb_args.tfe_id = i; + cb_args.thread_seq = thread_seq; + MESA_htable_search_cb(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&key), + key_size, keepalive_replay_search_cb, &cb_args, &cb_ret); + } + else{ + sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, thread_seq); + } } } - } - else{ - for(int i = 0; i < nr_recv; i++){ - sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, tfe_id); + else{ + for(int i = 0; i < nr_recv; i++){ + sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, thread_seq); + } } } } @@ -1336,8 +1341,8 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){ } KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n mac_addr: %s\n dev_eth_symbol: %s", _section, mac_addr_str, dev_eth_symbol); - //eth_handler receive thread = 1, send thread = g_iThreadNum + 1 - dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, 1, g_iThreadNum + 1); + //eth_handler receive thread = tfe_data_recv_thread_num, send thread = g_iThreadNum + tfe_data_recv_thread_num + dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, g_kni_handle->tfe_data_recv_thread_num, g_iThreadNum + g_kni_handle->tfe_data_recv_thread_num); if(dev_eth_handler == NULL){ KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_eth_symbol); goto error_out; @@ -1354,7 +1359,7 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){ handle->tfe_instance_list[i] = tfe_inst; } //vxlan_handler: receive: 0, send: tfe_count - dev_vxlan_handler = marsio_open_device(mr_inst, dev_vxlan_symbol, 0, tfe_count); + dev_vxlan_handler = marsio_open_device(mr_inst, dev_vxlan_symbol, 0, g_kni_handle->tfe_data_recv_thread_num); if(dev_vxlan_handler == NULL){ KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_vxlan_symbol); goto error_out; @@ -1479,6 +1484,7 @@ extern "C" int kni_init(){ //init logger char log_path[KNI_PATH_MAX] = ""; int tfe_count = 0; + int tfe_data_recv_thread_num = -1; char local_eth[KNI_SYMBOL_MAX] = ""; struct kni_send_logger *send_logger = NULL; struct kni_field_stat_handle *fs_handle = NULL; @@ -1509,6 +1515,7 @@ extern "C" int kni_init(){ KNI_LOG_ERROR(local_logger, "MESA_prof_load: tfe_count not set, profile is %s, section is %s", profile, section); goto error_out; } + if(tfe_count > TFE_COUNT_MAX){ KNI_LOG_ERROR(local_logger, "tfe_count is %d, exceed the max_tfe_count %d", tfe_count, TFE_COUNT_MAX); goto error_out; @@ -1517,6 +1524,11 @@ extern "C" int kni_init(){ KNI_LOG_ERROR(local_logger, "tfe_count is %d, <= 0", tfe_count); goto error_out; } + ret = MESA_load_profile_int_nodef(profile, section, "tfe_data_recv_thread_num", &tfe_data_recv_thread_num); + if(ret < 0){ + KNI_LOG_ERROR(local_logger, "MESA_prof_load: tfe_data_recv_thread_num not set, profile is %s, section is %s", profile, section); + goto error_out; + } ret = MESA_load_profile_string_nodef(profile, section, "local_eth", local_eth, sizeof(local_eth)); if(ret < 0){ printf("MESA_prof_load: local_eth not set, profile is %s, section is %s", profile, section); @@ -1527,11 +1539,13 @@ extern "C" int kni_init(){ printf("MESA_prof_load: keepalive_replay_switch not set, profile is %s, section is %s", profile, section); goto error_out; } - KNI_LOG_INFO(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_count: %d\n local_eth: %s\n keepalive_replay_switch: %d", - section, log_path, log_level, tfe_count, local_eth, keepalive_replay_switch); + KNI_LOG_INFO(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_count: %d\n" + "tfe_data_recv_thread_num: %d\n local_eth: %s\n keepalive_replay_switch: %d", + section, log_path, log_level, tfe_count, tfe_data_recv_thread_num, local_eth, keepalive_replay_switch); g_kni_handle = ALLOC(struct kni_handle, 1); g_kni_handle->local_logger = local_logger; g_kni_handle->tfe_count = tfe_count; + g_kni_handle->tfe_data_recv_thread_num = tfe_data_recv_thread_num; g_kni_handle->keepalive_replay_switch = keepalive_replay_switch; //init http_project @@ -1600,11 +1614,11 @@ extern "C" int kni_init(){ } //create thread_tfe_data_receiver - for(int i = 0; i < tfe_count; i++){ + for(int i = 0; i < g_kni_handle->tfe_data_recv_thread_num; i++){ struct thread_tfe_data_receiver_args *args = ALLOC(struct thread_tfe_data_receiver_args, 1); args->logger = local_logger; args->marsio_handle = g_kni_handle->marsio_handle; - args->tfe_id = i; + args->thread_seq = i; int ret = pthread_create(&thread_id, NULL, thread_tfe_data_receiver, (void *)args); if(unlikely(ret != 0)){ KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_data_receiver, ret is %d", ret);