This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/plugin/protocol/http2/src/http2_stream.cpp
fengweihao ad004b29a6 *.修改帧处理方式
*.删除同一流中多ID遍历模式
*.添加函数指针数组管理不同帧回调
*.添加流处理打印
2019-05-24 18:45:45 +08:00

2438 lines
77 KiB
C++

/*************************************************************************
> File Name: http2_stream.cpp
> Author:
> Mail:
> Created Time:
************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <stdint.h>
#include <math.h>
#include <sys/param.h>
#include <zlib.h>
#include <event2/buffer.h>
#include <tfe_utils.h>
#include <tfe_stream.h>
#include <nghttp2/nghttp2.h>
#include <http2_stream.h>
#include <http2_common.h>
#include <nghttp2_session.h>
#include <nghttp2_map.h>
/*
---------------------------------------------------------------------------------------------------------------------------------
|No errors | PROTOCOL_ERROR| INTERNAL_ERROR | FLOW_CONTROL_ERROR| SETTINGS_TIMEOUT | STREAM_CLOSED | FRAME_SIZE_ERROR |
---------------------------------------------------------------------------------------------------------------------------------
|0x00 | 0x01 | 0x02 | 0x03 | 0x04 | 0x05 | 0x06 |
---------------------------------------------------------------------------------------------------------------------------------
|REFUSED_STREAM| CANCEL | COMPRESSION_ERROR| CONNECT_ERROR | ENHANCE_YOUR_CALM| INADEQUATE_SECURITY| HTTP_1_1_REQUIRED|
---------------------------------------------------------------------------------------------------------------------------------
|0x07 | 0x08 | 0x09 | 0x0a | 0x0b | 0x0c | 0x0d |
---------------------------------------------------------------------------------------------------------------------------------
*/
/* Lookup table pairing each NGHTTP2_METHOD_* constant with its textual
 * HTTP method name; the last row covers NGHTTP2_METHOD_UNKNOWN. */
static const struct value_string method_vals[] =
{
{NGHTTP2_METHOD_DELETE, "DELETE"},
{NGHTTP2_METHOD_GET, "GET"},
{NGHTTP2_METHOD_HEAD, "HEAD"},
{NGHTTP2_METHOD_POST, "POST"},
{NGHTTP2_METHOD_PUT, "PUT"},
{NGHTTP2_METHOD_CONNECT, "CONNECT"},
{NGHTTP2_METHOD_UNKNOWN, "unknown"},
};
/*
 * Lookup table pairing each TFE_HTTP_* standard field id with the header
 * name used on the wire. half_ops_field_write() copies these names into the
 * nghttp2 name/value pairs it builds, so every name is kept lowercase:
 * HTTP/2 requires header field names to be lowercase and peers treat
 * anything else as malformed.
 * The spelling of the TFE_HTTP_UNKNOWN_FIELD placeholder is left as-is.
 */
static const struct value_string headers_vals[] =
{
    {TFE_HTTP_UNKNOWN_FIELD, "unkown"},
    {TFE_HTTP_HOST, ":authority"},
    {TFE_HTTP_REFERER, "referer"},
    {TFE_HTTP_USER_AGENT, "user-agent"},
    {TFE_HTTP_COOKIE, "cookie"},
    {TFE_HTTP_SET_COOKIE, "set-cookie"},
    {TFE_HTTP_PROXY_AUTHORIZATION, "proxy-authorization"},
    {TFE_HTTP_AUTHORIZATION, "authorization"},
    {TFE_HTTP_LOCATION, "location"},
    {TFE_HTTP_SERVER, "server"},
    {TFE_HTTP_ETAG, "etag"},
    {TFE_HTTP_DATE, "date"},
    {TFE_HTTP_TRAILER, "trailer"},
    {TFE_HTTP_TRANSFER_ENCODING, "transfer-encoding"},
    {TFE_HTTP_VIA, "via"},
    {TFE_HTTP_PRAGMA, "pragma"},
    {TFE_HTTP_CONNECTION, "connection"},
    {TFE_HTTP_CONT_ENCODING, "content-encoding"},
    {TFE_HTTP_CONT_LANGUAGE, "content-language"},
    {TFE_HTTP_CONT_LOCATION, "content-location"},
    {TFE_HTTP_CONT_RANGE, "content-range"},
    {TFE_HTTP_CONT_LENGTH, "content-length"},
    {TFE_HTTP_CONT_TYPE, "content-type"},
    {TFE_HTTP_CONT_DISPOSITION, "content-disposition"},
    {TFE_HTTP_EXPIRES, "expires"},
    {TFE_HTTP_ACCEPT_ENCODING, "accept-encoding"},
    {TFE_HTTP_CACHE_CONTROL, "cache-control"},
    {TFE_HTTP_IF_MATCH, "if-match"},
    {TFE_HTTP_IF_NONE_MATCH, "if-none-match"},
    {TFE_HTTP_IF_MODIFIED_SINCE, "if-modified-since"},
    {TFE_HTTP_IF_UNMODIFIED_SINCE, "if-unmodified-since"},
    {TFE_HTTP_LAST_MODIFIED, "last-modified"},
};
/* File-local frame type codes (0x0b and 0x0c).
 * NOTE(review): presumably private frame kinds used next to nghttp2's own
 * frame types; their use is not visible in this part of the file - confirm.
 * "COLSE" looks like a misspelling of "CLOSE"; the identifier is kept because
 * it may be referenced elsewhere. */
typedef enum {
NGHTTP2_USER_SEND = 0x0b,
NGHTTP2_USER_COLSE = 0x0c,
} nghttp2_frame_user_type;
/* Context handed to event_dispatch_cb() through its `user` argument: the
 * transport stream, the HTTP session and the worker thread id that are
 * forwarded to http_frame_raise_event(). */
struct user_event_dispatch
{
const struct tfe_stream *tf_stream;
const struct tfe_http_session * tfe_session;
unsigned int thread_id;
};
/*up stream */
/*
 * Search the session's stream list for the entry whose stream_id equals
 * `stream_id`. Returns that entry, or NULL when the list holds no match.
 */
static struct h2_stream_data_t *
TAILQ_LIST_FIND(struct tfe_session_info_t *session_info, int32_t stream_id)
{
    struct h2_stream_data_t *cur = NULL;
    struct h2_stream_data_t *following = NULL;

    TAILQ_FOREACH_SAFE(cur, &session_info->list, next, following)
    {
        if (cur->stream_id == stream_id)
            return cur;
    }
    /* The traversal ran off the end of the list: nothing matched. */
    return NULL;
}
/** Begin: doubly linked header list helpers. */
/** Header list; per the original author it can be deleted once testing is complete. */
/* Walk `list` (cast to struct http2_headers *) from head to tail, binding each
 * node to `entry`. The step expression reads entry->next after the loop body
 * has run, so the body must not unlink or free `entry`. */
#define foreach_headers(list, entry) \
for(entry = ((struct http2_headers*)(list))->head; entry; entry = entry->next)
/*
 * Unlink `entry` from `list`, fixing up head/tail as needed, clear the
 * entry's own links and decrement the list's element count.
 * Returns the detached entry; nothing is freed here.
 */
static struct header_data*
headers_del(struct http2_headers *list, struct header_data *entry)
{
    struct header_data *before = entry->prev;
    struct header_data *after = entry->next;

    if (before != NULL)
        before->next = after;
    else
        list->head = after;

    if (after != NULL)
        after->prev = before;
    else
        list->tail = before;

    entry->prev = NULL;
    entry->next = NULL;
    list->nvlen -= 1;
    return entry;
}
/*
 * Insert `entry` at the front of `list` and bump the element count.
 *
 * Both links of `entry` are always rewritten, so an entry that carries stale
 * prev/next pointers (for example one reused from another list) cannot
 * corrupt traversal.
 */
void
headers_add_head(struct http2_headers *list, struct header_data *entry)
{
    entry->prev = NULL;
    entry->next = list->head;

    if (list->head)
        list->head->prev = entry;
    else
        list->tail = entry;     /* first element is also the tail */

    list->head = entry;
    list->nvlen++;
}
/*
 * Append `entry` to the end of `list` and bump the element count.
 */
void
headers_add_tail(struct http2_headers *list, struct header_data *entry)
{
    struct header_data *old_tail = list->tail;

    entry->prev = old_tail;
    entry->next = NULL;

    if (old_tail == NULL)
        list->head = entry;         /* list was empty */
    else
        old_tail->next = entry;

    list->tail = entry;
    list->nvlen += 1;
}
/*
 * Reset a header list to the empty state: all bytes zeroed, no nodes,
 * count and flag at zero.
 */
static inline void
headers_init(struct http2_headers *headers)
{
    memset(headers, 0, sizeof(*headers));
    headers->head = NULL;
    headers->tail = NULL;
    headers->flag = 0;
    headers->nvlen = 0;
}
/*
 * Map a content-encoding token to its HTTP2_CONTENT_ENCODING_* code using a
 * case-insensitive substring search.
 *
 * The "x-" prefixed tokens are tested before their plain counterparts:
 * "x-gzip" contains "gzip" (and "x-bzip2" contains "bzip2"), so testing the
 * short form first would make the prefixed branches unreachable.
 *
 * Returns HTTP2_CONTENT_ENCODING_NONE when `method` is NULL or names no
 * known encoding.
 */
static int
method_to_str_idx(const char * method)
{
    if (method == NULL)
        return HTTP2_CONTENT_ENCODING_NONE;

    if (strcasestr(method, "x-gzip") != NULL)
        return HTTP2_CONTENT_ENCODING_X_GZIP;
    if (strcasestr(method, "gzip") != NULL)
        return HTTP2_CONTENT_ENCODING_GZIP;
    if (strcasestr(method, "deflate") != NULL)
        return HTTP2_CONTENT_ENCODING_DEFLATE;
    if (strcasestr(method, "x-bzip2") != NULL)
        return HTTP2_CONTENT_ENCODING_X_BZIP2;
    if (strcasestr(method, "bzip2") != NULL)
        return HTTP2_CONTENT_ENCODING_BZIP2;
    if (strcasestr(method, "br") != NULL)
        return HTTP2_CONTENT_ENCODING_BR;

    return HTTP2_CONTENT_ENCODING_NONE;
}
/*
 * Copy the name/value pairs of every node in `headers` into the caller's
 * `hdrs` array, in list order, and return `hdrs`.
 * No bound is applied here: the caller must supply an array with room for
 * every node in the list.
 */
static nghttp2_nv*
nghttp2_nv_packet(struct http2_headers *headers, nghttp2_nv *hdrs)
{
    nghttp2_nv *slot = hdrs;
    struct header_data *node = NULL;

    foreach_headers(headers, node){
        slot->name = node->nv.name;
        slot->namelen = node->nv.namelen;
        slot->value = node->nv.value;
        slot->valuelen = node->nv.valuelen;
        slot->flags = node->nv.flags;
        slot++;
    }
    return hdrs;
}
/* Return the request method recorded in the half's public request spec. */
static enum tfe_http_std_method
nghttp2_get_method(struct http2_half_private *half_private)
{
    return half_private->half_public.req_spec.method;
}
/*
 * Copy the id/value of each of the `settings.niv` entries of a SETTINGS
 * frame into `out_iv` and return `out_iv`.
 * The caller must provide room for settings.niv entries.
 */
static nghttp2_settings_entry*
nghttp2_iv_packet(nghttp2_settings settings,
    nghttp2_settings_entry *out_iv)
{
    const nghttp2_settings_entry *src = settings.iv;
    int idx = 0;

    while (idx < (int)settings.niv){
        out_iv[idx].settings_id = src[idx].settings_id;
        out_iv[idx].value = src[idx].value;
        idx++;
    }
    return out_iv;
}
static void
delete_nv_packet_data(struct http2_headers *headers)
{
struct header_data *header = NULL;
foreach_headers(headers, header){
free(header->nv.name);
header->nv.name = NULL;
header->nv.namelen = 0;
free(header->nv.value);
header->nv.value = NULL;
header->nv.valuelen = 0;
header->nv.flags = 0;
headers_del(headers, header);
}
headers->nvlen = 0;
headers->flag = 0;
headers->head = headers->tail = NULL;
}
/*
 * Default event callback of a half: forwards the event and its payload to
 * http_frame_raise_event() using the stream, session and thread id stored in
 * the struct user_event_dispatch passed as `user`. Always returns 0.
 */
static int event_dispatch_cb(struct http2_half_private * half_private,
    enum tfe_http_event ev, const unsigned char * data, size_t len, void * user)
{
    struct user_event_dispatch *ctx = (struct user_event_dispatch *)user;

    http_frame_raise_event(half_private->frame_ctx, ctx->tf_stream,
        (struct tfe_http_session *)ctx->tfe_session,
        ev, data, len, ctx->thread_id);
    return 0;
}
/*
 * Attach a user context (and the function that releases it) to a half.
 * event_dispatch_cb is installed as the event callback only when the half
 * has none yet.
 */
void half_set_callback(struct http2_half_private * half_private,
    void * user, void (* user_deleter)(void *))
{
    if (!half_private->event_cb)
        half_private->event_cb = event_dispatch_cb;

    half_private->event_cb_user_deleter = user_deleter;
    half_private->event_cb_user = user;
}
/*
 * Return the value of the first header whose field id equals
 * `field->field_id`, or NULL when the half is unavailable or no header
 * matches. The returned string is owned by the header list.
 */
static const char *
half_ops_field_read(const struct tfe_http_half * half, const struct http_field_name * field)
{
    struct header_data *node = NULL;
    const struct http2_half_private *priv = nghttp2_to_half_private(half);

    if (unlikely(priv == NULL))
        return NULL;

    foreach_headers(&priv->headers, node){
        if (node->field.field_id == field->field_id)
            return (const char *)node->nv.value;
    }
    return NULL;
}
/*
 * Set header `name` to `value` on a half.
 *
 * A standard field (field_id != TFE_HTTP_UNKNOWN_FIELD) that is already in
 * the list has its value replaced in place. Otherwise a new header is
 * appended: unknown fields use name->field_name as the header name, standard
 * fields use the name registered in headers_vals. Unknown fields are always
 * appended, never replaced.
 *
 * Returns 0 on success (and, as before, when the half is unavailable);
 * returns -1 for a NULL name/value, an unknown field without a name, or an
 * allocation failure.
 */
static int
half_ops_field_write(struct tfe_http_half * half, const struct http_field_name * name, const char * value)
{
    struct header_data *header = NULL;
    struct http2_headers *headers = NULL;
    struct http2_half_private *half_private = nghttp2_to_half_private(half);

    if (unlikely(half_private == NULL))
        return 0;
    /* strlen()/strdup on NULL would crash below, so reject it up front. */
    if (unlikely(name == NULL) || unlikely(value == NULL))
        return -1;

    /* Only take the address of the member once the owner is known valid. */
    headers = &half_private->headers;

    foreach_headers(headers, header){
        if (header->field.field_id == TFE_HTTP_UNKNOWN_FIELD)
            continue;
        if (header->field.field_id == name->field_id){
            free(header->nv.value);
            header->nv.value = (uint8_t*)tfe_strdup(value);
            header->nv.valuelen = strlen(value);
            header->field.field_name = (const char *)header->nv.name;
            return 0;
        }
    }

    if (name->field_id == TFE_HTTP_UNKNOWN_FIELD && name->field_name == NULL)
        return -1;

    header = ALLOC(struct header_data, 1);
    if (header == NULL)
        return -1;
    memset(header, 0, sizeof(struct header_data));

    if (name->field_id == TFE_HTTP_UNKNOWN_FIELD){
        header->field.field_id = TFE_HTTP_UNKNOWN_FIELD;
        header->nv.name = (uint8_t *)tfe_strdup(name->field_name);
        header->nv.namelen = strlen(name->field_name);
        header->field.field_name = (const char *)header->nv.name;
    }else{
        const char *std_name = val_to_str(name->field_id, headers_vals);
        header->field.field_id = name->field_id;
        header->field.field_name = NULL;
        header->nv.name = (uint8_t *)tfe_strdup((const char *)std_name);
        header->nv.namelen = strlen(std_name);
    }
    header->nv.value = (uint8_t *)tfe_strdup((const char *)value);
    header->nv.valuelen = strlen(value);
    headers_add_tail(headers, header);
    return 0;
}
/* Hand back the same half with its const qualifier removed. */
static struct tfe_http_half *
half_ops_allow_write(const struct tfe_http_half * half)
{
    struct tfe_http_half *writable = (struct tfe_http_half *)half;
    return writable;
}
/*
 * Step through the half's headers one call at a time.
 * `*iter` is the cursor: NULL starts at the list head, otherwise it is
 * advanced to the following node. On success the node's field id/name are
 * copied into `field` and its value is returned; NULL is returned when the
 * half is unavailable or the list is exhausted.
 */
static const char *
half_ops_field_iterate(const struct tfe_http_half * half, void ** iter, struct http_field_name * field)
{
    struct header_data **cursor = (struct header_data **)iter;
    const struct http2_half_private *priv = nghttp2_to_half_private(half);

    if (unlikely(priv == NULL))
        return NULL;

    *cursor = (*cursor == NULL) ? priv->headers.head : (*cursor)->next;
    if (*cursor == NULL)
        return NULL;

    field->field_id = (*cursor)->field.field_id;
    field->field_name = (*cursor)->field.field_name;
    return (const char *)(*cursor)->nv.value;
}
/*
 * Append `size` bytes at `buff` to the half's body buffer, passing them
 * through deflate_write() when the body has a content encoding set.
 *
 * A call with buff == NULL and size == 0 marks end of body: a pending
 * compressed stream gets a final deflate_write() call (last argument 1) and
 * message_state becomes MANAGE_STAGE_COMPLETE. In that case the return value
 * is the deflate_write() result, or -1 when the body is not compressed.
 * Otherwise the result of deflate_write()/evbuffer_add() is returned.
 * `flag` is not used.
 */
static int
half_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, int flag)
{
    struct http2_half_private *half_private = nghttp2_to_half_private(half);
    struct data_t *body = &half_private->body;

    if (buff == NULL && size == 0){
        int rc = -1;
        if (body->gzip != HTTP2_CONTENT_ENCODING_NONE)
            rc = deflate_write(&body->deflate, NULL, 0, body->evbuf_body, body->gzip, 1);
        half_private->message_state = MANAGE_STAGE_COMPLETE;
        return rc;
    }

    /* Create the body buffer lazily on the first data chunk. */
    if (body->evbuf_body == NULL)
        body->evbuf_body = evbuffer_new();

    if (body->gzip != HTTP2_CONTENT_ENCODING_NONE)
        return deflate_write(&body->deflate, (const uint8_t *)buff, size,
            body->evbuf_body, body->gzip, 0);

    return evbuffer_add(body->evbuf_body, buff, size);
}
/*
 * Destroy the half referenced by `*data` and set `*data` to NULL.
 * Finishes the inflate/deflate contexts, frees the URL storage and headers,
 * runs the user-context deleter when one is registered, then frees the half.
 * The body evbuffer is freed only when `body_flag` is non-zero.
 * Does nothing when `*data` is already NULL.
 */
void delete_stream_half_data(struct http2_half_private **data,
    int body_flag)
{
    struct http2_half_private *half = *data;
    struct data_t *body = NULL;

    if (half == NULL)
        return;

    body = &half->body;
    inflate_finished(&body->inflater);
    deflate_finished(&body->deflate);

    if (body->evbuf_body && body_flag){
        evbuffer_free(body->evbuf_body);
        body->evbuf_body = NULL;
    }

    if (half->url_storage)
        FREE(&(half->url_storage));

    delete_nv_packet_data(&half->headers);

    if (half->event_cb_user_deleter != NULL)
        half->event_cb_user_deleter(half->event_cb_user);

    free(half);
    *data = NULL;
}
/*
 * ops_free implementation: destroy the private half behind `half`,
 * including its body buffer.
 */
void half_ops_free(struct tfe_http_half * half)
{
    struct http2_half_private *priv = nghttp2_to_half_private(half);

    /* delete_stream_half_data() frees the object and NULLs the local
     * pointer, so nothing is left to release afterwards. */
    delete_stream_half_data(&priv, 1);
}
/*
 * ops_body_begin implementation: allocate a fresh body buffer for the half.
 * With `by_stream` set, any active inflate/deflate context is finished, the
 * content encoding is reset to NONE and message_state becomes
 * MANAGE_STAGE_READING. Always returns 0.
 *
 * NOTE(review): the assert requires evbuf_body to be NULL on entry, while
 * tfe_half_private_init() always allocates one - confirm the intended
 * lifecycle of the body buffer.
 */
int h2_half_ops_body_begin(struct tfe_http_half * half, int by_stream)
{
    struct http2_half_private *half_private = nghttp2_to_half_private(half);
    struct data_t *body = &half_private->body;

    assert(body->evbuf_body == NULL);

    if (by_stream)
    {
        if (body->inflater)
            inflate_finished(&body->inflater);
        if (body->deflate)
            deflate_finished(&body->deflate);

        body->gzip = HTTP2_CONTENT_ENCODING_NONE;
        half_private->message_state = MANAGE_STAGE_READING;
    }

    body->evbuf_body = evbuffer_new();
    return 0;
}
/*
 * ops_body_data implementation: add `sz_data` bytes to the half's body
 * buffer, through deflate_write() when a content encoding is set and through
 * evbuffer_add() otherwise. Returns the result of whichever call was made.
 */
int h2_half_ops_body_data(struct tfe_http_half * half, const unsigned char * data, size_t sz_data)
{
    struct http2_half_private *half_private = nghttp2_to_half_private(half);
    struct data_t *body = &half_private->body;

    if (body->gzip == HTTP2_CONTENT_ENCODING_NONE)
        return evbuffer_add(body->evbuf_body, data, sz_data);

    return deflate_write(&body->deflate, (const uint8_t *)data, sz_data,
        body->evbuf_body, body->gzip, 0);
}
/*
 * ops_body_end implementation: mark both the body and the whole message of
 * the half as complete. Always returns 0.
 */
int h2_half_ops_body_end(struct tfe_http_half * half)
{
    struct http2_half_private *half_private = nghttp2_to_half_private(half);

    half_private->message_state = MANAGE_STAGE_COMPLETE;
    half_private->body_state = MANAGE_STAGE_COMPLETE;
    return 0;
}
/* Operation table for HTTP/2 halves; tfe_half_private_init() installs it as
 * half_public.ops on every half it creates. */
struct tfe_http_half_ops h2_half_ops =
{
.ops_http_field_read = half_ops_field_read,
.ops_http_field_write = half_ops_field_write,
.ops_http_allow_write = half_ops_allow_write,
.ops_http_field_iterate = half_ops_field_iterate,
.ops_append_body = half_ops_append_body,
.ops_body_begin = h2_half_ops_body_begin,
.ops_body_data = h2_half_ops_body_data,
.ops_body_end = h2_half_ops_body_end,
.ops_free = half_ops_free
};
/*
 * ops_allow_write implementation: return the session as writable when
 * http_frame_currect_plugin_preempt() on its frame context returns 0,
 * otherwise NULL.
 */
static struct tfe_http_session*
h2_ops_allow_write(const struct tfe_http_session * session)
{
    struct tfe_http_session *writable = (struct tfe_http_session *)session;
    struct h2_stream_data_t *stream_data = nghttp2_to_stream_data(writable);

    if (http_frame_currect_plugin_preempt(stream_data->frame_ctx) != 0)
        return NULL;

    return writable;
}
/* ops_detach implementation: detach the current plugin from the session's
 * frame context. */
void h2_ops_detach(const struct tfe_http_session * session)
{
    struct h2_stream_data_t *owner = nghttp2_to_stream_data((struct tfe_http_session *)session);

    http_frame_currect_plugin_detach(owner->frame_ctx);
}
/* ops_drop implementation: currently a no-op. */
void h2_ops_drop(struct tfe_http_session * session)
{
    (void)session;
}
/* ops_suspend implementation: raise the suspend-requested flag (spd_set) on
 * the stream that owns this session. */
void h2_ops_suspend(struct tfe_http_session * session)
{
    struct h2_stream_data_t *owner = nghttp2_to_stream_data((struct tfe_http_session *)session);

    owner->spd_set = 1;
}
/* ops_resume implementation: raise the resume-requested flag (rse_set), but
 * only when the stream's spd_valid flag is set. */
void h2_ops_resume(struct tfe_http_session * session)
{
    struct h2_stream_data_t *owner = nghttp2_to_stream_data((struct tfe_http_session *)session);

    if (!owner->spd_valid)
        return;

    owner->rse_set = 1;
}
void h2_ops_request_set(struct tfe_http_session * session, struct tfe_http_half * req_user)
{
struct h2_stream_data_t *stream_data = nghttp2_to_stream_data(session);
struct http2_half_private *half_user = nghttp2_to_half_private(req_user);
stream_data->pangu_req = half_user;
}
void h2_ops_response_set(struct tfe_http_session * session, struct tfe_http_half * resp)
{
struct h2_stream_data_t *stream_data = nghttp2_to_stream_data(session);
struct http2_half_private *half_user = nghttp2_to_half_private(resp);
stream_data->pangu_resp = half_user;
}
/*
 * Allocate and initialise a private half for the given direction:
 * zeroed storage, h2_half_ops installed, empty header list, a new body
 * evbuffer with no content encoding or padding, and both state machines at
 * MANAGE_STAGE_INIT.
 */
static struct http2_half_private*
tfe_half_private_init(enum tfe_http_direction direction)
{
    struct http2_half_private *created = ALLOC(struct http2_half_private, 1);
    assert(created);
    memset(created, 0, sizeof(struct http2_half_private));

    /* public part */
    created->half_public.ops = &h2_half_ops;
    created->half_public.direction = direction;

    /* headers and body */
    headers_init(&created->headers);
    created->body.evbuf_body = evbuffer_new();
    created->body.padlen = 0;
    created->body.gzip = HTTP2_CONTENT_ENCODING_NONE;

    /* state machines */
    created->message_state = MANAGE_STAGE_INIT;
    created->body_state = MANAGE_STAGE_INIT;

    return created;
}
/*
 * ops_request_create implementation: build a new request half carrying
 * `method` and a private copy of `uri`. `session` is not used.
 */
struct tfe_http_half * h2_ops_request_create(struct tfe_http_session * session,
    enum tfe_http_std_method method, const char * uri)
{
    struct http2_half_private *created = tfe_half_private_init(TFE_HTTP_REQUEST);

    created->method_or_status = method;
    created->url_storage = tfe_strdup(uri);

    return &created->half_public;
}
/*
 * ops_response_create implementation: build a new response half carrying
 * status `resp_code`. When the stream already has a response, its content
 * encoding is inherited by the new half.
 */
struct tfe_http_half * h2_ops_response_create(struct tfe_http_session * session, int resp_code)
{
    struct http2_half_private *created = tfe_half_private_init(TFE_HTTP_RESPONSE);
    struct h2_stream_data_t *owner = NULL;

    created->method_or_status = resp_code;

    owner = nghttp2_to_stream_data(session);
    if (owner->resp != NULL)
        created->body.gzip = owner->resp->body.gzip;

    return &created->half_public;
}
/* ops_kill implementation: currently a no-op. */
void h2_ops_kill(struct tfe_http_session * session)
{
    (void)session;
}
/* Session operation table for HTTP/2 streams; tfe_half_session_init()
 * installs it as tfe_session->ops when the request half is attached. */
struct tfe_http_session_ops nghttp2_session_ops =
{
.ops_allow_write = h2_ops_allow_write,
.ops_detach = h2_ops_detach,
.ops_drop = h2_ops_drop,
.ops_suspend = h2_ops_suspend,
.ops_resume = h2_ops_resume,
.ops_kill = h2_ops_kill,
.ops_request_set = h2_ops_request_set,
.ops_response_set = h2_ops_response_set,
.ops_request_create = h2_ops_request_create,
.ops_response_create = h2_ops_response_create
};
/*
 * nghttp2 data-source read callback: feeds body bytes from the evbuffer in
 * the struct data_t referenced by `source->ptr` into `buf`.
 *
 * At most min(`length`, TFE_MAX_PAYLOADLEN, bytes available) are copied per
 * call. Honouring `length` is required: it is the capacity of `buf`, and
 * copying more would overflow nghttp2's buffer. Only the bytes about to be
 * copied are linearised, not the whole evbuffer.
 *
 * Sets NGHTTP2_DATA_FLAG_EOF and returns 0 when there is no buffer, the
 * buffer is empty, or the data cannot be linearised; otherwise returns the
 * number of bytes written to `buf`.
 */
static ssize_t
upstream_read_callback(nghttp2_session *session, int32_t stream_id,
    uint8_t *buf, size_t length,
    uint32_t *data_flags,
    nghttp2_data_source *source,
    void *user_data)
{
#define TFE_MAX_PAYLOADLEN 8192
    struct data_t *body = (struct data_t *)source->ptr;
    unsigned char *input = NULL;
    size_t datalen = 0;

    if (body == NULL || body->evbuf_body == NULL){
        *data_flags |= NGHTTP2_DATA_FLAG_EOF;
        return 0;
    }

    datalen = evbuffer_get_length(body->evbuf_body);
    if (datalen == 0){
        *data_flags |= NGHTTP2_DATA_FLAG_EOF;
        return 0;
    }

    if (datalen > TFE_MAX_PAYLOADLEN)
        datalen = TFE_MAX_PAYLOADLEN;
    if (datalen > length)
        datalen = length;       /* never write past the end of buf */
    if (datalen == 0)
        return 0;

    input = evbuffer_pullup(body->evbuf_body, (ssize_t)datalen);
    if (input == NULL){
        *data_flags |= NGHTTP2_DATA_FLAG_EOF;
        return 0;
    }

    memcpy(buf, input, datalen);
    evbuffer_drain(body->evbuf_body, datalen);
    return (ssize_t)datalen;
}
static enum tfe_stream_action
nghttp2_server_frame_submit_response(struct tfe_session_info_t *session_info,
struct h2_stream_data_t *h2_stream)
{
int rv = -1;
nghttp2_nv hdrs[128] = {0};
struct http2_headers *headers = NULL;
struct http2_half_private *pangu_resp = NULL;
pangu_resp = h2_stream->pangu_resp;
if (pangu_resp->message_state != MANAGE_STAGE_COMPLETE){
return (enum tfe_stream_action)ACTION_USER_DATA;
}
headers = &pangu_resp->headers;
if (headers->nvlen <= 0)
return ACTION_FORWARD_DATA;
struct data_t *body = &pangu_resp->body;
char str_sz_evbuf_body[TFE_STRING_MAX];
snprintf(str_sz_evbuf_body, sizeof(str_sz_evbuf_body) - 1, "%lu", evbuffer_get_length(body->evbuf_body));
const static struct http_field_name __cont_encoding_length_name = {TFE_HTTP_CONT_LENGTH, NULL};
tfe_http_field_write(&pangu_resp->half_public, &__cont_encoding_length_name, str_sz_evbuf_body);
nghttp2_data_provider data_prd;
data_prd.source.ptr = (void *)body;
data_prd.read_callback = upstream_read_callback;
rv = nghttp2_submit_response(session_info->as_server, h2_stream->stream_id, nghttp2_nv_packet(headers, hdrs),
headers->nvlen, &data_prd);
if (rv != 0){
return ACTION_FORWARD_DATA;
}
delete_nv_packet_data(headers);
return ACTION_DROP_DATA;
}
static enum tfe_stream_action
server_frame_submit_data(struct tfe_session_info_t *session_info,
struct h2_stream_data_t *h2_stream,
enum tfe_conn_dir dir)
{
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
struct http2_half_private *resp = (dir == CONN_DIR_UPSTREAM) ? h2_stream->resp : h2_stream->req;
nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client;
if (h2_stream->pangu_resp){
stream_action = nghttp2_server_frame_submit_response(session_info, h2_stream);
}else{
int rv = -1;
struct data_t *body = &resp->body;
nghttp2_data_provider data_prd;
data_prd.source.ptr = (void *)body;
data_prd.read_callback = upstream_read_callback;
//printf("body->flags = %d\n", body->flags);
rv = nghttp2_submit_data(session, body->flags,
h2_stream->stream_id, &data_prd);
if (rv != 0){
stream_action = ACTION_FORWARD_DATA;
printf("Fatal data error: %s\n", nghttp2_strerror(rv));
}
}
//printf("submit data %d action = %d\n", h2_stream != NULL ? h2_stream->stream_id : NULL, stream_action);
return stream_action;
}
/* Signature shared by the per-frame handlers in this file
 * (nghttp2_submit_frame_*): session info, the frame, and the direction. */
typedef int (*nghttp2_frame_callback) (struct tfe_session_info_t *, const nghttp2_frame *,
enum tfe_conn_dir dir);
/* Header-oriented callback signature carrying one name/value pair plus the
 * direction.
 * NOTE(review): its users are not visible in this part of the file. */
typedef int (*nghttp2_callback) (nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir);
static int
nghttp2_submit_frame_priority(struct tfe_session_info_t *session_info,const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
int xret = -1;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
const nghttp2_priority *priority = &frame->priority;
nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client;
int rv = nghttp2_submit_priority(session, priority->hd.flags, priority->hd.stream_id,
&(priority->pri_spec));
if (rv != 0){
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit ping error: %s\n",
dir, nghttp2_strerror(rv));
goto finish;
}
xret = nghttp2_session_send(session);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n",
dir, nghttp2_strerror(xret));
}
finish:
TFE_LOG_INFO(logger()->handle, "%s, %d, submit priority, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info,
dir, frame->hd.stream_id, session_info->stream_action);
session_info->stream_action = stream_action;
return 0;
}
static int
nghttp2_submit_frame_rst_stream(struct tfe_session_info_t *session_info,const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
int xret = -1;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
const nghttp2_rst_stream *rst_stream = &frame->rst_stream;
nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client;
int rv = nghttp2_submit_rst_stream(session, rst_stream->hd.flags,
rst_stream->hd.stream_id, rst_stream->error_code);
if (rv != 0){
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit rst error: %s\n",
dir, nghttp2_strerror(rv));
goto finish;
}
xret = nghttp2_session_send(session);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n",
dir, nghttp2_strerror(xret));
}
finish:
session_info->stream_action = stream_action;
TFE_LOG_INFO(logger()->handle, "%s, %d, submit rst stream, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info,
dir, frame->hd.stream_id, session_info->stream_action);
return 0;
}
static int
nghttp2_submit_frame_settings(struct tfe_session_info_t *session_info,const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
int xret = -1, rv = -1;
nghttp2_settings_entry iv[6] = {0};
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
nghttp2_settings settings = frame->settings;
nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client;
if(settings.hd.flags == NGHTTP2_FLAG_ACK){
stream_action = ACTION_FORWARD_DATA;
goto finish;
}
rv = nghttp2_submit_settings(session, settings.hd.flags,
nghttp2_iv_packet(settings, iv), settings.niv);
if (rv != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit settings error: %s\n",
dir, nghttp2_strerror(rv));
goto finish;
}
xret = nghttp2_session_send(session);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n",
dir, nghttp2_strerror(xret));
}
finish:
session_info->stream_action = stream_action;
TFE_LOG_INFO(logger()->handle, "%s, %d, submit setting, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info,
dir, frame->hd.stream_id, session_info->stream_action);
return 0;
}
static int
nghttp2_submit_frame_ping(struct tfe_session_info_t *session_info,const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
int xret = -1 ,rv = -1;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
const nghttp2_ping *ping = &frame->ping;
nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client;
//if(ping->hd.flags & NGHTTP2_FLAG_ACK){
// stream_action = ACTION_FORWARD_DATA;
// goto finish;
//}
rv = nghttp2_submit_ping(session, ping->hd.flags,
ping->opaque_data);
if (rv != 0){
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit ping error: %s\n",
dir, nghttp2_strerror(rv));
goto finish;
}
xret = nghttp2_session_send(session);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n",
dir, nghttp2_strerror(xret));
}
finish:
session_info->stream_action = stream_action;
TFE_LOG_INFO(logger()->handle, "%s, %d, submit ping, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info,
dir, frame->hd.stream_id, session_info->stream_action);
return 0;
}
/*
 * Destroy the request and response halves of a stream. `body_flag` is passed
 * through to delete_stream_half_data() and decides whether the body buffers
 * are freed as well. `tf_stream` is currently unused (access logging is
 * disabled).
 */
void delete_http2_stream_data(struct h2_stream_data_t *h2_stream,
    const struct tfe_stream *tf_stream,
    int body_flag)
{
    (void)tf_stream;

    delete_stream_half_data(&h2_stream->req, body_flag);
    delete_stream_half_data(&h2_stream->resp, body_flag);
}
/*
 * Tear down a whole HTTP/2 connection: every stream in the session list is
 * removed, its session-end event is raised when it still has a frame
 * context, and its halves (bodies included) and the stream object itself are
 * freed. Finally both nghttp2 sessions are deleted and their pointers reset.
 */
void nghttp2_disect_goaway(struct tfe_session_info_t *session_info)
{
    const struct tfe_stream *tf_stream = session_info->tf_stream;
    unsigned int thread_id = session_info->thread_id;
    struct h2_stream_data_t *cur = NULL;
    struct h2_stream_data_t *following = NULL;

    TAILQ_FOREACH_SAFE(cur, &session_info->list, next, following){
        TAILQ_REMOVE(&session_info->list, cur, next);

        if (cur->frame_ctx != NULL){
            http_frame_raise_session_end(cur->frame_ctx, tf_stream, &cur->tfe_session,
                thread_id);
            cur->frame_ctx = NULL;
        }

        delete_http2_stream_data(cur, tf_stream, 1);
        free(cur);
    }

    if (session_info->as_client != NULL){
        nghttp2_session_del(session_info->as_client);
        session_info->as_client = NULL;
    }
    if (session_info->as_server != NULL){
        nghttp2_session_del(session_info->as_server);
        session_info->as_server = NULL;
    }
}
static int
nghttp2_submit_frame_goaway(struct tfe_session_info_t *session_info,const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
int xret = -1;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
char error[1024] = {0};
const nghttp2_goaway *goaway = &frame->goaway;
nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client;
int rv = nghttp2_submit_goaway(session, goaway->hd.flags, goaway->last_stream_id,
goaway->error_code, goaway->opaque_data, goaway->opaque_data_len);
if (rv != 0){
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit goaway error: %s\n",
dir, nghttp2_strerror(rv));
goto finish;
}
xret = nghttp2_session_send(session);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n",
dir, nghttp2_strerror(xret));
}
finish:
snprintf(error, goaway->opaque_data_len, "%s", goaway->opaque_data);
TFE_LOG_INFO(logger()->handle, "%s, %d, submit goaway, stream_id:%d, action:%d, errod_code:%d, data:%s", session_info->tf_stream->str_stream_info,
dir, goaway->last_stream_id, session_info->stream_action, goaway->error_code, goaway->opaque_data);
session_info->goaway = 1;
session_info->stream_action = stream_action;
return 0;
}
static int
nghttp2_submit_frame_window_update(struct tfe_session_info_t *session_info,const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
int xret = -1;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
const nghttp2_window_update *window_update = &(frame->window_update);
nghttp2_session *session = (dir == CONN_DIR_UPSTREAM) ? session_info->as_server : session_info->as_client;
int rv = nghttp2_submit_window_update(session, window_update->hd.flags,window_update->hd.stream_id,
window_update->window_size_increment);
if (rv != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit window error: %s\n",
dir, nghttp2_strerror(rv));
goto finish;
}
xret = nghttp2_session_send(session);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n",
dir, nghttp2_strerror(xret));
}
finish:
session_info->stream_action = stream_action;
TFE_LOG_INFO(logger()->handle, "%s, %d, submit window update, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info,
dir, frame->hd.stream_id, session_info->stream_action);
return 0;
}
/*
 * Record the padding length of the final DATA frame of a response.
 *
 * Only frames not travelling in CONN_DIR_DOWNSTREAM and carrying
 * END_STREAM are considered. The stream is looked up in the client-side
 * nghttp2 session and frame->data.padlen is stored in its response body.
 * A missing stream is logged; a stream without a response half is skipped
 * instead of being dereferenced. Always returns 0.
 */
static int
nghttp2_set_padlen(struct tfe_session_info_t *session_info,const nghttp2_frame *frame,
    enum tfe_conn_dir dir)
{
    struct http2_half_private *resp = NULL;
    struct h2_stream_data_t *h2_stream = NULL;

    if (dir == CONN_DIR_DOWNSTREAM)
        return 0;
    if (!(frame->hd.flags & NGHTTP2_FLAG_END_STREAM))
        return 0;

    h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session_info->as_client,
                        frame->hd.stream_id);
    if (!h2_stream){
        TFE_LOG_ERROR(logger()->handle, "Upstream id %d, can't find stream information(addr = %p)",
                    frame->hd.stream_id, session_info);
        return 0;
    }

    resp = h2_stream->resp;
    if (resp == NULL)
        return 0;

    resp->body.padlen = frame->data.padlen;
    return 0;
}
/*
 * Wire a stream's half into its public HTTP session object.
 * For TFE_HTTP_REQUEST the session ops table, request half and session id
 * are set; for TFE_HTTP_RESPONSE only the response half is attached.
 * Always returns 0.
 */
static int tfe_half_session_init(struct h2_stream_data_t *h2_stream, int32_t stream_id,
    enum tfe_http_direction direction)
{
    struct tfe_http_session *public_session = &h2_stream->tfe_session;

    if (direction == TFE_HTTP_REQUEST){
        public_session->ops = &nghttp2_session_ops;
        public_session->req = &h2_stream->req->half_public;
        public_session->session_id = stream_id;
    }

    if (direction == TFE_HTTP_RESPONSE){
        public_session->resp = &h2_stream->resp->half_public;
    }

    return 0;
}
/*
 * Prepare `h2_stream` as a new request stream with id `stream_id`:
 * create its request half, attach it to the public session, register an
 * event-dispatch context on the half, allocate a frame context and raise the
 * session-begin event. On success the stream is appended to the session's
 * stream list and registered as nghttp2 stream user data on `as_server`.
 * When the frame context cannot be allocated an error is logged and the
 * stream is neither listed nor registered.
 */
static void
upstream_create_req(struct tfe_session_info_t *session_info, nghttp2_session *as_server,
    struct h2_stream_data_t *h2_stream, int32_t stream_id)
{
    struct http2_half_private *req = NULL;
    struct user_event_dispatch *dispatch = NULL;

    h2_stream->stream_id = stream_id;
    h2_stream->req = tfe_half_private_init(TFE_HTTP_REQUEST);
    tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_REQUEST);

    dispatch = ALLOC(struct user_event_dispatch, 1);
    assert(dispatch);
    dispatch->thread_id = session_info->thread_id;
    dispatch->tf_stream = session_info->tf_stream;
    dispatch->tfe_session = &h2_stream->tfe_session;
    half_set_callback(h2_stream->req, dispatch, NULL);

    /* Hand the new session over to the business plugins. */
    req = h2_stream->req;
    req->frame_ctx = http_frame_alloc();
    if (req->frame_ctx == NULL){
        TFE_STREAM_LOG_ERROR(h2_stream, "Failed at raising session begin event. ");
        return;
    }

    http_frame_raise_session_begin(req->frame_ctx, session_info->tf_stream,
        &h2_stream->tfe_session, session_info->thread_id);
    h2_stream->frame_ctx = req->frame_ctx;

    TAILQ_INSERT_TAIL(&session_info->list, h2_stream, next);
    nghttp2_session_set_stream_user_data(as_server, stream_id, h2_stream);
}
static enum tfe_stream_action
nghttp2_server_frame_submit_push_promise(struct tfe_session_info_t *session_info,
struct h2_stream_data_t *h2_stream)
{
int32_t stream_id = -1;
nghttp2_nv hdrs[128] = {0};
struct http2_headers *headers = NULL;
struct h2_stream_data_t *_h2_stream = NULL;
struct http2_half_private *resp = NULL;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
resp = h2_stream->resp;
if (resp == NULL)
goto finish;
headers = &resp->promised;
if (headers->nvlen <= 0)
goto finish;
/* Create s' half req*/
_h2_stream = (struct h2_stream_data_t *)ALLOC(struct h2_stream_data_t, 1);
assert(_h2_stream);
memset(_h2_stream, 0, sizeof(struct h2_stream_data_t));
stream_id = nghttp2_submit_push_promise(session_info->as_server, headers->flag,
h2_stream->stream_id, nghttp2_nv_packet(headers, hdrs),
headers->nvlen, _h2_stream);
if (stream_id < 0){
free(_h2_stream);
_h2_stream = NULL;
TFE_STREAM_LOG_ERROR(h2_stream, "Failed to submit push promise: %s", nghttp2_strerror(stream_id));
goto finish;
}
upstream_create_req(session_info, session_info->as_server, _h2_stream, stream_id);
/*clean header message **/
delete_nv_packet_data(headers);
stream_action = ACTION_DROP_DATA;
finish:
return stream_action;
}
/*
 * Handle a PUSH_PROMISE frame. Frames in CONN_DIR_DOWNSTREAM are only
 * logged. Otherwise the stream is looked up in the client-side session and
 * its promised headers are submitted on the server side; when that yields
 * ACTION_DROP_DATA the server session is flushed. The resulting action is
 * stored in session_info->stream_action only when the stream was found.
 * Always returns 0.
 */
static int
nghttp2_submit_frame_push_promise(struct tfe_session_info_t *session_info,const nghttp2_frame *frame,
    enum tfe_conn_dir dir)
{
    if (dir != CONN_DIR_DOWNSTREAM){
        struct h2_stream_data_t *owner = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(
                session_info->as_client, frame->hd.stream_id);

        if (owner == NULL){
            TFE_LOG_ERROR(logger()->handle, "Upstream id %d, can't find stream information(addr = %p)",
                    frame->hd.stream_id, session_info);
        }else{
            enum tfe_stream_action outcome = nghttp2_server_frame_submit_push_promise(session_info, owner);

            if (outcome == ACTION_DROP_DATA){
                int send_rc = nghttp2_session_send(session_info->as_server);
                if (send_rc != 0) {
                    outcome = ACTION_FORWARD_DATA;
                    TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n",
                            dir, nghttp2_strerror(send_rc));
                }
            }
            session_info->stream_action = outcome;
        }
    }

    TFE_LOG_INFO(logger()->handle, "%s, %d, submit push promise, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info,
        dir, frame->hd.stream_id, session_info->stream_action);
    return 0;
}
/*
 * Leave the suspended state of a stream, if it is in one.
 *
 * Does nothing unless h2_stream->spd_valid is 1. Otherwise the underlying
 * stream is resumed, the suspend bookkeeping is reset and the event recorded at
 * suspend time is replayed to the half's event callback with no payload.
 */
static void
suspend_start(struct h2_stream_data_t *h2_stream,
	struct http2_half_private *half, const struct tfe_stream *stream)
{
	if (h2_stream->spd_valid == 1){
		tfe_stream_resume(stream);

		/* Take the pending event out of the stream before clearing the state */
		enum tfe_http_event pending = h2_stream->spd_event;
		h2_stream->spd_event = (enum tfe_http_event)0;
		h2_stream->spd_valid = 0;
		h2_stream->spd_set_cnt--;
		h2_stream->spd_cnt++;
		h2_stream->rse_set = 0;

		/* Tell the user callback that processing resumes from the suspend */
		half->event_cb(half, pending, NULL, 0, half->event_cb_user);
	}
}
static void
fill_resp_spec_from_handle(struct http2_half_private *half_private)
{
struct header_data *head = NULL;
struct tfe_http_resp_spec *resp_spec = &(half_private->half_public.resp_spec);
foreach_headers(&half_private->headers, head){
if (!strncmp((char *)(head->nv.name), ":status", strlen(":status"))){
resp_spec->resp_code = atoi((const char *)head->nv.value);
continue;
}
if (!strncmp((char *)(head->nv.name), "content-type", strlen("content-type"))){
resp_spec->content_type = (const char *)(head->nv.value);
continue;
}
if (!strncmp((char *)(head->nv.name), "content-encoding", strlen("content-encoding"))){
resp_spec->content_encoding = (const char *)(head->nv.value);
continue;
}
if (!strncmp((char *)(head->nv.name), "content-length", strlen("content-length"))){
resp_spec->content_length = (const char *)(head->nv.value);
continue;
}
}
resp_spec->content_length = 0;
return;
}
static enum tfe_stream_action
nghttp2_server_frame_submit_header(struct tfe_session_info_t *session_info,
struct h2_stream_data_t *h2_stream)
{
int32_t stream_id = 0;
nghttp2_nv hdrs[128] = {0};
struct http2_headers *headers = NULL;
struct http2_half_private *resp = NULL;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
if (h2_stream->pangu_resp != NULL){
stream_action = (enum tfe_stream_action)ACTION_USER_DATA;
goto finish;
}
resp = h2_stream->resp;
if (resp == NULL){
return ACTION_FORWARD_DATA;
}
headers = &resp->headers;
if (headers->nvlen <= 0){
return ACTION_FORWARD_DATA;
}
stream_id = nghttp2_submit_headers(session_info->as_server, headers->flag,
h2_stream->stream_id, NULL, nghttp2_nv_packet(headers, hdrs),
headers->nvlen, h2_stream);
if (stream_id < 0){
printf("Fatal headers error: %s\n", nghttp2_strerror(stream_id));
}
delete_nv_packet_data(headers);
finish:
return stream_action;
}
/*
 * Write one info-level log line describing the headers of a stream.
 *
 * The line carries the stream description, direction, session id, request
 * method and URL, and the response code, content-type and content-encoding.
 * Any piece that is not available is printed as "-".
 *
 * Fixes over the previous version:
 *   - asprintf() is checked; on failure its output pointer is indeterminate and
 *     must be neither logged nor freed.
 *   - a NULL request URL is replaced by "-" instead of being passed to "%s".
 *
 * Always returns 0.
 */
int
nghttp2_headers_write_log(struct h2_stream_data_t *h2_stream, const char * str_stream_info,
	int dir)
{
	/* Request / response halves; either may be absent */
	struct http2_half_private *req = h2_stream->req;
	struct http2_half_private *resp = h2_stream->resp;
	struct tfe_http_req_spec *req_spec = req ? &(req->half_public.req_spec) : NULL;
	struct tfe_http_resp_spec *resp_spec = resp ? &(resp->half_public.resp_spec) : NULL;

	const char * method = req_spec ? val_to_str(req_spec->method, method_vals) : "-";
	const char * url = (req_spec && req_spec->url) ? req_spec->url : "-";

	char resp_code[TFE_STRING_MAX];
	if (resp_spec)
		snprintf(resp_code, sizeof(resp_code) - 1, "%d", resp_spec->resp_code);
	else
		snprintf(resp_code, sizeof(resp_code) - 1, "%s", "-");

	const char * cont_type = (resp_spec && resp_spec->content_type != NULL) ? resp_spec->content_type : "-";
	const char * cont_encoding =
		(resp_spec && resp_spec->content_encoding != NULL) ? resp_spec->content_encoding : "-";
	const char *hmsg = (dir == CONN_DIR_UPSTREAM) ? "response" : "request";

	char *access_log = NULL;
	if (asprintf(&access_log, "%s %d %s stream_id:%d %s %s HTTP2.0 %s %s %s", str_stream_info, dir, hmsg, h2_stream->tfe_session.session_id,
		method, url, resp_code, cont_type, cont_encoding) < 0){
		TFE_LOG_ERROR(logger()->handle, "%s %d failed to format header log line", str_stream_info, dir);
		return 0;
	}
	TFE_LOG_INFO(logger()->handle, "%s", access_log);
	free(access_log);
	return 0;
}
/*
 * Handle a complete response header block for stream_id.
 *
 * The stream object is taken from the as_client session. A pending suspend is
 * resumed first, the response spec is refreshed and EV_HTTP_RESP_HDR is raised.
 * If the callback asked for a suspend (spd_set), the stream is suspended and the
 * data deferred. Otherwise the headers are logged, queued on as_server and
 * flushed.
 *
 * The outcome is stored in session_info->stream_action; always returns 0.
 */
static int
nghttp2_server_submit_header(struct tfe_session_info_t *session_info, int32_t stream_id)
{
	enum tfe_stream_action verdict = ACTION_DROP_DATA;

	do {
		struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(
			session_info->as_client, stream_id);
		if (h2_stream == NULL){
			verdict = ACTION_FORWARD_DATA;
			TFE_LOG_ERROR(logger()->handle, "Upstream id %d, can't find stream information(addr = %p)",
				stream_id, session_info);
			break;
		}

		struct http2_half_private *resp = h2_stream->resp;
		suspend_start(h2_stream, resp, session_info->tf_stream);
		fill_resp_spec_from_handle(h2_stream->resp);
		resp->event_cb(resp, EV_HTTP_RESP_HDR, NULL, 0, resp->event_cb_user);

		if (h2_stream->spd_set){
			/* The callback requested a suspend: remember which event to replay */
			h2_stream->spd_event = EV_HTTP_RESP_HDR;
			h2_stream->spd_valid = 1;
			h2_stream->spd_set = 0;
			h2_stream->spd_set_cnt++;
			h2_stream->spd_cnt++;
			tfe_stream_suspend(session_info->tf_stream, CONN_DIR_UPSTREAM);
			verdict = ACTION_DEFER_DATA;
			break;
		}

		nghttp2_headers_write_log(h2_stream,session_info->tf_stream->str_stream_info, CONN_DIR_UPSTREAM);
		verdict = nghttp2_server_frame_submit_header(session_info, h2_stream);
		if (verdict == ACTION_DROP_DATA){
			int send_rc = nghttp2_session_send(session_info->as_server);
			if (send_rc != 0) {
				verdict = ACTION_FORWARD_DATA;
				TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(send_rc));
			}
		}
		/* A user-built response is reported to the caller as dropped data */
		if (verdict == ACTION_USER_DATA)
			verdict = ACTION_DROP_DATA;
	} while (0);

	session_info->stream_action = verdict;
	return 0;
}
static void
fill_req_spec_from_handle(struct http2_half_private *half_private)
{
int urllen = 0;
struct header_data *head = NULL;
struct tfe_http_req_spec *req_spec = &(half_private->half_public.req_spec);
foreach_headers(&half_private->headers, head){
if (!strncmp((char *)(head->nv.name), ":method", strlen(":method"))){
req_spec->method = (enum tfe_http_std_method)str_to_val((const char *)(head->nv.value), method_vals);
continue;
}
if (!strncmp((char *)(head->nv.name), ":authority", strlen(":authority"))){
req_spec->host = (const char *)(head->nv.value);
urllen += head->nv.valuelen;
continue;
}
if (!strncmp((char *)(head->nv.name), ":path", strlen(":path"))){
req_spec->uri = (const char*)(head->nv.value);
urllen += head->nv.valuelen;
continue;
}
}
char *urltmp = NULL;
urltmp = (char *)malloc(urllen + 1);
if(urltmp){
sprintf(urltmp, "%s%s", (char *)req_spec->host, (char *)req_spec->uri);
req_spec->url = urltmp;
}
return;
}
/*
 * Enter the suspended state if a suspend was requested on the stream.
 *
 * Returns 0 when h2_stream->spd_set was set (the request is consumed, the
 * stream is marked suspended and tfe_stream_suspend() is called for `dir`),
 * -1 when no suspend was requested.
 */
static int
suspend_stop(struct h2_stream_data_t *h2_stream,
	const struct tfe_stream *tf_stream, enum tfe_conn_dir dir)
{
	if (!h2_stream->spd_set)
		return -1;

	h2_stream->spd_valid = 1;
	h2_stream->spd_set = 0;
	tfe_stream_suspend(tf_stream, dir);
	return 0;
}
/*
 * Build a header entry from a name/value pair of C strings and attach it to
 * `headers`: at the front when `flag` is non-zero, at the back otherwise.
 * Name and value are duplicated with tfe_strdup(). headers->flag is set to 0x04.
 */
static void
tfe_make_nv(struct http2_headers *headers,
	const char *name,const char *value, int flag)
{
	struct header_data *entry = ALLOC(struct header_data, 1);

	entry->nv.name = (uint8_t *)tfe_strdup(name);
	entry->nv.namelen = strlen(name);
	entry->nv.value = (uint8_t *)tfe_strdup(value);
	entry->nv.valuelen = strlen(value);

	headers->flag = 0x04;
	if (flag){
		headers_add_head(headers, entry);
	}else{
		headers_add_tail(headers, entry);
	}
}
/*
 * Submit a user-built response (h2_stream->pangu_resp).
 *
 * A ":status" header is prepended and an "X-TG-Construct-By" header appended to
 * the response header list, then the response is handed to
 * nghttp2_server_frame_submit_response().
 *
 * Returns ACTION_USER_DATA when the response was queued and the flush
 * succeeded, ACTION_FORWARD_DATA when the flush failed, or whatever other
 * action the submit helper reported.
 *
 * NOTE(review): the flush is done on as_client; the response is presumably
 * queued on as_server by the submit helper -- confirm this is intended.
 */
static enum tfe_stream_action
tfe_submit_response(struct tfe_session_info_t *session_info,
	struct h2_stream_data_t *h2_stream)
{
	struct http2_half_private *user_resp = h2_stream->pangu_resp;
#define VALUE_LEN 128
	char text[VALUE_LEN] = {0};

	snprintf(text, VALUE_LEN, "%d", user_resp->method_or_status);
	tfe_make_nv(&user_resp->headers, ":status", (const char *)text, 1);

	snprintf(text, VALUE_LEN, "tfe/%s", tfe_version());
	tfe_make_nv(&user_resp->headers, "X-TG-Construct-By", (const char *)text, 0);

	enum tfe_stream_action verdict = nghttp2_server_frame_submit_response(session_info, h2_stream);
	if (verdict != ACTION_DROP_DATA)
		return verdict;

	int send_rc = nghttp2_session_send(session_info->as_client);
	if (send_rc != 0) {
		TFE_LOG_ERROR(logger()->handle, "Fatal downstream send error: %s\n",
			nghttp2_strerror(send_rc));
		return ACTION_FORWARD_DATA;
	}
	return (enum tfe_stream_action)ACTION_USER_DATA;
}
/*
 * Create the response half of a stream if it does not exist yet.
 *
 * The new half gets its own user_event_dispatch (thread id, stream, session),
 * shares the stream's frame_ctx, and the stream object is registered as user
 * data for h2_stream->stream_id on as_client. No-op when h2_stream->resp is
 * already set.
 */
static void
downstream_create_resp(struct h2_stream_data_t *h2_stream, nghttp2_session *as_client,
	const struct tfe_stream *tf_stream, unsigned int thread_id)
{
	if (h2_stream->resp)
		return;

	h2_stream->resp = tfe_half_private_init(TFE_HTTP_RESPONSE);
	tfe_half_session_init(h2_stream, h2_stream->stream_id, TFE_HTTP_RESPONSE);

	struct user_event_dispatch *dispatch = ALLOC(struct user_event_dispatch, 1);
	assert(dispatch);
	dispatch->thread_id = thread_id;
	dispatch->tf_stream = tf_stream;
	dispatch->tfe_session = &h2_stream->tfe_session;
	half_set_callback(h2_stream->resp, dispatch, NULL);

	h2_stream->resp->frame_ctx = h2_stream->frame_ctx;
	nghttp2_session_set_stream_user_data(as_client, h2_stream->stream_id, h2_stream);
}
static enum tfe_stream_action
nghttp2_client_frame_submit_header(struct tfe_session_info_t *session_info,
struct h2_stream_data_t *h2_stream)
{
int32_t stream_id = -1;
nghttp2_nv hdrs[128] = {0};
struct http2_headers *headers = NULL;
struct http2_half_private *req = NULL;
enum tfe_http_std_method method = (enum tfe_http_std_method)NGHTTP2_METHOD_UNKNOWN;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
if (0 == suspend_stop(h2_stream, session_info->tf_stream, CONN_DIR_DOWNSTREAM)){
stream_action = ACTION_DEFER_DATA;
goto finish;
}
req = h2_stream->pangu_req != NULL ? h2_stream->pangu_req : h2_stream->req;
if (req == NULL){
stream_action = ACTION_FORWARD_DATA;
goto finish;
}
if (h2_stream->pangu_resp){
stream_action = tfe_submit_response(session_info, h2_stream);
goto finish;
}
headers = &req->headers;
if (headers->nvlen <= 0){
stream_action = ACTION_FORWARD_DATA;
goto finish;
}
/*Create C' half_private_resp**/
downstream_create_resp(h2_stream, session_info->as_client, session_info->tf_stream, session_info->thread_id);
nghttp2_session_set_next_stream_id(session_info->as_client, h2_stream->stream_id);
method = nghttp2_get_method(h2_stream->req);
if (method == (enum tfe_http_std_method)NGHTTP2_METHOD_POST ||
method == (enum tfe_http_std_method)NGHTTP2_METHOD_PUT){
stream_id = nghttp2_submit_headers(session_info->as_client, headers->flag,
-1, NULL, nghttp2_nv_packet(headers, hdrs),
headers->nvlen, h2_stream);
}else{
stream_id = nghttp2_submit_request(session_info->as_client, NULL,
nghttp2_nv_packet(headers, hdrs),
headers->nvlen, NULL, h2_stream);
}
if (stream_id < 0){
TFE_LOG_ERROR(logger()->handle, "Could not submit request: %s",
nghttp2_strerror(stream_id));
stream_action = ACTION_FORWARD_DATA;
goto finish;
}
/*clear headers data ***/
delete_nv_packet_data(headers);
stream_action = ACTION_DROP_DATA;
finish:
return stream_action;
}
/*
 * Handle a complete request header block for stream_id.
 *
 * The stream object is taken from the as_server session. A pending suspend is
 * resumed first, the request spec is refreshed and EV_HTTP_REQ_HDR is raised.
 * If the callback asked for a suspend (spd_set), the stream is suspended and the
 * data deferred. Otherwise the headers are logged, queued on as_client and
 * flushed.
 *
 * The outcome is stored in session_info->stream_action; always returns 0.
 */
static int
nghttp2_client_submit_header(struct tfe_session_info_t *session_info, int32_t stream_id)
{
	enum tfe_stream_action verdict = ACTION_DROP_DATA;

	do {
		struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(
			session_info->as_server, stream_id);
		if (h2_stream == NULL){
			verdict = ACTION_FORWARD_DATA;
			break;
		}

		struct http2_half_private *req = h2_stream->req;
		suspend_start(h2_stream, req, session_info->tf_stream);
		fill_req_spec_from_handle(h2_stream->req);
		req->event_cb(req, EV_HTTP_REQ_HDR, NULL, 0, req->event_cb_user);

		if (h2_stream->spd_set){
			/* The callback requested a suspend: remember which event to replay */
			h2_stream->spd_event = EV_HTTP_REQ_HDR;
			h2_stream->spd_valid = 1;
			h2_stream->spd_set = 0;
			h2_stream->spd_set_cnt++;
			h2_stream->spd_cnt++;
			tfe_stream_suspend(session_info->tf_stream, CONN_DIR_DOWNSTREAM);
			verdict = ACTION_DEFER_DATA;
			break;
		}

		nghttp2_headers_write_log(h2_stream, session_info->tf_stream->str_stream_info, CONN_DIR_DOWNSTREAM);
		verdict = nghttp2_client_frame_submit_header(session_info, h2_stream);
		if (verdict == ACTION_DROP_DATA){
			int send_rc = nghttp2_session_send(session_info->as_client);
			if (send_rc != 0) {
				verdict = ACTION_FORWARD_DATA;
				TFE_LOG_ERROR(logger()->handle, "Fatal downstream send error: %s\n",
					nghttp2_strerror(send_rc));
			}
		}
		/* A user-built response is reported to the caller as dropped data */
		if (verdict == ACTION_USER_DATA)
			verdict = ACTION_DROP_DATA;
	} while (0);

	session_info->stream_action = verdict;
	return 0;
}
/*
 * Frame handler for HEADERS (see nghttp2_frame_callback_array).
 *
 * Nothing happens until the END_HEADERS flag is set. Then the header block is
 * processed as a response (upstream direction) or as a request (downstream
 * direction), and the resulting action is logged. Returns the result of the
 * direction-specific handler, or 0 when nothing was done.
 */
static int
nghttp2_submit_frame_header(struct tfe_session_info_t *session_info,const nghttp2_frame *frame,
	enum tfe_conn_dir dir)
{
	int rc = 0;

	if (!(frame->hd.flags & NGHTTP2_FLAG_END_HEADERS))
		return rc;

	if (dir == CONN_DIR_UPSTREAM){
		rc = nghttp2_server_submit_header(session_info, frame->hd.stream_id);
		TFE_LOG_INFO(logger()->handle, "%s, %d, submit response header, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info,
			dir, frame->hd.stream_id, session_info->stream_action);
	}else if (dir == CONN_DIR_DOWNSTREAM){
		rc = nghttp2_client_submit_header(session_info, frame->hd.stream_id);
		TFE_LOG_INFO(logger()->handle, "%s, %d, submit request header, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info,
			dir, frame->hd.stream_id, session_info->stream_action);
	}
	return rc;
}
/*
 * Dispatch table for completely received frames, indexed by nghttp2 frame type.
 * nghttp2_client_on_frame_recv() and nghttp2_server_on_frame_recv() look up
 * frame->hd.type here and call the entry with the session info, the frame and
 * the direction. A NULL entry means the frame type is ignored.
 */
nghttp2_frame_callback nghttp2_frame_callback_array[] = {
[NGHTTP2_DATA] = nghttp2_set_padlen,
[NGHTTP2_HEADERS] = nghttp2_submit_frame_header,
[NGHTTP2_PRIORITY] = nghttp2_submit_frame_priority,
[NGHTTP2_RST_STREAM] = nghttp2_submit_frame_rst_stream,
[NGHTTP2_SETTINGS] = nghttp2_submit_frame_settings,
[NGHTTP2_PUSH_PROMISE] = nghttp2_submit_frame_push_promise,
[NGHTTP2_PING] = nghttp2_submit_frame_ping,
[NGHTTP2_GOAWAY] = nghttp2_submit_frame_goaway,
[NGHTTP2_WINDOW_UPDATE] = nghttp2_submit_frame_window_update,
/* No handler registered for the remaining frame types */
[NGHTTP2_CONTINUATION] = NULL,
[NGHTTP2_ALTSVC] = NULL,
};
/*
 * Per-header handler for HEADERS frames (see nghttp2_callback_array).
 *
 * Copies one name/value pair into the header list of the matching half: the
 * response half for the upstream direction, the request half otherwise.
 * In the downstream direction only header blocks of category
 * NGHTTP2_HCAT_REQUEST are recorded. A content-encoding header also selects
 * the body decoding mode of the half. Always returns 0.
 */
static int
nghttp2_fill_up_header(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name,
	size_t namelen, const uint8_t *value, size_t valuelen,
	uint8_t flags, void *user_data, enum tfe_conn_dir dir)
{
	if (dir == CONN_DIR_DOWNSTREAM &&
		frame->headers.cat != NGHTTP2_HCAT_REQUEST){
		return 0;
	}

	struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
	if (h2_stream == NULL){
		TFE_LOG_ERROR(logger()->handle, "Header stream id %d, can't find stream information",
			frame->hd.stream_id);
		return 0;
	}

	struct http2_half_private *half = NULL;
	if (dir == CONN_DIR_UPSTREAM){
		half = h2_stream->resp;
	}else{
		half = h2_stream->req;
	}

	/* Private copy of the header pair */
	struct header_data *entry = ALLOC(struct header_data, 1);
	entry->nv.name = (uint8_t *)tfe_strdup((const char *)name);
	entry->nv.namelen = namelen;
	entry->nv.value = (uint8_t *)tfe_strdup((const char *)value);
	entry->nv.valuelen = valuelen;
	entry->nv.flags = flags;

	enum tfe_http_std_field field_id = (enum tfe_http_std_field)str_to_val((const char *)name, headers_vals);
	if (field_id != -1){
		/* Known standard field */
		if (field_id == TFE_HTTP_CONT_ENCODING){
			half->body.gzip = method_to_str_idx((const char *)value);
		}
		entry->field.field_id = field_id;
		entry->field.field_name = NULL;
	}else{
		/* Not in the table: keep the literal name */
		entry->field.field_id = TFE_HTTP_UNKNOWN_FIELD;
		entry->field.field_name = (const char *)entry->nv.name;
	}

	struct http2_headers *list = &half->headers;
	list->flag = frame->hd.flags;
	headers_add_tail(list, entry);
	return 0;
}
/*
 * Per-header handler for PUSH_PROMISE frames (see nghttp2_callback_array).
 *
 * Ignored in the downstream direction. Otherwise one name/value pair is copied
 * into the `promised` header list of the response half of the stream the frame
 * arrived on. A content-encoding header also selects the body decoding mode of
 * that half. Always returns 0.
 */
static int
nghttp2_fill_up_promise(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name,
	size_t namelen, const uint8_t *value,
	size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir)
{
	if (dir == CONN_DIR_DOWNSTREAM)
		return 0;

	struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
	if (h2_stream == NULL){
		TFE_LOG_ERROR(logger()->handle, "Promise stream id %d, can't find stream information",
			frame->hd.stream_id);
		return 0;
	}
	struct http2_half_private *resp_half = h2_stream->resp;

	/* Private copy of the header pair */
	struct header_data *entry = ALLOC(struct header_data, 1);
	entry->nv.name = (uint8_t *)tfe_strdup((const char *)name);
	entry->nv.namelen = namelen;
	entry->nv.value = (uint8_t *)tfe_strdup((const char *)value);
	entry->nv.valuelen = valuelen;
	entry->nv.flags = flags;

	enum tfe_http_std_field field_id = (enum tfe_http_std_field)str_to_val((const char *)name, headers_vals);
	if (field_id != -1){
		/* Known standard field */
		if (field_id == TFE_HTTP_CONT_ENCODING){
			resp_half->body.gzip = method_to_str_idx((const char *)value);
		}
		entry->field.field_id = field_id;
		entry->field.field_name = NULL;
	}else{
		/* Not in the table: keep the literal name */
		entry->field.field_id = TFE_HTTP_UNKNOWN_FIELD;
		entry->field.field_name = (const char *)entry->nv.name;
	}

	struct http2_headers *list = &resp_half->promised;
	list->flag = frame->hd.flags;
	headers_add_tail(list, entry);
	return 0;
}
/*
 * Shared send handler (NGHTTP2_USER_SEND slot of nghttp2_callback_array).
 *
 * Writes `length` bytes from `data` to the stream in direction `dir`.
 * user_data is the session info. A write failure trips an assert; the
 * function reports `length` as sent. The frame/value/valuelen/flags
 * parameters are not used.
 */
static int
nghttp2_data_send(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *data,
	size_t length, const uint8_t *value,
	size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir)
{
	struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
	int write_rc = tfe_stream_write(session_info->tf_stream, dir, data, length);

	if (unlikely(write_rc < 0)){
		assert(0);
	}
	return (ssize_t)length;
}
/*
 * Finish the response body of a stream.
 *
 * If body processing had started (body_state != MANAGE_STAGE_INIT) the
 * EV_HTTP_RESP_BODY_END and EV_HTTP_RESP_END events are raised. The body is
 * then flagged END_STREAM, both body and message are marked complete, and the
 * remaining data is submitted and flushed on as_server.
 *
 * The resulting action is stored in session_info->stream_action.
 *
 * Fix: the info log used to be written before session_info->stream_action was
 * updated, so it reported the action left over from the previous frame. It now
 * reports the action produced by this call.
 */
static void
submit_end_data(struct tfe_session_info_t *session_info,
	struct h2_stream_data_t *h2_stream)
{
	int xret = -1;
	enum tfe_stream_action stream_action = ACTION_DROP_DATA;
	struct http2_half_private *resp = h2_stream->resp;

	if (resp->body_state != MANAGE_STAGE_INIT){
		if (resp->event_cb) {
			resp->event_cb(resp, EV_HTTP_RESP_BODY_END, NULL, 0,
				resp->event_cb_user);
		}
		/* Re-checked on purpose: the first callback has already run */
		if (resp->event_cb) {
			resp->event_cb(resp, EV_HTTP_RESP_END, NULL, 0,
				resp->event_cb_user);
		}
	}

	struct data_t *body = &resp->body;
	body->flags |= NGHTTP2_FLAG_END_STREAM;
	resp->body_state = MANAGE_STAGE_COMPLETE;
	resp->message_state = MANAGE_STAGE_COMPLETE;

	stream_action = server_frame_submit_data(session_info, h2_stream, CONN_DIR_UPSTREAM);
	if (stream_action == ACTION_DROP_DATA){
		xret = nghttp2_session_send(session_info->as_server);
		if (xret != 0) {
			stream_action = ACTION_FORWARD_DATA;
			TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret));
		}
	}
	if (stream_action == ACTION_USER_DATA)
		stream_action = ACTION_DROP_DATA;
	session_info->stream_action = stream_action;

	TFE_LOG_INFO(logger()->handle, "%s, 1, End of stream submit, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info,
		h2_stream->stream_id, session_info->stream_action);
	return;
}
/*
 * Shared stream-close handler (NGHTTP2_USER_COLSE slot of nghttp2_callback_array).
 *
 * The stream id is read from frame->hd.stream_id and the error code from
 * frame->goaway.error_code; user_data is the session info.
 *
 * - Unknown stream: nothing to do.
 * - Upstream close without error while the response body is not complete:
 *   the body is finished through submit_end_data() and the stream object is
 *   kept.
 * - Every other case: the stream is unlinked from the session list, its
 *   contents are released and the object is freed.
 *
 * Fix: the response half is now checked for NULL before body_state is read.
 * Stream objects are created with only a request half, so an upstream close
 * can arrive for a stream whose response half was never created.
 *
 * Always returns 0.
 */
static int
nghttp2_on_stream_close(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name,
	size_t namelen, const uint8_t *value,
	size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir)
{
	struct h2_stream_data_t *h2_stream = NULL;
	struct http2_half_private *resp = NULL;
	int32_t stream_id = frame->hd.stream_id;
	uint32_t error_code = frame->goaway.error_code;
	struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;

	h2_stream = TAILQ_LIST_FIND(session_info, stream_id);
	if (!h2_stream)
		return 0;

	if (error_code != 0){
		const char *str = (dir == CONN_DIR_UPSTREAM) ? "Upstream" : "Downstream";
		TFE_LOG_DEBUG(logger()->handle, "%s close, id = %d, error_code = %d", str,
			stream_id, error_code);
	}

	if (dir != CONN_DIR_DOWNSTREAM){
		resp = h2_stream->resp;
		if (error_code == 0 && resp != NULL && resp->body_state != MANAGE_STAGE_COMPLETE){
			/* Body still open: finish it and keep the stream object */
			submit_end_data(session_info, h2_stream);
			return 0;
		}
	}

	TAILQ_REMOVE(&session_info->list, h2_stream, next);
	delete_http2_stream_data(h2_stream, session_info->tf_stream, 1);
	free(h2_stream);
	h2_stream = NULL;
	return 0;
}
/*
 * Dispatch table for per-header and session-level events.
 * The header callbacks (nghttp2_client_on_header / nghttp2_server_on_header)
 * index it with frame->hd.type; the send and stream-close callbacks use the
 * NGHTTP2_USER_SEND and NGHTTP2_USER_COLSE slots. A NULL entry means the event
 * is ignored.
 */
nghttp2_callback nghttp2_callback_array[] = {
[NGHTTP2_DATA] = NULL,
[NGHTTP2_HEADERS] = nghttp2_fill_up_header,
[NGHTTP2_PRIORITY] = NULL,
[NGHTTP2_RST_STREAM] = NULL,
[NGHTTP2_SETTINGS] = NULL,
[NGHTTP2_PUSH_PROMISE] = nghttp2_fill_up_promise,
[NGHTTP2_PING] = NULL,
[NGHTTP2_GOAWAY] = NULL,
[NGHTTP2_WINDOW_UPDATE] = NULL,
[NGHTTP2_CONTINUATION] = NULL,
[NGHTTP2_ALTSVC] = NULL,
/* Slots beyond the frame types, used for session-level events */
[NGHTTP2_USER_SEND] = nghttp2_data_send,
[NGHTTP2_USER_COLSE] = nghttp2_on_stream_close
};
/*
 * Write one info-level access log line for a stream.
 *
 * The line carries the stream description, session id, request method and URL,
 * response code, content-type, content-encoding, and markers telling whether a
 * user-built request ("USER/REQ") or response ("USER/RESP") exists. Any piece
 * that is not available is printed as "-".
 *
 * Fixes over the previous version:
 *   - asprintf() is checked; on failure its output pointer is indeterminate and
 *     must be neither logged nor freed.
 *   - a NULL request URL is replaced by "-" instead of being passed to "%s".
 */
void
nghttp2_write_access_log(struct h2_stream_data_t *h2_stream, const char * str_stream_info)
{
	/* Request / response halves; either may be absent */
	struct http2_half_private *req = h2_stream->req;
	struct http2_half_private *resp = h2_stream->resp;
	struct tfe_http_req_spec *req_spec = req ? &(req->half_public.req_spec) : NULL;
	struct tfe_http_resp_spec *resp_spec = resp ? &(resp->half_public.resp_spec) : NULL;

	const char * method = req_spec ? val_to_str(req_spec->method, method_vals) : "-";
	const char * url = (req_spec && req_spec->url) ? req_spec->url : "-";

	char resp_code[TFE_STRING_MAX];
	if (resp_spec)
		snprintf(resp_code, sizeof(resp_code) - 1, "%d", resp_spec->resp_code);
	else
		snprintf(resp_code, sizeof(resp_code) - 1, "%s", "-");

	const char * cont_type = (resp_spec && resp_spec->content_type != NULL) ? resp_spec->content_type : "-";
	const char * cont_encoding =
		(resp_spec && resp_spec->content_encoding != NULL) ? resp_spec->content_encoding : "-";
	const char * pangu_req = h2_stream->pangu_req ? "USER/REQ" : "-";
	const char * pangu_resp = h2_stream->pangu_resp ? "USER/RESP" : "-";

	char *access_log = NULL;
	if (asprintf(&access_log, "%s %d %s %s HTTP2.0 %s %s %s %s %s", str_stream_info, h2_stream->tfe_session.session_id,
		method, url, resp_code, cont_type, cont_encoding, pangu_req, pangu_resp) < 0){
		TFE_LOG_ERROR(logger()->handle, "%s failed to format access log line", str_stream_info);
		return;
	}
	TFE_LOG_INFO(logger()->handle, "%s", access_log);
	free(access_log);
}
/*
 * nghttp2 send callback of the as_client session: passes the serialized bytes
 * to the NGHTTP2_USER_SEND handler with direction CONN_DIR_UPSTREAM. When no
 * handler is registered the bytes are reported as sent.
 */
static ssize_t
nghttp2_client_send(nghttp2_session *session, const uint8_t *data,
	size_t length, int flags, void *user_data)
{
	nghttp2_callback send_cb = nghttp2_callback_array[NGHTTP2_USER_SEND];

	if (send_cb == NULL)
		return (ssize_t)length;

	return (ssize_t)send_cb(session, NULL, data, length,
		NULL, 0, flags, user_data, CONN_DIR_UPSTREAM);
}
/*
 * nghttp2 on_frame_recv callback of the as_client session: dispatches the frame
 * to the handler registered for its type, with direction CONN_DIR_UPSTREAM.
 *
 * Fix: frame->hd.type is checked against the size of the dispatch table before
 * it is used as an index. The type byte originates from the peer; without the
 * check a frame type beyond the last table entry would read past the array.
 *
 * Returns the handler's result, or 0 when no handler applies.
 */
static int
nghttp2_client_on_frame_recv(nghttp2_session *session,
	const nghttp2_frame *frame, void *user_data)
{
	const size_t handler_count =
		sizeof(nghttp2_frame_callback_array) / sizeof(nghttp2_frame_callback_array[0]);
	struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;

	if (frame->hd.type >= handler_count)
		return 0;

	if ( nghttp2_frame_callback_array[frame->hd.type] != NULL){
		return nghttp2_frame_callback_array[frame->hd.type](session_info, frame, CONN_DIR_UPSTREAM);
	}
	return 0;
}
/*
 * nghttp2 on_data_chunk_recv callback of the as_client session (response body
 * data arriving from upstream).
 *
 * The raw chunk is appended to the response body buffer. When a content
 * encoding is set on the body, the chunk is run through inflate_read() and, on
 * success, the decoded bytes are what the body events see. The first chunk
 * raises EV_HTTP_RESP_BODY_BEGIN, every chunk raises EV_HTTP_RESP_BODY_CONT.
 * Afterwards the data is submitted and flushed on as_server and the resulting
 * action is stored in session_info->stream_action.
 *
 * Fix: the inflate result was tested with `uncompr > 0`, an ordered comparison
 * between a pointer and the literal 0, which is ill-formed C++; it is now an
 * explicit NULL check.
 *
 * Always returns 0.
 */
static int
nghttp2_client_on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
	int32_t stream_id, const uint8_t *input,
	size_t input_len, void *user_data)
{
	/* All locals are declared up front because of the goto below */
	size_t len;
	char *uncompr = NULL;
	int xret = -1;
	enum tfe_stream_action stream_action = ACTION_DROP_DATA;
	int uncompr_len = 0, __attribute__((__unused__))ret = 0;
	const unsigned char *data;
	struct http2_half_private * resp = NULL;
	struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
	struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, stream_id);

	if (!h2_stream){
		TFE_LOG_ERROR(logger()->handle, "On data callback can't get downstream information, id = %d",
			stream_id);
		goto finish;
	}
	resp = h2_stream->resp;

	/* Keep the raw (still encoded) bytes for forwarding */
	evbuffer_add(resp->body.evbuf_body, input, input_len);

	if (resp->body.gzip != HTTP2_CONTENT_ENCODING_NONE){
		ret = inflate_read(input, input_len, &uncompr, &uncompr_len,
			&resp->body.inflater, resp->body.gzip);
		if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr != NULL){
			/* From here on the callbacks see the decoded bytes */
			input = (const uint8_t*)uncompr;
			input_len = uncompr_len;
		}
	}
	data = input;
	len = input_len;

	if (resp->body_state == MANAGE_STAGE_INIT){
		if (resp->event_cb) {
			resp->event_cb(resp, EV_HTTP_RESP_BODY_BEGIN, NULL, len,
				resp->event_cb_user);
		}
		if (flags == NGHTTP2_FLAG_END_STREAM){
			resp->body.flags = 0;
		}else{
			resp->body.flags = flags;
		}
		resp->body_state = MANAGE_STAGE_READING;
	}
	if (resp->body_state == MANAGE_STAGE_READING){
		if (resp->event_cb) {
			resp->event_cb(resp, EV_HTTP_RESP_BODY_CONT, data, len,
				resp->event_cb_user);
		}
		/* END_STREAM alone is not stored on the body flags at this point */
		if (flags == NGHTTP2_FLAG_END_STREAM){
			resp->body.flags = 0;
		}else{
			resp->body.flags = flags;
		}
	}

	stream_action = server_frame_submit_data(session_info, h2_stream, CONN_DIR_UPSTREAM);
	if (stream_action == ACTION_DROP_DATA){
		xret = nghttp2_session_send(session_info->as_server);
		if (xret != 0) {
			stream_action = ACTION_FORWARD_DATA;
			TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret));
		}
	}
	if (stream_action == ACTION_USER_DATA)
		stream_action = ACTION_DROP_DATA;
	session_info->stream_action = stream_action;
finish:
	TFE_LOG_INFO(logger()->handle, "%s, 1, submit data %d, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info,
		(int)input_len, stream_id, session_info->stream_action);
	return 0;
}
/*
 * nghttp2 on_stream_close callback of the as_client session.
 *
 * Wraps the stream id and error code into a zeroed nghttp2_frame
 * (hd.stream_id / goaway.error_code) and passes it to the NGHTTP2_USER_COLSE
 * handler with direction CONN_DIR_UPSTREAM. Returns the handler's result, or 0
 * when none is registered.
 */
static int
nghttp2_client_on_stream_close(nghttp2_session *session, int32_t stream_id,
	uint32_t error_code, void *user_data)
{
	nghttp2_callback close_cb = nghttp2_callback_array[NGHTTP2_USER_COLSE];
	nghttp2_frame carrier;

	memset(&carrier, 0, sizeof(carrier));
	carrier.hd.stream_id = stream_id;
	carrier.goaway.error_code = error_code;

	if (close_cb == NULL)
		return 0;

	return close_cb(session, &carrier, NULL, 0,
		NULL, 0, 0, user_data, CONN_DIR_UPSTREAM);
}
/*
 * nghttp2 on_header callback of the as_client session: forwards one header
 * name/value pair to the handler registered for the frame type, with direction
 * CONN_DIR_UPSTREAM. Returns the handler's result, or 0 when none is registered.
 */
static int
nghttp2_client_on_header(nghttp2_session *session,
	const nghttp2_frame *frame, const uint8_t *name,
	size_t namelen, const uint8_t *value,
	size_t valuelen, uint8_t flags, void *user_data)
{
	nghttp2_callback header_cb = nghttp2_callback_array[frame->hd.type];

	if (header_cb == NULL)
		return 0;

	return header_cb(session, frame, name, namelen,
		value, valuelen, flags, user_data, CONN_DIR_UPSTREAM);
}
/*
 * Make sure the stream known under stream_id has a response half.
 *
 * The stream is looked up in the session's list. If it exists and has no
 * response half yet, one is created with its own user_event_dispatch, shares
 * the stream's frame_ctx, and the stream object is registered as user data for
 * stream_id on `session`.
 *
 * Returns the stream object, or NULL when the id is unknown.
 */
static struct h2_stream_data_t*
create_upstream_data(nghttp2_session *session, int32_t stream_id,
	struct tfe_session_info_t *session_info)
{
	struct h2_stream_data_t *h2_stream = NULL;

	h2_stream = TAILQ_LIST_FIND(session_info, stream_id);
	/* TODO: when the reply data is pushed as promised there is no stream for
	 * this id on the reply side yet; it would have to be created here. */
	if (h2_stream == NULL || h2_stream->resp)
		return h2_stream;

	h2_stream->resp = tfe_half_private_init(TFE_HTTP_RESPONSE);
	tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_RESPONSE);

	struct user_event_dispatch *dispatch = ALLOC(struct user_event_dispatch, 1);
	assert(dispatch);
	dispatch->thread_id = session_info->thread_id;
	dispatch->tf_stream = session_info->tf_stream;
	dispatch->tfe_session = &h2_stream->tfe_session;
	half_set_callback(h2_stream->resp, dispatch, NULL);

	h2_stream->resp->frame_ctx = h2_stream->frame_ctx;
	nghttp2_session_set_stream_user_data(session, stream_id, h2_stream);
	return h2_stream;
}
/*
 * nghttp2 select_padding callback of the as_client session.
 *
 * Returns the frame payload length plus the padding length recorded on the
 * stream's response body, capped at max_payloadlen. When no stream object is
 * attached to the frame's stream id, the payload length is returned unchanged.
 */
static ssize_t nghttp2_client_select_padding_callback(nghttp2_session *session,
	const nghttp2_frame *frame,
	size_t max_payloadlen, void *user_data)
{
	struct h2_stream_data_t *h2_stream =
		(struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);

	if (h2_stream == NULL)
		return frame->hd.length;

	struct http2_half_private *resp_half = h2_stream->resp;
	return (ssize_t)MIN(max_payloadlen, frame->hd.length + (resp_half->body.padlen));
}
/*
 * nghttp2 on_begin_headers callback of the as_client session: when a HEADERS
 * frame starts, make sure the stream has a response half. Other frame types
 * are ignored. Always returns 0.
 */
static int
nghttp2_client_on_begin_headers(nghttp2_session * session,
	const nghttp2_frame * frame,
	void * user_data)
{
	struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;

	if (frame->hd.type == NGHTTP2_HEADERS){
		create_upstream_data(session, frame->hd.stream_id, session_info);
	}
	return 0;
}
static
void client_session_init(struct tfe_session_info_t *session_info)
{
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks,
nghttp2_client_send);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
nghttp2_client_on_frame_recv);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks,
nghttp2_client_on_data_chunk_recv);
nghttp2_session_callbacks_set_select_padding_callback(callbacks,
nghttp2_client_select_padding_callback);
nghttp2_session_callbacks_set_on_stream_close_callback(callbacks,
nghttp2_client_on_stream_close);
nghttp2_session_callbacks_set_on_header_callback(callbacks,
nghttp2_client_on_header);
nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks,
nghttp2_client_on_begin_headers);
nghttp2_session_client_new(&session_info->as_client, callbacks, session_info);
nghttp2_session_callbacks_del(callbacks);
}
/*
 * nghttp2 send callback of the as_server session: passes the serialized bytes
 * to the NGHTTP2_USER_SEND handler with direction CONN_DIR_DOWNSTREAM. When no
 * handler is registered the bytes are reported as sent.
 */
static ssize_t
nghttp2_server_send(nghttp2_session *session, const uint8_t *data,
	size_t length, int flags, void *user_data)
{
	nghttp2_callback send_cb = nghttp2_callback_array[NGHTTP2_USER_SEND];

	if (send_cb == NULL)
		return (ssize_t)length;

	return (ssize_t)send_cb(session, NULL, data, length,
		NULL, 0, flags, user_data, CONN_DIR_DOWNSTREAM);
}
/*
 * nghttp2 on_frame_recv callback of the as_server session: dispatches the frame
 * to the handler registered for its type, with direction CONN_DIR_DOWNSTREAM.
 *
 * Fix: frame->hd.type is checked against the size of the dispatch table before
 * it is used as an index. The type byte originates from the peer; without the
 * check a frame type beyond the last table entry would read past the array.
 *
 * Returns the handler's result, or 0 when no handler applies.
 */
static int
nghttp2_server_on_frame_recv(nghttp2_session *session,
	const nghttp2_frame *frame, void *user_data)
{
	const size_t handler_count =
		sizeof(nghttp2_frame_callback_array) / sizeof(nghttp2_frame_callback_array[0]);
	struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;

	if (frame->hd.type >= handler_count)
		return 0;

	if ( nghttp2_frame_callback_array[frame->hd.type] != NULL){
		return nghttp2_frame_callback_array[frame->hd.type](session_info, frame, CONN_DIR_DOWNSTREAM);
	}
	return 0;
}
/*
 * nghttp2 on_stream_close callback of the as_server session.
 *
 * Wraps the stream id and error code into a zeroed nghttp2_frame
 * (hd.stream_id / goaway.error_code) and passes it to the NGHTTP2_USER_COLSE
 * handler with direction CONN_DIR_DOWNSTREAM. Returns the handler's result, or
 * 0 when none is registered.
 */
static int
nghttp2_server_on_stream_close(nghttp2_session *session, int32_t stream_id,
	uint32_t error_code, void *user_data)
{
	nghttp2_callback close_cb = nghttp2_callback_array[NGHTTP2_USER_COLSE];
	nghttp2_frame carrier;

	memset(&carrier, 0, sizeof(carrier));
	carrier.hd.stream_id = stream_id;
	carrier.goaway.error_code = error_code;

	if (close_cb == NULL)
		return 0;

	return close_cb(session, &carrier, NULL, 0,
		NULL, 0, 0, user_data, CONN_DIR_DOWNSTREAM);
}
/*
 * nghttp2 on_header callback of the as_server session: forwards one header
 * name/value pair to the handler registered for the frame type, with direction
 * CONN_DIR_DOWNSTREAM. Returns the handler's result, or 0 when none is
 * registered.
 */
static int
nghttp2_server_on_header(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name,
	size_t namelen, const uint8_t *value,
	size_t valuelen, uint8_t flags, void *user_data)
{
	nghttp2_callback header_cb = nghttp2_callback_array[frame->hd.type];

	if (header_cb == NULL)
		return 0;

	return header_cb(session, frame, name, namelen, value,
		valuelen, flags, user_data,CONN_DIR_DOWNSTREAM);
}
/*
 * Create the stream object for a request that starts on the server-side
 * session, unless one already exists for stream_id.
 *
 * The new object gets a request half with its own user_event_dispatch and a
 * frame context. Once the frame context exists, the session-begin event is
 * raised, the stream is appended to the session's list and registered as user
 * data for stream_id on `session`. If the frame context cannot be allocated the
 * function logs an error and returns without registering the stream.
 */
static void
create_serv_stream_data(nghttp2_session *session, int32_t stream_id,
	struct tfe_session_info_t *session_info)
{
	struct h2_stream_data_t *h2_stream = NULL;

	h2_stream = TAILQ_LIST_FIND(session_info, stream_id);
	if (h2_stream != NULL)
		return;

	h2_stream = (struct h2_stream_data_t *)ALLOC(struct h2_stream_data_t, 1);
	assert(h2_stream);
	memset(h2_stream, 0, sizeof(struct h2_stream_data_t));
	h2_stream->stream_id = stream_id;
	h2_stream->req = tfe_half_private_init(TFE_HTTP_REQUEST);
	tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_REQUEST);

	struct user_event_dispatch *dispatch = ALLOC(struct user_event_dispatch, 1);
	assert(dispatch);
	dispatch->thread_id = session_info->thread_id;
	dispatch->tf_stream = session_info->tf_stream;
	dispatch->tfe_session = &h2_stream->tfe_session;
	half_set_callback(h2_stream->req, dispatch, NULL);

	/* Call business plugin */
	struct http2_half_private *req_half = h2_stream->req;
	req_half->frame_ctx = http_frame_alloc();
	if (req_half->frame_ctx == NULL){
		TFE_LOG_ERROR(logger()->handle, "Failed at raising session begin event. ");
		return;
	}
	http_frame_raise_session_begin(req_half->frame_ctx, session_info->tf_stream,
		&h2_stream->tfe_session, session_info->thread_id);
	h2_stream->frame_ctx = req_half->frame_ctx;

	TAILQ_INSERT_TAIL(&session_info->list, h2_stream, next);
	nghttp2_session_set_stream_user_data(session, stream_id, h2_stream);
}
/*
 * nghttp2 on_data_chunk_recv callback of the as_server session (request body
 * data arriving from downstream).
 *
 * The raw chunk is appended to the request body buffer. When a content
 * encoding is set on the body, the chunk is run through inflate_read(); on
 * success only the logged length reflects the decoded size. The data is then
 * submitted and flushed on as_client and the resulting action is stored in
 * session_info->stream_action.
 *
 * Fixes over the previous version:
 *   - `uncompr > 0` was an ordered comparison between a pointer and the
 *     literal 0, which is ill-formed C++; it is now an explicit NULL check.
 *   - the info log was written before session_info->stream_action was updated
 *     and therefore showed the previous frame's action; it now shows the
 *     action produced by this call.
 *   - the unused locals `data` and `len` are gone.
 *
 * Always returns 0.
 */
static int
nghttp2_server_on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
	int32_t stream_id, const uint8_t *input,
	size_t input_len, void *user_data)
{
	/* All locals are declared up front because of the goto below */
	char *uncompr = NULL;
	int xret = -1;
	enum tfe_stream_action stream_action = ACTION_DROP_DATA;
	int uncompr_len = 0, __attribute__((__unused__))ret = 0;
	struct http2_half_private * req = NULL;
	struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
	struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, stream_id);

	if (!h2_stream){
		TFE_LOG_ERROR(logger()->handle, "On data callback can't get downstream information, id = %d",
			stream_id);
		goto finish;
	}
	req = h2_stream->req;

	/* Keep the raw (still encoded) bytes for forwarding */
	evbuffer_add(req->body.evbuf_body, input, input_len);

	if (req->body.gzip != HTTP2_CONTENT_ENCODING_NONE){
		ret = inflate_read(input, input_len, &uncompr, &uncompr_len,
			&req->body.inflater, req->body.gzip);
		if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr != NULL){
			input = (const uint8_t*)uncompr;
			input_len = uncompr_len;
		}
	}

	stream_action = server_frame_submit_data(session_info, h2_stream, CONN_DIR_DOWNSTREAM);
	if (stream_action == ACTION_DROP_DATA){
		xret = nghttp2_session_send(session_info->as_client);
		if (xret != 0) {
			stream_action = ACTION_FORWARD_DATA;
			TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret));
		}
	}
	if (stream_action == ACTION_USER_DATA)
		stream_action = ACTION_DROP_DATA;
	session_info->stream_action = stream_action;

	TFE_LOG_INFO(logger()->handle, "%s, %d, submit data %d, stream_id:%d, action:%d", session_info->tf_stream->str_stream_info,
		0, (int)input_len, stream_id, session_info->stream_action);
finish:
	return 0;
}
/*
 * nghttp2 on_begin_headers callback for the server-side session.
 *
 * Creates the per-stream data when a request HEADERS frame starts; every other
 * frame type or header category is ignored. user_data is the
 * struct tfe_session_info_t registered with the session. Always returns 0.
 */
static int
nghttp2_server_on_begin_headers(nghttp2_session *session,
                                const nghttp2_frame *frame,
                                void *user_data)
{
	struct tfe_session_info_t *info = (struct tfe_session_info_t *)user_data;

	if (frame->hd.type == NGHTTP2_HEADERS &&
		frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
		create_serv_stream_data(session, frame->hd.stream_id, info);
	}
	return 0;
}
static void
server_session_init(struct tfe_session_info_t *session_info)
{
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks, nghttp2_server_send);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
nghttp2_server_on_frame_recv);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks,
nghttp2_server_on_data_chunk_recv);
nghttp2_session_callbacks_set_on_stream_close_callback(
callbacks, nghttp2_server_on_stream_close);
nghttp2_session_callbacks_set_on_header_callback(callbacks,
nghttp2_server_on_header);
nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks, nghttp2_server_on_begin_headers);
nghttp2_session_server_new(&session_info->as_server, callbacks, session_info);
nghttp2_session_callbacks_del(callbacks);
}
static void
delete_server_session_data(struct tfe_session_info_t *session_info)
{
struct h2_stream_data_t *h2_stream;
struct h2_stream_data_t *_h2_stream;
nghttp2_session_del(session_info->as_server);
session_info->as_server = NULL;
TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){
TAILQ_REMOVE(&session_info->list, h2_stream, next);
free(h2_stream);
h2_stream = NULL;
}
}
static void
delete_client_session_data(struct tfe_session_info_t *session_info)
{
struct h2_stream_data_t *h2_stream = NULL;
struct h2_stream_data_t *_h2_stream;
nghttp2_session_del(session_info->as_client);
session_info->as_client = NULL;
TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){
TAILQ_REMOVE(&session_info->list, h2_stream, next);
free(h2_stream);
h2_stream = NULL;
}
}
/*
 * Feed bytes of the upstream direction into the client-side nghttp2 session
 * and tell the stream layer what to do with the raw bytes.
 *
 * session_info - connection context; tf_stream and thread_id are refreshed here.
 * tfe_stream   - stream the bytes belong to.
 * thread_id    - worker thread handling the stream.
 * data, len    - raw bytes to parse.
 *
 * Returns ACTION_FORWARD_DATA (all len bytes forwarded untouched) when either
 * nghttp2 session is missing, ACTION_DROP_DATA otherwise. On a parse failure
 * the client session data is destroyed and the stream is detached.
 */
enum tfe_stream_action
detect_up_stream_protocol(struct tfe_session_info_t *session_info, const struct tfe_stream *tfe_stream,
                          unsigned int thread_id, const unsigned char *data, size_t len)
{
	/* nghttp2_session_mem_recv() returns ssize_t; an int could truncate it. */
	ssize_t readlen = 0;
	int xret = -1;
	enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;

	session_info->tf_stream = tfe_stream;
	session_info->thread_id = thread_id;

	/*
	 * This path parses with as_client, so it must be present too; testing only
	 * as_server would dereference a NULL client session.
	 */
	if (!session_info->as_server || !session_info->as_client)
		goto forward;

	readlen = nghttp2_session_mem_recv(session_info->as_client, data, len);
	if (readlen < 0){
		TFE_LOG_ERROR(logger()->handle, "Failed to process server requests. Link message %s",
					  tfe_stream->str_stream_info);
		delete_client_session_data(session_info);
		goto err;
	}

	/* Pick up the action the callbacks recorded and reset it for the next call. */
	stream_action = session_info->stream_action;
	session_info->stream_action = ACTION_DROP_DATA;

	if (stream_action == ACTION_FORWARD_DATA){
		xret = nghttp2_session_send(session_info->as_client);
		if (xret != 0) {
			TFE_LOG_ERROR(logger()->handle, "Fatal downstream send error: %s\n",
						  nghttp2_strerror(xret));
		}
		/*
		 * NOTE(review): a send failure is only logged; the raw bytes are still
		 * reported as dropped. Confirm whether they should be forwarded instead.
		 */
		stream_action = ACTION_DROP_DATA;
	}

	if (session_info->goaway){
		nghttp2_disect_goaway(session_info);
		session_info->goaway = 0;
	}

	if (stream_action == ACTION_DROP_DATA){
		tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len));
		return ACTION_DROP_DATA;
	}

forward:
	if (stream_action == ACTION_FORWARD_DATA){
		tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_FOWARD_BYTES, &len, sizeof(len));
		return ACTION_FORWARD_DATA;
	}

	/* Any other action falls through: detach from the stream and drop the bytes. */
err:
	tfe_stream_detach(tfe_stream);
	tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len));
	return ACTION_DROP_DATA;
}
enum tfe_stream_action
detect_down_stream_protocol(struct tfe_session_info_t *session_info, const struct tfe_stream *tfe_stream,
unsigned int thread_id, const unsigned char *data, size_t len)
{
int readlen = 0, xret = -1;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
session_info->tf_stream = tfe_stream;
session_info->thread_id = thread_id;
if (!session_info->as_server)
goto forward;
readlen = nghttp2_session_mem_recv(session_info->as_server, data, len);
if (readlen < 0){
TFE_LOG_ERROR(logger()->handle, "Failed to process client requests. Link message %s",
tfe_stream->str_stream_info);
delete_server_session_data(session_info);
goto err;
}
stream_action = session_info->stream_action;
session_info->stream_action = ACTION_DROP_DATA;
// printf("down stream_acion = %d\n", stream_action);
if (stream_action == ACTION_FORWARD_DATA){
xret = nghttp2_session_send(session_info->as_server);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret));
}
stream_action = ACTION_DROP_DATA;
}
if (session_info->goaway){
nghttp2_disect_goaway(session_info);
session_info->goaway = 0;
}
if (stream_action == ACTION_DROP_DATA){
tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len));
return ACTION_DROP_DATA;
}
forward:
if (stream_action == ACTION_FORWARD_DATA){
tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_FOWARD_BYTES, &len, sizeof(len));
return ACTION_FORWARD_DATA;
}
err:
tfe_stream_detach(tfe_stream);
tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len));
return ACTION_DROP_DATA;
}
void
sess_data_ctx_fini(struct tfe_session_info_t *session_info,
const struct tfe_stream * stream, unsigned int thread_id)
{
struct h2_stream_data_t *h2_stream = NULL;
struct h2_stream_data_t *_h2_stream = NULL;
TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){
TAILQ_REMOVE(&session_info->list, h2_stream, next);
if (h2_stream->frame_ctx){
http_frame_raise_session_end(h2_stream->frame_ctx, stream, &h2_stream->tfe_session,
thread_id);
h2_stream->frame_ctx = NULL;
}
delete_http2_stream_data(h2_stream, session_info->tf_stream, 1);
free(h2_stream);
h2_stream = NULL;
}
if (session_info->as_client){
nghttp2_session_del(session_info->as_client);
session_info->as_client = NULL;
}
if (session_info->as_server){
nghttp2_session_del(session_info->as_server);
session_info->as_server = NULL;
}
}
/*
 * Allocate and initialise a connection context.
 *
 * The structure is zeroed, its stream action starts as ACTION_DROP_DATA, the
 * server-side and client-side sessions are initialised and the stream list is
 * set up empty. Allocation failure trips the assert.
 *
 * Returns the new context.
 */
struct tfe_session_info_t* tfe_session_info_init()
{
	struct tfe_session_info_t *info = ALLOC(struct tfe_session_info_t, 1);

	assert(info);
	memset(info, 0, sizeof(*info));

	info->goaway = 0;
	info->stream_action = ACTION_DROP_DATA;

	server_session_init(info);
	client_session_init(info);

	TAILQ_INIT(&info->list);
	return info;
}