diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 7df5883..1de3f79 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -91,6 +91,11 @@ static inline struct tfe_conn_private * __peer_conn(struct tfe_stream_private * return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream)); } +static inline struct tfe_conn_private *__steering_peer_conn(struct tfe_stream_private *_stream, enum tfe_conn_dir dir) +{ + return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_fake_c) : (_stream->conn_fake_s)); +} + static inline enum tfe_conn_dir __bev_dir(struct tfe_stream_private * _stream, struct bufferevent * bev) { if (_stream->conn_downstream && bev == _stream->conn_downstream->bev) @@ -641,7 +646,6 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; enum tfe_conn_dir dir = __bev_dir(_stream, bev); - struct tfe_conn_private * this_conn = NULL; struct tfe_conn_private * peer_conn = NULL; struct evbuffer * inbuf = NULL; struct evbuffer * outbuf = NULL; @@ -649,71 +653,12 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) if (_stream->is_decrypted_traffic_steering) { - if (bev == _stream->conn_downstream->bev) - { - peer_conn = _stream->conn_fake_c; - } - else if (bev == _stream->conn_upstream->bev) - { - peer_conn = _stream->conn_fake_s; - } - else - { - assert(0); - } - - if (_stream->is_first_call_rxcb == 0) - { - TFE_PROXY_STAT_INCREASE(STAT_STREAM_INTERCEPT, 1); - _stream->is_first_call_rxcb = 1; - tfe_set_intercept_metric(&_stream->head, 1, 0, 0, 0, 0); - } - - /* - * Peer connection is terminated, drain all data. - * This connection will be destoryed in __event_cb - */ - inbuf = bufferevent_get_input(bev); - inbuff_len = evbuffer_get_length(inbuf); - if (peer_conn == NULL) - { - evbuffer_drain(inbuf, inbuff_len); - return; - } - - TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s send %d bytes from %s to %s", - _stream->str_stream_addr, - inbuff_len, - bev == _stream->conn_downstream->bev ? "conn_downstream" : "conn_upstream", - bev == _stream->conn_downstream->bev ? "conn_fake_c" : "conn_fake_s"); - - outbuf = bufferevent_get_output(peer_conn->bev); - evbuffer_add_buffer(outbuf, inbuf); - - if (bev == _stream->conn_downstream->bev) - { - TFE_PROXY_STAT_INCREASE(STAT_STEERING_CLIENT_TX_B, inbuff_len); - // TODO: Delete the following code when support calling the tfe-plugin - TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_DOWN_BYTES, inbuff_len); - tfe_set_intercept_metric(&_stream->head, 0, 1, inbuff_len, 0, 0); - _stream->downstream_rx_offset += inbuff_len; - } - else - { - TFE_PROXY_STAT_INCREASE(STAT_STEERING_SERVER_TX_B, inbuff_len); - // TODO: Delete the following code when support calling the tfe-plugin - TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_UP_BYTES, inbuff_len); - tfe_set_intercept_metric(&_stream->head, 0, 0, 0, 1, inbuff_len); - _stream->upstream_rx_offset += inbuff_len; - } - - // TODO: Delete the following code when support calling the tfe-plugin - TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_BYTES, inbuff_len); - return; + peer_conn = __steering_peer_conn(_stream, dir); + } + else + { + peer_conn = __peer_conn(_stream, dir); } - - this_conn = __this_conn(_stream, dir); - peer_conn = __peer_conn(_stream, dir); /* Peer connection is terminated, drain all data. * This connection will be destoryed in __event_cb */ @@ -830,6 +775,23 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) /* Total Bytes */ TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_BYTES, rx_offset_increase); + if (_stream->is_decrypted_traffic_steering && action_final == ACTION_FORWARD_DATA) + { + TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s send %d bytes from %s to %s", + _stream->str_stream_addr, rx_offset_increase, + dir == CONN_DIR_DOWNSTREAM ? "conn_downstream" : "conn_upstream", + dir == CONN_DIR_DOWNSTREAM ? "conn_fake_c" : "conn_fake_s"); + + if (dir == CONN_DIR_DOWNSTREAM) + { + TFE_PROXY_STAT_INCREASE(STAT_STEERING_CLIENT_TX_B, rx_offset_increase); + } + else + { + TFE_PROXY_STAT_INCREASE(STAT_STEERING_SERVER_TX_B, rx_offset_increase); + } + } + if(_stream->need_to_be_kill) { const static struct linger sl{.l_onoff = 1, .l_linger = 0}; @@ -933,7 +895,6 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * const char * str_conn_dir = __str_dir(conn_dir); enum tfe_conn_dir peer_conn_dir{}; size_t rx_offset = 0; - int need_close_connection = 0; if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM) { @@ -1003,23 +964,6 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_EOF, conn_dir, 0, NULL); } - need_close_connection = 1; - } - - if (_stream->is_decrypted_traffic_steering) - { - TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s %s run eventcb, %s %s", - _stream->str_stream_addr, - bev == _stream->conn_downstream->bev ? "conn_downstream" : "conn_upstream", - bev_event_to_string(events), - errno_to_string(errno) - ); - tfe_stream_destory(_stream); - return; - } - - if (need_close_connection) - { goto __close_connection; } @@ -1189,15 +1133,14 @@ static void __steering_stream_bev_writecb(struct bufferevent * bev, void * arg) static void __steering_stream_bev_eventcb(struct bufferevent *bev, short events, void *arg) { struct tfe_stream_private *_stream = (struct tfe_stream_private *)arg; + + TFE_LOG_ERROR(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s %s run eventcb, %s %s", + _stream->str_stream_addr, + bev == _stream->conn_fake_c->bev ? "conn_fake_c" : "conn_fake_s", + bev_event_to_string(events), + errno_to_string(errno)); - TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s %s run eventcb, %s %s", - _stream->str_stream_addr, - bev == _stream->conn_fake_c->bev ? "conn_fake_c" : "conn_fake_s", - bev_event_to_string(events), - errno_to_string(errno) - ); - - enum tfe_conn_dir conn_dir = (bev == _stream->conn_fake_c->bev) ? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM; + enum tfe_conn_dir conn_dir = (bev == _stream->conn_fake_c->bev) ? CONN_DIR_DOWNSTREAM : CONN_DIR_UPSTREAM; if (events & BEV_EVENT_ERROR) { __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FACKFD_ERROR, conn_dir, errno, errno_to_string(errno)); @@ -1206,6 +1149,8 @@ static void __steering_stream_bev_eventcb(struct bufferevent *bev, short events, { __stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FACKFD_EOF, conn_dir, errno, errno_to_string(errno)); } + + call_plugin_close(_stream); tfe_stream_destory(_stream); } @@ -1389,7 +1334,7 @@ void __stream_access_log_write(struct tfe_stream_private * stream) break; case EVENT_LOG_CLOSE_BY_FACKFD_EOF: /* FALLTHROUGH */ case EVENT_LOG_CLOSE_BY_FACKFD_ERROR: /* FALLTHROUGH */ - str_dir = ev_log->dir == CONN_DIR_DOWNSTREAM ? "SERVER" : "CLIENT"; + str_dir = ev_log->dir == CONN_DIR_DOWNSTREAM ? "CLIENT" : "SERVER"; break; default: str_dir = ""; @@ -1421,11 +1366,11 @@ void __ev_log_to_stat_map_init() ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_SSL_ERROR][CONN_DIR_DOWNSTREAM] = STAT_STREAM_CLS_DOWN_ERR; ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_SSL_ERROR][CONN_DIR_UPSTREAM] = STAT_STREAM_CLS_UP_ERR; - ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_EOF][CONN_DIR_DOWNSTREAM] = STAT_STEERING_SERVER_EOF; - ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_EOF][CONN_DIR_UPSTREAM] = STAT_STEERING_CLIENT_EOF; - - ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_ERROR][CONN_DIR_DOWNSTREAM] = STAT_STEERING_SERVER_ERR; - ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_ERROR][CONN_DIR_UPSTREAM] = STAT_STEERING_CLIENT_ERR; + ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_EOF][CONN_DIR_DOWNSTREAM] = STAT_STEERING_CLIENT_EOF; + ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_EOF][CONN_DIR_UPSTREAM] = STAT_STEERING_SERVER_EOF; + + ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_ERROR][CONN_DIR_DOWNSTREAM] = STAT_STEERING_CLIENT_ERR; + ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FACKFD_ERROR][CONN_DIR_UPSTREAM] = STAT_STEERING_SERVER_ERR; } void __stream_close_stat(struct tfe_stream_private * stream) @@ -1886,7 +1831,6 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_SSL, 1); } - return 0; __errout: