缓存取消上传+测试用例
This commit is contained in:
431
cache/test/tango_cache_test.cpp
vendored
Normal file
431
cache/test/tango_cache_test.cpp
vendored
Normal file
@@ -0,0 +1,431 @@
|
||||
#include <event2/event.h>
|
||||
#include <event2/bufferevent.h>
|
||||
#include <event.h>
|
||||
#include <event2/event.h>
|
||||
#include <event2/buffer.h>
|
||||
#include <event2/http.h>
|
||||
#include <event2/http_struct.h>
|
||||
#include <event2/keyvalq_struct.h>
|
||||
|
||||
#include <sys/ioctl.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <net/if.h>
|
||||
#include <unistd.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <assert.h>
|
||||
#include <errno.h>
|
||||
#include <sys/prctl.h>
|
||||
#include <sys/un.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/poll.h>
|
||||
#include <curl/curl.h>
|
||||
#include <event2/event.h>
|
||||
#include <event2/event_struct.h>
|
||||
#include <fcntl.h>
|
||||
#include <sys/stat.h>
|
||||
#include <errno.h>
|
||||
#include <sys/cdefs.h>
|
||||
#include <MESA/MESA_handle_logger.h>
|
||||
|
||||
#include "tango_cache_client.h"
|
||||
|
||||
struct event_base *ev_base;
|
||||
struct tango_cache_instance *tango_instance;
|
||||
|
||||
#define MSG_OUT stdout /* Send info to stdout, change to stderr if you want */
|
||||
|
||||
struct future_pdata
|
||||
{
|
||||
struct future * future;
|
||||
FILE *fp;
|
||||
char filename[256];
|
||||
};
|
||||
|
||||
void get_future_success(future_result_t* result, void * user)
|
||||
{
|
||||
struct tango_cache_result *res = tango_cache_read_result(result);
|
||||
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||
char buffer[1024];
|
||||
|
||||
switch(res->type)
|
||||
{
|
||||
case RESULT_TYPE_USERTAG:
|
||||
case RESULT_TYPE_HEADER:
|
||||
memcpy(buffer, res->data_frag, res->size>=1024?1023:res->size);
|
||||
buffer[res->size] = '\0';
|
||||
printf("%s", buffer);
|
||||
break;
|
||||
case RESULT_TYPE_BODY:
|
||||
fwrite(res->data_frag, res->size, 1, pdata->fp);
|
||||
break;
|
||||
case RESULT_TYPE_MISS:
|
||||
printf("cache not hit/fresh\n");
|
||||
case RESULT_TYPE_END:
|
||||
if(res->type != RESULT_TYPE_MISS)
|
||||
printf("get cache over, total length: %ld\n", res->tlength);
|
||||
future_destroy(pdata->future);
|
||||
fclose(pdata->fp);
|
||||
free(pdata);
|
||||
break;
|
||||
default:break;
|
||||
}
|
||||
}
|
||||
|
||||
void get_future_failed(enum e_future_error err, const char * what, void * user)
|
||||
{
|
||||
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||
future_destroy(pdata->future);
|
||||
fclose(pdata->fp);
|
||||
free(pdata);
|
||||
printf("GET fail: %s\n", what);
|
||||
}
|
||||
|
||||
void head_future_success(future_result_t* result, void * user)
|
||||
{
|
||||
struct tango_cache_result *res = tango_cache_read_result(result);
|
||||
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||
char buffer[1024];
|
||||
|
||||
switch(res->type)
|
||||
{
|
||||
case RESULT_TYPE_USERTAG:
|
||||
case RESULT_TYPE_HEADER:
|
||||
memcpy(buffer, res->data_frag, res->size>=1024?1023:res->size);
|
||||
buffer[res->size] = '\0';
|
||||
printf("%s", buffer);
|
||||
break;
|
||||
case RESULT_TYPE_BODY:
|
||||
assert(0);
|
||||
break;
|
||||
case RESULT_TYPE_MISS:
|
||||
printf("cache not hit/fresh\n");
|
||||
case RESULT_TYPE_END:
|
||||
if(res->type != RESULT_TYPE_MISS)
|
||||
printf("HEAD cache over, total length: %ld\n", res->tlength);
|
||||
future_destroy(pdata->future);
|
||||
free(pdata);
|
||||
break;
|
||||
default:break;
|
||||
}
|
||||
}
|
||||
|
||||
void head_future_failed(enum e_future_error err, const char * what, void * user)
|
||||
{
|
||||
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||
|
||||
future_destroy(pdata->future);
|
||||
free(pdata);
|
||||
printf("HEAD fail: %s\n", what);
|
||||
}
|
||||
|
||||
void put_future_success(future_result_t* result, void * user)
|
||||
{
|
||||
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||
|
||||
printf("PUT %s succ\n", pdata->filename);
|
||||
future_destroy(pdata->future);
|
||||
free(pdata);
|
||||
}
|
||||
void put_future_failed(enum e_future_error err, const char * what, void * user)
|
||||
{
|
||||
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||
|
||||
printf("PUT %s fail: %s\n", what, pdata->filename);
|
||||
future_destroy(pdata->future);
|
||||
free(pdata);
|
||||
}
|
||||
|
||||
void del_future_success(future_result_t* result, void * user)
|
||||
{
|
||||
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||
|
||||
printf("DEL %s succ\n", pdata->filename);
|
||||
future_destroy(pdata->future);
|
||||
free(pdata);
|
||||
}
|
||||
void del_future_failed(enum e_future_error err, const char * what, void * user)
|
||||
{
|
||||
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||
|
||||
printf("DEL %s fail: %s\n", pdata->filename, what);
|
||||
future_destroy(pdata->future);
|
||||
free(pdata);
|
||||
}
|
||||
|
||||
char * get_file_content(const char *filename, size_t *filelen_out)
|
||||
{
|
||||
char *buffer;
|
||||
FILE *fp;
|
||||
size_t filelen = 0;
|
||||
struct stat filestat;
|
||||
int readlen;
|
||||
|
||||
fp = fopen(filename, "rb");
|
||||
if(fp == NULL)
|
||||
{
|
||||
printf("fopen file %s failed.\n", filename);
|
||||
return NULL;
|
||||
}
|
||||
if(fstat(fileno(fp), &filestat))
|
||||
{
|
||||
printf("fstat %s failed.\n", filename);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
buffer = (char *)malloc(filestat.st_size);
|
||||
|
||||
while(filelen < (size_t)filestat.st_size)
|
||||
{
|
||||
readlen = fread(buffer + filelen, 1, filestat.st_size - filelen, fp);
|
||||
if(readlen < 0)
|
||||
{
|
||||
printf("read error: %s\n", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
filelen += readlen;
|
||||
}
|
||||
fclose(fp);
|
||||
*filelen_out = filestat.st_size;
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
int tango_cache_multi_delete(struct tango_cache_instance *instance, struct future* future, char *objlist[], u_int32_t num);
|
||||
|
||||
static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg)
|
||||
{
|
||||
char s[1024];
|
||||
long int rv = 0;
|
||||
int n = 0;
|
||||
FILE *input = (FILE *)arg;
|
||||
static int index=0;
|
||||
char filename[128], method[16], buffer[1024], *p;
|
||||
char *dellist[16];
|
||||
char *pstart, *save_ptr=NULL;
|
||||
int delnum=0;
|
||||
struct tango_cache_meta_put putmeta;
|
||||
struct tango_cache_meta_get getmeta;
|
||||
struct future_pdata *pdata;
|
||||
|
||||
struct tango_cache_ctx *ctx;
|
||||
|
||||
do
|
||||
{
|
||||
s[0]='\0';
|
||||
rv = fscanf(input, "%[^:]:%1023s%n", method, s, &n);
|
||||
if(strlen(s) > 0)
|
||||
{
|
||||
p = method;
|
||||
|
||||
memset(&putmeta, 0, sizeof(struct tango_cache_meta_put));
|
||||
memset(&getmeta, 0, sizeof(struct tango_cache_meta_get));
|
||||
putmeta.url = s;
|
||||
putmeta.std_hdr[HDR_CONTENT_TYPE] = "Content-Type: maintype/subtype";
|
||||
putmeta.std_hdr[HDR_CONTENT_ENCODING] = "Content-Encoding: gzip";
|
||||
putmeta.usertag = "Etag: hgdkqkwdwqekdfjwjfjwelkjfkwfejwhf\r\n";
|
||||
putmeta.usertag_len = strlen(putmeta.usertag);
|
||||
|
||||
getmeta.url = s;
|
||||
|
||||
while(*p=='\r'||*p=='\n')p++;
|
||||
if(*p=='\0') continue;
|
||||
if(!strcasecmp(p, "GET"))
|
||||
{
|
||||
sprintf(filename, "file_index_%u.bin", index++);
|
||||
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
|
||||
pdata->fp = fopen(filename, "w");
|
||||
pdata->future = future_create(get_future_success, get_future_failed, pdata);
|
||||
|
||||
if(tango_cache_fetch_object(tango_instance, pdata->future, &getmeta) < 0)
|
||||
{
|
||||
get_future_failed(FUTURE_ERROR_CANCEL, "", pdata);
|
||||
}
|
||||
}
|
||||
else if(!strcasecmp(p, "HEAD"))
|
||||
{
|
||||
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
|
||||
pdata->future = future_create(head_future_success, head_future_failed, pdata);
|
||||
|
||||
if(tango_cache_head_object(tango_instance, pdata->future, &getmeta) < 0)
|
||||
{
|
||||
head_future_failed(FUTURE_ERROR_CANCEL, "", pdata);
|
||||
}
|
||||
}
|
||||
else if(!strcasecmp(p, "PUTONCE"))
|
||||
{
|
||||
size_t filelen;
|
||||
p = get_file_content(s, &filelen);
|
||||
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
|
||||
pdata->future = future_create(put_future_success, put_future_failed, pdata);
|
||||
|
||||
tango_cache_upload_once_data(tango_instance, pdata->future, PUT_MEM_FREE, p, filelen, &putmeta, pdata->filename, 256);
|
||||
}
|
||||
else if(!strcasecmp(p, "PUTONCEEV"))
|
||||
{
|
||||
size_t readlen;
|
||||
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
|
||||
pdata->future = future_create(put_future_success, put_future_failed, pdata);
|
||||
struct evbuffer *evbuf = evbuffer_new();
|
||||
char buffer[1024];
|
||||
|
||||
FILE *fp = fopen(s, "rb");
|
||||
while(!feof(fp))
|
||||
{
|
||||
readlen = fread(buffer, 1, 1024, fp);
|
||||
if(readlen < 0)
|
||||
{
|
||||
assert(0);
|
||||
}
|
||||
evbuffer_add(evbuf, buffer, readlen);
|
||||
}
|
||||
fclose(fp);
|
||||
tango_cache_upload_once_evbuf(tango_instance, pdata->future, EVBUFFER_MOVE, evbuf, &putmeta, pdata->filename, 256);
|
||||
}
|
||||
else if(!strcasecmp(p, "DEL"))
|
||||
{
|
||||
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
|
||||
pdata->future = future_create(del_future_success, del_future_failed, pdata);
|
||||
sprintf(pdata->filename, "%s", s);
|
||||
tango_cache_delete_object(tango_instance, pdata->future, s);
|
||||
}
|
||||
else if(!strcasecmp(p, "DELMUL")) //TODO
|
||||
{
|
||||
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
|
||||
pdata->future = future_create(del_future_success, del_future_failed, pdata);
|
||||
sprintf(pdata->filename, "%s", s);
|
||||
|
||||
for(pstart = strtok_r(s, ";", &save_ptr); pstart != NULL; pstart = strtok_r(NULL, ";", &save_ptr))
|
||||
{
|
||||
dellist[delnum++] = pstart;
|
||||
}
|
||||
tango_cache_multi_delete(tango_instance, pdata->future, dellist, delnum);
|
||||
}
|
||||
else
|
||||
{
|
||||
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
|
||||
pdata->future = future_create(put_future_success, put_future_failed, pdata);
|
||||
|
||||
ctx = tango_cache_update_start(tango_instance, pdata->future, &putmeta);
|
||||
if(ctx==NULL)
|
||||
{
|
||||
put_future_failed(FUTURE_ERROR_CANCEL, "tango_cache_update_start_NULL", pdata);
|
||||
continue;
|
||||
}
|
||||
tango_cache_get_object_path(ctx, pdata->filename, 256);
|
||||
|
||||
FILE *fp = fopen(s, "r");
|
||||
while(!feof(fp))
|
||||
{
|
||||
n = fread(buffer, 1, 1024, fp);
|
||||
assert(n>=0);
|
||||
tango_cache_update_frag_data(ctx, buffer, n);
|
||||
}
|
||||
fclose(fp);
|
||||
tango_cache_update_end(ctx);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
} while(rv != EOF);
|
||||
}
|
||||
|
||||
struct event fifo_event;
|
||||
static int init_fifo(void)
|
||||
{
|
||||
struct stat st;
|
||||
curl_socket_t sockfd;
|
||||
static const char *fifo = "cache.fifo";
|
||||
|
||||
fprintf(MSG_OUT, "Creating named pipe \"%s\"\n", fifo);
|
||||
if(lstat (fifo, &st) == 0) {
|
||||
if((st.st_mode & S_IFMT) == S_IFREG) {
|
||||
errno = EEXIST;
|
||||
perror("lstat");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
unlink(fifo);
|
||||
if(mkfifo (fifo, 0600) == -1) {
|
||||
perror("mkfifo");
|
||||
exit(1);
|
||||
}
|
||||
sockfd = open(fifo, O_RDWR | O_NONBLOCK, 0);
|
||||
if(sockfd == -1) {
|
||||
perror("open");
|
||||
exit(1);
|
||||
}
|
||||
FILE *input = fdopen(sockfd, "r");
|
||||
|
||||
memset(&fifo_event, 0, sizeof(struct event));
|
||||
fprintf(MSG_OUT, "Now, pipe some URL's into > %s\n", fifo);
|
||||
event_assign(&fifo_event, ev_base, sockfd, EV_READ|EV_PERSIST,
|
||||
dummy_accept_callback, input);
|
||||
event_add(&fifo_event, NULL);
|
||||
return (0);
|
||||
}
|
||||
|
||||
void timer_cb(evutil_socket_t fd, short what, void *arg)
|
||||
{
|
||||
struct timeval tv;
|
||||
struct cache_statistics out;
|
||||
|
||||
tv.tv_sec = 10;
|
||||
tv.tv_usec = 0;
|
||||
|
||||
/*static int num=0;
|
||||
num++;
|
||||
if(ctx_global!=NULL && num>5)
|
||||
{
|
||||
tango_cache_update_end(ctx_global);
|
||||
ctx_global = NULL;
|
||||
}*/
|
||||
|
||||
tango_cache_get_statistics(tango_instance, &out);
|
||||
printf("get_recv: %llu, get_succ: %llu, get_miss: %llu, get_fail: %llu, put_recv: %llu, put_succ: %llu, put_fail: %llu, del_recv: %llu, del_succ: %llu, del_fail: %llu, drop_num: %llu, session: %llu, memory: %llu\n",
|
||||
out.get_recv_num, out.get_succ_num, out.get_miss_num, out.get_error_num, out.put_recv_num, out.put_succ_num, out.put_error_num, out.del_recv_num, out.del_succ_num, out.del_error_num, out.totaldrop_num, out.session_num, out.memory_used);
|
||||
|
||||
event_add((struct event *)arg, &tv);
|
||||
}
|
||||
|
||||
int main(int crgc, char **arg)
|
||||
{
|
||||
struct event ev_timer;
|
||||
struct timeval tv;
|
||||
void *runtime_log;
|
||||
struct tango_cache_parameter *parameter;
|
||||
|
||||
runtime_log = MESA_create_runtime_log_handle("./runtime.log", 10);
|
||||
if(NULL==runtime_log)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
ev_base = event_base_new();
|
||||
if (!ev_base)
|
||||
{
|
||||
printf("create socket error!\n");
|
||||
return 0;
|
||||
}
|
||||
init_fifo();
|
||||
|
||||
tango_cache_global_init();
|
||||
parameter = tango_cache_parameter_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
|
||||
assert(parameter != NULL);
|
||||
tango_instance = tango_cache_instance_new(parameter, ev_base, runtime_log);
|
||||
|
||||
tv.tv_sec = 10;
|
||||
tv.tv_usec = 0;
|
||||
evtimer_assign(&ev_timer, ev_base, timer_cb, &ev_timer);
|
||||
evtimer_add(&ev_timer, &tv);
|
||||
|
||||
event_base_dispatch(ev_base);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user