diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index 850b390..6eef6f7 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -41,6 +41,7 @@ struct tfe_stream_write_ctx struct tfe_conn_private { + struct tfe_stream_private * _stream_ref; evutil_socket_t fd; struct bufferevent * bev; uint8_t on_writing; diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h index 3fc7479..4d3ac32 100644 --- a/platform/include/internal/proxy.h +++ b/platform/include/internal/proxy.h @@ -7,9 +7,26 @@ struct ssl_mgr; struct key_keeper; struct kni_acceptor; + enum TFE_STAT_FIELD -{ +{ STAT_SIGPIPE, + /* FDs */ + STAT_FD_OPEN_BY_KNI_ACCEPT, + STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL, + STAT_FD_CLOSE_BY_EVENT_WRITE, + STAT_FD_CLOSE_BY_EVENT_EOF, + STAT_FD_CLOSE_BY_EVENT_ERROR, + /* FDs */ + STAT_FD_INSTANT_CLOSE, + STAT_FD_DEFER_CLOSE_IN_QUEUE, + STAT_FD_DEFER_CLOSE_SUCCESS, + /* Stream */ + STAT_STREAM_CREATE, + STAT_STREAM_DESTROY, + /* Stream Protocol */ + STAT_STREAM_TCP_PLAIN, + STAT_STREAM_TCP_SSL, TFE_STAT_MAX }; @@ -65,10 +82,12 @@ struct tfe_proxy /* PERFOMANCE MONIOTR VARIABLES*/ long long stat_val[TFE_STAT_MAX]; int fs_id[TFE_STAT_MAX]; - }; +extern struct tfe_proxy * g_default_proxy; +#define TFE_PROXY_STAT_INCREASE(field, val) \ +do { __atomic_fetch_add(&g_default_proxy->stat_val[field], val, __ATOMIC_RELAXED); } while(0) struct tfe_thread_ctx * tfe_proxy_thread_ctx_acquire(struct tfe_proxy * ctx); void tfe_proxy_thread_ctx_release(struct tfe_thread_ctx * thread_ctx); @@ -76,4 +95,3 @@ void tfe_proxy_thread_ctx_release(struct tfe_thread_ctx * thread_ctx); struct tfe_proxy * tfe_proxy_new(const char * profile); int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para); void tfe_proxy_run(struct tfe_proxy * proxy); - diff --git a/platform/src/kni_acceptor.cpp b/platform/src/kni_acceptor.cpp index 2c4268c..780b0fb 100644 --- a/platform/src/kni_acceptor.cpp +++ b/platform/src/kni_acceptor.cpp @@ -251,6 +251,7 @@ void __kni_event_cb(evutil_socket_t fd, short what, void * user) __accept_para.downstream_fd = __fds[0]; __accept_para.upstream_fd = __fds[1]; + TFE_PROXY_STAT_INCREASE(STAT_FD_OPEN_BY_KNI_ACCEPT, 2); if (tfe_proxy_fds_accept(__ctx->proxy, &__accept_para) < 0) { goto __drop_recieved_fds; @@ -262,6 +263,7 @@ __close_kni_connection: __kni_conn_close(__ctx); __drop_recieved_fds: + TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL, 2); evutil_closesocket(__fds[0]); evutil_closesocket(__fds[1]); } diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 65bc02a..7ad4185 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -15,7 +15,6 @@ #include #include - #include #include #include @@ -39,7 +38,7 @@ #include #include -static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGPIPE, SIGUSR1}; +static int signals[] = {SIGHUP, SIGPIPE, SIGUSR1}; /* Global Resource */ void * g_default_logger = NULL; @@ -76,7 +75,7 @@ struct tfe_thread_ctx * tfe_proxy_thread_ctx_acquire(struct tfe_proxy * ctx) unsigned int min_thread_id = 0; unsigned int min_load = 0; - for(unsigned int tid = 0; tid < ctx->nr_work_threads; tid++) + for (unsigned int tid = 0; tid < ctx->nr_work_threads; tid++) { struct tfe_thread_ctx * thread_ctx = ctx->work_threads[tid]; min_thread_id = min_load > thread_ctx->load ? tid : min_thread_id; @@ -97,7 +96,7 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_p tfe_thread_ctx * worker_thread_ctx = tfe_proxy_thread_ctx_acquire(ctx); struct tfe_stream * stream = tfe_stream_create(ctx, worker_thread_ctx); - tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, ¶->session_type, sizeof(para->session_type)); + tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, ¶->session_type, sizeof(para->session_type)); tfe_stream_option_set(stream, TFE_STREAM_OPT_KEYRING_ID, ¶->keyring_id, sizeof(para->keyring_id)); /* FOR DEBUG */ @@ -107,14 +106,15 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_p enum tfe_stream_proto __session_type = STREAM_PROTO_PLAIN; tfe_stream_option_set(stream, TFE_STREAM_OPT_PASSTHROUGH, &__true, sizeof(__true)); - tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &__session_type, sizeof(__session_type)); + tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &__session_type, sizeof(__session_type)); } int ret = tfe_stream_init_by_fds(stream, para->downstream_fd, para->upstream_fd); if (ret < 0) { TFE_LOG_ERROR(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accept failed.", - stream, para->downstream_fd, para->upstream_fd, para->session_type); goto __errout; + stream, para->downstream_fd, para->upstream_fd, para->session_type); + goto __errout; } else { @@ -125,6 +125,7 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_p return 0; __errout: + if(stream != NULL) tfe_stream_destory((struct tfe_stream_private *)stream); return -1; } @@ -151,16 +152,13 @@ static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg) { case SIGTERM: case SIGQUIT: - case SIGHUP: - break; - case SIGUSR1: - break; + case SIGHUP: break; + case SIGUSR1: break; case SIGPIPE: - ATOMIC_INC(&(ctx->stat_val[STAT_SIGPIPE])); + TFE_PROXY_STAT_INCREASE(STAT_SIGPIPE, 1); TFE_LOG_ERROR(ctx->logger, "Warning: Received SIGPIPE; ignoring.\n"); break; - default: - TFE_LOG_ERROR(ctx->logger, "Warning: Received unexpected signal %i\n", fd); + default: TFE_LOG_ERROR(ctx->logger, "Warning: Received unexpected signal %i\n", fd); break; } } @@ -168,8 +166,8 @@ static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg) static void __gc_handler_cb(evutil_socket_t fd, short what, void * arg) { tfe_proxy * ctx = (tfe_proxy *) arg; - int i=0; - for(i=0;ifs_handle, ctx->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(ctx->stat_val[i]))); } @@ -195,23 +193,22 @@ static void * tfe_work_thread(void * arg) __currect_thread_id = ctx->thread_id; char thread_name[16]; snprintf(thread_name, sizeof(thread_name), "tfe:worker-%d", ctx->thread_id); - prctl(PR_SET_NAME,(unsigned long long)thread_name,NULL,NULL,NULL); + prctl(PR_SET_NAME, (unsigned long long) thread_name, NULL, NULL, NULL); TFE_LOG_INFO(g_default_logger, "Work thread %u is running...", ctx->thread_id); event_base_dispatch(ctx->evbase); assert(0); event_free(ev); TFE_LOG_ERROR(g_default_logger, "Work thread %u is exit...", ctx->thread_id); - return (void *)NULL; + return (void *) NULL; } - void tfe_proxy_work_thread_create_ctx(struct tfe_proxy * proxy) { - unsigned int i=0; - for(i=0; inr_work_threads;i++) + unsigned int i = 0; + for (i = 0; i < proxy->nr_work_threads; i++) { - proxy->work_threads[i]=ALLOC(struct tfe_thread_ctx, 1); + proxy->work_threads[i] = ALLOC(struct tfe_thread_ctx, 1); proxy->work_threads[i]->thread_id = i; proxy->work_threads[i]->evbase = event_base_new(); } @@ -219,16 +216,17 @@ void tfe_proxy_work_thread_create_ctx(struct tfe_proxy * proxy) } int tfe_proxy_work_thread_run(struct tfe_proxy * proxy) { - struct tfe_thread_ctx * __thread_ctx=NULL; - unsigned int i=0; - int ret=0; - for(i=0; inr_work_threads;i++) + struct tfe_thread_ctx * __thread_ctx = NULL; + unsigned int i = 0; + int ret = 0; + for (i = 0; i < proxy->nr_work_threads; i++) { - __thread_ctx=proxy->work_threads[i]; - ret = pthread_create(&__thread_ctx->thr, NULL, tfe_work_thread, (void *)__thread_ctx); + __thread_ctx = proxy->work_threads[i]; + ret = pthread_create(&__thread_ctx->thr, NULL, tfe_work_thread, (void *) __thread_ctx); if (unlikely(ret < 0)) { - TFE_LOG_ERROR(proxy->logger, "Failed at pthread_create() for thread %d, error %d: %s", i, errno, strerror(errno)); + TFE_LOG_ERROR(proxy->logger, "Failed at pthread_create() for thread %d, error %d: %s", i, errno, + strerror(errno)); return -1; } } @@ -251,35 +249,56 @@ int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile) return 0; } + +static const char * __str_stat_spec_map[] = + { + [STAT_SIGPIPE] = "SIGPIPE", + [STAT_FD_OPEN_BY_KNI_ACCEPT] = "FdOpenKNI", + [STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL] = "FdCloseKNI", + [STAT_FD_CLOSE_BY_EVENT_WRITE] = "FdCloseUser", + [STAT_FD_CLOSE_BY_EVENT_EOF] = "FdCloseEOF", + [STAT_FD_CLOSE_BY_EVENT_ERROR] = "FdCloseError", + [STAT_FD_INSTANT_CLOSE] = "FdCloseInstant", + [STAT_FD_DEFER_CLOSE_IN_QUEUE] = "FdCloseDeferInQ", + [STAT_FD_DEFER_CLOSE_SUCCESS] = "FdCloseDeferSuc", + [STAT_STREAM_CREATE] = "StreamCreate", + [STAT_STREAM_DESTROY] = "StreamDestroy", + [STAT_STREAM_TCP_PLAIN] = "StreamTCPPlain", + [STAT_STREAM_TCP_SSL] = "StreamTCPSSL", + [TFE_STAT_MAX] = NULL + }; + int tfe_stat_init(struct tfe_proxy * proxy, const char * profile) { - const char* fieldstat_output="./tfe.fieldstat"; - const char* app_name="tfe3a"; - int value=0, i=0; - screen_stat_handle_t fs_handle=NULL; - fs_handle=FS_create_handle(); - FS_set_para(fs_handle, OUTPUT_DEVICE, fieldstat_output, strlen(fieldstat_output)+1); - value=1; + static const char * fieldstat_output = "./tfe.fieldstat"; + static const char * app_name = "tfe3a"; + + int value = 0, i = 0; + screen_stat_handle_t fs_handle = NULL; + + /* TODO: Read Stat-D server and port from profile */ + fs_handle = FS_create_handle(); + FS_set_para(fs_handle, OUTPUT_DEVICE, fieldstat_output, (int)strlen(fieldstat_output) + 1); + FS_set_para(fs_handle, APP_NAME, app_name, (int)strlen(app_name) + 1); + + value = 1; FS_set_para(fs_handle, PRINT_MODE, &value, sizeof(value)); - value=0; + value = 0; FS_set_para(fs_handle, CREATE_THREAD, &value, sizeof(value)); - FS_set_para(fs_handle, APP_NAME, app_name, strlen(app_name)+1); - const char* spec[TFE_STAT_MAX]; - spec[STAT_SIGPIPE]="sigpipe"; - - for(i=0;ifs_id[i]=FS_register(fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT,spec[i]); - } + proxy->fs_id[i] = FS_register(fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, __str_stat_spec_map[i]); + } + FS_start(fs_handle); - proxy->fs_handle=fs_handle; + proxy->fs_handle = fs_handle; return 0; } -int main(int argc, char *argv[]) +int main(int argc, char * argv[]) { - const char* main_profile="./conf/tfe.conf"; + const char * main_profile = "./conf/tfe.conf"; g_default_logger = MESA_create_runtime_log_handle("log/tfe.log", RLOG_LV_DEBUG); if (unlikely(g_default_logger == NULL)) @@ -290,7 +309,7 @@ int main(int argc, char *argv[]) future_promise_library_init(); tango_cache_global_init(); - + /* PROXY INSTANCE */ g_default_proxy = ALLOC(struct tfe_proxy, 1); assert(g_default_proxy); @@ -301,12 +320,12 @@ int main(int argc, char *argv[]) /* PERFOMANCE MONITOR */ tfe_stat_init(g_default_proxy, main_profile); - + /* LOGGER */ g_default_proxy->logger = g_default_logger; - /* adds locking, only required if accessed from separate threads */ - evthread_use_pthreads(); + /* adds locking, only required if accessed from separate threads */ + evthread_use_pthreads(); /* MAIN THREAD EVBASE */ g_default_proxy->evbase = event_base_new(); @@ -317,23 +336,22 @@ int main(int argc, char *argv[]) CHECK_OR_EXIT(g_default_proxy->gcev, "Failed at creating GC event. Exit. "); /* SSL INIT */ - g_default_proxy->ssl_mgr_handler = ssl_manager_init(main_profile, "ssl", - g_default_proxy->evbase, g_default_logger); + g_default_proxy->ssl_mgr_handler = ssl_manager_init(main_profile, "ssl", g_default_proxy->evbase, g_default_logger); CHECK_OR_EXIT(g_default_proxy->ssl_mgr_handler, "Failed at init SSL manager. Exit."); - for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++) - { - g_default_proxy->sev[i] = evsignal_new(g_default_proxy->evbase, signals[i], __signal_handler_cb, g_default_proxy); - CHECK_OR_EXIT( g_default_proxy->sev[i], "Failed at create signal event. Exit."); - evsignal_add(g_default_proxy->sev[i], NULL); - } + for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++) + { + g_default_proxy->sev[i] = evsignal_new(g_default_proxy->evbase, signals[i], __signal_handler_cb, g_default_proxy); + CHECK_OR_EXIT(g_default_proxy->sev[i], "Failed at create signal event. Exit."); + evsignal_add(g_default_proxy->sev[i], NULL); + } struct timeval gc_delay = {2, 0}; - evtimer_add(g_default_proxy->gcev , &gc_delay); + evtimer_add(g_default_proxy->gcev, &gc_delay); /* WORKER THREAD CTX Create */ tfe_proxy_work_thread_create_ctx(g_default_proxy); - + /* ACCEPTOR INIT */ g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger); CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. "); @@ -341,17 +359,17 @@ int main(int argc, char *argv[]) /* PLUGIN INIT */ unsigned int plugin_iterator = 0; - for(struct tfe_plugin * plugin_iter = tfe_plugin_iterate(&plugin_iterator); - plugin_iter != NULL; plugin_iter = tfe_plugin_iterate(&plugin_iterator)) + for (struct tfe_plugin * plugin_iter = tfe_plugin_iterate(&plugin_iterator); + plugin_iter != NULL; plugin_iter = tfe_plugin_iterate(&plugin_iterator)) { ret = plugin_iter->on_init(g_default_proxy); CHECK_OR_EXIT(ret >= 0, "Plugin %s init failed. Exit. ", plugin_iter->symbol); TFE_LOG_INFO(g_default_logger, "Plugin %s initialized. ", plugin_iter->symbol); } - ret=tfe_proxy_work_thread_run(g_default_proxy); - CHECK_OR_EXIT(ret==0, "Failed at creating thread. Exit."); - + ret = tfe_proxy_work_thread_run(g_default_proxy); + CHECK_OR_EXIT(ret == 0, "Failed at creating thread. Exit."); + TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized. "); event_base_dispatch(g_default_proxy->evbase); @@ -370,7 +388,7 @@ unsigned int tfe_proxy_get_work_thread_count(void) struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id) { - assert(thread_idnr_work_threads); + assert(thread_id < g_default_proxy->nr_work_threads); return g_default_proxy->work_threads[thread_id]->evbase; } struct event_base * tfe_proxy_get_gc_evbase(void) diff --git a/platform/src/ssl_stream.cpp b/platform/src/ssl_stream.cpp index 5a0fca2..db6d27b 100644 --- a/platform/src/ssl_stream.cpp +++ b/platform/src/ssl_stream.cpp @@ -100,7 +100,8 @@ struct session_ticket_key unsigned char name[16]; unsigned char hmac_key[32]; unsigned char aes_key[32]; -} ; +}; + struct ssl_mgr { unsigned int sslcomp; @@ -299,6 +300,7 @@ void ssl_stat_init(struct ssl_mgr * mgr) FS_STYLE_STATUS, FS_CALC_CURRENT, "usess_hit"); + value=mgr->fs_id[SSL_DOWN_CACHE_HIT]; FS_set_para(mgr->fs_handle, ID_INVISBLE, &value, sizeof(value)); value=mgr->fs_id[SSL_DOWN_CACHE_QUERY]; @@ -311,6 +313,7 @@ void ssl_stat_init(struct ssl_mgr * mgr) FS_STYLE_STATUS, FS_CALC_CURRENT, "dsess_hit"); + if(!mgr->no_sessticket) { value=mgr->fs_id[SSL_DOWN_TIKCET_QUERY]; @@ -1567,7 +1570,6 @@ static void ssl_shutdown_ctx_free(struct ssl_shutdown_ctx * ctx) static void pxy_ssl_shutdown_cb(evutil_socket_t fd, short what, void * arg) { struct ssl_shutdown_ctx * ctx = (struct ssl_shutdown_ctx *) arg; - struct timeval retry_delay = {0, 100}; void * logger = ctx->s_stream->mgr->logger; @@ -1581,6 +1583,8 @@ static void pxy_ssl_shutdown_cb(evutil_socket_t fd, short what, void * arg) ctx->ev = NULL; } + TFE_PROXY_STAT_INCREASE(STAT_FD_DEFER_CLOSE_IN_QUEUE, 1); + /* * Use the new (post-2008) semantics for SSL_shutdown() on a * non-blocking socket. SSL_shutdown() returns -1 and WANT_READ @@ -1617,7 +1621,6 @@ static void pxy_ssl_shutdown_cb(evutil_socket_t fd, short what, void * arg) goto complete; retry: - if (ctx->retries++ >= MAX_NET_RETRIES) { /* @@ -1653,8 +1656,9 @@ retry: "Cannot create event. Closing fd %d.", fd); } return; -complete: +complete: + TFE_PROXY_STAT_INCREASE(STAT_FD_DEFER_CLOSE_SUCCESS, 1); ssl_stream_free(ctx->s_stream); evutil_closesocket(fd); ssl_shutdown_ctx_free(ctx); diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 5811080..27e4799 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -149,7 +149,7 @@ void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by) ret = bufferevent_disable(_stream->conn_downstream->bev, EV_READ | EV_WRITE); assert(ret == 0); - (void)ret; + (void) ret; } void tfe_stream_resume(const struct tfe_stream * stream) @@ -313,6 +313,8 @@ static void __conn_private_destory(struct tfe_conn_private * conn) if (conn->fd > 0) evutil_closesocket(conn->fd); free(conn); + + TFE_PROXY_STAT_INCREASE(STAT_FD_INSTANT_CLOSE, 1); } static void __conn_private_destory_with_ssl(struct event_base * ev_base, @@ -324,12 +326,12 @@ static void __conn_private_destory_with_ssl(struct event_base * ev_base, return __conn_private_destory(conn); } -static void __stream_bev_downstream_statcb(struct evbuffer *buffer, const struct evbuffer_cb_info *info, void *arg) +static void __stream_bev_downstream_statcb(struct evbuffer * buffer, const struct evbuffer_cb_info * info, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; } -static void __stream_bev_upstream_statcb(struct evbuffer *buffer, const struct evbuffer_cb_info *info, void *arg) +static void __stream_bev_upstream_statcb(struct evbuffer * buffer, const struct evbuffer_cb_info * info, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; } @@ -465,6 +467,8 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) } struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev); + assert(inbuf != NULL && outbuf != NULL); + enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA; enum tfe_stream_action action_final = ACTION_FORWARD_DATA; @@ -573,7 +577,6 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg) ref_this_conn = &_stream->conn_upstream; ref_this_ssl_stream = &_stream->ssl_upstream; ref_peer_conn = &_stream->conn_downstream; - __str_dir = "up"; } if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM) @@ -581,20 +584,20 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg) ref_this_conn = &_stream->conn_downstream; ref_this_ssl_stream = &_stream->ssl_downstream; ref_peer_conn = &_stream->conn_upstream; - __str_dir = "down"; } struct evbuffer * __output_buffer = bufferevent_get_output(bev); assert(__output_buffer != NULL); - if (*ref_peer_conn == NULL && (*ref_this_conn)->on_writing == 0 && evbuffer_get_length(__output_buffer) == 0) + if (*ref_peer_conn == NULL /* Peer connection is closed */ + && (*ref_this_conn)->on_writing == 0 /* No body is prepare to write data, eg. No body call stream_write */ + && evbuffer_get_length(__output_buffer) == 0) /* Nothing is in send queue */ { + TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_EVENT_WRITE, 1); + __conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream); *ref_this_conn = NULL; *ref_this_ssl_stream = NULL; - - // fprintf(stderr, "---- writecb ----, close this connection, " - // "stream = %p, dir = %s\n", _stream, __str_dir); } if (*ref_peer_conn == NULL && *ref_this_conn == NULL) @@ -602,8 +605,6 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg) call_plugin_close(_stream); tfe_stream_destory(_stream); } - - return; } /* @@ -627,7 +628,6 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * ref_peer_conn = &_stream->conn_downstream; ref_this_ssl_stream = &_stream->ssl_upstream; ref_peer_ssl_stream = &_stream->ssl_downstream; - __str_dir = "up"; } if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM) @@ -636,7 +636,6 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * ref_peer_conn = &_stream->conn_upstream; ref_this_ssl_stream = &_stream->ssl_downstream; ref_peer_ssl_stream = &_stream->ssl_upstream; - __str_dir = "down"; } if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF) @@ -646,6 +645,9 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * __stream_bev_readcb(bev, arg); } + if (events & BEV_EVENT_ERROR) { TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_EVENT_ERROR, 1); } + if (events & BEV_EVENT_EOF) { TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE_BY_EVENT_EOF, 1); } + goto __close_connection; } @@ -667,9 +669,6 @@ __close_connection: if (*ref_this_conn != NULL) { -// fprintf(stderr, "---- eventcb ----, close this connection, " -// "stream = %p, event = %x, dir = %s\n", _stream, events, __str_dir); - __conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream); *ref_this_conn = NULL; *ref_this_ssl_stream = NULL; @@ -692,6 +691,7 @@ static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1); struct event_base * __ev_base = stream->thread_ref->evbase; + __conn_private->_stream_ref = stream; __conn_private->bev = bufferevent_socket_new(__ev_base, fd, BEV_OPT_DEFER_CALLBACKS); __conn_private->fd = fd; @@ -747,7 +747,7 @@ void ssl_downstream_create_on_success(future_result_t * result, void * user) void ssl_downstream_create_on_fail(enum e_future_error err, const char * what, void * user) { - struct tfe_stream_private * _stream = (struct tfe_stream_private *)user; + struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; assert(_stream != NULL && _stream->session_type == STREAM_PROTO_SSL); TFE_STREAM_LOG_ERROR(_stream, "Failed to create SSL downstream, close the connection : %s. ", what); @@ -787,7 +787,7 @@ void ssl_upstream_create_on_success(future_result_t * result, void * user) void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, void * user) { - struct tfe_stream_private * _stream = (struct tfe_stream_private *)user; + struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; assert(_stream != NULL && _stream->session_type == STREAM_PROTO_SSL); TFE_STREAM_LOG_ERROR(_stream, "Failed to create SSL upstream, pass-through the connection : %s. ", what); @@ -807,6 +807,8 @@ void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, voi struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx) { struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1); + TFE_PROXY_STAT_INCREASE(STAT_STREAM_CREATE, 1); + _stream->thread_ref = thread_ctx; _stream->proxy_ref = pxy; _stream->stream_logger = pxy->logger; @@ -830,6 +832,7 @@ void tfe_stream_destory(struct tfe_stream_private * stream) struct tfe_thread_ctx * thread = stream->thread_ref; struct event_base * ev_base = thread->evbase; + TFE_PROXY_STAT_INCREASE(STAT_STREAM_DESTROY, 1); __stream_access_log_write(stream); if (stream->str_stream_addr) @@ -1041,7 +1044,8 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst if (unlikely(_stream->head.addr == NULL)) { TFE_LOG_ERROR(_stream->stream_logger, "Failed to create address from fd %d, %d, terminate fds.", - fd_downstream, fd_upstream); goto __errout; + fd_downstream, fd_upstream); + goto __errout; } _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); @@ -1050,11 +1054,22 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst _stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream); _stream->conn_upstream = __conn_private_create_by_fd(_stream, fd_upstream); + /* Defer FD has been transfer to conn_downstream/conn_upstream */ + _stream->defer_fd_downstream = 0; + _stream->defer_fd_upstream = 0; + + if (unlikely(_stream->conn_downstream == NULL || _stream->conn_upstream == NULL)) + { + goto __errout; + } + assert(_stream->conn_downstream != NULL); assert(_stream->conn_upstream != NULL); __conn_private_enable(_stream->conn_downstream); __conn_private_enable(_stream->conn_upstream); + + TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_PLAIN, 1); } if (_stream->session_type == STREAM_PROTO_SSL) @@ -1067,6 +1082,8 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst /* Defer setup conn_downstream & conn_upstream in async callbacks. */ ssl_async_upstream_create(_stream->future_upstream_create, _stream->ssl_mgr, fd_upstream, fd_downstream, ev_base); + + TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_SSL, 1); } return 0;