#include "pangu_web_cache.h"

/*
 * NOTE(review): the nine <...> include targets that followed the line above
 * were lost from this copy of the file. The list below is inferred from the
 * symbols this file uses (tango_cache_*, tfe_*, MESA_load_profile_*, FS_*,
 * libevent timers and the C library) -- confirm the project header names
 * against the build before merging.
 */
#include <tango_cache_pending.h>
#include <tango_cache_client.h>
#include <tfe_proxy.h>
#include <tfe_utils.h>
#include <MESA/MESA_prof_load.h>
#include <MESA/field_stat2.h>
#include <event2/event.h>

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

/* Index of every statistic exported through the field_stat handle. */
enum cache_stat_field
{
    STAT_CACHE_QUERY,
    STAT_CACHE_QUERY_NOT_APPLICABLE,
    STAT_CACHE_QUERY_HIT,
    STAT_CACHE_QUERY_BYTES,
    STAT_CACHE_OVERRIDE_QUERY,
    STAT_CACHE_OVERRIDE_HIT,
    STAT_CACHE_OVERRIDE_BYTES,
    STAT_CACHE_QUERY_ERR,
    STAT_CACHE_UPLOAD_CNT,
    STAT_CACHE_UPLOAD_OVERRIDE,
    STAT_CACHE_UPLOAD_FORBIDEN,
    STAT_CACHE_UPLOAD_ABANDON,
    STAT_CACHE_UPLOAD_ERR,
    STAT_CACHE_UPLOAD_BYTES,
    STAT_CACHE_MEMORY,
    STAT_CACHE_ACTIVE_SESSION,
    STAT_CACHE_QUERY_HIT_OJB_SIZE,
    STAT_CACHE_UPLOAD_OBJ_SIZE,
    STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE,
    STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE,
    __CACHE_STAT_MAX
};

/* State of one web cache: configuration, one client per worker thread, statistics. */
struct cache_handle
{
    unsigned int thread_count;              /* number of entries in clients[] */
    int cache_undefined_obj_enabled;        /* profile key "cache_undefined_obj" */
    int query_undefined_obj_enabled;        /* profile key "query_undefined_obj" */
    size_t cache_undefined_obj_min_size;    /* profile key "cached_undefined_obj_minimum_size" */
    int minimum_cache_seconds;              /* profile key "cache_minimum_time_override" */
    struct tango_cache_instance **clients;  /* indexed by worker thread id */
    screen_stat_handle_t fs_handle;
    long long stat_val[__CACHE_STAT_MAX];   /* counters updated with ATOMIC_* */
    int fs_id[__CACHE_STAT_MAX];            /* field ids returned by FS_register() */
    struct event_base* gc_evbase;           /* event base that runs cache_gc_cb */
    struct event* gcev;
    void* logger;
};

/* Handle returned by web_cache_update_start() and consumed by web_cache_update*(). */
struct cache_update_context
{
    struct cache_handle* ref_cache_handle;
    struct tango_cache_ctx * write_ctx;
};

/*
 * Periodic timer callback: sums the statistics of all per-thread clients,
 * pushes them and the locally maintained counters into field_stat, then
 * triggers an output pass.
 *
 * arg is the struct cache_handle registered in cache_stat_init(); fd and
 * what are unused.
 */
static void cache_gc_cb(evutil_socket_t fd, short what, void * arg)
{
    struct cache_handle* cache=(struct cache_handle *)arg;
    struct cache_statistics client_stat_sum, client_stat;
    long long *val_sum=(long long *)&client_stat_sum;
    long long *val=NULL;
    unsigned int t=0;
    size_t j=0;
    int i=0;

    memset(&client_stat_sum, 0, sizeof(client_stat_sum));

    /*
     * NOTE(review): the bounds and body of the two loops below were partly
     * lost from this copy of the file and have been reconstructed from the
     * surrounding code. The summation treats struct cache_statistics as a
     * flat array of long long -- confirm every member has that type.
     */
    for(t=0; t<cache->thread_count; t++)
    {
        tango_cache_get_statistics(cache->clients[t], &client_stat);
        val=(long long*)&client_stat;
        for(j=0; j<sizeof(client_stat)/sizeof(long long); j++)
        {
            val_sum[j]+=val[j];
        }
    }

    /* Publish the counters maintained in this file; zero values are skipped. */
    for(i=0; i<__CACHE_STAT_MAX; i++)
    {
        if(ATOMIC_READ(&(cache->stat_val[i]))!=0)
        {
            switch(i)
            {
                case STAT_CACHE_UPLOAD_BYTES:
                case STAT_CACHE_QUERY_BYTES:
                case STAT_CACHE_OVERRIDE_BYTES:
                    //translate bytes to mega bytes.
                    FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET,
                               ATOMIC_READ(&(cache->stat_val[i]))/(1024*1024));
                    break;
                default:
                    FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET,
                               ATOMIC_READ(&(cache->stat_val[i])));
                    break;
            }
        }
    }

    /* Fields below come from the cache clients and overwrite whatever the loop above set. */
    FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY], 0, FS_OP_SET, client_stat_sum.get_recv_num);
    FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY_HIT], 0, FS_OP_SET, client_stat_sum.get_succ_num);
    FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY_ERR], 0, FS_OP_SET, client_stat_sum.get_error_num);
    FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_CNT], 0, FS_OP_SET, client_stat_sum.put_recv_num);
    FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_ERR], 0, FS_OP_SET, client_stat_sum.put_error_num);
    FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_MEMORY], 0, FS_OP_SET, client_stat_sum.memory_used/(1024*1024));
    FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_ACTIVE_SESSION], 0, FS_OP_SET, client_stat_sum.session_num);
    FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_ABANDON], 0, FS_OP_SET, client_stat_sum.totaldrop_num);
    FS_passive_output(cache->fs_handle);
    return;
}

/* Registration parameters of one field_stat entry. */
struct cache_stat_sepc
{
    const char* name;
    enum field_dsp_style_t style;
    enum field_calc_algo calc_type;
};

/* Fills one cache_stat_sepc entry; name is stored by pointer, not copied. */
static void set_stat_spec(struct cache_stat_sepc* spec, const char* name, enum field_dsp_style_t style, enum field_calc_algo calc_type)
{
    spec->name=name;
    spec->style=style;
    spec->calc_type=calc_type;
    return;
}

/*
 * Creates the field_stat handle of a cache, registers every cache_stat_field
 * plus the two hit-ratio fields, and arms the 500 ms timer that runs
 * cache_gc_cb on cache->gc_evbase.
 *
 * cache->gc_evbase must be set before calling.
 */
void cache_stat_init(struct cache_handle* cache)
{
    const char* fieldstat_output="./cache.fieldstat";
    const char* app_name="tango_cache";
    const char* obj_size_bins_KB="10,100,1000,10000";
    int value=0, i=0;
    screen_stat_handle_t fs_handle=NULL;

    fs_handle=FS_create_handle();
    FS_set_para(fs_handle, OUTPUT_DEVICE, fieldstat_output, strlen(fieldstat_output)+1);
    value=1;
    FS_set_para(fs_handle, PRINT_MODE, &value, sizeof(value));
    value=0;
    FS_set_para(fs_handle, CREATE_THREAD, &value, sizeof(value));
    FS_set_para(fs_handle, APP_NAME, app_name, strlen(app_name)+1);
    FS_set_para(fs_handle, HISTOGRAM_GLOBAL_BINS, obj_size_bins_KB, strlen(obj_size_bins_KB)+1);
    cache->fs_handle=fs_handle;

    struct cache_stat_sepc spec[__CACHE_STAT_MAX];
    set_stat_spec(&spec[STAT_CACHE_QUERY], "cache_query",FS_STYLE_FIELD, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_QUERY_NOT_APPLICABLE], "qry_not_allow",FS_STYLE_FIELD, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_QUERY_HIT], "hit_num",FS_STYLE_FIELD, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_QUERY_BYTES], "downloaded(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_OVERRIDE_QUERY], "or_qry",FS_STYLE_FIELD, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT], "or_hit",FS_STYLE_FIELD, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_OVERRIDE_BYTES], "or_download(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_QUERY_ERR], "query_err",FS_STYLE_STATUS, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_UPLOAD_CNT], "cache_upload",FS_STYLE_FIELD, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_UPLOAD_OVERRIDE], "or_upload",FS_STYLE_FIELD, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_UPLOAD_FORBIDEN], "upload_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_UPLOAD_ABANDON], "upload_abandon",FS_STYLE_FIELD, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_UPLOAD_ERR], "upload_err",FS_STYLE_STATUS, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_UPLOAD_BYTES], "uploaded(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_MEMORY], "used_mem(MB)",FS_STYLE_STATUS, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION], "active_sess",FS_STYLE_STATUS, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE], "or_hit_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_QUERY_HIT_OJB_SIZE], "hitted_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_UPLOAD_OBJ_SIZE], "cached_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
    set_stat_spec(&spec[STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE], "or_cached(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);

    for(i=0;i<__CACHE_STAT_MAX;i++)
    {
        cache->fs_id[i]=FS_register(cache->fs_handle, spec[i].style, spec[i].calc_type, spec[i].name);
    }

    /* The ID_INVISBLE call for STAT_CACHE_QUERY_HIT is disabled, so that field stays visible. */
    // value=cache->fs_id[STAT_CACHE_QUERY_HIT];
    // FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value));
    FS_register_ratio(cache->fs_handle,
                      cache->fs_id[STAT_CACHE_QUERY_HIT],
                      cache->fs_id[STAT_CACHE_QUERY],
                      1,
                      FS_STYLE_STATUS,
                      FS_CALC_CURRENT,
                      "cache_hit");

    value=cache->fs_id[STAT_CACHE_OVERRIDE_HIT];
    FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value));
    FS_register_ratio(cache->fs_handle,
                      cache->fs_id[STAT_CACHE_OVERRIDE_HIT],
                      cache->fs_id[STAT_CACHE_OVERRIDE_QUERY],
                      1,
                      FS_STYLE_STATUS,
                      FS_CALC_CURRENT,
                      "override_hit");

    FS_start(cache->fs_handle);

    struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 milliseconds here.
    cache->gcev = event_new(cache->gc_evbase, -1, EV_PERSIST, cache_gc_cb, cache);
    evtimer_add(cache->gcev, &gc_delay);
    return;
}

/*
 * Builds a cache handle: one tango cache client per worker thread, the
 * configuration read from [section] of profile_path, and the statistics
 * machinery driven by gc_evbase.
 *
 * Returns the new handle, or NULL when any per-thread client cannot be
 * created.
 */
struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, struct event_base* gc_evbase, void *logger)
{
    struct cache_handle* cache=ALLOC(struct cache_handle, 1);
    int temp=0;
    unsigned int i=0;

    cache->logger=logger;
    cache->thread_count=tfe_proxy_get_work_thread_count();
    cache->clients=ALLOC(struct tango_cache_instance *, cache->thread_count);
    for(i=0; i<cache->thread_count; i++)
    {
        cache->clients[i]=tango_cache_instance_new(tfe_proxy_get_work_thread_evbase(i), profile_path, section, logger);
        if(cache->clients[i]==NULL)
        {
            goto error_out;
        }
    }

    MESA_load_profile_int_def(profile_path, section, "query_undefined_obj", &(cache->query_undefined_obj_enabled), 1);
    MESA_load_profile_int_def(profile_path, section, "cache_undefined_obj", &(cache->cache_undefined_obj_enabled), 1);
    MESA_load_profile_int_def(profile_path, section, "cached_undefined_obj_minimum_size", &(temp), 100*1024);
    /* A negative setting would wrap to a huge size_t and silently forbid every upload. */
    cache->cache_undefined_obj_min_size=(temp<0)?0:(size_t)temp;
    MESA_load_profile_int_def(profile_path, section, "cache_minimum_time_override", &(cache->minimum_cache_seconds), 60*5);

    cache->gc_evbase=gc_evbase;
    cache_stat_init(cache);
    return cache;

error_out:
    /*
     * NOTE(review): clients[0..i-1] were created successfully but no
     * destructor for tango_cache_instance is visible in this file, so they
     * are not released here -- add the matching call if one exists.
     */
    free(cache->clients);
    free(cache);
    return NULL;
}

/*
 * Extracts the value of field_name from an HTTP/1 header block.
 *
 * The search is case-insensitive; the value is everything between the first
 * ':' after the field name and the next "\r\n", copied verbatim (leading
 * blanks are not trimmed).
 *
 * Returns a heap string the caller must free, or NULL when the field, the
 * colon or the line terminator is missing, or when allocation fails.
 */
static char* read_http1_hdr(const char* hdr, const char* field_name)
{
    const char *p=NULL, *q=NULL;
    char* value=NULL;

    p=strcasestr(hdr, field_name);
    if(p==NULL)
    {
        return NULL;
    }
    p=strstr(p, ":");
    if(p==NULL)
    {
        return NULL;
    }
    p++;
    q=strcasestr(p, "\r\n");
    if(q==NULL)
    {
        return NULL;
    }
    /* +1 keeps the terminating NUL that calloc already zeroed. */
    value=(char*) calloc((size_t)(q-p)+1, sizeof(char));
    if(value==NULL)
    {
        return NULL;
    }
    memcpy(value, p, (size_t)(q-p));
    return value;
}

/* Releases a cached_meta returned by cache_query_result_get_header(); NULL is ignored. */
void cache_query_free_meta(struct cached_meta* meta)
{
    if(meta==NULL)
    {
        return;
    }
    FREE(&meta->content_type);
    FREE(&meta);
    return;
}

/*
 * Translates the tango cache result type carried by result into the
 * cache_query_result_type exposed by this module. Result types that have no
 * explicit mapping are reported as CACHE_QUERY_RESULT_IRRELEVANT.
 */
enum cache_query_result_type cache_query_result_get_type(future_result_t * result)
{
    struct tango_cache_result* cache_result=tango_cache_read_result(result);

    switch(cache_result->type)
    {
        case RESULT_TYPE_BODY:
            return CACHE_QUERY_RESULT_DATA;
        case RESULT_TYPE_HEADER:
            return CACHE_QUERY_RESULT_META;
        case RESULT_TYPE_USERTAG:
            return CACHE_QUERY_RESULT_IRRELEVANT;
        case RESULT_TYPE_END:
            return CACHE_QUERY_RESULT_END;
        case RESULT_TYPE_MISS:
            return CACHE_QUERY_RESULT_MISS;
        default:
            return CACHE_QUERY_RESULT_IRRELEVANT;
    }
}

/*
 * Builds a cached_meta from a header result: content_length is taken from
 * the result's tlength and content_type from the "content-type" line of the
 * stored header (NULL when absent).
 *
 * Returns NULL when result is not a header result. The caller releases the
 * returned object with cache_query_free_meta().
 */
struct cached_meta* cache_query_result_get_header(future_result_t * result)
{
    struct tango_cache_result* cache_result=tango_cache_read_result(result);
    struct cached_meta* meta=NULL;

    if(cache_result->type!=RESULT_TYPE_HEADER)
    {
        return NULL;
    }
    meta=ALLOC(struct cached_meta, 1);
    meta->content_length=cache_result->tlength;
    meta->content_type=read_http1_hdr((const char*)cache_result->data_frag, "content-type");
    return meta;
}

/*
 * Exposes the body fragment of a body result: *pp_data is pointed at the
 * fragment (not copied) and its size is returned. result must be a body
 * result; this is only enforced by assert.
 */
size_t cache_query_result_get_data(future_result_t * result, const unsigned char** pp_data)
{
    struct tango_cache_result* cache_result=tango_cache_read_result(result);
    assert(cache_result->type==RESULT_TYPE_BODY);
    *pp_data=(const unsigned char*)cache_result->data_frag;
    return cache_result->size;
}

/* Per-query state attached to the caller's promise by async_web_cache_query(). */
struct cache_query_context
{
    struct cache_handle* ref_handle;
    char* url;                              /* private copy, used for logging */
    int is_undefined_obj;                   /* query was issued for an UNDEFINED object */
    struct future* f_tango_cache_fetch;     /* future handed to tango_cache_fetch_object() */
};

/* Destroys a cache_query_context together with its future and URL copy. */
void cache_query_ctx_free_cb(void* p)
{
    struct cache_query_context* ctx=(struct cache_query_context*)p;
    future_destroy(ctx->f_tango_cache_fetch);
    ctx->f_tango_cache_fetch=NULL;
    FREE(&(ctx->url));
    free(ctx);
}

/*
 * Success callback of the internal fetch future: updates hit/byte
 * statistics, releases the query context on the final result (END or MISS),
 * then forwards the result to the caller's promise.
 *
 * user is the promise obtained from the caller's future.
 */
static void wrap_cache_query_on_succ(future_result_t * result, void * user)
{
    struct promise * p = (struct promise *) user;
    struct cache_query_context* ctx=(struct cache_query_context*)promise_get_ctx(p);
    struct tango_cache_result* _result=tango_cache_read_result(result);

    switch(_result->type)
    {
        case RESULT_TYPE_HEADER:
            if(ctx->is_undefined_obj)
            {
                ATOMIC_INC(&(ctx->ref_handle->stat_val[STAT_CACHE_OVERRIDE_HIT]));
                FS_operate(ctx->ref_handle->fs_handle, ctx->ref_handle->fs_id[STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE], 0, FS_OP_SET, _result->tlength/1024);
            }
            ATOMIC_INC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_HIT]));
            FS_operate(ctx->ref_handle->fs_handle, ctx->ref_handle->fs_id[STAT_CACHE_QUERY_HIT_OJB_SIZE], 0, FS_OP_SET, _result->tlength/1024);
            TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache query hit: %s", ctx->url);
            break;
        case RESULT_TYPE_END:
        case RESULT_TYPE_MISS:
            //last call.
            /* END is not a miss, so it gets its own wording in the log. */
            TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache query %s: %s",
                          (_result->type==RESULT_TYPE_MISS)?"miss":"end", ctx->url);
            promise_dettach_ctx(p);
            cache_query_ctx_free_cb(ctx);
            break;
        case RESULT_TYPE_BODY:
            ATOMIC_ADD(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_BYTES]), _result->size);
            break;
        default:
            break;
    }
    promise_success(p, result);
    return;
}

/*
 * Failure callback of the internal fetch future: detaches the query context
 * from the promise, reports the failure to the caller, then frees the
 * context.
 */
static void wrap_cache_query_on_fail(enum e_future_error err, const char * what, void * user)
{
    struct promise * p = (struct promise *) user;
    struct cache_query_context* ctx=(struct cache_query_context*)promise_dettach_ctx(p);
    promise_failed(p, err, what);
    cache_query_ctx_free_cb(ctx);
    return;
}

/*
 * Starts an asynchronous cache lookup for request on the client that belongs
 * to worker thread thread_id. Results are delivered through f.
 *
 * Returns WEB_CACHE_NOT_APPLICABLE when the request may not be served from
 * cache (FORBIDDEN, or UNDEFINED while query_undefined_obj is disabled);
 * otherwise WEB_CACHE_QUERING.
 */
enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsigned int thread_id,
                                              const struct tfe_http_half * request, struct future* f)
{
    struct request_freshness req_fresshness;
    enum cache_pending_action get_action;
    struct cache_query_context* query_ctx=NULL;
    struct promise* p=NULL;
    int ret=0, is_undefined_obj=0;

    get_action=tfe_cache_get_pending(request, &req_fresshness);
    switch(get_action)
    {
        case UNDEFINED:
            if(!handle->query_undefined_obj_enabled)
            {
                ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_NOT_APPLICABLE]));
                return WEB_CACHE_NOT_APPLICABLE;
            }
            is_undefined_obj=1;
            ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_QUERY]));
            break;
        case FORBIDDEN:
            ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_NOT_APPLICABLE]));
            return WEB_CACHE_NOT_APPLICABLE;
        case VERIFY:
        case ALLOWED:
            break;
        default:
            assert(0);
            return WEB_CACHE_NOT_APPLICABLE;
    }

    struct tango_cache_meta_get meta;
    memset(&meta, 0, sizeof(meta));
    meta.url=request->req_spec.url;
    memcpy(&(meta.get), &req_fresshness, sizeof(meta.get));

    query_ctx=ALLOC(struct cache_query_context, 1);
    query_ctx->ref_handle=handle;
    query_ctx->url=tfe_strdup(request->req_spec.url);
    query_ctx->is_undefined_obj=is_undefined_obj;

    /* The context rides on the caller's promise so both callbacks can reach it. */
    p=future_to_promise(f);
    promise_set_ctx(p, query_ctx, cache_query_ctx_free_cb);

    query_ctx->f_tango_cache_fetch=future_create("_cache_get", wrap_cache_query_on_succ, wrap_cache_query_on_fail, p);
    ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta);
    assert(ret==0);
    (void)ret; /* keeps NDEBUG builds free of an unused-variable warning */
    return WEB_CACHE_QUERING;
}

/* State owned by the future that tracks one cache upload. */
struct wrap_cache_put_ctx
{
    char* url;                      /* private copy, used for logging */
    time_t start;                   /* time(NULL) when the upload was started */
    struct future* f;
    struct cache_handle* ref_handle;
};

/* Destroys an upload context together with its URL copy and future. */
void wrap_cache_put_ctx_free(struct wrap_cache_put_ctx* ctx)
{
    FREE(&(ctx->url));
    future_destroy(ctx->f);
    free(ctx);
}

/* Upload succeeded: log the URL and elapsed seconds, then drop the context. */
static void wrap_cache_update_on_succ(future_result_t * result, void * user)
{
    struct wrap_cache_put_ctx* ctx=(struct wrap_cache_put_ctx*)user;
    /* time_t is not int; cast explicitly so the argument matches the format. */
    TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload success: %s elapse: %ld", ctx->url, (long)(time(NULL)-ctx->start));
    wrap_cache_put_ctx_free(ctx);
}

/* Upload failed: log the URL and elapsed seconds, then drop the context. */
static void wrap_cache_update_on_fail(enum e_future_error err, const char * what, void * user)
{
    struct wrap_cache_put_ctx* ctx=(struct wrap_cache_put_ctx*)user;
    /* time_t is not int; cast explicitly so the argument matches the format. */
    TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload failed: %s elapse: %ld", ctx->url, (long)(time(NULL)-ctx->start));
    wrap_cache_put_ctx_free(ctx);
}

/*
 * Decides whether the response of session may be cached and, if so, opens an
 * upload on the client that belongs to worker thread thread_id.
 *
 * Returns NULL when the response has no Content-Length, when caching is not
 * allowed for it, or when the cache client refuses the upload; otherwise a
 * context to be fed with web_cache_update() and closed with
 * web_cache_update_end().
 */
struct cache_update_context* web_cache_update_start(struct cache_handle* handle, unsigned int thread_id,
                                                    const struct tfe_http_session * session)
{
    struct cache_update_context* update_ctx=NULL;
    struct response_freshness resp_freshness;
    enum cache_pending_action put_action;
    struct tango_cache_ctx *write_ctx=NULL;
    char buffer[TFE_STRING_MAX];
    int i=0, is_undefined_obj=0;
    size_t content_len=0;

    if(!session->resp->resp_spec.content_length)
    {
        ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN]));
        return NULL;
    }
    /* %zu is the conversion that matches size_t; an unparsable value counts as 0. */
    if(sscanf(session->resp->resp_spec.content_length, "%zu", &content_len)!=1)
    {
        content_len=0;
    }

    put_action=tfe_cache_put_pending(session->resp, &resp_freshness);
    switch(put_action)
    {
        case FORBIDDEN:
        case VERIFY:
            ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN]));
            TFE_LOG_DEBUG(handle->logger, "cache update forbiden: %s", session->req->req_spec.url);
            return NULL;
        case ALLOWED:
            break;
        case UNDEFINED:
            /*
             * Undefined objects are cached only when the feature is enabled
             * AND the object reaches the configured minimum size.
             * NOTE(review): the comparison operand was partly lost from this
             * copy of the file and is reconstructed as
             * content_len < handle->cache_undefined_obj_min_size.
             */
            if(!handle->cache_undefined_obj_enabled
               || content_len<handle->cache_undefined_obj_min_size)
            {
                ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN]));
                TFE_LOG_DEBUG(handle->logger, "cache update forbiden: %s", session->req->req_spec.url);
                return NULL;
            }
            is_undefined_obj=1;
            break;
        default:
            assert(0);
            /* With NDEBUG the assert vanishes; never upload on an unknown action. */
            return NULL;
    }

    struct tango_cache_meta_put meta;
    memset(&meta, 0, sizeof(meta));
    meta.url=session->req->req_spec.url;
    i=0;
    /* Passing NULL to %s is undefined; store the header only when a type is known. */
    if(session->resp->resp_spec.content_type!=NULL)
    {
        snprintf(buffer, sizeof(buffer), "content-type:%s", session->resp->resp_spec.content_type);
        meta.std_hdr[i]=buffer;
        i++;
    }
    memcpy(&meta.put, &resp_freshness, sizeof(resp_freshness));

    struct wrap_cache_put_ctx* _cache_put_ctx=ALLOC(struct wrap_cache_put_ctx, 1);
    _cache_put_ctx->url=tfe_strdup(session->req->req_spec.url);
    _cache_put_ctx->start=time(NULL);
    _cache_put_ctx->ref_handle=handle;
    _cache_put_ctx->f=future_create("cache_put", wrap_cache_update_on_succ, wrap_cache_update_on_fail, _cache_put_ctx);

    TFE_LOG_DEBUG(handle->logger, "cache update allowed: %s", _cache_put_ctx->url);

    write_ctx=tango_cache_update_start(handle->clients[thread_id], _cache_put_ctx->f, &meta);
    if(write_ctx==NULL)//exceed maximum cache memory size.
    {
        wrap_cache_put_ctx_free(_cache_put_ctx);
        return NULL;
    }

    if(is_undefined_obj)
    {
        ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_OVERRIDE]));
        FS_operate(handle->fs_handle, handle->fs_id[STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE], 0, FS_OP_SET, content_len/1024);
    }
    FS_operate(handle->fs_handle, handle->fs_id[STAT_CACHE_UPLOAD_OBJ_SIZE], 0, FS_OP_SET, content_len/1024);

    update_ctx=ALLOC(struct cache_update_context, 1);
    update_ctx->write_ctx=write_ctx;
    update_ctx->ref_cache_handle=handle;
    return update_ctx;
}

/* Appends one body fragment to an open upload and adds its size to the uploaded-bytes counter. */
void web_cache_update(struct cache_update_context* ctx, const unsigned char * body_frag, size_t frag_size)
{
    tango_cache_update_frag_data(ctx->write_ctx, (const char*)body_frag, frag_size);
    ATOMIC_ADD(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOAD_BYTES]), frag_size);
    return;
}

/* Closes an upload started by web_cache_update_start() and frees ctx; ctx must not be used afterwards. */
void web_cache_update_end(struct cache_update_context* ctx)
{
    tango_cache_update_end(ctx->write_ctx);
    free(ctx);
    return;
}