From af9b36eecb2310558e426d427bc4bf6a6eb1e3cd Mon Sep 17 00:00:00 2001 From: Lu Qiuwen Date: Wed, 17 Oct 2018 16:45:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0HTTP=20Early-Answer=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=EF=BC=8C=E7=94=A8=E6=88=B7=E5=9C=A8=E8=AF=B7=E6=B1=82?= =?UTF-8?q?=E4=BE=A7=E8=AE=BE=E7=BD=AE=E5=BA=94=E7=AD=94=E4=BE=A7=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E6=97=B6=EF=BC=8C=E7=AB=8B=E5=8D=B3=E5=8F=91=E9=80=81?= =?UTF-8?q?=E5=88=B0=E7=BA=BF=E8=B7=AF=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugin/protocol/http/src/http_entry.cpp | 134 +++++++++++++++++------- 1 file changed, 99 insertions(+), 35 deletions(-) diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index 8ecbd57..faa5bae 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -27,7 +27,8 @@ void http_plugin_deinit(struct tfe_proxy * proxy) int http_connection_entry_open(const struct tfe_stream * stream, unsigned int thread_id, enum tfe_conn_dir dir, void ** pme) { - struct http_connection_private * ht_conn = ALLOC(struct http_connection_private, 1); + struct http_connection_private * ht_conn = ALLOC( + struct http_connection_private, 1); TAILQ_INIT(&ht_conn->hs_private_list); TAILQ_INIT(&ht_conn->hs_private_orphan_list); ht_conn->stream = stream; @@ -92,7 +93,7 @@ static int __write_http_half_to_line(const struct tfe_stream * stream, /* Still need to send data(not call body_end()), need to write frag start * Otherwise, means the body is ready, send out directly without frag */ - if(hf_private->message_status < STATUS_COMPLETE) + if (hf_private->message_status < STATUS_COMPLETE) { hf_private->write_ctx = tfe_stream_write_frag_start(stream, dir); assert(hf_private->write_ctx != NULL); @@ -115,16 +116,80 @@ static int __write_http_half_to_line(const struct tfe_stream * stream, return 0; } +int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_session_private * hs_private, + struct http_half_private * hf_private_req_in, bool & need_to_close_the_session) +{ + struct http_connection_private * hc_private = hs_private->hc_private; + int ret = 0; + + /* Cannot setup user request and user response simultaneously */ + assert(!(hs_private->hf_private_req_user != NULL && hs_private->hf_private_resp_user != NULL)); + + /* User's construct request */ + if (hs_private->hf_private_req_user != NULL) + { + struct http_half_private * hf_private_req_user = hs_private->hf_private_req_user; + ret = __write_http_half_to_line(stream, CONN_DIR_UPSTREAM, hf_private_req_user); + if (unlikely(ret < 0)) + { + TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP request setup by user".); + return ret; + } + + if (hf_private_req_user->message_status == STATUS_COMPLETE) + { + hf_private_destory(hf_private_req_user); + hs_private->hf_private_resp_user = NULL; + } + + assert(hf_private_req_in->stream_action == ACTION_DEFER_DATA); + hf_private_req_in->stream_action = ACTION_DROP_DATA; + } + + /* User's construct response, send before real response arrived. */ + else if (hs_private->hf_private_resp_user != NULL) + { + struct http_half_private * hf_private_resp_user = hs_private->hf_private_resp_user; + ret = __write_http_half_to_line(stream, CONN_DIR_DOWNSTREAM, hf_private_resp_user); + if (unlikely(ret < 0)) + { + TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP request setup by user".); + goto __errout; + } + + if (hf_private_resp_user->message_status == STATUS_COMPLETE) + { + hf_private_destory(hf_private_resp_user); + hs_private->hf_private_resp_user = NULL; + need_to_close_the_session = true; + } + else + { + TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); + TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next); + } + + assert(hf_private_req_in->stream_action == ACTION_DEFER_DATA); + hf_private_req_in->stream_action = ACTION_DROP_DATA; + } + + return 0; + +__errout: + return -1; +} + enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_stream * stream, struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len) { struct http_session_private * hs_private = TAILQ_LAST(&hc_private->hs_private_list, hs_private_list); struct http_half_private * hf_private_req_in = to_hf_request_private(hs_private); - struct http_half_private * hf_private_req_user; -/* tfe_hexdump2file(stderr, __FUNCTION__, data, (unsigned int)len); */ + bool __need_to_close_the_session = false; int ret = 0; - size_t __action_byptes; + + enum tfe_stream_action __action = ACTION_FORWARD_DATA; + size_t __action_args = 0; /* There is no available in session list, * that indicate all HTTP request has corresponding response, @@ -137,7 +202,8 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea hf_private_set_session(hf_private_req_in, hs_private); /* Closure, catch stream, session and thread_id */ - struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1); + struct user_event_dispatch_closure * __closure = ALLOC( + struct user_event_dispatch_closure, 1); __closure->thread_id = thread_id; __closure->stream = stream; __closure->session = to_hs_public(hs_private); @@ -166,17 +232,14 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea hs_private->suspend_tag_effective = false; hs_private->suspend_tag_user = false; - if (hf_private_req_in->is_user_stream_action_set) + if (__on_request_handle_user_req_or_resp(stream, hs_private, + hf_private_req_in, __need_to_close_the_session) < 0) { - hf_private_req_in->stream_action = hf_private_req_in->user_stream_action; - } - else - { - hf_private_req_in->stream_action = ACTION_FORWARD_DATA; + goto __errout; } /* Ignore parse the content which is nullptr. */ - return hf_private_req_in->stream_action; + goto __out; } /* Parse the content, the data which in defered state has been ignored. */ @@ -216,38 +279,37 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea } /* Touch a boundary, such as the end of HTTP headers, bodys, et al. */ - __action_byptes = hf_private_req_in->parse_cursor; + __action_args = hf_private_req_in->parse_cursor; hf_private_req_in->parse_cursor = 0; - hf_private_req_user = hs_private->hf_private_req_user; - if (hf_private_req_user != NULL && hf_private_req_user->message_status == STATUS_COMPLETE) + if (__on_request_handle_user_req_or_resp(stream, hs_private, + hf_private_req_in, __need_to_close_the_session) < 0) { - /* Construct, and write response immediately */ - hf_private_construct(hf_private_req_user); - size_t __to_write_len = evbuffer_get_length(hf_private_req_user->evbuf_raw); - unsigned char * __to_write = evbuffer_pullup(hf_private_req_user->evbuf_raw, __to_write_len); - - /* Write the data to stream, UPSTREAM is the incoming direction for response */ - ret = tfe_stream_write(stream, CONN_DIR_UPSTREAM, __to_write, __to_write_len); - if (unlikely(ret < 0)) { assert(0); } - - hf_private_destory(hf_private_req_user); - hs_private->hf_private_resp_user = NULL; + goto __errout; } if (hf_private_req_in->stream_action == ACTION_FORWARD_DATA) { - tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_byptes, sizeof(__action_byptes)); - return ACTION_FORWARD_DATA; + tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_args, sizeof(__action_args)); + __action = ACTION_FORWARD_DATA; } if (hf_private_req_in->stream_action == ACTION_DROP_DATA) { - tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_byptes, sizeof(__action_byptes)); - return ACTION_DROP_DATA; + tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_args, sizeof(__action_args)); + __action = ACTION_DROP_DATA; } - goto __errout; +__out: + /* There is nothing for this session, close the session */ + if (__need_to_close_the_session) + { + http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id); + TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); + hs_private_destory(hs_private); + } + + return __action; __errout: tfe_stream_detach(stream); @@ -286,7 +348,8 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre hf_private_set_session(hf_private_resp_in, hs_private); /* Closure, catch stream, session and thread_id */ - struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1); + struct user_event_dispatch_closure * __closure = ALLOC( + struct user_event_dispatch_closure, 1); __closure->thread_id = thread_id; __closure->stream = stream; __closure->session = to_hs_public(hs_private); @@ -363,7 +426,7 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next); } - /* Nothing to do, everything is over, destroy the session */ + /* Nothing to do, everything is over, destroy the session */ else { http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id); @@ -468,4 +531,5 @@ static struct tfe_plugin __http_plugin_info = .on_close = http_connection_entry_close }; -TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info) +TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info +)