diff --git a/conf/pangu/table_info.conf b/conf/pangu/table_info.conf index 5c196c6..7c5a34d 100644 --- a/conf/pangu/table_info.conf +++ b/conf/pangu/table_info.conf @@ -20,9 +20,9 @@ 2 PXY_CTRL_IP ip --- 3 PXY_CTRL_HTTP_URL expr UTF8 GBK/UNICODE/UTF8/url_encode_gb2312/url_encode_utf8 yes 0 quickoff 4 PXY_CTRL_HTTP_REQ_HDR expr_plus UTF8 UTF8 yes 0 quickoff -5 PXY_CTRL_HTTP_REQ_BODY expr UTF8 GBK/UNICODE/UTF8 yes 128 quickoff +5 PXY_CTRL_HTTP_REQ_BODY expr UTF8 GBK/BIG5/UNICODE/UTF8 yes 128 quickoff 6 PXY_CTRL_HTTP_RES_HDR expr_plus UTF8 UTF8 UTF8 yes 0 quickoff -7 PXY_CTRL_HTTP_RES_BODY expr UTF8 GBK/UNICODE/UTF8 yes 128 quickoff +7 PXY_CTRL_HTTP_RES_BODY expr UTF8 GBK/BIG5/UNICODE/UTF8 yes 128 quickoff 8 PXY_CACHE_COMPILE compile escape -- 9 PXY_CACHE_GROUP group -- 10 PXY_CACHE_HTTP_URL expr UTF8 UTF8 yes 0 quickoff diff --git a/plugin/business/pangu-http/src/pangu_web_cache.cpp b/plugin/business/pangu-http/src/pangu_web_cache.cpp index 1468805..1e01395 100644 --- a/plugin/business/pangu-http/src/pangu_web_cache.cpp +++ b/plugin/business/pangu-http/src/pangu_web_cache.cpp @@ -9,11 +9,19 @@ #include #include -#include #include #include +extern "C" +{ +#include +} + +#include + +#include + enum cache_stat_field { STAT_CACHE_QUERY, @@ -29,6 +37,7 @@ enum cache_stat_field STAT_CACHE_QUERYING, STAT_CACHE_PENDING, STAT_CACHE_UPLOAD_CNT, + STAT_CACHE_UPLOAD_BYPASS, STAT_CACHE_UPLOAD_OVERRIDE, STAT_CACHE_UPLOAD_FORBIDEN, STAT_CACHE_UPLOAD_ABANDON, @@ -44,6 +53,33 @@ enum cache_stat_field STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE, __CACHE_STAT_MAX }; + +struct cache_key_descr +{ + int is_not_empty; + size_t qs_num; + char** ignore_qs; + char* include_cookie; +}; +struct cache_param +{ + int ref_cnt; + struct cache_key_descr cache_key; + + char no_revalidate; + char cache_dyn_url; + char cache_cookied_cont; + char ignore_req_nocache; + char ignore_res_nocache; + char force_caching; + + int min_use; + time_t pinning_time_sec; + time_t inactive_time_sec; + long max_cache_size; + long max_cache_obj_size; + pthread_mutex_t lock; +}; struct cache_handle { unsigned int thread_count; @@ -62,11 +98,16 @@ struct cache_handle struct event_base* gc_evbase; struct event* gcev; + + int cache_policy_enabled; //otherwise use default cache policy struct cache_param default_cache_policy; Maat_feather_t ref_feather; int cache_param_idx; int table_url_constraint; int table_cookie_constraint; + size_t cache_key_bloom_size; + int cache_key_bloom_life; + counting_bloom_t **cache_key_bloom; void* logger; }; struct cache_update_context @@ -74,7 +115,7 @@ struct cache_update_context struct cache_handle* ref_cache_handle; struct tango_cache_ctx * write_ctx; }; -static void cache_gc_cb(evutil_socket_t fd, short what, void * arg) +static void web_cache_stat_cb(evutil_socket_t fd, short what, void * arg) { struct cache_handle* cache=(struct cache_handle *)arg; struct cache_statistics client_stat_sum, client_stat; @@ -169,6 +210,7 @@ void cache_stat_init(struct cache_handle* cache) set_stat_spec(&spec[STAT_CACHE_QUERYING], "getting",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_PENDING], "pending",FS_STYLE_STATUS, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_CNT], "cache_put",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_UPLOAD_BYPASS], "cache_bypass",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_OVERRIDE], "or_put",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_FORBIDEN], "put_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_UPLOAD_ABANDON], "put_abandon",FS_STYLE_FIELD, FS_CALC_CURRENT); @@ -214,44 +256,170 @@ void cache_stat_init(struct cache_handle* cache) FS_start(cache->fs_handle); struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 miliseconds here. - cache->gcev = event_new(cache->gc_evbase, -1, EV_PERSIST, cache_gc_cb, cache); + cache->gcev = event_new(cache->gc_evbase, -1, EV_PERSIST, web_cache_stat_cb, cache); evtimer_add(cache->gcev, &gc_delay); return; } -struct cache_key_descr -{ - size_t qs_num; - char** ignore_qs; - char* include_hdrs; - char* include_cookie; -}; -struct cache_param -{ - struct cache_key_descr key_descr; - - char revalidate; - char cache_dyn_url; - char cache_cookied_cont; - char ignore_req_nocache; - char ignore_res_nocache; - char force_caching; - - int min_use; - int pinning_time_sec; - int inactive_time_sec; - int max_cache_size_mb; - int max_cache_obj_size_mb; -}; -int time_secs_interval(const char* str){} -int storage_mb_size(const char* str){} -int is_dynamic_url(const char* url){} +time_t time_unit_sec(const char* str) +{ + time_t value=0; + sscanf(str, "%ld", &value); + switch(str[strlen(str)-1]) + { + case 's': + break; + case 'm': + value*=60; + break; + case 'h': + value*=3600; + break; + case 'd': + value*=24*3600; + break; + default: + break; + } + return value; +} +size_t storage_unit_byte(const char* str) +{ + size_t value=0; + sscanf(str, "%ld", &value); + switch(str[strlen(str)-1]) + { + case 'b': + break; + case 'k': + value*=1024; + break; + case 'm': + value*=1024*1024; + break; + case 'g': + value*=1024*1024*1024; + break; + case 't': + if(value<1024) + { +#pragma GCC diagnostic ignored "-Woverflow" + value*=1024*1024*1024*1024; + + } + break; + default: + break; + } + return value; +} + + +//A URL is considered dynamic if it ends in “.asp(x)” or contains a question mark (?), a semicolon (;), or “cgi”. +char is_dynamic_url(const char* url) +{ + if(strchr(url, '?') + || strchr(url, ';') + || strstr(url, "cgi") + || 0==strcmp(url+strlen(url)-3,"asp") + || 0==strcmp(url+strlen(url)-4, "aspx")) + { + return 1; + } + return 0; +} +char * cookie_scanvalue(const char * key, const char * qs, char * val, size_t val_len) +{ + int i=0, key_len=0; + + key_len = strlen(key); + while(qs[0] != '\0') + { + if ( strncmp(key, qs, key_len) == 0 ) + break; + qs += strcspn(qs, ";") + 1; + } + + if ( qs[0] == '\0' ) return NULL; + + qs += strcspn(qs, "=;"); + if ( qs[0] == '=' ) + { + qs++; + i = strcspn(qs, "=;"); + strncpy(val, qs, (val_len-1)<(i+1) ? (val_len-1) : (i+1)); + } + else + { + if ( val_len > 0 ) + val[0] = '\0'; + } + + return val; +} + + +char* get_cache_key(const struct tfe_http_half * request, const struct cache_key_descr* desc) +{ + if(desc==NULL|| !desc->is_not_empty) + { + return NULL; + } + int i=0, shall_ignore=0; + + char *token=NULL,*sub_token=NULL,*saveptr; + char* url=tfe_strdup(request->req_spec.url); + const char* cookie=NULL; + char cookie_val[256]={0}; //most 256 bytes for cookie key + size_t key_size=strlen(url)+sizeof(cookie_val); + char* cache_key=ALLOC(char, key_size); + char* query_string=strchr(url, '?'); + if(query_string!=NULL && desc->qs_num>0) + { + query_string++; + for (token = url; ; token= NULL) + { + sub_token= strtok_r(token,"&", &saveptr); + if (sub_token == NULL) + break; + shall_ignore=0; + for(i=0; iqs_num; i++) + { + if(0==strncasecmp(sub_token, desc->ignore_qs[i], strlen(desc->ignore_qs[i]))) + { + shall_ignore=1; + break; + } + } + if(!shall_ignore) + { + strncat(cache_key, sub_token, key_size); + } + } + } + else + { + strncat(cache_key, url, key_size); + } + if(desc->include_cookie && (cookie=tfe_http_std_field_read(request, TFE_HTTP_COOKIE))!=NULL) + { + cookie_scanvalue(desc->include_cookie, cookie, cookie_val, sizeof(cookie_val)); + if(strlen(cookie_val)>0) + { + strncat(cache_key, cookie_val, key_size); + } + } + FREE(&(url)); + +} + void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_def_large, MAAT_RULE_EX_DATA* ad, long argl, void *argp) { struct cache_handle* cache=(struct cache_handle*) argp; int i=0; + size_t len=0; *ad=NULL; if(rule->serv_def_lendefault_cache_policy; + param->ref_cnt=1; + pthread_mutex_init(&(param->lock), NULL); key_desc=cJSON_GetObjectItem(json,"cache_key"); if(key_desc && key_desc->type==cJSON_Object) { + param->cache_key.is_not_empty=1; qs=cJSON_GetObjectItem(json,"ignore_qs"); if(qs && qs->type==cJSON_Array) { - param->key_descr.qs_num=cJSON_GetArraySize(qs); - param->key_descr.ignore_qs=ALLOC(char*, param->key_descr.qs_num); - for(i=0; ikey_descr.qs_num; i++) + param->cache_key.qs_num=cJSON_GetArraySize(qs); + param->cache_key.ignore_qs=ALLOC(char*, param->cache_key.qs_num); + for(i=0; icache_key.qs_num; i++) { item=cJSON_GetArrayItem(item, i); - param->key_descr.ignore_qs[i]=tfe_strdup(qs->valuestring); + len=strlen(qs->valuestring)+2; + param->cache_key.ignore_qs[i]=ALLOC(char, len); + strncat(param->cache_key.ignore_qs[i], qs->valuestring, len); + strncat(param->cache_key.ignore_qs[i], "=", len); } - } - } - item=cJSON_GetObjectItem(key_desc,"hdrs"); - if(item && item->type==cJSON_String) param->key_descr.include_hdrs=tfe_strdup(item->valuestring); + } + item=cJSON_GetObjectItem(key_desc,"cookie"); + if(item && item->type==cJSON_String) param->cache_key.include_cookie=tfe_strdup(param->cache_key.include_cookie); + + } - item=cJSON_GetObjectItem(key_desc,"cookie"); - if(item && item->type==cJSON_String) param->key_descr.include_cookie=tfe_strdup(item->valuestring); - item=cJSON_GetObjectItem(json,"revalidate"); - if(item && item->type==cJSON_Number) param->revalidate=item->valueint; + item=cJSON_GetObjectItem(json,"no_revalidate"); + if(item && item->type==cJSON_Number) param->no_revalidate=item->valueint; item=cJSON_GetObjectItem(json,"cache_dyn_url"); if(item && item->type==cJSON_Number) param->cache_dyn_url=item->valueint; @@ -310,16 +484,16 @@ void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_de if(item && item->type==cJSON_Number) param->min_use=item->valueint; item=cJSON_GetObjectItem(json,"pinning_time"); - if(item && item->type==cJSON_String) param->pinning_time_sec=time_secs_interval(item->valuestring); + if(item && item->type==cJSON_String) param->pinning_time_sec=time_unit_sec(item->valuestring); item=cJSON_GetObjectItem(json,"inactive_time"); - if(item && item->type==cJSON_String) param->inactive_time_sec=time_secs_interval(item->valuestring); + if(item && item->type==cJSON_String) param->inactive_time_sec=time_unit_sec(item->valuestring); item=cJSON_GetObjectItem(json,"max_cache_size"); - if(item && item->type==cJSON_String) param->max_cache_size_mb=storage_mb_size(item->valuestring); + if(item && item->type==cJSON_String) param->max_cache_size=storage_unit_byte(item->valuestring); item=cJSON_GetObjectItem(json,"max_cache_obj_size"); - if(item && item->type==cJSON_String) param->max_cache_obj_size_mb=storage_mb_size(item->valuestring); + if(item && item->type==cJSON_String) param->max_cache_obj_size=storage_unit_byte(item->valuestring); cJSON_Delete(json); return; @@ -332,28 +506,73 @@ void cache_param_free(int idx, const struct Maat_rule_t* rule, const char* srv_d return; } struct cache_param* param=(struct cache_param*)*ad; - for(i=0; ikey_descr.qs_num; i++) - { - FREE(&(param->key_descr.ignore_qs[i])); + pthread_mutex_lock(&(param->lock)); + param->ref_cnt--; + if(param->ref_cnt>0) + { + pthread_mutex_unlock(&(param->lock)); + return; } - FREE(&(param->key_descr.ignore_qs)); - FREE(&(param->key_descr.include_cookie)); - FREE(&(param->key_descr.include_hdrs)); + pthread_mutex_unlock(&(param->lock)); + pthread_mutex_destroy(&(param->lock)); + for(i=0; icache_key.qs_num; i++) + { + FREE(&(param->cache_key.ignore_qs[i])); + } + FREE(&(param->cache_key.ignore_qs)); + FREE(&(param->cache_key.include_cookie)); FREE(&(param)); return; } +void cache_param_dup(int idx, MAAT_RULE_EX_DATA *to, MAAT_RULE_EX_DATA *from, long argl, void *argp) +{ + struct cache_param* from_param=*((struct cache_param**)from); + pthread_mutex_lock(&(from_param->lock)); + from_param->ref_cnt++; + pthread_mutex_unlock(&(from_param->lock)); + *((struct cache_param**)to)=from_param; + return; +} + +static void cache_key_bloom_gc_cb(evutil_socket_t fd, short what, void * arg) +{ + counting_bloom_t* old_bloom=*((counting_bloom_t**)arg), *new_bloom=NULL; + + new_bloom=new_counting_bloom(old_bloom->capacity, old_bloom->error_rate, NULL); + free_counting_bloom(old_bloom); + *((counting_bloom_t**)arg)=old_bloom; + return; +} struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, struct event_base* gc_evbase, Maat_feather_t feather, void *logger) { struct cache_handle* cache=ALLOC(struct cache_handle, 1); int temp=0; + struct event* ev=NULL; cache->logger=logger; cache->thread_count=tfe_proxy_get_work_thread_count(); cache->clients=ALLOC(struct tango_cache_instance *, cache->thread_count); + cache->cache_key_bloom=ALLOC(counting_bloom_t*, cache->thread_count); + MESA_load_profile_int_def(profile_path, section, "cache_policy_enabled", + &(cache->cache_policy_enabled), 1); + + + MESA_load_profile_int_def(profile_path, section, "cache_key_bloom_size", + (int*)&(cache->cache_key_bloom_size), 16*1000*1000); + MESA_load_profile_int_def(profile_path, section, "cache_key_bloom_life", + &(cache->cache_key_bloom_life), 30*60); + struct timeval gc_refresh_delay = {cache->cache_key_bloom_life, 0}; int i=0; for(i=0; ithread_count; i++) { + if(cache->cache_policy_enabled) + { + cache->cache_key_bloom[i]=new_counting_bloom(cache->cache_key_bloom_size, 0.01, NULL); + ev = event_new(tfe_proxy_get_work_thread_evbase(i), -1, EV_PERSIST, cache_key_bloom_gc_cb, &(cache->cache_key_bloom[i])); + evtimer_add(ev, &gc_refresh_delay); + } + cache->clients[i]=tango_cache_instance_new(tfe_proxy_get_work_thread_evbase(i), profile_path, section, logger); if(cache->clients[i]==NULL) { @@ -369,29 +588,32 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha MESA_load_profile_int_def(profile_path, section, "cache_undefined_obj", &(cache->cache_undefined_obj_enabled), 1); MESA_load_profile_int_def(profile_path, section, "cached_undefined_obj_minimum_size", &(temp), 100*1024); cache->cache_undefined_obj_min_size=temp; - MESA_load_profile_int_def(profile_path, section, "cache_minimum_time_override", &(cache->minimum_cache_seconds), 60*5); cache->gc_evbase=gc_evbase; - cache->default_cache_policy.key_descr.qs_num=0; - cache->default_cache_policy.revalidate=1; + cache->default_cache_policy.cache_key.qs_num=0; + cache->default_cache_policy.no_revalidate=0; cache->default_cache_policy.cache_dyn_url=0; cache->default_cache_policy.cache_cookied_cont=0; cache->default_cache_policy.ignore_req_nocache=0; cache->default_cache_policy.ignore_res_nocache=0; cache->default_cache_policy.force_caching=0; - cache->default_cache_policy.min_use=1; + cache->default_cache_policy.min_use=0; cache->default_cache_policy.pinning_time_sec=0; cache->default_cache_policy.inactive_time_sec=0; - cache->default_cache_policy.max_cache_size_mb=0; - cache->default_cache_policy.max_cache_obj_size_mb=1024;//<1GB by default + cache->default_cache_policy.max_cache_size=0; + cache->default_cache_policy.max_cache_obj_size=1024*1024*1024;//<1GB by default - cache->table_url_constraint=Maat_table_register(feather, "PXY_CACHE_HTTP_URL"); - cache->table_cookie_constraint=Maat_table_register(feather, "PXY_CACHE_HTTP_COOKIE"); - - cache->cache_param_idx=Maat_rule_get_ex_new_index(feather, "PXY_CACHE_COMPILE", - cache_param_new, cache_param_free, 0, cache); - + if(cache->cache_policy_enabled) + { + cache->table_url_constraint=Maat_table_register(feather, "PXY_CACHE_HTTP_URL"); + cache->table_cookie_constraint=Maat_table_register(feather, "PXY_CACHE_HTTP_COOKIE"); + + cache->cache_param_idx=Maat_rule_get_ex_new_index(feather, "PXY_CACHE_COMPILE", + cache_param_new, cache_param_free, cache_param_dup, + 0, cache); + cache->ref_feather=feather; + } cache_stat_init(cache); return cache; error_out: @@ -545,7 +767,6 @@ struct cache_pending_context { enum cache_pending_result status; int is_undefined_obj; - struct request_freshness req_fresshness; char* req_if_none_match, *req_if_modified_since; const struct tfe_http_half * request; @@ -566,7 +787,10 @@ void cache_pending_ctx_free_cb(void* p) if(ctx->f_tango_cache_fetch) { future_destroy(ctx->f_tango_cache_fetch); - } + } + FREE(&(ctx->cached_obj_meta.etag)); + FREE(&(ctx->cached_obj_meta.last_modified)); + FREE(&(ctx->cached_obj_meta.content_type)); free(ctx); return; } @@ -631,50 +855,91 @@ static void cache_query_meta_on_fail(enum e_future_error err, const char * what, } struct cache_mid { + enum cache_pending_result result; + struct request_freshness req_fresshness; + char shall_bypass; + char is_using_exception_param; + char is_dyn_url; + char has_cookie; + char use_cnt; + int cfg_id; + char* cache_key; struct cache_param* param; }; +void cache_mid_clear(struct cache_mid **mid) +{ + if(*mid==NULL) + { + return; + } + if((*mid)->is_using_exception_param) + { + cache_param_free(0, NULL, NULL, (void**)&((*mid)->param), 0, NULL); + } + FREE(&((*mid)->cache_key)); + FREE(mid); + return; +} + +#define CACHE_ACTION_BYPASS 0x80 enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, unsigned int thread_id, const struct tfe_http_half * request, struct cache_mid** mid, struct future* f_revalidate) { - struct request_freshness req_fresshness={0,0}; enum cache_pending_result result=PENDING_RESULT_FOBIDDEN; int is_undefined_obj=0; struct Maat_rule_t cache_policy; - const struct cache_param* param=&(handle->default_cache_policy); + struct cache_param* param=&(handle->default_cache_policy); MAAT_RULE_EX_DATA ex_data=NULL; scan_status_t scan_mid=NULL; int ret=0; const char* cookie=NULL; - ret=Maat_full_scan_string(handle->ref_feather, handle->table_url_constraint, CHARSET_UTF8, - request->req_spec.url, strlen(request->req_spec.url), - &cache_policy, NULL, 1, &scan_mid, thread_id); - + struct cache_mid* _mid=ALLOC(struct cache_mid, 1); + *mid=_mid; cookie=tfe_http_std_field_read(request, TFE_HTTP_COOKIE); - if(cookie && ret<=0) + if(cookie) { - ret=Maat_full_scan_string(handle->ref_feather, handle->table_cookie_constraint, CHARSET_UTF8, - cookie, strlen(cookie), + _mid->has_cookie=1; + } + _mid->is_dyn_url=is_dynamic_url(request->req_spec.url); + if(handle->cache_policy_enabled) + { + ret=Maat_full_scan_string(handle->ref_feather, handle->table_url_constraint, CHARSET_UTF8, + request->req_spec.url, strlen(request->req_spec.url), &cache_policy, NULL, 1, &scan_mid, thread_id); - } - Maat_clean_status(&scan_mid); + - if(ret>0) - { - ex_data=Maat_rule_get_ex_data(handle->ref_feather, &cache_policy, handle->cache_param_idx); - if(ex_data!=NULL) + if(cookie && ret<=0) { - param=(const struct cache_param*)ex_data; + ret=Maat_full_scan_string(handle->ref_feather, handle->table_cookie_constraint, CHARSET_UTF8, + cookie, strlen(cookie), + &cache_policy, NULL, 1, &scan_mid, thread_id); + } + Maat_clean_status(&scan_mid); + + if(ret>0) + { + ex_data=Maat_rule_get_ex_data(handle->ref_feather, &cache_policy, handle->cache_param_idx); + if(ex_data!=NULL) + { + param=(struct cache_param*)ex_data; + _mid->is_using_exception_param=1; + _mid->param=param; + } + if(cache_policy.action==CACHE_ACTION_BYPASS) + { + _mid->shall_bypass=1; + } + _mid->cfg_id=cache_policy.config_id; + } + if(_mid->shall_bypass || + (!param->cache_dyn_url && _mid->is_dyn_url && param->cache_key.qs_num==0) || + (param->cache_cookied_cont && _mid->has_cookie) ) + { + _mid->result=PENDING_RESULT_FOBIDDEN; + return _mid->result; } } - if(param->cache_dyn_url==0 && is_dynamic_url(request->req_spec.url)) - { - return FORBIDDEN; - } - if(param->cache_cookied_cont==0 && cookie!=NULL) - { - return FORBIDDEN; - } - enum cache_pending_action get_action=tfe_cache_get_pending(request, &(req_fresshness)); + enum cache_pending_action get_action=tfe_cache_get_pending(request, &(_mid->req_fresshness)); switch(get_action) { case UNDEFINED: @@ -705,51 +970,68 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u result=PENDING_RESULT_ALLOWED; break; default: - result=PENDING_RESULT_REVALIDATE; + if(param->no_revalidate) + { + result=PENDING_RESULT_ALLOWED; + } + else + { + result=PENDING_RESULT_REVALIDATE; + } break; } if(result!=PENDING_RESULT_REVALIDATE) { - return result; + _mid->result=result; + return _mid->result; } struct tango_cache_meta_get meta; memset(&meta, 0, sizeof(meta)); - meta.url=request->req_spec.url; - meta.get=req_fresshness; + if(param->cache_key.is_not_empty) + { + _mid->cache_key=get_cache_key(request, &(param->cache_key)); + meta.url = _mid->cache_key; + } + else + { + meta.url = request->req_spec.url; + } + meta.get = _mid->req_fresshness; struct promise* p=future_to_promise(f_revalidate); struct cache_pending_context* ctx=ALLOC(struct cache_pending_context, 1); ctx->status=PENDING_RESULT_FOBIDDEN; ctx->ref_handle=handle; - ctx->req_fresshness=req_fresshness; ctx->url=tfe_strdup(request->req_spec.url); + ctx->req_if_modified_since=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_MODIFIED_SINCE)); ctx->req_if_none_match=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_NONE_MATCH)); promise_set_ctx(p, ctx, cache_pending_ctx_free_cb); ATOMIC_INC(&(handle->stat_val[STAT_CACHE_PENDING])); ctx->f_tango_cache_fetch=future_create("_cache_pend", cache_query_meta_on_succ, cache_query_meta_on_fail, p); - int ret=tango_cache_head_object(handle->clients[thread_id], ctx->f_tango_cache_fetch, &meta); + ret=tango_cache_head_object(handle->clients[thread_id], ctx->f_tango_cache_fetch, &meta); if(ret<0) { cache_pending_ctx_free_cb(ctx); - return PENDING_RESULT_FOBIDDEN; + _mid->result=PENDING_RESULT_FOBIDDEN; + return _mid->result; } - assert(ret==0); - return PENDING_RESULT_REVALIDATE; + _mid->result=PENDING_RESULT_REVALIDATE; + + return _mid->result; } int web_cache_async_query(struct cache_handle* handle, unsigned int thread_id, - const struct tfe_http_half * request, struct future* f) + const struct tfe_http_half * request, struct cache_mid** mid, struct future* f) { struct request_freshness req_fresshness; enum cache_pending_action get_action; struct cache_query_context* query_ctx=NULL; struct promise* p=NULL; struct future* _f=NULL; - - get_action=tfe_cache_get_pending(request, &req_fresshness); - assert(get_action!=FORBIDDEN); + struct cache_mid* _mid=*mid; + assert(_mid->result!=PENDING_RESULT_FOBIDDEN); if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_QUERYING])) > ATOMIC_READ(&(handle->put_concurrency_max))) { @@ -760,6 +1042,7 @@ int web_cache_async_query(struct cache_handle* handle, unsigned int thread_id, struct tango_cache_meta_get meta; memset(&meta, 0, sizeof(meta)); meta.url=request->req_spec.url; + meta.get=_mid->req_fresshness; memcpy(&(meta.get), &req_fresshness, sizeof(meta.get)); query_ctx=ALLOC(struct cache_query_context, 1); query_ctx->ref_handle=handle; @@ -805,17 +1088,28 @@ static void wrap_cache_update_on_fail(enum e_future_error err, const char * what } struct cache_update_context* web_cache_update_start(struct cache_handle* handle, unsigned int thread_id, - const struct tfe_http_session * session) + const struct tfe_http_session * session, struct cache_mid **mid) { struct cache_update_context* update_ctx=NULL; struct response_freshness resp_freshness; enum cache_pending_action put_action; struct tango_cache_ctx *write_ctx=NULL; - char cont_len_str[TFE_STRING_MAX]={0}, user_tag_str[TFE_STRING_MAX]={0}; + char cont_type_str[TFE_STRING_MAX]={0}, user_tag_str[TFE_STRING_MAX]={0}; const char* value=NULL; char *tmp=NULL; int i=0, is_undefined_obj=0; size_t content_len=0; + const struct cache_param* param=NULL; + struct cache_mid* _mid=*mid; + + if(_mid!=NULL && _mid->is_using_exception_param) + { + param=_mid->param; + } + else + { + param=&(handle->default_cache_policy); + } if(session->resp->resp_spec.content_length) { sscanf(session->resp->resp_spec.content_length, "%lu", &content_len); @@ -824,19 +1118,24 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle, put_action=tfe_cache_put_pending(session->resp, &resp_freshness); switch(put_action){ case FORBIDDEN: - ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN])); - TFE_LOG_DEBUG(handle->logger, "cache update forbiden: %s", session->req->req_spec.url); - return NULL; - case REVALIDATE: - case ALLOWED: - break; - case UNDEFINED: - if(handle->cache_undefined_obj_enabled && content_lencache_undefined_obj_min_size) + if(!(param->ignore_res_nocache || param->force_caching)) { + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN])); TFE_LOG_DEBUG(handle->logger, "cache update forbiden: %s", session->req->req_spec.url); + return NULL; + } + break; + case REVALIDATE: + case ALLOWED: + case UNDEFINED: + if(_mid->shall_bypass + || content_len > param->max_cache_obj_size + || (!param->cache_cookied_cont && _mid->has_cookie) ) + { + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_BYPASS])); + TFE_LOG_DEBUG(handle->logger, "cache update bypass: %d : %s", _mid->cfg_id, session->req->req_spec.url); return NULL; } - is_undefined_obj=1; break; default: assert(0); @@ -847,15 +1146,37 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle, ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_ABANDON])); return NULL; } + const char* key=NULL; + size_t key_len=0; + if(param->min_use>0) + { + if(_mid->cache_key) + { + key=_mid->cache_key; + key_len=strlen(_mid->cache_key); + } + else + { + key=session->req->req_spec.url; + key_len=strlen(session->req->req_spec.url); + } + _mid->use_cnt=counting_bloom_check(handle->cache_key_bloom[thread_id], key, key_len); + + if(_mid->use_cntmin_use) + { + counting_bloom_add(handle->cache_key_bloom[thread_id], key, key_len); + return NULL; + } + } ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOADING])); struct tango_cache_meta_put meta; memset(&meta, 0, sizeof(meta)); meta.url=session->req->req_spec.url; - i=0; + i=0; - snprintf(cont_len_str, sizeof(cont_len_str), "content-type:%s",session->resp->resp_spec.content_type); - meta.std_hdr[i]=cont_len_str; + snprintf(cont_type_str, sizeof(cont_type_str), "content-type:%s",session->resp->resp_spec.content_type); + meta.std_hdr[i]=cont_type_str; i++; const char* etag=tfe_http_std_field_read(session->resp, TFE_HTTP_ETAG); const char* last_modified=tfe_http_std_field_read(session->resp, TFE_HTTP_LAST_MODIFIED); @@ -868,7 +1189,8 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle, meta.usertag_len=strlen(user_tag_str)+1; } meta.put=resp_freshness; - + meta.put.timeout=MAX(param->pinning_time_sec, resp_freshness.timeout); + struct wrap_cache_put_ctx* _cache_put_ctx=ALLOC(struct wrap_cache_put_ctx, 1); _cache_put_ctx->url=tfe_strdup(session->req->req_spec.url); _cache_put_ctx->start=time(NULL); diff --git a/plugin/business/pangu-http/src/pangu_web_cache.h b/plugin/business/pangu-http/src/pangu_web_cache.h index db0a234..0f81852 100644 --- a/plugin/business/pangu-http/src/pangu_web_cache.h +++ b/plugin/business/pangu-http/src/pangu_web_cache.h @@ -2,6 +2,7 @@ #include #include #include +#include enum cache_query_status { @@ -13,7 +14,9 @@ enum cache_query_status WEB_CACHE_HIT }; struct cache_handle; -struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, struct event_base* gc_evbase, void *logger); +struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, + struct event_base* gc_evbase, Maat_feather_t feather, void *logger); + struct cached_meta { size_t content_length; @@ -25,7 +28,7 @@ const struct cached_meta* cache_query_result_read_meta(future_result_t * result) size_t cache_query_result_get_data(future_result_t * result, const unsigned char** pp_data); int web_cache_async_query(struct cache_handle* handle, unsigned int thread_id, - const struct tfe_http_half * request, struct future* f); + const struct tfe_http_half * request, struct cache_mid** mid, struct future* f); enum cache_query_result_type @@ -51,8 +54,9 @@ struct cache_mid; const struct cached_meta* cache_pending_result_read_meta(future_result_t * result); enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, unsigned int thread_id, - const struct tfe_http_half * request, struct cache_mid **mid, struct future* f_revalidate); -void cache_mid_free(struct cache_mid **mid); + const struct tfe_http_half * request, struct cache_mid** mid, struct future* f_revalidate); + +void cache_mid_clear(struct cache_mid **mid);