diff --git a/plugin/protocol/http2/include/internal/http2_common.h b/plugin/protocol/http2/include/internal/http2_common.h index a9f8dbf..c9e07a2 100644 --- a/plugin/protocol/http2/include/internal/http2_common.h +++ b/plugin/protocol/http2/include/internal/http2_common.h @@ -19,7 +19,7 @@ typedef struct RTLogInit2Data_ { char run_log_path[256]; - void *run_log_handle; + void *handle; } RTLogInit2Data; typedef struct Http2Plugin_{ @@ -66,7 +66,7 @@ struct z_stream_st{ BrotliEncoderState *brenc_state; }; -RTLogInit2Data *rt_log_data(); +RTLogInit2Data *logger(); Http2Plugin *http2_plugin(); diff --git a/plugin/protocol/http2/include/internal/http2_stream.h b/plugin/protocol/http2/include/internal/http2_stream.h index d406a2d..01085b1 100644 --- a/plugin/protocol/http2/include/internal/http2_stream.h +++ b/plugin/protocol/http2/include/internal/http2_stream.h @@ -29,6 +29,7 @@ struct data_t { int gzip; uint8_t flags; + ssize_t padlen; struct z_stream_st *inflater; struct z_stream_st *deflate; struct evbuffer * evbuf_body; @@ -42,6 +43,7 @@ struct header_data{ struct http2_headers{ int nvlen; + int complete; uint8_t flag; struct header_data *head, *tail; }; @@ -59,6 +61,7 @@ struct http2_half_private char * url_storage; struct data_t body; struct http2_headers headers; + struct http2_headers promised; enum nghttp2_manage_stage body_state; enum nghttp2_manage_stage message_state; @@ -82,7 +85,10 @@ struct h2_stream_data_t{ int spd_set; int spd_valid; int rse_set; + int set_cnt; int flag_end; + int spd_set_cnt; + int spd_cnt; tfe_http_event spd_event; struct http_frame_session_ctx *frame_ctx; @@ -90,20 +96,16 @@ struct h2_stream_data_t{ struct http2_half_private *resp, *pangu_resp; }; -struct h2_run_id -{ - int num; - int32_t id[128]; -}; - struct tfe_session_info_t { TAILQ_HEAD(list_head, h2_stream_data_t) list; - /* stream type**/ - int state; + int goaway; + + enum tfe_stream_action stream_action; + unsigned int thread_id; - struct h2_run_id h2_id; + /** for down stream as server */ nghttp2_session *as_server; /** for up stream as client*/ diff --git a/plugin/protocol/http2/src/http2_common.cpp b/plugin/protocol/http2/src/http2_common.cpp index ee2cfd8..bd3e368 100644 --- a/plugin/protocol/http2/src/http2_common.cpp +++ b/plugin/protocol/http2/src/http2_common.cpp @@ -21,7 +21,7 @@ RTLogInit2Data logging_sc_lid = { .run_log_level = 1, }; -RTLogInit2Data *rt_log_data() +RTLogInit2Data *logger() { return &logging_sc_lid; }; diff --git a/plugin/protocol/http2/src/http2_plugin.cpp b/plugin/protocol/http2/src/http2_plugin.cpp index eb27935..e784ef9 100644 --- a/plugin/protocol/http2/src/http2_plugin.cpp +++ b/plugin/protocol/http2/src/http2_plugin.cpp @@ -45,16 +45,16 @@ struct event_timer_ctx void load_logging_conf(const char *config) { - RTLogInit2Data *logging_sc_lid = rt_log_data(); + RTLogInit2Data *logging_sc_lid = logger(); MESA_load_profile_int_def(config, (const char *)"http",(const char *)"loglevel", &logging_sc_lid->run_log_level, 10); MESA_load_profile_string_def(config, (const char *)"http",(const char *)"logfile", logging_sc_lid->run_log_path, 128, "log/http2.log"); - logging_sc_lid->run_log_handle = MESA_create_runtime_log_handle(logging_sc_lid->run_log_path, logging_sc_lid->run_log_level); - if(logging_sc_lid->run_log_handle == NULL){ - TFE_LOG_ERROR(logging_sc_lid->run_log_handle, "Create log runtime_log_handle error, init failed!"); + logging_sc_lid->handle = MESA_create_runtime_log_handle(logging_sc_lid->run_log_path, logging_sc_lid->run_log_level); + if(logging_sc_lid->handle == NULL){ + TFE_LOG_ERROR(logging_sc_lid->handle, "Create log runtime_log_handle error, init failed!"); } return; } @@ -87,7 +87,7 @@ http2_plugin_init(struct tfe_proxy * proxy) struct event * event = event_new(ev_base, -1, EV_PERSIST, http2_plugin_timer_gc_cb, &session_info); if (unlikely(event == NULL)){ - TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Create timer error, init failed!"); + TFE_LOG_ERROR(logger()->handle, "Create timer error, init failed!"); } evtimer_add(event, &timer); diff --git a/plugin/protocol/http2/src/http2_stream.cpp b/plugin/protocol/http2/src/http2_stream.cpp index b848fc2..5a47efd 100644 --- a/plugin/protocol/http2/src/http2_stream.cpp +++ b/plugin/protocol/http2/src/http2_stream.cpp @@ -26,6 +26,18 @@ #include #include +/* +--------------------------------------------------------------------------------------------------------------------------------- +|No errors | PROTOCOL_ERROR| INTERNAL_ERROR | FLOW_CONTROL_ERROR| SETTINGS_TIMEOUT | STREAM_CLOSED | FRAME_SIZE_ERROR | +--------------------------------------------------------------------------------------------------------------------------------- +|0x00 | 0x01 | 0x02 | 0x03 | 0x04 | 0x05 | 0x06 | +--------------------------------------------------------------------------------------------------------------------------------- +|REFUSED_STREAM| CANCEL | COMPRESSION_ERROR| CONNECT_ERROR | ENHANCE_YOUR_CALM| INADEQUATE_SECURITY| HTTP_1_1_REQUIRED| +--------------------------------------------------------------------------------------------------------------------------------- +|0x07 | 0x08 | 0x09 | 0x0a | 0x0b | 0x0c | 0x0d | +--------------------------------------------------------------------------------------------------------------------------------- +*/ + static const struct value_string method_vals[] = { {NGHTTP2_METHOD_DELETE, "DELETE"}, @@ -73,6 +85,11 @@ static const struct value_string headers_vals[] = {TFE_HTTP_LAST_MODIFIED, "last-modified"}, }; +typedef enum { + NGHTTP2_USER_SEND = 0x0b, + NGHTTP2_USER_COLSE = 0x0c, +} nghttp2_frame_user_type; + struct user_event_dispatch { const struct tfe_stream *tf_stream; @@ -179,30 +196,68 @@ method_to_str_idx(const char * method) return HTTP2_CONTENT_ENCODING_NONE; } -static void -stream_set_id(struct h2_run_id *rid, int32_t stream_id) +static nghttp2_nv* +nghttp2_nv_packet(struct http2_headers *headers, nghttp2_nv *hdrs) { - int i =0; - for (i = 0; i < rid->num; i++){ - if (rid->id[i] == stream_id){ - return; - } + int nvlen = 0; + struct header_data *header = NULL; + + foreach_headers(headers, header){ + hdrs[nvlen].name = header->nv.name; + hdrs[nvlen].namelen = header->nv.namelen; + hdrs[nvlen].value = header->nv.value; + hdrs[nvlen].valuelen = header->nv.valuelen; + hdrs[nvlen].flags = header->nv.flags; + nvlen++; } - rid->id[rid->num] = stream_id; - rid->num++; + return hdrs; } -static void -nghttp2_stream_disable_rid(struct h2_run_id *rid) +static enum tfe_http_std_method +nghttp2_get_method(struct http2_half_private *half_private) +{ + struct tfe_http_req_spec *req_spec = &(half_private->half_public.req_spec); + + return req_spec->method; +} + +static nghttp2_settings_entry* +nghttp2_iv_packet(nghttp2_settings settings, + nghttp2_settings_entry *out_iv) { int i = 0; - for (i = 0; i < rid->num; i++){ - rid->id[i] = 0; + + nghttp2_settings_entry *iv = settings.iv; + + for (i = 0; i < (int)settings.niv; i++){ + out_iv[i].settings_id = iv[i].settings_id; + out_iv[i].value = iv[i].value; } - rid->num = 0; + return out_iv; +} + +static void +delete_nv_packet_data(struct http2_headers *headers) +{ + struct header_data *header = NULL; + + foreach_headers(headers, header){ + free(header->nv.name); + header->nv.name = NULL; + header->nv.namelen = 0; + + free(header->nv.value); + header->nv.value = NULL; + + header->nv.valuelen = 0; + header->nv.flags = 0; + headers_del(headers, header); + } + headers->nvlen = 0; + headers->flag = 0; + headers->head = headers->tail = NULL; } -/** end */ static int event_dispatch_cb(struct http2_half_private * half_private, enum tfe_http_event ev, const unsigned char * data, size_t len, void * user) { @@ -357,28 +412,6 @@ finish: return xret; } -static void -delete_nv_packet_data(struct http2_headers *headers) -{ - struct header_data *header = NULL; - - foreach_headers(headers, header){ - free(header->nv.name); - header->nv.name = NULL; - header->nv.namelen = 0; - - free(header->nv.value); - header->nv.value = NULL; - - header->nv.valuelen = 0; - header->nv.flags = 0; - headers_del(headers, header); - } - headers->nvlen = 0; - headers->flag = 0; - headers->head = headers->tail = NULL; -} - void delete_stream_half_data(struct http2_half_private **data, int body_flag) { @@ -480,89 +513,6 @@ struct tfe_http_half_ops h2_half_ops = .ops_free = half_ops_free }; -static void -fill_resp_spec_from_handle(struct http2_half_private *half_private) -{ - struct header_data *head = NULL; - struct tfe_http_resp_spec *resp_spec = &(half_private->half_public.resp_spec); - - foreach_headers(&half_private->headers, head){ - if (!strncmp((char *)(head->nv.name), ":status", strlen(":status"))){ - resp_spec->resp_code = atoi((const char *)head->nv.value); - continue; - } - if (!strncmp((char *)(head->nv.name), "content-type", strlen("content-type"))){ - resp_spec->content_type = (const char *)(head->nv.value); - continue; - } - if (!strncmp((char *)(head->nv.name), "content-encoding", strlen("content-encoding"))){ - resp_spec->content_encoding = (const char *)(head->nv.value); - continue; - } - if (!strncmp((char *)(head->nv.name), "content-length", strlen("content-length"))){ - resp_spec->content_length = (const char *)(head->nv.value); - continue; - } - } - resp_spec->content_length = 0; - - return; -} - -static void -fill_req_spec_from_handle(struct http2_half_private *half_private) -{ - int urllen = 0; - struct header_data *head = NULL; - struct tfe_http_req_spec *req_spec = &(half_private->half_public.req_spec); - - foreach_headers(&half_private->headers, head){ - if (!strncmp((char *)(head->nv.name), ":method", strlen(":method"))){ - req_spec->method = (enum tfe_http_std_method)str_to_val((const char *)(head->nv.value), method_vals); - continue; - } - if (!strncmp((char *)(head->nv.name), ":authority", strlen(":authority"))){ - req_spec->host = (const char *)(head->nv.value); - urllen += head->nv.valuelen; - continue; - } - if (!strncmp((char *)(head->nv.name), ":path", strlen(":path"))){ - req_spec->uri = (const char*)(head->nv.value); - urllen += head->nv.valuelen; - continue; - } - } - char *urltmp = NULL; - urltmp = (char *)malloc(urllen + 1); - if(urltmp){ - sprintf(urltmp, "%s%s", (char *)req_spec->host, (char *)req_spec->uri); - req_spec->url = urltmp; - } - - return; -} - -static struct http2_half_private* -tfe_half_private_init(enum tfe_http_direction direction) -{ - struct http2_half_private *half_private = ALLOC(struct http2_half_private, 1); - assert(half_private); - - memset(half_private, 0, sizeof(struct http2_half_private)); - - half_private->half_public.direction = direction; - half_private->half_public.ops = &h2_half_ops; - - headers_init(&half_private->headers); - half_private->body.evbuf_body = evbuffer_new(); - half_private->body.gzip = HTTP2_CONTENT_ENCODING_NONE; - - half_private->body_state = MANAGE_STAGE_INIT; - half_private->message_state = MANAGE_STAGE_INIT; - - return half_private; -} - static struct tfe_http_session* h2_ops_allow_write(const struct tfe_http_session * session) { @@ -619,6 +569,28 @@ void h2_ops_response_set(struct tfe_http_session * session, struct tfe_http_half stream_data->pangu_resp = half_user; } +static struct http2_half_private* +tfe_half_private_init(enum tfe_http_direction direction) +{ + struct http2_half_private *half_private = ALLOC(struct http2_half_private, 1); + assert(half_private); + + memset(half_private, 0, sizeof(struct http2_half_private)); + + half_private->half_public.direction = direction; + half_private->half_public.ops = &h2_half_ops; + + headers_init(&half_private->headers); + half_private->body.evbuf_body = evbuffer_new(); + half_private->body.gzip = HTTP2_CONTENT_ENCODING_NONE; + half_private->body.padlen = 0; + + half_private->body_state = MANAGE_STAGE_INIT; + half_private->message_state = MANAGE_STAGE_INIT; + + return half_private; +} + struct tfe_http_half * h2_ops_request_create(struct tfe_http_session * session, enum tfe_http_std_method method, const char * uri) { @@ -662,769 +634,6 @@ struct tfe_http_session_ops nghttp2_session_ops = .ops_response_create = h2_ops_response_create }; -void -nghttp2_write_access_log(struct h2_stream_data_t *h2_stream, const char * str_stream_info) -{ - /* Request */ - struct http2_half_private *req = h2_stream->req; - /* Response */ - struct http2_half_private *resp = h2_stream->resp; - /* Req-Public */ - struct tfe_http_req_spec *req_spec = req ? &(req->half_public.req_spec) : NULL; - /* Resp-Public */ - struct tfe_http_resp_spec *resp_spec = resp ? &(resp->half_public.resp_spec) : NULL; - - const char * method = req_spec ? val_to_str(req_spec->method, method_vals) : "-"; - const char * url = req_spec ? req_spec->url : "-"; - char resp_code[TFE_STRING_MAX]; - if (resp_spec) - snprintf(resp_code, sizeof(resp_code) - 1, "%d", resp_spec->resp_code); - else - snprintf(resp_code, sizeof(resp_code) - 1, "%s", "-"); - - const char * cont_type = resp_spec ? resp_spec->content_type != NULL ? resp_spec->content_type : "-" : "-"; - const char * cont_encoding = - resp_spec ? resp_spec->content_encoding != NULL ? resp_spec->content_encoding : "-" : "-"; - const char * pangu_req = h2_stream->pangu_req ? "USER/REQ" : "-"; - const char * pangu_resp = h2_stream->pangu_resp ? "USER/RESP" : "-"; - //const char * __str_suspend = h2_stream->suspend_counter > 0 ? "SUSPEND" : "-"; - - char *access_log; - asprintf(&access_log, "%s %d %s %s HTTP2.0 %s %s %s %s %s", str_stream_info, h2_stream->tfe_session.session_id, - method, url, resp_code, cont_type, cont_encoding, pangu_req, pangu_resp); - - TFE_LOG_INFO(rt_log_data()->run_log_handle, "%s", access_log); - free(access_log); -} - -static int tfe_half_session_init(struct h2_stream_data_t *h2_stream, int32_t stream_id, - enum tfe_http_direction direction) -{ - struct tfe_http_session *tfe_session = &h2_stream->tfe_session; - - if (direction == TFE_HTTP_REQUEST){ - struct http2_half_private *req = h2_stream->req; - tfe_session->ops = &nghttp2_session_ops; - tfe_session->req = &req->half_public; - tfe_session->session_id = stream_id; - - } - if (direction == TFE_HTTP_RESPONSE){ - struct http2_half_private *resp = h2_stream->resp; - tfe_session->resp = &resp->half_public; - } - return 0; -} - -void delete_http2_stream_data(struct h2_stream_data_t *h2_stream, - const struct tfe_stream *tf_stream, - int body_flag) -{ - if (tf_stream){ - nghttp2_write_access_log(h2_stream, tf_stream->str_stream_info); - } - - delete_stream_half_data(&h2_stream->req, body_flag); - - delete_stream_half_data(&h2_stream->resp, body_flag); -} - -static void -suspend_start(struct h2_stream_data_t *h2_stream, - struct http2_half_private *half, const struct tfe_stream *stream) -{ - if (h2_stream->spd_valid != 1){ - return; - } - - enum tfe_http_event spd_event = h2_stream->spd_event; - - /* Clean up suspend tag, we can support user's call suspend in this callback */ - h2_stream->spd_event = (enum tfe_http_event)0; - h2_stream->spd_valid = 0; - - /* */ - if (h2_stream->rse_set){ - tfe_stream_resume(stream); - } - - /* Call user callback, tell user we resume from suspend */ - h2_stream->rse_set = 0; - half->event_cb(half, spd_event, NULL, 0, half->event_cb_user); - - return; -} - -static int -suspend_stop(struct h2_stream_data_t *h2_stream, - const struct tfe_stream *tf_stream, enum tfe_conn_dir dir) -{ - int xret = -1; - - if (h2_stream->spd_set){ - h2_stream->spd_valid = 1; - h2_stream->spd_set = 0; - tfe_stream_suspend(tf_stream, dir); - xret = 0; - } - return xret; -} - -static ssize_t -nghttp2_client_send(nghttp2_session *session, const uint8_t *data, - size_t length, int flags, void *user_data) -{ - (void)session; - (void)flags; - int ret = -1; - - struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; - - ret = tfe_stream_write(session_info->tf_stream, CONN_DIR_UPSTREAM, data, length); - if (unlikely(ret < 0)){ - assert(0); - } - return (ssize_t)length; -} - -void nghttp2_disect_goaway(struct tfe_session_info_t *session_info, int server) -{ - unsigned int thread_id = session_info->thread_id; - const struct tfe_stream * stream = session_info->tf_stream; - - struct h2_stream_data_t *h2_stream = NULL; - struct h2_stream_data_t *_h2_stream = NULL; - - TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){ - TAILQ_REMOVE(&session_info->list, h2_stream, next); - if (h2_stream->frame_ctx){ - http_frame_raise_session_end(h2_stream->frame_ctx, stream, &h2_stream->tfe_session, - thread_id); - h2_stream->frame_ctx = NULL; - } - delete_http2_stream_data(h2_stream, session_info->tf_stream, 1); - free(h2_stream); - h2_stream = NULL; - } - if (session_info->as_client && !server){ - nghttp2_session_del(session_info->as_client); - session_info->as_client = NULL; - } - if (session_info->as_server && server){ - nghttp2_session_del(session_info->as_server); - session_info->as_server = NULL; - } -} - -static int -nghttp2_client_on_frame_recv(nghttp2_session *session, - const nghttp2_frame *frame, void *user_data) -{ - struct h2_stream_data_t *h2_stream = NULL; - struct http2_half_private *resp = NULL; - struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; - - switch (frame->hd.type) { - case NGHTTP2_DATA: - h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); - if (!h2_stream) - break; - - resp = h2_stream->resp; - h2_stream->frame_type |= TFE_NGHTTP2_DATA; - stream_set_id(&session_info->h2_id, frame->hd.stream_id); - break; - case NGHTTP2_HEADERS: - if ((frame->headers.cat == NGHTTP2_HCAT_RESPONSE) && - (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS)){ - h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); - if (!h2_stream){ - TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Upstream id %d, can't find stream information(addr = %p)", - frame->hd.stream_id, session_info); - break; - } - resp = h2_stream->resp; - suspend_start(h2_stream, resp, session_info->tf_stream); - fill_resp_spec_from_handle(h2_stream->resp); - - h2_stream->frame_type |= TFE_NGHTTP2_HEADERS; - stream_set_id(&session_info->h2_id, frame->hd.stream_id); - - resp->event_cb(resp, EV_HTTP_RESP_HDR, NULL, 0, resp->event_cb_user); - if (h2_stream->spd_set){ - h2_stream->spd_event = EV_HTTP_RESP_HDR; - } - } - break; - case NGHTTP2_SETTINGS: - break; - case NGHTTP2_WINDOW_UPDATE: - break; - case NGHTTP2_PING: - break; - case NGHTTP2_PRIORITY: - break; - case NGHTTP2_GOAWAY: - nghttp2_disect_goaway(session_info, 1); - TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "Up stream control frame goaway"); - break; - } - return 0; -} - -static int -nghttp2_client_on_data_chunk_recv(nghttp2_session *session, uint8_t flags, - int32_t stream_id, const uint8_t *input, - size_t input_len, void *__attribute__((__unused__))user_data) -{ - size_t len; - char *uncompr = NULL; - int uncompr_len = 0, __attribute__((__unused__))ret = 0; - const unsigned char *data; - struct http2_half_private * resp = NULL; - - struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; - - struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, stream_id); - if (!h2_stream){ - TFE_LOG_ERROR(rt_log_data()->run_log_handle, "On data callback can't get downstream information, id = %d", - stream_id); - goto finish; - } - h2_stream->frame_type |= TFE_NGHTTP2_DATA; - stream_set_id(&session_info->h2_id, stream_id); - - resp = h2_stream->resp; - if (resp->body.gzip != HTTP2_CONTENT_ENCODING_NONE){ - ret = inflate_read(input, input_len, &uncompr, &uncompr_len, - &resp->body.inflater, resp->body.gzip); - if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr > 0){ - input = (const uint8_t*)uncompr; - input_len = uncompr_len; - } - } - data = input; - len = input_len; - - if (resp->body_state == MANAGE_STAGE_INIT){ - if (resp->event_cb) { - resp->event_cb(resp, EV_HTTP_RESP_BODY_BEGIN, NULL, len, - resp->event_cb_user); - } - resp->body.flags = flags; - resp->body_state = MANAGE_STAGE_READING; - } - if (resp->body_state == MANAGE_STAGE_READING){ - if (resp->event_cb) { - resp->event_cb(resp, EV_HTTP_RESP_BODY_CONT, data, len, - resp->event_cb_user); - } - resp->body.flags = flags; - goto event; - } - if (flags & 0x01){ - if (resp->event_cb) { - resp->event_cb(resp, EV_HTTP_RESP_BODY_END, NULL, 0, - resp->event_cb_user); - } - if (resp->event_cb) { - resp->event_cb(resp, EV_HTTP_RESP_END, NULL, 0, - resp->event_cb_user); - } - - resp->body.flags = flags; - resp->body_state = MANAGE_STAGE_COMPLETE; - resp->message_state = MANAGE_STAGE_COMPLETE; - } -event: -finish: - return 0; -} - -static int -nghttp2_client_on_stream_close(nghttp2_session *session, int32_t stream_id, - uint32_t error_code, void *user_data) -{ - (void)error_code; - struct h2_stream_data_t *h2_stream; - struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; - - if (error_code != 0) - TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "Up stream abnormal exit, id = %d, error_code = %d", - stream_id, error_code); - - h2_stream = TAILQ_LIST_FIND(session_info, stream_id); - if (!h2_stream) { - return 0; - } - - struct http2_half_private *resp = h2_stream->resp; - if (error_code == 0 && resp->body_state != MANAGE_STAGE_COMPLETE){ - if (resp->body_state != MANAGE_STAGE_INIT){ - if (resp->event_cb) { - resp->event_cb(resp, EV_HTTP_RESP_BODY_END, NULL, 0, - resp->event_cb_user); - } - if (resp->event_cb) { - resp->event_cb(resp, EV_HTTP_RESP_END, NULL, 0, - resp->event_cb_user); - } - } - resp->body_state = MANAGE_STAGE_COMPLETE; - resp->message_state = MANAGE_STAGE_COMPLETE; - h2_stream->frame_type |= TFE_NGHTTP2_DATA; - stream_set_id(&session_info->h2_id, stream_id); - }else{ - TAILQ_REMOVE(&session_info->list, h2_stream, next); - delete_http2_stream_data(h2_stream, session_info->tf_stream, 1); - free(h2_stream); - h2_stream = NULL; - } - - return 0; -} - -static int -nghttp2_client_on_header(nghttp2_session *session, - const nghttp2_frame *frame, const uint8_t *name, - size_t namelen, const uint8_t *value, - size_t valuelen, uint8_t flags, void *user_data) -{ - (void)session; - (void)flags; - - struct header_data *head = NULL; - struct http2_headers *headers = NULL; - struct h2_stream_data_t *h2_stream = NULL; - struct http2_half_private *resp = NULL; - enum tfe_http_std_field field_id = TFE_HTTP_UNKNOWN_FIELD; - struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; - - switch (frame->hd.type) { - case NGHTTP2_HEADERS: - h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); - if (!h2_stream){ - TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Client Stream id %d, can't find stream information(addr = %p)", - frame->hd.stream_id, session_info); - break; - } - resp = h2_stream->resp; - head = ALLOC(struct header_data, 1); - head->nv.name = (uint8_t *)tfe_strdup((const char *)name); - head->nv.namelen = namelen; - head->nv.value = (uint8_t *)tfe_strdup((const char *)value); - head->nv.valuelen = valuelen; - head->nv.flags = flags; - field_id = (enum tfe_http_std_field)str_to_val((const char *)name, headers_vals); - if (field_id == -1){ - head->field.field_id = TFE_HTTP_UNKNOWN_FIELD; - head->field.field_name = (const char *)head->nv.name; - }else{ - if (field_id == TFE_HTTP_CONT_ENCODING){ - resp->body.gzip = method_to_str_idx((const char *)value); - } - head->field.field_id = field_id; - head->field.field_name = NULL; - } - headers = &resp->headers; - headers->flag = frame->hd.flags; - headers_add_tail(headers, head); - - break; - case NGHTTP2_PUSH_PROMISE: - if (frame->headers.cat != NGHTTP2_HCAT_REQUEST) - break; - h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); - if (!h2_stream){ - TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Client Stream id %d, can't find stream information(addr = %p)", - frame->hd.stream_id, session_info); - break; - } -#ifdef __DEBUG_TEST - printf("%d, %s %s\n",frame->hd.stream_id, name, value); -#endif - resp = h2_stream->resp; - head = ALLOC(struct header_data, 1); - head->nv.name = (uint8_t *)tfe_strdup((const char *)name); - head->nv.namelen = namelen; - head->nv.value = (uint8_t *)tfe_strdup((const char *)value); - head->nv.valuelen = valuelen; - head->nv.flags = flags; - field_id = (enum tfe_http_std_field)str_to_val((const char *)name, headers_vals); - if (field_id == -1){ - head->field.field_id = TFE_HTTP_UNKNOWN_FIELD; - }else{ - if (field_id == TFE_HTTP_CONT_ENCODING){ - resp->body.gzip = method_to_str_idx((const char *)value); - } - head->field.field_id = field_id; - head->field.field_name = (const char *)head->nv.name; - } - headers = &resp->headers; - headers->flag = frame->hd.flags; - headers_add_tail(headers, head); - h2_stream->frame_type |= TFE_NGHTTP2_PUSH_PROMISE; - stream_set_id(&session_info->h2_id, frame->hd.stream_id); - default: - break; - } - return 0; -} - -static struct h2_stream_data_t* -create_upstream_data(nghttp2_session *session, int32_t stream_id, - struct tfe_session_info_t *session_info) -{ - struct h2_stream_data_t *h2_stream = NULL; - struct user_event_dispatch *event = NULL; - - h2_stream = TAILQ_LIST_FIND(session_info, stream_id); - if (h2_stream == NULL){ - /** todo:When the data of the reply is pushed as promised, - there is no stream id at the reply end. to create it*/ - goto finish; - } - if (h2_stream->resp){ - goto finish; - } - - h2_stream->resp = tfe_half_private_init(TFE_HTTP_RESPONSE); - tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_RESPONSE); - - event = ALLOC(struct user_event_dispatch, 1); - assert(event); - event->thread_id = session_info->thread_id; - event->tf_stream = session_info->tf_stream; - event->tfe_session = &h2_stream->tfe_session; - - half_set_callback(h2_stream->resp, event, NULL); - - h2_stream->resp->frame_ctx = h2_stream->frame_ctx; - - nghttp2_session_set_stream_user_data(session, stream_id, h2_stream); - -finish: - return h2_stream; -} - -static int -nghttp2_client_on_begin_headers(nghttp2_session * session, - const nghttp2_frame * frame, - void * user_data) -{ - (void)session; - - struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; - switch(frame->hd.type){ - case NGHTTP2_HEADERS: - create_upstream_data(session, frame->hd.stream_id, session_info); - break; - default: - break; - } - return 0; -} - -static -void client_session_init(struct tfe_session_info_t *session_info) -{ - nghttp2_session_callbacks *callbacks; - - nghttp2_session_callbacks_new(&callbacks); - - nghttp2_session_callbacks_set_send_callback(callbacks, - nghttp2_client_send); - - nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, - nghttp2_client_on_frame_recv); - - nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, - nghttp2_client_on_data_chunk_recv); - - nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, - nghttp2_client_on_stream_close); - - nghttp2_session_callbacks_set_on_header_callback(callbacks, - nghttp2_client_on_header); - - nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, - nghttp2_client_on_begin_headers); - - nghttp2_session_client_new(&session_info->as_client, callbacks, session_info); - - session_info->as_client->local_window_size = NGHTTP2_MAX_WINDOW_SIZE; - session_info->as_client->local_settings.initial_window_size = NGHTTP2_MAX_WINDOW_SIZE; - - nghttp2_session_callbacks_del(callbacks); -} - -static ssize_t -nghttp2_server_send(nghttp2_session *session, const uint8_t *data, - size_t length, int flags, void *user_data) -{ - - (void)session; - (void)flags; - - int ret = -1; - struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; - - ret = tfe_stream_write(session_info->tf_stream, CONN_DIR_DOWNSTREAM, data, length); - if (unlikely(ret < 0)){ - assert(0); - } - - return (ssize_t)length; -} - -static int -nghttp2_server_on_frame_recv(nghttp2_session *session, - const nghttp2_frame *frame, void *user_data) -{ - struct h2_stream_data_t *h2_stream = NULL; - struct http2_half_private *req = NULL; - struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; - - h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); - if (!h2_stream){ - return 0; - } - switch(frame->hd.type){ - case NGHTTP2_HEADERS: - if (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS){ - req = h2_stream->req; - - suspend_start(h2_stream, req, session_info->tf_stream); - fill_req_spec_from_handle(h2_stream->req); - - req->event_cb(req, EV_HTTP_REQ_HDR, NULL, 0, req->event_cb_user); - if (h2_stream->spd_set){ - h2_stream->spd_event = EV_HTTP_REQ_HDR; - } - } - - case NGHTTP2_DATA: - /* Check that the client request has finished */ - if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { - /* For DATA and HEADERS frame, this callback may be called after - on_stream_close_callback. Check that stream still alive. */ - } - break; - case NGHTTP2_SETTINGS: - break; - case NGHTTP2_WINDOW_UPDATE: - break; - case NGHTTP2_PING: - break; - case NGHTTP2_PRIORITY: - break; - case NGHTTP2_GOAWAY: - nghttp2_disect_goaway(session_info, 0); - TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "Down stream control frame goaway"); - break; - default: - break; - } - return 0; -} - -/* ---------------------------------------------------------------------------------------------------------------------------------- -|No errors | PROTOCOL_ERROR| INTERNAL_ERROR | FLOW_CONTROL_ERROR| SETTINGS_TIMEOUT | STREAM_CLOSED | FRAME_SIZE_ERROR | ---------------------------------------------------------------------------------------------------------------------------------- -|0x00 | 0x01 | 0x02 | 0x03 | 0x04 | 0x05 | 0x06 | ---------------------------------------------------------------------------------------------------------------------------------- -|REFUSED_STREAM| CANCEL | COMPRESSION_ERROR| CONNECT_ERROR | ENHANCE_YOUR_CALM| INADEQUATE_SECURITY| HTTP_1_1_REQUIRED| ---------------------------------------------------------------------------------------------------------------------------------- -|0x07 | 0x08 | 0x09 | 0x0a | 0x0b | 0x0c | 0x0d | ---------------------------------------------------------------------------------------------------------------------------------- -*/ -static int -nghttp2_server_on_stream_close(nghttp2_session *session, int32_t stream_id, - uint32_t error_code, void *user_data) -{ - (void)error_code; - - struct h2_stream_data_t *h2_stream = NULL; - struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; - - h2_stream = TAILQ_LIST_FIND(session_info, stream_id); - if (!h2_stream) { - return 0; - } - if (error_code != 0) - TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "Down stream abnormal exit, id = %d, error_code = %d", - stream_id, error_code); - return 0; -} - -static int -nghttp2_server_on_header(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, - size_t namelen, const uint8_t *value, - size_t valuelen, uint8_t flags, void *user_data) -{ - enum tfe_http_std_field field_id; - struct header_data *head = NULL; - struct http2_headers *headers = NULL; - struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; - - switch (frame->hd.type){ - case NGHTTP2_HEADERS: - if (frame->headers.cat != NGHTTP2_HCAT_REQUEST) - break; - struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); - if (!h2_stream){ - TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Stream id %d, can't find stream information(addr = %p)", - frame->hd.stream_id, session_info); - break; - } - head = ALLOC(struct header_data, 1); - head->nv.name = (uint8_t *)tfe_strdup((const char *)name); - head->nv.namelen = namelen; - head->nv.value = (uint8_t *)tfe_strdup((const char *)value);; - head->nv.valuelen = valuelen; - head->nv.flags = flags; - field_id = (enum tfe_http_std_field)str_to_val((const char *)name, headers_vals); - if (field_id == -1){ - head->field.field_id = TFE_HTTP_UNKNOWN_FIELD; - head->field.field_name = (const char *)head->nv.name; - }else{ - head->field.field_id = field_id; - head->field.field_name = NULL; - } - headers = &h2_stream->req->headers; - headers->flag = frame->hd.flags; - headers_add_tail(headers, head); - h2_stream->frame_type |= TFE_NGHTTP2_HEADERS; - stream_set_id(&session_info->h2_id, frame->hd.stream_id); - break; - } - return 0; -} - -static void -create_serv_stream_data(nghttp2_session *session, int32_t stream_id, - struct tfe_session_info_t *session_info) -{ - struct h2_stream_data_t *h2_stream = NULL; - struct user_event_dispatch *event = NULL; - struct http2_half_private *half_private = NULL; - - h2_stream = TAILQ_LIST_FIND(session_info, stream_id); - if (h2_stream != NULL){ - goto finish; - } - - h2_stream = (struct h2_stream_data_t *)ALLOC(struct h2_stream_data_t, 1); - assert(h2_stream); - memset(h2_stream, 0, sizeof(struct h2_stream_data_t)); - h2_stream->stream_id = stream_id; - - h2_stream->req = tfe_half_private_init(TFE_HTTP_REQUEST); - tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_REQUEST); - - event = ALLOC(struct user_event_dispatch, 1); - assert(event); - event->thread_id = session_info->thread_id; - event->tf_stream = session_info->tf_stream; - event->tfe_session = &h2_stream->tfe_session; - - half_set_callback(h2_stream->req, event, NULL); - - /* Call business plugin */ - half_private = h2_stream->req; - - half_private->frame_ctx = http_frame_alloc(); - if (half_private->frame_ctx == NULL){ - TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Failed at raising session begin event. "); - goto finish; - } - http_frame_raise_session_begin(half_private->frame_ctx, session_info->tf_stream, - &h2_stream->tfe_session, session_info->thread_id); - h2_stream->frame_ctx = half_private->frame_ctx; - TAILQ_INSERT_TAIL(&session_info->list, h2_stream, next); - nghttp2_session_set_stream_user_data(session, stream_id, h2_stream); -finish: - return; -} - -static int -nghttp2_server_on_begin_headers(nghttp2_session *session, - const nghttp2_frame *frame, - void *user_data) -{ - struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; - - if (frame->hd.type != NGHTTP2_HEADERS || - frame->headers.cat != NGHTTP2_HCAT_REQUEST) { - return 0; - } - create_serv_stream_data(session, frame->hd.stream_id, session_info); - - return 0; -} - -static void -server_session_init(struct tfe_session_info_t *session_info) -{ - nghttp2_session_callbacks *callbacks; - - nghttp2_session_callbacks_new(&callbacks); - - nghttp2_session_callbacks_set_send_callback(callbacks, nghttp2_server_send); - - nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, - nghttp2_server_on_frame_recv); - - nghttp2_session_callbacks_set_on_stream_close_callback( - callbacks, nghttp2_server_on_stream_close); - - nghttp2_session_callbacks_set_on_header_callback(callbacks, - nghttp2_server_on_header); - - nghttp2_session_callbacks_set_on_begin_headers_callback( - callbacks, nghttp2_server_on_begin_headers); - - nghttp2_session_server_new(&session_info->as_server, callbacks, session_info); - - session_info->as_server->local_window_size = NGHTTP2_MAX_WINDOW_SIZE; - session_info->as_server->local_settings.initial_window_size = NGHTTP2_MAX_WINDOW_SIZE; - - nghttp2_session_callbacks_del(callbacks); -} - -static void -delete_server_session_data(struct tfe_session_info_t *session_info) -{ - struct h2_stream_data_t *h2_stream; - struct h2_stream_data_t *_h2_stream; - - nghttp2_session_del(session_info->as_server); - session_info->as_server = NULL; - - TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){ - TAILQ_REMOVE(&session_info->list, h2_stream, next); - free(h2_stream); - h2_stream = NULL; - } -} - -static void -delete_client_session_data(struct tfe_session_info_t *session_info) -{ - struct h2_stream_data_t *h2_stream = NULL; - struct h2_stream_data_t *_h2_stream; - - nghttp2_session_del(session_info->as_client); - session_info->as_client = NULL; - - TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){ - TAILQ_REMOVE(&session_info->list, h2_stream, next); - free(h2_stream); - h2_stream = NULL; - } -} - static ssize_t upstream_read_callback(nghttp2_session *session, int32_t stream_id, uint8_t *buf, size_t length, @@ -1459,253 +668,367 @@ finish: } static enum tfe_stream_action -server_frame_submit_data(struct tfe_session_info_t *session_info, struct h2_stream_data_t **h2_stream, - uint16_t *nghttp2_type) +nghttp2_server_frame_submit_response(struct tfe_session_info_t *session_info, + struct h2_stream_data_t *h2_stream) { - enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; - struct http2_half_private *resp = (*h2_stream)->resp; - - if ((*h2_stream)->pangu_resp){ - stream_action = ACTION_DROP_DATA; - *nghttp2_type |= TFE_NGHTTP2_RESPONSE; - }else{ - stream_action = ACTION_FORWARD_DATA; - struct data_t *body = &resp->body; - if (body->flags == NGHTTP2_FLAG_END_STREAM && - resp->message_state == MANAGE_STAGE_COMPLETE){ - TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "Data stream exit, id = %d", (*h2_stream)->stream_id); - - TAILQ_REMOVE(&session_info->list, *h2_stream, next); - delete_http2_stream_data(*h2_stream, session_info->tf_stream, 1); - free(*h2_stream); - *h2_stream = NULL; - } - } - return stream_action; -} - -static nghttp2_nv* -nghttp2_nv_packet(struct http2_headers *headers, nghttp2_nv *hdrs) -{ - int nvlen = 0; - struct header_data *header = NULL; - - foreach_headers(headers, header){ - hdrs[nvlen].name = header->nv.name; - hdrs[nvlen].namelen = header->nv.namelen; - hdrs[nvlen].value = header->nv.value; - hdrs[nvlen].valuelen = header->nv.valuelen; - hdrs[nvlen].flags = header->nv.flags; - nvlen++; - } - return hdrs; -} - -static void -downstream_create_resp(struct h2_stream_data_t *h2_stream, nghttp2_session *as_client, - const struct tfe_stream *tf_stream, unsigned int thread_id) -{ - struct user_event_dispatch *event = NULL; - - if (h2_stream->resp) - goto finish; - h2_stream->resp = tfe_half_private_init(TFE_HTTP_RESPONSE); - tfe_half_session_init(h2_stream, h2_stream->stream_id, TFE_HTTP_RESPONSE); - - event = ALLOC(struct user_event_dispatch, 1); - assert(event); - event->thread_id = thread_id; - event->tf_stream = tf_stream; - event->tfe_session = &h2_stream->tfe_session; - - half_set_callback(h2_stream->resp, event, NULL); - - h2_stream->resp->frame_ctx = h2_stream->frame_ctx; - - nghttp2_session_set_stream_user_data(as_client, h2_stream->stream_id, h2_stream); -finish: - return; -} - -static void -tfe_make_nv(struct http2_headers *headers, - const char *name,const char *value, int flag) -{ - /*Add head*/ - struct header_data *head = NULL; - - head = ALLOC(struct header_data, 1); - head->nv.name = (uint8_t *)tfe_strdup((const char *)name); - head->nv.namelen = strlen(name); - - head->nv.value = (uint8_t *)tfe_strdup((const char *)value);; - head->nv.valuelen = strlen(value); - - headers->flag = 0x04; - if (flag) - headers_add_head(headers, head); - else - headers_add_tail(headers, head); - -} - -static enum tfe_stream_action -tfe_submit_response(struct tfe_session_info_t *session_info, - struct h2_stream_data_t *h2_stream) -{ - enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; - struct http2_half_private *resp = h2_stream->pangu_resp; -#define VALUE_LEN 128 - char value[VALUE_LEN] = {0}; - - snprintf(value, VALUE_LEN, "%d", resp->method_or_status); - tfe_make_nv(&resp->headers, ":status", (const char *)value, 1); - - snprintf(value, VALUE_LEN, "tfe/%s", tfe_version()); - tfe_make_nv(&resp->headers, "X-TG-Construct-By", (const char *)value, 0); - - h2_stream->frame_type |= TFE_NGHTTP2_RESPONSE; - stream_set_id(&session_info->h2_id, h2_stream->stream_id); - - stream_action = nghttp2_server_mem_send(session_info); - if (stream_action == ACTION_DROP_DATA){ - stream_action = (enum tfe_stream_action)ACTION_USER_DATA; - } - - return stream_action; -} - -static enum tfe_stream_action -nghttp2_client_frame_submit_header(struct tfe_session_info_t *session_info, - struct h2_stream_data_t *h2_stream) -{ - int32_t stream_id = -1; + int rv = -1; nghttp2_nv hdrs[128] = {0}; - struct http2_headers *headers = NULL; - struct http2_half_private *req = NULL; - enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; + struct http2_headers *headers = NULL; + struct http2_half_private *pangu_resp = NULL; - if (0 == suspend_stop(h2_stream, session_info->tf_stream, CONN_DIR_DOWNSTREAM)){ - stream_action = ACTION_DEFER_DATA; - goto finish; + pangu_resp = h2_stream->pangu_resp; + + if (pangu_resp->message_state != MANAGE_STAGE_COMPLETE){ + return (enum tfe_stream_action)ACTION_USER_DATA; } - req = h2_stream->pangu_req != NULL ? h2_stream->pangu_req : h2_stream->req; - if (req == NULL){ - stream_action = ACTION_FORWARD_DATA; - goto finish; + headers = &pangu_resp->headers; + if (headers->nvlen <= 0) + return ACTION_FORWARD_DATA; + + struct data_t *body = &pangu_resp->body; + char str_sz_evbuf_body[TFE_STRING_MAX]; + snprintf(str_sz_evbuf_body, sizeof(str_sz_evbuf_body) - 1, "%lu", evbuffer_get_length(body->evbuf_body)); + + const static struct http_field_name __cont_encoding_length_name = {TFE_HTTP_CONT_LENGTH, NULL}; + tfe_http_field_write(&pangu_resp->half_public, &__cont_encoding_length_name, str_sz_evbuf_body); + + nghttp2_data_provider data_prd; + data_prd.source.ptr = (void *)body; + data_prd.read_callback = upstream_read_callback; + + rv = nghttp2_submit_response(session_info->as_server, h2_stream->stream_id, nghttp2_nv_packet(headers, hdrs), + headers->nvlen, &data_prd); + if (rv != 0){ + return ACTION_FORWARD_DATA; } - - if (h2_stream->pangu_resp){ - stream_action = tfe_submit_response(session_info, h2_stream); - goto finish; - } - - headers = &req->headers; - if (headers->nvlen <= 0){ - stream_action = ACTION_FORWARD_DATA; - goto finish; - } - /*Create C' half_private_resp**/ - downstream_create_resp(h2_stream, session_info->as_client, session_info->tf_stream, session_info->thread_id); - nghttp2_session_set_next_stream_id(session_info->as_client, h2_stream->stream_id); - - stream_id = nghttp2_submit_request(session_info->as_client, NULL, - nghttp2_nv_packet(headers, hdrs), - headers->nvlen, NULL, h2_stream); - if (stream_id < 0){ - TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Could not submit request: %s", - nghttp2_strerror(stream_id)); - stream_action = ACTION_FORWARD_DATA; - goto finish; - } - - /*clear headers data ***/ delete_nv_packet_data(headers); - stream_action = ACTION_DROP_DATA; -finish: - return stream_action; + return ACTION_DROP_DATA; } static enum tfe_stream_action -nghttp2_client_frame_submit_settings() +server_frame_submit_data(struct tfe_session_info_t *session_info, + struct h2_stream_data_t *h2_stream, + enum tfe_conn_dir dir) { - return ACTION_FORWARD_DATA; -} + enum tfe_stream_action stream_action = ACTION_DROP_DATA; -enum tfe_stream_action -nghttp2_client_frame_submit(struct tfe_session_info_t *session_info) -{ - int i = 0; - struct h2_run_id *h2_id = &session_info->h2_id; - struct h2_stream_data_t *nghttp2_stream = NULL; - enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; + struct http2_half_private *resp = (dir == CONN_DIR_UPSTREAM) ? h2_stream->resp : h2_stream->req; + nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client; - for(i = 0; i < h2_id->num; i++){ - nghttp2_stream = TAILQ_LIST_FIND(session_info, h2_id->id[i]); - if (NULL == nghttp2_stream) - break; - if (nghttp2_stream->frame_type & TFE_NGHTTP2_HEADERS){ - stream_action = nghttp2_client_frame_submit_header(session_info, nghttp2_stream); - nghttp2_stream->frame_type &= ~TFE_NGHTTP2_HEADERS; - } - if (nghttp2_stream->frame_type & TFE_NGHTTP2_DATA){ - nghttp2_stream->frame_type &= ~TFE_NGHTTP2_DATA; - } - if (nghttp2_stream->frame_type & TFE_NGHTTP2_SETTINGS){ - stream_action = nghttp2_client_frame_submit_settings(); - nghttp2_stream->frame_type &= ~TFE_NGHTTP2_SETTINGS; - } - if (nghttp2_stream->frame_type & TFE_NGHTTP2_PUSH_PROMISE){ - nghttp2_stream->frame_type &= ~TFE_NGHTTP2_PUSH_PROMISE; + if (h2_stream->pangu_resp){ + stream_action = nghttp2_server_frame_submit_response(session_info, h2_stream); + }else{ + int rv = -1; + struct data_t *body = &resp->body; + + nghttp2_data_provider data_prd; + data_prd.source.ptr = (void *)body; + data_prd.read_callback = upstream_read_callback; + + //printf("body->flags = %d\n", body->flags); + + rv = nghttp2_submit_data(session, body->flags, + h2_stream->stream_id, &data_prd); + if (rv != 0){ + stream_action = ACTION_FORWARD_DATA; + printf("Fatal data error: %s\n", nghttp2_strerror(rv)); } } - nghttp2_stream_disable_rid(&session_info->h2_id); - + //printf("submit data %d action = %d\n", h2_stream != NULL ? h2_stream->stream_id : NULL, stream_action); return stream_action; } -enum tfe_stream_action -nghttp2_client_mem_send(struct tfe_session_info_t *session_info) +typedef int (*nghttp2_frame_callback) (struct tfe_session_info_t *, const nghttp2_frame *, + enum tfe_conn_dir dir); + +typedef int (*nghttp2_callback) (nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, + size_t namelen, const uint8_t *value, + size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir); + +static int +nghttp2_submit_frame_priority(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, + enum tfe_conn_dir dir) { int xret = -1; - enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; + enum tfe_stream_action stream_action = ACTION_DROP_DATA; - stream_action = nghttp2_client_frame_submit(session_info); - if (stream_action == ACTION_DROP_DATA){ - xret = nghttp2_session_send(session_info->as_client); - if (xret != 0) { - stream_action = ACTION_FORWARD_DATA; - TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Fatal downstream send error: %s\n", - nghttp2_strerror(xret)); - } - goto finish; + const nghttp2_priority *priority = &frame->priority; + nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client; + + int rv = nghttp2_submit_priority(session, priority->hd.flags, priority->hd.stream_id, + &(priority->pri_spec)); + if (rv != 0){ + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit ping error: %s\n", + dir, nghttp2_strerror(rv)); + goto finish; + } + xret = nghttp2_session_send(session); + if (xret != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", + dir, nghttp2_strerror(xret)); } - if (stream_action == ACTION_USER_DATA) - stream_action = ACTION_DROP_DATA; - finish: - return stream_action; + TFE_LOG_INFO(logger()->handle, "%s, %d, submit priority, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, + dir, frame->hd.stream_id, session_info->stream_action); + session_info->stream_action = stream_action; + return 0; } -static enum tfe_stream_action -nghttp2_server_frame_submit_header(struct h2_stream_data_t *h2_stream, - uint16_t *nghttp2_type) +static int +nghttp2_submit_frame_rst_stream(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, + enum tfe_conn_dir dir) { - enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; + int xret = -1; + enum tfe_stream_action stream_action = ACTION_DROP_DATA; - if (h2_stream->pangu_resp == NULL){ + const nghttp2_rst_stream *rst_stream = &frame->rst_stream; + nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client; + + int rv = nghttp2_submit_rst_stream(session, rst_stream->hd.flags, + rst_stream->hd.stream_id, rst_stream->error_code); + if (rv != 0){ + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit rst error: %s\n", + dir, nghttp2_strerror(rv)); + goto finish; + } + xret = nghttp2_session_send(session); + if (xret != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", + dir, nghttp2_strerror(xret)); + } +finish: + session_info->stream_action = stream_action; + TFE_LOG_INFO(logger()->handle, "%s, %d, submit rst stream, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, + dir, frame->hd.stream_id, session_info->stream_action); + return 0; +} + +static int +nghttp2_submit_frame_settings(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, + enum tfe_conn_dir dir) +{ + int xret = -1, rv = -1; + nghttp2_settings_entry iv[6] = {0}; + enum tfe_stream_action stream_action = ACTION_DROP_DATA; + + nghttp2_settings settings = frame->settings; + nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client; + + if(settings.hd.flags == NGHTTP2_FLAG_ACK){ stream_action = ACTION_FORWARD_DATA; goto finish; } - - if (h2_stream->pangu_resp != NULL){ - *nghttp2_type |= TFE_NGHTTP2_RESPONSE; - stream_action = ACTION_DROP_DATA; + rv = nghttp2_submit_settings(session, settings.hd.flags, + nghttp2_iv_packet(settings, iv), settings.niv); + if (rv != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit settings error: %s\n", + dir, nghttp2_strerror(rv)); goto finish; + } + xret = nghttp2_session_send(session); + if (xret != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", + dir, nghttp2_strerror(xret)); } finish: - return stream_action; + session_info->stream_action = stream_action; + TFE_LOG_INFO(logger()->handle, "%s, %d, submit setting, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, + dir, frame->hd.stream_id, session_info->stream_action); + return 0; +} + +static int +nghttp2_submit_frame_ping(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, + enum tfe_conn_dir dir) +{ + int xret = -1 ,rv = -1; + enum tfe_stream_action stream_action = ACTION_DROP_DATA; + + const nghttp2_ping *ping = &frame->ping; + nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client; + + //if(ping->hd.flags & NGHTTP2_FLAG_ACK){ + // stream_action = ACTION_FORWARD_DATA; + // goto finish; + //} + rv = nghttp2_submit_ping(session, ping->hd.flags, + ping->opaque_data); + if (rv != 0){ + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit ping error: %s\n", + dir, nghttp2_strerror(rv)); + goto finish; + } + xret = nghttp2_session_send(session); + if (xret != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", + dir, nghttp2_strerror(xret)); + } +finish: + session_info->stream_action = stream_action; + TFE_LOG_INFO(logger()->handle, "%s, %d, submit ping, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, + dir, frame->hd.stream_id, session_info->stream_action); + return 0; +} + +void delete_http2_stream_data(struct h2_stream_data_t *h2_stream, + const struct tfe_stream *tf_stream, + int body_flag) +{ + if (tf_stream){ + //nghttp2_write_access_log(h2_stream, tf_stream->str_stream_info); + } + + delete_stream_half_data(&h2_stream->req, body_flag); + + delete_stream_half_data(&h2_stream->resp, body_flag); +} + +void nghttp2_disect_goaway(struct tfe_session_info_t *session_info) +{ + unsigned int thread_id = session_info->thread_id; + const struct tfe_stream * stream = session_info->tf_stream; + + struct h2_stream_data_t *h2_stream = NULL; + struct h2_stream_data_t *_h2_stream = NULL; + + TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){ + TAILQ_REMOVE(&session_info->list, h2_stream, next); + if (h2_stream->frame_ctx){ + http_frame_raise_session_end(h2_stream->frame_ctx, stream, &h2_stream->tfe_session, + thread_id); + h2_stream->frame_ctx = NULL; + } + delete_http2_stream_data(h2_stream, session_info->tf_stream, 1); + free(h2_stream); + h2_stream = NULL; + } + if (session_info->as_client){ + nghttp2_session_del(session_info->as_client); + session_info->as_client = NULL; + } + if (session_info->as_server){ + nghttp2_session_del(session_info->as_server); + session_info->as_server = NULL; + } +} + +static int +nghttp2_submit_frame_goaway(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, + enum tfe_conn_dir dir) +{ + int xret = -1; + enum tfe_stream_action stream_action = ACTION_DROP_DATA; + char error[1024] = {0}; + + const nghttp2_goaway *goaway = &frame->goaway; + nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client; + + int rv = nghttp2_submit_goaway(session, goaway->hd.flags, goaway->last_stream_id, + goaway->error_code, goaway->opaque_data, goaway->opaque_data_len); + if (rv != 0){ + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit goaway error: %s\n", + dir, nghttp2_strerror(rv)); + goto finish; + } + xret = nghttp2_session_send(session); + if (xret != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", + dir, nghttp2_strerror(xret)); + } +finish: + snprintf(error, goaway->opaque_data_len, "%s", goaway->opaque_data); + TFE_LOG_INFO(logger()->handle, "%s, %d, submit goaway, stream_id:%d, action:%d, errod_code:%d, data:%s", session_info->tf_stream->str_stream_info, + dir, goaway->last_stream_id, session_info->stream_action, goaway->error_code, goaway->opaque_data); + + session_info->goaway = 1; + session_info->stream_action = stream_action; + return 0; +} + +static int +nghttp2_submit_frame_window_update(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, + enum tfe_conn_dir dir) +{ + int xret = -1; + enum tfe_stream_action stream_action = ACTION_DROP_DATA; + + const nghttp2_window_update *window_update = &(frame->window_update); + nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client; + + int rv = nghttp2_submit_window_update(session, window_update->hd.flags,window_update->hd.stream_id, + window_update->window_size_increment); + if (rv != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit window error: %s\n", + dir, nghttp2_strerror(rv)); + goto finish; + } + xret = nghttp2_session_send(session); + if (xret != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", + dir, nghttp2_strerror(xret)); + } +finish: + session_info->stream_action = stream_action; + TFE_LOG_INFO(logger()->handle, "%s, %d, submit window update, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, + dir, frame->hd.stream_id, session_info->stream_action); + return 0; +} + +static int +nghttp2_set_padlen(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, + enum tfe_conn_dir dir) +{ + struct http2_half_private *resp = NULL; + struct h2_stream_data_t *h2_stream = NULL; + + if (dir == CONN_DIR_DOWNSTREAM) + goto finish; + if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM){ + h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session_info->as_client, + frame->hd.stream_id); + if (!h2_stream){ + TFE_LOG_ERROR(logger()->handle, "Upstream id %d, can't find stream information(addr = %p)", + frame->hd.stream_id, session_info); + goto finish; + } + resp = h2_stream->resp; + resp->body.padlen = frame->data.padlen; + } + +finish: + return 0; +} + +static int tfe_half_session_init(struct h2_stream_data_t *h2_stream, int32_t stream_id, + enum tfe_http_direction direction) +{ + struct tfe_http_session *tfe_session = &h2_stream->tfe_session; + + if (direction == TFE_HTTP_REQUEST){ + struct http2_half_private *req = h2_stream->req; + tfe_session->ops = &nghttp2_session_ops; + tfe_session->req = &req->half_public; + tfe_session->session_id = stream_id; + + } + if (direction == TFE_HTTP_RESPONSE){ + struct http2_half_private *resp = h2_stream->resp; + tfe_session->resp = &resp->half_public; + } + return 0; } static void @@ -1746,9 +1069,9 @@ finish: return; } -static enum tfe_stream_action __attribute__((__unused__)) +static enum tfe_stream_action nghttp2_server_frame_submit_push_promise(struct tfe_session_info_t *session_info, - struct h2_stream_data_t *h2_stream) + struct h2_stream_data_t *h2_stream) { int32_t stream_id = -1; nghttp2_nv hdrs[128] = {0}; @@ -1761,7 +1084,7 @@ nghttp2_server_frame_submit_push_promise(struct tfe_session_info_t *session_info if (resp == NULL) goto finish; - headers = &resp->headers; + headers = &resp->promised; if (headers->nvlen <= 0) goto finish; @@ -1783,147 +1106,1228 @@ nghttp2_server_frame_submit_push_promise(struct tfe_session_info_t *session_info /*clean header message **/ delete_nv_packet_data(headers); stream_action = ACTION_DROP_DATA; - finish: return stream_action; } -static enum tfe_stream_action -nghttp2_server_frame_submit_settings() +static int +nghttp2_submit_frame_push_promise(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, + enum tfe_conn_dir dir) { - return ACTION_FORWARD_DATA; + int xret = -1; + struct h2_stream_data_t *h2_stream = NULL; + enum tfe_stream_action stream_action = ACTION_DROP_DATA; + + if (dir == CONN_DIR_DOWNSTREAM) + goto finish; + + h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session_info->as_client, + frame->hd.stream_id); + if (!h2_stream){ + TFE_LOG_ERROR(logger()->handle, "Upstream id %d, can't find stream information(addr = %p)", + frame->hd.stream_id, session_info); + goto finish; + } + + stream_action = nghttp2_server_frame_submit_push_promise(session_info, h2_stream); + if (stream_action == ACTION_DROP_DATA){ + xret = nghttp2_session_send(session_info->as_server); + if (xret != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", + dir, nghttp2_strerror(xret)); + } + } + session_info->stream_action = stream_action; +finish: + TFE_LOG_INFO(logger()->handle, "%s, %d, submit push promise, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, + dir, frame->hd.stream_id, session_info->stream_action); + return 0; +} + +static void +suspend_start(struct h2_stream_data_t *h2_stream, + struct http2_half_private *half, const struct tfe_stream *stream) +{ + if (h2_stream->spd_valid != 1){ + return; + } + + tfe_stream_resume(stream); + + enum tfe_http_event spd_event = h2_stream->spd_event; + + h2_stream->spd_event = (enum tfe_http_event)0; + h2_stream->spd_valid = 0; + h2_stream->spd_set_cnt--; + h2_stream->spd_cnt++; + + /* Call user callback, tell user we resume from suspend */ + h2_stream->rse_set = 0; + half->event_cb(half, spd_event, NULL, 0, half->event_cb_user); + + return; +} + +static void +fill_resp_spec_from_handle(struct http2_half_private *half_private) +{ + struct header_data *head = NULL; + struct tfe_http_resp_spec *resp_spec = &(half_private->half_public.resp_spec); + + foreach_headers(&half_private->headers, head){ + if (!strncmp((char *)(head->nv.name), ":status", strlen(":status"))){ + resp_spec->resp_code = atoi((const char *)head->nv.value); + continue; + } + if (!strncmp((char *)(head->nv.name), "content-type", strlen("content-type"))){ + resp_spec->content_type = (const char *)(head->nv.value); + continue; + } + if (!strncmp((char *)(head->nv.name), "content-encoding", strlen("content-encoding"))){ + resp_spec->content_encoding = (const char *)(head->nv.value); + continue; + } + if (!strncmp((char *)(head->nv.name), "content-length", strlen("content-length"))){ + resp_spec->content_length = (const char *)(head->nv.value); + continue; + } + } + resp_spec->content_length = 0; + + return; } static enum tfe_stream_action -nghttp2_server_frame_submit_response(struct tfe_session_info_t *session_info, - struct h2_stream_data_t *h2_stream) +nghttp2_server_frame_submit_header(struct tfe_session_info_t *session_info, + struct h2_stream_data_t *h2_stream) { - int rv = -1; + int32_t stream_id = 0; nghttp2_nv hdrs[128] = {0}; - struct http2_headers *headers = NULL; - struct http2_half_private *pangu_resp = NULL; + struct http2_headers *headers = NULL; + struct http2_half_private *resp = NULL; + enum tfe_stream_action stream_action = ACTION_DROP_DATA; - pangu_resp = h2_stream->pangu_resp; - if (pangu_resp == NULL) - return ACTION_FORWARD_DATA; - - if (pangu_resp->message_state != MANAGE_STAGE_COMPLETE){ - session_info->state = 0; - return ACTION_DROP_DATA; + if (h2_stream->pangu_resp != NULL){ + stream_action = (enum tfe_stream_action)ACTION_USER_DATA; + goto finish; } - headers = &pangu_resp->headers; - if (headers->nvlen <= 0) - return ACTION_FORWARD_DATA; - - struct data_t *body = &pangu_resp->body; - char str_sz_evbuf_body[TFE_STRING_MAX]; - snprintf(str_sz_evbuf_body, sizeof(str_sz_evbuf_body) - 1, "%lu", evbuffer_get_length(body->evbuf_body)); - - const static struct http_field_name __cont_encoding_length_name = {TFE_HTTP_CONT_LENGTH, NULL}; - tfe_http_field_write(&pangu_resp->half_public, &__cont_encoding_length_name, str_sz_evbuf_body); - - nghttp2_data_provider data_prd; - data_prd.source.ptr = (void *)body; - data_prd.read_callback = upstream_read_callback; - - rv = nghttp2_submit_response(session_info->as_server, h2_stream->stream_id, nghttp2_nv_packet(headers, hdrs), - headers->nvlen, &data_prd); - if (rv != 0){ + resp = h2_stream->resp; + if (resp == NULL){ return ACTION_FORWARD_DATA; } + headers = &resp->headers; + if (headers->nvlen <= 0){ + return ACTION_FORWARD_DATA; + } + stream_id = nghttp2_submit_headers(session_info->as_server, headers->flag, + h2_stream->stream_id, NULL, nghttp2_nv_packet(headers, hdrs), + headers->nvlen, h2_stream); + if (stream_id < 0){ + printf("Fatal headers error: %s\n", nghttp2_strerror(stream_id)); + } delete_nv_packet_data(headers); - - return ACTION_DROP_DATA; -} - -enum tfe_stream_action -nghttp2_server_frame_submit(struct tfe_session_info_t *session_info) -{ - int i = 0; - struct h2_stream_data_t *nghttp2_stream = NULL; - enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; - - struct h2_run_id *h2_id = &session_info->h2_id; - - for (i = 0; i < h2_id->num; i++){ - nghttp2_stream = TAILQ_LIST_FIND(session_info, h2_id->id[i]); - if (NULL == nghttp2_stream) - break; - if (nghttp2_stream->frame_type & TFE_NGHTTP2_HEADERS){ - stream_action = nghttp2_server_frame_submit_header(nghttp2_stream, &nghttp2_stream->frame_type); - nghttp2_stream->frame_type &= ~TFE_NGHTTP2_HEADERS; - } - if (nghttp2_stream->frame_type & TFE_NGHTTP2_DATA){ - stream_action = server_frame_submit_data(session_info, &nghttp2_stream, &nghttp2_stream->frame_type); - if (nghttp2_stream) - nghttp2_stream->frame_type &= ~TFE_NGHTTP2_DATA; - else - continue; - } - if (nghttp2_stream->frame_type & TFE_NGHTTP2_SETTINGS){ - stream_action = nghttp2_server_frame_submit_settings(); - nghttp2_stream->frame_type &= ~TFE_NGHTTP2_SETTINGS; - } - if (nghttp2_stream->frame_type & TFE_NGHTTP2_PUSH_PROMISE){ - /*Because a data stream contains multiple stream ids, resulting in data drop, - there is no processing of the committed push frame at present. - Under the condition that the main process is not affected, - the committed push frame is not encapsulated ***/ - //stream_action = nghttp2_server_frame_submit_push_promise(session_info, nghttp2_stream); - nghttp2_stream->frame_type &= ~TFE_NGHTTP2_PUSH_PROMISE; - } - if (nghttp2_stream->frame_type & TFE_NGHTTP2_RESPONSE){ - stream_action = nghttp2_server_frame_submit_response(session_info, nghttp2_stream); - nghttp2_stream->frame_type &= ~TFE_NGHTTP2_RESPONSE; - } - } - nghttp2_stream_disable_rid(&session_info->h2_id); +finish: return stream_action; } -enum tfe_stream_action -nghttp2_server_mem_send(struct tfe_session_info_t *session_info) +int +nghttp2_headers_write_log(struct h2_stream_data_t *h2_stream, const char * str_stream_info, + int dir) +{ + /* Request */ + struct http2_half_private *req = h2_stream->req; + /* Response */ + struct http2_half_private *resp = h2_stream->resp; + /* Req-Public */ + struct tfe_http_req_spec *req_spec = req ? &(req->half_public.req_spec) : NULL; + /* Resp-Public */ + struct tfe_http_resp_spec *resp_spec = resp ? &(resp->half_public.resp_spec) : NULL; + + const char * method = req_spec ? val_to_str(req_spec->method, method_vals) : "-"; + const char * url = req_spec ? req_spec->url : "-"; + char resp_code[TFE_STRING_MAX]; + if (resp_spec) + snprintf(resp_code, sizeof(resp_code) - 1, "%d", resp_spec->resp_code); + else + snprintf(resp_code, sizeof(resp_code) - 1, "%s", "-"); + + const char * cont_type = resp_spec ? resp_spec->content_type != NULL ? resp_spec->content_type : "-" : "-"; + const char * cont_encoding = + resp_spec ? resp_spec->content_encoding != NULL ? resp_spec->content_encoding : "-" : "-"; + + const char *hmsg = (dir == CONN_DIR_UPSTREAM) ? "response" : "request"; + + char *access_log; + asprintf(&access_log, "%s %d %s stream_id:%d %s %s HTTP2.0 %s %s %s", str_stream_info, dir, hmsg, h2_stream->tfe_session.session_id, + method, url, resp_code, cont_type, cont_encoding); + + TFE_LOG_INFO(logger()->handle, "%s", access_log); + free(access_log); + return 0; +} + +static int +nghttp2_server_submit_header(struct tfe_session_info_t *session_info, int32_t stream_id) +{ + int xret = -1; + struct http2_half_private *resp = NULL; + struct h2_stream_data_t *h2_stream = NULL; + + enum tfe_stream_action stream_action = ACTION_DROP_DATA; + + h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session_info->as_client, + stream_id); + if (!h2_stream){ + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "Upstream id %d, can't find stream information(addr = %p)", + stream_id, session_info); + goto finish; + } + resp = h2_stream->resp; + suspend_start(h2_stream, resp, session_info->tf_stream); + fill_resp_spec_from_handle(h2_stream->resp); + + resp->event_cb(resp, EV_HTTP_RESP_HDR, NULL, 0, resp->event_cb_user); + if (h2_stream->spd_set){ + h2_stream->spd_event = EV_HTTP_RESP_HDR; + + h2_stream->spd_valid = 1; + h2_stream->spd_set = 0; + h2_stream->spd_set_cnt++; + h2_stream->spd_cnt++; + tfe_stream_suspend(session_info->tf_stream, CONN_DIR_UPSTREAM); + stream_action = ACTION_DEFER_DATA; + goto finish; + } + nghttp2_headers_write_log(h2_stream,session_info->tf_stream->str_stream_info, CONN_DIR_UPSTREAM); + + stream_action = nghttp2_server_frame_submit_header(session_info, h2_stream); + if (stream_action == ACTION_DROP_DATA){ + xret = nghttp2_session_send(session_info->as_server); + if (xret != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret)); + } + } + if (stream_action == ACTION_USER_DATA) + stream_action = ACTION_DROP_DATA; +finish: + session_info->stream_action = stream_action; + return 0; +} + +static void +fill_req_spec_from_handle(struct http2_half_private *half_private) +{ + int urllen = 0; + struct header_data *head = NULL; + struct tfe_http_req_spec *req_spec = &(half_private->half_public.req_spec); + + foreach_headers(&half_private->headers, head){ + if (!strncmp((char *)(head->nv.name), ":method", strlen(":method"))){ + req_spec->method = (enum tfe_http_std_method)str_to_val((const char *)(head->nv.value), method_vals); + continue; + } + if (!strncmp((char *)(head->nv.name), ":authority", strlen(":authority"))){ + req_spec->host = (const char *)(head->nv.value); + urllen += head->nv.valuelen; + continue; + } + if (!strncmp((char *)(head->nv.name), ":path", strlen(":path"))){ + req_spec->uri = (const char*)(head->nv.value); + urllen += head->nv.valuelen; + continue; + } + } + char *urltmp = NULL; + urltmp = (char *)malloc(urllen + 1); + if(urltmp){ + sprintf(urltmp, "%s%s", (char *)req_spec->host, (char *)req_spec->uri); + req_spec->url = urltmp; + } + + return; +} + +static int +suspend_stop(struct h2_stream_data_t *h2_stream, + const struct tfe_stream *tf_stream, enum tfe_conn_dir dir) +{ + int xret = -1; + + if (h2_stream->spd_set){ + h2_stream->spd_valid = 1; + h2_stream->spd_set = 0; + tfe_stream_suspend(tf_stream, dir); + xret = 0; + } + return xret; +} + +static void +tfe_make_nv(struct http2_headers *headers, + const char *name,const char *value, int flag) +{ + /*Add head*/ + struct header_data *head = NULL; + + head = ALLOC(struct header_data, 1); + head->nv.name = (uint8_t *)tfe_strdup((const char *)name); + head->nv.namelen = strlen(name); + + head->nv.value = (uint8_t *)tfe_strdup((const char *)value);; + head->nv.valuelen = strlen(value); + + headers->flag = 0x04; + if (flag) + headers_add_head(headers, head); + else + headers_add_tail(headers, head); + +} + +static enum tfe_stream_action +tfe_submit_response(struct tfe_session_info_t *session_info, + struct h2_stream_data_t *h2_stream) { int xret = -1; enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; + struct http2_half_private *resp = h2_stream->pangu_resp; +#define VALUE_LEN 128 + char value[VALUE_LEN] = {0}; - stream_action = nghttp2_server_frame_submit(session_info); - if (stream_action == ACTION_DROP_DATA - && session_info->state){ - xret = nghttp2_session_send(session_info->as_server); - if (xret != 0) { + snprintf(value, VALUE_LEN, "%d", resp->method_or_status); + tfe_make_nv(&resp->headers, ":status", (const char *)value, 1); + + snprintf(value, VALUE_LEN, "tfe/%s", tfe_version()); + tfe_make_nv(&resp->headers, "X-TG-Construct-By", (const char *)value, 0); + + stream_action = nghttp2_server_frame_submit_response(session_info, h2_stream); + if (stream_action == ACTION_DROP_DATA){ + xret = nghttp2_session_send(session_info->as_client); + if (xret != 0) { stream_action = ACTION_FORWARD_DATA; - TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Fatal upstream send error: %s\n", - nghttp2_strerror(xret)); + TFE_LOG_ERROR(logger()->handle, "Fatal downstream send error: %s\n", + nghttp2_strerror(xret)); } } - session_info->state = 1; + if (stream_action == ACTION_DROP_DATA){ + stream_action = (enum tfe_stream_action)ACTION_USER_DATA; + } return stream_action; } +static void +downstream_create_resp(struct h2_stream_data_t *h2_stream, nghttp2_session *as_client, + const struct tfe_stream *tf_stream, unsigned int thread_id) +{ + struct user_event_dispatch *event = NULL; + + if (h2_stream->resp) + goto finish; + h2_stream->resp = tfe_half_private_init(TFE_HTTP_RESPONSE); + tfe_half_session_init(h2_stream, h2_stream->stream_id, TFE_HTTP_RESPONSE); + + event = ALLOC(struct user_event_dispatch, 1); + assert(event); + event->thread_id = thread_id; + event->tf_stream = tf_stream; + event->tfe_session = &h2_stream->tfe_session; + + half_set_callback(h2_stream->resp, event, NULL); + + h2_stream->resp->frame_ctx = h2_stream->frame_ctx; + + nghttp2_session_set_stream_user_data(as_client, h2_stream->stream_id, h2_stream); +finish: + return; +} + +static enum tfe_stream_action +nghttp2_client_frame_submit_header(struct tfe_session_info_t *session_info, + struct h2_stream_data_t *h2_stream) +{ + int32_t stream_id = -1; + nghttp2_nv hdrs[128] = {0}; + struct http2_headers *headers = NULL; + struct http2_half_private *req = NULL; + enum tfe_http_std_method method = (enum tfe_http_std_method)NGHTTP2_METHOD_UNKNOWN; + enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; + + if (0 == suspend_stop(h2_stream, session_info->tf_stream, CONN_DIR_DOWNSTREAM)){ + stream_action = ACTION_DEFER_DATA; + goto finish; + } + req = h2_stream->pangu_req != NULL ? h2_stream->pangu_req : h2_stream->req; + if (req == NULL){ + stream_action = ACTION_FORWARD_DATA; + goto finish; + } + if (h2_stream->pangu_resp){ + stream_action = tfe_submit_response(session_info, h2_stream); + goto finish; + } + + headers = &req->headers; + if (headers->nvlen <= 0){ + stream_action = ACTION_FORWARD_DATA; + goto finish; + } + /*Create C' half_private_resp**/ + downstream_create_resp(h2_stream, session_info->as_client, session_info->tf_stream, session_info->thread_id); + nghttp2_session_set_next_stream_id(session_info->as_client, h2_stream->stream_id); + + method = nghttp2_get_method(h2_stream->req); + if (method == (enum tfe_http_std_method)NGHTTP2_METHOD_POST || + method == (enum tfe_http_std_method)NGHTTP2_METHOD_PUT){ + stream_id = nghttp2_submit_headers(session_info->as_client, headers->flag, + -1, NULL, nghttp2_nv_packet(headers, hdrs), + headers->nvlen, h2_stream); + }else{ + stream_id = nghttp2_submit_request(session_info->as_client, NULL, + nghttp2_nv_packet(headers, hdrs), + headers->nvlen, NULL, h2_stream); + } + + if (stream_id < 0){ + TFE_LOG_ERROR(logger()->handle, "Could not submit request: %s", + nghttp2_strerror(stream_id)); + stream_action = ACTION_FORWARD_DATA; + goto finish; + } + + /*clear headers data ***/ + delete_nv_packet_data(headers); + + stream_action = ACTION_DROP_DATA; +finish: + return stream_action; +} + +static int +nghttp2_client_submit_header(struct tfe_session_info_t *session_info, int32_t stream_id) +{ + int xret = -1; + struct http2_half_private *req = NULL; + struct h2_stream_data_t *h2_stream = NULL; + + enum tfe_stream_action stream_action = ACTION_DROP_DATA; + + h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session_info->as_server, + stream_id); + if (!h2_stream){ + stream_action = ACTION_FORWARD_DATA; + goto finish; + } + req = h2_stream->req; + suspend_start(h2_stream, req, session_info->tf_stream); + fill_req_spec_from_handle(h2_stream->req); + + req->event_cb(req, EV_HTTP_REQ_HDR, NULL, 0, req->event_cb_user); + if (h2_stream->spd_set){ + h2_stream->spd_event = EV_HTTP_REQ_HDR; + + h2_stream->spd_valid = 1; + h2_stream->spd_set = 0; + h2_stream->spd_set_cnt++; + h2_stream->spd_cnt++; + tfe_stream_suspend(session_info->tf_stream, CONN_DIR_DOWNSTREAM); + stream_action = ACTION_DEFER_DATA; + goto finish; + } + nghttp2_headers_write_log(h2_stream, session_info->tf_stream->str_stream_info, CONN_DIR_DOWNSTREAM); + + stream_action = nghttp2_client_frame_submit_header(session_info, h2_stream); + if (stream_action == ACTION_DROP_DATA){ + xret = nghttp2_session_send(session_info->as_client); + if (xret != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "Fatal downstream send error: %s\n", + nghttp2_strerror(xret)); + } + } + if (stream_action == ACTION_USER_DATA) + stream_action = ACTION_DROP_DATA; + +finish: + session_info->stream_action = stream_action; + return 0; +} + +static int +nghttp2_submit_frame_header(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, + enum tfe_conn_dir dir) +{ + int xret = 0; + + if (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS){ + if (dir == CONN_DIR_UPSTREAM){ + xret = nghttp2_server_submit_header(session_info, frame->hd.stream_id); + TFE_LOG_INFO(logger()->handle, "%s, %d, submit response header, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, + dir, frame->hd.stream_id, session_info->stream_action); + } + if (dir == CONN_DIR_DOWNSTREAM){ + xret = nghttp2_client_submit_header(session_info, frame->hd.stream_id); + TFE_LOG_INFO(logger()->handle, "%s, %d, submit request header, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, + dir, frame->hd.stream_id, session_info->stream_action); + } + } + return xret; +} + +nghttp2_frame_callback nghttp2_frame_callback_array[] = { + [NGHTTP2_DATA] = nghttp2_set_padlen, + [NGHTTP2_HEADERS] = nghttp2_submit_frame_header, + [NGHTTP2_PRIORITY] = nghttp2_submit_frame_priority, + [NGHTTP2_RST_STREAM] = nghttp2_submit_frame_rst_stream, + [NGHTTP2_SETTINGS] = nghttp2_submit_frame_settings, + [NGHTTP2_PUSH_PROMISE] = nghttp2_submit_frame_push_promise, + [NGHTTP2_PING] = nghttp2_submit_frame_ping, + [NGHTTP2_GOAWAY] = nghttp2_submit_frame_goaway, + [NGHTTP2_WINDOW_UPDATE] = nghttp2_submit_frame_window_update, + [NGHTTP2_CONTINUATION] = NULL, + [NGHTTP2_ALTSVC] = NULL, +}; + +static int +nghttp2_fill_up_header(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, + size_t namelen, const uint8_t *value, size_t valuelen, + uint8_t flags, void *user_data, enum tfe_conn_dir dir) +{ + enum tfe_http_std_field field_id = TFE_HTTP_UNKNOWN_FIELD; + struct header_data *head = NULL; + struct http2_headers *headers = NULL; + + if (dir == CONN_DIR_DOWNSTREAM && + frame->headers.cat != NGHTTP2_HCAT_REQUEST){ + return 0; + } + struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); + if (!h2_stream){ + TFE_LOG_ERROR(logger()->handle, "Header stream id %d, can't find stream information", + frame->hd.stream_id); + return 0; + } + struct http2_half_private *half = (dir == CONN_DIR_UPSTREAM) ? h2_stream->resp : h2_stream->req; + + head = ALLOC(struct header_data, 1); + head->nv.name = (uint8_t *)tfe_strdup((const char *)name); + head->nv.namelen = namelen; + head->nv.value = (uint8_t *)tfe_strdup((const char *)value);; + head->nv.valuelen = valuelen; + head->nv.flags = flags; + field_id = (enum tfe_http_std_field)str_to_val((const char *)name, headers_vals); + + if (field_id == -1){ + head->field.field_id = TFE_HTTP_UNKNOWN_FIELD; + head->field.field_name = (const char *)head->nv.name; + }else{ + if (field_id == TFE_HTTP_CONT_ENCODING){ + half->body.gzip = method_to_str_idx((const char *)value); + } + head->field.field_id = field_id; + head->field.field_name = NULL; + } + headers = &half->headers; + headers->flag = frame->hd.flags; + headers_add_tail(headers, head); + return 0; +} + +static int +nghttp2_fill_up_promise(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, + size_t namelen, const uint8_t *value, + size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir) +{ + struct header_data *head = NULL; + struct http2_headers *headers = NULL; + struct http2_half_private *resp = NULL; + enum tfe_http_std_field field_id = TFE_HTTP_UNKNOWN_FIELD; + + if (dir == CONN_DIR_DOWNSTREAM) + return 0; + + struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); + if (!h2_stream){ + TFE_LOG_ERROR(logger()->handle, "Promise stream id %d, can't find stream information", + frame->hd.stream_id); + return 0; + } + resp = h2_stream->resp; + head = ALLOC(struct header_data, 1); + head->nv.name = (uint8_t *)tfe_strdup((const char *)name); + head->nv.namelen = namelen; + head->nv.value = (uint8_t *)tfe_strdup((const char *)value); + head->nv.valuelen = valuelen; + head->nv.flags = flags; + field_id = (enum tfe_http_std_field)str_to_val((const char *)name, headers_vals); + if (field_id == -1){ + head->field.field_id = TFE_HTTP_UNKNOWN_FIELD; + head->field.field_name = (const char *)head->nv.name; + }else{ + if (field_id == TFE_HTTP_CONT_ENCODING){ + resp->body.gzip = method_to_str_idx((const char *)value); + } + head->field.field_id = field_id; + head->field.field_name = NULL; + } + headers = &resp->promised; + headers->flag = frame->hd.flags; + headers_add_tail(headers, head); + return 0; +} + +static int +nghttp2_data_send(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *data, + size_t length, const uint8_t *value, + size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir) +{ + int ret = -1; + + struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; + + ret = tfe_stream_write(session_info->tf_stream, dir, data, length); + if (unlikely(ret < 0)){ + assert(0); + } + return (ssize_t)length; +} + +static void +submit_end_data(struct tfe_session_info_t *session_info, + struct h2_stream_data_t *h2_stream) +{ + int xret = -1; + enum tfe_stream_action stream_action = ACTION_DROP_DATA; + + struct http2_half_private *resp = h2_stream->resp; + if (resp->body_state != MANAGE_STAGE_INIT){ + if (resp->event_cb) { + resp->event_cb(resp, EV_HTTP_RESP_BODY_END, NULL, 0, + resp->event_cb_user); + } + if (resp->event_cb) { + resp->event_cb(resp, EV_HTTP_RESP_END, NULL, 0, + resp->event_cb_user); + } + } + struct data_t *body = &resp->body; + body->flags |= NGHTTP2_FLAG_END_STREAM; + resp->body_state = MANAGE_STAGE_COMPLETE; + resp->message_state = MANAGE_STAGE_COMPLETE; + + stream_action = server_frame_submit_data(session_info, h2_stream, CONN_DIR_UPSTREAM); + if (stream_action == ACTION_DROP_DATA){ + xret = nghttp2_session_send(session_info->as_server); + if (xret != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret)); + } + } + TFE_LOG_INFO(logger()->handle, "%s, 1, End of stream submit, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, + h2_stream->stream_id, session_info->stream_action); + + if (stream_action == ACTION_USER_DATA) + stream_action = ACTION_DROP_DATA; + session_info->stream_action = stream_action; + return; +} + +static int +nghttp2_on_stream_close(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, + size_t namelen, const uint8_t *value, + size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir) +{ + struct h2_stream_data_t *h2_stream = NULL; + struct http2_half_private *resp = NULL; + + int32_t stream_id = frame->hd.stream_id; + uint32_t error_code = frame->goaway.error_code; + + struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; + + h2_stream = TAILQ_LIST_FIND(session_info, stream_id); + if (!h2_stream) + return 0; + if (error_code != 0){ + const char *str = (dir == CONN_DIR_UPSTREAM) ? "Upstream" : "Downstream"; + TFE_LOG_DEBUG(logger()->handle, "%s close, id = %d, error_code = %d", str, + stream_id, error_code); + } + if (dir == CONN_DIR_DOWNSTREAM) + goto finish; + + resp = h2_stream->resp; + if (error_code == 0 && resp->body_state != MANAGE_STAGE_COMPLETE){ + submit_end_data(session_info, h2_stream); + goto end; + } +finish: + TAILQ_REMOVE(&session_info->list, h2_stream, next); + delete_http2_stream_data(h2_stream, session_info->tf_stream, 1); + free(h2_stream); + h2_stream = NULL; +end: + return 0; +} + +nghttp2_callback nghttp2_callback_array[] = { + [NGHTTP2_DATA] = NULL, + [NGHTTP2_HEADERS] = nghttp2_fill_up_header, + [NGHTTP2_PRIORITY] = NULL, + [NGHTTP2_RST_STREAM] = NULL, + [NGHTTP2_SETTINGS] = NULL, + [NGHTTP2_PUSH_PROMISE] = nghttp2_fill_up_promise, + [NGHTTP2_PING] = NULL, + [NGHTTP2_GOAWAY] = NULL, + [NGHTTP2_WINDOW_UPDATE] = NULL, + [NGHTTP2_CONTINUATION] = NULL, + [NGHTTP2_ALTSVC] = NULL, + [NGHTTP2_USER_SEND] = nghttp2_data_send, + [NGHTTP2_USER_COLSE] = nghttp2_on_stream_close +}; + +void +nghttp2_write_access_log(struct h2_stream_data_t *h2_stream, const char * str_stream_info) +{ + /* Request */ + struct http2_half_private *req = h2_stream->req; + /* Response */ + struct http2_half_private *resp = h2_stream->resp; + /* Req-Public */ + struct tfe_http_req_spec *req_spec = req ? &(req->half_public.req_spec) : NULL; + /* Resp-Public */ + struct tfe_http_resp_spec *resp_spec = resp ? &(resp->half_public.resp_spec) : NULL; + + const char * method = req_spec ? val_to_str(req_spec->method, method_vals) : "-"; + const char * url = req_spec ? req_spec->url : "-"; + char resp_code[TFE_STRING_MAX]; + if (resp_spec) + snprintf(resp_code, sizeof(resp_code) - 1, "%d", resp_spec->resp_code); + else + snprintf(resp_code, sizeof(resp_code) - 1, "%s", "-"); + + const char * cont_type = resp_spec ? resp_spec->content_type != NULL ? resp_spec->content_type : "-" : "-"; + const char * cont_encoding = + resp_spec ? resp_spec->content_encoding != NULL ? resp_spec->content_encoding : "-" : "-"; + const char * pangu_req = h2_stream->pangu_req ? "USER/REQ" : "-"; + const char * pangu_resp = h2_stream->pangu_resp ? "USER/RESP" : "-"; + //const char * __str_suspend = h2_stream->suspend_counter > 0 ? "SUSPEND" : "-"; + + char *access_log; + asprintf(&access_log, "%s %d %s %s HTTP2.0 %s %s %s %s %s", str_stream_info, h2_stream->tfe_session.session_id, + method, url, resp_code, cont_type, cont_encoding, pangu_req, pangu_resp); + + TFE_LOG_INFO(logger()->handle, "%s", access_log); + free(access_log); +} + +static ssize_t +nghttp2_client_send(nghttp2_session *session, const uint8_t *data, + size_t length, int flags, void *user_data) +{ + if ( nghttp2_callback_array[NGHTTP2_USER_SEND] != NULL){ + return (ssize_t)nghttp2_callback_array[NGHTTP2_USER_SEND](session, NULL, data, length, + NULL, 0, flags, user_data, CONN_DIR_UPSTREAM); + } + return (ssize_t)length; +} + +static int +nghttp2_client_on_frame_recv(nghttp2_session *session, + const nghttp2_frame *frame, void *user_data) +{ + struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; + + if ( nghttp2_frame_callback_array[frame->hd.type] != NULL){ + return nghttp2_frame_callback_array[frame->hd.type](session_info, frame, CONN_DIR_UPSTREAM); + } + return 0; +} + +static int +nghttp2_client_on_data_chunk_recv(nghttp2_session *session, uint8_t flags, + int32_t stream_id, const uint8_t *input, + size_t input_len, void *user_data) +{ + size_t len; + char *uncompr = NULL; + int xret = -1; + enum tfe_stream_action stream_action = ACTION_DROP_DATA; + int uncompr_len = 0, __attribute__((__unused__))ret = 0; + const unsigned char *data; + struct http2_half_private * resp = NULL; + + struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; + + struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, stream_id); + if (!h2_stream){ + TFE_LOG_ERROR(logger()->handle, "On data callback can't get downstream information, id = %d", + stream_id); + goto finish; + } + resp = h2_stream->resp; + evbuffer_add(resp->body.evbuf_body, input, input_len); + + if (resp->body.gzip != HTTP2_CONTENT_ENCODING_NONE){ + ret = inflate_read(input, input_len, &uncompr, &uncompr_len, + &resp->body.inflater, resp->body.gzip); + if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr > 0){ + input = (const uint8_t*)uncompr; + input_len = uncompr_len; + } + } + data = input; + len = input_len; + + if (resp->body_state == MANAGE_STAGE_INIT){ + if (resp->event_cb) { + resp->event_cb(resp, EV_HTTP_RESP_BODY_BEGIN, NULL, len, + resp->event_cb_user); + } + if (flags == NGHTTP2_FLAG_END_STREAM){ + resp->body.flags = 0; + }else{ + resp->body.flags = flags; + } + resp->body_state = MANAGE_STAGE_READING; + } + if (resp->body_state == MANAGE_STAGE_READING){ + if (resp->event_cb) { + resp->event_cb(resp, EV_HTTP_RESP_BODY_CONT, data, len, + resp->event_cb_user); + } + if (flags == NGHTTP2_FLAG_END_STREAM){ + resp->body.flags = 0; + }else{ + resp->body.flags = flags; + } + //goto finish; + } + + stream_action = server_frame_submit_data(session_info, h2_stream, CONN_DIR_UPSTREAM); + if (stream_action == ACTION_DROP_DATA){ + xret = nghttp2_session_send(session_info->as_server); + if (xret != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret)); + } + } + if (stream_action == ACTION_USER_DATA) + stream_action = ACTION_DROP_DATA; + session_info->stream_action = stream_action; +finish: + TFE_LOG_INFO(logger()->handle, "%s, 1, submit data %d, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, + (int)input_len, stream_id, session_info->stream_action); + return 0; +} + +static int +nghttp2_client_on_stream_close(nghttp2_session *session, int32_t stream_id, + uint32_t error_code, void *user_data) +{ + nghttp2_frame frame; + memset(&frame, 0, sizeof(frame)); + + frame.hd.stream_id = stream_id; + frame.goaway.error_code = error_code; + if ( nghttp2_callback_array[NGHTTP2_USER_COLSE] != NULL){ + return nghttp2_callback_array[NGHTTP2_USER_COLSE](session, &frame, NULL, 0, + NULL, 0, 0, user_data, CONN_DIR_UPSTREAM); + } + return 0; +} + +static int +nghttp2_client_on_header(nghttp2_session *session, + const nghttp2_frame *frame, const uint8_t *name, + size_t namelen, const uint8_t *value, + size_t valuelen, uint8_t flags, void *user_data) +{ + if ( nghttp2_callback_array[frame->hd.type] != NULL){ + return nghttp2_callback_array[frame->hd.type](session, frame, name, namelen, + value, valuelen, flags, user_data, CONN_DIR_UPSTREAM); + } + + return 0; +} + +static struct h2_stream_data_t* +create_upstream_data(nghttp2_session *session, int32_t stream_id, + struct tfe_session_info_t *session_info) +{ + struct h2_stream_data_t *h2_stream = NULL; + struct user_event_dispatch *event = NULL; + + h2_stream = TAILQ_LIST_FIND(session_info, stream_id); + if (h2_stream == NULL){ + /** todo:When the data of the reply is pushed as promised, + there is no stream id at the reply end. to create it*/ + goto finish; + } + if (h2_stream->resp){ + goto finish; + } + + h2_stream->resp = tfe_half_private_init(TFE_HTTP_RESPONSE); + tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_RESPONSE); + + event = ALLOC(struct user_event_dispatch, 1); + assert(event); + event->thread_id = session_info->thread_id; + event->tf_stream = session_info->tf_stream; + event->tfe_session = &h2_stream->tfe_session; + + half_set_callback(h2_stream->resp, event, NULL); + + h2_stream->resp->frame_ctx = h2_stream->frame_ctx; + + nghttp2_session_set_stream_user_data(session, stream_id, h2_stream); + +finish: + return h2_stream; +} + +static ssize_t nghttp2_client_select_padding_callback(nghttp2_session *session, + const nghttp2_frame *frame, + size_t max_payloadlen, void *user_data) +{ + struct http2_half_private *resp = NULL; + struct h2_stream_data_t *h2_stream = NULL; + + h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); + if (!h2_stream) + return frame->hd.length; + + resp = h2_stream->resp; + return (ssize_t)MIN(max_payloadlen, frame->hd.length + (resp->body.padlen)); +} + +static int +nghttp2_client_on_begin_headers(nghttp2_session * session, + const nghttp2_frame * frame, + void * user_data) +{ + (void)session; + + struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; + switch(frame->hd.type){ + case NGHTTP2_HEADERS: + create_upstream_data(session, frame->hd.stream_id, session_info); + break; + default: + break; + } + return 0; +} + +static +void client_session_init(struct tfe_session_info_t *session_info) +{ + nghttp2_session_callbacks *callbacks; + + nghttp2_session_callbacks_new(&callbacks); + + nghttp2_session_callbacks_set_send_callback(callbacks, + nghttp2_client_send); + + nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, + nghttp2_client_on_frame_recv); + + nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, + nghttp2_client_on_data_chunk_recv); + + nghttp2_session_callbacks_set_select_padding_callback(callbacks, + nghttp2_client_select_padding_callback); + + nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, + nghttp2_client_on_stream_close); + + nghttp2_session_callbacks_set_on_header_callback(callbacks, + nghttp2_client_on_header); + + nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, + nghttp2_client_on_begin_headers); + + nghttp2_session_client_new(&session_info->as_client, callbacks, session_info); + + nghttp2_session_callbacks_del(callbacks); +} + +static ssize_t +nghttp2_server_send(nghttp2_session *session, const uint8_t *data, + size_t length, int flags, void *user_data) +{ + if ( nghttp2_callback_array[NGHTTP2_USER_SEND] != NULL){ + return (ssize_t)nghttp2_callback_array[NGHTTP2_USER_SEND](session, NULL, data, length, + NULL, 0, flags, user_data, CONN_DIR_DOWNSTREAM); + } + return (ssize_t)length; +} + +static int +nghttp2_server_on_frame_recv(nghttp2_session *session, + const nghttp2_frame *frame, void *user_data) +{ + struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; + + if ( nghttp2_frame_callback_array[frame->hd.type] != NULL){ + return nghttp2_frame_callback_array[frame->hd.type](session_info, frame, CONN_DIR_DOWNSTREAM); + } + + return 0; +} + +static int +nghttp2_server_on_stream_close(nghttp2_session *session, int32_t stream_id, + uint32_t error_code, void *user_data) +{ + nghttp2_frame frame; + memset(&frame, 0, sizeof(frame)); + + frame.hd.stream_id = stream_id; + frame.goaway.error_code = error_code; + if ( nghttp2_callback_array[NGHTTP2_USER_COLSE] != NULL){ + return nghttp2_callback_array[NGHTTP2_USER_COLSE](session, &frame, NULL, 0, + NULL, 0, 0, user_data, CONN_DIR_DOWNSTREAM); + } + return 0; +} + +static int +nghttp2_server_on_header(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, + size_t namelen, const uint8_t *value, + size_t valuelen, uint8_t flags, void *user_data) +{ + if ( nghttp2_callback_array[frame->hd.type] != NULL){ + return nghttp2_callback_array[frame->hd.type](session, frame, name, namelen, value, + valuelen, flags, user_data,CONN_DIR_DOWNSTREAM); + } + return 0; +} + +static void +create_serv_stream_data(nghttp2_session *session, int32_t stream_id, + struct tfe_session_info_t *session_info) +{ + struct h2_stream_data_t *h2_stream = NULL; + struct user_event_dispatch *event = NULL; + struct http2_half_private *half_private = NULL; + + h2_stream = TAILQ_LIST_FIND(session_info, stream_id); + if (h2_stream != NULL){ + goto finish; + } + + h2_stream = (struct h2_stream_data_t *)ALLOC(struct h2_stream_data_t, 1); + assert(h2_stream); + memset(h2_stream, 0, sizeof(struct h2_stream_data_t)); + h2_stream->stream_id = stream_id; + + h2_stream->req = tfe_half_private_init(TFE_HTTP_REQUEST); + tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_REQUEST); + + event = ALLOC(struct user_event_dispatch, 1); + assert(event); + event->thread_id = session_info->thread_id; + event->tf_stream = session_info->tf_stream; + event->tfe_session = &h2_stream->tfe_session; + + half_set_callback(h2_stream->req, event, NULL); + + /* Call business plugin */ + half_private = h2_stream->req; + + half_private->frame_ctx = http_frame_alloc(); + if (half_private->frame_ctx == NULL){ + TFE_LOG_ERROR(logger()->handle, "Failed at raising session begin event. "); + goto finish; + } + http_frame_raise_session_begin(half_private->frame_ctx, session_info->tf_stream, + &h2_stream->tfe_session, session_info->thread_id); + h2_stream->frame_ctx = half_private->frame_ctx; + TAILQ_INSERT_TAIL(&session_info->list, h2_stream, next); + nghttp2_session_set_stream_user_data(session, stream_id, h2_stream); +finish: + return; +} + +static int +nghttp2_server_on_data_chunk_recv(nghttp2_session *session, uint8_t flags, + int32_t stream_id, const uint8_t *input, + size_t input_len, void *user_data) +{ + size_t __attribute__((__unused__))len; + char *uncompr = NULL; + int xret = -1; + enum tfe_stream_action stream_action = ACTION_DROP_DATA; + int uncompr_len = 0, __attribute__((__unused__))ret = 0; + const unsigned char __attribute__((__unused__))*data; + struct http2_half_private * req = NULL; + + struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; + + struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, stream_id); + if (!h2_stream){ + TFE_LOG_ERROR(logger()->handle, "On data callback can't get downstream information, id = %d", + stream_id); + goto finish; + } + req = h2_stream->req; + evbuffer_add(req->body.evbuf_body, input, input_len); + + if (req->body.gzip != HTTP2_CONTENT_ENCODING_NONE){ + ret = inflate_read(input, input_len, &uncompr, &uncompr_len, + &req->body.inflater, req->body.gzip); + if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr > 0){ + input = (const uint8_t*)uncompr; + input_len = uncompr_len; + } + } + data = input; + len = input_len; + + stream_action = server_frame_submit_data(session_info, h2_stream, CONN_DIR_DOWNSTREAM); + if (stream_action == ACTION_DROP_DATA){ + xret = nghttp2_session_send(session_info->as_client); + if (xret != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret)); + } + } + TFE_LOG_INFO(logger()->handle, "%s, %d, submit data %d, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, + 0, (int)input_len, stream_id, session_info->stream_action); + if (stream_action == ACTION_USER_DATA) + stream_action = ACTION_DROP_DATA; + session_info->stream_action = stream_action; +finish: + return 0; +} + +static int +nghttp2_server_on_begin_headers(nghttp2_session *session, + const nghttp2_frame *frame, + void *user_data) +{ + struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; + + if (frame->hd.type != NGHTTP2_HEADERS || + frame->headers.cat != NGHTTP2_HCAT_REQUEST) { + return 0; + } + create_serv_stream_data(session, frame->hd.stream_id, session_info); + + return 0; +} + +static void +server_session_init(struct tfe_session_info_t *session_info) +{ + nghttp2_session_callbacks *callbacks; + + nghttp2_session_callbacks_new(&callbacks); + + nghttp2_session_callbacks_set_send_callback(callbacks, nghttp2_server_send); + + nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, + nghttp2_server_on_frame_recv); + + nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, + nghttp2_server_on_data_chunk_recv); + + nghttp2_session_callbacks_set_on_stream_close_callback( + callbacks, nghttp2_server_on_stream_close); + + nghttp2_session_callbacks_set_on_header_callback(callbacks, + nghttp2_server_on_header); + + nghttp2_session_callbacks_set_on_begin_headers_callback( + callbacks, nghttp2_server_on_begin_headers); + + nghttp2_session_server_new(&session_info->as_server, callbacks, session_info); + + nghttp2_session_callbacks_del(callbacks); +} + +static void +delete_server_session_data(struct tfe_session_info_t *session_info) +{ + struct h2_stream_data_t *h2_stream; + struct h2_stream_data_t *_h2_stream; + + nghttp2_session_del(session_info->as_server); + session_info->as_server = NULL; + + TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){ + TAILQ_REMOVE(&session_info->list, h2_stream, next); + free(h2_stream); + h2_stream = NULL; + } +} + +static void +delete_client_session_data(struct tfe_session_info_t *session_info) +{ + struct h2_stream_data_t *h2_stream = NULL; + struct h2_stream_data_t *_h2_stream; + + nghttp2_session_del(session_info->as_client); + session_info->as_client = NULL; + + TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){ + TAILQ_REMOVE(&session_info->list, h2_stream, next); + free(h2_stream); + h2_stream = NULL; + } +} + enum tfe_stream_action detect_up_stream_protocol(struct tfe_session_info_t *session_info, const struct tfe_stream *tfe_stream, unsigned int thread_id, const unsigned char *data, size_t len) { - int readlen = 0; + int readlen = 0, xret = -1; enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; + session_info->tf_stream = tfe_stream; + session_info->thread_id = thread_id; + + if (!session_info->as_server) + goto forward; + readlen = nghttp2_session_mem_recv(session_info->as_client, data, len); if (readlen < 0){ - TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Failed to process server requests. Link message %s", + TFE_LOG_ERROR(logger()->handle, "Failed to process server requests. Link message %s", tfe_stream->str_stream_info); delete_client_session_data(session_info); goto err; } - stream_action = nghttp2_server_mem_send(session_info); + stream_action = session_info->stream_action; + session_info->stream_action = ACTION_DROP_DATA; + //printf("up stream_acion = %d\n", stream_action); + if (stream_action == ACTION_FORWARD_DATA){ + xret = nghttp2_session_send(session_info->as_client); + if (xret != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "Fatal downstream send error: %s\n", + nghttp2_strerror(xret)); + } + stream_action = ACTION_DROP_DATA; + } + if (session_info->goaway){ + nghttp2_disect_goaway(session_info); + session_info->goaway = 0; + } + if (stream_action == ACTION_DROP_DATA){ + tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len)); + return ACTION_DROP_DATA; + } + +forward: if (stream_action == ACTION_FORWARD_DATA){ tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_FOWARD_BYTES, &len, sizeof(len)); return ACTION_FORWARD_DATA; } - if (stream_action == ACTION_DROP_DATA){ - tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len)); - return ACTION_DROP_DATA; - } err: tfe_stream_detach(tfe_stream); @@ -1935,31 +2339,48 @@ enum tfe_stream_action detect_down_stream_protocol(struct tfe_session_info_t *session_info, const struct tfe_stream *tfe_stream, unsigned int thread_id, const unsigned char *data, size_t len) { - int readlen = 0; + int readlen = 0, xret = -1; enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; session_info->tf_stream = tfe_stream; session_info->thread_id = thread_id; + if (!session_info->as_server) + goto forward; + readlen = nghttp2_session_mem_recv(session_info->as_server, data, len); if (readlen < 0){ - TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Failed to process client requests. Link message %s", + TFE_LOG_ERROR(logger()->handle, "Failed to process client requests. Link message %s", tfe_stream->str_stream_info); delete_server_session_data(session_info); goto err; } - stream_action = nghttp2_client_mem_send(session_info); + stream_action = session_info->stream_action; + session_info->stream_action = ACTION_DROP_DATA; +// printf("down stream_acion = %d\n", stream_action); + if (stream_action == ACTION_FORWARD_DATA){ + xret = nghttp2_session_send(session_info->as_server); + if (xret != 0) { + stream_action = ACTION_FORWARD_DATA; + TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret)); + } + stream_action = ACTION_DROP_DATA; + } + if (session_info->goaway){ + nghttp2_disect_goaway(session_info); + session_info->goaway = 0; + } + if (stream_action == ACTION_DROP_DATA){ + tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len)); + return ACTION_DROP_DATA; + } +forward: if (stream_action == ACTION_FORWARD_DATA){ tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_FOWARD_BYTES, &len, sizeof(len)); return ACTION_FORWARD_DATA; } - if (stream_action == ACTION_DROP_DATA){ - tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len)); - return ACTION_DROP_DATA; - } err: tfe_stream_detach(tfe_stream); - tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len)); return ACTION_DROP_DATA; } @@ -1994,8 +2415,6 @@ sess_data_ctx_fini(struct tfe_session_info_t *session_info, struct tfe_session_info_t* tfe_session_info_init() { - nghttp2_session *session = NULL; - nghttp2_inbound_frame *iframe = NULL; struct tfe_session_info_t *session_info = NULL; session_info = ALLOC(struct tfe_session_info_t, 1); @@ -2003,16 +2422,14 @@ struct tfe_session_info_t* tfe_session_info_init() memset(session_info, 0, sizeof(struct tfe_session_info_t)); - session_info->state = 1; + session_info->stream_action = ACTION_DROP_DATA; + + session_info->goaway = 0; server_session_init(session_info); client_session_init(session_info); - session = session_info->as_client; - iframe = &(session->iframe); - iframe->state = NGHTTP2_IB_READ_FIRST_SETTINGS; - nghttp2_bufs_reset(&(session->aob.framebufs)); TAILQ_INIT(&session_info->list); return session_info; diff --git a/plugin/protocol/http2/test/test_http2_stream.cpp b/plugin/protocol/http2/test/test_http2_stream.cpp index 72b9b6d..bdb5c03 100644 --- a/plugin/protocol/http2/test/test_http2_stream.cpp +++ b/plugin/protocol/http2/test/test_http2_stream.cpp @@ -236,9 +236,9 @@ TEST(Http2StreamParser, GetFrameWithHeader_02) EXPECT_GT(readlen, 0); /*Send data message**/ - enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; - stream_action = nghttp2_client_mem_send(session_info); - EXPECT_EQ(stream_action, ACTION_DROP_DATA); + //enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; + //stream_action = nghttp2_client_mem_send(session_info); + //EXPECT_EQ(stream_action, ACTION_DROP_DATA); } TEST(Http2StreamParser, RespFrameWithHead_01) @@ -261,9 +261,9 @@ TEST(Http2StreamParser, RespFrameWithHead_01) EXPECT_GT(readlen, 0); /*Send data head message**/ - enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; - stream_action = nghttp2_client_mem_send(session_info); - EXPECT_EQ(stream_action, ACTION_DROP_DATA); + //enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; + //stream_action = nghttp2_client_mem_send(session_info); + //EXPECT_EQ(stream_action, ACTION_DROP_DATA); /*Recv response settings*/ readlen = nghttp2_session_mem_recv(session_info->as_client, response_settings, sizeof(response_settings)); diff --git a/vendor/CMakeLists.txt b/vendor/CMakeLists.txt index 496c74e..8cba599 100644 --- a/vendor/CMakeLists.txt +++ b/vendor/CMakeLists.txt @@ -81,7 +81,7 @@ set_property(TARGET http-parser-static PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${ ExternalProject_Add(nghttp2 PREFIX nghttp2 URL ${CMAKE_CURRENT_SOURCE_DIR}/nghttp2-1.24.0.tar.gz - URL_MD5 1bf8209fc10da2d46012b03a158e6693 + URL_MD5 66d78a8b968d78ffa34b219a571a39b6 CONFIGURE_COMMAND ./configure --prefix= --disable-shared BUILD_IN_SOURCE 1) diff --git a/vendor/nghttp2-1.24.0.tar.gz b/vendor/nghttp2-1.24.0.tar.gz index f2eac3e..f94176e 100644 Binary files a/vendor/nghttp2-1.24.0.tar.gz and b/vendor/nghttp2-1.24.0.tar.gz differ