This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/plugin/protocol/http2/src/http2_stream.cpp
fengweihao 4b0235d199 修改key_keeper请求为post
修改key_keeper请求连接为长连接
修改HTTP2流id设置时机
2019-09-16 14:01:14 +08:00

2735 lines
88 KiB
C++

/*************************************************************************
> File Name: http2_stream.c
> Author:
> Mail:
> Created Time:
************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <stdint.h>
#include <math.h>
#include <sys/param.h>
#include <zlib.h>
#include <event2/buffer.h>
#include <tfe_utils.h>
#include <tfe_stream.h>
#include <nghttp2/nghttp2.h>
#include <http2_stream.h>
#include <http2_common.h>
#include <nghttp2_session.h>
#include <nghttp2_map.h>
#include <tfe_stream.h>
/*
---------------------------------------------------------------------------------------------------------------------------------
|No errors | PROTOCOL_ERROR| INTERNAL_ERROR | FLOW_CONTROL_ERROR| SETTINGS_TIMEOUT | STREAM_CLOSED | FRAME_SIZE_ERROR |
---------------------------------------------------------------------------------------------------------------------------------
|0x00 | 0x01 | 0x02 | 0x03 | 0x04 | 0x05 | 0x06 |
---------------------------------------------------------------------------------------------------------------------------------
|REFUSED_STREAM| CANCEL | COMPRESSION_ERROR| CONNECT_ERROR | ENHANCE_YOUR_CALM| INADEQUATE_SECURITY| HTTP_1_1_REQUIRED|
---------------------------------------------------------------------------------------------------------------------------------
|0x07 | 0x08 | 0x09 | 0x0a | 0x0b | 0x0c | 0x0d |
---------------------------------------------------------------------------------------------------------------------------------
*/
static const struct value_string method_vals[] =
{
{NGHTTP2_METHOD_DELETE, "DELETE"},
{NGHTTP2_METHOD_GET, "GET"},
{NGHTTP2_METHOD_HEAD, "HEAD"},
{NGHTTP2_METHOD_POST, "POST"},
{NGHTTP2_METHOD_PUT, "PUT"},
{NGHTTP2_METHOD_CONNECT, "CONNECT"},
{NGHTTP2_METHOD_OPTIONS, "OPTIONS"},
{NGHTTP2_METHOD_UNKNOWN, "unknown"},
};
static const struct value_string header_vals[] =
{
{TFE_HTTP_UNKNOWN_FIELD, "unkown"},
{TFE_HTTP_HOST, ":authority"},
{TFE_HTTP_REFERER, "referer"},
{TFE_HTTP_USER_AGENT, "user-agent"},
{TFE_HTTP_COOKIE, "cookie"},
{TFE_HTTP_SET_COOKIE, "set-cookie"},
{TFE_HTTP_PROXY_AUTHORIZATION, "proxy-authorization"},
{TFE_HTTP_AUTHORIZATION, "authorization"},
{TFE_HTTP_LOCATION, "location"},
{TFE_HTTP_SERVER, "server"},
{TFE_HTTP_ETAG, "etag"},
{TFE_HTTP_DATE, "date"},
{TFE_HTTP_TRAILER, "Trailer"},
{TFE_HTTP_TRANSFER_ENCODING, "transfer-encoding"},
{TFE_HTTP_VIA, "via"},
{TFE_HTTP_PRAGMA, "pragma"},
{TFE_HTTP_CONNECTION, "connection"},
{TFE_HTTP_CONT_ENCODING, "content-encoding"},
{TFE_HTTP_CONT_LANGUAGE, "content-language"},
{TFE_HTTP_CONT_LOCATION, "content-location"},
{TFE_HTTP_CONT_RANGE, "content-range"},
{TFE_HTTP_CONT_LENGTH, "content-length"},
{TFE_HTTP_CONT_TYPE, "content-type"},
{TFE_HTTP_CONT_DISPOSITION, "content-disposition"},
{TFE_HTTP_EXPIRES, "expires"},
{TFE_HTTP_ACCEPT_ENCODING, "accept-encoding"},
{TFE_HTTP_CACHE_CONTROL, "cache-control"},
{TFE_HTTP_IF_MATCH, "if-match"},
{TFE_HTTP_IF_NONE_MATCH, "if-none-match"},
{TFE_HTTP_IF_MODIFIED_SINCE, "if-modified-since"},
{TFE_HTTP_IF_UNMODIFIED_SINCE, "if-unmodified-since"},
{TFE_HTTP_LAST_MODIFIED, "last-modified"},
};
typedef enum {
NGHTTP2_USER_SEND = 0x0b,
NGHTTP2_USER_COLSE = 0x0c,
} nghttp2_frame_user_type;
struct user_event_dispatch
{
const struct tfe_stream *tf_stream;
const struct tfe_http_session * tfe_session;
unsigned int thread_id;
};
/*up stream */
static struct tfe_h2_session *
TAILQ_LIST_FIND(struct tfe_h2_stream *h2_stream_info, int32_t stream_id)
{
struct tfe_h2_session *stream = NULL, *_next_stream = NULL;
TAILQ_FOREACH_SAFE(stream, &h2_stream_info->h2_session_list, next, _next_stream)
{
if (stream->ngh2_stream_id == stream_id){
break;
}
}
return stream;
}
static void
tfe_h2_header_add_field(struct tfe_h2_header *h2_header, const struct http_field_name * field, const char * value, int at_tail)
{
struct tfe_h2_field *peer_h2_field = ALLOC(struct tfe_h2_field, 1);
peer_h2_field->field = http_field_name_duplicate(field);
if (peer_h2_field->field->field_id == TFE_HTTP_UNKNOWN_FIELD)
{
peer_h2_field->nv.name = (uint8_t *)peer_h2_field->field->field_name;
peer_h2_field->nv.namelen = strlen(peer_h2_field->field->field_name);
}else
{
const char *std_name = val_to_str(field->field_id, header_vals);
peer_h2_field->nv.name = (uint8_t *)tfe_strdup((const char *)std_name);
peer_h2_field->nv.namelen = strlen(std_name);
}
peer_h2_field->nv.value = (uint8_t *)tfe_strdup((const char *)value);;
peer_h2_field->nv.valuelen = strlen(value);
h2_header->nvlen++;
if (at_tail)
TAILQ_INSERT_TAIL(&h2_header->h2_field_list, peer_h2_field, next);
else
TAILQ_INSERT_HEAD(&h2_header->h2_field_list, peer_h2_field, next);
}
static nghttp2_nv*
tfe_h2_header_modify_field(struct tfe_h2_header *header, nghttp2_nv *hdrs, const char *field_name, const char *filed_value)
{
int nvlen = 0;
struct tfe_h2_field *h2_field = NULL, *peer_h2_field = NULL;
TAILQ_FOREACH_SAFE(h2_field, &header->h2_field_list, next, peer_h2_field)
{
hdrs[nvlen].name = h2_field->nv.name;
hdrs[nvlen].namelen = h2_field->nv.namelen;
if (filed_value && (0==strcasecmp((const char*)h2_field->nv.name, field_name)))
{
hdrs[nvlen].value = (uint8_t *)filed_value;
hdrs[nvlen].valuelen = strlen(filed_value);
}
else
{
hdrs[nvlen].value = h2_field->nv.value;
hdrs[nvlen].valuelen = h2_field->nv.valuelen;
}
hdrs[nvlen].flags = h2_field->nv.flags;
nvlen++;
}
return hdrs;
}
static inline void
headers_init(struct tfe_h2_header *header)
{
header->nvlen = 0;
header->flag = 0;
TAILQ_INIT(&header->h2_field_list);
}
const char * method_idx_to_str(int encode)
{
switch (encode)
{
case HTTP2_CONTENT_ENCODING_GZIP: return "gzip";
case HTTP2_CONTENT_ENCODING_X_GZIP: return "x-gzip";
case HTTP2_CONTENT_ENCODING_DEFLATE: return "deflate";
case HTTP2_CONTENT_ENCODING_BZIP2: return "bzip2";
case HTTP2_CONTENT_ENCODING_X_BZIP2: return "x-bzip2";
case HTTP2_CONTENT_ENCODING_BR: return "br";
default: return "";
}
}
static int
method_to_str_idx(const char * method)
{
if (strcasestr(method, "gzip") != NULL)
return HTTP2_CONTENT_ENCODING_GZIP;
if (strcasestr(method, "x-gzip") != NULL)
return HTTP2_CONTENT_ENCODING_X_GZIP;
if (strcasestr(method, "deflate") != NULL)
return HTTP2_CONTENT_ENCODING_DEFLATE;
if (strcasestr(method, "bzip2") != NULL)
return HTTP2_CONTENT_ENCODING_BZIP2;
if (strcasestr(method, "x-bzip2") != NULL)
return HTTP2_CONTENT_ENCODING_X_BZIP2;
if (strcasestr(method, "br") != NULL)
return HTTP2_CONTENT_ENCODING_BR;
return HTTP2_CONTENT_ENCODING_NONE;
}
static nghttp2_nv*
tfe_h2_header_convert_nv(struct tfe_h2_header *header, nghttp2_nv *hdrs)
{
int nvlen = 0;
struct tfe_h2_field *h2_field = NULL, *peer_h2_field = NULL;
TAILQ_FOREACH_SAFE(h2_field, &header->h2_field_list, next, peer_h2_field){
hdrs[nvlen].name = h2_field->nv.name;
hdrs[nvlen].namelen = h2_field->nv.namelen;
hdrs[nvlen].value = h2_field->nv.value;
hdrs[nvlen].valuelen = h2_field->nv.valuelen;
hdrs[nvlen].flags = h2_field->nv.flags;
nvlen++;
}
return hdrs;
}
static enum tfe_http_std_method
nghttp2_get_method(struct tfe_h2_half_private *half_private)
{
struct tfe_http_req_spec *req_spec = &(half_private->half_public.req_spec);
return req_spec->method;
}
static nghttp2_session * tfe_h2_stream_get_nghttp2_session(struct tfe_h2_stream *connection, enum tfe_conn_dir dir)
{
return (dir==CONN_DIR_UPSTREAM?connection->as_server:connection->as_client);
}
static nghttp2_session * tfe_h2_stream_get_nghttp2_peer_session(struct tfe_h2_stream *connection, enum tfe_conn_dir dir)
{
return (dir==CONN_DIR_UPSTREAM?connection->as_client: connection->as_server);
}
static struct tfe_h2_half_private *tfe_h2_stream_get_half(struct tfe_h2_session *h2_session, enum tfe_conn_dir dir)
{
return (dir==CONN_DIR_UPSTREAM?h2_session->resp: h2_session->req);
}
static nghttp2_settings_entry*
nghttp2_iv_packet(nghttp2_settings settings,
nghttp2_settings_entry *out_iv)
{
int i = 0;
nghttp2_settings_entry *iv = settings.iv;
for (i = 0; i < (int)settings.niv; i++){
out_iv[i].settings_id = iv[i].settings_id;
out_iv[i].value = iv[i].value;
}
return out_iv;
}
static void
delete_nv_packet_data(struct tfe_h2_header *header)
{
struct tfe_h2_field *h2_filed=NULL, *peer_h2_filed=NULL;
TAILQ_FOREACH_SAFE(h2_filed, &header->h2_field_list, next, peer_h2_filed)
{
TAILQ_REMOVE(&header->h2_field_list, h2_filed, next);
free(h2_filed->nv.name);
h2_filed->nv.name = NULL;
h2_filed->nv.namelen = 0;
free(h2_filed->nv.value);
h2_filed->nv.value = NULL;
h2_filed->nv.valuelen = 0;
free(h2_filed->field);
h2_filed->field = NULL;
free(h2_filed);
h2_filed = NULL;
}
header->nvlen = 0;
header->flag = 0;
}
static int event_dispatch_cb(struct tfe_h2_half_private * half_private,
enum tfe_http_event ev, const unsigned char * data, size_t len, void * user)
{
struct user_event_dispatch *event = (struct user_event_dispatch *)user;
struct http_frame_session_ctx *frame_ctx = half_private->frame_ctx;
http_frame_raise_event(frame_ctx, event->tf_stream, (struct tfe_http_session *)event->tfe_session,
ev, data, len, event->thread_id);
return 0;
}
void half_set_callback(struct tfe_h2_half_private * half_private,
void * user, void (* user_deleter)(void *))
{
half_private->event_cb = event_dispatch_cb;
half_private->event_cb_user = user;
half_private->event_cb_user_deleter = user_deleter;
}
const char * h2_half_ops_field_read(const struct tfe_http_half * half, const struct http_field_name * field)
{
const struct tfe_h2_half_private *half_private = nghttp2_to_half_private(half);
if (unlikely(half_private == NULL))
return NULL;
struct tfe_h2_field *h2_field=NULL, *peer_h2_field=NULL;
const struct tfe_h2_header *h2_header =&(half_private->header);
TAILQ_FOREACH(h2_field, &h2_header->h2_field_list, next)
{
if (http_field_name_compare(h2_field->field, field) != 0) continue;
peer_h2_field = h2_field;
break;
}
return peer_h2_field != NULL ? (const char *)peer_h2_field->nv.value : NULL;
}
int h2_half_ops_field_write(struct tfe_http_half * half, const struct http_field_name * field, const char * value)
{
struct tfe_h2_half_private *half_private = nghttp2_to_half_private(half);
struct tfe_h2_header *h2_header = &(half_private->header);
struct tfe_h2_field *h2_field=NULL,*peer_h2_field=NULL;
if (value != NULL)
{
tfe_h2_header_add_field(h2_header, field, value, 1);
}
else
{
bool delete_success = false;
TAILQ_FOREACH_SAFE(h2_field, &h2_header->h2_field_list, next, peer_h2_field)
{
if (http_field_name_compare(h2_field->field, field) != 0)
continue;
TAILQ_REMOVE(&h2_header->h2_field_list, h2_field, next);
free(h2_field->nv.name);
free(h2_field->nv.value);
free(h2_field);
h2_header->nvlen--;
delete_success = true;
}
return delete_success ? 0 : -ENOENT;
}
return 0;
}
static struct tfe_http_half *
h2_half_ops_allow_write(const struct tfe_http_half * half)
{
return (struct tfe_http_half *) half;
}
const char * h2_half_ops_field_iterate(const struct tfe_http_half * half, void ** iter, struct http_field_name * field)
{
struct tfe_h2_field **h2_filed = (struct tfe_h2_field **)iter;
const struct tfe_h2_half_private *half_private = nghttp2_to_half_private(half);
const struct tfe_h2_header *header = &half_private->header;
if (*h2_filed == NULL)
{
*h2_filed = TAILQ_FIRST(&header->h2_field_list);
}
else
{
*h2_filed = TAILQ_NEXT(*h2_filed, next);
}
if (*h2_filed == NULL) return NULL;
/* Reference of inner data, user should copy it */
field->field_id = (*h2_filed)->field->field_id;
field->field_name = (*h2_filed)->field->field_name;
return (const char *)(*h2_filed)->nv.value;
}
static int
h2_half_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, int flag)
{
int xret = -1;
struct tfe_h2_half_private * resp = nghttp2_to_half_private(half);
struct tfe_h2_payload *body = &resp->h2_payload;
if (buff == NULL || size == 0){
if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){
xret = deflate_write(&body->deflate, NULL, 0, resp->h2_payload.evbuf_body, body->gzip, 1);
}
resp->message_state = H2_READ_STATE_COMPLETE;
goto finish;
}
if (resp->h2_payload.evbuf_body == NULL){
resp->h2_payload.evbuf_body = evbuffer_new();
}
if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){
xret = deflate_write(&body->deflate, (const uint8_t *)buff, size,
resp->h2_payload.evbuf_body, body->gzip, 0);
}else{
xret = evbuffer_add(resp->h2_payload.evbuf_body, buff, size);
}
finish:
return xret;
}
static void delete_http_req_spec(struct tfe_http_req_spec *req_spec)
{
if (req_spec->uri)
free((char *)req_spec->uri);
if (req_spec->host)
free((char *)req_spec->host);
if (req_spec->url)
free((char *)req_spec->url);
}
static void delete_http_resp_spec(struct tfe_http_resp_spec *resp_spec)
{
if (resp_spec->content_encoding)
free((char *)resp_spec->content_encoding);
if (resp_spec->content_type)
free((char *)resp_spec->content_type);
if (resp_spec->content_length)
free((char *)resp_spec->content_length);
}
void delete_stream_half_data(struct tfe_h2_half_private **data,
int body_flag, enum tfe_conn_dir dir)
{
if (*data){
struct tfe_h2_payload *body = &((*data)->h2_payload);
inflate_finished(&body->inflate);
deflate_finished(&body->deflate);
if (body->evbuf_body && body_flag){
evbuffer_free(body->evbuf_body);
body->evbuf_body = NULL;
}
if ((*data)->url_storage)
FREE(&((*data)->url_storage));
delete_nv_packet_data(&((*data)->header));
if (dir == CONN_DIR_DOWNSTREAM)
{
struct tfe_http_req_spec *req_spec = &((*data)->half_public.req_spec);
delete_http_req_spec(req_spec);
}
if (dir == CONN_DIR_UPSTREAM)
{
struct tfe_http_resp_spec *resp_spec = &((*data)->half_public.resp_spec);
delete_http_resp_spec(resp_spec);
}
if((*data)->event_cb_user_deleter != NULL)
(*data)->event_cb_user_deleter((*data)->event_cb_user);
free(*data);
*data = NULL;
}
return;
}
void h2_half_ops_free(struct tfe_http_half * half)
{
struct tfe_h2_half_private * h2_private = nghttp2_to_half_private(half);
delete_stream_half_data(&h2_private, 1, CONN_DIR_DOWNSTREAM);
free(h2_private);
h2_private = NULL;
return;
}
int h2_half_ops_body_begin(struct tfe_http_half * half, int by_stream)
{
struct tfe_h2_half_private * resp = nghttp2_to_half_private(half);
struct tfe_h2_payload *body = &resp->h2_payload;
assert(body->evbuf_body == NULL);
if (by_stream)
{
if (body->inflate){
inflate_finished(&body->inflate);
}
if (body->deflate){
deflate_finished(&body->deflate);
}
body->gzip = HTTP2_CONTENT_ENCODING_NONE;
resp->message_state = H2_READ_STATE_READING;
resp->by_stream = by_stream;
}
body->evbuf_body = evbuffer_new();
return 0;
}
uint32_t tfe_h2_half_get_ngh2_stream_id(struct tfe_h2_half_private *h2_half)
{
struct tfe_h2_session* father_session=h2_half->father_session;
return father_session->ngh2_stream_id;
}
int h2_half_ops_body_data(struct tfe_http_half * h2_response, const unsigned char * data, size_t sz_data)
{
int xret = -1;
struct tfe_h2_half_private * h2_resp_priv = nghttp2_to_half_private(h2_response);
struct tfe_h2_payload *body = &h2_resp_priv->h2_payload;
if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){
xret = deflate_write(&body->deflate, (const uint8_t *)data, sz_data,
h2_resp_priv->h2_payload.evbuf_body, body->gzip, 0);
}else{
xret = evbuffer_add(h2_resp_priv->h2_payload.evbuf_body, data, sz_data);
}
return xret;
}
int h2_half_ops_body_end(struct tfe_http_half * half)
{
struct tfe_h2_half_private * resp = nghttp2_to_half_private(half);
resp->body_state = H2_READ_STATE_COMPLETE;
resp->message_state = H2_READ_STATE_COMPLETE;
return 0;
}
struct tfe_http_half_ops h2_half_ops =
{
.ops_http_field_read = h2_half_ops_field_read,
.ops_http_field_write = h2_half_ops_field_write,
.ops_http_allow_write = h2_half_ops_allow_write,
.ops_http_field_iterate = h2_half_ops_field_iterate,
.ops_append_body = h2_half_ops_append_body,
.ops_body_begin = h2_half_ops_body_begin,
.ops_body_data = h2_half_ops_body_data,
.ops_body_end = h2_half_ops_body_end,
.ops_free = h2_half_ops_free
};
static struct tfe_http_session*
h2_ops_allow_write(const struct tfe_http_session * session)
{
struct tfe_h2_session *stream_data = nghttp2_to_stream_data((struct tfe_http_session *)session);
if ( http_frame_currect_plugin_preempt(stream_data->frame_ctx) == 0){
return (struct tfe_http_session *)session;
}
return NULL;
}
void h2_ops_detach(const struct tfe_http_session * session)
{
struct tfe_h2_session *stream_data = nghttp2_to_stream_data((struct tfe_http_session *)session);
return http_frame_currect_plugin_detach(stream_data->frame_ctx);
}
void h2_ops_drop(struct tfe_http_session * session)
{
return;
}
void h2_ops_suspend(struct tfe_http_session * session)
{
#ifdef TFE_CACHE
struct tfe_h2_session *stream_data = nghttp2_to_stream_data((struct tfe_http_session *)session);
stream_data->cache.spd_set = 1;
#endif
}
void h2_ops_resume(struct tfe_http_session * session)
{
#ifdef TFE_CACHE
struct tfe_h2_session *stream_data = nghttp2_to_stream_data((struct tfe_http_session *)session);
if (stream_data->cache.spd_valid){
tfe_stream_resume(stream_data->tf_stream);
stream_data->cache.rse_set = 1;
}
#endif
}
void h2_ops_request_set(struct tfe_http_session * session, struct tfe_http_half * req_user)
{
struct tfe_h2_session *stream_data = nghttp2_to_stream_data(session);
struct tfe_h2_half_private *half_user = nghttp2_to_half_private(req_user);
half_user->header.flag = stream_data->req->header.flag;
stream_data->plugin_built_req = half_user;
}
void h2_ops_response_set(struct tfe_http_session * session, struct tfe_http_half * resp)
{
struct tfe_h2_session *stream_data = nghttp2_to_stream_data(session);
struct tfe_h2_half_private *half_user = nghttp2_to_half_private(resp);
stream_data->plugin_built_resp = half_user;
}
static struct tfe_h2_half_private*
tfe_half_private_init(enum tfe_http_direction direction, int32_t stream_id,
nghttp2_session *session)
{
struct tfe_h2_half_private *half_private = ALLOC(struct tfe_h2_half_private, 1);
assert(half_private);
half_private->half_public.direction = direction;
half_private->half_public.ops = &h2_half_ops;
headers_init(&half_private->header);
headers_init(&half_private->promised);
half_private->h2_payload.inflate = NULL;
half_private->h2_payload.deflate = NULL;
half_private->h2_payload.evbuf_body = evbuffer_new();
half_private->h2_payload.gzip = HTTP2_CONTENT_ENCODING_NONE;
half_private->h2_payload.padlen = 0;
half_private->stream_id = stream_id;
half_private->session = session;
half_private->body_state = H2_READ_STATE_BEGIN;
half_private->message_state = H2_READ_STATE_BEGIN;
return half_private;
}
struct tfe_http_half * h2_ops_request_create(struct tfe_http_session * session,
enum tfe_http_std_method method, const char * uri)
{
struct tfe_h2_half_private * req = tfe_half_private_init(TFE_HTTP_REQUEST, 0, NULL);
req->method_or_status = method;
req->url_storage = tfe_strdup(uri);
return &req->half_public;
}
struct tfe_http_half * h2_ops_response_create(struct tfe_http_session * session, int resp_code)
{
struct tfe_h2_session *stream = nghttp2_to_stream_data(session);
struct tfe_h2_half_private * resp = tfe_half_private_init(TFE_HTTP_RESPONSE, stream->ngh2_stream_id,
stream->session);
resp->method_or_status = resp_code;
if (stream->resp)
resp->h2_payload.gzip = stream->resp->h2_payload.gzip;
return &resp->half_public;
}
void h2_ops_kill(struct tfe_http_session * session)
{
struct tfe_h2_session *h2_session = nghttp2_to_stream_data(session);
struct tfe_h2_stream *h2_stream = h2_session->father_stream;
h2_stream->kill_signal = 1;
}
struct tfe_http_session_ops nghttp2_session_ops =
{
.ops_allow_write = h2_ops_allow_write,
.ops_detach = h2_ops_detach,
.ops_drop = h2_ops_drop,
.ops_suspend = h2_ops_suspend,
.ops_resume = h2_ops_resume,
.ops_kill = h2_ops_kill,
.ops_request_set = h2_ops_request_set,
.ops_response_set = h2_ops_response_set,
.ops_request_create = h2_ops_request_create,
.ops_response_create = h2_ops_response_create
};
static ssize_t
no_data_read_callback(nghttp2_session *session, int32_t stream_id,
uint8_t *buf, size_t length,
uint32_t *data_flags,
nghttp2_data_source *source,
void *user_data)
{
(void)session;
(void)stream_id;
(void)buf;
(void)length;
(void)source;
(void)user_data;
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
return 0;
}
static ssize_t
upstream_read_callback(nghttp2_session *session, int32_t stream_id,
uint8_t *buf, size_t length,
uint32_t *data_flags,
nghttp2_data_source *source,
void *user_data)
{
unsigned char *input = NULL;
ssize_t datalen = 0, inputlen=0;
struct tfe_h2_stream *h2_stream_info = (struct tfe_h2_stream *)user_data;
struct tfe_h2_session *h2_session = TAILQ_LIST_FIND(h2_stream_info, stream_id);
if (h2_session == NULL)
{
TFE_LOG_ERROR(logger()->handle, "Failed to send data, stream_id %d", stream_id);
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
return 0;
}
struct tfe_h2_payload *to_send_body = (struct tfe_h2_payload *)source->ptr;
if (!to_send_body->evbuf_body || 0==(inputlen = evbuffer_get_length(to_send_body->evbuf_body))
||!(input = evbuffer_pullup(to_send_body->evbuf_body, MIN(length, inputlen))))
{
if ((to_send_body->flags & NGHTTP2_FLAG_END_STREAM) == 0)
{
*data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
}
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
return 0;
}
datalen=MIN(length, inputlen);
memcpy(buf, input, datalen);
evbuffer_drain(to_send_body->evbuf_body, datalen);
return datalen;
}
static enum tfe_stream_action
nghttp2_frame_submit_built_resp(struct tfe_h2_stream *h2_stream_info,
struct tfe_h2_session *h2_session)
{
int rv = -1;
struct tfe_h2_header *h2_header = NULL;
struct tfe_h2_half_private *pangu_resp = NULL, *resp = NULL;
resp = h2_session->resp;
pangu_resp = h2_session->plugin_built_resp;
if (pangu_resp->message_state != H2_READ_STATE_COMPLETE && (evbuffer_get_length(resp->h2_payload.evbuf_body) > 0))
{
return (enum tfe_stream_action)ACTION_USER_DATA;
}
h2_header = &pangu_resp->header;
if (h2_header->nvlen <= 0)
return ACTION_FORWARD_DATA;
struct tfe_h2_payload *body = &pangu_resp->h2_payload;
body->flags |= NGHTTP2_FLAG_END_STREAM;
char str_sz_evbuf_body[TFE_STRING_MAX];
snprintf(str_sz_evbuf_body, sizeof(str_sz_evbuf_body) - 1, "%lu", evbuffer_get_length(body->evbuf_body));
const static struct http_field_name cont_field = {TFE_HTTP_CONT_LENGTH, NULL};
tfe_http_field_write(&pangu_resp->half_public, &cont_field, NULL);
tfe_http_field_write(&pangu_resp->half_public, &cont_field, str_sz_evbuf_body);
if (body->gzip != HTTP2_CONTENT_ENCODING_NONE)
{
const static struct http_field_name encoding_field = {TFE_HTTP_CONT_ENCODING, NULL};
const char *content_encoding = method_idx_to_str(body->gzip);
tfe_http_field_write(&pangu_resp->half_public, &encoding_field, NULL);
tfe_http_field_write(&pangu_resp->half_public, &encoding_field, content_encoding);
}
nghttp2_data_provider data_prd;
data_prd.source.ptr = (void *)body;
data_prd.read_callback = upstream_read_callback;
nghttp2_nv hdrs[h2_header->nvlen];
/*Adapt Http uri Settings**/
rv = nghttp2_submit_response(h2_stream_info->as_server, h2_session->ngh2_stream_id, tfe_h2_header_convert_nv(h2_header, hdrs),
h2_header->nvlen, &data_prd);
if (rv != 0){
return ACTION_FORWARD_DATA;
}
delete_nv_packet_data(h2_header);
return ACTION_DROP_DATA;
}
static enum tfe_stream_action
nghttp2_frame_submit_built_req(struct tfe_h2_stream *h2_stream_info,
struct tfe_h2_session *h2_session)
{
int32_t stream_id = -1;
struct tfe_h2_header *h2_header = NULL;
struct tfe_h2_half_private *plugin_built_req = h2_session->plugin_built_req;
if (plugin_built_req->message_state != H2_READ_STATE_COMPLETE){
return (enum tfe_stream_action)ACTION_USER_DATA;
}
h2_header = &plugin_built_req->header;
if (h2_header->nvlen <= 0)
return ACTION_FORWARD_DATA;
struct tfe_h2_payload *body = &plugin_built_req->h2_payload;
body->flags = NGHTTP2_FLAG_END_STREAM;
char str_sz_evbuf_body[TFE_STRING_MAX];
snprintf(str_sz_evbuf_body, sizeof(str_sz_evbuf_body) - 1, "%lu", evbuffer_get_length(body->evbuf_body));
const static struct http_field_name encoding_field = {TFE_HTTP_CONT_LENGTH, NULL};
tfe_http_field_write(&plugin_built_req->half_public, &encoding_field, NULL);
tfe_http_field_write(&plugin_built_req->half_public, &encoding_field, str_sz_evbuf_body);
nghttp2_data_provider data_prd;
data_prd.source.ptr = (void *)body;
data_prd.read_callback = upstream_read_callback;
nghttp2_nv hdrs[h2_header->nvlen];
/*Adapt Http uri Settings**/
nghttp2_session_set_next_stream_id(h2_stream_info->as_client, h2_session->ngh2_stream_id);
stream_id = nghttp2_submit_request(h2_stream_info->as_client, NULL,
tfe_h2_header_modify_field(h2_header, hdrs, ":path", plugin_built_req->url_storage),
h2_header->nvlen, &data_prd, h2_session);
if (stream_id < 0){
TFE_LOG_ERROR(logger()->handle, "Could not submit request: %s",
nghttp2_strerror(stream_id));
return ACTION_FORWARD_DATA;
}
delete_nv_packet_data(h2_header);
return ACTION_DROP_DATA;
}
static void
nghttp2_submit_end_data_by_h2_half(struct tfe_h2_stream *h2_stream_info, int32_t stream_id, enum tfe_conn_dir dir)
{
int xret = -1;
nghttp2_session *ngh2_session = tfe_h2_stream_get_nghttp2_session(h2_stream_info, dir);
nghttp2_data_provider data_provider;
data_provider.read_callback = no_data_read_callback;
xret = nghttp2_submit_data(ngh2_session, NGHTTP2_FLAG_END_STREAM,
stream_id, &data_provider);
if (xret != 0){
return;
}
xret = nghttp2_session_send(ngh2_session);
if (xret != 0) {
TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n",nghttp2_strerror(xret));
}
return;
}
static enum tfe_stream_action
nghttp2_submit_data_by_user(struct tfe_h2_stream *connection,
struct tfe_h2_session *h2_session,
enum tfe_conn_dir dir)
{
int rv = -1;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
struct tfe_h2_half_private *h2_half = tfe_h2_stream_get_half(h2_session, dir);
nghttp2_session *ngh2_session = tfe_h2_stream_get_nghttp2_session(connection, dir);
struct tfe_h2_payload *body = &h2_half->h2_payload;
nghttp2_data_provider upstream_data_provider;
upstream_data_provider.source.ptr = (void *)body;
upstream_data_provider.read_callback = upstream_read_callback;
rv = nghttp2_submit_data(ngh2_session, body->flags,
h2_session->ngh2_stream_id, &upstream_data_provider);
if (rv != 0){
stream_action = ACTION_FORWARD_DATA;
//printf("Fatal server submit data error: %s\n", nghttp2_strerror(rv));
}
return stream_action;
}
static enum tfe_stream_action
nghttp2_submit_data_by_h2_half(struct tfe_h2_stream *connection,
struct tfe_h2_session *h2_session,
enum tfe_conn_dir dir)
{
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
struct tfe_h2_half_private *h2_half = tfe_h2_stream_get_half(h2_session, dir);
nghttp2_session *ngh2_session = tfe_h2_stream_get_nghttp2_session(connection, dir);
if (h2_session->plugin_built_resp)
{
stream_action = nghttp2_frame_submit_built_resp(connection, h2_session);
}
else if (h2_session->plugin_built_req)
{
stream_action = nghttp2_frame_submit_built_req(connection, h2_session);
}
else
{
int rv = -1;
struct tfe_h2_payload *body = &h2_half->h2_payload;
nghttp2_data_provider upstream_data_provider;
upstream_data_provider.source.ptr = (void *)body;
upstream_data_provider.read_callback = upstream_read_callback;
rv = nghttp2_submit_data(ngh2_session, body->flags,
h2_session->ngh2_stream_id, &upstream_data_provider);
if (rv != 0)
{
stream_action = ACTION_FORWARD_DATA;
//printf("Fatal server submit data error: %s\n", nghttp2_strerror(rv));
}
}
return stream_action;
}
typedef int (*nghttp2_frame_callback) (struct tfe_h2_stream *, const nghttp2_frame *,
enum tfe_conn_dir dir);
typedef int (*nghttp2_callback) (nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir);
static int
nghttp2_submit_frame_priority(struct tfe_h2_stream *connection,const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
int xret = -1;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
const nghttp2_priority *priority = &frame->priority;
nghttp2_session *ngh2_session = tfe_h2_stream_get_nghttp2_session(connection, dir);
int rv = nghttp2_submit_priority(ngh2_session, priority->hd.flags, priority->hd.stream_id,
&(priority->pri_spec));
if (rv != 0){
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit priority error: %s\n",
dir, nghttp2_strerror(rv));
return 0;
}
xret = nghttp2_session_send(ngh2_session);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n",
dir, nghttp2_strerror(xret));
}
ngh2_session->last_sent_stream_id = MAX(ngh2_session->last_sent_stream_id, frame->hd.stream_id);
connection->stream_action = stream_action;
return 0;
}
static int
nghttp2_submit_frame_rst_stream(struct tfe_h2_stream *connection,const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
int xret = -1;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
const nghttp2_rst_stream *rst_stream = &frame->rst_stream;
nghttp2_session *ngh2_session = tfe_h2_stream_get_nghttp2_session(connection, dir);
int rv = nghttp2_submit_rst_stream(ngh2_session, rst_stream->hd.flags,
rst_stream->hd.stream_id, rst_stream->error_code);
if (rv != 0){
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit rst error: %s\n",
dir, nghttp2_strerror(rv));
return 0;
}
xret = nghttp2_session_send(ngh2_session);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n",
dir, nghttp2_strerror(xret));
}
connection->stream_action = stream_action;
return 0;
}
static int
nghttp2_submit_frame_settings(struct tfe_h2_stream *connection,const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
int xret = -1, rv = -1;
nghttp2_settings_entry iv[6] = {0};
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
nghttp2_settings settings = frame->settings;
nghttp2_session *ngh2_session = tfe_h2_stream_get_nghttp2_session(connection, dir);
nghttp2_session *ngh2_peer_session = tfe_h2_stream_get_nghttp2_peer_session(connection, dir);
if(settings.hd.flags == NGHTTP2_FLAG_ACK){
xret = nghttp2_session_send(ngh2_peer_session);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s, %d\n",nghttp2_strerror(xret), __LINE__);
}
stream_action = ACTION_DROP_DATA;
return 0;
}
rv = nghttp2_submit_settings(ngh2_session, settings.hd.flags,
nghttp2_iv_packet(settings, iv), settings.niv);
if (rv != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit settings error: %s\n",
dir, nghttp2_strerror(rv));
return 0;
}
xret = nghttp2_session_send(ngh2_session);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n",
dir, nghttp2_strerror(xret));
}
connection->stream_action = stream_action;
#ifdef TFE_LOG_HTTP2
TFE_LOG_DEBUG(logger()->handle, "%s, %d, submit setting, stream_id:%d, action:%d", connection->tf_stream->str_stream_info,
dir, frame->hd.stream_id, connection->stream_action);
#endif
return 0;
}
static int
nghttp2_submit_frame_ping(struct tfe_h2_stream *connection,const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
int xret = -1 ,rv = -1;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
const nghttp2_ping *ping = &frame->ping;
nghttp2_session *ngh2_session = tfe_h2_stream_get_nghttp2_session(connection, dir);
rv = nghttp2_submit_ping(ngh2_session, ping->hd.flags, ping->opaque_data);
if (rv != 0)
{
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit ping error: %s\n", dir, nghttp2_strerror(rv));
return 0;
}
xret = nghttp2_session_send(ngh2_session);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n", dir, nghttp2_strerror(xret));
}
connection->stream_action = stream_action;
return 0;
}
void
nghttp2_write_access_log(struct tfe_h2_session *h2_session, const char * str_stream_info)
{
/* Request */
struct tfe_h2_half_private *req = h2_session->req;
/* Response */
struct tfe_h2_half_private *resp = h2_session->resp;
/* Req-Public */
struct tfe_http_req_spec *req_spec = req ? &(req->half_public.req_spec) : NULL;
/* Resp-Public */
struct tfe_http_resp_spec *resp_spec = resp ? &(resp->half_public.resp_spec) : NULL;
const char * method = req_spec ? val_to_str(req_spec->method, method_vals) : "-";
const char * url = req_spec ? req_spec->url : "-";
char resp_code[TFE_STRING_MAX];
if (resp_spec)
snprintf(resp_code, sizeof(resp_code) - 1, "%d", resp_spec->resp_code);
else
snprintf(resp_code, sizeof(resp_code) - 1, "%s", "-");
const char * cont_type = resp_spec ? resp_spec->content_type != NULL ? resp_spec->content_type : "-" : "-";
const char * cont_encoding =
resp_spec ? resp_spec->content_encoding != NULL ? resp_spec->content_encoding : "-" : "-";
const char * pangu_req = h2_session->plugin_built_req ? "USER/REQ" : "-";
const char * pangu_resp = h2_session->plugin_built_resp ? "USER/RESP" : "-";
//const char * __str_suspend = h2_session->suspend_counter > 0 ? "SUSPEND" : "-";
char *access_log;
asprintf(&access_log, "%s %d %s %s HTTP2.0 %s %s %s %s %s", str_stream_info, h2_session->tfe_session.session_id,
method, url, resp_code, cont_type, cont_encoding, pangu_req, pangu_resp);
TFE_LOG_INFO(logger()->handle, "%s", access_log);
free(access_log);
}
void delete_http2_stream_data(struct tfe_h2_session *h2_session,
const struct tfe_stream *tf_stream,
int body_flag)
{
delete_stream_half_data(&h2_session->req, body_flag, CONN_DIR_DOWNSTREAM);
delete_stream_half_data(&h2_session->resp, body_flag, CONN_DIR_UPSTREAM);
}
void nghttp2_disect_goaway(struct tfe_h2_stream *h2_stream_info)
{
unsigned int thread_id = h2_stream_info->thread_id;
const struct tfe_stream * stream = h2_stream_info->tf_stream;
struct tfe_h2_session *h2_session = NULL;
struct tfe_h2_session *peer_h2_stream = NULL;
TAILQ_FOREACH_SAFE(h2_session, &h2_stream_info->h2_session_list, next, peer_h2_stream){
TAILQ_REMOVE(&h2_stream_info->h2_session_list, h2_session, next);
if (h2_session->frame_ctx){
http_frame_raise_session_end(h2_session->frame_ctx, stream, &h2_session->tfe_session,
thread_id);
h2_session->frame_ctx = NULL;
}
delete_http2_stream_data(h2_session, h2_stream_info->tf_stream, 1);
free(h2_session);
h2_session = NULL;
}
if (h2_stream_info->as_client){
nghttp2_session_del(h2_stream_info->as_client);
h2_stream_info->as_client = NULL;
}
if (h2_stream_info->as_server){
nghttp2_session_del(h2_stream_info->as_server);
h2_stream_info->as_server = NULL;
}
}
static int
nghttp2_submit_frame_goaway(struct tfe_h2_stream *connection, const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
int xret = -1;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
char *error = NULL; size_t eroro_len=0;
const nghttp2_goaway *goaway = &frame->goaway;
nghttp2_session *ngh2_session = tfe_h2_stream_get_nghttp2_session(connection, dir);
int rv = nghttp2_submit_goaway(ngh2_session, goaway->hd.flags, goaway->last_stream_id,
goaway->error_code, goaway->opaque_data, goaway->opaque_data_len);
if (rv != 0){
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit goaway error: %s\n",
dir, nghttp2_strerror(rv));
goto finish;
}
xret = nghttp2_session_send(ngh2_session);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n",
dir, nghttp2_strerror(xret));
}
finish:
eroro_len = goaway->opaque_data_len;
error = ALLOC(char, eroro_len + 1);
snprintf(error, eroro_len, "%s", goaway->opaque_data);
TFE_LOG_DEBUG(logger()->handle, "%s, %d, submit goaway, stream_id:%d, action:%d, errod_code:%d, data:%s", connection->tf_stream->str_stream_info,
dir, goaway->last_stream_id, connection->stream_action, goaway->error_code, goaway->opaque_data);
FREE(&error);
connection->goaway = 1;
connection->stream_action = stream_action;
return 0;
}
static int
nghttp2_submit_frame_window_update(struct tfe_h2_stream *connection,const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
int xret = -1;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
const nghttp2_window_update *window_update = &(frame->window_update);
nghttp2_session *ngh2_session = tfe_h2_stream_get_nghttp2_session(connection, dir);
int rv = nghttp2_submit_window_update(ngh2_session, window_update->hd.flags,window_update->hd.stream_id,
window_update->window_size_increment);
if (rv != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Submit window error: %s\n",
dir, nghttp2_strerror(rv));
return 0;
}
xret = nghttp2_session_send(ngh2_session);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n",
dir, nghttp2_strerror(xret));
}
connection->stream_action = stream_action;
return 0;
}
static int
nghttp2_submit_header_by_not_modify(struct tfe_h2_stream *h2_stream_info,
struct tfe_h2_session *h2_session)
{
int xret = -1;
int32_t stream_id = 0;
struct tfe_h2_header headers;
struct tfe_h2_half_private *resp = h2_session->resp;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
headers = resp->header;
if (headers.nvlen <= 0 || ((headers.flag & NGHTTP2_FLAG_END_STREAM)!=1) ){
return 0;
}
nghttp2_nv hdrs[headers.nvlen];
stream_id = nghttp2_submit_headers(h2_stream_info->as_server, headers.flag,
h2_session->ngh2_stream_id, NULL, tfe_h2_header_convert_nv(&headers, hdrs),
headers.nvlen, h2_session);
if (stream_id < 0){
printf("Fatal headers error: %s\n", nghttp2_strerror(stream_id));
stream_action = ACTION_FORWARD_DATA;
}
delete_nv_packet_data(&headers);
if (stream_action == ACTION_DROP_DATA){
xret = nghttp2_session_send(h2_stream_info->as_server);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s, %d\n",nghttp2_strerror(xret), __LINE__);
}
}
h2_stream_info->stream_action = stream_action;
return 0;
}
static int
nghttp2_submit_complete_data(struct tfe_h2_stream *h2_stream_info,
struct tfe_h2_session *h2_session, enum tfe_conn_dir dir)
{
int xret = -1;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
struct tfe_h2_half_private *h2_half = tfe_h2_stream_get_half(h2_session, dir);
nghttp2_session *ngh2_session = tfe_h2_stream_get_nghttp2_session(h2_stream_info, dir);
enum tfe_http_event http_body_event = (dir==CONN_DIR_UPSTREAM?EV_HTTP_RESP_BODY_END: EV_HTTP_REQ_BODY_END);
enum tfe_http_event http_event = (dir==CONN_DIR_UPSTREAM?EV_HTTP_RESP_END: EV_HTTP_REQ_END);
if (h2_half->body_state != H2_READ_STATE_BEGIN)
{
if (h2_half->event_cb)
{
h2_half->event_cb(h2_half, http_body_event, NULL, 0,
h2_half->event_cb_user);
}
if (h2_half->event_cb)
{
h2_half->event_cb(h2_half, http_event, NULL, 0,
h2_half->event_cb_user);
}
}
struct tfe_h2_payload *payload = &h2_half->h2_payload;
payload->flags |= NGHTTP2_FLAG_END_STREAM;
h2_half->body_state = H2_READ_STATE_COMPLETE;
h2_half->message_state = H2_READ_STATE_COMPLETE;
stream_action = nghttp2_submit_data_by_h2_half(h2_stream_info, h2_session, dir);
if (stream_action == ACTION_DROP_DATA)
{
xret = nghttp2_session_send(ngh2_session);
if (xret != 0)
{
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s %d\n",nghttp2_strerror(xret), __LINE__);
}
}
if (stream_action == ACTION_USER_DATA)
stream_action = ACTION_DROP_DATA;
h2_stream_info->stream_action = stream_action;
return 1;
}
static int
nghttp2_submit_frame_data(struct tfe_h2_stream *h2_stream_info,const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
struct tfe_h2_session *h2_session = NULL;
nghttp2_session *ngh2_session = tfe_h2_stream_get_nghttp2_peer_session(h2_stream_info, dir);
h2_session = (struct tfe_h2_session *)nghttp2_session_get_stream_user_data(ngh2_session, frame->hd.stream_id);
if (h2_session == NULL)
{
nghttp2_submit_end_data_by_h2_half(h2_stream_info, frame->hd.stream_id, dir);
h2_stream_info->stream_action = ACTION_DROP_DATA;
return 0;
}
struct tfe_h2_half_private *h2_half = tfe_h2_stream_get_half(h2_session, dir);
/*HEAD STREMA_END + DATA(9)**/
if (h2_half == NULL)
{
nghttp2_submit_end_data_by_h2_half(h2_stream_info, frame->hd.stream_id, dir);
h2_stream_info->stream_action = ACTION_DROP_DATA;
return 0;
}
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)
{
if (dir == CONN_DIR_UPSTREAM)
h2_half->h2_payload.padlen = frame->data.padlen;
if (h2_half->body_state != H2_READ_STATE_COMPLETE){
nghttp2_submit_complete_data(h2_stream_info, h2_session, dir);
}
}
return 0;
}
static int tfe_half_session_init(struct tfe_h2_session *h2_session, int32_t stream_id,
enum tfe_http_direction direction)
{
struct tfe_http_session *tfe_session = &h2_session->tfe_session;
tfe_session->major_version = 2;
if (direction == TFE_HTTP_REQUEST){
struct tfe_h2_half_private *req = h2_session->req;
tfe_session->ops = &nghttp2_session_ops;
tfe_session->req = &req->half_public;
tfe_session->session_id = stream_id;
}
if (direction == TFE_HTTP_RESPONSE){
struct tfe_h2_half_private *resp = h2_session->resp;
tfe_session->resp = &resp->half_public;
tfe_session->session_id = stream_id;
}
return 0;
}
static void
upstream_create_req(struct tfe_h2_stream *h2_stream_info, nghttp2_session *as_server,
struct tfe_h2_session *h2_session, int32_t stream_id)
{
struct user_event_dispatch *event = NULL;
struct tfe_h2_half_private *half_private = NULL;
h2_session->ngh2_stream_id = stream_id;
h2_session->session = as_server;
h2_session->req = tfe_half_private_init(TFE_HTTP_REQUEST, 0, NULL);
tfe_half_session_init(h2_session, stream_id, TFE_HTTP_REQUEST);
event = ALLOC(struct user_event_dispatch, 1);
assert(event);
event->thread_id = h2_stream_info->thread_id;
event->tf_stream = h2_stream_info->tf_stream;
event->tfe_session = &h2_session->tfe_session;
half_set_callback(h2_session->req, event, free);
/* Call business plugin */
half_private = h2_session->req;
half_private->frame_ctx = http_frame_alloc();
if (half_private->frame_ctx == NULL){
TFE_STREAM_LOG_ERROR(h2_session, "Failed at raising session begin event. ");
goto finish;
}
http_frame_raise_session_begin(half_private->frame_ctx, h2_stream_info->tf_stream,
&h2_session->tfe_session, h2_stream_info->thread_id);
h2_session->frame_ctx = half_private->frame_ctx;
h2_session->father_stream = h2_stream_info;
TAILQ_INSERT_TAIL(&h2_stream_info->h2_session_list, h2_session, next);
nghttp2_session_set_stream_user_data(as_server, stream_id, h2_session);
finish:
return;
}
static enum tfe_stream_action
nghttp2_server_frame_submit_push_promise(struct tfe_h2_stream *h2_stream_info,
struct tfe_h2_session *h2_session)
{
int32_t stream_id = -1;
struct tfe_h2_header *headers = NULL;
struct tfe_h2_session *peer_h2_stream = NULL;
struct tfe_h2_half_private *resp = NULL;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
resp = h2_session->resp;
if (resp == NULL)
return stream_action;
headers = &resp->promised;
if (headers->nvlen <= 0)
return stream_action;
/* Create s' half req*/
peer_h2_stream = (struct tfe_h2_session *)ALLOC(struct tfe_h2_session, 1);
assert(peer_h2_stream);
nghttp2_nv hdrs[headers->nvlen];
stream_id = nghttp2_submit_push_promise(h2_stream_info->as_server, headers->flag,
h2_session->ngh2_stream_id, tfe_h2_header_convert_nv(headers, hdrs),
headers->nvlen, peer_h2_stream);
if (stream_id < 0){
free(peer_h2_stream);
peer_h2_stream = NULL;
TFE_STREAM_LOG_ERROR(h2_session, "Failed to submit push promise: %s", nghttp2_strerror(stream_id));
goto finish;
}
upstream_create_req(h2_stream_info, h2_stream_info->as_server, peer_h2_stream, stream_id);
/*clean header message **/
delete_nv_packet_data(headers);
stream_action = ACTION_DROP_DATA;
finish:
return stream_action;
}
static int
nghttp2_submit_frame_push_promise(struct tfe_h2_stream *h2_stream_info,const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
int xret = -1;
struct tfe_h2_session *h2_session = NULL;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
if (dir == CONN_DIR_DOWNSTREAM)
goto finish;
h2_session = (struct tfe_h2_session *)nghttp2_session_get_stream_user_data(h2_stream_info->as_client,
frame->hd.stream_id);
if (!h2_session){
TFE_LOG_ERROR(logger()->handle, "Upstream id %d, can't find stream information(addr = %p)",
frame->hd.stream_id, h2_stream_info);
goto finish;
}
stream_action = nghttp2_server_frame_submit_push_promise(h2_stream_info, h2_session);
if (stream_action == ACTION_DROP_DATA){
xret = nghttp2_session_send(h2_stream_info->as_server);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "dir(%d), Fatal send error: %s\n",
dir, nghttp2_strerror(xret));
}
}
h2_stream_info->stream_action = stream_action;
finish:
#ifdef TFE_LOG_HTTP2
TFE_LOG_DEBUG(logger()->handle, "%s, %d, submit push promise, stream_id:%d, action:%d", h2_stream_info->tf_stream->str_stream_info,
dir, frame->hd.stream_id, h2_stream_info->stream_action);
#endif
return 0;
}
#ifdef TFE_CACHE
static int
suspend_start(struct tfe_h2_session *h2_session,
struct tfe_h2_half_private *half, const struct tfe_stream *stream)
{
if (h2_session->cache.spd_valid != 1){
return 0;
}
tfe_stream_resume(stream);
enum tfe_http_event spd_event = h2_session->cache.spd_event;
h2_session->cache.spd_event = (enum tfe_http_event)0;
h2_session->cache.spd_valid = 0;
h2_session->cache.spd_set_cnt--;
h2_session->cache.spd_cnt++;
/* Call user callback, tell user we resume from suspend */
h2_session->cache.rse_set = 0;
half->event_cb(half, spd_event, NULL, 0, half->event_cb_user);
return 1;
}
#endif
static void
fill_resp_spec_from_handle(struct tfe_h2_half_private *half_private)
{
struct tfe_h2_field *h2_field = NULL, *peer_h2_field = NULL;
struct tfe_h2_header *header = &half_private->header;
struct tfe_http_resp_spec *resp_spec = &(half_private->half_public.resp_spec);
TAILQ_FOREACH_SAFE(h2_field, &header->h2_field_list, next, peer_h2_field){
if (!strncmp((char *)(h2_field->nv.name), ":status", strlen(":status"))){
resp_spec->resp_code = atoi((const char *)h2_field->nv.value);
continue;
}
if (!strncmp((char *)(h2_field->nv.name), "content-type", strlen("content-type"))){
resp_spec->content_type = tfe_strdup((const char *)(h2_field->nv.value));;
continue;
}
if (!strncmp((char *)(h2_field->nv.name), "content-encoding", strlen("content-encoding"))){
resp_spec->content_encoding = tfe_strdup((const char *)(h2_field->nv.value));;
continue;
}
if (!strncmp((char *)(h2_field->nv.name), "content-length", strlen("content-length"))){
resp_spec->content_length = tfe_strdup((const char *)(h2_field->nv.value));;
continue;
}
}
return;
}
int
nghttp2_write_log(struct tfe_h2_session *h2_session, const char * str_stream_info,
int dir)
{
/* Request */
struct tfe_h2_half_private *req = h2_session->req;
/* Response */
struct tfe_h2_half_private *resp = h2_session->resp;
/* Req-Public */
struct tfe_http_req_spec *req_spec = req ? &(req->half_public.req_spec) : NULL;
/* Resp-Public */
struct tfe_http_resp_spec *resp_spec = resp ? &(resp->half_public.resp_spec) : NULL;
const char * method = req_spec ? val_to_str(req_spec->method, method_vals) : "-";
const char * url = req_spec ? req_spec->url : "-";
char resp_code[TFE_STRING_MAX];
if (resp_spec)
snprintf(resp_code, sizeof(resp_code) - 1, "%d", resp_spec->resp_code);
else
snprintf(resp_code, sizeof(resp_code) - 1, "%s", "-");
const char * cont_type = resp_spec ? resp_spec->content_type != NULL ? resp_spec->content_type : "-" : "-";
const char * cont_encoding =
resp_spec ? resp_spec->content_encoding != NULL ? resp_spec->content_encoding : "-" : "-";
const char *hmsg = (dir == CONN_DIR_UPSTREAM) ? "response" : "request";
const char * panggu_req = h2_session->plugin_built_req ? "USER/REQ" : "-";
const char * panggu_resp = h2_session->plugin_built_resp ? "USER/RESP" : "-";
/* SUSPEND */
const char * suspend = h2_session->cache.spd_cnt > 0 ? "SUSPEND" : "-";
char *access_log;
asprintf(&access_log, "%s %d %s stream_id:%d %s %s HTTP2.0 %s %s %s %s %s %s", str_stream_info, dir, hmsg, h2_session->tfe_session.session_id,
method, url, resp_code, cont_type, cont_encoding, panggu_req, panggu_resp, suspend);
TFE_LOG_INFO(logger()->handle, "%s", access_log);
free(access_log);
return 0;
}
static enum tfe_stream_action
nghttp2_submit_built_response(struct tfe_h2_stream *h2_stream_info,
struct tfe_h2_session *h2_session)
{
int xret = -1;
char value[128] = {0};
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
struct tfe_h2_half_private *resp = h2_session->plugin_built_resp;
struct http_field_name field;
field.field_id = TFE_HTTP_UNKNOWN_FIELD;
field.field_name = "X-TG-Construct-By";
snprintf(value, sizeof(value), "tfe/%s", tfe_version());
tfe_h2_header_add_field(&resp->header, &field, (const char *)value, 0);
field.field_id = TFE_HTTP_UNKNOWN_FIELD;
field.field_name = ":status";
snprintf(value, sizeof(value), "%d", resp->method_or_status);
tfe_h2_header_add_field(&resp->header, &field, (const char *)value, 0);
stream_action = nghttp2_frame_submit_built_resp(h2_stream_info, h2_session);
if (stream_action == ACTION_DROP_DATA)
{
xret = nghttp2_session_send(h2_stream_info->as_server);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "Fatal downstream send error: %s\n",
nghttp2_strerror(xret));
}
}
if (stream_action == ACTION_DROP_DATA)
{
stream_action = (enum tfe_stream_action)ACTION_USER_DATA;
}
return stream_action;
}
static enum tfe_stream_action
nghttp2_server_frame_submit_header(struct tfe_h2_stream *h2_stream_info,
struct tfe_h2_session *h2_session)
{
int32_t xret = 0;
struct tfe_h2_header *headers = NULL;
struct tfe_h2_half_private *resp = NULL;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
if (h2_session->plugin_built_resp != NULL){
struct tfe_h2_payload *h2_payload = &(h2_session->plugin_built_resp->h2_payload);
if (h2_payload->evbuf_body != NULL && (evbuffer_get_length(h2_payload->evbuf_body) > 0))
{
stream_action = nghttp2_submit_built_response(h2_stream_info, h2_session);
}
else{
stream_action = (enum tfe_stream_action)ACTION_USER_DATA;
}
return stream_action;
}
resp = h2_session->resp;
if (resp == NULL){
return ACTION_FORWARD_DATA;
}
headers = &resp->header;
if (headers->nvlen <= 0){
return ACTION_FORWARD_DATA;
}
nghttp2_nv hdrs[headers->nvlen];
xret = nghttp2_submit_headers(h2_stream_info->as_server, headers->flag,
h2_session->ngh2_stream_id, NULL, tfe_h2_header_convert_nv(headers, hdrs),
headers->nvlen, h2_session);
if (xret < 0){
printf("Fatal headers error: %s\n", nghttp2_strerror(xret));
}
delete_nv_packet_data(headers);
return stream_action;
}
static int
nghttp2_server_submit_header(struct tfe_h2_stream *h2_stream_info, int32_t stream_id)
{
int xret = -1;
struct tfe_h2_half_private *resp = NULL;
struct tfe_h2_session *h2_session = NULL;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
h2_session = (struct tfe_h2_session *)nghttp2_session_get_stream_user_data(h2_stream_info->as_client,
stream_id);
if (!h2_session)
{
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "Upstream id %d, can't find stream information(addr = %p)",
stream_id, h2_stream_info);
goto finish;
}
assert(h2_session->ngh2_stream_id == stream_id);
resp = h2_session->resp;
fill_resp_spec_from_handle(h2_session->resp);
resp->event_cb(resp, EV_HTTP_RESP_HDR, NULL, 0, resp->event_cb_user);
if (h2_session->cache.spd_set){
h2_session->cache.spd_event = EV_HTTP_RESP_HDR;
h2_session->cache.spd_valid = 1;
h2_session->cache.spd_set = 0;
h2_session->cache.spd_set_cnt++;
h2_session->cache.spd_cnt++;
tfe_stream_suspend(h2_stream_info->tf_stream, CONN_DIR_UPSTREAM);
stream_action = ACTION_DEFER_DATA;
goto finish;
}
nghttp2_write_log(h2_session,h2_stream_info->tf_stream->str_stream_info, CONN_DIR_UPSTREAM);
stream_action = nghttp2_server_frame_submit_header(h2_stream_info, h2_session);
if (stream_action == ACTION_DROP_DATA){
xret = nghttp2_session_send(h2_stream_info->as_server);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s\n", nghttp2_strerror(xret));
}
}
if (stream_action == ACTION_USER_DATA)
stream_action = ACTION_DROP_DATA;
finish:
h2_stream_info->stream_action = stream_action;
return 0;
}
static void fill_req_spec_from_handle(struct tfe_h2_half_private *half_private)
{
int urllen = 0;
struct tfe_h2_field *h2_field = NULL, *peer_h2_field = NULL;
struct tfe_h2_header *header = &half_private->header;
struct tfe_http_req_spec *req_spec = &(half_private->half_public.req_spec);
TAILQ_FOREACH_SAFE(h2_field, &header->h2_field_list, next, peer_h2_field){
if (!strncmp((char *)(h2_field->nv.name), ":method", strlen(":method"))){
req_spec->method = (enum tfe_http_std_method)str_to_val((const char *)(h2_field->nv.value), method_vals);
continue;
}
if (!strncmp((char *)(h2_field->nv.name), ":authority", strlen(":authority"))){
req_spec->host = tfe_strdup((const char *)(h2_field->nv.value));
urllen += h2_field->nv.valuelen;
continue;
}
if (!strncmp((char *)(h2_field->nv.name), ":path", strlen(":path"))){
req_spec->uri = tfe_strdup((const char*)(h2_field->nv.value));
urllen += h2_field->nv.valuelen;
continue;
}
}
char *urltmp = ALLOC(char, urllen + 1);
if(urltmp){
sprintf(urltmp, "%s%s", (char *)req_spec->host, (char *)req_spec->uri);
req_spec->url = urltmp;
}
return;
}
#ifdef TFE_CACHE
static int
suspend_stop(struct tfe_h2_session *h2_session,
const struct tfe_stream *tf_stream, enum tfe_conn_dir dir)
{
int xret = -1;
if (h2_session->cache.spd_set){
h2_session->cache.spd_valid = 1;
h2_session->cache.spd_set = 0;
tfe_stream_suspend(tf_stream, dir);
xret = 0;
}
return xret;
}
#endif
static void
downstream_create_resp(struct tfe_h2_session *h2_session, nghttp2_session *as_client,
nghttp2_session *as_server, const struct tfe_stream *tf_stream, unsigned int thread_id)
{
struct user_event_dispatch *event = NULL;
if (h2_session->resp)
goto finish;
h2_session->resp = tfe_half_private_init(TFE_HTTP_RESPONSE, h2_session->ngh2_stream_id, as_server);
tfe_half_session_init(h2_session, h2_session->ngh2_stream_id, TFE_HTTP_RESPONSE);
event = ALLOC(struct user_event_dispatch, 1);
assert(event);
event->thread_id = thread_id;
event->tf_stream = tf_stream;
event->tfe_session = &h2_session->tfe_session;
half_set_callback(h2_session->resp, event, free);
h2_session->resp->frame_ctx = h2_session->frame_ctx;
nghttp2_session_set_stream_user_data(as_client, h2_session->ngh2_stream_id, h2_session);
finish:
return;
}
static enum tfe_stream_action
nghttp2_client_frame_submit_header(struct tfe_h2_stream *h2_stream_info,
struct tfe_h2_session *h2_session)
{
int32_t stream_id = -1;
struct tfe_h2_header *headers = NULL;
struct tfe_h2_half_private *req = NULL;
enum tfe_http_std_method method = (enum tfe_http_std_method)NGHTTP2_METHOD_UNKNOWN;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
req = h2_session->plugin_built_req != NULL ? h2_session->plugin_built_req : h2_session->req;
if (req == NULL){
return ACTION_FORWARD_DATA;
}
/*Create C' half_private_resp**/
downstream_create_resp(h2_session, h2_stream_info->as_client, h2_stream_info->as_server,
h2_stream_info->tf_stream, h2_stream_info->thread_id);
/*Adapt inconsistent client and server stream ids ***/
if (h2_session->plugin_built_resp)
{
stream_action = nghttp2_submit_built_response(h2_stream_info, h2_session);
return stream_action;
}
headers = &req->header;
if (headers->nvlen <= 0)
{
return ACTION_FORWARD_DATA;
}
method = nghttp2_get_method(h2_session->req);
if (method == (enum tfe_http_std_method)NGHTTP2_METHOD_POST || method == (enum tfe_http_std_method)NGHTTP2_METHOD_PUT)
{
if (h2_session->plugin_built_req != NULL)
{
stream_action = (enum tfe_stream_action)ACTION_USER_DATA;
return stream_action;
}
}
nghttp2_nv hdrs[headers->nvlen];
/**Register the stream id as -1 and read the next stream id */
nghttp2_session_set_next_stream_id(h2_stream_info->as_client, h2_session->ngh2_stream_id);
stream_id = nghttp2_submit_headers(h2_stream_info->as_client, headers->flag,
-1, NULL, tfe_h2_header_modify_field(headers, hdrs, ":path", req->url_storage),
headers->nvlen, h2_session);
if (stream_id < 0){
TFE_LOG_ERROR(logger()->handle, "Could not submit request: %s",
nghttp2_strerror(stream_id));
stream_action = ACTION_FORWARD_DATA;
goto finish;
}
stream_action = ACTION_DROP_DATA;
finish:
delete_nv_packet_data(headers);
return stream_action;
}
static int
nghttp2_client_submit_header(struct tfe_h2_stream *h2_stream_info, int32_t stream_id)
{
int xret = -1;
struct tfe_h2_half_private *req = NULL;
struct tfe_h2_session *h2_session = NULL;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
h2_session = (struct tfe_h2_session *)nghttp2_session_get_stream_user_data(h2_stream_info->as_server,
stream_id);
if (!h2_session){
stream_action = ACTION_FORWARD_DATA;
goto finish;
}
assert(h2_session->ngh2_stream_id == stream_id);
req = h2_session->req;
fill_req_spec_from_handle(h2_session->req);
//h2_stream_info->as_client->last_sent_stream_id = MIN(h2_stream_info->as_client->last_sent_stream_id, stream_id) - 1;
req->event_cb(req, EV_HTTP_REQ_HDR, NULL, 0, req->event_cb_user);
nghttp2_write_log(h2_session, h2_stream_info->tf_stream->str_stream_info, CONN_DIR_DOWNSTREAM);
stream_action = nghttp2_client_frame_submit_header(h2_stream_info, h2_session);
if (stream_action == ACTION_DROP_DATA){
xret = nghttp2_session_send(h2_stream_info->as_client);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "Fatal downstream send error: %s\n",
nghttp2_strerror(xret));
}
}
if (stream_action == ACTION_USER_DATA)
stream_action = ACTION_DROP_DATA;
finish:
h2_stream_info->stream_action = stream_action;
return 0;
}
static int
nghttp2_submit_frame_header(struct tfe_h2_stream *h2_stream_info,const nghttp2_frame *frame,
enum tfe_conn_dir dir)
{
int xret = 0;
if (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS)
{
if (dir == CONN_DIR_UPSTREAM)
{
xret = nghttp2_server_submit_header(h2_stream_info, frame->hd.stream_id);
}
if (dir == CONN_DIR_DOWNSTREAM)
{
xret = nghttp2_client_submit_header(h2_stream_info, frame->hd.stream_id);
}
}
return xret;
}
nghttp2_frame_callback nghttp2_frame_callback_array[] = {
[NGHTTP2_DATA] = nghttp2_submit_frame_data,
[NGHTTP2_HEADERS] = nghttp2_submit_frame_header,
[NGHTTP2_PRIORITY] = nghttp2_submit_frame_priority,
[NGHTTP2_RST_STREAM] = nghttp2_submit_frame_rst_stream,
[NGHTTP2_SETTINGS] = nghttp2_submit_frame_settings,
[NGHTTP2_PUSH_PROMISE] = nghttp2_submit_frame_push_promise,
[NGHTTP2_PING] = nghttp2_submit_frame_ping,
[NGHTTP2_GOAWAY] = nghttp2_submit_frame_goaway,
[NGHTTP2_WINDOW_UPDATE] = nghttp2_submit_frame_window_update,
[NGHTTP2_CONTINUATION] = NULL,
[NGHTTP2_ALTSVC] = NULL,
};
static int
nghttp2_fill_up_header(nghttp2_session *ngh2_session, const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value, size_t valuelen,
uint8_t flags, void *user_data, enum tfe_conn_dir dir)
{
struct tfe_h2_header *h2_header = NULL;
if (dir == CONN_DIR_DOWNSTREAM &&
frame->headers.cat != NGHTTP2_HCAT_REQUEST){
return 0;
}
struct tfe_h2_session *h2_session = (struct tfe_h2_session *)nghttp2_session_get_stream_user_data(ngh2_session, frame->hd.stream_id);
if (!h2_session){
TFE_LOG_ERROR(logger()->handle, "Header stream id %d, can't find stream information",
frame->hd.stream_id);
return 0;
}
struct tfe_h2_half_private *half = (dir == CONN_DIR_UPSTREAM) ? h2_session->resp : h2_session->req;
struct http_field_name field;
field.field_id = (enum tfe_http_std_field)str_to_val((const char *)name, header_vals);
if (field.field_id == TFE_HTTP_UNKNOWN_FIELD)
{
field.field_name = (const char *)name;
}
if (field.field_id == TFE_HTTP_CONT_ENCODING)
{
half->h2_payload.gzip = method_to_str_idx((const char *)value);
}
h2_header = &half->header;
tfe_h2_header_add_field(h2_header, &field, (const char *)value, 1);
h2_header->flag = frame->hd.flags;
return 0;
}
static int
nghttp2_fill_up_promise(nghttp2_session *ngh2_session, const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir)
{
struct tfe_h2_header *headers = NULL;
struct tfe_h2_half_private *resp = NULL;
if (dir == CONN_DIR_DOWNSTREAM)
return 0;
struct tfe_h2_session *h2_session = (struct tfe_h2_session *)nghttp2_session_get_stream_user_data(ngh2_session, frame->hd.stream_id);
if (!h2_session){
TFE_LOG_ERROR(logger()->handle, "Promise stream id %d, can't find stream information",
frame->hd.stream_id);
return 0;
}
resp = h2_session->resp;
struct http_field_name field;
field.field_id = (enum tfe_http_std_field)str_to_val((const char *)name, header_vals);
if (field.field_id == TFE_HTTP_UNKNOWN_FIELD)
{
field.field_name = (const char *)name;
}
if (field.field_id == TFE_HTTP_CONT_ENCODING)
{
resp->h2_payload.gzip = method_to_str_idx((const char *)value);
}
headers = &resp->promised;
tfe_h2_header_add_field(headers, &field, (const char *)value, 1);
headers->flag = frame->hd.flags;
return 0;
}
static int
nghttp2_data_send(nghttp2_session *ngh2_session, const nghttp2_frame *frame, const uint8_t *data,
size_t length, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir)
{
int ret = -1;
struct tfe_h2_stream *h2_stream_info = (struct tfe_h2_stream *)user_data;
ret = tfe_stream_write(h2_stream_info->tf_stream, dir, data, length);
if (unlikely(ret < 0)){
assert(0);
}
return (ssize_t)length;
}
static int
nghttp2_on_stream_close(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data, enum tfe_conn_dir dir)
{
struct tfe_h2_session *h2_session = NULL;
struct tfe_h2_half_private *resp = NULL;
int32_t stream_id = frame->hd.stream_id;
uint32_t error_code = frame->goaway.error_code;
struct tfe_h2_stream *h2_stream_info = (struct tfe_h2_stream *)user_data;
h2_session = TAILQ_LIST_FIND(h2_stream_info, stream_id);
if (!h2_session)
return 0;
if (error_code != 0){
const char *str = (dir == CONN_DIR_UPSTREAM) ? "Simulation s" : "Simulation c";
TFE_LOG_DEBUG(logger()->handle, "%s close, id = %d, error_code = %d", str,
stream_id, error_code);
}
if (dir == CONN_DIR_DOWNSTREAM)
goto finish;
resp = h2_session->resp;
if (error_code == 0 && resp->body_state != H2_READ_STATE_COMPLETE){
if (resp->body_state == H2_READ_STATE_BEGIN &&
h2_stream_info->stream_action != ACTION_DEFER_DATA)
nghttp2_submit_header_by_not_modify(h2_stream_info, h2_session);
goto end;
}
finish:
TAILQ_REMOVE(&h2_stream_info->h2_session_list, h2_session, next);
if (h2_session->frame_ctx){
http_frame_raise_session_end(h2_session->frame_ctx, h2_session->tf_stream, &h2_session->tfe_session,
h2_stream_info->thread_id);
h2_session->frame_ctx = NULL;
}
delete_http2_stream_data(h2_session, h2_stream_info->tf_stream, 1);
free(h2_session);
h2_session = NULL;
end:
return 0;
}
nghttp2_callback nghttp2_callback_array[] = {
[NGHTTP2_DATA] = NULL,
[NGHTTP2_HEADERS] = nghttp2_fill_up_header,
[NGHTTP2_PRIORITY] = NULL,
[NGHTTP2_RST_STREAM] = NULL,
[NGHTTP2_SETTINGS] = NULL,
[NGHTTP2_PUSH_PROMISE] = nghttp2_fill_up_promise,
[NGHTTP2_PING] = NULL,
[NGHTTP2_GOAWAY] = NULL,
[NGHTTP2_WINDOW_UPDATE] = NULL,
[NGHTTP2_CONTINUATION] = NULL,
[NGHTTP2_ALTSVC] = NULL,
[NGHTTP2_USER_SEND] = nghttp2_data_send,
[NGHTTP2_USER_COLSE] = nghttp2_on_stream_close
};
static ssize_t
nghttp2_client_send(nghttp2_session *session, const uint8_t *data,
size_t length, int flags, void *user_data)
{
if ( nghttp2_callback_array[NGHTTP2_USER_SEND] != NULL){
return (ssize_t)nghttp2_callback_array[NGHTTP2_USER_SEND](session, NULL, data, length,
NULL, 0, flags, user_data, CONN_DIR_UPSTREAM);
}
return (ssize_t)length;
}
static int
nghttp2_client_on_frame_recv(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data)
{
struct tfe_h2_stream *h2_stream_info = (struct tfe_h2_stream *)user_data;
if ( nghttp2_frame_callback_array[frame->hd.type] != NULL){
return nghttp2_frame_callback_array[frame->hd.type](h2_stream_info, frame, CONN_DIR_UPSTREAM);
}
return 0;
}
static int
nghttp2_client_on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
int32_t stream_id, const uint8_t *input,
size_t input_len, void *user_data)
{
size_t len;
char *uncompr = NULL;
int xret = -1;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
int uncompr_len = 0, __attribute__((__unused__))ret = 0;
const unsigned char *data;
struct tfe_h2_half_private * resp = NULL;
struct tfe_h2_stream *h2_stream_info = (struct tfe_h2_stream *)user_data;
/*proc build resp*/
struct tfe_h2_session *h2_session = (struct tfe_h2_session *)nghttp2_session_get_stream_user_data(session, stream_id);
if (h2_session == NULL)
{
return 0;
}
resp = h2_session->resp;
if (resp == NULL)
{
h2_stream_info->stream_action = ACTION_DROP_DATA;
return 0;
}
evbuffer_add(resp->h2_payload.evbuf_body, input, input_len);
if (resp->h2_payload.gzip != HTTP2_CONTENT_ENCODING_NONE)
{
ret = inflate_read(input, input_len, &uncompr, &uncompr_len,
&resp->h2_payload.inflate, resp->h2_payload.gzip);
if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr_len > 0)
{
input = (const uint8_t*)uncompr;
input_len = uncompr_len;
}
else
{
stream_action = nghttp2_submit_data_by_user(h2_stream_info, h2_session, CONN_DIR_UPSTREAM);
if (stream_action == ACTION_DROP_DATA)
{
xret = nghttp2_session_send(h2_stream_info->as_server);
if (xret != 0)
{
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "Fatal upstream(%d) send error: %s\n",stream_id, nghttp2_strerror(xret));
}
}
h2_stream_info->stream_action = stream_action;
return 0;
}
}
data = input;
len = input_len;
if (resp->body_state == H2_READ_STATE_BEGIN)
{
if (resp->event_cb)
{
resp->event_cb(resp, EV_HTTP_RESP_BODY_BEGIN, NULL, len,
resp->event_cb_user);
}
if (flags == NGHTTP2_FLAG_END_STREAM)
{
resp->h2_payload.flags = NGHTTP2_FLAG_NONE;
}else
{
resp->h2_payload.flags = flags;
}
resp->body_state = H2_READ_STATE_READING;
}
if (resp->body_state == H2_READ_STATE_READING)
{
if (resp->event_cb)
{
resp->event_cb(resp, EV_HTTP_RESP_BODY_CONT, data, len,
resp->event_cb_user);
}
if (flags == NGHTTP2_FLAG_END_STREAM)
{
resp->h2_payload.flags = NGHTTP2_FLAG_NONE;
}else
{
resp->h2_payload.flags = flags;
}
}
if (uncompr_len) FREE(&uncompr);
stream_action = nghttp2_submit_data_by_h2_half(h2_stream_info, h2_session, CONN_DIR_UPSTREAM);
if (stream_action == ACTION_DROP_DATA)
{
xret = nghttp2_session_send(h2_stream_info->as_server);
if (xret != 0)
{
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "Fatal upstream(%d) send error: %s\n",stream_id, nghttp2_strerror(xret));
}
}
if (stream_action == ACTION_USER_DATA)
stream_action = ACTION_DROP_DATA;
h2_stream_info->stream_action = stream_action;
return 0;
}
static int
nghttp2_client_on_stream_close(nghttp2_session *session, int32_t stream_id,
uint32_t error_code, void *user_data)
{
nghttp2_frame frame;
memset(&frame, 0, sizeof(frame));
frame.hd.stream_id = stream_id;
frame.goaway.error_code = error_code;
if ( nghttp2_callback_array[NGHTTP2_USER_COLSE] != NULL){
return nghttp2_callback_array[NGHTTP2_USER_COLSE](session, &frame, NULL, 0,
NULL, 0, 0, user_data, CONN_DIR_UPSTREAM);
}
return 0;
}
static int
nghttp2_client_on_header(nghttp2_session *session,
const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data)
{
if ( nghttp2_callback_array[frame->hd.type] != NULL){
return nghttp2_callback_array[frame->hd.type](session, frame, name, namelen,
value, valuelen, flags, user_data, CONN_DIR_UPSTREAM);
}
return 0;
}
static struct tfe_h2_session*
create_upstream_data(nghttp2_session *session, int32_t stream_id,
struct tfe_h2_stream *h2_stream_info)
{
struct tfe_h2_session *h2_session = NULL;
struct user_event_dispatch *event = NULL;
struct tfe_h2_half_private *half_private = NULL;
h2_session = TAILQ_LIST_FIND(h2_stream_info, stream_id);
if (h2_session == NULL){
goto resp;
}
if (h2_session->resp){
/** todo:When the data of the reply is pushed as promised,
there is no stream id at the reply end. to create it*/
goto finish;
}
resp:
h2_session = (struct tfe_h2_session *)ALLOC(struct tfe_h2_session, 1);
assert(h2_session);
h2_session->ngh2_stream_id = stream_id;
h2_session->tf_stream = h2_stream_info->tf_stream;
h2_session->resp = tfe_half_private_init(TFE_HTTP_RESPONSE, stream_id, h2_stream_info->as_server);
tfe_half_session_init(h2_session, stream_id, TFE_HTTP_RESPONSE);
event = ALLOC(struct user_event_dispatch, 1);
assert(event);
event->thread_id = h2_stream_info->thread_id;
event->tf_stream = h2_stream_info->tf_stream;
event->tfe_session = &h2_session->tfe_session;
half_set_callback(h2_session->resp, event, free);
half_private = h2_session->resp;
if (h2_session->frame_ctx == NULL)
{
half_private->frame_ctx = http_frame_alloc();
if (half_private->frame_ctx == NULL){
TFE_STREAM_LOG_ERROR(h2_session, "Failed at raising session begin event. ");
goto finish;
}
http_frame_raise_session_begin(half_private->frame_ctx, h2_stream_info->tf_stream,
&h2_session->tfe_session, h2_stream_info->thread_id);
h2_session->frame_ctx = half_private->frame_ctx;
}
h2_session->resp->frame_ctx = h2_session->frame_ctx;
nghttp2_session_set_stream_user_data(session, stream_id, h2_session);
finish:
return h2_session;
}
static ssize_t nghttp2_client_select_padding_callback(nghttp2_session *session,
const nghttp2_frame *frame,
size_t max_payloadlen, void *user_data)
{
struct tfe_h2_half_private *resp = NULL;
struct tfe_h2_stream *h2_stream_info = (struct tfe_h2_stream *)user_data;
struct tfe_h2_session *h2_session = TAILQ_LIST_FIND(h2_stream_info, frame->hd.stream_id);
if (h2_session == NULL) return frame->hd.length;
resp = h2_session->resp;
if (resp == NULL)
return frame->hd.length;
return (ssize_t)MIN(max_payloadlen, frame->hd.length + (resp->h2_payload.padlen));
}
static int
nghttp2_client_on_begin_headers(nghttp2_session * session,
const nghttp2_frame * frame,
void * user_data)
{
(void)session;
struct tfe_h2_stream *h2_stream_info = (struct tfe_h2_stream *)user_data;
switch(frame->hd.type){
case NGHTTP2_HEADERS:
create_upstream_data(session, frame->hd.stream_id, h2_stream_info);
break;
default:
break;
}
return 0;
}
static
void client_session_init(struct tfe_h2_stream *h2_stream_info)
{
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks,
nghttp2_client_send);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
nghttp2_client_on_frame_recv);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks,
nghttp2_client_on_data_chunk_recv);
nghttp2_session_callbacks_set_select_padding_callback(callbacks,
nghttp2_client_select_padding_callback);
nghttp2_session_callbacks_set_on_stream_close_callback(callbacks,
nghttp2_client_on_stream_close);
nghttp2_session_callbacks_set_on_header_callback(callbacks,
nghttp2_client_on_header);
nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks,
nghttp2_client_on_begin_headers);
nghttp2_session_client_new(&h2_stream_info->as_client, callbacks, h2_stream_info);
nghttp2_session_callbacks_del(callbacks);
}
static ssize_t
nghttp2_server_send(nghttp2_session *session, const uint8_t *data,
size_t length, int flags, void *user_data)
{
if ( nghttp2_callback_array[NGHTTP2_USER_SEND] != NULL){
return (ssize_t)nghttp2_callback_array[NGHTTP2_USER_SEND](session, NULL, data, length,
NULL, 0, flags, user_data, CONN_DIR_DOWNSTREAM);
}
return (ssize_t)length;
}
static int
nghttp2_server_on_frame_recv(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data)
{
struct tfe_h2_stream *h2_stream_info = (struct tfe_h2_stream *)user_data;
if ( nghttp2_frame_callback_array[frame->hd.type] != NULL){
return nghttp2_frame_callback_array[frame->hd.type](h2_stream_info, frame, CONN_DIR_DOWNSTREAM);
}
return 0;
}
static int
nghttp2_server_on_stream_close(nghttp2_session *session, int32_t stream_id,
uint32_t error_code, void *user_data)
{
nghttp2_frame frame;
memset(&frame, 0, sizeof(frame));
frame.hd.stream_id = stream_id;
frame.goaway.error_code = error_code;
if ( nghttp2_callback_array[NGHTTP2_USER_COLSE] != NULL){
return nghttp2_callback_array[NGHTTP2_USER_COLSE](session, &frame, NULL, 0,
NULL, 0, 0, user_data, CONN_DIR_DOWNSTREAM);
}
return 0;
}
static int
nghttp2_server_on_header(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data)
{
if ( nghttp2_callback_array[frame->hd.type] != NULL){
return nghttp2_callback_array[frame->hd.type](session, frame, name, namelen, value,
valuelen, flags, user_data,CONN_DIR_DOWNSTREAM);
}
return 0;
}
static void
create_serv_stream_data(nghttp2_session *session, int32_t stream_id,
struct tfe_h2_stream *h2_stream_info)
{
struct tfe_h2_session *h2_session = NULL;
struct user_event_dispatch *event = NULL;
struct tfe_h2_half_private *half_private = NULL;
h2_session = TAILQ_LIST_FIND(h2_stream_info, stream_id);
if (h2_session != NULL){
nghttp2_session_set_stream_user_data(session, stream_id, h2_session);
goto finish;
}
h2_session = (struct tfe_h2_session *)ALLOC(struct tfe_h2_session, 1);
assert(h2_session);
h2_session->ngh2_stream_id = stream_id;
h2_session->session = h2_stream_info->as_server;
h2_session->tf_stream = h2_stream_info->tf_stream;
h2_session->req = tfe_half_private_init(TFE_HTTP_REQUEST, 0, NULL);
tfe_half_session_init(h2_session, stream_id, TFE_HTTP_REQUEST);
event = ALLOC(struct user_event_dispatch, 1);
assert(event);
event->thread_id = h2_stream_info->thread_id;
event->tf_stream = h2_stream_info->tf_stream;
event->tfe_session = &h2_session->tfe_session;
half_set_callback(h2_session->req, event, free);
/* Call business plugin */
half_private = h2_session->req;
half_private->frame_ctx = http_frame_alloc();
if (half_private->frame_ctx == NULL){
TFE_LOG_ERROR(logger()->handle, "Failed at raising session begin event. ");
goto finish;
}
h2_session->frame_ctx = half_private->frame_ctx;
http_frame_raise_session_begin(half_private->frame_ctx, h2_stream_info->tf_stream,
&h2_session->tfe_session, h2_stream_info->thread_id);
h2_session->father_stream = h2_stream_info;
TAILQ_INSERT_TAIL(&h2_stream_info->h2_session_list, h2_session, next);
nghttp2_session_set_stream_user_data(session, stream_id, h2_session);
finish:
return;
}
static int
nghttp2_server_on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
int32_t stream_id, const uint8_t *input,
size_t input_len, void *user_data)
{
size_t __attribute__((__unused__))len;
char *uncompr = NULL;
int xret = -1;
enum tfe_stream_action stream_action = ACTION_DROP_DATA;
int uncompr_len = 0, __attribute__((__unused__))ret = 0;
const unsigned char __attribute__((__unused__))*data;
struct tfe_h2_half_private * req = NULL;
struct tfe_h2_stream *h2_stream_info = (struct tfe_h2_stream *)user_data;
struct tfe_h2_session *h2_session = (struct tfe_h2_session *)nghttp2_session_get_stream_user_data(session, stream_id);
if (!h2_session){
TFE_LOG_ERROR(logger()->handle, "On data callback can't get downstream information, id = %d",
stream_id);
goto finish;
}
req = h2_session->req;
req->h2_payload.flags = flags;
evbuffer_add(req->h2_payload.evbuf_body, input, input_len);
if (req->h2_payload.gzip != HTTP2_CONTENT_ENCODING_NONE){
ret = inflate_read(input, input_len, &uncompr, &uncompr_len,
&req->h2_payload.inflate, req->h2_payload.gzip);
if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr > 0){
input = (const uint8_t*)uncompr;
input_len = uncompr_len;
}
else
{
if (uncompr_len) FREE(&uncompr);
}
}
data = input;
len = input_len;
/*todo post data scan**/
if (req->body_state == H2_READ_STATE_BEGIN){
if (req->event_cb)
{
req->event_cb(req, EV_HTTP_REQ_BODY_BEGIN, NULL, len,
req->event_cb_user);
}
if (flags == NGHTTP2_FLAG_END_STREAM)
{
req->h2_payload.flags = NGHTTP2_FLAG_NONE;
}else
{
req->h2_payload.flags = flags;
}
req->body_state = H2_READ_STATE_READING;
}
if (req->body_state == H2_READ_STATE_READING)
{
if (req->event_cb)
{
req->event_cb(req, EV_HTTP_REQ_BODY_CONT, data, len,
req->event_cb_user);
}
if (flags == NGHTTP2_FLAG_END_STREAM){
req->h2_payload.flags = NGHTTP2_FLAG_NONE;
}else{
req->h2_payload.flags = flags;
}
}
if (uncompr_len) FREE(&uncompr);
stream_action = nghttp2_submit_data_by_h2_half(h2_stream_info, h2_session, CONN_DIR_DOWNSTREAM);
if (stream_action == ACTION_DROP_DATA){
xret = nghttp2_session_send(h2_stream_info->as_client);
if (xret != 0)
{
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(logger()->handle, "Fatal upstream send error: %s, %d\n",nghttp2_strerror(xret), __LINE__);
}
}
if (stream_action == ACTION_USER_DATA)
stream_action = ACTION_DROP_DATA;
h2_stream_info->stream_action = stream_action;
finish:
return 0;
}
static int
nghttp2_server_on_begin_headers(nghttp2_session *session,
const nghttp2_frame *frame,
void *user_data)
{
struct tfe_h2_stream *h2_stream_info = (struct tfe_h2_stream *)user_data;
if (frame->hd.type != NGHTTP2_HEADERS ||
frame->headers.cat != NGHTTP2_HCAT_REQUEST) {
return 0;
}
create_serv_stream_data(session, frame->hd.stream_id, h2_stream_info);
return 0;
}
static void
server_session_init(struct tfe_h2_stream *h2_stream_info)
{
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks, nghttp2_server_send);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
nghttp2_server_on_frame_recv);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks,
nghttp2_server_on_data_chunk_recv);
nghttp2_session_callbacks_set_on_stream_close_callback(
callbacks, nghttp2_server_on_stream_close);
nghttp2_session_callbacks_set_on_header_callback(callbacks,
nghttp2_server_on_header);
nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks, nghttp2_server_on_begin_headers);
nghttp2_session_server_new(&h2_stream_info->as_server, callbacks, h2_stream_info);
h2_stream_info->as_server->opt_flags |= NGHTTP2_OPTMASK_NO_CLOSED_STREAMS;
nghttp2_session_callbacks_del(callbacks);
}
static void
delete_server_session_data(struct tfe_h2_stream *h2_stream_info)
{
struct tfe_h2_session *h2_session;
struct tfe_h2_session *peer_h2_stream;
nghttp2_session_del(h2_stream_info->as_server);
h2_stream_info->as_server = NULL;
TAILQ_FOREACH_SAFE(h2_session, &h2_stream_info->h2_session_list, next, peer_h2_stream)
{
TAILQ_REMOVE(&h2_stream_info->h2_session_list, h2_session, next);
if (h2_session->frame_ctx){
http_frame_raise_session_end(h2_session->frame_ctx, h2_stream_info->tf_stream, &h2_session->tfe_session,
h2_stream_info->thread_id);
h2_session->frame_ctx = NULL;
}
delete_http2_stream_data(h2_session, h2_stream_info->tf_stream, 1);
free(h2_session);
h2_session = NULL;
}
}
static void
delete_client_session_data(struct tfe_h2_stream *h2_stream_info)
{
struct tfe_h2_session *h2_session = NULL;
struct tfe_h2_session *peer_h2_stream;
nghttp2_session_del(h2_stream_info->as_client);
h2_stream_info->as_client = NULL;
TAILQ_FOREACH_SAFE(h2_session, &h2_stream_info->h2_session_list, next, peer_h2_stream){
TAILQ_REMOVE(&h2_stream_info->h2_session_list, h2_session, next);
if (h2_session->frame_ctx){
http_frame_raise_session_end(h2_session->frame_ctx, h2_stream_info->tf_stream, &h2_session->tfe_session,
h2_stream_info->thread_id);
h2_session->frame_ctx = NULL;
}
delete_http2_stream_data(h2_session, h2_stream_info->tf_stream, 1);
free(h2_session);
h2_session = NULL;
}
}
enum tfe_stream_action
detect_up_stream_protocol(struct tfe_h2_stream *h2_stream_info, const struct tfe_stream *tfe_stream,
unsigned int thread_id, const unsigned char *data, size_t len)
{
int readlen = 0;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
h2_stream_info->tf_stream = tfe_stream;
h2_stream_info->thread_id = thread_id;
if (!h2_stream_info->as_client)
goto forward;
readlen = nghttp2_session_mem_recv(h2_stream_info->as_client, data, len);
if (readlen < 0){
TFE_LOG_ERROR(logger()->handle, "Failed to process server requests. Link message %s",
tfe_stream->str_stream_info);
delete_client_session_data(h2_stream_info);
goto forward;
}
stream_action = h2_stream_info->stream_action;
h2_stream_info->stream_action = ACTION_DROP_DATA;
if (h2_stream_info->goaway){
nghttp2_disect_goaway(h2_stream_info);
h2_stream_info->goaway = 0;
}
if (h2_stream_info->kill_signal)
{
nghttp2_disect_goaway(h2_stream_info);
tfe_stream_kill(tfe_stream);
}
if (stream_action == ACTION_DROP_DATA){
tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len));
return ACTION_DROP_DATA;
}
forward:
if (stream_action == ACTION_FORWARD_DATA){
tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_FOWARD_BYTES, &len, sizeof(len));
return ACTION_FORWARD_DATA;
}
return stream_action;
}
enum tfe_stream_action
detect_down_stream_protocol(struct tfe_h2_stream *h2_stream_info, const struct tfe_stream *tfe_stream,
unsigned int thread_id, const unsigned char *data, size_t len)
{
int readlen = 0;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
h2_stream_info->tf_stream = tfe_stream;
h2_stream_info->thread_id = thread_id;
if (!h2_stream_info->as_server)
goto forward;
readlen = nghttp2_session_mem_recv(h2_stream_info->as_server, data, len);
if (readlen < 0){
TFE_LOG_ERROR(logger()->handle, "Failed to process client requests. Link message %s",
tfe_stream->str_stream_info);
delete_server_session_data(h2_stream_info);
goto forward;
}
stream_action = h2_stream_info->stream_action;
h2_stream_info->stream_action = ACTION_DROP_DATA;
if (h2_stream_info->goaway){
nghttp2_disect_goaway(h2_stream_info);
h2_stream_info->goaway = 0;
}
if (stream_action == ACTION_DROP_DATA){
tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len));
return ACTION_DROP_DATA;
}
forward:
if (stream_action == ACTION_FORWARD_DATA){
tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_FOWARD_BYTES, &len, sizeof(len));
return ACTION_FORWARD_DATA;
}
return stream_action;
}
void
sess_data_ctx_fini(struct tfe_h2_stream *h2_stream_info,
const struct tfe_stream * stream, unsigned int thread_id)
{
struct tfe_h2_session *h2_session = NULL;
struct tfe_h2_session *peer_h2_stream = NULL;
TAILQ_FOREACH_SAFE(h2_session, &h2_stream_info->h2_session_list, next, peer_h2_stream){
TAILQ_REMOVE(&h2_stream_info->h2_session_list, h2_session, next);
if (h2_session->frame_ctx){
http_frame_raise_session_end(h2_session->frame_ctx, stream, &h2_session->tfe_session,
thread_id);
h2_session->frame_ctx = NULL;
}
delete_http2_stream_data(h2_session, h2_stream_info->tf_stream, 1);
free(h2_session);
h2_session = NULL;
}
if (h2_stream_info->as_client){
nghttp2_session_del(h2_stream_info->as_client);
h2_stream_info->as_client = NULL;
}
if (h2_stream_info->as_server){
nghttp2_session_del(h2_stream_info->as_server);
h2_stream_info->as_server = NULL;
}
}
struct tfe_h2_stream* tfe_session_info_init()
{
struct tfe_h2_stream *h2_stream_info = NULL;
h2_stream_info = ALLOC(struct tfe_h2_stream, 1);
assert(h2_stream_info);
memset(h2_stream_info, 0, sizeof(struct tfe_h2_stream));
h2_stream_info->stream_action = ACTION_DROP_DATA;
h2_stream_info->goaway = 0;
server_session_init(h2_stream_info);
client_session_init(h2_stream_info);
TAILQ_INIT(&h2_stream_info->h2_session_list);
return h2_stream_info;
}