TSG-2612 tfe 执行 TCP Options

This commit is contained in:
luwenpeng
2020-07-21 20:00:14 +08:00
parent 13289d5a71
commit ddb1ccba12
5 changed files with 208 additions and 58 deletions

View File

@@ -46,15 +46,18 @@ struct tfe_proxy_tcp_options
/* TCP OPTIONS */
int sz_rcv_buffer;
int sz_snd_buffer;
int so_keepalive;
int tcp_keepidle;
int tcp_keepintvl;
int tcp_keepcnt;
int tcp_user_timeout;
/* TRACE FOR DEBUG */
int tcp_ttl_upstream;
int tcp_ttl_downstream;
/* TRACE FOR DEBUG */
int enable_overwrite;
int tcp_nodelay;
int so_keepalive;
int tcp_keepidle;
int tcp_keepintvl;
int tcp_keepcnt;
int tcp_user_timeout;
int tcp_ttl_upstream;
int tcp_ttl_downstream;
};
struct tfe_proxy_rate_limit_options

View File

@@ -159,6 +159,7 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, int fd_downstream, int fd_upstr
enum tfe_stream_proto stream_protocol;
uint8_t stream_protocol_in_char = 0;
int tcp_passthrough = 0;
uint16_t size = 0;
int result = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (unsigned char *)&stream_protocol_in_char,
@@ -174,8 +175,22 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, int fd_downstream, int fd_upstr
tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &stream_protocol, sizeof(stream_protocol));
tfe_stream_cmsg_setup(stream, cmsg);
if (ctx->tcp_options.enable_overwrite <= 0)
{
result = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_PASSTHROUGH, (unsigned char *)&tcp_passthrough, sizeof(tcp_passthrough), &size);
if (result < 0)
{
TFE_LOG_ERROR(ctx->logger, "failed at fetch connection's tcp_passthrough from cmsg: %s", strerror(-result));
// goto __errout;
}
else
{
TFE_LOG_DEBUG(ctx->logger, "%p: fetch tcp options, passthrough: %d", stream, tcp_passthrough);
}
}
/* FOR DEBUG */
if (unlikely(ctx->tcp_all_passthrough))
if (unlikely(ctx->tcp_all_passthrough) || tcp_passthrough)
{
bool __true = true;
enum tfe_stream_proto __session_type = STREAM_PROTO_PLAIN;
@@ -374,15 +389,18 @@ int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile)
/* TCP options, -1 means unset, we shall not call setsockopt */
MESA_load_profile_int_def(profile, "tcp", "sz_rcv_buffer", &proxy->tcp_options.sz_rcv_buffer, -1);
MESA_load_profile_int_def(profile, "tcp", "sz_snd_buffer", &proxy->tcp_options.sz_snd_buffer, -1);
MESA_load_profile_int_def(profile, "tcp", "so_keepalive", &proxy->tcp_options.so_keepalive, -1);
MESA_load_profile_int_def(profile, "tcp", "tcp_keepidle", &proxy->tcp_options.tcp_keepidle, -1);
MESA_load_profile_int_def(profile, "tcp", "tcp_keepintvl", &proxy->tcp_options.tcp_keepintvl, -1);
MESA_load_profile_int_def(profile, "tcp", "tcp_keepcnt", &proxy->tcp_options.tcp_keepcnt, -1);
MESA_load_profile_int_def(profile, "tcp", "tcp_user_timeout", &proxy->tcp_options.tcp_user_timeout, -1);
MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_upstream", &proxy->tcp_options.tcp_ttl_upstream, -1);
MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_downstream", &proxy->tcp_options.tcp_ttl_downstream, -1);
return 0;
MESA_load_profile_int_def(profile, "tcp", "enable_overwrite", &proxy->tcp_options.enable_overwrite, -1);
MESA_load_profile_int_def(profile, "tcp", "tcp_nodelay", &proxy->tcp_options.tcp_nodelay, -1);
MESA_load_profile_int_def(profile, "tcp", "so_keepalive", &proxy->tcp_options.so_keepalive, -1);
MESA_load_profile_int_def(profile, "tcp", "tcp_keepidle", &proxy->tcp_options.tcp_keepidle, -1);
MESA_load_profile_int_def(profile, "tcp", "tcp_keepintvl", &proxy->tcp_options.tcp_keepintvl, -1);
MESA_load_profile_int_def(profile, "tcp", "tcp_keepcnt", &proxy->tcp_options.tcp_keepcnt, -1);
MESA_load_profile_int_def(profile, "tcp", "tcp_user_timeout", &proxy->tcp_options.tcp_user_timeout, -1);
MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_upstream", &proxy->tcp_options.tcp_ttl_upstream, -1);
MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_downstream", &proxy->tcp_options.tcp_ttl_downstream, -1);
return 0;
}
static const char * __str_stat_spec_map[] =

View File

@@ -1249,15 +1249,80 @@ int __fd_ttl_option_setup(struct tfe_stream_private * _stream, evutil_socket_t f
return 0;
}
struct tfe_tcp_options
{
int tcp_nodelay;
int tcp_ttl;
int tcp_keepalive;
int tcp_keepcnt;
int tcp_keepidle;
int tcp_keepintvl;
int tcp_user_timeout;
};
static void get_tcp_option_from_cmsg(struct tfe_cmsg *cmsg, struct tfe_tcp_options *options, tfe_conn_dir dir)
{
int ret;
uint16_t size = 0;
enum tfe_cmsg_tlv_type type;
memset(options, 0, sizeof(struct tfe_tcp_options));
const char *dir_str = (dir == CONN_DIR_DOWNSTREAM ? "downstream" : "upstream");
type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_NODELAY : TFE_CMSG_UPSTREAM_TCP_NODELAY;
ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_nodelay), sizeof(options->tcp_nodelay), &size);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_nodelay from cmsg: %s", dir_str, strerror(-ret));
}
type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_TTL : TFE_CMSG_UPSTREAM_TCP_TTL;
ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_ttl), sizeof(options->tcp_ttl), &size);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_ttl from cmsg: %s", dir_str, strerror(-ret));
}
type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_KEEPALIVE : TFE_CMSG_UPSTREAM_TCP_KEEPALIVE;
ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_keepalive), sizeof(options->tcp_keepalive), &size);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_keepalive from cmsg: %s", dir_str, strerror(-ret));
}
type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_KEEPCNT : TFE_CMSG_UPSTREAM_TCP_KEEPCNT;
ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_keepcnt), sizeof(options->tcp_keepcnt), &size);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_keepcnt from cmsg: %s", dir_str, strerror(-ret));
}
type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_KEEPIDLE : TFE_CMSG_UPSTREAM_TCP_KEEPIDLE;
ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_keepidle), sizeof(options->tcp_keepidle), &size);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_keepidle from cmsg: %s", dir_str, strerror(-ret));
}
type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_KEEPINTVL : TFE_CMSG_UPSTREAM_TCP_KEEPINTVL;
ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_keepintvl), sizeof(options->tcp_keepintvl), &size);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_keepintvl from cmsg: %s", dir_str, strerror(-ret));
}
type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_USER_TIMEOUT : TFE_CMSG_UPSTREAM_TCP_USER_TIMEOUT;
ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_user_timeout), sizeof(options->tcp_user_timeout), &size);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_user_timeout from cmsg: %s", dir_str, strerror(-ret));
}
}
void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket_t fd, tfe_conn_dir dir)
{
struct tfe_stream * stream = &_stream->head;
struct tfe_proxy_tcp_options * tcp_options = &_stream->proxy_ref->tcp_options;
struct tfe_tcp_options options = {0};
/* Make it non-blocking */
int ret = evutil_make_socket_nonblocking(fd);
assert(ret >= 0);
/* get tcp options from tfe.conf */
/* Recv Buffer */
if (tcp_options->sz_rcv_buffer >= 0)
{
@@ -1282,71 +1347,103 @@ void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket
}
}
/* Keep-alive */
if (tcp_options->so_keepalive > 0)
if (tcp_options->enable_overwrite > 0)
{
/* get tcp options from tfe.conf */
options.tcp_keepalive = tcp_options->so_keepalive;
options.tcp_keepcnt = tcp_options->tcp_keepcnt;
options.tcp_keepidle = tcp_options->tcp_keepidle;
options.tcp_keepintvl = tcp_options->tcp_keepintvl;
options.tcp_nodelay = tcp_options->tcp_nodelay;
options.tcp_ttl = (dir == CONN_DIR_DOWNSTREAM ? tcp_options->tcp_ttl_downstream : tcp_options->tcp_ttl_upstream);
options.tcp_user_timeout = tcp_options->tcp_user_timeout;
}
else
{
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (const void *) &tcp_options->so_keepalive, sizeof(int)) == -1)
/* get tcp options form cmsg */
get_tcp_option_from_cmsg(_stream->cmsg, &options, dir);
}
/* Keep-alive */
if (options.tcp_keepalive > 0)
{
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (const void *) &(options.tcp_keepalive), sizeof(options.tcp_keepalive)) == -1)
{
TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(SO_KEEPALIVE, %d) failed, ignored: %s",
stream->str_stream_info, tcp_options->so_keepalive, strerror(errno));
/* after log, reset errno */
errno = 0;
}
stream->str_stream_info, options.tcp_keepalive, strerror(errno));
/* after log, reset errno */
errno = 0;
}
}
if (tcp_options->tcp_keepcnt > 0)
if (options.tcp_keepcnt > 0)
{
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, (const void *) &tcp_options->tcp_keepcnt, sizeof(int)) == -1)
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, (const void *) &(options.tcp_keepcnt), sizeof(options.tcp_keepcnt)) == -1)
{
TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_KEEPCNT, %d) failed, ignored: %s",
stream->str_stream_info, tcp_options->tcp_keepcnt, strerror(errno));
/* after log, reset errno */
errno = 0;
}
stream->str_stream_info, options.tcp_keepcnt, strerror(errno));
/* after log, reset errno */
errno = 0;
}
}
if (tcp_options->tcp_keepintvl > 0)
if (options.tcp_keepintvl > 0)
{
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, (const void *) &tcp_options->tcp_keepintvl, sizeof(int)) == -1)
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, (const void *) &(options.tcp_keepintvl), sizeof(options.tcp_keepintvl)) == -1)
{
TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_KEEPINTVL, %d) failed, ignored: %s",
stream->str_stream_info, tcp_options->tcp_keepintvl, strerror(errno));
/* after log, reset errno */
errno = 0;
}
stream->str_stream_info, options.tcp_keepintvl, strerror(errno));
/* after log, reset errno */
errno = 0;
}
}
if (tcp_options->tcp_keepidle > 0)
if (options.tcp_keepidle > 0)
{
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, (const void *) &tcp_options->tcp_keepidle, sizeof(int)) == -1)
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, (const void *) &(options.tcp_keepidle), sizeof(options.tcp_keepidle)) == -1)
{
TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_KEEPIDLE, %d) failed, ignored: %s",
stream->str_stream_info, tcp_options->tcp_keepidle, strerror(errno));
/* after log, reset errno */
errno = 0;
}
stream->str_stream_info, options.tcp_keepidle, strerror(errno));
/* after log, reset errno */
errno = 0;
}
}
if (tcp_options->tcp_user_timeout > 0)
if (options.tcp_user_timeout > 0)
{
if (setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (const void *) &tcp_options->tcp_user_timeout, sizeof(int))
== -1)
if (setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (const void *) &(options.tcp_user_timeout), sizeof(options.tcp_user_timeout)) == -1)
{
TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_USER_TIMEOUT, %d) failed, ignored: %s",
stream->str_stream_info, tcp_options->tcp_user_timeout, strerror(errno));
stream->str_stream_info, options.tcp_user_timeout, strerror(errno));
/* after log, reset errno */
errno = 0;
}
}
int __ttl = (dir == CONN_DIR_UPSTREAM) ? tcp_options->tcp_ttl_upstream : tcp_options->tcp_ttl_downstream;
if (__ttl > 0 && __fd_ttl_option_setup(_stream, fd, __ttl) < 0)
// enable/disable
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (const void *) &(options.tcp_nodelay), sizeof(options.tcp_nodelay)) == -1)
{
TFE_LOG_ERROR(g_default_logger, "%s: Failed at setup FD's ttl option, ttl = %d, fd = %d",
stream->str_stream_info, __ttl, fd);
TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_NODELAY, %d) failed, ignored: %s",
stream->str_stream_info, options.tcp_nodelay, strerror(errno));
/* after log, reset errno */
errno = 0;
}
if (options.tcp_ttl > 0)
{
if (__fd_ttl_option_setup(_stream, fd, options.tcp_ttl) < 0)
{
TFE_LOG_ERROR(g_default_logger, "%s: Failed at setup FD's ttl option, ttl = %d, fd = %d",
stream->str_stream_info, options.tcp_ttl, fd);
}
}
(void)ret;
TFE_LOG_DEBUG(g_default_logger,
"%s %s: fetch tcp options, nodelay: %d ttl: %d keepalive: %d keepcnt: %d keepidle: %d keepintvl: %d user_timeout: %d",
stream->str_stream_info, (dir == CONN_DIR_DOWNSTREAM ? "downstream" : "upstream"),
options.tcp_nodelay, options.tcp_ttl, options.tcp_keepalive,
options.tcp_keepcnt, options.tcp_keepidle, options.tcp_keepintvl, options.tcp_user_timeout);
}
int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream)
@@ -1358,18 +1455,18 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst
_stream->log_fd_downstream = fd_downstream;
_stream->log_fd_upstream = fd_upstream;
__stream_fd_option_setup(_stream, fd_downstream, CONN_DIR_DOWNSTREAM);
__stream_fd_option_setup(_stream, fd_upstream, CONN_DIR_UPSTREAM);
_stream->head.addr = tfe_stream_addr_create_by_fd(fd_downstream, CONN_DIR_DOWNSTREAM);
if (unlikely(_stream->head.addr == NULL))
{
TFE_LOG_ERROR(_stream->stream_logger, "Failed to create address from fd %d, %d, terminate fds.",
fd_downstream, fd_upstream); goto __errout;
}
_stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr);
stream->str_stream_info = _stream->str_stream_addr;
__stream_fd_option_setup(_stream, fd_downstream, CONN_DIR_DOWNSTREAM);
__stream_fd_option_setup(_stream, fd_upstream, CONN_DIR_UPSTREAM);
if (_stream->session_type == STREAM_PROTO_PLAIN)
{