/* * Proxy engine, built around libevent 2.x. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include //#include //#include //#include #include #include #include #include #include #include /* Breakpad */ #include /* Systemd */ #include #include extern struct tcp_policy_enforcer *tcp_policy_enforcer_create(void *logger); extern struct chaining_policy_enforcer *chaining_policy_enforcer_create(void *logger); extern struct ssl_policy_enforcer *ssl_policy_enforcer_create(); extern enum ssl_stream_action ssl_policy_enforce(struct ssl_stream *upstream, void *u_para); static int signals[] = {SIGHUP, SIGPIPE, SIGUSR1, SIGUSR2}; /* Global Resource */ void * g_default_logger = NULL; struct tfe_proxy * g_default_proxy = NULL; bool g_print_to_stderr = true; int worker_thread_ready = 0; /* Per thread resource */ thread_local unsigned int __currect_thread_id = 0; thread_local void * __currect_default_logger = NULL; #define TFE_VAR_VERSION_CATTER(v) __attribute__((__used__)) const char * TFE_VERSION_version_##v = NULL #define TFE_VAR_VERSION_EXPEND(v) TFE_VAR_VERSION_CATTER(v) extern "C" { /* VERSION TAG */ #ifdef TFE_VAR_VERSION TFE_VAR_VERSION_EXPEND(TFE_VAR_VERSION); #else static __attribute__((__used__)) const char * TFE_VERSION_version_UNKNOWN = NULL; #endif #undef TFE_VAR_VERSION_CATTER #undef TFE_VAR_VERSION_EXPEND } /* VERSION STRING */ #ifdef TFE_GIT_VERSION static __attribute__((__used__)) const char * __tfe_version = TFE_GIT_VERSION; #else static __attribute__((__used__)) const char * tfe_version = "Unknown"; #endif struct tfe_thread_ctx * tfe_proxy_thread_ctx_acquire(struct tfe_proxy * ctx) { unsigned int min_thread_id = 0; unsigned int min_load; static unsigned int counter=0; counter++; // least_conn if (ctx->load_balance == LEAST_CONN) { for (unsigned int tid = 0; tid < ctx->nr_work_threads; tid++) { struct tfe_thread_ctx * thread_ctx = ctx->work_threads[tid]; unsigned int thread_load = ATOMIC_READ(&thread_ctx->load); if (tid == 0) { min_thread_id = tid; min_load = thread_load; continue; } min_thread_id = min_load > thread_load ? tid : min_thread_id; min_load = min_load > thread_load ? thread_load : min_load; } } // round_robin else { min_thread_id = counter % ctx->nr_work_threads; } ATOMIC_INC(&ctx->work_threads[min_thread_id]->load); return ctx->work_threads[min_thread_id]; } void tfe_proxy_thread_ctx_release(struct tfe_thread_ctx * thread_ctx) { ATOMIC_DEC(&thread_ctx->load); } /* 检查本进程是否通过SYSTEMD启动 */ static int check_is_started_by_notify() { char * notify_socket = getenv("NOTIFY_SOCKET"); return notify_socket == NULL ? 0 : 1; } int tfe_thread_set_affinity(int core_id) { int num_cores = sysconf(_SC_NPROCESSORS_ONLN); if (core_id < 0 || core_id >= num_cores) { return EINVAL; } cpu_set_t cpuset; CPU_ZERO(&cpuset); CPU_SET(core_id, &cpuset); return pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); } int tfe_proxy_fds_accept(struct tfe_proxy *ctx, int fd_downstream, int fd_upstream, int fd_fake_c, int fd_fake_s, struct tfe_cmsg *cmsg) { struct tfe_thread_ctx * worker_thread_ctx = tfe_proxy_thread_ctx_acquire(ctx); struct tfe_stream * stream = tfe_stream_create(ctx, worker_thread_ctx); enum tfe_stream_proto stream_protocol; uint8_t stream_protocol_in_char = 0; int tcp_passthrough = -1; uint16_t size = 0; int result = 0; result = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (unsigned char *)&stream_protocol_in_char, sizeof(stream_protocol_in_char), &size); if (unlikely(result < 0)) { TFE_LOG_ERROR(ctx->logger, "failed at fetch connection's protocol from cmsg: %s", strerror(-result)); goto __errout; } stream_protocol = (enum tfe_stream_proto)stream_protocol_in_char; tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &stream_protocol, sizeof(stream_protocol)); tfe_stream_cmsg_setup(stream, cmsg); if (ctx->tcp_options.enable_overwrite <= 0) { result = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_PASSTHROUGH, (unsigned char *)&tcp_passthrough, sizeof(tcp_passthrough), &size); if (result < 0) { TFE_LOG_ERROR(ctx->logger, "failed at fetch connection's tcp_passthrough from cmsg: %s", strerror(-result)); // goto __errout; } } /* FOR DEBUG */ if (unlikely(ctx->tcp_all_passthrough) || tcp_passthrough > 0) { bool __true = true; uint8_t ssl_intercept_status = SSL_ACTION_PASSTHROUGH; enum tfe_stream_proto __session_type = STREAM_PROTO_PLAIN; tfe_stream_option_set(stream, TFE_STREAM_OPT_PASSTHROUGH, &__true, sizeof(__true)); tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &__session_type, sizeof(__session_type)); tfe_cmsg_set(cmsg, TFE_CMSG_SSL_PASSTHROUGH_REASON, (const unsigned char *)"TCP Passthrough", (uint16_t)strlen("TCP Passthrough")); tfe_cmsg_set(cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, (const unsigned char *)&ssl_intercept_status, (uint16_t)sizeof(ssl_intercept_status)); } TFE_LOG_DEBUG(ctx->logger, "%p: fetch tcp options: cmsg's tcp_passthrough: %d, conf's tcp_passthrough: %d, enalbe passthrough: %d", stream, tcp_passthrough, ctx->tcp_all_passthrough, (ctx->tcp_all_passthrough > 0 || tcp_passthrough > 0) ? 1 : 0); result = tfe_stream_init_by_fds(stream, fd_downstream, fd_upstream, fd_fake_c, fd_fake_s); if (result < 0) { TFE_LOG_ERROR(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accept failed.", stream, fd_downstream, fd_upstream, stream_protocol); goto __errout; } else { TFE_LOG_DEBUG(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accepted.", stream, fd_downstream, fd_upstream, stream_protocol); } return 0; __errout: if(stream != NULL) tfe_stream_destory((struct tfe_stream_private *)stream); return -1; } void tfe_proxy_loopbreak(tfe_proxy * ctx) { event_base_loopbreak(ctx->evbase); } void tfe_proxy_free(tfe_proxy * ctx) { return; } static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) { struct tfe_thread_ctx *ctx = (struct tfe_thread_ctx *)arg; struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); ATOMIC_SET(&(ctx->lastime), now.tv_sec); // TFE_LOG_DEBUG(g_default_logger, "Worker thread %d Update time %lds", ctx->thread_id, now.tv_sec); while (ATOMIC_READ(&(ctx->proxy->make_work_thread_sleep)) > 0) { TFE_LOG_ERROR(g_default_logger, "recv SIGUSR1, make worker thread[%d] %d sleep", ctx->thread_id, ctx->readable_tid); sleep(1); } } static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg) { tfe_proxy * ctx = (tfe_proxy *) arg; struct maat *maat=NULL; const char *profile_path = "./conf/tfe/tfe.conf"; int log_level=LOG_LEVEL_FATAL; switch (fd) { case SIGTERM: case SIGQUIT: case SIGHUP: TFE_LOG_ERROR(ctx->logger, "recv SIGHUP, reload zlog.conf"); MESA_handle_runtime_log_reconstruction(NULL); MESA_load_profile_int_def(profile_path, "maat", "log_level", &(log_level), LOG_LEVEL_FATAL); maat = tfe_get_maat_handle(); if(maat) { maat_reload_log_level(maat, (enum log_level)log_level); } break; case SIGUSR1: // enable work thread sleep TFE_LOG_ERROR(ctx->logger, "recv SIGUSR1, make worker thread sleep"); ATOMIC_SET(&(ctx->make_work_thread_sleep), 1); break; case SIGUSR2: // disable work thread sleep TFE_LOG_ERROR(ctx->logger, "recv SIGUSR2, wake worker thread from sleep"); ATOMIC_ZERO(&(ctx->make_work_thread_sleep)); break; case SIGPIPE: TFE_PROXY_STAT_INCREASE(STAT_SIGPIPE, 1); TFE_LOG_ERROR(ctx->logger, "Warning: Received SIGPIPE; ignoring.\n"); break; default: TFE_LOG_ERROR(ctx->logger, "Warning: Received unexpected signal %i\n", fd); break; } } static void __gc_handler_cb(evutil_socket_t fd, short what, void * arg) { tfe_proxy *ctx = (tfe_proxy *)arg; for (int i = 0; i < TFE_STAT_MAX; i++) { long long delta = ATOMIC_EXCHANGE(&(ctx->stat_val[i]), 0); fieldstat_easy_counter_incrby(ctx->fs_handle, 0, ctx->fs_id[i], NULL, 0, delta); } if (ctx->kni_v4_acceptor != NULL) packet_io_fs_dump(ctx->kni_v4_acceptor->packet_io_fs); timestamp_update(); return; } static void * tfe_work_thread(void * arg) { struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg; struct timeval timer_delay = {2, 0}; ctx->readable_tid = syscall(SYS_gettid); struct event * ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, ctx); if (unlikely(ev == NULL)) { TFE_LOG_ERROR(g_default_logger, "Failed at creating dummy event for thread %u", ctx->thread_id); exit(EXIT_FAILURE); } evtimer_add(ev, &timer_delay); __currect_thread_id = ctx->thread_id; char thread_name[16]; snprintf(thread_name, sizeof(thread_name), "tfe:worker-%d", ctx->thread_id); prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL); char affinity[32] = {0}; if (ctx->proxy->enable_cpu_affinity) { tfe_thread_set_affinity(ctx->proxy->cpu_affinity_mask[ctx->thread_id + 1]); snprintf(affinity, sizeof(affinity), "affinity cpu%d", ctx->proxy->cpu_affinity_mask[ctx->thread_id + 1]); } TFE_LOG_INFO(g_default_logger, "Work thread %u %s is running...", ctx->thread_id, ctx->proxy->enable_cpu_affinity ? affinity : ""); event_base_dispatch(ctx->evbase); assert(0); event_free(ev); TFE_LOG_ERROR(g_default_logger, "Work thread %u is exit...", ctx->thread_id); return (void *) NULL; } void tfe_proxy_work_thread_create_ctx(struct tfe_proxy * proxy) { unsigned int i = 0; for (i = 0; i < proxy->nr_work_threads; i++) { proxy->work_threads[i] = ALLOC(struct tfe_thread_ctx, 1); proxy->work_threads[i]->thread_id = i; proxy->work_threads[i]->evbase = event_base_new(); proxy->work_threads[i]->dnsbase = evdns_base_new(proxy->work_threads[i]->evbase, EVDNS_BASE_INITIALIZE_NAMESERVERS); proxy->work_threads[i]->evhttp = key_keeper_evhttp_init(proxy->work_threads[i]->evbase, proxy->work_threads[i]->dnsbase, proxy->key_keeper_handler); proxy->work_threads[i]->proxy = proxy; } return; } int tfe_proxy_work_thread_run(struct tfe_proxy * proxy) { struct tfe_thread_ctx * __thread_ctx = NULL; unsigned int i = 0; int ret = 0; for (i = 0; i < proxy->nr_work_threads; i++) { __thread_ctx = proxy->work_threads[i]; struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); ATOMIC_SET(&(__thread_ctx->lastime), now.tv_sec); ret = pthread_create(&__thread_ctx->thr, NULL, tfe_work_thread, (void *) __thread_ctx); if (unlikely(ret < 0)) { TFE_LOG_ERROR(proxy->logger, "Failed at pthread_create() for thread %d, error %d: %s", i, errno, strerror(errno)); /* after log, reset errno */ errno = 0; return -1; } } return 0; } int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile) { /* Worker threads */ MESA_load_profile_uint_def(profile, "system", "nr_worker_threads", &proxy->nr_work_threads, 1); MESA_load_profile_uint_def(profile, "system", "enable_cpu_affinity", &proxy->enable_cpu_affinity, 0); MESA_load_profile_uint_range(profile, "system", "cpu_affinity_mask", TFE_THREAD_MAX, proxy->cpu_affinity_mask); // LEAST_CONN = 0; ROUND_ROBIN = 1, MESA_load_profile_uint_def(profile, "system", "load_balance", (unsigned int *)&proxy->load_balance, ROUND_ROBIN); if (proxy->nr_work_threads < 1 || proxy->nr_work_threads > TFE_THREAD_MAX) { TFE_LOG_ERROR(g_default_logger, "'nr_worker_threads' is invalid, only support [1, %d].", TFE_THREAD_MAX); return -1; } if (proxy->enable_cpu_affinity) { unsigned int num_cores = sysconf(_SC_NPROCESSORS_ONLN); if (proxy->nr_work_threads > num_cores - 2) { TFE_LOG_ERROR(g_default_logger, "'nr_worker_threads' is invalid, suggest [1, cpu_cores - 2]."); return -1; } for (unsigned int i = 0; i < proxy->nr_work_threads; i++) { if (proxy->cpu_affinity_mask[i] <= 0 || proxy->cpu_affinity_mask[i] >= num_cores) { TFE_LOG_ERROR(g_default_logger, "'cpu_affinity_mask' is invalid, only support [1, %d].", num_cores); return -1; } } } /* Debug */ MESA_load_profile_uint_def(profile, "debug", "passthrough_all_tcp", &proxy->tcp_all_passthrough, 0); /* ratelimit */ MESA_load_profile_uint_def(profile, "ratelimit", "read_rate", &proxy->rate_limit_options.read_rate, 0); MESA_load_profile_uint_def(profile, "ratelimit", "read_burst", &proxy->rate_limit_options.read_burst, 0); MESA_load_profile_uint_def(profile, "ratelimit", "write_rate", &proxy->rate_limit_options.write_rate, 0); MESA_load_profile_uint_def(profile, "ratelimit", "write_burst", &proxy->rate_limit_options.write_burst, 0); if(proxy->rate_limit_options.read_rate != 0 || proxy->rate_limit_options.read_burst != 0 || proxy->rate_limit_options.write_rate != 0 || proxy->rate_limit_options.write_burst != 0) { proxy->en_rate_limit = 1; } /* TCP options, -1 means unset, we shall not call setsockopt */ MESA_load_profile_int_def(profile, "tcp", "sz_rcv_buffer", &proxy->tcp_options.sz_rcv_buffer, -1); MESA_load_profile_int_def(profile, "tcp", "sz_snd_buffer", &proxy->tcp_options.sz_snd_buffer, -1); MESA_load_profile_int_def(profile, "tcp", "enable_overwrite", &proxy->tcp_options.enable_overwrite, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_nodelay", &proxy->tcp_options.tcp_nodelay, -1); MESA_load_profile_int_def(profile, "tcp", "so_keepalive", &proxy->tcp_options.so_keepalive, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_keepidle", &proxy->tcp_options.tcp_keepidle, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_keepintvl", &proxy->tcp_options.tcp_keepintvl, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_keepcnt", &proxy->tcp_options.tcp_keepcnt, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_user_timeout", &proxy->tcp_options.tcp_user_timeout, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_upstream", &proxy->tcp_options.tcp_ttl_upstream, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_downstream", &proxy->tcp_options.tcp_ttl_downstream, -1); MESA_load_profile_int_def(profile, "traffic_steering", "enable_steering_http", &proxy->traffic_steering_options.enable_steering_http, 0); MESA_load_profile_int_def(profile, "traffic_steering", "enable_steering_ssl", &proxy->traffic_steering_options.enable_steering_ssl, 0); MESA_load_profile_int_def(profile, "traffic_steering", "so_mask_client", &proxy->traffic_steering_options.so_mask_client, 0x11); MESA_load_profile_int_def(profile, "traffic_steering", "so_mask_server", &proxy->traffic_steering_options.so_mask_server, 0x22); MESA_load_profile_string_def(profile, "traffic_steering", "device_client", proxy->traffic_steering_options.device_client, sizeof(proxy->traffic_steering_options.device_client), "eth_client"); MESA_load_profile_string_def(profile, "traffic_steering", "device_server", proxy->traffic_steering_options.device_server, sizeof(proxy->traffic_steering_options.device_server), "eth_server"); return 0; } static const char * __str_stat_spec_map[] = { [STAT_SIGPIPE] = "SIGPIPE", [STAT_FD_OPEN_BY_KNI_ACCEPT] = "fd_rx", [STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL] = "fd_rx_err", [STAT_FD_CLOSE] = "fd_inst_cls", [STAT_STREAM_OPEN] = "stm_open", [STAT_STREAM_CLS] = "stm_cls", [STAT_STREAM_CLS_DOWN_EOF] = "dstm_eof", [STAT_STREAM_CLS_UP_EOF] = "ustm_eof", [STAT_STREAM_CLS_DOWN_ERR] = "dstm_err", [STAT_STREAM_CLS_UP_ERR] = "ustm_err", [STAT_STREAM_CLS_KILL] = "stm_kill", [STAT_STREAM_INTERCEPT] = "stm_incpt", [STAT_STREAM_BYPASS] = "stm_byp", [STAT_STREAM_INCPT_BYTES] = "stm_incpt_B", [STAT_STREAM_INCPT_DOWN_BYTES] = "dstm_incpt_B", [STAT_STREAM_INCPT_UP_BYTES] = "ustm_incpt_B", [STAT_STREAM_TCP_PLAIN] = "plain", [STAT_STREAM_TCP_SSL] = "ssl", [STAT_STEERING_SSL_CONN] = "stee_ssl_conn", [STAT_STEERING_HTTP_CONN] = "stee_http_conn", [STAT_STEERING_CLIENT_TX_B] = "stee_c_tx_B", [STAT_STEERING_SERVER_RX_B] = "stee_s_rx_B", [STAT_STEERING_SERVER_TX_B] = "stee_s_tx_B", [STAT_STEERING_CLIENT_RX_B] = "stee_c_rx_B", [STAT_STEERING_CLIENT_ERR] = "stee_c_err", [STAT_STEERING_SERVER_ERR] = "stee_s_err", [STAT_STEERING_CLIENT_EOF] = "stee_c_eof", [STAT_STEERING_SERVER_EOF] = "stee_s_eof", [TFE_STAT_MAX] = NULL }; int tfe_stat_init(struct tfe_proxy *proxy, const char *profile) { int output_cycle = 0; char output_file[TFE_STRING_MAX] = {0}; MESA_load_profile_string_def(profile, "STAT", "output_file", output_file, sizeof(output_file), "log/tfe.fs4"); MESA_load_profile_int_def(profile, "STAT", "output_cycle", &(output_cycle), 5); proxy->fs_handle = fieldstat_easy_new(1, "tfe", NULL, 0); fieldstat_easy_enable_auto_output(proxy->fs_handle, output_file, output_cycle); for (int i = 0; i < TFE_STAT_MAX; i++) { proxy->fs_id[i] = fieldstat_easy_register_counter(proxy->fs_handle, __str_stat_spec_map[i]); } return 0; } void tfe_proxy_acceptor_init(struct tfe_proxy *proxy, const char *profile) { #if 0 MESA_load_profile_uint_def(profile, "system", "enable_kni_v1", &proxy->en_kni_v1_acceptor, 0); MESA_load_profile_uint_def(profile, "system", "enable_kni_v2", &proxy->en_kni_v2_acceptor, 0); MESA_load_profile_uint_def(profile, "system", "enable_kni_v3", &proxy->en_kni_v3_acceptor, 0); MESA_load_profile_uint_def(profile, "system", "enable_kni_v4", &proxy->en_kni_v4_acceptor, 0); int ret = proxy->en_kni_v1_acceptor + proxy->en_kni_v2_acceptor + proxy->en_kni_v3_acceptor + proxy->en_kni_v4_acceptor; CHECK_OR_EXIT((ret == 1), "Invalid KNI acceptor. Exit."); if (proxy->en_kni_v1_acceptor) { g_default_proxy->kni_v1_acceptor = acceptor_kni_v1_create(proxy, profile, proxy->logger); CHECK_OR_EXIT(g_default_proxy->kni_v1_acceptor, "Failed at init KNIv1 acceptor. Exit. "); } if (proxy->en_kni_v2_acceptor) { g_default_proxy->kni_v2_acceptor = acceptor_kni_v2_create(g_default_proxy, profile, g_default_logger); CHECK_OR_EXIT(g_default_proxy->kni_v2_acceptor, "Failed at init KNIv2 acceptor. Exit. "); } if (proxy->en_kni_v3_acceptor) { g_default_proxy->kni_v3_acceptor = acceptor_kni_v3_create(g_default_proxy, profile, g_default_logger); CHECK_OR_EXIT(g_default_proxy->kni_v3_acceptor, "Failed at init KNIv3 acceptor. Exit. "); } if (proxy->en_kni_v4_acceptor) { g_default_proxy->kni_v4_acceptor = acceptor_kni_v4_create(g_default_proxy, profile); CHECK_OR_EXIT(g_default_proxy->kni_v4_acceptor, "Failed at init KNIv4 acceptor. Exit. "); } #endif g_default_proxy->kni_v4_acceptor = acceptor_kni_v4_create(g_default_proxy, profile); CHECK_OR_EXIT(g_default_proxy->kni_v4_acceptor, "Failed at init KNIv4 acceptor. Exit. "); return; } static void usage(char *cmd) { fprintf(stderr, "USAGE: %s [OPTIONS]\n", cmd); fprintf(stderr, " -v -- show version\n"); fprintf(stderr, " -g -- generate coredump\n"); fprintf(stderr, " -h -- show help info\n\n"); fprintf(stderr, "kill -s SIGHUP $pid -- reload zlog configure\n"); fprintf(stderr, "kill -s SIGUSR1 $pid -- make worker thread sleep\n"); fprintf(stderr, "kill -s SIGUSR2 $pid -- wake worker thread from sleep\n"); } int main(int argc, char * argv[]) { const char * main_profile = "./conf/tfe/tfe.conf"; const char * future_profile= "./conf/tfe/future.conf"; const char * zlog_profile = "./conf/tfe/zlog.conf"; int ret = 0; int opt = 0; bool to_generate_a_segv = false; while ((opt = getopt(argc, argv, "vgh")) != -1) { switch (opt) { case 'v': fprintf(stderr, "Tango Frontend Engine, Version: %s\n", tfe_version()); return 0; case 'g': fprintf(stderr, "Tango Frontend Engine, prepare to generate a coredump."); to_generate_a_segv = true; break; case 'h': /* fall through */ default: usage(argv[0]); return 0; } } fprintf(stderr, "Tango Frontend Engine, Version: %s", __tfe_version); if (0 != MESA_handle_runtime_log_creation(zlog_profile)) { fprintf(stderr, "MESA_handle_runtime_log_creation return error\n"); exit(EXIT_FAILURE); } /* adds locking, only required if accessed from separate threads */ evthread_use_pthreads(); g_default_logger = (void *)MESA_create_runtime_log_handle("tfe", RLOG_LV_DEBUG); if (unlikely(g_default_logger == NULL)) { TFE_LOG_ERROR(g_default_logger, "Failed at creating default logger: %s", "log/tfe.log"); exit(EXIT_FAILURE); } /* PROXY INSTANCE */ g_default_proxy = ALLOC(struct tfe_proxy, 1); assert(g_default_proxy); strcpy(g_default_proxy->name, "tfe"); g_default_proxy->breakpad = breakpad_init(main_profile, "system", g_default_logger, tfe_version()); CHECK_OR_EXIT(g_default_proxy->breakpad, "Failed at starting breakpad. Exit."); if (to_generate_a_segv) { breakpad_segv_generate(); } future_promise_library_init(future_profile); /* CONFIG */ ret = tfe_proxy_config(g_default_proxy, main_profile); CHECK_OR_EXIT(ret == 0, "Failed at loading profile %s, Exit.", main_profile); /* PERFOMANCE MONITOR */ tfe_stat_init(g_default_proxy, main_profile); /* LOGGER */ g_default_proxy->logger = g_default_logger; /* MAIN THREAD EVBASE */ g_default_proxy->evbase = event_base_new(); CHECK_OR_EXIT(g_default_proxy->evbase, "Failed at creating evbase for main thread. Exit."); /* GC EVENT */ g_default_proxy->gcev = event_new(g_default_proxy->evbase, -1, EV_PERSIST, __gc_handler_cb, g_default_proxy); CHECK_OR_EXIT(g_default_proxy->gcev, "Failed at creating GC event. Exit. "); /* KEY_KEEP INIT */ g_default_proxy->key_keeper_handler = key_keeper_init(main_profile, "key_keeper", g_default_logger); CHECK_OR_EXIT(g_default_proxy->key_keeper_handler, "Failed at init Key keeper. Exit."); /* RESOURCE INIT */ ret = tfe_env_init(); CHECK_OR_EXIT(ret == 0, "TFE bussiness resource init failed. Exit."); /* SSL INIT */ g_default_proxy->ssl_mgr_handler = ssl_manager_init(main_profile, "ssl", g_default_proxy->evbase, g_default_proxy->key_keeper_handler, g_default_logger); CHECK_OR_EXIT(g_default_proxy->ssl_mgr_handler, "Failed at init SSL manager. Exit."); for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++) { g_default_proxy->sev[i] = evsignal_new(g_default_proxy->evbase, signals[i], __signal_handler_cb, g_default_proxy); CHECK_OR_EXIT(g_default_proxy->sev[i], "Failed at create signal event. Exit."); evsignal_add(g_default_proxy->sev[i], NULL); } struct timeval gc_delay = {0, 500 * 1000}; // Microseconds, we set 500 miliseconds here. evtimer_add(g_default_proxy->gcev, &gc_delay); timestamp_update(); /* WORKER THREAD CTX Create */ tfe_proxy_work_thread_create_ctx(g_default_proxy); tfe_proxy_acceptor_init(g_default_proxy, main_profile); /* SCM Sender */ g_default_proxy->scm_sender = sender_scm_init(main_profile, "kni", g_default_logger); CHECK_OR_EXIT(g_default_proxy->scm_sender != NULL, "Failed at creating scm sender, Exit."); /* PLUGIN INIT */ unsigned int plugin_iterator = 0; for (struct tfe_plugin * plugin_iter = tfe_plugin_iterate(&plugin_iterator); plugin_iter != NULL; plugin_iter = tfe_plugin_iterate(&plugin_iterator)) { ret = plugin_iter->on_init(g_default_proxy); CHECK_OR_EXIT(ret >= 0, "Plugin %s init failed. Exit. ", plugin_iter->symbol); TFE_LOG_INFO(g_default_logger, "Plugin %s initialized. ", plugin_iter->symbol); } g_default_proxy->int_ply_enforcer = intercept_policy_enforcer_create(g_default_logger); CHECK_OR_EXIT(g_default_proxy->int_ply_enforcer != NULL, "Failed at creating intercept policy enforcer. Exit."); g_default_proxy->tcp_ply_enforcer = tcp_policy_enforcer_create(g_default_logger); CHECK_OR_EXIT(g_default_proxy->tcp_ply_enforcer != NULL, "Failed at creating tcp policy enforcer. Exit."); g_default_proxy->ssl_ply_enforcer = ssl_policy_enforcer_create(); CHECK_OR_EXIT(g_default_proxy->ssl_ply_enforcer != NULL, "Failed at creating ssl policy enforcer. Exit."); g_default_proxy->chain_ply_enforcer = chaining_policy_enforcer_create(g_default_logger); CHECK_OR_EXIT(g_default_proxy->chain_ply_enforcer != NULL, "Failed at creating chaining policy enforcer. Exit."); ssl_manager_set_new_upstream_cb(g_default_proxy->ssl_mgr_handler, ssl_policy_enforce, g_default_proxy->ssl_ply_enforcer); ret = tfe_proxy_work_thread_run(g_default_proxy); CHECK_OR_EXIT(ret == 0, "Failed at creating thread. Exit."); /* Watchdog KNI */ g_default_proxy->watchdog_kni = watchdog_kni_create(g_default_proxy, main_profile, g_default_logger); CHECK_OR_EXIT(g_default_proxy->watchdog_kni != NULL, "Failed at creating KNI watchdog, Exit."); /* Watchdog TFE */ g_default_proxy->watchdog_tfe = watchdog_tfe_create(g_default_proxy, main_profile, g_default_logger); CHECK_OR_EXIT(g_default_proxy->watchdog_tfe != NULL, "Failed at creating TFE watchdog, Exit."); /* Watchdog 3rd device */ g_default_proxy->watchdog_3rd_device = watchdog_3rd_device_create(g_default_proxy, main_profile, g_default_logger); CHECK_OR_EXIT(g_default_proxy->watchdog_3rd_device != NULL, "Failed at creating 3rd device watchdog, Exit."); TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized, Version: %s.", __tfe_version); /* If TFE is run by systemd's notify, then tell the systemd our tfe is ready. * and disable the stderr log, only print logs into files */ if(check_is_started_by_notify()) { sd_notify(0, "READY=1"); g_print_to_stderr = false; sleep(1); } g_print_to_stderr = false; worker_thread_ready = 1; event_base_dispatch(g_default_proxy->evbase); return 0; } const char * tfe_version() { return __tfe_version; } unsigned int tfe_proxy_get_work_thread_count(void) { return g_default_proxy->nr_work_threads; } struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id) { assert(thread_id < g_default_proxy->nr_work_threads); return g_default_proxy->work_threads[thread_id]->evbase; } struct evdns_base* tfe_proxy_get_work_thread_dnsbase(unsigned int thread_id) { assert(thread_id < g_default_proxy->nr_work_threads); return g_default_proxy->work_threads[thread_id]->dnsbase; } struct evhttp_connection* tfe_proxy_get_work_thread_evhttp(unsigned int thread_id) { assert(thread_id < g_default_proxy->nr_work_threads); return g_default_proxy->work_threads[thread_id]->evhttp; } struct event_base * tfe_proxy_get_gc_evbase(void) { return g_default_proxy->evbase; } struct fieldstat_easy *tfe_proxy_get_fs_handle(void) { return g_default_proxy->fs_handle; } void * tfe_proxy_get_error_logger(void) { return g_default_logger; } int tfe_proxy_ssl_add_trust_ca(const char* pem_file) { return ssl_manager_add_trust_ca(g_default_proxy->ssl_mgr_handler, pem_file); } int tfe_proxy_ssl_del_trust_ca(const char* pem_file) { return ssl_manager_del_trust_ca(g_default_proxy->ssl_mgr_handler, pem_file); } int tfe_proxy_ssl_add_crl(const char* pem_file) { return ssl_manager_add_crl(g_default_proxy->ssl_mgr_handler, pem_file); } int tfe_proxy_ssl_del_crl(const char* pem_file) { return ssl_manager_del_crl(g_default_proxy->ssl_mgr_handler, pem_file); } void tfe_proxy_ssl_reset_trust_ca(void) { ssl_manager_reset_trust_ca(g_default_proxy->ssl_mgr_handler); return; } void tfe_proxy_ssl_reset_trust_ca_finish(void) { ssl_manager_reset_trust_ca_finish(g_default_proxy->ssl_mgr_handler); return; }