This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/cache/src/cache_evbase_client.cpp

635 lines
17 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <net/if.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <sys/prctl.h>
#include <string.h>
#include <pthread.h>
#include "cache_evbase_client.h"
#include "tango_cache_transfer.h"
#include "tango_cache_tools.h"
/* Command tag stored in every databuffer that is handed to the cache IO
 * thread; cache_asyn_ioevent_dispatch() switches on it. */
enum CACHE_ASYN_CMD
{
    CACHE_ASYN_FETCH             = 0, /* fetch an object */
    CACHE_ASYN_UPLOAD_ONCE_DATA  = 1, /* one-shot upload from a memory block */
    CACHE_ASYN_UPLOAD_ONCE_EVBUF = 2, /* one-shot upload from an evbuffer */
    CACHE_ASYN_UPLOAD_START      = 3, /* fragmented upload: begin */
    CACHE_ASYN_UPLOAD_FRAG_DATA  = 4, /* fragmented upload: memory fragment */
    CACHE_ASYN_UPLOAD_FRAG_EVBUF = 5, /* fragmented upload: evbuffer fragment */
    CACHE_ASYN_UPLOAD_END        = 6, /* fragmented upload: finish */
    CACHE_ASYN_UPLOAD_CANCEL     = 7, /* fragmented upload: abort */
    CACHE_ASYN_DELETE            = 8, /* delete an object */
    CACHE_ASYN_HEAD              = 9, /* head request for an object */
};
/* One queued request. A pointer to a heap-allocated databuffer is written to
 * the notification socket by the caller and read back by
 * sockpair_notification_handler(), which dispatches it and then frees it.
 * Which fields are meaningful depends on cmd_type. */
struct databuffer
{
char *data;                          /* payload for the *_DATA commands */
size_t size;                         /* byte length of data */
struct evbuffer *evbuf;              /* payload for the *_EVBUF commands */
enum CACHE_ASYN_CMD cmd_type;        /* selects the branch taken by the dispatcher */
enum OBJECT_LOCATION where_to_get;   /* read by the FETCH and HEAD commands */
struct cache_evbase_ctx *ctx_asyn;   /* request context the command applies to */
};
/* Forwards to tango_cache_get_last_error() for the context wrapped by
 * ctx_asyn and returns whatever it reports. */
enum CACHE_ERR_CODE cache_evbase_get_last_error(const struct cache_evbase_ctx *ctx_asyn)
{
    enum CACHE_ERR_CODE last_error = tango_cache_get_last_error(ctx_asyn->ctx);

    return last_error;
}
/* Forwards to tango_cache_ctx_error() for the tango cache instance wrapped by
 * `instance` and returns whatever it reports. */
enum CACHE_ERR_CODE cache_evbase_ctx_error(const struct cache_evbase_instance *instance)
{
    enum CACHE_ERR_CODE instance_error = tango_cache_ctx_error(instance->instance);

    return instance_error;
}
/* Fills *out by forwarding to tango_cache_get_statistics() for the tango
 * cache instance wrapped by `instance`. */
void cache_evbase_get_statistics(const struct cache_evbase_instance *instance, struct cache_statistics *out)
{
tango_cache_get_statistics(instance->instance, out);
}
/* Forwards promise_result to tango_cache_read_result() and returns the
 * result pointer it produces. */
struct tango_cache_result *cache_evbase_read_result(void *promise_result)
{
    struct tango_cache_result *result = tango_cache_read_result(promise_result);

    return result;
}
/* Creates the AF_LOCAL stream socket pair used to hand requests to the IO
 * thread, and makes both ends non-blocking and close-on-exec.
 *
 * sv: receives the two descriptors on success.
 * Returns 0 on success, -1 on failure. On failure no descriptor is left open:
 * if the pair was created but could not be configured, both ends are closed. */
static int create_notification_pipe(evutil_socket_t sv[2])
{
    if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, sv) == -1)
    {
        return -1;
    }
    if(evutil_make_socket_nonblocking(sv[0]) < 0 || evutil_make_socket_nonblocking(sv[1]) < 0 ||
       evutil_make_socket_closeonexec(sv[0]) < 0 || evutil_make_socket_closeonexec(sv[1]) < 0)
    {
        /* The pair exists at this point; release it so a failed setup does not leak fds. */
        close(sv[0]);
        close(sv[1]);
        return -1;
    }
    return 0;
}
/* Calls send() with MSG_NOSIGNAL on write_fd, repeating the call for as long
 * as it fails with EINTR.
 * Returns the result of the last send(): the byte count, or -1 with errno set. */
static int32_t mesa_tcp_sock_write (int32_t write_fd, void *buf, int32_t bufsize)
{
    for(;;)
    {
        int32_t sent = send(write_fd, buf, bufsize, MSG_NOSIGNAL);

        /* Only an interrupted call is retried; everything else goes back to the caller. */
        if(sent != -1 || errno != EINTR)
        {
            return sent;
        }
    }
}
/* Writes `len` bytes of `content` to socket_fd, waiting with select() until
 * the socket is writable before each send.
 *
 * socket_fd:  descriptor to write to; must be usable with select().
 * content:    bytes to send.
 * len:        number of bytes to send.
 * s_time_out: per-wait timeout in seconds; 0 waits indefinitely.
 *
 * Returns the number of bytes sent (== len) on success, -1 when the wait
 * fails or times out or when send() fails, -2 when select() reports the
 * socket in the exception set. A select() interrupted by a signal is retried
 * rather than reported as a failure. */
static int32_t iothread_notify_event(int32_t socket_fd, void *content, int32_t len, int32_t s_time_out)
{
    fd_set w_set, e_set;
    struct timeval tv;
    int32_t res = 0, sndlen = 0, sendsize = 0;

    /* FD_SET on a descriptor outside [0, FD_SETSIZE) is undefined behaviour. */
    if(socket_fd < 0 || socket_fd >= FD_SETSIZE)
    {
        printf("log_error: select io fd=%d is out of range for select\n", socket_fd);
        return -1;
    }
    while(len > sndlen)
    {
        FD_ZERO(&w_set);
        FD_ZERO(&e_set);
        FD_SET(socket_fd, &w_set);
        FD_SET(socket_fd, &e_set);
        if(s_time_out == 0)
        {
            res = select(socket_fd + 1, NULL, &w_set, &e_set, NULL);
        }
        else
        {
            /* Re-armed on every pass because select() may modify the timeval. */
            tv.tv_sec = s_time_out;
            tv.tv_usec = 0;
            res = select(socket_fd + 1, NULL, &w_set, &e_set, &tv);
        }
        if(res < 0 && errno == EINTR)
        {
            /* A signal is not a failure of the notification channel. */
            continue;
        }
        if(res == 0)
        {
            /* errno is not set on timeout, so do not print it. */
            printf("log_error: select io timed out after %d seconds\n", s_time_out);
            return -1;
        }
        if(res < 0)
        {
            printf("log_error: select io res=%d, error: %s\n", res, strerror(errno));
            return -1;
        }
        if(FD_ISSET(socket_fd, &e_set))
        {
            printf("log_error: select io is in efds, error: %s\n", strerror(errno));
            return -2;
        }
        if(FD_ISSET(socket_fd, &w_set))
        {
            sendsize = mesa_tcp_sock_write(socket_fd, (char*)content + sndlen, len - sndlen);
            if(sendsize < 0)
            {
                if(errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    /* Socket filled up between select() and send(); wait again. */
                    continue;
                }
                return -1;
            }
            sndlen += sendsize;
        }
    }
    return sndlen;
}
/* Releases the cache_evbase_ctx wrapper itself with free(). The wrapped
 * ctx_asyn->ctx is not touched here. */
static void cache_asyn_ctx_destroy(struct cache_evbase_ctx *ctx_asyn)
{
free(ctx_asyn);
}
static void cache_asyn_ioevent_dispatch(struct databuffer *buffer)
{
struct cache_evbase_ctx *ctx_asyn=buffer->ctx_asyn;
struct promise *p;
int ret=0;
switch(buffer->cmd_type)
{
case CACHE_ASYN_FETCH:
p = ctx_asyn->ctx->promise;
if(do_tango_cache_fetch_object(ctx_asyn->ctx, buffer->where_to_get) < 0)
{
promise_failed(p, FUTURE_ERROR_CANCEL, "CACHE_ASYN_FETCH failed");
}
cache_asyn_ctx_destroy(ctx_asyn);
break;
case CACHE_ASYN_HEAD:
p = ctx_asyn->ctx->promise;
ret = do_tango_cache_head_object(ctx_asyn->ctx, buffer->where_to_get);
if(ret<0)
{
promise_failed(p, FUTURE_ERROR_CANCEL, "CACHE_ASYN_HEAD failed");
}
cache_asyn_ctx_destroy(ctx_asyn);
break;
case CACHE_ASYN_DELETE:
cache_delete_minio_object(ctx_asyn->ctx, true);
cache_asyn_ctx_destroy(ctx_asyn);
break;
case CACHE_ASYN_UPLOAD_ONCE_DATA:
do_tango_cache_upload_once_data(ctx_asyn->ctx, PUT_MEM_FREE, buffer->data, buffer->size, true);
cache_asyn_ctx_destroy(ctx_asyn);
break;
case CACHE_ASYN_UPLOAD_ONCE_EVBUF:
do_tango_cache_upload_once_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf, true);
evbuffer_free(buffer->evbuf);
cache_asyn_ctx_destroy(ctx_asyn);
break;
case CACHE_ASYN_UPLOAD_START:
ctx_asyn->ctx->instance->statistic.put_recv_num += 1;
ctx_asyn->ctx->instance->error_code = CACHE_OK;
break;
case CACHE_ASYN_UPLOAD_FRAG_DATA:
tango_cache_update_frag_data(ctx_asyn->ctx, buffer->data, buffer->size);
free(buffer->data);
break;
case CACHE_ASYN_UPLOAD_FRAG_EVBUF:
tango_cache_update_frag_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf);
evbuffer_free(buffer->evbuf);
break;
case CACHE_ASYN_UPLOAD_END:
do_tango_cache_update_end(ctx_asyn->ctx, true);
cache_asyn_ctx_destroy(ctx_asyn);
break;
case CACHE_ASYN_UPLOAD_CANCEL:
tango_cache_update_cancel(ctx_asyn->ctx);
cache_asyn_ctx_destroy(ctx_asyn);
break;
default: assert(0);break;
}
}
/* libevent read callback for the notification socket. Drains the socket: each
 * message is one `struct databuffer *`, which is dispatched and then freed.
 *
 * fd:     readable end of the notification socket pair.
 * events: unused.
 * arg:    the owning struct cache_evbase_instance.
 *
 * Returns once the socket has no more data (EAGAIN/EWOULDBLOCK). A recv()
 * interrupted by a signal is retried. A real recv() error, end-of-file, or a
 * truncated message is logged as fatal and trips assert(0). */
static void sockpair_notification_handler(evutil_socket_t fd, short events, void *arg)
{
    struct cache_evbase_instance *instance_asyn = (struct cache_evbase_instance *)arg;
    struct databuffer *buffer;
    const ssize_t needlen = (ssize_t)sizeof(buffer); /* exactly one pointer per message */
    ssize_t readlen;

    (void)events;
    while(1)
    {
        readlen = recv(fd, &buffer, needlen, 0);
        if(readlen == needlen)
        {
            cache_asyn_ioevent_dispatch(buffer);
            free(buffer);
            continue;
        }
        if(readlen < 0)
        {
            if(errno == EINTR)
            {
                continue;
            }
            if(errno == EWOULDBLOCK || errno == EAGAIN)
            {
                break; /* queue drained */
            }
            MESA_HANDLE_RUNTIME_LOGV2(instance_asyn->instance->runtime_log, RLOG_LV_FATAL, "read pipe error: %s.", strerror(errno));
            assert(0);
            return;
        }
        /* 0 means the peer closed the socket; 0 < readlen < needlen is a torn
         * pointer. errno is not set in either case, so it must not be consulted. */
        MESA_HANDLE_RUNTIME_LOGV2(instance_asyn->instance->runtime_log, RLOG_LV_FATAL, "read pipe error: got %ld of %ld bytes.", (long)readlen, (long)needlen);
        assert(0);
        return;
    }
}
static void* thread_listen_sockpair(void *arg)
{
struct cache_evbase_instance *instance_asyn = (struct cache_evbase_instance *)arg;
struct event listen_event;
prctl(PR_SET_NAME, "tango_cache");
event_assign(&listen_event, instance_asyn->evbase, instance_asyn->notify_readfd, EV_READ|EV_PERSIST, sockpair_notification_handler, instance_asyn);
event_add(&listen_event, NULL);
event_base_dispatch(instance_asyn->evbase);
printf("Libevent dispath error, should not run here.\n");
MESA_HANDLE_RUNTIME_LOGV2(instance_asyn->instance->runtime_log, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
assert(0);
return NULL;
}
/* Finishes a fragmented upload by queueing CACHE_ASYN_UPLOAD_END for the IO
 * thread.
 *
 * ctx_asyn: upload context from cache_evbase_update_start(); it must not be
 *           used by the caller after this call, whatever the return value.
 * path:     OUT, filled by tango_cache_get_object_path().
 * pathsize: capacity of path.
 *
 * Returns 0 when the request was queued, -1 when the context was already in a
 * failed state, -2 when the request could not be handed to the IO thread
 * (allocation or socket failure). On -1 and -2 the context is destroyed here. */
int cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn, char *path/*OUT*/, size_t pathsize)
{
    struct databuffer *buffer;

    if(ctx_asyn->ctx->fail_state)
    {
        tango_cache_ctx_destroy(ctx_asyn->ctx, false);
        cache_asyn_ctx_destroy(ctx_asyn);
        return -1;
    }

    /* The total size is only known now, so the storage location is chosen here
     * from the accumulated object_size before the END command is queued. */
    ctx_asyn->ctx->locate = tango_cache_object_locate(ctx_asyn->ctx->instance, ctx_asyn->object_size);
    tango_cache_get_object_path(ctx_asyn->ctx, path, pathsize);
    if(ctx_asyn->ctx->instance->param->object_store_way != CACHE_ALL_MINIO)
    {
        cJSON_AddNumberToObject(ctx_asyn->ctx->put.object_meta, "Content-Length", ctx_asyn->object_size);
    }

    buffer = (struct databuffer *)calloc(1, sizeof(struct databuffer));
    if(buffer != NULL)
    {
        buffer->ctx_asyn = ctx_asyn;
        buffer->cmd_type = CACHE_ASYN_UPLOAD_END;
        if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) == sizeof(void *))
        {
            /* Ownership of buffer and ctx_asyn has passed to the IO thread. */
            return 0;
        }
    }

    /* NOTE(review): CACHE_ERR_SOCKPAIR is reused for the allocation failure
     * because no allocation-specific code is visible in this file. */
    tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
    tango_cache_ctx_destroy(ctx_asyn->ctx, false);
    cache_asyn_ctx_destroy(ctx_asyn);
    free(buffer);
    return -2;
}
/* Aborts a fragmented upload by queueing CACHE_ASYN_UPLOAD_CANCEL for the IO
 * thread.
 *
 * ctx_asyn: upload context from cache_evbase_update_start(); it must not be
 *           used by the caller after this call.
 *
 * If the request cannot be handed over (allocation or socket failure), the
 * context is marked failed, unless it already is, and destroyed here. */
void cache_evbase_update_cancel(struct cache_evbase_ctx *ctx_asyn)
{
    struct databuffer *buffer;

    buffer = (struct databuffer *)calloc(1, sizeof(struct databuffer));
    if(buffer != NULL)
    {
        buffer->ctx_asyn = ctx_asyn;
        buffer->cmd_type = CACHE_ASYN_UPLOAD_CANCEL;
        if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) == sizeof(void *))
        {
            /* The IO thread now owns buffer and ctx_asyn. */
            return;
        }
    }

    /* NOTE(review): CACHE_ERR_SOCKPAIR also covers the allocation failure; no
     * allocation-specific code is visible in this file. */
    if(!ctx_asyn->ctx->fail_state)
    {
        tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
    }
    tango_cache_ctx_destroy(ctx_asyn->ctx, false);
    cache_asyn_ctx_destroy(ctx_asyn);
    free(buffer);
}
/* Queues one memory fragment of a fragmented upload for the IO thread.
 *
 * ctx_asyn: upload context from cache_evbase_update_start().
 * way:      PUT_MEM_COPY copies `data`; any other value hands `data` itself
 *           to the IO thread, which releases it with free().
 * data:     fragment bytes.
 * size:     fragment length; it is added to ctx_asyn->object_size.
 *
 * Returns 0 when the fragment was queued, and also when the context is
 * already in a failed state (the fragment is dropped; the failure is reported
 * by cache_evbase_update_end()). Returns -2 when the fragment could not be
 * handed over (allocation or socket failure); the context is then marked
 * failed and the payload is released. */
int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size)
{
    struct databuffer *buffer = NULL;
    char *payload = (char *)data;
    int payload_ready = 1;

    if(ctx_asyn->ctx->fail_state)
    {
        if(way == PUT_MEM_FREE) free((void *)data);
        return 0; /* failure is surfaced later by cache_evbase_update_end() */
    }
    ctx_asyn->object_size += size;

    if(way == PUT_MEM_COPY)
    {
        /* At least one byte so that a NULL result always means failure. */
        payload = (char *)malloc(size > 0 ? size : 1);
        if(payload != NULL)
        {
            memcpy(payload, data, size);
        }
        else
        {
            payload_ready = 0;
        }
    }
    if(payload_ready)
    {
        buffer = (struct databuffer *)calloc(1, sizeof(struct databuffer));
    }
    if(buffer != NULL)
    {
        buffer->data = payload;
        buffer->size = size;
        buffer->ctx_asyn = ctx_asyn;
        buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG_DATA;
        if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) == sizeof(void *))
        {
            /* buffer and payload now belong to the IO thread. */
            return 0;
        }
    }

    /* NOTE(review): CACHE_ERR_SOCKPAIR also covers the allocation failure; no
     * allocation-specific code is visible in this file. */
    tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
    free(payload); /* the private copy, or the caller's block whose ownership was passed in */
    free(buffer);
    return -2;
}
/* Queues one evbuffer fragment of a fragmented upload for the IO thread. The
 * contents of evbuf are moved into a private evbuffer that the IO thread
 * consumes and frees.
 *
 * ctx_asyn: upload context from cache_evbase_update_start().
 * evbuf:    source buffer; its length is added to ctx_asyn->object_size.
 *
 * Returns 0 when the fragment was queued, and also when the context is
 * already in a failed state (evbuf is then left untouched). Returns -2 when
 * the fragment could not be staged or handed over; the context is then marked
 * failed. */
int cache_evbase_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evbuffer *evbuf)
{
    struct databuffer *buffer;
    struct evbuffer *staged;

    if(ctx_asyn->ctx->fail_state)
    {
        return 0;
    }
    ctx_asyn->object_size += evbuffer_get_length(evbuf);

    staged = evbuffer_new();
    buffer = (struct databuffer *)calloc(1, sizeof(struct databuffer));
    /* Only queue the fragment if the data really was moved; otherwise an empty
     * fragment would be uploaded although object_size already counts it. */
    if(staged != NULL && buffer != NULL && evbuffer_add_buffer(staged, evbuf) == 0)
    {
        buffer->ctx_asyn = ctx_asyn;
        buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG_EVBUF;
        buffer->evbuf = staged;
        if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 2) == sizeof(void *))
        {
            /* buffer and staged now belong to the IO thread. */
            return 0;
        }
    }

    /* NOTE(review): CACHE_ERR_SOCKPAIR also covers staging failures; no more
     * specific code is visible in this file. */
    tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
    if(staged != NULL)
    {
        evbuffer_free(staged);
    }
    free(buffer);
    return -2;
}
/* Begins a fragmented upload: prepares a tango cache context, wraps it, and
 * queues CACHE_ASYN_UPLOAD_START for the IO thread.
 *
 * instance: asynchronous cache instance.
 * f:        future passed to tango_cache_update_prepare().
 * meta:     upload metadata passed to tango_cache_update_prepare().
 *
 * Returns the new upload context, or NULL when preparation fails or the
 * request cannot be handed to the IO thread (allocation or socket failure).
 * In the latter case the instance error code is set to CACHE_ERR_SOCKPAIR and
 * the prepared context is destroyed. */
struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_put *meta)
{
    struct cache_evbase_ctx *ctx_asyn;
    struct tango_cache_ctx *ctx;
    struct databuffer *buffer;
    enum OBJECT_LOCATION maybe_loc=OBJECT_IN_UNKNOWN;

    if(instance->instance->param->object_store_way != CACHE_SMALL_REDIS)
    {
        maybe_loc = OBJECT_IN_MINIO;
    }
    ctx = tango_cache_update_prepare(instance->instance, f, meta, maybe_loc);
    if(ctx == NULL)
    {
        return NULL;
    }

    ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
    buffer = (struct databuffer *)calloc(1, sizeof(struct databuffer));
    if(ctx_asyn != NULL && buffer != NULL)
    {
        ctx_asyn->instance_asyn = instance;
        ctx_asyn->ctx = ctx;
        buffer->ctx_asyn = ctx_asyn;
        buffer->cmd_type = CACHE_ASYN_UPLOAD_START;
        /* This notification exists only so the IO thread can update its statistics. */
        if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) == sizeof(void *))
        {
            return ctx_asyn;
        }
    }

    /* NOTE(review): CACHE_ERR_SOCKPAIR also covers the allocation failure; no
     * allocation-specific code is visible in this file. */
    instance->instance->error_code = CACHE_ERR_SOCKPAIR;
    tango_cache_ctx_destroy(ctx, false);
    if(ctx_asyn != NULL)
    {
        cache_asyn_ctx_destroy(ctx_asyn);
    }
    free(buffer);
    return NULL;
}
int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct future* f,
enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta_put *meta, char *path, size_t pathsize)
{
struct cache_evbase_ctx *ctx_asyn;
struct tango_cache_ctx *ctx;
struct databuffer *buffer;
ctx = tango_cache_update_once_prepare(instance->instance, f, meta, size, path, pathsize);
if(ctx == NULL)
{
if(way == PUT_MEM_FREE) free((void *)data);
return -1;
}
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
ctx_asyn->instance_asyn = instance;
ctx_asyn->ctx = ctx;
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
if(way == PUT_MEM_COPY)
{
buffer->data = (char *)malloc(size);
memcpy(buffer->data, data, size);
}
else
{
buffer->data = (char*)data;
}
buffer->size = size;
buffer->ctx_asyn = ctx_asyn;
buffer->cmd_type = CACHE_ASYN_UPLOAD_ONCE_DATA;
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
free(buffer->data);
free(buffer);
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_ctx_destroy(ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
return -2;
}
return 0;
}
/* Uploads a complete object held in an evbuffer with a single request,
 * executed on the IO thread. The contents of evbuf are moved into a private
 * evbuffer that the IO thread consumes and frees.
 *
 * instance: asynchronous cache instance.
 * f:        future passed to tango_cache_update_once_prepare().
 * evbuf:    source buffer; its current length is used as the object size.
 * meta:     upload metadata.
 * path:     passed to tango_cache_update_once_prepare() together with pathsize.
 * pathsize: capacity of path.
 *
 * Returns 0 when the request was queued, -1 when preparation fails, -2 when
 * the data cannot be staged or the request cannot be handed to the IO thread;
 * in that case the prepared context is destroyed and the instance error code
 * is set to CACHE_ERR_SOCKPAIR. */
int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struct future* f,
    struct evbuffer *evbuf, struct tango_cache_meta_put *meta, char *path, size_t pathsize)
{
    struct cache_evbase_ctx *ctx_asyn;
    struct tango_cache_ctx *ctx;
    struct databuffer *buffer;
    struct evbuffer *staged;

    ctx = tango_cache_update_once_prepare(instance->instance, f, meta, evbuffer_get_length(evbuf), path, pathsize);
    if(ctx == NULL)
    {
        return -1;
    }

    ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
    buffer = (struct databuffer *)calloc(1, sizeof(struct databuffer));
    staged = evbuffer_new();
    /* Queue the request only if the data really was moved into `staged`. */
    if(ctx_asyn != NULL && buffer != NULL && staged != NULL && evbuffer_add_buffer(staged, evbuf) == 0)
    {
        ctx_asyn->instance_asyn = instance;
        ctx_asyn->ctx = ctx;
        buffer->ctx_asyn = ctx_asyn;
        buffer->cmd_type = CACHE_ASYN_UPLOAD_ONCE_EVBUF;
        buffer->evbuf = staged;
        if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) == sizeof(void *))
        {
            /* Everything queued now belongs to the IO thread. */
            return 0;
        }
    }

    /* NOTE(review): CACHE_ERR_SOCKPAIR also covers staging failures; no more
     * specific code is visible in this file. */
    if(staged != NULL)
    {
        evbuffer_free(staged);
    }
    free(buffer);
    instance->instance->error_code = CACHE_ERR_SOCKPAIR;
    tango_cache_ctx_destroy(ctx, false);
    if(ctx_asyn != NULL)
    {
        cache_asyn_ctx_destroy(ctx_asyn);
    }
    return -2;
}
int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get)
{
struct cache_evbase_ctx *ctx_asyn;
struct databuffer *buffer;
if(instance->instance->param->object_store_way != CACHE_SMALL_REDIS)
{
where_to_get = OBJECT_IN_MINIO;
}
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
ctx_asyn->instance_asyn = instance;
ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, CACHE_REQUEST_GET, f, meta, where_to_get);
if(ctx_asyn->ctx == NULL)
{
free(ctx_asyn);
return -1;
}
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
buffer->ctx_asyn = ctx_asyn;
buffer->cmd_type = CACHE_ASYN_FETCH;
buffer->where_to_get = where_to_get;
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
return -2;
}
return 0;
}
int cache_evbase_head_object(struct cache_evbase_instance *instance, struct future* f, struct tango_cache_meta_get *meta)
{
struct cache_evbase_ctx *ctx_asyn;
struct databuffer *buffer;
enum OBJECT_LOCATION location = OBJECT_IN_MINIO;
if(instance->instance->param->object_store_way != CACHE_ALL_MINIO)
{
location = OBJECT_IN_REDIS;
}
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
ctx_asyn->instance_asyn = instance;
ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, CACHE_REQUEST_HEAD, f, meta, location);
if(ctx_asyn->ctx == NULL)
{
free(ctx_asyn);
return -1;
}
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
buffer->ctx_asyn = ctx_asyn;
buffer->cmd_type = CACHE_ASYN_HEAD;
buffer->where_to_get = location;
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
return -2;
}
return 0;
}
int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct future* f, const char *objkey, const char *minio_addr, const char *bucket)
{
struct cache_evbase_ctx *ctx_asyn;
struct databuffer *buffer;
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
ctx_asyn->instance_asyn = instance;
ctx_asyn->ctx = tango_cache_delete_prepare(instance->instance, f, objkey, minio_addr, bucket);
if(ctx_asyn->ctx == NULL)
{
free(ctx_asyn);
return -1;
}
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
buffer->ctx_asyn = ctx_asyn;
buffer->cmd_type = CACHE_ASYN_DELETE;
//<2F>ο<EFBFBD>Unix<69>߼<EFBFBD><DFBC><EFBFBD><EFBFBD><EFBFBD>432ҳ<32><D2B3><EFBFBD>ڶ<EFBFBD><DAB6>߳<EFBFBD>д<EFBFBD>İ<EFBFBD>ȫ<EFBFBD><C8AB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
return -2;
}
return 0;
}
/* Forwards all three arguments to tango_cache_parameter_new() and returns the
 * parameter object it produces. */
struct tango_cache_parameter *cache_evbase_parameter_new(const char* profile_path, const char* section, void *runtimelog)
{
    struct tango_cache_parameter *param = tango_cache_parameter_new(profile_path, section, runtimelog);

    return param;
}
struct cache_evbase_instance *cache_evbase_instance_new(struct tango_cache_parameter *param, void *runtimelog)
{
evutil_socket_t notification_fd[2];
struct cache_evbase_instance *instance_asyn;
struct event_base *evbase;
pthread_t thread_tid;
pthread_attr_t attr;
if(create_notification_pipe(notification_fd))
{
return NULL;
}
if((evbase = event_base_new()) == NULL)
{
return NULL;
}
instance_asyn = (struct cache_evbase_instance *)calloc(1, sizeof(struct cache_evbase_instance));
instance_asyn->evbase = evbase;
instance_asyn->notify_readfd = notification_fd[0];
instance_asyn->notify_sendfd = notification_fd[1];
instance_asyn->instance = tango_cache_instance_new(param, evbase, runtimelog);
if(instance_asyn->instance == NULL)
{
free(instance_asyn);
return NULL;
}
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if(pthread_create(&thread_tid, &attr, thread_listen_sockpair, instance_asyn))
{
free(instance_asyn);
return NULL;
}
return instance_asyn;
}
/* Forwards to tango_cache_global_init(); takes no arguments and returns nothing. */
void cache_evbase_global_init(void)
{
tango_cache_global_init();
}