#include #include #include #include #include #include #include #include #include #include #include #include #include "tango_cache_client_in.h" #include "tango_cache_transfer.h" #include "tango_cache_tools.h" int TANGO_CACHE_VERSION_20180925=0; void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size) { SHA256_CTX c; unsigned char sha256[128]; SHA256_Init(&c); SHA256_Update(&c, data, len); SHA256_Final(sha256, &c); for(u_int32_t i=0; i<32 && ierror_code; } enum CACHE_ERR_CODE tango_cache_ctx_error(const struct tango_cache_instance *instance) { return instance->error_code; } void tango_cache_get_statistics(const struct tango_cache_instance *instance, struct cache_statistics *out) { out->get_recv_num = instance->statistic.get_recv_num; out->get_succ_num = instance->statistic.get_succ_num; out->get_error_num= instance->statistic.get_error_num; out->get_miss_num = instance->statistic.get_miss_num; out->put_recv_num = instance->statistic.put_recv_num; out->put_succ_num = instance->statistic.put_succ_num; out->put_error_num= instance->statistic.put_error_num; out->del_recv_num = instance->statistic.del_recv_num; out->del_succ_num = instance->statistic.del_succ_num; out->del_error_num= instance->statistic.del_error_num; out->session_num = instance->statistic.session_num; out->memory_used = instance->statistic.memory_used; } struct tango_cache_result *tango_cache_read_result(void *promise_result) { return (struct tango_cache_result *)promise_result; } const char *tango_cache_get_object_key(struct tango_cache_ctx *ctx) { return ctx->object_key; } static inline void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state, enum CACHE_ERR_CODE error_code, struct cache_statistics *statistic) { switch(method) { case CACHE_REQUEST_PUT: if(fail_state) { statistic->put_error_num += 1; } else { statistic->put_succ_num += 1; } break; case CACHE_REQUEST_GET: if(fail_state) { if(error_code == CACHE_ERR_CURL) statistic->get_error_num += 1; else statistic->get_miss_num += 1; } else { statistic->get_succ_num += 1; } break; case CACHE_REQUEST_DELETE: if(fail_state) { statistic->del_error_num += 1; } else { statistic->del_succ_num += 1; } break; default:break; } } void easy_string_destroy(struct easy_string *estr) { if(estr->buff != NULL) { free(estr->buff); estr->buff = NULL; estr->len = estr->size = 0; } } void easy_string_savedata(struct easy_string *estr, const char *data, size_t len) { if(estr->size-estr->len < len+1) { estr->size += len*4+1; estr->buff = (char*)realloc(estr->buff, estr->size); } memcpy(estr->buff+estr->len, data, len); estr->len += len; estr->buff[estr->len]='\0'; } void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx) { struct multipart_etag_list *etag; if(ctx->curl != NULL) { curl_multi_remove_handle(ctx->instance->multi_hd, ctx->curl); curl_easy_cleanup(ctx->curl); } easy_string_destroy(&ctx->response); switch(ctx->method) { case CACHE_REQUEST_GET: easy_string_destroy(&ctx->get.response_tag); break; case CACHE_REQUEST_PUT: if(ctx->put.uploadID != NULL) free(ctx->put.uploadID); if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml); if(ctx->headers != NULL) curl_slist_free_all(ctx->headers); if(ctx->put.evbuf!=NULL) evbuffer_free(ctx->put.evbuf); TAILQ_FOREACH(etag, &ctx->put.etag_head, node) { TAILQ_REMOVE(&ctx->put.etag_head, etag, node); free(etag->etag); free(etag); }//no break here case CACHE_REQUEST_DELETE: if(ctx->future != NULL) { if(ctx->fail_state) { promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, ctx->error); } else { promise_success(future_to_promise(ctx->future), NULL); } } break; default: break; } update_statistics(ctx->method, ctx->fail_state, ctx->error_code, &ctx->instance->statistic); free(ctx); } void tango_cache_update_end(struct tango_cache_ctx *ctx) { cache_kick_upload_minio_end(ctx); } int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, size_t size) { if(ctx->fail_state) { return -1; } if(evbuffer_add(ctx->put.evbuf, data, size)) { return -1; } ctx->instance->statistic.memory_used += size; if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->upload_block_size) { cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size); } return 0; } int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf) { size_t size; if(ctx->fail_state) { return -1; } size = evbuffer_get_length(evbuf); if(way == EVBUFFER_MOVE) { if(evbuffer_add_buffer(ctx->put.evbuf, evbuf)) { return -1; } } else { if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf)) { return -1; } } ctx->instance->statistic.memory_used += size; if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->upload_block_size) { cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size); } return 0; } struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta) { struct tango_cache_ctx *ctx; char buffer[256]={0}; time_t expires, now, last_modify; if((u_int64_t)instance->statistic.memory_used >= instance->cache_limit_size) { instance->error_code = CACHE_OUTOF_MEMORY; return NULL; } instance->statistic.put_recv_num += 1; instance->error_code = CACHE_OK; ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); ctx->instance = instance; ctx->future = future; ctx->method = CACHE_REQUEST_PUT; if(instance->hash_object_key) { caculate_sha256(meta->url, strlen(meta->url), buffer, 72); snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", buffer[0], buffer[1], buffer[2], buffer[3], buffer+4); } else { snprintf(ctx->object_key, 256, "%s", meta->url); } //Expires字段,用于缓存内部判定缓存是否超时 now = time(NULL); expires = (meta->put.timeout==0||meta->put.timeout>instance->relative_ttl)?instance->relative_ttl:meta->put.timeout; if(expires_timestamp2hdr_str(now + expires, buffer, 256)) { ctx->headers = curl_slist_append(ctx->headers, buffer); } //Last-Modify字段,用于GET时判定是否新鲜 last_modify = (meta->put.date > meta->put.last_modified)?meta->put.date:meta->put.last_modified; if(last_modify == 0) { last_modify = get_gmtime_timestamp(now); } sprintf(buffer, "x-amz-meta-lm: %lu", last_modify); ctx->headers = curl_slist_append(ctx->headers, buffer); //列表中支持的标准头部 for(int i=0; istd_hdr[i] != NULL) { ctx->headers = curl_slist_append(ctx->headers, meta->std_hdr[i]); } } if(meta->std_hdr[HDR_CONTENT_TYPE] == NULL) { ctx->headers = curl_slist_append(ctx->headers, "Content-Type:"); } //其他定义的头部,GET时会原样返回 if(meta->usertag_len>0 && meta->usertag_len<=USER_TAG_MAX_LEN) { char *p = (char *)malloc((meta->usertag_len/3 + 1)*4 + 18); //计算编码后所需空间;18=17+1: 头部+字符串结束符 memcpy(p, "x-amz-meta-user: ", 17); Base64_EncodeBlock((const unsigned char*)meta->usertag, meta->usertag_len, (unsigned char*)p+17); ctx->headers = curl_slist_append(ctx->headers, p); free(p); } return ctx; } struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta) { struct tango_cache_ctx *ctx; ctx = tango_cache_update_prepare(instance, future, meta); if(ctx == NULL) { return NULL; } ctx->put.evbuf = evbuffer_new(); TAILQ_INIT(&ctx->put.etag_head); return ctx; } int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct future* future, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta *meta, char *objectkey, size_t keysize) { struct tango_cache_ctx *ctx; ctx = tango_cache_update_prepare(instance, future, meta); if(ctx == NULL) { return -1; } if(objectkey != NULL) { snprintf(objectkey, keysize, "%s", ctx->object_key); } return tango_cache_upload_once_start_data(ctx, way, data, size); } int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct future* future, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, struct tango_cache_meta *meta, char *objectkey, size_t keysize) { struct tango_cache_ctx *ctx; ctx = tango_cache_update_prepare(instance, future, meta); if(ctx == NULL) { return -1; } if(objectkey != NULL) { snprintf(objectkey, keysize, "%s", ctx->object_key); } return tango_cache_upload_once_start_evbuf(ctx, way, evbuf); } struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta) { struct tango_cache_ctx *ctx; char sha256[72]={0}; ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); ctx->instance = instance; ctx->future = future; ctx->method = CACHE_REQUEST_GET; ctx->get.state = GET_STATE_START; ctx->get.max_age = meta->get.max_age; ctx->get.min_fresh = meta->get.min_fresh; instance->statistic.get_recv_num += 1; if(instance->hash_object_key) { caculate_sha256(meta->url, strlen(meta->url), sha256, 72); snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", sha256[0], sha256[1], sha256[2], sha256[3], sha256+4); } else { snprintf(ctx->object_key, 256, "%s", meta->url); } return ctx; } int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta) { return tango_cache_fetch_start(tango_cache_fetch_prepare(instance, future, meta)); } struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, struct future* future, const char *objkey) { struct tango_cache_ctx *ctx; char sha256[72]={0}; ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx)); ctx->instance = instance; ctx->future = future; ctx->method = CACHE_REQUEST_DELETE; instance->statistic.del_recv_num += 1; if(instance->hash_object_key) { caculate_sha256(objkey, strlen(objkey), sha256, 72); snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", sha256[0], sha256[1], sha256[2], sha256[3], sha256+4); } else { snprintf(ctx->object_key, 256, "%s", objkey); } return ctx; } int tango_cache_delete_object(struct tango_cache_instance *instance, struct future* future, const char *objkey) { return (cache_delete_minio_object(tango_cache_delete_prepare(instance, future, objkey))==1)?0:-1; } static void check_multi_info(CURLM *multi) { CURLMsg *msg; int msgs_left; struct tango_cache_ctx *ctx; CURL *easy; CURLcode res; long res_code; while((msg = curl_multi_info_read(multi, &msgs_left))) { if(msg->msg != CURLMSG_DONE) { continue; } easy = msg->easy_handle; res = msg->data.result; curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ctx); curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &res_code); curl_multi_remove_handle(multi, easy); curl_easy_cleanup(easy); ctx->curl = NULL; ctx->res_code = 0; switch(ctx->method) { case CACHE_REQUEST_GET: tango_cache_curl_get_done(ctx, res, res_code); break; case CACHE_REQUEST_PUT: tango_cache_curl_put_done(ctx, res, res_code); break; case CACHE_REQUEST_DELETE: tango_cache_curl_del_done(ctx, res, res_code); break; default: break; } } } /* Called by libevent when we get action on a multi socket */ static void libevent_socket_event_cb(int fd, short action, void *userp) { struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; //from event_assign CURLMcode rc; int what, still_running; what = ((action&EV_READ)?CURL_CSELECT_IN:0) | ((action & EV_WRITE)?CURL_CSELECT_OUT:0); rc = curl_multi_socket_action(instance->multi_hd, fd, what, &still_running); instance->statistic.session_num = still_running; assert(rc==CURLM_OK); check_multi_info(instance->multi_hd); if(still_running<=0 && evtimer_pending(&instance->timer_event, NULL)) { evtimer_del(&instance->timer_event); } } /* Called by libevent when our timeout expires */ static void libevent_timer_event_cb(int fd, short kind, void *userp) { struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; CURLMcode rc; int still_running; rc = curl_multi_socket_action(instance->multi_hd, CURL_SOCKET_TIMEOUT, 0, &still_running); instance->statistic.session_num = still_running; assert(rc==CURLM_OK); check_multi_info(instance->multi_hd); } static int curl_socket_function_cb(CURL *curl, curl_socket_t sockfd, int what, void *userp, void *sockp) { struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; //from multi handle struct curl_socket_data *sockinfo = (struct curl_socket_data *)sockp; //curl_multi_assign, for socket int action; if(what == CURL_POLL_REMOVE) { if(sockinfo != NULL) { event_del(&sockinfo->sock_event); free(sockinfo); } } else { if(sockinfo == NULL) { sockinfo = (struct curl_socket_data *)calloc(1, sizeof(struct curl_socket_data)); curl_multi_assign(instance->multi_hd, sockfd, sockinfo); } else { event_del(&sockinfo->sock_event); } action = (what&CURL_POLL_IN?EV_READ:0)|(what&CURL_POLL_OUT?EV_WRITE:0)|EV_PERSIST; event_assign(&sockinfo->sock_event, instance->evbase, sockfd, action, libevent_socket_event_cb, instance); event_add(&sockinfo->sock_event, NULL); } return 0; } static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp) { struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; struct timeval timeout; CURLMcode rc; int still_running; timeout.tv_sec = timeout_ms/1000; timeout.tv_usec = (timeout_ms%1000)*1000; if(timeout_ms == 0) { //timeout_ms is 0 means we should call curl_multi_socket_action/curl_multi_perform at once. //To initiate the whole process(inform CURLMOPT_SOCKETFUNCTION callback) or when timeout occurs. rc = curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &still_running); instance->statistic.session_num = still_running; assert(rc==CURLM_OK); } else if(timeout_ms == -1) //timeout_ms is -1 means we should delete the timer. { evtimer_del(&instance->timer_event); } else //update the timer to the new value. { evtimer_add(&instance->timer_event, &timeout); } return 0; //0-success; -1-error } static int load_local_configure(struct tango_cache_instance *instance, const char* profile_path, const char* section) { u_int32_t intval; u_int64_t longval; MESA_load_profile_uint_def(profile_path, section, "MAX_CONNECTION_PER_HOST", &intval, 0); instance->max_cnn_host = intval; MESA_load_profile_uint_def(profile_path, section, "MAX_USED_MEMORY_SIZE_MB", &intval, 5120); longval = intval; instance->cache_limit_size = longval * 1024 * 1024; MESA_load_profile_string_def(profile_path, section, "CACHE_BUCKET_NAME", instance->bucketname, 256, "openbucket"); MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", &instance->hash_object_key, 1); if(MESA_load_profile_string_nodef(profile_path, section, "MINIO_BROKERS_LIST", instance->minio_hostlist, 64) < 0) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.\n", profile_path, section); return -1; } MESA_load_profile_uint_def(profile_path, section, "CACHE_UPLOAD_BLOCK_SIZE", &instance->upload_block_size, 5242880); if(instance->upload_block_size < 5242880) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section); return -1; } MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 999999999); if(intval < 60) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section); return -1; } instance->relative_ttl = intval; return 0; } struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog) { struct tango_cache_instance *instance; instance = (struct tango_cache_instance *)malloc(sizeof(struct tango_cache_instance)); memset(instance, 0, sizeof(struct tango_cache_instance)); if(load_local_configure(instance, profile_path, section)) { free(instance); return NULL; } instance->evbase = evbase; instance->multi_hd = curl_multi_init(); instance->runtime_log = runtimelog; curl_multi_setopt(instance->multi_hd, CURLMOPT_PIPELINING, CURLPIPE_HTTP1); curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, instance->max_cnn_host); curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETFUNCTION, curl_socket_function_cb); curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETDATA, instance); //curl_socket_function_cb *userp curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERFUNCTION, curl_timer_function_cb); curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERDATA, instance); evtimer_assign(&instance->timer_event, evbase, libevent_timer_event_cb, instance); return instance; }