From 2bc366fb130b2a0e2bee73107f8e3102db59528a Mon Sep 17 00:00:00 2001 From: Lu Qiuwen Date: Tue, 16 Oct 2018 10:45:18 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0HTTP=E6=B5=81=E5=BC=8F?= =?UTF-8?q?=E6=9E=84=E9=80=A0Body=E7=9A=84=E6=8E=A5=E5=8F=A3=E5=8F=8A?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=B9=B6=E4=BF=AE=E6=AD=A3HTTP=E5=8D=95?= =?UTF-8?q?=E5=85=83=E6=B5=8B=E8=AF=95=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/include/tfe_http.h | 26 ++++- common/src/tfe_http.cpp | 10 -- .../http/include/internal/http_common.h | 2 + .../http/include/internal/http_half.h | 3 + plugin/protocol/http/src/http_entry.cpp | 108 ++++++++++++++---- plugin/protocol/http/src/http_half.cpp | 80 ++++++++++++- plugin/protocol/http/test/test_http_half.cpp | 16 ++- vendor/CMakeLists.txt | 1 - 8 files changed, 202 insertions(+), 44 deletions(-) diff --git a/common/include/tfe_http.h b/common/include/tfe_http.h index 1e78212..ae3c3b0 100644 --- a/common/include/tfe_http.h +++ b/common/include/tfe_http.h @@ -222,6 +222,9 @@ struct tfe_http_half_ops struct tfe_http_half * (* ops_http_allow_write)(const struct tfe_http_half *); const char * (* ops_http_field_iterate)(const struct tfe_http_half *, void **, struct http_field_name *); int (* ops_append_body)(struct tfe_http_half *, char *, size_t, int); + int (* ops_body_begin)(struct tfe_http_half *, int by_stream); + int (* ops_body_data)(struct tfe_http_half *, const unsigned char * data, size_t sz_data); + int (* ops_body_end)(struct tfe_http_half *); void (* ops_free)(struct tfe_http_half * half); }; @@ -303,6 +306,7 @@ static inline const char * tfe_http_field_read(const struct tfe_http_half * half { return half->ops->ops_http_field_read(half, name); } + static inline const char * tfe_http_std_field_read(const struct tfe_http_half * half, enum tfe_http_std_field field_id) { struct http_field_name tmp_name; @@ -316,6 +320,7 @@ static inline int tfe_http_field_write(struct tfe_http_half * half, { return half->ops->ops_http_field_write(half, name, value); } + static inline int tfe_http_std_field_write(struct tfe_http_half * half, enum tfe_http_std_field field_id, const char * value) { @@ -338,10 +343,10 @@ static inline struct tfe_http_half * tfe_http_allow_write(const struct tfe_http_ return half->ops->ops_http_allow_write(half); } -static inline const char * tfe_http_field_iterate(const struct tfe_http_half * half, - void ** interator, struct http_field_name * name) +static inline const char * tfe_http_field_iterate(const struct tfe_http_half * half, void ** iterator, + struct http_field_name * name) { - return half->ops->ops_http_field_iterate(half, interator, name); + return half->ops->ops_http_field_iterate(half, iterator, name); } static inline int tfe_http_half_append_body(struct tfe_http_half * half, char * buff, size_t size, int flag) @@ -354,6 +359,21 @@ static inline void tfe_http_half_free(struct tfe_http_half * half) return half->ops->ops_free(half); } +static inline int tfe_http_half_write_body_begin(struct tfe_http_half * half, int by_stream) +{ + return half->ops->ops_body_begin(half, by_stream); +} + +static inline int tfe_http_half_write_body_data(struct tfe_http_half * half, const unsigned char * data, size_t sz_data) +{ + return half->ops->ops_body_data(half, data, sz_data); +} + +static inline int tfe_http_half_write_body_end(struct tfe_http_half * half) +{ + return half->ops->ops_body_end(half); +} + static inline struct tfe_http_session * tfe_http_session_allow_write(const struct tfe_http_session * session) { return session->ops->ops_allow_write(session); diff --git a/common/src/tfe_http.cpp b/common/src/tfe_http.cpp index 53196ef..0980207 100644 --- a/common/src/tfe_http.cpp +++ b/common/src/tfe_http.cpp @@ -4,16 +4,6 @@ #include #include -struct tfe_http_half * tfe_http_request_create(int major_version, int method, const char * uri, const char * host) -{ - return NULL; -} - -struct tfe_http_half * tfe_http_response_create(int major_version, int resp_code) -{ - return NULL; -} - static const char * __str_std_header_field_map[] = { [TFE_HTTP_UNKNOWN_FIELD] = NULL, diff --git a/plugin/protocol/http/include/internal/http_common.h b/plugin/protocol/http/include/internal/http_common.h index 429fdff..ccc0b0b 100644 --- a/plugin/protocol/http/include/internal/http_common.h +++ b/plugin/protocol/http/include/internal/http_common.h @@ -48,6 +48,8 @@ struct http_connection_private const struct tfe_stream * stream; /* SESSION LIST, REQUEST-RESPONSE PAIRS */ struct hs_private_list hs_private_list; + /* ORPHAN SESSION LIST */ + struct hs_private_list hs_private_orphan_list; /* IS PREEMPTED */ unsigned int is_preempted; /* SESSION ID COUNTER */ diff --git a/plugin/protocol/http/include/internal/http_half.h b/plugin/protocol/http/include/internal/http_half.h index 1a09995..37da9a6 100644 --- a/plugin/protocol/http/include/internal/http_half.h +++ b/plugin/protocol/http/include/internal/http_half.h @@ -87,8 +87,11 @@ struct http_half_private /* Setup by User */ bool is_setup_by_user; + bool is_setup_by_stream; + struct evbuffer * evbuf_body; struct evbuffer * evbuf_raw; + struct tfe_stream_write_ctx * write_ctx; }; struct http_half_private * hf_private_create(tfe_http_direction ht_dir, short major, short minor); diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index 47bac67..a947181 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -52,6 +52,53 @@ static int __user_event_dispatch(struct http_half_private * hf_private, return 0; } +static int __write_http_half(struct http_half_private * hf_private, + const struct tfe_stream * stream, enum tfe_conn_dir dir) +{ + /* Construct, and write response immediately */ + hf_private_construct(hf_private); + size_t __to_write_len = evbuffer_get_length(hf_private->evbuf_raw); + unsigned char * __to_write = evbuffer_pullup(hf_private->evbuf_raw, __to_write_len); + + /* Write the data to stream, UPSTREAM is the incoming direction for response */ + int ret = tfe_stream_write(stream, dir, __to_write, __to_write_len); + if (unlikely(ret < 0)) { assert(0); } + + evbuffer_drain(hf_private->evbuf_raw, __to_write_len); + return ret; +} + +static int __write_http_half_to_line(const struct tfe_stream * stream, + enum tfe_conn_dir dir, struct http_half_private * hf_private) +{ + int ret = 0; + + if (hf_private->is_setup_by_stream) + { + /* By stream, first time to write http request/response, construct the header */ + if (hf_private->write_ctx == NULL) + { + ret = __write_http_half(hf_private, stream, dir); + if (unlikely(ret < 0)) return ret; + + /* Alloc stream write ctx */ + hf_private->write_ctx = tfe_stream_write_frag_start(stream, dir); + if (unlikely(hf_private->write_ctx == NULL)) return -1; + } + } + else + { + /* Not stream, need to be complete, then construct all data and send out */ + if (hf_private->message_status == STATUS_COMPLETE) + { + ret = __write_http_half(hf_private, stream, dir); + if (unlikely(ret < 0)) return ret; + } + } + + return 0; +} + enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_stream * stream, struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len) { @@ -59,7 +106,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea struct http_half_private * hf_private_req_in = to_hf_request_private(hs_private); struct http_half_private * hf_private_req_user; - /* tfe_hexdump2file(stderr, __FUNCTION__, data, (unsigned int)len); */ +/* tfe_hexdump2file(stderr, __FUNCTION__, data, (unsigned int)len); */ int ret = 0; size_t __action_byptes; @@ -99,7 +146,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea hf_private_req_in->event_cb(hf_private_req_in, hs_private->suspend_event, NULL, 0, hf_private_req_in->event_cb_user); - hs_private->suspend_event = (enum tfe_http_event)0; + hs_private->suspend_event = (enum tfe_http_event) 0; hs_private->suspend_tag_effective = false; hs_private->suspend_tag_user = false; @@ -197,6 +244,7 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre struct http_session_private * hs_private = TAILQ_FIRST(&hc_private->hs_private_list); struct http_half_private * hf_private_resp_in; struct http_half_private * hf_private_resp_user; + enum tfe_stream_action __stream_action; int ret = 0; size_t __action_byptes; @@ -264,38 +312,52 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre } hf_private_resp_user = hs_private->hf_private_resp_user; - if (hf_private_resp_user != NULL && hf_private_resp_user->message_status == STATUS_COMPLETE) + if (hf_private_resp_user != NULL) { - /* Construct, and write response immediately */ - hf_private_construct(hf_private_resp_user); - size_t __to_write_len = evbuffer_get_length(hf_private_resp_user->evbuf_raw); - unsigned char * __to_write = evbuffer_pullup(hf_private_resp_user->evbuf_raw, __to_write_len); + ret = __write_http_half_to_line(stream, CONN_DIR_DOWNSTREAM, hf_private_resp_user); + if (unlikely(ret < 0)) + { + TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP response setup by user".); + goto __errout; + } - /* Write the data to stream, UPSTREAM is the incoming direction for response */ - ret = tfe_stream_write(stream, CONN_DIR_DOWNSTREAM, __to_write, __to_write_len); - if (unlikely(ret < 0)) { assert(0); } - - hf_private_destory(hf_private_resp_user); - hs_private->hf_private_resp_user = NULL; - } - - if (hf_private_resp_in->message_status == STATUS_COMPLETE) - { - http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id); - TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); - hs_private_destory(hs_private); + if (hf_private_resp_user->message_status == STATUS_COMPLETE) + { + hf_private_destory(hf_private_resp_user); + hs_private->hf_private_resp_user = NULL; + } } + __stream_action = hf_private_resp_in->stream_action; __action_byptes = hf_private_resp_in->parse_cursor; hf_private_resp_in->parse_cursor = 0; - if (hf_private_resp_in->stream_action == ACTION_FORWARD_DATA) + if (hf_private_resp_in->message_status == STATUS_COMPLETE) + { + /* Still sending user's response, should not destroy the session, + * move the session to orphan list, then we can handle the next session's response */ + if (hs_private->hf_private_resp_user != NULL) + { + TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); + TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next); + } + + /* Nothing to do, everything is over, destroy the session */ + else + { + http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id); + TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); + hs_private_destory(hs_private); + } + } + + if (__stream_action == ACTION_FORWARD_DATA) { tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_byptes, sizeof(__action_byptes)); return ACTION_FORWARD_DATA; } - if (hf_private_resp_in->stream_action == ACTION_DROP_DATA) + if (__stream_action == ACTION_DROP_DATA) { tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_byptes, sizeof(__action_byptes)); return ACTION_DROP_DATA; @@ -360,7 +422,7 @@ void http_connection_entry_close(const struct tfe_stream * stream, unsigned int *pme = NULL; /* Delete all live sessions */ - while(true) + while (true) { struct http_session_private * hs_private_iter = TAILQ_FIRST(&ht_conn->hs_private_list); if (hs_private_iter == NULL) break; diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index 1fd9ac6..58deb00 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -333,8 +333,8 @@ static int __parser_callback_on_headers_complete(http_parser * parser) hf_private->stream_action = hf_private->user_stream_action; } - /* user's suspend tag is set, which indicate that the way to handle request/response - * cannot be determinate at now, need to defer */ + /* user's suspend tag is set, which indicate that the way to handle request/response + * cannot be determinate at now, need to defer */ else if (hs_private && hs_private->suspend_tag_user) { /* Pause parser, prevent to parse request/response body, @@ -349,7 +349,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser) assert(hf_private->stream_action == ACTION_DEFER_DATA); } - /* Otherwise, forward the request/response */ + /* Otherwise, forward the request/response */ else { hf_private->stream_action = ACTION_FORWARD_DATA; @@ -584,6 +584,77 @@ int hf_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, in return ret; } +int hf_ops_body_begin(struct tfe_http_half * half, int by_stream) +{ + struct http_half_private * hf_private = to_hf_private(half); + + assert(hf_private->evbuf_body == NULL); + if (hf_private->evbuf_body != NULL) + { + evbuffer_free(hf_private->evbuf_body); + } + + if (by_stream) + { + hf_private->is_setup_by_stream = true; + } + + hf_private->evbuf_body = evbuffer_new(); + return 0; +} + +int hf_ops_body_data(struct tfe_http_half * half, const unsigned char * data, size_t sz_data) +{ + struct http_half_private * hf_private = to_hf_private(half); + struct evbuffer * __tmp_output_buffer = evbuffer_new(); + int ret = 0; + + /* Need to compress output */ + if (hf_private->cv_compress_object) + { + ret = hf_content_compress_write(hf_private->cv_compress_object, data, sz_data, __tmp_output_buffer, 0); + } + else + { + ret = evbuffer_add(__tmp_output_buffer, data, sz_data); + } + + if (ret < 0) goto __out; + ret = evbuffer_add_buffer(hf_private->evbuf_body, __tmp_output_buffer); + + /* Have write ctx, should write the body to TCP stream immediately but not to store in evbuf_body */ + if (hf_private->write_ctx) + { + /* Perpare to write data, TODO: write to TCP stream by transfer evbuffer */ + const unsigned char * ptr_write_data = evbuffer_pullup(hf_private->evbuf_body, -1); + size_t sz_write_data = evbuffer_get_length(hf_private->evbuf_body); + + assert(ptr_write_data != NULL && sz_write_data >= 0); + tfe_stream_write_frag(hf_private->write_ctx, ptr_write_data, sz_write_data); + + /* Need to drain all data */ + evbuffer_drain(hf_private->evbuf_body, sz_write_data); + } + +__out: + evbuffer_free(__tmp_output_buffer); + return ret; +} + +int hf_ops_body_end(struct tfe_http_half * half) +{ + struct http_half_private * hf_private = to_hf_private(half); + if (hf_private->write_ctx) + { + tfe_stream_write_frag_end(hf_private->write_ctx); + hf_private->write_ctx = NULL; + } + + hf_private->body_status = STATUS_COMPLETE; + hf_private->message_status = STATUS_COMPLETE; + return 0; +} + void hf_private_destory(struct http_half_private * hf_private) { if (hf_private->parse_object != NULL) @@ -611,6 +682,9 @@ struct tfe_http_half_ops __http_half_ops = .ops_http_allow_write = hf_ops_allow_write, .ops_http_field_iterate = hf_ops_field_iterate, .ops_append_body = hf_ops_append_body, + .ops_body_begin = NULL, + .ops_body_data = NULL, + .ops_body_end = NULL, .ops_free = hf_ops_free }; diff --git a/plugin/protocol/http/test/test_http_half.cpp b/plugin/protocol/http/test/test_http_half.cpp index 481692a..f453343 100644 --- a/plugin/protocol/http/test/test_http_half.cpp +++ b/plugin/protocol/http/test/test_http_half.cpp @@ -105,8 +105,7 @@ void __get_http_request_header_verify_helper(struct http_half_private * hf_priva /* If-None-Match */ hdr_value = tfe_http_field_iterate(hf_public, &__iterator, &field_name); - EXPECT_EQ(field_name.field_id, TFE_HTTP_UNKNOWN_FIELD); - EXPECT_STREQ(field_name.field_name, "If-None-Match"); + EXPECT_EQ(field_name.field_id, TFE_HTTP_IF_NONE_MATCH); EXPECT_STREQ(hdr_value, "\"023aeae5eafc12082067c36031888adb3bafa797\""); } @@ -344,8 +343,7 @@ void __http_post_header_verify_helper(struct http_half_private * hf_private) /* Accept */ hdr_value = tfe_http_field_iterate(hf_public, &__iterator, &field_name); - EXPECT_EQ(field_name.field_id, TFE_HTTP_UNKNOWN_FIELD); - EXPECT_STREQ(field_name.field_name, "Cache-Control"); + EXPECT_EQ(field_name.field_id, TFE_HTTP_CACHE_CONTROL); EXPECT_STREQ(hdr_value, "no-cache"); } @@ -1490,6 +1488,16 @@ void tfe_stream_resume(const struct tfe_stream * stream) return; } +int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size) +{ + return 0; +} + +void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) +{ + return; +} + int main(int argc, char ** argv) { ::testing::InitGoogleTest(&argc, argv); diff --git a/vendor/CMakeLists.txt b/vendor/CMakeLists.txt index fe4ba7d..fc52072 100644 --- a/vendor/CMakeLists.txt +++ b/vendor/CMakeLists.txt @@ -143,7 +143,6 @@ add_library(wiredLB SHARED IMPORTED GLOBAL) set_property(TARGET wiredLB PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libWiredLB.so) set_property(TARGET wiredLB PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR}) - add_library(maatframe SHARED IMPORTED GLOBAL) set_property(TARGET maatframe PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libmaatframe.so) set_property(TARGET maatframe PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR})