初步完成连接层的调用框架。

This commit is contained in:
zhengchao
2018-08-16 21:19:07 +08:00
parent c43a2e86a3
commit 4d845a2b00
5 changed files with 607 additions and 1207 deletions

View File

@@ -40,47 +40,68 @@ struct tfe_stream
struct tfe_conn upstream; struct tfe_conn upstream;
struct tfe_conn downstream; struct tfe_conn downstream;
void* application_pme;
}; };
enum tfe_stream_action
{
ACTION_FORWARD_DATA,
ACTION_DEFER_DATA,
ACTION_DROP_DATA
};
enum tfe_stream_action_opt
{
ACTION_OPT_FOWARD_BYTES, //value is size_t, default: forward entire data
ACTION_OPT_DEFER_TIME_TV, //value is "struct timeval " which defines in <time.h>, default: time defer is not enabled
ACTION_OPT_DEFER_BYTES, //value is size_t, default: defer entire data
ACTION_OPT_DROP_BYTES //value is size_t, default: drop entire data
};
enum tfe_stream_close_reason
{
REASON_PASSIVE_CLOSED,
REASON_ACTIVE_CLOSED,
REASON_ERROR
};
int tfe_stream_action_set_opt(const struct tfe_stream* stream,enum tfe_stream_action_opt type, void* value, size_t size);
/*
@return 0 if successful, or -1 if an error occurred
*/
int tfe_stream_write(const struct tfe_stream* stream, enum tfe_conn_dir dir, const unsigned char *data, size_t len);
struct tfe_stream_write_ctx{};
//following tfe_stream_write_xx functions are NOT thread safe, MUST be called in the stream process thread.
struct tfe_stream_write_ctx* tfe_stream_write_frag_start(const struct tfe_stream* stream, enum tfe_conn_dir dir);
/*
@return 0 if successful, or -1 if an error occurred
*/
int tfe_stream_write_frag(struct tfe_stream_write_ctx* w_ctx,const unsigned char *data, size_t size);
void tfe_stream_write_frag_end(struct tfe_stream_write_ctx* w_ctx);
//Return 1 for identify as its traffic; //Return 1 for identify as its traffic;
//Return 0 for unknown traffic; //Return 0 for unknown traffic;
typedef int proto_pend_cb_t(const struct tfe_stream* stream, struct evbuffer *data, void **pme); typedef tfe_stream_action stream_open_cb_t(const struct tfe_stream* stream, unsigned int thread_id, enum tfe_conn_dir dir, const unsigned char *data, size_t len, void **pme);
typedef tfe_stream_action stream_data_cb_t(const struct tfe_stream* stream, unsigned int thread_id, enum tfe_conn_dir dir, const unsigned char *data, size_t len, void **pme);
typedef void stream_close_cb_t(const struct tfe_stream* stream, unsigned int thread_id, enum tfe_stream_close_reason reason, void **pme);
enum tfe_proto_action void tfe_stream_detach(const struct tfe_stream* stream);
{ int tfe_stream_preempt(const struct tfe_stream* stream);
PROTO_ATCION_FORWARD,
PROTO_ACTION_DEFER,
PROTO_ACTION_STEAL,
PROTO_ACTION_PASSTHROUGH,
PROTO_ACTION_CLOSE
};
enum tfe_proto_action_opt
{
ACTION_OPT_DEFER_TIME_TV, //value is "struct timeval " which defines in <time.h>, default: time defer is not enabled
ACTION_OPT_DEFER_BYTES, //value is size_t, default: defer entire evbufer
ACTION_OPT_FOWARD_BYTES, //value is size_t, default: forward entire evbufer
ACTION_OPT_CLOSE_DIR //value is enum tfe_conn_dir, default: close both side.
};
int tfe_proto_action_set_opt(const struct tfe_stream* stream,enum tfe_proto_action_opt type, void* value, size_t size);
typedef tfe_proto_action proto_read_cb_t(const struct tfe_stream* stream, struct evbuffer *data, enum tfe_conn_dir dir, void **pme);
typedef void proto_close_cb_t(const struct tfe_stream* stream, int ev, void **pme);
int stream_shutdown(const struct tfe_stream* stream);//close both sides of the stream.
int stream_shutdown_dir(const struct tfe_stream* stream, enum tfe_conn_dir dir);
//typedef int proto_onwrite_cb_t(struct tfe_stream*, struct evbuffer *data, void **pme); //typedef int proto_onwrite_cb_t(struct tfe_stream*, struct evbuffer *data, void **pme);
struct tfe_proto_module struct tfe_plugin
{ {
char symbol[TFE_SYMBOL_MAX]; char symbol[TFE_SYMBOL_MAX];
proto_pend_cb_t *on_pend; enum tfe_app_proto proto;
proto_read_cb_t *on_read; stream_open_cb_t* on_open;
proto_close_cb_t *on_close; stream_data_cb_t* on_data;
stream_close_cb_t* on_close;
// proto_onwrite_cb_t *onwrite; // proto_onwrite_cb_t *onwrite;
}; };
int tfe_io_write(struct pxy_conn_desc* dest,int dir,struct evbuffer *data); int tfe_io_write(struct pxy_conn_desc* dest,int dir,struct evbuffer *data);
int tfe_xxx_proto_init(struct tfe_proto_module*m); int tfe_xxx_proto_init(struct tfe_plugin*m);

View File

@@ -885,6 +885,8 @@ struct ssl_upstream* ssl_upstream_create(tfe_config*opts, const char* sni)
}*/ }*/
void ssl_upstream_free(struct ssl_upstream* p) void ssl_upstream_free(struct ssl_upstream* p)
{ {
X509_free(p->orig_cert);
//todo close ssl with a callback.
//SSL_free(ctx->ssl); //SSL_free(ctx->ssl);
} }
struct ssl_downstream* ssl_downstream_create(void) struct ssl_downstream* ssl_downstream_create(void)

View File

@@ -37,13 +37,6 @@ extern struct tfe_instance* g_tfe_instance;
__thread int __currect_thread_id; __thread int __currect_thread_id;
struct tfe_thread_manager_ctx
{
tfe_config *opts;
unsigned int nr_thread;
tfe_thread_ctx* thr_ctx;
};
void free_thread_manager(struct tfe_thread_manager_ctx* ctx) void free_thread_manager(struct tfe_thread_manager_ctx* ctx)
{ {
@@ -123,6 +116,8 @@ struct tfe_proxy
cert_mgr* cert_mgr; cert_mgr* cert_mgr;
struct sess_cache* dsess_cache; struct sess_cache* dsess_cache;
struct sess_cache* ssess_cache; struct sess_cache* ssess_cache;
int module_num;
struct tfe_plugin* modules;
}; };
/* /*
@@ -261,6 +256,13 @@ struct tfe_proxy * tfe_proxy_new(tfe_config * opts)
proxy->dsess_cache=session_cache_init(); proxy->dsess_cache=session_cache_init();
proxy->ssess_cache=session_cache_init(); proxy->ssess_cache=session_cache_init();
proxy->module_num=2;
proxy->modules=ALLOC(struct tfe_plugin,proxy->module_num);
proxy->modules[0].proto=APP_PROTO_HTTP1;
//todo: setup each protocol module.
//proxy->modules[0].on_read=xxx;
proxy->modules[1].proto=APP_PROTO_HTTP2;
proxy->work_threads=ALLOC(struct tfe_thread_ctx, proxy->thread_num); proxy->work_threads=ALLOC(struct tfe_thread_ctx, proxy->thread_num);
for(i=0;i<proxy->thread_num;i++) for(i=0;i<proxy->thread_num;i++)
{ {
@@ -270,7 +272,8 @@ struct tfe_proxy * tfe_proxy_new(tfe_config * opts)
proxy->work_threads[i].cert_mgr=proxy->cert_mgr;//reference proxy->work_threads[i].cert_mgr=proxy->cert_mgr;//reference
proxy->work_threads[i].dsess_cache=proxy->dsess_cache; proxy->work_threads[i].dsess_cache=proxy->dsess_cache;
proxy->work_threads[i].ssess_cache=proxy->ssess_cache; proxy->work_threads[i].ssess_cache=proxy->ssess_cache;
proxy->work_threads[i].module_num=proxy->module_num;
proxy->work_threads[i].modules=proxy->modules
} }
//Todo: Not handle signal if have mutliple proxy instance. //Todo: Not handle signal if have mutliple proxy instance.
for (i = 0; i < (sizeof(signals) / sizeof(int)); i++) for (i = 0; i < (sizeof(signals) / sizeof(int)); i++)

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,7 @@
struct tfe_thread_ctx struct tfe_thread_ctx
{ {
pthread_t thr; pthread_t thr;
int thread_id; unsigned int thread_id;
size_t load; size_t load;
struct event_base *evbase; struct event_base *evbase;
unsigned char running; unsigned char running;
@@ -13,7 +13,8 @@ struct tfe_thread_ctx
cert_mgr cert_mgr; cert_mgr cert_mgr;
struct sess_cache* dsess_cache; struct sess_cache* dsess_cache;
struct sess_cache* ssess_cache; struct sess_cache* ssess_cache;
const int module_num;
const struct tfe_plugin* modules;
}; };
@@ -37,12 +38,38 @@ struct ssl_upstream
SSL *ssl; SSL *ssl;
struct future* conn_ssl_srv; struct future* conn_ssl_srv;
}; };
enum tfe_plugin_state
{
PLUG_STATE_READONLY,
PLUG_STATE_PREEPTION,
PLUG_STATE_DETACHED
};
struct plugin_ctx
{
enum tfe_plugin_state state;
void *pme;
};
struct tfe_stream_write_ctx
{
struct tfe_stream_private* _stream;
enum tfe_conn_dir dir;
};
struct tfe_conn_private
{
evutil_socket_t fd;
struct bufferevent *bev;
uint8_t on_writing;
uint8_t closed;
uint8_t need_shutdown;
struct tfe_stream_write_ctx w_ctx;
};
struct tfe_stream_private struct tfe_stream_private
{ {
struct tfe_stream head; struct tfe_stream head;
enum tfe_session_proto session_type; enum tfe_session_proto session_type;
struct tfe_conn_private conn_upstream;
struct tfe_conn_private conn_downstream;
union union
{ {
struct ssl_downstream* ssl_downstream; struct ssl_downstream* ssl_downstream;
@@ -53,13 +80,18 @@ struct tfe_stream_private
struct ssl_upstream* ssl_upstream; struct ssl_upstream* ssl_upstream;
void* raw_upstream; void* raw_upstream;
}; };
enum tfe_app_proto app_type; uint8_t is_fisrt_read;
int calling_idx;
size_t forward_bytes;
size_t defere_bytes;
size_t drop_bytes;
enum tfe_app_proto app_proto;
int plugin_num;
struct plugin_ctx* plug_ctx;
unsigned char passthrough; /* 1 if SSL passthrough is active */ unsigned char passthrough; /* 1 if SSL passthrough is active */
evutil_socket_t fd_downstream, fd_upstream; evutil_socket_t fd_downstream, fd_upstream;
struct tfe_thread_ctx* thrmgr_ref; struct tfe_thread_ctx* thrmgr_ref;
future* async_future; future* async_future;
}; };