diff --git a/common/include/tfe_utils.h b/common/include/tfe_utils.h index e99583d..8e260b4 100644 --- a/common/include/tfe_utils.h +++ b/common/include/tfe_utils.h @@ -48,7 +48,7 @@ do { if(!(condition)) { TFE_LOG_ERROR(g_default_logger, fmt, ##__VA_ARGS__); abo #define TFE_STREAM_LOG_DEBUG(stream, fmt, ...) #define TFE_STREAM_LOG_INFO(stream, fmt, ...) -#define TFE_STREAM_LOG_ERROR(stream, fmt, ...) +#define TFE_STREAM_LOG_ERROR(stream, fmt, ...) do { fprintf(stderr, fmt "\n", ##__VA_ARGS__); } while(0) #define TFE_STREAM_TRACE_TAG_INFO(stream, tag, fmt, ...) #define TFE_STREAM_TRACE_TAG_VERBOSE(stream, tag, fmt, ...) diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 027fd30..5811080 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -141,8 +141,6 @@ void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by) _stream->is_suspended = true; _stream->suspended_by = by; - fprintf(stderr, "---- tfe-stream-suspend ----, %p, by = %d\n", _stream, by); - /* disable all events */ int ret = 0; ret = bufferevent_disable(_stream->conn_upstream->bev, EV_READ | EV_WRITE); @@ -162,8 +160,6 @@ void tfe_stream_resume(const struct tfe_stream * stream) bufferevent_enable(_stream->conn_upstream->bev, EV_READ | EV_WRITE); bufferevent_enable(_stream->conn_downstream->bev, EV_READ | EV_WRITE); - assert(_stream->is_suspended == true); - fprintf(stderr, "---- tfe-stream-resume ----, %p\n", _stream); if (_stream->suspended_by == CONN_DIR_DOWNSTREAM) { bufferevent_trigger(_stream->conn_downstream->bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); @@ -548,8 +544,6 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) break; } - fprintf(stderr, "----- action = %d, bev = %p, input bev: %llu\n", action_final, bev, evbuffer_get_length(inbuf)); - #if 0 if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT) { @@ -689,7 +683,6 @@ __close_connection: return; __call_plugin_close: - fprintf(stderr, "---- eventcb ---- call close, stream = %p, event = %x, dir = %s\n", _stream, events, __str_dir); call_plugin_close(_stream); tfe_stream_destory(_stream); } diff --git a/plugin/protocol/http/include/internal/http_common.h b/plugin/protocol/http/include/internal/http_common.h index e220839..f0a5e6e 100644 --- a/plugin/protocol/http/include/internal/http_common.h +++ b/plugin/protocol/http/include/internal/http_common.h @@ -45,6 +45,10 @@ struct http_session_private tfe_http_event suspend_event; /* SUSPEND TAG EFFECTIVE */ bool suspend_tag_effective; + /* SUSPEND TAG STREAM */ + bool suspend_tag_signal; + /* RESUME SIGNAL */ + bool resume_tag_singal; /* RELEASE LOCK, when the tag is zero, the session can be destroyed */ int release_lock; /* thread id */ diff --git a/plugin/protocol/http/include/internal/http_half.h b/plugin/protocol/http/include/internal/http_half.h index 86c8414..9b339e0 100644 --- a/plugin/protocol/http/include/internal/http_half.h +++ b/plugin/protocol/http/include/internal/http_half.h @@ -128,6 +128,7 @@ struct http_session_private * hs_private_create(struct http_connection_private * void hs_private_destroy(struct http_session_private * hs_private); void hs_private_gc_destroy(struct http_session_private * hs_private, struct hs_private_list * gc_list); +bool hs_private_can_destroy(struct http_session_private * hs_private); void hs_private_hf_private_set(struct http_session_private * hs_private, struct http_half_private * hf, enum tfe_http_direction); diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index e496607..cc21d52 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -39,7 +39,7 @@ static void http_plugin_session_gc_cb(evutil_socket_t fd, short what, void * arg TAILQ_FOREACH_SAFE(hs_private_iter, gc_list_hs_private, next, hs_private_titer) { assert(hs_private_iter->release_lock >= 0); - if(hs_private_iter->release_lock > 0) continue; + if (hs_private_iter->release_lock > 0) continue; TAILQ_REMOVE(gc_list_hs_private, hs_private_iter, next); @@ -52,7 +52,6 @@ static void http_plugin_session_gc_cb(evutil_socket_t fd, short what, void * arg } hs_private_destroy(hs_private_iter); - fprintf(stderr, "---- http_plugin_session_gc_cb, close session by GC\n, %p", hs_private_iter); } } @@ -66,6 +65,7 @@ int http_plugin_init(struct tfe_proxy * proxy) #ifndef NDEBUG pthread_mutex_init(&plugin_ctx->lock_list_hs_private[thread_id], NULL); #endif + TAILQ_INIT(&plugin_ctx->gc_list_hs_private[thread_id]); struct event_base * ev_base = tfe_proxy_get_work_thread_evbase(thread_id); struct session_gc_cb_closure * closure = ALLOC(struct session_gc_cb_closure, 1); @@ -189,7 +189,7 @@ static int __write_http_half_to_line(const struct tfe_stream * stream, return 0; } -int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_session_private * hs_private, +static int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_session_private * hs_private, struct http_half_private * hf_private_req_in, bool & need_to_close_the_session) { struct http_connection_private * hc_private = hs_private->hc_private; @@ -211,16 +211,10 @@ int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_ ret = __write_http_half_to_line(stream, CONN_DIR_UPSTREAM, hf_private_req_user); if (unlikely(ret < 0)) { - TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP request setup by user".); + TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP request setup by user. "); return ret; } - if (hf_private_req_user->message_status == STATUS_COMPLETE) - { - hf_private_destory(hf_private_req_user); - hs_private->hf_private_req_user = NULL; - } - if (hf_private_req_in->stream_action == ACTION_DROP_DATA || hf_private_req_in->stream_action == ACTION_DEFER_DATA) { @@ -240,21 +234,11 @@ int __on_request_handle_user_req_or_resp(const tfe_stream * stream, struct http_ if (unlikely(ret < 0)) { - TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP request setup by user".); + TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP request setup by user."); goto __errout; } - if (hf_private_resp_user->message_status == STATUS_COMPLETE) - { - hf_private_destory(hf_private_resp_user); - hs_private->hf_private_resp_user = NULL; - need_to_close_the_session = true; - } - else - { - TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); - TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next); - } + need_to_close_the_session = true; } return 0; @@ -263,21 +247,37 @@ __errout: return -1; } -enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_stream * stream, - struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len) +static int __on_response_handle_user_req_or_resp(const tfe_stream * stream, struct http_session_private * hs_private, + struct http_half_private * hf_private_resp_in, bool & need_to_close_the_session) +{ + struct http_half_private * hf_private_resp_user = hs_private->hf_private_resp_user; + if (hf_private_resp_user == NULL) + { + return 0; + } + + int ret = __write_http_half_to_line(stream, CONN_DIR_DOWNSTREAM, hf_private_resp_user); + if (unlikely(ret < 0)) + { + TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP response setup by user."); + return -1; + } + + if (hf_private_resp_in->stream_action == ACTION_DEFER_DATA) + { + hf_private_resp_in->stream_action = ACTION_DROP_DATA; + } + + return 0; +} + +static int __on_request_prepare_context(const struct tfe_stream * stream, unsigned int thread_id, + struct http_connection_private * hc_private, struct http_session_private ** hs_private_out, + struct http_half_private ** hf_private_out) { struct http_session_private * hs_private = TAILQ_LAST(&hc_private->hs_private_list, hs_private_list); struct http_half_private * hf_private_req_in = to_hf_request_private(hs_private); - bool __need_to_close_the_session = false; - int ret = 0; - - enum tfe_stream_action __action = ACTION_FORWARD_DATA; - size_t __action_args = 0; - - /* There is no available in session list, - * that indicate all HTTP request has corresponding response, - * or the last request is finished, we need to create a new session. */ if (hs_private == NULL || hf_private_req_in->message_status == STATUS_COMPLETE) { /* HTTP Request and Session */ @@ -286,8 +286,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea hf_private_set_session(hf_private_req_in, hs_private); /* Closure, catch stream, session and thread_id */ - struct user_event_dispatch_closure * __closure = ALLOC( - struct user_event_dispatch_closure, 1); + struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1); __closure->thread_id = thread_id; __closure->stream = stream; __closure->session = to_hs_public(hs_private); @@ -300,144 +299,31 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea if (hs_private->ht_frame == NULL) { TFE_STREAM_LOG_ERROR(stream, "Failed at raising session begin event. "); - goto __errout; + return -1; } TAILQ_INSERT_TAIL(&hc_private->hs_private_list, hs_private, next); } - /* The session is suspended, and to resume */ - if (hs_private->suspend_tag_effective) - { - enum tfe_http_event __backup_event = hs_private->suspend_event; - - /* Clean up suspend tag, we can support user's call suspend in this callback */ - hs_private->suspend_event = (enum tfe_http_event) 0; - hs_private->suspend_tag_effective = false; - hs_private->suspend_counter++; - - /* Call user callback, tell user we resume from suspend */ - hf_private_req_in->event_cb(hf_private_req_in, hs_private->suspend_event, NULL, 0, - hf_private_req_in->event_cb_user); - - -#if 0 - if (__on_request_handle_user_req_or_resp(stream, hs_private, - hf_private_req_in, __need_to_close_the_session) < 0) - { - goto __errout; - } - - if (hf_private_req_in->is_user_stream_action_set) - { - hf_private_req_in->stream_action = hf_private_req_in->user_stream_action; - } - else - { - hf_private_req_in->stream_action = ACTION_FORWARD_DATA; - } - - /* Ignore parse the content which is nullptr. */ - goto __boundary; -#endif - } - - /* Parse the content, the data which in defered state has been ignored. */ - ret = hf_private_parse(hf_private_req_in, data, len); - - /* Need more data, no boundary touched */ - if (ret == 0) - { - if (hf_private_req_in->stream_action == ACTION_DROP_DATA || - hf_private_req_in->stream_action == ACTION_FORWARD_DATA) - { - hf_private_req_in->parse_cursor = 0; - } - - return hf_private_req_in->stream_action; - } - - /* Suspend */ - if (hs_private->suspend_tag_effective) - { - return ACTION_DEFER_DATA; - } - - /* Some kind of error happened, write log and detach the stream */ - if (ret == -1 && hf_private_req_in->is_passthrough) - { - goto __errout; - } - - if (ret == -1) - { - TFE_STREAM_LOG_ERROR(stream, "Failed at parsing stream as HTTP: %u, %s, %s", - hf_private_req_in->parse_errno, http_errno_name(hf_private_req_in->parse_errno), - http_errno_description(hf_private_req_in->parse_errno)); - - goto __errout; - } - - if (__on_request_handle_user_req_or_resp(stream, hs_private, - hf_private_req_in, __need_to_close_the_session) < 0) - { - goto __errout; - } - -__boundary: - /* Touch a boundary, such as the end of HTTP headers, bodys, et al. */ - __action_args = hf_private_req_in->parse_cursor; - hf_private_req_in->parse_cursor = 0; - - if (hf_private_req_in->stream_action == ACTION_FORWARD_DATA) - { - tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_args, sizeof(__action_args)); - __action = ACTION_FORWARD_DATA; - } - - if (hf_private_req_in->stream_action == ACTION_DROP_DATA) - { - tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_args, sizeof(__action_args)); - __action = ACTION_DROP_DATA; - } - -__out: - /* There is nothing for this session, close the session */ - if (__need_to_close_the_session) - { - http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id); - TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); - - hs_private->ht_frame = NULL; - hs_private_destroy(hs_private); - } - - return __action; - -__errout: - tfe_stream_detach(stream); - return ACTION_FORWARD_DATA; + *hs_private_out = hs_private; + *hf_private_out = hf_private_req_in; + return 0; } -enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stream * stream, - struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len) +static int __on_response_prepare_context(const struct tfe_stream * stream, unsigned int thread_id, + struct http_connection_private * hc_private, struct http_session_private ** hs_private_out, + struct http_half_private ** hf_private_out) { struct http_session_private * hs_private = TAILQ_FIRST(&hc_private->hs_private_list); - struct http_half_private * hf_private_resp_in; - struct http_half_private * hf_private_resp_user; - enum tfe_stream_action __stream_action; - - int ret = 0; - size_t __action_byptes; /* Standalone response, it means missing something or malformed http protocol */ if (hs_private == NULL) { TFE_STREAM_LOG_ERROR(stream, "Standlone HTTP response emerged. Malformed HTTP Protocol, detached. "); - goto __errout; + return -1; } - hf_private_resp_in = to_hf_response_private(hs_private); + struct http_half_private * hf_private_resp_in = to_hf_response_private(hs_private); /* First time parse http response */ if (hf_private_resp_in == NULL) { @@ -467,106 +353,143 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre } } - /* Parse the content, the data which in defered state has been ignored. */ - ret = hf_private_parse(hf_private_resp_in, data, len); + *hs_private_out = hs_private; + *hf_private_out = hf_private_resp_in; + + return 0; +} + +enum tfe_stream_action http_connection_entry(const struct tfe_stream * stream, enum tfe_conn_dir dir, + struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len) +{ + struct http_session_private * hs_private = NULL; + struct http_half_private * hf_private_in = NULL; + + bool __need_to_close_the_session = false; + int ret = 0; + + enum tfe_stream_action __action = ACTION_FORWARD_DATA; + size_t __action_args = 0; + + /* Prepare hs_private and hf_private_in */ + ret = (dir == CONN_DIR_DOWNSTREAM) ? + __on_request_prepare_context(stream, thread_id, hc_private, &hs_private, &hf_private_in) : + __on_response_prepare_context(stream, thread_id, hc_private, &hs_private, &hf_private_in); + + if (ret < 0) + { + goto __passthrough; + } + + /* The session is suspended, and to resume */ + if (hs_private->suspend_tag_effective) + { + enum tfe_http_event __backup_event = hs_private->suspend_event; + + /* Clean up suspend tag, we can support user's call suspend in this callback */ + hs_private->suspend_event = (enum tfe_http_event) 0; + hs_private->suspend_tag_effective = false; + hs_private->release_lock--; + hs_private->suspend_counter++; + + /* Call user callback, tell user we resume from suspend */ + assert(hs_private->resume_tag_singal); + hs_private->resume_tag_singal = false; + hf_private_in->event_cb(hf_private_in, __backup_event, NULL, 0, hf_private_in->event_cb_user); + } + + /* Parse the content, the data which in deferred state has been ignored. */ + ret = hf_private_parse(hf_private_in, data, len); + + /* Suspend, ask by user's callback */ + if (hs_private->suspend_tag_signal) + { + hs_private->suspend_tag_effective = true; + hs_private->suspend_tag_signal = false; + hs_private->release_lock++; + hs_private->suspend_counter++; + + tfe_stream_suspend(stream, dir); + return ACTION_DEFER_DATA; + } /* Need more data, no boundary touched */ if (ret == 0) { - if (hf_private_resp_in->stream_action == ACTION_DROP_DATA || - hf_private_resp_in->stream_action == ACTION_FORWARD_DATA) + if (hf_private_in->stream_action == ACTION_DROP_DATA || + hf_private_in->stream_action == ACTION_FORWARD_DATA) { - hf_private_resp_in->parse_cursor = 0; + hf_private_in->parse_cursor = 0; } - return hf_private_resp_in->stream_action; + return hf_private_in->stream_action; } - /* Need to passthrough */ - if (ret == -1 && hf_private_resp_in->is_passthrough) + if (ret == -1 && hf_private_in->is_passthrough) { - goto __errout; + assert(0); + goto __passthrough; } /* Some kind of error happened, write log and detach the stream */ if (ret == -1) { - TFE_STREAM_LOG_ERROR(stream, "Failed at parsing HTTP response: %u, %s, %s", - hf_private_request->parse_errno, http_errno_name(hf_private_request->parse_errno), - http_errno_description(hf_private_request->parse_errno)); + TFE_STREAM_LOG_ERROR(stream, "Failed at parsing stream as HTTP: %u, %s, %s", + hf_private_in->parse_errno, http_errno_name(hf_private_in->parse_errno), + http_errno_description(hf_private_in->parse_errno)); - goto __errout; + goto __passthrough; } - /* Upgrade, passthrough the connection and close this session */ - if (hf_private_resp_in->is_upgrade) + ret = (dir == CONN_DIR_DOWNSTREAM) ? + __on_request_handle_user_req_or_resp(stream, hs_private, hf_private_in, __need_to_close_the_session) : + __on_response_handle_user_req_or_resp(stream, hs_private, hf_private_in, __need_to_close_the_session); + + if (ret < 0) { - tfe_stream_detach(stream); - hf_private_resp_in->stream_action = ACTION_FORWARD_DATA; + assert(0); + goto __passthrough; } - hf_private_resp_user = hs_private->hf_private_resp_user; - if (hf_private_resp_user != NULL) + /* Touch a boundary, such as the end of HTTP headers, bodys, et al. */ + __action_args = hf_private_in->parse_cursor; + hf_private_in->parse_cursor = 0; + + if (hf_private_in->stream_action == ACTION_FORWARD_DATA) { - ret = __write_http_half_to_line(stream, CONN_DIR_DOWNSTREAM, hf_private_resp_user); - if (unlikely(ret < 0)) - { - TFE_STREAM_LOG_ERROR(stream, "Failed to write HTTP response setup by user".); - goto __errout; - } - - if (hf_private_resp_user->message_status == STATUS_COMPLETE) - { - hf_private_destory(hf_private_resp_user); - hs_private->hf_private_resp_user = NULL; - } - - if (hf_private_resp_in->stream_action == ACTION_DEFER_DATA) - { - hf_private_resp_in->stream_action = ACTION_DROP_DATA; - } + tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_args, sizeof(__action_args)); + __action = ACTION_FORWARD_DATA; } - __stream_action = hf_private_resp_in->stream_action; - __action_byptes = hf_private_resp_in->parse_cursor; - hf_private_resp_in->parse_cursor = 0; - - if (hf_private_resp_in->message_status == STATUS_COMPLETE) + if (hf_private_in->stream_action == ACTION_DROP_DATA) { - /* Still sending user's response, should not destroy the session, - * move the session to orphan list, then we can handle the next session's response */ - if (hs_private->hf_private_resp_user != NULL) - { - TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); - TAILQ_INSERT_TAIL(&hc_private->hs_private_orphan_list, hs_private, next); - } + tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_args, sizeof(__action_args)); + __action = ACTION_DROP_DATA; + } - /* Nothing to do, everything is over, destroy the session */ - else + /* ON RESPONSE, and input message is complete, need to close the session */ + if (dir == CONN_DIR_UPSTREAM && hf_private_in->message_status == STATUS_COMPLETE) + { + __need_to_close_the_session = true; + } + + /* There is nothing for this session, close the session */ + if (__need_to_close_the_session) + { + /* Try to close the session */ + if (hs_private_can_destroy(hs_private)) { http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id); - TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); - hs_private->ht_frame = NULL; - hs_private_destroy(hs_private); } + + TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); + hs_private_gc_destroy(hs_private, &g_http_plugin->gc_list_hs_private[thread_id]); } - if (__stream_action == ACTION_FORWARD_DATA) - { - tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_byptes, sizeof(__action_byptes)); - return ACTION_FORWARD_DATA; - } + return __action; - if (__stream_action == ACTION_DROP_DATA) - { - tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_byptes, sizeof(__action_byptes)); - return ACTION_DROP_DATA; - } - - goto __errout; - -__errout: +__passthrough: tfe_stream_detach(stream); return ACTION_FORWARD_DATA; } @@ -608,8 +531,7 @@ enum tfe_stream_action http_connection_entry_data(const struct tfe_stream * stre } /* This stream has been preempt, this plugin try to parse it */ - return (dir == CONN_DIR_DOWNSTREAM) ? __http_connection_entry_on_request(stream, ht_conn, thread_id, - data, len) : __http_connection_entry_on_response(stream, ht_conn, thread_id, data, len); + return http_connection_entry(stream, dir, ht_conn, thread_id, data, len); __detach: tfe_stream_detach(stream); @@ -640,20 +562,6 @@ void http_connection_entry_close(const struct tfe_stream * stream, unsigned int hs_private_gc_destroy(hs_private_iter, &plugin_ctx->gc_list_hs_private[thread_id]); } - TAILQ_FOREACH_SAFE(hs_private_iter, &ht_conn->hs_private_orphan_list, next, hs_private_titer) - { - TAILQ_REMOVE(&ht_conn->hs_private_orphan_list, hs_private_iter, next); - - if (hs_private_iter->ht_frame) - { - struct tfe_http_session * hs_public = to_hs_public(hs_private_iter); - http_frame_raise_session_end(hs_private_iter->ht_frame, stream, hs_public, hs_private_iter->thread_id); - hs_private_iter->ht_frame = NULL; - } - - hs_private_gc_destroy(hs_private_iter, &plugin_ctx->gc_list_hs_private[thread_id]); - } - /* Clear session counter, and free ht_conn structure */ ht_conn->session_id_counter = 0; free(ht_conn); diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index 7f04128..c38e0d8 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -348,7 +348,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser) /* user's suspend tag is set, which indicate that the way to handle request/response * cannot be determinate at now, need to defer */ - else if (hs_private && hs_private->suspend_tag_effective) + else if (hs_private && hs_private->suspend_tag_signal) { /* Pause parser, prevent to parse request/response body, * The body should be parsed after resume() */ @@ -829,13 +829,7 @@ void hs_ops_suspend(struct tfe_http_session * session) struct http_session_private * hs_private = to_hs_private(session); struct http_connection_private * hc_private = hs_private->hc_private; - hs_private->release_lock++; - hs_private->suspend_counter++; - hs_private->suspend_tag_effective = true; - - tfe_stream_suspend(hc_private->stream, CONN_DIR_DOWNSTREAM); - fprintf(stderr, "---- suspend ----, url = %s, counter = %d\n", - hs_private->hs_public.req->req_spec.url, hs_private->suspend_counter); + hs_private->suspend_tag_signal = true; } void hs_ops_resume(struct tfe_http_session * session) @@ -844,15 +838,11 @@ void hs_ops_resume(struct tfe_http_session * session) struct http_connection_private * hc_private = hs_private->hc_private; assert(!hs_private->in_gc_queue); - - hs_private->release_lock--; if (hs_private->suspend_tag_effective) { tfe_stream_resume(hc_private->stream); + hs_private->resume_tag_singal = true; } - - fprintf(stderr, "---- resume ----, url = %s, counter = %d\n", - hs_private->hs_public.req->req_spec.url, hs_private->suspend_counter); } // TODO: change the return type to int, there is something happend where -1 returned. @@ -1128,10 +1118,17 @@ void __write_access_log(struct http_session_private * hs_private) const char * __str_req_passthrough = request ? request->is_passthrough ? "PASS-THROUGH/REQ" : "-" : "-"; const char * __str_resp_passthrough = response ? response->is_passthrough ? "PASS-THROUGH/REP" : "-" : "-"; + /* USER SETUP REQUEST/RESPONSE */ + const char * __str_user_req = hs_private->hf_private_req_user ? "USER/REQ" : "-"; + const char * __str_user_resp = hs_private->hf_private_resp_user ? "USER/RESP" : "-"; + + /* SUSPEND */ + const char * __str_suspend = hs_private->suspend_counter > 0 ? "SUSPEND" : "-"; + char * __access_log; - asprintf(&__access_log, "%d %s %s HTTP/%d.%d %s %s %s %s %s %s", hs_private->hs_public.session_id, __str_method, + asprintf(&__access_log, "%d %s %s HTTP/%d.%d %s %s %s %s %s %s %s %s %s", hs_private->hs_public.session_id, __str_method, __str_url, request->major, request->minor, __str_resp_code, __str_cont_type, __str_cont_encoding, - __str_upgrade, __str_req_passthrough, __str_resp_passthrough); + __str_upgrade, __str_req_passthrough, __str_resp_passthrough, __str_user_req, __str_user_resp, __str_suspend); const struct tfe_stream * stream = hs_private->hc_private->stream; tfe_stream_write_access_log(stream, RLOG_LV_INFO, "%s", __access_log); @@ -1157,11 +1154,13 @@ void hs_private_destroy(struct http_session_private * hs_private) if (hs_private->hf_private_req_user) { + assert(hs_private->hf_private_req_user->message_status == STATUS_COMPLETE); hf_private_destory(hs_private->hf_private_req_user); } if (hs_private->hf_private_resp_user) { + assert(hs_private->hf_private_resp_user->message_status == STATUS_COMPLETE); hf_private_destory(hs_private->hf_private_resp_user); }