diff --git a/common/include/tfe_stream.h b/common/include/tfe_stream.h index 6878f72..f8b6abf 100644 --- a/common/include/tfe_stream.h +++ b/common/include/tfe_stream.h @@ -95,4 +95,7 @@ void tfe_stream_resume(struct promise * promise); int tfe_stream_shutdown(const struct tfe_stream * stream); int tfe_stream_shutdown_dir(const struct tfe_stream * stream, enum tfe_conn_dir dir); - +/** + * @brief Write linear text for given stream + */ +void tfe_stream_write_log(const struct tfe_stream * stream, int level, const char * fmt, ...); diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index 60581ed..fed6137 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -56,6 +56,9 @@ struct tfe_conn_private struct tfe_stream_private { struct tfe_stream head; + char * str_stream_addr; + void * stream_logger; + struct tfe_proxy * proxy_ref; struct tfe_thread_ctx * thread_ref; diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 0b64a2d..c8337b9 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -61,7 +61,17 @@ static inline struct tfe_conn_private * __peer_conn(struct tfe_stream_private * static inline enum tfe_conn_dir __bev_dir(struct tfe_stream_private * _stream, struct bufferevent * bev) { - return ((bev == _stream->conn_downstream->bev) ? CONN_DIR_DOWNSTREAM : CONN_DIR_UPSTREAM); + if (_stream->conn_downstream && bev == _stream->conn_downstream->bev) + { + return CONN_DIR_DOWNSTREAM; + } + + if(_stream->conn_upstream && bev == _stream->conn_upstream->bev) + { + return CONN_DIR_UPSTREAM; + } + + assert(0); } static inline bool __is_ssl(struct tfe_stream_private * _stream) @@ -194,6 +204,15 @@ static void __conn_private_destory(struct tfe_conn_private * conn) free(conn); } +static void __conn_private_destory_with_ssl(struct event_base * ev_base, + struct tfe_conn_private * conn, struct ssl_stream * ssl_stream) +{ + if (ssl_stream == NULL) return __conn_private_destory(conn); + evutil_socket_t __to_closed_fd = __conn_private_release_fd(conn); + ssl_stream_free_and_close_fd(ssl_stream, ev_base, __to_closed_fd); + return __conn_private_destory(conn); +} + static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; @@ -313,15 +332,22 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) enum tfe_conn_dir dir = __bev_dir(_stream, bev); struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); + /* Peer connection is terminated, drain all data. + * This connection will be destoryed in __event_cb */ + struct evbuffer * inbuf = bufferevent_get_input(bev); + if (peer_conn == NULL) + { + evbuffer_drain(inbuf, evbuffer_get_length(inbuf)); + return; + } + + struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev); enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA; enum tfe_stream_action action_final = ACTION_FORWARD_DATA; - struct evbuffer * inbuf = bufferevent_get_input(bev); - struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev); - size_t drain_size = 0; size_t contigous_len = evbuffer_get_length(inbuf); - const unsigned char * contiguous_data = (const unsigned char *) evbuffer_pullup(inbuf, contigous_len); + unsigned char * contiguous_data = evbuffer_pullup(inbuf, contigous_len); _stream->defer_bytes = 0; _stream->drop_bytes = 0; @@ -395,19 +421,13 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) break; } - if (evbuffer_get_length(inbuf) != 0) - { - bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); - } - - if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT) - { - bufferevent_setwatermark(peer_conn->bev, EV_WRITE, - TFE_CONFIG_OUTPUT_LIMIT_DEFAULT / 2, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT); - bufferevent_disable(bev, EV_READ); - } - - return; +#if 0 + if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT) + { + bufferevent_setwatermark(peer_conn->bev, EV_WRITE, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT / 2, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT); + bufferevent_disable(bev, EV_READ); + } +#endif } /* @@ -418,18 +438,42 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) static void __stream_bev_writecb(struct bufferevent * bev, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; - enum tfe_conn_dir dir = __bev_dir(_stream, bev); - struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); + struct event_base * ev_base = bufferevent_get_base(bev); -// struct evbuffer * outbuf = bufferevent_get_output(bev); + struct tfe_conn_private ** ref_this_conn{}; + struct tfe_conn_private ** ref_peer_conn{}; + struct ssl_stream ** ref_this_ssl_stream{}; - if (peer_conn->bev && !(bufferevent_get_enabled(peer_conn->bev) & EV_READ)) + if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM) { - /* data source temporarily disabled; - * re-enable and reset watermark to 0. */ - bufferevent_setwatermark(bev, EV_WRITE, 0, 0); - bufferevent_enable(peer_conn->bev, EV_READ); + ref_this_conn = &_stream->conn_upstream; + ref_peer_conn = &_stream->conn_downstream; + ref_this_ssl_stream = &_stream->ssl_downstream; } + + if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM) + { + ref_this_conn = &_stream->conn_downstream; + ref_peer_conn = &_stream->conn_upstream; + ref_this_ssl_stream = &_stream->ssl_downstream; + } + + struct evbuffer * __output_buffer = bufferevent_get_output(bev); + assert(__output_buffer != NULL); + + if (*ref_peer_conn == NULL && evbuffer_get_length(__output_buffer) == 0) + { + __conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream); + *ref_this_conn = NULL; + *ref_this_ssl_stream = NULL; + } + + if (*ref_peer_conn == NULL && *ref_this_conn == NULL) + { + tfe_stream_destory(_stream); + } + + return; } /* @@ -439,50 +483,90 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg) static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; - enum tfe_conn_dir dir = __bev_dir(_stream, bev); - struct tfe_conn_private * this_conn = __this_conn(_stream, dir); - struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); + struct event_base * ev_base = bufferevent_get_base(bev); - const struct tfe_plugin * plugins = _stream->thread_ref->modules; - struct plugin_ctx * plug_ctx = NULL; - int plug_num = _stream->thread_ref->nr_modules, i = 0; - enum tfe_stream_close_reason reason = REASON_PASSIVE_CLOSED; + struct tfe_conn_private ** ref_this_conn{}; + struct tfe_conn_private ** ref_peer_conn{}; + struct ssl_stream ** ref_this_ssl_stream{}; + struct ssl_stream ** ref_peer_ssl_stream{}; - if (events & BEV_EVENT_ERROR) + if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM) { - this_conn->closed = 1; - reason = REASON_ERROR; - if (__is_ssl(_stream)) + ref_this_conn = &_stream->conn_upstream; + ref_peer_conn = &_stream->conn_downstream; + ref_this_ssl_stream = &_stream->ssl_upstream; + ref_peer_ssl_stream = &_stream->ssl_downstream; + } + + if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM) + { + ref_this_conn = &_stream->conn_downstream; + ref_peer_conn = &_stream->conn_upstream; + ref_this_ssl_stream = &_stream->ssl_downstream; + ref_peer_ssl_stream = &_stream->ssl_upstream; + } + + if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF) + { + if (evbuffer_get_length(bufferevent_get_input(bev))) { - ssl_stream_log_error(bev, dir, __STREAM_LOGGER(_stream)); + __stream_bev_readcb(bev, arg); } - goto call_plugin_close; + + goto __close_connection; } - if (events & BEV_EVENT_EOF) - { - //generate a 0 size read callback to notify plugins. - __stream_bev_readcb(bev, arg); - this_conn->closed = 1; - } - - if (peer_conn->closed == 1 && this_conn->closed == 1) - { - reason = REASON_PASSIVE_CLOSED; - goto call_plugin_close; - } return; -call_plugin_close: - for (i = 0; i < plug_num; i++) +__close_connection: + if (*ref_peer_conn != NULL) { - _stream->calling_idx = i; - plug_ctx = _stream->plugin_ctxs + i; - plugins[i].on_close(&(_stream->head), _stream->thread_ref->thread_id, reason, &(plug_ctx->pme)); + struct bufferevent * __peer_bev = (*ref_peer_conn)->bev; + struct evbuffer * __peer_output_buffer = bufferevent_get_output(__peer_bev); + + if (evbuffer_get_length(__peer_output_buffer) == 0) + { + __conn_private_destory_with_ssl(ev_base, *ref_peer_conn, *ref_peer_ssl_stream); + *ref_peer_conn = NULL; + *ref_peer_ssl_stream = NULL; + } + } + + if (*ref_this_conn != NULL) + { + __conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream); + *ref_this_conn = NULL; + *ref_this_ssl_stream = NULL; + } + + if (*ref_this_conn == NULL && *ref_peer_conn == NULL) + { + goto __call_plugin_close; + } + + return; + +__call_plugin_close: + unsigned int plugin_id_iter = 0; + unsigned int plugin_id = 0; + + for (const struct tfe_plugin * p_info_iter = tfe_plugin_iterate(&plugin_id_iter); + p_info_iter != NULL; p_info_iter = tfe_plugin_iterate(&plugin_id_iter)) + { + _stream->calling_idx = plugin_id; + struct plugin_ctx * plug_ctx = &_stream->plugin_ctxs[plugin_id]; + + //TODO: do not use pme to determinate we call on_open or not ever. + if (p_info_iter->on_close && plug_ctx->pme != NULL) + { + p_info_iter->on_close(&_stream->head, _stream->thread_ref->thread_id, + REASON_PASSIVE_CLOSED, &(plug_ctx->pme)); + } + + plugin_id++; } tfe_stream_destory(_stream); - return; } static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private * stream, evutil_socket_t fd) @@ -583,17 +667,30 @@ struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1); _stream->thread_ref = thread_ctx; _stream->proxy_ref = pxy; + _stream->stream_logger = pxy->logger; unsigned int total_plugin_count = tfe_plugin_total_counts(); _stream->plugin_ctxs = ALLOC(struct plugin_ctx, total_plugin_count); return (struct tfe_stream *) &_stream->head; } +void __stream_access_log_write(struct tfe_stream_private * stream) +{ + MESA_handle_runtime_log(stream->stream_logger, RLOG_LV_INFO, "access", "%s", stream->str_stream_addr); +} + void tfe_stream_destory(struct tfe_stream_private * stream) { struct tfe_thread_ctx * thread = stream->thread_ref; struct event_base * ev_base = thread->evbase; + __stream_access_log_write(stream); + + if (stream->str_stream_addr) + { + free(stream->str_stream_addr); + } + if (__is_ssl(stream) && stream->ssl_upstream) { evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_upstream); @@ -673,8 +770,8 @@ static struct tfe_stream_addr * __stream_addr_create_by_fds(struct tfe_stream * sizeof(struct tfe_stream_addr) + sizeof(struct tfe_stream_addr_tuple4_v4)); struct tfe_stream_addr_ipv4 * st_addr_v4 = __stream_addr->ipv4; - struct sockaddr_in * sk_v4_src_ptr = (struct sockaddr_in *)sk_src_ptr; - struct sockaddr_in * sk_v4_dst_ptr = (struct sockaddr_in *)sk_dst_ptr; + struct sockaddr_in * sk_v4_src_ptr = (struct sockaddr_in *) sk_src_ptr; + struct sockaddr_in * sk_v4_dst_ptr = (struct sockaddr_in *) sk_dst_ptr; __stream_addr->addrtype = TFE_ADDR_STREAM_TUPLE4_V4; __stream_addr->addrlen = sizeof(struct tfe_stream_addr_tuple4_v4); @@ -691,7 +788,8 @@ static struct tfe_stream_addr * __stream_addr_create_by_fds(struct tfe_stream * else { TFE_STREAM_LOG_ERROR(stream, "Invalid sockaddr family for fd %d: sa_family is %d.", - fd_downstream, sk_src_ptr->sa_family); goto __errout; + fd_downstream, sk_src_ptr->sa_family); + goto __errout; } return __stream_addr; @@ -713,7 +811,9 @@ void tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downs evutil_make_socket_nonblocking(fd_upstream); _stream->head.addr = __stream_addr_create_by_fds(stream, fd_downstream); - if(unlikely(_stream->head.addr == NULL)) + _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); + + if (unlikely(_stream->head.addr == NULL)) { assert(0); } @@ -762,3 +862,17 @@ int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt return 0; } + +void tfe_stream_write_log(const struct tfe_stream * stream, int level, const char * fmt, ...) +{ + va_list arg_ptr; + va_start(arg_ptr, fmt); + struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head); + + /* Format input content */ + char __tmp_buffer[TFE_STRING_MAX]; + vsnprintf(__tmp_buffer, sizeof(__tmp_buffer), fmt, arg_ptr); + + /* Log content with stream tag */ + MESA_handle_runtime_log(_stream->stream_logger, level, "S-DETAIL", "%s %s", _stream->str_stream_addr, __tmp_buffer); +} diff --git a/plugin/protocol/http/include/internal/http_common.h b/plugin/protocol/http/include/internal/http_common.h index 1f4cfde..1cd66db 100644 --- a/plugin/protocol/http/include/internal/http_common.h +++ b/plugin/protocol/http/include/internal/http_common.h @@ -34,10 +34,8 @@ struct http_session_private struct http_connection_private { - /* ADDRESS */ - struct tfe_stream_addr * layer_addr; /* STREAM */ - struct tfe_stream * stream; + const struct tfe_stream * stream; /* SESSION LIST, REQUEST-RESPONSE PAIRS */ struct hs_private_list hs_private_list; /* IS PREEMPTED */ diff --git a/plugin/protocol/http/include/internal/http_half.h b/plugin/protocol/http/include/internal/http_half.h index ac88810..46e0af8 100644 --- a/plugin/protocol/http/include/internal/http_half.h +++ b/plugin/protocol/http/include/internal/http_half.h @@ -92,3 +92,5 @@ void hf_private_set_session(struct http_half_private * hf_private, struct http_s struct http_session_private * hs_private_create(struct http_connection_private * hc_private, struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp); + +void hs_private_destory(struct http_session_private * hs_private); diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index c0ca9f4..c5769b0 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -28,6 +28,7 @@ int http_connection_entry_open(const struct tfe_stream * stream, unsigned int th { struct http_connection_private * ht_conn = ALLOC(struct http_connection_private, 1); TAILQ_INIT(&ht_conn->hs_private_list); + ht_conn->stream = stream; *pme = (void *)ht_conn; return 0; } @@ -89,12 +90,18 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea TAILQ_INSERT_TAIL(&hc_private->hs_private_list, hs_private, next); } - /* proceed parse content for last request */ + /* Parse the content, the data which in defered state has been ignored. */ ret = hf_private_parse(hf_private_request, data, len); /* Need more data, no boundary touched */ if (ret == 0) { + if (hf_private_request->stream_action == ACTION_DROP_DATA || + hf_private_request->stream_action == ACTION_FORWARD_DATA) + { + hf_private_request->parse_cursor = 0; + } + return hf_private_request->stream_action; } @@ -185,8 +192,19 @@ __detach: void http_connection_entry_close(const struct tfe_stream * stream, unsigned int thread_id, enum tfe_stream_close_reason reason, void ** pme) { - struct http_connection_private * __ht_conn = (struct http_connection_private *)(*pme); - free(__ht_conn); + struct http_connection_private * ht_conn = (struct http_connection_private *)(*pme); + + /* Delete all live sessions */ + struct http_session_private * hs_private_iter = NULL; + TAILQ_FOREACH(hs_private_iter, &ht_conn->hs_private_list, next) + { + TAILQ_REMOVE(&ht_conn->hs_private_list, hs_private_iter, next); + hs_private_destory(hs_private_iter); + } + + /* Clear session counter, and free ht_conn structure */ + ht_conn->session_id_counter = 0; + free(ht_conn); *pme = NULL; } static struct tfe_plugin __http_plugin_info = diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index 949b2fe..d1ba418 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -560,7 +560,36 @@ struct http_session_private * hs_private_create(struct http_connection_private * return __hs_private; } +void __write_access_log(struct http_session_private * hs_private) +{ + /* Prepare to write session access log */ + char __access_log[TFE_STRING_MAX]; + size_t __offset = 0; + + /* Request */ + struct http_half_private * request = to_hf_request_private(hs_private); + /* Response */ + struct http_half_private * response = to_hf_request_private(hs_private); + /* Req-Public */ + struct tfe_http_req_spec * req_spec = request ? &to_hf_public(request)->req_spec : NULL; + /* Resp-Public */ + struct tfe_http_resp_spec * resp_spec = response ? &to_hf_public(response)->resp_spec : NULL; + + /* Method */ + const char * __str_method = req_spec ? http_std_method_to_string(req_spec->method) : "-"; + __offset += snprintf(__access_log + __offset, sizeof(__access_log) - __offset, "%s ", __str_method); + + /* URL */ + const char * __str_url = req_spec ? req_spec->url : "-"; + __offset += snprintf(__access_log + __offset, sizeof(__access_log) - __offset, "%s ", __str_url); + + const struct tfe_stream * stream = hs_private->hc_private->stream; + tfe_stream_write_log(stream, RLOG_LV_INFO, "%s", __access_log); + (void)resp_spec; +} + void hs_private_destory(struct http_session_private * hs_private) { + __write_access_log(hs_private); free(hs_private); } diff --git a/plugin/protocol/http/test/test_http_half.cpp b/plugin/protocol/http/test/test_http_half.cpp index dd991f0..3ce9f94 100644 --- a/plugin/protocol/http/test/test_http_half.cpp +++ b/plugin/protocol/http/test/test_http_half.cpp @@ -444,6 +444,11 @@ TEST(HttpHalfConstructCallback, PostWithBody) EXPECT_TRUE(ctx->req_end_called); } +void tfe_stream_write_log(const struct tfe_stream * stream, int level, const char * fmt, ...) +{ + return; +} + int main(int argc, char ** argv) { ::testing::InitGoogleTest(&argc, argv);