This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/cache/src/tango_cache_redis.cpp

244 lines
7.6 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <sys/time.h>
#include <time.h>
#include <string.h>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libevent.h>
#include "tango_cache_transfer.h"
#include "tango_cache_tools.h"
#include "tango_cache_redis.h"
#include "cJSON.h"
#define PARSE_JSON_RET_ERROR -1
#define PARSE_JSON_RET_TIMEOUT 0
#define PARSE_JSON_RET_SUCC 1
#define CACHE_REDIS_CONNECT_IDLE 0
#define CACHE_REDIS_CONNECTING 1
#define CACHE_REDIS_CONNECTED 2
#define CACHE_REDIS_DISCONNECTED 3
struct http_hdr_name
{
const char *json_name;
const char *http_name;
};
struct http_hdr_name g_http_hdr_name[HDR_CONTENT_NUM]=
{
{"content-type", "Content-Type: "},
{"content-encoding", "Content-Encoding: "},
{"content-disposition", "Content-Disposition: "},
{"content-md5", "Content-MD5: "}
};
void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status)
{
struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac);
if(status == REDIS_OK)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u success.", instance->redis_ip, instance->redis_port);
}
else
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u failed: %s.", instance->redis_ip, instance->redis_port, ac->errstr);
}
instance->redis_connecting = CACHE_REDIS_DISCONNECTED;
}
void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status)
{
struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac);
if(status == REDIS_OK)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u success.", instance->redis_ip, instance->redis_port);
instance->redis_connecting = CACHE_REDIS_CONNECTED;
}
else
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u failed.", instance->redis_ip, instance->redis_port, ac->errstr);
instance->redis_connecting = CACHE_REDIS_CONNECT_IDLE;
}
}
int redis_asyn_connect_init(struct tango_cache_instance *instance, const char *redisip, int redis_port)
{
instance->redis_ac = redisAsyncConnect(redisip, redis_port);
if(instance->redis_ac == NULL)
{
return -1;
}
instance->redis_connecting = CACHE_REDIS_CONNECTING;
redisLibeventAttach(instance->redis_ac, instance->evbase);
redisAsyncSetConnectionData(instance->redis_ac, instance);
redisAsyncSetConnectCallback(instance->redis_ac, redis_asyn_connect_cb);
redisAsyncSetDisconnectCallback(instance->redis_ac, redis_asyn_disconnect_cb);
return 0;
}
int parse_minio_events_json(struct tango_cache_ctx *ctx, const char *jcontent)
{
cJSON *root, *pobject = NULL, *ptarget, *plastMod, *pexpires;
int ret = PARSE_JSON_RET_ERROR;
char usertag[2048];
size_t datalen;
//Records[0]->s3->object->key...userMetaData->metas...
if(NULL == (root=cJSON_Parse(jcontent)))
{
goto out_json;
}
if(NULL==(pobject=cJSON_GetObjectItem(root, "Records")) || pobject->type!=cJSON_Array)
{
goto out_json;
}
if(NULL == (pobject=cJSON_GetArrayItem(pobject, 0))) //<2F><>һ<EFBFBD><D2BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ԫ<EFBFBD>أ<EFBFBD>һ<EFBFBD><D2BB>ֻ<EFBFBD><D6BB>һ<EFBFBD><D2BB>
{
goto out_json;
}
if(NULL == (pobject=cJSON_GetObjectItem(pobject, "s3")) || pobject->type!=cJSON_Object)
{
goto out_json;
}
if(NULL == (pobject=cJSON_GetObjectItem(pobject, "object")) || pobject->type!=cJSON_Object)
{
goto out_json;
}
//<2F><>ȡ<EFBFBD><C8A1><EFBFBD><EFBFBD>
if(NULL == (ptarget=cJSON_GetObjectItem(pobject, "size")) || ptarget->type!=cJSON_Number)
{
goto out_json;
}
ctx->get.result.tlength = ptarget->valueint; //TODO: <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>4GB<47><42>ô<EFBFBD>
if(NULL == (ptarget=cJSON_GetObjectItem(pobject, "userMetadata")) || ptarget->type!=cJSON_Object)
{
goto out_json;
}
if(NULL==(plastMod=cJSON_GetObjectItem(ptarget, "X-Amz-Meta-Lm")) || NULL==(pexpires=cJSON_GetObjectItem(ptarget, "expires")))
{
goto out_json;
}
ctx->get.need_hdrs = RESPONSE_HDR_ALL;
ctx->get.last_modify = atol(plastMod->valuestring);
ctx->get.expires = expires_hdr2timestamp(pexpires->valuestring, strlen(pexpires->valuestring));
if(!check_expires_fresh_header(ctx))
{
ret = PARSE_JSON_RET_TIMEOUT;
goto out_json;
}
if(NULL!=(plastMod=cJSON_GetObjectItem(ptarget, "X-Amz-Meta-User")))
{
if((datalen = Base64_DecodeBlock((unsigned char*)plastMod->valuestring, strlen(plastMod->valuestring), (unsigned char*)usertag, 2048))>0)
{
easy_string_savedata(&ctx->get.response_tag, usertag, datalen);
}
}
for(int i=0; i<HDR_CONTENT_NUM; i++)
{
if(NULL != (plastMod=cJSON_GetObjectItem(ptarget, g_http_hdr_name[i].json_name)))
{
easy_string_savedata(&ctx->response, g_http_hdr_name[i].http_name, strlen(g_http_hdr_name[i].http_name));
easy_string_savedata(&ctx->response, plastMod->valuestring, strlen(plastMod->valuestring));
easy_string_savedata(&ctx->response, "\r\n", strlen("\r\n"));
}
}
return PARSE_JSON_RET_SUCC;
out_json:
cJSON_Delete(root);
return ret;
}
void redis_hget_command_cb(struct redisAsyncContext *ac, void *vreply, void *privdata)
{
redisReply *reply = (redisReply *)vreply;
struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)privdata;
int ret;
if(reply == NULL || reply->type!=REDIS_REPLY_STRING)
{
if(reply->type == REDIS_REPLY_NIL)
{
tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS);
ctx->get.result.type = RESULT_TYPE_MISS;
promise_success(future_to_promise(ctx->future), &ctx->get.result);
}
else
{
if(ac->err) MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_FATAL, "redis_hget_command_cb error: %s.", ac->errstr);
tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_JSON);
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
}
tango_cache_ctx_destroy(ctx);
return;
}
ret = parse_minio_events_json(ctx, reply->str);
switch(ret)
{
case PARSE_JSON_RET_ERROR:
tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_JSON);
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
tango_cache_ctx_destroy(ctx);
break;
case PARSE_JSON_RET_TIMEOUT:
if(ctx->get.state == GET_STATE_DELETE)
{
ctx->get.state = GET_STATE_END;
cache_delete_minio_object(ctx);
}
break;
case PARSE_JSON_RET_SUCC:
fetch_header_over_biz(ctx);
ctx->get.result.type = RESULT_TYPE_END;
promise_success(future_to_promise(ctx->future), &ctx->get.result);
tango_cache_ctx_destroy(ctx);
break;
default: assert(0);break;
}
}
int tango_cache_head_redis(struct tango_cache_ctx *ctx)
{
int ret = -1;
ctx->instance->statistic.get_recv_num += 1;
switch(ctx->instance->redis_connecting)
{
case CACHE_REDIS_CONNECTED:
ret = redisAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, "HGET %s %s/%s",
ctx->instance->redis_key, ctx->instance->bucketname, ctx->object_key);
if(ret < 0)
{
redisAsyncDisconnect(ctx->instance->redis_ac);
redis_asyn_connect_init(ctx->instance, ctx->instance->redis_ip, ctx->instance->redis_port);
tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
tango_cache_ctx_destroy(ctx);
}
break;
case CACHE_REDIS_DISCONNECTED:
case CACHE_REDIS_CONNECT_IDLE:
redis_asyn_connect_init(ctx->instance, ctx->instance->redis_ip, ctx->instance->redis_port);
case CACHE_REDIS_CONNECTING:
tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT);
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
tango_cache_ctx_destroy(ctx);
break;
default: assert(0);break;
}
return ret;
}