diff --git a/common/include/tfe_utils.h b/common/include/tfe_utils.h index 85e0087..e99583d 100644 --- a/common/include/tfe_utils.h +++ b/common/include/tfe_utils.h @@ -157,3 +157,4 @@ static inline unsigned char* tfe_hexdump(unsigned char *dst, unsigned char *src, return dst; } +const char * tfe_version(); diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 6038500..488dcc1 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -66,7 +66,7 @@ static __attribute__((__used__)) const char * TFE_VERSION_version_UNKNOWN = NULL /* VERSION STRING */ #ifdef TFE_GIT_VERSION -static __attribute__((__used__)) const char * tfe_version = TFE_GIT_VERSION; +static __attribute__((__used__)) const char * __tfe_version = TFE_GIT_VERSION; #else static __attribute__((__used__)) const char * tfe_version = "Unknown"; #endif @@ -354,6 +354,12 @@ int main(int argc, char *argv[]) return 0; } + +const char * tfe_version() +{ + return __tfe_version; +} + unsigned int tfe_proxy_get_work_thread_count(void) { return g_default_proxy->nr_work_threads; diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index c73881f..027fd30 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -141,9 +141,17 @@ void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by) _stream->is_suspended = true; _stream->suspended_by = by; + fprintf(stderr, "---- tfe-stream-suspend ----, %p, by = %d\n", _stream, by); + /* disable all events */ - bufferevent_disable(_stream->conn_upstream->bev, EV_READ | EV_WRITE); - bufferevent_disable(_stream->conn_downstream->bev, EV_READ | EV_WRITE); + int ret = 0; + ret = bufferevent_disable(_stream->conn_upstream->bev, EV_READ | EV_WRITE); + assert(ret == 0); + + ret = bufferevent_disable(_stream->conn_downstream->bev, EV_READ | EV_WRITE); + assert(ret == 0); + + (void)ret; } void tfe_stream_resume(const struct tfe_stream * stream) @@ -154,6 +162,8 @@ void tfe_stream_resume(const struct tfe_stream * stream) bufferevent_enable(_stream->conn_upstream->bev, EV_READ | EV_WRITE); bufferevent_enable(_stream->conn_downstream->bev, EV_READ | EV_WRITE); + assert(_stream->is_suspended == true); + fprintf(stderr, "---- tfe-stream-resume ----, %p\n", _stream); if (_stream->suspended_by == CONN_DIR_DOWNSTREAM) { bufferevent_trigger(_stream->conn_downstream->bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); @@ -538,6 +548,8 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) break; } + fprintf(stderr, "----- action = %d, bev = %p, input bev: %llu\n", action_final, bev, evbuffer_get_length(inbuf)); + #if 0 if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT) { @@ -640,7 +652,6 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * __stream_bev_readcb(bev, arg); } -// fprintf(stderr, "---- eventcb ----, stream = %p, event = %x, dir = %s\n", _stream, events, __str_dir); goto __close_connection; } @@ -678,6 +689,7 @@ __close_connection: return; __call_plugin_close: + fprintf(stderr, "---- eventcb ---- call close, stream = %p, event = %x, dir = %s\n", _stream, events, __str_dir); call_plugin_close(_stream); tfe_stream_destory(_stream); } diff --git a/plugin/business/pangu-http/src/pangu_http.cpp b/plugin/business/pangu-http/src/pangu_http.cpp index cc3771b..9499cd5 100644 --- a/plugin/business/pangu-http/src/pangu_http.cpp +++ b/plugin/business/pangu-http/src/pangu_http.cpp @@ -907,6 +907,7 @@ int make_revalidate_request(const struct tfe_stream * stream, const struct tfe_h if(events & EV_HTTP_REQ_END && ctx->cache_revalidate_req) { ctx->cache_revalidate_req=NULL; + return RESUMED_CB_NO_MORE_CALLS; } return RESUMED_CB_MORE_CALLS; } @@ -1001,12 +1002,12 @@ static void cache_pending_on_succ(future_result_t * result, void * user) ctx->f_cache_pending=NULL; if(meta==NULL) { - ctx->pending_result==PENDING_RESULT_MISS; + ctx->pending_result = PENDING_RESULT_MISS; return; } if(!(meta->etag && meta->last_modified)) { - ctx->pending_result==PENDING_RESULT_FOBIDDEN; + ctx->pending_result = PENDING_RESULT_FOBIDDEN; return; } ctx->pending_result=PENDING_RESULT_REVALIDATE; @@ -1058,6 +1059,7 @@ void cache_pending(const struct tfe_http_session * session, unsigned int thread_ { case PENDING_RESULT_REVALIDATE: ctx->ref_session=tfe_http_session_allow_write(session); + assert(ctx->ref_session != NULL); tfe_http_session_suspend(ctx->ref_session); break; case PENDING_RESULT_ALLOWED: diff --git a/plugin/business/pangu-http/src/pangu_web_cache.cpp b/plugin/business/pangu-http/src/pangu_web_cache.cpp index a20b2d7..7354145 100644 --- a/plugin/business/pangu-http/src/pangu_web_cache.cpp +++ b/plugin/business/pangu-http/src/pangu_web_cache.cpp @@ -432,7 +432,7 @@ static void cache_query_meta_on_succ(future_result_t * result, void * user) struct cache_pending_context* ctx=(struct cache_pending_context*)promise_get_ctx(p); struct tango_cache_result* _result=tango_cache_read_result(result); ctx->ref_tango_cache_result=_result; - time_t cache_last_modified_time=0, request_last_modified_time=0; + switch(_result->type) { case RESULT_TYPE_HEADER: @@ -690,9 +690,14 @@ void web_cache_update(struct cache_update_context* ctx, const unsigned char * bo } void web_cache_update_end(struct cache_update_context* ctx) { + fprintf(stderr, "------- web_cache_update_end , %p\n", ctx); + tango_cache_update_end(ctx->write_ctx); + ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOADING])); + + ctx->write_ctx = NULL; + ctx->ref_cache_handle = NULL; free(ctx); - ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOADING])); return; } diff --git a/plugin/protocol/http/include/internal/http_common.h b/plugin/protocol/http/include/internal/http_common.h index 56fa65f..e220839 100644 --- a/plugin/protocol/http/include/internal/http_common.h +++ b/plugin/protocol/http/include/internal/http_common.h @@ -41,18 +41,20 @@ struct http_session_private struct http_half_private * hf_private_req_user; /* USER SETUP RESPONSE HALF */ struct http_half_private * hf_private_resp_user; - /* SUSPEND TAG */ - bool suspend_tag_user; /* SUSPEND EVENT */ tfe_http_event suspend_event; /* SUSPEND TAG EFFECTIVE */ bool suspend_tag_effective; - /* STREAM WRITE EFFECTIVE */ - bool stream_write_tag_effective; /* RELEASE LOCK, when the tag is zero, the session can be destroyed */ int release_lock; /* thread id */ unsigned int thread_id; + + /* SUSPEND COUNTER, ONLY FOR DEBUG AND LOG */ +#ifndef NDEBUG + int suspend_counter; +#endif + bool in_gc_queue; }; struct http_connection_private diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index 8f42893..e496607 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -309,13 +309,19 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea /* The session is suspended, and to resume */ if (hs_private->suspend_tag_effective) { + enum tfe_http_event __backup_event = hs_private->suspend_event; + + /* Clean up suspend tag, we can support user's call suspend in this callback */ + hs_private->suspend_event = (enum tfe_http_event) 0; + hs_private->suspend_tag_effective = false; + hs_private->suspend_counter++; + + /* Call user callback, tell user we resume from suspend */ hf_private_req_in->event_cb(hf_private_req_in, hs_private->suspend_event, NULL, 0, hf_private_req_in->event_cb_user); - hs_private->suspend_event = (enum tfe_http_event) 0; - hs_private->suspend_tag_effective = false; - hs_private->suspend_tag_user = false; +#if 0 if (__on_request_handle_user_req_or_resp(stream, hs_private, hf_private_req_in, __need_to_close_the_session) < 0) { @@ -333,6 +339,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea /* Ignore parse the content which is nullptr. */ goto __boundary; +#endif } /* Parse the content, the data which in defered state has been ignored. */ @@ -351,13 +358,8 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea } /* Suspend */ - if (hs_private->suspend_tag_user) + if (hs_private->suspend_tag_effective) { - assert(!hs_private->suspend_tag_effective); - hs_private->suspend_tag_effective = true; - - /* Suspend TCP stream */ - tfe_stream_suspend(stream, CONN_DIR_DOWNSTREAM); return ACTION_DEFER_DATA; } diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index a85374b..7f04128 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -348,7 +348,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser) /* user's suspend tag is set, which indicate that the way to handle request/response * cannot be determinate at now, need to defer */ - else if (hs_private && hs_private->suspend_tag_user) + else if (hs_private && hs_private->suspend_tag_effective) { /* Pause parser, prevent to parse request/response body, * The body should be parsed after resume() */ @@ -362,7 +362,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser) assert(hf_private->stream_action == ACTION_DEFER_DATA); } - /* Otherwise, forward the request/response */ + /* Otherwise, forward the request/response */ else { hf_private->stream_action = ACTION_FORWARD_DATA; @@ -615,9 +615,8 @@ int hf_ops_body_begin(struct tfe_http_half * half, int by_stream) } hf_private->content_encoding = HTTP_ACCEPT_ENCODING_NONE; + hf_private->message_status = STATUS_READING; hf_private->is_setup_by_stream = true; - - hs_private->stream_write_tag_effective = true; hs_private->release_lock++; } @@ -679,8 +678,7 @@ int hf_ops_body_end(struct tfe_http_half * half) if(hf_private->is_setup_by_stream) { - hs_private->stream_write_tag_effective = true; - hs_private->release_lock++; + hs_private->release_lock--; } printf("frag write end, stream = %p, hf_private = %p\n", hf_private->session->hc_private->stream, hf_private); @@ -802,7 +800,7 @@ int hf_private_parse(struct http_half_private * hf_private, const unsigned char * resume it to normal status */ if (__is_paused) { - hf_private->parse_cursor += sz_parsed + 1; + hf_private->parse_cursor += sz_parsed; return 1; } @@ -829,8 +827,15 @@ void hs_ops_drop(struct tfe_http_session * session) void hs_ops_suspend(struct tfe_http_session * session) { struct http_session_private * hs_private = to_hs_private(session); - hs_private->suspend_tag_user = true; + struct http_connection_private * hc_private = hs_private->hc_private; + hs_private->release_lock++; + hs_private->suspend_counter++; + hs_private->suspend_tag_effective = true; + + tfe_stream_suspend(hc_private->stream, CONN_DIR_DOWNSTREAM); + fprintf(stderr, "---- suspend ----, url = %s, counter = %d\n", + hs_private->hs_public.req->req_spec.url, hs_private->suspend_counter); } void hs_ops_resume(struct tfe_http_session * session) @@ -838,9 +843,16 @@ void hs_ops_resume(struct tfe_http_session * session) struct http_session_private * hs_private = to_hs_private(session); struct http_connection_private * hc_private = hs_private->hc_private; - hs_private->suspend_tag_user = false; + assert(!hs_private->in_gc_queue); + hs_private->release_lock--; - tfe_stream_resume(hc_private->stream); + if (hs_private->suspend_tag_effective) + { + tfe_stream_resume(hc_private->stream); + } + + fprintf(stderr, "---- resume ----, url = %s, counter = %d\n", + hs_private->hs_public.req->req_spec.url, hs_private->suspend_counter); } // TODO: change the return type to int, there is something happend where -1 returned. @@ -865,6 +877,8 @@ void hs_ops_request_set(struct tfe_http_session * session, struct tfe_http_half assert(hs_private->hf_private_req_user == NULL); hs_private->hf_private_req_user = hf_user_private; + hs_private->hf_private_req_user->message_status = STATUS_COMPLETE; + hs_private->hf_private_req_user->session = hs_private; } void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half * resp) @@ -897,7 +911,8 @@ void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half assert(hs_private->hf_private_resp_user == NULL); hs_private->hf_private_resp_user = hf_private; - hf_private->session = hs_private; + hs_private->hf_private_resp_user->message_status = STATUS_COMPLETE; + hs_private->hf_private_resp_user->session = hs_private; } struct tfe_http_half * hs_ops_request_create(struct tfe_http_session * session, @@ -1042,6 +1057,8 @@ void hf_private_construct(struct http_half_private * hf_private) evbuffer_add_printf(hf_private->evbuf_raw, "%s: %s\r\n", str_field, str_value); } + /* Trace Tags */ + evbuffer_add_printf(hf_private->evbuf_raw, "%s: tfe/%s\r\n", "X-TG-Construct-By", tfe_version()); /* delimitor between header and body */ evbuffer_add_printf(hf_private->evbuf_raw, "\r\n"); @@ -1154,8 +1171,20 @@ void hs_private_destroy(struct http_session_private * hs_private) void hs_private_gc_destroy(struct http_session_private * hs_private, struct hs_private_list * gc_list) { - if (hs_private->release_lock > 0) TAILQ_INSERT_TAIL(gc_list, hs_private, next); - else return hs_private_destroy(hs_private); + if (hs_private->release_lock > 0) + { + TAILQ_INSERT_TAIL(gc_list, hs_private, next); + hs_private->in_gc_queue = true; + } + else + { + return hs_private_destroy(hs_private); + } +} + +bool hs_private_can_destroy(struct http_session_private * hs_private) +{ + return hs_private->release_lock <= 0; } void hs_private_hf_private_set(struct http_session_private * hs_private, diff --git a/plugin/protocol/http/test/test_http_half.cpp b/plugin/protocol/http/test/test_http_half.cpp index f453343..5d43c09 100644 --- a/plugin/protocol/http/test/test_http_half.cpp +++ b/plugin/protocol/http/test/test_http_half.cpp @@ -348,7 +348,7 @@ void __http_post_header_verify_helper(struct http_half_private * hf_private) } struct __post_http_req_ctx -{ + { unsigned int calling_seq{0}; size_t total_length{0}; @@ -1488,6 +1488,11 @@ void tfe_stream_resume(const struct tfe_stream * stream) return; } +void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by) +{ + return; +} + int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size) { return 0; @@ -1498,6 +1503,11 @@ void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) return; } +const char * tfe_version() +{ + return NULL; +} + int main(int argc, char ** argv) { ::testing::InitGoogleTest(&argc, argv);