diff --git a/cache/pangu_tango_cache.a b/cache/pangu_tango_cache.a index 20e2fea..b39903e 100644 Binary files a/cache/pangu_tango_cache.a and b/cache/pangu_tango_cache.a differ diff --git a/cache/tango_cache_client.cpp b/cache/tango_cache_client.cpp index 2938552..7e24969 100644 --- a/cache/tango_cache_client.cpp +++ b/cache/tango_cache_client.cpp @@ -9,16 +9,34 @@ #include #include #include +#include #include #include "tango_cache_client_in.h" #include "tango_cache_transfer.h" #include "tango_cache_tools.h" +#include "tango_cache_xml.h" -int TANGO_CACHE_VERSION_20180925=0; +int TANGO_CACHE_VERSION_20181009=0; -static void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size) +static int caculate_base64_md5(const char *data, unsigned long len, unsigned char *result, unsigned int size) +{ + MD5_CTX c; + unsigned char md5[17]={0}; + + if(size < 33) + return -1; + + MD5_Init(&c); + MD5_Update(&c, data, len); + MD5_Final(md5, &c); + + Base64_EncodeBlock(md5, 16, result); + return 0; +} + +void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size) { SHA256_CTX c; unsigned char sha256[128]; @@ -101,12 +119,12 @@ void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path, snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key); } -static void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state, enum CACHE_ERR_CODE error_code, struct cache_statistics *statistic) +static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statistics *statistic) { - switch(method) + switch(ctx->method) { case CACHE_REQUEST_PUT: - if(fail_state) + if(ctx->fail_state) { statistic->put_error_num += 1; } @@ -116,9 +134,9 @@ static void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state, } break; case CACHE_REQUEST_GET: - if(fail_state) + if(ctx->fail_state) { - if(error_code == CACHE_ERR_CURL) + if(ctx->error_code == CACHE_ERR_CURL) statistic->get_error_num += 1; else statistic->get_miss_num += 1; @@ -129,7 +147,7 @@ static void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state, } break; case CACHE_REQUEST_DELETE: - if(fail_state) + if(ctx->fail_state) { statistic->del_error_num += 1; } @@ -138,6 +156,10 @@ static void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state, statistic->del_succ_num += 1; } break; + case CACHE_REQUEST_DELETE_MUL: + statistic->del_succ_num += ctx->del.succ_num; + statistic->del_error_num += ctx->del.fail_num; + break; default:break; } } @@ -185,7 +207,6 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx) case CACHE_REQUEST_PUT: if(ctx->put.uploadID != NULL) free(ctx->put.uploadID); if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml); - if(ctx->headers != NULL) curl_slist_free_all(ctx->headers); if(ctx->put.evbuf!=NULL) { ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf); @@ -197,6 +218,11 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx) free(etag->etag); free(etag); }//no break here + case CACHE_REQUEST_DELETE_MUL: + if(ctx->headers != NULL) + { + curl_slist_free_all(ctx->headers); + }//no break here case CACHE_REQUEST_DELETE: if(ctx->future != NULL) { @@ -212,7 +238,7 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx) break; default: break; } - update_statistics(ctx->method, ctx->fail_state, ctx->error_code, &ctx->instance->statistic); + update_statistics(ctx, &ctx->instance->statistic); free(ctx); } @@ -333,7 +359,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * { ctx->headers = curl_slist_append(ctx->headers, "Content-Type:"); } - //ctx->headers = curl_slist_append(ctx->headers, "Expect:"); //不可以添加?curl_multi_socket_action会卡住 + ctx->headers = curl_slist_append(ctx->headers, "Expect:");//注意POST方法与Expect关系,要明确给出CURLOPT_POSTFIELDSIZE //其他定义的头部,GET时会原样返回 if(meta->usertag_len>0 && meta->usertag_len<=USER_TAG_MAX_LEN) { @@ -484,6 +510,46 @@ int tango_cache_delete_object(struct tango_cache_instance *instance, struct futu return (cache_delete_minio_object(ctx)==1)?0:-1; } +struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_instance *instance, struct future* future, char *objlist[], u_int32_t num) +{ + struct tango_cache_ctx *ctx; + char md5[48]={0}, content_md5[48]; + + ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); + ctx->instance = instance; + ctx->future = future; + ctx->method = CACHE_REQUEST_DELETE_MUL; + ctx->del.succ_num = num; + + if(wired_load_balancer_lookup(instance->wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48)) + { + instance->error_code = CACHE_ERR_WIREDLB; + instance->statistic.totaldrop_num += num; + free(ctx); + return NULL; + } + + construct_multiple_delete_xml(ctx->instance->bucketname, objlist, num, instance->hash_object_key, &ctx->response.buff, &ctx->response.size); + caculate_base64_md5(ctx->response.buff, ctx->response.size, (unsigned char *)md5, 48); + sprintf(content_md5, "Content-MD5: %s", md5); + ctx->headers = curl_slist_append(ctx->headers, content_md5); + ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml"); + return ctx; +} + +//TODO: AccessDenied +int tango_cache_multi_delete(struct tango_cache_instance *instance, struct future* future, char *objlist[], u_int32_t num) +{ + struct tango_cache_ctx *ctx; + + ctx = tango_cache_multi_delete_prepare(instance, future, objlist, num); + if(ctx == NULL) + { + return -1; + } + return tango_cache_multi_delete_start(ctx); +} + static void check_multi_info(CURLM *multi) { CURLMsg *msg; @@ -520,6 +586,9 @@ static void check_multi_info(CURLM *multi) case CACHE_REQUEST_DELETE: tango_cache_curl_del_done(ctx, res, res_code); break; + case CACHE_REQUEST_DELETE_MUL: + tango_cache_curl_muldel_done(ctx, res, res_code); + break; default: break; } } diff --git a/cache/tango_cache_client_in.h b/cache/tango_cache_client_in.h index 3c2ab00..9e855bf 100644 --- a/cache/tango_cache_client_in.h +++ b/cache/tango_cache_client_in.h @@ -19,6 +19,7 @@ enum CACHE_REQUEST_METHOD CACHE_REQUEST_GET=0, CACHE_REQUEST_PUT, CACHE_REQUEST_DELETE, + CACHE_REQUEST_DELETE_MUL, }; enum GET_OBJECT_STATE @@ -98,6 +99,12 @@ struct cache_ctx_data_put bool close_state; //主动被调用关闭 }; +struct cache_ctx_multi_delete +{ + u_int32_t succ_num; + u_int32_t fail_num; +}; + struct tango_cache_ctx { CURL *curl; @@ -117,6 +124,7 @@ struct tango_cache_ctx union{ struct cache_ctx_data_put put; struct cache_ctx_data_get get; + struct cache_ctx_multi_delete del; }; struct tango_cache_instance *instance; }; @@ -126,6 +134,8 @@ struct curl_socket_data struct event sock_event; }; +void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size); + void easy_string_savedata(struct easy_string *estr, const char *data, size_t len); void easy_string_destroy(struct easy_string *estr); diff --git a/cache/tango_cache_transfer.cpp b/cache/tango_cache_transfer.cpp index 8945abb..55b0cb6 100644 --- a/cache/tango_cache_transfer.cpp +++ b/cache/tango_cache_transfer.cpp @@ -141,7 +141,7 @@ static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool ful return 1; } -static size_t curl_write_uploadID_cb(void *ptr, size_t size, size_t count, void *userp) +static size_t curl_response_body_save_cb(void *ptr, size_t size, size_t count, void *userp) { struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; struct easy_string *estr = &ctx->response; @@ -158,7 +158,6 @@ static size_t curl_write_uploadID_cb(void *ptr, size_t size, size_t count, void if(code != CURLE_OK || ctx->res_code!=200L) { tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - if(code != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error); return size*count; } } @@ -180,11 +179,12 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx) snprintf(minio_url, 256, "http://%s/%s/%s?uploads", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key); curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L); + curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, 0); //默认使用回调函数调用fread,测试发现关闭Expect时会导致卡在curl_multi_socket_action curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache"); curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL,1L); - curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_write_uploadID_cb); + curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb); curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error); curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); @@ -280,8 +280,8 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx) curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L); - curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->put.combine_xml); curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, len); //填充Content-Length + curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->put.combine_xml); if(ctx->headers != NULL) { @@ -419,7 +419,6 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r { easy_string_destroy(&ctx->response); tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error); if(ctx->put.close_state) { tango_cache_ctx_destroy(ctx); @@ -448,7 +447,6 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r if(res != CURLE_OK || res_code!=200L) { tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error); } if(ctx->fail_state) { @@ -486,7 +484,6 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r if(res != CURLE_OK || res_code!=200L) { tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); - if(res != CURLE_OK) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "%s", ctx->error); } tango_cache_ctx_destroy(ctx); break; @@ -579,11 +576,77 @@ void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long r { if(res!=CURLE_OK || (res_code!=204L && res_code!=200L )) { - ctx->fail_state = true; + tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); } tango_cache_ctx_destroy(ctx); } +void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) +{ + u_int32_t errnum=0; + + if(res!=CURLE_OK || (res_code!=204L && res_code!=200L )) + { + tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); + ctx->del.fail_num = ctx->del.succ_num; + ctx->del.succ_num = 0; + } + else + { + if(!parse_multidelete_xml(ctx->response.buff, ctx->response.len, &errnum, ctx->error, CURL_ERROR_SIZE)) + { + ctx->del.fail_num = ctx->del.succ_num; + ctx->del.succ_num = 0; + } + else + { + ctx->del.fail_num = errnum; + ctx->del.succ_num -= errnum; + } + if(ctx->del.fail_num > 0) + { + tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); + } + } + tango_cache_ctx_destroy(ctx); +} + +int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx) +{ + CURLMcode rc; + char minio_url[256]; + + ctx->instance->statistic.del_recv_num += ctx->del.succ_num; + ctx->instance->error_code = CACHE_OK; + if(NULL == (ctx->curl=curl_easy_init())) + { + tango_cache_ctx_destroy(ctx); + return -1; + } + + snprintf(minio_url, 256, "http://%s/%s/?delete", ctx->hostaddr, ctx->instance->bucketname); + curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L); + curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, ctx->response.size); //填充Content-Length,在CURLOPT_COPYPOSTFIELDS之前设置 + curl_easy_setopt(ctx->curl, CURLOPT_COPYPOSTFIELDS, ctx->response.buff); + curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); + curl_easy_setopt(ctx->curl, CURLOPT_USERAGENT, "aws-sdk-cpp/1.5.24 Linux/3.10.0-327.el7.x86_64 x86_64 pangu_cache"); + curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); + curl_easy_setopt(ctx->curl, CURLOPT_NOSIGNAL, 1L); + curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb); + curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); + curl_easy_setopt(ctx->curl, CURLOPT_ERRORBUFFER, ctx->error); + curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); + curl_easy_setopt(ctx->curl, CURLOPT_FOLLOWLOCATION, 1L); + curl_easy_setopt(ctx->curl, CURLOPT_CONNECTTIMEOUT_MS, 500L); + curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_TIME, 2L); + curl_easy_setopt(ctx->curl, CURLOPT_LOW_SPEED_LIMIT, 100L); + + rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); + assert(rc==CURLM_OK); + easy_string_destroy(&ctx->response); + return 0; +} + void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code) { switch(ctx->get.state) diff --git a/cache/tango_cache_transfer.h b/cache/tango_cache_transfer.h index 1f1d5b5..f976fd1 100644 --- a/cache/tango_cache_transfer.h +++ b/cache/tango_cache_transfer.h @@ -9,8 +9,11 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code); void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code); void tango_cache_curl_del_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code); +void tango_cache_curl_muldel_done(struct tango_cache_ctx *ctx, CURLcode res, long res_code); int cache_delete_minio_object(struct tango_cache_ctx *ctx); +int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx); + int cache_kick_upload_minio_end(struct tango_cache_ctx *ctx); bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block_len); diff --git a/cache/tango_cache_xml.cpp b/cache/tango_cache_xml.cpp index d7d7a80..2b26b4e 100644 --- a/cache/tango_cache_xml.cpp +++ b/cache/tango_cache_xml.cpp @@ -46,7 +46,7 @@ bool parse_uploadID_xml(const char *content, int len, char **uploadID) return false; } -int construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len) +void construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len) { struct multipart_etag_list *etag; xmlDoc *pdoc; @@ -68,6 +68,104 @@ int construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len) xmlDocDumpFormatMemory(pdoc, (xmlChar **)xml, len, 1); xmlFreeDoc(pdoc); - return 0; +} + +static void fill_multidelete_xml_errcode(xmlNode *error, char *out, int size) +{ + xmlChar *errcode; + xmlNode *child = error->children; + + while(child != NULL) + { + if(child->type != XML_ELEMENT_NODE || xmlStrcmp(child->name, (const xmlChar *)"Message")) + { + child = child->next; + continue; + } + errcode = xmlNodeGetContent(child); + snprintf(out, size, "%s", (char *)errcode); + xmlFree(errcode); + break; + } +} + +bool parse_multidelete_xml(const char *xml, int len, u_int32_t *errnum, char *errstr, int size) +{ + xmlDoc *pdoc; + xmlNode *pcur; + int errornum=0; + + if((pdoc = xmlParseMemory(xml, len)) == NULL) + { + return false; + } + if((pcur = xmlDocGetRootElement(pdoc)) == NULL) + { + xmlFreeDoc(pdoc); + return false; + } + + while(pcur->type != XML_ELEMENT_NODE) + pcur = pcur->next; + if(xmlStrcmp(pcur->name, (const xmlChar *)"DeleteResult")) + { + xmlFreeDoc(pdoc); + return false; + } + pcur = pcur->children; + while(pcur != NULL) + { + if(pcur->type != XML_ELEMENT_NODE || xmlStrcmp(pcur->name, (const xmlChar *)"Error")) + { + pcur = pcur->next; + continue; + } + if(errornum == 0) + { + fill_multidelete_xml_errcode(pcur, errstr, size); + } + errornum++; + pcur = pcur->next; + } + *errnum = errornum; + + xmlFreeDoc(pdoc); + return true; +} + +void construct_multiple_delete_xml(const char *bucket, char *key[], u_int32_t num, int is_hash, char **xml, size_t *len) +{ + xmlDoc *pdoc; + xmlNode *root, *child; + int xmllen; + + pdoc = xmlNewDoc((const xmlChar *)"1.0"); + root = xmlNewNode(NULL, (const xmlChar *)"Delete"); + xmlDocSetRootElement(pdoc, root); + + xmlNewChild(root, NULL, (const xmlChar*)"Quiet", (const xmlChar*)"true"); + + if(is_hash) + { + char hashkey[72], sha256[72]; + for(u_int32_t i=0; ifilename); + future_destroy(pdata->future); + free(pdata); +} +void del_future_failed(enum e_future_error err, const char * what, void * user) +{ + struct future_pdata *pdata = (struct future_pdata *)user; + + printf("DEL %s fail: %s\n", pdata->filename, what); + future_destroy(pdata->future); + free(pdata); +} + char * get_file_content(const char *filename, size_t *filelen_out) { char *buffer; @@ -136,6 +153,8 @@ char * get_file_content(const char *filename, size_t *filelen_out) return buffer; } +int tango_cache_multi_delete(struct tango_cache_instance *instance, struct future* future, char *objlist[], u_int32_t num); + static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) { char s[1024]; @@ -144,6 +163,9 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) FILE *input = (FILE *)arg; static int index=0; char filename[128], method[16], buffer[1024], *p; + char *dellist[16]; + char *pstart, *save_ptr=NULL; + int delnum=0; struct tango_cache_meta meta; struct future_pdata *pdata; @@ -207,6 +229,25 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) fclose(fp); tango_cache_upload_once_evbuf(tango_instance, pdata->future, EVBUFFER_MOVE, evbuf, &meta, pdata->filename, 256); } + else if(!strcasecmp(p, "DEL")) + { + pdata->future = future_create(del_future_success, del_future_failed, pdata); + promise_set_ctx(future_to_promise(pdata->future), NULL, NULL); + sprintf(pdata->filename, "%s", s); + tango_cache_delete_object(tango_instance, pdata->future, s); + } + else if(!strcasecmp(p, "DELMUL")) //TODO + { + pdata->future = future_create(del_future_success, del_future_failed, pdata); + promise_set_ctx(future_to_promise(pdata->future), NULL, NULL); + sprintf(pdata->filename, "%s", s); + + for(pstart = strtok_r(s, ";", &save_ptr); pstart != NULL; pstart = strtok_r(NULL, ";", &save_ptr)) + { + dellist[delnum++] = pstart; + } + tango_cache_multi_delete(tango_instance, pdata->future, dellist, delnum); + } else { pdata->future = future_create(put_future_success, put_future_failed, pdata);