【1】修复HTTP Expect头部缺失时POST卡顿的问题;
【2】TODO:尝试增加multiple delete objects API,尚未成功(AccessDenied);
This commit is contained in:
BIN
cache/pangu_tango_cache.a
vendored
BIN
cache/pangu_tango_cache.a
vendored
Binary file not shown.
91
cache/tango_cache_client.cpp
vendored
91
cache/tango_cache_client.cpp
vendored
@@ -9,16 +9,34 @@
|
|||||||
#include <time.h>
|
#include <time.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <openssl/sha.h>
|
#include <openssl/sha.h>
|
||||||
|
#include <openssl/md5.h>
|
||||||
|
|
||||||
#include <MESA/MESA_prof_load.h>
|
#include <MESA/MESA_prof_load.h>
|
||||||
|
|
||||||
#include "tango_cache_client_in.h"
|
#include "tango_cache_client_in.h"
|
||||||
#include "tango_cache_transfer.h"
|
#include "tango_cache_transfer.h"
|
||||||
#include "tango_cache_tools.h"
|
#include "tango_cache_tools.h"
|
||||||
|
#include "tango_cache_xml.h"
|
||||||
|
|
||||||
int TANGO_CACHE_VERSION_20180925=0;
|
int TANGO_CACHE_VERSION_20181009=0;
|
||||||
|
|
||||||
static void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size)
|
static int caculate_base64_md5(const char *data, unsigned long len, unsigned char *result, unsigned int size)
|
||||||
|
{
|
||||||
|
MD5_CTX c;
|
||||||
|
unsigned char md5[17]={0};
|
||||||
|
|
||||||
|
if(size < 33)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
MD5_Init(&c);
|
||||||
|
MD5_Update(&c, data, len);
|
||||||
|
MD5_Final(md5, &c);
|
||||||
|
|
||||||
|
Base64_EncodeBlock(md5, 16, result);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size)
|
||||||
{
|
{
|
||||||
SHA256_CTX c;
|
SHA256_CTX c;
|
||||||
unsigned char sha256[128];
|
unsigned char sha256[128];
|
||||||
@@ -101,12 +119,12 @@ void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path,
|
|||||||
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
|
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state, enum CACHE_ERR_CODE error_code, struct cache_statistics *statistic)
|
static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statistics *statistic)
|
||||||
{
|
{
|
||||||
switch(method)
|
switch(ctx->method)
|
||||||
{
|
{
|
||||||
case CACHE_REQUEST_PUT:
|
case CACHE_REQUEST_PUT:
|
||||||
if(fail_state)
|
if(ctx->fail_state)
|
||||||
{
|
{
|
||||||
statistic->put_error_num += 1;
|
statistic->put_error_num += 1;
|
||||||
}
|
}
|
||||||
@@ -116,9 +134,9 @@ static void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state,
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case CACHE_REQUEST_GET:
|
case CACHE_REQUEST_GET:
|
||||||
if(fail_state)
|
if(ctx->fail_state)
|
||||||
{
|
{
|
||||||
if(error_code == CACHE_ERR_CURL)
|
if(ctx->error_code == CACHE_ERR_CURL)
|
||||||
statistic->get_error_num += 1;
|
statistic->get_error_num += 1;
|
||||||
else
|
else
|
||||||
statistic->get_miss_num += 1;
|
statistic->get_miss_num += 1;
|
||||||
@@ -129,7 +147,7 @@ static void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state,
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case CACHE_REQUEST_DELETE:
|
case CACHE_REQUEST_DELETE:
|
||||||
if(fail_state)
|
if(ctx->fail_state)
|
||||||
{
|
{
|
||||||
statistic->del_error_num += 1;
|
statistic->del_error_num += 1;
|
||||||
}
|
}
|
||||||
@@ -138,6 +156,10 @@ static void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state,
|
|||||||
statistic->del_succ_num += 1;
|
statistic->del_succ_num += 1;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case CACHE_REQUEST_DELETE_MUL:
|
||||||
|
statistic->del_succ_num += ctx->del.succ_num;
|
||||||
|
statistic->del_error_num += ctx->del.fail_num;
|
||||||
|
break;
|
||||||
default:break;
|
default:break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -185,7 +207,6 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx)
|
|||||||
case CACHE_REQUEST_PUT:
|
case CACHE_REQUEST_PUT:
|
||||||
if(ctx->put.uploadID != NULL) free(ctx->put.uploadID);
|
if(ctx->put.uploadID != NULL) free(ctx->put.uploadID);
|
||||||
if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml);
|
if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml);
|
||||||
if(ctx->headers != NULL) curl_slist_free_all(ctx->headers);
|
|
||||||
if(ctx->put.evbuf!=NULL)
|
if(ctx->put.evbuf!=NULL)
|
||||||
{
|
{
|
||||||
ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf);
|
ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf);
|
||||||
@@ -197,6 +218,11 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx)
|
|||||||
free(etag->etag);
|
free(etag->etag);
|
||||||
free(etag);
|
free(etag);
|
||||||
}//no break here
|
}//no break here
|
||||||
|
case CACHE_REQUEST_DELETE_MUL:
|
||||||
|
if(ctx->headers != NULL)
|
||||||
|
{
|
||||||
|
curl_slist_free_all(ctx->headers);
|
||||||
|
}//no break here
|
||||||
case CACHE_REQUEST_DELETE:
|
case CACHE_REQUEST_DELETE:
|
||||||
if(ctx->future != NULL)
|
if(ctx->future != NULL)
|
||||||
{
|
{
|
||||||
@@ -212,7 +238,7 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx)
|
|||||||
break;
|
break;
|
||||||
default: break;
|
default: break;
|
||||||
}
|
}
|
||||||
update_statistics(ctx->method, ctx->fail_state, ctx->error_code, &ctx->instance->statistic);
|
update_statistics(ctx, &ctx->instance->statistic);
|
||||||
free(ctx);
|
free(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -333,7 +359,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
|
|||||||
{
|
{
|
||||||
ctx->headers = curl_slist_append(ctx->headers, "Content-Type:");
|
ctx->headers = curl_slist_append(ctx->headers, "Content-Type:");
|
||||||
}
|
}
|
||||||
//ctx->headers = curl_slist_append(ctx->headers, "Expect:"); //<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ӣ<EFBFBD>curl_multi_socket_action<EFBFBD>Ῠס
|
ctx->headers = curl_slist_append(ctx->headers, "Expect:");//ע<EFBFBD><EFBFBD>POST<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Expect<EFBFBD><EFBFBD>ϵ<EFBFBD><EFBFBD>Ҫ<EFBFBD><EFBFBD>ȷ<EFBFBD><EFBFBD><EFBFBD><EFBFBD>CURLOPT_POSTFIELDSIZE
|
||||||
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͷ<EFBFBD><CDB7><EFBFBD><EFBFBD>GETʱ<54><CAB1>ԭ<EFBFBD><D4AD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
|
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͷ<EFBFBD><CDB7><EFBFBD><EFBFBD>GETʱ<54><CAB1>ԭ<EFBFBD><D4AD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
|
||||||
if(meta->usertag_len>0 && meta->usertag_len<=USER_TAG_MAX_LEN)
|
if(meta->usertag_len>0 && meta->usertag_len<=USER_TAG_MAX_LEN)
|
||||||
{
|
{
|
||||||
@@ -484,6 +510,46 @@ int tango_cache_delete_object(struct tango_cache_instance *instance, struct futu
|
|||||||
return (cache_delete_minio_object(ctx)==1)?0:-1;
|
return (cache_delete_minio_object(ctx)==1)?0:-1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_instance *instance, struct future* future, char *objlist[], u_int32_t num)
|
||||||
|
{
|
||||||
|
struct tango_cache_ctx *ctx;
|
||||||
|
char md5[48]={0}, content_md5[48];
|
||||||
|
|
||||||
|
ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
|
||||||
|
ctx->instance = instance;
|
||||||
|
ctx->future = future;
|
||||||
|
ctx->method = CACHE_REQUEST_DELETE_MUL;
|
||||||
|
ctx->del.succ_num = num;
|
||||||
|
|
||||||
|
if(wired_load_balancer_lookup(instance->wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48))
|
||||||
|
{
|
||||||
|
instance->error_code = CACHE_ERR_WIREDLB;
|
||||||
|
instance->statistic.totaldrop_num += num;
|
||||||
|
free(ctx);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
construct_multiple_delete_xml(ctx->instance->bucketname, objlist, num, instance->hash_object_key, &ctx->response.buff, &ctx->response.size);
|
||||||
|
caculate_base64_md5(ctx->response.buff, ctx->response.size, (unsigned char *)md5, 48);
|
||||||
|
sprintf(content_md5, "Content-MD5: %s", md5);
|
||||||
|
ctx->headers = curl_slist_append(ctx->headers, content_md5);
|
||||||
|
ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml");
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO: AccessDenied
|
||||||
|
int tango_cache_multi_delete(struct tango_cache_instance *instance, struct future* future, char *objlist[], u_int32_t num)
|
||||||
|
{
|
||||||
|
struct tango_cache_ctx *ctx;
|
||||||
|
|
||||||
|
ctx = tango_cache_multi_delete_prepare(instance, future, objlist, num);
|
||||||
|
if(ctx == NULL)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return tango_cache_multi_delete_start(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
static void check_multi_info(CURLM *multi)
|
static void check_multi_info(CURLM *multi)
|
||||||
{
|
{
|
||||||
CURLMsg *msg;
|
CURLMsg *msg;
|
||||||
@@ -520,6 +586,9 @@ static void check_multi_info(CURLM *multi)
|
|||||||
case CACHE_REQUEST_DELETE:
|
case CACHE_REQUEST_DELETE:
|
||||||
tango_cache_curl_del_done(ctx, res, res_code);
|
tango_cache_curl_del_done(ctx, res, res_code);
|
||||||
break;
|
break;
|
||||||
|
case CACHE_REQUEST_DELETE_MUL:
|
||||||
|
tango_cache_curl_muldel_done(ctx, res, res_code);
|
||||||
|
break;
|
||||||
default: break;
|
default: break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
10
cache/tango_cache_client_in.h
vendored
10
cache/tango_cache_client_in.h
vendored
@@ -19,6 +19,7 @@ enum CACHE_REQUEST_METHOD
|
|||||||
CACHE_REQUEST_GET=0,
|
CACHE_REQUEST_GET=0,
|
||||||
CACHE_REQUEST_PUT,
|
CACHE_REQUEST_PUT,
|
||||||
CACHE_REQUEST_DELETE,
|
CACHE_REQUEST_DELETE,
|
||||||
|
CACHE_REQUEST_DELETE_MUL,
|
||||||
};
|
};
|
||||||
|
|
||||||
enum GET_OBJECT_STATE
|
enum GET_OBJECT_STATE
|
||||||
@@ -98,6 +99,12 @@ struct cache_ctx_data_put
|
|||||||
bool close_state; //<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ùر<C3B9>
|
bool close_state; //<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ùر<C3B9>
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct cache_ctx_multi_delete
|
||||||
|
{
|
||||||
|
u_int32_t succ_num;
|
||||||
|
u_int32_t fail_num;
|
||||||
|
};
|
||||||
|
|
||||||
struct tango_cache_ctx
|
struct tango_cache_ctx
|
||||||
{
|
{
|
||||||
CURL *curl;
|
CURL *curl;
|
||||||
@@ -117,6 +124,7 @@ struct tango_cache_ctx
|
|||||||
union{
|
union{
|
||||||
struct cache_ctx_data_put put;
|
struct cache_ctx_data_put put;
|
||||||
struct cache_ctx_data_get get;
|
struct cache_ctx_data_get get;
|
||||||
|
struct cache_ctx_multi_delete del;
|
||||||
};
|
};
|
||||||
struct tango_cache_instance *instance;
|
struct tango_cache_instance *instance;
|
||||||
};
|
};
|
||||||
@@ -126,6 +134,8 @@ struct curl_socket_data
|
|||||||
struct event sock_event;
|
struct event sock_event;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size);
|
||||||
|
|
||||||
void easy_string_savedata(struct easy_string *estr, const char *data, size_t len);
|
void easy_string_savedata(struct easy_string *estr, const char *data, size_t len);
|
||||||
void easy_string_destroy(struct easy_string *estr);
|
void easy_string_destroy(struct easy_string *estr);
|
||||||
|
|
||||||
|
|||||||
79
cache/tango_cache_transfer.cpp
vendored
79
cache/tango_cache_transfer.cpp
vendored
@@ -141,7 +141,7 @@ static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool ful
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static size_t curl_write_uploadID_cb(void *ptr, size_t size, size_t count, void *userp)
|
static size_t curl_response_body_save_cb(void *ptr, size_t size, size_t count, void *userp)
|
||||||
{
|
{
|
||||||
struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
|
struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
|
||||||
struct easy_string *estr = &ctx->response;
|
struct easy_string *estr = &ctx->response;
|
||||||
@@ -158,7 +158,6 @@ static size_t curl_write_uploadID_cb(void *ptr, size_t size, size_t count, void
|
|||||||
if(code != CURLE_OK || ctx->res_code!=200L)
|
if(code != CURLE_OK || ctx->res_code!=200L)
|
||||||
{
|
{
|
||||||
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
|
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
|
||||||
if(code != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error);
|
|
||||||
return size*count;
|
return size*count;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -180,11 +179,12 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx)
|
|||||||
|
|
||||||
snprintf(minio_url, 256, "http://%s/%s/%s?uploads", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
|
snprintf(minio_url, 256, "http://%s/%s/%s?uploads", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
|
||||||
curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
|
curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, 0); //Ĭ<><C4AC>ʹ<EFBFBD>ûص<C3BB><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>fread<61><64><EFBFBD><EFBFBD><EFBFBD>Է<EFBFBD><D4B7>ֹر<D6B9>Expectʱ<74>ᵼ<EFBFBD>¿<EFBFBD><C2BF><EFBFBD>curl_multi_socket_action
|
||||||
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
|
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
|
||||||
|
|
||||||
curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
|
curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
|
||||||
curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL,1L);
|
curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL,1L);
|
||||||
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_write_uploadID_cb);
|
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb);
|
||||||
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
|
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
|
||||||
curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
|
curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
|
||||||
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
|
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
|
||||||
@@ -280,8 +280,8 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx)
|
|||||||
curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
|
curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
|
||||||
curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
|
curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
|
||||||
|
|
||||||
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->put.combine_xml);
|
|
||||||
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, len); //<2F><><EFBFBD><EFBFBD>Content-Length
|
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, len); //<2F><><EFBFBD><EFBFBD>Content-Length
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->put.combine_xml);
|
||||||
|
|
||||||
if(ctx->headers != NULL)
|
if(ctx->headers != NULL)
|
||||||
{
|
{
|
||||||
@@ -419,7 +419,6 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r
|
|||||||
{
|
{
|
||||||
easy_string_destroy(&ctx->response);
|
easy_string_destroy(&ctx->response);
|
||||||
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
|
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
|
||||||
if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error);
|
|
||||||
if(ctx->put.close_state)
|
if(ctx->put.close_state)
|
||||||
{
|
{
|
||||||
tango_cache_ctx_destroy(ctx);
|
tango_cache_ctx_destroy(ctx);
|
||||||
@@ -448,7 +447,6 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r
|
|||||||
if(res != CURLE_OK || res_code!=200L)
|
if(res != CURLE_OK || res_code!=200L)
|
||||||
{
|
{
|
||||||
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
|
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
|
||||||
if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error);
|
|
||||||
}
|
}
|
||||||
if(ctx->fail_state)
|
if(ctx->fail_state)
|
||||||
{
|
{
|
||||||
@@ -486,7 +484,6 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r
|
|||||||
if(res != CURLE_OK || res_code!=200L)
|
if(res != CURLE_OK || res_code!=200L)
|
||||||
{
|
{
|
||||||
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
|
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
|
||||||
if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error);
|
|
||||||
}
|
}
|
||||||
tango_cache_ctx_destroy(ctx);
|
tango_cache_ctx_destroy(ctx);
|
||||||
break;
|
break;
|
||||||
@@ -579,11 +576,77 @@ void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long r
|
|||||||
{
|
{
|
||||||
if(res!=CURLE_OK || (res_code!=204L && res_code!=200L ))
|
if(res!=CURLE_OK || (res_code!=204L && res_code!=200L ))
|
||||||
{
|
{
|
||||||
ctx->fail_state = true;
|
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
|
||||||
}
|
}
|
||||||
tango_cache_ctx_destroy(ctx);
|
tango_cache_ctx_destroy(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
|
||||||
|
{
|
||||||
|
u_int32_t errnum=0;
|
||||||
|
|
||||||
|
if(res!=CURLE_OK || (res_code!=204L && res_code!=200L ))
|
||||||
|
{
|
||||||
|
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
|
||||||
|
ctx->del.fail_num = ctx->del.succ_num;
|
||||||
|
ctx->del.succ_num = 0;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if(!parse_multidelete_xml(ctx->response.buff, ctx->response.len, &errnum, ctx->error, CURL_ERROR_SIZE))
|
||||||
|
{
|
||||||
|
ctx->del.fail_num = ctx->del.succ_num;
|
||||||
|
ctx->del.succ_num = 0;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ctx->del.fail_num = errnum;
|
||||||
|
ctx->del.succ_num -= errnum;
|
||||||
|
}
|
||||||
|
if(ctx->del.fail_num > 0)
|
||||||
|
{
|
||||||
|
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tango_cache_ctx_destroy(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx)
|
||||||
|
{
|
||||||
|
CURLMcode rc;
|
||||||
|
char minio_url[256];
|
||||||
|
|
||||||
|
ctx->instance->statistic.del_recv_num += ctx->del.succ_num;
|
||||||
|
ctx->instance->error_code = CACHE_OK;
|
||||||
|
if(NULL == (ctx->curl=curl_easy_init()))
|
||||||
|
{
|
||||||
|
tango_cache_ctx_destroy(ctx);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
snprintf(minio_url, 256, "http://%s/%s/?delete", ctx->hostaddr, ctx->instance->bucketname);
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, ctx->response.size); //<2F><><EFBFBD><EFBFBD>Content-Length<74><68><EFBFBD><EFBFBD>CURLOPT_COPYPOSTFIELDS֮ǰ<D6AE><C7B0><EFBFBD><EFBFBD>
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_COPYPOSTFIELDS, ctx->response.buff);
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache");
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL, 1L);
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb);
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error);
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L);
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L);
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L);
|
||||||
|
curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 100L);
|
||||||
|
|
||||||
|
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
|
||||||
|
assert(rc==CURLM_OK);
|
||||||
|
easy_string_destroy(&ctx->response);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
|
void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code)
|
||||||
{
|
{
|
||||||
switch(ctx->get.state)
|
switch(ctx->get.state)
|
||||||
|
|||||||
3
cache/tango_cache_transfer.h
vendored
3
cache/tango_cache_transfer.h
vendored
@@ -9,8 +9,11 @@
|
|||||||
void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code);
|
void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code);
|
||||||
void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code);
|
void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code);
|
||||||
void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code);
|
void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code);
|
||||||
|
void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code);
|
||||||
|
|
||||||
int cache_delete_minio_object(struct tango_cache_ctx *ctx);
|
int cache_delete_minio_object(struct tango_cache_ctx *ctx);
|
||||||
|
int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx);
|
||||||
|
|
||||||
int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx);
|
int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx);
|
||||||
bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len);
|
bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len);
|
||||||
|
|
||||||
|
|||||||
102
cache/tango_cache_xml.cpp
vendored
102
cache/tango_cache_xml.cpp
vendored
@@ -46,7 +46,7 @@ bool parse_uploadID_xml(const char *content, int len, char **uploadID)
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len)
|
void construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len)
|
||||||
{
|
{
|
||||||
struct multipart_etag_list *etag;
|
struct multipart_etag_list *etag;
|
||||||
xmlDoc *pdoc;
|
xmlDoc *pdoc;
|
||||||
@@ -68,6 +68,104 @@ int construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len)
|
|||||||
|
|
||||||
xmlDocDumpFormatMemory(pdoc, (xmlChar **)xml, len, 1);
|
xmlDocDumpFormatMemory(pdoc, (xmlChar **)xml, len, 1);
|
||||||
xmlFreeDoc(pdoc);
|
xmlFreeDoc(pdoc);
|
||||||
return 0;
|
}
|
||||||
|
|
||||||
|
static void fill_multidelete_xml_errcode(xmlNode *error, char *out, int size)
|
||||||
|
{
|
||||||
|
xmlChar *errcode;
|
||||||
|
xmlNode *child = error->children;
|
||||||
|
|
||||||
|
while(child != NULL)
|
||||||
|
{
|
||||||
|
if(child->type != XML_ELEMENT_NODE || xmlStrcmp(child->name, (const xmlChar *)"Message"))
|
||||||
|
{
|
||||||
|
child = child->next;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
errcode = xmlNodeGetContent(child);
|
||||||
|
snprintf(out, size, "%s", (char *)errcode);
|
||||||
|
xmlFree(errcode);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool parse_multidelete_xml(const char *xml, int len, u_int32_t *errnum, char *errstr, int size)
|
||||||
|
{
|
||||||
|
xmlDoc *pdoc;
|
||||||
|
xmlNode *pcur;
|
||||||
|
int errornum=0;
|
||||||
|
|
||||||
|
if((pdoc = xmlParseMemory(xml, len)) == NULL)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if((pcur = xmlDocGetRootElement(pdoc)) == NULL)
|
||||||
|
{
|
||||||
|
xmlFreeDoc(pdoc);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
while(pcur->type != XML_ELEMENT_NODE)
|
||||||
|
pcur = pcur->next;
|
||||||
|
if(xmlStrcmp(pcur->name, (const xmlChar *)"DeleteResult"))
|
||||||
|
{
|
||||||
|
xmlFreeDoc(pdoc);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
pcur = pcur->children;
|
||||||
|
while(pcur != NULL)
|
||||||
|
{
|
||||||
|
if(pcur->type != XML_ELEMENT_NODE || xmlStrcmp(pcur->name, (const xmlChar *)"Error"))
|
||||||
|
{
|
||||||
|
pcur = pcur->next;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if(errornum == 0)
|
||||||
|
{
|
||||||
|
fill_multidelete_xml_errcode(pcur, errstr, size);
|
||||||
|
}
|
||||||
|
errornum++;
|
||||||
|
pcur = pcur->next;
|
||||||
|
}
|
||||||
|
*errnum = errornum;
|
||||||
|
|
||||||
|
xmlFreeDoc(pdoc);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void construct_multiple_delete_xml(const char *bucket, char *key[], u_int32_t num, int is_hash, char **xml, size_t *len)
|
||||||
|
{
|
||||||
|
xmlDoc *pdoc;
|
||||||
|
xmlNode *root, *child;
|
||||||
|
int xmllen;
|
||||||
|
|
||||||
|
pdoc = xmlNewDoc((const xmlChar *)"1.0");
|
||||||
|
root = xmlNewNode(NULL, (const xmlChar *)"Delete");
|
||||||
|
xmlDocSetRootElement(pdoc, root);
|
||||||
|
|
||||||
|
xmlNewChild(root, NULL, (const xmlChar*)"Quiet", (const xmlChar*)"true");
|
||||||
|
|
||||||
|
if(is_hash)
|
||||||
|
{
|
||||||
|
char hashkey[72], sha256[72];
|
||||||
|
for(u_int32_t i=0; i<num; i++)
|
||||||
|
{
|
||||||
|
child = xmlNewChild(root, NULL, (const xmlChar*)"Object", NULL);
|
||||||
|
caculate_sha256(key[i], strlen(key[i]), sha256, 72);
|
||||||
|
snprintf(hashkey, 256, "%c%c/%c%c/%s", sha256[0], sha256[1], sha256[2], sha256[3], sha256+4);
|
||||||
|
xmlNewChild(child, NULL, (const xmlChar*)"Key", (const xmlChar*)hashkey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
for(u_int32_t i=0; i<num; i++)
|
||||||
|
{
|
||||||
|
child = xmlNewChild(root, NULL, (const xmlChar*)"Object", NULL);
|
||||||
|
xmlNewChild(child, NULL, (const xmlChar*)"Key", (const xmlChar*)key[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
xmlDocDumpFormatMemoryEnc(pdoc, (xmlChar **)xml, &xmllen, "UTF-8", 1);
|
||||||
|
xmlFreeDoc(pdoc);
|
||||||
|
*len = xmllen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
4
cache/tango_cache_xml.h
vendored
4
cache/tango_cache_xml.h
vendored
@@ -4,7 +4,9 @@
|
|||||||
#include "tango_cache_client_in.h"
|
#include "tango_cache_client_in.h"
|
||||||
|
|
||||||
bool parse_uploadID_xml(const char *content, int len, char **uploadID);
|
bool parse_uploadID_xml(const char *content, int len, char **uploadID);
|
||||||
int construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len);
|
void construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len);
|
||||||
|
void construct_multiple_delete_xml(const char *bucket, char *key[], u_int32_t num, int is_hash, char **xml, size_t *len);
|
||||||
|
bool parse_multidelete_xml(const char *xml, int len, u_int32_t *errnum, char *errstr, int size);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|||||||
41
cache/test_demo/tango_cache_test.c
vendored
41
cache/test_demo/tango_cache_test.c
vendored
@@ -97,6 +97,23 @@ void put_future_failed(enum e_future_error err, const char * what, void * user)
|
|||||||
free(pdata);
|
free(pdata);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void del_future_success(future_result_t* result, void * user)
|
||||||
|
{
|
||||||
|
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||||
|
|
||||||
|
printf("DEL %s succ\n", pdata->filename);
|
||||||
|
future_destroy(pdata->future);
|
||||||
|
free(pdata);
|
||||||
|
}
|
||||||
|
void del_future_failed(enum e_future_error err, const char * what, void * user)
|
||||||
|
{
|
||||||
|
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||||
|
|
||||||
|
printf("DEL %s fail: %s\n", pdata->filename, what);
|
||||||
|
future_destroy(pdata->future);
|
||||||
|
free(pdata);
|
||||||
|
}
|
||||||
|
|
||||||
char * get_file_content(const char *filename, size_t *filelen_out)
|
char * get_file_content(const char *filename, size_t *filelen_out)
|
||||||
{
|
{
|
||||||
char *buffer;
|
char *buffer;
|
||||||
@@ -136,6 +153,8 @@ char * get_file_content(const char *filename, size_t *filelen_out)
|
|||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tango_cache_multi_delete(struct tango_cache_instance *instance, struct future* future, char *objlist[], u_int32_t num);
|
||||||
|
|
||||||
static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg)
|
static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg)
|
||||||
{
|
{
|
||||||
char s[1024];
|
char s[1024];
|
||||||
@@ -144,6 +163,9 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg)
|
|||||||
FILE *input = (FILE *)arg;
|
FILE *input = (FILE *)arg;
|
||||||
static int index=0;
|
static int index=0;
|
||||||
char filename[128], method[16], buffer[1024], *p;
|
char filename[128], method[16], buffer[1024], *p;
|
||||||
|
char *dellist[16];
|
||||||
|
char *pstart, *save_ptr=NULL;
|
||||||
|
int delnum=0;
|
||||||
struct tango_cache_meta meta;
|
struct tango_cache_meta meta;
|
||||||
struct future_pdata *pdata;
|
struct future_pdata *pdata;
|
||||||
|
|
||||||
@@ -207,6 +229,25 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg)
|
|||||||
fclose(fp);
|
fclose(fp);
|
||||||
tango_cache_upload_once_evbuf(tango_instance, pdata->future, EVBUFFER_MOVE, evbuf, &meta, pdata->filename, 256);
|
tango_cache_upload_once_evbuf(tango_instance, pdata->future, EVBUFFER_MOVE, evbuf, &meta, pdata->filename, 256);
|
||||||
}
|
}
|
||||||
|
else if(!strcasecmp(p, "DEL"))
|
||||||
|
{
|
||||||
|
pdata->future = future_create(del_future_success, del_future_failed, pdata);
|
||||||
|
promise_set_ctx(future_to_promise(pdata->future), NULL, NULL);
|
||||||
|
sprintf(pdata->filename, "%s", s);
|
||||||
|
tango_cache_delete_object(tango_instance, pdata->future, s);
|
||||||
|
}
|
||||||
|
else if(!strcasecmp(p, "DELMUL")) //TODO
|
||||||
|
{
|
||||||
|
pdata->future = future_create(del_future_success, del_future_failed, pdata);
|
||||||
|
promise_set_ctx(future_to_promise(pdata->future), NULL, NULL);
|
||||||
|
sprintf(pdata->filename, "%s", s);
|
||||||
|
|
||||||
|
for(pstart = strtok_r(s, ";", &save_ptr); pstart != NULL; pstart = strtok_r(NULL, ";", &save_ptr))
|
||||||
|
{
|
||||||
|
dellist[delnum++] = pstart;
|
||||||
|
}
|
||||||
|
tango_cache_multi_delete(tango_instance, pdata->future, dellist, delnum);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
pdata->future = future_create(put_future_success, put_future_failed, pdata);
|
pdata->future = future_create(put_future_success, put_future_failed, pdata);
|
||||||
|
|||||||
Reference in New Issue
Block a user