diff --git a/common/include/tfe_http.h b/common/include/tfe_http.h index 7c58112..3cf0bca 100644 --- a/common/include/tfe_http.h +++ b/common/include/tfe_http.h @@ -158,11 +158,11 @@ enum tfe_http_std_field TFE_HTTP_EXPIRES, TFE_HTTP_ACCEPT_ENCODING, TFE_HTTP_CACHE_CONTROL, - TLF_HTTP_IF_MATCH, - TLF_HTTP_IF_NONE_MATCH, - TLF_HTTP_IF_MODIFIED_SINCE, - TLF_HTTP_IF_UNMODIFIED_SINCE, - TLF_HTTP_LAST_MODIFIED + TLF_HTTP_IF_MATCH, + TLF_HTTP_IF_NONE_MATCH, + TLF_HTTP_IF_MODIFIED_SINCE, + TLF_HTTP_IF_UNMODIFIED_SINCE, + TLF_HTTP_LAST_MODIFIED }; enum http_ev_bit_number diff --git a/common/include/tfe_stream.h b/common/include/tfe_stream.h index b5e3980..ff7f943 100644 --- a/common/include/tfe_stream.h +++ b/common/include/tfe_stream.h @@ -83,8 +83,8 @@ void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx); void tfe_stream_detach(const struct tfe_stream * stream); int tfe_stream_preempt(const struct tfe_stream * stream); -struct promise * tfe_stream_suspend(const struct tfe_stream * stream); -void tfe_stream_resume(struct promise * promise); +void tfe_stream_suspend(const struct tfe_stream * stream); +void tfe_stream_resume(const struct tfe_stream * stream); //close both sides of the stream. int tfe_stream_shutdown(const struct tfe_stream * stream); diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index b39dbf4..3a40d79 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -97,6 +97,9 @@ struct tfe_stream_private future * future_upstream_create; /* ASYNC DOWNSTREAM */ future * future_downstream_create; + + /* SUSPEND */ + bool is_suspended; }; static inline void * __STREAM_LOGGER(struct tfe_stream_private * _stream) diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 77c89a3..134860c 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -89,7 +89,6 @@ void tfe_stream_detach(const struct tfe_stream * stream) struct tfe_stream_private * _stream = to_stream_private(stream); int plug_id = _stream->calling_idx; _stream->plugin_ctxs[plug_id].state = PLUG_STATE_DETACHED; - return; } int tfe_stream_preempt(const struct tfe_stream * stream) @@ -107,6 +106,33 @@ int tfe_stream_preempt(const struct tfe_stream * stream) return 0; } +void tfe_stream_suspend(const struct tfe_stream * stream) +{ + struct tfe_stream_private * _stream = to_stream_private(stream); + assert(_stream != NULL); + assert(_stream->conn_upstream != NULL && _stream->conn_downstream != NULL); + assert(_stream->conn_upstream->bev != NULL); + assert(_stream->conn_downstream->bev != NULL); + + /* stream cannot be suspended twice or more */ + assert(!_stream->is_suspended); + _stream->is_suspended = true; + + /* disable all events */ + bufferevent_disable(_stream->conn_upstream->bev, EV_READ | EV_WRITE); + bufferevent_disable(_stream->conn_downstream->bev, EV_READ | EV_WRITE); +} + +void tfe_stream_resume(const struct tfe_stream * stream) +{ + struct tfe_stream_private * _stream = to_stream_private(stream); + assert(_stream->is_suspended); + + _stream->is_suspended = false; + bufferevent_enable(_stream->conn_upstream->bev, EV_READ | EV_WRITE); + bufferevent_enable(_stream->conn_downstream->bev, EV_READ | EV_WRITE); +} + struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir) { struct tfe_stream_private * _stream = to_stream_private(stream); @@ -136,7 +162,6 @@ void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) struct tfe_conn_private * peer_conn = __peer_conn(w_ctx->_stream, w_ctx->dir); this_conn->on_writing = 0; bufferevent_enable(peer_conn->bev, EV_READ); - return; } int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, const unsigned char * data, size_t size) @@ -894,7 +919,8 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst if (unlikely(_stream->head.addr == NULL)) { TFE_LOG_ERROR(_stream->stream_logger, "Failed to create address from fd %d, %d, terminate fds.", - fd_downstream, fd_upstream); goto __errout; + fd_downstream, fd_upstream); + goto __errout; } _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); diff --git a/plugin/protocol/http/CMakeLists.txt b/plugin/protocol/http/CMakeLists.txt index 1422057..7bfa34b 100644 --- a/plugin/protocol/http/CMakeLists.txt +++ b/plugin/protocol/http/CMakeLists.txt @@ -18,6 +18,11 @@ add_executable(test-http-convert test/test_http_convert.cpp) target_include_directories(test-http-convert PRIVATE include/internal) target_link_libraries(test-http-convert gtest common http) +add_executable(test-http-entry test/test_http_convert.cpp) +target_include_directories(test-http-entry PRIVATE include/internal) +target_link_libraries(test-http-entry gtest common http) + include(GoogleTest) gtest_discover_tests(test-http-half) gtest_discover_tests(test-http-convert) +gtest_discover_tests(test-http-entry) diff --git a/plugin/protocol/http/include/internal/http_common.h b/plugin/protocol/http/include/internal/http_common.h index 67245df..429fdff 100644 --- a/plugin/protocol/http/include/internal/http_common.h +++ b/plugin/protocol/http/include/internal/http_common.h @@ -34,6 +34,12 @@ struct http_session_private struct http_half_private * hf_private_req_user; /* USER SETUP RESPONSE HALF */ struct http_half_private * hf_private_resp_user; + /* SUSPEND TAG */ + bool suspend_tag_user; + /* SUSPEND EVENT */ + tfe_http_event suspend_event; + /* SUSPEND TAG EFFECTIVE */ + bool suspend_tag_effective; }; struct http_connection_private diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index 826afa8..f266551 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -93,6 +93,17 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea TAILQ_INSERT_TAIL(&hc_private->hs_private_list, hs_private, next); } + /* The session is suspended, and to resume */ + if (hs_private->suspend_tag_effective) + { + hf_private_req_in->event_cb(hf_private_req_in, hs_private->suspend_event, NULL, 0, + hf_private_req_in->event_cb_user); + + hs_private->suspend_event = (enum tfe_http_event)0; + hs_private->suspend_tag_effective = false; + hs_private->suspend_tag_user = false; + } + /* Parse the content, the data which in defered state has been ignored. */ ret = hf_private_parse(hf_private_req_in, data, len); @@ -108,6 +119,17 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea return hf_private_req_in->stream_action; } + /* Suspend */ + if (hs_private->suspend_tag_user) + { + assert(!hs_private->suspend_tag_effective); + hs_private->suspend_tag_effective = true; + + /* Suspend TCP stream */ + tfe_stream_suspend(stream); + return ACTION_DEFER_DATA; + } + /* Some kind of error happened, write log and detach the stream */ if (ret == -1) { diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index 2a6c151..b5149b7 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #define __PARSER_TO_HF_PRIVATE(_parser) ((struct http_half_private *)(_parser->data)) @@ -285,6 +286,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser) { struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); enum tfe_http_direction hf_direction = hf_private->hf_public.direction; + struct http_session_private * hs_private = hf_private->session; if (evbuffer_get_length(hf_private->evbuf_header_field) != 0) { @@ -296,10 +298,10 @@ static int __parser_callback_on_headers_complete(http_parser * parser) hf_public->minor_version = parser->http_minor; /* Copy version to session */ - if (hf_private->session != NULL) + if (hs_private != NULL) { - to_hs_public(hf_private->session)->major_version = hf_public->major_version; - to_hs_public(hf_private->session)->minor_version = hf_public->minor_version; + to_hs_public(hs_private)->major_version = hf_public->major_version; + to_hs_public(hs_private)->minor_version = hf_public->minor_version; } if (hf_direction == TFE_HTTP_REQUEST) @@ -311,20 +313,37 @@ static int __parser_callback_on_headers_complete(http_parser * parser) __hf_public_resp_fill_from_private(hf_private, parser); } - if (hf_private->event_cb && hf_direction == TFE_HTTP_REQUEST) + tfe_http_event event = (hf_direction == TFE_HTTP_REQUEST) ? EV_HTTP_REQ_HDR : EV_HTTP_RESP_HDR; + if (hf_private->event_cb) { hf_private->event_cb(hf_private, EV_HTTP_REQ_HDR, NULL, 0, hf_private->event_cb_user); } - if (hf_private->event_cb && hf_direction == TFE_HTTP_RESPONSE) - { - hf_private->event_cb(hf_private, EV_HTTP_RESP_HDR, NULL, 0, hf_private->event_cb_user); - } - + /* The setup of user stream option indicates that the way to handle the request/response has + * been decided, we should conform the resolution */ if (hf_private->is_user_stream_action_set) { + assert(hf_private->stream_action == ACTION_DEFER_DATA); hf_private->stream_action = hf_private->user_stream_action; } + + /* user's suspend tag is set, which indicate that the way to handle request/response + * cannot be determinate at now, need to defer */ + else if (hs_private && hs_private->suspend_tag_user) + { + /* Pause parser, prevent to parse request/response body, + * The body should be parsed after resume() */ + http_parser_pause(parser, 1); + + /* Record the event, this event will be trigger again at resume() */ + hs_private->suspend_event = event; + + /* Suspend must be setup at DEFER status. + * The request/response cannot be suspend once any bytes has been forwarded to upstream */ + assert(hf_private->stream_action == ACTION_DEFER_DATA); + } + + /* Otherwise, forward the request/response */ else { hf_private->stream_action = ACTION_FORWARD_DATA; @@ -468,23 +487,24 @@ int hf_ops_field_write(struct tfe_http_half * half, const struct http_field_name TAILQ_FOREACH(__header_iter, &hf_private->header_list, next) { if (http_field_name_compare(__header_iter->field, field) != 0) continue; - __header_found = __header_iter; break; + __header_found = __header_iter; + break; } /* Update the value */ - if(__header_found != NULL && value != NULL) + if (__header_found != NULL && value != NULL) { free(__header_found->value); __header_found->value = tfe_strdup(value); } - /* Delete the key and value */ + /* Delete the key and value */ else if (__header_found != NULL && value == NULL) { TAILQ_REMOVE(&hf_private->header_list, __header_found, next); free(__header_found->value); free(__header_found); } - /* Insert a new header k-v in the tail of the header list */ + /* Insert a new header k-v in the tail of the header list */ else if (__header_found == NULL && value != NULL) { struct http_header_private * __header = ALLOC(struct http_header_private, 1); @@ -492,7 +512,7 @@ int hf_ops_field_write(struct tfe_http_half * half, const struct http_field_name __header->value = tfe_strdup(value); TAILQ_INSERT_TAIL(&hf_private->header_list, __header, next); } - /* Nothing found, and delete nothing */ + /* Nothing found, and delete nothing */ else { return -1; @@ -648,18 +668,23 @@ int hf_private_parse(struct http_half_private * hf_private, const unsigned char size_t sz_parsed = http_parser_execute(hf_private->parse_object, &__http_half_parse_setting, __data_with_offset, __len_with_offset); - /* Nothing happended */ + bool __is_paused = false; + if (HTTP_PARSER_ERRNO(hf_private->parse_object) == HPE_PAUSED) + { + http_parser_pause(hf_private->parse_object, 0); + __is_paused = true; + } + if (sz_parsed == __len_with_offset) { hf_private->parse_cursor += sz_parsed; - return HTTP_PARSER_ERRNO(hf_private->parse_object) == HPE_PAUSED ? 1 : 0; + return __is_paused ? 1 : 0; } /* The paused parsar indicate the message boundary has been touched, we should return. * resume it to normal status */ - if (HTTP_PARSER_ERRNO(hf_private->parse_object) == HPE_PAUSED) + if (__is_paused) { - http_parser_pause(hf_private->parse_object, 0); hf_private->parse_cursor += sz_parsed + 1; return 1; } @@ -682,8 +707,18 @@ void hs_ops_detach(const struct tfe_http_session * session) } void hs_ops_drop(struct tfe_http_session * session) +{} + +void hs_ops_suspend(struct tfe_http_session * session) { - return; + struct http_session_private * hs_private = to_hs_private(session); + hs_private->suspend_tag_user = true; +} + +void hs_ops_resume(struct tfe_http_session * session) +{ + struct http_session_private * hs_private = to_hs_private(session); + hs_private->suspend_tag_user = false; } // TODO: change the return type to int, there is something happend where -1 returned. @@ -781,6 +816,8 @@ struct tfe_http_session_ops __http_session_ops = .ops_allow_write = hs_ops_allow_write, .ops_detach = hs_ops_detach, .ops_drop = hs_ops_drop, + .ops_suspend = hs_ops_suspend, + .ops_resume = hs_ops_resume, .ops_request_set = hs_ops_request_set, .ops_response_set = hs_ops_response_set, .ops_request_create = hs_ops_request_create, diff --git a/plugin/protocol/http/test/test_http_entry.cpp b/plugin/protocol/http/test/test_http_entry.cpp new file mode 100644 index 0000000..5dc7cab --- /dev/null +++ b/plugin/protocol/http/test/test_http_entry.cpp @@ -0,0 +1,4 @@ +// +// Created by luqiu on 2018/10/10. +// +