#include "pangu_web_cache.h" #include #include #include #include #include #include #include #include #include enum cache_stat_field { STAT_CACHE_QUERY, STAT_CACHE_QUERY_NOT_APPLICABLE, STAT_CACHE_QUERY_VERIFY, STAT_CACHE_QUERY_HIT, STAT_CACHE_QUERY_BYTES, STAT_CACHE_OVERRIDE_QUERY, STAT_CACHE_OVERRIDE_HIT, STAT_CACHE_OVERRIDE_BYTES, STAT_CACHE_QUERY_ERR, STAT_CACHE_QUERY_ABANDON, STAT_CACHE_QUERYING, STAT_CACHE_META_QUERYING, STAT_CACHE_UPLOAD_CNT, STAT_CACHE_UPLOAD_OVERRIDE, STAT_CACHE_UPLOAD_FORBIDEN, STAT_CACHE_UPLOAD_ABANDON, STAT_CACHE_UPLOAD_ERR, STAT_CACHE_UPLOAD_BYTES, STAT_CACHE_UPLOADING, STAT_CACHE_MEMORY, STAT_CACHE_ACTIVE_SESSION, STAT_CACHE_QUERY_HIT_OJB_SIZE, STAT_CACHE_UPLOAD_OBJ_SIZE, STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE, STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE, __CACHE_STAT_MAX }; struct cache_handle { unsigned int thread_count; int cache_undefined_obj_enabled; int query_undefined_obj_enabled; size_t cache_undefined_obj_min_size; int minimum_cache_seconds; struct tango_cache_instance **clients; long long get_concurrency_max; long long put_concurrency_max; screen_stat_handle_t fs_handle; long long stat_val[__CACHE_STAT_MAX]; int fs_id[__CACHE_STAT_MAX]; struct event_base* gc_evbase; struct event* gcev; void* logger; }; struct cache_update_context { struct cache_handle* ref_cache_handle; struct tango_cache_ctx * write_ctx; }; static void cache_gc_cb(evutil_socket_t fd, short what, void * arg) { struct cache_handle* cache=(struct cache_handle *)arg; struct cache_statistics client_stat_sum, client_stat; memset(&client_stat_sum, 0, sizeof(client_stat_sum)); long long *val_sum = (long long *)&client_stat_sum; long long *val = NULL; int i=0, j=0; for(i=0; ithread_count;i++) { tango_cache_get_statistics(cache->clients[i], &client_stat); val=(long long*)&client_stat; for(j=0; jstat_val[i]))!=0) { switch(i) { case STAT_CACHE_UPLOAD_BYTES: case STAT_CACHE_QUERY_BYTES: case STAT_CACHE_OVERRIDE_BYTES: //translate bytes to mega bytes. FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(cache->stat_val[i]))/(1024*1024)); break; default: FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(cache->stat_val[i]))); break; } } } FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY], 0, FS_OP_SET, client_stat_sum.get_recv_num); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY_HIT], 0, FS_OP_SET, client_stat_sum.get_succ_num); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY_ERR], 0, FS_OP_SET, client_stat_sum.get_error_num); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_CNT], 0, FS_OP_SET, client_stat_sum.put_recv_num); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_ERR], 0, FS_OP_SET, client_stat_sum.put_error_num); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_MEMORY], 0, FS_OP_SET, client_stat_sum.memory_used/(1024*1024)); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_ACTIVE_SESSION], 0, FS_OP_SET, client_stat_sum.session_num); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_ABANDON], 0, FS_OP_SET, client_stat_sum.totaldrop_num); FS_passive_output(cache->fs_handle); return; } struct cache_stat_sepc { const char* name; enum field_dsp_style_t style; enum field_calc_algo calc_type; }; static void set_stat_spec(struct cache_stat_sepc* spec, const char* name, enum field_dsp_style_t style, enum field_calc_algo calc_type) { spec->name=name; spec->style=style; spec->calc_type=calc_type; return; } void cache_stat_init(struct cache_handle* cache) { const char* fieldstat_output="./cache.fieldstat"; const char* app_name="tango_cache"; const char* obj_size_bins_KB="10,100,1000,10000"; int value=0, i=0; screen_stat_handle_t fs_handle=NULL; fs_handle=FS_create_handle(); FS_set_para(fs_handle, OUTPUT_DEVICE, fieldstat_output, strlen(fieldstat_output)+1); value=1; FS_set_para(fs_handle, PRINT_MODE, &value, sizeof(value)); value=0; FS_set_para(fs_handle, CREATE_THREAD, &value, sizeof(value)); FS_set_para(fs_handle, APP_NAME, app_name, strlen(app_name)+1); FS_set_para(fs_handle, HISTOGRAM_GLOBAL_BINS, obj_size_bins_KB, strlen(obj_size_bins_KB)+1); cache->fs_handle=fs_handle; struct cache_stat_sepc spec[__CACHE_STAT_MAX]; set_stat_spec(&spec[STAT_CACHE_QUERY], "cache_get",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_NOT_APPLICABLE], "get_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_VERIFY], "get_verify",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_HIT], "hit_num",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_BYTES], "downloaded(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_QUERY], "or_qry",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT], "or_hit",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_BYTES], "or_download(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_ERR], "get_err",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_ABANDON], "get_abandon",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERYING], "getting",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_CNT], "cache_put",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_OVERRIDE], "or_put",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_FORBIDEN], "put_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_ABANDON], "put_abandon",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_ERR], "put_err",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_BYTES], "put(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOADING], "putting",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_MEMORY], "used_mem(MB)",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION], "active_sess",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE], "or_hit_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_HIT_OJB_SIZE], "hitted_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_OBJ_SIZE], "cached_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE], "or_cached(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); for(i=0;i<__CACHE_STAT_MAX;i++) { cache->fs_id[i]=FS_register(cache->fs_handle, spec[i].style, spec[i].calc_type, spec[i].name); } // value=cache->fs_id[STAT_CACHE_QUERY_HIT]; // FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value)); FS_register_ratio(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY_HIT], cache->fs_id[STAT_CACHE_QUERY], 1, FS_STYLE_STATUS, FS_CALC_CURRENT, "cache_hit"); value=cache->fs_id[STAT_CACHE_OVERRIDE_HIT]; FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value)); FS_register_ratio(cache->fs_handle, cache->fs_id[STAT_CACHE_OVERRIDE_HIT], cache->fs_id[STAT_CACHE_OVERRIDE_QUERY], 1, FS_STYLE_STATUS, FS_CALC_CURRENT, "override_hit"); FS_start(cache->fs_handle); struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 miliseconds here. cache->gcev = event_new(cache->gc_evbase, -1, EV_PERSIST, cache_gc_cb, cache); evtimer_add(cache->gcev, &gc_delay); return; } struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, struct event_base* gc_evbase, void *logger) { struct cache_handle* cache=ALLOC(struct cache_handle, 1); int temp=0; cache->logger=logger; cache->thread_count=tfe_proxy_get_work_thread_count(); cache->clients=ALLOC(struct tango_cache_instance *, cache->thread_count); int i=0; for(i=0; ithread_count; i++) { cache->clients[i]=tango_cache_instance_new(tfe_proxy_get_work_thread_evbase(i), profile_path, section, logger); if(cache->clients[i]==NULL) { goto error_out; } } MESA_load_profile_int_def(profile_path, section, "get_concurrency_max", &temp, 1000*1000); cache->put_concurrency_max=temp; MESA_load_profile_int_def(profile_path, section, "put_concurrency_max", &(temp), 1000*1000); cache->put_concurrency_max=temp; MESA_load_profile_int_def(profile_path, section, "query_undefined_obj", &(cache->query_undefined_obj_enabled), 1); MESA_load_profile_int_def(profile_path, section, "cache_undefined_obj", &(cache->cache_undefined_obj_enabled), 1); MESA_load_profile_int_def(profile_path, section, "cached_undefined_obj_minimum_size", &(temp), 100*1024); cache->cache_undefined_obj_min_size=temp; MESA_load_profile_int_def(profile_path, section, "cache_minimum_time_override", &(cache->minimum_cache_seconds), 60*5); cache->gc_evbase=gc_evbase; cache_stat_init(cache); return cache; error_out: free(cache); return NULL; } static char* read_http1_hdr(const char* hdr, const char* field_name) { const char *p=NULL, *q=NULL; char* value=NULL; p=strcasestr(hdr, field_name); if(p==NULL) { return NULL; } p=strstr(p, ":"); if(p==NULL) { return NULL; } p++; q=strcasestr(p, "\r\n"); if(q==NULL) { return NULL; } value=(char*) calloc(sizeof(char), (q-p+1)); memcpy(value, p, q-p); return value; } enum cache_query_result_type cache_query_result_get_type(future_result_t * result) { struct tango_cache_result* cache_result=tango_cache_read_result(result); enum cache_query_result_type map[__CACHE_QUERY_RESULT_MAX]; map[RESULT_TYPE_BODY]=CACHE_QUERY_RESULT_DATA; map[RESULT_TYPE_HEADER]=CACHE_QUERY_RESULT_META; map[RESULT_TYPE_USERTAG]=CACHE_QUERY_RESULT_IRRELEVANT; map[RESULT_TYPE_END]=CACHE_QUERY_RESULT_END; map[RESULT_TYPE_MISS]=CACHE_QUERY_RESULT_MISS; return map[cache_result->type]; } struct cached_meta* cache_query_result_get_header(future_result_t * result) { struct tango_cache_result* cache_result=tango_cache_read_result(result); struct cached_meta* meta; if(cache_result->type!=RESULT_TYPE_HEADER) { return NULL; } meta= ALLOC(struct cached_meta, 1); meta->content_length=cache_result->tlength; meta->content_type=read_http1_hdr((const char*)cache_result->data_frag, "content-type"); return meta; } size_t cache_query_result_get_data(future_result_t * result, const unsigned char** pp_data) { struct tango_cache_result* cache_result=tango_cache_read_result(result); assert(cache_result->type==RESULT_TYPE_BODY); *pp_data=(const unsigned char*)cache_result->data_frag; return cache_result->size; } struct cache_query_context { struct cache_handle* ref_handle; char* url; struct cached_meta meta; struct tango_cache_result* ref_tango_cache_result; struct future* f_tango_cache_fetch; }; void cache_query_ctx_free_cb(void* p) { struct cache_query_context* ctx=(struct cache_query_context*)p; future_destroy(ctx->f_tango_cache_fetch); ctx->f_tango_cache_fetch=NULL; FREE(&(ctx->url)); FREE(&(ctx->meta.etag)); FREE(&(ctx->meta.last_modified)); FREE(&(ctx->meta.content_type)); free(ctx); } const struct cached_meta* cache_query_result_read_meta(future_result_t * result) { struct cache_query_context* ctx=(struct cache_query_context*)result; return &(ctx->meta); } void cached_meta_set(struct cached_meta* meta, enum CACHE_RESULT_TYPE type, const char* data_frag, size_t size) { switch(type) { case RESULT_TYPE_HEADER: meta->content_type=read_http1_hdr((const char*)data_frag, "content-type"); break; case RESULT_TYPE_USERTAG: meta->last_modified=read_http1_hdr(data_frag, "Last-Modified"); meta->etag=read_http1_hdr(data_frag, "etag"); break; default: assert(0); break; } return; } static void cache_query_obj_on_succ(future_result_t * result, void * user) { struct promise * p = (struct promise *) user; struct cache_query_context* ctx=(struct cache_query_context*)promise_get_ctx(p); struct tango_cache_result* _result=tango_cache_read_result(result); switch(_result->type) { case RESULT_TYPE_HEADER: ATOMIC_INC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_HIT])); FS_operate(ctx->ref_handle->fs_handle, ctx->ref_handle->fs_id[STAT_CACHE_QUERY_HIT_OJB_SIZE], 0, FS_OP_SET, _result->tlength/1024); cached_meta_set(&ctx->meta, RESULT_TYPE_HEADER, _result->data_frag, _result->size); ctx->meta.content_length=_result->tlength; TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache query hit: %s", ctx->url); break; case RESULT_TYPE_USERTAG: cached_meta_set(&ctx->meta, RESULT_TYPE_USERTAG, _result->data_frag, _result->size); break; case RESULT_TYPE_MISS: TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache query miss: %s", ctx->url); //NOT break intentionally. case RESULT_TYPE_END: //last call. ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERYING])); promise_dettach_ctx(p); cache_query_ctx_free_cb(ctx); break; case RESULT_TYPE_BODY: ATOMIC_ADD(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_BYTES]), _result->size); break; default: break; } promise_success(p, result); return; } static void cache_query_obj_on_fail(enum e_future_error err, const char * what, void * user) { struct promise * p = (struct promise *) user; struct cache_query_context* ctx=(struct cache_query_context*)promise_dettach_ctx(p); promise_failed(p, err, what); ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERYING])); cache_query_ctx_free_cb(ctx); return; } struct cache_pending_context { enum cache_pending_result status; int is_undefined_obj; struct request_freshness req_fresshness; char* req_if_none_match, *req_if_modified_since; const struct tfe_http_half * request; char* url; struct cached_meta cached_obj_meta; struct cache_handle* ref_handle; struct tango_cache_result* ref_tango_cache_result; struct future* f_tango_cache_fetch; }; void cache_pending_ctx_free_cb(void* p) { struct cache_pending_context* ctx=(struct cache_pending_context*)p; ctx->request=NULL; free(ctx->url); free(ctx); return; } const struct cached_meta* cache_pending_result_read_meta(future_result_t * result) { struct cache_pending_context* ctx=(struct cache_pending_context*)result; return &(ctx->cached_obj_meta); } static void cache_query_meta_on_succ(future_result_t * result, void * user) { struct promise * p = (struct promise *) user; struct cache_pending_context* ctx=(struct cache_pending_context*)promise_get_ctx(p); struct tango_cache_result* _result=tango_cache_read_result(result); ctx->ref_tango_cache_result=_result; time_t cache_last_modified_time=0, request_last_modified_time=0; switch(_result->type) { case RESULT_TYPE_HEADER: ctx->cached_obj_meta.content_length=_result->tlength; cached_meta_set(&ctx->cached_obj_meta, RESULT_TYPE_HEADER, _result->data_frag, _result->size); ctx->status=PENDING_RESULT_REVALIDATE; break; case RESULT_TYPE_USERTAG: cached_meta_set(&ctx->cached_obj_meta, RESULT_TYPE_HEADER, _result->data_frag, _result->size); TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache meta query hit: %s %s %s" , ctx->url , ctx->cached_obj_meta.last_modified ? ctx->cached_obj_meta.last_modified:"no_last_modify" , ctx->cached_obj_meta.etag ? ctx->cached_obj_meta.etag:"no_etag"); break; case RESULT_TYPE_MISS: ctx->status=PENDING_RESULT_MISS; TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache meta query miss: %s", ctx->url); //NOT break intentionally. case RESULT_TYPE_END: //last call. ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_META_QUERYING])); promise_dettach_ctx(p); promise_success(p, ctx); cache_query_ctx_free_cb(ctx); break; default: break; } } static void cache_query_meta_on_fail(enum e_future_error err, const char * what, void * user) { struct promise * p = (struct promise *) user; struct cache_pending_context* ctx=(struct cache_pending_context*)promise_dettach_ctx(p); promise_failed(p, err, what); ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_META_QUERYING])); cache_query_ctx_free_cb(ctx); return; } enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, unsigned int thread_id, const struct tfe_http_half * request, struct future* f_revalidate) { struct request_freshness req_fresshness={0,0}; enum cache_pending_result result=PENDING_RESULT_FOBIDDEN; int is_undefined_obj=0; enum cache_pending_action get_action=tfe_cache_get_pending(request, &(req_fresshness)); switch(get_action) { case UNDEFINED: if(!handle->query_undefined_obj_enabled) { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_NOT_APPLICABLE])); result=PENDING_RESULT_FOBIDDEN; } else { is_undefined_obj=1; ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_QUERY])); result=PENDING_RESULT_ALLOWED; } break; case FORBIDDEN: ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_NOT_APPLICABLE])); result=PENDING_RESULT_FOBIDDEN; break; case ALLOWED: result=PENDING_RESULT_ALLOWED; break; default: result=PENDING_RESULT_REVALIDATE; break; } if(result!=PENDING_RESULT_REVALIDATE) { return result; } struct tango_cache_meta_get meta; memset(&meta, 0, sizeof(meta)); meta.url=request->req_spec.url; meta.get=req_fresshness; struct promise* p=future_to_promise(f_revalidate); struct cache_pending_context* ctx=ALLOC(struct cache_pending_context, 1); ctx->status=PENDING_RESULT_FOBIDDEN; ctx->ref_handle=handle; ctx->req_fresshness=req_fresshness; ctx->url=tfe_strdup(request->req_spec.url); ctx->req_if_modified_since=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_MODIFIED_SINCE)); ctx->req_if_none_match=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_NONE_MATCH)); promise_set_ctx(p, ctx, cache_pending_ctx_free_cb); ATOMIC_INC(&(handle->stat_val[STAT_CACHE_META_QUERYING])); ctx->f_tango_cache_fetch=future_create("_cache_meta", cache_query_meta_on_succ, cache_query_meta_on_fail, p); int ret=tango_cache_head_object(handle->clients[thread_id], ctx->f_tango_cache_fetch, &meta); assert(ret==0); return PENDING_RESULT_REVALIDATE; } int web_cache_async_query(struct cache_handle* handle, unsigned int thread_id, const struct tfe_http_half * request, struct future* f) { struct request_freshness req_fresshness; enum cache_pending_action get_action; struct cache_query_context* query_ctx=NULL; struct promise* p=NULL; struct future* _f=NULL; get_action=tfe_cache_get_pending(request, &req_fresshness); assert(get_action!=FORBIDDEN); if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_QUERYING])) > ATOMIC_READ(&(handle->put_concurrency_max))) { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_ABANDON])); return -1; } struct tango_cache_meta_get meta; memset(&meta, 0, sizeof(meta)); meta.url=request->req_spec.url; memcpy(&(meta.get), &req_fresshness, sizeof(meta.get)); query_ctx=ALLOC(struct cache_query_context, 1); query_ctx->ref_handle=handle; query_ctx->url=tfe_strdup(request->req_spec.url); p=future_to_promise(f); promise_set_ctx(p, query_ctx, cache_query_ctx_free_cb); ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERYING])); query_ctx->f_tango_cache_fetch=future_create("_cache_get", cache_query_obj_on_succ, cache_query_obj_on_fail, p); int ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta); return 0; } struct wrap_cache_put_ctx { char* url; time_t start; struct future* f; struct cache_handle* ref_handle; }; void wrap_cache_put_ctx_free(struct wrap_cache_put_ctx* ctx) { FREE(&(ctx->url)); future_destroy(ctx->f); free(ctx); } static void wrap_cache_update_on_succ(future_result_t * result, void * user) { struct wrap_cache_put_ctx* ctx=(struct wrap_cache_put_ctx*)user; TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload success: %s elapse: %d", ctx->url, time(NULL)-ctx->start); wrap_cache_put_ctx_free(ctx); } static void wrap_cache_update_on_fail(enum e_future_error err, const char * what, void * user) { struct wrap_cache_put_ctx* ctx=(struct wrap_cache_put_ctx*)user; TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload failed: %s elapse: %d", ctx->url, time(NULL)-ctx->start); wrap_cache_put_ctx_free(ctx); } struct cache_update_context* web_cache_update_start(struct cache_handle* handle, unsigned int thread_id, const struct tfe_http_session * session) { struct cache_update_context* update_ctx=NULL; struct response_freshness resp_freshness; enum cache_pending_action put_action; struct tango_cache_ctx *write_ctx=NULL; char cont_len_str[TFE_STRING_MAX]={0}, user_tag_str[TFE_STRING_MAX]={0}; const char* value=NULL; char *tmp=NULL; int i=0, is_undefined_obj=0; size_t content_len=0; if(session->resp->resp_spec.content_length) { sscanf(session->resp->resp_spec.content_length, "%lu", &content_len); } put_action=tfe_cache_put_pending(session->resp, &resp_freshness); switch(put_action){ case FORBIDDEN: case REVALIDATE: ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN])); TFE_LOG_DEBUG(handle->logger, "cache update forbiden: %s", session->req->req_spec.url); return NULL; case ALLOWED: break; case UNDEFINED: if(handle->cache_undefined_obj_enabled && content_lencache_undefined_obj_min_size) { TFE_LOG_DEBUG(handle->logger, "cache update forbiden: %s", session->req->req_spec.url); return NULL; } is_undefined_obj=1; break; default: assert(0); break; } if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_UPLOADING])) > handle->get_concurrency_max) { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_ABANDON])); return NULL; } ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOADING])); struct tango_cache_meta_put meta; memset(&meta, 0, sizeof(meta)); meta.url=session->req->req_spec.url; i=0; snprintf(cont_len_str, sizeof(cont_len_str), "content-type:%s",session->resp->resp_spec.content_type); meta.std_hdr[i]=cont_len_str; i++; const char* etag=tfe_http_std_field_read(session->resp, TFE_HTTP_ETAG); const char* last_modified=tfe_http_std_field_read(session->resp, TFE_HTTP_LAST_MODIFIED); tmp=user_tag_str; if(etag) tmp+=snprintf(tmp, sizeof(user_tag_str)-(tmp-user_tag_str), "etag:%s\r\n", etag); if(last_modified) tmp+=snprintf(tmp, sizeof(user_tag_str)-(tmp-user_tag_str), "Last-modified:%s\r\n", last_modified); if(strlen(user_tag_str)>0) { meta.usertag=user_tag_str; meta.usertag_len=strlen(user_tag_str)+1; } memcpy(&meta.put, &resp_freshness, sizeof(resp_freshness)); struct wrap_cache_put_ctx* _cache_put_ctx=ALLOC(struct wrap_cache_put_ctx, 1); _cache_put_ctx->url=tfe_strdup(session->req->req_spec.url); _cache_put_ctx->start=time(NULL); _cache_put_ctx->ref_handle=handle; _cache_put_ctx->f=future_create("cache_put", wrap_cache_update_on_succ, wrap_cache_update_on_fail, _cache_put_ctx); TFE_LOG_DEBUG(handle->logger, "cache update allowed: %s", _cache_put_ctx->url); write_ctx=tango_cache_update_start(handle->clients[thread_id], _cache_put_ctx->f, &meta); if(write_ctx==NULL)//exceed maximum cache memory size. { wrap_cache_put_ctx_free(_cache_put_ctx); return NULL; } if(is_undefined_obj) { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_OVERRIDE])); FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE], 0, FS_OP_SET, content_len/1024); } FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_UPLOAD_OBJ_SIZE], 0, FS_OP_SET, content_len/1024); update_ctx=ALLOC(struct cache_update_context, 1); update_ctx->write_ctx=write_ctx; update_ctx->ref_cache_handle=handle; return update_ctx; } void web_cache_update(struct cache_update_context* ctx, const unsigned char * body_frag, size_t frag_size) { tango_cache_update_frag_data(ctx->write_ctx, (const char*)body_frag, frag_size); ATOMIC_ADD(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOAD_BYTES]), frag_size); return; } void web_cache_update_end(struct cache_update_context* ctx) { tango_cache_update_end(ctx->write_ctx); free(ctx); ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOADING])); return; }