From ddb1ccba12d463fe38a6aed93dd0656445e50f48 Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Tue, 21 Jul 2020 20:00:14 +0800 Subject: [PATCH] =?UTF-8?q?TSG-2612=20tfe=20=E6=89=A7=E8=A1=8C=20TCP=20Opt?= =?UTF-8?q?ions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/include/tfe_cmsg.h | 25 ++++- conf/tfe/tfe.conf | 11 +- platform/include/internal/proxy.h | 19 ++-- platform/src/proxy.cpp | 36 ++++-- platform/src/tcp_stream.cpp | 175 +++++++++++++++++++++++------- 5 files changed, 208 insertions(+), 58 deletions(-) diff --git a/common/include/tfe_cmsg.h b/common/include/tfe_cmsg.h index 16c879a..e7c0142 100644 --- a/common/include/tfe_cmsg.h +++ b/common/include/tfe_cmsg.h @@ -1,6 +1,5 @@ #pragma once -#define TFE_CMSG_TLV_NR_MAX 64 struct tfe_cmsg; struct tfe_cmsg_serialize_header; @@ -42,6 +41,30 @@ enum tfe_cmsg_tlv_type /* Original Traffic's src & dst MAC address */ TFE_CMSG_SRC_MAC, TFE_CMSG_DST_MAC, + + /* TCP option information */ + TFE_CMSG_DOWNSTREAM_TCP_NODELAY, + TFE_CMSG_DOWNSTREAM_TCP_TTL, + TFE_CMSG_DOWNSTREAM_TCP_KEEPALIVE, + TFE_CMSG_DOWNSTREAM_TCP_KEEPCNT, + TFE_CMSG_DOWNSTREAM_TCP_KEEPIDLE, + TFE_CMSG_DOWNSTREAM_TCP_KEEPINTVL, + TFE_CMSG_DOWNSTREAM_TCP_USER_TIMEOUT, + + TFE_CMSG_UPSTREAM_TCP_NODELAY, + TFE_CMSG_UPSTREAM_TCP_TTL, + TFE_CMSG_UPSTREAM_TCP_KEEPALIVE, + TFE_CMSG_UPSTREAM_TCP_KEEPCNT, + TFE_CMSG_UPSTREAM_TCP_KEEPIDLE, + TFE_CMSG_UPSTREAM_TCP_KEEPINTVL, + TFE_CMSG_UPSTREAM_TCP_USER_TIMEOUT, + + TFE_CMSG_TCP_PASSTHROUGH, + + /* Add new cmsg here */ + + /* MAX cmsg num */ + TFE_CMSG_TLV_NR_MAX }; struct tfe_cmsg* tfe_cmsg_init(); diff --git a/conf/tfe/tfe.conf b/conf/tfe/tfe.conf index fa5d0bd..526e012 100644 --- a/conf/tfe/tfe.conf +++ b/conf/tfe/tfe.conf @@ -55,6 +55,8 @@ untrusted_ca_path=resource/tfe/tango-ca-untrust-ca.pem enable_health_check=1 [debug] +# 1 : enforce tcp passthrough +# 0 : Whether to passthrough depends on the tcp_options in cmsg passthrough_all_tcp=0 [ratelimit] @@ 
-64,11 +66,18 @@ passthrough_all_tcp=0 #write_burst=200000 [tcp] +sz_rcv_buffer=0 +sz_snd_buffer=0 + +# 1 : use tcp_options in tfe.conf +# 0 : use tcp_options in cmsg +enable_overwrite=0 +tcp_nodelay=1 so_keepalive=1 tcp_keepcnt=8 tcp_keepintvl=15 tcp_keepidle=30 -tcp_user_timeout=30 +tcp_user_timeout=600 tcp_ttl_upstream=75 tcp_ttl_downstream=70 diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h index 92a64d6..cc79b11 100644 --- a/platform/include/internal/proxy.h +++ b/platform/include/internal/proxy.h @@ -46,15 +46,18 @@ struct tfe_proxy_tcp_options /* TCP OPTIONS */ int sz_rcv_buffer; int sz_snd_buffer; - int so_keepalive; - int tcp_keepidle; - int tcp_keepintvl; - int tcp_keepcnt; - int tcp_user_timeout; - /* TRACE FOR DEBUG */ - int tcp_ttl_upstream; - int tcp_ttl_downstream; + /* TRACE FOR DEBUG */ + int enable_overwrite; + int tcp_nodelay; + int so_keepalive; + int tcp_keepidle; + int tcp_keepintvl; + int tcp_keepcnt; + int tcp_user_timeout; + + int tcp_ttl_upstream; + int tcp_ttl_downstream; }; struct tfe_proxy_rate_limit_options diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 1c5f421..832b01b 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -159,6 +159,7 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, int fd_downstream, int fd_upstr enum tfe_stream_proto stream_protocol; uint8_t stream_protocol_in_char = 0; + int tcp_passthrough = 0; uint16_t size = 0; int result = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_PROTOCOL, (unsigned char *)&stream_protocol_in_char, @@ -174,8 +175,22 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, int fd_downstream, int fd_upstr tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &stream_protocol, sizeof(stream_protocol)); tfe_stream_cmsg_setup(stream, cmsg); + if (ctx->tcp_options.enable_overwrite <= 0) + { + result = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_PASSTHROUGH, (unsigned char *)&tcp_passthrough, sizeof(tcp_passthrough), &size); + if 
(result < 0) + { + TFE_LOG_ERROR(ctx->logger, "failed at fetch connection's tcp_passthrough from cmsg: %s", strerror(-result)); + // goto __errout; + } + else + { + TFE_LOG_DEBUG(ctx->logger, "%p: fetch tcp options, passthrough: %d", stream, tcp_passthrough); + } + } + /* FOR DEBUG */ - if (unlikely(ctx->tcp_all_passthrough)) + if (unlikely(ctx->tcp_all_passthrough) || tcp_passthrough) { bool __true = true; enum tfe_stream_proto __session_type = STREAM_PROTO_PLAIN; @@ -374,15 +389,18 @@ int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile) /* TCP options, -1 means unset, we shall not call setsockopt */ MESA_load_profile_int_def(profile, "tcp", "sz_rcv_buffer", &proxy->tcp_options.sz_rcv_buffer, -1); MESA_load_profile_int_def(profile, "tcp", "sz_snd_buffer", &proxy->tcp_options.sz_snd_buffer, -1); - MESA_load_profile_int_def(profile, "tcp", "so_keepalive", &proxy->tcp_options.so_keepalive, -1); - MESA_load_profile_int_def(profile, "tcp", "tcp_keepidle", &proxy->tcp_options.tcp_keepidle, -1); - MESA_load_profile_int_def(profile, "tcp", "tcp_keepintvl", &proxy->tcp_options.tcp_keepintvl, -1); - MESA_load_profile_int_def(profile, "tcp", "tcp_keepcnt", &proxy->tcp_options.tcp_keepcnt, -1); - MESA_load_profile_int_def(profile, "tcp", "tcp_user_timeout", &proxy->tcp_options.tcp_user_timeout, -1); - MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_upstream", &proxy->tcp_options.tcp_ttl_upstream, -1); - MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_downstream", &proxy->tcp_options.tcp_ttl_downstream, -1); - return 0; + MESA_load_profile_int_def(profile, "tcp", "enable_overwrite", &proxy->tcp_options.enable_overwrite, -1); + MESA_load_profile_int_def(profile, "tcp", "tcp_nodelay", &proxy->tcp_options.tcp_nodelay, -1); + MESA_load_profile_int_def(profile, "tcp", "so_keepalive", &proxy->tcp_options.so_keepalive, -1); + MESA_load_profile_int_def(profile, "tcp", "tcp_keepidle", &proxy->tcp_options.tcp_keepidle, -1); + MESA_load_profile_int_def(profile, 
"tcp", "tcp_keepintvl", &proxy->tcp_options.tcp_keepintvl, -1); + MESA_load_profile_int_def(profile, "tcp", "tcp_keepcnt", &proxy->tcp_options.tcp_keepcnt, -1); + MESA_load_profile_int_def(profile, "tcp", "tcp_user_timeout", &proxy->tcp_options.tcp_user_timeout, -1); + MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_upstream", &proxy->tcp_options.tcp_ttl_upstream, -1); + MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_downstream", &proxy->tcp_options.tcp_ttl_downstream, -1); + + return 0; } static const char * __str_stat_spec_map[] = diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index fce9cf2..90d9c31 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -1249,15 +1249,80 @@ int __fd_ttl_option_setup(struct tfe_stream_private * _stream, evutil_socket_t f return 0; } +struct tfe_tcp_options +{ + int tcp_nodelay; + int tcp_ttl; + int tcp_keepalive; + int tcp_keepcnt; + int tcp_keepidle; + int tcp_keepintvl; + int tcp_user_timeout; +}; + +static void get_tcp_option_from_cmsg(struct tfe_cmsg *cmsg, struct tfe_tcp_options *options, tfe_conn_dir dir) +{ + int ret; + uint16_t size = 0; + enum tfe_cmsg_tlv_type type; + memset(options, 0, sizeof(struct tfe_tcp_options)); + + const char *dir_str = (dir == CONN_DIR_DOWNSTREAM ? "downstream" : "upstream"); + type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_NODELAY : TFE_CMSG_UPSTREAM_TCP_NODELAY; + ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_nodelay), sizeof(options->tcp_nodelay), &size); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_nodelay from cmsg: %s", dir_str, strerror(-ret)); + } + type = (dir == CONN_DIR_DOWNSTREAM) ? 
TFE_CMSG_DOWNSTREAM_TCP_TTL : TFE_CMSG_UPSTREAM_TCP_TTL; + ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_ttl), sizeof(options->tcp_ttl), &size); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_ttl from cmsg: %s", dir_str, strerror(-ret)); + } + type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_KEEPALIVE : TFE_CMSG_UPSTREAM_TCP_KEEPALIVE; + ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_keepalive), sizeof(options->tcp_keepalive), &size); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_keepalive from cmsg: %s", dir_str, strerror(-ret)); + } + type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_KEEPCNT : TFE_CMSG_UPSTREAM_TCP_KEEPCNT; + ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_keepcnt), sizeof(options->tcp_keepcnt), &size); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_keepcnt from cmsg: %s", dir_str, strerror(-ret)); + } + type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_KEEPIDLE : TFE_CMSG_UPSTREAM_TCP_KEEPIDLE; + ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_keepidle), sizeof(options->tcp_keepidle), &size); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_keepidle from cmsg: %s", dir_str, strerror(-ret)); + } + type = (dir == CONN_DIR_DOWNSTREAM) ? TFE_CMSG_DOWNSTREAM_TCP_KEEPINTVL : TFE_CMSG_UPSTREAM_TCP_KEEPINTVL; + ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_keepintvl), sizeof(options->tcp_keepintvl), &size); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_keepintvl from cmsg: %s", dir_str, strerror(-ret)); + } + type = (dir == CONN_DIR_DOWNSTREAM) ? 
TFE_CMSG_DOWNSTREAM_TCP_USER_TIMEOUT : TFE_CMSG_UPSTREAM_TCP_USER_TIMEOUT; + ret = tfe_cmsg_get_value(cmsg, type, (unsigned char *)&(options->tcp_user_timeout), sizeof(options->tcp_user_timeout), &size); + if (ret < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at fetch connection's %s tcp_user_timeout from cmsg: %s", dir_str, strerror(-ret)); + } +} + void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket_t fd, tfe_conn_dir dir) { struct tfe_stream * stream = &_stream->head; struct tfe_proxy_tcp_options * tcp_options = &_stream->proxy_ref->tcp_options; + struct tfe_tcp_options options = {0}; /* Make it non-blocking */ int ret = evutil_make_socket_nonblocking(fd); assert(ret >= 0); + /* get tcp options from tfe.conf */ /* Recv Buffer */ if (tcp_options->sz_rcv_buffer >= 0) { @@ -1282,71 +1347,103 @@ void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket } } - /* Keep-alive */ - if (tcp_options->so_keepalive > 0) + if (tcp_options->enable_overwrite > 0) + { + /* get tcp options from tfe.conf */ + options.tcp_keepalive = tcp_options->so_keepalive; + options.tcp_keepcnt = tcp_options->tcp_keepcnt; + options.tcp_keepidle = tcp_options->tcp_keepidle; + options.tcp_keepintvl = tcp_options->tcp_keepintvl; + options.tcp_nodelay = tcp_options->tcp_nodelay; + options.tcp_ttl = (dir == CONN_DIR_DOWNSTREAM ? 
tcp_options->tcp_ttl_downstream : tcp_options->tcp_ttl_upstream); + options.tcp_user_timeout = tcp_options->tcp_user_timeout; + } + else { - if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (const void *) &tcp_options->so_keepalive, sizeof(int)) == -1) + /* get tcp options from cmsg */ + get_tcp_option_from_cmsg(_stream->cmsg, &options, dir); + } + + /* Keep-alive */ + if (options.tcp_keepalive > 0) + { + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (const void *) &(options.tcp_keepalive), sizeof(options.tcp_keepalive)) == -1) { TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(SO_KEEPALIVE, %d) failed, ignored: %s", - stream->str_stream_info, tcp_options->so_keepalive, strerror(errno)); - /* after log, reset errno */ - errno = 0; - } + stream->str_stream_info, options.tcp_keepalive, strerror(errno)); + /* after log, reset errno */ + errno = 0; + } } - if (tcp_options->tcp_keepcnt > 0) + if (options.tcp_keepcnt > 0) { - if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, (const void *) &tcp_options->tcp_keepcnt, sizeof(int)) == -1) + if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, (const void *) &(options.tcp_keepcnt), sizeof(options.tcp_keepcnt)) == -1) { TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_KEEPCNT, %d) failed, ignored: %s", - stream->str_stream_info, tcp_options->tcp_keepcnt, strerror(errno)); - /* after log, reset errno */ - errno = 0; - } + stream->str_stream_info, options.tcp_keepcnt, strerror(errno)); + /* after log, reset errno */ + errno = 0; + } } - if (tcp_options->tcp_keepintvl > 0) + if (options.tcp_keepintvl > 0) { - if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, (const void *) &tcp_options->tcp_keepintvl, sizeof(int)) == -1) + if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, (const void *) &(options.tcp_keepintvl), sizeof(options.tcp_keepintvl)) == -1) { TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_KEEPINTVL, %d) failed, ignored: %s", - stream->str_stream_info, tcp_options->tcp_keepintvl, strerror(errno)); - /* after log, reset errno */ - errno = 
0; - } + stream->str_stream_info, options.tcp_keepintvl, strerror(errno)); + /* after log, reset errno */ + errno = 0; + } } - - if (tcp_options->tcp_keepidle > 0) + + if (options.tcp_keepidle > 0) { - if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, (const void *) &tcp_options->tcp_keepidle, sizeof(int)) == -1) + if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, (const void *) &(options.tcp_keepidle), sizeof(options.tcp_keepidle)) == -1) { TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_KEEPIDLE, %d) failed, ignored: %s", - stream->str_stream_info, tcp_options->tcp_keepidle, strerror(errno)); - /* after log, reset errno */ - errno = 0; - } + stream->str_stream_info, options.tcp_keepidle, strerror(errno)); + /* after log, reset errno */ + errno = 0; + } } - if (tcp_options->tcp_user_timeout > 0) + if (options.tcp_user_timeout > 0) { - if (setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (const void *) &tcp_options->tcp_user_timeout, sizeof(int)) - == -1) + if (setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (const void *) &(options.tcp_user_timeout), sizeof(options.tcp_user_timeout)) == -1) { TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_USER_TIMEOUT, %d) failed, ignored: %s", - stream->str_stream_info, tcp_options->tcp_user_timeout, strerror(errno)); + stream->str_stream_info, options.tcp_user_timeout, strerror(errno)); /* after log, reset errno */ errno = 0; } } - int __ttl = (dir == CONN_DIR_UPSTREAM) ? 
tcp_options->tcp_ttl_upstream : tcp_options->tcp_ttl_downstream; - if (__ttl > 0 && __fd_ttl_option_setup(_stream, fd, __ttl) < 0) + // enable/disable + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (const void *) &(options.tcp_nodelay), sizeof(options.tcp_nodelay)) == -1) { - TFE_LOG_ERROR(g_default_logger, "%s: Failed at setup FD's ttl option, ttl = %d, fd = %d", - stream->str_stream_info, __ttl, fd); + TFE_LOG_ERROR(g_default_logger, "%s: setsockopt(TCP_NODELAY, %d) failed, ignored: %s", + stream->str_stream_info, options.tcp_nodelay, strerror(errno)); + /* after log, reset errno */ + errno = 0; + } + + if (options.tcp_ttl > 0) + { + if (__fd_ttl_option_setup(_stream, fd, options.tcp_ttl) < 0) + { + TFE_LOG_ERROR(g_default_logger, "%s: Failed at setup FD's ttl option, ttl = %d, fd = %d", + stream->str_stream_info, options.tcp_ttl, fd); + } } - (void)ret; + TFE_LOG_DEBUG(g_default_logger, + "%s %s: fetch tcp options, nodelay: %d ttl: %d keepalive: %d keepcnt: %d keepidle: %d keepintvl: %d user_timeout: %d", + stream->str_stream_info, (dir == CONN_DIR_DOWNSTREAM ? 
"downstream" : "upstream"), + options.tcp_nodelay, options.tcp_ttl, options.tcp_keepalive, + options.tcp_keepcnt, options.tcp_keepidle, options.tcp_keepintvl, options.tcp_user_timeout); + } int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream) @@ -1358,18 +1455,18 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst _stream->log_fd_downstream = fd_downstream; _stream->log_fd_upstream = fd_upstream; - __stream_fd_option_setup(_stream, fd_downstream, CONN_DIR_DOWNSTREAM); - __stream_fd_option_setup(_stream, fd_upstream, CONN_DIR_UPSTREAM); - _stream->head.addr = tfe_stream_addr_create_by_fd(fd_downstream, CONN_DIR_DOWNSTREAM); if (unlikely(_stream->head.addr == NULL)) { TFE_LOG_ERROR(_stream->stream_logger, "Failed to create address from fd %d, %d, terminate fds.", fd_downstream, fd_upstream); goto __errout; } - + _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); stream->str_stream_info = _stream->str_stream_addr; + + __stream_fd_option_setup(_stream, fd_downstream, CONN_DIR_DOWNSTREAM); + __stream_fd_option_setup(_stream, fd_upstream, CONN_DIR_UPSTREAM); if (_stream->session_type == STREAM_PROTO_PLAIN) {