diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 435f359..7dda4cb 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(common src/tfe_stat.cpp src/tfe_utils.cpp src/tfe_future.cpp) +add_library(common src/tfe_stat.cpp src/tfe_utils.cpp src/tfe_future.cpp src/tfe_http.cpp src/tfe_plugin.cpp) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_link_libraries(common MESA_handle_logger) diff --git a/common/include/tfe_plugin.h b/common/include/tfe_plugin.h index e00040a..805651e 100644 --- a/common/include/tfe_plugin.h +++ b/common/include/tfe_plugin.h @@ -1,5 +1,4 @@ #pragma once - #include struct tfe_proxy; @@ -44,10 +43,14 @@ struct tfe_plugin }; /* Register plugin */ -int tfe_plugin_register(struct tfe_plugin * plugin); +void tfe_plugin_register(struct tfe_plugin * plugin); /* Unregister plugin */ void tfe_plugin_unregister(struct tfe_plugin * plugin); +/* Interate */ +struct tfe_plugin * tfe_plugin_iterate(unsigned int * iterate); +unsigned int tfe_plugin_total_counts(); + #define TFE_PLUGIN_REGISTER(_symbol, _plugin) \ static void __attribute__((constructor, used)) __plugin_loader_##_symbol(); \ static void __plugin_loader_##_symbol() \ diff --git a/common/include/tfe_utils.h b/common/include/tfe_utils.h index bb76ae0..ec5a27a 100644 --- a/common/include/tfe_utils.h +++ b/common/include/tfe_utils.h @@ -84,3 +84,35 @@ char* tfe_strdup(const char* s); #define TFE_PTR_SUB(ptr, x) ((void*)((uintptr_t)ptr - (x))) #define TFE_PTR_DIFF(ptr1, ptr2) ((uintptr_t)(ptr1) - (uintptr_t)(ptr2)) #define TFE_DIM(x) (sizeof (x) / sizeof ((x)[0])) + +#include + +static inline void tfe_hexdump(FILE *f, const char * title, const void * buf, unsigned int len) +{ + unsigned int i, out, ofs; + const unsigned char *data = (const unsigned char *)buf; + +#define LINE_LEN 80 + char line[LINE_LEN]; /* space needed 8+16*3+3+16 == 75 */ + + fprintf(f, "%s at [%p], len=%u\n", (title)? title : " Dump data", data, len); + ofs = 0; + while (ofs < len) { + /* format the line in the buffer, then use printf to output to screen */ + out = snprintf(line, LINE_LEN, "%08X:", ofs); + for (i = 0; ((ofs + i) < len) && (i < 16); i++) + out += snprintf(line+out, LINE_LEN - out, " %02X", (data[ofs+i] & 0xff)); + for(; i <= 16; i++) + out += snprintf(line+out, LINE_LEN - out, " | "); + for(i = 0; (ofs < len) && (i < 16); i++, ofs++) { + unsigned char c = data[ofs]; + if ( (c < ' ') || (c > '~')) + c = '.'; + out += snprintf(line+out, LINE_LEN - out, "%c", c); + } + fprintf(f, "%s\n", line); + } + fflush(f); + +#undef LINE_LEN +} diff --git a/common/src/tfe_http.cpp b/common/src/tfe_http.cpp new file mode 100644 index 0000000..80e4edf --- /dev/null +++ b/common/src/tfe_http.cpp @@ -0,0 +1,24 @@ + +#include +#include +#include + +const char * tfe_http_field_read(const struct tfe_http_half * half, const struct http_field_name * name) +{ + return NULL; +} + +int tfe_http_field_write(struct tfe_http_half * half, const struct http_field_name * name, const char * value) +{ + return 0; +} + +struct tfe_http_half * tfe_http_request_create(int major_version, int method, const char * uri, const char * host) +{ + return NULL; +} + +struct tfe_http_half * tfe_http_response_create(int major_version, int resp_code) +{ + return NULL; +} diff --git a/common/src/tfe_plugin.cpp b/common/src/tfe_plugin.cpp new file mode 100644 index 0000000..72ec01b --- /dev/null +++ b/common/src/tfe_plugin.cpp @@ -0,0 +1,30 @@ + +#include + +#ifndef TFE_PLUGIN_MAX +#define TFE_PLUGIN_MAX 128 +#endif + +static struct tfe_plugin * __g_tfe_plugin_info[TFE_PLUGIN_MAX]; +static unsigned int __g_nr_plugin_info = 0; + +void tfe_plugin_register(struct tfe_plugin * plugin) +{ + __g_tfe_plugin_info[__g_nr_plugin_info++] = plugin; +} + +void tfe_plugin_unregister(struct tfe_plugin * plugin) +{ + return; +} + +struct tfe_plugin * tfe_plugin_iterate(unsigned int * iterate) +{ + if (*iterate < __g_nr_plugin_info) return __g_tfe_plugin_info[(*iterate)++]; + else return NULL; +} + +unsigned int tfe_plugin_total_counts() +{ + return __g_nr_plugin_info; +} diff --git a/platform/CMakeLists.txt b/platform/CMakeLists.txt index 8dda336..49e5919 100644 --- a/platform/CMakeLists.txt +++ b/platform/CMakeLists.txt @@ -15,6 +15,8 @@ target_link_libraries(tfe pthread dl MESA_htable wiredcfg MESA_field_stat) +target_link_libraries(tfe -Wl,--whole-archive http -Wl,--no-whole-archive) + install(TARGETS tfe RUNTIME DESTINATION ./) ### test_key_keeper diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index 12a8ae8..60581ed 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -74,13 +74,16 @@ struct tfe_stream_private int calling_idx; size_t forward_bytes; - size_t defere_bytes; size_t drop_bytes; - enum tfe_app_proto app_proto; - int plugin_num; + size_t defer_bytes; + struct timeval defer_timeval; - struct plugin_ctx * plug_ctx; + enum tfe_app_proto app_proto; + + /* Plugin Ctxs */ + unsigned int nr_plugin_ctxs; + struct plugin_ctx * plugin_ctxs; /* TCP forward without scan or decode when the passthough is set */ bool passthough; diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index ff9f4b4..44d5243 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -32,6 +32,7 @@ #include #include #include +#include static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGPIPE, SIGUSR1}; @@ -230,6 +231,16 @@ int main(int argc, char *argv[]) g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger); CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. "); + /* PLUGIN INIT */ + unsigned int plugin_iterator = 0; + for(struct tfe_plugin * plugin_iter = tfe_plugin_iterate(&plugin_iterator); + plugin_iter != NULL; plugin_iter = tfe_plugin_iterate(&plugin_iterator)) + { + ret = plugin_iter->on_init(g_default_proxy); + CHECK_OR_EXIT(ret >= 0, "Plugin %s init failed. Exit. ", plugin_iter->symbol); + TFE_LOG_INFO(g_default_logger, "Plugin %s initialized. ", plugin_iter->symbol); + } + for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++) { g_default_proxy->sev[i] = evsignal_new(g_default_proxy->evbase, signals[i], __signal_handler_cb, g_default_proxy); diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index ca788e0..8fd4dd5 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -39,60 +39,68 @@ static void __stream_bev_readcb(struct bufferevent *, void *); static void __stream_bev_writecb(struct bufferevent *, void *); static void __stream_bev_eventcb(struct bufferevent *, short, void *); -static inline struct tfe_stream_private * __TO_STREAM_PRIVATE(const struct tfe_stream * stream) +/* ==================================================================================================================== + * HELPER FUNCTIONS + * ===================================================================================================================*/ + +static inline struct tfe_stream_private * to_stream_private(const struct tfe_stream * stream) { return container_of(stream, struct tfe_stream_private, head); } -static inline struct tfe_conn_private * __THIS_CONN(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) +static inline struct tfe_conn_private * __this_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) { return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream)); } -static inline struct tfe_conn_private * __PEER_CONN(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) +static inline struct tfe_conn_private * __peer_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) { return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream)); } -static inline enum tfe_conn_dir __BEV_DIR(struct tfe_stream_private * _stream, struct bufferevent * bev) +static inline enum tfe_conn_dir __bev_dir(struct tfe_stream_private * _stream, struct bufferevent * bev) { return ((bev == _stream->conn_downstream->bev) ? CONN_DIR_DOWNSTREAM : CONN_DIR_UPSTREAM); } -static inline bool __IS_SSL(struct tfe_stream_private * _stream) +static inline bool __is_ssl(struct tfe_stream_private * _stream) { return (_stream->session_type == STREAM_PROTO_SSL); } +/* ==================================================================================================================== + * INTERFACE + * ===================================================================================================================*/ + void tfe_stream_detach(const struct tfe_stream * stream) { - struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream); + struct tfe_stream_private * _stream = to_stream_private(stream); int plug_id = _stream->calling_idx; - _stream->plug_ctx[plug_id].state = PLUG_STATE_DETACHED; + _stream->plugin_ctxs[plug_id].state = PLUG_STATE_DETACHED; return; } int tfe_stream_preempt(const struct tfe_stream * stream) { - struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream); + struct tfe_stream_private * _stream = to_stream_private(stream); int plug_id = _stream->calling_idx; int i = 0; - for (i = 0; i < _stream->plugin_num; i++) + for (i = 0; i < _stream->nr_plugin_ctxs; i++) { - if (_stream->plug_ctx[i].state == PLUG_STATE_PREEPTION) + if (_stream->plugin_ctxs[i].state == PLUG_STATE_PREEPTION) { return -1; } } - _stream->plug_ctx[plug_id].state = PLUG_STATE_PREEPTION; + _stream->plugin_ctxs[plug_id].state = PLUG_STATE_PREEPTION; return 0; } struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir) { - struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream); - struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); - struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); + struct tfe_stream_private * _stream = to_stream_private(stream); + struct tfe_conn_private * this_conn = __this_conn(_stream, dir); + struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); if (this_conn->on_writing == 1) { @@ -107,14 +115,14 @@ struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_strea int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size) { - struct tfe_conn_private * this_conn = __THIS_CONN(w_ctx->_stream, w_ctx->dir);; + struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir);; int ret = bufferevent_write(this_conn->bev, data, size); return ret; } void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) { - struct tfe_conn_private * this_conn = __THIS_CONN(w_ctx->_stream, w_ctx->dir); - struct tfe_conn_private * peer_conn = __PEER_CONN(w_ctx->_stream, w_ctx->dir); + struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir); + struct tfe_conn_private * peer_conn = __peer_conn(w_ctx->_stream, w_ctx->dir); this_conn->on_writing = 0; bufferevent_enable(peer_conn->bev, EV_READ); return; @@ -141,6 +149,35 @@ static tfe_conn_private * __conn_private_create_by_bev(struct tfe_stream_private return __conn_private; } +int tfe_stream_action_set_opt(const struct tfe_stream * stream, enum tfe_stream_action_opt type, + void * value, size_t size) +{ + struct tfe_stream_private * _stream = to_stream_private(stream); + +#define __SAVE_PARAM(what) do { \ + if(size != sizeof(__typeof__(what))) { return -EINVAL; } \ + else { what = *(__typeof(what) *)value; } } while(0) \ + + switch (type) + { + case ACTION_OPT_FOWARD_BYTES: __SAVE_PARAM(_stream->forward_bytes); + break; + case ACTION_OPT_DEFER_BYTES: __SAVE_PARAM(_stream->defer_bytes); + break; + case ACTION_OPT_DEFER_TIME_TV: __SAVE_PARAM(_stream->defer_timeval); + break; + case ACTION_OPT_DROP_BYTES: __SAVE_PARAM(_stream->drop_bytes); + break; + } + +#undef __SAVE_PARAM + return 0; +} + +/* ==================================================================================================================== + * CONNECTION STRUCTURE AND OPERATION FUCTIONS + * ===================================================================================================================*/ + evutil_socket_t __conn_private_release_fd(struct tfe_conn_private * conn) { evutil_socket_t __to_release_fd = conn->fd; @@ -160,7 +197,7 @@ static void __conn_private_destory(struct tfe_conn_private * conn) static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; - struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, __BEV_DIR(_stream, bev)); + struct tfe_conn_private * peer_conn = __peer_conn(_stream, __bev_dir(_stream, bev)); struct evbuffer * __input_buffer = bufferevent_get_input(bev); if (peer_conn == NULL) @@ -179,13 +216,13 @@ static void __stream_bev_passthrough_writecb(struct bufferevent * bev, void * ar struct tfe_conn_private ** ref_this_conn{}; struct tfe_conn_private ** ref_peer_conn{}; - if (__BEV_DIR(_stream, bev) == CONN_DIR_UPSTREAM) + if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM) { ref_this_conn = &_stream->conn_upstream; ref_peer_conn = &_stream->conn_downstream; } - if (__BEV_DIR(_stream, bev) == CONN_DIR_DOWNSTREAM) + if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM) { ref_this_conn = &_stream->conn_downstream; ref_peer_conn = &_stream->conn_upstream; @@ -214,13 +251,13 @@ static void __stream_bev_passthrough_eventcb(struct bufferevent * bev, short eve struct tfe_conn_private ** ref_this_conn{}; struct tfe_conn_private ** ref_peer_conn{}; - if (__BEV_DIR(_stream, bev) == CONN_DIR_UPSTREAM) + if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM) { ref_this_conn = &_stream->conn_upstream; ref_peer_conn = &_stream->conn_downstream; } - if (__BEV_DIR(_stream, bev) == CONN_DIR_DOWNSTREAM) + if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM) { ref_this_conn = &_stream->conn_downstream; ref_peer_conn = &_stream->conn_upstream; @@ -273,39 +310,40 @@ __close_connection: static void __stream_bev_readcb(struct bufferevent * bev, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; - enum tfe_conn_dir dir = __BEV_DIR(_stream, bev); - struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); + enum tfe_conn_dir dir = __bev_dir(_stream, bev); + struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); - int i = 0; - enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA, action_final = ACTION_FORWARD_DATA; - - const struct tfe_plugin * plugins = _stream->thread_ref->modules; - struct plugin_ctx * plug_ctx = NULL; - int plug_num = _stream->thread_ref->nr_modules; + enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA; + enum tfe_stream_action action_final = ACTION_FORWARD_DATA; struct evbuffer * inbuf = bufferevent_get_input(bev); struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev); - size_t contigous_len = evbuffer_get_length(inbuf), drain_size = 0; + size_t drain_size = 0; + size_t contigous_len = evbuffer_get_length(inbuf); const unsigned char * contiguous_data = (const unsigned char *) evbuffer_pullup(inbuf, contigous_len); - _stream->defere_bytes = 0; + _stream->defer_bytes = 0; _stream->drop_bytes = 0; _stream->forward_bytes = 0; - for (i = 0; i < plug_num; i++) + unsigned int plugin_id_iter = 0; + unsigned int plugin_id = 0; + + for (const struct tfe_plugin * p_info_iter = tfe_plugin_iterate(&plugin_id_iter); + p_info_iter != NULL; p_info_iter = tfe_plugin_iterate(&plugin_id_iter)) { - _stream->calling_idx = i; - plug_ctx = _stream->plug_ctx + i; + _stream->calling_idx = plugin_id; + struct plugin_ctx * plug_ctx = &_stream->plugin_ctxs[plugin_id]; if (_stream->is_plugin_opened == 0) { - plugins[i].on_open(&_stream->head, _stream->thread_ref->thread_id, dir, &(plug_ctx->pme)); + p_info_iter->on_open(&_stream->head, _stream->thread_ref->thread_id, dir, &(plug_ctx->pme)); _stream->is_plugin_opened = 1; } else { - action_tmp = plugins[i].on_data(&_stream->head, _stream->thread_ref->thread_id, + action_tmp = p_info_iter->on_data(&_stream->head, _stream->thread_ref->thread_id, dir, contiguous_data, contigous_len, &(plug_ctx->pme)); } @@ -313,6 +351,8 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) { action_final = action_tmp; } + + plugin_id++; } switch (action_final) @@ -327,6 +367,7 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) evbuffer_add_buffer(outbuf, inbuf); } break; + case ACTION_DROP_DATA: if (_stream->drop_bytes > 0) { @@ -337,10 +378,12 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) drain_size = evbuffer_get_length(inbuf); } evbuffer_drain(inbuf, drain_size); + break; + case ACTION_DEFER_DATA: - if (_stream->defere_bytes > 0) + if (_stream->defer_bytes > 0) { - bufferevent_setwatermark(bev, EV_WRITE, _stream->defere_bytes, 0); + bufferevent_setwatermark(bev, EV_WRITE, _stream->defer_bytes, 0); } break; default: assert(0); @@ -354,8 +397,8 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT) { - bufferevent_setwatermark(peer_conn->bev, EV_WRITE, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT / 2, - TFE_CONFIG_OUTPUT_LIMIT_DEFAULT); + bufferevent_setwatermark(peer_conn->bev, EV_WRITE, + TFE_CONFIG_OUTPUT_LIMIT_DEFAULT / 2, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT); bufferevent_disable(bev, EV_READ); } @@ -370,8 +413,8 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) static void __stream_bev_writecb(struct bufferevent * bev, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; - enum tfe_conn_dir dir = __BEV_DIR(_stream, bev); - struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); + enum tfe_conn_dir dir = __bev_dir(_stream, bev); + struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); // struct evbuffer * outbuf = bufferevent_get_output(bev); @@ -391,9 +434,9 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg) static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * arg) { struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; - enum tfe_conn_dir dir = __BEV_DIR(_stream, bev); - struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); - struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); + enum tfe_conn_dir dir = __bev_dir(_stream, bev); + struct tfe_conn_private * this_conn = __this_conn(_stream, dir); + struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); const struct tfe_plugin * plugins = _stream->thread_ref->modules; struct plugin_ctx * plug_ctx = NULL; @@ -404,7 +447,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * { this_conn->closed = 1; reason = REASON_ERROR; - if(__IS_SSL(_stream)) + if (__is_ssl(_stream)) { ssl_stream_log_error(bev, dir, __STREAM_LOGGER(_stream)); } @@ -429,7 +472,7 @@ call_plugin_close: for (i = 0; i < plug_num; i++) { _stream->calling_idx = i; - plug_ctx = _stream->plug_ctx + i; + plug_ctx = _stream->plugin_ctxs + i; plugins[i].on_close(&(_stream->head), _stream->thread_ref->thread_id, reason, &(plug_ctx->pme)); } @@ -469,7 +512,6 @@ __errout: return NULL; } - void __conn_private_enable(struct tfe_conn_private * conn_private) { assert(conn_private != NULL && conn_private->bev != NULL); @@ -519,7 +561,7 @@ void ssl_upstream_create_on_success(future_result_t * result, void * user) _stream->defer_fd_upstream = 0; /* Next, create downstream */ - _stream->future_downstream_create = future_create("ssl_down",ssl_downstream_create_on_success, + _stream->future_downstream_create = future_create("ssl_down", ssl_downstream_create_on_success, ssl_downstream_create_on_fail, _stream); ssl_async_downstream_create(_stream->future_downstream_create, _stream->ssl_mgr, @@ -536,6 +578,9 @@ struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1); _stream->thread_ref = thread_ctx; _stream->proxy_ref = pxy; + + unsigned int total_plugin_count = tfe_plugin_total_counts(); + _stream->plugin_ctxs = ALLOC(struct plugin_ctx, total_plugin_count); return (struct tfe_stream *) &_stream->head; } @@ -544,13 +589,13 @@ void tfe_stream_destory(struct tfe_stream_private * stream) struct tfe_thread_ctx * thread = stream->thread_ref; struct event_base * ev_base = thread->evbase; - if (__IS_SSL(stream) && stream->ssl_upstream) + if (__is_ssl(stream) && stream->ssl_upstream) { evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_upstream); ssl_stream_free_and_close_fd(stream->ssl_upstream, ev_base, __to_closed_fd); } - if (__IS_SSL(stream) && stream->ssl_downstream) + if (__is_ssl(stream) && stream->ssl_downstream) { evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_downstream); ssl_stream_free_and_close_fd(stream->ssl_downstream, ev_base, __to_closed_fd); @@ -585,7 +630,7 @@ void tfe_stream_destory(struct tfe_stream_private * stream) { future_destroy(stream->future_upstream_create); } - stream->proxy_ref=NULL; + stream->proxy_ref = NULL; free(stream); thread->load--; } @@ -628,7 +673,6 @@ void tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downs return; } - int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt, const void * arg, size_t sz_arg) { struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head); @@ -636,12 +680,12 @@ int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt if (opt == TFE_STREAM_OPT_SESSION_TYPE) { assert(sz_arg == sizeof(enum tfe_stream_proto)); - _stream->session_type = *(enum tfe_stream_proto *)arg; + _stream->session_type = *(enum tfe_stream_proto *) arg; } else if (opt == TFE_STREAM_OPT_PASSTHROUGH) { assert(sz_arg == sizeof(bool)); - _stream->passthough = *(bool *)arg; + _stream->passthough = *(bool *) arg; } return 0; diff --git a/plugin/protocol/http/include/internal/http_common.h b/plugin/protocol/http/include/internal/http_common.h index fa62b22..70480b6 100644 --- a/plugin/protocol/http/include/internal/http_common.h +++ b/plugin/protocol/http/include/internal/http_common.h @@ -32,6 +32,13 @@ struct http_connection_private unsigned int session_id_counter; }; +enum http_half_status +{ + STATUS_INIT, + STATUS_READING, + STATUS_COMPLETE, +}; + struct http_half_private { /* PUBLIC STRUCTURE */ @@ -60,8 +67,9 @@ struct http_half_private struct evbuffer * evbuf_header_value; struct evbuffer * evbuf_body; - /* STATUS */ - bool finished; + enum http_half_status status_header; + enum http_half_status status_body; + enum http_half_status status_message; }; static inline struct http_half_private * to_hf_request_private(struct http_session_private * hs_private) diff --git a/plugin/protocol/http/src/http_entry.cpp b/plugin/protocol/http/src/http_entry.cpp index 2ea1c77..fe641c2 100644 --- a/plugin/protocol/http/src/http_entry.cpp +++ b/plugin/protocol/http/src/http_entry.cpp @@ -68,6 +68,8 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea { struct http_session_private * hs_private = TAILQ_LAST(&hc_private->hs_private_list, hs_private_list); struct http_half_private * hf_private_request = NULL; + + /* tfe_hexdump(stderr, __FUNCTION__, data, (unsigned int)len); */ int ret = 0; /* There is no available in session list, @@ -80,7 +82,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea /* The last request is finished, we need to create a new session, * or proceed parse content for last request */ hf_private_request = to_hf_request_private(hs_private); - if (hf_private_request->finished) + if (hf_private_request->status_message == STATUS_COMPLETE) { goto __new_session; } @@ -112,11 +114,18 @@ __parse: } assert(ret == 1); + if (hf_private_request->status_header == STATUS_COMPLETE) + { + printf("===== URI: %s\n", hf_private_request->hf_public.req_spec.uri); + } /* Touch a boundary, such as the end of HTTP headers, bodys, et al. * need to call user's cb */ size_t __forward_bytes = hf_private_request->parse_cursor; tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__forward_bytes, sizeof(__forward_bytes)); + + /* Clear the parser cursor */ + hf_private_request->parse_cursor = 0; return ACTION_FORWARD_DATA; } @@ -164,6 +173,7 @@ enum tfe_stream_action http_connection_entry_data(const struct tfe_stream * stre * It may be failed because other plugin preempted before us */ ret = tfe_stream_preempt(stream); if (ret != 0) goto __detach; + ht_conn->is_preempted = 1; } /* This stream has been preempt, this plugin try to parse it */ @@ -182,15 +192,15 @@ void http_connection_entry_close(const struct tfe_stream * stream, unsigned int } static struct tfe_plugin __http_plugin_info = - { - .symbol = "HTTP", - .type = TFE_PLUGIN_TYPE_PROTOCOL, - .proto = APP_PROTO_HTTP1, - .on_init = http_plugin_init, - .on_deinit = http_plugin_deinit, - .on_open = http_connection_entry_open, - .on_data = http_connection_entry_data, - .on_close = http_connection_entry_close - }; +{ + .symbol = "HTTP", + .type = TFE_PLUGIN_TYPE_PROTOCOL, + .proto = APP_PROTO_HTTP1, + .on_init = http_plugin_init, + .on_deinit = http_plugin_deinit, + .on_open = http_connection_entry_open, + .on_data = http_connection_entry_data, + .on_close = http_connection_entry_close +}; TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info) diff --git a/plugin/protocol/http/src/http_half.cpp b/plugin/protocol/http/src/http_half.cpp index 38e3453..1d08301 100644 --- a/plugin/protocol/http/src/http_half.cpp +++ b/plugin/protocol/http/src/http_half.cpp @@ -160,6 +160,9 @@ void __hf_public_resp_fill_from_private(struct http_half_private * hf_private, s * REQUEST PARSER CALLBACKS * ================================================================================================================== */ +#define __HF_PRIVATE_CHANGE_STATUS(_status, _now, _to) \ +do { assert(_status == _now); _status = _to; } while(0) + static int __parser_callback_on_message_begin(struct http_parser * parser) { struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); @@ -172,18 +175,25 @@ static int __parser_callback_on_message_begin(struct http_parser * parser) hf_private->evbuf_header_field = evbuffer_new(); hf_private->evbuf_header_value = evbuffer_new(); hf_private->evbuf_body = evbuffer_new(); + + hf_private->status_header = STATUS_INIT; + hf_private->status_body = STATUS_INIT; + hf_private->status_message = STATUS_READING; return 0; } static int __parser_callback_on_uri_field(struct http_parser * parser, const char * at, size_t length) { struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); + __HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_INIT, STATUS_READING); return evbuffer_add(hf_private->evbuf_uri, at, length); } static int __parser_callback_on_header_field(struct http_parser * parser, const char * at, size_t length) { struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); + __HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_READING, STATUS_READING); + /* Last field-value tuple doesn't push into hf_private, flush these */ if (evbuffer_get_length(hf_private->evbuf_header_field) != 0) { @@ -196,15 +206,10 @@ static int __parser_callback_on_header_field(struct http_parser * parser, const static int __parser_callback_on_header_value(struct http_parser * parser, const char * at, size_t length) { struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); + __HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_READING, STATUS_READING); return evbuffer_add(hf_private->evbuf_header_value, at, length); } -static int __parser_callback_on_body(struct http_parser * parser, const char * at, size_t length) -{ - struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); - return evbuffer_add(hf_private->evbuf_body, at, length); -} - static int __parser_callback_on_headers_complete(http_parser * parser) { struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); @@ -228,14 +233,24 @@ static int __parser_callback_on_headers_complete(http_parser * parser) __hf_public_resp_fill_from_private(hf_private, parser); } - http_parser_pause(parser, 1); + __HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_READING, STATUS_COMPLETE); + __HF_PRIVATE_CHANGE_STATUS(hf_private->status_body, STATUS_INIT, STATUS_READING); return 0; } +static int __parser_callback_on_body(struct http_parser * parser, const char * at, size_t length) +{ + struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); + return evbuffer_add(hf_private->evbuf_body, at, length); +} + static int __parser_callback_on_message_complete(http_parser * parser) { struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); - http_parser_pause(parser, 1); + __HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_COMPLETE, STATUS_COMPLETE); + __HF_PRIVATE_CHANGE_STATUS(hf_private->status_body, STATUS_READING || STATUS_COMPLETE, STATUS_COMPLETE); + __HF_PRIVATE_CHANGE_STATUS(hf_private->status_message, STATUS_READING, STATUS_COMPLETE); + return 0; } @@ -311,6 +326,13 @@ int hf_private_parse(struct http_half_private * hf_private, const unsigned char goto __out; } + /* Touch message's boundary, the parser jumps to end */ + if (hf_private->status_message == STATUS_COMPLETE) + { + ret = 1; + goto __out; + } + /* Some kind of exception happend */ if (sz_parsed && HTTP_PARSER_ERRNO(hf_private->parse_object) > 0) {