Feature cache client

This commit is contained in:
郑超
2019-01-11 22:38:07 +08:00
parent 6d33ec5891
commit f5c153c59e
8 changed files with 313 additions and 40 deletions

View File

@@ -400,7 +400,8 @@ struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR);
tango_cache_ctx_destroy(ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
return NULL;
@@ -444,6 +445,7 @@ int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct
free(buffer->data);
free(buffer);
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR);
tango_cache_ctx_destroy(ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
return -2;
@@ -478,6 +480,7 @@ int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struc
evbuffer_free(buffer->evbuf);
free(buffer);
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR);
tango_cache_ctx_destroy(ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
return -2;
@@ -511,6 +514,7 @@ int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct fut
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
@@ -546,6 +550,7 @@ int cache_evbase_head_object(struct cache_evbase_instance *instance, struct futu
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);
@@ -576,6 +581,7 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu
if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *))
{
instance->instance->error_code = CACHE_ERR_SOCKPAIR;
tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR);
tango_cache_ctx_destroy(ctx_asyn->ctx, false);
cache_asyn_ctx_destroy(ctx_asyn);
free(buffer);

View File

@@ -215,6 +215,11 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback)
if(ctx->put.uploadID != NULL) free(ctx->put.uploadID);
if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml);
if(ctx->put.object_meta != NULL) cJSON_Delete(ctx->put.object_meta);
if(ctx->put.once_request.len > 0)
{
ctx->instance->statistic.memory_used -= ctx->put.once_request.size;
easy_string_destroy(&ctx->put.once_request);
}
if(ctx->put.evbuf!=NULL)
{
ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf);
@@ -267,6 +272,10 @@ bool sessions_exceeds_limit(struct tango_cache_instance *instance, enum OBJECT_L
//<2F><><EFBFBD><EFBFBD><EFBFBD>ϴ<EFBFBD>API<50><49>ʹ<EFBFBD><CAB9>ctx<74><78>evbuffer<65><72><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>޷<EFBFBD><DEB7><EFBFBD><EFBFBD><EFBFBD>ctx<74><78>ȡ<EFBFBD><C8A1><EFBFBD><EFBFBD>
enum OBJECT_LOCATION tango_cache_object_locate(struct tango_cache_instance *instance, size_t object_size)
{
if(instance->param->fsstatid_trig)
{
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_histlen_id, 0, FS_OP_SET, object_size);
}
if(instance->param->object_store_way!=CACHE_SMALL_REDIS || object_size > instance->param->redis_object_maxsize)
{
return OBJECT_IN_MINIO;
@@ -883,6 +892,127 @@ static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp)
return 0; //0-success; -1-error
}
static void instance_statistic_timer_cb(int fd, short kind, void *userp)
{
struct tango_cache_instance *instance = (struct tango_cache_instance *)userp;
struct timeval tv;
struct cache_statistics incr_statistic;
long long *plast_statistic = (long long*)&instance->statistic_last;
long long *pnow_statistic = (long long*)&instance->statistic;
long long *pinc_statistic = (long long*)&incr_statistic;
for(u_int32_t i=0; i<sizeof(struct cache_statistics)/sizeof(long long); i++)
{
pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i];
}
instance->statistic_last = instance->statistic;
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_RECV], 0, FS_OP_ADD, incr_statistic.get_recv_num);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_TOTAL], 0, FS_OP_ADD, incr_statistic.get_succ_http+incr_statistic.get_succ_redis);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_HTTP], 0, FS_OP_ADD, incr_statistic.get_succ_http);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_REDIS], 0, FS_OP_ADD, incr_statistic.get_succ_redis);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_MISS], 0, FS_OP_ADD, incr_statistic.get_miss_num);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_TOTAL], 0, FS_OP_ADD, incr_statistic.get_err_http+incr_statistic.get_err_redis);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_HTTP], 0, FS_OP_ADD, incr_statistic.get_err_http);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_REDIS], 0, FS_OP_ADD, incr_statistic.get_err_redis);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_RECV], 0, FS_OP_ADD, incr_statistic.put_recv_num);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_TOTAL], 0, FS_OP_ADD, incr_statistic.put_succ_http+incr_statistic.put_succ_redis);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_HTTP], 0, FS_OP_ADD, incr_statistic.put_succ_http);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_REDIS], 0, FS_OP_ADD, incr_statistic.put_succ_redis);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_TOTAL], 0, FS_OP_ADD, incr_statistic.put_err_http+incr_statistic.put_err_redis);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_HTTP], 0, FS_OP_ADD, incr_statistic.put_err_http);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_REDIS], 0, FS_OP_ADD, incr_statistic.put_err_redis);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_RECV], 0, FS_OP_ADD, incr_statistic.del_recv_num);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_SUCC], 0, FS_OP_ADD, incr_statistic.del_succ_num);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_ERROR], 0, FS_OP_ADD, incr_statistic.del_error_num);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_TOTAL_DROP], 0, FS_OP_ADD, incr_statistic.totaldrop_num);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_MEM_USED], 0, FS_OP_ADD, incr_statistic.memory_used);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_SESS_HTTP], 0, FS_OP_ADD, incr_statistic.session_http);
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_SESS_REDIS], 0, FS_OP_ADD, incr_statistic.session_redis);
tv.tv_sec = instance->param->fsstat_period;
tv.tv_usec = 0;
event_add(&instance->timer_statistic, &tv);
}
static int _unfold_IP_range(char* ip_range, char***ip_list, int size)
{
int i=0,count=0, ret=0;
int range_digits[5];
memset(range_digits,0,sizeof(range_digits));
ret=sscanf(ip_range,"%d.%d.%d.%d-%d",&range_digits[0],&range_digits[1],&range_digits[2],&range_digits[3],&range_digits[4]);
if(ret!=4&&ret!=5)
{
return 0;
}
if(ret==4&&range_digits[4]==0)
{
range_digits[4]=range_digits[3];
}
for(i=0;i<5;i++)
{
if(range_digits[i]<0||range_digits[i]>255)
{
return 0;
}
}
count=range_digits[4]-range_digits[3]+1;
*ip_list=(char**)realloc(*ip_list, sizeof(char*)*(size+count));
for(i=0;i<count;i++)
{
(*ip_list)[size+i]=(char*)malloc(64);
snprintf((*ip_list)[size+i],64,"%d.%d.%d.%d",range_digits[0],range_digits[1],range_digits[2],range_digits[3]+i);
}
return count;
}
static int unfold_IP_range(const char* ip_range, char***ip_list)
{
char *token=NULL,*sub_token=NULL,*saveptr;
char *buffer=(char*)calloc(sizeof(char),strlen(ip_range)+1);
int count=0;
strcpy(buffer,ip_range);
for (token = buffer; ; token= NULL)
{
sub_token= strtok_r(token,";", &saveptr);
if (sub_token == NULL)
break;
count+=_unfold_IP_range(sub_token, ip_list,count);
}
free(buffer);
return count;
}
static int build_redis_cluster_addrs(const char *iplist, const char *ports, char *redisaddrs, size_t size, void *runtimelog)
{
u_int32_t redis_ip_num;
u_int32_t redis_port_start, redis_port_end;
char **redis_iplist=NULL;
size_t addrlen;
int ret;
redis_ip_num = unfold_IP_range(iplist, &redis_iplist);
if(redis_ip_num ==0 )
{
MESA_HANDLE_RUNTIME_LOGV2(runtimelog, RLOG_LV_FATAL, "decode REDIS_CLUSTER_IP_LIST %s failed.", iplist);
return -1;
}
ret = sscanf(ports, "%u-%u", &redis_port_start, &redis_port_end);
if(ret!=1 && ret!=2)
{
MESA_HANDLE_RUNTIME_LOGV2(runtimelog, RLOG_LV_FATAL, "decode REDIS_CLUSTER_PORT_RANGE %s failed.", iplist);
return -2;
}
memset(redisaddrs, 0, size);
for(u_int32_t i=0; i<redis_ip_num; i++)
{
addrlen = strlen(redisaddrs);
snprintf(redisaddrs+addrlen, size-addrlen, "%s:%u,", redis_iplist[i], redis_port_start);
}
addrlen = strlen(redisaddrs);
redisaddrs[addrlen-1] = '\0';
return 0;
}
static int wired_load_balancer_init(struct wiredlb_parameter *wparam, void *runtime_log)
{
wparam->wiredlb = wiredLB_create(wparam->wiredlb_topic, wparam->wiredlb_group, WLB_PRODUCER);
@@ -893,7 +1023,10 @@ static int wired_load_balancer_init(struct wiredlb_parameter *wparam, void *runt
}
wiredLB_set_opt(wparam->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &wparam->wiredlb_ha_port, sizeof(wparam->wiredlb_ha_port));
wiredLB_set_opt(wparam->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &wparam->wiredlb_override, sizeof(wparam->wiredlb_override));
if(strlen(wparam->wiredlb_datacenter) > 0)
{
wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, wparam->wiredlb_datacenter, strlen(wparam->wiredlb_datacenter)+1);
}
if(wparam->wiredlb_override)
{
wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, wparam->iplist, strlen(wparam->iplist)+1);
@@ -907,11 +1040,54 @@ static int wired_load_balancer_init(struct wiredlb_parameter *wparam, void *runt
return 0;
}
int register_field_stat(struct tango_cache_parameter *param, void *runtime_log)
{
int value;
const char *field_names[FS_FILED_NUM]={"GET_RECV", "GET_S_TOTAL", "GET_S_HTTP", "GET_S_REDIS", "GET_MISS", "GET_E_TOTAL", "GET_E_HTTP", "GET_E_REDIS",
"PUT_RECV", "PUT_S_TOTAL", "PUT_S_HTTP", "PUT_S_REDIS", "PUT_E_TOTAL", "PUT_E_HTTP", "PUT_E_REDIS",
"DEL_RECV", "DEL_SUCC", "DEL_ERROR", "TOTAL_DROP", "MEM_USED", "SESSION_HTTP", "SESSION_REDIS"};
param->fsstat_handle = FS_create_handle();
FS_set_para(param->fsstat_handle, OUTPUT_DEVICE, param->fsstat_filepath, strlen(param->fsstat_filepath)+1);
value = 1;
FS_set_para(param->fsstat_handle, PRINT_MODE, &value, sizeof(value));
value = 2;
FS_set_para(param->fsstat_handle, STAT_CYCLE, &value, sizeof(value));
value = 1;
FS_set_para(param->fsstat_handle, CREATE_THREAD, &value, sizeof(value));
FS_set_para(param->fsstat_handle, APP_NAME, param->fsstat_appname, strlen(param->fsstat_appname)+1);
FS_set_para(param->fsstat_handle, STATS_SERVER_IP, param->fsstat_dst_ip, strlen(param->fsstat_dst_ip)+1);
FS_set_para(param->fsstat_handle, STATS_SERVER_PORT, &param->fsstat_dst_port, sizeof(param->fsstat_dst_port));
if(strlen(param->fsstat_histlen)>0 && FS_set_para(param->fsstat_handle, HISTOGRAM_GLOBAL_BINS, param->fsstat_histlen, strlen(param->fsstat_histlen)+1) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "FS_set_para %s failed.", param->fsstat_histlen);
return -1;
}
for(int i=0; i<=FS_FILED_TOTAL_DROP; i++)
{
param->fsstat_field_ids[i] = FS_register(param->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]);
}
for(int i=FS_FILED_MEM_USED; i<=FS_FILED_SESS_REDIS; i++)
{
param->fsstat_field_ids[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, field_names[i]);
}
param->fsstat_histlen_id = FS_register_histogram(param->fsstat_handle, FS_CALC_CURRENT, "length(bytes)", 1L, 17179869184L, 3);
if(param->fsstat_histlen_id < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "FS_register_histogram failed.");
return -1;
}
FS_start(param->fsstat_handle);
return 0;
}
struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path, const char* section, void *runtime_log)
{
u_int32_t intval;
u_int64_t longval;
struct tango_cache_parameter *param;
char redis_cluster_ip[512], redis_ports[256];
param = (struct tango_cache_parameter *)calloc(1, sizeof(struct tango_cache_parameter));
@@ -950,7 +1126,7 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path
//wiredlb
MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->minio.wiredlb_topic, 64, "TANGO_CACHE_PRODUCER");
MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", param->minio.wiredlb_datacenter, 64, "ASTANA");
MESA_load_profile_string_nodef(profile_path, section, "WIREDLB_DATACENTER", param->minio.wiredlb_datacenter, 64);
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", &param->minio.wiredlb_override, 1);
MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100);
param->minio.wiredlb_ha_port = intval;
@@ -974,9 +1150,18 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path
}
if(param->object_store_way != CACHE_ALL_MINIO)
{
if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_ADDRS", param->redisaddrs, 4096) < 0)
if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_IP_LIST", redis_cluster_ip, 512) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_IP_LIST not found.", profile_path, section);
return NULL;
}
if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_PORT_RANGE", redis_ports, 256) < 0)
{
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_PORT_RANGE not found.", profile_path, section);
return NULL;
}
if(build_redis_cluster_addrs(redis_cluster_ip, redis_ports, param->redisaddrs, 4096, runtime_log))
{
MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_ADDRS not found.", profile_path, section);
return NULL;
}
MESA_load_profile_uint_def(profile_path, section, "REDIS_CACHE_OBJECT_SIZE", &param->redis_object_maxsize, 10240);
@@ -986,6 +1171,19 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path
return NULL;
}
}
//FieldStat LOG
MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_APPNAME", param->fsstat_appname, 16, "TANGO_CACHE");
MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_FILEPATH", param->fsstat_filepath, 256, "./log/tangocache_fsstat.log");
MESA_load_profile_uint_def(profile_path, section, "LOG_FSSTAT_INTERVAL", &param->fsstat_period, 10);
MESA_load_profile_uint_def(profile_path, section, "LOG_FSSTAT_TRIG", &param->fsstatid_trig, 0);
MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_DST_IP", param->fsstat_dst_ip, 64, "10.172.128.2");
MESA_load_profile_int_def(profile_path, section, "LOG_FSSTAT_DST_PORT", &param->fsstat_dst_port, 8125);
MESA_load_profile_string_nodef(profile_path, section, "LOG_FSSTAT_HISTBINS", param->fsstat_histlen, 256);
if(param->fsstatid_trig && register_field_stat(param, runtime_log))
{
return NULL;
}
return param;
}
@@ -993,6 +1191,8 @@ struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_paramet
{
struct tango_cache_instance *instance;
char *redis_sep, *save_ptr=NULL;
struct timeval tv;
time_t now, remain;
instance = (struct tango_cache_instance *)malloc(sizeof(struct tango_cache_instance));
memset(instance, 0, sizeof(struct tango_cache_instance));
@@ -1021,6 +1221,16 @@ struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_paramet
sprintf(instance->redisaddr, "%s", redis_sep);
}
evtimer_assign(&instance->timer_event, evbase, libevent_timer_event_cb, instance);
if(param->fsstatid_trig)
{
evtimer_assign(&instance->timer_statistic, evbase, instance_statistic_timer_cb, instance);
now = time(NULL);
remain = instance->param->fsstat_period - (now % instance->param->fsstat_period);
tv.tv_sec = remain;
tv.tv_usec = 0;
evtimer_add(&instance->timer_statistic, &tv);
}
return instance;
}

View File

@@ -3,6 +3,7 @@
#include <curl/curl.h>
#include <sys/queue.h>
#include <pthread.h>
#include <event2/event.h>
#include <event.h>
@@ -11,6 +12,7 @@
#include <cjson/cJSON.h>
#include <MESA/wiredLB.h>
#include <MESA/field_stat2.h>
#include "tango_cache_client.h"
#define RESPONSE_HDR_EXPIRES 1
@@ -21,6 +23,36 @@
#define CACHE_META_REDIS 1 //Ԫ<><D4AA>Ϣ<EFBFBD><CFA2>REDIS<49><53><EFBFBD><EFBFBD><EFBFBD><EFBFBD>MINIO
#define CACHE_SMALL_REDIS 2 //Ԫ<><D4AA>Ϣ<EFBFBD><CFA2>С<EFBFBD>ļ<EFBFBD><C4BC><EFBFBD>REDIS<49><53><EFBFBD><EFBFBD><EFBFBD>ļ<EFBFBD><C4BC><EFBFBD>MINIO
enum FIELD_STAT_FILEDS
{
FS_FILED_GET_RECV=0,
FS_FILED_GET_S_TOTAL,
FS_FILED_GET_S_HTTP,
FS_FILED_GET_S_REDIS,
FS_FILED_GET_MISS,
FS_FILED_GET_E_TOTAL,
FS_FILED_GET_E_HTTP,
FS_FILED_GET_E_REDIS,
FS_FILED_PUT_RECV,
FS_FILED_PUT_S_TOTAL,
FS_FILED_PUT_S_HTTP,
FS_FILED_PUT_S_REDIS,
FS_FILED_PUT_E_TOTAL,
FS_FILED_PUT_E_HTTP,
FS_FILED_PUT_E_REDIS,
FS_FILED_DEL_RECV,
FS_FILED_DEL_SUCC,
FS_FILED_DEL_ERROR,
FS_FILED_TOTAL_DROP,
//Next use Status
FS_FILED_MEM_USED,
FS_FILED_SESS_HTTP,
FS_FILED_SESS_REDIS,
FS_FILED_NUM,
};
enum CACHE_REQUEST_METHOD
{
CACHE_REQUEST_GET=0,
@@ -88,12 +120,25 @@ struct tango_cache_parameter
struct wiredlb_parameter minio;
char redisaddrs[4096];
u_int32_t redis_object_maxsize;//С<>ļ<EFBFBD><C4BC><EFBFBD><EFBFBD><EFBFBD>redisʱ<73><CAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>С
//FieldStatLog
int32_t fsstat_dst_port;
char fsstat_dst_ip[64];
char fsstat_appname[16];
char fsstat_filepath[256];
u_int32_t fsstat_period;
u_int32_t fsstatid_trig;
char fsstat_histlen[256];
screen_stat_handle_t fsstat_handle;
int32_t fsstat_histlen_id;
int32_t fsstat_field_ids[FS_FILED_NUM];
};
struct tango_cache_instance
{
struct event_base* evbase;
struct event timer_event;
struct event timer_statistic;
CURLM *multi_hd;
enum CACHE_ERR_CODE error_code;
@@ -104,6 +149,7 @@ struct tango_cache_instance
const struct tango_cache_parameter *param;
void *runtime_log;
struct cache_statistics statistic;
struct cache_statistics statistic_last; //<2F><><EFBFBD>ڶ<EFBFBD><DAB6><EFBFBD>instanceʹ<65><CAB9>ͬһ<CDAC><D2BB>fieldstat<61>ۼ<EFBFBD>
};
struct multipart_etag_list
@@ -137,6 +183,7 @@ struct cache_ctx_data_put
char *combine_xml;
TAILQ_HEAD(__etag_list_head, multipart_etag_list) etag_head;
cJSON *object_meta;
struct easy_string once_request; //һ<><D2BB><EFBFBD><EFBFBD>PUTʱ<54><EFBFBD><E6B4A2><EFBFBD><EFBFBD><EFBFBD>ݣ<EFBFBD>ʧ<EFBFBD>ܵ<EFBFBD>ʱ<EFBFBD><CAB1><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ܸ<EFBFBD><DCB8><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
enum PUT_OBJECT_STATE state;
u_int32_t part_index; //<2F><>RESPONSE_HDR_
u_int32_t object_ttl;

View File

@@ -65,24 +65,24 @@ static size_t curl_put_once_send_cb(void *ptr, size_t size, size_t count, void *
size_t len;
struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp;
if(size==0 || count==0 || ctx->response.len>=ctx->response.size)
if(size==0 || count==0 || ctx->put.once_request.len>=ctx->put.once_request.size)
{
return 0; //<2F><>һ<EFBFBD><D2BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
}
len = ctx->response.size - ctx->response.len; //ʣ<><CAA3><EFBFBD><EFBFBD><EFBFBD>ϴ<EFBFBD><CFB4>ij<EFBFBD><C4B3><EFBFBD>
len = ctx->put.once_request.size - ctx->put.once_request.len; //ʣ<><CAA3><EFBFBD><EFBFBD><EFBFBD>ϴ<EFBFBD><CFB4>ij<EFBFBD><C4B3><EFBFBD>
if(len > size * count)
{
len = size * count;
}
memcpy(ptr, ctx->response.buff + ctx->response.len, len);
ctx->response.len += len;
memcpy(ptr, ctx->put.once_request.buff + ctx->put.once_request.len, len);
ctx->put.once_request.len += len;
if(ctx->response.len >= ctx->response.size)
if(ctx->put.once_request.len >= ctx->put.once_request.size)
{
ctx->instance->statistic.memory_used -= ctx->response.size; //δʹ<CEB4><CAB9>cache buffer<65><72><EFBFBD>Լ<EFBFBD><D4BC><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ڴ<EFBFBD><DAB4><EFBFBD><EFBFBD><EFBFBD>
easy_string_destroy(&ctx->response);
ctx->instance->statistic.memory_used -= ctx->put.once_request.size; //δʹ<CEB4><CAB9>cache buffer<65><72><EFBFBD>Լ<EFBFBD><D4BC><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ڴ<EFBFBD><DAB4><EFBFBD><EFBFBD><EFBFBD>
easy_string_destroy(&ctx->put.once_request);
}
return len;
}
@@ -532,17 +532,17 @@ int http_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COP
if(way == PUT_MEM_COPY)
{
ctx->response.buff = (char *)malloc(size);
memcpy(ctx->response.buff, data, size);
ctx->put.once_request.buff = (char *)malloc(size);
memcpy(ctx->put.once_request.buff, data, size);
}
else
{
ctx->response.buff = (char *)data;
ctx->put.once_request.buff = (char *)data;
}
ctx->response.size = size;
ctx->response.len = 0;
ctx->put.once_request.size = size;
ctx->put.once_request.len = 0;
curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->response.size);
curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.once_request.size);
curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_once_send_cb);
curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx);

View File

@@ -394,6 +394,7 @@ int main(int argc, char **argv)
pthread_attr_t attr;
void *runtime_log;
struct filecontentcmd filecmd;
time_t now, remain;
struct event ev_timer;
struct timeval tv;
@@ -431,7 +432,9 @@ int main(int argc, char **argv)
}
ev_base = event_base_new();
tv.tv_sec = 10;
now = time(NULL);
remain = 10 - (now % 10);
tv.tv_sec = remain;
tv.tv_usec = 0;
evtimer_assign(&ev_timer, ev_base, timer_cb, &ev_timer);
evtimer_add(&ev_timer, &tv);

BIN
cache/test/lib/libtango_cache_client.a vendored Normal file

Binary file not shown.

View File

@@ -1,35 +1,42 @@
[TANGO_CACHE]
#Addresses of minio. Format is defined by WiredLB.
MINIO_IP_LIST=10.3.35.1;
MINIO_LISTEN_PORT=9000
minio_ip_list=10.3.35.60-61;
minio_listen_port=9000
#Maximum number of connections opened by per host.
#MAX_CONNECTION_PER_HOST=1
#max_connection_per_host=1
#Maximum number of requests in a pipeline.
#MAX_CNNT_PIPELINE_NUM=20
#max_cnnt_pipeline_num=20
#Maximum parellel sessions(http and redis) is allowed to open.
#MAX_CURL_SESSION_NUM=100
#max_curl_session_num=100
#Maximum time the request is allowed to take(seconds).
#MAX_CURL_TRANSFER_TIMEOUT_S=0
#max_curl_transfer_timeout_s=0
#Bucket name in minio.
CACHE_BUCKET_NAME=openbucket
cache_bucket_name=openbucket
#Maximum size of memory used by tango_cache_client. Upload will fail if the current size of memory used exceeds this value.
MAX_USED_MEMORY_SIZE_MB=5120
max_used_memory_size_mb=5120
#Default TTL of objects, i.e. the time after which the object will expire(minumun 60s, i.e. 1 minute).
CACHE_DEFAULT_TTL_SECOND=3600
cache_default_ttl_second=3600
#Whether to hash the object key before cache actions. GET/PUT may be faster if you open it.
CACHE_OBJECT_KEY_HASH_SWITCH=1
cache_object_key_hash_switch=1
#Store way: 0-MINIO; 1-META in REDIS, object in minio; 2-META and small object in Redis, large object in minio;
CACHE_STORE_OBJECT_WAY=2
#If CACHE_STORE_OBJECT_WAY is 2 and the size of a object is not bigger than this value, object will be stored in redis.
REDIS_CACHE_OBJECT_SIZE=20480
#If CACHE_STORE_OBJECT_WAY is not 0, we will use redis to store meta and object.
REDIS_CLUSTER_ADDRS=10.4.35.33:9001,10.4.35.34:9001
cache_store_object_way=2
#If cache_store_object_way is 2 and the size of a object is not bigger than this value, object will be stored in redis.
redis_cache_object_size=20480
#If cache_store_object_way is not 0, we will use redis to store meta and object.
redis_cluster_ip_list=10.4.35.33-34;
redis_cluster_port_range=9001-9016;
#Configs of WiredLB for Minios load balancer.
#WIREDLB_OVERRIDE=1
#WIREDLB_TOPIC=
#WIREDLB_DATACENTER=
WIREDLB_HEALTH_PORT=52101
#WIREDLB_GROUP=
#wiredlb_override=1
#wiredlb_topic=
#wiredlb_datacenter=
wiredlb_health_port=52102
#wiredlb_group=
log_fsstat_appname=TANGO_CACHE
log_fsstat_filepath=./field_stat.log
log_fsstat_interval=10
log_fsstat_trig=1
log_fsstat_dst_ip=127.0.0.1
log_fsstat_dst_port=8125

View File

@@ -329,7 +329,7 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg)
tango_cache_update_frag_data(ctx, buffer, n);
}
fclose(fp);
if(tango_cache_update_end(ctx, pdata->filename, 256))
if(tango_cache_update_end(ctx, pdata->filename, 256) < 0)
{
put_future_failed(FUTURE_ERROR_CANCEL, "", pdata);
}