#include #include #include #include #include #include #include #include #include #include #include #include #include "tango_cache_transfer.h" #include "tango_cache_tools.h" #include "tango_cache_redis.h" #include "cJSON.h" #define PARSE_JSON_RET_ERROR -1 #define PARSE_JSON_RET_TIMEOUT 0 #define PARSE_JSON_RET_SUCC 1 #define CACHE_REDIS_CONNECT_IDLE 0 #define CACHE_REDIS_CONNECTING 1 #define CACHE_REDIS_CONNECTED 2 #define CACHE_REDIS_DISCONNECTED 3 struct http_hdr_name { const char *json_name; const char *http_name; }; struct http_hdr_name g_http_hdr_name[HDR_CONTENT_NUM]= { {"content-type", "Content-Type: "}, {"content-encoding", "Content-Encoding: "}, {"content-disposition", "Content-Disposition: "}, {"content-md5", "Content-MD5: "} }; void redis_asyn_disconnect_cb(const struct redisAsyncContext *ac, int status) { struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac); if(status == REDIS_OK) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u success.", instance->redis_ip, instance->redis_port); } else { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis disconnect %s:%u failed: %s.", instance->redis_ip, instance->redis_port, ac->errstr); } instance->redis_connecting = CACHE_REDIS_DISCONNECTED; } void redis_asyn_connect_cb(const struct redisAsyncContext *ac, int status) { struct tango_cache_instance *instance = (struct tango_cache_instance *)redisAsyncGetConnectionData(ac); if(status == REDIS_OK) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u success.", instance->redis_ip, instance->redis_port); instance->redis_connecting = CACHE_REDIS_CONNECTED; } else { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Redis connect %s:%u failed: %s.", instance->redis_ip, instance->redis_port, ac->errstr); instance->redis_connecting = CACHE_REDIS_CONNECT_IDLE; } } int redis_asyn_connect_init(struct tango_cache_instance *instance, const char *redisip, int redis_port) { instance->redis_ac = redisAsyncConnect(redisip, redis_port); if(instance->redis_ac == NULL) { return -1; } instance->redis_connecting = CACHE_REDIS_CONNECTING; redisLibeventAttach(instance->redis_ac, instance->evbase); redisAsyncSetConnectionData(instance->redis_ac, instance); redisAsyncSetConnectCallback(instance->redis_ac, redis_asyn_connect_cb); redisAsyncSetDisconnectCallback(instance->redis_ac, redis_asyn_disconnect_cb); return 0; } int parse_minio_events_json(struct tango_cache_ctx *ctx, const char *jcontent) { cJSON *root, *pobject = NULL, *ptarget, *plastMod, *pexpires; int ret = PARSE_JSON_RET_ERROR; char usertag[2048]; size_t datalen; //Records[0]->s3->object->key...userMetaData->metas... if(NULL == (root=cJSON_Parse(jcontent))) { goto out_json; } if(NULL==(pobject=cJSON_GetObjectItem(root, "Records")) || pobject->type!=cJSON_Array) { goto out_json; } if(NULL == (pobject=cJSON_GetArrayItem(pobject, 0))) //第一个数组元素,一般只有一个 { goto out_json; } if(NULL == (pobject=cJSON_GetObjectItem(pobject, "s3")) || pobject->type!=cJSON_Object) { goto out_json; } if(NULL == (pobject=cJSON_GetObjectItem(pobject, "object")) || pobject->type!=cJSON_Object) { goto out_json; } //获取结果 if(NULL == (ptarget=cJSON_GetObjectItem(pobject, "size")) || ptarget->type!=cJSON_Number) { goto out_json; } ctx->get.result.tlength = ptarget->valueint; //TODO: 若超过4GB怎么办? if(NULL == (ptarget=cJSON_GetObjectItem(pobject, "userMetadata")) || ptarget->type!=cJSON_Object) { goto out_json; } if(NULL==(plastMod=cJSON_GetObjectItem(ptarget, "X-Amz-Meta-Lm")) || NULL==(pexpires=cJSON_GetObjectItem(ptarget, "expires"))) { goto out_json; } ctx->get.need_hdrs = RESPONSE_HDR_ALL; ctx->get.last_modify = atol(plastMod->valuestring); ctx->get.expires = expires_hdr2timestamp(pexpires->valuestring, strlen(pexpires->valuestring)); if(!check_expires_fresh_header(ctx)) { ret = PARSE_JSON_RET_TIMEOUT; goto out_json; } if(NULL!=(plastMod=cJSON_GetObjectItem(ptarget, "X-Amz-Meta-User"))) { if((datalen = Base64_DecodeBlock((unsigned char*)plastMod->valuestring, strlen(plastMod->valuestring), (unsigned char*)usertag, 2048))>0) { easy_string_savedata(&ctx->get.response_tag, usertag, datalen); } } for(int i=0; iresponse, g_http_hdr_name[i].http_name, strlen(g_http_hdr_name[i].http_name)); easy_string_savedata(&ctx->response, plastMod->valuestring, strlen(plastMod->valuestring)); easy_string_savedata(&ctx->response, "\r\n", strlen("\r\n")); } } cJSON_Delete(root); return PARSE_JSON_RET_SUCC; out_json: cJSON_Delete(root); return ret; } void redis_hget_command_cb(struct redisAsyncContext *ac, void *vreply, void *privdata) { redisReply *reply = (redisReply *)vreply; struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)privdata; int ret; if(reply == NULL || reply->type!=REDIS_REPLY_STRING) { if(reply!=NULL && reply->type == REDIS_REPLY_NIL) { tango_cache_set_fail_state(ctx, CACHE_CACHE_MISS); ctx->get.result.type = RESULT_TYPE_MISS; promise_success(future_to_promise(ctx->future), &ctx->get.result); } else { tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_JSON); if(reply!=NULL && reply->type==REDIS_REPLY_ERROR) { promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, reply->str); } else { promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); } } tango_cache_ctx_destroy(ctx); return; } ret = parse_minio_events_json(ctx, reply->str); switch(ret) { case PARSE_JSON_RET_ERROR: tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_JSON); promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); tango_cache_ctx_destroy(ctx); break; case PARSE_JSON_RET_TIMEOUT: if(ctx->get.state == GET_STATE_DELETE) { ctx->get.state = GET_STATE_END; cache_delete_minio_object(ctx); } break; case PARSE_JSON_RET_SUCC: fetch_header_over_biz(ctx); ctx->get.result.type = RESULT_TYPE_END; promise_success(future_to_promise(ctx->future), &ctx->get.result); tango_cache_ctx_destroy(ctx); break; default: assert(0);break; } } int tango_cache_head_redis(struct tango_cache_ctx *ctx) { int ret = -1; ctx->instance->statistic.get_recv_num += 1; switch(ctx->instance->redis_connecting) { case CACHE_REDIS_CONNECTED: ret = redisAsyncCommand(ctx->instance->redis_ac, redis_hget_command_cb, ctx, "HGET %s %s/%s", ctx->instance->redis_key, ctx->instance->bucketname, ctx->object_key); if(ret < 0) { //redisAsyncDisconnect(ctx->instance->redis_ac); redis_asyn_connect_init(ctx->instance, ctx->instance->redis_ip, ctx->instance->redis_port); tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); tango_cache_ctx_destroy(ctx); } break; case CACHE_REDIS_DISCONNECTED: case CACHE_REDIS_CONNECT_IDLE: redis_asyn_connect_init(ctx->instance, ctx->instance->redis_ip, ctx->instance->redis_port); case CACHE_REDIS_CONNECTING: tango_cache_set_fail_state(ctx, CACHE_ERR_REDIS_CONNECT); promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx)); tango_cache_ctx_destroy(ctx); break; default: assert(0);break; } return ret; }