diff --git a/cache/cache_evbase_client.cpp b/cache/cache_evbase_client.cpp index f708d55..2e00a7e 100644 --- a/cache/cache_evbase_client.cpp +++ b/cache/cache_evbase_client.cpp @@ -21,8 +21,10 @@ enum CACHE_ASYN_CMD { CACHE_ASYN_FETCH=0, - CACHE_ASYN_UPLOAD_ONCE, - CACHE_ASYN_UPLOAD_FRAG, + CACHE_ASYN_UPLOAD_ONCE_DATA, + CACHE_ASYN_UPLOAD_ONCE_EVBUF, + CACHE_ASYN_UPLOAD_FRAG_DATA, + CACHE_ASYN_UPLOAD_FRAG_EVBUF, CACHE_ASYN_UPLOAD_END, }; @@ -30,6 +32,7 @@ struct databuffer { char *data; size_t size; + struct evbuffer *evbuf; enum CACHE_ASYN_CMD cmd_type; struct cache_evbase_ctx *ctx_asyn; }; @@ -149,15 +152,25 @@ static void cache_asyn_ioevent_dispatch(struct databuffer *buffer) cache_asyn_ctx_destroy(ctx_asyn); break; - case CACHE_ASYN_UPLOAD_ONCE: - tango_cache_upload_once_start(ctx_asyn->ctx, buffer->data, buffer->size); + case CACHE_ASYN_UPLOAD_ONCE_DATA: + tango_cache_upload_once_start_data(ctx_asyn->ctx, PUT_MEM_FREE, buffer->data, buffer->size); + cache_asyn_ctx_destroy(ctx_asyn); + break; + case CACHE_ASYN_UPLOAD_ONCE_EVBUF: + tango_cache_upload_once_start_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf); + evbuffer_free(buffer->evbuf); cache_asyn_ctx_destroy(ctx_asyn); break; - case CACHE_ASYN_UPLOAD_FRAG: - tango_cache_update_frag(ctx_asyn->ctx, buffer->data, buffer->size); + case CACHE_ASYN_UPLOAD_FRAG_DATA: + tango_cache_update_frag_data(ctx_asyn->ctx, buffer->data, buffer->size); free(buffer->data); break; + + case CACHE_ASYN_UPLOAD_FRAG_EVBUF: + tango_cache_update_frag_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf); + evbuffer_free(buffer->evbuf); + break; case CACHE_ASYN_UPLOAD_END: tango_cache_update_end(ctx_asyn->ctx); @@ -240,7 +253,7 @@ void cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn) } } -int cache_evbase_update_frag(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size) +int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size) { struct databuffer *buffer; @@ -250,7 +263,7 @@ int cache_evbase_update_frag(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_ } buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - if(way == PUT_ONCE_COPY) + if(way == PUT_MEM_COPY) { buffer->data = (char *)malloc(size); memcpy(buffer->data, data, size); @@ -261,7 +274,7 @@ int cache_evbase_update_frag(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_ } buffer->size = size; buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG; + buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG_DATA; if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { @@ -278,6 +291,34 @@ int cache_evbase_update_frag(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_ return 0; } +int cache_evbase_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evbuffer *evbuf) +{ + struct databuffer *buffer; + + if(ctx_asyn->ctx->fail_state) + { + return -1; + } + buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); + buffer->ctx_asyn = ctx_asyn; + buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG_EVBUF; + buffer->evbuf = evbuffer_new(); + evbuffer_add_buffer(buffer->evbuf, evbuf); + + if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) + { + ctx_asyn->ctx->fail_state = true; + if(ctx_asyn->ctx->future != NULL) + { + promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error"); + } + evbuffer_free(buffer->evbuf); + free(buffer); + return -1; + } + return 0; +} + struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance *instance, struct future* future, struct tango_cache_meta *meta) { struct cache_evbase_ctx *ctx_asyn; @@ -296,7 +337,8 @@ struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance return ctx_asyn; } -int cache_evbase_upload_once(struct cache_evbase_instance *instance, struct future* future, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta *meta, char *filename, size_t namelen) +int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct future* future, + enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta *meta, char *objectkey, size_t keysize) { struct cache_evbase_ctx *ctx_asyn; struct tango_cache_ctx *ctx; @@ -307,10 +349,9 @@ int cache_evbase_upload_once(struct cache_evbase_instance *instance, struct futu { return -1; } - ctx->way = PUT_ONCE_FREE; - if(filename != NULL) + if(objectkey != NULL) { - snprintf(filename, namelen, "%s", ctx->file_key); + snprintf(objectkey, keysize, "%s", ctx->file_key); } ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); @@ -318,7 +359,7 @@ int cache_evbase_upload_once(struct cache_evbase_instance *instance, struct futu ctx_asyn->ctx = ctx; buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); - if(way == PUT_ONCE_COPY) + if(way == PUT_MEM_COPY) { buffer->data = (char *)malloc(size); memcpy(buffer->data, data, size); @@ -329,7 +370,7 @@ int cache_evbase_upload_once(struct cache_evbase_instance *instance, struct futu } buffer->size = size; buffer->ctx_asyn = ctx_asyn; - buffer->cmd_type = CACHE_ASYN_UPLOAD_ONCE; + buffer->cmd_type = CACHE_ASYN_UPLOAD_ONCE_DATA; if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { @@ -347,6 +388,49 @@ int cache_evbase_upload_once(struct cache_evbase_instance *instance, struct futu return 0; } +int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struct future* future, + struct evbuffer *evbuf, struct tango_cache_meta *meta, char *objectkey, size_t keysize) +{ + struct cache_evbase_ctx *ctx_asyn; + struct tango_cache_ctx *ctx; + struct databuffer *buffer; + + ctx = tango_cache_update_prepare(instance->instance, future, meta); + if(ctx == NULL) + { + return -1; + } + if(objectkey != NULL) + { + snprintf(objectkey, keysize, "%s", ctx->file_key); + } + + ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); + ctx_asyn->instance_asyn = instance; + ctx_asyn->ctx = ctx; + + buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); + buffer->ctx_asyn = ctx_asyn; + buffer->cmd_type = CACHE_ASYN_UPLOAD_ONCE_EVBUF; + buffer->evbuf = evbuffer_new(); + evbuffer_add_buffer(buffer->evbuf, evbuf); + + if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) + { + ctx_asyn->ctx->fail_state = true; + if(ctx_asyn->ctx->future != NULL) + { + promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error"); + } + evbuffer_free(buffer->evbuf); + free(buffer); + tango_cache_ctx_destroy(ctx); + cache_asyn_ctx_destroy(ctx_asyn); + return -1; + } + return 0; +} + int cache_evbase_fetch(struct cache_evbase_instance *instance, struct future* future, struct tango_cache_meta *meta) { struct cache_evbase_ctx *ctx_asyn; diff --git a/cache/include/cache_evbase_client.h b/cache/include/cache_evbase_client.h index 0d2fabf..692f508 100644 --- a/cache/include/cache_evbase_client.h +++ b/cache/include/cache_evbase_client.h @@ -27,11 +27,19 @@ void cache_evbase_get_statistics(const struct cache_evbase_instance *instance, s struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path, const char* section, void *runtimelog); struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance *instance, struct future* future, struct tango_cache_meta *meta); -int cache_evbase_update_frag(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size); +int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size); +int cache_evbase_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evbuffer *evbuf); void cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn); const char *cache_evbase_get_object_key(struct cache_evbase_ctx *ctx_asyn); -int cache_evbase_upload_once(struct cache_evbase_instance *instance, struct future* future, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta *meta, char *filename, size_t namelen); +int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct future* future, + enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, + struct tango_cache_meta *meta, + char *objectkey, size_t keysize); +int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struct future* future, + struct evbuffer *evbuf, + struct tango_cache_meta *meta, + char *objectkey, size_t keysize); int cache_evbase_fetch(struct cache_evbase_instance *instance, struct future* future, struct tango_cache_meta *meta); diff --git a/cache/include/tango_cache_client.h b/cache/include/tango_cache_client.h index 17ab127..5fae905 100644 --- a/cache/include/tango_cache_client.h +++ b/cache/include/tango_cache_client.h @@ -18,8 +18,13 @@ enum CACHE_ERR_CODE enum PUT_MEMORY_COPY_WAY { - PUT_ONCE_COPY=0, //拷贝这块内存 - PUT_ONCE_FREE, //不拷贝内存,发送完毕由本缓存模块释放该内存 + PUT_MEM_COPY=0, //拷贝这块内存 + PUT_MEM_FREE, //不拷贝内存,发送完毕由本缓存模块释放该内存 +}; +enum EVBUFFER_COPY_WAY +{ + EVBUFFER_MOVE=0,//evbuffer_add_buffer + EVBUFFER_COPY, //evbuffer_add_buffer_reference }; struct cache_statistics @@ -58,15 +63,25 @@ enum CACHE_HTTP_HDR_TYPE HDR_CONTENT_NUM, }; +struct put_time_strategy +{ + time_t relative_ttl; //PUT: 缓存最大生存时间;0表示使用配置文件的默认值 + time_t absulote_lastmod;//PUT: +}; +struct get_time_strategy +{ + time_t max_age; //GET: + time_t min_fresh;//GET: +}; struct tango_cache_meta { const char* url; const char* std_hdr[HDR_CONTENT_NUM]; //完整头部,如包含"Content-Type:",不要包含换行 const char* other_hdr; //最大长度不能超过1535字节,GET时会原样返回 - //GET: 要求 - //PUT: 缓存最大生存时间;0表示使用配置文件的默认值 - time_t relative_ttl; - time_t max_age; + union{ + struct put_time_strategy put;//TODO + struct get_time_strategy get; + }; }; struct tango_cache_instance; @@ -88,13 +103,20 @@ int tango_cache_fetch(struct tango_cache_instance *instance, struct future* futu /*UPLOAD接口的API*/ //完整一次上传;若filename不为空,则输出对象的KEY,当CACHE_OBJECT_KEY_HASH_SWITCH=1开启对文件名哈希时有用 //返回0表示成功,<0表示失败;下同 -int tango_cache_upload_once(struct tango_cache_instance *instance, struct future* future, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta *meta, char *filename, size_t namelen); - +int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct future* future, + enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, + struct tango_cache_meta *meta, + char *objectkey, size_t keysize); +int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct future* future, + enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, + struct tango_cache_meta *meta, + char *objectkey, size_t keysize); //流式上传 //若tango_cache_update_start返回NULL,调用tango_cache_ctx_error查看错误码是否是CACHE_OUTOF_MEMORY(正常情况下是) //若future不为NULL,则在上传结束时会调用通知回调函数,否则不调用 struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta); -int tango_cache_update_frag(struct tango_cache_ctx *ctx, const char *data, size_t size); +int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, size_t size); +int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf); void tango_cache_update_end(struct tango_cache_ctx *ctx); //获取对象key值;当CACHE_OBJECT_KEY_HASH_SWITCH=1开启对文件名哈希时有用 diff --git a/cache/tango_cache_client.cpp b/cache/tango_cache_client.cpp index 1a27e90..862414b 100644 --- a/cache/tango_cache_client.cpp +++ b/cache/tango_cache_client.cpp @@ -85,7 +85,7 @@ static inline void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail } } -void response_buffer_destroy(struct easy_string *estr) +void easy_string_destroy(struct easy_string *estr) { if(estr->buff != NULL) { @@ -95,61 +95,17 @@ void response_buffer_destroy(struct easy_string *estr) } } -void buffer_cache_destroy(struct cache_buffer *cache, struct tango_cache_instance *instance) +void easy_string_savedata(struct easy_string *estr, const char *data, size_t len) { - if(cache->buf) + if(estr->size-estr->len < len+1) { - free(cache->buf); + estr->size += len*4+1; + estr->buff = (char*)realloc(estr->buff,estr->size); } - instance->statistic.memory_used -= instance->block_len; - free(cache); -} - -static inline struct cache_buffer *buffer_cache_new(struct tango_cache_instance *instance) -{ - struct cache_buffer *cache = (struct cache_buffer *)malloc(sizeof(struct cache_buffer)); - cache->buf = (char *)malloc(instance->block_len); - cache->len = 0; - cache->off = 0; - instance->statistic.memory_used += instance->block_len; - return cache; -} - -void buffer_cache_list_destroy(struct buffer_cache_list *list, struct tango_cache_ctx *ctx) -{ - struct cache_buffer *pnode; - - TAILQ_FOREACH(pnode, &list->cache_list, node) - { - TAILQ_REMOVE(&list->cache_list, pnode, node); - buffer_cache_destroy(pnode, ctx->instance); - } - if(list->cache_cur) - { - buffer_cache_destroy(list->cache_cur, ctx->instance); - } - if(list->curl) - { - curl_multi_remove_handle(ctx->instance->multi_hd, list->curl); - curl_easy_cleanup(list->curl); - } - if(list->etag) - { - free(list->etag); - } - free(list); -} - -static struct buffer_cache_list *buffer_cache_list_new(struct tango_cache_ctx *ctx) -{ - struct buffer_cache_list *list = (struct buffer_cache_list *)calloc(1, sizeof(struct buffer_cache_list)); - - list->part_number = ++ctx->part_index; - list->cache_cur = buffer_cache_new(ctx->instance); - list->ctx = ctx; - TAILQ_INIT(&list->cache_list); - return list; + memcpy(estr->buff+estr->len, data, len); + estr->len += len; + estr->buff[estr->len]='\0'; } void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx) @@ -159,23 +115,24 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx) curl_multi_remove_handle(ctx->instance->multi_hd, ctx->curl); curl_easy_cleanup(ctx->curl); } - response_buffer_destroy(&ctx->response); + easy_string_destroy(&ctx->response); if(ctx->method == CACHE_REQUEST_PUT) { - struct buffer_cache_list *list; - + struct multipart_etag_list *etag; + if(ctx->uploadID != NULL) free(ctx->uploadID); if(ctx->combine_xml != NULL) free(ctx->combine_xml); if(ctx->headers != NULL) curl_slist_free_all(ctx->headers); - if(ctx->headers_puts != NULL) curl_slist_free_all(ctx->headers_puts); - if(ctx->list_cur != NULL) buffer_cache_list_destroy(ctx->list_cur, ctx); + if(ctx->evbuffer!=NULL) evbuffer_free(ctx->evbuffer); - TAILQ_FOREACH(list, &ctx->cache_head, node) + TAILQ_FOREACH(etag, &ctx->cache_head, node) { - TAILQ_REMOVE(&ctx->cache_head, list, node); - buffer_cache_list_destroy(list, ctx); + TAILQ_REMOVE(&ctx->cache_head, etag, node); + free(etag->etag); + free(etag); } + if(ctx->future != NULL) { if(ctx->fail_state) @@ -197,66 +154,52 @@ void tango_cache_update_end(struct tango_cache_ctx *ctx) cache_kick_upload_minio_end(ctx); } -//开辟新buffer拷贝内存数据 -void tango_cache_kick_new_buffer(struct tango_cache_ctx *ctx, const char *data, size_t size) +int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, size_t size) { - struct buffer_cache_list *list = ctx->list_cur; - - if(list->length >= ctx->instance->upload_block_size) - { - TAILQ_INSERT_TAIL(&ctx->cache_head, list, node); - cache_kick_upload_minio_multipart(ctx, list); - list = buffer_cache_list_new(ctx); - ctx->list_cur = list; - } - else - { - list->cache_cur = buffer_cache_new(ctx->instance); - } - memcpy(list->cache_cur->buf, data, size); - list->cache_cur->len = size; -} - -int tango_cache_update_frag(struct tango_cache_ctx *ctx, const char *data, size_t size) -{ - struct buffer_cache_list *list_cur = ctx->list_cur; - struct cache_buffer *cache_cur = list_cur->cache_cur; - u_int32_t copy_len; - if(ctx->fail_state) { return -1; } - - //ADD data to Cache Buffer - if(cache_cur->len + size > ctx->instance->block_len) + if(evbuffer_add(ctx->evbuffer, data, size)) { - copy_len = ctx->instance->block_len - cache_cur->len; - memcpy(cache_cur->buf+cache_cur->len, data, copy_len); - cache_cur->len = ctx->instance->block_len; - TAILQ_INSERT_TAIL(&list_cur->cache_list, cache_cur, node); - list_cur->length += ctx->instance->block_len; - list_cur->cache_cur = NULL; - size -= copy_len; - data += copy_len; + return -1; + } + ctx->instance->statistic.memory_used += size; + if(evbuffer_get_length(ctx->evbuffer) >= ctx->instance->upload_block_size) + { + cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size); + } + return 0; +} - while(size >= ctx->instance->block_len) +int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf) +{ + size_t size; + + if(ctx->fail_state) + { + return -1; + } + + size = evbuffer_get_length(evbuf); + if(way == EVBUFFER_MOVE) + { + if(evbuffer_add_buffer(ctx->evbuffer, evbuf)) { - tango_cache_kick_new_buffer(ctx, data, ctx->instance->block_len); //拷贝完整buffer块 - list_cur = ctx->list_cur; - TAILQ_INSERT_TAIL(&list_cur->cache_list, list_cur->cache_cur, node); - list_cur->length += ctx->instance->block_len; - list_cur->cache_cur = NULL; - - size -= ctx->instance->block_len; - data += ctx->instance->block_len; + return -1; } - tango_cache_kick_new_buffer(ctx, data, size); } else { - memcpy(cache_cur->buf+cache_cur->len, data, size); - cache_cur->len += size; + if(evbuffer_add_buffer_reference(ctx->evbuffer, evbuf)) + { + return -1; + } + } + ctx->instance->statistic.memory_used += size; + if(evbuffer_get_length(ctx->evbuffer) >= ctx->instance->upload_block_size) + { + cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size); } return 0; } @@ -266,6 +209,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * struct tango_cache_ctx *ctx; char buffer[256]={0}; int other_len; + time_t expires, now; if((u_int64_t)instance->statistic.memory_used >= instance->cache_limit_size) { @@ -291,17 +235,25 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * } //Expires字段,用于缓存内部判定缓存是否超时 - ctx->relative_age = (meta->relative_ttl==0||meta->relative_ttl>instance->relative_ttl)?instance->relative_ttl:meta->relative_ttl; - if(expires_timestamp2hdr_str(time(NULL)+ctx->relative_age, buffer, 256)) + now = time(NULL); + expires = (meta->put.relative_ttl==0||meta->put.relative_ttl>instance->relative_ttl)?instance->relative_ttl:meta->put.relative_ttl; + if(expires_timestamp2hdr_str(now + expires, buffer, 256)) { - ctx->headers_puts = curl_slist_append(ctx->headers_puts, buffer); + ctx->headers = curl_slist_append(ctx->headers, buffer); } + //Last-Modify字段,用于GET时判定是否新鲜 + if(meta->put.absulote_lastmod == 0) + { + meta->put.absulote_lastmod = get_gmtime_timestamp(now); + } + sprintf(buffer, "x-amz-meta-lm: %lu", meta->put.absulote_lastmod); + ctx->headers = curl_slist_append(ctx->headers, buffer); //列表中支持的标准头部 for(int i=0; istd_hdr[i] != NULL) { - ctx->headers_puts = curl_slist_append(ctx->headers_puts, meta->std_hdr[i]); + ctx->headers = curl_slist_append(ctx->headers, meta->std_hdr[i]); } } //其他定义的头部,GET时会原样返回 @@ -310,7 +262,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * char *p = (char *)malloc((other_len/3 + 1)*4 + 18); //计算编码后所需空间;18=17+1: 头部+字符串结束符 memcpy(p, "x-amz-meta-user: ", 17); Base64_EncodeBlock((unsigned char*)meta->other_hdr, other_len, (unsigned char*)p+17); - ctx->headers_puts = curl_slist_append(ctx->headers_puts, p); + ctx->headers = curl_slist_append(ctx->headers, p); free(p); } @@ -327,12 +279,13 @@ struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *in return NULL; } - ctx->list_cur = buffer_cache_list_new(ctx); + ctx->evbuffer = evbuffer_new(); TAILQ_INIT(&ctx->cache_head); return ctx; } -int tango_cache_upload_once(struct tango_cache_instance *instance, struct future* future, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta *meta, char *filename, size_t namelen) +int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct future* future, + enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta *meta, char *objectkey, size_t keysize) { struct tango_cache_ctx *ctx; @@ -341,13 +294,30 @@ int tango_cache_upload_once(struct tango_cache_instance *instance, struct future { return -1; } - ctx->way = way; - if(filename != NULL) + if(objectkey != NULL) { - snprintf(filename, namelen, "%s", ctx->file_key); + snprintf(objectkey, keysize, "%s", ctx->file_key); } - return tango_cache_upload_once_start(ctx, data, size); + return tango_cache_upload_once_start_data(ctx, way, data, size); +} + +int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct future* future, + enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, struct tango_cache_meta *meta, char *objectkey, size_t keysize) +{ + struct tango_cache_ctx *ctx; + + ctx = tango_cache_update_prepare(instance, future, meta); + if(ctx == NULL) + { + return -1; + } + if(objectkey != NULL) + { + snprintf(objectkey, keysize, "%s", ctx->file_key); + } + + return tango_cache_upload_once_start_evbuf(ctx, way, evbuf); } struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta) @@ -360,7 +330,8 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i ctx->future = future; ctx->method = CACHE_REQUEST_GET; ctx->get_state = GET_STATE_START; - ctx->expire_comes = false; + ctx->max_age = meta->get.max_age; + ctx->min_fresh = meta->get.min_fresh; instance->statistic.get_recv_num += 1; if(instance->hash_object_key) @@ -401,6 +372,10 @@ static void check_multi_info(CURLM *multi) res = msg->data.result; curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ctx); curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &res_code); + curl_multi_remove_handle(multi, easy); + curl_easy_cleanup(easy); + ctx->curl = NULL; + ctx->res_code = 0; if(ctx->method == CACHE_REQUEST_GET) { diff --git a/cache/tango_cache_client_in.h b/cache/tango_cache_client_in.h index 75569ad..e8434a1 100644 --- a/cache/tango_cache_client_in.h +++ b/cache/tango_cache_client_in.h @@ -9,6 +9,10 @@ #include "tango_cache_client.h" +#define RESPONSE_HDR_EXPIRES 1 +#define RESPONSE_HDR_LAST_MOD 2 +#define RESPONSE_HDR_ALL 3 + enum CACHE_REQUEST_METHOD { CACHE_REQUEST_GET=0, @@ -26,7 +30,6 @@ enum PUT_OBJECT_STATE { PUT_STATE_START=0, PUT_STATE_WAIT_START, - PUT_STATE_START_DONE, PUT_STATE_PART, PUT_STATE_END, PUT_STATE_CANCEL, @@ -39,14 +42,6 @@ struct easy_string size_t size; }; -struct cache_buffer -{ - char *buf; - size_t len; - size_t off; - TAILQ_ENTRY(cache_buffer) node; -}; - struct tango_cache_instance { char minio_hostlist[4096]; @@ -66,64 +61,61 @@ struct tango_cache_instance u_int32_t hash_object_key; }; -struct buffer_cache_list; +struct multipart_etag_list +{ + char *etag; + u_int32_t part_number; + TAILQ_ENTRY(multipart_etag_list) node; +}; + struct tango_cache_ctx { CURL *curl; struct curl_slist *headers; - struct curl_slist *headers_puts; - struct future* future; char error[CURL_ERROR_SIZE]; char file_key[72]; char hostport[24]; //相同ctx使用相同的IP,保证pipeline顺序性 - u_int32_t host_index; - u_int32_t part_runing_num; - u_int32_t part_index; + bool fail_state; + enum CACHE_REQUEST_METHOD method; enum CACHE_ERR_CODE error_code; + + struct evbuffer *evbuffer; union{ enum PUT_OBJECT_STATE put_state; enum GET_OBJECT_STATE get_state; }; - enum PUT_MEMORY_COPY_WAY way; //PUT ONCE时内存拷贝还是直接利用 - bool fail_state; + u_int32_t part_index; + u_int32_t need_hdrs; //宏RESPONSE_HDR_ bool close_state; //主动被调用关闭 - bool expire_comes; long res_code; - time_t relative_age;//Get时允许的最远缓存时间 + time_t max_age;//Get + time_t min_fresh;//Get + time_t expires; + time_t last_modify; + size_t upload_length; + size_t upload_offset; char *uploadID; char *combine_xml; struct easy_string response; - TAILQ_HEAD(__cache_list_head, buffer_cache_list) cache_head; - struct buffer_cache_list *list_cur; //时刻分配空间,只有最后无数据时方可释放,用于判定上传结束 + TAILQ_HEAD(__etag_list_head, multipart_etag_list) cache_head; struct tango_cache_instance *instance; }; -struct buffer_cache_list -{ - TAILQ_HEAD(__buffer_cache_node, cache_buffer) cache_list; - struct cache_buffer *cache_cur; - CURL *curl; - char *etag; - u_int32_t part_number; - u_int32_t length; - struct tango_cache_ctx *ctx; - TAILQ_ENTRY(buffer_cache_list) node; -}; - struct curl_socket_data { struct event sock_event; }; -void response_buffer_destroy(struct easy_string *estr); -void buffer_cache_destroy(struct cache_buffer *cache, struct tango_cache_instance *instance); -void buffer_cache_list_destroy(struct buffer_cache_list *list, struct tango_cache_ctx *ctx); +void easy_string_savedata(struct easy_string *estr, const char *data, size_t len); +void easy_string_destroy(struct easy_string *estr); + void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx); + struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta); struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta); diff --git a/cache/tango_cache_transfer.cpp b/cache/tango_cache_transfer.cpp index 3d7673e..74799b7 100644 --- a/cache/tango_cache_transfer.cpp +++ b/cache/tango_cache_transfer.cpp @@ -21,21 +21,25 @@ size_t curl_response_any_cb(void *ptr, size_t size, size_t count, void *userp) static size_t curl_put_multipart_header_cb(void *ptr, size_t size, size_t count, void *userp) { - struct buffer_cache_list *list = (struct buffer_cache_list *)userp; + struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; size_t totallen = size*count; char *start = (char *)ptr, *end = start + totallen; + struct multipart_etag_list *etag; - if(list->etag == NULL && !strncmp(start, "Etag:", totallen>5?5:totallen)) + if(!strncmp(start, "Etag:", totallen>5?5:totallen)) { start += 5; end -= 1; totallen -= 5; while(totallen>0 && (*start==' ')) {start++; totallen--;} while(totallen>0 && (*end=='\r'||*end=='\n')) {end--; totallen--;} if(totallen > 0) { + etag = (struct multipart_etag_list *)malloc(sizeof(struct multipart_etag_list)); totallen = end - start + 1; - list->etag = (char *)malloc(totallen + 1); - memcpy(list->etag, start, totallen); - *(list->etag + totallen) = '\0'; + etag->etag = (char *)malloc(totallen + 1); + etag->part_number = ctx->part_index; + memcpy(etag->etag, start, totallen); + *(etag->etag + totallen) = '\0'; + TAILQ_INSERT_TAIL(&ctx->cache_head, etag, node); } } @@ -64,99 +68,75 @@ static size_t curl_put_once_send_cb(void *ptr, size_t size, size_t count, void * if(ctx->response.len >= ctx->response.size) { ctx->instance->statistic.memory_used -= ctx->response.size; //未使用cache buffer,自己计算内存增减 - if(ctx->way == PUT_ONCE_COPY) - { - response_buffer_destroy(&ctx->response); - } - else - { - ctx->response.buff = NULL; - ctx->response.len = ctx->response.size = 0; - } + easy_string_destroy(&ctx->response); } return len; } static size_t curl_put_multipart_send_cb(void *ptr, size_t size, size_t count, void *userp) { - size_t len=0, needlen=size * count, remainlen; - struct buffer_cache_list *list = (struct buffer_cache_list *)userp; - struct cache_buffer *next_cache; + size_t len, space=size*count, send_len; + struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - if(size==0 || count==0 || list->cache_cur==NULL) + if(size==0 || count==0 || ctx->upload_offset>=ctx->upload_length) { return 0; } - while(lencache_cur!=NULL) + len = ctx->upload_length - ctx->upload_offset; + if(len > space) { - remainlen = list->cache_cur->len - list->cache_cur->off; - if(needlen-len >= remainlen) - { - memcpy((char*)ptr+len, list->cache_cur->buf+list->cache_cur->off, remainlen); - len += remainlen; - - next_cache = TAILQ_NEXT(list->cache_cur, node); - TAILQ_REMOVE(&list->cache_list, list->cache_cur, node); - buffer_cache_destroy(list->cache_cur, list->ctx->instance); - list->cache_cur = next_cache; - } - else - { - memcpy((char*)ptr+len, list->cache_cur->buf+list->cache_cur->off, needlen-len); - list->cache_cur->off += needlen-len; - len = needlen; - } + len = space; } + send_len = evbuffer_remove(ctx->evbuffer, ptr, len); + assert(send_len>0); + ctx->upload_offset += send_len; + ctx->instance->statistic.memory_used -= send_len; return len; } //return value: <0:fail; =0: not exec; >0: OK -int http_put_bodypart_request(struct tango_cache_ctx *ctx, struct buffer_cache_list *list, bool full) +static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool full) { CURLMcode rc; char minio_url[256]; - list->cache_cur = TAILQ_FIRST(&list->cache_list); - if(list->cache_cur == NULL) - { - return 0; //已经上传过 - } - if(NULL == (list->curl=curl_easy_init())) + if(NULL == (ctx->curl=curl_easy_init())) { return -1; } + ctx->upload_offset = 0; if(full) { snprintf(minio_url, 256, "http://%s/%s/%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->file_key); } else { - snprintf(minio_url, 256, "http://%s/%s/%s?partNumber=%d&uploadId=%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->file_key, list->part_number, ctx->uploadID); - curl_easy_setopt(list->curl, CURLOPT_HEADERFUNCTION, curl_put_multipart_header_cb); - curl_easy_setopt(list->curl, CURLOPT_HEADERDATA, list); + snprintf(minio_url, 256, "http://%s/%s/%s?partNumber=%d&uploadId=%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->file_key, ++ctx->part_index, ctx->uploadID); + curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_put_multipart_header_cb); + curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx); } - curl_easy_setopt(list->curl, CURLOPT_URL, minio_url); - curl_easy_setopt(list->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache"); - curl_easy_setopt(list->curl, CURLOPT_NOSIGNAL, 1L); - curl_easy_setopt(list->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); - curl_easy_setopt(list->curl, CURLOPT_WRITEDATA, list); - curl_easy_setopt(list->curl, CURLOPT_ERRORBUFFER, ctx->error); - curl_easy_setopt(list->curl, CURLOPT_PRIVATE, ctx); - curl_easy_setopt(list->curl, CURLOPT_FOLLOWLOCATION, 1L); - curl_easy_setopt(list->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L); - curl_easy_setopt(list->curl, CURLOPT_HTTPHEADER, ctx->headers_puts); - curl_easy_setopt(list->curl, CURLOPT_LOW_SPEED_TIME, 2L); - curl_easy_setopt(list->curl, CURLOPT_LOW_SPEED_LIMIT, 1024L); + curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); + curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache"); + curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL, 1L); + curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); + curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); + curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error); + curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); + curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L); + curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L); + curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); + curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L); + curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 1024L); - curl_easy_setopt(list->curl, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(list->curl, CURLOPT_INFILESIZE, list->length); - curl_easy_setopt(list->curl, CURLOPT_READFUNCTION, curl_put_multipart_send_cb); - curl_easy_setopt(list->curl, CURLOPT_READDATA, list); + curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L); + curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->upload_length); + curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_multipart_send_cb); + curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx); - rc = curl_multi_add_handle(ctx->instance->multi_hd, list->curl); + rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); assert(rc==CURLM_OK); return 1; } @@ -183,16 +163,8 @@ static size_t curl_write_uploadID_cb(void *ptr, size_t size, size_t count, void return size*count; } } - - if(estr->size-estr->len < size*count+1) - { - estr->size += size*count*2+1; - estr->buff = (char*)realloc(estr->buff,estr->size); - } - - memcpy(estr->buff+estr->len,ptr,size*count); - estr->len+=size*count; - estr->buff[estr->len]='\0'; + + easy_string_savedata(estr, (const char*)ptr, size*count); return size*count; } @@ -218,7 +190,7 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx) curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers_puts); + curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L); curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 1024L); @@ -310,6 +282,11 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx) curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->combine_xml); curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, len); //填充Content-Length + if(ctx->headers != NULL) + { + curl_slist_free_all(ctx->headers); + ctx->headers = NULL; + } ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml"); curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); @@ -319,15 +296,10 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx) } //return value: true-成功添加事件;false-未添加事件 -bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, struct buffer_cache_list *list) +bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len) { int ret = 1; - if(ctx->fail_state) - { - return false; - } - switch(ctx->put_state) { case PUT_STATE_START: @@ -335,12 +307,11 @@ bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, struct buffe ret = curl_get_minio_uploadID(ctx); break; - case PUT_STATE_START_DONE: case PUT_STATE_PART: - ret = http_put_bodypart_request(ctx, list, false); - if(ret > 0) + if(ctx->curl == NULL) { - ctx->part_runing_num++; + ctx->upload_length = block_len; + ret = http_put_bodypart_request_evbuf(ctx, false); } break; @@ -356,6 +327,29 @@ bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, struct buffe return true; } +int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx) +{ + int ret=0; + + ctx->put_state = PUT_STATE_END; + ctx->upload_length = evbuffer_get_length(ctx->evbuffer); + if(ctx->upload_length > 0) + { + ret = http_put_bodypart_request_evbuf(ctx, true); + if(ret <= 0) + { + ctx->fail_state = true; + tango_cache_ctx_destroy(ctx); + } + } + else + { + tango_cache_ctx_destroy(ctx); + } + + return ret; +} + int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) { int ret = 0; @@ -370,38 +364,14 @@ int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) switch(ctx->put_state) { case PUT_STATE_START: - ctx->put_state = PUT_STATE_END; - if(ctx->list_cur->cache_cur->len > 0) - { - TAILQ_INSERT_TAIL(&ctx->list_cur->cache_list, ctx->list_cur->cache_cur, node); - ctx->list_cur->length += ctx->list_cur->cache_cur->len; - ret = http_put_bodypart_request(ctx, ctx->list_cur, true); - if(ret <= 0) - { - tango_cache_ctx_destroy(ctx); - } - } - else - { - tango_cache_ctx_destroy(ctx); - } + http_put_complete_part_evbuf(ctx); break; - + case PUT_STATE_PART: - if(ctx->list_cur->length + ctx->list_cur->cache_cur->len > 0) + if(ctx->curl == NULL) { - TAILQ_INSERT_TAIL(&ctx->list_cur->cache_list, ctx->list_cur->cache_cur, node); - ctx->list_cur->length += ctx->list_cur->cache_cur->len; - ctx->list_cur->cache_cur = NULL; - TAILQ_INSERT_TAIL(&ctx->cache_head, ctx->list_cur, node); - cache_kick_upload_minio_multipart(ctx, ctx->list_cur); - ctx->list_cur = NULL; - } - else - { - buffer_cache_list_destroy(ctx->list_cur, ctx); - ctx->list_cur = NULL; - if(ctx->part_runing_num==0) //已全部上传完成,而且END时无数据块了 + ctx->upload_length = evbuffer_get_length(ctx->evbuffer); + if(ctx->upload_length == 0) { if(cache_kick_combine_minio(ctx)) { @@ -409,9 +379,26 @@ int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) } else { + ctx->fail_state = true; tango_cache_ctx_destroy(ctx); } - } + } + else + { + ret = http_put_bodypart_request_evbuf(ctx, false); + if(ret <= 0) + { + ctx->fail_state = true; + if(cache_cancel_upload_minio(ctx)) + { + ctx->put_state = PUT_STATE_CANCEL; + } + else + { + tango_cache_ctx_destroy(ctx); + } + } + } } break; @@ -425,93 +412,67 @@ int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx) void tango_cache_curl_put_done(CURL *easy, struct tango_cache_ctx *ctx, CURLcode res, long res_code) { - struct buffer_cache_list *list; - switch(ctx->put_state) { case PUT_STATE_WAIT_START: - ctx->curl = NULL; - ctx->res_code = 0; - curl_multi_remove_handle(ctx->instance->multi_hd, easy); - curl_easy_cleanup(easy); if(res!=CURLE_OK||res_code!=200L|| ctx->fail_state || !parse_uploadID_xml(ctx->response.buff, ctx->response.len, &ctx->uploadID)) { - response_buffer_destroy(&ctx->response); + easy_string_destroy(&ctx->response); ctx->error_code = CACHE_ERR_CURL; ctx->fail_state = true; if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error); } else { - free(ctx->response.buff); - ctx->response.buff = NULL; - ctx->put_state = PUT_STATE_START_DONE; - TAILQ_FOREACH(list, &ctx->cache_head, node) - { - if(!cache_kick_upload_minio_multipart(ctx, list)) - { - ctx->fail_state = true; - break; - } - else - { - ctx->put_state = PUT_STATE_PART; - } - } - } - if(ctx->close_state) - { - if(!ctx->fail_state) + easy_string_destroy(&ctx->response); + ctx->put_state = PUT_STATE_PART; + if(ctx->close_state) { cache_kick_upload_minio_end(ctx); } - else if(ctx->put_state!=PUT_STATE_PART) + else { - tango_cache_ctx_destroy(ctx); + size_t upload_length = evbuffer_get_length(ctx->evbuffer); + if(upload_length >= ctx->instance->upload_block_size) + { + cache_kick_upload_minio_multipart(ctx, upload_length); + } } } break; case PUT_STATE_PART: - curl_multi_remove_handle(ctx->instance->multi_hd, easy); - curl_easy_cleanup(easy); - ctx->part_runing_num--; - TAILQ_FOREACH(list, &ctx->cache_head, node) - { - if(list->curl == easy) - { - list->curl = NULL; - break; - } - } - assert(list != NULL); //PART状态不被打断 - if(res != CURLE_OK ||res_code!=200L ) + if(res != CURLE_OK || res_code!=200L) { ctx->fail_state = true; if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error); } - if(ctx->part_runing_num==0 && ctx->list_cur==NULL) + if(ctx->fail_state) { - if(ctx->fail_state && cache_cancel_upload_minio(ctx)) + if(cache_cancel_upload_minio(ctx)) { ctx->put_state = PUT_STATE_CANCEL; } - else if(!ctx->fail_state && ctx->close_state && cache_kick_combine_minio(ctx)) - { - ctx->put_state = PUT_STATE_END; - } else if(ctx->close_state) { tango_cache_ctx_destroy(ctx); } } + else if(ctx->close_state) + { + cache_kick_upload_minio_end(ctx); + } + else + { + size_t upload_length = evbuffer_get_length(ctx->evbuffer); + if(upload_length >= ctx->instance->upload_block_size) + { + cache_kick_upload_minio_multipart(ctx, upload_length); + } + } break; case PUT_STATE_CANCEL: //等待关闭 - ctx->curl = NULL; - ctx->res_code = 0; - curl_multi_remove_handle(ctx->instance->multi_hd, easy); - curl_easy_cleanup(easy); if(ctx->close_state) { tango_cache_ctx_destroy(ctx); @@ -530,7 +491,7 @@ void tango_cache_curl_put_done(CURL *easy, struct tango_cache_ctx *ctx, CURLcode } } -int tango_cache_upload_once_start(struct tango_cache_ctx *ctx, const char *data, size_t size) +int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size) { CURLMcode rc; char minio_url[256]; @@ -538,7 +499,7 @@ int tango_cache_upload_once_start(struct tango_cache_ctx *ctx, const char *data, if(NULL == (ctx->curl=curl_easy_init())) { tango_cache_ctx_destroy(ctx); - if(ctx->way == PUT_ONCE_FREE) + if(way == PUT_MEM_FREE) { free((void *)data); } @@ -556,11 +517,11 @@ int tango_cache_upload_once_start(struct tango_cache_ctx *ctx, const char *data, curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L); - curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers_puts); + curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L); curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 1024L); - if(ctx->way == PUT_ONCE_COPY) + if(way == PUT_MEM_COPY) { ctx->response.buff = (char *)malloc(size); memcpy(ctx->response.buff, data, size); @@ -582,12 +543,33 @@ int tango_cache_upload_once_start(struct tango_cache_ctx *ctx, const char *data, return 0; } +int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf) +{ + size_t size; + + ctx->evbuffer = evbuffer_new(); + size = evbuffer_get_length(evbuf); + if(way == EVBUFFER_MOVE) + { + if(evbuffer_add_buffer(ctx->evbuffer, evbuf)) + { + return -1; + } + } + else + { + if(evbuffer_add_buffer_reference(ctx->evbuffer, evbuf)) + { + return -1; + } + } + ctx->instance->statistic.memory_used += size; + + return http_put_complete_part_evbuf(ctx); +} + void tango_cache_curl_get_done(CURL *easy, struct tango_cache_ctx *ctx, CURLcode res, long res_code) { - curl_multi_remove_handle(ctx->instance->multi_hd, easy); - curl_easy_cleanup(easy); - ctx->curl = NULL; - switch(ctx->get_state) { case GET_STATE_START: @@ -634,12 +616,12 @@ static size_t curl_get_response_body_cb(void *ptr, size_t size, size_t count, vo return size*count; } - if(!ctx->expire_comes) //无Expires时 + if(ctx->need_hdrs!=RESPONSE_HDR_ALL) //无Expires时 { ctx->fail_state = true; ctx->error_code = CACHE_CACHE_MISS; ctx->get_state = GET_STATE_DELETE; - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache Expires not found"); + promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache Expires or last-modify not found"); } else { @@ -651,31 +633,85 @@ static size_t curl_get_response_body_cb(void *ptr, size_t size, size_t count, vo return size*count; } +static bool check_expires_header(struct tango_cache_ctx *ctx, const char *expires_val, size_t len) +{ + time_t time_gmt; + + ctx->expires = expires_hdr2timestamp(expires_val, len); + time_gmt = get_gmtime_timestamp(time(NULL)); + + if(time_gmt > ctx->expires) //缓存失效;TODO relative_age的含义是啥 + { + ctx->fail_state = true; + ctx->error_code = CACHE_TIMEOUT; + ctx->get_state = GET_STATE_DELETE; //缓存失效时在下载完毕时触发删除动作 + easy_string_destroy(&ctx->response); + promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache not fresh"); + return false; + } + return true; +} + +static bool check_fresh_header(struct tango_cache_ctx *ctx) +{ + struct tango_cache_result result; + time_t now_gmt; + + if(ctx->need_hdrs != RESPONSE_HDR_ALL) + return true; + + now_gmt = get_gmtime_timestamp(time(NULL)); + if(ctx->last_modify+ctx->max_age > now_gmt || now_gmt+ctx->min_fresh>ctx->expires) + { + ctx->fail_state = true; + ctx->error_code = CACHE_TIMEOUT; + easy_string_destroy(&ctx->response); + promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache not fresh"); + return false; + } + + if(ctx->response.buff != NULL) + { + result.data_frag = ctx->response.buff; + result.size = ctx->response.len; + result.type = RESULT_TYPE_HEADER; + promise_success(future_to_promise(ctx->future), &result); + easy_string_destroy(&ctx->response); + } + return true; +} + +static bool check_get_result_code(struct tango_cache_ctx *ctx) +{ + CURLcode code; + + code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code); + if(code != CURLE_OK || ctx->res_code!=200L) + { + ctx->fail_state = true; + ctx->error_code = CACHE_CACHE_MISS; + promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, (code!=CURLE_OK)?ctx->error:"cache not hit"); + return false; + } + return true; +} + static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count, void *userp) { struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - CURLcode code; struct tango_cache_result result; char *start=(char *)ptr, *pos_colon, *hdrdata=(char*)ptr; bool ptr_valid=false; size_t raw_len = size*count, hdrlen=size*count; + char usrhdr[2048]; if(ctx->fail_state || ctx->get_state==GET_STATE_DELETE) { return raw_len; } - - if(ctx->res_code==0) //首次应答时先看应答码是否是200 + if(ctx->res_code==0 && !check_get_result_code(ctx)) //首次应答时先看应答码是否是200 { - code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code); - if(code != CURLE_OK || ctx->res_code!=200L) - { - ctx->fail_state = true; - ctx->error_code = CACHE_CACHE_MISS; - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache not hit"); - if(code != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error); - return raw_len; - } + return raw_len; } if((pos_colon=(char*)memchr(start, ':', raw_len))!=NULL) @@ -686,39 +722,30 @@ static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count, case 7: if(strcmp_one_word_mesa_equal_len("expires", "EXPIRES", start, 7)) { - time_t expire = expires_hdr2timestamp(pos_colon + 1, raw_len - datalen - 1); - time_t time_gmt = get_gmtime_timestamp(time(NULL)); - if(time_gmt + ctx->relative_age > expire) //缓存失效;TODO relative_age的含义是啥 + ctx->need_hdrs |= RESPONSE_HDR_EXPIRES; + if(!check_expires_header(ctx, pos_colon + 1, raw_len - datalen - 1) || !check_fresh_header(ctx)) { - ctx->fail_state = true; - ctx->error_code = CACHE_TIMEOUT; - if(time_gmt>=expire) ctx->get_state = GET_STATE_DELETE; //缓存失效时在下载完毕时触发删除动作 - response_buffer_destroy(&ctx->response); - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache not fresh"); return raw_len; } - else if(ctx->response.buff != NULL) + } + break; + case 13: + if(strcmp_one_word_mesa_equal_len("x-amz-meta-lm", "X-AMZ-META-LM", start, 13)) + { + ctx->need_hdrs |= RESPONSE_HDR_LAST_MOD; + sscanf(pos_colon+1, "%lu", &ctx->last_modify); + if(!check_fresh_header(ctx)) { - result.data_frag = ctx->response.buff; - result.size = ctx->response.len; - result.type = RESULT_TYPE_HEADER; - promise_success(future_to_promise(ctx->future), &result); - response_buffer_destroy(&ctx->response); + return raw_len; } - ctx->expire_comes = true; } break; case 15: if(strcmp_one_word_mesa_equal_len("x-amz-meta-user", "X-AMZ-META-USER", start, 15)) { - if(ctx->response.size-ctx->response.len < raw_len+1) + if((hdrlen = Base64_DecodeBlock((unsigned char*)pos_colon+1, raw_len-datalen-1, (unsigned char*)usrhdr, 2048))>0) { - ctx->response.size += raw_len*8 + 1; - ctx->response.buff = (char*)realloc(ctx->response.buff, ctx->response.size); - } - if((hdrlen = Base64_DecodeBlock((unsigned char*)pos_colon+1, raw_len-datalen-1, (unsigned char*)ctx->response.buff+ctx->response.len, ctx->response.size-ctx->response.len))>0) - { - hdrdata = ctx->response.buff+ctx->response.len; + hdrdata = usrhdr; ptr_valid = true; } } @@ -734,7 +761,7 @@ static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count, if(ptr_valid) { - if(ctx->expire_comes) + if(ctx->need_hdrs==RESPONSE_HDR_ALL) { result.data_frag = hdrdata; result.size = hdrlen; @@ -743,14 +770,7 @@ static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count, } else { - if(ctx->response.size-ctx->response.len < hdrlen+1) - { - ctx->response.size += hdrlen*8 + 1; - ctx->response.buff = (char*)realloc(ctx->response.buff, ctx->response.size); - } - memcpy(ctx->response.buff+ctx->response.len, hdrdata, hdrlen); - ctx->response.len += hdrlen; - ctx->response.buff[ctx->response.len] = '\0'; + easy_string_savedata(&ctx->response, hdrdata, hdrlen); } } return raw_len; diff --git a/cache/tango_cache_transfer.h b/cache/tango_cache_transfer.h index 5037790..da5f210 100644 --- a/cache/tango_cache_transfer.h +++ b/cache/tango_cache_transfer.h @@ -11,8 +11,10 @@ void tango_cache_curl_get_done(CURL *easy, struct tango_cache_ctx *ctx, CURLcode bool cache_delete_minio_object(struct tango_cache_ctx *ctx); int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx); -bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, struct buffer_cache_list *list); -int tango_cache_upload_once_start(struct tango_cache_ctx *ctx, const char *data, size_t size); +bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len); + +int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size); +int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf); int tango_cache_fetch_start(struct tango_cache_ctx *ctx); diff --git a/cache/tango_cache_xml.cpp b/cache/tango_cache_xml.cpp index ab210c0..4e90987 100644 --- a/cache/tango_cache_xml.cpp +++ b/cache/tango_cache_xml.cpp @@ -48,7 +48,7 @@ bool parse_uploadID_xml(const char *content, int len, char **uploadID) int construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len) { - struct buffer_cache_list *list; + struct multipart_etag_list *etag; xmlDoc *pdoc; xmlNode *root, *child; char number[20]; @@ -58,11 +58,11 @@ int construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len) xmlNewProp(root, (const xmlChar *)"xmlns",(const xmlChar *)"http://s3.amazonaws.com/doc/2006-03-01/"); xmlDocSetRootElement(pdoc, root); - TAILQ_FOREACH(list, &ctx->cache_head, node) + TAILQ_FOREACH(etag, &ctx->cache_head, node) { - sprintf(number, "%u", list->part_number); + sprintf(number, "%u", etag->part_number); child = xmlNewChild(root, NULL, (const xmlChar*)"Part", NULL); - xmlNewChild(child, NULL, (const xmlChar*)"ETag", (const xmlChar*)list->etag); + xmlNewChild(child, NULL, (const xmlChar*)"ETag", (const xmlChar*)etag->etag); xmlNewChild(child, NULL, (const xmlChar*)"PartNumber", (const xmlChar*)number); } diff --git a/cache/test_demo/cache_evbase_test.cpp b/cache/test_demo/cache_evbase_test.cpp index 82941e9..17a09b1 100644 --- a/cache/test_demo/cache_evbase_test.cpp +++ b/cache/test_demo/cache_evbase_test.cpp @@ -186,7 +186,28 @@ int main(int argc, char **argv) pdata->future = future_create(put_future_success, put_future_failed, pdata); promise_set_ctx(future_to_promise(pdata->future), NULL, NULL); - cache_evbase_upload_once(instance_asyn, pdata->future, PUT_ONCE_FREE, p, filelen, &meta, pdata->filename, 256); + cache_evbase_upload_once_data(instance_asyn, pdata->future, PUT_MEM_FREE, p, filelen, &meta, pdata->filename, 256); + } + else if(!strcasecmp(p, "PUTONCEEV")) + { + size_t readlen; + pdata->future = future_create(put_future_success, put_future_failed, pdata); + promise_set_ctx(future_to_promise(pdata->future), NULL, NULL); + struct evbuffer *evbuf = evbuffer_new(); + char buffer[1024]; + + FILE *fp = fopen(filename_in, "rb"); + while(!feof(fp)) + { + readlen = fread(buffer, 1, 1024, fp); + if(readlen < 0) + { + assert(0); + } + evbuffer_add(evbuf, buffer, readlen); + } + fclose(fp); + cache_evbase_upload_once_evbuf(instance_asyn, pdata->future, evbuf, &meta, pdata->filename, 256); } else { @@ -202,7 +223,7 @@ int main(int argc, char **argv) { n = fread(buffer, 1, 1024, fp); assert(n>=0); - cache_evbase_update_frag(ctx, PUT_ONCE_COPY, buffer, n); + cache_evbase_update_frag_data(ctx, PUT_MEM_COPY, buffer, n); } cache_evbase_update_end(ctx); @@ -210,8 +231,13 @@ int main(int argc, char **argv) } printf("Waiting to finish.......\n"); + static int num=0; while(still_runing) { + /*if(++num==10) + { + cache_evbase_update_end(ctx); + }*/ sleep(1); } diff --git a/cache/test_demo/tango_cache_test.c b/cache/test_demo/tango_cache_test.c index fde74ea..22f350a 100644 --- a/cache/test_demo/tango_cache_test.c +++ b/cache/test_demo/tango_cache_test.c @@ -179,7 +179,28 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) pdata->future = future_create(put_future_success, put_future_failed, pdata); promise_set_ctx(future_to_promise(pdata->future), NULL, NULL); - tango_cache_upload_once(tango_instance, pdata->future, PUT_ONCE_FREE, p, filelen, &meta, pdata->filename, 256); + tango_cache_upload_once_data(tango_instance, pdata->future, PUT_MEM_FREE, p, filelen, &meta, pdata->filename, 256); + } + else if(!strcasecmp(p, "PUTONCEEV")) + { + size_t readlen; + pdata->future = future_create(put_future_success, put_future_failed, pdata); + promise_set_ctx(future_to_promise(pdata->future), NULL, NULL); + struct evbuffer *evbuf = evbuffer_new(); + char buffer[1024]; + + FILE *fp = fopen(s, "rb"); + while(!feof(fp)) + { + readlen = fread(buffer, 1, 1024, fp); + if(readlen < 0) + { + assert(0); + } + evbuffer_add(evbuf, buffer, readlen); + } + fclose(fp); + tango_cache_upload_once_evbuf(tango_instance, pdata->future, EVBUFFER_MOVE, evbuf, &meta, pdata->filename, 256); } else { @@ -194,7 +215,7 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) { n = fread(buffer, 1, 1024, fp); assert(n>=0); - tango_cache_update_frag(ctx, buffer, n); + tango_cache_update_frag_data(ctx, buffer, n); } tango_cache_update_end(ctx); }