diff --git a/plugin/business/pangu-http/src/pangu_http.cpp b/plugin/business/pangu-http/src/pangu_http.cpp index 11abdef..c195aba 100644 --- a/plugin/business/pangu-http/src/pangu_http.cpp +++ b/plugin/business/pangu-http/src/pangu_http.cpp @@ -539,7 +539,7 @@ struct pangu_http_ctx struct tfe_http_half* cache_revalidate_req; struct tfe_http_half* cached_response; size_t cache_result_declared_sz, cache_result_actual_sz; - struct cache_update_context* cache_update_ctx; + struct cache_write_context* cache_update_ctx; int thread_id; }; diff --git a/plugin/business/pangu-http/src/pangu_web_cache.cpp b/plugin/business/pangu-http/src/pangu_web_cache.cpp index ec7c893..b824c1e 100644 --- a/plugin/business/pangu-http/src/pangu_web_cache.cpp +++ b/plugin/business/pangu-http/src/pangu_web_cache.cpp @@ -48,8 +48,8 @@ enum cache_stat_field STAT_CACHE_WRITE_BYTES, STAT_CACHE_WRITING, STAT_CACHE_MEMORY, - STAT_CACHE_ACTIVE_SESSION, - + STAT_CACHE_ACTIVE_SESSION_HTTP, + STAT_CACHE_ACTIVE_SESSION_REDIS, STAT_CACHE_QUERY_HIT_OJB_SIZE, STAT_CACHE_WRITE_OBJ_SIZE, STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE, @@ -124,8 +124,9 @@ struct cache_handle struct cache_bloom *cache_key_bloom; void* logger; }; -struct cache_update_context +struct cache_write_context { + struct cache_write_future_ctx* future_ctx; struct cache_handle* ref_cache_handle; struct tango_cache_ctx * write_ctx; size_t content_len; @@ -168,12 +169,16 @@ static void web_cache_stat_cb(evutil_socket_t fd, short what, void * arg) } FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ], 0, FS_OP_SET, client_stat_sum.get_recv_num); - FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_HIT], 0, FS_OP_SET, client_stat_sum.get_succ_num); - FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_ERR], 0, FS_OP_SET, client_stat_sum.get_error_num); - FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_CNT], 0, FS_OP_SET, client_stat_sum.put_recv_num); - FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_ERR], 0, FS_OP_SET, client_stat_sum.put_error_num); + FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_HIT], 0, + FS_OP_SET, client_stat_sum.get_succ_http+client_stat_sum.get_succ_redis); + FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_ERR], 0, + FS_OP_SET, client_stat_sum.get_err_http+client_stat_sum.get_err_redis); + FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_CNT], 0, + FS_OP_SET, client_stat_sum.put_recv_num); + FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_ERR], 0, + FS_OP_SET, client_stat_sum.put_err_http+client_stat_sum.put_err_redis); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_MEMORY], 0, FS_OP_SET, client_stat_sum.memory_used/(1024*1024)); - FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_ACTIVE_SESSION], 0, FS_OP_SET, client_stat_sum.session_num); + FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_ACTIVE_SESSION_HTTP], 0, FS_OP_SET, client_stat_sum.session_http); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_THROTTLE], 0, FS_OP_SET, client_stat_sum.totaldrop_num); FS_passive_output(cache->fs_handle); return; @@ -243,7 +248,8 @@ const char* statsd_server_ip, int statsd_server_port, const char*histogram_bins) set_stat_spec(&spec[STAT_CACHE_WRITE_BYTES], "write(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_WRITING], "writing",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_MEMORY], "buffer(MB)",FS_STYLE_STATUS, FS_CALC_CURRENT); - set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION], "active_sess",FS_STYLE_STATUS, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION_HTTP], "sess_http",FS_STYLE_STATUS, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION_REDIS], "sess_redis",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE], "or_hit_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_HIT_OJB_SIZE], "hit_obj_sz(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); @@ -617,9 +623,11 @@ void cache_param_dup(int idx, MAAT_RULE_EX_DATA *to, MAAT_RULE_EX_DATA *from, lo *((struct cache_param**)to)=from_param; return; } + struct cache_mid { enum cache_pending_result result; + enum OBJECT_LOCATION location; struct request_freshness req_fresshness; char shall_bypass; char is_using_exception_param; @@ -923,7 +931,7 @@ struct cache_pending_context char* url; struct cached_meta cached_obj_meta; - + struct cache_mid* ref_mid; struct cache_handle* ref_handle; struct tango_cache_result* ref_tango_cache_result; struct future* f_tango_cache_fetch; @@ -967,7 +975,7 @@ static void cache_read_meta_on_succ(future_result_t * result, void * user) switch(_result->type) { - case RESULT_TYPE_HEADER: + case RESULT_TYPE_HEADER: ctx->cached_obj_meta.content_length=_result->tlength; cached_meta_set(&ctx->cached_obj_meta, RESULT_TYPE_HEADER, _result->data_frag, _result->size); ctx->status=PENDING_RESULT_REVALIDATE; @@ -985,6 +993,7 @@ static void cache_read_meta_on_succ(future_result_t * result, void * user) //NOT break intentionally. case RESULT_TYPE_END: //last call. + ctx->ref_mid->location=_result->location; ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_PENDING])); promise_dettach_ctx(p); promise_success(p, ctx); @@ -1130,7 +1139,7 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u ctx->status=PENDING_RESULT_FOBIDDEN; ctx->ref_handle=handle; ctx->url=tfe_strdup(request->req_spec.url); - + ctx->ref_mid=_mid; ctx->req_if_modified_since=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_MODIFIED_SINCE)); ctx->req_if_none_match=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_NONE_MATCH)); promise_set_ctx(p, ctx, cache_pending_ctx_free_cb); @@ -1178,7 +1187,7 @@ int web_cache_async_read(struct cache_handle* handle, unsigned int thread_id, ATOMIC_INC(&(handle->stat_val[STAT_CACHE_READING])); query_ctx->f_tango_cache_fetch=future_create("_cache_read", cache_query_obj_on_succ, cache_query_obj_on_fail, p); - int ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta); + int ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta, _mid->location); if(ret<0) { cache_query_ctx_free_cb(query_ctx); @@ -1186,14 +1195,15 @@ int web_cache_async_read(struct cache_handle* handle, unsigned int thread_id, } return 0; } -struct wrap_cache_put_ctx +struct cache_write_future_ctx { char* url; + char upload_path[TFE_PATH_MAX]; time_t start; struct future* f; struct cache_handle* ref_handle; }; -void wrap_cache_write_ctx_free(struct wrap_cache_put_ctx* ctx) +void cache_write_future_ctx_free(struct cache_write_future_ctx* ctx) { FREE(&(ctx->url)); future_destroy(ctx->f); @@ -1201,24 +1211,25 @@ void wrap_cache_write_ctx_free(struct wrap_cache_put_ctx* ctx) } static void wrap_cache_write_on_succ(future_result_t * result, void * user) { - struct wrap_cache_put_ctx* ctx=(struct wrap_cache_put_ctx*)user; - TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload success: %s elapse: %d", ctx->url, time(NULL)-ctx->start); - wrap_cache_write_ctx_free(ctx); + struct cache_write_future_ctx* ctx=(struct cache_write_future_ctx*)user; + TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload success: %s path: %s elapse: %d", + ctx->url, ctx->upload_path, time(NULL)-ctx->start); + cache_write_future_ctx_free(ctx); } static void wrap_cache_write_on_fail(enum e_future_error err, const char * what, void * user) { - struct wrap_cache_put_ctx* ctx=(struct wrap_cache_put_ctx*)user; + struct cache_write_future_ctx* ctx=(struct cache_write_future_ctx*)user; TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload failed: %s %s lapse: %d", ctx->url, what, time(NULL)-ctx->start); - wrap_cache_write_ctx_free(ctx); + cache_write_future_ctx_free(ctx); } -struct cache_update_context* web_cache_write_start(struct cache_handle* handle, unsigned int thread_id, +struct cache_write_context* web_cache_write_start(struct cache_handle* handle, unsigned int thread_id, const struct tfe_http_session * session, struct cache_mid **mid) { - struct cache_update_context* update_ctx=NULL; + struct cache_write_context* write_ctx=NULL; struct response_freshness resp_freshness; enum cache_pending_action put_action; - struct tango_cache_ctx *write_ctx=NULL; + struct tango_cache_ctx *tango_cache_write_ctx=NULL; char cont_type_str[TFE_STRING_MAX]={0}, user_tag_str[TFE_STRING_MAX]={0}; const char* content_type=NULL; char *tmp=NULL; @@ -1321,45 +1332,54 @@ struct cache_update_context* web_cache_write_start(struct cache_handle* handle, meta.put=resp_freshness; meta.put.timeout=MAX(param->pinning_time_sec, resp_freshness.timeout); - struct wrap_cache_put_ctx* _cache_put_ctx=ALLOC(struct wrap_cache_put_ctx, 1); - _cache_put_ctx->url=tfe_strdup(session->req->req_spec.url); - _cache_put_ctx->start=time(NULL); - _cache_put_ctx->ref_handle=handle; - _cache_put_ctx->f=future_create("_cache_wrt", wrap_cache_write_on_succ, wrap_cache_write_on_fail, _cache_put_ctx); - write_ctx=tango_cache_update_start(handle->clients[thread_id], _cache_put_ctx->f, &meta); - if(write_ctx==NULL)//exceed maximum cache memory size. + struct cache_write_future_ctx* future_ctx=ALLOC(struct cache_write_future_ctx, 1); + future_ctx->url=tfe_strdup(session->req->req_spec.url); + future_ctx->start=time(NULL); + future_ctx->ref_handle=handle; + future_ctx->f=future_create("_cache_wrt", wrap_cache_write_on_succ, wrap_cache_write_on_fail, future_ctx); + tango_cache_write_ctx=tango_cache_update_start(handle->clients[thread_id], future_ctx->f, &meta); + if(tango_cache_write_ctx==NULL)//exceed maximum cache memory size. { - wrap_cache_write_ctx_free(_cache_put_ctx); + cache_write_future_ctx_free(future_ctx); return NULL; } - TFE_LOG_DEBUG(handle->logger, "cache upload allowed: %s", _cache_put_ctx->url); + TFE_LOG_DEBUG(handle->logger, "cache upload allowed: %s", future_ctx->url); if(is_undefined_obj) { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_WRITE])); FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_OVERRIDE_WRITE_OBJ_SIZE], 0, FS_OP_SET, content_len/1024); } FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_WRITE_OBJ_SIZE], 0, FS_OP_SET, content_len/1024); - update_ctx=ALLOC(struct cache_update_context, 1); - update_ctx->write_ctx=write_ctx; - update_ctx->ref_cache_handle=handle; - update_ctx->content_len=content_len; - update_ctx->uploaded_len=0; - return update_ctx; + write_ctx=ALLOC(struct cache_write_context, 1); + write_ctx->write_ctx=tango_cache_write_ctx; + write_ctx->ref_cache_handle=handle; + write_ctx->content_len=content_len; + write_ctx->uploaded_len=0; + write_ctx->future_ctx=future_ctx; + return write_ctx; } -void web_cache_write(struct cache_update_context* ctx, const unsigned char * body_frag, size_t frag_size) +void web_cache_write(struct cache_write_context* ctx, const unsigned char * body_frag, size_t frag_size) { tango_cache_update_frag_data(ctx->write_ctx, (const char*)body_frag, frag_size); ctx->uploaded_len+=frag_size; ATOMIC_ADD(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_WRITE_BYTES]), frag_size); return; } -void web_cache_write_end(struct cache_update_context* ctx) +void web_cache_write_end(struct cache_write_context* ctx) { - + int ret=0; + struct cache_write_future_ctx* future_ctx=ctx->future_ctx; if(ctx->uploaded_len==ctx->content_len) { - tango_cache_update_end(ctx->write_ctx); + ret=tango_cache_update_end(ctx->write_ctx, future_ctx->upload_path, sizeof(future_ctx->upload_path)); + if(ret<0) + { + //upload too slow or storage server error; + cache_write_future_ctx_free(future_ctx); + TFE_LOG_DEBUG(ctx->ref_cache_handle->logger, "cache upload failed: %s",ctx->future_ctx->url); + return; + } } else { @@ -1367,7 +1387,6 @@ void web_cache_write_end(struct cache_update_context* ctx) ATOMIC_INC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_WRITE_CANCEL])); } ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_WRITING])); - ctx->write_ctx = NULL; ctx->ref_cache_handle = NULL; free(ctx); diff --git a/plugin/business/pangu-http/src/pangu_web_cache.h b/plugin/business/pangu-http/src/pangu_web_cache.h index 6a4e3a9..8656660 100644 --- a/plugin/business/pangu-http/src/pangu_web_cache.h +++ b/plugin/business/pangu-http/src/pangu_web_cache.h @@ -54,11 +54,11 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u -struct cache_update_context; -struct cache_update_context* web_cache_write_start(struct cache_handle* handle, unsigned int thread_id, +struct cache_write_context; +struct cache_write_context* web_cache_write_start(struct cache_handle* handle, unsigned int thread_id, const struct tfe_http_session * session, struct cache_mid **mid); -void web_cache_write(struct cache_update_context* ctx, const unsigned char * body_frag, size_t frag_size); -void web_cache_write_end(struct cache_update_context* ctx); +void web_cache_write(struct cache_write_context* ctx, const unsigned char * body_frag, size_t frag_size); +void web_cache_write_end(struct cache_write_context* ctx);