future promise支持输出到statsd。
This commit is contained in:
@@ -15,7 +15,7 @@ typedef void (future_success_cb)(future_result_t* result, void * user);
|
|||||||
typedef void (future_failed_cb)(enum e_future_error err, const char * what, void * user);
|
typedef void (future_failed_cb)(enum e_future_error err, const char * what, void * user);
|
||||||
typedef void (promise_ctx_destroy_cb)(void* ctx);
|
typedef void (promise_ctx_destroy_cb)(void* ctx);
|
||||||
|
|
||||||
void future_promise_library_init(void);
|
void future_promise_library_init(const char* profile);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Future APIs are used by Async RPC caller.
|
* Future APIs are used by Async RPC caller.
|
||||||
|
|||||||
@@ -8,15 +8,20 @@
|
|||||||
#include <tfe_utils.h>
|
#include <tfe_utils.h>
|
||||||
#include <MESA/MESA_htable.h>
|
#include <MESA/MESA_htable.h>
|
||||||
#include <MESA/field_stat2.h>
|
#include <MESA/field_stat2.h>
|
||||||
|
#include <MESA/MESA_prof_load.h>
|
||||||
|
|
||||||
|
|
||||||
const char* FP_HISTOGRAM_BINS="0.5,0.8,0.9,0.95";
|
static const char* FP_HISTOGRAM_BINS="0.50,0.80,0.9,0.95,0.99";
|
||||||
|
|
||||||
struct future_promise_instance
|
struct future_promise_instance
|
||||||
{
|
{
|
||||||
int fsid_f_num;
|
int fsid_f_num;
|
||||||
long long f_num;
|
long long f_num;
|
||||||
|
int no_stats;
|
||||||
MESA_htable_handle name_table;
|
MESA_htable_handle name_table;
|
||||||
|
char statsd_server_ip[256];
|
||||||
|
int statsd_server_port;
|
||||||
|
char histogram_bins[256];
|
||||||
screen_stat_handle_t fs_handle;
|
screen_stat_handle_t fs_handle;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -46,7 +51,7 @@ struct promise
|
|||||||
};
|
};
|
||||||
static struct future_promise_instance g_FP_instance;
|
static struct future_promise_instance g_FP_instance;
|
||||||
static int g_is_FP_init=0;
|
static int g_is_FP_init=0;
|
||||||
void future_promise_library_init(void)
|
void future_promise_library_init(const char* profile)
|
||||||
{
|
{
|
||||||
if(g_is_FP_init==1)
|
if(g_is_FP_init==1)
|
||||||
{
|
{
|
||||||
@@ -55,6 +60,25 @@ void future_promise_library_init(void)
|
|||||||
|
|
||||||
int value=0;
|
int value=0;
|
||||||
memset(&g_FP_instance,0,sizeof(g_FP_instance));
|
memset(&g_FP_instance,0,sizeof(g_FP_instance));
|
||||||
|
strcpy(g_FP_instance.histogram_bins, FP_HISTOGRAM_BINS);
|
||||||
|
if(profile!=NULL)
|
||||||
|
{
|
||||||
|
MESA_load_profile_int_def(profile, "STAT", "no_stats", &(g_FP_instance.no_stats), 0);
|
||||||
|
MESA_load_profile_string_def(profile, "STAT", "statsd_server",
|
||||||
|
g_FP_instance.statsd_server_ip, sizeof(g_FP_instance.statsd_server_ip), "");
|
||||||
|
MESA_load_profile_int_def(profile, "STAT", "statsd_port", &(g_FP_instance.statsd_server_port), 0);
|
||||||
|
MESA_load_profile_string_def(profile, "STAT", "histogram_bins",
|
||||||
|
g_FP_instance.histogram_bins, sizeof(g_FP_instance.histogram_bins), FP_HISTOGRAM_BINS);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
if(g_FP_instance.no_stats)
|
||||||
|
{
|
||||||
|
g_is_FP_init=1;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
MESA_htable_handle htable = MESA_htable_born();
|
MESA_htable_handle htable = MESA_htable_born();
|
||||||
value=0;
|
value=0;
|
||||||
MESA_htable_set_opt(htable, MHO_SCREEN_PRINT_CTRL,&value,sizeof(value));
|
MESA_htable_set_opt(htable, MHO_SCREEN_PRINT_CTRL,&value,sizeof(value));
|
||||||
@@ -81,7 +105,12 @@ void future_promise_library_init(void)
|
|||||||
FS_set_para(fs, CREATE_THREAD, &value, sizeof(value));
|
FS_set_para(fs, CREATE_THREAD, &value, sizeof(value));
|
||||||
value=2;
|
value=2;
|
||||||
FS_set_para(fs, STAT_CYCLE, &value, sizeof(value));
|
FS_set_para(fs, STAT_CYCLE, &value, sizeof(value));
|
||||||
FS_set_para(fs, HISTOGRAM_GLOBAL_BINS, FP_HISTOGRAM_BINS, strlen(FP_HISTOGRAM_BINS)+1);
|
if(strlen(g_FP_instance.statsd_server_ip)>0 && g_FP_instance.statsd_server_port!=0)
|
||||||
|
{
|
||||||
|
FS_set_para(fs, STATS_SERVER_IP, g_FP_instance.statsd_server_ip, strlen(g_FP_instance.statsd_server_ip)+1);
|
||||||
|
FS_set_para(fs, STATS_SERVER_PORT, &(g_FP_instance.statsd_server_port), sizeof(g_FP_instance.statsd_server_port));
|
||||||
|
}
|
||||||
|
FS_set_para(fs, HISTOGRAM_GLOBAL_BINS, g_FP_instance.histogram_bins, strlen(g_FP_instance.histogram_bins)+1);
|
||||||
g_FP_instance.fsid_f_num=FS_register(fs, FS_STYLE_FIELD, FS_CALC_CURRENT, "futures");
|
g_FP_instance.fsid_f_num=FS_register(fs, FS_STYLE_FIELD, FS_CALC_CURRENT, "futures");
|
||||||
FS_start(fs);
|
FS_start(fs);
|
||||||
g_FP_instance.fs_handle=fs;
|
g_FP_instance.fs_handle=fs;
|
||||||
@@ -110,7 +139,7 @@ static long field_get_set_cb(void * data, const uchar * key, uint size, void * u
|
|||||||
{
|
{
|
||||||
field_id=(int*)malloc(sizeof(int)*2);
|
field_id=(int*)malloc(sizeof(int)*2);
|
||||||
snprintf(buff,sizeof(buff),"%s(ms)",(char*)key);
|
snprintf(buff,sizeof(buff),"%s(ms)",(char*)key);
|
||||||
field_id[0]=FS_register_histogram(args->fs_handle, FS_CALC_SPEED, buff, 1, 5*1000, 2);//1ms~5s
|
field_id[0]=FS_register(args->fs_handle, FS_STYLE_HISTOGRAM, FS_CALC_CURRENT, buff);
|
||||||
args->fsid_latency=field_id[0];
|
args->fsid_latency=field_id[0];
|
||||||
snprintf(buff,sizeof(buff),"%s%s",(char*)key,fail_str);
|
snprintf(buff,sizeof(buff),"%s%s",(char*)key,fail_str);
|
||||||
field_id[1]=FS_register(args->fs_handle, FS_STYLE_FIELD, FS_CALC_SPEED,buff);
|
field_id[1]=FS_register(args->fs_handle, FS_STYLE_FIELD, FS_CALC_SPEED,buff);
|
||||||
@@ -134,14 +163,16 @@ struct future * future_create(const char* symbol, future_success_cb * cb_success
|
|||||||
p->f.cb_success = cb_success;
|
p->f.cb_success = cb_success;
|
||||||
p->f.cb_failed = cb_failed;
|
p->f.cb_failed = cb_failed;
|
||||||
strncpy(p->f.symbol,symbol,sizeof(p->f.symbol));
|
strncpy(p->f.symbol,symbol,sizeof(p->f.symbol));
|
||||||
|
if(!g_FP_instance.no_stats)
|
||||||
clock_gettime(CLOCK_MONOTONIC,&p->debug.create_time);
|
{
|
||||||
long cb_ret=0;
|
clock_gettime(CLOCK_MONOTONIC,&p->debug.create_time);
|
||||||
struct field_get_set_args args={.htable = g_FP_instance.name_table, .fs_handle = g_FP_instance.fs_handle};
|
long cb_ret=0;
|
||||||
MESA_htable_search_cb(g_FP_instance.name_table, (const unsigned char*)symbol, strlen(symbol), field_get_set_cb, &args, &cb_ret);
|
struct field_get_set_args args={.htable = g_FP_instance.name_table, .fs_handle = g_FP_instance.fs_handle};
|
||||||
p->debug.fsid_latency=args.fsid_latency;
|
MESA_htable_search_cb(g_FP_instance.name_table, (const unsigned char*)symbol, strlen(symbol), field_get_set_cb, &args, &cb_ret);
|
||||||
p->debug.fsid_failed=args.fsid_failed;
|
p->debug.fsid_latency=args.fsid_latency;
|
||||||
FS_operate(g_FP_instance.fs_handle,g_FP_instance.fsid_f_num, 0, FS_OP_ADD, 1);
|
p->debug.fsid_failed=args.fsid_failed;
|
||||||
|
FS_operate(g_FP_instance.fs_handle,g_FP_instance.fsid_f_num, 0, FS_OP_ADD, 1);
|
||||||
|
}
|
||||||
return &p->f;
|
return &p->f;
|
||||||
}
|
}
|
||||||
void future_set_timeout(struct future * f, struct timeval timeout)
|
void future_set_timeout(struct future * f, struct timeval timeout)
|
||||||
@@ -157,8 +188,8 @@ void future_destroy(struct future * f)
|
|||||||
if (p->cb_ctx_destroy != NULL)
|
if (p->cb_ctx_destroy != NULL)
|
||||||
{
|
{
|
||||||
p->cb_ctx_destroy(p->ctx);
|
p->cb_ctx_destroy(p->ctx);
|
||||||
}
|
}
|
||||||
FS_operate(g_FP_instance.fs_handle,g_FP_instance.fsid_f_num, 0, FS_OP_SUB, 1);
|
if(!g_FP_instance.no_stats) FS_operate(g_FP_instance.fs_handle,g_FP_instance.fsid_f_num, 0, FS_OP_SUB, 1);
|
||||||
memset(p, 0, sizeof(struct promise));
|
memset(p, 0, sizeof(struct promise));
|
||||||
free(p);
|
free(p);
|
||||||
}
|
}
|
||||||
@@ -184,14 +215,14 @@ static void fp_stat_latency(struct _future_promise_debug* debug, int is_success)
|
|||||||
}
|
}
|
||||||
void promise_failed(struct promise * p, enum e_future_error error, const char * what)
|
void promise_failed(struct promise * p, enum e_future_error error, const char * what)
|
||||||
{
|
{
|
||||||
fp_stat_latency(&p->debug, 0);
|
if(!g_FP_instance.no_stats) fp_stat_latency(&p->debug, 0);
|
||||||
p->f.cb_failed(error, what, p->f.user);
|
p->f.cb_failed(error, what, p->f.user);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void promise_success(struct promise * p, void * result)
|
void promise_success(struct promise * p, void * result)
|
||||||
{
|
{
|
||||||
fp_stat_latency(&p->debug, 1);
|
if(!g_FP_instance.no_stats) fp_stat_latency(&p->debug, 1);
|
||||||
p->f.cb_success(result, p->f.user);
|
p->f.cb_success(result, p->f.user);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
5
conf/tfe/future.conf
Normal file
5
conf/tfe/future.conf
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
[STAT]
|
||||||
|
no_stats=0
|
||||||
|
statsd_server=192.168.10.72
|
||||||
|
statsd_port=8092
|
||||||
|
histogram_bins=0.50,0.80,0.9,0.95
|
||||||
@@ -303,7 +303,7 @@ int tfe_stat_init(struct tfe_proxy * proxy, const char * profile)
|
|||||||
int main(int argc, char * argv[])
|
int main(int argc, char * argv[])
|
||||||
{
|
{
|
||||||
const char * main_profile = "./conf/tfe/tfe.conf";
|
const char * main_profile = "./conf/tfe/tfe.conf";
|
||||||
|
const char * future_profile= "./conf/future.conf";
|
||||||
unsigned int __log_level = RLOG_LV_INFO;
|
unsigned int __log_level = RLOG_LV_INFO;
|
||||||
MESA_load_profile_uint_def(main_profile, "log", "level", &__log_level, RLOG_LV_INFO);
|
MESA_load_profile_uint_def(main_profile, "log", "level", &__log_level, RLOG_LV_INFO);
|
||||||
|
|
||||||
@@ -317,7 +317,7 @@ int main(int argc, char * argv[])
|
|||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
||||||
future_promise_library_init();
|
future_promise_library_init(future_profile);
|
||||||
tango_cache_global_init();
|
tango_cache_global_init();
|
||||||
|
|
||||||
/* PROXY INSTANCE */
|
/* PROXY INSTANCE */
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ struct key_keeper_ctx{
|
|||||||
int main()
|
int main()
|
||||||
{
|
{
|
||||||
void* logger = NULL;
|
void* logger = NULL;
|
||||||
future_promise_library_init();
|
future_promise_library_init(NULL);
|
||||||
//struct event_base* evbase = event_base_new();
|
//struct event_base* evbase = event_base_new();
|
||||||
struct key_keeper * keeper = key_keeper_init("./conf/tfe.conf", "key_keeper", logger);
|
struct key_keeper * keeper = key_keeper_init("./conf/tfe.conf", "key_keeper", logger);
|
||||||
struct key_keeper_ctx* ctx = (struct key_keeper_ctx*)malloc(sizeof(struct key_keeper_ctx));
|
struct key_keeper_ctx* ctx = (struct key_keeper_ctx*)malloc(sizeof(struct key_keeper_ctx));
|
||||||
|
|||||||
@@ -225,7 +225,7 @@ int main()
|
|||||||
{
|
{
|
||||||
char cert_store_host[TFE_STRING_MAX];
|
char cert_store_host[TFE_STRING_MAX];
|
||||||
unsigned int cert_store_port;
|
unsigned int cert_store_port;
|
||||||
future_promise_library_init();
|
future_promise_library_init(NULL);
|
||||||
//const char* file_path = "./log/test_tfe_rpc.log",*host="localhost";
|
//const char* file_path = "./log/test_tfe_rpc.log",*host="localhost";
|
||||||
//void * logger = MESA_create_runtime_log_handle(file_path, RLOG_LV_INFO);
|
//void * logger = MESA_create_runtime_log_handle(file_path, RLOG_LV_INFO);
|
||||||
const char* profile = "./conf/tfe.conf";
|
const char* profile = "./conf/tfe.conf";
|
||||||
|
|||||||
Reference in New Issue
Block a user