/* * Proxy engine, built around libevent 2.x. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include static int signals[] = {SIGHUP, SIGPIPE, SIGUSR1}; /* Global Resource */ void * g_default_logger = NULL; struct tfe_proxy * g_default_proxy = NULL; /* Per thread resource */ thread_local unsigned int __currect_thread_id = 0; thread_local void * __currect_default_logger = NULL; #define TFE_VAR_VERSION_CATTER(v) __attribute__((__used__)) const char * TFE_VERSION_version_##v = NULL #define TFE_VAR_VERSION_EXPEND(v) TFE_VAR_VERSION_CATTER(v) extern "C" { /* VERSION TAG */ #ifdef TFE_VAR_VERSION TFE_VAR_VERSION_EXPEND(TFE_VAR_VERSION); #else static __attribute__((__used__)) const char * TFE_VERSION_version_UNKNOWN = NULL; #endif #undef TFE_VAR_VERSION_CATTER #undef TFE_VAR_VERSION_EXPEND } /* VERSION STRING */ #ifdef TFE_GIT_VERSION static __attribute__((__used__)) const char * __tfe_version = TFE_GIT_VERSION; #else static __attribute__((__used__)) const char * tfe_version = "Unknown"; #endif struct tfe_thread_ctx * tfe_proxy_thread_ctx_acquire(struct tfe_proxy * ctx) { unsigned int min_thread_id = 0; unsigned int min_load = 0; for (unsigned int tid = 0; tid < ctx->nr_work_threads; tid++) { struct tfe_thread_ctx * thread_ctx = ctx->work_threads[tid]; min_thread_id = min_load > thread_ctx->load ? tid : min_thread_id; min_load = min_load > thread_ctx->load ? thread_ctx->load : min_load; } ctx->work_threads[min_thread_id]->load++; return ctx->work_threads[min_thread_id]; } void tfe_proxy_thread_ctx_release(struct tfe_thread_ctx * thread_ctx) { thread_ctx->load--; } int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para) { tfe_thread_ctx * worker_thread_ctx = tfe_proxy_thread_ctx_acquire(ctx); struct tfe_stream * stream = tfe_stream_create(ctx, worker_thread_ctx); tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, ¶->session_type, sizeof(para->session_type)); tfe_stream_option_set(stream, TFE_STREAM_OPT_KEYRING_ID, ¶->keyring_id, sizeof(para->keyring_id)); /* FOR DEBUG */ if (para->passthrough || ctx->tcp_all_passthrough) { bool __true = true; enum tfe_stream_proto __session_type = STREAM_PROTO_PLAIN; tfe_stream_option_set(stream, TFE_STREAM_OPT_PASSTHROUGH, &__true, sizeof(__true)); tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &__session_type, sizeof(__session_type)); } int ret = tfe_stream_init_by_fds(stream, para->downstream_fd, para->upstream_fd); if (ret < 0) { TFE_LOG_ERROR(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accept failed.", stream, para->downstream_fd, para->upstream_fd, para->session_type); goto __errout; } else { TFE_LOG_DEBUG(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accepted.", stream, para->downstream_fd, para->upstream_fd, para->session_type); } return 0; __errout: if(stream != NULL) tfe_stream_destory((struct tfe_stream_private *)stream); return -1; } void tfe_proxy_loopbreak(tfe_proxy * ctx) { event_base_loopbreak(ctx->evbase); } void tfe_proxy_free(tfe_proxy * ctx) { return; } static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) { //printf("%s alive\n",__FUNCTION__); return; } static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg) { tfe_proxy * ctx = (tfe_proxy *) arg; switch (fd) { case SIGTERM: case SIGQUIT: case SIGHUP: break; case SIGUSR1: break; case SIGPIPE: TFE_PROXY_STAT_INCREASE(STAT_SIGPIPE, 1); TFE_LOG_ERROR(ctx->logger, "Warning: Received SIGPIPE; ignoring.\n"); break; default: TFE_LOG_ERROR(ctx->logger, "Warning: Received unexpected signal %i\n", fd); break; } } static void __gc_handler_cb(evutil_socket_t fd, short what, void * arg) { tfe_proxy * ctx = (tfe_proxy *) arg; int i = 0; for (i = 0; i < TFE_STAT_MAX; i++) { FS_operate(ctx->fs_handle, ctx->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(ctx->stat_val[i]))); } FS_passive_output(ctx->fs_handle); return; } static void * tfe_work_thread(void * arg) { struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg; struct timeval timer_delay = {60, 0}; struct event * ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL); if (unlikely(ev == NULL)) { TFE_LOG_ERROR(g_default_logger, "Failed at creating dummy event for thread %u", ctx->thread_id); exit(EXIT_FAILURE); } evtimer_add(ev, &timer_delay); ctx->running = 1; __currect_thread_id = ctx->thread_id; char thread_name[16]; snprintf(thread_name, sizeof(thread_name), "tfe:worker-%d", ctx->thread_id); prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL); TFE_LOG_INFO(g_default_logger, "Work thread %u is running...", ctx->thread_id); event_base_dispatch(ctx->evbase); assert(0); event_free(ev); TFE_LOG_ERROR(g_default_logger, "Work thread %u is exit...", ctx->thread_id); return (void *) NULL; } void tfe_proxy_work_thread_create_ctx(struct tfe_proxy * proxy) { unsigned int i = 0; for (i = 0; i < proxy->nr_work_threads; i++) { proxy->work_threads[i] = ALLOC(struct tfe_thread_ctx, 1); proxy->work_threads[i]->thread_id = i; proxy->work_threads[i]->evbase = event_base_new(); proxy->work_threads[i]->dnsbase = evdns_base_new(proxy->work_threads[i]->evbase, EVDNS_BASE_INITIALIZE_NAMESERVERS); } return; } int tfe_proxy_work_thread_run(struct tfe_proxy * proxy) { struct tfe_thread_ctx * __thread_ctx = NULL; unsigned int i = 0; int ret = 0; for (i = 0; i < proxy->nr_work_threads; i++) { __thread_ctx = proxy->work_threads[i]; ret = pthread_create(&__thread_ctx->thr, NULL, tfe_work_thread, (void *) __thread_ctx); if (unlikely(ret < 0)) { TFE_LOG_ERROR(proxy->logger, "Failed at pthread_create() for thread %d, error %d: %s", i, errno, strerror(errno)); return -1; } } return 0; } int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile) { /* Worker threads */ MESA_load_profile_uint_def(profile, "main", "nr_worker_threads", &proxy->nr_work_threads, 1); MESA_load_profile_uint_def(profile, "main", "buffer_output_limit", &proxy->buffer_output_limit, 0); /* Debug */ MESA_load_profile_uint_def(profile, "debug", "passthrough_all_tcp", &proxy->tcp_all_passthrough, 0); /* ratelimit */ MESA_load_profile_uint_def(profile, "ratelimit", "read_rate", &proxy->rate_limit_options.read_rate, 0); MESA_load_profile_uint_def(profile, "ratelimit", "read_burst", &proxy->rate_limit_options.read_burst, 0); MESA_load_profile_uint_def(profile, "ratelimit", "write_rate", &proxy->rate_limit_options.write_rate, 0); MESA_load_profile_uint_def(profile, "ratelimit", "write_burst", &proxy->rate_limit_options.write_burst, 0); if(proxy->rate_limit_options.read_rate != 0 || proxy->rate_limit_options.read_burst != 0 || proxy->rate_limit_options.write_rate != 0 || proxy->rate_limit_options.write_burst != 0) { proxy->en_rate_limit = 1; } /* TCP options, -1 means unset, we shall not call setsockopt */ MESA_load_profile_int_def(profile, "tcp", "sz_rcv_buffer", &proxy->tcp_options.sz_rcv_buffer, -1); MESA_load_profile_int_def(profile, "tcp", "sz_snd_buffer", &proxy->tcp_options.sz_snd_buffer, -1); MESA_load_profile_int_def(profile, "tcp", "so_keepalive", &proxy->tcp_options.so_keepalive, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_keepidle", &proxy->tcp_options.tcp_keepidle, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_keepintvl", &proxy->tcp_options.tcp_keepintvl, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_keepcnt", &proxy->tcp_options.tcp_keepcnt, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_user_timeout", &proxy->tcp_options.tcp_user_timeout, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_upstream", &proxy->tcp_options.tcp_ttl_upstream, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_downstream", &proxy->tcp_options.tcp_ttl_downstream, -1); return 0; } static const char * __str_stat_spec_map[] = { [STAT_SIGPIPE] = "SIGPIPE", [STAT_FD_OPEN_BY_KNI_ACCEPT] = "FdOpenKNI", [STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL] = "FdCloseKNI", [STAT_FD_CLOSE_BY_EVENT_WRITE] = "FdCloseUser", [STAT_FD_CLOSE_BY_EVENT_EOF] = "FdCloseEOF", [STAT_FD_CLOSE_BY_EVENT_ERROR] = "FdCloseError", [STAT_FD_INSTANT_CLOSE] = "FdCloseInstant", [STAT_FD_DEFER_CLOSE_IN_QUEUE] = "FdCloseDeferInQ", [STAT_FD_DEFER_CLOSE_SUCCESS] = "FdCloseDeferSuc", [STAT_STREAM_CREATE] = "StreamCreate", [STAT_STREAM_DESTROY] = "StreamDestroy", [STAT_STREAM_TCP_PLAIN] = "StreamTCPPlain", [STAT_STREAM_TCP_SSL] = "StreamTCPSSL", [TFE_STAT_MAX] = NULL }; int tfe_stat_init(struct tfe_proxy * proxy, const char * profile) { static const char * fieldstat_output = "./tfe.fieldstat"; static const char * app_name = "tfe3a"; int value = 0, i = 0; screen_stat_handle_t fs_handle = NULL; char statsd_server_ip[TFE_SYMBOL_MAX]={0}; char histogram_bins[TFE_SYMBOL_MAX]={0}; int statsd_server_port=0; MESA_load_profile_string_def(profile, "STAT", "statsd_server", statsd_server_ip, sizeof(statsd_server_ip), ""); MESA_load_profile_int_def(profile, "STAT", "statsd_port", &(statsd_server_port), 0); MESA_load_profile_string_def(profile, "STAT", "histogram_bins", histogram_bins, sizeof(histogram_bins), "0.5,0.8,0.9,0.95"); fs_handle = FS_create_handle(); FS_set_para(fs_handle, OUTPUT_DEVICE, fieldstat_output, (int)strlen(fieldstat_output) + 1); FS_set_para(fs_handle, APP_NAME, app_name, (int)strlen(app_name) + 1); value = 1; FS_set_para(fs_handle, PRINT_MODE, &value, sizeof(value)); value = 0; FS_set_para(fs_handle, CREATE_THREAD, &value, sizeof(value)); if(strlen(statsd_server_ip)>0 && statsd_server_port!=0) { FS_set_para(fs_handle, STATS_SERVER_IP, statsd_server_ip, strlen(statsd_server_ip)+1); FS_set_para(fs_handle, STATS_SERVER_PORT, &(statsd_server_port), sizeof(statsd_server_port)); } FS_set_para(fs_handle, HISTOGRAM_GLOBAL_BINS, histogram_bins, strlen(histogram_bins)+1); for (i = 0; i < TFE_STAT_MAX; i++) { proxy->fs_id[i] = FS_register(fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, __str_stat_spec_map[i]); } FS_start(fs_handle); proxy->fs_handle = fs_handle; return 0; } int main(int argc, char * argv[]) { const char * main_profile = "./conf/tfe/tfe.conf"; const char * future_profile= "./conf/tfe/future.conf"; unsigned int __log_level = RLOG_LV_INFO; MESA_load_profile_uint_def(main_profile, "log", "level", &__log_level, RLOG_LV_INFO); char __log_path[TFE_STRING_MAX]= {}; MESA_load_profile_string_def(main_profile, "log", "location", __log_path, sizeof(__log_path), "log/tfe.log"); g_default_logger = MESA_create_runtime_log_handle(__log_path, __log_level); if (unlikely(g_default_logger == NULL)) { TFE_LOG_ERROR(g_default_logger, "Failed at creating default logger: %s", "log/tfe.log"); exit(EXIT_FAILURE); } future_promise_library_init(future_profile); tango_cache_global_init(); /* PROXY INSTANCE */ g_default_proxy = ALLOC(struct tfe_proxy, 1); assert(g_default_proxy); strcpy(g_default_proxy->name, "tfe3a"); /* CONFIG */ int ret = tfe_proxy_config(g_default_proxy, main_profile); CHECK_OR_EXIT(ret == 0, "Failed at loading profile %s, Exit.", main_profile); /* PERFOMANCE MONITOR */ tfe_stat_init(g_default_proxy, main_profile); /* LOGGER */ g_default_proxy->logger = g_default_logger; /* adds locking, only required if accessed from separate threads */ evthread_use_pthreads(); /* MAIN THREAD EVBASE */ g_default_proxy->evbase = event_base_new(); CHECK_OR_EXIT(g_default_proxy->evbase, "Failed at creating evbase for main thread. Exit."); /* GC EVENT */ g_default_proxy->gcev = event_new(g_default_proxy->evbase, -1, EV_PERSIST, __gc_handler_cb, g_default_proxy); CHECK_OR_EXIT(g_default_proxy->gcev, "Failed at creating GC event. Exit. "); /* SSL INIT */ g_default_proxy->ssl_mgr_handler = ssl_manager_init(main_profile, "ssl", g_default_proxy->evbase, g_default_logger); CHECK_OR_EXIT(g_default_proxy->ssl_mgr_handler, "Failed at init SSL manager. Exit."); for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++) { g_default_proxy->sev[i] = evsignal_new(g_default_proxy->evbase, signals[i], __signal_handler_cb, g_default_proxy); CHECK_OR_EXIT(g_default_proxy->sev[i], "Failed at create signal event. Exit."); evsignal_add(g_default_proxy->sev[i], NULL); } struct timeval gc_delay = {2, 0}; evtimer_add(g_default_proxy->gcev, &gc_delay); /* WORKER THREAD CTX Create */ tfe_proxy_work_thread_create_ctx(g_default_proxy); /* ACCEPTOR INIT */ g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger); CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. "); /* PLUGIN INIT */ unsigned int plugin_iterator = 0; for (struct tfe_plugin * plugin_iter = tfe_plugin_iterate(&plugin_iterator); plugin_iter != NULL; plugin_iter = tfe_plugin_iterate(&plugin_iterator)) { ret = plugin_iter->on_init(g_default_proxy); CHECK_OR_EXIT(ret >= 0, "Plugin %s init failed. Exit. ", plugin_iter->symbol); TFE_LOG_INFO(g_default_logger, "Plugin %s initialized. ", plugin_iter->symbol); } ret = tfe_proxy_work_thread_run(g_default_proxy); CHECK_OR_EXIT(ret == 0, "Failed at creating thread. Exit."); TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized. "); event_base_dispatch(g_default_proxy->evbase); return 0; } const char * tfe_version() { return __tfe_version; } unsigned int tfe_proxy_get_work_thread_count(void) { return g_default_proxy->nr_work_threads; } struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id) { assert(thread_id < g_default_proxy->nr_work_threads); return g_default_proxy->work_threads[thread_id]->evbase; } struct evdns_base* tfe_proxy_get_work_thread_dnsbase(unsigned int thread_id) { assert(thread_id < g_default_proxy->nr_work_threads); return g_default_proxy->work_threads[thread_id]->dnsbase; } struct event_base * tfe_proxy_get_gc_evbase(void) { return g_default_proxy->evbase; } screen_stat_handle_t tfe_proxy_get_fs_handle(void) { return g_default_proxy->fs_handle; } int tfe_proxy_ssl_add_trust_ca(const char* pem_file) { return ssl_manager_add_trust_ca(g_default_proxy->ssl_mgr_handler, pem_file); } int tfe_proxy_ssl_del_trust_ca(const char* pem_file) { return ssl_manager_del_trust_ca(g_default_proxy->ssl_mgr_handler, pem_file); } int tfe_proxy_ssl_add_crl(const char* pem_file) { return ssl_manager_add_crl(g_default_proxy->ssl_mgr_handler, pem_file); } int tfe_proxy_ssl_del_crl(const char* pem_file) { return ssl_manager_del_crl(g_default_proxy->ssl_mgr_handler, pem_file); } void tfe_proxy_ssl_reset_trust_ca(void) { ssl_manager_reset_trust_ca(g_default_proxy->ssl_mgr_handler); return; }