This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/platform/src/tcp_stream.cpp

649 lines
18 KiB
C++
Raw Normal View History

#include <netinet/in.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/bufferevent_ssl.h>
#include <event2/buffer.h>
#include <event2/thread.h>
#include <event2/dns.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <openssl/rand.h>
#include <openssl/x509.h>
#include <openssl/x509v3.h>
#include <tfe_stream.h>
#include <tfe_utils.h>
#include <tfe_future.h>
#include <tfe_plugin.h>
#include <platform.h>
#include <ssl_stream.h>
#include <tcp_stream.h>
#include <proxy.h>
#ifndef TFE_CONFIG_OUTPUT_LIMIT_DEFAULT
#define TFE_CONFIG_OUTPUT_LIMIT_DEFAULT (1024 * 1024)
#endif
/* forward declaration of libevent callbacks */
static void __stream_bev_readcb(struct bufferevent *, void *);
static void __stream_bev_writecb(struct bufferevent *, void *);
static void __stream_bev_eventcb(struct bufferevent *, short, void *);
static inline struct tfe_stream_private * __TO_STREAM_PRIVATE(const struct tfe_stream * stream)
{
return container_of(stream, struct tfe_stream_private, head);
}
static inline struct tfe_conn_private * __THIS_CONN(struct tfe_stream_private * _stream, enum tfe_conn_dir dir)
{
return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream));
}
static inline struct tfe_conn_private * __PEER_CONN(struct tfe_stream_private * _stream, enum tfe_conn_dir dir)
{
return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream));
}
static inline enum tfe_conn_dir __BEV_DIR(struct tfe_stream_private * _stream, struct bufferevent * bev)
{
return ((bev == _stream->conn_downstream->bev) ? CONN_DIR_DOWNSTREAM : CONN_DIR_UPSTREAM);
}
static inline bool __IS_SSL(struct tfe_stream_private * _stream)
{
return (_stream->session_type == STREAM_PROTO_SSL);
}
void tfe_stream_detach(const struct tfe_stream * stream)
{
struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream);
int plug_id = _stream->calling_idx;
_stream->plug_ctx[plug_id].state = PLUG_STATE_DETACHED;
return;
}
int tfe_stream_preempt(const struct tfe_stream * stream)
{
struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream);
int plug_id = _stream->calling_idx;
int i = 0;
for (i = 0; i < _stream->plugin_num; i++)
{
if (_stream->plug_ctx[i].state == PLUG_STATE_PREEPTION)
{
return -1;
}
}
_stream->plug_ctx[plug_id].state = PLUG_STATE_PREEPTION;
return 0;
}
struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir)
{
struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream);
struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir);
struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir);
if (this_conn->on_writing == 1)
{
return NULL;
}
this_conn->w_ctx.dir = dir;
this_conn->w_ctx._stream = _stream;
this_conn->on_writing = 1;
bufferevent_disable(peer_conn->bev, EV_READ);
return &(this_conn->w_ctx);
}
int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size)
{
struct tfe_conn_private * this_conn = __THIS_CONN(w_ctx->_stream, w_ctx->dir);;
int ret = bufferevent_write(this_conn->bev, data, size);
return ret;
}
void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx)
{
struct tfe_conn_private * this_conn = __THIS_CONN(w_ctx->_stream, w_ctx->dir);
struct tfe_conn_private * peer_conn = __PEER_CONN(w_ctx->_stream, w_ctx->dir);
this_conn->on_writing = 0;
bufferevent_enable(peer_conn->bev, EV_READ);
return;
}
int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, const unsigned char * data, size_t size)
{
int ret = 0;
struct tfe_stream_write_ctx * wctx = tfe_stream_write_frag_start(stream, dir);
ret = tfe_stream_write_frag(wctx, data, size);
tfe_stream_write_frag_end(wctx);
return ret;
}
2018-09-05 10:38:27 +08:00
static tfe_conn_private * __conn_private_create_by_bev(struct tfe_stream_private * stream, struct bufferevent * bev)
{
struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1);
__conn_private->bev = bev;
__conn_private->fd = bufferevent_getfd(bev);
bufferevent_setcb(__conn_private->bev, __stream_bev_readcb, __stream_bev_writecb, __stream_bev_eventcb, stream);
bufferevent_enable(__conn_private->bev, EV_READ | EV_WRITE);
return __conn_private;
}
evutil_socket_t __conn_private_release_fd(struct tfe_conn_private * conn)
{
evutil_socket_t __to_release_fd = conn->fd;
conn->fd = 0;
return __to_release_fd;
}
static void __conn_private_destory(struct tfe_conn_private * conn)
{
bufferevent_disable(conn->bev, EV_READ | EV_WRITE);
bufferevent_free(conn->bev);
if (conn->fd > 0) evutil_closesocket(conn->fd);
free(conn);
}
static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, __BEV_DIR(_stream, bev));
struct evbuffer * __input_buffer = bufferevent_get_input(bev);
if (peer_conn == NULL)
{
evbuffer_drain(__input_buffer, evbuffer_get_length(__input_buffer));
return;
}
struct evbuffer * __output_buffer = bufferevent_get_output(peer_conn->bev);
evbuffer_add_buffer(__output_buffer, __input_buffer);
}
static void __stream_bev_passthrough_writecb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
struct tfe_conn_private ** ref_this_conn{};
struct tfe_conn_private ** ref_peer_conn{};
if (__BEV_DIR(_stream, bev) == CONN_DIR_UPSTREAM)
{
ref_this_conn = &_stream->conn_upstream;
ref_peer_conn = &_stream->conn_downstream;
}
if (__BEV_DIR(_stream, bev) == CONN_DIR_DOWNSTREAM)
{
ref_this_conn = &_stream->conn_downstream;
ref_peer_conn = &_stream->conn_upstream;
}
struct evbuffer * __output_buffer = bufferevent_get_output(bev);
assert(__output_buffer != NULL);
if (*ref_peer_conn == NULL && evbuffer_get_length(__output_buffer) == 0)
{
__conn_private_destory(*ref_this_conn);
*ref_this_conn = NULL;
}
if (*ref_peer_conn == NULL && *ref_this_conn == NULL)
{
tfe_stream_destory(_stream);
}
return;
}
static void __stream_bev_passthrough_eventcb(struct bufferevent * bev, short events, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
struct tfe_conn_private ** ref_this_conn{};
struct tfe_conn_private ** ref_peer_conn{};
if (__BEV_DIR(_stream, bev) == CONN_DIR_UPSTREAM)
{
ref_this_conn = &_stream->conn_upstream;
ref_peer_conn = &_stream->conn_downstream;
}
if (__BEV_DIR(_stream, bev) == CONN_DIR_DOWNSTREAM)
{
ref_this_conn = &_stream->conn_downstream;
ref_peer_conn = &_stream->conn_upstream;
}
if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF)
{
if (evbuffer_get_length(bufferevent_get_input(bev)))
{
__stream_bev_passthrough_readcb(bev, arg);
}
goto __close_connection;
}
return;
__close_connection:
if (*ref_peer_conn != NULL)
{
struct bufferevent * __peer_bev = (*ref_peer_conn)->bev;
struct evbuffer * __peer_output_buffer = bufferevent_get_output(__peer_bev);
if (evbuffer_get_length(__peer_output_buffer) == 0)
{
__conn_private_destory(*ref_peer_conn);
*ref_peer_conn = NULL;
}
}
if (*ref_this_conn != NULL)
{
__conn_private_destory(*ref_this_conn);
*ref_this_conn = NULL;
}
if (*ref_this_conn == NULL && *ref_peer_conn == NULL)
{
tfe_stream_destory(_stream);
}
return;
}
/*
* Callback for read events on the up- and downstream connection bufferevents.
* Called when there is data ready in the input evbuffer.
*/
static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = __BEV_DIR(_stream, bev);
struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir);
2018-09-05 10:38:27 +08:00
int i = 0;
enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA, action_final = ACTION_FORWARD_DATA;
const struct tfe_plugin * plugins = _stream->thread_ref->modules;
struct plugin_ctx * plug_ctx = NULL;
int plug_num = _stream->thread_ref->nr_modules;
struct evbuffer * inbuf = bufferevent_get_input(bev);
struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev);
size_t contigous_len = evbuffer_get_length(inbuf), drain_size = 0;
const unsigned char * contiguous_data = (const unsigned char *) evbuffer_pullup(inbuf, contigous_len);
_stream->defere_bytes = 0;
_stream->drop_bytes = 0;
_stream->forward_bytes = 0;
for (i = 0; i < plug_num; i++)
{
_stream->calling_idx = i;
plug_ctx = _stream->plug_ctx + i;
if (_stream->is_plugin_opened == 0)
{
plugins[i].on_open(&_stream->head, _stream->thread_ref->thread_id, dir, &(plug_ctx->pme));
_stream->is_plugin_opened = 1;
}
else
{
action_tmp = plugins[i].on_data(&_stream->head, _stream->thread_ref->thread_id,
dir, contiguous_data, contigous_len, &(plug_ctx->pme));
}
if (plug_ctx->state == PLUG_STATE_PREEPTION)
{
action_final = action_tmp;
}
}
switch (action_final)
{
case ACTION_FORWARD_DATA:
if (_stream->forward_bytes > 0)
{
evbuffer_remove_buffer(inbuf, outbuf, _stream->forward_bytes);
}
else
{
evbuffer_add_buffer(outbuf, inbuf);
}
break;
case ACTION_DROP_DATA:
if (_stream->drop_bytes > 0)
{
drain_size = _stream->drop_bytes;
}
else
{
drain_size = evbuffer_get_length(inbuf);
}
evbuffer_drain(inbuf, drain_size);
case ACTION_DEFER_DATA:
if (_stream->defere_bytes > 0)
{
bufferevent_setwatermark(bev, EV_WRITE, _stream->defere_bytes, 0);
}
break;
default: assert(0);
break;
}
if (evbuffer_get_length(inbuf) != 0)
{
bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS);
}
if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT)
{
bufferevent_setwatermark(peer_conn->bev, EV_WRITE, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT / 2,
TFE_CONFIG_OUTPUT_LIMIT_DEFAULT);
bufferevent_disable(bev, EV_READ);
}
return;
}
/*
* Callback for write events on the up- and downstream connection bufferevents.
* Called when either all data from the output evbuffer has been written,
* or if the outbuf is only half full again after having been full.
*/
static void __stream_bev_writecb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = __BEV_DIR(_stream, bev);
struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir);
2018-09-05 10:38:27 +08:00
// struct evbuffer * outbuf = bufferevent_get_output(bev);
if (peer_conn->bev && !(bufferevent_get_enabled(peer_conn->bev) & EV_READ))
{
/* data source temporarily disabled;
* re-enable and reset watermark to 0. */
bufferevent_setwatermark(bev, EV_WRITE, 0, 0);
bufferevent_enable(peer_conn->bev, EV_READ);
}
}
/*
* Callback for meta events on the up- and downstream connection bufferevents.
* Called when EOF has been reached, a connection has been made, and on errors.
*/
static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = __BEV_DIR(_stream, bev);
struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir);
struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir);
const struct tfe_plugin * plugins = _stream->thread_ref->modules;
struct plugin_ctx * plug_ctx = NULL;
int plug_num = _stream->thread_ref->nr_modules, i = 0;
enum tfe_stream_close_reason reason = REASON_PASSIVE_CLOSED;
if (events & BEV_EVENT_ERROR)
{
this_conn->closed = 1;
reason = REASON_ERROR;
if(__IS_SSL(_stream))
{
ssl_stream_log_error(bev, dir, __STREAM_LOGGER(_stream));
}
goto call_plugin_close;
}
if (events & BEV_EVENT_EOF)
{
//generate a 0 size read callback to notify plugins.
__stream_bev_readcb(bev, arg);
this_conn->closed = 1;
}
if (peer_conn->closed == 1 && this_conn->closed == 1)
{
reason = REASON_PASSIVE_CLOSED;
goto call_plugin_close;
}
return;
call_plugin_close:
for (i = 0; i < plug_num; i++)
{
_stream->calling_idx = i;
plug_ctx = _stream->plug_ctx + i;
plugins[i].on_close(&(_stream->head), _stream->thread_ref->thread_id, reason, &(plug_ctx->pme));
}
tfe_stream_destory(_stream);
return;
}
2018-09-05 10:38:27 +08:00
static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private * stream, evutil_socket_t fd)
{
struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1);
struct event_base * __ev_base = stream->thread_ref->evbase;
__conn_private->bev = bufferevent_socket_new(__ev_base, fd, BEV_OPT_DEFER_CALLBACKS);
__conn_private->fd = fd;
if (!__conn_private->bev)
{
TFE_LOG_ERROR(__STREAM_LOGGER(stream), "Failed at creating bufferevent for fd %d", fd);
goto __errout;
}
if (stream->passthough)
{
bufferevent_setcb(__conn_private->bev, __stream_bev_passthrough_readcb,
__stream_bev_passthrough_writecb, __stream_bev_passthrough_eventcb, stream);
}
else
{
bufferevent_setcb(__conn_private->bev, __stream_bev_readcb,
__stream_bev_writecb, __stream_bev_eventcb, stream);
}
return __conn_private;
__errout:
if (__conn_private != NULL) free(__conn_private);
return NULL;
}
2018-08-31 14:32:34 +08:00
void __conn_private_enable(struct tfe_conn_private * conn_private)
{
assert(conn_private != NULL && conn_private->bev != NULL);
bufferevent_enable(conn_private->bev, EV_READ | EV_WRITE);
}
void ssl_downstream_create_on_success(future_result_t * result, void * user)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) user;
struct ssl_stream * downstream = ssl_downstream_create_result_release_stream(result);
struct bufferevent * bev = ssl_downstream_create_result_release_bev(result);
2018-09-05 10:38:27 +08:00
_stream->conn_downstream = __conn_private_create_by_bev(_stream, bev);
_stream->ssl_downstream = downstream;
future_destroy(_stream->future_downstream_create);
_stream->future_downstream_create = NULL;
_stream->defer_fd_downstream = 0;
2018-08-31 14:32:34 +08:00
assert(_stream->conn_downstream != NULL && _stream->conn_upstream != NULL);
__conn_private_enable(_stream->conn_downstream);
__conn_private_enable(_stream->conn_upstream);
return;
}
void ssl_downstream_create_on_fail(enum e_future_error err, const char * what, void * user)
{
return;
}
void ssl_upstream_create_on_success(future_result_t * result, void * user)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) user;
struct event_base * ev_base = _stream->thread_ref->evbase;
struct ssl_stream * upstream = ssl_upstream_create_result_release_stream(result);
struct bufferevent * bev = ssl_upstream_create_result_release_bev(result);
assert(upstream != NULL && bev != NULL);
/* Create connection ctx by bev */
2018-09-05 10:38:27 +08:00
_stream->conn_upstream = __conn_private_create_by_bev(_stream, bev);
_stream->ssl_upstream = upstream;
future_destroy(_stream->future_upstream_create);
_stream->future_upstream_create = NULL;
_stream->defer_fd_upstream = 0;
/* Next, create downstream */
_stream->future_downstream_create = future_create("ssl_down",ssl_downstream_create_on_success,
ssl_downstream_create_on_fail, _stream);
ssl_async_downstream_create(_stream->future_downstream_create, _stream->ssl_mgr,
_stream->ssl_upstream, _stream->defer_fd_downstream, /* KEYRING ID */ 0, ev_base);
}
void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, void * user)
{
assert(0);
}
struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx)
{
struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1);
_stream->thread_ref = thread_ctx;
_stream->proxy_ref = pxy;
return (struct tfe_stream *) &_stream->head;
}
void tfe_stream_destory(struct tfe_stream_private * stream)
{
struct tfe_thread_ctx * thread = stream->thread_ref;
struct event_base * ev_base = thread->evbase;
if (__IS_SSL(stream) && stream->ssl_upstream)
{
evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_upstream);
ssl_stream_free_and_close_fd(stream->ssl_upstream, ev_base, __to_closed_fd);
}
if (__IS_SSL(stream) && stream->ssl_downstream)
{
2018-09-04 18:13:05 +08:00
evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_downstream);
ssl_stream_free_and_close_fd(stream->ssl_downstream, ev_base, __to_closed_fd);
}
if (stream->conn_upstream)
{
__conn_private_destory(stream->conn_upstream);
}
if (stream->conn_downstream)
{
__conn_private_destory(stream->conn_downstream);
}
if (stream->defer_fd_downstream)
{
evutil_closesocket(stream->defer_fd_downstream);
}
if (stream->defer_fd_upstream)
{
evutil_closesocket(stream->defer_fd_upstream);
}
if (stream->future_downstream_create)
{
future_destroy(stream->future_downstream_create);
}
if (stream->future_upstream_create)
{
future_destroy(stream->future_upstream_create);
}
2018-09-05 10:38:27 +08:00
stream->proxy_ref=NULL;
free(stream);
thread->load--;
}
void tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream)
{
struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);
struct event_base * ev_base = _stream->thread_ref->evbase;
_stream->defer_fd_downstream = fd_downstream;
_stream->defer_fd_upstream = fd_upstream;
evutil_make_socket_nonblocking(fd_downstream);
evutil_make_socket_nonblocking(fd_upstream);
if (_stream->session_type == STREAM_PROTO_PLAIN)
{
2018-09-05 10:38:27 +08:00
_stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream);
_stream->conn_upstream = __conn_private_create_by_fd(_stream, fd_upstream);
assert(_stream->conn_downstream != NULL);
assert(_stream->conn_upstream != NULL);
2018-08-31 14:32:34 +08:00
__conn_private_enable(_stream->conn_downstream);
__conn_private_enable(_stream->conn_upstream);
}
if (_stream->session_type == STREAM_PROTO_SSL)
{
_stream->ssl_mgr = _stream->proxy_ref->ssl_mgr_handler;
_stream->future_upstream_create = future_create("ssl_up",
ssl_upstream_create_on_success, ssl_upstream_create_on_fail, (void *) _stream);
/* Defer setup conn_downstream & conn_upstream in async callbacks. */
ssl_async_upstream_create(_stream->future_upstream_create,
_stream->ssl_mgr, fd_upstream, fd_downstream, ev_base);
}
return;
}
int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt, const void * arg, size_t sz_arg)
{
struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);
if (opt == TFE_STREAM_OPT_SESSION_TYPE)
{
assert(sz_arg == sizeof(enum tfe_stream_proto));
_stream->session_type = *(enum tfe_stream_proto *)arg;
}
else if (opt == TFE_STREAM_OPT_PASSTHROUGH)
{
assert(sz_arg == sizeof(bool));
_stream->passthough = *(bool *)arg;
}
return 0;
}