diff --git a/interface/tfe_stream.h b/interface/tfe_stream.h index d9dd081..3f3dbfd 100644 --- a/interface/tfe_stream.h +++ b/interface/tfe_stream.h @@ -40,47 +40,68 @@ struct tfe_stream struct tfe_conn upstream; struct tfe_conn downstream; - void* application_pme; }; +enum tfe_stream_action +{ + ACTION_FORWARD_DATA, + ACTION_DEFER_DATA, + ACTION_DROP_DATA +}; +enum tfe_stream_action_opt +{ + ACTION_OPT_FOWARD_BYTES, //value is size_t, default: forward entire data + ACTION_OPT_DEFER_TIME_TV, //value is "struct timeval " which defines in , default: time defer is not enabled + ACTION_OPT_DEFER_BYTES, //value is size_t, default: defer entire data + ACTION_OPT_DROP_BYTES //value is size_t, default: drop entire data +}; +enum tfe_stream_close_reason +{ + REASON_PASSIVE_CLOSED, + REASON_ACTIVE_CLOSED, + REASON_ERROR +}; +int tfe_stream_action_set_opt(const struct tfe_stream* stream,enum tfe_stream_action_opt type, void* value, size_t size); +/* +@return 0 if successful, or -1 if an error occurred +*/ + +int tfe_stream_write(const struct tfe_stream* stream, enum tfe_conn_dir dir, const unsigned char *data, size_t len); + +struct tfe_stream_write_ctx{}; +//following tfe_stream_write_xx functions are NOT thread safe, MUST be called in the stream process thread. +struct tfe_stream_write_ctx* tfe_stream_write_frag_start(const struct tfe_stream* stream, enum tfe_conn_dir dir); +/* +@return 0 if successful, or -1 if an error occurred +*/ +int tfe_stream_write_frag(struct tfe_stream_write_ctx* w_ctx,const unsigned char *data, size_t size); +void tfe_stream_write_frag_end(struct tfe_stream_write_ctx* w_ctx); //Return 1 for identify as its traffic; //Return 0 for unknown traffic; -typedef int proto_pend_cb_t(const struct tfe_stream* stream, struct evbuffer *data, void **pme); +typedef tfe_stream_action stream_open_cb_t(const struct tfe_stream* stream, unsigned int thread_id, enum tfe_conn_dir dir, const unsigned char *data, size_t len, void **pme); +typedef tfe_stream_action stream_data_cb_t(const struct tfe_stream* stream, unsigned int thread_id, enum tfe_conn_dir dir, const unsigned char *data, size_t len, void **pme); +typedef void stream_close_cb_t(const struct tfe_stream* stream, unsigned int thread_id, enum tfe_stream_close_reason reason, void **pme); -enum tfe_proto_action -{ - PROTO_ATCION_FORWARD, - PROTO_ACTION_DEFER, - PROTO_ACTION_STEAL, - PROTO_ACTION_PASSTHROUGH, - PROTO_ACTION_CLOSE -}; -enum tfe_proto_action_opt -{ - ACTION_OPT_DEFER_TIME_TV, //value is "struct timeval " which defines in , default: time defer is not enabled - ACTION_OPT_DEFER_BYTES, //value is size_t, default: defer entire evbufer - ACTION_OPT_FOWARD_BYTES, //value is size_t, default: forward entire evbufer - ACTION_OPT_CLOSE_DIR //value is enum tfe_conn_dir, default: close both side. -}; -int tfe_proto_action_set_opt(const struct tfe_stream* stream,enum tfe_proto_action_opt type, void* value, size_t size); -typedef tfe_proto_action proto_read_cb_t(const struct tfe_stream* stream, struct evbuffer *data, enum tfe_conn_dir dir, void **pme); - -typedef void proto_close_cb_t(const struct tfe_stream* stream, int ev, void **pme); +void tfe_stream_detach(const struct tfe_stream* stream); +int tfe_stream_preempt(const struct tfe_stream* stream); +int stream_shutdown(const struct tfe_stream* stream);//close both sides of the stream. +int stream_shutdown_dir(const struct tfe_stream* stream, enum tfe_conn_dir dir); //typedef int proto_onwrite_cb_t(struct tfe_stream*, struct evbuffer *data, void **pme); -struct tfe_proto_module +struct tfe_plugin { char symbol[TFE_SYMBOL_MAX]; - proto_pend_cb_t *on_pend; - proto_read_cb_t *on_read; - proto_close_cb_t *on_close; + enum tfe_app_proto proto; + stream_open_cb_t* on_open; + stream_data_cb_t* on_data; + stream_close_cb_t* on_close; // proto_onwrite_cb_t *onwrite; }; int tfe_io_write(struct pxy_conn_desc* dest,int dir,struct evbuffer *data); -int tfe_xxx_proto_init(struct tfe_proto_module*m); +int tfe_xxx_proto_init(struct tfe_plugin*m); diff --git a/src/ssl_stream.cpp b/src/ssl_stream.cpp index 34a8544..685fc74 100644 --- a/src/ssl_stream.cpp +++ b/src/ssl_stream.cpp @@ -885,6 +885,8 @@ struct ssl_upstream* ssl_upstream_create(tfe_config*opts, const char* sni) void ssl_upstream_free(struct ssl_upstream* p) { X509_free(p->orig_cert); + //todo close ssl with a callback. + //SSL_free(ctx->ssl); } struct ssl_downstream* ssl_downstream_create(void) { diff --git a/src/tfe_proxy.cpp b/src/tfe_proxy.cpp index 2d43ad6..5f0d927 100644 --- a/src/tfe_proxy.cpp +++ b/src/tfe_proxy.cpp @@ -37,13 +37,6 @@ extern struct tfe_instance* g_tfe_instance; __thread int __currect_thread_id; -struct tfe_thread_manager_ctx -{ - tfe_config *opts; - unsigned int nr_thread; - - tfe_thread_ctx* thr_ctx; -}; void free_thread_manager(struct tfe_thread_manager_ctx* ctx) { @@ -123,6 +116,8 @@ struct tfe_proxy cert_mgr* cert_mgr; struct sess_cache* dsess_cache; struct sess_cache* ssess_cache; + int module_num; + struct tfe_plugin* modules; }; /* @@ -261,6 +256,13 @@ struct tfe_proxy * tfe_proxy_new(tfe_config * opts) proxy->dsess_cache=session_cache_init(); proxy->ssess_cache=session_cache_init(); + proxy->module_num=2; + proxy->modules=ALLOC(struct tfe_plugin,proxy->module_num); + proxy->modules[0].proto=APP_PROTO_HTTP1; + //todo: setup each protocol module. + //proxy->modules[0].on_read=xxx; + proxy->modules[1].proto=APP_PROTO_HTTP2; + proxy->work_threads=ALLOC(struct tfe_thread_ctx, proxy->thread_num); for(i=0;ithread_num;i++) { @@ -270,7 +272,8 @@ struct tfe_proxy * tfe_proxy_new(tfe_config * opts) proxy->work_threads[i].cert_mgr=proxy->cert_mgr;//reference proxy->work_threads[i].dsess_cache=proxy->dsess_cache; proxy->work_threads[i].ssess_cache=proxy->ssess_cache; - + proxy->work_threads[i].module_num=proxy->module_num; + proxy->work_threads[i].modules=proxy->modules } //Todo: Not handle signal if have mutliple proxy instance. for (i = 0; i < (sizeof(signals) / sizeof(int)); i++) diff --git a/src/tfe_stream.cpp b/src/tfe_stream.cpp index abf0723..bd8e41f 100644 --- a/src/tfe_stream.cpp +++ b/src/tfe_stream.cpp @@ -1,1165 +1,507 @@ - -#include "tfe_stream.h" -#include "tfe_util.h" -#include "opts.h" -#include "attrib.h" - - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#define STREAM_EVBASE(s) ((s)->thrmgr_ref->evbase) -/* - * Maximum size of data to buffer per connection direction before - * temporarily stopping to read data from the other end. - */ -#define OUTBUF_LIMIT (1024*1024) - -/* - * Print helper for logging code. - */ -#define STRORDASH(x) (((x)&&*(x))?(x):"-") - -/* - * Context used for all server http_sessions_. - */ -#ifdef USE_SSL_SESSION_ID_CONTEXT -static unsigned long ssl_session_context = 0x31415926; -#endif /* USE_SSL_SESSION_ID_CONTEXT */ - -#define WANT_CONNECT_LOG(ctx) ((ctx)->opts->connectlog||!(ctx)->opts->detach) -#define WANT_CONTENT_LOG(ctx) ((ctx)->opts->contentlog&&!(ctx)->passthrough) - -static pxy_conn_ctx_t * -pxy_conn_ctx_new(proxyspec * spec, tfe_config * opts, tfe_thread_manager_ctx * thrmgr, - evutil_socket_t fd_downstream, evutil_socket_t fd_upstream) -MALLOC NONNULL(1, 2, 3); - -static tfe_stream * -pxy_conn_ctx_new(proxyspec * spec, tfe_config * opts, tfe_thread_manager_ctx * thrmgr, - evutil_socket_t fd_downstream, evutil_socket_t fd_upstream) -{ - pxy_conn_ctx_t * ctx = (pxy_conn_ctx_t *) malloc(sizeof(pxy_conn_ctx_t)); - if (!ctx) return NULL; - - memset(ctx, 0, sizeof(pxy_conn_ctx_t)); - ctx->spec = spec; - ctx->opts = opts; - ctx->clienthello_search = spec->upgrade; - ctx->fd = fd_downstream; - ctx->peer_fd = fd_upstream; - - ctx->thridx = tfe_thread_manager_attach(thrmgr, &ctx->evbase, &ctx->dnsbase); - ctx->thrmgr = thrmgr; - - CLOG(DEBUG, "conntrace") << string_format("ctx = %p created.", ctx); - - TFE_STAT(TFE_STAT_CONN_CTX_NEW); - return ctx; -} - - -/* forward declaration of libevent callbacks */ -static void pxy_bev_readcb(struct bufferevent *, void *); -static void pxy_bev_writecb(struct bufferevent *, void *); -static void pxy_bev_eventcb(struct bufferevent *, short, void *); -static void stream_fd_readcb(evutil_socket_t, short, void *); - -/* - * Free bufferenvent and close underlying socket properly. - * For OpenSSL bufferevents, this will shutdown the SSL connection. - */ -static void -bufferevent_free_and_close_fd(struct bufferevent * bev, pxy_conn_ctx_t * ctx) -{ - evutil_socket_t fd = bufferevent_getfd(bev); - SSL * ssl = NULL; - - if ((ctx->spec->ssl || ctx->clienthello_found) && !ctx->passthrough) - { - ssl = bufferevent_openssl_get_ssl(bev); /* does not inc refc */ - } - - CLOG(DEBUG, "conntrace") << string_format("%p %p free_and_close_fd", ctx, bev); - bufferevent_free(bev); /* does not free SSL unless the option - BEV_OPT_CLOSE_ON_FREE was set */ - if (ssl) - { - pxy_ssl_shutdown(ctx->opts, ctx->evbase, ssl, fd); - } - else - { - evutil_closesocket(fd); - CLOG(DEBUG, "conntrace") << string_format("%p %p fd %d closed", ctx, bev, fd); - } - - return; -} - -/* - * Set up a bufferevent structure for either a dst or src connection, - * optionally with or without SSL. Sets all callbacks, enables read - * and write events, but does not call bufferevent_socket_connect(). - * - * For dst connections, pass -1 as fd. Pass a pointer to an initialized - * SSL struct as ssl if the connection should use SSL. - * - * Returns pointer to initialized bufferevent structure, as returned - * by bufferevent_socket_new() or bufferevent_openssl_socket_new(). - */ -static struct bufferevent * stream_bufferevent_new(enum tfe_conn_dir direction,enum tfe_session_proto session_type, void* session_ctx, evutil_socket_t fd, struct event_base* evbase, void* cb_para) -{ -// pxy_conn_ctx_t * ctx, evutil_socket_t fd, -// SSL * ssl, bool is_upstream = false - - struct bufferevent * bev = NULL; - struct tfe_stream_private* _stream, enum tfe_conn_dir direction; - /* 上行,SSL的状态全部为CONNECTING */ - switch(_stream->session_type) - { - case SESSION_PROTO_SSL: - SSL* ssl=(SSL*)session_ctx; - bev = bufferevent_openssl_socket_new(evbase, fd, ssl, ((direction == CONN_DIR_UPSTREAM) ? BUFFEREVENT_SSL_CONNECTING - : BUFFEREVENT_SSL_ACCEPTING), BEV_OPT_DEFER_CALLBACKS); - /* Prevent unclean (dirty) shutdowns to cause error - * events on the SSL socket bufferevent. */ - bufferevent_openssl_set_allow_dirty_shutdown(bev, 1); - break; - default: - bev = bufferevent_socket_new(evbase, fd, BEV_OPT_DEFER_CALLBACKS); - break; - } - bufferevent_setcb(bev, pxy_bev_readcb, pxy_bev_writecb, pxy_bev_eventcb, cb_para); - bufferevent_enable(bev, EV_READ | EV_WRITE); - return bev; -} - -static int pxy_ocsp_is_valid_uri(const char * uri, pxy_conn_ctx_t * ctx) -{ - char * buf_url; - size_t sz_url; - char * buf_b64; - size_t sz_b64; - unsigned char * buf_asn1; - size_t sz_asn1; - int ret; - - buf_url = (char *) strrchr(uri, '/'); - if (!buf_url) - return 0; - buf_url++; - - /* - * Do some quick checks to avoid unnecessary buffer allocations and - * decoding URL, Base64 and ASN.1: - * - OCSP requests begin with a SEQUENCE (0x30), so the first Base64 - * byte is 'M' or, unlikely but legal, the URL encoding thereof. - * - There should be no query string in OCSP GET requests. - * - Encoded OCSP request ASN.1 blobs are longer than 32 bytes. - */ - if (buf_url[0] != 'M' && buf_url[0] != '%') - return 0; - if (strchr(uri, '?')) - return 0; - sz_url = strlen(buf_url); - if (sz_url < 32) - return 0; - buf_b64 = url_dec(buf_url, sz_url, &sz_b64); - if (!buf_b64) - { - ctx->enomem = 1; - return 0; - } - buf_asn1 = base64_dec(buf_b64, sz_b64, &sz_asn1); - if (!buf_asn1) - { - ctx->enomem = 1; - free(buf_b64); - return 0; - } - ret = ssl_is_ocspreq(buf_asn1, sz_asn1); - free(buf_asn1); - free(buf_b64); - return ret; -} - -/* - * Called after a request header was completely read. - * If the request is an OCSP request, deny the request by sending an - * OCSP response of type tryLater and close the connection to the server. - * - * Reference: - * RFC 2560: X.509 Internet PKI Online Certificate Status Protocol (OCSP) - */ -static void pxy_ocsp_deny(pxy_conn_ctx_t * ctx) -{ - struct evbuffer * inbuf, * outbuf; - static const char ocspresp[] = - "HTTP/1.0 200 OK\r\n" - "Content-Type: application/ocsp-response\r\n" - "Content-Length: 5\r\n" - "Connection: close\r\n" - "\r\n" - "\x30\x03" /* OCSPResponse: SEQUENCE */ - "\x0a\x01" /* OCSPResponseStatus: ENUMERATED */ - "\x03"; /* tryLater (3) */ - - if (!ctx->http_method) - return; - if (!strncasecmp(ctx->http_method, "GET", 3) && - pxy_ocsp_is_valid_uri(ctx->http_uri, ctx)) - goto deny; - if (!strncasecmp(ctx->http_method, "POST", 4) && - ctx->http_content_type && - !strncasecmp(ctx->http_content_type, - "application/ocsp-request", 24)) - goto deny; - return; - -deny: - inbuf = bufferevent_get_input(ctx->src.bev); - outbuf = bufferevent_get_output(ctx->src.bev); - - if (evbuffer_get_length(inbuf) > 0) - { - evbuffer_drain(inbuf, evbuffer_get_length(inbuf)); - } - - bufferevent_free_and_close_fd(ctx->dst.bev, ctx); - ctx->dst.bev = NULL; - ctx->dst.closed = 1; - evbuffer_add_printf(outbuf, ocspresp); - ctx->ocsp_denied = 1; -} - -/* - * Peek into pending data to see if it is an SSL/TLS ClientHello, and if so, - * upgrade the connection from plain TCP to SSL/TLS. - * - * Return 1 if ClientHello was found and connection was upgraded to SSL/TLS, - * 0 otherwise. - * - * WARNING: This is experimental code and will need to be improved. - * - * TODO - enable search and skip bytes before ClientHello in case it does not - * start at offset 0 (i.e. chello > vec_out[0].iov_base) - * TODO - peek into more than just the current segment - * TODO - add retry mechanism for short truncated ClientHello, possibly generic - */ -int -pxy_conn_autossl_peek_and_upgrade(pxy_conn_ctx_t * ctx) -{ - struct evbuffer * inbuf; - struct evbuffer_iovec vec_out[1]; - const unsigned char * chello; - if (OPTS_DEBUG(ctx->opts)) - { - log_dbg_printf("Checking for a client hello\n"); - } - /* peek the buffer */ - inbuf = bufferevent_get_input(ctx->src.bev); - if (evbuffer_peek(inbuf, 1024, 0, vec_out, 1)) - { - if (ssl_tls_clienthello_parse((const unsigned char *) vec_out[0].iov_base, - vec_out[0].iov_len, 0, &chello, &ctx->sni) == 0) - { - if (OPTS_DEBUG(ctx->opts)) - { - log_dbg_printf("Peek found ClientHello\n"); - } - ctx->dst.ssl = upstream_ssl_create(ctx); - if (!ctx->dst.ssl) - { - log_err_printf("Error creating SSL for " - "upgrade\n"); - return 0; - } - ctx->dst.bev = bufferevent_openssl_filter_new( - ctx->evbase, ctx->dst.bev, ctx->dst.ssl, - BUFFEREVENT_SSL_CONNECTING, - BEV_OPT_DEFER_CALLBACKS); - if (!ctx->dst.bev) - { - return 0; - } - bufferevent_setcb(ctx->dst.bev, pxy_bev_readcb, - pxy_bev_writecb, pxy_bev_eventcb, - ctx); - bufferevent_enable(ctx->dst.bev, EV_READ | EV_WRITE); - if (OPTS_DEBUG(ctx->opts)) - { - log_err_printf("Replaced dst bufferevent, new " - "one is %p\n", - (void *) ctx->dst.bev); - } - ctx->clienthello_search = 0; - ctx->clienthello_found = 1; - return 1; - } - else - { - if (OPTS_DEBUG(ctx->opts)) - { - log_dbg_printf("Peek found no ClientHello\n"); - } - return 0; - } - } - return 0; -} - -static void -pxy_conn_terminate_free(pxy_conn_ctx_t * ctx, int is_requestor) -{ - log_err_printf("Terminating connection%s!\n", - ctx->enomem ? " (out of memory)" : ""); - - if (ctx->dst.bev && !ctx->dst.closed) - { - bufferevent_free_and_close_fd(ctx->dst.bev, ctx); - ctx->dst.bev = NULL; - } - - if (ctx->src.bev && !ctx->src.closed) - { - bufferevent_free_and_close_fd(ctx->src.bev, ctx); - ctx->src.bev = NULL; - } - - tfe_stream_free(ctx, is_requestor); -} - -extern int pxy_http_read_cb(pxy_conn_ctx_t * ctx, struct bufferevent * bev); - -void * __http1_connection_ctx_setup(pxy_conn_ctx_t * ctx) -{ - const auto & __source_address = ctx->spec->listen_addr; - const auto & __dest_address = ctx->spec->connect_addr; - - auto __http1_connection_ctx = std::make_unique( - ctx->src.bev, ctx->dst.bev, __source_address, __dest_address); - - auto __http_scan_ctx = std::make_shared(g_tfe_instance, g_tfe_config); - - __http_scan_ctx->handlerConnectionCreate(*__http1_connection_ctx); - __http_scan_ctx->handlerConnectionClose(*__http1_connection_ctx); - - return __http1_connection_ctx.release(); -} - -/* - * Callback for read events on the up- and downstream connection bufferevents. - * Called when there is data ready in the input evbuffer. - */ - -static void pxy_bev_readcb(struct bufferevent * bev, void * arg) -{ -#if 0 - TIMED_FUNC(timerObj); -#endif - - pxy_conn_ctx_t * ctx = (pxy_conn_ctx_t *) arg; - pxy_conn_desc_t * conn_this = (bev == ctx->src.bev) ? &ctx->src : &ctx->dst; - pxy_conn_desc_t * conn_other = (bev == ctx->src.bev) ? &ctx->dst : &ctx->src; - -#if 0 - CLOG(DEBUG, "conntrace") << "ctx = " << ctx << "direction = " << (bev == ctx->src.bev); - CLOG(DEBUG, "conntrace") << "inbuf length: " << evbuffer_get_length(bufferevent_get_input(bev)); - CLOG(DEBUG, "conntrace") << "outbuf length: " << evbuffer_get_length(bufferevent_get_output(conn_other->bev)); -#endif - - if (!ctx->connected) - { - log_err_printf("readcb called when other end not connected - aborting.\n"); - return; - } - - if (ctx->clienthello_search && pxy_conn_autossl_peek_and_upgrade(ctx)) - { - return; - } - - struct evbuffer * inbuf = bufferevent_get_input(bev); - struct evbuffer * outbuf = bufferevent_get_output(conn_other->bev); - - if (conn_other->closed) - { - log_dbg_printf("Warning: Drained %zu bytes (conn closed)\n", evbuffer_get_length(inbuf)); - evbuffer_drain(inbuf, evbuffer_get_length(inbuf)); - return; - } - - if (ctx->spec->http && ctx->protocol_conn_ctx == nullptr) - { - ctx->protocol_conn_ctx = __http1_connection_ctx_setup(ctx); - } - - auto * http1_connection = reinterpret_cast(ctx->protocol_conn_ctx); - - if (ctx->spec->http && (bev == ctx->src.bev) && !ctx->passthrough) - { - http1_connection->on_connection_read_request(ctx, conn_this, conn_other); - } - - if (ctx->spec->http && (bev == ctx->dst.bev) && !ctx->passthrough) - { - http1_connection->on_connection_read_response(ctx, conn_this, conn_other); - } - - if (ctx->spec->http && http1_connection->NeedToClose()) - { - pxy_conn_terminate_free(ctx, (bev == ctx->src.bev)); - return; - } - - /* out of memory condition? */ - if (ctx->enomem) - { - pxy_conn_terminate_free(ctx, (bev == ctx->src.bev)); - return; - } - - /* no data left after parsing headers? */ - if (evbuffer_get_length(inbuf) == 0) - return; - - if (ctx->passthrough) - { - evbuffer_add_buffer(outbuf, inbuf); - } - else if (evbuffer_get_length(inbuf) != 0) - { - bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); - } - - if (evbuffer_get_length(outbuf) >= OUTBUF_LIMIT) - { - CLOG(DEBUG, "conntrace") << string_format("ctx = %p, exceed output limit, disable read", ctx); - - /* temporarily disable data source; - * set an appropriate watermark. */ - bufferevent_setwatermark(conn_other->bev, EV_WRITE, OUTBUF_LIMIT / 2, OUTBUF_LIMIT); - bufferevent_disable(bev, EV_READ); - } - - return; -} - -/* - * Callback for write events on the up- and downstream connection bufferevents. - * Called when either all data from the output evbuffer has been written, - * or if the outbuf is only half full again after having been full. - */ -static void pxy_bev_writecb(struct bufferevent * bev, void * arg) -{ - pxy_conn_ctx_t * ctx = (pxy_conn_ctx_t *) arg; - pxy_conn_desc_t * other = (bev == ctx->src.bev) ? &ctx->dst : &ctx->src; - - struct evbuffer * outbuf = bufferevent_get_output(bev); - -#if 0 - CLOG(DEBUG, "conntrace") << "ctx = " << ctx << "direction = " << (bev == ctx->src.bev); - CLOG(DEBUG, "conntrace") << "length: " << evbuffer_get_length(outbuf); -#endif - - if (other->closed) - { - struct evbuffer * outbuf = bufferevent_get_output(bev); - if (evbuffer_get_length(outbuf) == 0) - { - /* finished writing and other end is closed; - * close this end too and clean up memory */ - bufferevent_free_and_close_fd(bev, ctx); - tfe_stream_free(ctx, (bev == ctx->dst.bev)); - } - return; - } - - if (other->bev && !(bufferevent_get_enabled(other->bev) & EV_READ)) - { - /* data source temporarily disabled; - * re-enable and reset watermark to 0. */ - bufferevent_setwatermark(bev, EV_WRITE, 0, 0); - bufferevent_enable(other->bev, EV_READ); - } -} - -/* - * Callback for meta events on the up- and downstream connection bufferevents. - * Called when EOF has been reached, a connection has been made, and on errors. - */ -static void pxy_bev_eventcb(struct bufferevent * bev, short events, void * arg) -{ - pxy_conn_ctx_t * ctx = (pxy_conn_ctx_t *) arg; - pxy_conn_desc_t * this_conn = (bev == ctx->src.bev) ? &ctx->src : &ctx->dst; - pxy_conn_desc_t * other_conn = (bev == ctx->src.bev) ? &ctx->dst : &ctx->src; - - int is_requestor = (bev == ctx->src.bev); - - CLOG(DEBUG, "conntrace") << string_format("%p %p eventcb %s %s%s%s%s", arg, (void *) bev, - (bev == ctx->src.bev) ? "src" : "dst", - events & BEV_EVENT_CONNECTED ? "connected" : "", - events & BEV_EVENT_ERROR ? "error" : "", - events & BEV_EVENT_TIMEOUT ? "timeout" : "", - events & BEV_EVENT_EOF ? "eof" : ""); - - if (events & BEV_EVENT_CONNECTED) - { - if (bev != ctx->dst.bev) - { - LOG(DEBUG) << string_format("ctx = %p, src buffer event connected: ignoring event", ctx); - goto connected; - } - - /* dst has connected */ - ctx->connected = 1; - - /* wrap client-side socket in an eventbuffer */ - if ((ctx->spec->ssl || ctx->clienthello_found) && - !ctx->passthrough) - { - ctx->src.ssl = downstream_ssl_create(ctx, this_conn->ssl); - if (!ctx->src.ssl) - { - bufferevent_free_and_close_fd(bev, ctx); - ctx->dst.bev = NULL; - ctx->dst.ssl = NULL; - if (ctx->opts->passthrough && !ctx->enomem) - { - ctx->passthrough = 1; - ctx->connected = 0; - log_dbg_printf("No cert found; " - "falling back " - "to passthrough\n"); - stream_fd_readcb(ctx->fd, 0, ctx); - return; - } - evutil_closesocket(ctx->fd); - tfe_stream_free(ctx, 1); - return; - } - } - - if (ctx->clienthello_found) - { - if (OPTS_DEBUG(ctx->opts)) - { - log_dbg_printf("Completing autossl upgrade\n"); - } - - ctx->src.bev = bufferevent_openssl_filter_new(ctx->evbase, ctx->src.bev, ctx->src.ssl, - BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS); - - if (ctx->src.bev) - { - bufferevent_setcb(ctx->src.bev, pxy_bev_readcb, pxy_bev_writecb, pxy_bev_eventcb, ctx); - bufferevent_enable(ctx->src.bev, EV_READ | EV_WRITE); - } - } - else - { - ctx->src.bev = stream_bufferevent_new(ctx, ctx->fd, ctx->src.ssl); - } - - if (!ctx->src.bev) - { - if (ctx->src.ssl) - { - SSL_free(ctx->src.ssl); - ctx->src.ssl = NULL; - } - bufferevent_free_and_close_fd(bev, ctx); - evutil_closesocket(ctx->fd); - tfe_stream_free(ctx, 1); - return; - } - - /* prepare logging, part 2 */ - if (WANT_CONNECT_LOG(ctx) || WANT_CONTENT_LOG(ctx)) - { - if (sys_sockaddr_str((struct sockaddr *) - &ctx->addr, ctx->addrlen, - &ctx->dsthost_str, - &ctx->dstport_str) != 0) - { - ctx->enomem = 1; - pxy_conn_terminate_free(ctx, 1); - return; - } - - } - -connected: - /* log connection if we don't analyze any headers */ - if ((!this_conn->ssl || (bev == ctx->src.bev)) && - (!ctx->spec->http || ctx->passthrough) && - WANT_CONNECT_LOG(ctx)) - { - pxy_log_connect_nonhttp(ctx); - } - - if (this_conn->ssl) - { - /* write SSL certificates to gendir */ - if ((bev == ctx->src.bev) && ctx->opts->certgendir) - { - pxy_srccert_write(ctx); - } - - /* log master key */ - if (ctx->opts->masterkeylog) - { - char * keystr; - keystr = ssl_ssl_masterkey_to_str(this_conn->ssl); - if ((keystr == NULL)) - { - if (errno == ENOMEM) ctx->enomem = 1; - pxy_conn_terminate_free(ctx, 1); - return; - } - } - } - - if (this_conn->ssl) - { - char * keystr; - /* for SSL, we get two connect events */ - CLOG(DEBUG, "conntrace") << string_format("SSL connected %s [%s]:%s %s %s", - bev == ctx->dst.bev ? "to" : "from", - bev == ctx->dst.bev ? ctx->dsthost_str : ctx->srchost_str, - bev == ctx->dst.bev ? ctx->dstport_str : ctx->srcport_str, - SSL_get_version(this_conn->ssl), SSL_get_cipher(this_conn->ssl), this_conn); - - keystr = ssl_ssl_masterkey_to_str(this_conn->ssl); - if (keystr) - { - log_dbg_print_free(keystr); - } - } - else - { - /* for TCP, we get only a dst connect event, - * since src was already connected from the - * beginning; mirror SSL debug output anyway - * in order not to confuse anyone who might be - * looking closely at the output */ - CLOG(DEBUG, "conntrace") << string_format("TCP connected to [%s]:%s", ctx->dsthost_str, ctx->dstport_str); - CLOG(DEBUG, "conntrace") << string_format("TCP connected from [%s]:%s", ctx->srchost_str, ctx->srcport_str); - } - - return; - } - - if (events & BEV_EVENT_ERROR) - { - unsigned long sslerr; - int have_sslerr = 0; - - /* Can happen for socket errs, ssl errs; - * may happen for unclean ssl socket shutdowns. */ - sslerr = bufferevent_get_openssl_error(bev); - if (sslerr) - have_sslerr = 1; - if (!errno && !sslerr) - { -#if LIBEVENT_VERSION_NUMBER >= 0x02010000 - /* We have disabled notification for unclean shutdowns - * so this should not happen; log a warning. */ - log_err_printf("Warning: Spurious error from " - "bufferevent (errno=0,sslerr=0)\n"); -#else /* LIBEVENT_VERSION_NUMBER < 0x02010000 */ - /* Older versions of libevent will report these. */ - if (OPTS_DEBUG(ctx->opts)) { - log_dbg_printf("Unclean SSL shutdown.\n"); - } -#endif /* LIBEVENT_VERSION_NUMBER < 0x02010000 */ - } - else if (ERR_GET_REASON(sslerr) == - SSL_R_SSLV3_ALERT_HANDSHAKE_FAILURE) - { - /* these can happen due to client cert auth, - * only log error if debugging is activated */ - log_dbg_printf("Error from %s bufferevent: " - "%i:%s %lu:%i:%s:%i:%s:%i:%s\n", - (bev == ctx->src.bev) ? "src" : "dst", - errno, - errno ? strerror(errno) : "-", - sslerr, - ERR_GET_REASON(sslerr), - sslerr ? - ERR_reason_error_string(sslerr) : "-", - ERR_GET_LIB(sslerr), - sslerr ? - ERR_lib_error_string(sslerr) : "-", - ERR_GET_FUNC(sslerr), - sslerr ? - ERR_func_error_string(sslerr) : "-"); - while ((sslerr = bufferevent_get_openssl_error(bev))) - { - log_dbg_printf("Additional SSL error: " - "%lu:%i:%s:%i:%s:%i:%s\n", - sslerr, - ERR_GET_REASON(sslerr), - ERR_reason_error_string(sslerr), - ERR_GET_LIB(sslerr), - ERR_lib_error_string(sslerr), - ERR_GET_FUNC(sslerr), - ERR_func_error_string(sslerr)); - } - } - else - { - /* real errors */ - log_err_printf("Error from %s bufferevent: " - "%i:%s %lu:%i:%s:%i:%s:%i:%s\n", - (bev == ctx->src.bev) ? "src" : "dst", - errno, - errno ? strerror(errno) : "-", - sslerr, - ERR_GET_REASON(sslerr), - sslerr ? - ERR_reason_error_string(sslerr) : "-", - ERR_GET_LIB(sslerr), - sslerr ? - ERR_lib_error_string(sslerr) : "-", - ERR_GET_FUNC(sslerr), - sslerr ? - ERR_func_error_string(sslerr) : "-"); - while ((sslerr = bufferevent_get_openssl_error(bev))) - { - log_err_printf("Additional SSL error: " - "%lu:%i:%s:%i:%s:%i:%s\n", - sslerr, - ERR_GET_REASON(sslerr), - ERR_reason_error_string(sslerr), - ERR_GET_LIB(sslerr), - ERR_lib_error_string(sslerr), - ERR_GET_FUNC(sslerr), - ERR_func_error_string(sslerr)); - } - } - - if (!ctx->connected) - { - /* the callout to the original destination failed, - * e.g. because it asked for client cert auth, so - * close the accepted socket and clean up */ - if (bev == ctx->dst.bev && ctx->dst.ssl && - ctx->opts->passthrough && have_sslerr) - { - /* ssl callout failed, fall back to plain - * TCP passthrough of SSL connection */ - bufferevent_free_and_close_fd(bev, ctx); - ctx->dst.bev = NULL; - ctx->dst.ssl = NULL; - ctx->passthrough = 1; - log_dbg_printf("SSL dst connection failed; fal" - "ling back to passthrough\n"); - stream_fd_readcb(ctx->fd, 0, ctx); - return; - } - evutil_closesocket(ctx->fd); - other_conn->closed = 1; - } - else if (!other_conn->closed) - { - /* if the other end is still open and doesn't have data - * to send, close it, otherwise its writecb will close - * it after writing what's left in the output buffer */ - struct evbuffer * outbuf; - outbuf = bufferevent_get_output(other_conn->bev); - if (evbuffer_get_length(outbuf) == 0) - { - bufferevent_free_and_close_fd(other_conn->bev, ctx); - other_conn->bev = NULL; - other_conn->closed = 1; - } - } - goto leave; - } - - if (events & BEV_EVENT_EOF) - { -#ifdef DEBUG_PROXY - if (OPTS_DEBUG(ctx->opts)) { - log_dbg_printf("evbuffer size at EOF: " - "i:%zu o:%zu i:%zu o:%zu\n", - evbuffer_get_length( - bufferevent_get_input(bev)), - evbuffer_get_length( - bufferevent_get_output(bev)), - evbuffer_get_length( - other->closed ? 0 : - bufferevent_get_input(other->bev)), - evbuffer_get_length( - other->closed ? 0 : - bufferevent_get_output(other->bev)) - ); - } -#endif /* DEBUG_PROXY */ - if (!ctx->connected) - { - log_dbg_printf("EOF on outbound connection before " - "connection establishment\n"); - evutil_closesocket(ctx->fd); - other_conn->closed = 1; - } - else if (!other_conn->closed) - { - /* if there is data pending in the closed connection, - * handle it here, otherwise it will be lost. */ - if (evbuffer_get_length(bufferevent_get_input(bev))) - { - pxy_bev_readcb(bev, ctx); - } - /* if the other end is still open and doesn't - * have data to send, close it, otherwise its - * writecb will close it after writing what's - * left in the output buffer. */ - if (evbuffer_get_length(bufferevent_get_output(other_conn->bev)) == 0) - { - bufferevent_free_and_close_fd(other_conn->bev, ctx); - other_conn->bev = NULL; - other_conn->closed = 1; - } - } - goto leave; - } - - log_err_printf("Unknown bufferevent 0x%02X\n", (int) events); - return; - -leave: - /* we only get a single disconnect event here for both connections */ - if (OPTS_DEBUG(ctx->opts)) - { - log_dbg_printf("%s disconnected to [%s]:%s\n", - this_conn->ssl ? "SSL" : "TCP", - ctx->dsthost_str, ctx->dstport_str); - log_dbg_printf("%s disconnected from [%s]:%s\n", - this_conn->ssl ? "SSL" : "TCP", - ctx->srchost_str, ctx->srcport_str); - } - - this_conn->closed = 1; - bufferevent_free_and_close_fd(bev, ctx); - this_conn->bev = NULL; - if (other_conn->closed) - { - tfe_stream_free(ctx, is_requestor); - } -} - -/* - * Complete the connection. This gets called after finding out where to - * connect to. - */ -static void tfe_stream_connect(struct tfe_stream_private* _stream, enum tfe_conn_dir direction) -{ - struct tfe_stream* conn_pub=&(_stream->head); - struct event_base* evbase=STREAM_EVBASE(_stream); - /* create server-side socket and eventbuffer */ - switch(_stream->session_type) - { - case SESSION_PROTO_SSL: - if(!_stream->passthrough) - { - break; - } - _stream->ssl_upstream=ssl_upstream_create(_stream->thrmgr_ref->opts, _stream->ssl_downstream->sni); - _stream->head.upstream.bev = stream_bufferevent_new(SESSION_PROTO_SSL,CONN_DIR_UPSTREAM, _stream->ssl_upstream->ssl, _stream->fd_upstream, evbase, _stream); - break; - default: - stream_bufferevent_new(SESSION_PROTO_PLAIN, NULL, _stream->fd_upstream, evbase, _stream); - break; - } - - pxy_conn_ctx_t * ctx; - - /* 对于明文上游连接,直接出发CONNECT事件 - * 密文的上游连接,需要等SSL连接建立后,由libevent触发 - */ - if (ctx->peer_fd == 0) - { - bufferevent_socket_connect(ctx->dst.bev, (struct sockaddr *) &ctx->addr, ctx->addrlen); - } - if (ctx->peer_fd > 0 && ctx->spec->ssl == 0) - { - bufferevent_trigger_event(ctx->dst.bev, BEV_EVENT_CONNECTED, 0); - } - - return; - -__close_connection_with_ssl: - if (ctx->dst.ssl != nullptr) - { - SSL_free(ctx->dst.ssl); - ctx->dst.ssl = nullptr; - } - -__close_connection: - if (ctx->fd > 0) evutil_closesocket(ctx->fd); - if (ctx->peer_fd > 0) evutil_closesocket(ctx->peer_fd); - tfe_stream_free(ctx, 1); - - CLOG(DEBUG, "conntrace") << string_format("ctx = %p, fd = %d, peer fd = %d closed.", ctx, ctx->fd, ctx->peer_fd); - return; -} - -/* - * The SNI hostname has been resolved. Fill the first resolved address into - * the context and continue connecting. - */ -static void pxy_sni_resolve_cb(int errcode, struct evutil_addrinfo * ai, void * arg) -{ - pxy_conn_ctx_t * ctx = (pxy_conn_ctx_t *) arg; - - if (errcode) - { - log_err_printf("Cannot resolve SNI hostname '%s': %s\n", - ctx->sni, evutil_gai_strerror(errcode)); - evutil_closesocket(ctx->fd); - tfe_stream_free(ctx, 1); - return; - } - - memcpy(&ctx->addr, ai->ai_addr, ai->ai_addrlen); - ctx->addrlen = ai->ai_addrlen; - evutil_freeaddrinfo(ai); - tfe_stream_connect(ctx); -} - -/* - * The src fd is readable. This is used to sneak-preview the SNI on SSL - * connections. If ctx->ev is NULL, it was called manually for a non-SSL - * connection. If ctx->passthrough is set, it was called a second time - * after the first ssl callout failed because of client cert auth. - */ -static void stream_fd_readcb(evutil_socket_t fd, short what, void * arg) -{ - struct tfe_stream_private* stream = (struct tfe_stream_private*)arg; - unsigned char buf[1024]; - ssize_t n=0; - const unsigned char * chello=NULL; - int rv=0; - struct tfe_stats *p_stat=&(stream->thrmgr_ref->stat); - struct ssl_downstream* ssl_down=stream->ssl_downstream; - struct tfe_stream* pub=&(stream->head); - - /* for SSL, peek ClientHello and parse SNI from it */ - if (stream->session_type==SESSION_PROTO_SSL && !stream->passthrough) /*&& ctx->ev*/ - { - n = recv(fd, buf, sizeof(buf), MSG_PEEK); - if (n == -1) - { - log_err_printf("Error peeking on fd, aborting connection\n"); - goto __close_connection_fd; - } - - if (n == 0) - { - return; - } - - rv = ssl_tls_clienthello_parse(buf, n, 0, &chello, &ssl_down->sni); - if ((rv == 1) && !chello) - { - log_err_printf("Peeking did not yield a (truncated) ClientHello message, aborting connection\n"); - goto __close_connection_fd; - } - - if ((rv == 1) && chello && (ssl_down->sni_peek_retries++ < 50)) - { - /* ssl_tls_clienthello_parse indicates that we - * should retry later when we have more data, and we - * haven't reached the maximum retry count yet. - * Reschedule this event as timeout-only event in - * order to prevent busy looping over the read event. - * Because we only peeked at the pending bytes and - * never actually read them, fd is still ready for - * reading now. We use 25 * 0.2 s = 5 s timeout. */ - struct timeval retry_delay = {0, 100}; - - event_free(ssl_down->ev); - ssl_down->ev = event_new(stream->thrmgr_ref->evbase, fd, 0, stream_fd_readcb, stream); - if (!ssl_down->ev) - { - log_err_printf("Error creating retry event, aborting connection\n"); - goto __close_connection_fd; - } - - event_add(ssl_down->ev, &retry_delay); - return; - } - event_free(ssl_down->ev); - ssl_down->ev = NULL; - } - tfe_stream_connect(stream); - return; - -__close_connection_fd: - p_stat->value[SNI_PEAK_FAIL]++; - if (stream->fd_downstream > 0) - { - evutil_closesocket(stream->fd_downstream); - } - - if (stream->fd_upstream > 0) - { - evutil_closesocket(private->fd_upstream); - } - - tfe_stream_free(stream, 1); - return; -} - -static void tfe_stream_free(struct tfe_stream_private* stream) -{ - pxy_conn_ctx_t * ctx; - int by_requestor; - struct tfe_thread_ctx* thread=stream->thrmgr_ref; - thread->load--; - switch (stream->session_type) - { - case SESSION_PROTO_SSL: - ssl_upstream_free(stream->ssl_upstream); - ssl_downstream_free(stream->ssl_downstream); - thread->stat.value[SSL_NUM]--; - break; - default: - break; - } - free(stream); - thread->stat.value[STREAM_NUM]--; - return; -} -void ssl_get_cert_on_succ(void * result, void * user) -{ - cert_t* cert=(cert_t*)result; - struct tfe_stream_private* _stream=(struct tfe_stream_private*)user; - _stream->ssl_downstream->ssl=downstream_ssl_create(_stream, ); - cert_free(cert); - - - future_destroy(_stream->ssl_downstream->future_get_cert); - _stream->ssl_downstream->future_get_cert=NULL; - return; -} -void ssl_get_cert_on_fail(enum e_future_error err, const char * what, void * user) -{ - //todo - assert(0); -} - -void ssl_conn_origin_on_succ(void * result, void * user) -{ - struct bufferevent *bev=(struct bufferevent *)result; - struct tfe_stream_private* _stream=(struct tfe_stream_private*)user; - - _stream->head.upstream.bev=bev; - _stream->ssl_upstream->ssl=bufferevent_openssl_get_ssl(bev); /* does not inc refc */ - _stream->ssl_upstream->orig_cert=SSL_get_peer_certificate(_stream->ssl_upstream->ssl); - - session_cache_set(_stream->thrmgr_ref->dsess_cache, struct sockaddr * addr, addrlen,_stream->ssl_downstream->sni, SSL_get0_session(_stream->ssl_upstream->ssl)); - - _stream->ssl_downstream->future_get_cert=future_create(ssl_get_cert_on_succ, ssl_get_cert_on_fail, _stream); - cert_mgr_async_get(_stream->thrmgr_ref->cert_mgr, _stream->ssl_downstream->sni, _stream->ssl_downstream->keyring_id,_stream->ssl_upstream->orig_cert); - - future_destroy(_stream->ssl_upstream->conn_ssl_srv); - _stream->ssl_upstream->conn_ssl_srv=NULL; - return; -} -void ssl_conn_origin_on_fail(enum e_future_error err, const char * what, void * user) -{ - //todo - assert(0); -} - - -void peek_sni_on_succ(void* result, void* user) -{ - struct tfe_stream_private* _stream=(struct tfe_stream_private*)user; - assert(_stream->session_type==SESSION_PROTO_SSL); - _stream->ssl_downstream->sni=tfe_strdup((const char *)result); - - future_destroy(ssl_downstream->future_sni_peek); - ssl_downstream->future_sni_peek=NULL; - - _stream->ssl_upstream=ALLOC(struct ssl_upstream,1); - _stream->ssl_upstream->conn_ssl_srv=future_create(ssl_conn_origin_on_succ, ssl_conn_origin_on_fail, _stream); - ssl_async_connect_origin(_stream->ssl_upstream->conn_ssl_srv, _stream->fd_upstream, _stream->ssl_downstream->sni, _stream->thrmgr_ref->evbase, _stream->thrmgr_ref->opts); - -} -void peek_sni_on_fail(enum e_future_error err, const char * what, void * user) -{ - //todo - assert(0); -} - - -/* - * Callback for accept events on the socket listener bufferevent. - * Called when a new incoming connection has been accepted. - * Initiates the connection to the server. The incoming connection - * from the client is not being activated until we have a successful - * connection to the server, because we need the server's certificate - * in order to set up the SSL session to the client. - * For consistency, plain TCP works the same way, even if we could - * start reading from the client while waiting on the connection to - * the server to connect. - */ -void tfe_stream_setup(struct tfe_stream_private* _stream) -{ - struct future* f_sni=NULL; - tfe_thread_ctx* thread=_stream->thrmgr_ref; - switch (_stream->session_type) - { - case SESSION_PROTO_SSL: - // for SSL, defer dst connection setup to initial_readcb - _stream->ssl_downstream=ssl_downstream_create(); - _stream->async_future=future_create(peek_sni_on_succ, peek_sni_on_fail, _stream); - ssl_async_peek_sni(_stream->ssl_downstream->future_sni_peek, _stream->fd_downstream, _stream->thrmgr_ref->evbase); - thread->stat.value[SSL_NUM]++; - break - default: - stream_fd_readcb(_stream->fd_downstream, 0, _stream); - break; - } - return; -} - -struct tfe_stream_private* tfe_stream_create(evutil_socket_t fd_downstream, evutil_socket_t fd_upstream, - struct sockaddr * peeraddr, int peeraddrlen, - enum tfe_session_proto session_type, tfe_thread_ctx* thread) -{ - struct tfe_stream_private* conn_private=NULL; - struct tfe_stream* conn_public=NULL; - conn_private= ALLOC(struct tfe_stream_private, 1); - conn_private->session_type=session_type; - conn_private->fd_downstream=fd_downstream; - conn_private->fd_upstream=fd_upstream; - conn_private->thrmgr_ref=thread; - conn_public=&(conn_private->head); - addr_sock2layer(conn_public->downstream.addr,peeraddr,peeraddrlen); - thread->stat.value[STREAM_NUM]++; - return; -} - -/* vim: set noet ft=c: */ + +#include "tfe_stream.h" +#include "tfe_util.h" +#include "opts.h" +#include "attrib.h" + + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#define STREAM_EVBASE(s) ((s)->thrmgr_ref->evbase) +/* + * Maximum size of data to buffer per connection direction before + * temporarily stopping to read data from the other end. + */ +#define OUTBUF_LIMIT (1024*1024) + +/* + * Print helper for logging code. + */ +#define STRORDASH(x) (((x)&&*(x))?(x):"-") + +/* + * Context used for all server http_sessions_. + */ +#ifdef USE_SSL_SESSION_ID_CONTEXT +static unsigned long ssl_session_context = 0x31415926; +#endif /* USE_SSL_SESSION_ID_CONTEXT */ + + +/* forward declaration of libevent callbacks */ +static void tfe_stream_readcb(struct bufferevent *, void *); +static void tfe_stream_writecb(struct bufferevent *, void *); +static void tfe_stream_eventcb(struct bufferevent *, short, void *); +static void stream_fd_readcb(evutil_socket_t, short, void *); + + +void tfe_stream_detach(const struct tfe_stream* stream) +{ + struct tfe_stream_private* _stream=(struct tfe_stream_private*)stream; + int plug_id=_stream->calling_idx; + _stream->plug_ctx[plug_id].state=PLUG_STATE_DETACHED; + return; +} +int tfe_stream_preempt(const struct tfe_stream* stream) +{ + struct tfe_stream_private* _stream=(struct tfe_stream_private*)stream; + int plug_id=_stream->calling_idx; + int i=0; + for(i=0;i<_stream->plugin_num;i++) + { + if(_stream->plug_ctx[i].state==PLUG_STATE_PREEPTION) + { + return -1; + } + } + _stream->plug_ctx[plug_id].state=PLUG_STATE_PREEPTION; + return 0; +} + +inline struct tfe_conn_private* __this_conn(struct tfe_stream_private* _stream, enum tfe_conn_dir dir) +{ + struct tfe_conn_private* this_conn=NULL; + this_conn=((dir==CONN_DIR_UPSTREAM)? &(_stream->conn_downstream):&(_stream->conn_upstream)); + return this_conn; +} +inline struct tfe_conn_private* __peer_conn(struct tfe_stream_private* _stream, enum tfe_conn_dir dir) +{ + struct tfe_conn_private* peer_conn=NULL; + peer_conn=(dir==CONN_DIR_UPSTREAM)? &(_stream->conn_downstream):&(_stream->conn_upstream); + return peer_conn; +} +struct tfe_stream_write_ctx* tfe_stream_write_frag_start(const struct tfe_stream* stream, enum tfe_conn_dir dir) +{ + struct tfe_stream_private* _stream=(struct tfe_stream_private*)stream; + struct tfe_conn_private* this_conn=__this_conn(_stream, dir); + struct tfe_conn_private* peer_conn=__peer_conn(_stream, dir); + if(this_conn->on_writing==1) + { + return NULL; + } + this_conn->w_ctx.dir=dir; + this_conn->w_ctx._stream=_stream; + this_conn->on_writing=1; + bufferevent_disable(peer_conn->bev, EV_READ); + return &(this_conn->w_ctx); +} + +int tfe_stream_write_frag(struct tfe_stream_write_ctx* w_ctx,const unsigned char *data, size_t size) +{ + struct tfe_conn_private* this_conn=__this_conn(w_ctx->_stream, w_ctx->dir);; + int ret=bufferevent_write(this_conn->bev, data, size); + return ret; +} +void tfe_stream_write_frag_end(struct tfe_stream_write_ctx* w_ctx) +{ + struct tfe_conn_private* this_conn=__this_conn(w_ctx->_stream, w_ctx->dir); + struct tfe_conn_private* peer_conn=__peer_conn(w_ctx->_stream, w_ctx->dir); + this_conn->on_writing=0; + bufferevent_enable(peer_conn->bev, EV_READ); + return; +} + +int tfe_stream_write(const struct tfe_stream* stream, enum tfe_conn_dir dir, const unsigned char *data, size_t size) +{ + int ret=0; + struct tfe_stream_write_ctx* wctx=tfe_stream_write_frag_start( stream, dir); + ret=tfe_stream_write_frag(wctx, data, size); + tfe_stream_write_frag_end(wctx); + return ret; +} +/* + + #define ON_OPEN_CALL 0 + #define ON_DATA_CALL 1 + #define ON_CLOSE_CALL 2 +enum tfe_stream_action tfe_stream_call_plugin(struct tfe_stream_private* _stream, enum tfe_conn_dir dir, int what, struct evbuffer * inbuf) +{ + size_t contigous_len=evbuffer_get_length(inbuf),drain_size=0; + const char* contiguous_data=evbuffer_pullup(inbuf,contigous_len); + int i=0,ret=0; + int plug_num=_stream->thrmgr_ref->module_num; + + const struct tfe_plugin* plugins=_stream->thrmgr_ref->modules; + struct plugin_ctx* plug_ctx=NULL; + enum tfe_stream_action action_tmp=ACTION_FORWARD_DATA, action_final=ACTION_FORWARD_DATA; + + _stream->defere_bytes=0; + _stream->drop_bytes=0; + _stream->forward_bytes=0; + switch(what) + { + case ON_OPEN_CALL: + for(i=0;ithrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme)); + if(plug_ctx->state=PLUG_STATE_PREEPTION) + { + action_final=action_tmp; + } + } + + break; + case ON_DATA_CALL: + for(i=0;ithrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme)); + if(plug_ctx->state=PLUG_STATE_PREEPTION) + { + action_final=action_tmp; + } + } + case ON_CLOSE_CALL: + for(i=0;ithrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme)); + if(plug_ctx->state=PLUG_STATE_PREEPTION) + { + action_final=action_tmp; + } + } + } + for(i=0;icalling_idx=i; + switch(what) + { + } + plug_ctx=_stream->plug_ctx+i; + if(_stream->is_fisrt_read==1) + { + action_tmp=plugins[i].on_open(&_stream.head, _stream->thrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme)); + _stream->is_fisrt_read=0; + } + else + { + action_tmp=plugins[i].on_data(&_stream.head, _stream->thrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme)); + } + if(plug_ctx->state=PLUG_STATE_PREEPTION) + { + action_final=action_tmp; + } + } + +} +*/ +/* + * Callback for read events on the up- and downstream connection bufferevents. + * Called when there is data ready in the input evbuffer. + */ + +static void tfe_stream_readcb(struct bufferevent * bev, void * arg) +{ + struct tfe_stream_private* _stream=(struct tfe_stream_private*)arg; + enum tfe_conn_dir dir=(bev == _stream->conn_downstream.bev)? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM; + struct tfe_conn_private* this_conn=__this_conn(_stream, dir); + struct tfe_conn_private* peer_conn= __peer_conn(_stream, dir); + + int i=0,ret=0; + enum tfe_stream_action action_tmp=ACTION_FORWARD_DATA, action_final=ACTION_FORWARD_DATA; + + + + const struct tfe_plugin* plugins=_stream->thrmgr_ref->modules; + struct plugin_ctx* plug_ctx=NULL; + int plug_num=_stream->thrmgr_ref->module_num; + + struct evbuffer * inbuf = bufferevent_get_input(bev); + struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev); + size_t contigous_len=evbuffer_get_length(inbuf),drain_size=0; + const char* contiguous_data=evbuffer_pullup(inbuf,contigous_len); + _stream->defere_bytes=0; + _stream->drop_bytes=0; + _stream->forward_bytes=0; + for(i=0;icalling_idx=i; + plug_ctx=_stream->plug_ctx+i; + if(_stream->is_fisrt_read==1) + { + action_tmp=plugins[i].on_open(&_stream.head, _stream->thrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme)); + _stream->is_fisrt_read=0; + } + else + { + action_tmp=plugins[i].on_data(&_stream.head, _stream->thrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme)); + } + if(plug_ctx->state=PLUG_STATE_PREEPTION) + { + action_final=action_tmp; + } + } + switch (action_final) + { + case ACTION_FORWARD_DATA: + if(_stream->forward_bytes>0) + { + evbuffer_remove_buffer(inbuf, outbuf, _stream->forward_bytes); + } + else + { + evbuffer_add_buffer(outbuf, inbuf); + } + break; + case ACTION_DROP_DATA: + if(_stream->drop_bytes>0) + { + drain_size=_stream->drop_bytes; + } + else + { + drain_size= evbuffer_get_length(inbuf); + } + evbuffer_drain(inbuf,drain_size); + case ACTION_DEFER_DATA: + if(_stream->defere_bytes>0) + { + bufferevent_setwatermark(bev, EV_WRITE, _stream->defere_bytes, 0); + } + break; + default: + assert(0); + break; + } + if(evbuffer_get_length(inbuf) != 0) + { + bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); + } + + if (evbuffer_get_length(outbuf) >= OUTBUF_LIMIT) + { + /* temporarily disable data source; + * set an appropriate watermark. */ + bufferevent_setwatermark(peer_conn->bev, EV_WRITE, OUTBUF_LIMIT / 2, OUTBUF_LIMIT); + bufferevent_disable(bev, EV_READ); + } + + return; +} + +/* + * Callback for write events on the up- and downstream connection bufferevents. + * Called when either all data from the output evbuffer has been written, + * or if the outbuf is only half full again after having been full. + */ +static void tfe_stream_writecb(struct bufferevent * bev, void * arg) +{ + struct tfe_stream_private* _stream=(struct tfe_stream_private*)arg; + enum tfe_conn_dir dir=(bev == _stream->conn_downstream.bev)? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM; + struct tfe_conn_private* this_conn=__this_conn(_stream, dir); + struct tfe_conn_private* peer_conn= __peer_conn(_stream, dir); + + struct evbuffer * outbuf = bufferevent_get_output(bev); + + if (peer_conn->bev && !(bufferevent_get_enabled(peer_conn->bev) & EV_READ)) + { + /* data source temporarily disabled; + * re-enable and reset watermark to 0. */ + bufferevent_setwatermark(bev, EV_WRITE, 0, 0); + bufferevent_enable(peer_conn->bev, EV_READ); + } +} + +/* + * Callback for meta events on the up- and downstream connection bufferevents. + * Called when EOF has been reached, a connection has been made, and on errors. + */ +static void tfe_stream_eventcb(struct bufferevent * bev, short events, void * arg) +{ + struct tfe_stream_private* _stream=(struct tfe_stream_private*)arg; + enum tfe_conn_dir dir=(bev == _stream->conn_downstream.bev)? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM; + struct tfe_conn_private* this_conn=__this_conn(_stream, dir); + struct tfe_conn_private* peer_conn= __peer_conn(_stream, dir); + + const struct tfe_plugin* plugins=_stream->thrmgr_ref->modules; + struct plugin_ctx* plug_ctx=NULL; + int plug_num=_stream->thrmgr_ref->module_num,i=0; + enum tfe_stream_close_reason reason=REASON_PASSIVE_CLOSED; + + if (events & BEV_EVENT_ERROR) + { + this_conn->closed=1; + reason=REASON_ERROR; + goto call_plugin_close; + } + + if (events & BEV_EVENT_EOF) + { + //generate a 0 size read callback to notify plugins. + tfe_stream_readcb(bev, arg); + this_conn->closed=1; + } + if(peer_conn->closed==1&&this_conn->closed==1) + { + reason=REASON_PASSIVE_CLOSED; + goto call_plugin_close; + } + return; +call_plugin_close: + for(i=0;icalling_idx=i; + plug_ctx=_stream->plug_ctx+i; + plugins[i].on_close(&(_stream.head), _stream->thrmgr_ref->thread_id, reason, &(plug_ctx->pme)); + } + tfe_stream_free(_stream); + return; + +} + +static void tfe_stream_free(struct tfe_stream_private* stream) +{ + pxy_conn_ctx_t * ctx; + int by_requestor; + struct tfe_thread_ctx* thread=stream->thrmgr_ref; + thread->load--; + switch (stream->session_type) + { + case SESSION_PROTO_SSL: + ssl_upstream_free(stream->ssl_upstream); + ssl_downstream_free(stream->ssl_downstream); + thread->stat.value[SSL_NUM]--; + break; + default: + break; + } + free(stream); + thread->stat.value[STREAM_NUM]--; + return; +} + +void ssl_get_cert_on_succ(void * result, void * user) +{ + cert_t* cert=(cert_t*)result; + struct tfe_stream_private* _stream=(struct tfe_stream_private*)user; + _stream->ssl_downstream->ssl=downstream_ssl_create(_stream, ); + cert_free(cert); + + bufferevent_setcb(_stream->head.upstream.bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, _stream); + bufferevent_setcb(_stream->head.downstream.bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, _stream); + bufferevent_enable(bev, EV_READ | EV_WRITE); + + + future_destroy(_stream->ssl_downstream->future_get_cert); + _stream->ssl_downstream->future_get_cert=NULL; + return; +} +void ssl_get_cert_on_fail(enum e_future_error err, const char * what, void * user) +{ + //todo + assert(0); +} + +void ssl_conn_origin_on_succ(void * result, void * user) +{ + struct bufferevent *bev=(struct bufferevent *)result; + struct tfe_stream_private* _stream=(struct tfe_stream_private*)user; + + _stream->head.upstream.bev=bev; + _stream->ssl_upstream->ssl=bufferevent_openssl_get_ssl(bev); /* does not inc refc */ + _stream->ssl_upstream->orig_cert=SSL_get_peer_certificate(_stream->ssl_upstream->ssl); + + session_cache_set(_stream->thrmgr_ref->dsess_cache, struct sockaddr * addr, addrlen,_stream->ssl_downstream->sni, SSL_get0_session(_stream->ssl_upstream->ssl)); + + _stream->ssl_downstream->future_get_cert=future_create(ssl_get_cert_on_succ, ssl_get_cert_on_fail, _stream); + cert_mgr_async_get(_stream->thrmgr_ref->cert_mgr, _stream->ssl_downstream->sni, _stream->ssl_downstream->keyring_id,_stream->ssl_upstream->orig_cert); + + future_destroy(_stream->ssl_upstream->conn_ssl_srv); + _stream->ssl_upstream->conn_ssl_srv=NULL; + return; +} +void ssl_conn_origin_on_fail(enum e_future_error err, const char * what, void * user) +{ + //todo + assert(0); +} + + +void peek_sni_on_succ(void* result, void* user) +{ + struct tfe_stream_private* _stream=(struct tfe_stream_private*)user; + assert(_stream->session_type==SESSION_PROTO_SSL); + _stream->ssl_downstream->sni=tfe_strdup((const char *)result); + + future_destroy(ssl_downstream->future_sni_peek); + ssl_downstream->future_sni_peek=NULL; + + _stream->ssl_upstream=ALLOC(struct ssl_upstream,1); + _stream->ssl_upstream->conn_ssl_srv=future_create(ssl_conn_origin_on_succ, ssl_conn_origin_on_fail, _stream); + ssl_async_connect_origin(_stream->ssl_upstream->conn_ssl_srv, _stream->fd_upstream, _stream->ssl_downstream->sni, _stream->thrmgr_ref->evbase, _stream->thrmgr_ref->opts); + +} +void peek_sni_on_fail(enum e_future_error err, const char * what, void * user) +{ + //todo + assert(0); +} + + +/* + * Callback for accept events on the socket listener bufferevent. + * Called when a new incoming connection has been accepted. + * Initiates the connection to the server. The incoming connection + * from the client is not being activated until we have a successful + * connection to the server, because we need the server's certificate + * in order to set up the SSL session to the client. + * For consistency, plain TCP works the same way, even if we could + * start reading from the client while waiting on the connection to + * the server to connect. + */ +void tfe_stream_setup(struct tfe_stream_private* _stream) +{ + struct future* f_sni=NULL; + tfe_thread_ctx* thread=_stream->thrmgr_ref; + switch (_stream->session_type) + { + case SESSION_PROTO_SSL: + // for SSL, defer dst connection setup to initial_readcb + _stream->ssl_downstream=ssl_downstream_create(); + _stream->async_future=future_create(peek_sni_on_succ, peek_sni_on_fail, _stream); + ssl_async_peek_sni(_stream->ssl_downstream->future_sni_peek, _stream->fd_downstream, _stream->thrmgr_ref->evbase); + thread->stat.value[SSL_NUM]++; + break + default: + //todo: + stream_fd_readcb(_stream->fd_downstream, 0, _stream); + break; + } + return; +} + +struct tfe_stream_private* tfe_stream_create(evutil_socket_t fd_downstream, evutil_socket_t fd_upstream, + struct sockaddr * peeraddr, int peeraddrlen, + enum tfe_session_proto session_type, tfe_thread_ctx* thread) +{ + struct tfe_stream_private* conn_private=NULL; + struct tfe_stream* conn_public=NULL; + conn_private= ALLOC(struct tfe_stream_private, 1); + conn_private->session_type=session_type; + conn_private->fd_downstream=fd_downstream; + conn_private->fd_upstream=fd_upstream; + conn_private->thrmgr_ref=thread; + conn_private->is_fisrt_read=1; + conn_public=&(conn_private->head); + addr_sock2layer(conn_public->downstream.addr,peeraddr,peeraddrlen); + thread->stat.value[STREAM_NUM]++; + return; +} + +/* vim: set noet ft=c: */ diff --git a/src/tfe_stream_internal.h b/src/tfe_stream_internal.h index 4c307c3..6a5f8d7 100644 --- a/src/tfe_stream_internal.h +++ b/src/tfe_stream_internal.h @@ -4,7 +4,7 @@ struct tfe_thread_ctx { pthread_t thr; - int thread_id; + unsigned int thread_id; size_t load; struct event_base *evbase; unsigned char running; @@ -13,7 +13,8 @@ struct tfe_thread_ctx cert_mgr cert_mgr; struct sess_cache* dsess_cache; struct sess_cache* ssess_cache; - + const int module_num; + const struct tfe_plugin* modules; }; @@ -37,12 +38,38 @@ struct ssl_upstream SSL *ssl; struct future* conn_ssl_srv; }; - +enum tfe_plugin_state +{ + PLUG_STATE_READONLY, + PLUG_STATE_PREEPTION, + PLUG_STATE_DETACHED +}; +struct plugin_ctx +{ + + enum tfe_plugin_state state; + void *pme; +}; +struct tfe_stream_write_ctx +{ + struct tfe_stream_private* _stream; + enum tfe_conn_dir dir; +}; +struct tfe_conn_private +{ + evutil_socket_t fd; + struct bufferevent *bev; + uint8_t on_writing; + uint8_t closed; + uint8_t need_shutdown; + struct tfe_stream_write_ctx w_ctx; +}; struct tfe_stream_private { struct tfe_stream head; enum tfe_session_proto session_type; - + struct tfe_conn_private conn_upstream; + struct tfe_conn_private conn_downstream; union { struct ssl_downstream* ssl_downstream; @@ -53,13 +80,18 @@ struct tfe_stream_private struct ssl_upstream* ssl_upstream; void* raw_upstream; }; - enum tfe_app_proto app_type; - - + uint8_t is_fisrt_read; + int calling_idx; + size_t forward_bytes; + size_t defere_bytes; + size_t drop_bytes; + enum tfe_app_proto app_proto; + int plugin_num; + struct plugin_ctx* plug_ctx; unsigned char passthrough; /* 1 if SSL passthrough is active */ evutil_socket_t fd_downstream, fd_upstream; - + struct tfe_thread_ctx* thrmgr_ref; future* async_future; };