支持promise finish。

This commit is contained in:
zhangchengwei
2018-11-23 20:55:28 +08:00
committed by zhengchao
parent 313f36c58a
commit 9fc2a8a0d2
8 changed files with 85 additions and 29 deletions

View File

@@ -58,6 +58,7 @@ struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance
int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size);
int cache_evbase_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evbuffer *evbuf);
void cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn);
void cache_evbase_update_cancel(struct cache_evbase_ctx *ctx_asyn);
void cache_evbase_get_object_path(const struct cache_evbase_ctx *ctx, char *path/*OUT*/, size_t pathsize);

View File

@@ -22,6 +22,7 @@ enum CACHE_ERR_CODE
CACHE_ERR_REDIS_JSON,
CACHE_ERR_REDIS_CONNECT,
CACHE_OUTOF_SESSION,
CACHE_UPDATE_CANCELED,
};
enum PUT_MEMORY_COPY_WAY
@@ -149,6 +150,7 @@ struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *in
int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, size_t size);
int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf);
void tango_cache_update_end(struct tango_cache_ctx *ctx);
void tango_cache_update_cancel(struct tango_cache_ctx *ctx);
//<2F><>ȡ<EFBFBD><C8A1><EFBFBD><EFBFBD>keyֵ<79><D6B5><EFBFBD><EFBFBD>CACHE_OBJECT_KEY_HASH_SWITCH=1<><31><EFBFBD><EFBFBD><EFBFBD><EFBFBD>URL/<2F>ļ<EFBFBD><C4BC><EFBFBD><EFBFBD><EFBFBD>ϣʱ<CFA3><CAB1><EFBFBD><EFBFBD>
void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize);

View File

@@ -28,6 +28,7 @@ enum CACHE_ASYN_CMD
CACHE_ASYN_UPLOAD_FRAG_DATA,
CACHE_ASYN_UPLOAD_FRAG_EVBUF,
CACHE_ASYN_UPLOAD_END,
CACHE_ASYN_UPLOAD_CANCEL,
CACHE_ASYN_DELETE,
CACHE_ASYN_HEAD,
};
@@ -153,21 +154,21 @@ static void cache_asyn_ctx_destroy(struct cache_evbase_ctx *ctx_asyn)
static void cache_asyn_ioevent_dispatch(struct databuffer *buffer)
{
struct cache_evbase_ctx *ctx_asyn=buffer->ctx_asyn;
struct future *f;
struct promise *p;
int ret=0;
switch(buffer->cmd_type)
{
case CACHE_ASYN_FETCH:
f = ctx_asyn->ctx->future;
p = ctx_asyn->ctx->promise;
if(tango_cache_fetch_start(ctx_asyn->ctx) < 0)
{
promise_failed(future_to_promise(f), FUTURE_ERROR_CANCEL, "CACHE_ASYN_FETCH failed");
promise_failed(p, FUTURE_ERROR_CANCEL, "CACHE_ASYN_FETCH failed");
}
cache_asyn_ctx_destroy(ctx_asyn);
break;
case CACHE_ASYN_HEAD:
f = ctx_asyn->ctx->future;
p = ctx_asyn->ctx->promise;
if(ctx_asyn->instance_asyn->instance->param->head_meta_source == HEAD_META_FROM_REDIS)
{
ret = tango_cache_head_redis(ctx_asyn->ctx);
@@ -178,7 +179,7 @@ static void cache_asyn_ioevent_dispatch(struct databuffer *buffer)
}
if(ret<0)
{
promise_failed(future_to_promise(f), FUTURE_ERROR_CANCEL, "CACHE_ASYN_HEAD failed");
promise_failed(p, FUTURE_ERROR_CANCEL, "CACHE_ASYN_HEAD failed");
}
cache_asyn_ctx_destroy(ctx_asyn);
break;
@@ -217,6 +218,10 @@ static void cache_asyn_ioevent_dispatch(struct databuffer *buffer)
tango_cache_update_end(ctx_asyn->ctx);
cache_asyn_ctx_destroy(ctx_asyn);
break;
case CACHE_ASYN_UPLOAD_CANCEL:
tango_cache_update_cancel(ctx_asyn->ctx);
cache_asyn_ctx_destroy(ctx_asyn);
break;
default: assert(0);break;
}
}
@@ -286,6 +291,26 @@ void cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn)
}
}
void cache_evbase_update_cancel(struct cache_evbase_ctx *ctx_asyn)
{
struct databuffer *buffer;
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
buffer->ctx_asyn = ctx_asyn;
buffer->cmd_type = CACHE_ASYN_UPLOAD_CANCEL;
if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
if(!ctx_asyn->ctx->fail_state)
{
tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
}
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
}
}
int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size)
{
struct databuffer *buffer;

View File

@@ -95,6 +95,7 @@ const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx)
case CACHE_ERR_REDIS_JSON:return "parse redis json error";
case CACHE_ERR_REDIS_CONNECT:return "redis is not connected";
case CACHE_OUTOF_SESSION:return "two many curl sessions";
case CACHE_UPDATE_CANCELED:return "update was canceled";
default: return ctx->error;
}
}
@@ -234,15 +235,15 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback)
curl_slist_free_all(ctx->headers);
}//no break here
case CACHE_REQUEST_DELETE:
if(callback && ctx->future != NULL)
if(callback && ctx->promise != NULL)
{
if(ctx->fail_state)
{
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
}
else
{
promise_success(future_to_promise(ctx->future), NULL);
promise_success(ctx->promise, NULL);
}
}
break;
@@ -257,6 +258,26 @@ void tango_cache_update_end(struct tango_cache_ctx *ctx)
cache_kick_upload_minio_end(ctx);
}
void tango_cache_update_cancel(struct tango_cache_ctx *ctx)
{
ctx->put.close_state = true;
if(ctx->curl != NULL)
{
curl_multi_remove_handle(ctx->instance->multi_hd, ctx->curl);
curl_easy_cleanup(ctx->curl);
ctx->curl = NULL;
}
tango_cache_set_fail_state(ctx, CACHE_UPDATE_CANCELED);
if(ctx->put.uploadID!=NULL && cache_cancel_upload_minio(ctx))
{
ctx->put.state = PUT_STATE_CANCEL;
}
else
{
tango_cache_ctx_destroy(ctx, false);
}
}
int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, size_t size)
{
if(ctx->fail_state)
@@ -325,7 +346,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
ctx->instance = instance;
ctx->future = f;
ctx->promise = future_to_promise(f);
ctx->method = CACHE_REQUEST_PUT;
if(instance->param->hash_object_key)
@@ -455,7 +476,8 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
ctx->instance = instance;
ctx->future = f;
ctx->promise = future_to_promise(f);
promise_allow_many_successes(ctx->promise);
ctx->method = method;
ctx->get.state = GET_STATE_START;
ctx->get.max_age = meta->get.max_age;
@@ -526,7 +548,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *
ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
ctx->instance = instance;
ctx->future = f;
ctx->promise = future_to_promise(f);
ctx->method = CACHE_REQUEST_DELETE;
if(instance->param->hash_object_key)
@@ -574,7 +596,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst
ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
ctx->instance = instance;
ctx->future = f;
ctx->promise = future_to_promise(f);
ctx->method = CACHE_REQUEST_DELETE_MUL;
ctx->del.succ_num = num;

View File

@@ -143,7 +143,7 @@ struct tango_cache_ctx
{
CURL *curl;
struct curl_slist *headers;
struct future* future;
struct promise* promise;
char error[CURL_ERROR_SIZE];
char object_key[256];
char hostaddr[48];

View File

@@ -266,19 +266,19 @@ static void redis_hget_command_cb(struct redisAsyncContext *ac, void *vreply, vo
{
tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS);
ctx->get.result.type = RESULT_TYPE_MISS;
promise_success(future_to_promise(ctx->future), &ctx->get.result);
promise_success(ctx->promise, &ctx->get.result);
promise_finish(ctx->promise);
}
else
{
tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_JSON);
if(reply!=NULL && reply->type==REDIS_REPLY_ERROR)
{
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, reply->str);
promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, reply->str);
}
else
{
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
}
}
tango_cache_ctx_destroy(ctx);
@@ -290,7 +290,7 @@ static void redis_hget_command_cb(struct redisAsyncContext *ac, void *vreply, vo
{
case PARSE_JSON_RET_ERROR:
tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_JSON);
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
tango_cache_ctx_destroy(ctx);
break;
case PARSE_JSON_RET_TIMEOUT:
@@ -303,7 +303,8 @@ static void redis_hget_command_cb(struct redisAsyncContext *ac, void *vreply, vo
case PARSE_JSON_RET_SUCC:
fetch_header_over_biz(ctx);
ctx->get.result.type = RESULT_TYPE_END;
promise_success(future_to_promise(ctx->future), &ctx->get.result);
promise_success(ctx->promise, &ctx->get.result);
promise_finish(ctx->promise);
tango_cache_ctx_destroy(ctx);
break;
default: assert(0);break;

View File

@@ -631,7 +631,7 @@ bool fetch_header_over_biz(struct tango_cache_ctx *ctx)
{
tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL);
ctx->get.state = GET_STATE_DELETE;
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
return false;
}
@@ -640,7 +640,7 @@ bool fetch_header_over_biz(struct tango_cache_ctx *ctx)
ctx->get.result.data_frag = ctx->get.response_tag.buff;
ctx->get.result.size = ctx->get.response_tag.len;
ctx->get.result.type = RESULT_TYPE_USERTAG;
promise_success(future_to_promise(ctx->future), &ctx->get.result);
promise_success(ctx->promise, &ctx->get.result);
easy_string_destroy(&ctx->get.response_tag);
}
if(ctx->response.len > 0)
@@ -648,7 +648,7 @@ bool fetch_header_over_biz(struct tango_cache_ctx *ctx)
ctx->get.result.data_frag = ctx->response.buff;
ctx->get.result.size = ctx->response.len;
ctx->get.result.type = RESULT_TYPE_HEADER;
promise_success(future_to_promise(ctx->future), &ctx->get.result);
promise_success(ctx->promise, &ctx->get.result);
easy_string_destroy(&ctx->response);
}
return true;
@@ -671,7 +671,7 @@ static size_t curl_get_response_body_cb(void *ptr, size_t size, size_t count, vo
ctx->get.result.data_frag = (const char *)ptr;
ctx->get.result.size = size * count;
ctx->get.result.type = RESULT_TYPE_BODY;
promise_success(future_to_promise(ctx->future), &ctx->get.result);
promise_success(ctx->promise, &ctx->get.result);
return size*count;
}
@@ -689,7 +689,8 @@ bool check_expires_fresh_header(struct tango_cache_ctx *ctx)
tango_cache_set_fail_state(ctx, CACHE_TIMEOUT);
ctx->get.state = GET_STATE_DELETE; //<2F><><EFBFBD><EFBFBD>ʧЧʱ<D0A7><CAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱ<EFBFBD><CAB1><EFBFBD><EFBFBD>ɾ<EFBFBD><C9BE><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
ctx->get.result.type = RESULT_TYPE_MISS;
promise_success(future_to_promise(ctx->future), &ctx->get.result);
promise_success(ctx->promise, &ctx->get.result);
promise_finish(ctx->promise);
easy_string_destroy(&ctx->response);
return false;
}
@@ -698,7 +699,8 @@ bool check_expires_fresh_header(struct tango_cache_ctx *ctx)
{
tango_cache_set_fail_state(ctx, CACHE_TIMEOUT);
ctx->get.result.type = RESULT_TYPE_MISS;
promise_success(future_to_promise(ctx->future), &ctx->get.result);
promise_success(ctx->promise, &ctx->get.result);
promise_finish(ctx->promise);
easy_string_destroy(&ctx->response);
return false;
}
@@ -710,7 +712,7 @@ static bool check_get_result_code(struct tango_cache_ctx *ctx, CURLcode code, lo
if(code != CURLE_OK)
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
return false;
}
@@ -720,12 +722,13 @@ static bool check_get_result_code(struct tango_cache_ctx *ctx, CURLcode code, lo
{
tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS);
ctx->get.result.type = RESULT_TYPE_MISS;
promise_success(future_to_promise(ctx->future), &ctx->get.result);
promise_success(ctx->promise, &ctx->get.result);
promise_finish(ctx->promise);
}
else
{
tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL);
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
}
return false;
}
@@ -817,7 +820,8 @@ void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long r
if(ctx->method!=CACHE_REQUEST_HEAD || fetch_header_over_biz(ctx)) //HEAD<41><44><EFBFBD>ֵ<EFBFBD><D6B5>ֶβ<D6B6>ȫ<EFBFBD>Ȳ<EFBFBD>ɾ<EFBFBD><C9BE><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ޣ<EFBFBD>
{
ctx->get.result.type = RESULT_TYPE_END;
promise_success(future_to_promise(ctx->future), &ctx->get.result);
promise_success(ctx->promise, &ctx->get.result);
promise_finish(ctx->promise);
}
}
tango_cache_ctx_destroy(ctx);

View File

@@ -17,6 +17,7 @@ void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, lon
int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back=false);
int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback=false);
bool cache_cancel_upload_minio(struct tango_cache_ctx *ctx);
void cache_kick_upload_minio_end(struct tango_cache_ctx *ctx);
bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len);