缓存功能整体流程调试通过。
This commit is contained in:
@@ -324,6 +324,14 @@ static inline int tfe_http_std_field_write(struct tfe_http_half * half,
|
|||||||
tmp_name.field_name=NULL;
|
tmp_name.field_name=NULL;
|
||||||
return tfe_http_field_write(half, &tmp_name, value);
|
return tfe_http_field_write(half, &tmp_name, value);
|
||||||
}
|
}
|
||||||
|
static inline int tfe_http_nonstd_field_write(struct tfe_http_half * half,
|
||||||
|
const char* field, const char * value)
|
||||||
|
{
|
||||||
|
struct http_field_name tmp_name;
|
||||||
|
tmp_name.field_id=TFE_HTTP_UNKNOWN_FIELD;
|
||||||
|
tmp_name.field_name=field;
|
||||||
|
return tfe_http_field_write(half, &tmp_name, value);
|
||||||
|
}
|
||||||
|
|
||||||
static inline struct tfe_http_half * tfe_http_allow_write(const struct tfe_http_half * half)
|
static inline struct tfe_http_half * tfe_http_allow_write(const struct tfe_http_half * half)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -454,6 +454,7 @@ static void cache_query_on_succ(future_result_t * result, void * user)
|
|||||||
ctx->f_cache_query=NULL;
|
ctx->f_cache_query=NULL;
|
||||||
tfe_http_session_resume(ctx->ref_session);
|
tfe_http_session_resume(ctx->ref_session);
|
||||||
ctx->resume_from_cache_query=1;
|
ctx->resume_from_cache_query=1;
|
||||||
|
printf("cache query success: %s\n", ctx->ref_session->req->req_spec.url);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if(cache_query_result_is_header(result))
|
if(cache_query_result_is_header(result))
|
||||||
@@ -475,6 +476,7 @@ static void cache_query_on_fail(enum e_future_error err, const char * what, void
|
|||||||
tfe_http_session_resume(ctx->ref_session);
|
tfe_http_session_resume(ctx->ref_session);
|
||||||
ctx->cache_query_status=WEB_CACHE_NOT_HIT;
|
ctx->cache_query_status=WEB_CACHE_NOT_HIT;
|
||||||
ctx->resume_from_cache_query=1;
|
ctx->resume_from_cache_query=1;
|
||||||
|
printf("cache query failed: %s\n", ctx->ref_session->req->req_spec.url);
|
||||||
}
|
}
|
||||||
void http_replace(const struct tfe_stream * stream, const struct tfe_http_session * session,
|
void http_replace(const struct tfe_stream * stream, const struct tfe_http_session * session,
|
||||||
enum tfe_http_event events, const unsigned char * body_frag, size_t frag_size, struct pangu_http_ctx * ctx)
|
enum tfe_http_event events, const unsigned char * body_frag, size_t frag_size, struct pangu_http_ctx * ctx)
|
||||||
@@ -852,6 +854,10 @@ void cache_update(const struct tfe_http_session * session, enum tfe_http_event e
|
|||||||
if(events & EV_HTTP_RESP_BODY_BEGIN && ctx->cache_query_status == WEB_CACHE_NOT_HIT)
|
if(events & EV_HTTP_RESP_BODY_BEGIN && ctx->cache_query_status == WEB_CACHE_NOT_HIT)
|
||||||
{
|
{
|
||||||
ctx->cache_update_ctx=web_cache_update_start(g_pangu_rt->cache, thread_id, session);
|
ctx->cache_update_ctx=web_cache_update_start(g_pangu_rt->cache, thread_id, session);
|
||||||
|
if(ctx->cache_update_ctx==NULL)
|
||||||
|
{
|
||||||
|
printf("cache update forbidden: %s\n", ctx->ref_session->req->req_spec.url);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if(events & EV_HTTP_RESP_BODY_CONT && ctx->cache_update_ctx!=NULL)
|
if(events & EV_HTTP_RESP_BODY_CONT && ctx->cache_update_ctx!=NULL)
|
||||||
{
|
{
|
||||||
@@ -861,6 +867,7 @@ void cache_update(const struct tfe_http_session * session, enum tfe_http_event e
|
|||||||
{
|
{
|
||||||
web_cache_update_end(ctx->cache_update_ctx);
|
web_cache_update_end(ctx->cache_update_ctx);
|
||||||
ctx->cache_update_ctx=NULL;
|
ctx->cache_update_ctx=NULL;
|
||||||
|
printf("cache update success: %s\n", ctx->ref_session->req->req_spec.url);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -869,6 +876,7 @@ void cache_make_response(const struct tfe_http_session * session, struct pangu_h
|
|||||||
size_t cont_len=0;
|
size_t cont_len=0;
|
||||||
struct tfe_http_session * wr_session=tfe_http_session_allow_write(session);
|
struct tfe_http_session * wr_session=tfe_http_session_allow_write(session);
|
||||||
struct tfe_http_half* cached_response=tfe_http_session_response_create(wr_session, 200);
|
struct tfe_http_half* cached_response=tfe_http_session_response_create(wr_session, 200);
|
||||||
|
tfe_http_nonstd_field_write(cached_response, "X-Cache-Lookup", "Hit From MESA-TFE");
|
||||||
tfe_http_std_field_write(cached_response, TFE_HTTP_CONT_TYPE, ctx->cached_header->content_type);
|
tfe_http_std_field_write(cached_response, TFE_HTTP_CONT_TYPE, ctx->cached_header->content_type);
|
||||||
tfe_http_std_field_write(cached_response, TFE_HTTP_CONT_LENGTH, ctx->cached_header->content_length);
|
tfe_http_std_field_write(cached_response, TFE_HTTP_CONT_LENGTH, ctx->cached_header->content_length);
|
||||||
sscanf(ctx->cached_header->content_length, "%llu", &cont_len);
|
sscanf(ctx->cached_header->content_length, "%llu", &cont_len);
|
||||||
@@ -956,8 +964,10 @@ void pangu_on_http_data(const struct tfe_stream * stream, const struct tfe_http_
|
|||||||
cache_make_response(session, ctx);
|
cache_make_response(session, ctx);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if(!ctx->resume_from_cache_query)
|
||||||
enforce_control_policy(stream, session, events, body_frag, frag_size,thread_id, ctx);
|
{
|
||||||
|
enforce_control_policy(stream, session, events, body_frag, frag_size,thread_id, ctx);
|
||||||
|
}
|
||||||
if(ctx->action != PG_ACTION_NONE)
|
if(ctx->action != PG_ACTION_NONE)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -131,7 +131,7 @@ struct pangu_logger* pangu_log_handle_create(const char* profile, const char* s
|
|||||||
TFE_LOG_ERROR(local_logger,"Pangu log init failed. Cannot create lafka handle with brokerlist: %s.", instance->brokerlist);
|
TFE_LOG_ERROR(local_logger,"Pangu log init failed. Cannot create lafka handle with brokerlist: %s.", instance->brokerlist);
|
||||||
goto error_out;
|
goto error_out;
|
||||||
}
|
}
|
||||||
instance->topic_name="PXY_HTTP_LOG";
|
instance->topic_name="PXY-HTTP-LOG";
|
||||||
instance->kafka_topic = rd_kafka_topic_new(instance->kafka_handle,instance->topic_name, NULL);
|
instance->kafka_topic = rd_kafka_topic_new(instance->kafka_handle,instance->topic_name, NULL);
|
||||||
pthread_mutex_init(&(instance->mutex), NULL);
|
pthread_mutex_init(&(instance->mutex), NULL);
|
||||||
return instance;
|
return instance;
|
||||||
|
|||||||
@@ -161,7 +161,7 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
|
|||||||
memset(&meta, 0, sizeof(meta));
|
memset(&meta, 0, sizeof(meta));
|
||||||
meta.url=session->req->req_spec.url;
|
meta.url=session->req->req_spec.url;
|
||||||
i=0;
|
i=0;
|
||||||
snprintf(buffer, sizeof(buffer), "content-type:%s",session->resp->resp_spec.content_length);
|
snprintf(buffer, sizeof(buffer), "content-type:%s",session->resp->resp_spec.content_type);
|
||||||
meta.std_hdr[i]=buffer;
|
meta.std_hdr[i]=buffer;
|
||||||
i++;
|
i++;
|
||||||
memcpy(&meta.put, &resp_freshness, sizeof(resp_freshness));
|
memcpy(&meta.put, &resp_freshness, sizeof(resp_freshness));
|
||||||
|
|||||||
Reference in New Issue
Block a user