#include #include #include #include #include #include "sce.h" #include "log.h" #include "utils.h" #include "sf_metrics.h" #include "health_check.h" #include "global_metrics.h" static void sig_handler(int signo) { if (signo == SIGHUP) { LOG_INFO("%s: recv SIGHUP, reload zlog.conf", LOG_TAG_SCE); LOG_RELOAD(); } } static int thread_set_affinity(int core_id) { int num_cores = sysconf(_SC_NPROCESSORS_ONLN); if (core_id < 0 || core_id >= num_cores) { return EINVAL; } cpu_set_t cpuset; CPU_ZERO(&cpuset); CPU_SET(core_id, &cpuset); return pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); } static void *worker_thread_cycle(void *arg) { struct thread_ctx *thread_ctx = (struct thread_ctx *)arg; struct packet_io *handle = thread_ctx->ref_io; int n_packet_recv; char thread_name[16]; snprintf(thread_name, sizeof(thread_name), "sce:worker-%d", thread_ctx->thread_index); prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); char affinity[32] = {0}; if (thread_ctx->cpu_mask >= 0) { thread_set_affinity(thread_ctx->cpu_mask); snprintf(affinity, sizeof(affinity), "affinity cpu%d", thread_ctx->cpu_mask); } LOG_INFO("%s: worker thread %d %s is running", LOG_TAG_SCE, thread_ctx->thread_index, thread_ctx->cpu_mask >= 0 ? affinity : ""); while (1) { n_packet_recv = packet_io_polling_nf_interface(handle, thread_ctx->thread_index, thread_ctx); if (n_packet_recv) { // LOG_INFO("%s: worker thread %d recv %03d packets from nf_interface", LOG_TAG_SCE, thread_ctx->thread_index, n_packet_recv); } n_packet_recv = packet_io_polling_endpoint(handle, thread_ctx->thread_index, thread_ctx); if (n_packet_recv) { // LOG_INFO("%s: worker thread %d recv %03d packets from endpoint", LOG_TAG_SCE, thread_ctx->thread_index, n_packet_recv); } if (__atomic_fetch_add(&thread_ctx->session_table_need_reset, 0, __ATOMIC_RELAXED) > 0) { session_table_reset(thread_ctx->session_table); __atomic_fetch_and(&thread_ctx->session_table_need_reset, 0, __ATOMIC_RELAXED); } if (__atomic_fetch_add(&thread_ctx->sf_metrics_need_send, 0, __ATOMIC_RELAXED) > 0) { sf_metrics_send(thread_ctx->sf_metrics); sf_metrics_reset(thread_ctx->sf_metrics); __atomic_fetch_and(&thread_ctx->sf_metrics_need_send, 0, __ATOMIC_RELAXED); } } LOG_ERROR("%s: worker thread %d exiting", LOG_TAG_SCE, thread_ctx->thread_index); return (void *)NULL; } int main(int argc, char **argv) { const char *profile = "./conf/sce.conf"; if (LOG_INIT("./conf/zlog.conf") == -1) { return -1; } if (signal(SIGHUP, sig_handler) == SIG_ERR) { LOG_ERROR("%s: unable to register SIGHUP signal handler, error %d: %s", LOG_TAG_SCE, errno, strerror(errno)); LOG_CLOSE(); return -1; } health_check_session_init(profile); struct sce_ctx *ctx = sce_ctx_create(profile); if (ctx == NULL) { LOG_CLOSE(); return -1; } for (int i = 0; i < ctx->nr_worker_threads; i++) { ctx->work_threads[i].tid = 0; ctx->work_threads[i].thread_index = i; ctx->work_threads[i].session_table = session_table_create(); ctx->work_threads[i].sf_metrics = sf_metrics_create(profile); ctx->work_threads[i].ref_io = ctx->io; ctx->work_threads[i].ref_metrics = ctx->metrics; ctx->work_threads[i].ref_enforcer = ctx->enforcer; ctx->work_threads[i].ref_sce_ctx = ctx; ctx->work_threads[i].session_table_need_reset = 0; ctx->work_threads[i].sf_metrics_need_send = 0; ctx->work_threads[i].cpu_mask = ctx->enable_cpu_affinity ? ctx->cpu_affinity_mask[i] : -1; } for (int i = 0; i < ctx->nr_worker_threads; i++) { struct thread_ctx *thread_ctx = &ctx->work_threads[i]; if (pthread_create(&thread_ctx->tid, NULL, worker_thread_cycle, (void *)thread_ctx) < 0) { LOG_ERROR("%s: unable to create worker thread %d, error %d: %s", LOG_TAG_SCE, i, errno, strerror(errno)); goto error_out; } } struct timespec current_time; struct timespec g_metrics_last_send_time; struct timespec sf_metrics_last_send_time; clock_gettime(CLOCK_MONOTONIC, ¤t_time); clock_gettime(CLOCK_MONOTONIC, &g_metrics_last_send_time); clock_gettime(CLOCK_MONOTONIC, &sf_metrics_last_send_time); while (1) { if (current_time.tv_sec - g_metrics_last_send_time.tv_sec >= ctx->metrics->config.statsd_cycle) { clock_gettime(CLOCK_MONOTONIC, &g_metrics_last_send_time); global_metrics_dump(ctx->metrics); } if (current_time.tv_sec - sf_metrics_last_send_time.tv_sec >= sf_metrics_get_interval(ctx->work_threads[0].sf_metrics)) { clock_gettime(CLOCK_MONOTONIC, &sf_metrics_last_send_time); for (int i = 0; i < ctx->nr_worker_threads; i++) { struct thread_ctx *thread_ctx = &ctx->work_threads[i]; __atomic_fetch_add(&thread_ctx->sf_metrics_need_send, 1, __ATOMIC_RELAXED); } } sleep(MIN(ctx->metrics->config.statsd_cycle, sf_metrics_get_interval(ctx->work_threads[0].sf_metrics))); clock_gettime(CLOCK_MONOTONIC, ¤t_time); } error_out: for (int i = 0; i < ctx->nr_worker_threads; i++) { struct thread_ctx *thread_ctx = &ctx->work_threads[i]; session_table_destory(thread_ctx->session_table); sf_metrics_destory(thread_ctx->sf_metrics); } sce_ctx_destory(ctx); LOG_CLOSE(); return 0; }