From b32381f7f54a7e6ee51b997886bdfa6dc7f08a07 Mon Sep 17 00:00:00 2001 From: "linuxrc@163.com" Date: Tue, 27 Jul 2021 16:25:13 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=96=87=E4=BB=B6=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E7=94=A8=E6=88=B7=E8=87=AA=E5=AE=9A=E4=B9=89=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/doris_client_fetch.cpp | 44 +++++++++++++++++++------------- client/doris_client_fetch.h | 2 ++ client/doris_client_transfer.cpp | 4 ++- client/doris_client_transfer.h | 2 +- client/nirvana_conhash.cpp | 6 ++--- include/doris_client.h | 2 +- server/doris_server_main.cpp | 5 ++-- server/doris_server_main.h | 1 + server/doris_server_receive.cpp | 33 ++++++++++++++++-------- server/doris_server_scandir.cpp | 14 +++++++--- server/doris_server_scandir.h | 3 ++- 11 files changed, 76 insertions(+), 40 deletions(-) diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp index 85f6857..d9bea64 100644 --- a/client/doris_client_fetch.cpp +++ b/client/doris_client_fetch.cpp @@ -60,9 +60,12 @@ void easy_string_savedata(struct easy_string *estr, const char *data, size_t len void doris_confile_ctx_reset(struct doris_confile_ctx *ctx) { - struct doris_http_ctx *httpctx=ctx->httpctx; - memset(ctx, 0, sizeof(struct doris_confile_ctx)); - ctx->httpctx = httpctx; + //其他保持不变 + ctx->res_code = 0; + ctx->contlength = 0; + ctx->contl_start = 0; + ctx->contl_end = 0; + ctx->contl_total = 0; } void doris_confile_ctx_destry(struct doris_confile_ctx *ctx) @@ -104,6 +107,9 @@ void doris_fetch_next_confile_meta(struct doris_instance *instance) sub = cJSON_GetObjectItem(cur_a_item, "cfg_num"); instance->curmeta.cfg_num = sub->valueint; + sub = cJSON_GetObjectItem(cur_a_item, "user_region"); + instance->curmeta.user_region = (sub==NULL)?NULL:sub->valuestring; + if(NULL != (sub = cJSON_GetObjectItem(cur_a_item, "md5"))) { instance->curmeta.validate_md5 = 1; @@ -137,8 +143,8 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code instance->retry_times = 0; if(instance->curmeta.curoffset == 0) { - instance->cbs.cfgfile_start(instance, instance->curmeta.table_name, - instance->curmeta.size, instance->curmeta.cfg_num, instance->cbs.userdata); + instance->cbs.cfgfile_start(instance, instance->curmeta.table_name, instance->curmeta.size, + instance->curmeta.cfg_num, instance->curmeta.user_region, instance->cbs.userdata); MD5_Init(&instance->ctx.md5ctx); } } @@ -305,7 +311,7 @@ void doris_http_fetch_confile(struct doris_instance *instance) else { instance->statistic.field[DRS_FS_FILED_REQ_FILES] += 1; - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Launch confile %s GET, req_version=%lu, %s", + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Launch confile %s GET, req_version=%lu, %s", instance->curmeta.table_name, instance->req_version, range); } } @@ -324,8 +330,8 @@ void doris_http_meta_header_cb(const char *ptr, size_t bytes, CURLcode code, lon if(res_code != 200) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "No new meta found, cur_version=%lu, req_version=%lu, curlcode = %d", - instance->cur_version, instance->req_version, code); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "No new meta found, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d", + instance->ctx.server, instance->cur_version, instance->req_version, code); } } @@ -348,8 +354,8 @@ void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void if(res!=CURLE_OK) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Request meta failed, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s", - instance->cur_version, instance->req_version, res_code, err); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Request meta failed, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s", + instance->ctx.server, instance->cur_version, instance->req_version, res_code, err); goto out_error; } @@ -361,15 +367,16 @@ void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void instance->meta = cJSON_Parse(instance->estr.buff); if(instance->meta == NULL) { - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, req_version=%lu, invalid json: %s", instance->req_version, instance->estr.buff); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, server: %s, req_version=%lu, invalid json: %s", + instance->ctx.server, instance->req_version, instance->estr.buff); goto out_error; } sub = cJSON_GetObjectItem(instance->meta, "version"); instance->new_version = sub->valuedouble; instance->req_version = instance->new_version; instance->statistic.field[DRS_FS_FILED_RES_META] += 1; - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, cur_version=%lu, newjson: %s", - instance->cur_version, instance->estr.buff); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, server: %s, cur_version=%lu, newjson: %s", + instance->ctx.server, instance->cur_version, instance->estr.buff); instance->cbs.version_start(instance, instance->meta, instance->cbs.userdata); instance->array = cJSON_GetObjectItem(instance->meta, "configs"); @@ -395,7 +402,8 @@ static void doris_http_fetch_meta(struct doris_instance *instance) struct doris_http_callback curlcbs; char metauri[128]; - balance_seed = (((u_int64_t)rand()) << 32) | rand(); + balance_seed = (((u_int64_t)rand()&0xFFFF) << 48) | (((u_int64_t)rand()&0xFFFF) << 32) | + (((u_int64_t)rand()&0xFFFF) << 16) | ((u_int64_t)rand()&0xFFFF); memset(&curlcbs, 0, sizeof(struct doris_http_callback)); curlcbs.header_cb = doris_http_meta_header_cb; @@ -405,16 +413,16 @@ static void doris_http_fetch_meta(struct doris_instance *instance) instance->array_index = 0; instance->cur_httpins = instance->httpins_master; - instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed); + instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); if(instance->ctx.httpctx==NULL && instance->httpins_backup1!=NULL) { instance->cur_httpins = instance->httpins_backup1; - instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed); + instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); } if(instance->ctx.httpctx==NULL && instance->httpins_backup2!=NULL) { instance->cur_httpins = instance->httpins_backup2; - instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed); + instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); } instance->req_version = instance->cur_version + 1; //只有版本更新成功后,cur_version才会更新 @@ -616,7 +624,7 @@ struct doris_instance *doris_instance_new(struct doris_parameter *param, struct { return NULL; } - srand(time(NULL)); + srand((int64_t)param); if(param->param_backup1 != NULL) { diff --git a/client/doris_client_fetch.h b/client/doris_client_fetch.h index 66afbdf..358f5be 100644 --- a/client/doris_client_fetch.h +++ b/client/doris_client_fetch.h @@ -52,6 +52,7 @@ struct fetch_file_meta u_int32_t cfg_num; u_int32_t validate_md5; char md5str[36]; + const char *user_region; }; struct md5_long @@ -68,6 +69,7 @@ union doris_md5 struct doris_confile_ctx { struct doris_http_ctx *httpctx; + char server[64]; MD5_CTX md5ctx; long res_code; diff --git a/client/doris_client_transfer.cpp b/client/doris_client_transfer.cpp index 3850461..d26bba2 100644 --- a/client/doris_client_transfer.cpp +++ b/client/doris_client_transfer.cpp @@ -49,7 +49,8 @@ void doris_http_ctx_destroy(struct doris_http_ctx *ctx) free(ctx); } -struct doris_http_ctx *doris_http_ctx_new(struct doris_http_instance *instance, struct doris_http_callback *cb, u_int64_t balance_seed) +struct doris_http_ctx *doris_http_ctx_new(struct doris_http_instance *instance, + struct doris_http_callback *cb, u_int64_t balance_seed, char *host, int32_t size) { struct doris_http_ctx *ctx; struct doris_curl_multihd *multidata; @@ -61,6 +62,7 @@ struct doris_http_ctx *doris_http_ctx_new(struct doris_http_instance *instance, } assert(instance->server_hosts->find(result.bucket_id) != instance->server_hosts->end()); multidata = instance->server_hosts->find(result.bucket_id)->second; + snprintf(host, size, multidata->host->srvaddr); ctx = (struct doris_http_ctx *)calloc(1, sizeof(struct doris_http_ctx)); ctx->instance = instance; diff --git a/client/doris_client_transfer.h b/client/doris_client_transfer.h index 6d1a9f5..8cfa4d2 100644 --- a/client/doris_client_transfer.h +++ b/client/doris_client_transfer.h @@ -44,7 +44,7 @@ struct doris_curl_multihd }; struct doris_curl_multihd *doris_initialize_multihd_for_host(struct doris_http_instance *instance, struct dst_host_cnn_balance *host); -struct doris_http_ctx *doris_http_ctx_new(struct doris_http_instance *instance, struct doris_http_callback *cb, u_int64_t balance_seed); +struct doris_http_ctx *doris_http_ctx_new(struct doris_http_instance *instance, struct doris_http_callback *cb, u_int64_t balance_seed, char *host, int32_t size); void doris_http_ctx_destroy(struct doris_http_ctx *ctx); void doris_http_ctx_reset(struct doris_http_ctx *ctx, struct doris_http_callback *cb); diff --git a/client/nirvana_conhash.cpp b/client/nirvana_conhash.cpp index 99674fd..d687242 100644 --- a/client/nirvana_conhash.cpp +++ b/client/nirvana_conhash.cpp @@ -145,7 +145,7 @@ static u_int64_t bucket_gen_uniq_point(struct ch_bucket_inner *inner_bucket, u_i u_int32_t hash, i=0; seed = (((u_int64_t)cur_point_index)<<32) | inner_bucket->bucket.bucket_id; - hash = murmurhash2(&seed, sizeof(u_int64_t), 23068673); + hash = murmurhash2(&seed, sizeof(u_int64_t), 515880193); x = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id; while(i != cur_point_index) @@ -155,7 +155,7 @@ static u_int64_t bucket_gen_uniq_point(struct ch_bucket_inner *inner_bucket, u_i if(x == inner_bucket->point_array[i].point_val) //冲突 { seed = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id; - hash = murmurhash2(&seed, sizeof(u_int64_t), 23068673); + hash = murmurhash2(&seed, sizeof(u_int64_t), 515880193); x = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id; i = 0; break; @@ -434,7 +434,7 @@ enum CONHASH_ERRCODE conhash_lookup_bucket(struct consistent_hash *ch, const voi return CONHASH_NO_VALID_BUCKETS; } - hash = MurmurHash64A(key, len, 23068673); + hash = MurmurHash64A(key, len, 515880193); idx = search_up_bound(hash, ch->point_array, ch->point_num, sizeof(struct ch_point), offsetof(struct ch_point, point_val)); ch->point_array[idx].hit_cnt++; bucket_index = ch->point_array[idx].bucket_index; diff --git a/include/doris_client.h b/include/doris_client.h index 99b2a3e..2d39e1f 100644 --- a/include/doris_client.h +++ b/include/doris_client.h @@ -55,7 +55,7 @@ struct doris_callbacks { void *userdata; void (*version_start)(struct doris_instance *instance, cJSON *meta, void *userdata); //meta在整个版本声明周期内都有效 - void (*cfgfile_start)(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata); + void (*cfgfile_start)(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, const char *userregion, void *userdata); void (*cfgfile_update)(struct doris_instance *instance, const char *data, size_t len, void *userdata); void (*cfgfile_finish)(struct doris_instance *instance, const char *md5, void *userdata); void (*version_error)(struct doris_instance *instance, void *userdata); //下载文件失败,该版本需要回滚 diff --git a/server/doris_server_main.cpp b/server/doris_server_main.cpp index 64fa952..0a969b0 100644 --- a/server/doris_server_main.cpp +++ b/server/doris_server_main.cpp @@ -17,7 +17,7 @@ #include "doris_server_http.h" struct doris_global_info g_doris_server_info; -static unsigned long doris_vesion_20210722=20210722L; +static unsigned long doris_vesion_20210726=20210726L; int doris_mkdir_according_path(const char * path) { @@ -101,6 +101,7 @@ int32_t doris_read_profile_configs(const char *config_file) } MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "cache_file_frag_size", &g_doris_server_info.cache_frag_size, 67108864); MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "doris_server_role_on", &g_doris_server_info.server_role_sw, 1); + MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "index_file_format_maat", &g_doris_server_info.idx_file_maat, 0); MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "scan_index_file_interval", &g_doris_server_info.scan_idx_interval, 10); MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "https_connection_on", &g_doris_server_info.ssl_conn_on, 0); @@ -326,7 +327,7 @@ int main(int argc, char **argv) evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL); evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL); evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL); - g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210722); + g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210726); if(evhttp_accept_socket(manager_http, g_doris_server_info.manager)) { printf("evhttp_accept_socket %d error!\n", g_doris_server_info.manager); diff --git a/server/doris_server_main.h b/server/doris_server_main.h index 5678097..da83dfa 100644 --- a/server/doris_server_main.h +++ b/server/doris_server_main.h @@ -81,6 +81,7 @@ struct doris_global_info struct doris_business business[MAX_BUSINESS_NUM]; u_int32_t business_num; + u_int32_t idx_file_maat; map *name2business; map *confile2param; diff --git a/server/doris_server_receive.cpp b/server/doris_server_receive.cpp index 594a6a1..63f49f6 100644 --- a/server/doris_server_receive.cpp +++ b/server/doris_server_receive.cpp @@ -181,7 +181,8 @@ void doris_config_file_version_error(struct doris_instance *instance, void *user MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", save->business->bizname, save->version); } -void doris_config_file_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata) +void doris_config_file_cfgfile_start(struct doris_instance *instance, const char *tablename, + size_t size, u_int32_t cfgnum, const char *userregion, void *userdata) { struct confile_save *save=(struct confile_save *)userdata; struct tm *localtm, savetime; @@ -198,8 +199,14 @@ void doris_config_file_cfgfile_start(struct doris_instance *instance, const char doris_mkdir_according_path(dir); } snprintf(save->cfg_file_path, 256, "%s/%s.%010lu", dir, tablename, save->version); - fprintf(save->fp_idx_file, "%s\t%u\t%s\n", tablename, cfgnum, save->cfg_file_path); - + if(g_doris_server_info.idx_file_maat) //MAAT格式的通知文件 + { + fprintf(save->fp_idx_file, "%s\t%u\t%s\n", tablename, cfgnum, save->cfg_file_path); + } + else //转发角色保留用户自定义信息 + { + fprintf(save->fp_idx_file, "%s\t%u\t%s\t%s\n", tablename, cfgnum, save->cfg_file_path, userregion); + } if(NULL == (save->fp_cfg_file = fopen(save->cfg_file_path, "w+"))) { MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", save->business->bizname, save->cfg_file_path, strerror(errno)); @@ -299,7 +306,8 @@ void doris_config_mem_version_error(struct doris_instance *instance, void *userd save->cur_vernode = NULL; } -void doris_config_mem_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata) +void doris_config_mem_cfgfile_start(struct doris_instance *instance, const char *tablename, + size_t size, u_int32_t cfgnum, const char *userregion, void *userdata) { struct confile_save *save=(struct confile_save *)userdata; @@ -307,7 +315,10 @@ void doris_config_mem_cfgfile_start(struct doris_instance *instance, const char cJSON_AddStringToObject(save->cur_vernode->table_meta, "tablename", tablename); cJSON_AddNumberToObject(save->cur_vernode->table_meta, "cfg_num", cfgnum); cJSON_AddNumberToObject(save->cur_vernode->table_meta, "size", size); - + if(userregion != NULL) + { + cJSON_AddStringToObject(save->cur_vernode->table_meta, "user_region", userregion); + } save->cur_table = (struct table_list_node *)calloc(1, sizeof(struct table_list_node)); snprintf(save->cur_table->tablename, 64, "%s", tablename); save->cur_table->filesize = size; @@ -465,12 +476,13 @@ void doris_config_localmem_version_error(struct doris_instance *instance, void * } } -void doris_config_localmem_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata) +void doris_config_localmem_cfgfile_start(struct doris_instance *instance, const char *tablename, + size_t size, u_int32_t cfgnum, const char *userregion, void *userdata) { doris_config_common_cfgfile_start((struct confile_save *)userdata, cfgnum); if(g_doris_server_info.server_role_sw) { - doris_config_mem_cfgfile_start(instance, tablename, size, cfgnum, userdata); + doris_config_mem_cfgfile_start(instance, tablename, size, cfgnum, userregion, userdata); } } @@ -537,18 +549,19 @@ void doris_config_version_error(struct doris_instance *instance, void *userdata) } } -void doris_config_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata) +void doris_config_cfgfile_start(struct doris_instance *instance, const char *tablename, + size_t size, u_int32_t cfgnum, const char *userregion, void *userdata) { struct confile_save *save=(struct confile_save *)userdata; doris_config_common_cfgfile_start((struct confile_save *)userdata, cfgnum); if(save->business->write_file_sw) { - doris_config_file_cfgfile_start(instance, tablename, size, cfgnum, userdata); + doris_config_file_cfgfile_start(instance, tablename, size, cfgnum, userregion, userdata); } if(g_doris_server_info.server_role_sw) { - doris_config_mem_cfgfile_start(instance, tablename, size, cfgnum, userdata); + doris_config_mem_cfgfile_start(instance, tablename, size, cfgnum, userregion, userdata); } } diff --git a/server/doris_server_scandir.cpp b/server/doris_server_scandir.cpp index 2ac8696..2863369 100644 --- a/server/doris_server_scandir.cpp +++ b/server/doris_server_scandir.cpp @@ -201,7 +201,7 @@ int cm_read_cfg_index_file(const char* path, struct cfg_table_info* idx/*OUT*/, { memset(line, 0, sizeof(line)); fgets(line, sizeof(line), fp); - ret=sscanf(line,"%s\t%d\t%s\t%s", idx[i].table_name, &(idx[i].cfg_num), idx[i].cfg_path, idx[i].encryp_algorithm); + ret=sscanf(line,"%[^ \t]%*[ \t]%d%*[ \t]%s%*[ \t]%s", idx[i].table_name, &(idx[i].cfg_num), idx[i].cfg_path, idx[i].user_region); if((ret!=3 && ret!=4) || idx[i].cfg_num==0)//jump over empty line { continue; @@ -231,7 +231,7 @@ bool doris_read_table_file(struct doris_idxfile_scanner *scanner, struct cfg_tab FILE *fp; size_t readlen, remainlen, oncesize; MD5_CTX md5ctx; - char md5buffer[64]; + char md5buffer[64], *user_region=NULL; if((fp = fopen(table->cfg_path, "r")) == NULL) { @@ -240,7 +240,11 @@ bool doris_read_table_file(struct doris_idxfile_scanner *scanner, struct cfg_tab } MD5_Init(&md5ctx); - doris_cbs->cfgfile_start(NULL, table->table_name, table->filesize, table->cfg_num, doris_cbs->userdata); + if(table->user_region[0] != '\0') + { + user_region = table->user_region; + } + doris_cbs->cfgfile_start(NULL, table->table_name, table->filesize, table->cfg_num, user_region, doris_cbs->userdata); remainlen = table->filesize; while(remainlen > 0) { @@ -273,6 +277,10 @@ cJSON *doris_index_version_start(int64_t version, struct cfg_table_info *table_a cJSON_AddStringToObject(tmp, "tablename", table_array[i].table_name); cJSON_AddNumberToObject(tmp, "size", table_array[i].filesize); cJSON_AddNumberToObject(tmp, "cfg_num", table_array[i].cfg_num); + if(table_array->user_region[0] != '\0') + { + cJSON_AddStringToObject(tmp, "user_region", table_array[i].user_region); + } cJSON_AddItemToArray(array, tmp); } cJSON_AddItemToObject(meta, "configs", array); diff --git a/server/doris_server_scandir.h b/server/doris_server_scandir.h index fb3fc2c..ab8d993 100644 --- a/server/doris_server_scandir.h +++ b/server/doris_server_scandir.h @@ -6,6 +6,7 @@ #define CM_MAX_TABLE_NUM 256 #define MAX_CONFIG_FN_LEN 256 #define MAX_CONFIG_LINE (1024*16) +#define MAX_USERREGION_LEN 4096 #define ONCE_BUF_SIZE 1048576 @@ -35,7 +36,7 @@ struct cfg_table_info char cfg_path[MAX_CONFIG_FN_LEN]; int cfg_num; size_t filesize; - char encryp_algorithm[MAX_CONFIG_FN_LEN]; + char user_region[MAX_USERREGION_LEN]; }; struct doris_idxfile_scanner