This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/cache/src/tango_cache_transfer.cpp
zhangchengwei e1ad321332 [1]统一GET/PUT结束后结果通知机制,API直接调用失败时不回调,其他情况回调(promise);
[2]hiredis版本确定为0.14.0版;
[3]修复tango_cache_ctx_destroy中TAILQ内存释放的BUG;
2018-10-31 10:54:52 +08:00

901 lines
28 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <sys/time.h>
#include <time.h>
#include <string.h>
#include <curl/curl.h>
#include "tango_cache_transfer.h"
#include "tango_cache_xml.h"
#include "tango_cache_tools.h"
//Write callback used when the response body is short or of no interest: the data is dropped.
size_t curl_response_any_cb(void *ptr, size_t size, size_t count, void *userp)
{
	//Report the whole chunk as consumed; nothing is stored.
	const size_t consumed = size * count;

	(void)ptr;
	(void)userp;
	return consumed;
}
static size_t curl_put_multipart_header_cb(void *ptr, size_t size, size_t count, void *userp)
{
struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
size_t totallen = size*count;
char *start = (char *)ptr, *end = start + totallen;
struct multipart_etag_list *etag;
if(!strncmp(start, "Etag:", totallen>5?5:totallen))
{
start += 5; end -= 1; totallen -= 5;
while(totallen>0 && (*start==' ')) {start++; totallen--;}
while(totallen>0 && (*end=='\r'||*end=='\n')) {end--; totallen--;}
if(totallen > 0)
{
etag = (struct multipart_etag_list *)malloc(sizeof(struct multipart_etag_list));
totallen = end - start + 1;
etag->etag = (char *)malloc(totallen + 1);
etag->part_number = ctx->put.part_index;
memcpy(etag->etag, start, totallen);
*(etag->etag + totallen) = '\0';
TAILQ_INSERT_TAIL(&ctx->put.etag_head, etag, node);
}
}
return size*count;
}
static size_t curl_put_once_send_cb(void *ptr, size_t size, size_t count, void *userp)
{
size_t len;
struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
if(size==0 || count==0 || ctx->response.len>=ctx->response.size)
{
return 0; //不一定调用
}
len = ctx->response.size - ctx->response.len; //剩余待上传的长度
if(len > size * count)
{
len = size * count;
}
memcpy(ptr, ctx->response.buff + ctx->response.len, len);
ctx->response.len += len;
if(ctx->response.len >= ctx->response.size)
{
ctx->instance->statistic.memory_used -= ctx->response.size; //未使用cache buffer自己计算内存增减
easy_string_destroy(&ctx->response);
}
return len;
}
static size_t curl_put_multipart_send_cb(void *ptr, size_t size, size_t count, void *userp)
{
size_t len, space=size*count, send_len;
struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
if(size==0 || count==0 || ctx->put.upload_offset>=ctx->put.upload_length)
{
return 0;
}
len = ctx->put.upload_length - ctx->put.upload_offset;
if(len > space)
{
len = space;
}
send_len = evbuffer_remove(ctx->put.evbuf, ptr, len);
assert(send_len>0);
ctx->put.upload_offset += send_len;
ctx->instance->statistic.memory_used -= send_len;
return send_len;
}
//return value: <0:fail; =0: not exec; >0: OK
static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool full)
{
CURLMcode rc;
char minio_url[256];
if(NULL == (ctx->curl=curl_easy_init()))
{
return -1;
}
ctx->put.upload_offset = 0;
if(full)
{
snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
}
else
{
snprintf(minio_url, 256, "http://%s/%s/%s?partNumber=%d&uploadId=%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key, ++ctx->put.part_index, ctx->put.uploadID);
curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_put_multipart_header_cb);
curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
}
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
curl_easy_setopt(ctx->curl, CURLOPT_TIMEOUT, ctx->instance->transfer_timeout); //测试发现有某链接接收卡住的情况
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L);
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 100L);
curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.upload_length);
curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_multipart_send_cb);
curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
DBG_CACHE("state: %d, length: %lu, key: %s\n", ctx->put.state, ctx->put.upload_length, ctx->object_key);
return 1;
}
static size_t curl_response_body_save_cb(void *ptr, size_t size, size_t count, void *userp)
{
struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
struct easy_string *estr = &ctx->response;
CURLcode code;
if(ctx->fail_state)
{
return size*count;
}
if(ctx->res_code == 0)
{
code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code);
if(code != CURLE_OK || ctx->res_code!=200L)
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
return size*count;
}
}
easy_string_savedata(estr, (const char*)ptr, size*count);
return size*count;
}
//Starts a multipart upload: issues "POST <object>?uploads" on the instance's multi handle.
//The response body is collected by curl_response_body_save_cb into ctx->response.
//Returns 1 once the request has been queued, -1 if no easy handle could be created.
int curl_get_minio_uploadID(struct tango_cache_ctx *ctx)
{
	CURLMcode rc;
	char minio_url[256];

	if(NULL == (ctx->curl=curl_easy_init()))
	{
		return -1;
	}
	snprintf(minio_url, sizeof(minio_url), "http://%s/%s/%s?uploads", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
	curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
	//Explicit empty body: by default libcurl would call the read callback (fread), and testing
	//showed that with Expect disabled this gets stuck in curl_multi_socket_action.
	//The option takes a long, so the literal must be 0L rather than an int passed through varargs.
	curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, 0L);
	curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
	curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
	curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL,1L);
	curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb);
	curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
	curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
	curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
	curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
	curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
	curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
	curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L);
	curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 100L);

	rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
	assert(rc==CURLM_OK);
	DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key);
	return 1;
}
int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back)
{
CURLMcode rc;
char minio_url[256];
ctx->instance->statistic.del_recv_num += 1;
if(NULL == (ctx->curl=curl_easy_init()))
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
tango_cache_ctx_destroy(ctx, call_back); //终结者
return -1;
}
snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE");
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
return 1;
}
//return value: true-成功添加事件false-未添加事件
bool cache_cancel_upload_minio(struct tango_cache_ctx *ctx)
{
CURLMcode rc;
char minio_url[256];
if(NULL == (ctx->curl=curl_easy_init()))
{
return false;
}
snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key, ctx->put.uploadID);
curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE");
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
return true;
}
//return value: true - request queued; false - nothing was queued
//Completes a multipart upload: builds the completion XML with construct_complete_xml()
//into ctx->put.combine_xml and POSTs it to "<object>?uploadId=<id>".
//ctx->headers is replaced by a single "Content-Type: application/xml" header.
bool cache_kick_combine_minio(struct tango_cache_ctx *ctx)
{
	int len=0;
	CURLMcode rc;
	char minio_url[256];

	if(NULL == (ctx->curl=curl_easy_init()))
	{
		return false;
	}
	construct_complete_xml(ctx, &ctx->put.combine_xml, &len);
	snprintf(minio_url, sizeof(minio_url), "http://%s/%s/%s?uploadId=%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key, ctx->put.uploadID);
	curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
	curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
	curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
	curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL,1L);
	curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
	curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
	curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
	curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
	curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
	curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
	//Fills Content-Length. The option takes a long; an int passed through varargs is not
	//guaranteed to be read back correctly, hence the explicit conversion.
	curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, (long)len);
	//CURLOPT_POSTFIELDS does not copy: ctx->put.combine_xml has to stay valid during the transfer.
	curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->put.combine_xml);

	//Drop the headers used for the part uploads; the completion request only needs a content type.
	if(ctx->headers != NULL)
	{
		curl_slist_free_all(ctx->headers);
		ctx->headers = NULL;
	}
	ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml");
	curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);

	rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
	assert(rc==CURLM_OK);
	DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key);
	return true;
}
//return value: true - a request was queued or nothing had to be done; false - queuing failed
//Drives the multipart upload one step forward:
//  PUT_STATE_START : request an uploadID and move to PUT_STATE_WAIT_START.
//  PUT_STATE_PART  : if no transfer is in flight (ctx->curl == NULL), upload block_len bytes as the next part.
//  any other state : nothing to do.
//On failure the context is put into CACHE_ERR_CURL.
bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len)
{
	int status = 1;

	if(ctx->put.state == PUT_STATE_START)
	{
		ctx->put.state = PUT_STATE_WAIT_START;
		status = curl_get_minio_uploadID(ctx);
	}
	else if(ctx->put.state == PUT_STATE_PART && ctx->curl == NULL)
	{
		ctx->put.upload_length = block_len;
		status = http_put_bodypart_request_evbuf(ctx, false);
	}

	if(status > 0)
	{
		return true;
	}
	tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
	return false;
}
//callback: whether the user callback is invoked when the call fails right away;
//the streaming interface needs it, the complete one-shot interface does not.
//Uploads everything buffered in ctx->put.evbuf as one whole object and moves the context
//to PUT_STATE_END. Returns the result of http_put_bodypart_request_evbuf(), or -1 when the
//buffer is empty; whenever the result is <= 0 the context has already been destroyed.
static int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx, bool callback)
{
	int status;

	ctx->put.state = PUT_STATE_END;
	ctx->put.upload_length = evbuffer_get_length(ctx->put.evbuf);

	if(!(ctx->put.upload_length > 0))
	{
		//Nothing buffered: no request is issued and the context ends here.
		tango_cache_ctx_destroy(ctx, callback);
		return -1;
	}

	status = http_put_bodypart_request_evbuf(ctx, true);
	if(status <= 0)
	{
		tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
		tango_cache_ctx_destroy(ctx, callback);
	}
	return status;
}
//Marks the upload as closed and pushes the PUT state machine towards completion.
//Also re-entered from tango_cache_curl_put_done() once a pending transfer has finished.
//Depending on the state this uploads the remaining data, requests the multipart
//completion, cancels the upload, or destroys the context.
//NOTE: on several paths ctx is destroyed; it must not be touched after this call returns.
void cache_kick_upload_minio_end(struct tango_cache_ctx *ctx)
{
DBG_CACHE("state: %d, key: %s, curl %s NULL\n", ctx->put.state, ctx->object_key, (ctx->curl==NULL)?"is":"is not");
ctx->put.close_state = true;//only sets the flag, this is not the real close; the context is closed once the internal state machine has finished cycling
if(ctx->fail_state)
{
tango_cache_ctx_destroy(ctx);
return;
}
switch(ctx->put.state)
{
case PUT_STATE_START:
//No multipart upload was started: send everything buffered as a single object.
http_put_complete_part_evbuf(ctx, true);
break;
case PUT_STATE_PART:
//Only act when no transfer is in flight; otherwise the completion handler calls back in here.
if(ctx->curl == NULL)
{
ctx->put.upload_length = evbuffer_get_length(ctx->put.evbuf);
if(ctx->put.upload_length == 0)
{
//All data has been uploaded: ask the server to combine the parts.
if(cache_kick_combine_minio(ctx))
{
ctx->put.state = PUT_STATE_END;
}
else
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
tango_cache_ctx_destroy(ctx);
}
}
//Data is left: upload it as the final part; if that cannot be queued, cancel the multipart upload.
else if(http_put_bodypart_request_evbuf(ctx, false) <= 0)
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
if(cache_cancel_upload_minio(ctx))
{
ctx->put.state = PUT_STATE_CANCEL;
}
else
{
tango_cache_ctx_destroy(ctx);
}
}
}
break;
case PUT_STATE_END: assert(0); //cannot be in this state when the user actively calls end; falls through to the break below
case PUT_STATE_WAIT_START: //the uploadId has not been obtained yet, so no upload can be triggered
default: break;
}
}
//Completion handler for a finished PUT-side transfer; dispatches on ctx->put.state.
//res      : libcurl result of the transfer.
//res_code : HTTP status of the response; anything other than 200 counts as failure.
//NOTE: ctx may be destroyed on return, either here or via cache_kick_upload_minio_end().
//NOTE(review): the follow-up requests only start when ctx->curl == NULL, so the caller
//presumably releases the finished easy handle before calling this - confirm against the caller.
void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
{
DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key);
switch(ctx->put.state)
{
case PUT_STATE_WAIT_START:
//Response to the "start multipart upload" request: the uploadID is parsed out of the saved body.
if(res!=CURLE_OK||res_code!=200L|| ctx->fail_state || !parse_uploadID_xml(ctx->response.buff, ctx->response.len, &ctx->put.uploadID))
{
easy_string_destroy(&ctx->response);
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
//Destroy only if the upload has already been closed; otherwise the failed context is kept.
if(ctx->put.close_state)
{
tango_cache_ctx_destroy(ctx);
}
}
else
{
easy_string_destroy(&ctx->response);
ctx->put.state = PUT_STATE_PART;
if(ctx->put.close_state)
{
cache_kick_upload_minio_end(ctx);
}
else
{
//Start a part upload only once at least one full block has been buffered.
size_t upload_length = evbuffer_get_length(ctx->put.evbuf);
if(upload_length >= ctx->instance->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, upload_length);
}
}
}
break;
case PUT_STATE_PART:
//A part upload has finished.
if(res != CURLE_OK || res_code!=200L)
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
}
if(ctx->fail_state)
{
//Try to cancel the multipart upload; if the cancel request cannot be queued, destroy
//the context when the upload has already been closed.
if(cache_cancel_upload_minio(ctx))
{
ctx->put.state = PUT_STATE_CANCEL;
}
else if(ctx->put.close_state)
{
tango_cache_ctx_destroy(ctx);
}
}
else if(ctx->put.close_state)
{
cache_kick_upload_minio_end(ctx);
}
else
{
size_t upload_length = evbuffer_get_length(ctx->put.evbuf);
if(upload_length >= ctx->instance->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, upload_length);
}
}
break;
case PUT_STATE_CANCEL: //waiting for close
if(ctx->put.close_state)
{
tango_cache_ctx_destroy(ctx);
}
break;
case PUT_STATE_END:
//Final request finished: record a failure if any, then destroy the context.
if(res != CURLE_OK || res_code!=200L)
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
}
tango_cache_ctx_destroy(ctx);
break;
default: break;
}
}
int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback)
{
CURLMcode rc;
char minio_url[256];
ctx->instance->statistic.put_recv_num += 1;
ctx->instance->error_code = CACHE_OK;
if(NULL == (ctx->curl=curl_easy_init()))
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
tango_cache_ctx_destroy(ctx, callback);
if(way == PUT_MEM_FREE) free((void *)data);
return -1;
}
ctx->put.state = PUT_STATE_END;
snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
curl_easy_setopt(ctx->curl, CURLOPT_TIMEOUT, ctx->instance->transfer_timeout); //测试发现有某链接接收卡住的情况
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L);
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 100L);
if(way == PUT_MEM_COPY)
{
ctx->response.buff = (char *)malloc(size);
memcpy(ctx->response.buff, data, size);
}
else
{
ctx->response.buff = (char *)data;
}
ctx->response.size = size;
ctx->response.len = 0;
ctx->instance->statistic.memory_used += size;
curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->response.size);
curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_once_send_cb);
curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
return 0;
}
//Uploads a complete object whose payload is held in an evbuffer.
//way      : EVBUFFER_MOVE uses evbuffer_add_buffer(); any other value uses evbuffer_add_buffer_reference().
//callback : forwarded to tango_cache_ctx_destroy() when the call fails right away.
//Returns the result of http_put_complete_part_evbuf(), or -1 (context destroyed, fail
//state CACHE_OUTOF_MEMORY) when the payload cannot be attached to ctx->put.evbuf.
int tango_cache_upload_once_start_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, bool callback)
{
	size_t payload_len;
	int attach_rc;

	ctx->instance->statistic.put_recv_num += 1;
	ctx->instance->error_code = CACHE_OK;

	//The length is sampled before the payload is handed over to ctx->put.evbuf.
	payload_len = evbuffer_get_length(evbuf);

	attach_rc = (way == EVBUFFER_MOVE)
		? evbuffer_add_buffer(ctx->put.evbuf, evbuf)
		: evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf);
	if(attach_rc)
	{
		tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
		tango_cache_ctx_destroy(ctx, callback);
		return -1;
	}

	ctx->instance->statistic.memory_used += payload_len;
	return http_put_complete_part_evbuf(ctx, callback);
}
//Completion handler for a single-object DELETE: HTTP 204 and 200 count as success,
//anything else puts the context into CACHE_ERR_CURL. The context is destroyed either way.
void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
{
	const bool deleted = (res == CURLE_OK) && (res_code == 204L || res_code == 200L);

	if(!deleted)
	{
		tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
	}
	tango_cache_ctx_destroy(ctx);
}
void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
{
u_int32_t errnum=0;
if(res!=CURLE_OK || (res_code!=204L && res_code!=200L ))
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
ctx->del.fail_num = ctx->del.succ_num;
ctx->del.succ_num = 0;
}
else
{
if(!parse_multidelete_xml(ctx->response.buff, ctx->response.len, &errnum, ctx->error, CURL_ERROR_SIZE))
{
ctx->del.fail_num = ctx->del.succ_num;
ctx->del.succ_num = 0;
}
else
{
ctx->del.fail_num = errnum;
ctx->del.succ_num -= errnum;
}
if(ctx->del.fail_num > 0)
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
}
}
tango_cache_ctx_destroy(ctx);
}
int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback)
{
CURLMcode rc;
char minio_url[256];
ctx->instance->statistic.del_recv_num += ctx->del.succ_num;
ctx->instance->error_code = CACHE_OK;
if(NULL == (ctx->curl=curl_easy_init()))
{
tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
tango_cache_ctx_destroy(ctx, callback);
return -1;
}
snprintf(minio_url, 256, "http://%s/%s/?delete", ctx->hostaddr, ctx->instance->bucketname);
curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, ctx->response.size); //填充Content-Length在CURLOPT_COPYPOSTFIELDS之前设置
curl_easy_setopt(ctx->curl, CURLOPT_COPYPOSTFIELDS, ctx->response.buff);
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L);
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 100L);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
easy_string_destroy(&ctx->response);
return 0;
}
bool fetch_header_over_biz(struct tango_cache_ctx *ctx)
{
if(ctx->get.need_hdrs!=RESPONSE_HDR_ALL) //无Expires时
{
tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL);
ctx->get.state = GET_STATE_DELETE;
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
return false;
}
if(ctx->get.response_tag.len > 0)
{
ctx->get.result.data_frag = ctx->get.response_tag.buff;
ctx->get.result.size = ctx->get.response_tag.len;
ctx->get.result.type = RESULT_TYPE_USERTAG;
promise_success(future_to_promise(ctx->future), &ctx->get.result);
easy_string_destroy(&ctx->get.response_tag);
}
if(ctx->response.len > 0)
{
ctx->get.result.data_frag = ctx->response.buff;
ctx->get.result.size = ctx->response.len;
ctx->get.result.type = RESULT_TYPE_HEADER;
promise_success(future_to_promise(ctx->future), &ctx->get.result);
easy_string_destroy(&ctx->response);
}
return true;
}
//Write callback for GET: passes each body chunk to the promise as RESULT_TYPE_BODY.
//Pending header results are flushed via fetch_header_over_biz() before the chunk is delivered.
//Chunks are swallowed silently once the context has failed or is scheduled for deletion.
//Always returns size*count.
static size_t curl_get_response_body_cb(void *ptr, size_t size, size_t count, void *userp)
{
	struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
	const size_t nbytes = size*count;
	const bool discard = ctx->fail_state || ctx->get.state==GET_STATE_DELETE;

	if(discard || !fetch_header_over_biz(ctx))
	{
		return nbytes;
	}

	ctx->get.result.type = RESULT_TYPE_BODY;
	ctx->get.result.data_frag = (const char *)ptr;
	ctx->get.result.size = size * count;
	promise_success(future_to_promise(ctx->future), &ctx->get.result);
	return nbytes;
}
//Freshness check, performed only once every required response header has been seen.
//Returns true while headers are still missing or when the object passes the checks.
//Otherwise the context is failed with CACHE_TIMEOUT, a RESULT_TYPE_MISS is delivered,
//ctx->response is released and false is returned. An object past its Expires time is
//additionally switched to GET_STATE_DELETE so that it is deleted when the download finishes.
//NOTE(review): the max_age test reports a miss when last_modify+max_age is LATER than now,
//which looks inverted relative to the usual "age <= max_age means fresh" - confirm intent.
bool check_expires_fresh_header(struct tango_cache_ctx *ctx)
{
	time_t now_gmt;
	bool expired;

	if(ctx->get.need_hdrs != RESPONSE_HDR_ALL)
	{
		return true;
	}

	now_gmt = get_gmtime_timestamp(time(NULL));
	expired = now_gmt > ctx->get.expires;
	if(!expired)
	{
		if(!(ctx->get.last_modify+ctx->get.max_age > now_gmt || now_gmt+ctx->get.min_fresh>ctx->get.expires))
		{
			return true;
		}
	}

	tango_cache_set_fail_state(ctx, CACHE_TIMEOUT);
	if(expired)
	{
		ctx->get.state = GET_STATE_DELETE; //stale cache entry: the delete is triggered when the download completes
	}
	ctx->get.result.type = RESULT_TYPE_MISS;
	promise_success(future_to_promise(ctx->future), &ctx->get.result);
	easy_string_destroy(&ctx->response);
	return false;
}
//Classifies the outcome of a GET/HEAD.
//  curl error      -> fail state CACHE_ERR_CURL, promise failed.
//  HTTP 404        -> fail state CACHE_CACHE_MISS, RESULT_TYPE_MISS delivered as a success.
//  other non-200   -> fail state CACHE_ERR_INTERNAL, promise failed.
//Returns true only for a clean transfer with HTTP 200.
static bool check_get_result_code(struct tango_cache_ctx *ctx, CURLcode code, long res_code)
{
	if(code == CURLE_OK && res_code == 200L)
	{
		return true;
	}

	if(code == CURLE_OK && res_code == 404L)
	{
		tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS);
		ctx->get.result.type = RESULT_TYPE_MISS;
		promise_success(future_to_promise(ctx->future), &ctx->get.result);
		return false;
	}

	if(code != CURLE_OK)
	{
		tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
	}
	else
	{
		tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL);
	}
	promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
	return false;
}
//Parses an unsigned decimal number out of a header value that is NOT NUL-terminated.
//At most len bytes of value are examined. Returns true and stores the number in *out when
//at least one digit was converted; otherwise *out is left untouched.
static bool curl_header_value_to_ulong(const char *value, size_t len, unsigned long *out)
{
	char digits[32];
	char *stop = NULL;
	unsigned long parsed;

	while(len>0 && (*value==' ' || *value=='\t')) {value++; len--;}
	if(len >= sizeof(digits))
	{
		len = sizeof(digits) - 1;
	}
	memcpy(digits, value, len);
	digits[len] = '\0';
	parsed = strtoul(digits, &stop, 10);
	if(stop == digits)
	{
		return false;
	}
	*out = parsed;
	return true;
}

//Header callback for GET/HEAD.
//On the first header line the HTTP status is checked via check_get_result_code().
//Afterwards the headers of interest are picked out by the length of their name:
//  Expires / x-amz-meta-lm : recorded in ctx->get and followed by a freshness check;
//  x-amz-meta-user         : Base64-decoded into ctx->get.response_tag;
//  Content-Length          : stored in ctx->get.result.tlength;
//  Content-MD5/-Type/-Encoding/-Disposition : the raw line is appended to ctx->response.
//Always returns size*count; failures are reported through the ctx fail state and the promise.
static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count, void *userp)
{
	struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
	char *start=(char *)ptr, *pos_colon;
	size_t raw_len = size*count, hdrlen;
	char usertag[2048];
	size_t datalen;
	unsigned long number;

	if(ctx->fail_state || ctx->get.state==GET_STATE_DELETE)
	{
		return raw_len;
	}
	if(ctx->res_code == 0) //on the first response line, check whether the status code is 200
	{
		CURLcode code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code);
		if(!check_get_result_code(ctx, code, ctx->res_code))
		{
			return raw_len;
		}
	}
	pos_colon = (char*)memchr(start, ':', raw_len);
	if(pos_colon == NULL)
	{
		return raw_len;
	}
	datalen = pos_colon - start;
	switch(datalen)
	{
		case 7:
			if(strcmp_one_word_mesa_equal_len("expires", "EXPIRES", start, 7))
			{
				ctx->get.need_hdrs |= RESPONSE_HDR_EXPIRES;
				ctx->get.expires = expires_hdr2timestamp(pos_colon + 1, raw_len - datalen - 1);
				if(!check_expires_fresh_header(ctx))
				{
					return raw_len;
				}
			}
			break;
		case 13:
			if(strcmp_one_word_mesa_equal_len("x-amz-meta-lm", "X-AMZ-META-LM", start, 13))
			{
				ctx->get.need_hdrs |= RESPONSE_HDR_LAST_MOD;
				//libcurl does not NUL-terminate header data, so the value is parsed with an explicit bound.
				if(curl_header_value_to_ulong(pos_colon+1, raw_len-datalen-1, &number))
				{
					ctx->get.last_modify = number;
				}
				if(!check_expires_fresh_header(ctx))
				{
					return raw_len;
				}
			}
			break;
		case 15:
			if(strcmp_one_word_mesa_equal_len("x-amz-meta-user", "X-AMZ-META-USER", start, 15))
			{
				hdrlen = Base64_DecodeBlock((unsigned char*)pos_colon+1, raw_len-datalen-1, (unsigned char*)usertag, 2048);
				//The upper bound also rejects a negative error return that was converted to size_t.
				if(hdrlen > 0 && hdrlen <= sizeof(usertag))
				{
					easy_string_savedata(&ctx->get.response_tag, usertag, hdrlen);
				}
			}
			break;
		case 14:
			if(strcmp_one_word_mesa_equal_len("content-length", "CONTENT-LENGTH", start, 14))
			{
				if(curl_header_value_to_ulong(pos_colon+1, raw_len-datalen-1, &number))
				{
					ctx->get.result.tlength = number;
				}
			}
			break;
		case 11: if(strcmp_one_word_mesa_equal_len("content-md5", "CONTENT-MD5", start, 11)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break;
		case 12: if(strcmp_one_word_mesa_equal_len("content-type", "CONTENT-TYPE", start, 12)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break;
		case 16: if(strcmp_one_word_mesa_equal_len("content-encoding", "CONTENT-ENCODING", start, 16)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break;
		case 19: if(strcmp_one_word_mesa_equal_len("content-disposition", "CONTENT-DISPOSITION", start, 19)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break;
		default: break;
	}
	return raw_len;
}
//Completion handler for a GET/HEAD transfer; dispatches on ctx->get.state.
//  GET_STATE_START  : report the final result (RESULT_TYPE_END on success) and destroy the context.
//  GET_STATE_DELETE : issue the delete of the object and move to GET_STATE_END.
//  GET_STATE_END    : the delete has finished, destroy the context.
void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
{
	switch(ctx->get.state)
	{
		case GET_STATE_DELETE:
			ctx->get.state = GET_STATE_END;
			cache_delete_minio_object(ctx);
			break;

		case GET_STATE_END:
			tango_cache_ctx_destroy(ctx);
			break;

		case GET_STATE_START:
			//For HEAD the header results are flushed here, since no body callback ever runs.
			//If a HEAD finds the required fields incomplete, the object is not deleted for now;
			//this is not expected to happen under normal conditions.
			if(!ctx->fail_state
				&& check_get_result_code(ctx, res, res_code)
				&& (ctx->method!=CACHE_REQUEST_HEAD || fetch_header_over_biz(ctx)))
			{
				ctx->get.result.type = RESULT_TYPE_END;
				promise_success(future_to_promise(ctx->future), &ctx->get.result);
			}
			tango_cache_ctx_destroy(ctx);
			break;

		default:
			assert(0);
			break;
	}
}
int tango_cache_fetch_start(struct tango_cache_ctx *ctx)
{
CURLMcode rc;
char minio_url[256];
ctx->instance->statistic.get_recv_num += 1;
if(NULL == (ctx->curl=curl_easy_init()))
{
tango_cache_ctx_destroy(ctx);
return -1;
}
snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
if(ctx->method == CACHE_REQUEST_HEAD)
{
curl_easy_setopt(ctx->curl, CURLOPT_NOBODY, 1L);
}
curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL,1L);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_get_response_body_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
curl_easy_setopt(ctx->curl, CURLOPT_TIMEOUT, ctx->instance->transfer_timeout); //测试发现有某链接接收卡住的情况
curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_get_response_header_cb);
curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
//ctx->error="Operation too slow. Less than 1024 bytes/sec transferred the last 3 seconds"
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L);
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 100L);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
return 1;
}