使用缓存上传的future参数。

This commit is contained in:
zhengchao
2018-10-18 21:42:53 +08:00
parent 529f7037ba
commit 2f6be2c864
3 changed files with 38 additions and 7 deletions

View File

@@ -599,7 +599,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
__stream_bev_readcb(bev, arg); __stream_bev_readcb(bev, arg);
} }
fprintf(stderr, "---- eventcb ----, stream = %p, event = %x, dir = %s\n", _stream, events, __str_dir); //fprintf(stderr, "---- eventcb ----, stream = %p, event = %x, dir = %s\n", _stream, events, __str_dir);
goto __close_connection; goto __close_connection;
} }
@@ -621,8 +621,8 @@ __close_connection:
if (*ref_this_conn != NULL) if (*ref_this_conn != NULL)
{ {
fprintf(stderr, "---- eventcb ----, close this connection, " //fprintf(stderr, "---- eventcb ----, close this connection, "
"stream = %p, event = %x, dir = %s\n", _stream, events, __str_dir); // "stream = %p, event = %x, dir = %s\n", _stream, events, __str_dir);
assert((*ref_this_conn)->on_writing == 0); assert((*ref_this_conn)->on_writing == 0);
__conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream); __conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream);

View File

@@ -927,7 +927,7 @@ void enforce_control_policy(const struct tfe_stream * stream, const struct tfe_h
} }
void cache_query(const struct tfe_http_session * session, unsigned int thread_id, struct pangu_http_ctx * ctx) void cache_query(const struct tfe_http_session * session, unsigned int thread_id, struct pangu_http_ctx * ctx)
{ {
ctx->f_cache_query=future_create("cache_query", cache_query_on_succ, cache_query_on_fail, ctx); ctx->f_cache_query=future_create("cache_down", cache_query_on_succ, cache_query_on_fail, ctx);
ctx->cache_query_status=async_web_cache_query(g_pangu_rt->cache, thread_id, session->req, ctx->f_cache_query); ctx->cache_query_status=async_web_cache_query(g_pangu_rt->cache, thread_id, session->req, ctx->f_cache_query);
if(ctx->cache_query_status==WEB_CACHE_QUERING) if(ctx->cache_query_status==WEB_CACHE_QUERING)
{ {
@@ -961,7 +961,7 @@ void cache_update(const struct tfe_http_session * session, enum tfe_http_event e
{ {
web_cache_update_end(ctx->cache_update_ctx); web_cache_update_end(ctx->cache_update_ctx);
ctx->cache_update_ctx=NULL; ctx->cache_update_ctx=NULL;
printf("cache update success: %s\n", ctx->ref_session->req->req_spec.url); //printf("cache update success: %s\n", ctx->ref_session->req->req_spec.url);
} }
} }

View File

@@ -386,11 +386,35 @@ enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsig
query_ctx->is_undefined_obj=is_undefined_obj; query_ctx->is_undefined_obj=is_undefined_obj;
p=future_to_promise(f); p=future_to_promise(f);
promise_set_ctx(p, query_ctx, cache_query_ctx_free_cb); promise_set_ctx(p, query_ctx, cache_query_ctx_free_cb);
query_ctx->f_tango_cache_fetch=future_create("wrap_cache_qry", wrap_cache_query_on_succ, wrap_cache_query_on_fail, p); query_ctx->f_tango_cache_fetch=future_create("_cache_qry", wrap_cache_query_on_succ, wrap_cache_query_on_fail, p);
ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta); ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta);
assert(ret==0); assert(ret==0);
return WEB_CACHE_QUERING; return WEB_CACHE_QUERING;
} }
struct wrap_cache_up_ctx
{
char* url;
time_t start;
struct future* f;
};
void wrap_cache_up_ctx_free(struct wrap_cache_up_ctx* ctx)
{
FREE(&(ctx->url));
future_destroy(ctx->f);
free(ctx);
}
static void wrap_cache_update_on_succ(future_result_t * result, void * user)
{
struct wrap_cache_up_ctx* ctx=(struct wrap_cache_up_ctx*)user;
printf("cache upload success: %s elapse: %d\n", ctx->url, time(NULL)-ctx->start);
wrap_cache_up_ctx_free(ctx);
}
static void wrap_cache_update_on_fail(enum e_future_error err, const char * what, void * user)
{
struct wrap_cache_up_ctx* ctx=(struct wrap_cache_up_ctx*)user;
printf("cache upload failed: %s elapse: %d\n", ctx->url, time(NULL)-ctx->start);
wrap_cache_up_ctx_free(ctx);
}
struct cache_update_context* web_cache_update_start(struct cache_handle* handle, unsigned int thread_id, struct cache_update_context* web_cache_update_start(struct cache_handle* handle, unsigned int thread_id,
const struct tfe_http_session * session) const struct tfe_http_session * session)
@@ -436,9 +460,16 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
meta.std_hdr[i]=buffer; meta.std_hdr[i]=buffer;
i++; i++;
memcpy(&meta.put, &resp_freshness, sizeof(resp_freshness)); memcpy(&meta.put, &resp_freshness, sizeof(resp_freshness));
write_ctx=tango_cache_update_start(handle->clients[thread_id], NULL, &meta);
struct wrap_cache_up_ctx* wrap_cache_up_ctx=ALLOC(struct wrap_cache_up_ctx, 1);
wrap_cache_up_ctx->url=tfe_strdup(session->req->req_spec.url);
wrap_cache_up_ctx->start=time(NULL);
wrap_cache_up_ctx->f=future_create("cache_up", wrap_cache_update_on_succ, wrap_cache_update_on_fail, wrap_cache_up_ctx);
printf("cache update start: %s\n", wrap_cache_up_ctx->url);
write_ctx=tango_cache_update_start(handle->clients[thread_id], wrap_cache_up_ctx->f, &meta);
if(write_ctx==NULL)//exceed maximum cache memory size. if(write_ctx==NULL)//exceed maximum cache memory size.
{ {
wrap_cache_up_ctx_free(wrap_cache_up_ctx);
return NULL; return NULL;
} }
if(is_undefined_obj) if(is_undefined_obj)