/************************************************************************* > File Name: http2_stream.c > Author: > Mail: > Created Time: ************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* --------------------------------------------------------------------------------------------------------------------------------- |No errors | PROTOCOL_ERROR| INTERNAL_ERROR | FLOW_CONTROL_ERROR| SETTINGS_TIMEOUT | STREAM_CLOSED | FRAME_SIZE_ERROR | --------------------------------------------------------------------------------------------------------------------------------- |0x00 | 0x01 | 0x02 | 0x03 | 0x04 | 0x05 | 0x06 | --------------------------------------------------------------------------------------------------------------------------------- |REFUSED_STREAM| CANCEL | COMPRESSION_ERROR| CONNECT_ERROR | ENHANCE_YOUR_CALM| INADEQUATE_SECURITY| HTTP_1_1_REQUIRED| --------------------------------------------------------------------------------------------------------------------------------- |0x07 | 0x08 | 0x09 | 0x0a | 0x0b | 0x0c | 0x0d | --------------------------------------------------------------------------------------------------------------------------------- */ static const struct value_string method_vals[] = { {NGHTTP2_METHOD_DELETE, "DELETE"}, {NGHTTP2_METHOD_GET, "GET"}, {NGHTTP2_METHOD_HEAD, "HEAD"}, {NGHTTP2_METHOD_POST, "POST"}, {NGHTTP2_METHOD_PUT, "PUT"}, {NGHTTP2_METHOD_CONNECT, "CONNECT"}, {NGHTTP2_METHOD_OPTIONS, "OPTIONS"}, {NGHTTP2_METHOD_UNKNOWN, "unknown"}, }; static const struct value_string headers_vals[] = { {TFE_HTTP_UNKNOWN_FIELD, "unkown"}, {TFE_HTTP_HOST, ":authority"}, {TFE_HTTP_REFERER, "referer"}, {TFE_HTTP_USER_AGENT, "user-agent"}, {TFE_HTTP_COOKIE, "cookie"}, {TFE_HTTP_SET_COOKIE, "set-cookie"}, {TFE_HTTP_PROXY_AUTHORIZATION, "proxy-authorization"}, {TFE_HTTP_AUTHORIZATION, "authorization"}, {TFE_HTTP_LOCATION, "location"}, {TFE_HTTP_SERVER, "server"}, {TFE_HTTP_ETAG, "etag"}, {TFE_HTTP_DATE, "date"}, {TFE_HTTP_TRAILER, "Trailer"}, {TFE_HTTP_TRANSFER_ENCODING, "transfer-encoding"}, {TFE_HTTP_VIA, "via"}, {TFE_HTTP_PRAGMA, "pragma"}, {TFE_HTTP_CONNECTION, "connection"}, {TFE_HTTP_CONT_ENCODING, "content-encoding"}, {TFE_HTTP_CONT_LANGUAGE, "content-language"}, {TFE_HTTP_CONT_LOCATION, "content-location"}, {TFE_HTTP_CONT_RANGE, "content-range"}, {TFE_HTTP_CONT_LENGTH, "content-length"}, {TFE_HTTP_CONT_TYPE, "content-type"}, {TFE_HTTP_CONT_DISPOSITION, "content-disposition"}, {TFE_HTTP_EXPIRES, "expires"}, {TFE_HTTP_ACCEPT_ENCODING, "accept-encoding"}, {TFE_HTTP_CACHE_CONTROL, "cache-control"}, {TFE_HTTP_IF_MATCH, "if-match"}, {TFE_HTTP_IF_NONE_MATCH, "if-none-match"}, {TFE_HTTP_IF_MODIFIED_SINCE, "if-modified-since"}, {TFE_HTTP_IF_UNMODIFIED_SINCE, "if-unmodified-since"}, {TFE_HTTP_LAST_MODIFIED, "last-modified"}, }; typedef enum { NGHTTP2_USER_SEND = 0x0b, NGHTTP2_USER_COLSE = 0x0c, } nghttp2_frame_user_type; struct user_event_dispatch { const struct tfe_stream *tf_stream; const struct tfe_http_session * tfe_session; unsigned int thread_id; }; /*up stream */ static struct h2_stream_data_t * TAILQ_LIST_FIND(struct tfe_session_info_t *session_info, int32_t stream_id) { struct h2_stream_data_t *stream = NULL, *_next_stream = NULL; TAILQ_FOREACH_SAFE(stream, &session_info->list, next, _next_stream) { if (stream->stream_id == stream_id){ break; } } return stream; } /** begin */ /** headers list, After the test is completed, you can delete it */ #define foreach_headers(list, entry) \ for(entry = ((struct http2_headers*)(list))->head; entry; entry = entry->next) static struct header_data* headers_del(struct http2_headers *list, struct header_data *entry) { if (entry->prev) entry->prev->next = entry->next; else list->head = entry->next; if (entry->next) entry->next->prev = entry->prev; else list->tail = entry->prev; entry->next = entry->prev = NULL; list->nvlen--; return entry; } void headers_add_head(struct http2_headers *list, struct header_data *entry) { if (list->head){ entry->next = list->head; list->head->prev = entry; }else{ list->tail = entry; } list->head = entry; list->nvlen++; } void headers_add_tail(struct http2_headers *list, struct header_data *entry) { entry->next = NULL; entry->prev = list->tail; if (list->tail) list->tail->next = entry; else list->head = entry; list->tail = entry; list->nvlen++; } static inline void headers_init(struct http2_headers *headers) { memset(headers, 0, sizeof(struct http2_headers)); headers->nvlen = 0; headers->flag = 0; headers->head = headers->tail = 0; } static int method_to_str_idx(const char * method) { if (strcasestr(method, "gzip") != NULL) return HTTP2_CONTENT_ENCODING_GZIP; if (strcasestr(method, "x-gzip") != NULL) return HTTP2_CONTENT_ENCODING_X_GZIP; if (strcasestr(method, "deflate") != NULL) return HTTP2_CONTENT_ENCODING_DEFLATE; if (strcasestr(method, "bzip2") != NULL) return HTTP2_CONTENT_ENCODING_BZIP2; if (strcasestr(method, "x-bzip2") != NULL) return HTTP2_CONTENT_ENCODING_X_BZIP2; if (strcasestr(method, "br") != NULL) return HTTP2_CONTENT_ENCODING_BR; return HTTP2_CONTENT_ENCODING_NONE; } static nghttp2_nv* nghttp2_nv_packet(struct http2_headers *headers, nghttp2_nv *hdrs) { int nvlen = 0; struct header_data *header = NULL; foreach_headers(headers, header){ hdrs[nvlen].name = header->nv.name; hdrs[nvlen].namelen = header->nv.namelen; hdrs[nvlen].value = header->nv.value; hdrs[nvlen].valuelen = header->nv.valuelen; hdrs[nvlen].flags = header->nv.flags; nvlen++; } return hdrs; } static enum tfe_http_std_method nghttp2_get_method(struct http2_half_private *half_private) { struct tfe_http_req_spec *req_spec = &(half_private->half_public.req_spec); return req_spec->method; } static nghttp2_settings_entry* nghttp2_iv_packet(nghttp2_settings settings, nghttp2_settings_entry *out_iv) { int i = 0; nghttp2_settings_entry *iv = settings.iv; for (i = 0; i < (int)settings.niv; i++){ out_iv[i].settings_id = iv[i].settings_id; out_iv[i].value = iv[i].value; } return out_iv; } static void delete_nv_packet_data(struct http2_headers *headers) { struct header_data *header = NULL; foreach_headers(headers, header){ free(header->nv.name); header->nv.name = NULL; header->nv.namelen = 0; free(header->nv.value); header->nv.value = NULL; header->nv.valuelen = 0; header->nv.flags = 0; headers_del(headers, header); } headers->nvlen = 0; headers->flag = 0; headers->head = headers->tail = NULL; } static int event_dispatch_cb(struct http2_half_private * half_private, enum tfe_http_event ev, const unsigned char * data, size_t len, void * user) { struct user_event_dispatch *event = (struct user_event_dispatch *)user; struct http_frame_session_ctx *frame_ctx = half_private->frame_ctx; http_frame_raise_event(frame_ctx, event->tf_stream, (struct tfe_http_session *)event->tfe_session, ev, data, len, event->thread_id); return 0; } void half_set_callback(struct http2_half_private * half_private, void * user, void (* user_deleter)(void *)) { if (half_private->event_cb == NULL) half_private->event_cb = event_dispatch_cb; half_private->event_cb_user = user; half_private->event_cb_user_deleter = user_deleter; } static const char * half_ops_field_read(const struct tfe_http_half * half, const struct http_field_name * field) { struct header_data *header = NULL; const struct http2_half_private *half_private = nghttp2_to_half_private(half); if (unlikely(half_private == NULL)) return NULL; foreach_headers(&half_private->headers, header){ if (http_field_name_compare(&header->field, field) != 0) continue; break; } return header != NULL ? (const char *)header->nv.value : NULL; } static int half_ops_field_write(struct tfe_http_half * half, const struct http_field_name * name, const char * value) { int is_exist = 0; struct header_data *header = NULL; struct http2_half_private *half_private = nghttp2_to_half_private(half); struct http2_headers *headers = &half_private->headers; if (unlikely(half_private == NULL) || unlikely(headers == NULL)) goto finish; foreach_headers(headers, header){ if (header->field.field_id == TFE_HTTP_UNKNOWN_FIELD) continue; if (header->field.field_id == name->field_id){ if (value){ free(header->nv.value); header->nv.value = (uint8_t*)tfe_strdup(value); header->nv.valuelen = strlen(value); header->field.field_name = (const char *)header->nv.name; is_exist = 1; }else{ headers_del(headers, header); } break; } } if (is_exist == 0){ if (value == NULL){ goto finish; } header = ALLOC(struct header_data, 1); memset(header, 0, sizeof(struct header_data)); if (name->field_id == TFE_HTTP_UNKNOWN_FIELD){ header->field.field_id = TFE_HTTP_UNKNOWN_FIELD; header->nv.name = (uint8_t *)tfe_strdup(name->field_name); header->nv.namelen = strlen(name->field_name); header->field.field_name = (const char *)header->nv.name;; }else{ const char *std_name = val_to_str(name->field_id, headers_vals); header->field.field_id = name->field_id; header->field.field_name = NULL; header->nv.name = (uint8_t *)tfe_strdup((const char *)std_name); header->nv.namelen = strlen(std_name); } header->nv.value = (uint8_t *)tfe_strdup((const char *)value);; header->nv.valuelen = strlen(value); headers_add_tail(headers, header); } finish: return 0; } static struct tfe_http_half * half_ops_allow_write(const struct tfe_http_half * half) { return (struct tfe_http_half *) half; } static const char * half_ops_field_iterate(const struct tfe_http_half * half, void ** iter, struct http_field_name * field) { const char *value = NULL; struct header_data **head = (struct header_data **)iter; const struct http2_half_private *half_private = nghttp2_to_half_private(half); if (unlikely(half_private == NULL)) goto finish; if (NULL == *head){ *head = half_private->headers.head; }else{ *head = (*head)->next; } if (NULL == *head) goto finish; field->field_id = (*head)->field.field_id; field->field_name = (*head)->field.field_name; value = (const char *)(*head)->nv.value; finish: return value; } static int half_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, int flag) { int xret = -1; struct http2_half_private * resp = nghttp2_to_half_private(half); struct data_t *body = &resp->body; if (buff == NULL && size == 0){ if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){ xret = deflate_write(&body->deflate, NULL, 0, resp->body.evbuf_body, body->gzip, 1); } resp->message_state = MANAGE_STAGE_COMPLETE; goto finish; } if (resp->body.evbuf_body == NULL){ resp->body.evbuf_body = evbuffer_new(); } if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){ xret = deflate_write(&body->deflate, (const uint8_t *)buff, size, resp->body.evbuf_body, body->gzip, 0); }else{ xret = evbuffer_add(resp->body.evbuf_body, buff, size); } finish: return xret; } void delete_stream_half_data(struct http2_half_private **data, int body_flag) { if (*data){ struct data_t *body = &((*data)->body); inflate_finished(&body->inflater); deflate_finished(&body->deflate); if (body->evbuf_body && body_flag){ evbuffer_free(body->evbuf_body); body->evbuf_body = NULL; } if ((*data)->url_storage) FREE(&((*data)->url_storage)); delete_nv_packet_data(&((*data)->headers)); if((*data)->event_cb_user_deleter != NULL) (*data)->event_cb_user_deleter((*data)->event_cb_user); free(*data); *data = NULL; } return; } void half_ops_free(struct tfe_http_half * half) { struct http2_half_private * h2_private = nghttp2_to_half_private(half); delete_stream_half_data(&h2_private, 1); free(h2_private); h2_private = NULL; return; } int h2_half_ops_body_begin(struct tfe_http_half * half, int by_stream) { struct http2_half_private * resp = nghttp2_to_half_private(half); struct data_t *body = &resp->body; assert(body->evbuf_body == NULL); if (by_stream) { if (body->inflater){ inflate_finished(&body->inflater); } if (body->deflate){ deflate_finished(&body->deflate); } body->gzip = HTTP2_CONTENT_ENCODING_NONE; resp->message_state = MANAGE_STAGE_READING; resp->by_stream = by_stream; } body->evbuf_body = evbuffer_new(); return 0; } static tfe_stream_action cache_frame_submit_data(int32_t stream_id, nghttp2_session *session); static enum tfe_stream_action cache_frame_submit_header(nghttp2_session *as_server, int32_t stream_id); int h2_half_ops_body_data(struct tfe_http_half * half, const unsigned char * data, size_t sz_data) { int xret = -1; enum tfe_stream_action stream_action = ACTION_DROP_DATA; struct http2_half_private * resp = nghttp2_to_half_private(half); struct data_t *body = &resp->body; if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){ xret = deflate_write(&body->deflate, (const uint8_t *)data, sz_data, resp->body.evbuf_body, body->gzip, 0); }else{ xret = evbuffer_add(resp->body.evbuf_body, data, sz_data); } if (resp->by_stream){ stream_action = cache_frame_submit_header(resp->session, resp->stream_id); if (stream_action == ACTION_DROP_DATA){ xret = nghttp2_session_send(resp->session); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "Fatal downstream send error: %s\n", nghttp2_strerror(xret)); } } stream_action = cache_frame_submit_data(resp->stream_id, resp->session); if (stream_action == ACTION_DROP_DATA){ xret = nghttp2_session_send(resp->session); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret)); } } } return xret; } int h2_half_ops_body_end(struct tfe_http_half * half) { int xret = -1; struct http2_half_private * resp = nghttp2_to_half_private(half); enum tfe_stream_action stream_action = ACTION_DROP_DATA; resp->body_state = MANAGE_STAGE_COMPLETE; resp->message_state = MANAGE_STAGE_COMPLETE; if (resp->by_stream){ resp->body.flags |= NGHTTP2_FLAG_END_STREAM; stream_action = cache_frame_submit_data(resp->stream_id, resp->session); if (stream_action == ACTION_DROP_DATA){ xret = nghttp2_session_send(resp->session); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret)); } } } return 0; } struct tfe_http_half_ops h2_half_ops = { .ops_http_field_read = half_ops_field_read, .ops_http_field_write = half_ops_field_write, .ops_http_allow_write = half_ops_allow_write, .ops_http_field_iterate = half_ops_field_iterate, .ops_append_body = half_ops_append_body, .ops_body_begin = h2_half_ops_body_begin, .ops_body_data = h2_half_ops_body_data, .ops_body_end = h2_half_ops_body_end, .ops_free = half_ops_free }; static struct tfe_http_session* h2_ops_allow_write(const struct tfe_http_session * session) { struct h2_stream_data_t *stream_data = nghttp2_to_stream_data((struct tfe_http_session *)session); if ( http_frame_currect_plugin_preempt(stream_data->frame_ctx) == 0){ return (struct tfe_http_session *)session; } return NULL; } void h2_ops_detach(const struct tfe_http_session * session) { struct h2_stream_data_t *stream_data = nghttp2_to_stream_data((struct tfe_http_session *)session); return http_frame_currect_plugin_detach(stream_data->frame_ctx); } void h2_ops_drop(struct tfe_http_session * session) { return; } void h2_ops_suspend(struct tfe_http_session * session) { #ifdef TFE_CACHE struct h2_stream_data_t *stream_data = nghttp2_to_stream_data((struct tfe_http_session *)session); stream_data->cache.spd_set = 1; #endif } void h2_ops_resume(struct tfe_http_session * session) { #ifdef TFE_CACHE struct h2_stream_data_t *stream_data = nghttp2_to_stream_data((struct tfe_http_session *)session); if (stream_data->cache.spd_valid){ tfe_stream_resume(stream_data->tf_stream); stream_data->cache.rse_set = 1; } #endif } void h2_ops_request_set(struct tfe_http_session * session, struct tfe_http_half * req_user) { struct h2_stream_data_t *stream_data = nghttp2_to_stream_data(session); struct http2_half_private *half_user = nghttp2_to_half_private(req_user); stream_data->pangu_req = half_user; } void h2_ops_response_set(struct tfe_http_session * session, struct tfe_http_half * resp) { struct h2_stream_data_t *stream_data = nghttp2_to_stream_data(session); struct http2_half_private *half_user = nghttp2_to_half_private(resp); stream_data->pangu_resp = half_user; } static struct http2_half_private* tfe_half_private_init(enum tfe_http_direction direction, int32_t stream_id, nghttp2_session *session) { struct http2_half_private *half_private = ALLOC(struct http2_half_private, 1); assert(half_private); memset(half_private, 0, sizeof(struct http2_half_private)); half_private->half_public.direction = direction; half_private->half_public.ops = &h2_half_ops; headers_init(&half_private->headers); half_private->body.evbuf_body = evbuffer_new(); half_private->body.gzip = HTTP2_CONTENT_ENCODING_NONE; half_private->body.padlen = 0; half_private->stream_id = stream_id; half_private->session = session; half_private->body_state = MANAGE_STAGE_INIT; half_private->message_state = MANAGE_STAGE_INIT; return half_private; } struct tfe_http_half * h2_ops_request_create(struct tfe_http_session * session, enum tfe_http_std_method method, const char * uri) { struct http2_half_private * req = tfe_half_private_init(TFE_HTTP_REQUEST, 0, NULL); req->method_or_status = method; req->url_storage = tfe_strdup(uri); return &req->half_public; } struct tfe_http_half * h2_ops_response_create(struct tfe_http_session * session, int resp_code) { struct h2_stream_data_t *stream = nghttp2_to_stream_data(session); struct http2_half_private * resp = tfe_half_private_init(TFE_HTTP_RESPONSE, stream->stream_id, stream->session); resp->method_or_status = resp_code; if (stream->resp) resp->body.gzip = stream->resp->body.gzip; return &resp->half_public; } void h2_ops_kill(struct tfe_http_session * session) { } struct tfe_http_session_ops nghttp2_session_ops = { .ops_allow_write = h2_ops_allow_write, .ops_detach = h2_ops_detach, .ops_drop = h2_ops_drop, .ops_suspend = h2_ops_suspend, .ops_resume = h2_ops_resume, .ops_kill = h2_ops_kill, .ops_request_set = h2_ops_request_set, .ops_response_set = h2_ops_response_set, .ops_request_create = h2_ops_request_create, .ops_response_create = h2_ops_response_create }; static ssize_t upstream_read_callback(nghttp2_session *session, int32_t stream_id, uint8_t *buf, size_t length, uint32_t *data_flags, nghttp2_data_source *source, void *user_data) { #define TFE_MAX_PAYLOADLEN 8192 int datalen = 0; struct data_t *body = (struct data_t *)source->ptr; if (!body || NULL == body->evbuf_body){ *data_flags |= NGHTTP2_DATA_FLAG_EOF; return datalen; } size_t inputlen = evbuffer_get_length(body->evbuf_body); unsigned char *input = evbuffer_pullup(body->evbuf_body, -1); if (input == NULL || inputlen == 0){ *data_flags |= NGHTTP2_DATA_FLAG_EOF; goto finish; } if (inputlen > TFE_MAX_PAYLOADLEN){ datalen = TFE_MAX_PAYLOADLEN; }else{ datalen = inputlen; } memcpy(buf, input, datalen); evbuffer_drain(body->evbuf_body, datalen); finish: return datalen; } static enum tfe_stream_action nghttp2_server_frame_submit_response(struct tfe_session_info_t *session_info, struct h2_stream_data_t *h2_stream) { int rv = -1; nghttp2_nv hdrs[128] = {0}; struct http2_headers *headers = NULL; struct http2_half_private *pangu_resp = NULL; pangu_resp = h2_stream->pangu_resp; if (pangu_resp->message_state != MANAGE_STAGE_COMPLETE){ return (enum tfe_stream_action)ACTION_USER_DATA; } headers = &pangu_resp->headers; if (headers->nvlen <= 0) return ACTION_FORWARD_DATA; struct data_t *body = &pangu_resp->body; char str_sz_evbuf_body[TFE_STRING_MAX]; snprintf(str_sz_evbuf_body, sizeof(str_sz_evbuf_body) - 1, "%lu", evbuffer_get_length(body->evbuf_body)); const static struct http_field_name __cont_encoding_length_name = {TFE_HTTP_CONT_LENGTH, NULL}; tfe_http_field_write(&pangu_resp->half_public, &__cont_encoding_length_name, str_sz_evbuf_body); nghttp2_data_provider data_prd; data_prd.source.ptr = (void *)body; data_prd.read_callback = upstream_read_callback; rv = nghttp2_submit_response(session_info->as_server, h2_stream->stream_id, nghttp2_nv_packet(headers, hdrs), headers->nvlen, &data_prd); if (rv != 0){ return ACTION_FORWARD_DATA; } delete_nv_packet_data(headers); return ACTION_DROP_DATA; } static tfe_stream_action cache_frame_submit_data(int32_t stream_id, nghttp2_session *session) { int rv = -1; struct http2_headers *headers = NULL; struct h2_stream_data_t *h2_stream = NULL; struct http2_half_private *pangu_resp = NULL; enum tfe_stream_action stream_action = ACTION_DROP_DATA; h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, stream_id); if (!h2_stream){ stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "Cache id %d, Flow is empty", stream_id); return stream_action; } pangu_resp = h2_stream->pangu_resp; headers = &pangu_resp->headers; if (headers->nvlen > 0){ return ACTION_FORWARD_DATA; } struct data_t *body = &pangu_resp->body; nghttp2_data_provider data_prd; data_prd.source.ptr = (void *)body; data_prd.read_callback = upstream_read_callback; rv = nghttp2_submit_data(session, body->flags, stream_id, &data_prd); if (rv != 0){ printf("Fatal data error: %s\n", nghttp2_strerror(rv)); } return stream_action; } static enum tfe_stream_action server_frame_submit_data(struct tfe_session_info_t *session_info, struct h2_stream_data_t *h2_stream, enum tfe_conn_dir dir) { enum tfe_stream_action stream_action = ACTION_DROP_DATA; struct http2_half_private *resp = (dir == CONN_DIR_UPSTREAM) ? h2_stream->resp : h2_stream->req; nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client; if (h2_stream->pangu_resp){ stream_action = nghttp2_server_frame_submit_response(session_info, h2_stream); }else{ int rv = -1; struct data_t *body = &resp->body; nghttp2_data_provider data_prd; data_prd.source.ptr = (void *)body; data_prd.read_callback = upstream_read_callback; rv = nghttp2_submit_data(session, body->flags, h2_stream->stream_id, &data_prd); if (rv != 0){ stream_action = ACTION_FORWARD_DATA; printf("Fatal data error: %s\n", nghttp2_strerror(rv)); } } return stream_action; } typedef int (*nghttp2_frame_callback) (struct tfe_session_info_t *, const nghttp2_frame *, enum tfe_conn_dir dir); typedef int (*nghttp2_callback) (nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir); static int nghttp2_submit_frame_priority(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, enum tfe_conn_dir dir) { int xret = -1; enum tfe_stream_action stream_action = ACTION_DROP_DATA; const nghttp2_priority *priority = &frame->priority; nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client; int rv = nghttp2_submit_priority(session, priority->hd.flags, priority->hd.stream_id, &(priority->pri_spec)); if (rv != 0){ stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit ping error: %s\n", dir, nghttp2_strerror(rv)); goto finish; } xret = nghttp2_session_send(session); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", dir, nghttp2_strerror(xret)); } finish: TFE_LOG_DEBUG(logger()->handle, "%s, %d, submit priority, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, dir, frame->hd.stream_id, session_info->stream_action); session_info->stream_action = stream_action; return 0; } static int nghttp2_submit_frame_rst_stream(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, enum tfe_conn_dir dir) { int xret = -1; enum tfe_stream_action stream_action = ACTION_DROP_DATA; const nghttp2_rst_stream *rst_stream = &frame->rst_stream; nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client; int rv = nghttp2_submit_rst_stream(session, rst_stream->hd.flags, rst_stream->hd.stream_id, rst_stream->error_code); if (rv != 0){ stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit rst error: %s\n", dir, nghttp2_strerror(rv)); goto finish; } xret = nghttp2_session_send(session); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", dir, nghttp2_strerror(xret)); } finish: session_info->stream_action = stream_action; TFE_LOG_DEBUG(logger()->handle, "%s, %d, submit rst stream, stream_id:%d, action:%d, error_code = %d", session_info->tf_stream->str_stream_info, dir, frame->hd.stream_id, session_info->stream_action, rst_stream->error_code); return 0; } static int nghttp2_submit_frame_settings(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, enum tfe_conn_dir dir) { int xret = -1, rv = -1; nghttp2_settings_entry iv[6] = {0}; enum tfe_stream_action stream_action = ACTION_DROP_DATA; nghttp2_settings settings = frame->settings; nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client; nghttp2_session *_session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_client : session_info->as_server; if(settings.hd.flags == NGHTTP2_FLAG_ACK){ xret = nghttp2_session_send(_session); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret)); } stream_action = ACTION_DROP_DATA; goto finish; } rv = nghttp2_submit_settings(session, settings.hd.flags, nghttp2_iv_packet(settings, iv), settings.niv); if (rv != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit settings error: %s\n", dir, nghttp2_strerror(rv)); goto finish; } xret = nghttp2_session_send(session); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", dir, nghttp2_strerror(xret)); } finish: session_info->stream_action = stream_action; TFE_LOG_DEBUG(logger()->handle, "%s, %d, submit setting, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, dir, frame->hd.stream_id, session_info->stream_action); return 0; } static int nghttp2_submit_frame_ping(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, enum tfe_conn_dir dir) { int xret = -1 ,rv = -1; enum tfe_stream_action stream_action = ACTION_DROP_DATA; const nghttp2_ping *ping = &frame->ping; nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client; rv = nghttp2_submit_ping(session, ping->hd.flags, ping->opaque_data); if (rv != 0){ stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit ping error: %s\n", dir, nghttp2_strerror(rv)); goto finish; } xret = nghttp2_session_send(session); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", dir, nghttp2_strerror(xret)); } finish: session_info->stream_action = stream_action; TFE_LOG_DEBUG(logger()->handle, "%s, %d, submit ping, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, dir, frame->hd.stream_id, session_info->stream_action); return 0; } void nghttp2_write_access_log(struct h2_stream_data_t *h2_stream, const char * str_stream_info) { /* Request */ struct http2_half_private *req = h2_stream->req; /* Response */ struct http2_half_private *resp = h2_stream->resp; /* Req-Public */ struct tfe_http_req_spec *req_spec = req ? &(req->half_public.req_spec) : NULL; /* Resp-Public */ struct tfe_http_resp_spec *resp_spec = resp ? &(resp->half_public.resp_spec) : NULL; const char * method = req_spec ? val_to_str(req_spec->method, method_vals) : "-"; const char * url = req_spec ? req_spec->url : "-"; char resp_code[TFE_STRING_MAX]; if (resp_spec) snprintf(resp_code, sizeof(resp_code) - 1, "%d", resp_spec->resp_code); else snprintf(resp_code, sizeof(resp_code) - 1, "%s", "-"); const char * cont_type = resp_spec ? resp_spec->content_type != NULL ? resp_spec->content_type : "-" : "-"; const char * cont_encoding = resp_spec ? resp_spec->content_encoding != NULL ? resp_spec->content_encoding : "-" : "-"; const char * pangu_req = h2_stream->pangu_req ? "USER/REQ" : "-"; const char * pangu_resp = h2_stream->pangu_resp ? "USER/RESP" : "-"; //const char * __str_suspend = h2_stream->suspend_counter > 0 ? "SUSPEND" : "-"; char *access_log; asprintf(&access_log, "%s %d %s %s HTTP2.0 %s %s %s %s %s", str_stream_info, h2_stream->tfe_session.session_id, method, url, resp_code, cont_type, cont_encoding, pangu_req, pangu_resp); TFE_LOG_INFO(logger()->handle, "%s", access_log); free(access_log); } void delete_http2_stream_data(struct h2_stream_data_t *h2_stream, const struct tfe_stream *tf_stream, int body_flag) { delete_stream_half_data(&h2_stream->req, body_flag); delete_stream_half_data(&h2_stream->resp, body_flag); } void nghttp2_disect_goaway(struct tfe_session_info_t *session_info) { unsigned int thread_id = session_info->thread_id; const struct tfe_stream * stream = session_info->tf_stream; struct h2_stream_data_t *h2_stream = NULL; struct h2_stream_data_t *_h2_stream = NULL; TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){ TAILQ_REMOVE(&session_info->list, h2_stream, next); if (h2_stream->frame_ctx){ http_frame_raise_session_end(h2_stream->frame_ctx, stream, &h2_stream->tfe_session, thread_id); h2_stream->frame_ctx = NULL; } delete_http2_stream_data(h2_stream, session_info->tf_stream, 1); free(h2_stream); h2_stream = NULL; } if (session_info->as_client){ nghttp2_session_del(session_info->as_client); session_info->as_client = NULL; } if (session_info->as_server){ nghttp2_session_del(session_info->as_server); session_info->as_server = NULL; } } static int nghttp2_submit_frame_goaway(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, enum tfe_conn_dir dir) { int xret = -1; enum tfe_stream_action stream_action = ACTION_DROP_DATA; char error[1024] = {0}; const nghttp2_goaway *goaway = &frame->goaway; nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client; int rv = nghttp2_submit_goaway(session, goaway->hd.flags, goaway->last_stream_id, goaway->error_code, goaway->opaque_data, goaway->opaque_data_len); if (rv != 0){ stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit goaway error: %s\n", dir, nghttp2_strerror(rv)); goto finish; } xret = nghttp2_session_send(session); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", dir, nghttp2_strerror(xret)); } finish: snprintf(error, goaway->opaque_data_len, "%s", goaway->opaque_data); TFE_LOG_DEBUG(logger()->handle, "%s, %d, submit goaway, stream_id:%d, action:%d, errod_code:%d, data:%s", session_info->tf_stream->str_stream_info, dir, goaway->last_stream_id, session_info->stream_action, goaway->error_code, goaway->opaque_data); session_info->goaway = 1; session_info->stream_action = stream_action; return 0; } static int nghttp2_submit_frame_window_update(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, enum tfe_conn_dir dir) { int xret = -1; enum tfe_stream_action stream_action = ACTION_DROP_DATA; const nghttp2_window_update *window_update = &(frame->window_update); nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client; int rv = nghttp2_submit_window_update(session, window_update->hd.flags,window_update->hd.stream_id, window_update->window_size_increment); if (rv != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit window error: %s\n", dir, nghttp2_strerror(rv)); goto finish; } xret = nghttp2_session_send(session); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", dir, nghttp2_strerror(xret)); } finish: session_info->stream_action = stream_action; TFE_LOG_DEBUG(logger()->handle, "%s, %d, submit window update, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, dir, frame->hd.stream_id, session_info->stream_action); return 0; } static int nghttp2_submit_end_header(struct tfe_session_info_t *session_info, struct h2_stream_data_t *h2_stream) { int xret = -1; int32_t stream_id = 0; nghttp2_nv hdrs[128] = {0}; struct http2_headers headers; struct http2_half_private *resp = h2_stream->resp; enum tfe_stream_action stream_action = ACTION_DROP_DATA; headers = resp->headers; if (headers.nvlen <= 0){ goto finish; } if ((headers.flag & NGHTTP2_FLAG_END_STREAM) != 1 ){ goto finish; } stream_id = nghttp2_submit_headers(session_info->as_server, headers.flag, h2_stream->stream_id, NULL, nghttp2_nv_packet(&headers, hdrs), headers.nvlen, h2_stream); if (stream_id < 0){ printf("Fatal headers error: %s\n", nghttp2_strerror(stream_id)); stream_action = ACTION_FORWARD_DATA; } delete_nv_packet_data(&headers); if (stream_action == ACTION_DROP_DATA){ xret = nghttp2_session_send(session_info->as_server); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret)); } } finish: session_info->stream_action = stream_action; return 0; } static int nghttp2_submit_end_data(struct tfe_session_info_t *session_info, struct h2_stream_data_t *h2_stream) { int xret = -1; enum tfe_stream_action stream_action = ACTION_DROP_DATA; struct http2_half_private *resp = h2_stream->resp; if (resp->body_state == MANAGE_STAGE_INIT){ nghttp2_submit_end_header(session_info, h2_stream); } if (resp->body_state != MANAGE_STAGE_INIT){ if (resp->event_cb) { resp->event_cb(resp, EV_HTTP_RESP_BODY_END, NULL, 0, resp->event_cb_user); } if (resp->event_cb) { resp->event_cb(resp, EV_HTTP_RESP_END, NULL, 0, resp->event_cb_user); } } struct data_t *body = &resp->body; body->flags |= NGHTTP2_FLAG_END_STREAM; resp->body_state = MANAGE_STAGE_COMPLETE; resp->message_state = MANAGE_STAGE_COMPLETE; stream_action = server_frame_submit_data(session_info, h2_stream, CONN_DIR_UPSTREAM); if (stream_action == ACTION_DROP_DATA){ xret = nghttp2_session_send(session_info->as_server); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret)); } } TFE_LOG_DEBUG(logger()->handle, "%s, 1, End of stream submit, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, h2_stream->stream_id, session_info->stream_action); if (stream_action == ACTION_USER_DATA) stream_action = ACTION_DROP_DATA; session_info->stream_action = stream_action; return 1; } static int nghttp2_submit_frame_data(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, enum tfe_conn_dir dir) { struct http2_half_private *resp = NULL; struct h2_stream_data_t *h2_stream = NULL; if (dir == CONN_DIR_DOWNSTREAM){ goto finish; } if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM){ h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session_info->as_client, frame->hd.stream_id); if (!h2_stream){ TFE_LOG_ERROR(logger()->handle, "Upstream id %d, can't find stream information(addr = %p)", frame->hd.stream_id, session_info); goto finish; } resp = h2_stream->resp; resp->body.padlen = frame->data.padlen; if (resp->body_state != MANAGE_STAGE_COMPLETE){ nghttp2_submit_end_data(session_info, h2_stream); } } finish: return 0; } static int tfe_half_session_init(struct h2_stream_data_t *h2_stream, int32_t stream_id, enum tfe_http_direction direction) { struct tfe_http_session *tfe_session = &h2_stream->tfe_session; if (direction == TFE_HTTP_REQUEST){ struct http2_half_private *req = h2_stream->req; tfe_session->ops = &nghttp2_session_ops; tfe_session->req = &req->half_public; tfe_session->session_id = stream_id; } if (direction == TFE_HTTP_RESPONSE){ struct http2_half_private *resp = h2_stream->resp; tfe_session->resp = &resp->half_public; } return 0; } static void upstream_create_req(struct tfe_session_info_t *session_info, nghttp2_session *as_server, struct h2_stream_data_t *h2_stream, int32_t stream_id) { struct user_event_dispatch *event = NULL; struct http2_half_private *half_private = NULL; h2_stream->stream_id = stream_id; h2_stream->req = tfe_half_private_init(TFE_HTTP_REQUEST, 0, NULL); tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_REQUEST); event = ALLOC(struct user_event_dispatch, 1); assert(event); event->thread_id = session_info->thread_id; event->tf_stream = session_info->tf_stream; event->tfe_session = &h2_stream->tfe_session; half_set_callback(h2_stream->req, event, NULL); /* Call business plugin */ half_private = h2_stream->req; half_private->frame_ctx = http_frame_alloc(); if (half_private->frame_ctx == NULL){ TFE_STREAM_LOG_ERROR(h2_stream, "Failed at raising session begin event. "); goto finish; } http_frame_raise_session_begin(half_private->frame_ctx, session_info->tf_stream, &h2_stream->tfe_session, session_info->thread_id); h2_stream->frame_ctx = half_private->frame_ctx; TAILQ_INSERT_TAIL(&session_info->list, h2_stream, next); nghttp2_session_set_stream_user_data(as_server, stream_id, h2_stream); finish: return; } static enum tfe_stream_action nghttp2_server_frame_submit_push_promise(struct tfe_session_info_t *session_info, struct h2_stream_data_t *h2_stream) { int32_t stream_id = -1; nghttp2_nv hdrs[128] = {0}; struct http2_headers *headers = NULL; struct h2_stream_data_t *_h2_stream = NULL; struct http2_half_private *resp = NULL; enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; resp = h2_stream->resp; if (resp == NULL) goto finish; headers = &resp->promised; if (headers->nvlen <= 0) goto finish; /* Create s' half req*/ _h2_stream = (struct h2_stream_data_t *)ALLOC(struct h2_stream_data_t, 1); assert(_h2_stream); memset(_h2_stream, 0, sizeof(struct h2_stream_data_t)); stream_id = nghttp2_submit_push_promise(session_info->as_server, headers->flag, h2_stream->stream_id, nghttp2_nv_packet(headers, hdrs), headers->nvlen, _h2_stream); if (stream_id < 0){ free(_h2_stream); _h2_stream = NULL; TFE_STREAM_LOG_ERROR(h2_stream, "Failed to submit push promise: %s", nghttp2_strerror(stream_id)); goto finish; } upstream_create_req(session_info, session_info->as_server, _h2_stream, stream_id); /*clean header message **/ delete_nv_packet_data(headers); stream_action = ACTION_DROP_DATA; finish: return stream_action; } static int nghttp2_submit_frame_push_promise(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, enum tfe_conn_dir dir) { int xret = -1; struct h2_stream_data_t *h2_stream = NULL; enum tfe_stream_action stream_action = ACTION_DROP_DATA; if (dir == CONN_DIR_DOWNSTREAM) goto finish; h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session_info->as_client, frame->hd.stream_id); if (!h2_stream){ TFE_LOG_ERROR(logger()->handle, "Upstream id %d, can't find stream information(addr = %p)", frame->hd.stream_id, session_info); goto finish; } stream_action = nghttp2_server_frame_submit_push_promise(session_info, h2_stream); if (stream_action == ACTION_DROP_DATA){ xret = nghttp2_session_send(session_info->as_server); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", dir, nghttp2_strerror(xret)); } } session_info->stream_action = stream_action; finish: TFE_LOG_DEBUG(logger()->handle, "%s, %d, submit push promise, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, dir, frame->hd.stream_id, session_info->stream_action); return 0; } #ifdef TFE_CACHE static int suspend_start(struct h2_stream_data_t *h2_stream, struct http2_half_private *half, const struct tfe_stream *stream) { if (h2_stream->cache.spd_valid != 1){ return 0; } tfe_stream_resume(stream); enum tfe_http_event spd_event = h2_stream->cache.spd_event; h2_stream->cache.spd_event = (enum tfe_http_event)0; h2_stream->cache.spd_valid = 0; h2_stream->cache.spd_set_cnt--; h2_stream->cache.spd_cnt++; /* Call user callback, tell user we resume from suspend */ h2_stream->cache.rse_set = 0; half->event_cb(half, spd_event, NULL, 0, half->event_cb_user); return 1; } #endif static void fill_resp_spec_from_handle(struct http2_half_private *half_private) { struct header_data *head = NULL; struct tfe_http_resp_spec *resp_spec = &(half_private->half_public.resp_spec); foreach_headers(&half_private->headers, head){ if (!strncmp((char *)(head->nv.name), ":status", strlen(":status"))){ resp_spec->resp_code = atoi((const char *)head->nv.value); continue; } if (!strncmp((char *)(head->nv.name), "content-type", strlen("content-type"))){ //resp_spec->content_type = tfe_strdup((const char *)(head->nv.value)); resp_spec->content_type = (const char *)(head->nv.value); continue; } if (!strncmp((char *)(head->nv.name), "content-encoding", strlen("content-encoding"))){ //resp_spec->content_encoding = tfe_strdup((const char *)(head->nv.value)); resp_spec->content_encoding = (const char *)(head->nv.value); continue; } if (!strncmp((char *)(head->nv.name), "content-length", strlen("content-length"))){ //resp_spec->content_length = tfe_strdup((const char *)(head->nv.value)); resp_spec->content_length = (const char *)(head->nv.value); continue; } } return; } static void tfe_make_nv(struct http2_headers *headers, const char *name,const char *value, int flag) { /*Add head*/ struct header_data *head = NULL; head = ALLOC(struct header_data, 1); head->nv.name = (uint8_t *)tfe_strdup((const char *)name); head->nv.namelen = strlen(name); head->nv.value = (uint8_t *)tfe_strdup((const char *)value);; head->nv.valuelen = strlen(value); headers->flag = 0x04; if (flag) headers_add_head(headers, head); else headers_add_tail(headers, head); } int nghttp2_headers_write_log(struct h2_stream_data_t *h2_stream, const char * str_stream_info, int dir) { /* Request */ struct http2_half_private *req = h2_stream->req; /* Response */ struct http2_half_private *resp = h2_stream->resp; /* Req-Public */ struct tfe_http_req_spec *req_spec = req ? &(req->half_public.req_spec) : NULL; /* Resp-Public */ struct tfe_http_resp_spec *resp_spec = resp ? &(resp->half_public.resp_spec) : NULL; const char * method = req_spec ? val_to_str(req_spec->method, method_vals) : "-"; const char * url = req_spec ? req_spec->url : "-"; char resp_code[TFE_STRING_MAX]; if (resp_spec) snprintf(resp_code, sizeof(resp_code) - 1, "%d", resp_spec->resp_code); else snprintf(resp_code, sizeof(resp_code) - 1, "%s", "-"); const char * cont_type = resp_spec ? resp_spec->content_type != NULL ? resp_spec->content_type : "-" : "-"; const char * cont_encoding = resp_spec ? resp_spec->content_encoding != NULL ? resp_spec->content_encoding : "-" : "-"; const char *hmsg = (dir == CONN_DIR_UPSTREAM) ? "response" : "request"; const char * panggu_req = h2_stream->pangu_req ? "USER/REQ" : "-"; const char * panggu_resp = h2_stream->pangu_resp ? "USER/RESP" : "-"; /* SUSPEND */ const char * suspend = h2_stream->cache.spd_cnt > 0 ? "SUSPEND" : "-"; char *access_log; asprintf(&access_log, "%s %d %s stream_id:%d %s %s HTTP2.0 %s %s %s %s %s %s", str_stream_info, dir, hmsg, h2_stream->tfe_session.session_id, method, url, resp_code, cont_type, cont_encoding, panggu_req, panggu_resp, suspend); TFE_LOG_INFO(logger()->handle, "%s", access_log); free(access_log); return 0; } static enum tfe_stream_action cache_frame_submit_header(nghttp2_session *as_server, int32_t stream_id) { nghttp2_nv hdrs[128] = {0}; struct http2_headers *headers = NULL; struct http2_half_private *pangu_resp = NULL; enum tfe_stream_action stream_action = ACTION_DROP_DATA; struct h2_stream_data_t *h2_stream = NULL; #define VALUE_LEN 128 char value[VALUE_LEN] = {0}; h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(as_server, stream_id); if (!h2_stream){ stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "Cache id %d, Flow is empty", stream_id); return stream_action; } pangu_resp = h2_stream->pangu_resp; if (pangu_resp == NULL){ return ACTION_FORWARD_DATA; } headers = &pangu_resp->headers; if (headers->nvlen <= 0){ return ACTION_FORWARD_DATA; } snprintf(value, VALUE_LEN, "%d", pangu_resp->method_or_status); tfe_make_nv(&pangu_resp->headers, ":status", (const char *)value, 1); snprintf(value, VALUE_LEN, "tfe/%s", tfe_version()); tfe_make_nv(&pangu_resp->headers, "X-TG-Construct-By", (const char *)value, 0); stream_id = nghttp2_submit_headers(as_server, headers->flag, h2_stream->stream_id, NULL, nghttp2_nv_packet(headers, hdrs), headers->nvlen, h2_stream); if (stream_id < 0){ printf("Fatal headers error: %s\n", nghttp2_strerror(stream_id)); } delete_nv_packet_data(headers); nghttp2_headers_write_log(h2_stream, h2_stream->tf_stream->str_stream_info, CONN_DIR_UPSTREAM); return stream_action; } static enum tfe_stream_action nghttp2_server_frame_submit_header(struct tfe_session_info_t *session_info, struct h2_stream_data_t *h2_stream) { int32_t stream_id = 0; nghttp2_nv hdrs[128] = {0}; struct http2_headers *headers = NULL; struct http2_half_private *resp = NULL; enum tfe_stream_action stream_action = ACTION_DROP_DATA; if (h2_stream->pangu_resp != NULL){ stream_action = (enum tfe_stream_action)ACTION_USER_DATA; goto finish; } resp = h2_stream->resp; if (resp == NULL){ return ACTION_FORWARD_DATA; } headers = &resp->headers; if (headers->nvlen <= 0){ return ACTION_FORWARD_DATA; } stream_id = nghttp2_submit_headers(session_info->as_server, headers->flag, h2_stream->stream_id, NULL, nghttp2_nv_packet(headers, hdrs), headers->nvlen, h2_stream); if (stream_id < 0){ printf("Fatal headers error: %s\n", nghttp2_strerror(stream_id)); } #ifdef tfe_cache if (headers->flag &NGHTTP2_FLAG_END_STREAM) #endif delete_nv_packet_data(headers); finish: return stream_action; } static int nghttp2_server_submit_header(struct tfe_session_info_t *session_info, int32_t stream_id) { int xret = -1; struct http2_half_private *resp = NULL; struct h2_stream_data_t *h2_stream = NULL; enum tfe_stream_action stream_action = ACTION_DROP_DATA; h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session_info->as_client, stream_id); if (!h2_stream){ stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "Upstream id %d, can't find stream information(addr = %p)", stream_id, session_info); goto finish; } resp = h2_stream->resp; fill_resp_spec_from_handle(h2_stream->resp); resp->event_cb(resp, EV_HTTP_RESP_HDR, NULL, 0, resp->event_cb_user); if (h2_stream->cache.spd_set){ h2_stream->cache.spd_event = EV_HTTP_RESP_HDR; h2_stream->cache.spd_valid = 1; h2_stream->cache.spd_set = 0; h2_stream->cache.spd_set_cnt++; h2_stream->cache.spd_cnt++; tfe_stream_suspend(session_info->tf_stream, CONN_DIR_UPSTREAM); session_info->stream_id = stream_id; stream_action = ACTION_DEFER_DATA; goto finish; } nghttp2_headers_write_log(h2_stream,session_info->tf_stream->str_stream_info, CONN_DIR_UPSTREAM); stream_action = nghttp2_server_frame_submit_header(session_info, h2_stream); if (stream_action == ACTION_DROP_DATA){ xret = nghttp2_session_send(session_info->as_server); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret)); } } if (stream_action == ACTION_USER_DATA) stream_action = ACTION_DROP_DATA; finish: session_info->stream_action = stream_action; return 0; } static void fill_req_spec_from_handle(struct http2_half_private *half_private) { int urllen = 0; struct header_data *head = NULL; struct tfe_http_req_spec *req_spec = &(half_private->half_public.req_spec); foreach_headers(&half_private->headers, head){ if (!strncmp((char *)(head->nv.name), ":method", strlen(":method"))){ req_spec->method = (enum tfe_http_std_method)str_to_val((const char *)(head->nv.value), method_vals); continue; } if (!strncmp((char *)(head->nv.name), ":authority", strlen(":authority"))){ //req_spec->host = tfe_strdup((const char *)(head->nv.value)); req_spec->host = (const char *)(head->nv.value); urllen += head->nv.valuelen; continue; } if (!strncmp((char *)(head->nv.name), ":path", strlen(":path"))){ //req_spec->uri = tfe_strdup((const char*)(head->nv.value)); req_spec->uri = (const char*)(head->nv.value); urllen += head->nv.valuelen; continue; } } char *urltmp = NULL; urltmp = (char *)malloc(urllen + 1); if(urltmp){ sprintf(urltmp, "%s%s", (char *)req_spec->host, (char *)req_spec->uri); req_spec->url = urltmp; } return; } #ifdef TFE_CACHE static int suspend_stop(struct h2_stream_data_t *h2_stream, const struct tfe_stream *tf_stream, enum tfe_conn_dir dir) { int xret = -1; if (h2_stream->cache.spd_set){ h2_stream->cache.spd_valid = 1; h2_stream->cache.spd_set = 0; tfe_stream_suspend(tf_stream, dir); xret = 0; } return xret; } #endif static enum tfe_stream_action tfe_submit_response(struct tfe_session_info_t *session_info, struct h2_stream_data_t *h2_stream) { int xret = -1; enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; struct http2_half_private *resp = h2_stream->pangu_resp; #define VALUE_LEN 128 char value[VALUE_LEN] = {0}; snprintf(value, VALUE_LEN, "%d", resp->method_or_status); tfe_make_nv(&resp->headers, ":status", (const char *)value, 1); snprintf(value, VALUE_LEN, "tfe/%s", tfe_version()); tfe_make_nv(&resp->headers, "X-TG-Construct-By", (const char *)value, 0); stream_action = nghttp2_server_frame_submit_response(session_info, h2_stream); if (stream_action == ACTION_DROP_DATA){ xret = nghttp2_session_send(session_info->as_client); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "Fatal downstream send error: %s\n", nghttp2_strerror(xret)); } } if (stream_action == ACTION_DROP_DATA){ stream_action = (enum tfe_stream_action)ACTION_USER_DATA; } return stream_action; } static void downstream_create_resp(struct h2_stream_data_t *h2_stream, nghttp2_session *as_client, nghttp2_session *as_server, const struct tfe_stream *tf_stream, unsigned int thread_id) { struct user_event_dispatch *event = NULL; if (h2_stream->resp) goto finish; h2_stream->resp = tfe_half_private_init(TFE_HTTP_RESPONSE, h2_stream->stream_id, as_server); tfe_half_session_init(h2_stream, h2_stream->stream_id, TFE_HTTP_RESPONSE); event = ALLOC(struct user_event_dispatch, 1); assert(event); event->thread_id = thread_id; event->tf_stream = tf_stream; event->tfe_session = &h2_stream->tfe_session; half_set_callback(h2_stream->resp, event, NULL); h2_stream->resp->frame_ctx = h2_stream->frame_ctx; nghttp2_session_set_stream_user_data(as_client, h2_stream->stream_id, h2_stream); finish: return; } static enum tfe_stream_action nghttp2_client_frame_submit_header(struct tfe_session_info_t *session_info, struct h2_stream_data_t *h2_stream) { int32_t stream_id = -1; nghttp2_nv hdrs[128] = {0}; struct http2_headers *headers = NULL; struct http2_half_private *req = NULL; enum tfe_http_std_method method = (enum tfe_http_std_method)NGHTTP2_METHOD_UNKNOWN; enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; req = h2_stream->pangu_req != NULL ? h2_stream->pangu_req : h2_stream->req; if (req == NULL){ stream_action = ACTION_FORWARD_DATA; goto finish; } if (h2_stream->pangu_resp){ stream_action = tfe_submit_response(session_info, h2_stream); goto finish; } headers = &req->headers; if (headers->nvlen <= 0){ stream_action = ACTION_FORWARD_DATA; goto finish; } /*Create C' half_private_resp**/ downstream_create_resp(h2_stream, session_info->as_client, session_info->as_server, session_info->tf_stream, session_info->thread_id); nghttp2_session_set_next_stream_id(session_info->as_client, h2_stream->stream_id); method = nghttp2_get_method(h2_stream->req); if (method == (enum tfe_http_std_method)NGHTTP2_METHOD_POST || method == (enum tfe_http_std_method)NGHTTP2_METHOD_PUT){ stream_id = nghttp2_submit_headers(session_info->as_client, headers->flag, -1, NULL, nghttp2_nv_packet(headers, hdrs), headers->nvlen, h2_stream); }else{ stream_id = nghttp2_submit_request(session_info->as_client, NULL, nghttp2_nv_packet(headers, hdrs), headers->nvlen, NULL, h2_stream); } if (stream_id < 0){ TFE_LOG_ERROR(logger()->handle, "Could not submit request: %s", nghttp2_strerror(stream_id)); stream_action = ACTION_FORWARD_DATA; goto finish; } /*clear headers data ***/ delete_nv_packet_data(headers); stream_action = ACTION_DROP_DATA; finish: return stream_action; } static int nghttp2_client_submit_header(struct tfe_session_info_t *session_info, int32_t stream_id) { int xret = -1; struct http2_half_private *req = NULL; struct h2_stream_data_t *h2_stream = NULL; enum tfe_stream_action stream_action = ACTION_DROP_DATA; h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session_info->as_server, stream_id); if (!h2_stream){ stream_action = ACTION_FORWARD_DATA; goto finish; } req = h2_stream->req; fill_req_spec_from_handle(h2_stream->req); req->event_cb(req, EV_HTTP_REQ_HDR, NULL, 0, req->event_cb_user); if (h2_stream->cache.spd_set){ h2_stream->cache.spd_event = EV_HTTP_REQ_HDR; h2_stream->cache.spd_valid = 1; h2_stream->cache.spd_set = 0; h2_stream->cache.spd_set_cnt++; h2_stream->cache.spd_cnt++; tfe_stream_suspend(session_info->tf_stream, CONN_DIR_DOWNSTREAM); session_info->stream_id = stream_id; stream_action = ACTION_DEFER_DATA; goto finish; } nghttp2_headers_write_log(h2_stream, session_info->tf_stream->str_stream_info, CONN_DIR_DOWNSTREAM); stream_action = nghttp2_client_frame_submit_header(session_info, h2_stream); if (stream_action == ACTION_DROP_DATA){ xret = nghttp2_session_send(session_info->as_client); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "Fatal downstream send error: %s\n", nghttp2_strerror(xret)); } } if (stream_action == ACTION_USER_DATA) stream_action = ACTION_DROP_DATA; finish: session_info->stream_action = stream_action; return 0; } static int nghttp2_submit_frame_header(struct tfe_session_info_t *session_info,const nghttp2_frame *frame, enum tfe_conn_dir dir) { int xret = 0; if (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS){ if (dir == CONN_DIR_UPSTREAM){ xret = nghttp2_server_submit_header(session_info, frame->hd.stream_id); TFE_LOG_DEBUG(logger()->handle, "%s, %d, submit response header, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, dir, frame->hd.stream_id, session_info->stream_action); } if (dir == CONN_DIR_DOWNSTREAM){ xret = nghttp2_client_submit_header(session_info, frame->hd.stream_id); TFE_LOG_DEBUG(logger()->handle, "%s, %d, submit request header, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, dir, frame->hd.stream_id, session_info->stream_action); } } return xret; } nghttp2_frame_callback nghttp2_frame_callback_array[] = { [NGHTTP2_DATA] = nghttp2_submit_frame_data, [NGHTTP2_HEADERS] = nghttp2_submit_frame_header, [NGHTTP2_PRIORITY] = nghttp2_submit_frame_priority, [NGHTTP2_RST_STREAM] = nghttp2_submit_frame_rst_stream, [NGHTTP2_SETTINGS] = nghttp2_submit_frame_settings, [NGHTTP2_PUSH_PROMISE] = nghttp2_submit_frame_push_promise, [NGHTTP2_PING] = nghttp2_submit_frame_ping, [NGHTTP2_GOAWAY] = nghttp2_submit_frame_goaway, [NGHTTP2_WINDOW_UPDATE] = nghttp2_submit_frame_window_update, [NGHTTP2_CONTINUATION] = NULL, [NGHTTP2_ALTSVC] = NULL, }; static int nghttp2_fill_up_header(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir) { enum tfe_http_std_field field_id = TFE_HTTP_UNKNOWN_FIELD; struct header_data *head = NULL; struct http2_headers *headers = NULL; if (dir == CONN_DIR_DOWNSTREAM && frame->headers.cat != NGHTTP2_HCAT_REQUEST){ return 0; } struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); if (!h2_stream){ TFE_LOG_ERROR(logger()->handle, "Header stream id %d, can't find stream information", frame->hd.stream_id); return 0; } struct http2_half_private *half = (dir == CONN_DIR_UPSTREAM) ? h2_stream->resp : h2_stream->req; head = ALLOC(struct header_data, 1); head->nv.name = (uint8_t *)tfe_strdup((const char *)name); head->nv.namelen = namelen; head->nv.value = (uint8_t *)tfe_strdup((const char *)value);; head->nv.valuelen = valuelen; head->nv.flags = flags; field_id = (enum tfe_http_std_field)str_to_val((const char *)name, headers_vals); if (field_id == -1){ head->field.field_id = TFE_HTTP_UNKNOWN_FIELD; head->field.field_name = (const char *)head->nv.name; }else{ if (field_id == TFE_HTTP_CONT_ENCODING){ half->body.gzip = method_to_str_idx((const char *)value); } head->field.field_id = field_id; head->field.field_name = NULL; } headers = &half->headers; headers->flag = frame->hd.flags; headers_add_tail(headers, head); return 0; } static int nghttp2_fill_up_promise(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir) { struct header_data *head = NULL; struct http2_headers *headers = NULL; struct http2_half_private *resp = NULL; enum tfe_http_std_field field_id = TFE_HTTP_UNKNOWN_FIELD; if (dir == CONN_DIR_DOWNSTREAM) return 0; struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); if (!h2_stream){ TFE_LOG_ERROR(logger()->handle, "Promise stream id %d, can't find stream information", frame->hd.stream_id); return 0; } resp = h2_stream->resp; head = ALLOC(struct header_data, 1); head->nv.name = (uint8_t *)tfe_strdup((const char *)name); head->nv.namelen = namelen; head->nv.value = (uint8_t *)tfe_strdup((const char *)value); head->nv.valuelen = valuelen; head->nv.flags = flags; field_id = (enum tfe_http_std_field)str_to_val((const char *)name, headers_vals); if (field_id == -1){ head->field.field_id = TFE_HTTP_UNKNOWN_FIELD; head->field.field_name = (const char *)head->nv.name; }else{ if (field_id == TFE_HTTP_CONT_ENCODING){ resp->body.gzip = method_to_str_idx((const char *)value); } head->field.field_id = field_id; head->field.field_name = NULL; } headers = &resp->promised; headers->flag = frame->hd.flags; headers_add_tail(headers, head); return 0; } static int nghttp2_data_send(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *data, size_t length, const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir) { int ret = -1; struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; ret = tfe_stream_write(session_info->tf_stream, dir, data, length); if (unlikely(ret < 0)){ assert(0); } return (ssize_t)length; } static int nghttp2_on_stream_close(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir) { struct h2_stream_data_t *h2_stream = NULL; struct http2_half_private *resp = NULL; int32_t stream_id = frame->hd.stream_id; uint32_t error_code = frame->goaway.error_code; struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; h2_stream = TAILQ_LIST_FIND(session_info, stream_id); if (!h2_stream) return 0; if (error_code != 0){ const char *str = (dir == CONN_DIR_UPSTREAM) ? "Upstream" : "Downstream"; TFE_LOG_DEBUG(logger()->handle, "%s close, id = %d, error_code = %d", str, stream_id, error_code); } if (dir == CONN_DIR_DOWNSTREAM) goto finish; resp = h2_stream->resp; if (error_code == 0 && resp->body_state != MANAGE_STAGE_COMPLETE){ if (resp->body_state == MANAGE_STAGE_INIT && session_info->stream_action != ACTION_DEFER_DATA) nghttp2_submit_end_header(session_info, h2_stream); goto end; } finish: TAILQ_REMOVE(&session_info->list, h2_stream, next); delete_http2_stream_data(h2_stream, session_info->tf_stream, 1); free(h2_stream); h2_stream = NULL; end: return 0; } nghttp2_callback nghttp2_callback_array[] = { [NGHTTP2_DATA] = NULL, [NGHTTP2_HEADERS] = nghttp2_fill_up_header, [NGHTTP2_PRIORITY] = NULL, [NGHTTP2_RST_STREAM] = NULL, [NGHTTP2_SETTINGS] = NULL, [NGHTTP2_PUSH_PROMISE] = nghttp2_fill_up_promise, [NGHTTP2_PING] = NULL, [NGHTTP2_GOAWAY] = NULL, [NGHTTP2_WINDOW_UPDATE] = NULL, [NGHTTP2_CONTINUATION] = NULL, [NGHTTP2_ALTSVC] = NULL, [NGHTTP2_USER_SEND] = nghttp2_data_send, [NGHTTP2_USER_COLSE] = nghttp2_on_stream_close }; static ssize_t nghttp2_client_send(nghttp2_session *session, const uint8_t *data, size_t length, int flags, void *user_data) { if ( nghttp2_callback_array[NGHTTP2_USER_SEND] != NULL){ return (ssize_t)nghttp2_callback_array[NGHTTP2_USER_SEND](session, NULL, data, length, NULL, 0, flags, user_data, CONN_DIR_UPSTREAM); } return (ssize_t)length; } static int nghttp2_client_on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) { struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; if ( nghttp2_frame_callback_array[frame->hd.type] != NULL){ return nghttp2_frame_callback_array[frame->hd.type](session_info, frame, CONN_DIR_UPSTREAM); } return 0; } static int nghttp2_client_on_data_chunk_recv(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *input, size_t input_len, void *user_data) { size_t len; char *uncompr = NULL; int xret = -1; enum tfe_stream_action stream_action = ACTION_DROP_DATA; int uncompr_len = 0, __attribute__((__unused__))ret = 0; const unsigned char *data; struct http2_half_private * resp = NULL; struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, stream_id); if (!h2_stream){ TFE_LOG_ERROR(logger()->handle, "On data callback can't get downstream information, id = %d", stream_id); goto finish; } resp = h2_stream->resp; evbuffer_add(resp->body.evbuf_body, input, input_len); if (resp->body.gzip != HTTP2_CONTENT_ENCODING_NONE){ ret = inflate_read(input, input_len, &uncompr, &uncompr_len, &resp->body.inflater, resp->body.gzip); if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr > 0){ input = (const uint8_t*)uncompr; input_len = uncompr_len; } } data = input; len = input_len; if (resp->body_state == MANAGE_STAGE_INIT){ if (resp->event_cb) { resp->event_cb(resp, EV_HTTP_RESP_BODY_BEGIN, NULL, len, resp->event_cb_user); } if (flags == NGHTTP2_FLAG_END_STREAM){ resp->body.flags = 0; }else{ resp->body.flags = flags; } resp->body_state = MANAGE_STAGE_READING; } if (resp->body_state == MANAGE_STAGE_READING){ if (resp->event_cb) { resp->event_cb(resp, EV_HTTP_RESP_BODY_CONT, data, len, resp->event_cb_user); } if (flags == NGHTTP2_FLAG_END_STREAM){ resp->body.flags = 0; }else{ resp->body.flags = flags; } } stream_action = server_frame_submit_data(session_info, h2_stream, CONN_DIR_UPSTREAM); if (stream_action == ACTION_DROP_DATA){ xret = nghttp2_session_send(session_info->as_server); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret)); } } TFE_LOG_DEBUG(logger()->handle, "%s, 1, submit data %d, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, (int)input_len, stream_id, stream_action); if (stream_action == ACTION_USER_DATA) stream_action = ACTION_DROP_DATA; session_info->stream_action = stream_action; finish: return 0; } static int nghttp2_client_on_stream_close(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data) { nghttp2_frame frame; memset(&frame, 0, sizeof(frame)); frame.hd.stream_id = stream_id; frame.goaway.error_code = error_code; if ( nghttp2_callback_array[NGHTTP2_USER_COLSE] != NULL){ return nghttp2_callback_array[NGHTTP2_USER_COLSE](session, &frame, NULL, 0, NULL, 0, 0, user_data, CONN_DIR_UPSTREAM); } return 0; } static int nghttp2_client_on_header(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data) { if ( nghttp2_callback_array[frame->hd.type] != NULL){ return nghttp2_callback_array[frame->hd.type](session, frame, name, namelen, value, valuelen, flags, user_data, CONN_DIR_UPSTREAM); } return 0; } static struct h2_stream_data_t* create_upstream_data(nghttp2_session *session, int32_t stream_id, struct tfe_session_info_t *session_info) { struct h2_stream_data_t *h2_stream = NULL; struct user_event_dispatch *event = NULL; h2_stream = TAILQ_LIST_FIND(session_info, stream_id); if (h2_stream == NULL){ /** todo:When the data of the reply is pushed as promised, there is no stream id at the reply end. to create it*/ goto finish; } if (h2_stream->resp){ goto finish; } h2_stream->resp = tfe_half_private_init(TFE_HTTP_RESPONSE, stream_id, session_info->as_server); tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_RESPONSE); event = ALLOC(struct user_event_dispatch, 1); assert(event); event->thread_id = session_info->thread_id; event->tf_stream = session_info->tf_stream; event->tfe_session = &h2_stream->tfe_session; half_set_callback(h2_stream->resp, event, NULL); h2_stream->resp->frame_ctx = h2_stream->frame_ctx; nghttp2_session_set_stream_user_data(session, stream_id, h2_stream); finish: return h2_stream; } static ssize_t nghttp2_client_select_padding_callback(nghttp2_session *session, const nghttp2_frame *frame, size_t max_payloadlen, void *user_data) { struct http2_half_private *resp = NULL; struct h2_stream_data_t *h2_stream = NULL; h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id); if (!h2_stream) return frame->hd.length; resp = h2_stream->resp; if (!resp) return frame->hd.length; return (ssize_t)MIN(max_payloadlen, frame->hd.length + (resp->body.padlen)); } static int nghttp2_client_on_begin_headers(nghttp2_session * session, const nghttp2_frame * frame, void * user_data) { (void)session; struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; switch(frame->hd.type){ case NGHTTP2_HEADERS: create_upstream_data(session, frame->hd.stream_id, session_info); break; default: break; } return 0; } static void client_session_init(struct tfe_session_info_t *session_info) { nghttp2_session_callbacks *callbacks; nghttp2_session_callbacks_new(&callbacks); nghttp2_session_callbacks_set_send_callback(callbacks, nghttp2_client_send); nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, nghttp2_client_on_frame_recv); nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, nghttp2_client_on_data_chunk_recv); nghttp2_session_callbacks_set_select_padding_callback(callbacks, nghttp2_client_select_padding_callback); nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, nghttp2_client_on_stream_close); nghttp2_session_callbacks_set_on_header_callback(callbacks, nghttp2_client_on_header); nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, nghttp2_client_on_begin_headers); nghttp2_session_client_new(&session_info->as_client, callbacks, session_info); nghttp2_session_callbacks_del(callbacks); } static ssize_t nghttp2_server_send(nghttp2_session *session, const uint8_t *data, size_t length, int flags, void *user_data) { if ( nghttp2_callback_array[NGHTTP2_USER_SEND] != NULL){ return (ssize_t)nghttp2_callback_array[NGHTTP2_USER_SEND](session, NULL, data, length, NULL, 0, flags, user_data, CONN_DIR_DOWNSTREAM); } return (ssize_t)length; } static int nghttp2_server_on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) { struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; if ( nghttp2_frame_callback_array[frame->hd.type] != NULL){ return nghttp2_frame_callback_array[frame->hd.type](session_info, frame, CONN_DIR_DOWNSTREAM); } return 0; } static int nghttp2_server_on_stream_close(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data) { nghttp2_frame frame; memset(&frame, 0, sizeof(frame)); frame.hd.stream_id = stream_id; frame.goaway.error_code = error_code; if ( nghttp2_callback_array[NGHTTP2_USER_COLSE] != NULL){ return nghttp2_callback_array[NGHTTP2_USER_COLSE](session, &frame, NULL, 0, NULL, 0, 0, user_data, CONN_DIR_DOWNSTREAM); } return 0; } static int nghttp2_server_on_header(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data) { if ( nghttp2_callback_array[frame->hd.type] != NULL){ return nghttp2_callback_array[frame->hd.type](session, frame, name, namelen, value, valuelen, flags, user_data,CONN_DIR_DOWNSTREAM); } return 0; } static void create_serv_stream_data(nghttp2_session *session, int32_t stream_id, struct tfe_session_info_t *session_info) { struct h2_stream_data_t *h2_stream = NULL; struct user_event_dispatch *event = NULL; struct http2_half_private *half_private = NULL; h2_stream = TAILQ_LIST_FIND(session_info, stream_id); if (h2_stream != NULL){ nghttp2_session_set_stream_user_data(session, stream_id, h2_stream); goto finish; } h2_stream = (struct h2_stream_data_t *)ALLOC(struct h2_stream_data_t, 1); assert(h2_stream); memset(h2_stream, 0, sizeof(struct h2_stream_data_t)); h2_stream->stream_id = stream_id; h2_stream->session = session_info->as_server; h2_stream->tf_stream = session_info->tf_stream; h2_stream->req = tfe_half_private_init(TFE_HTTP_REQUEST, 0, NULL); tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_REQUEST); event = ALLOC(struct user_event_dispatch, 1); assert(event); event->thread_id = session_info->thread_id; event->tf_stream = session_info->tf_stream; event->tfe_session = &h2_stream->tfe_session; half_set_callback(h2_stream->req, event, NULL); /* Call business plugin */ half_private = h2_stream->req; half_private->frame_ctx = http_frame_alloc(); if (half_private->frame_ctx == NULL){ TFE_LOG_ERROR(logger()->handle, "Failed at raising session begin event. "); goto finish; } http_frame_raise_session_begin(half_private->frame_ctx, session_info->tf_stream, &h2_stream->tfe_session, session_info->thread_id); h2_stream->frame_ctx = half_private->frame_ctx; TAILQ_INSERT_TAIL(&session_info->list, h2_stream, next); nghttp2_session_set_stream_user_data(session, stream_id, h2_stream); finish: return; } static int nghttp2_server_on_data_chunk_recv(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *input, size_t input_len, void *user_data) { size_t __attribute__((__unused__))len; char *uncompr = NULL; int xret = -1; enum tfe_stream_action stream_action = ACTION_DROP_DATA; int uncompr_len = 0, __attribute__((__unused__))ret = 0; const unsigned char __attribute__((__unused__))*data; struct http2_half_private * req = NULL; struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, stream_id); if (!h2_stream){ TFE_LOG_ERROR(logger()->handle, "On data callback can't get downstream information, id = %d", stream_id); goto finish; } req = h2_stream->req; req->body.flags = flags; evbuffer_add(req->body.evbuf_body, input, input_len); if (req->body.gzip != HTTP2_CONTENT_ENCODING_NONE){ ret = inflate_read(input, input_len, &uncompr, &uncompr_len, &req->body.inflater, req->body.gzip); if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr > 0){ input = (const uint8_t*)uncompr; input_len = uncompr_len; } } data = input; len = input_len; stream_action = server_frame_submit_data(session_info, h2_stream, CONN_DIR_DOWNSTREAM); if (stream_action == ACTION_DROP_DATA){ xret = nghttp2_session_send(session_info->as_client); if (xret != 0) { stream_action = ACTION_FORWARD_DATA; TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret)); } } TFE_LOG_INFO(logger()->handle, "%s, %d, submit data %d, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info, 0, (int)input_len, stream_id, session_info->stream_action); if (stream_action == ACTION_USER_DATA) stream_action = ACTION_DROP_DATA; session_info->stream_action = stream_action; finish: return 0; } static int nghttp2_server_on_begin_headers(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) { struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data; if (frame->hd.type != NGHTTP2_HEADERS || frame->headers.cat != NGHTTP2_HCAT_REQUEST) { return 0; } create_serv_stream_data(session, frame->hd.stream_id, session_info); return 0; } static void server_session_init(struct tfe_session_info_t *session_info) { nghttp2_session_callbacks *callbacks; nghttp2_session_callbacks_new(&callbacks); nghttp2_session_callbacks_set_send_callback(callbacks, nghttp2_server_send); nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, nghttp2_server_on_frame_recv); nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, nghttp2_server_on_data_chunk_recv); nghttp2_session_callbacks_set_on_stream_close_callback( callbacks, nghttp2_server_on_stream_close); nghttp2_session_callbacks_set_on_header_callback(callbacks, nghttp2_server_on_header); nghttp2_session_callbacks_set_on_begin_headers_callback( callbacks, nghttp2_server_on_begin_headers); nghttp2_session_server_new(&session_info->as_server, callbacks, session_info); nghttp2_session_callbacks_del(callbacks); } static void delete_server_session_data(struct tfe_session_info_t *session_info) { struct h2_stream_data_t *h2_stream; struct h2_stream_data_t *_h2_stream; nghttp2_session_del(session_info->as_server); session_info->as_server = NULL; TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){ TAILQ_REMOVE(&session_info->list, h2_stream, next); free(h2_stream); h2_stream = NULL; } } static void delete_client_session_data(struct tfe_session_info_t *session_info) { struct h2_stream_data_t *h2_stream = NULL; struct h2_stream_data_t *_h2_stream; nghttp2_session_del(session_info->as_client); session_info->as_client = NULL; TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){ TAILQ_REMOVE(&session_info->list, h2_stream, next); free(h2_stream); h2_stream = NULL; } } enum tfe_stream_action detect_up_stream_protocol(struct tfe_session_info_t *session_info, const struct tfe_stream *tfe_stream, unsigned int thread_id, const unsigned char *data, size_t len) { int readlen = 0; enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; session_info->tf_stream = tfe_stream; session_info->thread_id = thread_id; if (!session_info->as_server) goto forward; readlen = nghttp2_session_mem_recv(session_info->as_client, data, len); if (readlen < 0){ TFE_LOG_ERROR(logger()->handle, "Failed to process server requests. Link message %s", tfe_stream->str_stream_info); delete_client_session_data(session_info); goto forward; } stream_action = session_info->stream_action; session_info->stream_action = ACTION_DROP_DATA; if (session_info->goaway){ nghttp2_disect_goaway(session_info); session_info->goaway = 0; } if (stream_action == ACTION_DROP_DATA){ tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len)); return ACTION_DROP_DATA; } forward: if (stream_action == ACTION_FORWARD_DATA){ tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_FOWARD_BYTES, &len, sizeof(len)); return ACTION_FORWARD_DATA; } return stream_action; } enum tfe_stream_action detect_down_stream_protocol(struct tfe_session_info_t *session_info, const struct tfe_stream *tfe_stream, unsigned int thread_id, const unsigned char *data, size_t len) { int readlen = 0; enum tfe_stream_action stream_action = ACTION_FORWARD_DATA; session_info->tf_stream = tfe_stream; session_info->thread_id = thread_id; if (!session_info->as_server) goto forward; readlen = nghttp2_session_mem_recv(session_info->as_server, data, len); if (readlen < 0){ TFE_LOG_ERROR(logger()->handle, "Failed to process client requests. Link message %s", tfe_stream->str_stream_info); delete_server_session_data(session_info); goto forward; } stream_action = session_info->stream_action; session_info->stream_action = ACTION_DROP_DATA; if (session_info->goaway){ nghttp2_disect_goaway(session_info); session_info->goaway = 0; } if (stream_action == ACTION_DROP_DATA){ tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len)); return ACTION_DROP_DATA; } forward: if (stream_action == ACTION_FORWARD_DATA){ tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_FOWARD_BYTES, &len, sizeof(len)); return ACTION_FORWARD_DATA; } return stream_action; } void sess_data_ctx_fini(struct tfe_session_info_t *session_info, const struct tfe_stream * stream, unsigned int thread_id) { struct h2_stream_data_t *h2_stream = NULL; struct h2_stream_data_t *_h2_stream = NULL; TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){ TAILQ_REMOVE(&session_info->list, h2_stream, next); if (h2_stream->frame_ctx){ http_frame_raise_session_end(h2_stream->frame_ctx, stream, &h2_stream->tfe_session, thread_id); h2_stream->frame_ctx = NULL; } delete_http2_stream_data(h2_stream, session_info->tf_stream, 1); free(h2_stream); h2_stream = NULL; } if (session_info->as_client){ nghttp2_session_del(session_info->as_client); session_info->as_client = NULL; } if (session_info->as_server){ nghttp2_session_del(session_info->as_server); session_info->as_server = NULL; } } struct tfe_session_info_t* tfe_session_info_init() { struct tfe_session_info_t *session_info = NULL; session_info = ALLOC(struct tfe_session_info_t, 1); assert(session_info); memset(session_info, 0, sizeof(struct tfe_session_info_t)); session_info->stream_action = ACTION_DROP_DATA; session_info->goaway = 0; server_session_init(session_info); client_session_init(session_info); TAILQ_INIT(&session_info->list); return session_info; }