[1]统一GET/PUT结束后结果通知机制,API直接调用失败时不回调,其他情况回调(promise);

[2]hiredis版本确定为0.14.0版;
[3]修复tango_cache_ctx_destroy中TAILQ内存释放的BUG;
This commit is contained in:
zhangchengwei
2018-10-27 11:03:58 +08:00
committed by zhengchao
parent 4bb03d6e38
commit e1ad321332
11 changed files with 150 additions and 90 deletions

6
cache/README.txt vendored
View File

@@ -1,3 +1,3 @@
1<EFBFBD><EFBFBD>HEAD<EFBFBD><EFBFBD><EFBFBD><EFBFBD>֧<EFBFBD>ִ<EFBFBD>minio<EFBFBD><EFBFBD>redis<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ȡ<EFBFBD><EFBFBD>Ĭ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>´<EFBFBD>minio<EFBFBD><EFBFBD>ȡ<EFBFBD><EFBFBD>
2<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>redis<EFBFBD><EFBFBD>ȡ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱ<EFBFBD>Ӻ궨<EFBFBD><EFBFBD>-DHEAD_OBJECT_FROM_REDIS
3<EFBFBD><EFBFBD>ʹ<EFBFBD>õ<EFBFBD>redis<EFBFBD>ͻ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Լ<EFBFBD><EFBFBD>޸Ĺ<EFBFBD><EFBFBD>İ汾<EFBFBD><EFBFBD>support<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
1<EFBFBD><EFBFBD>HEAD<EFBFBD><EFBFBD><EFBFBD><EFBFBD>֧<EFBFBD>ִ<EFBFBD>minio<EFBFBD><EFBFBD>redis<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ȡ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ļ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
2<EFBFBD><EFBFBD>ʹ<EFBFBD>õ<EFBFBD>redis<EFBFBD>ͻ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Լ<EFBFBD><EFBFBD>޸Ĺ<EFBFBD><EFBFBD>İ汾<EFBFBD><EFBFBD>vendor<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>

View File

@@ -21,6 +21,7 @@ enum CACHE_ERR_CODE
CACHE_ERR_INTERNAL,
CACHE_ERR_REDIS_JSON,
CACHE_ERR_REDIS_CONNECT,
CACHE_OUTOF_SESSION,
};
enum PUT_MEMORY_COPY_WAY
@@ -81,7 +82,7 @@ enum CACHE_HTTP_HDR_TYPE
struct tango_cache_meta_get
{
const char* url; //<2F><><EFBFBD><EFBFBD>:URI<EFBFBD><EFBFBD><EFBFBD>ǽṹ<EFBFBD><EFBFBD><EFBFBD><EFBFBD>־:<3A>ļ<EFBFBD>·<EFBFBD><C2B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ҫ<EFBFBD><EFBFBD>'/'<27><>ͷ<EFBFBD><CDB7>CACHE_OBJECT_KEY_HASH_SWITCH=0ʱ<30><CAB1><EFBFBD>󳤶<EFBFBD>256<35>ֽڣ<D6BD>=1ʱ<31><CAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
const char* url; //<2F><><EFBFBD><EFBFBD>:URL<EFBFBD><EFBFBD><EFBFBD>ǽṹ<EFBFBD><EFBFBD><EFBFBD><EFBFBD>־:<3A>ļ<EFBFBD>·<EFBFBD><C2B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD>CACHE_OBJECT_KEY_HASH_SWITCH=0ʱ<30><CAB1><EFBFBD>󳤶<EFBFBD>256<35>ֽڣ<D6BD>=1ʱ<31><CAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
struct request_freshness get;
};

View File

@@ -153,36 +153,47 @@ static void cache_asyn_ctx_destroy(struct cache_evbase_ctx *ctx_asyn)
static void cache_asyn_ioevent_dispatch(struct databuffer *buffer)
{
struct cache_evbase_ctx *ctx_asyn=buffer->ctx_asyn;
struct future *f;
int ret=0;
switch(buffer->cmd_type)
{
case CACHE_ASYN_FETCH:
tango_cache_fetch_start(ctx_asyn->ctx);
f = ctx_asyn->ctx->future;
if(tango_cache_fetch_start(ctx_asyn->ctx) < 0)
{
promise_failed(future_to_promise(f), FUTURE_ERROR_CANCEL, "CACHE_ASYN_FETCH failed");
}
cache_asyn_ctx_destroy(ctx_asyn);
break;
case CACHE_ASYN_HEAD:
f = ctx_asyn->ctx->future;
if(ctx_asyn->instance_asyn->instance->head_meta_source == HEAD_META_FROM_REDIS)
{
tango_cache_head_redis(ctx_asyn->ctx);
ret = tango_cache_head_redis(ctx_asyn->ctx);
}
else
{
tango_cache_fetch_start(ctx_asyn->ctx);
}
ret = tango_cache_fetch_start(ctx_asyn->ctx);
}
if(ret<0)
{
promise_failed(future_to_promise(f), FUTURE_ERROR_CANCEL, "CACHE_ASYN_HEAD failed");
}
cache_asyn_ctx_destroy(ctx_asyn);
break;
case CACHE_ASYN_DELETE:
cache_delete_minio_object(ctx_asyn->ctx);
cache_delete_minio_object(ctx_asyn->ctx, true);
cache_asyn_ctx_destroy(ctx_asyn);
break;
case CACHE_ASYN_UPLOAD_ONCE_DATA:
tango_cache_upload_once_start_data(ctx_asyn->ctx, PUT_MEM_FREE, buffer->data, buffer->size);
tango_cache_upload_once_start_data(ctx_asyn->ctx, PUT_MEM_FREE, buffer->data, buffer->size, true);
cache_asyn_ctx_destroy(ctx_asyn);
break;
case CACHE_ASYN_UPLOAD_ONCE_EVBUF:
tango_cache_upload_once_start_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf);
tango_cache_upload_once_start_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf, true);
evbuffer_free(buffer->evbuf);
cache_asyn_ctx_destroy(ctx_asyn);
break;
@@ -361,8 +372,8 @@ struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance
//<2F>¼<EFBFBD>֪ͨ<CDA8><D6AA>Ϊ<EFBFBD><CEAA><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͳ<EFBFBD><CDB3><EFBFBD><EFBFBD>Ϣ
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
tango_cache_ctx_destroy(ctx_asyn->ctx);
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
return NULL;
@@ -409,8 +420,8 @@ int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct
{
free(buffer->data);
free(buffer);
tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
tango_cache_ctx_destroy(ctx);
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_ctx_destroy(ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
return -2;
}
@@ -448,8 +459,8 @@ int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struc
{
evbuffer_free(buffer->evbuf);
free(buffer);
tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
tango_cache_ctx_destroy(ctx);
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_ctx_destroy(ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
return -2;
}
@@ -476,9 +487,8 @@ int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct fut
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
promise_failed(future_to_promise(f), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx_asyn->ctx));
tango_cache_ctx_destroy(ctx_asyn->ctx);
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
return -2;
@@ -506,9 +516,8 @@ int cache_evbase_head_object(struct cache_evbase_instance *instance, struct futu
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
promise_failed(future_to_promise(f), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx_asyn->ctx));
tango_cache_ctx_destroy(ctx_asyn->ctx);
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
return -2;
@@ -537,12 +546,8 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu
//<2F>ο<EFBFBD>Unix<69>߼<EFBFBD><DFBC><EFBFBD><EFBFBD><EFBFBD>432ҳ<32><D2B3><EFBFBD>ڶ<EFBFBD><DAB6>߳<EFBFBD>д<EFBFBD>İ<EFBFBD>ȫ<EFBFBD><C8AB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
if(f != NULL)
{
promise_failed(future_to_promise(f), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx_asyn->ctx));
}
tango_cache_ctx_destroy(ctx_asyn->ctx);
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
return -2;

View File

@@ -94,6 +94,7 @@ const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx)
case CACHE_ERR_INTERNAL:return "internal error";
case CACHE_ERR_REDIS_JSON:return "parse redis json error";
case CACHE_ERR_REDIS_CONNECT:return "redis is not connected";
case CACHE_OUTOF_SESSION:return "two many curl sessions";
default: return ctx->error;
}
}
@@ -194,7 +195,8 @@ void easy_string_savedata(struct easy_string *estr, const char *data, size_t len
estr->buff[estr->len]='\0';
}
void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx)
//callback: <20>Ƿ<EFBFBD><C7B7><EFBFBD><EFBFBD>ûص<C3BB><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ҪΪ<D2AA><CEAA><EFBFBD><EFBFBD>ֱ<EFBFBD>ӵ<EFBFBD><D3B5><EFBFBD>APIʱʧ<CAB1>ܣ<EFBFBD><DCA3><EFBFBD><EFBFBD>ٵ<EFBFBD><D9B5>ûص<C3BB><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͨ<EFBFBD><CDA8><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ֵ<EFBFBD>ж<EFBFBD>
void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback)
{
struct multipart_etag_list *etag;
@@ -220,7 +222,7 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx)
ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf);
evbuffer_free(ctx->put.evbuf);
}
TAILQ_FOREACH(etag, &ctx->put.etag_head, node)
while(NULL != (etag = TAILQ_FIRST(&ctx->put.etag_head)))
{
TAILQ_REMOVE(&ctx->put.etag_head, etag, node);
free(etag->etag);
@@ -232,7 +234,7 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx)
curl_slist_free_all(ctx->headers);
}//no break here
case CACHE_REQUEST_DELETE:
if(ctx->future != NULL)
if(callback && ctx->future != NULL)
{
if(ctx->fail_state)
{
@@ -263,6 +265,7 @@ int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data,
}
if(evbuffer_add(ctx->put.evbuf, data, size))
{
tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
return -1;
}
ctx->instance->statistic.memory_used += size;
@@ -287,6 +290,7 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP
{
if(evbuffer_add_buffer(ctx->put.evbuf, evbuf))
{
tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
return -1;
}
}
@@ -294,6 +298,7 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP
{
if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf))
{
tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
return -1;
}
}
@@ -311,7 +316,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
char buffer[2064];
time_t expires, now, last_modify;
if((u_int64_t)instance->statistic.memory_used >= instance->cache_limit_size)
if((u_int64_t)instance->statistic.memory_used>=instance->cache_limit_size || instance->statistic.session_num>=instance->max_session_num)
{
instance->error_code = CACHE_OUTOF_MEMORY;
instance->statistic.totaldrop_num += 1;
@@ -441,6 +446,13 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
struct tango_cache_ctx *ctx;
char sha256[72];
if(instance->head_meta_source!=HEAD_META_FROM_REDIS && instance->statistic.session_num>=instance->max_session_num)
{
instance->error_code = CACHE_OUTOF_SESSION;
instance->statistic.totaldrop_num += 1;
return NULL;
}
ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
ctx->instance = instance;
ctx->future = f;
@@ -477,7 +489,7 @@ int tango_cache_fetch_object(struct tango_cache_instance *instance, struct futur
{
return -1;
}
return tango_cache_fetch_start(ctx);
return (tango_cache_fetch_start(ctx)==1)?0:-1;
}
int tango_cache_head_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta)
@@ -496,7 +508,7 @@ int tango_cache_head_object(struct tango_cache_instance *instance, struct future
}
else
{
return tango_cache_fetch_start(ctx);
return (tango_cache_fetch_start(ctx)==1)?0:-1;
}
}
@@ -505,6 +517,13 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *
struct tango_cache_ctx *ctx;
char sha256[72];
if(instance->statistic.session_num >= instance->max_session_num)
{
instance->error_code = CACHE_OUTOF_SESSION;
instance->statistic.totaldrop_num += 1;
return NULL;
}
ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
ctx->instance = instance;
ctx->future = f;
@@ -546,6 +565,13 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst
struct tango_cache_ctx *ctx;
char md5[48]={0}, content_md5[48];
if(instance->statistic.session_num >= instance->max_session_num)
{
instance->error_code = CACHE_OUTOF_SESSION;
instance->statistic.totaldrop_num += 1;
return NULL;
}
ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
ctx->instance = instance;
ctx->future = f;
@@ -755,9 +781,12 @@ static int load_local_configure(struct tango_cache_instance *instance, const cha
MESA_load_profile_uint_def(profile_path, section, "MAX_CONNECTION_PER_HOST", &intval, 0);
instance->max_cnn_host = intval;
MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", &instance->max_session_num, 200);
MESA_load_profile_uint_def(profile_path, section, "MAX_USED_MEMORY_SIZE_MB", &intval, 5120);
longval = intval;
instance->cache_limit_size = longval * 1024 * 1024;
MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_TRANSFER_TIMEOUT_S", &intval, 15);
instance->transfer_timeout = intval;
if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", instance->bucketname, 256) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BUCKET_NAME not found.\n", profile_path, section);

View File

@@ -70,6 +70,8 @@ struct tango_cache_instance
time_t relative_ttl; //<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ч<EFBFBD><D0A7>
u_int64_t cache_limit_size;
long max_cnn_host;
long transfer_timeout;//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱ<EFBFBD><CAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
u_int32_t max_session_num;
u_int32_t upload_block_size; //minio<69>ֶ<EFBFBD><D6B6>ϴ<EFBFBD><CFB4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>С<EFBFBD><D0A1><EFBFBD><EFBFBD>
enum CACHE_ERR_CODE error_code;
@@ -154,8 +156,9 @@ void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_
void easy_string_savedata(struct easy_string *estr, const char *data, size_t len);
void easy_string_destroy(struct easy_string *estr);
void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx);
void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback=true);
void tango_cache_set_fail_state(struct tango_cache_ctx *ctx, enum CACHE_ERR_CODE error_code);
const char *tango_cache_errcode2str(enum CACHE_ERR_CODE err_code);
const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx);
struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta);

View File

@@ -11,7 +11,7 @@
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libevent.h>
#include <cjson/cJSON.h>
#include <cJSON/cJSON.h>
#include "tango_cache_transfer.h"
#include "tango_cache_tools.h"
@@ -39,7 +39,7 @@ struct http_hdr_name g_http_hdr_name[HDR_CONTENT_NUM]=
{"content-md5", "Content-MD5: "}
};
void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status)
static void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status)
{
struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac);
@@ -54,7 +54,7 @@ void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status)
instance->redis_connecting = CACHE_REDIS_DISCONNECTED;
}
void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status)
static void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status)
{
struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac);
@@ -85,7 +85,7 @@ int redis_asyn_connect_init(struct tango_cache_instance *instance)
return 0;
}
int parse_minio_events_json(struct tango_cache_ctx *ctx, const char *jcontent)
static int parse_minio_events_json(struct tango_cache_ctx *ctx, const char *jcontent)
{
cJSON *root, *pobject = NULL, *ptarget, *plastMod, *pexpires;
int ret = PARSE_JSON_RET_ERROR;
@@ -160,7 +160,7 @@ out_json:
return ret;
}
void redis_hget_command_cb(struct redisAsyncContext *ac, void *vreply, void *privdata)
static void redis_hget_command_cb(struct redisAsyncContext *ac, void *vreply, void *privdata)
{
redisReply *reply = (redisReply *)vreply;
struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)privdata;

View File

@@ -127,6 +127,7 @@ static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool ful
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
curl_easy_setopt(ctx->curl, CURLOPT_TIMEOUT, ctx->instance->transfer_timeout); //<2F><><EFBFBD>Է<EFBFBD><D4B7><EFBFBD><EFBFBD><EFBFBD>ij<EFBFBD><C4B3><EFBFBD>ӽ<EFBFBD><D3BD>տ<EFBFBD>ס<EFBFBD><D7A1><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L);
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 100L);
@@ -200,7 +201,7 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx)
return 1;
}
int cache_delete_minio_object(struct tango_cache_ctx *ctx)
int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back)
{
CURLMcode rc;
char minio_url[256];
@@ -208,8 +209,9 @@ int cache_delete_minio_object(struct tango_cache_ctx *ctx)
ctx->instance->statistic.del_recv_num += 1;
if(NULL == (ctx->curl=curl_easy_init()))
{
tango_cache_ctx_destroy(ctx); //<2F>ս<EFBFBD><D5BD><EFBFBD>
return 0;
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
tango_cache_ctx_destroy(ctx, call_back); //<2F>ս<EFBFBD><D5BD><EFBFBD>
return -1;
}
snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
@@ -330,9 +332,10 @@ bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block
return true;
}
int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx)
//callbackֱ<6B><D6B1>ʧ<EFBFBD><CAA7><EFBFBD>Ƿ<EFBFBD><C7B7><EFBFBD><EFBFBD>ûص<C3BB><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʽ<EFBFBD><CABD>Ҫ<EFBFBD><D2AA><EFBFBD><EFBFBD><EFBFBD><EFBFBD>һ<EFBFBD><D2BB><EFBFBD>Բ<EFBFBD><D4B2><EFBFBD>Ҫ
static int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx, bool callback)
{
int ret=0;
int ret=-1;
ctx->put.state = PUT_STATE_END;
ctx->put.upload_length = evbuffer_get_length(ctx->put.evbuf);
@@ -342,33 +345,30 @@ int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx)
if(ret <= 0)
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
tango_cache_ctx_destroy(ctx);
tango_cache_ctx_destroy(ctx, callback);
}
}
else
{
tango_cache_ctx_destroy(ctx);
tango_cache_ctx_destroy(ctx, callback);
}
return ret;
}
int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx)
void cache_kick_upload_minio_end(struct tango_cache_ctx *ctx)
{
int ret = 0;
DBG_CACHE("state: %d, key: %s, curl %s NULL\n", ctx->put.state, ctx->object_key, (ctx->curl==NULL)?"is":"is not");
ctx->put.close_state = true;//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>״̬<D7B4><CCAC><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>رգ<D8B1><D5A3>ڲ<EFBFBD>״̬<D7B4><CCAC><EFBFBD><EFBFBD>ת<EFBFBD><D7AA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ٹر<D9B9>
if(ctx->fail_state)
{
tango_cache_ctx_destroy(ctx);
return 0;
return;
}
switch(ctx->put.state)
{
case PUT_STATE_START:
ret = http_put_complete_part_evbuf(ctx);
http_put_complete_part_evbuf(ctx, true);
break;
case PUT_STATE_PART:
@@ -387,20 +387,16 @@ int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx)
tango_cache_ctx_destroy(ctx);
}
}
else
else if(http_put_bodypart_request_evbuf(ctx, false) <= 0)
{
ret = http_put_bodypart_request_evbuf(ctx, false);
if(ret <= 0)
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
if(cache_cancel_upload_minio(ctx))
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
if(cache_cancel_upload_minio(ctx))
{
ctx->put.state = PUT_STATE_CANCEL;
}
else
{
tango_cache_ctx_destroy(ctx);
}
ctx->put.state = PUT_STATE_CANCEL;
}
else
{
tango_cache_ctx_destroy(ctx);
}
}
}
@@ -410,8 +406,6 @@ int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx)
case PUT_STATE_WAIT_START: //<2F><>ʱδ<CAB1><CEB4>ȡ<EFBFBD><C8A1>uploadId<49><64><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>޷<EFBFBD><DEB7><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ϴ<EFBFBD>
default: break;
}
return ret;
}
void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
@@ -496,7 +490,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r
}
}
int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size)
int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback)
{
CURLMcode rc;
char minio_url[256];
@@ -505,11 +499,9 @@ int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEM
ctx->instance->error_code = CACHE_OK;
if(NULL == (ctx->curl=curl_easy_init()))
{
tango_cache_ctx_destroy(ctx);
if(way == PUT_MEM_FREE)
{
free((void *)data);
}
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
tango_cache_ctx_destroy(ctx, callback);
if(way == PUT_MEM_FREE) free((void *)data);
return -1;
}
ctx->put.state = PUT_STATE_END;
@@ -524,6 +516,7 @@ int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEM
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
curl_easy_setopt(ctx->curl, CURLOPT_TIMEOUT, ctx->instance->transfer_timeout); //<2F><><EFBFBD>Է<EFBFBD><D4B7><EFBFBD><EFBFBD><EFBFBD>ij<EFBFBD><C4B3><EFBFBD>ӽ<EFBFBD><D3BD>տ<EFBFBD>ס<EFBFBD><D7A1><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L);
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 100L);
@@ -550,7 +543,7 @@ int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEM
return 0;
}
int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf)
int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, bool callback)
{
size_t size;
@@ -562,6 +555,8 @@ int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFF
{
if(evbuffer_add_buffer(ctx->put.evbuf, evbuf))
{
tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
tango_cache_ctx_destroy(ctx, callback);
return -1;
}
}
@@ -569,12 +564,14 @@ int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFF
{
if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf))
{
tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
tango_cache_ctx_destroy(ctx, callback);
return -1;
}
}
ctx->instance->statistic.memory_used += size;
return http_put_complete_part_evbuf(ctx);
return http_put_complete_part_evbuf(ctx, callback);
}
void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
@@ -616,7 +613,7 @@ void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, lon
tango_cache_ctx_destroy(ctx);
}
int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx)
int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback)
{
CURLMcode rc;
char minio_url[256];
@@ -625,7 +622,8 @@ int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx)
ctx->instance->error_code = CACHE_OK;
if(NULL == (ctx->curl=curl_easy_init()))
{
tango_cache_ctx_destroy(ctx);
tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
tango_cache_ctx_destroy(ctx, callback);
return -1;
}
@@ -888,6 +886,7 @@ int tango_cache_fetch_start(struct tango_cache_ctx *ctx)
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
curl_easy_setopt(ctx->curl, CURLOPT_TIMEOUT, ctx->instance->transfer_timeout); //<2F><><EFBFBD>Է<EFBFBD><D4B7><EFBFBD><EFBFBD><EFBFBD>ij<EFBFBD><C4B3><EFBFBD>ӽ<EFBFBD><D3BD>տ<EFBFBD>ס<EFBFBD><D7A1><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_get_response_header_cb);
curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
//ctx->error="Operation too slow. Less than 1024 bytes/sec transferred the last 3 seconds"
@@ -896,6 +895,6 @@ int tango_cache_fetch_start(struct tango_cache_ctx *ctx)
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
return 0;
return 1;
}

View File

@@ -14,14 +14,14 @@ void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long r
void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code);
void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code);
int cache_delete_minio_object(struct tango_cache_ctx *ctx);
int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx);
int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back=false);
int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback=false);
int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx);
void cache_kick_upload_minio_end(struct tango_cache_ctx *ctx);
bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len);
int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size);
int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf);
int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool call_back=false);
int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, bool call_back=false);
int tango_cache_fetch_start(struct tango_cache_ctx *ctx);

Binary file not shown.

View File

@@ -76,6 +76,10 @@ void get_future_success(future_result_t* result, void * user)
void get_future_failed(enum e_future_error err, const char * what, void * user)
{
struct future_pdata *pdata = (struct future_pdata *)user;
future_destroy(pdata->future);
fclose(pdata->fp);
free(pdata);
printf("GET fail: %s\n", what);
}
@@ -110,6 +114,10 @@ void head_future_success(future_result_t* result, void * user)
void head_future_failed(enum e_future_error err, const char * what, void * user)
{
struct future_pdata *pdata = (struct future_pdata *)user;
future_destroy(pdata->future);
free(pdata);
printf("HEAD fail: %s\n", what);
}
@@ -204,8 +212,6 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg)
struct future_pdata *pdata;
struct tango_cache_ctx *ctx;
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
do
{
@@ -230,23 +236,30 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg)
if(!strcasecmp(p, "GET"))
{
sprintf(filename, "file_index_%u.bin", index++);
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
pdata->fp = fopen(filename, "w");
pdata->future = future_create(get_future_success, get_future_failed, pdata);
tango_cache_fetch_object(tango_instance, pdata->future, &getmeta);
if(tango_cache_fetch_object(tango_instance, pdata->future, &getmeta) < 0)
{
get_future_failed(FUTURE_ERROR_CANCEL, "", pdata);
}
}
else if(!strcasecmp(p, "HEAD"))
{
sprintf(filename, "file_index_%u.bin", index++);
pdata->fp = fopen(filename, "w");
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
pdata->future = future_create(head_future_success, head_future_failed, pdata);
tango_cache_head_object(tango_instance, pdata->future, &getmeta);
if(tango_cache_head_object(tango_instance, pdata->future, &getmeta) < 0)
{
head_future_failed(FUTURE_ERROR_CANCEL, "", pdata);
}
}
else if(!strcasecmp(p, "PUTONCE"))
{
size_t filelen;
p = get_file_content(s, &filelen);
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
pdata->future = future_create(put_future_success, put_future_failed, pdata);
tango_cache_upload_once_data(tango_instance, pdata->future, PUT_MEM_FREE, p, filelen, &putmeta, pdata->filename, 256);
@@ -254,6 +267,7 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg)
else if(!strcasecmp(p, "PUTONCEEV"))
{
size_t readlen;
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
pdata->future = future_create(put_future_success, put_future_failed, pdata);
struct evbuffer *evbuf = evbuffer_new();
char buffer[1024];
@@ -273,12 +287,14 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg)
}
else if(!strcasecmp(p, "DEL"))
{
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
pdata->future = future_create(del_future_success, del_future_failed, pdata);
sprintf(pdata->filename, "%s", s);
tango_cache_delete_object(tango_instance, pdata->future, s);
}
else if(!strcasecmp(p, "DELMUL")) //TODO
{
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
pdata->future = future_create(del_future_success, del_future_failed, pdata);
sprintf(pdata->filename, "%s", s);
@@ -290,9 +306,15 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg)
}
else
{
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
pdata->future = future_create(put_future_success, put_future_failed, pdata);
ctx = tango_cache_update_start(tango_instance, pdata->future, &putmeta);
if(ctx==NULL)
{
put_future_failed(FUTURE_ERROR_CANCEL, "NULL", pdata);
continue;
}
tango_cache_get_object_path(ctx, pdata->filename, 256);
FILE *fp = fopen(s, "r");
@@ -302,6 +324,7 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg)
assert(n>=0);
tango_cache_update_frag_data(ctx, buffer, n);
}
fclose(fp);
tango_cache_update_end(ctx);
}
}