update数据增加evbuffer接口

This commit is contained in:
zhangchengwei
2018-09-21 14:50:41 +08:00
committed by zhengchao
parent 0bfd49194e
commit 3fe4625e04
10 changed files with 560 additions and 410 deletions

View File

@@ -85,7 +85,7 @@ static inline void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail
}
}
void response_buffer_destroy(struct easy_string *estr)
void easy_string_destroy(struct easy_string *estr)
{
if(estr->buff != NULL)
{
@@ -95,61 +95,17 @@ void response_buffer_destroy(struct easy_string *estr)
}
}
void buffer_cache_destroy(struct cache_buffer *cache, struct tango_cache_instance *instance)
void easy_string_savedata(struct easy_string *estr, const char *data, size_t len)
{
if(cache->buf)
if(estr->size-estr->len < len+1)
{
free(cache->buf);
estr->size += len*4+1;
estr->buff = (char*)realloc(estr->buff,estr->size);
}
instance->statistic.memory_used -= instance->block_len;
free(cache);
}
static inline struct cache_buffer *buffer_cache_new(struct tango_cache_instance *instance)
{
struct cache_buffer *cache = (struct cache_buffer *)malloc(sizeof(struct cache_buffer));
cache->buf = (char *)malloc(instance->block_len);
cache->len = 0;
cache->off = 0;
instance->statistic.memory_used += instance->block_len;
return cache;
}
void buffer_cache_list_destroy(struct buffer_cache_list *list, struct tango_cache_ctx *ctx)
{
struct cache_buffer *pnode;
TAILQ_FOREACH(pnode, &list->cache_list, node)
{
TAILQ_REMOVE(&list->cache_list, pnode, node);
buffer_cache_destroy(pnode, ctx->instance);
}
if(list->cache_cur)
{
buffer_cache_destroy(list->cache_cur, ctx->instance);
}
if(list->curl)
{
curl_multi_remove_handle(ctx->instance->multi_hd, list->curl);
curl_easy_cleanup(list->curl);
}
if(list->etag)
{
free(list->etag);
}
free(list);
}
static struct buffer_cache_list *buffer_cache_list_new(struct tango_cache_ctx *ctx)
{
struct buffer_cache_list *list = (struct buffer_cache_list *)calloc(1, sizeof(struct buffer_cache_list));
list->part_number = ++ctx->part_index;
list->cache_cur = buffer_cache_new(ctx->instance);
list->ctx = ctx;
TAILQ_INIT(&list->cache_list);
return list;
memcpy(estr->buff+estr->len, data, len);
estr->len += len;
estr->buff[estr->len]='\0';
}
void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx)
@@ -159,23 +115,24 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx)
curl_multi_remove_handle(ctx->instance->multi_hd, ctx->curl);
curl_easy_cleanup(ctx->curl);
}
response_buffer_destroy(&ctx->response);
easy_string_destroy(&ctx->response);
if(ctx->method == CACHE_REQUEST_PUT)
{
struct buffer_cache_list *list;
struct multipart_etag_list *etag;
if(ctx->uploadID != NULL) free(ctx->uploadID);
if(ctx->combine_xml != NULL) free(ctx->combine_xml);
if(ctx->headers != NULL) curl_slist_free_all(ctx->headers);
if(ctx->headers_puts != NULL) curl_slist_free_all(ctx->headers_puts);
if(ctx->list_cur != NULL) buffer_cache_list_destroy(ctx->list_cur, ctx);
if(ctx->evbuffer!=NULL) evbuffer_free(ctx->evbuffer);
TAILQ_FOREACH(list, &ctx->cache_head, node)
TAILQ_FOREACH(etag, &ctx->cache_head, node)
{
TAILQ_REMOVE(&ctx->cache_head, list, node);
buffer_cache_list_destroy(list, ctx);
TAILQ_REMOVE(&ctx->cache_head, etag, node);
free(etag->etag);
free(etag);
}
if(ctx->future != NULL)
{
if(ctx->fail_state)
@@ -197,66 +154,52 @@ void tango_cache_update_end(struct tango_cache_ctx *ctx)
cache_kick_upload_minio_end(ctx);
}
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>buffer<65><72><EFBFBD><EFBFBD><EFBFBD>ڴ<EFBFBD><DAB4><EFBFBD><EFBFBD><EFBFBD>
void tango_cache_kick_new_buffer(struct tango_cache_ctx *ctx, const char *data, size_t size)
int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, size_t size)
{
struct buffer_cache_list *list = ctx->list_cur;
if(list->length >= ctx->instance->upload_block_size)
{
TAILQ_INSERT_TAIL(&ctx->cache_head, list, node);
cache_kick_upload_minio_multipart(ctx, list);
list = buffer_cache_list_new(ctx);
ctx->list_cur = list;
}
else
{
list->cache_cur = buffer_cache_new(ctx->instance);
}
memcpy(list->cache_cur->buf, data, size);
list->cache_cur->len = size;
}
int tango_cache_update_frag(struct tango_cache_ctx *ctx, const char *data, size_t size)
{
struct buffer_cache_list *list_cur = ctx->list_cur;
struct cache_buffer *cache_cur = list_cur->cache_cur;
u_int32_t copy_len;
if(ctx->fail_state)
{
return -1;
}
//ADD data to Cache Buffer
if(cache_cur->len + size > ctx->instance->block_len)
if(evbuffer_add(ctx->evbuffer, data, size))
{
copy_len = ctx->instance->block_len - cache_cur->len;
memcpy(cache_cur->buf+cache_cur->len, data, copy_len);
cache_cur->len = ctx->instance->block_len;
TAILQ_INSERT_TAIL(&list_cur->cache_list, cache_cur, node);
list_cur->length += ctx->instance->block_len;
list_cur->cache_cur = NULL;
size -= copy_len;
data += copy_len;
return -1;
}
ctx->instance->statistic.memory_used += size;
if(evbuffer_get_length(ctx->evbuffer) >= ctx->instance->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size);
}
return 0;
}
while(size >= ctx->instance->block_len)
int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf)
{
size_t size;
if(ctx->fail_state)
{
return -1;
}
size = evbuffer_get_length(evbuf);
if(way == EVBUFFER_MOVE)
{
if(evbuffer_add_buffer(ctx->evbuffer, evbuf))
{
tango_cache_kick_new_buffer(ctx, data, ctx->instance->block_len); //<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>buffer<65><72>
list_cur = ctx->list_cur;
TAILQ_INSERT_TAIL(&list_cur->cache_list, list_cur->cache_cur, node);
list_cur->length += ctx->instance->block_len;
list_cur->cache_cur = NULL;
size -= ctx->instance->block_len;
data += ctx->instance->block_len;
return -1;
}
tango_cache_kick_new_buffer(ctx, data, size);
}
else
{
memcpy(cache_cur->buf+cache_cur->len, data, size);
cache_cur->len += size;
if(evbuffer_add_buffer_reference(ctx->evbuffer, evbuf))
{
return -1;
}
}
ctx->instance->statistic.memory_used += size;
if(evbuffer_get_length(ctx->evbuffer) >= ctx->instance->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size);
}
return 0;
}
@@ -266,6 +209,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
struct tango_cache_ctx *ctx;
char buffer[256]={0};
int other_len;
time_t expires, now;
if((u_int64_t)instance->statistic.memory_used >= instance->cache_limit_size)
{
@@ -291,17 +235,25 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
}
//Expires<65>ֶΣ<D6B6><CEA3><EFBFBD><EFBFBD>ڻ<EFBFBD><DABB><EFBFBD><EFBFBD>ڲ<EFBFBD><DAB2>ж<EFBFBD><D0B6><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ƿ<EFBFBD><C7B7><EFBFBD>ʱ
ctx->relative_age = (meta->relative_ttl==0||meta->relative_ttl>instance->relative_ttl)?instance->relative_ttl:meta->relative_ttl;
if(expires_timestamp2hdr_str(time(NULL)+ctx->relative_age, buffer, 256))
now = time(NULL);
expires = (meta->put.relative_ttl==0||meta->put.relative_ttl>instance->relative_ttl)?instance->relative_ttl:meta->put.relative_ttl;
if(expires_timestamp2hdr_str(now + expires, buffer, 256))
{
ctx->headers_puts = curl_slist_append(ctx->headers_puts, buffer);
ctx->headers = curl_slist_append(ctx->headers, buffer);
}
//Last-Modify<66>ֶΣ<D6B6><CEA3><EFBFBD><EFBFBD><EFBFBD>GETʱ<54>ж<EFBFBD><D0B6>Ƿ<EFBFBD><C7B7><EFBFBD><EFBFBD><EFBFBD>
if(meta->put.absulote_lastmod == 0)
{
meta->put.absulote_lastmod = get_gmtime_timestamp(now);
}
sprintf(buffer, "x-amz-meta-lm: %lu", meta->put.absulote_lastmod);
ctx->headers = curl_slist_append(ctx->headers, buffer);
//<2F>б<EFBFBD><D0B1><EFBFBD>֧<EFBFBD>ֵı<D6B5>׼ͷ<D7BC><CDB7>
for(int i=0; i<HDR_CONTENT_NUM; i++)
{
if(meta->std_hdr[i] != NULL)
{
ctx->headers_puts = curl_slist_append(ctx->headers_puts, meta->std_hdr[i]);
ctx->headers = curl_slist_append(ctx->headers, meta->std_hdr[i]);
}
}
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͷ<EFBFBD><CDB7><EFBFBD><EFBFBD>GETʱ<54><CAB1>ԭ<EFBFBD><D4AD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
@@ -310,7 +262,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
char *p = (char *)malloc((other_len/3 + 1)*4 + 18); //<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ռ䣻18=17+1: ͷ<><CDB7>+<2B>ַ<EFBFBD><D6B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
memcpy(p, "x-amz-meta-user: ", 17);
Base64_EncodeBlock((unsigned char*)meta->other_hdr, other_len, (unsigned char*)p+17);
ctx->headers_puts = curl_slist_append(ctx->headers_puts, p);
ctx->headers = curl_slist_append(ctx->headers, p);
free(p);
}
@@ -327,12 +279,13 @@ struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *in
return NULL;
}
ctx->list_cur = buffer_cache_list_new(ctx);
ctx->evbuffer = evbuffer_new();
TAILQ_INIT(&ctx->cache_head);
return ctx;
}
int tango_cache_upload_once(struct tango_cache_instance *instance, struct future* future, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta *meta, char *filename, size_t namelen)
int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct future* future,
enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta *meta, char *objectkey, size_t keysize)
{
struct tango_cache_ctx *ctx;
@@ -341,13 +294,30 @@ int tango_cache_upload_once(struct tango_cache_instance *instance, struct future
{
return -1;
}
ctx->way = way;
if(filename != NULL)
if(objectkey != NULL)
{
snprintf(filename, namelen, "%s", ctx->file_key);
snprintf(objectkey, keysize, "%s", ctx->file_key);
}
return tango_cache_upload_once_start(ctx, data, size);
return tango_cache_upload_once_start_data(ctx, way, data, size);
}
int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct future* future,
enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, struct tango_cache_meta *meta, char *objectkey, size_t keysize)
{
struct tango_cache_ctx *ctx;
ctx = tango_cache_update_prepare(instance, future, meta);
if(ctx == NULL)
{
return -1;
}
if(objectkey != NULL)
{
snprintf(objectkey, keysize, "%s", ctx->file_key);
}
return tango_cache_upload_once_start_evbuf(ctx, way, evbuf);
}
struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta)
@@ -360,7 +330,8 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
ctx->future = future;
ctx->method = CACHE_REQUEST_GET;
ctx->get_state = GET_STATE_START;
ctx->expire_comes = false;
ctx->max_age = meta->get.max_age;
ctx->min_fresh = meta->get.min_fresh;
instance->statistic.get_recv_num += 1;
if(instance->hash_object_key)
@@ -401,6 +372,10 @@ static void check_multi_info(CURLM *multi)
res = msg->data.result;
curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ctx);
curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &res_code);
curl_multi_remove_handle(multi, easy);
curl_easy_cleanup(easy);
ctx->curl = NULL;
ctx->res_code = 0;
if(ctx->method == CACHE_REQUEST_GET)
{