diff --git a/common/include/tfe_tcp_restore.h b/common/include/tfe_tcp_restore.h index 535fdb1..e706c88 100644 --- a/common/include/tfe_tcp_restore.h +++ b/common/include/tfe_tcp_restore.h @@ -38,7 +38,7 @@ struct tcp_restore_info }; void tfe_tcp_restore_info_dump(const struct tcp_restore_info *info); -int tfe_tcp_restore_fd_create(const struct tcp_restore_endpoint *endpoint, const struct tcp_restore_endpoint *peer); +int tfe_tcp_restore_fd_create(const struct tcp_restore_endpoint *endpoint, const struct tcp_restore_endpoint *peer, const char *devname, unsigned int fd_so_mask); #ifdef __cpluscplus } diff --git a/common/src/tfe_tcp_restore.cpp b/common/src/tfe_tcp_restore.cpp index 08221b3..d66c9be 100644 --- a/common/src/tfe_tcp_restore.cpp +++ b/common/src/tfe_tcp_restore.cpp @@ -5,12 +5,11 @@ #include #include #include +#include #include #include -static unsigned int fd_so_mask = 0x65; - void tfe_tcp_restore_info_dump(const struct tcp_restore_info *info) { char str_client_addr[64] = { 0 }; @@ -57,12 +56,13 @@ void tfe_tcp_restore_info_dump(const struct tcp_restore_info *info) } } -int tfe_tcp_restore_fd_create(const struct tcp_restore_endpoint *endpoint, const struct tcp_restore_endpoint *peer) +int tfe_tcp_restore_fd_create(const struct tcp_restore_endpoint *endpoint, const struct tcp_restore_endpoint *peer, const char *devname, unsigned int fd_so_mask) { int result = 0; int sockopt = 0; int sockfd = 0; - + char buffer[IFNAMSIZ] = {0}; + socklen_t buffer_len = sizeof(buffer); unsigned int nr_tcp_repair_opts = 0; struct tcp_repair_opt tcp_repair_opts[8]; struct tcp_repair_window tcp_repair_window = { 0 }; @@ -88,6 +88,21 @@ int tfe_tcp_restore_fd_create(const struct tcp_restore_endpoint *endpoint, const goto errout; } + result = setsockopt(sockfd, SOL_SOCKET, SO_BINDTODEVICE, devname, strlen(devname)); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at setsockopt(SO_BINDTODEVICE) on %d, %d: %s", devname, errno, strerror(errno)); + goto errout; + } + + result = getsockopt(sockfd, SOL_SOCKET, SO_BINDTODEVICE, buffer, &buffer_len); + if (result < 0) + { + TFE_LOG_ERROR(g_default_logger, "failed at getsockopt(SO_BINDTODEVICE) on %d, %d: %s", devname, errno, strerror(errno)); + goto errout; + } + TFE_LOG_DEBUG(g_default_logger, "sockfd %d successfully bound to %s device, so_mask: %x", sockfd, buffer, fd_so_mask); + // Setup TCP REPAIR Status sockopt = 1; result = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *)&sockopt, sizeof(sockopt)); diff --git a/conf/tfe/tfe.conf b/conf/tfe/tfe.conf index 5e5960a..b5ba0a2 100644 --- a/conf/tfe/tfe.conf +++ b/conf/tfe/tfe.conf @@ -22,6 +22,7 @@ load_balance=1 # for enable kni v3 [nfq] +device=tap0 queue_id=1 queue_maxlen=655350 queue_rcvbufsiz=983025000 @@ -166,6 +167,15 @@ default_vlan_id=2 table_info=resource/pangu/table_info_traffic_mirror.conf stat_file=log/traffic_mirror.status +[traffic_steering] +enable=1 +# 17: 0x11 +so_mask_client=17 +# 34: 0x22 +so_mask_server=34 +device_client=eth_client +device_server=eth_server + [kafka] enable=1 vsystem_id=1 diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index 92540ac..446683e 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -85,6 +85,8 @@ struct tfe_stream_private struct tfe_stream_write_ctx * w_ctx_downstream; struct tfe_conn_private * conn_upstream; struct tfe_conn_private * conn_downstream; + struct tfe_conn_private * conn_fake_c; + struct tfe_conn_private * conn_fake_s; struct { @@ -130,6 +132,9 @@ struct tfe_stream_private /* KEYRING-ID */ unsigned keyring_id; + evutil_socket_t fd_fake_c; + evutil_socket_t fd_fake_s; + /* ONLY FOR LOG */ evutil_socket_t log_fd_downstream; evutil_socket_t log_fd_upstream; diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h index 5a71deb..970a6d1 100644 --- a/platform/include/internal/proxy.h +++ b/platform/include/internal/proxy.h @@ -4,6 +4,7 @@ #include #include #include +#include struct ssl_mgr; struct key_keeper; @@ -60,6 +61,15 @@ struct tfe_proxy_tcp_options int tcp_ttl_downstream; }; +struct tfe_traffic_steering_options +{ + int enable; + int so_mask_client; + int so_mask_server; + char device_client[IFNAMSIZ]; + char device_server[IFNAMSIZ]; +}; + struct tfe_proxy_rate_limit_options { unsigned int read_rate; @@ -141,6 +151,8 @@ struct tfe_proxy /* load balancing */ enum tfe_load_balance_algo load_balance; + + struct tfe_traffic_steering_options traffic_steering_options; }; extern struct tfe_proxy * g_default_proxy; @@ -152,6 +164,6 @@ struct tfe_thread_ctx * tfe_proxy_thread_ctx_acquire(struct tfe_proxy * ctx); void tfe_proxy_thread_ctx_release(struct tfe_thread_ctx * thread_ctx); struct tfe_proxy * tfe_proxy_new(const char * profile); -int tfe_proxy_fds_accept(struct tfe_proxy * ctx, int fd_downstream, int fd_upstream, struct tfe_cmsg * cmsg); +int tfe_proxy_fds_accept(struct tfe_proxy * ctx, int fd_downstream, int fd_upstream, int fd_fake_c, int fd_fake_s, struct tfe_cmsg * cmsg); void tfe_proxy_run(struct tfe_proxy * proxy); int tfe_thread_set_affinity(int core_id); diff --git a/platform/include/internal/tcp_stream.h b/platform/include/internal/tcp_stream.h index 63fcd9f..4d77b40 100644 --- a/platform/include/internal/tcp_stream.h +++ b/platform/include/internal/tcp_stream.h @@ -12,5 +12,5 @@ enum tfe_stream_option }; int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt, const void * arg, size_t sz_arg); -int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream); +int tfe_stream_init_by_fds(struct tfe_stream *stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream, evutil_socket_t fd_fake_c, evutil_socket_t fd_fake_s); void tfe_stream_destory(struct tfe_stream_private * stream); diff --git a/platform/src/acceptor_kni_v1.cpp b/platform/src/acceptor_kni_v1.cpp index ccd6db0..1b4e0e1 100644 --- a/platform/src/acceptor_kni_v1.cpp +++ b/platform/src/acceptor_kni_v1.cpp @@ -284,7 +284,7 @@ void __kni_event_cb(evutil_socket_t fd, short what, void * user) tfe_cmsg_set(__tfe_cmsg, TFE_CMSG_POLICY_ID, (const unsigned char *)&keyring_id, sizeof(keyring_id)); TFE_PROXY_STAT_INCREASE(STAT_FD_OPEN_BY_KNI_ACCEPT, 2); - if (tfe_proxy_fds_accept(__ctx->proxy, __fds[0], __fds[1], __tfe_cmsg) < 0) + if (tfe_proxy_fds_accept(__ctx->proxy, __fds[0], __fds[1], 0, 0, __tfe_cmsg) < 0) { goto __drop_recieved_fds; } diff --git a/platform/src/acceptor_kni_v2.cpp b/platform/src/acceptor_kni_v2.cpp index ace7de6..ae6c3a8 100644 --- a/platform/src/acceptor_kni_v2.cpp +++ b/platform/src/acceptor_kni_v2.cpp @@ -120,7 +120,7 @@ void acceptor_kni_v2_event(evutil_socket_t fd, short what, void * user) } TFE_PROXY_STAT_INCREASE(STAT_FD_OPEN_BY_KNI_ACCEPT, 2); - if (tfe_proxy_fds_accept(__ctx->proxy, __fds[0], __fds[1], cmsg) < 0) + if (tfe_proxy_fds_accept(__ctx->proxy, __fds[0], __fds[1], 0, 0, cmsg) < 0) { goto __drop_recieved_fds; } diff --git a/platform/src/acceptor_kni_v3.cpp b/platform/src/acceptor_kni_v3.cpp index 290167f..1045c13 100644 --- a/platform/src/acceptor_kni_v3.cpp +++ b/platform/src/acceptor_kni_v3.cpp @@ -19,7 +19,7 @@ struct acceptor_kni_v3 { struct tfe_proxy *proxy; const char *profile; - + char device[IFNAMSIZ]; struct nfq_handle *h; struct nfq_q_handle *qh; int fd_nfq_socket; @@ -285,6 +285,8 @@ static int payload_handler_cb(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, s int ret = 0; int fd_downstream = 0; int fd_upstream = 0; + int fd_fake_c = 0; + int fd_fake_s = 0; int hit_tcpopt = 0; uint16_t cmsg_offset = 0; uint8_t restore_opt_len = 0; @@ -404,7 +406,7 @@ static int payload_handler_cb(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, s tfe_tcp_restore_info_dump(&restore_info); // tcp repair C2S - fd_upstream = tfe_tcp_restore_fd_create(&(restore_info.client), &(restore_info.server)); + fd_upstream = tfe_tcp_restore_fd_create(&(restore_info.client), &(restore_info.server), __ctx->device, 0x65); if (fd_upstream < 0) { TFE_LOG_ERROR(g_default_logger, "Failed at tcp_restore_fd_create(UPSTREAM)"); @@ -412,20 +414,37 @@ static int payload_handler_cb(struct nfq_q_handle *qh, struct nfgenmsg *nfmsg, s } // tcp repair S2C - fd_downstream = tfe_tcp_restore_fd_create(&(restore_info.server), &(restore_info.client)); + fd_downstream = tfe_tcp_restore_fd_create(&(restore_info.server), &(restore_info.client), __ctx->device, 0x65); if (fd_downstream < 0) { TFE_LOG_ERROR(g_default_logger, "Failed at tcp_restore_fd_create(DOWNSTREAM)"); goto end; } + if (__ctx->proxy->traffic_steering_options.enable) + { + fd_fake_c = tfe_tcp_restore_fd_create(&(restore_info.client), &(restore_info.server), __ctx->proxy->traffic_steering_options.device_client, __ctx->proxy->traffic_steering_options.so_mask_client); + if (fd_fake_c < 0) + { + TFE_LOG_ERROR(g_default_logger, "Failed at tcp_restore_fd_create(fd_fake_c)"); + goto end; + } + + fd_fake_s = tfe_tcp_restore_fd_create(&(restore_info.server), &(restore_info.client), __ctx->proxy->traffic_steering_options.device_server, __ctx->proxy->traffic_steering_options.so_mask_server); + if (fd_fake_s < 0) + { + TFE_LOG_ERROR(g_default_logger, "Failed at tcp_restore_fd_create(fd_fake_s)"); + goto end; + } + } + if (tfe_cmsg_deserialize((const unsigned char *)restore_info.cmsg, restore_info.cmsg_len, &cmsg) < 0) { TFE_LOG_ERROR(g_default_logger, "Failed at tfe_cmsg_deserialize()"); goto end; } - if (tfe_proxy_fds_accept(__ctx->proxy, fd_downstream, fd_upstream, cmsg) < 0) + if (tfe_proxy_fds_accept(__ctx->proxy, fd_downstream, fd_upstream, fd_fake_c, fd_fake_s, cmsg) < 0) { TFE_LOG_ERROR(g_default_logger, "Failed at tfe_proxy_fds_accept()"); goto end; @@ -556,6 +575,7 @@ struct acceptor_kni_v3 *acceptor_kni_v3_create(struct tfe_proxy *proxy, const ch __ctx->proxy = proxy; __ctx->profile = profile; + MESA_load_profile_string_def(profile, "nfq", "device", __ctx->device, sizeof(__ctx->device), "tap0"); MESA_load_profile_uint_def(profile, "nfq", "queue_id", &(__ctx->queue_id), 1); MESA_load_profile_uint_def(profile, "nfq", "queue_maxlen", &(__ctx->queue_maxlen), 65535); MESA_load_profile_uint_def(profile, "nfq", "queue_rcvbufsiz", &(__ctx->queue_rcvbufsiz), 98302500); diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index b43129f..a162e23 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -155,7 +155,7 @@ int tfe_thread_set_affinity(int core_id) return pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); } -int tfe_proxy_fds_accept(struct tfe_proxy * ctx, int fd_downstream, int fd_upstream, struct tfe_cmsg * cmsg) +int tfe_proxy_fds_accept(struct tfe_proxy *ctx, int fd_downstream, int fd_upstream, int fd_fake_c, int fd_fake_s, struct tfe_cmsg *cmsg) { struct tfe_thread_ctx * worker_thread_ctx = tfe_proxy_thread_ctx_acquire(ctx); struct tfe_stream * stream = tfe_stream_create(ctx, worker_thread_ctx); @@ -203,7 +203,7 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, int fd_downstream, int fd_upstr TFE_LOG_DEBUG(ctx->logger, "%p: fetch tcp options: cmsg's tcp_passthrough: %d, conf's tcp_passthrough: %d, enalbe passthrough: %d", stream, tcp_passthrough, ctx->tcp_all_passthrough, (ctx->tcp_all_passthrough > 0 || tcp_passthrough > 0) ? 1 : 0); - result = tfe_stream_init_by_fds(stream, fd_downstream, fd_upstream); + result = tfe_stream_init_by_fds(stream, fd_downstream, fd_upstream, fd_fake_c, fd_fake_s); if (result < 0) { TFE_LOG_ERROR(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accept failed.", @@ -430,7 +430,13 @@ int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile) MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_upstream", &proxy->tcp_options.tcp_ttl_upstream, -1); MESA_load_profile_int_def(profile, "tcp", "tcp_ttl_downstream", &proxy->tcp_options.tcp_ttl_downstream, -1); - return 0; + MESA_load_profile_int_def(profile, "traffic_steering", "enable", &proxy->traffic_steering_options.enable, 0); + MESA_load_profile_int_def(profile, "traffic_steering", "so_mask_client", &proxy->traffic_steering_options.so_mask_client, 0x11); + MESA_load_profile_int_def(profile, "traffic_steering", "so_mask_server", &proxy->traffic_steering_options.so_mask_server, 0x22); + MESA_load_profile_string_def(profile, "traffic_steering", "device_client", proxy->traffic_steering_options.device_client, sizeof(proxy->traffic_steering_options.device_client), "eth_client"); + MESA_load_profile_string_def(profile, "traffic_steering", "device_server", proxy->traffic_steering_options.device_server, sizeof(proxy->traffic_steering_options.device_server), "eth_server"); + + return 0; } static const char * __str_stat_spec_map[] = diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 92bae92..0c13deb 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -583,13 +583,48 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; enum tfe_conn_dir dir = __bev_dir(_stream, bev); + struct tfe_conn_private * this_conn = NULL; + struct tfe_conn_private * peer_conn = NULL; + struct evbuffer * inbuf = NULL; + struct evbuffer * outbuf = NULL; - UNUSED struct tfe_conn_private * this_conn = __this_conn(_stream, dir); - struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); + if (_stream->proxy_ref->traffic_steering_options.enable) + { + if (bev == _stream->conn_downstream->bev) + { + this_conn = _stream->conn_downstream; + peer_conn = _stream->conn_fake_c; + } + else if (bev == _stream->conn_upstream->bev) + { + this_conn = _stream->conn_upstream; + peer_conn = _stream->conn_fake_s; + } + else + { + assert(0); + } + + inbuf = bufferevent_get_input(bev); + int data_len = evbuffer_get_length(inbuf); + outbuf = bufferevent_get_output(peer_conn->bev); + assert(inbuf != NULL && outbuf != NULL); + evbuffer_add_buffer(outbuf, inbuf); + + // TODO 增加计数 + + TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, send %d bytes form %s to %s", + data_len, bev == _stream->conn_downstream->bev ? "conn_downstream" : "conn_upstream", + bev == _stream->conn_downstream->bev ? "conn_fake_c" : "conn_fake_s"); + + return; + } + this_conn = __this_conn(_stream, dir); + peer_conn = __peer_conn(_stream, dir); /* Peer connection is terminated, drain all data. * This connection will be destoryed in __event_cb */ - struct evbuffer * inbuf = bufferevent_get_input(bev); + inbuf = bufferevent_get_input(bev); size_t contigous_len = evbuffer_get_length(inbuf); if (peer_conn == NULL) { @@ -603,7 +638,7 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) _stream->is_first_call_rxcb = 1; } - struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev); + outbuf = bufferevent_get_output(peer_conn->bev); assert(inbuf != NULL && outbuf != NULL); enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA; @@ -737,6 +772,14 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg) struct tfe_conn_private ** ref_this_conn{}; struct tfe_conn_private ** ref_peer_conn{}; struct ssl_stream ** ref_this_ssl_stream{}; + + if (_stream->proxy_ref->traffic_steering_options.enable) + { + // TODO 增加计数 + TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s run writecb", bev == _stream->conn_downstream->bev ? "conn_downstream" : "conn_upstream"); + return; + } + enum tfe_conn_dir conn_dir = __bev_dir(_stream, bev); if (conn_dir == CONN_DIR_UPSTREAM) @@ -793,6 +836,13 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * enum tfe_conn_dir peer_conn_dir{}; size_t rx_offset = 0; + if (_stream->proxy_ref->traffic_steering_options.enable) + { + // TODO 增加计数 + TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s run eventcb", bev == _stream->conn_downstream->bev ? "conn_downstream" : "conn_upstream"); + return; + } + if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM) { ref_this_conn = &_stream->conn_upstream; @@ -976,6 +1026,200 @@ __errout: return NULL; } +static void __steering_stream_bev_readcb(struct bufferevent * bev, void * arg) +{ + struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; + struct tfe_conn_private * peer_conn = NULL; + + if (bev == _stream->conn_fake_c->bev) + { + peer_conn = _stream->conn_downstream; + } + else if (bev == _stream->conn_fake_s->bev) + { + peer_conn = _stream->conn_upstream; + } + else + { + assert(0); + } + + struct evbuffer * __input_buffer = bufferevent_get_input(bev); + if (peer_conn == NULL) + { + evbuffer_drain(__input_buffer, evbuffer_get_length(__input_buffer)); + return; + } + + // TODO 增加计数 + + TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, send %d bytes form %s to %s", + evbuffer_get_length(__input_buffer), bev == _stream->conn_fake_c->bev ? "conn_fake_c" : "conn_fake_s", + bev == _stream->conn_fake_c->bev ? "conn_downstream" : "conn_upstream" + ); + + struct evbuffer * __output_buffer = bufferevent_get_output(peer_conn->bev); + evbuffer_add_buffer(__output_buffer, __input_buffer); +} + +static void __steering_stream_bev_writecb(struct bufferevent * bev, void * arg) +{ + struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; + struct tfe_conn_private ** ref_this_conn{}; + struct tfe_conn_private ** ref_peer_conn{}; + + if (bev == _stream->conn_fake_c->bev) + { + ref_this_conn = &_stream->conn_fake_c; + ref_peer_conn = &_stream->conn_downstream; + } + else if (bev == _stream->conn_fake_s->bev) + { + ref_this_conn = &_stream->conn_fake_s; + ref_peer_conn = &_stream->conn_upstream; + } + else + { + assert(0); + } + + TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s run write cb", bev == _stream->conn_fake_c->bev ? "conn_fake_c" : "conn_fake_s"); + + struct evbuffer * __output_buffer = bufferevent_get_output(bev); + assert(__output_buffer != NULL); + + // TODO 资源释放 + // TODO 资源释放 + // TODO 资源释放 + // TODO 资源释放 + + if (*ref_peer_conn == NULL && evbuffer_get_length(__output_buffer) == 0) + { + __conn_private_destory(*ref_this_conn); + *ref_this_conn = NULL; + } + + if (*ref_peer_conn == NULL && *ref_this_conn == NULL) + { + // TODO call_plugin_close(_stream); + tfe_stream_destory(_stream); + } +} + +static void __steering_stream_bev_eventcb(struct bufferevent *bev, short events, void *arg) +{ + struct tfe_stream_private *_stream = (struct tfe_stream_private *)arg; + struct tfe_conn_private **ref_this_conn{}; + struct tfe_conn_private **ref_peer_conn{}; + + if (bev == _stream->conn_fake_c->bev) + { + ref_this_conn = &_stream->conn_fake_c; + ref_peer_conn = &_stream->conn_downstream; + } + else if (bev == _stream->conn_fake_s->bev) + { + ref_this_conn = &_stream->conn_fake_s; + ref_peer_conn = &_stream->conn_upstream; + } + else + { + assert(0); + } + + TFE_LOG_DEBUG(__STREAM_LOGGER(_stream), "decrypted traffic steering, %s run event cb", bev == _stream->conn_fake_c->bev ? "conn_fake_c" : "conn_fake_s"); + + if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF) + { + if (evbuffer_get_length(bufferevent_get_input(bev))) + { + __steering_stream_bev_readcb(bev, arg); + } + + if (events & BEV_EVENT_ERROR) + { + unsigned long err; + while ((err = (bufferevent_get_openssl_error(bev)))) + { + const char *msg = (const char *)ERR_reason_error_string(err); + const char *lib = (const char *)ERR_lib_error_string(err); + const char *func = (const char *)ERR_func_error_string(err); + TFE_LOG_INFO(g_default_logger, "%s connection error, bufferevent_get_openssl_error() = %lu: %s %s %s", _stream->str_stream_addr, err, lib, func, msg); + } + + if (errno) + { + TFE_LOG_INFO(g_default_logger, "%s connection error, errno = %d, %s", _stream->str_stream_addr, errno, strerror(errno)); + } + } + + goto __close_connection; + } + + return; + +__close_connection: + // TODO 资源释放 + // TODO 资源释放 + // TODO 资源释放 + // TODO 资源释放 + if (*ref_peer_conn != NULL) + { + struct bufferevent *__peer_bev = (*ref_peer_conn)->bev; + struct evbuffer *__peer_output_buffer = bufferevent_get_output(__peer_bev); + + if (evbuffer_get_length(__peer_output_buffer) == 0) + { + __conn_private_destory(*ref_peer_conn); + *ref_peer_conn = NULL; + } + } + + if (*ref_this_conn != NULL) + { + __conn_private_destory(*ref_this_conn); + *ref_this_conn = NULL; + } + + if (*ref_this_conn == NULL && *ref_peer_conn == NULL) + { + // TODO call_plugin_close(_stream); + tfe_stream_destory(_stream); + } + + return; +} + +static tfe_conn_private *__conn_private_create_by_fake_fd(struct tfe_stream_private *stream, evutil_socket_t fd) +{ + struct tfe_conn_private *__conn_private = ALLOC(struct tfe_conn_private, 1); + __conn_private->fd = fd; + __conn_private->_stream_ref = stream; + __conn_private->bev = bufferevent_socket_new(stream->thread_ref->evbase, fd, BEV_OPT_DEFER_CALLBACKS | BEV_OPT_THREADSAFE); + if (!__conn_private->bev) + { + TFE_LOG_ERROR(__STREAM_LOGGER(stream), "Failed at creating bufferevent for fd %d", fd); + goto __errout; + } + + bufferevent_disable(__conn_private->bev, EV_READ | EV_WRITE); + bufferevent_setcb(__conn_private->bev, __steering_stream_bev_readcb, __steering_stream_bev_writecb, __steering_stream_bev_eventcb, stream); + if (stream->proxy_ref->en_rate_limit) + { + conn_private_ratelimit_setup(__conn_private, &stream->proxy_ref->rate_limit_options); + } + + return __conn_private; + +__errout: + if (__conn_private != NULL) + { + free(__conn_private); + } + + return NULL; +} + void __conn_private_enable(struct tfe_conn_private * conn_private) { assert(conn_private != NULL && conn_private->bev != NULL); @@ -1007,6 +1251,12 @@ void ssl_downstream_create_on_success(future_result_t * result, void * user) __conn_private_enable(_stream->conn_downstream); __conn_private_enable(_stream->conn_upstream); + if (_stream->proxy_ref->traffic_steering_options.enable) + { + __conn_private_enable(_stream->conn_fake_c); + __conn_private_enable(_stream->conn_fake_s); + } + return; } @@ -1183,6 +1433,28 @@ void tfe_stream_destory(struct tfe_stream_private * stream) __conn_private_destory(stream->conn_downstream); } + if (stream->conn_fake_c) + { + assert(stream->fd_fake_c <= 0); + __conn_private_destory(stream->conn_fake_c); + } + + if (stream->conn_fake_s) + { + assert(stream->fd_fake_s <= 0); + __conn_private_destory(stream->conn_fake_s); + } + + if (stream->fd_fake_c) + { + evutil_closesocket(stream->fd_fake_c); + } + + if (stream->fd_fake_s) + { + evutil_closesocket(stream->fd_fake_s); + } + if (stream->defer_fd_downstream) { evutil_closesocket(stream->defer_fd_downstream); @@ -1458,7 +1730,7 @@ void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket } -int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream) +int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream, evutil_socket_t fd_fake_c, evutil_socket_t fd_fake_s) { struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head); @@ -1466,6 +1738,8 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst _stream->defer_fd_upstream = fd_upstream; _stream->log_fd_downstream = fd_downstream; _stream->log_fd_upstream = fd_upstream; + _stream->fd_fake_c = fd_fake_c; + _stream->fd_fake_s = fd_fake_s; _stream->head.addr = tfe_stream_addr_create_by_fd(fd_downstream, CONN_DIR_DOWNSTREAM); if (unlikely(_stream->head.addr == NULL)) @@ -1480,6 +1754,35 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst __stream_fd_option_setup(_stream, fd_downstream, CONN_DIR_DOWNSTREAM); __stream_fd_option_setup(_stream, fd_upstream, CONN_DIR_UPSTREAM); + if (_stream->proxy_ref->traffic_steering_options.enable) + { + __stream_fd_option_setup(_stream, fd_fake_s, CONN_DIR_DOWNSTREAM); + __stream_fd_option_setup(_stream, fd_fake_c, CONN_DIR_UPSTREAM); + + _stream->conn_fake_s = __conn_private_create_by_fake_fd(_stream, fd_fake_s); + if (_stream->conn_fake_s == NULL) + { + goto __errout; + } + _stream->fd_fake_s = 0; + + _stream->conn_fake_c = __conn_private_create_by_fake_fd(_stream, fd_fake_c); + if (_stream->conn_fake_c == NULL) + { + goto __errout; + } + _stream->fd_fake_c = 0; + + assert(_stream->conn_fake_s != NULL); + assert(_stream->conn_fake_c != NULL); + + // enable on upsteam and downsteam success + // __conn_private_enable(_stream->conn_fake_s); + // __conn_private_enable(_stream->conn_fake_c); + + // TFE_PROXY_STAT_INCREASE(STAT_STREAM_STEERING, 1); + } + if (_stream->session_type == STREAM_PROTO_PLAIN) { _stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream); @@ -1508,6 +1811,12 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst __conn_private_enable(_stream->conn_downstream); __conn_private_enable(_stream->conn_upstream); + if (_stream->proxy_ref->traffic_steering_options.enable) + { + __conn_private_enable(_stream->conn_fake_s); + __conn_private_enable(_stream->conn_fake_c); + } + TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_PLAIN, 1); } @@ -1525,6 +1834,7 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_SSL, 1); } + return 0; __errout: