初步完成HTTP应答侧解析功能,并修正一系类错误处理类的问题。

This commit is contained in:
Lu Qiuwen
2018-09-23 17:33:05 +08:00
parent 7b6dbb06aa
commit 2798783641
10 changed files with 223 additions and 49 deletions

View File

@@ -144,6 +144,7 @@ void __kni_event_cb(evutil_socket_t fd, short what, void * user)
__accept_para.session_type = __session_proto;
__accept_para.downstream_fd = __fds[0];
__accept_para.upstream_fd = __fds[1];
__accept_para.passthrough = false;
if (tfe_proxy_fds_accept(__ctx->proxy, &__accept_para) < 0)
{

View File

@@ -82,12 +82,22 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_p
tfe_stream_option_set(stream, TFE_STREAM_OPT_SESSION_TYPE, &__session_type, sizeof(__session_type));
}
tfe_stream_init_by_fds(stream, para->downstream_fd, para->upstream_fd);
TFE_LOG_DEBUG(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accepted",
stream, para->downstream_fd, para->upstream_fd, para->session_type);
int ret = tfe_stream_init_by_fds(stream, para->downstream_fd, para->upstream_fd);
if (ret < 0)
{
TFE_LOG_ERROR(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accept failed.",
stream, para->downstream_fd, para->upstream_fd, para->session_type); goto __errout;
}
else
{
TFE_LOG_DEBUG(ctx->logger, "%p, Fds(downstream = %d, upstream = %d, type = %d) accepted.",
stream, para->downstream_fd, para->upstream_fd, para->session_type);
}
return 0;
__errout:
return -1;
}
void tfe_proxy_loopbreak(tfe_proxy * ctx)
@@ -238,10 +248,6 @@ int main(int argc, char *argv[])
g_default_proxy->evbase, g_default_logger, NULL);
CHECK_OR_EXIT(g_default_proxy->ssl_mgr_handler, "Failed at init SSL manager. Exit.");
/* MODULE INIT */
g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger);
CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. ");
/* PLUGIN INIT */
unsigned int plugin_iterator = 0;
for(struct tfe_plugin * plugin_iter = tfe_plugin_iterate(&plugin_iterator);
@@ -269,6 +275,10 @@ int main(int argc, char *argv[])
CHECK_OR_EXIT(g_default_proxy->work_threads[tid], "Failed at creating thread %u", tid);
}
/* ACCEPTOR INIT */
g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger);
CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. ");
TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized. ");
event_base_dispatch(g_default_proxy->evbase);

View File

@@ -660,6 +660,7 @@ void ssl_upstream_create_on_success(future_result_t * result, void * user)
void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, void * user)
{
return;
assert(0);
}
@@ -672,7 +673,7 @@ struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_
unsigned int total_plugin_count = tfe_plugin_total_counts();
_stream->plugin_ctxs = ALLOC(struct plugin_ctx, total_plugin_count);
return (struct tfe_stream *) &_stream->head;
return &_stream->head;
}
void __stream_access_log_write(struct tfe_stream_private * stream)
@@ -733,6 +734,7 @@ void tfe_stream_destory(struct tfe_stream_private * stream)
{
future_destroy(stream->future_upstream_create);
}
stream->proxy_ref = NULL;
free(stream);
thread->load--;
@@ -878,7 +880,7 @@ void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket
return;
}
void tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream)
int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream)
{
struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);
struct event_base * ev_base = _stream->thread_ref->evbase;
@@ -890,13 +892,14 @@ void tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downs
__stream_fd_option_setup(_stream, fd_upstream);
_stream->head.addr = __stream_addr_create_by_fds(stream, fd_downstream);
_stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr);
if (unlikely(_stream->head.addr == NULL))
{
assert(0);
TFE_LOG_ERROR(_stream->stream_logger, "Failed to fetch address for fd %d, %d, terminate fds.",
fd_downstream, fd_upstream); goto __errout;
}
_stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr);
if (_stream->session_type == STREAM_PROTO_PLAIN)
{
_stream->conn_downstream = __conn_private_create_by_fd(_stream, fd_downstream);
@@ -921,7 +924,10 @@ void tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downs
_stream->ssl_mgr, fd_upstream, fd_downstream, ev_base);
}
return;
return 0;
__errout:
return -1;
}
int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt, const void * arg, size_t sz_arg)
@@ -949,9 +955,10 @@ void tfe_stream_write_access_log(const struct tfe_stream * stream, int level, co
struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);
/* Format input content */
char __tmp_buffer[TFE_STRING_MAX];
vsnprintf(__tmp_buffer, sizeof(__tmp_buffer), fmt, arg_ptr);
char * __tmp_buffer;
vasprintf(&__tmp_buffer, fmt, arg_ptr);
/* Log content with stream tag */
MESA_handle_runtime_log(_stream->stream_logger, level, "S-DETAIL", "%s %s", _stream->str_stream_addr, __tmp_buffer);
MESA_handle_runtime_log(_stream->stream_logger, level, "access", "%s %s", _stream->str_stream_addr, __tmp_buffer);
free(__tmp_buffer);
}