增加缓存并发查询和更新的统计,并增加熔断机制。

This commit is contained in:
zhengchao
2018-10-21 20:09:23 +08:00
parent c5f5ee2655
commit 9290dd0e0f
3 changed files with 54 additions and 8 deletions

View File

@@ -137,7 +137,7 @@ void tfe_proxy_free(tfe_proxy * ctx)
static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg)
{ {
printf("%s alive\n",__FUNCTION__); //printf("%s alive\n",__FUNCTION__);
return; return;
} }
@@ -191,7 +191,7 @@ static void * tfe_work_thread(void * arg)
ctx->running = 1; ctx->running = 1;
__currect_thread_id = ctx->thread_id; __currect_thread_id = ctx->thread_id;
char thread_name[16]; char thread_name[16];
snprintf(thread_name, sizeof(thread_name), "tfe:worker%d", ctx->thread_id); snprintf(thread_name, sizeof(thread_name), "tfe:worker-%d", ctx->thread_id);
prctl(PR_SET_NAME,(unsigned long long)thread_name,NULL,NULL,NULL); prctl(PR_SET_NAME,(unsigned long long)thread_name,NULL,NULL,NULL);
TFE_LOG_INFO(g_default_logger, "Work thread %u is running...", ctx->thread_id); TFE_LOG_INFO(g_default_logger, "Work thread %u is running...", ctx->thread_id);

View File

@@ -854,7 +854,15 @@ void ssl_stream_log_error(struct bufferevent * bev, enum tfe_conn_dir dir, void*
unsigned long sslerr=0; unsigned long sslerr=0;
int fd=bufferevent_getfd(bev); int fd=bufferevent_getfd(bev);
struct tfe_stream_addr* addr=tfe_stream_addr_create_by_fd(fd, dir); struct tfe_stream_addr* addr=tfe_stream_addr_create_by_fd(fd, dir);
char* addr_string=tfe_stream_addr_to_str(addr); char* addr_string=NULL;
if(addr)
{
addr_string=tfe_stream_addr_to_str(addr);
}
else
{
addr_string=tfe_strdup("null");
}
/* Can happen for socket errs, ssl errs; /* Can happen for socket errs, ssl errs;
* may happen for unclean ssl socket shutdowns. */ * may happen for unclean ssl socket shutdowns. */
@@ -1046,7 +1054,12 @@ extern void ssl_async_upstream_create(struct future * f, struct ssl_mgr * mgr, e
ctx->addrlen = sizeof(ctx->addr); ctx->addrlen = sizeof(ctx->addr);
ret = getpeername(fd_upstream, (struct sockaddr *)&(ctx->addr), &(ctx->addrlen)); ret = getpeername(fd_upstream, (struct sockaddr *)&(ctx->addr), &(ctx->addrlen));
assert(ret == 0); if(ret!=0)
{
ssl_connect_server_ctx_free(ctx);
promise_failed(p, FUTURE_ERROR_EXCEPTION, "upstream fd closed");
return;
}
ctx->fd_downstream = fd_downstream; ctx->fd_downstream = fd_downstream;
ctx->fd_upstream = fd_upstream; ctx->fd_upstream = fd_upstream;

View File

@@ -17,18 +17,22 @@ enum cache_stat_field
{ {
STAT_CACHE_QUERY, STAT_CACHE_QUERY,
STAT_CACHE_QUERY_NOT_APPLICABLE, STAT_CACHE_QUERY_NOT_APPLICABLE,
STAT_CACHE_QUERY_VERIFY,
STAT_CACHE_QUERY_HIT, STAT_CACHE_QUERY_HIT,
STAT_CACHE_QUERY_BYTES, STAT_CACHE_QUERY_BYTES,
STAT_CACHE_OVERRIDE_QUERY, STAT_CACHE_OVERRIDE_QUERY,
STAT_CACHE_OVERRIDE_HIT, STAT_CACHE_OVERRIDE_HIT,
STAT_CACHE_OVERRIDE_BYTES, STAT_CACHE_OVERRIDE_BYTES,
STAT_CACHE_QUERY_ERR, STAT_CACHE_QUERY_ERR,
STAT_CACHE_QUERY_ABANDON,
STAT_CACHE_QUERYING,
STAT_CACHE_UPLOAD_CNT, STAT_CACHE_UPLOAD_CNT,
STAT_CACHE_UPLOAD_OVERRIDE, STAT_CACHE_UPLOAD_OVERRIDE,
STAT_CACHE_UPLOAD_FORBIDEN, STAT_CACHE_UPLOAD_FORBIDEN,
STAT_CACHE_UPLOAD_ABANDON, STAT_CACHE_UPLOAD_ABANDON,
STAT_CACHE_UPLOAD_ERR, STAT_CACHE_UPLOAD_ERR,
STAT_CACHE_UPLOAD_BYTES, STAT_CACHE_UPLOAD_BYTES,
STAT_CACHE_UPLOADING,
STAT_CACHE_MEMORY, STAT_CACHE_MEMORY,
STAT_CACHE_ACTIVE_SESSION, STAT_CACHE_ACTIVE_SESSION,
@@ -47,6 +51,9 @@ struct cache_handle
int minimum_cache_seconds; int minimum_cache_seconds;
struct tango_cache_instance **clients; struct tango_cache_instance **clients;
long long get_concurrency_max;
long long put_concurrency_max;
screen_stat_handle_t fs_handle; screen_stat_handle_t fs_handle;
long long stat_val[__CACHE_STAT_MAX]; long long stat_val[__CACHE_STAT_MAX];
int fs_id[__CACHE_STAT_MAX]; int fs_id[__CACHE_STAT_MAX];
@@ -144,18 +151,22 @@ void cache_stat_init(struct cache_handle* cache)
set_stat_spec(&spec[STAT_CACHE_QUERY], "cache_get",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY], "cache_get",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_QUERY_NOT_APPLICABLE], "get_not_allow",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_NOT_APPLICABLE], "get_not_allow",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_QUERY_VERIFY], "get_verify",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_QUERY_HIT], "hit_num",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_HIT], "hit_num",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_QUERY_BYTES], "downloaded(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_BYTES], "downloaded(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_QUERY], "or_qry",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_QUERY], "or_qry",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT], "or_hit",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT], "or_hit",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_BYTES], "or_download(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_BYTES], "or_download(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_QUERY_ERR], "get_err",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_ERR], "get_err",FS_STYLE_STATUS, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_QUERY_ABANDON], "get_abandon",FS_STYLE_STATUS, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_QUERYING], "getting",FS_STYLE_STATUS, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_UPLOAD_CNT], "cache_put",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_CNT], "cache_put",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_UPLOAD_OVERRIDE], "or_put",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_OVERRIDE], "or_put",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_UPLOAD_FORBIDEN], "put_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_FORBIDEN], "put_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_UPLOAD_ABANDON], "put_abandon",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_ABANDON], "put_abandon",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_UPLOAD_ERR], "put_err",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_ERR], "put_err",FS_STYLE_STATUS, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_UPLOAD_BYTES], "put(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_BYTES], "put(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_UPLOADING], "putting",FS_STYLE_STATUS, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_MEMORY], "used_mem(MB)",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_MEMORY], "used_mem(MB)",FS_STYLE_STATUS, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION], "active_sess",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION], "active_sess",FS_STYLE_STATUS, FS_CALC_CURRENT);
@@ -215,6 +226,11 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha
goto error_out; goto error_out;
} }
} }
MESA_load_profile_int_def(profile_path, section, "get_concurrency_max", &temp, 1000*1000);
cache->put_concurrency_max=temp;
MESA_load_profile_int_def(profile_path, section, "put_concurrency_max", &(temp), 1000*1000);
cache->put_concurrency_max=temp;
MESA_load_profile_int_def(profile_path, section, "query_undefined_obj", &(cache->query_undefined_obj_enabled), 1); MESA_load_profile_int_def(profile_path, section, "query_undefined_obj", &(cache->query_undefined_obj_enabled), 1);
MESA_load_profile_int_def(profile_path, section, "cache_undefined_obj", &(cache->cache_undefined_obj_enabled), 1); MESA_load_profile_int_def(profile_path, section, "cache_undefined_obj", &(cache->cache_undefined_obj_enabled), 1);
MESA_load_profile_int_def(profile_path, section, "cached_undefined_obj_minimum_size", &(temp), 100*1024); MESA_load_profile_int_def(profile_path, section, "cached_undefined_obj_minimum_size", &(temp), 100*1024);
@@ -334,6 +350,7 @@ static void wrap_cache_query_on_succ(future_result_t * result, void * user)
//last call. //last call.
promise_dettach_ctx(p); promise_dettach_ctx(p);
cache_query_ctx_free_cb(ctx); cache_query_ctx_free_cb(ctx);
ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERYING]));
break; break;
case RESULT_TYPE_BODY: case RESULT_TYPE_BODY:
ATOMIC_ADD(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_BYTES]), _result->size); ATOMIC_ADD(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_BYTES]), _result->size);
@@ -350,6 +367,7 @@ static void wrap_cache_query_on_fail(enum e_future_error err, const char * what,
struct promise * p = (struct promise *) user; struct promise * p = (struct promise *) user;
struct cache_query_context* ctx=(struct cache_query_context*)promise_dettach_ctx(p); struct cache_query_context* ctx=(struct cache_query_context*)promise_dettach_ctx(p);
promise_failed(p, err, what); promise_failed(p, err, what);
ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERYING]));
cache_query_ctx_free_cb(ctx); cache_query_ctx_free_cb(ctx);
return; return;
} }
@@ -375,17 +393,24 @@ enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsig
is_undefined_obj=1; is_undefined_obj=1;
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_QUERY])); ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_QUERY]));
break; break;
case VERIFY:
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_VERIFY]));
return WEB_CACHE_NOT_APPLICABLE;
case FORBIDDEN: case FORBIDDEN:
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_NOT_APPLICABLE])); ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_NOT_APPLICABLE]));
return WEB_CACHE_NOT_APPLICABLE; return WEB_CACHE_NOT_APPLICABLE;
case VERIFY:
case ALLOWED: case ALLOWED:
break; break;
default: default:
assert(0); assert(0);
return WEB_CACHE_NOT_APPLICABLE; return WEB_CACHE_NOT_APPLICABLE;
} }
if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_QUERYING])) > ATOMIC_READ(&(handle->put_concurrency_max)))
{
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_ABANDON]));
return WEB_CACHE_NOT_APPLICABLE;
}
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERYING]));
struct tango_cache_meta_get meta; struct tango_cache_meta_get meta;
memset(&meta, 0, sizeof(meta)); memset(&meta, 0, sizeof(meta));
meta.url=request->req_spec.url; meta.url=request->req_spec.url;
@@ -394,6 +419,7 @@ enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsig
query_ctx->ref_handle=handle; query_ctx->ref_handle=handle;
query_ctx->url=tfe_strdup(request->req_spec.url); query_ctx->url=tfe_strdup(request->req_spec.url);
query_ctx->is_undefined_obj=is_undefined_obj; query_ctx->is_undefined_obj=is_undefined_obj;
p=future_to_promise(f); p=future_to_promise(f);
promise_set_ctx(p, query_ctx, cache_query_ctx_free_cb); promise_set_ctx(p, query_ctx, cache_query_ctx_free_cb);
query_ctx->f_tango_cache_fetch=future_create("_cache_get", wrap_cache_query_on_succ, wrap_cache_query_on_fail, p); query_ctx->f_tango_cache_fetch=future_create("_cache_get", wrap_cache_query_on_succ, wrap_cache_query_on_fail, p);
@@ -465,6 +491,12 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
assert(0); assert(0);
break; break;
} }
if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_UPLOADING])) > handle->get_concurrency_max)
{
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_ABANDON]));
return NULL;
}
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOADING]));
struct tango_cache_meta_put meta; struct tango_cache_meta_put meta;
memset(&meta, 0, sizeof(meta)); memset(&meta, 0, sizeof(meta));
meta.url=session->req->req_spec.url; meta.url=session->req->req_spec.url;
@@ -508,6 +540,7 @@ void web_cache_update_end(struct cache_update_context* ctx)
{ {
tango_cache_update_end(ctx->write_ctx); tango_cache_update_end(ctx->write_ctx);
free(ctx); free(ctx);
ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOADING]));
return; return;
} }