缓存业务层适配redis cluster读取文件的接口。
This commit is contained in:
@@ -539,7 +539,7 @@ struct pangu_http_ctx
|
||||
struct tfe_http_half* cache_revalidate_req;
|
||||
struct tfe_http_half* cached_response;
|
||||
size_t cache_result_declared_sz, cache_result_actual_sz;
|
||||
struct cache_update_context* cache_update_ctx;
|
||||
struct cache_write_context* cache_update_ctx;
|
||||
|
||||
int thread_id;
|
||||
};
|
||||
|
||||
@@ -48,8 +48,8 @@ enum cache_stat_field
|
||||
STAT_CACHE_WRITE_BYTES,
|
||||
STAT_CACHE_WRITING,
|
||||
STAT_CACHE_MEMORY,
|
||||
STAT_CACHE_ACTIVE_SESSION,
|
||||
|
||||
STAT_CACHE_ACTIVE_SESSION_HTTP,
|
||||
STAT_CACHE_ACTIVE_SESSION_REDIS,
|
||||
STAT_CACHE_QUERY_HIT_OJB_SIZE,
|
||||
STAT_CACHE_WRITE_OBJ_SIZE,
|
||||
STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE,
|
||||
@@ -124,8 +124,9 @@ struct cache_handle
|
||||
struct cache_bloom *cache_key_bloom;
|
||||
void* logger;
|
||||
};
|
||||
struct cache_update_context
|
||||
struct cache_write_context
|
||||
{
|
||||
struct cache_write_future_ctx* future_ctx;
|
||||
struct cache_handle* ref_cache_handle;
|
||||
struct tango_cache_ctx * write_ctx;
|
||||
size_t content_len;
|
||||
@@ -168,12 +169,16 @@ static void web_cache_stat_cb(evutil_socket_t fd, short what, void * arg)
|
||||
}
|
||||
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ], 0, FS_OP_SET, client_stat_sum.get_recv_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_HIT], 0, FS_OP_SET, client_stat_sum.get_succ_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_ERR], 0, FS_OP_SET, client_stat_sum.get_error_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_CNT], 0, FS_OP_SET, client_stat_sum.put_recv_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_ERR], 0, FS_OP_SET, client_stat_sum.put_error_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_HIT], 0,
|
||||
FS_OP_SET, client_stat_sum.get_succ_http+client_stat_sum.get_succ_redis);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_ERR], 0,
|
||||
FS_OP_SET, client_stat_sum.get_err_http+client_stat_sum.get_err_redis);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_CNT], 0,
|
||||
FS_OP_SET, client_stat_sum.put_recv_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_ERR], 0,
|
||||
FS_OP_SET, client_stat_sum.put_err_http+client_stat_sum.put_err_redis);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_MEMORY], 0, FS_OP_SET, client_stat_sum.memory_used/(1024*1024));
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_ACTIVE_SESSION], 0, FS_OP_SET, client_stat_sum.session_num);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_ACTIVE_SESSION_HTTP], 0, FS_OP_SET, client_stat_sum.session_http);
|
||||
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_THROTTLE], 0, FS_OP_SET, client_stat_sum.totaldrop_num);
|
||||
FS_passive_output(cache->fs_handle);
|
||||
return;
|
||||
@@ -243,7 +248,8 @@ const char* statsd_server_ip, int statsd_server_port, const char*histogram_bins)
|
||||
set_stat_spec(&spec[STAT_CACHE_WRITE_BYTES], "write(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_WRITING], "writing",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_MEMORY], "buffer(MB)",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION], "active_sess",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION_HTTP], "sess_http",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION_REDIS], "sess_redis",FS_STYLE_STATUS, FS_CALC_CURRENT);
|
||||
|
||||
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE], "or_hit_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
|
||||
set_stat_spec(&spec[STAT_CACHE_QUERY_HIT_OJB_SIZE], "hit_obj_sz(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
|
||||
@@ -617,9 +623,11 @@ void cache_param_dup(int idx, MAAT_RULE_EX_DATA *to, MAAT_RULE_EX_DATA *from, lo
|
||||
*((struct cache_param**)to)=from_param;
|
||||
return;
|
||||
}
|
||||
|
||||
struct cache_mid
|
||||
{
|
||||
enum cache_pending_result result;
|
||||
enum OBJECT_LOCATION location;
|
||||
struct request_freshness req_fresshness;
|
||||
char shall_bypass;
|
||||
char is_using_exception_param;
|
||||
@@ -923,7 +931,7 @@ struct cache_pending_context
|
||||
|
||||
char* url;
|
||||
struct cached_meta cached_obj_meta;
|
||||
|
||||
struct cache_mid* ref_mid;
|
||||
struct cache_handle* ref_handle;
|
||||
struct tango_cache_result* ref_tango_cache_result;
|
||||
struct future* f_tango_cache_fetch;
|
||||
@@ -967,7 +975,7 @@ static void cache_read_meta_on_succ(future_result_t * result, void * user)
|
||||
|
||||
switch(_result->type)
|
||||
{
|
||||
case RESULT_TYPE_HEADER:
|
||||
case RESULT_TYPE_HEADER:
|
||||
ctx->cached_obj_meta.content_length=_result->tlength;
|
||||
cached_meta_set(&ctx->cached_obj_meta, RESULT_TYPE_HEADER, _result->data_frag, _result->size);
|
||||
ctx->status=PENDING_RESULT_REVALIDATE;
|
||||
@@ -985,6 +993,7 @@ static void cache_read_meta_on_succ(future_result_t * result, void * user)
|
||||
//NOT break intentionally.
|
||||
case RESULT_TYPE_END:
|
||||
//last call.
|
||||
ctx->ref_mid->location=_result->location;
|
||||
ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_PENDING]));
|
||||
promise_dettach_ctx(p);
|
||||
promise_success(p, ctx);
|
||||
@@ -1130,7 +1139,7 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u
|
||||
ctx->status=PENDING_RESULT_FOBIDDEN;
|
||||
ctx->ref_handle=handle;
|
||||
ctx->url=tfe_strdup(request->req_spec.url);
|
||||
|
||||
ctx->ref_mid=_mid;
|
||||
ctx->req_if_modified_since=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_MODIFIED_SINCE));
|
||||
ctx->req_if_none_match=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_NONE_MATCH));
|
||||
promise_set_ctx(p, ctx, cache_pending_ctx_free_cb);
|
||||
@@ -1178,7 +1187,7 @@ int web_cache_async_read(struct cache_handle* handle, unsigned int thread_id,
|
||||
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_READING]));
|
||||
query_ctx->f_tango_cache_fetch=future_create("_cache_read", cache_query_obj_on_succ, cache_query_obj_on_fail, p);
|
||||
int ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta);
|
||||
int ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta, _mid->location);
|
||||
if(ret<0)
|
||||
{
|
||||
cache_query_ctx_free_cb(query_ctx);
|
||||
@@ -1186,14 +1195,15 @@ int web_cache_async_read(struct cache_handle* handle, unsigned int thread_id,
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
struct wrap_cache_put_ctx
|
||||
struct cache_write_future_ctx
|
||||
{
|
||||
char* url;
|
||||
char upload_path[TFE_PATH_MAX];
|
||||
time_t start;
|
||||
struct future* f;
|
||||
struct cache_handle* ref_handle;
|
||||
};
|
||||
void wrap_cache_write_ctx_free(struct wrap_cache_put_ctx* ctx)
|
||||
void cache_write_future_ctx_free(struct cache_write_future_ctx* ctx)
|
||||
{
|
||||
FREE(&(ctx->url));
|
||||
future_destroy(ctx->f);
|
||||
@@ -1201,24 +1211,25 @@ void wrap_cache_write_ctx_free(struct wrap_cache_put_ctx* ctx)
|
||||
}
|
||||
static void wrap_cache_write_on_succ(future_result_t * result, void * user)
|
||||
{
|
||||
struct wrap_cache_put_ctx* ctx=(struct wrap_cache_put_ctx*)user;
|
||||
TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload success: %s elapse: %d", ctx->url, time(NULL)-ctx->start);
|
||||
wrap_cache_write_ctx_free(ctx);
|
||||
struct cache_write_future_ctx* ctx=(struct cache_write_future_ctx*)user;
|
||||
TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload success: %s path: %s elapse: %d",
|
||||
ctx->url, ctx->upload_path, time(NULL)-ctx->start);
|
||||
cache_write_future_ctx_free(ctx);
|
||||
}
|
||||
static void wrap_cache_write_on_fail(enum e_future_error err, const char * what, void * user)
|
||||
{
|
||||
struct wrap_cache_put_ctx* ctx=(struct wrap_cache_put_ctx*)user;
|
||||
struct cache_write_future_ctx* ctx=(struct cache_write_future_ctx*)user;
|
||||
TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload failed: %s %s lapse: %d", ctx->url, what, time(NULL)-ctx->start);
|
||||
wrap_cache_write_ctx_free(ctx);
|
||||
cache_write_future_ctx_free(ctx);
|
||||
}
|
||||
|
||||
struct cache_update_context* web_cache_write_start(struct cache_handle* handle, unsigned int thread_id,
|
||||
struct cache_write_context* web_cache_write_start(struct cache_handle* handle, unsigned int thread_id,
|
||||
const struct tfe_http_session * session, struct cache_mid **mid)
|
||||
{
|
||||
struct cache_update_context* update_ctx=NULL;
|
||||
struct cache_write_context* write_ctx=NULL;
|
||||
struct response_freshness resp_freshness;
|
||||
enum cache_pending_action put_action;
|
||||
struct tango_cache_ctx *write_ctx=NULL;
|
||||
struct tango_cache_ctx *tango_cache_write_ctx=NULL;
|
||||
char cont_type_str[TFE_STRING_MAX]={0}, user_tag_str[TFE_STRING_MAX]={0};
|
||||
const char* content_type=NULL;
|
||||
char *tmp=NULL;
|
||||
@@ -1321,45 +1332,54 @@ struct cache_update_context* web_cache_write_start(struct cache_handle* handle,
|
||||
meta.put=resp_freshness;
|
||||
meta.put.timeout=MAX(param->pinning_time_sec, resp_freshness.timeout);
|
||||
|
||||
struct wrap_cache_put_ctx* _cache_put_ctx=ALLOC(struct wrap_cache_put_ctx, 1);
|
||||
_cache_put_ctx->url=tfe_strdup(session->req->req_spec.url);
|
||||
_cache_put_ctx->start=time(NULL);
|
||||
_cache_put_ctx->ref_handle=handle;
|
||||
_cache_put_ctx->f=future_create("_cache_wrt", wrap_cache_write_on_succ, wrap_cache_write_on_fail, _cache_put_ctx);
|
||||
write_ctx=tango_cache_update_start(handle->clients[thread_id], _cache_put_ctx->f, &meta);
|
||||
if(write_ctx==NULL)//exceed maximum cache memory size.
|
||||
struct cache_write_future_ctx* future_ctx=ALLOC(struct cache_write_future_ctx, 1);
|
||||
future_ctx->url=tfe_strdup(session->req->req_spec.url);
|
||||
future_ctx->start=time(NULL);
|
||||
future_ctx->ref_handle=handle;
|
||||
future_ctx->f=future_create("_cache_wrt", wrap_cache_write_on_succ, wrap_cache_write_on_fail, future_ctx);
|
||||
tango_cache_write_ctx=tango_cache_update_start(handle->clients[thread_id], future_ctx->f, &meta);
|
||||
if(tango_cache_write_ctx==NULL)//exceed maximum cache memory size.
|
||||
{
|
||||
wrap_cache_write_ctx_free(_cache_put_ctx);
|
||||
cache_write_future_ctx_free(future_ctx);
|
||||
return NULL;
|
||||
}
|
||||
TFE_LOG_DEBUG(handle->logger, "cache upload allowed: %s", _cache_put_ctx->url);
|
||||
TFE_LOG_DEBUG(handle->logger, "cache upload allowed: %s", future_ctx->url);
|
||||
if(is_undefined_obj)
|
||||
{
|
||||
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_WRITE]));
|
||||
FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_OVERRIDE_WRITE_OBJ_SIZE], 0, FS_OP_SET, content_len/1024);
|
||||
}
|
||||
FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_WRITE_OBJ_SIZE], 0, FS_OP_SET, content_len/1024);
|
||||
update_ctx=ALLOC(struct cache_update_context, 1);
|
||||
update_ctx->write_ctx=write_ctx;
|
||||
update_ctx->ref_cache_handle=handle;
|
||||
update_ctx->content_len=content_len;
|
||||
update_ctx->uploaded_len=0;
|
||||
return update_ctx;
|
||||
write_ctx=ALLOC(struct cache_write_context, 1);
|
||||
write_ctx->write_ctx=tango_cache_write_ctx;
|
||||
write_ctx->ref_cache_handle=handle;
|
||||
write_ctx->content_len=content_len;
|
||||
write_ctx->uploaded_len=0;
|
||||
write_ctx->future_ctx=future_ctx;
|
||||
return write_ctx;
|
||||
|
||||
}
|
||||
void web_cache_write(struct cache_update_context* ctx, const unsigned char * body_frag, size_t frag_size)
|
||||
void web_cache_write(struct cache_write_context* ctx, const unsigned char * body_frag, size_t frag_size)
|
||||
{
|
||||
tango_cache_update_frag_data(ctx->write_ctx, (const char*)body_frag, frag_size);
|
||||
ctx->uploaded_len+=frag_size;
|
||||
ATOMIC_ADD(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_WRITE_BYTES]), frag_size);
|
||||
return;
|
||||
}
|
||||
void web_cache_write_end(struct cache_update_context* ctx)
|
||||
void web_cache_write_end(struct cache_write_context* ctx)
|
||||
{
|
||||
|
||||
int ret=0;
|
||||
struct cache_write_future_ctx* future_ctx=ctx->future_ctx;
|
||||
if(ctx->uploaded_len==ctx->content_len)
|
||||
{
|
||||
tango_cache_update_end(ctx->write_ctx);
|
||||
ret=tango_cache_update_end(ctx->write_ctx, future_ctx->upload_path, sizeof(future_ctx->upload_path));
|
||||
if(ret<0)
|
||||
{
|
||||
//upload too slow or storage server error;
|
||||
cache_write_future_ctx_free(future_ctx);
|
||||
TFE_LOG_DEBUG(ctx->ref_cache_handle->logger, "cache upload failed: %s",ctx->future_ctx->url);
|
||||
return;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1367,7 +1387,6 @@ void web_cache_write_end(struct cache_update_context* ctx)
|
||||
ATOMIC_INC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_WRITE_CANCEL]));
|
||||
}
|
||||
ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_WRITING]));
|
||||
|
||||
ctx->write_ctx = NULL;
|
||||
ctx->ref_cache_handle = NULL;
|
||||
free(ctx);
|
||||
|
||||
@@ -54,11 +54,11 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u
|
||||
|
||||
|
||||
|
||||
struct cache_update_context;
|
||||
struct cache_update_context* web_cache_write_start(struct cache_handle* handle, unsigned int thread_id,
|
||||
struct cache_write_context;
|
||||
struct cache_write_context* web_cache_write_start(struct cache_handle* handle, unsigned int thread_id,
|
||||
const struct tfe_http_session * session, struct cache_mid **mid);
|
||||
void web_cache_write(struct cache_update_context* ctx, const unsigned char * body_frag, size_t frag_size);
|
||||
void web_cache_write_end(struct cache_update_context* ctx);
|
||||
void web_cache_write(struct cache_write_context* ctx, const unsigned char * body_frag, size_t frag_size);
|
||||
void web_cache_write_end(struct cache_write_context* ctx);
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user