This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/cache/tango_cache_transfer.cpp
zhangchengwei 0bfd49194e 创建
2018-10-08 19:33:01 +08:00

791 lines
24 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <sys/time.h>
#include <time.h>
#include <string.h>
#include <strings.h>
#include <curl/curl.h>
#include "tango_cache_transfer.h"
#include "tango_cache_xml.h"
#include "tango_cache_tools.h"
/*
 * libcurl write callback for responses whose body is short or irrelevant:
 * the data is discarded and every byte is reported as consumed so that the
 * transfer is never aborted by this callback.
 */
size_t curl_response_any_cb(void *ptr, size_t size, size_t count, void *userp)
{
    const size_t consumed = size * count;

    (void)ptr;
    (void)userp;
    return consumed;
}
/*
 * libcurl header callback for an "upload part" request.
 *
 * Captures the value of the first ETag response header into list->etag
 * (heap-allocated, NUL-terminated, leading spaces and trailing CR/LF removed).
 * Later ETag headers are ignored once list->etag is set.
 *
 * ptr/size/count - one raw header line of size*count bytes (not NUL-terminated)
 * userp          - the struct buffer_cache_list the part belongs to
 *
 * Always returns size*count so the transfer is never aborted from here. If the
 * ETag copy cannot be allocated, list->etag is simply left NULL.
 */
static size_t curl_put_multipart_header_cb(void *ptr, size_t size, size_t count, void *userp)
{
    static const char etag_name[] = "Etag:";
    const size_t name_len = sizeof(etag_name) - 1;
    struct buffer_cache_list *list = (struct buffer_cache_list *)userp;
    const size_t totallen = size * count;
    const char *start = (const char *)ptr;
    const char *end = start + totallen; /* one past the last byte of the line */

    /* The line must be strictly longer than the name: a shorter line must not
     * be prefix-matched, otherwise the remaining-length arithmetic underflows.
     * Header names are case-insensitive, so compare accordingly. */
    if(list->etag != NULL || totallen <= name_len || strncasecmp(start, etag_name, name_len) != 0)
    {
        return totallen;
    }

    start += name_len;
    while(start < end && *start == ' ')
    {
        start++;
    }
    while(end > start && (end[-1] == '\r' || end[-1] == '\n'))
    {
        end--;
    }
    if(end > start)
    {
        const size_t value_len = (size_t)(end - start);
        char *etag = (char *)malloc(value_len + 1);

        if(etag != NULL)
        {
            memcpy(etag, start, value_len);
            etag[value_len] = '\0';
            list->etag = etag;
        }
    }
    return totallen;
}
/*
 * libcurl read callback for a single-shot upload.
 *
 * ctx->response is used as the outgoing buffer: response.buff holds the data,
 * response.size its total length and response.len the number of bytes already
 * handed to libcurl. Each call copies at most size*count bytes into ptr.
 *
 * When the last byte has been handed over, the bytes are subtracted from the
 * instance memory statistic (this path does not use the cache buffers, so it
 * does its own accounting) and the buffer is released: destroyed for
 * PUT_ONCE_COPY, otherwise only detached from ctx.
 * NOTE(review): in the non-COPY case the pointer is dropped without free();
 * confirm who owns the data for PUT_ONCE_FREE after a successful upload.
 *
 * Returns the number of bytes copied, 0 when nothing is left to send.
 */
static size_t curl_put_once_send_cb(void *ptr, size_t size, size_t count, void *userp)
{
    struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
    const size_t room = size * count;
    size_t chunk;

    if(size == 0 || count == 0 || ctx->response.len >= ctx->response.size)
    {
        return 0; /* libcurl does not necessarily make this final call */
    }

    /* bytes still waiting to be uploaded, capped by the space libcurl offers */
    chunk = ctx->response.size - ctx->response.len;
    if(chunk > room)
    {
        chunk = room;
    }
    memcpy(ptr, ctx->response.buff + ctx->response.len, chunk);
    ctx->response.len += chunk;

    if(ctx->response.len < ctx->response.size)
    {
        return chunk;
    }

    /* everything has been handed over: undo the accounting and drop the buffer */
    ctx->instance->statistic.memory_used -= ctx->response.size;
    if(ctx->way == PUT_ONCE_COPY)
    {
        response_buffer_destroy(&ctx->response);
    }
    else
    {
        ctx->response.buff = NULL;
        ctx->response.size = 0;
        ctx->response.len = 0;
    }
    return chunk;
}
static size_t curl_put_multipart_send_cb(void *ptr, size_t size, size_t count, void *userp)
{
size_t len=0, needlen=size * count, remainlen;
struct buffer_cache_list *list = (struct buffer_cache_list *)userp;
struct cache_buffer *next_cache;
if(size==0 || count==0 || list->cache_cur==NULL)
{
return 0;
}
while(len<needlen && list->cache_cur!=NULL)
{
remainlen = list->cache_cur->len - list->cache_cur->off;
if(needlen-len >= remainlen)
{
memcpy((char*)ptr+len, list->cache_cur->buf+list->cache_cur->off, remainlen);
len += remainlen;
next_cache = TAILQ_NEXT(list->cache_cur, node);
TAILQ_REMOVE(&list->cache_list, list->cache_cur, node);
buffer_cache_destroy(list->cache_cur, list->ctx->instance);
list->cache_cur = next_cache;
}
else
{
memcpy((char*)ptr+len, list->cache_cur->buf+list->cache_cur->off, needlen-len);
list->cache_cur->off += needlen-len;
len = needlen;
}
}
return len;
}
/*
 * Queue an asynchronous PUT of the cache buffers held by `list`.
 *
 * full == true  : plain PUT to the object URL (the whole object in one request).
 * full == false : PUT of one part of a multipart upload (partNumber/uploadId
 *                 query parameters); the response headers are scanned for the
 *                 part's ETag.
 *
 * Return value: <0 failure (URL does not fit, or no curl handle);
 *               =0 nothing executed (the list has no queued buffer, i.e. it
 *                  was already uploaded);
 *               >0 the request was added to the instance's multi handle.
 */
int http_put_bodypart_request(struct tango_cache_ctx *ctx, struct buffer_cache_list *list, bool full)
{
    CURLMcode rc;
    char minio_url[256];
    int urllen;

    list->cache_cur = TAILQ_FIRST(&list->cache_list);
    if(list->cache_cur == NULL)
    {
        return 0; /* already uploaded */
    }

    if(full)
    {
        urllen = snprintf(minio_url, sizeof(minio_url), "http://%s/%s/%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->file_key);
    }
    else
    {
        urllen = snprintf(minio_url, sizeof(minio_url), "http://%s/%s/%s?partNumber=%d&uploadId=%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->file_key, list->part_number, ctx->uploadID);
    }
    /* A silently truncated URL would address a different object: refuse it. */
    if(urllen < 0 || (size_t)urllen >= sizeof(minio_url))
    {
        return -1;
    }

    if(NULL == (list->curl = curl_easy_init()))
    {
        return -1;
    }
    if(!full)
    {
        curl_easy_setopt(list->curl, CURLOPT_HEADERFUNCTION, curl_put_multipart_header_cb);
        curl_easy_setopt(list->curl, CURLOPT_HEADERDATA, list);
    }
    curl_easy_setopt(list->curl, CURLOPT_URL, minio_url);
    curl_easy_setopt(list->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
    curl_easy_setopt(list->curl, CURLOPT_NOSIGNAL, 1L);
    curl_easy_setopt(list->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
    curl_easy_setopt(list->curl, CURLOPT_WRITEDATA, list);
    curl_easy_setopt(list->curl, CURLOPT_ERRORBUFFER, ctx->error);
    curl_easy_setopt(list->curl, CURLOPT_PRIVATE, ctx);
    curl_easy_setopt(list->curl, CURLOPT_FOLLOWLOCATION, 1L);
    curl_easy_setopt(list->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
    curl_easy_setopt(list->curl, CURLOPT_HTTPHEADER, ctx->headers_puts);
    curl_easy_setopt(list->curl, CURLOPT_LOW_SPEED_TIME, 2L);
    curl_easy_setopt(list->curl, CURLOPT_LOW_SPEED_LIMIT, 1024L);
    curl_easy_setopt(list->curl, CURLOPT_UPLOAD, 1L);
    /* curl_easy_setopt is variadic: the argument type must match exactly, so
     * pass the length as curl_off_t through the _LARGE variant. */
    curl_easy_setopt(list->curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)list->length);
    curl_easy_setopt(list->curl, CURLOPT_READFUNCTION, curl_put_multipart_send_cb);
    curl_easy_setopt(list->curl, CURLOPT_READDATA, list);

    rc = curl_multi_add_handle(ctx->instance->multi_hd, list->curl);
    assert(rc==CURLM_OK);
    (void)rc;
    return 1;
}
/*
 * libcurl write callback for the "create multipart upload" request.
 *
 * Accumulates the response body in ctx->response (kept NUL-terminated) so the
 * uploadId can be parsed once the transfer is done. On the first call the HTTP
 * status is fetched; anything other than 200 marks the context as failed.
 * Once ctx->fail_state is set, further data is swallowed.
 *
 * Always returns size*count: failures are reported through ctx->fail_state
 * rather than by aborting the transfer.
 */
static size_t curl_write_uploadID_cb(void *ptr, size_t size, size_t count, void *userp)
{
    struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
    struct easy_string *estr = &ctx->response;
    const size_t nbytes = size * count;
    CURLcode code;

    if(ctx->fail_state)
    {
        return nbytes;
    }
    if(ctx->res_code == 0)
    {
        code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code);
        if(code != CURLE_OK || ctx->res_code!=200L)
        {
            ctx->fail_state = true;
            ctx->error_code = CACHE_CACHE_MISS;
            if(code != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error);
            return nbytes;
        }
    }
    if(estr->size-estr->len < nbytes+1)
    {
        const size_t grown_size = estr->size + nbytes*2 + 1;
        char *grown = (char *)realloc(estr->buff, grown_size);

        if(grown == NULL)
        {
            /* Keep the old buffer (still owned by estr) and flag the failure;
             * the completion handler treats fail_state as an upload error. */
            ctx->fail_state = true;
            ctx->error_code = CACHE_ERR_CURL;
            return nbytes;
        }
        estr->buff = grown;
        estr->size = grown_size;
    }
    memcpy(estr->buff+estr->len, ptr, nbytes);
    estr->len += nbytes;
    estr->buff[estr->len] = '\0';
    return nbytes;
}
/*
 * Queue the "create multipart upload" request (POST <object>?uploads) whose
 * response body carries the uploadId; the body is collected by
 * curl_write_uploadID_cb into ctx->response.
 *
 * Returns 1 when the request was added to the instance's multi handle and -1
 * on failure (URL does not fit, or no curl handle). On failure ctx is left
 * intact: the caller still records the failure in it, so it must not be freed
 * here.
 */
int curl_get_minio_uploadID(struct tango_cache_ctx *ctx)
{
    CURLMcode rc;
    char minio_url[256];
    int urllen;

    urllen = snprintf(minio_url, sizeof(minio_url), "http://%s/%s/%s?uploads", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->file_key);
    /* A silently truncated URL would address a different object: refuse it. */
    if(urllen < 0 || (size_t)urllen >= sizeof(minio_url))
    {
        return -1;
    }
    if(NULL == (ctx->curl=curl_easy_init()))
    {
        return -1;
    }
    curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
    curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
    curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
    curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL,1L);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_write_uploadID_cb);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
    curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
    curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
    curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers_puts);
    curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L);
    curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 1024L);

    rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
    assert(rc==CURLM_OK);
    (void)rc;
    return 1;
}
/*
 * Queue an asynchronous DELETE of the object identified by ctx->file_key.
 *
 * Returns true when the request was added to the instance's multi handle,
 * false when nothing was queued (URL does not fit, or no curl handle).
 */
bool cache_delete_minio_object(struct tango_cache_ctx *ctx)
{
    CURLMcode rc;
    char minio_url[256];
    int urllen;

    urllen = snprintf(minio_url, sizeof(minio_url), "http://%s/%s/%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->file_key);
    /* Never issue a DELETE against a truncated URL: it could name another object. */
    if(urllen < 0 || (size_t)urllen >= sizeof(minio_url))
    {
        return false;
    }
    if(NULL == (ctx->curl=curl_easy_init()))
    {
        return false;
    }
    curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE");
    curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
    curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
    curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL, 1L);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
    curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
    curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);

    rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
    assert(rc==CURLM_OK);
    (void)rc;
    return true;
}
//return value: true-成功添加事件false-未添加事件
bool cache_cancel_upload_minio(struct tango_cache_ctx *ctx)
{
CURLMcode rc;
char minio_url[256];
if(NULL == (ctx->curl=curl_easy_init()))
{
return false;
}
snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->file_key, ctx->uploadID);
curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE");
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
return true;
}
/*
 * Queue the "complete multipart upload" request: POSTs the XML built by
 * construct_complete_xml() (stored in ctx->combine_xml) to
 * <object>?uploadId=<ctx->uploadID>.
 *
 * Return value: true  - the request was added to the multi handle;
 *               false - nothing was queued (URL does not fit, or no curl handle).
 */
bool cache_kick_combine_minio(struct tango_cache_ctx *ctx)
{
    int len=0;
    CURLMcode rc;
    char minio_url[256];
    int urllen;

    urllen = snprintf(minio_url, sizeof(minio_url), "http://%s/%s/%s?uploadId=%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->file_key, ctx->uploadID);
    /* A truncated URL would lose (part of) the uploadId or the key: refuse it. */
    if(urllen < 0 || (size_t)urllen >= sizeof(minio_url))
    {
        return false;
    }
    if(NULL == (ctx->curl=curl_easy_init()))
    {
        return false;
    }
    construct_complete_xml(ctx, &ctx->combine_xml, &len);

    curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
    curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
    curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
    curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL,1L);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
    curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
    curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
    curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->combine_xml);
    /* Sets Content-Length. curl_easy_setopt is variadic and this option reads
     * a long, so the int must be converted explicitly. */
    curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, (long)len);
    ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml");
    curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);

    rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
    assert(rc==CURLM_OK);
    (void)rc;
    return true;
}
/*
 * Advance the multipart upload by one step for `list`.
 *
 *  - PUT_STATE_START: move to PUT_STATE_WAIT_START and request the uploadId.
 *  - PUT_STATE_START_DONE / PUT_STATE_PART: start uploading `list` as a part
 *    and count it in ctx->part_runing_num when a request was queued.
 *  - any other state: nothing to do.
 *
 * Return value: true  - a request was queued (or there was nothing to do);
 *               false - the context is already failed, or the step could not
 *                       be started; in the latter case ctx is marked failed
 *                       with CACHE_ERR_CURL.
 */
bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, struct buffer_cache_list *list)
{
    int started = 1;

    if(ctx->fail_state)
    {
        return false;
    }

    if(ctx->put_state == PUT_STATE_START)
    {
        ctx->put_state = PUT_STATE_WAIT_START;
        started = curl_get_minio_uploadID(ctx);
    }
    else if(ctx->put_state == PUT_STATE_START_DONE || ctx->put_state == PUT_STATE_PART)
    {
        started = http_put_bodypart_request(ctx, list, false);
        if(started > 0)
        {
            ctx->part_runing_num++;
        }
    }
    /* every other state: nothing to do, `started` keeps its default */

    if(started > 0)
    {
        return true;
    }
    ctx->fail_state = true;
    ctx->error_code = CACHE_ERR_CURL;
    return false;
}
/*
 * Called when the user ends a streaming upload; also re-invoked by
 * tango_cache_curl_put_done once the uploadId request has completed.
 *
 * Sets ctx->close_state and then acts on ctx->put_state:
 *  - context already failed: destroy ctx, return 0.
 *  - PUT_STATE_START (no multipart upload was started): switch to
 *    PUT_STATE_END; if the current cache buffer holds data it is queued and
 *    sent as one full-object PUT, otherwise ctx is destroyed. ctx is also
 *    destroyed when the PUT could not be started.
 *  - PUT_STATE_PART: queue the remaining data as a last part, or, when there
 *    is none and no part is still running, send the "complete" request.
 *  - PUT_STATE_WAIT_START and all other states: nothing is done here.
 *
 * Returns the result of http_put_bodypart_request() in the PUT_STATE_START
 * data path and 0 in every other path.
 *
 * NOTE(review): ctx must be treated as gone after any tango_cache_ctx_destroy()
 * call; presumably destroy releases it - confirm.
 * NOTE(review): in the PUT_STATE_PART branch the result of
 * cache_kick_upload_minio_multipart() is ignored; if it fails while no part is
 * running, nothing in this function destroys ctx - confirm cleanup elsewhere.
 */
int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx)
{
int ret = 0;
ctx->close_state = true;//only flags the context as closing; it is really closed once the internal state machine has finished
if(ctx->fail_state)
{
tango_cache_ctx_destroy(ctx);
return 0;
}
switch(ctx->put_state)
{
case PUT_STATE_START:
ctx->put_state = PUT_STATE_END;
if(ctx->list_cur->cache_cur->len > 0)
{
// queue the buffer being filled and upload the whole object in one request
TAILQ_INSERT_TAIL(&ctx->list_cur->cache_list, ctx->list_cur->cache_cur, node);
ctx->list_cur->length += ctx->list_cur->cache_cur->len;
ret = http_put_bodypart_request(ctx, ctx->list_cur, true);
if(ret <= 0)
{
tango_cache_ctx_destroy(ctx);
}
}
else
{
tango_cache_ctx_destroy(ctx);
}
break;
case PUT_STATE_PART:
if(ctx->list_cur->length + ctx->list_cur->cache_cur->len > 0)
{
// data is pending: queue the current list as the last part and start its upload
TAILQ_INSERT_TAIL(&ctx->list_cur->cache_list, ctx->list_cur->cache_cur, node);
ctx->list_cur->length += ctx->list_cur->cache_cur->len;
ctx->list_cur->cache_cur = NULL;
TAILQ_INSERT_TAIL(&ctx->cache_head, ctx->list_cur, node);
cache_kick_upload_minio_multipart(ctx, ctx->list_cur);
ctx->list_cur = NULL;
}
else
{
buffer_cache_list_destroy(ctx->list_cur, ctx);
ctx->list_cur = NULL;
if(ctx->part_runing_num==0) //every part has finished uploading and there is no data block left at END
{
if(cache_kick_combine_minio(ctx))
{
ctx->put_state = PUT_STATE_END;
}
else
{
tango_cache_ctx_destroy(ctx);
}
}
}
break;
case PUT_STATE_END: assert(0); //cannot be in this state when the user calls end; falls through when assert is compiled out
case PUT_STATE_WAIT_START: //the uploadId has not been obtained yet, so no upload can be triggered here
default: break;
}
return ret;
}
/*
 * Completion handler for an upload-side transfer; dispatches on ctx->put_state.
 *
 * easy     - the easy handle that finished
 * ctx      - the upload context the handle belongs to
 * res      - curl result of the transfer
 * res_code - HTTP status of the transfer
 *
 * PUT_STATE_WAIT_START: the uploadId request finished. On any error the
 *   context is marked failed; otherwise the uploadId is parsed from the
 *   collected response and every list queued on ctx->cache_head is started as
 *   a part. If the user already closed the context, the end step is run (or
 *   ctx is destroyed when it failed before any part was started).
 * PUT_STATE_PART: one part finished. When it was the last running part and no
 *   list is being filled, the upload is cancelled (on failure), completed (on
 *   close), or ctx is destroyed.
 * PUT_STATE_CANCEL: the cancel request finished; ctx is destroyed if closed.
 * PUT_STATE_END: the final request finished; ctx is destroyed.
 *
 * NOTE(review): the PUT_STATE_END branch does not remove/clean up `easy`
 * itself; presumably tango_cache_ctx_destroy() does that for ctx->curl and
 * list handles - confirm.
 */
void tango_cache_curl_put_done(CURL *easy, struct tango_cache_ctx *ctx, CURLcode res, long res_code)
{
struct buffer_cache_list *list;
switch(ctx->put_state)
{
case PUT_STATE_WAIT_START:
ctx->curl = NULL;
ctx->res_code = 0;
curl_multi_remove_handle(ctx->instance->multi_hd, easy);
curl_easy_cleanup(easy);
if(res!=CURLE_OK||res_code!=200L|| ctx->fail_state || !parse_uploadID_xml(ctx->response.buff, ctx->response.len, &ctx->uploadID))
{
response_buffer_destroy(&ctx->response);
ctx->error_code = CACHE_ERR_CURL;
ctx->fail_state = true;
if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error);
}
else
{
// NOTE(review): only buff is released here; response.len/size keep their old values - confirm nothing reuses ctx->response afterwards
free(ctx->response.buff);
ctx->response.buff = NULL;
ctx->put_state = PUT_STATE_START_DONE;
// start every list that was queued while the uploadId was being fetched
TAILQ_FOREACH(list, &ctx->cache_head, node)
{
if(!cache_kick_upload_minio_multipart(ctx, list))
{
ctx->fail_state = true;
break;
}
else
{
ctx->put_state = PUT_STATE_PART;
}
}
}
if(ctx->close_state)
{
if(!ctx->fail_state)
{
cache_kick_upload_minio_end(ctx);
}
else if(ctx->put_state!=PUT_STATE_PART)
{
// failed before any part was started: no part completion will follow, so release ctx here
tango_cache_ctx_destroy(ctx);
}
}
break;
case PUT_STATE_PART:
curl_multi_remove_handle(ctx->instance->multi_hd, easy);
curl_easy_cleanup(easy);
ctx->part_runing_num--;
// find the list that owned this handle and detach the handle from it
TAILQ_FOREACH(list, &ctx->cache_head, node)
{
if(list->curl == easy)
{
list->curl = NULL;
break;
}
}
assert(list != NULL); //a handle completing in PART state must belong to one of the queued lists
if(res != CURLE_OK ||res_code!=200L )
{
ctx->fail_state = true;
if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error);
}
if(ctx->part_runing_num==0 && ctx->list_cur==NULL)
{
// last running part and no list being filled: cancel, complete, or release
if(ctx->fail_state && cache_cancel_upload_minio(ctx))
{
ctx->put_state = PUT_STATE_CANCEL;
}
else if(!ctx->fail_state && ctx->close_state && cache_kick_combine_minio(ctx))
{
ctx->put_state = PUT_STATE_END;
}
else if(ctx->close_state)
{
tango_cache_ctx_destroy(ctx);
}
}
break;
case PUT_STATE_CANCEL: //waiting to be closed
ctx->curl = NULL;
ctx->res_code = 0;
curl_multi_remove_handle(ctx->instance->multi_hd, easy);
curl_easy_cleanup(easy);
if(ctx->close_state)
{
tango_cache_ctx_destroy(ctx);
}
break;
case PUT_STATE_END:
if(res != CURLE_OK || res_code!=200L)
{
ctx->fail_state = true;
if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error);
}
tango_cache_ctx_destroy(ctx);
break;
default: break;
}
}
int tango_cache_upload_once_start(struct tango_cache_ctx *ctx, const char *data, size_t size)
{
CURLMcode rc;
char minio_url[256];
if(NULL == (ctx->curl=curl_easy_init()))
{
tango_cache_ctx_destroy(ctx);
if(ctx->way == PUT_ONCE_FREE)
{
free((void *)data);
}
return -1;
}
ctx->put_state = PUT_STATE_END;
snprintf(minio_url, 256, "http://%s/%s/%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->file_key);
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers_puts);
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L);
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 1024L);
if(ctx->way == PUT_ONCE_COPY)
{
ctx->response.buff = (char *)malloc(size);
memcpy(ctx->response.buff, data, size);
}
else
{
ctx->response.buff = (char *)data;
}
ctx->response.size = size;
ctx->response.len = 0;
ctx->instance->statistic.memory_used += size;
curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->response.size);
curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_once_send_cb);
curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
return 0;
}
/*
 * Completion handler for a download-side transfer.
 *
 * The finished easy handle is always detached from the multi handle and
 * cleaned up first; what happens next depends on ctx->get_state:
 *  - GET_STATE_START : unless the context already failed, resolve the future
 *    (success on curl OK + HTTP 200, otherwise failure with CACHE_ERR_CURL),
 *    then destroy ctx.
 *  - GET_STATE_DELETE: queue the deletion of the object; on success move to
 *    GET_STATE_END, otherwise destroy ctx.
 *  - GET_STATE_END   : destroy ctx.
 *  - anything else is a programming error (assert).
 */
void tango_cache_curl_get_done(CURL *easy, struct tango_cache_ctx *ctx, CURLcode res, long res_code)
{
    curl_multi_remove_handle(ctx->instance->multi_hd, easy);
    curl_easy_cleanup(easy);
    ctx->curl = NULL;

    if(ctx->get_state == GET_STATE_DELETE)
    {
        if(!cache_delete_minio_object(ctx))
        {
            tango_cache_ctx_destroy(ctx);
            return;
        }
        ctx->get_state = GET_STATE_END;
        return;
    }
    if(ctx->get_state == GET_STATE_END)
    {
        tango_cache_ctx_destroy(ctx);
        return;
    }
    if(ctx->get_state != GET_STATE_START)
    {
        assert(0);
        return;
    }

    /* GET_STATE_START: the download itself has finished */
    if(!ctx->fail_state)
    {
        const bool fetched = (res == CURLE_OK && res_code == 200L);

        if(fetched)
        {
            promise_success(future_to_promise(ctx->future), NULL);
        }
        else
        {
            ctx->error_code = CACHE_ERR_CURL;
            promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, ctx->error);
        }
    }
    tango_cache_ctx_destroy(ctx);
}
/*
 * libcurl write callback for a cache fetch.
 *
 * Body data is forwarded to the future as a RESULT_TYPE_BODY fragment, but
 * only if an Expires header was seen before the body started. Without it the
 * fetch is failed as a cache miss and the context is switched to
 * GET_STATE_DELETE. Once the context is failed or in GET_STATE_DELETE the
 * remaining data is swallowed.
 *
 * Always returns size*count so the transfer runs to completion.
 */
static size_t curl_get_response_body_cb(void *ptr, size_t size, size_t count, void *userp)
{
    struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
    const size_t nbytes = size * count;
    struct tango_cache_result result;

    if(ctx->fail_state || ctx->get_state==GET_STATE_DELETE)
    {
        return nbytes;
    }

    if(ctx->expire_comes)
    {
        result.data_frag = ptr;
        result.size = nbytes;
        result.type = RESULT_TYPE_BODY;
        promise_success(future_to_promise(ctx->future), &result);
        return nbytes;
    }

    /* the body started without an Expires header having been seen */
    ctx->fail_state = true;
    ctx->error_code = CACHE_CACHE_MISS;
    ctx->get_state = GET_STATE_DELETE;
    promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache Expires not found");
    return nbytes;
}
/*
 * libcurl header callback for a cache fetch; called once per response header line.
 *
 * On the first call the HTTP status is checked: anything but 200 fails the
 * future as "cache not hit". Afterwards the header name (the bytes before the
 * first ':') is dispatched on its length:
 *  - "expires": the cached object is checked for freshness. A stale object
 *    fails the future (and, when already expired, schedules deletion via
 *    GET_STATE_DELETE). A fresh one flushes the headers buffered so far to the
 *    future and sets ctx->expire_comes.
 *  - "x-amz-meta-user": the value is Base64-decoded into ctx->response and the
 *    decoded bytes are treated as header data.
 *  - content-md5/-type/-length/-encoding/-disposition: passed on verbatim.
 * Accepted header data is delivered to the future immediately once Expires has
 * been seen, and buffered in ctx->response until then.
 *
 * Always returns size*count so the transfer is never aborted from here.
 *
 * NOTE(review): strcmp_one_word_mesa_equal_len() is presumably a
 * case-insensitive comparison given its lower/upper-case arguments - confirm.
 * NOTE(review): both realloc() results are used unchecked.
 */
static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count, void *userp)
{
struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
CURLcode code;
struct tango_cache_result result;
char *start=(char *)ptr, *pos_colon, *hdrdata=(char*)ptr;
bool ptr_valid=false;
size_t raw_len = size*count, hdrlen=size*count;
if(ctx->fail_state || ctx->get_state==GET_STATE_DELETE)
{
return raw_len;
}
if(ctx->res_code==0) //on the first response data, check whether the status code is 200
{
code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code);
if(code != CURLE_OK || ctx->res_code!=200L)
{
ctx->fail_state = true;
ctx->error_code = CACHE_CACHE_MISS;
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache not hit");
if(code != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error);
return raw_len;
}
}
if((pos_colon=(char*)memchr(start, ':', raw_len))!=NULL)
{
// datalen is the length of the header name; it selects which names to compare against
size_t datalen = pos_colon - start;
switch(datalen)
{
case 7:
if(strcmp_one_word_mesa_equal_len("expires", "EXPIRES", start, 7))
{
time_t expire = expires_hdr2timestamp(pos_colon + 1, raw_len - datalen - 1);
time_t time_gmt = get_gmtime_timestamp(time(NULL));
if(time_gmt + ctx->relative_age > expire) //cache is stale; TODO: what exactly does relative_age mean
{
ctx->fail_state = true;
ctx->error_code = CACHE_TIMEOUT;
if(time_gmt>=expire) ctx->get_state = GET_STATE_DELETE; //the object has expired: trigger its deletion once the download has finished
response_buffer_destroy(&ctx->response);
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, "cache not fresh");
return raw_len;
}
else if(ctx->response.buff != NULL)
{
// fresh: hand over the headers that were buffered before Expires arrived
result.data_frag = ctx->response.buff;
result.size = ctx->response.len;
result.type = RESULT_TYPE_HEADER;
promise_success(future_to_promise(ctx->future), &result);
response_buffer_destroy(&ctx->response);
}
ctx->expire_comes = true;
}
break;
case 15:
if(strcmp_one_word_mesa_equal_len("x-amz-meta-user", "X-AMZ-META-USER", start, 15))
{
// make room in ctx->response, which is used as the decode output buffer
if(ctx->response.size-ctx->response.len < raw_len+1)
{
ctx->response.size += raw_len*8 + 1;
ctx->response.buff = (char*)realloc(ctx->response.buff, ctx->response.size);
}
if((hdrlen = Base64_DecodeBlock((unsigned char*)pos_colon+1, raw_len-datalen-1, (unsigned char*)ctx->response.buff+ctx->response.len, ctx->response.size-ctx->response.len))>0)
{
// hdrdata now points into ctx->response itself (the decoded bytes are not yet counted in response.len)
hdrdata = ctx->response.buff+ctx->response.len;
ptr_valid = true;
}
}
break;
case 11: if(strcmp_one_word_mesa_equal_len("content-md5", "CONTENT-MD5", start, 11)) ptr_valid = true; break;
case 12: if(strcmp_one_word_mesa_equal_len("content-type", "CONTENT-TYPE", start, 12)) ptr_valid = true; break;
case 14: if(strcmp_one_word_mesa_equal_len("content-length", "CONTENT-LENGTH", start, 14)) ptr_valid = true; break;
case 16: if(strcmp_one_word_mesa_equal_len("content-encoding", "CONTENT-ENCODING", start, 16)) ptr_valid = true; break;
case 19: if(strcmp_one_word_mesa_equal_len("content-disposition", "CONTENT-DISPOSITION", start, 19)) ptr_valid = true; break;
default: break;
}
}
if(ptr_valid)
{
if(ctx->expire_comes)
{
// Expires already validated: deliver this header right away
result.data_frag = hdrdata;
result.size = hdrlen;
result.type = RESULT_TYPE_HEADER;
promise_success(future_to_promise(ctx->future), &result);
}
else
{
// Expires not seen yet: buffer the header until freshness is known
// NOTE(review): for x-amz-meta-user hdrdata aliases the destination below, and a realloc here would invalidate it - confirm this cannot trigger
if(ctx->response.size-ctx->response.len < hdrlen+1)
{
ctx->response.size += hdrlen*8 + 1;
ctx->response.buff = (char*)realloc(ctx->response.buff, ctx->response.size);
}
memcpy(ctx->response.buff+ctx->response.len, hdrdata, hdrlen);
ctx->response.len += hdrlen;
ctx->response.buff[ctx->response.len] = '\0';
}
}
return raw_len;
}
/*
 * Start an asynchronous GET of the object identified by ctx->file_key.
 * Response headers go to curl_get_response_header_cb and body data to
 * curl_get_response_body_cb, both with ctx as user data.
 *
 * Returns 0 when the request was added to the instance's multi handle and -1
 * on failure (URL does not fit, or no curl handle); on failure ctx is
 * destroyed and must not be used by the caller afterwards.
 */
int tango_cache_fetch_start(struct tango_cache_ctx *ctx)
{
    CURLMcode rc;
    char minio_url[256];
    int urllen;

    urllen = snprintf(minio_url, sizeof(minio_url), "http://%s/%s/%s", ctx->instance->minio_hostlist, ctx->instance->bucketname, ctx->file_key);
    /* A silently truncated URL would fetch a different object: refuse it. */
    if(urllen < 0 || (size_t)urllen >= sizeof(minio_url))
    {
        tango_cache_ctx_destroy(ctx);
        return -1;
    }
    if(NULL == (ctx->curl=curl_easy_init()))
    {
        tango_cache_ctx_destroy(ctx);
        return -1;
    }
    curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
    curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
    curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL,1L);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_get_response_body_cb);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
    curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
    curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
    curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_get_response_header_cb);
    curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
    //ctx->error="Operation too slow. Less than 1024 bytes/sec transferred the last 3 seconds"
    curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L);
    curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 1024L);

    rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
    assert(rc==CURLM_OK);
    (void)rc;
    return 0;
}