#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "cache_evbase_client.h" #include "tango_cache_transfer.h" #include "tango_cache_tools.h" enum CACHE_ASYN_CMD { CACHE_ASYN_FETCH=0, CACHE_ASYN_UPLOAD_ONCE_DATA, CACHE_ASYN_UPLOAD_ONCE_EVBUF, CACHE_ASYN_UPLOAD_START, CACHE_ASYN_UPLOAD_FRAG_DATA, CACHE_ASYN_UPLOAD_FRAG_EVBUF, CACHE_ASYN_UPLOAD_END, CACHE_ASYN_DELETE, }; struct databuffer { char *data; size_t size; struct evbuffer *evbuf; enum CACHE_ASYN_CMD cmd_type; struct cache_evbase_ctx *ctx_asyn; }; enum CACHE_ERR_CODE cache_evbase_get_last_error(const struct cache_evbase_ctx *ctx_asyn) { return tango_cache_get_last_error(ctx_asyn->ctx); } enum CACHE_ERR_CODE cache_evbase_ctx_error(const struct cache_evbase_instance *instance) { return tango_cache_ctx_error(instance->instance); } void cache_evbase_get_statistics(const struct cache_evbase_instance *instance, struct cache_statistics *out) { tango_cache_get_statistics(instance->instance, out); } struct tango_cache_result *cache_evbase_read_result(void *promise_result) { return tango_cache_read_result(promise_result); } void cache_evbase_get_object_path(const struct cache_evbase_ctx *ctx_asyn, char *path, size_t pathsize) { tango_cache_get_object_path(ctx_asyn->ctx, path, pathsize); } static int create_notification_pipe(evutil_socket_t sv[2]) { if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, sv) == -1) { return -1; } if(evutil_make_socket_nonblocking(sv[0])<0 || evutil_make_socket_nonblocking(sv[1])<0) { return -1; } if(evutil_make_socket_closeonexec(sv[0])<0 || evutil_make_socket_closeonexec(sv[1])<0) { return -1; } return 0; } static int32_t mesa_tcp_sock_write (int32_t write_fd, void *buf, int32_t bufsize) { int32_t res; do{ res = send(write_fd, buf, bufsize, MSG_NOSIGNAL); }while (res==-1 &&(errno == EINTR)); return res; } static int32_t iothread_notify_event(int32_t socket_fd, void *content, int32_t len, int32_t s_time_out) { fd_set w_set, e_set; struct timeval tv; int32_t res=0, sndlen=0, sendsize=0; while(len > sndlen) { FD_ZERO (&w_set); FD_ZERO (&e_set); FD_SET (socket_fd, &w_set); FD_SET (socket_fd, &e_set); if(s_time_out == 0) { res = select (socket_fd + 1, NULL, &w_set, &e_set, NULL); } else { tv.tv_sec = s_time_out; tv.tv_usec = 0; res = select (socket_fd + 1, NULL, &w_set, &e_set, &tv); } if(res <= 0) { printf("log_error: select io res=%d, error: %s", res, strerror(errno)); return -1; } if(FD_ISSET(socket_fd, &e_set)) { printf("log_error: select io is in efds, error: %s", strerror(errno)); return -2; } if(FD_ISSET(socket_fd, &w_set)) { sendsize = mesa_tcp_sock_write(socket_fd, (char*)content + sndlen, len - sndlen); if (sendsize < 0) { if(errno == EAGAIN) { continue; } return -1; } sndlen += sendsize; } } return sndlen; } static void cache_asyn_ctx_destroy(struct cache_evbase_ctx *ctx_asyn) { free(ctx_asyn); } static void cache_asyn_ioevent_dispatch(struct databuffer *buffer) { struct cache_evbase_ctx *ctx_asyn=buffer->ctx_asyn; switch(buffer->cmd_type) { case CACHE_ASYN_FETCH: tango_cache_fetch_start(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); break; case CACHE_ASYN_DELETE: cache_delete_minio_object(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); break; case CACHE_ASYN_UPLOAD_ONCE_DATA: tango_cache_upload_once_start_data(ctx_asyn->ctx, PUT_MEM_FREE, buffer->data, buffer->size); cache_asyn_ctx_destroy(ctx_asyn); break; case CACHE_ASYN_UPLOAD_ONCE_EVBUF: tango_cache_upload_once_start_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf); evbuffer_free(buffer->evbuf); cache_asyn_ctx_destroy(ctx_asyn); break; case CACHE_ASYN_UPLOAD_START: ctx_asyn->ctx->instance->statistic.put_recv_num += 1; ctx_asyn->ctx->instance->error_code = CACHE_OK; break; case CACHE_ASYN_UPLOAD_FRAG_DATA: tango_cache_update_frag_data(ctx_asyn->ctx, buffer->data, buffer->size); free(buffer->data); break; case CACHE_ASYN_UPLOAD_FRAG_EVBUF: tango_cache_update_frag_evbuf(ctx_asyn->ctx, EVBUFFER_MOVE, buffer->evbuf); evbuffer_free(buffer->evbuf); break; case CACHE_ASYN_UPLOAD_END: tango_cache_update_end(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); break; default: assert(0);break; } } static void sockpair_notification_handler(evutil_socket_t fd, short events, void *arg) { ssize_t readlen, needlen; struct cache_evbase_instance *instance_asyn = (struct cache_evbase_instance *)arg; struct databuffer *buffer; while(1) { needlen=sizeof(struct cache_evbase_ctx *); readlen = recv(fd, &buffer, needlen, 0); if(readlen == needlen) { cache_asyn_ioevent_dispatch(buffer); free(buffer); } else { if(errno!=EWOULDBLOCK && errno!=EAGAIN) { MESA_HANDLE_RUNTIME_LOGV2(instance_asyn->instance->runtime_log, RLOG_LV_FATAL, "read pipe error: %s.", strerror(errno)); assert(0); return; } break; } } } static void* thread_listen_sockpair(void *arg) { struct cache_evbase_instance *instance_asyn = (struct cache_evbase_instance *)arg; struct event listen_event; prctl(PR_SET_NAME, "tango_cache"); event_assign(&listen_event, instance_asyn->evbase, instance_asyn->notify_readfd, EV_READ|EV_PERSIST, sockpair_notification_handler, instance_asyn); event_add(&listen_event, NULL); event_base_dispatch(instance_asyn->evbase); printf("Libevent dispath error, should not run here.\n"); MESA_HANDLE_RUNTIME_LOGV2(instance_asyn->instance->runtime_log, RLOG_LV_FATAL, "Libevent dispath error, should not run here."); assert(0); return NULL; } void cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn) { struct databuffer *buffer; buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_UPLOAD_END; if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { if(!ctx_asyn->ctx->fail_state) { ctx_asyn->ctx->fail_state = true; if(ctx_asyn->ctx->future != NULL) { promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error"); } } tango_cache_ctx_destroy(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); } } int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size) { struct databuffer *buffer; if(ctx_asyn->ctx->fail_state) { return -1; } buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); if(way == PUT_MEM_COPY) { buffer->data = (char *)malloc(size); memcpy(buffer->data, data, size); } else { buffer->data = (char*)data; } buffer->size = size; buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG_DATA; if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { ctx_asyn->ctx->fail_state = true; if(ctx_asyn->ctx->future != NULL) { promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error"); } free(buffer->data); free(buffer); return -2; } return 0; } int cache_evbase_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evbuffer *evbuf) { struct databuffer *buffer; if(ctx_asyn->ctx->fail_state) { return -1; } buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_UPLOAD_FRAG_EVBUF; buffer->evbuf = evbuffer_new(); evbuffer_add_buffer(buffer->evbuf, evbuf); if(iothread_notify_event(ctx_asyn->instance_asyn->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { ctx_asyn->ctx->fail_state = true; if(ctx_asyn->ctx->future != NULL) { promise_failed(future_to_promise(ctx_asyn->ctx->future), FUTURE_ERROR_CANCEL, "write sockpair error"); } evbuffer_free(buffer->evbuf); free(buffer); return -2; } return 0; } struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance *instance, struct future* future, struct tango_cache_meta *meta) { struct cache_evbase_ctx *ctx_asyn; struct tango_cache_ctx *ctx; struct databuffer *buffer; ctx = tango_cache_update_prepare(instance->instance, future, meta); if(ctx == NULL) { return NULL; } ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); ctx_asyn->instance_asyn = instance; ctx_asyn->ctx = ctx; buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_UPLOAD_START; //事件通知仅为了增加统计信息 if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { ctx_asyn->ctx->fail_state = true; tango_cache_ctx_destroy(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); return NULL; } return ctx_asyn; } int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct future* future, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta *meta, char *path, size_t pathsize) { struct cache_evbase_ctx *ctx_asyn; struct tango_cache_ctx *ctx; struct databuffer *buffer; ctx = tango_cache_update_prepare(instance->instance, future, meta); if(ctx == NULL) { return -1; } if(path != NULL) { snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->bucketname, ctx->object_key); } ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); ctx_asyn->instance_asyn = instance; ctx_asyn->ctx = ctx; buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); if(way == PUT_MEM_COPY) { buffer->data = (char *)malloc(size); memcpy(buffer->data, data, size); } else { buffer->data = (char*)data; } buffer->size = size; buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_UPLOAD_ONCE_DATA; if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { free(buffer->data); free(buffer); ctx_asyn->ctx->fail_state = true; tango_cache_ctx_destroy(ctx); cache_asyn_ctx_destroy(ctx_asyn); return -2; } return 0; } int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struct future* future, struct evbuffer *evbuf, struct tango_cache_meta *meta, char *path, size_t pathsize) { struct cache_evbase_ctx *ctx_asyn; struct tango_cache_ctx *ctx; struct databuffer *buffer; ctx = tango_cache_update_prepare(instance->instance, future, meta); if(ctx == NULL) { return -1; } if(path != NULL) { snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->instance->bucketname, ctx->object_key); } ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); ctx_asyn->instance_asyn = instance; ctx_asyn->ctx = ctx; buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_UPLOAD_ONCE_EVBUF; buffer->evbuf = evbuffer_new(); evbuffer_add_buffer(buffer->evbuf, evbuf); if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { evbuffer_free(buffer->evbuf); free(buffer); ctx_asyn->ctx->fail_state = true; tango_cache_ctx_destroy(ctx); cache_asyn_ctx_destroy(ctx_asyn); return -2; } return 0; } int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct future* future, struct tango_cache_meta *meta) { struct cache_evbase_ctx *ctx_asyn; struct databuffer *buffer; ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); ctx_asyn->instance_asyn = instance; ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, future, meta); buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_FETCH; if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { tango_cache_ctx_destroy(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); return -1; } return 0; } int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct future* future, const char *objkey) { struct cache_evbase_ctx *ctx_asyn; struct databuffer *buffer; ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); ctx_asyn->instance_asyn = instance; ctx_asyn->ctx = tango_cache_delete_prepare(instance->instance, future, objkey); buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_DELETE; //参考Unix高级编程432页关于多线程写的安全性描述 if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { ctx_asyn->ctx->fail_state = true; tango_cache_ctx_destroy(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); return -1; } return 0; } struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path, const char* section, void *runtimelog) { evutil_socket_t notification_fd[2]; struct cache_evbase_instance *instance_asyn; struct event_base *evbase; pthread_t thread_tid; pthread_attr_t attr; if(create_notification_pipe(notification_fd)) { return NULL; } if((evbase = event_base_new()) == NULL) { return NULL; } instance_asyn = (struct cache_evbase_instance *)calloc(1, sizeof(struct cache_evbase_instance)); instance_asyn->evbase = evbase; instance_asyn->notify_readfd = notification_fd[0]; instance_asyn->notify_sendfd = notification_fd[1]; instance_asyn->instance = tango_cache_instance_new(evbase, profile_path, section, runtimelog); if(instance_asyn->instance == NULL) { free(instance_asyn); return NULL; } pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); if(pthread_create(&thread_tid, &attr, thread_listen_sockpair, instance_asyn)) { free(instance_asyn); return NULL; } return instance_asyn; }