From d3e1ed38ee409c7d50adbf7eaab81435372fddc7 Mon Sep 17 00:00:00 2001 From: zhengchao Date: Sun, 11 Nov 2018 13:45:03 +0800 Subject: [PATCH] =?UTF-8?q?=E9=9B=86=E6=88=90=E6=94=AF=E6=8C=81tango=5Fcac?= =?UTF-8?q?he=5Fparameter=5Fnew=E7=9A=84cache=20client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/pangu/pangu_pxy.conf | 37 +- .../pangu-http/src/pangu_web_cache.cpp | 588 ++++++++++++++---- 2 files changed, 500 insertions(+), 125 deletions(-) diff --git a/conf/pangu/pangu_pxy.conf b/conf/pangu/pangu_pxy.conf index 48dab5a..82ad8ad 100644 --- a/conf/pangu/pangu_pxy.conf +++ b/conf/pangu/pangu_pxy.conf @@ -7,7 +7,7 @@ KAFKA_BROKERLIST=192.168.10.73:9092 [MAAT] # 0:json 1: redis 2: iris -MAAT_INPUT_MODE=1 +MAAT_INPUT_MODE=0 TABLE_INFO=./pangu_conf/table_info.conf JSON_CFG_FILE=./pangu_conf/pangu_ctrl.json STAT_FILE=./log/pangu_scan.status @@ -19,21 +19,36 @@ MAAT_REDIS_DB_INDEX=4 EFFECT_INTERVAL_S=1 [TANGO_CACHE] -#MINIO IP地址,目前只支持一个 +enable_cache=1 +#MINIO IP, As WiredLB required MINIO_IP_LIST=192.168.10.61-64; MINIO_LISTEN_PORT=9000 -#每个域名最多开启的链接数 -MAX_CONNECTION_PER_HOST=10 +#MAX_CONNECTION_PER_HOST=1 +MAX_CNNT_PIPELINE_NUM=20 +#MAX_CURL_SESSION_NUM=100 +MAX_CURL_TRANSFER_TIMEOUT_S=15 -#bucket的名称 CACHE_BUCKET_NAME=openbucket - -#缓存最大占用的内存空间大小,超出空间时上传失败 MAX_USED_MEMORY_SIZE_MB=5120 - -#上传时Expires头部的过期时间,单位秒,最小60(1分钟) CACHE_DEFAULT_TTL_SECOND=3600 - -#是否对对象的名称进行哈希,开启哈希有助于提高上传下载的速率 CACHE_OBJECT_KEY_HASH_SWITCH=1 + +#1-MINIO锛2-REDIS +CACHE_HEAD_FROM_SOURCE=2 +CACHE_HEAD_REDIS_KEY=MINIO_EVENTS_INFO +CACHE_HEAD_MAIN_REDIS_IP=192.168.10.63 +CACHE_HEAD_REDIS_IPLIST=192.168.10.62-63; +CACHE_HEAD_REDIS_PORT=6379 + +#WIRED LOAD BALANCER Configuration +#WIREDLB_OVERRIDE=1 +#WIREDLB_TOPIC= +#WIREDLB_DATACENTER= +WIREDLB_MINIO_HEALTH_PORT=52100 +#WIREDLB_MINIO_GROUP= +WIREDLB_REDIS_HEALTH_PORT=52101 +#WIREDLB_REDIS_GROUP= + +cache_undefined_obj=1 +query_undefined_obj=0 diff --git a/plugin/business/pangu-http/src/pangu_web_cache.cpp b/plugin/business/pangu-http/src/pangu_web_cache.cpp index 1468805..ca14945 100644 --- a/plugin/business/pangu-http/src/pangu_web_cache.cpp +++ b/plugin/business/pangu-http/src/pangu_web_cache.cpp @@ -9,11 +9,19 @@ #include #include -#include #include #include +extern "C" +{ +#include +} + +#include + +#include + enum cache_stat_field { STAT_CACHE_QUERY, @@ -29,6 +37,7 @@ enum cache_stat_field STAT_CACHE_QUERYING, STAT_CACHE_PENDING, STAT_CACHE_UPLOAD_CNT, + STAT_CACHE_UPLOAD_BYPASS, STAT_CACHE_UPLOAD_OVERRIDE, STAT_CACHE_UPLOAD_FORBIDEN, STAT_CACHE_UPLOAD_ABANDON, @@ -44,6 +53,41 @@ enum cache_stat_field STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE, __CACHE_STAT_MAX }; + +struct cache_key_descr +{ + int is_not_empty; + size_t qs_num; + char** ignore_qs; + char* include_cookie; +}; +struct cache_param +{ + int ref_cnt; + struct cache_key_descr key_descr; + + char no_revalidate; + char cache_dyn_url; + char cache_cookied_cont; + char ignore_req_nocache; + char ignore_res_nocache; + char force_caching; + + int min_use; + time_t pinning_time_sec; + time_t inactive_time_sec; + long max_cache_size; + long max_cache_obj_size; + pthread_mutex_t lock; +}; +struct cache_bloom +{ + int thread_id; + size_t size; + double error_rate; + char filename[TFE_PATH_MAX]; + counting_bloom_t *bloom; +}; struct cache_handle { unsigned int thread_count; @@ -62,11 +106,17 @@ struct cache_handle struct event_base* gc_evbase; struct event* gcev; + + int cache_policy_enabled; //otherwise use default cache policy struct cache_param default_cache_policy; Maat_feather_t ref_feather; int cache_param_idx; int table_url_constraint; int table_cookie_constraint; + + int cache_key_bloom_life; + size_t cache_key_bloom_size; + struct cache_bloom *cache_key_bloom; void* logger; }; struct cache_update_context @@ -74,7 +124,7 @@ struct cache_update_context struct cache_handle* ref_cache_handle; struct tango_cache_ctx * write_ctx; }; -static void cache_gc_cb(evutil_socket_t fd, short what, void * arg) +static void web_cache_stat_cb(evutil_socket_t fd, short what, void * arg) { struct cache_handle* cache=(struct cache_handle *)arg; struct cache_statistics client_stat_sum, client_stat; @@ -169,6 +219,7 @@ void cache_stat_init(struct cache_handle* cache) set_stat_spec(&spec[STAT_CACHE_QUERYING], "getting",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_PENDING], "pending",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_CNT], "cache_put",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_UPLOAD_BYPASS], "cache_bypass",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_OVERRIDE], "or_put",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_FORBIDEN], "put_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_ABANDON], "put_abandon",FS_STYLE_FIELD, FS_CALC_CURRENT); @@ -214,44 +265,184 @@ void cache_stat_init(struct cache_handle* cache) FS_start(cache->fs_handle); struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 miliseconds here. - cache->gcev = event_new(cache->gc_evbase, -1, EV_PERSIST, cache_gc_cb, cache); + cache->gcev = event_new(cache->gc_evbase, -1, EV_PERSIST, web_cache_stat_cb, cache); evtimer_add(cache->gcev, &gc_delay); return; } -struct cache_key_descr -{ - size_t qs_num; - char** ignore_qs; - char* include_hdrs; - char* include_cookie; -}; -struct cache_param -{ - struct cache_key_descr key_descr; - - char revalidate; - char cache_dyn_url; - char cache_cookied_cont; - char ignore_req_nocache; - char ignore_res_nocache; - char force_caching; - - int min_use; - int pinning_time_sec; - int inactive_time_sec; - int max_cache_size_mb; - int max_cache_obj_size_mb; -}; -int time_secs_interval(const char* str){} -int storage_mb_size(const char* str){} -int is_dynamic_url(const char* url){} +time_t time_unit_sec(const char* str) +{ + time_t value=0; + sscanf(str, "%ld", &value); + switch(str[strlen(str)-1]) + { + case 's': + break; + case 'm': + value*=60; + break; + case 'h': + value*=3600; + break; + case 'd': + value*=((size_t)24*3600); + break; + default: + break; + } + return value; +} +size_t storage_unit_byte(const char* str) +{ + size_t value=0; + sscanf(str, "%ld", &value); + switch(str[strlen(str)-1]) + { + case 'b': + break; + case 'k': + value*=1024; + break; + case 'm': + value*=((size_t)1024*1024); + break; + case 'g': + value*=((size_t)1024*1024*1024); + break; + case 't': + if(value<1024) + { +#pragma GCC diagnostic ignored "-Woverflow" + value*=((size_t)1024*1024*1024*1024); + + } + else //maximum 1PB + { + value=(size_t)1024*(1024*1024*1024*1024); + } + break; + default: + break; + } + return value; +} + + +//A URL is considered dynamic if it ends in 鈥.asp(x)鈥 or contains a question mark (?), a semicolon (;), or 鈥渃gi鈥. +char is_dynamic_url(const char* url) +{ + if(strchr(url, '?') + || strchr(url, ';') + || strstr(url, "cgi") + || 0==strcmp(url+strlen(url)-3,"asp") + || 0==strcmp(url+strlen(url)-4, "aspx")) + { + return 1; + } + return 0; +} +char * cookie_scanvalue(const char * key, const char * qs, char * val, size_t val_len) +{ + int i=0, key_len=0; + + key_len = strlen(key); + while(qs[0] != '\0') + { + if ( strncmp(key, qs, key_len) == 0 ) + break; + qs += strcspn(qs, ";") + 1; + } + + if ( qs[0] == '\0' ) return NULL; + + qs += strcspn(qs, "=;"); + if ( qs[0] == '=' ) + { + qs++; + i = strcspn(qs, "=;"); + strncpy(val, qs, (val_len-1)<(i+1) ? (val_len-1) : (i+1)); + } + else + { + if ( val_len > 0 ) + val[0] = '\0'; + } + + return val; +} + + +char* get_cache_key(const struct tfe_http_half * request, const struct cache_key_descr* desc) +{ + if(desc==NULL|| !desc->is_not_empty) + { + return NULL; + } + int i=0, shall_ignore=0; + + char *token=NULL,*sub_token=NULL,*saveptr; + char* url=tfe_strdup(request->req_spec.url); + const char* cookie=NULL; + char cookie_val[256]={0}; //most 256 bytes for cookie key + size_t key_size=strlen(url)+sizeof(cookie_val); + char* cache_key=ALLOC(char, key_size); + char* query_string=NULL; + + if(desc->qs_num>0) + { + query_string=strchr(url, '?'); + if(query_string!=NULL) + { + strncat(cache_key, url, MIN(query_string-url,key_size)); + query_string++; + for (token = query_string; ; token= NULL) + { + sub_token= strtok_r(token,"&", &saveptr); + if (sub_token == NULL) + break; + shall_ignore=0; + for(i=0; iqs_num; i++) + { + if(0==strncasecmp(sub_token, desc->ignore_qs[i], strlen(desc->ignore_qs[i]))) + { + shall_ignore=1; + break; + } + } + if(!shall_ignore) + { + strncat(cache_key, sub_token, key_size); + } + } + } + else + { + strncat(cache_key, url, key_size); + } + } + else + { + strncat(cache_key, url, key_size); + } + if(desc->include_cookie && (cookie=tfe_http_std_field_read(request, TFE_HTTP_COOKIE))!=NULL) + { + cookie_scanvalue(desc->include_cookie, cookie, cookie_val, sizeof(cookie_val)); + if(strlen(cookie_val)>0) + { + strncat(cache_key, cookie_val, key_size); + } + } + FREE(&(url)); + return cache_key; +} + void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_def_large, MAAT_RULE_EX_DATA* ad, long argl, void *argp) { struct cache_handle* cache=(struct cache_handle*) argp; int i=0; + size_t len=0; *ad=NULL; if(rule->serv_def_lendefault_cache_policy; + param->ref_cnt=1; + pthread_mutex_init(&(param->lock), NULL); key_desc=cJSON_GetObjectItem(json,"cache_key"); if(key_desc && key_desc->type==cJSON_Object) { - qs=cJSON_GetObjectItem(json,"ignore_qs"); + param->key_descr.is_not_empty=1; + qs=cJSON_GetObjectItem(key_desc,"ignore_qs"); if(qs && qs->type==cJSON_Array) { param->key_descr.qs_num=cJSON_GetArraySize(qs); param->key_descr.ignore_qs=ALLOC(char*, param->key_descr.qs_num); for(i=0; ikey_descr.qs_num; i++) { - item=cJSON_GetArrayItem(item, i); - param->key_descr.ignore_qs[i]=tfe_strdup(qs->valuestring); + item=cJSON_GetArrayItem(qs, i); + len=strlen(item->valuestring)+2; + param->key_descr.ignore_qs[i]=ALLOC(char, len); + strncat(param->key_descr.ignore_qs[i], item->valuestring, len); + strncat(param->key_descr.ignore_qs[i], "=", len); } - } - } - item=cJSON_GetObjectItem(key_desc,"hdrs"); - if(item && item->type==cJSON_String) param->key_descr.include_hdrs=tfe_strdup(item->valuestring); + } + item=cJSON_GetObjectItem(key_desc,"cookie"); + if(item && item->type==cJSON_String) param->key_descr.include_cookie=tfe_strdup(param->key_descr.include_cookie); + + } - item=cJSON_GetObjectItem(key_desc,"cookie"); - if(item && item->type==cJSON_String) param->key_descr.include_cookie=tfe_strdup(item->valuestring); - item=cJSON_GetObjectItem(json,"revalidate"); - if(item && item->type==cJSON_Number) param->revalidate=item->valueint; + item=cJSON_GetObjectItem(json,"no_revalidate"); + if(item && item->type==cJSON_Number) param->no_revalidate=item->valueint; item=cJSON_GetObjectItem(json,"cache_dyn_url"); if(item && item->type==cJSON_Number) param->cache_dyn_url=item->valueint; @@ -310,18 +507,19 @@ void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_de if(item && item->type==cJSON_Number) param->min_use=item->valueint; item=cJSON_GetObjectItem(json,"pinning_time"); - if(item && item->type==cJSON_String) param->pinning_time_sec=time_secs_interval(item->valuestring); + if(item && item->type==cJSON_String) param->pinning_time_sec=time_unit_sec(item->valuestring); item=cJSON_GetObjectItem(json,"inactive_time"); - if(item && item->type==cJSON_String) param->inactive_time_sec=time_secs_interval(item->valuestring); + if(item && item->type==cJSON_String) param->inactive_time_sec=time_unit_sec(item->valuestring); item=cJSON_GetObjectItem(json,"max_cache_size"); - if(item && item->type==cJSON_String) param->max_cache_size_mb=storage_mb_size(item->valuestring); + if(item && item->type==cJSON_String) param->max_cache_size=storage_unit_byte(item->valuestring); item=cJSON_GetObjectItem(json,"max_cache_obj_size"); - if(item && item->type==cJSON_String) param->max_cache_obj_size_mb=storage_mb_size(item->valuestring); + if(item && item->type==cJSON_String) param->max_cache_obj_size=storage_unit_byte(item->valuestring); cJSON_Delete(json); + *ad=param; return; } void cache_param_free(int idx, const struct Maat_rule_t* rule, const char* srv_def_large, MAAT_RULE_EX_DATA* ad, long argl, void *argp) @@ -332,29 +530,88 @@ void cache_param_free(int idx, const struct Maat_rule_t* rule, const char* srv_d return; } struct cache_param* param=(struct cache_param*)*ad; + pthread_mutex_lock(&(param->lock)); + param->ref_cnt--; + if(param->ref_cnt>0) + { + pthread_mutex_unlock(&(param->lock)); + return; + } + pthread_mutex_unlock(&(param->lock)); + pthread_mutex_destroy(&(param->lock)); for(i=0; ikey_descr.qs_num; i++) { FREE(&(param->key_descr.ignore_qs[i])); } FREE(&(param->key_descr.ignore_qs)); FREE(&(param->key_descr.include_cookie)); - FREE(&(param->key_descr.include_hdrs)); FREE(&(param)); return; } +void cache_param_dup(int idx, MAAT_RULE_EX_DATA *to, MAAT_RULE_EX_DATA *from, long argl, void *argp) +{ + struct cache_param* from_param=*((struct cache_param**)from); + pthread_mutex_lock(&(from_param->lock)); + from_param->ref_cnt++; + pthread_mutex_unlock(&(from_param->lock)); + *((struct cache_param**)to)=from_param; + return; +} + +static void cache_key_bloom_gc_cb(evutil_socket_t fd, short what, void * arg) +{ + struct cache_bloom* p_bloom= (struct cache_bloom*) arg; + counting_bloom_t* new_bloom=NULL; + + new_bloom=new_counting_bloom(p_bloom->size, p_bloom->error_rate, p_bloom->filename); + free_counting_bloom(p_bloom->bloom); + p_bloom->bloom=new_bloom; + return; +} struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, struct event_base* gc_evbase, Maat_feather_t feather, void *logger) { struct cache_handle* cache=ALLOC(struct cache_handle, 1); int temp=0; + struct event* ev=NULL; cache->logger=logger; cache->thread_count=tfe_proxy_get_work_thread_count(); cache->clients=ALLOC(struct tango_cache_instance *, cache->thread_count); + cache->cache_key_bloom=ALLOC(struct cache_bloom, cache->thread_count); + struct cache_bloom* p_bloom=NULL; + MESA_load_profile_int_def(profile_path, section, "cache_policy_enabled", + &(cache->cache_policy_enabled), 1); + + + MESA_load_profile_int_def(profile_path, section, "cache_key_bloom_size", + (int*)&(cache->cache_key_bloom_size), 16*1000*1000); + MESA_load_profile_int_def(profile_path, section, "cache_key_bloom_life", + &(cache->cache_key_bloom_life), 30*60); + char bloom_filename[TFE_PATH_MAX]{0}; + struct timeval gc_refresh_delay = {cache->cache_key_bloom_life, 0}; int i=0; + + struct tango_cache_parameter *cache_client_param=tango_cache_parameter_new(profile_path, section, logger); for(i=0; ithread_count; i++) { - cache->clients[i]=tango_cache_instance_new(tfe_proxy_get_work_thread_evbase(i), profile_path, section, logger); + if(cache->cache_policy_enabled) + { + p_bloom=cache->cache_key_bloom+i; + p_bloom->thread_id=i; + p_bloom->size=cache->cache_key_bloom_size; + p_bloom->error_rate=0.01; + snprintf(p_bloom->filename, sizeof(p_bloom->filename), "/tmp/pangu_cache_blooms.%d", i); + p_bloom->bloom=new_counting_bloom(p_bloom->size, p_bloom->error_rate, p_bloom->filename); + if(p_bloom->bloom==NULL) + { + goto error_out; + } + ev = event_new(tfe_proxy_get_work_thread_evbase(i), -1, EV_PERSIST, cache_key_bloom_gc_cb, p_bloom); + evtimer_add(ev, &gc_refresh_delay); + } + + cache->clients[i]=tango_cache_instance_new(cache_client_param,tfe_proxy_get_work_thread_evbase(i), logger); if(cache->clients[i]==NULL) { goto error_out; @@ -369,29 +626,32 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha MESA_load_profile_int_def(profile_path, section, "cache_undefined_obj", &(cache->cache_undefined_obj_enabled), 1); MESA_load_profile_int_def(profile_path, section, "cached_undefined_obj_minimum_size", &(temp), 100*1024); cache->cache_undefined_obj_min_size=temp; - MESA_load_profile_int_def(profile_path, section, "cache_minimum_time_override", &(cache->minimum_cache_seconds), 60*5); cache->gc_evbase=gc_evbase; cache->default_cache_policy.key_descr.qs_num=0; - cache->default_cache_policy.revalidate=1; + cache->default_cache_policy.no_revalidate=0; cache->default_cache_policy.cache_dyn_url=0; cache->default_cache_policy.cache_cookied_cont=0; cache->default_cache_policy.ignore_req_nocache=0; cache->default_cache_policy.ignore_res_nocache=0; cache->default_cache_policy.force_caching=0; - cache->default_cache_policy.min_use=1; + cache->default_cache_policy.min_use=0; cache->default_cache_policy.pinning_time_sec=0; cache->default_cache_policy.inactive_time_sec=0; - cache->default_cache_policy.max_cache_size_mb=0; - cache->default_cache_policy.max_cache_obj_size_mb=1024;//<1GB by default + cache->default_cache_policy.max_cache_size=0; + cache->default_cache_policy.max_cache_obj_size=1024*1024*1024;//<1GB by default - cache->table_url_constraint=Maat_table_register(feather, "PXY_CACHE_HTTP_URL"); - cache->table_cookie_constraint=Maat_table_register(feather, "PXY_CACHE_HTTP_COOKIE"); - - cache->cache_param_idx=Maat_rule_get_ex_new_index(feather, "PXY_CACHE_COMPILE", - cache_param_new, cache_param_free, 0, cache); - + if(cache->cache_policy_enabled) + { + cache->table_url_constraint=Maat_table_register(feather, "PXY_CACHE_HTTP_URL"); + cache->table_cookie_constraint=Maat_table_register(feather, "PXY_CACHE_HTTP_COOKIE"); + + cache->cache_param_idx=Maat_rule_get_ex_new_index(feather, "PXY_CACHE_COMPILE", + cache_param_new, cache_param_free, cache_param_dup, + 0, cache); + cache->ref_feather=feather; + } cache_stat_init(cache); return cache; error_out: @@ -545,7 +805,6 @@ struct cache_pending_context { enum cache_pending_result status; int is_undefined_obj; - struct request_freshness req_fresshness; char* req_if_none_match, *req_if_modified_since; const struct tfe_http_half * request; @@ -566,7 +825,10 @@ void cache_pending_ctx_free_cb(void* p) if(ctx->f_tango_cache_fetch) { future_destroy(ctx->f_tango_cache_fetch); - } + } + FREE(&(ctx->cached_obj_meta.etag)); + FREE(&(ctx->cached_obj_meta.last_modified)); + FREE(&(ctx->cached_obj_meta.content_type)); free(ctx); return; } @@ -631,50 +893,91 @@ static void cache_query_meta_on_fail(enum e_future_error err, const char * what, } struct cache_mid { + enum cache_pending_result result; + struct request_freshness req_fresshness; + char shall_bypass; + char is_using_exception_param; + char is_dyn_url; + char has_cookie; + char use_cnt; + int cfg_id; + char* cache_key; struct cache_param* param; }; +void cache_mid_clear(struct cache_mid **mid) +{ + if(*mid==NULL) + { + return; + } + if((*mid)->is_using_exception_param) + { + cache_param_free(0, NULL, NULL, (void**)&((*mid)->param), 0, NULL); + } + FREE(&((*mid)->cache_key)); + FREE(mid); + return; +} + +#define CACHE_ACTION_BYPASS 0x80 enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, unsigned int thread_id, const struct tfe_http_half * request, struct cache_mid** mid, struct future* f_revalidate) { - struct request_freshness req_fresshness={0,0}; enum cache_pending_result result=PENDING_RESULT_FOBIDDEN; int is_undefined_obj=0; struct Maat_rule_t cache_policy; - const struct cache_param* param=&(handle->default_cache_policy); + struct cache_param* param=&(handle->default_cache_policy); MAAT_RULE_EX_DATA ex_data=NULL; scan_status_t scan_mid=NULL; int ret=0; const char* cookie=NULL; - ret=Maat_full_scan_string(handle->ref_feather, handle->table_url_constraint, CHARSET_UTF8, - request->req_spec.url, strlen(request->req_spec.url), - &cache_policy, NULL, 1, &scan_mid, thread_id); - + struct cache_mid* _mid=ALLOC(struct cache_mid, 1); + *mid=_mid; cookie=tfe_http_std_field_read(request, TFE_HTTP_COOKIE); - if(cookie && ret<=0) + if(cookie) { - ret=Maat_full_scan_string(handle->ref_feather, handle->table_cookie_constraint, CHARSET_UTF8, - cookie, strlen(cookie), + _mid->has_cookie=1; + } + _mid->is_dyn_url=is_dynamic_url(request->req_spec.url); + if(handle->cache_policy_enabled) + { + ret=Maat_full_scan_string(handle->ref_feather, handle->table_url_constraint, CHARSET_UTF8, + request->req_spec.url, strlen(request->req_spec.url), &cache_policy, NULL, 1, &scan_mid, thread_id); - } - Maat_clean_status(&scan_mid); + - if(ret>0) - { - ex_data=Maat_rule_get_ex_data(handle->ref_feather, &cache_policy, handle->cache_param_idx); - if(ex_data!=NULL) + if(cookie && ret<=0) { - param=(const struct cache_param*)ex_data; + ret=Maat_full_scan_string(handle->ref_feather, handle->table_cookie_constraint, CHARSET_UTF8, + cookie, strlen(cookie), + &cache_policy, NULL, 1, &scan_mid, thread_id); + } + Maat_clean_status(&scan_mid); + + if(ret>0) + { + ex_data=Maat_rule_get_ex_data(handle->ref_feather, &cache_policy, handle->cache_param_idx); + if(ex_data!=NULL) + { + param=(struct cache_param*)ex_data; + _mid->is_using_exception_param=1; + _mid->param=param; + } + if(cache_policy.action==CACHE_ACTION_BYPASS) + { + _mid->shall_bypass=1; + } + _mid->cfg_id=cache_policy.config_id; + } + if(_mid->shall_bypass || + (!param->cache_dyn_url && _mid->is_dyn_url && param->key_descr.qs_num==0) || + (param->cache_cookied_cont && _mid->has_cookie) ) + { + _mid->result=PENDING_RESULT_FOBIDDEN; + return _mid->result; } } - if(param->cache_dyn_url==0 && is_dynamic_url(request->req_spec.url)) - { - return FORBIDDEN; - } - if(param->cache_cookied_cont==0 && cookie!=NULL) - { - return FORBIDDEN; - } - enum cache_pending_action get_action=tfe_cache_get_pending(request, &(req_fresshness)); + enum cache_pending_action get_action=tfe_cache_get_pending(request, &(_mid->req_fresshness)); switch(get_action) { case UNDEFINED: @@ -705,51 +1008,68 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u result=PENDING_RESULT_ALLOWED; break; default: - result=PENDING_RESULT_REVALIDATE; + if(param->no_revalidate) + { + result=PENDING_RESULT_ALLOWED; + } + else + { + result=PENDING_RESULT_REVALIDATE; + } break; } if(result!=PENDING_RESULT_REVALIDATE) { - return result; + _mid->result=result; + return _mid->result; } struct tango_cache_meta_get meta; memset(&meta, 0, sizeof(meta)); - meta.url=request->req_spec.url; - meta.get=req_fresshness; + if(param->key_descr.is_not_empty) + { + _mid->cache_key=get_cache_key(request, &(param->key_descr)); + meta.url = _mid->cache_key; + } + else + { + meta.url = request->req_spec.url; + } + meta.get = _mid->req_fresshness; struct promise* p=future_to_promise(f_revalidate); struct cache_pending_context* ctx=ALLOC(struct cache_pending_context, 1); ctx->status=PENDING_RESULT_FOBIDDEN; ctx->ref_handle=handle; - ctx->req_fresshness=req_fresshness; ctx->url=tfe_strdup(request->req_spec.url); + ctx->req_if_modified_since=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_MODIFIED_SINCE)); ctx->req_if_none_match=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_NONE_MATCH)); promise_set_ctx(p, ctx, cache_pending_ctx_free_cb); ATOMIC_INC(&(handle->stat_val[STAT_CACHE_PENDING])); ctx->f_tango_cache_fetch=future_create("_cache_pend", cache_query_meta_on_succ, cache_query_meta_on_fail, p); - int ret=tango_cache_head_object(handle->clients[thread_id], ctx->f_tango_cache_fetch, &meta); + ret=tango_cache_head_object(handle->clients[thread_id], ctx->f_tango_cache_fetch, &meta); if(ret<0) { cache_pending_ctx_free_cb(ctx); - return PENDING_RESULT_FOBIDDEN; + _mid->result=PENDING_RESULT_FOBIDDEN; + return _mid->result; } - assert(ret==0); - return PENDING_RESULT_REVALIDATE; + _mid->result=PENDING_RESULT_REVALIDATE; + + return _mid->result; } int web_cache_async_query(struct cache_handle* handle, unsigned int thread_id, - const struct tfe_http_half * request, struct future* f) + const struct tfe_http_half * request, struct cache_mid** mid, struct future* f) { struct request_freshness req_fresshness; enum cache_pending_action get_action; struct cache_query_context* query_ctx=NULL; struct promise* p=NULL; struct future* _f=NULL; - - get_action=tfe_cache_get_pending(request, &req_fresshness); - assert(get_action!=FORBIDDEN); + struct cache_mid* _mid=*mid; + assert(_mid->result!=PENDING_RESULT_FOBIDDEN); if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_QUERYING])) > ATOMIC_READ(&(handle->put_concurrency_max))) { @@ -760,6 +1080,7 @@ int web_cache_async_query(struct cache_handle* handle, unsigned int thread_id, struct tango_cache_meta_get meta; memset(&meta, 0, sizeof(meta)); meta.url=request->req_spec.url; + meta.get=_mid->req_fresshness; memcpy(&(meta.get), &req_fresshness, sizeof(meta.get)); query_ctx=ALLOC(struct cache_query_context, 1); query_ctx->ref_handle=handle; @@ -800,22 +1121,33 @@ static void wrap_cache_update_on_succ(future_result_t * result, void * user) static void wrap_cache_update_on_fail(enum e_future_error err, const char * what, void * user) { struct wrap_cache_put_ctx* ctx=(struct wrap_cache_put_ctx*)user; - TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload failed: %s elapse: %d", ctx->url, time(NULL)-ctx->start); + TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache upload failed: %s %s lapse: %d", ctx->url, what, time(NULL)-ctx->start); wrap_cache_put_ctx_free(ctx); } struct cache_update_context* web_cache_update_start(struct cache_handle* handle, unsigned int thread_id, - const struct tfe_http_session * session) + const struct tfe_http_session * session, struct cache_mid **mid) { struct cache_update_context* update_ctx=NULL; struct response_freshness resp_freshness; enum cache_pending_action put_action; struct tango_cache_ctx *write_ctx=NULL; - char cont_len_str[TFE_STRING_MAX]={0}, user_tag_str[TFE_STRING_MAX]={0}; + char cont_type_str[TFE_STRING_MAX]={0}, user_tag_str[TFE_STRING_MAX]={0}; const char* value=NULL; char *tmp=NULL; int i=0, is_undefined_obj=0; size_t content_len=0; + const struct cache_param* param=NULL; + struct cache_mid* _mid=*mid; + + if(_mid!=NULL && _mid->is_using_exception_param) + { + param=_mid->param; + } + else + { + param=&(handle->default_cache_policy); + } if(session->resp->resp_spec.content_length) { sscanf(session->resp->resp_spec.content_length, "%lu", &content_len); @@ -824,19 +1156,24 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle, put_action=tfe_cache_put_pending(session->resp, &resp_freshness); switch(put_action){ case FORBIDDEN: - ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN])); - TFE_LOG_DEBUG(handle->logger, "cache update forbiden: %s", session->req->req_spec.url); - return NULL; - case REVALIDATE: - case ALLOWED: - break; - case UNDEFINED: - if(handle->cache_undefined_obj_enabled && content_lencache_undefined_obj_min_size) + if(!(param->ignore_res_nocache || param->force_caching)) { + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN])); TFE_LOG_DEBUG(handle->logger, "cache update forbiden: %s", session->req->req_spec.url); + return NULL; + } + break; + case REVALIDATE: + case ALLOWED: + case UNDEFINED: + if(_mid->shall_bypass + || content_len > param->max_cache_obj_size + || (!param->cache_cookied_cont && _mid->has_cookie) ) + { + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_BYPASS])); + TFE_LOG_DEBUG(handle->logger, "cache update bypass: %d : %s", _mid->cfg_id, session->req->req_spec.url); return NULL; } - is_undefined_obj=1; break; default: assert(0); @@ -847,15 +1184,37 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle, ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_ABANDON])); return NULL; } + const char* key=NULL; + size_t key_len=0; + if(param->min_use>0) + { + if(_mid->cache_key) + { + key=_mid->cache_key; + key_len=strlen(_mid->cache_key); + } + else + { + key=session->req->req_spec.url; + key_len=strlen(session->req->req_spec.url); + } + _mid->use_cnt=counting_bloom_check(handle->cache_key_bloom[thread_id].bloom, key, key_len); + + if(_mid->use_cntmin_use) + { + counting_bloom_add(handle->cache_key_bloom[thread_id].bloom, key, key_len); + return NULL; + } + } ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOADING])); struct tango_cache_meta_put meta; memset(&meta, 0, sizeof(meta)); meta.url=session->req->req_spec.url; - i=0; + i=0; - snprintf(cont_len_str, sizeof(cont_len_str), "content-type:%s",session->resp->resp_spec.content_type); - meta.std_hdr[i]=cont_len_str; + snprintf(cont_type_str, sizeof(cont_type_str), "content-type:%s",session->resp->resp_spec.content_type); + meta.std_hdr[i]=cont_type_str; i++; const char* etag=tfe_http_std_field_read(session->resp, TFE_HTTP_ETAG); const char* last_modified=tfe_http_std_field_read(session->resp, TFE_HTTP_LAST_MODIFIED); @@ -868,7 +1227,8 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle, meta.usertag_len=strlen(user_tag_str)+1; } meta.put=resp_freshness; - + meta.put.timeout=MAX(param->pinning_time_sec, resp_freshness.timeout); + struct wrap_cache_put_ctx* _cache_put_ctx=ALLOC(struct wrap_cache_put_ctx, 1); _cache_put_ctx->url=tfe_strdup(session->req->req_spec.url); _cache_put_ctx->start=time(NULL);