修复内存统计BUG;添加全局初始化API;
This commit is contained in:
19
cache/cache_evbase_client.cpp
vendored
19
cache/cache_evbase_client.cpp
vendored
@@ -457,6 +457,11 @@ int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct fut
|
|||||||
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
|
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
|
||||||
ctx_asyn->instance_asyn = instance;
|
ctx_asyn->instance_asyn = instance;
|
||||||
ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, future, meta);
|
ctx_asyn->ctx = tango_cache_fetch_prepare(instance->instance, future, meta);
|
||||||
|
if(ctx_asyn->ctx == NULL)
|
||||||
|
{
|
||||||
|
free(ctx_asyn);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
|
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
|
||||||
buffer->ctx_asyn = ctx_asyn;
|
buffer->ctx_asyn = ctx_asyn;
|
||||||
@@ -467,7 +472,7 @@ int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct fut
|
|||||||
tango_cache_ctx_destroy(ctx_asyn->ctx);
|
tango_cache_ctx_destroy(ctx_asyn->ctx);
|
||||||
cache_asyn_ctx_destroy(ctx_asyn);
|
cache_asyn_ctx_destroy(ctx_asyn);
|
||||||
free(buffer);
|
free(buffer);
|
||||||
return -1;
|
return -2;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@@ -480,6 +485,11 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu
|
|||||||
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
|
ctx_asyn = (struct cache_evbase_ctx *)calloc(1, sizeof(struct cache_evbase_ctx));
|
||||||
ctx_asyn->instance_asyn = instance;
|
ctx_asyn->instance_asyn = instance;
|
||||||
ctx_asyn->ctx = tango_cache_delete_prepare(instance->instance, future, objkey);
|
ctx_asyn->ctx = tango_cache_delete_prepare(instance->instance, future, objkey);
|
||||||
|
if(ctx_asyn->ctx == NULL)
|
||||||
|
{
|
||||||
|
free(ctx_asyn);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
|
buffer = (struct databuffer *)malloc(sizeof(struct databuffer));
|
||||||
buffer->ctx_asyn = ctx_asyn;
|
buffer->ctx_asyn = ctx_asyn;
|
||||||
@@ -492,7 +502,7 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu
|
|||||||
tango_cache_ctx_destroy(ctx_asyn->ctx);
|
tango_cache_ctx_destroy(ctx_asyn->ctx);
|
||||||
cache_asyn_ctx_destroy(ctx_asyn);
|
cache_asyn_ctx_destroy(ctx_asyn);
|
||||||
free(buffer);
|
free(buffer);
|
||||||
return -1;
|
return -2;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@@ -535,3 +545,8 @@ struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path
|
|||||||
return instance_asyn;
|
return instance_asyn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void cache_evbase_global_init(void)
|
||||||
|
{
|
||||||
|
tango_cache_global_init();
|
||||||
|
}
|
||||||
|
|
||||||
|
|||||||
2
cache/include/cache_evbase_client.h
vendored
2
cache/include/cache_evbase_client.h
vendored
@@ -26,6 +26,8 @@ enum CACHE_ERR_CODE cache_evbase_get_last_error(const struct cache_evbase_ctx *c
|
|||||||
enum CACHE_ERR_CODE cache_evbase_ctx_error(const struct cache_evbase_instance *instance);
|
enum CACHE_ERR_CODE cache_evbase_ctx_error(const struct cache_evbase_instance *instance);
|
||||||
void cache_evbase_get_statistics(const struct cache_evbase_instance *instance, struct cache_statistics *out);
|
void cache_evbase_get_statistics(const struct cache_evbase_instance *instance, struct cache_statistics *out);
|
||||||
|
|
||||||
|
void cache_evbase_global_init(void);
|
||||||
|
|
||||||
/*<2A><><EFBFBD><EFBFBD>ʵ<EFBFBD><CAB5><EFBFBD><EFBFBD>ÿ<EFBFBD>߳<EFBFBD>һ<EFBFBD><D2BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʹ<EFBFBD><CAB9>ʱ<EFBFBD><CAB1><EFBFBD><EFBFBD>*/
|
/*<2A><><EFBFBD><EFBFBD>ʵ<EFBFBD><CAB5><EFBFBD><EFBFBD>ÿ<EFBFBD>߳<EFBFBD>һ<EFBFBD><D2BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʹ<EFBFBD><CAB9>ʱ<EFBFBD><CAB1><EFBFBD><EFBFBD>*/
|
||||||
struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path, const char* section, void *runtimelog);
|
struct cache_evbase_instance *cache_evbase_instance_new(const char* profile_path, const char* section, void *runtimelog);
|
||||||
|
|
||||||
|
|||||||
5
cache/include/tango_cache_client.h
vendored
5
cache/include/tango_cache_client.h
vendored
@@ -42,7 +42,7 @@ struct cache_statistics
|
|||||||
long long del_recv_num; //<2F><><EFBFBD><EFBFBD>DELETE<54>Ĵ<EFBFBD><C4B4><EFBFBD>
|
long long del_recv_num; //<2F><><EFBFBD><EFBFBD>DELETE<54>Ĵ<EFBFBD><C4B4><EFBFBD>
|
||||||
long long del_succ_num; //DELETE<54>ɹ<EFBFBD><C9B9>Ĵ<EFBFBD><C4B4><EFBFBD>
|
long long del_succ_num; //DELETE<54>ɹ<EFBFBD><C9B9>Ĵ<EFBFBD><C4B4><EFBFBD>
|
||||||
long long del_error_num;//DELETE<54>ɹ<EFBFBD><C9B9>Ĵ<EFBFBD><C4B4><EFBFBD>
|
long long del_error_num;//DELETE<54>ɹ<EFBFBD><C9B9>Ĵ<EFBFBD><C4B4><EFBFBD>
|
||||||
long long totaldrop_num;//<2F>ڴ<EFBFBD><DAB4><EFBFBD>DROP<4F>Ĵ<EFBFBD><C4B4><EFBFBD>
|
long long totaldrop_num;//<2F>ڴ<EFBFBD><DAB4><EFBFBD><EFBFBD>Լ<EFBFBD>WiredLB<EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱDROP<EFBFBD>Ĵ<EFBFBD><EFBFBD><EFBFBD>
|
||||||
long long memory_used; //<2F><>ǰUPLOAD BODY<44><59>ռ<EFBFBD>ڴ<EFBFBD><DAB4><EFBFBD>С
|
long long memory_used; //<2F><>ǰUPLOAD BODY<44><59>ռ<EFBFBD>ڴ<EFBFBD><DAB4><EFBFBD>С
|
||||||
long long session_num; //<2F><>ǰ<EFBFBD><C7B0><EFBFBD>ڽ<EFBFBD><DABD><EFBFBD>GET/PUT<55><54>HTTP<54>Ự<EFBFBD><E1BBB0>
|
long long session_num; //<2F><>ǰ<EFBFBD><C7B0><EFBFBD>ڽ<EFBFBD><DABD><EFBFBD>GET/PUT<55><54>HTTP<54>Ự<EFBFBD><E1BBB0>
|
||||||
};
|
};
|
||||||
@@ -91,6 +91,9 @@ enum CACHE_ERR_CODE tango_cache_get_last_error(const struct tango_cache_ctx *ctx
|
|||||||
enum CACHE_ERR_CODE tango_cache_ctx_error(const struct tango_cache_instance *instance);
|
enum CACHE_ERR_CODE tango_cache_ctx_error(const struct tango_cache_instance *instance);
|
||||||
void tango_cache_get_statistics(const struct tango_cache_instance *instance, struct cache_statistics *out);
|
void tango_cache_get_statistics(const struct tango_cache_instance *instance, struct cache_statistics *out);
|
||||||
|
|
||||||
|
/*ÿ<><C3BF><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ִ<EFBFBD><D6B4>һ<EFBFBD>γ<EFBFBD>ʼ<EFBFBD><CABC>*/
|
||||||
|
void tango_cache_global_init(void);
|
||||||
|
|
||||||
/*<2A><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>API<50>̲߳<DFB3><CCB2><EFBFBD>ȫ*/
|
/*<2A><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>API<50>̲߳<DFB3><CCB2><EFBFBD>ȫ*/
|
||||||
//ÿ<><C3BF><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>̴߳<DFB3><CCB4><EFBFBD>һ<EFBFBD><D2BB>instance
|
//ÿ<><C3BF><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>̴߳<DFB3><CCB4><EFBFBD>һ<EFBFBD><D2BB>instance
|
||||||
struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog);
|
struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog);
|
||||||
|
|||||||
88
cache/tango_cache_client.cpp
vendored
88
cache/tango_cache_client.cpp
vendored
@@ -18,7 +18,7 @@
|
|||||||
|
|
||||||
int TANGO_CACHE_VERSION_20180925=0;
|
int TANGO_CACHE_VERSION_20180925=0;
|
||||||
|
|
||||||
void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size)
|
static void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size)
|
||||||
{
|
{
|
||||||
SHA256_CTX c;
|
SHA256_CTX c;
|
||||||
unsigned char sha256[128];
|
unsigned char sha256[128];
|
||||||
@@ -33,7 +33,7 @@ void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int wired_load_balancer_lookup(WLB_handle_t wiredlb, const char *key, int keylen, char *host, size_t hostsize)
|
static int wired_load_balancer_lookup(WLB_handle_t wiredlb, const char *key, int keylen, char *host, size_t hostsize)
|
||||||
{
|
{
|
||||||
struct WLB_consumer_t chosen;
|
struct WLB_consumer_t chosen;
|
||||||
|
|
||||||
@@ -81,7 +81,7 @@ void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path,
|
|||||||
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
|
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state, enum CACHE_ERR_CODE error_code, struct cache_statistics *statistic)
|
static void update_statistics(enum CACHE_REQUEST_METHOD method, bool fail_state, enum CACHE_ERR_CODE error_code, struct cache_statistics *statistic)
|
||||||
{
|
{
|
||||||
switch(method)
|
switch(method)
|
||||||
{
|
{
|
||||||
@@ -166,7 +166,11 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx)
|
|||||||
if(ctx->put.uploadID != NULL) free(ctx->put.uploadID);
|
if(ctx->put.uploadID != NULL) free(ctx->put.uploadID);
|
||||||
if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml);
|
if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml);
|
||||||
if(ctx->headers != NULL) curl_slist_free_all(ctx->headers);
|
if(ctx->headers != NULL) curl_slist_free_all(ctx->headers);
|
||||||
if(ctx->put.evbuf!=NULL) evbuffer_free(ctx->put.evbuf);
|
if(ctx->put.evbuf!=NULL)
|
||||||
|
{
|
||||||
|
ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf);
|
||||||
|
evbuffer_free(ctx->put.evbuf);
|
||||||
|
}
|
||||||
TAILQ_FOREACH(etag, &ctx->put.etag_head, node)
|
TAILQ_FOREACH(etag, &ctx->put.etag_head, node)
|
||||||
{
|
{
|
||||||
TAILQ_REMOVE(&ctx->put.etag_head, etag, node);
|
TAILQ_REMOVE(&ctx->put.etag_head, etag, node);
|
||||||
@@ -256,6 +260,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
|
|||||||
if((u_int64_t)instance->statistic.memory_used >= instance->cache_limit_size)
|
if((u_int64_t)instance->statistic.memory_used >= instance->cache_limit_size)
|
||||||
{
|
{
|
||||||
instance->error_code = CACHE_OUTOF_MEMORY;
|
instance->error_code = CACHE_OUTOF_MEMORY;
|
||||||
|
instance->statistic.totaldrop_num += 1;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -276,6 +281,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
|
|||||||
if(wired_load_balancer_lookup(instance->wiredlb, meta->url, strlen(meta->url), ctx->hostaddr, 48))
|
if(wired_load_balancer_lookup(instance->wiredlb, meta->url, strlen(meta->url), ctx->hostaddr, 48))
|
||||||
{
|
{
|
||||||
instance->error_code = CACHE_ERR_WIREDLB;
|
instance->error_code = CACHE_ERR_WIREDLB;
|
||||||
|
instance->statistic.totaldrop_num += 1;
|
||||||
free(ctx);
|
free(ctx);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -307,6 +313,7 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *
|
|||||||
{
|
{
|
||||||
ctx->headers = curl_slist_append(ctx->headers, "Content-Type:");
|
ctx->headers = curl_slist_append(ctx->headers, "Content-Type:");
|
||||||
}
|
}
|
||||||
|
ctx->headers = curl_slist_append(ctx->headers, "Expect:");
|
||||||
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͷ<EFBFBD><CDB7><EFBFBD><EFBFBD>GETʱ<54><CAB1>ԭ<EFBFBD><D4AD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
|
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͷ<EFBFBD><CDB7><EFBFBD><EFBFBD>GETʱ<54><CAB1>ԭ<EFBFBD><D4AD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
|
||||||
if(meta->usertag_len>0 && meta->usertag_len<=USER_TAG_MAX_LEN)
|
if(meta->usertag_len>0 && meta->usertag_len<=USER_TAG_MAX_LEN)
|
||||||
{
|
{
|
||||||
@@ -397,6 +404,7 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
|
|||||||
if(wired_load_balancer_lookup(instance->wiredlb, meta->url, strlen(meta->url), ctx->hostaddr, 48))
|
if(wired_load_balancer_lookup(instance->wiredlb, meta->url, strlen(meta->url), ctx->hostaddr, 48))
|
||||||
{
|
{
|
||||||
instance->error_code = CACHE_ERR_WIREDLB;
|
instance->error_code = CACHE_ERR_WIREDLB;
|
||||||
|
instance->statistic.totaldrop_num += 1;
|
||||||
free(ctx);
|
free(ctx);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -405,7 +413,14 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i
|
|||||||
|
|
||||||
int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta)
|
int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta)
|
||||||
{
|
{
|
||||||
return tango_cache_fetch_start(tango_cache_fetch_prepare(instance, future, meta));
|
struct tango_cache_ctx *ctx;
|
||||||
|
|
||||||
|
ctx = tango_cache_fetch_prepare(instance, future, meta);
|
||||||
|
if(ctx == NULL)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return tango_cache_fetch_start(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, struct future* future, const char *objkey)
|
struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, struct future* future, const char *objkey)
|
||||||
@@ -430,6 +445,7 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *
|
|||||||
if(wired_load_balancer_lookup(instance->wiredlb, objkey, strlen(objkey), ctx->hostaddr, 48))
|
if(wired_load_balancer_lookup(instance->wiredlb, objkey, strlen(objkey), ctx->hostaddr, 48))
|
||||||
{
|
{
|
||||||
instance->error_code = CACHE_ERR_WIREDLB;
|
instance->error_code = CACHE_ERR_WIREDLB;
|
||||||
|
instance->statistic.totaldrop_num += 1;
|
||||||
free(ctx);
|
free(ctx);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -438,7 +454,14 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *
|
|||||||
|
|
||||||
int tango_cache_delete_object(struct tango_cache_instance *instance, struct future* future, const char *objkey)
|
int tango_cache_delete_object(struct tango_cache_instance *instance, struct future* future, const char *objkey)
|
||||||
{
|
{
|
||||||
return (cache_delete_minio_object(tango_cache_delete_prepare(instance, future, objkey))==1)?0:-1;
|
struct tango_cache_ctx *ctx;
|
||||||
|
|
||||||
|
ctx = tango_cache_delete_prepare(instance, future, objkey);
|
||||||
|
if(ctx == NULL)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return (cache_delete_minio_object(ctx)==1)?0:-1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void check_multi_info(CURLM *multi)
|
static void check_multi_info(CURLM *multi)
|
||||||
@@ -579,6 +602,29 @@ static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp)
|
|||||||
return 0; //0-success; -1-error
|
return 0; //0-success; -1-error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int wired_load_balancer_init(struct tango_cache_instance *instance)
|
||||||
|
{
|
||||||
|
instance->wiredlb = wiredLB_create(instance->wiredlb_topic, instance->wiredlb_group, WLB_PRODUCER);
|
||||||
|
if(instance->wiredlb == NULL)
|
||||||
|
{
|
||||||
|
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
wiredLB_set_opt(instance->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &instance->wiredlb_override, sizeof(instance->wiredlb_override));
|
||||||
|
wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_DATACENTER, instance->wiredlb_datacenter, strlen(instance->wiredlb_datacenter)+1);
|
||||||
|
if(instance->wiredlb_override)
|
||||||
|
{
|
||||||
|
wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, instance->minio_iplist, strlen(instance->minio_iplist)+1);
|
||||||
|
wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &instance->minio_port, sizeof(instance->minio_port));
|
||||||
|
}
|
||||||
|
if(wiredLB_init(instance->wiredlb) < 0)
|
||||||
|
{
|
||||||
|
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_init failed.\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int load_local_configure(struct tango_cache_instance *instance, const char* profile_path, const char* section)
|
static int load_local_configure(struct tango_cache_instance *instance, const char* profile_path, const char* section)
|
||||||
{
|
{
|
||||||
u_int32_t intval;
|
u_int32_t intval;
|
||||||
@@ -619,29 +665,6 @@ static int load_local_configure(struct tango_cache_instance *instance, const cha
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int wired_load_balancer_init(struct tango_cache_instance *instance)
|
|
||||||
{
|
|
||||||
instance->wiredlb = wiredLB_create(instance->wiredlb_topic, instance->wiredlb_group, WLB_PRODUCER);
|
|
||||||
if(instance->wiredlb == NULL)
|
|
||||||
{
|
|
||||||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
wiredLB_set_opt(instance->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &instance->wiredlb_override, sizeof(instance->wiredlb_override));
|
|
||||||
wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_DATACENTER, instance->wiredlb_datacenter, strlen(instance->wiredlb_datacenter)+1);
|
|
||||||
if(instance->wiredlb_override)
|
|
||||||
{
|
|
||||||
wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, instance->minio_iplist, strlen(instance->minio_iplist)+1);
|
|
||||||
wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &instance->minio_port, sizeof(instance->minio_port));
|
|
||||||
}
|
|
||||||
if(wiredLB_init(instance->wiredlb) < 0)
|
|
||||||
{
|
|
||||||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_init failed.\n");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog)
|
struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog)
|
||||||
{
|
{
|
||||||
struct tango_cache_instance *instance;
|
struct tango_cache_instance *instance;
|
||||||
@@ -664,7 +687,7 @@ struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,
|
|||||||
instance->multi_hd = curl_multi_init();
|
instance->multi_hd = curl_multi_init();
|
||||||
instance->runtime_log = runtimelog;
|
instance->runtime_log = runtimelog;
|
||||||
|
|
||||||
curl_multi_setopt(instance->multi_hd, CURLMOPT_PIPELINING, CURLPIPE_HTTP1);
|
curl_multi_setopt(instance->multi_hd, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX);
|
||||||
curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, instance->max_cnn_host);
|
curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, instance->max_cnn_host);
|
||||||
curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETFUNCTION, curl_socket_function_cb);
|
curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETFUNCTION, curl_socket_function_cb);
|
||||||
curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETDATA, instance); //curl_socket_function_cb *userp
|
curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETDATA, instance); //curl_socket_function_cb *userp
|
||||||
@@ -676,3 +699,8 @@ struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,
|
|||||||
return instance;
|
return instance;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tango_cache_global_init(void)
|
||||||
|
{
|
||||||
|
curl_global_init(CURL_GLOBAL_NOTHING);
|
||||||
|
}
|
||||||
|
|
||||||
|
|||||||
11
cache/test_demo/Makefile
vendored
11
cache/test_demo/Makefile
vendored
@@ -3,17 +3,19 @@ CCC=g++
|
|||||||
INC_PATH=-I../src/include -I../src/include/libevent2
|
INC_PATH=-I../src/include -I../src/include/libevent2
|
||||||
CFLAGS=-Wall -g $(INC_PATH)
|
CFLAGS=-Wall -g $(INC_PATH)
|
||||||
|
|
||||||
LIBS = -lMESA_handle_logger -lMESA_htable -lMESA_prof_load -lwiredcfg
|
LIBS = -lMESA_handle_logger -lMESA_htable -lMESA_prof_load -lWiredLB
|
||||||
LIBS += -lssl -lcrypto -lcurl -levent -lxml2
|
LIBS += -lssl -lcrypto -lcurl -levent -lxml2
|
||||||
LIBS += ../src/pangu_tango_cache.a
|
LIBS += ../src/pangu_tango_cache.a
|
||||||
|
|
||||||
OBJS = tango_cache_test.o
|
OBJS = tango_cache_test.o
|
||||||
OBJS_EVBASE=cache_evbase_test.o
|
OBJS_EVBASE=cache_evbase_test.o
|
||||||
|
OBJS_EVBASE_THREADS=cache_evbase_test_threads.o
|
||||||
|
|
||||||
TARGET_EXE=tango_cache_test
|
TARGET_EXE=tango_cache_test
|
||||||
TARGET_EXE_EVBASE=cache_evbase_test
|
TARGET_EXE_EVBASE=cache_evbase_test
|
||||||
|
TARGET_EXE_EVBASE_THREAD=cache_evbase_test_threads
|
||||||
|
|
||||||
ALL:$(TARGET_EXE) $(TARGET_EXE_EVBASE)
|
ALL:$(TARGET_EXE) $(TARGET_EXE_EVBASE) $(TARGET_EXE_EVBASE_THREAD)
|
||||||
|
|
||||||
$(TARGET_EXE):$(OBJS)
|
$(TARGET_EXE):$(OBJS)
|
||||||
$(CCC) $(LDFLAGS) $^ -o $@ $(LIBS)
|
$(CCC) $(LDFLAGS) $^ -o $@ $(LIBS)
|
||||||
@@ -21,6 +23,9 @@ $(TARGET_EXE):$(OBJS)
|
|||||||
$(TARGET_EXE_EVBASE):$(OBJS_EVBASE)
|
$(TARGET_EXE_EVBASE):$(OBJS_EVBASE)
|
||||||
$(CCC) $(LDFLAGS) $^ -o $@ $(LIBS) -lpthread
|
$(CCC) $(LDFLAGS) $^ -o $@ $(LIBS) -lpthread
|
||||||
|
|
||||||
|
$(TARGET_EXE_EVBASE_THREAD):$(OBJS_EVBASE_THREADS)
|
||||||
|
$(CCC) $(LDFLAGS) $^ -o $@ $(LIBS) -lpthread
|
||||||
|
|
||||||
.c.o:
|
.c.o:
|
||||||
$(CCC) $(CFLAGS) -c $<
|
$(CCC) $(CFLAGS) -c $<
|
||||||
.cpp.o:
|
.cpp.o:
|
||||||
@@ -29,5 +34,5 @@ $(TARGET_EXE_EVBASE):$(OBJS_EVBASE)
|
|||||||
-include $(DEPS)
|
-include $(DEPS)
|
||||||
|
|
||||||
clean:
|
clean:
|
||||||
rm -rf $(OBJS) $(TARGET_EXE) $(OBJS_EVBASE) $(TARGET_EXE_EVBASE)
|
rm -rf $(OBJS) $(TARGET_EXE) $(OBJS_EVBASE) $(TARGET_EXE_EVBASE) $(OBJS_EVBASE_THREADS) $(TARGET_EXE_EVBASE_THREAD)
|
||||||
|
|
||||||
|
|||||||
1
cache/test_demo/cache_evbase_test.cpp
vendored
1
cache/test_demo/cache_evbase_test.cpp
vendored
@@ -171,6 +171,7 @@ int main(int argc, char **argv)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cache_evbase_global_init();
|
||||||
instance_asyn = cache_evbase_instance_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
|
instance_asyn = cache_evbase_instance_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
|
||||||
assert(instance_asyn!=NULL);
|
assert(instance_asyn!=NULL);
|
||||||
|
|
||||||
|
|||||||
322
cache/test_demo/cache_evbase_test_threads.cpp
vendored
Normal file
322
cache/test_demo/cache_evbase_test_threads.cpp
vendored
Normal file
@@ -0,0 +1,322 @@
|
|||||||
|
#include <sys/ioctl.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <netinet/in.h>
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
#include <net/if.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <assert.h>
|
||||||
|
#include <errno.h>
|
||||||
|
#include <sys/prctl.h>
|
||||||
|
#include <sys/un.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <sys/poll.h>
|
||||||
|
#include <curl/curl.h>
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <sys/stat.h>
|
||||||
|
#include <errno.h>
|
||||||
|
#include <sys/cdefs.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <MESA/MESA_handle_logger.h>
|
||||||
|
|
||||||
|
#include "cache_evbase_client.h"
|
||||||
|
|
||||||
|
struct cache_evbase_instance *instance_asyn;
|
||||||
|
int runing_over=0;
|
||||||
|
|
||||||
|
struct future_pdata
|
||||||
|
{
|
||||||
|
struct future * future;
|
||||||
|
FILE *fp;
|
||||||
|
char filename[256];
|
||||||
|
};
|
||||||
|
|
||||||
|
void get_future_success(future_result_t* result, void * user)
|
||||||
|
{
|
||||||
|
struct tango_cache_result *res = cache_evbase_read_result(result);
|
||||||
|
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||||
|
char buffer[1024];
|
||||||
|
|
||||||
|
if(res != NULL)
|
||||||
|
{
|
||||||
|
switch(res->type)
|
||||||
|
{
|
||||||
|
case RESULT_TYPE_BODY:
|
||||||
|
fwrite(res->data_frag, res->size, 1, pdata->fp);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case RESULT_TYPE_USERTAG:
|
||||||
|
case RESULT_TYPE_HEADER:
|
||||||
|
memcpy(buffer, res->data_frag, res->size>=1024?1023:res->size);
|
||||||
|
buffer[res->size] = '\0';
|
||||||
|
printf("%s", buffer);
|
||||||
|
break;
|
||||||
|
default:break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else //<2F><><EFBFBD><EFBFBD>
|
||||||
|
{
|
||||||
|
future_destroy(pdata->future);
|
||||||
|
fclose(pdata->fp);
|
||||||
|
free(pdata);
|
||||||
|
runing_over = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void get_future_failed(enum e_future_error err, const char * what, void * user)
|
||||||
|
{
|
||||||
|
printf("GET fail: %s\n", what);
|
||||||
|
runing_over = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
void put_future_success(future_result_t* result, void * user)
|
||||||
|
{
|
||||||
|
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||||
|
|
||||||
|
printf("PUT %s succ\n", pdata->filename);
|
||||||
|
future_destroy(pdata->future);
|
||||||
|
free(pdata);
|
||||||
|
runing_over = 1;
|
||||||
|
}
|
||||||
|
void put_future_failed(enum e_future_error err, const char * what, void * user)
|
||||||
|
{
|
||||||
|
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||||
|
|
||||||
|
printf("PUT %s fail: %s\n", what, pdata->filename);
|
||||||
|
future_destroy(pdata->future);
|
||||||
|
free(pdata);
|
||||||
|
runing_over = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void del_future_success(future_result_t* result, void * user)
|
||||||
|
{
|
||||||
|
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||||
|
|
||||||
|
printf("DEL %s succ\n", pdata->filename);
|
||||||
|
future_destroy(pdata->future);
|
||||||
|
free(pdata);
|
||||||
|
runing_over = 1;
|
||||||
|
}
|
||||||
|
void del_future_failed(enum e_future_error err, const char * what, void * user)
|
||||||
|
{
|
||||||
|
struct future_pdata *pdata = (struct future_pdata *)user;
|
||||||
|
|
||||||
|
printf("DEL %s fail: %s\n", pdata->filename, what);
|
||||||
|
future_destroy(pdata->future);
|
||||||
|
free(pdata);
|
||||||
|
runing_over = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
char * get_file_content(const char *filename, size_t *filelen_out)
|
||||||
|
{
|
||||||
|
char *buffer;
|
||||||
|
FILE *fp;
|
||||||
|
size_t filelen = 0;
|
||||||
|
struct stat filestat;
|
||||||
|
int readlen;
|
||||||
|
|
||||||
|
fp = fopen(filename, "rb");
|
||||||
|
if(fp == NULL)
|
||||||
|
{
|
||||||
|
printf("fopen file %s failed.\n", filename);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if(fstat(fileno(fp), &filestat))
|
||||||
|
{
|
||||||
|
printf("fstat %s failed.\n", filename);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer = (char *)malloc(filestat.st_size);
|
||||||
|
|
||||||
|
while(filelen < (size_t)filestat.st_size)
|
||||||
|
{
|
||||||
|
readlen = fread(buffer + filelen, 1, filestat.st_size - filelen, fp);
|
||||||
|
if(readlen < 0)
|
||||||
|
{
|
||||||
|
printf("read error: %s\n", strerror(errno));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
filelen += readlen;
|
||||||
|
}
|
||||||
|
fclose(fp);
|
||||||
|
*filelen_out = filestat.st_size;
|
||||||
|
|
||||||
|
return buffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct pthread_data
|
||||||
|
{
|
||||||
|
char *argv;
|
||||||
|
int upload_times;
|
||||||
|
int thread_id;
|
||||||
|
};
|
||||||
|
|
||||||
|
void* thread_upload_download(void *arg)
|
||||||
|
{
|
||||||
|
int n;
|
||||||
|
char method[16], filename_in[256], filename_out[256], *p;
|
||||||
|
struct tango_cache_meta meta;
|
||||||
|
struct future_pdata *pdata;
|
||||||
|
struct cache_evbase_ctx *ctx;
|
||||||
|
struct pthread_data *thread_data = (struct pthread_data *)arg;
|
||||||
|
|
||||||
|
if(sscanf(thread_data->argv, "%[^:]:%1023s%n", method, filename_in, &n) != 2)
|
||||||
|
{
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
if(strlen(filename_in) <= 0)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&meta, 0, sizeof(struct tango_cache_meta));
|
||||||
|
meta.url = filename_in;
|
||||||
|
meta.std_hdr[HDR_CONTENT_TYPE] = "Content-Type: maintype/subtype";
|
||||||
|
meta.std_hdr[HDR_CONTENT_ENCODING] = "Content-Encoding: gzip";
|
||||||
|
meta.usertag = "Etag: hgdkqkwdwqekdfjwjfjwelkjfkwfejwhf\r\n";
|
||||||
|
meta.usertag_len = strlen(meta.usertag);
|
||||||
|
|
||||||
|
p = method;
|
||||||
|
while(*p=='\r'||*p=='\n') p++;
|
||||||
|
assert(*p!='\0');
|
||||||
|
|
||||||
|
for(int i=0; i<thread_data->upload_times; i++)
|
||||||
|
{
|
||||||
|
pdata = (struct future_pdata *)malloc(sizeof(struct future_pdata));
|
||||||
|
|
||||||
|
if(!strcasecmp(p, "GET"))
|
||||||
|
{
|
||||||
|
sprintf(filename_out, "file_index_%d_%d.bin", thread_data->thread_id, i);
|
||||||
|
pdata->future = future_create(get_future_success, get_future_failed, pdata);
|
||||||
|
promise_set_ctx(future_to_promise(pdata->future), NULL, NULL);
|
||||||
|
pdata->fp = fopen(filename_out, "w");
|
||||||
|
|
||||||
|
cache_evbase_fetch_object(instance_asyn, pdata->future, &meta);
|
||||||
|
}
|
||||||
|
else if(!strcasecmp(p, "DEL"))
|
||||||
|
{
|
||||||
|
pdata->future = future_create(del_future_success, del_future_failed, pdata);
|
||||||
|
promise_set_ctx(future_to_promise(pdata->future), NULL, NULL);
|
||||||
|
sprintf(pdata->filename, "%s", filename_in);
|
||||||
|
cache_evbase_delete_object(instance_asyn, pdata->future, filename_in);
|
||||||
|
}
|
||||||
|
else if(!strcasecmp(p, "PUTONCE"))
|
||||||
|
{
|
||||||
|
size_t filelen;
|
||||||
|
p = get_file_content(filename_in, &filelen);
|
||||||
|
pdata->future = future_create(put_future_success, put_future_failed, pdata);
|
||||||
|
promise_set_ctx(future_to_promise(pdata->future), NULL, NULL);
|
||||||
|
|
||||||
|
if(cache_evbase_upload_once_data(instance_asyn, pdata->future, PUT_MEM_FREE, p, filelen, &meta, pdata->filename, 256))
|
||||||
|
{
|
||||||
|
printf("cache_evbase_upload_once_data fail: %d\n", cache_evbase_ctx_error(instance_asyn));
|
||||||
|
future_destroy(pdata->future);
|
||||||
|
free(pdata);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if(!strcasecmp(p, "PUTONCEEV"))
|
||||||
|
{
|
||||||
|
size_t readlen;
|
||||||
|
pdata->future = future_create(put_future_success, put_future_failed, pdata);
|
||||||
|
promise_set_ctx(future_to_promise(pdata->future), NULL, NULL);
|
||||||
|
struct evbuffer *evbuf = evbuffer_new();
|
||||||
|
char buffer[1024];
|
||||||
|
|
||||||
|
FILE *fp = fopen(filename_in, "rb");
|
||||||
|
while(!feof(fp))
|
||||||
|
{
|
||||||
|
readlen = fread(buffer, 1, 1024, fp);
|
||||||
|
if(readlen < 0)
|
||||||
|
{
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
evbuffer_add(evbuf, buffer, readlen);
|
||||||
|
}
|
||||||
|
fclose(fp);
|
||||||
|
if(cache_evbase_upload_once_evbuf(instance_asyn, pdata->future, evbuf, &meta, pdata->filename, 256))
|
||||||
|
{
|
||||||
|
printf("cache_evbase_upload_once_evbuf fail: %d\n", cache_evbase_ctx_error(instance_asyn));
|
||||||
|
future_destroy(pdata->future);
|
||||||
|
free(pdata);
|
||||||
|
}
|
||||||
|
evbuffer_free(evbuf);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
pdata->future = future_create(put_future_success, put_future_failed, pdata);
|
||||||
|
promise_set_ctx(future_to_promise(pdata->future), NULL, NULL);
|
||||||
|
|
||||||
|
ctx = cache_evbase_update_start(instance_asyn, pdata->future, &meta);
|
||||||
|
if(ctx==NULL)
|
||||||
|
{
|
||||||
|
printf("cache_evbase_update_start fail: %d\n", cache_evbase_ctx_error(instance_asyn));
|
||||||
|
future_destroy(pdata->future);
|
||||||
|
free(pdata);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
cache_evbase_get_object_path(ctx, pdata->filename, 256);
|
||||||
|
|
||||||
|
char buffer[1024];
|
||||||
|
FILE *fp = fopen(filename_in, "r");
|
||||||
|
while(!feof(fp))
|
||||||
|
{
|
||||||
|
n = fread(buffer, 1, 1024, fp);
|
||||||
|
assert(n>=0);
|
||||||
|
cache_evbase_update_frag_data(ctx, PUT_MEM_COPY, buffer, n);
|
||||||
|
}
|
||||||
|
|
||||||
|
cache_evbase_update_end(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("transfer over\n");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char **argv)
|
||||||
|
{
|
||||||
|
struct cache_statistics out;
|
||||||
|
void *runtime_log;
|
||||||
|
pthread_t thread_tid;
|
||||||
|
pthread_attr_t attr;
|
||||||
|
struct pthread_data pdata[20];
|
||||||
|
|
||||||
|
if(argc!=3)
|
||||||
|
{
|
||||||
|
printf("USGAE: %s <PUT/PUTONCE/PUTONCEEV/GET/DEL:filename> <how many times>\n", argv[0]);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
runtime_log = MESA_create_runtime_log_handle("./runtime.log", 10);
|
||||||
|
if(NULL==runtime_log)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
cache_evbase_global_init();
|
||||||
|
instance_asyn = cache_evbase_instance_new("./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
|
||||||
|
assert(instance_asyn!=NULL);
|
||||||
|
|
||||||
|
pthread_attr_init(&attr);
|
||||||
|
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
|
||||||
|
for(int i=0; i<20; i++)
|
||||||
|
{
|
||||||
|
pdata[i].argv = argv[1];
|
||||||
|
pdata[i].thread_id = i;
|
||||||
|
pdata[i].upload_times = atoi(argv[2]);
|
||||||
|
pthread_create(&thread_tid, &attr, thread_upload_download, &pdata[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
while(1)
|
||||||
|
{
|
||||||
|
sleep(30);
|
||||||
|
cache_evbase_get_statistics(instance_asyn, &out);
|
||||||
|
printf("get_recv: %llu, get_succ: %llu, get_miss: %llu, get_fail: %llu, put_recv: %llu, put_succ: %llu, put_fail: %llu, del_recv: %llu, del_succ: %llu, del_fail: %llu, drop_num: %llu, session: %llu, memory: %llu\n",
|
||||||
|
out.get_recv_num, out.get_succ_num, out.get_miss_num, out.get_error_num, out.put_recv_num, out.put_succ_num, out.put_error_num, out.del_recv_num, out.del_succ_num, out.del_error_num, out.totaldrop_num, out.session_num, out.memory_used);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
1
cache/test_demo/tango_cache_test.c
vendored
1
cache/test_demo/tango_cache_test.c
vendored
@@ -310,6 +310,7 @@ int main(int crgc, char **arg)
|
|||||||
}
|
}
|
||||||
init_fifo();
|
init_fifo();
|
||||||
|
|
||||||
|
tango_cache_global_init();
|
||||||
tango_instance = tango_cache_instance_new(ev_base, "./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
|
tango_instance = tango_cache_instance_new(ev_base, "./pangu_tg_cahce.conf", "TANGO_CACHE", runtime_log);
|
||||||
|
|
||||||
tv.tv_sec = 10;
|
tv.tv_sec = 10;
|
||||||
|
|||||||
Reference in New Issue
Block a user