#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifndef TFE_CONFIG_OUTPUT_LIMIT_DEFAULT #define TFE_CONFIG_OUTPUT_LIMIT_DEFAULT (1024 * 1024) #endif /* forward declaration of libevent callbacks */ static void __stream_bev_readcb(struct bufferevent *, void *); static void __stream_bev_writecb(struct bufferevent *, void *); static void __stream_bev_eventcb(struct bufferevent *, short, void *); /* ==================================================================================================================== * HELPER FUNCTIONS * ===================================================================================================================*/ static inline void __stream_log_event(struct tfe_stream_private * _stream, enum tfe_stream_event_log_type type, enum tfe_conn_dir dir, unsigned int error, const char * str_error) { unsigned int log_offset = _stream->nr_log_event; assert(log_offset < STREAM_EVENT_LOG_MAX); _stream->log_event[log_offset].type = type; _stream->log_event[log_offset].dir = dir; _stream->log_event[log_offset].error = error; _stream->log_event[log_offset].str_error = str_error; _stream->nr_log_event++; } static const char * __str_stream_log_type(enum tfe_stream_event_log_type type) { static const char * map_event_log_type[] = { [EVENT_LOG_CLOSE_BY_FD_PEER] = "FD/PEER", [EVENT_LOG_CLOSE_BY_FD_EOF] = "FD/EOF", [EVENT_LOG_CLOSE_BY_FD_ERROR] = "FD/ERR", [EVENT_LOG_CLOSE_BY_SSL_ERROR] = "SSL/ERR" }; return map_event_log_type[type]; } static inline struct tfe_stream_private * to_stream_private(const struct tfe_stream * stream) { return container_of(stream, struct tfe_stream_private, head); } static inline struct tfe_conn_private * __this_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) { return ((dir == 
CONN_DIR_DOWNSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream));
}

static inline struct tfe_conn_private * __peer_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir)
{
    return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream));
}

static inline enum tfe_conn_dir __bev_dir(struct tfe_stream_private * _stream, struct bufferevent * bev)
{
    if (_stream->conn_downstream && bev == _stream->conn_downstream->bev)
    {
        return CONN_DIR_DOWNSTREAM;
    }

    if (_stream->conn_upstream && bev == _stream->conn_upstream->bev)
    {
        return CONN_DIR_UPSTREAM;
    }

    /* bev must belong to one of the two connections of this stream */
    assert(0);
    return CONN_DIR_DOWNSTREAM;
}

static inline const char * __str_dir(enum tfe_conn_dir dir)
{
    return dir == CONN_DIR_DOWNSTREAM ? "DOWNSTREAM" : "UPSTREAM";
}

static inline bool __is_ssl(struct tfe_stream_private * _stream)
{
    return (_stream->session_type == STREAM_PROTO_SSL);
}

static void call_plugin_close(struct tfe_stream_private * _stream)
{
    unsigned int plugin_id_iter = 0;
    unsigned int plugin_id = 0;

    for (const struct tfe_plugin * p_info_iter = tfe_plugin_iterate(&plugin_id_iter);
        p_info_iter != NULL; p_info_iter = tfe_plugin_iterate(&plugin_id_iter))
    {
        _stream->calling_idx = plugin_id;
        struct plugin_ctx * plug_ctx = &_stream->plugin_ctxs[plugin_id];

        /* TODO: stop using pme to decide whether on_open has been called. */
        if (p_info_iter->on_close && plug_ctx->pme != NULL)
        {
            p_info_iter->on_close(&_stream->head, _stream->thread_ref->thread_id,
                REASON_PASSIVE_CLOSED, &(plug_ctx->pme));
        }

        plugin_id++;
    }
}

/* ====================================================================================================================
 * INTERFACE
 * ===================================================================================================================*/

void tfe_stream_detach(const struct tfe_stream * stream)
{
    struct tfe_stream_private * _stream = to_stream_private(stream);
    int plug_id = _stream->calling_idx;
    _stream->plugin_ctxs[plug_id].state = PLUG_STATE_DETACHED;
}

int tfe_stream_preempt(const struct tfe_stream * stream)
{
    struct tfe_stream_private * _stream = to_stream_private(stream);
    int plug_id = _stream->calling_idx;

    /* Only one plugin may hold the preemption state at a time */
    for (unsigned int i = 0; i < _stream->nr_plugin_ctxs; i++)
    {
        if (_stream->plugin_ctxs[i].state == PLUG_STATE_PREEPTION)
        {
            return -1;
        }
    }

    _stream->plugin_ctxs[plug_id].state = PLUG_STATE_PREEPTION;
    return 0;
}

void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by)
{
    struct tfe_stream_private * _stream = to_stream_private(stream);
    assert(_stream != NULL);
    assert(_stream->conn_upstream != NULL && _stream->conn_downstream != NULL);
    assert(_stream->conn_upstream->bev != NULL);
    assert(_stream->conn_downstream->bev != NULL);

    /* A stream cannot be suspended more than once */
    assert(!_stream->is_suspended);
    _stream->is_suspended = true;
    _stream->suspended_by = by;

    /* Disable all events on both connections */
    int ret = 0;
    ret = bufferevent_disable(_stream->conn_upstream->bev, EV_READ | EV_WRITE);
    assert(ret == 0);
    ret = bufferevent_disable(_stream->conn_downstream->bev, EV_READ | EV_WRITE);
    assert(ret == 0);
    (void) ret;
}

void tfe_stream_resume(const struct tfe_stream * stream)
{
    struct tfe_stream_private * _stream = to_stream_private(stream);
    assert(_stream->is_suspended);

    bufferevent_enable(_stream->conn_upstream->bev, EV_READ | EV_WRITE);
bufferevent_enable(_stream->conn_downstream->bev, EV_READ | EV_WRITE); if (_stream->suspended_by == CONN_DIR_DOWNSTREAM) { bufferevent_trigger(_stream->conn_downstream->bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); } else { bufferevent_trigger(_stream->conn_upstream->bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); } _stream->is_suspended = false; _stream->suspended_by = CONN_DIR_DOWNSTREAM; } struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir) { struct tfe_stream_private * _stream = to_stream_private(stream); struct tfe_stream_write_ctx ** ref_write_ctx = NULL; struct tfe_conn_private * this_conn = NULL; struct tfe_conn_private * peer_conn = NULL; if (dir == CONN_DIR_DOWNSTREAM) { this_conn = _stream->conn_downstream; peer_conn = _stream->conn_upstream; ref_write_ctx = &_stream->w_ctx_downstream; } else { this_conn = _stream->conn_upstream; peer_conn = _stream->conn_downstream; ref_write_ctx = &_stream->w_ctx_upstream; } if (*ref_write_ctx != NULL) return NULL; this_conn->on_writing = 1; *ref_write_ctx = ALLOC(struct tfe_stream_write_ctx, 1); (*ref_write_ctx)->_stream = _stream; (*ref_write_ctx)->dir = dir; bufferevent_disable(peer_conn->bev, EV_READ); return *ref_write_ctx; } int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size) { if (w_ctx->_stream == NULL) { return -EPIPE; } struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir); int ret = 0; if (this_conn != NULL) { ret = bufferevent_write(this_conn->bev, data, size); } else { return -EPIPE; } return ret; } void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) { struct tfe_conn_private * this_conn; struct tfe_conn_private * peer_conn; /* The connection terminated before this function call */ if (w_ctx->_stream == NULL) goto __out; this_conn = __this_conn(w_ctx->_stream, w_ctx->dir); peer_conn = __peer_conn(w_ctx->_stream, w_ctx->dir); if (this_conn != NULL) { 
        this_conn->on_writing = 0;
        /* The peer may already have been destroyed while the write was in progress */
        if (peer_conn != NULL) bufferevent_enable(peer_conn->bev, EV_READ);
    }

    if (w_ctx->dir == CONN_DIR_DOWNSTREAM)
    {
        assert(w_ctx->_stream->w_ctx_downstream == w_ctx);
        w_ctx->_stream->w_ctx_downstream = NULL;
    }
    else
    {
        assert(w_ctx->_stream->w_ctx_upstream == w_ctx);
        w_ctx->_stream->w_ctx_upstream = NULL;
    }

__out:
    free(w_ctx);
}

int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, const unsigned char * data, size_t size)
{
    int ret = 0;
    struct tfe_stream_write_ctx * wctx = tfe_stream_write_frag_start(stream, dir);

    /* frag_start returns NULL when a fragmented write is already active in this direction */
    if (wctx == NULL) return -EBUSY;

    ret = tfe_stream_write_frag(wctx, data, size);
    tfe_stream_write_frag_end(wctx);
    return ret;
}

static int conn_private_ratelimit_setup(struct tfe_conn_private * conn, struct tfe_proxy_rate_limit_options * opt)
{
    conn->ratelimit_bucket = ev_token_bucket_cfg_new(opt->read_rate, opt->read_burst,
        opt->write_rate, opt->write_burst, NULL);

    if(unlikely(conn->ratelimit_bucket == NULL))
    {
        TFE_LOG_ERROR(g_default_logger, "Failed at setting ratelimit bucket, "
            "read_rate = %u, read_burst = %u, write_rate = %u, write_burst = %u",
            opt->read_rate, opt->read_burst, opt->write_rate, opt->write_burst);
        return -1;
    }

    bufferevent_set_rate_limit(conn->bev, conn->ratelimit_bucket);
    return 0;
}

int tfe_stream_action_set_opt(const struct tfe_stream * stream, enum tfe_stream_action_opt type,
    void * value, size_t size)
{
    struct tfe_stream_private * _stream = to_stream_private(stream);

/* Copy *value into the target field; reject the call if the sizes do not match */
#define __SAVE_PARAM(what) do { \
    if(size != sizeof(__typeof__(what))) { return -EINVAL; } \
    else { what = *(__typeof__(what) *)value; } } while(0)

    switch (type)
    {
        case ACTION_OPT_FOWARD_BYTES: __SAVE_PARAM(_stream->forward_bytes); break;
        case ACTION_OPT_DEFER_BYTES: __SAVE_PARAM(_stream->defer_bytes); break;
        case ACTION_OPT_DEFER_TIME_TV: __SAVE_PARAM(_stream->defer_timeval); break;
        case ACTION_OPT_DROP_BYTES: __SAVE_PARAM(_stream->drop_bytes); break;
    }

#undef __SAVE_PARAM
    return 0;
}

/* ====================================================================================================================
 * CONNECTION STRUCTURE AND OPERATION FUNCTIONS
 * ===================================================================================================================*/

/* Hand over ownership of the fd to the caller; the connection will no longer close it */
evutil_socket_t __conn_private_release_fd(struct tfe_conn_private * conn)
{
    evutil_socket_t __to_release_fd = conn->fd;
    conn->fd = 0;
    return __to_release_fd;
}

static void __conn_private_destory(struct tfe_conn_private * conn)
{
    bufferevent_disable(conn->bev, EV_READ | EV_WRITE);

    if(conn->ratelimit_bucket)
    {
        ev_token_bucket_cfg_free(conn->ratelimit_bucket);
        conn->ratelimit_bucket = NULL;
    }

    bufferevent_free(conn->bev);

    /* The fd is zero when it has been released with __conn_private_release_fd() */
    if (conn->fd > 0)
    {
        evutil_closesocket(conn->fd);
    }

    free(conn);
    TFE_PROXY_STAT_INCREASE(STAT_FD_INSTANT_CLOSE, 1);
}

static void __conn_private_destory_with_ssl(struct event_base * ev_base,
    struct tfe_conn_private * conn, struct ssl_stream * ssl_stream)
{
    if (ssl_stream == NULL)
        return __conn_private_destory(conn);

    /* The SSL layer closes the fd, so take it away from the connection first */
    evutil_socket_t __to_closed_fd = __conn_private_release_fd(conn);
    ssl_stream_free_and_close_fd(ssl_stream, ev_base, __to_closed_fd);
    return __conn_private_destory(conn);
}

static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg)
{
    struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
    struct tfe_conn_private * peer_conn = __peer_conn(_stream, __bev_dir(_stream, bev));
    struct evbuffer * __input_buffer = bufferevent_get_input(bev);

    /* Peer connection is gone, discard whatever arrives */
    if (peer_conn == NULL)
    {
        evbuffer_drain(__input_buffer, evbuffer_get_length(__input_buffer));
        return;
    }

    struct evbuffer * __output_buffer = bufferevent_get_output(peer_conn->bev);
    evbuffer_add_buffer(__output_buffer, __input_buffer);
}

static void __stream_bev_passthrough_writecb(struct bufferevent * bev, void * arg)
{
    struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
    struct tfe_conn_private ** ref_this_conn{};
    struct tfe_conn_private ** ref_peer_conn{};

    if
(__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM) { ref_this_conn = &_stream->conn_upstream; ref_peer_conn = &_stream->conn_downstream; } if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM) { ref_this_conn = &_stream->conn_downstream; ref_peer_conn = &_stream->conn_upstream; } struct evbuffer * __output_buffer = bufferevent_get_output(bev); assert(__output_buffer != NULL); if (*ref_peer_conn == NULL && evbuffer_get_length(__output_buffer) == 0) { __conn_private_destory(*ref_this_conn); *ref_this_conn = NULL; } if (*ref_peer_conn == NULL && *ref_this_conn == NULL) { call_plugin_close(_stream); tfe_stream_destory(_stream); } return; } static void __stream_bev_passthrough_eventcb(struct bufferevent * bev, short events, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; struct tfe_conn_private ** ref_this_conn{}; struct tfe_conn_private ** ref_peer_conn{}; enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev); const char * str_conn_dir = __str_dir(conn_dir); if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM) { ref_this_conn = &_stream->conn_upstream; ref_peer_conn = &_stream->conn_downstream; } if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM) { ref_this_conn = &_stream->conn_downstream; ref_peer_conn = &_stream->conn_upstream; } if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF) { if (evbuffer_get_length(bufferevent_get_input(bev))) { __stream_bev_passthrough_readcb(bev, arg); } if(events & BEV_EVENT_ERROR) { unsigned long err; while ((err = (bufferevent_get_openssl_error(bev)))) { const char *msg = (const char*)ERR_reason_error_string(err); const char *lib = (const char*)ERR_lib_error_string(err); const char *func = (const char*)ERR_func_error_string(err); TFE_LOG_INFO(g_default_logger, "%s %s connection error, bufferevent_get_openssl_error() = %lu: %s %s %s", _stream->str_stream_addr, str_conn_dir, err, lib, func, msg); } if (errno) { TFE_LOG_INFO(g_default_logger, "%s %s connection error, errno = %d, %s", 
_stream->str_stream_addr, str_conn_dir, errno, strerror(errno)); } } goto __close_connection; } return; __close_connection: if (*ref_peer_conn != NULL) { struct bufferevent * __peer_bev = (*ref_peer_conn)->bev; struct evbuffer * __peer_output_buffer = bufferevent_get_output(__peer_bev); if (evbuffer_get_length(__peer_output_buffer) == 0) { __conn_private_destory(*ref_peer_conn); *ref_peer_conn = NULL; } } if (*ref_this_conn != NULL) { __conn_private_destory(*ref_this_conn); *ref_this_conn = NULL; } if (*ref_this_conn == NULL && *ref_peer_conn == NULL) { call_plugin_close(_stream); tfe_stream_destory(_stream); } return; } /* * Callback for read events on the up- and downstream connection bufferevents. * Called when there is data ready in the input evbuffer. */ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; enum tfe_conn_dir dir = __bev_dir(_stream, bev); struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); /* Peer connection is terminated, drain all data. 
     * This connection will be destroyed in __event_cb */
    struct evbuffer * inbuf = bufferevent_get_input(bev);
    if (peer_conn == NULL)
    {
        evbuffer_drain(inbuf, evbuffer_get_length(inbuf));
        return;
    }

    struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev);
    assert(inbuf != NULL && outbuf != NULL);

    enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA;
    enum tfe_stream_action action_final = ACTION_FORWARD_DATA;

    size_t drain_size = 0;
    size_t contiguous_len = evbuffer_get_length(inbuf);
    unsigned char * contiguous_data = evbuffer_pullup(inbuf, contiguous_len);

    /* Plugins may set these through tfe_stream_action_set_opt() during on_data */
    _stream->defer_bytes = 0;
    _stream->drop_bytes = 0;
    _stream->forward_bytes = 0;

    unsigned int plugin_id_iter = 0;
    unsigned int plugin_id = 0;

    for (const struct tfe_plugin * p_info_iter = tfe_plugin_iterate(&plugin_id_iter);
        p_info_iter != NULL; p_info_iter = tfe_plugin_iterate(&plugin_id_iter), plugin_id++)
    {
        _stream->calling_idx = plugin_id;
        struct plugin_ctx * plug_ctx = &_stream->plugin_ctxs[plugin_id];

        if (p_info_iter->on_open != NULL && plug_ctx->is_plugin_opened == 0)
        {
            p_info_iter->on_open(&_stream->head, _stream->thread_ref->thread_id, dir, &(plug_ctx->pme));
            plug_ctx->is_plugin_opened = 1;
        }

        if (plug_ctx->state == PLUG_STATE_DETACHED) continue;

        if (p_info_iter->on_data != NULL)
        {
            action_tmp = p_info_iter->on_data(&_stream->head, _stream->thread_ref->thread_id,
                dir, contiguous_data, contiguous_len, &(plug_ctx->pme));
        }

        /* Only the plugin that preempted the stream decides the final action */
        if (plug_ctx->state == PLUG_STATE_PREEPTION) action_final = action_tmp;
    }

    switch (action_final)
    {
        case ACTION_FORWARD_DATA:
            if (_stream->forward_bytes > 0)
            {
                evbuffer_remove_buffer(inbuf, outbuf, _stream->forward_bytes);
            }
            else
            {
                evbuffer_add_buffer(outbuf, inbuf);
            }
            break;

        case ACTION_DROP_DATA:
            if (_stream->drop_bytes > 0)
            {
                drain_size = _stream->drop_bytes;
            }
            else
            {
                drain_size = evbuffer_get_length(inbuf);
            }
            evbuffer_drain(inbuf, drain_size);
            break;

        case ACTION_DEFER_DATA:
            if (_stream->defer_bytes > 0)
            {
                bufferevent_setwatermark(bev, EV_WRITE, _stream->defer_bytes, 0);
            }
            break;

        default:
            assert(0);
            break;
    }

    if(_stream->need_to_be_kill)
    {
        const static struct linger sl{.l_onoff = 1, .l_linger = 0};

        /* Set SO_LINGER with a zero timeout so that closing the fd sends an RST */
        setsockopt(_stream->conn_upstream->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
        setsockopt(_stream->conn_downstream->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));

        /* Destroy STREAM */
        TFE_PROXY_STAT_INCREASE(STAT_STREAM_CLS_KILL, 1);
        return tfe_stream_destory(_stream);
    }

#if 0
    if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT)
    {
        bufferevent_setwatermark(peer_conn->bev, EV_WRITE,
            TFE_CONFIG_OUTPUT_LIMIT_DEFAULT / 2, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT);
        bufferevent_disable(bev, EV_READ);
    }
#endif
}

/*
 * Callback for write events on the up- and downstream connection bufferevents.
 * Called when either all data from the output evbuffer has been written,
 * or if the outbuf is only half full again after having been full.
 */
static void __stream_bev_writecb(struct bufferevent * bev, void * arg)
{
    struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
    struct event_base * ev_base = bufferevent_get_base(bev);

    struct tfe_conn_private ** ref_this_conn{};
    struct tfe_conn_private ** ref_peer_conn{};
    struct ssl_stream ** ref_this_ssl_stream{};

    enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev);
    if (conn_dir == CONN_DIR_UPSTREAM)
    {
        ref_this_conn = &_stream->conn_upstream;
        ref_this_ssl_stream = &_stream->ssl_upstream;
        ref_peer_conn = &_stream->conn_downstream;
    }

    if (conn_dir == CONN_DIR_DOWNSTREAM)
    {
        ref_this_conn = &_stream->conn_downstream;
        ref_this_ssl_stream = &_stream->ssl_downstream;
        ref_peer_conn = &_stream->conn_upstream;
    }

    struct evbuffer * __output_buffer = bufferevent_get_output(bev);
    assert(__output_buffer != NULL);

    if (*ref_peer_conn == NULL /* Peer connection is closed */
        && (*ref_this_conn)->on_writing == 0 /* Nobody is about to write data, e.g. no fragmented write is in progress */
        && evbuffer_get_length(__output_buffer) == 0) /* Nothing is left in the send queue */
    {
        __conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream);
        __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_PEER, conn_dir, 0, NULL);
        *ref_this_conn = NULL;
        *ref_this_ssl_stream = NULL;
    }

    /* Both sides are gone, the stream itself can be released */
    if (*ref_peer_conn == NULL && *ref_this_conn == NULL)
    {
        call_plugin_close(_stream);
        tfe_stream_destory(_stream);
    }
}

/*
 * Callback for meta events on the up- and downstream connection bufferevents.
 * Called when EOF has been reached, a connection has been made, and on errors.
 */
static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * arg)
{
    struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
    struct event_base * ev_base = bufferevent_get_base(bev);

    struct tfe_conn_private ** ref_this_conn{};
    struct tfe_conn_private ** ref_peer_conn{};
    struct ssl_stream ** ref_this_ssl_stream{};
    struct ssl_stream ** ref_peer_ssl_stream{};
    struct tfe_stream_write_ctx ** ref_this_write_ctx{};

    enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev);
    const char * str_conn_dir = __str_dir(conn_dir);
    enum tfe_conn_dir peer_conn_dir{};

    if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
    {
        ref_this_conn = &_stream->conn_upstream;
        ref_peer_conn = &_stream->conn_downstream;
        ref_this_ssl_stream = &_stream->ssl_upstream;
        ref_peer_ssl_stream = &_stream->ssl_downstream;
        ref_this_write_ctx = &_stream->w_ctx_upstream;
        peer_conn_dir = CONN_DIR_DOWNSTREAM;
    }

    if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
    {
        ref_this_conn = &_stream->conn_downstream;
        ref_peer_conn = &_stream->conn_upstream;
        ref_this_ssl_stream = &_stream->ssl_downstream;
        ref_peer_ssl_stream = &_stream->ssl_upstream;
        ref_this_write_ctx = &_stream->w_ctx_downstream;
        peer_conn_dir = CONN_DIR_UPSTREAM;
    }

    if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF)
    {
        /* Deliver any data still pending in the input buffer before closing */
        if (evbuffer_get_length(bufferevent_get_input(bev)))
        {
            __stream_bev_readcb(bev, arg);
        }

        if(events & BEV_EVENT_ERROR)
        {
            unsigned long err;
            while ((err = (bufferevent_get_openssl_error(bev))))
            {
                const char *msg = (const char*)ERR_reason_error_string(err);
                const char *lib = (const char*)ERR_lib_error_string(err);
                const char *func = (const char*)ERR_func_error_string(err);

                TFE_LOG_INFO(g_default_logger,
                    "%s %s connection error, bufferevent_get_openssl_error() = %lu: %s %s %s",
                    _stream->str_stream_addr, str_conn_dir, err, lib, func, msg);
            }

            if (errno)
            {
                TFE_LOG_INFO(g_default_logger, "%s %s connection error, errno = %d, %s",
                    _stream->str_stream_addr, str_conn_dir, errno, strerror(errno));
            }
        }

        if(events & BEV_EVENT_ERROR)
            __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_ERROR, conn_dir, 0, NULL);
        if(events & BEV_EVENT_EOF)
            __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_EOF, conn_dir, 0, NULL);

        goto __close_connection;
    }

    return;

__close_connection:
    if (*ref_peer_conn != NULL)
    {
        struct bufferevent * __peer_bev = (*ref_peer_conn)->bev;
        struct evbuffer * __peer_output_buffer = bufferevent_get_output(__peer_bev);

        /* Close the peer right away only if it has nothing left to send
         * and no fragmented write is in progress on it */
        if (evbuffer_get_length(__peer_output_buffer) == 0 && (*ref_peer_conn)->on_writing == 0)
        {
            __conn_private_destory_with_ssl(ev_base, *ref_peer_conn, *ref_peer_ssl_stream);
            *ref_peer_conn = NULL;
            *ref_peer_ssl_stream = NULL;
            __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_PEER, peer_conn_dir, 0, NULL);
        }
    }

    if (*ref_this_conn != NULL)
    {
        /* A fragment writer is active on this connection. Clear its reference to the
         * stream so that the writer can tell the connection has been closed. */
        if (*ref_this_write_ctx != NULL)
        {
            (*ref_this_write_ctx)->_stream = NULL;
        }

        __conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream);
        *ref_this_conn = NULL;
        *ref_this_ssl_stream = NULL;
    }

    if (*ref_this_conn == NULL && *ref_peer_conn == NULL)
    {
        goto __call_plugin_close;
    }

    return;

__call_plugin_close:
    call_plugin_close(_stream);
    tfe_stream_destory(_stream);
}

static tfe_conn_private * __conn_private_create_by_bev(struct tfe_stream_private * stream, struct bufferevent * bev)
{
    struct tfe_conn_private *
        __conn_private = ALLOC(struct tfe_conn_private, 1);
    __conn_private->bev = bev;
    __conn_private->fd = bufferevent_getfd(bev);

    bufferevent_setcb(__conn_private->bev, __stream_bev_readcb, __stream_bev_writecb, __stream_bev_eventcb, stream);
    bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE);

    struct tfe_proxy * proxy_ref = stream->proxy_ref;
    if(unlikely(proxy_ref->en_rate_limit))
    {
        conn_private_ratelimit_setup(__conn_private, &proxy_ref->rate_limit_options);
    }

    return __conn_private;
}

static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private * stream, evutil_socket_t fd)
{
    struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1);
    struct event_base * __ev_base = stream->thread_ref->evbase;
    struct tfe_proxy * proxy_ref = stream->proxy_ref;

    __conn_private->_stream_ref = stream;
    __conn_private->bev = bufferevent_socket_new(__ev_base, fd, BEV_OPT_DEFER_CALLBACKS | BEV_OPT_THREADSAFE );

    /* Check the bufferevent before it is used by any other call */
    if (!__conn_private->bev)
    {
        TFE_LOG_ERROR(__STREAM_LOGGER(stream), "Failed at creating bufferevent for fd %d", fd);
        goto __errout;
    }

    __conn_private->fd = fd;
    bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE);

    if (stream->passthough)
    {
        bufferevent_setcb(__conn_private->bev, __stream_bev_passthrough_readcb,
            __stream_bev_passthrough_writecb, __stream_bev_passthrough_eventcb, stream);
    }
    else
    {
        bufferevent_setcb(__conn_private->bev, __stream_bev_readcb,
            __stream_bev_writecb, __stream_bev_eventcb, stream);
    }

    if(proxy_ref->en_rate_limit)
    {
        conn_private_ratelimit_setup(__conn_private, &proxy_ref->rate_limit_options);
    }

    return __conn_private;

__errout:
    if (__conn_private != NULL) free(__conn_private);
    return NULL;
}

void __conn_private_enable(struct tfe_conn_private * conn_private)
{
    assert(conn_private != NULL && conn_private->bev != NULL);
    bufferevent_enable(conn_private->bev, EV_READ | EV_WRITE);
}

void ssl_downstream_create_on_success(future_result_t * result, void * user)
{
    struct tfe_stream_private * _stream = (struct
tfe_stream_private *) user; struct ssl_stream * downstream = ssl_downstream_create_result_release_stream(result); struct bufferevent * bev = ssl_downstream_create_result_release_bev(result); _stream->conn_downstream = __conn_private_create_by_bev(_stream, bev); _stream->ssl_downstream = downstream; future_destroy(_stream->future_downstream_create); _stream->future_downstream_create = NULL; _stream->defer_fd_downstream = 0; assert(_stream->conn_downstream != NULL && _stream->conn_upstream != NULL); __conn_private_enable(_stream->conn_downstream); __conn_private_enable(_stream->conn_upstream); return; } void ssl_downstream_create_on_fail(enum e_future_error err, const char * what, void * user) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; assert(_stream != NULL && _stream->session_type == STREAM_PROTO_SSL); TFE_LOG_INFO(g_default_logger, "%s Failed to create SSL downstream, close the connection : %s. ", _stream->str_stream_addr, what); __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_SSL_ERROR, CONN_DIR_DOWNSTREAM, 0, NULL); tfe_stream_destory(_stream); } void ssl_upstream_create_on_success(future_result_t * result, void * user) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; struct ssl_stream * upstream = ssl_upstream_create_result_release_stream(result); struct bufferevent * bev = ssl_upstream_create_result_release_bev(result); assert(upstream != NULL && bev != NULL); /* Create connection ctx by bev */ _stream->conn_upstream = __conn_private_create_by_bev(_stream, bev); _stream->ssl_upstream = upstream; assert(_stream->conn_upstream != NULL); assert(_stream->ssl_upstream != NULL); future_destroy(_stream->future_upstream_create); _stream->future_upstream_create = NULL; _stream->defer_fd_upstream = 0; /* Next, create downstream */ _stream->future_downstream_create = future_create("ssl_down", ssl_downstream_create_on_success, ssl_downstream_create_on_fail, _stream); 
ssl_async_downstream_create(_stream->future_downstream_create, _stream->ssl_mgr, _stream->ssl_upstream, _stream->defer_fd_downstream, _stream->keyring_id, _stream->thread_ref->thread_id); } void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, void * user) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; assert(_stream != NULL && _stream->session_type == STREAM_PROTO_SSL); TFE_LOG_INFO(g_default_logger, "%s Failed to create SSL upstream, close the connection : %s. ", _stream->str_stream_addr, what); __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_SSL_ERROR, CONN_DIR_UPSTREAM, 0, NULL); tfe_stream_destory(_stream); } struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx) { struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1); TFE_PROXY_STAT_INCREASE(STAT_STREAM_OPEN, 1); _stream->thread_ref = thread_ctx; _stream->proxy_ref = pxy; _stream->stream_logger = pxy->logger; unsigned int total_plugin_count = tfe_plugin_total_counts(); _stream->plugin_ctxs = ALLOC(struct plugin_ctx, total_plugin_count); return &_stream->head; } void __stream_access_log_write(struct tfe_stream_private * stream) { const char * str_passthrough = stream->passthough ? "PASSTHROUGH" : "-"; const char * str_kill = stream->need_to_be_kill ? "KILL" : "-"; char str_log_event[TFE_STRING_MAX] = ""; unsigned int offset = 0; /* Write event abstract log. It is used to determine which connection is broken */ for(unsigned int i = 0; i < stream->nr_log_event; i++) { struct tfe_stream_event_log * ev_log = &stream->log_event[i]; const char * str_dir = ev_log->dir == CONN_DIR_DOWNSTREAM ? 
"DOWN" : "UP"; offset += snprintf(str_log_event + offset, sizeof(str_log_event) - offset, "%s/%s ", __str_stream_log_type(ev_log->type), str_dir); } MESA_handle_runtime_log(stream->stream_logger, RLOG_LV_INFO, "access", "%d %d %d %s %s %s %s", stream->log_fd_downstream, stream->log_fd_upstream, stream->keyring_id, stream->str_stream_addr, str_passthrough, str_kill, str_log_event); } static int ev_log_to_stat_map[__EVENT_LOG_CLOSE_MAX][__CONN_DIR_MAX]{{-1}}; void __ev_log_to_stat_map_init() __attribute__((constructor, used)); void __ev_log_to_stat_map_init() { ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_PEER][CONN_DIR_DOWNSTREAM] = -1; ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_EOF][CONN_DIR_DOWNSTREAM] = STAT_STREAM_CLS_DOWN_EOF; ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_ERROR][CONN_DIR_DOWNSTREAM] = STAT_STREAM_CLS_DOWN_ERR; ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_SSL_ERROR][CONN_DIR_DOWNSTREAM] = STAT_STREAM_CLS_DOWN_ERR; ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_PEER][CONN_DIR_UPSTREAM] = -1; ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_EOF][CONN_DIR_UPSTREAM] = STAT_STREAM_CLS_UP_EOF; ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_ERROR][CONN_DIR_UPSTREAM] = STAT_STREAM_CLS_UP_ERR; ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_SSL_ERROR][CONN_DIR_UPSTREAM] = STAT_STREAM_CLS_UP_ERR; } void __stream_close_stat(struct tfe_stream_private * stream) { TFE_PROXY_STAT_INCREASE(STAT_STREAM_CLS, 1); if(stream->nr_log_event > 0) { struct tfe_stream_event_log * ev_log = &stream->log_event[0]; assert(ev_log_to_stat_map[ev_log->type][ev_log->dir] >= 0); TFE_PROXY_STAT_INCREASE(ev_log_to_stat_map[ev_log->type][ev_log->dir], 1); } } void tfe_stream_destory(struct tfe_stream_private * stream) { struct tfe_thread_ctx * thread = stream->thread_ref; struct event_base * ev_base = thread->evbase; __stream_access_log_write(stream); __stream_close_stat(stream); if (stream->head.addr) { FREE(&(stream->head.addr)); } if (stream->str_stream_addr) { FREE(&(stream->str_stream_addr)); } if (__is_ssl(stream) && 
stream->ssl_upstream) { evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_upstream); ssl_stream_free_and_close_fd(stream->ssl_upstream, ev_base, __to_closed_fd); } if (__is_ssl(stream) && stream->ssl_downstream) { evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_downstream); ssl_stream_free_and_close_fd(stream->ssl_downstream, ev_base, __to_closed_fd); } if (stream->conn_upstream) { __conn_private_destory(stream->conn_upstream); } if (stream->conn_downstream) { __conn_private_destory(stream->conn_downstream); } if (stream->defer_fd_downstream) { evutil_closesocket(stream->defer_fd_downstream); } if (stream->defer_fd_upstream) { evutil_closesocket(stream->defer_fd_upstream); } if (stream->future_downstream_create) { future_destroy(stream->future_downstream_create); } if (stream->future_upstream_create) { future_destroy(stream->future_upstream_create); } FREE(&(stream->plugin_ctxs)); tfe_proxy_thread_ctx_release(stream->thread_ref); stream->proxy_ref = NULL; FREE(&(stream)); } int __fd_ttl_option_setup(struct tfe_stream_private * _stream, evutil_socket_t fd, int ttl) { struct sockaddr_storage sk_storage; socklen_t sk_storage_len = sizeof(sk_storage); if (getsockname(fd, (struct sockaddr *) &sk_storage, &sk_storage_len) < 0) { TFE_STREAM_LOG_ERROR(_stream, "getsockname(fd = %d) failed: %s", fd, strerror(errno)); return -1; } /* For IPv4, set TTL */ unsigned int __ttl = (unsigned int) ttl; const char * __str_family = NULL; int ret = 0; if (sk_storage.ss_family == AF_INET) { ret = setsockopt(fd, IPPROTO_IP, IP_TTL, &__ttl, sizeof(__ttl)); __str_family = "AF_INET"; } else if (sk_storage.ss_family == AF_INET6) { ret = setsockopt(fd, IPPROTO_IPV6, IPV6_UNICAST_HOPS, &__ttl, sizeof(__ttl)); __str_family = "AF_INET6"; } else { assert(0); } if (ret < 0) { TFE_STREAM_LOG_ERROR(_stream, "setsockopt(ttl = %u, fd = %d, family = %s) failed: %s", __ttl, fd, __str_family, strerror(errno)); return -2; } return 0; } void 
__stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket_t fd, tfe_conn_dir dir) { struct tfe_stream * stream = &_stream->head; struct tfe_proxy_tcp_options * tcp_options = &_stream->proxy_ref->tcp_options; /* Make it non-blocking */ evutil_make_socket_nonblocking(fd); /* Recv Buffer */ if (tcp_options->sz_rcv_buffer >= 0) { if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const void *) &tcp_options->sz_rcv_buffer, sizeof(int)) == -1) { TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(SO_RCVBUF, %d) failed, ignored: %s", stream->str_stream_info, tcp_options->sz_rcv_buffer, strerror(errno)); } } /* Send Buffer */ if (tcp_options->sz_snd_buffer >= 0) { if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const void *) &tcp_options->sz_snd_buffer, sizeof(int)) == -1) { TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(SO_SNDBUF, %d) failed, ignored: %s", stream->str_stream_info, tcp_options->sz_snd_buffer, strerror(errno)); } } /* Keep-alive */ if (tcp_options->so_keepalive > 0) { if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (const void *) &tcp_options->so_keepalive, sizeof(int)) == -1) { TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(SO_KEEPALIVE, %d) failed, ignored: %s", stream->str_stream_info, tcp_options->so_keepalive, strerror(errno)); } } if (tcp_options->tcp_keepcnt > 0) { if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, (const void *) &tcp_options->tcp_keepcnt, sizeof(int)) == -1) { TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_KEEPCNT, %d) failed, ignored: %s", stream->str_stream_info, tcp_options->tcp_keepcnt, strerror(errno)); } } if (tcp_options->tcp_keepintvl > 0) { if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, (const void *) &tcp_options->tcp_keepintvl, sizeof(int)) == -1) { TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_KEEPINTVL, %d) failed, ignored: %s", stream->str_stream_info, tcp_options->tcp_keepintvl, strerror(errno)); } } if (tcp_options->tcp_keepidle > 0) { if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, (const void *) 
			&tcp_options->tcp_keepidle, sizeof(int)) == -1)
		{
			TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_KEEPIDLE, %d) failed, ignored: %s",
				stream->str_stream_info, tcp_options->tcp_keepidle, strerror(errno));
		}
	}

	if (tcp_options->tcp_user_timeout > 0)
	{
		if (setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (const void *) &tcp_options->tcp_user_timeout, sizeof(int)) == -1)
		{
			TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_USER_TIMEOUT, %d) failed, ignored: %s",
				stream->str_stream_info, tcp_options->tcp_user_timeout, strerror(errno));
		}
	}

	/* TTL is configured per direction; a value <= 0 leaves the system default untouched */
	int __ttl = (dir == CONN_DIR_UPSTREAM) ? tcp_options->tcp_ttl_upstream : tcp_options->tcp_ttl_downstream;
	if (__ttl > 0 && __fd_ttl_option_setup(_stream, fd, __ttl) < 0)
	{
		TFE_LOG_ERROR(g_default_logger, "%s: Failed at setup FD's ttl option, ttl = %d, fd = %d",
			stream->str_stream_info, __ttl, fd);
	}
}

int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream)
{
	struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);

	_stream->defer_fd_downstream = fd_downstream;
	_stream->defer_fd_upstream = fd_upstream;
	_stream->log_fd_downstream = fd_downstream;
	_stream->log_fd_upstream = fd_upstream;

	__stream_fd_option_setup(_stream, fd_downstream, CONN_DIR_DOWNSTREAM);
	__stream_fd_option_setup(_stream, fd_upstream, CONN_DIR_UPSTREAM);

	_stream->head.addr = tfe_stream_addr_create_by_fd(fd_downstream, CONN_DIR_DOWNSTREAM);
	if (unlikely(_stream->head.addr == NULL))
	{
		TFE_LOG_ERROR(_stream->stream_logger, "Failed to create address from fd %d, %d, terminate fds.",
			fd_downstream, fd_upstream);
		goto __errout;
	}

	_stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr);
	stream->str_stream_info = _stream->str_stream_addr;

	if (_stream->session_type == STREAM_PROTO_PLAIN)
	{
		_stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream);
		_stream->conn_upstream = __conn_private_create_by_fd(_stream, fd_upstream);

		/* Deferred FDs have been transferred
		to conn_downstream/conn_upstream */
		_stream->defer_fd_downstream = 0;
		_stream->defer_fd_upstream = 0;

		if (unlikely(_stream->conn_downstream == NULL || _stream->conn_upstream == NULL))
		{
			goto __errout;
		}

		assert(_stream->conn_downstream != NULL);
		assert(_stream->conn_upstream != NULL);

		__conn_private_enable(_stream->conn_downstream);
		__conn_private_enable(_stream->conn_upstream);

		TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_PLAIN, 1);
	}

	if (_stream->session_type == STREAM_PROTO_SSL)
	{
		_stream->ssl_mgr = _stream->proxy_ref->ssl_mgr_handler;

		_stream->future_upstream_create = future_create("ssl_up",
			ssl_upstream_create_on_success, ssl_upstream_create_on_fail, (void *) _stream);

		/* Defer setup conn_downstream & conn_upstream in async callbacks. */
		ssl_async_upstream_create(_stream->future_upstream_create,
			_stream->ssl_mgr, fd_upstream, fd_downstream, _stream->thread_ref->thread_id);

		TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_SSL, 1);
	}

	return 0;

__errout:
	return -1;
}

int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt, const void * arg, size_t sz_arg)
{
	struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);

	/* Unknown options are ignored; arg is only read, so keep it const */
	if (opt == TFE_STREAM_OPT_SESSION_TYPE)
	{
		assert(sz_arg == sizeof(enum tfe_stream_proto));
		_stream->session_type = *(const enum tfe_stream_proto *) arg;
	}
	else if (opt == TFE_STREAM_OPT_PASSTHROUGH)
	{
		assert(sz_arg == sizeof(bool));
		_stream->passthough = *(const bool *) arg;
	}
	else if (opt == TFE_STREAM_OPT_KEYRING_ID)
	{
		assert(sz_arg == sizeof(unsigned int));
		_stream->keyring_id = *(const unsigned int *) arg;
	}

	return 0;
}

void tfe_stream_write_access_log(const struct tfe_stream * stream, int level, const char * fmt, ...)
{
	va_list arg_ptr;
	va_start(arg_ptr, fmt);

	struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);

	/* Format input content */
	char * __tmp_buffer = NULL;
	int ret = vasprintf(&__tmp_buffer, fmt, arg_ptr);

	/* Every va_start() must be paired with va_end() */
	va_end(arg_ptr);

	/* On failure the buffer contents are undefined, so do not log or free it */
	if (ret < 0)
	{
		return;
	}

	/* Log content with stream tag */
	MESA_handle_runtime_log(_stream->stream_logger, level, "access", "%s %s",
		_stream->str_stream_addr, __tmp_buffer);

	free(__tmp_buffer);
}

/* Stub: currently a no-op that always reports success */
int tfe_stream_shutdown(const struct tfe_stream * stream)
{
	return 0;
}

/* Stub: currently a no-op that always reports success */
int tfe_stream_shutdown_dir(const struct tfe_stream * stream, enum tfe_conn_dir dir)
{
	return 0;
}

void tfe_stream_kill(const struct tfe_stream * stream)
{
	struct tfe_stream_private * _stream = to_stream_private(stream);
	_stream->need_to_be_kill = true;
}