支持在应答侧suspend/resume功能,合并http_entry中处理request/response的流程

This commit is contained in:
Lu Qiuwen
2018-10-28 20:13:17 +08:00
parent cf64f01f7f
commit b677d8ad0f
6 changed files with 168 additions and 263 deletions

View File

@@ -39,7 +39,7 @@ static void http_plugin_session_gc_cb(evutil_socket_t fd, short what, void * arg
TAILQ_FOREACH_SAFE(hs_private_iter, gc_list_hs_private, next, hs_private_titer)
{
assert(hs_private_iter->release_lock >= 0);
if(hs_private_iter->release_lock > 0) continue;
if (hs_private_iter->release_lock > 0) continue;
TAILQ_REMOVE(gc_list_hs_private, hs_private_iter, next);
@@ -52,7 +52,6 @@ static void http_plugin_session_gc_cb(evutil_socket_t fd, short what, void * arg
}
hs_private_destroy(hs_private_iter);
fprintf(stderr, "---- http_plugin_session_gc_cb, close session by GC\n, %p", hs_private_iter);
}
}
@@ -66,6 +65,7 @@ int http_plugin_init(struct tfe_proxy * proxy)
#ifndef NDEBUG
pthread_mutex_init(&plugin_ctx->lock_list_hs_private[thread_id], NULL);
#endif
TAILQ_INIT(&plugin_ctx->gc_list_hs_private[thread_id]);
struct event_base * ev_base = tfe_proxy_get_work_thread_evbase(thread_id);
struct session_gc_cb_closure * closure = ALLOC(struct session_gc_cb_closure, 1);
@@ -189,7 +189,7 @@ static int __write_http_half_to_line(const struct tfe_stream * stream,
return 0;
}
int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_session_private * hs_private,
static int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_session_private * hs_private,
struct http_half_private * hf_private_req_in, bool & need_to_close_the_session)
{
struct http_connection_private * hc_private = hs_private->hc_private;
@@ -211,16 +211,10 @@ int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_
ret = __write_http_half_to_line(stream, CONN_DIR_UPSTREAM, hf_private_req_user);
if (unlikely(ret < 0))
{
TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP request setup by user".);
TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP request setup by user. ");
return ret;
}
if (hf_private_req_user->message_status == STATUS_COMPLETE)
{
hf_private_destory(hf_private_req_user);
hs_private->hf_private_req_user = NULL;
}
if (hf_private_req_in->stream_action == ACTION_DROP_DATA ||
hf_private_req_in->stream_action == ACTION_DEFER_DATA)
{
@@ -240,21 +234,11 @@ int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_
if (unlikely(ret < 0))
{
TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP request setup by user".);
TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP request setup by user.");
goto __errout;
}
if (hf_private_resp_user->message_status == STATUS_COMPLETE)
{
hf_private_destory(hf_private_resp_user);
hs_private->hf_private_resp_user = NULL;
need_to_close_the_session = true;
}
else
{
TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next);
TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next);
}
need_to_close_the_session = true;
}
return 0;
@@ -263,21 +247,37 @@ __errout:
return -1;
}
enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_stream * stream,
struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len)
static int __on_response_handle_user_req_or_resp(const tfe_stream * stream, struct http_session_private * hs_private,
struct http_half_private * hf_private_resp_in, bool & need_to_close_the_session)
{
struct http_half_private * hf_private_resp_user = hs_private->hf_private_resp_user;
if (hf_private_resp_user == NULL)
{
return 0;
}
int ret = __write_http_half_to_line(stream, CONN_DIR_DOWNSTREAM, hf_private_resp_user);
if (unlikely(ret < 0))
{
TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP response setup by user.");
return -1;
}
if (hf_private_resp_in->stream_action == ACTION_DEFER_DATA)
{
hf_private_resp_in->stream_action = ACTION_DROP_DATA;
}
return 0;
}
static int __on_request_prepare_context(const struct tfe_stream * stream, unsigned int thread_id,
struct http_connection_private * hc_private, struct http_session_private ** hs_private_out,
struct http_half_private ** hf_private_out)
{
struct http_session_private * hs_private = TAILQ_LAST(&hc_private->hs_private_list, hs_private_list);
struct http_half_private * hf_private_req_in = to_hf_request_private(hs_private);
bool __need_to_close_the_session = false;
int ret = 0;
enum tfe_stream_action __action = ACTION_FORWARD_DATA;
size_t __action_args = 0;
/* There is no available in session list,
* that indicate all HTTP request has corresponding response,
* or the last request is finished, we need to create a new session. */
if (hs_private == NULL || hf_private_req_in->message_status == STATUS_COMPLETE)
{
/* HTTP Request and Session */
@@ -286,8 +286,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
hf_private_set_session(hf_private_req_in, hs_private);
/* Closure, catch stream, session and thread_id */
struct user_event_dispatch_closure * __closure = ALLOC(
struct user_event_dispatch_closure, 1);
struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1);
__closure->thread_id = thread_id;
__closure->stream = stream;
__closure->session = to_hs_public(hs_private);
@@ -300,144 +299,31 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
if (hs_private->ht_frame == NULL)
{
TFE_STREAM_LOG_ERROR(stream, "Failed at raising session begin event. ");
goto __errout;
return -1;
}
TAILQ_INSERT_TAIL(&hc_private->hs_private_list, hs_private, next);
}
/* The session is suspended, and to resume */
if (hs_private->suspend_tag_effective)
{
enum tfe_http_event __backup_event = hs_private->suspend_event;
/* Clean up suspend tag, we can support user's call suspend in this callback */
hs_private->suspend_event = (enum tfe_http_event) 0;
hs_private->suspend_tag_effective = false;
hs_private->suspend_counter++;
/* Call user callback, tell user we resume from suspend */
hf_private_req_in->event_cb(hf_private_req_in, hs_private->suspend_event, NULL, 0,
hf_private_req_in->event_cb_user);
#if 0
if (__on_request_handle_user_req_or_resp(stream, hs_private,
hf_private_req_in, __need_to_close_the_session) < 0)
{
goto __errout;
}
if (hf_private_req_in->is_user_stream_action_set)
{
hf_private_req_in->stream_action = hf_private_req_in->user_stream_action;
}
else
{
hf_private_req_in->stream_action = ACTION_FORWARD_DATA;
}
/* Ignore parse the content which is nullptr. */
goto __boundary;
#endif
}
/* Parse the content, the data which in defered state has been ignored. */
ret = hf_private_parse(hf_private_req_in, data, len);
/* Need more data, no boundary touched */
if (ret == 0)
{
if (hf_private_req_in->stream_action == ACTION_DROP_DATA ||
hf_private_req_in->stream_action == ACTION_FORWARD_DATA)
{
hf_private_req_in->parse_cursor = 0;
}
return hf_private_req_in->stream_action;
}
/* Suspend */
if (hs_private->suspend_tag_effective)
{
return ACTION_DEFER_DATA;
}
/* Some kind of error happened, write log and detach the stream */
if (ret == -1 && hf_private_req_in->is_passthrough)
{
goto __errout;
}
if (ret == -1)
{
TFE_STREAM_LOG_ERROR(stream, "Failed at parsing stream as HTTP: %u, %s, %s",
hf_private_req_in->parse_errno, http_errno_name(hf_private_req_in->parse_errno),
http_errno_description(hf_private_req_in->parse_errno));
goto __errout;
}
if (__on_request_handle_user_req_or_resp(stream, hs_private,
hf_private_req_in, __need_to_close_the_session) < 0)
{
goto __errout;
}
__boundary:
/* Touch a boundary, such as the end of HTTP headers, bodys, et al. */
__action_args = hf_private_req_in->parse_cursor;
hf_private_req_in->parse_cursor = 0;
if (hf_private_req_in->stream_action == ACTION_FORWARD_DATA)
{
tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_args, sizeof(__action_args));
__action = ACTION_FORWARD_DATA;
}
if (hf_private_req_in->stream_action == ACTION_DROP_DATA)
{
tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_args, sizeof(__action_args));
__action = ACTION_DROP_DATA;
}
__out:
/* There is nothing for this session, close the session */
if (__need_to_close_the_session)
{
http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id);
TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next);
hs_private->ht_frame = NULL;
hs_private_destroy(hs_private);
}
return __action;
__errout:
tfe_stream_detach(stream);
return ACTION_FORWARD_DATA;
*hs_private_out = hs_private;
*hf_private_out = hf_private_req_in;
return 0;
}
enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stream * stream,
struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len)
static int __on_response_prepare_context(const struct tfe_stream * stream, unsigned int thread_id,
struct http_connection_private * hc_private, struct http_session_private ** hs_private_out,
struct http_half_private ** hf_private_out)
{
struct http_session_private * hs_private = TAILQ_FIRST(&hc_private->hs_private_list);
struct http_half_private * hf_private_resp_in;
struct http_half_private * hf_private_resp_user;
enum tfe_stream_action __stream_action;
int ret = 0;
size_t __action_byptes;
/* Standalone response, it means missing something or malformed http protocol */
if (hs_private == NULL)
{
TFE_STREAM_LOG_ERROR(stream, "Standlone HTTP response emerged. Malformed HTTP Protocol, detached. ");
goto __errout;
return -1;
}
hf_private_resp_in = to_hf_response_private(hs_private);
struct http_half_private * hf_private_resp_in = to_hf_response_private(hs_private);
/* First time parse http response */
if (hf_private_resp_in == NULL)
{
@@ -467,106 +353,143 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre
}
}
/* Parse the content, the data which in defered state has been ignored. */
ret = hf_private_parse(hf_private_resp_in, data, len);
*hs_private_out = hs_private;
*hf_private_out = hf_private_resp_in;
return 0;
}
enum tfe_stream_action http_connection_entry(const struct tfe_stream * stream, enum tfe_conn_dir dir,
struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len)
{
struct http_session_private * hs_private = NULL;
struct http_half_private * hf_private_in = NULL;
bool __need_to_close_the_session = false;
int ret = 0;
enum tfe_stream_action __action = ACTION_FORWARD_DATA;
size_t __action_args = 0;
/* Prepare hs_private and hf_private_in */
ret = (dir == CONN_DIR_DOWNSTREAM) ?
__on_request_prepare_context(stream, thread_id, hc_private, &hs_private, &hf_private_in) :
__on_response_prepare_context(stream, thread_id, hc_private, &hs_private, &hf_private_in);
if (ret < 0)
{
goto __passthrough;
}
/* The session is suspended, and to resume */
if (hs_private->suspend_tag_effective)
{
enum tfe_http_event __backup_event = hs_private->suspend_event;
/* Clean up suspend tag, we can support user's call suspend in this callback */
hs_private->suspend_event = (enum tfe_http_event) 0;
hs_private->suspend_tag_effective = false;
hs_private->release_lock--;
hs_private->suspend_counter++;
/* Call user callback, tell user we resume from suspend */
assert(hs_private->resume_tag_singal);
hs_private->resume_tag_singal = false;
hf_private_in->event_cb(hf_private_in, __backup_event, NULL, 0, hf_private_in->event_cb_user);
}
/* Parse the content, the data which in deferred state has been ignored. */
ret = hf_private_parse(hf_private_in, data, len);
/* Suspend, ask by user's callback */
if (hs_private->suspend_tag_signal)
{
hs_private->suspend_tag_effective = true;
hs_private->suspend_tag_signal = false;
hs_private->release_lock++;
hs_private->suspend_counter++;
tfe_stream_suspend(stream, dir);
return ACTION_DEFER_DATA;
}
/* Need more data, no boundary touched */
if (ret == 0)
{
if (hf_private_resp_in->stream_action == ACTION_DROP_DATA ||
hf_private_resp_in->stream_action == ACTION_FORWARD_DATA)
if (hf_private_in->stream_action == ACTION_DROP_DATA ||
hf_private_in->stream_action == ACTION_FORWARD_DATA)
{
hf_private_resp_in->parse_cursor = 0;
hf_private_in->parse_cursor = 0;
}
return hf_private_resp_in->stream_action;
return hf_private_in->stream_action;
}
/* Need to passthrough */
if (ret == -1 && hf_private_resp_in->is_passthrough)
if (ret == -1 && hf_private_in->is_passthrough)
{
goto __errout;
assert(0);
goto __passthrough;
}
/* Some kind of error happened, write log and detach the stream */
if (ret == -1)
{
TFE_STREAM_LOG_ERROR(stream, "Failed at parsing HTTP response: %u, %s, %s",
hf_private_request->parse_errno, http_errno_name(hf_private_request->parse_errno),
http_errno_description(hf_private_request->parse_errno));
TFE_STREAM_LOG_ERROR(stream, "Failed at parsing stream as HTTP: %u, %s, %s",
hf_private_in->parse_errno, http_errno_name(hf_private_in->parse_errno),
http_errno_description(hf_private_in->parse_errno));
goto __errout;
goto __passthrough;
}
/* Upgrade, passthrough the connection and close this session */
if (hf_private_resp_in->is_upgrade)
ret = (dir == CONN_DIR_DOWNSTREAM) ?
__on_request_handle_user_req_or_resp(stream, hs_private, hf_private_in, __need_to_close_the_session) :
__on_response_handle_user_req_or_resp(stream, hs_private, hf_private_in, __need_to_close_the_session);
if (ret < 0)
{
tfe_stream_detach(stream);
hf_private_resp_in->stream_action = ACTION_FORWARD_DATA;
assert(0);
goto __passthrough;
}
hf_private_resp_user = hs_private->hf_private_resp_user;
if (hf_private_resp_user != NULL)
/* Touch a boundary, such as the end of HTTP headers, bodys, et al. */
__action_args = hf_private_in->parse_cursor;
hf_private_in->parse_cursor = 0;
if (hf_private_in->stream_action == ACTION_FORWARD_DATA)
{
ret = __write_http_half_to_line(stream, CONN_DIR_DOWNSTREAM, hf_private_resp_user);
if (unlikely(ret < 0))
{
TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP response setup by user".);
goto __errout;
}
if (hf_private_resp_user->message_status == STATUS_COMPLETE)
{
hf_private_destory(hf_private_resp_user);
hs_private->hf_private_resp_user = NULL;
}
if (hf_private_resp_in->stream_action == ACTION_DEFER_DATA)
{
hf_private_resp_in->stream_action = ACTION_DROP_DATA;
}
tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_args, sizeof(__action_args));
__action = ACTION_FORWARD_DATA;
}
__stream_action = hf_private_resp_in->stream_action;
__action_byptes = hf_private_resp_in->parse_cursor;
hf_private_resp_in->parse_cursor = 0;
if (hf_private_resp_in->message_status == STATUS_COMPLETE)
if (hf_private_in->stream_action == ACTION_DROP_DATA)
{
/* Still sending user's response, should not destroy the session,
* move the session to orphan list, then we can handle the next session's response */
if (hs_private->hf_private_resp_user != NULL)
{
TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next);
TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next);
}
tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_args, sizeof(__action_args));
__action = ACTION_DROP_DATA;
}
/* Nothing to do, everything is over, destroy the session */
else
/* ON RESPONSE, and input message is complete, need to close the session */
if (dir == CONN_DIR_UPSTREAM && hf_private_in->message_status == STATUS_COMPLETE)
{
__need_to_close_the_session = true;
}
/* There is nothing for this session, close the session */
if (__need_to_close_the_session)
{
/* Try to close the session */
if (hs_private_can_destroy(hs_private))
{
http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id);
TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next);
hs_private->ht_frame = NULL;
hs_private_destroy(hs_private);
}
TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next);
hs_private_gc_destroy(hs_private, &g_http_plugin->gc_list_hs_private[thread_id]);
}
if (__stream_action == ACTION_FORWARD_DATA)
{
tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_byptes, sizeof(__action_byptes));
return ACTION_FORWARD_DATA;
}
return __action;
if (__stream_action == ACTION_DROP_DATA)
{
tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_byptes, sizeof(__action_byptes));
return ACTION_DROP_DATA;
}
goto __errout;
__errout:
__passthrough:
tfe_stream_detach(stream);
return ACTION_FORWARD_DATA;
}
@@ -608,8 +531,7 @@ enum tfe_stream_action http_connection_entry_data(const struct tfe_stream * stre
}
/* This stream has been preempt, this plugin try to parse it */
return (dir == CONN_DIR_DOWNSTREAM) ? __http_connection_entry_on_request(stream, ht_conn, thread_id,
data, len) : __http_connection_entry_on_response(stream, ht_conn, thread_id, data, len);
return http_connection_entry(stream, dir, ht_conn, thread_id, data, len);
__detach:
tfe_stream_detach(stream);
@@ -640,20 +562,6 @@ void http_connection_entry_close(const struct tfe_stream * stream, unsigned int
hs_private_gc_destroy(hs_private_iter, &plugin_ctx->gc_list_hs_private[thread_id]);
}
TAILQ_FOREACH_SAFE(hs_private_iter, &ht_conn->hs_private_orphan_list, next, hs_private_titer)
{
TAILQ_REMOVE(&ht_conn->hs_private_orphan_list, hs_private_iter, next);
if (hs_private_iter->ht_frame)
{
struct tfe_http_session * hs_public = to_hs_public(hs_private_iter);
http_frame_raise_session_end(hs_private_iter->ht_frame, stream, hs_public, hs_private_iter->thread_id);
hs_private_iter->ht_frame = NULL;
}
hs_private_gc_destroy(hs_private_iter, &plugin_ctx->gc_list_hs_private[thread_id]);
}
/* Clear session counter, and free ht_conn structure */
ht_conn->session_id_counter = 0;
free(ht_conn);