#64 key keeper增加evdnsbase参数。

避免创建大量dnsbase,耗尽fd。ssl stream/tcp stream/proxy也做了相应修改。
This commit is contained in:
zhengchao
2018-11-26 14:54:20 +08:00
parent 35c2559f8a
commit ce9d7fa5eb
13 changed files with 82 additions and 47 deletions

View File

@@ -8,6 +8,8 @@ const char * tfe_proxy_default_conffile();
const char * tfe_proxy_default_logger(); const char * tfe_proxy_default_logger();
unsigned int tfe_proxy_get_work_thread_count(); unsigned int tfe_proxy_get_work_thread_count();
struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id); struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id);
struct evdns_base* tfe_proxy_get_work_thread_dnsbase(unsigned int thread_id);
struct event_base * tfe_proxy_get_gc_evbase(void); struct event_base * tfe_proxy_get_gc_evbase(void);
screen_stat_handle_t tfe_proxy_get_fs_handle(void); screen_stat_handle_t tfe_proxy_get_fs_handle(void);

View File

@@ -1,6 +1,7 @@
#pragma once #pragma once
#include "tfe_future.h" #include "tfe_future.h"
#include "event2/event.h" #include <event2/event.h>
#include <event2/dns.h>
struct tfe_rpc_response_result{ struct tfe_rpc_response_result{
int status_code; int status_code;
@@ -24,4 +25,6 @@ enum TFE_RPC_METHOD
struct tfe_rpc_response_result* tfe_rpc_release(void* result); struct tfe_rpc_response_result* tfe_rpc_release(void* result);
void tfe_rpc_async_ask(struct future* f, const char* url, enum TFE_RPC_METHOD method, enum TFE_RPC_FLAG flag, const char* data, int data_len, struct event_base * evbase); void tfe_rpc_async_ask(struct future* f, const char* url, enum TFE_RPC_METHOD method, enum TFE_RPC_FLAG flag,
const char* data, int data_len, struct event_base * evbase, struct evdns_base* dnsbase);

View File

@@ -178,18 +178,14 @@ char* get_request_url(struct evhttp_uri* uri, int url_len)
} }
//data is for POST. if method is GET, data should be NULL //data is for POST. if method is GET, data should be NULL
void tfe_rpc_async_ask(struct future* f, const char* url, enum TFE_RPC_METHOD method, enum TFE_RPC_FLAG flag, const char* data, int data_len, struct event_base * evbase) void tfe_rpc_async_ask(struct future* f, const char* url, enum TFE_RPC_METHOD method, enum TFE_RPC_FLAG flag, const char* data, int data_len, struct event_base * evbase, struct evdns_base* dnsbase)
{ {
struct promise* p = future_to_promise(f); struct promise* p = future_to_promise(f);
struct tfe_rpc_ctx* ctx = ALLOC(struct tfe_rpc_ctx, 1); struct tfe_rpc_ctx* ctx = ALLOC(struct tfe_rpc_ctx, 1);
ctx->evbase = evbase; ctx->evbase = evbase;
ctx->flag = flag; ctx->flag = flag;
promise_set_ctx(p, (void*)ctx, tfe_rpc_promise_free_ctx); promise_set_ctx(p, (void*)ctx, tfe_rpc_promise_free_ctx);
if(!evbase) assert(evbase&&dnsbase);
{
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "event base is NULL");
return;
}
struct evhttp_uri* uri = evhttp_uri_parse(url); struct evhttp_uri* uri = evhttp_uri_parse(url);
if(NULL == uri) if(NULL == uri)
{ {
@@ -207,13 +203,7 @@ void tfe_rpc_async_ask(struct future* f, const char* url, enum TFE_RPC_METHOD me
{ {
port = 80; port = 80;
} }
//printf("url:%s host:%s port:%d path:%s query:%s request_url:%s\n", url, host, port, path, query, request_url);
struct evdns_base* dnsbase = evdns_base_new(evbase, EVDNS_BASE_INITIALIZE_NAMESERVERS);
if (!dnsbase)
{
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "create dns base failed!");
return;
}
struct evhttp_connection* connection = evhttp_connection_base_new(evbase, dnsbase, host, port); struct evhttp_connection* connection = evhttp_connection_base_new(evbase, dnsbase, host, port);
if (!connection) if (!connection)
{ {

View File

@@ -9,7 +9,12 @@ struct keyring
X509 *cert; X509 *cert;
STACK_OF(X509) * chain; STACK_OF(X509) * chain;
}; };
struct key_keeper_stat
{
long long ask_times;
long long cache_hit;
long long cached_num;
};
struct key_keeper; struct key_keeper;
struct key_keeper * key_keeper_init(const char * profile, const char* section, void* logger); struct key_keeper * key_keeper_init(const char * profile, const char* section, void* logger);
@@ -20,6 +25,9 @@ struct keyring* key_keeper_release_keyring(future_result_t* result);
void key_keeper_free_keyring(struct keyring* cert); void key_keeper_free_keyring(struct keyring* cert);
void key_keeper_async_ask(struct future * f, struct key_keeper * keeper, const char* sni, int keyring_id,
X509 * origin_cert, int is_cert_valid, struct event_base * evbase); void key_keeper_async_ask(struct future * f, struct key_keeper * keeper, const char* sni, int keyring_id,
X509 * origin_cert, int is_cert_valid, struct event_base * evbase, struct evdns_base* dnsbase);
void key_keeper_statistic(struct key_keeper *keeper, struct key_keeper_stat* result);

View File

@@ -14,6 +14,7 @@ struct tfe_thread_ctx
unsigned int load; unsigned int load;
struct event_base * evbase; struct event_base * evbase;
struct evdns_base* dnsbase;
unsigned char running; unsigned char running;
unsigned int nr_modules; unsigned int nr_modules;

View File

@@ -15,12 +15,12 @@ void ssl_manager_destroy(struct ssl_mgr * mgr);
struct ssl_stream * ssl_upstream_create_result_release_stream(future_result_t * result); struct ssl_stream * ssl_upstream_create_result_release_stream(future_result_t * result);
struct bufferevent * ssl_upstream_create_result_release_bev(future_result_t * result); struct bufferevent * ssl_upstream_create_result_release_bev(future_result_t * result);
void ssl_async_upstream_create(struct future * f, struct ssl_mgr * mgr, evutil_socket_t fd_upstream, void ssl_async_upstream_create(struct future * f, struct ssl_mgr * mgr, evutil_socket_t fd_upstream,
evutil_socket_t fd_downstream, struct event_base * evbase); evutil_socket_t fd_downstream, unsigned int thread_id);
struct ssl_stream * ssl_downstream_create_result_release_stream(future_result_t * result); struct ssl_stream * ssl_downstream_create_result_release_stream(future_result_t * result);
struct bufferevent * ssl_downstream_create_result_release_bev(future_result_t * result); struct bufferevent * ssl_downstream_create_result_release_bev(future_result_t * result);
void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct ssl_stream * upstream, void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct ssl_stream * upstream,
evutil_socket_t fd_downstream, int keyring_id, struct event_base * evbase); evutil_socket_t fd_downstream, int keyring_id, unsigned int thread_id);
void ssl_stream_free_and_close_fd(struct ssl_stream * stream, struct event_base * evbase, evutil_socket_t fd); void ssl_stream_free_and_close_fd(struct ssl_stream * stream, struct event_base * evbase, evutil_socket_t fd);
void ssl_stream_log_error(struct bufferevent * bev, enum tfe_conn_dir dir, void* logger); void ssl_stream_log_error(struct bufferevent * bev, enum tfe_conn_dir dir, void* logger);

View File

@@ -43,6 +43,8 @@ struct key_keeper
X509* untrusted_ca_cert; X509* untrusted_ca_cert;
EVP_PKEY* untrusted_ca_key; EVP_PKEY* untrusted_ca_key;
unsigned int no_cache; unsigned int no_cache;
struct key_keeper_stat stat;
}; };
@@ -601,7 +603,7 @@ char* url_escape(char* url)
return _url; return _url;
} }
void key_keeper_async_ask(struct future * f, struct key_keeper * keeper, const char* sni, int keyring_id, X509 * origin_cert, int is_cert_valid, struct event_base * evbase) void key_keeper_async_ask(struct future * f, struct key_keeper * keeper, const char* sni, int keyring_id, X509 * origin_cert, int is_cert_valid, struct event_base * evbase, struct evdns_base* dnsbase)
{ {
struct promise* p = future_to_promise(f); struct promise* p = future_to_promise(f);
unsigned int len = 0; unsigned int len = 0;
@@ -618,12 +620,14 @@ void key_keeper_async_ask(struct future * f, struct key_keeper * keeper, const c
ctx->key_len = len; ctx->key_len = len;
promise_set_ctx(p, (void*)ctx, key_keeper_promise_free_ctx); promise_set_ctx(p, (void*)ctx, key_keeper_promise_free_ctx);
long int cb_rtn = 0; long int cb_rtn = 0;
keeper->stat.ask_times++;
if(!keeper->no_cache) if(!keeper->no_cache)
{ {
MESA_htable_search_cb(keeper->cert_cache, (const unsigned char*)(ctx->key), ctx->key_len, keyring_local_cache_query_cb, p, &cb_rtn); MESA_htable_search_cb(keeper->cert_cache, (const unsigned char*)(ctx->key), ctx->key_len, keyring_local_cache_query_cb, p, &cb_rtn);
if(cb_rtn == KEYRING_EXSITED) if(cb_rtn == KEYRING_EXSITED)
{ {
//printf("KEYRING_EXSITED\n"); //printf("KEYRING_EXSITED\n");
keeper->stat.cache_hit++;
return; return;
} }
} }
@@ -644,7 +648,7 @@ void key_keeper_async_ask(struct future * f, struct key_keeper * keeper, const c
promise_failed(p, FUTURE_ERROR_EXCEPTION, "url escape failed"); promise_failed(p, FUTURE_ERROR_EXCEPTION, "url escape failed");
break; break;
} }
struct future* f_certstore_rpc = future_create("tfe_rpc", certstore_rpc_on_succ, certstore_rpc_on_fail, p); struct future* f_certstore_rpc = future_create("crt_store", certstore_rpc_on_succ, certstore_rpc_on_fail, p);
ctx->f_certstore_rpc = f_certstore_rpc; ctx->f_certstore_rpc = f_certstore_rpc;
char *url = NULL; char *url = NULL;
@@ -661,7 +665,7 @@ void key_keeper_async_ask(struct future * f, struct key_keeper * keeper, const c
} }
TFE_LOG_DEBUG(keeper->logger, "CertStore query: %.100s", url); TFE_LOG_DEBUG(keeper->logger, "CertStore query: %.100s", url);
curl_free(escaped_origin_cert_pem); curl_free(escaped_origin_cert_pem);
tfe_rpc_async_ask(f_certstore_rpc, url, GET, DONE_CB, NULL, 0, evbase); tfe_rpc_async_ask(f_certstore_rpc, url, GET, DONE_CB, NULL, 0, evbase, dnsbase);
free(url); free(url);
break; break;
} }
@@ -699,3 +703,10 @@ void key_keeper_async_ask(struct future * f, struct key_keeper * keeper, const c
} }
return; return;
} }
void key_keeper_statistic(struct key_keeper *keeper, struct key_keeper_stat* result)
{
keeper->stat.cached_num=MESA_htable_get_elem_num(keeper->cert_cache);
*result=keeper->stat;
return;
}

View File

@@ -249,14 +249,13 @@ void __kni_event_cb(evutil_socket_t fd, short what, void * user)
} }
__cmsghdr = CMSG_FIRSTHDR(&__msghdr); __cmsghdr = CMSG_FIRSTHDR(&__msghdr);
__fds = (int *) (CMSG_DATA(__cmsghdr));
if (unlikely(__cmsghdr == NULL)) if (unlikely(__cmsghdr == NULL))
{ {
TFE_LOG_ERROR(__ctx->logger, "Failed at fetch CMSG_FIRSTHDR() from incoming fds, close KNI connection."); TFE_LOG_ERROR(__ctx->logger, "Failed at fetch CMSG_FIRSTHDR() from incoming fds, close KNI connection.");
goto __close_kni_connection; goto __close_kni_connection;
} }
__fds = (int *) (CMSG_DATA(__cmsghdr));
if (unlikely(__fds == NULL)) if (unlikely(__fds == NULL))
{ {
TFE_LOG_ERROR(__ctx->logger, "Failed at fetch CMSG_DATA() from incoming fds, close KNI connection."); TFE_LOG_ERROR(__ctx->logger, "Failed at fetch CMSG_DATA() from incoming fds, close KNI connection.");

View File

@@ -16,6 +16,7 @@
#include <pthread.h> #include <pthread.h>
#include <event2/event.h> #include <event2/event.h>
#include <event2/dns.h>
#include <event2/listener.h> #include <event2/listener.h>
#include <event2/bufferevent.h> #include <event2/bufferevent.h>
#include <event2/bufferevent_ssl.h> #include <event2/bufferevent_ssl.h>
@@ -210,7 +211,8 @@ void tfe_proxy_work_thread_create_ctx(struct tfe_proxy * proxy)
{ {
proxy->work_threads[i] = ALLOC(struct tfe_thread_ctx, 1); proxy->work_threads[i] = ALLOC(struct tfe_thread_ctx, 1);
proxy->work_threads[i]->thread_id = i; proxy->work_threads[i]->thread_id = i;
proxy->work_threads[i]->evbase = event_base_new(); proxy->work_threads[i]->evbase = event_base_new();
proxy->work_threads[i]->dnsbase = evdns_base_new(proxy->work_threads[i]->evbase, EVDNS_BASE_INITIALIZE_NAMESERVERS);
} }
return; return;
} }
@@ -414,6 +416,12 @@ struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id)
assert(thread_id < g_default_proxy->nr_work_threads); assert(thread_id < g_default_proxy->nr_work_threads);
return g_default_proxy->work_threads[thread_id]->evbase; return g_default_proxy->work_threads[thread_id]->evbase;
} }
struct evdns_base* tfe_proxy_get_work_thread_dnsbase(unsigned int thread_id)
{
assert(thread_id < g_default_proxy->nr_work_threads);
return g_default_proxy->work_threads[thread_id]->dnsbase;
}
struct event_base * tfe_proxy_get_gc_evbase(void) struct event_base * tfe_proxy_get_gc_evbase(void)
{ {

View File

@@ -90,6 +90,9 @@ enum ssl_stream_stat
SSL_NO_CHELLO, SSL_NO_CHELLO,
SSL_NO_SNI, SSL_NO_SNI,
SSL_FAKE_CRT, SSL_FAKE_CRT,
KEY_KEEPER_CACHE_SIZE,
KEY_KEEPER_ASK,
KEY_KEEPER_HIT,
SSL_STAT_MAX SSL_STAT_MAX
}; };
@@ -193,20 +196,20 @@ struct ssl_connect_server_ctx
evutil_socket_t fd_upstream; evutil_socket_t fd_upstream;
evutil_socket_t fd_downstream; evutil_socket_t fd_downstream;
struct event_base * evbase; unsigned int thread_id;
struct future * f_peek_chello; struct future * f_peek_chello;
struct timespec start,end; struct timespec start,end;
}; };
struct ssl_connect_client_ctx struct ssl_connect_client_ctx
{ {
unsigned int thread_id;
int keyring_id; int keyring_id;
struct ssl_stream * origin_ssl; struct ssl_stream * origin_ssl;
X509 * origin_crt; X509 * origin_crt;
int is_origin_crt_verify_passed; int is_origin_crt_verify_passed;
struct ssl_mgr * ssl_mgr; struct ssl_mgr * ssl_mgr;
evutil_socket_t fd_downstream; evutil_socket_t fd_downstream;
struct event_base * evbase;
struct future * f_ask_keyring; struct future * f_ask_keyring;
struct bufferevent * bev_down; struct bufferevent * bev_down;
@@ -241,6 +244,12 @@ ssl_stream_gc_cb(evutil_socket_t fd, short what, void * arg)
int i=0; int i=0;
ssl_sess_cache_stat(mgr->up_sess_cache, &(mgr->stat_val[SSL_UP_CACHE_SZ]), &(mgr->stat_val[SSL_UP_CACHE_QUERY]), &(mgr->stat_val[SSL_UP_CACHE_HIT])); ssl_sess_cache_stat(mgr->up_sess_cache, &(mgr->stat_val[SSL_UP_CACHE_SZ]), &(mgr->stat_val[SSL_UP_CACHE_QUERY]), &(mgr->stat_val[SSL_UP_CACHE_HIT]));
ssl_sess_cache_stat(mgr->down_sess_cache, &(mgr->stat_val[SSL_DOWN_CACHE_SZ]), &(mgr->stat_val[SSL_DOWN_CACHE_QUERY]), &(mgr->stat_val[SSL_DOWN_CACHE_HIT])); ssl_sess_cache_stat(mgr->down_sess_cache, &(mgr->stat_val[SSL_DOWN_CACHE_SZ]), &(mgr->stat_val[SSL_DOWN_CACHE_QUERY]), &(mgr->stat_val[SSL_DOWN_CACHE_HIT]));
struct key_keeper_stat keeper_stat;
key_keeper_statistic(mgr->key_keeper, &keeper_stat);
mgr->stat_val[KEY_KEEPER_ASK]=keeper_stat.ask_times;
mgr->stat_val[KEY_KEEPER_HIT]=keeper_stat.cache_hit;
mgr->stat_val[KEY_KEEPER_CACHE_SIZE]=keeper_stat.cached_num;
for(i=0;i<SSL_STAT_MAX;i++) for(i=0;i<SSL_STAT_MAX;i++)
{ {
FS_operate(mgr->fs_handle, mgr->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(mgr->stat_val[i]))); FS_operate(mgr->fs_handle, mgr->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(mgr->stat_val[i])));
@@ -283,6 +292,9 @@ void ssl_stat_init(struct ssl_mgr * mgr)
spec[SSL_NO_CHELLO]="ssl_no_chlo"; spec[SSL_NO_CHELLO]="ssl_no_chlo";
spec[SSL_NO_SNI]="ssl_no_sni"; spec[SSL_NO_SNI]="ssl_no_sni";
spec[SSL_FAKE_CRT]="ssl_fk_crt"; spec[SSL_FAKE_CRT]="ssl_fk_crt";
spec[KEY_KEEPER_ASK]="kyr_ask";
spec[KEY_KEEPER_HIT]="kyr_hit";
spec[KEY_KEEPER_CACHE_SIZE]="kyr_cache";
for(i=0;i<SSL_STAT_MAX;i++) for(i=0;i<SSL_STAT_MAX;i++)
{ {
@@ -1034,6 +1046,7 @@ static void peek_chello_on_succ(future_result_t * result, void * user)
{ {
struct promise * p = (struct promise *) user; struct promise * p = (struct promise *) user;
struct ssl_connect_server_ctx * ctx = (struct ssl_connect_server_ctx *) promise_get_ctx(p); struct ssl_connect_server_ctx * ctx = (struct ssl_connect_server_ctx *) promise_get_ctx(p);
struct event_base* evbase=tfe_proxy_get_work_thread_evbase(ctx->thread_id);
struct ssl_chello * chello = ssl_peek_result_release_chello(result);//chello has been saved in ssl_stream. struct ssl_chello * chello = ssl_peek_result_release_chello(result);//chello has been saved in ssl_stream.
if(chello->sni==NULL) if(chello->sni==NULL)
@@ -1042,7 +1055,7 @@ static void peek_chello_on_succ(future_result_t * result, void * user)
} }
clock_gettime(CLOCK_MONOTONIC, &(ctx->start)); clock_gettime(CLOCK_MONOTONIC, &(ctx->start));
ctx->s_stream = ssl_stream_new(ctx->mgr, ctx->fd_upstream, CONN_DIR_UPSTREAM, chello, NULL, NULL); ctx->s_stream = ssl_stream_new(ctx->mgr, ctx->fd_upstream, CONN_DIR_UPSTREAM, chello, NULL, NULL);
ctx->bev = bufferevent_openssl_socket_new(ctx->evbase, ctx->fd_upstream, ctx->bev = bufferevent_openssl_socket_new(evbase, ctx->fd_upstream,
ctx->s_stream->ssl, BUFFEREVENT_SSL_CONNECTING, BEV_OPT_DEFER_CALLBACKS); ctx->s_stream->ssl, BUFFEREVENT_SSL_CONNECTING, BEV_OPT_DEFER_CALLBACKS);
bufferevent_openssl_set_allow_dirty_shutdown(ctx->bev, 1); bufferevent_openssl_set_allow_dirty_shutdown(ctx->bev, 1);
@@ -1065,7 +1078,7 @@ static void peek_chello_on_fail(enum e_future_error err, const char * what, void
} }
void ssl_async_upstream_create(struct future * f, struct ssl_mgr * mgr, evutil_socket_t fd_upstream, void ssl_async_upstream_create(struct future * f, struct ssl_mgr * mgr, evutil_socket_t fd_upstream,
evutil_socket_t fd_downstream, struct event_base * evbase) evutil_socket_t fd_downstream, unsigned int thread_id)
{ {
struct promise * p = future_to_promise(f); struct promise * p = future_to_promise(f);
struct ssl_connect_server_ctx * ctx = ALLOC(struct ssl_connect_server_ctx, 1); struct ssl_connect_server_ctx * ctx = ALLOC(struct ssl_connect_server_ctx, 1);
@@ -1079,10 +1092,10 @@ void ssl_async_upstream_create(struct future * f, struct ssl_mgr * mgr, evutil_s
promise_failed(p, FUTURE_ERROR_EXCEPTION, "upstream fd closed"); promise_failed(p, FUTURE_ERROR_EXCEPTION, "upstream fd closed");
return; return;
} }
struct event_base* evbase=tfe_proxy_get_work_thread_evbase(thread_id);
ctx->fd_downstream = fd_downstream; ctx->fd_downstream = fd_downstream;
ctx->fd_upstream = fd_upstream; ctx->fd_upstream = fd_upstream;
ctx->evbase = evbase; ctx->thread_id = thread_id;
ctx->mgr = mgr; ctx->mgr = mgr;
promise_set_ctx(p, ctx, wrap_ssl_connect_server_ctx_free); promise_set_ctx(p, ctx, wrap_ssl_connect_server_ctx_free);
@@ -1498,16 +1511,14 @@ void ask_keyring_on_succ(void * result, void * user)
struct keyring * kyr = NULL; struct keyring * kyr = NULL;
struct ssl_mgr * mgr = ctx->ssl_mgr; struct ssl_mgr * mgr = ctx->ssl_mgr;
struct event_base* evbase=tfe_proxy_get_work_thread_evbase(ctx->thread_id);
kyr = key_keeper_release_keyring(result); //kyr will be freed at ssl downstream closing. kyr = key_keeper_release_keyring(result); //kyr will be freed at ssl downstream closing.
clock_gettime(CLOCK_MONOTONIC, &(ctx->start)); clock_gettime(CLOCK_MONOTONIC, &(ctx->start));
ctx->downstream = ssl_stream_new(mgr, ctx->fd_downstream, CONN_DIR_DOWNSTREAM, NULL, kyr, ctx->downstream = ssl_stream_new(mgr, ctx->fd_downstream, CONN_DIR_DOWNSTREAM, NULL, kyr,
ctx->origin_ssl?ctx->origin_ssl->alpn_selected:NULL); ctx->origin_ssl?ctx->origin_ssl->alpn_selected:NULL);
ctx->bev_down = bufferevent_openssl_socket_new(ctx->evbase, ctx->fd_downstream, ctx->downstream->ssl, ctx->bev_down = bufferevent_openssl_socket_new(evbase, ctx->fd_downstream, ctx->downstream->ssl,
BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS); BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_DEFER_CALLBACKS);
bufferevent_openssl_set_allow_dirty_shutdown(ctx->bev_down, 1); bufferevent_openssl_set_allow_dirty_shutdown(ctx->bev_down, 1);
@@ -1532,7 +1543,7 @@ void ask_keyring_on_fail(enum e_future_error error, const char * what, void * us
* Create a SSL stream for the incoming connection, based on the upstream. * Create a SSL stream for the incoming connection, based on the upstream.
*/ */
void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct ssl_stream * upstream, void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct ssl_stream * upstream,
evutil_socket_t fd_downstream, int keyring_id, struct event_base * evbase) evutil_socket_t fd_downstream, int keyring_id, unsigned int thread_id)
{ {
assert(upstream->dir == CONN_DIR_UPSTREAM); assert(upstream->dir == CONN_DIR_UPSTREAM);
@@ -1541,7 +1552,10 @@ void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct
ctx->keyring_id = keyring_id; ctx->keyring_id = keyring_id;
ctx->ssl_mgr = mgr; ctx->ssl_mgr = mgr;
ctx->fd_downstream = fd_downstream; ctx->fd_downstream = fd_downstream;
ctx->evbase = evbase; ctx->thread_id = thread_id;
struct event_base * evbase=tfe_proxy_get_work_thread_evbase(thread_id);
struct evdns_base* dnsbase=tfe_proxy_get_work_thread_dnsbase(thread_id);
if (upstream != NULL) if (upstream != NULL)
{ {
ctx->origin_ssl = upstream; ctx->origin_ssl = upstream;
@@ -1555,7 +1569,7 @@ void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct
ctx->f_ask_keyring = future_create("ask_kyr",ask_keyring_on_succ, ask_keyring_on_fail, p); ctx->f_ask_keyring = future_create("ask_kyr",ask_keyring_on_succ, ask_keyring_on_fail, p);
ctx->is_origin_crt_verify_passed = upstream->is_peer_cert_verify_passed; ctx->is_origin_crt_verify_passed = upstream->is_peer_cert_verify_passed;
key_keeper_async_ask(ctx->f_ask_keyring, mgr->key_keeper, sni, keyring_id, ctx->origin_crt, ctx->is_origin_crt_verify_passed, key_keeper_async_ask(ctx->f_ask_keyring, mgr->key_keeper, sni, keyring_id, ctx->origin_crt, ctx->is_origin_crt_verify_passed,
evbase); evbase, dnsbase);
return; return;
} }

View File

@@ -763,7 +763,6 @@ void ssl_downstream_create_on_fail(enum e_future_error err, const char * what, v
void ssl_upstream_create_on_success(future_result_t * result, void * user) void ssl_upstream_create_on_success(future_result_t * result, void * user)
{ {
struct tfe_stream_private * _stream = (struct tfe_stream_private *) user; struct tfe_stream_private * _stream = (struct tfe_stream_private *) user;
struct event_base * ev_base = _stream->thread_ref->evbase;
struct ssl_stream * upstream = ssl_upstream_create_result_release_stream(result); struct ssl_stream * upstream = ssl_upstream_create_result_release_stream(result);
struct bufferevent * bev = ssl_upstream_create_result_release_bev(result); struct bufferevent * bev = ssl_upstream_create_result_release_bev(result);
@@ -785,7 +784,7 @@ void ssl_upstream_create_on_success(future_result_t * result, void * user)
ssl_downstream_create_on_fail, _stream); ssl_downstream_create_on_fail, _stream);
ssl_async_downstream_create(_stream->future_downstream_create, _stream->ssl_mgr, ssl_async_downstream_create(_stream->future_downstream_create, _stream->ssl_mgr,
_stream->ssl_upstream, _stream->defer_fd_downstream, _stream->keyring_id, ev_base); _stream->ssl_upstream, _stream->defer_fd_downstream, _stream->keyring_id, _stream->thread_ref->thread_id);
} }
void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, void * user) void ssl_upstream_create_on_fail(enum e_future_error err, const char * what, void * user)
@@ -1021,7 +1020,6 @@ void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket
int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream) int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream)
{ {
struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head); struct tfe_stream_private * _stream = container_of(stream, struct tfe_stream_private, head);
struct event_base * ev_base = _stream->thread_ref->evbase;
_stream->defer_fd_downstream = fd_downstream; _stream->defer_fd_downstream = fd_downstream;
_stream->defer_fd_upstream = fd_upstream; _stream->defer_fd_upstream = fd_upstream;
@@ -1074,7 +1072,7 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst
/* Defer setup conn_downstream & conn_upstream in async callbacks. */ /* Defer setup conn_downstream & conn_upstream in async callbacks. */
ssl_async_upstream_create(_stream->future_upstream_create, ssl_async_upstream_create(_stream->future_upstream_create,
_stream->ssl_mgr, fd_upstream, fd_downstream, ev_base); _stream->ssl_mgr, fd_upstream, fd_downstream, _stream->thread_ref->thread_id);
TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_SSL, 1); TFE_PROXY_STAT_INCREASE(STAT_STREAM_TCP_SSL, 1);
} }

View File

@@ -27,7 +27,7 @@ int main()
printf("-------------------------------\n"); printf("-------------------------------\n");
int i = 0; int i = 0;
printf("call key_keeper_async_ask, i = %d\n", i); printf("call key_keeper_async_ask, i = %d\n", i);
key_keeper_async_ask(f, keeper, "www.baidu.com", 1, origin_cert, 1, NULL); key_keeper_async_ask(f, keeper, "www.baidu.com", 1, origin_cert, 1, NULL, NULL);
X509_free(origin_cert); X509_free(origin_cert);
key_keeper_destroy(keeper); key_keeper_destroy(keeper);
future_destroy(f); future_destroy(f);

View File

@@ -234,12 +234,13 @@ int main()
MESA_load_profile_string_def(profile, section, "cert_store_host", cert_store_host, sizeof(cert_store_host), "xxxxx"); MESA_load_profile_string_def(profile, section, "cert_store_host", cert_store_host, sizeof(cert_store_host), "xxxxx");
MESA_load_profile_uint_def(profile, section, "cert_store_port", &cert_store_port, 80); MESA_load_profile_uint_def(profile, section, "cert_store_port", &cert_store_port, 80);
struct event_base* evbase = event_base_new(); struct event_base* evbase = event_base_new();
struct evdns_base* dnsbase=evdns_base_new(evbase, EVDNS_BASE_INITIALIZE_NAMESERVERS);
struct future* f_tfe_rpc = future_create("tfe_rpc", tfe_rpc_on_succ, tfe_rpc_on_fail, NULL); struct future* f_tfe_rpc = future_create("tfe_rpc", tfe_rpc_on_succ, tfe_rpc_on_fail, NULL);
char url[TFE_STRING_MAX]; char url[TFE_STRING_MAX];
char sni[TFE_STRING_MAX] = "www.baidu.com"; char sni[TFE_STRING_MAX] = "www.baidu.com";
snprintf(url, TFE_STRING_MAX, "http://%s:%d/ca?host=%s&flag=1&valid=1&kering_id=%d", cert_store_host, cert_store_port, sni, keyring_id); snprintf(url, TFE_STRING_MAX, "http://%s:%d/ca?host=%s&flag=1&valid=1&kering_id=%d", cert_store_host, cert_store_port, sni, keyring_id);
printf("url is %s\n", url); printf("url is %s\n", url);
tfe_rpc_async_ask(f_tfe_rpc, url, GET, DONE_CB, NULL, 0, evbase); tfe_rpc_async_ask(f_tfe_rpc, url, GET, DONE_CB, NULL, 0, evbase, dnsbase);
event_base_dispatch(evbase); event_base_dispatch(evbase);
} }