#include "pangu_web_cache.h" #include #include #include #include #include #include #include #include #include extern "C" { #include } #include #include enum cache_stat_field { STAT_CACHE_QUERY, STAT_CACHE_QUERY_FORBIDDEN, STAT_CACHE_QUERY_VERIFY, STAT_CACHE_QUERY_HIT, STAT_CACHE_QUERY_BYTES, STAT_CACHE_OVERRIDE_QUERY, STAT_CACHE_OVERRIDE_HIT, STAT_CACHE_OVERRIDE_BYTES, STAT_CACHE_QUERY_ERR, STAT_CACHE_QUERY_ABANDON, STAT_CACHE_QUERYING, STAT_CACHE_PENDING, STAT_CACHE_UPLOAD_CNT, STAT_CACHE_UPLOAD_BYPASS, STAT_CACHE_UPLOAD_OVERRIDE, STAT_CACHE_UPLOAD_FORBIDEN, STAT_CACHE_UPLOAD_ABANDON, STAT_CACHE_UPLOAD_ERR, STAT_CACHE_UPLOAD_BYTES, STAT_CACHE_UPLOADING, STAT_CACHE_MEMORY, STAT_CACHE_ACTIVE_SESSION, STAT_CACHE_QUERY_HIT_OJB_SIZE, STAT_CACHE_UPLOAD_OBJ_SIZE, STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE, STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE, __CACHE_STAT_MAX }; struct cache_key_descr { int is_not_empty; size_t qs_num; char** ignore_qs; char* include_cookie; }; struct cache_param { int ref_cnt; struct cache_key_descr key_descr; char no_revalidate; char cache_dyn_url; char cache_html; char cache_cookied_cont; char ignore_req_nocache; char ignore_res_nocache; char force_caching; int min_use; time_t pinning_time_sec; time_t inactive_time_sec; long max_cache_size; long max_cache_obj_size; pthread_mutex_t lock; }; struct cache_bloom { int thread_id; size_t size; double error_rate; char filename[TFE_PATH_MAX]; counting_bloom_t *bloom; void * ref_logger; }; struct cache_handle { unsigned int thread_count; int cache_undefined_obj_enabled; int query_undefined_obj_enabled; size_t cache_undefined_obj_min_size; int minimum_cache_seconds; struct tango_cache_instance **clients; long long get_concurrency_max; long long put_concurrency_max; screen_stat_handle_t fs_handle; long long stat_val[__CACHE_STAT_MAX]; int fs_id[__CACHE_STAT_MAX]; struct event_base* gc_evbase; struct event* gcev; int cache_policy_enabled; //otherwise use default cache policy struct cache_param default_cache_policy; Maat_feather_t ref_feather; int cache_param_idx; int table_url_constraint; int table_cookie_constraint; int cache_key_bloom_life; size_t cache_key_bloom_size; struct cache_bloom *cache_key_bloom; void* logger; }; struct cache_update_context { struct cache_handle* ref_cache_handle; struct tango_cache_ctx * write_ctx; }; static void web_cache_stat_cb(evutil_socket_t fd, short what, void * arg) { struct cache_handle* cache=(struct cache_handle *)arg; struct cache_statistics client_stat_sum, client_stat; memset(&client_stat_sum, 0, sizeof(client_stat_sum)); long long *val_sum = (long long *)&client_stat_sum; long long *val = NULL; int i=0, j=0; for(i=0; ithread_count;i++) { tango_cache_get_statistics(cache->clients[i], &client_stat); val=(long long*)&client_stat; for(j=0; jstat_val[i]))!=0) { switch(i) { case STAT_CACHE_UPLOAD_BYTES: case STAT_CACHE_QUERY_BYTES: case STAT_CACHE_OVERRIDE_BYTES: //translate bytes to mega bytes. FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(cache->stat_val[i]))/(1024*1024)); break; default: FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(cache->stat_val[i]))); break; } } } FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY], 0, FS_OP_SET, client_stat_sum.get_recv_num); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY_HIT], 0, FS_OP_SET, client_stat_sum.get_succ_num); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY_ERR], 0, FS_OP_SET, client_stat_sum.get_error_num); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_CNT], 0, FS_OP_SET, client_stat_sum.put_recv_num); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_ERR], 0, FS_OP_SET, client_stat_sum.put_error_num); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_MEMORY], 0, FS_OP_SET, client_stat_sum.memory_used/(1024*1024)); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_ACTIVE_SESSION], 0, FS_OP_SET, client_stat_sum.session_num); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_ABANDON], 0, FS_OP_SET, client_stat_sum.totaldrop_num); FS_passive_output(cache->fs_handle); return; } struct cache_stat_sepc { const char* name; enum field_dsp_style_t style; enum field_calc_algo calc_type; }; static void set_stat_spec(struct cache_stat_sepc* spec, const char* name, enum field_dsp_style_t style, enum field_calc_algo calc_type) { spec->name=name; spec->style=style; spec->calc_type=calc_type; return; } void cache_stat_init(struct cache_handle* cache) { const char* fieldstat_output="./cache.fieldstat"; const char* app_name="tango_cache"; const char* obj_size_bins_KB="10,100,1000,10000"; int value=0, i=0; screen_stat_handle_t fs_handle=NULL; fs_handle=FS_create_handle(); FS_set_para(fs_handle, OUTPUT_DEVICE, fieldstat_output, strlen(fieldstat_output)+1); value=1; FS_set_para(fs_handle, PRINT_MODE, &value, sizeof(value)); value=0; FS_set_para(fs_handle, CREATE_THREAD, &value, sizeof(value)); FS_set_para(fs_handle, APP_NAME, app_name, strlen(app_name)+1); FS_set_para(fs_handle, HISTOGRAM_GLOBAL_BINS, obj_size_bins_KB, strlen(obj_size_bins_KB)+1); cache->fs_handle=fs_handle; struct cache_stat_sepc spec[__CACHE_STAT_MAX]; set_stat_spec(&spec[STAT_CACHE_QUERY], "cache_get",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_FORBIDDEN], "get_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_VERIFY], "get_verify",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_HIT], "hit_num",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_BYTES], "downloaded(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_QUERY], "or_qry",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT], "or_hit",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_BYTES], "or_download(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_ERR], "get_err",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_ABANDON], "get_abandon",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERYING], "getting",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_PENDING], "pending",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_CNT], "cache_put",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_BYPASS], "cache_bypass",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_OVERRIDE], "or_put",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_FORBIDEN], "put_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_ABANDON], "put_abandon",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_ERR], "put_err",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_BYTES], "put(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOADING], "putting",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_MEMORY], "used_mem(MB)",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION], "active_sess",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE], "or_hit_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_QUERY_HIT_OJB_SIZE], "hitted_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_OBJ_SIZE], "cached_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE], "or_cached(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); for(i=0;i<__CACHE_STAT_MAX;i++) { cache->fs_id[i]=FS_register(cache->fs_handle, spec[i].style, spec[i].calc_type, spec[i].name); } // value=cache->fs_id[STAT_CACHE_QUERY_HIT]; // FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value)); FS_register_ratio(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY_HIT], cache->fs_id[STAT_CACHE_QUERY], 1, FS_STYLE_STATUS, FS_CALC_CURRENT, "cache_hit"); value=cache->fs_id[STAT_CACHE_OVERRIDE_HIT]; FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value)); FS_register_ratio(cache->fs_handle, cache->fs_id[STAT_CACHE_OVERRIDE_HIT], cache->fs_id[STAT_CACHE_OVERRIDE_QUERY], 1, FS_STYLE_STATUS, FS_CALC_CURRENT, "override_hit"); FS_start(cache->fs_handle); struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 miliseconds here. cache->gcev = event_new(cache->gc_evbase, -1, EV_PERSIST, web_cache_stat_cb, cache); evtimer_add(cache->gcev, &gc_delay); return; } time_t time_unit_sec(const char* str) { time_t value=0; sscanf(str, "%ld", &value); switch(str[strlen(str)-1]) { case 's': break; case 'm': value*=60; break; case 'h': value*=3600; break; case 'd': value*=((size_t)24*3600); break; default: break; } return value; } size_t storage_unit_byte(const char* str) { size_t value=0; sscanf(str, "%ld", &value); switch(str[strlen(str)-1]) { case 'b': break; case 'k': value*=1024; break; case 'm': value*=((size_t)1024*1024); break; case 'g': value*=((size_t)1024*1024*1024); break; case 't': if(value<1024) { #pragma GCC diagnostic ignored "-Woverflow" value*=((size_t)1024*1024*1024*1024); } else //maximum 1PB { value=(size_t)1024*(1024*1024*1024*1024); } break; default: break; } return value; } //A URL is considered dynamic if it ends in “.asp(x)” or contains a question mark (?), a semicolon (;), or “cgi”. char is_dynamic_url(const char* url) { if(strchr(url, '?') || strchr(url, ';') || strstr(url, "cgi") || 0==strcmp(url+strlen(url)-3,"asp") || 0==strcmp(url+strlen(url)-4, "aspx")) { return 1; } return 0; } char * cookie_scanvalue(const char * key, const char * cookies, char * val, size_t val_len) { int i=0, j=0, k=0, key_len=0; int found=1; char* key_dup=ALLOC(char, strlen(key)+2); char* cookie_dup=ALLOC(char, strlen(cookies)+1); strcat(key_dup, key); if(key_dup[strlen(key)]!='=') { strcat(key_dup, "="); } for(i=0; i 0) { val[0] = '\0'; } FREE(&key_dup); FREE(&cookie_dup); return val; } char* url_remove_qs(const char* url, int qs_num, char* ignore_qs[]) { char* url_copy=tfe_strdup(url); size_t target_size= strlen(url_copy)+1; char* target_url=ALLOC(char, target_size); int i=0, shall_ignore=0; char *token=NULL,*sub_token=NULL,*saveptr; char* query_string=NULL; query_string=strchr(url_copy, '?'); if(query_string!=NULL) { strncat(target_url, url_copy, MIN(query_string-url_copy,target_size)); query_string++; for (token = query_string; ; token= NULL) { sub_token= strtok_r(token,"&", &saveptr); if (sub_token == NULL) break; shall_ignore=0; for(i=0; iis_not_empty) { return NULL; } char* url_no_qs=NULL; const char* cookie=NULL; char cookie_val[256]={0}; //most 256 bytes for cookie key size_t key_size=strlen(request->req_spec.url)+sizeof(cookie_val); char* cache_key=ALLOC(char, key_size); if(desc->qs_num>0) { url_no_qs=url_remove_qs(request->req_spec.url, desc->qs_num, desc->ignore_qs); strncat(cache_key, url_no_qs, key_size); FREE(&url_no_qs); } else { strncat(cache_key, request->req_spec.url, key_size); } if(desc->include_cookie && (cookie=tfe_http_std_field_read(request, TFE_HTTP_COOKIE))!=NULL) { cookie_scanvalue(desc->include_cookie, cookie, cookie_val, sizeof(cookie_val)); if(strlen(cookie_val)>0) { strncat(cache_key, "/C/", key_size); strncat(cache_key, cookie_val, key_size); } } return cache_key; } void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_def_large, MAAT_RULE_EX_DATA* ad, long argl, void *argp) { struct cache_handle* cache=(struct cache_handle*) argp; int i=0; size_t len=0; *ad=NULL; if(rule->serv_def_lenlogger, "invalid cache parameter: id = %d", rule->config_id); return; } struct cache_param* param=ALLOC(struct cache_param, 1); *param=cache->default_cache_policy; param->ref_cnt=1; pthread_mutex_init(&(param->lock), NULL); key_desc=cJSON_GetObjectItem(json,"cache_key"); if(key_desc && key_desc->type==cJSON_Object) { param->key_descr.is_not_empty=1; qs=cJSON_GetObjectItem(key_desc,"ignore_qs"); if(qs && qs->type==cJSON_Array) { param->key_descr.qs_num=cJSON_GetArraySize(qs); param->key_descr.ignore_qs=ALLOC(char*, param->key_descr.qs_num); for(i=0; ikey_descr.qs_num; i++) { item=cJSON_GetArrayItem(qs, i); len=strlen(item->valuestring)+2; param->key_descr.ignore_qs[i]=ALLOC(char, len); strncat(param->key_descr.ignore_qs[i], item->valuestring, len); strncat(param->key_descr.ignore_qs[i], "=", len); } } item=cJSON_GetObjectItem(key_desc,"cookie"); if(item && item->type==cJSON_String) param->key_descr.include_cookie=tfe_strdup(item->valuestring); } item=cJSON_GetObjectItem(json,"no_revalidate"); if(item && item->type==cJSON_Number) param->no_revalidate=item->valueint; item=cJSON_GetObjectItem(json,"cache_dyn_url"); if(item && item->type==cJSON_Number) param->cache_dyn_url=item->valueint; item=cJSON_GetObjectItem(json,"cache_cookied_cont"); if(item && item->type==cJSON_Number) param->cache_cookied_cont=item->valueint; item=cJSON_GetObjectItem(json,"ignore_req_nocache"); if(item && item->type==cJSON_Number) param->ignore_req_nocache=item->valueint; item=cJSON_GetObjectItem(json,"ignore_res_nocache"); if(item && item->type==cJSON_Number) param->ignore_res_nocache=item->valueint; item=cJSON_GetObjectItem(json,"force_caching"); if(item && item->type==cJSON_Number) param->force_caching=item->valueint; item=cJSON_GetObjectItem(json,"min_use"); if(item && item->type==cJSON_Number) param->min_use=item->valueint; item=cJSON_GetObjectItem(json,"pinning_time"); if(item && item->type==cJSON_String) param->pinning_time_sec=time_unit_sec(item->valuestring); item=cJSON_GetObjectItem(json,"inactive_time"); if(item && item->type==cJSON_String) param->inactive_time_sec=time_unit_sec(item->valuestring); item=cJSON_GetObjectItem(json,"max_cache_size"); if(item && item->type==cJSON_String) param->max_cache_size=storage_unit_byte(item->valuestring); item=cJSON_GetObjectItem(json,"max_cache_obj_size"); if(item && item->type==cJSON_String) param->max_cache_obj_size=storage_unit_byte(item->valuestring); cJSON_Delete(json); *ad=param; return; } void cache_param_free(int idx, const struct Maat_rule_t* rule, const char* srv_def_large, MAAT_RULE_EX_DATA* ad, long argl, void *argp) { int i=0; if(*ad==NULL) { return; } struct cache_param* param=(struct cache_param*)*ad; pthread_mutex_lock(&(param->lock)); param->ref_cnt--; if(param->ref_cnt>0) { pthread_mutex_unlock(&(param->lock)); return; } pthread_mutex_unlock(&(param->lock)); pthread_mutex_destroy(&(param->lock)); for(i=0; ikey_descr.qs_num; i++) { FREE(&(param->key_descr.ignore_qs[i])); } FREE(&(param->key_descr.ignore_qs)); FREE(&(param->key_descr.include_cookie)); FREE(&(param)); return; } void cache_param_dup(int idx, MAAT_RULE_EX_DATA *to, MAAT_RULE_EX_DATA *from, long argl, void *argp) { struct cache_param* from_param=*((struct cache_param**)from); pthread_mutex_lock(&(from_param->lock)); from_param->ref_cnt++; pthread_mutex_unlock(&(from_param->lock)); *((struct cache_param**)to)=from_param; return; } static void cache_key_bloom_gc_cb(evutil_socket_t fd, short what, void * arg) { struct cache_bloom* p_bloom= (struct cache_bloom*) arg; counting_bloom_t* new_bloom=NULL; new_bloom=new_counting_bloom(p_bloom->size, p_bloom->error_rate, p_bloom->filename); free_counting_bloom(p_bloom->bloom); p_bloom->bloom=new_bloom; TFE_LOG_DEBUG(p_bloom->ref_logger, "Bloom filter %d:%s resets.", p_bloom->thread_id, p_bloom->filename); return; } struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, struct event_base* gc_evbase, Maat_feather_t feather, void *logger) { struct cache_handle* cache=ALLOC(struct cache_handle, 1); int temp=0; struct event* ev=NULL; cache->logger=logger; cache->thread_count=tfe_proxy_get_work_thread_count(); cache->clients=ALLOC(struct tango_cache_instance *, cache->thread_count); cache->cache_key_bloom=ALLOC(struct cache_bloom, cache->thread_count); struct cache_bloom* p_bloom=NULL; MESA_load_profile_int_def(profile_path, section, "cache_policy_enabled", &(cache->cache_policy_enabled), 1); MESA_load_profile_int_def(profile_path, section, "cache_key_bloom_size", (int*)&(cache->cache_key_bloom_size), 16*1000*1000); MESA_load_profile_int_def(profile_path, section, "cache_key_bloom_life", &(cache->cache_key_bloom_life), 30*60); char bloom_filename[TFE_PATH_MAX]{0}; struct timeval gc_refresh_delay = {cache->cache_key_bloom_life, 0}; int i=0; struct tango_cache_parameter *cache_client_param=tango_cache_parameter_new(profile_path, section, logger); for(i=0; ithread_count; i++) { if(cache->cache_policy_enabled) { p_bloom=cache->cache_key_bloom+i; p_bloom->thread_id=i; p_bloom->size=cache->cache_key_bloom_size; p_bloom->error_rate=0.01; p_bloom->ref_logger=logger; snprintf(p_bloom->filename, sizeof(p_bloom->filename), "/tmp/pangu_cache_blooms.%02d", i); p_bloom->bloom=new_counting_bloom(p_bloom->size, p_bloom->error_rate, p_bloom->filename); if(p_bloom->bloom==NULL) { goto error_out; } ev = event_new(tfe_proxy_get_work_thread_evbase(i), -1, EV_PERSIST, cache_key_bloom_gc_cb, p_bloom); evtimer_add(ev, &gc_refresh_delay); } cache->clients[i]=tango_cache_instance_new(cache_client_param,tfe_proxy_get_work_thread_evbase(i), logger); if(cache->clients[i]==NULL) { goto error_out; } } MESA_load_profile_int_def(profile_path, section, "get_concurrency_max", &temp, 1000*1000); cache->get_concurrency_max=temp; MESA_load_profile_int_def(profile_path, section, "put_concurrency_max", &(temp), 1000*1000); cache->put_concurrency_max=temp; MESA_load_profile_int_def(profile_path, section, "query_undefined_obj", &(cache->query_undefined_obj_enabled), 1); MESA_load_profile_int_def(profile_path, section, "cache_undefined_obj", &(cache->cache_undefined_obj_enabled), 1); MESA_load_profile_int_def(profile_path, section, "cached_undefined_obj_minimum_size", &(temp), 100*1024); cache->cache_undefined_obj_min_size=temp; cache->gc_evbase=gc_evbase; cache->default_cache_policy.key_descr.qs_num=0; cache->default_cache_policy.no_revalidate=0; cache->default_cache_policy.cache_dyn_url=0; cache->default_cache_policy.cache_cookied_cont=0; cache->default_cache_policy.ignore_req_nocache=0; cache->default_cache_policy.ignore_res_nocache=0; cache->default_cache_policy.force_caching=0; cache->default_cache_policy.min_use=0; cache->default_cache_policy.pinning_time_sec=0; cache->default_cache_policy.inactive_time_sec=0; cache->default_cache_policy.max_cache_size=0; cache->default_cache_policy.max_cache_obj_size=1024*1024*1024;//<1GB if(cache->cache_policy_enabled) { cache->table_url_constraint=Maat_table_register(feather, "PXY_CACHE_HTTP_URL"); cache->table_cookie_constraint=Maat_table_register(feather, "PXY_CACHE_HTTP_COOKIE"); cache->cache_param_idx=Maat_rule_get_ex_new_index(feather, "PXY_CACHE_COMPILE", cache_param_new, cache_param_free, cache_param_dup, 0, cache); cache->ref_feather=feather; TFE_LOG_INFO(logger, "Cache Policy Enabled."); } cache_stat_init(cache); return cache; error_out: free(cache); return NULL; } static char* read_http1_hdr(const char* hdr, const char* field_name) { const char *p=NULL, *q=NULL; char* value=NULL; p=strcasestr(hdr, field_name); if(p==NULL) { return NULL; } p=strstr(p, ":"); if(p==NULL) { return NULL; } p++; q=strcasestr(p, "\r\n"); if(q==NULL) { return NULL; } value=(char*) calloc(sizeof(char), (q-p+1)); memcpy(value, p, q-p); return value; } struct cache_query_context { struct cache_handle* ref_handle; char* url; struct cached_meta meta; struct tango_cache_result* ref_tango_cache_result; struct future* f_tango_cache_fetch; }; enum cache_query_result_type cache_query_result_get_type(future_result_t * result) { struct cache_query_context* ctx=(struct cache_query_context*)result; struct tango_cache_result* cache_result=ctx->ref_tango_cache_result; enum cache_query_result_type map[__CACHE_QUERY_RESULT_MAX]; map[RESULT_TYPE_BODY]=CACHE_QUERY_RESULT_DATA; map[RESULT_TYPE_HEADER]=CACHE_QUERY_RESULT_META; map[RESULT_TYPE_USERTAG]=CACHE_QUERY_RESULT_IRRELEVANT; map[RESULT_TYPE_END]=CACHE_QUERY_RESULT_END; map[RESULT_TYPE_MISS]=CACHE_QUERY_RESULT_MISS; return map[cache_result->type]; } size_t cache_query_result_get_data(future_result_t * result, const unsigned char** pp_data) { struct cache_query_context* ctx=(struct cache_query_context*)result; struct tango_cache_result* cache_result=ctx->ref_tango_cache_result; assert(cache_result->type==RESULT_TYPE_BODY); *pp_data=(const unsigned char*)cache_result->data_frag; return cache_result->size; } void cache_query_ctx_free_cb(void* p) { struct cache_query_context* ctx=(struct cache_query_context*)p; future_destroy(ctx->f_tango_cache_fetch); ctx->f_tango_cache_fetch=NULL; FREE(&(ctx->url)); FREE(&(ctx->meta.etag)); FREE(&(ctx->meta.last_modified)); FREE(&(ctx->meta.content_type)); free(ctx); } const struct cached_meta* cache_query_result_read_meta(future_result_t * result) { struct cache_query_context* ctx=(struct cache_query_context*)result; return &(ctx->meta); } void cached_meta_set(struct cached_meta* meta, enum CACHE_RESULT_TYPE type, const char* data_frag, size_t size) { switch(type) { case RESULT_TYPE_HEADER: meta->content_type=read_http1_hdr((const char*)data_frag, "content-type"); break; case RESULT_TYPE_USERTAG: meta->last_modified=read_http1_hdr(data_frag, "Last-Modified"); if(meta->last_modified!=NULL && 0==strcasecmp(meta->last_modified, "Thu, 01 Jan 1970 00:00:00 GMT")) { FREE(&(meta->last_modified)); } meta->etag=read_http1_hdr(data_frag, "etag"); break; default: assert(0); break; } return; } static void cache_query_obj_on_succ(future_result_t * result, void * user) { struct promise * p = (struct promise *) user; struct cache_query_context* ctx=(struct cache_query_context*)promise_get_ctx(p); int last_call=0; ctx->ref_tango_cache_result=tango_cache_read_result(result); switch(ctx->ref_tango_cache_result->type) { case RESULT_TYPE_HEADER: ATOMIC_INC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_HIT])); FS_operate(ctx->ref_handle->fs_handle, ctx->ref_handle->fs_id[STAT_CACHE_QUERY_HIT_OJB_SIZE], 0, FS_OP_SET, ctx->ref_tango_cache_result->tlength/1024); cached_meta_set(&ctx->meta, RESULT_TYPE_HEADER, ctx->ref_tango_cache_result->data_frag, ctx->ref_tango_cache_result->size); ctx->meta.content_length=ctx->ref_tango_cache_result->tlength; TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache query hit: %s", ctx->url); break; case RESULT_TYPE_USERTAG: cached_meta_set(&ctx->meta, RESULT_TYPE_USERTAG, ctx->ref_tango_cache_result->data_frag, ctx->ref_tango_cache_result->size); break; case RESULT_TYPE_MISS: TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache query miss: %s", ctx->url); //NOT break intentionally. case RESULT_TYPE_END: //last call. ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERYING])); promise_dettach_ctx(p); last_call=1; break; case RESULT_TYPE_BODY: ATOMIC_ADD(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_BYTES]), ctx->ref_tango_cache_result->size); break; default: break; } promise_success(p, ctx); if(last_call) cache_query_ctx_free_cb(ctx); return; } static void cache_query_obj_on_fail(enum e_future_error err, const char * what, void * user) { struct promise * p = (struct promise *) user; struct cache_query_context* ctx=(struct cache_query_context*)promise_dettach_ctx(p); promise_failed(p, err, what); ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERYING])); cache_query_ctx_free_cb(ctx); return; } struct cache_pending_context { enum cache_pending_result status; int is_undefined_obj; char* req_if_none_match, *req_if_modified_since; const struct tfe_http_half * request; char* url; struct cached_meta cached_obj_meta; struct cache_handle* ref_handle; struct tango_cache_result* ref_tango_cache_result; struct future* f_tango_cache_fetch; }; void cache_pending_ctx_free_cb(void* p) { struct cache_pending_context* ctx=(struct cache_pending_context*)p; ctx->request=NULL; FREE(&(ctx->url)); FREE(&(ctx->req_if_modified_since)); FREE(&(ctx->req_if_none_match)); if(ctx->f_tango_cache_fetch) { future_destroy(ctx->f_tango_cache_fetch); } FREE(&(ctx->cached_obj_meta.etag)); FREE(&(ctx->cached_obj_meta.last_modified)); FREE(&(ctx->cached_obj_meta.content_type)); free(ctx); return; } const struct cached_meta* cache_pending_result_read_meta(future_result_t * result) { struct cache_pending_context* ctx=(struct cache_pending_context*)result; if(ctx->status==PENDING_RESULT_MISS) { return NULL; } else { return &(ctx->cached_obj_meta); } } static void cache_query_meta_on_succ(future_result_t * result, void * user) { struct promise * p = (struct promise *) user; struct cache_pending_context* ctx=(struct cache_pending_context*)promise_get_ctx(p); struct tango_cache_result* _result=tango_cache_read_result(result); ctx->ref_tango_cache_result=_result; switch(_result->type) { case RESULT_TYPE_HEADER: ctx->cached_obj_meta.content_length=_result->tlength; cached_meta_set(&ctx->cached_obj_meta, RESULT_TYPE_HEADER, _result->data_frag, _result->size); ctx->status=PENDING_RESULT_REVALIDATE; break; case RESULT_TYPE_USERTAG: cached_meta_set(&ctx->cached_obj_meta, RESULT_TYPE_USERTAG, _result->data_frag, _result->size); TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache meta query hit: %s %s %s" , ctx->url , ctx->cached_obj_meta.last_modified ? ctx->cached_obj_meta.last_modified:"no_last_modify" , ctx->cached_obj_meta.etag ? ctx->cached_obj_meta.etag:"no_etag"); break; case RESULT_TYPE_MISS: ctx->status=PENDING_RESULT_MISS; TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache meta query miss: %s", ctx->url); //NOT break intentionally. case RESULT_TYPE_END: //last call. ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_PENDING])); promise_dettach_ctx(p); promise_success(p, ctx); cache_pending_ctx_free_cb(ctx); break; default: break; } } static void cache_query_meta_on_fail(enum e_future_error err, const char * what, void * user) { struct promise * p = (struct promise *) user; struct cache_pending_context* ctx=(struct cache_pending_context*)promise_dettach_ctx(p); promise_failed(p, err, what); ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_PENDING])); cache_pending_ctx_free_cb(ctx); return; } struct cache_mid { enum cache_pending_result result; struct request_freshness req_fresshness; char shall_bypass; char is_using_exception_param; char is_dyn_url; char is_html; char has_cookie; char use_cnt; int cfg_id; char* cache_key; struct cache_param* param; }; void cache_mid_clear(struct cache_mid **mid) { if(*mid==NULL) { return; } if((*mid)->is_using_exception_param) { cache_param_free(0, NULL, NULL, (void**)&((*mid)->param), 0, NULL); } FREE(&((*mid)->cache_key)); FREE(mid); return; } #define CACHE_ACTION_BYPASS 0x80 enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, unsigned int thread_id, const struct tfe_http_half * request, struct cache_mid** mid, struct future* f_revalidate) { enum cache_pending_result result=PENDING_RESULT_FOBIDDEN; int is_undefined_obj=0; struct Maat_rule_t cache_policy; struct cache_param* param=&(handle->default_cache_policy); MAAT_RULE_EX_DATA ex_data=NULL; scan_status_t scan_mid=NULL; int ret=0; const char* cookie=NULL; struct cache_mid* _mid=ALLOC(struct cache_mid, 1); *mid=_mid; cookie=tfe_http_std_field_read(request, TFE_HTTP_COOKIE); if(cookie) { _mid->has_cookie=1; } _mid->is_dyn_url=is_dynamic_url(request->req_spec.url); if(handle->cache_policy_enabled) { ret=Maat_full_scan_string(handle->ref_feather, handle->table_url_constraint, CHARSET_UTF8, request->req_spec.url, strlen(request->req_spec.url), &cache_policy, NULL, 1, &scan_mid, thread_id); if(cookie && ret<=0) { ret=Maat_full_scan_string(handle->ref_feather, handle->table_cookie_constraint, CHARSET_UTF8, cookie, strlen(cookie), &cache_policy, NULL, 1, &scan_mid, thread_id); } Maat_clean_status(&scan_mid); if(ret>0) { ex_data=Maat_rule_get_ex_data(handle->ref_feather, &cache_policy, handle->cache_param_idx); if(ex_data!=NULL) { param=(struct cache_param*)ex_data; _mid->is_using_exception_param=1; _mid->param=param; } if((unsigned char)cache_policy.action==CACHE_ACTION_BYPASS) { _mid->shall_bypass=1; } _mid->cfg_id=cache_policy.config_id; if(param->key_descr.is_not_empty) { _mid->cache_key=get_cache_key(request, &(param->key_descr)); } TFE_LOG_DEBUG(handle->logger, "cache policy %d matched: url=%s alt-key=%s", cache_policy.config_id, request->req_spec.url, _mid->cache_key!=NULL?_mid->cache_key:"null"); } if(_mid->shall_bypass || (!param->force_caching && !param->cache_dyn_url && _mid->is_dyn_url && param->key_descr.qs_num==0) || (!param->force_caching && param->cache_cookied_cont && _mid->has_cookie)) { _mid->result=PENDING_RESULT_FOBIDDEN; return _mid->result; } } enum cache_pending_action get_action=tfe_cache_get_pending(request, &(_mid->req_fresshness)); switch(get_action) { case UNDEFINED: if(!handle->query_undefined_obj_enabled) { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_FORBIDDEN])); result=PENDING_RESULT_FOBIDDEN; } else { is_undefined_obj=1; ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_QUERY])); result=PENDING_RESULT_ALLOWED; } break; case FORBIDDEN: if(param->ignore_req_nocache || param->force_caching) { result=PENDING_RESULT_ALLOWED; } else { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_FORBIDDEN])); result=PENDING_RESULT_FOBIDDEN; } break; case ALLOWED: result=PENDING_RESULT_ALLOWED; break; default: if(param->no_revalidate) { result=PENDING_RESULT_ALLOWED; } else { result=PENDING_RESULT_REVALIDATE; } break; } if(result!=PENDING_RESULT_REVALIDATE) { _mid->result=result; return _mid->result; } struct tango_cache_meta_get meta; memset(&meta, 0, sizeof(meta)); meta.url=_mid->cache_key!=NULL?_mid->cache_key:request->req_spec.url; meta.get = _mid->req_fresshness; struct promise* p=future_to_promise(f_revalidate); struct cache_pending_context* ctx=ALLOC(struct cache_pending_context, 1); ctx->status=PENDING_RESULT_FOBIDDEN; ctx->ref_handle=handle; ctx->url=tfe_strdup(request->req_spec.url); ctx->req_if_modified_since=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_MODIFIED_SINCE)); ctx->req_if_none_match=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_NONE_MATCH)); promise_set_ctx(p, ctx, cache_pending_ctx_free_cb); ATOMIC_INC(&(handle->stat_val[STAT_CACHE_PENDING])); ctx->f_tango_cache_fetch=future_create("_cache_pend", cache_query_meta_on_succ, cache_query_meta_on_fail, p); ret=tango_cache_head_object(handle->clients[thread_id], ctx->f_tango_cache_fetch, &meta); if(ret<0) { cache_pending_ctx_free_cb(ctx); _mid->result=PENDING_RESULT_FOBIDDEN; return _mid->result; } _mid->result=PENDING_RESULT_REVALIDATE; return _mid->result; } int web_cache_async_query(struct cache_handle* handle, unsigned int thread_id, const struct tfe_http_half * request, struct cache_mid** mid, struct future* f) { enum cache_pending_action get_action; struct cache_query_context* query_ctx=NULL; struct promise* p=NULL; struct future* _f=NULL; struct cache_mid* _mid=*mid; assert(_mid->result!=PENDING_RESULT_FOBIDDEN); if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_QUERYING])) > ATOMIC_READ(&(handle->put_concurrency_max))) { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_ABANDON])); return -1; } struct tango_cache_meta_get meta; memset(&meta, 0, sizeof(meta)); meta.url=_mid->cache_key?_mid->cache_key:request->req_spec.url; meta.get=_mid->req_fresshness; query_ctx=ALLOC(struct cache_query_context, 1); query_ctx->ref_handle=handle; query_ctx->url=tfe_strdup(request->req_spec.url); p=future_to_promise(f); promise_set_ctx(p, query_ctx, cache_query_ctx_free_cb); ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERYING])); query_ctx->f_tango_cache_fetch=future_create("_cache_get", cache_query_obj_on_succ, cache_query_obj_on_fail, p); int ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta); if(ret<0) { cache_query_ctx_free_cb(query_ctx); return -1; } return 0; } struct wrap_cache_put_ctx { char* url; time_t start; struct future* f; struct cache_handle* ref_handle; }; void wrap_cache_put_ctx_free(struct wrap_cache_put_ctx* ctx) { FREE(&(ctx->url)); future_destroy(ctx->f); free(ctx); } static void wrap_cache_update_on_succ(future_result_t * result, void * user) { struct wrap_cache_put_ctx* ctx=(struct wrap_cache_put_ctx*)user; TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload success: %s elapse: %d", ctx->url, time(NULL)-ctx->start); wrap_cache_put_ctx_free(ctx); } static void wrap_cache_update_on_fail(enum e_future_error err, const char * what, void * user) { struct wrap_cache_put_ctx* ctx=(struct wrap_cache_put_ctx*)user; TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload failed: %s %s lapse: %d", ctx->url, what, time(NULL)-ctx->start); wrap_cache_put_ctx_free(ctx); } struct cache_update_context* web_cache_update_start(struct cache_handle* handle, unsigned int thread_id, const struct tfe_http_session * session, struct cache_mid **mid) { struct cache_update_context* update_ctx=NULL; struct response_freshness resp_freshness; enum cache_pending_action put_action; struct tango_cache_ctx *write_ctx=NULL; char cont_type_str[TFE_STRING_MAX]={0}, user_tag_str[TFE_STRING_MAX]={0}; const char* content_type=NULL; char *tmp=NULL; int i=0, is_undefined_obj=0; size_t content_len=0; const struct cache_param* param=NULL; struct cache_mid* _mid=*mid; if(_mid!=NULL && _mid->is_using_exception_param) { param=_mid->param; } else { param=&(handle->default_cache_policy); } if(session->resp->resp_spec.content_length) { sscanf(session->resp->resp_spec.content_length, "%lu", &content_len); } content_type=tfe_http_std_field_read(session->resp, TFE_HTTP_CONT_TYPE); if(content_type!=NULL&& NULL!=strcasestr(content_type, "text/html")) { _mid->is_html=1; } put_action=tfe_cache_put_pending(session->resp, &resp_freshness); switch(put_action){ case FORBIDDEN: if(!(param->ignore_res_nocache || param->force_caching)) { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN])); TFE_LOG_DEBUG(handle->logger, "cache update forbiden: %s", session->req->req_spec.url); return NULL; } break; case REVALIDATE: case ALLOWED: case UNDEFINED: if(_mid->shall_bypass || content_len > param->max_cache_obj_size || (!param->cache_cookied_cont && _mid->has_cookie) || (!param->cache_html && _mid->is_html)) { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_BYPASS])); return NULL; } break; default: assert(0); break; } if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_UPLOADING])) > handle->get_concurrency_max) { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_ABANDON])); return NULL; } const char* key=NULL; size_t key_len=0; if(param->min_use>0) { if(_mid->cache_key) { key=_mid->cache_key; key_len=strlen(_mid->cache_key); } else { key=session->req->req_spec.url; key_len=strlen(session->req->req_spec.url); } _mid->use_cnt=counting_bloom_check(handle->cache_key_bloom[thread_id].bloom, key, key_len); if(_mid->use_cntmin_use) { counting_bloom_add(handle->cache_key_bloom[thread_id].bloom, key, key_len); return NULL; } } ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOADING])); struct tango_cache_meta_put meta; memset(&meta, 0, sizeof(meta)); meta.url=_mid->cache_key?_mid->cache_key:session->req->req_spec.url; i=0; snprintf(cont_type_str, sizeof(cont_type_str), "content-type:%s",session->resp->resp_spec.content_type); meta.std_hdr[i]=cont_type_str; i++; const char* etag=tfe_http_std_field_read(session->resp, TFE_HTTP_ETAG); const char* last_modified=tfe_http_std_field_read(session->resp, TFE_HTTP_LAST_MODIFIED); tmp=user_tag_str; if(etag) tmp+=snprintf(tmp, sizeof(user_tag_str)-(tmp-user_tag_str), "etag:%s\r\n", etag); if(last_modified) tmp+=snprintf(tmp, sizeof(user_tag_str)-(tmp-user_tag_str), "Last-modified:%s\r\n", last_modified); if(strlen(user_tag_str)>0) { meta.usertag=user_tag_str; meta.usertag_len=strlen(user_tag_str)+1; } meta.put=resp_freshness; meta.put.timeout=MAX(param->pinning_time_sec, resp_freshness.timeout); struct wrap_cache_put_ctx* _cache_put_ctx=ALLOC(struct wrap_cache_put_ctx, 1); _cache_put_ctx->url=tfe_strdup(session->req->req_spec.url); _cache_put_ctx->start=time(NULL); _cache_put_ctx->ref_handle=handle; _cache_put_ctx->f=future_create("cache_put", wrap_cache_update_on_succ, wrap_cache_update_on_fail, _cache_put_ctx); write_ctx=tango_cache_update_start(handle->clients[thread_id], _cache_put_ctx->f, &meta); if(write_ctx==NULL)//exceed maximum cache memory size. { wrap_cache_put_ctx_free(_cache_put_ctx); return NULL; } TFE_LOG_DEBUG(handle->logger, "cache upload allowed: %s", _cache_put_ctx->url); if(is_undefined_obj) { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_OVERRIDE])); FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE], 0, FS_OP_SET, content_len/1024); } FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_UPLOAD_OBJ_SIZE], 0, FS_OP_SET, content_len/1024); update_ctx=ALLOC(struct cache_update_context, 1); update_ctx->write_ctx=write_ctx; update_ctx->ref_cache_handle=handle; return update_ctx; } void web_cache_update(struct cache_update_context* ctx, const unsigned char * body_frag, size_t frag_size) { tango_cache_update_frag_data(ctx->write_ctx, (const char*)body_frag, frag_size); ATOMIC_ADD(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOAD_BYTES]), frag_size); return; } void web_cache_update_end(struct cache_update_context* ctx) { tango_cache_update_end(ctx->write_ctx); ATOMIC_DEC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOADING])); ctx->write_ctx = NULL; ctx->ref_cache_handle = NULL; free(ctx); return; }