diff --git a/cache/cache_evbase_client.cpp b/cache/cache_evbase_client.cpp index b7b4077..1020a8f 100644 --- a/cache/cache_evbase_client.cpp +++ b/cache/cache_evbase_client.cpp @@ -485,6 +485,7 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu buffer->ctx_asyn = ctx_asyn; buffer->cmd_type = CACHE_ASYN_DELETE; + //参考Unix高级编程432页关于多线程写的安全性描述 if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 0) != sizeof(void *)) { ctx_asyn->ctx->fail_state = true; diff --git a/cache/include/cache_evbase_client.h b/cache/include/cache_evbase_client.h index 01bbca4..3585a71 100644 --- a/cache/include/cache_evbase_client.h +++ b/cache/include/cache_evbase_client.h @@ -20,7 +20,7 @@ struct cache_evbase_ctx struct cache_evbase_instance *instance_asyn; }; -/*所有API线程不安全,API的使用说明参考tango_cache_client.h*/ +/*所有API线程安全,API的使用说明参考tango_cache_client.h*/ enum CACHE_ERR_CODE cache_evbase_get_last_error(const struct cache_evbase_ctx *ctx_asyn); enum CACHE_ERR_CODE cache_evbase_ctx_error(const struct cache_evbase_instance *instance); @@ -41,11 +41,11 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct future* future, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta *meta, - char *path, size_t pathsize); + char *path/*OUT*/, size_t pathsize); int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struct future* future, struct evbuffer *evbuf, struct tango_cache_meta *meta, - char *path, size_t pathsize); + char *path/*OUT*/, size_t pathsize); //流式上传接口 struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance *instance, struct future* future, struct tango_cache_meta *meta); @@ -53,7 +53,7 @@ int cache_evbase_update_frag_data(struct cache_evbase_ctx *ctx_asyn, enum PUT_ME int cache_evbase_update_frag_evbuf(struct cache_evbase_ctx *ctx_asyn, struct evbuffer *evbuf); void cache_evbase_update_end(struct cache_evbase_ctx *ctx_asyn); -void cache_evbase_get_object_path(const struct cache_evbase_ctx *ctx, char *path, size_t pathsize); +void cache_evbase_get_object_path(const struct cache_evbase_ctx *ctx, char *path/*OUT*/, size_t pathsize); #endif diff --git a/cache/include/tango_cache_client.h b/cache/include/tango_cache_client.h index fc03964..2019dc5 100644 --- a/cache/include/tango_cache_client.h +++ b/cache/include/tango_cache_client.h @@ -16,7 +16,7 @@ enum CACHE_ERR_CODE CACHE_TIMEOUT, //缓存超时 CACHE_OUTOF_MEMORY,//当前内存占用超过限制,查看MAX_USED_MEMORY_SIZE_MB是否过小或者当前上传速率跟不上调用者的速率 CACHE_ERR_CURL, - CACHE_ERR_UNKNOWN + CACHE_ERR_WIREDLB, }; enum PUT_MEMORY_COPY_WAY @@ -119,11 +119,11 @@ int tango_cache_delete_object(struct tango_cache_instance *instance, struct futu int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct future* future, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta *meta, - char *path, size_t pathsize); + char *path/*OUT*/, size_t pathsize); int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct future* future, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, struct tango_cache_meta *meta, - char *path, size_t pathsize); + char *path/*OUT*/, size_t pathsize); /*流式上传API*/ //返回值: 若为NULL则表示创建失败,调用tango_cache_ctx_error查看错误码是否是CACHE_OUTOF_MEMORY(正常情况下是); struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *instance, struct future* future, struct tango_cache_meta *meta); @@ -133,7 +133,7 @@ int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COP void tango_cache_update_end(struct tango_cache_ctx *ctx); //获取对象key值;当CACHE_OBJECT_KEY_HASH_SWITCH=1开启对URL/文件名哈希时有用 -void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path, size_t pathsize); +void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path/*OUT*/, size_t pathsize); #endif diff --git a/cache/pangu_tango_cache.a b/cache/pangu_tango_cache.a index 5db5f70..0094000 100644 Binary files a/cache/pangu_tango_cache.a and b/cache/pangu_tango_cache.a differ diff --git a/cache/support/wired_lb-master.zip b/cache/support/wired_lb-master.zip new file mode 100644 index 0000000..3242d95 Binary files /dev/null and b/cache/support/wired_lb-master.zip differ diff --git a/cache/tango_cache_client.cpp b/cache/tango_cache_client.cpp index 9736966..2f55ef6 100644 --- a/cache/tango_cache_client.cpp +++ b/cache/tango_cache_client.cpp @@ -33,6 +33,18 @@ void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_ } } +int wired_load_balancer_lookup(WLB_handle_t wiredlb, const char *key, int keylen, char *host, size_t hostsize) +{ + struct WLB_consumer_t chosen; + + if(wiredLB_lookup(wiredlb, key, keylen, &chosen)) + { + return -1; + } + snprintf(host, hostsize, "%s:%u", chosen.ip_addr, chosen.data_port); + return 0; +} + enum CACHE_ERR_CODE tango_cache_get_last_error(const struct tango_cache_ctx *ctx) { return ctx->error_code; @@ -261,7 +273,12 @@ struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance * { snprintf(ctx->object_key, 256, "%s", meta->url); } - sprintf(ctx->hostaddr, "%s", instance->minio_hostlist); + if(wired_load_balancer_lookup(instance->wiredlb, meta->url, strlen(meta->url), ctx->hostaddr, 48)) + { + instance->error_code = CACHE_ERR_WIREDLB; + free(ctx); + return NULL; + } //Expires字段,用于缓存内部判定缓存是否超时 now = time(NULL); @@ -377,7 +394,12 @@ struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *i { snprintf(ctx->object_key, 256, "%s", meta->url); } - sprintf(ctx->hostaddr, "%s", instance->minio_hostlist); + if(wired_load_balancer_lookup(instance->wiredlb, meta->url, strlen(meta->url), ctx->hostaddr, 48)) + { + instance->error_code = CACHE_ERR_WIREDLB; + free(ctx); + return NULL; + } return ctx; } @@ -405,7 +427,12 @@ struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance * { snprintf(ctx->object_key, 256, "%s", objkey); } - sprintf(ctx->hostaddr, "%s", instance->minio_hostlist); + if(wired_load_balancer_lookup(instance->wiredlb, objkey, strlen(objkey), ctx->hostaddr, 48)) + { + instance->error_code = CACHE_ERR_WIREDLB; + free(ctx); + return NULL; + } return ctx; } @@ -564,7 +591,8 @@ static int load_local_configure(struct tango_cache_instance *instance, const cha instance->cache_limit_size = longval * 1024 * 1024; MESA_load_profile_string_def(profile_path, section, "CACHE_BUCKET_NAME", instance->bucketname, 256, "openbucket"); MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", &instance->hash_object_key, 1); - if(MESA_load_profile_string_nodef(profile_path, section, "MINIO_BROKERS_LIST", instance->minio_hostlist, 64) < 0) + MESA_load_profile_uint_def(profile_path, section, "MINIO_LISTEN_PORT", &instance->minio_port, 9000); + if(MESA_load_profile_string_nodef(profile_path, section, "MINIO_IP_LIST", instance->minio_iplist, 4096) < 0) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.\n", profile_path, section); return -1; @@ -583,6 +611,34 @@ static int load_local_configure(struct tango_cache_instance *instance, const cha } instance->relative_ttl = intval; + //Wired_LB参数 + MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", instance->wiredlb_topic, 64, "TANGO_CACHE_PRODUCER"); + MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", instance->wiredlb_group, 64, "KAZAKHSTAN"); + MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", instance->wiredlb_datacenter, 64, "ASTANA"); + MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", &instance->wiredlb_override, 1); + return 0; +} + +int wired_load_balancer_init(struct tango_cache_instance *instance) +{ + instance->wiredlb = wiredLB_create(instance->wiredlb_topic, instance->wiredlb_group, WLB_PRODUCER); + if(instance->wiredlb == NULL) + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n"); + return -1; + } + wiredLB_set_opt(instance->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &instance->wiredlb_override, sizeof(instance->wiredlb_override)); + wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_DATACENTER, instance->wiredlb_datacenter, strlen(instance->wiredlb_datacenter)+1); + if(instance->wiredlb_override) + { + wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, instance->minio_iplist, strlen(instance->minio_iplist)+1); + wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &instance->minio_port, sizeof(instance->minio_port)); + } + if(wiredLB_init(instance->wiredlb) < 0) + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_init failed.\n"); + return -1; + } return 0; } @@ -598,6 +654,11 @@ struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase, free(instance); return NULL; } + if(wired_load_balancer_init(instance)) + { + free(instance); + return NULL; + } instance->evbase = evbase; instance->multi_hd = curl_multi_init(); diff --git a/cache/tango_cache_client_in.h b/cache/tango_cache_client_in.h index bf32947..2c6295d 100644 --- a/cache/tango_cache_client_in.h +++ b/cache/tango_cache_client_in.h @@ -7,6 +7,7 @@ #include #include +#include #include "tango_cache_client.h" #define RESPONSE_HDR_EXPIRES 1 @@ -45,13 +46,19 @@ struct easy_string struct tango_cache_instance { - char minio_hostlist[4096]; + char minio_iplist[4096]; char bucketname[256]; + char wiredlb_topic[64]; + char wiredlb_group[64]; + char wiredlb_datacenter[64]; + u_int32_t minio_port; + u_int32_t wiredlb_override; struct event_base* evbase; struct event timer_event; struct cache_statistics statistic; CURLM *multi_hd; void *runtime_log; + WLB_handle_t wiredlb; time_t relative_ttl; //缓存的相对有效期 u_int64_t cache_limit_size; long max_cnn_host; @@ -98,7 +105,7 @@ struct tango_cache_ctx struct future* future; char error[CURL_ERROR_SIZE]; char object_key[256]; - char hostaddr[24]; + char hostaddr[48]; enum CACHE_REQUEST_METHOD method; enum CACHE_ERR_CODE error_code; diff --git a/cache/test_demo/cache_evbase_test.cpp b/cache/test_demo/cache_evbase_test.cpp index 025a5a4..295289b 100644 --- a/cache/test_demo/cache_evbase_test.cpp +++ b/cache/test_demo/cache_evbase_test.cpp @@ -277,8 +277,8 @@ int main(int argc, char **argv) struct cache_statistics out; cache_evbase_get_statistics(instance_asyn, &out); - printf("get_recv: %llu, get_succ: %llu, get_miss: %llu, get_fail: %llu, put_recv: %llu, put_succ: %llu, put_fail: %llu, del_recv: %llu, del_succ: %llu, del_fail: %llu, session: %llu, memory: %llu\n", - out.get_recv_num, out.get_succ_num, out.get_miss_num, out.get_error_num, out.put_recv_num, out.put_succ_num, out.put_error_num, out.del_recv_num, out.del_succ_num, out.del_error_num, out.session_num, out.memory_used); + printf("get_recv: %llu, get_succ: %llu, get_miss: %llu, get_fail: %llu, put_recv: %llu, put_succ: %llu, put_fail: %llu, del_recv: %llu, del_succ: %llu, del_fail: %llu, drop_num: %llu, session: %llu, memory: %llu\n", + out.get_recv_num, out.get_succ_num, out.get_miss_num, out.get_error_num, out.put_recv_num, out.put_succ_num, out.put_error_num, out.del_recv_num, out.del_succ_num, out.del_error_num, out.totaldrop_num, out.session_num, out.memory_used); return 0; } diff --git a/cache/test_demo/pangu_tg_cahce.conf b/cache/test_demo/pangu_tg_cahce.conf index f6aeddf..91579ad 100644 --- a/cache/test_demo/pangu_tg_cahce.conf +++ b/cache/test_demo/pangu_tg_cahce.conf @@ -1,6 +1,7 @@ [TANGO_CACHE] #MINIO IP鍦板潃锛岀洰鍓嶅彧鏀寔涓涓 -MINIO_BROKERS_LIST=192.168.10.64:9000 +MINIO_IP_LIST=192.168.10.61-64; +MINIO_LISTEN_PORT=9000 #姣忎釜鍩熷悕鏈澶氬紑鍚殑閾炬帴鏁 MAX_CONNECTION_PER_HOST=10 @@ -15,5 +16,11 @@ MAX_USED_MEMORY_SIZE_MB=5120 CACHE_DEFAULT_TTL_SECOND=3600 #鏄惁瀵瑰璞$殑鍚嶇О杩涜鍝堝笇锛屽紑鍚搱甯屾湁鍔╀簬鎻愰珮涓婁紶涓嬭浇鐨勯熺巼 -CACHE_OBJECT_KEY_HASH_SWITCH=1 +CACHE_OBJECT_KEY_HASH_SWITCH=0 + +#WIRED LOAD BALANCER閰嶇疆 +#WIREDLB_OVERRIDE=1 +#WIREDLB_TOPIC= +#WIREDLB_GROUP= +#WIREDLB_DATACENTER= diff --git a/cache/test_demo/tango_cache_test.c b/cache/test_demo/tango_cache_test.c index 679f4ec..7d3555d 100644 --- a/cache/test_demo/tango_cache_test.c +++ b/cache/test_demo/tango_cache_test.c @@ -284,8 +284,8 @@ void timer_cb(evutil_socket_t fd, short what, void *arg) }*/ tango_cache_get_statistics(tango_instance, &out); - printf("get_recv: %llu, get_succ: %llu, get_miss: %llu, get_fail: %llu, put_recv: %llu, put_succ: %llu, put_fail: %llu, session: %llu, memory: %llu\n", - out.get_recv_num, out.get_succ_num, out.get_miss_num, out.get_error_num, out.put_recv_num, out.put_succ_num, out.put_error_num, out.session_num, out.memory_used); + printf("get_recv: %llu, get_succ: %llu, get_miss: %llu, get_fail: %llu, put_recv: %llu, put_succ: %llu, put_fail: %llu, del_recv: %llu, del_succ: %llu, del_fail: %llu, drop_num: %llu, session: %llu, memory: %llu\n", + out.get_recv_num, out.get_succ_num, out.get_miss_num, out.get_error_num, out.put_recv_num, out.put_succ_num, out.put_error_num, out.del_recv_num, out.del_succ_num, out.del_error_num, out.totaldrop_num, out.session_num, out.memory_used); event_add((struct event *)arg, &tv); }