修正suspend/resume语义实现的若干问题,增加自行构建request/response的header标志

This commit is contained in:
Lu Qiuwen
2018-10-26 20:30:06 +08:00
parent d3d34355ef
commit cf64f01f7f
9 changed files with 104 additions and 35 deletions

View File

@@ -157,3 +157,4 @@ static inline unsigned char* tfe_hexdump(unsigned char *dst, unsigned char *src,
return dst; return dst;
} }
const char * tfe_version();

View File

@@ -66,7 +66,7 @@ static __attribute__((__used__)) const char * TFE_VERSION_version_UNKNOWN = NULL
/* VERSION STRING */ /* VERSION STRING */
#ifdef TFE_GIT_VERSION #ifdef TFE_GIT_VERSION
static __attribute__((__used__)) const char * tfe_version = TFE_GIT_VERSION; static __attribute__((__used__)) const char * __tfe_version = TFE_GIT_VERSION;
#else #else
static __attribute__((__used__)) const char * tfe_version = "Unknown"; static __attribute__((__used__)) const char * tfe_version = "Unknown";
#endif #endif
@@ -354,6 +354,12 @@ int main(int argc, char *argv[])
return 0; return 0;
} }
const char * tfe_version()
{
return __tfe_version;
}
unsigned int tfe_proxy_get_work_thread_count(void) unsigned int tfe_proxy_get_work_thread_count(void)
{ {
return g_default_proxy->nr_work_threads; return g_default_proxy->nr_work_threads;

View File

@@ -141,9 +141,17 @@ void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by)
_stream->is_suspended = true; _stream->is_suspended = true;
_stream->suspended_by = by; _stream->suspended_by = by;
fprintf(stderr, "---- tfe-stream-suspend ----, %p, by = %d\n", _stream, by);
/* disable all events */ /* disable all events */
bufferevent_disable(_stream->conn_upstream->bev, EV_READ | EV_WRITE); int ret = 0;
bufferevent_disable(_stream->conn_downstream->bev, EV_READ | EV_WRITE); ret = bufferevent_disable(_stream->conn_upstream->bev, EV_READ | EV_WRITE);
assert(ret == 0);
ret = bufferevent_disable(_stream->conn_downstream->bev, EV_READ | EV_WRITE);
assert(ret == 0);
(void)ret;
} }
void tfe_stream_resume(const struct tfe_stream * stream) void tfe_stream_resume(const struct tfe_stream * stream)
@@ -154,6 +162,8 @@ void tfe_stream_resume(const struct tfe_stream * stream)
bufferevent_enable(_stream->conn_upstream->bev, EV_READ | EV_WRITE); bufferevent_enable(_stream->conn_upstream->bev, EV_READ | EV_WRITE);
bufferevent_enable(_stream->conn_downstream->bev, EV_READ | EV_WRITE); bufferevent_enable(_stream->conn_downstream->bev, EV_READ | EV_WRITE);
assert(_stream->is_suspended == true);
fprintf(stderr, "---- tfe-stream-resume ----, %p\n", _stream);
if (_stream->suspended_by == CONN_DIR_DOWNSTREAM) if (_stream->suspended_by == CONN_DIR_DOWNSTREAM)
{ {
bufferevent_trigger(_stream->conn_downstream->bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); bufferevent_trigger(_stream->conn_downstream->bev, EV_READ, BEV_OPT_DEFER_CALLBACKS);
@@ -538,6 +548,8 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
break; break;
} }
fprintf(stderr, "----- action = %d, bev = %p, input bev: %llu\n", action_final, bev, evbuffer_get_length(inbuf));
#if 0 #if 0
if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT) if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT)
{ {
@@ -640,7 +652,6 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
__stream_bev_readcb(bev, arg); __stream_bev_readcb(bev, arg);
} }
// fprintf(stderr, "---- eventcb ----, stream = %p, event = %x, dir = %s\n", _stream, events, __str_dir);
goto __close_connection; goto __close_connection;
} }
@@ -678,6 +689,7 @@ __close_connection:
return; return;
__call_plugin_close: __call_plugin_close:
fprintf(stderr, "---- eventcb ---- call close, stream = %p, event = %x, dir = %s\n", _stream, events, __str_dir);
call_plugin_close(_stream); call_plugin_close(_stream);
tfe_stream_destory(_stream); tfe_stream_destory(_stream);
} }

View File

@@ -907,6 +907,7 @@ int make_revalidate_request(const struct tfe_stream * stream, const struct tfe_h
if(events & EV_HTTP_REQ_END && ctx->cache_revalidate_req) if(events & EV_HTTP_REQ_END && ctx->cache_revalidate_req)
{ {
ctx->cache_revalidate_req=NULL; ctx->cache_revalidate_req=NULL;
return RESUMED_CB_NO_MORE_CALLS;
} }
return RESUMED_CB_MORE_CALLS; return RESUMED_CB_MORE_CALLS;
} }
@@ -1001,12 +1002,12 @@ static void cache_pending_on_succ(future_result_t * result, void * user)
ctx->f_cache_pending=NULL; ctx->f_cache_pending=NULL;
if(meta==NULL) if(meta==NULL)
{ {
ctx->pending_result==PENDING_RESULT_MISS; ctx->pending_result = PENDING_RESULT_MISS;
return; return;
} }
if(!(meta->etag && meta->last_modified)) if(!(meta->etag && meta->last_modified))
{ {
ctx->pending_result==PENDING_RESULT_FOBIDDEN; ctx->pending_result = PENDING_RESULT_FOBIDDEN;
return; return;
} }
ctx->pending_result=PENDING_RESULT_REVALIDATE; ctx->pending_result=PENDING_RESULT_REVALIDATE;
@@ -1058,6 +1059,7 @@ void cache_pending(const struct tfe_http_session * session, unsigned int thread_
{ {
case PENDING_RESULT_REVALIDATE: case PENDING_RESULT_REVALIDATE:
ctx->ref_session=tfe_http_session_allow_write(session); ctx->ref_session=tfe_http_session_allow_write(session);
assert(ctx->ref_session != NULL);
tfe_http_session_suspend(ctx->ref_session); tfe_http_session_suspend(ctx->ref_session);
break; break;
case PENDING_RESULT_ALLOWED: case PENDING_RESULT_ALLOWED:

View File

@@ -432,7 +432,7 @@ static void cache_query_meta_on_succ(future_result_t * result, void * user)
struct cache_pending_context* ctx=(struct cache_pending_context*)promise_get_ctx(p); struct cache_pending_context* ctx=(struct cache_pending_context*)promise_get_ctx(p);
struct tango_cache_result* _result=tango_cache_read_result(result); struct tango_cache_result* _result=tango_cache_read_result(result);
ctx->ref_tango_cache_result=_result; ctx->ref_tango_cache_result=_result;
time_t cache_last_modified_time=0, request_last_modified_time=0;
switch(_result->type) switch(_result->type)
{ {
case RESULT_TYPE_HEADER: case RESULT_TYPE_HEADER:
@@ -690,9 +690,14 @@ void web_cache_update(struct cache_update_context* ctx, const unsigned char * bo
} }
void web_cache_update_end(struct cache_update_context* ctx) void web_cache_update_end(struct cache_update_context* ctx)
{ {
fprintf(stderr, "------- web_cache_update_end , %p\n", ctx);
tango_cache_update_end(ctx->write_ctx); tango_cache_update_end(ctx->write_ctx);
ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOADING]));
ctx->write_ctx = NULL;
ctx->ref_cache_handle = NULL;
free(ctx); free(ctx);
ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOADING]));
return; return;
} }

View File

@@ -41,18 +41,20 @@ struct http_session_private
struct http_half_private * hf_private_req_user; struct http_half_private * hf_private_req_user;
/* USER SETUP RESPONSE HALF */ /* USER SETUP RESPONSE HALF */
struct http_half_private * hf_private_resp_user; struct http_half_private * hf_private_resp_user;
/* SUSPEND TAG */
bool suspend_tag_user;
/* SUSPEND EVENT */ /* SUSPEND EVENT */
tfe_http_event suspend_event; tfe_http_event suspend_event;
/* SUSPEND TAG EFFECTIVE */ /* SUSPEND TAG EFFECTIVE */
bool suspend_tag_effective; bool suspend_tag_effective;
/* STREAM WRITE EFFECTIVE */
bool stream_write_tag_effective;
/* RELEASE LOCK, when the tag is zero, the session can be destroyed */ /* RELEASE LOCK, when the tag is zero, the session can be destroyed */
int release_lock; int release_lock;
/* thread id */ /* thread id */
unsigned int thread_id; unsigned int thread_id;
/* SUSPEND COUNTER, ONLY FOR DEBUG AND LOG */
#ifndef NDEBUG
int suspend_counter;
#endif
bool in_gc_queue;
}; };
struct http_connection_private struct http_connection_private

View File

@@ -309,13 +309,19 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
/* The session is suspended, and to resume */ /* The session is suspended, and to resume */
if (hs_private->suspend_tag_effective) if (hs_private->suspend_tag_effective)
{ {
enum tfe_http_event __backup_event = hs_private->suspend_event;
/* Clean up suspend tag, we can support user's call suspend in this callback */
hs_private->suspend_event = (enum tfe_http_event) 0;
hs_private->suspend_tag_effective = false;
hs_private->suspend_counter++;
/* Call user callback, tell user we resume from suspend */
hf_private_req_in->event_cb(hf_private_req_in, hs_private->suspend_event, NULL, 0, hf_private_req_in->event_cb(hf_private_req_in, hs_private->suspend_event, NULL, 0,
hf_private_req_in->event_cb_user); hf_private_req_in->event_cb_user);
hs_private->suspend_event = (enum tfe_http_event) 0;
hs_private->suspend_tag_effective = false;
hs_private->suspend_tag_user = false;
#if 0
if (__on_request_handle_user_req_or_resp(stream, hs_private, if (__on_request_handle_user_req_or_resp(stream, hs_private,
hf_private_req_in, __need_to_close_the_session) < 0) hf_private_req_in, __need_to_close_the_session) < 0)
{ {
@@ -333,6 +339,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
/* Ignore parse the content which is nullptr. */ /* Ignore parse the content which is nullptr. */
goto __boundary; goto __boundary;
#endif
} }
/* Parse the content, the data which in defered state has been ignored. */ /* Parse the content, the data which in defered state has been ignored. */
@@ -351,13 +358,8 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
} }
/* Suspend */ /* Suspend */
if (hs_private->suspend_tag_user) if (hs_private->suspend_tag_effective)
{ {
assert(!hs_private->suspend_tag_effective);
hs_private->suspend_tag_effective = true;
/* Suspend TCP stream */
tfe_stream_suspend(stream, CONN_DIR_DOWNSTREAM);
return ACTION_DEFER_DATA; return ACTION_DEFER_DATA;
} }

View File

@@ -348,7 +348,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
/* user's suspend tag is set, which indicate that the way to handle request/response /* user's suspend tag is set, which indicate that the way to handle request/response
* cannot be determinate at now, need to defer */ * cannot be determinate at now, need to defer */
else if (hs_private && hs_private->suspend_tag_user) else if (hs_private && hs_private->suspend_tag_effective)
{ {
/* Pause parser, prevent to parse request/response body, /* Pause parser, prevent to parse request/response body,
* The body should be parsed after resume() */ * The body should be parsed after resume() */
@@ -362,7 +362,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
assert(hf_private->stream_action == ACTION_DEFER_DATA); assert(hf_private->stream_action == ACTION_DEFER_DATA);
} }
/* Otherwise, forward the request/response */ /* Otherwise, forward the request/response */
else else
{ {
hf_private->stream_action = ACTION_FORWARD_DATA; hf_private->stream_action = ACTION_FORWARD_DATA;
@@ -615,9 +615,8 @@ int hf_ops_body_begin(struct tfe_http_half * half, int by_stream)
} }
hf_private->content_encoding = HTTP_ACCEPT_ENCODING_NONE; hf_private->content_encoding = HTTP_ACCEPT_ENCODING_NONE;
hf_private->message_status = STATUS_READING;
hf_private->is_setup_by_stream = true; hf_private->is_setup_by_stream = true;
hs_private->stream_write_tag_effective = true;
hs_private->release_lock++; hs_private->release_lock++;
} }
@@ -679,8 +678,7 @@ int hf_ops_body_end(struct tfe_http_half * half)
if(hf_private->is_setup_by_stream) if(hf_private->is_setup_by_stream)
{ {
hs_private->stream_write_tag_effective = true; hs_private->release_lock--;
hs_private->release_lock++;
} }
printf("frag write end, stream = %p, hf_private = %p\n", hf_private->session->hc_private->stream, hf_private); printf("frag write end, stream = %p, hf_private = %p\n", hf_private->session->hc_private->stream, hf_private);
@@ -802,7 +800,7 @@ int hf_private_parse(struct http_half_private * hf_private, const unsigned char
* resume it to normal status */ * resume it to normal status */
if (__is_paused) if (__is_paused)
{ {
hf_private->parse_cursor += sz_parsed + 1; hf_private->parse_cursor += sz_parsed;
return 1; return 1;
} }
@@ -829,8 +827,15 @@ void hs_ops_drop(struct tfe_http_session * session)
void hs_ops_suspend(struct tfe_http_session * session) void hs_ops_suspend(struct tfe_http_session * session)
{ {
struct http_session_private * hs_private = to_hs_private(session); struct http_session_private * hs_private = to_hs_private(session);
hs_private->suspend_tag_user = true; struct http_connection_private * hc_private = hs_private->hc_private;
hs_private->release_lock++; hs_private->release_lock++;
hs_private->suspend_counter++;
hs_private->suspend_tag_effective = true;
tfe_stream_suspend(hc_private->stream, CONN_DIR_DOWNSTREAM);
fprintf(stderr, "---- suspend ----, url = %s, counter = %d\n",
hs_private->hs_public.req->req_spec.url, hs_private->suspend_counter);
} }
void hs_ops_resume(struct tfe_http_session * session) void hs_ops_resume(struct tfe_http_session * session)
@@ -838,9 +843,16 @@ void hs_ops_resume(struct tfe_http_session * session)
struct http_session_private * hs_private = to_hs_private(session); struct http_session_private * hs_private = to_hs_private(session);
struct http_connection_private * hc_private = hs_private->hc_private; struct http_connection_private * hc_private = hs_private->hc_private;
hs_private->suspend_tag_user = false; assert(!hs_private->in_gc_queue);
hs_private->release_lock--; hs_private->release_lock--;
tfe_stream_resume(hc_private->stream); if (hs_private->suspend_tag_effective)
{
tfe_stream_resume(hc_private->stream);
}
fprintf(stderr, "---- resume ----, url = %s, counter = %d\n",
hs_private->hs_public.req->req_spec.url, hs_private->suspend_counter);
} }
// TODO: change the return type to int, there is something happend where -1 returned. // TODO: change the return type to int, there is something happend where -1 returned.
@@ -865,6 +877,8 @@ void hs_ops_request_set(struct tfe_http_session * session, struct tfe_http_half
assert(hs_private->hf_private_req_user == NULL); assert(hs_private->hf_private_req_user == NULL);
hs_private->hf_private_req_user = hf_user_private; hs_private->hf_private_req_user = hf_user_private;
hs_private->hf_private_req_user->message_status = STATUS_COMPLETE;
hs_private->hf_private_req_user->session = hs_private;
} }
void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half * resp) void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half * resp)
@@ -897,7 +911,8 @@ void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half
assert(hs_private->hf_private_resp_user == NULL); assert(hs_private->hf_private_resp_user == NULL);
hs_private->hf_private_resp_user = hf_private; hs_private->hf_private_resp_user = hf_private;
hf_private->session = hs_private; hs_private->hf_private_resp_user->message_status = STATUS_COMPLETE;
hs_private->hf_private_resp_user->session = hs_private;
} }
struct tfe_http_half * hs_ops_request_create(struct tfe_http_session * session, struct tfe_http_half * hs_ops_request_create(struct tfe_http_session * session,
@@ -1042,6 +1057,8 @@ void hf_private_construct(struct http_half_private * hf_private)
evbuffer_add_printf(hf_private->evbuf_raw, "%s: %s\r\n", str_field, str_value); evbuffer_add_printf(hf_private->evbuf_raw, "%s: %s\r\n", str_field, str_value);
} }
/* Trace Tags */
evbuffer_add_printf(hf_private->evbuf_raw, "%s: tfe/%s\r\n", "X-TG-Construct-By", tfe_version());
/* delimitor between header and body */ /* delimitor between header and body */
evbuffer_add_printf(hf_private->evbuf_raw, "\r\n"); evbuffer_add_printf(hf_private->evbuf_raw, "\r\n");
@@ -1154,8 +1171,20 @@ void hs_private_destroy(struct http_session_private * hs_private)
void hs_private_gc_destroy(struct http_session_private * hs_private, struct hs_private_list * gc_list) void hs_private_gc_destroy(struct http_session_private * hs_private, struct hs_private_list * gc_list)
{ {
if (hs_private->release_lock > 0) TAILQ_INSERT_TAIL(gc_list, hs_private, next); if (hs_private->release_lock > 0)
else return hs_private_destroy(hs_private); {
TAILQ_INSERT_TAIL(gc_list, hs_private, next);
hs_private->in_gc_queue = true;
}
else
{
return hs_private_destroy(hs_private);
}
}
bool hs_private_can_destroy(struct http_session_private * hs_private)
{
return hs_private->release_lock <= 0;
} }
void hs_private_hf_private_set(struct http_session_private * hs_private, void hs_private_hf_private_set(struct http_session_private * hs_private,

View File

@@ -348,7 +348,7 @@ void __http_post_header_verify_helper(struct http_half_private * hf_private)
} }
struct __post_http_req_ctx struct __post_http_req_ctx
{ {
unsigned int calling_seq{0}; unsigned int calling_seq{0};
size_t total_length{0}; size_t total_length{0};
@@ -1488,6 +1488,11 @@ void tfe_stream_resume(const struct tfe_stream * stream)
return; return;
} }
void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by)
{
return;
}
int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size) int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size)
{ {
return 0; return 0;
@@ -1498,6 +1503,11 @@ void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx)
return; return;
} }
const char * tfe_version()
{
return NULL;
}
int main(int argc, char ** argv) int main(int argc, char ** argv)
{ {
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);