STREAM挂起时记录挂起来源,在RESUME()触发对应连接方向的读事件。HTTP SUSPEND/RESUME功能初步调通

This commit is contained in:
Lu Qiuwen
2018-10-14 20:29:52 +08:00
parent 294201ecd8
commit e9c11824b0
6 changed files with 37 additions and 4 deletions

View File

@@ -83,7 +83,7 @@ void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx);
void tfe_stream_detach(const struct tfe_stream * stream); void tfe_stream_detach(const struct tfe_stream * stream);
int tfe_stream_preempt(const struct tfe_stream * stream); int tfe_stream_preempt(const struct tfe_stream * stream);
void tfe_stream_suspend(const struct tfe_stream * stream); void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by);
void tfe_stream_resume(const struct tfe_stream * stream); void tfe_stream_resume(const struct tfe_stream * stream);
//close both sides of the stream. //close both sides of the stream.

View File

@@ -98,6 +98,7 @@ struct tfe_stream_private
/* SUSPEND */ /* SUSPEND */
bool is_suspended; bool is_suspended;
enum tfe_conn_dir suspended_by;
}; };
static inline void * __STREAM_LOGGER(struct tfe_stream_private * _stream) static inline void * __STREAM_LOGGER(struct tfe_stream_private * _stream)

View File

@@ -106,7 +106,7 @@ int tfe_stream_preempt(const struct tfe_stream * stream)
return 0; return 0;
} }
void tfe_stream_suspend(const struct tfe_stream * stream) void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by)
{ {
struct tfe_stream_private * _stream = to_stream_private(stream); struct tfe_stream_private * _stream = to_stream_private(stream);
assert(_stream != NULL); assert(_stream != NULL);
@@ -117,6 +117,7 @@ void tfe_stream_suspend(const struct tfe_stream * stream)
/* stream cannot be suspended twice or more */ /* stream cannot be suspended twice or more */
assert(!_stream->is_suspended); assert(!_stream->is_suspended);
_stream->is_suspended = true; _stream->is_suspended = true;
_stream->suspended_by = by;
/* disable all events */ /* disable all events */
bufferevent_disable(_stream->conn_upstream->bev, EV_READ | EV_WRITE); bufferevent_disable(_stream->conn_upstream->bev, EV_READ | EV_WRITE);
@@ -128,9 +129,20 @@ void tfe_stream_resume(const struct tfe_stream * stream)
struct tfe_stream_private * _stream = to_stream_private(stream); struct tfe_stream_private * _stream = to_stream_private(stream);
assert(_stream->is_suspended); assert(_stream->is_suspended);
_stream->is_suspended = false;
bufferevent_enable(_stream->conn_upstream->bev, EV_READ | EV_WRITE); bufferevent_enable(_stream->conn_upstream->bev, EV_READ | EV_WRITE);
bufferevent_enable(_stream->conn_downstream->bev, EV_READ | EV_WRITE); bufferevent_enable(_stream->conn_downstream->bev, EV_READ | EV_WRITE);
if(_stream->suspended_by == CONN_DIR_DOWNSTREAM)
{
bufferevent_trigger(_stream->conn_downstream->bev, EV_READ, BEV_OPT_DEFER_CALLBACKS);
}
else
{
bufferevent_trigger(_stream->conn_upstream->bev, EV_READ, BEV_OPT_DEFER_CALLBACKS);
}
_stream->is_suspended = false;
_stream->suspended_by = CONN_DIR_DOWNSTREAM;
} }
struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir) struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir)

View File

@@ -102,6 +102,18 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
hs_private->suspend_event = (enum tfe_http_event)0; hs_private->suspend_event = (enum tfe_http_event)0;
hs_private->suspend_tag_effective = false; hs_private->suspend_tag_effective = false;
hs_private->suspend_tag_user = false; hs_private->suspend_tag_user = false;
if (hf_private_req_in->is_user_stream_action_set)
{
hf_private_req_in->stream_action = hf_private_req_in->user_stream_action;
}
else
{
hf_private_req_in->stream_action = ACTION_FORWARD_DATA;
}
/* Ignore parse the content which is nullptr. */
return hf_private_req_in->stream_action;
} }
/* Parse the content, the data which in defered state has been ignored. */ /* Parse the content, the data which in defered state has been ignored. */
@@ -126,7 +138,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
hs_private->suspend_tag_effective = true; hs_private->suspend_tag_effective = true;
/* Suspend TCP stream */ /* Suspend TCP stream */
tfe_stream_suspend(stream); tfe_stream_suspend(stream, CONN_DIR_DOWNSTREAM);
return ACTION_DEFER_DATA; return ACTION_DEFER_DATA;
} }

View File

@@ -724,7 +724,10 @@ void hs_ops_suspend(struct tfe_http_session * session)
void hs_ops_resume(struct tfe_http_session * session) void hs_ops_resume(struct tfe_http_session * session)
{ {
struct http_session_private * hs_private = to_hs_private(session); struct http_session_private * hs_private = to_hs_private(session);
struct http_connection_private * hc_private = hs_private->hc_private;
hs_private->suspend_tag_user = false; hs_private->suspend_tag_user = false;
tfe_stream_resume(hc_private->stream);
} }
// TODO: change the return type to int, there is something happend where -1 returned. // TODO: change the return type to int, there is something happend where -1 returned.

View File

@@ -1485,6 +1485,11 @@ void tfe_stream_write_access_log(const struct tfe_stream * stream, int level, co
return; return;
} }
void tfe_stream_resume(const struct tfe_stream * stream)
{
return;
}
int main(int argc, char ** argv) int main(int argc, char ** argv)
{ {
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);