#include #include #include #include #include #include // for NF_ACCEPT #include #include #include #include #include #include #include "io_uring.h" #include "tfe_tap_rss.h" #include "tfe_metrics.h" #include "tfe_tcp_restore.h" #include "acceptor_kni_v4.h" static void *worker_thread_cycle(void *arg) { struct acceptor_thread_ctx *thread_ctx = (struct acceptor_thread_ctx *)arg; struct packet_io *handle = thread_ctx->ref_io; struct acceptor_ctx *acceptor_ctx = thread_ctx->ref_acceptor_ctx; int pkg_len = 0; char thread_name[16]; int n_pkt_recv_from_nf = 0; int n_pkt_recv_from_tap = 0; int n_pkt_recv_from_tap_c = 0; int n_pkt_recv_from_tap_s = 0; snprintf(thread_name, sizeof(thread_name), "kni:worker-%d", thread_ctx->thread_index); prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); if (packet_io_thread_init(handle, thread_ctx) != 0) { goto error_out; } if (acceptor_ctx->config->enable_iouring) { io_uring_register_read_callback(thread_ctx->tap_ctx->io_uring_fd, handle_raw_packet_from_tap, thread_ctx); io_uring_register_read_callback(thread_ctx->tap_ctx->io_uring_c, handle_decryption_packet_from_tap, thread_ctx); io_uring_register_read_callback(thread_ctx->tap_ctx->io_uring_s, handle_decryption_packet_from_tap, thread_ctx); } else { thread_ctx->tap_ctx->buff_size = 3000; thread_ctx->tap_ctx->buff = ALLOC(char, thread_ctx->tap_ctx->buff_size); } TFE_LOG_INFO(g_default_logger, "%s: worker thread %d is running", "LOG_TAG_KNI", thread_ctx->thread_index); while(1) { n_pkt_recv_from_nf = packet_io_polling_nf_interface(handle, thread_ctx->thread_index, thread_ctx); if (acceptor_ctx->config->enable_iouring) { n_pkt_recv_from_tap = io_uring_peek_ready_entrys(thread_ctx->tap_ctx->io_uring_fd); n_pkt_recv_from_tap_c = io_uring_peek_ready_entrys(thread_ctx->tap_ctx->io_uring_c); n_pkt_recv_from_tap_c = io_uring_peek_ready_entrys(thread_ctx->tap_ctx->io_uring_s); } else { if ((pkg_len = tfe_tap_read_per_thread(thread_ctx->tap_ctx->tap_fd, thread_ctx->tap_ctx->buff, thread_ctx->tap_ctx->buff_size, g_default_logger)) > 0) { handle_raw_packet_from_tap(thread_ctx->tap_ctx->buff, pkg_len, thread_ctx); } if ((pkg_len = tfe_tap_read_per_thread(thread_ctx->tap_ctx->tap_c, thread_ctx->tap_ctx->buff, thread_ctx->tap_ctx->buff_size, g_default_logger)) > 0) { handle_decryption_packet_from_tap(thread_ctx->tap_ctx->buff, pkg_len, thread_ctx); } if ((pkg_len = tfe_tap_read_per_thread(thread_ctx->tap_ctx->tap_s, thread_ctx->tap_ctx->buff, thread_ctx->tap_ctx->buff_size, g_default_logger)) > 0) { handle_decryption_packet_from_tap(thread_ctx->tap_ctx->buff, pkg_len, thread_ctx); } } global_metrics_dump(acceptor_ctx->metrics); if (n_pkt_recv_from_nf == 0) { packet_io_thread_wait(handle, thread_ctx, 0); } if (__atomic_fetch_add(&thread_ctx->session_table_need_reset, 0, __ATOMIC_RELAXED) > 0) { session_table_reset(thread_ctx->session_table); __atomic_fetch_and(&thread_ctx->session_table_need_reset, 0, __ATOMIC_RELAXED); } } error_out: TFE_LOG_ERROR(g_default_logger, "%s: worker thread %d exiting", LOG_TAG_SCE, thread_ctx->thread_index); return (void *)NULL; } void acceptor_kni_v4_destroy() { return; } struct acceptor_kni_v4 *acceptor_kni_v4_create(struct tfe_proxy *proxy, const char *profile, void *logger) { int ret = 0; struct acceptor_kni_v4 *__ctx = (struct acceptor_kni_v4 *)calloc(1, sizeof(struct acceptor_kni_v4)); struct acceptor_ctx *acceptor_ctx = acceptor_ctx_create(profile); if (acceptor_ctx == NULL) goto error_out; acceptor_ctx->ref_proxy = proxy; for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++) { acceptor_ctx->work_threads[i].tid = 0; acceptor_ctx->work_threads[i].thread_index = i; acceptor_ctx->work_threads[i].ref_acceptor_ctx = acceptor_ctx; acceptor_ctx->work_threads[i].tap_ctx = tfe_tap_ctx_create(&acceptor_ctx->work_threads[i]); if (acceptor_ctx->config->enable_iouring) { int eventfd = 0; struct tap_ctx *tap_ctx = acceptor_ctx->work_threads[i].tap_ctx; tap_ctx->io_uring_fd = io_uring_instance_create(tap_ctx->tap_fd, eventfd, acceptor_ctx->config->ring_size, acceptor_ctx->config->buff_size, acceptor_ctx->config->flags, acceptor_ctx->config->sq_thread_idle, acceptor_ctx->config->enable_debuglog); tap_ctx->io_uring_c = io_uring_instance_create(tap_ctx->tap_c, eventfd, acceptor_ctx->config->ring_size, acceptor_ctx->config->buff_size, acceptor_ctx->config->flags, acceptor_ctx->config->sq_thread_idle, acceptor_ctx->config->enable_debuglog); tap_ctx->io_uring_s = io_uring_instance_create(tap_ctx->tap_s, eventfd, acceptor_ctx->config->ring_size, acceptor_ctx->config->buff_size, acceptor_ctx->config->flags, acceptor_ctx->config->sq_thread_idle, acceptor_ctx->config->enable_debuglog); } acceptor_ctx->work_threads[i].session_table = session_table_create(); acceptor_ctx->work_threads[i].ref_io = acceptor_ctx->io; acceptor_ctx->work_threads[i].ref_proxy = proxy; acceptor_ctx->work_threads[i].ref_tap_config = acceptor_ctx->config; acceptor_ctx->work_threads[i].ref_metrics = acceptor_ctx->metrics; acceptor_ctx->work_threads[i].session_table_need_reset = 0; if (acceptor_ctx->config->tap_rps_enable) { ret = tfe_tap_set_rps(g_default_logger, acceptor_ctx->config->tap_device, i, acceptor_ctx->config->tap_rps_mask); if (ret != 0) goto error_out; } } for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++) { struct acceptor_thread_ctx *thread_ctx = &acceptor_ctx->work_threads[i]; if (pthread_create(&thread_ctx->tid, NULL, worker_thread_cycle, (void *)thread_ctx) < 0) { goto error_out; } } return __ctx; error_out: acceptor_kni_v4_destroy(); return NULL; }