通过增加promise_finish函数,实现future的cancel。

This commit is contained in:
zhengchao
2018-11-23 20:15:01 +08:00
parent 6cd2b8186b
commit 5d20a52552
3 changed files with 54 additions and 10 deletions

View File

@@ -31,6 +31,9 @@ void future_destroy(struct future * f);
struct promise * future_to_promise(struct future * f);
void promise_failed(struct promise * p, enum e_future_error error, const char * what);
void promise_success(struct promise * p, void * result);
void promise_finish(struct promise * p);
void promise_allow_many_successes(struct promise *p);
void promise_set_ctx(struct promise * p, void * ctx, promise_ctx_destroy_cb * cb);
void * promise_get_ctx(struct promise * p);
void * promise_dettach_ctx(struct promise * p);

View File

@@ -45,7 +45,9 @@ struct promise
{
struct future f;
void * ctx;
int has_timeout;
char has_timeout;
char ref_cnt;
char may_success_many_times;
promise_ctx_destroy_cb * cb_ctx_destroy;
struct _future_promise_debug debug;
};
@@ -117,11 +119,34 @@ void future_promise_library_init(const char* profile)
g_is_FP_init=1;
return;
}
static struct promise * __future_to_promise(struct future * f)
{
return (struct promise *) f;
}
static void __promise_destroy(struct promise *p)
{
if (p->cb_ctx_destroy != NULL)
{
p->cb_ctx_destroy(p->ctx);
}
if(!g_FP_instance.no_stats) FS_operate(g_FP_instance.fs_handle,g_FP_instance.fsid_f_num, 0, FS_OP_SUB, 1);
memset(p, 0, sizeof(struct promise));
free(p);
return;
}
struct promise * future_to_promise(struct future * f)
{
struct promise *p=__future_to_promise(f);
p->ref_cnt++;
assert(p->ref_cnt==2);
return (struct promise *) f;
}
void promise_allow_many_successes(struct promise *p)
{
p->may_success_many_times=1;
return;
}
struct field_get_set_args
{
MESA_htable_handle htable;
@@ -165,6 +190,7 @@ struct future * future_create(const char* symbol, future_success_cb * cb_success
p->f.user = user;
p->f.cb_success = cb_success;
p->f.cb_failed = cb_failed;
p->ref_cnt=1;
strncpy(p->f.symbol,symbol,sizeof(p->f.symbol));
if(!g_FP_instance.no_stats)
{
@@ -185,16 +211,23 @@ void future_set_timeout(struct future * f, struct timeval timeout)
p->has_timeout=1;
return;
}
void future_destroy(struct future * f)
{
struct promise * p = future_to_promise(f);
if (p->cb_ctx_destroy != NULL)
struct promise * p = __future_to_promise(f);
p->ref_cnt--;
if(p->ref_cnt==0)
{
p->cb_ctx_destroy(p->ctx);
__promise_destroy(p);
}
}
void promise_finish(struct promise * p)
{
p->ref_cnt--;
if(p->ref_cnt==0)
{
__promise_destroy(p);
}
if(!g_FP_instance.no_stats) FS_operate(g_FP_instance.fs_handle,g_FP_instance.fsid_f_num, 0, FS_OP_SUB, 1);
memset(p, 0, sizeof(struct promise));
free(p);
}
static void fp_stat_latency(struct _future_promise_debug* debug, int is_success)
{
@@ -220,6 +253,10 @@ void promise_failed(struct promise * p, enum e_future_error error, const char *
{
if(!g_FP_instance.no_stats) fp_stat_latency(&p->debug, 0);
p->f.cb_failed(error, what, p->f.user);
if(!p->may_success_many_times)
{
promise_finish(p);
}
return;
}
@@ -227,6 +264,10 @@ void promise_success(struct promise * p, void * result)
{
if(!g_FP_instance.no_stats) fp_stat_latency(&p->debug, 1);
p->f.cb_success(result, p->f.user);
if(!p->may_success_many_times)
{
promise_finish(p);
}
return;
}

View File

@@ -707,10 +707,10 @@ failed:
return;
}
static void ssl_async_peek_client_hello(struct future * future, evutil_socket_t fd, struct event_base * evbase,
static void ssl_async_peek_client_hello(struct future * f, evutil_socket_t fd, struct event_base * evbase,
void * logger)
{
struct promise * p = future_to_promise(future);
struct promise * p = future_to_promise(f);
struct peek_client_hello_ctx * ctx = ALLOC(struct peek_client_hello_ctx, 1);
ctx->ev = event_new(evbase, fd, EV_READ, peek_client_hello_cb, p);
ctx->logger = logger;