diff --git a/conf/tfe/tfe.conf b/conf/tfe/tfe.conf
index d59109a..76b4973 100644
--- a/conf/tfe/tfe.conf
+++ b/conf/tfe/tfe.conf
@@ -3,6 +3,11 @@
 nr_worker_threads=1
 enable_kni_v1=0
 enable_kni_v2=1
 disable_coredump=0
+enable_cpu_affinity=0
+# requires at least (1 + nr_worker_threads) cpu ids
+# the first id is for the acceptor thread
+# the remaining ids are for the worker threads
+cpu_affinity_mask=1-9,10-12
 [kni]
 ip=192.168.100.1
diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h
index 35f6647..9868353 100644
--- a/platform/include/internal/platform.h
+++ b/platform/include/internal/platform.h
@@ -9,6 +9,7 @@
 
 struct tfe_thread_ctx
 {
+    struct tfe_proxy *proxy;
     pthread_t thr;
     unsigned int thread_id;
     unsigned int load;
diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h
index b575741..40223e8 100644
--- a/platform/include/internal/proxy.h
+++ b/platform/include/internal/proxy.h
@@ -124,6 +124,10 @@ struct tfe_proxy
 
     /* Crash Report */
     struct breakpad_instance * breakpad;
+
+    /* cpu affinity: slot 0 is the acceptor thread, slots 1..nr_work_threads are workers */
+    unsigned int enable_cpu_affinity;
+    unsigned int cpu_affinity_mask[TFE_THREAD_MAX];
 };
 
 extern struct tfe_proxy * g_default_proxy;
@@ -137,3 +141,4 @@ void tfe_proxy_thread_ctx_release(struct tfe_thread_ctx * thread_ctx);
 struct tfe_proxy * tfe_proxy_new(const char * profile);
 int tfe_proxy_fds_accept(struct tfe_proxy * ctx, int fd_downstream, int fd_upstream, struct tfe_cmsg * cmsg);
 void tfe_proxy_run(struct tfe_proxy * proxy);
+int tfe_thread_set_affinity(int core_id);
diff --git a/platform/src/acceptor_kni_v1.cpp b/platform/src/acceptor_kni_v1.cpp
index 9dad068..06d5dc0 100644
--- a/platform/src/acceptor_kni_v1.cpp
+++ b/platform/src/acceptor_kni_v1.cpp
@@ -18,6 +18,7 @@
 #include
 #include
 #include
+#include <sys/prctl.h>
 
 #ifndef TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT
 #define TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT "/var/run/.tfe_kni_acceptor_handler"
@@ -357,8 +358,18 @@ void * __kni_listener_thread_entry(void * args)
 {
     struct acceptor_kni_v1 * __ctx = (struct acceptor_kni_v1 *) args;
     assert(__ctx != NULL && __ctx->thread == pthread_self());
+    char thread_name[16];
+    snprintf(thread_name, sizeof(thread_name), "tfe:acceptor-v1");
+    prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL);
 
-    TFE_LOG_DEBUG(__ctx->logger, "Starting KNI listener thread...");
+    char affinity[32] = {0};
+    if (__ctx->proxy->enable_cpu_affinity)
+    {
+        tfe_thread_set_affinity(__ctx->proxy->cpu_affinity_mask[0]);
+        snprintf(affinity, sizeof(affinity), "affinity cpu%u", __ctx->proxy->cpu_affinity_mask[0]);
+    }
+
+    TFE_LOG_DEBUG(__ctx->logger, "Starting KNI listener thread %s...", __ctx->proxy->enable_cpu_affinity ? affinity : "");
     event_base_dispatch(__ctx->ev_base);
     TFE_LOG_DEBUG(__ctx->logger, "Stoping KNI listener thread...");
     return (void *) NULL;
diff --git a/platform/src/acceptor_kni_v2.cpp b/platform/src/acceptor_kni_v2.cpp
index 4ec4ad3..873ba7b 100644
--- a/platform/src/acceptor_kni_v2.cpp
+++ b/platform/src/acceptor_kni_v2.cpp
@@ -17,7 +17,7 @@
 #include
 #include
 #include
-#
+#include <sys/prctl.h>
 
 #ifndef TFE_CONFIG_SCM_SOCKET_FILE
 #define TFE_CONFIG_SCM_SOCKET_FILE "/var/run/.tfe_kmod_scm_socket"
@@ -143,8 +143,18 @@ void * acceptor_kni_v2_event_thread_entry(void * args)
 {
     struct acceptor_kni_v2 * __ctx = (struct acceptor_kni_v2 *) args;
     assert(__ctx != NULL && __ctx->thread == pthread_self());
+    char thread_name[16];
+    snprintf(thread_name, sizeof(thread_name), "tfe:acceptor-v2");
+    prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL);
 
-    TFE_LOG_INFO(__ctx->logger, "scm acceptor thread is running.");
+    char affinity[32] = {0};
+    if (__ctx->proxy->enable_cpu_affinity)
+    {
+        tfe_thread_set_affinity(__ctx->proxy->cpu_affinity_mask[0]);
+        snprintf(affinity, sizeof(affinity), "affinity cpu%u", __ctx->proxy->cpu_affinity_mask[0]);
+    }
+
+    TFE_LOG_INFO(__ctx->logger, "scm acceptor thread %s is running.", __ctx->proxy->enable_cpu_affinity ? affinity : "");
     event_base_dispatch(__ctx->ev_base);
     DIE("scm acceptor thread is exited, abort.");
 }
diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp
index 58dfa0f..2887928 100644
--- a/platform/src/proxy.cpp
+++ b/platform/src/proxy.cpp
@@ -122,6 +122,21 @@ static int check_is_started_by_notify()
     return notify_socket == NULL ? 0 : 1;
 }
 
+/* Pin the calling thread to core_id; returns 0 or an errno-style code. */
+int tfe_thread_set_affinity(int core_id)
+{
+    /* sysconf returns long and may be -1 on error; do not truncate to int */
+    long num_cores = sysconf(_SC_NPROCESSORS_ONLN);
+    if (core_id < 0 || num_cores < 0 || core_id >= num_cores)
+    {
+        return EINVAL;
+    }
+
+    cpu_set_t cpuset;
+    CPU_ZERO(&cpuset);
+    CPU_SET(core_id, &cpuset);
+
+    return pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
+}
+
 int tfe_proxy_fds_accept(struct tfe_proxy * ctx, int fd_downstream, int fd_upstream, struct tfe_cmsg * cmsg)
 {
     struct tfe_thread_ctx * worker_thread_ctx = tfe_proxy_thread_ctx_acquire(ctx);
@@ -239,7 +254,14 @@ static void * tfe_work_thread(void * arg)
     snprintf(thread_name, sizeof(thread_name), "tfe:worker-%d", ctx->thread_id);
     prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL);
 
-    TFE_LOG_INFO(g_default_logger, "Work thread %u is running...", ctx->thread_id);
+    char affinity[32] = {0};
+    if (ctx->proxy->enable_cpu_affinity)
+    {
+        /* mask[0] is the acceptor core; worker i uses mask[i + 1] */
+        tfe_thread_set_affinity(ctx->proxy->cpu_affinity_mask[ctx->thread_id + 1]);
+        snprintf(affinity, sizeof(affinity), "affinity cpu%u", ctx->proxy->cpu_affinity_mask[ctx->thread_id + 1]);
+    }
+
+    TFE_LOG_INFO(g_default_logger, "Work thread %u %s is running...", ctx->thread_id, ctx->proxy->enable_cpu_affinity ? affinity : "");
     event_base_dispatch(ctx->evbase);
     assert(0);
     event_free(ev);
@@ -257,6 +279,7 @@ void tfe_proxy_work_thread_create_ctx(struct tfe_proxy * proxy)
         proxy->work_threads[i]->evbase = event_base_new();
         proxy->work_threads[i]->dnsbase = evdns_base_new(proxy->work_threads[i]->evbase, EVDNS_BASE_INITIALIZE_NAMESERVERS);
         proxy->work_threads[i]->evhttp = key_keeper_evhttp_init(proxy->work_threads[i]->evbase, proxy->work_threads[i]->dnsbase, proxy->key_keeper_handler);
+        proxy->work_threads[i]->proxy = proxy;
     }
     return;
 }
@@ -283,6 +306,34 @@ int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile)
     /* Worker threads */
     MESA_load_profile_uint_def(profile, "system", "nr_worker_threads", &proxy->nr_work_threads, 1);
     MESA_load_profile_uint_def(profile, "system", "buffer_output_limit", &proxy->buffer_output_limit, 0);
+    MESA_load_profile_uint_def(profile, "system", "enable_cpu_affinity", &proxy->enable_cpu_affinity, 0);
+    MESA_load_profile_uint_range(profile, "system", "cpu_affinity_mask", TFE_THREAD_MAX, proxy->cpu_affinity_mask);
+
+    if (proxy->nr_work_threads < 1 || proxy->nr_work_threads > TFE_THREAD_MAX)
+    {
+        TFE_LOG_ERROR(g_default_logger, "'nr_worker_threads' is invalid, only support [1, %d].", TFE_THREAD_MAX);
+        return -1;
+    }
+
+    if (proxy->enable_cpu_affinity)
+    {
+        long num_cores = sysconf(_SC_NPROCESSORS_ONLN);
+
+        /* workers index mask[nr_work_threads], so nr_work_threads must stay below TFE_THREAD_MAX */
+        if (num_cores < 2 || proxy->nr_work_threads >= TFE_THREAD_MAX || proxy->nr_work_threads > (unsigned long)(num_cores - 2))
+        {
+            TFE_LOG_ERROR(g_default_logger, "'nr_worker_threads' is invalid, suggest [1, cpu_cores - 2].");
+            return -1;
+        }
+
+        for (unsigned int i = 0; i <= proxy->nr_work_threads; i++)
+        {
+            if (proxy->cpu_affinity_mask[i] < 1 || proxy->cpu_affinity_mask[i] >= (unsigned long)num_cores)
+            {
+                TFE_LOG_ERROR(g_default_logger, "'cpu_affinity_mask' is invalid, only support [1, %ld].", num_cores - 1);
+                return -1;
+            }
+        }
+    }
 
     /* Debug */
     MESA_load_profile_uint_def(profile, "debug", "passthrough_all_tcp", &proxy->tcp_all_passthrough, 0);