TSG-5746 使用hos存储非结构化文件
修复缓存代码编译问题 缓存功能适配hos
This commit is contained in:
10
cache/src/cache_evbase_client.cpp
vendored
10
cache/src/cache_evbase_client.cpp
vendored
@@ -276,7 +276,7 @@ int cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn, char *path/*OUT*/
|
||||
//ENDʱ<44><CAB1><EFBFBD><EFBFBD>δ<EFBFBD><CEB4>ʼ<EFBFBD>ֶ<EFBFBD><D6B6>ϴ<EFBFBD><CFB4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ϴ<EFBFBD>֮ǰ<D6AE><C7B0><EFBFBD><EFBFBD>locateһ<65><D2BB>λ<EFBFBD><CEBB>
|
||||
ctx_asyn->ctx->locate = tango_cache_object_locate(ctx_asyn->ctx->instance, ctx_asyn->object_size);
|
||||
tango_cache_get_object_path(ctx_asyn->ctx, path, pathsize);
|
||||
if(ctx_asyn->ctx->instance->param->object_store_way != CACHE_ALL_MINIO)
|
||||
if(ctx_asyn->ctx->instance->param->object_store_way != CACHE_ALL_HOS)
|
||||
{
|
||||
cJSON_AddNumberToObject(ctx_asyn->ctx->put.object_meta, "Content-Length", ctx_asyn->object_size);
|
||||
}
|
||||
@@ -381,7 +381,7 @@ struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance
|
||||
|
||||
if(instance->instance->param->object_store_way != CACHE_SMALL_REDIS)
|
||||
{
|
||||
maybe_loc = OBJECT_IN_MINIO;
|
||||
maybe_loc = OBJECT_IN_HOS;
|
||||
}
|
||||
ctx = tango_cache_update_prepare(instance->instance, f, meta, maybe_loc);
|
||||
if(ctx == NULL)
|
||||
@@ -495,7 +495,7 @@ int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct fut
|
||||
|
||||
if(instance->instance->param->object_store_way != CACHE_SMALL_REDIS)
|
||||
{
|
||||
where_to_get = OBJECT_IN_MINIO;
|
||||
where_to_get = OBJECT_IN_HOS;
|
||||
}
|
||||
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
|
||||
ctx_asyn->instance_asyn = instance;
|
||||
@@ -527,9 +527,9 @@ int cache_evbase_head_object(struct cache_evbase_instance *instance, struct futu
|
||||
{
|
||||
struct cache_evbase_ctx *ctx_asyn;
|
||||
struct databuffer *buffer;
|
||||
enum OBJECT_LOCATION location = OBJECT_IN_MINIO;
|
||||
enum OBJECT_LOCATION location = OBJECT_IN_HOS;
|
||||
|
||||
if(instance->instance->param->object_store_way != CACHE_ALL_MINIO)
|
||||
if(instance->instance->param->object_store_way != CACHE_ALL_HOS)
|
||||
{
|
||||
location = OBJECT_IN_REDIS;
|
||||
}
|
||||
|
||||
81
cache/src/tango_cache_client.cpp
vendored
81
cache/src/tango_cache_client.cpp
vendored
@@ -119,14 +119,14 @@ static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statisti
|
||||
case CACHE_REQUEST_PUT:
|
||||
if(ctx->fail_state)
|
||||
{
|
||||
if(ctx->locate == OBJECT_IN_MINIO)
|
||||
if(ctx->locate == OBJECT_IN_HOS)
|
||||
statistic->put_err_http += 1;
|
||||
else
|
||||
statistic->put_err_redis += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
if(ctx->locate == OBJECT_IN_MINIO)
|
||||
if(ctx->locate == OBJECT_IN_HOS)
|
||||
statistic->put_succ_http += 1;
|
||||
else
|
||||
statistic->put_succ_redis += 1;
|
||||
@@ -138,14 +138,14 @@ static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statisti
|
||||
{
|
||||
if(ctx->error_code == CACHE_CACHE_MISS || ctx->error_code == CACHE_TIMEOUT)
|
||||
statistic->get_miss_num += 1;
|
||||
else if(ctx->locate == OBJECT_IN_MINIO)
|
||||
else if(ctx->locate == OBJECT_IN_HOS)
|
||||
statistic->get_err_http += 1;
|
||||
else
|
||||
statistic->get_err_redis += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
if(ctx->locate == OBJECT_IN_MINIO)
|
||||
if(ctx->locate == OBJECT_IN_HOS)
|
||||
statistic->get_succ_http += 1;
|
||||
else
|
||||
statistic->get_succ_redis += 1;
|
||||
@@ -258,7 +258,7 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback)
|
||||
//<2F>ж<EFBFBD>session<6F>Ƿ<C7B7><F1B3ACB9><EFBFBD><EFBFBD>ƣ<EFBFBD><C6A3><EFBFBD><EFBFBD><EFBFBD>ȡ<EFBFBD><C8A1>ʼ<EFBFBD><CABC><EFBFBD>ж<EFBFBD>where_to_get<65>Ƿ<EFBFBD>ȫ<EFBFBD><C8AB><EFBFBD><EFBFBD>MINIO<49><4F>
|
||||
bool sessions_exceeds_limit(struct tango_cache_instance *instance, enum OBJECT_LOCATION where_to_get)
|
||||
{
|
||||
if(where_to_get == OBJECT_IN_MINIO)
|
||||
if(where_to_get == OBJECT_IN_HOS)
|
||||
{
|
||||
return (instance->statistic.session_http>=instance->param->maximum_sessions);
|
||||
}
|
||||
@@ -278,7 +278,7 @@ enum OBJECT_LOCATION tango_cache_object_locate(struct tango_cache_instance *inst
|
||||
}
|
||||
if(instance->param->object_store_way!=CACHE_SMALL_REDIS || object_size > instance->param->redis_object_maxsize)
|
||||
{
|
||||
return OBJECT_IN_MINIO;
|
||||
return OBJECT_IN_HOS;
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -290,7 +290,7 @@ void tango_cache_get_object_path(struct tango_cache_ctx *ctx, char *path/*OUT*/,
|
||||
{
|
||||
if(path != NULL)
|
||||
{
|
||||
if(ctx->locate == OBJECT_IN_MINIO)
|
||||
if(ctx->locate == OBJECT_IN_HOS)
|
||||
{
|
||||
snprintf(path, pathsize, "http://%s/%s", ctx->hostaddr, ctx->object_key);
|
||||
}
|
||||
@@ -307,8 +307,8 @@ int tango_cache_update_end(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_
|
||||
{
|
||||
ctx->locate = tango_cache_object_locate(ctx->instance, ctx->put.object_size);
|
||||
tango_cache_get_object_path(ctx, path, pathsize);
|
||||
|
||||
if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO)
|
||||
|
||||
if(ctx->instance->param->object_store_way != CACHE_ALL_HOS)
|
||||
{
|
||||
cJSON_AddNumberToObject(ctx->put.object_meta, "Content-Length", ctx->put.object_size);
|
||||
}
|
||||
@@ -414,7 +414,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
|
||||
if(instance->param->hash_object_key)
|
||||
{
|
||||
caculate_sha256(meta->url, strlen(meta->url), buffer, 72);
|
||||
snprintf(ctx->object_key, 256, "%s/%c%c/%c%c/%s", instance->param->bucketname, buffer[0], buffer[1], buffer[2], buffer[3], buffer+4);
|
||||
snprintf(ctx->object_key, 256, "%s/%c%c:%c%c:%s", instance->param->bucketname, buffer[0], buffer[1], buffer[2], buffer[3], buffer+4);
|
||||
//<2F><><EFBFBD><EFBFBD>ԭʼURL
|
||||
snprintf(buffer, 2064, "x-amz-meta-url: %s", meta->url);
|
||||
ctx->headers = curl_slist_append(ctx->headers, buffer);
|
||||
@@ -423,7 +423,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
|
||||
{
|
||||
snprintf(ctx->object_key, 256, "%s/%s", instance->param->bucketname, meta->url);
|
||||
}
|
||||
if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
|
||||
if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
|
||||
{
|
||||
instance->error_code = CACHE_ERR_WIREDLB;
|
||||
instance->statistic.totaldrop_num += 1;
|
||||
@@ -454,7 +454,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
|
||||
if(meta->std_hdr[i] != NULL)
|
||||
{
|
||||
ctx->headers = curl_slist_append(ctx->headers, meta->std_hdr[i]);
|
||||
if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO)
|
||||
if(ctx->instance->param->object_store_way != CACHE_ALL_HOS)
|
||||
{
|
||||
easy_string_savedata(&hdr_estr, meta->std_hdr[i], strlen(meta->std_hdr[i]));
|
||||
easy_string_savedata(&hdr_estr, "\r\n", 2);
|
||||
@@ -464,7 +464,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
|
||||
if(meta->std_hdr[HDR_CONTENT_TYPE] == NULL)
|
||||
{
|
||||
ctx->headers = curl_slist_append(ctx->headers, "Content-Type:");
|
||||
if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO)
|
||||
if(ctx->instance->param->object_store_way != CACHE_ALL_HOS)
|
||||
{
|
||||
easy_string_savedata(&hdr_estr, "Content-Type: application/octet-stream\r\n", strlen("Content-Type: application/octet-stream\r\n"));
|
||||
}
|
||||
@@ -480,7 +480,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
|
||||
ctx->headers = curl_slist_append(ctx->headers, user_tag);
|
||||
}
|
||||
|
||||
if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO)
|
||||
if(ctx->instance->param->object_store_way != CACHE_ALL_HOS)
|
||||
{
|
||||
ctx->put.object_meta = cJSON_CreateObject();
|
||||
if(instance->param->hash_object_key)
|
||||
@@ -513,7 +513,7 @@ struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *in
|
||||
|
||||
if(instance->param->object_store_way != CACHE_SMALL_REDIS)
|
||||
{
|
||||
maybe_loc = OBJECT_IN_MINIO;
|
||||
maybe_loc = OBJECT_IN_HOS;
|
||||
}
|
||||
|
||||
ctx = tango_cache_update_prepare(instance, f, meta, maybe_loc);
|
||||
@@ -540,8 +540,8 @@ struct tango_cache_ctx *tango_cache_update_once_prepare(struct tango_cache_insta
|
||||
return NULL;
|
||||
}
|
||||
tango_cache_get_object_path(ctx, path, pathsize);
|
||||
|
||||
if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO)
|
||||
|
||||
if(ctx->instance->param->object_store_way != CACHE_ALL_HOS)
|
||||
{
|
||||
cJSON_AddNumberToObject(ctx->put.object_meta, "Content-Length", object_size);
|
||||
}
|
||||
@@ -600,13 +600,13 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
|
||||
if(instance->param->hash_object_key)
|
||||
{
|
||||
caculate_sha256(meta->url, strlen(meta->url), sha256, 72);
|
||||
snprintf(ctx->object_key, 256, "%s/%c%c/%c%c/%s", instance->param->bucketname, sha256[0], sha256[1], sha256[2], sha256[3], sha256+4);
|
||||
snprintf(ctx->object_key, 256, "%s/%c%c:%c%c:%s", instance->param->bucketname, sha256[0], sha256[1], sha256[2], sha256[3], sha256+4);
|
||||
}
|
||||
else
|
||||
{
|
||||
snprintf(ctx->object_key, 256, "%s/%s", instance->param->bucketname, meta->url);
|
||||
}
|
||||
if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
|
||||
if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
|
||||
{
|
||||
instance->error_code = CACHE_ERR_WIREDLB;
|
||||
instance->statistic.totaldrop_num += 1;
|
||||
@@ -622,7 +622,7 @@ int tango_cache_fetch_object(struct tango_cache_instance *instance, struct futur
|
||||
|
||||
if(instance->param->object_store_way != CACHE_SMALL_REDIS)
|
||||
{
|
||||
where_to_get = OBJECT_IN_MINIO;
|
||||
where_to_get = OBJECT_IN_HOS;
|
||||
}
|
||||
|
||||
ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_GET, f, meta, where_to_get);
|
||||
@@ -639,7 +639,7 @@ int tango_cache_head_object(struct tango_cache_instance *instance, struct future
|
||||
enum OBJECT_LOCATION location;
|
||||
|
||||
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Redis<69><73><EFBFBD><EFBFBD>Ԫ<EFBFBD><D4AA>Ϣ<EFBFBD>洢<EFBFBD><E6B4A2>Redis<69><73>
|
||||
location = (instance->param->object_store_way != CACHE_ALL_MINIO)?OBJECT_IN_REDIS:OBJECT_IN_MINIO;
|
||||
location = (instance->param->object_store_way != CACHE_ALL_HOS)?OBJECT_IN_REDIS:OBJECT_IN_HOS;
|
||||
ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_HEAD, f, meta, location);
|
||||
if(ctx == NULL)
|
||||
{
|
||||
@@ -654,7 +654,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *
|
||||
char sha256[72];
|
||||
const char *pbucket;
|
||||
|
||||
if(sessions_exceeds_limit(instance, OBJECT_IN_MINIO))
|
||||
if(sessions_exceeds_limit(instance, OBJECT_IN_HOS))
|
||||
{
|
||||
instance->error_code = CACHE_OUTOF_SESSION;
|
||||
instance->statistic.totaldrop_num += 1;
|
||||
@@ -670,7 +670,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *
|
||||
if(instance->param->hash_object_key)
|
||||
{
|
||||
caculate_sha256(objkey, strlen(objkey), sha256, 72);
|
||||
snprintf(ctx->object_key, 256, "%s/%c%c/%c%c/%s", pbucket, sha256[0], sha256[1], sha256[2], sha256[3], sha256+4);
|
||||
snprintf(ctx->object_key, 256, "%s/%c%c:%c%c:%s", pbucket, sha256[0], sha256[1], sha256[2], sha256[3], sha256+4);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -680,7 +680,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *
|
||||
{
|
||||
snprintf(ctx->hostaddr, 48, "%s", minio_addr);
|
||||
}
|
||||
else if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
|
||||
else if(wired_load_balancer_lookup(instance->param->cache.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
|
||||
{
|
||||
instance->error_code = CACHE_ERR_WIREDLB;
|
||||
instance->statistic.totaldrop_num += 1;
|
||||
@@ -707,7 +707,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst
|
||||
struct tango_cache_ctx *ctx;
|
||||
char md5[48]={0}, content_md5[48];
|
||||
|
||||
if(sessions_exceeds_limit(instance, OBJECT_IN_MINIO))
|
||||
if(sessions_exceeds_limit(instance, OBJECT_IN_HOS))
|
||||
{
|
||||
instance->error_code = CACHE_OUTOF_SESSION;
|
||||
instance->statistic.totaldrop_num += 1;
|
||||
@@ -719,8 +719,8 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst
|
||||
ctx->promise = future_to_promise(f);
|
||||
ctx->method = CACHE_REQUEST_DELETE_MUL;
|
||||
ctx->del.succ_num = num;
|
||||
|
||||
if(wired_load_balancer_lookup(instance->param->minio.wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48))
|
||||
|
||||
if(wired_load_balancer_lookup(instance->param->cache.wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48))
|
||||
{
|
||||
instance->error_code = CACHE_ERR_WIREDLB;
|
||||
instance->statistic.totaldrop_num += num;
|
||||
@@ -1105,6 +1105,7 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path
|
||||
longval = intval;
|
||||
param->maximum_used_mem = longval * 1024 * 1024;
|
||||
MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", ¶m->hash_object_key, 1);
|
||||
MESA_load_profile_string_def(profile_path, section, "CACHE_TOKEN", param->cache_token, 256, "c21f969b5f03d33d43e04f8f136e7682");
|
||||
if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", param->bucketname, 256) < 0)
|
||||
{
|
||||
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BUCKET_NAME not found.\n", profile_path, section);
|
||||
@@ -1125,30 +1126,30 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path
|
||||
param->relative_ttl = intval;
|
||||
|
||||
//wiredlb
|
||||
MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->minio.wiredlb_topic, 64, "TANGO_CACHE_PRODUCER");
|
||||
MESA_load_profile_string_nodef(profile_path, section, "WIREDLB_DATACENTER", param->minio.wiredlb_datacenter, 64);
|
||||
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", ¶m->minio.wiredlb_override, 1);
|
||||
MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->cache.wiredlb_topic, 64, "TANGO_CACHE_PRODUCER");
|
||||
MESA_load_profile_string_nodef(profile_path, section, "WIREDLB_DATACENTER", param->cache.wiredlb_datacenter, 64);
|
||||
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", ¶m->cache.wiredlb_override, 1);
|
||||
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100);
|
||||
param->minio.wiredlb_ha_port = intval;
|
||||
MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", param->minio.wiredlb_group, 64, "MINIO_GROUP");
|
||||
MESA_load_profile_uint_def(profile_path, section, "MINIO_LISTEN_PORT", ¶m->minio.port, 9000);
|
||||
if(MESA_load_profile_string_nodef(profile_path, section, "MINIO_IP_LIST", param->minio.iplist, 4096) < 0)
|
||||
param->cache.wiredlb_ha_port = intval;
|
||||
MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", param->cache.wiredlb_group, 64, "MINIO_GROUP");
|
||||
MESA_load_profile_uint_def(profile_path, section, "CACHE_LISTEN_PORT", ¶m->cache.port, 9000);
|
||||
if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_IP_LIST", param->cache.iplist, 4096) < 0)
|
||||
{
|
||||
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.", profile_path, section);
|
||||
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BROKERS_LIST not found.", profile_path, section);
|
||||
return NULL;
|
||||
}
|
||||
if(wired_load_balancer_init(¶m->minio, runtime_log))
|
||||
if(wired_load_balancer_init(¶m->cache, runtime_log))
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
MESA_load_profile_int_def(profile_path, section, "CACHE_STORE_OBJECT_WAY", ¶m->object_store_way, CACHE_ALL_MINIO);
|
||||
if(param->object_store_way!=CACHE_ALL_MINIO && param->object_store_way!=CACHE_META_REDIS && param->object_store_way!=CACHE_SMALL_REDIS)
|
||||
MESA_load_profile_int_def(profile_path, section, "CACHE_STORE_OBJECT_WAY", ¶m->object_store_way, CACHE_ALL_HOS);
|
||||
if(param->object_store_way!=CACHE_ALL_HOS && param->object_store_way!=CACHE_META_REDIS && param->object_store_way!=CACHE_SMALL_REDIS)
|
||||
{
|
||||
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "CACHE_STORE_OBJECT_WAY is not 1/2/3.", profile_path, section);
|
||||
return NULL;
|
||||
}
|
||||
if(param->object_store_way != CACHE_ALL_MINIO)
|
||||
if(param->object_store_way != CACHE_ALL_HOS)
|
||||
{
|
||||
if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_IP_LIST", redis_cluster_ip, 512) < 0)
|
||||
{
|
||||
@@ -1209,7 +1210,7 @@ struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_paramet
|
||||
curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERFUNCTION, curl_timer_function_cb);
|
||||
curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERDATA, instance);
|
||||
|
||||
if(param->object_store_way != CACHE_ALL_MINIO)
|
||||
if(param->object_store_way != CACHE_ALL_HOS)
|
||||
{
|
||||
if(redis_asyn_connect_init(instance))
|
||||
{
|
||||
|
||||
5
cache/src/tango_cache_client_in.h
vendored
5
cache/src/tango_cache_client_in.h
vendored
@@ -19,7 +19,7 @@
|
||||
#define RESPONSE_HDR_LAST_MOD 2
|
||||
#define RESPONSE_HDR_ALL 3
|
||||
|
||||
#define CACHE_ALL_MINIO 0 //Ԫ<><D4AA>Ϣ<EFBFBD>Ͷ<EFBFBD><CDB6><EFBFBD><F3B6BCB4><EFBFBD>MINIO
|
||||
#define CACHE_ALL_HOS 0 //Ԫ<><D4AA>Ϣ<EFBFBD>Ͷ<EFBFBD><CDB6><EFBFBD><F3B6BCB4><EFBFBD>MINIO
|
||||
#define CACHE_META_REDIS 1 //Ԫ<><D4AA>Ϣ<EFBFBD><CFA2>REDIS<49><53><EFBFBD><EFBFBD><EFBFBD><EFBFBD>MINIO
|
||||
#define CACHE_SMALL_REDIS 2 //Ԫ<><D4AA>Ϣ<EFBFBD><CFA2>С<EFBFBD>ļ<EFBFBD><C4BC><EFBFBD>REDIS<49><53><EFBFBD><EFBFBD><EFBFBD>ļ<EFBFBD><C4BC><EFBFBD>MINIO
|
||||
|
||||
@@ -106,6 +106,7 @@ struct wiredlb_parameter
|
||||
struct tango_cache_parameter
|
||||
{
|
||||
char bucketname[256];
|
||||
char cache_token[256];
|
||||
char redis_key[256];
|
||||
long maximum_host_cnns;
|
||||
long transfer_timeout;//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱ<EFBFBD><CAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
|
||||
@@ -117,7 +118,7 @@ struct tango_cache_parameter
|
||||
u_int32_t hash_object_key;
|
||||
//wiredlb
|
||||
int object_store_way; //<2F><>ȡobject<63><74>Ϣ<EFBFBD>ķ<EFBFBD>ʽ
|
||||
struct wiredlb_parameter minio;
|
||||
struct wiredlb_parameter cache;
|
||||
char redisaddrs[4096];
|
||||
u_int32_t redis_object_maxsize;//С<>ļ<EFBFBD><C4BC><EFBFBD><EFBFBD><EFBFBD>redisʱ<73><CAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>С
|
||||
|
||||
|
||||
8
cache/src/tango_cache_redis.cpp
vendored
8
cache/src/tango_cache_redis.cpp
vendored
@@ -162,15 +162,15 @@ static void redis_hget_command_cb(struct redisClusterAsyncContext *ac, void *vre
|
||||
switch(ctx->get.state)
|
||||
{
|
||||
case GET_STATE_REDIS_META:
|
||||
ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_MINIO:OBJECT_IN_REDIS;
|
||||
ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_HOS:OBJECT_IN_REDIS;
|
||||
break;
|
||||
case GET_STATE_REDIS_ALL:
|
||||
ctx->get.result.location = OBJECT_IN_REDIS;
|
||||
break;
|
||||
|
||||
case GET_STATE_REDIS_TRY:
|
||||
ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_MINIO:OBJECT_IN_REDIS;
|
||||
if(ctx->get.result.location == OBJECT_IN_MINIO)
|
||||
ctx->get.result.location = (strcmp(reply->element[1]->str, "redis"))?OBJECT_IN_HOS:OBJECT_IN_REDIS;
|
||||
if(ctx->get.result.location == OBJECT_IN_HOS)
|
||||
{
|
||||
ctx->get.redis_redirect_minio_cb(ctx);
|
||||
return;
|
||||
@@ -189,7 +189,7 @@ static void redis_hget_command_cb(struct redisClusterAsyncContext *ac, void *vre
|
||||
tango_cache_ctx_destroy(ctx);
|
||||
break;
|
||||
case PARSE_JSON_RET_TIMEOUT:
|
||||
if(ctx->get.state == GET_STATE_DELETE && ctx->get.result.location==OBJECT_IN_MINIO)
|
||||
if(ctx->get.state == GET_STATE_DELETE && ctx->get.result.location==OBJECT_IN_HOS)
|
||||
{
|
||||
ctx->get.state = GET_STATE_END;
|
||||
cache_delete_minio_object(ctx);
|
||||
|
||||
54
cache/src/tango_cache_transfer.cpp
vendored
54
cache/src/tango_cache_transfer.cpp
vendored
@@ -114,7 +114,7 @@ static size_t curl_put_multipart_send_cb(void *ptr, size_t size, size_t count, v
|
||||
static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool full)
|
||||
{
|
||||
UNUSED CURLMcode rc;
|
||||
char minio_url[256];
|
||||
char minio_url[256], buffer[256];
|
||||
|
||||
if(NULL == (ctx->curl=curl_easy_init()))
|
||||
{
|
||||
@@ -136,8 +136,10 @@ static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool ful
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
|
||||
//token<65>ֶΣ<D6B6><CEA3><EFBFBD><EFBFBD><EFBFBD>hos<6F>洢<EFBFBD><E6B4A2>֤
|
||||
sprintf(buffer, "token: %s", ctx->instance->param->cache_token);
|
||||
ctx->headers = curl_slist_append(ctx->headers, buffer);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
|
||||
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.upload_length);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_multipart_send_cb);
|
||||
@@ -178,9 +180,9 @@ static size_t curl_response_body_save_cb(void *ptr, size_t size, size_t count, v
|
||||
int curl_get_minio_uploadID(struct tango_cache_ctx *ctx)
|
||||
{
|
||||
UNUSED CURLMcode rc;
|
||||
char minio_url[256];
|
||||
|
||||
if(NULL == (ctx->curl=curl_easy_init()))
|
||||
char minio_url[256]={0}, buffer[256];
|
||||
|
||||
if(NULL == (ctx->curl=curl_easy_init()))
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
@@ -193,6 +195,8 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx)
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
|
||||
sprintf(buffer, "token: %s", ctx->instance->param->cache_token);
|
||||
ctx->headers = curl_slist_append(ctx->headers, buffer);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
|
||||
curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
|
||||
|
||||
@@ -205,7 +209,7 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx)
|
||||
int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back)
|
||||
{
|
||||
UNUSED CURLMcode rc;
|
||||
char minio_url[256];
|
||||
char minio_url[256], buffer[256];
|
||||
|
||||
ctx->instance->statistic.del_recv_num += 1;
|
||||
if(NULL == (ctx->curl=curl_easy_init()))
|
||||
@@ -221,6 +225,9 @@ int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back)
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
|
||||
sprintf(buffer, "token: %s", ctx->instance->param->cache_token);
|
||||
ctx->headers = curl_slist_append(ctx->headers, buffer);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
|
||||
curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
|
||||
|
||||
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
|
||||
@@ -257,9 +264,9 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx)
|
||||
{
|
||||
int len=0;
|
||||
UNUSED CURLMcode rc;
|
||||
char minio_url[256];
|
||||
|
||||
if(NULL == (ctx->curl=curl_easy_init()))
|
||||
char minio_url[256], buffer[256];
|
||||
|
||||
if(NULL == (ctx->curl=curl_easy_init()))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
@@ -282,6 +289,8 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx)
|
||||
ctx->headers = NULL;
|
||||
}
|
||||
ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml");
|
||||
sprintf(buffer, "token: %s", ctx->instance->param->cache_token);
|
||||
ctx->headers = curl_slist_append(ctx->headers, buffer);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
|
||||
|
||||
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
|
||||
@@ -298,7 +307,7 @@ bool cache_kick_upload_minio_multipart(struct tango_cache_ctx *ctx, size_t block
|
||||
switch(ctx->put.state)
|
||||
{
|
||||
case PUT_STATE_START:
|
||||
if(sessions_exceeds_limit(ctx->instance, OBJECT_IN_MINIO))
|
||||
if(sessions_exceeds_limit(ctx->instance, OBJECT_IN_HOS))
|
||||
{
|
||||
tango_cache_set_fail_state(ctx, CACHE_OUTOF_SESSION);
|
||||
return false;
|
||||
@@ -368,7 +377,7 @@ int do_tango_cache_update_end(struct tango_cache_ctx *ctx, bool callback)
|
||||
tango_cache_ctx_destroy(ctx, callback);
|
||||
return -1;
|
||||
}
|
||||
if(ctx->locate == OBJECT_IN_MINIO)
|
||||
if(ctx->locate == OBJECT_IN_HOS)
|
||||
{
|
||||
return http_put_complete_part_evbuf(ctx, callback);
|
||||
}
|
||||
@@ -494,7 +503,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r
|
||||
{
|
||||
tango_cache_set_fail_state(ctx, CACHE_ERR_CURL);
|
||||
}
|
||||
if(ctx->instance->param->object_store_way!=CACHE_ALL_MINIO && !ctx->fail_state)
|
||||
if(ctx->instance->param->object_store_way!=CACHE_ALL_HOS && !ctx->fail_state)
|
||||
{
|
||||
redis_put_minio_object_meta(ctx, true);
|
||||
}
|
||||
@@ -510,7 +519,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r
|
||||
int http_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback)
|
||||
{
|
||||
UNUSED CURLMcode rc;
|
||||
char minio_url[256];
|
||||
char minio_url[256], buffer[256];
|
||||
|
||||
if(NULL == (ctx->curl=curl_easy_init()))
|
||||
{
|
||||
@@ -527,6 +536,8 @@ int http_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COP
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
|
||||
sprintf(buffer, "token: %s", ctx->instance->param->cache_token);
|
||||
ctx->headers = curl_slist_append(ctx->headers, buffer);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
|
||||
curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
|
||||
|
||||
@@ -557,7 +568,7 @@ int do_tango_cache_upload_once_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY
|
||||
ctx->instance->statistic.memory_used += size;
|
||||
ctx->instance->error_code = CACHE_OK;
|
||||
|
||||
if(ctx->locate == OBJECT_IN_MINIO)
|
||||
if(ctx->locate == OBJECT_IN_HOS)
|
||||
{
|
||||
return http_put_complete_part_data(ctx, way, data, size, false);
|
||||
}
|
||||
@@ -595,7 +606,7 @@ int do_tango_cache_upload_once_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_
|
||||
size = evbuffer_get_length(ctx->put.evbuf);
|
||||
ctx->instance->statistic.memory_used += size;
|
||||
|
||||
if(ctx->locate == OBJECT_IN_MINIO)
|
||||
if(ctx->locate == OBJECT_IN_HOS)
|
||||
{
|
||||
return http_put_complete_part_evbuf(ctx, callback);
|
||||
}
|
||||
@@ -804,7 +815,7 @@ static size_t curl_get_response_header_cb(void *ptr, size_t size, size_t count,
|
||||
{
|
||||
return raw_len;
|
||||
}
|
||||
ctx->get.result.location = OBJECT_IN_MINIO;
|
||||
ctx->get.result.location = OBJECT_IN_HOS;
|
||||
}
|
||||
pos_colon = (char*)memchr(start, ':', raw_len);
|
||||
if(pos_colon == NULL)
|
||||
@@ -893,7 +904,7 @@ void tango_cache_curl_get_done(struct tango_cache_ctx *ctx, CURLcode res, long r
|
||||
static int tango_cache_fetch_minio(struct tango_cache_ctx *ctx)
|
||||
{
|
||||
UNUSED CURLMcode rc;
|
||||
char minio_url[256];
|
||||
char minio_url[256], buffer[256];
|
||||
|
||||
if(NULL == (ctx->curl=curl_easy_init()))
|
||||
{
|
||||
@@ -912,6 +923,9 @@ static int tango_cache_fetch_minio(struct tango_cache_ctx *ctx)
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_get_response_header_cb);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
|
||||
sprintf(buffer, "token: %s", ctx->instance->param->cache_token);
|
||||
ctx->headers = curl_slist_append(ctx->headers, buffer);
|
||||
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
|
||||
curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
|
||||
|
||||
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
|
||||
@@ -924,7 +938,7 @@ static void redis_redirect_object2minio_cb(struct tango_cache_ctx *ctx)
|
||||
struct promise *p = ctx->promise;
|
||||
|
||||
ctx->get.state = GET_STATE_START;
|
||||
ctx->locate = OBJECT_IN_MINIO;
|
||||
ctx->locate = OBJECT_IN_HOS;
|
||||
if(ctx->instance->statistic.session_http>=ctx->instance->param->maximum_sessions)
|
||||
{
|
||||
tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
|
||||
@@ -942,8 +956,8 @@ int do_tango_cache_fetch_object(struct tango_cache_ctx *ctx, enum OBJECT_LOCATIO
|
||||
ctx->instance->statistic.get_recv_num += 1;
|
||||
switch(where_to_get)
|
||||
{
|
||||
case OBJECT_IN_MINIO:
|
||||
ctx->locate = OBJECT_IN_MINIO;
|
||||
case OBJECT_IN_HOS:
|
||||
ctx->locate = OBJECT_IN_HOS;
|
||||
return (tango_cache_fetch_minio(ctx)==1)?0:-2;
|
||||
case OBJECT_IN_REDIS:
|
||||
ctx->locate = OBJECT_IN_REDIS;
|
||||
|
||||
3
cache/src/tango_cache_xml.cpp
vendored
3
cache/src/tango_cache_xml.cpp
vendored
@@ -55,7 +55,8 @@ void construct_complete_xml(struct tango_cache_ctx *ctx, char **xml, int *len)
|
||||
|
||||
pdoc = xmlNewDoc((const xmlChar *)"1.0");
|
||||
root = xmlNewNode(NULL, (const xmlChar *)"CompleteMultipartUpload");
|
||||
xmlNewProp(root, (const xmlChar *)"xmlns",(const xmlChar *)"http://s3.amazonaws.com/doc/2006-03-01/");
|
||||
/*Big data deletion of this field parsing, shielding this field**/
|
||||
//xmlNewProp(root, (const xmlChar *)"xmlns",(const xmlChar *)"http://s3.amazonaws.com/doc/2006-03-01/");
|
||||
xmlDocSetRootElement(pdoc, root);
|
||||
|
||||
TAILQ_FOREACH(etag, &ctx->put.etag_head, node)
|
||||
|
||||
Reference in New Issue
Block a user