diff --git a/common/include/tfe_http.h b/common/include/tfe_http.h index 65c02bb..fc74090 100644 --- a/common/include/tfe_http.h +++ b/common/include/tfe_http.h @@ -391,7 +391,8 @@ struct http_frame_plugin_status { void * pme; unsigned int detached; - unsigned int preempt; + unsigned int is_preempted; + unsigned int preempt_by; }; struct http_frame_session_ctx * http_frame_raise_session_begin(const tfe_stream * stream, diff --git a/common/src/tfe_http.cpp b/common/src/tfe_http.cpp index 45d8803..cf5bf53 100644 --- a/common/src/tfe_http.cpp +++ b/common/src/tfe_http.cpp @@ -236,10 +236,11 @@ int http_frame_currect_plugin_preempt(struct http_frame_session_ctx * ht_frame) for(unsigned int i = 0; i < ht_frame->nr_plugin_status; i++) { struct http_frame_plugin_status * __plugin_status = &ht_frame->plugin_status[i]; - if (__plugin_status->preempt) return -1; + if (__plugin_status->is_preempted) return -1; } - assert(ht_frame->calling_plugin_status != NULL); - ht_frame->calling_plugin_status->preempt = 1; + //TODO: + //assert(ht_frame->calling_plugin_status != NULL); + //ht_frame->calling_plugin_status->is_preempted = 1; return 0; } diff --git a/platform/src/ssl_stream.cpp b/platform/src/ssl_stream.cpp index 17bef3a..9d58d8e 100644 --- a/platform/src/ssl_stream.cpp +++ b/platform/src/ssl_stream.cpp @@ -555,7 +555,6 @@ static void ssl_async_peek_client_hello(struct future * future, evutil_socket_t struct peek_client_hello_ctx * ctx = ALLOC(struct peek_client_hello_ctx, 1); ctx->ev = event_new(evbase, fd, EV_READ, peek_client_hello_cb, p); ctx->logger = logger; - event_add(ctx->ev, NULL); promise_set_ctx(p, (void *) ctx, peek_client_hello_ctx_free_cb); return; diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 9e08e51..1333a46 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -661,7 +661,6 @@ void ssl_upstream_create_on_success(future_result_t * result, void * user) void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, void * user) { return; - assert(0); } struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx) @@ -899,7 +898,6 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst } _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); - if (_stream->session_type == STREAM_PROTO_PLAIN) { _stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream); diff --git a/plugin/business/pangu-http/pangu_http.cpp b/plugin/business/pangu-http/pangu_http.cpp index dc69782..cbcb11f 100644 --- a/plugin/business/pangu-http/pangu_http.cpp +++ b/plugin/business/pangu-http/pangu_http.cpp @@ -242,7 +242,6 @@ struct replace_ctx size_t n_rule; struct tfe_http_half * replacing; struct evbuffer * http_body; - size_t body_size; }; struct pangu_http_ctx @@ -270,19 +269,24 @@ static struct pangu_http_ctx * pangu_http_ctx_new(unsigned int thread_id) static void pangu_http_ctx_free(struct pangu_http_ctx * ctx) { - if (ctx->rep_ctx != NULL) + if (ctx->rep_ctx == NULL) + return; + + for (size_t i = 0; i < ctx->rep_ctx->n_rule; i++) + { + FREE(&(ctx->rep_ctx->rule[i].find)); + FREE(&(ctx->rep_ctx->rule[i].replace_with)); + } + + if (ctx->rep_ctx->http_body) { - for (size_t i = 0; i < ctx->rep_ctx->n_rule; i++) - { - FREE(&(ctx->rep_ctx->rule[i].find)); - FREE(&(ctx->rep_ctx->rule[i].replace_with)); - } evbuffer_free(ctx->rep_ctx->http_body); ctx->rep_ctx->http_body = NULL; - //todo destroy http_half; - assert(ctx->rep_ctx->replacing == NULL); - FREE(&ctx->rep_ctx); } + + //todo destroy http_half; + assert(ctx->rep_ctx->replacing == NULL); + FREE(&ctx->rep_ctx); FREE(&ctx->enforce_rules); FREE(&ctx->enforce_para); Maat_clean_status(&(ctx->mid)); @@ -470,18 +474,19 @@ static char * strtok_r_esc(char * s, const char delim, char ** save_ptr) size_t format_replace_rule(const char * exec_para, struct replace_rule * replace, size_t n_replace) { - char * tmp = ALLOC(char, strlen(exec_para) + 1); char * token = NULL, * sub_token = NULL, * saveptr = NULL, * saveptr2 = NULL; size_t idx = 0; - const char * str_zone = "replace="; + + const char * str_zone = "zone="; const char * str_subs = "substitute="; memcpy(tmp, exec_para, strlen(exec_para)); + for (token = tmp;; token = NULL) { sub_token = strtok_r(token, ";", &saveptr); - if (sub_token == NULL) - break; + if (sub_token == NULL) break; + if (0 == strncasecmp(sub_token, str_zone, strlen(str_zone))) { replace[idx].zone = zone_name_to_id(sub_token + strlen(str_zone)); @@ -490,11 +495,14 @@ size_t format_replace_rule(const char * exec_para, struct replace_rule * replace break; } } + + sub_token = strtok_r(NULL, ";", &saveptr); if (0 == strncasecmp(sub_token, str_subs, strlen(str_subs))) { - sub_token += strlen(str_subs); + sub_token += strlen(str_subs) + 1; replace[idx].find = tfe_strdup(strtok_r_esc(sub_token, '/', &saveptr2)); replace[idx].replace_with = tfe_strdup(strtok_r_esc(NULL, '/', &saveptr2)); + idx++; if (idx == n_replace) { @@ -502,10 +510,12 @@ size_t format_replace_rule(const char * exec_para, struct replace_rule * replace } } } + free(tmp); tmp = NULL; return idx; } + size_t select_replace_rule(enum replace_zone zone, const struct replace_rule * replace, size_t n_replace, const struct replace_rule ** selected, size_t n_selected) { @@ -520,6 +530,7 @@ size_t select_replace_rule(enum replace_zone zone, const struct replace_rule * r } return j; } + static struct evbuffer * replace_string(const char * in, const struct replace_rule * zone) { //Reference to https://www.lemoda.net/c/unix-regex/ @@ -531,6 +542,7 @@ static struct evbuffer * replace_string(const char * in, const struct replace_ru size_t replace_len = strlen(zone->replace_with); + assert(strlen(zone->find) != 0); status = regcomp(®, zone->find, REG_EXTENDED | REG_NEWLINE); if (status != 0) { @@ -588,6 +600,7 @@ static struct evbuffer * replace_string(const char * in, const struct replace_ru } p += m[0].rm_eo; } + if (is_replaced) { evbuffer_add(out, pre_sub_expr_end, in_sz - (pre_sub_expr_end - p)); @@ -595,8 +608,8 @@ static struct evbuffer * replace_string(const char * in, const struct replace_ru regfree(®); return out; - } + struct evbuffer * execute_replace_rule(const char * in, size_t in_sz, enum replace_zone zone, const struct replace_rule * rules, size_t n_rule) { @@ -618,127 +631,176 @@ struct evbuffer * execute_replace_rule(const char * in, size_t in_sz, interator = in; for (i = 0; i < n_todo; i++) { - new_out = replace_string(interator, todo[i]); if (new_out != NULL) { pre_out = out; out = new_out; interator = (char *) evbuffer_pullup(out, -1); - evbuffer_free(pre_out); - pre_out = NULL; + + if (pre_out != NULL) + { + evbuffer_free(pre_out); + pre_out = NULL; + } } } return out; } + void http_replace(const struct tfe_stream * stream, const struct tfe_http_session * session, enum tfe_http_event events, const unsigned char * body_frag, size_t frag_size, struct pangu_http_ctx * ctx) { - void * interator = NULL; - struct http_field_name tmp_name; - const char * buff_in = NULL; - struct evbuffer * rewrite_url = NULL, * rewrite_buff = NULL; - struct replace_ctx * rep_ctx = NULL; struct tfe_http_session * to_write_sess = NULL; - to_write_sess = tfe_http_session_allow_write(session); if (to_write_sess == NULL) //fail to wirte, abandon. { TFE_STREAM_LOG_INFO(stream, "tfe_http_session_allow_write() %s failed.", session->req->req_spec.uri); - tfe_http_session_detach(session); - return; + tfe_http_session_detach(session); return; } + + struct replace_ctx * rep_ctx = ctx->rep_ctx; if (ctx->rep_ctx == NULL) { - ctx->rep_ctx = rep_ctx = ALLOC(struct replace_ctx, 1); - rep_ctx->rule = ALLOC(struct replace_rule, MAX_EDIT_ZONE_NUM); - rep_ctx->n_rule = format_replace_rule(ctx->enforce_para, rep_ctx->rule, MAX_EDIT_ZONE_NUM); - } - if (events & EV_HTTP_REQ_HDR) - { - rewrite_url = execute_replace_rule(session->req->req_spec.uri, strlen(session->req->req_spec.uri), - kZoneRequestUri, rep_ctx->rule, rep_ctx->n_rule); - } - if ((events & EV_HTTP_REQ_HDR) | (events & EV_HTTP_RESP_HDR)) - { - - if (events & EV_HTTP_REQ_HDR) + /* we must determinate the replace action on HTTP header, otherwise, + * the header has been forwarded, only replace the body but not modify header will raise exception */ + if ((events & EV_HTTP_REQ_HDR) || (events & EV_HTTP_RESP_HDR)) { - rep_ctx->replacing = tfe_http_session_request_create(to_write_sess, session->req->req_spec.method, - rewrite_url != NULL ? (char *) evbuffer_pullup(rewrite_url, -1) : session->req->req_spec.uri); - evbuffer_free(rewrite_url); - rewrite_url = NULL; + ctx->rep_ctx = rep_ctx = ALLOC(struct replace_ctx, 1); + rep_ctx->rule = ALLOC(struct replace_rule, MAX_EDIT_ZONE_NUM); + rep_ctx->n_rule = format_replace_rule(ctx->enforce_para, rep_ctx->rule, MAX_EDIT_ZONE_NUM); } else { - rep_ctx->replacing = tfe_http_session_response_create(to_write_sess, session->resp->resp_spec.resp_code); - } - while (1) - { - buff_in = tfe_http_field_iterate(session->req, &interator, &tmp_name); - if (tmp_name.field_id == TFE_HTTP_CONT_LENGTH) - { - continue; - } - if (buff_in != NULL) - { - rewrite_buff = execute_replace_rule(buff_in, strlen(buff_in), - events & EV_HTTP_REQ_HDR ? kZoneRequestHeaders : kZoneResponseHeader, rep_ctx->rule, - rep_ctx->n_rule); - tfe_http_field_write(rep_ctx->replacing, &tmp_name, - rewrite_buff != NULL ? (char *) evbuffer_pullup(rewrite_buff, -1) : buff_in); - evbuffer_free(rewrite_buff); - rewrite_buff = NULL; - - } - else - { - break; - } + TFE_STREAM_LOG_INFO(stream, "Can only setup replace on REQ/RESP headers, detached."); + tfe_http_session_detach(session); return; } } - if ((events & EV_HTTP_REQ_BODY_BEGIN) | (events & EV_HTTP_RESP_BODY_BEGIN)) - { - assert(rep_ctx->http_body == NULL); - assert(rep_ctx->body_size = 0); - rep_ctx->http_body = evbuffer_new(); - } - if (body_frag != NULL) - { - evbuffer_add(rep_ctx->http_body, body_frag, frag_size); - rep_ctx->body_size++; - } - if ((events & EV_HTTP_REQ_BODY_END) | (events & EV_HTTP_RESP_BODY_END)) - { - assert(rep_ctx->body_size == evbuffer_get_length(rep_ctx->http_body)); - buff_in = (char *) evbuffer_pullup(rep_ctx->http_body, -1); - rewrite_buff = execute_replace_rule(buff_in, rep_ctx->body_size, - events & EV_HTTP_REQ_HDR ? kZoneRequestHeaders : kZoneResponseHeader, rep_ctx->rule, rep_ctx->n_rule); - char cont_len_str[TFE_SYMBOL_MAX]; - snprintf(cont_len_str, sizeof(cont_len_str), "%lu", evbuffer_get_length(rewrite_buff)); - _wrap_std_field_write(rep_ctx->replacing, TFE_HTTP_CONT_LENGTH, cont_len_str); - tfe_http_half_append_body(rep_ctx->replacing, - (char *) evbuffer_pullup(rewrite_buff, -1), evbuffer_get_length(rewrite_buff), 0); - evbuffer_free(rewrite_buff); - rewrite_buff = NULL; + struct tfe_http_half * in_req_half = session->req; + struct tfe_http_half * in_resp_half = session->resp; + struct tfe_http_req_spec * in_req_spec = &in_req_half->req_spec; + struct tfe_http_resp_spec * in_resp_spec = &in_resp_half->resp_spec; + + if ((events & EV_HTTP_REQ_HDR) || (events & EV_HTTP_RESP_HDR)) + { + struct evbuffer * rewrite_uri = NULL; if (is_http_request(events)) { + rewrite_uri = execute_replace_rule(in_req_spec->uri, strlen(in_req_spec->uri), + kZoneRequestUri, rep_ctx->rule, rep_ctx->n_rule); + + rep_ctx->replacing = tfe_http_session_request_create(to_write_sess, in_req_spec->method, + rewrite_uri != NULL ? (char *) evbuffer_pullup(rewrite_uri, -1) : in_req_spec->uri); + tfe_http_session_request_set(to_write_sess, rep_ctx->replacing); } else { + rep_ctx->replacing = tfe_http_session_response_create(to_write_sess, in_resp_spec->resp_code); tfe_http_session_response_set(to_write_sess, rep_ctx->replacing); } - rep_ctx->replacing = NULL;//http half's ownership has been transfered to session. - evbuffer_free(rep_ctx->http_body); - rep_ctx->http_body = NULL; - rep_ctx->body_size = 0; + if (rewrite_uri != NULL) + { + evbuffer_free(rewrite_uri); + rewrite_uri = NULL; + } + + enum replace_zone zone = is_http_request(events) ? kZoneRequestHeaders : kZoneResponseHeader; + struct tfe_http_half * in_half = is_http_request(events) ? in_req_half : in_resp_half; + + struct http_field_name in_header_field{}; + const char * in_header_value = NULL; + void * iterator = NULL; + + while (true) + { + if ((in_header_value = tfe_http_field_iterate(in_half, &iterator, &in_header_field)) == NULL) + { + break; + } + + struct evbuffer * rewrite_buff = execute_replace_rule(in_header_value, + strlen(in_header_value), zone, rep_ctx->rule, rep_ctx->n_rule); + + if (rewrite_buff != NULL) + { + tfe_http_field_write(rep_ctx->replacing, &in_header_field, (char *) evbuffer_pullup(rewrite_buff, -1)); + } + else + { + tfe_http_field_write(rep_ctx->replacing, &in_header_field, in_header_value); + } + + if (rewrite_buff != NULL) + { + evbuffer_free(rewrite_buff); + } + } } - return; + if ((events & EV_HTTP_REQ_BODY_BEGIN) || (events & EV_HTTP_RESP_BODY_BEGIN)) + { + assert(rep_ctx->http_body == NULL); + rep_ctx->http_body = evbuffer_new(); + } + + if ((events & EV_HTTP_REQ_BODY_CONT) || (events & EV_HTTP_RESP_BODY_CONT)) + { + evbuffer_add(rep_ctx->http_body, body_frag, frag_size); + } + + if ((events & EV_HTTP_REQ_BODY_END) || (events & EV_HTTP_RESP_BODY_END)) + { + char * __http_body = (char *) evbuffer_pullup(rep_ctx->http_body, -1); + size_t __http_body_len = evbuffer_get_length(rep_ctx->http_body); + + enum replace_zone replace_zone = is_http_request(events) ? kZoneRequestHeaders : kZoneResponseHeader; + struct evbuffer * rewrite_buff; + + if (is_http_request(events)) + { + rewrite_buff = execute_replace_rule(__http_body, __http_body_len, kZoneRequestBody, + rep_ctx->rule, rep_ctx->n_rule); + } + else + { + rewrite_buff = execute_replace_rule(__http_body, __http_body_len, kZoneResponseBody, + rep_ctx->rule, rep_ctx->n_rule); + } + + if (rewrite_buff != NULL) + { + char * __rewrite_buff = (char *) evbuffer_pullup(rewrite_buff, -1); + size_t __sz_rewrite_buff = evbuffer_get_length(rewrite_buff); + tfe_http_half_append_body(rep_ctx->replacing, __rewrite_buff, __sz_rewrite_buff, 0); + } + else + { + tfe_http_half_append_body(rep_ctx->replacing, __http_body, __http_body_len, 0); + } + + if (rewrite_buff != NULL) + { + evbuffer_free(rewrite_buff); + rewrite_buff = NULL; + } + + if (rep_ctx->http_body != NULL) + { + evbuffer_free(rep_ctx->http_body); + rep_ctx->http_body = NULL; + } + } + + if ((events & EV_HTTP_REQ_END) || (events & EV_HTTP_RESP_END)) + { + tfe_http_half_append_body(rep_ctx->replacing, NULL, 0, 0); + rep_ctx->replacing = NULL; + } } + static void http_reject(const struct tfe_http_session * session, enum tfe_http_event events, struct pangu_http_ctx * ctx) { @@ -766,6 +828,7 @@ static void http_reject(const struct tfe_http_session * session, enum tfe_http_e _wrap_std_field_write(response, TFE_HTTP_CONT_LENGTH, cont_len_str); tfe_http_half_append_body(response, page_buff, page_size, 0); + tfe_http_half_append_body(response, NULL, 0, 0); tfe_http_session_response_set(to_write_sess, response); tfe_http_session_detach(session); @@ -800,6 +863,7 @@ static void http_redirect(const struct tfe_http_session * session, enum tfe_http response = tfe_http_session_response_create(to_write, resp_code); _wrap_std_field_write(response, TFE_HTTP_LOCATION, url); + tfe_http_half_append_body(response, NULL, 0, 0); tfe_http_session_response_set(to_write, response); tfe_http_session_detach(session); diff --git a/plugin/protocol/http/include/internal/http_half.h b/plugin/protocol/http/include/internal/http_half.h index 1a9e3fe..2d7d470 100644 --- a/plugin/protocol/http/include/internal/http_half.h +++ b/plugin/protocol/http/include/internal/http_half.h @@ -67,6 +67,7 @@ struct http_half_private /* default stream action */ enum tfe_stream_action stream_action; enum tfe_stream_action user_stream_action; + bool is_user_stream_action_set; /* Setup by User */ bool is_setup_by_user; diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index cada659..eb2ac78 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -56,7 +56,8 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len) { struct http_session_private * hs_private = TAILQ_LAST(&hc_private->hs_private_list, hs_private_list); - struct http_half_private * hf_private_request = to_hf_request_private(hs_private); + struct http_half_private * hf_private_req_in = to_hf_request_private(hs_private); + struct http_half_private * hf_private_req_user; /* tfe_hexdump(stderr, __FUNCTION__, data, (unsigned int)len); */ int ret = 0; @@ -65,12 +66,12 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea /* There is no available in session list, * that indicate all HTTP request has corresponding response, * or the last request is finished, we need to create a new session. */ - if (hs_private == NULL || hf_private_request->message_status == STATUS_COMPLETE) + if (hs_private == NULL || hf_private_req_in->message_status == STATUS_COMPLETE) { /* HTTP Request and Session */ - hf_private_request = hf_private_create(TFE_HTTP_REQUEST, 1, 0); - hs_private = hs_private_create(hc_private, hf_private_request, NULL); - hf_private_set_session(hf_private_request, hs_private); + hf_private_req_in = hf_private_create(TFE_HTTP_REQUEST, 1, 0); + hs_private = hs_private_create(hc_private, hf_private_req_in, NULL); + hf_private_set_session(hf_private_req_in, hs_private); /* Closure, catch stream, session and thread_id */ struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1); @@ -79,7 +80,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea __closure->session = to_hs_public(hs_private); /* Set callback, this callback used to raise business event */ - hf_private_set_callback(hf_private_request, __user_event_dispatch, __closure, free); + hf_private_set_callback(hf_private_req_in, __user_event_dispatch, __closure, free); /* Call business plugin */ hs_private->ht_frame = http_frame_raise_session_begin(stream, &hs_private->hs_public, thread_id); @@ -93,41 +94,57 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea } /* Parse the content, the data which in defered state has been ignored. */ - ret = hf_private_parse(hf_private_request, data, len); + ret = hf_private_parse(hf_private_req_in, data, len); /* Need more data, no boundary touched */ if (ret == 0) { - if (hf_private_request->stream_action == ACTION_DROP_DATA || - hf_private_request->stream_action == ACTION_FORWARD_DATA) + if (hf_private_req_in->stream_action == ACTION_DROP_DATA || + hf_private_req_in->stream_action == ACTION_FORWARD_DATA) { - hf_private_request->parse_cursor = 0; + hf_private_req_in->parse_cursor = 0; } - return hf_private_request->stream_action; + return hf_private_req_in->stream_action; } /* Some kind of error happened, write log and detach the stream */ if (ret == -1) { TFE_STREAM_LOG_ERROR(stream, "Failed at parsing stream as HTTP: %u, %s, %s", - hf_private_request->parse_errno, http_errno_name(hf_private_request->parse_errno), - http_errno_description(hf_private_request->parse_errno)); + hf_private_req_in->parse_errno, http_errno_name(hf_private_req_in->parse_errno), + http_errno_description(hf_private_req_in->parse_errno)); goto __errout; } /* Touch a boundary, such as the end of HTTP headers, bodys, et al. */ - __action_byptes = hf_private_request->parse_cursor; - hf_private_request->parse_cursor = 0; + __action_byptes = hf_private_req_in->parse_cursor; + hf_private_req_in->parse_cursor = 0; - if (hf_private_request->stream_action == ACTION_FORWARD_DATA) + hf_private_req_user = hs_private->hf_private_req_user; + if (hf_private_req_user != NULL && hf_private_req_user->message_status == STATUS_COMPLETE) + { + /* Construct, and write response immediately */ + hf_private_construct(hf_private_req_user); + size_t __to_write_len = evbuffer_get_length(hf_private_req_user->evbuf_raw); + unsigned char * __to_write = evbuffer_pullup(hf_private_req_user->evbuf_raw, __to_write_len); + + /* Write the data to stream, UPSTREAM is the incoming direction for response */ + ret = tfe_stream_write(stream, CONN_DIR_UPSTREAM, __to_write, __to_write_len); + if (unlikely(ret < 0)) { assert(0); } + + hf_private_destory(hf_private_req_user); + hs_private->hf_private_resp_user = NULL; + } + + if (hf_private_req_in->stream_action == ACTION_FORWARD_DATA) { tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_byptes, sizeof(__action_byptes)); return ACTION_FORWARD_DATA; } - if (hf_private_request->stream_action == ACTION_DROP_DATA) + if (hf_private_req_in->stream_action == ACTION_DROP_DATA) { tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_byptes, sizeof(__action_byptes)); return ACTION_DROP_DATA; @@ -158,8 +175,6 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre } hf_private_resp_in = to_hf_response_private(hs_private); - hf_private_resp_user = hs_private->hf_private_resp_user; - /* First time parse http response */ if (hf_private_resp_in == NULL) { @@ -172,41 +187,14 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre hs_private_hf_private_set(hs_private, hf_private_resp_in, TFE_HTTP_RESPONSE); hf_private_set_session(hf_private_resp_in, hs_private); - if (hf_private_resp_user != NULL) - { - /* Set nothing callback, dont call user callback because the data need to be droped */ - hf_private_set_callback(hf_private_resp_in, NULL, NULL, NULL); - /* Drop all data, because the user's response need to be send */ - hf_private_resp_in->stream_action = ACTION_DROP_DATA; - } - else - { - /* Closure, catch stream, session and thread_id */ - struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1); - __closure->thread_id = thread_id; - __closure->stream = stream; - __closure->session = to_hs_public(hs_private); + /* Closure, catch stream, session and thread_id */ + struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1); + __closure->thread_id = thread_id; + __closure->stream = stream; + __closure->session = to_hs_public(hs_private); - /* Set callback, this callback used to raise business event */ - hf_private_set_callback(hf_private_resp_in, __user_event_dispatch, __closure, free); - /* Inherit user stream action, this action can affact session's behavior */ - hf_private_resp_in->user_stream_action = to_hf_request_private(hs_private)->user_stream_action; - } - } - - if (hf_private_resp_user != NULL) - { - /* Construct, and write response immediately */ - hf_private_construct(hf_private_resp_user); - size_t __to_write_len = evbuffer_get_length(hf_private_resp_user->evbuf_raw); - unsigned char * __to_write = evbuffer_pullup(hf_private_resp_user->evbuf_raw, __to_write_len); - - /* Write the data to stream, UPSTREAM is the incoming direction for response */ - ret = tfe_stream_write(stream, CONN_DIR_DOWNSTREAM, __to_write, __to_write_len); - if (unlikely(ret < 0)) { assert(0); } - - hf_private_destory(hf_private_resp_user); - hs_private->hf_private_resp_user = NULL; + /* Set callback, this callback used to raise business event */ + hf_private_set_callback(hf_private_resp_in, __user_event_dispatch, __closure, free); } /* Parse the content, the data which in defered state has been ignored. */ @@ -234,6 +222,22 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre goto __errout; } + hf_private_resp_user = hs_private->hf_private_resp_user; + if (hf_private_resp_user != NULL && hf_private_resp_user->message_status == STATUS_COMPLETE) + { + /* Construct, and write response immediately */ + hf_private_construct(hf_private_resp_user); + size_t __to_write_len = evbuffer_get_length(hf_private_resp_user->evbuf_raw); + unsigned char * __to_write = evbuffer_pullup(hf_private_resp_user->evbuf_raw, __to_write_len); + + /* Write the data to stream, UPSTREAM is the incoming direction for response */ + ret = tfe_stream_write(stream, CONN_DIR_DOWNSTREAM, __to_write, __to_write_len); + if (unlikely(ret < 0)) { assert(0); } + + hf_private_destory(hf_private_resp_user); + hs_private->hf_private_resp_user = NULL; + } + if (hf_private_resp_in->message_status == STATUS_COMPLETE) { http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id); @@ -290,7 +294,7 @@ enum tfe_stream_action http_connection_entry_data(const struct tfe_stream * stre /* Now, we want to identify this stream */ int ret = __http_connection_identify(stream, ht_conn, data, len); - if (ret != 0) goto __detach; + if (ret < 0) goto __detach; /* This is HTTP, try to preempt the stream * It may be failed because other plugin preempted before us */ diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index dc8955f..c9b5f67 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -252,7 +252,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser) hf_public->minor_version = parser->http_minor; /* Copy version to session */ - if(hf_private->session != NULL) + if (hf_private->session != NULL) { to_hs_public(hf_private->session)->major_version = hf_public->major_version; to_hs_public(hf_private->session)->minor_version = hf_public->minor_version; @@ -277,7 +277,15 @@ static int __parser_callback_on_headers_complete(http_parser * parser) hf_private->event_cb(hf_private, EV_HTTP_RESP_HDR, NULL, 0, hf_private->event_cb_user); } - hf_private->stream_action = ACTION_FORWARD_DATA; + if (hf_private->is_user_stream_action_set) + { + hf_private->stream_action = hf_private->user_stream_action; + } + else + { + hf_private->stream_action = ACTION_FORWARD_DATA; + } + return 0; } @@ -423,7 +431,19 @@ const char * hf_ops_field_iterate(const struct tfe_http_half * half, void ** ite int hf_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, int flag) { struct http_half_private * hf_private = to_hf_private(half); - if (hf_private->evbuf_body == NULL) { hf_private->evbuf_body = evbuffer_new(); } + + /* Indicate the body is finished */ + if (buff == NULL && size == 0) + { + hf_private->message_status = STATUS_COMPLETE; + return 0; + } + + if (hf_private->evbuf_body == NULL) + { + hf_private->evbuf_body = evbuffer_new(); + } + return evbuffer_add(hf_private->evbuf_body, buff, size); } @@ -517,7 +537,7 @@ int hf_private_parse(struct http_half_private * hf_private, const unsigned char if (sz_parsed == len) { hf_private->parse_cursor += sz_parsed; - return 0; + return HTTP_PARSER_ERRNO(hf_private->parse_object) == HPE_PAUSED ? 1 : 0; } /* The paused parsar indicate the message boundary has been touched, we should return. @@ -550,19 +570,48 @@ void hs_ops_drop(struct tfe_http_session * session) return; } -void hs_ops_request_set(struct tfe_http_session * session, struct tfe_http_half * req) +// TODO: change the return type to int, there is something happend where -1 returned. +void hs_ops_request_set(struct tfe_http_session * session, struct tfe_http_half * req_user) { - struct http_half_private * hf_private = to_hf_private(req); struct http_session_private * hs_private = to_hs_private(session); + struct http_half_private * hf_in_private = to_hf_request_private(hs_private); + struct http_half_private * hf_user_private = to_hf_private(req_user); - assert(hs_private->hf_private_req_user != NULL); - hs_private->hf_private_req_user = hf_private; + if (hf_in_private != NULL) + { + if (hf_in_private->stream_action == ACTION_DEFER_DATA) + { + hf_in_private->user_stream_action = ACTION_DROP_DATA; + hf_in_private->is_user_stream_action_set = true; + } + else + { + assert(0); + } + } + + assert(hs_private->hf_private_req_user == NULL); + hs_private->hf_private_req_user = hf_user_private; } void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half * resp) { struct http_half_private * hf_private = to_hf_private(resp); struct http_session_private * hs_private = to_hs_private(session); + struct http_half_private * hf_in_private = to_hf_response_private(hs_private); + + if (hf_in_private != NULL) + { + if (hf_in_private->stream_action == ACTION_DEFER_DATA) + { + hf_in_private->user_stream_action = ACTION_DROP_DATA; + hf_in_private->is_user_stream_action_set = true; + } + else + { + assert(0); + } + } assert(hs_private->hf_private_resp_user == NULL); hs_private->hf_private_resp_user = hf_private; @@ -603,7 +652,18 @@ struct tfe_http_session_ops __http_session_ops = void __construct_request_line(struct http_half_private * hf_private) { + enum tfe_http_std_method __std_method = (enum tfe_http_std_method ) hf_private->method_or_status; + const char * __str_method = http_std_method_to_string(__std_method); + if (__str_method == NULL) + { + __str_method = ""; + } + hf_private->major = 1; + hf_private->minor = 1; + + evbuffer_add_printf(hf_private->evbuf_raw, "%s %s HTTP/%d.%d\r\n", + __str_method, hf_private->url_storage, hf_private->major, hf_private->minor); } void __construct_response_line(struct http_half_private * hf_private) @@ -615,6 +675,9 @@ void __construct_response_line(struct http_half_private * hf_private) __str_resp_code = ""; } + hf_private->major = 1; + hf_private->minor = 1; + evbuffer_add_printf(hf_private->evbuf_raw, "HTTP/%d.%d %d %s\r\n", hf_private->major, hf_private->minor, __resp_code, __str_resp_code); }