diff --git a/common/include/tfe_http.h b/common/include/tfe_http.h index f6846a1..6a2faaa 100644 --- a/common/include/tfe_http.h +++ b/common/include/tfe_http.h @@ -188,7 +188,7 @@ struct http_field_name /* Standard Field */ enum tfe_http_std_field field_id; /* Non-NULL when field_id is HTTP_UNKNOWN_FIELD */ - char * field_name; + const char * field_name; }; typedef void (http_session_begin_cb)(const struct tfe_stream * stream, @@ -334,14 +334,23 @@ rpc_finish_cb_ }; */ - /* TODO: transfer these declaration to ht_frame.h */ -struct ht_frame_session_ctx * http_frame_raise_session_begin(const tfe_stream * stream, +struct http_frame_plugin_status +{ + void * pme; + unsigned int detached; + unsigned int preempt; +}; + +struct http_frame_session_ctx * http_frame_raise_session_begin(const tfe_stream * stream, struct tfe_http_session * ht_session, unsigned int thread_id); -void http_frame_raise_session_end(struct ht_frame_session_ctx * ss_ctx, struct tfe_stream * stream, +void http_frame_raise_session_end(struct http_frame_session_ctx * ht_frame, struct tfe_stream * stream, struct tfe_http_session * ht_session, unsigned int thread_id); -void http_frame_raise_event(struct ht_frame_session_ctx * ht_frame, - const tfe_stream * stream, struct tfe_http_session * ht_session, enum tfe_http_event event, - const unsigned char * data, size_t datalen, unsigned int thread_id); +void http_frame_raise_event(struct http_frame_session_ctx * ht_frame, + const struct tfe_stream * stream, struct tfe_http_session * ht_session, enum tfe_http_event event, + const unsigned char * data, size_t datalen, unsigned int thread_id) + +void http_frame_currect_plugin_detach(struct http_frame_session_ctx * ht_frame); +int http_frame_currect_plugin_preempt(struct http_frame_session_ctx * ht_frame); diff --git a/common/src/tfe_http.cpp b/common/src/tfe_http.cpp index bb8d170..42b0a89 100644 --- a/common/src/tfe_http.cpp +++ b/common/src/tfe_http.cpp @@ -14,34 +14,51 @@ struct tfe_http_half * tfe_http_response_create(int major_version, int resp_code return NULL; } -struct ht_frame_session_ctx +struct http_frame_session_ctx { - void ** pmes; - unsigned int nr_pmes; - struct tfe_plugin * preempt_plugin; + struct http_frame_plugin_status * plugin_status; + unsigned int nr_plugin_status; + + struct tfe_plugin * calling_plugin; + struct http_frame_plugin_status * calling_plugin_status; }; -struct ht_frame_session_ctx * http_frame_raise_session_begin(const struct tfe_stream * stream, +struct http_frame_plugin_status * http_frame_current_plugin_status(struct http_frame_session_ctx * ht_frame) +{ + return ht_frame->calling_plugin_status; +} + +struct http_frame_session_ctx * http_frame_raise_session_begin(const struct tfe_stream * stream, struct tfe_http_session * ht_session, unsigned int thread_id) { - struct ht_frame_session_ctx * ss_ctx = ALLOC(struct ht_frame_session_ctx, 1); - ss_ctx->nr_pmes = tfe_plugin_total_counts(); - ss_ctx->pmes = ALLOC(void *, ss_ctx->nr_pmes); + struct http_frame_session_ctx * ht_frame = ALLOC(struct http_frame_session_ctx, 1); + ht_frame->nr_plugin_status = tfe_plugin_total_counts(); + ht_frame->plugin_status = ALLOC(struct http_frame_plugin_status, ht_frame->nr_plugin_status); unsigned int __for_each_iterator = 0; unsigned int __plugin_id = 0; - struct tfe_plugin * plugin_info_iter; + struct tfe_plugin * plugin_info_iter; TFE_PLUGIN_FOREACH(plugin_info_iter, &__for_each_iterator) { if (plugin_info_iter->on_session_begin == NULL) continue; - plugin_info_iter->on_session_begin(stream, ht_session, thread_id, &ss_ctx->pmes[__plugin_id]); + + /* Calling ctx, in callback can fetch by calling frame_plugin_status_get_XXX */ + ht_frame->calling_plugin = plugin_info_iter; + ht_frame->calling_plugin_status = &ht_frame->plugin_status[__plugin_id]; + + /* Call session begin */ + void ** calling_pme = &ht_frame->calling_plugin_status->pme; + plugin_info_iter->on_session_begin(stream, ht_session, thread_id, calling_pme); } - return ss_ctx; + /* Clear calling ctx */ + ht_frame->calling_plugin = NULL; + ht_frame->calling_plugin_status = NULL; + return ht_frame; }; -void http_frame_raise_session_end(struct ht_frame_session_ctx * ss_ctx, struct tfe_stream * stream, +void http_frame_raise_session_end(struct http_frame_session_ctx * ht_frame, struct tfe_stream * stream, struct tfe_http_session * ht_session, unsigned int thread_id) { unsigned int __for_each_iterator = 0; @@ -51,14 +68,24 @@ void http_frame_raise_session_end(struct ht_frame_session_ctx * ss_ctx, struct t TFE_PLUGIN_FOREACH(plugin_info_iter, &__for_each_iterator) { if (plugin_info_iter->on_session_end == NULL) continue; - plugin_info_iter->on_session_end(stream, ht_session, thread_id, &ss_ctx->pmes[__plugin_id]); + + /* Calling ctx, in callback can fetch by calling frame_plugin_status_get_XXX */ + ht_frame->calling_plugin = plugin_info_iter; + ht_frame->calling_plugin_status = &ht_frame->plugin_status[__plugin_id]; + + /* Call session end */ + void ** calling_pme = &ht_frame->calling_plugin_status->pme; + plugin_info_iter->on_session_end(stream, ht_session, thread_id, calling_pme); } - free(ss_ctx->pmes); - free(ss_ctx); + ht_frame->calling_plugin = NULL; + ht_frame->calling_plugin_status = NULL; + + free(ht_frame->plugin_status); + free(ht_frame); } -void http_frame_raise_event(struct ht_frame_session_ctx * ht_frame, +void http_frame_raise_event(struct http_frame_session_ctx * ht_frame, const struct tfe_stream * stream, struct tfe_http_session * ht_session, enum tfe_http_event event, const unsigned char * data, size_t datalen, unsigned int thread_id) { @@ -68,12 +95,34 @@ void http_frame_raise_event(struct ht_frame_session_ctx * ht_frame, struct tfe_plugin * plugin_info_iter; TFE_PLUGIN_FOREACH(plugin_info_iter, &__for_each_iterator) { - if (plugin_info_iter->on_session_data == NULL) - continue; + if (plugin_info_iter->on_session_data == NULL) continue; - plugin_info_iter->on_session_data(stream, ht_session, event, data, - datalen, thread_id, &ht_frame->pmes[__plugin_id]); + /* Calling ctx, in callback can fetch by calling frame_plugin_status_get_XXX */ + ht_frame->calling_plugin = plugin_info_iter; + ht_frame->calling_plugin_status = &ht_frame->plugin_status[__plugin_id]; + + void ** calling_pme = &ht_frame->calling_plugin_status->pme; + plugin_info_iter->on_session_data(stream, ht_session, event, data, datalen, thread_id, calling_pme); } - return; + ht_frame->calling_plugin = NULL; + ht_frame->calling_plugin_status = NULL; +} + +void http_frame_currect_plugin_detach(struct http_frame_session_ctx * ht_frame) +{ + ht_frame->calling_plugin_status->detached = 1; +} + +int http_frame_currect_plugin_preempt(struct http_frame_session_ctx * ht_frame) +{ + for(unsigned int i = 0; i < ht_frame->nr_plugin_status; i++) + { + struct http_frame_plugin_status * __plugin_status = &ht_frame->plugin_status[i]; + if (__plugin_status->preempt) return -1; + } + + assert(ht_frame->calling_plugin_status != NULL); + ht_frame->calling_plugin_status->preempt = 1; + return 0; } diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 8fd4dd5..dc6606d 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -84,8 +84,7 @@ int tfe_stream_preempt(const struct tfe_stream * stream) { struct tfe_stream_private * _stream = to_stream_private(stream); int plug_id = _stream->calling_idx; - int i = 0; - for (i = 0; i < _stream->nr_plugin_ctxs; i++) + for (unsigned int i = 0; i < _stream->nr_plugin_ctxs; i++) { if (_stream->plugin_ctxs[i].state == PLUG_STATE_PREEPTION) { diff --git a/plugin/protocol/http/include/internal/http_common.h b/plugin/protocol/http/include/internal/http_common.h index 3feecb2..1f4cfde 100644 --- a/plugin/protocol/http/include/internal/http_common.h +++ b/plugin/protocol/http/include/internal/http_common.h @@ -6,6 +6,7 @@ extern "C" } #include +#include #include struct http_plugin @@ -13,12 +14,22 @@ struct http_plugin }; TAILQ_HEAD(hs_private_list, http_session_private); + +struct http_plugin_status +{ + +}; + struct http_session_private { + /* PUBLIC */ struct tfe_http_session hs_public; + /* TAILQ POINTER */ TAILQ_ENTRY(http_session_private) next; + /* REF, HTTP CONNECTION */ struct http_connection_private * hc_private; - struct ht_frame_session_ctx * ht_frame; + /* HTTP FRAME CTX */ + struct http_frame_session_ctx * ht_frame; }; struct http_connection_private @@ -35,79 +46,46 @@ struct http_connection_private unsigned int session_id_counter; }; -TAILQ_HEAD(http_header_private_list, http_header_private); -struct http_header_private -{ - TAILQ_ENTRY(http_header_private) next; - struct http_field_name * field; - char * value; -}; - -enum hf_private_status -{ - STATUS_INIT, - STATUS_READING, - STATUS_COMPLETE, -}; - -struct http_half_private -{ - /* PUBLIC STRUCTURE */ - struct tfe_http_half hf_public; - /* SESSION OF THIS REQUEST/RESPONSE */ - struct http_session_private * session; - /* HTTP PARSER */ - struct http_parser * parse_object; - /* HTTP PARSER SETTING */ - struct http_parser_settings * parse_settings; - /* CURSOR RELATED TO DELAYED DATA */ - size_t parse_cursor; - /* HTTP PARSER'S ERRNO */ - enum http_errno parse_errno; - /* HEADER K-V */ - struct http_header_private_list header_list; - - /* UNDERLAY BUFFER */ - int method_or_status; - short major; - short minor; - - struct evbuffer * evbuf_uri; - char * underlay_uri; - char * underlay_url; - - struct evbuffer * evbuf_header_field; - struct evbuffer * evbuf_header_value; - struct evbuffer * evbuf_body; - - enum hf_private_status status_header; - enum hf_private_status status_body; - enum hf_private_status status_message; -}; - static inline struct http_half_private * to_hf_request_private(struct http_session_private * hs_private) { + if(hs_private == NULL) return NULL; struct tfe_http_half * hf_public = hs_private->hs_public.req; return container_of(hf_public, struct http_half_private, hf_public); } static inline struct http_half_private * to_hf_response_private(struct http_session_private * hs_private) { + if(hs_private == NULL) return NULL; struct tfe_http_half * hf_public = hs_private->hs_public.resp; return container_of(hf_public, struct http_half_private, hf_public); } static inline struct tfe_http_half * to_hf_public(struct http_half_private * hf_private) { + if(hf_private == NULL) return NULL; return &hf_private->hf_public; } static inline struct http_half_private * to_hf_private(struct tfe_http_half * hf_public) { + if(hf_public == NULL) return NULL; return container_of(hf_public, struct http_half_private, hf_public); } static inline const struct http_half_private * to_hf_private(const struct tfe_http_half * hf_public) { + if(hf_public == NULL) return NULL; return container_of(hf_public, struct http_half_private, hf_public); } + +static inline const struct tfe_http_session * to_hs_public(const struct http_session_private * hs_private) +{ + if (hs_private == NULL) return NULL; + return &hs_private->hs_public; +} + +static inline struct http_session_private * to_hs_private(struct tfe_http_session * hs_public) +{ + if (hs_public == NULL) return NULL; + return container_of(hs_public, struct http_session_private, hs_public); +} diff --git a/plugin/protocol/http/include/internal/http_half.h b/plugin/protocol/http/include/internal/http_half.h index 684215d..cbe6f61 100644 --- a/plugin/protocol/http/include/internal/http_half.h +++ b/plugin/protocol/http/include/internal/http_half.h @@ -1,5 +1,68 @@ #pragma once +#include +#include +#include + +typedef int (hf_private_cb)(struct http_half_private * hf_private, + tfe_http_event ev, const unsigned char * data, size_t len, void * user); + +enum hf_private_status +{ + STATUS_INIT, + STATUS_READING, + STATUS_COMPLETE, +}; + +struct http_header_private +{ + TAILQ_ENTRY(http_header_private) next; + struct http_field_name * field; + char * value; +}; + +TAILQ_HEAD(http_header_private_list, http_header_private); +struct http_half_private +{ + /* PUBLIC STRUCTURE */ + struct tfe_http_half hf_public; + /* SESSION OF THIS REQUEST/RESPONSE */ + struct http_session_private * session; + /* Callback Function */ + hf_private_cb * event_cb; + /* Callback Function User Pointer */ + void * event_cb_user; + /* Callback Function User Pointer Deleter */ + void (* event_cb_user_deleter)(void *); + /* HTTP PARSER */ + struct http_parser * parse_object; + /* HTTP PARSER SETTING */ + struct http_parser_settings * parse_settings; + /* CURSOR RELATED TO DELAYED DATA */ + size_t parse_cursor; + /* HTTP PARSER'S ERRNO */ + enum http_errno parse_errno; + /* HEADER K-V */ + struct http_header_private_list header_list; + + /* UNDERLAY BUFFER */ + int method_or_status; + short major; + short minor; + + struct evbuffer * evbuf_uri; + struct evbuffer * evbuf_header_field; + struct evbuffer * evbuf_header_value; + struct evbuffer * evbuf_body; + + enum hf_private_status body_status; + enum hf_private_status message_status; + + /* default stream action */ + enum tfe_stream_action stream_action; + enum tfe_stream_action user_stream_action; +}; + struct http_half_private * hf_private_create(tfe_http_direction ht_dir, short major, short minor); void hf_private_destory(struct http_half_private * hf_private); @@ -16,3 +79,9 @@ void hf_private_destory(struct http_half_private * hf_private); * -1 for error. */ int hf_private_parse(struct http_half_private * hf_private, const unsigned char * data, size_t len); + +void hf_private_set_callback(struct http_half_private * hf_private, hf_private_cb * cb, + void * user, void (* fn_user_deleter)(void *)); + +struct http_session_private * hs_private_create(struct http_connection_private * hc_private, + struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp); diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index 283655a..0f25617 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -23,92 +23,78 @@ void http_plugin_deinit(struct tfe_proxy * proxy) return; } -struct http_session_private * hs_private_create(struct http_connection_private * hc_private, - struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp) -{ - struct http_session_private * __hs_private = ALLOC(struct http_session_private, 1); - - /* HS-PUBLIC */ - __hs_private->hs_public.req = hf_private_req != NULL ? to_hf_public(hf_private_req) : NULL; - __hs_private->hs_public.resp = hf_private_req != NULL ? to_hf_public(hf_private_resp) : NULL; - __hs_private->hs_public.session_id = hc_private->session_id_counter++; - - /* HS-PRIVATE*/ - __hs_private->hc_private = hc_private; - return __hs_private; -} - -void hs_private_destory(struct http_session_private * hs_private) -{ - free(hs_private); -} - -static void __SET_PME_HC_PRIVATE(struct http_connection_private * h_conn, void ** pme) -{ - *pme = (void *) h_conn; -} - -static struct http_connection_private * __GET_PME_HC_PRIVATE(void ** pme) -{ - return (struct http_connection_private *) (*pme); -} - int http_connection_entry_open(const struct tfe_stream * stream, unsigned int thread_id, enum tfe_conn_dir dir, void ** pme) { struct http_connection_private * ht_conn = ALLOC(struct http_connection_private, 1); TAILQ_INIT(&ht_conn->hs_private_list); - __SET_PME_HC_PRIVATE(ht_conn, pme); + *pme = (void *)ht_conn; return 0; } +struct user_event_dispatch_closure +{ + const struct tfe_stream * stream; + const struct tfe_http_session * session; + unsigned int thread_id; +}; + +static int __user_event_dispatch(struct http_half_private * hf_private, + enum tfe_http_event ev, const unsigned char * data, size_t len, void * user) +{ + struct user_event_dispatch_closure * __closure = (struct user_event_dispatch_closure *)user; + struct http_frame_session_ctx* ht_frame = hf_private->session->ht_frame; + + http_frame_raise_event(ht_frame, __closure->stream, __closure->session, + ev, data, len, __closure->thread_id); return 0; +} + enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_stream * stream, - struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len) + struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len) { struct http_session_private * hs_private = TAILQ_LAST(&hc_private->hs_private_list, hs_private_list); - struct http_half_private * hf_private_request = NULL; + struct http_half_private * hf_private_request = to_hf_request_private(hs_private); /* tfe_hexdump(stderr, __FUNCTION__, data, (unsigned int)len); */ int ret = 0; + size_t __action_byptes; /* There is no available in session list, - * that indicate all HTTP request has corresponding response */ - if (hs_private == NULL) + * that indicate all HTTP request has corresponding response, + * or the last request is finished, we need to create a new session. */ + if (hs_private == NULL || hf_private_request->message_status == STATUS_COMPLETE) { - goto __new_session; + /* HTTP Request and Session */ + hf_private_request = hf_private_create(TFE_HTTP_REQUEST, 1, 0); + hs_private = hs_private_create(hc_private, hf_private_request, NULL); + + /* Closure, catch stream, session and thread_id */ + struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1); + __closure->thread_id = thread_id; + __closure->stream = stream; + __closure->session = to_hs_public(hs_private); + + /* Set callback, this callback used to raise business event */ + hf_private_set_callback(hf_private_request, __user_event_dispatch, __closure, free); + + /* Call business plugin */ + hs_private->ht_frame = http_frame_raise_session_begin(stream, &hs_private->hs_public, thread_id); + if (hs_private->ht_frame == NULL) + { + TFE_STREAM_LOG_ERROR(stream, "Failed at raising session begin event. "); + goto __errout; + } + + TAILQ_INSERT_TAIL(&hc_private->hs_private_list, hs_private, next); } - /* The last request is finished, we need to create a new session, - * or proceed parse content for last request */ - hf_private_request = to_hf_request_private(hs_private); - if (hf_private_request->status_message == STATUS_COMPLETE) - { - goto __new_session; - } - - goto __parse; - -__new_session: - hf_private_request = hf_private_create(TFE_HTTP_REQUEST, 1, 0); - hs_private = hs_private_create(hc_private, hf_private_request, NULL); - - /* Call business plugin */ - hs_private->ht_frame = http_frame_raise_session_begin(stream, &hs_private->hs_public, thread_id); - if(hs_private->ht_frame == NULL) - { - TFE_STREAM_LOG_ERROR(stream, "Failed at raising session begin event. "); - tfe_stream_detach(stream); - return ACTION_FORWARD_DATA; - } - - TAILQ_INSERT_TAIL(&hc_private->hs_private_list, hs_private, next); - -__parse: + /* proceed parse content for last request */ ret = hf_private_parse(hf_private_request, data, len); + /* Need more data, no boundary touched */ if (ret == 0) { - return ACTION_DEFER_DATA; + return hf_private_request->stream_action; } /* Some kind of error happened, write log and detach the stream */ @@ -118,23 +104,29 @@ __parse: hf_private_request->parse_errno, http_errno_name(hf_private_request->parse_errno), http_errno_description(hf_private_request->parse_errno)); - tfe_stream_detach(stream); + goto __errout; + } + + /* Touch a boundary, such as the end of HTTP headers, bodys, et al. */ + __action_byptes = hf_private_request->parse_cursor; + hf_private_request->parse_cursor = 0; + + if (hf_private_request->stream_action == ACTION_FORWARD_DATA) + { + tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__action_byptes, sizeof(__action_byptes)); return ACTION_FORWARD_DATA; } - if (hf_private_request->status_header == STATUS_COMPLETE) + if (hf_private_request->stream_action == ACTION_DROP_DATA) { - http_frame_raise_event(hs_private->ht_frame, stream, &hs_private->hs_public, - EV_HTTP_REQ_HDR, NULL, 0, thread_id); + tfe_stream_action_set_opt(stream, ACTION_OPT_DROP_BYTES, &__action_byptes, sizeof(__action_byptes)); + return ACTION_DROP_DATA; } - /* Touch a boundary, such as the end of HTTP headers, bodys, et al. - * need to call user's cb */ - size_t __forward_bytes = hf_private_request->parse_cursor; - tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__forward_bytes, sizeof(__forward_bytes)); + goto __errout; - /* Clear the parser cursor */ - hf_private_request->parse_cursor = 0; +__errout: + tfe_stream_detach(stream); return ACTION_FORWARD_DATA; } @@ -158,7 +150,7 @@ int __http_connection_identify(const struct tfe_stream * stream, enum tfe_stream_action http_connection_entry_data(const struct tfe_stream * stream, unsigned int thread_id, enum tfe_conn_dir dir, const unsigned char * data, size_t len, void ** pme) { - struct http_connection_private * ht_conn = __GET_PME_HC_PRIVATE(pme); + struct http_connection_private * ht_conn = (struct http_connection_private *)(*pme); if (ht_conn->is_preempted == 0) { @@ -197,7 +189,8 @@ __detach: void http_connection_entry_close(const struct tfe_stream * stream, unsigned int thread_id, enum tfe_stream_close_reason reason, void ** pme) { - struct http_connection_private * __ht_conn = __GET_PME_HC_PRIVATE(pme); + struct http_connection_private * __ht_conn = (struct http_connection_private *)(*pme); + free(__ht_conn); } static struct tfe_plugin __http_plugin_info = diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index 35c2011..87a6eb3 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -1,6 +1,4 @@ #include -#include -#include #include #include @@ -9,6 +7,10 @@ #include #include +#include +#include +#include + #define __PARSER_TO_HF_PRIVATE(_parser) ((struct http_half_private *)(_parser->data)) static const char * __str_std_header_field_map[] = @@ -115,10 +117,10 @@ void __hf_public_req_fill_from_private(struct http_half_private * hf_private, st /* accept-encoding, host is located in header's K-V structure */ hf_req_spec->method = (enum tfe_http_std_method) parser->method; const static struct http_field_name __host_field_name = - { - .field_id = TFE_HTTP_HOST, - .field_name = NULL - }; + { + .field_id = TFE_HTTP_HOST, + .field_name = NULL + }; hf_req_spec->host = (char *) tfe_http_field_read(hf_public, &__host_field_name); @@ -140,10 +142,10 @@ void __hf_public_resp_fill_from_private(struct http_half_private * hf_private, s /* Status Code */ hf_resp_spec->resp_code = parser->status_code; const static struct http_field_name __cont_encoding_field_name = - { - .field_id = TFE_HTTP_CONT_ENCODING, - .field_name = NULL - }; + { + .field_id = TFE_HTTP_CONT_ENCODING, + .field_name = NULL + }; /* Content Encoding */ hf_resp_spec->content_encoding = (char *) tfe_http_field_read(hf_public, &__cont_encoding_field_name); @@ -167,24 +169,23 @@ static int __parser_callback_on_message_begin(struct http_parser * parser) hf_private->evbuf_header_value = evbuffer_new(); hf_private->evbuf_body = evbuffer_new(); - hf_private->status_header = STATUS_INIT; - hf_private->status_body = STATUS_INIT; - hf_private->status_message = STATUS_READING; + hf_private->body_status = STATUS_INIT; + hf_private->message_status = STATUS_READING; + /* Never call user's callback, need to defer data */ + hf_private->stream_action = ACTION_DEFER_DATA; return 0; } static int __parser_callback_on_uri_field(struct http_parser * parser, const char * at, size_t length) { struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); - __HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_INIT, STATUS_READING); return evbuffer_add(hf_private->evbuf_uri, at, length); } static int __parser_callback_on_header_field(struct http_parser * parser, const char * at, size_t length) { struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); - __HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_READING, STATUS_READING); /* Last field-value tuple doesn't push into hf_private, flush these */ if (evbuffer_get_length(hf_private->evbuf_header_field) != 0) @@ -198,7 +199,6 @@ static int __parser_callback_on_header_field(struct http_parser * parser, const static int __parser_callback_on_header_value(struct http_parser * parser, const char * at, size_t length) { struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); - __HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_READING, STATUS_READING); return evbuffer_add(hf_private->evbuf_header_value, at, length); } @@ -225,41 +225,91 @@ static int __parser_callback_on_headers_complete(http_parser * parser) __hf_public_resp_fill_from_private(hf_private, parser); } - __HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_READING, STATUS_COMPLETE); - __HF_PRIVATE_CHANGE_STATUS(hf_private->status_body, STATUS_INIT, STATUS_READING); + if (hf_private->event_cb && hf_direction == TFE_HTTP_REQUEST) + { + hf_private->event_cb(hf_private, EV_HTTP_REQ_HDR, NULL, 0, hf_private->event_cb_user); + } + if (hf_private->event_cb && hf_direction == TFE_HTTP_RESPONSE) + { + hf_private->event_cb(hf_private, EV_HTTP_RESP_HDR, NULL, 0, hf_private->event_cb_user); + } + + hf_private->stream_action = ACTION_FORWARD_DATA; return 0; } static int __parser_callback_on_body(struct http_parser * parser, const char * at, size_t length) { struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); - return evbuffer_add(hf_private->evbuf_body, at, length); + enum tfe_http_direction hf_direction = hf_private->hf_public.direction; + enum tfe_http_event ev_body_begin; + enum tfe_http_event ev_body_cont; + + if (hf_direction == TFE_HTTP_REQUEST) + { + ev_body_begin = EV_HTTP_REQ_BODY_BEGIN; + ev_body_cont = EV_HTTP_REQ_BODY_CONT; + } + else + { + ev_body_begin = EV_HTTP_RESP_BODY_BEGIN; + ev_body_cont = EV_HTTP_RESP_BODY_CONT; + } + + if (hf_private->body_status == STATUS_INIT && hf_private->event_cb) + { + hf_private->event_cb(hf_private, ev_body_begin, NULL, parser->content_length, hf_private->event_cb_user); + } + + if (hf_private->event_cb) + { + hf_private->event_cb(hf_private, ev_body_cont, (const unsigned char *) at, length, hf_private->event_cb_user); + } + + hf_private->body_status = STATUS_READING; + return 0; } static int __parser_callback_on_message_complete(http_parser * parser) { struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); - __HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_COMPLETE, STATUS_COMPLETE); - __HF_PRIVATE_CHANGE_STATUS(hf_private->status_body, STATUS_READING || STATUS_COMPLETE, STATUS_COMPLETE); - __HF_PRIVATE_CHANGE_STATUS(hf_private->status_message, STATUS_READING, STATUS_COMPLETE); + enum tfe_http_direction hf_direction = hf_private->hf_public.direction; + enum tfe_http_event ev_body_end; + if (hf_direction == TFE_HTTP_REQUEST) + { + ev_body_end = EV_HTTP_REQ_BODY_END; + } + else + { + ev_body_end = EV_HTTP_RESP_BODY_END; + } + + if (hf_private->event_cb) + { + hf_private->event_cb(hf_private, ev_body_end, NULL, 0, hf_private->event_cb_user); + } + + hf_private->body_status = STATUS_COMPLETE; + hf_private->message_status = STATUS_COMPLETE; + http_parser_pause(parser, 1); return 0; } static http_parser_settings __http_half_parse_setting = -{ - .on_message_begin = __parser_callback_on_message_begin, - .on_url = __parser_callback_on_uri_field, - .on_status = NULL, - .on_header_field = __parser_callback_on_header_field, - .on_header_value = __parser_callback_on_header_value, - .on_headers_complete = __parser_callback_on_headers_complete, - .on_body = __parser_callback_on_body, - .on_message_complete = __parser_callback_on_message_complete, - .on_chunk_header = NULL, - .on_chunk_complete = NULL -}; + { + .on_message_begin = __parser_callback_on_message_begin, + .on_url = __parser_callback_on_uri_field, + .on_status = NULL, + .on_header_field = __parser_callback_on_header_field, + .on_header_value = __parser_callback_on_header_value, + .on_headers_complete = __parser_callback_on_headers_complete, + .on_body = __parser_callback_on_body, + .on_message_complete = __parser_callback_on_message_complete, + .on_chunk_header = NULL, + .on_chunk_complete = NULL + }; const char * hf_ops_field_read(const struct tfe_http_half * half, const struct http_field_name * field) { @@ -326,7 +376,16 @@ int hf_ops_append_body(struct tfe_http_half * half, char * buff, size_t size, in void hf_private_destory(struct http_half_private * hf_private) { - if (hf_private->parse_object != NULL) free(hf_private->parse_object); + if (hf_private->parse_object != NULL) + { + free(hf_private->parse_object); + } + + if (hf_private->event_cb_user_deleter != NULL) + { + hf_private->event_cb_user_deleter(hf_private->event_cb_user); + } + free(hf_private); } @@ -373,10 +432,17 @@ struct http_half_private * hf_private_create(tfe_http_direction ht_dir, short ma return hf_private; } +void hf_private_set_callback(struct http_half_private * hf_private, hf_private_cb * cb, + void * user, void (* user_deleter)(void *)) +{ + hf_private->event_cb = cb; + hf_private->event_cb_user = user; + hf_private->event_cb_user_deleter = user_deleter; +} + int hf_private_parse(struct http_half_private * hf_private, const unsigned char * data, size_t len) { assert(hf_private->parse_cursor <= len); - int ret = 0; /* Caculate the memory zones to scan. The zone from data to data + cursor has been scaned * at last construct procedure, so we don't need to scan again. */ @@ -387,29 +453,92 @@ int hf_private_parse(struct http_half_private * hf_private, const unsigned char size_t sz_parsed = http_parser_execute(hf_private->parse_object, &__http_half_parse_setting, __data_with_offset, __len_with_offset); - /* The paused parsar indicate that some kind of boundary has been touched, - * we should return and call user's data callback. resume it to normal status */ - if (sz_parsed && HTTP_PARSER_ERRNO(hf_private->parse_object) == HPE_PAUSED) + /* Nothing happended */ + if (sz_parsed == len) + { + hf_private->parse_cursor += sz_parsed; + return 0; + } + + /* The paused parsar indicate the message boundary has been touched, we should return. + * resume it to normal status */ + if (HTTP_PARSER_ERRNO(hf_private->parse_object) == HPE_PAUSED) { http_parser_pause(hf_private->parse_object, 0); - hf_private->parse_cursor += sz_parsed; - - if (data[hf_private->parse_cursor] == '\r' && data[hf_private->parse_cursor + 1] == '\n') - { - hf_private->parse_cursor++; - } - + hf_private->parse_cursor += sz_parsed + 1; return 1; } - /* Some kind of exception happend */ - if (sz_parsed != __len_with_offset) - { - hf_private->parse_errno = HTTP_PARSER_ERRNO(hf_private->parse_object); - ret = -1; goto __out; - } - -__out: - hf_private->parse_cursor += sz_parsed; - return ret; + hf_private->parse_errno = HTTP_PARSER_ERRNO(hf_private->parse_object); + return -1; +} + +static struct tfe_http_session * hs_ops_allow_write(const struct tfe_http_session * session) +{ + struct http_session_private * hs_private = to_hs_private((struct tfe_http_session *)session); + return http_frame_currect_plugin_preempt(hs_private->ht_frame) ? (struct tfe_http_session *)session : NULL; +} + +void hs_ops_detach(const struct tfe_http_session * session) +{ + struct http_session_private * hs_private = to_hs_private((struct tfe_http_session *)session); + return http_frame_currect_plugin_detach(hs_private->ht_frame); +} + +void hs_ops_drop(struct tfe_http_session * session) +{ + return; +} + +void hs_ops_request_set(struct tfe_http_session * session, struct tfe_http_half * req) +{ + return; +} + +void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half * resp) +{ + return; +} + +struct tfe_http_half * hs_ops_request_create(struct tfe_http_session * session, + enum tfe_http_std_method method, const char * uri) +{ + return NULL; +} + +struct tfe_http_half * hs_ops_response_create(struct tfe_http_session * session, int resp_code) +{ + return NULL; +} + +struct tfe_http_session_ops __http_session_ops = +{ + .ops_allow_write = hs_ops_allow_write, + .ops_detach = hs_ops_detach, + .ops_drop = hs_ops_drop, + .ops_request_set = hs_ops_request_set, + .ops_response_set = hs_ops_response_set, + .ops_request_create = hs_ops_request_create, + .ops_response_create = hs_ops_response_create +}; + +struct http_session_private * hs_private_create(struct http_connection_private * hc_private, + struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp) +{ + struct http_session_private * __hs_private = ALLOC(struct http_session_private, 1); + + /* HS-PUBLIC */ + __hs_private->hs_public.ops = &__http_session_ops; + __hs_private->hs_public.req = hf_private_req != NULL ? to_hf_public(hf_private_req) : NULL; + __hs_private->hs_public.resp = hf_private_req != NULL ? to_hf_public(hf_private_resp) : NULL; + __hs_private->hs_public.session_id = hc_private->session_id_counter++; + + /* HS-PRIVATE*/ + __hs_private->hc_private = hc_private; + return __hs_private; +} + +void hs_private_destory(struct http_session_private * hs_private) +{ + free(hs_private); } diff --git a/vendor/CMakeLists.txt b/vendor/CMakeLists.txt index bea37f9..ddd6514 100644 --- a/vendor/CMakeLists.txt +++ b/vendor/CMakeLists.txt @@ -8,6 +8,7 @@ ExternalProject_Add(OpenSSL PREFIX openssl URL_MD5 5271477e4d93f4ea032b665ef095ff24 CONFIGURE_COMMAND ./Configure linux-x86_64 --prefix= --openssldir=/lib/ssl no-weak-ssl-ciphers enable-ec_nistp_64_gcc_128 no-shared + INSTALL_COMMAND make install_sw BUILD_IN_SOURCE 1) ExternalProject_Get_Property(OpenSSL INSTALL_DIR)