diff --git a/conf/tfe/tfe.conf b/conf/tfe/tfe.conf index f348747..f45d182 100644 --- a/conf/tfe/tfe.conf +++ b/conf/tfe/tfe.conf @@ -18,6 +18,12 @@ cert_store_port=9991 ca_path=resource/tfe/mesalab-ca.pem untrusted_ca_path=resource/tfe/mesalab-ca-untrust.pem +[ratelimit] +#read_rate=1048576 +#read_burst=1048576 +#write_rate=1048576 +#write_burst=1048576 + [debug] passthrough_all_tcp=0 diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index a71c8c9..e0f94be 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -46,6 +46,7 @@ struct tfe_conn_private struct tfe_stream_private * _stream_ref; evutil_socket_t fd; struct bufferevent * bev; + struct ev_token_bucket_cfg * ratelimit_bucket; uint8_t on_writing; }; diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h index 0f382e9..b5403fd 100644 --- a/platform/include/internal/proxy.h +++ b/platform/include/internal/proxy.h @@ -46,6 +46,14 @@ struct tfe_proxy_tcp_options int tcp_ttl_downstream; }; +struct tfe_proxy_rate_limit_options +{ + unsigned int read_rate; + unsigned int read_burst; + unsigned int write_rate; + unsigned int write_burst; +}; + struct tfe_proxy_accept_para { /* Both upstream and downstream FDs */ @@ -73,6 +81,9 @@ struct tfe_proxy unsigned int nr_work_threads; struct tfe_thread_ctx * work_threads[TFE_THREAD_MAX]; + /* buffer options */ + unsigned int buffer_output_limit; + unsigned int nr_modules; struct tfe_plugin * modules; @@ -84,6 +95,10 @@ struct tfe_proxy unsigned int tcp_all_passthrough; struct tfe_proxy_tcp_options tcp_options; + /* GLOBAL RATELIMIT */ + unsigned int en_rate_limit; + struct tfe_proxy_rate_limit_options rate_limit_options; + /* PERFOMANCE MONIOTR VARIABLES*/ long long stat_val[TFE_STAT_MAX]; int fs_id[TFE_STAT_MAX]; diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 1860581..c5beda0 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -238,10 +238,25 @@ int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile) { /* Worker threads */ MESA_load_profile_uint_def(profile, "main", "nr_worker_threads", &proxy->nr_work_threads, 1); + MESA_load_profile_uint_def(profile, "main", "buffer_output_limit", &proxy->buffer_output_limit, 0); /* Debug */ MESA_load_profile_uint_def(profile, "debug", "passthrough_all_tcp", &proxy->tcp_all_passthrough, 0); + /* ratelimit */ + MESA_load_profile_uint_def(profile, "ratelimit", "read_rate", &proxy->rate_limit_options.read_rate, 0); + MESA_load_profile_uint_def(profile, "ratelimit", "read_burst", &proxy->rate_limit_options.read_burst, 0); + MESA_load_profile_uint_def(profile, "ratelimit", "write_rate", &proxy->rate_limit_options.write_rate, 0); + MESA_load_profile_uint_def(profile, "ratelimit", "write_burst", &proxy->rate_limit_options.write_burst, 0); + + if(proxy->rate_limit_options.read_rate != 0 + || proxy->rate_limit_options.read_burst != 0 + || proxy->rate_limit_options.write_rate != 0 + || proxy->rate_limit_options.write_burst != 0) + { + proxy->en_rate_limit = 1; + } + /* TCP options, -1 means unset, we shall not call setsockopt */ MESA_load_profile_int_def(profile, "tcp", "sz_rcv_buffer", &proxy->tcp_options.sz_rcv_buffer, -1); MESA_load_profile_int_def(profile, "tcp", "sz_snd_buffer", &proxy->tcp_options.sz_snd_buffer, -1); diff --git a/platform/src/ssl_stream.cpp b/platform/src/ssl_stream.cpp index b3ac452..e6ad917 100644 --- a/platform/src/ssl_stream.cpp +++ b/platform/src/ssl_stream.cpp @@ -1302,8 +1302,6 @@ static void sslctx_set_opts(SSL_CTX * sslctx, struct ssl_mgr * mgr) SSL_CTX_set_options(sslctx, SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS); #endif /* SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS */ - - if (mgr->no_ssl2) { SSL_CTX_set_options(sslctx, SSL_OP_NO_SSLv2); @@ -1333,8 +1331,6 @@ static void sslctx_set_opts(SSL_CTX * sslctx, struct ssl_mgr * mgr) { SSL_CTX_set_options(sslctx, SSL_OP_NO_COMPRESSION); } - - } diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 255f4b9..b5bef27 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -268,15 +268,21 @@ int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, co return ret; } -static tfe_conn_private * __conn_private_create_by_bev(struct tfe_stream_private * stream, struct bufferevent * bev) +static int conn_private_ratelimit_setup(struct tfe_conn_private * conn, struct tfe_proxy_rate_limit_options * opt) { - struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1); - __conn_private->bev = bev; - __conn_private->fd = bufferevent_getfd(bev); + conn->ratelimit_bucket = ev_token_bucket_cfg_new(opt->read_rate, opt->read_burst, + opt->write_rate, opt->write_burst, NULL); - bufferevent_setcb(__conn_private->bev, __stream_bev_readcb, __stream_bev_writecb, __stream_bev_eventcb, stream); - bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE); - return __conn_private; + if(unlikely(conn->ratelimit_bucket == NULL)) + { + TFE_LOG_ERROR(g_default_logger, "Failed at setting ratelimit bucket, " + "read_rate = %u, read_burst = %u, write_rate = %u, write_burst = %u", + opt->read_rate, opt->read_burst, opt->write_rate, opt->write_burst); + return -1; + } + + bufferevent_set_rate_limit(conn->bev, conn->ratelimit_bucket); + return 0; } int tfe_stream_action_set_opt(const struct tfe_stream * stream, enum tfe_stream_action_opt type, @@ -318,11 +324,20 @@ evutil_socket_t __conn_private_release_fd(struct tfe_conn_private * conn) static void __conn_private_destory(struct tfe_conn_private * conn) { bufferevent_disable(conn->bev, EV_READ | EV_WRITE); + + if(conn->ratelimit_bucket) + { + ev_token_bucket_cfg_free(conn->ratelimit_bucket); + conn->ratelimit_bucket = NULL; + } + bufferevent_free(conn->bev); + if (conn->fd > 0) + { + evutil_closesocket(conn->fd); + } - if (conn->fd > 0) evutil_closesocket(conn->fd); free(conn); - TFE_PROXY_STAT_INCREASE(STAT_FD_INSTANT_CLOSE, 1); } @@ -338,6 +353,7 @@ static void __conn_private_destory_with_ssl(struct event_base * ev_base, static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; + struct tfe_proxy * _proxy = _stream->proxy_ref; struct tfe_conn_private * peer_conn = __peer_conn(_stream, __bev_dir(_stream, bev)); struct evbuffer * __input_buffer = bufferevent_get_input(bev); @@ -393,16 +409,19 @@ static void __stream_bev_passthrough_eventcb(struct bufferevent * bev, short eve struct tfe_conn_private ** ref_this_conn{}; struct tfe_conn_private ** ref_peer_conn{}; + const char * str_direction{}; if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM) { ref_this_conn = &_stream->conn_upstream; ref_peer_conn = &_stream->conn_downstream; + str_direction = "UPSTREAM"; } if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM) { ref_this_conn = &_stream->conn_downstream; ref_peer_conn = &_stream->conn_upstream; + str_direction = "DOWNSTREAM"; } if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF) @@ -412,6 +431,26 @@ static void __stream_bev_passthrough_eventcb(struct bufferevent * bev, short eve __stream_bev_passthrough_readcb(bev, arg); } + if(events & BEV_EVENT_ERROR) + { + unsigned long err; + while ((err = (bufferevent_get_openssl_error(bev)))) + { + const char *msg = (const char*)ERR_reason_error_string(err); + const char *lib = (const char*)ERR_lib_error_string(err); + const char *func = (const char*)ERR_func_error_string(err); + + TFE_LOG_INFO(g_default_logger, "%s %s connection error, bufferevent_get_openssl_error() = %lu: %s %s %s", + _stream->str_stream_addr, str_direction, err, lib, func, msg); + } + + if (errno) + { + TFE_LOG_INFO(g_default_logger, "%s %s connection error, errno = %d, %s", + _stream->str_stream_addr, str_direction, errno, strerror(errno)); + } + } + goto __close_connection; } @@ -687,10 +726,29 @@ __call_plugin_close: tfe_stream_destory(_stream); } +static tfe_conn_private * __conn_private_create_by_bev(struct tfe_stream_private * stream, struct bufferevent * bev) +{ + struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1); + __conn_private->bev = bev; + __conn_private->fd = bufferevent_getfd(bev); + + bufferevent_setcb(__conn_private->bev, __stream_bev_readcb, __stream_bev_writecb, __stream_bev_eventcb, stream); + bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE); + + struct tfe_proxy * proxy_ref = stream->proxy_ref; + if(unlikely(proxy_ref->en_rate_limit)) + { + conn_private_ratelimit_setup(__conn_private, &proxy_ref->rate_limit_options); + } + + return __conn_private; +} + static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private * stream, evutil_socket_t fd) { struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1); struct event_base * __ev_base = stream->thread_ref->evbase; + struct tfe_proxy * proxy_ref = stream->proxy_ref; __conn_private->_stream_ref = stream; __conn_private->bev = bufferevent_socket_new(__ev_base, fd, BEV_OPT_DEFER_CALLBACKS); @@ -714,6 +772,11 @@ static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private } bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE); + if(unlikely(proxy_ref->en_rate_limit)) + { + conn_private_ratelimit_setup(__conn_private, &proxy_ref->rate_limit_options); + } + return __conn_private; __errout: