增加集群版redis作为元信息和对象缓存,去除Minio事件通知的redis元信息获取方式。

This commit is contained in:
zhangchengwei
2018-12-14 15:07:09 +08:00
committed by zhengchao
parent 08ae82932a
commit d24c57ce85
32 changed files with 1561 additions and 909 deletions

View File

@@ -86,35 +86,25 @@ const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx)
{
switch(ctx->error_code)
{
case CACHE_CACHE_MISS: return "cache not hit";
case CACHE_TIMEOUT: return "cache not fresh";
case CACHE_OUTOF_MEMORY:return "outof memory";
case CACHE_ERR_WIREDLB: return "wiredlb error";
case CACHE_ERR_SOCKPAIR:return "socketpair error";
case CACHE_ERR_INTERNAL:return "internal error";
case CACHE_ERR_REDIS_JSON:return "parse redis json error";
case CACHE_CACHE_MISS: return "cache not hit";
case CACHE_TIMEOUT: return "cache not fresh";
case CACHE_OUTOF_MEMORY: return "outof memory";
case CACHE_ERR_WIREDLB: return "wiredlb error";
case CACHE_ERR_SOCKPAIR: return "socketpair error";
case CACHE_ERR_INTERNAL: return "internal error";
case CACHE_ERR_REDIS_JSON: return "parse redis json error";
case CACHE_ERR_REDIS_CONNECT:return "redis is not connected";
case CACHE_OUTOF_SESSION:return "two many curl sessions";
case CACHE_UPDATE_CANCELED:return "update was canceled";
case CACHE_ERR_REDIS_EXEC: return "redis command reply error";
case CACHE_OUTOF_SESSION: return "two many curl sessions";
case CACHE_UPDATE_CANCELED: return "update was canceled";
case CACHE_ERR_EVBUFFER: return "evbuffer read write error";
default: return ctx->error;
}
}
void tango_cache_get_statistics(const struct tango_cache_instance *instance, struct cache_statistics *out)
{
out->get_recv_num = instance->statistic.get_recv_num;
out->get_succ_num = instance->statistic.get_succ_num;
out->get_error_num= instance->statistic.get_error_num;
out->get_miss_num = instance->statistic.get_miss_num;
out->put_recv_num = instance->statistic.put_recv_num;
out->put_succ_num = instance->statistic.put_succ_num;
out->put_error_num= instance->statistic.put_error_num;
out->del_recv_num = instance->statistic.del_recv_num;
out->del_succ_num = instance->statistic.del_succ_num;
out->del_error_num= instance->statistic.del_error_num;
out->totaldrop_num= instance->statistic.totaldrop_num;
out->session_num = instance->statistic.session_num;
out->memory_used = instance->statistic.memory_used;
*out = instance->statistic;
}
struct tango_cache_result *tango_cache_read_result(future_result_t *promise_result)
@@ -122,11 +112,6 @@ struct tango_cache_result *tango_cache_read_result(future_result_t *promise_resu
return (struct tango_cache_result *)promise_result;
}
void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path, size_t pathsize)
{
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key);
}
static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statistics *statistic)
{
switch(ctx->method)
@@ -134,11 +119,17 @@ static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statisti
case CACHE_REQUEST_PUT:
if(ctx->fail_state)
{
statistic->put_error_num += 1;
if(ctx->locate == OBJECT_IN_MINIO)
statistic->put_err_http += 1;
else
statistic->put_err_redis += 1;
}
else
{
statistic->put_succ_num += 1;
if(ctx->locate == OBJECT_IN_MINIO)
statistic->put_succ_http += 1;
else
statistic->put_succ_redis += 1;
}
break;
case CACHE_REQUEST_GET:
@@ -147,12 +138,17 @@ static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statisti
{
if(ctx->error_code == CACHE_CACHE_MISS || ctx->error_code == CACHE_TIMEOUT)
statistic->get_miss_num += 1;
else if(ctx->locate == OBJECT_IN_MINIO)
statistic->get_err_http += 1;
else
statistic->get_error_num += 1;
statistic->get_err_redis += 1;
}
else
{
statistic->get_succ_num += 1;
if(ctx->locate == OBJECT_IN_MINIO)
statistic->get_succ_http += 1;
else
statistic->get_succ_redis += 1;
}
break;
case CACHE_REQUEST_DELETE:
@@ -218,6 +214,7 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback)
case CACHE_REQUEST_PUT:
if(ctx->put.uploadID != NULL) free(ctx->put.uploadID);
if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml);
if(ctx->put.object_meta == NULL) cJSON_Delete(ctx->put.object_meta);
if(ctx->put.evbuf!=NULL)
{
ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf);
@@ -253,9 +250,61 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback)
free(ctx);
}
void tango_cache_update_end(struct tango_cache_ctx *ctx)
//<2F>ж<EFBFBD>session<6F>Ƿ񳬹<C7B7><F1B3ACB9><EFBFBD><EFBFBD>ƣ<EFBFBD><C6A3><EFBFBD><EFBFBD><EFBFBD>ȡ<EFBFBD><C8A1>ʼ<EFBFBD><CABC><EFBFBD>ж<EFBFBD>where_to_get<65>Ƿ<EFBFBD>ȫ<EFBFBD><C8AB><EFBFBD><EFBFBD>MINIO<49><4F>
bool sessions_exceeds_limit(struct tango_cache_instance *instance, enum OBJECT_LOCATION where_to_get)
{
cache_kick_upload_minio_end(ctx);
if(where_to_get == OBJECT_IN_MINIO)
{
return (instance->statistic.session_http>=instance->param->maximum_sessions);
}
else
{
return (instance->statistic.session_redis>=instance->param->maximum_sessions);
}
}
//<2F><><EFBFBD><EFBFBD><EFBFBD>ϴ<EFBFBD>API<50><49>ʹ<EFBFBD><CAB9>ctx<74><78>evbuffer<65><72><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>޷<EFBFBD><DEB7><EFBFBD><EFBFBD><EFBFBD>ctx<74><78>ȡ<EFBFBD><C8A1><EFBFBD><EFBFBD>
enum OBJECT_LOCATION tango_cache_object_locate(struct tango_cache_instance *instance, size_t object_size)
{
if(instance->param->object_store_way!=CACHE_SMALL_REDIS || object_size > instance->param->redis_object_maxsize)
{
return OBJECT_IN_MINIO;
}
else
{
return OBJECT_IN_REDIS;
}
}
void tango_cache_get_object_path(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize)
{
if(path != NULL)
{
if(ctx->locate == OBJECT_IN_MINIO)
{
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->param->bucketname, ctx->object_key);
}
else
{
snprintf(path, pathsize, "redis://%s/%s/%s", ctx->instance->redisaddr, ctx->instance->param->bucketname, ctx->object_key);
}
}
}
int tango_cache_update_end(struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize)
{
if(!ctx->fail_state)
{
ctx->locate = tango_cache_object_locate(ctx->instance, ctx->put.object_size);
tango_cache_get_object_path(ctx, path, pathsize);
if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO)
{
cJSON_AddNumberToObject(ctx->put.object_meta, "Content-Length", ctx->put.object_size);
}
}
return do_tango_cache_update_end(ctx, false);
}
void tango_cache_update_cancel(struct tango_cache_ctx *ctx)
@@ -268,7 +317,6 @@ void tango_cache_update_cancel(struct tango_cache_ctx *ctx)
ctx->curl = NULL;
}
tango_cache_set_fail_state(ctx, CACHE_UPDATE_CANCELED);
//<2F>Ѿ<EFBFBD><D1BE><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ֶ<EFBFBD><D6B6>ϴ<EFBFBD>IDʱ<44><CAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD>cancelɾ<6C><C9BE>
if(ctx->put.uploadID!=NULL && cache_cancel_upload_minio(ctx))
{
ctx->put.state = PUT_STATE_CANCEL;
@@ -291,6 +339,7 @@ int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data,
return 0;
}
ctx->instance->statistic.memory_used += size;
ctx->put.object_size += size;
if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size);
@@ -312,7 +361,7 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP
{
if(evbuffer_add_buffer(ctx->put.evbuf, evbuf))
{
tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER);
return 0;
}
}
@@ -320,11 +369,12 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP
{
if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf))
{
tango_cache_set_fail_state(ctx, CACHE_OUTOF_MEMORY);
tango_cache_set_fail_state(ctx, CACHE_ERR_EVBUFFER);
return 0;
}
}
ctx->instance->statistic.memory_used += size;
ctx->put.object_size += size;
if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->param->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, ctx->instance->param->upload_block_size);
@@ -332,13 +382,14 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP
return 0;
}
struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta)
struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta, enum OBJECT_LOCATION maybe_loc)
{
struct tango_cache_ctx *ctx;
char buffer[2064];
char buffer[2064], *user_tag=NULL, *user_tag_value=NULL;
time_t expires, now, last_modify;
struct easy_string hdr_estr={NULL, 0, 0};
if((u_int64_t)instance->statistic.memory_used>=instance->param->cache_limit_size || instance->statistic.session_num>=instance->param->max_session_num)
if(sessions_exceeds_limit(instance, maybe_loc) || (u_int64_t)instance->statistic.memory_used>=instance->param->maximum_used_mem)
{
instance->error_code = CACHE_OUTOF_MEMORY;
instance->statistic.totaldrop_num += 1;
@@ -349,6 +400,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
ctx->instance = instance;
ctx->promise = future_to_promise(f);
ctx->method = CACHE_REQUEST_PUT;
ctx->locate = maybe_loc;
if(instance->param->hash_object_key)
{
@@ -366,6 +418,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
{
instance->error_code = CACHE_ERR_WIREDLB;
instance->statistic.totaldrop_num += 1;
if(ctx->headers!=NULL) curl_slist_free_all(ctx->headers);
free(ctx);
return NULL;
}
@@ -377,6 +430,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
{
ctx->headers = curl_slist_append(ctx->headers, buffer);
}
ctx->put.object_ttl = expires;
//Last-Modify<66>ֶΣ<D6B6><CEA3><EFBFBD><EFBFBD><EFBFBD>GETʱ<54>ж<EFBFBD><D0B6>Ƿ<EFBFBD><C7B7><EFBFBD><EFBFBD><EFBFBD>
last_modify = (meta->put.date > meta->put.last_modified)?meta->put.date:meta->put.last_modified;
if(last_modify == 0)
@@ -391,21 +445,51 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
if(meta->std_hdr[i] != NULL)
{
ctx->headers = curl_slist_append(ctx->headers, meta->std_hdr[i]);
if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO)
{
easy_string_savedata(&hdr_estr, meta->std_hdr[i], strlen(meta->std_hdr[i]));
easy_string_savedata(&hdr_estr, "\r\n", 2);
}
}
}
if(meta->std_hdr[HDR_CONTENT_TYPE] == NULL)
{
ctx->headers = curl_slist_append(ctx->headers, "Content-Type:");
if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO)
{
easy_string_savedata(&hdr_estr, "Content-Type: application/octet-stream\r\n", strlen("Content-Type: application/octet-stream\r\n"));
}
}
ctx->headers = curl_slist_append(ctx->headers, "Expect:");//ע<><D7A2>POST<53><54><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Expect<63><74>ϵ<EFBFBD><CFB5>Ҫ<EFBFBD><D2AA>ȷ<EFBFBD><C8B7><EFBFBD><EFBFBD>CURLOPT_POSTFIELDSIZE
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͷ<EFBFBD><CDB7><EFBFBD><EFBFBD>GETʱ<54><CAB1>ԭ<EFBFBD><D4AD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
if(meta->usertag_len>0 && meta->usertag_len<=USER_TAG_MAX_LEN)
{
char *p = (char *)malloc((meta->usertag_len/3 + 1)*4 + 18); //<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ռ䣻18=17+1: ͷ<><CDB7>+<2B>ַ<EFBFBD><D6B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
memcpy(p, "x-amz-meta-user: ", 17);
Base64_EncodeBlock((const unsigned char*)meta->usertag, meta->usertag_len, (unsigned char*)p+17);
ctx->headers = curl_slist_append(ctx->headers, p);
free(p);
user_tag = (char *)malloc((meta->usertag_len/3 + 1)*4 + 18); //<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ռ䣻18=17+1: ͷ<><CDB7>+<2B>ַ<EFBFBD><D6B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
memcpy(user_tag, "x-amz-meta-user: ", 17);
user_tag_value = user_tag+17;
Base64_EncodeBlock((const unsigned char*)meta->usertag, meta->usertag_len, (unsigned char*)user_tag_value);
ctx->headers = curl_slist_append(ctx->headers, user_tag);
}
if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO)
{
ctx->put.object_meta = cJSON_CreateObject();
if(instance->param->hash_object_key)
{
cJSON_AddStringToObject(ctx->put.object_meta, "X-Amz-Meta-Url", meta->url);
}
cJSON_AddNumberToObject(ctx->put.object_meta, "Expires", now + expires);
cJSON_AddNumberToObject(ctx->put.object_meta, "X-Amz-Meta-Lm", last_modify);
cJSON_AddStringToObject(ctx->put.object_meta, "Headers", hdr_estr.buff);
if(user_tag_value != NULL)
{
cJSON_AddStringToObject(ctx->put.object_meta, "X-Amz-Meta-User", user_tag_value);
}
easy_string_destroy(&hdr_estr);
}
if(user_tag != NULL)
{
free(user_tag);
}
ctx->put.evbuf = evbuffer_new();
@@ -415,9 +499,15 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta)
{
struct tango_cache_ctx *ctx;
struct tango_cache_ctx *ctx;
enum OBJECT_LOCATION maybe_loc=OBJECT_IN_UNKNOWN;
ctx = tango_cache_update_prepare(instance, f, meta);
if(instance->param->object_store_way != CACHE_SMALL_REDIS)
{
maybe_loc = OBJECT_IN_MINIO;
}
ctx = tango_cache_update_prepare(instance, f, meta, maybe_loc);
if(ctx == NULL)
{
return NULL;
@@ -427,22 +517,40 @@ struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *in
return ctx;
}
//һ<><D2BB><EFBFBD><EFBFBD><EFBFBD>ϴ<EFBFBD>ʱ<EFBFBD><CAB1>ֱ<EFBFBD>Ӷ<EFBFBD>λ<EFBFBD><CEBB><EFBFBD><EFBFBD><EFBFBD>ϴ<EFBFBD><CFB4><EFBFBD>λ<EFBFBD><CEBB>
struct tango_cache_ctx *tango_cache_update_once_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta,
size_t object_size, char *path, size_t pathsize)
{
struct tango_cache_ctx *ctx;
enum OBJECT_LOCATION location;
location = tango_cache_object_locate(instance, object_size);
ctx = tango_cache_update_prepare(instance, f, meta, location);
if(ctx == NULL)
{
return NULL;
}
tango_cache_get_object_path(ctx, path, pathsize);
if(ctx->instance->param->object_store_way != CACHE_ALL_MINIO)
{
cJSON_AddNumberToObject(ctx->put.object_meta, "Content-Length", object_size);
}
return ctx;
}
int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct future* f,
enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta_put *meta, char *path, size_t pathsize)
{
struct tango_cache_ctx *ctx;
struct tango_cache_ctx *ctx;
ctx = tango_cache_update_prepare(instance, f, meta);
ctx = tango_cache_update_once_prepare(instance, f, meta, size, path, pathsize);
if(ctx == NULL)
{
if(way == PUT_MEM_FREE) free((void *)data);
return -1;
}
if(path != NULL)
{
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->param->bucketname, ctx->object_key);
}
return tango_cache_upload_once_start_data(ctx, way, data, size);
return do_tango_cache_upload_once_data(ctx, way, data, size);
}
int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct future* f,
@@ -450,25 +558,21 @@ int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct
{
struct tango_cache_ctx *ctx;
ctx = tango_cache_update_prepare(instance, f, meta);
ctx = tango_cache_update_once_prepare(instance, f, meta, evbuffer_get_length(evbuf), path, pathsize);
if(ctx == NULL)
{
return -1;
}
if(path != NULL)
{
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->param->bucketname, ctx->object_key);
}
return tango_cache_upload_once_start_evbuf(ctx, way, evbuf);
return do_tango_cache_upload_once_evbuf(ctx, way, evbuf);
}
struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, enum CACHE_REQUEST_METHOD method, struct future* f, struct tango_cache_meta_get *meta)
struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, enum CACHE_REQUEST_METHOD method,
struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get)
{
struct tango_cache_ctx *ctx;
char sha256[72];
if(instance->param->head_meta_source!=HEAD_META_FROM_REDIS && instance->statistic.session_num>=instance->param->max_session_num)
if(sessions_exceeds_limit(instance, where_to_get))
{
instance->error_code = CACHE_OUTOF_SESSION;
instance->statistic.totaldrop_num += 1;
@@ -478,7 +582,7 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
ctx->instance = instance;
ctx->promise = future_to_promise(f);
promise_allow_many_successes(ctx->promise);
promise_allow_many_successes(ctx->promise); //<2F><><EFBFBD>λص<CEBB><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱ<EFBFBD><CAB1><EFBFBD><EFBFBD>promise_finish
ctx->method = method;
ctx->get.state = GET_STATE_START;
ctx->get.max_age = meta->get.max_age;
@@ -503,36 +607,36 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
return ctx;
}
int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta)
int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get)
{
struct tango_cache_ctx *ctx;
ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_GET, f, meta);
if(instance->param->object_store_way != CACHE_SMALL_REDIS)
{
where_to_get = OBJECT_IN_MINIO;
}
ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_GET, f, meta, where_to_get);
if(ctx == NULL)
{
return -1;
}
return (tango_cache_fetch_start(ctx)==1)?0:-1;
return do_tango_cache_fetch_object(ctx, where_to_get);
}
int tango_cache_head_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta)
{
struct tango_cache_ctx *ctx;
enum OBJECT_LOCATION location;
ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_HEAD, f, meta);
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Redis<69><73><EFBFBD><EFBFBD>Ԫ<EFBFBD><D4AA>Ϣ<EFBFBD><EFBFBD><E6B4A2>Redis<69><73>
location = (instance->param->object_store_way != CACHE_ALL_MINIO)?OBJECT_IN_REDIS:OBJECT_IN_MINIO;
ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_HEAD, f, meta, location);
if(ctx == NULL)
{
return -1;
}
if(instance->param->head_meta_source == HEAD_META_FROM_REDIS)
{
return tango_cache_head_redis(ctx);
}
else
{
return (tango_cache_fetch_start(ctx)==1)?0:-1;
}
return do_tango_cache_head_object(ctx, location);
}
struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, struct future* f, const char *objkey)
@@ -540,7 +644,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *
struct tango_cache_ctx *ctx;
char sha256[72];
if(instance->statistic.session_num >= instance->param->max_session_num)
if(sessions_exceeds_limit(instance, OBJECT_IN_MINIO))
{
instance->error_code = CACHE_OUTOF_SESSION;
instance->statistic.totaldrop_num += 1;
@@ -588,7 +692,7 @@ struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_inst
struct tango_cache_ctx *ctx;
char md5[48]={0}, content_md5[48];
if(instance->statistic.session_num >= instance->param->max_session_num)
if(sessions_exceeds_limit(instance, OBJECT_IN_MINIO))
{
instance->error_code = CACHE_OUTOF_SESSION;
instance->statistic.totaldrop_num += 1;
@@ -628,7 +732,7 @@ int tango_cache_multi_delete(struct tango_cache_instance *instance, struct futur
{
return -1;
}
return tango_cache_multi_delete_start(ctx);
return do_tango_cache_multi_delete(ctx);
}
static void check_multi_info(CURLM *multi)
@@ -686,7 +790,7 @@ static void libevent_socket_event_cb(int fd, short action, void *userp)
what = ((action&EV_READ)?CURL_CSELECT_IN:0) | ((action & EV_WRITE)?CURL_CSELECT_OUT:0);
rc = curl_multi_socket_action(instance->multi_hd, fd, what, &still_running);
instance->statistic.session_num = still_running;
instance->statistic.session_http = still_running;
assert(rc==CURLM_OK);
check_multi_info(instance->multi_hd);
@@ -704,7 +808,7 @@ static void libevent_timer_event_cb(int fd, short kind, void *userp)
int still_running;
rc = curl_multi_socket_action(instance->multi_hd, CURL_SOCKET_TIMEOUT, 0, &still_running);
instance->statistic.session_num = still_running;
instance->statistic.session_http = still_running;
assert(rc==CURLM_OK);
check_multi_info(instance->multi_hd);
}
@@ -758,7 +862,7 @@ static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp)
//timeout_ms is 0 means we should call curl_multi_socket_action/curl_multi_perform at once.
//To initiate the whole process(inform CURLMOPT_SOCKETFUNCTION callback) or when timeout occurs.
rc = curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &still_running);
instance->statistic.session_num = still_running;
instance->statistic.session_http = still_running;
assert(rc==CURLM_OK);
}
else if(timeout_ms == -1) //timeout_ms is -1 means we should delete the timer.
@@ -773,18 +877,18 @@ static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp)
return 0; //0-success; -1-error
}
static int wired_load_balancer_init(const char *topic, const char *datacenter, int override, struct wiredlb_parameter *wparam, void *runtime_log)
static int wired_load_balancer_init(struct wiredlb_parameter *wparam, void *runtime_log)
{
wparam->wiredlb = wiredLB_create(topic, wparam->wiredlb_group, WLB_PRODUCER);
wparam->wiredlb = wiredLB_create(wparam->wiredlb_topic, wparam->wiredlb_group, WLB_PRODUCER);
if(wparam->wiredlb == NULL)
{
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n");
return -1;
}
wiredLB_set_opt(wparam->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &wparam->wiredlb_ha_port, sizeof(wparam->wiredlb_ha_port));
wiredLB_set_opt(wparam->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &override, sizeof(override));
wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, datacenter, strlen(datacenter)+1);
if(override)
wiredLB_set_opt(wparam->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &wparam->wiredlb_override, sizeof(wparam->wiredlb_override));
wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, wparam->wiredlb_datacenter, strlen(wparam->wiredlb_datacenter)+1);
if(wparam->wiredlb_override)
{
wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, wparam->iplist, strlen(wparam->iplist)+1);
wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &wparam->port, sizeof(wparam->port));
@@ -807,17 +911,17 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path
//multi curl
MESA_load_profile_uint_def(profile_path, section, "MAX_CONNECTION_PER_HOST", &intval, 1);
param->max_cnn_host = intval;
param->maximum_host_cnns = intval;
MESA_load_profile_uint_def(profile_path, section, "MAX_CNNT_PIPELINE_NUM", &intval, 20);
param->max_pipeline_num = intval;
param->maximum_pipelines = intval;
MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_TRANSFER_TIMEOUT_S", &intval, 0);
param->transfer_timeout = intval;
//instance
MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", &param->max_session_num, 200);
MESA_load_profile_uint_def(profile_path, section, "MAX_CURL_SESSION_NUM", &param->maximum_sessions, 100);
MESA_load_profile_uint_def(profile_path, section, "MAX_USED_MEMORY_SIZE_MB", &intval, 5120);
longval = intval;
param->cache_limit_size = longval * 1024 * 1024;
param->maximum_used_mem = longval * 1024 * 1024;
MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", &param->hash_object_key, 1);
if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", param->bucketname, 256) < 0)
{
@@ -830,7 +934,7 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section);
return NULL;
}
MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 999999999);
MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 604800);
if(intval < 60)
{
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section);
@@ -838,47 +942,41 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path
}
param->relative_ttl = intval;
//wiredlb common
MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->wiredlb_topic, 64, "TANGO_CACHE_PRODUCER");
MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", param->wiredlb_datacenter, 64, "ASTANA");
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", &param->wiredlb_override, 1);
//wiredlb minio
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_MINIO_HEALTH_PORT", &intval, 52100);
//wiredlb
MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->minio.wiredlb_topic, 64, "TANGO_CACHE_PRODUCER");
MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", param->minio.wiredlb_datacenter, 64, "ASTANA");
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", &param->minio.wiredlb_override, 1);
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100);
param->minio.wiredlb_ha_port = intval;
MESA_load_profile_string_def(profile_path, section, "WIREDLB_MINIO_GROUP", param->minio.wiredlb_group, 64, "MINIO_GROUP");
MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", param->minio.wiredlb_group, 64, "MINIO_GROUP");
MESA_load_profile_uint_def(profile_path, section, "MINIO_LISTEN_PORT", &param->minio.port, 9000);
if(MESA_load_profile_string_nodef(profile_path, section, "MINIO_IP_LIST", param->minio.iplist, 4096) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.\n", profile_path, section);
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.", profile_path, section);
return NULL;
}
if(wired_load_balancer_init(param->wiredlb_topic, param->wiredlb_datacenter, param->wiredlb_override, &param->minio, runtime_log))
if(wired_load_balancer_init(&param->minio, runtime_log))
{
return NULL;
}
//wiredlb redis
MESA_load_profile_int_def(profile_path, section, "CACHE_HEAD_FROM_SOURCE", &param->head_meta_source, HEAD_META_FROM_MINIO);
if(param->head_meta_source == HEAD_META_FROM_REDIS)
MESA_load_profile_int_def(profile_path, section, "CACHE_STORE_OBJECT_WAY", &param->object_store_way, CACHE_ALL_MINIO);
if(param->object_store_way!=CACHE_ALL_MINIO && param->object_store_way!=CACHE_META_REDIS && param->object_store_way!=CACHE_SMALL_REDIS)
{
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_REDIS_HEALTH_PORT", &intval, 0);
param->redis.wiredlb_ha_port = intval;
MESA_load_profile_string_def(profile_path, section, "WIREDLB_REDIS_GROUP", param->redis.wiredlb_group, 64, "REDIS_GROUP");
MESA_load_profile_string_def(profile_path, section, "CACHE_HEAD_REDIS_KEY", param->redis_key, 256, param->bucketname);
MESA_load_profile_uint_def(profile_path, section, "CACHE_HEAD_REDIS_PORT", &param->redis.port, 6379);
if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_HEAD_REDIS_IPLIST", param->redis.iplist, 256) < 0)
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "CACHE_STORE_OBJECT_WAY is not 1/2/3.", profile_path, section);
return NULL;
}
if(param->object_store_way != CACHE_ALL_MINIO)
{
if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_ADDRS", param->redisaddrs, 4096) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_HEAD_REDIS_IPLIST not found.\n", profile_path, section);
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_ADDRS not found.", profile_path, section);
return NULL;
}
if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_HEAD_MAIN_REDIS_IP", param->redis.mainip, 64) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_HEAD_MAIN_REDIS_IP not found.\n", profile_path, section);
return NULL;
}
if(wired_load_balancer_init(param->wiredlb_topic, param->wiredlb_datacenter, param->wiredlb_override, &param->redis, runtime_log))
MESA_load_profile_uint_def(profile_path, section, "REDIS_CACHE_OBJECT_SIZE", &param->redis_object_maxsize, 10240);
if(param->redis_object_maxsize >= param->upload_block_size)
{
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CACHE_OBJECT_SIZE must be smaller than CACHE_UPLOAD_BLOCK_SIZE.", profile_path, section);
return NULL;
}
}
@@ -888,6 +986,7 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path
struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_parameter *param, struct event_base* evbase, void *runtimelog)
{
struct tango_cache_instance *instance;
char *redis_sep, *save_ptr=NULL;
instance = (struct tango_cache_instance *)malloc(sizeof(struct tango_cache_instance));
memset(instance, 0, sizeof(struct tango_cache_instance));
@@ -897,22 +996,23 @@ struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_paramet
instance->multi_hd = curl_multi_init();
curl_multi_setopt(instance->multi_hd, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX);
curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, param->max_cnn_host);
curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_PIPELINE_LENGTH, param->max_pipeline_num);
curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, param->maximum_host_cnns);
curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_PIPELINE_LENGTH, param->maximum_pipelines);
curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETFUNCTION, curl_socket_function_cb);
curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETDATA, instance); //curl_socket_function_cb *userp
curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERFUNCTION, curl_timer_function_cb);
curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERDATA, instance);
if(param->head_meta_source == HEAD_META_FROM_REDIS)
if(param->object_store_way != CACHE_ALL_MINIO)
{
if(redis_asyn_connect_init(instance, instance->param->redis.mainip))
if(redis_asyn_connect_init(instance))
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s:%u failed.",
instance->current_redisip, instance->param->redis.port);
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s failed.", instance->param->redisaddrs);
free(instance);
return NULL;
}
}
redis_sep = strtok_r(param->redisaddrs, ",", &save_ptr);
sprintf(instance->redisaddr, "%s", redis_sep);
}
evtimer_assign(&instance->timer_event, evbase, libevent_timer_event_cb, instance);
return instance;