This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/cache/src/tango_cache_transfer.cpp
fengweihao 9745251b2a TSG-7141 修复非格式日志部分下载失败问题
修复重定向指定用户自定义域无法替换问题
2021-07-23 15:55:42 +08:00

987 lines
29 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <sys/time.h>
#include <time.h>
#include <string.h>
#include <strings.h>
#include <curl/curl.h>
#include "tango_cache_transfer.h"
#include "tango_cache_xml.h"
#include "tango_cache_tools.h"
#include "tango_cache_redis.h"
/*
 * Install the libcurl options shared by every request built in this file.
 *
 * curl             - easy handle to configure.
 * transfer_timeout - value installed as CURLOPT_TIMEOUT (limit for the whole transfer).
 * errorbuf         - buffer registered as CURLOPT_ERRORBUFFER.
 */
static inline void curl_set_common_options(CURL *curl, long transfer_timeout, char *errorbuf)
{
    /* Diagnostics and identification. */
    curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errorbuf);
    curl_easy_setopt(curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");

    /* General behaviour. */
    curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
    curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);

    /* Time limits. */
    curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
    // Testing showed that with many connections a single connection could get stuck while receiving,
    // hence the overall timeout plus the low-speed guard below.
    curl_easy_setopt(curl, CURLOPT_TIMEOUT, transfer_timeout);
    // When the low-speed guard fires, ctx->error reads like
    // "Operation too slow. Less than N bytes/sec transferred the last M seconds".
    curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 100L);
    curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 5L);
}
// Write callback used when the response body is very short or of no interest:
// the data is ignored and reported to libcurl as completely consumed.
// Returns size*count, the number of bytes libcurl handed over.
size_t curl_response_any_cb(void *ptr, size_t size, size_t count, void *userp)
{
    (void)ptr;
    (void)userp;
    const size_t consumed = size * count;
    return consumed;
}
/*
 * Header callback for a multipart part upload.
 *
 * Looks for the ETag response header and, when it carries a non-empty value, appends a
 * (part_number, etag) record to ctx->put.etag_head. The value is stored with leading spaces
 * and trailing CR/LF removed; the part number is ctx->put.part_index.
 *
 * ptr/size/count - one header line as delivered by libcurl (not NUL-terminated).
 * userp          - the struct tango_cache_ctx registered as CURLOPT_HEADERDATA.
 *
 * Returns size*count on success. Returns 0 when the ETag record cannot be allocated, which
 * libcurl treats as a callback error and fails the transfer (an upload whose ETag was lost
 * could not be combined correctly later).
 */
static size_t curl_put_multipart_header_cb(void *ptr, size_t size, size_t count, void *userp)
{
    static const char etag_name[] = "Etag:";
    const size_t name_len = sizeof(etag_name) - 1;
    struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
    const size_t line_len = size * count;
    const char *start = (const char *)ptr;
    const char *end = start + line_len; /* one past the last byte of the line */
    struct multipart_etag_list *etag;
    size_t value_len;

    /* The line must be longer than the name itself, otherwise there is no value to keep (and
     * the length arithmetic below would underflow). HTTP header names are case-insensitive,
     * so "ETag:" and "Etag:" are both accepted. */
    if(line_len <= name_len || strncasecmp(start, etag_name, name_len) != 0)
    {
        return line_len;
    }

    start += name_len;
    while(start < end && *start == ' ')
    {
        start++;
    }
    while(end > start && (end[-1] == '\r' || end[-1] == '\n'))
    {
        end--;
    }
    if(start == end)
    {
        return line_len; /* empty value: nothing to record */
    }
    value_len = (size_t)(end - start);

    etag = (struct multipart_etag_list *)malloc(sizeof(struct multipart_etag_list));
    if(etag == NULL)
    {
        return 0;
    }
    etag->etag = (char *)malloc(value_len + 1);
    if(etag->etag == NULL)
    {
        free(etag);
        return 0;
    }
    memcpy(etag->etag, start, value_len);
    etag->etag[value_len] = '\0';
    etag->part_number = ctx->put.part_index;
    TAILQ_INSERT_TAIL(&ctx->put.etag_head, etag, node);
    return line_len;
}
/*
 * Read callback for a one-shot upload: feeds libcurl from ctx->put.once_request.
 *
 * once_request.size is the total payload length and once_request.len the number of bytes
 * already handed to libcurl. Once the last byte has been handed over, the payload size is
 * subtracted from the instance's memory_used counter and the buffer is released with
 * easy_string_destroy().
 *
 * Returns the number of bytes copied into ptr, or 0 when nothing is left / no room is offered.
 */
static size_t curl_put_once_send_cb(void *ptr, size_t size, size_t count, void *userp)
{
    struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
    size_t chunk;

    if(size == 0 || count == 0)
    {
        return 0;
    }
    if(ctx->put.once_request.len >= ctx->put.once_request.size)
    {
        return 0; // libcurl does not necessarily make this final call
    }

    // bytes still waiting to be uploaded, capped by the room libcurl offers
    chunk = ctx->put.once_request.size - ctx->put.once_request.len;
    chunk = (chunk > size * count) ? (size * count) : chunk;

    memcpy(ptr, ctx->put.once_request.buff + ctx->put.once_request.len, chunk);
    ctx->put.once_request.len += chunk;

    if(ctx->put.once_request.len < ctx->put.once_request.size)
    {
        return chunk;
    }

    // This payload does not go through the cache buffer, so its memory accounting is done here.
    ctx->instance->statistic.memory_used -= ctx->put.once_request.size;
    easy_string_destroy(&ctx->put.once_request);
    return chunk;
}
static size_t curl_put_multipart_send_cb(void *ptr, size_t size, size_t count, void *userp)
{
size_t len, space=size*count, send_len;
struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
if(size==0 || count==0 || ctx->put.upload_offset>=ctx->put.upload_length)
{
return 0;
}
len = ctx->put.upload_length - ctx->put.upload_offset;
if(len > space)
{
len = space;
}
send_len = evbuffer_remove(ctx->put.evbuf, ptr, len);
assert(send_len>0);
ctx->put.upload_offset += send_len;
ctx->instance->statistic.memory_used -= send_len;
return send_len;
}
/*
 * Queue an asynchronous HTTP PUT that uploads ctx->put.upload_length bytes from ctx->put.evbuf.
 *
 * ctx  - upload context. On success ctx->curl holds the new easy handle, already added to
 *        ctx->instance->multi_hd, and ctx->put.upload_offset has been reset to 0.
 * full - true:  PUT the whole object to http://<hostaddr>/<object_key>;
 *        false: PUT one multipart part. ctx->put.part_index is incremented and sent as
 *               partNumber together with ctx->put.uploadID, and response headers are routed
 *               to curl_put_multipart_header_cb.
 *
 * return value: <0: failed; =0: not executed; >0: OK
 * (this function itself returns -1 when curl_easy_init() fails and 1 otherwise).
 */
static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool full)
{
    UNUSED CURLMcode rc;
    char minio_url[256]={0}, buffer[256]={0};

    if(NULL == (ctx->curl=curl_easy_init()))
    {
        return -1;
    }
    ctx->put.upload_offset = 0;

    /* snprintf truncates silently if host + key do not fit into minio_url. */
    if(full)
    {
        snprintf(minio_url, sizeof(minio_url), "http://%s/%s", ctx->hostaddr, ctx->object_key);
    }
    else
    {
        snprintf(minio_url, sizeof(minio_url), "http://%s/%s?partNumber=%d&uploadId=%s", ctx->hostaddr, ctx->object_key, ++ctx->put.part_index, ctx->put.uploadID);
        curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_put_multipart_header_cb);
        curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
    }
    curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);

    // The token header is used for HOS storage authentication.
    // Bounded formatting: the token comes from configuration and must not overrun the stack buffer.
    snprintf(buffer, sizeof(buffer), "token: %s", ctx->instance->param->cache_token);
    ctx->headers = curl_slist_append(ctx->headers, buffer);
    curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);

    curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L);
    // CURLOPT_INFILESIZE is read by libcurl as a long; pass exactly that type.
    curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, (long)ctx->put.upload_length);
    curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_multipart_send_cb);
    curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx);
    curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);

    rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
    assert(rc==CURLM_OK);
    DBG_CACHE("state: %d, length: %lu, key: %s\n", ctx->put.state, ctx->put.upload_length, ctx->object_key);
    return 1;
}
static size_t curl_response_body_save_cb(void *ptr, size_t size, size_t count, void *userp)
{
struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
struct easy_string *estr = &ctx->response;
CURLcode code;
if(ctx->fail_state)
{
return size*count;
}
if(ctx->res_code == 0)
{
code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code);
if(code != CURLE_OK || ctx->res_code!=200L)
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
return size*count;
}
}
easy_string_savedata(estr, (const char*)ptr, size*count);
return size*count;
}
/*
 * Queue the request that opens a multipart upload: an empty POST to
 * http://<hostaddr>/<object_key>?uploads. The response body is collected into ctx->response
 * by curl_response_body_save_cb.
 *
 * Returns -1 when no easy handle can be created, 1 once the handle has been added to the
 * instance's multi handle.
 */
int curl_get_minio_uploadID(struct tango_cache_ctx *ctx)
{
    UNUSED CURLMcode rc;
    char minio_url[256]={0}, buffer[256];

    if(NULL == (ctx->curl=curl_easy_init()))
    {
        return -1;
    }
    snprintf(minio_url, sizeof(minio_url), "http://%s/%s?uploads", ctx->hostaddr, ctx->object_key);
    curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
    // Without an explicit size libcurl falls back to its default read callback (fread); testing
    // showed that with Expect disabled this hangs in curl_multi_socket_action.
    // The option is read as a long, so the literal must be a long as well.
    curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, 0L);
    curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);

    // Bounded formatting: the configured token must not overrun the stack buffer.
    snprintf(buffer, sizeof(buffer), "token: %s", ctx->instance->param->cache_token);
    ctx->headers = curl_slist_append(ctx->headers, buffer);
    curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
    curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);

    rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
    assert(rc==CURLM_OK);
    DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key);
    return 1;
}
/*
 * Queue an HTTP DELETE for http://<hostaddr>/<object_key> and count it in del_recv_num.
 *
 * call_back - forwarded to tango_cache_ctx_destroy() when the request cannot be created.
 *
 * Returns 1 once the easy handle has been added to the instance's multi handle. Returns -1
 * when curl_easy_init() fails; in that case ctx has been put into the CACHE_ERR_CURL fail
 * state and destroyed, so the caller must not touch it afterwards.
 */
int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back)
{
    UNUSED CURLMcode rc;
    char minio_url[256], buffer[256];

    ctx->instance->statistic.del_recv_num += 1;
    if(NULL == (ctx->curl=curl_easy_init()))
    {
        tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
        tango_cache_ctx_destroy(ctx, call_back); // terminates the context
        return -1;
    }
    snprintf(minio_url, sizeof(minio_url), "http://%s/%s", ctx->hostaddr, ctx->object_key);
    curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE");
    curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);

    // Bounded formatting: the configured token must not overrun the stack buffer.
    snprintf(buffer, sizeof(buffer), "token: %s", ctx->instance->param->cache_token);
    ctx->headers = curl_slist_append(ctx->headers, buffer);
    curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
    curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);

    rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
    assert(rc==CURLM_OK);
    return 1;
}
/*
 * Queue the request that aborts a multipart upload:
 * DELETE http://<hostaddr>/<object_key>?uploadId=<uploadID>.
 *
 * return value: true  - the request was added to the multi handle;
 *               false - nothing was added (curl_easy_init() failed).
 *
 * NOTE(review): unlike the other requests in this file, no "token" header is attached to
 * this handle - confirm whether the storage backend requires it for aborting an upload.
 */
bool cache_cancel_upload_minio(struct tango_cache_ctx *ctx)
{
    UNUSED CURLMcode rc;
    char abort_url[256];

    ctx->curl = curl_easy_init();
    if(ctx->curl == NULL)
    {
        return false;
    }

    snprintf(abort_url, sizeof(abort_url), "http://%s/%s?uploadId=%s", ctx->hostaddr, ctx->object_key, ctx->put.uploadID);

    /* What to send. */
    curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE");
    curl_easy_setopt(ctx->curl, CURLOPT_URL, abort_url);

    /* Where the (ignored) response goes and how the handle maps back to its context. */
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);

    curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);

    rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
    assert(rc==CURLM_OK);
    return true;
}
/*
 * Queue the request that completes a multipart upload: POST of the XML produced by
 * construct_complete_xml() to http://<hostaddr>/<object_key>?uploadId=<uploadID>.
 *
 * Any header list already held in ctx->headers is freed and replaced by a fresh list
 * containing only Content-Type and the token header.
 *
 * return value: true  - the request was added to the multi handle;
 *               false - nothing was added (curl_easy_init() failed).
 */
bool cache_kick_combine_minio(struct tango_cache_ctx *ctx)
{
    int len=0;
    UNUSED CURLMcode rc;
    char minio_url[256], buffer[256];

    if(NULL == (ctx->curl=curl_easy_init()))
    {
        return false;
    }
    construct_complete_xml(ctx, &ctx->put.combine_xml, &len);
    snprintf(minio_url, sizeof(minio_url), "http://%s/%s?uploadId=%s", ctx->hostaddr, ctx->object_key, ctx->put.uploadID);
    curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
    curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
    // Fills in Content-Length. The option is read as a long, so convert explicitly.
    curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, (long)len);
    curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->put.combine_xml);
    curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);

    if(ctx->headers != NULL)
    {
        curl_slist_free_all(ctx->headers);
        ctx->headers = NULL;
    }
    ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml");
    // Bounded formatting: the configured token must not overrun the stack buffer.
    snprintf(buffer, sizeof(buffer), "token: %s", ctx->instance->param->cache_token);
    ctx->headers = curl_slist_append(ctx->headers, buffer);
    curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);

    rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
    assert(rc==CURLM_OK);
    DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key);
    return true;
}
/*
 * Advance a streaming (multipart) upload by one step, depending on ctx->put.state:
 *   PUT_STATE_START - request an uploadId (state becomes PUT_STATE_WAIT_START), unless the
 *                     session limit for OBJECT_IN_HOS is exceeded;
 *   PUT_STATE_PART  - when no request is in flight (ctx->curl == NULL), upload block_len
 *                     bytes as the next part;
 *   any other state - nothing to do.
 *
 * return value: true  - a request was queued, or there was nothing to do;
 *               false - failure; ctx has been put into a fail state
 *                       (CACHE_OUTOF_SESSION or CACHE_ERR_CURL).
 */
bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len)
{
    int started = 1;

    if(ctx->put.state == PUT_STATE_START)
    {
        if(sessions_exceeds_limit(ctx->instance, OBJECT_IN_HOS))
        {
            tango_cache_set_fail_state(ctx, CACHE_OUTOF_SESSION);
            return false;
        }
        ctx->put.state = PUT_STATE_WAIT_START;
        started = curl_get_minio_uploadID(ctx);
    }
    else if(ctx->put.state == PUT_STATE_PART && ctx->curl == NULL)
    {
        ctx->put.upload_length = block_len;
        started = http_put_bodypart_request_evbuf(ctx, false);
    }

    if(started > 0)
    {
        return true;
    }
    tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
    return false;
}
/*
 * Upload everything currently held in ctx->put.evbuf as one complete object.
 *
 * callback - whether the user callback is invoked when the operation fails right here;
 *            streaming uploads need it, complete one-shot uploads do not.
 *
 * The state is set to PUT_STATE_END first. Returns the positive result of
 * http_put_bodypart_request_evbuf() when the request was queued. Otherwise ctx is destroyed
 * and a value <= 0 is returned: -1 when the buffer is empty, or the helper's result when the
 * request could not be created (ctx is put into the CACHE_ERR_CURL fail state first).
 */
static int http_put_complete_part_evbuf(struct tango_cache_ctx *ctx, bool callback)
{
    ctx->put.state = PUT_STATE_END;
    ctx->put.upload_length = evbuffer_get_length(ctx->put.evbuf);

    if(ctx->put.upload_length > 0)
    {
        const int queued = http_put_bodypart_request_evbuf(ctx, true);

        if(queued > 0)
        {
            return queued;
        }
        tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
        tango_cache_ctx_destroy(ctx, callback);
        return queued;
    }

    /* Nothing buffered: there is nothing to upload. */
    tango_cache_ctx_destroy(ctx, callback);
    return -1;
}
/*
 * Handle the "end of data" signal for a streaming upload.
 *
 * ctx      - upload context; ctx->put.close_state is set to true unconditionally.
 * callback - forwarded to tango_cache_ctx_destroy() / the one-shot upload helpers on the
 *            paths that pass it explicitly.
 *
 * Behaviour by ctx->put.state:
 *   PUT_STATE_START      - no multipart upload was started, so the buffered data is sent as a
 *                          single object (HOS or redis, depending on ctx->locate); the helper's
 *                          return value is returned.
 *   PUT_STATE_PART       - only when no request is in flight (ctx->curl == NULL): an empty
 *                          buffer triggers the combine request, otherwise the remaining data is
 *                          uploaded as one more part.
 *   PUT_STATE_WAIT_START / others - nothing is done here.
 *
 * Returns -1 on the failure paths below that destroy ctx (the caller must not use ctx
 * afterwards), otherwise 0 or the one-shot helper's result.
 */
int do_tango_cache_update_end(struct tango_cache_ctx *ctx, bool callback)
{
DBG_CACHE("state: %d, key: %s, curl %s NULL\n", ctx->put.state, ctx->object_key, (ctx->curl==NULL)?"is":"is not");
ctx->put.close_state = true;//only marks the context as closing; it is really closed after the internal state machine has run to completion
if(ctx->fail_state)
{
tango_cache_ctx_destroy(ctx, callback);
return -1;
}
switch(ctx->put.state)
{
case PUT_STATE_START: //at this point this is equivalent to a complete one-shot upload
if(sessions_exceeds_limit(ctx->instance, ctx->locate))
{
tango_cache_set_fail_state(ctx, CACHE_OUTOF_SESSION);
tango_cache_ctx_destroy(ctx, callback);
return -1;
}
if(ctx->locate == OBJECT_IN_HOS)
{
return http_put_complete_part_evbuf(ctx, callback);
}
else
{
return redis_put_complete_part_evbuf(ctx, ctx->put.object_size, callback);
}
break;
case PUT_STATE_PART:
// A non-NULL ctx->curl means a request is still in flight; nothing is started here in that case.
if(ctx->curl == NULL)
{
ctx->put.upload_length = evbuffer_get_length(ctx->put.evbuf);
if(ctx->put.upload_length == 0)
{
// No data left: ask the server to combine the uploaded parts.
if(cache_kick_combine_minio(ctx))
{
ctx->put.state = PUT_STATE_END;
}
else
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
// NOTE(review): `callback` is not forwarded here (nor in the destroy call further down), unlike the failure paths above - confirm this is intended.
tango_cache_ctx_destroy(ctx);
return -1;
}
}
else if(http_put_bodypart_request_evbuf(ctx, false) <= 0)
{
// The last part could not be queued: try to abort the multipart upload on the server.
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
if(cache_cancel_upload_minio(ctx))
{
ctx->put.state = PUT_STATE_CANCEL;
}
else
{
tango_cache_ctx_destroy(ctx);
return -1;
}
}
}
break;
// NOTE(review): when assert() is compiled out, PUT_STATE_END falls through to the no-op cases below.
case PUT_STATE_END: assert(0); //cannot be in this state when the user calls end
case PUT_STATE_WAIT_START: //no uploadId has been obtained yet, so no upload can be triggered
default: break;
}
return 0;
}
/*
 * Completion handler for a finished upload-related request; drives the upload state machine.
 *
 * ctx      - upload context the finished transfer belongs to.
 * res      - libcurl result code of the transfer.
 * res_code - HTTP status code of the response.
 *
 * Several branches destroy ctx via tango_cache_ctx_destroy() (directly or through
 * do_tango_cache_update_end()); the caller must not rely on ctx after this function returns.
 */
void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
{
DBG_CACHE("state: %d, key: %s\n", ctx->put.state, ctx->object_key);
switch(ctx->put.state)
{
// The request for an uploadId has finished.
case PUT_STATE_WAIT_START:
if(res!=CURLE_OK||res_code!=200L|| ctx->fail_state || !parse_uploadID_xml(ctx->response.buff, ctx->response.len, &ctx->put.uploadID))
{
easy_string_destroy(&ctx->response);
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
// The context is destroyed here only if the end of the upload was already signalled.
if(ctx->put.close_state)
{
tango_cache_ctx_destroy(ctx);
}
}
else
{
easy_string_destroy(&ctx->response);
ctx->put.state = PUT_STATE_PART;
if(ctx->put.close_state)
{
do_tango_cache_update_end(ctx, true);
}
else
{
// Start the first part only once at least one full upload block is buffered.
size_t upload_length = evbuffer_get_length(ctx->put.evbuf);
if(upload_length >= ctx->instance->param->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, upload_length);
}
}
}
break;
// A part upload has finished.
case PUT_STATE_PART:
if(res != CURLE_OK || res_code!=200L)
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
}
if(ctx->fail_state)
{
// Try to abort the multipart upload; if that request cannot be queued, destroy the context only when it is closing.
if(cache_cancel_upload_minio(ctx))
{
ctx->put.state = PUT_STATE_CANCEL;
}
else if(ctx->put.close_state)
{
tango_cache_ctx_destroy(ctx);
}
}
else if(ctx->put.close_state)
{
do_tango_cache_update_end(ctx, true);
}
else
{
// Continue with the next part once at least one full upload block is buffered.
size_t upload_length = evbuffer_get_length(ctx->put.evbuf);
if(upload_length >= ctx->instance->param->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, upload_length);
}
}
break;
case PUT_STATE_CANCEL: //waiting to be closed
if(ctx->put.close_state)
{
tango_cache_ctx_destroy(ctx);
}
break;
// The final request (one-shot PUT or combine) has finished.
case PUT_STATE_END:
if(res != CURLE_OK || res_code!=200L)
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
}
// On success, and unless everything is stored in HOS only, redis_put_minio_object_meta() takes over; otherwise the context is destroyed here.
if(ctx->instance->param->object_store_way!=CACHE_ALL_HOS && !ctx->fail_state)
{
redis_put_minio_object_meta(ctx, true);
}
else
{
tango_cache_ctx_destroy(ctx);
}
break;
default: break;
}
}
int http_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback)
{
UNUSED CURLMcode rc;
char minio_url[256], buffer[256];
if(NULL == (ctx->curl=curl_easy_init()))
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
tango_cache_ctx_destroy(ctx, callback);
if(way == PUT_MEM_FREE) free((void *)data);
ctx->instance->statistic.memory_used -= size;
return -1;
}
ctx->put.state = PUT_STATE_END;
snprintf(minio_url, 256, "http://%s/%s", ctx->hostaddr, ctx->object_key);
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
sprintf(buffer, "token: %s", ctx->instance->param->cache_token);
ctx->headers = curl_slist_append(ctx->headers, buffer);
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
if(way == PUT_MEM_COPY)
{
ctx->put.once_request.buff = (char *)malloc(size);
memcpy(ctx->put.once_request.buff, data, size);
}
else
{
ctx->put.once_request.buff = (char *)data;
}
ctx->put.once_request.size = size;
ctx->put.once_request.len = 0;
curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.once_request.size);
curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_once_send_cb);
curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
return 0;
}
/*
 * Entry point for a one-shot upload of a memory block.
 *
 * Updates the instance statistics (put_recv_num, memory_used += size), resets the instance
 * error code to CACHE_OK and dispatches to the HOS or redis implementation depending on
 * ctx->locate. Returns whatever the chosen implementation returns.
 *
 * NOTE(review): `callback` is not forwarded - both implementations are always called with
 * `false`. Confirm this is intended.
 */
int do_tango_cache_upload_once_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback)
{
    (void)callback;

    ctx->instance->statistic.put_recv_num += 1;
    ctx->instance->statistic.memory_used += size;
    ctx->instance->error_code = CACHE_OK;

    if(ctx->locate != OBJECT_IN_HOS)
    {
        return redis_put_complete_part_data(ctx, way, data, size, false);
    }
    return http_put_complete_part_data(ctx, way, data, size, false);
}
/*
 * Entry point for a one-shot upload whose payload is supplied as an evbuffer.
 *
 * way      - EVBUFFER_MOVE: the content of evbuf is moved into ctx->put.evbuf;
 *            otherwise it is added by reference.
 * callback - forwarded to tango_cache_ctx_destroy() and to the upload implementation.
 *
 * Returns -1 (ctx destroyed, fail state CACHE_ERR_EVBUFFER) when the buffer cannot be
 * attached, otherwise the result of the HOS or redis implementation chosen by ctx->locate.
 */
int do_tango_cache_upload_once_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, bool callback)
{
    int attach_rc;
    size_t queued;

    ctx->instance->statistic.put_recv_num += 1;
    ctx->instance->error_code = CACHE_OK;

    attach_rc = (way == EVBUFFER_MOVE)
        ? evbuffer_add_buffer(ctx->put.evbuf, evbuf)
        : evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf);
    if(attach_rc != 0)
    {
        tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER);
        tango_cache_ctx_destroy(ctx, callback);
        return -1;
    }

    queued = evbuffer_get_length(ctx->put.evbuf);
    ctx->instance->statistic.memory_used += queued;

    if(ctx->locate != OBJECT_IN_HOS)
    {
        return redis_put_complete_part_evbuf(ctx, queued, callback);
    }
    return http_put_complete_part_evbuf(ctx, callback);
}
/*
 * Completion handler for a single-object DELETE: anything but a successful transfer answered
 * with HTTP 200 or 204 puts ctx into the CACHE_ERR_CURL fail state; ctx is destroyed either way.
 */
void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
{
    const bool status_ok = (res_code == 204L || res_code == 200L);

    if(!(res == CURLE_OK && status_ok))
    {
        tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
    }
    tango_cache_ctx_destroy(ctx);
}
void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
{
u_int32_t errnum=0;
if(res!=CURLE_OK || (res_code!=204L && res_code!=200L ))
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
ctx->del.fail_num = ctx->del.succ_num;
ctx->del.succ_num = 0;
}
else
{
if(!parse_multidelete_xml(ctx->response.buff, ctx->response.len, &errnum, ctx->error, CURL_ERROR_SIZE))
{
ctx->del.fail_num = ctx->del.succ_num;
ctx->del.succ_num = 0;
}
else
{
ctx->del.fail_num = errnum;
ctx->del.succ_num -= errnum;
}
if(ctx->del.fail_num > 0)
{
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
}
}
tango_cache_ctx_destroy(ctx);
}
/*
 * Queue a multi-object delete: POST of the request body held in ctx->response to
 * http://<hostaddr>/<bucketname>/?delete. The body is copied by libcurl
 * (CURLOPT_COPYPOSTFIELDS), so ctx->response is released before returning and is then reused
 * by curl_response_body_save_cb for the response.
 *
 * callback - forwarded to tango_cache_ctx_destroy() when the request cannot be created.
 *
 * Returns 0 once the request has been added to the multi handle, -1 when curl_easy_init()
 * fails (ctx is then in the CACHE_OUTOF_MEMORY fail state and has been destroyed).
 */
int do_tango_cache_multi_delete(struct tango_cache_ctx *ctx, bool callback)
{
    UNUSED CURLMcode rc;
    char minio_url[256], buffer[256];

    ctx->instance->statistic.del_recv_num += ctx->del.succ_num;
    ctx->instance->error_code = CACHE_OK;
    if(NULL == (ctx->curl=curl_easy_init()))
    {
        tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
        tango_cache_ctx_destroy(ctx, callback);
        return -1;
    }
    snprintf(minio_url, sizeof(minio_url), "http://%s/%s/?delete", ctx->hostaddr, ctx->instance->param->bucketname);
    curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
    // Fills in Content-Length; must be set before CURLOPT_COPYPOSTFIELDS.
    // The option is read as a long, so convert explicitly.
    // NOTE(review): this uses response.size rather than response.len - confirm that `size`
    // is the length of the request body here.
    curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, (long)ctx->response.size);
    curl_easy_setopt(ctx->curl, CURLOPT_COPYPOSTFIELDS, ctx->response.buff);
    curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);

    // Bounded formatting: the configured token must not overrun the stack buffer.
    snprintf(buffer, sizeof(buffer), "token: %s", ctx->instance->param->cache_token);
    ctx->headers = curl_slist_append(ctx->headers, buffer);
    curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
    curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);

    rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
    assert(rc==CURLM_OK);
    easy_string_destroy(&ctx->response);
    return 0;
}
bool fetch_header_over_biz(struct tango_cache_ctx *ctx)
{
if(ctx->get.need_hdrs!=RESPONSE_HDR_ALL) //无Expires时
{
tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL);
ctx->get.state = GET_STATE_DELETE;
promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
return false;
}
if(ctx->get.response_tag.len > 0)
{
ctx->get.result.data_frag = ctx->get.response_tag.buff;
ctx->get.result.size = ctx->get.response_tag.len;
ctx->get.result.type = RESULT_TYPE_USERTAG;
promise_success(ctx->promise, &ctx->get.result);
easy_string_destroy(&ctx->get.response_tag);
}
if(ctx->response.len > 0)
{
ctx->get.result.data_frag = ctx->response.buff;
ctx->get.result.size = ctx->response.len;
ctx->get.result.type = RESULT_TYPE_HEADER;
promise_success(ctx->promise, &ctx->get.result);
easy_string_destroy(&ctx->response);
}
return true;
}
/*
 * Write callback for GET: forwards each body chunk to the promise as RESULT_TYPE_BODY.
 *
 * Nothing is forwarded when ctx is in a fail state, when the GET state is GET_STATE_DELETE,
 * or when fetch_header_over_biz() reports a problem. Always returns size*count so the
 * transfer runs to completion.
 */
static size_t curl_get_response_body_cb(void *ptr, size_t size, size_t count, void *userp)
{
    struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
    const size_t nbytes = size * count;

    if(ctx->fail_state || ctx->get.state==GET_STATE_DELETE || !fetch_header_over_biz(ctx))
    {
        return nbytes;
    }

    ctx->get.result.type = RESULT_TYPE_BODY;
    ctx->get.result.data_frag = (const char *)ptr;
    ctx->get.result.size = nbytes;
    promise_success(ctx->promise, &ctx->get.result);
    return nbytes;
}
/*
 * Freshness check, effective only once all required headers have been received
 * (ctx->get.need_hdrs == RESPONSE_HDR_ALL); before that it returns true.
 *
 * The object is reported as a miss (RESULT_TYPE_MISS delivered, promise finished, saved
 * headers released, fail state CACHE_TIMEOUT, return false) when
 *   - the current GMT time is past ctx->get.expires; in this case the GET state is also set
 *     to GET_STATE_DELETE so that the delete is triggered when the download completes, or
 *   - last_modify + max_age > now, or now + min_fresh > expires.
 *
 * NOTE(review): the max-age test reads "last_modify + max_age > now"; a conventional max-age
 * check would reject when last_modify + max_age < now. Confirm the meaning of max_age and
 * last_modify before relying on this condition.
 */
bool check_expires_fresh_header(struct tango_cache_ctx *ctx)
{
    time_t now_gmt;
    bool expired;

    if(ctx->get.need_hdrs != RESPONSE_HDR_ALL)
    {
        return true;
    }

    now_gmt = get_gmtime_timestamp(time(NULL));
    expired = (now_gmt > ctx->get.expires);
    if(!expired && !(ctx->get.last_modify+ctx->get.max_age > now_gmt || now_gmt+ctx->get.min_fresh>ctx->get.expires))
    {
        return true;
    }

    tango_cache_set_fail_state(ctx, CACHE_TIMEOUT);
    if(expired)
    {
        ctx->get.state = GET_STATE_DELETE; // expired cache entry: deletion is triggered when the download completes
    }
    ctx->get.result.type = RESULT_TYPE_MISS;
    promise_success(ctx->promise, &ctx->get.result);
    promise_finish(ctx->promise);
    easy_string_destroy(&ctx->response);
    return false;
}
/*
 * Evaluate the outcome of a GET/HEAD transfer.
 *
 * Returns true only for a successful transfer with HTTP status 200. Otherwise ctx is put into
 * a fail state and the promise is resolved:
 *   - libcurl error      -> CACHE_ERR_CURL, promise failed;
 *   - HTTP 404           -> CACHE_CACHE_MISS, RESULT_TYPE_MISS delivered and promise finished;
 *   - other HTTP status  -> CACHE_ERR_INTERNAL, promise failed.
 */
static bool check_get_result_code(struct tango_cache_ctx *ctx, CURLcode code, long res_code)
{
    if(code != CURLE_OK)
    {
        tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
        promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
        return false;
    }
    if(res_code == 200L)
    {
        return true;
    }

    if(res_code == 404L)
    {
        tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS);
        ctx->get.result.type = RESULT_TYPE_MISS;
        promise_success(ctx->promise, &ctx->get.result);
        promise_finish(ctx->promise);
    }
    else
    {
        tango_cache_set_fail_state(ctx, CACHE_ERR_INTERNAL);
        promise_failed(ctx->promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
    }
    return false;
}
/*
 * Parse an unsigned decimal number from a header value that is NOT NUL-terminated.
 * Leading blanks are skipped; parsing stops at the first non-digit. Returns false, leaving
 * *out untouched, when no digit is found.
 */
static bool header_value_to_ulong(const char *value, size_t value_len, unsigned long *out)
{
    char digits[32];
    size_t n = 0;

    while(value_len > 0 && (*value == ' ' || *value == '\t'))
    {
        value++;
        value_len--;
    }
    while(n < value_len && n < sizeof(digits) - 1 && value[n] >= '0' && value[n] <= '9')
    {
        digits[n] = value[n];
        n++;
    }
    if(n == 0)
    {
        return false;
    }
    digits[n] = '\0';
    *out = strtoul(digits, NULL, 10);
    return true;
}

/*
 * Header callback for GET/HEAD requests.
 *
 * On the first header line (ctx->res_code still 0) the HTTP status is checked through
 * check_get_result_code(). Afterwards each "name: value" line is dispatched on the length of
 * its name and compared case-insensitively:
 *   expires          - stored in ctx->get.expires, then the freshness check runs;
 *   x-amz-meta-lm    - stored in ctx->get.last_modify, then the freshness check runs;
 *   x-amz-meta-user  - base64-decoded and saved in ctx->get.response_tag;
 *   content-length   - stored in ctx->get.result.tlength;
 *   content-md5 / content-type / content-encoding / content-disposition
 *                    - the raw header line is appended to ctx->response.
 *
 * libcurl does not NUL-terminate header lines, so numeric values are parsed with a
 * length-bounded helper instead of sscanf().
 *
 * Always returns size*count so that the transfer continues even after a failure was recorded.
 */
static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count, void *userp)
{
    struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
    char *start=(char *)ptr, *pos_colon;
    size_t raw_len = size*count, hdrlen=size*count;
    char usertag[2048];
    size_t datalen, value_len;
    unsigned long number;

    if(ctx->fail_state || ctx->get.state==GET_STATE_DELETE)
    {
        return raw_len;
    }
    if(ctx->res_code == 0) // first response data: check that the status code is 200 before anything else
    {
        UNUSED CURLcode code = curl_easy_getinfo(ctx->curl, CURLINFO_RESPONSE_CODE, &ctx->res_code);
        if(!check_get_result_code(ctx, code, ctx->res_code))
        {
            return raw_len;
        }
        ctx->get.result.location = OBJECT_IN_HOS;
    }
    pos_colon = (char*)memchr(start, ':', raw_len);
    if(pos_colon == NULL)
    {
        return raw_len;
    }
    datalen = pos_colon - start;         // length of the header name
    value_len = raw_len - datalen - 1;   // bytes following the colon
    switch(datalen)
    {
        case 7:
            if(strcmp_one_word_mesa_equal_len("expires", "EXPIRES", start, 7))
            {
                ctx->get.need_hdrs |= RESPONSE_HDR_EXPIRES;
                ctx->get.expires = expires_hdr2timestamp(pos_colon + 1, value_len);
                if(!check_expires_fresh_header(ctx))
                {
                    return raw_len;
                }
            }
            break;
        case 13:
            if(strcmp_one_word_mesa_equal_len("x-amz-meta-lm", "X-AMZ-META-LM", start, 13))
            {
                ctx->get.need_hdrs |= RESPONSE_HDR_LAST_MOD;
                if(header_value_to_ulong(pos_colon+1, value_len, &number))
                {
                    ctx->get.last_modify = number;
                }
                if(!check_expires_fresh_header(ctx))
                {
                    return raw_len;
                }
            }
            break;
        case 15:
            if(strcmp_one_word_mesa_equal_len("x-amz-meta-user", "X-AMZ-META-USER", start, 15))
            {
                if((hdrlen = Base64_DecodeBlock((unsigned char*)pos_colon+1, raw_len-datalen-1, (unsigned char*)usertag, 2048))>0)
                {
                    easy_string_savedata(&ctx->get.response_tag, usertag, hdrlen);
                }
            }
            break;
        case 14:
            if(strcmp_one_word_mesa_equal_len("content-length", "CONTENT-LENGTH", start, 14))
            {
                if(header_value_to_ulong(pos_colon+1, value_len, &number))
                {
                    ctx->get.result.tlength = number;
                }
            }
            break;
        case 11: if(strcmp_one_word_mesa_equal_len("content-md5", "CONTENT-MD5", start, 11)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break;
        case 12: if(strcmp_one_word_mesa_equal_len("content-type", "CONTENT-TYPE", start, 12)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break;
        case 16: if(strcmp_one_word_mesa_equal_len("content-encoding", "CONTENT-ENCODING", start, 16)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break;
        case 19: if(strcmp_one_word_mesa_equal_len("content-disposition", "CONTENT-DISPOSITION", start, 19)) easy_string_savedata(&ctx->response, (const char*)ptr, raw_len); break;
        default: break;
    }
    return raw_len;
}
/*
 * Completion handler for GET/HEAD transfers, driven by ctx->get.state:
 *   GET_STATE_START  - on success RESULT_TYPE_END is delivered and the promise finished (for
 *                      HEAD requests only after fetch_header_over_biz() succeeded); ctx is
 *                      destroyed in any case;
 *   GET_STATE_DELETE - the object is to be removed: state becomes GET_STATE_END and the
 *                      DELETE request is issued;
 *   GET_STATE_END    - ctx is destroyed.
 * Any other state is a programming error (assert).
 */
void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
{
    if(ctx->get.state == GET_STATE_START)
    {
        // For HEAD: if the required headers turn out to be incomplete, the object is not
        // deleted for now (not expected under normal conditions).
        if(!ctx->fail_state
            && check_get_result_code(ctx, res, res_code)
            && (ctx->method!=CACHE_REQUEST_HEAD || fetch_header_over_biz(ctx)))
        {
            ctx->get.result.type = RESULT_TYPE_END;
            promise_success(ctx->promise, &ctx->get.result);
            promise_finish(ctx->promise);
        }
        tango_cache_ctx_destroy(ctx);
    }
    else if(ctx->get.state == GET_STATE_DELETE)
    {
        ctx->get.state = GET_STATE_END;
        cache_delete_minio_object(ctx);
    }
    else if(ctx->get.state == GET_STATE_END)
    {
        tango_cache_ctx_destroy(ctx);
    }
    else
    {
        assert(0);
    }
}
/*
 * Queue a GET (or HEAD, when ctx->method == CACHE_REQUEST_HEAD) for
 * http://<hostaddr>/<object_key>. Headers are handled by curl_get_response_header_cb and the
 * body by curl_get_response_body_cb.
 *
 * Returns 1 once the request has been added to the multi handle. Returns -1 when
 * curl_easy_init() fails; ctx has then been destroyed and must not be used by the caller.
 */
static int tango_cache_fetch_minio(struct tango_cache_ctx *ctx)
{
    UNUSED CURLMcode rc;
    char minio_url[256], buffer[256];

    if(NULL == (ctx->curl=curl_easy_init()))
    {
        tango_cache_ctx_destroy(ctx);
        return -1;
    }
    snprintf(minio_url, sizeof(minio_url), "http://%s/%s", ctx->hostaddr, ctx->object_key);
    curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
    if(ctx->method == CACHE_REQUEST_HEAD)
    {
        curl_easy_setopt(ctx->curl, CURLOPT_NOBODY, 1L);
    }
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_get_response_body_cb);
    curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
    curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_get_response_header_cb);
    curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);

    // Bounded formatting: the configured token must not overrun the stack buffer.
    snprintf(buffer, sizeof(buffer), "token: %s", ctx->instance->param->cache_token);
    ctx->headers = curl_slist_append(ctx->headers, buffer);
    curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
    curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);

    rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
    assert(rc==CURLM_OK);
    return 1;
}
/*
 * Redirect callback installed by do_tango_cache_fetch_object(): fetch the object over HTTP
 * instead of from redis.
 *
 * The GET state is reset to GET_STATE_START and the location set to OBJECT_IN_HOS. When the
 * HTTP session limit is reached, ctx enters the CACHE_OUTOF_MEMORY fail state, the promise is
 * failed and ctx is destroyed. When the HTTP request cannot be started, the promise is failed
 * through the pointer saved beforehand.
 */
static void redis_redirect_object2minio_cb(struct tango_cache_ctx *ctx)
{
    struct promise *saved_promise = ctx->promise;

    ctx->get.state = GET_STATE_START;
    ctx->locate = OBJECT_IN_HOS;

    if(ctx->instance->statistic.session_http>=ctx->instance->param->maximum_sessions)
    {
        tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
        promise_failed(saved_promise, FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
        tango_cache_ctx_destroy(ctx);
        return;
    }

    if(tango_cache_fetch_minio(ctx) != 1)
    {
        promise_failed(saved_promise, FUTURE_ERROR_CANCEL, "tango_cache_fetch_minio failed");
    }
}
/*
 * Start fetching an object and count the request in get_recv_num.
 *
 * where_to_get - OBJECT_IN_HOS:   fetch over HTTP; returns 0 when the request was queued,
 *                                 -2 otherwise;
 *                OBJECT_IN_REDIS: fetch from redis; returns tango_cache_fetch_redis()'s result;
 *                anything else:   try redis first, with redis_redirect_object2minio_cb
 *                                 installed as the redirect callback; returns
 *                                 tango_cache_try_fetch_redis()'s result.
 */
int do_tango_cache_fetch_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATION where_to_get)
{
    ctx->instance->statistic.get_recv_num += 1;

    if(where_to_get == OBJECT_IN_HOS)
    {
        ctx->locate = OBJECT_IN_HOS;
        if(tango_cache_fetch_minio(ctx) == 1)
        {
            return 0;
        }
        return -2;
    }
    if(where_to_get == OBJECT_IN_REDIS)
    {
        ctx->locate = OBJECT_IN_REDIS;
        return tango_cache_fetch_redis(ctx);
    }

    ctx->get.redis_redirect_minio_cb = redis_redirect_object2minio_cb;
    return tango_cache_try_fetch_redis(ctx);
}
/*
 * Start a HEAD-style lookup and count the request in get_recv_num.
 *
 * where_to_head - OBJECT_IN_REDIS: returns tango_cache_head_redis()'s result;
 *                 anything else:   the HTTP request is issued; returns 0 when it was queued
 *                                  and -1 otherwise.
 */
int do_tango_cache_head_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATION where_to_head)
{
    ctx->instance->statistic.get_recv_num += 1;

    if(where_to_head != OBJECT_IN_REDIS)
    {
        const int queued = tango_cache_fetch_minio(ctx);

        return (queued == 1) ? 0 : -1;
    }
    return tango_cache_head_redis(ctx);
}