diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp index 28318c6..85f6857 100644 --- a/client/doris_client_fetch.cpp +++ b/client/doris_client_fetch.cpp @@ -137,8 +137,8 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code instance->retry_times = 0; if(instance->curmeta.curoffset == 0) { - instance->param->cbs.cfgfile_start(instance, instance->curmeta.table_name, - instance->curmeta.size, instance->curmeta.cfg_num, instance->param->cbs.userdata); + instance->cbs.cfgfile_start(instance, instance->curmeta.table_name, + instance->curmeta.size, instance->curmeta.cfg_num, instance->cbs.userdata); MD5_Init(&instance->ctx.md5ctx); } } @@ -180,7 +180,7 @@ void doris_http_confile_body_cb(const char *ptr, size_t bytes, CURLcode code, lo return; } - instance->param->cbs.cfgfile_update(instance, ptr, bytes, instance->param->cbs.userdata); + instance->cbs.cfgfile_update(instance, ptr, bytes, instance->cbs.userdata); MD5_Update(&instance->ctx.md5ctx, ptr, bytes); instance->curmeta.curoffset += bytes; instance->statistic.field[DRS_FS_FILED_RES_BYTES] += bytes; @@ -228,14 +228,14 @@ void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, vo } else { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s.010%lu over, md5: %s", + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s.%010lu over, md5: %s", instance->curmeta.table_name, instance->req_version, md5buffer); } instance->statistic.field[DRS_FS_FILED_RES_FILES] += 1; - instance->param->cbs.cfgfile_finish(instance, md5buffer, instance->param->cbs.userdata); + instance->cbs.cfgfile_finish(instance, md5buffer, instance->cbs.userdata); if(instance->array_index == instance->array_size) { - instance->param->cbs.version_finish(instance, instance->param->cbs.userdata); + instance->cbs.version_finish(instance, instance->cbs.userdata); instance->status = FETCH_STATUS_META; doris_update_new_version(instance); cJSON_Delete(instance->meta); @@ -268,7 +268,7 @@ out_md5: if(instance->retry_times >= instance->param->fetch_max_tries || direct_fail) { instance->statistic.field[DRS_FS_FILED_RES_VERERR] += 1; - instance->param->cbs.version_error(instance, instance->param->cbs.userdata); + instance->cbs.version_error(instance, instance->cbs.userdata); instance->retry_times = 0; instance->status = FETCH_STATUS_META; cJSON_Delete(instance->meta); @@ -296,7 +296,7 @@ void doris_http_fetch_confile(struct doris_instance *instance) doris_http_ctx_add_header(instance->ctx.httpctx, range); } - snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&businessid=%u", instance->curmeta.table_name, instance->req_version, instance->param->args.businessid); + snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&business=%s", instance->curmeta.table_name, instance->req_version, instance->args.bizname); if(doris_http_launch_get_request(instance->ctx.httpctx, metauri)) { instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; @@ -371,7 +371,7 @@ void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, cur_version=%lu, newjson: %s", instance->cur_version, instance->estr.buff); - instance->param->cbs.version_start(instance, instance->meta, instance->param->cbs.userdata); + instance->cbs.version_start(instance, instance->meta, instance->cbs.userdata); instance->array = cJSON_GetObjectItem(instance->meta, "configs"); instance->array_size = cJSON_GetArraySize(instance->array); assert(instance->array_size > 0); @@ -420,7 +420,7 @@ static void doris_http_fetch_meta(struct doris_instance *instance) instance->req_version = instance->cur_version + 1; //只有版本更新成功后,cur_version才会更新 if(instance->ctx.httpctx != NULL) { - snprintf(metauri, 128, "configmeta?version=%lu&businessid=%u", instance->req_version, instance->param->args.businessid); + snprintf(metauri, 128, "configmeta?version=%lu&business=%s", instance->req_version, instance->args.bizname); if(!doris_http_launch_get_request(instance->ctx.httpctx, metauri)) { instance->status = FETCH_STATUS_META; @@ -530,15 +530,12 @@ static int doris_client_register_field_stat(struct doris_parameter *param, void return 0; } -struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, struct doris_callbacks *cbs, - struct doris_arguments *args, void *runtimelog) +struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog) { struct doris_parameter *param; param = (struct doris_parameter *)calloc(1, sizeof(struct doris_parameter)); param->manage_evbase = manage_evbase; - param->cbs = *cbs; - param->args= *args; MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fail_retry_interval", ¶m->retry_interval, 10); MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fragmet_size", ¶m->fetch_frag_size, 5242880); @@ -599,7 +596,8 @@ static void doris_instance_statistic_timer_cb(int fd, short kind, void *userp) event_add(&instance->timer_statistic, &tv); } -struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase, void *runtimelog) +struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase, + struct doris_callbacks *cbs, struct doris_arguments *args, void *runtimelog) { struct doris_instance *instance; struct timeval tv; @@ -608,7 +606,9 @@ struct doris_instance *doris_instance_new(struct doris_parameter *param, struct instance->param = param; instance->worker_evbase = worker_evbase; instance->runtime_log = runtimelog; - instance->cur_version = param->args.current_version; + instance->cbs = *cbs; + instance->args= *args; + instance->cur_version = args->current_version; instance->req_version = instance->cur_version + 1; //TODO instance->httpins_master = doris_http_instance_new(param->param_master, worker_evbase, runtimelog); diff --git a/client/doris_client_fetch.h b/client/doris_client_fetch.h index 7125540..66afbdf 100644 --- a/client/doris_client_fetch.h +++ b/client/doris_client_fetch.h @@ -23,9 +23,6 @@ enum FETCH_CFG_STATUS struct doris_parameter { - struct doris_callbacks cbs; - struct doris_arguments args; - u_int32_t retry_interval; u_int32_t fetch_frag_size; u_int32_t fetch_max_tries; @@ -82,6 +79,9 @@ struct doris_confile_ctx struct doris_instance { + struct doris_callbacks cbs; + struct doris_arguments args; + enum FETCH_CFG_STATUS status; u_int32_t retry_times; diff --git a/include/doris_client.h b/include/doris_client.h index b994e11..99b2a3e 100644 --- a/include/doris_client.h +++ b/include/doris_client.h @@ -45,8 +45,8 @@ struct doris_statistics struct doris_arguments { + char bizname[32]; int64_t current_version; //当前已获取完毕的最新版本号,将从它下一个版本取配置 - int32_t businessid; int32_t judian_id; }; @@ -62,9 +62,9 @@ struct doris_callbacks void (*version_finish)(struct doris_instance *instance, void *userdata); }; -struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, struct doris_callbacks *cbs, - struct doris_arguments *args, void *runtimelog); -struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase, void *runtimelog); +struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog); +struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase, + struct doris_callbacks *cbs, struct doris_arguments *args, void *runtimelog); #endif diff --git a/server/bin/conf/doris_client.conf b/server/bin/conf/doris_client.conf new file mode 100644 index 0000000..062aeeb --- /dev/null +++ b/server/bin/conf/doris_client.conf @@ -0,0 +1,35 @@ +[DORIS_CLIENT] +fetch_fail_retry_interval=5 +fetch_fragmet_size=5242880 +fetch_confile_max_tries=3 + +fsstat_log_appname=DorisClient +fsstat_log_filepath=./log/doris_client.fs +fsstat_log_interval=2 +fsstat_log_print_mode=1 +fsstat_log_dst_ip=192.168.10.90 +fsstat_log_dst_port=8125 + +[DORIS_CLIENT.master_server] +max_connection_per_host=1 +max_cnnt_pipeline_num=10 +https_connection_on=0 +max_curl_session_num=10 + +http_server_listen_port=9897 +http_server_manage_port=9897 +http_server_ip_list=192.168.10.8 + +[DORIS_CLIENT.backup1_server] +max_connection_per_host=1 +max_cnnt_pipeline_num=10 +https_connection_on=0 +max_curl_session_num=10 + +http_server_listen_port=9897 +http_server_manage_port=9897 +http_server_ip_list=192.168.11.241 + +[DORIS_CLIENT.backup2_server] + + diff --git a/server/bin/conf/doris_main.conf b/server/bin/conf/doris_main.conf index d0ea133..518d694 100644 --- a/server/bin/conf/doris_main.conf +++ b/server/bin/conf/doris_main.conf @@ -3,13 +3,9 @@ worker_thread_num=2 server_listen_port=9898 manage_listen_port=2233 https_connection_on=1 +cache_file_frag_size=100 -#1-Doris client; 2-local file -receive_config_way=2 -cache_file_frag_size=67108864 -store_config_path=./doris_store_path -receive_config_path_full=./doris_receive_path/full/index -receive_config_path_inc=./doris_receive_path/inc/index +business_system_list=T1_1;VoIP run_log_dir=./log run_log_lv=20 @@ -20,40 +16,20 @@ fsstat_log_print_mode=1 fsstat_log_dst_ip=192.168.10.90 fsstat_log_dst_port=8125 +[T1_1] +#1-Doris client; 2-local file +receive_config_way=2 +grafana_monitor_status_id=3 +store_config_path=./doris_store_t1 +receive_config_path_full=./doris_receive_t1/full/index +receive_config_path_inc=./doris_receive_t1/inc/index +#doris_client_confile=./conf/doris_client.conf - -[DORIS_CLIENT] -fetch_fail_retry_interval=5 -fetch_fragmet_size=5242880 -fetch_confile_max_tries=3 - -fsstat_log_appname=DorisClient -fsstat_log_filepath=./log/doris_client.fs -fsstat_log_interval=2 -fsstat_log_print_mode=1 -fsstat_log_dst_ip=192.168.10.90 -fsstat_log_dst_port=8125 - -[DORIS_CLIENT.master_server] -max_connection_per_host=1 -max_cnnt_pipeline_num=10 -https_connection_on=1 -max_curl_session_num=10 - -http_server_listen_port=9897 -http_server_manage_port=9897 -http_server_ip_list=192.168.10.8 - -[DORIS_CLIENT.backup1_server] -max_connection_per_host=1 -max_cnnt_pipeline_num=10 -https_connection_on=0 -max_curl_session_num=10 - -http_server_listen_port=9897 -http_server_manage_port=9897 -http_server_ip_list=192.168.11.241 - -[DORIS_CLIENT.backup2_server] - +[VoIP] +receive_config_way=2 +grafana_monitor_status_id=4 +store_config_path=./doris_store_voip +receive_config_path_full=./doris_receive_voip/full/index +receive_config_path_inc=./doris_receive_voip/inc/index +#doris_client_confile=./conf/doris_client.conf diff --git a/server/doris_server_http.cpp b/server/doris_server_http.cpp index 1cff399..aa81c7a 100644 --- a/server/doris_server_http.cpp +++ b/server/doris_server_http.cpp @@ -73,47 +73,63 @@ int doris_create_listen_socket(int bind_port) void doris_http_server_meta_cb(struct evhttp_request *req, void *arg) { - struct worker_statistic_info *statistic=(struct worker_statistic_info *)arg; struct evkeyvalq params; - const char *version; + const char *version, *bizname; int64_t verlong; char *endptr=NULL, length[64]; struct version_list_node *vernode; struct evbuffer *evbuf; + struct doris_business *business; + map::iterator iter; - statistic->statistic.field[DRS_FSSTAT_CLIENT_META_REQ] += 1; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_META_REQ], 0, FS_OP_ADD, 1); if(evhttp_parse_query(evhttp_request_get_uri(req), ¶ms)) { - statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid"); return; } if(NULL == (version = evhttp_find_header(¶ms, "version"))) { - statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_clear_headers(¶ms); evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no version found"); return; } if(0==(verlong = strtol(version, &endptr, 10)) || *endptr!='\0') { - statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_clear_headers(¶ms); evhttp_send_error(req, HTTP_BADREQUEST, "Parameter version invalid"); return; } - evhttp_clear_headers(¶ms); - - pthread_rwlock_rdlock(&g_doris_server_info.rwlock); - if(verlong > g_doris_server_info.cfgver_head->latest_version) + if(NULL == (bizname = evhttp_find_header(¶ms, "business"))) { - pthread_rwlock_unlock(&g_doris_server_info.rwlock); - statistic->statistic.field[DRS_FSSTAT_SEND_META_NONEW] += 1; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); + evhttp_clear_headers(¶ms); + evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no business found"); + return; + } + if((iter = g_doris_server_info.name2business->find(string(bizname)))==g_doris_server_info.name2business->end()) + { + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); + evhttp_clear_headers(¶ms); + evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, business invalid"); + return; + } + evhttp_clear_headers(¶ms); + business = iter->second; + + pthread_rwlock_rdlock(&business->rwlock); + if(verlong > business->cfgver_head->latest_version) + { + pthread_rwlock_unlock(&business->rwlock); + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_META_NONEW], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_NOTMODIFIED, "No new configs found"); return; } - vernode = TAILQ_FIRST(&g_doris_server_info.cfgver_head->version_head); + vernode = TAILQ_FIRST(&business->cfgver_head->version_head); while(vernode->version < verlong) { vernode = TAILQ_NEXT(vernode, version_node); @@ -121,9 +137,9 @@ void doris_http_server_meta_cb(struct evhttp_request *req, void *arg) evbuf = evbuffer_new(); evbuffer_add(evbuf, vernode->metacont, vernode->metalen); sprintf(length, "%u", vernode->metalen); - pthread_rwlock_unlock(&g_doris_server_info.rwlock); + pthread_rwlock_unlock(&business->rwlock); - statistic->statistic.field[DRS_FSSTAT_SEND_META_RES] += 1; + FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_SEND_META_RES], FS_OP_ADD, 1); evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Type", "application/json"); evhttp_add_header(evhttp_request_get_output_headers(req), "Connection", "keep-alive"); evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Length", length); @@ -131,8 +147,8 @@ void doris_http_server_meta_cb(struct evhttp_request *req, void *arg) evbuffer_free(evbuf); } -void doris_response_file_range(struct evhttp_request *req, const char *tablename, - int64_t verlong, size_t start, size_t end, bool range, struct worker_statistic_info *statistic) +void doris_response_file_range(struct evhttp_request *req, const char *bizname, const char *tablename, + int64_t verlong, size_t start, size_t end, bool range) { struct version_list_node *vernode; struct table_list_node *tablenode; @@ -140,16 +156,26 @@ void doris_response_file_range(struct evhttp_request *req, const char *tablename struct evbuffer *evbuf; char length[128]; size_t filesize, res_length=0, copy_len, offset=start; + struct doris_business *business; + map::iterator iter; - pthread_rwlock_rdlock(&g_doris_server_info.rwlock); - if(verlong > g_doris_server_info.cfgver_head->latest_version) + if((iter = g_doris_server_info.name2business->find(string(bizname)))==g_doris_server_info.name2business->end()) { - pthread_rwlock_unlock(&g_doris_server_info.rwlock); - statistic->statistic.field[DRS_FSSTAT_SEND_FILE_RES_404] += 1; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); + evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, business invalid"); + return; + } + business = iter->second; + + pthread_rwlock_rdlock(&business->rwlock); + if(verlong > business->cfgver_head->latest_version) + { + pthread_rwlock_unlock(&business->rwlock); + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_RES_404], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_NOTFOUND, "Version too old"); return; } - vernode = TAILQ_FIRST(&g_doris_server_info.cfgver_head->version_head); + vernode = TAILQ_FIRST(&business->cfgver_head->version_head); while(vernode->version < verlong) { vernode = TAILQ_NEXT(vernode, version_node); @@ -161,8 +187,8 @@ void doris_response_file_range(struct evhttp_request *req, const char *tablename } if(tablenode==NULL || start>tablenode->filesize) { - pthread_rwlock_unlock(&g_doris_server_info.rwlock); - statistic->statistic.field[DRS_FSSTAT_SEND_FILE_RES_404] += 1; + pthread_rwlock_unlock(&business->rwlock); + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_RES_404], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_NOTFOUND, "No valid content found"); return; } @@ -183,12 +209,12 @@ void doris_response_file_range(struct evhttp_request *req, const char *tablename offset += copy_len; res_length += copy_len; } - pthread_rwlock_unlock(&g_doris_server_info.rwlock); + pthread_rwlock_unlock(&business->rwlock); assert(res_length == end + 1 - start); sprintf(length, "%lu", res_length); - statistic->statistic.field[DRS_FSSTAT_SEND_FILE_RES] += 1; - statistic->statistic.field[DRS_FSSTAT_SEND_FILE_BYTES] += res_length; + FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_SEND_FILE_RES], FS_OP_ADD, 1); + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_BYTES], 0, FS_OP_ADD, res_length); evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Length", length); if(range) { @@ -203,31 +229,30 @@ void doris_response_file_range(struct evhttp_request *req, const char *tablename void doris_http_server_file_cb(struct evhttp_request *req, void *arg) { - struct worker_statistic_info *statistic=(struct worker_statistic_info *)arg; struct evkeyvalq params; - const char *version, *tablename, *content_range; + const char *version, *tablename, *content_range, *bizname; int64_t verlong; char *endptr=NULL; size_t req_start=0, req_end=0; - statistic->statistic.field[DRS_FSSTAT_CLIENT_FILE_REQ] += 1; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_FILE_REQ], 0, FS_OP_ADD, 1); if(evhttp_parse_query(evhttp_request_get_uri(req), ¶ms)) { - statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid"); return; } if(NULL==(version=evhttp_find_header(¶ms, "version")) || NULL==(tablename=evhttp_find_header(¶ms, "tablename"))) { evhttp_clear_headers(¶ms); - statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no version/tablename found"); return; } if(0==(verlong = strtol(version, &endptr, 10)) || *endptr!='\0') { evhttp_clear_headers(¶ms); - statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_BADREQUEST, "Parameter version invalid"); return; } @@ -235,12 +260,19 @@ void doris_http_server_file_cb(struct evhttp_request *req, void *arg) sscanf(content_range, "%*[^0-9]%lu-%lu", &req_start, &req_end)<1) { evhttp_clear_headers(¶ms); - statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_BADREQUEST, "Header Range invalid"); return; } + if(NULL == (bizname = evhttp_find_header(¶ms, "business"))) + { + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); + evhttp_clear_headers(¶ms); + evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no business found"); + return; + } - doris_response_file_range(req, tablename, verlong, req_start, req_end, (content_range==NULL)?false:true, statistic); + doris_response_file_range(req, bizname, tablename, verlong, req_start, req_end, (content_range==NULL)?false:true); evhttp_clear_headers(¶ms); } @@ -350,14 +382,10 @@ void* thread_doris_http_server(void *arg) { struct event_base *worker_evbase; struct evhttp *worker_http; - struct worker_statistic_info statistic; - struct timeval tv; prctl(PR_SET_NAME, "http_server"); - memset(&statistic, 0, sizeof(struct worker_statistic_info)); worker_evbase = event_base_new(); - worker_http = evhttp_new(worker_evbase); if(g_doris_server_info.ssl_conn_on) @@ -370,9 +398,9 @@ void* thread_doris_http_server(void *arg) evhttp_set_bevcb(worker_http, doris_https_bufferevent_cb, g_doris_server_info.ssl_instance); } - evhttp_set_cb(worker_http, "/configmeta", doris_http_server_meta_cb, &statistic); - evhttp_set_cb(worker_http, "/configfile", doris_http_server_file_cb, &statistic); - evhttp_set_gencb(worker_http, doris_http_server_generic_cb, &statistic); + evhttp_set_cb(worker_http, "/configmeta", doris_http_server_meta_cb, NULL); + evhttp_set_cb(worker_http, "/configfile", doris_http_server_file_cb, NULL); + evhttp_set_gencb(worker_http, doris_http_server_generic_cb, NULL); evhttp_set_allowed_methods(worker_http, EVHTTP_REQ_GET|EVHTTP_REQ_HEAD); if(evhttp_accept_socket(worker_http, g_doris_server_info.listener)) @@ -381,11 +409,6 @@ void* thread_doris_http_server(void *arg) assert(0); return NULL; } - evtimer_assign(&statistic.timer_statistic, worker_evbase, doris_worker_statistic_timer_cb, &statistic); - tv.tv_sec = g_doris_server_info.fsstat_period; - tv.tv_usec = 0; - evtimer_add(&statistic.timer_statistic, &tv); - event_base_dispatch(worker_evbase); printf("Libevent dispath error, should not run here.\n"); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here."); diff --git a/server/doris_server_main.cpp b/server/doris_server_main.cpp index 6cb8462..64fa952 100644 --- a/server/doris_server_main.cpp +++ b/server/doris_server_main.cpp @@ -17,7 +17,7 @@ #include "doris_server_http.h" struct doris_global_info g_doris_server_info; -static unsigned long doris_vesion_20210719=20210719L; +static unsigned long doris_vesion_20210722=20210722L; int doris_mkdir_according_path(const char * path) { @@ -54,9 +54,24 @@ int doris_mkdir_according_path(const char * path) return 0; } +static int doris_chech_name_valid(const char *name) +{ + size_t i, namelen=strlen(name); + + for(i=0; i='a' && name[i]<='z')||(name[i]>='A' && name[i]<='Z') || + (name[i]>='0' && name[i]<='9') || name[i]=='_' || name[i]==':')) + { + return false; + } + } + return true; +} + int32_t doris_read_profile_configs(const char *config_file) { - char tmp_buf[4096], tmp_dir[512], tmp_dir2[512]; + char tmp_buf[4096], tmp_dir[512]; MESA_load_profile_string_def(config_file, "DORIS_SERVER", "run_log_dir", g_doris_server_info.root_log_dir, sizeof(g_doris_server_info.root_log_dir), "./log"); MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "run_log_lv", &g_doris_server_info.log_level, 10); @@ -86,37 +101,8 @@ int32_t doris_read_profile_configs(const char *config_file) } MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "cache_file_frag_size", &g_doris_server_info.cache_frag_size, 67108864); MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "doris_server_role_on", &g_doris_server_info.server_role_sw, 1); - MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "doris_write_file_on", &g_doris_server_info.write_file_sw, 1); - - if(0>MESA_load_profile_string_nodef(config_file, "DORIS_SERVER", "store_config_path", g_doris_server_info.store_path_root, sizeof(g_doris_server_info.store_path_root))) - { - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]store_config_path not found!", config_file); - assert(0);return -1; - } - snprintf(tmp_dir, 512, "%s/full/index", g_doris_server_info.store_path_root); - snprintf(tmp_dir2,512, "%s/inc/index", g_doris_server_info.store_path_root); - if(doris_mkdir_according_path(tmp_dir) || doris_mkdir_according_path(tmp_dir2)) - { - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "mkdir %s failed: %s\n", tmp_dir, strerror(errno)); - return -1; - } - - MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "receive_config_way", &g_doris_server_info.recv_way, RECV_WAY_DRS_CLIENT); - if(g_doris_server_info.recv_way == RECV_WAY_IDX_FILE) - { - if(0>MESA_load_profile_string_nodef(config_file, "DORIS_SERVER", "receive_config_path_full", g_doris_server_info.recv_path_full, sizeof(g_doris_server_info.recv_path_full))) - { - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]receive_config_path not found!", config_file); - assert(0);return -1; - } - if(0>MESA_load_profile_string_nodef(config_file, "DORIS_SERVER", "receive_config_path_inc", g_doris_server_info.recv_path_inc, sizeof(g_doris_server_info.recv_path_inc))) - { - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]receive_config_path not found!", config_file); - assert(0);return -1; - } - MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "scan_index_file_interval", &g_doris_server_info.scan_idx_interval, 10); - } + MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "scan_index_file_interval", &g_doris_server_info.scan_idx_interval, 10); MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "https_connection_on", &g_doris_server_info.ssl_conn_on, 0); if(g_doris_server_info.ssl_conn_on) { @@ -143,9 +129,11 @@ int32_t doris_read_profile_configs(const char *config_file) static int doris_server_register_field_stat(struct doris_global_info *param) { - const char *field_names[DRS_FSSTAT_FIELD_NUM]={"RecvFullVer", "RecvIncVer", "RecvErrVer", "FileStarts", "FileComplete", - "ClientInvReq", "ClientMetaReq", "SendResMeta", "SendNoNewMeta", "ClientFileReq", "SendFiles", "SendBytes", "SendFile404"}; - const char *status_names[DRS_FSSTAT_STATUS_NUM]={"MemoryUsed", "CurFullVer", "CurIncVer", "TotalCfgNum"}; + const char *field_names[DRS_FSSTAT_FIELD_NUM]={"RecvErrVer", "FileStarts", "FileComplete", "ClientInvReq", + "ClientMetaReq", "SendNoNewMeta", "ClientFileReq", "SendBytes", "SendFile404"}; + const char *status_names[DRS_FSSTAT_STATUS_NUM]={"MemoryUsed"}; + const char *column_names[DRS_FSSTAT_CLUMN_NUM]={"RecvFullVer", "RecvIncVer", "RecvFiles", "SendResMeta", "SendFiles", + "CurFullVer", "CurIncVer", "TotalCfgNum"}; int value; param->fsstat_handle = FS_create_handle(); @@ -176,6 +164,10 @@ static int doris_server_register_field_stat(struct doris_global_info *param) { param->fsstat_status[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]); } + for(int i=0; ifsstat_column[i] = FS_register(param->fsstat_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, column_names[i]); + } FS_start(param->fsstat_handle); return 0; } @@ -191,6 +183,96 @@ static void instance_fsstat_output_timer_cb(int fd, short kind, void *userp) event_add((struct event*)userp, &tv); } +static int32_t doris_init_config_for_business(struct doris_global_info *info, struct event_base *manage_evbase, const char *config_file) +{ + char tmpbuffer[4096], tmp_dir[256], tmp_dir2[256], *bizname, *save=NULL; + struct doris_business *business; + map::iterator iter; + + if(0>=MESA_load_profile_string_nodef(config_file, "DORIS_SERVER", "business_system_list", tmpbuffer, sizeof(tmpbuffer))) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]business_system_list not found!", config_file); + assert(0);return -1; + } + for(bizname=strtok_r(tmpbuffer, ";", &save); bizname!=NULL; bizname=strtok_r(NULL, ";", &save)) + { + if(!doris_chech_name_valid(bizname)) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]business_system_list bizname %s invalid, name must match: [a-zA-Z_:][a-zA-Z0-9_:]", bizname, config_file); + assert(0);return -1; + } + business = &info->business[info->business_num++]; + snprintf(business->bizname, sizeof(business->bizname), "%s", bizname); + pthread_rwlock_init(&business->rwlock, NULL); + + business->cfgver_head = config_version_handle_new(); + if(info->name2business->find(string(business->bizname)) != info->name2business->end()) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]business_system_list duplicate system name: %s!", bizname, config_file, business->bizname); + assert(0);return -1; + } + info->name2business->insert(make_pair(string(business->bizname), business)); + + MESA_load_profile_uint_def(config_file, business->bizname, "grafana_monitor_status_id", &business->mm_status_codeid, 3); + MESA_load_profile_uint_def(config_file, business->bizname, "doris_write_file_on", &business->write_file_sw, 1); + if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "store_config_path", business->store_path_root, sizeof(business->store_path_root))) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]store_config_path not found!", bizname, config_file); + assert(0);return -1; + } + snprintf(tmp_dir, 512, "%s/full/index", business->store_path_root); + snprintf(tmp_dir2,512, "%s/inc/index", business->store_path_root); + if(doris_mkdir_according_path(tmp_dir) || doris_mkdir_according_path(tmp_dir2)) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "mkdir %s failed: %s\n", tmp_dir, strerror(errno)); + return -1; + } + + MESA_load_profile_uint_def(config_file, business->bizname, "receive_config_way", &business->recv_way, RECV_WAY_DRS_CLIENT); + if(business->recv_way == RECV_WAY_DRS_CLIENT) + { + if(0>=MESA_load_profile_string_nodef(config_file, business->bizname, "doris_client_confile", tmp_dir, sizeof(tmp_dir))) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]doris_client_confile not found!", config_file); + assert(0);return -1; + } + if((iter=info->confile2param->find(string(tmp_dir))) != info->confile2param->end()) + { + business->param = iter->second; + } + else + { + business->param = doris_parameter_new(tmp_dir, manage_evbase, info->log_runtime); + if(business->param == NULL) + { + assert(0);return -2; + } + info->confile2param->insert(make_pair(string(tmp_dir), business->param)); + } + } + else + { + if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "receive_config_path_full", business->recv_path_full, sizeof(business->recv_path_full))) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]receive_config_path not found!", bizname, config_file); + assert(0);return -1; + } + if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "receive_config_path_inc", business->recv_path_inc, sizeof(business->recv_path_inc))) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]receive_config_path not found!", bizname, config_file); + assert(0);return -1; + } + } + business->fs_lineid = FS_register(info->fsstat_handle, FS_STYLE_LINE, FS_CALC_CURRENT, business->bizname);; + + snprintf(tmp_dir, 512, "latest_cfg_version_%s", business->bizname); + business->mm_latest_ver = MESA_Monitor_register(info->monitor, tmp_dir, MONITOR_METRICS_GAUGE, "Latest doris config version."); + snprintf(tmp_dir, 512, "total_config_num_%s", business->bizname); + business->mm_total_cfgnum = MESA_Monitor_register(info->monitor, tmp_dir, MONITOR_METRICS_GAUGE, "Total config num from latest full version till now."); + } + return 0; +} + static void manager_statistic_threads_requests_cb(struct evhttp_request *req, void *arg) { evhttp_send_error(req, HTTP_BADREQUEST, "Not Supported."); @@ -224,30 +306,11 @@ int main(int argc, char **argv) { return -1; } - g_doris_server_info.cfgver_head = config_version_handle_new(); - pthread_rwlock_init(&g_doris_server_info.rwlock, NULL); evthread_use_pthreads(); + g_doris_server_info.name2business = new map; + g_doris_server_info.confile2param = new map; manage_evbase = event_base_new(); - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if(g_doris_server_info.recv_way == RECV_WAY_DRS_CLIENT) - { - if(pthread_create(&thread_desc, &attr, thread_doris_client_recv_cfg, manage_evbase)) - { - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno)); - assert(0);return -4; - } - } - else - { - if(pthread_create(&thread_desc, &attr, thread_index_file_recv_cfg, NULL)) - { - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno)); - assert(0);return -4; - } - } - /*Doris manager server*/ g_doris_server_info.manager = doris_create_listen_socket(g_doris_server_info.manager_port); if(g_doris_server_info.manager < 0) @@ -263,10 +326,7 @@ int main(int argc, char **argv) evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL); evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL); evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL); - g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210719); - g_doris_server_info.mm_latest_ver = MESA_Monitor_register(g_doris_server_info.monitor, "latest_cfg_version", MONITOR_METRICS_GAUGE, "Latest doris config version."); - g_doris_server_info.mm_total_cfgnum = MESA_Monitor_register(g_doris_server_info.monitor, "total_config_num", MONITOR_METRICS_GAUGE, "Total config num from latest full version till now."); - + g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210722); if(evhttp_accept_socket(manager_http, g_doris_server_info.manager)) { printf("evhttp_accept_socket %d error!\n", g_doris_server_info.manager); @@ -274,6 +334,34 @@ int main(int argc, char **argv) assert(0); return -7; } + //为每个业务系统初始化拉取配置的结构 + if(doris_init_config_for_business(&g_doris_server_info, manage_evbase, NIRVANA_CONFIG_FILE)) + { + return -8; + } + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + for(u_int32_t i=0; i +#include + +using namespace std; + #ifndef __FILENAME__ #define __FILENAME__ __FILE__ #endif @@ -27,9 +33,30 @@ MESA_handle_runtime_log((handle), (lv), "DorisServer", "%s:%d, " fmt, __FILENAME__, __LINE__, ##args) #define NIRVANA_CONFIG_FILE "./conf/doris_main.conf" +#define MAX_BUSINESS_NUM 64 #define RECV_WAY_DRS_CLIENT 1 #define RECV_WAY_IDX_FILE 2 +#define RECV_WAY_HTTP_POST 3 + +struct doris_business +{ + char bizname[32]; + u_int32_t recv_way; + u_int32_t write_file_sw; + char recv_path_full[256]; + char recv_path_inc[256]; + char store_path_root[256]; + struct version_list_handle *cfgver_head; + struct doris_parameter *param; + + int64_t total_cfgnum; + int32_t mm_latest_ver; + int32_t mm_total_cfgnum; + u_int32_t mm_status_codeid; //MM内部异常状态id + u_int32_t fs_lineid; + pthread_rwlock_t rwlock; +}; struct doris_global_info { @@ -38,15 +65,9 @@ struct doris_global_info int32_t manager_port; int32_t sock_recv_bufsize; u_int32_t ssl_conn_on; - u_int32_t recv_way; - char recv_path_full[256]; - char recv_path_inc[256]; - char store_path_root[256]; - char store_path_inc[256]; u_int32_t scan_idx_interval; u_int32_t cache_frag_size; u_int32_t server_role_sw; - u_int32_t write_file_sw; char ssl_CA_path[256]; char ssl_cert_file[256]; @@ -55,15 +76,16 @@ struct doris_global_info pthread_mutex_t *lock_cs; SSL_CTX *ssl_instance; - struct version_list_handle *cfgver_head; evutil_socket_t listener; evutil_socket_t manager; - pthread_rwlock_t rwlock; + + struct doris_business business[MAX_BUSINESS_NUM]; + u_int32_t business_num; + map *name2business; + map *confile2param; struct MESA_MonitorHandler *monitor; - int32_t mm_latest_ver; - int32_t mm_total_cfgnum; - + /*logs*/ u_int32_t log_level; u_int32_t statistic_period; @@ -79,6 +101,7 @@ struct doris_global_info int32_t fsstat_dst_port; int32_t fsstat_field[DRS_FSSTAT_FIELD_NUM]; int32_t fsstat_status[DRS_FSSTAT_STATUS_NUM]; + int32_t fsstat_column[DRS_FSSTAT_CLUMN_NUM]; }; int doris_mkdir_according_path(const char * path); diff --git a/server/doris_server_receive.cpp b/server/doris_server_receive.cpp index 3538f4f..594a6a1 100644 --- a/server/doris_server_receive.cpp +++ b/server/doris_server_receive.cpp @@ -16,6 +16,7 @@ struct scanner_timer_priv { + struct doris_business *business; struct doris_callbacks doris_cbs; struct doris_arguments doris_args; struct doris_idxfile_scanner *scanner; @@ -24,45 +25,16 @@ struct scanner_timer_priv extern struct doris_global_info g_doris_server_info; - -void doris_worker_statistic_timer_cb(int fd, short kind, void *userp) -{ - struct worker_statistic_info *statistic = (struct worker_statistic_info *)userp; - struct timeval tv; - struct doris_srv_statistics incr_statistic; - long long *plast_statistic = (long long*)&statistic->statistic_last; - long long *pnow_statistic = (long long*)&statistic->statistic; - long long *pinc_statistic = (long long*)&incr_statistic; - - for(u_int32_t i=0; istatistic_last = statistic->statistic; - - for(u_int32_t i=0; itimer_statistic, &tv); -} - -void config_frag_node_cleanup(struct confile_save *save, struct cont_frag_node *fragnode) +void config_frag_node_cleanup(struct cont_frag_node *fragnode) { if(fragnode == NULL) return; - save->statistic.statistic.status[DRS_FSSTAT_MEMORY_USED] -= fragnode->totalsize; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_status[DRS_FSSTAT_MEMORY_USED], 0, FS_OP_SUB, fragnode->totalsize); free(fragnode->content); free(fragnode); } -void config_table_node_cleanup(struct confile_save *save, struct table_list_node *table_node) +void config_table_node_cleanup(struct table_list_node *table_node) { struct cont_frag_node *fragnode; @@ -71,12 +43,12 @@ void config_table_node_cleanup(struct confile_save *save, struct table_list_node while(NULL != (fragnode = TAILQ_FIRST(&table_node->frag_head))) { TAILQ_REMOVE(&table_node->frag_head, fragnode, frag_node); - config_frag_node_cleanup(save, fragnode); + config_frag_node_cleanup(fragnode); } free(table_node); } -void config_version_node_cleanup(struct confile_save *save, struct version_list_node *vernode) +void config_version_node_cleanup(struct version_list_node *vernode) { struct table_list_node *tablenode; @@ -85,7 +57,7 @@ void config_version_node_cleanup(struct confile_save *save, struct version_list_ while(NULL != (tablenode = TAILQ_FIRST(&vernode->table_head))) { TAILQ_REMOVE(&vernode->table_head, tablenode, table_node); - config_table_node_cleanup(save, tablenode); + config_table_node_cleanup(tablenode); } free(vernode->metacont); cJSON_Delete(vernode->metajson); @@ -94,14 +66,14 @@ void config_version_node_cleanup(struct confile_save *save, struct version_list_ free(vernode); } -void config_version_handle_cleanup(struct confile_save *save, struct version_list_handle *version) +void config_version_handle_cleanup(struct version_list_handle *version) { struct version_list_node *vernode; while(NULL != (vernode = TAILQ_FIRST(&version->version_head))) { TAILQ_REMOVE(&version->version_head, vernode, version_node); - config_version_node_cleanup(save, vernode); + config_version_node_cleanup(vernode); } free(version); } @@ -134,7 +106,7 @@ static void cfgver_delay_destroy_timer_cb(int fd, short kind, void *userp) doris_common_timer_start(&delay_event->timer_event); return; } - config_version_handle_cleanup(delay_event->save, handle); + config_version_handle_cleanup(handle); free(delay_event); } @@ -144,7 +116,6 @@ static void cfgver_handle_delay_destroy(struct confile_save *save, struct event_ delay_event = (struct common_timer_event *)malloc(sizeof(struct common_timer_event)); delay_event->data = version; - delay_event->save = save; evtimer_assign(&delay_event->timer_event, evbase, cfgver_delay_destroy_timer_cb, delay_event); doris_common_timer_start(&delay_event->timer_event); } @@ -156,16 +127,20 @@ void doris_config_file_version_start(struct doris_instance *instance, cJSON *met if(save->type == CFG_UPDATE_TYPE_FULL) { - snprintf(save->inc_index_path, 512, "%s/inc/index/full_config_index.%010lu", g_doris_server_info.store_path_root, save->version); - snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", g_doris_server_info.store_path_root, save->version); - snprintf(save->full_index_path, 512, "%s/full/index/full_config_index.%010lu", g_doris_server_info.store_path_root, save->version); + snprintf(save->inc_index_path, 512, "%s/inc/index/full_config_index.%010lu", save->business->store_path_root, save->version); + snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", save->business->store_path_root, save->version); + snprintf(save->full_index_path, 512, "%s/full/index/full_config_index.%010lu", save->business->store_path_root, save->version); } else { - snprintf(save->inc_index_path, 512, "%s/inc/index/inc_config_index.%010lu", g_doris_server_info.store_path_root, save->version); - snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", g_doris_server_info.store_path_root, save->version); + snprintf(save->inc_index_path, 512, "%s/inc/index/inc_config_index.%010lu", save->business->store_path_root, save->version); + snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", save->business->store_path_root, save->version); + } + if(NULL==(save->fp_idx_file = fopen(save->tmp_index_path, "w+"))) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", save->business->bizname, save->tmp_index_path, strerror(errno)); + assert(0); } - save->fp_idx_file = fopen(save->tmp_index_path, "w+"); } void doris_config_file_version_finish(struct doris_instance *instance, void *userdata) @@ -175,18 +150,18 @@ void doris_config_file_version_finish(struct doris_instance *instance, void *use fclose(save->fp_idx_file); if(rename(save->tmp_index_path, save->inc_index_path)) { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, rename %s to %s failed: %s", save->business->bizname, save->tmp_index_path, save->inc_index_path, strerror(errno)); assert(0); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "rename %s to %s failed: %s", save->tmp_index_path, save->inc_index_path, strerror(errno)); } if(save->type == CFG_UPDATE_TYPE_FULL) { if(link(save->inc_index_path, save->full_index_path) && errno!=EEXIST) //创建硬链接 { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, rename %s to %s failed: %s", save->business->bizname, save->tmp_index_path, save->inc_index_path, strerror(errno)); assert(0); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "rename %s to %s failed: %s", save->tmp_index_path, save->inc_index_path, strerror(errno)); } } - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "Version %lu write finished, index file: %s", save->version, save->inc_index_path); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu write finished, index file: %s", save->business->bizname, save->version, save->inc_index_path); } void doris_config_file_version_error(struct doris_instance *instance, void *userdata) @@ -203,7 +178,7 @@ void doris_config_file_version_error(struct doris_instance *instance, void *user fclose(save->fp_cfg_file); remove(save->cfg_file_path); } - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Version %llu error, rolling back...", save->version); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", save->business->bizname, save->version); } void doris_config_file_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata) @@ -217,7 +192,7 @@ void doris_config_file_cfgfile_start(struct doris_instance *instance, const char type = (save->type == CFG_UPDATE_TYPE_FULL)?"full":"inc"; now = time(NULL); localtm = localtime_r(&now, &savetime); - snprintf(dir, 256, "%s/%s/%04d-%02d-%02d", g_doris_server_info.store_path_root, type, localtm->tm_year+1900, localtm->tm_mon+1, localtm->tm_mday); + snprintf(dir, 256, "%s/%s/%04d-%02d-%02d", save->business->store_path_root, type, localtm->tm_year+1900, localtm->tm_mon+1, localtm->tm_mday); if(access(dir, F_OK)) { doris_mkdir_according_path(dir); @@ -225,8 +200,15 @@ void doris_config_file_cfgfile_start(struct doris_instance *instance, const char snprintf(save->cfg_file_path, 256, "%s/%s.%010lu", dir, tablename, save->version); fprintf(save->fp_idx_file, "%s\t%u\t%s\n", tablename, cfgnum, save->cfg_file_path); - save->fp_cfg_file = fopen(save->cfg_file_path, "w+"); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "File %s start writing...", save->cfg_file_path); + if(NULL == (save->fp_cfg_file = fopen(save->cfg_file_path, "w+"))) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", save->business->bizname, save->cfg_file_path, strerror(errno)); + assert(0); + } + else + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, File %s start writing...", save->business->bizname, save->cfg_file_path); + } } @@ -236,7 +218,11 @@ void doris_config_file_cfgfile_update(struct doris_instance *instance, const cha size_t writen_len; writen_len = fwrite(data, 1, len, save->fp_cfg_file); - assert(writen_len==len); + if(writen_len != len) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", save->business->bizname, save->cfg_file_path, strerror(errno)); + assert(0); + } } void doris_config_file_cfgfile_finish(struct doris_instance *instance, void *userdata) @@ -244,7 +230,7 @@ void doris_config_file_cfgfile_finish(struct doris_instance *instance, void *use struct confile_save *save=(struct confile_save *)userdata; fclose(save->fp_cfg_file); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "File %s write finished", save->cfg_file_path); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, File %s write finished", save->business->bizname, save->cfg_file_path); } /*mem系列函数,加载入内存*/ @@ -277,24 +263,26 @@ void doris_config_mem_version_finish(struct doris_instance *instance, void *user cJSON_Delete(save->cur_vernode->metajson); save->cur_vernode->metajson = NULL; - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Version %lu mem finished, info: %s", save->version, save->cur_vernode->metacont); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %lu mem finished, info: %s", save->business->bizname, save->version, save->cur_vernode->metacont); - if(save->cur_vernode->cfg_type==CFG_UPDATE_TYPE_FULL && g_doris_server_info.cfgver_head->latest_version!=0) + if(save->cur_vernode->cfg_type==CFG_UPDATE_TYPE_FULL && save->business->cfgver_head->latest_version!=0) { cur_version = config_version_handle_new(); cur_version->latest_version = save->cur_vernode->version; TAILQ_INSERT_TAIL(&cur_version->version_head, save->cur_vernode, version_node); - pthread_rwlock_wrlock(&g_doris_server_info.rwlock); - tmplist = g_doris_server_info.cfgver_head; - g_doris_server_info.cfgver_head = cur_version; - pthread_rwlock_unlock(&g_doris_server_info.rwlock); + pthread_rwlock_wrlock(&save->business->rwlock); + tmplist = save->business->cfgver_head; + save->business->cfgver_head = cur_version; + pthread_rwlock_unlock(&save->business->rwlock); cfgver_handle_delay_destroy(save, save->evbase, tmplist); } else { - TAILQ_INSERT_TAIL(&g_doris_server_info.cfgver_head->version_head, save->cur_vernode, version_node); - g_doris_server_info.cfgver_head->latest_version = save->cur_vernode->version; + pthread_rwlock_wrlock(&save->business->rwlock); + TAILQ_INSERT_TAIL(&save->business->cfgver_head->version_head, save->cur_vernode, version_node); + save->business->cfgver_head->latest_version = save->cur_vernode->version; + pthread_rwlock_unlock(&save->business->rwlock); } save->cur_vernode = NULL; } @@ -303,9 +291,9 @@ void doris_config_mem_version_error(struct doris_instance *instance, void *userd { struct confile_save *save=(struct confile_save *)userdata; - config_frag_node_cleanup(save, save->cur_frag); - config_table_node_cleanup(save, save->cur_table); - config_version_node_cleanup(save, save->cur_vernode); + config_frag_node_cleanup(save->cur_frag); + config_table_node_cleanup(save->cur_table); + config_version_node_cleanup(save->cur_vernode); save->cur_frag = NULL; save->cur_table = NULL; save->cur_vernode = NULL; @@ -325,7 +313,7 @@ void doris_config_mem_cfgfile_start(struct doris_instance *instance, const char save->cur_table->filesize = size; TAILQ_INIT(&save->cur_table->frag_head); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "table %s.%010llu start loading to memory...", tablename, save->version); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu start loading to memory...", save->business->bizname, tablename, save->version); } void doris_config_mem_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata) @@ -333,7 +321,7 @@ void doris_config_mem_cfgfile_update(struct doris_instance *instance, const char struct confile_save *save=(struct confile_save *)userdata; size_t cache_len, offset=0; - save->statistic.statistic.status[DRS_FSSTAT_MEMORY_USED] += len; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_status[DRS_FSSTAT_MEMORY_USED], 0, FS_OP_ADD, len); while(len > 0) { if(save->cur_frag == NULL) @@ -389,7 +377,7 @@ void doris_config_mem_cfgfile_finish(struct doris_instance *instance, const char } assert(save->cur_table->cur_totallen == save->cur_table->filesize); TAILQ_INSERT_TAIL(&save->cur_vernode->table_head, save->cur_table, table_node); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "table %s.%010llu load to memory finished", save->cur_table->tablename, save->version); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu load to memory finished", save->business->bizname, save->cur_table->tablename, save->version); save->cur_table = NULL; } @@ -404,47 +392,49 @@ void doris_config_common_version_start(struct confile_save *save, cJSON *meta) save->type = sub->valueint; assert(save->type==CFG_UPDATE_TYPE_FULL || save->type==CFG_UPDATE_TYPE_INC); save->version_cfgnum = 0; - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "Version %lu start updating...", save->version); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu start updating...", save->business->bizname, save->version); } void doris_config_common_version_finish(struct confile_save *save) { if(save->type == CFG_UPDATE_TYPE_FULL) { - save->statistic.statistic.status[DRS_FSSTAT_CUR_FULL_VERSION] = save->version; - save->statistic.statistic.status[DRS_FSSTAT_CONFIG_TOTAL_NUM] = save->version_cfgnum; - save->statistic.statistic.field[DRS_FSSTAT_RECV_FULL_VER] += 1; + save->business->total_cfgnum = save->version_cfgnum; + FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CUR_FULL_VERSION], FS_OP_SET, save->version); + FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CONFIG_TOTAL_NUM], FS_OP_SET, save->version_cfgnum); + FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FULL_VER], FS_OP_ADD, 1); } else { - save->statistic.statistic.status[DRS_FSSTAT_CUR_INC_VERSION] = save->version; - save->statistic.statistic.status[DRS_FSSTAT_CONFIG_TOTAL_NUM] += save->version_cfgnum; - save->statistic.statistic.field[DRS_FSSTAT_RECV_INC_VER] += 1; + save->business->total_cfgnum += save->version_cfgnum; + FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CUR_INC_VERSION], FS_OP_SET, save->version); + FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CONFIG_TOTAL_NUM], FS_OP_ADD, save->version_cfgnum); + FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_INC_VER], FS_OP_ADD, 1); } - MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mm_latest_ver, MONITOR_VALUE_SET, save->version); - MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mm_total_cfgnum, MONITOR_VALUE_SET, - save->statistic.statistic.status[DRS_FSSTAT_CONFIG_TOTAL_NUM]); - MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_CLEAR, MONITOR_STATUS_VERSION_ERR, NULL, NULL); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "Version %lu update finished", save->version); + MESA_Monitor_operation(g_doris_server_info.monitor, save->business->mm_latest_ver, MONITOR_VALUE_SET, save->version); + MESA_Monitor_operation(g_doris_server_info.monitor, save->business->mm_total_cfgnum, MONITOR_VALUE_SET, save->business->total_cfgnum); + MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_CLEAR, save->business->mm_status_codeid, NULL, NULL); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu update finished", save->business->bizname, save->version); } void doris_config_common_version_error(struct confile_save *save) { - save->statistic.statistic.field[DRS_FSSTAT_RECV_ERR_VER] += 1; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_ERR_VER], 0, FS_OP_ADD, 1); //Grafana+Promethues,展示内部异常状态 - MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_SET, MONITOR_STATUS_VERSION_ERR, + MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_SET, save->business->mm_status_codeid, "Version receive error", "Receive config file error, please check producer"); } void doris_config_common_cfgfile_start(struct confile_save *save, u_int32_t cfgnum) { save->version_cfgnum += cfgnum; - save->statistic.statistic.field[DRS_FSSTAT_RECV_START_FILES] += 1; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_START_FILES], 0, FS_OP_ADD, 1); } void doris_config_common_cfgfile_finish(struct confile_save *save) { - save->statistic.statistic.field[DRS_FSSTAT_RECV_CMPLT_FILES] += 1; + FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_CMPLT_FILES], 0, FS_OP_ADD, 1); + FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FILES], FS_OP_ADD, 1); } /*localmem系列函数,启动时从本地缓存读的回调*/ @@ -504,8 +494,10 @@ void doris_config_localmem_cfgfile_finish(struct doris_instance *instance, const /*无标记系列函数,新来配置时回调*/ void doris_config_version_start(struct doris_instance *instance, cJSON *meta, void *userdata) { + struct confile_save *save=(struct confile_save *)userdata; + doris_config_common_version_start((struct confile_save *)userdata, meta); - if(g_doris_server_info.write_file_sw) + if(save->business->write_file_sw) { doris_config_file_version_start(instance, meta, userdata); } @@ -517,7 +509,9 @@ void doris_config_version_start(struct doris_instance *instance, cJSON *meta, vo void doris_config_version_finish(struct doris_instance *instance, void *userdata) { - if(g_doris_server_info.write_file_sw) + struct confile_save *save=(struct confile_save *)userdata; + + if(save->business->write_file_sw) { doris_config_file_version_finish(instance, userdata); } @@ -530,8 +524,10 @@ void doris_config_version_finish(struct doris_instance *instance, void *userdata void doris_config_version_error(struct doris_instance *instance, void *userdata) { + struct confile_save *save=(struct confile_save *)userdata; + doris_config_common_version_error((struct confile_save *)userdata); - if(g_doris_server_info.write_file_sw) + if(save->business->write_file_sw) { doris_config_file_version_error(instance, userdata); } @@ -543,8 +539,10 @@ void doris_config_version_error(struct doris_instance *instance, void *userdata) void doris_config_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata) { + struct confile_save *save=(struct confile_save *)userdata; + doris_config_common_cfgfile_start((struct confile_save *)userdata, cfgnum); - if(g_doris_server_info.write_file_sw) + if(save->business->write_file_sw) { doris_config_file_cfgfile_start(instance, tablename, size, cfgnum, userdata); } @@ -556,7 +554,9 @@ void doris_config_cfgfile_start(struct doris_instance *instance, const char *tab void doris_config_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata) { - if(g_doris_server_info.write_file_sw) + struct confile_save *save=(struct confile_save *)userdata; + + if(save->business->write_file_sw) { doris_config_file_cfgfile_update(instance, data, len, userdata); } @@ -568,8 +568,10 @@ void doris_config_cfgfile_update(struct doris_instance *instance, const char *da void doris_config_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata) { + struct confile_save *save=(struct confile_save *)userdata; + doris_config_common_cfgfile_finish((struct confile_save *)userdata); - if(g_doris_server_info.write_file_sw) + if(save->business->write_file_sw) { doris_config_file_cfgfile_finish(instance, userdata); } @@ -581,16 +583,15 @@ void doris_config_cfgfile_finish(struct doris_instance *instance, const char *md void* thread_doris_client_recv_cfg(void *arg) { - struct event_base *manage_evbase=(struct event_base *)arg, *client_evbase; - struct doris_parameter *param; + struct doris_business *business=(struct doris_business *)arg; + struct event_base *client_evbase; struct doris_instance *instance; struct doris_callbacks doris_cbs; - struct doris_arguments doris_args={0, 0, 0}; + struct doris_arguments doris_args; struct doris_idxfile_scanner *scanner; enum DORIS_UPDATE_TYPE update_type; struct confile_save save; char stored_path[512]; - struct timeval tv; prctl(PR_SET_NAME, "client_recv"); @@ -599,6 +600,7 @@ void* thread_doris_client_recv_cfg(void *arg) memset(&save, 0, sizeof(struct confile_save)); save.source_from = RECV_WAY_IDX_FILE; save.evbase = client_evbase; + save.business = business; scanner = doris_index_file_scanner(0); @@ -611,10 +613,10 @@ void* thread_doris_client_recv_cfg(void *arg) doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish; doris_cbs.userdata = &save; - snprintf(stored_path, 512, "%s/full/index", g_doris_server_info.store_path_root); + snprintf(stored_path, 512, "%s/full/index", business->store_path_root); update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); assert(update_type!=CFG_UPDATE_TYPE_ERR); - snprintf(stored_path, 512, "%s/inc/index", g_doris_server_info.store_path_root); + snprintf(stored_path, 512, "%s/inc/index", business->store_path_root); do { update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); assert(update_type!=CFG_UPDATE_TYPE_ERR); @@ -630,24 +632,15 @@ void* thread_doris_client_recv_cfg(void *arg) doris_cbs.cfgfile_finish = doris_config_cfgfile_finish; save.source_from = RECV_WAY_DRS_CLIENT; + memset(&doris_args, 0, sizeof(struct doris_arguments)); doris_args.current_version = scanner->cur_version; - param = doris_parameter_new(NIRVANA_CONFIG_FILE, manage_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime); - if(param == NULL) - { - assert(0);return NULL; - } - - instance = doris_instance_new(param, client_evbase, g_doris_server_info.log_runtime); + sprintf(doris_args.bizname, "%s", business->bizname); + instance = doris_instance_new(business->param, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime); if(instance == NULL) { assert(0);return NULL; } - evtimer_assign(&save.statistic.timer_statistic, client_evbase, doris_worker_statistic_timer_cb, &save.statistic); - tv.tv_sec = g_doris_server_info.fsstat_period; - tv.tv_usec = 0; - evtimer_add(&save.statistic.timer_statistic, &tv); - event_base_dispatch(client_evbase); printf("Libevent dispath error, should not run here.\n"); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here."); @@ -661,7 +654,7 @@ static void doris_scanner_timer_cb(int fd, short kind, void *userp) struct timeval tv; do { - update_type = doris_index_file_traverse(timer_priv->scanner, g_doris_server_info.recv_path_inc, + update_type = doris_index_file_traverse(timer_priv->scanner, timer_priv->business->recv_path_inc, &timer_priv->doris_cbs, NULL, g_doris_server_info.log_runtime); }while(update_type != CFG_UPDATE_TYPE_NONE); @@ -672,6 +665,7 @@ static void doris_scanner_timer_cb(int fd, short kind, void *userp) void* thread_index_file_recv_cfg(void *arg) { + struct doris_business *business=(struct doris_business *)arg; struct event_base *client_evbase; struct confile_save save; struct timeval tv; @@ -688,8 +682,10 @@ void* thread_index_file_recv_cfg(void *arg) save.source_from = RECV_WAY_IDX_FILE; save.evbase = client_evbase; + save.business = business; timer_priv.scanner = doris_index_file_scanner(0); + timer_priv.business = business; /*Retaive latest config to memory from Stored configs*/ timer_priv.doris_cbs.version_start = doris_config_localmem_version_start; @@ -700,10 +696,10 @@ void* thread_index_file_recv_cfg(void *arg) timer_priv.doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish; timer_priv.doris_cbs.userdata = &save; - snprintf(stored_path, 512, "%s/full/index", g_doris_server_info.store_path_root); + snprintf(stored_path, 512, "%s/full/index", business->store_path_root); update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime); assert(update_type!=CFG_UPDATE_TYPE_ERR); - snprintf(stored_path, 512, "%s/inc/index", g_doris_server_info.store_path_root); + snprintf(stored_path, 512, "%s/inc/index", business->store_path_root); do{ update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime); assert(update_type!=CFG_UPDATE_TYPE_ERR); @@ -718,7 +714,7 @@ void* thread_index_file_recv_cfg(void *arg) timer_priv.doris_cbs.cfgfile_update = doris_config_cfgfile_update; timer_priv.doris_cbs.cfgfile_finish = doris_config_cfgfile_finish; - update_type = doris_index_file_traverse(timer_priv.scanner, g_doris_server_info.recv_path_full, + update_type = doris_index_file_traverse(timer_priv.scanner, business->recv_path_full, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime); assert(update_type!=CFG_UPDATE_TYPE_ERR); if(update_type!=CFG_UPDATE_TYPE_NONE && update_type!=CFG_UPDATE_TYPE_ERR) @@ -733,11 +729,6 @@ void* thread_index_file_recv_cfg(void *arg) evtimer_assign(&timer_priv.timer_scanner, client_evbase, doris_scanner_timer_cb, &timer_priv); evtimer_add(&timer_priv.timer_scanner, &tv); - evtimer_assign(&save.statistic.timer_statistic, client_evbase, doris_worker_statistic_timer_cb, &save.statistic); - tv.tv_sec = g_doris_server_info.fsstat_period; - tv.tv_usec = 0; - evtimer_add(&save.statistic.timer_statistic, &tv); - event_base_dispatch(client_evbase); printf("Libevent dispath error, should not run here.\n"); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here."); diff --git a/server/doris_server_receive.h b/server/doris_server_receive.h index 73ec493..457d708 100644 --- a/server/doris_server_receive.h +++ b/server/doris_server_receive.h @@ -7,34 +7,39 @@ #include -#define MONITOR_STATUS_VERSION_ERR 3 - enum DORIS_SERVER_FS_FILED { - DRS_FSSTAT_RECV_FULL_VER=0, - DRS_FSSTAT_RECV_INC_VER, - DRS_FSSTAT_RECV_ERR_VER, + DRS_FSSTAT_RECV_ERR_VER=0, DRS_FSSTAT_RECV_START_FILES, DRS_FSSTAT_RECV_CMPLT_FILES, - + DRS_FSSTAT_CLIENT_INVALID_REQ, DRS_FSSTAT_CLIENT_META_REQ, - DRS_FSSTAT_SEND_META_RES, DRS_FSSTAT_SEND_META_NONEW, DRS_FSSTAT_CLIENT_FILE_REQ, - DRS_FSSTAT_SEND_FILE_RES, DRS_FSSTAT_SEND_FILE_BYTES, DRS_FSSTAT_SEND_FILE_RES_404, DRS_FSSTAT_FIELD_NUM, }; +enum DORIS_SERVER_FS_COLUMN +{ + DRS_FSCLM_RECV_FULL_VER=0, + DRS_FSCLM_RECV_INC_VER, + DRS_FSCLM_RECV_FILES, + DRS_FSCLM_SEND_META_RES, + DRS_FSCLM_SEND_FILE_RES, + DRS_FSCLM_CUR_FULL_VERSION, + DRS_FSCLM_CUR_INC_VERSION, + DRS_FSCLM_CONFIG_TOTAL_NUM, + + DRS_FSSTAT_CLUMN_NUM, +}; + enum DORIS_SERVER_FS_STATUS { DRS_FSSTAT_MEMORY_USED=0, - DRS_FSSTAT_CUR_FULL_VERSION, - DRS_FSSTAT_CUR_INC_VERSION, - DRS_FSSTAT_CONFIG_TOTAL_NUM, DRS_FSSTAT_STATUS_NUM, }; @@ -82,21 +87,11 @@ struct version_list_handle struct version_list_handle *config_version_handle_new(void); -struct doris_srv_statistics -{ - long long field[DRS_FSSTAT_FIELD_NUM]; - long long status[DRS_FSSTAT_STATUS_NUM]; -}; - -struct worker_statistic_info -{ - struct event timer_statistic; - struct doris_srv_statistics statistic, statistic_last; -}; - +struct doris_business; struct confile_save { struct event_base *evbase; + struct doris_business *business; int64_t version; int32_t source_from; int32_t type; @@ -108,8 +103,6 @@ struct confile_save FILE *fp_cfg_file; FILE *fp_idx_file; - struct worker_statistic_info statistic; - struct version_list_node *cur_vernode; struct table_list_node *cur_table; struct cont_frag_node *cur_frag; @@ -118,12 +111,9 @@ struct confile_save struct common_timer_event { struct event timer_event; - struct confile_save *save; void *data; }; -void doris_worker_statistic_timer_cb(int fd, short kind, void *userp); - void* thread_doris_client_recv_cfg(void *arg); void* thread_index_file_recv_cfg(void *arg);