#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "cache_evbase_client.h" struct cache_evbase_instance *instance_asyn; int runing_over=0; struct future_pdata { struct future * future; FILE *fp; char filename[256]; }; void get_future_success(future_result_t* result, void * user) { struct tango_cache_result *res = cache_evbase_read_result(result); struct future_pdata *pdata = (struct future_pdata *)user; char buffer[1024]; switch(res->type) { case RESULT_TYPE_USERTAG: case RESULT_TYPE_HEADER: memcpy(buffer, res->data_frag, res->size>=1024?1023:res->size); buffer[res->size] = '\0'; printf("%s", buffer); break; case RESULT_TYPE_BODY: fwrite(res->data_frag, res->size, 1, pdata->fp); break; case RESULT_TYPE_MISS: printf("cache not hit/fresh\n"); case RESULT_TYPE_END: if(res->type != RESULT_TYPE_MISS) printf("get cache over, total length: %ld\n", res->tlength); future_destroy(pdata->future); fclose(pdata->fp); free(pdata); runing_over = 1; break; default:break; } } void get_future_failed(enum e_future_error err, const char * what, void * user) { printf("GET fail: %s\n", what); runing_over = 2; } void put_future_success(future_result_t* result, void * user) { struct future_pdata *pdata = (struct future_pdata *)user; printf("PUT %s succ\n", pdata->filename); future_destroy(pdata->future); free(pdata); runing_over = 1; } void put_future_failed(enum e_future_error err, const char * what, void * user) { struct future_pdata *pdata = (struct future_pdata *)user; printf("PUT %s fail: %s\n", what, pdata->filename); future_destroy(pdata->future); free(pdata); runing_over = 1; } void del_future_success(future_result_t* result, void * user) { struct future_pdata *pdata = (struct future_pdata *)user; printf("DEL %s succ\n", pdata->filename); future_destroy(pdata->future); free(pdata); runing_over = 1; } void del_future_failed(enum e_future_error err, const char * what, void * user) { struct future_pdata *pdata = (struct future_pdata *)user; printf("DEL %s fail: %s\n", pdata->filename, what); future_destroy(pdata->future); free(pdata); runing_over = 1; } char * get_file_content(const char *filename, size_t *filelen_out) { char *buffer; FILE *fp; size_t filelen = 0; struct stat filestat; int readlen; fp = fopen(filename, "rb"); if(fp == NULL) { printf("fopen file %s failed.\n", filename); return NULL; } if(fstat(fileno(fp), &filestat)) { printf("fstat %s failed.\n", filename); return NULL; } buffer = (char *)malloc(filestat.st_size); while(filelen < (size_t)filestat.st_size) { readlen = fread(buffer + filelen, 1, filestat.st_size - filelen, fp); if(readlen < 0) { printf("read error: %s\n", strerror(errno)); return NULL; } filelen += readlen; } fclose(fp); *filelen_out = filestat.st_size; return buffer; } struct pthread_data { char *argv; int upload_times; int thread_id; }; void* thread_upload_download(void *arg) { int n; char method[16], filename_in[256], filename_out[256], *p; struct tango_cache_meta_put putmeta; struct tango_cache_meta_get getmeta; struct future_pdata *pdata; struct cache_evbase_ctx *ctx; struct pthread_data *thread_data = (struct pthread_data *)arg; if(sscanf(thread_data->argv, "%[^:]:%1023s%n", method, filename_in, &n) != 2) { assert(0); } if(strlen(filename_in) <= 0) { return NULL; } memset(&putmeta, 0, sizeof(struct tango_cache_meta_put)); putmeta.url = filename_in; putmeta.std_hdr[HDR_CONTENT_TYPE] = "Content-Type: maintype/subtype"; putmeta.std_hdr[HDR_CONTENT_ENCODING] = "Content-Encoding: gzip"; putmeta.usertag = "Etag: hgdkqkwdwqekdfjwjfjwelkjfkwfejwhf\r\n"; putmeta.usertag_len = strlen(putmeta.usertag); getmeta.url = filename_in; p = method; while(*p=='\r'||*p=='\n') p++; assert(*p!='\0'); for(int i=0; iupload_times; i++) { pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); if(!strcasecmp(p, "GET")) { sprintf(filename_out, "file_index_%d_%d.bin", thread_data->thread_id, i); pdata->future = future_create(get_future_success, get_future_failed, pdata); pdata->fp = fopen(filename_out, "w"); cache_evbase_fetch_object(instance_asyn, pdata->future, &getmeta); } else if(!strcasecmp(p, "DEL")) { pdata->future = future_create(del_future_success, del_future_failed, pdata); sprintf(pdata->filename, "%s", filename_in); cache_evbase_delete_object(instance_asyn, pdata->future, filename_in); } else if(!strcasecmp(p, "PUTONCE")) { size_t filelen; p = get_file_content(filename_in, &filelen); pdata->future = future_create(put_future_success, put_future_failed, pdata); if(cache_evbase_upload_once_data(instance_asyn, pdata->future, PUT_MEM_FREE, p, filelen, &putmeta, pdata->filename, 256)) { printf("cache_evbase_upload_once_data fail: %d\n", cache_evbase_ctx_error(instance_asyn)); future_destroy(pdata->future); free(pdata); } } else if(!strcasecmp(p, "PUTONCEEV")) { size_t readlen; pdata->future = future_create(put_future_success, put_future_failed, pdata); struct evbuffer *evbuf = evbuffer_new(); char buffer[1024]; FILE *fp = fopen(filename_in, "rb"); while(!feof(fp)) { readlen = fread(buffer, 1, 1024, fp); if(readlen < 0) { assert(0); } evbuffer_add(evbuf, buffer, readlen); } fclose(fp); if(cache_evbase_upload_once_evbuf(instance_asyn, pdata->future, evbuf, &putmeta, pdata->filename, 256)) { printf("cache_evbase_upload_once_evbuf fail: %d\n", cache_evbase_ctx_error(instance_asyn)); future_destroy(pdata->future); free(pdata); } evbuffer_free(evbuf); } else { pdata->future = future_create(put_future_success, put_future_failed, pdata); ctx = cache_evbase_update_start(instance_asyn, pdata->future, &putmeta); if(ctx==NULL) { printf("cache_evbase_update_start fail: %d\n", cache_evbase_ctx_error(instance_asyn)); future_destroy(pdata->future); free(pdata); continue; } cache_evbase_get_object_path(ctx, pdata->filename, 256); char buffer[1024]; FILE *fp = fopen(filename_in, "r"); while(!feof(fp)) { n = fread(buffer, 1, 1024, fp); assert(n>=0); cache_evbase_update_frag_data(ctx, PUT_MEM_COPY, buffer, n); } cache_evbase_update_end(ctx); } } printf("transfer over\n"); return NULL; } int main(int argc, char **argv) { struct cache_statistics out; void *runtime_log; pthread_t thread_tid; pthread_attr_t attr; struct pthread_data pdata[20]; struct tango_cache_parameter *parameter; if(argc!=3) { printf("USGAE: %s \n", argv[0]); return -1; } runtime_log = MESA_create_runtime_log_handle("./runtime.log", 10); if(NULL==runtime_log) { return -1; } cache_evbase_global_init(); parameter = cache_evbase_parameter_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log); assert(parameter != NULL); instance_asyn = cache_evbase_instance_new(parameter, runtime_log); assert(instance_asyn!=NULL); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); for(int i=0; i<20; i++) { pdata[i].argv = argv[1]; pdata[i].thread_id = i; pdata[i].upload_times = atoi(argv[2]); pthread_create(&thread_tid, &attr, thread_upload_download, &pdata[i]); } while(1) { sleep(30); cache_evbase_get_statistics(instance_asyn, &out); printf("get_recv: %llu, get_succ: %llu, get_miss: %llu, get_fail: %llu, put_recv: %llu, put_succ: %llu, put_fail: %llu, del_recv: %llu, del_succ: %llu, del_fail: %llu, drop_num: %llu, session: %llu, memory: %llu\n", out.get_recv_num, out.get_succ_num, out.get_miss_num, out.get_error_num, out.put_recv_num, out.put_succ_num, out.put_error_num, out.del_recv_num, out.del_succ_num, out.del_error_num, out.totaldrop_num, out.session_num, out.memory_used); } return 0; }