diff --git a/common/include/tfe_stream.h b/common/include/tfe_stream.h index 536f671..0c07bb1 100644 --- a/common/include/tfe_stream.h +++ b/common/include/tfe_stream.h @@ -37,7 +37,8 @@ struct tfe_conn struct tfe_stream { - struct tfe_stream_addr* addr; + const char * str_stream_info; + struct tfe_stream_addr * addr; enum tfe_stream_proto proto; struct tfe_conn upstream; struct tfe_conn downstream; diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 93cb4f2..c95d092 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -397,6 +397,7 @@ struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id) assert(thread_id < g_default_proxy->nr_work_threads); return g_default_proxy->work_threads[thread_id]->evbase; } + struct event_base * tfe_proxy_get_gc_evbase(void) { return g_default_proxy->evbase; @@ -406,6 +407,7 @@ screen_stat_handle_t tfe_proxy_get_fs_handle(void) { return g_default_proxy->fs_handle; } + int tfe_proxy_ssl_add_trust_ca(const char* pem_file) { return ssl_manager_add_trust_ca(g_default_proxy->ssl_mgr_handler, pem_file); diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 8d28c8a..216d69b 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -1050,6 +1050,8 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst } _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); + stream->str_stream_info = _stream->str_stream_addr; + if (_stream->session_type == STREAM_PROTO_PLAIN) { _stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream); diff --git a/plugin/protocol/http/include/internal/http_common.h b/plugin/protocol/http/include/internal/http_common.h index f0a5e6e..0bdaeee 100644 --- a/plugin/protocol/http/include/internal/http_common.h +++ b/plugin/protocol/http/include/internal/http_common.h @@ -20,12 +20,11 @@ struct http_plugin pthread_mutex_t lock_list_hs_private[TFE_THREAD_MAX]; /* GC events */ struct event * gc_event_hs_private[TFE_THREAD_MAX]; + /* ACCESS LOGGER */ + void * access_logger; }; -struct http_plugin_status -{ - -}; +extern struct http_plugin * g_http_plugin; struct http_session_private { @@ -33,6 +32,8 @@ struct http_session_private struct tfe_http_session hs_public; /* TAILQ POINTER */ TAILQ_ENTRY(http_session_private) next; + /* HTTP CONNECTION ADDR */ + char * str_stream_info; /* REF, HTTP CONNECTION */ struct http_connection_private * hc_private; /* HTTP FRAME CTX */ diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index cc21d52..a97e3cc 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -89,6 +89,9 @@ int http_plugin_init(struct tfe_proxy * proxy) plugin_ctx->gc_event_hs_private[thread_id] = gc_event; } + plugin_ctx->access_logger = MESA_create_runtime_log_handle("log/http-access.log", RLOG_LV_INFO); + assert(plugin_ctx->access_logger != NULL); + return 0; } diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index c38e0d8..7a2d6a3 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -172,7 +172,7 @@ void __hf_public_req_fill_from_private(struct http_half_private * hf_private, st /* accept-encoding, host is located in header's K-V structure */ hf_req_spec->method = (enum tfe_http_std_method) parser->method; - hf_private->method_or_status = (enum tfe_http_std_method) parser->method; + hf_private->method_or_status = (enum tfe_http_std_method) parser->method; const static struct http_field_name __host_field_name = {TFE_HTTP_HOST, NULL}; hf_req_spec->host = (char *) tfe_http_field_read(hf_public, &__host_field_name); @@ -329,7 +329,11 @@ static int __parser_callback_on_headers_complete(http_parser * parser) char * __str_content_length = (char *) tfe_http_field_read(hf_public, &__cont_encoding_field_name); /* Does not contain a content-length, passthrough the whole TCP connection */ - if (unlikely(__str_content_length == NULL)) { hf_private->is_passthrough = true; return -1;} + if (unlikely(__str_content_length == NULL)) + { + hf_private->is_passthrough = true; + return -1; + } } tfe_http_event event = (hf_direction == TFE_HTTP_REQUEST) ? EV_HTTP_REQ_HDR : EV_HTTP_RESP_HDR; @@ -346,8 +350,8 @@ static int __parser_callback_on_headers_complete(http_parser * parser) hf_private->stream_action = hf_private->user_stream_action; } - /* user's suspend tag is set, which indicate that the way to handle request/response - * cannot be determinate at now, need to defer */ + /* user's suspend tag is set, which indicate that the way to handle request/response + * cannot be determinate at now, need to defer */ else if (hs_private && hs_private->suspend_tag_signal) { /* Pause parser, prevent to parse request/response body, @@ -362,7 +366,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser) assert(hf_private->stream_action == ACTION_DEFER_DATA); } - /* Otherwise, forward the request/response */ + /* Otherwise, forward the request/response */ else { hf_private->stream_action = ACTION_FORWARD_DATA; @@ -608,7 +612,7 @@ int hf_ops_body_begin(struct tfe_http_half * half, int by_stream) { /* By stream, we do not support content-encoding for now, * send body directly without compression */ - if(hf_private->cv_compress_object != NULL) + if (hf_private->cv_compress_object != NULL) { hf_content_compress_destroy(hf_private->cv_compress_object); hf_private->cv_compress_object = NULL; @@ -676,12 +680,11 @@ int hf_ops_body_end(struct tfe_http_half * half) hf_private->body_status = STATUS_COMPLETE; hf_private->message_status = STATUS_COMPLETE; - if(hf_private->is_setup_by_stream) + if (hf_private->is_setup_by_stream) { hs_private->release_lock--; } - printf("frag write end, stream = %p, hf_private = %p\n", hf_private->session->hc_private->stream, hf_private); return 0; } @@ -879,7 +882,7 @@ void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half struct http_half_private * hf_in_resp_private = to_hf_response_private(hs_private); /* Call at request's callback, early response */ - if(hf_in_req_private != NULL && hf_in_resp_private == NULL) + if (hf_in_req_private != NULL && hf_in_resp_private == NULL) { /* Drop the incoming request */ if (hf_in_req_private->stream_action == ACTION_DEFER_DATA) @@ -1062,7 +1065,7 @@ void hf_private_construct(struct http_half_private * hf_private) } struct http_session_private * hs_private_create(struct http_connection_private * hc_private, unsigned int thread_id, - struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp) + struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp) { struct http_session_private * __hs_private = ALLOC(struct http_session_private, 1); __hs_private->thread_id = thread_id; @@ -1075,6 +1078,7 @@ struct http_session_private * hs_private_create(struct http_connection_private * /* HS-PRIVATE*/ __hs_private->hc_private = hc_private; + __hs_private->str_stream_info = tfe_strdup(hc_private->stream->str_stream_info); return __hs_private; } @@ -1110,7 +1114,8 @@ void __write_access_log(struct http_session_private * hs_private) /* Content Type */ const char * __str_cont_type = resp_spec ? resp_spec->content_type != NULL ? resp_spec->content_type : "-" : "-"; /* Content Encoding */ - const char * __str_cont_encoding = resp_spec ? resp_spec->content_encoding != NULL ? resp_spec->content_encoding: "-" : "-"; + const char * __str_cont_encoding = + resp_spec ? resp_spec->content_encoding != NULL ? resp_spec->content_encoding : "-" : "-"; /* Upgrade Tag */ const char * __str_upgrade = response ? response->is_upgrade ? "UPGRADE" : "-" : "-"; @@ -1126,12 +1131,12 @@ void __write_access_log(struct http_session_private * hs_private) const char * __str_suspend = hs_private->suspend_counter > 0 ? "SUSPEND" : "-"; char * __access_log; - asprintf(&__access_log, "%d %s %s HTTP/%d.%d %s %s %s %s %s %s %s %s %s", hs_private->hs_public.session_id, __str_method, - __str_url, request->major, request->minor, __str_resp_code, __str_cont_type, __str_cont_encoding, + asprintf(&__access_log, "%s %d %s %s HTTP/%d.%d %s %s %s %s %s %s %s %s %s", hs_private->str_stream_info, + hs_private->hs_public.session_id, __str_method, __str_url, + request->major, request->minor, __str_resp_code, __str_cont_type, __str_cont_encoding, __str_upgrade, __str_req_passthrough, __str_resp_passthrough, __str_user_req, __str_user_resp, __str_suspend); - const struct tfe_stream * stream = hs_private->hc_private->stream; - tfe_stream_write_access_log(stream, RLOG_LV_INFO, "%s", __access_log); + TFE_LOG_INFO(g_http_plugin->access_logger, "%s", __access_log); free(__access_log); } @@ -1152,6 +1157,11 @@ void hs_private_destroy(struct http_session_private * hs_private) hf_private_destory(hf_resp); } + if (hs_private->str_stream_info) + { + free(hs_private->str_stream_info); + } + if (hs_private->hf_private_req_user) { assert(hs_private->hf_private_req_user->message_status == STATUS_COMPLETE); @@ -1173,6 +1183,7 @@ void hs_private_gc_destroy(struct http_session_private * hs_private, struct hs_p if (hs_private->release_lock > 0) { TAILQ_INSERT_TAIL(gc_list, hs_private, next); + hs_private->hc_private = NULL; hs_private->in_gc_queue = true; } else diff --git a/plugin/protocol/http/test/test_http_half.cpp b/plugin/protocol/http/test/test_http_half.cpp index 5d43c09..4da6603 100644 --- a/plugin/protocol/http/test/test_http_half.cpp +++ b/plugin/protocol/http/test/test_http_half.cpp @@ -1493,16 +1493,57 @@ void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by) return; } +int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, const unsigned char * data, size_t size) +{ + return 0; +} + int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size) { return 0; } +int tfe_stream_action_set_opt(const struct tfe_stream * stream, enum tfe_stream_action_opt type, + void * value, size_t size) +{ + return 0; +} + +struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir) +{ + return NULL; +} + +struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id) +{ + return NULL; +} + +struct event_base * tfe_proxy_get_gc_evbase(void) +{ + return NULL; +} + void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) { return; } +void tfe_stream_detach(const struct tfe_stream * stream) +{ + return; +} + +int tfe_stream_preempt(const struct tfe_stream * stream) +{ + return 0; +} + +unsigned int tfe_proxy_get_work_thread_count() +{ + return 0; +} + const char * tfe_version() { return NULL;