This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/plugin/protocol/http2/src/http2_stream.cpp

2001 lines
60 KiB
C++
Raw Normal View History

/*************************************************************************
> File Name: http2_stream.c
> Author:
> Mail:
> Created Time:
************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <stdint.h>
#include <math.h>
#include <sys/param.h>
#include <zlib.h>
#include <event2/buffer.h>
#include <tfe_utils.h>
#include <tfe_stream.h>
#include <nghttp2/nghttp2.h>
#include <http2_stream.h>
#include <http2_common.h>
#include <nghttp2_session.h>
#include <nghttp2_map.h>
static const struct value_string method_vals[] =
{
{NGHTTP2_METHOD_DELETE, "DELETE"},
{NGHTTP2_METHOD_GET, "GET"},
{NGHTTP2_METHOD_HEAD, "HEAD"},
{NGHTTP2_METHOD_POST, "POST"},
{NGHTTP2_METHOD_PUT, "PUT"},
{NGHTTP2_METHOD_CONNECT, "CONNECT"},
{NGHTTP2_METHOD_UNKNOWN, "unknown"},
};
static const struct value_string headers_vals[] =
{
{TFE_HTTP_UNKNOWN_FIELD, "unkown"},
{TFE_HTTP_HOST, ":authority"},
{TFE_HTTP_REFERER, "referer"},
{TFE_HTTP_USER_AGENT, "user-agent"},
{TFE_HTTP_COOKIE, "cookie"},
{TFE_HTTP_SET_COOKIE, "set-cookie"},
{TFE_HTTP_PROXY_AUTHORIZATION, "proxy-authorization"},
{TFE_HTTP_AUTHORIZATION, "authorization"},
{TFE_HTTP_LOCATION, "location"},
{TFE_HTTP_SERVER, "server"},
{TFE_HTTP_ETAG, "etag"},
{TFE_HTTP_DATE, "date"},
{TFE_HTTP_TRAILER, "Trailer"},
{TFE_HTTP_TRANSFER_ENCODING, "transfer-encoding"},
{TFE_HTTP_VIA, "via"},
{TFE_HTTP_PRAGMA, "Pragma"},
{TFE_HTTP_CONNECTION, "connection"},
{TFE_HTTP_CONT_ENCODING, "content-encoding"},
{TFE_HTTP_CONT_LANGUAGE, "content-language"},
{TFE_HTTP_CONT_LOCATION, "content-location"},
{TFE_HTTP_CONT_RANGE, "content-range"},
{TFE_HTTP_CONT_LENGTH, "content-length"},
{TFE_HTTP_CONT_TYPE, "content-type"},
{TFE_HTTP_CONT_DISPOSITION, "content-disposition"},
{TFE_HTTP_EXPIRES, "expires"},
{TFE_HTTP_ACCEPT_ENCODING, "accept-encoding"},
{TFE_HTTP_CACHE_CONTROL, "cache-control"},
{TFE_HTTP_IF_MATCH, "if-match"},
{TFE_HTTP_IF_NONE_MATCH, "if-none-match"},
{TFE_HTTP_IF_MODIFIED_SINCE, "if-modified-since"},
{TFE_HTTP_IF_UNMODIFIED_SINCE, "if-unmodified-since"},
{TFE_HTTP_LAST_MODIFIED, "last-modified"},
};
/* ACK Header : 0x000x000x000x040x010x000x000x000x00*/
static uint8_t ackheader[] = {
0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00
};
static int ping(const unsigned char *data, size_t len)
{
if (len >= 8 &&
(data[3] == 0x6 &&
(data[4] == 0x1 || data[4] == 0x0) &&
(data[16] == 0x0 || data[16] == 0x1))){
return 1;
}
return 0;
}
struct user_event_dispatch
{
const struct tfe_stream *tf_stream;
const struct tfe_http_session * tfe_session;
unsigned int thread_id;
};
/*up stream */
static struct h2_stream_data_t *
TAILQ_LIST_FIND(struct tfe_session_info_t *session_info, int32_t stream_id)
{
struct h2_stream_data_t *stream = NULL, *_next_stream = NULL;
TAILQ_FOREACH_SAFE(stream, &session_info->list, next, _next_stream)
{
if (stream->stream_id == stream_id){
break;
}
}
return stream;
}
/** begin */
/** headers list, After the test is completed, you can delete it */
#define foreach_headers(list, entry) \
for(entry = ((struct http2_headers*)(list))->head; entry; entry = entry->next)
static struct header_data*
headers_del(struct http2_headers *list, struct header_data *entry)
{
if (entry->prev)
entry->prev->next = entry->next;
else
list->head = entry->next;
if (entry->next)
entry->next->prev = entry->prev;
else
list->tail = entry->prev;
entry->next = entry->prev = NULL;
list->nvlen--;
return entry;
}
void
headers_add_head(struct http2_headers *list, struct header_data *entry)
{
if (list->head){
entry->next = list->head;
list->head->prev = entry;
}else{
list->tail = entry;
}
list->head = entry;
list->nvlen++;
}
void
headers_add_tail(struct http2_headers *list, struct header_data *entry)
{
entry->next = NULL;
entry->prev = list->tail;
if (list->tail)
list->tail->next = entry;
else
list->head = entry;
list->tail = entry;
list->nvlen++;
}
static inline void
headers_init(struct http2_headers *headers)
{
memset(headers, 0, sizeof(struct http2_headers));
headers->nvlen = 0;
headers->flag = 0;
headers->head = headers->tail = 0;
}
static int
method_to_str_idx(const char * method)
{
if (strcasestr(method, "gzip") != NULL)
return HTTP2_CONTENT_ENCODING_GZIP;
if (strcasestr(method, "x-gzip") != NULL)
return HTTP2_CONTENT_ENCODING_X_GZIP;
if (strcasestr(method, "deflate") != NULL)
return HTTP2_CONTENT_ENCODING_DEFLATE;
if (strcasestr(method, "bzip2") != NULL)
return HTTP2_CONTENT_ENCODING_BZIP2;
if (strcasestr(method, "x-bzip2") != NULL)
return HTTP2_CONTENT_ENCODING_X_BZIP2;
if (strcasestr(method, "br") != NULL)
return HTTP2_CONTENT_ENCODING_BR;
return HTTP2_CONTENT_ENCODING_NONE;
}
static void
stream_set_id(struct h2_run_id *rid, int32_t stream_id)
{
int i =0;
for (i = 0; i < rid->num; i++){
if (rid->id[i] == stream_id){
return;
}
}
rid->id[rid->num] = stream_id;
rid->num++;
}
static void
nghttp2_stream_disable_rid(struct h2_run_id *rid)
{
int i = 0;
for (i = 0; i < rid->num; i++){
rid->id[i] = 0;
}
rid->num = 0;
}
/** end */
static int event_dispatch_cb(struct http2_half_private * half_private,
enum tfe_http_event ev, const unsigned char * data, size_t len, void * user)
{
struct user_event_dispatch *event = (struct user_event_dispatch *)user;
struct http_frame_session_ctx *frame_ctx = half_private->frame_ctx;
http_frame_raise_event(frame_ctx, event->tf_stream, (struct tfe_http_session *)event->tfe_session,
ev, data, len, event->thread_id);
return 0;
}
void half_set_callback(struct http2_half_private * half_private,
void * user, void (* user_deleter)(void *))
{
if (half_private->event_cb == NULL)
half_private->event_cb = event_dispatch_cb;
half_private->event_cb_user = user;
half_private->event_cb_user_deleter = user_deleter;
}
static const char *
half_ops_field_read(const struct tfe_http_half * half, const struct http_field_name * field)
{
const char *value = NULL;
struct header_data *header = NULL;
const struct http2_half_private *half_private = nghttp2_to_half_private(half);
if (unlikely(half_private == NULL))
goto finish;
foreach_headers(&half_private->headers, header){
if (header->field.field_id == field->field_id){
break;
}
}
if (header == NULL) goto finish;
value = (const char *)header->nv.value;
finish:
return value;
}
static int
half_ops_field_write(struct tfe_http_half * half, const struct http_field_name * name, const char * value)
{
int is_exist = 0;
struct header_data *header = NULL;
struct http2_half_private *half_private = nghttp2_to_half_private(half);
struct http2_headers *headers = &half_private->headers;
if (unlikely(half_private == NULL) || unlikely(headers == NULL))
goto finish;
foreach_headers(headers, header){
if (header->field.field_id == TFE_HTTP_UNKNOWN_FIELD)
continue;
if (header->field.field_id == name->field_id){
free(header->nv.value);
header->nv.value = (uint8_t*)tfe_strdup(value);
header->nv.valuelen = strlen(value);
header->field.field_name = (const char *)header->nv.name;
is_exist = 1;
break;
}
}
if (is_exist == 0){
header = ALLOC(struct header_data, 1);
memset(header, 0, sizeof(struct header_data));
if (name->field_id == TFE_HTTP_UNKNOWN_FIELD){
header->field.field_id = TFE_HTTP_UNKNOWN_FIELD;
header->nv.name = (uint8_t *)tfe_strdup(name->field_name);
header->nv.namelen = strlen(name->field_name);
header->field.field_name = (const char *)header->nv.name;;
}else{
const char *std_name = val_to_str(name->field_id, headers_vals);
header->field.field_id = name->field_id;
header->field.field_name = NULL;
header->nv.name = (uint8_t *)tfe_strdup((const char *)std_name);
header->nv.namelen = strlen(std_name);
}
header->nv.value = (uint8_t *)tfe_strdup((const char *)value);;
header->nv.valuelen = strlen(value);
headers_add_tail(headers, header);
}
finish:
return 0;
}
static struct tfe_http_half *
half_ops_allow_write(const struct tfe_http_half * half)
{
return (struct tfe_http_half *) half;
}
static const char *
half_ops_field_iterate(const struct tfe_http_half * half, void ** iter, struct http_field_name * field)
{
const char *value = NULL;
struct header_data **head = (struct header_data **)iter;
const struct http2_half_private *half_private = nghttp2_to_half_private(half);
if (unlikely(half_private == NULL))
goto finish;
if (NULL == *head){
*head = half_private->headers.head;
}else{
*head = (*head)->next;
}
if (NULL == *head)
goto finish;
field->field_id = (*head)->field.field_id;
field->field_name = (*head)->field.field_name;
value = (const char *)(*head)->nv.value;
finish:
return value;
}
static int
half_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, int flag)
{
int xret = -1;
struct http2_half_private * resp = nghttp2_to_half_private(half);
struct data_t *body = &resp->body;
if (buff == NULL && size == 0){
if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){
xret = deflate_write(&body->deflate, NULL, 0, resp->body.evbuf_body, body->gzip, 1);
}
resp->message_state = MANAGE_STAGE_COMPLETE;
goto finish;
}
if (resp->body.evbuf_body == NULL){
resp->body.evbuf_body = evbuffer_new();
}
if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){
xret = deflate_write(&body->deflate, (const uint8_t *)buff, size,
resp->body.evbuf_body, body->gzip, 0);
}else{
xret = evbuffer_add(resp->body.evbuf_body, buff, size);
}
finish:
return xret;
}
static void
delete_nv_packet_data(struct http2_headers *headers)
{
struct header_data *header = NULL;
foreach_headers(headers, header){
free(header->nv.name);
header->nv.name = NULL;
header->nv.namelen = 0;
free(header->nv.value);
header->nv.value = NULL;
header->nv.valuelen = 0;
header->nv.flags = 0;
headers_del(headers, header);
}
headers->nvlen = 0;
headers->flag = 0;
headers->head = headers->tail = NULL;
}
void delete_stream_half_data(struct http2_half_private **data,
int body_flag)
{
if (*data){
struct data_t *body = &((*data)->body);
inflate_finished(&body->inflater);
deflate_finished(&body->deflate);
if (body->evbuf_body && body_flag){
evbuffer_free(body->evbuf_body);
body->evbuf_body = NULL;
}
if ((*data)->url_storage)
FREE(&((*data)->url_storage));
delete_nv_packet_data(&((*data)->headers));
if((*data)->event_cb_user_deleter != NULL)
(*data)->event_cb_user_deleter((*data)->event_cb_user);
free(*data);
*data = NULL;
}
return;
}
void half_ops_free(struct tfe_http_half * half)
{
struct http2_half_private * h2_private = nghttp2_to_half_private(half);
delete_stream_half_data(&h2_private, 1);
free(h2_private);
h2_private = NULL;
return;
}
int h2_half_ops_body_begin(struct tfe_http_half * half, int by_stream)
{
struct http2_half_private * resp = nghttp2_to_half_private(half);
struct data_t *body = &resp->body;
assert(body->evbuf_body == NULL);
if (by_stream)
{
if (body->inflater){
inflate_finished(&body->inflater);
}
if (body->deflate){
deflate_finished(&body->deflate);
}
body->gzip = HTTP2_CONTENT_ENCODING_NONE;
resp->message_state = MANAGE_STAGE_READING;
}
body->evbuf_body = evbuffer_new();
return 0;
}
int h2_half_ops_body_data(struct tfe_http_half * half, const unsigned char * data, size_t sz_data)
{
int xret = -1;
struct http2_half_private * resp = nghttp2_to_half_private(half);
struct data_t *body = &resp->body;
if (body->gzip != HTTP2_CONTENT_ENCODING_NONE){
xret = deflate_write(&body->deflate, (const uint8_t *)data, sz_data,
resp->body.evbuf_body, body->gzip, 0);
}else{
xret = evbuffer_add(resp->body.evbuf_body, data, sz_data);
}
return xret;
}
int h2_half_ops_body_end(struct tfe_http_half * half)
{
struct http2_half_private * resp = nghttp2_to_half_private(half);
resp->body_state = MANAGE_STAGE_COMPLETE;
resp->message_state = MANAGE_STAGE_COMPLETE;
return 0;
}
struct tfe_http_half_ops h2_half_ops =
{
.ops_http_field_read = half_ops_field_read,
.ops_http_field_write = half_ops_field_write,
.ops_http_allow_write = half_ops_allow_write,
.ops_http_field_iterate = half_ops_field_iterate,
.ops_append_body = half_ops_append_body,
.ops_body_begin = h2_half_ops_body_begin,
.ops_body_data = h2_half_ops_body_data,
.ops_body_end = h2_half_ops_body_end,
.ops_free = half_ops_free
};
static void
fill_resp_spec_from_handle(struct http2_half_private *half_private)
{
struct header_data *head = NULL;
struct tfe_http_resp_spec *resp_spec = &(half_private->half_public.resp_spec);
foreach_headers(&half_private->headers, head){
if (!strncmp((char *)(head->nv.name), ":status", strlen(":status"))){
resp_spec->resp_code = atoi((const char *)head->nv.value);
continue;
}
if (!strncmp((char *)(head->nv.name), "content-type", strlen("content-type"))){
resp_spec->content_type = (const char *)(head->nv.value);
continue;
}
if (!strncmp((char *)(head->nv.name), "content-encoding", strlen("content-encoding"))){
resp_spec->content_encoding = (const char *)(head->nv.value);
continue;
}
if (!strncmp((char *)(head->nv.name), "content-length", strlen("content-length"))){
resp_spec->content_length = (const char *)(head->nv.value);
continue;
}
}
resp_spec->content_length = 0;
return;
}
static void
fill_req_spec_from_handle(struct http2_half_private *half_private)
{
int urllen = 0;
struct header_data *head = NULL;
struct tfe_http_req_spec *req_spec = &(half_private->half_public.req_spec);
foreach_headers(&half_private->headers, head){
if (!strncmp((char *)(head->nv.name), ":method", strlen(":method"))){
req_spec->method = (enum tfe_http_std_method)str_to_val((const char *)(head->nv.value), method_vals);
continue;
}
if (!strncmp((char *)(head->nv.name), ":authority", strlen(":authority"))){
req_spec->host = (const char *)(head->nv.value);
urllen += head->nv.valuelen;
continue;
}
if (!strncmp((char *)(head->nv.name), ":path", strlen(":path"))){
req_spec->uri = (const char*)(head->nv.value);
urllen += head->nv.valuelen;
continue;
}
}
char *urltmp = NULL;
urltmp = (char *)malloc(urllen + 1);
if(urltmp){
sprintf(urltmp, "%s%s", (char *)req_spec->host, (char *)req_spec->uri);
req_spec->url = urltmp;
}
return;
}
static struct http2_half_private*
tfe_half_private_init(enum tfe_http_direction direction)
{
struct http2_half_private *half_private = ALLOC(struct http2_half_private, 1);
assert(half_private);
memset(half_private, 0, sizeof(struct http2_half_private));
half_private->half_public.direction = direction;
half_private->half_public.ops = &h2_half_ops;
headers_init(&half_private->headers);
half_private->body.evbuf_body = evbuffer_new();
half_private->body.gzip = HTTP2_CONTENT_ENCODING_NONE;
half_private->body_state = MANAGE_STAGE_INIT;
half_private->message_state = MANAGE_STAGE_INIT;
return half_private;
}
static struct tfe_http_session*
h2_ops_allow_write(const struct tfe_http_session * session)
{
struct h2_stream_data_t *stream_data = nghttp2_to_stream_data((struct tfe_http_session *)session);
if ( http_frame_currect_plugin_preempt(stream_data->frame_ctx) == 0){
return (struct tfe_http_session *)session;
}
return NULL;
}
void h2_ops_detach(const struct tfe_http_session * session)
{
struct h2_stream_data_t *stream_data = nghttp2_to_stream_data((struct tfe_http_session *)session);
return http_frame_currect_plugin_detach(stream_data->frame_ctx);
}
void h2_ops_drop(struct tfe_http_session * session)
{
return;
}
void h2_ops_suspend(struct tfe_http_session * session)
{
struct h2_stream_data_t *stream_data = nghttp2_to_stream_data((struct tfe_http_session *)session);
stream_data->spd_set = 1;
}
void h2_ops_resume(struct tfe_http_session * session)
{
struct h2_stream_data_t *stream_data = nghttp2_to_stream_data((struct tfe_http_session *)session);
if (stream_data->spd_valid){
stream_data->rse_set = 1;
}
}
void h2_ops_request_set(struct tfe_http_session * session, struct tfe_http_half * req_user)
{
struct h2_stream_data_t *stream_data = nghttp2_to_stream_data(session);
struct http2_half_private *half_user = nghttp2_to_half_private(req_user);
stream_data->pangu_req = half_user;
}
void h2_ops_response_set(struct tfe_http_session * session, struct tfe_http_half * resp)
{
struct h2_stream_data_t *stream_data = nghttp2_to_stream_data(session);
struct http2_half_private *half_user = nghttp2_to_half_private(resp);
stream_data->pangu_resp = half_user;
}
struct tfe_http_half * h2_ops_request_create(struct tfe_http_session * session,
enum tfe_http_std_method method, const char * uri)
{
struct http2_half_private * req = tfe_half_private_init(TFE_HTTP_REQUEST);
req->method_or_status = method;
req->url_storage = tfe_strdup(uri);
return &req->half_public;
}
struct tfe_http_half * h2_ops_response_create(struct tfe_http_session * session, int resp_code)
{
struct http2_half_private * resp = tfe_half_private_init(TFE_HTTP_RESPONSE);
resp->method_or_status = resp_code;
struct h2_stream_data_t *stream_data = nghttp2_to_stream_data(session);
if (stream_data->resp)
resp->body.gzip = stream_data->resp->body.gzip;
return &resp->half_public;
}
void h2_ops_kill(struct tfe_http_session * session)
{
}
struct tfe_http_session_ops nghttp2_session_ops =
{
.ops_allow_write = h2_ops_allow_write,
.ops_detach = h2_ops_detach,
.ops_drop = h2_ops_drop,
.ops_suspend = h2_ops_suspend,
.ops_resume = h2_ops_resume,
.ops_kill = h2_ops_kill,
.ops_request_set = h2_ops_request_set,
.ops_response_set = h2_ops_response_set,
.ops_request_create = h2_ops_request_create,
.ops_response_create = h2_ops_response_create
};
void
nghttp2_write_access_log(struct h2_stream_data_t *h2_stream, const char * str_stream_info)
{
/* Request */
struct http2_half_private *req = h2_stream->req;
/* Response */
struct http2_half_private *resp = h2_stream->resp;
/* Req-Public */
struct tfe_http_req_spec *req_spec = req ? &(req->half_public.req_spec) : NULL;
/* Resp-Public */
struct tfe_http_resp_spec *resp_spec = resp ? &(resp->half_public.resp_spec) : NULL;
const char * method = req_spec ? val_to_str(req_spec->method, method_vals) : "-";
const char * url = req_spec ? req_spec->url : "-";
char resp_code[TFE_STRING_MAX];
if (resp_spec)
snprintf(resp_code, sizeof(resp_code) - 1, "%d", resp_spec->resp_code);
else
snprintf(resp_code, sizeof(resp_code) - 1, "%s", "-");
const char * cont_type = resp_spec ? resp_spec->content_type != NULL ? resp_spec->content_type : "-" : "-";
const char * cont_encoding =
resp_spec ? resp_spec->content_encoding != NULL ? resp_spec->content_encoding : "-" : "-";
const char * pangu_req = h2_stream->pangu_req ? "USER/REQ" : "-";
const char * pangu_resp = h2_stream->pangu_resp ? "USER/RESP" : "-";
//const char * __str_suspend = h2_stream->suspend_counter > 0 ? "SUSPEND" : "-";
char *access_log;
asprintf(&access_log, "%s %d %s %s HTTP2.0 %s %s %s %s %s", str_stream_info, h2_stream->tfe_session.session_id,
method, url, resp_code, cont_type, cont_encoding, pangu_req, pangu_resp);
TFE_LOG_INFO(rt_log_data()->run_log_handle, "%s", access_log);
free(access_log);
}
static int tfe_half_session_init(struct h2_stream_data_t *h2_stream, int32_t stream_id,
enum tfe_http_direction direction)
{
struct tfe_http_session *tfe_session = &h2_stream->tfe_session;
if (direction == TFE_HTTP_REQUEST){
struct http2_half_private *req = h2_stream->req;
tfe_session->ops = &nghttp2_session_ops;
tfe_session->req = &req->half_public;
tfe_session->session_id = stream_id;
}
if (direction == TFE_HTTP_RESPONSE){
struct http2_half_private *resp = h2_stream->resp;
tfe_session->resp = &resp->half_public;
}
return 0;
}
void delete_http2_stream_data(struct h2_stream_data_t *h2_stream,
const struct tfe_stream *tf_stream,
int body_flag)
{
if (tf_stream){
nghttp2_write_access_log(h2_stream, tf_stream->str_stream_info);
}
delete_stream_half_data(&h2_stream->req, body_flag);
delete_stream_half_data(&h2_stream->resp, body_flag);
}
static void
suspend_start(struct h2_stream_data_t *h2_stream,
struct http2_half_private *half, const struct tfe_stream *stream)
{
if (h2_stream->spd_valid != 1){
return;
}
enum tfe_http_event spd_event = h2_stream->spd_event;
/* Clean up suspend tag, we can support user's call suspend in this callback */
h2_stream->spd_event = (enum tfe_http_event)0;
h2_stream->spd_valid = 0;
/* */
if (h2_stream->rse_set){
tfe_stream_resume(stream);
}
/* Call user callback, tell user we resume from suspend */
h2_stream->rse_set = 0;
half->event_cb(half, spd_event, NULL, 0, half->event_cb_user);
return;
}
static int
suspend_stop(struct h2_stream_data_t *h2_stream,
const struct tfe_stream *tf_stream, enum tfe_conn_dir dir)
{
int xret = -1;
if (h2_stream->spd_set){
h2_stream->spd_valid = 1;
h2_stream->spd_set = 0;
tfe_stream_suspend(tf_stream, dir);
xret = 0;
}
return xret;
}
static ssize_t
nghttp2_client_send(nghttp2_session *session, const uint8_t *data,
size_t length, int flags, void *user_data)
{
(void)session;
(void)flags;
int ret = -1;
struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
ret = tfe_stream_write(session_info->tf_stream, CONN_DIR_UPSTREAM, data, length);
if (unlikely(ret < 0)){
assert(0);
}
return (ssize_t)length;
}
static int
nghttp2_client_on_frame_recv(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data)
{
struct h2_stream_data_t *h2_stream = NULL;
struct http2_half_private *resp = NULL;
struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
switch (frame->hd.type) {
case NGHTTP2_DATA:
h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (!h2_stream)
break;
resp = h2_stream->resp;
h2_stream->frame_type |= TFE_NGHTTP2_DATA;
stream_set_id(&session_info->h2_id, frame->hd.stream_id);
break;
case NGHTTP2_HEADERS:
if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE &&
frame->hd.flags & NGHTTP2_FLAG_END_HEADERS){
h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (!h2_stream){
TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Upstream id %d, can't find stream information(addr = %p)",
frame->hd.stream_id, session_info);
break;
}
resp = h2_stream->resp;
suspend_start(h2_stream, resp, session_info->tf_stream);
fill_resp_spec_from_handle(h2_stream->resp);
resp->event_cb(resp, EV_HTTP_RESP_HDR, NULL, 0, resp->event_cb_user);
if (h2_stream->spd_set){
h2_stream->spd_event = EV_HTTP_RESP_HDR;
}
}
break;
case NGHTTP2_SETTINGS:
break;
case NGHTTP2_WINDOW_UPDATE:
break;
case NGHTTP2_GOAWAY:
TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "Upstream(as client) control frame goaway");
break;
}
return 0;
}
static int
nghttp2_client_on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
int32_t stream_id, const uint8_t *input,
size_t input_len, void *__attribute__((__unused__))user_data)
{
size_t len;
char *uncompr = NULL;
int uncompr_len = 0, __attribute__((__unused__))ret = 0;
const unsigned char *data;
struct http2_half_private * resp = NULL;
struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, stream_id);
if (!h2_stream){
TFE_LOG_ERROR(rt_log_data()->run_log_handle, "On data callback can't get downstream information, id = %d",
stream_id);
goto finish;
}
resp = h2_stream->resp;
if (resp->body.gzip != HTTP2_CONTENT_ENCODING_NONE){
ret = inflate_read(input, input_len, &uncompr, &uncompr_len,
&resp->body.inflater, resp->body.gzip);
if (((ret == Z_STREAM_END) || (ret == Z_OK)) && uncompr > 0){
input = (const uint8_t*)uncompr;
input_len = uncompr_len;
}
}
data = input;
len = input_len;
if (resp->body_state == MANAGE_STAGE_INIT){
if (resp->event_cb) {
resp->event_cb(resp, EV_HTTP_RESP_BODY_BEGIN, NULL, len,
resp->event_cb_user);
}
resp->body.flags = flags;
resp->body_state = MANAGE_STAGE_READING;
}
if (resp->body_state == MANAGE_STAGE_READING){
if (resp->event_cb) {
resp->event_cb(resp, EV_HTTP_RESP_BODY_CONT, data, len,
resp->event_cb_user);
}
resp->body.flags = flags;
goto event;
}
if (flags & 0x01){
if (resp->event_cb) {
resp->event_cb(resp, EV_HTTP_RESP_BODY_END, NULL, 0,
resp->event_cb_user);
}
if (resp->event_cb) {
resp->event_cb(resp, EV_HTTP_RESP_END, NULL, 0,
resp->event_cb_user);
}
resp->body.flags = flags;
resp->body_state = MANAGE_STAGE_COMPLETE;
resp->message_state = MANAGE_STAGE_COMPLETE;
}
event:
finish:
return 0;
}
static int
nghttp2_client_on_stream_close(nghttp2_session *session, int32_t stream_id,
uint32_t error_code, void *user_data)
{
(void)error_code;
struct h2_stream_data_t *h2_stream;
struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "Client stream close stream_id = %d, error_code = %d",
stream_id, error_code);
h2_stream = TAILQ_LIST_FIND(session_info, stream_id);
if (!h2_stream) {
return 0;
}
struct http2_half_private *resp = h2_stream->resp;
if (error_code == 0 && resp->body_state != MANAGE_STAGE_COMPLETE){
if (resp->body_state != MANAGE_STAGE_INIT){
if (resp->event_cb) {
resp->event_cb(resp, EV_HTTP_RESP_BODY_END, NULL, 0,
resp->event_cb_user);
}
if (resp->event_cb) {
resp->event_cb(resp, EV_HTTP_RESP_END, NULL, 0,
resp->event_cb_user);
}
}
resp->body.flags = NGHTTP2_FLAG_END_STREAM;
resp->body_state = MANAGE_STAGE_COMPLETE;
resp->message_state = MANAGE_STAGE_COMPLETE;
h2_stream->frame_type |= TFE_NGHTTP2_DATA;
stream_set_id(&session_info->h2_id, stream_id);
}
return 0;
}
static int
nghttp2_client_on_header(nghttp2_session *session,
const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data)
{
(void)session;
(void)flags;
struct header_data *head = NULL;
struct http2_headers *headers = NULL;
struct h2_stream_data_t *h2_stream = NULL;
struct http2_half_private *resp = NULL;
enum tfe_http_std_field field_id = TFE_HTTP_UNKNOWN_FIELD;
struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
switch (frame->hd.type) {
case NGHTTP2_HEADERS:
h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (!h2_stream){
TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Client Stream id %d, can't find stream information(addr = %p)",
frame->hd.stream_id, session_info);
break;
}
//printf("%d, %s %s, %d\n",frame->hd.stream_id, name, value, frame->hd.flags);
resp = h2_stream->resp;
head = ALLOC(struct header_data, 1);
head->nv.name = (uint8_t *)tfe_strdup((const char *)name);
head->nv.namelen = namelen;
head->nv.value = (uint8_t *)tfe_strdup((const char *)value);
head->nv.valuelen = valuelen;
head->nv.flags = flags;
field_id = (enum tfe_http_std_field)str_to_val((const char *)name, headers_vals);
if (field_id == -1){
head->field.field_id = TFE_HTTP_UNKNOWN_FIELD;
head->field.field_name = (const char *)head->nv.name;
}else{
if (field_id == TFE_HTTP_CONT_ENCODING){
resp->body.gzip = method_to_str_idx((const char *)value);
}
head->field.field_id = field_id;
head->field.field_name = NULL;
}
headers = &resp->headers;
headers->flag = frame->hd.flags;
headers_add_tail(headers, head);
h2_stream->frame_type |= TFE_NGHTTP2_HEADERS;
stream_set_id(&session_info->h2_id, frame->hd.stream_id);
break;
case NGHTTP2_PUSH_PROMISE:
if (frame->headers.cat != NGHTTP2_HCAT_REQUEST)
break;
h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (!h2_stream){
TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Client Stream id %d, can't find stream information(addr = %p)",
frame->hd.stream_id, session_info);
break;
}
#ifdef __DEBUG_TEST
printf("%d, %s %s\n",frame->hd.stream_id, name, value);
#endif
resp = h2_stream->resp;
head = ALLOC(struct header_data, 1);
head->nv.name = (uint8_t *)tfe_strdup((const char *)name);
head->nv.namelen = namelen;
head->nv.value = (uint8_t *)tfe_strdup((const char *)value);
head->nv.valuelen = valuelen;
head->nv.flags = flags;
field_id = (enum tfe_http_std_field)str_to_val((const char *)name, headers_vals);
if (field_id == -1){
head->field.field_id = TFE_HTTP_UNKNOWN_FIELD;
}else{
if (field_id == TFE_HTTP_CONT_ENCODING){
resp->body.gzip = method_to_str_idx((const char *)value);
}
head->field.field_id = field_id;
head->field.field_name = (const char *)head->nv.name;
}
headers = &resp->headers;
headers->flag = frame->hd.flags;
headers_add_tail(headers, head);
h2_stream->frame_type |= TFE_NGHTTP2_PUSH_PROMISE;
stream_set_id(&session_info->h2_id, frame->hd.stream_id);
default:
break;
}
return 0;
}
static struct h2_stream_data_t*
create_upstream_data(nghttp2_session *session, int32_t stream_id,
struct tfe_session_info_t *session_info)
{
struct h2_stream_data_t *h2_stream = NULL;
struct user_event_dispatch *event = NULL;
h2_stream = TAILQ_LIST_FIND(session_info, stream_id);
if (h2_stream == NULL){
/** todo:When the data of the reply is pushed as promised,
there is no stream id at the reply end. to create it*/
goto finish;
}
if (h2_stream->resp){
goto finish;
}
h2_stream->resp = tfe_half_private_init(TFE_HTTP_RESPONSE);
tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_RESPONSE);
event = ALLOC(struct user_event_dispatch, 1);
assert(event);
event->thread_id = session_info->thread_id;
event->tf_stream = session_info->tf_stream;
event->tfe_session = &h2_stream->tfe_session;
half_set_callback(h2_stream->resp, event, NULL);
h2_stream->resp->frame_ctx = h2_stream->frame_ctx;
nghttp2_session_set_stream_user_data(session, stream_id, h2_stream);
finish:
return h2_stream;
}
static int
nghttp2_client_on_begin_headers(nghttp2_session * session,
const nghttp2_frame * frame,
void * user_data)
{
(void)session;
struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
switch(frame->hd.type){
case NGHTTP2_HEADERS:
create_upstream_data(session, frame->hd.stream_id, session_info);
break;
default:
break;
}
return 0;
}
static
void client_session_init(struct tfe_session_info_t *session_info)
{
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks,
nghttp2_client_send);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
nghttp2_client_on_frame_recv);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks,
nghttp2_client_on_data_chunk_recv);
nghttp2_session_callbacks_set_on_stream_close_callback(callbacks,
nghttp2_client_on_stream_close);
nghttp2_session_callbacks_set_on_header_callback(callbacks,
nghttp2_client_on_header);
nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks,
nghttp2_client_on_begin_headers);
nghttp2_session_client_new(&session_info->as_client, callbacks, session_info);
session_info->as_client->local_window_size = NGHTTP2_MAX_WINDOW_SIZE;
session_info->as_client->local_settings.initial_window_size = NGHTTP2_MAX_WINDOW_SIZE;
nghttp2_session_callbacks_del(callbacks);
}
static ssize_t
nghttp2_server_send(nghttp2_session *session, const uint8_t *data,
size_t length, int flags, void *user_data)
{
(void)session;
(void)flags;
int ret = -1;
struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
ret = tfe_stream_write(session_info->tf_stream, CONN_DIR_DOWNSTREAM, data, length);
if (unlikely(ret < 0)){
assert(0);
}
return (ssize_t)length;
}
static int
nghttp2_server_on_frame_recv(nghttp2_session *session,
const nghttp2_frame *frame, void *user_data)
{
struct h2_stream_data_t *h2_stream = NULL;
struct http2_half_private *req = NULL;
struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (!h2_stream){
return 0;
}
switch(frame->hd.type){
case NGHTTP2_HEADERS:
if (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS){
req = h2_stream->req;
suspend_start(h2_stream, req, session_info->tf_stream);
fill_req_spec_from_handle(h2_stream->req);
req->event_cb(req, EV_HTTP_REQ_HDR, NULL, 0, req->event_cb_user);
if (h2_stream->spd_set){
h2_stream->spd_event = EV_HTTP_REQ_HDR;
}
}
case NGHTTP2_DATA:
/* Check that the client request has finished */
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
/* For DATA and HEADERS frame, this callback may be called after
on_stream_close_callback. Check that stream still alive. */
}
break;
case NGHTTP2_SETTINGS:
break;
case NGHTTP2_WINDOW_UPDATE:
break;
case NGHTTP2_GOAWAY:
TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "Control frame goaway");
break;
default:
break;
}
return 0;
}
/*
---------------------------------------------------------------------------------------------------------------------------------
|No errors | PROTOCOL_ERROR| INTERNAL_ERROR | FLOW_CONTROL_ERROR| SETTINGS_TIMEOUT | STREAM_CLOSED | FRAME_SIZE_ERROR |
---------------------------------------------------------------------------------------------------------------------------------
|0x00 | 0x01 | 0x02 | 0x03 | 0x04 | 0x05 | 0x06 |
---------------------------------------------------------------------------------------------------------------------------------
|REFUSED_STREAM| CANCEL | COMPRESSION_ERROR| CONNECT_ERROR | ENHANCE_YOUR_CALM| INADEQUATE_SECURITY| HTTP_1_1_REQUIRED|
---------------------------------------------------------------------------------------------------------------------------------
|0x07 | 0x08 | 0x09 | 0x0a | 0x0b | 0x0c | 0x0d |
---------------------------------------------------------------------------------------------------------------------------------
*/
static int
nghttp2_server_on_stream_close(nghttp2_session *session, int32_t stream_id,
uint32_t error_code, void *user_data)
{
(void)error_code;
struct h2_stream_data_t *h2_stream = NULL;
struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
h2_stream = TAILQ_LIST_FIND(session_info, stream_id);
if (!h2_stream) {
return 0;
}
TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "downstream(as server) %d is close, error_code = %d", stream_id, error_code);
return 0;
}
static int
nghttp2_server_on_header(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name,
size_t namelen, const uint8_t *value,
size_t valuelen, uint8_t flags, void *user_data)
{
enum tfe_http_std_field field_id;
struct header_data *head = NULL;
struct http2_headers *headers = NULL;
struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
switch (frame->hd.type){
case NGHTTP2_HEADERS:
if (frame->headers.cat != NGHTTP2_HCAT_REQUEST)
break;
struct h2_stream_data_t *h2_stream = (struct h2_stream_data_t *)nghttp2_session_get_stream_user_data(session, frame->hd.stream_id);
if (!h2_stream){
TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Stream id %d, can't find stream information(addr = %p)",
frame->hd.stream_id, session_info);
break;
}
head = ALLOC(struct header_data, 1);
head->nv.name = (uint8_t *)tfe_strdup((const char *)name);
head->nv.namelen = namelen;
head->nv.value = (uint8_t *)tfe_strdup((const char *)value);;
head->nv.valuelen = valuelen;
head->nv.flags = flags;
field_id = (enum tfe_http_std_field)str_to_val((const char *)name, headers_vals);
if (field_id == -1){
head->field.field_id = TFE_HTTP_UNKNOWN_FIELD;
head->field.field_name = (const char *)head->nv.name;
}else{
head->field.field_id = field_id;
head->field.field_name = NULL;
}
headers = &h2_stream->req->headers;
headers->flag = frame->hd.flags;
headers_add_tail(headers, head);
h2_stream->frame_type |= TFE_NGHTTP2_HEADERS;
stream_set_id(&session_info->h2_id, frame->hd.stream_id);
break;
}
return 0;
}
static void
create_serv_stream_data(nghttp2_session *session, int32_t stream_id,
struct tfe_session_info_t *session_info)
{
struct h2_stream_data_t *h2_stream = NULL;
struct user_event_dispatch *event = NULL;
struct http2_half_private *half_private = NULL;
h2_stream = TAILQ_LIST_FIND(session_info, stream_id);
if (h2_stream != NULL){
goto finish;
}
h2_stream = (struct h2_stream_data_t *)ALLOC(struct h2_stream_data_t, 1);
assert(h2_stream);
memset(h2_stream, 0, sizeof(struct h2_stream_data_t));
h2_stream->stream_id = stream_id;
h2_stream->req = tfe_half_private_init(TFE_HTTP_REQUEST);
tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_REQUEST);
event = ALLOC(struct user_event_dispatch, 1);
assert(event);
event->thread_id = session_info->thread_id;
event->tf_stream = session_info->tf_stream;
event->tfe_session = &h2_stream->tfe_session;
half_set_callback(h2_stream->req, event, NULL);
/* Call business plugin */
half_private = h2_stream->req;
half_private->frame_ctx = http_frame_alloc();
if (half_private->frame_ctx == NULL){
TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Failed at raising session begin event. ");
goto finish;
}
http_frame_raise_session_begin(half_private->frame_ctx, session_info->tf_stream,
&h2_stream->tfe_session, session_info->thread_id);
h2_stream->frame_ctx = half_private->frame_ctx;
TAILQ_INSERT_TAIL(&session_info->list, h2_stream, next);
nghttp2_session_set_stream_user_data(session, stream_id, h2_stream);
finish:
return;
}
static int
nghttp2_server_on_begin_headers(nghttp2_session *session,
const nghttp2_frame *frame,
void *user_data)
{
struct tfe_session_info_t *session_info = (struct tfe_session_info_t *)user_data;
if (frame->hd.type != NGHTTP2_HEADERS ||
frame->headers.cat != NGHTTP2_HCAT_REQUEST) {
return 0;
}
create_serv_stream_data(session, frame->hd.stream_id, session_info);
return 0;
}
static void
server_session_init(struct tfe_session_info_t *session_info)
{
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_send_callback(callbacks, nghttp2_server_send);
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
nghttp2_server_on_frame_recv);
nghttp2_session_callbacks_set_on_stream_close_callback(
callbacks, nghttp2_server_on_stream_close);
nghttp2_session_callbacks_set_on_header_callback(callbacks,
nghttp2_server_on_header);
nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks, nghttp2_server_on_begin_headers);
nghttp2_session_server_new(&session_info->as_server, callbacks, session_info);
session_info->as_server->local_window_size = NGHTTP2_MAX_WINDOW_SIZE;
session_info->as_server->local_settings.initial_window_size = NGHTTP2_MAX_WINDOW_SIZE;
nghttp2_session_callbacks_del(callbacks);
}
static void
delete_server_session_data(struct tfe_session_info_t *session_info)
{
struct h2_stream_data_t *h2_stream;
struct h2_stream_data_t *_h2_stream;
nghttp2_session_del(session_info->as_server);
session_info->as_server = NULL;
TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){
TAILQ_REMOVE(&session_info->list, h2_stream, next);
free(h2_stream);
h2_stream = NULL;
}
}
static void
delete_client_session_data(struct tfe_session_info_t *session_info)
{
struct h2_stream_data_t *h2_stream = NULL;
struct h2_stream_data_t *_h2_stream;
nghttp2_session_del(session_info->as_client);
session_info->as_client = NULL;
TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){
TAILQ_REMOVE(&session_info->list, h2_stream, next);
free(h2_stream);
h2_stream = NULL;
}
}
static ssize_t
upstream_read_callback(nghttp2_session *session, int32_t stream_id,
uint8_t *buf, size_t length,
uint32_t *data_flags,
nghttp2_data_source *source,
void *user_data)
{
#define TFE_MAX_PAYLOADLEN 8192
int datalen = 0;
struct data_t *body = (struct data_t *)source->ptr;
size_t inputlen = evbuffer_get_length(body->evbuf_body);
unsigned char *input = evbuffer_pullup(body->evbuf_body, -1);
if (input == NULL || inputlen == 0){
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
goto finish;
}
if (inputlen > TFE_MAX_PAYLOADLEN){
datalen = TFE_MAX_PAYLOADLEN;
}else{
datalen = inputlen;
}
memcpy(buf, input, datalen);
evbuffer_drain(body->evbuf_body, datalen);
finish:
return datalen;
}
static enum tfe_stream_action
nghttp2_server_frame_submit_data(struct tfe_session_info_t *session_info, struct h2_stream_data_t **h2_stream,
uint16_t *nghttp2_type)
{
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
struct http2_half_private *resp = (*h2_stream)->resp;
if ((*h2_stream)->pangu_resp){
stream_action = ACTION_DROP_DATA;
*nghttp2_type |= TFE_NGHTTP2_RESPONSE;
}else{
stream_action = ACTION_FORWARD_DATA;
struct data_t *body = &resp->body;
if (body->flags == NGHTTP2_FLAG_END_STREAM &&
resp->message_state == MANAGE_STAGE_COMPLETE){
TFE_LOG_DEBUG(rt_log_data()->run_log_handle, "Remove stream id(%d), h2_stream addr %p",
(*h2_stream)->stream_id, *h2_stream);
TAILQ_REMOVE(&session_info->list, *h2_stream, next);
delete_http2_stream_data(*h2_stream, session_info->tf_stream, 1);
free(*h2_stream);
*h2_stream = NULL;
}
}
return stream_action;
}
static nghttp2_nv*
nghttp2_nv_packet(struct http2_headers *headers, nghttp2_nv *hdrs)
{
int nvlen = 0;
struct header_data *header = NULL;
foreach_headers(headers, header){
hdrs[nvlen].name = header->nv.name;
hdrs[nvlen].namelen = header->nv.namelen;
hdrs[nvlen].value = header->nv.value;
hdrs[nvlen].valuelen = header->nv.valuelen;
hdrs[nvlen].flags = header->nv.flags;
nvlen++;
}
return hdrs;
}
static void
downstream_create_resp(struct h2_stream_data_t *h2_stream, nghttp2_session *as_client,
const struct tfe_stream *tf_stream, unsigned int thread_id)
{
struct user_event_dispatch *event = NULL;
if (h2_stream->resp)
goto finish;
h2_stream->resp = tfe_half_private_init(TFE_HTTP_RESPONSE);
tfe_half_session_init(h2_stream, h2_stream->stream_id, TFE_HTTP_RESPONSE);
event = ALLOC(struct user_event_dispatch, 1);
assert(event);
event->thread_id = thread_id;
event->tf_stream = tf_stream;
event->tfe_session = &h2_stream->tfe_session;
half_set_callback(h2_stream->resp, event, NULL);
h2_stream->resp->frame_ctx = h2_stream->frame_ctx;
nghttp2_session_set_stream_user_data(as_client, h2_stream->stream_id, h2_stream);
finish:
return;
}
static void
tfe_make_nv(struct http2_headers *headers,
const char *name,const char *value, int flag)
{
/*Add head*/
struct header_data *head = NULL;
head = ALLOC(struct header_data, 1);
head->nv.name = (uint8_t *)tfe_strdup((const char *)name);
head->nv.namelen = strlen(name);
head->nv.value = (uint8_t *)tfe_strdup((const char *)value);;
head->nv.valuelen = strlen(value);
headers->flag = 0x04;
if (flag)
headers_add_head(headers, head);
else
headers_add_tail(headers, head);
}
static enum tfe_stream_action
tfe_submit_response(struct tfe_session_info_t *session_info,
struct h2_stream_data_t *h2_stream)
{
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
struct http2_half_private *resp = h2_stream->pangu_resp;
#define VALUE_LEN 128
char value[VALUE_LEN] = {0};
snprintf(value, VALUE_LEN, "%d", resp->method_or_status);
tfe_make_nv(&resp->headers, ":status", (const char *)value, 1);
snprintf(value, VALUE_LEN, "tfe/%s", tfe_version());
tfe_make_nv(&resp->headers, "X-TG-Construct-By", (const char *)value, 0);
h2_stream->frame_type |= TFE_NGHTTP2_RESPONSE;
stream_set_id(&session_info->h2_id, h2_stream->stream_id);
stream_action = nghttp2_server_mem_send(session_info);
if (stream_action == ACTION_DROP_DATA){
stream_action = (enum tfe_stream_action)ACTION_USER_DATA;
}
return stream_action;
}
static enum tfe_stream_action
nghttp2_client_frame_submit_header(struct tfe_session_info_t *session_info,
struct h2_stream_data_t *h2_stream)
{
int32_t stream_id = -1;
nghttp2_nv hdrs[128] = {0};
struct http2_headers *headers = NULL;
struct http2_half_private *req = NULL;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
if (0 == suspend_stop(h2_stream, session_info->tf_stream, CONN_DIR_DOWNSTREAM)){
stream_action = ACTION_DEFER_DATA;
goto finish;
}
req = h2_stream->pangu_req != NULL ? h2_stream->pangu_req : h2_stream->req;
if (req == NULL){
stream_action = ACTION_FORWARD_DATA;
goto finish;
}
if (h2_stream->pangu_resp){
stream_action = tfe_submit_response(session_info, h2_stream);
goto finish;
}
headers = &req->headers;
if (headers->nvlen <= 0){
stream_action = ACTION_FORWARD_DATA;
goto finish;
}
/*Create C' half_private_resp**/
downstream_create_resp(h2_stream, session_info->as_client, session_info->tf_stream, session_info->thread_id);
nghttp2_session_set_next_stream_id(session_info->as_client, h2_stream->stream_id);
stream_id = nghttp2_submit_request(session_info->as_client, NULL,
nghttp2_nv_packet(headers, hdrs),
headers->nvlen, NULL, h2_stream);
if (stream_id < 0){
TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Could not submit request: %s",
nghttp2_strerror(stream_id));
stream_action = ACTION_FORWARD_DATA;
goto finish;
}
/*clear headers data ***/
delete_nv_packet_data(headers);
stream_action = ACTION_DROP_DATA;
finish:
return stream_action;
}
static enum tfe_stream_action
nghttp2_client_frame_submit_settings()
{
return ACTION_FORWARD_DATA;
}
enum tfe_stream_action
nghttp2_client_frame_submit(struct tfe_session_info_t *session_info)
{
int i = 0;
struct h2_run_id *h2_id = &session_info->h2_id;
struct h2_stream_data_t *nghttp2_stream = NULL;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
for(i = 0; i < h2_id->num; i++){
nghttp2_stream = TAILQ_LIST_FIND(session_info, h2_id->id[i]);
if (NULL == nghttp2_stream)
break;
if (nghttp2_stream->frame_type & TFE_NGHTTP2_HEADERS){
stream_action = nghttp2_client_frame_submit_header(session_info, nghttp2_stream);
nghttp2_stream->frame_type &= ~TFE_NGHTTP2_HEADERS;
}
if (nghttp2_stream->frame_type & TFE_NGHTTP2_DATA){
nghttp2_stream->frame_type &= ~TFE_NGHTTP2_DATA;
}
if (nghttp2_stream->frame_type & TFE_NGHTTP2_SETTINGS){
stream_action = nghttp2_client_frame_submit_settings();
nghttp2_stream->frame_type &= ~TFE_NGHTTP2_SETTINGS;
}
if (nghttp2_stream->frame_type & TFE_NGHTTP2_PUSH_PROMISE){
nghttp2_stream->frame_type &= ~TFE_NGHTTP2_PUSH_PROMISE;
}
}
nghttp2_stream_disable_rid(&session_info->h2_id);
return stream_action;
}
enum tfe_stream_action
nghttp2_client_mem_send(struct tfe_session_info_t *session_info)
{
int xret = -1;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
stream_action = nghttp2_client_frame_submit(session_info);
if (stream_action == ACTION_DROP_DATA){
xret = nghttp2_session_send(session_info->as_client);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Fatal downstream send error: %s\n",
nghttp2_strerror(xret));
}
goto finish;
}
if (stream_action == ACTION_USER_DATA)
stream_action = ACTION_DROP_DATA;
finish:
return stream_action;
}
static enum tfe_stream_action
nghttp2_server_frame_submit_header(struct h2_stream_data_t *h2_stream,
uint16_t *nghttp2_type)
{
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
if (h2_stream->pangu_resp == NULL){
stream_action = ACTION_FORWARD_DATA;
goto finish;
}
if (h2_stream->pangu_resp != NULL){
*nghttp2_type |= TFE_NGHTTP2_RESPONSE;
stream_action = ACTION_DROP_DATA;
goto finish;
}
finish:
return stream_action;
}
static void
upstream_create_req(struct tfe_session_info_t *session_info, nghttp2_session *as_server,
struct h2_stream_data_t *h2_stream, int32_t stream_id)
{
struct user_event_dispatch *event = NULL;
struct http2_half_private *half_private = NULL;
h2_stream->stream_id = stream_id;
h2_stream->req = tfe_half_private_init(TFE_HTTP_REQUEST);
tfe_half_session_init(h2_stream, stream_id, TFE_HTTP_REQUEST);
event = ALLOC(struct user_event_dispatch, 1);
assert(event);
event->thread_id = session_info->thread_id;
event->tf_stream = session_info->tf_stream;
event->tfe_session = &h2_stream->tfe_session;
half_set_callback(h2_stream->req, event, NULL);
/* Call business plugin */
half_private = h2_stream->req;
half_private->frame_ctx = http_frame_alloc();
if (half_private->frame_ctx == NULL){
TFE_STREAM_LOG_ERROR(h2_stream, "Failed at raising session begin event. ");
goto finish;
}
http_frame_raise_session_begin(half_private->frame_ctx, session_info->tf_stream,
&h2_stream->tfe_session, session_info->thread_id);
h2_stream->frame_ctx = half_private->frame_ctx;
TAILQ_INSERT_TAIL(&session_info->list, h2_stream, next);
nghttp2_session_set_stream_user_data(as_server, stream_id, h2_stream);
finish:
return;
}
static enum tfe_stream_action
nghttp2_server_frame_submit_push_promise(struct tfe_session_info_t *session_info,
struct h2_stream_data_t *h2_stream)
{
int32_t stream_id = -1;
nghttp2_nv hdrs[128] = {0};
struct http2_headers *headers = NULL;
struct h2_stream_data_t *_h2_stream = NULL;
struct http2_half_private *resp = NULL;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
resp = h2_stream->resp;
if (resp == NULL)
goto finish;
headers = &resp->headers;
if (headers->nvlen <= 0)
goto finish;
/* Create s' half req*/
_h2_stream = (struct h2_stream_data_t *)ALLOC(struct h2_stream_data_t, 1);
assert(_h2_stream);
memset(_h2_stream, 0, sizeof(struct h2_stream_data_t));
stream_id = nghttp2_submit_push_promise(session_info->as_server, headers->flag,
h2_stream->stream_id, nghttp2_nv_packet(headers, hdrs),
headers->nvlen, _h2_stream);
if (stream_id < 0){
free(_h2_stream);
_h2_stream = NULL;
TFE_STREAM_LOG_ERROR(h2_stream, "Failed to submit push promise: %s", nghttp2_strerror(stream_id));
goto finish;
}
upstream_create_req(session_info, session_info->as_server, _h2_stream, stream_id);
/*clean header message **/
delete_nv_packet_data(headers);
stream_action = ACTION_DROP_DATA;
finish:
return stream_action;
}
static enum tfe_stream_action
nghttp2_server_frame_submit_settings()
{
return ACTION_FORWARD_DATA;
}
static enum tfe_stream_action
nghttp2_server_frame_submit_response(struct tfe_session_info_t *session_info,
struct h2_stream_data_t *h2_stream)
{
int rv = -1;
nghttp2_nv hdrs[128] = {0};
struct http2_headers *headers = NULL;
struct http2_half_private *resp = NULL;
resp = h2_stream->pangu_resp;
if (resp == NULL)
return ACTION_FORWARD_DATA;
if (resp->message_state != MANAGE_STAGE_COMPLETE){
session_info->state = 0;
return ACTION_DROP_DATA;
}
headers = &resp->headers;
if (headers->nvlen <= 0)
return ACTION_FORWARD_DATA;
struct data_t *body = &resp->body;
char str_sz_evbuf_body[TFE_STRING_MAX];
snprintf(str_sz_evbuf_body, sizeof(str_sz_evbuf_body) - 1, "%lu", evbuffer_get_length(body->evbuf_body));
const static struct http_field_name __cont_encoding_length_name = {TFE_HTTP_CONT_LENGTH, NULL};
tfe_http_field_write(&resp->half_public, &__cont_encoding_length_name, str_sz_evbuf_body);
nghttp2_data_provider data_prd;
data_prd.source.ptr = (void *)body;
data_prd.read_callback = upstream_read_callback;
rv = nghttp2_submit_response(session_info->as_server, h2_stream->stream_id, nghttp2_nv_packet(headers, hdrs),
headers->nvlen, &data_prd);
if (rv != 0){
return ACTION_FORWARD_DATA;
}
delete_nv_packet_data(headers);
return ACTION_DROP_DATA;
}
enum tfe_stream_action
nghttp2_server_frame_submit(struct tfe_session_info_t *session_info)
{
int i = 0;
struct h2_stream_data_t *nghttp2_stream = NULL;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
struct h2_run_id *h2_id = &session_info->h2_id;
for (i = 0; i < h2_id->num; i++){
nghttp2_stream = TAILQ_LIST_FIND(session_info, h2_id->id[i]);
if (NULL == nghttp2_stream)
break;
if (nghttp2_stream->frame_type & TFE_NGHTTP2_HEADERS){
stream_action = nghttp2_server_frame_submit_header(nghttp2_stream, &nghttp2_stream->frame_type);
nghttp2_stream->frame_type &= ~TFE_NGHTTP2_HEADERS;
}
if (nghttp2_stream->frame_type & TFE_NGHTTP2_DATA){
stream_action = nghttp2_server_frame_submit_data(session_info, &nghttp2_stream, &nghttp2_stream->frame_type);
if (nghttp2_stream)
nghttp2_stream->frame_type &= ~TFE_NGHTTP2_DATA;
else
continue;
}
if (nghttp2_stream->frame_type & TFE_NGHTTP2_SETTINGS){
stream_action = nghttp2_server_frame_submit_settings();
nghttp2_stream->frame_type &= ~TFE_NGHTTP2_SETTINGS;
}
if (nghttp2_stream->frame_type & TFE_NGHTTP2_PUSH_PROMISE){
stream_action = nghttp2_server_frame_submit_push_promise(session_info, nghttp2_stream);
nghttp2_stream->frame_type &= ~TFE_NGHTTP2_PUSH_PROMISE;
}
if (nghttp2_stream->frame_type & TFE_NGHTTP2_RESPONSE){
stream_action = nghttp2_server_frame_submit_response(session_info, nghttp2_stream);
nghttp2_stream->frame_type &= ~TFE_NGHTTP2_RESPONSE;
}
}
nghttp2_stream_disable_rid(&session_info->h2_id);
return stream_action;
}
enum tfe_stream_action
nghttp2_server_mem_send(struct tfe_session_info_t *session_info)
{
int xret = -1;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
stream_action = nghttp2_server_frame_submit(session_info);
if (stream_action == ACTION_DROP_DATA
&& session_info->state){
xret = nghttp2_session_send(session_info->as_server);
if (xret != 0) {
stream_action = ACTION_FORWARD_DATA;
TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Fatal upstream send error: %s\n",
nghttp2_strerror(xret));
}
}
session_info->state = 1;
return stream_action;
}
enum tfe_stream_action
detect_up_stream_protocol(struct tfe_session_info_t *session_info, const struct tfe_stream *tfe_stream,
unsigned int thread_id, const unsigned char *data, size_t len)
{
int readlen = 0;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
if (len >= 9 && !memcmp(data, ackheader, 9)){
len = 9;
goto finish;
}
if (len > 9 && !memcmp((data + len - 9), ackheader, 9)){
len = len - 9;
}
readlen = nghttp2_session_mem_recv(session_info->as_client, data, len);
if (readlen < 0){
TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Failed to process server requests. Link message %s",
tfe_stream->str_stream_info);
delete_client_session_data(session_info);
goto err;
}
stream_action = nghttp2_server_mem_send(session_info);
if (stream_action == ACTION_FORWARD_DATA){
tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_FOWARD_BYTES, &len, sizeof(len));
return ACTION_FORWARD_DATA;
}
if (stream_action == ACTION_DROP_DATA){
tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len));
return ACTION_DROP_DATA;
}
err:
tfe_stream_detach(tfe_stream);
finish:
tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len));
return ACTION_DROP_DATA;
}
enum tfe_stream_action
detect_down_stream_protocol(struct tfe_session_info_t *session_info, const struct tfe_stream *tfe_stream,
unsigned int thread_id, const unsigned char *data, size_t len)
{
int readlen = 0;
enum tfe_stream_action stream_action = ACTION_FORWARD_DATA;
session_info->tf_stream = tfe_stream;
session_info->thread_id = thread_id;
if (len >= 9 && !memcmp(data, ackheader, 9)){
/** todo: confirmation frames are automatically replied by the local agent, Immediately forward */
len = 9;
goto finish;
}
if (len > 9 && !memcmp((data + len - 9), ackheader, 9)){
len = len - 9;
}
readlen = nghttp2_session_mem_recv(session_info->as_server, data, len);
if (readlen < 0){
TFE_LOG_ERROR(rt_log_data()->run_log_handle, "Failed to process client requests. Link message %s",
tfe_stream->str_stream_info);
delete_server_session_data(session_info);
goto err;
}
stream_action = nghttp2_client_mem_send(session_info);
if (stream_action == ACTION_FORWARD_DATA){
tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_FOWARD_BYTES, &len, sizeof(len));
return ACTION_FORWARD_DATA;
}
if (stream_action == ACTION_DROP_DATA){
tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len));
return ACTION_DROP_DATA;
}
err:
tfe_stream_detach(tfe_stream);
finish:
tfe_stream_action_set_opt(tfe_stream, ACTION_OPT_DROP_BYTES, &len, sizeof(len));
return ACTION_DROP_DATA;
}
void
sess_data_ctx_fini(struct tfe_session_info_t *session_info,
const struct tfe_stream * stream, unsigned int thread_id)
{
struct h2_stream_data_t *h2_stream = NULL;
struct h2_stream_data_t *_h2_stream = NULL;
TAILQ_FOREACH_SAFE(h2_stream, &session_info->list, next, _h2_stream){
TAILQ_REMOVE(&session_info->list, h2_stream, next);
if (h2_stream->frame_ctx){
http_frame_raise_session_end(h2_stream->frame_ctx, stream, &h2_stream->tfe_session,
thread_id);
h2_stream->frame_ctx = NULL;
}
delete_http2_stream_data(h2_stream, session_info->tf_stream, 1);
free(h2_stream);
h2_stream = NULL;
}
if (session_info->as_client){
nghttp2_session_del(session_info->as_client);
session_info->as_client = NULL;
}
if (session_info->as_server){
nghttp2_session_del(session_info->as_server);
session_info->as_server = NULL;
}
}
struct tfe_session_info_t* tfe_session_info_init()
{
nghttp2_session *session = NULL;
nghttp2_inbound_frame *iframe = NULL;
struct tfe_session_info_t *session_info = NULL;
session_info = ALLOC(struct tfe_session_info_t, 1);
assert(session_info);
memset(session_info, 0, sizeof(struct tfe_session_info_t));
session_info->state = 1;
server_session_init(session_info);
client_session_init(session_info);
session = session_info->as_client;
iframe = &(session->iframe);
iframe->state = NGHTTP2_IB_READ_FIRST_SETTINGS;
nghttp2_bufs_reset(&(session->aob.framebufs));
TAILQ_INIT(&session_info->list);
return session_info;
}