442 lines
12 KiB
C++
442 lines
12 KiB
C++
#include <sys/ioctl.h>
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
#include <arpa/inet.h>
|
|
#include <net/if.h>
|
|
#include <unistd.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <assert.h>
|
|
#include <errno.h>
|
|
#include <sys/prctl.h>
|
|
#include <sys/un.h>
|
|
#include <unistd.h>
|
|
#include <sys/poll.h>
|
|
#include <curl/curl.h>
|
|
#include <fcntl.h>
|
|
#include <sys/stat.h>
|
|
#include <errno.h>
|
|
#include <sys/cdefs.h>
|
|
#include <MESA/MESA_handle_logger.h>
|
|
#include <pthread.h>
|
|
#include <sys/prctl.h>
|
|
|
|
#include "object_store_client.h"
|
|
|
|
#define METHOD_GET 1
|
|
#define METHOD_PUT 2
|
|
#define METHOD_HEAD 3
|
|
#define METHOD_PUTONCEEV 4
|
|
#define METHOD_PUTONCE 5
|
|
#define METHOD_DEL 6
|
|
|
|
struct object_store_instance *instance_asyn;
|
|
|
|
struct filecontent
|
|
{
|
|
char *buf;
|
|
size_t len;
|
|
};
|
|
|
|
struct future_pdata
|
|
{
|
|
struct future * future;
|
|
FILE *fp;
|
|
char filename[256];
|
|
};
|
|
|
|
void get_future_success(future_result_t* result, void * user)
|
|
{
|
|
struct tango_cache_result *res = cache_evbase_read_result(result);
|
|
struct future_pdata *pdata = (struct future_pdata *)user;
|
|
//char buffer[1024];
|
|
|
|
switch(res->type)
|
|
{
|
|
case RESULT_TYPE_USERTAG:
|
|
case RESULT_TYPE_HEADER:
|
|
//memcpy(buffer, res->data_frag, res->size>=1024?1023:res->size);
|
|
//buffer[res->size] = '\0';
|
|
//printf("%s", buffer);
|
|
break;
|
|
case RESULT_TYPE_BODY:
|
|
//fwrite(res->data_frag, res->size, 1, pdata->fp);
|
|
break;
|
|
case RESULT_TYPE_MISS:
|
|
//printf("cache not hit/fresh\n");
|
|
case RESULT_TYPE_END:
|
|
//if(res->type != RESULT_TYPE_MISS)
|
|
// printf("get cache over, total length: %ld\n", res->tlength);
|
|
future_destroy(pdata->future);
|
|
//fclose(pdata->fp);
|
|
free(pdata);
|
|
break;
|
|
default:break;
|
|
}
|
|
}
|
|
|
|
void get_future_failed(enum e_future_error err, const char * what, void * user)
|
|
{
|
|
struct future_pdata *pdata = (struct future_pdata *)user;
|
|
future_destroy(pdata->future);
|
|
free(pdata);
|
|
//printf("GET fail: %s\n", what);
|
|
}
|
|
|
|
void head_future_success(future_result_t* result, void * user)
|
|
{
|
|
struct tango_cache_result *res = cache_evbase_read_result(result);
|
|
struct future_pdata *pdata = (struct future_pdata *)user;
|
|
char buffer[1024];
|
|
|
|
switch(res->type)
|
|
{
|
|
case RESULT_TYPE_USERTAG:
|
|
case RESULT_TYPE_HEADER:
|
|
memcpy(buffer, res->data_frag, res->size>=1024?1023:res->size);
|
|
buffer[res->size] = '\0';
|
|
printf("%s", buffer);
|
|
break;
|
|
case RESULT_TYPE_BODY:
|
|
assert(0);
|
|
break;
|
|
case RESULT_TYPE_MISS:
|
|
printf("cache not hit/fresh\n");
|
|
case RESULT_TYPE_END:
|
|
if(res->type != RESULT_TYPE_MISS)
|
|
printf("HEAD cache over, total length: %ld\n", res->tlength);
|
|
future_destroy(pdata->future);
|
|
free(pdata);
|
|
break;
|
|
default:break;
|
|
}
|
|
}
|
|
|
|
void head_future_failed(enum e_future_error err, const char * what, void * user)
|
|
{
|
|
printf("HEAD fail: %s\n", what);
|
|
}
|
|
|
|
|
|
void put_future_success(future_result_t* result, void * user)
|
|
{
|
|
struct future_pdata *pdata = (struct future_pdata *)user;
|
|
|
|
//printf("PUT %s succ\n", pdata->filename);
|
|
future_destroy(pdata->future);
|
|
free(pdata);
|
|
}
|
|
void put_future_failed(enum e_future_error err, const char * what, void * user)
|
|
{
|
|
struct future_pdata *pdata = (struct future_pdata *)user;
|
|
|
|
//printf("PUT %s fail: %s\n", what, pdata->filename);
|
|
future_destroy(pdata->future);
|
|
free(pdata);
|
|
}
|
|
|
|
void del_future_success(future_result_t* result, void * user)
|
|
{
|
|
struct future_pdata *pdata = (struct future_pdata *)user;
|
|
|
|
printf("DEL %s succ\n", pdata->filename);
|
|
future_destroy(pdata->future);
|
|
free(pdata);
|
|
}
|
|
void del_future_failed(enum e_future_error err, const char * what, void * user)
|
|
{
|
|
struct future_pdata *pdata = (struct future_pdata *)user;
|
|
|
|
printf("DEL %s fail: %s\n", pdata->filename, what);
|
|
future_destroy(pdata->future);
|
|
free(pdata);
|
|
}
|
|
|
|
char * get_file_content(const char *filename, size_t *filelen_out)
|
|
{
|
|
char *buffer;
|
|
FILE *fp;
|
|
size_t filelen = 0;
|
|
struct stat filestat;
|
|
int readlen;
|
|
|
|
fp = fopen(filename, "rb");
|
|
if(fp == NULL)
|
|
{
|
|
printf("fopen file %s failed.\n", filename);
|
|
return NULL;
|
|
}
|
|
if(fstat(fileno(fp), &filestat))
|
|
{
|
|
printf("fstat %s failed.\n", filename);
|
|
return NULL;
|
|
}
|
|
|
|
buffer = (char *)malloc(filestat.st_size);
|
|
|
|
while(filelen < (size_t)filestat.st_size)
|
|
{
|
|
readlen = fread(buffer + filelen, 1, filestat.st_size - filelen, fp);
|
|
if(readlen < 0)
|
|
{
|
|
printf("read error: %s\n", strerror(errno));
|
|
return NULL;
|
|
}
|
|
|
|
filelen += readlen;
|
|
}
|
|
fclose(fp);
|
|
*filelen_out = filestat.st_size;
|
|
|
|
return buffer;
|
|
}
|
|
|
|
struct cache_statistics g_out_last;
|
|
void timer_cb(evutil_socket_t fd, short what, void *arg)
|
|
{
|
|
struct timeval tv;
|
|
struct cache_statistics out_now;
|
|
struct cache_statistics out;
|
|
|
|
tv.tv_sec = 10;
|
|
tv.tv_usec = 0;
|
|
|
|
/*static int num=0;
|
|
num++;
|
|
if(ctx_global!=NULL && num>5)
|
|
{
|
|
tango_cache_update_end(ctx_global);
|
|
ctx_global = NULL;
|
|
}*/
|
|
|
|
object_store_get_statistics(instance_asyn, &out_now);
|
|
out.del_error_num = out_now.del_error_num - g_out_last.del_error_num;
|
|
out.del_recv_num = out_now.del_recv_num - g_out_last.del_recv_num;
|
|
out.del_succ_num = out_now.del_succ_num - g_out_last.del_succ_num;
|
|
out.get_err_http = out_now.get_err_http - g_out_last.get_err_http;
|
|
out.get_err_redis = out_now.get_err_redis - g_out_last.get_err_redis;
|
|
out.get_miss_num = out_now.get_miss_num - g_out_last.get_miss_num;
|
|
out.get_recv_num = out_now.get_recv_num - g_out_last.get_recv_num;
|
|
out.get_succ_http = out_now.get_succ_http - g_out_last.get_succ_http;
|
|
out.get_succ_redis = out_now.get_succ_redis - g_out_last.get_succ_redis;
|
|
out.put_err_http = out_now.put_err_http - g_out_last.put_err_http;
|
|
out.put_err_redis = out_now.put_err_redis - g_out_last.put_err_redis;
|
|
out.put_recv_num = out_now.put_recv_num - g_out_last.put_recv_num;
|
|
out.put_succ_http = out_now.put_succ_http - g_out_last.put_succ_http;
|
|
out.put_succ_redis = out_now.put_succ_redis - g_out_last.put_succ_redis;
|
|
out.session_http = out_now.session_http;
|
|
out.session_redis = out_now.session_redis;
|
|
out.memory_used = out_now.memory_used;
|
|
out.totaldrop_num = out_now.totaldrop_num - g_out_last.totaldrop_num;
|
|
|
|
printf("-------------------------------------------------------------------------------------------\n"
|
|
"get_recv: %llu, get_http: %llu, get_redis: %llu, get_fail_http: %llu, get_fail_redis: %llu, get_miss: %llu\n"
|
|
"put_recv: %llu, put_http: %llu, put_redis: %llu, put_fail_http: %llu, put_fail_redis: %llu\n"
|
|
"del_recv: %llu, del_succ: %llu, del_fail: %llu, drop_num: %llu, session_redis: %llu, session_http: %llu, memory: %llu\n",
|
|
out.get_recv_num, out.get_succ_http, out.get_succ_redis, out.get_err_http, out.get_err_redis, out.get_miss_num,
|
|
out.put_recv_num, out.put_succ_http, out.put_succ_redis, out.put_err_http, out.put_err_redis,
|
|
out.del_recv_num, out.del_succ_num, out.del_error_num, out.totaldrop_num, out.session_redis, out.session_http, out.memory_used);
|
|
|
|
g_out_last = out_now;
|
|
event_add((struct event *)arg, &tv);
|
|
}
|
|
|
|
struct filecontentcmd
|
|
{
|
|
int method;
|
|
int threads;
|
|
int total_num;
|
|
int sess_limit;
|
|
char file[256];
|
|
};
|
|
|
|
|
|
static void* thread_transfer_cmd(void *arg)
|
|
{
|
|
int index=0;
|
|
char filename_in[256];
|
|
struct tango_cache_meta_put putmeta;
|
|
struct tango_cache_meta_get getmeta;
|
|
struct future_pdata *pdata;
|
|
struct cache_evbase_ctx *ctx;
|
|
struct filecontent filecont;
|
|
size_t remain_len;
|
|
struct cache_statistics out;
|
|
struct evbuffer *evbuf;
|
|
struct filecontentcmd *filecmd = (struct filecontentcmd *)arg;
|
|
|
|
prctl(PR_SET_NAME, "transfer_cmd");
|
|
|
|
memset(&putmeta, 0, sizeof(struct tango_cache_meta_put));
|
|
memset(&getmeta, 0, sizeof(struct tango_cache_meta_get));
|
|
putmeta.std_hdr[HDR_CONTENT_TYPE] = "Content-Type: maintype/subtype";
|
|
putmeta.std_hdr[HDR_CONTENT_ENCODING] = "Content-Encoding: gzip";
|
|
putmeta.usertag = "Etag: hgdkqkwdwqekdfjwjfjwelkjfkwfejwhf\r\n";
|
|
putmeta.usertag_len = strlen(putmeta.usertag);
|
|
|
|
filecont.buf = get_file_content(filecmd->file, &filecont.len);
|
|
assert(filecont.buf != NULL);
|
|
|
|
while(1)
|
|
{
|
|
object_store_get_statistics(instance_asyn, &out);
|
|
if(out.session_http >= filecmd->sess_limit || out.session_redis>=filecmd->sess_limit)
|
|
{
|
|
usleep(1000);
|
|
continue;
|
|
}
|
|
|
|
switch(filecmd->method)
|
|
{
|
|
case METHOD_GET:
|
|
sprintf(filename_in, "%s_%u", filecmd->file, index);
|
|
getmeta.url = filename_in;
|
|
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
|
|
pdata->future = future_create(get_future_success, get_future_failed, pdata);
|
|
object_store_fetch_object(instance_asyn, pdata->future, &getmeta, OBJECT_IN_UNKNOWN);
|
|
break;
|
|
|
|
case METHOD_PUT:
|
|
remain_len = filecont.len;
|
|
|
|
sprintf(filename_in, "%s_%u", filecmd->file, index);
|
|
putmeta.url = filename_in;
|
|
|
|
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
|
|
pdata->future = future_create(put_future_success, put_future_failed, pdata);
|
|
ctx = object_store_update_start(instance_asyn, pdata->future, &putmeta);
|
|
if(ctx == NULL)
|
|
{
|
|
future_destroy(pdata->future);
|
|
free(pdata);
|
|
continue;
|
|
}
|
|
while(remain_len >= 1024)
|
|
{
|
|
object_store_update_frag_data(ctx, PUT_MEM_COPY, filecont.buf+(filecont.len-remain_len), 1024);
|
|
remain_len -= 1024;
|
|
}
|
|
if(remain_len > 0)
|
|
{
|
|
object_store_update_frag_data(ctx, PUT_MEM_COPY, filecont.buf+(filecont.len-remain_len), remain_len);
|
|
}
|
|
if(object_store_update_end(ctx, pdata->filename, 256))
|
|
{
|
|
future_destroy(pdata->future);
|
|
free(pdata);
|
|
}
|
|
break;
|
|
|
|
case METHOD_HEAD:
|
|
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
|
|
pdata->future = future_create(head_future_success, head_future_failed, pdata);
|
|
object_store_head_object(instance_asyn, pdata->future, &getmeta);
|
|
break;
|
|
|
|
case METHOD_DEL:
|
|
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
|
|
pdata->future = future_create(del_future_success, del_future_failed, pdata);
|
|
sprintf(pdata->filename, "%s_%u", filecmd->file, index);
|
|
object_store_delete_object(instance_asyn, pdata->future, pdata->filename);
|
|
break;
|
|
case METHOD_PUTONCE:
|
|
remain_len = filecont.len;
|
|
|
|
sprintf(filename_in, "%s_%u", filecmd->file, index);
|
|
putmeta.url = filename_in;
|
|
|
|
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
|
|
pdata->future = future_create(put_future_success, put_future_failed, pdata);
|
|
if(object_store_upload_once_data(instance_asyn, pdata->future, PUT_MEM_COPY, filecont.buf, filecont.len, &putmeta, pdata->filename, 256))
|
|
{
|
|
future_destroy(pdata->future);
|
|
free(pdata);
|
|
}
|
|
break;
|
|
case METHOD_PUTONCEEV:
|
|
remain_len = filecont.len;
|
|
|
|
sprintf(filename_in, "%s_%u", filecmd->file, index);
|
|
putmeta.url = filename_in;
|
|
|
|
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
|
|
pdata->future = future_create(put_future_success, put_future_failed, pdata);
|
|
evbuf = evbuffer_new();
|
|
|
|
remain_len = filecont.len;
|
|
while(remain_len >= 1024)
|
|
{
|
|
evbuffer_add(evbuf, filecont.buf+(filecont.len-remain_len), 1024);
|
|
remain_len -= 1024;
|
|
}
|
|
if(remain_len > 0)
|
|
{
|
|
evbuffer_add(evbuf, filecont.buf+(filecont.len-remain_len), remain_len);
|
|
}
|
|
if(object_store_upload_once_evbuf(instance_asyn, pdata->future, evbuf, &putmeta, pdata->filename, 256))
|
|
{
|
|
future_destroy(pdata->future);
|
|
free(pdata);
|
|
}
|
|
break;
|
|
default:break;
|
|
}
|
|
|
|
index = (index+1) % filecmd->total_num;
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
int main(int argc, char **argv)
|
|
{
|
|
pthread_t thread_tid;
|
|
pthread_attr_t attr;
|
|
void *runtime_log;
|
|
struct filecontentcmd filecmd;
|
|
|
|
struct event ev_timer;
|
|
struct timeval tv;
|
|
struct event_base *ev_base;
|
|
|
|
if(argc != 6)
|
|
{
|
|
printf("USAGE: %s <method,1-GET,2-PUT,5-PUTONCE> <file> <threads> <total_num> <limit_session_num>\n", argv[0]);
|
|
return -1;
|
|
}
|
|
|
|
runtime_log = MESA_create_runtime_log_handle("./runtime.log", 10);
|
|
if(NULL==runtime_log)
|
|
{
|
|
return -1;
|
|
}
|
|
filecmd.method = atoi(argv[1]);
|
|
filecmd.threads = atoi(argv[3]);
|
|
filecmd.total_num = atoi(argv[4]);
|
|
filecmd.sess_limit = atoi(argv[5]);
|
|
sprintf(filecmd.file, "%s", argv[2]);
|
|
|
|
object_store_global_init();
|
|
instance_asyn = object_store_instance_new("./pangu_tg_cahce.conf", "TANGO_CACHE", filecmd.threads, runtime_log);
|
|
assert(instance_asyn!=NULL);
|
|
|
|
pthread_attr_init(&attr);
|
|
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
|
|
for(int i=0; i<4; i++)
|
|
{
|
|
if(pthread_create(&thread_tid, &attr, thread_transfer_cmd, &filecmd))
|
|
{
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
ev_base = event_base_new();
|
|
tv.tv_sec = 10;
|
|
tv.tv_usec = 0;
|
|
evtimer_assign(&ev_timer, ev_base, timer_cb, &ev_timer);
|
|
evtimer_add(&ev_timer, &tv);
|
|
event_base_dispatch(ev_base);
|
|
return 0;
|
|
}
|
|
|