This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/cache/src/tango_cache_client.cpp

853 lines
25 KiB
C++
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <sys/time.h>
#include <time.h>
#include <string.h>
#include <openssl/sha.h>
#include <openssl/md5.h>
#include <MESA/MESA_prof_load.h>
#include "tango_cache_client_in.h"
#include "tango_cache_transfer.h"
#include "tango_cache_tools.h"
#include "tango_cache_xml.h"
#ifdef HEAD_OBJECT_FROM_REDIS
#include "tango_cache_redis.h"
#endif
int TANGO_CACHE_VERSION_20181009=0;
static int caculate_base64_md5(const char *data, unsigned long len, unsigned char *result, unsigned int size)
{
MD5_CTX c;
unsigned char md5[17]={0};
if(size < 33)
return -1;
MD5_Init(&c);
MD5_Update(&c, data, len);
MD5_Final(md5, &c);
Base64_EncodeBlock(md5, 16, result);
return 0;
}
void caculate_sha256(const char *data, unsigned long len, char *result, u_int32_t size)
{
SHA256_CTX c;
unsigned char sha256[128];
u_int32_t length;
SHA256_Init(&c);
SHA256_Update(&c, data, len);
SHA256_Final(sha256, &c);
length = (size > 64)?32:(size-1)/2; //Ô¤ÁôÒ»¸ö¿Õ¼ä
for(u_int32_t i=0; i<length; i++)
{
sprintf(result + i*2, "%02x", sha256[i]);
}
result[length*2] = '\0';
}
static int wired_load_balancer_lookup(WLB_handle_t wiredlb, const char *key, int keylen, char *host, size_t hostsize)
{
struct WLB_consumer_t chosen;
if(wiredLB_lookup(wiredlb, key, keylen, &chosen))
{
return -1;
}
snprintf(host, hostsize, "%s:%u", chosen.ip_addr, chosen.data_port);
return 0;
}
enum CACHE_ERR_CODE tango_cache_get_last_error(const struct tango_cache_ctx *ctx)
{
return ctx->error_code;
}
enum CACHE_ERR_CODE tango_cache_ctx_error(const struct tango_cache_instance *instance)
{
return instance->error_code;
}
void tango_cache_set_fail_state(struct tango_cache_ctx *ctx, enum CACHE_ERR_CODE error_code)
{
ctx->fail_state = true;
ctx->error_code = error_code;
}
const char *tango_cache_get_errstring(const struct tango_cache_ctx *ctx)
{
switch(ctx->error_code)
{
case CACHE_CACHE_MISS: return "cache not hit";
case CACHE_TIMEOUT: return "cache not fresh";
case CACHE_OUTOF_MEMORY:return "outof memory";
case CACHE_ERR_WIREDLB: return "wiredlb error";
case CACHE_ERR_SOCKPAIR:return "socketpair error";
case CACHE_ERR_INTERNAL:return "internal error";
case CACHE_ERR_REDIS_JSON:return "parse redis json error";
case CACHE_ERR_REDIS_CONNECT:return "redis is not connected";
default: return ctx->error;
}
}
void tango_cache_get_statistics(const struct tango_cache_instance *instance, struct cache_statistics *out)
{
out->get_recv_num = instance->statistic.get_recv_num;
out->get_succ_num = instance->statistic.get_succ_num;
out->get_error_num= instance->statistic.get_error_num;
out->get_miss_num = instance->statistic.get_miss_num;
out->put_recv_num = instance->statistic.put_recv_num;
out->put_succ_num = instance->statistic.put_succ_num;
out->put_error_num= instance->statistic.put_error_num;
out->del_recv_num = instance->statistic.del_recv_num;
out->del_succ_num = instance->statistic.del_succ_num;
out->del_error_num= instance->statistic.del_error_num;
out->totaldrop_num= instance->statistic.totaldrop_num;
out->session_num = instance->statistic.session_num;
out->memory_used = instance->statistic.memory_used;
}
struct tango_cache_result *tango_cache_read_result(future_result_t *promise_result)
{
return (struct tango_cache_result *)promise_result;
}
void tango_cache_get_object_path(const struct tango_cache_ctx *ctx, char *path, size_t pathsize)
{
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, ctx->instance->bucketname, ctx->object_key);
}
static void update_statistics(struct tango_cache_ctx *ctx, struct cache_statistics *statistic)
{
switch(ctx->method)
{
case CACHE_REQUEST_PUT:
if(ctx->fail_state)
{
statistic->put_error_num += 1;
}
else
{
statistic->put_succ_num += 1;
}
break;
case CACHE_REQUEST_GET:
case CACHE_REQUEST_HEAD:
if(ctx->fail_state)
{
if(ctx->error_code == CACHE_CACHE_MISS || ctx->error_code == CACHE_TIMEOUT)
statistic->get_miss_num += 1;
else
statistic->get_error_num += 1;
}
else
{
statistic->get_succ_num += 1;
}
break;
case CACHE_REQUEST_DELETE:
if(ctx->fail_state)
{
statistic->del_error_num += 1;
}
else
{
statistic->del_succ_num += 1;
}
break;
case CACHE_REQUEST_DELETE_MUL:
statistic->del_succ_num += ctx->del.succ_num;
statistic->del_error_num += ctx->del.fail_num;
break;
default:break;
}
}
void easy_string_destroy(struct easy_string *estr)
{
if(estr->buff != NULL)
{
free(estr->buff);
estr->buff = NULL;
estr->len = estr->size = 0;
}
}
void easy_string_savedata(struct easy_string *estr, const char *data, size_t len)
{
if(estr->size-estr->len < len+1)
{
estr->size += len*4+1;
estr->buff = (char*)realloc(estr->buff, estr->size);
}
memcpy(estr->buff+estr->len, data, len);
estr->len += len;
estr->buff[estr->len]='\0';
}
void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx)
{
struct multipart_etag_list *etag;
if(ctx->curl != NULL)
{
curl_multi_remove_handle(ctx->instance->multi_hd, ctx->curl);
curl_easy_cleanup(ctx->curl);
}
easy_string_destroy(&ctx->response);
switch(ctx->method)
{
case CACHE_REQUEST_GET:
case CACHE_REQUEST_HEAD:
easy_string_destroy(&ctx->get.response_tag);
break;
case CACHE_REQUEST_PUT:
if(ctx->put.uploadID != NULL) free(ctx->put.uploadID);
if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml);
if(ctx->put.evbuf!=NULL)
{
ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf);
evbuffer_free(ctx->put.evbuf);
}
TAILQ_FOREACH(etag, &ctx->put.etag_head, node)
{
TAILQ_REMOVE(&ctx->put.etag_head, etag, node);
free(etag->etag);
free(etag);
}//no break here
case CACHE_REQUEST_DELETE_MUL:
if(ctx->headers != NULL)
{
curl_slist_free_all(ctx->headers);
}//no break here
case CACHE_REQUEST_DELETE:
if(ctx->future != NULL)
{
if(ctx->fail_state)
{
promise_failed(future_to_promise(ctx->future), FUTURE_ERROR_CANCEL, tango_cache_get_errstring(ctx));
}
else
{
promise_success(future_to_promise(ctx->future), NULL);
}
}
break;
default: break;
}
update_statistics(ctx, &ctx->instance->statistic);
free(ctx);
}
void tango_cache_update_end(struct tango_cache_ctx *ctx)
{
cache_kick_upload_minio_end(ctx);
}
int tango_cache_update_frag_data(struct tango_cache_ctx *ctx, const char *data, size_t size)
{
if(ctx->fail_state)
{
return -1;
}
if(evbuffer_add(ctx->put.evbuf, data, size))
{
return -1;
}
ctx->instance->statistic.memory_used += size;
if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size);
}
return 0;
}
int tango_cache_update_frag_evbuf(struct tango_cache_ctx *ctx, enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf)
{
size_t size;
if(ctx->fail_state)
{
return -1;
}
size = evbuffer_get_length(evbuf);
if(way == EVBUFFER_MOVE)
{
if(evbuffer_add_buffer(ctx->put.evbuf, evbuf))
{
return -1;
}
}
else
{
if(evbuffer_add_buffer_reference(ctx->put.evbuf, evbuf))
{
return -1;
}
}
ctx->instance->statistic.memory_used += size;
if(evbuffer_get_length(ctx->put.evbuf) >= ctx->instance->upload_block_size)
{
cache_kick_upload_minio_multipart(ctx, ctx->instance->upload_block_size);
}
return 0;
}
struct tango_cache_ctx *tango_cache_update_prepare(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta)
{
struct tango_cache_ctx *ctx;
char buffer[2064];
time_t expires, now, last_modify;
if((u_int64_t)instance->statistic.memory_used >= instance->cache_limit_size)
{
instance->error_code = CACHE_OUTOF_MEMORY;
instance->statistic.totaldrop_num += 1;
return NULL;
}
ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
ctx->instance = instance;
ctx->future = f;
ctx->method = CACHE_REQUEST_PUT;
if(instance->hash_object_key)
{
caculate_sha256(meta->url, strlen(meta->url), buffer, 72);
snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", buffer[0], buffer[1], buffer[2], buffer[3], buffer+4);
//±£´æÔ­Ê¼URL
snprintf(buffer, 2064, "x-amz-meta-url: %s", meta->url);
ctx->headers = curl_slist_append(ctx->headers, buffer);
}
else
{
snprintf(ctx->object_key, 256, "%s", meta->url);
}
if(wired_load_balancer_lookup(instance->wiredlb, meta->url, strlen(meta->url), ctx->hostaddr, 48))
{
instance->error_code = CACHE_ERR_WIREDLB;
instance->statistic.totaldrop_num += 1;
free(ctx);
return NULL;
}
//Expires×ֶΣ¬ÓÃÓÚ»º´æÄÚ²¿Åж¨»º´æÊÇ·ñ³¬Ê±
now = time(NULL);
expires = (meta->put.timeout==0||meta->put.timeout>instance->relative_ttl)?instance->relative_ttl:meta->put.timeout;
if(expires_timestamp2hdr_str(now + expires, buffer, 256))
{
ctx->headers = curl_slist_append(ctx->headers, buffer);
}
//Last-Modify×ֶΣ¬ÓÃÓÚGETʱÅж¨ÊÇ·ñÐÂÏÊ
last_modify = (meta->put.date > meta->put.last_modified)?meta->put.date:meta->put.last_modified;
if(last_modify == 0)
{
last_modify = get_gmtime_timestamp(now);
}
sprintf(buffer, "x-amz-meta-lm: %lu", last_modify);
ctx->headers = curl_slist_append(ctx->headers, buffer);
//ÁбíÖÐÖ§³ÖµÄ±ê׼ͷ²¿
for(int i=0; i<HDR_CONTENT_NUM; i++)
{
if(meta->std_hdr[i] != NULL)
{
ctx->headers = curl_slist_append(ctx->headers, meta->std_hdr[i]);
}
}
if(meta->std_hdr[HDR_CONTENT_TYPE] == NULL)
{
ctx->headers = curl_slist_append(ctx->headers, "Content-Type:");
}
ctx->headers = curl_slist_append(ctx->headers, "Expect:");//×¢ÒâPOST·½·¨ÓëExpect¹ØÏµ£¬ÒªÃ÷È·¸ø³öCURLOPT_POSTFIELDSIZE
//ÆäËû¶¨ÒåµÄÍ·²¿£¬GETʱ»áÔ­Ñù·µ»Ø
if(meta->usertag_len>0 && meta->usertag_len<=USER_TAG_MAX_LEN)
{
char *p = (char *)malloc((meta->usertag_len/3 + 1)*4 + 18); //¼ÆËã±àÂëºóËùÐè¿Õ¼ä£»18=17+1: Í·²¿+×Ö·û´®½áÊø·û
memcpy(p, "x-amz-meta-user: ", 17);
Base64_EncodeBlock((const unsigned char*)meta->usertag, meta->usertag_len, (unsigned char*)p+17);
ctx->headers = curl_slist_append(ctx->headers, p);
free(p);
}
ctx->put.evbuf = evbuffer_new();
TAILQ_INIT(&ctx->put.etag_head);
return ctx;
}
struct tango_cache_ctx *tango_cache_update_start(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_put *meta)
{
struct tango_cache_ctx *ctx;
ctx = tango_cache_update_prepare(instance, f, meta);
if(ctx == NULL)
{
return NULL;
}
ctx->instance->statistic.put_recv_num += 1;
ctx->instance->error_code = CACHE_OK;
return ctx;
}
int tango_cache_upload_once_data(struct tango_cache_instance *instance, struct future* f,
enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta_put *meta, char *path, size_t pathsize)
{
struct tango_cache_ctx *ctx;
ctx = tango_cache_update_prepare(instance, f, meta);
if(ctx == NULL)
{
return -1;
}
if(path != NULL)
{
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->bucketname, ctx->object_key);
}
return tango_cache_upload_once_start_data(ctx, way, data, size);
}
int tango_cache_upload_once_evbuf(struct tango_cache_instance *instance, struct future* f,
enum EVBUFFER_COPY_WAY way, struct evbuffer *evbuf, struct tango_cache_meta_put *meta, char *path, size_t pathsize)
{
struct tango_cache_ctx *ctx;
ctx = tango_cache_update_prepare(instance, f, meta);
if(ctx == NULL)
{
return -1;
}
if(path != NULL)
{
snprintf(path, pathsize, "http://%s/%s/%s", ctx->hostaddr, instance->bucketname, ctx->object_key);
}
return tango_cache_upload_once_start_evbuf(ctx, way, evbuf);
}
struct tango_cache_ctx *tango_cache_fetch_prepare(struct tango_cache_instance *instance, enum CACHE_REQUEST_METHOD method, struct future* f, struct tango_cache_meta_get *meta)
{
struct tango_cache_ctx *ctx;
char sha256[72];
ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
ctx->instance = instance;
ctx->future = f;
ctx->method = method;
ctx->get.state = GET_STATE_START;
ctx->get.max_age = meta->get.max_age;
ctx->get.min_fresh = meta->get.min_fresh;
if(instance->hash_object_key)
{
caculate_sha256(meta->url, strlen(meta->url), sha256, 72);
snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", sha256[0], sha256[1], sha256[2], sha256[3], sha256+4);
}
else
{
snprintf(ctx->object_key, 256, "%s", meta->url);
}
if(wired_load_balancer_lookup(instance->wiredlb, meta->url, strlen(meta->url), ctx->hostaddr, 48))
{
instance->error_code = CACHE_ERR_WIREDLB;
instance->statistic.totaldrop_num += 1;
free(ctx);
return NULL;
}
return ctx;
}
int tango_cache_fetch_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta)
{
struct tango_cache_ctx *ctx;
ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_GET, f, meta);
if(ctx == NULL)
{
return -1;
}
return tango_cache_fetch_start(ctx);
}
int tango_cache_head_object(struct tango_cache_instance *instance, struct future* f, struct tango_cache_meta_get *meta)
{
struct tango_cache_ctx *ctx;
ctx = tango_cache_fetch_prepare(instance, CACHE_REQUEST_HEAD, f, meta);
if(ctx == NULL)
{
return -1;
}
#ifdef HEAD_OBJECT_FROM_REDIS
return tango_cache_head_redis(ctx);
#else
return tango_cache_fetch_start(ctx);
#endif
}
struct tango_cache_ctx *tango_cache_delete_prepare(struct tango_cache_instance *instance, struct future* f, const char *objkey)
{
struct tango_cache_ctx *ctx;
char sha256[72];
ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
ctx->instance = instance;
ctx->future = f;
ctx->method = CACHE_REQUEST_DELETE;
if(instance->hash_object_key)
{
caculate_sha256(objkey, strlen(objkey), sha256, 72);
snprintf(ctx->object_key, 256, "%c%c/%c%c/%s", sha256[0], sha256[1], sha256[2], sha256[3], sha256+4);
}
else
{
snprintf(ctx->object_key, 256, "%s", objkey);
}
if(wired_load_balancer_lookup(instance->wiredlb, objkey, strlen(objkey), ctx->hostaddr, 48))
{
instance->error_code = CACHE_ERR_WIREDLB;
instance->statistic.totaldrop_num += 1;
free(ctx);
return NULL;
}
return ctx;
}
int tango_cache_delete_object(struct tango_cache_instance *instance, struct future* f, const char *objkey)
{
struct tango_cache_ctx *ctx;
ctx = tango_cache_delete_prepare(instance, f, objkey);
if(ctx == NULL)
{
return -1;
}
return (cache_delete_minio_object(ctx)==1)?0:-1;
}
struct tango_cache_ctx *tango_cache_multi_delete_prepare(struct tango_cache_instance *instance, struct future* f, char *objlist[], u_int32_t num)
{
struct tango_cache_ctx *ctx;
char md5[48]={0}, content_md5[48];
ctx = (struct tango_cache_ctx *)calloc(1, sizeof(struct tango_cache_ctx));
ctx->instance = instance;
ctx->future = f;
ctx->method = CACHE_REQUEST_DELETE_MUL;
ctx->del.succ_num = num;
if(wired_load_balancer_lookup(instance->wiredlb, objlist[0], strlen(objlist[0]), ctx->hostaddr, 48))
{
instance->error_code = CACHE_ERR_WIREDLB;
instance->statistic.totaldrop_num += num;
free(ctx);
return NULL;
}
construct_multiple_delete_xml(ctx->instance->bucketname, objlist, num, instance->hash_object_key, &ctx->response.buff, &ctx->response.size);
caculate_base64_md5(ctx->response.buff, ctx->response.size, (unsigned char *)md5, 48);
sprintf(content_md5, "Content-MD5: %s", md5);
ctx->headers = curl_slist_append(ctx->headers, content_md5);
ctx->headers = curl_slist_append(ctx->headers, "Content-Type: application/xml");
ctx->headers = curl_slist_append(ctx->headers, "Expect:");
return ctx;
}
//TODO: AccessDenied
int tango_cache_multi_delete(struct tango_cache_instance *instance, struct future* f, char *objlist[], u_int32_t num)
{
struct tango_cache_ctx *ctx;
ctx = tango_cache_multi_delete_prepare(instance, f, objlist, num);
if(ctx == NULL)
{
return -1;
}
return tango_cache_multi_delete_start(ctx);
}
static void check_multi_info(CURLM *multi)
{
CURLMsg *msg;
int msgs_left;
struct tango_cache_ctx *ctx;
CURL *easy;
CURLcode res;
long res_code;
while((msg = curl_multi_info_read(multi, &msgs_left)))
{
if(msg->msg != CURLMSG_DONE)
{
continue;
}
easy = msg->easy_handle;
res = msg->data.result;
curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ctx);
curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &res_code);
curl_multi_remove_handle(multi, easy);
curl_easy_cleanup(easy);
ctx->curl = NULL;
ctx->res_code = 0;
switch(ctx->method)
{
case CACHE_REQUEST_GET:
case CACHE_REQUEST_HEAD:
tango_cache_curl_get_done(ctx, res, res_code);
break;
case CACHE_REQUEST_PUT:
tango_cache_curl_put_done(ctx, res, res_code);
break;
case CACHE_REQUEST_DELETE:
tango_cache_curl_del_done(ctx, res, res_code);
break;
case CACHE_REQUEST_DELETE_MUL:
tango_cache_curl_muldel_done(ctx, res, res_code);
break;
default: break;
}
}
}
/* Called by libevent when we get action on a multi socket */
static void libevent_socket_event_cb(int fd, short action, void *userp)
{
struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; //from event_assign
CURLMcode rc;
int what, still_running;
what = ((action&EV_READ)?CURL_CSELECT_IN:0) | ((action & EV_WRITE)?CURL_CSELECT_OUT:0);
rc = curl_multi_socket_action(instance->multi_hd, fd, what, &still_running);
instance->statistic.session_num = still_running;
assert(rc==CURLM_OK);
check_multi_info(instance->multi_hd);
if(still_running<=0 && evtimer_pending(&instance->timer_event, NULL))
{
evtimer_del(&instance->timer_event);
}
}
/* Called by libevent when our timeout expires */
static void libevent_timer_event_cb(int fd, short kind, void *userp)
{
struct tango_cache_instance *instance = (struct tango_cache_instance *)userp;
CURLMcode rc;
int still_running;
rc = curl_multi_socket_action(instance->multi_hd, CURL_SOCKET_TIMEOUT, 0, &still_running);
instance->statistic.session_num = still_running;
assert(rc==CURLM_OK);
check_multi_info(instance->multi_hd);
}
static int curl_socket_function_cb(CURL *curl, curl_socket_t sockfd, int what, void *userp, void *sockp)
{
struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; //from multi handle
struct curl_socket_data *sockinfo = (struct curl_socket_data *)sockp; //curl_multi_assign, for socket
int action;
if(what == CURL_POLL_REMOVE)
{
if(sockinfo != NULL)
{
event_del(&sockinfo->sock_event);
free(sockinfo);
}
}
else
{
if(sockinfo == NULL)
{
sockinfo = (struct curl_socket_data *)calloc(1, sizeof(struct curl_socket_data));
curl_multi_assign(instance->multi_hd, sockfd, sockinfo);
}
else
{
event_del(&sockinfo->sock_event);
}
action = (what&CURL_POLL_IN?EV_READ:0)|(what&CURL_POLL_OUT?EV_WRITE:0)|EV_PERSIST;
event_assign(&sockinfo->sock_event, instance->evbase, sockfd, action, libevent_socket_event_cb, instance);
event_add(&sockinfo->sock_event, NULL);
}
return 0;
}
static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp)
{
struct tango_cache_instance *instance = (struct tango_cache_instance *)userp;
struct timeval timeout;
CURLMcode rc;
int still_running;
timeout.tv_sec = timeout_ms/1000;
timeout.tv_usec = (timeout_ms%1000)*1000;
if(timeout_ms == 0)
{
//timeout_ms is 0 means we should call curl_multi_socket_action/curl_multi_perform at once.
//To initiate the whole process(inform CURLMOPT_SOCKETFUNCTION callback) or when timeout occurs.
rc = curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &still_running);
instance->statistic.session_num = still_running;
assert(rc==CURLM_OK);
}
else if(timeout_ms == -1) //timeout_ms is -1 means we should delete the timer.
{
evtimer_del(&instance->timer_event);
}
else //update the timer to the new value.
{
evtimer_add(&instance->timer_event, &timeout);
}
return 0; //0-success; -1-error
}
static int wired_load_balancer_init(struct tango_cache_instance *instance)
{
instance->wiredlb = wiredLB_create(instance->wiredlb_topic, instance->wiredlb_group, WLB_PRODUCER);
if(instance->wiredlb == NULL)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_create failed.\n");
return -1;
}
wiredLB_set_opt(instance->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &instance->wiredlb_ha_port, sizeof(instance->wiredlb_ha_port));
wiredLB_set_opt(instance->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &instance->wiredlb_override, sizeof(instance->wiredlb_override));
wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_DATACENTER, instance->wiredlb_datacenter, strlen(instance->wiredlb_datacenter)+1);
if(instance->wiredlb_override)
{
wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, instance->minio_iplist, strlen(instance->minio_iplist)+1);
wiredLB_set_opt(instance->wiredlb, WLB_PROD_OPT_OVERRIDE_DATAPORT, &instance->minio_port, sizeof(instance->minio_port));
}
if(wiredLB_init(instance->wiredlb) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "wiredLB_init failed.\n");
return -1;
}
return 0;
}
static int load_local_configure(struct tango_cache_instance *instance, const char* profile_path, const char* section)
{
u_int32_t intval;
u_int64_t longval;
MESA_load_profile_uint_def(profile_path, section, "MAX_CONNECTION_PER_HOST", &intval, 0);
instance->max_cnn_host = intval;
MESA_load_profile_uint_def(profile_path, section, "MAX_USED_MEMORY_SIZE_MB", &intval, 5120);
longval = intval;
instance->cache_limit_size = longval * 1024 * 1024;
if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_BUCKET_NAME", instance->bucketname, 256) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_BUCKET_NAME not found.\n", profile_path, section);
return -1;
}
#ifdef HEAD_OBJECT_FROM_REDIS
MESA_load_profile_string_def(profile_path, section, "CACHE_REDIS_KEY", instance->redis_key, 256, instance->bucketname);
if(MESA_load_profile_string_nodef(profile_path, section, "CACHE_REDIS_IP", instance->redis_ip, 256) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_REDIS_IP not found.\n", profile_path, section);
return -1;
}
MESA_load_profile_int_def(profile_path, section, "CACHE_REDIS_PORT", &instance->redis_port, 6379);
#endif
MESA_load_profile_uint_def(profile_path, section, "CACHE_OBJECT_KEY_HASH_SWITCH", &instance->hash_object_key, 1);
MESA_load_profile_uint_def(profile_path, section, "MINIO_LISTEN_PORT", &instance->minio_port, 9000);
if(MESA_load_profile_string_nodef(profile_path, section, "MINIO_IP_LIST", instance->minio_iplist, 4096) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] MINIO_BROKERS_LIST not found.\n", profile_path, section);
return -1;
}
MESA_load_profile_uint_def(profile_path, section, "CACHE_UPLOAD_BLOCK_SIZE", &instance->upload_block_size, 5242880);
if(instance->upload_block_size < 5242880)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_UPLOAD_BLOCK_SIZE too small, must bigger than 5242880(5MB).\n", profile_path, section);
return -1;
}
MESA_load_profile_uint_def(profile_path, section, "CACHE_DEFAULT_TTL_SECOND", &intval, 999999999);
if(intval < 60)
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Load config %s [%s] CACHE_DEFAULT_TTL_SECOND too small, must bigger than 60s.\n", profile_path, section);
return -1;
}
instance->relative_ttl = intval;
//Wired_LB²ÎÊý
MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", instance->wiredlb_topic, 64, "TANGO_CACHE_PRODUCER");
MESA_load_profile_string_def(profile_path, section, "WIREDLB_GROUP", instance->wiredlb_group, 64, "KAZAKHSTAN");
MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", instance->wiredlb_datacenter, 64, "ASTANA");
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", &instance->wiredlb_override, 1);
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100);
instance->wiredlb_ha_port = (u_int16_t)intval;
return 0;
}
struct tango_cache_instance *tango_cache_instance_new(struct event_base* evbase,const char* profile_path, const char* section, void *runtimelog)
{
struct tango_cache_instance *instance;
instance = (struct tango_cache_instance *)malloc(sizeof(struct tango_cache_instance));
memset(instance, 0, sizeof(struct tango_cache_instance));
instance->runtime_log = runtimelog;
instance->evbase = evbase;
if(load_local_configure(instance, profile_path, section))
{
free(instance);
return NULL;
}
if(wired_load_balancer_init(instance))
{
free(instance);
return NULL;
}
instance->multi_hd = curl_multi_init();
curl_multi_setopt(instance->multi_hd, CURLMOPT_PIPELINING, CURLPIPE_HTTP1 | CURLPIPE_MULTIPLEX);
curl_multi_setopt(instance->multi_hd, CURLMOPT_MAX_HOST_CONNECTIONS, instance->max_cnn_host);
curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETFUNCTION, curl_socket_function_cb);
curl_multi_setopt(instance->multi_hd, CURLMOPT_SOCKETDATA, instance); //curl_socket_function_cb *userp
curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERFUNCTION, curl_timer_function_cb);
curl_multi_setopt(instance->multi_hd, CURLMOPT_TIMERDATA, instance);
#ifdef HEAD_OBJECT_FROM_REDIS
if(redis_asyn_connect_init(instance, instance->redis_ip, instance->redis_port))
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s:%u failed.", instance->redis_ip, instance->redis_port);
free(instance);
return NULL;
}
else
{
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "redis_asyn_connect_init %s:%u success.", instance->redis_ip, instance->redis_port);
}
#endif
evtimer_assign(&instance->timer_event, evbase, libevent_timer_event_cb, instance);
return instance;
}
void tango_cache_global_init(void)
{
curl_global_init(CURL_GLOBAL_NOTHING);
}