diff --git a/common/include/tfe_utils.h b/common/include/tfe_utils.h index 91063c6..42354c3 100644 --- a/common/include/tfe_utils.h +++ b/common/include/tfe_utils.h @@ -6,8 +6,9 @@ #pragma once #include -#define TFE_STRING_MAX 2048 -#define TFE_SYMBOL_MAX 64 +#define TFE_STRING_MAX 2048 +#define TFE_SYMBOL_MAX 64 +#define TFE_THREAD_MAX 128 #ifndef TFE_CONFIG_BACKLOG_DEFAULT #define TFE_CONFIG_BACKLOG_DEFAULT 20 @@ -26,13 +27,22 @@ #define unlikely(expr) __builtin_expect((expr), 0) #define TFE_LOG_ERROR(handler, fmt, ...) \ -do { MESA_handle_runtime_log(handler, RLOG_LV_FATAL, NULL, fmt, ##__VA_ARGS__); } while(0) \ +do { fprintf(stderr, fmt "\n" , ##__VA_ARGS__); \ + MESA_handle_runtime_log(handler, RLOG_LV_FATAL, "tfe", fmt, ##__VA_ARGS__); } while(0) #define TFE_LOG_INFO(handler, fmt, ...) \ -do { MESA_handle_runtime_log(handler, RLOG_LV_INFO, NULL, fmt, ##__VA_ARGS__); } while(0) \ +do { fprintf(stderr, fmt "\n", ##__VA_ARGS__); \ + MESA_handle_runtime_log(handler, RLOG_LV_INFO, "tfe", fmt, ##__VA_ARGS__); } while(0) \ #define TFE_LOG_DEBUG(handler, fmt, ...) \ -do { MESA_handle_runtime_log(handler, RLOG_LV_DEBUG, NULL, fmt, ##__VA_ARGS__); } while(0) \ +do { MESA_handle_runtime_log(handler, RLOG_LV_DEBUG, "tfe", fmt, ##__VA_ARGS__); } while(0) \ + +#define TFE_STREAM_LOG_DEBUG(stream, fmt, ...) +#define TFE_STREAM_LOG_INFO(stream, fmt, ...) +#define TFE_STREAM_LOG_ERROR(stream, fmt, ...) + +#define TFE_STREAM_TRACE_TAG_INFO(stream, tag, fmt, ...) +#define TFE_STREAM_TRACE_TAG_VERBOSE(stream, tag, fmt, ...) #ifndef offsetof /** Return the offset of a field in a structure. */ diff --git a/platform/CMakeLists.txt b/platform/CMakeLists.txt index 86e0f32..c0eb6e0 100644 --- a/platform/CMakeLists.txt +++ b/platform/CMakeLists.txt @@ -7,3 +7,5 @@ target_include_directories(tfe PRIVATE ${CMAKE_CURRENT_LIST_DIR}/include/interna target_link_libraries(tfe common) target_link_libraries(tfe pthread dl openssl-ssl-static openssl-crypto-static pthread libevent-static libevent-static-openssl libevent-static-pthreads MESA_handle_logger MESA_prof_load MESA_htable wiredcfg) + +install(TARGETS tfe RUNTIME DESTINATION ./) diff --git a/platform/include/internal/kni_acceptor.h b/platform/include/internal/kni_acceptor.h index 56404ff..7b29560 100644 --- a/platform/include/internal/kni_acceptor.h +++ b/platform/include/internal/kni_acceptor.h @@ -1,7 +1,7 @@ #pragma once struct tfe_proxy; -struct kni_acceptor_ctx; +struct kni_acceptor; -struct kni_acceptor_ctx * kni_acceptor_init(struct tfe_proxy *proxy, const char *profile, void *logger); -void kni_acceptor_deinit(struct kni_acceptor_ctx *ctx); +struct kni_acceptor * kni_acceptor_init(struct tfe_proxy *proxy, const char *profile, void *logger); +void kni_acceptor_deinit(struct kni_acceptor *ctx); diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index 39180bb..b48cdac 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -12,7 +12,7 @@ struct tfe_thread_ctx { pthread_t thr; unsigned int thread_id; - size_t load; + unsigned int load; struct event_base * evbase; unsigned char running; @@ -20,9 +20,6 @@ struct tfe_thread_ctx struct tfe_stats stat; struct cert_mgr * cert_mgr; - struct sess_cache * dsess_cache; - struct sess_cache * ssess_cache; - unsigned int nr_modules; const struct tfe_plugin * modules; }; @@ -84,7 +81,9 @@ struct tfe_stream_private int plugin_num; struct plugin_ctx * plug_ctx; - unsigned char passthrough; /* 1 if SSL passthrough is active */ + + /* TCP forward without scan or decode when the passthough is set */ + bool passthough; /* For defer connection setup */ evutil_socket_t defer_fd_downstream; @@ -98,5 +97,5 @@ struct tfe_stream_private static inline void * __STREAM_LOGGER(struct tfe_stream_private * _stream) { - return _stream->proxy_ref->main_logger; + return _stream->proxy_ref->logger; } diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h index b620e28..82a7083 100644 --- a/platform/include/internal/proxy.h +++ b/platform/include/internal/proxy.h @@ -6,6 +6,7 @@ struct ssl_mgr; struct key_keeper; +struct kni_acceptor; struct tfe_proxy { @@ -14,18 +15,19 @@ struct tfe_proxy struct event * sev[8]; struct event * gcev; - struct tfe_config * opts; - void * main_logger; + void * logger; unsigned int nr_work_threads; - struct tfe_thread_ctx * work_threads; + struct tfe_thread_ctx * work_threads[TFE_THREAD_MAX]; unsigned int nr_modules; struct tfe_plugin * modules; - void * io_mod; struct ssl_mgr * ssl_mgr_handler; struct key_keeper * key_keeper_handler; + struct kni_acceptor * kni_acceptor_handler; + + unsigned int tcp_all_passthrough; }; struct tfe_proxy_accept_para @@ -35,9 +37,14 @@ struct tfe_proxy_accept_para evutil_socket_t downstream_fd; /* Session Type */ + bool is_set_session_type; enum tfe_session_proto session_type; + bool passthrough; }; +struct tfe_thread_ctx * tfe_proxy_thread_ctx_acquire(struct tfe_proxy * ctx); +void tfe_proxy_thread_ctx_release(struct tfe_thread_ctx * thread_ctx); + struct tfe_proxy * tfe_proxy_new(const char * profile); int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para); void tfe_proxy_run(struct tfe_proxy * proxy); diff --git a/platform/include/internal/tcp_stream.h b/platform/include/internal/tcp_stream.h index fc052a0..98ecf3f 100644 --- a/platform/include/internal/tcp_stream.h +++ b/platform/include/internal/tcp_stream.h @@ -3,6 +3,13 @@ #include struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx); -void tfe_stream_init_by_fds(struct tfe_stream * stream, enum tfe_session_proto session_type, - evutil_socket_t fd_downstream, evutil_socket_t fd_upstream); + +enum tfe_stream_option +{ + TFE_STREAM_OPT_SESSION_TYPE, + TFE_STREAM_OPT_PASSTHROUGH +}; + +int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt, const void * arg, size_t sz_arg); +void tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream); void tfe_stream_destory(struct tfe_stream_private * stream); diff --git a/platform/src/kni_acceptor.cpp b/platform/src/kni_acceptor.cpp index f165bd5..be87a23 100644 --- a/platform/src/kni_acceptor.cpp +++ b/platform/src/kni_acceptor.cpp @@ -40,7 +40,7 @@ struct kni_tlv_info char value; }; -struct kni_acceptor_ctx +struct kni_acceptor { /* INPUT */ struct tfe_proxy * proxy; @@ -63,7 +63,7 @@ struct kni_acceptor_ctx pid_t pid_kni_conn; }; -void __kni_conn_close(struct kni_acceptor_ctx * ctx) +void __kni_conn_close(struct kni_acceptor * ctx) { if (ctx->fd_kni_conn != 0) close(ctx->fd_kni_conn); if (ctx->ev_kni_conn != NULL) event_free(ctx->ev_kni_conn); @@ -72,7 +72,7 @@ void __kni_conn_close(struct kni_acceptor_ctx * ctx) void __kni_event_cb(evutil_socket_t fd, short what, void * user) { - struct kni_acceptor_ctx * __ctx = (struct kni_acceptor_ctx *)user; + struct kni_acceptor * __ctx = (struct kni_acceptor *)user; struct cmsghdr * __cmsghdr; struct tfe_proxy_accept_para __accept_para; int * __fds = NULL; @@ -112,12 +112,12 @@ void __kni_event_cb(evutil_socket_t fd, short what, void * user) } else if (rd < 0) { - TFE_LOG_ERROR(__ctx->logger, "Failed at recving fds from KNI connection: %s", strerror(errno)); + TFE_LOG_ERROR(__ctx->logger, "Failed at recving fds from KNI connection: %s. ", strerror(errno)); goto __close_kni_connection; } else if (rd == 0) { - TFE_LOG_INFO(__ctx->logger, "KNI connected from process %u", __ctx->pid_kni_conn); + TFE_LOG_INFO(__ctx->logger, "KNI connected from process %u. ", __ctx->pid_kni_conn); goto __close_kni_connection; } @@ -163,12 +163,14 @@ __drop_recieved_fds: void __kni_listener_accept_cb(struct evconnlistener * listener, evutil_socket_t fd, struct sockaddr * sk_addr, int sk_len, void * user) { - struct kni_acceptor_ctx * __ctx = (struct kni_acceptor_ctx *)user; + struct kni_acceptor * __ctx = (struct kni_acceptor *)user; struct event * __event = NULL; struct ucred __cr{}; socklen_t __cr_len = sizeof(struct ucred); + int ret = 0; + /* There is only one KNI process can connect to TFE. * If ev_kni_conn is not NULL, there's already a KNI connected to TFE. * We need to refuse this connection @@ -195,6 +197,13 @@ void __kni_listener_accept_cb(struct evconnlistener * listener, evutil_socket_t goto __close_this_connection; } + ret = event_add(__event, NULL); + if (unlikely(ret < 0)) + { + TFE_LOG_ERROR(__ctx->logger, "Failed at adding event to evbase, close this connection. "); + goto __close_this_connection; + } + __ctx->fd_kni_conn = fd; __ctx->ev_kni_conn = __event; __ctx->pid_kni_conn = __cr.pid; @@ -208,7 +217,7 @@ __close_this_connection: void * __kni_listener_thread_entry(void * args) { - struct kni_acceptor_ctx * __ctx = (struct kni_acceptor_ctx *)args; + struct kni_acceptor * __ctx = (struct kni_acceptor *)args; assert(__ctx != NULL && __ctx->thread == pthread_self()); TFE_LOG_DEBUG(__ctx->logger, "Starting KNI listener thread..."); @@ -217,7 +226,7 @@ void * __kni_listener_thread_entry(void * args) return (void *)NULL; } -void kni_acceptor_deinit(struct kni_acceptor_ctx *ctx) +void kni_acceptor_deinit(struct kni_acceptor *ctx) { if (ctx != NULL && ctx->ev_listener != NULL) { @@ -242,12 +251,13 @@ void kni_acceptor_deinit(struct kni_acceptor_ctx *ctx) return; } -struct kni_acceptor_ctx * kni_acceptor_init(struct tfe_proxy *proxy, const char *profile, void *logger) +struct kni_acceptor * kni_acceptor_init(struct tfe_proxy *proxy, const char *profile, void *logger) { - struct kni_acceptor_ctx * __ctx = ALLOC(struct kni_acceptor_ctx, 1); + struct kni_acceptor * __ctx = ALLOC(struct kni_acceptor, 1); struct sockaddr_un __sockaddr_un; int ret = 0; + __ctx->proxy = proxy; __ctx->profile = profile; __ctx->logger = logger; @@ -290,9 +300,7 @@ struct kni_acceptor_ctx * kni_acceptor_init(struct tfe_proxy *proxy, const char goto __errout; } - TFE_LOG_INFO(__ctx->logger, "KNI UNIXDOMAIN FILE: %s", __ctx->str_unixdomain_file); - TFE_LOG_INFO(__ctx->logger, "KNI LISTENER FD: %d", __ctx->fd_unixdomain); - + TFE_LOG_INFO(__ctx->logger, "KNI acceptor unixdomain file: %s", __ctx->str_unixdomain_file); return __ctx; __errout: diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 2c90b4e..c0b2e18 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -28,47 +28,80 @@ #include #include #include +#include -static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGINT, SIGPIPE, SIGUSR1}; +static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGPIPE, SIGUSR1}; +/* Global Resource */ +void * g_default_logger = NULL; +struct tfe_proxy * g_default_proxy = NULL; +/* Per thread resource */ +thread_local unsigned int __currect_thread_id = 0; +thread_local void * __currect_default_logger = NULL; -const char * module_name_pxy = "TFE_PXY"; -extern struct tfe_instance * g_tfe_instance; +struct tfe_thread_ctx * tfe_proxy_thread_ctx_acquire(struct tfe_proxy * ctx) +{ + unsigned int min_thread_id = 0; + unsigned int min_load = 0; -__thread int __currect_thread_id; + for(unsigned int tid = 0; tid < ctx->nr_work_threads; tid++) + { + struct tfe_thread_ctx * thread_ctx = ctx->work_threads[tid]; + min_thread_id = min_load > thread_ctx->load ? tid : min_thread_id; + min_load = min_load > thread_ctx->load ? thread_ctx->load : min_load; + } + + ctx->work_threads[min_thread_id]->load++; + return ctx->work_threads[min_thread_id]; +} + +void tfe_proxy_thread_ctx_release(struct tfe_thread_ctx * thread_ctx) +{ + thread_ctx->load--; +} + +int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para) +{ + tfe_thread_ctx * worker_thread_ctx = tfe_proxy_thread_ctx_acquire(ctx); + + struct tfe_stream * stream = tfe_stream_create(ctx, worker_thread_ctx); + tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, ¶->session_type, sizeof(para->session_type)); + + /* FOR DEBUG */ + if (para->passthrough || ctx->tcp_all_passthrough) + { + bool __true = true; + enum tfe_session_proto __session_type = SESSION_PROTO_PLAIN; + + tfe_stream_option_set(stream, TFE_STREAM_OPT_PASSTHROUGH, &__true, sizeof(__true)); + tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &__session_type, sizeof(__session_type)); + } + + tfe_stream_init_by_fds(stream, para->downstream_fd, para->upstream_fd); + + TFE_LOG_DEBUG(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accepted", + stream, para->downstream_fd, para->upstream_fd, para->session_type); + + return 0; +} + +void tfe_proxy_loopbreak(tfe_proxy * ctx) +{ + event_base_loopbreak(ctx->evbase); +} + +void tfe_proxy_free(tfe_proxy * ctx) +{ + return; +} static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) { return; } -/* - * Thread entry point; runs the event loop of the event base. - * Does not exit until the libevent loop is broken explicitly. - */ -static void * __tfe_thrmgr_thread_entry(void * arg) -{ - struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg; - struct timeval timer_delay = {60, 0}; - struct event * ev; - ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL); - - if (!ev) return (void *)NULL; - - evtimer_add(ev, &timer_delay); - ctx->running = 1; - - __currect_thread_id = ctx->thread_id; - event_base_dispatch(ctx->evbase); - event_free(ev); - - return (void *)NULL; -} - - - -static void proxy_signal_cb(evutil_socket_t fd, short what, void * arg) +static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg) { tfe_proxy * ctx = (tfe_proxy *) arg; switch (fd) @@ -81,178 +114,124 @@ static void proxy_signal_cb(evutil_socket_t fd, short what, void * arg) case SIGUSR1: break; case SIGPIPE: - TFE_LOG_ERROR(ctx->main_logger, "Warning: Received SIGPIPE; ignoring.\n"); + TFE_LOG_ERROR(ctx->logger, "Warning: Received SIGPIPE; ignoring.\n"); break; default: - TFE_LOG_ERROR(ctx->main_logger, "Warning: Received unexpected signal %i\n", fd); + TFE_LOG_ERROR(ctx->logger, "Warning: Received unexpected signal %i\n", fd); break; } } -static void proxy_gc_cb(evutil_socket_t fd, short what, void * arg) +static void __gc_handler_cb(evutil_socket_t fd, short what, void * arg) { tfe_proxy * ctx = (tfe_proxy *) arg; (void)fd; (void)what; } -unsigned int select_work_thread(struct tfe_proxy * pxy) +static void * __thread_ctx_entry(void * arg) { - unsigned int min_thread_id = 0; - size_t min_load = pxy->work_threads[min_thread_id].load; + struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg; + struct timeval timer_delay = {60, 0}; - for (unsigned thread_id = 1; thread_id < pxy->nr_work_threads; thread_id++) + struct event * ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL); + if (unlikely(ev == NULL)) { - if (min_load > pxy->work_threads[thread_id].load) - { - min_load = pxy->work_threads[thread_id].load; - min_thread_id = thread_id; - } + TFE_LOG_ERROR(g_default_logger, "Failed at creating dummy event for thread %u", ctx->thread_id); + exit(EXIT_FAILURE); } - pxy->work_threads[min_thread_id].load++; - return min_thread_id; -} -/* - * Callback for accept events on the socket listener bufferevent. - */ + evtimer_add(ev, &timer_delay); + ctx->running = 1; + __currect_thread_id = ctx->thread_id; -int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para) -{ - unsigned int worker_tid = select_work_thread(ctx); - tfe_thread_ctx * worker_thread_ctx = &ctx->work_threads[worker_tid]; + TFE_LOG_INFO(g_default_logger, "Thread %u is running...", ctx->thread_id); + event_base_dispatch(ctx->evbase); + event_free(ev); - struct tfe_stream * stream = tfe_stream_create(ctx, worker_thread_ctx); - tfe_stream_init_by_fds(stream, para->session_type, para->downstream_fd, para->upstream_fd); - - return 0; + return (void *)NULL; } -/* - * Set up the core event loop. - * Socket clisock is the privsep client socket used for binding to ports. - * Returns ctx on success, or NULL on error. - */ -struct tfe_proxy * tfe_proxy_new(const char * profile) +struct tfe_thread_ctx * __thread_ctx_create(struct tfe_proxy * proxy, unsigned int thread_id) { - struct tfe_proxy * proxy = ALLOC(struct tfe_proxy, 1); - assert(proxy != NULL); + struct tfe_thread_ctx * __thread_ctx = ALLOC(struct tfe_thread_ctx, 1); + assert(__thread_ctx != NULL); - struct timeval gc_delay = {60, 0}; + __thread_ctx->thread_id = thread_id; + __thread_ctx->evbase = event_base_new(); - /* adds locking, only required if accessed from separate threads */ - evthread_use_pthreads(); - event_enable_debug_mode(); - - proxy->evbase = event_base_new(); - proxy->nr_modules = 2; - proxy->modules = ALLOC(struct tfe_plugin, proxy->nr_modules); - - proxy->modules[0].proto = APP_PROTO_HTTP1; - proxy->modules[1].proto = APP_PROTO_HTTP2; - - proxy->work_threads = ALLOC(struct tfe_thread_ctx, proxy->nr_work_threads); - proxy->io_mod = kni_acceptor_init(proxy, profile, NULL); - - for (unsigned int i = 0; i < proxy->nr_work_threads; i++) + int ret = pthread_create(&__thread_ctx->thr, NULL, __thread_ctx_entry, (void *)__thread_ctx); + if (unlikely(ret < 0)) { - proxy->work_threads[i].thread_id = i; - proxy->work_threads[i].evbase = event_base_new(); - proxy->work_threads[i].nr_modules = proxy->nr_modules; - proxy->work_threads[i].modules = proxy->modules; + TFE_LOG_ERROR(proxy->logger, "Failed at pthread_create() for thread %d: %s", strerror(errno)); + goto __errout; } - //Todo: Not handle signal if have mutliple proxy instance. - for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++) - { - proxy->sev[i] = evsignal_new(proxy->evbase, signals[i], proxy_signal_cb, proxy); - if (!proxy->sev[i]) goto error_out; - evsignal_add(proxy->sev[i], NULL); - } + return __thread_ctx; - proxy->gcev = event_new(proxy->evbase, -1, EV_PERSIST, proxy_gc_cb, proxy); - if (!proxy->gcev) goto error_out; - - evtimer_add(proxy->gcev, &gc_delay); - return proxy; - -error_out: - if (proxy->gcev) - { - event_free(proxy->gcev); - } - - for (size_t i = 0; i < (sizeof(proxy->sev) / sizeof(proxy->sev[0])); i++) - { - if (proxy->sev[i]) - { - event_free(proxy->sev[i]); - } - } - - for (typeof(proxy->nr_work_threads) i = 0; i < proxy->nr_work_threads; i++) - { - proxy->work_threads[i].thread_id = i; - event_base_free(proxy->work_threads[i].evbase); - } - - event_base_free(proxy->evbase); - - free(proxy); +__errout: + if (__thread_ctx != NULL && __thread_ctx->evbase != NULL) event_base_free(__thread_ctx->evbase); + if (__thread_ctx != NULL) free(__thread_ctx); return NULL; } -/* - * Run the event loop. Returns when the event loop is cancelled by a signal - * or on failure. - */ -void tfe_proxy_run(struct tfe_proxy * proxy) +int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile) { - unsigned int thread_id; - for (thread_id = 0; thread_id < proxy->nr_work_threads; thread_id++) - { - if (pthread_create(&(proxy->work_threads[thread_id].thr), NULL, - __tfe_thrmgr_thread_entry, &(proxy->work_threads[thread_id]))) - { - MESA_handle_runtime_log(proxy->main_logger, RLOG_LV_FATAL, proxy->name, "pthread_create failed."); - } - - while (!proxy->work_threads[thread_id].running) - { - sched_yield(); - } - } - - event_base_dispatch(proxy->evbase); + MESA_load_profile_uint_def(profile, "main", "nr_worker_threads", &proxy->nr_work_threads, 1); + MESA_load_profile_uint_def(profile, "debug", "passthrough_all_tcp", &proxy->tcp_all_passthrough, 0); + return 0; } -/* - * Break the loop of the proxy, causing the tfe_proxy_run to return. - */ -void proxy_loopbreak(tfe_proxy * ctx) -{ - event_base_loopbreak(ctx->evbase); -} -/* - * Free the proxy data structures. - */ -void proxy_free(tfe_proxy * ctx) -{ - -} +#define CHECK_OR_EXIT(condition, fmt, ...) \ +do { if(!(condition)) { TFE_LOG_ERROR(g_default_logger, fmt, ##__VA_ARGS__); exit(EXIT_FAILURE); } } while(0) \ int main(int argc, char *argv[]) { - const char* main_profile="./conf/tfe_main.conf"; + const char* main_profile="./conf/tfe.conf"; - tfe_proxy *proxy=NULL; - void* wcfg_handle=NULL; + g_default_logger = MESA_create_runtime_log_handle("log/tfe.log", RLOG_LV_DEBUG); + if (unlikely(g_default_logger == NULL)) + { + TFE_LOG_ERROR(g_default_logger, "Failed at creating default logger: %s", "log/tfe.log"); + exit(EXIT_FAILURE); + } - //TODO: Initiate Local Cert Cache, Decryption Mirror, Field Stat, Logger and etc. - //NOTICE: Maat, Cert Store,Tango Cache are initiated in bussiness plugin. + /* PROXY INSTANCE */ + g_default_proxy = ALLOC(struct tfe_proxy, 1); + assert(g_default_proxy); - proxy=tfe_proxy_new(main_profile); - tfe_proxy_run(proxy); - proxy_free(proxy); + /* CONFIG */ + int ret = tfe_proxy_config(g_default_proxy, main_profile); + CHECK_OR_EXIT(ret == 0, "Failed at loading profile %s, Exit.", main_profile); + + /* LOGGER */ + g_default_proxy->logger = g_default_logger; + + /* MAIN THREAD EVBASE */ + g_default_proxy->evbase = event_base_new(); + CHECK_OR_EXIT(g_default_proxy->evbase, "Failed at creating evbase for main thread. Exit."); + + /* GC EVENT */ + g_default_proxy->gcev = event_new(g_default_proxy->evbase, -1, EV_PERSIST, __gc_handler_cb, g_default_proxy); + CHECK_OR_EXIT(g_default_proxy->gcev, "Failed at creating GC event. Exit. "); + + /* MODULE INIT */ + g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger); + CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. "); + + struct timeval gc_delay = {60, 0}; + evtimer_add(g_default_proxy->gcev , &gc_delay); + + /* WORKER THREAD */ + for(unsigned tid = 0; tid < g_default_proxy->nr_work_threads; tid++) + { + g_default_proxy->work_threads[tid] = __thread_ctx_create(g_default_proxy, tid); + CHECK_OR_EXIT(g_default_proxy->work_threads[tid], "Failed at creating thread %u", tid); + } + + TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized. "); + event_base_dispatch(g_default_proxy->evbase); + + return 0; } diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 070d047..4f68eec 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -27,17 +27,16 @@ #include #include #include -#include #include #ifndef TFE_CONFIG_OUTPUT_LIMIT_DEFAULT -#define TFE_CONFIG_OUTPUT_LIMIT_DEFAULT (1024 * 1024) +#define TFE_CONFIG_OUTPUT_LIMIT_DEFAULT (1024 * 1024) #endif /* forward declaration of libevent callbacks */ -static void tfe_stream_readcb(struct bufferevent *, void *); -static void tfe_stream_writecb(struct bufferevent *, void *); -static void tfe_stream_eventcb(struct bufferevent *, short, void *); +static void __stream_bev_readcb(struct bufferevent *, void *); +static void __stream_bev_writecb(struct bufferevent *, void *); +static void __stream_bev_eventcb(struct bufferevent *, short, void *); static inline struct tfe_stream_private * __TO_STREAM_PRIVATE(const struct tfe_stream * stream) { @@ -46,17 +45,17 @@ static inline struct tfe_stream_private * __TO_STREAM_PRIVATE(const struct tfe_s static inline struct tfe_conn_private * __THIS_CONN(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) { - return ((dir == CONN_DIR_UPSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream)); + return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream)); } static inline struct tfe_conn_private * __PEER_CONN(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) { - return ((dir == CONN_DIR_UPSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream)); + return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream)); } -static inline enum tfe_conn_dir __DIR(struct tfe_stream_private * _stream, struct bufferevent * bev) +static inline enum tfe_conn_dir __BEV_DIR(struct tfe_stream_private * _stream, struct bufferevent * bev) { - return ((bev == _stream->conn_downstream->bev) ? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM); + return ((bev == _stream->conn_downstream->bev) ? CONN_DIR_DOWNSTREAM : CONN_DIR_UPSTREAM); } static inline bool __IS_SSL(struct tfe_stream_private * _stream) @@ -88,8 +87,6 @@ int tfe_stream_preempt(const struct tfe_stream * stream) return 0; } - - struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir) { struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream); @@ -131,15 +128,152 @@ int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, co return ret; } +static tfe_conn_private * __conn_private_create(struct tfe_stream_private * stream, struct bufferevent * bev) +{ + struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1); + __conn_private->bev = bev; + __conn_private->fd = bufferevent_getfd(bev); + + bufferevent_setcb(__conn_private->bev, __stream_bev_readcb, __stream_bev_writecb, __stream_bev_eventcb, stream); + bufferevent_enable(__conn_private->bev, EV_READ | EV_WRITE); + + return __conn_private; +} + +evutil_socket_t __conn_private_release_fd(struct tfe_conn_private * conn) +{ + evutil_socket_t __to_release_fd = conn->fd; + conn->fd = 0; + return __to_release_fd; +} + +static void __conn_private_destory(struct tfe_conn_private * conn) +{ + bufferevent_disable(conn->bev, EV_READ | EV_WRITE); + bufferevent_free(conn->bev); + + if (conn->fd > 0) evutil_closesocket(conn->fd); + free(conn); +} + +static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg) +{ + struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; + struct tfe_conn_private * this_conn = __THIS_CONN(_stream, __BEV_DIR(_stream, bev)); + struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, __BEV_DIR(_stream, bev)); + + struct evbuffer * __input_buffer = bufferevent_get_input(bev); + if (peer_conn == NULL) + { + evbuffer_drain(__input_buffer, evbuffer_get_length(__input_buffer)); + return; + } + + struct evbuffer * __output_buffer = bufferevent_get_output(peer_conn->bev); + evbuffer_add_buffer(__output_buffer, __input_buffer); +} + +static void __stream_bev_passthrough_writecb(struct bufferevent * bev, void * arg) +{ + struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; + struct tfe_conn_private ** ref_this_conn{}; + struct tfe_conn_private ** ref_peer_conn{}; + + if (__BEV_DIR(_stream, bev) == CONN_DIR_UPSTREAM) + { + ref_this_conn = &_stream->conn_upstream; + ref_peer_conn = &_stream->conn_downstream; + } + + if (__BEV_DIR(_stream, bev) == CONN_DIR_DOWNSTREAM) + { + ref_this_conn = &_stream->conn_downstream; + ref_peer_conn = &_stream->conn_upstream; + } + + struct evbuffer * __output_buffer = bufferevent_get_output(bev); + assert(__output_buffer != NULL); + + if (*ref_peer_conn == NULL && evbuffer_get_length(__output_buffer) == 0) + { + __conn_private_destory(*ref_this_conn); + *ref_this_conn = NULL; + } + + if (*ref_peer_conn == NULL && *ref_this_conn == NULL) + { + tfe_stream_destory(_stream); + } + + return; +} + +static void __stream_bev_passthrough_eventcb(struct bufferevent * bev, short events, void * arg) +{ + struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; + struct tfe_conn_private ** ref_this_conn{}; + struct tfe_conn_private ** ref_peer_conn{}; + + if (__BEV_DIR(_stream, bev) == CONN_DIR_UPSTREAM) + { + ref_this_conn = &_stream->conn_upstream; + ref_peer_conn = &_stream->conn_downstream; + } + + if (__BEV_DIR(_stream, bev) == CONN_DIR_DOWNSTREAM) + { + ref_this_conn = &_stream->conn_downstream; + ref_peer_conn = &_stream->conn_upstream; + } + + if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF) + { + if (evbuffer_get_length(bufferevent_get_input(bev))) + { + __stream_bev_passthrough_readcb(bev, arg); + } + + goto __close_connection; + } + + return; + +__close_connection: + if (*ref_peer_conn != NULL) + { + struct bufferevent * __peer_bev = (*ref_peer_conn)->bev; + struct evbuffer * __peer_output_buffer = bufferevent_get_output(__peer_bev); + + if (evbuffer_get_length(__peer_output_buffer) == 0) + { + __conn_private_destory(*ref_peer_conn); + *ref_peer_conn = NULL; + } + } + + if (*ref_this_conn != NULL) + { + __conn_private_destory(*ref_this_conn); + *ref_this_conn = NULL; + } + + if (*ref_this_conn == NULL && *ref_peer_conn == NULL) + { + tfe_stream_destory(_stream); + } + + return; +} + /* * Callback for read events on the up- and downstream connection bufferevents. * Called when there is data ready in the input evbuffer. */ -static void tfe_stream_readcb(struct bufferevent * bev, void * arg) +static void __stream_bev_readcb(struct bufferevent * bev, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; - enum tfe_conn_dir dir = __DIR(_stream, bev); + enum tfe_conn_dir dir = __BEV_DIR(_stream, bev); struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); @@ -235,10 +369,10 @@ static void tfe_stream_readcb(struct bufferevent * bev, void * arg) * Called when either all data from the output evbuffer has been written, * or if the outbuf is only half full again after having been full. */ -static void tfe_stream_writecb(struct bufferevent * bev, void * arg) +static void __stream_bev_writecb(struct bufferevent * bev, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; - enum tfe_conn_dir dir = __DIR(_stream, bev); + enum tfe_conn_dir dir = __BEV_DIR(_stream, bev); struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); @@ -257,10 +391,10 @@ static void tfe_stream_writecb(struct bufferevent * bev, void * arg) * Callback for meta events on the up- and downstream connection bufferevents. * Called when EOF has been reached, a connection has been made, and on errors. */ -static void tfe_stream_eventcb(struct bufferevent * bev, short events, void * arg) +static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; - enum tfe_conn_dir dir = __DIR(_stream, bev); + enum tfe_conn_dir dir = __BEV_DIR(_stream, bev); struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); @@ -279,9 +413,10 @@ static void tfe_stream_eventcb(struct bufferevent * bev, short events, void * ar if (events & BEV_EVENT_EOF) { //generate a 0 size read callback to notify plugins. - tfe_stream_readcb(bev, arg); + __stream_bev_readcb(bev, arg); this_conn->closed = 1; } + if (peer_conn->closed == 1 && this_conn->closed == 1) { reason = REASON_PASSIVE_CLOSED; @@ -315,7 +450,17 @@ static tfe_conn_private * __conn_private_create(struct tfe_stream_private * stre goto __errout; } - bufferevent_setcb(__conn_private->bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, stream); + if (stream->passthough) + { + bufferevent_setcb(__conn_private->bev, __stream_bev_passthrough_readcb, + __stream_bev_passthrough_writecb, __stream_bev_passthrough_eventcb, stream); + } + else + { + bufferevent_setcb(__conn_private->bev, __stream_bev_readcb, + __stream_bev_writecb, __stream_bev_eventcb, stream); + } + bufferevent_enable(__conn_private->bev, EV_READ | EV_WRITE); return __conn_private; @@ -324,28 +469,7 @@ __errout: return NULL; } -static tfe_conn_private * __conn_private_create(struct tfe_stream_private * stream, struct bufferevent * bev) -{ - struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1); - __conn_private->bev = bev; - __conn_private->fd = bufferevent_getfd(bev); - bufferevent_setcb(__conn_private->bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, stream); - bufferevent_enable(__conn_private->bev, EV_READ | EV_WRITE); - return __conn_private; -} - -evutil_socket_t __conn_private_release_fd(struct tfe_conn_private * conn) -{ - evutil_socket_t __to_release_fd = conn->fd; - conn->fd = 0; - return __to_release_fd; -} - -static void __conn_private_destory(struct tfe_conn_private * conn) -{ - return; -} void ssl_downstream_create_on_success(future_result_t * result, void * user) { @@ -458,13 +582,18 @@ void tfe_stream_destory(struct tfe_stream_private * stream) thread->load--; } -void tfe_stream_init_by_fds(struct tfe_stream * stream, enum tfe_session_proto session_type, - evutil_socket_t fd_downstream, evutil_socket_t fd_upstream) +void tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream) { struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head); struct event_base * ev_base = _stream->thread_ref->evbase; - if (session_type == SESSION_PROTO_PLAIN) + _stream->defer_fd_downstream = fd_downstream; + _stream->defer_fd_upstream = fd_upstream; + + evutil_make_socket_nonblocking(fd_downstream); + evutil_make_socket_nonblocking(fd_upstream); + + if (_stream->session_type == SESSION_PROTO_PLAIN) { _stream->conn_downstream = __conn_private_create(_stream, fd_downstream); _stream->conn_upstream = __conn_private_create(_stream, fd_upstream); @@ -473,7 +602,7 @@ void tfe_stream_init_by_fds(struct tfe_stream * stream, enum tfe_session_proto s assert(_stream->conn_upstream != NULL); } - if (session_type == SESSION_PROTO_SSL) + if (_stream->session_type == SESSION_PROTO_SSL) { _stream->ssl_mgr = _stream->proxy_ref->ssl_mgr_handler; @@ -483,10 +612,26 @@ void tfe_stream_init_by_fds(struct tfe_stream * stream, enum tfe_session_proto s /* Defer setup conn_downstream & conn_upstream in async callbacks. */ ssl_async_upstream_create(_stream->future_upstream_create, _stream->ssl_mgr, fd_upstream, fd_downstream, ev_base); - - _stream->defer_fd_downstream = fd_downstream; - _stream->defer_fd_upstream = fd_upstream; } - _stream->session_type = session_type; + return; +} + + +int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt, const void * arg, size_t sz_arg) +{ + struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head); + + if (opt == TFE_STREAM_OPT_SESSION_TYPE) + { + assert(sz_arg == sizeof(enum tfe_session_proto)); + _stream->session_type = *(enum tfe_session_proto *)arg; + } + else if (opt == TFE_STREAM_OPT_PASSTHROUGH) + { + assert(sz_arg == sizeof(bool)); + _stream->passthough = *(bool *)arg; + } + + return 0; }