/* * Proxy engine, built around libevent 2.x. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGPIPE, SIGUSR1}; /* Global Resource */ void * g_default_logger = NULL; struct tfe_proxy * g_default_proxy = NULL; /* Per thread resource */ thread_local unsigned int __currect_thread_id = 0; thread_local void * __currect_default_logger = NULL; struct tfe_thread_ctx * tfe_proxy_thread_ctx_acquire(struct tfe_proxy * ctx) { unsigned int min_thread_id = 0; unsigned int min_load = 0; for(unsigned int tid = 0; tid < ctx->nr_work_threads; tid++) { struct tfe_thread_ctx * thread_ctx = ctx->work_threads[tid]; min_thread_id = min_load > thread_ctx->load ? tid : min_thread_id; min_load = min_load > thread_ctx->load ? thread_ctx->load : min_load; } ctx->work_threads[min_thread_id]->load++; return ctx->work_threads[min_thread_id]; } void tfe_proxy_thread_ctx_release(struct tfe_thread_ctx * thread_ctx) { thread_ctx->load--; } int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para) { tfe_thread_ctx * worker_thread_ctx = tfe_proxy_thread_ctx_acquire(ctx); struct tfe_stream * stream = tfe_stream_create(ctx, worker_thread_ctx); tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, ¶->session_type, sizeof(para->session_type)); /* FOR DEBUG */ if (para->passthrough || ctx->tcp_all_passthrough) { bool __true = true; enum tfe_session_proto __session_type = SESSION_PROTO_PLAIN; tfe_stream_option_set(stream, TFE_STREAM_OPT_PASSTHROUGH, &__true, sizeof(__true)); tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &__session_type, sizeof(__session_type)); } tfe_stream_init_by_fds(stream, para->downstream_fd, para->upstream_fd); TFE_LOG_DEBUG(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accepted", stream, para->downstream_fd, para->upstream_fd, para->session_type); return 0; } void tfe_proxy_loopbreak(tfe_proxy * ctx) { event_base_loopbreak(ctx->evbase); } void tfe_proxy_free(tfe_proxy * ctx) { return; } static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) { return; } static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg) { tfe_proxy * ctx = (tfe_proxy *) arg; switch (fd) { case SIGTERM: case SIGQUIT: case SIGINT: case SIGHUP: break; case SIGUSR1: break; case SIGPIPE: TFE_LOG_ERROR(ctx->logger, "Warning: Received SIGPIPE; ignoring.\n"); break; default: TFE_LOG_ERROR(ctx->logger, "Warning: Received unexpected signal %i\n", fd); break; } } static void __gc_handler_cb(evutil_socket_t fd, short what, void * arg) { tfe_proxy * ctx = (tfe_proxy *) arg; (void)fd; (void)what; } static void * __thread_ctx_entry(void * arg) { struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg; struct timeval timer_delay = {60, 0}; struct event * ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL); if (unlikely(ev == NULL)) { TFE_LOG_ERROR(g_default_logger, "Failed at creating dummy event for thread %u", ctx->thread_id); exit(EXIT_FAILURE); } evtimer_add(ev, &timer_delay); ctx->running = 1; __currect_thread_id = ctx->thread_id; TFE_LOG_INFO(g_default_logger, "Thread %u is running...", ctx->thread_id); event_base_dispatch(ctx->evbase); event_free(ev); return (void *)NULL; } struct tfe_thread_ctx * __thread_ctx_create(struct tfe_proxy * proxy, unsigned int thread_id) { struct tfe_thread_ctx * __thread_ctx = ALLOC(struct tfe_thread_ctx, 1); assert(__thread_ctx != NULL); __thread_ctx->thread_id = thread_id; __thread_ctx->evbase = event_base_new(); int ret = pthread_create(&__thread_ctx->thr, NULL, __thread_ctx_entry, (void *)__thread_ctx); if (unlikely(ret < 0)) { TFE_LOG_ERROR(proxy->logger, "Failed at pthread_create() for thread %d: %s", strerror(errno)); goto __errout; } return __thread_ctx; __errout: if (__thread_ctx != NULL && __thread_ctx->evbase != NULL) event_base_free(__thread_ctx->evbase); if (__thread_ctx != NULL) free(__thread_ctx); return NULL; } int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile) { MESA_load_profile_uint_def(profile, "main", "nr_worker_threads", &proxy->nr_work_threads, 1); MESA_load_profile_uint_def(profile, "debug", "passthrough_all_tcp", &proxy->tcp_all_passthrough, 0); return 0; } #define CHECK_OR_EXIT(condition, fmt, ...) \ do { if(!(condition)) { TFE_LOG_ERROR(g_default_logger, fmt, ##__VA_ARGS__); exit(EXIT_FAILURE); } } while(0) \ int main(int argc, char *argv[]) { const char* main_profile="./conf/tfe.conf"; g_default_logger = MESA_create_runtime_log_handle("log/tfe.log", RLOG_LV_DEBUG); if (unlikely(g_default_logger == NULL)) { TFE_LOG_ERROR(g_default_logger, "Failed at creating default logger: %s", "log/tfe.log"); exit(EXIT_FAILURE); } /* PROXY INSTANCE */ g_default_proxy = ALLOC(struct tfe_proxy, 1); assert(g_default_proxy); /* CONFIG */ int ret = tfe_proxy_config(g_default_proxy, main_profile); CHECK_OR_EXIT(ret == 0, "Failed at loading profile %s, Exit.", main_profile); /* LOGGER */ g_default_proxy->logger = g_default_logger; /* MAIN THREAD EVBASE */ g_default_proxy->evbase = event_base_new(); CHECK_OR_EXIT(g_default_proxy->evbase, "Failed at creating evbase for main thread. Exit."); /* GC EVENT */ g_default_proxy->gcev = event_new(g_default_proxy->evbase, -1, EV_PERSIST, __gc_handler_cb, g_default_proxy); CHECK_OR_EXIT(g_default_proxy->gcev, "Failed at creating GC event. Exit. "); /* MODULE INIT */ g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger); CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. "); struct timeval gc_delay = {60, 0}; evtimer_add(g_default_proxy->gcev , &gc_delay); /* WORKER THREAD */ for(unsigned tid = 0; tid < g_default_proxy->nr_work_threads; tid++) { g_default_proxy->work_threads[tid] = __thread_ctx_create(g_default_proxy, tid); CHECK_OR_EXIT(g_default_proxy->work_threads[tid], "Failed at creating thread %u", tid); } TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized. "); event_base_dispatch(g_default_proxy->evbase); return 0; }