diff --git a/cache/src/cache_evbase_client.cpp b/cache/src/cache_evbase_client.cpp index bef351e..327b95d 100644 --- a/cache/src/cache_evbase_client.cpp +++ b/cache/src/cache_evbase_client.cpp @@ -400,7 +400,8 @@ struct cache_evbase_ctx *cache_evbase_update_start(struct cache_evbase_instance if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { instance->instance->error_code = CACHE_ERR_SOCKPAIR; - tango_cache_ctx_destroy(ctx_asyn->ctx, false); + tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR); + tango_cache_ctx_destroy(ctx, false); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); return NULL; @@ -444,6 +445,7 @@ int cache_evbase_upload_once_data(struct cache_evbase_instance *instance, struct free(buffer->data); free(buffer); instance->instance->error_code = CACHE_ERR_SOCKPAIR; + tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR); tango_cache_ctx_destroy(ctx, false); cache_asyn_ctx_destroy(ctx_asyn); return -2; @@ -478,6 +480,7 @@ int cache_evbase_upload_once_evbuf(struct cache_evbase_instance *instance, struc evbuffer_free(buffer->evbuf); free(buffer); instance->instance->error_code = CACHE_ERR_SOCKPAIR; + tango_cache_set_fail_state(ctx, CACHE_ERR_SOCKPAIR); tango_cache_ctx_destroy(ctx, false); cache_asyn_ctx_destroy(ctx_asyn); return -2; @@ -511,6 +514,7 @@ int cache_evbase_fetch_object(struct cache_evbase_instance *instance, struct fut if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { instance->instance->error_code = CACHE_ERR_SOCKPAIR; + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); tango_cache_ctx_destroy(ctx_asyn->ctx, false); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); @@ -546,6 +550,7 @@ int cache_evbase_head_object(struct cache_evbase_instance *instance, struct futu if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { instance->instance->error_code = CACHE_ERR_SOCKPAIR; + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); tango_cache_ctx_destroy(ctx_asyn->ctx, false); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); @@ -576,6 +581,7 @@ int cache_evbase_delete_object(struct cache_evbase_instance *instance, struct fu if(iothread_notify_event(instance->notify_sendfd, &buffer, sizeof(void *), 2) != sizeof(void *)) { instance->instance->error_code = CACHE_ERR_SOCKPAIR; + tango_cache_set_fail_state(ctx_asyn->ctx, CACHE_ERR_SOCKPAIR); tango_cache_ctx_destroy(ctx_asyn->ctx, false); cache_asyn_ctx_destroy(ctx_asyn); free(buffer); diff --git a/cache/src/tango_cache_client.cpp b/cache/src/tango_cache_client.cpp index 02d53ae..6653f23 100644 --- a/cache/src/tango_cache_client.cpp +++ b/cache/src/tango_cache_client.cpp @@ -215,6 +215,11 @@ void tango_cache_ctx_destroy(struct tango_cache_ctx *ctx, bool callback) if(ctx->put.uploadID != NULL) free(ctx->put.uploadID); if(ctx->put.combine_xml != NULL) free(ctx->put.combine_xml); if(ctx->put.object_meta != NULL) cJSON_Delete(ctx->put.object_meta); + if(ctx->put.once_request.len > 0) + { + ctx->instance->statistic.memory_used -= ctx->put.once_request.size; + easy_string_destroy(&ctx->put.once_request); + } if(ctx->put.evbuf!=NULL) { ctx->instance->statistic.memory_used -= evbuffer_get_length(ctx->put.evbuf); @@ -267,6 +272,10 @@ bool sessions_exceeds_limit(struct tango_cache_instance *instance, enum OBJECT_L //完整上传API不使用ctx的evbuffer,所以无法根据ctx获取长度 enum OBJECT_LOCATION tango_cache_object_locate(struct tango_cache_instance *instance, size_t object_size) { + if(instance->param->fsstatid_trig) + { + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_histlen_id, 0, FS_OP_SET, object_size); + } if(instance->param->object_store_way!=CACHE_SMALL_REDIS || object_size > instance->param->redis_object_maxsize) { return OBJECT_IN_MINIO; @@ -883,6 +892,127 @@ static int curl_timer_function_cb(CURLM *multi, long timeout_ms, void *userp) return 0; //0-success; -1-error } +static void instance_statistic_timer_cb(int fd, short kind, void *userp) +{ + struct tango_cache_instance *instance = (struct tango_cache_instance *)userp; + struct timeval tv; + struct cache_statistics incr_statistic; + long long *plast_statistic = (long long*)&instance->statistic_last; + long long *pnow_statistic = (long long*)&instance->statistic; + long long *pinc_statistic = (long long*)&incr_statistic; + + for(u_int32_t i=0; istatistic_last = instance->statistic; + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_RECV], 0, FS_OP_ADD, incr_statistic.get_recv_num); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_TOTAL], 0, FS_OP_ADD, incr_statistic.get_succ_http+incr_statistic.get_succ_redis); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_HTTP], 0, FS_OP_ADD, incr_statistic.get_succ_http); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_S_REDIS], 0, FS_OP_ADD, incr_statistic.get_succ_redis); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_MISS], 0, FS_OP_ADD, incr_statistic.get_miss_num); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_TOTAL], 0, FS_OP_ADD, incr_statistic.get_err_http+incr_statistic.get_err_redis); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_HTTP], 0, FS_OP_ADD, incr_statistic.get_err_http); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_GET_E_REDIS], 0, FS_OP_ADD, incr_statistic.get_err_redis); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_RECV], 0, FS_OP_ADD, incr_statistic.put_recv_num); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_TOTAL], 0, FS_OP_ADD, incr_statistic.put_succ_http+incr_statistic.put_succ_redis); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_HTTP], 0, FS_OP_ADD, incr_statistic.put_succ_http); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_S_REDIS], 0, FS_OP_ADD, incr_statistic.put_succ_redis); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_TOTAL], 0, FS_OP_ADD, incr_statistic.put_err_http+incr_statistic.put_err_redis); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_HTTP], 0, FS_OP_ADD, incr_statistic.put_err_http); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_PUT_E_REDIS], 0, FS_OP_ADD, incr_statistic.put_err_redis); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_RECV], 0, FS_OP_ADD, incr_statistic.del_recv_num); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_SUCC], 0, FS_OP_ADD, incr_statistic.del_succ_num); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_DEL_ERROR], 0, FS_OP_ADD, incr_statistic.del_error_num); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_TOTAL_DROP], 0, FS_OP_ADD, incr_statistic.totaldrop_num); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_MEM_USED], 0, FS_OP_ADD, incr_statistic.memory_used); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_SESS_HTTP], 0, FS_OP_ADD, incr_statistic.session_http); + FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field_ids[FS_FILED_SESS_REDIS], 0, FS_OP_ADD, incr_statistic.session_redis); + tv.tv_sec = instance->param->fsstat_period; + tv.tv_usec = 0; + event_add(&instance->timer_statistic, &tv); +} + +static int _unfold_IP_range(char* ip_range, char***ip_list, int size) +{ + int i=0,count=0, ret=0; + int range_digits[5]; + memset(range_digits,0,sizeof(range_digits)); + ret=sscanf(ip_range,"%d.%d.%d.%d-%d",&range_digits[0],&range_digits[1],&range_digits[2],&range_digits[3],&range_digits[4]); + if(ret!=4&&ret!=5) + { + return 0; + } + if(ret==4&&range_digits[4]==0) + { + range_digits[4]=range_digits[3]; + } + for(i=0;i<5;i++) + { + if(range_digits[i]<0||range_digits[i]>255) + { + return 0; + } + } + count=range_digits[4]-range_digits[3]+1; + *ip_list=(char**)realloc(*ip_list, sizeof(char*)*(size+count)); + for(i=0;iwiredlb = wiredLB_create(wparam->wiredlb_topic, wparam->wiredlb_group, WLB_PRODUCER); @@ -893,7 +1023,10 @@ static int wired_load_balancer_init(struct wiredlb_parameter *wparam, void *runt } wiredLB_set_opt(wparam->wiredlb, WLB_OPT_HEALTH_CHECK_PORT, &wparam->wiredlb_ha_port, sizeof(wparam->wiredlb_ha_port)); wiredLB_set_opt(wparam->wiredlb, WLB_OPT_ENABLE_OVERRIDE, &wparam->wiredlb_override, sizeof(wparam->wiredlb_override)); - wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, wparam->wiredlb_datacenter, strlen(wparam->wiredlb_datacenter)+1); + if(strlen(wparam->wiredlb_datacenter) > 0) + { + wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_DATACENTER, wparam->wiredlb_datacenter, strlen(wparam->wiredlb_datacenter)+1); + } if(wparam->wiredlb_override) { wiredLB_set_opt(wparam->wiredlb, WLB_PROD_OPT_OVERRIDE_PRIMARY_IP, wparam->iplist, strlen(wparam->iplist)+1); @@ -907,11 +1040,54 @@ static int wired_load_balancer_init(struct wiredlb_parameter *wparam, void *runt return 0; } +int register_field_stat(struct tango_cache_parameter *param, void *runtime_log) +{ + int value; + const char *field_names[FS_FILED_NUM]={"GET_RECV", "GET_S_TOTAL", "GET_S_HTTP", "GET_S_REDIS", "GET_MISS", "GET_E_TOTAL", "GET_E_HTTP", "GET_E_REDIS", + "PUT_RECV", "PUT_S_TOTAL", "PUT_S_HTTP", "PUT_S_REDIS", "PUT_E_TOTAL", "PUT_E_HTTP", "PUT_E_REDIS", + "DEL_RECV", "DEL_SUCC", "DEL_ERROR", "TOTAL_DROP", "MEM_USED", "SESSION_HTTP", "SESSION_REDIS"}; + + param->fsstat_handle = FS_create_handle(); + FS_set_para(param->fsstat_handle, OUTPUT_DEVICE, param->fsstat_filepath, strlen(param->fsstat_filepath)+1); + value = 1; + FS_set_para(param->fsstat_handle, PRINT_MODE, &value, sizeof(value)); + value = 2; + FS_set_para(param->fsstat_handle, STAT_CYCLE, &value, sizeof(value)); + value = 1; + FS_set_para(param->fsstat_handle, CREATE_THREAD, &value, sizeof(value)); + FS_set_para(param->fsstat_handle, APP_NAME, param->fsstat_appname, strlen(param->fsstat_appname)+1); + FS_set_para(param->fsstat_handle, STATS_SERVER_IP, param->fsstat_dst_ip, strlen(param->fsstat_dst_ip)+1); + FS_set_para(param->fsstat_handle, STATS_SERVER_PORT, ¶m->fsstat_dst_port, sizeof(param->fsstat_dst_port)); + if(strlen(param->fsstat_histlen)>0 && FS_set_para(param->fsstat_handle, HISTOGRAM_GLOBAL_BINS, param->fsstat_histlen, strlen(param->fsstat_histlen)+1) < 0) + { + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "FS_set_para %s failed.", param->fsstat_histlen); + return -1; + } + + for(int i=0; i<=FS_FILED_TOTAL_DROP; i++) + { + param->fsstat_field_ids[i] = FS_register(param->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]); + } + for(int i=FS_FILED_MEM_USED; i<=FS_FILED_SESS_REDIS; i++) + { + param->fsstat_field_ids[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, field_names[i]); + } + param->fsstat_histlen_id = FS_register_histogram(param->fsstat_handle, FS_CALC_CURRENT, "length(bytes)", 1L, 17179869184L, 3); + if(param->fsstat_histlen_id < 0) + { + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "FS_register_histogram failed."); + return -1; + } + FS_start(param->fsstat_handle); + return 0; +} + struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path, const char* section, void *runtime_log) { u_int32_t intval; u_int64_t longval; struct tango_cache_parameter *param; + char redis_cluster_ip[512], redis_ports[256]; param = (struct tango_cache_parameter *)calloc(1, sizeof(struct tango_cache_parameter)); @@ -950,7 +1126,7 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path //wiredlb MESA_load_profile_string_def(profile_path, section, "WIREDLB_TOPIC", param->minio.wiredlb_topic, 64, "TANGO_CACHE_PRODUCER"); - MESA_load_profile_string_def(profile_path, section, "WIREDLB_DATACENTER", param->minio.wiredlb_datacenter, 64, "ASTANA"); + MESA_load_profile_string_nodef(profile_path, section, "WIREDLB_DATACENTER", param->minio.wiredlb_datacenter, 64); MESA_load_profile_uint_def(profile_path, section, "WIREDLB_OVERRIDE", ¶m->minio.wiredlb_override, 1); MESA_load_profile_uint_def(profile_path, section, "WIREDLB_HEALTH_PORT", &intval, 52100); param->minio.wiredlb_ha_port = intval; @@ -974,9 +1150,18 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path } if(param->object_store_way != CACHE_ALL_MINIO) { - if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_ADDRS", param->redisaddrs, 4096) < 0) + if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_IP_LIST", redis_cluster_ip, 512) < 0) + { + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_IP_LIST not found.", profile_path, section); + return NULL; + } + if(MESA_load_profile_string_nodef(profile_path, section, "REDIS_CLUSTER_PORT_RANGE", redis_ports, 256) < 0) + { + MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_PORT_RANGE not found.", profile_path, section); + return NULL; + } + if(build_redis_cluster_addrs(redis_cluster_ip, redis_ports, param->redisaddrs, 4096, runtime_log)) { - MESA_HANDLE_RUNTIME_LOGV2(runtime_log, RLOG_LV_FATAL, "Load config %s [%s] REDIS_CLUSTER_ADDRS not found.", profile_path, section); return NULL; } MESA_load_profile_uint_def(profile_path, section, "REDIS_CACHE_OBJECT_SIZE", ¶m->redis_object_maxsize, 10240); @@ -986,6 +1171,19 @@ struct tango_cache_parameter *tango_cache_parameter_new(const char* profile_path return NULL; } } + + //FieldStat LOG + MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_APPNAME", param->fsstat_appname, 16, "TANGO_CACHE"); + MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_FILEPATH", param->fsstat_filepath, 256, "./log/tangocache_fsstat.log"); + MESA_load_profile_uint_def(profile_path, section, "LOG_FSSTAT_INTERVAL", ¶m->fsstat_period, 10); + MESA_load_profile_uint_def(profile_path, section, "LOG_FSSTAT_TRIG", ¶m->fsstatid_trig, 0); + MESA_load_profile_string_def(profile_path, section, "LOG_FSSTAT_DST_IP", param->fsstat_dst_ip, 64, "10.172.128.2"); + MESA_load_profile_int_def(profile_path, section, "LOG_FSSTAT_DST_PORT", ¶m->fsstat_dst_port, 8125); + MESA_load_profile_string_nodef(profile_path, section, "LOG_FSSTAT_HISTBINS", param->fsstat_histlen, 256); + if(param->fsstatid_trig && register_field_stat(param, runtime_log)) + { + return NULL; + } return param; } @@ -993,6 +1191,8 @@ struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_paramet { struct tango_cache_instance *instance; char *redis_sep, *save_ptr=NULL; + struct timeval tv; + time_t now, remain; instance = (struct tango_cache_instance *)malloc(sizeof(struct tango_cache_instance)); memset(instance, 0, sizeof(struct tango_cache_instance)); @@ -1021,6 +1221,16 @@ struct tango_cache_instance *tango_cache_instance_new(struct tango_cache_paramet sprintf(instance->redisaddr, "%s", redis_sep); } evtimer_assign(&instance->timer_event, evbase, libevent_timer_event_cb, instance); + + if(param->fsstatid_trig) + { + evtimer_assign(&instance->timer_statistic, evbase, instance_statistic_timer_cb, instance); + now = time(NULL); + remain = instance->param->fsstat_period - (now % instance->param->fsstat_period); + tv.tv_sec = remain; + tv.tv_usec = 0; + evtimer_add(&instance->timer_statistic, &tv); + } return instance; } diff --git a/cache/src/tango_cache_client_in.h b/cache/src/tango_cache_client_in.h index fe679fb..f457fed 100644 --- a/cache/src/tango_cache_client_in.h +++ b/cache/src/tango_cache_client_in.h @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -11,6 +12,7 @@ #include #include +#include #include "tango_cache_client.h" #define RESPONSE_HDR_EXPIRES 1 @@ -21,6 +23,36 @@ #define CACHE_META_REDIS 1 //元信息在REDIS对象在MINIO #define CACHE_SMALL_REDIS 2 //元信息和小文件在REDIS,大文件在MINIO +enum FIELD_STAT_FILEDS +{ + FS_FILED_GET_RECV=0, + FS_FILED_GET_S_TOTAL, + FS_FILED_GET_S_HTTP, + FS_FILED_GET_S_REDIS, + FS_FILED_GET_MISS, + FS_FILED_GET_E_TOTAL, + FS_FILED_GET_E_HTTP, + FS_FILED_GET_E_REDIS, + FS_FILED_PUT_RECV, + FS_FILED_PUT_S_TOTAL, + FS_FILED_PUT_S_HTTP, + FS_FILED_PUT_S_REDIS, + FS_FILED_PUT_E_TOTAL, + FS_FILED_PUT_E_HTTP, + FS_FILED_PUT_E_REDIS, + FS_FILED_DEL_RECV, + FS_FILED_DEL_SUCC, + FS_FILED_DEL_ERROR, + FS_FILED_TOTAL_DROP, + + //Next use Status + FS_FILED_MEM_USED, + FS_FILED_SESS_HTTP, + FS_FILED_SESS_REDIS, + + FS_FILED_NUM, +}; + enum CACHE_REQUEST_METHOD { CACHE_REQUEST_GET=0, @@ -88,12 +120,25 @@ struct tango_cache_parameter struct wiredlb_parameter minio; char redisaddrs[4096]; u_int32_t redis_object_maxsize;//小文件存在redis时,对象的最大大小 + + //FieldStatLog + int32_t fsstat_dst_port; + char fsstat_dst_ip[64]; + char fsstat_appname[16]; + char fsstat_filepath[256]; + u_int32_t fsstat_period; + u_int32_t fsstatid_trig; + char fsstat_histlen[256]; + screen_stat_handle_t fsstat_handle; + int32_t fsstat_histlen_id; + int32_t fsstat_field_ids[FS_FILED_NUM]; }; struct tango_cache_instance { struct event_base* evbase; struct event timer_event; + struct event timer_statistic; CURLM *multi_hd; enum CACHE_ERR_CODE error_code; @@ -104,6 +149,7 @@ struct tango_cache_instance const struct tango_cache_parameter *param; void *runtime_log; struct cache_statistics statistic; + struct cache_statistics statistic_last; //用于多个instance使用同一个fieldstat累加 }; struct multipart_etag_list @@ -137,6 +183,7 @@ struct cache_ctx_data_put char *combine_xml; TAILQ_HEAD(__etag_list_head, multipart_etag_list) etag_head; cJSON *object_meta; + struct easy_string once_request; //一次性PUT时存储了数据,失败的时候便于清理,不能复用其他结构 enum PUT_OBJECT_STATE state; u_int32_t part_index; //宏RESPONSE_HDR_ u_int32_t object_ttl; diff --git a/cache/src/tango_cache_transfer.cpp b/cache/src/tango_cache_transfer.cpp index 183eb2d..7afa18c 100644 --- a/cache/src/tango_cache_transfer.cpp +++ b/cache/src/tango_cache_transfer.cpp @@ -65,24 +65,24 @@ static size_t curl_put_once_send_cb(void *ptr, size_t size, size_t count, void * size_t len; struct tango_cache_ctx *ctx = (struct tango_cache_ctx *)userp; - if(size==0 || count==0 || ctx->response.len>=ctx->response.size) + if(size==0 || count==0 || ctx->put.once_request.len>=ctx->put.once_request.size) { return 0; //不一定调用 } - len = ctx->response.size - ctx->response.len; //剩余待上传的长度 + len = ctx->put.once_request.size - ctx->put.once_request.len; //剩余待上传的长度 if(len > size * count) { len = size * count; } - memcpy(ptr, ctx->response.buff + ctx->response.len, len); - ctx->response.len += len; + memcpy(ptr, ctx->put.once_request.buff + ctx->put.once_request.len, len); + ctx->put.once_request.len += len; - if(ctx->response.len >= ctx->response.size) + if(ctx->put.once_request.len >= ctx->put.once_request.size) { - ctx->instance->statistic.memory_used -= ctx->response.size; //未使用cache buffer,自己计算内存增减 - easy_string_destroy(&ctx->response); + ctx->instance->statistic.memory_used -= ctx->put.once_request.size; //未使用cache buffer,自己计算内存增减 + easy_string_destroy(&ctx->put.once_request); } return len; } @@ -532,17 +532,17 @@ int http_put_complete_part_data(struct tango_cache_ctx *ctx, enum PUT_MEMORY_COP if(way == PUT_MEM_COPY) { - ctx->response.buff = (char *)malloc(size); - memcpy(ctx->response.buff, data, size); + ctx->put.once_request.buff = (char *)malloc(size); + memcpy(ctx->put.once_request.buff, data, size); } else { - ctx->response.buff = (char *)data; + ctx->put.once_request.buff = (char *)data; } - ctx->response.size = size; - ctx->response.len = 0; + ctx->put.once_request.size = size; + ctx->put.once_request.len = 0; curl_easy_setopt(ctx->curl, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->response.size); + curl_easy_setopt(ctx->curl, CURLOPT_INFILESIZE, ctx->put.once_request.size); curl_easy_setopt(ctx->curl, CURLOPT_READFUNCTION, curl_put_once_send_cb); curl_easy_setopt(ctx->curl, CURLOPT_READDATA, ctx); diff --git a/cache/test/cache_evbase_benchmark.cpp b/cache/test/cache_evbase_benchmark.cpp index e3744f6..c5ce437 100644 --- a/cache/test/cache_evbase_benchmark.cpp +++ b/cache/test/cache_evbase_benchmark.cpp @@ -394,6 +394,7 @@ int main(int argc, char **argv) pthread_attr_t attr; void *runtime_log; struct filecontentcmd filecmd; + time_t now, remain; struct event ev_timer; struct timeval tv; @@ -431,7 +432,9 @@ int main(int argc, char **argv) } ev_base = event_base_new(); - tv.tv_sec = 10; + now = time(NULL); + remain = 10 - (now % 10); + tv.tv_sec = remain; tv.tv_usec = 0; evtimer_assign(&ev_timer, ev_base, timer_cb, &ev_timer); evtimer_add(&ev_timer, &tv); diff --git a/cache/test/lib/libtango_cache_client.a b/cache/test/lib/libtango_cache_client.a new file mode 100644 index 0000000..e395ce6 Binary files /dev/null and b/cache/test/lib/libtango_cache_client.a differ diff --git a/cache/test/pangu_tg_cahce.conf b/cache/test/pangu_tg_cahce.conf index e3ef66c..21ac1f8 100644 --- a/cache/test/pangu_tg_cahce.conf +++ b/cache/test/pangu_tg_cahce.conf @@ -1,35 +1,42 @@ [TANGO_CACHE] #Addresses of minio. Format is defined by WiredLB. -MINIO_IP_LIST=10.3.35.1; -MINIO_LISTEN_PORT=9000 +minio_ip_list=10.3.35.60-61; +minio_listen_port=9000 #Maximum number of connections opened by per host. -#MAX_CONNECTION_PER_HOST=1 +#max_connection_per_host=1 #Maximum number of requests in a pipeline. -#MAX_CNNT_PIPELINE_NUM=20 +#max_cnnt_pipeline_num=20 #Maximum parellel sessions(http and redis) is allowed to open. -#MAX_CURL_SESSION_NUM=100 +#max_curl_session_num=100 #Maximum time the request is allowed to take(seconds). -#MAX_CURL_TRANSFER_TIMEOUT_S=0 +#max_curl_transfer_timeout_s=0 #Bucket name in minio. -CACHE_BUCKET_NAME=openbucket +cache_bucket_name=openbucket #Maximum size of memory used by tango_cache_client. Upload will fail if the current size of memory used exceeds this value. -MAX_USED_MEMORY_SIZE_MB=5120 +max_used_memory_size_mb=5120 #Default TTL of objects, i.e. the time after which the object will expire(minumun 60s, i.e. 1 minute). -CACHE_DEFAULT_TTL_SECOND=3600 +cache_default_ttl_second=3600 #Whether to hash the object key before cache actions. GET/PUT may be faster if you open it. -CACHE_OBJECT_KEY_HASH_SWITCH=1 +cache_object_key_hash_switch=1 #Store way: 0-MINIO; 1-META in REDIS, object in minio; 2-META and small object in Redis, large object in minio; -CACHE_STORE_OBJECT_WAY=2 -#If CACHE_STORE_OBJECT_WAY is 2 and the size of a object is not bigger than this value, object will be stored in redis. -REDIS_CACHE_OBJECT_SIZE=20480 -#If CACHE_STORE_OBJECT_WAY is not 0, we will use redis to store meta and object. -REDIS_CLUSTER_ADDRS=10.4.35.33:9001,10.4.35.34:9001 +cache_store_object_way=2 +#If cache_store_object_way is 2 and the size of a object is not bigger than this value, object will be stored in redis. +redis_cache_object_size=20480 +#If cache_store_object_way is not 0, we will use redis to store meta and object. +redis_cluster_ip_list=10.4.35.33-34; +redis_cluster_port_range=9001-9016; #Configs of WiredLB for Minios load balancer. -#WIREDLB_OVERRIDE=1 -#WIREDLB_TOPIC= -#WIREDLB_DATACENTER= -WIREDLB_HEALTH_PORT=52101 -#WIREDLB_GROUP= +#wiredlb_override=1 +#wiredlb_topic= +#wiredlb_datacenter= +wiredlb_health_port=52102 +#wiredlb_group= +log_fsstat_appname=TANGO_CACHE +log_fsstat_filepath=./field_stat.log +log_fsstat_interval=10 +log_fsstat_trig=1 +log_fsstat_dst_ip=127.0.0.1 +log_fsstat_dst_port=8125 diff --git a/cache/test/tango_cache_test.cpp b/cache/test/tango_cache_test.cpp index fb7f618..c00bd8c 100644 --- a/cache/test/tango_cache_test.cpp +++ b/cache/test/tango_cache_test.cpp @@ -329,7 +329,7 @@ static void dummy_accept_callback(evutil_socket_t fd, short events, void *arg) tango_cache_update_frag_data(ctx, buffer, n); } fclose(fp); - if(tango_cache_update_end(ctx, pdata->filename, 256)) + if(tango_cache_update_end(ctx, pdata->filename, 256) < 0) { put_future_failed(FUTURE_ERROR_CANCEL, "", pdata); }