From e9c11824b04e58086c27f9d0ab0d791acf976341 Mon Sep 17 00:00:00 2001 From: Lu Qiuwen Date: Sun, 14 Oct 2018 20:29:52 +0800 Subject: [PATCH] =?UTF-8?q?STREAM=E6=8C=82=E8=B5=B7=E6=97=B6=E8=AE=B0?= =?UTF-8?q?=E5=BD=95=E6=8C=82=E8=B5=B7=E6=9D=A5=E6=BA=90=EF=BC=8C=E5=9C=A8?= =?UTF-8?q?RESUME()=E8=A7=A6=E5=8F=91=E5=AF=B9=E5=BA=94=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E6=96=B9=E5=90=91=E7=9A=84=E8=AF=BB=E4=BA=8B=E4=BB=B6=E3=80=82?= =?UTF-8?q?HTTP=20SUSPEND/RESUME=E5=8A=9F=E8=83=BD=E5=88=9D=E6=AD=A5?= =?UTF-8?q?=E8=B0=83=E9=80=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/include/tfe_stream.h | 2 +- platform/include/internal/platform.h | 1 + platform/src/tcp_stream.cpp | 16 ++++++++++++++-- plugin/protocol/http/src/http_entry.cpp | 14 +++++++++++++- plugin/protocol/http/src/http_half.cpp | 3 +++ plugin/protocol/http/test/test_http_half.cpp | 5 +++++ 6 files changed, 37 insertions(+), 4 deletions(-) diff --git a/common/include/tfe_stream.h b/common/include/tfe_stream.h index ff7f943..536f671 100644 --- a/common/include/tfe_stream.h +++ b/common/include/tfe_stream.h @@ -83,7 +83,7 @@ void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx); void tfe_stream_detach(const struct tfe_stream * stream); int tfe_stream_preempt(const struct tfe_stream * stream); -void tfe_stream_suspend(const struct tfe_stream * stream); +void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by); void tfe_stream_resume(const struct tfe_stream * stream); //close both sides of the stream. diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index 0487b5f..7a5aad0 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -98,6 +98,7 @@ struct tfe_stream_private /* SUSPEND */ bool is_suspended; + enum tfe_conn_dir suspended_by; }; static inline void * __STREAM_LOGGER(struct tfe_stream_private * _stream) diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 134860c..986dab3 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -106,7 +106,7 @@ int tfe_stream_preempt(const struct tfe_stream * stream) return 0; } -void tfe_stream_suspend(const struct tfe_stream * stream) +void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by) { struct tfe_stream_private * _stream = to_stream_private(stream); assert(_stream != NULL); @@ -117,6 +117,7 @@ void tfe_stream_suspend(const struct tfe_stream * stream) /* stream cannot be suspended twice or more */ assert(!_stream->is_suspended); _stream->is_suspended = true; + _stream->suspended_by = by; /* disable all events */ bufferevent_disable(_stream->conn_upstream->bev, EV_READ | EV_WRITE); @@ -128,9 +129,20 @@ void tfe_stream_resume(const struct tfe_stream * stream) struct tfe_stream_private * _stream = to_stream_private(stream); assert(_stream->is_suspended); - _stream->is_suspended = false; bufferevent_enable(_stream->conn_upstream->bev, EV_READ | EV_WRITE); bufferevent_enable(_stream->conn_downstream->bev, EV_READ | EV_WRITE); + + if(_stream->suspended_by == CONN_DIR_DOWNSTREAM) + { + bufferevent_trigger(_stream->conn_downstream->bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); + } + else + { + bufferevent_trigger(_stream->conn_upstream->bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); + } + + _stream->is_suspended = false; + _stream->suspended_by = CONN_DIR_DOWNSTREAM; } struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir) diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index f266551..47bac67 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -102,6 +102,18 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea hs_private->suspend_event = (enum tfe_http_event)0; hs_private->suspend_tag_effective = false; hs_private->suspend_tag_user = false; + + if (hf_private_req_in->is_user_stream_action_set) + { + hf_private_req_in->stream_action = hf_private_req_in->user_stream_action; + } + else + { + hf_private_req_in->stream_action = ACTION_FORWARD_DATA; + } + + /* Ignore parse the content which is nullptr. */ + return hf_private_req_in->stream_action; } /* Parse the content, the data which in defered state has been ignored. */ @@ -126,7 +138,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea hs_private->suspend_tag_effective = true; /* Suspend TCP stream */ - tfe_stream_suspend(stream); + tfe_stream_suspend(stream, CONN_DIR_DOWNSTREAM); return ACTION_DEFER_DATA; } diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index e025f56..1fd9ac6 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -724,7 +724,10 @@ void hs_ops_suspend(struct tfe_http_session * session) void hs_ops_resume(struct tfe_http_session * session) { struct http_session_private * hs_private = to_hs_private(session); + struct http_connection_private * hc_private = hs_private->hc_private; + hs_private->suspend_tag_user = false; + tfe_stream_resume(hc_private->stream); } // TODO: change the return type to int, there is something happend where -1 returned. diff --git a/plugin/protocol/http/test/test_http_half.cpp b/plugin/protocol/http/test/test_http_half.cpp index d09c693..481692a 100644 --- a/plugin/protocol/http/test/test_http_half.cpp +++ b/plugin/protocol/http/test/test_http_half.cpp @@ -1485,6 +1485,11 @@ void tfe_stream_write_access_log(const struct tfe_stream * stream, int level, co return; } +void tfe_stream_resume(const struct tfe_stream * stream) +{ + return; +} + int main(int argc, char ** argv) { ::testing::InitGoogleTest(&argc, argv);