diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp index 8827463..ae7cc8e 100644 --- a/client/doris_client_fetch.cpp +++ b/client/doris_client_fetch.cpp @@ -331,6 +331,7 @@ void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void cJSON *sub; int64_t new_version; + evtimer_del(&instance->ctx.timer_expires); if(res!=CURLE_OK) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Request meta failed, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s", @@ -397,6 +398,7 @@ static void doris_http_fetch_meta(struct doris_csum_instance *instance) u_int64_t balance_seed; struct doris_http_callback curlcbs; char metauri[128], cur_version[128]; + struct timeval tv={10, 0}; balance_seed = (((u_int64_t)rand()&0xFFFF) << 48) | (((u_int64_t)rand()&0xFFFF) << 32) | (((u_int64_t)rand()&0xFFFF) << 16) | ((u_int64_t)rand()&0xFFFF); @@ -436,6 +438,7 @@ static void doris_http_fetch_meta(struct doris_csum_instance *instance) { instance->status = FETCH_STATUS_META; instance->statistic.field[DRS_FS_FILED_REQ_META] += 1; + evtimer_add(&instance->ctx.timer_expires, &tv); } else { @@ -472,6 +475,16 @@ static void instance_fetch_cfg_timer_cb(int fd, short kind, void *userp) } } +/*https模式下,使用valgrind运行,发现发起GET请求后,done_cb函数始终无法得到调用*/ +static void instance_meta_expire_timer_cb(int fd, short kind, void *userp) +{ + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; + + doris_confile_ctx_destry(&instance->ctx); + doris_http_fetch_meta(instance); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33mbusiness: %s, launch meta-get wired expired, retry....\033[0m", instance->args.bizname); +} + static void doris_client_fs_output_timer_cb(int fd, short kind, void *userp) { struct doris_csum_param *param=(struct doris_csum_param *)userp; @@ -688,6 +701,7 @@ struct doris_csum_instance *doris_csum_instance_new(struct doris_csum_param *par { instance->httpins_backup2 = doris_http_instance_new(param->param_backup2, worker_evbase, runtimelog); } + evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_meta_expire_timer_cb, instance); pthread_mutex_lock(¶m->mutex_lock); param->references++; diff --git a/client/doris_client_fetch.h b/client/doris_client_fetch.h index a236135..4ef51d9 100644 --- a/client/doris_client_fetch.h +++ b/client/doris_client_fetch.h @@ -64,6 +64,7 @@ struct doris_confile_ctx { struct doris_http_ctx *httpctx; char server[64]; + struct event timer_expires; MD5_CTX md5ctx; long res_code; diff --git a/server/doris_server_http.cpp b/server/doris_server_http.cpp index f8bdb4e..fa9f5f7 100644 --- a/server/doris_server_http.cpp +++ b/server/doris_server_http.cpp @@ -59,7 +59,7 @@ static int32_t check_producer_ready_sync(struct doris_business *business, struct /*request from sync client, check http posts-on-the-way first*/ if(business->posts_on_the_way) { - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_DEBUG, "HttpProducer, posts-on-the-way: %d, meta response 300", business->posts_on_the_way); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_DEBUG, "HttpProducer, business: %s, posts-on-the-way: %d, meta response 300", business->bizname, business->posts_on_the_way); return 300; } @@ -68,7 +68,7 @@ static int32_t check_producer_ready_sync(struct doris_business *business, struct if((clientversion=atol(client_version)) > cur_version) { business_set_sync_peer_abnormal(business); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "HttpProducer, client version(%lu) is newer than server(%lu)", clientversion, cur_version); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "HttpProducer, business: %s, client version(%lu) is newer than server(%lu)", business->bizname, clientversion, cur_version); return 300; } diff --git a/server/doris_server_main.cpp b/server/doris_server_main.cpp index 2bc10f4..97cefe6 100644 --- a/server/doris_server_main.cpp +++ b/server/doris_server_main.cpp @@ -24,7 +24,7 @@ #include "doris_server_http.h" struct doris_global_info g_doris_server_info; -static unsigned long doris_version_20210825=20210825L; +static unsigned long doris_version_20210826=20210826L; int doris_mkdir_according_path(const char * path) { @@ -434,7 +434,7 @@ int main(int argc, char **argv) evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL); evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL); evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL); - g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_version_20210825); + g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_version_20210826); if(evhttp_accept_socket(manager_http, g_doris_server_info.manager)) { printf("evhttp_accept_socket %d error!\n", g_doris_server_info.manager);