Decrypted Traffic Steering 回流回注测试通过
This commit is contained in:
@@ -583,13 +583,48 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
|
||||
{
|
||||
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
|
||||
enum tfe_conn_dir dir = __bev_dir(_stream, bev);
|
||||
struct tfe_conn_private * this_conn = NULL;
|
||||
struct tfe_conn_private * peer_conn = NULL;
|
||||
struct evbuffer * inbuf = NULL;
|
||||
struct evbuffer * outbuf = NULL;
|
||||
|
||||
UNUSED struct tfe_conn_private * this_conn = __this_conn(_stream, dir);
|
||||
struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir);
|
||||
if (_stream->proxy_ref->traffic_steering_options.enable)
|
||||
{
|
||||
if (bev == _stream->conn_downstream->bev)
|
||||
{
|
||||
this_conn = _stream->conn_downstream;
|
||||
peer_conn = _stream->conn_fake_c;
|
||||
}
|
||||
else if (bev == _stream->conn_upstream->bev)
|
||||
{
|
||||
this_conn = _stream->conn_upstream;
|
||||
peer_conn = _stream->conn_fake_s;
|
||||
}
|
||||
else
|
||||
{
|
||||
assert(0);
|
||||
}
|
||||
|
||||
inbuf = bufferevent_get_input(bev);
|
||||
int data_len = evbuffer_get_length(inbuf);
|
||||
outbuf = bufferevent_get_output(peer_conn->bev);
|
||||
assert(inbuf != NULL && outbuf != NULL);
|
||||
evbuffer_add_buffer(outbuf, inbuf);
|
||||
|
||||
// TODO 增加计数
|
||||
|
||||
TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, send %d bytes form %s to %s",
|
||||
data_len, bev == _stream->conn_downstream->bev ? "conn_downstream" : "conn_upstream",
|
||||
bev == _stream->conn_downstream->bev ? "conn_fake_c" : "conn_fake_s");
|
||||
|
||||
return;
|
||||
}
|
||||
this_conn = __this_conn(_stream, dir);
|
||||
peer_conn = __peer_conn(_stream, dir);
|
||||
|
||||
/* Peer connection is terminated, drain all data.
|
||||
* This connection will be destoryed in __event_cb */
|
||||
struct evbuffer * inbuf = bufferevent_get_input(bev);
|
||||
inbuf = bufferevent_get_input(bev);
|
||||
size_t contigous_len = evbuffer_get_length(inbuf);
|
||||
if (peer_conn == NULL)
|
||||
{
|
||||
@@ -603,7 +638,7 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
|
||||
_stream->is_first_call_rxcb = 1;
|
||||
}
|
||||
|
||||
struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev);
|
||||
outbuf = bufferevent_get_output(peer_conn->bev);
|
||||
assert(inbuf != NULL && outbuf != NULL);
|
||||
|
||||
enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA;
|
||||
@@ -737,6 +772,14 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg)
|
||||
struct tfe_conn_private ** ref_this_conn{};
|
||||
struct tfe_conn_private ** ref_peer_conn{};
|
||||
struct ssl_stream ** ref_this_ssl_stream{};
|
||||
|
||||
if (_stream->proxy_ref->traffic_steering_options.enable)
|
||||
{
|
||||
// TODO 增加计数
|
||||
TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s run writecb", bev == _stream->conn_downstream->bev ? "conn_downstream" : "conn_upstream");
|
||||
return;
|
||||
}
|
||||
|
||||
enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev);
|
||||
|
||||
if (conn_dir == CONN_DIR_UPSTREAM)
|
||||
@@ -793,6 +836,13 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
|
||||
enum tfe_conn_dir peer_conn_dir{};
|
||||
size_t rx_offset = 0;
|
||||
|
||||
if (_stream->proxy_ref->traffic_steering_options.enable)
|
||||
{
|
||||
// TODO 增加计数
|
||||
TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s run eventcb", bev == _stream->conn_downstream->bev ? "conn_downstream" : "conn_upstream");
|
||||
return;
|
||||
}
|
||||
|
||||
if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
|
||||
{
|
||||
ref_this_conn = &_stream->conn_upstream;
|
||||
@@ -976,6 +1026,200 @@ __errout:
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void __steering_stream_bev_readcb(struct bufferevent * bev, void * arg)
|
||||
{
|
||||
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
|
||||
struct tfe_conn_private * peer_conn = NULL;
|
||||
|
||||
if (bev == _stream->conn_fake_c->bev)
|
||||
{
|
||||
peer_conn = _stream->conn_downstream;
|
||||
}
|
||||
else if (bev == _stream->conn_fake_s->bev)
|
||||
{
|
||||
peer_conn = _stream->conn_upstream;
|
||||
}
|
||||
else
|
||||
{
|
||||
assert(0);
|
||||
}
|
||||
|
||||
struct evbuffer * __input_buffer = bufferevent_get_input(bev);
|
||||
if (peer_conn == NULL)
|
||||
{
|
||||
evbuffer_drain(__input_buffer, evbuffer_get_length(__input_buffer));
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO 增加计数
|
||||
|
||||
TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, send %d bytes form %s to %s",
|
||||
evbuffer_get_length(__input_buffer), bev == _stream->conn_fake_c->bev ? "conn_fake_c" : "conn_fake_s",
|
||||
bev == _stream->conn_fake_c->bev ? "conn_downstream" : "conn_upstream"
|
||||
);
|
||||
|
||||
struct evbuffer * __output_buffer = bufferevent_get_output(peer_conn->bev);
|
||||
evbuffer_add_buffer(__output_buffer, __input_buffer);
|
||||
}
|
||||
|
||||
static void __steering_stream_bev_writecb(struct bufferevent * bev, void * arg)
|
||||
{
|
||||
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
|
||||
struct tfe_conn_private ** ref_this_conn{};
|
||||
struct tfe_conn_private ** ref_peer_conn{};
|
||||
|
||||
if (bev == _stream->conn_fake_c->bev)
|
||||
{
|
||||
ref_this_conn = &_stream->conn_fake_c;
|
||||
ref_peer_conn = &_stream->conn_downstream;
|
||||
}
|
||||
else if (bev == _stream->conn_fake_s->bev)
|
||||
{
|
||||
ref_this_conn = &_stream->conn_fake_s;
|
||||
ref_peer_conn = &_stream->conn_upstream;
|
||||
}
|
||||
else
|
||||
{
|
||||
assert(0);
|
||||
}
|
||||
|
||||
TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s run write cb", bev == _stream->conn_fake_c->bev ? "conn_fake_c" : "conn_fake_s");
|
||||
|
||||
struct evbuffer * __output_buffer = bufferevent_get_output(bev);
|
||||
assert(__output_buffer != NULL);
|
||||
|
||||
// TODO 资源释放
|
||||
// TODO 资源释放
|
||||
// TODO 资源释放
|
||||
// TODO 资源释放
|
||||
|
||||
if (*ref_peer_conn == NULL && evbuffer_get_length(__output_buffer) == 0)
|
||||
{
|
||||
__conn_private_destory(*ref_this_conn);
|
||||
*ref_this_conn = NULL;
|
||||
}
|
||||
|
||||
if (*ref_peer_conn == NULL && *ref_this_conn == NULL)
|
||||
{
|
||||
// TODO call_plugin_close(_stream);
|
||||
tfe_stream_destory(_stream);
|
||||
}
|
||||
}
|
||||
|
||||
static void __steering_stream_bev_eventcb(struct bufferevent *bev, short events, void *arg)
|
||||
{
|
||||
struct tfe_stream_private *_stream = (struct tfe_stream_private *)arg;
|
||||
struct tfe_conn_private **ref_this_conn{};
|
||||
struct tfe_conn_private **ref_peer_conn{};
|
||||
|
||||
if (bev == _stream->conn_fake_c->bev)
|
||||
{
|
||||
ref_this_conn = &_stream->conn_fake_c;
|
||||
ref_peer_conn = &_stream->conn_downstream;
|
||||
}
|
||||
else if (bev == _stream->conn_fake_s->bev)
|
||||
{
|
||||
ref_this_conn = &_stream->conn_fake_s;
|
||||
ref_peer_conn = &_stream->conn_upstream;
|
||||
}
|
||||
else
|
||||
{
|
||||
assert(0);
|
||||
}
|
||||
|
||||
TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s run event cb", bev == _stream->conn_fake_c->bev ? "conn_fake_c" : "conn_fake_s");
|
||||
|
||||
if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF)
|
||||
{
|
||||
if (evbuffer_get_length(bufferevent_get_input(bev)))
|
||||
{
|
||||
__steering_stream_bev_readcb(bev, arg);
|
||||
}
|
||||
|
||||
if (events & BEV_EVENT_ERROR)
|
||||
{
|
||||
unsigned long err;
|
||||
while ((err = (bufferevent_get_openssl_error(bev))))
|
||||
{
|
||||
const char *msg = (const char *)ERR_reason_error_string(err);
|
||||
const char *lib = (const char *)ERR_lib_error_string(err);
|
||||
const char *func = (const char *)ERR_func_error_string(err);
|
||||
TFE_LOG_INFO(g_default_logger, "%s connection error, bufferevent_get_openssl_error() = %lu: %s %s %s", _stream->str_stream_addr, err, lib, func, msg);
|
||||
}
|
||||
|
||||
if (errno)
|
||||
{
|
||||
TFE_LOG_INFO(g_default_logger, "%s connection error, errno = %d, %s", _stream->str_stream_addr, errno, strerror(errno));
|
||||
}
|
||||
}
|
||||
|
||||
goto __close_connection;
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
__close_connection:
|
||||
// TODO 资源释放
|
||||
// TODO 资源释放
|
||||
// TODO 资源释放
|
||||
// TODO 资源释放
|
||||
if (*ref_peer_conn != NULL)
|
||||
{
|
||||
struct bufferevent *__peer_bev = (*ref_peer_conn)->bev;
|
||||
struct evbuffer *__peer_output_buffer = bufferevent_get_output(__peer_bev);
|
||||
|
||||
if (evbuffer_get_length(__peer_output_buffer) == 0)
|
||||
{
|
||||
__conn_private_destory(*ref_peer_conn);
|
||||
*ref_peer_conn = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (*ref_this_conn != NULL)
|
||||
{
|
||||
__conn_private_destory(*ref_this_conn);
|
||||
*ref_this_conn = NULL;
|
||||
}
|
||||
|
||||
if (*ref_this_conn == NULL && *ref_peer_conn == NULL)
|
||||
{
|
||||
// TODO call_plugin_close(_stream);
|
||||
tfe_stream_destory(_stream);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
static tfe_conn_private *__conn_private_create_by_fake_fd(struct tfe_stream_private *stream, evutil_socket_t fd)
|
||||
{
|
||||
struct tfe_conn_private *__conn_private = ALLOC(struct tfe_conn_private, 1);
|
||||
__conn_private->fd = fd;
|
||||
__conn_private->_stream_ref = stream;
|
||||
__conn_private->bev = bufferevent_socket_new(stream->thread_ref->evbase, fd, BEV_OPT_DEFER_CALLBACKS | BEV_OPT_THREADSAFE);
|
||||
if (!__conn_private->bev)
|
||||
{
|
||||
TFE_LOG_ERROR(__STREAM_LOGGER(stream), "Failed at creating bufferevent for fd %d", fd);
|
||||
goto __errout;
|
||||
}
|
||||
|
||||
bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE);
|
||||
bufferevent_setcb(__conn_private->bev, __steering_stream_bev_readcb, __steering_stream_bev_writecb, __steering_stream_bev_eventcb, stream);
|
||||
if (stream->proxy_ref->en_rate_limit)
|
||||
{
|
||||
conn_private_ratelimit_setup(__conn_private, &stream->proxy_ref->rate_limit_options);
|
||||
}
|
||||
|
||||
return __conn_private;
|
||||
|
||||
__errout:
|
||||
if (__conn_private != NULL)
|
||||
{
|
||||
free(__conn_private);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void __conn_private_enable(struct tfe_conn_private * conn_private)
|
||||
{
|
||||
assert(conn_private != NULL && conn_private->bev != NULL);
|
||||
@@ -1007,6 +1251,12 @@ void ssl_downstream_create_on_success(future_result_t * result, void * user)
|
||||
__conn_private_enable(_stream->conn_downstream);
|
||||
__conn_private_enable(_stream->conn_upstream);
|
||||
|
||||
if (_stream->proxy_ref->traffic_steering_options.enable)
|
||||
{
|
||||
__conn_private_enable(_stream->conn_fake_c);
|
||||
__conn_private_enable(_stream->conn_fake_s);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1183,6 +1433,28 @@ void tfe_stream_destory(struct tfe_stream_private * stream)
|
||||
__conn_private_destory(stream->conn_downstream);
|
||||
}
|
||||
|
||||
if (stream->conn_fake_c)
|
||||
{
|
||||
assert(stream->fd_fake_c <= 0);
|
||||
__conn_private_destory(stream->conn_fake_c);
|
||||
}
|
||||
|
||||
if (stream->conn_fake_s)
|
||||
{
|
||||
assert(stream->fd_fake_s <= 0);
|
||||
__conn_private_destory(stream->conn_fake_s);
|
||||
}
|
||||
|
||||
if (stream->fd_fake_c)
|
||||
{
|
||||
evutil_closesocket(stream->fd_fake_c);
|
||||
}
|
||||
|
||||
if (stream->fd_fake_s)
|
||||
{
|
||||
evutil_closesocket(stream->fd_fake_s);
|
||||
}
|
||||
|
||||
if (stream->defer_fd_downstream)
|
||||
{
|
||||
evutil_closesocket(stream->defer_fd_downstream);
|
||||
@@ -1458,7 +1730,7 @@ void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket
|
||||
|
||||
}
|
||||
|
||||
int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream)
|
||||
int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream, evutil_socket_t fd_fake_c, evutil_socket_t fd_fake_s)
|
||||
{
|
||||
struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);
|
||||
|
||||
@@ -1466,6 +1738,8 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst
|
||||
_stream->defer_fd_upstream = fd_upstream;
|
||||
_stream->log_fd_downstream = fd_downstream;
|
||||
_stream->log_fd_upstream = fd_upstream;
|
||||
_stream->fd_fake_c = fd_fake_c;
|
||||
_stream->fd_fake_s = fd_fake_s;
|
||||
|
||||
_stream->head.addr = tfe_stream_addr_create_by_fd(fd_downstream, CONN_DIR_DOWNSTREAM);
|
||||
if (unlikely(_stream->head.addr == NULL))
|
||||
@@ -1480,6 +1754,35 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst
|
||||
__stream_fd_option_setup(_stream, fd_downstream, CONN_DIR_DOWNSTREAM);
|
||||
__stream_fd_option_setup(_stream, fd_upstream, CONN_DIR_UPSTREAM);
|
||||
|
||||
if (_stream->proxy_ref->traffic_steering_options.enable)
|
||||
{
|
||||
__stream_fd_option_setup(_stream, fd_fake_s, CONN_DIR_DOWNSTREAM);
|
||||
__stream_fd_option_setup(_stream, fd_fake_c, CONN_DIR_UPSTREAM);
|
||||
|
||||
_stream->conn_fake_s = __conn_private_create_by_fake_fd(_stream, fd_fake_s);
|
||||
if (_stream->conn_fake_s == NULL)
|
||||
{
|
||||
goto __errout;
|
||||
}
|
||||
_stream->fd_fake_s = 0;
|
||||
|
||||
_stream->conn_fake_c = __conn_private_create_by_fake_fd(_stream, fd_fake_c);
|
||||
if (_stream->conn_fake_c == NULL)
|
||||
{
|
||||
goto __errout;
|
||||
}
|
||||
_stream->fd_fake_c = 0;
|
||||
|
||||
assert(_stream->conn_fake_s != NULL);
|
||||
assert(_stream->conn_fake_c != NULL);
|
||||
|
||||
// enable on upsteam and downsteam success
|
||||
// __conn_private_enable(_stream->conn_fake_s);
|
||||
// __conn_private_enable(_stream->conn_fake_c);
|
||||
|
||||
// TFE_PROXY_STAT_INCREASE(STAT_STREAM_STEERING, 1);
|
||||
}
|
||||
|
||||
if (_stream->session_type == STREAM_PROTO_PLAIN)
|
||||
{
|
||||
_stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream);
|
||||
@@ -1508,6 +1811,12 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst
|
||||
__conn_private_enable(_stream->conn_downstream);
|
||||
__conn_private_enable(_stream->conn_upstream);
|
||||
|
||||
if (_stream->proxy_ref->traffic_steering_options.enable)
|
||||
{
|
||||
__conn_private_enable(_stream->conn_fake_s);
|
||||
__conn_private_enable(_stream->conn_fake_c);
|
||||
}
|
||||
|
||||
TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_PLAIN, 1);
|
||||
}
|
||||
|
||||
@@ -1525,6 +1834,7 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst
|
||||
TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_SSL, 1);
|
||||
}
|
||||
|
||||
|
||||
return 0;
|
||||
|
||||
__errout:
|
||||
|
||||
Reference in New Issue
Block a user