diff --git a/cache/include/tango_cache_client.h b/cache/include/tango_cache_client.h index 226d4c1..1ed9695 100644 --- a/cache/include/tango_cache_client.h +++ b/cache/include/tango_cache_client.h @@ -81,7 +81,7 @@ enum CACHE_RESULT_TYPE enum OBJECT_LOCATION { OBJECT_IN_UNKNOWN=0, - OBJECT_IN_MINIO, + OBJECT_IN_HOS, OBJECT_IN_REDIS }; diff --git a/cache/src/cache_evbase_client.cpp b/cache/src/cache_evbase_client.cpp index 327b95d..e7835ef 100644 --- a/cache/src/cache_evbase_client.cpp +++ b/cache/src/cache_evbase_client.cpp @@ -276,7 +276,7 @@ int cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn, char *path/*OUT*/ //ENDʱ£¬Èôδ¿ªÊ¼·Ö¶ÎÉÏ´«£¬´¥·¢ÉÏ´«Ö®Ç°±ØÐëlocateÒ»ÏÂλÖà ctx_asyn->ctx->locate = tango_cache_object_locate(ctx_asyn->ctx->instance, ctx_asyn->object_size); tango_cache_get_object_path(ctx_asyn->ctx, path, pathsize); - if(ctx_asyn->ctx->instance->param->object_store_way != CACHE_ALL_MINIO) + if(ctx_asyn->ctx->instance->param->object_store_way != CACHE_ALL_HOS) { cJSON_AddNumberToObject(ctx_asyn->ctx->put.object_meta, "Content-Length", ctx_asyn->object_size); } @@ -381,7 +381,7 @@ struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance if(instance->instance->param->object_store_way != CACHE_SMALL_REDIS) { - maybe_loc = OBJECT_IN_MINIO; + maybe_loc = OBJECT_IN_HOS; } ctx = tango_cache_update_prepare(instance->instance, f, meta, maybe_loc); if(ctx == NULL) @@ -495,7 +495,7 @@ int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct fut if(instance->instance->param->object_store_way != CACHE_SMALL_REDIS) { - where_to_get = OBJECT_IN_MINIO; + where_to_get = OBJECT_IN_HOS; } ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); ctx_asyn->instance_asyn = instance; @@ -527,9 +527,9 @@ int cache_evbase_head_object(struct cache_evbase_instance *instance, struct futu { struct cache_evbase_ctx *ctx_asyn; struct databuffer *buffer; - enum OBJECT_LOCATION location = OBJECT_IN_MINIO; + enum OBJECT_LOCATION location = OBJECT_IN_HOS; - if(instance->instance->param->object_store_way != CACHE_ALL_MINIO) + if(instance->instance->param->object_store_way != CACHE_ALL_HOS) { location = OBJECT_IN_REDIS; } diff --git a/cache/src/tango_cache_client.cpp b/cache/src/tango_cache_client.cpp index 70e8dae..ba64ef2 100644 --- a/cache/src/tango_cache_client.cpp +++ b/cache/src/tango_cache_client.cpp @@ -119,14 +119,14 @@ static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statisti case CACHE_REQUEST_PUT: if(ctx->fail_state) { - if(ctx->locate == OBJECT_IN_MINIO) + if(ctx->locate == OBJECT_IN_HOS) statistic->put_err_http += 1; else statistic->put_err_redis += 1; } else { - if(ctx->locate == OBJECT_IN_MINIO) + if(ctx->locate == OBJECT_IN_HOS) statistic->put_succ_http += 1; else statistic->put_succ_redis += 1; @@ -138,14 +138,14 @@ static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statisti { if(ctx->error_code == CACHE_CACHE_MISS || ctx->error_code == CACHE_TIMEOUT) statistic->get_miss_num += 1; - else if(ctx->locate == OBJECT_IN_MINIO) + else if(ctx->locate == OBJECT_IN_HOS) statistic->get_err_http += 1; else statistic->get_err_redis += 1; } else { - if(ctx->locate == OBJECT_IN_MINIO) + if(ctx->locate == OBJECT_IN_HOS) statistic->get_succ_http += 1; else statistic->get_succ_redis += 1; @@ -258,7 +258,7 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback) //ÅжÏsessionÊÇ·ñ³¬¹ýÏÞÖÆ£¬ÐèÌáÈ¡³õʼ»¯ÅжÏwhere_to_getÊÇ·ñÈ«²¿ÔÚMINIOÖÐ bool sessions_exceeds_limit(struct tango_cache_instance *instance, enum OBJECT_LOCATION where_to_get) { - if(where_to_get == OBJECT_IN_MINIO) + if(where_to_get == OBJECT_IN_HOS) { return (instance->statistic.session_http>=instance->param->maximum_sessions); } @@ -278,7 +278,7 @@ enum OBJECT_LOCATION tango_cache_object_locate(struct tango_cache_instance *inst } if(instance->param->object_store_way!=CACHE_SMALL_REDIS || object_size > instance->param->redis_object_maxsize) { - return OBJECT_IN_MINIO; + return OBJECT_IN_HOS; } else { @@ -290,7 +290,7 @@ void tango_cache_get_object_path(struct tango_cache_ctx *ctx, char *path/*OUT*/, { if(path != NULL) { - if(ctx->locate == OBJECT_IN_MINIO) + if(ctx->locate == OBJECT_IN_HOS) { snprintf(path, pathsize, "http://%s/%s", ctx->hostaddr, ctx->object_key); } @@ -307,8 +307,8 @@ int tango_cache_update_end(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_ { ctx->locate = tango_cache_object_locate(ctx->instance, ctx->put.object_size); tango_cache_get_object_path(ctx, path, pathsize); - - if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO) + + if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) { cJSON_AddNumberToObject(ctx->put.object_meta, "Content-Length", ctx->put.object_size); } @@ -414,7 +414,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * if(instance->param->hash_object_key) { caculate_sha256(meta->url, strlen(meta->url), buffer, 72); - snprintf(ctx->object_key, 256, "%s/%c%c/%c%c/%s", instance->param->bucketname, buffer[0], buffer[1], buffer[2], buffer[3], buffer+4); + snprintf(ctx->object_key, 256, "%s/%c%c:%c%c:%s", instance->param->bucketname, buffer[0], buffer[1], buffer[2], buffer[3], buffer+4); //±£´æÔ­Ê¼URL snprintf(buffer, 2064, "x-amz-meta-url: %s", meta->url); ctx->headers = curl_slist_append(ctx->headers, buffer); @@ -423,7 +423,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * { snprintf(ctx->object_key, 256, "%s/%s", instance->param->bucketname, meta->url); } - if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) + if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) { instance->error_code = CACHE_ERR_WIREDLB; instance->statistic.totaldrop_num += 1; @@ -454,7 +454,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * if(meta->std_hdr[i] != NULL) { ctx->headers = curl_slist_append(ctx->headers, meta->std_hdr[i]); - if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO) + if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) { easy_string_savedata(&hdr_estr, meta->std_hdr[i], strlen(meta->std_hdr[i])); easy_string_savedata(&hdr_estr, "\r\n", 2); @@ -464,7 +464,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * if(meta->std_hdr[HDR_CONTENT_TYPE] == NULL) { ctx->headers = curl_slist_append(ctx->headers, "Content-Type:"); - if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO) + if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) { easy_string_savedata(&hdr_estr, "Content-Type: application/octet-stream\r\n", strlen("Content-Type: application/octet-stream\r\n")); } @@ -480,7 +480,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * ctx->headers = curl_slist_append(ctx->headers, user_tag); } - if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO) + if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) { ctx->put.object_meta = cJSON_CreateObject(); if(instance->param->hash_object_key) @@ -513,7 +513,7 @@ struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *in if(instance->param->object_store_way != CACHE_SMALL_REDIS) { - maybe_loc = OBJECT_IN_MINIO; + maybe_loc = OBJECT_IN_HOS; } ctx = tango_cache_update_prepare(instance, f, meta, maybe_loc); @@ -540,8 +540,8 @@ struct tango_cache_ctx *tango_cache_update_once_prepare(struct tango_cache_insta return NULL; } tango_cache_get_object_path(ctx, path, pathsize); - - if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO) + + if(ctx->instance->param->object_store_way != CACHE_ALL_HOS) { cJSON_AddNumberToObject(ctx->put.object_meta, "Content-Length", object_size); } @@ -600,13 +600,13 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i if(instance->param->hash_object_key) { caculate_sha256(meta->url, strlen(meta->url), sha256, 72); - snprintf(ctx->object_key, 256, "%s/%c%c/%c%c/%s", instance->param->bucketname, sha256[0], sha256[1], sha256[2], sha256[3], sha256+4); + snprintf(ctx->object_key, 256, "%s/%c%c:%c%c:%s", instance->param->bucketname, sha256[0], sha256[1], sha256[2], sha256[3], sha256+4); } else { snprintf(ctx->object_key, 256, "%s/%s", instance->param->bucketname, meta->url); } - if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) + if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) { instance->error_code = CACHE_ERR_WIREDLB; instance->statistic.totaldrop_num += 1; @@ -622,7 +622,7 @@ int tango_cache_fetch_object(struct tango_cache_instance *instance, struct futur if(instance->param->object_store_way != CACHE_SMALL_REDIS) { - where_to_get = OBJECT_IN_MINIO; + where_to_get = OBJECT_IN_HOS; } ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_GET, f, meta, where_to_get); @@ -639,7 +639,7 @@ int tango_cache_head_object(struct tango_cache_instance *instance, struct future enum OBJECT_LOCATION location; //Èç¹û¿ªÆôÁËRedis£¬ÔòÔªÐÅÏ¢´æ´¢ÔÚRedisÖÐ - location = (instance->param->object_store_way != CACHE_ALL_MINIO)?OBJECT_IN_REDIS:OBJECT_IN_MINIO; + location = (instance->param->object_store_way != CACHE_ALL_HOS)?OBJECT_IN_REDIS:OBJECT_IN_HOS; ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_HEAD, f, meta, location); if(ctx == NULL) { @@ -654,7 +654,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance * char sha256[72]; const char *pbucket; - if(sessions_exceeds_limit(instance, OBJECT_IN_MINIO)) + if(sessions_exceeds_limit(instance, OBJECT_IN_HOS)) { instance->error_code = CACHE_OUTOF_SESSION; instance->statistic.totaldrop_num += 1; @@ -670,7 +670,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance * if(instance->param->hash_object_key) { caculate_sha256(objkey, strlen(objkey), sha256, 72); - snprintf(ctx->object_key, 256, "%s/%c%c/%c%c/%s", pbucket, sha256[0], sha256[1], sha256[2], sha256[3], sha256+4); + snprintf(ctx->object_key, 256, "%s/%c%c:%c%c:%s", pbucket, sha256[0], sha256[1], sha256[2], sha256[3], sha256+4); } else { @@ -680,7 +680,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance * { snprintf(ctx->hostaddr, 48, "%s", minio_addr); } - else if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) + else if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) { instance->error_code = CACHE_ERR_WIREDLB; instance->statistic.totaldrop_num += 1; @@ -707,7 +707,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst struct tango_cache_ctx *ctx; char md5[48]={0}, content_md5[48]; - if(sessions_exceeds_limit(instance, OBJECT_IN_MINIO)) + if(sessions_exceeds_limit(instance, OBJECT_IN_HOS)) { instance->error_code = CACHE_OUTOF_SESSION; instance->statistic.totaldrop_num += 1; @@ -719,8 +719,8 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst ctx->promise = future_to_promise(f); ctx->method = CACHE_REQUEST_DELETE_MUL; ctx->del.succ_num = num; - - if(wired_load_balancer_lookup(instance->param->minio.wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48)) + + if(wired_load_balancer_lookup(instance->param->cache.wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48)) { instance->error_code = CACHE_ERR_WIREDLB; instance->statistic.totaldrop_num += num; @@ -1105,6 +1105,7 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path longval = intval; param->maximum_used_mem = longval * 1024 * 1024; MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", ¶m->hash_object_key, 1); + MESA_load_profile_string_def(profile_path, section, "CACHE_TOKEN", param->cache_token, 256, "c21f969b5f03d33d43e04f8f136e7682"); if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", param->bucketname, 256) < 0) { MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BUCKET_NAME not found.\n", profile_path, section); @@ -1125,30 +1126,30 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path param->relative_ttl = intval; //wiredlb - MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->minio.wiredlb_topic, 64, "TANGO_CACHE_PRODUCER"); - MESA_load_profile_string_nodef(profile_path, section, "WIREDLB_DATACENTER", param->minio.wiredlb_datacenter, 64); - MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", ¶m->minio.wiredlb_override, 1); + MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->cache.wiredlb_topic, 64, "TANGO_CACHE_PRODUCER"); + MESA_load_profile_string_nodef(profile_path, section, "WIREDLB_DATACENTER", param->cache.wiredlb_datacenter, 64); + MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", ¶m->cache.wiredlb_override, 1); MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100); - param->minio.wiredlb_ha_port = intval; - MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", param->minio.wiredlb_group, 64, "MINIO_GROUP"); - MESA_load_profile_uint_def(profile_path, section, "MINIO_LISTEN_PORT", ¶m->minio.port, 9000); - if(MESA_load_profile_string_nodef(profile_path, section, "MINIO_IP_LIST", param->minio.iplist, 4096) < 0) + param->cache.wiredlb_ha_port = intval; + MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", param->cache.wiredlb_group, 64, "MINIO_GROUP"); + MESA_load_profile_uint_def(profile_path, section, "CACHE_LISTEN_PORT", ¶m->cache.port, 9000); + if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_IP_LIST", param->cache.iplist, 4096) < 0) { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.", profile_path, section); + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BROKERS_LIST not found.", profile_path, section); return NULL; } - if(wired_load_balancer_init(¶m->minio, runtime_log)) + if(wired_load_balancer_init(¶m->cache, runtime_log)) { return NULL; } - MESA_load_profile_int_def(profile_path, section, "CACHE_STORE_OBJECT_WAY", ¶m->object_store_way, CACHE_ALL_MINIO); - if(param->object_store_way!=CACHE_ALL_MINIO && param->object_store_way!=CACHE_META_REDIS && param->object_store_way!=CACHE_SMALL_REDIS) + MESA_load_profile_int_def(profile_path, section, "CACHE_STORE_OBJECT_WAY", ¶m->object_store_way, CACHE_ALL_HOS); + if(param->object_store_way!=CACHE_ALL_HOS && param->object_store_way!=CACHE_META_REDIS && param->object_store_way!=CACHE_SMALL_REDIS) { MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "CACHE_STORE_OBJECT_WAY is not 1/2/3.", profile_path, section); return NULL; } - if(param->object_store_way != CACHE_ALL_MINIO) + if(param->object_store_way != CACHE_ALL_HOS) { if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_IP_LIST", redis_cluster_ip, 512) < 0) { @@ -1209,7 +1210,7 @@ struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_paramet curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERFUNCTION, curl_timer_function_cb); curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERDATA, instance); - if(param->object_store_way != CACHE_ALL_MINIO) + if(param->object_store_way != CACHE_ALL_HOS) { if(redis_asyn_connect_init(instance)) { diff --git a/cache/src/tango_cache_client_in.h b/cache/src/tango_cache_client_in.h index f457fed..82345de 100644 --- a/cache/src/tango_cache_client_in.h +++ b/cache/src/tango_cache_client_in.h @@ -19,7 +19,7 @@ #define RESPONSE_HDR_LAST_MOD 2 #define RESPONSE_HDR_ALL 3 -#define CACHE_ALL_MINIO 0 //ÔªÐÅÏ¢ºÍ¶ÔÏó¶¼´æÔÚMINIO +#define CACHE_ALL_HOS 0 //ÔªÐÅÏ¢ºÍ¶ÔÏó¶¼´æÔÚMINIO #define CACHE_META_REDIS 1 //ÔªÐÅÏ¢ÔÚREDIS¶ÔÏóÔÚMINIO #define CACHE_SMALL_REDIS 2 //ÔªÐÅÏ¢ºÍСÎļþÔÚREDIS£¬´óÎļþÔÚMINIO @@ -106,6 +106,7 @@ struct wiredlb_parameter struct tango_cache_parameter { char bucketname[256]; + char cache_token[256]; char redis_key[256]; long maximum_host_cnns; long transfer_timeout;//´«Êä×Üʱ¼äÏÞÖÆ @@ -117,7 +118,7 @@ struct tango_cache_parameter u_int32_t hash_object_key; //wiredlb int object_store_way; //´æÈ¡objectÐÅÏ¢µÄ·½Ê½ - struct wiredlb_parameter minio; + struct wiredlb_parameter cache; char redisaddrs[4096]; u_int32_t redis_object_maxsize;//СÎļþ´æÔÚredisʱ£¬¶ÔÏóµÄ×î´ó´óС diff --git a/cache/src/tango_cache_redis.cpp b/cache/src/tango_cache_redis.cpp index ccfb171..54fd8d0 100644 --- a/cache/src/tango_cache_redis.cpp +++ b/cache/src/tango_cache_redis.cpp @@ -162,15 +162,15 @@ static void redis_hget_command_cb(struct redisClusterAsyncContext *ac, void *vre switch(ctx->get.state) { case GET_STATE_REDIS_META: - ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_MINIO:OBJECT_IN_REDIS; + ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_HOS:OBJECT_IN_REDIS; break; case GET_STATE_REDIS_ALL: ctx->get.result.location = OBJECT_IN_REDIS; break; case GET_STATE_REDIS_TRY: - ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_MINIO:OBJECT_IN_REDIS; - if(ctx->get.result.location == OBJECT_IN_MINIO) + ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_HOS:OBJECT_IN_REDIS; + if(ctx->get.result.location == OBJECT_IN_HOS) { ctx->get.redis_redirect_minio_cb(ctx); return; @@ -189,7 +189,7 @@ static void redis_hget_command_cb(struct redisClusterAsyncContext *ac, void *vre tango_cache_ctx_destroy(ctx); break; case PARSE_JSON_RET_TIMEOUT: - if(ctx->get.state == GET_STATE_DELETE && ctx->get.result.location==OBJECT_IN_MINIO) + if(ctx->get.state == GET_STATE_DELETE && ctx->get.result.location==OBJECT_IN_HOS) { ctx->get.state = GET_STATE_END; cache_delete_minio_object(ctx); diff --git a/cache/src/tango_cache_transfer.cpp b/cache/src/tango_cache_transfer.cpp index b518691..6ea2c63 100644 --- a/cache/src/tango_cache_transfer.cpp +++ b/cache/src/tango_cache_transfer.cpp @@ -114,7 +114,7 @@ static size_t curl_put_multipart_send_cb(void *ptr, size_t size, size_t count, v static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool full) { UNUSED CURLMcode rc; - char minio_url[256]; + char minio_url[256], buffer[256]; if(NULL == (ctx->curl=curl_easy_init())) { @@ -136,8 +136,10 @@ static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool ful curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); + //token×ֶΣ¬ÓÃÓÚhos´æ´¢ÈÏÖ¤ + sprintf(buffer, "token: %s", ctx->instance->param->cache_token); + ctx->headers = curl_slist_append(ctx->headers, buffer); curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L); curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.upload_length); curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_multipart_send_cb); @@ -178,9 +180,9 @@ static size_t curl_response_body_save_cb(void *ptr, size_t size, size_t count, v int curl_get_minio_uploadID(struct tango_cache_ctx *ctx) { UNUSED CURLMcode rc; - char minio_url[256]; - - if(NULL == (ctx->curl=curl_easy_init())) + char minio_url[256]={0}, buffer[256]; + + if(NULL == (ctx->curl=curl_easy_init())) { return -1; } @@ -193,6 +195,8 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx) curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb); curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); + sprintf(buffer, "token: %s", ctx->instance->param->cache_token); + ctx->headers = curl_slist_append(ctx->headers, buffer); curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); @@ -205,7 +209,7 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx) int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back) { UNUSED CURLMcode rc; - char minio_url[256]; + char minio_url[256], buffer[256]; ctx->instance->statistic.del_recv_num += 1; if(NULL == (ctx->curl=curl_easy_init())) @@ -221,6 +225,9 @@ int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back) curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); + sprintf(buffer, "token: %s", ctx->instance->param->cache_token); + ctx->headers = curl_slist_append(ctx->headers, buffer); + curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); @@ -257,9 +264,9 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx) { int len=0; UNUSED CURLMcode rc; - char minio_url[256]; - - if(NULL == (ctx->curl=curl_easy_init())) + char minio_url[256], buffer[256]; + + if(NULL == (ctx->curl=curl_easy_init())) { return false; } @@ -282,6 +289,8 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx) ctx->headers = NULL; } ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml"); + sprintf(buffer, "token: %s", ctx->instance->param->cache_token); + ctx->headers = curl_slist_append(ctx->headers, buffer); curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); @@ -298,7 +307,7 @@ bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block switch(ctx->put.state) { case PUT_STATE_START: - if(sessions_exceeds_limit(ctx->instance, OBJECT_IN_MINIO)) + if(sessions_exceeds_limit(ctx->instance, OBJECT_IN_HOS)) { tango_cache_set_fail_state(ctx, CACHE_OUTOF_SESSION); return false; @@ -368,7 +377,7 @@ int do_tango_cache_update_end(struct tango_cache_ctx *ctx, bool callback) tango_cache_ctx_destroy(ctx, callback); return -1; } - if(ctx->locate == OBJECT_IN_MINIO) + if(ctx->locate == OBJECT_IN_HOS) { return http_put_complete_part_evbuf(ctx, callback); } @@ -494,7 +503,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r { tango_cache_set_fail_state(ctx, CACHE_ERR_CURL); } - if(ctx->instance->param->object_store_way!=CACHE_ALL_MINIO && !ctx->fail_state) + if(ctx->instance->param->object_store_way!=CACHE_ALL_HOS && !ctx->fail_state) { redis_put_minio_object_meta(ctx, true); } @@ -510,7 +519,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r int http_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback) { UNUSED CURLMcode rc; - char minio_url[256]; + char minio_url[256], buffer[256]; if(NULL == (ctx->curl=curl_easy_init())) { @@ -527,6 +536,8 @@ int http_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COP curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); + sprintf(buffer, "token: %s", ctx->instance->param->cache_token); + ctx->headers = curl_slist_append(ctx->headers, buffer); curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); @@ -557,7 +568,7 @@ int do_tango_cache_upload_once_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY ctx->instance->statistic.memory_used += size; ctx->instance->error_code = CACHE_OK; - if(ctx->locate == OBJECT_IN_MINIO) + if(ctx->locate == OBJECT_IN_HOS) { return http_put_complete_part_data(ctx, way, data, size, false); } @@ -595,7 +606,7 @@ int do_tango_cache_upload_once_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_ size = evbuffer_get_length(ctx->put.evbuf); ctx->instance->statistic.memory_used += size; - if(ctx->locate == OBJECT_IN_MINIO) + if(ctx->locate == OBJECT_IN_HOS) { return http_put_complete_part_evbuf(ctx, callback); } @@ -804,7 +815,7 @@ static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count, { return raw_len; } - ctx->get.result.location = OBJECT_IN_MINIO; + ctx->get.result.location = OBJECT_IN_HOS; } pos_colon = (char*)memchr(start, ':', raw_len); if(pos_colon == NULL) @@ -893,7 +904,7 @@ void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long r static int tango_cache_fetch_minio(struct tango_cache_ctx *ctx) { UNUSED CURLMcode rc; - char minio_url[256]; + char minio_url[256], buffer[256]; if(NULL == (ctx->curl=curl_easy_init())) { @@ -912,6 +923,9 @@ static int tango_cache_fetch_minio(struct tango_cache_ctx *ctx) curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_get_response_header_cb); curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx); + sprintf(buffer, "token: %s", ctx->instance->param->cache_token); + ctx->headers = curl_slist_append(ctx->headers, buffer); + curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); @@ -924,7 +938,7 @@ static void redis_redirect_object2minio_cb(struct tango_cache_ctx *ctx) struct promise *p = ctx->promise; ctx->get.state = GET_STATE_START; - ctx->locate = OBJECT_IN_MINIO; + ctx->locate = OBJECT_IN_HOS; if(ctx->instance->statistic.session_http>=ctx->instance->param->maximum_sessions) { tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY); @@ -942,8 +956,8 @@ int do_tango_cache_fetch_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATIO ctx->instance->statistic.get_recv_num += 1; switch(where_to_get) { - case OBJECT_IN_MINIO: - ctx->locate = OBJECT_IN_MINIO; + case OBJECT_IN_HOS: + ctx->locate = OBJECT_IN_HOS; return (tango_cache_fetch_minio(ctx)==1)?0:-2; case OBJECT_IN_REDIS: ctx->locate = OBJECT_IN_REDIS; diff --git a/cache/src/tango_cache_xml.cpp b/cache/src/tango_cache_xml.cpp index 2b26b4e..0d21497 100644 --- a/cache/src/tango_cache_xml.cpp +++ b/cache/src/tango_cache_xml.cpp @@ -55,7 +55,8 @@ void construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len) pdoc = xmlNewDoc((const xmlChar *)"1.0"); root = xmlNewNode(NULL, (const xmlChar *)"CompleteMultipartUpload"); - xmlNewProp(root, (const xmlChar *)"xmlns",(const xmlChar *)"http://s3.amazonaws.com/doc/2006-03-01/"); + /*Big data deletion of this field parsing, shielding this field**/ + //xmlNewProp(root, (const xmlChar *)"xmlns",(const xmlChar *)"http://s3.amazonaws.com/doc/2006-03-01/"); xmlDocSetRootElement(pdoc, root); TAILQ_FOREACH(etag, &ctx->put.etag_head, node) diff --git a/cache/test/CMakeLists.txt b/cache/test/CMakeLists.txt index dc4a0d1..f2a59a3 100644 --- a/cache/test/CMakeLists.txt +++ b/cache/test/CMakeLists.txt @@ -1,20 +1,22 @@ add_definitions(-fPIC -Wall -g) add_executable (cache_evbase_test cache_evbase_test.cpp) -target_link_libraries(cache_evbase_test tango_cache_client_static libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static hiredis-static cjson) -target_link_libraries (cache_evbase_test MESA_handle_logger MESA_htable MESA_prof_load wiredLB pthread z) +target_include_directories(cache_evbase_test PRIVATE ../include) +target_link_libraries(cache_evbase_test common) +target_link_libraries(cache_evbase_test tango-cache-client libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static hiredis-static cjson) +target_link_libraries (cache_evbase_test MESA_handle_logger MESA_htable MESA_prof_load MESA_field_stat wiredLB pthread z) add_executable (cache_evbase_benchmark cache_evbase_benchmark.cpp) -target_link_libraries(cache_evbase_benchmark tango_cache_client_static libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static hiredis-static cjson) -target_link_libraries (cache_evbase_benchmark MESA_handle_logger MESA_htable MESA_prof_load wiredLB pthread z) +target_link_libraries(cache_evbase_benchmark tango-cache-client libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static hiredis-static cjson) +target_link_libraries (cache_evbase_benchmark MESA_handle_logger MESA_htable MESA_prof_load MESA_field_stat wiredLB pthread z) #add_executable (cache_evbase_test_threads cache_evbase_test_threads.cpp) -#target_link_libraries(cache_evbase_test_threads tango_cache_client_static libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static hiredis-static cjson) +#target_link_libraries(cache_evbase_test_threads tango-cache-client libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static hiredis-static cjson) #target_link_libraries (cache_evbase_test_threads MESA_handle_logger MESA_htable MESA_prof_load wiredLB pthread z) add_executable (tango_cache_test tango_cache_test.cpp) -target_link_libraries(tango_cache_test tango_cache_client_static libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static hiredis-static cjson) -target_link_libraries (tango_cache_test MESA_handle_logger MESA_htable MESA_prof_load wiredLB pthread) +target_link_libraries(tango_cache_test tango-cache-client libevent-static openssl-crypto-static openssl-ssl-static libxml2-static libcurl-static hiredis-static cjson) +target_link_libraries (tango_cache_test MESA_handle_logger MESA_htable MESA_prof_load MESA_field_stat wiredLB pthread) #INSTALL (TARGETS cache_evbase_test cache_evbase_test_threads tango_cache_test cache_evbase_benchmark DESTINATION bin) INSTALL (TARGETS cache_evbase_test tango_cache_test cache_evbase_benchmark DESTINATION bin) diff --git a/cache/test/cache_evbase_benchmark.cpp b/cache/test/cache_evbase_benchmark.cpp index c5ce437..8caba74 100644 --- a/cache/test/cache_evbase_benchmark.cpp +++ b/cache/test/cache_evbase_benchmark.cpp @@ -292,7 +292,7 @@ static void* thread_transfer_cmd(void *arg) sprintf(filename_in, "%s_%u", filecmd->file, index); getmeta.url = filename_in; pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); - pdata->future = future_create(get_future_success, get_future_failed, pdata); + pdata->future = future_create("_get", get_future_success, get_future_failed, (void *)pdata); object_store_fetch_object(instance_asyn, pdata->future, &getmeta, OBJECT_IN_UNKNOWN); break; @@ -303,7 +303,7 @@ static void* thread_transfer_cmd(void *arg) putmeta.url = filename_in; pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); - pdata->future = future_create(put_future_success, put_future_failed, pdata); + pdata->future = future_create("_put", put_future_success, put_future_failed, (void *)pdata); ctx = object_store_update_start(instance_asyn, pdata->future, &putmeta); if(ctx == NULL) { @@ -329,13 +329,13 @@ static void* thread_transfer_cmd(void *arg) case METHOD_HEAD: pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); - pdata->future = future_create(head_future_success, head_future_failed, pdata); + pdata->future = future_create("_head", head_future_success, head_future_failed, (void *)pdata); object_store_head_object(instance_asyn, pdata->future, &getmeta); break; case METHOD_DEL: pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); - pdata->future = future_create(del_future_success, del_future_failed, pdata); + pdata->future = future_create("_del", del_future_success, del_future_failed, (void *)pdata); sprintf(pdata->filename, "%s_%u", filecmd->file, index); object_store_delete_object(instance_asyn, pdata->future, pdata->filename); break; @@ -346,7 +346,7 @@ static void* thread_transfer_cmd(void *arg) putmeta.url = filename_in; pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); - pdata->future = future_create(put_future_success, put_future_failed, pdata); + pdata->future = future_create("_putonce", put_future_success, put_future_failed, (void *)pdata); if(object_store_upload_once_data(instance_asyn, pdata->future, PUT_MEM_COPY, filecont.buf, filecont.len, &putmeta, pdata->filename, 256)) { future_destroy(pdata->future); @@ -360,7 +360,7 @@ static void* thread_transfer_cmd(void *arg) putmeta.url = filename_in; pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); - pdata->future = future_create(put_future_success, put_future_failed, pdata); + pdata->future = future_create("_putonceev", put_future_success, put_future_failed, (void *)pdata); evbuf = evbuffer_new(); remain_len = filecont.len; diff --git a/cache/test/cache_evbase_test.cpp b/cache/test/cache_evbase_test.cpp index e1c7462..cd65513 100644 --- a/cache/test/cache_evbase_test.cpp +++ b/cache/test/cache_evbase_test.cpp @@ -202,7 +202,9 @@ int main(int argc, char **argv) { index = atoi(argv[2]); } - + + future_promise_library_init(NULL); + runtime_log = MESA_create_runtime_log_handle("./runtime.log", 10); if(NULL==runtime_log) { @@ -240,19 +242,19 @@ int main(int argc, char **argv) if(!strcasecmp(p, "GET")) { sprintf(filename_out, "file_index_%u.bin", index); - pdata->future = future_create(get_future_success, get_future_failed, pdata); + pdata->future = future_create("_get", get_future_success, get_future_failed, (void *)pdata); pdata->fp = fopen(filename_out, "w"); cache_evbase_fetch_object(instance_asyn, pdata->future, &getmeta, OBJECT_IN_UNKNOWN); } else if(!strcasecmp(p, "HEAD")) { - pdata->future = future_create(head_future_success, head_future_failed, pdata); + pdata->future = future_create("_head", head_future_success, head_future_failed, (void *)pdata); cache_evbase_head_object(instance_asyn, pdata->future, &getmeta); } else if(!strcasecmp(p, "DEL")) { - pdata->future = future_create(del_future_success, del_future_failed, pdata); + pdata->future = future_create("_del", del_future_success, del_future_failed, (void *)pdata); sprintf(pdata->filename, "%s", filename_in); cache_evbase_delete_object(instance_asyn, pdata->future, filename_in); } @@ -260,14 +262,14 @@ int main(int argc, char **argv) { size_t filelen; p = get_file_content(filename_in, &filelen); - pdata->future = future_create(put_future_success, put_future_failed, pdata); - + pdata->future = future_create("_putonce", put_future_success, put_future_failed, pdata); + cache_evbase_upload_once_data(instance_asyn, pdata->future, PUT_MEM_FREE, p, filelen, &putmeta, pdata->filename, 256); } else if(!strcasecmp(p, "PUTONCEEV")) { size_t readlen; - pdata->future = future_create(put_future_success, put_future_failed, pdata); + pdata->future = future_create("_putonceev", put_future_success, put_future_failed, (void *)pdata); struct evbuffer *evbuf = evbuffer_new(); char buffer[1024]; @@ -286,7 +288,7 @@ int main(int argc, char **argv) } else { - pdata->future = future_create(put_future_success, put_future_failed, pdata); + pdata->future = future_create("_default", put_future_success, put_future_failed, (void *)pdata); ctx = cache_evbase_update_start(instance_asyn, pdata->future, &putmeta); char buffer[1024]; diff --git a/cache/test/pangu_tg_cahce.conf b/cache/test/pangu_tg_cahce.conf index 21ac1f8..1c6d7a2 100644 --- a/cache/test/pangu_tg_cahce.conf +++ b/cache/test/pangu_tg_cahce.conf @@ -1,7 +1,13 @@ [TANGO_CACHE] -#Addresses of minio. Format is defined by WiredLB. -minio_ip_list=10.3.35.60-61; -minio_listen_port=9000 +Addresses of hos, Bucket name in minio. Format is defined by WiredLB. +cache_ip_list=192.168.40.223; +cache_listen_port=9098 +cache_bucket_name=hos/proxy_test + +#cache_ip_list=192.168.44.10; +#cache_listen_port=9090 +#cache_bucket_name=proxybucket + #Maximum number of connections opened by per host. #max_connection_per_host=1 #Maximum number of requests in a pipeline. @@ -11,8 +17,6 @@ minio_listen_port=9000 #Maximum time the request is allowed to take(seconds). #max_curl_transfer_timeout_s=0 -#Bucket name in minio. -cache_bucket_name=openbucket #Maximum size of memory used by tango_cache_client. Upload will fail if the current size of memory used exceeds this value. max_used_memory_size_mb=5120 #Default TTL of objects, i.e. the time after which the object will expire(minumun 60s, i.e. 1 minute). @@ -20,8 +24,8 @@ cache_default_ttl_second=3600 #Whether to hash the object key before cache actions. GET/PUT may be faster if you open it. cache_object_key_hash_switch=1 -#Store way: 0-MINIO; 1-META in REDIS, object in minio; 2-META and small object in Redis, large object in minio; -cache_store_object_way=2 +#Store way: 0-HOS; 1-META in REDIS, object in hos; 2-META and small object in Redis, large object in hos; +cache_store_object_way=0 #If cache_store_object_way is 2 and the size of a object is not bigger than this value, object will be stored in redis. redis_cache_object_size=20480 #If cache_store_object_way is not 0, we will use redis to store meta and object. diff --git a/cache/test/tango_cache_test.cpp b/cache/test/tango_cache_test.cpp index c00bd8c..e89d8f4 100644 --- a/cache/test/tango_cache_test.cpp +++ b/cache/test/tango_cache_test.cpp @@ -238,7 +238,7 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) sprintf(filename, "file_index_%u.bin", index++); pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); pdata->fp = fopen(filename, "w"); - pdata->future = future_create(get_future_success, get_future_failed, pdata); + pdata->future = future_create("_get", get_future_success, get_future_failed, (void *)pdata); if(tango_cache_fetch_object(tango_instance, pdata->future, &getmeta, OBJECT_IN_UNKNOWN) < 0) { @@ -248,7 +248,7 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) else if(!strcasecmp(p, "HEAD")) { pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); - pdata->future = future_create(head_future_success, head_future_failed, pdata); + pdata->future = future_create("_head", head_future_success, head_future_failed, (void *)pdata); if(tango_cache_head_object(tango_instance, pdata->future, &getmeta) < 0) { @@ -260,8 +260,8 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) size_t filelen; p = get_file_content(s, &filelen); pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); - pdata->future = future_create(put_future_success, put_future_failed, pdata); - + pdata->future = future_create("_putnoce", put_future_success, put_future_failed, (void *)pdata); + if(tango_cache_upload_once_data(tango_instance, pdata->future, PUT_MEM_FREE, p, filelen, &putmeta, pdata->filename, 256)) { put_future_failed(FUTURE_ERROR_CANCEL, "", pdata); @@ -271,7 +271,7 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) { size_t readlen; pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); - pdata->future = future_create(put_future_success, put_future_failed, pdata); + pdata->future = future_create("_putonceev", put_future_success, put_future_failed, pdata); struct evbuffer *evbuf = evbuffer_new(); char buffer[1024]; @@ -294,14 +294,14 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) else if(!strcasecmp(p, "DEL")) { pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); - pdata->future = future_create(del_future_success, del_future_failed, pdata); + pdata->future = future_create("_del", del_future_success, del_future_failed, pdata); sprintf(pdata->filename, "%s", s); tango_cache_delete_object(tango_instance, pdata->future, s); } else if(!strcasecmp(p, "DELMUL")) //TODO { pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); - pdata->future = future_create(del_future_success, del_future_failed, pdata); + pdata->future = future_create("_delmul", del_future_success, del_future_failed, pdata); sprintf(pdata->filename, "%s", s); for(pstart = strtok_r(s, ";", &save_ptr); pstart != NULL; pstart = strtok_r(NULL, ";", &save_ptr)) @@ -313,8 +313,8 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) else { pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); - pdata->future = future_create(put_future_success, put_future_failed, pdata); - + pdata->future = future_create("_default", put_future_success, put_future_failed, pdata); + ctx = tango_cache_update_start(tango_instance, pdata->future, &putmeta); if(ctx==NULL) { @@ -411,7 +411,9 @@ int main(int crgc, char **arg) struct timeval tv; void *runtime_log; struct tango_cache_parameter *parameter; - + + future_promise_library_init(NULL); + runtime_log = MESA_create_runtime_log_handle("./runtime.log", 10); if(NULL==runtime_log) { diff --git a/conf/pangu/pangu_pxy.conf b/conf/pangu/pangu_pxy.conf index aa5b6d4..f4ea6c6 100644 --- a/conf/pangu/pangu_pxy.conf +++ b/conf/pangu/pangu_pxy.conf @@ -5,32 +5,21 @@ enable_plugin=1 entrance_id=0 # default 1, if enable "en_sendlog", the iterm "tfe.conf [kafka] enable" must set 1 en_sendlog=1 -#Addresses of minio. Format is defined by WiredLB. -minio_ip_list=10.4.35.42-46; -minio_listen_port=9000 -#Maximum number of connections opened by per host. -#MAX_CONNECTION_PER_HOST=1 -#Maximum number of requests in a pipeline. -#MAX_CNNT_PIPELINE_NUM=20 -#Maximum parellel sessions(http and redis) is allowed to open. -#MAX_CURL_SESSION_NUM=100 -#Maximum time the request is allowed to take(seconds). -#MAX_CURL_TRANSFER_TIMEOUT_S=0 -#Bucket name in minio. +#Addresses of hos, Bucket name in hos. Format is defined by WiredLB. +cache_ip_list=10.4.35.42-46; +cache_listen_port=9000 cache_bucket_name=proxybucket -#Maximum size of memory used by tango_cache_client. Upload will fail if the current size of memory used exceeds this value. +cache_token=c21f969b5f03d33d43e04f8f136e7682 + +#Refer to the pangu_cahche definition max_used_memroy_size_mb=5120 -#Default TTL of objects, i.e. the time after which the object will expire(minumun 60s, i.e. 1 minute). cache_default_ttl_second=3600 -#Whether to hash the object key before cache actions. GET/PUT may be faster if you open it. cache_object_key_hash_switch=1 -#Store way: 0-MINIO; 1-META in REDIS, object in minio; 2-META and small object in Redis, large object in minio; -cache_store_object_way=2 -#If CACHE_STORE_OBJECT_WAY is 2 and the size of a object is not bigger than this value, object will be stored in redis. +#Refer to the pangu_cahche definition +cache_store_object_way=0 redis_cache_object_size=1024000 -#If CACHE_STORE_OBJECT_WAY is not 0, we will use redis to store meta and object. redis_cluster_addrs=10.4.20.211:9001,10.4.20.212:9001,10.4.20.213:9001,10.4.20.214:9001,10.4.20.215:9001,10.4.20.216:9001,10.4.20.217:9001,10.4.20.218:9001 #Configs of WiredLB for Minios load balancer. @@ -49,6 +38,7 @@ log_fsstat_dst_ip=10.4.20.202 log_fsstat_dst_port=8125 [ratelimit] +#hijack flow control enable=0 token_name=ratelimit redis_server=192.168.40.137 @@ -56,35 +46,28 @@ redis_port=6379 redis_db_index=5 [tango_cache] -enable_cache=1 +enable_cache=0 min_cache_obj_size=512 -#minio ip, as wiredlb required -minio_ip_list=10.4.35.1-14; -minio_listen_port=9000 - -#max_connection_per_host=1 -max_cnnt_pipeline_num=20 -#max_curl_session_num=100 - +#hos ip, as wiredlb required +cache_ip_list=10.4.35.1-14; +cache_listen_port=9000 cache_bucket_name=proxybucket -max_used_memory_size_mb=10240 -cache_default_ttl_second=3600 -cache_object_key_hash_switch=1 -#1-minio,2-redis -#Store way: 0-MINIO; 1-META in REDIS, object in minio; 2-META and small object in Redis, large object in minio; +max_cnnt_pipeline_num=20 +#Maximum size of memory used by tango_cache_client. Upload will fail if the current size of memory used exceeds this value. +max_used_memory_size_mb=10240 +#Default TTL of objects, i.e. the time after which the object will expire(minumun 60s, i.e. 1 minute). +cache_default_ttl_second=3600 +#Whether to hash the object key before cache actions. GET/PUT may be faster if you open it. +cache_object_key_hash_switch=1 +#Store way: 0-HOS; 1-META in REDIS, object in hos; 2-META and small object in Redis, large object in hos; cache_store_object_way=2 #If CACHE_STORE_OBJECT_WAY is 2 and the size of a object is not bigger than this value, object will be stored in redis. redis_cache_object_size=102400 #If CACHE_STORE_OBJECT_WAY is not 0, we will use redis to store meta and object. redis_cluster_addrs=10.4.35.15:9001,10.4.35.16:9001,10.4.35.17:9001,10.4.35.18:9001,10.4.35.19:9001,10.4.35.20:9001,10.4.35.21:9001,10.4.35.22:9001,10.4.35.23:9001,10.4.35.24:9001,10.4.35.25:9001,10.4.35.26:9001,10.4.35.27:9001,10.4.35.28:9001,10.4.35.29:9001,10.4.35.30:9001,10.4.35.31:9001,10.4.35.32:9001 -#wired load balancer configuration -#wiredlb_override=1 -#wiredlb_topic= -#wiredlb_datacenter= -wiredlb_health_port=52100 -#wiredlb_group= +#Configs of WiredLB for Minios load balancer.Refer to the definition at log cache_undefined_obj=1 query_undefined_obj=0