diff --git a/common/include/tfe_stream.h b/common/include/tfe_stream.h index 50c839d..e35ccca 100644 --- a/common/include/tfe_stream.h +++ b/common/include/tfe_stream.h @@ -68,8 +68,20 @@ enum tfe_stream_close_reason REASON_ERROR }; +enum tfe_stream_info +{ + /* OFFSET IN BYTES, FROM BEGIN OF THE DIRECTION + * TYPE: SIZE_T */ + INFO_FROM_DOWNSTREAM_RX_OFFSET, + INFO_FROM_UPSTREAM_RX_OFFSET, +}; + int tfe_stream_action_set_opt(const struct tfe_stream * stream, enum tfe_stream_action_opt type, void * value, size_t size); + +int tfe_stream_info_get(const struct tfe_stream * stream, enum tfe_stream_info type, + void * value, size_t size); + enum tfe_stream_opt_level { STREAM_OPT_LEVEL_TCP, diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index 1eb696f..f7abbbd 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -16,7 +16,7 @@ struct tfe_thread_ctx struct event_base * evbase; struct evdns_base* dnsbase; unsigned char running; - + unsigned int nr_modules; const struct tfe_plugin * modules; }; @@ -48,7 +48,6 @@ struct tfe_conn_private struct bufferevent * bev; struct ev_token_bucket_cfg * ratelimit_bucket; uint8_t on_writing; - size_t total_rx_bytes, total_tx_bytes; }; enum tfe_stream_event_log_type @@ -96,7 +95,6 @@ struct tfe_stream_private size_t forward_bytes; size_t drop_bytes; - size_t defer_bytes; struct timeval defer_timeval; @@ -140,6 +138,10 @@ struct tfe_stream_private /* CONNECTION LOG */ char ssl_downstream_info_dump[TFE_STRING_MAX]; char ssl_upstream_info_dump[TFE_STRING_MAX]; + + /* OFFSET FOR LOG */ + size_t downstream_rx_offset; + size_t upstream_rx_offset; }; static inline void * __STREAM_LOGGER(struct tfe_stream_private * _stream) diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h index 40d65db..d3dee51 100644 --- a/platform/include/internal/proxy.h +++ b/platform/include/internal/proxy.h @@ -20,6 +20,7 @@ enum TFE_STAT_FIELD STAT_FD_INSTANT_CLOSE, STAT_FD_DEFER_CLOSE_IN_QUEUE, STAT_FD_DEFER_CLOSE_SUCCESS, + /* Stream */ STAT_STREAM_OPEN, STAT_STREAM_CLS, @@ -32,6 +33,11 @@ enum TFE_STAT_FIELD /* Stream Protocol */ STAT_STREAM_TCP_PLAIN, STAT_STREAM_TCP_SSL, + + /* RX DATA */ + STAT_STREAM_DOWN_RX_BYTES, + STAT_STREAM_UP_RX_BYTES, + TFE_STAT_MAX }; diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 7f9b136..0406ed6 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -234,7 +234,7 @@ void tfe_proxy_work_thread_create_ctx(struct tfe_proxy * proxy) { proxy->work_threads[i] = ALLOC(struct tfe_thread_ctx, 1); proxy->work_threads[i]->thread_id = i; - proxy->work_threads[i]->evbase = event_base_new(); + proxy->work_threads[i]->evbase = event_base_new(); proxy->work_threads[i]->dnsbase = evdns_base_new(proxy->work_threads[i]->evbase, EVDNS_BASE_INITIALIZE_NAMESERVERS); } return; @@ -290,7 +290,7 @@ int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile) MESA_load_profile_int_def(profile, "tcp", "tcp_user_timeout", &proxy->tcp_options.tcp_user_timeout, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_upstream", &proxy->tcp_options.tcp_ttl_upstream, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_downstream", &proxy->tcp_options.tcp_ttl_downstream, -1); - + return 0; } @@ -311,6 +311,8 @@ static const char * __str_stat_spec_map[] = [STAT_STREAM_CLS_KILL] = "stm_kill", [STAT_STREAM_TCP_PLAIN] = "plain", [STAT_STREAM_TCP_SSL] = "SSL", + [STAT_STREAM_DOWN_RX_BYTES] = "dstm_bytes", + [STAT_STREAM_UP_RX_BYTES] = "ustm_bytes", [TFE_STAT_MAX] = NULL }; diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 86fb56c..a6bcd15 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -345,6 +345,49 @@ int tfe_stream_set_integer_opt(struct tfe_stream * stream, enum tfe_stream_opt_l { return 1; } + +int tfe_stream_info_get(const struct tfe_stream * stream, enum tfe_stream_info type, void * value, size_t size) +{ +#define __CHECK_PARAM(type) do { if(sizeof(type) != size) return -EINVAL; } while(0) + struct tfe_stream_private * _stream = to_stream_private(stream); + + switch (type) + { + case INFO_FROM_DOWNSTREAM_RX_OFFSET: + __CHECK_PARAM(_stream->downstream_rx_offset); + if(_stream->conn_downstream) + { + *((size_t *) value) = _stream->downstream_rx_offset; + return sizeof(_stream->downstream_rx_offset); + } + else + { + return -ENOTCONN; + } + break; + + case INFO_FROM_UPSTREAM_RX_OFFSET: + __CHECK_PARAM(_stream->upstream_rx_offset); + if(_stream->conn_upstream) + { + *((size_t *) value) = _stream->upstream_rx_offset; + return sizeof(_stream->upstream_rx_offset); + } + else + { + return -ENOTCONN; + } + + break; + + default: assert(0); + return -EINVAL; + } + +#undef __CHECK_PARAM + return 0; +} + /* ==================================================================================================================== * CONNECTION STRUCTURE AND OPERATION FUCTIONS * ===================================================================================================================*/ @@ -527,6 +570,7 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; enum tfe_conn_dir dir = __bev_dir(_stream, bev); + struct tfe_conn_private * this_conn = __this_conn(_stream, dir); struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); @@ -534,7 +578,6 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) * This connection will be destoryed in __event_cb */ struct evbuffer * inbuf = bufferevent_get_input(bev); size_t contigous_len = evbuffer_get_length(inbuf); - this_conn->total_rx_bytes+=contigous_len; if (peer_conn == NULL) { evbuffer_drain(inbuf, evbuffer_get_length(inbuf)); @@ -548,6 +591,7 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) enum tfe_stream_action action_final = ACTION_FORWARD_DATA; size_t drain_size = 0; + size_t rx_offset_increase = 0; unsigned char * contiguous_data = evbuffer_pullup(inbuf, contigous_len); _stream->defer_bytes = 0; @@ -588,13 +632,14 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) if (_stream->forward_bytes > 0) { evbuffer_remove_buffer(inbuf, outbuf, _stream->forward_bytes); - peer_conn->total_tx_bytes+=_stream->forward_bytes; + rx_offset_increase += _stream->forward_bytes; } else { evbuffer_add_buffer(outbuf, inbuf); - peer_conn->total_tx_bytes+=contigous_len; + rx_offset_increase += contigous_len; } + break; case ACTION_DROP_DATA: @@ -606,7 +651,9 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) { drain_size = evbuffer_get_length(inbuf); } + evbuffer_drain(inbuf, drain_size); + rx_offset_increase += drain_size; break; case ACTION_DEFER_DATA: @@ -619,6 +666,17 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) break; } + if (dir == CONN_DIR_DOWNSTREAM) + { + TFE_PROXY_STAT_INCREASE(STAT_STREAM_DOWN_RX_BYTES, rx_offset_increase); + _stream->downstream_rx_offset += rx_offset_increase; + } + else + { + TFE_PROXY_STAT_INCREASE(STAT_STREAM_UP_RX_BYTES, rx_offset_increase); + _stream->upstream_rx_offset += rx_offset_increase; + } + if(_stream->need_to_be_kill) { const static struct linger sl{.l_onoff = 1, .l_linger = 0}; @@ -708,6 +766,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev); const char * str_conn_dir = __str_dir(conn_dir); enum tfe_conn_dir peer_conn_dir{}; + size_t rx_offset = 0; if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM) { @@ -717,6 +776,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * ref_peer_ssl_stream = &_stream->ssl_downstream; ref_this_write_ctx = &_stream->w_ctx_upstream; peer_conn_dir = CONN_DIR_DOWNSTREAM; + rx_offset = _stream->upstream_rx_offset; } if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM) @@ -727,6 +787,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * ref_peer_ssl_stream = &_stream->ssl_upstream; ref_this_write_ctx = &_stream->w_ctx_downstream; peer_conn_dir = CONN_DIR_UPSTREAM; + rx_offset = _stream->downstream_rx_offset; } if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF) @@ -736,24 +797,33 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * __stream_bev_readcb(bev, arg); } - if(events & BEV_EVENT_ERROR) + if (events & BEV_EVENT_ERROR) { - if(_stream->session_type==STREAM_PROTO_SSL) + if (_stream->session_type == STREAM_PROTO_SSL) { ssl_stream_log_error(bev, __bev_dir(_stream, bev), _stream->ssl_mgr); } - else if(errno) + else if (errno) { TFE_LOG_INFO(g_default_logger, "%s %s connection error, errno = %d, %s", _stream->str_stream_addr, str_conn_dir, errno, strerror(errno)); } } - else if(events & BEV_EVENT_EOF && (*ref_this_conn)->total_rx_bytes==0 && _stream->session_type==STREAM_PROTO_SSL) + else if (events & BEV_EVENT_EOF && rx_offset == 0 && _stream->session_type == STREAM_PROTO_SSL) { ssl_stream_process_zero_eof(*ref_this_ssl_stream, g_default_proxy->ssl_mgr_handler); } - if(events & BEV_EVENT_ERROR) __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_ERROR, conn_dir, 0, NULL); - if(events & BEV_EVENT_EOF) __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_EOF, conn_dir, 0, NULL); + + if (events & BEV_EVENT_ERROR) + { + __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_ERROR, conn_dir, 0, NULL); + } + + if (events & BEV_EVENT_EOF) + { + __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_EOF, conn_dir, 0, NULL); + } + goto __close_connection; } diff --git a/plugin/business/traffic-mirror/src/ethdev.cpp b/plugin/business/traffic-mirror/src/ethdev.cpp index b67e36b..ec1b4c0 100644 --- a/plugin/business/traffic-mirror/src/ethdev.cpp +++ b/plugin/business/traffic-mirror/src/ethdev.cpp @@ -49,6 +49,7 @@ static int mr4_ethdev_send_finish(struct traffic_mirror_ethdev * ethdev, unsigne return -1; } + detail_mr4->sendbuf[tid] = NULL; return 0; }