修正Http-kill位置过完的问题,增加TCP链接摘要日志功能

This commit is contained in:
luqiuwen
2018-12-14 03:06:34 +06:00
parent 8c93f7203d
commit 8713da2d81
7 changed files with 203 additions and 75 deletions

View File

@@ -11,7 +11,8 @@
enum tfe_conn_dir
{
CONN_DIR_DOWNSTREAM = 0, //From client to proxy, aka client-side.
CONN_DIR_UPSTREAM //From proxy to server, aka server-side.
CONN_DIR_UPSTREAM, //From proxy to server, aka server-side.
__CONN_DIR_MAX
};
/* network-order */

View File

@@ -50,6 +50,23 @@ struct tfe_conn_private
uint8_t on_writing;
};
enum tfe_stream_event_log_type
{
EVENT_LOG_CLOSE_BY_FD_PEER,
EVENT_LOG_CLOSE_BY_FD_EOF,
EVENT_LOG_CLOSE_BY_FD_ERROR,
EVENT_LOG_CLOSE_BY_SSL_ERROR,
__EVENT_LOG_CLOSE_MAX
};
struct tfe_stream_event_log
{
enum tfe_stream_event_log_type type;
enum tfe_conn_dir dir;
unsigned int error;
const char * str_error;
};
struct tfe_stream_private
{
struct tfe_stream head;
@@ -103,12 +120,20 @@ struct tfe_stream_private
bool is_suspended;
enum tfe_conn_dir suspended_by;
/* KILL */
bool need_to_be_kill;
/* KEYRING-ID */
unsigned keyring_id;
/* ONLY FOR LOG */
evutil_socket_t log_fd_downstream;
evutil_socket_t log_fd_upstream;
/* EVENT LOG */
#define STREAM_EVENT_LOG_MAX 8
struct tfe_stream_event_log log_event[STREAM_EVENT_LOG_MAX];
unsigned int nr_log_event;
};
static inline void * __STREAM_LOGGER(struct tfe_stream_private * _stream)

View File

@@ -14,16 +14,19 @@ enum TFE_STAT_FIELD
/* FDs */
STAT_FD_OPEN_BY_KNI_ACCEPT,
STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL,
STAT_FD_CLOSE_BY_EVENT_WRITE,
STAT_FD_CLOSE_BY_EVENT_EOF,
STAT_FD_CLOSE_BY_EVENT_ERROR,
/* FDs */
STAT_FD_INSTANT_CLOSE,
STAT_FD_DEFER_CLOSE_IN_QUEUE,
STAT_FD_DEFER_CLOSE_SUCCESS,
/* Stream */
STAT_STREAM_CREATE,
STAT_STREAM_DESTROY,
STAT_STREAM_OPEN,
STAT_STREAM_CLS,
STAT_STREAM_CLS_DOWN_EOF,
STAT_STREAM_CLS_UP_EOF,
STAT_STREAM_CLS_DOWN_ERR,
STAT_STREAM_CLS_UP_ERR,
STAT_STREAM_CLS_KILL,
/* Stream Protocol */
STAT_STREAM_TCP_PLAIN,
STAT_STREAM_TCP_SSL,

View File

@@ -79,17 +79,19 @@ struct tfe_thread_ctx * tfe_proxy_thread_ctx_acquire(struct tfe_proxy * ctx)
for (unsigned int tid = 0; tid < ctx->nr_work_threads; tid++)
{
struct tfe_thread_ctx * thread_ctx = ctx->work_threads[tid];
min_thread_id = min_load > thread_ctx->load ? tid : min_thread_id;
min_load = min_load > thread_ctx->load ? thread_ctx->load : min_load;
unsigned int thread_load = ATOMIC_READ(&thread_ctx->load);
min_thread_id = min_load > thread_load ? tid : min_thread_id;
min_load = min_load > thread_load ? thread_load : min_load;
}
ctx->work_threads[min_thread_id]->load++;
ATOMIC_INC(&ctx->work_threads[min_thread_id]->load);
return ctx->work_threads[min_thread_id];
}
void tfe_proxy_thread_ctx_release(struct tfe_thread_ctx * thread_ctx)
{
thread_ctx->load--;
ATOMIC_DEC(&thread_ctx->load);
}
int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para)
@@ -274,18 +276,20 @@ int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile)
static const char * __str_stat_spec_map[] =
{
[STAT_SIGPIPE] = "SIGPIPE",
[STAT_FD_OPEN_BY_KNI_ACCEPT] = "FdOpenKNI",
[STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL] = "FdCloseKNI",
[STAT_FD_CLOSE_BY_EVENT_WRITE] = "FdCloseUser",
[STAT_FD_CLOSE_BY_EVENT_EOF] = "FdCloseEOF",
[STAT_FD_CLOSE_BY_EVENT_ERROR] = "FdCloseError",
[STAT_FD_INSTANT_CLOSE] = "FdCloseInstant",
[STAT_FD_DEFER_CLOSE_IN_QUEUE] = "FdCloseDeferInQ",
[STAT_FD_DEFER_CLOSE_SUCCESS] = "FdCloseDeferSuc",
[STAT_STREAM_CREATE] = "StreamCreate",
[STAT_STREAM_DESTROY] = "StreamDestroy",
[STAT_STREAM_TCP_PLAIN] = "StreamTCPPlain",
[STAT_STREAM_TCP_SSL] = "StreamTCPSSL",
[STAT_FD_OPEN_BY_KNI_ACCEPT] = "FdRcv",
[STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL] = "FdRcvFail",
[STAT_FD_INSTANT_CLOSE] = "FdClsInstant",
[STAT_FD_DEFER_CLOSE_IN_QUEUE] = "FdClsDefInQ",
[STAT_FD_DEFER_CLOSE_SUCCESS] = "FdClsDefSuc",
[STAT_STREAM_OPEN] = "StrOpen",
[STAT_STREAM_CLS] = "StrCls",
[STAT_STREAM_CLS_DOWN_EOF] = "StrDownEOF",
[STAT_STREAM_CLS_UP_EOF] = "StrUpEOF",
[STAT_STREAM_CLS_DOWN_ERR] = "StrDownErr",
[STAT_STREAM_CLS_UP_ERR] = "StrUpErr",
[STAT_STREAM_CLS_KILL] = "StrKill",
[STAT_STREAM_TCP_PLAIN] = "Plain",
[STAT_STREAM_TCP_SSL] = "SSL",
[TFE_STAT_MAX] = NULL
};
@@ -334,6 +338,10 @@ int main(int argc, char * argv[])
{
const char * main_profile = "./conf/tfe/tfe.conf";
const char * future_profile= "./conf/tfe/future.conf";
/* adds locking, only required if accessed from separate threads */
evthread_use_pthreads();
unsigned int __log_level = RLOG_LV_INFO;
MESA_load_profile_uint_def(main_profile, "log", "level", &__log_level, RLOG_LV_INFO);
@@ -364,9 +372,6 @@ int main(int argc, char * argv[])
/* LOGGER */
g_default_proxy->logger = g_default_logger;
/* adds locking, only required if accessed from separate threads */
evthread_use_pthreads();
/* MAIN THREAD EVBASE */
g_default_proxy->evbase = event_base_new();
CHECK_OR_EXIT(g_default_proxy->evbase, "Failed at creating evbase for main thread. Exit.");

View File

@@ -1098,7 +1098,7 @@ static void peek_chello_on_succ(future_result_t * result, void * user)
clock_gettime(CLOCK_MONOTONIC, &(ctx->start));
ctx->s_stream = ssl_stream_new(ctx->mgr, ctx->fd_upstream, CONN_DIR_UPSTREAM, chello, NULL, NULL);
ctx->bev = bufferevent_openssl_socket_new(evbase, ctx->fd_upstream,
ctx->s_stream->ssl, BUFFEREVENT_SSL_CONNECTING, BEV_OPT_DEFER_CALLBACKS);
ctx->s_stream->ssl, BUFFEREVENT_SSL_CONNECTING, BEV_OPT_DEFER_CALLBACKS | BEV_OPT_THREADSAFE );
bufferevent_openssl_set_allow_dirty_shutdown(ctx->bev, 1);
bufferevent_setcb(ctx->bev, NULL, NULL, ssl_server_connected_eventcb, p);
@@ -1557,7 +1557,7 @@ void ask_keyring_on_succ(void * result, void * user)
ctx->downstream = ssl_stream_new(mgr, ctx->fd_downstream, CONN_DIR_DOWNSTREAM, NULL, kyr,
ctx->origin_ssl?ctx->origin_ssl->alpn_selected:NULL);
ctx->bev_down = bufferevent_openssl_socket_new(evbase, ctx->fd_downstream, ctx->downstream->ssl,
BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS);
BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS | BEV_OPT_THREADSAFE);
bufferevent_openssl_set_allow_dirty_shutdown(ctx->bev_down, 1);
bufferevent_setcb(ctx->bev_down, NULL, NULL, ssl_client_connected_eventcb, p);

View File

@@ -25,6 +25,7 @@
#include <tfe_utils.h>
#include <tfe_future.h>
#include <tfe_plugin.h>
#include <tfe_proxy.h>
#include <platform.h>
#include <ssl_stream.h>
@@ -46,6 +47,31 @@ static void __stream_bev_eventcb(struct bufferevent *, short, void *);
* HELPER FUNCTIONS
* ===================================================================================================================*/
static inline void __stream_log_event(struct tfe_stream_private * _stream,
enum tfe_stream_event_log_type type, enum tfe_conn_dir dir, unsigned int error, const char * str_error)
{
unsigned int log_offset = _stream->nr_log_event;
assert(log_offset < STREAM_EVENT_LOG_MAX);
_stream->log_event[log_offset].type = type;
_stream->log_event[log_offset].dir = dir;
_stream->log_event[log_offset].error = error;
_stream->log_event[log_offset].str_error = str_error;
_stream->nr_log_event++;
}
static const char * __str_stream_log_type(enum tfe_stream_event_log_type type)
{
static const char * map_event_log_type[] =
{
[EVENT_LOG_CLOSE_BY_FD_PEER] = "FD/PEER",
[EVENT_LOG_CLOSE_BY_FD_EOF] = "FD/EOF",
[EVENT_LOG_CLOSE_BY_FD_ERROR] = "FD/ERR",
[EVENT_LOG_CLOSE_BY_SSL_ERROR] = "SSL/ERR"
};
return map_event_log_type[type];
}
static inline struct tfe_stream_private * to_stream_private(const struct tfe_stream * stream)
{
return container_of(stream, struct tfe_stream_private, head);
@@ -77,6 +103,11 @@ static inline enum tfe_conn_dir __bev_dir(struct tfe_stream_private * _stream, s
return CONN_DIR_DOWNSTREAM;
}
static inline const char * __str_dir(enum tfe_conn_dir dir)
{
return dir == CONN_DIR_DOWNSTREAM ? "DOWNSTREAM" : "UPSTREAM";
}
static inline bool __is_ssl(struct tfe_stream_private * _stream)
{
return (_stream->session_type == STREAM_PROTO_SSL);
@@ -408,19 +439,19 @@ static void __stream_bev_passthrough_eventcb(struct bufferevent * bev, short eve
struct tfe_conn_private ** ref_this_conn{};
struct tfe_conn_private ** ref_peer_conn{};
const char * str_direction{};
enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev);
const char * str_conn_dir = __str_dir(conn_dir);
if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
{
ref_this_conn = &_stream->conn_upstream;
ref_peer_conn = &_stream->conn_downstream;
str_direction = "UPSTREAM";
}
if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
{
ref_this_conn = &_stream->conn_downstream;
ref_peer_conn = &_stream->conn_upstream;
str_direction = "DOWNSTREAM";
}
if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF)
@@ -440,13 +471,13 @@ static void __stream_bev_passthrough_eventcb(struct bufferevent * bev, short eve
const char *func = (const char*)ERR_func_error_string(err);
TFE_LOG_INFO(g_default_logger, "%s %s connection error, bufferevent_get_openssl_error() = %lu: %s %s %s",
_stream->str_stream_addr, str_direction, err, lib, func, msg);
_stream->str_stream_addr, str_conn_dir, err, lib, func, msg);
}
if (errno)
{
TFE_LOG_INFO(g_default_logger, "%s %s connection error, errno = %d, %s",
_stream->str_stream_addr, str_direction, errno, strerror(errno));
_stream->str_stream_addr, str_conn_dir, errno, strerror(errno));
}
}
@@ -580,6 +611,19 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
break;
}
if(_stream->need_to_be_kill)
{
const static struct linger sl{.l_onoff = 1, .l_linger = 0};
/* Set SO_LINGER, the fd will be closed by RST */
setsockopt(_stream->conn_upstream->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
setsockopt(_stream->conn_downstream->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
/* Destroy STREAM */
TFE_PROXY_STAT_INCREASE(STAT_STREAM_CLS_KILL, 1);
return tfe_stream_destory(_stream);
}
#if 0
if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT)
{
@@ -602,15 +646,16 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg)
struct tfe_conn_private ** ref_this_conn{};
struct tfe_conn_private ** ref_peer_conn{};
struct ssl_stream ** ref_this_ssl_stream{};
enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev);
if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
if (conn_dir == CONN_DIR_UPSTREAM)
{
ref_this_conn = &_stream->conn_upstream;
ref_this_ssl_stream = &_stream->ssl_upstream;
ref_peer_conn = &_stream->conn_downstream;
}
if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
if (conn_dir == CONN_DIR_DOWNSTREAM)
{
ref_this_conn = &_stream->conn_downstream;
ref_this_ssl_stream = &_stream->ssl_downstream;
@@ -624,9 +669,8 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg)
&& (*ref_this_conn)->on_writing == 0 /* No body is prepare to write data, eg. No body call stream_write */
&& evbuffer_get_length(__output_buffer) == 0) /* Nothing is in send queue */
{
TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_EVENT_WRITE, 1);
__conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream);
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_PEER, conn_dir, 0, NULL);
*ref_this_conn = NULL;
*ref_this_ssl_stream = NULL;
}
@@ -653,6 +697,10 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
struct ssl_stream ** ref_peer_ssl_stream{};
struct tfe_stream_write_ctx ** ref_this_write_ctx{};
enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev);
const char * str_conn_dir = __str_dir(conn_dir);
enum tfe_conn_dir peer_conn_dir{};
if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
{
ref_this_conn = &_stream->conn_upstream;
@@ -660,6 +708,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
ref_this_ssl_stream = &_stream->ssl_upstream;
ref_peer_ssl_stream = &_stream->ssl_downstream;
ref_this_write_ctx = &_stream->w_ctx_upstream;
peer_conn_dir = CONN_DIR_DOWNSTREAM;
}
if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
@@ -669,6 +718,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
ref_this_ssl_stream = &_stream->ssl_downstream;
ref_peer_ssl_stream = &_stream->ssl_upstream;
ref_this_write_ctx = &_stream->w_ctx_downstream;
peer_conn_dir = CONN_DIR_UPSTREAM;
}
if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF)
@@ -678,9 +728,28 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
__stream_bev_readcb(bev, arg);
}
if (events & BEV_EVENT_ERROR) { TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_EVENT_ERROR, 1); }
if (events & BEV_EVENT_EOF) { TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_EVENT_EOF, 1); }
if(events & BEV_EVENT_ERROR)
{
unsigned long err;
while ((err = (bufferevent_get_openssl_error(bev))))
{
const char *msg = (const char*)ERR_reason_error_string(err);
const char *lib = (const char*)ERR_lib_error_string(err);
const char *func = (const char*)ERR_func_error_string(err);
TFE_LOG_INFO(g_default_logger, "%s %s connection error, bufferevent_get_openssl_error() = %lu: %s %s %s",
_stream->str_stream_addr, str_conn_dir, err, lib, func, msg);
}
if (errno)
{
TFE_LOG_INFO(g_default_logger, "%s %s connection error, errno = %d, %s",
_stream->str_stream_addr, str_conn_dir, errno, strerror(errno));
}
}
if(events & BEV_EVENT_ERROR) __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_ERROR, conn_dir, 0, NULL);
if(events & BEV_EVENT_EOF) __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_EOF, conn_dir, 0, NULL);
goto __close_connection;
}
@@ -697,6 +766,8 @@ __close_connection:
__conn_private_destory_with_ssl(ev_base, *ref_peer_conn, *ref_peer_ssl_stream);
*ref_peer_conn = NULL;
*ref_peer_ssl_stream = NULL;
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_PEER, peer_conn_dir, 0, NULL);
}
}
@@ -750,9 +821,11 @@ static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private
struct tfe_proxy * proxy_ref = stream->proxy_ref;
__conn_private->_stream_ref = stream;
__conn_private->bev = bufferevent_socket_new(__ev_base, fd, BEV_OPT_DEFER_CALLBACKS);
__conn_private->bev = bufferevent_socket_new(__ev_base, fd, BEV_OPT_DEFER_CALLBACKS | BEV_OPT_THREADSAFE );
__conn_private->fd = fd;
bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE);
if (!__conn_private->bev)
{
TFE_LOG_ERROR(__STREAM_LOGGER(stream), "Failed at creating bufferevent for fd %d", fd);
@@ -770,8 +843,7 @@ static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private
__stream_bev_writecb, __stream_bev_eventcb, stream);
}
bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE);
if(unlikely(proxy_ref->en_rate_limit))
if(proxy_ref->en_rate_limit)
{
conn_private_ratelimit_setup(__conn_private, &proxy_ref->rate_limit_options);
}
@@ -814,11 +886,10 @@ void ssl_downstream_create_on_fail(enum e_future_error err, const char * what, v
struct tfe_stream_private * _stream = (struct tfe_stream_private *) user;
assert(_stream != NULL && _stream->session_type == STREAM_PROTO_SSL);
TFE_STREAM_LOG_ERROR(_stream, "%s - Failed to create SSL downstream, close the connection : %s. ",
TFE_LOG_INFO(g_default_logger, "%s Failed to create SSL downstream, close the connection : %s. ",
_stream->str_stream_addr, what);
/* There is nothing we can do because upstream has been handshake,
* Close the stream */
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_SSL_ERROR, CONN_DIR_DOWNSTREAM, 0, NULL);
tfe_stream_destory(_stream);
}
@@ -854,26 +925,17 @@ void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, voi
struct tfe_stream_private * _stream = (struct tfe_stream_private *) user;
assert(_stream != NULL && _stream->session_type == STREAM_PROTO_SSL);
TFE_STREAM_LOG_ERROR(_stream, "%s - Failed to create SSL upstream, pass-through the connection : %s. ",
TFE_LOG_INFO(g_default_logger, "%s Failed to create SSL upstream, close the connection : %s. ",
_stream->str_stream_addr, what);
_stream->passthough = true;
_stream->conn_downstream = __conn_private_create_by_fd(_stream, _stream->defer_fd_downstream);
_stream->conn_upstream = __conn_private_create_by_fd(_stream, _stream->defer_fd_downstream);
assert(_stream->conn_downstream != NULL);
assert(_stream->conn_upstream != NULL);
_stream->defer_fd_downstream = 0;
_stream->defer_fd_upstream = 0;
__conn_private_enable(_stream->conn_downstream);
__conn_private_enable(_stream->conn_upstream);
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_SSL_ERROR, CONN_DIR_UPSTREAM, 0, NULL);
tfe_stream_destory(_stream);
}
struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx)
{
struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1);
TFE_PROXY_STAT_INCREASE(STAT_STREAM_CREATE, 1);
TFE_PROXY_STAT_INCREASE(STAT_STREAM_OPEN, 1);
_stream->thread_ref = thread_ctx;
_stream->proxy_ref = pxy;
@@ -887,10 +949,48 @@ struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_
void __stream_access_log_write(struct tfe_stream_private * stream)
{
const char * str_passthrough = stream->passthough ? "PASSTHROUGH" : "-";
const char * str_kill = stream->need_to_be_kill ? "KILL" : "-";
char str_log_event[TFE_STRING_MAX] = "";
unsigned int offset = 0;
/* Write event abstract log. It is used to determine which connection is broken */
for(unsigned int i = 0; i < stream->nr_log_event; i++)
{
struct tfe_stream_event_log * ev_log = &stream->log_event[i];
const char * str_dir = ev_log->dir == CONN_DIR_DOWNSTREAM ? "DOWN" : "UP";
offset += snprintf(str_log_event + offset, sizeof(str_log_event) - offset,
"%s/%s ", __str_stream_log_type(ev_log->type), str_dir);
}
MESA_handle_runtime_log(stream->stream_logger, RLOG_LV_INFO, "access",
"%d %d %d %s %s", stream->log_fd_downstream, stream->log_fd_upstream, stream->keyring_id,
stream->str_stream_addr, str_passthrough);
"%d %d %d %s %s %s %s", stream->log_fd_downstream, stream->log_fd_upstream, stream->keyring_id,
stream->str_stream_addr, str_passthrough, str_kill, str_log_event);
}
static int ev_log_to_stat_map[__EVENT_LOG_CLOSE_MAX][__CONN_DIR_MAX]{{-1}};
void __ev_log_to_stat_map_init() __attribute__((constructor, used));
void __ev_log_to_stat_map_init()
{
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_PEER][CONN_DIR_DOWNSTREAM] = -1;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_EOF][CONN_DIR_DOWNSTREAM] = STAT_STREAM_CLS_DOWN_EOF;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_ERROR][CONN_DIR_DOWNSTREAM] = STAT_STREAM_CLS_DOWN_ERR;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_SSL_ERROR][CONN_DIR_DOWNSTREAM] = STAT_STREAM_CLS_DOWN_ERR;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_PEER][CONN_DIR_UPSTREAM] = -1;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_EOF][CONN_DIR_UPSTREAM] = STAT_STREAM_CLS_UP_EOF;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_ERROR][CONN_DIR_UPSTREAM] = STAT_STREAM_CLS_UP_ERR;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_SSL_ERROR][CONN_DIR_UPSTREAM] = STAT_STREAM_CLS_UP_ERR;
}
void __stream_close_stat(struct tfe_stream_private * stream)
{
TFE_PROXY_STAT_INCREASE(STAT_STREAM_CLS, 1);
if(stream->nr_log_event > 0)
{
struct tfe_stream_event_log * ev_log = &stream->log_event[0];
assert(ev_log_to_stat_map[ev_log->type][ev_log->dir] >= 0);
TFE_PROXY_STAT_INCREASE(ev_log_to_stat_map[ev_log->type][ev_log->dir], 1);
}
}
void tfe_stream_destory(struct tfe_stream_private * stream)
@@ -898,8 +998,8 @@ void tfe_stream_destory(struct tfe_stream_private * stream)
struct tfe_thread_ctx * thread = stream->thread_ref;
struct event_base * ev_base = thread->evbase;
TFE_PROXY_STAT_INCREASE(STAT_STREAM_DESTROY, 1);
__stream_access_log_write(stream);
__stream_close_stat(stream);
if (stream->head.addr)
{
@@ -953,9 +1053,10 @@ void tfe_stream_destory(struct tfe_stream_private * stream)
future_destroy(stream->future_upstream_create);
}
FREE(&(stream->plugin_ctxs));
tfe_proxy_thread_ctx_release(stream->thread_ref);
stream->proxy_ref = NULL;
FREE(&(stream));
thread->load--;
}
int __fd_ttl_option_setup(struct tfe_stream_private * _stream, evutil_socket_t fd, int ttl)
@@ -1197,12 +1298,5 @@ int tfe_stream_shutdown_dir(const struct tfe_stream * stream, enum tfe_conn_dir
void tfe_stream_kill(const struct tfe_stream * stream)
{
struct tfe_stream_private * _stream = to_stream_private(stream);
const static struct linger sl {.l_onoff = 1, .l_linger = 0};
/* Set SO_LINGER, the fd will be closed by RST */
setsockopt(_stream->conn_upstream->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
setsockopt(_stream->conn_downstream->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
/* Destroy STREAM */
return tfe_stream_destory(_stream);
_stream->need_to_be_kill = true;
}

View File

@@ -428,6 +428,12 @@ enum tfe_stream_action http_connection_entry(const struct tfe_stream * stream, e
return ACTION_DEFER_DATA;
}
if(hs_private->kill_signal)
{
tfe_stream_kill(stream);
return ACTION_DROP_DATA;
}
if (hf_private_in->is_upgrade || hf_private_in->is_passthrough)
{
goto __passthrough;
@@ -456,12 +462,6 @@ enum tfe_stream_action http_connection_entry(const struct tfe_stream * stream, e
goto __passthrough;
}
if(hs_private->kill_signal)
{
tfe_stream_kill(stream);
return ACTION_DROP_DATA;
}
ret = (dir == CONN_DIR_DOWNSTREAM) ?
__on_request_handle_user_req_or_resp(stream, hs_private, hf_private_in, __need_to_close_the_session) :
__on_response_handle_user_req_or_resp(stream, hs_private, hf_private_in, __need_to_close_the_session);