修改FETCH API函数名为tango_cache_fetch_object;增加DELETE API;调整ctx结构体内部结构;
This commit is contained in:
195
cache/tango_cache_client.cpp
vendored
195
cache/tango_cache_client.cpp
vendored
@@ -16,7 +16,7 @@
|
||||
#include "tango_cache_transfer.h"
|
||||
#include "tango_cache_tools.h"
|
||||
|
||||
int TANGO_CACHE_VERSION_20180910=0;
|
||||
int TANGO_CACHE_VERSION_20180925=0;
|
||||
|
||||
void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size)
|
||||
{
|
||||
@@ -51,6 +51,9 @@ void tango_cache_get_statistics(const struct tango_cache_instance *instance, str
|
||||
out->put_recv_num = instance->statistic.put_recv_num;
|
||||
out->put_succ_num = instance->statistic.put_succ_num;
|
||||
out->put_error_num= instance->statistic.put_error_num;
|
||||
out->del_recv_num = instance->statistic.del_recv_num;
|
||||
out->del_succ_num = instance->statistic.del_succ_num;
|
||||
out->del_error_num= instance->statistic.del_error_num;
|
||||
out->session_num = instance->statistic.session_num;
|
||||
out->memory_used = instance->statistic.memory_used;
|
||||
}
|
||||
@@ -67,30 +70,42 @@ const char *tango_cache_get_object_key(struct tango_cache_ctx *ctx)
|
||||
|
||||
static inline void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state, enum CACHE_ERR_CODE error_code, struct cache_statistics *statistic)
|
||||
{
|
||||
if(method == CACHE_REQUEST_PUT)
|
||||
switch(method)
|
||||
{
|
||||
if(fail_state)
|
||||
{
|
||||
statistic->put_error_num += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
statistic->put_succ_num += 1;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if(fail_state)
|
||||
{
|
||||
if(error_code == CACHE_ERR_CURL)
|
||||
statistic->get_error_num += 1;
|
||||
case CACHE_REQUEST_PUT:
|
||||
if(fail_state)
|
||||
{
|
||||
statistic->put_error_num += 1;
|
||||
}
|
||||
else
|
||||
statistic->get_miss_num += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
statistic->get_succ_num += 1;
|
||||
}
|
||||
{
|
||||
statistic->put_succ_num += 1;
|
||||
}
|
||||
break;
|
||||
case CACHE_REQUEST_GET:
|
||||
if(fail_state)
|
||||
{
|
||||
if(error_code == CACHE_ERR_CURL)
|
||||
statistic->get_error_num += 1;
|
||||
else
|
||||
statistic->get_miss_num += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
statistic->get_succ_num += 1;
|
||||
}
|
||||
break;
|
||||
case CACHE_REQUEST_DELETE:
|
||||
if(fail_state)
|
||||
{
|
||||
statistic->del_error_num += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
statistic->del_succ_num += 1;
|
||||
}
|
||||
break;
|
||||
default:break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -119,44 +134,46 @@ void easy_string_savedata(struct easy_string *estr, const char *data, size_t len
|
||||
|
||||
void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx)
|
||||
{
|
||||
struct multipart_etag_list *etag;
|
||||
|
||||
if(ctx->curl != NULL)
|
||||
{
|
||||
curl_multi_remove_handle(ctx->instance->multi_hd, ctx->curl);
|
||||
curl_easy_cleanup(ctx->curl);
|
||||
}
|
||||
easy_string_destroy(&ctx->response);
|
||||
|
||||
if(ctx->method == CACHE_REQUEST_GET)
|
||||
{
|
||||
easy_string_destroy(&ctx->response_tag);
|
||||
}
|
||||
else
|
||||
{
|
||||
struct multipart_etag_list *etag;
|
||||
|
||||
if(ctx->uploadID != NULL) free(ctx->uploadID);
|
||||
if(ctx->combine_xml != NULL) free(ctx->combine_xml);
|
||||
if(ctx->headers != NULL) curl_slist_free_all(ctx->headers);
|
||||
if(ctx->evbuffer!=NULL) evbuffer_free(ctx->evbuffer);
|
||||
|
||||
TAILQ_FOREACH(etag, &ctx->cache_head, node)
|
||||
{
|
||||
TAILQ_REMOVE(&ctx->cache_head, etag, node);
|
||||
free(etag->etag);
|
||||
free(etag);
|
||||
}
|
||||
switch(ctx->method)
|
||||
{
|
||||
case CACHE_REQUEST_GET:
|
||||
easy_string_destroy(&ctx->get.response_tag);
|
||||
break;
|
||||
|
||||
if(ctx->future != NULL)
|
||||
{
|
||||
if(ctx->fail_state)
|
||||
case CACHE_REQUEST_PUT:
|
||||
if(ctx->put.uploadID != NULL) free(ctx->put.uploadID);
|
||||
if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml);
|
||||
if(ctx->headers != NULL) curl_slist_free_all(ctx->headers);
|
||||
if(ctx->put.evbuf!=NULL) evbuffer_free(ctx->put.evbuf);
|
||||
TAILQ_FOREACH(etag, &ctx->put.etag_head, node)
|
||||
{
|
||||
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, ctx->error);
|
||||
}
|
||||
else
|
||||
TAILQ_REMOVE(&ctx->put.etag_head, etag, node);
|
||||
free(etag->etag);
|
||||
free(etag);
|
||||
}//no break here
|
||||
case CACHE_REQUEST_DELETE:
|
||||
if(ctx->future != NULL)
|
||||
{
|
||||
promise_success(future_to_promise(ctx->future), NULL);
|
||||
if(ctx->fail_state)
|
||||
{
|
||||
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, ctx->error);
|
||||
}
|
||||
else
|
||||
{
|
||||
promise_success(future_to_promise(ctx->future), NULL);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
default: break;
|
||||
}
|
||||
update_statistics(ctx->method, ctx->fail_state, ctx->error_code, &ctx->instance->statistic);
|
||||
free(ctx);
|
||||
@@ -173,12 +190,12 @@ int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data,
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
if(evbuffer_add(ctx->evbuffer, data, size))
|
||||
if(evbuffer_add(ctx->put.evbuf, data, size))
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
ctx->instance->statistic.memory_used += size;
|
||||
if(evbuffer_get_length(ctx->evbuffer) >= ctx->instance->upload_block_size)
|
||||
if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->upload_block_size)
|
||||
{
|
||||
cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size);
|
||||
}
|
||||
@@ -197,20 +214,20 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP
|
||||
size = evbuffer_get_length(evbuf);
|
||||
if(way == EVBUFFER_MOVE)
|
||||
{
|
||||
if(evbuffer_add_buffer(ctx->evbuffer, evbuf))
|
||||
if(evbuffer_add_buffer(ctx->put.evbuf, evbuf))
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if(evbuffer_add_buffer_reference(ctx->evbuffer, evbuf))
|
||||
if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf))
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
ctx->instance->statistic.memory_used += size;
|
||||
if(evbuffer_get_length(ctx->evbuffer) >= ctx->instance->upload_block_size)
|
||||
if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->upload_block_size)
|
||||
{
|
||||
cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size);
|
||||
}
|
||||
@@ -296,8 +313,8 @@ struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *in
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ctx->evbuffer = evbuffer_new();
|
||||
TAILQ_INIT(&ctx->cache_head);
|
||||
ctx->put.evbuf = evbuffer_new();
|
||||
TAILQ_INIT(&ctx->put.etag_head);
|
||||
return ctx;
|
||||
}
|
||||
|
||||
@@ -346,9 +363,9 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
|
||||
ctx->instance = instance;
|
||||
ctx->future = future;
|
||||
ctx->method = CACHE_REQUEST_GET;
|
||||
ctx->get_state = GET_STATE_START;
|
||||
ctx->max_age = meta->get.max_age;
|
||||
ctx->min_fresh = meta->get.min_fresh;
|
||||
ctx->get.state = GET_STATE_START;
|
||||
ctx->get.max_age = meta->get.max_age;
|
||||
ctx->get.min_fresh = meta->get.min_fresh;
|
||||
instance->statistic.get_recv_num += 1;
|
||||
|
||||
if(instance->hash_object_key)
|
||||
@@ -360,15 +377,42 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
|
||||
{
|
||||
snprintf(ctx->object_key, 256, "%s", meta->url);
|
||||
}
|
||||
|
||||
return ctx;
|
||||
}
|
||||
|
||||
int tango_cache_fetch(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta)
|
||||
int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta)
|
||||
{
|
||||
return tango_cache_fetch_start(tango_cache_fetch_prepare(instance, future, meta));
|
||||
}
|
||||
|
||||
struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, struct future* future, const char *objkey)
|
||||
{
|
||||
struct tango_cache_ctx *ctx;
|
||||
char sha256[72]={0};
|
||||
|
||||
ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
|
||||
ctx->instance = instance;
|
||||
ctx->future = future;
|
||||
ctx->method = CACHE_REQUEST_DELETE;
|
||||
instance->statistic.del_recv_num += 1;
|
||||
|
||||
if(instance->hash_object_key)
|
||||
{
|
||||
caculate_sha256(objkey, strlen(objkey), sha256, 72);
|
||||
snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", sha256[0], sha256[1], sha256[2], sha256[3], sha256+4);
|
||||
}
|
||||
else
|
||||
{
|
||||
snprintf(ctx->object_key, 256, "%s", objkey);
|
||||
}
|
||||
return ctx;
|
||||
}
|
||||
|
||||
int tango_cache_delete_object(struct tango_cache_instance *instance, struct future* future, const char *objkey)
|
||||
{
|
||||
return (cache_delete_minio_object(tango_cache_delete_prepare(instance, future, objkey))==1)?0:-1;
|
||||
}
|
||||
|
||||
static void check_multi_info(CURLM *multi)
|
||||
{
|
||||
CURLMsg *msg;
|
||||
@@ -387,20 +431,25 @@ static void check_multi_info(CURLM *multi)
|
||||
|
||||
easy = msg->easy_handle;
|
||||
res = msg->data.result;
|
||||
curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ctx);
|
||||
curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ctx);
|
||||
curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &res_code);
|
||||
curl_multi_remove_handle(multi, easy);
|
||||
curl_easy_cleanup(easy);
|
||||
ctx->curl = NULL;
|
||||
ctx->res_code = 0;
|
||||
|
||||
if(ctx->method == CACHE_REQUEST_GET)
|
||||
switch(ctx->method)
|
||||
{
|
||||
tango_cache_curl_get_done(easy, ctx, res, res_code);
|
||||
}
|
||||
else
|
||||
{
|
||||
tango_cache_curl_put_done(easy, ctx, res, res_code);
|
||||
case CACHE_REQUEST_GET:
|
||||
tango_cache_curl_get_done(ctx, res, res_code);
|
||||
break;
|
||||
case CACHE_REQUEST_PUT:
|
||||
tango_cache_curl_put_done(ctx, res, res_code);
|
||||
break;
|
||||
case CACHE_REQUEST_DELETE:
|
||||
tango_cache_curl_del_done(ctx, res, res_code);
|
||||
break;
|
||||
default: break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -519,19 +568,13 @@ static int load_local_configure(struct tango_cache_instance *instance, const cha
|
||||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.\n", profile_path, section);
|
||||
return -1;
|
||||
}
|
||||
MESA_load_profile_uint_def(profile_path, section, "CACHE_BLOCK_MAX_SIZE", &instance->block_len, 8192);
|
||||
if(instance->block_len > 16777216)
|
||||
{
|
||||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BLOCK_MAX_SIZE too large, must smaller than 16777216(16MB).\n", profile_path, section);
|
||||
return -1;
|
||||
}
|
||||
MESA_load_profile_uint_def(profile_path, section, "CACHE_UPLOAD_BLOCK_SIZE", &instance->upload_block_size, 5242880);
|
||||
if(instance->upload_block_size < 5242880)
|
||||
{
|
||||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section);
|
||||
return -1;
|
||||
}
|
||||
MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 3600);
|
||||
MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 999999999);
|
||||
if(intval < 60)
|
||||
{
|
||||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section);
|
||||
|
||||
Reference in New Issue
Block a user