This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/platform/src/tcp_stream.cpp

1606 lines
49 KiB
C++
Raw Normal View History

#include <netinet/in.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/bufferevent_ssl.h>
#include <event2/buffer.h>
#include <event2/thread.h>
#include <event2/dns.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <openssl/rand.h>
#include <openssl/x509.h>
#include <openssl/x509v3.h>
#include <tfe_stream.h>
#include <tfe_utils.h>
#include <tfe_future.h>
#include <tfe_plugin.h>
#include <tfe_proxy.h>
#include <platform.h>
#include <ssl_stream.h>
#include <tcp_stream.h>
#include <proxy.h>
#include <netinet/tcp.h>
#include <event2/bufferevent_struct.h>
#ifndef TFE_CONFIG_OUTPUT_LIMIT_DEFAULT
#define TFE_CONFIG_OUTPUT_LIMIT_DEFAULT (1024 * 1024)
#endif
/* forward declaration of libevent callbacks */
static void __stream_bev_readcb(struct bufferevent *, void *);
static void __stream_bev_writecb(struct bufferevent *, void *);
static void __stream_bev_eventcb(struct bufferevent *, short, void *);
/* ====================================================================================================================
* HELPER FUNCTIONS
* ===================================================================================================================*/
static inline void __stream_log_event(struct tfe_stream_private * _stream,
enum tfe_stream_event_log_type type, enum tfe_conn_dir dir, unsigned int error, const char * str_error)
{
unsigned int log_offset = _stream->nr_log_event;
assert(log_offset < STREAM_EVENT_LOG_MAX);
_stream->log_event[log_offset].type = type;
_stream->log_event[log_offset].dir = dir;
_stream->log_event[log_offset].error = error;
_stream->log_event[log_offset].str_error = str_error;
_stream->nr_log_event++;
}
static const char * __str_stream_log_type(enum tfe_stream_event_log_type type)
{
static const char * map_event_log_type[] =
{
[EVENT_LOG_CLOSE_BY_FD_PEER] = "FD/PEER",
[EVENT_LOG_CLOSE_BY_FD_EOF] = "FD/EOF",
[EVENT_LOG_CLOSE_BY_FD_ERROR] = "FD/ERR",
[EVENT_LOG_CLOSE_BY_SSL_ERROR] = "SSL/ERR"
};
return map_event_log_type[type];
}
static inline struct tfe_stream_private * to_stream_private(const struct tfe_stream * stream)
{
return container_of(stream, struct tfe_stream_private, head);
}
static inline struct tfe_conn_private * __this_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir)
{
return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream));
}
static inline struct tfe_conn_private * __peer_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir)
{
return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream));
}
static inline enum tfe_conn_dir __bev_dir(struct tfe_stream_private * _stream, struct bufferevent * bev)
{
if (_stream->conn_downstream && bev == _stream->conn_downstream->bev)
{
return CONN_DIR_DOWNSTREAM;
}
if (_stream->conn_upstream && bev == _stream->conn_upstream->bev)
{
return CONN_DIR_UPSTREAM;
}
assert(0);
return CONN_DIR_DOWNSTREAM;
}
static inline const char * __str_dir(enum tfe_conn_dir dir)
{
return dir == CONN_DIR_DOWNSTREAM ? "DOWNSTREAM" : "UPSTREAM";
}
static inline bool __is_ssl(struct tfe_stream_private * _stream)
{
return (_stream->session_type == STREAM_PROTO_SSL);
}
static void call_plugin_close(struct tfe_stream_private * _stream)
{
unsigned int plugin_id_iter = 0;
unsigned int plugin_id = 0;
for (const struct tfe_plugin * p_info_iter = tfe_plugin_iterate(&plugin_id_iter);
p_info_iter != NULL; p_info_iter = tfe_plugin_iterate(&plugin_id_iter))
{
_stream->calling_idx = plugin_id;
struct plugin_ctx * plug_ctx = &_stream->plugin_ctxs[plugin_id];
/* TODO: do not use pme to determinate we call on_open or not ever. */
if (p_info_iter->on_close && plug_ctx->pme != NULL)
{
p_info_iter->on_close(&_stream->head, _stream->thread_ref->thread_id,
REASON_PASSIVE_CLOSED, &(plug_ctx->pme));
}
plugin_id++;
}
}
/* ====================================================================================================================
* INTERFACE
* ===================================================================================================================*/
void tfe_stream_detach(const struct tfe_stream * stream)
{
struct tfe_stream_private * _stream = to_stream_private(stream);
int plug_id = _stream->calling_idx;
_stream->plugin_ctxs[plug_id].state = PLUG_STATE_DETACHED;
}
int tfe_stream_preempt(const struct tfe_stream * stream)
{
struct tfe_stream_private * _stream = to_stream_private(stream);
int plug_id = _stream->calling_idx;
for (unsigned int i = 0; i < _stream->nr_plugin_ctxs; i++)
{
if (_stream->plugin_ctxs[i].state == PLUG_STATE_PREEPTION)
{
return -1;
}
}
_stream->plugin_ctxs[plug_id].state = PLUG_STATE_PREEPTION;
return 0;
}
void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by)
{
struct tfe_stream_private * _stream = to_stream_private(stream);
assert(_stream != NULL);
assert(_stream->conn_upstream != NULL && _stream->conn_downstream != NULL);
assert(_stream->conn_upstream->bev != NULL);
assert(_stream->conn_downstream->bev != NULL);
/* stream cannot be suspended twice or more */
assert(!_stream->is_suspended);
_stream->is_suspended = true;
_stream->suspended_by = by;
/* disable all events */
int ret = 0;
ret = bufferevent_disable(_stream->conn_upstream->bev, EV_READ | EV_WRITE);
assert(ret == 0);
ret = bufferevent_disable(_stream->conn_downstream->bev, EV_READ | EV_WRITE);
assert(ret == 0);
2018-11-02 13:52:30 +08:00
(void) ret;
}
void tfe_stream_resume(const struct tfe_stream * stream)
{
struct tfe_stream_private * _stream = to_stream_private(stream);
assert(_stream->is_suspended);
bufferevent_enable(_stream->conn_upstream->bev, EV_READ | EV_WRITE);
bufferevent_enable(_stream->conn_downstream->bev, EV_READ | EV_WRITE);
if (_stream->suspended_by == CONN_DIR_DOWNSTREAM)
{
bufferevent_trigger(_stream->conn_downstream->bev, EV_READ, BEV_OPT_DEFER_CALLBACKS);
}
else
{
bufferevent_trigger(_stream->conn_upstream->bev, EV_READ, BEV_OPT_DEFER_CALLBACKS);
}
_stream->is_suspended = false;
_stream->suspended_by = CONN_DIR_DOWNSTREAM;
}
struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir)
{
struct tfe_stream_private * _stream = to_stream_private(stream);
struct tfe_stream_write_ctx ** ref_write_ctx = NULL;
2019-09-02 11:39:19 +08:00
UNUSED struct tfe_conn_private * this_conn = NULL;
struct tfe_conn_private * peer_conn = NULL;
if (dir == CONN_DIR_DOWNSTREAM)
{
this_conn = _stream->conn_downstream;
peer_conn = _stream->conn_upstream;
ref_write_ctx = &_stream->w_ctx_downstream;
}
else
{
this_conn = _stream->conn_upstream;
peer_conn = _stream->conn_downstream;
ref_write_ctx = &_stream->w_ctx_upstream;
}
if (*ref_write_ctx != NULL)
return NULL;
this_conn->on_writing = 1;
*ref_write_ctx = ALLOC(struct tfe_stream_write_ctx, 1);
(*ref_write_ctx)->_stream = _stream;
(*ref_write_ctx)->dir = dir;
bufferevent_disable(peer_conn->bev, EV_READ);
return *ref_write_ctx;
}
int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size)
{
if (w_ctx->_stream == NULL)
{
return -EPIPE;
}
struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir);
int ret = 0;
if (this_conn != NULL)
{
ret = bufferevent_write(this_conn->bev, data, size);
}
else
{
return -EPIPE;
}
return ret;
}
void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx)
{
struct tfe_conn_private * this_conn;
struct tfe_conn_private * peer_conn;
/* The connection terminated before this function call */
if (w_ctx->_stream == NULL) goto __out;
this_conn = __this_conn(w_ctx->_stream, w_ctx->dir);
peer_conn = __peer_conn(w_ctx->_stream, w_ctx->dir);
if (this_conn != NULL)
{
this_conn->on_writing = 0;
bufferevent_enable(peer_conn->bev, EV_READ);
}
if (w_ctx->dir == CONN_DIR_DOWNSTREAM)
{
assert(w_ctx->_stream->w_ctx_downstream == w_ctx);
w_ctx->_stream->w_ctx_downstream = NULL;
}
else
{
assert(w_ctx->_stream->w_ctx_upstream == w_ctx);
w_ctx->_stream->w_ctx_upstream = NULL;
}
__out:
free(w_ctx);
}
int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, const unsigned char * data, size_t size)
{
int ret = 0;
struct tfe_stream_write_ctx * wctx = tfe_stream_write_frag_start(stream, dir);
ret = tfe_stream_write_frag(wctx, data, size);
tfe_stream_write_frag_end(wctx);
return ret;
}
static int conn_private_ratelimit_setup(struct tfe_conn_private * conn, struct tfe_proxy_rate_limit_options * opt)
{
conn->ratelimit_bucket = ev_token_bucket_cfg_new(opt->read_rate, opt->read_burst,
opt->write_rate, opt->write_burst, NULL);
if(unlikely(conn->ratelimit_bucket == NULL))
{
TFE_LOG_ERROR(g_default_logger, "Failed at setting ratelimit bucket, "
"read_rate = %u, read_burst = %u, write_rate = %u, write_burst = %u",
opt->read_rate, opt->read_burst, opt->write_rate, opt->write_burst);
return -1;
}
bufferevent_set_rate_limit(conn->bev, conn->ratelimit_bucket);
return 0;
}
int tfe_stream_action_set_opt(const struct tfe_stream * stream, enum tfe_stream_action_opt type,
void * value, size_t size)
{
struct tfe_stream_private * _stream = to_stream_private(stream);
#define __SAVE_PARAM(what) do { \
if(size != sizeof(__typeof__(what))) { return -EINVAL; } \
else { what = *(__typeof(what) *)value; } } while(0) \
switch (type)
{
case ACTION_OPT_FOWARD_BYTES: __SAVE_PARAM(_stream->forward_bytes);
break;
case ACTION_OPT_DEFER_BYTES: __SAVE_PARAM(_stream->defer_bytes);
break;
case ACTION_OPT_DEFER_TIME_TV: __SAVE_PARAM(_stream->defer_timeval);
break;
case ACTION_OPT_DROP_BYTES: __SAVE_PARAM(_stream->drop_bytes);
break;
}
#undef __SAVE_PARAM
return 0;
}
int tfe_stream_set_integer_opt(struct tfe_stream * stream, enum tfe_stream_opt_level level, int type, int val)
{
return 1;
}
int tfe_stream_info_get(const struct tfe_stream * stream, enum tfe_stream_info type, void * value, size_t size)
{
#define __CHECK_PARAM(type) do { if(sizeof(type) != size) return -EINVAL; } while(0)
struct tfe_stream_private * _stream = to_stream_private(stream);
switch (type)
{
case INFO_FROM_DOWNSTREAM_RX_OFFSET:
__CHECK_PARAM(_stream->downstream_rx_offset);
if(_stream->conn_downstream)
{
*((size_t *) value) = _stream->downstream_rx_offset;
return sizeof(_stream->downstream_rx_offset);
}
else
{
return -ENOTCONN;
}
break;
case INFO_FROM_UPSTREAM_RX_OFFSET:
__CHECK_PARAM(_stream->upstream_rx_offset);
if(_stream->conn_upstream)
{
*((size_t *) value) = _stream->upstream_rx_offset;
return sizeof(_stream->upstream_rx_offset);
}
else
{
return -ENOTCONN;
}
break;
default: assert(0);
return -EINVAL;
}
#undef __CHECK_PARAM
return 0;
}
/* ====================================================================================================================
* CONNECTION STRUCTURE AND OPERATION FUCTIONS
* ===================================================================================================================*/
evutil_socket_t __conn_private_release_fd(struct tfe_conn_private * conn)
{
evutil_socket_t __to_release_fd = conn->fd;
conn->fd = 0;
return __to_release_fd;
}
static void __conn_private_destory(struct tfe_conn_private * conn)
{
int ret = 0;
if (conn->bev)
{
bufferevent_disable(conn->bev, EV_READ | EV_WRITE);
bufferevent_free(conn->bev);
}
if(conn->ratelimit_bucket)
{
ev_token_bucket_cfg_free(conn->ratelimit_bucket);
conn->ratelimit_bucket = NULL;
}
if (conn->fd > 0)
{
ret = evutil_closesocket(conn->fd);
assert(ret >= 0);
}
free(conn);
(void)ret;
TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE, 1);
}
static void __conn_private_destory_with_ssl(struct event_base * ev_base,
struct tfe_conn_private * conn, struct ssl_stream * ssl_stream)
{
if (ssl_stream == NULL) return __conn_private_destory(conn);
ssl_stream_free(ssl_stream, ev_base, conn->bev);
return __conn_private_destory(conn);
}
static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
struct tfe_conn_private * peer_conn = __peer_conn(_stream, __bev_dir(_stream, bev));
struct evbuffer * __input_buffer = bufferevent_get_input(bev);
if (peer_conn == NULL)
{
evbuffer_drain(__input_buffer, evbuffer_get_length(__input_buffer));
return;
}
if (_stream->is_first_call_rxcb == 0)
{
TFE_PROXY_STAT_INCREASE(STAT_STREAM_BYPASS, 1);
_stream->is_first_call_rxcb = 1;
}
struct evbuffer * __output_buffer = bufferevent_get_output(peer_conn->bev);
evbuffer_add_buffer(__output_buffer, __input_buffer);
}
static void __stream_bev_passthrough_writecb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
struct tfe_conn_private ** ref_this_conn{};
struct tfe_conn_private ** ref_peer_conn{};
if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
{
ref_this_conn = &_stream->conn_upstream;
ref_peer_conn = &_stream->conn_downstream;
}
if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
{
ref_this_conn = &_stream->conn_downstream;
ref_peer_conn = &_stream->conn_upstream;
}
struct evbuffer * __output_buffer = bufferevent_get_output(bev);
assert(__output_buffer != NULL);
if (*ref_peer_conn == NULL && evbuffer_get_length(__output_buffer) == 0)
{
__conn_private_destory(*ref_this_conn);
*ref_this_conn = NULL;
}
if (*ref_peer_conn == NULL && *ref_this_conn == NULL)
{
call_plugin_close(_stream);
tfe_stream_destory(_stream);
}
return;
}
static void __stream_bev_passthrough_eventcb(struct bufferevent * bev, short events, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
struct tfe_conn_private ** ref_this_conn{};
struct tfe_conn_private ** ref_peer_conn{};
enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev);
const char * str_conn_dir = __str_dir(conn_dir);
if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
{
ref_this_conn = &_stream->conn_upstream;
ref_peer_conn = &_stream->conn_downstream;
}
if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
{
ref_this_conn = &_stream->conn_downstream;
ref_peer_conn = &_stream->conn_upstream;
}
if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF)
{
if (evbuffer_get_length(bufferevent_get_input(bev)))
{
__stream_bev_passthrough_readcb(bev, arg);
}
if(events & BEV_EVENT_ERROR)
{
unsigned long err;
while ((err = (bufferevent_get_openssl_error(bev))))
{
const char *msg = (const char*)ERR_reason_error_string(err);
const char *lib = (const char*)ERR_lib_error_string(err);
const char *func = (const char*)ERR_func_error_string(err);
TFE_LOG_INFO(g_default_logger, "%s %s connection error, bufferevent_get_openssl_error() = %lu: %s %s %s",
_stream->str_stream_addr, str_conn_dir, err, lib, func, msg);
}
if (errno)
{
TFE_LOG_INFO(g_default_logger, "%s %s connection error, errno = %d, %s",
_stream->str_stream_addr, str_conn_dir, errno, strerror(errno));
2020-07-30 15:57:34 +08:00
/* after log, reset errno */
errno = 0;
}
}
goto __close_connection;
}
return;
__close_connection:
if (*ref_peer_conn != NULL)
{
struct bufferevent * __peer_bev = (*ref_peer_conn)->bev;
struct evbuffer * __peer_output_buffer = bufferevent_get_output(__peer_bev);
if (evbuffer_get_length(__peer_output_buffer) == 0)
{
__conn_private_destory(*ref_peer_conn);
*ref_peer_conn = NULL;
}
}
if (*ref_this_conn != NULL)
{
__conn_private_destory(*ref_this_conn);
*ref_this_conn = NULL;
}
if (*ref_this_conn == NULL && *ref_peer_conn == NULL)
{
call_plugin_close(_stream);
tfe_stream_destory(_stream);
}
return;
}
/*
* Callback for read events on the up- and downstream connection bufferevents.
* Called when there is data ready in the input evbuffer.
*/
static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = __bev_dir(_stream, bev);
2019-09-02 11:39:19 +08:00
UNUSED struct tfe_conn_private * this_conn = __this_conn(_stream, dir);
struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir);
/* Peer connection is terminated, drain all data.
* This connection will be destoryed in __event_cb */
struct evbuffer * inbuf = bufferevent_get_input(bev);
size_t contigous_len = evbuffer_get_length(inbuf);
if (peer_conn == NULL)
{
evbuffer_drain(inbuf, evbuffer_get_length(inbuf));
return;
}
if (_stream->is_first_call_rxcb == 0)
{
TFE_PROXY_STAT_INCREASE(STAT_STREAM_INTERCEPT, 1);
_stream->is_first_call_rxcb = 1;
}
struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev);
2018-11-02 13:52:30 +08:00
assert(inbuf != NULL && outbuf != NULL);
enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA;
enum tfe_stream_action action_final = ACTION_FORWARD_DATA;
size_t drain_size = 0;
size_t rx_offset_increase = 0;
unsigned char * contiguous_data = evbuffer_pullup(inbuf, contigous_len);
_stream->defer_bytes = 0;
_stream->drop_bytes = 0;
_stream->forward_bytes = 0;
unsigned int plugin_id_iter = 0;
unsigned int plugin_id = 0;
for (const struct tfe_plugin * p_info_iter = tfe_plugin_iterate(&plugin_id_iter);
p_info_iter != NULL; p_info_iter = tfe_plugin_iterate(&plugin_id_iter), plugin_id++)
{
_stream->calling_idx = plugin_id;
struct plugin_ctx * plug_ctx = &_stream->plugin_ctxs[plugin_id];
if (p_info_iter->on_open != NULL && plug_ctx->is_plugin_opened == 0)
{
p_info_iter->on_open(&_stream->head, _stream->thread_ref->thread_id, dir, &(plug_ctx->pme));
plug_ctx->is_plugin_opened = 1;
}
if (plug_ctx->state == PLUG_STATE_DETACHED)
continue;
if (p_info_iter->on_data != NULL)
{
action_tmp = p_info_iter->on_data(&_stream->head, _stream->thread_ref->thread_id,
dir, contiguous_data, contigous_len, &(plug_ctx->pme));
}
if (plug_ctx->state == PLUG_STATE_PREEPTION)
action_final = action_tmp;
}
switch (action_final)
{
case ACTION_FORWARD_DATA:
if (_stream->forward_bytes > 0)
{
evbuffer_remove_buffer(inbuf, outbuf, _stream->forward_bytes);
rx_offset_increase += _stream->forward_bytes;
}
else
{
evbuffer_add_buffer(outbuf, inbuf);
rx_offset_increase += contigous_len;
}
break;
case ACTION_DROP_DATA:
if (_stream->drop_bytes > 0)
{
drain_size = _stream->drop_bytes;
}
else
{
drain_size = evbuffer_get_length(inbuf);
}
evbuffer_drain(inbuf, drain_size);
rx_offset_increase += drain_size;
break;
case ACTION_DEFER_DATA:
if (_stream->defer_bytes > 0)
{
bufferevent_setwatermark(bev, EV_WRITE, _stream->defer_bytes, 0);
}
break;
default: assert(0);
break;
}
if (dir == CONN_DIR_DOWNSTREAM)
{
TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_DOWN_BYTES, rx_offset_increase);
_stream->downstream_rx_offset += rx_offset_increase;
}
else
{
TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_UP_BYTES, rx_offset_increase);
_stream->upstream_rx_offset += rx_offset_increase;
}
/* Total Bytes */
TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_BYTES, rx_offset_increase);
if(_stream->need_to_be_kill)
{
const static struct linger sl{.l_onoff = 1, .l_linger = 0};
/* Set SO_LINGER, the fd will be closed by RST */
setsockopt(_stream->conn_upstream->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
setsockopt(_stream->conn_downstream->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
/* Destroy STREAM */
TFE_PROXY_STAT_INCREASE(STAT_STREAM_CLS_KILL, 1);
/* Call plugin close when the connection is killed */
call_plugin_close(_stream);
return tfe_stream_destory(_stream);
}
#if 0
if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT)
{
bufferevent_setwatermark(peer_conn->bev, EV_WRITE, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT / 2, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT);
bufferevent_disable(bev, EV_READ);
}
#endif
}
/*
* Callback for write events on the up- and downstream connection bufferevents.
* Called when either all data from the output evbuffer has been written,
* or if the outbuf is only half full again after having been full.
*/
static void __stream_bev_writecb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
struct event_base * ev_base = bufferevent_get_base(bev);
struct tfe_conn_private ** ref_this_conn{};
struct tfe_conn_private ** ref_peer_conn{};
struct ssl_stream ** ref_this_ssl_stream{};
enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev);
if (conn_dir == CONN_DIR_UPSTREAM)
{
ref_this_conn = &_stream->conn_upstream;
ref_this_ssl_stream = &_stream->ssl_upstream;
ref_peer_conn = &_stream->conn_downstream;
}
if (conn_dir == CONN_DIR_DOWNSTREAM)
{
ref_this_conn = &_stream->conn_downstream;
ref_this_ssl_stream = &_stream->ssl_downstream;
ref_peer_conn = &_stream->conn_upstream;
}
struct evbuffer * __output_buffer = bufferevent_get_output(bev);
assert(__output_buffer != NULL);
2018-11-02 13:52:30 +08:00
if (*ref_peer_conn == NULL /* Peer connection is closed */
&& (*ref_this_conn)->on_writing == 0 /* No body is prepare to write data, eg. No body call stream_write */
&& evbuffer_get_length(__output_buffer) == 0) /* Nothing is in send queue */
{
__conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream);
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_PEER, conn_dir, 0, NULL);
*ref_this_conn = NULL;
*ref_this_ssl_stream = NULL;
}
if (*ref_peer_conn == NULL && *ref_this_conn == NULL)
{
call_plugin_close(_stream);
tfe_stream_destory(_stream);
}
}
/*
* Callback for meta events on the up- and downstream connection bufferevents.
* Called when EOF has been reached, a connection has been made, and on errors.
*/
static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
struct event_base * ev_base = bufferevent_get_base(bev);
struct tfe_conn_private ** ref_this_conn{};
struct tfe_conn_private ** ref_peer_conn{};
struct ssl_stream ** ref_this_ssl_stream{};
struct ssl_stream ** ref_peer_ssl_stream{};
struct tfe_stream_write_ctx ** ref_this_write_ctx{};
enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev);
const char * str_conn_dir = __str_dir(conn_dir);
enum tfe_conn_dir peer_conn_dir{};
size_t rx_offset = 0;
if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
{
ref_this_conn = &_stream->conn_upstream;
ref_peer_conn = &_stream->conn_downstream;
ref_this_ssl_stream = &_stream->ssl_upstream;
ref_peer_ssl_stream = &_stream->ssl_downstream;
ref_this_write_ctx = &_stream->w_ctx_upstream;
peer_conn_dir = CONN_DIR_DOWNSTREAM;
rx_offset = _stream->upstream_rx_offset;
}
if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
{
ref_this_conn = &_stream->conn_downstream;
ref_peer_conn = &_stream->conn_upstream;
ref_this_ssl_stream = &_stream->ssl_downstream;
ref_peer_ssl_stream = &_stream->ssl_upstream;
ref_this_write_ctx = &_stream->w_ctx_downstream;
peer_conn_dir = CONN_DIR_UPSTREAM;
rx_offset = _stream->downstream_rx_offset;
}
if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF)
{
if (evbuffer_get_length(bufferevent_get_input(bev)))
{
__stream_bev_readcb(bev, arg);
}
if (events & BEV_EVENT_ERROR)
{
if (_stream->session_type == STREAM_PROTO_SSL)
{
unsigned long sslerr = ssl_stream_log_error(bev, __bev_dir(_stream, bev), _stream->ssl_mgr);
if (sslerr)
{
TFE_LOG_ERROR(g_default_logger, "%s %s connection error at tcp layer, ssl layer is unavailable", _stream->str_stream_addr, str_conn_dir);
enum ssl_stream_error error = SSL_STREAM_R_SERVER_PROTOCOL_ERROR;
if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
{
error = SSL_STREAM_R_CLIENT_PROTOCOL_ERROR;
}
ssl_stream_set_cmsg_string(*ref_this_ssl_stream, TFE_CMSG_SSL_ERROR, ssl_stream_get_error_string(error));
ssl_stream_process_error(*ref_this_ssl_stream, sslerr, _stream->ssl_mgr);
}
}
else if (errno)
{
TFE_LOG_INFO(g_default_logger, "%s %s connection error, errno = %d, %s",
_stream->str_stream_addr, str_conn_dir, errno, strerror(errno));
2020-07-30 15:57:34 +08:00
/* after log, reset errno */
errno = 0;
}
}
else if (events & BEV_EVENT_EOF && rx_offset == 0 && _stream->session_type == STREAM_PROTO_SSL)
{
ssl_stream_process_zero_eof(*ref_this_ssl_stream, g_default_proxy->ssl_mgr_handler);
}
if (events & BEV_EVENT_ERROR)
{
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_ERROR, conn_dir, 0, NULL);
}
if (events & BEV_EVENT_EOF)
{
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_EOF, conn_dir, 0, NULL);
}
goto __close_connection;
}
return;
__close_connection:
if (*ref_peer_conn != NULL)
{
struct bufferevent * __peer_bev = (*ref_peer_conn)->bev;
struct evbuffer * __peer_output_buffer = bufferevent_get_output(__peer_bev);
2018-10-18 16:20:22 +08:00
if (evbuffer_get_length(__peer_output_buffer) == 0 && (*ref_peer_conn)->on_writing == 0)
{
__conn_private_destory_with_ssl(ev_base, *ref_peer_conn, *ref_peer_ssl_stream);
*ref_peer_conn = NULL;
*ref_peer_ssl_stream = NULL;
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_FD_PEER, peer_conn_dir, 0, NULL);
}
}
if (*ref_this_conn != NULL)
{
/* There is a frag writter setted, need to clear the reference of stream in the writter to indicate the connection is closed */
if (*ref_this_write_ctx != NULL)
{
(*ref_this_write_ctx)->_stream = NULL;
}
__conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream);
*ref_this_conn = NULL;
*ref_this_ssl_stream = NULL;
}
if (*ref_this_conn == NULL && *ref_peer_conn == NULL)
{
goto __call_plugin_close;
}
return;
__call_plugin_close:
call_plugin_close(_stream);
tfe_stream_destory(_stream);
}
static tfe_conn_private * __conn_private_create_by_bev(struct tfe_stream_private * stream, struct bufferevent * bev)
{
struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1);
__conn_private->bev = bev;
__conn_private->fd = bufferevent_getfd(bev);
if(stream->tcp_passthough)
{
bufferevent_setcb(__conn_private->bev, __stream_bev_passthrough_readcb,
__stream_bev_passthrough_writecb, __stream_bev_passthrough_eventcb, stream);
}
else
{
bufferevent_setcb(__conn_private->bev, __stream_bev_readcb, __stream_bev_writecb, __stream_bev_eventcb, stream);
}
bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE);
struct tfe_proxy * proxy_ref = stream->proxy_ref;
if(unlikely(proxy_ref->en_rate_limit))
{
conn_private_ratelimit_setup(__conn_private, &proxy_ref->rate_limit_options);
}
return __conn_private;
}
2018-09-05 10:38:27 +08:00
static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private * stream, evutil_socket_t fd)
{
struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1);
struct event_base * __ev_base = stream->thread_ref->evbase;
struct tfe_proxy * proxy_ref = stream->proxy_ref;
2018-11-02 13:52:30 +08:00
__conn_private->_stream_ref = stream;
__conn_private->bev = bufferevent_socket_new(__ev_base, fd, BEV_OPT_DEFER_CALLBACKS | BEV_OPT_THREADSAFE );
__conn_private->fd = fd;
bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE);
if (!__conn_private->bev)
{
TFE_LOG_ERROR(__STREAM_LOGGER(stream), "Failed at creating bufferevent for fd %d", fd);
goto __errout;
}
if (stream->tcp_passthough)
{
bufferevent_setcb(__conn_private->bev, __stream_bev_passthrough_readcb,
__stream_bev_passthrough_writecb, __stream_bev_passthrough_eventcb, stream);
}
else
{
bufferevent_setcb(__conn_private->bev, __stream_bev_readcb,
__stream_bev_writecb, __stream_bev_eventcb, stream);
}
if(proxy_ref->en_rate_limit)
{
conn_private_ratelimit_setup(__conn_private, &proxy_ref->rate_limit_options);
}
return __conn_private;
__errout:
if (__conn_private != NULL) free(__conn_private);
return NULL;
}
2018-08-31 14:32:34 +08:00
void __conn_private_enable(struct tfe_conn_private * conn_private)
{
assert(conn_private != NULL && conn_private->bev != NULL);
bufferevent_enable(conn_private->bev, EV_READ | EV_WRITE);
}
void ssl_downstream_create_on_success(future_result_t * result, void * user)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) user;
struct ssl_stream * downstream = ssl_downstream_create_result_release_stream(result);
struct bufferevent * bev = ssl_downstream_create_result_release_bev(result);
_stream->defer_fd_downstream = 0;
2018-09-05 10:38:27 +08:00
_stream->conn_downstream = __conn_private_create_by_bev(_stream, bev);
_stream->ssl_downstream = downstream;
future_destroy(_stream->future_downstream_create);
_stream->future_downstream_create = NULL;
2018-08-31 14:32:34 +08:00
assert(_stream->conn_downstream != NULL && _stream->conn_upstream != NULL);
ssl_stream_dump_info(_stream->ssl_downstream, _stream->ssl_downstream_info_dump,
sizeof(_stream->ssl_downstream_info_dump));
ssl_stream_dump_info(_stream->ssl_upstream, _stream->ssl_upstream_info_dump,
sizeof(_stream->ssl_upstream_info_dump));
2018-08-31 14:32:34 +08:00
__conn_private_enable(_stream->conn_downstream);
__conn_private_enable(_stream->conn_upstream);
return;
}
void ssl_downstream_create_on_fail(enum e_future_error err, const char * what, void * user)
{
2018-11-02 13:52:30 +08:00
struct tfe_stream_private * _stream = (struct tfe_stream_private *) user;
assert(_stream != NULL && _stream->session_type == STREAM_PROTO_SSL);
TFE_LOG_INFO(g_default_logger, "%s Failed to create SSL downstream, close the connection : %s. ",
_stream->str_stream_addr, what);
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_SSL_ERROR, CONN_DIR_DOWNSTREAM, 0, NULL);
tfe_stream_destory(_stream);
}
void ssl_upstream_create_on_success(future_result_t * result, void * user)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) user;
enum ssl_stream_action ssl_action = ssl_upstream_create_result_release_action(result);
if (SSL_ACTION_PASSTHROUGH == ssl_action)
{
_stream->tcp_passthough = true;
_stream->conn_upstream = __conn_private_create_by_fd(_stream, _stream->defer_fd_upstream);
_stream->conn_downstream = __conn_private_create_by_fd(_stream, _stream->defer_fd_downstream);
__conn_private_enable(_stream->conn_downstream);
__conn_private_enable(_stream->conn_upstream);
_stream->defer_fd_downstream = 0;
_stream->defer_fd_upstream = 0;
}
else if (SSL_ACTION_SHUTDOWN == ssl_action)
{
return tfe_stream_destory(_stream);
}
else
{
struct ssl_stream * upstream = ssl_upstream_create_result_release_stream(result);
struct bufferevent * bev = ssl_upstream_create_result_release_bev(result);
assert(upstream != NULL && bev != NULL);
/* Create connection ctx by bev, fd's ownership is transfer to bev */
_stream->defer_fd_upstream = 0;
_stream->conn_upstream = __conn_private_create_by_bev(_stream, bev);
_stream->ssl_upstream = upstream;
assert(_stream->conn_upstream != NULL);
assert(_stream->ssl_upstream != NULL);
/* Next, create downstream */
_stream->future_downstream_create = future_create("ssl_down", ssl_downstream_create_on_success,
ssl_downstream_create_on_fail, _stream);
ssl_async_downstream_create(_stream->future_downstream_create, _stream->ssl_mgr,
_stream->ssl_upstream, _stream->defer_fd_downstream, &_stream->head);
}
future_destroy(_stream->future_upstream_create);
_stream->future_upstream_create = NULL;
}
void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, void * user)
{
2018-11-02 13:52:30 +08:00
struct tfe_stream_private * _stream = (struct tfe_stream_private *) user;
assert(_stream != NULL && _stream->session_type == STREAM_PROTO_SSL);
TFE_LOG_INFO(g_default_logger, "%s Failed to create SSL upstream, close the connection : %s. ",
_stream->str_stream_addr, what);
__stream_log_event(_stream, EVENT_LOG_CLOSE_BY_SSL_ERROR, CONN_DIR_UPSTREAM, 0, NULL);
tfe_stream_destory(_stream);
}
struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx)
{
struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1);
TFE_PROXY_STAT_INCREASE(STAT_STREAM_OPEN, 1);
2018-11-02 13:52:30 +08:00
_stream->head.thread_id=thread_ctx->thread_id;
_stream->thread_ref = thread_ctx;
_stream->proxy_ref = pxy;
_stream->stream_logger = pxy->logger;
unsigned int total_plugin_count = tfe_plugin_total_counts();
_stream->plugin_ctxs = ALLOC(struct plugin_ctx, total_plugin_count);
return &_stream->head;
}
void __stream_access_log_write(struct tfe_stream_private * stream)
{
const char * str_passthrough = stream->tcp_passthough ? "PASSTHROUGH" : "-";
const char * str_kill = stream->need_to_be_kill ? "KILL" : "-";
char str_log_event[TFE_STRING_MAX] = "";
unsigned int offset = 0;
/* Write event abstract log. It is used to determine which connection is broken */
for(unsigned int i = 0; i < stream->nr_log_event; i++)
{
struct tfe_stream_event_log * ev_log = &stream->log_event[i];
const char * str_dir = ev_log->dir == CONN_DIR_DOWNSTREAM ? "DOWN" : "UP";
offset += snprintf(str_log_event + offset, sizeof(str_log_event) - offset,
"%s/%s ", __str_stream_log_type(ev_log->type), str_dir);
}
2018-10-24 20:53:29 +08:00
MESA_handle_runtime_log(stream->stream_logger, RLOG_LV_INFO, "access",
"%d %d %d %s %s %s %s %s %s", stream->log_fd_downstream, stream->log_fd_upstream, stream->keyring_id,
stream->str_stream_addr, str_passthrough, str_kill, str_log_event,
stream->ssl_downstream_info_dump, stream->ssl_upstream_info_dump);
}
static int ev_log_to_stat_map[__EVENT_LOG_CLOSE_MAX][__CONN_DIR_MAX]{{-1}};
void __ev_log_to_stat_map_init() __attribute__((constructor, used));
void __ev_log_to_stat_map_init()
{
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_PEER][CONN_DIR_DOWNSTREAM] = -1;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_EOF][CONN_DIR_DOWNSTREAM] = STAT_STREAM_CLS_DOWN_EOF;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_ERROR][CONN_DIR_DOWNSTREAM] = STAT_STREAM_CLS_DOWN_ERR;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_SSL_ERROR][CONN_DIR_DOWNSTREAM] = STAT_STREAM_CLS_DOWN_ERR;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_PEER][CONN_DIR_UPSTREAM] = -1;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_EOF][CONN_DIR_UPSTREAM] = STAT_STREAM_CLS_UP_EOF;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_FD_ERROR][CONN_DIR_UPSTREAM] = STAT_STREAM_CLS_UP_ERR;
ev_log_to_stat_map[EVENT_LOG_CLOSE_BY_SSL_ERROR][CONN_DIR_UPSTREAM] = STAT_STREAM_CLS_UP_ERR;
}
void __stream_close_stat(struct tfe_stream_private * stream)
{
TFE_PROXY_STAT_INCREASE(STAT_STREAM_CLS, 1);
if(stream->nr_log_event > 0)
{
struct tfe_stream_event_log * ev_log = &stream->log_event[0];
assert(ev_log_to_stat_map[ev_log->type][ev_log->dir] >= 0);
TFE_PROXY_STAT_INCREASE(ev_log_to_stat_map[ev_log->type][ev_log->dir], 1);
}
}
void tfe_stream_destory(struct tfe_stream_private * stream)
{
struct tfe_proxy * proxy = stream->proxy_ref;
struct tfe_thread_ctx * thread = stream->thread_ref;
struct event_base * ev_base = thread->evbase;
__stream_access_log_write(stream);
__stream_close_stat(stream);
if (stream->head.addr)
{
FREE(&(stream->head.addr));
}
if (stream->str_stream_addr)
{
FREE(&(stream->str_stream_addr));
}
if (__is_ssl(stream) && stream->ssl_upstream)
{
ssl_stream_free(stream->ssl_upstream, ev_base, stream->conn_upstream->bev);
}
if (__is_ssl(stream) && stream->ssl_downstream)
{
ssl_stream_free(stream->ssl_downstream, ev_base, stream->conn_downstream->bev);
}
if (stream->conn_upstream)
{
assert(stream->defer_fd_upstream <= 0);
__conn_private_destory(stream->conn_upstream);
}
if (stream->conn_downstream)
{
assert(stream->defer_fd_downstream <= 0);
__conn_private_destory(stream->conn_downstream);
}
if (stream->defer_fd_downstream)
{
evutil_closesocket(stream->defer_fd_downstream);
}
if (stream->defer_fd_upstream)
{
evutil_closesocket(stream->defer_fd_upstream);
}
if (stream->future_downstream_create)
{
future_destroy(stream->future_downstream_create);
}
if (stream->future_upstream_create)
{
future_destroy(stream->future_upstream_create);
}
if (proxy->scm_sender && stream->cmsg)
{
sender_scm_cmsg_send(proxy->scm_sender, stream->cmsg);
}
if (stream->cmsg)
{
tfe_cmsg_destroy(stream->cmsg);
}
FREE(&(stream->plugin_ctxs));
tfe_proxy_thread_ctx_release(stream->thread_ref);
stream->proxy_ref = NULL;
FREE(&(stream));
}
int __fd_ttl_option_setup(struct tfe_stream_private * _stream, evutil_socket_t fd, int ttl)
{
struct sockaddr_storage sk_storage;
socklen_t sk_storage_len = sizeof(sk_storage);
if (getsockname(fd, (struct sockaddr *) &sk_storage, &sk_storage_len) < 0)
{
TFE_STREAM_LOG_ERROR(_stream, "getsockname(fd = %d) failed: %s", fd, strerror(errno));
2020-07-30 15:57:34 +08:00
/* after log, reset errno */
errno = 0;
return -1;
}
/* For IPv4, set TTL */
unsigned int __ttl = (unsigned int) ttl;
const char * __str_family = NULL;
int ret = 0;
if (sk_storage.ss_family == AF_INET)
{
ret = setsockopt(fd, IPPROTO_IP, IP_TTL, &__ttl, sizeof(__ttl));
__str_family = "AF_INET";
}
else if (sk_storage.ss_family == AF_INET6)
{
ret = setsockopt(fd, IPPROTO_IPV6, IPV6_UNICAST_HOPS, &__ttl, sizeof(__ttl));
__str_family = "AF_INET6";
}
else { assert(0); }
if (ret < 0)
{
TFE_STREAM_LOG_ERROR(_stream, "setsockopt(ttl = %u, fd = %d, family = %s) failed: %s",
2020-07-30 15:57:34 +08:00
__ttl, fd, __str_family, strerror(errno));
/* after log, reset errno */
errno = 0;
return -2;
}
return 0;
}
2020-07-21 20:00:14 +08:00
struct tfe_tcp_options
{
int tcp_nodelay;
int tcp_ttl;
int tcp_keepalive;
int tcp_keepcnt;
int tcp_keepidle;
int tcp_keepintvl;
int tcp_user_timeout;
};
static void get_tcp_option_from_cmsg(struct tfe_cmsg *cmsg, struct tfe_tcp_options *options, tfe_conn_dir dir)
{
int ret;
uint16_t size = 0;
enum tfe_cmsg_tlv_type type;
memset(options, 0, sizeof(struct tfe_tcp_options));
const char *dir_str = (dir == CONN_DIR_DOWNSTREAM ? "downstream" : "upstream");
type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_NODELAY : TFE_CMSG_UPSTREAM_TCP_NODELAY;
ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_nodelay), sizeof(options->tcp_nodelay), &size);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_nodelay from cmsg: %s", dir_str, strerror(-ret));
}
type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_TTL : TFE_CMSG_UPSTREAM_TCP_TTL;
ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_ttl), sizeof(options->tcp_ttl), &size);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_ttl from cmsg: %s", dir_str, strerror(-ret));
}
type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_KEEPALIVE : TFE_CMSG_UPSTREAM_TCP_KEEPALIVE;
ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_keepalive), sizeof(options->tcp_keepalive), &size);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_keepalive from cmsg: %s", dir_str, strerror(-ret));
}
type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_KEEPCNT : TFE_CMSG_UPSTREAM_TCP_KEEPCNT;
ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_keepcnt), sizeof(options->tcp_keepcnt), &size);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_keepcnt from cmsg: %s", dir_str, strerror(-ret));
}
type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_KEEPIDLE : TFE_CMSG_UPSTREAM_TCP_KEEPIDLE;
ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_keepidle), sizeof(options->tcp_keepidle), &size);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_keepidle from cmsg: %s", dir_str, strerror(-ret));
}
type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_KEEPINTVL : TFE_CMSG_UPSTREAM_TCP_KEEPINTVL;
ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_keepintvl), sizeof(options->tcp_keepintvl), &size);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_keepintvl from cmsg: %s", dir_str, strerror(-ret));
}
type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_USER_TIMEOUT : TFE_CMSG_UPSTREAM_TCP_USER_TIMEOUT;
ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_user_timeout), sizeof(options->tcp_user_timeout), &size);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_user_timeout from cmsg: %s", dir_str, strerror(-ret));
}
}
void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket_t fd, tfe_conn_dir dir)
{
struct tfe_stream * stream = &_stream->head;
struct tfe_proxy_tcp_options * tcp_options = &_stream->proxy_ref->tcp_options;
2020-07-21 20:00:14 +08:00
struct tfe_tcp_options options = {0};
/* Make it non-blocking */
int ret = evutil_make_socket_nonblocking(fd);
assert(ret >= 0);
2020-07-21 20:00:14 +08:00
/* get tcp options from tfe.conf */
/* Recv Buffer */
if (tcp_options->sz_rcv_buffer >= 0)
{
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const void *) &tcp_options->sz_rcv_buffer, sizeof(int)) == -1)
{
TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(SO_RCVBUF, %d) failed, ignored: %s",
stream->str_stream_info, tcp_options->sz_rcv_buffer, strerror(errno));
2020-07-30 15:57:34 +08:00
/* after log, reset errno */
errno = 0;
}
}
/* Send Buffer */
if (tcp_options->sz_snd_buffer >= 0)
{
if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const void *) &tcp_options->sz_snd_buffer, sizeof(int)) == -1)
{
TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(SO_SNDBUF, %d) failed, ignored: %s",
stream->str_stream_info, tcp_options->sz_snd_buffer, strerror(errno));
2020-07-30 15:57:34 +08:00
/* after log, reset errno */
errno = 0;
}
}
2020-07-21 20:00:14 +08:00
if (tcp_options->enable_overwrite > 0)
{
/* get tcp options from tfe.conf */
options.tcp_keepalive = tcp_options->so_keepalive;
options.tcp_keepcnt = tcp_options->tcp_keepcnt;
options.tcp_keepidle = tcp_options->tcp_keepidle;
options.tcp_keepintvl = tcp_options->tcp_keepintvl;
options.tcp_nodelay = tcp_options->tcp_nodelay;
options.tcp_ttl = (dir == CONN_DIR_DOWNSTREAM ? tcp_options->tcp_ttl_downstream : tcp_options->tcp_ttl_upstream);
options.tcp_user_timeout = tcp_options->tcp_user_timeout;
}
else
{
/* get tcp options form cmsg */
get_tcp_option_from_cmsg(_stream->cmsg, &options, dir);
}
/* Keep-alive */
2020-07-21 20:00:14 +08:00
if (options.tcp_keepalive > 0)
{
2020-07-21 20:00:14 +08:00
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (const void *) &(options.tcp_keepalive), sizeof(options.tcp_keepalive)) == -1)
{
TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(SO_KEEPALIVE, %d) failed, ignored: %s",
2020-07-21 20:00:14 +08:00
stream->str_stream_info, options.tcp_keepalive, strerror(errno));
/* after log, reset errno */
errno = 0;
}
}
2020-07-21 20:00:14 +08:00
if (options.tcp_keepcnt > 0)
{
2020-07-21 20:00:14 +08:00
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, (const void *) &(options.tcp_keepcnt), sizeof(options.tcp_keepcnt)) == -1)
{
TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_KEEPCNT, %d) failed, ignored: %s",
2020-07-21 20:00:14 +08:00
stream->str_stream_info, options.tcp_keepcnt, strerror(errno));
/* after log, reset errno */
errno = 0;
}
}
2020-07-21 20:00:14 +08:00
if (options.tcp_keepintvl > 0)
{
2020-07-21 20:00:14 +08:00
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, (const void *) &(options.tcp_keepintvl), sizeof(options.tcp_keepintvl)) == -1)
{
TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_KEEPINTVL, %d) failed, ignored: %s",
2020-07-21 20:00:14 +08:00
stream->str_stream_info, options.tcp_keepintvl, strerror(errno));
/* after log, reset errno */
errno = 0;
}
}
2020-07-21 20:00:14 +08:00
if (options.tcp_keepidle > 0)
{
2020-07-21 20:00:14 +08:00
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, (const void *) &(options.tcp_keepidle), sizeof(options.tcp_keepidle)) == -1)
{
TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_KEEPIDLE, %d) failed, ignored: %s",
2020-07-21 20:00:14 +08:00
stream->str_stream_info, options.tcp_keepidle, strerror(errno));
/* after log, reset errno */
errno = 0;
}
}
2020-07-21 20:00:14 +08:00
if (options.tcp_user_timeout > 0)
{
2020-07-21 20:00:14 +08:00
if (setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (const void *) &(options.tcp_user_timeout), sizeof(options.tcp_user_timeout)) == -1)
{
TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_USER_TIMEOUT, %d) failed, ignored: %s",
2020-07-21 20:00:14 +08:00
stream->str_stream_info, options.tcp_user_timeout, strerror(errno));
2020-07-30 15:57:34 +08:00
/* after log, reset errno */
errno = 0;
}
}
2020-07-21 20:00:14 +08:00
// enable/disable
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (const void *) &(options.tcp_nodelay), sizeof(options.tcp_nodelay)) == -1)
{
TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_NODELAY, %d) failed, ignored: %s",
stream->str_stream_info, options.tcp_nodelay, strerror(errno));
/* after log, reset errno */
errno = 0;
}
if (options.tcp_ttl > 0)
{
2020-07-21 20:00:14 +08:00
if (__fd_ttl_option_setup(_stream, fd, options.tcp_ttl) < 0)
{
TFE_LOG_ERROR(g_default_logger, "%s: Failed at setup FD's ttl option, ttl = %d, fd = %d",
stream->str_stream_info, options.tcp_ttl, fd);
}
}
2020-07-21 20:00:14 +08:00
TFE_LOG_DEBUG(g_default_logger,
"%p %s %s: fetch tcp options, nodelay: %d ttl: %d keepalive: %d keepcnt: %d keepidle: %d keepintvl: %d user_timeout: %d",
stream, stream->str_stream_info, (dir == CONN_DIR_DOWNSTREAM ? "downstream" : "upstream"),
2020-07-21 20:00:14 +08:00
options.tcp_nodelay, options.tcp_ttl, options.tcp_keepalive,
options.tcp_keepcnt, options.tcp_keepidle, options.tcp_keepintvl, options.tcp_user_timeout);
}
int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream)
{
struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);
_stream->defer_fd_downstream = fd_downstream;
_stream->defer_fd_upstream = fd_upstream;
2018-10-24 20:53:29 +08:00
_stream->log_fd_downstream = fd_downstream;
_stream->log_fd_upstream = fd_upstream;
_stream->head.addr = tfe_stream_addr_create_by_fd(fd_downstream, CONN_DIR_DOWNSTREAM);
if (unlikely(_stream->head.addr == NULL))
{
TFE_LOG_ERROR(_stream->stream_logger, "Failed to create address from fd %d, %d, %s, terminate fds.",
fd_downstream, fd_upstream, strerror(errno)); goto __errout;
}
2020-07-21 20:00:14 +08:00
_stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr);
stream->str_stream_info = _stream->str_stream_addr;
2020-07-21 20:00:14 +08:00
__stream_fd_option_setup(_stream, fd_downstream, CONN_DIR_DOWNSTREAM);
__stream_fd_option_setup(_stream, fd_upstream, CONN_DIR_UPSTREAM);
if (_stream->session_type == STREAM_PROTO_PLAIN)
{
2018-09-05 10:38:27 +08:00
_stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream);
if (_stream->conn_downstream != NULL)
{
_stream->defer_fd_downstream = 0;
}
else
{
goto __errout;
}
2018-11-02 13:52:30 +08:00
_stream->conn_upstream = __conn_private_create_by_fd(_stream, fd_upstream);
if (_stream->conn_upstream != NULL)
{
_stream->defer_fd_upstream = 0;
}
else
2018-11-02 13:52:30 +08:00
{
goto __errout;
}
assert(_stream->conn_downstream != NULL);
assert(_stream->conn_upstream != NULL);
2018-08-31 14:32:34 +08:00
__conn_private_enable(_stream->conn_downstream);
__conn_private_enable(_stream->conn_upstream);
2018-11-02 13:52:30 +08:00
TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_PLAIN, 1);
}
if (_stream->session_type == STREAM_PROTO_SSL)
{
_stream->ssl_mgr = _stream->proxy_ref->ssl_mgr_handler;
_stream->future_upstream_create = future_create("ssl_up",
ssl_upstream_create_on_success, ssl_upstream_create_on_fail, (void *) _stream);
/* Defer setup conn_downstream & conn_upstream in async callbacks. */
ssl_async_upstream_create(_stream->future_upstream_create,
_stream->ssl_mgr, fd_upstream, fd_downstream, &_stream->head);
2018-11-02 13:52:30 +08:00
TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_SSL, 1);
}
return 0;
__errout:
/* The fds not been accept by this function, clear up and release at caller */
_stream->defer_fd_downstream = 0;
_stream->defer_fd_upstream = 0;
_stream->log_fd_downstream = 0;
_stream->log_fd_upstream = 0;
return -1;
}
int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt, const void * arg, size_t sz_arg)
{
struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);
if (opt == TFE_STREAM_OPT_SESSION_TYPE)
{
assert(sz_arg == sizeof(enum tfe_stream_proto));
_stream->session_type = *(enum tfe_stream_proto *) arg;
}
else if (opt == TFE_STREAM_OPT_PASSTHROUGH)
{
assert(sz_arg == sizeof(bool));
_stream->tcp_passthough = *(bool *) arg;
}
else if (opt == TFE_STREAM_OPT_KEYRING_ID)
{
assert(sz_arg == sizeof(unsigned int));
_stream->keyring_id = *(unsigned int *) arg;
}
return 0;
}
struct tfe_cmsg * tfe_stream_get0_cmsg(const struct tfe_stream * stream)
{
struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);
return _stream->cmsg;
}
void tfe_stream_cmsg_setup(const struct tfe_stream * stream, struct tfe_cmsg * cmsg)
{
struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);
assert(_stream->cmsg == NULL);
_stream->cmsg = cmsg;
}
void tfe_stream_write_access_log(const struct tfe_stream * stream, int level, const char * fmt, ...)
{
va_list arg_ptr;
va_start(arg_ptr, fmt);
struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);
/* Format input content */
char * __tmp_buffer;
vasprintf(&__tmp_buffer, fmt, arg_ptr);
/* Log content with stream tag */
MESA_handle_runtime_log(_stream->stream_logger, level, "access", "%s %s", _stream->str_stream_addr, __tmp_buffer);
free(__tmp_buffer);
}
2018-12-09 21:20:24 +06:00
int tfe_stream_shutdown(const struct tfe_stream * stream)
{
return 0;
}
int tfe_stream_shutdown_dir(const struct tfe_stream * stream, enum tfe_conn_dir dir)
{
return 0;
}
void tfe_stream_kill(const struct tfe_stream * stream)
{
struct tfe_stream_private * _stream = to_stream_private(stream);
_stream->need_to_be_kill = true;
}