This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/plugin/protocol/http2/src/http2_stream.cpp

2021 lines
61 KiB
C++
Raw Normal View History

/*************************************************************************
> File Name: http2_stream.c
> Author:
> Mail:
> Created Time:
************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <strings.h>
#include <stdint.h>
#include <math.h>
#include <sys/param.h>
#include <zlib.h>
#include <event2/buffer.h>
#include <tfe_utils.h>
#include <tfe_stream.h>
#include <nghttp2/nghttp2.h>
#include <http2_stream.h>
#include <http2_common.h>
#include <nghttp2_session.h>
#include <nghttp2_map.h>
/* Lookup table pairing NGHTTP2_METHOD_* ids with their textual names.
 * Consulted through val_to_str()/str_to_val() elsewhere in this file
 * (access log output and ":method" parsing). */
static const struct value_string method_vals[] =
{
{NGHTTP2_METHOD_DELETE, "DELETE"},
{NGHTTP2_METHOD_GET, "GET"},
{NGHTTP2_METHOD_HEAD, "HEAD"},
{NGHTTP2_METHOD_POST, "POST"},
{NGHTTP2_METHOD_PUT, "PUT"},
{NGHTTP2_METHOD_CONNECT, "CONNECT"},
{NGHTTP2_METHOD_UNKNOWN, "unknown"},
};
/* Lookup table pairing TFE_HTTP_* standard field ids with the header names
 * used on the wire. It is used in both directions in this file: str_to_val()
 * classifies received header names, and val_to_str() supplies the name that
 * half_ops_field_write() stores for a standard field.
 *
 * Every name is lowercase: HTTP/2 requires lowercase header field names
 * (RFC 7540, section 8.1.2), so the previously capitalised "Trailer" and
 * "Pragma" entries could produce invalid names when written.
 * NOTE(review): the "unkown" spelling of the first entry is kept as-is because
 * it is a runtime string; confirm nothing depends on it before correcting. */
static const struct value_string headers_vals[] =
{
    {TFE_HTTP_UNKNOWN_FIELD, "unkown"},
    {TFE_HTTP_HOST, ":authority"},
    {TFE_HTTP_REFERER, "referer"},
    {TFE_HTTP_USER_AGENT, "user-agent"},
    {TFE_HTTP_COOKIE, "cookie"},
    {TFE_HTTP_SET_COOKIE, "set-cookie"},
    {TFE_HTTP_PROXY_AUTHORIZATION, "proxy-authorization"},
    {TFE_HTTP_AUTHORIZATION, "authorization"},
    {TFE_HTTP_LOCATION, "location"},
    {TFE_HTTP_SERVER, "server"},
    {TFE_HTTP_ETAG, "etag"},
    {TFE_HTTP_DATE, "date"},
    {TFE_HTTP_TRAILER, "trailer"},
    {TFE_HTTP_TRANSFER_ENCODING, "transfer-encoding"},
    {TFE_HTTP_VIA, "via"},
    {TFE_HTTP_PRAGMA, "pragma"},
    {TFE_HTTP_CONNECTION, "connection"},
    {TFE_HTTP_CONT_ENCODING, "content-encoding"},
    {TFE_HTTP_CONT_LANGUAGE, "content-language"},
    {TFE_HTTP_CONT_LOCATION, "content-location"},
    {TFE_HTTP_CONT_RANGE, "content-range"},
    {TFE_HTTP_CONT_LENGTH, "content-length"},
    {TFE_HTTP_CONT_TYPE, "content-type"},
    {TFE_HTTP_CONT_DISPOSITION, "content-disposition"},
    {TFE_HTTP_EXPIRES, "expires"},
    {TFE_HTTP_ACCEPT_ENCODING, "accept-encoding"},
    {TFE_HTTP_CACHE_CONTROL, "cache-control"},
    {TFE_HTTP_IF_MATCH, "if-match"},
    {TFE_HTTP_IF_NONE_MATCH, "if-none-match"},
    {TFE_HTTP_IF_MODIFIED_SINCE, "if-modified-since"},
    {TFE_HTTP_IF_UNMODIFIED_SINCE, "if-unmodified-since"},
    {TFE_HTTP_LAST_MODIFIED, "last-modified"},
};
/* Context passed to event_dispatch_cb() as its `user` argument; it carries
 * the three values that callback forwards to http_frame_raise_event(). */
struct user_event_dispatch
{
/* Transport stream the event is raised on. */
const struct tfe_stream *tf_stream;
/* HTTP session (one HTTP/2 stream) the event belongs to. */
const struct tfe_http_session * tfe_session;
/* Thread id forwarded with every raised event. */
unsigned int thread_id;
};
/*up stream */
/*
 * Search the session's stream list for the record whose stream_id equals
 * the requested one. Returns that record, or NULL when no record matches.
 */
static struct h2_stream_data_t *
TAILQ_LIST_FIND(struct tfe_session_info_t *session_info, int32_t stream_id)
{
    struct h2_stream_data_t *candidate = NULL;
    struct h2_stream_data_t *lookahead = NULL;

    TAILQ_FOREACH_SAFE(candidate, &session_info->list, next, lookahead)
    {
        if (candidate->stream_id == stream_id)
            return candidate;
    }
    return NULL;
}
/** begin: hand-rolled doubly linked header list helpers */
/** Header list helpers. The original author marked them as removable once testing is complete. */
/* Walk `entry` over every node of `list`, starting at list->head and following
 * the next pointers. entry->next is read after each pass of the loop body, so
 * a body that clears it (as headers_del does) ends the walk early. */
#define foreach_headers(list, entry) \
for(entry = ((struct http2_headers*)(list))->head; entry; entry = entry->next)
/*
 * Unlink `entry` from `list`, fix up the list's head/tail as needed, clear
 * the entry's own links and decrement the node count. The node itself is
 * not freed; it is returned to the caller.
 */
static struct header_data*
headers_del(struct http2_headers *list, struct header_data *entry)
{
    struct header_data *before = entry->prev;
    struct header_data *after = entry->next;

    if (before != NULL)
        before->next = after;
    else
        list->head = after;

    if (after != NULL)
        after->prev = before;
    else
        list->tail = before;

    entry->prev = NULL;
    entry->next = NULL;
    list->nvlen--;
    return entry;
}
/*
 * Insert `entry` at the front of `list` and bump the node count.
 *
 * Both links of the entry are always written, mirroring headers_add_tail():
 * the previous version left entry->prev (and, for an empty list, entry->next)
 * untouched, so a recycled or non-zeroed node could carry stale links into
 * the list.
 */
void
headers_add_head(struct http2_headers *list, struct header_data *entry)
{
    entry->prev = NULL;
    entry->next = list->head;

    if (list->head)
        list->head->prev = entry;
    else
        list->tail = entry;    /* first node is both head and tail */

    list->head = entry;
    list->nvlen++;
}
/*
 * Append `entry` to the end of `list` and bump the node count. The entry's
 * prev/next links are overwritten.
 */
void
headers_add_tail(struct http2_headers *list, struct header_data *entry)
{
    struct header_data *last = list->tail;

    entry->prev = last;
    entry->next = NULL;

    if (last == NULL)
        list->head = entry;
    else
        last->next = entry;

    list->tail = entry;
    list->nvlen++;
}
/* Reset a header list to the empty state: no nodes, zero count, zero flags. */
static inline void
headers_init(struct http2_headers *headers)
{
    memset(headers, 0, sizeof(*headers));
    headers->head = NULL;
    headers->tail = NULL;
    headers->flag = 0;
    headers->nvlen = 0;
}
/*
 * Map a content-encoding header value to an HTTP2_CONTENT_ENCODING_* id.
 *
 * Matching is a case-insensitive substring search, so a value listing
 * several codings resolves to the first token in the priority order below.
 * The "x-" variants are tested before their plain counterparts: "gzip" is a
 * substring of "x-gzip" (likewise "bzip2"/"x-bzip2"), so with the old order
 * the X_GZIP and X_BZIP2 results could never be returned.
 * NOTE(review): confirm the inflate/deflate helpers handle the X_* ids,
 * since those ids were previously never produced by this function.
 *
 * Returns HTTP2_CONTENT_ENCODING_NONE for NULL or unrecognised input.
 */
static int
method_to_str_idx(const char * method)
{
    if (method == NULL)
        return HTTP2_CONTENT_ENCODING_NONE;

    if (strcasestr(method, "x-gzip") != NULL)
        return HTTP2_CONTENT_ENCODING_X_GZIP;
    if (strcasestr(method, "gzip") != NULL)
        return HTTP2_CONTENT_ENCODING_GZIP;
    if (strcasestr(method, "deflate") != NULL)
        return HTTP2_CONTENT_ENCODING_DEFLATE;
    if (strcasestr(method, "x-bzip2") != NULL)
        return HTTP2_CONTENT_ENCODING_X_BZIP2;
    if (strcasestr(method, "bzip2") != NULL)
        return HTTP2_CONTENT_ENCODING_BZIP2;
    if (strcasestr(method, "br") != NULL)
        return HTTP2_CONTENT_ENCODING_BR;

    return HTTP2_CONTENT_ENCODING_NONE;
}
/*
 * Record `stream_id` in the run-id set unless it is already present.
 * NOTE(review): no capacity check is made before appending; the size of
 * rid->id is not visible here - confirm it cannot be exceeded.
 */
static void
stream_set_id(struct h2_run_id *rid, int32_t stream_id)
{
    int idx;

    for (idx = 0; idx < rid->num; ++idx) {
        if (rid->id[idx] == stream_id)
            return;
    }
    rid->id[rid->num++] = stream_id;
}
/* Zero every recorded stream id and mark the run-id set as empty. */
static void
nghttp2_stream_disable_rid(struct h2_run_id *rid)
{
    int idx = rid->num;

    while (idx-- > 0)
        rid->id[idx] = 0;
    rid->num = 0;
}
/** end */
/*
 * Default per-half event callback: forwards the event to
 * http_frame_raise_event() using the frame context stored on the half and
 * the stream/session/thread carried by `user` (a struct user_event_dispatch).
 * Always returns 0.
 */
static int event_dispatch_cb(struct http2_half_private * half_private,
    enum tfe_http_event ev, const unsigned char * data, size_t len, void * user)
{
    struct user_event_dispatch *dispatch = (struct user_event_dispatch *)user;

    http_frame_raise_event(half_private->frame_ctx, dispatch->tf_stream,
        (struct tfe_http_session *)dispatch->tfe_session,
        ev, data, len, dispatch->thread_id);
    return 0;
}
/*
 * Attach a user context and its deleter to a half. If the half has no event
 * callback yet, event_dispatch_cb is installed. Any previously stored
 * context is overwritten without invoking its deleter.
 */
void half_set_callback(struct http2_half_private * half_private,
    void * user, void (* user_deleter)(void *))
{
    half_private->event_cb_user_deleter = user_deleter;
    half_private->event_cb_user = user;

    if (!half_private->event_cb)
        half_private->event_cb = event_dispatch_cb;
}
static const char *
half_ops_field_read(const struct tfe_http_half * half, const struct http_field_name * field)
{
const char *value = NULL;
struct header_data *header = NULL;
const struct http2_half_private *half_private = nghttp2_to_half_private(half);
if (unlikely(half_private == NULL))
goto finish;
foreach_headers(&half_private->headers, header){
if (header->field.field_id == field->field_id){
break;
}
}
if (header == NULL) goto finish;
value = (const char *)header->nv.value;
finish:
return value;
}
/*
 * Set a header on `half`.
 *
 * - Standard field (field_id != TFE_HTTP_UNKNOWN_FIELD): if a header with the
 *   same id exists its value is replaced in place; otherwise a new header is
 *   appended using the canonical name from headers_vals.
 * - Non-standard field: a new header named name->field_name is always
 *   appended (existing non-standard headers are never overwritten).
 *
 * `value` is copied. Returns 0 on success and -1 when an argument is NULL,
 * the field has no usable name, or the node cannot be allocated. The
 * previous version formed &half_private->headers before checking the half
 * for NULL and crashed on a NULL name/value.
 */
static int
half_ops_field_write(struct tfe_http_half * half, const struct http_field_name * name, const char * value)
{
    struct header_data *header = NULL;
    struct http2_half_private *half_private = nghttp2_to_half_private(half);

    if (unlikely(half_private == NULL) || name == NULL || value == NULL)
        return -1;

    const int is_custom = (name->field_id == TFE_HTTP_UNKNOWN_FIELD);
    if (is_custom && name->field_name == NULL)
        return -1;

    struct http2_headers *headers = &half_private->headers;

    if (!is_custom){
        foreach_headers(headers, header){
            if (header->field.field_id != name->field_id)
                continue;
            free(header->nv.value);
            header->nv.value = (uint8_t *)tfe_strdup(value);
            header->nv.valuelen = strlen(value);
            header->field.field_name = (const char *)header->nv.name;
            return 0;
        }
    }

    header = ALLOC(struct header_data, 1);
    if (header == NULL)
        return -1;
    memset(header, 0, sizeof(*header));

    if (is_custom){
        header->field.field_id = TFE_HTTP_UNKNOWN_FIELD;
        header->nv.name = (uint8_t *)tfe_strdup(name->field_name);
        header->nv.namelen = strlen(name->field_name);
        header->field.field_name = (const char *)header->nv.name;
    }else{
        const char *std_name = val_to_str(name->field_id, headers_vals);
        if (std_name == NULL){
            /* id has no entry in headers_vals: nothing sensible to store */
            free(header);
            return -1;
        }
        header->field.field_id = name->field_id;
        header->field.field_name = NULL;
        header->nv.name = (uint8_t *)tfe_strdup(std_name);
        header->nv.namelen = strlen(std_name);
    }
    header->nv.value = (uint8_t *)tfe_strdup(value);
    header->nv.valuelen = strlen(value);
    headers_add_tail(headers, header);
    return 0;
}
/* Hand back the same half as a writable pointer (const is cast away). */
static struct tfe_http_half *
half_ops_allow_write(const struct tfe_http_half * half)
{
    struct tfe_http_half *writable = (struct tfe_http_half *)half;
    return writable;
}
static const char *
half_ops_field_iterate(const struct tfe_http_half * half, void ** iter, struct http_field_name * field)
{
const char *value = NULL;
struct header_data **head = (struct header_data **)iter;
const struct http2_half_private *half_private = nghttp2_to_half_private(half);
if (unlikely(half_private == NULL))
goto finish;
if (NULL == *head){
*head = half_private->headers.head;
}else{
*head = (*head)->next;
}
if (NULL == *head)
goto finish;
field->field_id = (*head)->field.field_id;
field->field_name = (*head)->field.field_name;
value = (const char *)(*head)->nv.value;
finish:
return value;
}
/*
 * Append body bytes to a half, compressing them when the half's
 * content-encoding (body.gzip) is not NONE.
 *
 * Calling with buff == NULL and size == 0 marks the end of the body: any
 * pending compressed output is flushed and the message state becomes
 * MANAGE_STAGE_COMPLETE.
 *
 * Returns the result of deflate_write()/evbuffer_add(), 0 for an
 * uncompressed end-of-body, and -1 on invalid input or allocation failure.
 * Changes from the previous version: an uncompressed end-of-body used to
 * return -1 although nothing failed, the flush path could hand a NULL
 * evbuffer to deflate_write(), and a NULL buffer with a non-zero size was
 * passed on unchecked. `flag` is unused.
 */
static int
half_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, int flag)
{
    struct http2_half_private * resp = nghttp2_to_half_private(half);
    struct data_t *body = &resp->body;
    const int compressing = (body->gzip != HTTP2_CONTENT_ENCODING_NONE);
    const int finishing = (buff == NULL && size == 0);

    (void)flag;

    if (buff == NULL && !finishing)
        return -1;

    /* Any path that writes needs an output buffer. */
    if ((compressing || !finishing) && body->evbuf_body == NULL){
        body->evbuf_body = evbuffer_new();
        if (body->evbuf_body == NULL)
            return -1;
    }

    if (finishing){
        int xret = 0;
        if (compressing)
            xret = deflate_write(&body->deflate, NULL, 0, body->evbuf_body, body->gzip, 1);
        resp->message_state = MANAGE_STAGE_COMPLETE;
        return xret;
    }

    if (compressing)
        return deflate_write(&body->deflate, (const uint8_t *)buff, size,
            body->evbuf_body, body->gzip, 0);

    return evbuffer_add(body->evbuf_body, buff, size);
}
static void
delete_nv_packet_data(struct http2_headers *headers)
{
struct header_data *header = NULL;
foreach_headers(headers, header){
free(header->nv.name);
header->nv.name = NULL;
header->nv.namelen = 0;
free(header->nv.value);
header->nv.value = NULL;
header->nv.valuelen = 0;
header->nv.flags = 0;
headers_del(headers, header);
}
headers->nvlen = 0;
headers->flag = 0;
headers->head = headers->tail = NULL;
}
/*
 * Destroy the half that `*data` points to and set `*data` to NULL. Does
 * nothing when `*data` is already NULL.
 *
 * Tears down the inflate/deflate state, the URL storage, the header list and
 * the user callback context (through its deleter, when one was registered).
 * The body evbuffer is freed only when `body_flag` is non-zero.
 */
void delete_stream_half_data(struct http2_half_private **data,
    int body_flag)
{
    struct http2_half_private *half = *data;

    if (half == NULL)
        return;

    struct data_t *body = &half->body;
    inflate_finished(&body->inflater);
    deflate_finished(&body->deflate);

    if (body_flag && body->evbuf_body){
        evbuffer_free(body->evbuf_body);
        body->evbuf_body = NULL;
    }

    if (half->url_storage)
        FREE(&half->url_storage);

    delete_nv_packet_data(&half->headers);

    if (half->event_cb_user_deleter != NULL)
        half->event_cb_user_deleter(half->event_cb_user);

    free(half);
    *data = NULL;
}
/*
 * ops_free hook: destroy the private half behind `half`, including its body
 * buffer. delete_stream_half_data() both frees the object and NULLs the
 * local pointer, so no further release is needed here.
 */
void half_ops_free(struct tfe_http_half * half)
{
    struct http2_half_private * doomed = nghttp2_to_half_private(half);

    delete_stream_half_data(&doomed, 1);
}
int h2_half_ops_body_begin(struct tfe_http_half * half, int by_stream)
{
struct http2_half_private * resp = nghttp2_to_half_private(half);
struct data_t *body = &resp->body;
assert(body->evbuf_body == NULL);
if (by_stream)
{
if (body->inflater){
inflate_finished(&body->inflater);
}
if (body->deflate){
deflate_finished(&body->deflate);
}
body->gzip = HTTP2_CONTENT_ENCODING_NONE;
resp->message_state = MANAGE_STAGE_READING;
}
body->evbuf_body = evbuffer_new();
return 0;
}
/*
 * Add a chunk of body data to the half's body buffer, routing it through
 * deflate_write() when a content-encoding is active. Returns the result of
 * the underlying write call.
 */
int h2_half_ops_body_data(struct tfe_http_half * half, const unsigned char * data, size_t sz_data)
{
    struct http2_half_private * resp = nghttp2_to_half_private(half);
    struct data_t *body = &resp->body;

    if (body->gzip == HTTP2_CONTENT_ENCODING_NONE)
        return evbuffer_add(body->evbuf_body, data, sz_data);

    return deflate_write(&body->deflate, (const uint8_t *)data, sz_data,
        body->evbuf_body, body->gzip, 0);
}
/* Mark both the body and the whole message of the half as complete. */
int h2_half_ops_body_end(struct tfe_http_half * half)
{
    struct http2_half_private * finished = nghttp2_to_half_private(half);

    finished->message_state = MANAGE_STAGE_COMPLETE;
    finished->body_state = MANAGE_STAGE_COMPLETE;
    return 0;
}
/* Operation table installed on every half created by tfe_half_private_init();
 * it binds the generic tfe_http_half operations to the HTTP/2 implementations
 * defined above. */
struct tfe_http_half_ops h2_half_ops =
{
.ops_http_field_read = half_ops_field_read,
.ops_http_field_write = half_ops_field_write,
.ops_http_allow_write = half_ops_allow_write,
.ops_http_field_iterate = half_ops_field_iterate,
.ops_append_body = half_ops_append_body,
.ops_body_begin = h2_half_ops_body_begin,
.ops_body_data = h2_half_ops_body_data,
.ops_body_end = h2_half_ops_body_end,
.ops_free = half_ops_free
};
/*
 * Populate the public response spec (status code, content-type,
 * content-encoding, content-length) from the half's received headers.
 * The string fields point into the header list; they are not copied.
 *
 * Fixes over the previous version:
 *  - content_length was reset after the loop, discarding the value that had
 *    just been parsed; the reset now happens before the scan.
 *  - header names are compared exactly instead of by prefix.
 *  - headers with a NULL name or value are skipped.
 */
static void
fill_resp_spec_from_handle(struct http2_half_private *half_private)
{
    struct header_data *head = NULL;
    struct tfe_http_resp_spec *resp_spec = &(half_private->half_public.resp_spec);

    resp_spec->content_length = NULL;

    foreach_headers(&half_private->headers, head){
        const char *hdr_name = (const char *)head->nv.name;
        const char *hdr_value = (const char *)head->nv.value;

        if (hdr_name == NULL || hdr_value == NULL)
            continue;

        if (strcmp(hdr_name, ":status") == 0)
            resp_spec->resp_code = atoi(hdr_value);
        else if (strcmp(hdr_name, "content-type") == 0)
            resp_spec->content_type = hdr_value;
        else if (strcmp(hdr_name, "content-encoding") == 0)
            resp_spec->content_encoding = hdr_value;
        else if (strcmp(hdr_name, "content-length") == 0)
            resp_spec->content_length = hdr_value;
    }
}
/*
 * Populate the public request spec from the half's received headers:
 * method from ":method", host from ":authority", uri from ":path", and a
 * freshly allocated url formed as host followed by uri.
 * host/uri point into the header list; url is a separate malloc'd string.
 *
 * Fixes over the previous version:
 *  - a missing ":authority" or ":path" left host/uri NULL and passed NULL
 *    to a "%s" conversion; missing parts are now treated as "".
 *  - the url buffer is sized from the strings actually printed and written
 *    with snprintf, instead of from accumulated header lengths.
 *  - header names are compared exactly instead of by prefix.
 *
 * NOTE(review): the previous req_spec->url (if any) is not released here and
 * no release of it is visible in this part of the file - confirm who owns it.
 */
static void
fill_req_spec_from_handle(struct http2_half_private *half_private)
{
    struct header_data *head = NULL;
    struct tfe_http_req_spec *req_spec = &(half_private->half_public.req_spec);

    foreach_headers(&half_private->headers, head){
        const char *hdr_name = (const char *)head->nv.name;
        const char *hdr_value = (const char *)head->nv.value;

        if (hdr_name == NULL || hdr_value == NULL)
            continue;

        if (strcmp(hdr_name, ":method") == 0)
            req_spec->method = (enum tfe_http_std_method)str_to_val(hdr_value, method_vals);
        else if (strcmp(hdr_name, ":authority") == 0)
            req_spec->host = hdr_value;
        else if (strcmp(hdr_name, ":path") == 0)
            req_spec->uri = hdr_value;
    }

    const char *host_part = req_spec->host ? (const char *)req_spec->host : "";
    const char *path_part = req_spec->uri ? (const char *)req_spec->uri : "";
    size_t url_size = strlen(host_part) + strlen(path_part) + 1;

    char *urltmp = (char *)malloc(url_size);
    if (urltmp){
        snprintf(urltmp, url_size, "%s%s", host_part, path_part);
        req_spec->url = urltmp;
    }
}
/*
 * Allocate and initialise a private half for the given direction: zeroed
 * storage, the HTTP/2 ops table, an empty header list, a new body evbuffer,
 * no content-encoding, and both state machines at MANAGE_STAGE_INIT.
 */
static struct http2_half_private*
tfe_half_private_init(enum tfe_http_direction direction)
{
    struct http2_half_private *fresh = ALLOC(struct http2_half_private, 1);
    assert(fresh);
    memset(fresh, 0, sizeof(*fresh));

    headers_init(&fresh->headers);

    fresh->half_public.ops = &h2_half_ops;
    fresh->half_public.direction = direction;

    fresh->message_state = MANAGE_STAGE_INIT;
    fresh->body_state = MANAGE_STAGE_INIT;

    fresh->body.gzip = HTTP2_CONTENT_ENCODING_NONE;
    fresh->body.evbuf_body = evbuffer_new();

    return fresh;
}
/*
 * Return the session as writable when http_frame_currect_plugin_preempt()
 * reports 0 for the stream's frame context; otherwise return NULL.
 */
static struct tfe_http_session*
h2_ops_allow_write(const struct tfe_http_session * session)
{
    struct tfe_http_session *writable = (struct tfe_http_session *)session;
    struct h2_stream_data_t *stream_data = nghttp2_to_stream_data(writable);

    if (http_frame_currect_plugin_preempt(stream_data->frame_ctx) != 0)
        return NULL;
    return writable;
}
/* Detach the current plugin from the frame context of this session's stream. */
void h2_ops_detach(const struct tfe_http_session * session)
{
    struct tfe_http_session *mutable_session = (struct tfe_http_session *)session;
    struct h2_stream_data_t *stream_data = nghttp2_to_stream_data(mutable_session);

    http_frame_currect_plugin_detach(stream_data->frame_ctx);
}
/* ops_drop hook: intentionally a no-op in this implementation. */
void h2_ops_drop(struct tfe_http_session * session)
{
return;
}
/* Request suspension: only sets spd_set; suspend_stop() acts on it later. */
void h2_ops_suspend(struct tfe_http_session * session)
{
    struct h2_stream_data_t *target = nghttp2_to_stream_data((struct tfe_http_session *)session);

    target->spd_set = 1;
}
/* Request resumption: sets rse_set, but only while a suspension is in effect (spd_valid). */
void h2_ops_resume(struct tfe_http_session * session)
{
    struct h2_stream_data_t *target = nghttp2_to_stream_data((struct tfe_http_session *)session);

    if (!target->spd_valid)
        return;
    target->rse_set = 1;
}
/* Store a user-built request half on the stream (pangu_req). */
void h2_ops_request_set(struct tfe_http_session * session, struct tfe_http_half * req_user)
{
    struct h2_stream_data_t *owner = nghttp2_to_stream_data(session);
    struct http2_half_private *user_req = nghttp2_to_half_private(req_user);

    owner->pangu_req = user_req;
}
/* Store a user-built response half on the stream (pangu_resp). */
void h2_ops_response_set(struct tfe_http_session * session, struct tfe_http_half * resp)
{
    struct h2_stream_data_t *owner = nghttp2_to_stream_data(session);
    struct http2_half_private *user_resp = nghttp2_to_half_private(resp);

    owner->pangu_resp = user_resp;
}
/*
 * Create a new request half carrying `method` and a private copy of `uri`.
 * `session` is not used. Returns the public part of the new half.
 */
struct tfe_http_half * h2_ops_request_create(struct tfe_http_session * session,
    enum tfe_http_std_method method, const char * uri)
{
    struct http2_half_private * created = tfe_half_private_init(TFE_HTTP_REQUEST);

    created->method_or_status = method;
    created->url_storage = tfe_strdup(uri);
    return &created->half_public;
}
/*
 * Create a new response half with status `resp_code`. When the stream
 * already has a received response, its content-encoding id is copied so the
 * new body is encoded the same way. Returns the public part of the new half.
 */
struct tfe_http_half * h2_ops_response_create(struct tfe_http_session * session, int resp_code)
{
    struct http2_half_private * created = tfe_half_private_init(TFE_HTTP_RESPONSE);
    created->method_or_status = resp_code;

    struct h2_stream_data_t *owner = nghttp2_to_stream_data(session);
    if (owner->resp)
        created->body.gzip = owner->resp->body.gzip;

    return &created->half_public;
}
/* ops_kill hook: intentionally empty in this implementation. */
void h2_ops_kill(struct tfe_http_session * session)
{
}
/* Session operation table for HTTP/2 sessions; tfe_half_session_init()
 * installs it on the session when the request half is attached. */
struct tfe_http_session_ops nghttp2_session_ops =
{
.ops_allow_write = h2_ops_allow_write,
.ops_detach = h2_ops_detach,
.ops_drop = h2_ops_drop,
.ops_suspend = h2_ops_suspend,
.ops_resume = h2_ops_resume,
.ops_kill = h2_ops_kill,
.ops_request_set = h2_ops_request_set,
.ops_response_set = h2_ops_response_set,
.ops_request_create = h2_ops_request_create,
.ops_response_create = h2_ops_response_create
};
/*
 * Emit one access-log line for a stream: stream info, session id, method,
 * url, response code, content-type, content-encoding and whether user-built
 * request/response halves were attached. Missing pieces are logged as "-".
 *
 * Fixes over the previous version:
 *  - the asprintf() result is checked; on failure the buffer is
 *    indeterminate and was previously both logged and freed.
 *  - a NULL url, method name or stream-info string is replaced by "-"
 *    instead of being passed to a "%s" conversion.
 */
void
nghttp2_write_access_log(struct h2_stream_data_t *h2_stream, const char * str_stream_info)
{
    /* Request */
    struct http2_half_private *req = h2_stream->req;
    /* Response */
    struct http2_half_private *resp = h2_stream->resp;
    /* Req-Public */
    struct tfe_http_req_spec *req_spec = req ? &(req->half_public.req_spec) : NULL;
    /* Resp-Public */
    struct tfe_http_resp_spec *resp_spec = resp ? &(resp->half_public.resp_spec) : NULL;

    const char * stream_info = str_stream_info ? str_stream_info : "-";

    const char * method = req_spec ? val_to_str(req_spec->method, method_vals) : "-";
    if (method == NULL)
        method = "-";

    const char * url = (req_spec && req_spec->url) ? req_spec->url : "-";

    char resp_code[TFE_STRING_MAX];
    if (resp_spec)
        snprintf(resp_code, sizeof(resp_code), "%d", resp_spec->resp_code);
    else
        snprintf(resp_code, sizeof(resp_code), "%s", "-");

    const char * cont_type = (resp_spec && resp_spec->content_type) ? resp_spec->content_type : "-";
    const char * cont_encoding = (resp_spec && resp_spec->content_encoding) ? resp_spec->content_encoding : "-";
    const char * pangu_req = h2_stream->pangu_req ? "USER/REQ" : "-";
    const char * pangu_resp = h2_stream->pangu_resp ? "USER/RESP" : "-";

    char *access_log = NULL;
    if (asprintf(&access_log, "%s %d %s %s HTTP2.0 %s %s %s %s %s", stream_info, h2_stream->tfe_session.session_id,
        method, url, resp_code, cont_type, cont_encoding, pangu_req, pangu_resp) < 0)
        return;    /* allocation failed: nothing to log or free */

    TFE_LOG_INFO(rt_log_data()->run_log_handle, "%s", access_log);
    free(access_log);
}
/*
 * Wire a stream's half into its public session object.
 * For the request direction this also installs the session ops table and
 * records the stream id as the session id; for the response direction only
 * the response pointer is set. Always returns 0.
 */
static int tfe_half_session_init(struct h2_stream_data_t *h2_stream, int32_t stream_id,
    enum tfe_http_direction direction)
{
    struct tfe_http_session *tfe_session = &h2_stream->tfe_session;

    if (direction == TFE_HTTP_REQUEST){
        tfe_session->ops = &nghttp2_session_ops;
        tfe_session->req = &h2_stream->req->half_public;
        tfe_session->session_id = stream_id;
    }else if (direction == TFE_HTTP_RESPONSE){
        tfe_session->resp = &h2_stream->resp->half_public;
    }
    return 0;
}
/*
 * Tear down both halves of a stream. When `tf_stream` is given, an access
 * log line is written first. `body_flag` is forwarded to
 * delete_stream_half_data() and decides whether body buffers are freed.
 * The h2_stream object itself is not freed here.
 */
void delete_http2_stream_data(struct h2_stream_data_t *h2_stream,
    const struct tfe_stream *tf_stream,
    int body_flag)
{
    if (tf_stream != NULL)
        nghttp2_write_access_log(h2_stream, tf_stream->str_stream_info);

    struct http2_half_private **halves[] = { &h2_stream->req, &h2_stream->resp };
    for (size_t i = 0; i < sizeof(halves) / sizeof(halves[0]); i++)
        delete_stream_half_data(halves[i], body_flag);
}
/*
 * Leave the suspended state, if the stream is in it (spd_valid == 1).
 * The pending event is captured and the suspend markers are cleared before
 * anything else, so the user callback may request a new suspension. If a
 * resume was requested the transport stream is resumed; finally the captured
 * event is replayed to the half's callback with no payload.
 */
static void
suspend_start(struct h2_stream_data_t *h2_stream,
    struct http2_half_private *half, const struct tfe_stream *stream)
{
    if (h2_stream->spd_valid != 1)
        return;

    enum tfe_http_event replay_event = h2_stream->spd_event;

    /* reset the suspend bookkeeping first */
    h2_stream->spd_event = (enum tfe_http_event)0;
    h2_stream->spd_valid = 0;

    if (h2_stream->rse_set)
        tfe_stream_resume(stream);
    h2_stream->rse_set = 0;

    /* tell the user we are back from suspend */
    half->event_cb(half, replay_event, NULL, 0, half->event_cb_user);
}
/*
 * Act on a pending suspend request: when spd_set is raised, mark the
 * suspension as in effect, clear the request and suspend the transport
 * stream in direction `dir`. Returns 0 when a suspension was applied and -1
 * when none was requested.
 */
static int
suspend_stop(struct h2_stream_data_t *h2_stream,
    const struct tfe_stream *tf_stream, enum tfe_conn_dir dir)
{
    if (!h2_stream->spd_set)
        return -1;

    h2_stream->spd_valid = 1;
    h2_stream->spd_set = 0;
    tfe_stream_suspend(tf_stream, dir);
    return 0;
}
/*
 * nghttp2 send callback for the upstream (client-role) session: writes the
 * serialized frame bytes to the transport in the upstream direction.
 *
 * Returns `length` on success. A failed write still trips the assert in
 * debug builds; in NDEBUG builds it now returns
 * NGHTTP2_ERR_CALLBACK_FAILURE instead of claiming that all bytes were sent.
 */
static ssize_t
nghttp2_client_send(nghttp2_session *session, const uint8_t *data,
    size_t length, int flags, void *user_data)
{
    (void)session;
    (void)flags;

    struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
    int ret = tfe_stream_write(session_info->tf_stream, CONN_DIR_UPSTREAM, data, length);

    if (unlikely(ret < 0)){
        assert(0);
        return NGHTTP2_ERR_CALLBACK_FAILURE;
    }
    return (ssize_t)length;
}
/*
 * Handle a GOAWAY: end and destroy every stream tracked by the session, then
 * delete one of the two nghttp2 sessions - as_server when `server` is
 * non-zero, as_client otherwise.
 *
 * For each stream: it is unlinked from the list, a session-end event is
 * raised if it still has a frame context, both halves (including bodies) are
 * released and the stream record is freed.
 */
void nghttp2_disect_goaway(struct tfe_session_info_t *session_info, int server)
{
    const struct tfe_stream * tf_stream = session_info->tf_stream;
    unsigned int thread_id = session_info->thread_id;
    struct h2_stream_data_t *cur = NULL;
    struct h2_stream_data_t *upcoming = NULL;

    TAILQ_FOREACH_SAFE(cur, &session_info->list, next, upcoming){
        TAILQ_REMOVE(&session_info->list, cur, next);

        if (cur->frame_ctx){
            http_frame_raise_session_end(cur->frame_ctx, tf_stream, &cur->tfe_session,
                thread_id);
            cur->frame_ctx = NULL;
        }

        delete_http2_stream_data(cur, session_info->tf_stream, 1);
        free(cur);
        cur = NULL;
    }

    if (server){
        if (session_info->as_server){
            nghttp2_session_del(session_info->as_server);
            session_info->as_server = NULL;
        }
    }else{
        if (session_info->as_client){
            nghttp2_session_del(session_info->as_client);
            session_info->as_client = NULL;
        }
    }
}
/*
 * nghttp2 on_frame_recv callback for the upstream (client-role) session.
 *
 * - DATA: flags the stream as having data and records its id for later
 *   processing.
 * - HEADERS (response category, END_HEADERS set): resumes a suspended
 *   stream, fills the response spec from the collected headers, records the
 *   stream id and raises EV_HTTP_RESP_HDR.
 * - GOAWAY: tears down all streams via nghttp2_disect_goaway(..., 1).
 * - anything else: ignored.
 *
 * Compared with the previous version, a stream without a response half and
 * a half without an event callback are now skipped instead of dereferenced,
 * and the switch has an explicit default. Always returns 0.
 */
static int
nghttp2_client_on_frame_recv(nghttp2_session *session,
    const nghttp2_frame *frame, void *user_data)
{
    struct h2_stream_data_t *h2_stream = NULL;
    struct http2_half_private *resp = NULL;
    struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;

    switch (frame->hd.type) {
    case NGHTTP2_DATA:
        h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
        if (!h2_stream)
            break;
        h2_stream->frame_type |= TFE_NGHTTP2_DATA;
        stream_set_id(&session_info->h2_id, frame->hd.stream_id);
        break;

    case NGHTTP2_HEADERS:
        if (frame->headers.cat != NGHTTP2_HCAT_RESPONSE ||
            !(frame->hd.flags & NGHTTP2_FLAG_END_HEADERS))
            break;

        h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
        if (!h2_stream){
            TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Upstream id %d, can't find stream information(addr = %p)",
                frame->hd.stream_id, session_info);
            break;
        }

        resp = h2_stream->resp;
        if (!resp){
            TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Upstream id %d, stream has no response half",
                frame->hd.stream_id);
            break;
        }

        suspend_start(h2_stream, resp, session_info->tf_stream);
        fill_resp_spec_from_handle(resp);
        h2_stream->frame_type |= TFE_NGHTTP2_HEADERS;
        stream_set_id(&session_info->h2_id, frame->hd.stream_id);

        if (resp->event_cb)
            resp->event_cb(resp, EV_HTTP_RESP_HDR, NULL, 0, resp->event_cb_user);

        /* the user asked to suspend from inside the callback: remember
         * which event to replay on resume */
        if (h2_stream->spd_set){
            h2_stream->spd_event = EV_HTTP_RESP_HDR;
        }
        break;

    case NGHTTP2_GOAWAY:
        nghttp2_disect_goaway(session_info, 1);
        TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "Up stream control frame goaway");
        break;

    case NGHTTP2_SETTINGS:
    case NGHTTP2_WINDOW_UPDATE:
    case NGHTTP2_PING:
    case NGHTTP2_PRIORITY:
    default:
        break;
    }
    return 0;
}
/*
 * nghttp2 on_data_chunk_recv callback for the upstream (client-role)
 * session: hands one chunk of response body to the user callback.
 *
 * The chunk is inflated first when the response carries a content-encoding;
 * if inflation does not succeed the raw bytes are passed through unchanged.
 * The first chunk raises EV_HTTP_RESP_BODY_BEGIN, and every chunk raises
 * EV_HTTP_RESP_BODY_CONT.
 *
 * Fixes over the previous version: a stream without a response half is
 * skipped instead of dereferenced, the inflated buffer is tested with
 * != NULL instead of a pointer "> 0" comparison, and the unused attribute
 * is dropped from user_data, which is in fact used.
 *
 * NOTE(review): the END_STREAM branch at the bottom is only reachable when
 * body_state is neither INIT nor READING on entry; end-of-body events are
 * otherwise raised from the stream-close callback. Left as-is.
 * NOTE(review): ownership of the buffer returned by inflate_read() is not
 * visible here - confirm whether it must be freed by this function.
 *
 * Always returns 0.
 */
static int
nghttp2_client_on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
    int32_t stream_id, const uint8_t *input,
    size_t input_len, void *user_data)
{
    char *uncompr = NULL;
    int uncompr_len = 0;
    struct http2_half_private * resp = NULL;
    struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
    struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, stream_id);

    if (!h2_stream){
        TFE_LOG_ERROR(rt_log_data()->run_log_handle, "On data callback can't get downstream information, id = %d",
            stream_id);
        return 0;
    }

    h2_stream->frame_type |= TFE_NGHTTP2_DATA;
    stream_set_id(&session_info->h2_id, stream_id);

    resp = h2_stream->resp;
    if (!resp){
        TFE_LOG_ERROR(rt_log_data()->run_log_handle, "On data callback stream has no response half, id = %d",
            stream_id);
        return 0;
    }

    if (resp->body.gzip != HTTP2_CONTENT_ENCODING_NONE){
        int ret = inflate_read(input, input_len, &uncompr, &uncompr_len,
            &resp->body.inflater, resp->body.gzip);
        if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr != NULL){
            input = (const uint8_t *)uncompr;
            input_len = uncompr_len;
        }
    }

    if (resp->body_state == MANAGE_STAGE_INIT){
        if (resp->event_cb) {
            resp->event_cb(resp, EV_HTTP_RESP_BODY_BEGIN, NULL, input_len,
                resp->event_cb_user);
        }
        resp->body.flags = flags;
        resp->body_state = MANAGE_STAGE_READING;
    }

    if (resp->body_state == MANAGE_STAGE_READING){
        if (resp->event_cb) {
            resp->event_cb(resp, EV_HTTP_RESP_BODY_CONT, input, input_len,
                resp->event_cb_user);
        }
        resp->body.flags = flags;
        return 0;
    }

    if (flags & NGHTTP2_FLAG_END_STREAM){
        if (resp->event_cb) {
            resp->event_cb(resp, EV_HTTP_RESP_BODY_END, NULL, 0,
                resp->event_cb_user);
            resp->event_cb(resp, EV_HTTP_RESP_END, NULL, 0,
                resp->event_cb_user);
        }
        resp->body.flags = flags;
        resp->body_state = MANAGE_STAGE_COMPLETE;
        resp->message_state = MANAGE_STAGE_COMPLETE;
    }
    return 0;
}
/*
 * nghttp2 on_stream_close callback for the upstream (client-role) session.
 *
 * Clean close (error_code == 0) of a stream whose response body is not yet
 * complete: raises EV_HTTP_RESP_BODY_END / EV_HTTP_RESP_END if a body was in
 * progress, marks the response complete and records the stream id so the
 * stream is processed later (it stays in the list).
 *
 * Any other case - abnormal close, already-complete response, or a stream
 * that never got a response half - unlinks and destroys the stream. The
 * missing-response case used to dereference a NULL pointer.
 *
 * Always returns 0.
 */
static int
nghttp2_client_on_stream_close(nghttp2_session *session, int32_t stream_id,
    uint32_t error_code, void *user_data)
{
    struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;

    (void)session;

    if (error_code != 0)
        TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "Up stream abnormal exit, id = %d, error_code = %d",
            stream_id, error_code);

    struct h2_stream_data_t *h2_stream = TAILQ_LIST_FIND(session_info, stream_id);
    if (!h2_stream) {
        return 0;
    }

    struct http2_half_private *resp = h2_stream->resp;

    if (resp != NULL && error_code == 0 && resp->body_state != MANAGE_STAGE_COMPLETE){
        if (resp->body_state != MANAGE_STAGE_INIT && resp->event_cb){
            resp->event_cb(resp, EV_HTTP_RESP_BODY_END, NULL, 0,
                resp->event_cb_user);
            resp->event_cb(resp, EV_HTTP_RESP_END, NULL, 0,
                resp->event_cb_user);
        }
        resp->body_state = MANAGE_STAGE_COMPLETE;
        resp->message_state = MANAGE_STAGE_COMPLETE;
        h2_stream->frame_type |= TFE_NGHTTP2_DATA;
        stream_set_id(&session_info->h2_id, stream_id);
        return 0;
    }

    TAILQ_REMOVE(&session_info->list, h2_stream, next);
    delete_http2_stream_data(h2_stream, session_info->tf_stream, 1);
    free(h2_stream);
    return 0;
}
static int
nghttp2_client_on_header(nghttp2_session *session,
const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data)
{
(void)session;
(void)flags;
struct header_data *head = NULL;
struct http2_headers *headers = NULL;
struct h2_stream_data_t *h2_stream = NULL;
struct http2_half_private *resp = NULL;
enum tfe_http_std_field field_id = TFE_HTTP_UNKNOWN_FIELD;
struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
switch (frame->hd.type) {
case NGHTTP2_HEADERS:
h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (!h2_stream){
TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Client Stream id %d, can't find stream information(addr = %p)",
frame->hd.stream_id, session_info);
break;
}
resp = h2_stream->resp;
head = ALLOC(struct header_data, 1);
head->nv.name = (uint8_t *)tfe_strdup((const char *)name);
head->nv.namelen = namelen;
head->nv.value = (uint8_t *)tfe_strdup((const char *)value);
head->nv.valuelen = valuelen;
head->nv.flags = flags;
field_id = (enum tfe_http_std_field)str_to_val((const char *)name, headers_vals);
if (field_id == -1){
head->field.field_id = TFE_HTTP_UNKNOWN_FIELD;
head->field.field_name = (const char *)head->nv.name;
}else{
if (field_id == TFE_HTTP_CONT_ENCODING){
resp->body.gzip = method_to_str_idx((const char *)value);
}
head->field.field_id = field_id;
head->field.field_name = NULL;
}
headers = &resp->headers;
headers->flag = frame->hd.flags;
headers_add_tail(headers, head);
break;
case NGHTTP2_PUSH_PROMISE:
if (frame->headers.cat != NGHTTP2_HCAT_REQUEST)
break;
h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (!h2_stream){
TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Client Stream id %d, can't find stream information(addr = %p)",
frame->hd.stream_id, session_info);
break;
}
#ifdef __DEBUG_TEST
printf("%d, %s %s\n",frame->hd.stream_id, name, value);
#endif
resp = h2_stream->resp;
head = ALLOC(struct header_data, 1);
head->nv.name = (uint8_t *)tfe_strdup((const char *)name);
head->nv.namelen = namelen;
head->nv.value = (uint8_t *)tfe_strdup((const char *)value);
head->nv.valuelen = valuelen;
head->nv.flags = flags;
field_id = (enum tfe_http_std_field)str_to_val((const char *)name, headers_vals);
if (field_id == -1){
head->field.field_id = TFE_HTTP_UNKNOWN_FIELD;
}else{
if (field_id == TFE_HTTP_CONT_ENCODING){
resp->body.gzip = method_to_str_idx((const char *)value);
}
head->field.field_id = field_id;
head->field.field_name = (const char *)head->nv.name;
}
headers = &resp->headers;
headers->flag = frame->hd.flags;
headers_add_tail(headers, head);
h2_stream->frame_type |= TFE_NGHTTP2_PUSH_PROMISE;
stream_set_id(&session_info->h2_id, frame->hd.stream_id);
default:
break;
}
return 0;
}
/*
 * Make sure the stream `stream_id` has a response half, creating one on
 * first use.
 *
 * Returns the stream record, or NULL when the id is not in the session's
 * stream list (e.g. a pushed stream; creating records for those is still a
 * TODO). When the response half already exists nothing is changed.
 *
 * On creation the half is linked into the public session, given an event
 * dispatch context and the stream's frame context, and the stream record is
 * registered as nghttp2 stream user data.
 *
 * The dispatch context is now registered with free() as its deleter; it was
 * registered with a NULL deleter before, so delete_stream_half_data() never
 * released it.
 */
static struct h2_stream_data_t*
create_upstream_data(nghttp2_session *session, int32_t stream_id,
    struct tfe_session_info_t *session_info)
{
    struct h2_stream_data_t *h2_stream = TAILQ_LIST_FIND(session_info, stream_id);

    if (h2_stream == NULL){
        /** todo:When the data of the reply is pushed as promised,
        there is no stream id at the reply end. to create it*/
        return NULL;
    }
    if (h2_stream->resp)
        return h2_stream;

    h2_stream->resp = tfe_half_private_init(TFE_HTTP_RESPONSE);
    tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_RESPONSE);

    struct user_event_dispatch *event = ALLOC(struct user_event_dispatch, 1);
    assert(event);
    event->thread_id = session_info->thread_id;
    event->tf_stream = session_info->tf_stream;
    event->tfe_session = &h2_stream->tfe_session;

    /* the half owns the context from here on and frees it on teardown */
    half_set_callback(h2_stream->resp, event, free);

    h2_stream->resp->frame_ctx = h2_stream->frame_ctx;
    nghttp2_session_set_stream_user_data(session, stream_id, h2_stream);
    return h2_stream;
}
/*
 * nghttp2 on_begin_headers callback for the upstream (client-role) session:
 * for HEADERS frames, make sure the stream has a response half before the
 * individual headers arrive. Other frame types are ignored. Always returns 0.
 */
static int
nghttp2_client_on_begin_headers(nghttp2_session * session,
    const nghttp2_frame * frame,
    void * user_data)
{
    struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;

    if (frame->hd.type == NGHTTP2_HEADERS)
        create_upstream_data(session, frame->hd.stream_id, session_info);

    return 0;
}
static
void client_session_init(struct tfe_session_info_t *session_info)
{
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks,
nghttp2_client_send);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
nghttp2_client_on_frame_recv);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks,
nghttp2_client_on_data_chunk_recv);
nghttp2_session_callbacks_set_on_stream_close_callback(callbacks,
nghttp2_client_on_stream_close);
nghttp2_session_callbacks_set_on_header_callback(callbacks,
nghttp2_client_on_header);
nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks,
nghttp2_client_on_begin_headers);
nghttp2_session_client_new(&session_info->as_client, callbacks, session_info);
session_info->as_client->local_window_size = NGHTTP2_MAX_WINDOW_SIZE;
session_info->as_client->local_settings.initial_window_size = NGHTTP2_MAX_WINDOW_SIZE;
nghttp2_session_callbacks_del(callbacks);
}
/*
 * nghttp2 send callback for the downstream (server-role) session: writes
 * the serialized frame bytes to the transport in the downstream direction.
 *
 * Returns `length` on success. A failed write still trips the assert in
 * debug builds; in NDEBUG builds it now returns
 * NGHTTP2_ERR_CALLBACK_FAILURE instead of claiming that all bytes were sent.
 */
static ssize_t
nghttp2_server_send(nghttp2_session *session, const uint8_t *data,
    size_t length, int flags, void *user_data)
{
    (void)session;
    (void)flags;

    struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
    int ret = tfe_stream_write(session_info->tf_stream, CONN_DIR_DOWNSTREAM, data, length);

    if (unlikely(ret < 0)){
        assert(0);
        return NGHTTP2_ERR_CALLBACK_FAILURE;
    }
    return (ssize_t)length;
}
/*
 * nghttp2 on_frame_recv callback for the downstream (server-role) session.
 *
 * - HEADERS with END_HEADERS: resumes a suspended stream, fills the request
 *   spec from the collected headers and raises EV_HTTP_REQ_HDR.
 * - GOAWAY: tears down all streams via nghttp2_disect_goaway(..., 0).
 * - DATA and every other frame type: nothing to do here.
 *
 * The stream lookup used to run before the switch and returned early when
 * it found nothing. Connection-level frames have no stream user data, so
 * the GOAWAY branch could never execute. The lookup now happens only in the
 * HEADERS branch, matching the client-side callback.
 * NOTE(review): this activates downstream GOAWAY teardown that was
 * previously dead code - verify callers tolerate as_client being deleted.
 *
 * A stream without a request half, or a half without an event callback, is
 * skipped instead of dereferenced. Always returns 0.
 */
static int
nghttp2_server_on_frame_recv(nghttp2_session *session,
    const nghttp2_frame *frame, void *user_data)
{
    struct h2_stream_data_t *h2_stream = NULL;
    struct http2_half_private *req = NULL;
    struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;

    switch(frame->hd.type){
    case NGHTTP2_HEADERS:
        if (!(frame->hd.flags & NGHTTP2_FLAG_END_HEADERS))
            break;

        h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
        if (!h2_stream)
            break;

        req = h2_stream->req;
        if (!req)
            break;

        suspend_start(h2_stream, req, session_info->tf_stream);
        fill_req_spec_from_handle(req);

        if (req->event_cb)
            req->event_cb(req, EV_HTTP_REQ_HDR, NULL, 0, req->event_cb_user);

        /* the user asked to suspend from inside the callback: remember
         * which event to replay on resume */
        if (h2_stream->spd_set){
            h2_stream->spd_event = EV_HTTP_REQ_HDR;
        }
        break;

    case NGHTTP2_GOAWAY:
        nghttp2_disect_goaway(session_info, 0);
        TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "Down stream control frame goaway");
        break;

    case NGHTTP2_DATA:
        /* For DATA and HEADERS frames this callback may run after the
         * stream-close callback; request completion is not handled here. */
    case NGHTTP2_SETTINGS:
    case NGHTTP2_WINDOW_UPDATE:
    case NGHTTP2_PING:
    case NGHTTP2_PRIORITY:
    default:
        break;
    }
    return 0;
}
/*
---------------------------------------------------------------------------------------------------------------------------------
|No errors | PROTOCOL_ERROR| INTERNAL_ERROR | FLOW_CONTROL_ERROR| SETTINGS_TIMEOUT | STREAM_CLOSED | FRAME_SIZE_ERROR |
---------------------------------------------------------------------------------------------------------------------------------
|0x00 | 0x01 | 0x02 | 0x03 | 0x04 | 0x05 | 0x06 |
---------------------------------------------------------------------------------------------------------------------------------
|REFUSED_STREAM| CANCEL | COMPRESSION_ERROR| CONNECT_ERROR | ENHANCE_YOUR_CALM| INADEQUATE_SECURITY| HTTP_1_1_REQUIRED|
---------------------------------------------------------------------------------------------------------------------------------
|0x07 | 0x08 | 0x09 | 0x0a | 0x0b | 0x0c | 0x0d |
---------------------------------------------------------------------------------------------------------------------------------
*/
/*
 * nghttp2 "stream closed" callback registered on the as_server session.
 *
 * Only logs: when the stream is still tracked in session_info->list and it
 * was closed with a non-zero HTTP/2 error code, a debug line is emitted.
 * No stream state is released here.
 *
 * Always returns 0.
 */
static int
nghttp2_server_on_stream_close(nghttp2_session *session, int32_t stream_id,
		uint32_t error_code, void *user_data)
{
	(void)session;
	struct h2_stream_data_t *h2_stream = NULL;
	struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;

	h2_stream = TAILQ_LIST_FIND(session_info, stream_id);
	if (!h2_stream) {
		return 0;
	}
	if (error_code != 0){
		/* error_code is unsigned 32-bit, so it is printed with %u. */
		TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "Down stream abnormal exit, id = %d, error_code = %u",
					  stream_id, error_code);
	}
	return 0;
}
/*
 * nghttp2 "header" callback registered on the as_server session.
 *
 * For each name/value pair of a request HEADERS frame it duplicates the pair
 * into a new header_data node, classifies the name against headers_vals,
 * appends the node to the request half's header list, marks the stream as
 * having pending HEADERS work and records the stream id in
 * session_info->h2_id.
 *
 * Frames that are not request HEADERS, and streams with no user data
 * attached, are ignored. Always returns 0.
 */
static int
nghttp2_server_on_header(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name,
		size_t namelen, const uint8_t *value,
		size_t valuelen, uint8_t flags, void *user_data)
{
	int field_val = -1;
	struct header_data *head = NULL;
	struct http2_headers *headers = NULL;
	struct h2_stream_data_t *h2_stream = NULL;
	struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;

	if (frame->hd.type != NGHTTP2_HEADERS){
		return 0;
	}
	if (frame->headers.cat != NGHTTP2_HCAT_REQUEST){
		return 0;
	}

	h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
	if (!h2_stream){
		TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Stream id %d, can't find stream information(addr = %p)",
					  frame->hd.stream_id, session_info);
		return 0;
	}

	head = ALLOC(struct header_data, 1);
	assert(head);
	head->nv.name = (uint8_t *)tfe_strdup((const char *)name);
	head->nv.namelen = namelen;
	head->nv.value = (uint8_t *)tfe_strdup((const char *)value);
	head->nv.valuelen = valuelen;
	head->nv.flags = flags;

	/* Test the "not found" sentinel while the result is still a plain
	 * integer; converting -1 to the enum first yields an out-of-range
	 * enum value. */
	field_val = (int)str_to_val((const char *)name, headers_vals);
	if (field_val == -1){
		head->field.field_id = TFE_HTTP_UNKNOWN_FIELD;
		head->field.field_name = (const char *)head->nv.name;
	}else{
		head->field.field_id = (enum tfe_http_std_field)field_val;
		head->field.field_name = NULL;
	}

	headers = &h2_stream->req->headers;
	headers->flag = frame->hd.flags;
	headers_add_tail(headers, head);

	h2_stream->frame_type |= TFE_NGHTTP2_HEADERS;
	stream_set_id(&session_info->h2_id, frame->hd.stream_id);
	return 0;
}
/*
 * Create the per-stream bookkeeping for a new request seen on the as_server
 * session.
 *
 * Does nothing when stream_id is already tracked in session_info->list.
 * Otherwise allocates the stream object and its request half, wires the
 * event dispatcher, raises the "session begin" event, links the stream into
 * session_info->list and attaches it to the nghttp2 stream as user data.
 */
static void
create_serv_stream_data(nghttp2_session *session, int32_t stream_id,
		struct tfe_session_info_t *session_info)
{
	struct h2_stream_data_t *stream = NULL;
	struct user_event_dispatch *dispatch = NULL;
	struct http2_half_private *req = NULL;

	stream = TAILQ_LIST_FIND(session_info, stream_id);
	if (stream != NULL){
		return;
	}

	stream = (struct h2_stream_data_t *)ALLOC(struct h2_stream_data_t, 1);
	assert(stream);
	memset(stream, 0, sizeof(struct h2_stream_data_t));

	stream->stream_id = stream_id;
	stream->req = tfe_half_private_init(TFE_HTTP_REQUEST);
	tfe_half_session_init(stream, stream_id, TFE_HTTP_REQUEST);

	dispatch = ALLOC(struct user_event_dispatch, 1);
	assert(dispatch);
	dispatch->thread_id = session_info->thread_id;
	dispatch->tf_stream = session_info->tf_stream;
	dispatch->tfe_session = &stream->tfe_session;
	half_set_callback(stream->req, dispatch, NULL);

	/* Call business plugin */
	req = stream->req;
	req->frame_ctx = http_frame_alloc();
	if (req->frame_ctx == NULL){
		/* NOTE(review): the stream allocated above is neither linked into
		 * session_info->list nor attached to nghttp2 on this path, so it
		 * looks like it is leaked; confirm. */
		TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Failed at raising session begin event. ");
		return;
	}
	http_frame_raise_session_begin(req->frame_ctx, session_info->tf_stream,
								   &stream->tfe_session, session_info->thread_id);
	stream->frame_ctx = req->frame_ctx;

	TAILQ_INSERT_TAIL(&session_info->list, stream, next);
	nghttp2_session_set_stream_user_data(session, stream_id, stream);
}
/*
 * nghttp2 "begin headers" callback registered on the as_server session.
 *
 * When a request HEADERS frame starts, calls create_serv_stream_data() for
 * the frame's stream id. Any other frame is ignored. Always returns 0.
 */
static int
nghttp2_server_on_begin_headers(nghttp2_session *session,
		const nghttp2_frame *frame,
		void *user_data)
{
	if (frame->hd.type == NGHTTP2_HEADERS &&
		frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
		create_serv_stream_data(session, frame->hd.stream_id,
								(struct tfe_session_info_t *)user_data);
	}
	return 0;
}
static void
server_session_init(struct tfe_session_info_t *session_info)
{
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks, nghttp2_server_send);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
nghttp2_server_on_frame_recv);
nghttp2_session_callbacks_set_on_stream_close_callback(
callbacks, nghttp2_server_on_stream_close);
nghttp2_session_callbacks_set_on_header_callback(callbacks,
nghttp2_server_on_header);
nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks, nghttp2_server_on_begin_headers);
nghttp2_session_server_new(&session_info->as_server, callbacks, session_info);
session_info->as_server->local_window_size = NGHTTP2_MAX_WINDOW_SIZE;
session_info->as_server->local_settings.initial_window_size = NGHTTP2_MAX_WINDOW_SIZE;
nghttp2_session_callbacks_del(callbacks);
}
/*
 * Destroy the as_server nghttp2 session and empty session_info->list.
 *
 * Each list entry is unlinked and released with free() only.
 * NOTE(review): unlike sess_data_ctx_fini(), delete_http2_stream_data() is
 * not called for the entries and no session-end event is raised; confirm
 * that nothing owned by the entries is leaked.
 */
static void
delete_server_session_data(struct tfe_session_info_t *session_info)
{
	struct h2_stream_data_t *cur;
	struct h2_stream_data_t *following;

	nghttp2_session_del(session_info->as_server);
	session_info->as_server = NULL;

	TAILQ_FOREACH_SAFE(cur, &session_info->list, next, following){
		TAILQ_REMOVE(&session_info->list, cur, next);
		free(cur);
	}
}
/*
 * Destroy the as_client nghttp2 session and empty session_info->list.
 *
 * Each list entry is unlinked and released with free() only.
 * NOTE(review): unlike sess_data_ctx_fini(), delete_http2_stream_data() is
 * not called for the entries and no session-end event is raised; confirm
 * that nothing owned by the entries is leaked.
 */
static void
delete_client_session_data(struct tfe_session_info_t *session_info)
{
	struct h2_stream_data_t *cur = NULL;
	struct h2_stream_data_t *following;

	nghttp2_session_del(session_info->as_client);
	session_info->as_client = NULL;

	TAILQ_FOREACH_SAFE(cur, &session_info->list, next, following){
		TAILQ_REMOVE(&session_info->list, cur, next);
		free(cur);
	}
}
/*
 * nghttp2 data-source read callback that feeds a response body.
 *
 * source->ptr is a struct data_t whose evbuf_body holds the bytes still to
 * be sent. Each call moves at most min(length, TFE_MAX_PAYLOADLEN) bytes
 * from the front of that buffer into buf.
 *
 * @param buf/length  destination provided by nghttp2; never written past
 *                    length bytes
 * @param data_flags  NGHTTP2_DATA_FLAG_EOF is set once the buffer is empty
 * @return number of bytes stored in buf (0 together with EOF when the body
 *         is exhausted), or NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE if the
 *         bytes could not be taken out of the evbuffer
 */
static ssize_t
upstream_read_callback(nghttp2_session *session, int32_t stream_id,
		uint8_t *buf, size_t length,
		uint32_t *data_flags,
		nghttp2_data_source *source,
		void *user_data)
{
#define TFE_MAX_PAYLOADLEN 8192
	(void)session;
	(void)stream_id;
	(void)user_data;
	struct data_t *body = (struct data_t *)source->ptr;
	size_t pending = evbuffer_get_length(body->evbuf_body);
	size_t chunk = pending;
	int copied = 0;

	if (pending == 0){
		*data_flags |= NGHTTP2_DATA_FLAG_EOF;
		return 0;
	}

	if (chunk > TFE_MAX_PAYLOADLEN){
		chunk = TFE_MAX_PAYLOADLEN;
	}
	/* The destination is only `length` bytes large; the cap above alone is
	 * not enough to stay inside it. */
	if (chunk > length){
		chunk = length;
	}

	/* evbuffer_remove() copies and drains in one step, which avoids
	 * linearising the whole body just to read its first chunk. */
	copied = evbuffer_remove(body->evbuf_body, buf, chunk);
	if (copied < 0){
		return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
	}
	return (ssize_t)copied;
}
/*
 * Decide what to do with upstream DATA belonging to *h2_stream.
 *
 * - If a replacement response (pangu_resp) is attached, the original data is
 *   dropped and TFE_NGHTTP2_RESPONSE is OR-ed into *nghttp2_type.
 * - Otherwise the data is forwarded. When the response body carries
 *   NGHTTP2_FLAG_END_STREAM and the response is in MANAGE_STAGE_COMPLETE,
 *   the stream is unlinked from session_info->list, destroyed and
 *   *h2_stream is set to NULL.
 *
 * @return ACTION_DROP_DATA or ACTION_FORWARD_DATA as described above
 */
static enum tfe_stream_action
server_frame_submit_data(struct tfe_session_info_t *session_info, struct h2_stream_data_t **h2_stream,
		uint16_t *nghttp2_type)
{
	struct h2_stream_data_t *stream = *h2_stream;
	struct http2_half_private *resp = stream->resp;

	if (stream->pangu_resp){
		*nghttp2_type |= TFE_NGHTTP2_RESPONSE;
		return ACTION_DROP_DATA;
	}

	if (resp->body.flags == NGHTTP2_FLAG_END_STREAM &&
		resp->message_state == MANAGE_STAGE_COMPLETE){
		TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "Data stream exit, id = %d", stream->stream_id);
		TAILQ_REMOVE(&session_info->list, stream, next);
		delete_http2_stream_data(stream, session_info->tf_stream, 1);
		free(stream);
		/* Tell the caller the stream no longer exists. */
		*h2_stream = NULL;
	}
	return ACTION_FORWARD_DATA;
}
/*
 * Copy every entry of `headers` into the caller-supplied array `hdrs`, in
 * list order, and return `hdrs`.
 *
 * Only the pointers and lengths are copied; the name/value storage stays
 * owned by the header list.
 * NOTE(review): no capacity is passed in, so the caller must guarantee that
 * `hdrs` has room for every entry of the list.
 */
static nghttp2_nv*
nghttp2_nv_packet(struct http2_headers *headers, nghttp2_nv *hdrs)
{
	nghttp2_nv *slot = hdrs;
	struct header_data *entry = NULL;

	foreach_headers(headers, entry){
		slot->name = entry->nv.name;
		slot->namelen = entry->nv.namelen;
		slot->value = entry->nv.value;
		slot->valuelen = entry->nv.valuelen;
		slot->flags = entry->nv.flags;
		slot++;
	}
	return hdrs;
}
/*
 * Lazily create the response half of h2_stream.
 *
 * No-op when h2_stream->resp already exists. Otherwise initialises the
 * response half, wires an event dispatcher carrying thread_id / tf_stream,
 * shares the stream's frame_ctx with the new half and attaches h2_stream as
 * user data of the same stream id on the as_client session.
 */
static void
downstream_create_resp(struct h2_stream_data_t *h2_stream, nghttp2_session *as_client,
		const struct tfe_stream *tf_stream, unsigned int thread_id)
{
	struct user_event_dispatch *dispatch = NULL;

	if (h2_stream->resp){
		return;
	}

	h2_stream->resp = tfe_half_private_init(TFE_HTTP_RESPONSE);
	tfe_half_session_init(h2_stream, h2_stream->stream_id, TFE_HTTP_RESPONSE);

	dispatch = ALLOC(struct user_event_dispatch, 1);
	assert(dispatch);
	dispatch->thread_id = thread_id;
	dispatch->tf_stream = tf_stream;
	dispatch->tfe_session = &h2_stream->tfe_session;
	half_set_callback(h2_stream->resp, dispatch, NULL);

	h2_stream->resp->frame_ctx = h2_stream->frame_ctx;
	nghttp2_session_set_stream_user_data(as_client, h2_stream->stream_id, h2_stream);
}
/*
 * Build a header_data node from a NUL-terminated name/value pair and add it
 * to `headers`.
 *
 * @param flag  non-zero: insert at the head of the list; zero: append at
 *              the tail
 *
 * The list's `flag` field is set to 0x04 on every call.
 */
static void
tfe_make_nv(struct http2_headers *headers,
		const char *name,const char *value, int flag)
{
	struct header_data *node = ALLOC(struct header_data, 1);

	node->nv.name = (uint8_t *)tfe_strdup((const char *)name);
	node->nv.namelen = strlen(name);
	node->nv.value = (uint8_t *)tfe_strdup((const char *)value);
	node->nv.valuelen = strlen(value);

	headers->flag = 0x04;

	if (!flag){
		headers_add_tail(headers, node);
	}else{
		headers_add_head(headers, node);
	}
}
/*
 * Prepare and send the locally constructed response (pangu_resp) of
 * h2_stream.
 *
 * Prepends a ":status" header built from resp->method_or_status, appends an
 * "X-TG-Construct-By" header carrying the TFE version, marks the stream as
 * having a pending RESPONSE, records its id in session_info->h2_id and runs
 * nghttp2_server_mem_send().
 *
 * @return the action from nghttp2_server_mem_send(), with ACTION_DROP_DATA
 *         translated to ACTION_USER_DATA
 */
static enum tfe_stream_action
tfe_submit_response(struct tfe_session_info_t *session_info,
		struct h2_stream_data_t *h2_stream)
{
#define VALUE_LEN 128
	char text[VALUE_LEN] = {0};
	struct http2_half_private *resp = h2_stream->pangu_resp;
	enum tfe_stream_action sent;

	snprintf(text, sizeof(text), "%d", resp->method_or_status);
	tfe_make_nv(&resp->headers, ":status", (const char *)text, 1);

	snprintf(text, sizeof(text), "tfe/%s", tfe_version());
	tfe_make_nv(&resp->headers, "X-TG-Construct-By", (const char *)text, 0);

	h2_stream->frame_type |= TFE_NGHTTP2_RESPONSE;
	stream_set_id(&session_info->h2_id, h2_stream->stream_id);

	sent = nghttp2_server_mem_send(session_info);
	if (sent != ACTION_DROP_DATA){
		return sent;
	}
	return (enum tfe_stream_action)ACTION_USER_DATA;
}
static enum tfe_stream_action
nghttp2_client_frame_submit_header(struct tfe_session_info_t *session_info,
struct h2_stream_data_t *h2_stream)
{
int32_t stream_id = -1;
nghttp2_nv hdrs[128] = {0};
struct http2_headers *headers = NULL;
struct http2_half_private *req = NULL;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
if (0 == suspend_stop(h2_stream, session_info->tf_stream, CONN_DIR_DOWNSTREAM)){
stream_action = ACTION_DEFER_DATA;
goto finish;
}
req = h2_stream->pangu_req != NULL ? h2_stream->pangu_req : h2_stream->req;
if (req == NULL){
stream_action = ACTION_FORWARD_DATA;
goto finish;
}
if (h2_stream->pangu_resp){
stream_action = tfe_submit_response(session_info, h2_stream);
goto finish;
}
headers = &req->headers;
if (headers->nvlen <= 0){
stream_action = ACTION_FORWARD_DATA;
goto finish;
}
/*Create C' half_private_resp**/
downstream_create_resp(h2_stream, session_info->as_client, session_info->tf_stream, session_info->thread_id);
nghttp2_session_set_next_stream_id(session_info->as_client, h2_stream->stream_id);
stream_id = nghttp2_submit_request(session_info->as_client, NULL,
nghttp2_nv_packet(headers, hdrs),
headers->nvlen, NULL, h2_stream);
if (stream_id < 0){
TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Could not submit request: %s",
nghttp2_strerror(stream_id));
stream_action = ACTION_FORWARD_DATA;
goto finish;
}
/*clear headers data ***/
delete_nv_packet_data(headers);
stream_action = ACTION_DROP_DATA;
finish:
return stream_action;
}
/*
 * SETTINGS frames seen on the client side are not re-submitted; the
 * function only reports that the data should be forwarded.
 */
static enum tfe_stream_action
nghttp2_client_frame_submit_settings(void)
{
	const enum tfe_stream_action verdict = ACTION_FORWARD_DATA;

	return verdict;
}
/*
 * Process the pending work recorded for the stream ids in
 * session_info->h2_id.
 *
 * For each id, in order, the stream is looked up and its frame_type bits are
 * handled and cleared: HEADERS are submitted through
 * nghttp2_client_frame_submit_header(), SETTINGS go through
 * nghttp2_client_frame_submit_settings(), DATA and PUSH_PROMISE are only
 * cleared. Processing stops at the first id whose stream is not found.
 * Finally the id set is reset with nghttp2_stream_disable_rid().
 *
 * @return the action produced by the last handler that ran, or
 *         ACTION_FORWARD_DATA if none ran
 */
enum tfe_stream_action
nghttp2_client_frame_submit(struct tfe_session_info_t *session_info)
{
	enum tfe_stream_action action = ACTION_FORWARD_DATA;
	struct h2_run_id *pending = &session_info->h2_id;
	struct h2_stream_data_t *stream = NULL;
	int idx;

	for (idx = 0; idx < pending->num; idx++){
		stream = TAILQ_LIST_FIND(session_info, pending->id[idx]);
		if (stream == NULL){
			break;
		}

		if (stream->frame_type & TFE_NGHTTP2_HEADERS){
			action = nghttp2_client_frame_submit_header(session_info, stream);
			stream->frame_type &= ~TFE_NGHTTP2_HEADERS;
		}

		/* DATA needs no submission here; just drop the marker. */
		stream->frame_type &= ~TFE_NGHTTP2_DATA;

		if (stream->frame_type & TFE_NGHTTP2_SETTINGS){
			action = nghttp2_client_frame_submit_settings();
			stream->frame_type &= ~TFE_NGHTTP2_SETTINGS;
		}

		/* PUSH_PROMISE needs no submission here either. */
		stream->frame_type &= ~TFE_NGHTTP2_PUSH_PROMISE;
	}

	nghttp2_stream_disable_rid(&session_info->h2_id);
	return action;
}
/*
 * Submit pending client-side frames and flush the as_client session.
 *
 * - ACTION_DROP_DATA from nghttp2_client_frame_submit(): the as_client
 *   session is flushed with nghttp2_session_send(); on failure the error is
 *   logged and ACTION_FORWARD_DATA is returned instead.
 * - ACTION_USER_DATA: translated to ACTION_DROP_DATA without flushing.
 * - Anything else is returned unchanged.
 */
enum tfe_stream_action
nghttp2_client_mem_send(struct tfe_session_info_t *session_info)
{
	enum tfe_stream_action action = nghttp2_client_frame_submit(session_info);

	if (action == ACTION_DROP_DATA){
		int rc = nghttp2_session_send(session_info->as_client);

		if (rc == 0){
			return ACTION_DROP_DATA;
		}
		TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Fatal downstream send error: %s\n",
					  nghttp2_strerror(rc));
		return ACTION_FORWARD_DATA;
	}

	return (action == ACTION_USER_DATA) ? ACTION_DROP_DATA : action;
}
/*
 * Decide what to do with upstream HEADERS for h2_stream.
 *
 * Without a replacement response (pangu_resp) the headers are forwarded.
 * With one, the original headers are dropped and TFE_NGHTTP2_RESPONSE is
 * OR-ed into *nghttp2_type.
 */
static enum tfe_stream_action
nghttp2_server_frame_submit_header(struct h2_stream_data_t *h2_stream,
		uint16_t *nghttp2_type)
{
	if (h2_stream->pangu_resp == NULL){
		return ACTION_FORWARD_DATA;
	}

	*nghttp2_type |= TFE_NGHTTP2_RESPONSE;
	return ACTION_DROP_DATA;
}
/*
 * Initialise a caller-allocated stream object as a request on the as_server
 * session.
 *
 * Sets the stream id, creates the request half and its event dispatcher,
 * raises the "session begin" event, links the stream into
 * session_info->list and attaches it to the nghttp2 stream as user data.
 * If no frame context can be allocated, the error is logged and the stream
 * is left unlinked.
 */
static void
upstream_create_req(struct tfe_session_info_t *session_info, nghttp2_session *as_server,
		struct h2_stream_data_t *h2_stream, int32_t stream_id)
{
	struct user_event_dispatch *dispatch = NULL;
	struct http2_half_private *req = NULL;

	h2_stream->stream_id = stream_id;
	h2_stream->req = tfe_half_private_init(TFE_HTTP_REQUEST);
	tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_REQUEST);

	dispatch = ALLOC(struct user_event_dispatch, 1);
	assert(dispatch);
	dispatch->thread_id = session_info->thread_id;
	dispatch->tf_stream = session_info->tf_stream;
	dispatch->tfe_session = &h2_stream->tfe_session;
	half_set_callback(h2_stream->req, dispatch, NULL);

	/* Call business plugin */
	req = h2_stream->req;
	req->frame_ctx = http_frame_alloc();
	if (req->frame_ctx == NULL){
		TFE_STREAM_LOG_ERROR(h2_stream, "Failed at raising session begin event. ");
		return;
	}
	http_frame_raise_session_begin(req->frame_ctx, session_info->tf_stream,
								   &h2_stream->tfe_session, session_info->thread_id);
	h2_stream->frame_ctx = req->frame_ctx;

	TAILQ_INSERT_TAIL(&session_info->list, h2_stream, next);
	nghttp2_session_set_stream_user_data(as_server, stream_id, h2_stream);
}
/*
 * Re-submit a PUSH_PROMISE towards the client on the as_server session.
 * (Marked unused: the only call site visible here is commented out.)
 *
 * Uses the header list stored in h2_stream->resp. A new stream object is
 * allocated for the promised stream, handed to nghttp2 as its user data and,
 * once nghttp2 has assigned the promised stream id, initialised with
 * upstream_create_req(). The stored header list is then cleared.
 *
 * @return ACTION_DROP_DATA when the promise was submitted, otherwise
 *         ACTION_FORWARD_DATA
 */
static enum tfe_stream_action __attribute__((__unused__))
nghttp2_server_frame_submit_push_promise(struct tfe_session_info_t *session_info,
		struct h2_stream_data_t *h2_stream)
{
	int32_t stream_id = -1;
	nghttp2_nv hdrs[128] = {0};
	struct http2_headers *headers = NULL;
	struct h2_stream_data_t *_h2_stream = NULL;
	struct http2_half_private *resp = h2_stream->resp;

	if (resp == NULL){
		return ACTION_FORWARD_DATA;
	}
	headers = &resp->headers;
	if (headers->nvlen <= 0){
		return ACTION_FORWARD_DATA;
	}
	/* hdrs[] is a fixed-size stack array: refuse to pack more entries than
	 * it can hold. */
	if ((size_t)headers->nvlen > sizeof(hdrs) / sizeof(hdrs[0])){
		TFE_STREAM_LOG_ERROR(h2_stream, "Too many header fields to submit push promise: %d",
							 (int)headers->nvlen);
		return ACTION_FORWARD_DATA;
	}

	/* Create s' half req*/
	_h2_stream = (struct h2_stream_data_t *)ALLOC(struct h2_stream_data_t, 1);
	assert(_h2_stream);
	memset(_h2_stream, 0, sizeof(struct h2_stream_data_t));

	stream_id = nghttp2_submit_push_promise(session_info->as_server, headers->flag,
											h2_stream->stream_id, nghttp2_nv_packet(headers, hdrs),
											headers->nvlen, _h2_stream);
	if (stream_id < 0){
		free(_h2_stream);
		_h2_stream = NULL;
		TFE_STREAM_LOG_ERROR(h2_stream, "Failed to submit push promise: %s", nghttp2_strerror(stream_id));
		return ACTION_FORWARD_DATA;
	}
	upstream_create_req(session_info, session_info->as_server, _h2_stream, stream_id);

	/*clean header message **/
	delete_nv_packet_data(headers);
	return ACTION_DROP_DATA;
}
/*
 * SETTINGS frames seen on the server side are not re-submitted; the
 * function only reports that the data should be forwarded.
 */
static enum tfe_stream_action
nghttp2_server_frame_submit_settings(void)
{
	const enum tfe_stream_action verdict = ACTION_FORWARD_DATA;

	return verdict;
}
static enum tfe_stream_action
nghttp2_server_frame_submit_response(struct tfe_session_info_t *session_info,
struct h2_stream_data_t *h2_stream)
{
int rv = -1;
nghttp2_nv hdrs[128] = {0};
struct http2_headers *headers = NULL;
struct http2_half_private *pangu_resp = NULL;
pangu_resp = h2_stream->pangu_resp;
if (pangu_resp == NULL)
return ACTION_FORWARD_DATA;
if (pangu_resp->message_state != MANAGE_STAGE_COMPLETE){
session_info->state = 0;
return ACTION_DROP_DATA;
}
headers = &pangu_resp->headers;
if (headers->nvlen <= 0)
return ACTION_FORWARD_DATA;
struct data_t *body = &pangu_resp->body;
char str_sz_evbuf_body[TFE_STRING_MAX];
snprintf(str_sz_evbuf_body, sizeof(str_sz_evbuf_body) - 1, "%lu", evbuffer_get_length(body->evbuf_body));
const static struct http_field_name __cont_encoding_length_name = {TFE_HTTP_CONT_LENGTH, NULL};
tfe_http_field_write(&pangu_resp->half_public, &__cont_encoding_length_name, str_sz_evbuf_body);
nghttp2_data_provider data_prd;
data_prd.source.ptr = (void *)body;
data_prd.read_callback = upstream_read_callback;
rv = nghttp2_submit_response(session_info->as_server, h2_stream->stream_id, nghttp2_nv_packet(headers, hdrs),
headers->nvlen, &data_prd);
if (rv != 0){
return ACTION_FORWARD_DATA;
}
delete_nv_packet_data(headers);
return ACTION_DROP_DATA;
}
/*
 * Process the pending work recorded for the stream ids in
 * session_info->h2_id.
 *
 * For each id, in order, the stream is looked up and its frame_type bits are
 * handled and cleared in this sequence: HEADERS, DATA, SETTINGS,
 * PUSH_PROMISE, RESPONSE. Handling DATA may destroy the stream, in which
 * case the remaining bits are skipped. Processing stops at the first id
 * whose stream is not found. Finally the id set is reset with
 * nghttp2_stream_disable_rid().
 *
 * @return the action produced by the last handler that ran, or
 *         ACTION_FORWARD_DATA if none ran
 */
enum tfe_stream_action
nghttp2_server_frame_submit(struct tfe_session_info_t *session_info)
{
	enum tfe_stream_action action = ACTION_FORWARD_DATA;
	struct h2_run_id *pending = &session_info->h2_id;
	struct h2_stream_data_t *stream = NULL;
	int idx;

	for (idx = 0; idx < pending->num; idx++){
		stream = TAILQ_LIST_FIND(session_info, pending->id[idx]);
		if (stream == NULL){
			break;
		}

		if (stream->frame_type & TFE_NGHTTP2_HEADERS){
			action = nghttp2_server_frame_submit_header(stream, &stream->frame_type);
			stream->frame_type &= ~TFE_NGHTTP2_HEADERS;
		}

		if (stream->frame_type & TFE_NGHTTP2_DATA){
			action = server_frame_submit_data(session_info, &stream, &stream->frame_type);
			if (stream == NULL){
				/* The stream was released by the data handler. */
				continue;
			}
			stream->frame_type &= ~TFE_NGHTTP2_DATA;
		}

		if (stream->frame_type & TFE_NGHTTP2_SETTINGS){
			action = nghttp2_server_frame_submit_settings();
			stream->frame_type &= ~TFE_NGHTTP2_SETTINGS;
		}

		/* Because a data stream contains multiple stream ids, resulting in
		 * data drop, there is no processing of the committed push frame at
		 * present. Under the condition that the main process is not
		 * affected, the committed push frame is not encapsulated:
		 * nghttp2_server_frame_submit_push_promise() is deliberately not
		 * called and the marker is only cleared. */
		stream->frame_type &= ~TFE_NGHTTP2_PUSH_PROMISE;

		if (stream->frame_type & TFE_NGHTTP2_RESPONSE){
			action = nghttp2_server_frame_submit_response(session_info, stream);
			stream->frame_type &= ~TFE_NGHTTP2_RESPONSE;
		}
	}

	nghttp2_stream_disable_rid(&session_info->h2_id);
	return action;
}
/*
 * Submit pending server-side frames and flush the as_server session.
 *
 * The session is flushed with nghttp2_session_send() only when
 * nghttp2_server_frame_submit() returned ACTION_DROP_DATA and
 * session_info->state is non-zero. A failed flush is logged and turns the
 * result into ACTION_FORWARD_DATA. session_info->state is set back to 1
 * before returning.
 */
enum tfe_stream_action
nghttp2_server_mem_send(struct tfe_session_info_t *session_info)
{
	enum tfe_stream_action action = nghttp2_server_frame_submit(session_info);

	if (action == ACTION_DROP_DATA){
		if (session_info->state){
			int rc = nghttp2_session_send(session_info->as_server);

			if (rc != 0) {
				action = ACTION_FORWARD_DATA;
				TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Fatal upstream send error: %s\n",
							  nghttp2_strerror(rc));
			}
		}
	}

	session_info->state = 1;
	return action;
}
/*
 * Feed bytes received from the upstream side into the as_client session and
 * decide whether the raw bytes are forwarded or dropped.
 *
 * - Parse failure: the error is logged, the client session data is
 *   destroyed, the stream is detached and all `len` bytes are dropped.
 * - ACTION_FORWARD_DATA / ACTION_DROP_DATA from nghttp2_server_mem_send():
 *   the matching byte-count option is set to `len` and that action is
 *   returned.
 * - NOTE(review): any other action also ends in detach + drop; confirm this
 *   is intended.
 */
enum tfe_stream_action
detect_up_stream_protocol(struct tfe_session_info_t *session_info, const struct tfe_stream *tfe_stream,
		unsigned int thread_id, const unsigned char *data, size_t len)
{
	int consumed = nghttp2_session_mem_recv(session_info->as_client, data, len);

	if (consumed < 0){
		TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Failed to process server requests. Link message %s",
					  tfe_stream->str_stream_info);
		delete_client_session_data(session_info);
	}else{
		enum tfe_stream_action action = nghttp2_server_mem_send(session_info);

		if (action == ACTION_FORWARD_DATA){
			tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_FOWARD_BYTES, &len, sizeof(len));
			return ACTION_FORWARD_DATA;
		}
		if (action == ACTION_DROP_DATA){
			tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len));
			return ACTION_DROP_DATA;
		}
	}

	/* Error path: stop handling this connection and discard the bytes. */
	tfe_stream_detach(tfe_stream);
	tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len));
	return ACTION_DROP_DATA;
}
/*
 * Feed bytes received from the downstream side into the as_server session
 * and decide whether the raw bytes are forwarded or dropped.
 *
 * The stream handle and thread id are stored in session_info first so the
 * nghttp2 callbacks can use them.
 *
 * - Parse failure: the error is logged, the server session data is
 *   destroyed, the stream is detached and all `len` bytes are dropped.
 * - ACTION_FORWARD_DATA / ACTION_DROP_DATA from nghttp2_client_mem_send():
 *   the matching byte-count option is set to `len` and that action is
 *   returned.
 * - NOTE(review): any other action (e.g. ACTION_DEFER_DATA, which
 *   nghttp2_client_frame_submit_header() can produce) also ends in
 *   detach + drop; confirm this is intended.
 */
enum tfe_stream_action
detect_down_stream_protocol(struct tfe_session_info_t *session_info, const struct tfe_stream *tfe_stream,
		unsigned int thread_id, const unsigned char *data, size_t len)
{
	int consumed = 0;

	session_info->tf_stream = tfe_stream;
	session_info->thread_id = thread_id;

	consumed = nghttp2_session_mem_recv(session_info->as_server, data, len);
	if (consumed < 0){
		TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Failed to process client requests. Link message %s",
					  tfe_stream->str_stream_info);
		delete_server_session_data(session_info);
	}else{
		enum tfe_stream_action action = nghttp2_client_mem_send(session_info);

		if (action == ACTION_FORWARD_DATA){
			tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_FOWARD_BYTES, &len, sizeof(len));
			return ACTION_FORWARD_DATA;
		}
		if (action == ACTION_DROP_DATA){
			tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len));
			return ACTION_DROP_DATA;
		}
	}

	/* Error path: stop handling this connection and discard the bytes. */
	tfe_stream_detach(tfe_stream);
	tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len));
	return ACTION_DROP_DATA;
}
/*
 * Tear down everything owned by session_info.
 *
 * Every stream still in session_info->list is unlinked; if it has a frame
 * context, the "session end" event is raised for it; then the stream is
 * destroyed and freed. Afterwards whichever of the two nghttp2 sessions
 * still exist are deleted and their pointers cleared.
 */
void
sess_data_ctx_fini(struct tfe_session_info_t *session_info,
		const struct tfe_stream * stream, unsigned int thread_id)
{
	struct h2_stream_data_t *cur = NULL;
	struct h2_stream_data_t *following = NULL;

	TAILQ_FOREACH_SAFE(cur, &session_info->list, next, following){
		TAILQ_REMOVE(&session_info->list, cur, next);
		if (cur->frame_ctx != NULL){
			http_frame_raise_session_end(cur->frame_ctx, stream, &cur->tfe_session,
										 thread_id);
			cur->frame_ctx = NULL;
		}
		delete_http2_stream_data(cur, session_info->tf_stream, 1);
		free(cur);
	}

	if (session_info->as_client != NULL){
		nghttp2_session_del(session_info->as_client);
		session_info->as_client = NULL;
	}
	if (session_info->as_server != NULL){
		nghttp2_session_del(session_info->as_server);
		session_info->as_server = NULL;
	}
}
struct tfe_session_info_t* tfe_session_info_init()
{
nghttp2_session *session = NULL;
nghttp2_inbound_frame *iframe = NULL;
struct tfe_session_info_t *session_info = NULL;
session_info = ALLOC(struct tfe_session_info_t, 1);
assert(session_info);
memset(session_info, 0, sizeof(struct tfe_session_info_t));
session_info->state = 1;
server_session_init(session_info);
client_session_init(session_info);
session = session_info->as_client;
iframe = &(session->iframe);
iframe->state = NGHTTP2_IB_READ_FIRST_SETTINGS;
nghttp2_bufs_reset(&(session->aob.framebufs));
TAILQ_INIT(&session_info->list);
return session_info;
}