Close #65 修正进入GC队列的HTTP Session写日志时触发的连接非法段错误

This commit is contained in:
Lu Qiuwen
2018-11-04 18:30:33 +08:00
parent adcd1640bf
commit 2d02343b50
7 changed files with 81 additions and 20 deletions

View File

@@ -37,7 +37,8 @@ struct tfe_conn
struct tfe_stream struct tfe_stream
{ {
struct tfe_stream_addr* addr; const char * str_stream_info;
struct tfe_stream_addr * addr;
enum tfe_stream_proto proto; enum tfe_stream_proto proto;
struct tfe_conn upstream; struct tfe_conn upstream;
struct tfe_conn downstream; struct tfe_conn downstream;

View File

@@ -397,6 +397,7 @@ struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id)
assert(thread_id < g_default_proxy->nr_work_threads); assert(thread_id < g_default_proxy->nr_work_threads);
return g_default_proxy->work_threads[thread_id]->evbase; return g_default_proxy->work_threads[thread_id]->evbase;
} }
struct event_base * tfe_proxy_get_gc_evbase(void) struct event_base * tfe_proxy_get_gc_evbase(void)
{ {
return g_default_proxy->evbase; return g_default_proxy->evbase;
@@ -406,6 +407,7 @@ screen_stat_handle_t tfe_proxy_get_fs_handle(void)
{ {
return g_default_proxy->fs_handle; return g_default_proxy->fs_handle;
} }
int tfe_proxy_ssl_add_trust_ca(const char* pem_file) int tfe_proxy_ssl_add_trust_ca(const char* pem_file)
{ {
return ssl_manager_add_trust_ca(g_default_proxy->ssl_mgr_handler, pem_file); return ssl_manager_add_trust_ca(g_default_proxy->ssl_mgr_handler, pem_file);

View File

@@ -1050,6 +1050,8 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst
} }
_stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr);
stream->str_stream_info = _stream->str_stream_addr;
if (_stream->session_type == STREAM_PROTO_PLAIN) if (_stream->session_type == STREAM_PROTO_PLAIN)
{ {
_stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream); _stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream);

View File

@@ -20,12 +20,11 @@ struct http_plugin
pthread_mutex_t lock_list_hs_private[TFE_THREAD_MAX]; pthread_mutex_t lock_list_hs_private[TFE_THREAD_MAX];
/* GC events */ /* GC events */
struct event * gc_event_hs_private[TFE_THREAD_MAX]; struct event * gc_event_hs_private[TFE_THREAD_MAX];
/* ACCESS LOGGER */
void * access_logger;
}; };
struct http_plugin_status extern struct http_plugin * g_http_plugin;
{
};
struct http_session_private struct http_session_private
{ {
@@ -33,6 +32,8 @@ struct http_session_private
struct tfe_http_session hs_public; struct tfe_http_session hs_public;
/* TAILQ POINTER */ /* TAILQ POINTER */
TAILQ_ENTRY(http_session_private) next; TAILQ_ENTRY(http_session_private) next;
/* HTTP CONNECTION ADDR */
char * str_stream_info;
/* REF, HTTP CONNECTION */ /* REF, HTTP CONNECTION */
struct http_connection_private * hc_private; struct http_connection_private * hc_private;
/* HTTP FRAME CTX */ /* HTTP FRAME CTX */

View File

@@ -89,6 +89,9 @@ int http_plugin_init(struct tfe_proxy * proxy)
plugin_ctx->gc_event_hs_private[thread_id] = gc_event; plugin_ctx->gc_event_hs_private[thread_id] = gc_event;
} }
plugin_ctx->access_logger = MESA_create_runtime_log_handle("log/http-access.log", RLOG_LV_INFO);
assert(plugin_ctx->access_logger != NULL);
return 0; return 0;
} }

View File

@@ -172,7 +172,7 @@ void __hf_public_req_fill_from_private(struct http_half_private * hf_private, st
/* accept-encoding, host is located in header's K-V structure */ /* accept-encoding, host is located in header's K-V structure */
hf_req_spec->method = (enum tfe_http_std_method) parser->method; hf_req_spec->method = (enum tfe_http_std_method) parser->method;
hf_private->method_or_status = (enum tfe_http_std_method) parser->method; hf_private->method_or_status = (enum tfe_http_std_method) parser->method;
const static struct http_field_name __host_field_name = {TFE_HTTP_HOST, NULL}; const static struct http_field_name __host_field_name = {TFE_HTTP_HOST, NULL};
hf_req_spec->host = (char *) tfe_http_field_read(hf_public, &__host_field_name); hf_req_spec->host = (char *) tfe_http_field_read(hf_public, &__host_field_name);
@@ -329,7 +329,11 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
char * __str_content_length = (char *) tfe_http_field_read(hf_public, &__cont_encoding_field_name); char * __str_content_length = (char *) tfe_http_field_read(hf_public, &__cont_encoding_field_name);
/* Does not contain a content-length, passthrough the whole TCP connection */ /* Does not contain a content-length, passthrough the whole TCP connection */
if (unlikely(__str_content_length == NULL)) { hf_private->is_passthrough = true; return -1;} if (unlikely(__str_content_length == NULL))
{
hf_private->is_passthrough = true;
return -1;
}
} }
tfe_http_event event = (hf_direction == TFE_HTTP_REQUEST) ? EV_HTTP_REQ_HDR : EV_HTTP_RESP_HDR; tfe_http_event event = (hf_direction == TFE_HTTP_REQUEST) ? EV_HTTP_REQ_HDR : EV_HTTP_RESP_HDR;
@@ -346,8 +350,8 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
hf_private->stream_action = hf_private->user_stream_action; hf_private->stream_action = hf_private->user_stream_action;
} }
/* user's suspend tag is set, which indicate that the way to handle request/response /* user's suspend tag is set, which indicate that the way to handle request/response
* cannot be determinate at now, need to defer */ * cannot be determinate at now, need to defer */
else if (hs_private && hs_private->suspend_tag_signal) else if (hs_private && hs_private->suspend_tag_signal)
{ {
/* Pause parser, prevent to parse request/response body, /* Pause parser, prevent to parse request/response body,
@@ -362,7 +366,7 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
assert(hf_private->stream_action == ACTION_DEFER_DATA); assert(hf_private->stream_action == ACTION_DEFER_DATA);
} }
/* Otherwise, forward the request/response */ /* Otherwise, forward the request/response */
else else
{ {
hf_private->stream_action = ACTION_FORWARD_DATA; hf_private->stream_action = ACTION_FORWARD_DATA;
@@ -608,7 +612,7 @@ int hf_ops_body_begin(struct tfe_http_half * half, int by_stream)
{ {
/* By stream, we do not support content-encoding for now, /* By stream, we do not support content-encoding for now,
* send body directly without compression */ * send body directly without compression */
if(hf_private->cv_compress_object != NULL) if (hf_private->cv_compress_object != NULL)
{ {
hf_content_compress_destroy(hf_private->cv_compress_object); hf_content_compress_destroy(hf_private->cv_compress_object);
hf_private->cv_compress_object = NULL; hf_private->cv_compress_object = NULL;
@@ -676,12 +680,11 @@ int hf_ops_body_end(struct tfe_http_half * half)
hf_private->body_status = STATUS_COMPLETE; hf_private->body_status = STATUS_COMPLETE;
hf_private->message_status = STATUS_COMPLETE; hf_private->message_status = STATUS_COMPLETE;
if(hf_private->is_setup_by_stream) if (hf_private->is_setup_by_stream)
{ {
hs_private->release_lock--; hs_private->release_lock--;
} }
printf("frag write end, stream = %p, hf_private = %p\n", hf_private->session->hc_private->stream, hf_private);
return 0; return 0;
} }
@@ -879,7 +882,7 @@ void hs_ops_response_set(struct tfe_http_session * session, struct tfe_http_half
struct http_half_private * hf_in_resp_private = to_hf_response_private(hs_private); struct http_half_private * hf_in_resp_private = to_hf_response_private(hs_private);
/* Call at request's callback, early response */ /* Call at request's callback, early response */
if(hf_in_req_private != NULL && hf_in_resp_private == NULL) if (hf_in_req_private != NULL && hf_in_resp_private == NULL)
{ {
/* Drop the incoming request */ /* Drop the incoming request */
if (hf_in_req_private->stream_action == ACTION_DEFER_DATA) if (hf_in_req_private->stream_action == ACTION_DEFER_DATA)
@@ -1062,7 +1065,7 @@ void hf_private_construct(struct http_half_private * hf_private)
} }
struct http_session_private * hs_private_create(struct http_connection_private * hc_private, unsigned int thread_id, struct http_session_private * hs_private_create(struct http_connection_private * hc_private, unsigned int thread_id,
struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp) struct http_half_private * hf_private_req, struct http_half_private * hf_private_resp)
{ {
struct http_session_private * __hs_private = ALLOC(struct http_session_private, 1); struct http_session_private * __hs_private = ALLOC(struct http_session_private, 1);
__hs_private->thread_id = thread_id; __hs_private->thread_id = thread_id;
@@ -1075,6 +1078,7 @@ struct http_session_private * hs_private_create(struct http_connection_private *
/* HS-PRIVATE*/ /* HS-PRIVATE*/
__hs_private->hc_private = hc_private; __hs_private->hc_private = hc_private;
__hs_private->str_stream_info = tfe_strdup(hc_private->stream->str_stream_info);
return __hs_private; return __hs_private;
} }
@@ -1110,7 +1114,8 @@ void __write_access_log(struct http_session_private * hs_private)
/* Content Type */ /* Content Type */
const char * __str_cont_type = resp_spec ? resp_spec->content_type != NULL ? resp_spec->content_type : "-" : "-"; const char * __str_cont_type = resp_spec ? resp_spec->content_type != NULL ? resp_spec->content_type : "-" : "-";
/* Content Encoding */ /* Content Encoding */
const char * __str_cont_encoding = resp_spec ? resp_spec->content_encoding != NULL ? resp_spec->content_encoding: "-" : "-"; const char * __str_cont_encoding =
resp_spec ? resp_spec->content_encoding != NULL ? resp_spec->content_encoding : "-" : "-";
/* Upgrade Tag */ /* Upgrade Tag */
const char * __str_upgrade = response ? response->is_upgrade ? "UPGRADE" : "-" : "-"; const char * __str_upgrade = response ? response->is_upgrade ? "UPGRADE" : "-" : "-";
@@ -1126,12 +1131,12 @@ void __write_access_log(struct http_session_private * hs_private)
const char * __str_suspend = hs_private->suspend_counter > 0 ? "SUSPEND" : "-"; const char * __str_suspend = hs_private->suspend_counter > 0 ? "SUSPEND" : "-";
char * __access_log; char * __access_log;
asprintf(&__access_log, "%d %s %s HTTP/%d.%d %s %s %s %s %s %s %s %s %s", hs_private->hs_public.session_id, __str_method, asprintf(&__access_log, "%s %d %s %s HTTP/%d.%d %s %s %s %s %s %s %s %s %s", hs_private->str_stream_info,
__str_url, request->major, request->minor, __str_resp_code, __str_cont_type, __str_cont_encoding, hs_private->hs_public.session_id, __str_method, __str_url,
request->major, request->minor, __str_resp_code, __str_cont_type, __str_cont_encoding,
__str_upgrade, __str_req_passthrough, __str_resp_passthrough, __str_user_req, __str_user_resp, __str_suspend); __str_upgrade, __str_req_passthrough, __str_resp_passthrough, __str_user_req, __str_user_resp, __str_suspend);
const struct tfe_stream * stream = hs_private->hc_private->stream; TFE_LOG_INFO(g_http_plugin->access_logger, "%s", __access_log);
tfe_stream_write_access_log(stream, RLOG_LV_INFO, "%s", __access_log);
free(__access_log); free(__access_log);
} }
@@ -1152,6 +1157,11 @@ void hs_private_destroy(struct http_session_private * hs_private)
hf_private_destory(hf_resp); hf_private_destory(hf_resp);
} }
if (hs_private->str_stream_info)
{
free(hs_private->str_stream_info);
}
if (hs_private->hf_private_req_user) if (hs_private->hf_private_req_user)
{ {
assert(hs_private->hf_private_req_user->message_status == STATUS_COMPLETE); assert(hs_private->hf_private_req_user->message_status == STATUS_COMPLETE);
@@ -1173,6 +1183,7 @@ void hs_private_gc_destroy(struct http_session_private * hs_private, struct hs_p
if (hs_private->release_lock > 0) if (hs_private->release_lock > 0)
{ {
TAILQ_INSERT_TAIL(gc_list, hs_private, next); TAILQ_INSERT_TAIL(gc_list, hs_private, next);
hs_private->hc_private = NULL;
hs_private->in_gc_queue = true; hs_private->in_gc_queue = true;
} }
else else

View File

@@ -1493,16 +1493,57 @@ void tfe_stream_suspend(const struct tfe_stream * stream, enum tfe_conn_dir by)
return; return;
} }
int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, const unsigned char * data, size_t size)
{
return 0;
}
int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size) int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size)
{ {
return 0; return 0;
} }
int tfe_stream_action_set_opt(const struct tfe_stream * stream, enum tfe_stream_action_opt type,
void * value, size_t size)
{
return 0;
}
struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir)
{
return NULL;
}
struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id)
{
return NULL;
}
struct event_base * tfe_proxy_get_gc_evbase(void)
{
return NULL;
}
void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx)
{ {
return; return;
} }
void tfe_stream_detach(const struct tfe_stream * stream)
{
return;
}
int tfe_stream_preempt(const struct tfe_stream * stream)
{
return 0;
}
unsigned int tfe_proxy_get_work_thread_count()
{
return 0;
}
const char * tfe_version() const char * tfe_version()
{ {
return NULL; return NULL;