初步调通HTTP请求头部内容替换业务

This commit is contained in:
Lu Qiuwen
2018-09-25 20:32:24 +08:00
parent d2e4ce94c2
commit d0ab629f4c
8 changed files with 297 additions and 166 deletions

View File

@@ -391,7 +391,8 @@ struct http_frame_plugin_status
{ {
void * pme; void * pme;
unsigned int detached; unsigned int detached;
unsigned int preempt; unsigned int is_preempted;
unsigned int preempt_by;
}; };
struct http_frame_session_ctx * http_frame_raise_session_begin(const tfe_stream * stream, struct http_frame_session_ctx * http_frame_raise_session_begin(const tfe_stream * stream,

View File

@@ -236,10 +236,11 @@ int http_frame_currect_plugin_preempt(struct http_frame_session_ctx * ht_frame)
for(unsigned int i = 0; i < ht_frame->nr_plugin_status; i++) for(unsigned int i = 0; i < ht_frame->nr_plugin_status; i++)
{ {
struct http_frame_plugin_status * __plugin_status = &ht_frame->plugin_status[i]; struct http_frame_plugin_status * __plugin_status = &ht_frame->plugin_status[i];
if (__plugin_status->preempt) return -1; if (__plugin_status->is_preempted) return -1;
} }
assert(ht_frame->calling_plugin_status != NULL); //TODO:
ht_frame->calling_plugin_status->preempt = 1; //assert(ht_frame->calling_plugin_status != NULL);
//ht_frame->calling_plugin_status->is_preempted = 1;
return 0; return 0;
} }

View File

@@ -555,7 +555,6 @@ static void ssl_async_peek_client_hello(struct future * future, evutil_socket_t
struct peek_client_hello_ctx * ctx = ALLOC(struct peek_client_hello_ctx, 1); struct peek_client_hello_ctx * ctx = ALLOC(struct peek_client_hello_ctx, 1);
ctx->ev = event_new(evbase, fd, EV_READ, peek_client_hello_cb, p); ctx->ev = event_new(evbase, fd, EV_READ, peek_client_hello_cb, p);
ctx->logger = logger; ctx->logger = logger;
event_add(ctx->ev, NULL); event_add(ctx->ev, NULL);
promise_set_ctx(p, (void *) ctx, peek_client_hello_ctx_free_cb); promise_set_ctx(p, (void *) ctx, peek_client_hello_ctx_free_cb);
return; return;

View File

@@ -661,7 +661,6 @@ void ssl_upstream_create_on_success(future_result_t * result, void * user)
void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, void * user) void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, void * user)
{ {
return; return;
assert(0);
} }
struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx) struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx)
@@ -899,7 +898,6 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst
} }
_stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr);
if (_stream->session_type == STREAM_PROTO_PLAIN) if (_stream->session_type == STREAM_PROTO_PLAIN)
{ {
_stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream); _stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream);

View File

@@ -242,7 +242,6 @@ struct replace_ctx
size_t n_rule; size_t n_rule;
struct tfe_http_half * replacing; struct tfe_http_half * replacing;
struct evbuffer * http_body; struct evbuffer * http_body;
size_t body_size;
}; };
struct pangu_http_ctx struct pangu_http_ctx
@@ -270,19 +269,24 @@ static struct pangu_http_ctx * pangu_http_ctx_new(unsigned int thread_id)
static void pangu_http_ctx_free(struct pangu_http_ctx * ctx) static void pangu_http_ctx_free(struct pangu_http_ctx * ctx)
{ {
if (ctx->rep_ctx != NULL) if (ctx->rep_ctx == NULL)
{ return;
for (size_t i = 0; i < ctx->rep_ctx->n_rule; i++) for (size_t i = 0; i < ctx->rep_ctx->n_rule; i++)
{ {
FREE(&(ctx->rep_ctx->rule[i].find)); FREE(&(ctx->rep_ctx->rule[i].find));
FREE(&(ctx->rep_ctx->rule[i].replace_with)); FREE(&(ctx->rep_ctx->rule[i].replace_with));
} }
if (ctx->rep_ctx->http_body)
{
evbuffer_free(ctx->rep_ctx->http_body); evbuffer_free(ctx->rep_ctx->http_body);
ctx->rep_ctx->http_body = NULL; ctx->rep_ctx->http_body = NULL;
}
//todo destroy http_half; //todo destroy http_half;
assert(ctx->rep_ctx->replacing == NULL); assert(ctx->rep_ctx->replacing == NULL);
FREE(&ctx->rep_ctx); FREE(&ctx->rep_ctx);
}
FREE(&ctx->enforce_rules); FREE(&ctx->enforce_rules);
FREE(&ctx->enforce_para); FREE(&ctx->enforce_para);
Maat_clean_status(&(ctx->mid)); Maat_clean_status(&(ctx->mid));
@@ -470,18 +474,19 @@ static char * strtok_r_esc(char * s, const char delim, char ** save_ptr)
size_t format_replace_rule(const char * exec_para, struct replace_rule * replace, size_t n_replace) size_t format_replace_rule(const char * exec_para, struct replace_rule * replace, size_t n_replace)
{ {
char * tmp = ALLOC(char, strlen(exec_para) + 1); char * tmp = ALLOC(char, strlen(exec_para) + 1);
char * token = NULL, * sub_token = NULL, * saveptr = NULL, * saveptr2 = NULL; char * token = NULL, * sub_token = NULL, * saveptr = NULL, * saveptr2 = NULL;
size_t idx = 0; size_t idx = 0;
const char * str_zone = "replace=";
const char * str_zone = "zone=";
const char * str_subs = "substitute="; const char * str_subs = "substitute=";
memcpy(tmp, exec_para, strlen(exec_para)); memcpy(tmp, exec_para, strlen(exec_para));
for (token = tmp;; token = NULL) for (token = tmp;; token = NULL)
{ {
sub_token = strtok_r(token, ";", &saveptr); sub_token = strtok_r(token, ";", &saveptr);
if (sub_token == NULL) if (sub_token == NULL) break;
break;
if (0 == strncasecmp(sub_token, str_zone, strlen(str_zone))) if (0 == strncasecmp(sub_token, str_zone, strlen(str_zone)))
{ {
replace[idx].zone = zone_name_to_id(sub_token + strlen(str_zone)); replace[idx].zone = zone_name_to_id(sub_token + strlen(str_zone));
@@ -490,11 +495,14 @@ size_t format_replace_rule(const char * exec_para, struct replace_rule * replace
break; break;
} }
} }
sub_token = strtok_r(NULL, ";", &saveptr);
if (0 == strncasecmp(sub_token, str_subs, strlen(str_subs))) if (0 == strncasecmp(sub_token, str_subs, strlen(str_subs)))
{ {
sub_token += strlen(str_subs); sub_token += strlen(str_subs) + 1;
replace[idx].find = tfe_strdup(strtok_r_esc(sub_token, '/', &saveptr2)); replace[idx].find = tfe_strdup(strtok_r_esc(sub_token, '/', &saveptr2));
replace[idx].replace_with = tfe_strdup(strtok_r_esc(NULL, '/', &saveptr2)); replace[idx].replace_with = tfe_strdup(strtok_r_esc(NULL, '/', &saveptr2));
idx++; idx++;
if (idx == n_replace) if (idx == n_replace)
{ {
@@ -502,10 +510,12 @@ size_t format_replace_rule(const char * exec_para, struct replace_rule * replace
} }
} }
} }
free(tmp); free(tmp);
tmp = NULL; tmp = NULL;
return idx; return idx;
} }
size_t select_replace_rule(enum replace_zone zone, const struct replace_rule * replace, size_t n_replace, size_t select_replace_rule(enum replace_zone zone, const struct replace_rule * replace, size_t n_replace,
const struct replace_rule ** selected, size_t n_selected) const struct replace_rule ** selected, size_t n_selected)
{ {
@@ -520,6 +530,7 @@ size_t select_replace_rule(enum replace_zone zone, const struct replace_rule * r
} }
return j; return j;
} }
static struct evbuffer * replace_string(const char * in, const struct replace_rule * zone) static struct evbuffer * replace_string(const char * in, const struct replace_rule * zone)
{ {
//Reference to https://www.lemoda.net/c/unix-regex/ //Reference to https://www.lemoda.net/c/unix-regex/
@@ -531,6 +542,7 @@ static struct evbuffer * replace_string(const char * in, const struct replace_ru
size_t replace_len = strlen(zone->replace_with); size_t replace_len = strlen(zone->replace_with);
assert(strlen(zone->find) != 0);
status = regcomp(&reg, zone->find, REG_EXTENDED | REG_NEWLINE); status = regcomp(&reg, zone->find, REG_EXTENDED | REG_NEWLINE);
if (status != 0) if (status != 0)
{ {
@@ -588,6 +600,7 @@ static struct evbuffer * replace_string(const char * in, const struct replace_ru
} }
p += m[0].rm_eo; p += m[0].rm_eo;
} }
if (is_replaced) if (is_replaced)
{ {
evbuffer_add(out, pre_sub_expr_end, in_sz - (pre_sub_expr_end - p)); evbuffer_add(out, pre_sub_expr_end, in_sz - (pre_sub_expr_end - p));
@@ -595,8 +608,8 @@ static struct evbuffer * replace_string(const char * in, const struct replace_ru
regfree(&reg); regfree(&reg);
return out; return out;
} }
struct evbuffer * execute_replace_rule(const char * in, size_t in_sz, struct evbuffer * execute_replace_rule(const char * in, size_t in_sz,
enum replace_zone zone, const struct replace_rule * rules, size_t n_rule) enum replace_zone zone, const struct replace_rule * rules, size_t n_rule)
{ {
@@ -618,127 +631,176 @@ struct evbuffer * execute_replace_rule(const char * in, size_t in_sz,
interator = in; interator = in;
for (i = 0; i < n_todo; i++) for (i = 0; i < n_todo; i++)
{ {
new_out = replace_string(interator, todo[i]); new_out = replace_string(interator, todo[i]);
if (new_out != NULL) if (new_out != NULL)
{ {
pre_out = out; pre_out = out;
out = new_out; out = new_out;
interator = (char *) evbuffer_pullup(out, -1); interator = (char *) evbuffer_pullup(out, -1);
if (pre_out != NULL)
{
evbuffer_free(pre_out); evbuffer_free(pre_out);
pre_out = NULL; pre_out = NULL;
} }
} }
}
return out; return out;
} }
void http_replace(const struct tfe_stream * stream, const struct tfe_http_session * session, void http_replace(const struct tfe_stream * stream, const struct tfe_http_session * session,
enum tfe_http_event events, const unsigned char * body_frag, size_t frag_size, struct pangu_http_ctx * ctx) enum tfe_http_event events, const unsigned char * body_frag, size_t frag_size, struct pangu_http_ctx * ctx)
{ {
void * interator = NULL;
struct http_field_name tmp_name;
const char * buff_in = NULL;
struct evbuffer * rewrite_url = NULL, * rewrite_buff = NULL;
struct replace_ctx * rep_ctx = NULL;
struct tfe_http_session * to_write_sess = NULL; struct tfe_http_session * to_write_sess = NULL;
to_write_sess = tfe_http_session_allow_write(session); to_write_sess = tfe_http_session_allow_write(session);
if (to_write_sess == NULL) //fail to wirte, abandon. if (to_write_sess == NULL) //fail to wirte, abandon.
{ {
TFE_STREAM_LOG_INFO(stream, "tfe_http_session_allow_write() %s failed.", session->req->req_spec.uri); TFE_STREAM_LOG_INFO(stream, "tfe_http_session_allow_write() %s failed.", session->req->req_spec.uri);
tfe_http_session_detach(session); tfe_http_session_detach(session); return;
return;
} }
struct replace_ctx * rep_ctx = ctx->rep_ctx;
if (ctx->rep_ctx == NULL) if (ctx->rep_ctx == NULL)
{
/* we must determinate the replace action on HTTP header, otherwise,
* the header has been forwarded, only replace the body but not modify header will raise exception */
if ((events & EV_HTTP_REQ_HDR) || (events & EV_HTTP_RESP_HDR))
{ {
ctx->rep_ctx = rep_ctx = ALLOC(struct replace_ctx, 1); ctx->rep_ctx = rep_ctx = ALLOC(struct replace_ctx, 1);
rep_ctx->rule = ALLOC(struct replace_rule, MAX_EDIT_ZONE_NUM); rep_ctx->rule = ALLOC(struct replace_rule, MAX_EDIT_ZONE_NUM);
rep_ctx->n_rule = format_replace_rule(ctx->enforce_para, rep_ctx->rule, MAX_EDIT_ZONE_NUM); rep_ctx->n_rule = format_replace_rule(ctx->enforce_para, rep_ctx->rule, MAX_EDIT_ZONE_NUM);
} }
if (events & EV_HTTP_REQ_HDR)
{
rewrite_url = execute_replace_rule(session->req->req_spec.uri, strlen(session->req->req_spec.uri),
kZoneRequestUri, rep_ctx->rule, rep_ctx->n_rule);
}
if ((events & EV_HTTP_REQ_HDR) | (events & EV_HTTP_RESP_HDR))
{
if (events & EV_HTTP_REQ_HDR)
{
rep_ctx->replacing = tfe_http_session_request_create(to_write_sess, session->req->req_spec.method,
rewrite_url != NULL ? (char *) evbuffer_pullup(rewrite_url, -1) : session->req->req_spec.uri);
evbuffer_free(rewrite_url);
rewrite_url = NULL;
}
else else
{ {
rep_ctx->replacing = tfe_http_session_response_create(to_write_sess, session->resp->resp_spec.resp_code); TFE_STREAM_LOG_INFO(stream, "Can only setup replace on REQ/RESP headers, detached.");
tfe_http_session_detach(session); return;
} }
while (1)
{
buff_in = tfe_http_field_iterate(session->req, &interator, &tmp_name);
if (tmp_name.field_id == TFE_HTTP_CONT_LENGTH)
{
continue;
} }
if (buff_in != NULL)
{
rewrite_buff = execute_replace_rule(buff_in, strlen(buff_in),
events & EV_HTTP_REQ_HDR ? kZoneRequestHeaders : kZoneResponseHeader, rep_ctx->rule,
rep_ctx->n_rule);
tfe_http_field_write(rep_ctx->replacing, &tmp_name,
rewrite_buff != NULL ? (char *) evbuffer_pullup(rewrite_buff, -1) : buff_in);
evbuffer_free(rewrite_buff);
rewrite_buff = NULL;
} struct tfe_http_half * in_req_half = session->req;
else struct tfe_http_half * in_resp_half = session->resp;
{ struct tfe_http_req_spec * in_req_spec = &in_req_half->req_spec;
break; struct tfe_http_resp_spec * in_resp_spec = &in_resp_half->resp_spec;
}
}
}
if ((events & EV_HTTP_REQ_BODY_BEGIN) | (events & EV_HTTP_RESP_BODY_BEGIN))
{
assert(rep_ctx->http_body == NULL);
assert(rep_ctx->body_size = 0);
rep_ctx->http_body = evbuffer_new();
}
if (body_frag != NULL)
{
evbuffer_add(rep_ctx->http_body, body_frag, frag_size);
rep_ctx->body_size++;
}
if ((events & EV_HTTP_REQ_BODY_END) | (events & EV_HTTP_RESP_BODY_END))
{
assert(rep_ctx->body_size == evbuffer_get_length(rep_ctx->http_body)); if ((events & EV_HTTP_REQ_HDR) || (events & EV_HTTP_RESP_HDR))
buff_in = (char *) evbuffer_pullup(rep_ctx->http_body, -1); {
rewrite_buff = execute_replace_rule(buff_in, rep_ctx->body_size, struct evbuffer * rewrite_uri = NULL;
events & EV_HTTP_REQ_HDR ? kZoneRequestHeaders : kZoneResponseHeader, rep_ctx->rule, rep_ctx->n_rule);
char cont_len_str[TFE_SYMBOL_MAX];
snprintf(cont_len_str, sizeof(cont_len_str), "%lu", evbuffer_get_length(rewrite_buff));
_wrap_std_field_write(rep_ctx->replacing, TFE_HTTP_CONT_LENGTH, cont_len_str);
tfe_http_half_append_body(rep_ctx->replacing,
(char *) evbuffer_pullup(rewrite_buff, -1), evbuffer_get_length(rewrite_buff), 0);
evbuffer_free(rewrite_buff);
rewrite_buff = NULL;
if (is_http_request(events)) if (is_http_request(events))
{ {
rewrite_uri = execute_replace_rule(in_req_spec->uri, strlen(in_req_spec->uri),
kZoneRequestUri, rep_ctx->rule, rep_ctx->n_rule);
rep_ctx->replacing = tfe_http_session_request_create(to_write_sess, in_req_spec->method,
rewrite_uri != NULL ? (char *) evbuffer_pullup(rewrite_uri, -1) : in_req_spec->uri);
tfe_http_session_request_set(to_write_sess, rep_ctx->replacing); tfe_http_session_request_set(to_write_sess, rep_ctx->replacing);
} }
else else
{ {
rep_ctx->replacing = tfe_http_session_response_create(to_write_sess, in_resp_spec->resp_code);
tfe_http_session_response_set(to_write_sess, rep_ctx->replacing); tfe_http_session_response_set(to_write_sess, rep_ctx->replacing);
} }
rep_ctx->replacing = NULL;//http half's ownership has been transfered to session.
if (rewrite_uri != NULL)
{
evbuffer_free(rewrite_uri);
rewrite_uri = NULL;
}
enum replace_zone zone = is_http_request(events) ? kZoneRequestHeaders : kZoneResponseHeader;
struct tfe_http_half * in_half = is_http_request(events) ? in_req_half : in_resp_half;
struct http_field_name in_header_field{};
const char * in_header_value = NULL;
void * iterator = NULL;
while (true)
{
if ((in_header_value = tfe_http_field_iterate(in_half, &iterator, &in_header_field)) == NULL)
{
break;
}
struct evbuffer * rewrite_buff = execute_replace_rule(in_header_value,
strlen(in_header_value), zone, rep_ctx->rule, rep_ctx->n_rule);
if (rewrite_buff != NULL)
{
tfe_http_field_write(rep_ctx->replacing, &in_header_field, (char *) evbuffer_pullup(rewrite_buff, -1));
}
else
{
tfe_http_field_write(rep_ctx->replacing, &in_header_field, in_header_value);
}
if (rewrite_buff != NULL)
{
evbuffer_free(rewrite_buff);
}
}
}
if ((events & EV_HTTP_REQ_BODY_BEGIN) || (events & EV_HTTP_RESP_BODY_BEGIN))
{
assert(rep_ctx->http_body == NULL);
rep_ctx->http_body = evbuffer_new();
}
if ((events & EV_HTTP_REQ_BODY_CONT) || (events & EV_HTTP_RESP_BODY_CONT))
{
evbuffer_add(rep_ctx->http_body, body_frag, frag_size);
}
if ((events & EV_HTTP_REQ_BODY_END) || (events & EV_HTTP_RESP_BODY_END))
{
char * __http_body = (char *) evbuffer_pullup(rep_ctx->http_body, -1);
size_t __http_body_len = evbuffer_get_length(rep_ctx->http_body);
enum replace_zone replace_zone = is_http_request(events) ? kZoneRequestHeaders : kZoneResponseHeader;
struct evbuffer * rewrite_buff;
if (is_http_request(events))
{
rewrite_buff = execute_replace_rule(__http_body, __http_body_len, kZoneRequestBody,
rep_ctx->rule, rep_ctx->n_rule);
}
else
{
rewrite_buff = execute_replace_rule(__http_body, __http_body_len, kZoneResponseBody,
rep_ctx->rule, rep_ctx->n_rule);
}
if (rewrite_buff != NULL)
{
char * __rewrite_buff = (char *) evbuffer_pullup(rewrite_buff, -1);
size_t __sz_rewrite_buff = evbuffer_get_length(rewrite_buff);
tfe_http_half_append_body(rep_ctx->replacing, __rewrite_buff, __sz_rewrite_buff, 0);
}
else
{
tfe_http_half_append_body(rep_ctx->replacing, __http_body, __http_body_len, 0);
}
if (rewrite_buff != NULL)
{
evbuffer_free(rewrite_buff);
rewrite_buff = NULL;
}
if (rep_ctx->http_body != NULL)
{
evbuffer_free(rep_ctx->http_body); evbuffer_free(rep_ctx->http_body);
rep_ctx->http_body = NULL; rep_ctx->http_body = NULL;
rep_ctx->body_size = 0;
} }
return; }
if ((events & EV_HTTP_REQ_END) || (events & EV_HTTP_RESP_END))
{
tfe_http_half_append_body(rep_ctx->replacing, NULL, 0, 0);
rep_ctx->replacing = NULL;
}
} }
static void http_reject(const struct tfe_http_session * session, enum tfe_http_event events, static void http_reject(const struct tfe_http_session * session, enum tfe_http_event events,
struct pangu_http_ctx * ctx) struct pangu_http_ctx * ctx)
{ {
@@ -766,6 +828,7 @@ static void http_reject(const struct tfe_http_session * session, enum tfe_http_e
_wrap_std_field_write(response, TFE_HTTP_CONT_LENGTH, cont_len_str); _wrap_std_field_write(response, TFE_HTTP_CONT_LENGTH, cont_len_str);
tfe_http_half_append_body(response, page_buff, page_size, 0); tfe_http_half_append_body(response, page_buff, page_size, 0);
tfe_http_half_append_body(response, NULL, 0, 0);
tfe_http_session_response_set(to_write_sess, response); tfe_http_session_response_set(to_write_sess, response);
tfe_http_session_detach(session); tfe_http_session_detach(session);
@@ -800,6 +863,7 @@ static void http_redirect(const struct tfe_http_session * session, enum tfe_http
response = tfe_http_session_response_create(to_write, resp_code); response = tfe_http_session_response_create(to_write, resp_code);
_wrap_std_field_write(response, TFE_HTTP_LOCATION, url); _wrap_std_field_write(response, TFE_HTTP_LOCATION, url);
tfe_http_half_append_body(response, NULL, 0, 0);
tfe_http_session_response_set(to_write, response); tfe_http_session_response_set(to_write, response);
tfe_http_session_detach(session); tfe_http_session_detach(session);

View File

@@ -67,6 +67,7 @@ struct http_half_private
/* default stream action */ /* default stream action */
enum tfe_stream_action stream_action; enum tfe_stream_action stream_action;
enum tfe_stream_action user_stream_action; enum tfe_stream_action user_stream_action;
bool is_user_stream_action_set;
/* Setup by User */ /* Setup by User */
bool is_setup_by_user; bool is_setup_by_user;

View File

@@ -56,7 +56,8 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len) struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len)
{ {
struct http_session_private * hs_private = TAILQ_LAST(&hc_private->hs_private_list, hs_private_list); struct http_session_private * hs_private = TAILQ_LAST(&hc_private->hs_private_list, hs_private_list);
struct http_half_private * hf_private_request = to_hf_request_private(hs_private); struct http_half_private * hf_private_req_in = to_hf_request_private(hs_private);
struct http_half_private * hf_private_req_user;
/* tfe_hexdump(stderr, __FUNCTION__, data, (unsigned int)len); */ /* tfe_hexdump(stderr, __FUNCTION__, data, (unsigned int)len); */
int ret = 0; int ret = 0;
@@ -65,12 +66,12 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
/* There is no available in session list, /* There is no available in session list,
* that indicate all HTTP request has corresponding response, * that indicate all HTTP request has corresponding response,
* or the last request is finished, we need to create a new session. */ * or the last request is finished, we need to create a new session. */
if (hs_private == NULL || hf_private_request->message_status == STATUS_COMPLETE) if (hs_private == NULL || hf_private_req_in->message_status == STATUS_COMPLETE)
{ {
/* HTTP Request and Session */ /* HTTP Request and Session */
hf_private_request = hf_private_create(TFE_HTTP_REQUEST, 1, 0); hf_private_req_in = hf_private_create(TFE_HTTP_REQUEST, 1, 0);
hs_private = hs_private_create(hc_private, hf_private_request, NULL); hs_private = hs_private_create(hc_private, hf_private_req_in, NULL);
hf_private_set_session(hf_private_request, hs_private); hf_private_set_session(hf_private_req_in, hs_private);
/* Closure, catch stream, session and thread_id */ /* Closure, catch stream, session and thread_id */
struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1); struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1);
@@ -79,7 +80,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
__closure->session = to_hs_public(hs_private); __closure->session = to_hs_public(hs_private);
/* Set callback, this callback used to raise business event */ /* Set callback, this callback used to raise business event */
hf_private_set_callback(hf_private_request, __user_event_dispatch, __closure, free); hf_private_set_callback(hf_private_req_in, __user_event_dispatch, __closure, free);
/* Call business plugin */ /* Call business plugin */
hs_private->ht_frame = http_frame_raise_session_begin(stream, &hs_private->hs_public, thread_id); hs_private->ht_frame = http_frame_raise_session_begin(stream, &hs_private->hs_public, thread_id);
@@ -93,41 +94,57 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
} }
/* Parse the content, the data which in defered state has been ignored. */ /* Parse the content, the data which in defered state has been ignored. */
ret = hf_private_parse(hf_private_request, data, len); ret = hf_private_parse(hf_private_req_in, data, len);
/* Need more data, no boundary touched */ /* Need more data, no boundary touched */
if (ret == 0) if (ret == 0)
{ {
if (hf_private_request->stream_action == ACTION_DROP_DATA || if (hf_private_req_in->stream_action == ACTION_DROP_DATA ||
hf_private_request->stream_action == ACTION_FORWARD_DATA) hf_private_req_in->stream_action == ACTION_FORWARD_DATA)
{ {
hf_private_request->parse_cursor = 0; hf_private_req_in->parse_cursor = 0;
} }
return hf_private_request->stream_action; return hf_private_req_in->stream_action;
} }
/* Some kind of error happened, write log and detach the stream */ /* Some kind of error happened, write log and detach the stream */
if (ret == -1) if (ret == -1)
{ {
TFE_STREAM_LOG_ERROR(stream, "Failed at parsing stream as HTTP: %u, %s, %s", TFE_STREAM_LOG_ERROR(stream, "Failed at parsing stream as HTTP: %u, %s, %s",
hf_private_request->parse_errno, http_errno_name(hf_private_request->parse_errno), hf_private_req_in->parse_errno, http_errno_name(hf_private_req_in->parse_errno),
http_errno_description(hf_private_request->parse_errno)); http_errno_description(hf_private_req_in->parse_errno));
goto __errout; goto __errout;
} }
/* Touch a boundary, such as the end of HTTP headers, bodys, et al. */ /* Touch a boundary, such as the end of HTTP headers, bodys, et al. */
__action_byptes = hf_private_request->parse_cursor; __action_byptes = hf_private_req_in->parse_cursor;
hf_private_request->parse_cursor = 0; hf_private_req_in->parse_cursor = 0;
if (hf_private_request->stream_action == ACTION_FORWARD_DATA) hf_private_req_user = hs_private->hf_private_req_user;
if (hf_private_req_user != NULL && hf_private_req_user->message_status == STATUS_COMPLETE)
{
/* Construct, and write response immediately */
hf_private_construct(hf_private_req_user);
size_t __to_write_len = evbuffer_get_length(hf_private_req_user->evbuf_raw);
unsigned char * __to_write = evbuffer_pullup(hf_private_req_user->evbuf_raw, __to_write_len);
/* Write the data to stream, UPSTREAM is the incoming direction for response */
ret = tfe_stream_write(stream, CONN_DIR_UPSTREAM, __to_write, __to_write_len);
if (unlikely(ret < 0)) { assert(0); }
hf_private_destory(hf_private_req_user);
hs_private->hf_private_resp_user = NULL;
}
if (hf_private_req_in->stream_action == ACTION_FORWARD_DATA)
{ {
tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_byptes, sizeof(__action_byptes)); tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_byptes, sizeof(__action_byptes));
return ACTION_FORWARD_DATA; return ACTION_FORWARD_DATA;
} }
if (hf_private_request->stream_action == ACTION_DROP_DATA) if (hf_private_req_in->stream_action == ACTION_DROP_DATA)
{ {
tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_byptes, sizeof(__action_byptes)); tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_byptes, sizeof(__action_byptes));
return ACTION_DROP_DATA; return ACTION_DROP_DATA;
@@ -158,8 +175,6 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre
} }
hf_private_resp_in = to_hf_response_private(hs_private); hf_private_resp_in = to_hf_response_private(hs_private);
hf_private_resp_user = hs_private->hf_private_resp_user;
/* First time parse http response */ /* First time parse http response */
if (hf_private_resp_in == NULL) if (hf_private_resp_in == NULL)
{ {
@@ -172,15 +187,6 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre
hs_private_hf_private_set(hs_private, hf_private_resp_in, TFE_HTTP_RESPONSE); hs_private_hf_private_set(hs_private, hf_private_resp_in, TFE_HTTP_RESPONSE);
hf_private_set_session(hf_private_resp_in, hs_private); hf_private_set_session(hf_private_resp_in, hs_private);
if (hf_private_resp_user != NULL)
{
/* Set nothing callback, dont call user callback because the data need to be droped */
hf_private_set_callback(hf_private_resp_in, NULL, NULL, NULL);
/* Drop all data, because the user's response need to be send */
hf_private_resp_in->stream_action = ACTION_DROP_DATA;
}
else
{
/* Closure, catch stream, session and thread_id */ /* Closure, catch stream, session and thread_id */
struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1); struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1);
__closure->thread_id = thread_id; __closure->thread_id = thread_id;
@@ -189,24 +195,6 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre
/* Set callback, this callback used to raise business event */ /* Set callback, this callback used to raise business event */
hf_private_set_callback(hf_private_resp_in, __user_event_dispatch, __closure, free); hf_private_set_callback(hf_private_resp_in, __user_event_dispatch, __closure, free);
/* Inherit user stream action, this action can affact session's behavior */
hf_private_resp_in->user_stream_action = to_hf_request_private(hs_private)->user_stream_action;
}
}
if (hf_private_resp_user != NULL)
{
/* Construct, and write response immediately */
hf_private_construct(hf_private_resp_user);
size_t __to_write_len = evbuffer_get_length(hf_private_resp_user->evbuf_raw);
unsigned char * __to_write = evbuffer_pullup(hf_private_resp_user->evbuf_raw, __to_write_len);
/* Write the data to stream, UPSTREAM is the incoming direction for response */
ret = tfe_stream_write(stream, CONN_DIR_DOWNSTREAM, __to_write, __to_write_len);
if (unlikely(ret < 0)) { assert(0); }
hf_private_destory(hf_private_resp_user);
hs_private->hf_private_resp_user = NULL;
} }
/* Parse the content, the data which in defered state has been ignored. */ /* Parse the content, the data which in defered state has been ignored. */
@@ -234,6 +222,22 @@ enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stre
goto __errout; goto __errout;
} }
hf_private_resp_user = hs_private->hf_private_resp_user;
if (hf_private_resp_user != NULL && hf_private_resp_user->message_status == STATUS_COMPLETE)
{
/* Construct, and write response immediately */
hf_private_construct(hf_private_resp_user);
size_t __to_write_len = evbuffer_get_length(hf_private_resp_user->evbuf_raw);
unsigned char * __to_write = evbuffer_pullup(hf_private_resp_user->evbuf_raw, __to_write_len);
/* Write the data to stream, UPSTREAM is the incoming direction for response */
ret = tfe_stream_write(stream, CONN_DIR_DOWNSTREAM, __to_write, __to_write_len);
if (unlikely(ret < 0)) { assert(0); }
hf_private_destory(hf_private_resp_user);
hs_private->hf_private_resp_user = NULL;
}
if (hf_private_resp_in->message_status == STATUS_COMPLETE) if (hf_private_resp_in->message_status == STATUS_COMPLETE)
{ {
http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id); http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id);
@@ -290,7 +294,7 @@ enum tfe_stream_action http_connection_entry_data(const struct tfe_stream * stre
/* Now, we want to identify this stream */ /* Now, we want to identify this stream */
int ret = __http_connection_identify(stream, ht_conn, data, len); int ret = __http_connection_identify(stream, ht_conn, data, len);
if (ret != 0) goto __detach; if (ret < 0) goto __detach;
/* This is HTTP, try to preempt the stream /* This is HTTP, try to preempt the stream
* It may be failed because other plugin preempted before us */ * It may be failed because other plugin preempted before us */

View File

@@ -252,7 +252,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
hf_public->minor_version = parser->http_minor; hf_public->minor_version = parser->http_minor;
/* Copy version to session */ /* Copy version to session */
if(hf_private->session != NULL) if (hf_private->session != NULL)
{ {
to_hs_public(hf_private->session)->major_version = hf_public->major_version; to_hs_public(hf_private->session)->major_version = hf_public->major_version;
to_hs_public(hf_private->session)->minor_version = hf_public->minor_version; to_hs_public(hf_private->session)->minor_version = hf_public->minor_version;
@@ -277,7 +277,15 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
hf_private->event_cb(hf_private, EV_HTTP_RESP_HDR, NULL, 0, hf_private->event_cb_user); hf_private->event_cb(hf_private, EV_HTTP_RESP_HDR, NULL, 0, hf_private->event_cb_user);
} }
if (hf_private->is_user_stream_action_set)
{
hf_private->stream_action = hf_private->user_stream_action;
}
else
{
hf_private->stream_action = ACTION_FORWARD_DATA; hf_private->stream_action = ACTION_FORWARD_DATA;
}
return 0; return 0;
} }
@@ -423,7 +431,19 @@ const char * hf_ops_field_iterate(const struct tfe_http_half * half, void ** ite
int hf_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, int flag) int hf_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, int flag)
{ {
struct http_half_private * hf_private = to_hf_private(half); struct http_half_private * hf_private = to_hf_private(half);
if (hf_private->evbuf_body == NULL) { hf_private->evbuf_body = evbuffer_new(); }
/* Indicate the body is finished */
if (buff == NULL && size == 0)
{
hf_private->message_status = STATUS_COMPLETE;
return 0;
}
if (hf_private->evbuf_body == NULL)
{
hf_private->evbuf_body = evbuffer_new();
}
return evbuffer_add(hf_private->evbuf_body, buff, size); return evbuffer_add(hf_private->evbuf_body, buff, size);
} }
@@ -517,7 +537,7 @@ int hf_private_parse(struct http_half_private * hf_private, const unsigned char
if (sz_parsed == len) if (sz_parsed == len)
{ {
hf_private->parse_cursor += sz_parsed; hf_private->parse_cursor += sz_parsed;
return 0; return HTTP_PARSER_ERRNO(hf_private->parse_object) == HPE_PAUSED ? 1 : 0;
} }
/* The paused parsar indicate the message boundary has been touched, we should return. /* The paused parsar indicate the message boundary has been touched, we should return.
@@ -550,19 +570,48 @@ void hs_ops_drop(struct tfe_http_session * session)
return; return;
} }
void hs_ops_request_set(struct tfe_http_session * session, struct tfe_http_half * req) // TODO: change the return type to int, there is something happend where -1 returned.
void hs_ops_request_set(struct tfe_http_session * session, struct tfe_http_half * req_user)
{ {
struct http_half_private * hf_private = to_hf_private(req);
struct http_session_private * hs_private = to_hs_private(session); struct http_session_private * hs_private = to_hs_private(session);
struct http_half_private * hf_in_private = to_hf_request_private(hs_private);
struct http_half_private * hf_user_private = to_hf_private(req_user);
assert(hs_private->hf_private_req_user != NULL); if (hf_in_private != NULL)
hs_private->hf_private_req_user = hf_private; {
if (hf_in_private->stream_action == ACTION_DEFER_DATA)
{
hf_in_private->user_stream_action = ACTION_DROP_DATA;
hf_in_private->is_user_stream_action_set = true;
}
else
{
assert(0);
}
}
assert(hs_private->hf_private_req_user == NULL);
hs_private->hf_private_req_user = hf_user_private;
} }
void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half * resp) void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half * resp)
{ {
struct http_half_private * hf_private = to_hf_private(resp); struct http_half_private * hf_private = to_hf_private(resp);
struct http_session_private * hs_private = to_hs_private(session); struct http_session_private * hs_private = to_hs_private(session);
struct http_half_private * hf_in_private = to_hf_response_private(hs_private);
if (hf_in_private != NULL)
{
if (hf_in_private->stream_action == ACTION_DEFER_DATA)
{
hf_in_private->user_stream_action = ACTION_DROP_DATA;
hf_in_private->is_user_stream_action_set = true;
}
else
{
assert(0);
}
}
assert(hs_private->hf_private_resp_user == NULL); assert(hs_private->hf_private_resp_user == NULL);
hs_private->hf_private_resp_user = hf_private; hs_private->hf_private_resp_user = hf_private;
@@ -603,7 +652,18 @@ struct tfe_http_session_ops __http_session_ops =
void __construct_request_line(struct http_half_private * hf_private) void __construct_request_line(struct http_half_private * hf_private)
{ {
enum tfe_http_std_method __std_method = (enum tfe_http_std_method ) hf_private->method_or_status;
const char * __str_method = http_std_method_to_string(__std_method);
if (__str_method == NULL)
{
__str_method = "";
}
hf_private->major = 1;
hf_private->minor = 1;
evbuffer_add_printf(hf_private->evbuf_raw, "%s %s HTTP/%d.%d\r\n",
__str_method, hf_private->url_storage, hf_private->major, hf_private->minor);
} }
void __construct_response_line(struct http_half_private * hf_private) void __construct_response_line(struct http_half_private * hf_private)
@@ -615,6 +675,9 @@ void __construct_response_line(struct http_half_private * hf_private)
__str_resp_code = ""; __str_resp_code = "";
} }
hf_private->major = 1;
hf_private->minor = 1;
evbuffer_add_printf(hf_private->evbuf_raw, "HTTP/%d.%d %d %s\r\n", evbuffer_add_printf(hf_private->evbuf_raw, "HTTP/%d.%d %d %s\r\n",
hf_private->major, hf_private->minor, __resp_code, __str_resp_code); hf_private->major, hf_private->minor, __resp_code, __str_resp_code);
} }