增加HTTP Early-Answer功能,用户在请求侧设置应答侧数据时,立即发送到线路。
This commit is contained in:
@@ -27,7 +27,8 @@ void http_plugin_deinit(struct tfe_proxy * proxy)
|
|||||||
int http_connection_entry_open(const struct tfe_stream * stream, unsigned int thread_id,
|
int http_connection_entry_open(const struct tfe_stream * stream, unsigned int thread_id,
|
||||||
enum tfe_conn_dir dir, void ** pme)
|
enum tfe_conn_dir dir, void ** pme)
|
||||||
{
|
{
|
||||||
struct http_connection_private * ht_conn = ALLOC(struct http_connection_private, 1);
|
struct http_connection_private * ht_conn = ALLOC(
|
||||||
|
struct http_connection_private, 1);
|
||||||
TAILQ_INIT(&ht_conn->hs_private_list);
|
TAILQ_INIT(&ht_conn->hs_private_list);
|
||||||
TAILQ_INIT(&ht_conn->hs_private_orphan_list);
|
TAILQ_INIT(&ht_conn->hs_private_orphan_list);
|
||||||
ht_conn->stream = stream;
|
ht_conn->stream = stream;
|
||||||
@@ -92,7 +93,7 @@ static int __write_http_half_to_line(const struct tfe_stream * stream,
|
|||||||
|
|
||||||
/* Still need to send data(not call body_end()), need to write frag start
|
/* Still need to send data(not call body_end()), need to write frag start
|
||||||
* Otherwise, means the body is ready, send out directly without frag */
|
* Otherwise, means the body is ready, send out directly without frag */
|
||||||
if(hf_private->message_status < STATUS_COMPLETE)
|
if (hf_private->message_status < STATUS_COMPLETE)
|
||||||
{
|
{
|
||||||
hf_private->write_ctx = tfe_stream_write_frag_start(stream, dir);
|
hf_private->write_ctx = tfe_stream_write_frag_start(stream, dir);
|
||||||
assert(hf_private->write_ctx != NULL);
|
assert(hf_private->write_ctx != NULL);
|
||||||
@@ -115,16 +116,80 @@ static int __write_http_half_to_line(const struct tfe_stream * stream,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_session_private * hs_private,
|
||||||
|
struct http_half_private * hf_private_req_in, bool & need_to_close_the_session)
|
||||||
|
{
|
||||||
|
struct http_connection_private * hc_private = hs_private->hc_private;
|
||||||
|
int ret = 0;
|
||||||
|
|
||||||
|
/* Cannot setup user request and user response simultaneously */
|
||||||
|
assert(!(hs_private->hf_private_req_user != NULL && hs_private->hf_private_resp_user != NULL));
|
||||||
|
|
||||||
|
/* User's construct request */
|
||||||
|
if (hs_private->hf_private_req_user != NULL)
|
||||||
|
{
|
||||||
|
struct http_half_private * hf_private_req_user = hs_private->hf_private_req_user;
|
||||||
|
ret = __write_http_half_to_line(stream, CONN_DIR_UPSTREAM, hf_private_req_user);
|
||||||
|
if (unlikely(ret < 0))
|
||||||
|
{
|
||||||
|
TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP request setup by user".);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hf_private_req_user->message_status == STATUS_COMPLETE)
|
||||||
|
{
|
||||||
|
hf_private_destory(hf_private_req_user);
|
||||||
|
hs_private->hf_private_resp_user = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(hf_private_req_in->stream_action == ACTION_DEFER_DATA);
|
||||||
|
hf_private_req_in->stream_action = ACTION_DROP_DATA;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* User's construct response, send before real response arrived. */
|
||||||
|
else if (hs_private->hf_private_resp_user != NULL)
|
||||||
|
{
|
||||||
|
struct http_half_private * hf_private_resp_user = hs_private->hf_private_resp_user;
|
||||||
|
ret = __write_http_half_to_line(stream, CONN_DIR_DOWNSTREAM, hf_private_resp_user);
|
||||||
|
if (unlikely(ret < 0))
|
||||||
|
{
|
||||||
|
TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP request setup by user".);
|
||||||
|
goto __errout;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hf_private_resp_user->message_status == STATUS_COMPLETE)
|
||||||
|
{
|
||||||
|
hf_private_destory(hf_private_resp_user);
|
||||||
|
hs_private->hf_private_resp_user = NULL;
|
||||||
|
need_to_close_the_session = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next);
|
||||||
|
TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(hf_private_req_in->stream_action == ACTION_DEFER_DATA);
|
||||||
|
hf_private_req_in->stream_action = ACTION_DROP_DATA;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
__errout:
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_stream * stream,
|
enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_stream * stream,
|
||||||
struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len)
|
struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len)
|
||||||
{
|
{
|
||||||
struct http_session_private * hs_private = TAILQ_LAST(&hc_private->hs_private_list, hs_private_list);
|
struct http_session_private * hs_private = TAILQ_LAST(&hc_private->hs_private_list, hs_private_list);
|
||||||
struct http_half_private * hf_private_req_in = to_hf_request_private(hs_private);
|
struct http_half_private * hf_private_req_in = to_hf_request_private(hs_private);
|
||||||
struct http_half_private * hf_private_req_user;
|
|
||||||
|
|
||||||
/* tfe_hexdump2file(stderr, __FUNCTION__, data, (unsigned int)len); */
|
bool __need_to_close_the_session = false;
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
size_t __action_byptes;
|
|
||||||
|
enum tfe_stream_action __action = ACTION_FORWARD_DATA;
|
||||||
|
size_t __action_args = 0;
|
||||||
|
|
||||||
/* There is no available in session list,
|
/* There is no available in session list,
|
||||||
* that indicate all HTTP request has corresponding response,
|
* that indicate all HTTP request has corresponding response,
|
||||||
@@ -137,7 +202,8 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
|
|||||||
hf_private_set_session(hf_private_req_in, hs_private);
|
hf_private_set_session(hf_private_req_in, hs_private);
|
||||||
|
|
||||||
/* Closure, catch stream, session and thread_id */
|
/* Closure, catch stream, session and thread_id */
|
||||||
struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1);
|
struct user_event_dispatch_closure * __closure = ALLOC(
|
||||||
|
struct user_event_dispatch_closure, 1);
|
||||||
__closure->thread_id = thread_id;
|
__closure->thread_id = thread_id;
|
||||||
__closure->stream = stream;
|
__closure->stream = stream;
|
||||||
__closure->session = to_hs_public(hs_private);
|
__closure->session = to_hs_public(hs_private);
|
||||||
@@ -166,17 +232,14 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
|
|||||||
hs_private->suspend_tag_effective = false;
|
hs_private->suspend_tag_effective = false;
|
||||||
hs_private->suspend_tag_user = false;
|
hs_private->suspend_tag_user = false;
|
||||||
|
|
||||||
if (hf_private_req_in->is_user_stream_action_set)
|
if (__on_request_handle_user_req_or_resp(stream, hs_private,
|
||||||
|
hf_private_req_in, __need_to_close_the_session) < 0)
|
||||||
{
|
{
|
||||||
hf_private_req_in->stream_action = hf_private_req_in->user_stream_action;
|
goto __errout;
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
hf_private_req_in->stream_action = ACTION_FORWARD_DATA;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Ignore parse the content which is nullptr. */
|
/* Ignore parse the content which is nullptr. */
|
||||||
return hf_private_req_in->stream_action;
|
goto __out;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Parse the content, the data which in defered state has been ignored. */
|
/* Parse the content, the data which in defered state has been ignored. */
|
||||||
@@ -216,38 +279,37 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Touch a boundary, such as the end of HTTP headers, bodys, et al. */
|
/* Touch a boundary, such as the end of HTTP headers, bodys, et al. */
|
||||||
__action_byptes = hf_private_req_in->parse_cursor;
|
__action_args = hf_private_req_in->parse_cursor;
|
||||||
hf_private_req_in->parse_cursor = 0;
|
hf_private_req_in->parse_cursor = 0;
|
||||||
|
|
||||||
hf_private_req_user = hs_private->hf_private_req_user;
|
if (__on_request_handle_user_req_or_resp(stream, hs_private,
|
||||||
if (hf_private_req_user != NULL && hf_private_req_user->message_status == STATUS_COMPLETE)
|
hf_private_req_in, __need_to_close_the_session) < 0)
|
||||||
{
|
{
|
||||||
/* Construct, and write response immediately */
|
goto __errout;
|
||||||
hf_private_construct(hf_private_req_user);
|
|
||||||
size_t __to_write_len = evbuffer_get_length(hf_private_req_user->evbuf_raw);
|
|
||||||
unsigned char * __to_write = evbuffer_pullup(hf_private_req_user->evbuf_raw, __to_write_len);
|
|
||||||
|
|
||||||
/* Write the data to stream, UPSTREAM is the incoming direction for response */
|
|
||||||
ret = tfe_stream_write(stream, CONN_DIR_UPSTREAM, __to_write, __to_write_len);
|
|
||||||
if (unlikely(ret < 0)) { assert(0); }
|
|
||||||
|
|
||||||
hf_private_destory(hf_private_req_user);
|
|
||||||
hs_private->hf_private_resp_user = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hf_private_req_in->stream_action == ACTION_FORWARD_DATA)
|
if (hf_private_req_in->stream_action == ACTION_FORWARD_DATA)
|
||||||
{
|
{
|
||||||
tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_byptes, sizeof(__action_byptes));
|
tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_args, sizeof(__action_args));
|
||||||
return ACTION_FORWARD_DATA;
|
__action = ACTION_FORWARD_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hf_private_req_in->stream_action == ACTION_DROP_DATA)
|
if (hf_private_req_in->stream_action == ACTION_DROP_DATA)
|
||||||
{
|
{
|
||||||
tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_byptes, sizeof(__action_byptes));
|
tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_args, sizeof(__action_args));
|
||||||
return ACTION_DROP_DATA;
|
__action = ACTION_DROP_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
goto __errout;
|
__out:
|
||||||
|
/* There is nothing for this session, close the session */
|
||||||
|
if (__need_to_close_the_session)
|
||||||
|
{
|
||||||
|
http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id);
|
||||||
|
TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next);
|
||||||
|
hs_private_destory(hs_private);
|
||||||
|
}
|
||||||
|
|
||||||
|
return __action;
|
||||||
|
|
||||||
__errout:
|
__errout:
|
||||||
tfe_stream_detach(stream);
|
tfe_stream_detach(stream);
|
||||||
@@ -286,7 +348,8 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre
|
|||||||
hf_private_set_session(hf_private_resp_in, hs_private);
|
hf_private_set_session(hf_private_resp_in, hs_private);
|
||||||
|
|
||||||
/* Closure, catch stream, session and thread_id */
|
/* Closure, catch stream, session and thread_id */
|
||||||
struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1);
|
struct user_event_dispatch_closure * __closure = ALLOC(
|
||||||
|
struct user_event_dispatch_closure, 1);
|
||||||
__closure->thread_id = thread_id;
|
__closure->thread_id = thread_id;
|
||||||
__closure->stream = stream;
|
__closure->stream = stream;
|
||||||
__closure->session = to_hs_public(hs_private);
|
__closure->session = to_hs_public(hs_private);
|
||||||
@@ -363,7 +426,7 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre
|
|||||||
TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next);
|
TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Nothing to do, everything is over, destroy the session */
|
/* Nothing to do, everything is over, destroy the session */
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id);
|
http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id);
|
||||||
@@ -468,4 +531,5 @@ static struct tfe_plugin __http_plugin_info =
|
|||||||
.on_close = http_connection_entry_close
|
.on_close = http_connection_entry_close
|
||||||
};
|
};
|
||||||
|
|
||||||
TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info)
|
TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info
|
||||||
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user