完成HTTP请求侧解析调试,可以解析请求的URL。

* 增加插件管理功能(简单实现),可以调用解析层插件;
* 调整HTTP请求侧解析回调函数实现;
* 增加hexdump工具函数;
This commit is contained in:
Lu Qiuwen
2018-09-07 17:27:23 +08:00
parent e31ecbb8db
commit b6a2250786
12 changed files with 273 additions and 84 deletions

View File

@@ -1,4 +1,4 @@
add_library(common src/tfe_stat.cpp src/tfe_utils.cpp src/tfe_future.cpp) add_library(common src/tfe_stat.cpp src/tfe_utils.cpp src/tfe_future.cpp src/tfe_http.cpp src/tfe_plugin.cpp)
target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include)
target_link_libraries(common MESA_handle_logger) target_link_libraries(common MESA_handle_logger)

View File

@@ -1,5 +1,4 @@
#pragma once #pragma once
#include <tfe_stream.h> #include <tfe_stream.h>
struct tfe_proxy; struct tfe_proxy;
@@ -44,10 +43,14 @@ struct tfe_plugin
}; };
/* Register plugin */ /* Register plugin */
int tfe_plugin_register(struct tfe_plugin * plugin); void tfe_plugin_register(struct tfe_plugin * plugin);
/* Unregister plugin */ /* Unregister plugin */
void tfe_plugin_unregister(struct tfe_plugin * plugin); void tfe_plugin_unregister(struct tfe_plugin * plugin);
/* Interate */
struct tfe_plugin * tfe_plugin_iterate(unsigned int * iterate);
unsigned int tfe_plugin_total_counts();
#define TFE_PLUGIN_REGISTER(_symbol, _plugin) \ #define TFE_PLUGIN_REGISTER(_symbol, _plugin) \
static void __attribute__((constructor, used)) __plugin_loader_##_symbol(); \ static void __attribute__((constructor, used)) __plugin_loader_##_symbol(); \
static void __plugin_loader_##_symbol() \ static void __plugin_loader_##_symbol() \

View File

@@ -84,3 +84,35 @@ char* tfe_strdup(const char* s);
#define TFE_PTR_SUB(ptr, x) ((void*)((uintptr_t)ptr - (x))) #define TFE_PTR_SUB(ptr, x) ((void*)((uintptr_t)ptr - (x)))
#define TFE_PTR_DIFF(ptr1, ptr2) ((uintptr_t)(ptr1) - (uintptr_t)(ptr2)) #define TFE_PTR_DIFF(ptr1, ptr2) ((uintptr_t)(ptr1) - (uintptr_t)(ptr2))
#define TFE_DIM(x) (sizeof (x) / sizeof ((x)[0])) #define TFE_DIM(x) (sizeof (x) / sizeof ((x)[0]))
#include <stdio.h>
static inline void tfe_hexdump(FILE *f, const char * title, const void * buf, unsigned int len)
{
unsigned int i, out, ofs;
const unsigned char *data = (const unsigned char *)buf;
#define LINE_LEN 80
char line[LINE_LEN]; /* space needed 8+16*3+3+16 == 75 */
fprintf(f, "%s at [%p], len=%u\n", (title)? title : " Dump data", data, len);
ofs = 0;
while (ofs < len) {
/* format the line in the buffer, then use printf to output to screen */
out = snprintf(line, LINE_LEN, "%08X:", ofs);
for (i = 0; ((ofs + i) < len) && (i < 16); i++)
out += snprintf(line+out, LINE_LEN - out, " %02X", (data[ofs+i] & 0xff));
for(; i <= 16; i++)
out += snprintf(line+out, LINE_LEN - out, " | ");
for(i = 0; (ofs < len) && (i < 16); i++, ofs++) {
unsigned char c = data[ofs];
if ( (c < ' ') || (c > '~'))
c = '.';
out += snprintf(line+out, LINE_LEN - out, "%c", c);
}
fprintf(f, "%s\n", line);
}
fflush(f);
#undef LINE_LEN
}

24
common/src/tfe_http.cpp Normal file
View File

@@ -0,0 +1,24 @@
#include <stdio.h>
#include <stdlib.h>
#include <tfe_http.h>
const char * tfe_http_field_read(const struct tfe_http_half * half, const struct http_field_name * name)
{
return NULL;
}
int tfe_http_field_write(struct tfe_http_half * half, const struct http_field_name * name, const char * value)
{
return 0;
}
struct tfe_http_half * tfe_http_request_create(int major_version, int method, const char * uri, const char * host)
{
return NULL;
}
struct tfe_http_half * tfe_http_response_create(int major_version, int resp_code)
{
return NULL;
}

30
common/src/tfe_plugin.cpp Normal file
View File

@@ -0,0 +1,30 @@
#include <tfe_plugin.h>
#ifndef TFE_PLUGIN_MAX
#define TFE_PLUGIN_MAX 128
#endif
static struct tfe_plugin * __g_tfe_plugin_info[TFE_PLUGIN_MAX];
static unsigned int __g_nr_plugin_info = 0;
void tfe_plugin_register(struct tfe_plugin * plugin)
{
__g_tfe_plugin_info[__g_nr_plugin_info++] = plugin;
}
void tfe_plugin_unregister(struct tfe_plugin * plugin)
{
return;
}
struct tfe_plugin * tfe_plugin_iterate(unsigned int * iterate)
{
if (*iterate < __g_nr_plugin_info) return __g_tfe_plugin_info[(*iterate)++];
else return NULL;
}
unsigned int tfe_plugin_total_counts()
{
return __g_nr_plugin_info;
}

View File

@@ -15,6 +15,8 @@ target_link_libraries(tfe pthread dl
MESA_htable wiredcfg MESA_htable wiredcfg
MESA_field_stat) MESA_field_stat)
target_link_libraries(tfe -Wl,--whole-archive http -Wl,--no-whole-archive)
install(TARGETS tfe RUNTIME DESTINATION ./) install(TARGETS tfe RUNTIME DESTINATION ./)
### test_key_keeper ### test_key_keeper

View File

@@ -74,13 +74,16 @@ struct tfe_stream_private
int calling_idx; int calling_idx;
size_t forward_bytes; size_t forward_bytes;
size_t defere_bytes;
size_t drop_bytes; size_t drop_bytes;
enum tfe_app_proto app_proto; size_t defer_bytes;
int plugin_num; struct timeval defer_timeval;
struct plugin_ctx * plug_ctx; enum tfe_app_proto app_proto;
/* Plugin Ctxs */
unsigned int nr_plugin_ctxs;
struct plugin_ctx * plugin_ctxs;
/* TCP forward without scan or decode when the passthough is set */ /* TCP forward without scan or decode when the passthough is set */
bool passthough; bool passthough;

View File

@@ -32,6 +32,7 @@
#include <kni_acceptor.h> #include <kni_acceptor.h>
#include <tcp_stream.h> #include <tcp_stream.h>
#include <MESA/MESA_prof_load.h> #include <MESA/MESA_prof_load.h>
#include <tfe_plugin.h>
static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGPIPE, SIGUSR1}; static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGPIPE, SIGUSR1};
@@ -230,6 +231,16 @@ int main(int argc, char *argv[])
g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger); g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger);
CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. "); CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. ");
/* PLUGIN INIT */
unsigned int plugin_iterator = 0;
for(struct tfe_plugin * plugin_iter = tfe_plugin_iterate(&plugin_iterator);
plugin_iter != NULL; plugin_iter = tfe_plugin_iterate(&plugin_iterator))
{
ret = plugin_iter->on_init(g_default_proxy);
CHECK_OR_EXIT(ret >= 0, "Plugin %s init failed. Exit. ", plugin_iter->symbol);
TFE_LOG_INFO(g_default_logger, "Plugin %s initialized. ", plugin_iter->symbol);
}
for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++) for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++)
{ {
g_default_proxy->sev[i] = evsignal_new(g_default_proxy->evbase, signals[i], __signal_handler_cb, g_default_proxy); g_default_proxy->sev[i] = evsignal_new(g_default_proxy->evbase, signals[i], __signal_handler_cb, g_default_proxy);

View File

@@ -39,60 +39,68 @@ static void __stream_bev_readcb(struct bufferevent *, void *);
static void __stream_bev_writecb(struct bufferevent *, void *); static void __stream_bev_writecb(struct bufferevent *, void *);
static void __stream_bev_eventcb(struct bufferevent *, short, void *); static void __stream_bev_eventcb(struct bufferevent *, short, void *);
static inline struct tfe_stream_private * __TO_STREAM_PRIVATE(const struct tfe_stream * stream) /* ====================================================================================================================
* HELPER FUNCTIONS
* ===================================================================================================================*/
static inline struct tfe_stream_private * to_stream_private(const struct tfe_stream * stream)
{ {
return container_of(stream, struct tfe_stream_private, head); return container_of(stream, struct tfe_stream_private, head);
} }
static inline struct tfe_conn_private * __THIS_CONN(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) static inline struct tfe_conn_private * __this_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir)
{ {
return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream)); return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream));
} }
static inline struct tfe_conn_private * __PEER_CONN(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) static inline struct tfe_conn_private * __peer_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir)
{ {
return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream)); return ((dir == CONN_DIR_DOWNSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream));
} }
static inline enum tfe_conn_dir __BEV_DIR(struct tfe_stream_private * _stream, struct bufferevent * bev) static inline enum tfe_conn_dir __bev_dir(struct tfe_stream_private * _stream, struct bufferevent * bev)
{ {
return ((bev == _stream->conn_downstream->bev) ? CONN_DIR_DOWNSTREAM : CONN_DIR_UPSTREAM); return ((bev == _stream->conn_downstream->bev) ? CONN_DIR_DOWNSTREAM : CONN_DIR_UPSTREAM);
} }
static inline bool __IS_SSL(struct tfe_stream_private * _stream) static inline bool __is_ssl(struct tfe_stream_private * _stream)
{ {
return (_stream->session_type == STREAM_PROTO_SSL); return (_stream->session_type == STREAM_PROTO_SSL);
} }
/* ====================================================================================================================
* INTERFACE
* ===================================================================================================================*/
void tfe_stream_detach(const struct tfe_stream * stream) void tfe_stream_detach(const struct tfe_stream * stream)
{ {
struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream); struct tfe_stream_private * _stream = to_stream_private(stream);
int plug_id = _stream->calling_idx; int plug_id = _stream->calling_idx;
_stream->plug_ctx[plug_id].state = PLUG_STATE_DETACHED; _stream->plugin_ctxs[plug_id].state = PLUG_STATE_DETACHED;
return; return;
} }
int tfe_stream_preempt(const struct tfe_stream * stream) int tfe_stream_preempt(const struct tfe_stream * stream)
{ {
struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream); struct tfe_stream_private * _stream = to_stream_private(stream);
int plug_id = _stream->calling_idx; int plug_id = _stream->calling_idx;
int i = 0; int i = 0;
for (i = 0; i < _stream->plugin_num; i++) for (i = 0; i < _stream->nr_plugin_ctxs; i++)
{ {
if (_stream->plug_ctx[i].state == PLUG_STATE_PREEPTION) if (_stream->plugin_ctxs[i].state == PLUG_STATE_PREEPTION)
{ {
return -1; return -1;
} }
} }
_stream->plug_ctx[plug_id].state = PLUG_STATE_PREEPTION; _stream->plugin_ctxs[plug_id].state = PLUG_STATE_PREEPTION;
return 0; return 0;
} }
struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir) struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir)
{ {
struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream); struct tfe_stream_private * _stream = to_stream_private(stream);
struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); struct tfe_conn_private * this_conn = __this_conn(_stream, dir);
struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir);
if (this_conn->on_writing == 1) if (this_conn->on_writing == 1)
{ {
@@ -107,14 +115,14 @@ struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_strea
int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size) int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size)
{ {
struct tfe_conn_private * this_conn = __THIS_CONN(w_ctx->_stream, w_ctx->dir);; struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir);;
int ret = bufferevent_write(this_conn->bev, data, size); int ret = bufferevent_write(this_conn->bev, data, size);
return ret; return ret;
} }
void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx)
{ {
struct tfe_conn_private * this_conn = __THIS_CONN(w_ctx->_stream, w_ctx->dir); struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir);
struct tfe_conn_private * peer_conn = __PEER_CONN(w_ctx->_stream, w_ctx->dir); struct tfe_conn_private * peer_conn = __peer_conn(w_ctx->_stream, w_ctx->dir);
this_conn->on_writing = 0; this_conn->on_writing = 0;
bufferevent_enable(peer_conn->bev, EV_READ); bufferevent_enable(peer_conn->bev, EV_READ);
return; return;
@@ -141,6 +149,35 @@ static tfe_conn_private * __conn_private_create_by_bev(struct tfe_stream_private
return __conn_private; return __conn_private;
} }
int tfe_stream_action_set_opt(const struct tfe_stream * stream, enum tfe_stream_action_opt type,
void * value, size_t size)
{
struct tfe_stream_private * _stream = to_stream_private(stream);
#define __SAVE_PARAM(what) do { \
if(size != sizeof(__typeof__(what))) { return -EINVAL; } \
else { what = *(__typeof(what) *)value; } } while(0) \
switch (type)
{
case ACTION_OPT_FOWARD_BYTES: __SAVE_PARAM(_stream->forward_bytes);
break;
case ACTION_OPT_DEFER_BYTES: __SAVE_PARAM(_stream->defer_bytes);
break;
case ACTION_OPT_DEFER_TIME_TV: __SAVE_PARAM(_stream->defer_timeval);
break;
case ACTION_OPT_DROP_BYTES: __SAVE_PARAM(_stream->drop_bytes);
break;
}
#undef __SAVE_PARAM
return 0;
}
/* ====================================================================================================================
* CONNECTION STRUCTURE AND OPERATION FUCTIONS
* ===================================================================================================================*/
evutil_socket_t __conn_private_release_fd(struct tfe_conn_private * conn) evutil_socket_t __conn_private_release_fd(struct tfe_conn_private * conn)
{ {
evutil_socket_t __to_release_fd = conn->fd; evutil_socket_t __to_release_fd = conn->fd;
@@ -160,7 +197,7 @@ static void __conn_private_destory(struct tfe_conn_private * conn)
static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg) static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg)
{ {
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, __BEV_DIR(_stream, bev)); struct tfe_conn_private * peer_conn = __peer_conn(_stream, __bev_dir(_stream, bev));
struct evbuffer * __input_buffer = bufferevent_get_input(bev); struct evbuffer * __input_buffer = bufferevent_get_input(bev);
if (peer_conn == NULL) if (peer_conn == NULL)
@@ -179,13 +216,13 @@ static void __stream_bev_passthrough_writecb(struct bufferevent * bev, void * ar
struct tfe_conn_private ** ref_this_conn{}; struct tfe_conn_private ** ref_this_conn{};
struct tfe_conn_private ** ref_peer_conn{}; struct tfe_conn_private ** ref_peer_conn{};
if (__BEV_DIR(_stream, bev) == CONN_DIR_UPSTREAM) if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
{ {
ref_this_conn = &_stream->conn_upstream; ref_this_conn = &_stream->conn_upstream;
ref_peer_conn = &_stream->conn_downstream; ref_peer_conn = &_stream->conn_downstream;
} }
if (__BEV_DIR(_stream, bev) == CONN_DIR_DOWNSTREAM) if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
{ {
ref_this_conn = &_stream->conn_downstream; ref_this_conn = &_stream->conn_downstream;
ref_peer_conn = &_stream->conn_upstream; ref_peer_conn = &_stream->conn_upstream;
@@ -214,13 +251,13 @@ static void __stream_bev_passthrough_eventcb(struct bufferevent * bev, short eve
struct tfe_conn_private ** ref_this_conn{}; struct tfe_conn_private ** ref_this_conn{};
struct tfe_conn_private ** ref_peer_conn{}; struct tfe_conn_private ** ref_peer_conn{};
if (__BEV_DIR(_stream, bev) == CONN_DIR_UPSTREAM) if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
{ {
ref_this_conn = &_stream->conn_upstream; ref_this_conn = &_stream->conn_upstream;
ref_peer_conn = &_stream->conn_downstream; ref_peer_conn = &_stream->conn_downstream;
} }
if (__BEV_DIR(_stream, bev) == CONN_DIR_DOWNSTREAM) if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
{ {
ref_this_conn = &_stream->conn_downstream; ref_this_conn = &_stream->conn_downstream;
ref_peer_conn = &_stream->conn_upstream; ref_peer_conn = &_stream->conn_upstream;
@@ -273,39 +310,40 @@ __close_connection:
static void __stream_bev_readcb(struct bufferevent * bev, void * arg) static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
{ {
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = __BEV_DIR(_stream, bev); enum tfe_conn_dir dir = __bev_dir(_stream, bev);
struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir);
int i = 0; enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA;
enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA, action_final = ACTION_FORWARD_DATA; enum tfe_stream_action action_final = ACTION_FORWARD_DATA;
const struct tfe_plugin * plugins = _stream->thread_ref->modules;
struct plugin_ctx * plug_ctx = NULL;
int plug_num = _stream->thread_ref->nr_modules;
struct evbuffer * inbuf = bufferevent_get_input(bev); struct evbuffer * inbuf = bufferevent_get_input(bev);
struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev); struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev);
size_t contigous_len = evbuffer_get_length(inbuf), drain_size = 0; size_t drain_size = 0;
size_t contigous_len = evbuffer_get_length(inbuf);
const unsigned char * contiguous_data = (const unsigned char *) evbuffer_pullup(inbuf, contigous_len); const unsigned char * contiguous_data = (const unsigned char *) evbuffer_pullup(inbuf, contigous_len);
_stream->defere_bytes = 0; _stream->defer_bytes = 0;
_stream->drop_bytes = 0; _stream->drop_bytes = 0;
_stream->forward_bytes = 0; _stream->forward_bytes = 0;
for (i = 0; i < plug_num; i++) unsigned int plugin_id_iter = 0;
unsigned int plugin_id = 0;
for (const struct tfe_plugin * p_info_iter = tfe_plugin_iterate(&plugin_id_iter);
p_info_iter != NULL; p_info_iter = tfe_plugin_iterate(&plugin_id_iter))
{ {
_stream->calling_idx = i; _stream->calling_idx = plugin_id;
plug_ctx = _stream->plug_ctx + i; struct plugin_ctx * plug_ctx = &_stream->plugin_ctxs[plugin_id];
if (_stream->is_plugin_opened == 0) if (_stream->is_plugin_opened == 0)
{ {
plugins[i].on_open(&_stream->head, _stream->thread_ref->thread_id, dir, &(plug_ctx->pme)); p_info_iter->on_open(&_stream->head, _stream->thread_ref->thread_id, dir, &(plug_ctx->pme));
_stream->is_plugin_opened = 1; _stream->is_plugin_opened = 1;
} }
else else
{ {
action_tmp = plugins[i].on_data(&_stream->head, _stream->thread_ref->thread_id, action_tmp = p_info_iter->on_data(&_stream->head, _stream->thread_ref->thread_id,
dir, contiguous_data, contigous_len, &(plug_ctx->pme)); dir, contiguous_data, contigous_len, &(plug_ctx->pme));
} }
@@ -313,6 +351,8 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
{ {
action_final = action_tmp; action_final = action_tmp;
} }
plugin_id++;
} }
switch (action_final) switch (action_final)
@@ -327,6 +367,7 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
evbuffer_add_buffer(outbuf, inbuf); evbuffer_add_buffer(outbuf, inbuf);
} }
break; break;
case ACTION_DROP_DATA: case ACTION_DROP_DATA:
if (_stream->drop_bytes > 0) if (_stream->drop_bytes > 0)
{ {
@@ -337,10 +378,12 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
drain_size = evbuffer_get_length(inbuf); drain_size = evbuffer_get_length(inbuf);
} }
evbuffer_drain(inbuf, drain_size); evbuffer_drain(inbuf, drain_size);
break;
case ACTION_DEFER_DATA: case ACTION_DEFER_DATA:
if (_stream->defere_bytes > 0) if (_stream->defer_bytes > 0)
{ {
bufferevent_setwatermark(bev, EV_WRITE, _stream->defere_bytes, 0); bufferevent_setwatermark(bev, EV_WRITE, _stream->defer_bytes, 0);
} }
break; break;
default: assert(0); default: assert(0);
@@ -354,8 +397,8 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT) if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT)
{ {
bufferevent_setwatermark(peer_conn->bev, EV_WRITE, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT / 2, bufferevent_setwatermark(peer_conn->bev, EV_WRITE,
TFE_CONFIG_OUTPUT_LIMIT_DEFAULT); TFE_CONFIG_OUTPUT_LIMIT_DEFAULT / 2, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT);
bufferevent_disable(bev, EV_READ); bufferevent_disable(bev, EV_READ);
} }
@@ -370,8 +413,8 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
static void __stream_bev_writecb(struct bufferevent * bev, void * arg) static void __stream_bev_writecb(struct bufferevent * bev, void * arg)
{ {
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = __BEV_DIR(_stream, bev); enum tfe_conn_dir dir = __bev_dir(_stream, bev);
struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir);
// struct evbuffer * outbuf = bufferevent_get_output(bev); // struct evbuffer * outbuf = bufferevent_get_output(bev);
@@ -391,9 +434,9 @@ static void __stream_bev_writecb(struct bufferevent * bev, void * arg)
static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * arg) static void __stream_bev_eventcb(struct bufferevent * bev, short events, void * arg)
{ {
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = __BEV_DIR(_stream, bev); enum tfe_conn_dir dir = __bev_dir(_stream, bev);
struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); struct tfe_conn_private * this_conn = __this_conn(_stream, dir);
struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir);
const struct tfe_plugin * plugins = _stream->thread_ref->modules; const struct tfe_plugin * plugins = _stream->thread_ref->modules;
struct plugin_ctx * plug_ctx = NULL; struct plugin_ctx * plug_ctx = NULL;
@@ -404,7 +447,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
{ {
this_conn->closed = 1; this_conn->closed = 1;
reason = REASON_ERROR; reason = REASON_ERROR;
if(__IS_SSL(_stream)) if (__is_ssl(_stream))
{ {
ssl_stream_log_error(bev, dir, __STREAM_LOGGER(_stream)); ssl_stream_log_error(bev, dir, __STREAM_LOGGER(_stream));
} }
@@ -429,7 +472,7 @@ call_plugin_close:
for (i = 0; i < plug_num; i++) for (i = 0; i < plug_num; i++)
{ {
_stream->calling_idx = i; _stream->calling_idx = i;
plug_ctx = _stream->plug_ctx + i; plug_ctx = _stream->plugin_ctxs + i;
plugins[i].on_close(&(_stream->head), _stream->thread_ref->thread_id, reason, &(plug_ctx->pme)); plugins[i].on_close(&(_stream->head), _stream->thread_ref->thread_id, reason, &(plug_ctx->pme));
} }
@@ -469,7 +512,6 @@ __errout:
return NULL; return NULL;
} }
void __conn_private_enable(struct tfe_conn_private * conn_private) void __conn_private_enable(struct tfe_conn_private * conn_private)
{ {
assert(conn_private != NULL && conn_private->bev != NULL); assert(conn_private != NULL && conn_private->bev != NULL);
@@ -519,7 +561,7 @@ void ssl_upstream_create_on_success(future_result_t * result, void * user)
_stream->defer_fd_upstream = 0; _stream->defer_fd_upstream = 0;
/* Next, create downstream */ /* Next, create downstream */
_stream->future_downstream_create = future_create("ssl_down",ssl_downstream_create_on_success, _stream->future_downstream_create = future_create("ssl_down", ssl_downstream_create_on_success,
ssl_downstream_create_on_fail, _stream); ssl_downstream_create_on_fail, _stream);
ssl_async_downstream_create(_stream->future_downstream_create, _stream->ssl_mgr, ssl_async_downstream_create(_stream->future_downstream_create, _stream->ssl_mgr,
@@ -536,6 +578,9 @@ struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_
struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1); struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1);
_stream->thread_ref = thread_ctx; _stream->thread_ref = thread_ctx;
_stream->proxy_ref = pxy; _stream->proxy_ref = pxy;
unsigned int total_plugin_count = tfe_plugin_total_counts();
_stream->plugin_ctxs = ALLOC(struct plugin_ctx, total_plugin_count);
return (struct tfe_stream *) &_stream->head; return (struct tfe_stream *) &_stream->head;
} }
@@ -544,13 +589,13 @@ void tfe_stream_destory(struct tfe_stream_private * stream)
struct tfe_thread_ctx * thread = stream->thread_ref; struct tfe_thread_ctx * thread = stream->thread_ref;
struct event_base * ev_base = thread->evbase; struct event_base * ev_base = thread->evbase;
if (__IS_SSL(stream) && stream->ssl_upstream) if (__is_ssl(stream) && stream->ssl_upstream)
{ {
evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_upstream); evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_upstream);
ssl_stream_free_and_close_fd(stream->ssl_upstream, ev_base, __to_closed_fd); ssl_stream_free_and_close_fd(stream->ssl_upstream, ev_base, __to_closed_fd);
} }
if (__IS_SSL(stream) && stream->ssl_downstream) if (__is_ssl(stream) && stream->ssl_downstream)
{ {
evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_downstream); evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_downstream);
ssl_stream_free_and_close_fd(stream->ssl_downstream, ev_base, __to_closed_fd); ssl_stream_free_and_close_fd(stream->ssl_downstream, ev_base, __to_closed_fd);
@@ -585,7 +630,7 @@ void tfe_stream_destory(struct tfe_stream_private * stream)
{ {
future_destroy(stream->future_upstream_create); future_destroy(stream->future_upstream_create);
} }
stream->proxy_ref=NULL; stream->proxy_ref = NULL;
free(stream); free(stream);
thread->load--; thread->load--;
} }
@@ -628,7 +673,6 @@ void tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downs
return; return;
} }
int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt, const void * arg, size_t sz_arg) int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt, const void * arg, size_t sz_arg)
{ {
struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head); struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);
@@ -636,12 +680,12 @@ int tfe_stream_option_set(struct tfe_stream * stream, enum tfe_stream_option opt
if (opt == TFE_STREAM_OPT_SESSION_TYPE) if (opt == TFE_STREAM_OPT_SESSION_TYPE)
{ {
assert(sz_arg == sizeof(enum tfe_stream_proto)); assert(sz_arg == sizeof(enum tfe_stream_proto));
_stream->session_type = *(enum tfe_stream_proto *)arg; _stream->session_type = *(enum tfe_stream_proto *) arg;
} }
else if (opt == TFE_STREAM_OPT_PASSTHROUGH) else if (opt == TFE_STREAM_OPT_PASSTHROUGH)
{ {
assert(sz_arg == sizeof(bool)); assert(sz_arg == sizeof(bool));
_stream->passthough = *(bool *)arg; _stream->passthough = *(bool *) arg;
} }
return 0; return 0;

View File

@@ -32,6 +32,13 @@ struct http_connection_private
unsigned int session_id_counter; unsigned int session_id_counter;
}; };
enum http_half_status
{
STATUS_INIT,
STATUS_READING,
STATUS_COMPLETE,
};
struct http_half_private struct http_half_private
{ {
/* PUBLIC STRUCTURE */ /* PUBLIC STRUCTURE */
@@ -60,8 +67,9 @@ struct http_half_private
struct evbuffer * evbuf_header_value; struct evbuffer * evbuf_header_value;
struct evbuffer * evbuf_body; struct evbuffer * evbuf_body;
/* STATUS */ enum http_half_status status_header;
bool finished; enum http_half_status status_body;
enum http_half_status status_message;
}; };
static inline struct http_half_private * to_hf_request_private(struct http_session_private * hs_private) static inline struct http_half_private * to_hf_request_private(struct http_session_private * hs_private)

View File

@@ -68,6 +68,8 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
{ {
struct http_session_private * hs_private = TAILQ_LAST(&hc_private->hs_private_list, hs_private_list); struct http_session_private * hs_private = TAILQ_LAST(&hc_private->hs_private_list, hs_private_list);
struct http_half_private * hf_private_request = NULL; struct http_half_private * hf_private_request = NULL;
/* tfe_hexdump(stderr, __FUNCTION__, data, (unsigned int)len); */
int ret = 0; int ret = 0;
/* There is no available in session list, /* There is no available in session list,
@@ -80,7 +82,7 @@ enum tfe_stream_action __http_connection_entry_on_request(const struct tfe_strea
/* The last request is finished, we need to create a new session, /* The last request is finished, we need to create a new session,
* or proceed parse content for last request */ * or proceed parse content for last request */
hf_private_request = to_hf_request_private(hs_private); hf_private_request = to_hf_request_private(hs_private);
if (hf_private_request->finished) if (hf_private_request->status_message == STATUS_COMPLETE)
{ {
goto __new_session; goto __new_session;
} }
@@ -112,11 +114,18 @@ __parse:
} }
assert(ret == 1); assert(ret == 1);
if (hf_private_request->status_header == STATUS_COMPLETE)
{
printf("===== URI: %s\n", hf_private_request->hf_public.req_spec.uri);
}
/* Touch a boundary, such as the end of HTTP headers, bodys, et al. /* Touch a boundary, such as the end of HTTP headers, bodys, et al.
* need to call user's cb */ * need to call user's cb */
size_t __forward_bytes = hf_private_request->parse_cursor; size_t __forward_bytes = hf_private_request->parse_cursor;
tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__forward_bytes, sizeof(__forward_bytes)); tfe_stream_action_set_opt(stream, ACTION_OPT_FOWARD_BYTES, &__forward_bytes, sizeof(__forward_bytes));
/* Clear the parser cursor */
hf_private_request->parse_cursor = 0;
return ACTION_FORWARD_DATA; return ACTION_FORWARD_DATA;
} }
@@ -164,6 +173,7 @@ enum tfe_stream_action http_connection_entry_data(const struct tfe_stream * stre
* It may be failed because other plugin preempted before us */ * It may be failed because other plugin preempted before us */
ret = tfe_stream_preempt(stream); ret = tfe_stream_preempt(stream);
if (ret != 0) goto __detach; if (ret != 0) goto __detach;
ht_conn->is_preempted = 1;
} }
/* This stream has been preempt, this plugin try to parse it */ /* This stream has been preempt, this plugin try to parse it */
@@ -182,15 +192,15 @@ void http_connection_entry_close(const struct tfe_stream * stream, unsigned int
} }
static struct tfe_plugin __http_plugin_info = static struct tfe_plugin __http_plugin_info =
{ {
.symbol = "HTTP", .symbol = "HTTP",
.type = TFE_PLUGIN_TYPE_PROTOCOL, .type = TFE_PLUGIN_TYPE_PROTOCOL,
.proto = APP_PROTO_HTTP1, .proto = APP_PROTO_HTTP1,
.on_init = http_plugin_init, .on_init = http_plugin_init,
.on_deinit = http_plugin_deinit, .on_deinit = http_plugin_deinit,
.on_open = http_connection_entry_open, .on_open = http_connection_entry_open,
.on_data = http_connection_entry_data, .on_data = http_connection_entry_data,
.on_close = http_connection_entry_close .on_close = http_connection_entry_close
}; };
TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info) TFE_PLUGIN_REGISTER(HTTP, __http_plugin_info)

View File

@@ -160,6 +160,9 @@ void __hf_public_resp_fill_from_private(struct http_half_private * hf_private, s
* REQUEST PARSER CALLBACKS * REQUEST PARSER CALLBACKS
* ================================================================================================================== */ * ================================================================================================================== */
#define __HF_PRIVATE_CHANGE_STATUS(_status, _now, _to) \
do { assert(_status == _now); _status = _to; } while(0)
static int __parser_callback_on_message_begin(struct http_parser * parser) static int __parser_callback_on_message_begin(struct http_parser * parser)
{ {
struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser);
@@ -172,18 +175,25 @@ static int __parser_callback_on_message_begin(struct http_parser * parser)
hf_private->evbuf_header_field = evbuffer_new(); hf_private->evbuf_header_field = evbuffer_new();
hf_private->evbuf_header_value = evbuffer_new(); hf_private->evbuf_header_value = evbuffer_new();
hf_private->evbuf_body = evbuffer_new(); hf_private->evbuf_body = evbuffer_new();
hf_private->status_header = STATUS_INIT;
hf_private->status_body = STATUS_INIT;
hf_private->status_message = STATUS_READING;
return 0; return 0;
} }
static int __parser_callback_on_uri_field(struct http_parser * parser, const char * at, size_t length) static int __parser_callback_on_uri_field(struct http_parser * parser, const char * at, size_t length)
{ {
struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser);
__HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_INIT, STATUS_READING);
return evbuffer_add(hf_private->evbuf_uri, at, length); return evbuffer_add(hf_private->evbuf_uri, at, length);
} }
static int __parser_callback_on_header_field(struct http_parser * parser, const char * at, size_t length) static int __parser_callback_on_header_field(struct http_parser * parser, const char * at, size_t length)
{ {
struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser);
__HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_READING, STATUS_READING);
/* Last field-value tuple doesn't push into hf_private, flush these */ /* Last field-value tuple doesn't push into hf_private, flush these */
if (evbuffer_get_length(hf_private->evbuf_header_field) != 0) if (evbuffer_get_length(hf_private->evbuf_header_field) != 0)
{ {
@@ -196,15 +206,10 @@ static int __parser_callback_on_header_field(struct http_parser * parser, const
static int __parser_callback_on_header_value(struct http_parser * parser, const char * at, size_t length) static int __parser_callback_on_header_value(struct http_parser * parser, const char * at, size_t length)
{ {
struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser);
__HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_READING, STATUS_READING);
return evbuffer_add(hf_private->evbuf_header_value, at, length); return evbuffer_add(hf_private->evbuf_header_value, at, length);
} }
static int __parser_callback_on_body(struct http_parser * parser, const char * at, size_t length)
{
struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser);
return evbuffer_add(hf_private->evbuf_body, at, length);
}
static int __parser_callback_on_headers_complete(http_parser * parser) static int __parser_callback_on_headers_complete(http_parser * parser)
{ {
struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser);
@@ -228,14 +233,24 @@ static int __parser_callback_on_headers_complete(http_parser * parser)
__hf_public_resp_fill_from_private(hf_private, parser); __hf_public_resp_fill_from_private(hf_private, parser);
} }
http_parser_pause(parser, 1); __HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_READING, STATUS_COMPLETE);
__HF_PRIVATE_CHANGE_STATUS(hf_private->status_body, STATUS_INIT, STATUS_READING);
return 0; return 0;
} }
static int __parser_callback_on_body(struct http_parser * parser, const char * at, size_t length)
{
struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser);
return evbuffer_add(hf_private->evbuf_body, at, length);
}
static int __parser_callback_on_message_complete(http_parser * parser) static int __parser_callback_on_message_complete(http_parser * parser)
{ {
struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser); struct http_half_private * hf_private = __PARSER_TO_HF_PRIVATE(parser);
http_parser_pause(parser, 1); __HF_PRIVATE_CHANGE_STATUS(hf_private->status_header, STATUS_COMPLETE, STATUS_COMPLETE);
__HF_PRIVATE_CHANGE_STATUS(hf_private->status_body, STATUS_READING || STATUS_COMPLETE, STATUS_COMPLETE);
__HF_PRIVATE_CHANGE_STATUS(hf_private->status_message, STATUS_READING, STATUS_COMPLETE);
return 0; return 0;
} }
@@ -311,6 +326,13 @@ int hf_private_parse(struct http_half_private * hf_private, const unsigned char
goto __out; goto __out;
} }
/* Touch message's boundary, the parser jumps to end */
if (hf_private->status_message == STATUS_COMPLETE)
{
ret = 1;
goto __out;
}
/* Some kind of exception happend */ /* Some kind of exception happend */
if (sz_parsed && HTTP_PARSER_ERRNO(hf_private->parse_object) > 0) if (sz_parsed && HTTP_PARSER_ERRNO(hf_private->parse_object) > 0)
{ {