diff --git a/CMakeLists.txt b/CMakeLists.txt index f8055bb..054812f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,6 +3,7 @@ project(tfe) set(CMAKE_CXX_STANDARD 11) set(CMAKE_C_STANDARD 11) +set(CMAKE_POSITION_INDEPENDENT_CODE ON) add_subdirectory(vendor) add_subdirectory(common) diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index babf6b6..435f359 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(common src/tfe_stat.cpp src/tfe_utils.cpp) +add_library(common src/tfe_stat.cpp src/tfe_utils.cpp src/tfe_future.cpp) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_link_libraries(common MESA_handle_logger) diff --git a/common/include/tfe_utils.h b/common/include/tfe_utils.h index 21f888c..91063c6 100644 --- a/common/include/tfe_utils.h +++ b/common/include/tfe_utils.h @@ -15,6 +15,13 @@ #define ALLOC(type, number) ((type *)calloc(sizeof(type), number)) +#define TFE_STRUCT_ALLOC(struct_type) __extension__ \ +({ \ + ((struct_type) * ) __struct_ptr; \ + __struct_ptr = ((struct_type) *)malloc(sizeof((struct_type))); \ + __struct_ptr; \ +}) + #define likely(expr) __builtin_expect((expr), 1) #define unlikely(expr) __builtin_expect((expr), 0) @@ -27,6 +34,33 @@ do { MESA_handle_runtime_log(handler, RLOG_LV_INFO, NULL, fmt, ##__VA_ARGS__); } #define TFE_LOG_DEBUG(handler, fmt, ...) \ do { MESA_handle_runtime_log(handler, RLOG_LV_DEBUG, NULL, fmt, ##__VA_ARGS__); } while(0) \ +#ifndef offsetof +/** Return the offset of a field in a structure. */ +#define offsetof(TYPE, MEMBER) __builtin_offsetof (TYPE, MEMBER) +#endif + +/** + * Return pointer to the wrapping struct instance. + * + * Example: + * + * struct wrapper { + * ... + * struct child c; + * ... + * }; + * + * struct child *x = obtain(...); + * struct wrapper *w = container_of(x, struct wrapper, c); + */ +#ifndef container_of +#define container_of(ptr, type, member) __extension__ ({ \ + const typeof(((type *)0)->member) *_ptr = (ptr); \ + __attribute__((unused)) type *_target_ptr = (type *)(ptr); \ + (type *)(((uintptr_t)_ptr) - offsetof(type, member)); \ + }) +#endif + int addr_sock_to_layer(struct sockaddr * sock_addr, int sockaddrlen, struct layer_addr * layer_addr); int addr_layer_to_sock(struct layer_addr * layer_addr, struct sockaddr * sock_addr); char* tfe_strdup(const char* s); diff --git a/platform/src/future.cpp b/common/src/tfe_future.cpp similarity index 100% rename from platform/src/future.cpp rename to common/src/tfe_future.cpp diff --git a/platform/CMakeLists.txt b/platform/CMakeLists.txt index cd64283..76f891c 100644 --- a/platform/CMakeLists.txt +++ b/platform/CMakeLists.txt @@ -1,7 +1,10 @@ -add_executable(tfe src/cert.cpp src/future.cpp src/kni.cpp src/tfe_stream.cpp src/main.cpp src/proxy.cpp) +add_executable(tfe src/key_keeper.cpp src/kni_acceptor.cpp src/ssl_stream.cpp src/ssl_sess_cache.cpp + src/tcp_stream.cpp src/main.cpp src/proxy.cpp) + target_include_directories(tfe PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include/external) target_include_directories(tfe PRIVATE ${CMAKE_CURRENT_LIST_DIR}/include/internal) target_link_libraries(tfe common) target_link_libraries(tfe pthread dl openssl-ssl-static openssl-crypto-static pthread libevent-static - libevent-static-openssl libevent-static-pthreads MESA_handle_logger MESA_prof_load wiredcfg) + libevent-static-openssl libevent-static-pthreads MESA_handle_logger MESA_prof_load MESA_htable wiredcfg) + diff --git a/platform/include/internal/key_keeper.h b/platform/include/internal/key_keeper.h index 5b3b77e..fbed3c0 100644 --- a/platform/include/internal/key_keeper.h +++ b/platform/include/internal/key_keeper.h @@ -1,8 +1,8 @@ -#ifndef CERT_H -#define CERT_H +#pragma once #include #include +#include struct keyring { @@ -16,6 +16,5 @@ struct key_keeper * key_keeper_destroy(struct key_keeper *keeper); struct keyring* key_keeper_release_cert(future_result_t* result); void key_keeper_free_keyring(struct keyring* cert); -void key_keeper_async_ask(struct future * f, struct key_keeper * keeper, int keyring_id, +void key_keeper_async_ask(struct future * f, struct key_keeper * keeper, int keyring_id, X509 * origin_cert, int is_cert_valid, struct event_base * evbase); -#endif /* !CERT_H */ diff --git a/platform/include/internal/kni.h b/platform/include/internal/kni_acceptor.h similarity index 100% rename from platform/include/internal/kni.h rename to platform/include/internal/kni_acceptor.h diff --git a/platform/include/internal/stream.h b/platform/include/internal/platform.h similarity index 54% rename from platform/include/internal/stream.h rename to platform/include/internal/platform.h index 90110db..39180bb 100644 --- a/platform/include/internal/stream.h +++ b/platform/include/internal/platform.h @@ -5,7 +5,8 @@ #include #include -#include +#include +#include struct tfe_thread_ctx { @@ -26,26 +27,6 @@ struct tfe_thread_ctx const struct tfe_plugin * modules; }; -//Downstream: comunication form client to proxy -//Upstream: communication form proxy to server -struct ssl_downstream -{ - /* server name indicated by client in SNI TLS extension */ - char * sni; - SSL * ssl; - X509 * fake_cert_ref;//? - int keyring_id; - struct future * future_sni_peek; - struct future * future_get_cert; -}; - -struct ssl_upstream -{ - X509 * orig_cert; - SSL * ssl; - struct future * conn_ssl_srv; -}; - enum tfe_plugin_state { PLUG_STATE_READONLY, @@ -78,22 +59,18 @@ struct tfe_conn_private struct tfe_stream_private { struct tfe_stream head; - struct tfe_proxy *proxy; + struct tfe_proxy * proxy_ref; + struct tfe_thread_ctx * thread_ref; enum tfe_session_proto session_type; - struct tfe_conn_private conn_upstream; - struct tfe_conn_private conn_downstream; + struct tfe_conn_private * conn_upstream; + struct tfe_conn_private * conn_downstream; - union + struct { - struct ssl_downstream * ssl_downstream; - void * raw_downstream; - }; - - union - { - struct ssl_upstream * ssl_upstream; - void * raw_upstream; + struct ssl_mgr * ssl_mgr; + struct ssl_stream * ssl_downstream; + struct ssl_stream * ssl_upstream; }; uint8_t is_plugin_opened; @@ -109,15 +86,17 @@ struct tfe_stream_private struct plugin_ctx * plug_ctx; unsigned char passthrough; /* 1 if SSL passthrough is active */ - evutil_socket_t fd_downstream; - evutil_socket_t fd_upstream; + /* For defer connection setup */ + evutil_socket_t defer_fd_downstream; + evutil_socket_t defer_fd_upstream; - struct tfe_thread_ctx * thrmgr_ref; - future * async_future; + /* ASYNC UPSTREAM */ + future * future_upstream_create; + /* ASYNC DOWNSTREAM */ + future * future_downstream_create; }; -struct tfe_stream_private * tfe_stream_create(evutil_socket_t fd_downstream, evutil_socket_t fd_upstream, - enum tfe_session_proto session_type, tfe_thread_ctx * thread); - -void tfe_stream_setup(struct tfe_stream_private * _stream); - +static inline void * __STREAM_LOGGER(struct tfe_stream_private * _stream) +{ + return _stream->proxy_ref->main_logger; +} diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h index 79cfacd..b620e28 100644 --- a/platform/include/internal/proxy.h +++ b/platform/include/internal/proxy.h @@ -2,8 +2,32 @@ #include #include +#include + +struct ssl_mgr; +struct key_keeper; + +struct tfe_proxy +{ + char name[TFE_SYMBOL_MAX]; + struct event_base * evbase; + struct event * sev[8]; + struct event * gcev; + + struct tfe_config * opts; + void * main_logger; + + unsigned int nr_work_threads; + struct tfe_thread_ctx * work_threads; + + unsigned int nr_modules; + struct tfe_plugin * modules; + void * io_mod; + + struct ssl_mgr * ssl_mgr_handler; + struct key_keeper * key_keeper_handler; +}; -struct tfe_proxy; struct tfe_proxy_accept_para { /* Both upstream and downstream FDs */ @@ -17,3 +41,4 @@ struct tfe_proxy_accept_para struct tfe_proxy * tfe_proxy_new(const char * profile); int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_para * para); void tfe_proxy_run(struct tfe_proxy * proxy); + diff --git a/platform/include/internal/ssl_sess_cache.h b/platform/include/internal/ssl_sess_cache.h new file mode 100644 index 0000000..24aad61 --- /dev/null +++ b/platform/include/internal/ssl_sess_cache.h @@ -0,0 +1,16 @@ +#pragma once + +#include +#include +#include + +struct sess_cache; +struct sess_cache * ssl_sess_cache_create(unsigned int slot_size, unsigned int expire_seconds, enum tfe_conn_dir served); +void ssl_sess_cache_destroy(struct sess_cache * cache); + +void up_session_set(struct sess_cache * cache, struct sockaddr * addr, socklen_t addr_len, const char * sni, SSL_SESSION * value); +SSL_SESSION * up_session_get(struct sess_cache * cache, struct sockaddr * addr, socklen_t addr_len, const char * sni); + +void down_session_set(struct sess_cache * cache, const SSL_SESSION * sess); +void down_session_del(struct sess_cache * cache, const SSL_SESSION * sess); +SSL_SESSION * down_session_get(struct sess_cache * cache, const unsigned char * id, int idlen); diff --git a/platform/include/internal/ssl_stream.h b/platform/include/internal/ssl_stream.h index 9242d00..8f3e46c 100644 --- a/platform/include/internal/ssl_stream.h +++ b/platform/include/internal/ssl_stream.h @@ -1,22 +1,29 @@ #pragma once #include #include -#include +#include +#include +#include +#include +#include + +#include struct ssl_stream; struct ssl_mgr; -struct ssl_mgr* ssl_manager_init(const char* ini_profile, const char* section, struct event_base *evbase, void* logger, screen_stat_handle_t* fs); -void ssl_manager_destroy(struct ssl_mgr* mgr); +struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section, struct event_base * evbase, + void * logger, screen_stat_handle_t * fs); +void ssl_manager_destroy(struct ssl_mgr * mgr); -struct ssl_stream* ssl_upstream_create_result_release_stream(future_result_t* result); -struct bufferevent* ssl_upstream_create_result_release_bev(future_result_t* result); -void ssl_async_upstream_create(struct future* f, struct ssl_mgr* mgr, evutil_socket_t fd_upstream, evutil_socket_t fd_downstream, struct event_base *evbase); - -struct ssl_stream* ssl_downstream_create_result_release_stream(future_result_t* result); -struct bufferevent* ssl_downstream_create_result_release_bev(future_result_t* result); -void ssl_async_downstream_create(struct future* f, struct ssl_mgr* mgr, struct ssl_stream* upstream, evutil_socket_t fd_downstream, int keyring_id, struct event_base *evbase); - -void ssl_stream_free_and_close_fd(struct ssl_stream* stream, struct event_base *evbase, evutil_socket_t fd); +struct ssl_stream * ssl_upstream_create_result_release_stream(future_result_t * result); +struct bufferevent * ssl_upstream_create_result_release_bev(future_result_t * result); +void ssl_async_upstream_create(struct future * f, struct ssl_mgr * mgr, evutil_socket_t fd_upstream, + evutil_socket_t fd_downstream, struct event_base * evbase); +struct ssl_stream * ssl_downstream_create_result_release_stream(future_result_t * result); +struct bufferevent * ssl_downstream_create_result_release_bev(future_result_t * result); +void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct ssl_stream * upstream, + evutil_socket_t fd_downstream, int keyring_id, struct event_base * evbase); +void ssl_stream_free_and_close_fd(struct ssl_stream * stream, struct event_base * evbase, evutil_socket_t fd); diff --git a/platform/include/internal/tcp_stream.h b/platform/include/internal/tcp_stream.h new file mode 100644 index 0000000..fc052a0 --- /dev/null +++ b/platform/include/internal/tcp_stream.h @@ -0,0 +1,8 @@ +#pragma once + +#include + +struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx); +void tfe_stream_init_by_fds(struct tfe_stream * stream, enum tfe_session_proto session_type, + evutil_socket_t fd_downstream, evutil_socket_t fd_upstream); +void tfe_stream_destory(struct tfe_stream_private * stream); diff --git a/platform/src/kni.cpp b/platform/src/kni_acceptor.cpp similarity index 99% rename from platform/src/kni.cpp rename to platform/src/kni_acceptor.cpp index 4a74a9f..f165bd5 100644 --- a/platform/src/kni.cpp +++ b/platform/src/kni_acceptor.cpp @@ -5,15 +5,18 @@ #include #include #include - -#include -#include -#include -#include #include #include + +#include +#include + +#include + +#include +#include #include -#include +#include #ifndef TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT #define TFE_CONFIG_KNI_UXDOMAIN_PATH_DEFAULT "/var/run/.tfe_kni_acceptor_handler" diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index a79e449..2c90b4e 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -24,34 +24,14 @@ #include #include #include -#include +#include #include -#include -#include +#include +#include static int signals[] = {SIGTERM, SIGQUIT, SIGHUP, SIGINT, SIGPIPE, SIGUSR1}; -struct tfe_proxy -{ - char name[TFE_SYMBOL_MAX]; - struct event_base * evbase; - struct event * sev[sizeof(signals) / sizeof(int)]; - struct event * gcev; - struct tfe_config * opts; - void * main_logger; - - struct sess_cache * dsess_cache; - struct sess_cache * ssess_cache; - - unsigned int nr_work_threads; - struct tfe_thread_ctx * work_threads; - - unsigned int nr_modules; - struct tfe_plugin * modules; - - void * io_mod; -}; const char * module_name_pxy = "TFE_PXY"; extern struct tfe_instance * g_tfe_instance; @@ -142,14 +122,10 @@ int tfe_proxy_fds_accept(struct tfe_proxy * ctx, const struct tfe_proxy_accept_p unsigned int worker_tid = select_work_thread(ctx); tfe_thread_ctx * worker_thread_ctx = &ctx->work_threads[worker_tid]; - struct tfe_stream_private * stream = tfe_stream_create(para->upstream_fd, - para->downstream_fd, para->session_type, worker_thread_ctx); + struct tfe_stream * stream = tfe_stream_create(ctx, worker_thread_ctx); + tfe_stream_init_by_fds(stream, para->session_type, para->downstream_fd, para->upstream_fd); - if (stream == NULL) goto __errout; - tfe_stream_setup(stream); - -__errout: - return -1; + return 0; } /* @@ -169,9 +145,6 @@ struct tfe_proxy * tfe_proxy_new(const char * profile) event_enable_debug_mode(); proxy->evbase = event_base_new(); - proxy->dsess_cache = session_cache_init(); - proxy->ssess_cache = session_cache_init(); - proxy->nr_modules = 2; proxy->modules = ALLOC(struct tfe_plugin, proxy->nr_modules); @@ -185,8 +158,6 @@ struct tfe_proxy * tfe_proxy_new(const char * profile) { proxy->work_threads[i].thread_id = i; proxy->work_threads[i].evbase = event_base_new(); - proxy->work_threads[i].dsess_cache = proxy->dsess_cache; - proxy->work_threads[i].ssess_cache = proxy->ssess_cache; proxy->work_threads[i].nr_modules = proxy->nr_modules; proxy->work_threads[i].modules = proxy->modules; } diff --git a/platform/src/ssl_sess_cache.cpp b/platform/src/ssl_sess_cache.cpp index 3f05093..42822bc 100644 --- a/platform/src/ssl_sess_cache.cpp +++ b/platform/src/ssl_sess_cache.cpp @@ -2,72 +2,85 @@ #include #include -#include -#include -#define SESS_CACHE_NOT_FOUND -1 -#define SESS_CACHE_FOUND 0 -#define SESS_CACHE_UPDATE_OLD 1 -#define SESS_CACHE_ADD_NEW 2 -#define SESS_CACHE_INVALID 3 +#include +#include + +#define SESS_CACHE_NOT_FOUND -1 +#define SESS_CACHE_FOUND 0 +#define SESS_CACHE_UPDATE_OLD 1 +#define SESS_CACHE_ADD_NEW 2 +#define SESS_CACHE_INVALID 3 struct asn1_sess { - unsigned char* buff; + unsigned char * buff; size_t size; }; + struct sess_set_args { MESA_htable_handle hash; - struct asn1_sess* new_sess; + struct asn1_sess * new_sess; }; + struct sess_cache { enum tfe_conn_dir served_for; MESA_htable_handle hash; - long long hit_cnt, miss_cnt,del_err; + long long hit_cnt, miss_cnt, del_err; }; -static void ssl_sess_free_serialized(void *data) +static void ssl_sess_free_serialized(void * data) { - struct asn1_sess* p=(struct asn1_sess*)data; + struct asn1_sess * p = (struct asn1_sess *) data; free(p->buff); - p->size=0; + p->size = 0; free(p); return; } -static struct asn1_sess* ssl_sess_serialize(SSL_SESSION *sess) + +static struct asn1_sess * ssl_sess_serialize(SSL_SESSION * sess) { - struct asn1_sess* result=ALLOC(struct asn1_sess,1); - result->size = i2d_SSL_SESSION(sess, NULL); - /*When using i2d_SSL_SESSION(), the memory location pointed to by pp must be large enough to hold the binary representation of the session. - There is no known limit on the size of the created ASN1 representation, so the necessary amount of space should be obtained by first calling - i2d_SSL_SESSION() with pp=NULL, and obtain the size needed, then allocate the memory and call i2d_SSL_SESSION() again.*/ - result->buff=ALLOC(unsigned char,result->size); + struct asn1_sess * result = ALLOC(struct asn1_sess, 1); + + int __i2d_size = i2d_SSL_SESSION(sess, NULL); + result->size = (size_t) __i2d_size; + assert(__i2d_size > 0); + + /* When using i2d_SSL_SESSION(), the memory location pointed to by pp must be large enough to + * hold the binary representation of the session. There is no known limit on the size of the + * created ASN1 representation, so the necessary amount of space should be obtained by first + * calling i2d_SSL_SESSION() with pp=NULL, and obtain the size needed, + * then allocate the memory and call i2d_SSL_SESSION() again.*/ + + result->buff = ALLOC(unsigned char, result->size); i2d_SSL_SESSION(sess, &(result->buff)); return result; } -static SSL_SESSION * ssl_sess_deserialize(const struct asn1_sess* asn1) + +static SSL_SESSION * ssl_sess_deserialize(const struct asn1_sess * asn1) { - SSL_SESSION *sess=NULL; - sess = d2i_SSL_SESSION(NULL, &(asn1->buff), asn1->size); /* increments asn1 */ + SSL_SESSION * sess = NULL; + d2i_SSL_SESSION(&sess, (const unsigned char **) &(asn1->buff), (long) asn1->size); /* increments asn1 */ return sess; } -static int ssl_sess_varify_cb(void *data, int eliminate_type) + +static int ssl_sess_verify_cb(void * data, int eliminate_type) { - SSL_SESSION *sess=NULL; - int ret=0; - const struct asn1_sess* asn1=(struct asn1_sess*)data; - if(eliminate_type==ELIMINATE_TYPE_NUM) + const struct asn1_sess * asn1 = (struct asn1_sess *) data; + if (eliminate_type == ELIMINATE_TYPE_NUM) { - return 1; //direct expired. + return 1; //direct expired. } - sess=ssl_sess_deserialize(asn1); - ret=ssl_session_is_valid(sess); + + SSL_SESSION * sess = ssl_sess_deserialize(asn1); + int ret = ssl_session_is_valid(sess); SSL_SESSION_free(sess); - if(ret==0) + + if (ret == 0) { - return 1; //should be expired (deleted). + return 1; //should be expired (deleted). } else { @@ -75,126 +88,134 @@ static int ssl_sess_varify_cb(void *data, int eliminate_type) } } -static long sess_cache_get_cb(void *data, const uchar *key, uint size, void *user_arg) +static long sess_cache_get_cb(void * data, const uchar * key, uint size, void * user_arg) { - SSL_SESSION *sess=NULL; - int is_valid=0; - if(data==NULL) + SSL_SESSION * sess = NULL; + int is_valid = 0; + if (data == NULL) { return SESS_CACHE_NOT_FOUND; } - const struct asn1_sess* asn1=(struct asn1_sess*)data; - sess=ssl_sess_deserialize(data,asn1); - is_valid=ssl_session_is_valid(sess); - if(is_valid==0) + + const struct asn1_sess * asn1 = (struct asn1_sess *) data; + sess = ssl_sess_deserialize(asn1); + is_valid = ssl_session_is_valid(sess); + + if (is_valid == 0) { SSL_SESSION_free(sess); return SESS_CACHE_INVALID; } else { - *(SSL_SESSION **)user_arg=sess; + *(SSL_SESSION **) user_arg = sess; return SESS_CACHE_FOUND; } } -static long sess_cache_set_cb(void *data, const uchar *key, uint size, void *user_arg) +static long sess_cache_set_cb(void * data, const uchar * key, uint size, void * user_arg) { - - struct sess_set_args* args=(struct sess_set_args*)user_arg; - struct asn1_sess* new_asn1=args->new_sess; - struct asn1_sess *cur_asn1=(struct asn1_sess*)data; - int ret=0; - if(cur_asn1!=NULL) + struct sess_set_args * args = (struct sess_set_args *) user_arg; + struct asn1_sess * new_asn1 = args->new_sess; + struct asn1_sess * cur_asn1 = (struct asn1_sess *) data; + + int ret = 0; + if (cur_asn1 != NULL) { free(cur_asn1->buff); - cur_asn1->size=new_asn1->size; - cur_asn1->buff=ALLOC(unsigned char, cur_asn1->size); - memcpy(cur_asn1->buff,new_asn1->buff,cur_asn1->size); + cur_asn1->size = new_asn1->size; + cur_asn1->buff = ALLOC(unsigned char, cur_asn1->size); + memcpy(cur_asn1->buff, new_asn1->buff, cur_asn1->size); return SESS_CACHE_UPDATE_OLD; } else { - ret=MESA_htable_add(args->hash, key, size, new_asn1); - assert(ret>=0); + ret = MESA_htable_add(args->hash, key, size, new_asn1); + assert(ret >= 0); return SESS_CACHE_ADD_NEW; } } -static int upsess_mk_key(struct sockaddr * addr, socklen_t addr_len, const char* sni, unsigned char** key_buf) +static size_t upsess_mk_key(struct sockaddr * addr, socklen_t addrlen, const char * sni, unsigned char ** key_buf) { - size_t key_size=0; - unsigned char* tmp=NULL, p=NULL; + size_t key_size = 0; + unsigned char * tmp = NULL; size_t tmp_size; - dynbuf_t tmp, *db; short port; size_t snilen; - switch (((struct sockaddr_storage *)addr)->ss_family) { + switch (addr->sa_family) + { case AF_INET: - tmp = (unsigned char *) - &((struct sockaddr_in*)addr)->sin_addr; + tmp = (unsigned char *)&((struct sockaddr_in *) addr)->sin_addr; tmp_size = sizeof(struct in_addr); - port = ((struct sockaddr_in*)addr)->sin_port; + port = ((struct sockaddr_in *) addr)->sin_port; break; case AF_INET6: - tmp = (unsigned char *) - &((struct sockaddr_in6*)addr)->sin6_addr; + tmp = (unsigned char *)&((struct sockaddr_in6 *) addr)->sin6_addr; tmp_size = sizeof(struct in6_addr); - port = ((struct sockaddr_in6*)addr)->sin6_port; + port = ((struct sockaddr_in6 *) addr)->sin6_port; break; default: //should never happens. assert(0); break; } - snilen = sni ? strlen(sni) : 0; - key_size=tmp_size+sizeof(port)+snilen; - *key_buf=ALLOC(unsigned char, key_size); - p=*key_buff; - memcpy(p,tmp,tmp_size); - p+=tmp_size; - memcpy(p, (char*)&port, sizeof(port)); - p+=sizeof(port); - return key_size; + snilen = sni ? strlen(sni) : 0; + key_size = tmp_size + sizeof(port) + snilen; + + *key_buf = ALLOC(unsigned char, key_size); + unsigned char * p = *key_buf; + + memcpy(p, tmp, tmp_size); + p += tmp_size; + memcpy(p, (char *) &port, sizeof(port)); + p += sizeof(port); + return key_size; } -void up_session_set(struct sess_cache* cache, struct sockaddr * addr, socklen_t addr_len, const char* sni, SSL_SESSION * sess) + +void up_session_set(struct sess_cache * cache, struct sockaddr * addr, socklen_t addr_len, const char * sni, + SSL_SESSION * sess) { - char* key=NULL; - int ret=0; - size_t key_size=0; - long cb_ret=0; - void* no_use=NULL; - assert(cache->served_for==CONN_DIR_UPSTREAM); - key_size=upsess_mk_key(addr, addr_len, sni, &key); - struct asn1_sess* asn1=NULL; - asn1=ssl_sess_serialize(sess); - + unsigned char * key = NULL; + int ret = 0; + size_t key_size = 0; + long cb_ret = 0; + void * no_use = NULL; + assert(cache->served_for == CONN_DIR_UPSTREAM); + key_size = upsess_mk_key(addr, addr_len, sni, &key); + + struct asn1_sess * asn1 = NULL; + asn1 = ssl_sess_serialize(sess); + struct sess_set_args set_args; - set_args.hash=cache->hash; - set_args.new_sess=asn1; - no_use=MESA_htable_search_cb(cache->hash, key, key_size, sess_cache_set_cb, &set_args,&cb_ret); - if(cb_ret==SESS_CACHE_UPDATE_OLD) + set_args.hash = cache->hash; + set_args.new_sess = asn1; + no_use = MESA_htable_search_cb(cache->hash, key, key_size, sess_cache_set_cb, &set_args, &cb_ret); + if (cb_ret == SESS_CACHE_UPDATE_OLD) { ssl_sess_free_serialized(asn1); } free(key); return; } -SSL_SESSION* up_session_get(struct sess_cache* cache, struct sockaddr * addr, socklen_t addr_len, const char* sni) + +SSL_SESSION * up_session_get(struct sess_cache * cache, struct sockaddr * addr, socklen_t addr_len, const char * sni) { - SSL_SESSION* sess=NULL; - void* no_use=NULL; - long cb_ret=0; - char* key=NULL; - size_t key_size=0; - assert(cache->served_for==CONN_DIR_UPSTREAM); - key_size=upsess_mk_key(addr, addr_len, sni, &key); - no_use=MESA_htable_search_cb(cache->hash, key, key_size,sess_cache_get_cb, &sess, &cb_ret); + SSL_SESSION * sess = NULL; + void * no_use = NULL; + long cb_ret = 0; + + size_t key_size = 0; + assert(cache->served_for == CONN_DIR_UPSTREAM); + + unsigned char * key = NULL; + key_size = upsess_mk_key(addr, addr_len, sni, &key); + no_use = MESA_htable_search_cb(cache->hash, key, key_size, sess_cache_get_cb, &sess, &cb_ret); free(key); - key=NULL; - if(cb_ret==1) + key = NULL; + if (cb_ret == 1) { cache->hit_cnt++; return sess; @@ -206,39 +227,41 @@ SSL_SESSION* up_session_get(struct sess_cache* cache, struct sockaddr * } } -void down_session_set(struct sess_cache* cache, const SSL_SESSION* sess) +void down_session_set(struct sess_cache * cache, const SSL_SESSION * sess) { - unsigned int idlen=0; - struct asn1_sess* asn1=NULL; - long cb_ret=0; - void* no_use=NULL; - int ret=0; - assert(cache->served_for==CONN_DIR_DOWNSTREAM); - asn1=ssl_sess_serialize(sess); + unsigned int idlen = 0; + struct asn1_sess * asn1 = NULL; + long cb_ret = 0; + void * no_use = NULL; + int ret = 0; + assert(cache->served_for == CONN_DIR_DOWNSTREAM); + asn1 = ssl_sess_serialize((SSL_SESSION *) sess); + /* * SSL_SESSION_get_id() returns a pointer to the internal session id value for the session s. * The length of the id in bytes is stored in *idlen. The length may be 0. * The caller should not free the returned pointer directly. */ - const unsigned char* id = SSL_SESSION_get_id(sess, &idlen); - struct sess_set_args set_args; - set_args.hash=cache->hash; - set_args.new_sess=asn1; - no_use=MESA_htable_search_cb(cache->hash, id, (unsigned int)idlen, sess_cache_set_cb, &set_args,&cb_ret); - if(cb_ret==SESS_CACHE_UPDATE_OLD) + + const unsigned char * id = SSL_SESSION_get_id(sess, &idlen); + struct sess_set_args set_args{.hash = cache->hash, .new_sess = asn1}; + + no_use = MESA_htable_search_cb(cache->hash, id, (unsigned int) idlen, sess_cache_set_cb, &set_args, &cb_ret); + if (cb_ret == SESS_CACHE_UPDATE_OLD) { ssl_sess_free_serialized(asn1); } return; } -SSL_SESSION* down_session_get(struct sess_cache* cache, unsigned char * id, int idlen) + +SSL_SESSION * down_session_get(struct sess_cache * cache, const unsigned char * id, int idlen) { - SSL_SESSION* sess=NULL; - void* no_use=NULL; - long cb_ret=0; - assert(cache->served_for==CONN_DIR_DOWNSTREAM); - no_use=MESA_htable_search_cb(cache->hash, id, (unsigned int)idlen, sess_cache_get_cb, &sess,&cb_ret); - if(cb_ret==1) + SSL_SESSION * sess = NULL; + void * no_use = NULL; + long cb_ret = 0; + assert(cache->served_for == CONN_DIR_DOWNSTREAM); + no_use = MESA_htable_search_cb(cache->hash, id, (unsigned int) idlen, sess_cache_get_cb, &sess, &cb_ret); + if (cb_ret == 1) { cache->hit_cnt++; return sess; @@ -250,49 +273,66 @@ SSL_SESSION* down_session_get(struct sess_cache* cache, unsigned char } } -void down_session_del(struct sess_cache* cache, const SSL_SESSION* sess) +void down_session_del(struct sess_cache * cache, const SSL_SESSION * sess) { - assert(cache->served_for==CONN_DIR_DOWNSTREAM); - unsigned int len=0; - const unsigned char* id = SSL_SESSION_get_id(sess, &len); - int ret=MESA_htable_del(cache->hash, id, len, NULL); - if(ret!=MESA_HTABLE_RET_OK) + assert(cache->served_for == CONN_DIR_DOWNSTREAM); + unsigned int len = 0; + const unsigned char * id = SSL_SESSION_get_id(sess, &len); + int ret = MESA_htable_del(cache->hash, id, len, NULL); + if (ret != MESA_HTABLE_RET_OK) { cache->del_err++; } return; } -struct sess_cache* ssl_sess_cache_create(int slot_size, int expire_seconds, enum tfe_conn_dir served) + +int __wrapper_MESA_htable_set_opt(MESA_htable_handle table, enum MESA_htable_opt opt_type, unsigned value) { - struct sess_cache* cache=ALLOC(struct sess_cache, 1); - MESA_htable_handle htable=NULL; - int ret=0,max_num=slot_size*4; - htable=MESA_htable_born(); - value=0;//no print - ret=MESA_htable_set_opt(htable, MHO_SCREEN_PRINT_CTRL, &(value), sizeof(value)); - value=1;//thread safe - ret=MESA_htable_set_opt(htable, MHO_THREAD_SAFE, value, sizeof(value)); - assert(ret==0); - value=16; - ret=MESA_htable_set_opt(htable, MHO_MUTEX_NUM, value, sizeof(value)); - ret=MESA_htable_set_opt(htable, MHO_HASH_SLOT_SIZE, &(slot_size), sizeof(slot_size)); - ret=MESA_htable_set_opt(htable, MHO_HASH_MAX_ELEMENT_NUM, &(max_num), sizeof(max_num)); - ret=MESA_htable_set_opt(htable, MHO_EXPIRE_TIME, &(expire_seconds), sizeof(expire_seconds)); - value=HASH_ELIMINATE_ALGO_FIFO; - ret=MESA_htable_set_opt(htable, MHO_ELIMIMINATE_TYPE, &(value), sizeof(value)); - ret=MESA_htable_set_opt(htable, MHO_CBFUN_DATA_FREE, ssl_sess_free_serialized, sizeof(ssl_sess_free_serialized)); - ret=MESA_htable_set_opt(htable, MHO_CBFUN_DATA_EXPIRE_NOTIFY, ssl_sess_varify_cb, sizeof(ssl_sess_varify_cb)); - assert(ret==0); - ret=MESA_htable_mature(htable); - assert(ret==0); - cache->hash=htable; - cache->served_for=served; + int ret = MESA_htable_set_opt(table, opt_type, &value, (int)(sizeof(value))); + assert(ret == 0); + return ret; +} + +int __wrapper_MESA_htable_set_opt(MESA_htable_handle table, enum MESA_htable_opt opt_type, void * val, size_t len) +{ + int ret = MESA_htable_set_opt(table, opt_type, val, (int)len); + assert(ret == 0); + return ret; +} + +struct sess_cache * ssl_sess_cache_create(unsigned int slot_size, unsigned int expire_seconds, enum tfe_conn_dir served) +{ + struct sess_cache * cache = ALLOC(struct sess_cache, 1); + unsigned max_num = slot_size * 4; + int ret = 0; + + MESA_htable_handle htable = MESA_htable_born(); + ret = __wrapper_MESA_htable_set_opt(htable, MHO_SCREEN_PRINT_CTRL, 0); + ret = __wrapper_MESA_htable_set_opt(htable, MHO_THREAD_SAFE, 1); + + ret = __wrapper_MESA_htable_set_opt(htable, MHO_MUTEX_NUM, 16); + ret = __wrapper_MESA_htable_set_opt(htable, MHO_HASH_SLOT_SIZE, slot_size); + ret = __wrapper_MESA_htable_set_opt(htable, MHO_HASH_MAX_ELEMENT_NUM, max_num); + ret = __wrapper_MESA_htable_set_opt(htable, MHO_EXPIRE_TIME, expire_seconds); + + ret = __wrapper_MESA_htable_set_opt(htable, MHO_ELIMIMINATE_TYPE, + HASH_ELIMINATE_ALGO_FIFO); + ret = __wrapper_MESA_htable_set_opt(htable, MHO_CBFUN_DATA_FREE, + (void *)ssl_sess_free_serialized, sizeof(&ssl_sess_free_serialized)); + ret = __wrapper_MESA_htable_set_opt(htable, MHO_CBFUN_DATA_EXPIRE_NOTIFY, + (void *)ssl_sess_verify_cb, sizeof(&ssl_sess_verify_cb)); + + ret = MESA_htable_mature(htable); + assert(ret == 0); + + cache->hash = htable; + cache->served_for = served; return cache; } -void ssl_sess_cache_destroy(struct sess_cache* cache) +void ssl_sess_cache_destroy(struct sess_cache * cache) { MESA_htable_destroy(cache->hash, NULL); - cache->hash=NULL; + cache->hash = NULL; free(cache); return; } diff --git a/platform/src/ssl_sess_cache.h b/platform/src/ssl_sess_cache.h deleted file mode 100644 index ece9db0..0000000 --- a/platform/src/ssl_sess_cache.h +++ /dev/null @@ -1,15 +0,0 @@ -#pragma once -#include -#include - -#include - -struct sess_cache; -struct sess_cache* ssl_sess_cache_create(int slot_size, int expire_seconds, enum tfe_conn_dir served); -void ssl_sess_cache_destroy(struct sess_cache* cache); - -void up_session_set(struct sess_cache* cache, struct sockaddr * addr, socklen_t addr_len, const char* sni, SSL_SESSION * value) -SSL_SESSION* up_session_get(struct sess_cache* cache, struct sockaddr *addr, socklen_t addr_len, const char* sni); -void down_session_set(struct sess_cache* cache, const SSL_SESSION* sess); -SSL_SESSION* down_session_get(struct sess_cache* cache, unsigned char * id, int idlen); - diff --git a/platform/src/ssl_stream.cpp b/platform/src/ssl_stream.cpp index e254460..d65a99b 100644 --- a/platform/src/ssl_stream.cpp +++ b/platform/src/ssl_stream.cpp @@ -21,49 +21,45 @@ #include #include -#include -#include -#include -#include -#include - +#include +#include #include #include #include -#include #include #include #include -#include +#include -#define SSL_EX_DATA_IDX_SSLMGR 0 -#define MAX_NET_RETRIES 50 +#define SSL_EX_DATA_IDX_SSLMGR 0 +#define MAX_NET_RETRIES 50 - -struct ssl_mgr { - int sslcomp; - int no_ssl2; - int no_ssl3; - int no_tls10; - int no_tls11; - int no_tls12; - CONST_SSL_METHOD * (*sslmethod) (void); //Parameter of SSL_CTX_new - int sslversion; - char ssl_session_context[8]; - int cache_slot_num; - int sess_expire_seconds; +struct ssl_mgr +{ + int sslcomp; + int no_ssl2; + int no_ssl3; + int no_tls10; + int no_tls11; + int no_tls12; + CONST_SSL_METHOD * (* sslmethod)(void); //Parameter of SSL_CTX_new + int sslversion; + char ssl_session_context[8]; + int cache_slot_num; + int sess_expire_seconds; struct sess_cache * down_sess_cache; struct sess_cache * up_sess_cache; - char default_ciphers[TFE_STRING_MAX]; - DH * dh; - char * ecdhcurve; - char * crlurl; - uint8_t SSL_MODE_RELEASE_BUFFERS; - void * logger; - char trust_CA_file[TFE_STRING_MAX]; - char trust_CA_dir[TFE_STRING_MAX]; - X509_STORE* trust_CA_store; + char default_ciphers[TFE_STRING_MAX]; + DH * dh; + char * ecdhcurve; + char * crlurl; + + uint8_t ssl_mode_release_buffers; + void * logger; + char trust_CA_file[TFE_STRING_MAX]; + char trust_CA_dir[TFE_STRING_MAX]; + X509_STORE * trust_CA_store; struct key_keeper * keeper_of_keys; }; @@ -71,43 +67,45 @@ struct __ssl_stream_debug { evutil_socket_t fd; }; -struct ssl_stream { + +struct ssl_stream +{ enum tfe_conn_dir dir; - SSL * ssl; + SSL * ssl; struct ssl_mgr * mgr; union { - struct ssl_chello * client_hello; //dir=upstream, a little weird, which send by downstream client. - struct keyring* keyring; //dir=downstream. + struct ssl_chello * client_hello; //dir=upstream, a little weird, which send by downstream client. + struct keyring * keyring; //dir=downstream. }; struct __ssl_stream_debug _do_not_use; }; - -struct ssl_chello { +struct ssl_chello +{ //client hello - int version; - char * sni; - char * cipher_suites; + int version; + char * sni; + char * cipher_suites; }; - -struct peek_client_hello_ctx { +struct peek_client_hello_ctx +{ struct ssl_chello chello; - unsigned char sni_peek_retries; /* max 64 SNI parse retries */ + unsigned char sni_peek_retries; /* max 64 SNI parse retries */ struct event * ev; struct event_base * evbase; - void * logger; + void * logger; }; - -struct ssl_connect_origin_ctx { +struct ssl_connect_origin_ctx +{ struct bufferevent * bev; struct ssl_stream * s_stream; struct ssl_mgr * mgr; struct sockaddr addr; - socklen_t addrlen; - void * logger; + socklen_t addrlen; + void * logger; evutil_socket_t fd_upstream; evutil_socket_t fd_downstream; @@ -115,93 +113,122 @@ struct ssl_connect_origin_ctx { struct future * f_peek_chello; }; - -struct ask_keyring_ctx { - int keyring_id; +struct ask_keyring_ctx +{ + int keyring_id; struct ssl_stream * origin_ssl; - X509 * origin_crt; + X509 * origin_crt; int is_origin_crt_vaild; struct ssl_mgr * ssl_mgr; evutil_socket_t fd_downstream; struct event_base * evbase; - + struct future * f_query_cert; struct bufferevent * bev_down; - struct ssl_stream* downstream; + struct ssl_stream * downstream; }; - /* * SSL shutdown context. */ -struct ssl_shutdown_ctx { +struct ssl_shutdown_ctx +{ struct ssl_stream * s_stream; struct event_base * evbase; struct event * ev; - unsigned int retries; + unsigned int retries; }; -struct ssl_stream * ssl_stream_new(struct ssl_mgr * mgr, evutil_socket_t fd, enum tfe_conn_dir dir, struct ssl_chello * client_hello, struct keyring* crt) +static SSL * downstream_ssl_create(struct ssl_mgr * mgr, struct keyring * crt); +static SSL * upstream_ssl_create(struct ssl_mgr * mgr, const struct ssl_chello * chello, evutil_socket_t fd); +static void sslctx_set_opts(SSL_CTX * sslctx, struct ssl_mgr * mgr); + +struct ssl_chello * ssl_peek_result_release_chello(future_result_t * result) +{ + struct ssl_chello * p = (struct ssl_chello *) result, * copy = NULL; + copy = ALLOC(struct ssl_chello, 1); + + if (p != NULL) + { + copy->sni = tfe_strdup(p->sni); + copy->cipher_suites = tfe_strdup(p->cipher_suites); + copy->version = p->version; + } + + return copy; +} + +void ssl_free_chello(struct ssl_chello * p) +{ + if (p == NULL) + { + return; + } + + free(p->sni); + p->sni = NULL; + free(p->cipher_suites); + p->cipher_suites = NULL; + free(p); + return; +} + +struct ssl_stream * ssl_stream_new(struct ssl_mgr * mgr, evutil_socket_t fd, enum tfe_conn_dir dir, + struct ssl_chello * client_hello, struct keyring * crt) { struct sockaddr addr; socklen_t addrlen; - int ret=0; + int ret = 0; struct ssl_stream * s_stream = ALLOC(struct ssl_stream, 1); - s_stream->dir=dir; - s_stream->mgr=mgr; - s_stream->_do_not_use.fd=fd; - ret = getpeername(fd , &addr, &addrlen); + s_stream->dir = dir; + s_stream->mgr = mgr; + s_stream->_do_not_use.fd = fd; + ret = getpeername(fd, &addr, &addrlen); assert(ret == 0); switch (dir) { - case CONN_DIR_DOWNSTREAM: - s_stream->ssl= downstream_ssl_create(mgr, crt); - s_stream->keyring=crt; + case CONN_DIR_DOWNSTREAM: s_stream->ssl = downstream_ssl_create(mgr, crt); + s_stream->keyring = crt; break; - case CONN_DIR_UPSTREAM: - s_stream->ssl= upstream_ssl_create(mgr, client_hello, fd); - s_stream->client_hello=client_hello; + case CONN_DIR_UPSTREAM: s_stream->ssl = upstream_ssl_create(mgr, client_hello, fd); + s_stream->client_hello = client_hello; break; - default: - assert(0); + default: assert(0); } return s_stream; } - static void ssl_stream_free(struct ssl_stream * s_stream) { SSL_free(s_stream->ssl); - s_stream->ssl = NULL; + s_stream->ssl = NULL; switch (s_stream->dir) { case CONN_DIR_DOWNSTREAM: - if(s_stream->keyring!=NULL) + if (s_stream->keyring != NULL) { key_keeper_free_keyring(s_stream->keyring); - s_stream->keyring=NULL; + s_stream->keyring = NULL; } break; case CONN_DIR_UPSTREAM: - if(s_stream->client_hello!=NULL) + if (s_stream->client_hello != NULL) { ssl_free_chello(s_stream->client_hello); - s_stream->client_hello=NULL; + s_stream->client_hello = NULL; } break; - default: - assert(0); + default: assert(0); } - s_stream->mgr = NULL; + s_stream->mgr = NULL; free(s_stream); return; } - static int sslver_str2num(const char * version_str) { - int sslversion = -1; + int sslversion = -1; assert(OPENSSL_VERSION_NUMBER >= 0x10100000L); @@ -210,55 +237,55 @@ static int sslver_str2num(const char * version_str) * SSLv2_server_method() and SSLv2_client_method() functions were * removed in OpenSSL 1.1.0. */ - if (!strcmp(version_str, "ssl3")) + if (!strcmp(version_str, "ssl3")) { - sslversion = SSL3_VERSION; + sslversion = SSL3_VERSION; } else if (!strcmp(version_str, "tls10") || !strcmp(version_str, "tls1")) { - sslversion = TLS1_VERSION; + sslversion = TLS1_VERSION; } - else if (!strcmp(version_str, "tls11")) + else if (!strcmp(version_str, "tls11")) { - sslversion = TLS1_1_VERSION; + sslversion = TLS1_1_VERSION; } - else if (!strcmp(version_str, "tls12")) + else if (!strcmp(version_str, "tls12")) { - sslversion = TLS1_2_VERSION; + sslversion = TLS1_2_VERSION; } else { - sslversion = -1; + sslversion = -1; } return sslversion; } - void ssl_manager_destroy(struct ssl_mgr * mgr) { - if(mgr->keeper_of_keys!=NULL) + if (mgr->keeper_of_keys != NULL) { key_keeper_destroy(mgr->keeper_of_keys); } - if(mgr->trust_CA_store!=NULL) + if (mgr->trust_CA_store != NULL) { X509_STORE_free(mgr->trust_CA_store); - mgr->trust_CA_store=NULL; + mgr->trust_CA_store = NULL; } free(mgr); } - -struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section, void * logger) { +struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section, void * logger) +{ struct ssl_mgr * mgr = ALLOC(struct ssl_mgr, 1); int ret = 0, value = 0; - char version_str[TFE_SYMBOL_MAX]; + char version_str[TFE_SYMBOL_MAX]; mgr->logger = logger; MESA_load_profile_string_def(ini_profile, section, "ssl_version", version_str, sizeof(version_str), "tls12"); mgr->sslversion = sslver_str2num(version_str); - if (mgr->sslversion < 0) { + if (mgr->sslversion < 0) + { TFE_LOG_ERROR(logger, "Unsupported SSL/TLS protocol %s", version_str); goto error_out; } @@ -266,21 +293,18 @@ struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section //tfe2a uses SSLv23_method, it was been deprecated and replaced with the TLS_method() in openssl 1.1.0. mgr->sslmethod = TLS_method; - MESA_load_profile_int_def(ini_profile, section, "ssl_compression", & (mgr->sslcomp), 1); - MESA_load_profile_int_def(ini_profile, section, "no_ssl2", & (mgr->no_ssl2), 1); - MESA_load_profile_int_def(ini_profile, section, "no_ssl3", & (mgr->no_ssl3), 1); - MESA_load_profile_int_def(ini_profile, section, "no_tls10", & (mgr->no_tls10), 1); - MESA_load_profile_int_def(ini_profile, section, "no_tls11", & (mgr->no_tls11), 0); - MESA_load_profile_int_def(ini_profile, section, "no_tls12", & (mgr->no_tls12), 0); - MESA_load_profile_int_def(ini_profile, section, "session_cache_slot_num", & (mgr->cache_slot_num), + MESA_load_profile_int_def(ini_profile, section, "ssl_compression", &(mgr->sslcomp), 1); + MESA_load_profile_int_def(ini_profile, section, "no_ssl2", &(mgr->no_ssl2), 1); + MESA_load_profile_int_def(ini_profile, section, "no_ssl3", &(mgr->no_ssl3), 1); + MESA_load_profile_int_def(ini_profile, section, "no_tls10", &(mgr->no_tls10), 1); + MESA_load_profile_int_def(ini_profile, section, "no_tls11", &(mgr->no_tls11), 0); + MESA_load_profile_int_def(ini_profile, section, "no_tls12", &(mgr->no_tls12), 0); + MESA_load_profile_int_def(ini_profile, section, "session_cache_slot_num", &(mgr->cache_slot_num), 4 * 1024 * 1024); - MESA_load_profile_int_def(ini_profile, section, "session_cache_slot_num", & (mgr->sess_expire_seconds), 30 * 60); + MESA_load_profile_int_def(ini_profile, section, "session_cache_slot_num", &(mgr->sess_expire_seconds), 30 * 60); mgr->up_sess_cache = ssl_sess_cache_create(mgr->cache_slot_num, mgr->sess_expire_seconds, CONN_DIR_UPSTREAM); mgr->down_sess_cache = ssl_sess_cache_create(mgr->cache_slot_num, mgr->sess_expire_seconds, CONN_DIR_DOWNSTREAM); - - - mgr->keeper_of_keys = key_keeper_init(ini_profile, section, logger); if (mgr->keeper_of_keys == NULL) @@ -288,142 +312,130 @@ struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section TFE_LOG_ERROR(logger, "Certificate Manager initiate failed."); goto error_out; } - mgr->trust_CA_store=X509_STORE_new(); - if(mgr->trust_CA_store==NULL) - { - TFE_LOG_ERROR(logger,"Failed at creating X509_STORE"); - goto error_out; - } - ret=X509_STORE_set_default_paths(mgr->trust_CA_store); - if(ret==0) - { - TFE_LOG_ERROR(logger,"Failed at setting default paths for X509_STORE"); - goto error_out; - } - MESA_load_profile_string_def(ini_profile, section, "trust_CA_file", mgr->trust_CA_file, sizeof(mgr->trust_CA_file), ""); - MESA_load_profile_string_def(ini_profile, section, "trust_CA_dir", mgr->trust_CA_dir, sizeof(mgr->trust_CA_dir), ""); - ret=X509_STORE_load_locations(mgr->trust_CA_store,strlen(mgr->trust_CA_file)>0? mgr->trust_CA_file:NULL - ,strlen(mgr->trust_CA_dir)>0? mgr->trust_CA_dir:NULL); - if(ret==0) + mgr->trust_CA_store = X509_STORE_new(); + if (mgr->trust_CA_store == NULL) { - TFE_LOG_ERROR(logger,"Failed at setting load locations for X509_STORE"); + TFE_LOG_ERROR(logger, "Failed at creating X509_STORE"); goto error_out; } + + ret = X509_STORE_set_default_paths(mgr->trust_CA_store); + if (ret == 0) + { + TFE_LOG_ERROR(logger, "Failed at setting default paths for X509_STORE"); + goto error_out; + } + + MESA_load_profile_string_def(ini_profile, section, "trust_CA_file", mgr->trust_CA_file, sizeof(mgr->trust_CA_file), + ""); + MESA_load_profile_string_def(ini_profile, section, "trust_CA_dir", mgr->trust_CA_dir, sizeof(mgr->trust_CA_dir), + ""); + + ret = X509_STORE_load_locations(mgr->trust_CA_store, strlen(mgr->trust_CA_file) > 0 ? mgr->trust_CA_file : NULL, + strlen(mgr->trust_CA_dir) > 0 ? mgr->trust_CA_dir : NULL); + + if (ret == 0) + { + TFE_LOG_ERROR(logger, "Failed at setting load locations for X509_STORE"); + goto error_out; + } + memcpy(mgr->ssl_session_context, "mesa-tfe", sizeof(mgr->ssl_session_context)); return mgr; error_out: - ssl_manager_destroy(mgr); return NULL; } -int ssl_conn_verify_cert(X509_STORE *store, const SSL * ssl) + +int ssl_conn_verify_cert(X509_STORE * store, const SSL * ssl) { - int ret=0; - STACK_OF(X509) * cert_chain = SSL_get_peer_cert_chain(ssl); - if(cert_chain==NULL) + int ret = 0; + STACK_OF(X509) * cert_chain = SSL_get_peer_cert_chain(ssl); + if (cert_chain == NULL) { // The peer certificate chain is not necessarily available after reusing a session, in which case a NULL pointer is returned. return 1; } - X509_STORE_CTX *ctx=X509_STORE_CTX_new(); + X509_STORE_CTX * ctx = X509_STORE_CTX_new(); X509 * cert = sk_X509_value(cert_chain, 0); - ret=X509_STORE_CTX_init(ctx, store, cert, cert_chain); - assert(ret==1); - + ret = X509_STORE_CTX_init(ctx, store, cert, cert_chain); + assert(ret == 1); + //If a complete chain can be built and validated this function returns 1, otherwise it return zero or negtive code. - ret=X509_verify_cert(ctx); + ret = X509_verify_cert(ctx); X509_STORE_CTX_free(ctx); - return (ret==1); + return (ret == 1); } - -void peek_client_hello_ctx_free(void * ctx) +void peek_client_hello_ctx_free(struct peek_client_hello_ctx * _ctx) { - struct peek_client_hello_ctx * _ctx = (struct peek_client_hello_ctx *)ctx; event_free(_ctx->ev); - _ctx->ev = NULL; + _ctx->ev = NULL; free(_ctx->chello.sni); - _ctx->chello.sni = NULL; + _ctx->chello.sni = NULL; free(_ctx->chello.cipher_suites); _ctx->chello.cipher_suites = NULL; free(_ctx); - return; } - -struct ssl_chello * ssl_peek_result_release_chello(future_result_t * result) +void peek_client_hello_ctx_free(struct promise * p) { - struct ssl_chello * p = (struct ssl_chello *)result, *copy = NULL; - copy = ALLOC(struct ssl_chello, 1); - - if (p != NULL) { - copy->sni = tfe_strdup(p->sni); - copy->cipher_suites = tfe_strdup(p->cipher_suites); - copy->version = p->version; - } - - return copy; -} void ssl_free_chello(struct ssl_chello * p) -{ - if (p == NULL) { - return; - } - - free(p->sni); - p->sni = NULL; - free(p->cipher_suites); - p->cipher_suites = NULL; - free(p); - return; + struct peek_client_hello_ctx * _ctx = (struct peek_client_hello_ctx *) promise_dettach_ctx(p); + return peek_client_hello_ctx_free(_ctx); } - static void peek_client_hello_cb(evutil_socket_t fd, short what, void * arg) { - struct promise * promise = (struct promise *)arg; + struct promise * promise = (struct promise *) arg; //use promise_get_ctx instead of promise_dettach_ctx for try more times. - struct peek_client_hello_ctx * ctx = (struct peek_client_hello_ctx *)promise_get_ctx(promise); - const char * reason_too_many_retries = "too many tries"; - const char * reason_see_no_client_hello = "see no client hello"; - const char * reason = NULL; - char * sni = NULL; - unsigned char buf[1024]; - ssize_t n = 0; + struct peek_client_hello_ctx * ctx = (struct peek_client_hello_ctx *) promise_get_ctx(promise); + const char * reason_too_many_retries = "too many tries"; + const char * reason_see_no_client_hello = "see no client hello"; + const char * reason = NULL; + char * sni = NULL; + unsigned char buf[1024]; + ssize_t n = 0; const unsigned char * chello = NULL; - int rv = 0; + int rv = 0; - n = recv(fd, buf, sizeof(buf), MSG_PEEK); - - if (n == -1) { + n = recv(fd, buf, sizeof(buf), MSG_PEEK); + if (n == -1) + { TFE_LOG_ERROR(ctx->logger, "Error peeking on fd, aborting connection\n"); goto failed; } - if (n == 0) { + if (n == 0) + { goto failed; } //todo: parse version and cipher suites. //or we should use sni proxy instead? https://github.com/dlundquist/sniproxy/blob/master/src/tls.c - rv = ssl_tls_clienthello_parse(buf, n, 0, &chello, & (ctx->chello.sni)); + rv = ssl_tls_clienthello_parse(buf, n, 0, &chello, &(ctx->chello.sni)); - if (rv == 0) { + if (rv == 0) + { promise_dettach_ctx(promise); - promise_success(promise, & (ctx->chello)); + promise_success(promise, &(ctx->chello)); peek_client_hello_ctx_free(ctx); } - else { - if (!chello) { - TFE_LOG_ERROR(ctx->logger, "Peeking did not yield a (truncated) ClientHello message, aborting connection\n"); - reason = "see no client hello"; + else + { + if (!chello) + { + TFE_LOG_ERROR(ctx->logger, + "Peeking did not yield a (truncated) ClientHello message, aborting connection\n"); + reason = "see no client hello"; goto failed; } - if (ctx->sni_peek_retries++ > MAX_NET_RETRIES) { + if (ctx->sni_peek_retries++ > MAX_NET_RETRIES) + { TFE_LOG_ERROR(ctx->logger, "Peek failed due to too many retries\n"); - reason = "too many peek retries"; + reason = "too many peek retries"; goto failed; } @@ -435,13 +447,10 @@ static void peek_client_hello_cb(evutil_socket_t fd, short what, void * arg) * Because we only peeked at the pending bytes and * never actually read them, fd is still ready for * reading now. We use 25 * 0.2 s = 5 s timeout. */ - struct timeval retry_delay = { - 0, 100 - }; - + struct timeval retry_delay = {0, 100}; event_free(ctx->ev); - ctx->ev = event_new(ctx->evbase, fd, 0, peek_client_hello_cb, promise); + ctx->ev = event_new(ctx->evbase, fd, 0, peek_client_hello_cb, promise); assert(ctx->ev != NULL); event_add(ctx->ev, &retry_delay); } @@ -455,121 +464,119 @@ failed: return; } - -static void ssl_async_peek_client_hello(struct future * future, evutil_socket_t fd, struct event_base * evbase, +static void ssl_async_peek_client_hello(struct future * future, evutil_socket_t fd, struct event_base * evbase, void * logger) { struct event * ev = NULL; struct promise * p = future_to_promise(future); struct peek_client_hello_ctx * ctx = ALLOC(struct peek_client_hello_ctx, 1); - ctx->ev = event_new(evbase, fd, EV_READ, peek_client_hello_cb, p); - ctx->logger = logger; - event_add(evbase, NULL); - promise_set_ctx(p, ctx, peek_client_hello_ctx_free); + ctx->ev = event_new(evbase, fd, EV_READ, peek_client_hello_cb, p); + ctx->logger = logger; + event_add(ev, NULL); + promise_set_ctx(p, (void *) ctx, peek_client_hello_ctx_free); return; } - /* * Create new SSL context for outgoing connections to the original destination. * If hostname sni is provided, use it for Server Name Indication. */ -static SSL * upstream_ssl_create(struct ssl_mgr * mgr, const struct ssl_chello * chello, evutil_socket_t fd) +static SSL * upstream_ssl_create(struct ssl_mgr * mgr, const struct ssl_chello * chello, evutil_socket_t fd) { - SSL_CTX * sslctx = NULL; - SSL * ssl = NULL; - SSL_SESSION * sess = NULL; - - - sslctx = SSL_CTX_new(mgr->sslmethod()); + SSL_CTX * sslctx = NULL; + SSL * ssl = NULL; + SSL_SESSION * sess = NULL; + sslctx = SSL_CTX_new(mgr->sslmethod()); sslctx_set_opts(sslctx, mgr); -#if OPENSSL_VERSION_NUMBER >= 0x10100000L - - if (mgr->sslversion) { + if (mgr->sslversion) + { if (SSL_CTX_set_min_proto_version(sslctx, chello->version) == 0 || - SSL_CTX_set_max_proto_version(sslctx, chello->version) == 0) { + SSL_CTX_set_max_proto_version(sslctx, chello->version) == 0) + { SSL_CTX_free(sslctx); return NULL; } } -#endif /* OPENSSL_VERSION_NUMBER >= 0x10100000L */ - SSL_CTX_set_verify(sslctx, SSL_VERIFY_NONE, NULL); + ssl = SSL_new(sslctx); + SSL_CTX_free(sslctx); /* SSL_new() increments refcount */ - ssl = SSL_new(sslctx); - SSL_CTX_free(sslctx); /* SSL_new() increments refcount */ - - if (!ssl) { + if (!ssl) + { return NULL; } -#ifndef OPENSSL_NO_TLSEXT - - if (chello->sni) { - SSL_set_tlsext_host_name(ssl, sni); + if (chello->sni) + { + SSL_set_tlsext_host_name(ssl, chello->sni); } -#endif /* !OPENSSL_NO_TLSEXT */ - -#ifdef SSL_MODE_RELEASE_BUFFERS - /* lower memory footprint for idle connections */ SSL_set_mode(ssl, SSL_get_mode(ssl) | SSL_MODE_RELEASE_BUFFERS); -#endif /* SSL_MODE_RELEASE_BUFFERS */ - struct sockaddr addr; - socklen_t addrlen; - int ret=0; - ret = getpeername(fd , &addr, &addrlen); - assert(ret==0); + struct sockaddr_storage addr; + socklen_t addrlen = sizeof(struct sockaddr_storage); + + int ret = getpeername(fd, (struct sockaddr *)(&addr), &addrlen); + assert(ret == 0); + /* session resuming based on remote endpoint address and port */ - sess = up_session_get(mgr->up_sess_cache, &addr, addrlen, chello->sni); /* new sess insert */ - if (sess) + sess = up_session_get(mgr->up_sess_cache, (struct sockaddr *)&addr, addrlen, chello->sni); /* new sess insert */ + if (sess) { - SSL_set_session(ssl, sess); /* increments sess refcount */ + SSL_set_session(ssl, sess); /* increments sess refcount */ SSL_SESSION_free(sess); } + return ssl; } - void ssl_connect_origin_ctx_free(struct ssl_connect_origin_ctx * ctx) { - if (ctx->s_stream != NULL) { + if (ctx->s_stream != NULL) + { ssl_stream_free(ctx->s_stream); } - if (ctx->bev != NULL) { + if (ctx->bev != NULL) + { bufferevent_free(ctx->bev); - ctx->bev = NULL; + ctx->bev = NULL; } - if (ctx->f_peek_chello != NULL) { + if (ctx->f_peek_chello != NULL) + { future_destroy(ctx->f_peek_chello); - ctx->f_peek_chello = NULL; + ctx->f_peek_chello = NULL; } free(ctx); return; } +void ssl_connect_origin_ctx_free(struct promise * p) +{ + struct ssl_connect_origin_ctx * ctx = (struct ssl_connect_origin_ctx *) promise_dettach_ctx(p); + ssl_connect_origin_ctx_free(ctx); +} -struct ssl_stream * ssl_conn_origin_result_release_stream(future_result_t * result) { - struct ssl_connect_origin_ctx * ctx = (struct ssl_connect_origin_ctx *)result; +struct ssl_stream * ssl_conn_origin_result_release_stream(future_result_t * result) +{ + struct ssl_connect_origin_ctx * ctx = (struct ssl_connect_origin_ctx *) result; struct ssl_stream * ret = ctx->s_stream; - ctx->s_stream = NULL; //giveup ownership + ctx->s_stream = NULL; //giveup ownership return ret; } - -struct bufferevent * ssl_conn_origin_result_release_bev(future_result_t * result) { - struct ssl_connect_origin_ctx * ctx = (struct ssl_connect_origin_ctx *)result; +struct bufferevent * ssl_conn_origin_result_release_bev(future_result_t * result) +{ + struct ssl_connect_origin_ctx * ctx = (struct ssl_connect_origin_ctx *) result; struct bufferevent * ret = ctx->bev; - ctx->bev = NULL; //giveup ownership + ctx->bev = NULL; //giveup ownership return ret; } @@ -580,86 +587,88 @@ struct bufferevent * ssl_conn_origin_result_release_bev(future_result_t * result */ static void ssl_connect_origin_eventcb(struct bufferevent * bev, short events, void * arg) { - struct promise * promise = (struct promise *)arg; - struct ssl_connect_origin_ctx * ctx = (struct ssl_connect_origin_ctx *)promise_dettach_ctx(promise); + struct promise * promise = (struct promise *) arg; + struct ssl_connect_origin_ctx * ctx = (struct ssl_connect_origin_ctx *) promise_dettach_ctx(promise); struct ssl_stream * s_stream = ctx->s_stream; - SSL_SESSION * ssl_sess = NULL; + SSL_SESSION * ssl_sess = NULL; - if (events & BEV_EVENT_ERROR) { + if (events & BEV_EVENT_ERROR) + { promise_failed(promise, FUTURE_ERROR_EXCEPTION, "connect to orignal server failed."); } - else { - if (events & BEV_EVENT_CONNECTED) { + else + { + if (events & BEV_EVENT_CONNECTED) + { bufferevent_setcb(ctx->bev, NULL, NULL, NULL, NULL); //leave a clean bev for on_success - ssl_sess = SSL_get0_session(s_stream->ssl); - up_session_set(s_stream->mgr->up_sess_cache, & (ctx->addr), ctx->addrlen, s_stream->client_hello.sni, + ssl_sess = SSL_get0_session(s_stream->ssl); + up_session_set(s_stream->mgr->up_sess_cache, &(ctx->addr), ctx->addrlen, s_stream->client_hello->sni, ssl_sess); promise_success(promise, ctx); } - else { + else + { assert(0); } } ssl_connect_origin_ctx_free(ctx); - return -} - - -static void peek_chello_on_succ(future_result_t * result, void * user) -{ - struct promise* p= (struct promise*)user; - struct ssl_connect_origin_ctx * ctx = (struct ssl_connect_origin_ctx *) promise_dettach_ctx(p); - - struct ssl_chello * chello = ssl_peek_result_release_chello(result);//chello has been saved in ssl_stream. - ctx->s_stream = ssl_stream_new(ctx->mgr, ctx->fd_upstream, CONN_DIR_UPSTREAM, chello, NULL); - ctx->bev = bufferevent_openssl_socket_new(ctx->evbase, ctx->fd_upstream, ctx->s_stream, - BUFFEREVENT_SSL_CONNECTING, BEV_OPT_DEFER_CALLBACKS); - bufferevent_openssl_set_allow_dirty_shutdown(ctx->bev, 1); - bufferevent_setcb(ctx->bev, NULL, NULL, ssl_connect_origin_eventcb, ctx); - bufferevent_disable(ctx->bev, EV_READ | EV_WRITE); //waiting for connect event only - future_destroy(ctx->f_peek_chello); - ctx->f_peek_chello = NULL; return; } +static void peek_chello_on_succ(future_result_t * result, void * user) +{ + struct promise * p = (struct promise *) user; + struct ssl_connect_origin_ctx * ctx = (struct ssl_connect_origin_ctx *) promise_dettach_ctx(p); + + struct ssl_chello * chello = ssl_peek_result_release_chello(result);//chello has been saved in ssl_stream. + ctx->s_stream = ssl_stream_new(ctx->mgr, ctx->fd_upstream, CONN_DIR_UPSTREAM, chello, NULL); + ctx->bev = bufferevent_openssl_socket_new(ctx->evbase, ctx->fd_upstream, + ctx->s_stream->ssl, BUFFEREVENT_SSL_CONNECTING, BEV_OPT_DEFER_CALLBACKS); + + bufferevent_openssl_set_allow_dirty_shutdown(ctx->bev, 1); + bufferevent_setcb(ctx->bev, NULL, NULL, ssl_connect_origin_eventcb, ctx); + bufferevent_disable(ctx->bev, EV_READ | EV_WRITE); //waiting for connect event only + + future_destroy(ctx->f_peek_chello); + ctx->f_peek_chello = NULL; + return; +} static void peek_chello_on_fail(enum e_future_error err, const char * what, void * user) { - struct promise* p= (struct promise*)user; - struct ssl_connect_origin_ctx * ctx = (struct ssl_connect_origin_ctx *) promise_dettach_ctx(p); + struct promise * p = (struct promise *) user; + struct ssl_connect_origin_ctx * ctx = (struct ssl_connect_origin_ctx *) promise_dettach_ctx(p); promise_failed(p, FUTURE_ERROR_EXCEPTION, "upstream create failed for no client hello in downstream."); ssl_connect_origin_ctx_free(ctx); return; } - -extern void ssl_async_upstream_create(struct future * f, struct ssl_mgr * mgr, evutil_socket_t fd_upstream, +extern void ssl_async_upstream_create(struct future * f, struct ssl_mgr * mgr, evutil_socket_t fd_upstream, evutil_socket_t fd_downstream, struct event_base * evbase) { struct promise * p = future_to_promise(f); struct ssl_connect_origin_ctx * ctx = ALLOC(struct ssl_connect_origin_ctx, 1); - int ret = 0; + int ret = 0; struct sockaddr addr; - socklen_t addrlen; + socklen_t addrlen; - ret = getpeername(fd_downstream, & (ctx->addr), & (ctx->addrlen)); + ret = getpeername(fd_downstream, &(ctx->addr), &(ctx->addrlen)); assert(ret == 0); - ctx->fd_downstream = fd_downstream; - ctx->fd_upstream = fd_upstream; - ctx->evbase = evbase; - ctx->mgr = mgr; + ctx->fd_downstream = fd_downstream; + ctx->fd_upstream = fd_upstream; + ctx->evbase = evbase; + ctx->mgr = mgr; promise_set_ctx(p, ctx, ssl_connect_origin_ctx_free); - - ctx->f_peek_chello = future_create(peek_chello_on_succ, peek_chello_on_fail, p); + + ctx->f_peek_chello = future_create(peek_chello_on_succ, peek_chello_on_fail, p); ssl_async_peek_client_hello(ctx->f_peek_chello, fd_downstream, evbase, mgr->logger); return; } - /* * Called by OpenSSL when a new src SSL session is created. * OpenSSL increments the refcount before calling the callback and will @@ -669,63 +678,58 @@ extern void ssl_async_upstream_create(struct future * f, struct ssl_mgr * mgr, e */ static int ossl_sessnew_cb(SSL * ssl, SSL_SESSION * sess) { - struct ssl_mgr * mgr = (struct ssl_mgr *) - SSL_get_ex_data(ssl, SSL_EX_DATA_IDX_SSLMGR, mgr); + struct ssl_mgr * mgr = (struct ssl_mgr *) SSL_get_ex_data(ssl, SSL_EX_DATA_IDX_SSLMGR); #ifdef HAVE_SSLV2 - /* Session resumption seems to fail for SSLv2 with protocol - * parsing errors, so we disable caching for SSLv2. */ - if (SSL_version(ssl) == SSL2_VERSION) { - return 0; - } + /* Session resumption seems to fail for SSLv2 with protocol + * parsing errors, so we disable caching for SSLv2. */ + if (SSL_version(ssl) == SSL2_VERSION) { + return 0; + } #endif /* HAVE_SSLV2 */ - if (sess) { + if (sess) + { down_session_set(mgr->down_sess_cache, sess); } return 0; } - /* * Called by OpenSSL when a src SSL session should be removed. * OpenSSL calls SSL_SESSION_free() after calling the callback; * we do not need to free the reference here. */ -static void ossl_sessremove_cb(UNUSED SSL_CTX * sslctx, SSL_SESSION * sess) +static void ossl_sessremove_cb(SSL_CTX * sslctx, SSL_SESSION * sess) { - struct ssl_mgr * mgr = (struct ssl_mgr *) - SSL_get_ex_data(ssl, SSL_EX_DATA_IDX_SSLMGR, mgr); + struct ssl_mgr * mgr = (struct ssl_mgr *)SSL_CTX_get_ex_data(sslctx, SSL_EX_DATA_IDX_SSLMGR); assert(mgr != NULL); - if (sess) { - down_session_del(mgr->down_sess_cache, sess) ; + if (sess) + { + down_session_del(mgr->down_sess_cache, sess); } return; } - /* * Called by OpenSSL when a src SSL session is requested by the client. OPENSSL_VERSION_NUMBER >= 0x10100000L required. */ -static SSL_SESSION * ossl_sessget_cb(UNUSED SSL * ssl, const unsigned char * id, int idlen, int * copy) +static SSL_SESSION * ossl_sessget_cb(SSL * ssl, const unsigned char * id, int idlen, int * copy) { - struct ssl_mgr * mgr = (struct ssl_mgr *) - SSL_get_ex_data(ssl, SSL_EX_DATA_IDX_SSLMGR, mgr); + struct ssl_mgr * mgr = (struct ssl_mgr *)SSL_get_ex_data(ssl, SSL_EX_DATA_IDX_SSLMGR); SSL_SESSION * sess; - *copy = 0; /* SSL should not increment reference count of session */ - sess = (SSL_SESSION *) - down_session_get(mgr->down_sess_cache, id, idlen); + *copy = 0; /* SSL should not increment reference count of session */ + sess = (SSL_SESSION *)down_session_get(mgr->down_sess_cache, id, idlen); return sess; } - /* * Set SSL_CTX options that are the same for incoming and outgoing SSL_CTX. */ @@ -756,7 +760,7 @@ static void sslctx_set_opts(SSL_CTX * sslctx, struct ssl_mgr * mgr) if (mgr->no_ssl2) { #endif /* HAVE_SSLV2 */ - SSL_CTX_set_options(sslctx, SSL_OP_NO_SSLv2); + SSL_CTX_set_options(sslctx, SSL_OP_NO_SSLv2); #ifdef HAVE_SSLV2 } @@ -767,15 +771,16 @@ static void sslctx_set_opts(SSL_CTX * sslctx, struct ssl_mgr * mgr) #ifdef HAVE_SSLV3 - if (mgr->no_ssl3) { - SSL_CTX_set_options(sslctx, SSL_OP_NO_SSLv3); - } + if (mgr->no_ssl3) { + SSL_CTX_set_options(sslctx, SSL_OP_NO_SSLv3); + } #endif /* HAVE_SSLV3 */ #ifdef HAVE_TLSV10 - if (mgr->no_tls10) { + if (mgr->no_tls10) + { SSL_CTX_set_options(sslctx, SSL_OP_NO_TLSv1); } @@ -783,7 +788,8 @@ static void sslctx_set_opts(SSL_CTX * sslctx, struct ssl_mgr * mgr) #ifdef HAVE_TLSV11 - if (mgr->no_tls11) { + if (mgr->no_tls11) + { SSL_CTX_set_options(sslctx, SSL_OP_NO_TLSv1_1); } @@ -791,7 +797,8 @@ static void sslctx_set_opts(SSL_CTX * sslctx, struct ssl_mgr * mgr) #ifdef HAVE_TLSV12 - if (mgr->no_tls12) { + if (mgr->no_tls12) + { SSL_CTX_set_options(sslctx, SSL_OP_NO_TLSv1_2); } @@ -799,7 +806,8 @@ static void sslctx_set_opts(SSL_CTX * sslctx, struct ssl_mgr * mgr) #ifdef SSL_OP_NO_COMPRESSION - if (!mgr->sslcomp) { + if (!mgr->sslcomp) + { SSL_CTX_set_options(sslctx, SSL_OP_NO_COMPRESSION); } @@ -808,24 +816,25 @@ static void sslctx_set_opts(SSL_CTX * sslctx, struct ssl_mgr * mgr) SSL_CTX_set_cipher_list(sslctx, mgr->default_ciphers); } - /* * Create and set up a new SSL_CTX instance for terminating SSL. * Set up all the necessary callbacks, the keyring, the keyring chain and key. */ -static SSL* downstream_ssl_create(struct ssl_mgr * mgr, struct keyring * crt) +static SSL * downstream_ssl_create(struct ssl_mgr * mgr, struct keyring * crt) { - SSL_CTX * sslctx = SSL_CTX_new(mgr->sslmethod()); - if (!sslctx) - return NULL; - SSL* ssl=NULL; - int ret=0; + SSL_CTX * sslctx = SSL_CTX_new(mgr->sslmethod()); + if (!sslctx) return NULL; + + SSL * ssl = NULL; + int ret = 0; sslctx_set_opts(sslctx, mgr); //TFE's OPENSSL_VERSION_NUMBER >= 0x10100000L - if (mgr->sslversion) { + if (mgr->sslversion) + { if (SSL_CTX_set_min_proto_version(sslctx, mgr->sslversion) == 0 || - SSL_CTX_set_max_proto_version(sslctx, mgr->sslversion) == 0) { + SSL_CTX_set_max_proto_version(sslctx, mgr->sslversion) == 0) + { SSL_CTX_free(sslctx); return NULL; } @@ -835,50 +844,48 @@ static SSL* downstream_ssl_create(struct ssl_mgr * mgr, struct keyring * crt) SSL_CTX_sess_set_remove_cb(sslctx, ossl_sessremove_cb); SSL_CTX_sess_set_get_cb(sslctx, ossl_sessget_cb); SSL_CTX_set_session_cache_mode(sslctx, SSL_SESS_CACHE_SERVER | SSL_SESS_CACHE_NO_INTERNAL); - SSL_CTX_set_session_id_context(sslctx, (void *) (mgr->ssl_session_context), - sizeof(mgr->ssl_session_context)); + SSL_CTX_set_session_id_context(sslctx, (const unsigned char *)mgr->ssl_session_context, sizeof(mgr->ssl_session_context)); -#ifndef OPENSSL_NO_DH - - if (mgr->dh) { + if (mgr->dh) + { SSL_CTX_set_tmp_dh(sslctx, mgr->dh); } - else { + else + { SSL_CTX_set_tmp_dh_callback(sslctx, ssl_tmp_dh_callback); } -#endif /* !OPENSSL_NO_DH */ - -#ifndef OPENSSL_NO_ECDH - - if (mgr->ecdhcurve) { - EC_KEY * ecdh = ssl_ec_by_name(mgr->ecdhcurve); + if (mgr->ecdhcurve) + { + EC_KEY * ecdh = ssl_ec_by_name(mgr->ecdhcurve); SSL_CTX_set_tmp_ecdh(sslctx, ecdh); EC_KEY_free(ecdh); } - else { - EC_KEY * ecdh = ssl_ec_by_name(NULL); + else + { + EC_KEY * ecdh = ssl_ec_by_name(NULL); SSL_CTX_set_tmp_ecdh(sslctx, ecdh); EC_KEY_free(ecdh); } -#endif /* !OPENSSL_NO_ECDH */ - SSL_CTX_use_certificate(sslctx, crt->cert); SSL_CTX_use_PrivateKey(sslctx, crt->key); - for (int i = 0; i < sk_X509_num(crt->chain); i++) { - X509 * c = sk_X509_value(crt->chain, i); - ssl_x509_refcount_inc(c); /* next call consumes a reference */ + for (int i = 0; i < sk_X509_num(crt->chain); i++) + { + X509 * c = sk_X509_value(crt->chain, i); + ssl_x509_refcount_inc(c); /* next call consumes a reference */ SSL_CTX_add_extra_chain_cert(sslctx, c); } - ssl=SSL_new(sslctx); - SSL_CTX_free(sslctx); // SSL_new() increments refcount - sslctx=NULL; + + ssl = SSL_new(sslctx); + SSL_CTX_free(sslctx); // SSL_new() increments refcount + sslctx = NULL; + ret = SSL_set_ex_data(ssl, SSL_EX_DATA_IDX_SSLMGR, mgr); assert(ret == 1); - - if (mgr->SSL_MODE_RELEASE_BUFFERS == 1) + + if (mgr->ssl_mode_release_buffers == 1) { /* lower memory footprint for idle connections */ SSL_set_mode(ssl, SSL_get_mode(ssl) | SSL_MODE_RELEASE_BUFFERS); @@ -887,110 +894,115 @@ static SSL* downstream_ssl_create(struct ssl_mgr * mgr, struct keyring * crt) return ssl; } - void query_cert_ctx_free(struct ask_keyring_ctx * ctx) { X509_free(ctx->origin_crt); - if (ctx->f_query_cert != NULL) { + if (ctx->f_query_cert != NULL) + { future_destroy(ctx->f_query_cert); - ctx->f_query_cert = NULL; + ctx->f_query_cert = NULL; } - if(ctx->bev_down!=NULL) + + if (ctx->bev_down != NULL) { bufferevent_free(ctx->bev_down); } - if(ctx->downstream!=NULL) + + if (ctx->downstream != NULL) { ssl_stream_free(ctx->downstream); } + return; } -struct ssl_stream* ssl_downstream_create_result_release_stream(future_result_t* result) +void query_cert_ctx_free(struct promise * p) { - struct ask_keyring_ctx * ctx = (struct ask_keyring_ctx *)result; - struct ssl_stream* ret=ctx->downstream; - ctx->downstream=NULL; + struct ask_keyring_ctx * ctx = (struct ask_keyring_ctx *)promise_dettach_ctx(p); + query_cert_ctx_free(ctx); +} + +struct ssl_stream * ssl_downstream_create_result_release_stream(future_result_t * result) +{ + struct ask_keyring_ctx * ctx = (struct ask_keyring_ctx *) result; + struct ssl_stream * ret = ctx->downstream; + ctx->downstream = NULL; return ret; } -struct bufferevent* ssl_downstream_create_result_release_bev(future_result_t* result) +struct bufferevent * ssl_downstream_create_result_release_bev(future_result_t * result) { - struct ask_keyring_ctx * ctx = (struct ask_keyring_ctx *)result; - struct bufferevent* ret=ctx->bev_down; - ctx->bev_down=NULL; + struct ask_keyring_ctx * ctx = (struct ask_keyring_ctx *) result; + struct bufferevent * ret = ctx->bev_down; + ctx->bev_down = NULL; return ret; } void ask_keyring_on_succ(void * result, void * user) { - struct promise* p=(struct promise*)user; - struct ask_keyring_ctx * ctx = (struct ask_keyring_ctx *)promise_dettach_ctx(promise); - - struct ssl_stream * downstream = NULL; - struct keyring* crt=NULL; - - struct ssl_mgr* mgr=ctx->ssl_mgr; - - future_destroy(ctx->f_query_cert); - ctx->f_query_cert=NULL; - - crt=key_keeper_release_cert(result); + struct promise * p = (struct promise *) user; + struct ask_keyring_ctx * ctx = (struct ask_keyring_ctx *) promise_dettach_ctx(p); - ctx->downstream = ssl_stream_new(mgr, ctx->fd_downstream, CONN_DIR_DOWNSTREAM, NULL, crt); - - ctx->bev_down = bufferevent_openssl_socket_new(ctx->evbase, ctx->fd_downstream, ctx->downstream->ssl, + struct ssl_stream * downstream = NULL; + struct keyring * crt = NULL; + + struct ssl_mgr * mgr = ctx->ssl_mgr; + + future_destroy(ctx->f_query_cert); + ctx->f_query_cert = NULL; + + crt = key_keeper_release_cert(result); + ctx->downstream = ssl_stream_new(mgr, ctx->fd_downstream, CONN_DIR_DOWNSTREAM, NULL, crt); + ctx->bev_down = bufferevent_openssl_socket_new(ctx->evbase, ctx->fd_downstream, ctx->downstream->ssl, BUFFEREVENT_SSL_CONNECTING, BEV_OPT_DEFER_CALLBACKS); bufferevent_openssl_set_allow_dirty_shutdown(ctx->bev_down, 1); promise_success(p, ctx); key_keeper_free_keyring(crt); query_cert_ctx_free(ctx); - return; } - void ask_keyring_on_fail(enum e_future_error error, const char * what, void * user) { - struct promise* p=(struct promise*)user; - struct ask_keyring_ctx * ctx = (struct ask_keyring_ctx *)promise_dettach_ctx(promise); + struct promise * p = (struct promise *) user; + struct ask_keyring_ctx * ctx = (struct ask_keyring_ctx *) promise_dettach_ctx(p); promise_failed(p, error, what); query_cert_ctx_free(ctx); return; } - /* * Create a SSL stream for the incoming connection, based on the upstream. */ -void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct ssl_stream * upstream, evutil_socket_t fd_downstream, int keyring_id, struct event_base * evbase) +void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct ssl_stream * upstream, + evutil_socket_t fd_downstream, int keyring_id, struct event_base * evbase) { assert(upstream->dir == CONN_DIR_UPSTREAM); struct ask_keyring_ctx * ctx = ALLOC(struct ask_keyring_ctx, 1); - ctx->keyring_id = keyring_id; - ctx->ssl_mgr = mgr; - ctx->fd_downstream = fd_downstream; - ctx->evbase = evbase; + ctx->keyring_id = keyring_id; + ctx->ssl_mgr = mgr; + ctx->fd_downstream = fd_downstream; + ctx->evbase = evbase; - if(upstream!=NULL) + if (upstream != NULL) { - ctx->origin_ssl = upstream; - ctx->origin_crt = SSL_get_peer_certificate(upstream->ssl); + ctx->origin_ssl = upstream; + ctx->origin_crt = SSL_get_peer_certificate(upstream->ssl); } - - struct promise* p = future_to_promise(f); + + struct promise * p = future_to_promise(f); promise_set_ctx(p, ctx, query_cert_ctx_free); - ctx->is_origin_crt_vaild=ssl_conn_verify_cert(mgr->trust_CA_store, upstream->ssl); - - ctx->f_query_cert = future_create(ask_keyring_on_succ, ask_keyring_on_fail, p); + ctx->is_origin_crt_vaild = ssl_conn_verify_cert(mgr->trust_CA_store, upstream->ssl); + + ctx->f_query_cert = future_create(ask_keyring_on_succ, ask_keyring_on_fail, p); //todo add a is_valid_cert flag to keyring manager query API. - key_keeper_async_ask(ctx->f_query_cert, mgr->keeper_of_keys, keyring_id, ctx->origin_crt, ctx->is_origin_crt_vaild, evbase); + key_keeper_async_ask(ctx->f_query_cert, mgr->keeper_of_keys, keyring_id, ctx->origin_crt, ctx->is_origin_crt_vaild, + evbase); return; } - /* * Cleanly shut down an SSL socket. Libevent currently has no support for * cleanly shutting down an SSL socket so we work around that by using a @@ -998,42 +1010,41 @@ void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct * with the older SSL_shutdown() semantics, not exposing WANT_READ/WRITE * may or may not work. */ -static struct ssl_shutdown_ctx * ssl_shutdown_ctx_new(struct ssl_stream * s_stream, struct event_base * evbase) { +static struct ssl_shutdown_ctx * ssl_shutdown_ctx_new(struct ssl_stream * s_stream, struct event_base * evbase) +{ struct ssl_shutdown_ctx * ctx = ALLOC(struct ssl_shutdown_ctx, 1); ctx->evbase = evbase; ctx->s_stream = s_stream; ctx->ev = NULL; ctx->retries = 0; return ctx; -} +} static void ssl_shutdown_ctx_free(struct ssl_shutdown_ctx * ctx) { free(ctx); } - /* * The shutdown socket event handler. This is either * scheduled as a timeout-only event, or as a fd read or * fd write event, depending on whether SSL_shutdown() * indicates it needs read or write on the socket. */ -static void pxy_ssl_shutdown_cb(evutil_socket_t fd, UNUSED short what, void * arg) +static void pxy_ssl_shutdown_cb(evutil_socket_t fd, short what, void * arg) { - struct ssl_shutdown_ctx * ctx = (struct ssl_shutdown_ctx *) arg; + struct ssl_shutdown_ctx * ctx = (struct ssl_shutdown_ctx *) arg; - struct timeval retry_delay = { - 0, 100 - }; + struct timeval retry_delay = {0, 100 }; - void * logger = ctx->s_stream->mgr->logger; - short want = 0; - int rv = 0, sslerr = 0; + void * logger = ctx->s_stream->mgr->logger; + short want = 0; + int rv = 0, sslerr = 0; - if (ctx->ev) { + if (ctx->ev) + { event_free(ctx->ev); - ctx->ev = NULL; + ctx->ev = NULL; } /* @@ -1045,31 +1056,27 @@ static void pxy_ssl_shutdown_cb(evutil_socket_t fd, UNUSED short what, void * ar * This is a good collection of recent and relevant documents: * http://bugs.python.org/issue8108 */ - rv = SSL_shutdown(ctx->s_stream->ssl); + rv = SSL_shutdown(ctx->s_stream->ssl); if (rv == 1) goto complete; - if (rv != -1) { + if (rv != -1) + { goto retry; } switch ((sslerr = SSL_get_error(ctx->s_stream->ssl, rv))) { - case SSL_ERROR_WANT_READ: - want = EV_READ; + case SSL_ERROR_WANT_READ: want = EV_READ; goto retry; - case SSL_ERROR_WANT_WRITE: - want = EV_WRITE; - goto retry; - case SSL_ERROR_ZERO_RETURN: + case SSL_ERROR_WANT_WRITE: want = EV_WRITE; goto retry; + case SSL_ERROR_ZERO_RETURN: goto retry; case SSL_ERROR_SYSCALL: - case SSL_ERROR_SSL: - goto complete; - default: - TFE_LOG_ERROR(logger, "Unhandled SSL_shutdown() " - "error %i. Closing fd.\n", sslerr); + case SSL_ERROR_SSL: goto complete; + default: TFE_LOG_ERROR(logger, "Unhandled SSL_shutdown() " + "error %i. Closing fd.\n", sslerr); goto complete; } @@ -1077,31 +1084,30 @@ static void pxy_ssl_shutdown_cb(evutil_socket_t fd, UNUSED short what, void * ar retry: - if (ctx->retries++ >= MAX_NET_RETRIES) { + if (ctx->retries++ >= MAX_NET_RETRIES) + { TFE_LOG_ERROR(logger, "Failed to shutdown SSL connection cleanly: " - "Max retries reached. Closing fd.\n"); + "Max retries reached. Closing fd.\n"); goto complete; } - ctx->ev = event_new(ctx->evbase, fd, want, pxy_ssl_shutdown_cb, ctx); + ctx->ev = event_new(ctx->evbase, fd, want, pxy_ssl_shutdown_cb, ctx); - if (ctx->ev) { + if (ctx->ev) + { event_add(ctx->ev, &retry_delay); return; } TFE_LOG_ERROR(logger, "Failed to shutdown SSL connection cleanly: " - "Cannot create event. Closing fd.\n"); - + "Cannot create event. Closing fd.\n"); complete: ssl_stream_free(ctx->s_stream); evutil_closesocket(fd); ssl_shutdown_ctx_free(ctx); - } - /* * Cleanly shutdown an SSL session on file descriptor fd using low-level * file descriptor readiness events on event base evbase. @@ -1111,9 +1117,6 @@ complete: void ssl_stream_free_and_close_fd(struct ssl_stream * s_stream, struct event_base * evbase, evutil_socket_t fd) { struct ssl_shutdown_ctx * sslshutctx = NULL; - sslshutctx = ssl_shutdown_ctx_new(s_stream, evbase); + sslshutctx = ssl_shutdown_ctx_new(s_stream, evbase); pxy_ssl_shutdown_cb(fd, 0, sslshutctx); - return; } - - diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp new file mode 100644 index 0000000..070d047 --- /dev/null +++ b/platform/src/tcp_stream.cpp @@ -0,0 +1,492 @@ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +#ifndef TFE_CONFIG_OUTPUT_LIMIT_DEFAULT +#define TFE_CONFIG_OUTPUT_LIMIT_DEFAULT (1024 * 1024) +#endif + +/* forward declaration of libevent callbacks */ +static void tfe_stream_readcb(struct bufferevent *, void *); +static void tfe_stream_writecb(struct bufferevent *, void *); +static void tfe_stream_eventcb(struct bufferevent *, short, void *); + +static inline struct tfe_stream_private * __TO_STREAM_PRIVATE(const struct tfe_stream * stream) +{ + return container_of(stream, struct tfe_stream_private, head); +} + +static inline struct tfe_conn_private * __THIS_CONN(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) +{ + return ((dir == CONN_DIR_UPSTREAM) ? (_stream->conn_upstream) : (_stream->conn_downstream)); +} + +static inline struct tfe_conn_private * __PEER_CONN(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) +{ + return ((dir == CONN_DIR_UPSTREAM) ? (_stream->conn_downstream) : (_stream->conn_upstream)); +} + +static inline enum tfe_conn_dir __DIR(struct tfe_stream_private * _stream, struct bufferevent * bev) +{ + return ((bev == _stream->conn_downstream->bev) ? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM); +} + +static inline bool __IS_SSL(struct tfe_stream_private * _stream) +{ + return (_stream->session_type == SESSION_PROTO_SSL); +} + +void tfe_stream_detach(const struct tfe_stream * stream) +{ + struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream); + int plug_id = _stream->calling_idx; + _stream->plug_ctx[plug_id].state = PLUG_STATE_DETACHED; + return; +} + +int tfe_stream_preempt(const struct tfe_stream * stream) +{ + struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream); + int plug_id = _stream->calling_idx; + int i = 0; + for (i = 0; i < _stream->plugin_num; i++) + { + if (_stream->plug_ctx[i].state == PLUG_STATE_PREEPTION) + { + return -1; + } + } + _stream->plug_ctx[plug_id].state = PLUG_STATE_PREEPTION; + return 0; +} + + + +struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir) +{ + struct tfe_stream_private * _stream = __TO_STREAM_PRIVATE(stream); + struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); + struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); + + if (this_conn->on_writing == 1) + { + return NULL; + } + this_conn->w_ctx.dir = dir; + this_conn->w_ctx._stream = _stream; + this_conn->on_writing = 1; + bufferevent_disable(peer_conn->bev, EV_READ); + return &(this_conn->w_ctx); +} + +int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size) +{ + struct tfe_conn_private * this_conn = __THIS_CONN(w_ctx->_stream, w_ctx->dir);; + int ret = bufferevent_write(this_conn->bev, data, size); + return ret; +} +void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) +{ + struct tfe_conn_private * this_conn = __THIS_CONN(w_ctx->_stream, w_ctx->dir); + struct tfe_conn_private * peer_conn = __PEER_CONN(w_ctx->_stream, w_ctx->dir); + this_conn->on_writing = 0; + bufferevent_enable(peer_conn->bev, EV_READ); + return; +} + +int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, const unsigned char * data, size_t size) +{ + int ret = 0; + struct tfe_stream_write_ctx * wctx = tfe_stream_write_frag_start(stream, dir); + ret = tfe_stream_write_frag(wctx, data, size); + tfe_stream_write_frag_end(wctx); + return ret; +} + +/* + * Callback for read events on the up- and downstream connection bufferevents. + * Called when there is data ready in the input evbuffer. + */ + +static void tfe_stream_readcb(struct bufferevent * bev, void * arg) +{ + struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; + enum tfe_conn_dir dir = __DIR(_stream, bev); + struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); + struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); + + int i = 0, ret = 0; + enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA, action_final = ACTION_FORWARD_DATA; + + const struct tfe_plugin * plugins = _stream->thread_ref->modules; + struct plugin_ctx * plug_ctx = NULL; + int plug_num = _stream->thread_ref->nr_modules; + + struct evbuffer * inbuf = bufferevent_get_input(bev); + struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev); + + size_t contigous_len = evbuffer_get_length(inbuf), drain_size = 0; + const unsigned char * contiguous_data = (const unsigned char *) evbuffer_pullup(inbuf, contigous_len); + + _stream->defere_bytes = 0; + _stream->drop_bytes = 0; + _stream->forward_bytes = 0; + + for (i = 0; i < plug_num; i++) + { + _stream->calling_idx = i; + plug_ctx = _stream->plug_ctx + i; + + if (_stream->is_plugin_opened == 0) + { + action_tmp = plugins[i].on_open(&_stream->head, _stream->thread_ref->thread_id, + dir, contiguous_data, contigous_len, &(plug_ctx->pme)); + _stream->is_plugin_opened = 1; + } + else + { + action_tmp = plugins[i].on_data(&_stream->head, _stream->thread_ref->thread_id, + dir, contiguous_data, contigous_len, &(plug_ctx->pme)); + } + + if (plug_ctx->state == PLUG_STATE_PREEPTION) + { + action_final = action_tmp; + } + } + + switch (action_final) + { + case ACTION_FORWARD_DATA: + if (_stream->forward_bytes > 0) + { + evbuffer_remove_buffer(inbuf, outbuf, _stream->forward_bytes); + } + else + { + evbuffer_add_buffer(outbuf, inbuf); + } + break; + case ACTION_DROP_DATA: + if (_stream->drop_bytes > 0) + { + drain_size = _stream->drop_bytes; + } + else + { + drain_size = evbuffer_get_length(inbuf); + } + evbuffer_drain(inbuf, drain_size); + case ACTION_DEFER_DATA: + if (_stream->defere_bytes > 0) + { + bufferevent_setwatermark(bev, EV_WRITE, _stream->defere_bytes, 0); + } + break; + default: assert(0); + break; + } + + if (evbuffer_get_length(inbuf) != 0) + { + bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); + } + + if (evbuffer_get_length(outbuf) >= TFE_CONFIG_OUTPUT_LIMIT_DEFAULT) + { + bufferevent_setwatermark(peer_conn->bev, EV_WRITE, TFE_CONFIG_OUTPUT_LIMIT_DEFAULT / 2, + TFE_CONFIG_OUTPUT_LIMIT_DEFAULT); + bufferevent_disable(bev, EV_READ); + } + + return; +} + +/* + * Callback for write events on the up- and downstream connection bufferevents. + * Called when either all data from the output evbuffer has been written, + * or if the outbuf is only half full again after having been full. + */ +static void tfe_stream_writecb(struct bufferevent * bev, void * arg) +{ + struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; + enum tfe_conn_dir dir = __DIR(_stream, bev); + struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); + struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); + + struct evbuffer * outbuf = bufferevent_get_output(bev); + + if (peer_conn->bev && !(bufferevent_get_enabled(peer_conn->bev) & EV_READ)) + { + /* data source temporarily disabled; + * re-enable and reset watermark to 0. */ + bufferevent_setwatermark(bev, EV_WRITE, 0, 0); + bufferevent_enable(peer_conn->bev, EV_READ); + } +} + +/* + * Callback for meta events on the up- and downstream connection bufferevents. + * Called when EOF has been reached, a connection has been made, and on errors. + */ +static void tfe_stream_eventcb(struct bufferevent * bev, short events, void * arg) +{ + struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; + enum tfe_conn_dir dir = __DIR(_stream, bev); + struct tfe_conn_private * this_conn = __THIS_CONN(_stream, dir); + struct tfe_conn_private * peer_conn = __PEER_CONN(_stream, dir); + + const struct tfe_plugin * plugins = _stream->thread_ref->modules; + struct plugin_ctx * plug_ctx = NULL; + int plug_num = _stream->thread_ref->nr_modules, i = 0; + enum tfe_stream_close_reason reason = REASON_PASSIVE_CLOSED; + + if (events & BEV_EVENT_ERROR) + { + this_conn->closed = 1; + reason = REASON_ERROR; + goto call_plugin_close; + } + + if (events & BEV_EVENT_EOF) + { + //generate a 0 size read callback to notify plugins. + tfe_stream_readcb(bev, arg); + this_conn->closed = 1; + } + if (peer_conn->closed == 1 && this_conn->closed == 1) + { + reason = REASON_PASSIVE_CLOSED; + goto call_plugin_close; + } + return; + +call_plugin_close: + for (i = 0; i < plug_num; i++) + { + _stream->calling_idx = i; + plug_ctx = _stream->plug_ctx + i; + plugins[i].on_close(&(_stream->head), _stream->thread_ref->thread_id, reason, &(plug_ctx->pme)); + } + + tfe_stream_destory(_stream); + return; +} + +static tfe_conn_private * __conn_private_create(struct tfe_stream_private * stream, evutil_socket_t fd) +{ + struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1); + struct event_base * __ev_base = stream->thread_ref->evbase; + + __conn_private->bev = bufferevent_socket_new(__ev_base, fd, BEV_OPT_DEFER_CALLBACKS); + __conn_private->fd = fd; + + if (!__conn_private->bev) + { + TFE_LOG_ERROR(__STREAM_LOGGER(stream), "Failed at creating bufferevent for fd %d", fd); + goto __errout; + } + + bufferevent_setcb(__conn_private->bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, stream); + bufferevent_enable(__conn_private->bev, EV_READ | EV_WRITE); + return __conn_private; + +__errout: + if (__conn_private != NULL) free(__conn_private); + return NULL; +} + +static tfe_conn_private * __conn_private_create(struct tfe_stream_private * stream, struct bufferevent * bev) +{ + struct tfe_conn_private * __conn_private = ALLOC(struct tfe_conn_private, 1); + __conn_private->bev = bev; + __conn_private->fd = bufferevent_getfd(bev); + + bufferevent_setcb(__conn_private->bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, stream); + bufferevent_enable(__conn_private->bev, EV_READ | EV_WRITE); + return __conn_private; +} + +evutil_socket_t __conn_private_release_fd(struct tfe_conn_private * conn) +{ + evutil_socket_t __to_release_fd = conn->fd; + conn->fd = 0; + return __to_release_fd; +} + +static void __conn_private_destory(struct tfe_conn_private * conn) +{ + return; +} + +void ssl_downstream_create_on_success(future_result_t * result, void * user) +{ + struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; + struct ssl_stream * downstream = ssl_downstream_create_result_release_stream(result); + struct bufferevent * bev = ssl_downstream_create_result_release_bev(result); + + _stream->conn_downstream = __conn_private_create(_stream, bev); + _stream->ssl_downstream = downstream; + + future_destroy(_stream->future_downstream_create); + _stream->future_downstream_create = NULL; + _stream->defer_fd_downstream = 0; + + return; +} + +void ssl_downstream_create_on_fail(enum e_future_error err, const char * what, void * user) +{ + return; +} + +void ssl_upstream_create_on_success(future_result_t * result, void * user) +{ + struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; + struct event_base * ev_base = _stream->thread_ref->evbase; + + struct ssl_stream * upstream = ssl_upstream_create_result_release_stream(result); + struct bufferevent * bev = ssl_upstream_create_result_release_bev(result); + assert(upstream != NULL && bev != NULL); + + /* Create connection ctx by bev */ + _stream->conn_upstream = __conn_private_create(_stream, bev); + _stream->ssl_upstream = upstream; + + future_destroy(_stream->future_upstream_create); + _stream->future_upstream_create = NULL; + _stream->defer_fd_upstream = 0; + + /* Next, create downstream */ + _stream->future_downstream_create = future_create(ssl_downstream_create_on_success, + ssl_downstream_create_on_fail, _stream); + + ssl_async_downstream_create(_stream->future_downstream_create, _stream->ssl_mgr, + _stream->ssl_upstream, _stream->defer_fd_downstream, /* KEYRING ID */ 0, ev_base); +} + +void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, void * user) +{ + assert(0); +} + +struct tfe_stream * tfe_stream_create(struct tfe_proxy * pxy, struct tfe_thread_ctx * thread_ctx) +{ + struct tfe_stream_private * _stream = ALLOC(struct tfe_stream_private, 1); + _stream->thread_ref = thread_ctx; + _stream->proxy_ref = pxy; + return (struct tfe_stream *) &_stream->head; +} + +void tfe_stream_destory(struct tfe_stream_private * stream) +{ + struct tfe_thread_ctx * thread = stream->thread_ref; + struct tfe_proxy * proxy = stream->proxy_ref; + struct event_base * ev_base = thread->evbase; + + if (__IS_SSL(stream) && stream->ssl_upstream) + { + evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_upstream); + ssl_stream_free_and_close_fd(stream->ssl_upstream, ev_base, __to_closed_fd); + } + + if (__IS_SSL(stream) && stream->ssl_downstream) + { + evutil_socket_t __to_closed_fd = __conn_private_release_fd(stream->conn_upstream); + ssl_stream_free_and_close_fd(stream->ssl_downstream, ev_base, __to_closed_fd); + } + + if (stream->conn_upstream) + { + __conn_private_destory(stream->conn_upstream); + } + + if (stream->conn_downstream) + { + __conn_private_destory(stream->conn_downstream); + } + + if (stream->defer_fd_downstream) + { + evutil_closesocket(stream->defer_fd_downstream); + } + + if (stream->defer_fd_upstream) + { + evutil_closesocket(stream->defer_fd_upstream); + } + + if (stream->future_downstream_create) + { + future_destroy(stream->future_downstream_create); + } + + if (stream->future_upstream_create) + { + future_destroy(stream->future_upstream_create); + } + + free(stream); + thread->load--; +} + +void tfe_stream_init_by_fds(struct tfe_stream * stream, enum tfe_session_proto session_type, + evutil_socket_t fd_downstream, evutil_socket_t fd_upstream) +{ + struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head); + struct event_base * ev_base = _stream->thread_ref->evbase; + + if (session_type == SESSION_PROTO_PLAIN) + { + _stream->conn_downstream = __conn_private_create(_stream, fd_downstream); + _stream->conn_upstream = __conn_private_create(_stream, fd_upstream); + + assert(_stream->conn_downstream != NULL); + assert(_stream->conn_upstream != NULL); + } + + if (session_type == SESSION_PROTO_SSL) + { + _stream->ssl_mgr = _stream->proxy_ref->ssl_mgr_handler; + + _stream->future_upstream_create = future_create( + ssl_upstream_create_on_success, ssl_upstream_create_on_fail, (void *) _stream); + + /* Defer setup conn_downstream & conn_upstream in async callbacks. */ + ssl_async_upstream_create(_stream->future_upstream_create, + _stream->ssl_mgr, fd_upstream, fd_downstream, ev_base); + + _stream->defer_fd_downstream = fd_downstream; + _stream->defer_fd_upstream = fd_upstream; + } + + _stream->session_type = session_type; +} diff --git a/platform/src/tfe_stream.cpp b/platform/src/tfe_stream.cpp deleted file mode 100644 index 6a3cffa..0000000 --- a/platform/src/tfe_stream.cpp +++ /dev/null @@ -1,560 +0,0 @@ - -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include -#include -#include - -#include -#include -#include - -#define STREAM_EVBASE(s) ((s)->thrmgr_ref->evbase) -/* - * Maximum size of data to buffer per connection direction before - * temporarily stopping to read data from the other end. - */ -#define OUTBUF_LIMIT (1024*1024) - -/* - * Print helper for logging code. - */ -#define STRORDASH(x) (((x)&&*(x))?(x):"-") - -/* forward declaration of libevent callbacks */ -static void tfe_stream_readcb(struct bufferevent *, void *); -static void tfe_stream_writecb(struct bufferevent *, void *); -static void tfe_stream_eventcb(struct bufferevent *, short, void *); -static void stream_fd_readcb(evutil_socket_t, short, void *); - -static void tfe_stream_free(struct tfe_stream_private * stream) -{ - struct tfe_thread_ctx * thread = stream->thrmgr_ref; - thread->load--; - - switch (stream->session_type) - { - case SESSION_PROTO_SSL: -#if 0 - ssl_upstream_free(stream->ssl_upstream); - ssl_downstream_free(stream->ssl_downstream); -#endif - thread->stat.value[SSL_NUM]--; - break; - default: break; - } - - free(stream); - thread->stat.value[STREAM_NUM]--; - return; -} - -void tfe_stream_detach(const struct tfe_stream * stream) -{ - struct tfe_stream_private * _stream = (struct tfe_stream_private *) stream; - int plug_id = _stream->calling_idx; - _stream->plug_ctx[plug_id].state = PLUG_STATE_DETACHED; - return; -} - -int tfe_stream_preempt(const struct tfe_stream * stream) -{ - struct tfe_stream_private * _stream = (struct tfe_stream_private *) stream; - int plug_id = _stream->calling_idx; - int i = 0; - for (i = 0; i < _stream->plugin_num; i++) - { - if (_stream->plug_ctx[i].state == PLUG_STATE_PREEPTION) - { - return -1; - } - } - _stream->plug_ctx[plug_id].state = PLUG_STATE_PREEPTION; - return 0; -} - -static inline struct tfe_conn_private * __this_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) -{ - struct tfe_conn_private * this_conn = NULL; - this_conn = ((dir == CONN_DIR_UPSTREAM) ? &(_stream->conn_downstream) : &(_stream->conn_upstream)); - return this_conn; -} - -static inline struct tfe_conn_private * __peer_conn(struct tfe_stream_private * _stream, enum tfe_conn_dir dir) -{ - struct tfe_conn_private * peer_conn = NULL; - peer_conn = (dir == CONN_DIR_UPSTREAM) ? &(_stream->conn_downstream) : &(_stream->conn_upstream); - return peer_conn; -} - -struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_stream * stream, enum tfe_conn_dir dir) -{ - struct tfe_stream_private * _stream = (struct tfe_stream_private *) stream; - struct tfe_conn_private * this_conn = __this_conn(_stream, dir); - struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); - if (this_conn->on_writing == 1) - { - return NULL; - } - this_conn->w_ctx.dir = dir; - this_conn->w_ctx._stream = _stream; - this_conn->on_writing = 1; - bufferevent_disable(peer_conn->bev, EV_READ); - return &(this_conn->w_ctx); -} - -int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size) -{ - struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir);; - int ret = bufferevent_write(this_conn->bev, data, size); - return ret; -} -void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) -{ - struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir); - struct tfe_conn_private * peer_conn = __peer_conn(w_ctx->_stream, w_ctx->dir); - this_conn->on_writing = 0; - bufferevent_enable(peer_conn->bev, EV_READ); - return; -} - -int tfe_stream_write(const struct tfe_stream * stream, enum tfe_conn_dir dir, const unsigned char * data, size_t size) -{ - int ret = 0; - struct tfe_stream_write_ctx * wctx = tfe_stream_write_frag_start(stream, dir); - ret = tfe_stream_write_frag(wctx, data, size); - tfe_stream_write_frag_end(wctx); - return ret; -} -/* - - #define ON_OPEN_CALL 0 - #define ON_DATA_CALL 1 - #define ON_CLOSE_CALL 2 -enum tfe_stream_action tfe_stream_call_plugin(struct tfe_stream_private* _stream, enum tfe_conn_dir dir, int what, struct evbuffer * inbuf) -{ - - size_t contigous_len=evbuffer_get_length(inbuf),drain_size=0; - const char* contiguous_data=evbuffer_pullup(inbuf,contigous_len); - int i=0,ret=0; - int plug_num=_stream->thrmgr_ref->module_num; - - const struct tfe_plugin* plugins=_stream->thrmgr_ref->modules; - struct plugin_ctx* plug_ctx=NULL; - enum tfe_stream_action action_tmp=ACTION_FORWARD_DATA, action_final=ACTION_FORWARD_DATA; - - _stream->defere_bytes=0; - _stream->drop_bytes=0; - _stream->forward_bytes=0; - switch(what) - { - case ON_OPEN_CALL: - for(i=0;ithrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme)); - if(plug_ctx->state=PLUG_STATE_PREEPTION) - { - action_final=action_tmp; - } - } - - break; - case ON_DATA_CALL: - for(i=0;ithrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme)); - if(plug_ctx->state=PLUG_STATE_PREEPTION) - { - action_final=action_tmp; - } - } - case ON_CLOSE_CALL: - for(i=0;ithrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme)); - if(plug_ctx->state=PLUG_STATE_PREEPTION) - { - action_final=action_tmp; - } - } - } - for(i=0;icalling_idx=i; - switch(what) - { - } - plug_ctx=_stream->plug_ctx+i; - if(_stream->is_fisrt_read==1) - { - action_tmp=plugins[i].on_open(&_stream.head, _stream->thrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme)); - _stream->is_fisrt_read=0; - } - else - { - action_tmp=plugins[i].on_data(&_stream.head, _stream->thrmgr_ref->thread_id, dir, contiguous_data,contigous_len, &(plug_ctx->pme)); - } - if(plug_ctx->state=PLUG_STATE_PREEPTION) - { - action_final=action_tmp; - } - } - -} -*/ -/* - * Callback for read events on the up- and downstream connection bufferevents. - * Called when there is data ready in the input evbuffer. - */ - -static void tfe_stream_readcb(struct bufferevent * bev, void * arg) -{ - struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; - enum tfe_conn_dir dir = (bev == _stream->conn_downstream.bev) ? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM; - struct tfe_conn_private * this_conn = __this_conn(_stream, dir); - struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); - - int i = 0, ret = 0; - enum tfe_stream_action action_tmp = ACTION_FORWARD_DATA, action_final = ACTION_FORWARD_DATA; - - const struct tfe_plugin * plugins = _stream->thrmgr_ref->modules; - struct plugin_ctx * plug_ctx = NULL; - int plug_num = _stream->thrmgr_ref->nr_modules; - - struct evbuffer * inbuf = bufferevent_get_input(bev); - struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev); - - size_t contigous_len = evbuffer_get_length(inbuf), drain_size = 0; - const unsigned char * contiguous_data = (const unsigned char *)evbuffer_pullup(inbuf, contigous_len); - - _stream->defere_bytes = 0; - _stream->drop_bytes = 0; - _stream->forward_bytes = 0; - - for (i = 0; i < plug_num; i++) - { - _stream->calling_idx = i; - plug_ctx = _stream->plug_ctx + i; - - if (_stream->is_plugin_opened == 0) - { - action_tmp = plugins[i].on_open(&_stream->head, _stream->thrmgr_ref->thread_id, - dir, contiguous_data, contigous_len, &(plug_ctx->pme)); - _stream->is_plugin_opened = 1; - } - else - { - action_tmp = plugins[i].on_data(&_stream->head, _stream->thrmgr_ref->thread_id, - dir, contiguous_data, contigous_len, &(plug_ctx->pme)); - } - - if (plug_ctx->state == PLUG_STATE_PREEPTION) - { - action_final = action_tmp; - } - } - - switch (action_final) - { - case ACTION_FORWARD_DATA: - if (_stream->forward_bytes > 0) - { - evbuffer_remove_buffer(inbuf, outbuf, _stream->forward_bytes); - } - else - { - evbuffer_add_buffer(outbuf, inbuf); - } - break; - case ACTION_DROP_DATA: - if (_stream->drop_bytes > 0) - { - drain_size = _stream->drop_bytes; - } - else - { - drain_size = evbuffer_get_length(inbuf); - } - evbuffer_drain(inbuf, drain_size); - case ACTION_DEFER_DATA: - if (_stream->defere_bytes > 0) - { - bufferevent_setwatermark(bev, EV_WRITE, _stream->defere_bytes, 0); - } - break; - default: - assert(0); - break; - } - - if (evbuffer_get_length(inbuf) != 0) - { - bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS); - } - - if (evbuffer_get_length(outbuf) >= OUTBUF_LIMIT) - { - /* temporarily disable data source; - * set an appropriate watermark. */ - bufferevent_setwatermark(peer_conn->bev, EV_WRITE, OUTBUF_LIMIT / 2, OUTBUF_LIMIT); - bufferevent_disable(bev, EV_READ); - } - - return; -} - -/* - * Callback for write events on the up- and downstream connection bufferevents. - * Called when either all data from the output evbuffer has been written, - * or if the outbuf is only half full again after having been full. - */ -static void tfe_stream_writecb(struct bufferevent * bev, void * arg) -{ - struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; - enum tfe_conn_dir dir = (bev == _stream->conn_downstream.bev) ? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM; - struct tfe_conn_private * this_conn = __this_conn(_stream, dir); - struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); - - struct evbuffer * outbuf = bufferevent_get_output(bev); - - if (peer_conn->bev && !(bufferevent_get_enabled(peer_conn->bev) & EV_READ)) - { - /* data source temporarily disabled; - * re-enable and reset watermark to 0. */ - bufferevent_setwatermark(bev, EV_WRITE, 0, 0); - bufferevent_enable(peer_conn->bev, EV_READ); - } -} - -/* - * Callback for meta events on the up- and downstream connection bufferevents. - * Called when EOF has been reached, a connection has been made, and on errors. - */ -static void tfe_stream_eventcb(struct bufferevent * bev, short events, void * arg) -{ - struct tfe_stream_private * _stream = (struct tfe_stream_private *) arg; - enum tfe_conn_dir dir = (bev == _stream->conn_downstream.bev) ? CONN_DIR_UPSTREAM : CONN_DIR_DOWNSTREAM; - struct tfe_conn_private * this_conn = __this_conn(_stream, dir); - struct tfe_conn_private * peer_conn = __peer_conn(_stream, dir); - - const struct tfe_plugin * plugins = _stream->thrmgr_ref->modules; - struct plugin_ctx * plug_ctx = NULL; - int plug_num = _stream->thrmgr_ref->nr_modules, i = 0; - enum tfe_stream_close_reason reason = REASON_PASSIVE_CLOSED; - - if (events & BEV_EVENT_ERROR) - { - this_conn->closed = 1; - reason = REASON_ERROR; - goto call_plugin_close; - } - - if (events & BEV_EVENT_EOF) - { - //generate a 0 size read callback to notify plugins. - tfe_stream_readcb(bev, arg); - this_conn->closed = 1; - } - if (peer_conn->closed == 1 && this_conn->closed == 1) - { - reason = REASON_PASSIVE_CLOSED; - goto call_plugin_close; - } - return; - -call_plugin_close: - for (i = 0; i < plug_num; i++) - { - _stream->calling_idx = i; - plug_ctx = _stream->plug_ctx + i; - plugins[i].on_close(&(_stream->head), _stream->thrmgr_ref->thread_id, reason, &(plug_ctx->pme)); - } - - tfe_stream_free(_stream); - return; - -} - -void ssl_get_cert_on_succ(void * result, void * user) -{ - cert_t * cert = (cert_t *) result; - struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; - - //_stream->ssl_downstream->ssl = downstream_ssl_create(_stream); - //_stream->ssl_downstream->ssl = downstream_ssl_create(_stream); - cert_free(cert); - - bufferevent_setcb(_stream->head.upstream.bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, _stream); - bufferevent_setcb(_stream->head.downstream.bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, _stream); - bufferevent_enable(_stream->head.upstream.bev, EV_READ | EV_WRITE); - bufferevent_enable(_stream->head.downstream.bev, EV_READ | EV_WRITE); - - future_destroy(_stream->ssl_downstream->future_get_cert); - _stream->ssl_downstream->future_get_cert = NULL; - return; -} - -void ssl_get_cert_on_fail(enum e_future_error err, const char * what, void * user) -{ - assert(0); -} - -void ssl_conn_origin_on_succ(void * result, void * user) -{ - struct bufferevent * bev = (struct bufferevent *) result; - struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; - - _stream->head.upstream.bev = bev; - _stream->ssl_upstream->ssl = bufferevent_openssl_get_ssl(bev); /* does not inc refc */ - _stream->ssl_upstream->orig_cert = SSL_get_peer_certificate(_stream->ssl_upstream->ssl); - -#if 0 - up_session_set(_stream->thrmgr_ref->dsess_cache, _stream->ssl_downstream->sni, - SSL_get0_session(_stream->ssl_upstream->ssl)); -#endif - - _stream->ssl_downstream->future_get_cert = future_create(ssl_get_cert_on_succ, ssl_get_cert_on_fail, _stream); - -#if 0 - cert_mgr_async_get(_stream->ssl_downstream->future_get_cert, - _stream->thrmgr_ref->cert_mgr, - _stream->ssl_downstream->sni, - _stream->ssl_downstream->keyring_id, - _stream->ssl_upstream->orig_cert); -#endif - - future_destroy(_stream->ssl_upstream->conn_ssl_srv); - _stream->ssl_upstream->conn_ssl_srv = NULL; -} - -void ssl_conn_origin_on_fail(enum e_future_error err, const char * what, void * user) -{ - //TODO: - assert(0); -} - -void peek_client_hello_on_succ(void * result, void * user) -{ - struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; - assert(_stream->session_type == SESSION_PROTO_SSL); - - struct ssl_downstream * ssl_downstream = _stream->ssl_downstream; - struct ssl_upstream * ssl_upstream = _stream->ssl_upstream; - - _stream->ssl_downstream->sni = tfe_strdup((const char *) result); - future_destroy(ssl_downstream->future_sni_peek); - ssl_downstream->future_sni_peek = NULL; - - _stream->ssl_upstream = ALLOC(struct ssl_upstream, 1); - _stream->ssl_upstream->conn_ssl_srv = future_create(ssl_conn_origin_on_succ, ssl_conn_origin_on_fail, _stream); - ssl_async_upstream_create(_stream->ssl_upstream->conn_ssl_srv, _stream->fd_upstream, _stream->ssl_downstream->sni, - _stream->thrmgr_ref->evbase, NULL); -} - -void peek_client_hello_on_fail(enum e_future_error err, const char * what, void * user) -{ - //TODO: - assert(0); -} - - -int ssl_stream_setup(struct tfe_stream_private * _stream) -{ - return 0; -} - -int __plain_stream_conn_private_init(struct tfe_stream_private * _stream, - struct tfe_conn_private * _conn, evutil_socket_t fd) -{ - struct tfe_proxy * proxy = _stream->proxy; - struct event_base * ev_base = _stream->thrmgr_ref->evbase; - - _conn->bev = bufferevent_socket_new(ev_base, fd, BEV_OPT_DEFER_CALLBACKS) - _conn->fd = fd; - _conn->closed = 0; - _conn->need_shutdown = 0; - _conn->on_writing = 0; -} - -int plain_stream_setup(struct tfe_stream_private * _stream) -{ - return 0; -} - -void tfe_stream_setup(struct tfe_stream_private * _stream) -{ - struct future * f_sni = NULL; - tfe_thread_ctx * thread = _stream->thrmgr_ref; - - if (_stream->session_type == SESSION_PROTO_SSL) - { - _stream->ssl_downstream = ssl_downstream_create(); - _stream->async_future = future_create(peek_client_hello_on_succ, peek_client_hello_on_fail, _stream); - ssl_async_peek_client_hello(_stream->ssl_downstream->future_sni_peek, _stream->fd_downstream, - _stream->thrmgr_ref->evbase); - } - else if (_stream->session_type == SESSION_PROTO_PLAIN) - { - bufferevent_setcb(_stream->head.upstream.bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, _stream); - bufferevent_setcb(_stream->head.downstream.bev, tfe_stream_readcb, tfe_stream_writecb, tfe_stream_eventcb, _stream); - bufferevent_enable(_stream->head.upstream.bev, EV_READ | EV_WRITE); - bufferevent_enable(_stream->head.downstream.bev, EV_READ | EV_WRITE); - } - else - { - - } - - - switch (_stream->session_type) - { - case SESSION_PROTO_SSL: - // for SSL, defer dst connection setup to initial_readcb - - thread->stat.value[SSL_NUM]++; - break; - default: - //todo: - stream_fd_readcb(_stream->fd_downstream, 0, _stream); - break; - } - - return; -} - -struct tfe_stream_private * tfe_stream_create(evutil_socket_t fd_downstream, evutil_socket_t fd_upstream, - enum tfe_session_proto session_type, tfe_thread_ctx * thread) -{ - struct tfe_stream_private * conn_private = NULL; - struct tfe_stream * conn_public = NULL; - conn_private = ALLOC(struct tfe_stream_private, 1); - conn_private->session_type = session_type; - conn_private->fd_downstream = fd_downstream; - conn_private->fd_upstream = fd_upstream; - conn_private->thrmgr_ref = thread; - conn_private->is_plugin_opened = 0; - conn_public = &(conn_private->head); - thread->stat.value[STREAM_NUM]++; - return conn_private; -} - -/* vim: set noet ft=c: */