diff --git a/cache/src/tango_cache_client_in.h b/cache/src/tango_cache_client_in.h index ed846dc..fe679fb 100644 --- a/cache/src/tango_cache_client_in.h +++ b/cache/src/tango_cache_client_in.h @@ -47,7 +47,8 @@ enum PUT_OBJECT_STATE PUT_STATE_PART, PUT_STATE_CANCEL, PUT_STATE_REDIS_META, - PUT_STATE_REDIS_ALL, + PUT_STATE_REDIS_EXPIRE, + PUT_STATE_REDIS_SETEX, //该状态用于等待两次执行结果 PUT_STATE_END, }; diff --git a/cache/src/tango_cache_redis.cpp b/cache/src/tango_cache_redis.cpp index c1fcbc5..ccfb171 100644 --- a/cache/src/tango_cache_redis.cpp +++ b/cache/src/tango_cache_redis.cpp @@ -287,6 +287,11 @@ static void redis_hset_command_cb(struct redisClusterAsyncContext *ac, void *vre } if(ctx->fail_state) { + if(ctx->put.state==PUT_STATE_REDIS_META || ctx->put.state==PUT_STATE_REDIS_SETEX) //还有一条命令待回调 + { + ctx->put.state = PUT_STATE_END; + return; + } tango_cache_ctx_destroy(ctx, true); return; } @@ -294,12 +299,25 @@ static void redis_hset_command_cb(struct redisClusterAsyncContext *ac, void *vre switch(ctx->put.state) { case PUT_STATE_REDIS_META: - case PUT_STATE_REDIS_ALL: + ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, + "EXPIRE %s %u", ctx->object_key, ctx->put.object_ttl); + if(ret!=REDIS_OK) + { + tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); + ctx->put.state = PUT_STATE_END; + } + else + { + ctx->instance->statistic.session_redis += 1; + ctx->put.state = PUT_STATE_REDIS_SETEX; + } + break; + case PUT_STATE_REDIS_EXPIRE: ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, "EXPIRE %s %u", ctx->object_key, ctx->put.object_ttl); if(ret != REDIS_OK) { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_EXEC); + tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); tango_cache_ctx_destroy(ctx, true); } else @@ -308,6 +326,9 @@ static void redis_hset_command_cb(struct redisClusterAsyncContext *ac, void *vre ctx->put.state = PUT_STATE_END; } break; + case PUT_STATE_REDIS_SETEX: + ctx->put.state = PUT_STATE_END; //还有一条EXPIRE命令待返回 + break; case PUT_STATE_END: tango_cache_ctx_destroy(ctx, true); break; @@ -317,24 +338,42 @@ static void redis_hset_command_cb(struct redisClusterAsyncContext *ac, void *vre int redis_put_minio_object_meta(struct tango_cache_ctx *ctx, bool callback) { - int ret; + int ret_mset, ret_set; char *meta; meta = cJSON_PrintUnformatted(ctx->put.object_meta); - ret = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, - "HMSET %s OBJECT_LOCATION minio OBJECT_META %s", ctx->object_key, meta); - if(ret != REDIS_OK) + + ret_mset = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, + "HMSET %s OBJECT_LOCATION minio OBJECT_META %s MINIO_ADDR %s", ctx->object_key, meta, ctx->hostaddr); + ret_set = redisClusterAsyncCommand(ctx->instance->redis_ac, redis_hset_command_cb, ctx, + "SET http://%s/%s 1 EX %u", ctx->hostaddr, ctx->object_key, ctx->put.object_ttl); + if(ret_mset==REDIS_OK && ret_set==REDIS_OK) { - tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); - tango_cache_ctx_destroy(ctx, callback); + ctx->instance->statistic.session_redis += 2; + ctx->put.state = PUT_STATE_REDIS_META; } else { - ctx->instance->statistic.session_redis += 1; - ctx->put.state = PUT_STATE_REDIS_META; + tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); + if(ret_mset==REDIS_OK) + { + ctx->instance->statistic.session_redis += 1; + ctx->put.state = PUT_STATE_REDIS_EXPIRE; //此时与PUT完整object逻辑一致 + } + else if(ret_set==REDIS_OK) + { + ctx->instance->statistic.session_redis += 1; + ctx->put.state = PUT_STATE_END; + } + else + { + tango_cache_ctx_destroy(ctx, callback); + free(meta); + return -1; + } } free(meta); - return ret; + return 0; } int redis_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, bool callback) @@ -354,7 +393,7 @@ int redis_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_CO else { ctx->instance->statistic.session_redis += 1; - ctx->put.state = PUT_STATE_REDIS_ALL; + ctx->put.state = PUT_STATE_REDIS_EXPIRE; } if(way == PUT_MEM_FREE) {