增加HTTP Session延迟回收机制,完善HTTP Session销毁流程

This commit is contained in:
Lu Qiuwen
2018-10-22 21:22:59 +08:00
parent 0f31b948ba
commit bcfe14055f
7 changed files with 224 additions and 50 deletions

View File

@@ -5,17 +5,90 @@
#include <tfe_stream.h>
#include <http_parser.h>
#include <malloc.h>
#include <pthread.h>
#include <http_common.h>
#include <http_half.h>
#include <assert.h>
#include <event.h>
#include <tfe_proxy.h>
struct http_plugin __g_http_plugin;
struct http_plugin * g_http_plugin = &__g_http_plugin;
struct session_gc_cb_closure
{
#ifndef NDEBUG
unsigned int __magic__;
#endif
struct http_plugin * plugin;
unsigned int thread_id;
};
static void http_plugin_session_gc_cb(evutil_socket_t fd, short what, void * arg)
{
struct session_gc_cb_closure * closure = (struct session_gc_cb_closure *) arg;
assert(closure->__magic__ == 0x8021);
struct http_plugin * plugin_ctx = closure->plugin;
struct hs_private_list * gc_list_hs_private = &plugin_ctx->gc_list_hs_private[closure->thread_id];
struct http_session_private * hs_private_iter = NULL;
struct http_session_private * hs_private_titer = NULL;
TAILQ_FOREACH_SAFE(hs_private_iter, gc_list_hs_private, next, hs_private_titer)
{
assert(hs_private_iter->release_lock >= 0);
if(hs_private_iter->release_lock > 0) continue;
TAILQ_REMOVE(gc_list_hs_private, hs_private_iter, next);
/* Call the http frame to raise END event */
if (hs_private_iter->ht_frame)
{
struct tfe_http_session * hs_public = to_hs_public(hs_private_iter);
http_frame_raise_session_end(hs_private_iter->ht_frame, NULL, hs_public, hs_private_iter->thread_id);
hs_private_iter->ht_frame = NULL;
}
hs_private_destroy(hs_private_iter);
fprintf(stderr, "---- http_plugin_session_gc_cb, close session by GC\n, %p", hs_private_iter);
}
}
int http_plugin_init(struct tfe_proxy * proxy)
{
unsigned int nr_work_thread = tfe_proxy_get_work_thread_count();
struct http_plugin * plugin_ctx = g_http_plugin;
for (unsigned int thread_id = 0; thread_id < nr_work_thread; thread_id++)
{
#ifndef NDEBUG
pthread_mutex_init(&plugin_ctx->lock_list_hs_private[thread_id], NULL);
#endif
struct event_base * ev_base = tfe_proxy_get_work_thread_evbase(thread_id);
struct session_gc_cb_closure * closure = ALLOC(struct session_gc_cb_closure, 1);
#ifndef NDEBUG
closure->__magic__ = 0x8021;
#endif
closure->plugin = plugin_ctx;
closure->thread_id = thread_id;
/* TODO: Load GC delay from configure files */
struct timeval gc_delay = {0, 500 * 1000};
struct event * gc_event = event_new(ev_base, -1, EV_PERSIST, http_plugin_session_gc_cb, closure);
if (unlikely(gc_event == NULL))
{
/* TODO: write a log */
assert(0);
}
evtimer_add(gc_event, &gc_delay);
plugin_ctx->gc_event_hs_private[thread_id] = gc_event;
}
return 0;
}
@@ -28,7 +101,7 @@ int http_connection_entry_open(const struct tfe_stream * stream, unsigned int th
enum tfe_conn_dir dir, void ** pme)
{
struct http_connection_private * ht_conn = ALLOC(
struct http_connection_private, 1);
struct http_connection_private, 1);
TAILQ_INIT(&ht_conn->hs_private_list);
TAILQ_INIT(&ht_conn->hs_private_orphan_list);
ht_conn->stream = stream;
@@ -139,11 +212,11 @@ int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_
if (hf_private_req_user->message_status == STATUS_COMPLETE)
{
hf_private_destory(hf_private_req_user);
hs_private->hf_private_resp_user = NULL;
hs_private->hf_private_req_user = NULL;
}
assert(hf_private_req_in->stream_action == ACTION_DEFER_DATA
|| hf_private_req_in->stream_action == ACTION_DROP_DATA);
|| hf_private_req_in->stream_action == ACTION_DROP_DATA);
hf_private_req_in->stream_action = ACTION_DROP_DATA;
}
@@ -172,7 +245,7 @@ int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_
}
assert(hf_private_req_in->stream_action == ACTION_DEFER_DATA
|| hf_private_req_in->stream_action == ACTION_DROP_DATA);
|| hf_private_req_in->stream_action == ACTION_DROP_DATA);
hf_private_req_in->stream_action = ACTION_DROP_DATA;
}
@@ -201,12 +274,12 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
{
/* HTTP Request and Session */
hf_private_req_in = hf_private_create(TFE_HTTP_REQUEST, 1, 0);
hs_private = hs_private_create(hc_private, hf_private_req_in, NULL);
hs_private = hs_private_create(hc_private, thread_id, hf_private_req_in, NULL);
hf_private_set_session(hf_private_req_in, hs_private);
/* Closure, catch stream, session and thread_id */
struct user_event_dispatch_closure * __closure = ALLOC(
struct user_event_dispatch_closure, 1);
struct user_event_dispatch_closure, 1);
__closure->thread_id = thread_id;
__closure->stream = stream;
__closure->session = to_hs_public(hs_private);
@@ -241,7 +314,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
goto __errout;
}
if(hf_private_req_in->is_user_stream_action_set)
if (hf_private_req_in->is_user_stream_action_set)
{
hf_private_req_in->stream_action = hf_private_req_in->user_stream_action;
}
@@ -249,7 +322,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
{
hf_private_req_in->stream_action = ACTION_FORWARD_DATA;
}
/* Ignore parse the content which is nullptr. */
goto __boundary;
}
@@ -319,7 +392,9 @@ __out:
{
http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id);
TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next);
hs_private_destory(hs_private);
hs_private->ht_frame = NULL;
hs_private_destroy(hs_private);
}
return __action;
@@ -362,7 +437,7 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre
/* Closure, catch stream, session and thread_id */
struct user_event_dispatch_closure * __closure = ALLOC(
struct user_event_dispatch_closure, 1);
struct user_event_dispatch_closure, 1);
__closure->thread_id = thread_id;
__closure->stream = stream;
__closure->session = to_hs_public(hs_private);
@@ -439,12 +514,14 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre
TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next);
}
/* Nothing to do, everything is over, destroy the session */
/* Nothing to do, everything is over, destroy the session */
else
{
http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id);
TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next);
hs_private_destory(hs_private);
hs_private->ht_frame = NULL;
hs_private_destroy(hs_private);
}
}
@@ -516,21 +593,45 @@ void http_connection_entry_close(const struct tfe_stream * stream, unsigned int
enum tfe_stream_close_reason reason, void ** pme)
{
struct http_connection_private * ht_conn = (struct http_connection_private *) (*pme);
*pme = NULL;
struct http_plugin * plugin_ctx = g_http_plugin;
/* Delete all live sessions */
while (true)
struct http_session_private * hs_private_iter = NULL;
struct http_session_private * hs_private_titer = NULL;
TAILQ_FOREACH_SAFE(hs_private_iter, &ht_conn->hs_private_list, next, hs_private_titer)
{
struct http_session_private * hs_private_iter = TAILQ_FIRST(&ht_conn->hs_private_list);
if (hs_private_iter == NULL) break;
TAILQ_REMOVE(&ht_conn->hs_private_list, hs_private_iter, next);
hs_private_destory(hs_private_iter);
/* Call the http frame to raise END event */
if (hs_private_iter->ht_frame)
{
struct tfe_http_session * hs_public = to_hs_public(hs_private_iter);
http_frame_raise_session_end(hs_private_iter->ht_frame, stream, hs_public, hs_private_iter->thread_id);
hs_private_iter->ht_frame = NULL;
}
hs_private_gc_destroy(hs_private_iter, &plugin_ctx->gc_list_hs_private[thread_id]);
}
TAILQ_FOREACH_SAFE(hs_private_iter, &ht_conn->hs_private_orphan_list, next, hs_private_titer)
{
TAILQ_REMOVE(&ht_conn->hs_private_orphan_list, hs_private_iter, next);
if (hs_private_iter->ht_frame)
{
struct tfe_http_session * hs_public = to_hs_public(hs_private_iter);
http_frame_raise_session_end(hs_private_iter->ht_frame, stream, hs_public, hs_private_iter->thread_id);
hs_private_iter->ht_frame = NULL;
}
hs_private_gc_destroy(hs_private_iter, &plugin_ctx->gc_list_hs_private[thread_id]);
}
/* Clear session counter, and free ht_conn structure */
ht_conn->session_id_counter = 0;
free(ht_conn);
*pme = NULL;
}
static struct tfe_plugin __http_plugin_info =
@@ -544,5 +645,4 @@ static struct tfe_plugin __http_plugin_info =
.on_close = http_connection_entry_close
};
TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info
)
TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info)