统一cache的动作名称:write、read、pend。
This commit is contained in:
@@ -24,36 +24,36 @@ extern "C"
|
||||
|
||||
enum cache_stat_field
|
||||
{
|
||||
STAT_CACHE_QUERY,
|
||||
STAT_CACHE_QUERY_FORBIDDEN,
|
||||
STAT_CACHE_QUERY_VERIFY,
|
||||
STAT_CACHE_QUERY_HIT,
|
||||
STAT_CACHE_QUERY_BYTES,
|
||||
STAT_CACHE_OVERRIDE_QUERY,
|
||||
STAT_CACHE_OVERRIDE_HIT,
|
||||
STAT_CACHE_OVERRIDE_BYTES,
|
||||
STAT_CACHE_QUERY_ERR,
|
||||
STAT_CACHE_QUERY_ABANDON,
|
||||
STAT_CACHE_QUERYING,
|
||||
STAT_CACHE_READ,
|
||||
STAT_CACHE_PEND_FORBIDDEN,
|
||||
STAT_CACHE_READ_VERIFY,
|
||||
STAT_CACHE_READ_HIT,
|
||||
STAT_CACHE_READ_BYTES,
|
||||
STAT_CACHE_OVERRIDE_READ,
|
||||
STAT_CACHE_OVERRIDE_READ_HIT,
|
||||
STAT_CACHE_OVERRIDE_READ_BYTES,
|
||||
STAT_CACHE_READ_ERR,
|
||||
STAT_CACHE_READ_THROTTLE,
|
||||
STAT_CACHE_READING,
|
||||
STAT_CACHE_PENDING,
|
||||
STAT_CACHE_POLICY_MATCH,
|
||||
STAT_CACHE_POLICY_EFFECT,
|
||||
STAT_CACHE_UPLOAD_CNT,
|
||||
STAT_CACHE_UPLOAD_BYPASS,
|
||||
STAT_CACHE_UPLOAD_OVERRIDE,
|
||||
STAT_CACHE_UPLOAD_FORBIDEN,
|
||||
STAT_CACHE_UPLOAD_ABANDON,
|
||||
STAT_CACHE_UPLOAD_CANCEL,
|
||||
STAT_CACHE_UPLOAD_ERR,
|
||||
STAT_CACHE_UPLOAD_BYTES,
|
||||
STAT_CACHE_UPLOADING,
|
||||
STAT_CACHE_WRITE_CNT,
|
||||
STAT_CACHE_WRITE_BYPASS,
|
||||
STAT_CACHE_OVERRIDE_WRITE,
|
||||
STAT_CACHE_WRITE_FORBIDEN,
|
||||
STAT_CACHE_WRITE_THROTTLE,
|
||||
STAT_CACHE_WRITE_CANCEL,
|
||||
STAT_CACHE_WRITE_ERR,
|
||||
STAT_CACHE_WRITE_BYTES,
|
||||
STAT_CACHE_WRITING,
|
||||
STAT_CACHE_MEMORY,
|
||||
STAT_CACHE_ACTIVE_SESSION,
|
||||
|
||||
STAT_CACHE_QUERY_HIT_OJB_SIZE,
|
||||
STAT_CACHE_UPLOAD_OBJ_SIZE,
|
||||
STAT_CACHE_WRITE_OBJ_SIZE,
|
||||
STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE,
|
||||
STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE,
|
||||
STAT_CACHE_OVERRIDE_WRITE_OBJ_SIZE,
|
||||
__CACHE_STAT_MAX
|
||||
};
|
||||
|
||||
@@ -153,9 +153,9 @@ static void web_cache_stat_cb(evutil_socket_t fd, short what, void * arg)
|
||||
{
|
||||
switch(i)
|
||||
{
|
||||
case STAT_CACHE_UPLOAD_BYTES:
|
||||
case STAT_CACHE_QUERY_BYTES:
|
||||
case STAT_CACHE_OVERRIDE_BYTES:
|
||||
case STAT_CACHE_WRITE_BYTES:
|
||||
case STAT_CACHE_READ_BYTES:
|
||||
case STAT_CACHE_OVERRIDE_READ_BYTES:
|
||||
//translate bytes to mega bytes.
|
||||
FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(cache->stat_val[i]))/(1024*1024));
|
||||
break;
|
||||
@@ -166,14 +166,14 @@ static void web_cache_stat_cb(evutil_socket_t fd, short what, void * arg)
|
||||
}
|
||||
}
|
||||
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY], 0, FS_OP_SET, client_stat_sum.get_recv_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY_HIT], 0, FS_OP_SET, client_stat_sum.get_succ_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY_ERR], 0, FS_OP_SET, client_stat_sum.get_error_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_CNT], 0, FS_OP_SET, client_stat_sum.put_recv_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_ERR], 0, FS_OP_SET, client_stat_sum.put_error_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ], 0, FS_OP_SET, client_stat_sum.get_recv_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_HIT], 0, FS_OP_SET, client_stat_sum.get_succ_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_ERR], 0, FS_OP_SET, client_stat_sum.get_error_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_CNT], 0, FS_OP_SET, client_stat_sum.put_recv_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_ERR], 0, FS_OP_SET, client_stat_sum.put_error_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_MEMORY], 0, FS_OP_SET, client_stat_sum.memory_used/(1024*1024));
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_ACTIVE_SESSION], 0, FS_OP_SET, client_stat_sum.session_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_ABANDON], 0, FS_OP_SET, client_stat_sum.totaldrop_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_THROTTLE], 0, FS_OP_SET, client_stat_sum.totaldrop_num);
|
||||
FS_passive_output(cache->fs_handle);
|
||||
return;
|
||||
}
|
||||
@@ -217,37 +217,37 @@ const char* statsd_server_ip, int statsd_server_port, const char*histogram_bins)
|
||||
|
||||
struct cache_stat_sepc spec[__CACHE_STAT_MAX];
|
||||
|
||||
set_stat_spec(&spec[STAT_CACHE_QUERY], "cache_get",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_QUERY_FORBIDDEN], "get_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_QUERY_VERIFY], "get_verify",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_QUERY_HIT], "hit_num",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_QUERY_BYTES], "downloaded(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_QUERY], "or_qry",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT], "or_hit",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_BYTES], "or_download(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_QUERY_ERR], "get_err",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_QUERY_ABANDON], "get_abandon",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_QUERYING], "getting",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_READ], "cache_read",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_PEND_FORBIDDEN], "read_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_READ_VERIFY], "read_verify",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_READ_HIT], "hit_num",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_READ_BYTES], "read(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_READ], "or_read",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_READ_HIT], "or_hit",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_READ_BYTES], "or_read(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_READ_ERR], "read_err",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_READ_THROTTLE], "read_throt",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_READING], "reading",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_PENDING], "pending",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_POLICY_MATCH], "policy_match",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_POLICY_EFFECT], "policy_effect",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_UPLOAD_CNT], "cache_put",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_UPLOAD_BYPASS], "cache_bypass",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_UPLOAD_OVERRIDE], "or_put",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_UPLOAD_FORBIDEN], "put_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_UPLOAD_ABANDON], "put_abandon",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_UPLOAD_CANCEL], "put_cancel",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_POLICY_MATCH], "ply_match",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_POLICY_EFFECT], "ply_effect",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_WRITE_CNT], "cache_write",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_WRITE_BYPASS], "write_bypass",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_WRITE], "or_write",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_WRITE_FORBIDEN], "write_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_WRITE_THROTTLE], "write_throt",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_WRITE_CANCEL], "write_cancel",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
|
||||
set_stat_spec(&spec[STAT_CACHE_UPLOAD_ERR], "put_err",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_UPLOAD_BYTES], "put(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_UPLOADING], "putting",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_MEMORY], "used_mem(MB)",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_WRITE_ERR], "write_err",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_WRITE_BYTES], "write(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_WRITING], "writing",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_MEMORY], "buffer(MB)",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION], "active_sess",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
|
||||
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE], "or_hit_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_QUERY_HIT_OJB_SIZE], "hitted_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_UPLOAD_OBJ_SIZE], "cached_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE], "or_cached(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_QUERY_HIT_OJB_SIZE], "hit_obj_sz(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_WRITE_OBJ_SIZE], "wr_obj_sz(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_WRITE_OBJ_SIZE], "or_obj_sz(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
|
||||
|
||||
|
||||
for(i=0;i<__CACHE_STAT_MAX;i++)
|
||||
@@ -261,27 +261,27 @@ const char* statsd_server_ip, int statsd_server_port, const char*histogram_bins)
|
||||
cache->fs_id[i]=FS_register(cache->fs_handle, spec[i].style, spec[i].calc_type, spec[i].name);
|
||||
}
|
||||
}
|
||||
// value=cache->fs_id[STAT_CACHE_QUERY_HIT];
|
||||
// value=cache->fs_id[STAT_CACHE_READ_HIT];
|
||||
// FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value));
|
||||
|
||||
FS_register_ratio(cache->fs_handle,
|
||||
cache->fs_id[STAT_CACHE_QUERY_HIT],
|
||||
cache->fs_id[STAT_CACHE_QUERY],
|
||||
cache->fs_id[STAT_CACHE_READ_HIT],
|
||||
cache->fs_id[STAT_CACHE_READ],
|
||||
1,
|
||||
FS_STYLE_STATUS,
|
||||
FS_CALC_CURRENT,
|
||||
"cache_hit");
|
||||
|
||||
value=cache->fs_id[STAT_CACHE_OVERRIDE_HIT];
|
||||
value=cache->fs_id[STAT_CACHE_OVERRIDE_READ_HIT];
|
||||
FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value));
|
||||
|
||||
FS_register_ratio(cache->fs_handle,
|
||||
cache->fs_id[STAT_CACHE_OVERRIDE_HIT],
|
||||
cache->fs_id[STAT_CACHE_OVERRIDE_QUERY],
|
||||
cache->fs_id[STAT_CACHE_OVERRIDE_READ_HIT],
|
||||
cache->fs_id[STAT_CACHE_OVERRIDE_READ],
|
||||
1,
|
||||
FS_STYLE_STATUS,
|
||||
FS_CALC_CURRENT,
|
||||
"override_hit");
|
||||
"or_hit");
|
||||
|
||||
|
||||
FS_start(cache->fs_handle);
|
||||
@@ -857,7 +857,7 @@ static void cache_query_obj_on_succ(future_result_t * result, void * user)
|
||||
switch(ctx->ref_tango_cache_result->type)
|
||||
{
|
||||
case RESULT_TYPE_HEADER:
|
||||
ATOMIC_INC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_HIT]));
|
||||
ATOMIC_INC(&(ctx->ref_handle->stat_val[STAT_CACHE_READ_HIT]));
|
||||
if(ctx->ref_mid->is_using_exception_param==1)
|
||||
{
|
||||
ATOMIC_INC(&(ctx->ref_handle->stat_val[STAT_CACHE_POLICY_EFFECT]));
|
||||
@@ -875,12 +875,12 @@ static void cache_query_obj_on_succ(future_result_t * result, void * user)
|
||||
//NOT break intentionally.
|
||||
case RESULT_TYPE_END:
|
||||
//last call.
|
||||
ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERYING]));
|
||||
ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_READING]));
|
||||
promise_dettach_ctx(p);
|
||||
last_call=1;
|
||||
break;
|
||||
case RESULT_TYPE_BODY:
|
||||
ATOMIC_ADD(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_BYTES]), ctx->ref_tango_cache_result->size);
|
||||
ATOMIC_ADD(&(ctx->ref_handle->stat_val[STAT_CACHE_READ_BYTES]), ctx->ref_tango_cache_result->size);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
@@ -895,7 +895,7 @@ static void cache_query_obj_on_fail(enum e_future_error err, const char * what,
|
||||
struct promise * p = (struct promise *) user;
|
||||
struct cache_query_context* ctx=(struct cache_query_context*)promise_dettach_ctx(p);
|
||||
promise_failed(p, err, what);
|
||||
ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERYING]));
|
||||
ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_READING]));
|
||||
cache_query_ctx_free_cb(ctx);
|
||||
return;
|
||||
}
|
||||
@@ -945,7 +945,7 @@ const struct cached_meta* cache_pending_result_read_meta(future_result_t * resul
|
||||
}
|
||||
}
|
||||
|
||||
static void cache_query_meta_on_succ(future_result_t * result, void * user)
|
||||
static void cache_read_meta_on_succ(future_result_t * result, void * user)
|
||||
{
|
||||
struct promise * p = (struct promise *) user;
|
||||
struct cache_pending_context* ctx=(struct cache_pending_context*)promise_get_ctx(p);
|
||||
@@ -982,7 +982,7 @@ static void cache_query_meta_on_succ(future_result_t * result, void * user)
|
||||
}
|
||||
|
||||
}
|
||||
static void cache_query_meta_on_fail(enum e_future_error err, const char * what, void * user)
|
||||
static void cache_read_meta_on_fail(enum e_future_error err, const char * what, void * user)
|
||||
{
|
||||
struct promise * p = (struct promise *) user;
|
||||
struct cache_pending_context* ctx=(struct cache_pending_context*)promise_dettach_ctx(p);
|
||||
@@ -1067,12 +1067,12 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u
|
||||
case UNDEFINED:
|
||||
if(!handle->query_undefined_obj_enabled)
|
||||
{
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_FORBIDDEN]));
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_PEND_FORBIDDEN]));
|
||||
result=PENDING_RESULT_FOBIDDEN;
|
||||
}
|
||||
else
|
||||
{
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_QUERY]));
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_READ]));
|
||||
result=PENDING_RESULT_ALLOWED;
|
||||
}
|
||||
break;
|
||||
@@ -1083,7 +1083,7 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u
|
||||
}
|
||||
else
|
||||
{
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_FORBIDDEN]));
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_PEND_FORBIDDEN]));
|
||||
result=PENDING_RESULT_FOBIDDEN;
|
||||
}
|
||||
break;
|
||||
@@ -1123,7 +1123,7 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u
|
||||
promise_set_ctx(p, ctx, cache_pending_ctx_free_cb);
|
||||
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_PENDING]));
|
||||
ctx->f_tango_cache_fetch=future_create("_cache_pend", cache_query_meta_on_succ, cache_query_meta_on_fail, p);
|
||||
ctx->f_tango_cache_fetch=future_create("_cache_pend", cache_read_meta_on_succ, cache_read_meta_on_fail, p);
|
||||
ret=tango_cache_head_object(handle->clients[thread_id], ctx->f_tango_cache_fetch, &meta);
|
||||
if(ret<0)
|
||||
{
|
||||
@@ -1136,7 +1136,7 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u
|
||||
|
||||
return _mid->result;
|
||||
}
|
||||
int web_cache_async_query(struct cache_handle* handle, unsigned int thread_id,
|
||||
int web_cache_async_read(struct cache_handle* handle, unsigned int thread_id,
|
||||
const struct tfe_http_half * request, struct cache_mid** mid, struct future* f)
|
||||
{
|
||||
struct cache_query_context* query_ctx=NULL;
|
||||
@@ -1144,9 +1144,9 @@ int web_cache_async_query(struct cache_handle* handle, unsigned int thread_id,
|
||||
struct cache_mid* _mid=*mid;
|
||||
assert(_mid->result!=PENDING_RESULT_FOBIDDEN);
|
||||
|
||||
if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_QUERYING])) > ATOMIC_READ(&(handle->put_concurrency_max)))
|
||||
if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_READING])) > ATOMIC_READ(&(handle->put_concurrency_max)))
|
||||
{
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_ABANDON]));
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_READ_THROTTLE]));
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -1163,7 +1163,7 @@ int web_cache_async_query(struct cache_handle* handle, unsigned int thread_id,
|
||||
promise_allow_many_successes(p);
|
||||
promise_set_ctx(p, query_ctx, cache_query_ctx_free_cb);
|
||||
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERYING]));
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_READING]));
|
||||
query_ctx->f_tango_cache_fetch=future_create("_cache_get", cache_query_obj_on_succ, cache_query_obj_on_fail, p);
|
||||
int ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta);
|
||||
if(ret<0)
|
||||
@@ -1186,20 +1186,20 @@ void wrap_cache_put_ctx_free(struct wrap_cache_put_ctx* ctx)
|
||||
future_destroy(ctx->f);
|
||||
free(ctx);
|
||||
}
|
||||
static void wrap_cache_update_on_succ(future_result_t * result, void * user)
|
||||
static void wrap_cache_write_on_succ(future_result_t * result, void * user)
|
||||
{
|
||||
struct wrap_cache_put_ctx* ctx=(struct wrap_cache_put_ctx*)user;
|
||||
TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload success: %s elapse: %d", ctx->url, time(NULL)-ctx->start);
|
||||
wrap_cache_put_ctx_free(ctx);
|
||||
}
|
||||
static void wrap_cache_update_on_fail(enum e_future_error err, const char * what, void * user)
|
||||
static void wrap_cache_write_on_fail(enum e_future_error err, const char * what, void * user)
|
||||
{
|
||||
struct wrap_cache_put_ctx* ctx=(struct wrap_cache_put_ctx*)user;
|
||||
TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload failed: %s %s lapse: %d", ctx->url, what, time(NULL)-ctx->start);
|
||||
wrap_cache_put_ctx_free(ctx);
|
||||
}
|
||||
|
||||
struct cache_update_context* web_cache_update_start(struct cache_handle* handle, unsigned int thread_id,
|
||||
struct cache_update_context* web_cache_write_start(struct cache_handle* handle, unsigned int thread_id,
|
||||
const struct tfe_http_session * session, struct cache_mid **mid)
|
||||
{
|
||||
struct cache_update_context* update_ctx=NULL;
|
||||
@@ -1236,7 +1236,7 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
|
||||
case FORBIDDEN:
|
||||
if(!(param->ignore_res_nocache || param->force_caching))
|
||||
{
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN]));
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_WRITE_FORBIDEN]));
|
||||
TFE_LOG_DEBUG(handle->logger, "cache update forbiden: %s", session->req->req_spec.url);
|
||||
return NULL;
|
||||
}
|
||||
@@ -1249,7 +1249,7 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
|
||||
|| (!param->cache_cookied_cont && _mid->has_cookie)
|
||||
|| (!param->cache_html && _mid->is_html))
|
||||
{
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_BYPASS]));
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_WRITE_BYPASS]));
|
||||
return NULL;
|
||||
}
|
||||
break;
|
||||
@@ -1257,9 +1257,9 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
|
||||
assert(0);
|
||||
break;
|
||||
}
|
||||
if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_UPLOADING])) > handle->get_concurrency_max)
|
||||
if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_WRITING])) > handle->get_concurrency_max)
|
||||
{
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_ABANDON]));
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_WRITE_THROTTLE]));
|
||||
return NULL;
|
||||
}
|
||||
const char* key=NULL;
|
||||
@@ -1284,7 +1284,7 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOADING]));
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_WRITING]));
|
||||
|
||||
struct tango_cache_meta_put meta;
|
||||
memset(&meta, 0, sizeof(meta));
|
||||
@@ -1311,7 +1311,7 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
|
||||
_cache_put_ctx->url=tfe_strdup(session->req->req_spec.url);
|
||||
_cache_put_ctx->start=time(NULL);
|
||||
_cache_put_ctx->ref_handle=handle;
|
||||
_cache_put_ctx->f=future_create("cache_put", wrap_cache_update_on_succ, wrap_cache_update_on_fail, _cache_put_ctx);
|
||||
_cache_put_ctx->f=future_create("cache_put", wrap_cache_write_on_succ, wrap_cache_write_on_fail, _cache_put_ctx);
|
||||
write_ctx=tango_cache_update_start(handle->clients[thread_id], _cache_put_ctx->f, &meta);
|
||||
if(write_ctx==NULL)//exceed maximum cache memory size.
|
||||
{
|
||||
@@ -1321,10 +1321,10 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
|
||||
TFE_LOG_DEBUG(handle->logger, "cache upload allowed: %s", _cache_put_ctx->url);
|
||||
if(is_undefined_obj)
|
||||
{
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_OVERRIDE]));
|
||||
FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE], 0, FS_OP_SET, content_len/1024);
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_WRITE]));
|
||||
FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_OVERRIDE_WRITE_OBJ_SIZE], 0, FS_OP_SET, content_len/1024);
|
||||
}
|
||||
FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_UPLOAD_OBJ_SIZE], 0, FS_OP_SET, content_len/1024);
|
||||
FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_WRITE_OBJ_SIZE], 0, FS_OP_SET, content_len/1024);
|
||||
update_ctx=ALLOC(struct cache_update_context, 1);
|
||||
update_ctx->write_ctx=write_ctx;
|
||||
update_ctx->ref_cache_handle=handle;
|
||||
@@ -1333,14 +1333,14 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
|
||||
return update_ctx;
|
||||
|
||||
}
|
||||
void web_cache_update(struct cache_update_context* ctx, const unsigned char * body_frag, size_t frag_size)
|
||||
void web_cache_write(struct cache_update_context* ctx, const unsigned char * body_frag, size_t frag_size)
|
||||
{
|
||||
tango_cache_update_frag_data(ctx->write_ctx, (const char*)body_frag, frag_size);
|
||||
ctx->uploaded_len+=frag_size;
|
||||
ATOMIC_ADD(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOAD_BYTES]), frag_size);
|
||||
ATOMIC_ADD(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_WRITE_BYTES]), frag_size);
|
||||
return;
|
||||
}
|
||||
void web_cache_update_end(struct cache_update_context* ctx)
|
||||
void web_cache_write_end(struct cache_update_context* ctx)
|
||||
{
|
||||
|
||||
if(ctx->uploaded_len==ctx->content_len)
|
||||
@@ -1350,9 +1350,9 @@ void web_cache_update_end(struct cache_update_context* ctx)
|
||||
else
|
||||
{
|
||||
tango_cache_update_cancel(ctx->write_ctx);
|
||||
ATOMIC_INC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOAD_CANCEL]));
|
||||
ATOMIC_INC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_WRITE_CANCEL]));
|
||||
}
|
||||
ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOADING]));
|
||||
ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_WRITING]));
|
||||
|
||||
ctx->write_ctx = NULL;
|
||||
ctx->ref_cache_handle = NULL;
|
||||
|
||||
Reference in New Issue
Block a user