diff --git a/common/include/tfe_types.h b/common/include/tfe_types.h index 1f2f09f..8921e28 100644 --- a/common/include/tfe_types.h +++ b/common/include/tfe_types.h @@ -11,7 +11,8 @@ enum tfe_conn_dir { CONN_DIR_DOWNSTREAM = 0, //From client to proxy, aka client-side. - CONN_DIR_UPSTREAM //From proxy to server, aka server-side. + CONN_DIR_UPSTREAM, //From proxy to server, aka server-side. + __CONN_DIR_MAX }; /* network-order */ diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index e0f94be..fd5e959 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -50,6 +50,23 @@ struct tfe_conn_private uint8_t on_writing; }; +enum tfe_stream_event_log_type +{ + EVENT_LOG_CLOSE_BY_FD_PEER, + EVENT_LOG_CLOSE_BY_FD_EOF, + EVENT_LOG_CLOSE_BY_FD_ERROR, + EVENT_LOG_CLOSE_BY_SSL_ERROR, + __EVENT_LOG_CLOSE_MAX +}; + +struct tfe_stream_event_log +{ + enum tfe_stream_event_log_type type; + enum tfe_conn_dir dir; + unsigned int error; + const char * str_error; +}; + struct tfe_stream_private { struct tfe_stream head; @@ -103,12 +120,20 @@ struct tfe_stream_private bool is_suspended; enum tfe_conn_dir suspended_by; + /* KILL */ + bool need_to_be_kill; + /* KEYRING-ID */ unsigned keyring_id; /* ONLY FOR LOG */ evutil_socket_t log_fd_downstream; evutil_socket_t log_fd_upstream; + + /* EVENT LOG */ +#define STREAM_EVENT_LOG_MAX 8 + struct tfe_stream_event_log log_event[STREAM_EVENT_LOG_MAX]; + unsigned int nr_log_event; }; static inline void * __STREAM_LOGGER(struct tfe_stream_private * _stream) diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h index b5403fd..f37ef8a 100644 --- a/platform/include/internal/proxy.h +++ b/platform/include/internal/proxy.h @@ -14,16 +14,19 @@ enum TFE_STAT_FIELD /* FDs */ STAT_FD_OPEN_BY_KNI_ACCEPT, STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL, - STAT_FD_CLOSE_BY_EVENT_WRITE, - STAT_FD_CLOSE_BY_EVENT_EOF, - STAT_FD_CLOSE_BY_EVENT_ERROR, /* FDs */ STAT_FD_INSTANT_CLOSE, STAT_FD_DEFER_CLOSE_IN_QUEUE, STAT_FD_DEFER_CLOSE_SUCCESS, /* Stream */ - STAT_STREAM_CREATE, - STAT_STREAM_DESTROY, + STAT_STREAM_OPEN, + STAT_STREAM_CLS, + STAT_STREAM_CLS_DOWN_EOF, + STAT_STREAM_CLS_UP_EOF, + STAT_STREAM_CLS_DOWN_ERR, + STAT_STREAM_CLS_UP_ERR, + STAT_STREAM_CLS_KILL, + /* Stream Protocol */ STAT_STREAM_TCP_PLAIN, STAT_STREAM_TCP_SSL, diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index c5beda0..2ba022f 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -79,17 +79,19 @@ struct tfe_thread_ctx * tfe_proxy_thread_ctx_acquire(struct tfe_proxy * ctx) for (unsigned int tid = 0; tid < ctx->nr_work_threads; tid++) { struct tfe_thread_ctx * thread_ctx = ctx->work_threads[tid]; - min_thread_id = min_load > thread_ctx->load ? tid : min_thread_id; - min_load = min_load > thread_ctx->load ? thread_ctx->load : min_load; + unsigned int thread_load = ATOMIC_READ(&thread_ctx->load); + + min_thread_id = min_load > thread_load ? tid : min_thread_id; + min_load = min_load > thread_load ? thread_load : min_load; } - ctx->work_threads[min_thread_id]->load++; + ATOMIC_INC(&ctx->work_threads[min_thread_id]->load); return ctx->work_threads[min_thread_id]; } void tfe_proxy_thread_ctx_release(struct tfe_thread_ctx * thread_ctx) { - thread_ctx->load--; + ATOMIC_DEC(&thread_ctx->load); } int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para) @@ -274,18 +276,20 @@ int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile) static const char * __str_stat_spec_map[] = { [STAT_SIGPIPE] = "SIGPIPE", - [STAT_FD_OPEN_BY_KNI_ACCEPT] = "FdOpenKNI", - [STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL] = "FdCloseKNI", - [STAT_FD_CLOSE_BY_EVENT_WRITE] = "FdCloseUser", - [STAT_FD_CLOSE_BY_EVENT_EOF] = "FdCloseEOF", - [STAT_FD_CLOSE_BY_EVENT_ERROR] = "FdCloseError", - [STAT_FD_INSTANT_CLOSE] = "FdCloseInstant", - [STAT_FD_DEFER_CLOSE_IN_QUEUE] = "FdCloseDeferInQ", - [STAT_FD_DEFER_CLOSE_SUCCESS] = "FdCloseDeferSuc", - [STAT_STREAM_CREATE] = "StreamCreate", - [STAT_STREAM_DESTROY] = "StreamDestroy", - [STAT_STREAM_TCP_PLAIN] = "StreamTCPPlain", - [STAT_STREAM_TCP_SSL] = "StreamTCPSSL", + [STAT_FD_OPEN_BY_KNI_ACCEPT] = "FdRcv", + [STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL] = "FdRcvFail", + [STAT_FD_INSTANT_CLOSE] = "FdClsInstant", + [STAT_FD_DEFER_CLOSE_IN_QUEUE] = "FdClsDefInQ", + [STAT_FD_DEFER_CLOSE_SUCCESS] = "FdClsDefSuc", + [STAT_STREAM_OPEN] = "StrOpen", + [STAT_STREAM_CLS] = "StrCls", + [STAT_STREAM_CLS_DOWN_EOF] = "StrDownEOF", + [STAT_STREAM_CLS_UP_EOF] = "StrUpEOF", + [STAT_STREAM_CLS_DOWN_ERR] = "StrDownErr", + [STAT_STREAM_CLS_UP_ERR] = "StrUpErr", + [STAT_STREAM_CLS_KILL] = "StrKill", + [STAT_STREAM_TCP_PLAIN] = "Plain", + [STAT_STREAM_TCP_SSL] = "SSL", [TFE_STAT_MAX] = NULL }; @@ -334,6 +338,10 @@ int main(int argc, char * argv[]) { const char * main_profile = "./conf/tfe/tfe.conf"; const char * future_profile= "./conf/tfe/future.conf"; + + /* adds locking, only required if accessed from separate threads */ + evthread_use_pthreads(); + unsigned int __log_level = RLOG_LV_INFO; MESA_load_profile_uint_def(main_profile, "log", "level", &__log_level, RLOG_LV_INFO); @@ -364,9 +372,6 @@ int main(int argc, char * argv[]) /* LOGGER */ g_default_proxy->logger = g_default_logger; - /* adds locking, only required if accessed from separate threads */ - evthread_use_pthreads(); - /* MAIN THREAD EVBASE */ g_default_proxy->evbase = event_base_new(); CHECK_OR_EXIT(g_default_proxy->evbase, "Failed at creating evbase for main thread. Exit."); diff --git a/platform/src/ssl_stream.cpp b/platform/src/ssl_stream.cpp index f233c10..62184ec 100644 --- a/platform/src/ssl_stream.cpp +++ b/platform/src/ssl_stream.cpp @@ -1098,7 +1098,7 @@ static void peek_chello_on_succ(future_result_t * result, void * user) clock_gettime(CLOCK_MONOTONIC, &(ctx->start)); ctx->s_stream = ssl_stream_new(ctx->mgr, ctx->fd_upstream, CONN_DIR_UPSTREAM, chello, NULL, NULL); ctx->bev = bufferevent_openssl_socket_new(evbase, ctx->fd_upstream, - ctx->s_stream->ssl, BUFFEREVENT_SSL_CONNECTING, BEV_OPT_DEFER_CALLBACKS); + ctx->s_stream->ssl, BUFFEREVENT_SSL_CONNECTING, BEV_OPT_DEFER_CALLBACKS | BEV_OPT_THREADSAFE ); bufferevent_openssl_set_allow_dirty_shutdown(ctx->bev, 1); bufferevent_setcb(ctx->bev, NULL, NULL, ssl_server_connected_eventcb, p); @@ -1557,7 +1557,7 @@ void ask_keyring_on_succ(void * result, void * user) ctx->downstream = ssl_stream_new(mgr, ctx->fd_downstream, CONN_DIR_DOWNSTREAM, NULL, kyr, ctx->origin_ssl?ctx->origin_ssl->alpn_selected:NULL); ctx->bev_down = bufferevent_openssl_socket_new(evbase, ctx->fd_downstream, ctx->downstream->ssl, - BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS); + BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS | BEV_OPT_THREADSAFE); bufferevent_openssl_set_allow_dirty_shutdown(ctx->bev_down, 1); bufferevent_setcb(ctx->bev_down, NULL, NULL, ssl_client_connected_eventcb, p); diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 34e6b22..d0f2c24 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -46,6 +47,31 @@ static void __stream_bev_eventcb(struct bufferevent *, short, void *); * HELPER FUNCTIONS * ===================================================================================================================*/ +static inline void __stream_log_event(struct tfe_stream_private * _stream, + enum tfe_stream_event_log_type type, enum tfe_conn_dir dir, unsigned int error, const char * str_error) +{ + unsigned int log_offset = _stream->nr_log_event; + assert(log_offset < STREAM_EVENT_LOG_MAX); + + _stream->log_event[log_offset].type = type; + _stream->log_event[log_offset].dir = dir; + _stream->log_event[log_offset].error = error; + _stream->log_event[log_offset].str_error = str_error; + _stream->nr_log_event++; +} + +static const char * __str_stream_log_type(enum tfe_stream_event_log_type type) +{ + static const char * map_event_log_type[] = + { + [EVENT_LOG_CLOSE_BY_FD_PEER] = "FD/PEER", + [EVENT_LOG_CLOSE_BY_FD_EOF] = "FD/EOF", + [EVENT_LOG_CLOSE_BY_FD_ERROR] = "FD/ERR", + [EVENT_LOG_CLOSE_BY_SSL_ERROR] = "SSL/ERR" + }; + return map_event_log_type[type]; +} + static inline struct tfe_stream_private * to_stream_private(const struct tfe_stream * stream) { return container_of(stream, struct tfe_stream_private, head); @@ -77,6 +103,11 @@ static inline enum tfe_conn_dir __bev_dir(struct tfe_stream_private * _stream, s return CONN_DIR_DOWNSTREAM; } +static inline const char * __str_dir(enum tfe_conn_dir dir) +{ + return dir == CONN_DIR_DOWNSTREAM ? "DOWNSTREAM" : "UPSTREAM"; +} + static inline bool __is_ssl(struct tfe_stream_private * _stream) { return (_stream->session_type == STREAM_PROTO_SSL); @@ -408,19 +439,19 @@ static void __stream_bev_passthrough_eventcb(struct bufferevent * bev, short eve struct tfe_conn_private ** ref_this_conn{}; struct tfe_conn_private ** ref_peer_conn{}; - const char * str_direction{}; + enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev); + const char * str_conn_dir = __str_dir(conn_dir); + if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM) { ref_this_conn = &_stream->conn_upstream; ref_peer_conn = &_stream->conn_downstream; - str_direction = "UPSTREAM"; } if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM) { ref_this_conn = &_stream->conn_downstream; ref_peer_conn = &_stream->conn_upstream; - str_direction = "DOWNSTREAM"; } if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF) @@ -440,13 +471,13 @@ static void __stream_bev_passthrough_eventcb(struct bufferevent * bev, short eve const char *func = (const char*)ERR_func_error_string(err); TFE_LOG_INFO(g_default_logger, "%s %s connection error, bufferevent_get_openssl_error() = %lu: %s %s %s", - _stream->str_stream_addr, str_direction, err, lib, func, msg); + _stream->str_stream_addr, str_conn_dir, err, lib, func, msg); } if (errno) { TFE_LOG_INFO(g_default_logger, "%s %s connection error, errno = %d, %s", - _stream->str_stream_addr, str_direction, errno, strerror(errno)); + _stream->str_stream_addr, str_conn_dir, errno, strerror(errno)); } } @@ -580,6 +611,19 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) break; } + if(_stream->need_to_be_kill) + { + const static struct linger sl{.l_onoff = 1, .l_linger = 0}; + + /* Set SO_LINGER, the fd will be closed by RST */ + setsockopt(_stream->conn_upstream->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); + setsockopt(_stream->conn_downstream->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); + + /* Destroy STREAM */ + TFE_PROXY_STAT_INCREASE(STAT_STREAM_CLS_KILL, 1); + return tfe_stream_destory(_stream); + } + #if 0 if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT) { @@ -602,15 +646,16 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg) struct tfe_conn_private ** ref_this_conn{}; struct tfe_conn_private ** ref_peer_conn{}; struct ssl_stream ** ref_this_ssl_stream{}; + enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev); - if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM) + if (conn_dir == CONN_DIR_UPSTREAM) { ref_this_conn = &_stream->conn_upstream; ref_this_ssl_stream = &_stream->ssl_upstream; ref_peer_conn = &_stream->conn_downstream; } - if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM) + if (conn_dir == CONN_DIR_DOWNSTREAM) { ref_this_conn = &_stream->conn_downstream; ref_this_ssl_stream = &_stream->ssl_downstream; @@ -624,9 +669,8 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg) && (*ref_this_conn)->on_writing == 0 /* No body is prepare to write data, eg. No body call stream_write */ && evbuffer_get_length(__output_buffer) == 0) /* Nothing is in send queue */ { - TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_EVENT_WRITE, 1); - __conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream); + __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_PEER, conn_dir, 0, NULL); *ref_this_conn = NULL; *ref_this_ssl_stream = NULL; } @@ -653,6 +697,10 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * struct ssl_stream ** ref_peer_ssl_stream{}; struct tfe_stream_write_ctx ** ref_this_write_ctx{}; + enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev); + const char * str_conn_dir = __str_dir(conn_dir); + enum tfe_conn_dir peer_conn_dir{}; + if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM) { ref_this_conn = &_stream->conn_upstream; @@ -660,6 +708,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * ref_this_ssl_stream = &_stream->ssl_upstream; ref_peer_ssl_stream = &_stream->ssl_downstream; ref_this_write_ctx = &_stream->w_ctx_upstream; + peer_conn_dir = CONN_DIR_DOWNSTREAM; } if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM) @@ -669,6 +718,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * ref_this_ssl_stream = &_stream->ssl_downstream; ref_peer_ssl_stream = &_stream->ssl_upstream; ref_this_write_ctx = &_stream->w_ctx_downstream; + peer_conn_dir = CONN_DIR_UPSTREAM; } if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF) @@ -678,9 +728,28 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * __stream_bev_readcb(bev, arg); } - if (events & BEV_EVENT_ERROR) { TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_EVENT_ERROR, 1); } - if (events & BEV_EVENT_EOF) { TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_EVENT_EOF, 1); } + if(events & BEV_EVENT_ERROR) + { + unsigned long err; + while ((err = (bufferevent_get_openssl_error(bev)))) + { + const char *msg = (const char*)ERR_reason_error_string(err); + const char *lib = (const char*)ERR_lib_error_string(err); + const char *func = (const char*)ERR_func_error_string(err); + TFE_LOG_INFO(g_default_logger, "%s %s connection error, bufferevent_get_openssl_error() = %lu: %s %s %s", + _stream->str_stream_addr, str_conn_dir, err, lib, func, msg); + } + + if (errno) + { + TFE_LOG_INFO(g_default_logger, "%s %s connection error, errno = %d, %s", + _stream->str_stream_addr, str_conn_dir, errno, strerror(errno)); + } + } + + if(events & BEV_EVENT_ERROR) __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_ERROR, conn_dir, 0, NULL); + if(events & BEV_EVENT_EOF) __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_EOF, conn_dir, 0, NULL); goto __close_connection; } @@ -697,6 +766,8 @@ __close_connection: __conn_private_destory_with_ssl(ev_base, *ref_peer_conn, *ref_peer_ssl_stream); *ref_peer_conn = NULL; *ref_peer_ssl_stream = NULL; + + __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_PEER, peer_conn_dir, 0, NULL); } } @@ -750,9 +821,11 @@ static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private struct tfe_proxy * proxy_ref = stream->proxy_ref; __conn_private->_stream_ref = stream; - __conn_private->bev = bufferevent_socket_new(__ev_base, fd, BEV_OPT_DEFER_CALLBACKS); + __conn_private->bev = bufferevent_socket_new(__ev_base, fd, BEV_OPT_DEFER_CALLBACKS | BEV_OPT_THREADSAFE ); __conn_private->fd = fd; + bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE); + if (!__conn_private->bev) { TFE_LOG_ERROR(__STREAM_LOGGER(stream), "Failed at creating bufferevent for fd %d", fd); @@ -770,8 +843,7 @@ static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private __stream_bev_writecb, __stream_bev_eventcb, stream); } - bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE); - if(unlikely(proxy_ref->en_rate_limit)) + if(proxy_ref->en_rate_limit) { conn_private_ratelimit_setup(__conn_private, &proxy_ref->rate_limit_options); } @@ -814,11 +886,10 @@ void ssl_downstream_create_on_fail(enum e_future_error err, const char * what, v struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; assert(_stream != NULL && _stream->session_type == STREAM_PROTO_SSL); - TFE_STREAM_LOG_ERROR(_stream, "%s - Failed to create SSL downstream, close the connection : %s. ", + TFE_LOG_INFO(g_default_logger, "%s Failed to create SSL downstream, close the connection : %s. ", _stream->str_stream_addr, what); - /* There is nothing we can do because upstream has been handshake, - * Close the stream */ + __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_SSL_ERROR, CONN_DIR_DOWNSTREAM, 0, NULL); tfe_stream_destory(_stream); } @@ -854,26 +925,17 @@ void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, voi struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; assert(_stream != NULL && _stream->session_type == STREAM_PROTO_SSL); - TFE_STREAM_LOG_ERROR(_stream, "%s - Failed to create SSL upstream, pass-through the connection : %s. ", + TFE_LOG_INFO(g_default_logger, "%s Failed to create SSL upstream, close the connection : %s. ", _stream->str_stream_addr, what); - _stream->passthough = true; - _stream->conn_downstream = __conn_private_create_by_fd(_stream, _stream->defer_fd_downstream); - _stream->conn_upstream = __conn_private_create_by_fd(_stream, _stream->defer_fd_downstream); - - assert(_stream->conn_downstream != NULL); - assert(_stream->conn_upstream != NULL); - - _stream->defer_fd_downstream = 0; - _stream->defer_fd_upstream = 0; - __conn_private_enable(_stream->conn_downstream); - __conn_private_enable(_stream->conn_upstream); + __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_SSL_ERROR, CONN_DIR_UPSTREAM, 0, NULL); + tfe_stream_destory(_stream); } struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx) { struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1); - TFE_PROXY_STAT_INCREASE(STAT_STREAM_CREATE, 1); + TFE_PROXY_STAT_INCREASE(STAT_STREAM_OPEN, 1); _stream->thread_ref = thread_ctx; _stream->proxy_ref = pxy; @@ -887,10 +949,48 @@ struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ void __stream_access_log_write(struct tfe_stream_private * stream) { const char * str_passthrough = stream->passthough ? "PASSTHROUGH" : "-"; + const char * str_kill = stream->need_to_be_kill ? "KILL" : "-"; + + char str_log_event[TFE_STRING_MAX] = ""; + unsigned int offset = 0; + + /* Write event abstract log. It is used to determine which connection is broken */ + for(unsigned int i = 0; i < stream->nr_log_event; i++) + { + struct tfe_stream_event_log * ev_log = &stream->log_event[i]; + const char * str_dir = ev_log->dir == CONN_DIR_DOWNSTREAM ? "DOWN" : "UP"; + offset += snprintf(str_log_event + offset, sizeof(str_log_event) - offset, + "%s/%s ", __str_stream_log_type(ev_log->type), str_dir); + } MESA_handle_runtime_log(stream->stream_logger, RLOG_LV_INFO, "access", - "%d %d %d %s %s", stream->log_fd_downstream, stream->log_fd_upstream, stream->keyring_id, - stream->str_stream_addr, str_passthrough); + "%d %d %d %s %s %s %s", stream->log_fd_downstream, stream->log_fd_upstream, stream->keyring_id, + stream->str_stream_addr, str_passthrough, str_kill, str_log_event); +} + +static int ev_log_to_stat_map[__EVENT_LOG_CLOSE_MAX][__CONN_DIR_MAX]{{-1}}; +void __ev_log_to_stat_map_init() __attribute__((constructor, used)); +void __ev_log_to_stat_map_init() +{ + ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_PEER][CONN_DIR_DOWNSTREAM] = -1; + ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_EOF][CONN_DIR_DOWNSTREAM] = STAT_STREAM_CLS_DOWN_EOF; + ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_ERROR][CONN_DIR_DOWNSTREAM] = STAT_STREAM_CLS_DOWN_ERR; + ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_SSL_ERROR][CONN_DIR_DOWNSTREAM] = STAT_STREAM_CLS_DOWN_ERR; + ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_PEER][CONN_DIR_UPSTREAM] = -1; + ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_EOF][CONN_DIR_UPSTREAM] = STAT_STREAM_CLS_UP_EOF; + ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_ERROR][CONN_DIR_UPSTREAM] = STAT_STREAM_CLS_UP_ERR; + ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_SSL_ERROR][CONN_DIR_UPSTREAM] = STAT_STREAM_CLS_UP_ERR; +} + +void __stream_close_stat(struct tfe_stream_private * stream) +{ + TFE_PROXY_STAT_INCREASE(STAT_STREAM_CLS, 1); + if(stream->nr_log_event > 0) + { + struct tfe_stream_event_log * ev_log = &stream->log_event[0]; + assert(ev_log_to_stat_map[ev_log->type][ev_log->dir] >= 0); + TFE_PROXY_STAT_INCREASE(ev_log_to_stat_map[ev_log->type][ev_log->dir], 1); + } } void tfe_stream_destory(struct tfe_stream_private * stream) @@ -898,8 +998,8 @@ void tfe_stream_destory(struct tfe_stream_private * stream) struct tfe_thread_ctx * thread = stream->thread_ref; struct event_base * ev_base = thread->evbase; - TFE_PROXY_STAT_INCREASE(STAT_STREAM_DESTROY, 1); __stream_access_log_write(stream); + __stream_close_stat(stream); if (stream->head.addr) { @@ -953,9 +1053,10 @@ void tfe_stream_destory(struct tfe_stream_private * stream) future_destroy(stream->future_upstream_create); } FREE(&(stream->plugin_ctxs)); + + tfe_proxy_thread_ctx_release(stream->thread_ref); stream->proxy_ref = NULL; FREE(&(stream)); - thread->load--; } int __fd_ttl_option_setup(struct tfe_stream_private * _stream, evutil_socket_t fd, int ttl) @@ -1197,12 +1298,5 @@ int tfe_stream_shutdown_dir(const struct tfe_stream * stream, enum tfe_conn_dir void tfe_stream_kill(const struct tfe_stream * stream) { struct tfe_stream_private * _stream = to_stream_private(stream); - const static struct linger sl {.l_onoff = 1, .l_linger = 0}; - - /* Set SO_LINGER, the fd will be closed by RST */ - setsockopt(_stream->conn_upstream->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); - setsockopt(_stream->conn_downstream->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl)); - - /* Destroy STREAM */ - return tfe_stream_destory(_stream); + _stream->need_to_be_kill = true; } \ No newline at end of file diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index 7dd5277..9d2f245 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -428,6 +428,12 @@ enum tfe_stream_action http_connection_entry(const struct tfe_stream * stream, e return ACTION_DEFER_DATA; } + if(hs_private->kill_signal) + { + tfe_stream_kill(stream); + return ACTION_DROP_DATA; + } + if (hf_private_in->is_upgrade || hf_private_in->is_passthrough) { goto __passthrough; @@ -456,12 +462,6 @@ enum tfe_stream_action http_connection_entry(const struct tfe_stream * stream, e goto __passthrough; } - if(hs_private->kill_signal) - { - tfe_stream_kill(stream); - return ACTION_DROP_DATA; - } - ret = (dir == CONN_DIR_DOWNSTREAM) ? __on_request_handle_user_req_or_resp(stream, hs_private, hf_private_in, __need_to_close_the_session) : __on_response_handle_user_req_or_resp(stream, hs_private, hf_private_in, __need_to_close_the_session);