diff --git a/cache/cache_evbase_client.cpp b/cache/cache_evbase_client.cpp index d7b8482..675a169 100644 --- a/cache/cache_evbase_client.cpp +++ b/cache/cache_evbase_client.cpp @@ -26,6 +26,7 @@ enum CACHE_ASYN_CMD CACHE_ASYN_UPLOAD_FRAG_DATA, CACHE_ASYN_UPLOAD_FRAG_EVBUF, CACHE_ASYN_UPLOAD_END, + CACHE_ASYN_DELETE, }; struct databuffer @@ -157,6 +158,11 @@ static void cache_asyn_ioevent_dispatch(struct databuffer *buffer) cache_asyn_ctx_destroy(ctx_asyn); break; + case CACHE_ASYN_DELETE: + cache_delete_minio_object(ctx_asyn->ctx); + cache_asyn_ctx_destroy(ctx_asyn); + break; + case CACHE_ASYN_UPLOAD_ONCE_DATA: tango_cache_upload_once_start_data(ctx_asyn->ctx, PUT_MEM_FREE, buffer->data, buffer->size); cache_asyn_ctx_destroy(ctx_asyn); @@ -234,23 +240,19 @@ void cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn) { struct databuffer *buffer; - if(ctx_asyn->ctx->fail_state) - { - tango_cache_ctx_destroy(ctx_asyn->ctx); - cache_asyn_ctx_destroy(ctx_asyn); - return; - } - buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_UPLOAD_END; if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { - ctx_asyn->ctx->fail_state = true; - if(ctx_asyn->ctx->future != NULL) + if(!ctx_asyn->ctx->fail_state) { - promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error"); + ctx_asyn->ctx->fail_state = true; + if(ctx_asyn->ctx->future != NULL) + { + promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error"); + } } tango_cache_ctx_destroy(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); @@ -292,7 +294,6 @@ int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_ME free(buffer); return -1; } - return 0; } @@ -379,11 +380,6 @@ int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { - ctx_asyn->ctx->fail_state = true; - if(ctx_asyn->ctx->future != NULL) - { - promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error"); - } free(buffer->data); free(buffer); tango_cache_ctx_destroy(ctx); @@ -422,11 +418,6 @@ int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struc if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { - ctx_asyn->ctx->fail_state = true; - if(ctx_asyn->ctx->future != NULL) - { - promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error"); - } evbuffer_free(buffer->evbuf); free(buffer); tango_cache_ctx_destroy(ctx); @@ -436,7 +427,7 @@ int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struc return 0; } -int cache_evbase_fetch(struct cache_evbase_instance *instance, struct future* future, struct tango_cache_meta *meta) +int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct future* future, struct tango_cache_meta *meta) { struct cache_evbase_ctx *ctx_asyn; struct databuffer *buffer; @@ -451,11 +442,29 @@ int cache_evbase_fetch(struct cache_evbase_instance *instance, struct future* fu if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { - ctx_asyn->ctx->fail_state = true; - if(ctx_asyn->ctx->future != NULL) - { - promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error"); - } + tango_cache_ctx_destroy(ctx_asyn->ctx); + cache_asyn_ctx_destroy(ctx_asyn); + free(buffer); + return -1; + } + return 0; +} + +int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct future* future, const char *objkey) +{ + struct cache_evbase_ctx *ctx_asyn; + struct databuffer *buffer; + + ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); + ctx_asyn->instance_asyn = instance; + ctx_asyn->ctx = tango_cache_delete_prepare(instance->instance, future, objkey); + + buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); + buffer->ctx_asyn = ctx_asyn; + buffer->cmd_type = CACHE_ASYN_DELETE; + + if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) + { tango_cache_ctx_destroy(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); diff --git a/cache/include/cache_evbase_client.h b/cache/include/cache_evbase_client.h index eb81600..83d0534 100644 --- a/cache/include/cache_evbase_client.h +++ b/cache/include/cache_evbase_client.h @@ -20,20 +20,23 @@ struct cache_evbase_ctx struct cache_evbase_instance *instance_asyn; }; -/*API的使用说明参考tango_cache_client.h*/ +/*所有API线程不安全,API的使用说明参考tango_cache_client.h*/ enum CACHE_ERR_CODE cache_evbase_get_last_error(const struct cache_evbase_ctx *ctx_asyn); enum CACHE_ERR_CODE cache_evbase_ctx_error(const struct cache_evbase_instance *instance); void cache_evbase_get_statistics(const struct cache_evbase_instance *instance, struct cache_statistics *out); - +/*创建实例,每线程一个,或使用时加锁*/ struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path, const char* section, void *runtimelog); -//GET接口 -int cache_evbase_fetch(struct cache_evbase_instance *instance, struct future* future, struct tango_cache_meta *meta); +//GET接口,成功返回0,失败返回-1,future回调函数会在另外的线程中执行,下同 +int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct future* future, struct tango_cache_meta *meta); struct tango_cache_result *cache_evbase_read_result(void *promise_result); +//DELETE接口 +int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct future* future, const char *objkey); + //一次性上传接口 int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct future* future, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, diff --git a/cache/include/tango_cache_client.h b/cache/include/tango_cache_client.h index 03e0d52..218332b 100644 --- a/cache/include/tango_cache_client.h +++ b/cache/include/tango_cache_client.h @@ -39,6 +39,9 @@ struct cache_statistics long long put_recv_num; //发起UPLOAD的次数 long long put_succ_num; //UPLOAD成功的次数 long long put_error_num;//UPLOAD失败的次数 + long long del_recv_num; //发起DELETE的次数 + long long del_succ_num; //DELETE成功的次数 + long long del_error_num;//DELETE成功的次数 long long memory_used; //当前UPLOAD BODY所占内存大小 long long session_num; //当前正在进行GET/PUT的HTTP会话数 }; @@ -47,13 +50,13 @@ enum CACHE_RESULT_TYPE { RESULT_TYPE_HEADER=0, //只调用一次 RESULT_TYPE_USERTAG, //只调用一次 - RESULT_TYPE_BODY, + RESULT_TYPE_BODY, //可能调用多次 }; //promise_success的结果result struct tango_cache_result { - const void *data_frag; //如果type为RESULT_TYPE_HEADER,内容会包含一个换行 + const void *data_frag; //如果type为RESULT_TYPE_HEADER,每个头部后会包含一个换行(HTTP1.1格式) size_t size; enum CACHE_RESULT_TYPE type; }; @@ -70,7 +73,7 @@ enum CACHE_HTTP_HDR_TYPE struct tango_cache_meta { - const char* url; //缓存:URL;非结构化日志:文件名 + const char* url; //缓存:URL;非结构化日志:文件名;最大长度256字节 const char* std_hdr[HDR_CONTENT_NUM]; //完整头部,如"Content-Type: text/html",不要包含换行;NULL表示没有该头部; const char* usertag; //可以是任意内容,GET时会原样返回 size_t usertag_len; //最大长度USER_TAG_MAX_LEN,0表示没有该头部 @@ -92,16 +95,18 @@ void tango_cache_get_statistics(const struct tango_cache_instance *instance, str struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog); - +/* GET接口的API*/ //成功时回调promise_success, result为NULL时表示结束; //失败时回调promise_failed(仅一次),使用get_last_error获取错误码; //future不可以为NULL -int tango_cache_fetch(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta); - -//从promise_success的result参数提取结果;需要Caller调用tango_cache_result_release释放; +int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta); +//从promise_success的result参数提取结果 struct tango_cache_result *tango_cache_read_result(void *promise_result); +/* DELETE接口的API*/ +int tango_cache_delete_object(struct tango_cache_instance *instance, struct future* future, const char *objkey); + /* UPLOAD接口的API * 注意: UPLOAD接口的API,若future不为NULL,则在上传结束时会调用通知回调函数,否则不调用; diff --git a/cache/tango_cache_client.cpp b/cache/tango_cache_client.cpp index f413b4b..3b7317b 100644 --- a/cache/tango_cache_client.cpp +++ b/cache/tango_cache_client.cpp @@ -16,7 +16,7 @@ #include "tango_cache_transfer.h" #include "tango_cache_tools.h" -int TANGO_CACHE_VERSION_20180910=0; +int TANGO_CACHE_VERSION_20180925=0; void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size) { @@ -51,6 +51,9 @@ void tango_cache_get_statistics(const struct tango_cache_instance *instance, str out->put_recv_num = instance->statistic.put_recv_num; out->put_succ_num = instance->statistic.put_succ_num; out->put_error_num= instance->statistic.put_error_num; + out->del_recv_num = instance->statistic.del_recv_num; + out->del_succ_num = instance->statistic.del_succ_num; + out->del_error_num= instance->statistic.del_error_num; out->session_num = instance->statistic.session_num; out->memory_used = instance->statistic.memory_used; } @@ -67,30 +70,42 @@ const char *tango_cache_get_object_key(struct tango_cache_ctx *ctx) static inline void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state, enum CACHE_ERR_CODE error_code, struct cache_statistics *statistic) { - if(method == CACHE_REQUEST_PUT) + switch(method) { - if(fail_state) - { - statistic->put_error_num += 1; - } - else - { - statistic->put_succ_num += 1; - } - } - else - { - if(fail_state) - { - if(error_code == CACHE_ERR_CURL) - statistic->get_error_num += 1; + case CACHE_REQUEST_PUT: + if(fail_state) + { + statistic->put_error_num += 1; + } else - statistic->get_miss_num += 1; - } - else - { - statistic->get_succ_num += 1; - } + { + statistic->put_succ_num += 1; + } + break; + case CACHE_REQUEST_GET: + if(fail_state) + { + if(error_code == CACHE_ERR_CURL) + statistic->get_error_num += 1; + else + statistic->get_miss_num += 1; + } + else + { + statistic->get_succ_num += 1; + } + break; + case CACHE_REQUEST_DELETE: + if(fail_state) + { + statistic->del_error_num += 1; + } + else + { + statistic->del_succ_num += 1; + } + break; + default:break; } } @@ -119,44 +134,46 @@ void easy_string_savedata(struct easy_string *estr, const char *data, size_t len void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx) { + struct multipart_etag_list *etag; + if(ctx->curl != NULL) { curl_multi_remove_handle(ctx->instance->multi_hd, ctx->curl); curl_easy_cleanup(ctx->curl); } easy_string_destroy(&ctx->response); - - if(ctx->method == CACHE_REQUEST_GET) - { - easy_string_destroy(&ctx->response_tag); - } - else - { - struct multipart_etag_list *etag; - - if(ctx->uploadID != NULL) free(ctx->uploadID); - if(ctx->combine_xml != NULL) free(ctx->combine_xml); - if(ctx->headers != NULL) curl_slist_free_all(ctx->headers); - if(ctx->evbuffer!=NULL) evbuffer_free(ctx->evbuffer); - TAILQ_FOREACH(etag, &ctx->cache_head, node) - { - TAILQ_REMOVE(&ctx->cache_head, etag, node); - free(etag->etag); - free(etag); - } + switch(ctx->method) + { + case CACHE_REQUEST_GET: + easy_string_destroy(&ctx->get.response_tag); + break; - if(ctx->future != NULL) - { - if(ctx->fail_state) + case CACHE_REQUEST_PUT: + if(ctx->put.uploadID != NULL) free(ctx->put.uploadID); + if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml); + if(ctx->headers != NULL) curl_slist_free_all(ctx->headers); + if(ctx->put.evbuf!=NULL) evbuffer_free(ctx->put.evbuf); + TAILQ_FOREACH(etag, &ctx->put.etag_head, node) { - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, ctx->error); - } - else + TAILQ_REMOVE(&ctx->put.etag_head, etag, node); + free(etag->etag); + free(etag); + }//no break here + case CACHE_REQUEST_DELETE: + if(ctx->future != NULL) { - promise_success(future_to_promise(ctx->future), NULL); + if(ctx->fail_state) + { + promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, ctx->error); + } + else + { + promise_success(future_to_promise(ctx->future), NULL); + } } - } + break; + default: break; } update_statistics(ctx->method, ctx->fail_state, ctx->error_code, &ctx->instance->statistic); free(ctx); @@ -173,12 +190,12 @@ int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, { return -1; } - if(evbuffer_add(ctx->evbuffer, data, size)) + if(evbuffer_add(ctx->put.evbuf, data, size)) { return -1; } ctx->instance->statistic.memory_used += size; - if(evbuffer_get_length(ctx->evbuffer) >= ctx->instance->upload_block_size) + if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->upload_block_size) { cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size); } @@ -197,20 +214,20 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP size = evbuffer_get_length(evbuf); if(way == EVBUFFER_MOVE) { - if(evbuffer_add_buffer(ctx->evbuffer, evbuf)) + if(evbuffer_add_buffer(ctx->put.evbuf, evbuf)) { return -1; } } else { - if(evbuffer_add_buffer_reference(ctx->evbuffer, evbuf)) + if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf)) { return -1; } } ctx->instance->statistic.memory_used += size; - if(evbuffer_get_length(ctx->evbuffer) >= ctx->instance->upload_block_size) + if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->upload_block_size) { cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size); } @@ -296,8 +313,8 @@ struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *in return NULL; } - ctx->evbuffer = evbuffer_new(); - TAILQ_INIT(&ctx->cache_head); + ctx->put.evbuf = evbuffer_new(); + TAILQ_INIT(&ctx->put.etag_head); return ctx; } @@ -346,9 +363,9 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i ctx->instance = instance; ctx->future = future; ctx->method = CACHE_REQUEST_GET; - ctx->get_state = GET_STATE_START; - ctx->max_age = meta->get.max_age; - ctx->min_fresh = meta->get.min_fresh; + ctx->get.state = GET_STATE_START; + ctx->get.max_age = meta->get.max_age; + ctx->get.min_fresh = meta->get.min_fresh; instance->statistic.get_recv_num += 1; if(instance->hash_object_key) @@ -360,15 +377,42 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i { snprintf(ctx->object_key, 256, "%s", meta->url); } - return ctx; } -int tango_cache_fetch(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta) +int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta) { return tango_cache_fetch_start(tango_cache_fetch_prepare(instance, future, meta)); } +struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, struct future* future, const char *objkey) +{ + struct tango_cache_ctx *ctx; + char sha256[72]={0}; + + ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); + ctx->instance = instance; + ctx->future = future; + ctx->method = CACHE_REQUEST_DELETE; + instance->statistic.del_recv_num += 1; + + if(instance->hash_object_key) + { + caculate_sha256(objkey, strlen(objkey), sha256, 72); + snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", sha256[0], sha256[1], sha256[2], sha256[3], sha256+4); + } + else + { + snprintf(ctx->object_key, 256, "%s", objkey); + } + return ctx; +} + +int tango_cache_delete_object(struct tango_cache_instance *instance, struct future* future, const char *objkey) +{ + return (cache_delete_minio_object(tango_cache_delete_prepare(instance, future, objkey))==1)?0:-1; +} + static void check_multi_info(CURLM *multi) { CURLMsg *msg; @@ -387,20 +431,25 @@ static void check_multi_info(CURLM *multi) easy = msg->easy_handle; res = msg->data.result; - curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ctx); + curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ctx); curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &res_code); curl_multi_remove_handle(multi, easy); curl_easy_cleanup(easy); ctx->curl = NULL; ctx->res_code = 0; - if(ctx->method == CACHE_REQUEST_GET) + switch(ctx->method) { - tango_cache_curl_get_done(easy, ctx, res, res_code); - } - else - { - tango_cache_curl_put_done(easy, ctx, res, res_code); + case CACHE_REQUEST_GET: + tango_cache_curl_get_done(ctx, res, res_code); + break; + case CACHE_REQUEST_PUT: + tango_cache_curl_put_done(ctx, res, res_code); + break; + case CACHE_REQUEST_DELETE: + tango_cache_curl_del_done(ctx, res, res_code); + break; + default: break; } } } @@ -519,19 +568,13 @@ static int load_local_configure(struct tango_cache_instance *instance, const cha MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.\n", profile_path, section); return -1; } - MESA_load_profile_uint_def(profile_path, section, "CACHE_BLOCK_MAX_SIZE", &instance->block_len, 8192); - if(instance->block_len > 16777216) - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BLOCK_MAX_SIZE too large, must smaller than 16777216(16MB).\n", profile_path, section); - return -1; - } MESA_load_profile_uint_def(profile_path, section, "CACHE_UPLOAD_BLOCK_SIZE", &instance->upload_block_size, 5242880); if(instance->upload_block_size < 5242880) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section); return -1; } - MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 3600); + MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 999999999); if(intval < 60) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section); diff --git a/cache/tango_cache_client_in.h b/cache/tango_cache_client_in.h index 13470ea..dadb14e 100644 --- a/cache/tango_cache_client_in.h +++ b/cache/tango_cache_client_in.h @@ -17,6 +17,7 @@ enum CACHE_REQUEST_METHOD { CACHE_REQUEST_GET=0, CACHE_REQUEST_PUT, + CACHE_REQUEST_DELETE, }; enum GET_OBJECT_STATE @@ -54,7 +55,6 @@ struct tango_cache_instance time_t relative_ttl; //缓存的相对有效期 u_int64_t cache_limit_size; long max_cnn_host; - u_int32_t block_len; //申请buffercache内存的缓存块大小,每次update块大小最好不要超过该值,否则会增加拷贝次数 u_int32_t upload_block_size; //minio分段上传块的最小长度 enum CACHE_ERR_CODE error_code; u_int32_t hash_object_key; @@ -67,6 +67,30 @@ struct multipart_etag_list TAILQ_ENTRY(multipart_etag_list) node; }; +struct cache_ctx_data_get +{ + time_t max_age;//Get + time_t min_fresh;//Get + time_t expires; + time_t last_modify; + u_int32_t need_hdrs; + enum GET_OBJECT_STATE state; + struct easy_string response_tag; +}; + +struct cache_ctx_data_put +{ + struct evbuffer *evbuf; + size_t upload_length; + size_t upload_offset; + char *uploadID; + char *combine_xml; + TAILQ_HEAD(__etag_list_head, multipart_etag_list) etag_head; + enum PUT_OBJECT_STATE state; + u_int32_t part_index; //宏RESPONSE_HDR_ + bool close_state; //主动被调用关闭 +}; + struct tango_cache_ctx { CURL *curl; @@ -74,34 +98,18 @@ struct tango_cache_ctx struct future* future; char error[CURL_ERROR_SIZE]; char object_key[256]; - char hostport[24]; //相同ctx使用相同的IP,保证pipeline顺序性 enum CACHE_REQUEST_METHOD method; enum CACHE_ERR_CODE error_code; - - struct evbuffer *evbuffer; - union{ - enum PUT_OBJECT_STATE put_state; - enum GET_OBJECT_STATE get_state; - }; - u_int32_t part_index; - u_int32_t need_hdrs; //宏RESPONSE_HDR_ - bool fail_state; - bool close_state; //主动被调用关闭 - long res_code; - time_t max_age;//Get - time_t min_fresh;//Get - time_t expires; - time_t last_modify; - - size_t upload_length; - size_t upload_offset; - char *uploadID; - char *combine_xml; struct easy_string response; - struct easy_string response_tag; - TAILQ_HEAD(__etag_list_head, multipart_etag_list) cache_head; + + bool fail_state; + long res_code; + union{ + struct cache_ctx_data_put put; + struct cache_ctx_data_get get; + }; struct tango_cache_instance *instance; }; @@ -111,13 +119,13 @@ struct curl_socket_data }; void easy_string_savedata(struct easy_string *estr, const char *data, size_t len); -void easy_string_expand(struct easy_string *estr, size_t to_size); void easy_string_destroy(struct easy_string *estr); void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx); struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta); struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta); +struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, struct future* future, const char *objkey); #endif diff --git a/cache/tango_cache_transfer.cpp b/cache/tango_cache_transfer.cpp index 251055b..ed0bb6e 100644 --- a/cache/tango_cache_transfer.cpp +++ b/cache/tango_cache_transfer.cpp @@ -36,10 +36,10 @@ static size_t curl_put_multipart_header_cb(void *ptr, size_t size, size_t count, etag = (struct multipart_etag_list *)malloc(sizeof(struct multipart_etag_list)); totallen = end - start + 1; etag->etag = (char *)malloc(totallen + 1); - etag->part_number = ctx->part_index; + etag->part_number = ctx->put.part_index; memcpy(etag->etag, start, totallen); *(etag->etag + totallen) = '\0'; - TAILQ_INSERT_TAIL(&ctx->cache_head, etag, node); + TAILQ_INSERT_TAIL(&ctx->put.etag_head, etag, node); } } @@ -78,19 +78,19 @@ static size_t curl_put_multipart_send_cb(void *ptr, size_t size, size_t count, v size_t len, space=size*count, send_len; struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - if(size==0 || count==0 || ctx->upload_offset>=ctx->upload_length) + if(size==0 || count==0 || ctx->put.upload_offset>=ctx->put.upload_length) { return 0; } - len = ctx->upload_length - ctx->upload_offset; + len = ctx->put.upload_length - ctx->put.upload_offset; if(len > space) { len = space; } - send_len = evbuffer_remove(ctx->evbuffer, ptr, len); + send_len = evbuffer_remove(ctx->put.evbuf, ptr, len); assert(send_len>0); - ctx->upload_offset += send_len; + ctx->put.upload_offset += send_len; ctx->instance->statistic.memory_used -= send_len; return send_len; @@ -107,14 +107,14 @@ static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool ful return -1; } - ctx->upload_offset = 0; + ctx->put.upload_offset = 0; if(full) { snprintf(minio_url, 256, "http://%s/%s/%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->object_key); } else { - snprintf(minio_url, 256, "http://%s/%s/%s?partNumber=%d&uploadId=%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->object_key, ++ctx->part_index, ctx->uploadID); + snprintf(minio_url, 256, "http://%s/%s/%s?partNumber=%d&uploadId=%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->object_key, ++ctx->put.part_index, ctx->put.uploadID); curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_put_multipart_header_cb); curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx); } @@ -132,7 +132,7 @@ static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool ful curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 100L); curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->upload_length); + curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.upload_length); curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_multipart_send_cb); curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx); @@ -199,14 +199,14 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx) return 1; } -bool cache_delete_minio_object(struct tango_cache_ctx *ctx) +int cache_delete_minio_object(struct tango_cache_ctx *ctx) { CURLMcode rc; char minio_url[256]; if(NULL == (ctx->curl=curl_easy_init())) { - return false; + return 0; } snprintf(minio_url, 256, "http://%s/%s/%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->object_key); @@ -223,7 +223,7 @@ bool cache_delete_minio_object(struct tango_cache_ctx *ctx) rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); assert(rc==CURLM_OK); - return true; + return 1; } //return value: true-成功添加事件;false-未添加事件 @@ -237,7 +237,7 @@ bool cache_cancel_upload_minio(struct tango_cache_ctx *ctx) return false; } - snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->object_key, ctx->uploadID); + snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->object_key, ctx->put.uploadID); curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE"); curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache"); @@ -265,9 +265,9 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx) { return false; } - construct_complete_xml(ctx, &ctx->combine_xml, &len); + construct_complete_xml(ctx, &ctx->put.combine_xml, &len); - snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->object_key, ctx->uploadID); + snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->object_key, ctx->put.uploadID); curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L); curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache"); @@ -279,7 +279,7 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx) curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L); - curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->combine_xml); + curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->put.combine_xml); curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, len); //填充Content-Length if(ctx->headers != NULL) @@ -300,17 +300,17 @@ bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block { int ret = 1; - switch(ctx->put_state) + switch(ctx->put.state) { case PUT_STATE_START: - ctx->put_state = PUT_STATE_WAIT_START; + ctx->put.state = PUT_STATE_WAIT_START; ret = curl_get_minio_uploadID(ctx); break; case PUT_STATE_PART: if(ctx->curl == NULL) { - ctx->upload_length = block_len; + ctx->put.upload_length = block_len; ret = http_put_bodypart_request_evbuf(ctx, false); } break; @@ -331,9 +331,9 @@ int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx) { int ret=0; - ctx->put_state = PUT_STATE_END; - ctx->upload_length = evbuffer_get_length(ctx->evbuffer); - if(ctx->upload_length > 0) + ctx->put.state = PUT_STATE_END; + ctx->put.upload_length = evbuffer_get_length(ctx->put.evbuf); + if(ctx->put.upload_length > 0) { ret = http_put_bodypart_request_evbuf(ctx, true); if(ret <= 0) @@ -355,14 +355,14 @@ int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) { int ret = 0; - ctx->close_state = true;//仅设置状态,并非真正关闭;内部状态机轮转结束后再关闭 + ctx->put.close_state = true;//仅设置状态,并非真正关闭;内部状态机轮转结束后再关闭 if(ctx->fail_state) { tango_cache_ctx_destroy(ctx); return 0; } - switch(ctx->put_state) + switch(ctx->put.state) { case PUT_STATE_START: http_put_complete_part_evbuf(ctx); @@ -371,12 +371,12 @@ int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) case PUT_STATE_PART: if(ctx->curl == NULL) { - ctx->upload_length = evbuffer_get_length(ctx->evbuffer); - if(ctx->upload_length == 0) + ctx->put.upload_length = evbuffer_get_length(ctx->put.evbuf); + if(ctx->put.upload_length == 0) { if(cache_kick_combine_minio(ctx)) { - ctx->put_state = PUT_STATE_END; + ctx->put.state = PUT_STATE_END; } else { @@ -394,7 +394,7 @@ int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) ctx->error_code = CACHE_ERR_CURL; if(cache_cancel_upload_minio(ctx)) { - ctx->put_state = PUT_STATE_CANCEL; + ctx->put.state = PUT_STATE_CANCEL; } else { @@ -413,18 +413,18 @@ int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) return ret; } -void tango_cache_curl_put_done(CURL *easy, struct tango_cache_ctx *ctx, CURLcode res, long res_code) +void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) { - switch(ctx->put_state) + switch(ctx->put.state) { case PUT_STATE_WAIT_START: - if(res!=CURLE_OK||res_code!=200L|| ctx->fail_state || !parse_uploadID_xml(ctx->response.buff, ctx->response.len, &ctx->uploadID)) + if(res!=CURLE_OK||res_code!=200L|| ctx->fail_state || !parse_uploadID_xml(ctx->response.buff, ctx->response.len, &ctx->put.uploadID)) { easy_string_destroy(&ctx->response); ctx->error_code = CACHE_ERR_CURL; ctx->fail_state = true; if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error); - if(ctx->close_state) + if(ctx->put.close_state) { tango_cache_ctx_destroy(ctx); } @@ -432,14 +432,14 @@ void tango_cache_curl_put_done(CURL *easy, struct tango_cache_ctx *ctx, CURLcode else { easy_string_destroy(&ctx->response); - ctx->put_state = PUT_STATE_PART; - if(ctx->close_state) + ctx->put.state = PUT_STATE_PART; + if(ctx->put.close_state) { cache_kick_upload_minio_end(ctx); } else { - size_t upload_length = evbuffer_get_length(ctx->evbuffer); + size_t upload_length = evbuffer_get_length(ctx->put.evbuf); if(upload_length >= ctx->instance->upload_block_size) { cache_kick_upload_minio_multipart(ctx, upload_length); @@ -459,20 +459,20 @@ void tango_cache_curl_put_done(CURL *easy, struct tango_cache_ctx *ctx, CURLcode { if(cache_cancel_upload_minio(ctx)) { - ctx->put_state = PUT_STATE_CANCEL; + ctx->put.state = PUT_STATE_CANCEL; } - else if(ctx->close_state) + else if(ctx->put.close_state) { tango_cache_ctx_destroy(ctx); } } - else if(ctx->close_state) + else if(ctx->put.close_state) { cache_kick_upload_minio_end(ctx); } else { - size_t upload_length = evbuffer_get_length(ctx->evbuffer); + size_t upload_length = evbuffer_get_length(ctx->put.evbuf); if(upload_length >= ctx->instance->upload_block_size) { cache_kick_upload_minio_multipart(ctx, upload_length); @@ -481,7 +481,7 @@ void tango_cache_curl_put_done(CURL *easy, struct tango_cache_ctx *ctx, CURLcode break; case PUT_STATE_CANCEL: //等待关闭 - if(ctx->close_state) + if(ctx->put.close_state) { tango_cache_ctx_destroy(ctx); } @@ -514,7 +514,7 @@ int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEM } return -1; } - ctx->put_state = PUT_STATE_END; + ctx->put.state = PUT_STATE_END; snprintf(minio_url, 256, "http://%s/%s/%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->object_key); curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); @@ -556,18 +556,18 @@ int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFF { size_t size; - ctx->evbuffer = evbuffer_new(); + ctx->put.evbuf = evbuffer_new(); size = evbuffer_get_length(evbuf); if(way == EVBUFFER_MOVE) { - if(evbuffer_add_buffer(ctx->evbuffer, evbuf)) + if(evbuffer_add_buffer(ctx->put.evbuf, evbuf)) { return -1; } } else { - if(evbuffer_add_buffer_reference(ctx->evbuffer, evbuf)) + if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf)) { return -1; } @@ -577,9 +577,18 @@ int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFF return http_put_complete_part_evbuf(ctx); } -void tango_cache_curl_get_done(CURL *easy, struct tango_cache_ctx *ctx, CURLcode res, long res_code) +void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) { - switch(ctx->get_state) + if(res!=CURLE_OK || (res_code!=204L && res_code!=200L )) + { + ctx->fail_state = true; + } + tango_cache_ctx_destroy(ctx); +} + +void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) +{ + switch(ctx->get.state) { case GET_STATE_START: if(!ctx->fail_state) @@ -600,7 +609,7 @@ void tango_cache_curl_get_done(CURL *easy, struct tango_cache_ctx *ctx, CURLcode case GET_STATE_DELETE: if(cache_delete_minio_object(ctx)) { - ctx->get_state = GET_STATE_END; + ctx->get.state = GET_STATE_END; } else { @@ -620,27 +629,28 @@ static size_t curl_get_response_body_cb(void *ptr, size_t size, size_t count, vo struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; struct tango_cache_result result; - if(ctx->fail_state || ctx->get_state==GET_STATE_DELETE) + if(ctx->fail_state || ctx->get.state==GET_STATE_DELETE) { return size*count; } - if(ctx->need_hdrs!=RESPONSE_HDR_ALL) //无Expires时 + if(ctx->get.need_hdrs!=RESPONSE_HDR_ALL) //无Expires时 { ctx->fail_state = true; ctx->error_code = CACHE_ERR_CURL; - ctx->get_state = GET_STATE_DELETE; - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache Expires or last-modify not found"); + ctx->get.state = GET_STATE_DELETE; + ctx->instance->statistic.del_recv_num += 1; + promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache Expires or x-amz-meta-lm not found"); return size*count; } - if(ctx->response_tag.len > 0) + if(ctx->get.response_tag.len > 0) { - result.data_frag = ctx->response_tag.buff; - result.size = ctx->response_tag.len; + result.data_frag = ctx->get.response_tag.buff; + result.size = ctx->get.response_tag.len; result.type = RESULT_TYPE_USERTAG; promise_success(future_to_promise(ctx->future), &result); - easy_string_destroy(&ctx->response_tag); + easy_string_destroy(&ctx->get.response_tag); } if(ctx->response.len > 0) { @@ -661,14 +671,15 @@ static bool check_expires_header(struct tango_cache_ctx *ctx, const char *expire { time_t time_gmt; - ctx->expires = expires_hdr2timestamp(expires_val, len); + ctx->get.expires = expires_hdr2timestamp(expires_val, len); time_gmt = get_gmtime_timestamp(time(NULL)); - if(time_gmt > ctx->expires) //缓存失效;TODO relative_age的含义是啥 + if(time_gmt > ctx->get.expires) //缓存失效;TODO relative_age的含义是啥 { ctx->fail_state = true; ctx->error_code = CACHE_TIMEOUT; - ctx->get_state = GET_STATE_DELETE; //缓存失效时在下载完毕时触发删除动作 + ctx->get.state = GET_STATE_DELETE; //缓存失效时在下载完毕时触发删除动作 + ctx->instance->statistic.del_recv_num += 1; easy_string_destroy(&ctx->response); promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache not fresh"); return false; @@ -680,11 +691,11 @@ static bool check_fresh_header(struct tango_cache_ctx *ctx) { time_t now_gmt; - if(ctx->need_hdrs != RESPONSE_HDR_ALL) + if(ctx->get.need_hdrs != RESPONSE_HDR_ALL) return true; now_gmt = get_gmtime_timestamp(time(NULL)); - if(ctx->last_modify+ctx->max_age > now_gmt || now_gmt+ctx->min_fresh>ctx->expires) + if(ctx->get.last_modify+ctx->get.max_age > now_gmt || now_gmt+ctx->get.min_fresh>ctx->get.expires) { ctx->fail_state = true; ctx->error_code = CACHE_TIMEOUT; @@ -716,8 +727,9 @@ static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count, char *start=(char *)ptr, *pos_colon; size_t raw_len = size*count, hdrlen=size*count; char usertag[2048]; + size_t datalen; - if(ctx->fail_state || ctx->get_state==GET_STATE_DELETE) + if(ctx->fail_state || ctx->get.state==GET_STATE_DELETE) { return raw_len; } @@ -725,49 +737,51 @@ static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count, { return raw_len; } - - if((pos_colon=(char*)memchr(start, ':', raw_len))!=NULL) + pos_colon = (char*)memchr(start, ':', raw_len); + if(pos_colon == NULL) { - size_t datalen = pos_colon - start; - switch(datalen) - { - case 7: - if(strcmp_one_word_mesa_equal_len("expires", "EXPIRES", start, 7)) + return raw_len; + } + + datalen = pos_colon - start; + switch(datalen) + { + case 7: + if(strcmp_one_word_mesa_equal_len("expires", "EXPIRES", start, 7)) + { + ctx->get.need_hdrs |= RESPONSE_HDR_EXPIRES; + if(!check_expires_header(ctx, pos_colon + 1, raw_len - datalen - 1) || !check_fresh_header(ctx)) { - ctx->need_hdrs |= RESPONSE_HDR_EXPIRES; - if(!check_expires_header(ctx, pos_colon + 1, raw_len - datalen - 1) || !check_fresh_header(ctx)) - { - return raw_len; - } + return raw_len; } - break; - case 13: - if(strcmp_one_word_mesa_equal_len("x-amz-meta-lm", "X-AMZ-META-LM", start, 13)) + } + break; + case 13: + if(strcmp_one_word_mesa_equal_len("x-amz-meta-lm", "X-AMZ-META-LM", start, 13)) + { + ctx->get.need_hdrs |= RESPONSE_HDR_LAST_MOD; + sscanf(pos_colon+1, "%lu", &ctx->get.last_modify); + if(!check_fresh_header(ctx)) { - ctx->need_hdrs |= RESPONSE_HDR_LAST_MOD; - sscanf(pos_colon+1, "%lu", &ctx->last_modify); - if(!check_fresh_header(ctx)) - { - return raw_len; - } + return raw_len; } - break; - case 15: - if(strcmp_one_word_mesa_equal_len("x-amz-meta-user", "X-AMZ-META-USER", start, 15)) + } + break; + case 15: + if(strcmp_one_word_mesa_equal_len("x-amz-meta-user", "X-AMZ-META-USER", start, 15)) + { + if((hdrlen = Base64_DecodeBlock((unsigned char*)pos_colon+1, raw_len-datalen-1, (unsigned char*)usertag, 2048))>0) { - if((hdrlen = Base64_DecodeBlock((unsigned char*)pos_colon+1, raw_len-datalen-1, (unsigned char*)usertag, 2048))>0) - { - easy_string_savedata(&ctx->response_tag, usertag, hdrlen); - } + easy_string_savedata(&ctx->get.response_tag, usertag, hdrlen); } - break; - case 11: if(strcmp_one_word_mesa_equal_len("content-md5", "CONTENT-MD5", start, 11)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; - case 12: if(strcmp_one_word_mesa_equal_len("content-type", "CONTENT-TYPE", start, 12)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; - case 14: if(strcmp_one_word_mesa_equal_len("content-length", "CONTENT-LENGTH", start, 14)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; - case 16: if(strcmp_one_word_mesa_equal_len("content-encoding", "CONTENT-ENCODING", start, 16)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; - case 19: if(strcmp_one_word_mesa_equal_len("content-disposition", "CONTENT-DISPOSITION", start, 19)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; - default: break; - } + } + break; + case 11: if(strcmp_one_word_mesa_equal_len("content-md5", "CONTENT-MD5", start, 11)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; + case 12: if(strcmp_one_word_mesa_equal_len("content-type", "CONTENT-TYPE", start, 12)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; + case 14: if(strcmp_one_word_mesa_equal_len("content-length", "CONTENT-LENGTH", start, 14)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; + case 16: if(strcmp_one_word_mesa_equal_len("content-encoding", "CONTENT-ENCODING", start, 16)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; + case 19: if(strcmp_one_word_mesa_equal_len("content-disposition", "CONTENT-DISPOSITION", start, 19)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break; + default: break; } return raw_len; } diff --git a/cache/tango_cache_transfer.h b/cache/tango_cache_transfer.h index da5f210..1f1d5b5 100644 --- a/cache/tango_cache_transfer.h +++ b/cache/tango_cache_transfer.h @@ -6,10 +6,11 @@ #include "tango_cache_client_in.h" -void tango_cache_curl_put_done(CURL *easy, struct tango_cache_ctx *ctx, CURLcode res, long res_code); -void tango_cache_curl_get_done(CURL *easy, struct tango_cache_ctx *ctx, CURLcode res, long res_code); +void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code); +void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code); +void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code); -bool cache_delete_minio_object(struct tango_cache_ctx *ctx); +int cache_delete_minio_object(struct tango_cache_ctx *ctx); int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx); bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len); diff --git a/cache/tango_cache_xml.cpp b/cache/tango_cache_xml.cpp index 4e90987..d7d7a80 100644 --- a/cache/tango_cache_xml.cpp +++ b/cache/tango_cache_xml.cpp @@ -58,7 +58,7 @@ int construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len) xmlNewProp(root, (const xmlChar *)"xmlns",(const xmlChar *)"http://s3.amazonaws.com/doc/2006-03-01/"); xmlDocSetRootElement(pdoc, root); - TAILQ_FOREACH(etag, &ctx->cache_head, node) + TAILQ_FOREACH(etag, &ctx->put.etag_head, node) { sprintf(number, "%u", etag->part_number); child = xmlNewChild(root, NULL, (const xmlChar*)"Part", NULL); diff --git a/cache/test_demo/cache_evbase_test.cpp b/cache/test_demo/cache_evbase_test.cpp index 1b6f572..172041e 100644 --- a/cache/test_demo/cache_evbase_test.cpp +++ b/cache/test_demo/cache_evbase_test.cpp @@ -22,7 +22,7 @@ #include "cache_evbase_client.h" struct cache_evbase_instance *instance_asyn; -int still_runing=1; +int runing_over=0; struct future_pdata { @@ -59,15 +59,14 @@ void get_future_success(future_result_t* result, void * user) future_destroy(pdata->future); fclose(pdata->fp); free(pdata); - still_runing = 0; + runing_over = 1; } } void get_future_failed(enum e_future_error err, const char * what, void * user) { printf("GET fail: %s\n", what); - sleep(5); - still_runing = 0; + runing_over = 2; } void put_future_success(future_result_t* result, void * user) @@ -77,7 +76,7 @@ void put_future_success(future_result_t* result, void * user) printf("PUT %s succ\n", pdata->filename); future_destroy(pdata->future); free(pdata); - still_runing = 0; + runing_over = 1; } void put_future_failed(enum e_future_error err, const char * what, void * user) { @@ -86,7 +85,26 @@ void put_future_failed(enum e_future_error err, const char * what, void * user) printf("PUT %s fail: %s\n", what, pdata->filename); future_destroy(pdata->future); free(pdata); - still_runing = 0; + runing_over = 1; +} + +void del_future_success(future_result_t* result, void * user) +{ + struct future_pdata *pdata = (struct future_pdata *)user; + + printf("DEL %s succ\n", pdata->filename); + future_destroy(pdata->future); + free(pdata); + runing_over = 1; +} +void del_future_failed(enum e_future_error err, const char * what, void * user) +{ + struct future_pdata *pdata = (struct future_pdata *)user; + + printf("DEL %s fail: %s\n", pdata->filename, what); + future_destroy(pdata->future); + free(pdata); + runing_over = 1; } char * get_file_content(const char *filename, size_t *filelen_out) @@ -139,7 +157,7 @@ int main(int argc, char **argv) if(argc != 2 && argc!=3) { - printf("USGAE: %s [get_out_file_index]\n", argv[0]); + printf("USGAE: %s [get_out_file_index]\n", argv[0]); return -1; } if(argc==3) @@ -182,7 +200,14 @@ int main(int argc, char **argv) promise_set_ctx(future_to_promise(pdata->future), NULL, NULL); pdata->fp = fopen(filename_out, "w"); - cache_evbase_fetch(instance_asyn, pdata->future, &meta); + cache_evbase_fetch_object(instance_asyn, pdata->future, &meta); + } + else if(!strcasecmp(p, "DEL")) + { + pdata->future = future_create(del_future_success, del_future_failed, pdata); + promise_set_ctx(future_to_promise(pdata->future), NULL, NULL); + sprintf(pdata->filename, "%s", filename_in); + cache_evbase_delete_object(instance_asyn, pdata->future, filename_in); } else if(!strcasecmp(p, "PUTONCE")) { @@ -237,7 +262,7 @@ int main(int argc, char **argv) printf("Waiting to finish.......\n"); static int num=0; - while(still_runing) + while(!runing_over) { /*if(++num==10) { @@ -245,11 +270,15 @@ int main(int argc, char **argv) }*/ sleep(1); } + if(runing_over==2) //GET时超时删除,来不及 + { + sleep(5); + } struct cache_statistics out; cache_evbase_get_statistics(instance_asyn, &out); - printf("get_recv: %llu, get_succ: %llu, get_miss: %llu, get_fail: %llu, put_recv: %llu, put_succ: %llu, put_fail: %llu, session: %llu, memory: %llu\n", - out.get_recv_num, out.get_succ_num, out.get_miss_num, out.get_error_num, out.put_recv_num, out.put_succ_num, out.put_error_num, out.session_num, out.memory_used); + printf("get_recv: %llu, get_succ: %llu, get_miss: %llu, get_fail: %llu, put_recv: %llu, put_succ: %llu, put_fail: %llu, del_recv: %llu, del_succ: %llu, del_fail: %llu, session: %llu, memory: %llu\n", + out.get_recv_num, out.get_succ_num, out.get_miss_num, out.get_error_num, out.put_recv_num, out.put_succ_num, out.put_error_num, out.del_recv_num, out.del_succ_num, out.del_error_num, out.session_num, out.memory_used); return 0; } diff --git a/cache/test_demo/pangu_tg_cahce.conf b/cache/test_demo/pangu_tg_cahce.conf index fa7e465..f6aeddf 100644 --- a/cache/test_demo/pangu_tg_cahce.conf +++ b/cache/test_demo/pangu_tg_cahce.conf @@ -8,9 +8,6 @@ MAX_CONNECTION_PER_HOST=10 #bucket鐨勫悕绉 CACHE_BUCKET_NAME=openbucket -#鍐呴儴鍒嗛厤缂撳瓨鍧楃殑澶у皬锛屾牴鎹笂浼犵殑鏂囦欢澶у皬鍒嗗竷鑰屽畾锛屾渶澶16777216锛16MB锛 -CACHE_BLOCK_MAX_SIZE=8192 - #缂撳瓨鏈澶у崰鐢ㄧ殑鍐呭瓨绌洪棿澶у皬锛岃秴鍑虹┖闂存椂涓婁紶澶辫触 MAX_USED_MEMORY_SIZE_MB=5120 diff --git a/cache/test_demo/tango_cache_test.c b/cache/test_demo/tango_cache_test.c index 4cb6394..ce8f16d 100644 --- a/cache/test_demo/tango_cache_test.c +++ b/cache/test_demo/tango_cache_test.c @@ -175,7 +175,7 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) pdata->future = future_create(get_future_success, get_future_failed, pdata); promise_set_ctx(future_to_promise(pdata->future), NULL, NULL); - tango_cache_fetch(tango_instance, pdata->future, &meta); + tango_cache_fetch_object(tango_instance, pdata->future, &meta); } else if(!strcasecmp(p, "PUTONCE")) {