diff --git a/cache/cache_evbase_client.cpp b/cache/cache_evbase_client.cpp index f880a4b..40de5c2 100644 --- a/cache/cache_evbase_client.cpp +++ b/cache/cache_evbase_client.cpp @@ -115,13 +115,13 @@ static int32_t iothread_notify_event(int32_t socket_fd, void *content, int32_t l } if(res <= 0) { - printf("log_error: select io res=%d, error: %s", res, strerror(errno)); + printf("log_error: select io res=%d, error: %s\n", res, strerror(errno)); return -1; } if(FD_ISSET(socket_fd, &e_set)) { - printf("log_error: select io is in efds, error: %s", strerror(errno)); + printf("log_error: select io is in efds, error: %s\n", strerror(errno)); return -2; } @@ -250,15 +250,11 @@ void cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn) buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_UPLOAD_END; - if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) + if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { if(!ctx_asyn->ctx->fail_state) { - ctx_asyn->ctx->fail_state = true; - if(ctx_asyn->ctx->future != NULL) - { - promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error"); - } + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); } tango_cache_ctx_destroy(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); @@ -289,12 +285,11 @@ int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_ME buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG_DATA; - if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) + if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { - ctx_asyn->ctx->fail_state = true; - if(ctx_asyn->ctx->future != NULL) + if(!ctx_asyn->ctx->fail_state) { - promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error"); + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); } free(buffer->data); free(buffer); @@ -317,12 +312,11 @@ int cache_evbase_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evb buffer->evbuf = evbuffer_new(); evbuffer_add_buffer(buffer->evbuf, evbuf); - if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) + if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { - ctx_asyn->ctx->fail_state = true; - if(ctx_asyn->ctx->future != NULL) + if(!ctx_asyn->ctx->fail_state) { - promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error"); + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); } evbuffer_free(buffer->evbuf); free(buffer); @@ -352,9 +346,9 @@ struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance buffer->cmd_type = CACHE_ASYN_UPLOAD_START; //事件通知仅为了增加统计信息 - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) + if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { - ctx_asyn->ctx->fail_state = true; + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); tango_cache_ctx_destroy(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); @@ -398,11 +392,11 @@ int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_UPLOAD_ONCE_DATA; - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) + if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { free(buffer->data); free(buffer); - ctx_asyn->ctx->fail_state = true; + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); tango_cache_ctx_destroy(ctx); cache_asyn_ctx_destroy(ctx_asyn); return -2; @@ -437,11 +431,11 @@ int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struc buffer->evbuf = evbuffer_new(); evbuffer_add_buffer(buffer->evbuf, evbuf); - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) + if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { evbuffer_free(buffer->evbuf); free(buffer); - ctx_asyn->ctx->fail_state = true; + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); tango_cache_ctx_destroy(ctx); cache_asyn_ctx_destroy(ctx_asyn); return -2; @@ -467,8 +461,10 @@ int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct fut buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_FETCH; - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) + if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); + promise_failed(future_to_promise(future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx_asyn->ctx)); tango_cache_ctx_destroy(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); @@ -496,9 +492,13 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu buffer->cmd_type = CACHE_ASYN_DELETE; //参考Unix高级编程432页关于多线程写的安全性描述 - if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) + if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { - ctx_asyn->ctx->fail_state = true; + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); + if(future != NULL) + { + promise_failed(future_to_promise(future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx_asyn->ctx)); + } tango_cache_ctx_destroy(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); diff --git a/cache/include/tango_cache_client.h b/cache/include/tango_cache_client.h index 90e0f28..268cdf5 100644 --- a/cache/include/tango_cache_client.h +++ b/cache/include/tango_cache_client.h @@ -17,6 +17,8 @@ enum CACHE_ERR_CODE CACHE_OUTOF_MEMORY,//当前内存占用超过限制,查看MAX_USED_MEMORY_SIZE_MB是否过小或者当前上传速率跟不上调用者的速率 CACHE_ERR_CURL, CACHE_ERR_WIREDLB, + CACHE_ERR_SOCKPAIR, + CACHE_ERR_INTERNAL, }; enum PUT_MEMORY_COPY_WAY diff --git a/cache/pangu_tango_cache.a b/cache/pangu_tango_cache.a index 0094000..20e2fea 100644 Binary files a/cache/pangu_tango_cache.a and b/cache/pangu_tango_cache.a differ diff --git a/cache/tango_cache_client.cpp b/cache/tango_cache_client.cpp index f25177e..2938552 100644 --- a/cache/tango_cache_client.cpp +++ b/cache/tango_cache_client.cpp @@ -54,6 +54,26 @@ enum CACHE_ERR_CODE tango_cache_ctx_error(const struct tango_cache_instance *ins return instance->error_code; } +void tango_cache_set_fail_state(struct tango_cache_ctx *ctx, enum CACHE_ERR_CODE error_code) +{ + ctx->fail_state = true; + ctx->error_code = error_code; +} + +const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx) +{ + switch(ctx->error_code) + { + case CACHE_CACHE_MISS: return "cache not hit"; + case CACHE_TIMEOUT: return "cache not fresh"; + case CACHE_OUTOF_MEMORY:return "outof memory"; + case CACHE_ERR_WIREDLB: return "wiredlb error"; + case CACHE_ERR_SOCKPAIR:return "socketpair error"; + case CACHE_ERR_INTERNAL:return "cache Expires or x-amz-meta-lm not found"; + default: return ctx->error; + } +} + void tango_cache_get_statistics(const struct tango_cache_instance *instance, struct cache_statistics *out) { out->get_recv_num = instance->statistic.get_recv_num; @@ -182,7 +202,7 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx) { if(ctx->fail_state) { - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, ctx->error); + promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); } else { @@ -313,7 +333,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * { ctx->headers = curl_slist_append(ctx->headers, "Content-Type:"); } - ctx->headers = curl_slist_append(ctx->headers, "Expect:"); + //ctx->headers = curl_slist_append(ctx->headers, "Expect:"); //不可以添加?curl_multi_socket_action会卡住 //其他定义的头部,GET时会原样返回 if(meta->usertag_len>0 && meta->usertag_len<=USER_TAG_MAX_LEN) { diff --git a/cache/tango_cache_client_in.h b/cache/tango_cache_client_in.h index 2c6295d..3c2ab00 100644 --- a/cache/tango_cache_client_in.h +++ b/cache/tango_cache_client_in.h @@ -130,6 +130,8 @@ void easy_string_savedata(struct easy_string *estr, const char *data, size_t len void easy_string_destroy(struct easy_string *estr); void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx); +void tango_cache_set_fail_state(struct tango_cache_ctx *ctx, enum CACHE_ERR_CODE error_code); +const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx); struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta); struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta); diff --git a/cache/tango_cache_transfer.cpp b/cache/tango_cache_transfer.cpp index fd76937..8945abb 100644 --- a/cache/tango_cache_transfer.cpp +++ b/cache/tango_cache_transfer.cpp @@ -157,8 +157,7 @@ static size_t curl_write_uploadID_cb(void *ptr, size_t size, size_t count, void code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code); if(code != CURLE_OK || ctx->res_code!=200L) { - ctx->fail_state = true; - ctx->error_code = CACHE_ERR_CURL; + tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); if(code != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error); return size*count; } @@ -182,6 +181,7 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx) snprintf(minio_url, 256, "http://%s/%s/%s?uploads", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key); curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L); curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); + curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache"); curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL,1L); curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_write_uploadID_cb); @@ -321,8 +321,7 @@ bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block if(ret <= 0) { - ctx->fail_state = true; - ctx->error_code = CACHE_ERR_CURL; + tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); return false; } return true; @@ -339,8 +338,7 @@ int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx) ret = http_put_bodypart_request_evbuf(ctx, true); if(ret <= 0) { - ctx->fail_state = true; - ctx->error_code = CACHE_ERR_CURL; + tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); tango_cache_ctx_destroy(ctx); } } @@ -381,8 +379,7 @@ int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) } else { - ctx->fail_state = true; - ctx->error_code = CACHE_ERR_CURL; + tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); tango_cache_ctx_destroy(ctx); } } @@ -391,8 +388,7 @@ int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) ret = http_put_bodypart_request_evbuf(ctx, false); if(ret <= 0) { - ctx->fail_state = true; - ctx->error_code = CACHE_ERR_CURL; + tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); if(cache_cancel_upload_minio(ctx)) { ctx->put.state = PUT_STATE_CANCEL; @@ -422,8 +418,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r if(res!=CURLE_OK||res_code!=200L|| ctx->fail_state || !parse_uploadID_xml(ctx->response.buff, ctx->response.len, &ctx->put.uploadID)) { easy_string_destroy(&ctx->response); - ctx->error_code = CACHE_ERR_CURL; - ctx->fail_state = true; + tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error); if(ctx->put.close_state) { @@ -452,8 +447,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r case PUT_STATE_PART: if(res != CURLE_OK || res_code!=200L) { - ctx->fail_state = true; - ctx->error_code = CACHE_ERR_CURL; + tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error); } if(ctx->fail_state) @@ -491,8 +485,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r case PUT_STATE_END: if(res != CURLE_OK || res_code!=200L) { - ctx->fail_state = true; - ctx->error_code = CACHE_ERR_CURL; + tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error); } tango_cache_ctx_destroy(ctx); @@ -600,8 +593,8 @@ void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long r { if(res!=CURLE_OK || res_code!=200L) { - ctx->error_code = (res!=CURLE_OK)?CACHE_ERR_CURL:CACHE_CACHE_MISS; - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, ctx->error); + tango_cache_set_fail_state(ctx, (res!=CURLE_OK)?CACHE_ERR_CURL:CACHE_CACHE_MISS); + promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); } else { @@ -641,10 +634,9 @@ static size_t curl_get_response_body_cb(void *ptr, size_t size, size_t count, vo if(ctx->get.need_hdrs!=RESPONSE_HDR_ALL) //无Expires时 { - ctx->fail_state = true; - ctx->error_code = CACHE_ERR_CURL; + tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL); ctx->get.state = GET_STATE_DELETE; - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache Expires or x-amz-meta-lm not found"); + promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); return size*count; } @@ -680,11 +672,10 @@ static bool check_expires_header(struct tango_cache_ctx *ctx, const char *expire if(time_gmt > ctx->get.expires) //缓存失效;TODO relative_age的含义是啥 { - ctx->fail_state = true; - ctx->error_code = CACHE_TIMEOUT; + tango_cache_set_fail_state(ctx, CACHE_TIMEOUT); ctx->get.state = GET_STATE_DELETE; //缓存失效时在下载完毕时触发删除动作 easy_string_destroy(&ctx->response); - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache not fresh"); + promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); return false; } return true; @@ -700,10 +691,9 @@ static bool check_fresh_header(struct tango_cache_ctx *ctx) now_gmt = get_gmtime_timestamp(time(NULL)); if(ctx->get.last_modify+ctx->get.max_age > now_gmt || now_gmt+ctx->get.min_fresh>ctx->get.expires) { - ctx->fail_state = true; - ctx->error_code = CACHE_TIMEOUT; + tango_cache_set_fail_state(ctx, CACHE_TIMEOUT); easy_string_destroy(&ctx->response); - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache not fresh"); + promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); return false; } return true; @@ -716,9 +706,8 @@ static bool check_get_result_code(struct tango_cache_ctx *ctx) code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code); if(code != CURLE_OK || ctx->res_code!=200L) { - ctx->fail_state = true; - ctx->error_code = (code!=CURLE_OK)?CACHE_ERR_CURL:CACHE_CACHE_MISS; - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, (code!=CURLE_OK)?ctx->error:"cache not hit"); + tango_cache_set_fail_state(ctx, (code!=CURLE_OK)?CACHE_ERR_CURL:CACHE_CACHE_MISS); + promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); return false; } return true;