diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 4c9e084..c1fa512 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -137,7 +137,7 @@ void tfe_proxy_free(tfe_proxy * ctx) static void __dummy_event_handler(evutil_socket_t fd, short what, void * arg) { - printf("%s alive\n",__FUNCTION__); + //printf("%s alive\n",__FUNCTION__); return; } @@ -191,7 +191,7 @@ static void * tfe_work_thread(void * arg) ctx->running = 1; __currect_thread_id = ctx->thread_id; char thread_name[16]; - snprintf(thread_name, sizeof(thread_name), "tfe:worker%d", ctx->thread_id); + snprintf(thread_name, sizeof(thread_name), "tfe:worker-%d", ctx->thread_id); prctl(PR_SET_NAME,(unsigned long long)thread_name,NULL,NULL,NULL); TFE_LOG_INFO(g_default_logger, "Work thread %u is running...", ctx->thread_id); diff --git a/platform/src/ssl_stream.cpp b/platform/src/ssl_stream.cpp index d690468..6a17ec1 100644 --- a/platform/src/ssl_stream.cpp +++ b/platform/src/ssl_stream.cpp @@ -854,7 +854,15 @@ void ssl_stream_log_error(struct bufferevent * bev, enum tfe_conn_dir dir, void* unsigned long sslerr=0; int fd=bufferevent_getfd(bev); struct tfe_stream_addr* addr=tfe_stream_addr_create_by_fd(fd, dir); - char* addr_string=tfe_stream_addr_to_str(addr); + char* addr_string=NULL; + if(addr) + { + addr_string=tfe_stream_addr_to_str(addr); + } + else + { + addr_string=tfe_strdup("null"); + } /* Can happen for socket errs, ssl errs; * may happen for unclean ssl socket shutdowns. */ @@ -1046,7 +1054,12 @@ extern void ssl_async_upstream_create(struct future * f, struct ssl_mgr * mgr, e ctx->addrlen = sizeof(ctx->addr); ret = getpeername(fd_upstream, (struct sockaddr *)&(ctx->addr), &(ctx->addrlen)); - assert(ret == 0); + if(ret!=0) + { + ssl_connect_server_ctx_free(ctx); + promise_failed(p, FUTURE_ERROR_EXCEPTION, "upstream fd closed"); + return; + } ctx->fd_downstream = fd_downstream; ctx->fd_upstream = fd_upstream; diff --git a/plugin/business/pangu-http/src/pangu_web_cache.cpp b/plugin/business/pangu-http/src/pangu_web_cache.cpp index 1a28b01..a3112eb 100644 --- a/plugin/business/pangu-http/src/pangu_web_cache.cpp +++ b/plugin/business/pangu-http/src/pangu_web_cache.cpp @@ -17,18 +17,22 @@ enum cache_stat_field { STAT_CACHE_QUERY, STAT_CACHE_QUERY_NOT_APPLICABLE, + STAT_CACHE_QUERY_VERIFY, STAT_CACHE_QUERY_HIT, STAT_CACHE_QUERY_BYTES, STAT_CACHE_OVERRIDE_QUERY, STAT_CACHE_OVERRIDE_HIT, STAT_CACHE_OVERRIDE_BYTES, STAT_CACHE_QUERY_ERR, + STAT_CACHE_QUERY_ABANDON, + STAT_CACHE_QUERYING, STAT_CACHE_UPLOAD_CNT, STAT_CACHE_UPLOAD_OVERRIDE, STAT_CACHE_UPLOAD_FORBIDEN, STAT_CACHE_UPLOAD_ABANDON, STAT_CACHE_UPLOAD_ERR, STAT_CACHE_UPLOAD_BYTES, + STAT_CACHE_UPLOADING, STAT_CACHE_MEMORY, STAT_CACHE_ACTIVE_SESSION, @@ -46,7 +50,10 @@ struct cache_handle size_t cache_undefined_obj_min_size; int minimum_cache_seconds; struct tango_cache_instance **clients; - + + long long get_concurrency_max; + long long put_concurrency_max; + screen_stat_handle_t fs_handle; long long stat_val[__CACHE_STAT_MAX]; int fs_id[__CACHE_STAT_MAX]; @@ -144,18 +151,22 @@ void cache_stat_init(struct cache_handle* cache) set_stat_spec(&spec[STAT_CACHE_QUERY], "cache_get",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_NOT_APPLICABLE], "get_not_allow",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_QUERY_VERIFY], "get_verify",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_HIT], "hit_num",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_BYTES], "downloaded(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_QUERY], "or_qry",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT], "or_hit",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_BYTES], "or_download(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_ERR], "get_err",FS_STYLE_STATUS, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_QUERY_ABANDON], "get_abandon",FS_STYLE_STATUS, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_QUERYING], "getting",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_CNT], "cache_put",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_OVERRIDE], "or_put",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_FORBIDEN], "put_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_ABANDON], "put_abandon",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_ERR], "put_err",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_BYTES], "put(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_UPLOADING], "putting",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_MEMORY], "used_mem(MB)",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION], "active_sess",FS_STYLE_STATUS, FS_CALC_CURRENT); @@ -215,6 +226,11 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha goto error_out; } } + + MESA_load_profile_int_def(profile_path, section, "get_concurrency_max", &temp, 1000*1000); + cache->put_concurrency_max=temp; + MESA_load_profile_int_def(profile_path, section, "put_concurrency_max", &(temp), 1000*1000); + cache->put_concurrency_max=temp; MESA_load_profile_int_def(profile_path, section, "query_undefined_obj", &(cache->query_undefined_obj_enabled), 1); MESA_load_profile_int_def(profile_path, section, "cache_undefined_obj", &(cache->cache_undefined_obj_enabled), 1); MESA_load_profile_int_def(profile_path, section, "cached_undefined_obj_minimum_size", &(temp), 100*1024); @@ -334,6 +350,7 @@ static void wrap_cache_query_on_succ(future_result_t * result, void * user) //last call. promise_dettach_ctx(p); cache_query_ctx_free_cb(ctx); + ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERYING])); break; case RESULT_TYPE_BODY: ATOMIC_ADD(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_BYTES]), _result->size); @@ -349,7 +366,8 @@ static void wrap_cache_query_on_fail(enum e_future_error err, const char * what, { struct promise * p = (struct promise *) user; struct cache_query_context* ctx=(struct cache_query_context*)promise_dettach_ctx(p); - promise_failed(p, err, what); + promise_failed(p, err, what); + ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERYING])); cache_query_ctx_free_cb(ctx); return; } @@ -375,17 +393,24 @@ enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsig is_undefined_obj=1; ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_QUERY])); break; + case VERIFY: + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_VERIFY])); + return WEB_CACHE_NOT_APPLICABLE; case FORBIDDEN: ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_NOT_APPLICABLE])); return WEB_CACHE_NOT_APPLICABLE; - case VERIFY: case ALLOWED: break; default: assert(0); return WEB_CACHE_NOT_APPLICABLE; } - + if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_QUERYING])) > ATOMIC_READ(&(handle->put_concurrency_max))) + { + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_ABANDON])); + return WEB_CACHE_NOT_APPLICABLE; + } + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERYING])); struct tango_cache_meta_get meta; memset(&meta, 0, sizeof(meta)); meta.url=request->req_spec.url; @@ -394,6 +419,7 @@ enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsig query_ctx->ref_handle=handle; query_ctx->url=tfe_strdup(request->req_spec.url); query_ctx->is_undefined_obj=is_undefined_obj; + p=future_to_promise(f); promise_set_ctx(p, query_ctx, cache_query_ctx_free_cb); query_ctx->f_tango_cache_fetch=future_create("_cache_get", wrap_cache_query_on_succ, wrap_cache_query_on_fail, p); @@ -465,6 +491,12 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle, assert(0); break; } + if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_UPLOADING])) > handle->get_concurrency_max) + { + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_ABANDON])); + return NULL; + } + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOADING])); struct tango_cache_meta_put meta; memset(&meta, 0, sizeof(meta)); meta.url=session->req->req_spec.url; @@ -508,6 +540,7 @@ void web_cache_update_end(struct cache_update_context* ctx) { tango_cache_update_end(ctx->write_ctx); free(ctx); + ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOADING])); return; }