#34 实现HTTP解析层的Suspend/Resume操作

This commit is contained in:
Lu Qiuwen
2018-10-11 10:41:27 +08:00
committed by zhengchao
parent adb469395c
commit b8342e5358
9 changed files with 132 additions and 29 deletions

View File

@@ -83,8 +83,8 @@ void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx);
void tfe_stream_detach(const struct tfe_stream * stream); void tfe_stream_detach(const struct tfe_stream * stream);
int tfe_stream_preempt(const struct tfe_stream * stream); int tfe_stream_preempt(const struct tfe_stream * stream);
struct promise * tfe_stream_suspend(const struct tfe_stream * stream); void tfe_stream_suspend(const struct tfe_stream * stream);
void tfe_stream_resume(struct promise * promise); void tfe_stream_resume(const struct tfe_stream * stream);
//close both sides of the stream. //close both sides of the stream.
int tfe_stream_shutdown(const struct tfe_stream * stream); int tfe_stream_shutdown(const struct tfe_stream * stream);

View File

@@ -97,6 +97,9 @@ struct tfe_stream_private
future * future_upstream_create; future * future_upstream_create;
/* ASYNC DOWNSTREAM */ /* ASYNC DOWNSTREAM */
future * future_downstream_create; future * future_downstream_create;
/* SUSPEND */
bool is_suspended;
}; };
static inline void * __STREAM_LOGGER(struct tfe_stream_private * _stream) static inline void * __STREAM_LOGGER(struct tfe_stream_private * _stream)

View File

@@ -89,7 +89,6 @@ void tfe_stream_detach(const struct tfe_stream * stream)
struct tfe_stream_private * _stream = to_stream_private(stream); struct tfe_stream_private * _stream = to_stream_private(stream);
int plug_id = _stream->calling_idx; int plug_id = _stream->calling_idx;
_stream->plugin_ctxs[plug_id].state = PLUG_STATE_DETACHED; _stream->plugin_ctxs[plug_id].state = PLUG_STATE_DETACHED;
return;
} }
int tfe_stream_preempt(const struct tfe_stream * stream) int tfe_stream_preempt(const struct tfe_stream * stream)
@@ -107,6 +106,33 @@ int tfe_stream_preempt(const struct tfe_stream * stream)
return 0; return 0;
} }
void tfe_stream_suspend(const struct tfe_stream * stream)
{
struct tfe_stream_private * _stream = to_stream_private(stream);
assert(_stream != NULL);
assert(_stream->conn_upstream != NULL && _stream->conn_downstream != NULL);
assert(_stream->conn_upstream->bev != NULL);
assert(_stream->conn_downstream->bev != NULL);
/* stream cannot be suspended twice or more */
assert(!_stream->is_suspended);
_stream->is_suspended = true;
/* disable all events */
bufferevent_disable(_stream->conn_upstream->bev, EV_READ | EV_WRITE);
bufferevent_disable(_stream->conn_downstream->bev, EV_READ | EV_WRITE);
}
void tfe_stream_resume(const struct tfe_stream * stream)
{
struct tfe_stream_private * _stream = to_stream_private(stream);
assert(_stream->is_suspended);
_stream->is_suspended = false;
bufferevent_enable(_stream->conn_upstream->bev, EV_READ | EV_WRITE);
bufferevent_enable(_stream->conn_downstream->bev, EV_READ | EV_WRITE);
}
struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir) struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir)
{ {
struct tfe_stream_private * _stream = to_stream_private(stream); struct tfe_stream_private * _stream = to_stream_private(stream);
@@ -136,7 +162,6 @@ void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx)
struct tfe_conn_private * peer_conn = __peer_conn(w_ctx->_stream, w_ctx->dir); struct tfe_conn_private * peer_conn = __peer_conn(w_ctx->_stream, w_ctx->dir);
this_conn->on_writing = 0; this_conn->on_writing = 0;
bufferevent_enable(peer_conn->bev, EV_READ); bufferevent_enable(peer_conn->bev, EV_READ);
return;
} }
int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, const unsigned char * data, size_t size) int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, const unsigned char * data, size_t size)
@@ -894,7 +919,8 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst
if (unlikely(_stream->head.addr == NULL)) if (unlikely(_stream->head.addr == NULL))
{ {
TFE_LOG_ERROR(_stream->stream_logger, "Failed to create address from fd %d, %d, terminate fds.", TFE_LOG_ERROR(_stream->stream_logger, "Failed to create address from fd %d, %d, terminate fds.",
fd_downstream, fd_upstream); goto __errout; fd_downstream, fd_upstream);
goto __errout;
} }
_stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr);

View File

@@ -18,6 +18,11 @@ add_executable(test-http-convert test/test_http_convert.cpp)
target_include_directories(test-http-convert PRIVATE include/internal) target_include_directories(test-http-convert PRIVATE include/internal)
target_link_libraries(test-http-convert gtest common http) target_link_libraries(test-http-convert gtest common http)
add_executable(test-http-entry test/test_http_convert.cpp)
target_include_directories(test-http-entry PRIVATE include/internal)
target_link_libraries(test-http-entry gtest common http)
include(GoogleTest) include(GoogleTest)
gtest_discover_tests(test-http-half) gtest_discover_tests(test-http-half)
gtest_discover_tests(test-http-convert) gtest_discover_tests(test-http-convert)
gtest_discover_tests(test-http-entry)

View File

@@ -34,6 +34,12 @@ struct http_session_private
struct http_half_private * hf_private_req_user; struct http_half_private * hf_private_req_user;
/* USER SETUP RESPONSE HALF */ /* USER SETUP RESPONSE HALF */
struct http_half_private * hf_private_resp_user; struct http_half_private * hf_private_resp_user;
/* SUSPEND TAG */
bool suspend_tag_user;
/* SUSPEND EVENT */
tfe_http_event suspend_event;
/* SUSPEND TAG EFFECTIVE */
bool suspend_tag_effective;
}; };
struct http_connection_private struct http_connection_private

View File

@@ -93,6 +93,17 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
TAILQ_INSERT_TAIL(&hc_private->hs_private_list, hs_private, next); TAILQ_INSERT_TAIL(&hc_private->hs_private_list, hs_private, next);
} }
/* The session is suspended, and to resume */
if (hs_private->suspend_tag_effective)
{
hf_private_req_in->event_cb(hf_private_req_in, hs_private->suspend_event, NULL, 0,
hf_private_req_in->event_cb_user);
hs_private->suspend_event = (enum tfe_http_event)0;
hs_private->suspend_tag_effective = false;
hs_private->suspend_tag_user = false;
}
/* Parse the content, the data which in defered state has been ignored. */ /* Parse the content, the data which in defered state has been ignored. */
ret = hf_private_parse(hf_private_req_in, data, len); ret = hf_private_parse(hf_private_req_in, data, len);
@@ -108,6 +119,17 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
return hf_private_req_in->stream_action; return hf_private_req_in->stream_action;
} }
/* Suspend */
if (hs_private->suspend_tag_user)
{
assert(!hs_private->suspend_tag_effective);
hs_private->suspend_tag_effective = true;
/* Suspend TCP stream */
tfe_stream_suspend(stream);
return ACTION_DEFER_DATA;
}
/* Some kind of error happened, write log and detach the stream */ /* Some kind of error happened, write log and detach the stream */
if (ret == -1) if (ret == -1)
{ {

View File

@@ -11,6 +11,7 @@
#include <tfe_utils.h> #include <tfe_utils.h>
#include <http_half.h> #include <http_half.h>
#include <http_convert.h> #include <http_convert.h>
#include <event.h>
#define __PARSER_TO_HF_PRIVATE(_parser) ((struct http_half_private *)(_parser->data)) #define __PARSER_TO_HF_PRIVATE(_parser) ((struct http_half_private *)(_parser->data))
@@ -285,6 +286,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
{ {
struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser);
enum tfe_http_direction hf_direction = hf_private->hf_public.direction; enum tfe_http_direction hf_direction = hf_private->hf_public.direction;
struct http_session_private * hs_private = hf_private->session;
if (evbuffer_get_length(hf_private->evbuf_header_field) != 0) if (evbuffer_get_length(hf_private->evbuf_header_field) != 0)
{ {
@@ -296,10 +298,10 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
hf_public->minor_version = parser->http_minor; hf_public->minor_version = parser->http_minor;
/* Copy version to session */ /* Copy version to session */
if (hf_private->session != NULL) if (hs_private != NULL)
{ {
to_hs_public(hf_private->session)->major_version = hf_public->major_version; to_hs_public(hs_private)->major_version = hf_public->major_version;
to_hs_public(hf_private->session)->minor_version = hf_public->minor_version; to_hs_public(hs_private)->minor_version = hf_public->minor_version;
} }
if (hf_direction == TFE_HTTP_REQUEST) if (hf_direction == TFE_HTTP_REQUEST)
@@ -311,20 +313,37 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
__hf_public_resp_fill_from_private(hf_private, parser); __hf_public_resp_fill_from_private(hf_private, parser);
} }
if (hf_private->event_cb && hf_direction == TFE_HTTP_REQUEST) tfe_http_event event = (hf_direction == TFE_HTTP_REQUEST) ? EV_HTTP_REQ_HDR : EV_HTTP_RESP_HDR;
if (hf_private->event_cb)
{ {
hf_private->event_cb(hf_private, EV_HTTP_REQ_HDR, NULL, 0, hf_private->event_cb_user); hf_private->event_cb(hf_private, EV_HTTP_REQ_HDR, NULL, 0, hf_private->event_cb_user);
} }
if (hf_private->event_cb && hf_direction == TFE_HTTP_RESPONSE) /* The setup of user stream option indicates that the way to handle the request/response has
{ * been decided, we should conform the resolution */
hf_private->event_cb(hf_private, EV_HTTP_RESP_HDR, NULL, 0, hf_private->event_cb_user);
}
if (hf_private->is_user_stream_action_set) if (hf_private->is_user_stream_action_set)
{ {
assert(hf_private->stream_action == ACTION_DEFER_DATA);
hf_private->stream_action = hf_private->user_stream_action; hf_private->stream_action = hf_private->user_stream_action;
} }
/* user's suspend tag is set, which indicate that the way to handle request/response
* cannot be determinate at now, need to defer */
else if (hs_private && hs_private->suspend_tag_user)
{
/* Pause parser, prevent to parse request/response body,
* The body should be parsed after resume() */
http_parser_pause(parser, 1);
/* Record the event, this event will be trigger again at resume() */
hs_private->suspend_event = event;
/* Suspend must be setup at DEFER status.
* The request/response cannot be suspend once any bytes has been forwarded to upstream */
assert(hf_private->stream_action == ACTION_DEFER_DATA);
}
/* Otherwise, forward the request/response */
else else
{ {
hf_private->stream_action = ACTION_FORWARD_DATA; hf_private->stream_action = ACTION_FORWARD_DATA;
@@ -468,11 +487,12 @@ int hf_ops_field_write(struct tfe_http_half * half, const struct http_field_name
TAILQ_FOREACH(__header_iter, &hf_private->header_list, next) TAILQ_FOREACH(__header_iter, &hf_private->header_list, next)
{ {
if (http_field_name_compare(__header_iter->field, field) != 0) continue; if (http_field_name_compare(__header_iter->field, field) != 0) continue;
__header_found = __header_iter; break; __header_found = __header_iter;
break;
} }
/* Update the value */ /* Update the value */
if(__header_found != NULL && value != NULL) if (__header_found != NULL && value != NULL)
{ {
free(__header_found->value); free(__header_found->value);
__header_found->value = tfe_strdup(value); __header_found->value = tfe_strdup(value);
@@ -648,18 +668,23 @@ int hf_private_parse(struct http_half_private * hf_private, const unsigned char
size_t sz_parsed = http_parser_execute(hf_private->parse_object, size_t sz_parsed = http_parser_execute(hf_private->parse_object,
&__http_half_parse_setting, __data_with_offset, __len_with_offset); &__http_half_parse_setting, __data_with_offset, __len_with_offset);
/* Nothing happended */ bool __is_paused = false;
if (HTTP_PARSER_ERRNO(hf_private->parse_object) == HPE_PAUSED)
{
http_parser_pause(hf_private->parse_object, 0);
__is_paused = true;
}
if (sz_parsed == __len_with_offset) if (sz_parsed == __len_with_offset)
{ {
hf_private->parse_cursor += sz_parsed; hf_private->parse_cursor += sz_parsed;
return HTTP_PARSER_ERRNO(hf_private->parse_object) == HPE_PAUSED ? 1 : 0; return __is_paused ? 1 : 0;
} }
/* The paused parsar indicate the message boundary has been touched, we should return. /* The paused parsar indicate the message boundary has been touched, we should return.
* resume it to normal status */ * resume it to normal status */
if (HTTP_PARSER_ERRNO(hf_private->parse_object) == HPE_PAUSED) if (__is_paused)
{ {
http_parser_pause(hf_private->parse_object, 0);
hf_private->parse_cursor += sz_parsed + 1; hf_private->parse_cursor += sz_parsed + 1;
return 1; return 1;
} }
@@ -682,8 +707,18 @@ void hs_ops_detach(const struct tfe_http_session * session)
} }
void hs_ops_drop(struct tfe_http_session * session) void hs_ops_drop(struct tfe_http_session * session)
{}
void hs_ops_suspend(struct tfe_http_session * session)
{ {
return; struct http_session_private * hs_private = to_hs_private(session);
hs_private->suspend_tag_user = true;
}
void hs_ops_resume(struct tfe_http_session * session)
{
struct http_session_private * hs_private = to_hs_private(session);
hs_private->suspend_tag_user = false;
} }
// TODO: change the return type to int, there is something happend where -1 returned. // TODO: change the return type to int, there is something happend where -1 returned.
@@ -781,6 +816,8 @@ struct tfe_http_session_ops __http_session_ops =
.ops_allow_write = hs_ops_allow_write, .ops_allow_write = hs_ops_allow_write,
.ops_detach = hs_ops_detach, .ops_detach = hs_ops_detach,
.ops_drop = hs_ops_drop, .ops_drop = hs_ops_drop,
.ops_suspend = hs_ops_suspend,
.ops_resume = hs_ops_resume,
.ops_request_set = hs_ops_request_set, .ops_request_set = hs_ops_request_set,
.ops_response_set = hs_ops_response_set, .ops_response_set = hs_ops_response_set,
.ops_request_create = hs_ops_request_create, .ops_request_create = hs_ops_request_create,

View File

@@ -0,0 +1,4 @@
//
// Created by luqiu on 2018/10/10.
//