diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp index d9bea64..9e1d9ef 100644 --- a/client/doris_client_fetch.cpp +++ b/client/doris_client_fetch.cpp @@ -99,16 +99,18 @@ void doris_fetch_next_confile_meta(struct doris_instance *instance) instance->array_index++; sub = cJSON_GetObjectItem(cur_a_item, "tablename"); - instance->curmeta.table_name = sub->valuestring; + instance->curmeta.meta.tablename = sub->valuestring; + sub = cJSON_GetObjectItem(cur_a_item, "filename"); + instance->curmeta.meta.filename = sub->valuestring; sub = cJSON_GetObjectItem(cur_a_item, "size"); - instance->curmeta.size = sub->valuedouble; + instance->curmeta.meta.size = sub->valuedouble; sub = cJSON_GetObjectItem(cur_a_item, "cfg_num"); - instance->curmeta.cfg_num = sub->valueint; + instance->curmeta.meta.cfgnum = sub->valueint; sub = cJSON_GetObjectItem(cur_a_item, "user_region"); - instance->curmeta.user_region = (sub==NULL)?NULL:sub->valuestring; + instance->curmeta.meta.userregion = (sub==NULL)?NULL:sub->valuestring; if(NULL != (sub = cJSON_GetObjectItem(cur_a_item, "md5"))) { @@ -137,14 +139,13 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code if(res_code != 200 && res_code!=206) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s failed, req_version=%lu, curlcode = %d", - instance->curmeta.table_name, instance->req_version, code); + instance->curmeta.meta.tablename, instance->req_version, code); return; } instance->retry_times = 0; if(instance->curmeta.curoffset == 0) { - instance->cbs.cfgfile_start(instance, instance->curmeta.table_name, instance->curmeta.size, - instance->curmeta.cfg_num, instance->curmeta.user_region, instance->cbs.userdata); + instance->cbs.cfgfile_start(instance, &instance->curmeta.meta, NULL, instance->cbs.userdata); MD5_Init(&instance->ctx.md5ctx); } } @@ -170,7 +171,7 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code memcpy(buffer, start+13, bytes-13); buffer[bytes-13] = '\0'; ret = sscanf(buffer, "%*[^0-9]%lu-%lu/%lu", &instance->ctx.contl_start, &instance->ctx.contl_end, &instance->ctx.contl_total); - assert(ret == 3 && instance->ctx.contl_total == instance->curmeta.size && instance->ctx.contl_start==instance->curmeta.curoffset); + assert(ret == 3 && instance->ctx.contl_total == instance->curmeta.meta.size && instance->ctx.contl_start==instance->curmeta.curoffset); } break; default: break; @@ -202,7 +203,7 @@ void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, vo if(res!=CURLE_OK) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s failed, req_version=%lu, curlcode = %d, error: %s", - instance->curmeta.table_name, instance->req_version, res_code, err); + instance->curmeta.meta.tablename, instance->req_version, res_code, err); goto out_error; } @@ -214,28 +215,28 @@ void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, vo if(instance->ctx.contl_total != 0) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s success, req_version=%lu, Content-Range: %lu-%lu/%lu", - instance->curmeta.table_name, instance->req_version, instance->ctx.contl_start, instance->ctx.contl_end, instance->ctx.contl_total); + instance->curmeta.meta.tablename, instance->req_version, instance->ctx.contl_start, instance->ctx.contl_end, instance->ctx.contl_total); } else { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s success, req_version=%lu, Content-Length: %lu/%lu", - instance->curmeta.table_name, instance->req_version, instance->ctx.contlength, instance->curmeta.size); + instance->curmeta.meta.tablename, instance->req_version, instance->ctx.contlength, instance->curmeta.meta.size); } instance->statistic.field[DRS_FS_FILED_RES_FRAGS] += 1; - if(instance->curmeta.curoffset >= instance->curmeta.size) //该文件下载完毕 + if(instance->curmeta.curoffset >= instance->curmeta.meta.size) //该文件下载完毕 { doris_md5_final_string(&instance->ctx.md5ctx, md5buffer, 64); if(instance->curmeta.validate_md5 && strcasecmp(instance->curmeta.md5str, md5buffer)) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s over, version=%lu, md5 validate fail, real: %s, expect: %s", - instance->curmeta.table_name, instance->req_version, md5buffer, instance->curmeta.md5str); + instance->curmeta.meta.tablename, instance->req_version, md5buffer, instance->curmeta.md5str); direct_fail=true;goto out_md5; } else { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s.%010lu over, md5: %s", - instance->curmeta.table_name, instance->req_version, md5buffer); + instance->curmeta.meta.tablename, instance->req_version, md5buffer); } instance->statistic.field[DRS_FS_FILED_RES_FILES] += 1; instance->cbs.cfgfile_finish(instance, md5buffer, instance->cbs.userdata); @@ -296,13 +297,13 @@ void doris_http_fetch_confile(struct doris_instance *instance) doris_http_ctx_reset(instance->ctx.httpctx, &curlcbs); //超大文件分段下载;上次未完成的继续下载 - if((instance->curmeta.size > instance->param->fetch_frag_size) || instance->curmeta.curoffset!=0) + if((instance->curmeta.meta.size > instance->param->fetch_frag_size) || instance->curmeta.curoffset!=0) { sprintf(range, "Range: bytes=%lu-%lu", instance->curmeta.curoffset, instance->curmeta.curoffset + instance->param->fetch_frag_size - 1); doris_http_ctx_add_header(instance->ctx.httpctx, range); } - snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&business=%s", instance->curmeta.table_name, instance->req_version, instance->args.bizname); + snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&business=%s", instance->curmeta.meta.tablename, instance->req_version, instance->args.bizname); if(doris_http_launch_get_request(instance->ctx.httpctx, metauri)) { instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; @@ -312,7 +313,7 @@ void doris_http_fetch_confile(struct doris_instance *instance) { instance->statistic.field[DRS_FS_FILED_REQ_FILES] += 1; MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Launch confile %s GET, req_version=%lu, %s", - instance->curmeta.table_name, instance->req_version, range); + instance->curmeta.meta.tablename, instance->req_version, range); } } @@ -351,6 +352,7 @@ void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void { struct doris_instance *instance = (struct doris_instance *)userp; cJSON *sub; + int64_t new_version; if(res!=CURLE_OK) { @@ -372,7 +374,16 @@ void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void goto out_error; } sub = cJSON_GetObjectItem(instance->meta, "version"); - instance->new_version = sub->valuedouble; + new_version = sub->valuedouble; + if(new_version <= instance->cur_version) + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "An older version received, abandon it. server: %s, cur_version=%lu, invalid json: %s", + instance->ctx.server, instance->cur_version, instance->estr.buff); + cJSON_Delete(instance->meta); + instance->meta = NULL; + goto out_error; + } + instance->new_version = new_version; instance->req_version = instance->new_version; instance->statistic.field[DRS_FS_FILED_RES_META] += 1; MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, server: %s, cur_version=%lu, newjson: %s", diff --git a/client/doris_client_fetch.h b/client/doris_client_fetch.h index 358f5be..58376ba 100644 --- a/client/doris_client_fetch.h +++ b/client/doris_client_fetch.h @@ -46,13 +46,10 @@ struct doris_parameter struct fetch_file_meta { - const char *table_name; - size_t size; + struct tablemeta meta; size_t curoffset; - u_int32_t cfg_num; u_int32_t validate_md5; char md5str[36]; - const char *user_region; }; struct md5_long diff --git a/include/doris_client.h b/include/doris_client.h index 2d39e1f..08f3884 100644 --- a/include/doris_client.h +++ b/include/doris_client.h @@ -50,12 +50,21 @@ struct doris_arguments int32_t judian_id; }; +struct tablemeta +{ + const char *tablename; //表名,行列式maat适配;非行列式为文件名 + const char *filename; //落地时要求的文件名 + const char *userregion; + size_t size; + u_int32_t cfgnum; +}; + struct doris_instance; struct doris_callbacks { void *userdata; void (*version_start)(struct doris_instance *instance, cJSON *meta, void *userdata); //meta在整个版本声明周期内都有效 - void (*cfgfile_start)(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, const char *userregion, void *userdata); + void (*cfgfile_start)(struct doris_instance *instance, const struct tablemeta *meta, const char *unused, void *userdata); void (*cfgfile_update)(struct doris_instance *instance, const char *data, size_t len, void *userdata); void (*cfgfile_finish)(struct doris_instance *instance, const char *md5, void *userdata); void (*version_error)(struct doris_instance *instance, void *userdata); //下载文件失败,该版本需要回滚 diff --git a/server/doris_server_main.cpp b/server/doris_server_main.cpp index 0a969b0..f68e4fc 100644 --- a/server/doris_server_main.cpp +++ b/server/doris_server_main.cpp @@ -17,7 +17,7 @@ #include "doris_server_http.h" struct doris_global_info g_doris_server_info; -static unsigned long doris_vesion_20210726=20210726L; +static unsigned long doris_vesion_20210803=20210803L; int doris_mkdir_according_path(const char * path) { @@ -327,7 +327,7 @@ int main(int argc, char **argv) evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL); evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL); evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL); - g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210726); + g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210803); if(evhttp_accept_socket(manager_http, g_doris_server_info.manager)) { printf("evhttp_accept_socket %d error!\n", g_doris_server_info.manager); diff --git a/server/doris_server_receive.cpp b/server/doris_server_receive.cpp index 63f49f6..c335d89 100644 --- a/server/doris_server_receive.cpp +++ b/server/doris_server_receive.cpp @@ -181,8 +181,8 @@ void doris_config_file_version_error(struct doris_instance *instance, void *user MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", save->business->bizname, save->version); } -void doris_config_file_cfgfile_start(struct doris_instance *instance, const char *tablename, - size_t size, u_int32_t cfgnum, const char *userregion, void *userdata) +void doris_config_file_cfgfile_start(struct doris_instance *instance, + const struct tablemeta *meta, const char *localpath, void *userdata) { struct confile_save *save=(struct confile_save *)userdata; struct tm *localtm, savetime; @@ -198,14 +198,14 @@ void doris_config_file_cfgfile_start(struct doris_instance *instance, const char { doris_mkdir_according_path(dir); } - snprintf(save->cfg_file_path, 256, "%s/%s.%010lu", dir, tablename, save->version); + snprintf(save->cfg_file_path, 256, "%s/%s", dir, meta->filename); if(g_doris_server_info.idx_file_maat) //MAAT格式的通知文件 { - fprintf(save->fp_idx_file, "%s\t%u\t%s\n", tablename, cfgnum, save->cfg_file_path); + fprintf(save->fp_idx_file, "%s\t%u\t%s\n", meta->tablename, meta->cfgnum, save->cfg_file_path); } else //转发角色保留用户自定义信息 { - fprintf(save->fp_idx_file, "%s\t%u\t%s\t%s\n", tablename, cfgnum, save->cfg_file_path, userregion); + fprintf(save->fp_idx_file, "%s\t%u\t%s\t%s\n", meta->tablename, meta->cfgnum, save->cfg_file_path, meta->userregion); } if(NULL == (save->fp_cfg_file = fopen(save->cfg_file_path, "w+"))) { @@ -306,25 +306,28 @@ void doris_config_mem_version_error(struct doris_instance *instance, void *userd save->cur_vernode = NULL; } -void doris_config_mem_cfgfile_start(struct doris_instance *instance, const char *tablename, - size_t size, u_int32_t cfgnum, const char *userregion, void *userdata) +void doris_config_mem_cfgfile_start(struct doris_instance *instance, + const struct tablemeta *meta, const char *localpath, void *userdata) { struct confile_save *save=(struct confile_save *)userdata; save->cur_vernode->table_meta = cJSON_CreateObject(); - cJSON_AddStringToObject(save->cur_vernode->table_meta, "tablename", tablename); - cJSON_AddNumberToObject(save->cur_vernode->table_meta, "cfg_num", cfgnum); - cJSON_AddNumberToObject(save->cur_vernode->table_meta, "size", size); - if(userregion != NULL) + cJSON_AddStringToObject(save->cur_vernode->table_meta, "tablename", meta->tablename); + cJSON_AddStringToObject(save->cur_vernode->table_meta, "filename", meta->filename); + cJSON_AddNumberToObject(save->cur_vernode->table_meta, "cfg_num", meta->cfgnum); + cJSON_AddNumberToObject(save->cur_vernode->table_meta, "size", meta->size); + if(meta->userregion != NULL) { - cJSON_AddStringToObject(save->cur_vernode->table_meta, "user_region", userregion); + cJSON_AddStringToObject(save->cur_vernode->table_meta, "user_region", meta->userregion); } save->cur_table = (struct table_list_node *)calloc(1, sizeof(struct table_list_node)); - snprintf(save->cur_table->tablename, 64, "%s", tablename); - save->cur_table->filesize = size; + save->cur_table->filesize = meta->size; + snprintf(save->cur_table->tablename, 64, "%s", meta->tablename); + snprintf(save->cur_table->localpath, 256, "%s", localpath); TAILQ_INIT(&save->cur_table->frag_head); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu start loading to memory...", save->business->bizname, tablename, save->version); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu start loading to memory...", + save->business->bizname, meta->tablename, save->version); } void doris_config_mem_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata) @@ -476,13 +479,13 @@ void doris_config_localmem_version_error(struct doris_instance *instance, void * } } -void doris_config_localmem_cfgfile_start(struct doris_instance *instance, const char *tablename, - size_t size, u_int32_t cfgnum, const char *userregion, void *userdata) +void doris_config_localmem_cfgfile_start(struct doris_instance *instance, + const struct tablemeta *meta, const char *localpath, void *userdata) { - doris_config_common_cfgfile_start((struct confile_save *)userdata, cfgnum); + doris_config_common_cfgfile_start((struct confile_save *)userdata, meta->cfgnum); if(g_doris_server_info.server_role_sw) { - doris_config_mem_cfgfile_start(instance, tablename, size, cfgnum, userregion, userdata); + doris_config_mem_cfgfile_start(instance, meta, localpath, userdata); } } @@ -549,19 +552,19 @@ void doris_config_version_error(struct doris_instance *instance, void *userdata) } } -void doris_config_cfgfile_start(struct doris_instance *instance, const char *tablename, - size_t size, u_int32_t cfgnum, const char *userregion, void *userdata) +void doris_config_cfgfile_start(struct doris_instance *instance, + const struct tablemeta *meta, const char *localpath, void *userdata) { struct confile_save *save=(struct confile_save *)userdata; - doris_config_common_cfgfile_start((struct confile_save *)userdata, cfgnum); + doris_config_common_cfgfile_start((struct confile_save *)userdata, meta->cfgnum); if(save->business->write_file_sw) { - doris_config_file_cfgfile_start(instance, tablename, size, cfgnum, userregion, userdata); + doris_config_file_cfgfile_start(instance, meta, localpath, userdata); } if(g_doris_server_info.server_role_sw) { - doris_config_mem_cfgfile_start(instance, tablename, size, cfgnum, userregion, userdata); + doris_config_mem_cfgfile_start(instance, meta, save->cfg_file_path, userdata); } } diff --git a/server/doris_server_receive.h b/server/doris_server_receive.h index 457d708..8926f25 100644 --- a/server/doris_server_receive.h +++ b/server/doris_server_receive.h @@ -58,6 +58,7 @@ struct cont_frag_node struct table_list_node { char tablename[64]; + char localpath[256]; size_t filesize; size_t cur_totallen; diff --git a/server/doris_server_scandir.cpp b/server/doris_server_scandir.cpp index 2863369..85c276e 100644 --- a/server/doris_server_scandir.cpp +++ b/server/doris_server_scandir.cpp @@ -231,7 +231,8 @@ bool doris_read_table_file(struct doris_idxfile_scanner *scanner, struct cfg_tab FILE *fp; size_t readlen, remainlen, oncesize; MD5_CTX md5ctx; - char md5buffer[64], *user_region=NULL; + char md5buffer[64], *filename; + struct tablemeta meta; if((fp = fopen(table->cfg_path, "r")) == NULL) { @@ -240,11 +241,14 @@ bool doris_read_table_file(struct doris_idxfile_scanner *scanner, struct cfg_tab } MD5_Init(&md5ctx); - if(table->user_region[0] != '\0') - { - user_region = table->user_region; - } - doris_cbs->cfgfile_start(NULL, table->table_name, table->filesize, table->cfg_num, user_region, doris_cbs->userdata); + meta.tablename = table->table_name; + filename = strrchr(table->cfg_path, '/'); + meta.filename = (filename!=NULL)?(filename + 1):table->cfg_path; + meta.userregion = (table->user_region[0] != '\0')?table->user_region:NULL; + meta.size = table->filesize; + meta.cfgnum = table->cfg_num; + doris_cbs->cfgfile_start(NULL, &meta, table->cfg_path, doris_cbs->userdata); + remainlen = table->filesize; while(remainlen > 0) {