This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/cache/cache_evbase_client.cpp

537 lines
14 KiB
C++
Raw Normal View History

2018-09-18 11:14:11 +08:00
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <net/if.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <sys/prctl.h>
#include <string.h>
#include <pthread.h>
#include "cache_evbase_client.h"
#include "tango_cache_transfer.h"
#include "tango_cache_tools.h"
enum CACHE_ASYN_CMD
{
CACHE_ASYN_FETCH=0,
2018-09-21 14:50:41 +08:00
CACHE_ASYN_UPLOAD_ONCE_DATA,
CACHE_ASYN_UPLOAD_ONCE_EVBUF,
2018-09-29 11:28:09 +08:00
CACHE_ASYN_UPLOAD_START,
2018-09-21 14:50:41 +08:00
CACHE_ASYN_UPLOAD_FRAG_DATA,
CACHE_ASYN_UPLOAD_FRAG_EVBUF,
2018-09-18 11:14:11 +08:00
CACHE_ASYN_UPLOAD_END,
CACHE_ASYN_DELETE,
2018-09-18 11:14:11 +08:00
};
struct databuffer
{
char *data;
size_t size;
2018-09-21 14:50:41 +08:00
struct evbuffer *evbuf;
2018-09-18 11:14:11 +08:00
enum CACHE_ASYN_CMD cmd_type;
struct cache_evbase_ctx *ctx_asyn;
};
enum CACHE_ERR_CODE cache_evbase_get_last_error(const struct cache_evbase_ctx *ctx_asyn)
{
2018-09-23 15:35:13 +08:00
return tango_cache_get_last_error(ctx_asyn->ctx);
2018-09-18 11:14:11 +08:00
}
enum CACHE_ERR_CODE cache_evbase_ctx_error(const struct cache_evbase_instance *instance)
{
return tango_cache_ctx_error(instance->instance);
}
void cache_evbase_get_statistics(const struct cache_evbase_instance *instance, struct cache_statistics *out)
{
tango_cache_get_statistics(instance->instance, out);
}
2018-09-23 15:35:13 +08:00
struct tango_cache_result *cache_evbase_read_result(void *promise_result)
{
return tango_cache_read_result(promise_result);
}
2018-09-28 15:16:28 +08:00
void cache_evbase_get_object_path(const struct cache_evbase_ctx *ctx_asyn, char *path, size_t pathsize)
2018-09-18 11:14:11 +08:00
{
2018-09-28 15:16:28 +08:00
tango_cache_get_object_path(ctx_asyn->ctx, path, pathsize);
2018-09-18 11:14:11 +08:00
}
static int create_notification_pipe(evutil_socket_t sv[2])
{
if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, sv) == -1)
{
return -1;
}
if(evutil_make_socket_nonblocking(sv[0])<0 || evutil_make_socket_nonblocking(sv[1])<0)
{
return -1;
}
if(evutil_make_socket_closeonexec(sv[0])<0 || evutil_make_socket_closeonexec(sv[1])<0)
{
return -1;
}
return 0;
}
static int32_t mesa_tcp_sock_write (int32_t write_fd, void *buf, int32_t bufsize)
{
int32_t res;
do{
res = send(write_fd, buf, bufsize, MSG_NOSIGNAL);
}while (res==-1 &&(errno == EINTR));
return res;
}
static int32_t iothread_notify_event(int32_t socket_fd, void *content, int32_t len, int32_t s_time_out)
{
fd_set w_set, e_set;
struct timeval tv;
int32_t res=0, sndlen=0, sendsize=0;
while(len > sndlen)
{
FD_ZERO (&w_set);
FD_ZERO (&e_set);
FD_SET (socket_fd, &w_set);
FD_SET (socket_fd, &e_set);
if(s_time_out == 0)
{
res = select (socket_fd + 1, NULL, &w_set, &e_set, NULL);
}
else
{
tv.tv_sec = s_time_out;
tv.tv_usec = 0;
res = select (socket_fd + 1, NULL, &w_set, &e_set, &tv);
}
if(res <= 0)
{
printf("log_error: select io res=%d, error: %s", res, strerror(errno));
return -1;
}
if(FD_ISSET(socket_fd, &e_set))
{
printf("log_error: select io is in efds, error: %s", strerror(errno));
return -2;
}
if(FD_ISSET(socket_fd, &w_set))
{
sendsize = mesa_tcp_sock_write(socket_fd, (char*)content + sndlen, len - sndlen);
if (sendsize < 0)
{
if(errno == EAGAIN)
{
continue;
}
return -1;
}
sndlen += sendsize;
}
}
return sndlen;
}
static void cache_asyn_ctx_destroy(struct cache_evbase_ctx *ctx_asyn)
{
free(ctx_asyn);
}
static void cache_asyn_ioevent_dispatch(struct databuffer *buffer)
{
struct cache_evbase_ctx *ctx_asyn=buffer->ctx_asyn;
switch(buffer->cmd_type)
{
case CACHE_ASYN_FETCH:
tango_cache_fetch_start(ctx_asyn->ctx);
cache_asyn_ctx_destroy(ctx_asyn);
break;
case CACHE_ASYN_DELETE:
cache_delete_minio_object(ctx_asyn->ctx);
cache_asyn_ctx_destroy(ctx_asyn);
break;
2018-09-21 14:50:41 +08:00
case CACHE_ASYN_UPLOAD_ONCE_DATA:
tango_cache_upload_once_start_data(ctx_asyn->ctx, PUT_MEM_FREE, buffer->data, buffer->size);
cache_asyn_ctx_destroy(ctx_asyn);
break;
case CACHE_ASYN_UPLOAD_ONCE_EVBUF:
tango_cache_upload_once_start_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf);
evbuffer_free(buffer->evbuf);
2018-09-18 11:14:11 +08:00
cache_asyn_ctx_destroy(ctx_asyn);
break;
2018-09-29 11:28:09 +08:00
case CACHE_ASYN_UPLOAD_START:
ctx_asyn->ctx->instance->statistic.put_recv_num += 1;
ctx_asyn->ctx->instance->error_code = CACHE_OK;
break;
2018-09-18 11:14:11 +08:00
2018-09-21 14:50:41 +08:00
case CACHE_ASYN_UPLOAD_FRAG_DATA:
tango_cache_update_frag_data(ctx_asyn->ctx, buffer->data, buffer->size);
2018-09-18 11:14:11 +08:00
free(buffer->data);
break;
2018-09-21 14:50:41 +08:00
case CACHE_ASYN_UPLOAD_FRAG_EVBUF:
tango_cache_update_frag_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf);
evbuffer_free(buffer->evbuf);
break;
2018-09-18 11:14:11 +08:00
case CACHE_ASYN_UPLOAD_END:
tango_cache_update_end(ctx_asyn->ctx);
cache_asyn_ctx_destroy(ctx_asyn);
break;
default: assert(0);break;
}
}
static void sockpair_notification_handler(evutil_socket_t fd, short events, void *arg)
{
ssize_t readlen, needlen;
struct cache_evbase_instance *instance_asyn = (struct cache_evbase_instance *)arg;
struct databuffer *buffer;
while(1)
{
needlen=sizeof(struct cache_evbase_ctx *);
readlen = recv(fd, &buffer, needlen, 0);
if(readlen == needlen)
{
cache_asyn_ioevent_dispatch(buffer);
free(buffer);
}
else
{
if(errno!=EWOULDBLOCK && errno!=EAGAIN)
{
MESA_HANDLE_RUNTIME_LOGV2(instance_asyn->instance->runtime_log, RLOG_LV_FATAL, "read pipe error: %s.", strerror(errno));
assert(0);
return;
}
break;
}
}
}
static void* thread_listen_sockpair(void *arg)
{
struct cache_evbase_instance *instance_asyn = (struct cache_evbase_instance *)arg;
struct event listen_event;
prctl(PR_SET_NAME, "tango_cache");
event_assign(&listen_event, instance_asyn->evbase, instance_asyn->notify_readfd, EV_READ|EV_PERSIST, sockpair_notification_handler, instance_asyn);
event_add(&listen_event, NULL);
event_base_dispatch(instance_asyn->evbase);
printf("Libevent dispath error, should not run here.\n");
MESA_HANDLE_RUNTIME_LOGV2(instance_asyn->instance->runtime_log, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
assert(0);
return NULL;
}
void cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn)
{
struct databuffer *buffer;
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
buffer->ctx_asyn = ctx_asyn;
buffer->cmd_type = CACHE_ASYN_UPLOAD_END;
if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *))
{
if(!ctx_asyn->ctx->fail_state)
2018-09-18 11:14:11 +08:00
{
ctx_asyn->ctx->fail_state = true;
if(ctx_asyn->ctx->future != NULL)
{
promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error");
}
2018-09-18 11:14:11 +08:00
}
tango_cache_ctx_destroy(ctx_asyn->ctx);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
}
}
2018-09-21 14:50:41 +08:00
int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size)
2018-09-18 11:14:11 +08:00
{
struct databuffer *buffer;
if(ctx_asyn->ctx->fail_state)
{
return -1;
}
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
2018-09-21 14:50:41 +08:00
if(way == PUT_MEM_COPY)
2018-09-18 11:14:11 +08:00
{
buffer->data = (char *)malloc(size);
memcpy(buffer->data, data, size);
}
else
{
buffer->data = (char*)data;
}
buffer->size = size;
buffer->ctx_asyn = ctx_asyn;
2018-09-21 14:50:41 +08:00
buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG_DATA;
2018-09-18 11:14:11 +08:00
if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *))
{
ctx_asyn->ctx->fail_state = true;
if(ctx_asyn->ctx->future != NULL)
{
promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error");
}
free(buffer->data);
free(buffer);
2018-09-29 11:28:09 +08:00
return -2;
2018-09-18 11:14:11 +08:00
}
return 0;
}
2018-09-21 14:50:41 +08:00
int cache_evbase_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evbuffer *evbuf)
{
struct databuffer *buffer;
if(ctx_asyn->ctx->fail_state)
{
return -1;
}
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
buffer->ctx_asyn = ctx_asyn;
buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG_EVBUF;
buffer->evbuf = evbuffer_new();
evbuffer_add_buffer(buffer->evbuf, evbuf);
if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *))
{
ctx_asyn->ctx->fail_state = true;
if(ctx_asyn->ctx->future != NULL)
{
promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error");
}
evbuffer_free(buffer->evbuf);
free(buffer);
2018-09-29 11:28:09 +08:00
return -2;
2018-09-21 14:50:41 +08:00
}
return 0;
}
2018-09-18 11:14:11 +08:00
struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance *instance, struct future* future, struct tango_cache_meta *meta)
{
struct cache_evbase_ctx *ctx_asyn;
struct tango_cache_ctx *ctx;
2018-09-29 11:28:09 +08:00
struct databuffer *buffer;
2018-09-18 11:14:11 +08:00
2018-09-29 11:28:09 +08:00
ctx = tango_cache_update_prepare(instance->instance, future, meta);
2018-09-18 11:14:11 +08:00
if(ctx == NULL)
{
return NULL;
}
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
ctx_asyn->instance_asyn = instance;
ctx_asyn->ctx = ctx;
2018-09-29 11:28:09 +08:00
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
buffer->ctx_asyn = ctx_asyn;
buffer->cmd_type = CACHE_ASYN_UPLOAD_START;
//<2F>¼<EFBFBD>֪ͨ<CDA8><D6AA>Ϊ<EFBFBD><CEAA><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͳ<EFBFBD><CDB3><EFBFBD><EFBFBD>Ϣ
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *))
{
ctx_asyn->ctx->fail_state = true;
tango_cache_ctx_destroy(ctx_asyn->ctx);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
return NULL;
}
2018-09-18 11:14:11 +08:00
return ctx_asyn;
}
2018-09-21 14:50:41 +08:00
int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct future* future,
2018-09-28 15:16:28 +08:00
enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta *meta, char *path, size_t pathsize)
2018-09-18 11:14:11 +08:00
{
struct cache_evbase_ctx *ctx_asyn;
struct tango_cache_ctx *ctx;
struct databuffer *buffer;
ctx = tango_cache_update_prepare(instance->instance, future, meta);
if(ctx == NULL)
{
return -1;
}
2018-09-28 15:16:28 +08:00
if(path != NULL)
2018-09-18 11:14:11 +08:00
{
2018-09-28 15:16:28 +08:00
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->bucketname, ctx->object_key);
2018-09-18 11:14:11 +08:00
}
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
ctx_asyn->instance_asyn = instance;
ctx_asyn->ctx = ctx;
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
2018-09-21 14:50:41 +08:00
if(way == PUT_MEM_COPY)
2018-09-18 11:14:11 +08:00
{
buffer->data = (char *)malloc(size);
memcpy(buffer->data, data, size);
}
else
{
buffer->data = (char*)data;
}
buffer->size = size;
buffer->ctx_asyn = ctx_asyn;
2018-09-21 14:50:41 +08:00
buffer->cmd_type = CACHE_ASYN_UPLOAD_ONCE_DATA;
2018-09-18 11:14:11 +08:00
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *))
{
free(buffer->data);
free(buffer);
2018-09-29 11:28:09 +08:00
ctx_asyn->ctx->fail_state = true;
2018-09-18 11:14:11 +08:00
tango_cache_ctx_destroy(ctx);
cache_asyn_ctx_destroy(ctx_asyn);
2018-09-29 11:28:09 +08:00
return -2;
2018-09-21 14:50:41 +08:00
}
return 0;
}
int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struct future* future,
2018-09-28 15:16:28 +08:00
struct evbuffer *evbuf, struct tango_cache_meta *meta, char *path, size_t pathsize)
2018-09-21 14:50:41 +08:00
{
struct cache_evbase_ctx *ctx_asyn;
struct tango_cache_ctx *ctx;
struct databuffer *buffer;
ctx = tango_cache_update_prepare(instance->instance, future, meta);
if(ctx == NULL)
{
return -1;
}
2018-09-28 15:16:28 +08:00
if(path != NULL)
2018-09-21 14:50:41 +08:00
{
2018-09-28 15:16:28 +08:00
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->bucketname, ctx->object_key);
2018-09-21 14:50:41 +08:00
}
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
ctx_asyn->instance_asyn = instance;
ctx_asyn->ctx = ctx;
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
buffer->ctx_asyn = ctx_asyn;
buffer->cmd_type = CACHE_ASYN_UPLOAD_ONCE_EVBUF;
buffer->evbuf = evbuffer_new();
evbuffer_add_buffer(buffer->evbuf, evbuf);
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *))
{
evbuffer_free(buffer->evbuf);
free(buffer);
2018-09-29 11:28:09 +08:00
ctx_asyn->ctx->fail_state = true;
2018-09-21 14:50:41 +08:00
tango_cache_ctx_destroy(ctx);
cache_asyn_ctx_destroy(ctx_asyn);
2018-09-29 11:28:09 +08:00
return -2;
2018-09-18 11:14:11 +08:00
}
return 0;
}
int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct future* future, struct tango_cache_meta *meta)
2018-09-18 11:14:11 +08:00
{
struct cache_evbase_ctx *ctx_asyn;
struct databuffer *buffer;
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
ctx_asyn->instance_asyn = instance;
ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, future, meta);
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
buffer->ctx_asyn = ctx_asyn;
buffer->cmd_type = CACHE_ASYN_FETCH;
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *))
{
tango_cache_ctx_destroy(ctx_asyn->ctx);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
return -1;
}
return 0;
}
int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct future* future, const char *objkey)
{
struct cache_evbase_ctx *ctx_asyn;
struct databuffer *buffer;
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
ctx_asyn->instance_asyn = instance;
ctx_asyn->ctx = tango_cache_delete_prepare(instance->instance, future, objkey);
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
buffer->ctx_asyn = ctx_asyn;
buffer->cmd_type = CACHE_ASYN_DELETE;
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *))
{
2018-09-29 11:28:09 +08:00
ctx_asyn->ctx->fail_state = true;
2018-09-18 11:14:11 +08:00
tango_cache_ctx_destroy(ctx_asyn->ctx);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
return -1;
}
return 0;
}
struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path, const char* section, void *runtimelog)
{
evutil_socket_t notification_fd[2];
struct cache_evbase_instance *instance_asyn;
struct event_base *evbase;
pthread_t thread_tid;
pthread_attr_t attr;
if(create_notification_pipe(notification_fd))
{
return NULL;
}
if((evbase = event_base_new()) == NULL)
{
return NULL;
}
instance_asyn = (struct cache_evbase_instance *)calloc(1, sizeof(struct cache_evbase_instance));
instance_asyn->evbase = evbase;
instance_asyn->notify_readfd = notification_fd[0];
instance_asyn->notify_sendfd = notification_fd[1];
instance_asyn->instance = tango_cache_instance_new(evbase, profile_path, section, runtimelog);
if(instance_asyn->instance == NULL)
{
free(instance_asyn);
return NULL;
}
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if(pthread_create(&thread_tid, &attr, thread_listen_sockpair, instance_asyn))
{
free(instance_asyn);
return NULL;
}
return instance_asyn;
}