Close #99 增加全局传输限速功能,增加passthrough状态下eventcb的错误日志

* 增加全局传输限速功能,可以限制上游、下游的传输速度;
* 增加eventcb中的错误日志,当BEV_EVENT_ERROR发生时,打印错误代码与日志信息。
This commit is contained in:
luqiuwen
2018-12-08 20:48:19 +06:00
parent 0b76bdf5e5
commit de92efb380
6 changed files with 109 additions and 13 deletions

View File

@@ -268,15 +268,21 @@ int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, co
return ret;
}
static tfe_conn_private * __conn_private_create_by_bev(struct tfe_stream_private * stream, struct bufferevent * bev)
static int conn_private_ratelimit_setup(struct tfe_conn_private * conn, struct tfe_proxy_rate_limit_options * opt)
{
struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1);
__conn_private->bev = bev;
__conn_private->fd = bufferevent_getfd(bev);
conn->ratelimit_bucket = ev_token_bucket_cfg_new(opt->read_rate, opt->read_burst,
opt->write_rate, opt->write_burst, NULL);
bufferevent_setcb(__conn_private->bev, __stream_bev_readcb, __stream_bev_writecb, __stream_bev_eventcb, stream);
bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE);
return __conn_private;
if(unlikely(conn->ratelimit_bucket == NULL))
{
TFE_LOG_ERROR(g_default_logger, "Failed at setting ratelimit bucket, "
"read_rate = %u, read_burst = %u, write_rate = %u, write_burst = %u",
opt->read_rate, opt->read_burst, opt->write_rate, opt->write_burst);
return -1;
}
bufferevent_set_rate_limit(conn->bev, conn->ratelimit_bucket);
return 0;
}
int tfe_stream_action_set_opt(const struct tfe_stream * stream, enum tfe_stream_action_opt type,
@@ -318,11 +324,20 @@ evutil_socket_t __conn_private_release_fd(struct tfe_conn_private * conn)
static void __conn_private_destory(struct tfe_conn_private * conn)
{
bufferevent_disable(conn->bev, EV_READ | EV_WRITE);
if(conn->ratelimit_bucket)
{
ev_token_bucket_cfg_free(conn->ratelimit_bucket);
conn->ratelimit_bucket = NULL;
}
bufferevent_free(conn->bev);
if (conn->fd > 0)
{
evutil_closesocket(conn->fd);
}
if (conn->fd > 0) evutil_closesocket(conn->fd);
free(conn);
TFE_PROXY_STAT_INCREASE(STAT_FD_INSTANT_CLOSE, 1);
}
@@ -338,6 +353,7 @@ static void __conn_private_destory_with_ssl(struct event_base * ev_base,
static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
struct tfe_proxy * _proxy = _stream->proxy_ref;
struct tfe_conn_private * peer_conn = __peer_conn(_stream, __bev_dir(_stream, bev));
struct evbuffer * __input_buffer = bufferevent_get_input(bev);
@@ -393,16 +409,19 @@ static void __stream_bev_passthrough_eventcb(struct bufferevent * bev, short eve
struct tfe_conn_private ** ref_this_conn{};
struct tfe_conn_private ** ref_peer_conn{};
const char * str_direction{};
if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
{
ref_this_conn = &_stream->conn_upstream;
ref_peer_conn = &_stream->conn_downstream;
str_direction = "UPSTREAM";
}
if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
{
ref_this_conn = &_stream->conn_downstream;
ref_peer_conn = &_stream->conn_upstream;
str_direction = "DOWNSTREAM";
}
if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF)
@@ -412,6 +431,26 @@ static void __stream_bev_passthrough_eventcb(struct bufferevent * bev, short eve
__stream_bev_passthrough_readcb(bev, arg);
}
if(events & BEV_EVENT_ERROR)
{
unsigned long err;
while ((err = (bufferevent_get_openssl_error(bev))))
{
const char *msg = (const char*)ERR_reason_error_string(err);
const char *lib = (const char*)ERR_lib_error_string(err);
const char *func = (const char*)ERR_func_error_string(err);
TFE_LOG_INFO(g_default_logger, "%s %s connection error, bufferevent_get_openssl_error() = %lu: %s %s %s",
_stream->str_stream_addr, str_direction, err, lib, func, msg);
}
if (errno)
{
TFE_LOG_INFO(g_default_logger, "%s %s connection error, errno = %d, %s",
_stream->str_stream_addr, str_direction, errno, strerror(errno));
}
}
goto __close_connection;
}
@@ -687,10 +726,29 @@ __call_plugin_close:
tfe_stream_destory(_stream);
}
static tfe_conn_private * __conn_private_create_by_bev(struct tfe_stream_private * stream, struct bufferevent * bev)
{
struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1);
__conn_private->bev = bev;
__conn_private->fd = bufferevent_getfd(bev);
bufferevent_setcb(__conn_private->bev, __stream_bev_readcb, __stream_bev_writecb, __stream_bev_eventcb, stream);
bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE);
struct tfe_proxy * proxy_ref = stream->proxy_ref;
if(unlikely(proxy_ref->en_rate_limit))
{
conn_private_ratelimit_setup(__conn_private, &proxy_ref->rate_limit_options);
}
return __conn_private;
}
static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private * stream, evutil_socket_t fd)
{
struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1);
struct event_base * __ev_base = stream->thread_ref->evbase;
struct tfe_proxy * proxy_ref = stream->proxy_ref;
__conn_private->_stream_ref = stream;
__conn_private->bev = bufferevent_socket_new(__ev_base, fd, BEV_OPT_DEFER_CALLBACKS);
@@ -714,6 +772,11 @@ static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private
}
bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE);
if(unlikely(proxy_ref->en_rate_limit))
{
conn_private_ratelimit_setup(__conn_private, &proxy_ref->rate_limit_options);
}
return __conn_private;
__errout: