diff --git a/common/include/tfe_utils.h b/common/include/tfe_utils.h index 8718c71..85e0087 100644 --- a/common/include/tfe_utils.h +++ b/common/include/tfe_utils.h @@ -107,6 +107,13 @@ char *tfe_thread_safe_ctime(const time_t *tp, char *buf, int len); #define TFE_PTR_DIFF(ptr1, ptr2) ((uintptr_t)(ptr1) - (uintptr_t)(ptr2)) #define TFE_DIM(x) (sizeof (x) / sizeof ((x)[0])) +#ifndef TAILQ_FOREACH_SAFE +#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = TAILQ_FIRST((head)); \ + (var) && ((tvar) = TAILQ_NEXT((var), field), 1); \ + (var) = (tvar)) +#endif + #include static inline void tfe_hexdump2file(FILE *f, const char * title, const void * buf, unsigned int len) diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 7234d44..f7a7052 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -355,6 +355,7 @@ unsigned int tfe_proxy_get_work_thread_count(void) { return g_default_proxy->nr_work_threads; } + struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id) { assert(thread_idnr_work_threads); diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index ba824c1..3d29b11 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -80,6 +80,28 @@ static inline bool __is_ssl(struct tfe_stream_private * _stream) return (_stream->session_type == STREAM_PROTO_SSL); } +static void __call_plugin_close(struct tfe_stream_private * _stream) +{ + unsigned int plugin_id_iter = 0; + unsigned int plugin_id = 0; + + for (const struct tfe_plugin * p_info_iter = tfe_plugin_iterate(&plugin_id_iter); + p_info_iter != NULL; p_info_iter = tfe_plugin_iterate(&plugin_id_iter)) + { + _stream->calling_idx = plugin_id; + struct plugin_ctx * plug_ctx = &_stream->plugin_ctxs[plugin_id]; + + /* TODO: do not use pme to determinate we call on_open or not ever. */ + if (p_info_iter->on_close && plug_ctx->pme != NULL) + { + p_info_iter->on_close(&_stream->head, _stream->thread_ref->thread_id, + REASON_PASSIVE_CLOSED, &(plug_ctx->pme)); + } + + plugin_id++; + } +} + /* ==================================================================================================================== * INTERFACE * ===================================================================================================================*/ @@ -341,6 +363,7 @@ static void __stream_bev_passthrough_writecb(struct bufferevent * bev, void * ar if (*ref_peer_conn == NULL && *ref_this_conn == NULL) { + __call_plugin_close(_stream); tfe_stream_destory(_stream); } @@ -398,6 +421,7 @@ __close_connection: if (*ref_this_conn == NULL && *ref_peer_conn == NULL) { + __call_plugin_close(_stream); tfe_stream_destory(_stream); } @@ -553,6 +577,7 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg) if (*ref_peer_conn == NULL && *ref_this_conn == NULL) { + __call_plugin_close(_stream); tfe_stream_destory(_stream); } @@ -637,25 +662,6 @@ __close_connection: return; __call_plugin_close: - unsigned int plugin_id_iter = 0; - unsigned int plugin_id = 0; - - for (const struct tfe_plugin * p_info_iter = tfe_plugin_iterate(&plugin_id_iter); - p_info_iter != NULL; p_info_iter = tfe_plugin_iterate(&plugin_id_iter)) - { - _stream->calling_idx = plugin_id; - struct plugin_ctx * plug_ctx = &_stream->plugin_ctxs[plugin_id]; - - //TODO: do not use pme to determinate we call on_open or not ever. - if (p_info_iter->on_close && plug_ctx->pme != NULL) - { - p_info_iter->on_close(&_stream->head, _stream->thread_ref->thread_id, - REASON_PASSIVE_CLOSED, &(plug_ctx->pme)); - } - - plugin_id++; - } - tfe_stream_destory(_stream); } diff --git a/plugin/protocol/http/include/internal/http_common.h b/plugin/protocol/http/include/internal/http_common.h index 85b5673..56fa65f 100644 --- a/plugin/protocol/http/include/internal/http_common.h +++ b/plugin/protocol/http/include/internal/http_common.h @@ -9,13 +9,19 @@ extern "C" #include #include +TAILQ_HEAD(hs_private_list, http_session_private); + struct http_plugin { - + /* Garbage List for HTTP-SESSION object. + * these objects can not be destroyed when TCP connection terminated. */ + struct hs_private_list gc_list_hs_private[TFE_THREAD_MAX]; + /* Lock for garbage list */ + pthread_mutex_t lock_list_hs_private[TFE_THREAD_MAX]; + /* GC events */ + struct event * gc_event_hs_private[TFE_THREAD_MAX]; }; -TAILQ_HEAD(hs_private_list, http_session_private); - struct http_plugin_status { @@ -41,6 +47,12 @@ struct http_session_private tfe_http_event suspend_event; /* SUSPEND TAG EFFECTIVE */ bool suspend_tag_effective; + /* STREAM WRITE EFFECTIVE */ + bool stream_write_tag_effective; + /* RELEASE LOCK, when the tag is zero, the session can be destroyed */ + int release_lock; + /* thread id */ + unsigned int thread_id; }; struct http_connection_private diff --git a/plugin/protocol/http/include/internal/http_half.h b/plugin/protocol/http/include/internal/http_half.h index 37da9a6..665b764 100644 --- a/plugin/protocol/http/include/internal/http_half.h +++ b/plugin/protocol/http/include/internal/http_half.h @@ -118,10 +118,11 @@ void hf_private_set_callback(struct http_half_private * hf_private, hf_private_c void hf_private_set_session(struct http_half_private * hf_private, struct http_session_private * hs_private); -struct http_session_private * hs_private_create(struct http_connection_private * hc_private, - struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp); +struct http_session_private * hs_private_create(struct http_connection_private * hc_private, unsigned int thread_id, + struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp); -void hs_private_destory(struct http_session_private * hs_private); +void hs_private_destroy(struct http_session_private * hs_private); +void hs_private_gc_destroy(struct http_session_private * hs_private, struct hs_private_list * gc_list); void hs_private_hf_private_set(struct http_session_private * hs_private, struct http_half_private * hf, enum tfe_http_direction); diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index fa84675..2ec4348 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -5,17 +5,90 @@ #include #include #include +#include #include #include #include #include +#include struct http_plugin __g_http_plugin; struct http_plugin * g_http_plugin = &__g_http_plugin; +struct session_gc_cb_closure +{ +#ifndef NDEBUG + unsigned int __magic__; +#endif + struct http_plugin * plugin; + unsigned int thread_id; +}; + +static void http_plugin_session_gc_cb(evutil_socket_t fd, short what, void * arg) +{ + struct session_gc_cb_closure * closure = (struct session_gc_cb_closure *) arg; + assert(closure->__magic__ == 0x8021); + + struct http_plugin * plugin_ctx = closure->plugin; + struct hs_private_list * gc_list_hs_private = &plugin_ctx->gc_list_hs_private[closure->thread_id]; + + struct http_session_private * hs_private_iter = NULL; + struct http_session_private * hs_private_titer = NULL; + + TAILQ_FOREACH_SAFE(hs_private_iter, gc_list_hs_private, next, hs_private_titer) + { + assert(hs_private_iter->release_lock >= 0); + if(hs_private_iter->release_lock > 0) continue; + + TAILQ_REMOVE(gc_list_hs_private, hs_private_iter, next); + + /* Call the http frame to raise END event */ + if (hs_private_iter->ht_frame) + { + struct tfe_http_session * hs_public = to_hs_public(hs_private_iter); + http_frame_raise_session_end(hs_private_iter->ht_frame, NULL, hs_public, hs_private_iter->thread_id); + hs_private_iter->ht_frame = NULL; + } + + hs_private_destroy(hs_private_iter); + fprintf(stderr, "---- http_plugin_session_gc_cb, close session by GC\n, %p", hs_private_iter); + } +} + int http_plugin_init(struct tfe_proxy * proxy) { + unsigned int nr_work_thread = tfe_proxy_get_work_thread_count(); + struct http_plugin * plugin_ctx = g_http_plugin; + + for (unsigned int thread_id = 0; thread_id < nr_work_thread; thread_id++) + { +#ifndef NDEBUG + pthread_mutex_init(&plugin_ctx->lock_list_hs_private[thread_id], NULL); +#endif + struct event_base * ev_base = tfe_proxy_get_work_thread_evbase(thread_id); + struct session_gc_cb_closure * closure = ALLOC(struct session_gc_cb_closure, 1); + +#ifndef NDEBUG + closure->__magic__ = 0x8021; +#endif + closure->plugin = plugin_ctx; + closure->thread_id = thread_id; + + /* TODO: Load GC delay from configure files */ + struct timeval gc_delay = {0, 500 * 1000}; + struct event * gc_event = event_new(ev_base, -1, EV_PERSIST, http_plugin_session_gc_cb, closure); + + if (unlikely(gc_event == NULL)) + { + /* TODO: write a log */ + assert(0); + } + + evtimer_add(gc_event, &gc_delay); + plugin_ctx->gc_event_hs_private[thread_id] = gc_event; + } + return 0; } @@ -28,7 +101,7 @@ int http_connection_entry_open(const struct tfe_stream * stream, unsigned int th enum tfe_conn_dir dir, void ** pme) { struct http_connection_private * ht_conn = ALLOC( - struct http_connection_private, 1); + struct http_connection_private, 1); TAILQ_INIT(&ht_conn->hs_private_list); TAILQ_INIT(&ht_conn->hs_private_orphan_list); ht_conn->stream = stream; @@ -139,11 +212,11 @@ int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_ if (hf_private_req_user->message_status == STATUS_COMPLETE) { hf_private_destory(hf_private_req_user); - hs_private->hf_private_resp_user = NULL; + hs_private->hf_private_req_user = NULL; } assert(hf_private_req_in->stream_action == ACTION_DEFER_DATA - || hf_private_req_in->stream_action == ACTION_DROP_DATA); + || hf_private_req_in->stream_action == ACTION_DROP_DATA); hf_private_req_in->stream_action = ACTION_DROP_DATA; } @@ -172,7 +245,7 @@ int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_ } assert(hf_private_req_in->stream_action == ACTION_DEFER_DATA - || hf_private_req_in->stream_action == ACTION_DROP_DATA); + || hf_private_req_in->stream_action == ACTION_DROP_DATA); hf_private_req_in->stream_action = ACTION_DROP_DATA; } @@ -201,12 +274,12 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea { /* HTTP Request and Session */ hf_private_req_in = hf_private_create(TFE_HTTP_REQUEST, 1, 0); - hs_private = hs_private_create(hc_private, hf_private_req_in, NULL); + hs_private = hs_private_create(hc_private, thread_id, hf_private_req_in, NULL); hf_private_set_session(hf_private_req_in, hs_private); /* Closure, catch stream, session and thread_id */ struct user_event_dispatch_closure * __closure = ALLOC( - struct user_event_dispatch_closure, 1); + struct user_event_dispatch_closure, 1); __closure->thread_id = thread_id; __closure->stream = stream; __closure->session = to_hs_public(hs_private); @@ -241,7 +314,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea goto __errout; } - if(hf_private_req_in->is_user_stream_action_set) + if (hf_private_req_in->is_user_stream_action_set) { hf_private_req_in->stream_action = hf_private_req_in->user_stream_action; } @@ -249,7 +322,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea { hf_private_req_in->stream_action = ACTION_FORWARD_DATA; } - + /* Ignore parse the content which is nullptr. */ goto __boundary; } @@ -319,7 +392,9 @@ __out: { http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id); TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); - hs_private_destory(hs_private); + + hs_private->ht_frame = NULL; + hs_private_destroy(hs_private); } return __action; @@ -362,7 +437,7 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre /* Closure, catch stream, session and thread_id */ struct user_event_dispatch_closure * __closure = ALLOC( - struct user_event_dispatch_closure, 1); + struct user_event_dispatch_closure, 1); __closure->thread_id = thread_id; __closure->stream = stream; __closure->session = to_hs_public(hs_private); @@ -439,12 +514,14 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next); } - /* Nothing to do, everything is over, destroy the session */ + /* Nothing to do, everything is over, destroy the session */ else { http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id); TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); - hs_private_destory(hs_private); + + hs_private->ht_frame = NULL; + hs_private_destroy(hs_private); } } @@ -516,21 +593,45 @@ void http_connection_entry_close(const struct tfe_stream * stream, unsigned int enum tfe_stream_close_reason reason, void ** pme) { struct http_connection_private * ht_conn = (struct http_connection_private *) (*pme); - *pme = NULL; + struct http_plugin * plugin_ctx = g_http_plugin; - /* Delete all live sessions */ - while (true) + struct http_session_private * hs_private_iter = NULL; + struct http_session_private * hs_private_titer = NULL; + + TAILQ_FOREACH_SAFE(hs_private_iter, &ht_conn->hs_private_list, next, hs_private_titer) { - struct http_session_private * hs_private_iter = TAILQ_FIRST(&ht_conn->hs_private_list); - if (hs_private_iter == NULL) break; - TAILQ_REMOVE(&ht_conn->hs_private_list, hs_private_iter, next); - hs_private_destory(hs_private_iter); + + /* Call the http frame to raise END event */ + if (hs_private_iter->ht_frame) + { + struct tfe_http_session * hs_public = to_hs_public(hs_private_iter); + http_frame_raise_session_end(hs_private_iter->ht_frame, stream, hs_public, hs_private_iter->thread_id); + hs_private_iter->ht_frame = NULL; + } + + hs_private_gc_destroy(hs_private_iter, &plugin_ctx->gc_list_hs_private[thread_id]); + } + + TAILQ_FOREACH_SAFE(hs_private_iter, &ht_conn->hs_private_orphan_list, next, hs_private_titer) + { + TAILQ_REMOVE(&ht_conn->hs_private_orphan_list, hs_private_iter, next); + + if (hs_private_iter->ht_frame) + { + struct tfe_http_session * hs_public = to_hs_public(hs_private_iter); + http_frame_raise_session_end(hs_private_iter->ht_frame, stream, hs_public, hs_private_iter->thread_id); + hs_private_iter->ht_frame = NULL; + } + + hs_private_gc_destroy(hs_private_iter, &plugin_ctx->gc_list_hs_private[thread_id]); } /* Clear session counter, and free ht_conn structure */ ht_conn->session_id_counter = 0; free(ht_conn); + + *pme = NULL; } static struct tfe_plugin __http_plugin_info = @@ -544,5 +645,4 @@ static struct tfe_plugin __http_plugin_info = .on_close = http_connection_entry_close }; -TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info -) +TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info) diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index b41b026..a916113 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -588,6 +588,8 @@ int hf_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, in int hf_ops_body_begin(struct tfe_http_half * half, int by_stream) { struct http_half_private * hf_private = to_hf_private(half); + struct http_session_private * hs_private = hf_private->session; + assert(hf_private->evbuf_body == NULL); if (by_stream) @@ -602,6 +604,9 @@ int hf_ops_body_begin(struct tfe_http_half * half, int by_stream) hf_private->content_encoding = HTTP_ACCEPT_ENCODING_NONE; hf_private->is_setup_by_stream = true; + + hs_private->stream_write_tag_effective = true; + hs_private->release_lock++; } hf_private->evbuf_body = evbuffer_new(); @@ -649,6 +654,8 @@ __out: int hf_ops_body_end(struct tfe_http_half * half) { struct http_half_private * hf_private = to_hf_private(half); + struct http_session_private * hs_private = hf_private->session; + if (hf_private->write_ctx) { tfe_stream_write_frag_end(hf_private->write_ctx); @@ -658,7 +665,13 @@ int hf_ops_body_end(struct tfe_http_half * half) hf_private->body_status = STATUS_COMPLETE; hf_private->message_status = STATUS_COMPLETE; -// printf("frag write end, stream = %p, hf_private = %p\n", hf_private->session->hc_private->stream, hf_private); + if(hf_private->is_setup_by_stream) + { + hs_private->stream_write_tag_effective = true; + hs_private->release_lock++; + } + + printf("frag write end, stream = %p, hf_private = %p\n", hf_private->session->hc_private->stream, hf_private); return 0; } @@ -800,6 +813,7 @@ void hs_ops_suspend(struct tfe_http_session * session) { struct http_session_private * hs_private = to_hs_private(session); hs_private->suspend_tag_user = true; + hs_private->release_lock++; } void hs_ops_resume(struct tfe_http_session * session) @@ -808,6 +822,7 @@ void hs_ops_resume(struct tfe_http_session * session) struct http_connection_private * hc_private = hs_private->hc_private; hs_private->suspend_tag_user = false; + hs_private->release_lock--; tfe_stream_resume(hc_private->stream); } @@ -1024,10 +1039,11 @@ void hf_private_construct(struct http_half_private * hf_private) return; } -struct http_session_private * hs_private_create(struct http_connection_private * hc_private, - struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp) +struct http_session_private * hs_private_create(struct http_connection_private * hc_private, unsigned int thread_id, + struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp) { struct http_session_private * __hs_private = ALLOC(struct http_session_private, 1); + __hs_private->thread_id = thread_id; /* HS-PUBLIC */ __hs_private->hs_public.ops = &__http_session_ops; @@ -1085,12 +1101,43 @@ void __write_access_log(struct http_session_private * hs_private) free(__access_log); } -void hs_private_destory(struct http_session_private * hs_private) +void hs_private_destroy(struct http_session_private * hs_private) { + struct http_half_private * hf_req = to_hf_request_private(hs_private); + struct http_half_private * hf_resp = to_hf_response_private(hs_private); + __write_access_log(hs_private); + + if (hf_req != NULL) + { + hf_private_destory(hf_req); + } + + if (hf_resp != NULL) + { + hf_private_destory(hf_resp); + } + + if (hs_private->hf_private_req_user) + { + hf_private_destory(hs_private->hf_private_req_user); + } + + if (hs_private->hf_private_resp_user) + { + hf_private_destory(hs_private->hf_private_resp_user); + } + + assert(hs_private->ht_frame == NULL); free(hs_private); } +void hs_private_gc_destroy(struct http_session_private * hs_private, struct hs_private_list * gc_list) +{ + if (hs_private->release_lock > 0) TAILQ_INSERT_TAIL(gc_list, hs_private, next); + else return hs_private_destroy(hs_private); +} + void hs_private_hf_private_set(struct http_session_private * hs_private, struct http_half_private * hf, enum tfe_http_direction direction) {