增加逐流的上、下游收发字节统计并对业务插件提供查询接口。

This commit is contained in:
luqiuwen
2019-08-13 19:50:51 +08:00
parent 1e869c3ded
commit 3015d4df86
6 changed files with 107 additions and 14 deletions

View File

@@ -345,6 +345,49 @@ int tfe_stream_set_integer_opt(struct tfe_stream * stream, enum tfe_stream_opt_l
{
return 1;
}
int tfe_stream_info_get(const struct tfe_stream * stream, enum tfe_stream_info type, void * value, size_t size)
{
#define __CHECK_PARAM(type) do { if(sizeof(type) != size) return -EINVAL; } while(0)
struct tfe_stream_private * _stream = to_stream_private(stream);
switch (type)
{
case INFO_FROM_DOWNSTREAM_RX_OFFSET:
__CHECK_PARAM(_stream->downstream_rx_offset);
if(_stream->conn_downstream)
{
*((size_t *) value) = _stream->downstream_rx_offset;
return sizeof(_stream->downstream_rx_offset);
}
else
{
return -ENOTCONN;
}
break;
case INFO_FROM_UPSTREAM_RX_OFFSET:
__CHECK_PARAM(_stream->upstream_rx_offset);
if(_stream->conn_upstream)
{
*((size_t *) value) = _stream->upstream_rx_offset;
return sizeof(_stream->upstream_rx_offset);
}
else
{
return -ENOTCONN;
}
break;
default: assert(0);
return -EINVAL;
}
#undef __CHECK_PARAM
return 0;
}
/* ====================================================================================================================
* CONNECTION STRUCTURE AND OPERATION FUCTIONS
* ===================================================================================================================*/
@@ -527,6 +570,7 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = __bev_dir(_stream, bev);
struct tfe_conn_private * this_conn = __this_conn(_stream, dir);
struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir);
@@ -534,7 +578,6 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
* This connection will be destoryed in __event_cb */
struct evbuffer * inbuf = bufferevent_get_input(bev);
size_t contigous_len = evbuffer_get_length(inbuf);
this_conn->total_rx_bytes+=contigous_len;
if (peer_conn == NULL)
{
evbuffer_drain(inbuf, evbuffer_get_length(inbuf));
@@ -548,6 +591,7 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
enum tfe_stream_action action_final = ACTION_FORWARD_DATA;
size_t drain_size = 0;
size_t rx_offset_increase = 0;
unsigned char * contiguous_data = evbuffer_pullup(inbuf, contigous_len);
_stream->defer_bytes = 0;
@@ -588,13 +632,14 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
if (_stream->forward_bytes > 0)
{
evbuffer_remove_buffer(inbuf, outbuf, _stream->forward_bytes);
peer_conn->total_tx_bytes+=_stream->forward_bytes;
rx_offset_increase += _stream->forward_bytes;
}
else
{
evbuffer_add_buffer(outbuf, inbuf);
peer_conn->total_tx_bytes+=contigous_len;
rx_offset_increase += contigous_len;
}
break;
case ACTION_DROP_DATA:
@@ -606,7 +651,9 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
{
drain_size = evbuffer_get_length(inbuf);
}
evbuffer_drain(inbuf, drain_size);
rx_offset_increase += drain_size;
break;
case ACTION_DEFER_DATA:
@@ -619,6 +666,17 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
break;
}
if (dir == CONN_DIR_DOWNSTREAM)
{
TFE_PROXY_STAT_INCREASE(STAT_STREAM_DOWN_RX_BYTES, rx_offset_increase);
_stream->downstream_rx_offset += rx_offset_increase;
}
else
{
TFE_PROXY_STAT_INCREASE(STAT_STREAM_UP_RX_BYTES, rx_offset_increase);
_stream->upstream_rx_offset += rx_offset_increase;
}
if(_stream->need_to_be_kill)
{
const static struct linger sl{.l_onoff = 1, .l_linger = 0};
@@ -708,6 +766,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev);
const char * str_conn_dir = __str_dir(conn_dir);
enum tfe_conn_dir peer_conn_dir{};
size_t rx_offset = 0;
if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
{
@@ -717,6 +776,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
ref_peer_ssl_stream = &_stream->ssl_downstream;
ref_this_write_ctx = &_stream->w_ctx_upstream;
peer_conn_dir = CONN_DIR_DOWNSTREAM;
rx_offset = _stream->upstream_rx_offset;
}
if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
@@ -727,6 +787,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
ref_peer_ssl_stream = &_stream->ssl_upstream;
ref_this_write_ctx = &_stream->w_ctx_downstream;
peer_conn_dir = CONN_DIR_UPSTREAM;
rx_offset = _stream->downstream_rx_offset;
}
if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF)
@@ -736,24 +797,33 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
__stream_bev_readcb(bev, arg);
}
if(events & BEV_EVENT_ERROR)
if (events & BEV_EVENT_ERROR)
{
if(_stream->session_type==STREAM_PROTO_SSL)
if (_stream->session_type == STREAM_PROTO_SSL)
{
ssl_stream_log_error(bev, __bev_dir(_stream, bev), _stream->ssl_mgr);
}
else if(errno)
else if (errno)
{
TFE_LOG_INFO(g_default_logger, "%s %s connection error, errno = %d, %s",
_stream->str_stream_addr, str_conn_dir, errno, strerror(errno));
}
}
else if(events & BEV_EVENT_EOF && (*ref_this_conn)->total_rx_bytes==0 && _stream->session_type==STREAM_PROTO_SSL)
else if (events & BEV_EVENT_EOF && rx_offset == 0 && _stream->session_type == STREAM_PROTO_SSL)
{
ssl_stream_process_zero_eof(*ref_this_ssl_stream, g_default_proxy->ssl_mgr_handler);
}
if(events & BEV_EVENT_ERROR) __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_ERROR, conn_dir, 0, NULL);
if(events & BEV_EVENT_EOF) __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_EOF, conn_dir, 0, NULL);
if (events & BEV_EVENT_ERROR)
{
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_ERROR, conn_dir, 0, NULL);
}
if (events & BEV_EVENT_EOF)
{
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_EOF, conn_dir, 0, NULL);
}
goto __close_connection;
}