#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifndef TFE_CONFIG_OUTPUT_LIMIT_DEFAULT #define TFE_CONFIG_OUTPUT_LIMIT_DEFAULT (1024 * 1024) #endif /* forward declaration of libevent callbacks */ static void tfe_stream_readcb(struct bufferevent *, void *); static void tfe_stream_writecb(struct bufferevent *, void *); static void tfe_stream_eventcb(struct bufferevent *, short, void *); static inline struct tfe_stream_private * __TO_STREAM_PRIVATE(const struct tfe_stream * stream) { return container_of(stream, struct tfe_stream_private, head); } static inline struct tfe_conn_private * __THIS_CONN(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) { return ((dir == CONN_DIR_UPSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream)); } static inline struct tfe_conn_private * __PEER_CONN(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) { return ((dir == CONN_DIR_UPSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream)); } static inline enum tfe_conn_dir __DIR(struct tfe_stream_private * _stream, struct bufferevent * bev) { return ((bev == _stream->conn_downstream->bev) ? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM); } static inline bool __IS_SSL(struct tfe_stream_private * _stream) { return (_stream->session_type == SESSION_PROTO_SSL); } void tfe_stream_detach(const struct tfe_stream * stream) { struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream); int plug_id = _stream->calling_idx; _stream->plug_ctx[plug_id].state = PLUG_STATE_DETACHED; return; } int tfe_stream_preempt(const struct tfe_stream * stream) { struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream); int plug_id = _stream->calling_idx; int i = 0; for (i = 0; i < _stream->plugin_num; i++) { if (_stream->plug_ctx[i].state == PLUG_STATE_PREEPTION) { return -1; } } _stream->plug_ctx[plug_id].state = PLUG_STATE_PREEPTION; return 0; } struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir) { struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream); struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); if (this_conn->on_writing == 1) { return NULL; } this_conn->w_ctx.dir = dir; this_conn->w_ctx._stream = _stream; this_conn->on_writing = 1; bufferevent_disable(peer_conn->bev, EV_READ); return &(this_conn->w_ctx); } int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size) { struct tfe_conn_private * this_conn = __THIS_CONN(w_ctx->_stream, w_ctx->dir);; int ret = bufferevent_write(this_conn->bev, data, size); return ret; } void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) { struct tfe_conn_private * this_conn = __THIS_CONN(w_ctx->_stream, w_ctx->dir); struct tfe_conn_private * peer_conn = __PEER_CONN(w_ctx->_stream, w_ctx->dir); this_conn->on_writing = 0; bufferevent_enable(peer_conn->bev, EV_READ); return; } int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, const unsigned char * data, size_t size) { int ret = 0; struct tfe_stream_write_ctx * wctx = tfe_stream_write_frag_start(stream, dir); ret = tfe_stream_write_frag(wctx, data, size); tfe_stream_write_frag_end(wctx); return ret; } /* * Callback for read events on the up- and downstream connection bufferevents. * Called when there is data ready in the input evbuffer. */ static void tfe_stream_readcb(struct bufferevent * bev, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; enum tfe_conn_dir dir = __DIR(_stream, bev); struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); int i = 0, ret = 0; enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA, action_final = ACTION_FORWARD_DATA; const struct tfe_plugin * plugins = _stream->thread_ref->modules; struct plugin_ctx * plug_ctx = NULL; int plug_num = _stream->thread_ref->nr_modules; struct evbuffer * inbuf = bufferevent_get_input(bev); struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev); size_t contigous_len = evbuffer_get_length(inbuf), drain_size = 0; const unsigned char * contiguous_data = (const unsigned char *) evbuffer_pullup(inbuf, contigous_len); _stream->defere_bytes = 0; _stream->drop_bytes = 0; _stream->forward_bytes = 0; for (i = 0; i < plug_num; i++) { _stream->calling_idx = i; plug_ctx = _stream->plug_ctx + i; if (_stream->is_plugin_opened == 0) { action_tmp = plugins[i].on_open(&_stream->head, _stream->thread_ref->thread_id, dir, contiguous_data, contigous_len, &(plug_ctx->pme)); _stream->is_plugin_opened = 1; } else { action_tmp = plugins[i].on_data(&_stream->head, _stream->thread_ref->thread_id, dir, contiguous_data, contigous_len, &(plug_ctx->pme)); } if (plug_ctx->state == PLUG_STATE_PREEPTION) { action_final = action_tmp; } } switch (action_final) { case ACTION_FORWARD_DATA: if (_stream->forward_bytes > 0) { evbuffer_remove_buffer(inbuf, outbuf, _stream->forward_bytes); } else { evbuffer_add_buffer(outbuf, inbuf); } break; case ACTION_DROP_DATA: if (_stream->drop_bytes > 0) { drain_size = _stream->drop_bytes; } else { drain_size = evbuffer_get_length(inbuf); } evbuffer_drain(inbuf, drain_size); case ACTION_DEFER_DATA: if (_stream->defere_bytes > 0) { bufferevent_setwatermark(bev, EV_WRITE, _stream->defere_bytes, 0); } break; default: assert(0); break; } if (evbuffer_get_length(inbuf) != 0) { bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); } if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT) { bufferevent_setwatermark(peer_conn->bev, EV_WRITE, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT / 2, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT); bufferevent_disable(bev, EV_READ); } return; } /* * Callback for write events on the up- and downstream connection bufferevents. * Called when either all data from the output evbuffer has been written, * or if the outbuf is only half full again after having been full. */ static void tfe_stream_writecb(struct bufferevent * bev, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; enum tfe_conn_dir dir = __DIR(_stream, bev); struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); struct evbuffer * outbuf = bufferevent_get_output(bev); if (peer_conn->bev && !(bufferevent_get_enabled(peer_conn->bev) & EV_READ)) { /* data source temporarily disabled; * re-enable and reset watermark to 0. */ bufferevent_setwatermark(bev, EV_WRITE, 0, 0); bufferevent_enable(peer_conn->bev, EV_READ); } } /* * Callback for meta events on the up- and downstream connection bufferevents. * Called when EOF has been reached, a connection has been made, and on errors. */ static void tfe_stream_eventcb(struct bufferevent * bev, short events, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; enum tfe_conn_dir dir = __DIR(_stream, bev); struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); const struct tfe_plugin * plugins = _stream->thread_ref->modules; struct plugin_ctx * plug_ctx = NULL; int plug_num = _stream->thread_ref->nr_modules, i = 0; enum tfe_stream_close_reason reason = REASON_PASSIVE_CLOSED; if (events & BEV_EVENT_ERROR) { this_conn->closed = 1; reason = REASON_ERROR; goto call_plugin_close; } if (events & BEV_EVENT_EOF) { //generate a 0 size read callback to notify plugins. tfe_stream_readcb(bev, arg); this_conn->closed = 1; } if (peer_conn->closed == 1 && this_conn->closed == 1) { reason = REASON_PASSIVE_CLOSED; goto call_plugin_close; } return; call_plugin_close: for (i = 0; i < plug_num; i++) { _stream->calling_idx = i; plug_ctx = _stream->plug_ctx + i; plugins[i].on_close(&(_stream->head), _stream->thread_ref->thread_id, reason, &(plug_ctx->pme)); } tfe_stream_destory(_stream); return; } static tfe_conn_private * __conn_private_create(struct tfe_stream_private * stream, evutil_socket_t fd) { struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1); struct event_base * __ev_base = stream->thread_ref->evbase; __conn_private->bev = bufferevent_socket_new(__ev_base, fd, BEV_OPT_DEFER_CALLBACKS); __conn_private->fd = fd; if (!__conn_private->bev) { TFE_LOG_ERROR(__STREAM_LOGGER(stream), "Failed at creating bufferevent for fd %d", fd); goto __errout; } bufferevent_setcb(__conn_private->bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, stream); bufferevent_enable(__conn_private->bev, EV_READ | EV_WRITE); return __conn_private; __errout: if (__conn_private != NULL) free(__conn_private); return NULL; } static tfe_conn_private * __conn_private_create(struct tfe_stream_private * stream, struct bufferevent * bev) { struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1); __conn_private->bev = bev; __conn_private->fd = bufferevent_getfd(bev); bufferevent_setcb(__conn_private->bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, stream); bufferevent_enable(__conn_private->bev, EV_READ | EV_WRITE); return __conn_private; } evutil_socket_t __conn_private_release_fd(struct tfe_conn_private * conn) { evutil_socket_t __to_release_fd = conn->fd; conn->fd = 0; return __to_release_fd; } static void __conn_private_destory(struct tfe_conn_private * conn) { return; } void ssl_downstream_create_on_success(future_result_t * result, void * user) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; struct ssl_stream * downstream = ssl_downstream_create_result_release_stream(result); struct bufferevent * bev = ssl_downstream_create_result_release_bev(result); _stream->conn_downstream = __conn_private_create(_stream, bev); _stream->ssl_downstream = downstream; future_destroy(_stream->future_downstream_create); _stream->future_downstream_create = NULL; _stream->defer_fd_downstream = 0; return; } void ssl_downstream_create_on_fail(enum e_future_error err, const char * what, void * user) { return; } void ssl_upstream_create_on_success(future_result_t * result, void * user) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; struct event_base * ev_base = _stream->thread_ref->evbase; struct ssl_stream * upstream = ssl_upstream_create_result_release_stream(result); struct bufferevent * bev = ssl_upstream_create_result_release_bev(result); assert(upstream != NULL && bev != NULL); /* Create connection ctx by bev */ _stream->conn_upstream = __conn_private_create(_stream, bev); _stream->ssl_upstream = upstream; future_destroy(_stream->future_upstream_create); _stream->future_upstream_create = NULL; _stream->defer_fd_upstream = 0; /* Next, create downstream */ _stream->future_downstream_create = future_create(ssl_downstream_create_on_success, ssl_downstream_create_on_fail, _stream); ssl_async_downstream_create(_stream->future_downstream_create, _stream->ssl_mgr, _stream->ssl_upstream, _stream->defer_fd_downstream, /* KEYRING ID */ 0, ev_base); } void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, void * user) { assert(0); } struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx) { struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1); _stream->thread_ref = thread_ctx; _stream->proxy_ref = pxy; return (struct tfe_stream *) &_stream->head; } void tfe_stream_destory(struct tfe_stream_private * stream) { struct tfe_thread_ctx * thread = stream->thread_ref; struct tfe_proxy * proxy = stream->proxy_ref; struct event_base * ev_base = thread->evbase; if (__IS_SSL(stream) && stream->ssl_upstream) { evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_upstream); ssl_stream_free_and_close_fd(stream->ssl_upstream, ev_base, __to_closed_fd); } if (__IS_SSL(stream) && stream->ssl_downstream) { evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_upstream); ssl_stream_free_and_close_fd(stream->ssl_downstream, ev_base, __to_closed_fd); } if (stream->conn_upstream) { __conn_private_destory(stream->conn_upstream); } if (stream->conn_downstream) { __conn_private_destory(stream->conn_downstream); } if (stream->defer_fd_downstream) { evutil_closesocket(stream->defer_fd_downstream); } if (stream->defer_fd_upstream) { evutil_closesocket(stream->defer_fd_upstream); } if (stream->future_downstream_create) { future_destroy(stream->future_downstream_create); } if (stream->future_upstream_create) { future_destroy(stream->future_upstream_create); } free(stream); thread->load--; } void tfe_stream_init_by_fds(struct tfe_stream * stream, enum tfe_session_proto session_type, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream) { struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head); struct event_base * ev_base = _stream->thread_ref->evbase; if (session_type == SESSION_PROTO_PLAIN) { _stream->conn_downstream = __conn_private_create(_stream, fd_downstream); _stream->conn_upstream = __conn_private_create(_stream, fd_upstream); assert(_stream->conn_downstream != NULL); assert(_stream->conn_upstream != NULL); } if (session_type == SESSION_PROTO_SSL) { _stream->ssl_mgr = _stream->proxy_ref->ssl_mgr_handler; _stream->future_upstream_create = future_create( ssl_upstream_create_on_success, ssl_upstream_create_on_fail, (void *) _stream); /* Defer setup conn_downstream & conn_upstream in async callbacks. */ ssl_async_upstream_create(_stream->future_upstream_create, _stream->ssl_mgr, fd_upstream, fd_downstream, ev_base); _stream->defer_fd_downstream = fd_downstream; _stream->defer_fd_upstream = fd_upstream; } _stream->session_type = session_type; }