diff --git a/cache/include/cache_evbase_client.h b/cache/include/cache_evbase_client.h index 529a805..8b910c9 100644 --- a/cache/include/cache_evbase_client.h +++ b/cache/include/cache_evbase_client.h @@ -28,8 +28,11 @@ void cache_evbase_get_statistics(const struct cache_evbase_instance *instance, s void cache_evbase_global_init(void); +//每个minio集群和bucket创建一个parameter +struct tango_cache_parameter *cache_evbase_parameter_new(const char* profile_path, const char* section, void *runtimelog); + /*创建实例,每线程一个,或使用时加锁*/ -struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path, const char* section, void *runtimelog); +struct cache_evbase_instance *cache_evbase_instance_new(struct tango_cache_parameter *param, void *runtimelog); //GET接口,成功返回0,失败返回-1,future回调函数会在另外的线程中执行,下同 diff --git a/cache/include/tango_cache_client.h b/cache/include/tango_cache_client.h index df60254..2473916 100644 --- a/cache/include/tango_cache_client.h +++ b/cache/include/tango_cache_client.h @@ -95,6 +95,7 @@ struct tango_cache_meta_put struct response_freshness put; }; +struct tango_cache_parameter; struct tango_cache_instance; struct tango_cache_ctx; @@ -105,9 +106,11 @@ void tango_cache_get_statistics(const struct tango_cache_instance *instance, str /*每个进程执行一次初始化*/ void tango_cache_global_init(void); +//每个minio集群和bucket创建一个parameter +struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path, const char* section, void *runtimelog); /*以下所有API线程不安全*/ //每个监听线程创建一个instance -struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog); +struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_parameter *param, struct event_base* evbase, void *runtimelog); /* GET接口的API*/ diff --git a/cache/src/cache_evbase_client.cpp b/cache/src/cache_evbase_client.cpp index 5383384..a173832 100644 --- a/cache/src/cache_evbase_client.cpp +++ b/cache/src/cache_evbase_client.cpp @@ -168,7 +168,7 @@ static void cache_asyn_ioevent_dispatch(struct databuffer *buffer) break; case CACHE_ASYN_HEAD: f = ctx_asyn->ctx->future; - if(ctx_asyn->instance_asyn->instance->head_meta_source == HEAD_META_FROM_REDIS) + if(ctx_asyn->instance_asyn->instance->param->head_meta_source == HEAD_META_FROM_REDIS) { ret = tango_cache_head_redis(ctx_asyn->ctx); } @@ -395,7 +395,7 @@ int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct } if(path != NULL) { - snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->bucketname, ctx->object_key); + snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->param->bucketname, ctx->object_key); } ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); @@ -442,7 +442,7 @@ int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struc } if(path != NULL) { - snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->bucketname, ctx->object_key); + snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->param->bucketname, ctx->object_key); } ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); @@ -555,7 +555,12 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu return 0; } -struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path, const char* section, void *runtimelog) +struct tango_cache_parameter *cache_evbase_parameter_new(const char* profile_path, const char* section, void *runtimelog) +{ + return tango_cache_parameter_new(profile_path, section, runtimelog); +} + +struct cache_evbase_instance *cache_evbase_instance_new(struct tango_cache_parameter *param, void *runtimelog) { evutil_socket_t notification_fd[2]; struct cache_evbase_instance *instance_asyn; @@ -576,7 +581,7 @@ struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path instance_asyn->evbase = evbase; instance_asyn->notify_readfd = notification_fd[0]; instance_asyn->notify_sendfd = notification_fd[1]; - instance_asyn->instance = tango_cache_instance_new(evbase, profile_path, section, runtimelog); + instance_asyn->instance = tango_cache_instance_new(param, evbase, runtimelog); if(instance_asyn->instance == NULL) { free(instance_asyn); diff --git a/cache/src/tango_cache_client.cpp b/cache/src/tango_cache_client.cpp index c87e885..7f67c6c 100644 --- a/cache/src/tango_cache_client.cpp +++ b/cache/src/tango_cache_client.cpp @@ -123,7 +123,7 @@ struct tango_cache_result *tango_cache_read_result(future_result_t *promise_resu void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path, size_t pathsize) { - snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key); + snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key); } static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statistics *statistic) @@ -269,9 +269,9 @@ int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, return 0; } ctx->instance->statistic.memory_used += size; - if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->upload_block_size) + if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size) { - cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size); + cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size); } return 0; } @@ -303,9 +303,9 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP } } ctx->instance->statistic.memory_used += size; - if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->upload_block_size) + if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size) { - cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size); + cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size); } return 0; } @@ -316,7 +316,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * char buffer[2064]; time_t expires, now, last_modify; - if((u_int64_t)instance->statistic.memory_used>=instance->cache_limit_size || instance->statistic.session_num>=instance->max_session_num) + if((u_int64_t)instance->statistic.memory_used>=instance->param->cache_limit_size || instance->statistic.session_num>=instance->param->max_session_num) { instance->error_code = CACHE_OUTOF_MEMORY; instance->statistic.totaldrop_num += 1; @@ -328,7 +328,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * ctx->future = f; ctx->method = CACHE_REQUEST_PUT; - if(instance->hash_object_key) + if(instance->param->hash_object_key) { caculate_sha256(meta->url, strlen(meta->url), buffer, 72); snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", buffer[0], buffer[1], buffer[2], buffer[3], buffer+4); @@ -340,7 +340,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * { snprintf(ctx->object_key, 256, "%s", meta->url); } - if(wired_load_balancer_lookup(instance->wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) + if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) { instance->error_code = CACHE_ERR_WIREDLB; instance->statistic.totaldrop_num += 1; @@ -350,7 +350,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * //Expires字段,用于缓存内部判定缓存是否超时 now = time(NULL); - expires = (meta->put.timeout==0||meta->put.timeout>instance->relative_ttl)?instance->relative_ttl:meta->put.timeout; + expires = (meta->put.timeout==0||meta->put.timeout>instance->param->relative_ttl)?instance->param->relative_ttl:meta->put.timeout; if(expires_timestamp2hdr_str(now + expires, buffer, 256)) { ctx->headers = curl_slist_append(ctx->headers, buffer); @@ -417,7 +417,7 @@ int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct f } if(path != NULL) { - snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->bucketname, ctx->object_key); + snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->param->bucketname, ctx->object_key); } return tango_cache_upload_once_start_data(ctx, way, data, size); @@ -435,7 +435,7 @@ int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct } if(path != NULL) { - snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->bucketname, ctx->object_key); + snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->param->bucketname, ctx->object_key); } return tango_cache_upload_once_start_evbuf(ctx, way, evbuf); @@ -446,7 +446,7 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i struct tango_cache_ctx *ctx; char sha256[72]; - if(instance->head_meta_source!=HEAD_META_FROM_REDIS && instance->statistic.session_num>=instance->max_session_num) + if(instance->param->head_meta_source!=HEAD_META_FROM_REDIS && instance->statistic.session_num>=instance->param->max_session_num) { instance->error_code = CACHE_OUTOF_SESSION; instance->statistic.totaldrop_num += 1; @@ -461,7 +461,7 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i ctx->get.max_age = meta->get.max_age; ctx->get.min_fresh = meta->get.min_fresh; - if(instance->hash_object_key) + if(instance->param->hash_object_key) { caculate_sha256(meta->url, strlen(meta->url), sha256, 72); snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", sha256[0], sha256[1], sha256[2], sha256[3], sha256+4); @@ -470,7 +470,7 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i { snprintf(ctx->object_key, 256, "%s", meta->url); } - if(wired_load_balancer_lookup(instance->wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) + if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) { instance->error_code = CACHE_ERR_WIREDLB; instance->statistic.totaldrop_num += 1; @@ -502,7 +502,7 @@ int tango_cache_head_object(struct tango_cache_instance *instance, struct future return -1; } - if(ctx->instance->head_meta_source == HEAD_META_FROM_REDIS) + if(instance->param->head_meta_source == HEAD_META_FROM_REDIS) { return tango_cache_head_redis(ctx); } @@ -517,7 +517,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance * struct tango_cache_ctx *ctx; char sha256[72]; - if(instance->statistic.session_num >= instance->max_session_num) + if(instance->statistic.session_num >= instance->param->max_session_num) { instance->error_code = CACHE_OUTOF_SESSION; instance->statistic.totaldrop_num += 1; @@ -529,7 +529,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance * ctx->future = f; ctx->method = CACHE_REQUEST_DELETE; - if(instance->hash_object_key) + if(instance->param->hash_object_key) { caculate_sha256(objkey, strlen(objkey), sha256, 72); snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", sha256[0], sha256[1], sha256[2], sha256[3], sha256+4); @@ -538,7 +538,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance * { snprintf(ctx->object_key, 256, "%s", objkey); } - if(wired_load_balancer_lookup(instance->wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) + if(wired_load_balancer_lookup(instance->param->minio.wiredlb, ctx->object_key, strlen(ctx->object_key), ctx->hostaddr, 48)) { instance->error_code = CACHE_ERR_WIREDLB; instance->statistic.totaldrop_num += 1; @@ -565,7 +565,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst struct tango_cache_ctx *ctx; char md5[48]={0}, content_md5[48]; - if(instance->statistic.session_num >= instance->max_session_num) + if(instance->statistic.session_num >= instance->param->max_session_num) { instance->error_code = CACHE_OUTOF_SESSION; instance->statistic.totaldrop_num += 1; @@ -578,7 +578,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst ctx->method = CACHE_REQUEST_DELETE_MUL; ctx->del.succ_num = num; - if(wired_load_balancer_lookup(instance->wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48)) + if(wired_load_balancer_lookup(instance->param->minio.wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48)) { instance->error_code = CACHE_ERR_WIREDLB; instance->statistic.totaldrop_num += num; @@ -586,7 +586,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst return NULL; } - construct_multiple_delete_xml(ctx->instance->bucketname, objlist, num, instance->hash_object_key, &ctx->response.buff, &ctx->response.size); + construct_multiple_delete_xml(instance->param->bucketname, objlist, num, instance->param->hash_object_key, &ctx->response.buff, &ctx->response.size); caculate_base64_md5(ctx->response.buff, ctx->response.size, (unsigned char *)md5, 48); sprintf(content_md5, "Content-MD5: %s", md5); ctx->headers = curl_slist_append(ctx->headers, content_md5); @@ -750,94 +750,119 @@ static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp) return 0; //0-success; -1-error } -static int wired_load_balancer_init(struct tango_cache_instance *instance) +static int wired_load_balancer_init(const char *topic, const char *datacenter, int override, struct wiredlb_parameter *wparam, void *runtime_log) { - instance->wiredlb = wiredLB_create(instance->wiredlb_topic, instance->wiredlb_group, WLB_PRODUCER); - if(instance->wiredlb == NULL) + wparam->wiredlb = wiredLB_create(topic, wparam->wiredlb_group, WLB_PRODUCER); + if(wparam->wiredlb == NULL) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n"); + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n"); return -1; } - wiredLB_set_opt(instance->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &instance->wiredlb_ha_port, sizeof(instance->wiredlb_ha_port)); - wiredLB_set_opt(instance->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &instance->wiredlb_override, sizeof(instance->wiredlb_override)); - wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_DATACENTER, instance->wiredlb_datacenter, strlen(instance->wiredlb_datacenter)+1); - if(instance->wiredlb_override) + wiredLB_set_opt(wparam->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &wparam->wiredlb_ha_port, sizeof(wparam->wiredlb_ha_port)); + wiredLB_set_opt(wparam->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &override, sizeof(override)); + wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, datacenter, strlen(datacenter)+1); + if(override) { - wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, instance->minio_iplist, strlen(instance->minio_iplist)+1); - wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &instance->minio_port, sizeof(instance->minio_port)); + wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, wparam->iplist, strlen(wparam->iplist)+1); + wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &wparam->port, sizeof(wparam->port)); } - if(wiredLB_init(instance->wiredlb) < 0) + if(wiredLB_init(wparam->wiredlb) < 0) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_init failed.\n"); + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "wiredLB_init group %s failed.\n", wparam->wiredlb_group); return -1; } return 0; } -static int load_local_configure(struct tango_cache_instance *instance, const char* profile_path, const char* section) +struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path, const char* section, void *runtime_log) { u_int32_t intval; u_int64_t longval; + struct tango_cache_parameter *param; + param = (struct tango_cache_parameter *)calloc(1, sizeof(struct tango_cache_parameter)); + + //multi curl MESA_load_profile_uint_def(profile_path, section, "MAX_CONNECTION_PER_HOST", &intval, 1); - instance->max_cnn_host = intval; + param->max_cnn_host = intval; MESA_load_profile_uint_def(profile_path, section, "MAX_CNNT_PIPELINE_NUM", &intval, 20); - instance->max_pipeline_num = intval; - MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", &instance->max_session_num, 200); + param->max_pipeline_num = intval; + MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_TRANSFER_TIMEOUT_S", &intval, 15); + param->transfer_timeout = intval; + + //instance + MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", ¶m->max_session_num, 200); MESA_load_profile_uint_def(profile_path, section, "MAX_USED_MEMORY_SIZE_MB", &intval, 5120); longval = intval; - instance->cache_limit_size = longval * 1024 * 1024; - MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_TRANSFER_TIMEOUT_S", &intval, 15); - instance->transfer_timeout = intval; - if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", instance->bucketname, 256) < 0) + param->cache_limit_size = longval * 1024 * 1024; + MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", ¶m->hash_object_key, 1); + if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", param->bucketname, 256) < 0) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BUCKET_NAME not found.\n", profile_path, section); - return -1; + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BUCKET_NAME not found.\n", profile_path, section); + return NULL; } - - MESA_load_profile_int_def(profile_path, section, "CACHE_HEAD_FROM_SOURCE", &instance->head_meta_source, HEAD_META_FROM_MINIO); - if(instance->head_meta_source == HEAD_META_FROM_REDIS) + MESA_load_profile_uint_def(profile_path, section, "CACHE_UPLOAD_BLOCK_SIZE", ¶m->upload_block_size, 5242880); + if(param->upload_block_size < 5242880) { - MESA_load_profile_string_def(profile_path, section, "CACHE_HEAD_REDIS_KEY", instance->redis_key, 256, instance->bucketname); - if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_HEAD_REDIS_IP", instance->redis_ip, 256) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_HEAD_REDIS_IP not found.\n", profile_path, section); - return -1; - } - MESA_load_profile_int_def(profile_path, section, "CACHE_HEAD_REDIS_PORT", &instance->redis_port, 6379); - } - MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", &instance->hash_object_key, 1); - MESA_load_profile_uint_def(profile_path, section, "MINIO_LISTEN_PORT", &instance->minio_port, 9000); - if(MESA_load_profile_string_nodef(profile_path, section, "MINIO_IP_LIST", instance->minio_iplist, 4096) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.\n", profile_path, section); - return -1; - } - MESA_load_profile_uint_def(profile_path, section, "CACHE_UPLOAD_BLOCK_SIZE", &instance->upload_block_size, 5242880); - if(instance->upload_block_size < 5242880) - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section); - return -1; + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section); + return NULL; } MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 999999999); if(intval < 60) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section); - return -1; + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section); + return NULL; } - instance->relative_ttl = intval; + param->relative_ttl = intval; - //Wired_LB参数 - MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", instance->wiredlb_topic, 64, "TANGO_CACHE_PRODUCER"); - MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", instance->wiredlb_group, 64, "KAZAKHSTAN"); - MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", instance->wiredlb_datacenter, 64, "ASTANA"); - MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", &instance->wiredlb_override, 1); - MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100); - instance->wiredlb_ha_port = (u_int16_t)intval; - return 0; + //wiredlb common + MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->wiredlb_topic, 64, "TANGO_CACHE_PRODUCER"); + MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", param->wiredlb_datacenter, 64, "ASTANA"); + MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", ¶m->wiredlb_override, 1); + + //wiredlb minio + MESA_load_profile_uint_def(profile_path, section, "WIREDLB_MINIO_HEALTH_PORT", &intval, 52100); + param->minio.wiredlb_ha_port = intval; + MESA_load_profile_string_def(profile_path, section, "WIREDLB_MINIO_GROUP", param->minio.wiredlb_group, 64, "MINIO_GROUP"); + MESA_load_profile_uint_def(profile_path, section, "MINIO_LISTEN_PORT", ¶m->minio.port, 9000); + if(MESA_load_profile_string_nodef(profile_path, section, "MINIO_IP_LIST", param->minio.iplist, 4096) < 0) + { + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.\n", profile_path, section); + return NULL; + } + if(wired_load_balancer_init(param->wiredlb_topic, param->wiredlb_datacenter, param->wiredlb_override, ¶m->minio, runtime_log)) + { + return NULL; + } + + //wiredlb redis + MESA_load_profile_int_def(profile_path, section, "CACHE_HEAD_FROM_SOURCE", ¶m->head_meta_source, HEAD_META_FROM_MINIO); + if(param->head_meta_source == HEAD_META_FROM_REDIS) + { + MESA_load_profile_uint_def(profile_path, section, "WIREDLB_REDIS_HEALTH_PORT", &intval, 0); + param->redis.wiredlb_ha_port = intval; + MESA_load_profile_string_def(profile_path, section, "WIREDLB_REDIS_GROUP", param->redis.wiredlb_group, 64, "REDIS_GROUP"); + MESA_load_profile_string_def(profile_path, section, "CACHE_HEAD_REDIS_KEY", param->redis_key, 256, param->bucketname); + MESA_load_profile_uint_def(profile_path, section, "CACHE_HEAD_REDIS_PORT", ¶m->redis.port, 6379); + if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_HEAD_REDIS_IPLIST", param->redis.iplist, 256) < 0) + { + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_HEAD_REDIS_IPLIST not found.\n", profile_path, section); + return NULL; + } + if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_HEAD_MAIN_REDIS_IP", param->redis.mainip, 64) < 0) + { + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_HEAD_MAIN_REDIS_IP not found.\n", profile_path, section); + return NULL; + } + if(wired_load_balancer_init(param->wiredlb_topic, param->wiredlb_datacenter, param->wiredlb_override, ¶m->redis, runtime_log)) + { + return NULL; + } + } + return param; } -struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog) +struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_parameter *param, struct event_base* evbase, void *runtimelog) { struct tango_cache_instance *instance; @@ -845,39 +870,26 @@ struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase, memset(instance, 0, sizeof(struct tango_cache_instance)); instance->runtime_log = runtimelog; instance->evbase = evbase; + instance->param = param; - if(load_local_configure(instance, profile_path, section)) - { - free(instance); - return NULL; - } - if(wired_load_balancer_init(instance)) - { - free(instance); - return NULL; - } - instance->multi_hd = curl_multi_init(); curl_multi_setopt(instance->multi_hd, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX); - curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, instance->max_cnn_host); - curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_PIPELINE_LENGTH, instance->max_pipeline_num); + curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, param->max_cnn_host); + curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_PIPELINE_LENGTH, param->max_pipeline_num); curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETFUNCTION, curl_socket_function_cb); curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETDATA, instance); //curl_socket_function_cb *userp curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERFUNCTION, curl_timer_function_cb); curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERDATA, instance); - if(instance->head_meta_source == HEAD_META_FROM_REDIS) + if(param->head_meta_source == HEAD_META_FROM_REDIS) { - if(redis_asyn_connect_init(instance)) + if(redis_asyn_connect_init(instance, instance->param->redis.mainip)) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s:%u failed.", instance->redis_ip, instance->redis_port); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s:%u failed.", + instance->current_redisip, instance->param->redis.port); free(instance); return NULL; } - else - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s:%u success.", instance->redis_ip, instance->redis_port); - } } evtimer_assign(&instance->timer_event, evbase, libevent_timer_event_cb, instance); return instance; diff --git a/cache/src/tango_cache_client_in.h b/cache/src/tango_cache_client_in.h index d795b15..ec8b4f5 100644 --- a/cache/src/tango_cache_client_in.h +++ b/cache/src/tango_cache_client_in.h @@ -50,39 +50,55 @@ struct easy_string size_t size; }; -struct tango_cache_instance +struct wiredlb_parameter { - char minio_iplist[4096]; - char bucketname[256]; - char wiredlb_topic[64]; char wiredlb_group[64]; - char wiredlb_datacenter[64]; - u_int32_t minio_port; - u_int32_t wiredlb_override; - u_int16_t wiredlb_ha_port; - u_int32_t hash_object_key; - struct event_base* evbase; - struct event timer_event; - struct cache_statistics statistic; - CURLM *multi_hd; - void *runtime_log; + char mainip[64]; //默认访问的redis地址 + char iplist[4096];//minio: minio列表;redis: mainip挂了后,可选的列表,包含mainip + u_int32_t port; + short wiredlb_ha_port; WLB_handle_t wiredlb; - time_t relative_ttl; //缓存的相对有效期 - u_int64_t cache_limit_size; +}; + +struct tango_cache_parameter +{ + char bucketname[256]; + char redis_key[256]; long max_cnn_host; long transfer_timeout;//传输总时间限制 long max_pipeline_num; + u_int64_t cache_limit_size; u_int32_t max_session_num; u_int32_t upload_block_size; //minio分段上传块的最小长度 + time_t relative_ttl; //缓存的相对有效期 + u_int32_t hash_object_key; + + //wiredlb + int head_meta_source; //可以从MINIO或REDIS获取元信息 + u_int32_t wiredlb_override; + char wiredlb_topic[64]; + char wiredlb_datacenter[64]; + + struct wiredlb_parameter minio; + struct wiredlb_parameter redis; +}; + +struct tango_cache_instance +{ + struct event_base* evbase; + struct event timer_event; + CURLM *multi_hd; enum CACHE_ERR_CODE error_code; - int head_meta_source; //可以从MINIO或REDIS获取元信息 //元信息获取方式Redis - redisAsyncContext *redis_ac; - char redis_key[256]; - char redis_ip[128]; - int redis_port; int redis_connecting; + redisAsyncContext *redis_ac; + char current_redisip[64]; + struct event timer_redis; + + const struct tango_cache_parameter *param; + void *runtime_log; + struct cache_statistics statistic; }; struct multipart_etag_list diff --git a/cache/src/tango_cache_redis.cpp b/cache/src/tango_cache_redis.cpp index 1d0b583..5c771fc 100644 --- a/cache/src/tango_cache_redis.cpp +++ b/cache/src/tango_cache_redis.cpp @@ -39,19 +39,82 @@ struct http_hdr_name g_http_hdr_name[HDR_CONTENT_NUM]= {"content-md5", "Content-MD5: "} }; +//一旦mainip连接成功,就切换回来 +static void main_redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status) +{ + struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac); + + if(status == REDIS_OK) + { + evtimer_del(&instance->timer_redis); + if(instance->redis_connecting == CACHE_REDIS_CONNECTED) + { + redisAsyncDisconnect(instance->redis_ac); + } + sprintf(instance->current_redisip, "%s", instance->param->redis.mainip); + instance->redis_ac = (struct redisAsyncContext *)ac; + instance->redis_connecting = CACHE_REDIS_CONNECTED; + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u success.", + instance->param->redis.mainip, instance->param->redis.port); + } + else + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u failed: %s.", + instance->param->redis.mainip, instance->param->redis.port, ac->errstr); + } +} + static void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status) { struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac); if(status == REDIS_OK) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u success.", instance->redis_ip, instance->redis_port); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u success.", + instance->current_redisip, instance->param->redis.port); } else { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u failed: %s.", instance->redis_ip, instance->redis_port, ac->errstr); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u failed: %s.", + instance->current_redisip, instance->param->redis.port, ac->errstr); } instance->redis_connecting = CACHE_REDIS_DISCONNECTED; + + if(!strcmp(instance->current_redisip, instance->param->redis.mainip)) + { + main_redis_check_timer_start(instance); + } +} + +void main_redis_check_timer_cb(evutil_socket_t fd, short what, void *arg) +{ + struct tango_cache_instance *instance = (struct tango_cache_instance *)arg; + redisAsyncContext *redis_ac; + struct timeval tv; + + redis_ac = redisAsyncConnect(instance->param->redis.mainip, instance->param->redis.port); + if(redis_ac == NULL) + { + return ; + } + redisLibeventAttach(redis_ac, instance->evbase); + redisAsyncSetConnectionData(redis_ac, instance); + redisAsyncSetConnectCallback(redis_ac, main_redis_asyn_connect_cb); + redisAsyncSetDisconnectCallback(redis_ac, redis_asyn_disconnect_cb); + + tv.tv_sec = 60; + tv.tv_usec = 0; + evtimer_add(&instance->timer_redis, &tv); +} + +void main_redis_check_timer_start(struct tango_cache_instance *instance) +{ + struct timeval tv; + + tv.tv_sec = 60; + tv.tv_usec = 0; + evtimer_assign(&instance->timer_redis, instance->evbase, main_redis_check_timer_cb, instance); + evtimer_add(&instance->timer_redis, &tv); } static void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status) @@ -60,19 +123,27 @@ static void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status if(status == REDIS_OK) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u success.", instance->redis_ip, instance->redis_port); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u success.", + instance->current_redisip, instance->param->redis.port); instance->redis_connecting = CACHE_REDIS_CONNECTED; } else { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u failed: %s.", instance->redis_ip, instance->redis_port, ac->errstr); instance->redis_connecting = CACHE_REDIS_CONNECT_IDLE; + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u failed: %s.", + instance->current_redisip, instance->param->redis.port, ac->errstr); + if(!strcmp(instance->current_redisip, instance->param->redis.mainip)) + { + main_redis_check_timer_start(instance); + } } } -int redis_asyn_connect_init(struct tango_cache_instance *instance) +int redis_asyn_connect_init(struct tango_cache_instance *instance, const char *redisip) { - instance->redis_ac = redisAsyncConnect(instance->redis_ip, instance->redis_port); + sprintf(instance->current_redisip, "%s", redisip); //mainip好的时候使用mainip + + instance->redis_ac = redisAsyncConnect(instance->current_redisip, instance->param->redis.port); if(instance->redis_ac == NULL) { return -1; @@ -85,6 +156,29 @@ int redis_asyn_connect_init(struct tango_cache_instance *instance) return 0; } +int wiredlb_redis_asyn_connect(struct tango_cache_instance *instance) +{ + struct WLB_consumer_t cons_array[64]; + int i, cons_num; + + cons_num = wiredLB_list(instance->param->redis.wiredlb, 64, cons_array); + for(i=0; iparam->redis.mainip, cons_array[i].ip_addr)) + { + if(0==redis_asyn_connect_init(instance, cons_array[i].ip_addr)) + { + break; + } + } + } + if(i == cons_num) + { + return -1; + } + return 0; +} + static int parse_minio_events_json(struct tango_cache_ctx *ctx, const char *jcontent) { cJSON *root, *pobject = NULL, *ptarget, *plastMod, *pexpires; @@ -225,22 +319,23 @@ int tango_cache_head_redis(struct tango_cache_ctx *ctx) { case CACHE_REDIS_CONNECTED: ret = redisAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, "HGET %s %s/%s", - ctx->instance->redis_key, ctx->instance->bucketname, ctx->object_key); + ctx->instance->param->redis_key, ctx->instance->param->bucketname, ctx->object_key); if(ret != REDIS_OK) { - //redisAsyncDisconnect(ctx->instance->redis_ac); - redis_asyn_connect_init(ctx->instance); + ctx->instance->redis_connecting = CACHE_REDIS_CONNECT_IDLE; + if(!strcmp(ctx->instance->current_redisip, ctx->instance->param->redis.mainip)) + { + main_redis_check_timer_start(ctx->instance); + } tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); tango_cache_ctx_destroy(ctx); } break; case CACHE_REDIS_DISCONNECTED: case CACHE_REDIS_CONNECT_IDLE: - redis_asyn_connect_init(ctx->instance); + wiredlb_redis_asyn_connect(ctx->instance); case CACHE_REDIS_CONNECTING: tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); tango_cache_ctx_destroy(ctx); break; default: assert(0);break; diff --git a/cache/src/tango_cache_redis.h b/cache/src/tango_cache_redis.h index 8126025..7edf119 100644 --- a/cache/src/tango_cache_redis.h +++ b/cache/src/tango_cache_redis.h @@ -7,7 +7,8 @@ #include "tango_cache_client_in.h" int tango_cache_head_redis(struct tango_cache_ctx *ctx); -int redis_asyn_connect_init(struct tango_cache_instance *instance); +int redis_asyn_connect_init(struct tango_cache_instance *instance, const char *redisip); +void main_redis_check_timer_start(struct tango_cache_instance *instance); #endif diff --git a/cache/src/tango_cache_transfer.cpp b/cache/src/tango_cache_transfer.cpp index c404e77..ad97087 100644 --- a/cache/src/tango_cache_transfer.cpp +++ b/cache/src/tango_cache_transfer.cpp @@ -123,11 +123,11 @@ static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool ful ctx->put.upload_offset = 0; if(full) { - snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key); + snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key); } else { - snprintf(minio_url, 256, "http://%s/%s/%s?partNumber=%d&uploadId=%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key, ++ctx->put.part_index, ctx->put.uploadID); + snprintf(minio_url, 256, "http://%s/%s/%s?partNumber=%d&uploadId=%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key, ++ctx->put.part_index, ctx->put.uploadID); curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_put_multipart_header_cb); curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx); } @@ -141,7 +141,7 @@ static int http_put_bodypart_request_evbuf(struct tango_cache_ctx *ctx, bool ful curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.upload_length); curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_multipart_send_cb); curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx); - curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error); + curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); assert(rc==CURLM_OK); @@ -184,7 +184,7 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx) return -1; } - snprintf(minio_url, 256, "http://%s/%s/%s?uploads", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key); + snprintf(minio_url, 256, "http://%s/%s/%s?uploads", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key); curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L); curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, 0); //默认使用回调函数调用fread,测试发现关闭Expect时会导致卡在curl_multi_socket_action curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); @@ -193,7 +193,7 @@ int curl_get_minio_uploadID(struct tango_cache_ctx *ctx) curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error); + curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); assert(rc==CURLM_OK); @@ -214,13 +214,13 @@ int cache_delete_minio_object(struct tango_cache_ctx *ctx, bool call_back) return -1; } - snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key); + snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key); curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE"); curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error); + curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); assert(rc==CURLM_OK); @@ -238,13 +238,13 @@ bool cache_cancel_upload_minio(struct tango_cache_ctx *ctx) return false; } - snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key, ctx->put.uploadID); + snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key, ctx->put.uploadID); curl_easy_setopt(ctx->curl, CURLOPT_CUSTOMREQUEST, "DELETE"); curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error); + curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); assert(rc==CURLM_OK); @@ -264,7 +264,7 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx) } construct_complete_xml(ctx, &ctx->put.combine_xml, &len); - snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key, ctx->put.uploadID); + snprintf(minio_url, 256, "http://%s/%s/%s?uploadId=%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key, ctx->put.uploadID); curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L); curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); @@ -273,7 +273,7 @@ bool cache_kick_combine_minio(struct tango_cache_ctx *ctx) curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, len); //填充Content-Length curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDS, ctx->put.combine_xml); - curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error); + curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); if(ctx->headers != NULL) { @@ -422,7 +422,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r else { size_t upload_length = evbuffer_get_length(ctx->put.evbuf); - if(upload_length >= ctx->instance->upload_block_size) + if(upload_length >= ctx->instance->param->upload_block_size) { cache_kick_upload_minio_multipart(ctx, upload_length); } @@ -453,7 +453,7 @@ void tango_cache_curl_put_done(struct tango_cache_ctx *ctx, CURLcode res, long r else { size_t upload_length = evbuffer_get_length(ctx->put.evbuf); - if(upload_length >= ctx->instance->upload_block_size) + if(upload_length >= ctx->instance->param->upload_block_size) { cache_kick_upload_minio_multipart(ctx, upload_length); } @@ -494,13 +494,13 @@ int tango_cache_upload_once_start_data(struct tango_cache_ctx *ctx, enum PUT_MEM } ctx->put.state = PUT_STATE_END; - snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key); + snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key); curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_any_cb); curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); - curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error); + curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); if(way == PUT_MEM_COPY) { @@ -608,7 +608,7 @@ int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback) return -1; } - snprintf(minio_url, 256, "http://%s/%s/?delete", ctx->hostaddr, ctx->instance->bucketname); + snprintf(minio_url, 256, "http://%s/%s/?delete", ctx->hostaddr, ctx->instance->param->bucketname); curl_easy_setopt(ctx->curl, CURLOPT_POST, 1L); curl_easy_setopt(ctx->curl, CURLOPT_POSTFIELDSIZE, ctx->response.size); //填充Content-Length,在CURLOPT_COPYPOSTFIELDS之前设置 curl_easy_setopt(ctx->curl, CURLOPT_COPYPOSTFIELDS, ctx->response.buff); @@ -617,7 +617,7 @@ int tango_cache_multi_delete_start(struct tango_cache_ctx *ctx, bool callback) curl_easy_setopt(ctx->curl, CURLOPT_WRITEFUNCTION, curl_response_body_save_cb); curl_easy_setopt(ctx->curl, CURLOPT_WRITEDATA, ctx); curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); - curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error); + curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); assert(rc==CURLM_OK); @@ -847,7 +847,7 @@ int tango_cache_fetch_start(struct tango_cache_ctx *ctx) return -1; } - snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key); + snprintf(minio_url, 256, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key); curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); if(ctx->method == CACHE_REQUEST_HEAD) { @@ -858,7 +858,7 @@ int tango_cache_fetch_start(struct tango_cache_ctx *ctx) curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_get_response_header_cb); curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx); - curl_set_common_options(ctx->curl, ctx->instance->transfer_timeout, ctx->error); + curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); rc = curl_multi_add_handle(ctx->instance->multi_hd, ctx->curl); assert(rc==CURLM_OK); diff --git a/cache/test/cache_evbase_test.cpp b/cache/test/cache_evbase_test.cpp index 9077f19..b3f0350 100644 --- a/cache/test/cache_evbase_test.cpp +++ b/cache/test/cache_evbase_test.cpp @@ -191,6 +191,7 @@ int main(int argc, char **argv) struct future_pdata *pdata; struct cache_evbase_ctx *ctx; void *runtime_log; + struct tango_cache_parameter *parameter; if(argc != 2 && argc!=3) { @@ -209,7 +210,9 @@ int main(int argc, char **argv) } cache_evbase_global_init(); - instance_asyn = cache_evbase_instance_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log); + parameter = cache_evbase_parameter_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log); + assert(parameter != NULL); + instance_asyn = cache_evbase_instance_new(parameter, runtime_log); assert(instance_asyn!=NULL); pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); diff --git a/cache/test/cache_evbase_test_threads.cpp b/cache/test/cache_evbase_test_threads.cpp index b792bea..f2924b7 100644 --- a/cache/test/cache_evbase_test_threads.cpp +++ b/cache/test/cache_evbase_test_threads.cpp @@ -280,6 +280,7 @@ int main(int argc, char **argv) pthread_t thread_tid; pthread_attr_t attr; struct pthread_data pdata[20]; + struct tango_cache_parameter *parameter; if(argc!=3) { @@ -294,7 +295,9 @@ int main(int argc, char **argv) } cache_evbase_global_init(); - instance_asyn = cache_evbase_instance_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log); + parameter = cache_evbase_parameter_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log); + assert(parameter != NULL); + instance_asyn = cache_evbase_instance_new(parameter, runtime_log); assert(instance_asyn!=NULL); pthread_attr_init(&attr); diff --git a/cache/test/pangu_tg_cahce.conf b/cache/test/pangu_tg_cahce.conf index 53cca1d..a1e663c 100644 --- a/cache/test/pangu_tg_cahce.conf +++ b/cache/test/pangu_tg_cahce.conf @@ -1,12 +1,15 @@ [TANGO_CACHE] -#MINIO IP鍦板潃锛岀洰鍓嶅彧鏀寔涓涓 +#MINIO IP鍦板潃鍒楄〃锛學iredLB鏍煎紡 MINIO_IP_LIST=192.168.10.61-64; MINIO_LISTEN_PORT=9000 #姣忎釜鍩熷悕鏈澶氬紑鍚殑閾炬帴鏁 -MAX_CONNECTION_PER_HOST=10 +#MAX_CONNECTION_PER_HOST=1 +MAX_CNNT_PIPELINE_NUM=20 +#MAX_CURL_SESSION_NUM=100 +MAX_CURL_TRANSFER_TIMEOUT_S=15 #bucket鐨勫悕绉 -CACHE_BUCKET_NAME=images +CACHE_BUCKET_NAME=openbucket #缂撳瓨鏈澶у崰鐢ㄧ殑鍐呭瓨绌洪棿澶у皬锛岃秴鍑虹┖闂存椂涓婁紶澶辫触 MAX_USED_MEMORY_SIZE_MB=5120 #涓婁紶鏃禘xpires澶撮儴鐨勮繃鏈熸椂闂达紝鍗曚綅绉掞紝鏈灏60锛1鍒嗛挓锛 @@ -15,16 +18,21 @@ CACHE_DEFAULT_TTL_SECOND=3600 CACHE_OBJECT_KEY_HASH_SWITCH=1 #HEAD鍏冧俊鎭殑鏉ユ簮锛1-MINIO锛2-REDIS -CACHE_HEAD_FROM_SOURCE=1 +CACHE_HEAD_FROM_SOURCE=2 #浣跨敤Redis浣滀负鍏冧俊鎭幏鍙栨簮 CACHE_HEAD_REDIS_KEY=MINIO_EVENTS_INFO -CACHE_HEAD_REDIS_IP=192.168.10.63 +#涓昏鐨凴edis IP鍦板潃锛屼紭鍏堜娇鐢 +CACHE_HEAD_MAIN_REDIS_IP=192.168.10.63 +#鍙湁鍦ㄤ富Redis鎸傛帀鏃讹紝浠庝笅杩板垪琛ㄩ夋嫨涓涓繛鎺ワ紝WiredLB鏍煎紡銆 +CACHE_HEAD_REDIS_IPLIST=192.168.10.62-63; CACHE_HEAD_REDIS_PORT=6379 #WIRED LOAD BALANCER閰嶇疆 #WIREDLB_OVERRIDE=1 #WIREDLB_TOPIC= -#WIREDLB_GROUP= #WIREDLB_DATACENTER= -#WIREDLB_HEALTH_PORT=52100 +WIREDLB_MINIO_HEALTH_PORT=52100 +#WIREDLB_MINIO_GROUP= +WIREDLB_REDIS_HEALTH_PORT=52101 +#WIREDLB_REDIS_GROUP= diff --git a/cache/test/tango_cache_test.c b/cache/test/tango_cache_test.c index 19c6b19..b449a11 100644 --- a/cache/test/tango_cache_test.c +++ b/cache/test/tango_cache_test.c @@ -398,6 +398,7 @@ int main(int crgc, char **arg) struct event ev_timer; struct timeval tv; void *runtime_log; + struct tango_cache_parameter *parameter; runtime_log = MESA_create_runtime_log_handle("./runtime.log", 10); if(NULL==runtime_log) @@ -414,7 +415,9 @@ int main(int crgc, char **arg) init_fifo(); tango_cache_global_init(); - tango_instance = tango_cache_instance_new(ev_base, "./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log); + parameter = tango_cache_parameter_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log); + assert(parameter != NULL); + tango_instance = tango_cache_instance_new(parameter, ev_base, runtime_log); tv.tv_sec = 10; tv.tv_usec = 0;