diff --git a/CMakeLists.txt b/CMakeLists.txt index eaef82b..8600044 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,10 +4,13 @@ project(tfe) set(CMAKE_CXX_STANDARD 11) set(CMAKE_C_STANDARD 11) set(CMAKE_POSITION_INDEPENDENT_CODE ON) -set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} -Wall) add_definitions(-D_GNU_SOURCE) +#set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_GNU_SOURCE -Wall -fsanitize=address -fno-omit-frame-pointer") +#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_GNU_SOURCE -Wall -fsanitize=address -fno-omit-frame-pointer") +#set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lasan") + enable_testing() add_subdirectory(vendor) add_subdirectory(common) diff --git a/common/include/tfe_http.h b/common/include/tfe_http.h index 053917e..0295e0e 100644 --- a/common/include/tfe_http.h +++ b/common/include/tfe_http.h @@ -130,6 +130,8 @@ struct tfe_http_resp_spec { int resp_code; const char * content_encoding; + const char * content_type; + const char * content_length; }; enum tfe_http_direction @@ -319,7 +321,7 @@ struct http_frame_plugin_status struct http_frame_session_ctx * http_frame_raise_session_begin(const tfe_stream * stream, struct tfe_http_session * ht_session, unsigned int thread_id); -void http_frame_raise_session_end(struct http_frame_session_ctx * ht_frame, struct tfe_stream * stream, +void http_frame_raise_session_end(struct http_frame_session_ctx * ht_frame, const tfe_stream * stream, struct tfe_http_session * ht_session, unsigned int thread_id); void http_frame_raise_event(struct http_frame_session_ctx * ht_frame, diff --git a/common/src/tfe_http.cpp b/common/src/tfe_http.cpp index 774dfcd..022f09b 100644 --- a/common/src/tfe_http.cpp +++ b/common/src/tfe_http.cpp @@ -151,7 +151,7 @@ struct http_frame_session_ctx * http_frame_raise_session_begin(const struct tfe_ return ht_frame; }; -void http_frame_raise_session_end(struct http_frame_session_ctx * ht_frame, struct tfe_stream * stream, +void http_frame_raise_session_end(struct http_frame_session_ctx * ht_frame, const tfe_stream * stream, struct tfe_http_session * ht_session, unsigned int thread_id) { unsigned int __for_each_iterator = 0; diff --git a/platform/include/internal/tcp_stream.h b/platform/include/internal/tcp_stream.h index 98ecf3f..76b7de7 100644 --- a/platform/include/internal/tcp_stream.h +++ b/platform/include/internal/tcp_stream.h @@ -11,5 +11,5 @@ enum tfe_stream_option }; int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt, const void * arg, size_t sz_arg); -void tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream); +int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream); void tfe_stream_destory(struct tfe_stream_private * stream); diff --git a/platform/src/kni_acceptor.cpp b/platform/src/kni_acceptor.cpp index 2350255..9ac9012 100644 --- a/platform/src/kni_acceptor.cpp +++ b/platform/src/kni_acceptor.cpp @@ -144,6 +144,7 @@ void __kni_event_cb(evutil_socket_t fd, short what, void * user) __accept_para.session_type = __session_proto; __accept_para.downstream_fd = __fds[0]; __accept_para.upstream_fd = __fds[1]; + __accept_para.passthrough = false; if (tfe_proxy_fds_accept(__ctx->proxy, &__accept_para) < 0) { diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index b1c6f8a..7a87226 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -82,12 +82,22 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_p tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &__session_type, sizeof(__session_type)); } - tfe_stream_init_by_fds(stream, para->downstream_fd, para->upstream_fd); - - TFE_LOG_DEBUG(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accepted", - stream, para->downstream_fd, para->upstream_fd, para->session_type); + int ret = tfe_stream_init_by_fds(stream, para->downstream_fd, para->upstream_fd); + if (ret < 0) + { + TFE_LOG_ERROR(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accept failed.", + stream, para->downstream_fd, para->upstream_fd, para->session_type); goto __errout; + } + else + { + TFE_LOG_DEBUG(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accepted.", + stream, para->downstream_fd, para->upstream_fd, para->session_type); + } return 0; + +__errout: + return -1; } void tfe_proxy_loopbreak(tfe_proxy * ctx) @@ -238,10 +248,6 @@ int main(int argc, char *argv[]) g_default_proxy->evbase, g_default_logger, NULL); CHECK_OR_EXIT(g_default_proxy->ssl_mgr_handler, "Failed at init SSL manager. Exit."); - /* MODULE INIT */ - g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger); - CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. "); - /* PLUGIN INIT */ unsigned int plugin_iterator = 0; for(struct tfe_plugin * plugin_iter = tfe_plugin_iterate(&plugin_iterator); @@ -269,6 +275,10 @@ int main(int argc, char *argv[]) CHECK_OR_EXIT(g_default_proxy->work_threads[tid], "Failed at creating thread %u", tid); } + /* ACCEPTOR INIT */ + g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger); + CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. "); + TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized. "); event_base_dispatch(g_default_proxy->evbase); diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 94b8148..9e08e51 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -660,6 +660,7 @@ void ssl_upstream_create_on_success(future_result_t * result, void * user) void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, void * user) { + return; assert(0); } @@ -672,7 +673,7 @@ struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ unsigned int total_plugin_count = tfe_plugin_total_counts(); _stream->plugin_ctxs = ALLOC(struct plugin_ctx, total_plugin_count); - return (struct tfe_stream *) &_stream->head; + return &_stream->head; } void __stream_access_log_write(struct tfe_stream_private * stream) @@ -733,6 +734,7 @@ void tfe_stream_destory(struct tfe_stream_private * stream) { future_destroy(stream->future_upstream_create); } + stream->proxy_ref = NULL; free(stream); thread->load--; @@ -878,7 +880,7 @@ void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket return; } -void tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream) +int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream) { struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head); struct event_base * ev_base = _stream->thread_ref->evbase; @@ -890,13 +892,14 @@ void tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downs __stream_fd_option_setup(_stream, fd_upstream); _stream->head.addr = __stream_addr_create_by_fds(stream, fd_downstream); - _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); - if (unlikely(_stream->head.addr == NULL)) { - assert(0); + TFE_LOG_ERROR(_stream->stream_logger, "Failed to fetch address for fd %d, %d, terminate fds.", + fd_downstream, fd_upstream); goto __errout; } + _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); + if (_stream->session_type == STREAM_PROTO_PLAIN) { _stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream); @@ -921,7 +924,10 @@ void tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downs _stream->ssl_mgr, fd_upstream, fd_downstream, ev_base); } - return; + return 0; + +__errout: + return -1; } int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt, const void * arg, size_t sz_arg) @@ -949,9 +955,10 @@ void tfe_stream_write_access_log(const struct tfe_stream * stream, int level, co struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head); /* Format input content */ - char __tmp_buffer[TFE_STRING_MAX]; - vsnprintf(__tmp_buffer, sizeof(__tmp_buffer), fmt, arg_ptr); + char * __tmp_buffer; + vasprintf(&__tmp_buffer, fmt, arg_ptr); /* Log content with stream tag */ - MESA_handle_runtime_log(_stream->stream_logger, level, "S-DETAIL", "%s %s", _stream->str_stream_addr, __tmp_buffer); + MESA_handle_runtime_log(_stream->stream_logger, level, "access", "%s %s", _stream->str_stream_addr, __tmp_buffer); + free(__tmp_buffer); } diff --git a/plugin/protocol/http/include/internal/http_half.h b/plugin/protocol/http/include/internal/http_half.h index 46e0af8..2a61756 100644 --- a/plugin/protocol/http/include/internal/http_half.h +++ b/plugin/protocol/http/include/internal/http_half.h @@ -94,3 +94,8 @@ struct http_session_private * hs_private_create(struct http_connection_private * struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp); void hs_private_destory(struct http_session_private * hs_private); + +void hs_private_hf_private_set(struct http_session_private * hs_private, + struct http_half_private * hf, enum tfe_http_direction); + +struct http_half_private * hs_private_hf_private_release(struct http_session_private * hs_private, enum tfe_http_direction); diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index c5769b0..b142b0b 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -29,7 +29,7 @@ int http_connection_entry_open(const struct tfe_stream * stream, unsigned int th struct http_connection_private * ht_conn = ALLOC(struct http_connection_private, 1); TAILQ_INIT(&ht_conn->hs_private_list); ht_conn->stream = stream; - *pme = (void *)ht_conn; + *pme = (void *) ht_conn; return 0; } @@ -43,11 +43,12 @@ struct user_event_dispatch_closure static int __user_event_dispatch(struct http_half_private * hf_private, enum tfe_http_event ev, const unsigned char * data, size_t len, void * user) { - struct user_event_dispatch_closure * __closure = (struct user_event_dispatch_closure *)user; - struct http_frame_session_ctx* ht_frame = hf_private->session->ht_frame; + struct user_event_dispatch_closure * __closure = (struct user_event_dispatch_closure *) user; + struct http_frame_session_ctx * ht_frame = hf_private->session->ht_frame; //todo: - http_frame_raise_event(ht_frame, __closure->stream, (struct tfe_http_session *)__closure->session, - ev, data, len, __closure->thread_id); return 0; + http_frame_raise_event(ht_frame, __closure->stream, (struct tfe_http_session *) __closure->session, + ev, data, len, __closure->thread_id); + return 0; } enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_stream * stream, @@ -139,8 +140,78 @@ __errout: } enum tfe_stream_action __http_connection_entry_on_response(const struct tfe_stream * stream, - struct http_connection_private * ht_conn, unsigned int thread_id, const unsigned char * data, size_t len) + struct http_connection_private * hc_private, unsigned int thread_id, const unsigned char * data, size_t len) { + struct http_session_private * hs_private = TAILQ_FIRST(&hc_private->hs_private_list); + struct http_half_private * hf_private_response = to_hf_response_private(hs_private); + int ret = 0; + + /* Standalone response, it means missing something or malformed http protocol */ + if (hs_private == NULL) + { + TFE_STREAM_LOG_ERROR(stream, "Standlone HTTP response emerged. Malformed HTTP Protocol, detached. "); + goto __errout; + } + + /* First time parse http response */ + if (hf_private_response == NULL) + { + /* HTTP Version */ + short resp_major = to_hf_request_private(hs_private)->major; + short resp_minor = to_hf_request_private(hs_private)->minor; + + /* Response */ + hf_private_response = hf_private_create(TFE_HTTP_RESPONSE, resp_major, resp_minor); + hs_private_hf_private_set(hs_private, hf_private_response, TFE_HTTP_RESPONSE); + hf_private_set_session(hf_private_response, hs_private); + + /* Closure, catch stream, session and thread_id */ + struct user_event_dispatch_closure * __closure = ALLOC(struct user_event_dispatch_closure, 1); + __closure->thread_id = thread_id; + __closure->stream = stream; + __closure->session = to_hs_public(hs_private); + + /* Set callback, this callback used to raise business event */ + hf_private_set_callback(hf_private_response, __user_event_dispatch, __closure, free); + + /* Inherit user stream action, this action can affact session's behavior */ + hf_private_response->user_stream_action = to_hf_request_private(hs_private)->user_stream_action; + } + + /* Parse the content, the data which in defered state has been ignored. */ + ret = hf_private_parse(hf_private_response, data, len); + + /* Need more data, no boundary touched */ + if (ret == 0) + { + if (hf_private_response->stream_action == ACTION_DROP_DATA || + hf_private_response->stream_action == ACTION_FORWARD_DATA) + { + hf_private_response->parse_cursor = 0; + } + + return hf_private_response->stream_action; + } + + /* Some kind of error happened, write log and detach the stream */ + if (ret == -1) + { + TFE_STREAM_LOG_ERROR(stream, "Failed at parsing HTTP response: %u, %s, %s", + hf_private_request->parse_errno, http_errno_name(hf_private_request->parse_errno), + http_errno_description(hf_private_request->parse_errno)); + + goto __errout; + } + + if (hf_private_response->message_status == STATUS_COMPLETE) + { + http_frame_raise_session_end(hs_private->ht_frame, stream, &hs_private->hs_public, thread_id); + TAILQ_REMOVE(&hc_private->hs_private_list, hs_private, next); + hs_private_destory(hs_private); + } + +__errout: + tfe_stream_detach(stream); return ACTION_FORWARD_DATA; } @@ -158,7 +229,7 @@ int __http_connection_identify(const struct tfe_stream * stream, enum tfe_stream_action http_connection_entry_data(const struct tfe_stream * stream, unsigned int thread_id, enum tfe_conn_dir dir, const unsigned char * data, size_t len, void ** pme) { - struct http_connection_private * ht_conn = (struct http_connection_private *)(*pme); + struct http_connection_private * ht_conn = (struct http_connection_private *) (*pme); if (ht_conn->is_preempted == 0) { @@ -192,30 +263,33 @@ __detach: void http_connection_entry_close(const struct tfe_stream * stream, unsigned int thread_id, enum tfe_stream_close_reason reason, void ** pme) { - struct http_connection_private * ht_conn = (struct http_connection_private *)(*pme); + struct http_connection_private * ht_conn = (struct http_connection_private *) (*pme); + *pme = NULL; /* Delete all live sessions */ - struct http_session_private * hs_private_iter = NULL; - TAILQ_FOREACH(hs_private_iter, &ht_conn->hs_private_list, next) + while(true) { + struct http_session_private * hs_private_iter = TAILQ_FIRST(&ht_conn->hs_private_list); + if (hs_private_iter == NULL) break; + TAILQ_REMOVE(&ht_conn->hs_private_list, hs_private_iter, next); hs_private_destory(hs_private_iter); } /* Clear session counter, and free ht_conn structure */ ht_conn->session_id_counter = 0; - free(ht_conn); *pme = NULL; + free(ht_conn); } static struct tfe_plugin __http_plugin_info = -{ - .symbol = "HTTP", - .type = TFE_PLUGIN_TYPE_PROTOCOL, - .on_init = http_plugin_init, - .on_deinit = http_plugin_deinit, - .on_open = http_connection_entry_open, - .on_data = http_connection_entry_data, - .on_close = http_connection_entry_close -}; + { + .symbol = "HTTP", + .type = TFE_PLUGIN_TYPE_PROTOCOL, + .on_init = http_plugin_init, + .on_deinit = http_plugin_deinit, + .on_open = http_connection_entry_open, + .on_data = http_connection_entry_data, + .on_close = http_connection_entry_close + }; TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info) diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index 9384e88..6a1058d 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -145,13 +145,32 @@ void __hf_public_resp_fill_from_private(struct http_half_private * hf_private, s /* Status Code */ hf_resp_spec->resp_code = parser->status_code; + + /* Content Type */ + const static struct http_field_name __cont_encoding_type_name = + { + .field_id = TFE_HTTP_CONT_TYPE, + .field_name = NULL + }; + + hf_resp_spec->content_type = (char *) tfe_http_field_read(hf_public, &__cont_encoding_type_name); + + /* Content Length */ + const static struct http_field_name __cont_encoding_length_name = + { + .field_id = TFE_HTTP_CONT_LENGTH, + .field_name = NULL + }; + + hf_resp_spec->content_length = (char *) tfe_http_field_read(hf_public, &__cont_encoding_length_name); + + /* Content Encoding */ const static struct http_field_name __cont_encoding_field_name = { .field_id = TFE_HTTP_CONT_ENCODING, .field_name = NULL }; - /* Content Encoding */ hf_resp_spec->content_encoding = (char *) tfe_http_field_read(hf_public, &__cont_encoding_field_name); } @@ -563,13 +582,11 @@ struct http_session_private * hs_private_create(struct http_connection_private * void __write_access_log(struct http_session_private * hs_private) { /* Prepare to write session access log */ - char __access_log[TFE_STRING_MAX]; - size_t __offset = 0; /* Request */ struct http_half_private * request = to_hf_request_private(hs_private); /* Response */ - struct http_half_private * response = to_hf_request_private(hs_private); + struct http_half_private * response = to_hf_response_private(hs_private); /* Req-Public */ struct tfe_http_req_spec * req_spec = request ? &to_hf_public(request)->req_spec : NULL; /* Resp-Public */ @@ -577,15 +594,34 @@ void __write_access_log(struct http_session_private * hs_private) /* Method */ const char * __str_method = req_spec ? http_std_method_to_string(req_spec->method) : "-"; - __offset += snprintf(__access_log + __offset, sizeof(__access_log) - __offset, "%s ", __str_method); - /* URL */ const char * __str_url = req_spec ? req_spec->url : "-"; - __offset += snprintf(__access_log + __offset, sizeof(__access_log) - __offset, "%s ", __str_url); + + /* Resp code */ + char __str_resp_code[TFE_STRING_MAX]; + if (resp_spec) + { + snprintf(__str_resp_code, sizeof(__str_resp_code) - 1, "%d", resp_spec->resp_code); + } + else + { + snprintf(__str_resp_code, sizeof(__str_resp_code) - 1, "%s", "-"); + } + + /* Content Type */ + const char * __str_cont_type = resp_spec ? resp_spec->content_type : "-"; + /* Content Length */ + const char * __str_cont_length = resp_spec ? resp_spec->content_length : "-"; + /* Content Encoding */ + const char * __str_cont_encoding = resp_spec ? resp_spec->content_encoding : "-"; + + char * __access_log; + asprintf(&__access_log, "%s %s %s %s %s %s", __str_method, + __str_url, __str_resp_code, __str_cont_type, __str_cont_length, __str_cont_encoding); const struct tfe_stream * stream = hs_private->hc_private->stream; tfe_stream_write_access_log(stream, RLOG_LV_INFO, "%s", __access_log); - (void)resp_spec; + free(__access_log); } void hs_private_destory(struct http_session_private * hs_private) @@ -593,3 +629,39 @@ void hs_private_destory(struct http_session_private * hs_private) __write_access_log(hs_private); free(hs_private); } + +void hs_private_hf_private_set(struct http_session_private * hs_private, struct http_half_private * hf, + enum tfe_http_direction direction) +{ + struct tfe_http_half ** ref_old_half_public; + struct http_half_private * old_half_private; + + if (direction == TFE_HTTP_REQUEST) + { + ref_old_half_public = &hs_private->hs_public.req; + old_half_private = to_hf_private(*ref_old_half_public); + } + else + { + ref_old_half_public = &hs_private->hs_public.resp; + old_half_private = to_hf_private(*ref_old_half_public); + } + + if (old_half_private != NULL) + { + hf_private_destory(old_half_private); + *ref_old_half_public = to_hf_public(hf); + } + else + { + *ref_old_half_public = to_hf_public(hf); + } + + return; +} + +struct http_half_private * hs_private_hf_private_release(struct http_session_private * hs_private, + enum tfe_http_direction) +{ + return nullptr; +}