diff --git a/cache/include/tango_cache_pending.h b/cache/include/tango_cache_pending.h index f295d90..0c72635 100644 --- a/cache/include/tango_cache_pending.h +++ b/cache/include/tango_cache_pending.h @@ -7,7 +7,7 @@ enum cache_pending_action { UNDEFINED = 0, ALLOWED, FORBIDDEN, - VERIFY + REVALIDATE }; @@ -30,6 +30,8 @@ struct response_freshness{ time_t timeout; }; + +time_t read_GMT_time(const char* gmt_string); /* 函数功能: 根据请求头字段判断是否允许将缓存作为该请求的响应,并且将请求字段对缓存新鲜度的约束范围作为传出参数返回给调用者 @@ -41,7 +43,7 @@ restrict:如果该函数返回值为ALLOWED,则返回请求Cache-Control字段 UNDEFINED = 0,//请求字段中未定义缓存的行为 ALLOWED ,//允许使用缓存作为该请求的响应 FORBIDDEN,//禁止使用缓存作为该请求的响应,需要向源服务器请求 -VERIFY,//禁止使用未验证有效性的缓存作为该请求的响应 +REVALIDATE,//禁止使用未验证有效性的缓存作为该请求的响应 */ enum cache_pending_action tfe_cache_get_pending(const struct tfe_http_half *request, struct request_freshness* restrict); @@ -59,4 +61,4 @@ UNDEFINED = 0,//响应字段中未定义缓存的行为 ALLOWED ,//允许缓存该响应 FORBIDDEN,//禁止缓存该响应 */ -enum cache_pending_action tfe_cache_put_pending(const struct tfe_http_half *response, struct response_freshness* freshness); \ No newline at end of file +enum cache_pending_action tfe_cache_put_pending(const struct tfe_http_half *response, struct response_freshness* freshness); diff --git a/cache/src/tango_cache_pending.cpp b/cache/src/tango_cache_pending.cpp index da3e125..6a732f1 100644 --- a/cache/src/tango_cache_pending.cpp +++ b/cache/src/tango_cache_pending.cpp @@ -53,7 +53,7 @@ enum cache_pending_action request_cache_control(const char* value, struct reques int i = 0; if (strstr(value, "no-cache") != NULL) { - return VERIFY; + return REVALIDATE; } if (strstr(value, "no-store") != NULL) { @@ -64,7 +64,7 @@ enum cache_pending_action request_cache_control(const char* value, struct reques } -bool cache_vertify(const struct tfe_http_half *request) +bool cache_verify(const struct tfe_http_half *request) { int i = 0; if( !tfe_http_std_field_read(request,TFE_HTTP_IF_MATCH) || @@ -98,7 +98,7 @@ enum cache_pending_action get_pragma_action(const char * value) const char *pragma_value = "no-cache"; if (strcasecmp(value, pragma_value) == 0) { - return VERIFY; + return REVALIDATE; } return UNDEFINED; } @@ -106,45 +106,49 @@ enum cache_pending_action get_pragma_action(const char * value) enum cache_pending_action tfe_cache_get_pending(const struct tfe_http_half *request, struct request_freshness* restrict) { - enum cache_pending_action res = UNDEFINED; - int i = 0; - int index = 0; - const char *value = NULL; - memset(restrict,0,sizeof(struct request_freshness)); + enum cache_pending_action res = UNDEFINED; + int i = 0; + int index = 0; + const char *value = NULL; + memset(restrict,0,sizeof(struct request_freshness)); + if(request->req_spec.method!=TFE_HTTP_METHOD_GET) + { + return FORBIDDEN; + } if(NULL!=tfe_http_std_field_read(request, TFE_HTTP_CONT_RANGE)) { return FORBIDDEN; } value = tfe_http_std_field_read(request, TFE_HTTP_PRAGMA); - if (value != NULL) - { - res = get_pragma_action(value); - } - else - { - value = tfe_http_std_field_read(request, TFE_HTTP_CACHE_CONTROL); - if (value != NULL) - { - res = request_cache_control(value, restrict); - } - else - { - if (cache_vertify(request)) - { - res = VERIFY; - } - } - } - return res; + if (value != NULL) + { + res = get_pragma_action(value); + } + else + { + value = tfe_http_std_field_read(request, TFE_HTTP_CACHE_CONTROL); + if (value != NULL) + { + res = request_cache_control(value, restrict); + } + else + { + if (cache_verify(request)) + { + res = REVALIDATE; + } + } + } + return res; } -time_t absolute_to_relative_time(const char* gmt_time) +time_t read_GMT_time(const char* gmt_string) { time_t expire_rel_time; struct tm expire_gmt_time; - strptime(gmt_time, "%a, %d %b %Y %H:%M:%S GMT", &expire_gmt_time); + strptime(gmt_string, "%a, %d %b %Y %H:%M:%S GMT", &expire_gmt_time); expire_rel_time = mktime(&expire_gmt_time); return expire_rel_time; } @@ -222,7 +226,7 @@ void get_response_freshness(const struct tfe_http_half *response, struct respons field_value = tfe_http_std_field_read(response, TFE_HTTP_EXPIRES); if (field_value != NULL && is_standard_gmt_format(field_value)) { - expire_rel_time = absolute_to_relative_time(field_value); + expire_rel_time = read_GMT_time(field_value); const time_t cur_ct_time = time(NULL); if (gmtime_r(&cur_ct_time, &cur_gmt_time) == NULL) { @@ -236,12 +240,12 @@ void get_response_freshness(const struct tfe_http_half *response, struct respons if (field_value != NULL) { assert(is_standard_gmt_format(field_value)); - freshness->date = absolute_to_relative_time(field_value);; + freshness->date = read_GMT_time(field_value);; } field_value = tfe_http_std_field_read(response, TFE_HTTP_LAST_MODIFIED); if (field_value != NULL && is_standard_gmt_format(field_value)) { - freshness->last_modified = absolute_to_relative_time(field_value);; + freshness->last_modified = read_GMT_time(field_value);; } } @@ -262,7 +266,7 @@ enum cache_pending_action response_cache_control(const char* value) { if (strstr(value, verify_vaule[i]) != NULL) { - return VERIFY; + return REVALIDATE; } } return ALLOWED; @@ -277,7 +281,8 @@ enum cache_pending_action tfe_cache_put_pending(const struct tfe_http_half *resp const char *value = NULL; memset(freshness,0,sizeof(struct response_freshness)); if(response->resp_spec.resp_code!=TFE_HTTP_STATUS_OK - || NULL!=tfe_http_std_field_read(response, TFE_HTTP_CONT_RANGE)) //NOT upload response with content-range + || NULL!=tfe_http_std_field_read(response, TFE_HTTP_CONT_RANGE) //NOT upload response with content-range + || NULL==response->resp_spec.content_length) { return FORBIDDEN; } diff --git a/cache/test/test_cache_pending.cpp b/cache/test/test_cache_pending.cpp index 447937a..b21c335 100644 --- a/cache/test/test_cache_pending.cpp +++ b/cache/test/test_cache_pending.cpp @@ -18,10 +18,10 @@ TEST(CacheActionTest, PragmaField) http_heads.http_field = TFE_HTTP_PRAGMA; http_heads.value = "no-cache"; - EXPECT_EQ(tfe_cache_get_pending(&http_heads, 1,&restrict), VERIFY); + EXPECT_EQ(tfe_cache_get_pending(&http_heads, 1,&restrict), REVALIDATE); EXPECT_EQ(restrict.min_fresh, 0); EXPECT_EQ(restrict.max_age, 0); - EXPECT_EQ(tfe_cache_put_pending(&http_heads, 1, &freshness), VERIFY); + EXPECT_EQ(tfe_cache_put_pending(&http_heads, 1, &freshness), REVALIDATE); EXPECT_EQ(freshness.date, 0); EXPECT_EQ(freshness.last_modified, 0); EXPECT_EQ(freshness.timeout, 0); @@ -35,10 +35,10 @@ TEST(CacheActionTest, CacheCtlNoCache) struct response_freshness freshness; http_heads.http_field = TFE_HTTP_CACHE_CONTROL; http_heads.value = "no-cache"; - EXPECT_EQ(tfe_cache_get_pending(&http_heads, 1, &restrict), VERIFY); + EXPECT_EQ(tfe_cache_get_pending(&http_heads, 1, &restrict), REVALIDATE); EXPECT_EQ(restrict.min_fresh, 0); EXPECT_EQ(restrict.max_age, 0); - EXPECT_EQ(tfe_cache_put_pending(&http_heads, 1, &freshness), VERIFY); + EXPECT_EQ(tfe_cache_put_pending(&http_heads, 1, &freshness), REVALIDATE); EXPECT_EQ(freshness.date, 0); EXPECT_EQ(freshness.last_modified, 0); EXPECT_EQ(freshness.timeout, 0); @@ -92,7 +92,7 @@ TEST(CacheActionTest, CacheCtlMustRevalidate) struct response_freshness freshness; http_heads.http_field = TFE_HTTP_CACHE_CONTROL; http_heads.value = "must-revalidate"; - EXPECT_EQ(tfe_cache_put_pending(&http_heads, 1, &freshness), VERIFY); + EXPECT_EQ(tfe_cache_put_pending(&http_heads, 1, &freshness), REVALIDATE); EXPECT_EQ(freshness.date, 0); EXPECT_EQ(freshness.last_modified, 0); EXPECT_EQ(freshness.timeout, 0); @@ -103,7 +103,7 @@ TEST(CacheActionTest, CacheCtlProxyRevalidate) struct response_freshness freshness; http_heads.http_field = TFE_HTTP_CACHE_CONTROL; http_heads.value = "proxy-revalidate"; - EXPECT_EQ(tfe_cache_put_pending(&http_heads, 1, &freshness), VERIFY); + EXPECT_EQ(tfe_cache_put_pending(&http_heads, 1, &freshness), REVALIDATE); EXPECT_EQ(freshness.date, 0); EXPECT_EQ(freshness.last_modified, 0); EXPECT_EQ(freshness.timeout, 0); @@ -149,7 +149,7 @@ TEST(CacheActionTest, IfMatchRequest) struct request_freshness restrict; http_heads.http_field = TLF_HTTP_IF_MATCH; http_heads.value = "50b1c1d4f775c61:df3"; - EXPECT_EQ(tfe_cache_get_pending(&http_heads, 1, &restrict), VERIFY); + EXPECT_EQ(tfe_cache_get_pending(&http_heads, 1, &restrict), REVALIDATE); EXPECT_EQ(restrict.min_fresh, 0); EXPECT_EQ(restrict.max_age, 0); } @@ -159,7 +159,7 @@ TEST(CacheActionTest, IfNoMatchRequest) struct request_freshness restrict; http_heads.http_field = TLF_HTTP_IF_NONE_MATCH; http_heads.value = "50b1c1d4f775c61:df3"; - EXPECT_EQ(tfe_cache_get_pending(&http_heads, 1, &restrict), VERIFY); + EXPECT_EQ(tfe_cache_get_pending(&http_heads, 1, &restrict), REVALIDATE); EXPECT_EQ(restrict.min_fresh, 0); EXPECT_EQ(restrict.max_age, 0); } @@ -169,7 +169,7 @@ TEST(CacheActionTest, IfModifiedSinceRequest) struct request_freshness restrict; http_heads.http_field = TLF_HTTP_IF_MODIFIED_SINCE; http_heads.value = "Sun, 01 Dec 2019 16:00:00 GMT"; - EXPECT_EQ(tfe_cache_get_pending(&http_heads, 1, &restrict), VERIFY); + EXPECT_EQ(tfe_cache_get_pending(&http_heads, 1, &restrict), REVALIDATE); EXPECT_EQ(restrict.min_fresh, 0); EXPECT_EQ(restrict.max_age, 0); } @@ -179,7 +179,7 @@ TEST(CacheActionTest, IfUnModifiedSinceRequest) struct request_freshness restrict; http_heads.http_field = TLF_HTTP_IF_UNMODIFIED_SINCE; http_heads.value = "Sun, 01 Dec 2019 16:00:00 GMT"; - EXPECT_EQ(tfe_cache_get_pending(&http_heads, 1, &restrict), VERIFY); + EXPECT_EQ(tfe_cache_get_pending(&http_heads, 1, &restrict), REVALIDATE); EXPECT_EQ(restrict.min_fresh, 0); EXPECT_EQ(restrict.max_age, 0); } diff --git a/common/include/tfe_http.h b/common/include/tfe_http.h index 27bf572..7a5a27a 100644 --- a/common/include/tfe_http.h +++ b/common/include/tfe_http.h @@ -432,7 +432,28 @@ static inline int tfe_http_in_request(enum tfe_http_event events) return 0; } } +static inline int tfe_http_in_response(enum tfe_http_event events) +{ + return !(tfe_http_in_request(events)); +} +static inline struct tfe_http_half* tfe_http_session_request_duplicate(struct tfe_http_session * session) +{ + struct http_field_name in_header_field{}; + const char * in_header_value = NULL; + void * iterator = NULL; + + struct tfe_http_half* dup_req=tfe_http_session_request_create(session, session->req->req_spec.method, session->req->req_spec.uri); + while (true) + { + if ((in_header_value = tfe_http_field_iterate(session->req, &iterator, &in_header_field)) == NULL) + { + break; + } + tfe_http_field_write(dup_req, &in_header_field,in_header_value); + } + return dup_req; +} //@flag EV_HTTP_RESP_BODY_END, EV_HTTP_RESP_BODY_FULL, //suspend stream on EV_HTTP_REQ_BODY_BEGIN, resume when EV_HTTP_REQ_BODY_END. diff --git a/plugin/business/pangu-http/src/pangu_http.cpp b/plugin/business/pangu-http/src/pangu_http.cpp index 86d6f4f..00487be 100644 --- a/plugin/business/pangu-http/src/pangu_http.cpp +++ b/plugin/business/pangu-http/src/pangu_http.cpp @@ -282,6 +282,7 @@ error_out: return -1; } + struct replace_ctx { struct replace_rule * rule; @@ -304,17 +305,24 @@ struct pangu_http_ctx char * enforce_para; struct replace_ctx * rep_ctx; - - int resume_from_cache_query; + + int (* resumed_cb)(const struct tfe_stream * stream, + const struct tfe_http_session * session, enum tfe_http_event event, const unsigned char * data, + size_t datalen, unsigned int thread_id, struct pangu_http_ctx* ctx); + + enum cache_pending_result pending_result; enum cache_query_status cache_query_status; - struct future* f_cache_query; + struct future *f_cache_pending, *f_cache_query; struct tfe_http_session * ref_session; + struct tfe_http_half* cache_revalidate_req; struct tfe_http_half* cached_response; size_t cache_result_declared_sz, cache_result_actual_sz; struct cache_update_context* cache_update_ctx; int thread_id; }; + + void http_repl_ctx_free(struct replace_ctx* rep_ctx) { for (size_t i = 0; i < rep_ctx->n_rule; i++) @@ -501,81 +509,7 @@ static void html_free(char ** page_buff) FREE(page_buff); return; } -static void cache_query_on_succ(future_result_t * result, void * user) -{ - struct pangu_http_ctx * ctx = (struct pangu_http_ctx *)user; - struct cached_meta* meta=NULL; - enum cache_query_result_type type=cache_query_result_get_type(result); - const unsigned char* data=NULL; - size_t data_sz=0; - char temp[TFE_STRING_MAX]; - switch(type) - { - case CACHE_QUERY_RESULT_META: - meta=cache_query_result_get_header(result); - ctx->cache_result_declared_sz=meta->content_length; - ctx->resume_from_cache_query=1; - tfe_http_session_resume(ctx->ref_session); - - ctx->cached_response=tfe_http_session_response_create(ctx->ref_session, 200); - tfe_http_nonstd_field_write(ctx->cached_response, "X-Cache-Lookup", "Hit From TFE"); - tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_CONT_TYPE, meta->content_type); - snprintf(temp, sizeof(temp), "%lu", meta->content_length); - tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_CONT_LENGTH, temp); - - tfe_http_session_response_set(ctx->ref_session, ctx->cached_response); - tfe_http_half_write_body_begin(ctx->cached_response, 1); - - cache_query_free_meta(meta); - meta=NULL; - break; - case CACHE_QUERY_RESULT_DATA: - data_sz=cache_query_result_get_data(result, &data); - tfe_http_half_write_body_data(ctx->cached_response, data, data_sz); - ctx->cache_result_actual_sz+=data_sz; - break; - case CACHE_QUERY_RESULT_END: - assert(ctx->cached_response!=NULL); - ctx->cache_query_status=WEB_CACHE_HIT; - tfe_http_half_write_body_end(ctx->cached_response); - //ownership has been transferred to http session, set to NULL. - ctx->cached_response=NULL; - assert(ctx->cache_result_actual_sz==ctx->cache_result_declared_sz); - future_destroy(ctx->f_cache_query); - ctx->f_cache_query=NULL; - break; - case CACHE_QUERY_RESULT_MISS: - ctx->cache_query_status=WEB_CACHE_NOT_HIT; - ctx->resume_from_cache_query=1; - tfe_http_session_resume(ctx->ref_session); - future_destroy(ctx->f_cache_query); - ctx->f_cache_query=NULL; - break; - default: - break; - } - return; -} -static void cache_query_on_fail(enum e_future_error err, const char * what, void * user) -{ - struct pangu_http_ctx * ctx = (struct pangu_http_ctx *)user; - future_destroy(ctx->f_cache_query); - ctx->f_cache_query=NULL; - ctx->cache_query_status=WEB_CACHE_NOT_HIT; - ctx->resume_from_cache_query=1; - if(!ctx->cached_response) - { - tfe_http_session_resume(ctx->ref_session); - } - else - { - tfe_http_half_write_body_end(ctx->cached_response); - ctx->cached_response=NULL; - } - - printf("cache query failed: %s %s\n", ctx->ref_session->req->req_spec.url, what); -} void http_replace(const struct tfe_stream * stream, const struct tfe_http_session * session, enum tfe_http_event events, const unsigned char * body_frag, size_t frag_size, struct pangu_http_ctx * ctx) { @@ -950,27 +884,201 @@ void enforce_control_policy(const struct tfe_stream * stream, const struct tfe_h } return; } -void cache_query(const struct tfe_http_session * session, unsigned int thread_id, struct pangu_http_ctx * ctx) +#define RESUMED_CB_NO_MORE_CALLS 0 +#define RESUMED_CB_MORE_CALLS 1 +int make_revalidate_request(const struct tfe_stream * stream, const struct tfe_http_session * session, + enum tfe_http_event events, const unsigned char * body_frag, size_t frag_size, unsigned int thread_id, struct pangu_http_ctx * ctx) { - ctx->f_cache_query=future_create("cache_get", cache_query_on_succ, cache_query_on_fail, ctx); - ctx->cache_query_status=async_web_cache_query(g_pangu_rt->cache, thread_id, session->req, ctx->f_cache_query); - if(ctx->cache_query_status==WEB_CACHE_QUERING) + assert(ctx->cache_revalidate_req); + if(events & EV_HTTP_REQ_BODY_BEGIN) { - ctx->ref_session=tfe_http_session_allow_write(session); - tfe_http_session_suspend(ctx->ref_session); + tfe_http_half_write_body_begin(ctx->cache_revalidate_req, 1); + } + if(events & EV_HTTP_REQ_BODY_CONT) + { + tfe_http_half_write_body_data(ctx->cache_revalidate_req, body_frag, frag_size); + } + if(events & EV_HTTP_REQ_BODY_END) + { + tfe_http_half_write_body_end(ctx->cache_revalidate_req); + ctx->cache_revalidate_req=NULL; + return RESUMED_CB_NO_MORE_CALLS; + } + return RESUMED_CB_MORE_CALLS; +} +int dummy_resume(const struct tfe_stream * stream, const struct tfe_http_session * session, + enum tfe_http_event events, const unsigned char * body_frag, size_t frag_size, unsigned int thread_id, struct pangu_http_ctx * ctx) +{ + return RESUMED_CB_NO_MORE_CALLS; +} + +static void cache_query_on_succ(future_result_t * result, void * user) +{ + struct pangu_http_ctx * ctx = (struct pangu_http_ctx *)user; + const struct cached_meta* meta=NULL; + enum cache_query_result_type type=cache_query_result_get_type(result); + const unsigned char* data=NULL; + size_t data_sz=0; + char temp[TFE_STRING_MAX]; + + switch(type) + { + case CACHE_QUERY_RESULT_META: + meta=cache_query_result_read_meta(result); + ctx->cache_result_declared_sz=meta->content_length; + ctx->resumed_cb=dummy_resume; + tfe_http_session_resume(ctx->ref_session); + + ctx->cached_response=tfe_http_session_response_create(ctx->ref_session, 200); + tfe_http_nonstd_field_write(ctx->cached_response, "X-Cache-Lookup", "Hit From TFE"); + tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_CONT_TYPE, meta->content_type); + snprintf(temp, sizeof(temp), "%lu", meta->content_length); + tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_CONT_LENGTH, temp); + + tfe_http_session_response_set(ctx->ref_session, ctx->cached_response); + tfe_http_half_write_body_begin(ctx->cached_response, 1); + + meta=NULL; + break; + case CACHE_QUERY_RESULT_DATA: + data_sz=cache_query_result_get_data(result, &data); + tfe_http_half_write_body_data(ctx->cached_response, data, data_sz); + ctx->cache_result_actual_sz+=data_sz; + break; + case CACHE_QUERY_RESULT_END: + assert(ctx->cached_response!=NULL); + ctx->cache_query_status=WEB_CACHE_HIT; + tfe_http_half_write_body_end(ctx->cached_response); + //ownership has been transferred to http session, set to NULL. + ctx->cached_response=NULL; + assert(ctx->cache_result_actual_sz==ctx->cache_result_declared_sz); + future_destroy(ctx->f_cache_query); + ctx->f_cache_query=NULL; + break; + case CACHE_QUERY_RESULT_MISS: + ctx->cache_query_status=WEB_CACHE_MISS; + ctx->resumed_cb=dummy_resume; + tfe_http_session_resume(ctx->ref_session); + future_destroy(ctx->f_cache_query); + ctx->f_cache_query=NULL; + break; + default: + break; + } + return; +} +static void cache_query_on_fail(enum e_future_error err, const char * what, void * user) +{ + struct pangu_http_ctx * ctx = (struct pangu_http_ctx *)user; + future_destroy(ctx->f_cache_query); + ctx->f_cache_query=NULL; + ctx->cache_query_status=WEB_CACHE_MISS; + if(!ctx->cached_response) + { + tfe_http_session_resume(ctx->ref_session); + ctx->resumed_cb=dummy_resume; } else { - future_destroy(ctx->f_cache_query); - ctx->f_cache_query=NULL; + tfe_http_half_write_body_end(ctx->cached_response); + ctx->cached_response=NULL; } + + printf("cache query failed: %s %s\n", ctx->ref_session->req->req_spec.url, what); } +static void cache_pending_on_succ(future_result_t * result, void * user) +{ + struct pangu_http_ctx * ctx = (struct pangu_http_ctx *)user; + const struct cached_meta* meta=NULL; + meta=cache_pending_result_read_meta(result); + ctx->resumed_cb=dummy_resume; + tfe_http_session_resume(ctx->ref_session); + future_destroy(ctx->f_cache_pending); + ctx->f_cache_pending=NULL; + if(meta==NULL) + { + ctx->pending_result==PENDING_RESULT_MISS; + return; + } + if(!(meta->etag && meta->last_modified)) + { + ctx->pending_result==PENDING_RESULT_FOBIDDEN; + return; + } + ctx->pending_result=PENDING_RESULT_REVALIDATE; + struct http_field_name in_field_name; + const char * in_header_value = NULL; + void * iterator = NULL; + ctx->cache_revalidate_req=tfe_http_session_request_create(ctx->ref_session, + ctx->ref_session->req->req_spec.method, ctx->ref_session->req->req_spec.uri); + while (true) + { + if ((in_header_value = tfe_http_field_iterate(ctx->ref_session->req, &iterator, &in_field_name)) == NULL) + { + break; + } + if(in_field_name.field_id==TFE_HTTP_IF_MATCH || in_field_name.field_id==TFE_HTTP_IF_NONE_MATCH + || in_field_name.field_id==TFE_HTTP_IF_MODIFIED_SINCE + || in_field_name.field_id==TFE_HTTP_IF_UNMODIFIED_SINCE) + { + continue; + } + tfe_http_field_write(ctx->cache_revalidate_req, &in_field_name, in_header_value); + } + if(meta->etag) tfe_http_std_field_write(ctx->cache_revalidate_req, TFE_HTTP_IF_NONE_MATCH, meta->etag); + if(meta->last_modified) tfe_http_std_field_write(ctx->cache_revalidate_req, TFE_HTTP_IF_MODIFIED_SINCE, meta->last_modified); + ctx->resumed_cb=make_revalidate_request; + return; +} +static void cache_pending_on_fail(enum e_future_error err, const char * what, void * user) +{ + struct pangu_http_ctx * ctx = (struct pangu_http_ctx *)user; + + ctx->pending_result=PENDING_RESULT_FOBIDDEN; + tfe_http_session_resume(ctx->ref_session); + ctx->resumed_cb=dummy_resume; + future_destroy(ctx->f_cache_pending); + ctx->f_cache_pending=NULL; + + return; +} + +void cache_pending(const struct tfe_http_session * session, unsigned int thread_id, struct pangu_http_ctx * ctx) +{ + enum cache_pending_result ret; + ctx->f_cache_pending=future_create("cache_pend", cache_pending_on_succ, cache_pending_on_fail, ctx); + ctx->pending_result=web_cache_async_pending(g_pangu_rt->cache, thread_id, session->req, ctx->f_cache_pending); + switch(ctx->pending_result) + { + case PENDING_RESULT_REVALIDATE: + ctx->ref_session=tfe_http_session_allow_write(session); + tfe_http_session_suspend(ctx->ref_session); + break; + case PENDING_RESULT_ALLOWED: + case PENDING_RESULT_FOBIDDEN: + case PENDING_RESULT_MISS: + future_destroy(ctx->f_cache_query); + ctx->f_cache_query=NULL; + break; + default: + break; + } + return; +} +void cache_query(const struct tfe_http_session * session, unsigned int thread_id, struct pangu_http_ctx * ctx) +{ + ctx->f_cache_query=future_create("cache_get", cache_query_on_succ, cache_query_on_fail, ctx); + ctx->ref_session=tfe_http_session_allow_write(session); + tfe_http_session_suspend(ctx->ref_session); + web_cache_async_query(g_pangu_rt->cache, thread_id, session->req, ctx->f_cache_query); +} + void cache_update(const struct tfe_http_session * session, enum tfe_http_event events, const unsigned char * body_frag, size_t frag_size, unsigned int thread_id, struct pangu_http_ctx * ctx) { - if(events & EV_HTTP_RESP_BODY_BEGIN && ctx->cache_query_status == WEB_CACHE_NOT_HIT) + if(events & EV_HTTP_RESP_BODY_BEGIN) { ctx->cache_update_ctx=web_cache_update_start(g_pangu_rt->cache, thread_id, session); } @@ -1058,33 +1166,42 @@ void pangu_on_http_data(const struct tfe_stream * stream, const struct tfe_http_ enum tfe_http_event events, const unsigned char * body_frag, size_t frag_size, unsigned int thread_id, void ** pme) { struct pangu_http_ctx * ctx = *(struct pangu_http_ctx **) pme; - - if(events & EV_HTTP_REQ_HDR && ctx->resume_from_cache_query && ctx->cache_query_status == WEB_CACHE_HIT) + int ret=0; + if(ctx->resumed_cb) { - //resume from cache query. - assert(ctx->action==PG_ACTION_NONE); - tfe_http_session_detach(session); - return; - } - if(!ctx->resume_from_cache_query) - { - enforce_control_policy(stream, session, events, body_frag, frag_size,thread_id, ctx); - } - if(ctx->action != PG_ACTION_NONE) - { - return; - } - if(g_pangu_rt->cache_enabled) - { - if(events & EV_HTTP_REQ_HDR && !ctx->resume_from_cache_query) + ret=ctx->resumed_cb(stream, session, events, body_frag, frag_size,thread_id, ctx); + if(ret==RESUMED_CB_NO_MORE_CALLS) { - cache_query(session, thread_id, ctx); + ctx->resumed_cb=NULL; } - if(!tfe_http_in_request(events)) + return; + } + + enforce_control_policy(stream, session, events, body_frag, frag_size,thread_id, ctx); + + if(g_pangu_rt->cache_enabled && ctx->action == PG_ACTION_NONE) + { + if(events & EV_HTTP_REQ_HDR) + { + cache_pending(session, thread_id, ctx); + if(ctx->pending_result==PENDING_RESULT_ALLOWED) + { + cache_query(session, thread_id, ctx); + } + } + if(events & EV_HTTP_RESP_HDR && ctx->pending_result==PENDING_RESULT_REVALIDATE) + { + if(session->resp->resp_spec.resp_code==TFE_HTTP_STATUS_NOT_MODIFIED) + { + cache_query(session, thread_id, ctx); + } + } + + if(tfe_http_in_response(events)) { cache_update(session, events, body_frag, frag_size, thread_id, ctx); - } - } + } + } return; } diff --git a/plugin/business/pangu-http/src/pangu_web_cache.cpp b/plugin/business/pangu-http/src/pangu_web_cache.cpp index ebdd799..8ef3070 100644 --- a/plugin/business/pangu-http/src/pangu_web_cache.cpp +++ b/plugin/business/pangu-http/src/pangu_web_cache.cpp @@ -26,6 +26,7 @@ enum cache_stat_field STAT_CACHE_QUERY_ERR, STAT_CACHE_QUERY_ABANDON, STAT_CACHE_QUERYING, + STAT_CACHE_META_QUERYING, STAT_CACHE_UPLOAD_CNT, STAT_CACHE_UPLOAD_OVERRIDE, STAT_CACHE_UPLOAD_FORBIDEN, @@ -269,12 +270,6 @@ static char* read_http1_hdr(const char* hdr, const char* field_name) memcpy(value, p, q-p); return value; } -void cache_query_free_meta(struct cached_meta* meta) -{ - FREE(&meta->content_type); - FREE(&meta); - return; -} enum cache_query_result_type cache_query_result_get_type(future_result_t * result) { @@ -313,7 +308,10 @@ struct cache_query_context { struct cache_handle* ref_handle; char* url; - int is_undefined_obj; + + struct cached_meta meta; + + struct tango_cache_result* ref_tango_cache_result; struct future* f_tango_cache_fetch; }; void cache_query_ctx_free_cb(void* p) @@ -322,27 +320,52 @@ void cache_query_ctx_free_cb(void* p) future_destroy(ctx->f_tango_cache_fetch); ctx->f_tango_cache_fetch=NULL; FREE(&(ctx->url)); + FREE(&(ctx->meta.etag)); + FREE(&(ctx->meta.last_modified)); + FREE(&(ctx->meta.content_type)); free(ctx); } -static void wrap_cache_query_on_succ(future_result_t * result, void * user) +const struct cached_meta* cache_query_result_read_meta(future_result_t * result) +{ + struct cache_query_context* ctx=(struct cache_query_context*)result; + return &(ctx->meta); +} +void cached_meta_set(struct cached_meta* meta, enum CACHE_RESULT_TYPE type, const char* data_frag, size_t size) +{ + switch(type) + { + case RESULT_TYPE_HEADER: + meta->content_type=read_http1_hdr((const char*)data_frag, "content-type"); + break; + case RESULT_TYPE_USERTAG: + meta->last_modified=read_http1_hdr(data_frag, "Last-Modified"); + meta->etag=read_http1_hdr(data_frag, "etag"); + break; + default: + assert(0); + break; + } + return; +} + +static void cache_query_obj_on_succ(future_result_t * result, void * user) { struct promise * p = (struct promise *) user; struct cache_query_context* ctx=(struct cache_query_context*)promise_get_ctx(p); struct tango_cache_result* _result=tango_cache_read_result(result); - enum cache_query_result_type type=cache_query_result_get_type(result); switch(_result->type) { case RESULT_TYPE_HEADER: - if(ctx->is_undefined_obj) - { - ATOMIC_INC(&(ctx->ref_handle->stat_val[STAT_CACHE_OVERRIDE_HIT])); - FS_operate(ctx->ref_handle->fs_handle, ctx->ref_handle->fs_id[STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE], 0, FS_OP_SET, _result->tlength/1024); - } ATOMIC_INC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_HIT])); FS_operate(ctx->ref_handle->fs_handle, ctx->ref_handle->fs_id[STAT_CACHE_QUERY_HIT_OJB_SIZE], 0, FS_OP_SET, _result->tlength/1024); + cached_meta_set(&ctx->meta, RESULT_TYPE_HEADER, _result->data_frag, _result->size); + ctx->meta.content_length=_result->tlength; TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache query hit: %s", ctx->url); break; + case RESULT_TYPE_USERTAG: + cached_meta_set(&ctx->meta, RESULT_TYPE_USERTAG, _result->data_frag, _result->size); + break; case RESULT_TYPE_MISS: TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache query miss: %s", ctx->url); //NOT break intentionally. @@ -362,7 +385,7 @@ static void wrap_cache_query_on_succ(future_result_t * result, void * user) return; } -static void wrap_cache_query_on_fail(enum e_future_error err, const char * what, void * user) +static void cache_query_obj_on_fail(enum e_future_error err, const char * what, void * user) { struct promise * p = (struct promise *) user; struct cache_query_context* ctx=(struct cache_query_context*)promise_dettach_ctx(p); @@ -372,45 +395,160 @@ static void wrap_cache_query_on_fail(enum e_future_error err, const char * what, return; } -enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsigned int thread_id, - const struct tfe_http_half * request, struct future* f) + +struct cache_pending_context { + enum cache_pending_result status; + int is_undefined_obj; struct request_freshness req_fresshness; - enum cache_pending_action get_action; - struct cache_query_context* query_ctx=NULL; - struct promise* p=NULL; - struct future* _f=NULL; - int ret=0, is_undefined_obj=0; - get_action=tfe_cache_get_pending(request, &req_fresshness); + char* req_if_none_match, *req_if_modified_since; + const struct tfe_http_half * request; + + char* url; + struct cached_meta cached_obj_meta; + + struct cache_handle* ref_handle; + struct tango_cache_result* ref_tango_cache_result; + struct future* f_tango_cache_fetch; +}; +void cache_pending_ctx_free_cb(void* p) +{ + struct cache_pending_context* ctx=(struct cache_pending_context*)p; + ctx->request=NULL; + free(ctx->url); + free(ctx); + return; +} +const struct cached_meta* cache_pending_result_read_meta(future_result_t * result) +{ + struct cache_pending_context* ctx=(struct cache_pending_context*)result; + return &(ctx->cached_obj_meta); +} + +static void cache_query_meta_on_succ(future_result_t * result, void * user) +{ + struct promise * p = (struct promise *) user; + struct cache_pending_context* ctx=(struct cache_pending_context*)promise_get_ctx(p); + struct tango_cache_result* _result=tango_cache_read_result(result); + ctx->ref_tango_cache_result=_result; + time_t cache_last_modified_time=0, request_last_modified_time=0; + switch(_result->type) + { + case RESULT_TYPE_HEADER: + ctx->cached_obj_meta.content_length=_result->tlength; + cached_meta_set(&ctx->cached_obj_meta, RESULT_TYPE_HEADER, _result->data_frag, _result->size); + ctx->status=PENDING_RESULT_REVALIDATE; + break; + case RESULT_TYPE_USERTAG: + cached_meta_set(&ctx->cached_obj_meta, RESULT_TYPE_HEADER, _result->data_frag, _result->size); + TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache meta query hit: %s %s %s" + , ctx->url + , ctx->cached_obj_meta.last_modified ? ctx->cached_obj_meta.last_modified:"no_last_modify" + , ctx->cached_obj_meta.etag ? ctx->cached_obj_meta.etag:"no_etag"); + break; + case RESULT_TYPE_MISS: + ctx->status=PENDING_RESULT_MISS; + TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache meta query miss: %s", ctx->url); + //NOT break intentionally. + case RESULT_TYPE_END: + //last call. + ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_META_QUERYING])); + promise_dettach_ctx(p); + promise_success(p, ctx); + cache_query_ctx_free_cb(ctx); + break; + default: + break; + } + +} +static void cache_query_meta_on_fail(enum e_future_error err, const char * what, void * user) +{ + struct promise * p = (struct promise *) user; + struct cache_pending_context* ctx=(struct cache_pending_context*)promise_dettach_ctx(p); + promise_failed(p, err, what); + ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_META_QUERYING])); + cache_query_ctx_free_cb(ctx); + return; +} + +enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, unsigned int thread_id, + const struct tfe_http_half * request, struct future* f_revalidate) +{ + struct request_freshness req_fresshness={0,0}; + enum cache_pending_result result=PENDING_RESULT_FOBIDDEN; + int is_undefined_obj=0; + enum cache_pending_action get_action=tfe_cache_get_pending(request, &(req_fresshness)); switch(get_action) { case UNDEFINED: if(!handle->query_undefined_obj_enabled) { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_NOT_APPLICABLE])); - return WEB_CACHE_NOT_APPLICABLE; + result=PENDING_RESULT_FOBIDDEN; + } + else + { + is_undefined_obj=1; + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_QUERY])); + result=PENDING_RESULT_ALLOWED; } - is_undefined_obj=1; - ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_QUERY])); break; - case VERIFY: - ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_VERIFY])); - return WEB_CACHE_NOT_APPLICABLE; case FORBIDDEN: ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_NOT_APPLICABLE])); - return WEB_CACHE_NOT_APPLICABLE; + result=PENDING_RESULT_FOBIDDEN; + break; case ALLOWED: + result=PENDING_RESULT_ALLOWED; break; default: - assert(0); - return WEB_CACHE_NOT_APPLICABLE; + result=PENDING_RESULT_REVALIDATE; + break; } + if(result!=PENDING_RESULT_REVALIDATE) + { + return result; + } + + struct tango_cache_meta_get meta; + memset(&meta, 0, sizeof(meta)); + meta.url=request->req_spec.url; + meta.get=req_fresshness; + + struct promise* p=future_to_promise(f_revalidate); + struct cache_pending_context* ctx=ALLOC(struct cache_pending_context, 1); + ctx->status=PENDING_RESULT_FOBIDDEN; + ctx->ref_handle=handle; + ctx->req_fresshness=req_fresshness; + ctx->url=tfe_strdup(request->req_spec.url); + ctx->req_if_modified_since=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_MODIFIED_SINCE)); + ctx->req_if_none_match=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_NONE_MATCH)); + promise_set_ctx(p, ctx, cache_pending_ctx_free_cb); + + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_META_QUERYING])); + ctx->f_tango_cache_fetch=future_create("_cache_meta", cache_query_meta_on_succ, cache_query_meta_on_fail, p); + int ret=tango_cache_head_object(handle->clients[thread_id], ctx->f_tango_cache_fetch, &meta); + assert(ret==0); + return PENDING_RESULT_REVALIDATE; +} +int web_cache_async_query(struct cache_handle* handle, unsigned int thread_id, + const struct tfe_http_half * request, struct future* f) +{ + struct request_freshness req_fresshness; + enum cache_pending_action get_action; + struct cache_query_context* query_ctx=NULL; + struct promise* p=NULL; + struct future* _f=NULL; + + get_action=tfe_cache_get_pending(request, &req_fresshness); + assert(get_action!=FORBIDDEN); + if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_QUERYING])) > ATOMIC_READ(&(handle->put_concurrency_max))) { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_ABANDON])); - return WEB_CACHE_NOT_APPLICABLE; + return -1; } - ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERYING])); + struct tango_cache_meta_get meta; memset(&meta, 0, sizeof(meta)); meta.url=request->req_spec.url; @@ -418,14 +556,14 @@ enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsig query_ctx=ALLOC(struct cache_query_context, 1); query_ctx->ref_handle=handle; query_ctx->url=tfe_strdup(request->req_spec.url); - query_ctx->is_undefined_obj=is_undefined_obj; p=future_to_promise(f); promise_set_ctx(p, query_ctx, cache_query_ctx_free_cb); - query_ctx->f_tango_cache_fetch=future_create("_cache_get", wrap_cache_query_on_succ, wrap_cache_query_on_fail, p); - ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta); - assert(ret==0); - return WEB_CACHE_QUERING; + + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERYING])); + query_ctx->f_tango_cache_fetch=future_create("_cache_get", cache_query_obj_on_succ, cache_query_obj_on_fail, p); + int ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta); + return 0; } struct wrap_cache_put_ctx { @@ -460,20 +598,20 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle, struct response_freshness resp_freshness; enum cache_pending_action put_action; struct tango_cache_ctx *write_ctx=NULL; - char buffer[TFE_STRING_MAX]; + char cont_len_str[TFE_STRING_MAX]={0}, user_tag_str[TFE_STRING_MAX]={0}; const char* value=NULL; + char *tmp=NULL; int i=0, is_undefined_obj=0; size_t content_len=0; - if(!session->resp->resp_spec.content_length) + if(session->resp->resp_spec.content_length) { - ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN])); - return NULL; + sscanf(session->resp->resp_spec.content_length, "%lu", &content_len); } - sscanf(session->resp->resp_spec.content_length, "%lu", &content_len); + put_action=tfe_cache_put_pending(session->resp, &resp_freshness); switch(put_action){ case FORBIDDEN: - case VERIFY: + case REVALIDATE: ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN])); TFE_LOG_DEBUG(handle->logger, "cache update forbiden: %s", session->req->req_spec.url); return NULL; @@ -496,14 +634,26 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle, ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_ABANDON])); return NULL; } - ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOADING])); + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOADING])); + struct tango_cache_meta_put meta; memset(&meta, 0, sizeof(meta)); meta.url=session->req->req_spec.url; - i=0; - snprintf(buffer, sizeof(buffer), "content-type:%s",session->resp->resp_spec.content_type); - meta.std_hdr[i]=buffer; + i=0; + + snprintf(cont_len_str, sizeof(cont_len_str), "content-type:%s",session->resp->resp_spec.content_type); + meta.std_hdr[i]=cont_len_str; i++; + const char* etag=tfe_http_std_field_read(session->resp, TFE_HTTP_ETAG); + const char* last_modified=tfe_http_std_field_read(session->resp, TFE_HTTP_LAST_MODIFIED); + tmp=user_tag_str; + if(etag) tmp+=snprintf(tmp, sizeof(user_tag_str)-(tmp-user_tag_str), "etag:%s\r\n", etag); + if(last_modified) tmp+=snprintf(tmp, sizeof(user_tag_str)-(tmp-user_tag_str), "Last-modified:%s\r\n", last_modified); + if(strlen(user_tag_str)>0) + { + meta.usertag=user_tag_str; + meta.usertag_len=strlen(user_tag_str)+1; + } memcpy(&meta.put, &resp_freshness, sizeof(resp_freshness)); struct wrap_cache_put_ctx* _cache_put_ctx=ALLOC(struct wrap_cache_put_ctx, 1); diff --git a/plugin/business/pangu-http/src/pangu_web_cache.h b/plugin/business/pangu-http/src/pangu_web_cache.h index c910922..cb0227f 100644 --- a/plugin/business/pangu-http/src/pangu_web_cache.h +++ b/plugin/business/pangu-http/src/pangu_web_cache.h @@ -7,8 +7,9 @@ enum cache_query_status { WEB_CACHE_BEFORE_QUERY=0, WEB_CACHE_NOT_APPLICABLE, - WEB_CACHE_QUERING, - WEB_CACHE_NOT_HIT, + WEB_CACHE_NEED_VERIFY, + WEB_CACHE_QUERY_DATA, + WEB_CACHE_MISS, WEB_CACHE_HIT }; struct cache_handle; @@ -17,13 +18,16 @@ struct cached_meta { size_t content_length; char* content_type; + char* last_modified; + char* etag; }; -struct cached_meta* cache_query_result_get_header(future_result_t * result); -void cache_query_free_meta(struct cached_meta* meta); +const struct cached_meta* cache_query_result_read_meta(future_result_t * result); size_t cache_query_result_get_data(future_result_t * result, const unsigned char** pp_data); -enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsigned int thread_id, - const struct tfe_http_half * request, struct future* f); +int web_cache_async_query(struct cache_handle* handle, unsigned int thread_id, + const struct tfe_http_half * request, struct future* f); + + enum cache_query_result_type { CACHE_QUERY_RESULT_MISS, @@ -35,6 +39,19 @@ enum cache_query_result_type }; enum cache_query_result_type cache_query_result_get_type(future_result_t * result); +enum cache_pending_result +{ + PENDING_RESULT_NONE=0, + PENDING_RESULT_FOBIDDEN, + PENDING_RESULT_REVALIDATE, + PENDING_RESULT_ALLOWED, + PENDING_RESULT_MISS +}; +const struct cached_meta* cache_pending_result_read_meta(future_result_t * result); +enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, unsigned int thread_id, + const struct tfe_http_half * request, struct future* f_revalidate); + + struct cache_update_context;