diff --git a/cache/cache_evbase_client.cpp b/cache/cache_evbase_client.cpp index 1020a8f..f880a4b 100644 --- a/cache/cache_evbase_client.cpp +++ b/cache/cache_evbase_client.cpp @@ -457,6 +457,11 @@ int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct fut ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); ctx_asyn->instance_asyn = instance; ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, future, meta); + if(ctx_asyn->ctx == NULL) + { + free(ctx_asyn); + return -1; + } buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); buffer->ctx_asyn = ctx_asyn; @@ -467,7 +472,7 @@ int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct fut tango_cache_ctx_destroy(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); - return -1; + return -2; } return 0; } @@ -480,6 +485,11 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx)); ctx_asyn->instance_asyn = instance; ctx_asyn->ctx = tango_cache_delete_prepare(instance->instance, future, objkey); + if(ctx_asyn->ctx == NULL) + { + free(ctx_asyn); + return -1; + } buffer = (struct databuffer *)malloc(sizeof(struct databuffer)); buffer->ctx_asyn = ctx_asyn; @@ -492,7 +502,7 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu tango_cache_ctx_destroy(ctx_asyn->ctx); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); - return -1; + return -2; } return 0; } @@ -535,3 +545,8 @@ struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path return instance_asyn; } +void cache_evbase_global_init(void) +{ + tango_cache_global_init(); +} + diff --git a/cache/include/cache_evbase_client.h b/cache/include/cache_evbase_client.h index 3585a71..538b6d7 100644 --- a/cache/include/cache_evbase_client.h +++ b/cache/include/cache_evbase_client.h @@ -26,6 +26,8 @@ enum CACHE_ERR_CODE cache_evbase_get_last_error(const struct cache_evbase_ctx *c enum CACHE_ERR_CODE cache_evbase_ctx_error(const struct cache_evbase_instance *instance); void cache_evbase_get_statistics(const struct cache_evbase_instance *instance, struct cache_statistics *out); +void cache_evbase_global_init(void); + /*创建实例,每线程一个,或使用时加锁*/ struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path, const char* section, void *runtimelog); diff --git a/cache/include/tango_cache_client.h b/cache/include/tango_cache_client.h index 2019dc5..90e0f28 100644 --- a/cache/include/tango_cache_client.h +++ b/cache/include/tango_cache_client.h @@ -42,7 +42,7 @@ struct cache_statistics long long del_recv_num; //发起DELETE的次数 long long del_succ_num; //DELETE成功的次数 long long del_error_num;//DELETE成功的次数 - long long totaldrop_num;//内存满DROP的次数 + long long totaldrop_num;//内存满以及WiredLB出错时DROP的次数 long long memory_used; //当前UPLOAD BODY所占内存大小 long long session_num; //当前正在进行GET/PUT的HTTP会话数 }; @@ -91,6 +91,9 @@ enum CACHE_ERR_CODE tango_cache_get_last_error(const struct tango_cache_ctx *ctx enum CACHE_ERR_CODE tango_cache_ctx_error(const struct tango_cache_instance *instance); void tango_cache_get_statistics(const struct tango_cache_instance *instance, struct cache_statistics *out); +/*每个进程执行一次初始化*/ +void tango_cache_global_init(void); + /*以下所有API线程不安全*/ //每个监听线程创建一个instance struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog); diff --git a/cache/tango_cache_client.cpp b/cache/tango_cache_client.cpp index 2f55ef6..f25177e 100644 --- a/cache/tango_cache_client.cpp +++ b/cache/tango_cache_client.cpp @@ -18,7 +18,7 @@ int TANGO_CACHE_VERSION_20180925=0; -void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size) +static void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size) { SHA256_CTX c; unsigned char sha256[128]; @@ -33,7 +33,7 @@ void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_ } } -int wired_load_balancer_lookup(WLB_handle_t wiredlb, const char *key, int keylen, char *host, size_t hostsize) +static int wired_load_balancer_lookup(WLB_handle_t wiredlb, const char *key, int keylen, char *host, size_t hostsize) { struct WLB_consumer_t chosen; @@ -81,7 +81,7 @@ void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path, snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key); } -static inline void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state, enum CACHE_ERR_CODE error_code, struct cache_statistics *statistic) +static void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state, enum CACHE_ERR_CODE error_code, struct cache_statistics *statistic) { switch(method) { @@ -166,7 +166,11 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx) if(ctx->put.uploadID != NULL) free(ctx->put.uploadID); if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml); if(ctx->headers != NULL) curl_slist_free_all(ctx->headers); - if(ctx->put.evbuf!=NULL) evbuffer_free(ctx->put.evbuf); + if(ctx->put.evbuf!=NULL) + { + ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf); + evbuffer_free(ctx->put.evbuf); + } TAILQ_FOREACH(etag, &ctx->put.etag_head, node) { TAILQ_REMOVE(&ctx->put.etag_head, etag, node); @@ -256,6 +260,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * if((u_int64_t)instance->statistic.memory_used >= instance->cache_limit_size) { instance->error_code = CACHE_OUTOF_MEMORY; + instance->statistic.totaldrop_num += 1; return NULL; } @@ -276,6 +281,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * if(wired_load_balancer_lookup(instance->wiredlb, meta->url, strlen(meta->url), ctx->hostaddr, 48)) { instance->error_code = CACHE_ERR_WIREDLB; + instance->statistic.totaldrop_num += 1; free(ctx); return NULL; } @@ -307,6 +313,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * { ctx->headers = curl_slist_append(ctx->headers, "Content-Type:"); } + ctx->headers = curl_slist_append(ctx->headers, "Expect:"); //其他定义的头部,GET时会原样返回 if(meta->usertag_len>0 && meta->usertag_len<=USER_TAG_MAX_LEN) { @@ -397,6 +404,7 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i if(wired_load_balancer_lookup(instance->wiredlb, meta->url, strlen(meta->url), ctx->hostaddr, 48)) { instance->error_code = CACHE_ERR_WIREDLB; + instance->statistic.totaldrop_num += 1; free(ctx); return NULL; } @@ -405,7 +413,14 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta) { - return tango_cache_fetch_start(tango_cache_fetch_prepare(instance, future, meta)); + struct tango_cache_ctx *ctx; + + ctx = tango_cache_fetch_prepare(instance, future, meta); + if(ctx == NULL) + { + return -1; + } + return tango_cache_fetch_start(ctx); } struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, struct future* future, const char *objkey) @@ -430,6 +445,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance * if(wired_load_balancer_lookup(instance->wiredlb, objkey, strlen(objkey), ctx->hostaddr, 48)) { instance->error_code = CACHE_ERR_WIREDLB; + instance->statistic.totaldrop_num += 1; free(ctx); return NULL; } @@ -438,7 +454,14 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance * int tango_cache_delete_object(struct tango_cache_instance *instance, struct future* future, const char *objkey) { - return (cache_delete_minio_object(tango_cache_delete_prepare(instance, future, objkey))==1)?0:-1; + struct tango_cache_ctx *ctx; + + ctx = tango_cache_delete_prepare(instance, future, objkey); + if(ctx == NULL) + { + return -1; + } + return (cache_delete_minio_object(ctx)==1)?0:-1; } static void check_multi_info(CURLM *multi) @@ -579,6 +602,29 @@ static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp) return 0; //0-success; -1-error } +static int wired_load_balancer_init(struct tango_cache_instance *instance) +{ + instance->wiredlb = wiredLB_create(instance->wiredlb_topic, instance->wiredlb_group, WLB_PRODUCER); + if(instance->wiredlb == NULL) + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n"); + return -1; + } + wiredLB_set_opt(instance->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &instance->wiredlb_override, sizeof(instance->wiredlb_override)); + wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_DATACENTER, instance->wiredlb_datacenter, strlen(instance->wiredlb_datacenter)+1); + if(instance->wiredlb_override) + { + wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, instance->minio_iplist, strlen(instance->minio_iplist)+1); + wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &instance->minio_port, sizeof(instance->minio_port)); + } + if(wiredLB_init(instance->wiredlb) < 0) + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_init failed.\n"); + return -1; + } + return 0; +} + static int load_local_configure(struct tango_cache_instance *instance, const char* profile_path, const char* section) { u_int32_t intval; @@ -619,29 +665,6 @@ static int load_local_configure(struct tango_cache_instance *instance, const cha return 0; } -int wired_load_balancer_init(struct tango_cache_instance *instance) -{ - instance->wiredlb = wiredLB_create(instance->wiredlb_topic, instance->wiredlb_group, WLB_PRODUCER); - if(instance->wiredlb == NULL) - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n"); - return -1; - } - wiredLB_set_opt(instance->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &instance->wiredlb_override, sizeof(instance->wiredlb_override)); - wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_DATACENTER, instance->wiredlb_datacenter, strlen(instance->wiredlb_datacenter)+1); - if(instance->wiredlb_override) - { - wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, instance->minio_iplist, strlen(instance->minio_iplist)+1); - wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &instance->minio_port, sizeof(instance->minio_port)); - } - if(wiredLB_init(instance->wiredlb) < 0) - { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_init failed.\n"); - return -1; - } - return 0; -} - struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog) { struct tango_cache_instance *instance; @@ -664,7 +687,7 @@ struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase, instance->multi_hd = curl_multi_init(); instance->runtime_log = runtimelog; - curl_multi_setopt(instance->multi_hd, CURLMOPT_PIPELINING, CURLPIPE_HTTP1); + curl_multi_setopt(instance->multi_hd, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX); curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, instance->max_cnn_host); curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETFUNCTION, curl_socket_function_cb); curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETDATA, instance); //curl_socket_function_cb *userp @@ -676,3 +699,8 @@ struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase, return instance; } +void tango_cache_global_init(void) +{ + curl_global_init(CURL_GLOBAL_NOTHING); +} + diff --git a/cache/test_demo/Makefile b/cache/test_demo/Makefile index 5078227..509c8c3 100644 --- a/cache/test_demo/Makefile +++ b/cache/test_demo/Makefile @@ -3,17 +3,19 @@ CCC=g++ INC_PATH=-I../src/include -I../src/include/libevent2 CFLAGS=-Wall -g $(INC_PATH) -LIBS = -lMESA_handle_logger -lMESA_htable -lMESA_prof_load -lwiredcfg +LIBS = -lMESA_handle_logger -lMESA_htable -lMESA_prof_load -lWiredLB LIBS += -lssl -lcrypto -lcurl -levent -lxml2 LIBS += ../src/pangu_tango_cache.a OBJS = tango_cache_test.o OBJS_EVBASE=cache_evbase_test.o +OBJS_EVBASE_THREADS=cache_evbase_test_threads.o TARGET_EXE=tango_cache_test TARGET_EXE_EVBASE=cache_evbase_test +TARGET_EXE_EVBASE_THREAD=cache_evbase_test_threads -ALL:$(TARGET_EXE) $(TARGET_EXE_EVBASE) +ALL:$(TARGET_EXE) $(TARGET_EXE_EVBASE) $(TARGET_EXE_EVBASE_THREAD) $(TARGET_EXE):$(OBJS) $(CCC) $(LDFLAGS) $^ -o $@ $(LIBS) @@ -21,6 +23,9 @@ $(TARGET_EXE):$(OBJS) $(TARGET_EXE_EVBASE):$(OBJS_EVBASE) $(CCC) $(LDFLAGS) $^ -o $@ $(LIBS) -lpthread +$(TARGET_EXE_EVBASE_THREAD):$(OBJS_EVBASE_THREADS) + $(CCC) $(LDFLAGS) $^ -o $@ $(LIBS) -lpthread + .c.o: $(CCC) $(CFLAGS) -c $< .cpp.o: @@ -29,5 +34,5 @@ $(TARGET_EXE_EVBASE):$(OBJS_EVBASE) -include $(DEPS) clean: - rm -rf $(OBJS) $(TARGET_EXE) $(OBJS_EVBASE) $(TARGET_EXE_EVBASE) + rm -rf $(OBJS) $(TARGET_EXE) $(OBJS_EVBASE) $(TARGET_EXE_EVBASE) $(OBJS_EVBASE_THREADS) $(TARGET_EXE_EVBASE_THREAD) diff --git a/cache/test_demo/cache_evbase_test.cpp b/cache/test_demo/cache_evbase_test.cpp index 295289b..f4b4ddd 100644 --- a/cache/test_demo/cache_evbase_test.cpp +++ b/cache/test_demo/cache_evbase_test.cpp @@ -170,7 +170,8 @@ int main(int argc, char **argv) { return -1; } - + + cache_evbase_global_init(); instance_asyn = cache_evbase_instance_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log); assert(instance_asyn!=NULL); diff --git a/cache/test_demo/cache_evbase_test_threads.cpp b/cache/test_demo/cache_evbase_test_threads.cpp new file mode 100644 index 0000000..20bc861 --- /dev/null +++ b/cache/test_demo/cache_evbase_test_threads.cpp @@ -0,0 +1,322 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cache_evbase_client.h" + +struct cache_evbase_instance *instance_asyn; +int runing_over=0; + +struct future_pdata +{ + struct future * future; + FILE *fp; + char filename[256]; +}; + +void get_future_success(future_result_t* result, void * user) +{ + struct tango_cache_result *res = cache_evbase_read_result(result); + struct future_pdata *pdata = (struct future_pdata *)user; + char buffer[1024]; + + if(res != NULL) + { + switch(res->type) + { + case RESULT_TYPE_BODY: + fwrite(res->data_frag, res->size, 1, pdata->fp); + break; + + case RESULT_TYPE_USERTAG: + case RESULT_TYPE_HEADER: + memcpy(buffer, res->data_frag, res->size>=1024?1023:res->size); + buffer[res->size] = '\0'; + printf("%s", buffer); + break; + default:break; + } + } + else //结束 + { + future_destroy(pdata->future); + fclose(pdata->fp); + free(pdata); + runing_over = 1; + } +} + +void get_future_failed(enum e_future_error err, const char * what, void * user) +{ + printf("GET fail: %s\n", what); + runing_over = 2; +} + +void put_future_success(future_result_t* result, void * user) +{ + struct future_pdata *pdata = (struct future_pdata *)user; + + printf("PUT %s succ\n", pdata->filename); + future_destroy(pdata->future); + free(pdata); + runing_over = 1; +} +void put_future_failed(enum e_future_error err, const char * what, void * user) +{ + struct future_pdata *pdata = (struct future_pdata *)user; + + printf("PUT %s fail: %s\n", what, pdata->filename); + future_destroy(pdata->future); + free(pdata); + runing_over = 1; +} + +void del_future_success(future_result_t* result, void * user) +{ + struct future_pdata *pdata = (struct future_pdata *)user; + + printf("DEL %s succ\n", pdata->filename); + future_destroy(pdata->future); + free(pdata); + runing_over = 1; +} +void del_future_failed(enum e_future_error err, const char * what, void * user) +{ + struct future_pdata *pdata = (struct future_pdata *)user; + + printf("DEL %s fail: %s\n", pdata->filename, what); + future_destroy(pdata->future); + free(pdata); + runing_over = 1; +} + +char * get_file_content(const char *filename, size_t *filelen_out) +{ + char *buffer; + FILE *fp; + size_t filelen = 0; + struct stat filestat; + int readlen; + + fp = fopen(filename, "rb"); + if(fp == NULL) + { + printf("fopen file %s failed.\n", filename); + return NULL; + } + if(fstat(fileno(fp), &filestat)) + { + printf("fstat %s failed.\n", filename); + return NULL; + } + + buffer = (char *)malloc(filestat.st_size); + + while(filelen < (size_t)filestat.st_size) + { + readlen = fread(buffer + filelen, 1, filestat.st_size - filelen, fp); + if(readlen < 0) + { + printf("read error: %s\n", strerror(errno)); + return NULL; + } + + filelen += readlen; + } + fclose(fp); + *filelen_out = filestat.st_size; + + return buffer; +} + +struct pthread_data +{ + char *argv; + int upload_times; + int thread_id; +}; + +void* thread_upload_download(void *arg) +{ + int n; + char method[16], filename_in[256], filename_out[256], *p; + struct tango_cache_meta meta; + struct future_pdata *pdata; + struct cache_evbase_ctx *ctx; + struct pthread_data *thread_data = (struct pthread_data *)arg; + + if(sscanf(thread_data->argv, "%[^:]:%1023s%n", method, filename_in, &n) != 2) + { + assert(0); + } + if(strlen(filename_in) <= 0) + { + return NULL; + } + + memset(&meta, 0, sizeof(struct tango_cache_meta)); + meta.url = filename_in; + meta.std_hdr[HDR_CONTENT_TYPE] = "Content-Type: maintype/subtype"; + meta.std_hdr[HDR_CONTENT_ENCODING] = "Content-Encoding: gzip"; + meta.usertag = "Etag: hgdkqkwdwqekdfjwjfjwelkjfkwfejwhf\r\n"; + meta.usertag_len = strlen(meta.usertag); + + p = method; + while(*p=='\r'||*p=='\n') p++; + assert(*p!='\0'); + + for(int i=0; iupload_times; i++) + { + pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata)); + + if(!strcasecmp(p, "GET")) + { + sprintf(filename_out, "file_index_%d_%d.bin", thread_data->thread_id, i); + pdata->future = future_create(get_future_success, get_future_failed, pdata); + promise_set_ctx(future_to_promise(pdata->future), NULL, NULL); + pdata->fp = fopen(filename_out, "w"); + + cache_evbase_fetch_object(instance_asyn, pdata->future, &meta); + } + else if(!strcasecmp(p, "DEL")) + { + pdata->future = future_create(del_future_success, del_future_failed, pdata); + promise_set_ctx(future_to_promise(pdata->future), NULL, NULL); + sprintf(pdata->filename, "%s", filename_in); + cache_evbase_delete_object(instance_asyn, pdata->future, filename_in); + } + else if(!strcasecmp(p, "PUTONCE")) + { + size_t filelen; + p = get_file_content(filename_in, &filelen); + pdata->future = future_create(put_future_success, put_future_failed, pdata); + promise_set_ctx(future_to_promise(pdata->future), NULL, NULL); + + if(cache_evbase_upload_once_data(instance_asyn, pdata->future, PUT_MEM_FREE, p, filelen, &meta, pdata->filename, 256)) + { + printf("cache_evbase_upload_once_data fail: %d\n", cache_evbase_ctx_error(instance_asyn)); + future_destroy(pdata->future); + free(pdata); + } + } + else if(!strcasecmp(p, "PUTONCEEV")) + { + size_t readlen; + pdata->future = future_create(put_future_success, put_future_failed, pdata); + promise_set_ctx(future_to_promise(pdata->future), NULL, NULL); + struct evbuffer *evbuf = evbuffer_new(); + char buffer[1024]; + + FILE *fp = fopen(filename_in, "rb"); + while(!feof(fp)) + { + readlen = fread(buffer, 1, 1024, fp); + if(readlen < 0) + { + assert(0); + } + evbuffer_add(evbuf, buffer, readlen); + } + fclose(fp); + if(cache_evbase_upload_once_evbuf(instance_asyn, pdata->future, evbuf, &meta, pdata->filename, 256)) + { + printf("cache_evbase_upload_once_evbuf fail: %d\n", cache_evbase_ctx_error(instance_asyn)); + future_destroy(pdata->future); + free(pdata); + } + evbuffer_free(evbuf); + } + else + { + pdata->future = future_create(put_future_success, put_future_failed, pdata); + promise_set_ctx(future_to_promise(pdata->future), NULL, NULL); + + ctx = cache_evbase_update_start(instance_asyn, pdata->future, &meta); + if(ctx==NULL) + { + printf("cache_evbase_update_start fail: %d\n", cache_evbase_ctx_error(instance_asyn)); + future_destroy(pdata->future); + free(pdata); + continue; + } + cache_evbase_get_object_path(ctx, pdata->filename, 256); + + char buffer[1024]; + FILE *fp = fopen(filename_in, "r"); + while(!feof(fp)) + { + n = fread(buffer, 1, 1024, fp); + assert(n>=0); + cache_evbase_update_frag_data(ctx, PUT_MEM_COPY, buffer, n); + } + + cache_evbase_update_end(ctx); + } + } + + printf("transfer over\n"); + return NULL; +} + +int main(int argc, char **argv) +{ + struct cache_statistics out; + void *runtime_log; + pthread_t thread_tid; + pthread_attr_t attr; + struct pthread_data pdata[20]; + + if(argc!=3) + { + printf("USGAE: %s \n", argv[0]); + return -1; + } + + runtime_log = MESA_create_runtime_log_handle("./runtime.log", 10); + if(NULL==runtime_log) + { + return -1; + } + + cache_evbase_global_init(); + instance_asyn = cache_evbase_instance_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log); + assert(instance_asyn!=NULL); + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + for(int i=0; i<20; i++) + { + pdata[i].argv = argv[1]; + pdata[i].thread_id = i; + pdata[i].upload_times = atoi(argv[2]); + pthread_create(&thread_tid, &attr, thread_upload_download, &pdata[i]); + } + + while(1) + { + sleep(30); + cache_evbase_get_statistics(instance_asyn, &out); + printf("get_recv: %llu, get_succ: %llu, get_miss: %llu, get_fail: %llu, put_recv: %llu, put_succ: %llu, put_fail: %llu, del_recv: %llu, del_succ: %llu, del_fail: %llu, drop_num: %llu, session: %llu, memory: %llu\n", + out.get_recv_num, out.get_succ_num, out.get_miss_num, out.get_error_num, out.put_recv_num, out.put_succ_num, out.put_error_num, out.del_recv_num, out.del_succ_num, out.del_error_num, out.totaldrop_num, out.session_num, out.memory_used); + } + return 0; +} + diff --git a/cache/test_demo/tango_cache_test.c b/cache/test_demo/tango_cache_test.c index 7d3555d..39675bf 100644 --- a/cache/test_demo/tango_cache_test.c +++ b/cache/test_demo/tango_cache_test.c @@ -310,6 +310,7 @@ int main(int crgc, char **arg) } init_fifo(); + tango_cache_global_init(); tango_instance = tango_cache_instance_new(ev_base, "./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log); tv.tv_sec = 10;