变更stream系列文件的名称,修正了部分编译错误

* 变更stream系列文件的名称为ssl_stream, tcp_stream等;
* 变更stream.h为platform.h,因该文件为平台整体公用;
* 修正了ssl_stream, ssl_sess_cache文件中的编译错误,部分实现的bug。
* 调整了tfe_future的路径,由平台实现改为公用组件。
This commit is contained in:
Lu Qiuwen
2018-08-27 21:10:45 +08:00
parent f60b634ec6
commit 8869f1888c
19 changed files with 1278 additions and 1272 deletions

View File

@@ -3,6 +3,7 @@ project(tfe)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_C_STANDARD 11)
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
add_subdirectory(vendor)
add_subdirectory(common)

View File

@@ -1,4 +1,4 @@
add_library(common src/tfe_stat.cpp src/tfe_utils.cpp)
add_library(common src/tfe_stat.cpp src/tfe_utils.cpp src/tfe_future.cpp)
target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include)
target_link_libraries(common MESA_handle_logger)

View File

@@ -15,6 +15,13 @@
#define ALLOC(type, number) ((type *)calloc(sizeof(type), number))
#define TFE_STRUCT_ALLOC(struct_type) __extension__ \
({ \
((struct_type) * ) __struct_ptr; \
__struct_ptr = ((struct_type) *)malloc(sizeof((struct_type))); \
__struct_ptr; \
})
#define likely(expr) __builtin_expect((expr), 1)
#define unlikely(expr) __builtin_expect((expr), 0)
@@ -27,6 +34,33 @@ do { MESA_handle_runtime_log(handler, RLOG_LV_INFO, NULL, fmt, ##__VA_ARGS__); }
#define TFE_LOG_DEBUG(handler, fmt, ...) \
do { MESA_handle_runtime_log(handler, RLOG_LV_DEBUG, NULL, fmt, ##__VA_ARGS__); } while(0) \
#ifndef offsetof
/** Return the offset of a field in a structure. */
#define offsetof(TYPE, MEMBER) __builtin_offsetof (TYPE, MEMBER)
#endif
/**
* Return pointer to the wrapping struct instance.
*
* Example:
*
* struct wrapper {
* ...
* struct child c;
* ...
* };
*
* struct child *x = obtain(...);
* struct wrapper *w = container_of(x, struct wrapper, c);
*/
#ifndef container_of
#define container_of(ptr, type, member) __extension__ ({ \
const typeof(((type *)0)->member) *_ptr = (ptr); \
__attribute__((unused)) type *_target_ptr = (type *)(ptr); \
(type *)(((uintptr_t)_ptr) - offsetof(type, member)); \
})
#endif
int addr_sock_to_layer(struct sockaddr * sock_addr, int sockaddrlen, struct layer_addr * layer_addr);
int addr_layer_to_sock(struct layer_addr * layer_addr, struct sockaddr * sock_addr);
char* tfe_strdup(const char* s);

View File

@@ -1,7 +1,10 @@
add_executable(tfe src/cert.cpp src/future.cpp src/kni.cpp src/tfe_stream.cpp src/main.cpp src/proxy.cpp)
add_executable(tfe src/key_keeper.cpp src/kni_acceptor.cpp src/ssl_stream.cpp src/ssl_sess_cache.cpp
src/tcp_stream.cpp src/main.cpp src/proxy.cpp)
target_include_directories(tfe PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include/external)
target_include_directories(tfe PRIVATE ${CMAKE_CURRENT_LIST_DIR}/include/internal)
target_link_libraries(tfe common)
target_link_libraries(tfe pthread dl openssl-ssl-static openssl-crypto-static pthread libevent-static
libevent-static-openssl libevent-static-pthreads MESA_handle_logger MESA_prof_load wiredcfg)
libevent-static-openssl libevent-static-pthreads MESA_handle_logger MESA_prof_load MESA_htable wiredcfg)

View File

@@ -1,8 +1,8 @@
#ifndef CERT_H
#define CERT_H
#pragma once
#include <openssl/ssl.h>
#include <pthread.h>
#include <tfe_future.h>
struct keyring
{
@@ -16,6 +16,5 @@ struct key_keeper * key_keeper_destroy(struct key_keeper *keeper);
struct keyring* key_keeper_release_cert(future_result_t* result);
void key_keeper_free_keyring(struct keyring* cert);
void key_keeper_async_ask(struct future * f, struct key_keeper * keeper, int keyring_id,
void key_keeper_async_ask(struct future * f, struct key_keeper * keeper, int keyring_id,
X509 * origin_cert, int is_cert_valid, struct event_base * evbase);
#endif /* !CERT_H */

View File

@@ -5,7 +5,8 @@
#include <tfe_stream.h>
#include <tfe_stat.h>
#include <cert.h>
#include <tfe_future.h>
#include <proxy.h>
struct tfe_thread_ctx
{
@@ -26,26 +27,6 @@ struct tfe_thread_ctx
const struct tfe_plugin * modules;
};
//Downstream: comunication form client to proxy
//Upstream: communication form proxy to server
struct ssl_downstream
{
/* server name indicated by client in SNI TLS extension */
char * sni;
SSL * ssl;
X509 * fake_cert_ref;//?
int keyring_id;
struct future * future_sni_peek;
struct future * future_get_cert;
};
struct ssl_upstream
{
X509 * orig_cert;
SSL * ssl;
struct future * conn_ssl_srv;
};
enum tfe_plugin_state
{
PLUG_STATE_READONLY,
@@ -78,22 +59,18 @@ struct tfe_conn_private
struct tfe_stream_private
{
struct tfe_stream head;
struct tfe_proxy *proxy;
struct tfe_proxy * proxy_ref;
struct tfe_thread_ctx * thread_ref;
enum tfe_session_proto session_type;
struct tfe_conn_private conn_upstream;
struct tfe_conn_private conn_downstream;
struct tfe_conn_private * conn_upstream;
struct tfe_conn_private * conn_downstream;
union
struct
{
struct ssl_downstream * ssl_downstream;
void * raw_downstream;
};
union
{
struct ssl_upstream * ssl_upstream;
void * raw_upstream;
struct ssl_mgr * ssl_mgr;
struct ssl_stream * ssl_downstream;
struct ssl_stream * ssl_upstream;
};
uint8_t is_plugin_opened;
@@ -109,15 +86,17 @@ struct tfe_stream_private
struct plugin_ctx * plug_ctx;
unsigned char passthrough; /* 1 if SSL passthrough is active */
evutil_socket_t fd_downstream;
evutil_socket_t fd_upstream;
/* For defer connection setup */
evutil_socket_t defer_fd_downstream;
evutil_socket_t defer_fd_upstream;
struct tfe_thread_ctx * thrmgr_ref;
future * async_future;
/* ASYNC UPSTREAM */
future * future_upstream_create;
/* ASYNC DOWNSTREAM */
future * future_downstream_create;
};
struct tfe_stream_private * tfe_stream_create(evutil_socket_t fd_downstream, evutil_socket_t fd_upstream,
enum tfe_session_proto session_type, tfe_thread_ctx * thread);
void tfe_stream_setup(struct tfe_stream_private * _stream);
static inline void * __STREAM_LOGGER(struct tfe_stream_private * _stream)
{
return _stream->proxy_ref->main_logger;
}

View File

@@ -2,8 +2,32 @@
#include <tfe_stream.h>
#include <event2/event.h>
#include <ssl_stream.h>
struct ssl_mgr;
struct key_keeper;
struct tfe_proxy
{
char name[TFE_SYMBOL_MAX];
struct event_base * evbase;
struct event * sev[8];
struct event * gcev;
struct tfe_config * opts;
void * main_logger;
unsigned int nr_work_threads;
struct tfe_thread_ctx * work_threads;
unsigned int nr_modules;
struct tfe_plugin * modules;
void * io_mod;
struct ssl_mgr * ssl_mgr_handler;
struct key_keeper * key_keeper_handler;
};
struct tfe_proxy;
struct tfe_proxy_accept_para
{
/* Both upstream and downstream FDs */
@@ -17,3 +41,4 @@ struct tfe_proxy_accept_para
struct tfe_proxy * tfe_proxy_new(const char * profile);
int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para);
void tfe_proxy_run(struct tfe_proxy * proxy);

View File

@@ -0,0 +1,16 @@
#pragma once
#include <sys/socket.h>
#include <openssl/ssl.h>
#include <tfe_stream.h>
struct sess_cache;
struct sess_cache * ssl_sess_cache_create(unsigned int slot_size, unsigned int expire_seconds, enum tfe_conn_dir served);
void ssl_sess_cache_destroy(struct sess_cache * cache);
void up_session_set(struct sess_cache * cache, struct sockaddr * addr, socklen_t addr_len, const char * sni, SSL_SESSION * value);
SSL_SESSION * up_session_get(struct sess_cache * cache, struct sockaddr * addr, socklen_t addr_len, const char * sni);
void down_session_set(struct sess_cache * cache, const SSL_SESSION * sess);
void down_session_del(struct sess_cache * cache, const SSL_SESSION * sess);
SSL_SESSION * down_session_get(struct sess_cache * cache, const unsigned char * id, int idlen);

View File

@@ -1,22 +1,29 @@
#pragma once
#include <event2/event.h>
#include <tfe_future.h>
#include <field_stat2.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <openssl/rand.h>
#include <openssl/x509.h>
#include <openssl/x509v3.h>
#include <MESA/field_stat2.h>
struct ssl_stream;
struct ssl_mgr;
struct ssl_mgr* ssl_manager_init(const char* ini_profile, const char* section, struct event_base *evbase, void* logger, screen_stat_handle_t* fs);
void ssl_manager_destroy(struct ssl_mgr* mgr);
struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section, struct event_base * evbase,
void * logger, screen_stat_handle_t * fs);
void ssl_manager_destroy(struct ssl_mgr * mgr);
struct ssl_stream* ssl_upstream_create_result_release_stream(future_result_t* result);
struct bufferevent* ssl_upstream_create_result_release_bev(future_result_t* result);
void ssl_async_upstream_create(struct future* f, struct ssl_mgr* mgr, evutil_socket_t fd_upstream, evutil_socket_t fd_downstream, struct event_base *evbase);
struct ssl_stream* ssl_downstream_create_result_release_stream(future_result_t* result);
struct bufferevent* ssl_downstream_create_result_release_bev(future_result_t* result);
void ssl_async_downstream_create(struct future* f, struct ssl_mgr* mgr, struct ssl_stream* upstream, evutil_socket_t fd_downstream, int keyring_id, struct event_base *evbase);
void ssl_stream_free_and_close_fd(struct ssl_stream* stream, struct event_base *evbase, evutil_socket_t fd);
struct ssl_stream * ssl_upstream_create_result_release_stream(future_result_t * result);
struct bufferevent * ssl_upstream_create_result_release_bev(future_result_t * result);
void ssl_async_upstream_create(struct future * f, struct ssl_mgr * mgr, evutil_socket_t fd_upstream,
evutil_socket_t fd_downstream, struct event_base * evbase);
struct ssl_stream * ssl_downstream_create_result_release_stream(future_result_t * result);
struct bufferevent * ssl_downstream_create_result_release_bev(future_result_t * result);
void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct ssl_stream * upstream,
evutil_socket_t fd_downstream, int keyring_id, struct event_base * evbase);
void ssl_stream_free_and_close_fd(struct ssl_stream * stream, struct event_base * evbase, evutil_socket_t fd);

View File

@@ -0,0 +1,8 @@
#pragma once
#include <platform.h>
struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx);
void tfe_stream_init_by_fds(struct tfe_stream * stream, enum tfe_session_proto session_type,
evutil_socket_t fd_downstream, evutil_socket_t fd_upstream);
void tfe_stream_destory(struct tfe_stream_private * stream);

View File

@@ -5,15 +5,18 @@
#include <unistd.h>
#include <sys/socket.h>
#include <sys/errno.h>
#include <MESA/MESA_prof_load.h>
#include <tfe_stream.h>
#include <evutil.h>
#include <event2/listener.h>
#include <pthread.h>
#include <assert.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <MESA/MESA_prof_load.h>
#include <tfe_stream.h>
#include <kni_acceptor.h>
#include <proxy.h>
#include <kni.h>
#include <platform.h>
#ifndef TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT
#define TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT "/var/run/.tfe_kni_acceptor_handler"

View File

@@ -24,34 +24,14 @@
#include <MESA/MESA_handle_logger.h>
#include <tfe_utils.h>
#include <tfe_stream.h>
#include <stream.h>
#include <platform.h>
#include <proxy.h>
#include <sescache.h>
#include <kni.h>
#include <kni_acceptor.h>
#include <tcp_stream.h>
static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGINT, SIGPIPE, SIGUSR1};
struct tfe_proxy
{
char name[TFE_SYMBOL_MAX];
struct event_base * evbase;
struct event * sev[sizeof(signals) / sizeof(int)];
struct event * gcev;
struct tfe_config * opts;
void * main_logger;
struct sess_cache * dsess_cache;
struct sess_cache * ssess_cache;
unsigned int nr_work_threads;
struct tfe_thread_ctx * work_threads;
unsigned int nr_modules;
struct tfe_plugin * modules;
void * io_mod;
};
const char * module_name_pxy = "TFE_PXY";
extern struct tfe_instance * g_tfe_instance;
@@ -142,14 +122,10 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_p
unsigned int worker_tid = select_work_thread(ctx);
tfe_thread_ctx * worker_thread_ctx = &ctx->work_threads[worker_tid];
struct tfe_stream_private * stream = tfe_stream_create(para->upstream_fd,
para->downstream_fd, para->session_type, worker_thread_ctx);
struct tfe_stream * stream = tfe_stream_create(ctx, worker_thread_ctx);
tfe_stream_init_by_fds(stream, para->session_type, para->downstream_fd, para->upstream_fd);
if (stream == NULL) goto __errout;
tfe_stream_setup(stream);
__errout:
return -1;
return 0;
}
/*
@@ -169,9 +145,6 @@ struct tfe_proxy * tfe_proxy_new(const char * profile)
event_enable_debug_mode();
proxy->evbase = event_base_new();
proxy->dsess_cache = session_cache_init();
proxy->ssess_cache = session_cache_init();
proxy->nr_modules = 2;
proxy->modules = ALLOC(struct tfe_plugin, proxy->nr_modules);
@@ -185,8 +158,6 @@ struct tfe_proxy * tfe_proxy_new(const char * profile)
{
proxy->work_threads[i].thread_id = i;
proxy->work_threads[i].evbase = event_base_new();
proxy->work_threads[i].dsess_cache = proxy->dsess_cache;
proxy->work_threads[i].ssess_cache = proxy->ssess_cache;
proxy->work_threads[i].nr_modules = proxy->nr_modules;
proxy->work_threads[i].modules = proxy->modules;
}

View File

@@ -2,72 +2,85 @@
#include <ssl_sess_cache.h>
#include <ssl.h>
#include <MESA_htable.h>
#include <field_stat2.h>
#define SESS_CACHE_NOT_FOUND -1
#define SESS_CACHE_FOUND 0
#define SESS_CACHE_UPDATE_OLD 1
#define SESS_CACHE_ADD_NEW 2
#define SESS_CACHE_INVALID 3
#include <MESA/MESA_htable.h>
#include <MESA/field_stat2.h>
#define SESS_CACHE_NOT_FOUND -1
#define SESS_CACHE_FOUND 0
#define SESS_CACHE_UPDATE_OLD 1
#define SESS_CACHE_ADD_NEW 2
#define SESS_CACHE_INVALID 3
struct asn1_sess
{
unsigned char* buff;
unsigned char * buff;
size_t size;
};
struct sess_set_args
{
MESA_htable_handle hash;
struct asn1_sess* new_sess;
struct asn1_sess * new_sess;
};
struct sess_cache
{
enum tfe_conn_dir served_for;
MESA_htable_handle hash;
long long hit_cnt, miss_cnt,del_err;
long long hit_cnt, miss_cnt, del_err;
};
static void ssl_sess_free_serialized(void *data)
static void ssl_sess_free_serialized(void * data)
{
struct asn1_sess* p=(struct asn1_sess*)data;
struct asn1_sess * p = (struct asn1_sess *) data;
free(p->buff);
p->size=0;
p->size = 0;
free(p);
return;
}
static struct asn1_sess* ssl_sess_serialize(SSL_SESSION *sess)
static struct asn1_sess * ssl_sess_serialize(SSL_SESSION * sess)
{
struct asn1_sess* result=ALLOC(struct asn1_sess,1);
result->size = i2d_SSL_SESSION(sess, NULL);
/*When using i2d_SSL_SESSION(), the memory location pointed to by pp must be large enough to hold the binary representation of the session.
There is no known limit on the size of the created ASN1 representation, so the necessary amount of space should be obtained by first calling
i2d_SSL_SESSION() with pp=NULL, and obtain the size needed, then allocate the memory and call i2d_SSL_SESSION() again.*/
result->buff=ALLOC(unsigned char,result->size);
struct asn1_sess * result = ALLOC(struct asn1_sess, 1);
int __i2d_size = i2d_SSL_SESSION(sess, NULL);
result->size = (size_t) __i2d_size;
assert(__i2d_size > 0);
/* When using i2d_SSL_SESSION(), the memory location pointed to by pp must be large enough to
* hold the binary representation of the session. There is no known limit on the size of the
* created ASN1 representation, so the necessary amount of space should be obtained by first
* calling i2d_SSL_SESSION() with pp=NULL, and obtain the size needed,
* then allocate the memory and call i2d_SSL_SESSION() again.*/
result->buff = ALLOC(unsigned char, result->size);
i2d_SSL_SESSION(sess, &(result->buff));
return result;
}
static SSL_SESSION * ssl_sess_deserialize(const struct asn1_sess* asn1)
static SSL_SESSION * ssl_sess_deserialize(const struct asn1_sess * asn1)
{
SSL_SESSION *sess=NULL;
sess = d2i_SSL_SESSION(NULL, &(asn1->buff), asn1->size); /* increments asn1 */
SSL_SESSION * sess = NULL;
d2i_SSL_SESSION(&sess, (const unsigned char **) &(asn1->buff), (long) asn1->size); /* increments asn1 */
return sess;
}
static int ssl_sess_varify_cb(void *data, int eliminate_type)
static int ssl_sess_verify_cb(void * data, int eliminate_type)
{
SSL_SESSION *sess=NULL;
int ret=0;
const struct asn1_sess* asn1=(struct asn1_sess*)data;
if(eliminate_type==ELIMINATE_TYPE_NUM)
const struct asn1_sess * asn1 = (struct asn1_sess *) data;
if (eliminate_type == ELIMINATE_TYPE_NUM)
{
return 1; //direct expired.
return 1; //direct expired.
}
sess=ssl_sess_deserialize(asn1);
ret=ssl_session_is_valid(sess);
SSL_SESSION * sess = ssl_sess_deserialize(asn1);
int ret = ssl_session_is_valid(sess);
SSL_SESSION_free(sess);
if(ret==0)
if (ret == 0)
{
return 1; //should be expired (deleted).
return 1; //should be expired (deleted).
}
else
{
@@ -75,126 +88,134 @@ static int ssl_sess_varify_cb(void *data, int eliminate_type)
}
}
static long sess_cache_get_cb(void *data, const uchar *key, uint size, void *user_arg)
static long sess_cache_get_cb(void * data, const uchar * key, uint size, void * user_arg)
{
SSL_SESSION *sess=NULL;
int is_valid=0;
if(data==NULL)
SSL_SESSION * sess = NULL;
int is_valid = 0;
if (data == NULL)
{
return SESS_CACHE_NOT_FOUND;
}
const struct asn1_sess* asn1=(struct asn1_sess*)data;
sess=ssl_sess_deserialize(data,asn1);
is_valid=ssl_session_is_valid(sess);
if(is_valid==0)
const struct asn1_sess * asn1 = (struct asn1_sess *) data;
sess = ssl_sess_deserialize(asn1);
is_valid = ssl_session_is_valid(sess);
if (is_valid == 0)
{
SSL_SESSION_free(sess);
return SESS_CACHE_INVALID;
}
else
{
*(SSL_SESSION **)user_arg=sess;
*(SSL_SESSION **) user_arg = sess;
return SESS_CACHE_FOUND;
}
}
static long sess_cache_set_cb(void *data, const uchar *key, uint size, void *user_arg)
static long sess_cache_set_cb(void * data, const uchar * key, uint size, void * user_arg)
{
struct sess_set_args* args=(struct sess_set_args*)user_arg;
struct asn1_sess* new_asn1=args->new_sess;
struct asn1_sess *cur_asn1=(struct asn1_sess*)data;
int ret=0;
if(cur_asn1!=NULL)
struct sess_set_args * args = (struct sess_set_args *) user_arg;
struct asn1_sess * new_asn1 = args->new_sess;
struct asn1_sess * cur_asn1 = (struct asn1_sess *) data;
int ret = 0;
if (cur_asn1 != NULL)
{
free(cur_asn1->buff);
cur_asn1->size=new_asn1->size;
cur_asn1->buff=ALLOC(unsigned char, cur_asn1->size);
memcpy(cur_asn1->buff,new_asn1->buff,cur_asn1->size);
cur_asn1->size = new_asn1->size;
cur_asn1->buff = ALLOC(unsigned char, cur_asn1->size);
memcpy(cur_asn1->buff, new_asn1->buff, cur_asn1->size);
return SESS_CACHE_UPDATE_OLD;
}
else
{
ret=MESA_htable_add(args->hash, key, size, new_asn1);
assert(ret>=0);
ret = MESA_htable_add(args->hash, key, size, new_asn1);
assert(ret >= 0);
return SESS_CACHE_ADD_NEW;
}
}
static int upsess_mk_key(struct sockaddr * addr, socklen_t addr_len, const char* sni, unsigned char** key_buf)
static size_t upsess_mk_key(struct sockaddr * addr, socklen_t addrlen, const char * sni, unsigned char ** key_buf)
{
size_t key_size=0;
unsigned char* tmp=NULL, p=NULL;
size_t key_size = 0;
unsigned char * tmp = NULL;
size_t tmp_size;
dynbuf_t tmp, *db;
short port;
size_t snilen;
switch (((struct sockaddr_storage *)addr)->ss_family) {
switch (addr->sa_family)
{
case AF_INET:
tmp = (unsigned char *)
&((struct sockaddr_in*)addr)->sin_addr;
tmp = (unsigned char *)&((struct sockaddr_in *) addr)->sin_addr;
tmp_size = sizeof(struct in_addr);
port = ((struct sockaddr_in*)addr)->sin_port;
port = ((struct sockaddr_in *) addr)->sin_port;
break;
case AF_INET6:
tmp = (unsigned char *)
&((struct sockaddr_in6*)addr)->sin6_addr;
tmp = (unsigned char *)&((struct sockaddr_in6 *) addr)->sin6_addr;
tmp_size = sizeof(struct in6_addr);
port = ((struct sockaddr_in6*)addr)->sin6_port;
port = ((struct sockaddr_in6 *) addr)->sin6_port;
break;
default:
//should never happens.
assert(0);
break;
}
snilen = sni ? strlen(sni) : 0;
key_size=tmp_size+sizeof(port)+snilen;
*key_buf=ALLOC(unsigned char, key_size);
p=*key_buff;
memcpy(p,tmp,tmp_size);
p+=tmp_size;
memcpy(p, (char*)&port, sizeof(port));
p+=sizeof(port);
return key_size;
snilen = sni ? strlen(sni) : 0;
key_size = tmp_size + sizeof(port) + snilen;
*key_buf = ALLOC(unsigned char, key_size);
unsigned char * p = *key_buf;
memcpy(p, tmp, tmp_size);
p += tmp_size;
memcpy(p, (char *) &port, sizeof(port));
p += sizeof(port);
return key_size;
}
void up_session_set(struct sess_cache* cache, struct sockaddr * addr, socklen_t addr_len, const char* sni, SSL_SESSION * sess)
void up_session_set(struct sess_cache * cache, struct sockaddr * addr, socklen_t addr_len, const char * sni,
SSL_SESSION * sess)
{
char* key=NULL;
int ret=0;
size_t key_size=0;
long cb_ret=0;
void* no_use=NULL;
assert(cache->served_for==CONN_DIR_UPSTREAM);
key_size=upsess_mk_key(addr, addr_len, sni, &key);
struct asn1_sess* asn1=NULL;
asn1=ssl_sess_serialize(sess);
unsigned char * key = NULL;
int ret = 0;
size_t key_size = 0;
long cb_ret = 0;
void * no_use = NULL;
assert(cache->served_for == CONN_DIR_UPSTREAM);
key_size = upsess_mk_key(addr, addr_len, sni, &key);
struct asn1_sess * asn1 = NULL;
asn1 = ssl_sess_serialize(sess);
struct sess_set_args set_args;
set_args.hash=cache->hash;
set_args.new_sess=asn1;
no_use=MESA_htable_search_cb(cache->hash, key, key_size, sess_cache_set_cb, &set_args,&cb_ret);
if(cb_ret==SESS_CACHE_UPDATE_OLD)
set_args.hash = cache->hash;
set_args.new_sess = asn1;
no_use = MESA_htable_search_cb(cache->hash, key, key_size, sess_cache_set_cb, &set_args, &cb_ret);
if (cb_ret == SESS_CACHE_UPDATE_OLD)
{
ssl_sess_free_serialized(asn1);
}
free(key);
return;
}
SSL_SESSION* up_session_get(struct sess_cache* cache, struct sockaddr * addr, socklen_t addr_len, const char* sni)
SSL_SESSION * up_session_get(struct sess_cache * cache, struct sockaddr * addr, socklen_t addr_len, const char * sni)
{
SSL_SESSION* sess=NULL;
void* no_use=NULL;
long cb_ret=0;
char* key=NULL;
size_t key_size=0;
assert(cache->served_for==CONN_DIR_UPSTREAM);
key_size=upsess_mk_key(addr, addr_len, sni, &key);
no_use=MESA_htable_search_cb(cache->hash, key, key_size,sess_cache_get_cb, &sess, &cb_ret);
SSL_SESSION * sess = NULL;
void * no_use = NULL;
long cb_ret = 0;
size_t key_size = 0;
assert(cache->served_for == CONN_DIR_UPSTREAM);
unsigned char * key = NULL;
key_size = upsess_mk_key(addr, addr_len, sni, &key);
no_use = MESA_htable_search_cb(cache->hash, key, key_size, sess_cache_get_cb, &sess, &cb_ret);
free(key);
key=NULL;
if(cb_ret==1)
key = NULL;
if (cb_ret == 1)
{
cache->hit_cnt++;
return sess;
@@ -206,39 +227,41 @@ SSL_SESSION* up_session_get(struct sess_cache* cache, struct sockaddr *
}
}
void down_session_set(struct sess_cache* cache, const SSL_SESSION* sess)
void down_session_set(struct sess_cache * cache, const SSL_SESSION * sess)
{
unsigned int idlen=0;
struct asn1_sess* asn1=NULL;
long cb_ret=0;
void* no_use=NULL;
int ret=0;
assert(cache->served_for==CONN_DIR_DOWNSTREAM);
asn1=ssl_sess_serialize(sess);
unsigned int idlen = 0;
struct asn1_sess * asn1 = NULL;
long cb_ret = 0;
void * no_use = NULL;
int ret = 0;
assert(cache->served_for == CONN_DIR_DOWNSTREAM);
asn1 = ssl_sess_serialize((SSL_SESSION *) sess);
/*
* SSL_SESSION_get_id() returns a pointer to the internal session id value for the session s.
* The length of the id in bytes is stored in *idlen. The length may be 0.
* The caller should not free the returned pointer directly.
*/
const unsigned char* id = SSL_SESSION_get_id(sess, &idlen);
struct sess_set_args set_args;
set_args.hash=cache->hash;
set_args.new_sess=asn1;
no_use=MESA_htable_search_cb(cache->hash, id, (unsigned int)idlen, sess_cache_set_cb, &set_args,&cb_ret);
if(cb_ret==SESS_CACHE_UPDATE_OLD)
const unsigned char * id = SSL_SESSION_get_id(sess, &idlen);
struct sess_set_args set_args{.hash = cache->hash, .new_sess = asn1};
no_use = MESA_htable_search_cb(cache->hash, id, (unsigned int) idlen, sess_cache_set_cb, &set_args, &cb_ret);
if (cb_ret == SESS_CACHE_UPDATE_OLD)
{
ssl_sess_free_serialized(asn1);
}
return;
}
SSL_SESSION* down_session_get(struct sess_cache* cache, unsigned char * id, int idlen)
SSL_SESSION * down_session_get(struct sess_cache * cache, const unsigned char * id, int idlen)
{
SSL_SESSION* sess=NULL;
void* no_use=NULL;
long cb_ret=0;
assert(cache->served_for==CONN_DIR_DOWNSTREAM);
no_use=MESA_htable_search_cb(cache->hash, id, (unsigned int)idlen, sess_cache_get_cb, &sess,&cb_ret);
if(cb_ret==1)
SSL_SESSION * sess = NULL;
void * no_use = NULL;
long cb_ret = 0;
assert(cache->served_for == CONN_DIR_DOWNSTREAM);
no_use = MESA_htable_search_cb(cache->hash, id, (unsigned int) idlen, sess_cache_get_cb, &sess, &cb_ret);
if (cb_ret == 1)
{
cache->hit_cnt++;
return sess;
@@ -250,49 +273,66 @@ SSL_SESSION* down_session_get(struct sess_cache* cache, unsigned char
}
}
void down_session_del(struct sess_cache* cache, const SSL_SESSION* sess)
void down_session_del(struct sess_cache * cache, const SSL_SESSION * sess)
{
assert(cache->served_for==CONN_DIR_DOWNSTREAM);
unsigned int len=0;
const unsigned char* id = SSL_SESSION_get_id(sess, &len);
int ret=MESA_htable_del(cache->hash, id, len, NULL);
if(ret!=MESA_HTABLE_RET_OK)
assert(cache->served_for == CONN_DIR_DOWNSTREAM);
unsigned int len = 0;
const unsigned char * id = SSL_SESSION_get_id(sess, &len);
int ret = MESA_htable_del(cache->hash, id, len, NULL);
if (ret != MESA_HTABLE_RET_OK)
{
cache->del_err++;
}
return;
}
struct sess_cache* ssl_sess_cache_create(int slot_size, int expire_seconds, enum tfe_conn_dir served)
int __wrapper_MESA_htable_set_opt(MESA_htable_handle table, enum MESA_htable_opt opt_type, unsigned value)
{
struct sess_cache* cache=ALLOC(struct sess_cache, 1);
MESA_htable_handle htable=NULL;
int ret=0,max_num=slot_size*4;
htable=MESA_htable_born();
value=0;//no print
ret=MESA_htable_set_opt(htable, MHO_SCREEN_PRINT_CTRL, &(value), sizeof(value));
value=1;//thread safe
ret=MESA_htable_set_opt(htable, MHO_THREAD_SAFE, value, sizeof(value));
assert(ret==0);
value=16;
ret=MESA_htable_set_opt(htable, MHO_MUTEX_NUM, value, sizeof(value));
ret=MESA_htable_set_opt(htable, MHO_HASH_SLOT_SIZE, &(slot_size), sizeof(slot_size));
ret=MESA_htable_set_opt(htable, MHO_HASH_MAX_ELEMENT_NUM, &(max_num), sizeof(max_num));
ret=MESA_htable_set_opt(htable, MHO_EXPIRE_TIME, &(expire_seconds), sizeof(expire_seconds));
value=HASH_ELIMINATE_ALGO_FIFO;
ret=MESA_htable_set_opt(htable, MHO_ELIMIMINATE_TYPE, &(value), sizeof(value));
ret=MESA_htable_set_opt(htable, MHO_CBFUN_DATA_FREE, ssl_sess_free_serialized, sizeof(ssl_sess_free_serialized));
ret=MESA_htable_set_opt(htable, MHO_CBFUN_DATA_EXPIRE_NOTIFY, ssl_sess_varify_cb, sizeof(ssl_sess_varify_cb));
assert(ret==0);
ret=MESA_htable_mature(htable);
assert(ret==0);
cache->hash=htable;
cache->served_for=served;
int ret = MESA_htable_set_opt(table, opt_type, &value, (int)(sizeof(value)));
assert(ret == 0);
return ret;
}
int __wrapper_MESA_htable_set_opt(MESA_htable_handle table, enum MESA_htable_opt opt_type, void * val, size_t len)
{
int ret = MESA_htable_set_opt(table, opt_type, val, (int)len);
assert(ret == 0);
return ret;
}
struct sess_cache * ssl_sess_cache_create(unsigned int slot_size, unsigned int expire_seconds, enum tfe_conn_dir served)
{
struct sess_cache * cache = ALLOC(struct sess_cache, 1);
unsigned max_num = slot_size * 4;
int ret = 0;
MESA_htable_handle htable = MESA_htable_born();
ret = __wrapper_MESA_htable_set_opt(htable, MHO_SCREEN_PRINT_CTRL, 0);
ret = __wrapper_MESA_htable_set_opt(htable, MHO_THREAD_SAFE, 1);
ret = __wrapper_MESA_htable_set_opt(htable, MHO_MUTEX_NUM, 16);
ret = __wrapper_MESA_htable_set_opt(htable, MHO_HASH_SLOT_SIZE, slot_size);
ret = __wrapper_MESA_htable_set_opt(htable, MHO_HASH_MAX_ELEMENT_NUM, max_num);
ret = __wrapper_MESA_htable_set_opt(htable, MHO_EXPIRE_TIME, expire_seconds);
ret = __wrapper_MESA_htable_set_opt(htable, MHO_ELIMIMINATE_TYPE,
HASH_ELIMINATE_ALGO_FIFO);
ret = __wrapper_MESA_htable_set_opt(htable, MHO_CBFUN_DATA_FREE,
(void *)ssl_sess_free_serialized, sizeof(&ssl_sess_free_serialized));
ret = __wrapper_MESA_htable_set_opt(htable, MHO_CBFUN_DATA_EXPIRE_NOTIFY,
(void *)ssl_sess_verify_cb, sizeof(&ssl_sess_verify_cb));
ret = MESA_htable_mature(htable);
assert(ret == 0);
cache->hash = htable;
cache->served_for = served;
return cache;
}
void ssl_sess_cache_destroy(struct sess_cache* cache)
void ssl_sess_cache_destroy(struct sess_cache * cache)
{
MESA_htable_destroy(cache->hash, NULL);
cache->hash=NULL;
cache->hash = NULL;
free(cache);
return;
}

View File

@@ -1,15 +0,0 @@
#pragma once
#include <sys/socket.h>
#include <openssl/ssl.h>
#include <tfe_stream.h>
struct sess_cache;
struct sess_cache* ssl_sess_cache_create(int slot_size, int expire_seconds, enum tfe_conn_dir served);
void ssl_sess_cache_destroy(struct sess_cache* cache);
void up_session_set(struct sess_cache* cache, struct sockaddr * addr, socklen_t addr_len, const char* sni, SSL_SESSION * value)
SSL_SESSION* up_session_get(struct sess_cache* cache, struct sockaddr *addr, socklen_t addr_len, const char* sni);
void down_session_set(struct sess_cache* cache, const SSL_SESSION* sess);
SSL_SESSION* down_session_get(struct sess_cache* cache, unsigned char * id, int idlen);

File diff suppressed because it is too large Load Diff

492
platform/src/tcp_stream.cpp Normal file
View File

@@ -0,0 +1,492 @@
#include <netinet/in.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/bufferevent_ssl.h>
#include <event2/buffer.h>
#include <event2/thread.h>
#include <event2/dns.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <openssl/rand.h>
#include <openssl/x509.h>
#include <openssl/x509v3.h>
#include <tfe_stream.h>
#include <tfe_utils.h>
#include <tfe_future.h>
#include <platform.h>
#include <ssl_stream.h>
#include <tcp_stream.h>
#include <cert.h>
#include <proxy.h>
#ifndef TFE_CONFIG_OUTPUT_LIMIT_DEFAULT
#define TFE_CONFIG_OUTPUT_LIMIT_DEFAULT (1024 * 1024)
#endif
/* forward declaration of libevent callbacks */
static void tfe_stream_readcb(struct bufferevent *, void *);
static void tfe_stream_writecb(struct bufferevent *, void *);
static void tfe_stream_eventcb(struct bufferevent *, short, void *);
static inline struct tfe_stream_private * __TO_STREAM_PRIVATE(const struct tfe_stream * stream)
{
return container_of(stream, struct tfe_stream_private, head);
}
static inline struct tfe_conn_private * __THIS_CONN(struct tfe_stream_private * _stream, enum tfe_conn_dir dir)
{
return ((dir == CONN_DIR_UPSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream));
}
static inline struct tfe_conn_private * __PEER_CONN(struct tfe_stream_private * _stream, enum tfe_conn_dir dir)
{
return ((dir == CONN_DIR_UPSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream));
}
static inline enum tfe_conn_dir __DIR(struct tfe_stream_private * _stream, struct bufferevent * bev)
{
return ((bev == _stream->conn_downstream->bev) ? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM);
}
static inline bool __IS_SSL(struct tfe_stream_private * _stream)
{
return (_stream->session_type == SESSION_PROTO_SSL);
}
void tfe_stream_detach(const struct tfe_stream * stream)
{
struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream);
int plug_id = _stream->calling_idx;
_stream->plug_ctx[plug_id].state = PLUG_STATE_DETACHED;
return;
}
int tfe_stream_preempt(const struct tfe_stream * stream)
{
struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream);
int plug_id = _stream->calling_idx;
int i = 0;
for (i = 0; i < _stream->plugin_num; i++)
{
if (_stream->plug_ctx[i].state == PLUG_STATE_PREEPTION)
{
return -1;
}
}
_stream->plug_ctx[plug_id].state = PLUG_STATE_PREEPTION;
return 0;
}
struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir)
{
struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream);
struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir);
struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir);
if (this_conn->on_writing == 1)
{
return NULL;
}
this_conn->w_ctx.dir = dir;
this_conn->w_ctx._stream = _stream;
this_conn->on_writing = 1;
bufferevent_disable(peer_conn->bev, EV_READ);
return &(this_conn->w_ctx);
}
int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size)
{
struct tfe_conn_private * this_conn = __THIS_CONN(w_ctx->_stream, w_ctx->dir);;
int ret = bufferevent_write(this_conn->bev, data, size);
return ret;
}
void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx)
{
struct tfe_conn_private * this_conn = __THIS_CONN(w_ctx->_stream, w_ctx->dir);
struct tfe_conn_private * peer_conn = __PEER_CONN(w_ctx->_stream, w_ctx->dir);
this_conn->on_writing = 0;
bufferevent_enable(peer_conn->bev, EV_READ);
return;
}
int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, const unsigned char * data, size_t size)
{
int ret = 0;
struct tfe_stream_write_ctx * wctx = tfe_stream_write_frag_start(stream, dir);
ret = tfe_stream_write_frag(wctx, data, size);
tfe_stream_write_frag_end(wctx);
return ret;
}
/*
* Callback for read events on the up- and downstream connection bufferevents.
* Called when there is data ready in the input evbuffer.
*/
static void tfe_stream_readcb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = __DIR(_stream, bev);
struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir);
struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir);
int i = 0, ret = 0;
enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA, action_final = ACTION_FORWARD_DATA;
const struct tfe_plugin * plugins = _stream->thread_ref->modules;
struct plugin_ctx * plug_ctx = NULL;
int plug_num = _stream->thread_ref->nr_modules;
struct evbuffer * inbuf = bufferevent_get_input(bev);
struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev);
size_t contigous_len = evbuffer_get_length(inbuf), drain_size = 0;
const unsigned char * contiguous_data = (const unsigned char *) evbuffer_pullup(inbuf, contigous_len);
_stream->defere_bytes = 0;
_stream->drop_bytes = 0;
_stream->forward_bytes = 0;
for (i = 0; i < plug_num; i++)
{
_stream->calling_idx = i;
plug_ctx = _stream->plug_ctx + i;
if (_stream->is_plugin_opened == 0)
{
action_tmp = plugins[i].on_open(&_stream->head, _stream->thread_ref->thread_id,
dir, contiguous_data, contigous_len, &(plug_ctx->pme));
_stream->is_plugin_opened = 1;
}
else
{
action_tmp = plugins[i].on_data(&_stream->head, _stream->thread_ref->thread_id,
dir, contiguous_data, contigous_len, &(plug_ctx->pme));
}
if (plug_ctx->state == PLUG_STATE_PREEPTION)
{
action_final = action_tmp;
}
}
switch (action_final)
{
case ACTION_FORWARD_DATA:
if (_stream->forward_bytes > 0)
{
evbuffer_remove_buffer(inbuf, outbuf, _stream->forward_bytes);
}
else
{
evbuffer_add_buffer(outbuf, inbuf);
}
break;
case ACTION_DROP_DATA:
if (_stream->drop_bytes > 0)
{
drain_size = _stream->drop_bytes;
}
else
{
drain_size = evbuffer_get_length(inbuf);
}
evbuffer_drain(inbuf, drain_size);
case ACTION_DEFER_DATA:
if (_stream->defere_bytes > 0)
{
bufferevent_setwatermark(bev, EV_WRITE, _stream->defere_bytes, 0);
}
break;
default: assert(0);
break;
}
if (evbuffer_get_length(inbuf) != 0)
{
bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS);
}
if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT)
{
bufferevent_setwatermark(peer_conn->bev, EV_WRITE, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT / 2,
TFE_CONFIG_OUTPUT_LIMIT_DEFAULT);
bufferevent_disable(bev, EV_READ);
}
return;
}
/*
* Callback for write events on the up- and downstream connection bufferevents.
* Called when either all data from the output evbuffer has been written,
* or if the outbuf is only half full again after having been full.
*/
static void tfe_stream_writecb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = __DIR(_stream, bev);
struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir);
struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir);
struct evbuffer * outbuf = bufferevent_get_output(bev);
if (peer_conn->bev && !(bufferevent_get_enabled(peer_conn->bev) & EV_READ))
{
/* data source temporarily disabled;
* re-enable and reset watermark to 0. */
bufferevent_setwatermark(bev, EV_WRITE, 0, 0);
bufferevent_enable(peer_conn->bev, EV_READ);
}
}
/*
* Callback for meta events on the up- and downstream connection bufferevents.
* Called when EOF has been reached, a connection has been made, and on errors.
*/
static void tfe_stream_eventcb(struct bufferevent * bev, short events, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = __DIR(_stream, bev);
struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir);
struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir);
const struct tfe_plugin * plugins = _stream->thread_ref->modules;
struct plugin_ctx * plug_ctx = NULL;
int plug_num = _stream->thread_ref->nr_modules, i = 0;
enum tfe_stream_close_reason reason = REASON_PASSIVE_CLOSED;
if (events & BEV_EVENT_ERROR)
{
this_conn->closed = 1;
reason = REASON_ERROR;
goto call_plugin_close;
}
if (events & BEV_EVENT_EOF)
{
//generate a 0 size read callback to notify plugins.
tfe_stream_readcb(bev, arg);
this_conn->closed = 1;
}
if (peer_conn->closed == 1 && this_conn->closed == 1)
{
reason = REASON_PASSIVE_CLOSED;
goto call_plugin_close;
}
return;
call_plugin_close:
for (i = 0; i < plug_num; i++)
{
_stream->calling_idx = i;
plug_ctx = _stream->plug_ctx + i;
plugins[i].on_close(&(_stream->head), _stream->thread_ref->thread_id, reason, &(plug_ctx->pme));
}
tfe_stream_destory(_stream);
return;
}
static tfe_conn_private * __conn_private_create(struct tfe_stream_private * stream, evutil_socket_t fd)
{
struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1);
struct event_base * __ev_base = stream->thread_ref->evbase;
__conn_private->bev = bufferevent_socket_new(__ev_base, fd, BEV_OPT_DEFER_CALLBACKS);
__conn_private->fd = fd;
if (!__conn_private->bev)
{
TFE_LOG_ERROR(__STREAM_LOGGER(stream), "Failed at creating bufferevent for fd %d", fd);
goto __errout;
}
bufferevent_setcb(__conn_private->bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, stream);
bufferevent_enable(__conn_private->bev, EV_READ | EV_WRITE);
return __conn_private;
__errout:
if (__conn_private != NULL) free(__conn_private);
return NULL;
}
static tfe_conn_private * __conn_private_create(struct tfe_stream_private * stream, struct bufferevent * bev)
{
struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1);
__conn_private->bev = bev;
__conn_private->fd = bufferevent_getfd(bev);
bufferevent_setcb(__conn_private->bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, stream);
bufferevent_enable(__conn_private->bev, EV_READ | EV_WRITE);
return __conn_private;
}
evutil_socket_t __conn_private_release_fd(struct tfe_conn_private * conn)
{
evutil_socket_t __to_release_fd = conn->fd;
conn->fd = 0;
return __to_release_fd;
}
static void __conn_private_destory(struct tfe_conn_private * conn)
{
return;
}
void ssl_downstream_create_on_success(future_result_t * result, void * user)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) user;
struct ssl_stream * downstream = ssl_downstream_create_result_release_stream(result);
struct bufferevent * bev = ssl_downstream_create_result_release_bev(result);
_stream->conn_downstream = __conn_private_create(_stream, bev);
_stream->ssl_downstream = downstream;
future_destroy(_stream->future_downstream_create);
_stream->future_downstream_create = NULL;
_stream->defer_fd_downstream = 0;
return;
}
void ssl_downstream_create_on_fail(enum e_future_error err, const char * what, void * user)
{
return;
}
void ssl_upstream_create_on_success(future_result_t * result, void * user)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) user;
struct event_base * ev_base = _stream->thread_ref->evbase;
struct ssl_stream * upstream = ssl_upstream_create_result_release_stream(result);
struct bufferevent * bev = ssl_upstream_create_result_release_bev(result);
assert(upstream != NULL && bev != NULL);
/* Create connection ctx by bev */
_stream->conn_upstream = __conn_private_create(_stream, bev);
_stream->ssl_upstream = upstream;
future_destroy(_stream->future_upstream_create);
_stream->future_upstream_create = NULL;
_stream->defer_fd_upstream = 0;
/* Next, create downstream */
_stream->future_downstream_create = future_create(ssl_downstream_create_on_success,
ssl_downstream_create_on_fail, _stream);
ssl_async_downstream_create(_stream->future_downstream_create, _stream->ssl_mgr,
_stream->ssl_upstream, _stream->defer_fd_downstream, /* KEYRING ID */ 0, ev_base);
}
void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, void * user)
{
assert(0);
}
struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx)
{
struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1);
_stream->thread_ref = thread_ctx;
_stream->proxy_ref = pxy;
return (struct tfe_stream *) &_stream->head;
}
void tfe_stream_destory(struct tfe_stream_private * stream)
{
struct tfe_thread_ctx * thread = stream->thread_ref;
struct tfe_proxy * proxy = stream->proxy_ref;
struct event_base * ev_base = thread->evbase;
if (__IS_SSL(stream) && stream->ssl_upstream)
{
evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_upstream);
ssl_stream_free_and_close_fd(stream->ssl_upstream, ev_base, __to_closed_fd);
}
if (__IS_SSL(stream) && stream->ssl_downstream)
{
evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_upstream);
ssl_stream_free_and_close_fd(stream->ssl_downstream, ev_base, __to_closed_fd);
}
if (stream->conn_upstream)
{
__conn_private_destory(stream->conn_upstream);
}
if (stream->conn_downstream)
{
__conn_private_destory(stream->conn_downstream);
}
if (stream->defer_fd_downstream)
{
evutil_closesocket(stream->defer_fd_downstream);
}
if (stream->defer_fd_upstream)
{
evutil_closesocket(stream->defer_fd_upstream);
}
if (stream->future_downstream_create)
{
future_destroy(stream->future_downstream_create);
}
if (stream->future_upstream_create)
{
future_destroy(stream->future_upstream_create);
}
free(stream);
thread->load--;
}
void tfe_stream_init_by_fds(struct tfe_stream * stream, enum tfe_session_proto session_type,
evutil_socket_t fd_downstream, evutil_socket_t fd_upstream)
{
struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);
struct event_base * ev_base = _stream->thread_ref->evbase;
if (session_type == SESSION_PROTO_PLAIN)
{
_stream->conn_downstream = __conn_private_create(_stream, fd_downstream);
_stream->conn_upstream = __conn_private_create(_stream, fd_upstream);
assert(_stream->conn_downstream != NULL);
assert(_stream->conn_upstream != NULL);
}
if (session_type == SESSION_PROTO_SSL)
{
_stream->ssl_mgr = _stream->proxy_ref->ssl_mgr_handler;
_stream->future_upstream_create = future_create(
ssl_upstream_create_on_success, ssl_upstream_create_on_fail, (void *) _stream);
/* Defer setup conn_downstream & conn_upstream in async callbacks. */
ssl_async_upstream_create(_stream->future_upstream_create,
_stream->ssl_mgr, fd_upstream, fd_downstream, ev_base);
_stream->defer_fd_downstream = fd_downstream;
_stream->defer_fd_upstream = fd_upstream;
}
_stream->session_type = session_type;
}

View File

@@ -1,560 +0,0 @@
#include <netinet/in.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/bufferevent_ssl.h>
#include <event2/buffer.h>
#include <event2/thread.h>
#include <event2/dns.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <openssl/rand.h>
#include <openssl/x509.h>
#include <openssl/x509v3.h>
#include <tfe_stream.h>
#include <tfe_utils.h>
#include <tfe_future.h>
#include <ssl_stream.h>
#include <stream.h>
#include <cert.h>
#define STREAM_EVBASE(s) ((s)->thrmgr_ref->evbase)
/*
* Maximum size of data to buffer per connection direction before
* temporarily stopping to read data from the other end.
*/
#define OUTBUF_LIMIT (1024*1024)
/*
* Print helper for logging code.
*/
#define STRORDASH(x) (((x)&&*(x))?(x):"-")
/* forward declaration of libevent callbacks */
static void tfe_stream_readcb(struct bufferevent *, void *);
static void tfe_stream_writecb(struct bufferevent *, void *);
static void tfe_stream_eventcb(struct bufferevent *, short, void *);
static void stream_fd_readcb(evutil_socket_t, short, void *);
static void tfe_stream_free(struct tfe_stream_private * stream)
{
struct tfe_thread_ctx * thread = stream->thrmgr_ref;
thread->load--;
switch (stream->session_type)
{
case SESSION_PROTO_SSL:
#if 0
ssl_upstream_free(stream->ssl_upstream);
ssl_downstream_free(stream->ssl_downstream);
#endif
thread->stat.value[SSL_NUM]--;
break;
default: break;
}
free(stream);
thread->stat.value[STREAM_NUM]--;
return;
}
void tfe_stream_detach(const struct tfe_stream * stream)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) stream;
int plug_id = _stream->calling_idx;
_stream->plug_ctx[plug_id].state = PLUG_STATE_DETACHED;
return;
}
int tfe_stream_preempt(const struct tfe_stream * stream)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) stream;
int plug_id = _stream->calling_idx;
int i = 0;
for (i = 0; i < _stream->plugin_num; i++)
{
if (_stream->plug_ctx[i].state == PLUG_STATE_PREEPTION)
{
return -1;
}
}
_stream->plug_ctx[plug_id].state = PLUG_STATE_PREEPTION;
return 0;
}
static inline struct tfe_conn_private * __this_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir)
{
struct tfe_conn_private * this_conn = NULL;
this_conn = ((dir == CONN_DIR_UPSTREAM) ? &(_stream->conn_downstream) : &(_stream->conn_upstream));
return this_conn;
}
static inline struct tfe_conn_private * __peer_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir)
{
struct tfe_conn_private * peer_conn = NULL;
peer_conn = (dir == CONN_DIR_UPSTREAM) ? &(_stream->conn_downstream) : &(_stream->conn_upstream);
return peer_conn;
}
struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) stream;
struct tfe_conn_private * this_conn = __this_conn(_stream, dir);
struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir);
if (this_conn->on_writing == 1)
{
return NULL;
}
this_conn->w_ctx.dir = dir;
this_conn->w_ctx._stream = _stream;
this_conn->on_writing = 1;
bufferevent_disable(peer_conn->bev, EV_READ);
return &(this_conn->w_ctx);
}
int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size)
{
struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir);;
int ret = bufferevent_write(this_conn->bev, data, size);
return ret;
}
void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx)
{
struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir);
struct tfe_conn_private * peer_conn = __peer_conn(w_ctx->_stream, w_ctx->dir);
this_conn->on_writing = 0;
bufferevent_enable(peer_conn->bev, EV_READ);
return;
}
int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, const unsigned char * data, size_t size)
{
int ret = 0;
struct tfe_stream_write_ctx * wctx = tfe_stream_write_frag_start(stream, dir);
ret = tfe_stream_write_frag(wctx, data, size);
tfe_stream_write_frag_end(wctx);
return ret;
}
/*
#define ON_OPEN_CALL 0
#define ON_DATA_CALL 1
#define ON_CLOSE_CALL 2
enum tfe_stream_action tfe_stream_call_plugin(struct tfe_stream_private* _stream, enum tfe_conn_dir dir, int what, struct evbuffer * inbuf)
{
size_t contigous_len=evbuffer_get_length(inbuf),drain_size=0;
const char* contiguous_data=evbuffer_pullup(inbuf,contigous_len);
int i=0,ret=0;
int plug_num=_stream->thrmgr_ref->module_num;
const struct tfe_plugin* plugins=_stream->thrmgr_ref->modules;
struct plugin_ctx* plug_ctx=NULL;
enum tfe_stream_action action_tmp=ACTION_FORWARD_DATA, action_final=ACTION_FORWARD_DATA;
_stream->defere_bytes=0;
_stream->drop_bytes=0;
_stream->forward_bytes=0;
switch(what)
{
case ON_OPEN_CALL:
for(i=0;i<plug_num;i++)
{
action_tmp=plugins[i].on_open(&_stream.head, _stream->thrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme));
if(plug_ctx->state=PLUG_STATE_PREEPTION)
{
action_final=action_tmp;
}
}
break;
case ON_DATA_CALL:
for(i=0;i<plug_num;i++)
{
action_tmp=plugins[i].on_data(&_stream.head, _stream->thrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme));
if(plug_ctx->state=PLUG_STATE_PREEPTION)
{
action_final=action_tmp;
}
}
case ON_CLOSE_CALL:
for(i=0;i<plug_num;i++)
{
plugins[i].on_close(&_stream.head, _stream->thrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme));
if(plug_ctx->state=PLUG_STATE_PREEPTION)
{
action_final=action_tmp;
}
}
}
for(i=0;i<plug_num;i++)
{
_stream->calling_idx=i;
switch(what)
{
}
plug_ctx=_stream->plug_ctx+i;
if(_stream->is_fisrt_read==1)
{
action_tmp=plugins[i].on_open(&_stream.head, _stream->thrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme));
_stream->is_fisrt_read=0;
}
else
{
action_tmp=plugins[i].on_data(&_stream.head, _stream->thrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme));
}
if(plug_ctx->state=PLUG_STATE_PREEPTION)
{
action_final=action_tmp;
}
}
}
*/
/*
* Callback for read events on the up- and downstream connection bufferevents.
* Called when there is data ready in the input evbuffer.
*/
static void tfe_stream_readcb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = (bev == _stream->conn_downstream.bev) ? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM;
struct tfe_conn_private * this_conn = __this_conn(_stream, dir);
struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir);
int i = 0, ret = 0;
enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA, action_final = ACTION_FORWARD_DATA;
const struct tfe_plugin * plugins = _stream->thrmgr_ref->modules;
struct plugin_ctx * plug_ctx = NULL;
int plug_num = _stream->thrmgr_ref->nr_modules;
struct evbuffer * inbuf = bufferevent_get_input(bev);
struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev);
size_t contigous_len = evbuffer_get_length(inbuf), drain_size = 0;
const unsigned char * contiguous_data = (const unsigned char *)evbuffer_pullup(inbuf, contigous_len);
_stream->defere_bytes = 0;
_stream->drop_bytes = 0;
_stream->forward_bytes = 0;
for (i = 0; i < plug_num; i++)
{
_stream->calling_idx = i;
plug_ctx = _stream->plug_ctx + i;
if (_stream->is_plugin_opened == 0)
{
action_tmp = plugins[i].on_open(&_stream->head, _stream->thrmgr_ref->thread_id,
dir, contiguous_data, contigous_len, &(plug_ctx->pme));
_stream->is_plugin_opened = 1;
}
else
{
action_tmp = plugins[i].on_data(&_stream->head, _stream->thrmgr_ref->thread_id,
dir, contiguous_data, contigous_len, &(plug_ctx->pme));
}
if (plug_ctx->state == PLUG_STATE_PREEPTION)
{
action_final = action_tmp;
}
}
switch (action_final)
{
case ACTION_FORWARD_DATA:
if (_stream->forward_bytes > 0)
{
evbuffer_remove_buffer(inbuf, outbuf, _stream->forward_bytes);
}
else
{
evbuffer_add_buffer(outbuf, inbuf);
}
break;
case ACTION_DROP_DATA:
if (_stream->drop_bytes > 0)
{
drain_size = _stream->drop_bytes;
}
else
{
drain_size = evbuffer_get_length(inbuf);
}
evbuffer_drain(inbuf, drain_size);
case ACTION_DEFER_DATA:
if (_stream->defere_bytes > 0)
{
bufferevent_setwatermark(bev, EV_WRITE, _stream->defere_bytes, 0);
}
break;
default:
assert(0);
break;
}
if (evbuffer_get_length(inbuf) != 0)
{
bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS);
}
if (evbuffer_get_length(outbuf) >= OUTBUF_LIMIT)
{
/* temporarily disable data source;
* set an appropriate watermark. */
bufferevent_setwatermark(peer_conn->bev, EV_WRITE, OUTBUF_LIMIT / 2, OUTBUF_LIMIT);
bufferevent_disable(bev, EV_READ);
}
return;
}
/*
* Callback for write events on the up- and downstream connection bufferevents.
* Called when either all data from the output evbuffer has been written,
* or if the outbuf is only half full again after having been full.
*/
static void tfe_stream_writecb(struct bufferevent * bev, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = (bev == _stream->conn_downstream.bev) ? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM;
struct tfe_conn_private * this_conn = __this_conn(_stream, dir);
struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir);
struct evbuffer * outbuf = bufferevent_get_output(bev);
if (peer_conn->bev && !(bufferevent_get_enabled(peer_conn->bev) & EV_READ))
{
/* data source temporarily disabled;
* re-enable and reset watermark to 0. */
bufferevent_setwatermark(bev, EV_WRITE, 0, 0);
bufferevent_enable(peer_conn->bev, EV_READ);
}
}
/*
* Callback for meta events on the up- and downstream connection bufferevents.
* Called when EOF has been reached, a connection has been made, and on errors.
*/
static void tfe_stream_eventcb(struct bufferevent * bev, short events, void * arg)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg;
enum tfe_conn_dir dir = (bev == _stream->conn_downstream.bev) ? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM;
struct tfe_conn_private * this_conn = __this_conn(_stream, dir);
struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir);
const struct tfe_plugin * plugins = _stream->thrmgr_ref->modules;
struct plugin_ctx * plug_ctx = NULL;
int plug_num = _stream->thrmgr_ref->nr_modules, i = 0;
enum tfe_stream_close_reason reason = REASON_PASSIVE_CLOSED;
if (events & BEV_EVENT_ERROR)
{
this_conn->closed = 1;
reason = REASON_ERROR;
goto call_plugin_close;
}
if (events & BEV_EVENT_EOF)
{
//generate a 0 size read callback to notify plugins.
tfe_stream_readcb(bev, arg);
this_conn->closed = 1;
}
if (peer_conn->closed == 1 && this_conn->closed == 1)
{
reason = REASON_PASSIVE_CLOSED;
goto call_plugin_close;
}
return;
call_plugin_close:
for (i = 0; i < plug_num; i++)
{
_stream->calling_idx = i;
plug_ctx = _stream->plug_ctx + i;
plugins[i].on_close(&(_stream->head), _stream->thrmgr_ref->thread_id, reason, &(plug_ctx->pme));
}
tfe_stream_free(_stream);
return;
}
void ssl_get_cert_on_succ(void * result, void * user)
{
cert_t * cert = (cert_t *) result;
struct tfe_stream_private * _stream = (struct tfe_stream_private *) user;
//_stream->ssl_downstream->ssl = downstream_ssl_create(_stream);
//_stream->ssl_downstream->ssl = downstream_ssl_create(_stream);
cert_free(cert);
bufferevent_setcb(_stream->head.upstream.bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, _stream);
bufferevent_setcb(_stream->head.downstream.bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, _stream);
bufferevent_enable(_stream->head.upstream.bev, EV_READ | EV_WRITE);
bufferevent_enable(_stream->head.downstream.bev, EV_READ | EV_WRITE);
future_destroy(_stream->ssl_downstream->future_get_cert);
_stream->ssl_downstream->future_get_cert = NULL;
return;
}
void ssl_get_cert_on_fail(enum e_future_error err, const char * what, void * user)
{
assert(0);
}
void ssl_conn_origin_on_succ(void * result, void * user)
{
struct bufferevent * bev = (struct bufferevent *) result;
struct tfe_stream_private * _stream = (struct tfe_stream_private *) user;
_stream->head.upstream.bev = bev;
_stream->ssl_upstream->ssl = bufferevent_openssl_get_ssl(bev); /* does not inc refc */
_stream->ssl_upstream->orig_cert = SSL_get_peer_certificate(_stream->ssl_upstream->ssl);
#if 0
up_session_set(_stream->thrmgr_ref->dsess_cache, _stream->ssl_downstream->sni,
SSL_get0_session(_stream->ssl_upstream->ssl));
#endif
_stream->ssl_downstream->future_get_cert = future_create(ssl_get_cert_on_succ, ssl_get_cert_on_fail, _stream);
#if 0
cert_mgr_async_get(_stream->ssl_downstream->future_get_cert,
_stream->thrmgr_ref->cert_mgr,
_stream->ssl_downstream->sni,
_stream->ssl_downstream->keyring_id,
_stream->ssl_upstream->orig_cert);
#endif
future_destroy(_stream->ssl_upstream->conn_ssl_srv);
_stream->ssl_upstream->conn_ssl_srv = NULL;
}
void ssl_conn_origin_on_fail(enum e_future_error err, const char * what, void * user)
{
//TODO:
assert(0);
}
void peek_client_hello_on_succ(void * result, void * user)
{
struct tfe_stream_private * _stream = (struct tfe_stream_private *) user;
assert(_stream->session_type == SESSION_PROTO_SSL);
struct ssl_downstream * ssl_downstream = _stream->ssl_downstream;
struct ssl_upstream * ssl_upstream = _stream->ssl_upstream;
_stream->ssl_downstream->sni = tfe_strdup((const char *) result);
future_destroy(ssl_downstream->future_sni_peek);
ssl_downstream->future_sni_peek = NULL;
_stream->ssl_upstream = ALLOC(struct ssl_upstream, 1);
_stream->ssl_upstream->conn_ssl_srv = future_create(ssl_conn_origin_on_succ, ssl_conn_origin_on_fail, _stream);
ssl_async_upstream_create(_stream->ssl_upstream->conn_ssl_srv, _stream->fd_upstream, _stream->ssl_downstream->sni,
_stream->thrmgr_ref->evbase, NULL);
}
void peek_client_hello_on_fail(enum e_future_error err, const char * what, void * user)
{
//TODO:
assert(0);
}
int ssl_stream_setup(struct tfe_stream_private * _stream)
{
return 0;
}
int __plain_stream_conn_private_init(struct tfe_stream_private * _stream,
struct tfe_conn_private * _conn, evutil_socket_t fd)
{
struct tfe_proxy * proxy = _stream->proxy;
struct event_base * ev_base = _stream->thrmgr_ref->evbase;
_conn->bev = bufferevent_socket_new(ev_base, fd, BEV_OPT_DEFER_CALLBACKS)
_conn->fd = fd;
_conn->closed = 0;
_conn->need_shutdown = 0;
_conn->on_writing = 0;
}
int plain_stream_setup(struct tfe_stream_private * _stream)
{
return 0;
}
void tfe_stream_setup(struct tfe_stream_private * _stream)
{
struct future * f_sni = NULL;
tfe_thread_ctx * thread = _stream->thrmgr_ref;
if (_stream->session_type == SESSION_PROTO_SSL)
{
_stream->ssl_downstream = ssl_downstream_create();
_stream->async_future = future_create(peek_client_hello_on_succ, peek_client_hello_on_fail, _stream);
ssl_async_peek_client_hello(_stream->ssl_downstream->future_sni_peek, _stream->fd_downstream,
_stream->thrmgr_ref->evbase);
}
else if (_stream->session_type == SESSION_PROTO_PLAIN)
{
bufferevent_setcb(_stream->head.upstream.bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, _stream);
bufferevent_setcb(_stream->head.downstream.bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, _stream);
bufferevent_enable(_stream->head.upstream.bev, EV_READ | EV_WRITE);
bufferevent_enable(_stream->head.downstream.bev, EV_READ | EV_WRITE);
}
else
{
}
switch (_stream->session_type)
{
case SESSION_PROTO_SSL:
// for SSL, defer dst connection setup to initial_readcb
thread->stat.value[SSL_NUM]++;
break;
default:
//todo:
stream_fd_readcb(_stream->fd_downstream, 0, _stream);
break;
}
return;
}
struct tfe_stream_private * tfe_stream_create(evutil_socket_t fd_downstream, evutil_socket_t fd_upstream,
enum tfe_session_proto session_type, tfe_thread_ctx * thread)
{
struct tfe_stream_private * conn_private = NULL;
struct tfe_stream * conn_public = NULL;
conn_private = ALLOC(struct tfe_stream_private, 1);
conn_private->session_type = session_type;
conn_private->fd_downstream = fd_downstream;
conn_private->fd_upstream = fd_upstream;
conn_private->thrmgr_ref = thread;
conn_private->is_plugin_opened = 0;
conn_public = &(conn_private->head);
thread->stat.value[STREAM_NUM]++;
return conn_private;
}
/* vim: set noet ft=c: */