diff --git a/common/include/kni_utils.h b/common/include/kni_utils.h index e319fc1..0cc83ed 100644 --- a/common/include/kni_utils.h +++ b/common/include/kni_utils.h @@ -99,7 +99,7 @@ struct kni_field_stat_handle{ int fields[KNI_FIELD_MAX]; }; -int kni_stream_addr_trans(struct ipaddr* addr, char *output, int len); +int kni_stream_addr_trans(const struct layer_addr *addr, char *output, int len); uint16_t kni_ip_checksum(const void *buf, size_t hdr_len); uint16_t kni_tcp_checksum(const void *_buf, size_t len, in_addr_t src_addr, in_addr_t dest_addr); uint16_t kni_udp_checksum(const void *_buf, size_t len, in_addr_t src_addr, in_addr_t dest_addr); diff --git a/common/src/kni_utils.cpp b/common/src/kni_utils.cpp index aa6f111..365a7b2 100644 --- a/common/src/kni_utils.cpp +++ b/common/src/kni_utils.cpp @@ -2,12 +2,12 @@ #include #include -int kni_stream_addr_trans(struct ipaddr* addr, char *output, int len){ +int kni_stream_addr_trans(const struct layer_addr *addr, char *output, int len){ char saddr[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &(addr->v4->saddr), saddr, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &(addr->tuple4_v4->saddr), saddr, INET_ADDRSTRLEN); char daddr[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &(addr->v4->daddr), daddr, INET_ADDRSTRLEN); - snprintf(output, len, "%s:%d -> %s:%d", saddr, ntohs(addr->v4->source), daddr, ntohs(addr->v4->dest)); + inet_ntop(AF_INET, &(addr->tuple4_v4->daddr), daddr, INET_ADDRSTRLEN); + snprintf(output, len, "%s:%d -> %s:%d", saddr, ntohs(addr->tuple4_v4->source), daddr, ntohs(addr->tuple4_v4->dest)); return 0; } diff --git a/conf/kni.conf b/conf/kni.conf index 670dffb..2fa63b0 100644 --- a/conf/kni.conf +++ b/conf/kni.conf @@ -3,6 +3,7 @@ log_path = ./log/kni/kni.log log_level = 10 tfe_count = 1 local_eth = enp8s0 +keepalive_replay_switch = 1 [maat] #readconf_mode: 0 = iris, 1 = json, 2 = redis @@ -59,4 +60,13 @@ mho_mutex_num = 160 mho_hash_slot_size = 160000 mho_hash_max_element_num = 640000 mho_expire_time = 30 +mho_eliminate_type = LRU + +[keepalive_replay_htable] +mho_screen_print_ctrl = 0 +mho_thread_safe = 1 +mho_mutex_num = 160 +mho_hash_slot_size = 160000 +mho_hash_max_element_num = 640000 +mho_expire_time = 30 mho_eliminate_type = LRU \ No newline at end of file diff --git a/entry/src/kni_entry.cpp b/entry/src/kni_entry.cpp index c7c89e9..c5c696b 100644 --- a/entry/src/kni_entry.cpp +++ b/entry/src/kni_entry.cpp @@ -146,8 +146,10 @@ struct kni_handle{ struct kni_maat_handle *maat_handle; struct kni_send_logger *send_logger; MESA_htable_handle traceid2pme_htable; + MESA_htable_handle keepalive_replay_htable; int tfe_count; uint32_t local_ipv4; + int keepalive_replay_switch; void *local_logger; }; @@ -156,6 +158,18 @@ struct traceid2pme_search_cb_args{ void *logger; }; +struct keepalive_replay_htable_value{ + int has_replayed; + uint32_t first_data_len; +}; + +struct keepalive_replay_search_cb_args{ + marsio_buff_t *rx_buff; + struct kni_marsio_handle *marsio_handle; + struct iphdr *raw_packet_iphdr; + int tfe_id; +}; + static void pme_info_destroy(void *data){ struct pme_info *pmeinfo = (struct pme_info *)data; void *logger = g_kni_handle->local_logger; @@ -197,7 +211,7 @@ static struct pme_info* pme_info_new(const struct streaminfo *stream, int thread KNI_LOG_ERROR(logger, "Failed at init pthread mutex, stream_traceid is %s", pmeinfo->stream_traceid); goto error_out; } - kni_stream_addr_trans((struct ipaddr*)(&stream->addr), stream_addr, sizeof(stream_addr)); + kni_stream_addr_trans(&(stream->addr), stream_addr, sizeof(stream_addr)); KNI_LOG_INFO(logger, "stream addr is %s, stream traceid is %s", stream_addr, pmeinfo->stream_traceid); //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_TOT_STM], 0, FS_OP_ADD, 1); return pmeinfo; @@ -314,7 +328,7 @@ static int sendlog_to_kafka(struct pme_info *pmeinfo, void *local_logger){ KNI_LOG_DEBUG(local_logger, "log_msg is %s\n", log_msg); ret = kni_send_logger_sendlog(g_kni_handle->send_logger, log_msg, strlen(log_msg)); if(ret < 0){ - KNI_LOG_ERROR(local_logger, "Failed at kni_send_logger_sendlog, ret is %d, strem_traceid is %s", + KNI_LOG_ERROR(local_logger, "Failed at knisend_logger_sendlog, ret is %d, strem_traceid is %s", ret, pmeinfo->stream_traceid); goto error_out; } @@ -553,7 +567,7 @@ static int send_to_tfe(struct kni_marsio_handle *handle, char *raw_data, int raw struct mr_sendpath *dev_eth_sendpath = handle->tfe_instance_list[tfe_id]->dev_eth_sendpath; char *src_mac = handle->src_mac_addr; char *dst_mac = handle->tfe_instance_list[tfe_id]->mac_addr; - //only send one packet + //only send one packet, alloc_ret <= nr_send <= BURST_MAX int nr_send = 1; int alloc_ret = marsio_buff_malloc_device(dev_eth_handler, tx_buffs, nr_send, 0, thread_seq); if (alloc_ret < 0){ @@ -601,7 +615,7 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein int len = pktinfo->ip_totlen; int ret; char stream_addr[KNI_SYMBOL_MAX] = ""; - kni_stream_addr_trans((struct ipaddr*)(&stream->addr), stream_addr, sizeof(stream_addr)); + kni_stream_addr_trans(&(stream->addr), stream_addr, sizeof(stream_addr)); //pmeinfo->action has only 3 value: KNI_ACTION_NONE, KNI_ACTION_INTERCEPT, KNI_ACTION_BYPASS switch (pmeinfo->action){ case KNI_ACTION_NONE: @@ -684,13 +698,17 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein return APP_STATE_FAWPKT | APP_STATE_DROPME; } int key_size; + struct stream_tuple4_v4 *c2s_key = NULL; + struct stream_tuple4_v4 s2c_key; + struct keepalive_replay_htable_value *c2s_value = NULL; + struct keepalive_replay_htable_value *s2c_value = NULL; switch(pmeinfo->action){ case KNI_ACTION_BYPASS: FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_PKT], 0, FS_OP_ADD, 1); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_BYP_STM], 0, FS_OP_ADD, 1); return APP_STATE_FAWPKT | APP_STATE_GIVEME; case KNI_ACTION_INTERCEPT: - //only intercept: add to hash table + //only intercept: add to traceid2pme htable key_size = strnlen(pmeinfo->stream_traceid, sizeof(pmeinfo->stream_traceid)); ret = MESA_htable_add(g_kni_handle->traceid2pme_htable, (const unsigned char *)(pmeinfo->stream_traceid), key_size, (const void*)pmeinfo); @@ -704,6 +722,45 @@ static char data_opstate(const struct streaminfo *stream, struct pme_info *pmein "table is traceid2pme_htable, key is %s", pmeinfo->stream_traceid); FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC], 0, FS_OP_ADD, 1); } + + //interctp: add to keepalive_replay_htable + //c2s + c2s_key = stream->addr.tuple4_v4; + key_size = sizeof(*c2s_key); + c2s_value = ALLOC(struct keepalive_replay_htable_value, 1); + c2s_value->first_data_len = pktinfo->data_len; + ret = MESA_htable_add(g_kni_handle->keepalive_replay_htable, (const unsigned char *)c2s_key, + key_size, (const void*)c2s_value); + if(ret < 0){ + KNI_LOG_ERROR(logger, "MESA_htable: failed at MESA_htable_add," + "table is keepalive_replay_htable, dir is c2s, stream is %s", stream_addr); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL], 0, FS_OP_ADD, 1); + } + else{ + KNI_LOG_DEBUG(logger, "MESA_htable: succeed at MESA_htable_add," + "table is keepalive_replay_htable, dir is c2s, stream is %s", stream_addr); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC], 0, FS_OP_ADD, 1); + } + //s2c + key_size = sizeof(s2c_key); + s2c_key.saddr = c2s_key->daddr; + s2c_key.daddr = c2s_key->saddr; + s2c_key.source = c2s_key->dest; + s2c_key.dest = c2s_key->source; + s2c_value = ALLOC(struct keepalive_replay_htable_value, 1); + ret = MESA_htable_add(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&s2c_key), + key_size, (const void*)s2c_value); + if(ret < 0){ + KNI_LOG_ERROR(logger, "MESA_htable: failed at MESA_htable_add," + "table is keepalive_replay_htable, dir is s2c, stream is %s", stream_addr); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_FAIL], 0, FS_OP_ADD, 1); + } + else{ + KNI_LOG_DEBUG(logger, "MESA_htable: succeed at MESA_htable_add," + "table is keepalive_replay_htable, dir is s2c, stream is %s", stream_addr); + //FS_operate(g_kni_fs_handle->handle, g_kni_fs_handle->fields[KNI_FIELD_ID2PME_ADD_SUCC], 0, FS_OP_ADD, 1); + } + //action = KNI_ACTION_INTERCEPT, sendto tfe buf = add_cmsg_to_packet(pmeinfo, pktinfo, &len); ret = send_to_tfe(g_kni_handle->marsio_handle, buf, len, thread_seq, pmeinfo->tfe_id); @@ -892,29 +949,114 @@ static void kni_marsio_destroy(struct kni_marsio_handle *handle){ handle = NULL; } + +static void sendto_vxlan(marsio_buff_t *rx_buff, struct mr_sendpath *dev_vxlan_sendpath, int thread_seq){ + //tag + struct mr_tunnat_ctrlzone mr_ctrlzone; + memset(&mr_ctrlzone, 0, sizeof(mr_ctrlzone)); + mr_ctrlzone.action |= (TUNNAT_CZ_ACTION_ENCAP_INNER | TUNNAT_CZ_ACTION_ENCAP_OUTER); + marsio_buff_ctrlzone_set(rx_buff, 0, &mr_ctrlzone, sizeof(struct mr_tunnat_ctrlzone)); + + //send to vxlan, vxlan handler: recv: 0, send: 1, nr_burst must be 1 + int nr_burst = 1; + marsio_send_burst_with_options(dev_vxlan_sendpath, thread_seq, &rx_buff, nr_burst, MARSIO_SEND_OPT_FAST); +} + +static long keepalive_replay_search_cb(void *data, const uchar *key, uint size, void *user_args){ + void *logger = g_kni_handle->local_logger; + struct keepalive_replay_search_cb_args *args = (struct keepalive_replay_search_cb_args*)user_args; + struct kni_marsio_handle *marsio_handle = args->marsio_handle; + marsio_buff_t *rx_buff = args->rx_buff; + int tfe_id = args->tfe_id; + if(data == NULL){ + sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, tfe_id); + return 0; + } + struct keepalive_replay_htable_value *value = (struct keepalive_replay_htable_value*)data; + if(value->has_replayed == 1){ + sendto_vxlan(rx_buff, marsio_handle->dev_vxlan_sendpath, tfe_id); + return 0; + } + //raw_packet: window update + struct iphdr *raw_packet_iphdr = args->raw_packet_iphdr; + int tot_len = ntohs(raw_packet_iphdr->tot_len); + int iphdr_len = raw_packet_iphdr->ihl * 4; + struct tcphdr *raw_packet_tcphdr = (struct tcphdr*)((char*)raw_packet_iphdr + iphdr_len); + //replay packet + char *replay_packet = ALLOC(char, tot_len); + memcpy(replay_packet, (void*)raw_packet_iphdr, tot_len); + struct iphdr *replay_packet_iphdr = (struct iphdr*)replay_packet; + struct tcphdr *replay_packet_tcphdr = (struct tcphdr*)((char*)replay_packet_iphdr + iphdr_len); + replay_packet_iphdr->saddr = raw_packet_iphdr->daddr; + replay_packet_iphdr->daddr = raw_packet_iphdr->saddr; + replay_packet_tcphdr->source = raw_packet_tcphdr->dest; + replay_packet_tcphdr->dest = raw_packet_tcphdr->source; + replay_packet_tcphdr->seq = htonl(ntohl(raw_packet_tcphdr->ack_seq) + value->first_data_len); //seq = ack + first_data_len + replay_packet_tcphdr->ack_seq = htonl(ntohl(raw_packet_tcphdr->seq) + 1); //ack = seq + 1 + replay_packet_iphdr->check = 0; + replay_packet_iphdr->check = kni_ip_checksum((void*)replay_packet_iphdr, iphdr_len); + replay_packet_tcphdr->check = 0; + replay_packet_tcphdr->check = kni_tcp_checksum((void*)replay_packet_tcphdr, tot_len - iphdr_len, + replay_packet_iphdr->saddr, replay_packet_iphdr->daddr); + //send to tfe: thread_seq = g_iThreadNum + int ret = send_to_tfe(marsio_handle, replay_packet, tot_len, g_iThreadNum, tfe_id); + if(ret < 0){ + KNI_LOG_ERROR(logger, "Failed at send keepalive replay packet to tfe"); + } + value->has_replayed = 1; + FREE(&replay_packet); + return 0; +} + void* thread_tfe_data_receiver(void *args){ struct thread_tfe_data_receiver_args *_args = (struct thread_tfe_data_receiver_args*)args; struct kni_marsio_handle *marsio_handle = _args->marsio_handle; int tfe_id = _args->tfe_id; struct mr_vdev *dev_eth_handler = marsio_handle->tfe_instance_list[tfe_id]->dev_eth_handler; FREE(&args); - marsio_buff_t * rx_buff[BURST_MAX]; + marsio_buff_t *rx_buffs[BURST_MAX]; int nr_burst = 1; + //eth_handler: recv: 1 int thread_seq = 0; while(true){ - //receive from tfe - int nr_recv = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buff, nr_burst); + //receive from tfe, nr_recv <= nr_burst <= BURST_MAX + int nr_recv = marsio_recv_burst(dev_eth_handler, thread_seq, rx_buffs, nr_burst); if(nr_recv <= 0){ continue; } - //tag - struct mr_tunnat_ctrlzone mr_ctrlzone; - mr_ctrlzone.action |= (TUNNAT_CZ_ACTION_ENCAP_INNER | TUNNAT_CZ_ACTION_ENCAP_OUTER); - for(int i = 0; i < nr_recv; i++){ - marsio_buff_ctrlzone_set(rx_buff[i], 0, &mr_ctrlzone, sizeof(struct mr_tunnat_ctrlzone)); + if(g_kni_handle->keepalive_replay_switch == 1){ + for(int i = 0; i < nr_recv; i++){ + struct ethhdr *ether_hdr = (struct ethhdr*)marsio_buff_mtod(rx_buffs[i]); + if(ether_hdr->h_proto == htons(ETH_P_IP)){ + struct iphdr *iphdr = (struct iphdr*)((char*)ether_hdr + sizeof(*ether_hdr)); + int iphdr_len = iphdr->ihl * 4; + struct tcphdr *tcphdr = (struct tcphdr*)((char*)iphdr + iphdr_len); + struct stream_tuple4_v4 key; + key.saddr = iphdr->saddr; + key.daddr = iphdr->daddr; + key.source = tcphdr->source; + key.dest = tcphdr->dest; + int key_size = sizeof(key); + long cb_ret = -1; + keepalive_replay_search_cb_args cb_args; + memset(&cb_args, 0, sizeof(cb_args)); + cb_args.rx_buff = rx_buffs[i]; + cb_args.marsio_handle = marsio_handle; + cb_args.raw_packet_iphdr = iphdr; + cb_args.tfe_id = tfe_id; + MESA_htable_search_cb(g_kni_handle->keepalive_replay_htable, (const unsigned char *)(&key), + key_size, keepalive_replay_search_cb, &cb_args, &cb_ret); + } + else{ + sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, tfe_id); + } + } + } + else{ + for(int i = 0; i < nr_recv; i++){ + sendto_vxlan(rx_buffs[i], marsio_handle->dev_vxlan_sendpath, tfe_id); + } } - //send to vxlan - marsio_send_burst_with_options(marsio_handle->dev_vxlan_sendpath, thread_seq, rx_buff, nr_recv, MARSIO_SEND_OPT_FAST); } return NULL; } @@ -1127,7 +1269,6 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){ } marsio_option_set(mr_inst, MARSIO_OPT_EXIT_WHEN_ERR, &opt_value, sizeof(opt_value)); marsio_init(mr_inst, appsym); - //eth_handler receive thread = 1, send thread = g_iThreadNum tfe_count = g_kni_handle->tfe_count; for(int i = 0; i < tfe_count; i++){ //load tfe conf @@ -1157,8 +1298,8 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){ } KNI_LOG_INFO(logger, "MESA_prof_load, [%s]:\n mac_addr: %s\n dev_eth_symbol: %s", _section, mac_addr_str, dev_eth_symbol); - //handler - dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, 1, g_iThreadNum); + //eth_handler receive thread = 1, send thread = g_iThreadNum + 1 + dev_eth_handler = marsio_open_device(mr_inst, dev_eth_symbol, 1, g_iThreadNum + 1); if(dev_eth_handler == NULL){ KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_eth_symbol); goto error_out; @@ -1174,8 +1315,8 @@ static struct kni_marsio_handle* kni_marsio_init(const char* profile){ tfe_inst->dev_eth_sendpath = dev_eth_sendpath; handle->tfe_instance_list[i] = tfe_inst; } - //vxlan_handler: receive: 0 thread, send: 1 - dev_vxlan_handler = marsio_open_device(mr_inst, dev_vxlan_symbol, 0, 1); + //vxlan_handler: receive: 0, send: tfe_count + dev_vxlan_handler = marsio_open_device(mr_inst, dev_vxlan_symbol, 0, tfe_count); if(dev_vxlan_handler == NULL){ KNI_LOG_ERROR(logger, "Failed at marsio_open_device, dev_symbol is %s", dev_vxlan_symbol); goto error_out; @@ -1289,6 +1430,10 @@ static int traceid2pme_htable_expire_notify_cb(void *data, int eliminate_type){ return 0; } +static void keepalive_replay_data_free_cb(void *data) +{ + FREE(&data); +} extern "C" int kni_init(){ const char *profile = "./conf/kni/kni.conf"; @@ -1303,8 +1448,9 @@ extern "C" int kni_init(){ void *local_logger = NULL; int log_level = -1; pthread_t thread_id = -1; + int keepalive_replay_switch = -1; struct thread_tfe_cmsg_receiver_args *cmsg_receiver_args; - MESA_htable_handle traceid2pme_htable = NULL; + MESA_htable_handle traceid2pme_htable = NULL, keepalive_replay_htable = NULL; int ret = MESA_load_profile_string_nodef(profile, section, "log_path", log_path, sizeof(log_path)); if(ret < 0){ printf("MESA_prof_load: log_path not set, profile is %s, section is %s", profile, section); @@ -1338,11 +1484,17 @@ extern "C" int kni_init(){ printf("MESA_prof_load: local_eth not set, profile is %s, section is %s", profile, section); goto error_out; } - KNI_LOG_INFO(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_count: %d\n local_eth: %s", - section, log_path, log_level, tfe_count, local_eth); + ret = MESA_load_profile_int_nodef(profile, section, "keepalive_replay_switch", &keepalive_replay_switch); + if(ret < 0){ + printf("MESA_prof_load: keepalive_replay_switch not set, profile is %s, section is %s", profile, section); + goto error_out; + } + KNI_LOG_INFO(local_logger, "MESA_prof_load, [%s]:\n log_path: %s\n log_level: %d\n tfe_count: %d\n local_eth: %s\n keepalive_replay_switch: %d", + section, log_path, log_level, tfe_count, local_eth, keepalive_replay_switch); g_kni_handle = ALLOC(struct kni_handle, 1); g_kni_handle->local_logger = local_logger; g_kni_handle->tfe_count = tfe_count; + g_kni_handle->keepalive_replay_switch = keepalive_replay_switch; //init http_project id = http_project_init(); @@ -1359,31 +1511,6 @@ extern "C" int kni_init(){ goto error_out; } - //create thread_tfe_data_receiver - for(int i = 0; i < tfe_count; i++){ - struct thread_tfe_data_receiver_args *args = ALLOC(struct thread_tfe_data_receiver_args, 1); - args->logger = local_logger; - args->marsio_handle = g_kni_handle->marsio_handle; - args->tfe_id = i; - int ret = pthread_create(&thread_id, NULL, thread_tfe_data_receiver, (void *)args); - if(unlikely(ret != 0)){ - KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_data_receiver, ret is %d", ret); - FREE(&args); - goto error_out; - } - } - - //create thread_tfe_cmsg_receiver - cmsg_receiver_args = ALLOC(struct thread_tfe_cmsg_receiver_args, 1); - cmsg_receiver_args->logger = local_logger; - strncpy(cmsg_receiver_args->profile, profile, strnlen(profile, sizeof(cmsg_receiver_args->profile) - 1)); - ret = pthread_create(&thread_id, NULL, thread_tfe_cmsg_receiver, (void *)cmsg_receiver_args); - if(unlikely(ret != 0)){ - KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_cmsg_receiver, ret is %d", ret); - FREE(&cmsg_receiver_args); - goto error_out; - } - //init maat g_kni_handle->maat_handle = kni_maat_init(profile, local_logger); if(g_kni_handle->maat_handle == NULL){ @@ -1422,6 +1549,42 @@ extern "C" int kni_init(){ goto error_out; } g_kni_handle->traceid2pme_htable = traceid2pme_htable; + + //init keepalive_replay_htable + if(g_kni_handle->keepalive_replay_switch == 1){ + keepalive_replay_htable = kni_create_htable(profile, "keepalive_replay_htable", (void*)keepalive_replay_data_free_cb, + NULL, local_logger); + if(keepalive_replay_htable == NULL){ + KNI_LOG_ERROR(local_logger, "Failed at create keepalive_replay_htable"); + goto error_out; + } + g_kni_handle->keepalive_replay_htable = keepalive_replay_htable; + } + + //create thread_tfe_data_receiver + for(int i = 0; i < tfe_count; i++){ + struct thread_tfe_data_receiver_args *args = ALLOC(struct thread_tfe_data_receiver_args, 1); + args->logger = local_logger; + args->marsio_handle = g_kni_handle->marsio_handle; + args->tfe_id = i; + int ret = pthread_create(&thread_id, NULL, thread_tfe_data_receiver, (void *)args); + if(unlikely(ret != 0)){ + KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_data_receiver, ret is %d", ret); + FREE(&args); + goto error_out; + } + } + + //create thread_tfe_cmsg_receiver + cmsg_receiver_args = ALLOC(struct thread_tfe_cmsg_receiver_args, 1); + cmsg_receiver_args->logger = local_logger; + strncpy(cmsg_receiver_args->profile, profile, strnlen(profile, sizeof(cmsg_receiver_args->profile) - 1)); + ret = pthread_create(&thread_id, NULL, thread_tfe_cmsg_receiver, (void *)cmsg_receiver_args); + if(unlikely(ret != 0)){ + KNI_LOG_ERROR(local_logger, "Failed at pthread_create, thread_func is thread_tfe_cmsg_receiver, ret is %d", ret); + FREE(&cmsg_receiver_args); + goto error_out; + } return 0; error_out: diff --git a/entry/src/kni_send_logger.cpp b/entry/src/kni_send_logger.cpp index 22b981f..03fbb41 100644 --- a/entry/src/kni_send_logger.cpp +++ b/entry/src/kni_send_logger.cpp @@ -137,6 +137,9 @@ error_out: } int kni_send_logger_sendlog(kni_send_logger *handle, char *log_msg, int log_msg_len){ + if(handle->sendlog_switch == 0){ + return 0; + } void *logger = handle->local_logger; //kafka produce int kafka_status = rd_kafka_produce(handle->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,