将读取配置单独抽出形成parameter API;支持Redis多机备份和故障切换。

This commit is contained in:
zhangchengwei
2018-11-07 15:05:55 +08:00
committed by zhengchao
parent 1fbaee37a5
commit f1822e04c5
12 changed files with 320 additions and 168 deletions

View File

@@ -28,8 +28,11 @@ void cache_evbase_get_statistics(const struct cache_evbase_instance *instance, s
void cache_evbase_global_init(void);
//ÿ<><C3BF>minio<69><6F>Ⱥ<EFBFBD><C8BA>bucket<65><74><EFBFBD><EFBFBD>һ<EFBFBD><D2BB>parameter
struct tango_cache_parameter *cache_evbase_parameter_new(const char* profile_path, const char* section, void *runtimelog);
/*<2A><><EFBFBD><EFBFBD>ʵ<EFBFBD><CAB5><EFBFBD><EFBFBD>ÿ<EFBFBD>߳<EFBFBD>һ<EFBFBD><D2BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʹ<EFBFBD><CAB9>ʱ<EFBFBD><CAB1><EFBFBD><EFBFBD>*/
struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path, const char* section, void *runtimelog);
struct cache_evbase_instance *cache_evbase_instance_new(struct tango_cache_parameter *param, void *runtimelog);
//GET<45>ӿڣ<D3BF><DAA3>ɹ<EFBFBD><C9B9><EFBFBD><EFBFBD><EFBFBD>0<EFBFBD><30>ʧ<EFBFBD>ܷ<EFBFBD><DCB7><EFBFBD>-1<><31>future<72>ص<EFBFBD><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD><DFB3><EFBFBD>ִ<EFBFBD>У<EFBFBD><D0A3><EFBFBD>ͬ

View File

@@ -95,6 +95,7 @@ struct tango_cache_meta_put
struct response_freshness put;
};
struct tango_cache_parameter;
struct tango_cache_instance;
struct tango_cache_ctx;
@@ -105,9 +106,11 @@ void tango_cache_get_statistics(const struct tango_cache_instance *instance, str
/*ÿ<><C3BF><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ִ<EFBFBD><D6B4>һ<EFBFBD>γ<EFBFBD>ʼ<EFBFBD><CABC>*/
void tango_cache_global_init(void);
//ÿ<><C3BF>minio<69><6F>Ⱥ<EFBFBD><C8BA>bucket<65><74><EFBFBD><EFBFBD>һ<EFBFBD><D2BB>parameter
struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path, const char* section, void *runtimelog);
/*<2A><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>API<50>̲߳<DFB3><CCB2><EFBFBD>ȫ*/
//ÿ<><C3BF><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>̴߳<DFB3><CCB4><EFBFBD>һ<EFBFBD><D2BB>instance
struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog);
struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_parameter *param, struct event_base* evbase, void *runtimelog);
/* GET<45>ӿڵ<D3BF>API*/

View File

@@ -168,7 +168,7 @@ static void cache_asyn_ioevent_dispatch(struct databuffer *buffer)
break;
case CACHE_ASYN_HEAD:
f = ctx_asyn->ctx->future;
if(ctx_asyn->instance_asyn->instance->head_meta_source == HEAD_META_FROM_REDIS)
if(ctx_asyn->instance_asyn->instance->param->head_meta_source == HEAD_META_FROM_REDIS)
{
ret = tango_cache_head_redis(ctx_asyn->ctx);
}
@@ -395,7 +395,7 @@ int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct
}
if(path != NULL)
{
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->bucketname, ctx->object_key);
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->param->bucketname, ctx->object_key);
}
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
@@ -442,7 +442,7 @@ int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struc
}
if(path != NULL)
{
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->bucketname, ctx->object_key);
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->param->bucketname, ctx->object_key);
}
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
@@ -555,7 +555,12 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu
return 0;
}
struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path, const char* section, void *runtimelog)
struct tango_cache_parameter *cache_evbase_parameter_new(const char* profile_path, const char* section, void *runtimelog)
{
return tango_cache_parameter_new(profile_path, section, runtimelog);
}
struct cache_evbase_instance *cache_evbase_instance_new(struct tango_cache_parameter *param, void *runtimelog)
{
evutil_socket_t notification_fd[2];
struct cache_evbase_instance *instance_asyn;
@@ -576,7 +581,7 @@ struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path
instance_asyn->evbase = evbase;
instance_asyn->notify_readfd = notification_fd[0];
instance_asyn->notify_sendfd = notification_fd[1];
instance_asyn->instance = tango_cache_instance_new(evbase, profile_path, section, runtimelog);
instance_asyn->instance = tango_cache_instance_new(param, evbase, runtimelog);
if(instance_asyn->instance == NULL)
{
free(instance_asyn);

View File

@@ -123,7 +123,7 @@ struct tango_cache_result *tango_cache_read_result(future_result_t *promise_resu
void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path, size_t pathsize)
{
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key);
}
static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statistics *statistic)
@@ -269,9 +269,9 @@ int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data,
return 0;
}
ctx->instance->statistic.memory_used += size;
if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->upload_block_size)
if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size);
cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size);
}
return 0;
}
@@ -303,9 +303,9 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP
}
}
ctx->instance->statistic.memory_used += size;
if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->upload_block_size)
if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size);
cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size);
}
return 0;
}
@@ -316,7 +316,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
char buffer[2064];
time_t expires, now, last_modify;
if((u_int64_t)instance->statistic.memory_used>=instance->cache_limit_size || instance->statistic.session_num>=instance->max_session_num)
if((u_int64_t)instance->statistic.memory_used>=instance->param->cache_limit_size || instance->statistic.session_num>=instance->param->max_session_num)
{
instance->error_code = CACHE_OUTOF_MEMORY;
instance->statistic.totaldrop_num += 1;
@@ -328,7 +328,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
ctx->future = f;
ctx->method = CACHE_REQUEST_PUT;
if(instance->hash_object_key)
if(instance->param->hash_object_key)
{
caculate_sha256(meta->url, strlen(meta->url), buffer, 72);
snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", buffer[0], buffer[1], buffer[2], buffer[3], buffer+4);
@@ -340,7 +340,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
{
snprintf(ctx->object_key, 256, "%s", meta->url);
}
if(wired_load_balancer_lookup(instance->wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
{
instance->error_code = CACHE_ERR_WIREDLB;
instance->statistic.totaldrop_num += 1;
@@ -350,7 +350,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
//Expires<65>ֶΣ<D6B6><CEA3><EFBFBD><EFBFBD>ڻ<EFBFBD><DABB><EFBFBD><EFBFBD>ڲ<EFBFBD><DAB2>ж<EFBFBD><D0B6><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ƿ<EFBFBD><C7B7><EFBFBD>ʱ
now = time(NULL);
expires = (meta->put.timeout==0||meta->put.timeout>instance->relative_ttl)?instance->relative_ttl:meta->put.timeout;
expires = (meta->put.timeout==0||meta->put.timeout>instance->param->relative_ttl)?instance->param->relative_ttl:meta->put.timeout;
if(expires_timestamp2hdr_str(now + expires, buffer, 256))
{
ctx->headers = curl_slist_append(ctx->headers, buffer);
@@ -417,7 +417,7 @@ int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct f
}
if(path != NULL)
{
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->bucketname, ctx->object_key);
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->param->bucketname, ctx->object_key);
}
return tango_cache_upload_once_start_data(ctx, way, data, size);
@@ -435,7 +435,7 @@ int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct
}
if(path != NULL)
{
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->bucketname, ctx->object_key);
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->param->bucketname, ctx->object_key);
}
return tango_cache_upload_once_start_evbuf(ctx, way, evbuf);
@@ -446,7 +446,7 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
struct tango_cache_ctx *ctx;
char sha256[72];
if(instance->head_meta_source!=HEAD_META_FROM_REDIS && instance->statistic.session_num>=instance->max_session_num)
if(instance->param->head_meta_source!=HEAD_META_FROM_REDIS && instance->statistic.session_num>=instance->param->max_session_num)
{
instance->error_code = CACHE_OUTOF_SESSION;
instance->statistic.totaldrop_num += 1;
@@ -461,7 +461,7 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
ctx->get.max_age = meta->get.max_age;
ctx->get.min_fresh = meta->get.min_fresh;
if(instance->hash_object_key)
if(instance->param->hash_object_key)
{
caculate_sha256(meta->url, strlen(meta->url), sha256, 72);
snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", sha256[0], sha256[1], sha256[2], sha256[3], sha256+4);
@@ -470,7 +470,7 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
{
snprintf(ctx->object_key, 256, "%s", meta->url);
}
if(wired_load_balancer_lookup(instance->wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
{
instance->error_code = CACHE_ERR_WIREDLB;
instance->statistic.totaldrop_num += 1;
@@ -502,7 +502,7 @@ int tango_cache_head_object(struct tango_cache_instance *instance, struct future
return -1;
}
if(ctx->instance->head_meta_source == HEAD_META_FROM_REDIS)
if(instance->param->head_meta_source == HEAD_META_FROM_REDIS)
{
return tango_cache_head_redis(ctx);
}
@@ -517,7 +517,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *
struct tango_cache_ctx *ctx;
char sha256[72];
if(instance->statistic.session_num >= instance->max_session_num)
if(instance->statistic.session_num >= instance->param->max_session_num)
{
instance->error_code = CACHE_OUTOF_SESSION;
instance->statistic.totaldrop_num += 1;
@@ -529,7 +529,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *
ctx->future = f;
ctx->method = CACHE_REQUEST_DELETE;
if(instance->hash_object_key)
if(instance->param->hash_object_key)
{
caculate_sha256(objkey, strlen(objkey), sha256, 72);
snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", sha256[0], sha256[1], sha256[2], sha256[3], sha256+4);
@@ -538,7 +538,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *
{
snprintf(ctx->object_key, 256, "%s", objkey);
}
if(wired_load_balancer_lookup(instance->wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48))
{
instance->error_code = CACHE_ERR_WIREDLB;
instance->statistic.totaldrop_num += 1;
@@ -565,7 +565,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst
struct tango_cache_ctx *ctx;
char md5[48]={0}, content_md5[48];
if(instance->statistic.session_num >= instance->max_session_num)
if(instance->statistic.session_num >= instance->param->max_session_num)
{
instance->error_code = CACHE_OUTOF_SESSION;
instance->statistic.totaldrop_num += 1;
@@ -578,7 +578,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst
ctx->method = CACHE_REQUEST_DELETE_MUL;
ctx->del.succ_num = num;
if(wired_load_balancer_lookup(instance->wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48))
if(wired_load_balancer_lookup(instance->param->minio.wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48))
{
instance->error_code = CACHE_ERR_WIREDLB;
instance->statistic.totaldrop_num += num;
@@ -586,7 +586,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst
return NULL;
}
construct_multiple_delete_xml(ctx->instance->bucketname, objlist, num, instance->hash_object_key, &ctx->response.buff, &ctx->response.size);
construct_multiple_delete_xml(instance->param->bucketname, objlist, num, instance->param->hash_object_key, &ctx->response.buff, &ctx->response.size);
caculate_base64_md5(ctx->response.buff, ctx->response.size, (unsigned char *)md5, 48);
sprintf(content_md5, "Content-MD5: %s", md5);
ctx->headers = curl_slist_append(ctx->headers, content_md5);
@@ -750,94 +750,119 @@ static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp)
return 0; //0-success; -1-error
}
static int wired_load_balancer_init(struct tango_cache_instance *instance)
static int wired_load_balancer_init(const char *topic, const char *datacenter, int override, struct wiredlb_parameter *wparam, void *runtime_log)
{
instance->wiredlb = wiredLB_create(instance->wiredlb_topic, instance->wiredlb_group, WLB_PRODUCER);
if(instance->wiredlb == NULL)
wparam->wiredlb = wiredLB_create(topic, wparam->wiredlb_group, WLB_PRODUCER);
if(wparam->wiredlb == NULL)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n");
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n");
return -1;
}
wiredLB_set_opt(instance->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &instance->wiredlb_ha_port, sizeof(instance->wiredlb_ha_port));
wiredLB_set_opt(instance->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &instance->wiredlb_override, sizeof(instance->wiredlb_override));
wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_DATACENTER, instance->wiredlb_datacenter, strlen(instance->wiredlb_datacenter)+1);
if(instance->wiredlb_override)
wiredLB_set_opt(wparam->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &wparam->wiredlb_ha_port, sizeof(wparam->wiredlb_ha_port));
wiredLB_set_opt(wparam->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &override, sizeof(override));
wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, datacenter, strlen(datacenter)+1);
if(override)
{
wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, instance->minio_iplist, strlen(instance->minio_iplist)+1);
wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &instance->minio_port, sizeof(instance->minio_port));
wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, wparam->iplist, strlen(wparam->iplist)+1);
wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &wparam->port, sizeof(wparam->port));
}
if(wiredLB_init(instance->wiredlb) < 0)
if(wiredLB_init(wparam->wiredlb) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_init failed.\n");
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "wiredLB_init group %s failed.\n", wparam->wiredlb_group);
return -1;
}
return 0;
}
static int load_local_configure(struct tango_cache_instance *instance, const char* profile_path, const char* section)
struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path, const char* section, void *runtime_log)
{
u_int32_t intval;
u_int64_t longval;
struct tango_cache_parameter *param;
param = (struct tango_cache_parameter *)calloc(1, sizeof(struct tango_cache_parameter));
//multi curl
MESA_load_profile_uint_def(profile_path, section, "MAX_CONNECTION_PER_HOST", &intval, 1);
instance->max_cnn_host = intval;
param->max_cnn_host = intval;
MESA_load_profile_uint_def(profile_path, section, "MAX_CNNT_PIPELINE_NUM", &intval, 20);
instance->max_pipeline_num = intval;
MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", &instance->max_session_num, 200);
param->max_pipeline_num = intval;
MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_TRANSFER_TIMEOUT_S", &intval, 15);
param->transfer_timeout = intval;
//instance
MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", &param->max_session_num, 200);
MESA_load_profile_uint_def(profile_path, section, "MAX_USED_MEMORY_SIZE_MB", &intval, 5120);
longval = intval;
instance->cache_limit_size = longval * 1024 * 1024;
MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_TRANSFER_TIMEOUT_S", &intval, 15);
instance->transfer_timeout = intval;
if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", instance->bucketname, 256) < 0)
param->cache_limit_size = longval * 1024 * 1024;
MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", &param->hash_object_key, 1);
if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", param->bucketname, 256) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BUCKET_NAME not found.\n", profile_path, section);
return -1;
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BUCKET_NAME not found.\n", profile_path, section);
return NULL;
}
MESA_load_profile_int_def(profile_path, section, "CACHE_HEAD_FROM_SOURCE", &instance->head_meta_source, HEAD_META_FROM_MINIO);
if(instance->head_meta_source == HEAD_META_FROM_REDIS)
MESA_load_profile_uint_def(profile_path, section, "CACHE_UPLOAD_BLOCK_SIZE", &param->upload_block_size, 5242880);
if(param->upload_block_size < 5242880)
{
MESA_load_profile_string_def(profile_path, section, "CACHE_HEAD_REDIS_KEY", instance->redis_key, 256, instance->bucketname);
if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_HEAD_REDIS_IP", instance->redis_ip, 256) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_HEAD_REDIS_IP not found.\n", profile_path, section);
return -1;
}
MESA_load_profile_int_def(profile_path, section, "CACHE_HEAD_REDIS_PORT", &instance->redis_port, 6379);
}
MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", &instance->hash_object_key, 1);
MESA_load_profile_uint_def(profile_path, section, "MINIO_LISTEN_PORT", &instance->minio_port, 9000);
if(MESA_load_profile_string_nodef(profile_path, section, "MINIO_IP_LIST", instance->minio_iplist, 4096) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.\n", profile_path, section);
return -1;
}
MESA_load_profile_uint_def(profile_path, section, "CACHE_UPLOAD_BLOCK_SIZE", &instance->upload_block_size, 5242880);
if(instance->upload_block_size < 5242880)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section);
return -1;
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section);
return NULL;
}
MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 999999999);
if(intval < 60)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section);
return -1;
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section);
return NULL;
}
instance->relative_ttl = intval;
param->relative_ttl = intval;
//Wired_LB<EFBFBD><EFBFBD><EFBFBD><EFBFBD>
MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", instance->wiredlb_topic, 64, "TANGO_CACHE_PRODUCER");
MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", instance->wiredlb_group, 64, "KAZAKHSTAN");
MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", instance->wiredlb_datacenter, 64, "ASTANA");
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", &instance->wiredlb_override, 1);
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100);
instance->wiredlb_ha_port = (u_int16_t)intval;
return 0;
//wiredlb common
MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->wiredlb_topic, 64, "TANGO_CACHE_PRODUCER");
MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", param->wiredlb_datacenter, 64, "ASTANA");
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", &param->wiredlb_override, 1);
//wiredlb minio
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_MINIO_HEALTH_PORT", &intval, 52100);
param->minio.wiredlb_ha_port = intval;
MESA_load_profile_string_def(profile_path, section, "WIREDLB_MINIO_GROUP", param->minio.wiredlb_group, 64, "MINIO_GROUP");
MESA_load_profile_uint_def(profile_path, section, "MINIO_LISTEN_PORT", &param->minio.port, 9000);
if(MESA_load_profile_string_nodef(profile_path, section, "MINIO_IP_LIST", param->minio.iplist, 4096) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.\n", profile_path, section);
return NULL;
}
if(wired_load_balancer_init(param->wiredlb_topic, param->wiredlb_datacenter, param->wiredlb_override, &param->minio, runtime_log))
{
return NULL;
}
struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog)
//wiredlb redis
MESA_load_profile_int_def(profile_path, section, "CACHE_HEAD_FROM_SOURCE", &param->head_meta_source, HEAD_META_FROM_MINIO);
if(param->head_meta_source == HEAD_META_FROM_REDIS)
{
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_REDIS_HEALTH_PORT", &intval, 0);
param->redis.wiredlb_ha_port = intval;
MESA_load_profile_string_def(profile_path, section, "WIREDLB_REDIS_GROUP", param->redis.wiredlb_group, 64, "REDIS_GROUP");
MESA_load_profile_string_def(profile_path, section, "CACHE_HEAD_REDIS_KEY", param->redis_key, 256, param->bucketname);
MESA_load_profile_uint_def(profile_path, section, "CACHE_HEAD_REDIS_PORT", &param->redis.port, 6379);
if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_HEAD_REDIS_IPLIST", param->redis.iplist, 256) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_HEAD_REDIS_IPLIST not found.\n", profile_path, section);
return NULL;
}
if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_HEAD_MAIN_REDIS_IP", param->redis.mainip, 64) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_HEAD_MAIN_REDIS_IP not found.\n", profile_path, section);
return NULL;
}
if(wired_load_balancer_init(param->wiredlb_topic, param->wiredlb_datacenter, param->wiredlb_override, &param->redis, runtime_log))
{
return NULL;
}
}
return param;
}
struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_parameter *param, struct event_base* evbase, void *runtimelog)
{
struct tango_cache_instance *instance;
@@ -845,39 +870,26 @@ struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,
memset(instance, 0, sizeof(struct tango_cache_instance));
instance->runtime_log = runtimelog;
instance->evbase = evbase;
if(load_local_configure(instance, profile_path, section))
{
free(instance);
return NULL;
}
if(wired_load_balancer_init(instance))
{
free(instance);
return NULL;
}
instance->param = param;
instance->multi_hd = curl_multi_init();
curl_multi_setopt(instance->multi_hd, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX);
curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, instance->max_cnn_host);
curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_PIPELINE_LENGTH, instance->max_pipeline_num);
curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, param->max_cnn_host);
curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_PIPELINE_LENGTH, param->max_pipeline_num);
curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETFUNCTION, curl_socket_function_cb);
curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETDATA, instance); //curl_socket_function_cb *userp
curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERFUNCTION, curl_timer_function_cb);
curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERDATA, instance);
if(instance->head_meta_source == HEAD_META_FROM_REDIS)
if(param->head_meta_source == HEAD_META_FROM_REDIS)
{
if(redis_asyn_connect_init(instance))
if(redis_asyn_connect_init(instance, instance->param->redis.mainip))
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s:%u failed.", instance->redis_ip, instance->redis_port);
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s:%u failed.",
instance->current_redisip, instance->param->redis.port);
free(instance);
return NULL;
}
else
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s:%u success.", instance->redis_ip, instance->redis_port);
}
}
evtimer_assign(&instance->timer_event, evbase, libevent_timer_event_cb, instance);
return instance;

View File

@@ -50,39 +50,55 @@ struct easy_string
size_t size;
};
struct tango_cache_instance
struct wiredlb_parameter
{
char minio_iplist[4096];
char bucketname[256];
char wiredlb_topic[64];
char wiredlb_group[64];
char wiredlb_datacenter[64];
u_int32_t minio_port;
u_int32_t wiredlb_override;
u_int16_t wiredlb_ha_port;
u_int32_t hash_object_key;
struct event_base* evbase;
struct event timer_event;
struct cache_statistics statistic;
CURLM *multi_hd;
void *runtime_log;
char mainip[64]; //Ĭ<>Ϸ<EFBFBD><CFB7>ʵ<EFBFBD>redis<69><73>ַ
char iplist[4096];//minio: minio<69>б<EFBFBD><D0B1><EFBFBD>redis: mainip<69><70><EFBFBD>˺󣬿<CBBA>ѡ<EFBFBD><D1A1><EFBFBD>б<EFBFBD><D0B1><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>mainip
u_int32_t port;
short wiredlb_ha_port;
WLB_handle_t wiredlb;
time_t relative_ttl; //<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ч<EFBFBD><D0A7>
u_int64_t cache_limit_size;
};
struct tango_cache_parameter
{
char bucketname[256];
char redis_key[256];
long max_cnn_host;
long transfer_timeout;//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱ<EFBFBD><CAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
long max_pipeline_num;
u_int64_t cache_limit_size;
u_int32_t max_session_num;
u_int32_t upload_block_size; //minio<69>ֶ<EFBFBD><D6B6>ϴ<EFBFBD><CFB4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>С<EFBFBD><D0A1><EFBFBD><EFBFBD>
time_t relative_ttl; //<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ч<EFBFBD><D0A7>
u_int32_t hash_object_key;
//wiredlb
int head_meta_source; //<2F><><EFBFBD>Դ<EFBFBD>MINIO<49><4F>REDIS<49><53>ȡԪ<C8A1><D4AA>Ϣ
u_int32_t wiredlb_override;
char wiredlb_topic[64];
char wiredlb_datacenter[64];
struct wiredlb_parameter minio;
struct wiredlb_parameter redis;
};
struct tango_cache_instance
{
struct event_base* evbase;
struct event timer_event;
CURLM *multi_hd;
enum CACHE_ERR_CODE error_code;
int head_meta_source; //<2F><><EFBFBD>Դ<EFBFBD>MINIO<49><4F>REDIS<49><53>ȡԪ<C8A1><D4AA>Ϣ
//Ԫ<><D4AA>Ϣ<EFBFBD><CFA2>ȡ<EFBFBD><C8A1>ʽRedis
redisAsyncContext *redis_ac;
char redis_key[256];
char redis_ip[128];
int redis_port;
int redis_connecting;
redisAsyncContext *redis_ac;
char current_redisip[64];
struct event timer_redis;
const struct tango_cache_parameter *param;
void *runtime_log;
struct cache_statistics statistic;
};
struct multipart_etag_list

View File

@@ -39,19 +39,82 @@ struct http_hdr_name g_http_hdr_name[HDR_CONTENT_NUM]=
{"content-md5", "Content-MD5: "}
};
//һ<><D2BB>mainip<69><70><EFBFBD>ӳɹ<D3B3><C9B9><EFBFBD><EFBFBD><EFBFBD><EFBFBD>л<EFBFBD><D0BB><EFBFBD><EFBFBD><EFBFBD>
static void main_redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status)
{
struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac);
if(status == REDIS_OK)
{
evtimer_del(&instance->timer_redis);
if(instance->redis_connecting == CACHE_REDIS_CONNECTED)
{
redisAsyncDisconnect(instance->redis_ac);
}
sprintf(instance->current_redisip, "%s", instance->param->redis.mainip);
instance->redis_ac = (struct redisAsyncContext *)ac;
instance->redis_connecting = CACHE_REDIS_CONNECTED;
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u success.",
instance->param->redis.mainip, instance->param->redis.port);
}
else
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u failed: %s.",
instance->param->redis.mainip, instance->param->redis.port, ac->errstr);
}
}
static void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status)
{
struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac);
if(status == REDIS_OK)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u success.", instance->redis_ip, instance->redis_port);
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u success.",
instance->current_redisip, instance->param->redis.port);
}
else
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u failed: %s.", instance->redis_ip, instance->redis_port, ac->errstr);
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u failed: %s.",
instance->current_redisip, instance->param->redis.port, ac->errstr);
}
instance->redis_connecting = CACHE_REDIS_DISCONNECTED;
if(!strcmp(instance->current_redisip, instance->param->redis.mainip))
{
main_redis_check_timer_start(instance);
}
}
void main_redis_check_timer_cb(evutil_socket_t fd, short what, void *arg)
{
struct tango_cache_instance *instance = (struct tango_cache_instance *)arg;
redisAsyncContext *redis_ac;
struct timeval tv;
redis_ac = redisAsyncConnect(instance->param->redis.mainip, instance->param->redis.port);
if(redis_ac == NULL)
{
return ;
}
redisLibeventAttach(redis_ac, instance->evbase);
redisAsyncSetConnectionData(redis_ac, instance);
redisAsyncSetConnectCallback(redis_ac, main_redis_asyn_connect_cb);
redisAsyncSetDisconnectCallback(redis_ac, redis_asyn_disconnect_cb);
tv.tv_sec = 60;
tv.tv_usec = 0;
evtimer_add(&instance->timer_redis, &tv);
}
void main_redis_check_timer_start(struct tango_cache_instance *instance)
{
struct timeval tv;
tv.tv_sec = 60;
tv.tv_usec = 0;
evtimer_assign(&instance->timer_redis, instance->evbase, main_redis_check_timer_cb, instance);
evtimer_add(&instance->timer_redis, &tv);
}
static void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status)
@@ -60,19 +123,27 @@ static void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status
if(status == REDIS_OK)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u success.", instance->redis_ip, instance->redis_port);
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u success.",
instance->current_redisip, instance->param->redis.port);
instance->redis_connecting = CACHE_REDIS_CONNECTED;
}
else
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u failed: %s.", instance->redis_ip, instance->redis_port, ac->errstr);
instance->redis_connecting = CACHE_REDIS_CONNECT_IDLE;
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u failed: %s.",
instance->current_redisip, instance->param->redis.port, ac->errstr);
if(!strcmp(instance->current_redisip, instance->param->redis.mainip))
{
main_redis_check_timer_start(instance);
}
}
}
int redis_asyn_connect_init(struct tango_cache_instance *instance)
int redis_asyn_connect_init(struct tango_cache_instance *instance, const char *redisip)
{
instance->redis_ac = redisAsyncConnect(instance->redis_ip, instance->redis_port);
sprintf(instance->current_redisip, "%s", redisip); //mainip<69>õ<EFBFBD>ʱ<EFBFBD><CAB1>ʹ<EFBFBD><CAB9>mainip
instance->redis_ac = redisAsyncConnect(instance->current_redisip, instance->param->redis.port);
if(instance->redis_ac == NULL)
{
return -1;
@@ -85,6 +156,29 @@ int redis_asyn_connect_init(struct tango_cache_instance *instance)
return 0;
}
int wiredlb_redis_asyn_connect(struct tango_cache_instance *instance)
{
struct WLB_consumer_t cons_array[64];
int i, cons_num;
cons_num = wiredLB_list(instance->param->redis.wiredlb, 64, cons_array);
for(i=0; i<cons_num; i++)
{
if(strcmp(instance->param->redis.mainip, cons_array[i].ip_addr))
{
if(0==redis_asyn_connect_init(instance, cons_array[i].ip_addr))
{
break;
}
}
}
if(i == cons_num)
{
return -1;
}
return 0;
}
static int parse_minio_events_json(struct tango_cache_ctx *ctx, const char *jcontent)
{
cJSON *root, *pobject = NULL, *ptarget, *plastMod, *pexpires;
@@ -225,22 +319,23 @@ int tango_cache_head_redis(struct tango_cache_ctx *ctx)
{
case CACHE_REDIS_CONNECTED:
ret = redisAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, "HGET %s %s/%s",
ctx->instance->redis_key, ctx->instance->bucketname, ctx->object_key);
ctx->instance->param->redis_key, ctx->instance->param->bucketname, ctx->object_key);
if(ret != REDIS_OK)
{
//redisAsyncDisconnect(ctx->instance->redis_ac);
redis_asyn_connect_init(ctx->instance);
ctx->instance->redis_connecting = CACHE_REDIS_CONNECT_IDLE;
if(!strcmp(ctx->instance->current_redisip, ctx->instance->param->redis.mainip))
{
main_redis_check_timer_start(ctx->instance);
}
tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
tango_cache_ctx_destroy(ctx);
}
break;
case CACHE_REDIS_DISCONNECTED:
case CACHE_REDIS_CONNECT_IDLE:
redis_asyn_connect_init(ctx->instance);
wiredlb_redis_asyn_connect(ctx->instance);
case CACHE_REDIS_CONNECTING:
tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
tango_cache_ctx_destroy(ctx);
break;
default: assert(0);break;

View File

@@ -7,7 +7,8 @@
#include "tango_cache_client_in.h"
int tango_cache_head_redis(struct tango_cache_ctx *ctx);
int redis_asyn_connect_init(struct tango_cache_instance *instance);
int redis_asyn_connect_init(struct tango_cache_instance *instance, const char *redisip);
void main_redis_check_timer_start(struct tango_cache_instance *instance);
#endif

View File

@@ -123,11 +123,11 @@ static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool ful
ctx->put.upload_offset = 0;
if(full)
{
snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key);
}
else
{
snprintf(minio_url, 256, "http://%s/%s/%s?partNumber=%d&uploadId=%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key, ++ctx->put.part_index, ctx->put.uploadID);
snprintf(minio_url, 256, "http://%s/%s/%s?partNumber=%d&uploadId=%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key, ++ctx->put.part_index, ctx->put.uploadID);
curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_put_multipart_header_cb);
curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
}
@@ -141,7 +141,7 @@ static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool ful
curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.upload_length);
curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_multipart_send_cb);
curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx);
curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
@@ -184,7 +184,7 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx)
return -1;
}
snprintf(minio_url, 256, "http://%s/%s/%s?uploads", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
snprintf(minio_url, 256, "http://%s/%s/%s?uploads", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key);
curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, 0); //Ĭ<><C4AC>ʹ<EFBFBD>ûص<C3BB><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>fread<61><64><EFBFBD><EFBFBD><EFBFBD>Է<EFBFBD><D4B7>ֹر<D6B9>Expectʱ<74><EFBFBD>¿<EFBFBD><C2BF><EFBFBD>curl_multi_socket_action
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
@@ -193,7 +193,7 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx)
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
@@ -214,13 +214,13 @@ int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back)
return -1;
}
snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key);
curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE");
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
@@ -238,13 +238,13 @@ bool cache_cancel_upload_minio(struct tango_cache_ctx *ctx)
return false;
}
snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key, ctx->put.uploadID);
snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key, ctx->put.uploadID);
curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE");
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
@@ -264,7 +264,7 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx)
}
construct_complete_xml(ctx, &ctx->put.combine_xml, &len);
snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key, ctx->put.uploadID);
snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key, ctx->put.uploadID);
curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
@@ -273,7 +273,7 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx)
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, len); //<2F><><EFBFBD><EFBFBD>Content-Length
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->put.combine_xml);
curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
if(ctx->headers != NULL)
{
@@ -422,7 +422,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r
else
{
size_t upload_length = evbuffer_get_length(ctx->put.evbuf);
if(upload_length >= ctx->instance->upload_block_size)
if(upload_length >= ctx->instance->param->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, upload_length);
}
@@ -453,7 +453,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r
else
{
size_t upload_length = evbuffer_get_length(ctx->put.evbuf);
if(upload_length >= ctx->instance->upload_block_size)
if(upload_length >= ctx->instance->param->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, upload_length);
}
@@ -494,13 +494,13 @@ int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEM
}
ctx->put.state = PUT_STATE_END;
snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key);
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers);
curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
if(way == PUT_MEM_COPY)
{
@@ -608,7 +608,7 @@ int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback)
return -1;
}
snprintf(minio_url, 256, "http://%s/%s/?delete", ctx->hostaddr, ctx->instance->bucketname);
snprintf(minio_url, 256, "http://%s/%s/?delete", ctx->hostaddr, ctx->instance->param->bucketname);
curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, ctx->response.size); //<2F><><EFBFBD><EFBFBD>Content-Length<74><68><EFBFBD><EFBFBD>CURLOPT_COPYPOSTFIELDS֮ǰ<D6AE><C7B0><EFBFBD><EFBFBD>
curl_easy_setopt(ctx->curl, CURLOPT_COPYPOSTFIELDS, ctx->response.buff);
@@ -617,7 +617,7 @@ int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback)
curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb);
curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);
@@ -847,7 +847,7 @@ int tango_cache_fetch_start(struct tango_cache_ctx *ctx)
return -1;
}
snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key);
curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url);
if(ctx->method == CACHE_REQUEST_HEAD)
{
@@ -858,7 +858,7 @@ int tango_cache_fetch_start(struct tango_cache_ctx *ctx)
curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx);
curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_get_response_header_cb);
curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx);
curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error);
curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error);
rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl);
assert(rc==CURLM_OK);

View File

@@ -191,6 +191,7 @@ int main(int argc, char **argv)
struct future_pdata *pdata;
struct cache_evbase_ctx *ctx;
void *runtime_log;
struct tango_cache_parameter *parameter;
if(argc != 2 && argc!=3)
{
@@ -209,7 +210,9 @@ int main(int argc, char **argv)
}
cache_evbase_global_init();
instance_asyn = cache_evbase_instance_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
parameter = cache_evbase_parameter_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
assert(parameter != NULL);
instance_asyn = cache_evbase_instance_new(parameter, runtime_log);
assert(instance_asyn!=NULL);
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));

View File

@@ -280,6 +280,7 @@ int main(int argc, char **argv)
pthread_t thread_tid;
pthread_attr_t attr;
struct pthread_data pdata[20];
struct tango_cache_parameter *parameter;
if(argc!=3)
{
@@ -294,7 +295,9 @@ int main(int argc, char **argv)
}
cache_evbase_global_init();
instance_asyn = cache_evbase_instance_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
parameter = cache_evbase_parameter_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
assert(parameter != NULL);
instance_asyn = cache_evbase_instance_new(parameter, runtime_log);
assert(instance_asyn!=NULL);
pthread_attr_init(&attr);

View File

@@ -1,12 +1,15 @@
[TANGO_CACHE]
#MINIO IP地址,目前只支持一个
#MINIO IP地址列表WiredLB格式
MINIO_IP_LIST=192.168.10.61-64;
MINIO_LISTEN_PORT=9000
#每个域名最多开启的链接数
MAX_CONNECTION_PER_HOST=10
#MAX_CONNECTION_PER_HOST=1
MAX_CNNT_PIPELINE_NUM=20
#MAX_CURL_SESSION_NUM=100
MAX_CURL_TRANSFER_TIMEOUT_S=15
#bucket的名称
CACHE_BUCKET_NAME=images
CACHE_BUCKET_NAME=openbucket
#缓存最大占用的内存空间大小,超出空间时上传失败
MAX_USED_MEMORY_SIZE_MB=5120
#上传时Expires头部的过期时间单位秒最小601分钟
@@ -15,16 +18,21 @@ CACHE_DEFAULT_TTL_SECOND=3600
CACHE_OBJECT_KEY_HASH_SWITCH=1
#HEAD元信息的来源1-MINIO2-REDIS
CACHE_HEAD_FROM_SOURCE=1
CACHE_HEAD_FROM_SOURCE=2
#使用Redis作为元信息获取源
CACHE_HEAD_REDIS_KEY=MINIO_EVENTS_INFO
CACHE_HEAD_REDIS_IP=192.168.10.63
#主要的Redis IP地址优先使用
CACHE_HEAD_MAIN_REDIS_IP=192.168.10.63
#只有在主Redis挂掉时从下述列表选择一个连接WiredLB格式。
CACHE_HEAD_REDIS_IPLIST=192.168.10.62-63;
CACHE_HEAD_REDIS_PORT=6379
#WIRED LOAD BALANCER配置
#WIREDLB_OVERRIDE=1
#WIREDLB_TOPIC=
#WIREDLB_GROUP=
#WIREDLB_DATACENTER=
#WIREDLB_HEALTH_PORT=52100
WIREDLB_MINIO_HEALTH_PORT=52100
#WIREDLB_MINIO_GROUP=
WIREDLB_REDIS_HEALTH_PORT=52101
#WIREDLB_REDIS_GROUP=

View File

@@ -398,6 +398,7 @@ int main(int crgc, char **arg)
struct event ev_timer;
struct timeval tv;
void *runtime_log;
struct tango_cache_parameter *parameter;
runtime_log = MESA_create_runtime_log_handle("./runtime.log", 10);
if(NULL==runtime_log)
@@ -414,7 +415,9 @@ int main(int crgc, char **arg)
init_fifo();
tango_cache_global_init();
tango_instance = tango_cache_instance_new(ev_base, "./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
parameter = tango_cache_parameter_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
assert(parameter != NULL);
tango_instance = tango_cache_instance_new(parameter, ev_base, runtime_log);
tv.tv_sec = 10;
tv.tv_usec = 0;