修改和tfe数据面通信的线程模型,使其可以水平拓展
This commit is contained in:
@@ -3,6 +3,7 @@ log_path = ./log/kni/kni.log
|
|||||||
log_level = 10
|
log_level = 10
|
||||||
tfe_count = 1
|
tfe_count = 1
|
||||||
local_eth = enp8s0
|
local_eth = enp8s0
|
||||||
|
tfe_data_recv_thread_num = 8
|
||||||
keepalive_replay_switch = 1
|
keepalive_replay_switch = 1
|
||||||
|
|
||||||
[maat]
|
[maat]
|
||||||
|
|||||||
@@ -122,7 +122,7 @@ struct protocol_identify_result{
|
|||||||
struct thread_tfe_data_receiver_args{
|
struct thread_tfe_data_receiver_args{
|
||||||
void *logger;
|
void *logger;
|
||||||
struct kni_marsio_handle *marsio_handle;
|
struct kni_marsio_handle *marsio_handle;
|
||||||
int tfe_id;
|
int thread_seq;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct thread_tfe_cmsg_receiver_args{
|
struct thread_tfe_cmsg_receiver_args{
|
||||||
@@ -148,6 +148,7 @@ struct kni_handle{
|
|||||||
MESA_htable_handle traceid2pme_htable;
|
MESA_htable_handle traceid2pme_htable;
|
||||||
MESA_htable_handle keepalive_replay_htable;
|
MESA_htable_handle keepalive_replay_htable;
|
||||||
int tfe_count;
|
int tfe_count;
|
||||||
|
int tfe_data_recv_thread_num;
|
||||||
uint32_t local_ipv4;
|
uint32_t local_ipv4;
|
||||||
int keepalive_replay_switch;
|
int keepalive_replay_switch;
|
||||||
void *local_logger;
|
void *local_logger;
|
||||||
@@ -168,6 +169,7 @@ struct keepalive_replay_search_cb_args{
|
|||||||
struct kni_marsio_handle *marsio_handle;
|
struct kni_marsio_handle *marsio_handle;
|
||||||
struct iphdr *raw_packet_iphdr;
|
struct iphdr *raw_packet_iphdr;
|
||||||
int tfe_id;
|
int tfe_id;
|
||||||
|
int thread_seq;
|
||||||
};
|
};
|
||||||
|
|
||||||
static void pme_info_destroy(void *data){
|
static void pme_info_destroy(void *data){
|
||||||
@@ -1006,13 +1008,14 @@ static long keepalive_replay_search_cb(void *data, const uchar *key, uint size,
|
|||||||
struct kni_marsio_handle *marsio_handle = args->marsio_handle;
|
struct kni_marsio_handle *marsio_handle = args->marsio_handle;
|
||||||
marsio_buff_t *rx_buff = args->rx_buff;
|
marsio_buff_t *rx_buff = args->rx_buff;
|
||||||
int tfe_id = args->tfe_id;
|
int tfe_id = args->tfe_id;
|
||||||
|
int thread_seq = args->thread_seq;
|
||||||
if(data == NULL){
|
if(data == NULL){
|
||||||
sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, tfe_id);
|
sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, thread_seq);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
struct keepalive_replay_htable_value *value = (struct keepalive_replay_htable_value*)data;
|
struct keepalive_replay_htable_value *value = (struct keepalive_replay_htable_value*)data;
|
||||||
if(value->has_replayed == 1){
|
if(value->has_replayed == 1){
|
||||||
sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, tfe_id);
|
sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, thread_seq);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
//raw_packet: window update
|
//raw_packet: window update
|
||||||
@@ -1037,7 +1040,7 @@ static long keepalive_replay_search_cb(void *data, const uchar *key, uint size,
|
|||||||
replay_packet_tcphdr->check = kni_tcp_checksum((void*)replay_packet_tcphdr, tot_len - iphdr_len,
|
replay_packet_tcphdr->check = kni_tcp_checksum((void*)replay_packet_tcphdr, tot_len - iphdr_len,
|
||||||
replay_packet_iphdr->saddr, replay_packet_iphdr->daddr);
|
replay_packet_iphdr->saddr, replay_packet_iphdr->daddr);
|
||||||
//send to tfe: thread_seq = g_iThreadNum
|
//send to tfe: thread_seq = g_iThreadNum
|
||||||
int ret = send_to_tfe(marsio_handle, replay_packet, tot_len, g_iThreadNum, tfe_id);
|
int ret = send_to_tfe(marsio_handle, replay_packet, tot_len, g_iThreadNum + thread_seq, tfe_id);
|
||||||
if(ret < 0){
|
if(ret < 0){
|
||||||
KNI_LOG_ERROR(logger, "Failed at send keepalive replay packet to tfe");
|
KNI_LOG_ERROR(logger, "Failed at send keepalive replay packet to tfe");
|
||||||
}
|
}
|
||||||
@@ -1049,14 +1052,14 @@ static long keepalive_replay_search_cb(void *data, const uchar *key, uint size,
|
|||||||
void* thread_tfe_data_receiver(void *args){
|
void* thread_tfe_data_receiver(void *args){
|
||||||
struct thread_tfe_data_receiver_args *_args = (struct thread_tfe_data_receiver_args*)args;
|
struct thread_tfe_data_receiver_args *_args = (struct thread_tfe_data_receiver_args*)args;
|
||||||
struct kni_marsio_handle *marsio_handle = _args->marsio_handle;
|
struct kni_marsio_handle *marsio_handle = _args->marsio_handle;
|
||||||
int tfe_id = _args->tfe_id;
|
int thread_seq = _args->thread_seq;
|
||||||
struct mr_vdev *dev_eth_handler = marsio_handle->tfe_instance_list[tfe_id]->dev_eth_handler;
|
|
||||||
FREE(&args);
|
FREE(&args);
|
||||||
|
while(true){
|
||||||
|
//polling tfe
|
||||||
|
for(int i = 0; i < g_kni_handle->tfe_count; i++){
|
||||||
marsio_buff_t *rx_buffs[BURST_MAX];
|
marsio_buff_t *rx_buffs[BURST_MAX];
|
||||||
int nr_burst = 1;
|
int nr_burst = 1;
|
||||||
//eth_handler: recv: 1
|
struct mr_vdev *dev_eth_handler = marsio_handle->tfe_instance_list[i]->dev_eth_handler;
|
||||||
int thread_seq = 0;
|
|
||||||
while(true){
|
|
||||||
//receive from tfe, nr_recv <= nr_burst <= BURST_MAX
|
//receive from tfe, nr_recv <= nr_burst <= BURST_MAX
|
||||||
int nr_recv = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buffs, nr_burst);
|
int nr_recv = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buffs, nr_burst);
|
||||||
if(nr_recv <= 0){
|
if(nr_recv <= 0){
|
||||||
@@ -1081,18 +1084,20 @@ void* thread_tfe_data_receiver(void *args){
|
|||||||
cb_args.rx_buff = rx_buffs[i];
|
cb_args.rx_buff = rx_buffs[i];
|
||||||
cb_args.marsio_handle = marsio_handle;
|
cb_args.marsio_handle = marsio_handle;
|
||||||
cb_args.raw_packet_iphdr = iphdr;
|
cb_args.raw_packet_iphdr = iphdr;
|
||||||
cb_args.tfe_id = tfe_id;
|
cb_args.tfe_id = i;
|
||||||
|
cb_args.thread_seq = thread_seq;
|
||||||
MESA_htable_search_cb(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&key),
|
MESA_htable_search_cb(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&key),
|
||||||
key_size, keepalive_replay_search_cb, &cb_args, &cb_ret);
|
key_size, keepalive_replay_search_cb, &cb_args, &cb_ret);
|
||||||
}
|
}
|
||||||
else{
|
else{
|
||||||
sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, tfe_id);
|
sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, thread_seq);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else{
|
else{
|
||||||
for(int i = 0; i < nr_recv; i++){
|
for(int i = 0; i < nr_recv; i++){
|
||||||
sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, tfe_id);
|
sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, thread_seq);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1336,8 +1341,8 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){
|
|||||||
}
|
}
|
||||||
KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n mac_addr: %s\n dev_eth_symbol: %s",
|
KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n mac_addr: %s\n dev_eth_symbol: %s",
|
||||||
_section, mac_addr_str, dev_eth_symbol);
|
_section, mac_addr_str, dev_eth_symbol);
|
||||||
//eth_handler receive thread = 1, send thread = g_iThreadNum + 1
|
//eth_handler receive thread = tfe_data_recv_thread_num, send thread = g_iThreadNum + tfe_data_recv_thread_num
|
||||||
dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, 1, g_iThreadNum + 1);
|
dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, g_kni_handle->tfe_data_recv_thread_num, g_iThreadNum + g_kni_handle->tfe_data_recv_thread_num);
|
||||||
if(dev_eth_handler == NULL){
|
if(dev_eth_handler == NULL){
|
||||||
KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_eth_symbol);
|
KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_eth_symbol);
|
||||||
goto error_out;
|
goto error_out;
|
||||||
@@ -1354,7 +1359,7 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){
|
|||||||
handle->tfe_instance_list[i] = tfe_inst;
|
handle->tfe_instance_list[i] = tfe_inst;
|
||||||
}
|
}
|
||||||
//vxlan_handler: receive: 0, send: tfe_count
|
//vxlan_handler: receive: 0, send: tfe_count
|
||||||
dev_vxlan_handler = marsio_open_device(mr_inst, dev_vxlan_symbol, 0, tfe_count);
|
dev_vxlan_handler = marsio_open_device(mr_inst, dev_vxlan_symbol, 0, g_kni_handle->tfe_data_recv_thread_num);
|
||||||
if(dev_vxlan_handler == NULL){
|
if(dev_vxlan_handler == NULL){
|
||||||
KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_vxlan_symbol);
|
KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_vxlan_symbol);
|
||||||
goto error_out;
|
goto error_out;
|
||||||
@@ -1479,6 +1484,7 @@ extern "C" int kni_init(){
|
|||||||
//init logger
|
//init logger
|
||||||
char log_path[KNI_PATH_MAX] = "";
|
char log_path[KNI_PATH_MAX] = "";
|
||||||
int tfe_count = 0;
|
int tfe_count = 0;
|
||||||
|
int tfe_data_recv_thread_num = -1;
|
||||||
char local_eth[KNI_SYMBOL_MAX] = "";
|
char local_eth[KNI_SYMBOL_MAX] = "";
|
||||||
struct kni_send_logger *send_logger = NULL;
|
struct kni_send_logger *send_logger = NULL;
|
||||||
struct kni_field_stat_handle *fs_handle = NULL;
|
struct kni_field_stat_handle *fs_handle = NULL;
|
||||||
@@ -1509,6 +1515,7 @@ extern "C" int kni_init(){
|
|||||||
KNI_LOG_ERROR(local_logger, "MESA_prof_load: tfe_count not set, profile is %s, section is %s", profile, section);
|
KNI_LOG_ERROR(local_logger, "MESA_prof_load: tfe_count not set, profile is %s, section is %s", profile, section);
|
||||||
goto error_out;
|
goto error_out;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(tfe_count > TFE_COUNT_MAX){
|
if(tfe_count > TFE_COUNT_MAX){
|
||||||
KNI_LOG_ERROR(local_logger, "tfe_count is %d, exceed the max_tfe_count %d", tfe_count, TFE_COUNT_MAX);
|
KNI_LOG_ERROR(local_logger, "tfe_count is %d, exceed the max_tfe_count %d", tfe_count, TFE_COUNT_MAX);
|
||||||
goto error_out;
|
goto error_out;
|
||||||
@@ -1517,6 +1524,11 @@ extern "C" int kni_init(){
|
|||||||
KNI_LOG_ERROR(local_logger, "tfe_count is %d, <= 0", tfe_count);
|
KNI_LOG_ERROR(local_logger, "tfe_count is %d, <= 0", tfe_count);
|
||||||
goto error_out;
|
goto error_out;
|
||||||
}
|
}
|
||||||
|
ret = MESA_load_profile_int_nodef(profile, section, "tfe_data_recv_thread_num", &tfe_data_recv_thread_num);
|
||||||
|
if(ret < 0){
|
||||||
|
KNI_LOG_ERROR(local_logger, "MESA_prof_load: tfe_data_recv_thread_num not set, profile is %s, section is %s", profile, section);
|
||||||
|
goto error_out;
|
||||||
|
}
|
||||||
ret = MESA_load_profile_string_nodef(profile, section, "local_eth", local_eth, sizeof(local_eth));
|
ret = MESA_load_profile_string_nodef(profile, section, "local_eth", local_eth, sizeof(local_eth));
|
||||||
if(ret < 0){
|
if(ret < 0){
|
||||||
printf("MESA_prof_load: local_eth not set, profile is %s, section is %s", profile, section);
|
printf("MESA_prof_load: local_eth not set, profile is %s, section is %s", profile, section);
|
||||||
@@ -1527,11 +1539,13 @@ extern "C" int kni_init(){
|
|||||||
printf("MESA_prof_load: keepalive_replay_switch not set, profile is %s, section is %s", profile, section);
|
printf("MESA_prof_load: keepalive_replay_switch not set, profile is %s, section is %s", profile, section);
|
||||||
goto error_out;
|
goto error_out;
|
||||||
}
|
}
|
||||||
KNI_LOG_INFO(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_count: %d\n local_eth: %s\n keepalive_replay_switch: %d",
|
KNI_LOG_INFO(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_count: %d\n"
|
||||||
section, log_path, log_level, tfe_count, local_eth, keepalive_replay_switch);
|
"tfe_data_recv_thread_num: %d\n local_eth: %s\n keepalive_replay_switch: %d",
|
||||||
|
section, log_path, log_level, tfe_count, tfe_data_recv_thread_num, local_eth, keepalive_replay_switch);
|
||||||
g_kni_handle = ALLOC(struct kni_handle, 1);
|
g_kni_handle = ALLOC(struct kni_handle, 1);
|
||||||
g_kni_handle->local_logger = local_logger;
|
g_kni_handle->local_logger = local_logger;
|
||||||
g_kni_handle->tfe_count = tfe_count;
|
g_kni_handle->tfe_count = tfe_count;
|
||||||
|
g_kni_handle->tfe_data_recv_thread_num = tfe_data_recv_thread_num;
|
||||||
g_kni_handle->keepalive_replay_switch = keepalive_replay_switch;
|
g_kni_handle->keepalive_replay_switch = keepalive_replay_switch;
|
||||||
|
|
||||||
//init http_project
|
//init http_project
|
||||||
@@ -1600,11 +1614,11 @@ extern "C" int kni_init(){
|
|||||||
}
|
}
|
||||||
|
|
||||||
//create thread_tfe_data_receiver
|
//create thread_tfe_data_receiver
|
||||||
for(int i = 0; i < tfe_count; i++){
|
for(int i = 0; i < g_kni_handle->tfe_data_recv_thread_num; i++){
|
||||||
struct thread_tfe_data_receiver_args *args = ALLOC(struct thread_tfe_data_receiver_args, 1);
|
struct thread_tfe_data_receiver_args *args = ALLOC(struct thread_tfe_data_receiver_args, 1);
|
||||||
args->logger = local_logger;
|
args->logger = local_logger;
|
||||||
args->marsio_handle = g_kni_handle->marsio_handle;
|
args->marsio_handle = g_kni_handle->marsio_handle;
|
||||||
args->tfe_id = i;
|
args->thread_seq = i;
|
||||||
int ret = pthread_create(&thread_id, NULL, thread_tfe_data_receiver, (void *)args);
|
int ret = pthread_create(&thread_id, NULL, thread_tfe_data_receiver, (void *)args);
|
||||||
if(unlikely(ret != 0)){
|
if(unlikely(ret != 0)){
|
||||||
KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_data_receiver, ret is %d", ret);
|
KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_data_receiver, ret is %d", ret);
|
||||||
|
|||||||
Reference in New Issue
Block a user