diff --git a/cache/README.txt b/cache/README.txt index 62589e8..1360813 100644 --- a/cache/README.txt +++ b/cache/README.txt @@ -1,3 +1,3 @@ -1、HEAD操作支持从minio和redis两种渠道获取,默认情况下从minio获取。 -2、若想从redis获取,编译时加宏定义-DHEAD_OBJECT_FROM_REDIS -3、使用的redis客户端是自己修改过的版本(support下提供)。 +1、HEAD操作支持从minio和redis两种渠道获取,根据配置文件决定。 + +2、使用的redis客户端是自己修改过的版本(vendor下提供)。 diff --git a/cache/include/tango_cache_client.h b/cache/include/tango_cache_client.h index 1e45187..df60254 100644 --- a/cache/include/tango_cache_client.h +++ b/cache/include/tango_cache_client.h @@ -21,6 +21,7 @@ enum CACHE_ERR_CODE CACHE_ERR_INTERNAL, CACHE_ERR_REDIS_JSON, CACHE_ERR_REDIS_CONNECT, + CACHE_OUTOF_SESSION, }; enum PUT_MEMORY_COPY_WAY @@ -81,7 +82,7 @@ enum CACHE_HTTP_HDR_TYPE struct tango_cache_meta_get { - const char* url; //缓存:URI;非结构化日志:文件路径名。不要以'/'开头;CACHE_OBJECT_KEY_HASH_SWITCH=0时最大长度256字节,=1时无限制 + const char* url; //缓存:URL;非结构化日志:文件路径名。CACHE_OBJECT_KEY_HASH_SWITCH=0时最大长度256字节,=1时无限制 struct request_freshness get; }; diff --git a/cache/src/cache_evbase_client.cpp b/cache/src/cache_evbase_client.cpp index 4ce8799..5383384 100644 --- a/cache/src/cache_evbase_client.cpp +++ b/cache/src/cache_evbase_client.cpp @@ -153,36 +153,47 @@ static void cache_asyn_ctx_destroy(struct cache_evbase_ctx *ctx_asyn) static void cache_asyn_ioevent_dispatch(struct databuffer *buffer) { struct cache_evbase_ctx *ctx_asyn=buffer->ctx_asyn; + struct future *f; + int ret=0; switch(buffer->cmd_type) { case CACHE_ASYN_FETCH: - tango_cache_fetch_start(ctx_asyn->ctx); + f = ctx_asyn->ctx->future; + if(tango_cache_fetch_start(ctx_asyn->ctx) < 0) + { + promise_failed(future_to_promise(f), FUTURE_ERROR_CANCEL, "CACHE_ASYN_FETCH failed"); + } cache_asyn_ctx_destroy(ctx_asyn); break; case CACHE_ASYN_HEAD: + f = ctx_asyn->ctx->future; if(ctx_asyn->instance_asyn->instance->head_meta_source == HEAD_META_FROM_REDIS) { - tango_cache_head_redis(ctx_asyn->ctx); + ret = tango_cache_head_redis(ctx_asyn->ctx); } else { - tango_cache_fetch_start(ctx_asyn->ctx); - } + ret = tango_cache_fetch_start(ctx_asyn->ctx); + } + if(ret<0) + { + promise_failed(future_to_promise(f), FUTURE_ERROR_CANCEL, "CACHE_ASYN_HEAD failed"); + } cache_asyn_ctx_destroy(ctx_asyn); break; case CACHE_ASYN_DELETE: - cache_delete_minio_object(ctx_asyn->ctx); + cache_delete_minio_object(ctx_asyn->ctx, true); cache_asyn_ctx_destroy(ctx_asyn); break; case CACHE_ASYN_UPLOAD_ONCE_DATA: - tango_cache_upload_once_start_data(ctx_asyn->ctx, PUT_MEM_FREE, buffer->data, buffer->size); + tango_cache_upload_once_start_data(ctx_asyn->ctx, PUT_MEM_FREE, buffer->data, buffer->size, true); cache_asyn_ctx_destroy(ctx_asyn); break; case CACHE_ASYN_UPLOAD_ONCE_EVBUF: - tango_cache_upload_once_start_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf); + tango_cache_upload_once_start_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf, true); evbuffer_free(buffer->evbuf); cache_asyn_ctx_destroy(ctx_asyn); break; @@ -361,8 +372,8 @@ struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance //事件通知仅为了增加统计信息 if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx_asyn->ctx); + instance->instance->error_code = CACHE_ERR_SOCKPAIR; + tango_cache_ctx_destroy(ctx_asyn->ctx, false); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); return NULL; @@ -409,8 +420,8 @@ int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct { free(buffer->data); free(buffer); - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx); + instance->instance->error_code = CACHE_ERR_SOCKPAIR; + tango_cache_ctx_destroy(ctx, false); cache_asyn_ctx_destroy(ctx_asyn); return -2; } @@ -448,8 +459,8 @@ int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struc { evbuffer_free(buffer->evbuf); free(buffer); - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - tango_cache_ctx_destroy(ctx); + instance->instance->error_code = CACHE_ERR_SOCKPAIR; + tango_cache_ctx_destroy(ctx, false); cache_asyn_ctx_destroy(ctx_asyn); return -2; } @@ -476,9 +487,8 @@ int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct fut if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - promise_failed(future_to_promise(f), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx_asyn->ctx)); - tango_cache_ctx_destroy(ctx_asyn->ctx); + instance->instance->error_code = CACHE_ERR_SOCKPAIR; + tango_cache_ctx_destroy(ctx_asyn->ctx, false); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); return -2; @@ -506,9 +516,8 @@ int cache_evbase_head_object(struct cache_evbase_instance *instance, struct futu if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - promise_failed(future_to_promise(f), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx_asyn->ctx)); - tango_cache_ctx_destroy(ctx_asyn->ctx); + instance->instance->error_code = CACHE_ERR_SOCKPAIR; + tango_cache_ctx_destroy(ctx_asyn->ctx, false); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); return -2; @@ -537,12 +546,8 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu //参考Unix高级编程432页关于多线程写的安全性描述 if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { - tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); - if(f != NULL) - { - promise_failed(future_to_promise(f), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx_asyn->ctx)); - } - tango_cache_ctx_destroy(ctx_asyn->ctx); + instance->instance->error_code = CACHE_ERR_SOCKPAIR; + tango_cache_ctx_destroy(ctx_asyn->ctx, false); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); return -2; diff --git a/cache/src/tango_cache_client.cpp b/cache/src/tango_cache_client.cpp index 0efec36..c6fa89d 100644 --- a/cache/src/tango_cache_client.cpp +++ b/cache/src/tango_cache_client.cpp @@ -94,6 +94,7 @@ const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx) case CACHE_ERR_INTERNAL:return "internal error"; case CACHE_ERR_REDIS_JSON:return "parse redis json error"; case CACHE_ERR_REDIS_CONNECT:return "redis is not connected"; + case CACHE_OUTOF_SESSION:return "two many curl sessions"; default: return ctx->error; } } @@ -194,7 +195,8 @@ void easy_string_savedata(struct easy_string *estr, const char *data, size_t len estr->buff[estr->len]='\0'; } -void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx) +//callback: 是否调用回调函数,主要为解决直接调用API时失败,不再调用回调,而是通过返回值判断 +void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback) { struct multipart_etag_list *etag; @@ -220,7 +222,7 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx) ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf); evbuffer_free(ctx->put.evbuf); } - TAILQ_FOREACH(etag, &ctx->put.etag_head, node) + while(NULL != (etag = TAILQ_FIRST(&ctx->put.etag_head))) { TAILQ_REMOVE(&ctx->put.etag_head, etag, node); free(etag->etag); @@ -232,7 +234,7 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx) curl_slist_free_all(ctx->headers); }//no break here case CACHE_REQUEST_DELETE: - if(ctx->future != NULL) + if(callback && ctx->future != NULL) { if(ctx->fail_state) { @@ -263,6 +265,7 @@ int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, } if(evbuffer_add(ctx->put.evbuf, data, size)) { + tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); return -1; } ctx->instance->statistic.memory_used += size; @@ -287,6 +290,7 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP { if(evbuffer_add_buffer(ctx->put.evbuf, evbuf)) { + tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); return -1; } } @@ -294,6 +298,7 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP { if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf)) { + tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); return -1; } } @@ -311,7 +316,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * char buffer[2064]; time_t expires, now, last_modify; - if((u_int64_t)instance->statistic.memory_used >= instance->cache_limit_size) + if((u_int64_t)instance->statistic.memory_used>=instance->cache_limit_size || instance->statistic.session_num>=instance->max_session_num) { instance->error_code = CACHE_OUTOF_MEMORY; instance->statistic.totaldrop_num += 1; @@ -441,6 +446,13 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i struct tango_cache_ctx *ctx; char sha256[72]; + if(instance->head_meta_source!=HEAD_META_FROM_REDIS && instance->statistic.session_num>=instance->max_session_num) + { + instance->error_code = CACHE_OUTOF_SESSION; + instance->statistic.totaldrop_num += 1; + return NULL; + } + ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); ctx->instance = instance; ctx->future = f; @@ -477,7 +489,7 @@ int tango_cache_fetch_object(struct tango_cache_instance *instance, struct futur { return -1; } - return tango_cache_fetch_start(ctx); + return (tango_cache_fetch_start(ctx)==1)?0:-1; } int tango_cache_head_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta) @@ -496,7 +508,7 @@ int tango_cache_head_object(struct tango_cache_instance *instance, struct future } else { - return tango_cache_fetch_start(ctx); + return (tango_cache_fetch_start(ctx)==1)?0:-1; } } @@ -505,6 +517,13 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance * struct tango_cache_ctx *ctx; char sha256[72]; + if(instance->statistic.session_num >= instance->max_session_num) + { + instance->error_code = CACHE_OUTOF_SESSION; + instance->statistic.totaldrop_num += 1; + return NULL; + } + ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); ctx->instance = instance; ctx->future = f; @@ -546,6 +565,13 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst struct tango_cache_ctx *ctx; char md5[48]={0}, content_md5[48]; + if(instance->statistic.session_num >= instance->max_session_num) + { + instance->error_code = CACHE_OUTOF_SESSION; + instance->statistic.totaldrop_num += 1; + return NULL; + } + ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); ctx->instance = instance; ctx->future = f; @@ -755,9 +781,12 @@ static int load_local_configure(struct tango_cache_instance *instance, const cha MESA_load_profile_uint_def(profile_path, section, "MAX_CONNECTION_PER_HOST", &intval, 0); instance->max_cnn_host = intval; + MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", &instance->max_session_num, 200); MESA_load_profile_uint_def(profile_path, section, "MAX_USED_MEMORY_SIZE_MB", &intval, 5120); longval = intval; instance->cache_limit_size = longval * 1024 * 1024; + MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_TRANSFER_TIMEOUT_S", &intval, 15); + instance->transfer_timeout = intval; if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", instance->bucketname, 256) < 0) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BUCKET_NAME not found.\n", profile_path, section); diff --git a/cache/src/tango_cache_client_in.h b/cache/src/tango_cache_client_in.h index bd82dbc..a9b7688 100644 --- a/cache/src/tango_cache_client_in.h +++ b/cache/src/tango_cache_client_in.h @@ -70,6 +70,8 @@ struct tango_cache_instance time_t relative_ttl; //缓存的相对有效期 u_int64_t cache_limit_size; long max_cnn_host; + long transfer_timeout;//传输总时间限制 + u_int32_t max_session_num; u_int32_t upload_block_size; //minio分段上传块的最小长度 enum CACHE_ERR_CODE error_code; @@ -154,8 +156,9 @@ void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_ void easy_string_savedata(struct easy_string *estr, const char *data, size_t len); void easy_string_destroy(struct easy_string *estr); -void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx); +void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback=true); void tango_cache_set_fail_state(struct tango_cache_ctx *ctx, enum CACHE_ERR_CODE error_code); +const char *tango_cache_errcode2str(enum CACHE_ERR_CODE err_code); const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx); struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta); diff --git a/cache/src/tango_cache_redis.cpp b/cache/src/tango_cache_redis.cpp index 198da85..e234011 100644 --- a/cache/src/tango_cache_redis.cpp +++ b/cache/src/tango_cache_redis.cpp @@ -11,7 +11,7 @@ #include #include #include -#include +#include #include "tango_cache_transfer.h" #include "tango_cache_tools.h" @@ -39,7 +39,7 @@ struct http_hdr_name g_http_hdr_name[HDR_CONTENT_NUM]= {"content-md5", "Content-MD5: "} }; -void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status) +static void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status) { struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac); @@ -54,7 +54,7 @@ void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status) instance->redis_connecting = CACHE_REDIS_DISCONNECTED; } -void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status) +static void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status) { struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac); @@ -85,7 +85,7 @@ int redis_asyn_connect_init(struct tango_cache_instance *instance) return 0; } -int parse_minio_events_json(struct tango_cache_ctx *ctx, const char *jcontent) +static int parse_minio_events_json(struct tango_cache_ctx *ctx, const char *jcontent) { cJSON *root, *pobject = NULL, *ptarget, *plastMod, *pexpires; int ret = PARSE_JSON_RET_ERROR; @@ -160,7 +160,7 @@ out_json: return ret; } -void redis_hget_command_cb(struct redisAsyncContext *ac, void *vreply, void *privdata) +static void redis_hget_command_cb(struct redisAsyncContext *ac, void *vreply, void *privdata) { redisReply *reply = (redisReply *)vreply; struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)privdata; diff --git a/cache/src/tango_cache_transfer.cpp b/cache/src/tango_cache_transfer.cpp index 87d870e..20dd2f6 100644 --- a/cache/src/tango_cache_transfer.cpp +++ b/cache/src/tango_cache_transfer.cpp @@ -127,6 +127,7 @@ static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool ful curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L); + curl_easy_setopt(ctx->curl, CURLOPT_TIMEOUT, ctx->instance->transfer_timeout); //测试发现有某链接接收卡住的情况 curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L); curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 100L); @@ -200,7 +201,7 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx) return 1; } -int cache_delete_minio_object(struct tango_cache_ctx *ctx) +int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back) { CURLMcode rc; char minio_url[256]; @@ -208,8 +209,9 @@ int cache_delete_minio_object(struct tango_cache_ctx *ctx) ctx->instance->statistic.del_recv_num += 1; if(NULL == (ctx->curl=curl_easy_init())) { - tango_cache_ctx_destroy(ctx); //终结者 - return 0; + tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); + tango_cache_ctx_destroy(ctx, call_back); //终结者 + return -1; } snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key); @@ -330,9 +332,10 @@ bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block return true; } -int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx) +//callback直接失败是否调用回调函数,流式需要,完整一次性不需要 +static int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx, bool callback) { - int ret=0; + int ret=-1; ctx->put.state = PUT_STATE_END; ctx->put.upload_length = evbuffer_get_length(ctx->put.evbuf); @@ -342,33 +345,30 @@ int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx) if(ret <= 0) { tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - tango_cache_ctx_destroy(ctx); + tango_cache_ctx_destroy(ctx, callback); } } else { - tango_cache_ctx_destroy(ctx); + tango_cache_ctx_destroy(ctx, callback); } - return ret; } -int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) +void cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) { - int ret = 0; - DBG_CACHE("state: %d, key: %s, curl %s NULL\n", ctx->put.state, ctx->object_key, (ctx->curl==NULL)?"is":"is not"); ctx->put.close_state = true;//仅设置状态,并非真正关闭;内部状态机轮转结束后再关闭 if(ctx->fail_state) { tango_cache_ctx_destroy(ctx); - return 0; + return; } switch(ctx->put.state) { case PUT_STATE_START: - ret = http_put_complete_part_evbuf(ctx); + http_put_complete_part_evbuf(ctx, true); break; case PUT_STATE_PART: @@ -387,20 +387,16 @@ int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) tango_cache_ctx_destroy(ctx); } } - else + else if(http_put_bodypart_request_evbuf(ctx, false) <= 0) { - ret = http_put_bodypart_request_evbuf(ctx, false); - if(ret <= 0) + tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); + if(cache_cancel_upload_minio(ctx)) { - tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - if(cache_cancel_upload_minio(ctx)) - { - ctx->put.state = PUT_STATE_CANCEL; - } - else - { - tango_cache_ctx_destroy(ctx); - } + ctx->put.state = PUT_STATE_CANCEL; + } + else + { + tango_cache_ctx_destroy(ctx); } } } @@ -410,8 +406,6 @@ int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) case PUT_STATE_WAIT_START: //此时未获取到uploadId,所以无法触发上传 default: break; } - - return ret; } void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) @@ -496,7 +490,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r } } -int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size) +int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback) { CURLMcode rc; char minio_url[256]; @@ -505,11 +499,9 @@ int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEM ctx->instance->error_code = CACHE_OK; if(NULL == (ctx->curl=curl_easy_init())) { - tango_cache_ctx_destroy(ctx); - if(way == PUT_MEM_FREE) - { - free((void *)data); - } + tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); + tango_cache_ctx_destroy(ctx, callback); + if(way == PUT_MEM_FREE) free((void *)data); return -1; } ctx->put.state = PUT_STATE_END; @@ -524,6 +516,7 @@ int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEM curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L); + curl_easy_setopt(ctx->curl, CURLOPT_TIMEOUT, ctx->instance->transfer_timeout); //测试发现有某链接接收卡住的情况 curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L); curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 100L); @@ -550,7 +543,7 @@ int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEM return 0; } -int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf) +int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, bool callback) { size_t size; @@ -562,6 +555,8 @@ int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFF { if(evbuffer_add_buffer(ctx->put.evbuf, evbuf)) { + tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); + tango_cache_ctx_destroy(ctx, callback); return -1; } } @@ -569,12 +564,14 @@ int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFF { if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf)) { + tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); + tango_cache_ctx_destroy(ctx, callback); return -1; } } ctx->instance->statistic.memory_used += size; - return http_put_complete_part_evbuf(ctx); + return http_put_complete_part_evbuf(ctx, callback); } void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) @@ -616,7 +613,7 @@ void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, lon tango_cache_ctx_destroy(ctx); } -int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx) +int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback) { CURLMcode rc; char minio_url[256]; @@ -625,7 +622,8 @@ int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx) ctx->instance->error_code = CACHE_OK; if(NULL == (ctx->curl=curl_easy_init())) { - tango_cache_ctx_destroy(ctx); + tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); + tango_cache_ctx_destroy(ctx, callback); return -1; } @@ -888,6 +886,7 @@ int tango_cache_fetch_start(struct tango_cache_ctx *ctx) curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L); + curl_easy_setopt(ctx->curl, CURLOPT_TIMEOUT, ctx->instance->transfer_timeout); //测试发现有某链接接收卡住的情况 curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_get_response_header_cb); curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx); //ctx->error="Operation too slow. Less than 1024 bytes/sec transferred the last 3 seconds" @@ -896,6 +895,6 @@ int tango_cache_fetch_start(struct tango_cache_ctx *ctx) rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); assert(rc==CURLM_OK); - return 0; + return 1; } diff --git a/cache/src/tango_cache_transfer.h b/cache/src/tango_cache_transfer.h index 9c85eb5..c5db9d5 100644 --- a/cache/src/tango_cache_transfer.h +++ b/cache/src/tango_cache_transfer.h @@ -14,14 +14,14 @@ void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long r void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code); void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code); -int cache_delete_minio_object(struct tango_cache_ctx *ctx); -int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx); +int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back=false); +int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback=false); -int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx); +void cache_kick_upload_minio_end(struct tango_cache_ctx *ctx); bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len); -int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size); -int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf); +int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool call_back=false); +int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, bool call_back=false); int tango_cache_fetch_start(struct tango_cache_ctx *ctx); diff --git a/cache/test/lib/libhiredis.a b/cache/test/lib/libhiredis.a index 26434df..670ea58 100644 Binary files a/cache/test/lib/libhiredis.a and b/cache/test/lib/libhiredis.a differ diff --git a/cache/test/tango_cache_test.c b/cache/test/tango_cache_test.c index c24ac41..560673c 100644 --- a/cache/test/tango_cache_test.c +++ b/cache/test/tango_cache_test.c @@ -76,6 +76,10 @@ void get_future_success(future_result_t* result, void * user) void get_future_failed(enum e_future_error err, const char * what, void * user) { + struct future_pdata *pdata = (struct future_pdata *)user; + future_destroy(pdata->future); + fclose(pdata->fp); + free(pdata); printf("GET fail: %s\n", what); } @@ -110,6 +114,10 @@ void head_future_success(future_result_t* result, void * user) void head_future_failed(enum e_future_error err, const char * what, void * user) { + struct future_pdata *pdata = (struct future_pdata *)user; + + future_destroy(pdata->future); + free(pdata); printf("HEAD fail: %s\n", what); } @@ -204,8 +212,6 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) struct future_pdata *pdata; struct tango_cache_ctx *ctx; - - pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); do { @@ -230,23 +236,30 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) if(!strcasecmp(p, "GET")) { sprintf(filename, "file_index_%u.bin", index++); + pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->fp = fopen(filename, "w"); pdata->future = future_create(get_future_success, get_future_failed, pdata); - tango_cache_fetch_object(tango_instance, pdata->future, &getmeta); + if(tango_cache_fetch_object(tango_instance, pdata->future, &getmeta) < 0) + { + get_future_failed(FUTURE_ERROR_CANCEL, "", pdata); + } } else if(!strcasecmp(p, "HEAD")) { - sprintf(filename, "file_index_%u.bin", index++); - pdata->fp = fopen(filename, "w"); + pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->future = future_create(head_future_success, head_future_failed, pdata); - tango_cache_head_object(tango_instance, pdata->future, &getmeta); + if(tango_cache_head_object(tango_instance, pdata->future, &getmeta) < 0) + { + head_future_failed(FUTURE_ERROR_CANCEL, "", pdata); + } } else if(!strcasecmp(p, "PUTONCE")) { size_t filelen; p = get_file_content(s, &filelen); + pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->future = future_create(put_future_success, put_future_failed, pdata); tango_cache_upload_once_data(tango_instance, pdata->future, PUT_MEM_FREE, p, filelen, &putmeta, pdata->filename, 256); @@ -254,6 +267,7 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) else if(!strcasecmp(p, "PUTONCEEV")) { size_t readlen; + pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->future = future_create(put_future_success, put_future_failed, pdata); struct evbuffer *evbuf = evbuffer_new(); char buffer[1024]; @@ -273,12 +287,14 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) } else if(!strcasecmp(p, "DEL")) { + pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->future = future_create(del_future_success, del_future_failed, pdata); sprintf(pdata->filename, "%s", s); tango_cache_delete_object(tango_instance, pdata->future, s); } else if(!strcasecmp(p, "DELMUL")) //TODO { + pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->future = future_create(del_future_success, del_future_failed, pdata); sprintf(pdata->filename, "%s", s); @@ -290,9 +306,15 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) } else { + pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->future = future_create(put_future_success, put_future_failed, pdata); ctx = tango_cache_update_start(tango_instance, pdata->future, &putmeta); + if(ctx==NULL) + { + put_future_failed(FUTURE_ERROR_CANCEL, "NULL", pdata); + continue; + } tango_cache_get_object_path(ctx, pdata->filename, 256); FILE *fp = fopen(s, "r"); @@ -302,6 +324,7 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) assert(n>=0); tango_cache_update_frag_data(ctx, buffer, n); } + fclose(fp); tango_cache_update_end(ctx); } } diff --git a/vendor/hiredis-0.14.0-modified.zip b/vendor/hiredis-0.14.0.zip similarity index 75% rename from vendor/hiredis-0.14.0-modified.zip rename to vendor/hiredis-0.14.0.zip index 509f420..06b0892 100644 Binary files a/vendor/hiredis-0.14.0-modified.zip and b/vendor/hiredis-0.14.0.zip differ