From 9fc2a8a0d229274741d8db1fb0a0fc9253e61eea Mon Sep 17 00:00:00 2001 From: zhangchengwei Date: Fri, 23 Nov 2018 20:55:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81promise=20finish=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cache/include/cache_evbase_client.h | 1 + cache/include/tango_cache_client.h | 2 ++ cache/src/cache_evbase_client.cpp | 35 ++++++++++++++++++++++++---- cache/src/tango_cache_client.cpp | 36 +++++++++++++++++++++++------ cache/src/tango_cache_client_in.h | 2 +- cache/src/tango_cache_redis.cpp | 13 ++++++----- cache/src/tango_cache_transfer.cpp | 24 +++++++++++-------- cache/src/tango_cache_transfer.h | 1 + 8 files changed, 85 insertions(+), 29 deletions(-) diff --git a/cache/include/cache_evbase_client.h b/cache/include/cache_evbase_client.h index 46db0ea..5bead2f 100644 --- a/cache/include/cache_evbase_client.h +++ b/cache/include/cache_evbase_client.h @@ -58,6 +58,7 @@ struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size); int cache_evbase_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evbuffer *evbuf); void cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn); +void cache_evbase_update_cancel(struct cache_evbase_ctx *ctx_asyn); void cache_evbase_get_object_path(const struct cache_evbase_ctx *ctx, char *path/*OUT*/, size_t pathsize); diff --git a/cache/include/tango_cache_client.h b/cache/include/tango_cache_client.h index cf7294a..1592b8e 100644 --- a/cache/include/tango_cache_client.h +++ b/cache/include/tango_cache_client.h @@ -22,6 +22,7 @@ enum CACHE_ERR_CODE CACHE_ERR_REDIS_JSON, CACHE_ERR_REDIS_CONNECT, CACHE_OUTOF_SESSION, + CACHE_UPDATE_CANCELED, }; enum PUT_MEMORY_COPY_WAY @@ -149,6 +150,7 @@ struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *in int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, size_t size); int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf); void tango_cache_update_end(struct tango_cache_ctx *ctx); +void tango_cache_update_cancel(struct tango_cache_ctx *ctx); //获取对象key值;当CACHE_OBJECT_KEY_HASH_SWITCH=1开启对URL/文件名哈希时有用 void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize); diff --git a/cache/src/cache_evbase_client.cpp b/cache/src/cache_evbase_client.cpp index a173832..ca2f6c5 100644 --- a/cache/src/cache_evbase_client.cpp +++ b/cache/src/cache_evbase_client.cpp @@ -28,6 +28,7 @@ enum CACHE_ASYN_CMD CACHE_ASYN_UPLOAD_FRAG_DATA, CACHE_ASYN_UPLOAD_FRAG_EVBUF, CACHE_ASYN_UPLOAD_END, + CACHE_ASYN_UPLOAD_CANCEL, CACHE_ASYN_DELETE, CACHE_ASYN_HEAD, }; @@ -153,21 +154,21 @@ static void cache_asyn_ctx_destroy(struct cache_evbase_ctx *ctx_asyn) static void cache_asyn_ioevent_dispatch(struct databuffer *buffer) { struct cache_evbase_ctx *ctx_asyn=buffer->ctx_asyn; - struct future *f; + struct promise *p; int ret=0; switch(buffer->cmd_type) { case CACHE_ASYN_FETCH: - f = ctx_asyn->ctx->future; + p = ctx_asyn->ctx->promise; if(tango_cache_fetch_start(ctx_asyn->ctx) < 0) { - promise_failed(future_to_promise(f), FUTURE_ERROR_CANCEL, "CACHE_ASYN_FETCH failed"); + promise_failed(p, FUTURE_ERROR_CANCEL, "CACHE_ASYN_FETCH failed"); } cache_asyn_ctx_destroy(ctx_asyn); break; case CACHE_ASYN_HEAD: - f = ctx_asyn->ctx->future; + p = ctx_asyn->ctx->promise; if(ctx_asyn->instance_asyn->instance->param->head_meta_source == HEAD_META_FROM_REDIS) { ret = tango_cache_head_redis(ctx_asyn->ctx); @@ -178,7 +179,7 @@ static void cache_asyn_ioevent_dispatch(struct databuffer *buffer) } if(ret<0) { - promise_failed(future_to_promise(f), FUTURE_ERROR_CANCEL, "CACHE_ASYN_HEAD failed"); + promise_failed(p, FUTURE_ERROR_CANCEL, "CACHE_ASYN_HEAD failed"); } cache_asyn_ctx_destroy(ctx_asyn); break; @@ -217,6 +218,10 @@ static void cache_asyn_ioevent_dispatch(struct databuffer *buffer) tango_cache_update_end(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); break; + case CACHE_ASYN_UPLOAD_CANCEL: + tango_cache_update_cancel(ctx_asyn->ctx); + cache_asyn_ctx_destroy(ctx_asyn); + break; default: assert(0);break; } } @@ -286,6 +291,26 @@ void cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn) } } +void cache_evbase_update_cancel(struct cache_evbase_ctx *ctx_asyn) +{ + struct databuffer *buffer; + + buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); + buffer->ctx_asyn = ctx_asyn; + buffer->cmd_type = CACHE_ASYN_UPLOAD_CANCEL; + + if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) + { + if(!ctx_asyn->ctx->fail_state) + { + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); + } + tango_cache_ctx_destroy(ctx_asyn->ctx, false); + cache_asyn_ctx_destroy(ctx_asyn); + free(buffer); + } +} + int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size) { struct databuffer *buffer; diff --git a/cache/src/tango_cache_client.cpp b/cache/src/tango_cache_client.cpp index 7f67c6c..46eac9e 100644 --- a/cache/src/tango_cache_client.cpp +++ b/cache/src/tango_cache_client.cpp @@ -95,6 +95,7 @@ const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx) case CACHE_ERR_REDIS_JSON:return "parse redis json error"; case CACHE_ERR_REDIS_CONNECT:return "redis is not connected"; case CACHE_OUTOF_SESSION:return "two many curl sessions"; + case CACHE_UPDATE_CANCELED:return "update was canceled"; default: return ctx->error; } } @@ -234,15 +235,15 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback) curl_slist_free_all(ctx->headers); }//no break here case CACHE_REQUEST_DELETE: - if(callback && ctx->future != NULL) + if(callback && ctx->promise != NULL) { if(ctx->fail_state) { - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); + promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); } else { - promise_success(future_to_promise(ctx->future), NULL); + promise_success(ctx->promise, NULL); } } break; @@ -257,6 +258,26 @@ void tango_cache_update_end(struct tango_cache_ctx *ctx) cache_kick_upload_minio_end(ctx); } +void tango_cache_update_cancel(struct tango_cache_ctx *ctx) +{ + ctx->put.close_state = true; + if(ctx->curl != NULL) + { + curl_multi_remove_handle(ctx->instance->multi_hd, ctx->curl); + curl_easy_cleanup(ctx->curl); + ctx->curl = NULL; + } + tango_cache_set_fail_state(ctx, CACHE_UPDATE_CANCELED); + if(ctx->put.uploadID!=NULL && cache_cancel_upload_minio(ctx)) + { + ctx->put.state = PUT_STATE_CANCEL; + } + else + { + tango_cache_ctx_destroy(ctx, false); + } +} + int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, size_t size) { if(ctx->fail_state) @@ -325,7 +346,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); ctx->instance = instance; - ctx->future = f; + ctx->promise = future_to_promise(f); ctx->method = CACHE_REQUEST_PUT; if(instance->param->hash_object_key) @@ -455,7 +476,8 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); ctx->instance = instance; - ctx->future = f; + ctx->promise = future_to_promise(f); + promise_allow_many_successes(ctx->promise); ctx->method = method; ctx->get.state = GET_STATE_START; ctx->get.max_age = meta->get.max_age; @@ -526,7 +548,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance * ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); ctx->instance = instance; - ctx->future = f; + ctx->promise = future_to_promise(f); ctx->method = CACHE_REQUEST_DELETE; if(instance->param->hash_object_key) @@ -574,7 +596,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); ctx->instance = instance; - ctx->future = f; + ctx->promise = future_to_promise(f); ctx->method = CACHE_REQUEST_DELETE_MUL; ctx->del.succ_num = num; diff --git a/cache/src/tango_cache_client_in.h b/cache/src/tango_cache_client_in.h index ec8b4f5..8a76868 100644 --- a/cache/src/tango_cache_client_in.h +++ b/cache/src/tango_cache_client_in.h @@ -143,7 +143,7 @@ struct tango_cache_ctx { CURL *curl; struct curl_slist *headers; - struct future* future; + struct promise* promise; char error[CURL_ERROR_SIZE]; char object_key[256]; char hostaddr[48]; diff --git a/cache/src/tango_cache_redis.cpp b/cache/src/tango_cache_redis.cpp index 5c771fc..be78795 100644 --- a/cache/src/tango_cache_redis.cpp +++ b/cache/src/tango_cache_redis.cpp @@ -266,19 +266,19 @@ static void redis_hget_command_cb(struct redisAsyncContext *ac, void *vreply, vo { tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS); ctx->get.result.type = RESULT_TYPE_MISS; - promise_success(future_to_promise(ctx->future), &ctx->get.result); - + promise_success(ctx->promise, &ctx->get.result); + promise_finish(ctx->promise); } else { tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_JSON); if(reply!=NULL && reply->type==REDIS_REPLY_ERROR) { - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, reply->str); + promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, reply->str); } else { - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); + promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); } } tango_cache_ctx_destroy(ctx); @@ -290,7 +290,7 @@ static void redis_hget_command_cb(struct redisAsyncContext *ac, void *vreply, vo { case PARSE_JSON_RET_ERROR: tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_JSON); - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); + promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); tango_cache_ctx_destroy(ctx); break; case PARSE_JSON_RET_TIMEOUT: @@ -303,7 +303,8 @@ static void redis_hget_command_cb(struct redisAsyncContext *ac, void *vreply, vo case PARSE_JSON_RET_SUCC: fetch_header_over_biz(ctx); ctx->get.result.type = RESULT_TYPE_END; - promise_success(future_to_promise(ctx->future), &ctx->get.result); + promise_success(ctx->promise, &ctx->get.result); + promise_finish(ctx->promise); tango_cache_ctx_destroy(ctx); break; default: assert(0);break; diff --git a/cache/src/tango_cache_transfer.cpp b/cache/src/tango_cache_transfer.cpp index ad97087..f744f25 100644 --- a/cache/src/tango_cache_transfer.cpp +++ b/cache/src/tango_cache_transfer.cpp @@ -631,7 +631,7 @@ bool fetch_header_over_biz(struct tango_cache_ctx *ctx) { tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL); ctx->get.state = GET_STATE_DELETE; - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); + promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); return false; } @@ -640,7 +640,7 @@ bool fetch_header_over_biz(struct tango_cache_ctx *ctx) ctx->get.result.data_frag = ctx->get.response_tag.buff; ctx->get.result.size = ctx->get.response_tag.len; ctx->get.result.type = RESULT_TYPE_USERTAG; - promise_success(future_to_promise(ctx->future), &ctx->get.result); + promise_success(ctx->promise, &ctx->get.result); easy_string_destroy(&ctx->get.response_tag); } if(ctx->response.len > 0) @@ -648,7 +648,7 @@ bool fetch_header_over_biz(struct tango_cache_ctx *ctx) ctx->get.result.data_frag = ctx->response.buff; ctx->get.result.size = ctx->response.len; ctx->get.result.type = RESULT_TYPE_HEADER; - promise_success(future_to_promise(ctx->future), &ctx->get.result); + promise_success(ctx->promise, &ctx->get.result); easy_string_destroy(&ctx->response); } return true; @@ -671,7 +671,7 @@ static size_t curl_get_response_body_cb(void *ptr, size_t size, size_t count, vo ctx->get.result.data_frag = (const char *)ptr; ctx->get.result.size = size * count; ctx->get.result.type = RESULT_TYPE_BODY; - promise_success(future_to_promise(ctx->future), &ctx->get.result); + promise_success(ctx->promise, &ctx->get.result); return size*count; } @@ -689,7 +689,8 @@ bool check_expires_fresh_header(struct tango_cache_ctx *ctx) tango_cache_set_fail_state(ctx, CACHE_TIMEOUT); ctx->get.state = GET_STATE_DELETE; //缓存失效时在下载完毕时触发删除动作 ctx->get.result.type = RESULT_TYPE_MISS; - promise_success(future_to_promise(ctx->future), &ctx->get.result); + promise_success(ctx->promise, &ctx->get.result); + promise_finish(ctx->promise); easy_string_destroy(&ctx->response); return false; } @@ -698,7 +699,8 @@ bool check_expires_fresh_header(struct tango_cache_ctx *ctx) { tango_cache_set_fail_state(ctx, CACHE_TIMEOUT); ctx->get.result.type = RESULT_TYPE_MISS; - promise_success(future_to_promise(ctx->future), &ctx->get.result); + promise_success(ctx->promise, &ctx->get.result); + promise_finish(ctx->promise); easy_string_destroy(&ctx->response); return false; } @@ -710,7 +712,7 @@ static bool check_get_result_code(struct tango_cache_ctx *ctx, CURLcode code, lo if(code != CURLE_OK) { tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); + promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); return false; } @@ -720,12 +722,13 @@ static bool check_get_result_code(struct tango_cache_ctx *ctx, CURLcode code, lo { tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS); ctx->get.result.type = RESULT_TYPE_MISS; - promise_success(future_to_promise(ctx->future), &ctx->get.result); + promise_success(ctx->promise, &ctx->get.result); + promise_finish(ctx->promise); } else { tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL); - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); + promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); } return false; } @@ -817,7 +820,8 @@ void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long r if(ctx->method!=CACHE_REQUEST_HEAD || fetch_header_over_biz(ctx)) //HEAD发现的字段不全先不删,正常情况下无; { ctx->get.result.type = RESULT_TYPE_END; - promise_success(future_to_promise(ctx->future), &ctx->get.result); + promise_success(ctx->promise, &ctx->get.result); + promise_finish(ctx->promise); } } tango_cache_ctx_destroy(ctx); diff --git a/cache/src/tango_cache_transfer.h b/cache/src/tango_cache_transfer.h index c5db9d5..12bfa0b 100644 --- a/cache/src/tango_cache_transfer.h +++ b/cache/src/tango_cache_transfer.h @@ -17,6 +17,7 @@ void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, lon int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back=false); int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback=false); +bool cache_cancel_upload_minio(struct tango_cache_ctx *ctx); void cache_kick_upload_minio_end(struct tango_cache_ctx *ctx); bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len);