整理stream处理流程,适应KNI接口定义。

This commit is contained in:
Lu Qiuwen
2018-08-23 11:23:05 +08:00
committed by Lu
parent 7cccc70b53
commit 405f046b22
10 changed files with 155 additions and 93 deletions

View File

@@ -1,3 +1,4 @@
#pragma once
enum e_future_error enum e_future_error
{ {

View File

@@ -44,7 +44,6 @@ struct tfe_conn
struct tfe_stream struct tfe_stream
{ {
struct tfe_conn upstream; struct tfe_conn upstream;
struct tfe_conn downstream; struct tfe_conn downstream;
}; };

View File

@@ -1,2 +1,7 @@
void* kni_init(const char *unix_domain_path, struct event_base *attach); #pragma once
struct tfe_proxy;
struct kni_acceptor_ctx;
struct kni_acceptor_ctx * kni_acceptor_init(struct tfe_proxy *proxy, const char *profile, void *logger);
void kni_acceptor_deinit(struct kni_acceptor_ctx *ctx);

View File

@@ -0,0 +1,19 @@
#pragma once
#include <tfe_stream.h>
#include <event2/event.h>
struct tfe_proxy;
struct tfe_proxy_accept_para
{
/* Both upstream and downstream FDs */
evutil_socket_t upstream_fd;
evutil_socket_t downstream_fd;
/* Session Type */
enum tfe_session_proto session_type;
};
struct tfe_proxy * tfe_proxy_new(const char * profile);
int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para);
void tfe_proxy_run(struct tfe_proxy * proxy);

View File

@@ -1,5 +1,7 @@
#pragma once #pragma once
#include "tfe_future.h"
#include <event2/event.h>
#include <tfe_future.h>
struct ssl_client_hello struct ssl_client_hello
{ {
@@ -7,11 +9,14 @@ struct ssl_client_hello
char* sni; char* sni;
char* cipher_suites; char* cipher_suites;
}; };
struct ssl_client_hello* ssl_get_peek_result(future_result_t* result); struct ssl_client_hello* ssl_get_peek_result(future_result_t* result);
void ssl_free_peek_result(struct ssl_client_hello* client_hello); void ssl_free_peek_result(struct ssl_client_hello* client_hello);
void ssl_async_peek_client_hello(struct future* future, evutil_socket_t fd, struct event_base *evbase); void ssl_async_peek_client_hello(struct future* future, evutil_socket_t fd, struct event_base *evbase);
void ssl_async_connect_origin(struct future* future, const struct ssl_client_hello* client_hello, evutil_socket_t fd, const char* sni, struct event_base *evbase); void ssl_async_connect_origin(struct future* future, const struct ssl_client_hello* client_hello,
evutil_socket_t fd, const char* sni, struct event_base *evbase);
struct ssl_downstream * ssl_downstream_create(); struct ssl_downstream * ssl_downstream_create();
void ssl_upstream_free(struct ssl_upstream * p); void ssl_upstream_free(struct ssl_upstream * p);

View File

@@ -78,6 +78,8 @@ struct tfe_conn_private
struct tfe_stream_private struct tfe_stream_private
{ {
struct tfe_stream head; struct tfe_stream head;
struct tfe_proxy *proxy;
enum tfe_session_proto session_type; enum tfe_session_proto session_type;
struct tfe_conn_private conn_upstream; struct tfe_conn_private conn_upstream;
struct tfe_conn_private conn_downstream; struct tfe_conn_private conn_downstream;
@@ -96,11 +98,14 @@ struct tfe_stream_private
uint8_t is_plugin_opened; uint8_t is_plugin_opened;
int calling_idx; int calling_idx;
size_t forward_bytes; size_t forward_bytes;
size_t defere_bytes; size_t defere_bytes;
size_t drop_bytes; size_t drop_bytes;
enum tfe_app_proto app_proto; enum tfe_app_proto app_proto;
int plugin_num; int plugin_num;
struct plugin_ctx * plug_ctx; struct plugin_ctx * plug_ctx;
unsigned char passthrough; /* 1 if SSL passthrough is active */ unsigned char passthrough; /* 1 if SSL passthrough is active */

View File

@@ -12,6 +12,8 @@
#include <event2/listener.h> #include <event2/listener.h>
#include <pthread.h> #include <pthread.h>
#include <assert.h> #include <assert.h>
#include <proxy.h>
#include <kni.h>
#ifndef TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT #ifndef TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT
#define TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT "/var/run/.tfe_kni_acceptor_handler" #define TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT "/var/run/.tfe_kni_acceptor_handler"
@@ -38,6 +40,7 @@ struct kni_tlv_info
struct kni_acceptor_ctx struct kni_acceptor_ctx
{ {
/* INPUT */ /* INPUT */
struct tfe_proxy * proxy;
const char * profile; const char * profile;
void * logger; void * logger;
@@ -68,7 +71,8 @@ void __kni_event_cb(evutil_socket_t fd, short what, void * user)
{ {
struct kni_acceptor_ctx * __ctx = (struct kni_acceptor_ctx *)user; struct kni_acceptor_ctx * __ctx = (struct kni_acceptor_ctx *)user;
struct cmsghdr * __cmsghdr; struct cmsghdr * __cmsghdr;
int * __fds; struct tfe_proxy_accept_para __accept_para;
int * __fds = NULL;
assert(__ctx != NULL && __ctx->thread == pthread_self()); assert(__ctx != NULL && __ctx->thread == pthread_self());
assert(what & EV_READ); assert(what & EV_READ);
@@ -134,7 +138,15 @@ void __kni_event_cb(evutil_socket_t fd, short what, void * user)
goto __close_kni_connection; goto __close_kni_connection;
} }
/* Call Proxy's Callback */ __accept_para.session_type = __session_proto;
__accept_para.downstream_fd = __fds[0];
__accept_para.upstream_fd = __fds[1];
if (tfe_proxy_fds_accept(__ctx->proxy, &__accept_para) < 0)
{
goto __drop_recieved_fds;
}
return; return;
__close_kni_connection: __close_kni_connection:
@@ -227,7 +239,7 @@ void kni_acceptor_deinit(struct kni_acceptor_ctx *ctx)
return; return;
} }
struct kni_acceptor_ctx * kni_acceptor_init(const char *profile, void *logger) struct kni_acceptor_ctx * kni_acceptor_init(struct tfe_proxy *proxy, const char *profile, void *logger)
{ {
struct kni_acceptor_ctx * __ctx = ALLOC(struct kni_acceptor_ctx, 1); struct kni_acceptor_ctx * __ctx = ALLOC(struct kni_acceptor_ctx, 1);
struct sockaddr_un __sockaddr_un; struct sockaddr_un __sockaddr_un;

View File

@@ -13,12 +13,3 @@
#include <MESA/MESA_handle_logger.h> #include <MESA/MESA_handle_logger.h>
#include <MESA/MESA_prof_load.h> #include <MESA/MESA_prof_load.h>
#include <MESA/wired_cfg.h> #include <MESA/wired_cfg.h>
extern struct tfe_instance __g_tfe_instance;
extern struct tfe_config __g_tfe_config;
struct tfe_instance* g_tfe_instance = &__g_tfe_instance;
struct tfe_config * g_tfe_cfg = &__g_tfe_config;
const char* module_name="TFE";

View File

@@ -1,3 +1,6 @@
/*
* Proxy engine, built around libevent 2.x.
*/
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
@@ -22,48 +25,9 @@
#include <tfe_utils.h> #include <tfe_utils.h>
#include <tfe_stream.h> #include <tfe_stream.h>
#include <stream.h> #include <stream.h>
#include <kni.h> #include <proxy.h>
#include <sescache.h> #include <sescache.h>
#include <kni.h>
/*
* Proxy engine, built around libevent 2.x.
*/
#define TFE_BACKLOG_DEFAULT 20
const char * module_name_pxy = "TFE_PXY";
extern struct tfe_instance * g_tfe_instance;
__thread int __currect_thread_id;
static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg)
{
return;
}
/*
* Thread entry point; runs the event loop of the event base.
* Does not exit until the libevent loop is broken explicitly.
*/
static void * __tfe_thrmgr_thread_entry(void * arg)
{
struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg;
struct timeval timer_delay = {60, 0};
struct event * ev;
ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL);
if (!ev) return (void *)NULL;
evtimer_add(ev, &timer_delay);
ctx->running = 1;
__currect_thread_id = ctx->thread_id;
event_base_dispatch(ctx->evbase);
event_free(ev);
return (void *)NULL;
}
static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGINT, SIGPIPE, SIGUSR1}; static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGINT, SIGPIPE, SIGUSR1};
@@ -89,9 +53,41 @@ struct tfe_proxy
void * io_mod; void * io_mod;
}; };
const char * module_name_pxy = "TFE_PXY";
extern struct tfe_instance * g_tfe_instance;
__thread int __currect_thread_id;
static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg)
{
return;
}
/* /*
* Signal handler for SIGTERM, SIGQUIT, SIGINT, SIGHUP, SIGPIPE and SIGUSR1. * Thread entry point; runs the event loop of the event base.
* Does not exit until the libevent loop is broken explicitly.
*/ */
static void * __tfe_thrmgr_thread_entry(void * arg)
{
struct tfe_thread_ctx * ctx = (struct tfe_thread_ctx *) arg;
struct timeval timer_delay = {60, 0};
struct event * ev;
ev = event_new(ctx->evbase, -1, EV_PERSIST, __dummy_event_handler, NULL);
if (!ev) return (void *)NULL;
evtimer_add(ev, &timer_delay);
ctx->running = 1;
__currect_thread_id = ctx->thread_id;
event_base_dispatch(ctx->evbase);
event_free(ev);
return (void *)NULL;
}
static void proxy_signal_cb(evutil_socket_t fd, short what, void * arg) static void proxy_signal_cb(evutil_socket_t fd, short what, void * arg)
{ {
tfe_proxy * ctx = (tfe_proxy *) arg; tfe_proxy * ctx = (tfe_proxy *) arg;
@@ -105,10 +101,10 @@ static void proxy_signal_cb(evutil_socket_t fd, short what, void * arg)
case SIGUSR1: case SIGUSR1:
break; break;
case SIGPIPE: case SIGPIPE:
TFE_LOG_ERROR("Warning: Received SIGPIPE; ignoring.\n"); TFE_LOG_ERROR(ctx->main_logger, "Warning: Received SIGPIPE; ignoring.\n");
break; break;
default: default:
TFE_LOG_ERROR("Warning: Received unexpected signal %i\n", fd); TFE_LOG_ERROR(ctx->main_logger, "Warning: Received unexpected signal %i\n", fd);
break; break;
} }
} }
@@ -140,19 +136,20 @@ unsigned int select_work_thread(struct tfe_proxy * pxy)
/* /*
* Callback for accept events on the socket listener bufferevent. * Callback for accept events on the socket listener bufferevent.
*/ */
static void io_mod_accept_cb(evutil_socket_t upstream_fd, evutil_socket_t downstream_fd,
enum tfe_session_proto session_type, void * arg) int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para)
{ {
struct tfe_proxy * pxy = (struct tfe_proxy *) arg; unsigned int worker_tid = select_work_thread(ctx);
tfe_thread_ctx * worker_thread_ctx = &ctx->work_threads[worker_tid];
unsigned int worker_tid = select_work_thread(pxy); struct tfe_stream_private * stream = tfe_stream_create(para->upstream_fd,
tfe_thread_ctx * worker_thread_ctx = &pxy->work_threads[worker_tid]; para->downstream_fd, para->session_type, worker_thread_ctx);
struct tfe_stream_private * stream = tfe_stream_create(upstream_fd, if (stream == NULL) goto __errout;
downstream_fd, session_type, worker_thread_ctx); tfe_stream_setup(stream);
assert(stream != NULL); __errout:
return tfe_stream_setup(stream); return -1;
} }
/* /*
@@ -160,7 +157,7 @@ static void io_mod_accept_cb(evutil_socket_t upstream_fd, evutil_socket_t downst
* Socket clisock is the privsep client socket used for binding to ports. * Socket clisock is the privsep client socket used for binding to ports.
* Returns ctx on success, or NULL on error. * Returns ctx on success, or NULL on error.
*/ */
struct tfe_proxy * tfe_proxy_new(tfe_config * cfg) struct tfe_proxy * tfe_proxy_new(const char * profile)
{ {
struct tfe_proxy * proxy = ALLOC(struct tfe_proxy, 1); struct tfe_proxy * proxy = ALLOC(struct tfe_proxy, 1);
assert(proxy != NULL); assert(proxy != NULL);
@@ -182,6 +179,7 @@ struct tfe_proxy * tfe_proxy_new(tfe_config * cfg)
proxy->modules[1].proto = APP_PROTO_HTTP2; proxy->modules[1].proto = APP_PROTO_HTTP2;
proxy->work_threads = ALLOC(struct tfe_thread_ctx, proxy->nr_work_threads); proxy->work_threads = ALLOC(struct tfe_thread_ctx, proxy->nr_work_threads);
proxy->io_mod = kni_acceptor_init(proxy, profile, NULL);
for (unsigned int i = 0; i < proxy->nr_work_threads; i++) for (unsigned int i = 0; i < proxy->nr_work_threads; i++)
{ {
@@ -202,8 +200,7 @@ struct tfe_proxy * tfe_proxy_new(tfe_config * cfg)
} }
proxy->gcev = event_new(proxy->evbase, -1, EV_PERSIST, proxy_gc_cb, proxy); proxy->gcev = event_new(proxy->evbase, -1, EV_PERSIST, proxy_gc_cb, proxy);
if (!proxy->gcev) if (!proxy->gcev) goto error_out;
goto error_out;
evtimer_add(proxy->gcev, &gc_delay); evtimer_add(proxy->gcev, &gc_delay);
return proxy; return proxy;
@@ -271,8 +268,8 @@ void proxy_loopbreak(tfe_proxy * ctx)
*/ */
void proxy_free(tfe_proxy * ctx) void proxy_free(tfe_proxy * ctx)
{ {
}
}
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
@@ -284,11 +281,7 @@ int main(int argc, char *argv[])
//TODO: Initiate Local Cert Cache, Decryption Mirror, Field Stat, Logger and etc. //TODO: Initiate Local Cert Cache, Decryption Mirror, Field Stat, Logger and etc.
//NOTICE: Maat, Cert Store,Tango Cache are initiated in bussiness plugin. //NOTICE: Maat, Cert Store,Tango Cache are initiated in bussiness plugin.
#if 0 proxy=tfe_proxy_new(main_profile);
g_tfe_instance=ALLOC(struct tfe_instance,1);
proxy=tfe_proxy_new(g_tfe_cfg);
#endif
tfe_proxy_run(proxy); tfe_proxy_run(proxy);
proxy_free(proxy); proxy_free(proxy);
} }

View File

@@ -23,6 +23,8 @@
#include <tfe_stream.h> #include <tfe_stream.h>
#include <tfe_utils.h> #include <tfe_utils.h>
#include <tfe_future.h> #include <tfe_future.h>
#include <ssl_stream.h>
#include <stream.h> #include <stream.h>
#include <cert.h> #include <cert.h>
@@ -481,29 +483,59 @@ void peek_client_hello_on_fail(enum e_future_error err, const char * what, void
assert(0); assert(0);
} }
/* int ssl_stream_setup(struct tfe_stream_private * _stream)
* Callback for accept events on the socket listener bufferevent. {
* Called when a new incoming connection has been accepted. return 0;
* Initiates the connection to the server. The incoming connection }
* from the client is not being activated until we have a successful
* connection to the server, because we need the server's certificate int __plain_stream_conn_private_init(struct tfe_stream_private * _stream,
* in order to set up the SSL session to the client. struct tfe_conn_private * _conn, evutil_socket_t fd)
* For consistency, plain TCP works the same way, even if we could {
* start reading from the client while waiting on the connection to struct tfe_proxy * proxy = _stream->proxy;
* the server to connect. struct event_base * ev_base = _stream->thrmgr_ref->evbase;
*/
_conn->bev = bufferevent_socket_new(ev_base, fd, BEV_OPT_DEFER_CALLBACKS)
_conn->fd = fd;
_conn->closed = 0;
_conn->need_shutdown = 0;
_conn->on_writing = 0;
}
int plain_stream_setup(struct tfe_stream_private * _stream)
{
return 0;
}
void tfe_stream_setup(struct tfe_stream_private * _stream) void tfe_stream_setup(struct tfe_stream_private * _stream)
{ {
struct future * f_sni = NULL; struct future * f_sni = NULL;
tfe_thread_ctx * thread = _stream->thrmgr_ref; tfe_thread_ctx * thread = _stream->thrmgr_ref;
switch (_stream->session_type)
if (_stream->session_type == SESSION_PROTO_SSL)
{ {
case SESSION_PROTO_SSL:
// for SSL, defer dst connection setup to initial_readcb
_stream->ssl_downstream = ssl_downstream_create(); _stream->ssl_downstream = ssl_downstream_create();
_stream->async_future = future_create(peek_client_hello_on_succ, peek_client_hello_on_fail, _stream); _stream->async_future = future_create(peek_client_hello_on_succ, peek_client_hello_on_fail, _stream);
ssl_async_peek_client_hello(_stream->ssl_downstream->future_sni_peek, _stream->fd_downstream, ssl_async_peek_client_hello(_stream->ssl_downstream->future_sni_peek, _stream->fd_downstream,
_stream->thrmgr_ref->evbase); _stream->thrmgr_ref->evbase);
}
else if (_stream->session_type == SESSION_PROTO_PLAIN)
{
bufferevent_setcb(_stream->head.upstream.bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, _stream);
bufferevent_setcb(_stream->head.downstream.bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, _stream);
bufferevent_enable(_stream->head.upstream.bev, EV_READ | EV_WRITE);
bufferevent_enable(_stream->head.downstream.bev, EV_READ | EV_WRITE);
}
else
{
}
switch (_stream->session_type)
{
case SESSION_PROTO_SSL:
// for SSL, defer dst connection setup to initial_readcb
thread->stat.value[SSL_NUM]++; thread->stat.value[SSL_NUM]++;
break; break;
default: default: