为缓存管理,增加独立的SET命令设置Minio URL的超时,以通过redis订阅方式获取超时事件

This commit is contained in:
zhangchengwei
2018-12-16 16:49:49 +08:00
committed by zhengchao
parent c801523de9
commit 9430f699df
2 changed files with 53 additions and 13 deletions

View File

@@ -47,7 +47,8 @@ enum PUT_OBJECT_STATE
PUT_STATE_PART, PUT_STATE_PART,
PUT_STATE_CANCEL, PUT_STATE_CANCEL,
PUT_STATE_REDIS_META, PUT_STATE_REDIS_META,
PUT_STATE_REDIS_ALL, PUT_STATE_REDIS_EXPIRE,
PUT_STATE_REDIS_SETEX, //<2F><>״̬<D7B4><CCAC><EFBFBD>ڵȴ<DAB5><C8B4><EFBFBD><EFBFBD><EFBFBD>ִ<EFBFBD>н<EFBFBD><D0BD><EFBFBD>
PUT_STATE_END, PUT_STATE_END,
}; };

View File

@@ -287,6 +287,11 @@ static void redis_hset_command_cb(struct redisClusterAsyncContext *ac, void *vre
} }
if(ctx->fail_state) if(ctx->fail_state)
{ {
if(ctx->put.state==PUT_STATE_REDIS_META || ctx->put.state==PUT_STATE_REDIS_SETEX) //<2F><><EFBFBD><EFBFBD>һ<EFBFBD><D2BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ص<EFBFBD>
{
ctx->put.state = PUT_STATE_END;
return;
}
tango_cache_ctx_destroy(ctx, true); tango_cache_ctx_destroy(ctx, true);
return; return;
} }
@@ -294,12 +299,25 @@ static void redis_hset_command_cb(struct redisClusterAsyncContext *ac, void *vre
switch(ctx->put.state) switch(ctx->put.state)
{ {
case PUT_STATE_REDIS_META: case PUT_STATE_REDIS_META:
case PUT_STATE_REDIS_ALL:
ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx,
"EXPIRE %s %u", ctx->object_key, ctx->put.object_ttl); "EXPIRE %s %u", ctx->object_key, ctx->put.object_ttl);
if(ret!=REDIS_OK) if(ret!=REDIS_OK)
{ {
tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_EXEC); tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
ctx->put.state = PUT_STATE_END;
}
else
{
ctx->instance->statistic.session_redis += 1;
ctx->put.state = PUT_STATE_REDIS_SETEX;
}
break;
case PUT_STATE_REDIS_EXPIRE:
ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx,
"EXPIRE %s %u", ctx->object_key, ctx->put.object_ttl);
if(ret != REDIS_OK)
{
tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
tango_cache_ctx_destroy(ctx, true); tango_cache_ctx_destroy(ctx, true);
} }
else else
@@ -308,6 +326,9 @@ static void redis_hset_command_cb(struct redisClusterAsyncContext *ac, void *vre
ctx->put.state = PUT_STATE_END; ctx->put.state = PUT_STATE_END;
} }
break; break;
case PUT_STATE_REDIS_SETEX:
ctx->put.state = PUT_STATE_END; //<2F><><EFBFBD><EFBFBD>һ<EFBFBD><D2BB>EXPIRE<52><45><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
break;
case PUT_STATE_END: case PUT_STATE_END:
tango_cache_ctx_destroy(ctx, true); tango_cache_ctx_destroy(ctx, true);
break; break;
@@ -317,24 +338,42 @@ static void redis_hset_command_cb(struct redisClusterAsyncContext *ac, void *vre
int redis_put_minio_object_meta(struct tango_cache_ctx *ctx, bool callback) int redis_put_minio_object_meta(struct tango_cache_ctx *ctx, bool callback)
{ {
int ret; int ret_mset, ret_set;
char *meta; char *meta;
meta = cJSON_PrintUnformatted(ctx->put.object_meta); meta = cJSON_PrintUnformatted(ctx->put.object_meta);
ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx,
"HMSET %s OBJECT_LOCATION minio OBJECT_META %s", ctx->object_key, meta); ret_mset = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx,
if(ret != REDIS_OK) "HMSET %s OBJECT_LOCATION minio OBJECT_META %s MINIO_ADDR %s", ctx->object_key, meta, ctx->hostaddr);
ret_set = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx,
"SET http://%s/%s 1 EX %u", ctx->hostaddr, ctx->object_key, ctx->put.object_ttl);
if(ret_mset==REDIS_OK && ret_set==REDIS_OK)
{ {
tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); ctx->instance->statistic.session_redis += 2;
tango_cache_ctx_destroy(ctx, callback); ctx->put.state = PUT_STATE_REDIS_META;
} }
else else
{
tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
if(ret_mset==REDIS_OK)
{ {
ctx->instance->statistic.session_redis += 1; ctx->instance->statistic.session_redis += 1;
ctx->put.state = PUT_STATE_REDIS_META; ctx->put.state = PUT_STATE_REDIS_EXPIRE; //<2F><>ʱ<EFBFBD><CAB1>PUT<55><54><EFBFBD><EFBFBD>object<63>߼<EFBFBD>һ<EFBFBD><D2BB>
}
else if(ret_set==REDIS_OK)
{
ctx->instance->statistic.session_redis += 1;
ctx->put.state = PUT_STATE_END;
}
else
{
tango_cache_ctx_destroy(ctx, callback);
free(meta);
return -1;
}
} }
free(meta); free(meta);
return ret; return 0;
} }
int redis_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback) int redis_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback)
@@ -354,7 +393,7 @@ int redis_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_CO
else else
{ {
ctx->instance->statistic.session_redis += 1; ctx->instance->statistic.session_redis += 1;
ctx->put.state = PUT_STATE_REDIS_ALL; ctx->put.state = PUT_STATE_REDIS_EXPIRE;
} }
if(way == PUT_MEM_FREE) if(way == PUT_MEM_FREE)
{ {