TSG-15328 TLS Decrypted的HTTP2流量未Steering给第三方设备

TSG-15329 命中Intercept & Decrypted Service Chaining Policy且开启tcp_passthrough的流量仍Steering给第三方设备
This commit is contained in:
luwenpeng
2023-06-05 14:34:21 +08:00
parent 409dfb7e4b
commit cb39660a1a

View File

@@ -81,19 +81,38 @@ static inline struct tfe_stream_private * to_stream_private(const struct tfe_str
return container_of(stream, struct tfe_stream_private, head); return container_of(stream, struct tfe_stream_private, head);
} }
static inline struct tfe_conn_private * __this_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) static inline struct tfe_conn_private *__this_conn_for_read(struct tfe_stream_private *_stream, enum tfe_conn_dir dir)
{ {
return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream)); return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream));
} }
static inline struct tfe_conn_private * __peer_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) static inline struct tfe_conn_private *__this_conn_for_write(struct tfe_stream_private *_stream, enum tfe_conn_dir dir)
{
if (_stream->is_decrypted_traffic_steering)
{
return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_fake_s) : (_stream->conn_fake_c));
}
else
{
return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream));
}
}
static inline struct tfe_conn_private *__peer_conn_for_read(struct tfe_stream_private *_stream, enum tfe_conn_dir dir)
{ {
return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream)); return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream));
} }
static inline struct tfe_conn_private *__steering_peer_conn(struct tfe_stream_private *_stream, enum tfe_conn_dir dir) static inline struct tfe_conn_private *__peer_conn_for_write(struct tfe_stream_private *_stream, enum tfe_conn_dir dir)
{ {
return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_fake_c) : (_stream->conn_fake_s)); if (_stream->is_decrypted_traffic_steering)
{
return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_fake_c) : (_stream->conn_fake_s));
}
else
{
return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream));
}
} }
static inline enum tfe_conn_dir __bev_dir(struct tfe_stream_private * _stream, struct bufferevent * bev) static inline enum tfe_conn_dir __bev_dir(struct tfe_stream_private * _stream, struct bufferevent * bev)
@@ -177,6 +196,41 @@ static const char *errno_to_string(int err_num)
} }
} }
static void steering_out_log(struct tfe_stream_private *_stream, int len, int send_to_fake_c)
{
if (_stream->is_decrypted_traffic_steering)
{
TFE_LOG_DEBUG(g_default_logger, "decrypted traffic steering, %s send %d byte to %s",
_stream->str_stream_addr, len, send_to_fake_c == 1 ? "conn_fake_c" : "conn_fake_s");
if (send_to_fake_c == 1)
{
TFE_PROXY_STAT_INCREASE(STAT_STEERING_CLIENT_TX_B, len);
}
else
{
TFE_PROXY_STAT_INCREASE(STAT_STEERING_SERVER_TX_B, len);
}
}
}
static void steering_in_log(struct tfe_stream_private *_stream, int len, int read_from_fake_c)
{
if (_stream->is_decrypted_traffic_steering)
{
TFE_LOG_DEBUG(g_default_logger, "decrypted traffic steering, %s read %d bytes from %s",
_stream->str_stream_addr, len, read_from_fake_c == 1 ? "conn_fake_c" : "conn_fake_s");
if (read_from_fake_c == 1)
{
TFE_PROXY_STAT_INCREASE(STAT_STEERING_CLIENT_RX_B, len);
}
else
{
TFE_PROXY_STAT_INCREASE(STAT_STEERING_SERVER_RX_B, len);
}
}
}
/* ==================================================================================================================== /* ====================================================================================================================
* INTERFACE * INTERFACE
* ===================================================================================================================*/ * ===================================================================================================================*/
@@ -257,14 +311,14 @@ struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_strea
if (dir == CONN_DIR_DOWNSTREAM) if (dir == CONN_DIR_DOWNSTREAM)
{ {
this_conn = _stream->conn_downstream; this_conn = __this_conn_for_write(_stream, CONN_DIR_DOWNSTREAM);
peer_conn = _stream->conn_upstream; peer_conn = __peer_conn_for_read(_stream, CONN_DIR_DOWNSTREAM);
ref_write_ctx = &_stream->w_ctx_downstream; ref_write_ctx = &_stream->w_ctx_downstream;
} }
else else
{ {
this_conn = _stream->conn_upstream; this_conn = __this_conn_for_write(_stream, CONN_DIR_UPSTREAM);
peer_conn = _stream->conn_downstream; peer_conn = __peer_conn_for_read(_stream, CONN_DIR_UPSTREAM);
ref_write_ctx = &_stream->w_ctx_upstream; ref_write_ctx = &_stream->w_ctx_upstream;
} }
@@ -287,10 +341,11 @@ int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned ch
return -EPIPE; return -EPIPE;
} }
struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir); struct tfe_conn_private *this_conn = __this_conn_for_write(w_ctx->_stream, w_ctx->dir);
int ret = 0; int ret = 0;
if (this_conn != NULL) if (this_conn != NULL)
{ {
steering_out_log(w_ctx->_stream, size, w_ctx->dir == CONN_DIR_DOWNSTREAM);
ret = bufferevent_write(this_conn->bev, data, size); ret = bufferevent_write(this_conn->bev, data, size);
} }
else else
@@ -308,8 +363,8 @@ void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx)
/* The connection terminated before this function call */ /* The connection terminated before this function call */
if (w_ctx->_stream == NULL) goto __out; if (w_ctx->_stream == NULL) goto __out;
this_conn = __this_conn(w_ctx->_stream, w_ctx->dir); this_conn = __this_conn_for_write(w_ctx->_stream, w_ctx->dir);
peer_conn = __peer_conn(w_ctx->_stream, w_ctx->dir); peer_conn = __peer_conn_for_read(w_ctx->_stream, w_ctx->dir);
if (this_conn != NULL) if (this_conn != NULL)
{ {
@@ -483,13 +538,16 @@ static void __conn_private_destory_with_ssl(struct event_base * ev_base,
static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg) static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg)
{ {
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; struct tfe_stream_private *_stream = (struct tfe_stream_private *)arg;
struct tfe_conn_private * peer_conn = __peer_conn(_stream, __bev_dir(_stream, bev)); enum tfe_conn_dir dir = __bev_dir(_stream, bev);
struct tfe_conn_private *peer_conn = __peer_conn_for_write(_stream, dir);
struct evbuffer *__input_buffer = bufferevent_get_input(bev);
int __input_length = evbuffer_get_length(__input_buffer);
struct evbuffer * __input_buffer = bufferevent_get_input(bev);
if (peer_conn == NULL) if (peer_conn == NULL)
{ {
evbuffer_drain(__input_buffer, evbuffer_get_length(__input_buffer)); evbuffer_drain(__input_buffer, __input_length);
return; return;
} }
@@ -500,6 +558,11 @@ static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg
tfe_cmsg_set_flag(_stream->cmsg, TFE_CMSG_FLAG_USER0); tfe_cmsg_set_flag(_stream->cmsg, TFE_CMSG_FLAG_USER0);
} }
if (_stream->is_decrypted_traffic_steering)
{
steering_out_log(_stream, __input_length, dir == CONN_DIR_DOWNSTREAM);
}
struct evbuffer * __output_buffer = bufferevent_get_output(peer_conn->bev); struct evbuffer * __output_buffer = bufferevent_get_output(peer_conn->bev);
evbuffer_add_buffer(__output_buffer, __input_buffer); evbuffer_add_buffer(__output_buffer, __input_buffer);
} }
@@ -638,17 +701,8 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
struct tfe_conn_private * peer_conn = NULL; struct tfe_conn_private * peer_conn = NULL;
struct evbuffer * inbuf = NULL; struct evbuffer * inbuf = NULL;
struct evbuffer * outbuf = NULL; struct evbuffer * outbuf = NULL;
int inbuff_len = 0;
if (_stream->is_decrypted_traffic_steering)
{
peer_conn = __steering_peer_conn(_stream, dir);
}
else
{
peer_conn = __peer_conn(_stream, dir);
}
peer_conn = __peer_conn_for_write(_stream, dir);
/* Peer connection is terminated, drain all data. /* Peer connection is terminated, drain all data.
* This connection will be destoryed in __event_cb */ * This connection will be destoryed in __event_cb */
inbuf = bufferevent_get_input(bev); inbuf = bufferevent_get_input(bev);
@@ -760,22 +814,9 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
/* Total Bytes */ /* Total Bytes */
TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_BYTES, rx_offset_increase); TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_BYTES, rx_offset_increase);
if (_stream->is_decrypted_traffic_steering && action_final == ACTION_FORWARD_DATA) if (_stream->is_decrypted_traffic_steering && action_final == ACTION_FORWARD_DATA)
{ {
TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s send %d bytes from %s to %s", steering_out_log(_stream, rx_offset_increase, dir == CONN_DIR_DOWNSTREAM);
_stream->str_stream_addr, rx_offset_increase,
dir == CONN_DIR_DOWNSTREAM ? "conn_downstream" : "conn_upstream",
dir == CONN_DIR_DOWNSTREAM ? "conn_fake_c" : "conn_fake_s");
if (dir == CONN_DIR_DOWNSTREAM)
{
TFE_PROXY_STAT_INCREASE(STAT_STEERING_CLIENT_TX_B, rx_offset_increase);
}
else
{
TFE_PROXY_STAT_INCREASE(STAT_STEERING_SERVER_TX_B, rx_offset_increase);
}
} }
if(_stream->need_to_be_kill) if(_stream->need_to_be_kill)
@@ -819,10 +860,6 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg)
if (_stream->is_decrypted_traffic_steering) if (_stream->is_decrypted_traffic_steering)
{ {
// TODO 增加计数
TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s %s run writecb",
_stream->str_stream_addr,
bev == _stream->conn_downstream->bev ? "conn_downstream" : "conn_upstream");
return; return;
} }
@@ -1068,16 +1105,13 @@ static void __steering_stream_bev_readcb(struct bufferevent * bev, void * arg)
struct tfe_stream_private * _stream = (struct tfe_stream_private *)arg; struct tfe_stream_private * _stream = (struct tfe_stream_private *)arg;
struct tfe_conn_private * peer_conn = NULL; struct tfe_conn_private * peer_conn = NULL;
enum TFE_STAT_FIELD stat_filed = TFE_STAT_MAX;
if (bev == _stream->conn_fake_c->bev) if (bev == _stream->conn_fake_c->bev)
{ {
peer_conn = _stream->conn_downstream; peer_conn = _stream->conn_downstream;
stat_filed = STAT_STEERING_CLIENT_RX_B;
} }
else if (bev == _stream->conn_fake_s->bev) else if (bev == _stream->conn_fake_s->bev)
{ {
peer_conn = _stream->conn_upstream; peer_conn = _stream->conn_upstream;
stat_filed = STAT_STEERING_SERVER_RX_B;
} }
else else
{ {
@@ -1088,32 +1122,22 @@ static void __steering_stream_bev_readcb(struct bufferevent * bev, void * arg)
* Peer connection is terminated, drain all data. * Peer connection is terminated, drain all data.
* This connection will be destoryed in __event_cb * This connection will be destoryed in __event_cb
*/ */
struct evbuffer * inbuf = bufferevent_get_input(bev); struct evbuffer *inbuf = bufferevent_get_input(bev);
int inlen = evbuffer_get_length(inbuf);
if (peer_conn == NULL) if (peer_conn == NULL)
{ {
evbuffer_drain(inbuf, evbuffer_get_length(inbuf)); evbuffer_drain(inbuf, inlen);
return; return;
} }
TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s send %d bytes form %s to %s", steering_in_log(_stream, inlen, bev == _stream->conn_fake_c->bev);
_stream->str_stream_addr, struct evbuffer *outbuf = bufferevent_get_output(peer_conn->bev);
evbuffer_get_length(inbuf),
bev == _stream->conn_fake_c->bev ? "conn_fake_c" : "conn_fake_s",
bev == _stream->conn_fake_c->bev ? "conn_downstream" : "conn_upstream"
);
TFE_PROXY_STAT_INCREASE(stat_filed, evbuffer_get_length(inbuf));
struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev);
evbuffer_add_buffer(outbuf, inbuf); evbuffer_add_buffer(outbuf, inbuf);
} }
static void __steering_stream_bev_writecb(struct bufferevent * bev, void * arg) static void __steering_stream_bev_writecb(struct bufferevent * bev, void * arg)
{ {
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; //struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s %s run writecb",
_stream->str_stream_addr,
bev == _stream->conn_fake_c->bev ? "conn_fake_c" : "conn_fake_s");
} }
static void __steering_stream_bev_eventcb(struct bufferevent *bev, short events, void *arg) static void __steering_stream_bev_eventcb(struct bufferevent *bev, short events, void *arg)
@@ -1330,10 +1354,10 @@ void __stream_access_log_write(struct tfe_stream_private * stream)
"%s/%s/%s ", __str_stream_log_type(ev_log->type), str_dir, ev_log->str_error); "%s/%s/%s ", __str_stream_log_type(ev_log->type), str_dir, ev_log->str_error);
} }
TFE_LOG_INFO(stream->stream_logger, "access", TFE_LOG_INFO(stream->stream_logger, "access %d %d %d %s %s %s %s %s %s",
"%d %d %d %s %s %s %s %s %s", stream->log_fd_downstream, stream->log_fd_upstream, stream->keyring_id, stream->log_fd_downstream, stream->log_fd_upstream, stream->keyring_id,
stream->str_stream_addr, str_passthrough, str_kill, str_log_event, stream->str_stream_addr, str_passthrough, str_kill, str_log_event,
stream->ssl_downstream_info_dump, stream->ssl_upstream_info_dump); stream->ssl_downstream_info_dump, stream->ssl_upstream_info_dump);
} }
static int ev_log_to_stat_map[__EVENT_LOG_CLOSE_MAX][__CONN_DIR_MAX]{{-1}}; static int ev_log_to_stat_map[__EVENT_LOG_CLOSE_MAX][__CONN_DIR_MAX]{{-1}};
@@ -1744,8 +1768,8 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst
if (_stream->is_decrypted_traffic_steering) if (_stream->is_decrypted_traffic_steering)
{ {
__stream_fd_option_setup(_stream, fd_fake_s, CONN_DIR_DOWNSTREAM); __stream_fd_option_setup(_stream, fd_fake_s, CONN_DIR_UPSTREAM);
__stream_fd_option_setup(_stream, fd_fake_c, CONN_DIR_UPSTREAM); __stream_fd_option_setup(_stream, fd_fake_c, CONN_DIR_DOWNSTREAM);
_stream->conn_fake_s = __conn_private_create_by_fake_fd(_stream, fd_fake_s); _stream->conn_fake_s = __conn_private_create_by_fake_fd(_stream, fd_fake_s);
if (_stream->conn_fake_s == NULL) if (_stream->conn_fake_s == NULL)
@@ -1875,7 +1899,7 @@ void tfe_stream_write_access_log(const struct tfe_stream * stream, int level, co
vasprintf(&__tmp_buffer, fmt, arg_ptr); vasprintf(&__tmp_buffer, fmt, arg_ptr);
/* Log content with stream tag */ /* Log content with stream tag */
TFE_LOG_INFO(_stream->stream_logger, "access", "%s %s", _stream->str_stream_addr, __tmp_buffer); TFE_LOG_INFO(_stream->stream_logger, "access %s %s", _stream->str_stream_addr, __tmp_buffer);
free(__tmp_buffer); free(__tmp_buffer);
} }