diff --git a/common/include/tfe_future.h b/common/include/tfe_future.h index 80f2b27..c428f08 100644 --- a/common/include/tfe_future.h +++ b/common/include/tfe_future.h @@ -1,3 +1,4 @@ +#pragma once enum e_future_error { diff --git a/common/include/tfe_stream.h b/common/include/tfe_stream.h index 2669423..5970e45 100644 --- a/common/include/tfe_stream.h +++ b/common/include/tfe_stream.h @@ -44,7 +44,6 @@ struct tfe_conn struct tfe_stream { - struct tfe_conn upstream; struct tfe_conn downstream; }; diff --git a/platform/include/internal/kni.h b/platform/include/internal/kni.h index 6302989..56404ff 100644 --- a/platform/include/internal/kni.h +++ b/platform/include/internal/kni.h @@ -1,2 +1,7 @@ -void* kni_init(const char *unix_domain_path, struct event_base *attach); +#pragma once +struct tfe_proxy; +struct kni_acceptor_ctx; + +struct kni_acceptor_ctx * kni_acceptor_init(struct tfe_proxy *proxy, const char *profile, void *logger); +void kni_acceptor_deinit(struct kni_acceptor_ctx *ctx); diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h new file mode 100644 index 0000000..79cfacd --- /dev/null +++ b/platform/include/internal/proxy.h @@ -0,0 +1,19 @@ +#pragma once + +#include +#include + +struct tfe_proxy; +struct tfe_proxy_accept_para +{ + /* Both upstream and downstream FDs */ + evutil_socket_t upstream_fd; + evutil_socket_t downstream_fd; + + /* Session Type */ + enum tfe_session_proto session_type; +}; + +struct tfe_proxy * tfe_proxy_new(const char * profile); +int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para); +void tfe_proxy_run(struct tfe_proxy * proxy); diff --git a/platform/include/internal/ssl_stream.h b/platform/include/internal/ssl_stream.h index 733bef9..66fefdd 100644 --- a/platform/include/internal/ssl_stream.h +++ b/platform/include/internal/ssl_stream.h @@ -1,5 +1,7 @@ #pragma once -#include "tfe_future.h" + +#include +#include struct ssl_client_hello { @@ -7,11 +9,14 @@ struct ssl_client_hello char* sni; char* cipher_suites; }; + + struct ssl_client_hello* ssl_get_peek_result(future_result_t* result); void ssl_free_peek_result(struct ssl_client_hello* client_hello); void ssl_async_peek_client_hello(struct future* future, evutil_socket_t fd, struct event_base *evbase); -void ssl_async_connect_origin(struct future* future, const struct ssl_client_hello* client_hello, evutil_socket_t fd, const char* sni, struct event_base *evbase); +void ssl_async_connect_origin(struct future* future, const struct ssl_client_hello* client_hello, + evutil_socket_t fd, const char* sni, struct event_base *evbase); struct ssl_downstream * ssl_downstream_create(); void ssl_upstream_free(struct ssl_upstream * p); diff --git a/platform/include/internal/stream.h b/platform/include/internal/stream.h index 9ddb678..90110db 100644 --- a/platform/include/internal/stream.h +++ b/platform/include/internal/stream.h @@ -78,6 +78,8 @@ struct tfe_conn_private struct tfe_stream_private { struct tfe_stream head; + struct tfe_proxy *proxy; + enum tfe_session_proto session_type; struct tfe_conn_private conn_upstream; struct tfe_conn_private conn_downstream; @@ -96,11 +98,14 @@ struct tfe_stream_private uint8_t is_plugin_opened; int calling_idx; + size_t forward_bytes; size_t defere_bytes; size_t drop_bytes; + enum tfe_app_proto app_proto; int plugin_num; + struct plugin_ctx * plug_ctx; unsigned char passthrough; /* 1 if SSL passthrough is active */ diff --git a/platform/src/kni.cpp b/platform/src/kni.cpp index 570fb65..4a74a9f 100644 --- a/platform/src/kni.cpp +++ b/platform/src/kni.cpp @@ -12,6 +12,8 @@ #include #include #include +#include +#include #ifndef TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT #define TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT "/var/run/.tfe_kni_acceptor_handler" @@ -38,6 +40,7 @@ struct kni_tlv_info struct kni_acceptor_ctx { /* INPUT */ + struct tfe_proxy * proxy; const char * profile; void * logger; @@ -68,7 +71,8 @@ void __kni_event_cb(evutil_socket_t fd, short what, void * user) { struct kni_acceptor_ctx * __ctx = (struct kni_acceptor_ctx *)user; struct cmsghdr * __cmsghdr; - int * __fds; + struct tfe_proxy_accept_para __accept_para; + int * __fds = NULL; assert(__ctx != NULL && __ctx->thread == pthread_self()); assert(what & EV_READ); @@ -134,7 +138,15 @@ void __kni_event_cb(evutil_socket_t fd, short what, void * user) goto __close_kni_connection; } - /* Call Proxy's Callback */ + __accept_para.session_type = __session_proto; + __accept_para.downstream_fd = __fds[0]; + __accept_para.upstream_fd = __fds[1]; + + if (tfe_proxy_fds_accept(__ctx->proxy, &__accept_para) < 0) + { + goto __drop_recieved_fds; + } + return; __close_kni_connection: @@ -227,7 +239,7 @@ void kni_acceptor_deinit(struct kni_acceptor_ctx *ctx) return; } -struct kni_acceptor_ctx * kni_acceptor_init(const char *profile, void *logger) +struct kni_acceptor_ctx * kni_acceptor_init(struct tfe_proxy *proxy, const char *profile, void *logger) { struct kni_acceptor_ctx * __ctx = ALLOC(struct kni_acceptor_ctx, 1); struct sockaddr_un __sockaddr_un; diff --git a/platform/src/main.cpp b/platform/src/main.cpp index f92e192..dacdd6a 100644 --- a/platform/src/main.cpp +++ b/platform/src/main.cpp @@ -13,12 +13,3 @@ #include #include #include - -extern struct tfe_instance __g_tfe_instance; -extern struct tfe_config __g_tfe_config; - -struct tfe_instance* g_tfe_instance = &__g_tfe_instance; -struct tfe_config * g_tfe_cfg = &__g_tfe_config; - -const char* module_name="TFE"; - diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 10e2a3f..a79e449 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -1,3 +1,6 @@ +/* + * Proxy engine, built around libevent 2.x. + */ #include #include @@ -22,48 +25,9 @@ #include #include #include -#include +#include #include - -/* - * Proxy engine, built around libevent 2.x. - */ - -#define TFE_BACKLOG_DEFAULT 20 - -const char * module_name_pxy = "TFE_PXY"; -extern struct tfe_instance * g_tfe_instance; - -__thread int __currect_thread_id; - -static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) -{ - return; -} - -/* - * Thread entry point; runs the event loop of the event base. - * Does not exit until the libevent loop is broken explicitly. - */ -static void * __tfe_thrmgr_thread_entry(void * arg) -{ - struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg; - struct timeval timer_delay = {60, 0}; - - struct event * ev; - ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL); - - if (!ev) return (void *)NULL; - - evtimer_add(ev, &timer_delay); - ctx->running = 1; - - __currect_thread_id = ctx->thread_id; - event_base_dispatch(ctx->evbase); - event_free(ev); - - return (void *)NULL; -} +#include static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGINT, SIGPIPE, SIGUSR1}; @@ -89,9 +53,41 @@ struct tfe_proxy void * io_mod; }; +const char * module_name_pxy = "TFE_PXY"; +extern struct tfe_instance * g_tfe_instance; + +__thread int __currect_thread_id; + +static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) +{ + return; +} /* - * Signal handler for SIGTERM, SIGQUIT, SIGINT, SIGHUP, SIGPIPE and SIGUSR1. + * Thread entry point; runs the event loop of the event base. + * Does not exit until the libevent loop is broken explicitly. */ +static void * __tfe_thrmgr_thread_entry(void * arg) +{ + struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg; + struct timeval timer_delay = {60, 0}; + + struct event * ev; + ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL); + + if (!ev) return (void *)NULL; + + evtimer_add(ev, &timer_delay); + ctx->running = 1; + + __currect_thread_id = ctx->thread_id; + event_base_dispatch(ctx->evbase); + event_free(ev); + + return (void *)NULL; +} + + + static void proxy_signal_cb(evutil_socket_t fd, short what, void * arg) { tfe_proxy * ctx = (tfe_proxy *) arg; @@ -105,10 +101,10 @@ static void proxy_signal_cb(evutil_socket_t fd, short what, void * arg) case SIGUSR1: break; case SIGPIPE: - TFE_LOG_ERROR("Warning: Received SIGPIPE; ignoring.\n"); + TFE_LOG_ERROR(ctx->main_logger, "Warning: Received SIGPIPE; ignoring.\n"); break; default: - TFE_LOG_ERROR("Warning: Received unexpected signal %i\n", fd); + TFE_LOG_ERROR(ctx->main_logger, "Warning: Received unexpected signal %i\n", fd); break; } } @@ -140,19 +136,20 @@ unsigned int select_work_thread(struct tfe_proxy * pxy) /* * Callback for accept events on the socket listener bufferevent. */ -static void io_mod_accept_cb(evutil_socket_t upstream_fd, evutil_socket_t downstream_fd, - enum tfe_session_proto session_type, void * arg) + +int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para) { - struct tfe_proxy * pxy = (struct tfe_proxy *) arg; + unsigned int worker_tid = select_work_thread(ctx); + tfe_thread_ctx * worker_thread_ctx = &ctx->work_threads[worker_tid]; - unsigned int worker_tid = select_work_thread(pxy); - tfe_thread_ctx * worker_thread_ctx = &pxy->work_threads[worker_tid]; + struct tfe_stream_private * stream = tfe_stream_create(para->upstream_fd, + para->downstream_fd, para->session_type, worker_thread_ctx); - struct tfe_stream_private * stream = tfe_stream_create(upstream_fd, - downstream_fd, session_type, worker_thread_ctx); + if (stream == NULL) goto __errout; + tfe_stream_setup(stream); - assert(stream != NULL); - return tfe_stream_setup(stream); +__errout: + return -1; } /* @@ -160,7 +157,7 @@ static void io_mod_accept_cb(evutil_socket_t upstream_fd, evutil_socket_t downst * Socket clisock is the privsep client socket used for binding to ports. * Returns ctx on success, or NULL on error. */ -struct tfe_proxy * tfe_proxy_new(tfe_config * cfg) +struct tfe_proxy * tfe_proxy_new(const char * profile) { struct tfe_proxy * proxy = ALLOC(struct tfe_proxy, 1); assert(proxy != NULL); @@ -182,6 +179,7 @@ struct tfe_proxy * tfe_proxy_new(tfe_config * cfg) proxy->modules[1].proto = APP_PROTO_HTTP2; proxy->work_threads = ALLOC(struct tfe_thread_ctx, proxy->nr_work_threads); + proxy->io_mod = kni_acceptor_init(proxy, profile, NULL); for (unsigned int i = 0; i < proxy->nr_work_threads; i++) { @@ -202,8 +200,7 @@ struct tfe_proxy * tfe_proxy_new(tfe_config * cfg) } proxy->gcev = event_new(proxy->evbase, -1, EV_PERSIST, proxy_gc_cb, proxy); - if (!proxy->gcev) - goto error_out; + if (!proxy->gcev) goto error_out; evtimer_add(proxy->gcev, &gc_delay); return proxy; @@ -271,8 +268,8 @@ void proxy_loopbreak(tfe_proxy * ctx) */ void proxy_free(tfe_proxy * ctx) { -} +} int main(int argc, char *argv[]) { @@ -284,11 +281,7 @@ int main(int argc, char *argv[]) //TODO: Initiate Local Cert Cache, Decryption Mirror, Field Stat, Logger and etc. //NOTICE: Maat, Cert Store,Tango Cache are initiated in bussiness plugin. -#if 0 - g_tfe_instance=ALLOC(struct tfe_instance,1); - proxy=tfe_proxy_new(g_tfe_cfg); -#endif - + proxy=tfe_proxy_new(main_profile); tfe_proxy_run(proxy); proxy_free(proxy); } diff --git a/platform/src/tfe_stream.cpp b/platform/src/tfe_stream.cpp index 820ab3d..0eeb96e 100644 --- a/platform/src/tfe_stream.cpp +++ b/platform/src/tfe_stream.cpp @@ -23,6 +23,8 @@ #include #include #include + +#include #include #include @@ -481,29 +483,59 @@ void peek_client_hello_on_fail(enum e_future_error err, const char * what, void assert(0); } -/* - * Callback for accept events on the socket listener bufferevent. - * Called when a new incoming connection has been accepted. - * Initiates the connection to the server. The incoming connection - * from the client is not being activated until we have a successful - * connection to the server, because we need the server's certificate - * in order to set up the SSL session to the client. - * For consistency, plain TCP works the same way, even if we could - * start reading from the client while waiting on the connection to - * the server to connect. - */ +int ssl_stream_setup(struct tfe_stream_private * _stream) +{ + return 0; +} + +int __plain_stream_conn_private_init(struct tfe_stream_private * _stream, + struct tfe_conn_private * _conn, evutil_socket_t fd) +{ + struct tfe_proxy * proxy = _stream->proxy; + struct event_base * ev_base = _stream->thrmgr_ref->evbase; + + _conn->bev = bufferevent_socket_new(ev_base, fd, BEV_OPT_DEFER_CALLBACKS) + _conn->fd = fd; + _conn->closed = 0; + _conn->need_shutdown = 0; + _conn->on_writing = 0; +} + +int plain_stream_setup(struct tfe_stream_private * _stream) +{ + return 0; +} + void tfe_stream_setup(struct tfe_stream_private * _stream) { struct future * f_sni = NULL; tfe_thread_ctx * thread = _stream->thrmgr_ref; + + if (_stream->session_type == SESSION_PROTO_SSL) + { + _stream->ssl_downstream = ssl_downstream_create(); + _stream->async_future = future_create(peek_client_hello_on_succ, peek_client_hello_on_fail, _stream); + ssl_async_peek_client_hello(_stream->ssl_downstream->future_sni_peek, _stream->fd_downstream, + _stream->thrmgr_ref->evbase); + } + else if (_stream->session_type == SESSION_PROTO_PLAIN) + { + bufferevent_setcb(_stream->head.upstream.bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, _stream); + bufferevent_setcb(_stream->head.downstream.bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, _stream); + bufferevent_enable(_stream->head.upstream.bev, EV_READ | EV_WRITE); + bufferevent_enable(_stream->head.downstream.bev, EV_READ | EV_WRITE); + } + else + { + + } + + switch (_stream->session_type) { case SESSION_PROTO_SSL: // for SSL, defer dst connection setup to initial_readcb - _stream->ssl_downstream = ssl_downstream_create(); - _stream->async_future = future_create(peek_client_hello_on_succ, peek_client_hello_on_fail, _stream); - ssl_async_peek_client_hello(_stream->ssl_downstream->future_sni_peek, _stream->fd_downstream, - _stream->thrmgr_ref->evbase); + thread->stat.value[SSL_NUM]++; break; default: