增加HTTP流式构造Body的接口及实现并修正HTTP单元测试用例

This commit is contained in:
Lu Qiuwen
2018-10-16 10:45:18 +08:00
parent 619f004c49
commit 2bc366fb13
8 changed files with 202 additions and 44 deletions

View File

@@ -52,6 +52,53 @@ static int __user_event_dispatch(struct http_half_private * hf_private,
return 0;
}
static int __write_http_half(struct http_half_private * hf_private,
const struct tfe_stream * stream, enum tfe_conn_dir dir)
{
/* Construct, and write response immediately */
hf_private_construct(hf_private);
size_t __to_write_len = evbuffer_get_length(hf_private->evbuf_raw);
unsigned char * __to_write = evbuffer_pullup(hf_private->evbuf_raw, __to_write_len);
/* Write the data to stream, UPSTREAM is the incoming direction for response */
int ret = tfe_stream_write(stream, dir, __to_write, __to_write_len);
if (unlikely(ret < 0)) { assert(0); }
evbuffer_drain(hf_private->evbuf_raw, __to_write_len);
return ret;
}
static int __write_http_half_to_line(const struct tfe_stream * stream,
enum tfe_conn_dir dir, struct http_half_private * hf_private)
{
int ret = 0;
if (hf_private->is_setup_by_stream)
{
/* By stream, first time to write http request/response, construct the header */
if (hf_private->write_ctx == NULL)
{
ret = __write_http_half(hf_private, stream, dir);
if (unlikely(ret < 0)) return ret;
/* Alloc stream write ctx */
hf_private->write_ctx = tfe_stream_write_frag_start(stream, dir);
if (unlikely(hf_private->write_ctx == NULL)) return -1;
}
}
else
{
/* Not stream, need to be complete, then construct all data and send out */
if (hf_private->message_status == STATUS_COMPLETE)
{
ret = __write_http_half(hf_private, stream, dir);
if (unlikely(ret < 0)) return ret;
}
}
return 0;
}
enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_stream * stream,
struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len)
{
@@ -59,7 +106,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
struct http_half_private * hf_private_req_in = to_hf_request_private(hs_private);
struct http_half_private * hf_private_req_user;
/* tfe_hexdump2file(stderr, __FUNCTION__, data, (unsigned int)len); */
/* tfe_hexdump2file(stderr, __FUNCTION__, data, (unsigned int)len); */
int ret = 0;
size_t __action_byptes;
@@ -99,7 +146,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
hf_private_req_in->event_cb(hf_private_req_in, hs_private->suspend_event, NULL, 0,
hf_private_req_in->event_cb_user);
hs_private->suspend_event = (enum tfe_http_event)0;
hs_private->suspend_event = (enum tfe_http_event) 0;
hs_private->suspend_tag_effective = false;
hs_private->suspend_tag_user = false;
@@ -197,6 +244,7 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre
struct http_session_private * hs_private = TAILQ_FIRST(&hc_private->hs_private_list);
struct http_half_private * hf_private_resp_in;
struct http_half_private * hf_private_resp_user;
enum tfe_stream_action __stream_action;
int ret = 0;
size_t __action_byptes;
@@ -264,38 +312,52 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre
}
hf_private_resp_user = hs_private->hf_private_resp_user;
if (hf_private_resp_user != NULL && hf_private_resp_user->message_status == STATUS_COMPLETE)
if (hf_private_resp_user != NULL)
{
/* Construct, and write response immediately */
hf_private_construct(hf_private_resp_user);
size_t __to_write_len = evbuffer_get_length(hf_private_resp_user->evbuf_raw);
unsigned char * __to_write = evbuffer_pullup(hf_private_resp_user->evbuf_raw, __to_write_len);
ret = __write_http_half_to_line(stream, CONN_DIR_DOWNSTREAM, hf_private_resp_user);
if (unlikely(ret < 0))
{
TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP response setup by user".);
goto __errout;
}
/* Write the data to stream, UPSTREAM is the incoming direction for response */
ret = tfe_stream_write(stream, CONN_DIR_DOWNSTREAM, __to_write, __to_write_len);
if (unlikely(ret < 0)) { assert(0); }
hf_private_destory(hf_private_resp_user);
hs_private->hf_private_resp_user = NULL;
}
if (hf_private_resp_in->message_status == STATUS_COMPLETE)
{
http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id);
TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next);
hs_private_destory(hs_private);
if (hf_private_resp_user->message_status == STATUS_COMPLETE)
{
hf_private_destory(hf_private_resp_user);
hs_private->hf_private_resp_user = NULL;
}
}
__stream_action = hf_private_resp_in->stream_action;
__action_byptes = hf_private_resp_in->parse_cursor;
hf_private_resp_in->parse_cursor = 0;
if (hf_private_resp_in->stream_action == ACTION_FORWARD_DATA)
if (hf_private_resp_in->message_status == STATUS_COMPLETE)
{
/* Still sending user's response, should not destroy the session,
* move the session to orphan list, then we can handle the next session's response */
if (hs_private->hf_private_resp_user != NULL)
{
TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next);
TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next);
}
/* Nothing to do, everything is over, destroy the session */
else
{
http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id);
TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next);
hs_private_destory(hs_private);
}
}
if (__stream_action == ACTION_FORWARD_DATA)
{
tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_byptes, sizeof(__action_byptes));
return ACTION_FORWARD_DATA;
}
if (hf_private_resp_in->stream_action == ACTION_DROP_DATA)
if (__stream_action == ACTION_DROP_DATA)
{
tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_byptes, sizeof(__action_byptes));
return ACTION_DROP_DATA;
@@ -360,7 +422,7 @@ void http_connection_entry_close(const struct tfe_stream * stream, unsigned int
*pme = NULL;
/* Delete all live sessions */
while(true)
while (true)
{
struct http_session_private * hs_private_iter = TAILQ_FIRST(&ht_conn->hs_private_list);
if (hs_private_iter == NULL) break;

View File

@@ -333,8 +333,8 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
hf_private->stream_action = hf_private->user_stream_action;
}
/* user's suspend tag is set, which indicate that the way to handle request/response
* cannot be determinate at now, need to defer */
/* user's suspend tag is set, which indicate that the way to handle request/response
* cannot be determinate at now, need to defer */
else if (hs_private && hs_private->suspend_tag_user)
{
/* Pause parser, prevent to parse request/response body,
@@ -349,7 +349,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
assert(hf_private->stream_action == ACTION_DEFER_DATA);
}
/* Otherwise, forward the request/response */
/* Otherwise, forward the request/response */
else
{
hf_private->stream_action = ACTION_FORWARD_DATA;
@@ -584,6 +584,77 @@ int hf_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, in
return ret;
}
int hf_ops_body_begin(struct tfe_http_half * half, int by_stream)
{
struct http_half_private * hf_private = to_hf_private(half);
assert(hf_private->evbuf_body == NULL);
if (hf_private->evbuf_body != NULL)
{
evbuffer_free(hf_private->evbuf_body);
}
if (by_stream)
{
hf_private->is_setup_by_stream = true;
}
hf_private->evbuf_body = evbuffer_new();
return 0;
}
int hf_ops_body_data(struct tfe_http_half * half, const unsigned char * data, size_t sz_data)
{
struct http_half_private * hf_private = to_hf_private(half);
struct evbuffer * __tmp_output_buffer = evbuffer_new();
int ret = 0;
/* Need to compress output */
if (hf_private->cv_compress_object)
{
ret = hf_content_compress_write(hf_private->cv_compress_object, data, sz_data, __tmp_output_buffer, 0);
}
else
{
ret = evbuffer_add(__tmp_output_buffer, data, sz_data);
}
if (ret < 0) goto __out;
ret = evbuffer_add_buffer(hf_private->evbuf_body, __tmp_output_buffer);
/* Have write ctx, should write the body to TCP stream immediately but not to store in evbuf_body */
if (hf_private->write_ctx)
{
/* Perpare to write data, TODO: write to TCP stream by transfer evbuffer */
const unsigned char * ptr_write_data = evbuffer_pullup(hf_private->evbuf_body, -1);
size_t sz_write_data = evbuffer_get_length(hf_private->evbuf_body);
assert(ptr_write_data != NULL && sz_write_data >= 0);
tfe_stream_write_frag(hf_private->write_ctx, ptr_write_data, sz_write_data);
/* Need to drain all data */
evbuffer_drain(hf_private->evbuf_body, sz_write_data);
}
__out:
evbuffer_free(__tmp_output_buffer);
return ret;
}
int hf_ops_body_end(struct tfe_http_half * half)
{
struct http_half_private * hf_private = to_hf_private(half);
if (hf_private->write_ctx)
{
tfe_stream_write_frag_end(hf_private->write_ctx);
hf_private->write_ctx = NULL;
}
hf_private->body_status = STATUS_COMPLETE;
hf_private->message_status = STATUS_COMPLETE;
return 0;
}
void hf_private_destory(struct http_half_private * hf_private)
{
if (hf_private->parse_object != NULL)
@@ -611,6 +682,9 @@ struct tfe_http_half_ops __http_half_ops =
.ops_http_allow_write = hf_ops_allow_write,
.ops_http_field_iterate = hf_ops_field_iterate,
.ops_append_body = hf_ops_append_body,
.ops_body_begin = NULL,
.ops_body_data = NULL,
.ops_body_end = NULL,
.ops_free = hf_ops_free
};