From 6386e5de579c42f2730e0c92b8e4cab5b6c57559 Mon Sep 17 00:00:00 2001 From: "linuxrc@163.com" Date: Mon, 30 Aug 2021 14:22:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B8=A6=E5=85=A8=E9=87=8F=E7=9A=84=E4=B8=9A?= =?UTF-8?q?=E5=8A=A1=EF=BC=8C=E6=94=AF=E6=8C=81=E6=8C=81=E4=B9=85=E5=8C=96?= =?UTF-8?q?=E7=9A=84=E6=97=A7=E9=85=8D=E7=BD=AE=E6=B7=98=E6=B1=B0=E5=88=A0?= =?UTF-8?q?=E9=99=A4=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/bin/conf/doris_main.conf | 1 + server/doris_server_main.cpp | 10 ++- server/doris_server_main.h | 2 + server/doris_server_receive.cpp | 52 ++++++++++---- server/doris_server_scandir.cpp | 124 +++++++++++++++++++++++++++++++- server/doris_server_scandir.h | 3 + 6 files changed, 176 insertions(+), 16 deletions(-) diff --git a/server/bin/conf/doris_main.conf b/server/bin/conf/doris_main.conf index fcf8a6c..7816131 100644 --- a/server/bin/conf/doris_main.conf +++ b/server/bin/conf/doris_main.conf @@ -24,6 +24,7 @@ fsstat_log_dst_port=8125 #1-Doris client; 2-local file; 3-HTTP post server receive_config_way=1 grafana_monitor_status_id=3 +#max_store_full_versions=0 #producer_listen_port=9800 #producer_concurrence_allowed=0 store_config_path=./doris_store_t1 diff --git a/server/doris_server_main.cpp b/server/doris_server_main.cpp index 97cefe6..480e904 100644 --- a/server/doris_server_main.cpp +++ b/server/doris_server_main.cpp @@ -24,7 +24,7 @@ #include "doris_server_http.h" struct doris_global_info g_doris_server_info; -static unsigned long doris_version_20210826=20210826L; +static unsigned long doris_version_20210830=20210830L; int doris_mkdir_according_path(const char * path) { @@ -299,6 +299,12 @@ static int32_t doris_init_config_for_business(struct doris_global_info *info, st } info->name2business->insert(make_pair(string(business->bizname), business)); + MESA_load_profile_uint_def(config_file, business->bizname, "max_store_full_versions", &business->saves_when_fulldel, 0); + if(business->saves_when_fulldel > 16) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "\033[1;31;40mAlert! %s [%s]max_store_full_versions support max 16!!!!\033[0m\n", config_file, business->bizname); + business->saves_when_fulldel = 16; + } MESA_load_profile_uint_def(config_file, business->bizname, "grafana_monitor_status_id", &business->mmval_status_codeid, 3); MESA_load_profile_uint_def(config_file, business->bizname, "mem_cache_max_versions", &business->cache_max_versions, 0); if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "store_config_path", business->store_path_root, sizeof(business->store_path_root))) @@ -434,7 +440,7 @@ int main(int argc, char **argv) evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL); evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL); evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL); - g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_version_20210826); + g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_version_20210830); if(evhttp_accept_socket(manager_http, g_doris_server_info.manager)) { printf("evhttp_accept_socket %d error!\n", g_doris_server_info.manager); diff --git a/server/doris_server_main.h b/server/doris_server_main.h index ab5a4c6..b97f760 100644 --- a/server/doris_server_main.h +++ b/server/doris_server_main.h @@ -50,9 +50,11 @@ struct doris_business u_int32_t recv_way; u_int32_t cache_max_versions; u_int32_t concurrency_allowed; + u_int32_t saves_when_fulldel; //有全量到来时,最多保存几个最新的全量版本,0-全保存 char recv_path_full[256]; char recv_path_inc[256]; char store_path_root[256]; + int64_t full_version_inc[16]; struct version_list_handle *cfgver_head; struct doris_csum_param *param_csum; struct doris_prod_param *param_prod; diff --git a/server/doris_server_receive.cpp b/server/doris_server_receive.cpp index 6d75fe4..6d30623 100644 --- a/server/doris_server_receive.cpp +++ b/server/doris_server_receive.cpp @@ -154,14 +154,14 @@ void doris_config_file_version_start(struct doris_csum_instance *instance, cJSON if(business->type == CFG_UPDATE_TYPE_FULL) { - snprintf(business->inc_index_path, 512, "%s/inc/index/full_config_index.%010lu", business->store_path_root, business->version); - snprintf(business->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", business->store_path_root, business->version); - snprintf(business->full_index_path, 512, "%s/full/index/full_config_index.%010lu", business->store_path_root, business->version); + snprintf(business->inc_index_path, 256, "%s/inc/index/full_config_index.%010lu", business->store_path_root, business->version); + snprintf(business->tmp_index_path, 256, "%s/inc/full_config_index.%010lu.ing", business->store_path_root, business->version); + snprintf(business->full_index_path, 256, "%s/full/index/full_config_index.%010lu", business->store_path_root, business->version); } else { - snprintf(business->inc_index_path, 512, "%s/inc/index/inc_config_index.%010lu", business->store_path_root, business->version); - snprintf(business->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", business->store_path_root, business->version); + snprintf(business->inc_index_path, 256, "%s/inc/index/inc_config_index.%010lu", business->store_path_root, business->version); + snprintf(business->tmp_index_path, 256, "%s/inc/full_config_index.%010lu.ing", business->store_path_root, business->version); } if(NULL==(business->fp_idx_file = fopen(business->tmp_index_path, "w+"))) { @@ -173,6 +173,7 @@ void doris_config_file_version_start(struct doris_csum_instance *instance, cJSON void doris_config_file_version_finish(struct doris_csum_instance *instance, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; + char tmp_index_dir[256]; fclose(business->fp_idx_file); if(rename(business->tmp_index_path, business->inc_index_path)) @@ -189,6 +190,19 @@ void doris_config_file_version_finish(struct doris_csum_instance *instance, void business->bizname, business->tmp_index_path, business->inc_index_path, strerror(errno)); assert(0); } + if(business->saves_when_fulldel > 0) + { + for(u_int32_t i=1; isaves_when_fulldel; i++) + { + business->full_version_inc[i-1] = business->full_version_inc[i]; //递增排序,将新的版本放入最大值 + } + business->full_version_inc[business->saves_when_fulldel-1] = business->version; + + snprintf(tmp_index_dir, 256, "%s/full/index", business->store_path_root); + remove_configs_version_smaller(tmp_index_dir, business->full_version_inc[0], 0, g_doris_server_info.log_runtime); + snprintf(tmp_index_dir, 256, "%s/inc/index", business->store_path_root); + remove_configs_version_smaller(tmp_index_dir, business->full_version_inc[0], 1, g_doris_server_info.log_runtime); + } } MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu write finished, index file: %s", business->bizname, business->version, business->inc_index_path); @@ -655,6 +669,10 @@ void* thread_doris_client_recv_cfg(void *arg) doris_cbs.userdata = business; snprintf(stored_path, 512, "%s/full/index", business->store_path_root); + if(business->saves_when_fulldel > 0) + { + get_full_topN_max_versions(stored_path, business->full_version_inc, business->saves_when_fulldel); + } update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); snprintf(stored_path, 512, "%s/inc/index", business->store_path_root); do { @@ -735,6 +753,10 @@ void* thread_index_file_recv_cfg(void *arg) snprintf(stored_path, 512, "%s/full/index", business->store_path_root); update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime); + if(business->saves_when_fulldel > 0) + { + get_full_topN_max_versions(stored_path, business->full_version_inc, business->saves_when_fulldel); + } snprintf(stored_path, 512, "%s/inc/index", business->store_path_root); do{ update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime); @@ -1002,7 +1024,7 @@ void http_config_direct_version_cancel(struct version_list_node *vernode, struct remove(tablenode->localpath); } config_version_node_cleanup(vernode); - if(evtimer_pending(&vernode->timer_expire, NULL)) + if(business->concurrency_allowed && evtimer_pending(&vernode->timer_expire, NULL)) { evtimer_del(&vernode->timer_expire); } @@ -1096,12 +1118,12 @@ void doris_config_post_version_finish(struct doris_business *business, struct ve if(vernode->cfg_type == CFG_UPDATE_TYPE_FULL) { - snprintf(business->inc_index_path, 512, "%s/inc/index/full_config_index.%010lu", business->store_path_root, vernode->version); - snprintf(business->full_index_path, 512, "%s/full/index/full_config_index.%010lu", business->store_path_root, vernode->version); + snprintf(business->inc_index_path, 256, "%s/inc/index/full_config_index.%010lu", business->store_path_root, vernode->version); + snprintf(business->full_index_path, 256, "%s/full/index/full_config_index.%010lu", business->store_path_root, vernode->version); } else { - snprintf(business->inc_index_path, 512, "%s/inc/index/inc_config_index.%010lu", business->store_path_root, vernode->version); + snprintf(business->inc_index_path, 256, "%s/inc/index/inc_config_index.%010lu", business->store_path_root, vernode->version); } /*HTTP post时,多版本并发每个都有自己的临时通知文件名,复用本地文件的关闭函数*/ sprintf(business->tmp_index_path, "%s", vernode->tmp_index_path); @@ -1139,7 +1161,7 @@ void http_config_direct_version_finish(struct version_list_node *vernode, struct char version[32], token[64]; int64_t new_version; - if(evtimer_pending(&vernode->timer_expire, NULL)) + if(business->concurrency_allowed && evtimer_pending(&vernode->timer_expire, NULL)) { evtimer_del(&vernode->timer_expire); } @@ -1220,7 +1242,7 @@ void http_prod_server_verion_end_cb(struct evhttp_request *req, void *arg) { sprintf(version, "%lu", vernode->version); evhttp_add_header(evhttp_request_get_output_headers(req), "X-Set-Version", version); - evhttp_send_error(req, HTTP_OK, "version already finished"); //保证最终一致性 + evhttp_send_reply(req, HTTP_OK, "version already finished", NULL); //保证最终一致性 return; } if(vernode->cur_table != NULL || vernode->syncing) @@ -1312,11 +1334,11 @@ void doris_config_post_version_start(struct version_list_node *cur_vernode, cons snprintf(cur_vernode->token, 64, "%s", token); if(cur_vernode->cfg_type == CFG_UPDATE_TYPE_FULL) { - snprintf(cur_vernode->tmp_index_path, 512, "%s/inc/full_config_index.%s.ing", business->store_path_root, token); + snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token); } else { - snprintf(cur_vernode->tmp_index_path, 512, "%s/inc/full_config_index.%s.ing", business->store_path_root, token); + snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token); } if(NULL==(cur_vernode->fp_idx_file = fopen(cur_vernode->tmp_index_path, "w+"))) { @@ -2105,6 +2127,10 @@ void* thread_http_post_recv_cfg(void *arg) doris_cbs.userdata = business; snprintf(stored_path, 512, "%s/full/index", business->store_path_root); + if(business->saves_when_fulldel > 0) + { + get_full_topN_max_versions(stored_path, business->full_version_inc, business->saves_when_fulldel); + } update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); snprintf(stored_path, 512, "%s/inc/index", business->store_path_root); do { diff --git a/server/doris_server_scandir.cpp b/server/doris_server_scandir.cpp index a9e4404..73d8f2e 100644 --- a/server/doris_server_scandir.cpp +++ b/server/doris_server_scandir.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -95,6 +96,127 @@ int filter_fn(const struct dirent * ent) strncmp(ent->d_name,"inc_config_index",strlen("inc_config_index")) == 0); } +void remove_configs_of_index_file(const char *filename) +{ + FILE* fp=NULL; + int ret=0; + char line[MAX_CONFIG_LINE]; + struct cfg_table_info idx; + + fp=fopen(filename, "r"); + while(!feof(fp)) + { + memset(line, 0, sizeof(line)); + fgets(line, sizeof(line), fp); + ret=sscanf(line,"%[^ \t]%*[ \t]%d%*[ \t]%s", idx.table_name, &idx.cfg_num, idx.cfg_path); + if(ret != 3) + { + continue; + } + if(!access(idx.cfg_path, F_OK)) + { + remove(idx.cfg_path); + } + } + fclose(fp); +} + +void remove_configs_version_smaller(const char *file_dir, int64_t version_higher, int recursively, void *logger) +{ + struct dirent **namelist; + int64_t config_seq; + int32_t i, n=0; + char update_str[32], index_path[256]; + + if(version_higher <= 1) + { + return; + } + + n = my_scandir(file_dir, &namelist, filter_fn, (int (*)(const void*, const void*))alphasort); + if(n <= 0) + { + return ; + } + for(i=0; id_name, ".") == 0) || (strcmp(namelist[i]->d_name, "..") == 0) || (namelist[i]->d_type==DT_DIR)) + { + continue; + } + if(strlen(namelist[i]->d_name) > 42) + { + continue; + } + if(sscanf(namelist[i]->d_name,"%[a-zA-Z]_config_index.%ld", update_str, &config_seq) != 2) + { + continue; + } + if(version_higher <= config_seq) + { + continue; + } + snprintf(index_path, 256, "%s/%s", file_dir, namelist[i]->d_name); + if(recursively) + { + remove_configs_of_index_file(index_path); + } + remove(index_path); + MESA_RUNTIME_LOGV4(logger, RLOG_LV_INFO, "config file %s removed initiatively.", index_path); + } +} + +/*输出从小到大排序,返回值:填充了多少个*/ +u_int32_t get_full_topN_max_versions(const char *file_dir, int64_t *ver_array, int32_t maxsize) +{ + struct dirent **namelist; + int64_t config_seq, tmpval; + int32_t curnum=0, i, j, n=0; + char update_str[32]; + + n = my_scandir(file_dir, &namelist, filter_fn, (int (*)(const void*, const void*))alphasort); + if(maxsize<=0 || n <= 0) + { + return 0; + } + for(i=0; id_name, ".") == 0) || (strcmp(namelist[i]->d_name, "..") == 0) || (namelist[i]->d_type==DT_DIR)) + { + continue; + } + if(strlen(namelist[i]->d_name) > 42) + { + continue; + } + if(sscanf(namelist[i]->d_name,"%[a-zA-Z]_config_index.%ld", update_str, &config_seq) != 2) + { + continue; + } + if(strncasecmp(update_str, "full", strlen(update_str))) + { + continue; + } + + if(ver_array[0] < config_seq) + { + ver_array[0] = config_seq; + for(j=1; j ver_array[j]); j++) + { + tmpval = ver_array[j-1]; + ver_array[j-1] = ver_array[j]; + ver_array[j] = tmpval; + } + if(curnum < maxsize) curnum++; + } + } + return curnum; +} + enum DORIS_UPDATE_TYPE get_new_idx_path(long long current_version, const char *file_dir, void *logger, struct index_path_array **idx_path, int *idx_num) { struct dirent **namelist; @@ -116,7 +238,7 @@ enum DORIS_UPDATE_TYPE get_new_idx_path(long long current_version, const char *f inc_idx_num=0; for(i=0;id_name, ".") == 0) || (strcmp(namelist[i]->d_name, "..") == 0)) + if((strcmp(namelist[i]->d_name, ".") == 0) || (strcmp(namelist[i]->d_name, "..") == 0) || (namelist[i]->d_type==DT_DIR)) { continue; } diff --git a/server/doris_server_scandir.h b/server/doris_server_scandir.h index c9e68eb..4526eff 100644 --- a/server/doris_server_scandir.h +++ b/server/doris_server_scandir.h @@ -51,5 +51,8 @@ struct doris_idxfile_scanner *doris_index_file_scanner(int64_t start_version); enum DORIS_UPDATE_TYPE doris_index_file_traverse(struct doris_idxfile_scanner *scanner, const char*idx_dir, struct doris_callbacks *doris_cbs, const char* dec_key, void* logger); +u_int32_t get_full_topN_max_versions(const char *file_dir, int64_t *ver_array, int32_t maxsize); +void remove_configs_version_smaller(const char *file_dir, int64_t version_higher, int recursively, void *logger); + #endif