增加Per Stream摘要日志功能,修正TCP上、下游连接不能联动关闭的问题。

* 增加Per Stream摘要日志功能,记录连接四元组、HTTP URL等关键信息,便于调试;
* 原实现在上游连接关闭时,不能关闭下游连接(反之亦然),现修正。
This commit is contained in:
Lu Qiuwen
2018-09-21 15:03:33 +08:00
parent 244c17fa2e
commit c0d1b9cf63
8 changed files with 240 additions and 68 deletions

View File

@@ -95,4 +95,7 @@ void tfe_stream_resume(struct promise * promise);
int tfe_stream_shutdown(const struct tfe_stream * stream); int tfe_stream_shutdown(const struct tfe_stream * stream);
int tfe_stream_shutdown_dir(const struct tfe_stream * stream, enum tfe_conn_dir dir); int tfe_stream_shutdown_dir(const struct tfe_stream * stream, enum tfe_conn_dir dir);
/**
* @brief Write linear text for given stream
*/
void tfe_stream_write_log(const struct tfe_stream * stream, int level, const char * fmt, ...);

View File

@@ -56,6 +56,9 @@ struct tfe_conn_private
struct tfe_stream_private struct tfe_stream_private
{ {
struct tfe_stream head; struct tfe_stream head;
char * str_stream_addr;
void * stream_logger;
struct tfe_proxy * proxy_ref; struct tfe_proxy * proxy_ref;
struct tfe_thread_ctx * thread_ref; struct tfe_thread_ctx * thread_ref;

View File

@@ -61,7 +61,17 @@ static inline struct tfe_conn_private * __peer_conn(struct tfe_stream_private *
static inline enum tfe_conn_dir __bev_dir(struct tfe_stream_private * _stream, struct bufferevent * bev) static inline enum tfe_conn_dir __bev_dir(struct tfe_stream_private * _stream, struct bufferevent * bev)
{ {
return ((bev == _stream->conn_downstream->bev) ? CONN_DIR_DOWNSTREAM : CONN_DIR_UPSTREAM); if (_stream->conn_downstream && bev == _stream->conn_downstream->bev)
{
return CONN_DIR_DOWNSTREAM;
}
if(_stream->conn_upstream && bev == _stream->conn_upstream->bev)
{
return CONN_DIR_UPSTREAM;
}
assert(0);
} }
static inline bool __is_ssl(struct tfe_stream_private * _stream) static inline bool __is_ssl(struct tfe_stream_private * _stream)
@@ -194,6 +204,15 @@ static void __conn_private_destory(struct tfe_conn_private * conn)
free(conn); free(conn);
} }
static void __conn_private_destory_with_ssl(struct event_base * ev_base,
struct tfe_conn_private * conn, struct ssl_stream * ssl_stream)
{
if (ssl_stream == NULL) return __conn_private_destory(conn);
evutil_socket_t __to_closed_fd = __conn_private_release_fd(conn);
ssl_stream_free_and_close_fd(ssl_stream, ev_base, __to_closed_fd);
return __conn_private_destory(conn);
}
static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg) static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg)
{ {
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
@@ -313,15 +332,22 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
enum tfe_conn_dir dir = __bev_dir(_stream, bev); enum tfe_conn_dir dir = __bev_dir(_stream, bev);
struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir);
/* Peer connection is terminated, drain all data.
* This connection will be destoryed in __event_cb */
struct evbuffer * inbuf = bufferevent_get_input(bev);
if (peer_conn == NULL)
{
evbuffer_drain(inbuf, evbuffer_get_length(inbuf));
return;
}
struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev);
enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA; enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA;
enum tfe_stream_action action_final = ACTION_FORWARD_DATA; enum tfe_stream_action action_final = ACTION_FORWARD_DATA;
struct evbuffer * inbuf = bufferevent_get_input(bev);
struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev);
size_t drain_size = 0; size_t drain_size = 0;
size_t contigous_len = evbuffer_get_length(inbuf); size_t contigous_len = evbuffer_get_length(inbuf);
const unsigned char * contiguous_data = (const unsigned char *) evbuffer_pullup(inbuf, contigous_len); unsigned char * contiguous_data = evbuffer_pullup(inbuf, contigous_len);
_stream->defer_bytes = 0; _stream->defer_bytes = 0;
_stream->drop_bytes = 0; _stream->drop_bytes = 0;
@@ -395,19 +421,13 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
break; break;
} }
if (evbuffer_get_length(inbuf) != 0) #if 0
{ if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT)
bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); {
} bufferevent_setwatermark(peer_conn->bev, EV_WRITE, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT / 2, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT);
bufferevent_disable(bev, EV_READ);
if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT) }
{ #endif
bufferevent_setwatermark(peer_conn->bev, EV_WRITE,
TFE_CONFIG_OUTPUT_LIMIT_DEFAULT / 2, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT);
bufferevent_disable(bev, EV_READ);
}
return;
} }
/* /*
@@ -418,18 +438,42 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
static void __stream_bev_writecb(struct bufferevent * bev, void * arg) static void __stream_bev_writecb(struct bufferevent * bev, void * arg)
{ {
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = __bev_dir(_stream, bev); struct event_base * ev_base = bufferevent_get_base(bev);
struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir);
// struct evbuffer * outbuf = bufferevent_get_output(bev); struct tfe_conn_private ** ref_this_conn{};
struct tfe_conn_private ** ref_peer_conn{};
struct ssl_stream ** ref_this_ssl_stream{};
if (peer_conn->bev && !(bufferevent_get_enabled(peer_conn->bev) & EV_READ)) if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
{ {
/* data source temporarily disabled; ref_this_conn = &_stream->conn_upstream;
* re-enable and reset watermark to 0. */ ref_peer_conn = &_stream->conn_downstream;
bufferevent_setwatermark(bev, EV_WRITE, 0, 0); ref_this_ssl_stream = &_stream->ssl_downstream;
bufferevent_enable(peer_conn->bev, EV_READ);
} }
if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
{
ref_this_conn = &_stream->conn_downstream;
ref_peer_conn = &_stream->conn_upstream;
ref_this_ssl_stream = &_stream->ssl_downstream;
}
struct evbuffer * __output_buffer = bufferevent_get_output(bev);
assert(__output_buffer != NULL);
if (*ref_peer_conn == NULL && evbuffer_get_length(__output_buffer) == 0)
{
__conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream);
*ref_this_conn = NULL;
*ref_this_ssl_stream = NULL;
}
if (*ref_peer_conn == NULL && *ref_this_conn == NULL)
{
tfe_stream_destory(_stream);
}
return;
} }
/* /*
@@ -439,50 +483,90 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg)
static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * arg) static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * arg)
{ {
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = __bev_dir(_stream, bev); struct event_base * ev_base = bufferevent_get_base(bev);
struct tfe_conn_private * this_conn = __this_conn(_stream, dir);
struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir);
const struct tfe_plugin * plugins = _stream->thread_ref->modules; struct tfe_conn_private ** ref_this_conn{};
struct plugin_ctx * plug_ctx = NULL; struct tfe_conn_private ** ref_peer_conn{};
int plug_num = _stream->thread_ref->nr_modules, i = 0; struct ssl_stream ** ref_this_ssl_stream{};
enum tfe_stream_close_reason reason = REASON_PASSIVE_CLOSED; struct ssl_stream ** ref_peer_ssl_stream{};
if (events & BEV_EVENT_ERROR) if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
{ {
this_conn->closed = 1; ref_this_conn = &_stream->conn_upstream;
reason = REASON_ERROR; ref_peer_conn = &_stream->conn_downstream;
if (__is_ssl(_stream)) ref_this_ssl_stream = &_stream->ssl_upstream;
ref_peer_ssl_stream = &_stream->ssl_downstream;
}
if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
{
ref_this_conn = &_stream->conn_downstream;
ref_peer_conn = &_stream->conn_upstream;
ref_this_ssl_stream = &_stream->ssl_downstream;
ref_peer_ssl_stream = &_stream->ssl_upstream;
}
if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF)
{
if (evbuffer_get_length(bufferevent_get_input(bev)))
{ {
ssl_stream_log_error(bev, dir, __STREAM_LOGGER(_stream)); __stream_bev_readcb(bev, arg);
} }
goto call_plugin_close;
goto __close_connection;
} }
if (events & BEV_EVENT_EOF)
{
//generate a 0 size read callback to notify plugins.
__stream_bev_readcb(bev, arg);
this_conn->closed = 1;
}
if (peer_conn->closed == 1 && this_conn->closed == 1)
{
reason = REASON_PASSIVE_CLOSED;
goto call_plugin_close;
}
return; return;
call_plugin_close: __close_connection:
for (i = 0; i < plug_num; i++) if (*ref_peer_conn != NULL)
{ {
_stream->calling_idx = i; struct bufferevent * __peer_bev = (*ref_peer_conn)->bev;
plug_ctx = _stream->plugin_ctxs + i; struct evbuffer * __peer_output_buffer = bufferevent_get_output(__peer_bev);
plugins[i].on_close(&(_stream->head), _stream->thread_ref->thread_id, reason, &(plug_ctx->pme));
if (evbuffer_get_length(__peer_output_buffer) == 0)
{
__conn_private_destory_with_ssl(ev_base, *ref_peer_conn, *ref_peer_ssl_stream);
*ref_peer_conn = NULL;
*ref_peer_ssl_stream = NULL;
}
}
if (*ref_this_conn != NULL)
{
__conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream);
*ref_this_conn = NULL;
*ref_this_ssl_stream = NULL;
}
if (*ref_this_conn == NULL && *ref_peer_conn == NULL)
{
goto __call_plugin_close;
}
return;
__call_plugin_close:
unsigned int plugin_id_iter = 0;
unsigned int plugin_id = 0;
for (const struct tfe_plugin * p_info_iter = tfe_plugin_iterate(&plugin_id_iter);
p_info_iter != NULL; p_info_iter = tfe_plugin_iterate(&plugin_id_iter))
{
_stream->calling_idx = plugin_id;
struct plugin_ctx * plug_ctx = &_stream->plugin_ctxs[plugin_id];
//TODO: do not use pme to determinate we call on_open or not ever.
if (p_info_iter->on_close && plug_ctx->pme != NULL)
{
p_info_iter->on_close(&_stream->head, _stream->thread_ref->thread_id,
REASON_PASSIVE_CLOSED, &(plug_ctx->pme));
}
plugin_id++;
} }
tfe_stream_destory(_stream); tfe_stream_destory(_stream);
return;
} }
static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private * stream, evutil_socket_t fd) static tfe_conn_private * __conn_private_create_by_fd(struct tfe_stream_private * stream, evutil_socket_t fd)
@@ -583,17 +667,30 @@ struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_
struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1); struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1);
_stream->thread_ref = thread_ctx; _stream->thread_ref = thread_ctx;
_stream->proxy_ref = pxy; _stream->proxy_ref = pxy;
_stream->stream_logger = pxy->logger;
unsigned int total_plugin_count = tfe_plugin_total_counts(); unsigned int total_plugin_count = tfe_plugin_total_counts();
_stream->plugin_ctxs = ALLOC(struct plugin_ctx, total_plugin_count); _stream->plugin_ctxs = ALLOC(struct plugin_ctx, total_plugin_count);
return (struct tfe_stream *) &_stream->head; return (struct tfe_stream *) &_stream->head;
} }
void __stream_access_log_write(struct tfe_stream_private * stream)
{
MESA_handle_runtime_log(stream->stream_logger, RLOG_LV_INFO, "access", "%s", stream->str_stream_addr);
}
void tfe_stream_destory(struct tfe_stream_private * stream) void tfe_stream_destory(struct tfe_stream_private * stream)
{ {
struct tfe_thread_ctx * thread = stream->thread_ref; struct tfe_thread_ctx * thread = stream->thread_ref;
struct event_base * ev_base = thread->evbase; struct event_base * ev_base = thread->evbase;
__stream_access_log_write(stream);
if (stream->str_stream_addr)
{
free(stream->str_stream_addr);
}
if (__is_ssl(stream) && stream->ssl_upstream) if (__is_ssl(stream) && stream->ssl_upstream)
{ {
evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_upstream); evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_upstream);
@@ -673,8 +770,8 @@ static struct tfe_stream_addr * __stream_addr_create_by_fds(struct tfe_stream *
sizeof(struct tfe_stream_addr) + sizeof(struct tfe_stream_addr_tuple4_v4)); sizeof(struct tfe_stream_addr) + sizeof(struct tfe_stream_addr_tuple4_v4));
struct tfe_stream_addr_ipv4 * st_addr_v4 = __stream_addr->ipv4; struct tfe_stream_addr_ipv4 * st_addr_v4 = __stream_addr->ipv4;
struct sockaddr_in * sk_v4_src_ptr = (struct sockaddr_in *)sk_src_ptr; struct sockaddr_in * sk_v4_src_ptr = (struct sockaddr_in *) sk_src_ptr;
struct sockaddr_in * sk_v4_dst_ptr = (struct sockaddr_in *)sk_dst_ptr; struct sockaddr_in * sk_v4_dst_ptr = (struct sockaddr_in *) sk_dst_ptr;
__stream_addr->addrtype = TFE_ADDR_STREAM_TUPLE4_V4; __stream_addr->addrtype = TFE_ADDR_STREAM_TUPLE4_V4;
__stream_addr->addrlen = sizeof(struct tfe_stream_addr_tuple4_v4); __stream_addr->addrlen = sizeof(struct tfe_stream_addr_tuple4_v4);
@@ -691,7 +788,8 @@ static struct tfe_stream_addr * __stream_addr_create_by_fds(struct tfe_stream *
else else
{ {
TFE_STREAM_LOG_ERROR(stream, "Invalid sockaddr family for fd %d: sa_family is %d.", TFE_STREAM_LOG_ERROR(stream, "Invalid sockaddr family for fd %d: sa_family is %d.",
fd_downstream, sk_src_ptr->sa_family); goto __errout; fd_downstream, sk_src_ptr->sa_family);
goto __errout;
} }
return __stream_addr; return __stream_addr;
@@ -713,7 +811,9 @@ void tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downs
evutil_make_socket_nonblocking(fd_upstream); evutil_make_socket_nonblocking(fd_upstream);
_stream->head.addr = __stream_addr_create_by_fds(stream, fd_downstream); _stream->head.addr = __stream_addr_create_by_fds(stream, fd_downstream);
if(unlikely(_stream->head.addr == NULL)) _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr);
if (unlikely(_stream->head.addr == NULL))
{ {
assert(0); assert(0);
} }
@@ -762,3 +862,17 @@ int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt
return 0; return 0;
} }
void tfe_stream_write_log(const struct tfe_stream * stream, int level, const char * fmt, ...)
{
va_list arg_ptr;
va_start(arg_ptr, fmt);
struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);
/* Format input content */
char __tmp_buffer[TFE_STRING_MAX];
vsnprintf(__tmp_buffer, sizeof(__tmp_buffer), fmt, arg_ptr);
/* Log content with stream tag */
MESA_handle_runtime_log(_stream->stream_logger, level, "S-DETAIL", "%s %s", _stream->str_stream_addr, __tmp_buffer);
}

View File

@@ -34,10 +34,8 @@ struct http_session_private
struct http_connection_private struct http_connection_private
{ {
/* ADDRESS */
struct tfe_stream_addr * layer_addr;
/* STREAM */ /* STREAM */
struct tfe_stream * stream; const struct tfe_stream * stream;
/* SESSION LIST, REQUEST-RESPONSE PAIRS */ /* SESSION LIST, REQUEST-RESPONSE PAIRS */
struct hs_private_list hs_private_list; struct hs_private_list hs_private_list;
/* IS PREEMPTED */ /* IS PREEMPTED */

View File

@@ -92,3 +92,5 @@ void hf_private_set_session(struct http_half_private * hf_private, struct http_s
struct http_session_private * hs_private_create(struct http_connection_private * hc_private, struct http_session_private * hs_private_create(struct http_connection_private * hc_private,
struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp); struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp);
void hs_private_destory(struct http_session_private * hs_private);

View File

@@ -28,6 +28,7 @@ int http_connection_entry_open(const struct tfe_stream * stream, unsigned int th
{ {
struct http_connection_private * ht_conn = ALLOC(struct http_connection_private, 1); struct http_connection_private * ht_conn = ALLOC(struct http_connection_private, 1);
TAILQ_INIT(&ht_conn->hs_private_list); TAILQ_INIT(&ht_conn->hs_private_list);
ht_conn->stream = stream;
*pme = (void *)ht_conn; *pme = (void *)ht_conn;
return 0; return 0;
} }
@@ -89,12 +90,18 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
TAILQ_INSERT_TAIL(&hc_private->hs_private_list, hs_private, next); TAILQ_INSERT_TAIL(&hc_private->hs_private_list, hs_private, next);
} }
/* proceed parse content for last request */ /* Parse the content, the data which in defered state has been ignored. */
ret = hf_private_parse(hf_private_request, data, len); ret = hf_private_parse(hf_private_request, data, len);
/* Need more data, no boundary touched */ /* Need more data, no boundary touched */
if (ret == 0) if (ret == 0)
{ {
if (hf_private_request->stream_action == ACTION_DROP_DATA ||
hf_private_request->stream_action == ACTION_FORWARD_DATA)
{
hf_private_request->parse_cursor = 0;
}
return hf_private_request->stream_action; return hf_private_request->stream_action;
} }
@@ -185,8 +192,19 @@ __detach:
void http_connection_entry_close(const struct tfe_stream * stream, unsigned int thread_id, void http_connection_entry_close(const struct tfe_stream * stream, unsigned int thread_id,
enum tfe_stream_close_reason reason, void ** pme) enum tfe_stream_close_reason reason, void ** pme)
{ {
struct http_connection_private * __ht_conn = (struct http_connection_private *)(*pme); struct http_connection_private * ht_conn = (struct http_connection_private *)(*pme);
free(__ht_conn);
/* Delete all live sessions */
struct http_session_private * hs_private_iter = NULL;
TAILQ_FOREACH(hs_private_iter, &ht_conn->hs_private_list, next)
{
TAILQ_REMOVE(&ht_conn->hs_private_list, hs_private_iter, next);
hs_private_destory(hs_private_iter);
}
/* Clear session counter, and free ht_conn structure */
ht_conn->session_id_counter = 0;
free(ht_conn); *pme = NULL;
} }
static struct tfe_plugin __http_plugin_info = static struct tfe_plugin __http_plugin_info =

View File

@@ -560,7 +560,36 @@ struct http_session_private * hs_private_create(struct http_connection_private *
return __hs_private; return __hs_private;
} }
void __write_access_log(struct http_session_private * hs_private)
{
/* Prepare to write session access log */
char __access_log[TFE_STRING_MAX];
size_t __offset = 0;
/* Request */
struct http_half_private * request = to_hf_request_private(hs_private);
/* Response */
struct http_half_private * response = to_hf_request_private(hs_private);
/* Req-Public */
struct tfe_http_req_spec * req_spec = request ? &to_hf_public(request)->req_spec : NULL;
/* Resp-Public */
struct tfe_http_resp_spec * resp_spec = response ? &to_hf_public(response)->resp_spec : NULL;
/* Method */
const char * __str_method = req_spec ? http_std_method_to_string(req_spec->method) : "-";
__offset += snprintf(__access_log + __offset, sizeof(__access_log) - __offset, "%s ", __str_method);
/* URL */
const char * __str_url = req_spec ? req_spec->url : "-";
__offset += snprintf(__access_log + __offset, sizeof(__access_log) - __offset, "%s ", __str_url);
const struct tfe_stream * stream = hs_private->hc_private->stream;
tfe_stream_write_log(stream, RLOG_LV_INFO, "%s", __access_log);
(void)resp_spec;
}
void hs_private_destory(struct http_session_private * hs_private) void hs_private_destory(struct http_session_private * hs_private)
{ {
__write_access_log(hs_private);
free(hs_private); free(hs_private);
} }

View File

@@ -444,6 +444,11 @@ TEST(HttpHalfConstructCallback, PostWithBody)
EXPECT_TRUE(ctx->req_end_called); EXPECT_TRUE(ctx->req_end_called);
} }
void tfe_stream_write_log(const struct tfe_stream * stream, int level, const char * fmt, ...)
{
return;
}
int main(int argc, char ** argv) int main(int argc, char ** argv)
{ {
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);