diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index f44970c..be895a0 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -81,19 +81,38 @@ static inline struct tfe_stream_private * to_stream_private(const struct tfe_str return container_of(stream, struct tfe_stream_private, head); } -static inline struct tfe_conn_private * __this_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) +static inline struct tfe_conn_private *__this_conn_for_read(struct tfe_stream_private *_stream, enum tfe_conn_dir dir) { return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream)); } -static inline struct tfe_conn_private * __peer_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) +static inline struct tfe_conn_private *__this_conn_for_write(struct tfe_stream_private *_stream, enum tfe_conn_dir dir) +{ + if (_stream->is_decrypted_traffic_steering) + { + return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_fake_s) : (_stream->conn_fake_c)); + } + else + { + return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream)); + } +} + +static inline struct tfe_conn_private *__peer_conn_for_read(struct tfe_stream_private *_stream, enum tfe_conn_dir dir) { return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream)); } -static inline struct tfe_conn_private *__steering_peer_conn(struct tfe_stream_private *_stream, enum tfe_conn_dir dir) +static inline struct tfe_conn_private *__peer_conn_for_write(struct tfe_stream_private *_stream, enum tfe_conn_dir dir) { - return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_fake_c) : (_stream->conn_fake_s)); + if (_stream->is_decrypted_traffic_steering) + { + return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_fake_c) : (_stream->conn_fake_s)); + } + else + { + return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream)); + } } static inline enum tfe_conn_dir __bev_dir(struct tfe_stream_private * _stream, struct bufferevent * bev) @@ -177,6 +196,41 @@ static const char *errno_to_string(int err_num) } } +static void steering_out_log(struct tfe_stream_private *_stream, int len, int send_to_fake_c) +{ + if (_stream->is_decrypted_traffic_steering) + { + TFE_LOG_DEBUG(g_default_logger, "decrypted traffic steering, %s send %d byte to %s", + _stream->str_stream_addr, len, send_to_fake_c == 1 ? "conn_fake_c" : "conn_fake_s"); + + if (send_to_fake_c == 1) + { + TFE_PROXY_STAT_INCREASE(STAT_STEERING_CLIENT_TX_B, len); + } + else + { + TFE_PROXY_STAT_INCREASE(STAT_STEERING_SERVER_TX_B, len); + } + } +} + +static void steering_in_log(struct tfe_stream_private *_stream, int len, int read_from_fake_c) +{ + if (_stream->is_decrypted_traffic_steering) + { + TFE_LOG_DEBUG(g_default_logger, "decrypted traffic steering, %s read %d bytes from %s", + _stream->str_stream_addr, len, read_from_fake_c == 1 ? "conn_fake_c" : "conn_fake_s"); + if (read_from_fake_c == 1) + { + TFE_PROXY_STAT_INCREASE(STAT_STEERING_CLIENT_RX_B, len); + } + else + { + TFE_PROXY_STAT_INCREASE(STAT_STEERING_SERVER_RX_B, len); + } + } +} + /* ==================================================================================================================== * INTERFACE * ===================================================================================================================*/ @@ -257,14 +311,14 @@ struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_strea if (dir == CONN_DIR_DOWNSTREAM) { - this_conn = _stream->conn_downstream; - peer_conn = _stream->conn_upstream; + this_conn = __this_conn_for_write(_stream, CONN_DIR_DOWNSTREAM); + peer_conn = __peer_conn_for_read(_stream, CONN_DIR_DOWNSTREAM); ref_write_ctx = &_stream->w_ctx_downstream; } else { - this_conn = _stream->conn_upstream; - peer_conn = _stream->conn_downstream; + this_conn = __this_conn_for_write(_stream, CONN_DIR_UPSTREAM); + peer_conn = __peer_conn_for_read(_stream, CONN_DIR_UPSTREAM); ref_write_ctx = &_stream->w_ctx_upstream; } @@ -287,10 +341,11 @@ int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned ch return -EPIPE; } - struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir); + struct tfe_conn_private *this_conn = __this_conn_for_write(w_ctx->_stream, w_ctx->dir); int ret = 0; if (this_conn != NULL) { + steering_out_log(w_ctx->_stream, size, w_ctx->dir == CONN_DIR_DOWNSTREAM); ret = bufferevent_write(this_conn->bev, data, size); } else @@ -308,8 +363,8 @@ void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) /* The connection terminated before this function call */ if (w_ctx->_stream == NULL) goto __out; - this_conn = __this_conn(w_ctx->_stream, w_ctx->dir); - peer_conn = __peer_conn(w_ctx->_stream, w_ctx->dir); + this_conn = __this_conn_for_write(w_ctx->_stream, w_ctx->dir); + peer_conn = __peer_conn_for_read(w_ctx->_stream, w_ctx->dir); if (this_conn != NULL) { @@ -483,13 +538,16 @@ static void __conn_private_destory_with_ssl(struct event_base * ev_base, static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg) { - struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; - struct tfe_conn_private * peer_conn = __peer_conn(_stream, __bev_dir(_stream, bev)); + struct tfe_stream_private *_stream = (struct tfe_stream_private *)arg; + enum tfe_conn_dir dir = __bev_dir(_stream, bev); + struct tfe_conn_private *peer_conn = __peer_conn_for_write(_stream, dir); + + struct evbuffer *__input_buffer = bufferevent_get_input(bev); + int __input_length = evbuffer_get_length(__input_buffer); - struct evbuffer * __input_buffer = bufferevent_get_input(bev); if (peer_conn == NULL) { - evbuffer_drain(__input_buffer, evbuffer_get_length(__input_buffer)); + evbuffer_drain(__input_buffer, __input_length); return; } @@ -500,6 +558,11 @@ static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg tfe_cmsg_set_flag(_stream->cmsg, TFE_CMSG_FLAG_USER0); } + if (_stream->is_decrypted_traffic_steering) + { + steering_out_log(_stream, __input_length, dir == CONN_DIR_DOWNSTREAM); + } + struct evbuffer * __output_buffer = bufferevent_get_output(peer_conn->bev); evbuffer_add_buffer(__output_buffer, __input_buffer); } @@ -638,17 +701,8 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) struct tfe_conn_private * peer_conn = NULL; struct evbuffer * inbuf = NULL; struct evbuffer * outbuf = NULL; - int inbuff_len = 0; - - if (_stream->is_decrypted_traffic_steering) - { - peer_conn = __steering_peer_conn(_stream, dir); - } - else - { - peer_conn = __peer_conn(_stream, dir); - } + peer_conn = __peer_conn_for_write(_stream, dir); /* Peer connection is terminated, drain all data. * This connection will be destoryed in __event_cb */ inbuf = bufferevent_get_input(bev); @@ -760,22 +814,9 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) /* Total Bytes */ TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_BYTES, rx_offset_increase); - if (_stream->is_decrypted_traffic_steering && action_final == ACTION_FORWARD_DATA) { - TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s send %d bytes from %s to %s", - _stream->str_stream_addr, rx_offset_increase, - dir == CONN_DIR_DOWNSTREAM ? "conn_downstream" : "conn_upstream", - dir == CONN_DIR_DOWNSTREAM ? "conn_fake_c" : "conn_fake_s"); - - if (dir == CONN_DIR_DOWNSTREAM) - { - TFE_PROXY_STAT_INCREASE(STAT_STEERING_CLIENT_TX_B, rx_offset_increase); - } - else - { - TFE_PROXY_STAT_INCREASE(STAT_STEERING_SERVER_TX_B, rx_offset_increase); - } + steering_out_log(_stream, rx_offset_increase, dir == CONN_DIR_DOWNSTREAM); } if(_stream->need_to_be_kill) @@ -819,10 +860,6 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg) if (_stream->is_decrypted_traffic_steering) { - // TODO 增加计数 - TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s %s run writecb", - _stream->str_stream_addr, - bev == _stream->conn_downstream->bev ? "conn_downstream" : "conn_upstream"); return; } @@ -1068,16 +1105,13 @@ static void __steering_stream_bev_readcb(struct bufferevent * bev, void * arg) struct tfe_stream_private * _stream = (struct tfe_stream_private *)arg; struct tfe_conn_private * peer_conn = NULL; - enum TFE_STAT_FIELD stat_filed = TFE_STAT_MAX; if (bev == _stream->conn_fake_c->bev) { peer_conn = _stream->conn_downstream; - stat_filed = STAT_STEERING_CLIENT_RX_B; } else if (bev == _stream->conn_fake_s->bev) { peer_conn = _stream->conn_upstream; - stat_filed = STAT_STEERING_SERVER_RX_B; } else { @@ -1088,32 +1122,22 @@ static void __steering_stream_bev_readcb(struct bufferevent * bev, void * arg) * Peer connection is terminated, drain all data. * This connection will be destoryed in __event_cb */ - struct evbuffer * inbuf = bufferevent_get_input(bev); + struct evbuffer *inbuf = bufferevent_get_input(bev); + int inlen = evbuffer_get_length(inbuf); if (peer_conn == NULL) { - evbuffer_drain(inbuf, evbuffer_get_length(inbuf)); + evbuffer_drain(inbuf, inlen); return; } - TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s send %d bytes form %s to %s", - _stream->str_stream_addr, - evbuffer_get_length(inbuf), - bev == _stream->conn_fake_c->bev ? "conn_fake_c" : "conn_fake_s", - bev == _stream->conn_fake_c->bev ? "conn_downstream" : "conn_upstream" - ); - - TFE_PROXY_STAT_INCREASE(stat_filed, evbuffer_get_length(inbuf)); - struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev); + steering_in_log(_stream, inlen, bev == _stream->conn_fake_c->bev); + struct evbuffer *outbuf = bufferevent_get_output(peer_conn->bev); evbuffer_add_buffer(outbuf, inbuf); } static void __steering_stream_bev_writecb(struct bufferevent * bev, void * arg) { - struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; - - TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s %s run writecb", - _stream->str_stream_addr, - bev == _stream->conn_fake_c->bev ? "conn_fake_c" : "conn_fake_s"); + //struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; } static void __steering_stream_bev_eventcb(struct bufferevent *bev, short events, void *arg) @@ -1330,10 +1354,10 @@ void __stream_access_log_write(struct tfe_stream_private * stream) "%s/%s/%s ", __str_stream_log_type(ev_log->type), str_dir, ev_log->str_error); } - TFE_LOG_INFO(stream->stream_logger, "access", - "%d %d %d %s %s %s %s %s %s", stream->log_fd_downstream, stream->log_fd_upstream, stream->keyring_id, - stream->str_stream_addr, str_passthrough, str_kill, str_log_event, - stream->ssl_downstream_info_dump, stream->ssl_upstream_info_dump); + TFE_LOG_INFO(stream->stream_logger, "access %d %d %d %s %s %s %s %s %s", + stream->log_fd_downstream, stream->log_fd_upstream, stream->keyring_id, + stream->str_stream_addr, str_passthrough, str_kill, str_log_event, + stream->ssl_downstream_info_dump, stream->ssl_upstream_info_dump); } static int ev_log_to_stat_map[__EVENT_LOG_CLOSE_MAX][__CONN_DIR_MAX]{{-1}}; @@ -1744,8 +1768,8 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst if (_stream->is_decrypted_traffic_steering) { - __stream_fd_option_setup(_stream, fd_fake_s, CONN_DIR_DOWNSTREAM); - __stream_fd_option_setup(_stream, fd_fake_c, CONN_DIR_UPSTREAM); + __stream_fd_option_setup(_stream, fd_fake_s, CONN_DIR_UPSTREAM); + __stream_fd_option_setup(_stream, fd_fake_c, CONN_DIR_DOWNSTREAM); _stream->conn_fake_s = __conn_private_create_by_fake_fd(_stream, fd_fake_s); if (_stream->conn_fake_s == NULL) @@ -1875,7 +1899,7 @@ void tfe_stream_write_access_log(const struct tfe_stream * stream, int level, co vasprintf(&__tmp_buffer, fmt, arg_ptr); /* Log content with stream tag */ - TFE_LOG_INFO(_stream->stream_logger, "access", "%s %s", _stream->str_stream_addr, __tmp_buffer); + TFE_LOG_INFO(_stream->stream_logger, "access %s %s", _stream->str_stream_addr, __tmp_buffer); free(__tmp_buffer); }