#include #include #include #include #include #include #include #include #include #include #include #include #include "doris_server_main.h" #include "doris_server_scandir.h" #include "doris_server_receive.h" struct scanner_timer_priv { struct doris_business *business; struct doris_callbacks doris_cbs; struct doris_arguments doris_args; struct doris_idxfile_scanner *scanner; struct event timer_scanner; }; extern struct doris_global_info g_doris_server_info; void config_frag_node_cleanup(struct cont_frag_node *fragnode) { if(fragnode == NULL) return; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_status[DRS_FSSTAT_MEMORY_USED], 0, FS_OP_SUB, fragnode->totalsize); free(fragnode->content); free(fragnode); } void config_table_node_cleanup(struct table_list_node *table_node) { struct cont_frag_node *fragnode; if(table_node == NULL) return; while(NULL != (fragnode = TAILQ_FIRST(&table_node->frag_head))) { TAILQ_REMOVE(&table_node->frag_head, fragnode, frag_node); config_frag_node_cleanup(fragnode); } config_frag_node_cleanup(table_node->cur_frag); cJSON_Delete(table_node->table_meta); free(table_node); } void config_version_node_cleanup(struct version_list_node *vernode) { struct table_list_node *tablenode; if(vernode == NULL) return; while(NULL != (tablenode = TAILQ_FIRST(&vernode->table_head))) { TAILQ_REMOVE(&vernode->table_head, tablenode, table_node); config_table_node_cleanup(tablenode); } config_table_node_cleanup(vernode->cur_table); free(vernode->metacont); cJSON_Delete(vernode->metajson); cJSON_Delete(vernode->arrayjson); cJSON_Delete(vernode->table_meta); if(vernode->business!=NULL && vernode->business->recv_way==RECV_WAY_HTTP_POST) { vernode->business->token2node->erase(string(vernode->token)); } free(vernode); } void config_version_handle_free(struct version_list_handle *version) { struct version_list_node *vernode; while(NULL != (vernode = TAILQ_FIRST(&version->version_head))) { TAILQ_REMOVE(&version->version_head, vernode, version_node); config_version_node_cleanup(vernode); } delete version->version2node; free(version); } struct version_list_handle *config_version_handle_new(void) { struct version_list_handle *handle; handle = (struct version_list_handle *)calloc(1, sizeof(struct version_list_handle)); handle->version2node = new map; TAILQ_INIT(&handle->version_head); return handle; } void config_version_node_free_content(struct version_list_node *vernode) { struct table_list_node *tablenode; struct cont_frag_node *fragnode; TAILQ_FOREACH(tablenode, &vernode->table_head, table_node) { while(NULL != (fragnode = TAILQ_FIRST(&tablenode->frag_head))) { TAILQ_REMOVE(&tablenode->frag_head, fragnode, frag_node); config_frag_node_cleanup(fragnode); } } vernode->cont_in_disk = 1; } static void doris_common_timer_start(struct event *time_event) { struct timeval tv; tv.tv_sec = 2; tv.tv_usec = 0; evtimer_add(time_event, &tv); } static void cfgver_delay_destroy_timer_cb(int fd, short kind, void *userp) { struct common_timer_event *delay_event=(struct common_timer_event *)userp; struct version_list_handle *handle = (struct version_list_handle *)delay_event->data; if(handle->references != 0) { doris_common_timer_start(&delay_event->timer_event); return; } config_version_handle_free(handle); free(delay_event); } static void cfgver_handle_delay_destroy(struct event_base *evbase, struct version_list_handle *version) { struct common_timer_event *delay_event; delay_event = (struct common_timer_event *)malloc(sizeof(struct common_timer_event)); delay_event->data = version; evtimer_assign(&delay_event->timer_event, evbase, cfgver_delay_destroy_timer_cb, delay_event); doris_common_timer_start(&delay_event->timer_event); } /*file系列函数,写本地文件*/ void doris_config_file_version_start(struct doris_csum_instance *instance, cJSON *meta, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; if(business->type == CFG_UPDATE_TYPE_FULL) { snprintf(business->inc_index_path, 256, "%s/inc/index/full_config_index.%010lu", business->store_path_root, business->version); snprintf(business->tmp_index_path, 256, "%s/inc/full_config_index.%010lu.ing", business->store_path_root, business->version); snprintf(business->full_index_path, 256, "%s/full/index/full_config_index.%010lu", business->store_path_root, business->version); } else { snprintf(business->inc_index_path, 256, "%s/inc/index/inc_config_index.%010lu", business->store_path_root, business->version); snprintf(business->tmp_index_path, 256, "%s/inc/full_config_index.%010lu.ing", business->store_path_root, business->version); } if(NULL==(business->fp_idx_file = fopen(business->tmp_index_path, "w+"))) { MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", business->bizname, business->tmp_index_path, strerror(errno)); assert(0); } } void doris_config_file_version_finish(struct doris_csum_instance *instance, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; char tmp_index_dir[256]; fclose(business->fp_idx_file); if(rename(business->tmp_index_path, business->inc_index_path)) { MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, rename %s to %s failed: %s", business->bizname, business->tmp_index_path, business->inc_index_path, strerror(errno)); assert(0); } if(business->type == CFG_UPDATE_TYPE_FULL) { if(link(business->inc_index_path, business->full_index_path) && errno!=EEXIST) //创建硬链接 { MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, rename %s to %s failed: %s", business->bizname, business->tmp_index_path, business->inc_index_path, strerror(errno)); assert(0); } if(business->saves_when_fulldel > 0) { for(u_int32_t i=1; isaves_when_fulldel; i++) { business->full_version_inc[i-1] = business->full_version_inc[i]; //递增排序,将新的版本放入最大值 } business->full_version_inc[business->saves_when_fulldel-1] = business->version; snprintf(tmp_index_dir, 256, "%s/full/index", business->store_path_root); remove_configs_version_smaller(tmp_index_dir, business->full_version_inc[0], 0, g_doris_server_info.log_runtime); snprintf(tmp_index_dir, 256, "%s/inc/index", business->store_path_root); remove_configs_version_smaller(tmp_index_dir, business->full_version_inc[0], 1, g_doris_server_info.log_runtime); } } MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu write finished, index file: %s", business->bizname, business->version, business->inc_index_path); } void doris_config_file_version_error(struct doris_csum_instance *instance, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; if(business->fp_idx_file != NULL) { fclose(business->fp_idx_file); remove(business->tmp_index_path); } if(business->fp_cfg_file != NULL) { fclose(business->fp_cfg_file); remove(business->cfg_file_path); } MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", business->bizname, business->version); } void doris_config_file_cfgfile_start(struct doris_csum_instance *instance, const struct tablemeta *meta, const char *localpath, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; struct tm *localtm, savetime; time_t now; const char *type; char dir[256]; type = (business->type == CFG_UPDATE_TYPE_FULL)?"full":"inc"; now = time(NULL); localtm = localtime_r(&now, &savetime); snprintf(dir, 256, "%s/%s/%04d-%02d-%02d", business->store_path_root, type, localtm->tm_year+1900, localtm->tm_mon+1, localtm->tm_mday); if(access(dir, F_OK)) { doris_mkdir_according_path(dir); } snprintf(business->cfg_file_path, 256, "%s/%s", dir, meta->filename); if(g_doris_server_info.idx_file_maat || meta->userregion==NULL) //MAAT格式的通知文件 { fprintf(business->fp_idx_file, "%s\t%u\t%s\n", meta->tablename, meta->cfgnum, business->cfg_file_path); } else //转发角色保留用户自定义信息 { fprintf(business->fp_idx_file, "%s\t%u\t%s\t%s\n", meta->tablename, meta->cfgnum, business->cfg_file_path, meta->userregion); } if(NULL == (business->fp_cfg_file = fopen(business->cfg_file_path, "w+"))) { MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", business->bizname, business->cfg_file_path, strerror(errno)); assert(0); } else { MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, File %s start writing...", business->bizname, business->cfg_file_path); } } void doris_config_file_cfgfile_update(struct doris_csum_instance *instance, const char *data, size_t len, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; size_t writen_len; writen_len = fwrite(data, 1, len, business->fp_cfg_file); if(writen_len != len) { MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", business->bizname, business->cfg_file_path, strerror(errno)); assert(0); } } void doris_config_file_cfgfile_finish(struct doris_csum_instance *instance, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; fclose(business->fp_cfg_file); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, File %s write finished", business->bizname, business->cfg_file_path); } /*mem系列函数,加载入内存*/ void doris_config_mem_version_start(struct doris_csum_instance *instance, cJSON *meta, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; business->cur_vernode = (struct version_list_node *)calloc(1, sizeof(struct version_list_node)); TAILQ_INIT(&business->cur_vernode->table_head); business->cur_vernode->metajson = cJSON_CreateObject(); business->cur_vernode->arrayjson= cJSON_CreateArray(); business->cur_vernode->version = business->version; cJSON_AddNumberToObject(business->cur_vernode->metajson, "version", business->cur_vernode->version); business->cur_vernode->cfg_type = business->type; cJSON_AddNumberToObject(business->cur_vernode->metajson, "type", business->cur_vernode->cfg_type); } void doris_config_mem_version_finish(struct doris_csum_instance *instance, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; struct version_list_handle *tmplist; struct version_list_handle *cfgver_handle; cJSON_AddItemToObject(business->cur_vernode->metajson, "configs", business->cur_vernode->arrayjson); business->cur_vernode->arrayjson = NULL; business->cur_vernode->metacont = cJSON_PrintUnformatted(business->cur_vernode->metajson); business->cur_vernode->metalen = strlen(business->cur_vernode->metacont); cJSON_Delete(business->cur_vernode->metajson); business->cur_vernode->metajson = NULL; MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %lu mem finished, info: %s", business->bizname, business->version, business->cur_vernode->metacont); if(business->cur_vernode->cfg_type==CFG_UPDATE_TYPE_FULL && business->cfgver_head->latest_version!=0) { cfgver_handle = config_version_handle_new(); cfgver_handle->latest_version = business->cur_vernode->version; cfgver_handle->version_num = 1; TAILQ_INSERT_TAIL(&cfgver_handle->version_head, business->cur_vernode, version_node); cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head); cfgver_handle->version2node->insert(make_pair(cfgver_handle->latest_version, business->cur_vernode)); pthread_rwlock_wrlock(&business->rwlock); tmplist = business->cfgver_head; business->cfgver_head = cfgver_handle; pthread_rwlock_unlock(&business->rwlock); cfgver_handle_delay_destroy(business->worker_evbase, tmplist); } else { pthread_rwlock_wrlock(&business->rwlock); cfgver_handle = business->cfgver_head; TAILQ_INSERT_TAIL(&cfgver_handle->version_head, business->cur_vernode, version_node); cfgver_handle->latest_version = business->cur_vernode->version; cfgver_handle->version2node->insert(make_pair(business->cur_vernode->version, business->cur_vernode)); if(cfgver_handle->oldest_vernode == NULL) { cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head); } /*配置文件内容最多缓存N个版本,元信息全保留*/ if(business->cache_max_versions!=0 && cfgver_handle->version_num>=business->cache_max_versions) { config_version_node_free_content(cfgver_handle->oldest_vernode); cfgver_handle->oldest_vernode = TAILQ_NEXT(cfgver_handle->oldest_vernode, version_node); } else { cfgver_handle->version_num += 1; } pthread_rwlock_unlock(&business->rwlock); } business->cur_vernode = NULL; } void doris_config_mem_version_error(struct doris_csum_instance *instance, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; if(business->cur_vernode != NULL) { config_version_node_cleanup(business->cur_vernode); } business->cur_vernode = NULL; } void doris_config_mem_cfgfile_start(struct doris_csum_instance *instance, const struct tablemeta *meta, const char *localpath, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; struct table_list_node *cur_table; business->cur_vernode->table_meta = cJSON_CreateObject(); cJSON_AddStringToObject(business->cur_vernode->table_meta, "tablename", meta->tablename); cJSON_AddStringToObject(business->cur_vernode->table_meta, "filename", meta->filename); cJSON_AddNumberToObject(business->cur_vernode->table_meta, "cfg_num", meta->cfgnum); cJSON_AddNumberToObject(business->cur_vernode->table_meta, "size", meta->size); if(meta->userregion != NULL) { cJSON_AddStringToObject(business->cur_vernode->table_meta, "user_region", meta->userregion); } cur_table = (struct table_list_node *)calloc(1, sizeof(struct table_list_node)); cur_table->filesize = meta->size; snprintf(cur_table->tablename, 64, "%s", meta->tablename); snprintf(cur_table->localpath, 256, "%s", localpath); TAILQ_INIT(&cur_table->frag_head); business->cur_vernode->cur_table = cur_table; MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu start loading to memory...", business->bizname, meta->tablename, business->version); } void doris_config_mem_cfgfile_update(struct doris_csum_instance *instance, const char *data, size_t len, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; struct table_list_node *cur_table; size_t cache_len, offset=0; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_status[DRS_FSSTAT_MEMORY_USED], 0, FS_OP_ADD, len); cur_table = business->cur_vernode->cur_table; while(len > 0) { if(cur_table->cur_frag == NULL) { cur_table->cur_frag = (struct cont_frag_node *)calloc(1, sizeof(struct cont_frag_node)); cur_table->cur_frag->start = cur_table->cur_totallen; cur_table->cur_frag->totalsize = cur_table->filesize - cur_table->cur_totallen; if(cur_table->filesize==0 || cur_table->cur_frag->totalsize > g_doris_server_info.cache_frag_size) { cur_table->cur_frag->totalsize = g_doris_server_info.cache_frag_size; } cur_table->cur_frag->end = cur_table->cur_frag->start + cur_table->cur_frag->totalsize - 1; cur_table->cur_frag->content = (char *)malloc(cur_table->cur_frag->totalsize); } if(cur_table->cur_frag->totalsize > cur_table->cur_frag->cur_fraglen + len) { memcpy(cur_table->cur_frag->content+cur_table->cur_frag->cur_fraglen, data+offset, len); cur_table->cur_frag->cur_fraglen += len; cur_table->cur_totallen += len; offset += len; len = 0; } else { cache_len = cur_table->cur_frag->totalsize - cur_table->cur_frag->cur_fraglen; memcpy(cur_table->cur_frag->content+cur_table->cur_frag->cur_fraglen, data+offset, cache_len); cur_table->cur_frag->cur_fraglen += cache_len; cur_table->cur_totallen += cache_len; offset += cache_len; len -= cache_len; TAILQ_INSERT_TAIL(&cur_table->frag_head, cur_table->cur_frag, frag_node); assert(cur_table->cur_frag->cur_fraglen == cur_table->cur_frag->end - cur_table->cur_frag->start + 1); cur_table->cur_frag = NULL; } } assert(cur_table->cur_totallen <= cur_table->filesize || cur_table->filesize==0); } void doris_config_mem_cfgfile_finish(struct doris_csum_instance *instance, const char *md5, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; struct table_list_node *cur_table; cJSON_AddStringToObject(business->cur_vernode->table_meta, "md5", md5); cJSON_AddItemToArray(business->cur_vernode->arrayjson, business->cur_vernode->table_meta); business->cur_vernode->table_meta = NULL; cur_table = business->cur_vernode->cur_table; if(cur_table->cur_frag != NULL) { TAILQ_INSERT_TAIL(&cur_table->frag_head, cur_table->cur_frag, frag_node); assert(cur_table->cur_frag->cur_fraglen == cur_table->cur_frag->end - cur_table->cur_frag->start + 1); cur_table->cur_frag = NULL; } assert(cur_table->cur_totallen == cur_table->filesize); TAILQ_INSERT_TAIL(&business->cur_vernode->table_head, cur_table, table_node); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu load to memory finished", business->bizname, cur_table->tablename, business->version); business->cur_vernode->cur_table = NULL; } /*common系列函数,载入配置时公共操作*/ void doris_config_common_version_start(struct doris_business *business, cJSON *meta) { cJSON *sub; sub = cJSON_GetObjectItem(meta, "version"); business->version = sub->valuedouble; sub = cJSON_GetObjectItem(meta, "type"); business->type = sub->valueint; assert(business->type==CFG_UPDATE_TYPE_FULL || business->type==CFG_UPDATE_TYPE_INC); business->version_cfgnum = 0; MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu start updating...", business->bizname, business->version); } void doris_config_common_version_finish(struct doris_business *business) { if(business->type == CFG_UPDATE_TYPE_FULL) { business->total_cfgnum = business->version_cfgnum; FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CUR_FULL_VERSION], FS_OP_SET, business->version); FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CONFIG_TOTAL_NUM], FS_OP_SET, business->version_cfgnum); FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FULL_VER], FS_OP_ADD, 1); } else { business->total_cfgnum += business->version_cfgnum; FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CUR_INC_VERSION], FS_OP_SET, business->version); FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CONFIG_TOTAL_NUM], FS_OP_ADD, business->version_cfgnum); FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_INC_VER], FS_OP_ADD, 1); } MESA_Monitor_operation(g_doris_server_info.monitor, business->mmid_latest_ver, MONITOR_VALUE_SET, business->version); MESA_Monitor_operation(g_doris_server_info.monitor, business->mmid_total_cfgnum, MONITOR_VALUE_SET, business->total_cfgnum); MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_CLEAR, business->mmval_status_codeid, NULL, NULL); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu update finished", business->bizname, business->version); } void doris_config_common_version_error(struct doris_business *business) { FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_ERR_VER], 0, FS_OP_ADD, 1); //Grafana+Promethues,展示内部异常状态 MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_SET, business->mmval_status_codeid, "Version receive error", "Receive config file error, please check producer"); } void doris_config_common_cfgfile_start(struct doris_business *business, u_int32_t cfgnum) { business->version_cfgnum += cfgnum; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_START_FILES], 0, FS_OP_ADD, 1); } void doris_config_common_cfgfile_finish(struct doris_business *business) { FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_CMPLT_FILES], 0, FS_OP_ADD, 1); FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FILES], FS_OP_ADD, 1); } /*localmem系列函数,启动时从本地缓存读的回调*/ void doris_config_localmem_version_start(struct doris_csum_instance *instance, cJSON *meta, void *userdata) { doris_config_common_version_start((struct doris_business *)userdata, meta); if(g_doris_server_info.consumer_port) { doris_config_mem_version_start(instance, meta, userdata); } } void doris_config_localmem_version_finish(struct doris_csum_instance *instance, void *userdata) { if(g_doris_server_info.consumer_port) { doris_config_mem_version_finish(instance, userdata); } doris_config_common_version_finish((struct doris_business *)userdata); } void doris_config_localmem_version_error(struct doris_csum_instance *instance, void *userdata) { doris_config_common_version_error((struct doris_business *)userdata); if(g_doris_server_info.consumer_port) { doris_config_mem_version_error(instance, userdata); } } void doris_config_localmem_cfgfile_start(struct doris_csum_instance *instance, const struct tablemeta *meta, const char *localpath, void *userdata) { doris_config_common_cfgfile_start((struct doris_business *)userdata, meta->cfgnum); if(g_doris_server_info.consumer_port) { doris_config_mem_cfgfile_start(instance, meta, localpath, userdata); } } void doris_config_localmem_cfgfile_update(struct doris_csum_instance *instance, const char *data, size_t len, void *userdata) { if(g_doris_server_info.consumer_port) { doris_config_mem_cfgfile_update(instance, data, len, userdata); } } void doris_config_localmem_cfgfile_finish(struct doris_csum_instance *instance, const char *md5, void *userdata) { doris_config_common_cfgfile_finish((struct doris_business *)userdata); if(g_doris_server_info.consumer_port) { doris_config_mem_cfgfile_finish(instance, md5, userdata); } } /*无标记系列函数,新来配置时回调*/ void doris_config_version_start(struct doris_csum_instance *instance, cJSON *meta, void *userdata) { doris_config_common_version_start((struct doris_business *)userdata, meta); doris_config_file_version_start(instance, meta, userdata); if(g_doris_server_info.consumer_port) { doris_config_mem_version_start(instance, meta, userdata); } } void doris_config_version_finish(struct doris_csum_instance *instance, void *userdata) { doris_config_file_version_finish(instance, userdata); if(g_doris_server_info.consumer_port) { doris_config_mem_version_finish(instance, userdata); } doris_config_common_version_finish((struct doris_business *)userdata); } void doris_config_version_error(struct doris_csum_instance *instance, void *userdata) { doris_config_common_version_error((struct doris_business *)userdata); doris_config_file_version_error(instance, userdata); if(g_doris_server_info.consumer_port) { doris_config_mem_version_error(instance, userdata); } } void doris_config_cfgfile_start(struct doris_csum_instance *instance, const struct tablemeta *meta, const char *localpath, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; doris_config_common_cfgfile_start((struct doris_business *)userdata, meta->cfgnum); doris_config_file_cfgfile_start(instance, meta, localpath, userdata); if(g_doris_server_info.consumer_port) { doris_config_mem_cfgfile_start(instance, meta, business->cfg_file_path, userdata); } } void doris_config_cfgfile_update(struct doris_csum_instance *instance, const char *data, size_t len, void *userdata) { doris_config_file_cfgfile_update(instance, data, len, userdata); if(g_doris_server_info.consumer_port) { doris_config_mem_cfgfile_update(instance, data, len, userdata); } } void doris_config_cfgfile_finish(struct doris_csum_instance *instance, const char *md5, void *userdata) { doris_config_common_cfgfile_finish((struct doris_business *)userdata); doris_config_file_cfgfile_finish(instance, userdata); if(g_doris_server_info.consumer_port) { doris_config_mem_cfgfile_finish(instance, md5, userdata); } } void* thread_doris_client_recv_cfg(void *arg) { struct doris_business *business=(struct doris_business *)arg; struct event_base *client_evbase; struct doris_csum_instance *instance; struct doris_callbacks doris_cbs; struct doris_arguments doris_args; struct doris_idxfile_scanner *scanner; enum DORIS_UPDATE_TYPE update_type; char stored_path[512]; prctl(PR_SET_NAME, "client_recv"); client_evbase = event_base_new(); business->source_from = RECV_WAY_IDX_FILE; business->worker_evbase = client_evbase; scanner = doris_index_file_scanner(0); /*Retaive latest config to memory from Stored configs*/ doris_cbs.version_start = doris_config_localmem_version_start; doris_cbs.version_finish = doris_config_localmem_version_finish; doris_cbs.version_error = doris_config_localmem_version_error; doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start; doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update; doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish; doris_cbs.version_updated= NULL; doris_cbs.userdata = business; snprintf(stored_path, 512, "%s/full/index", business->store_path_root); if(business->saves_when_fulldel > 0) { get_full_topN_max_versions(stored_path, business->full_version_inc, business->saves_when_fulldel); } update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); snprintf(stored_path, 512, "%s/inc/index", business->store_path_root); do { update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); }while(update_type != CFG_UPDATE_TYPE_NONE); /*Check new configs*/ doris_cbs.version_start = doris_config_version_start; doris_cbs.version_finish = doris_config_version_finish; doris_cbs.version_error = doris_config_version_error; doris_cbs.cfgfile_start = doris_config_cfgfile_start; doris_cbs.cfgfile_update = doris_config_cfgfile_update; doris_cbs.cfgfile_finish = doris_config_cfgfile_finish; business->source_from = RECV_WAY_DRS_CLIENT; memset(&doris_args, 0, sizeof(struct doris_arguments)); doris_args.current_version = scanner->cur_version; sprintf(doris_args.bizname, "%s", business->bizname); instance = doris_csum_instance_new(business->param_csum, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime); if(instance == NULL) { assert(0);return NULL; } event_base_dispatch(client_evbase); printf("Libevent dispath error, should not run here.\n"); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here."); assert(0);return NULL; } static void doris_scanner_timer_cb(int fd, short kind, void *userp) { struct scanner_timer_priv *timer_priv=(struct scanner_timer_priv *)userp; enum DORIS_UPDATE_TYPE update_type; struct timeval tv; do { update_type = doris_index_file_traverse(timer_priv->scanner, timer_priv->business->recv_path_inc, &timer_priv->doris_cbs, NULL, g_doris_server_info.log_runtime); }while(update_type != CFG_UPDATE_TYPE_NONE); tv.tv_sec = g_doris_server_info.scan_idx_interval; tv.tv_usec = 0; evtimer_add(&timer_priv->timer_scanner, &tv); } void* thread_index_file_recv_cfg(void *arg) { struct doris_business *business=(struct doris_business *)arg; struct event_base *client_evbase; struct timeval tv; struct scanner_timer_priv timer_priv; enum DORIS_UPDATE_TYPE update_type; char stored_path[256]; prctl(PR_SET_NAME, "index_file"); memset(&timer_priv, 0, sizeof(struct scanner_timer_priv)); client_evbase = event_base_new(); business->source_from = RECV_WAY_IDX_FILE; business->worker_evbase = client_evbase; timer_priv.scanner = doris_index_file_scanner(0); timer_priv.business = business; /*Retaive latest config to memory from Stored configs*/ timer_priv.doris_cbs.version_start = doris_config_localmem_version_start; timer_priv.doris_cbs.version_finish = doris_config_localmem_version_finish; timer_priv.doris_cbs.version_error = doris_config_localmem_version_error; timer_priv.doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start; timer_priv.doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update; timer_priv.doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish; timer_priv.doris_cbs.version_updated= NULL; timer_priv.doris_cbs.userdata = business; snprintf(stored_path, 512, "%s/full/index", business->store_path_root); update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime); if(business->saves_when_fulldel > 0) { get_full_topN_max_versions(stored_path, business->full_version_inc, business->saves_when_fulldel); } snprintf(stored_path, 512, "%s/inc/index", business->store_path_root); do{ update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime); }while(update_type!=CFG_UPDATE_TYPE_NONE); /*Check new configs*/ timer_priv.doris_cbs.version_start = doris_config_version_start; timer_priv.doris_cbs.version_finish = doris_config_version_finish; timer_priv.doris_cbs.version_error = doris_config_version_error; timer_priv.doris_cbs.cfgfile_start = doris_config_cfgfile_start; timer_priv.doris_cbs.cfgfile_update = doris_config_cfgfile_update; timer_priv.doris_cbs.cfgfile_finish = doris_config_cfgfile_finish; update_type = doris_index_file_traverse(timer_priv.scanner, business->recv_path_full, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime); if(update_type!=CFG_UPDATE_TYPE_NONE) { tv.tv_sec = 0; } else { tv.tv_sec = g_doris_server_info.scan_idx_interval; } tv.tv_usec = 0; evtimer_assign(&timer_priv.timer_scanner, client_evbase, doris_scanner_timer_cb, &timer_priv); evtimer_add(&timer_priv.timer_scanner, &tv); event_base_dispatch(client_evbase); printf("Libevent dispath error, should not run here.\n"); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here."); return NULL; } struct bufferevent *doris_https_bufferevent_cb(struct event_base *evabse, void *arg) { SSL_CTX *ssl_instance = (SSL_CTX *)arg; return bufferevent_openssl_socket_new(evabse, -1, SSL_new(ssl_instance), BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE); } struct doris_business *lookup_bizstruct_from_name(const struct evkeyvalq *params) { map::iterator iter; const char *bizname; if(NULL == (bizname = evhttp_find_header(params, "business"))) { FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); return NULL; } if((iter = g_doris_server_info.name2business->find(string(bizname)))==g_doris_server_info.name2business->end()) { FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); return NULL; } return iter->second; } struct version_list_node *lookup_vernode_struct_from_name(struct doris_business *business, const struct evkeyvalq *params) { map::iterator iter; const char *token; if(NULL == (token = evhttp_find_header(params, "token"))) { FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); return NULL; } if((iter = business->token2node->find(string(token)))==business->token2node->end()) { FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); return NULL; } return iter->second; } struct version_list_node *lookup_vernode_struct_from_name_renew(struct doris_business *business, const struct evkeyvalq *params) { struct version_list_node *vernode; struct timeval tv; if(NULL == (vernode = lookup_vernode_struct_from_name(business, params))) { return NULL; } if(vernode->business->concurrency_allowed) { tv.tv_sec = g_doris_server_info.post_vernode_ttl; tv.tv_usec = 0; evtimer_add(&vernode->timer_expire, &tv); } return vernode; } /*保证business之间生成的token不冲突*/ void prod_server_generate_token(struct doris_business *business, char *token/*OUT*/, size_t size) { pthread_mutex_lock(&g_doris_server_info.mutex_lock); snprintf(token, size, "%u-%lu-%u-%u", g_doris_server_info.local_ip, time(NULL), rand(), ++g_doris_server_info.token_seq); pthread_mutex_unlock(&g_doris_server_info.mutex_lock); } void business_resume_sync_peer_normal(struct doris_business *business) { u_int32_t business_post_ups; if(!g_doris_server_info.cluster_sync_mode) { return; } if(1 == atomic_set(&business->ready_to_sync, 1) || business->listener_prod==0) { return; } pthread_mutex_lock(&g_doris_server_info.mutex_lock); business_post_ups = ++g_doris_server_info.business_post_ups; pthread_mutex_unlock(&g_doris_server_info.mutex_lock); if(business_post_ups == g_doris_server_info.business_post_num) { MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_OK); } else { MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_UPING); } assert(business_post_ups <= g_doris_server_info.business_post_num); } void business_set_sync_peer_abnormal(struct doris_business *business) { u_int32_t business_post_ups; if(!g_doris_server_info.cluster_sync_mode) { return; } MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mcluster sync error, please check slave status!!!\033[0m\n"); if(0 == atomic_set(&business->ready_to_sync, 0) || business->listener_prod==0) { return; } pthread_mutex_lock(&g_doris_server_info.mutex_lock); business_post_ups = --g_doris_server_info.business_post_ups; pthread_mutex_unlock(&g_doris_server_info.mutex_lock); if(business_post_ups == 0) { MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_DOWN); } else { MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_UPING); } assert(business_post_ups < g_doris_server_info.business_post_num); } char *vernode_print_json_meta(struct version_list_node *vernode) { struct table_list_node *tablenode; cJSON *root, *array=NULL, *item; char *p; root = cJSON_CreateObject(); cJSON_AddStringToObject(root, "token", vernode->token); cJSON_AddNumberToObject(root, "type", vernode->cfg_type); TAILQ_FOREACH(tablenode, &vernode->table_head, table_node) { if(array == NULL) { array = cJSON_CreateArray(); } item = cJSON_CreateObject(); cJSON_AddStringToObject(item, "tablename", tablenode->tablename); cJSON_AddNumberToObject(item, "size", tablenode->cur_totallen); cJSON_AddItemToArray(array, item); assert(tablenode->finished); //上传完毕的才能加入链表 } if(vernode->cur_table != NULL) { if(array == NULL) { array = cJSON_CreateArray(); } item = cJSON_CreateObject(); cJSON_AddStringToObject(item, "tablename", vernode->cur_table->tablename); cJSON_AddNumberToObject(item, "offset", vernode->cur_table->cur_totallen); cJSON_AddItemToArray(array, item); } cJSON_AddItemToObject(root, "configs", array); p = cJSON_PrintUnformatted(root); cJSON_Delete(root); return p; } void http_prod_server_verion_check_cb(struct evhttp_request *req, void *arg) { struct doris_business *business=(struct doris_business *)arg; struct version_list_node *vernode; struct evkeyvalq params; struct evbuffer *evbuf; char *p; if(evhttp_parse_query(evhttp_request_get_uri(req), ¶ms)) { FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid"); return; } if(NULL == (vernode = lookup_vernode_struct_from_name(business, ¶ms))) { evhttp_clear_headers(¶ms); evhttp_send_error(req, HTTP_NOTFOUND, "Parameter token not found"); return; } evhttp_clear_headers(¶ms); if(vernode->syncing) { evhttp_send_error(req, 310, "table syncing now, retry later"); return; } p = vernode_print_json_meta(vernode); evbuf = evbuffer_new(); evbuffer_add(evbuf, p, strlen(p)); if(vernode->version_finished) { evhttp_send_reply(req, HTTP_OK, "OK", evbuf); } else { evhttp_send_reply(req, 300, "version is posting", evbuf); } evbuffer_free(evbuf); free(p); } void http_config_direct_version_cancel(struct version_list_node *vernode, struct evhttp_request *req) { struct doris_business *business=vernode->business; struct table_list_node *tablenode; char token[64]; sprintf(token, "%s", vernode->token); if(vernode->synctx != NULL) { doris_prod_upload_ctx_destroy(vernode->synctx); } if(vernode->fp_idx_file != NULL) { fclose(vernode->fp_idx_file); remove(vernode->tmp_index_path); } if(vernode->cur_table!=NULL && vernode->cur_table->fp_cfg_file != NULL) { fclose(vernode->cur_table->fp_cfg_file); remove(vernode->cur_table->localpath); } TAILQ_FOREACH(tablenode, &vernode->table_head, table_node) { remove(tablenode->localpath); } config_version_node_cleanup(vernode); if(business->concurrency_allowed && evtimer_pending(&vernode->timer_expire, NULL)) { evtimer_del(&vernode->timer_expire); } business->cur_vernode = NULL; //结束清空 business->posts_on_the_way--; FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_POST_ON_THE_WAY], FS_OP_SET, business->posts_on_the_way); evhttp_send_reply(req, 200, "OK", NULL); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, post server version cancel, token: %s", business->bizname, token); } void prod_sync_vercancel_result_cb(enum PROD_VEROP_RES result, void *userdata) { struct version_list_node *vernode=(struct version_list_node *)userdata; vernode->syncing = 0; vernode->retry_times++; switch(result) { case VERSIONOP_RES_OK: http_config_direct_version_cancel(vernode, vernode->req); break; case VERSIONOP_RES_ERROR: evhttp_send_error(vernode->req, 500, "version cancel sync error res_code"); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version cancel sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); break; case VERSIONOP_CURL_ERROR: if(atomic_read(&vernode->business->ready_to_sync) && (vernode->retry_times < 3)) { vernode->syncing = 1; doris_prod_version_cancel(vernode->synctx, prod_sync_vercancel_result_cb, vernode); } else { http_config_direct_version_cancel(vernode, vernode->req); business_set_sync_peer_abnormal(vernode->business); } break; default: assert(0);break; } } void http_prod_server_verion_cancel_cb(struct evhttp_request *req, void *arg) { struct doris_business *business=(struct doris_business *)arg; struct version_list_node *vernode; struct evkeyvalq params; if(evhttp_parse_query(evhttp_request_get_uri(req), ¶ms)) { FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid"); return; } if(NULL == (vernode = lookup_vernode_struct_from_name_renew(business, ¶ms))) { evhttp_clear_headers(¶ms); evhttp_send_error(req, HTTP_OK, "Parameter token not found"); //最终一致性 return; } evhttp_clear_headers(¶ms); if(vernode->version_finished) { evhttp_send_error(req, HTTP_BADREQUEST, "version already finished"); return; } if(vernode->syncing) { evhttp_send_error(req, 300, "table syncing now, retry later"); return; } if(!atomic_read(&business->ready_to_sync) || NULL!=evhttp_find_header(evhttp_request_get_input_headers(req), "X-Doris-Master-Slave-Sync")) { return http_config_direct_version_cancel(vernode, req); } vernode->retry_times = 0; vernode->syncing = 1; vernode->req = req; doris_prod_version_cancel(vernode->synctx, prod_sync_vercancel_result_cb, vernode); } void doris_config_post_version_finish(struct doris_business *business, struct version_list_node *vernode, int64_t newversion) { assert(newversion > vernode->version); vernode->version = newversion; if(vernode->cfg_type == CFG_UPDATE_TYPE_FULL) { snprintf(business->inc_index_path, 256, "%s/inc/index/full_config_index.%010lu", business->store_path_root, vernode->version); snprintf(business->full_index_path, 256, "%s/full/index/full_config_index.%010lu", business->store_path_root, vernode->version); } else { snprintf(business->inc_index_path, 256, "%s/inc/index/inc_config_index.%010lu", business->store_path_root, vernode->version); } /*HTTP post时,多版本并发每个都有自己的临时通知文件名,复用本地文件的关闭函数*/ sprintf(business->tmp_index_path, "%s", vernode->tmp_index_path); business->version = vernode->version; business->type = vernode->cfg_type; business->fp_idx_file = vernode->fp_idx_file; doris_config_file_version_finish(NULL, business); vernode->fp_idx_file = NULL; if(g_doris_server_info.consumer_port) { business->cur_vernode = vernode; cJSON_AddNumberToObject(vernode->metajson, "version", vernode->version); doris_config_mem_version_finish(NULL, business); } business->version_cfgnum = vernode->total_cfgs; doris_config_common_version_finish(business); business->cfgver_head->latest_version = vernode->version; if(vernode->synctx != NULL) { doris_prod_upload_ctx_destroy(vernode->synctx); vernode->synctx = NULL; } vernode->version_finished = 1; business->posts_on_the_way--; business->cur_vernode = NULL; //结束清空 FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_POST_ON_THE_WAY], FS_OP_SET, business->posts_on_the_way); } void http_config_direct_version_finish(struct version_list_node *vernode, struct evhttp_request *req, int64_t set_version) { struct doris_business *business=vernode->business; char version[32], token[64]; int64_t new_version; if(business->concurrency_allowed && evtimer_pending(&vernode->timer_expire, NULL)) { evtimer_del(&vernode->timer_expire); } if(set_version == 0) { new_version = business->cfgver_head->latest_version + 1; } else { new_version = set_version; } sprintf(token, "%s", vernode->token); doris_config_post_version_finish(business, vernode, new_version); sprintf(version, "%lu", new_version); evhttp_add_header(evhttp_request_get_output_headers(req), "X-Set-Version", version); evhttp_send_reply(req, 200, "OK", NULL); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, post server version finish, token: %s, version: %lu", business->bizname, token, new_version); } void prod_sync_verend_result_cb(enum PROD_VEROP_RES result, int64_t version, void *userdata) { struct version_list_node *vernode=(struct version_list_node *)userdata; vernode->retry_times++; vernode->syncing = 0; switch(result) { case VERSIONOP_RES_OK: http_config_direct_version_finish(vernode, vernode->req, version); break; case VERSIONOP_RES_ERROR: evhttp_send_error(vernode->req, 500, "version end sync error res_code"); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version end sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); break; case VERSIONOP_CURL_ERROR: if(atomic_read(&vernode->business->ready_to_sync) && (vernode->retry_times < 3)) { vernode->syncing = 1; doris_prod_version_end(vernode->synctx, prod_sync_verend_result_cb, vernode); } else { http_config_direct_version_finish(vernode, vernode->req, 0); business_set_sync_peer_abnormal(vernode->business); } break; default: assert(0);break; } } void http_prod_server_verion_end_cb(struct evhttp_request *req, void *arg) { struct doris_business *business=(struct doris_business *)arg; struct version_list_node *vernode; struct evkeyvalq params; char version[32]; if(evhttp_parse_query(evhttp_request_get_uri(req), ¶ms)) { FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid"); return; } if(NULL == (vernode = lookup_vernode_struct_from_name_renew(business, ¶ms))) { evhttp_clear_headers(¶ms); evhttp_send_error(req, HTTP_NOTFOUND, "Parameter token invalid"); return; } evhttp_clear_headers(¶ms); if(vernode->version_finished) { sprintf(version, "%lu", vernode->version); evhttp_add_header(evhttp_request_get_output_headers(req), "X-Set-Version", version); evhttp_send_reply(req, HTTP_OK, "version already finished", NULL); //保证最终一致性 return; } if(vernode->cur_table != NULL || vernode->syncing) { evhttp_send_error(req, 300, "table not finished yet"); return; } if(!atomic_read(&business->ready_to_sync) || NULL!=evhttp_find_header(evhttp_request_get_input_headers(req), "X-Doris-Master-Slave-Sync")) { return http_config_direct_version_finish(vernode, req, 0); } if(vernode->synctx == NULL) { evhttp_send_error(req, 400, "illegal server host, cannt change server durain version life cycle"); return; } vernode->retry_times = 0; vernode->syncing = 1; vernode->req = req; doris_prod_version_end(vernode->synctx, prod_sync_verend_result_cb, vernode); } static void post_vernode_expire_destroy_cb(int fd, short kind, void *userp) { struct version_list_node *vernode=(struct version_list_node *)userp; struct table_list_node *tablenode; struct timeval tv; if(vernode->syncing) { tv.tv_sec = g_doris_server_info.post_vernode_ttl; tv.tv_usec = 0; evtimer_add(&vernode->timer_expire, &tv); return; } if(vernode->synctx != NULL) { doris_prod_upload_ctx_destroy(vernode->synctx); vernode->synctx = NULL; } if(vernode->fp_idx_file != NULL) { fclose(vernode->fp_idx_file); remove(vernode->tmp_index_path); } if(vernode->cur_table!=NULL && vernode->cur_table->fp_cfg_file != NULL) { fclose(vernode->cur_table->fp_cfg_file); remove(vernode->cur_table->localpath); } TAILQ_FOREACH(tablenode, &vernode->table_head, table_node) { remove(tablenode->localpath); } vernode->business->posts_on_the_way--; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_VERSION_EXPIRES], 0, FS_OP_ADD, 1); FS_operate(g_doris_server_info.fsstat_handle, vernode->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_POST_ON_THE_WAY], FS_OP_SET, vernode->business->posts_on_the_way); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, token %s expires", vernode->business->bizname, vernode->token); config_version_node_cleanup(vernode); } struct version_list_node *doris_config_post_version_prepare(struct doris_business *business, int32_t cfgtype) { struct version_list_node *vernode; struct timeval tv; vernode = (struct version_list_node *)calloc(1, sizeof(struct version_list_node)); vernode->business = business; vernode->cfg_type = cfgtype; if(business->concurrency_allowed) { tv.tv_sec = g_doris_server_info.post_vernode_ttl; tv.tv_usec = 0; evtimer_assign(&vernode->timer_expire, business->worker_evbase, post_vernode_expire_destroy_cb, vernode); evtimer_add(&vernode->timer_expire, &tv); } return vernode; } void doris_config_post_version_start(struct version_list_node *cur_vernode, const char *token) { struct doris_business *business=cur_vernode->business; snprintf(cur_vernode->token, 64, "%s", token); if(cur_vernode->cfg_type == CFG_UPDATE_TYPE_FULL) { snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token); } else { snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token); } if(NULL==(cur_vernode->fp_idx_file = fopen(cur_vernode->tmp_index_path, "w+"))) { MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", business->bizname, cur_vernode->tmp_index_path, strerror(errno)); assert(0); } if(g_doris_server_info.consumer_port) { TAILQ_INIT(&cur_vernode->table_head); cur_vernode->metajson = cJSON_CreateObject(); cur_vernode->arrayjson= cJSON_CreateArray(); cJSON_AddNumberToObject(cur_vernode->metajson, "type", cur_vernode->cfg_type); } business->token2node->insert(make_pair(string(token), cur_vernode)); } void http_post_direct_version_start(struct version_list_node *cur_vernode, struct evhttp_request *req, const char *role) { struct doris_business *business=cur_vernode->business; char token[64], *p; struct evbuffer *evbuf; cJSON *meta; prod_server_generate_token(business, token, 64); doris_config_post_version_start(cur_vernode, token); meta = cJSON_CreateObject(); cJSON_AddStringToObject(meta, "token", token); p = cJSON_PrintUnformatted(meta); cJSON_Delete(meta); evbuf = evbuffer_new(); evbuffer_add(evbuf, p, strlen(p)); evhttp_send_reply(req, 200, "OK", evbuf); evbuffer_free(evbuf); cur_vernode->req = NULL; MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, post %s server send response version start: %s", business->bizname, role, p); free(p); } void try_restore_from_busy_peer(struct version_list_node *cur_vernode, const char *body, bool busy) { struct doris_business *business=cur_vernode->business; struct evbuffer *evbuf; cJSON *meta, *token; /*对端既然busy,说明它一定生成了这个token,生成token的操作是对方做;*/ if((NULL==(meta=cJSON_Parse(body))) || NULL==(token=(cJSON_GetObjectItem(meta, "token")))) { assert(0); } /*并且一定没有上传过配置,因为若对方有在途,本机post server就起不来;处理的是curl失败的情况*/ assert(NULL == cJSON_GetObjectItem(meta, "configs")); doris_config_post_version_start(cur_vernode, token->valuestring); cJSON_Delete(meta); evbuf = evbuffer_new(); evbuffer_add(evbuf, body, strlen(body)); evhttp_send_reply(cur_vernode->req, 200, "OK", evbuf); evbuffer_free(evbuf); cur_vernode->req = NULL; if(busy) { MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "\033[33mbusiness: %s, restore from busy peer, post master server send response version start: %s\033[0m", business->bizname, body); } else { MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, post master server send response version start: %s", business->bizname, body); } } void prod_sync_verstart_result_cb(enum PROD_VERSTART_RES result, const char *body, void *userdata) { struct version_list_node *vernode=(struct version_list_node *)userdata; struct doris_business *business=vernode->business; vernode->retry_times++; vernode->syncing = 0; switch(result) { case VERSTART_RES_OK: try_restore_from_busy_peer(vernode, body, false); break; case VERSTART_RES_BUSY: //一定是由于前几次CURLE错误引起的,如rate limit try_restore_from_busy_peer(vernode, body, true); break; case VERSTART_RES_ERROR: //非法请求直接返回给Client evhttp_send_error(vernode->req, 500, "version start sync error res_code"); doris_prod_upload_ctx_destroy(vernode->synctx); free(vernode); business->cur_vernode = NULL; business->posts_on_the_way--; FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_POST_ON_THE_WAY], FS_OP_SET, business->posts_on_the_way); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version start sync error res_code, abandon it. Send 500 response to client.\033[0m", business->bizname); break; case VERSTART_CURL_ERROR: if(atomic_read(&business->ready_to_sync) && (vernode->retry_times < 3)) { vernode->syncing = 1; doris_prod_version_start_with_cb(vernode->synctx, prod_sync_verstart_result_cb, vernode); } else { http_post_direct_version_start(vernode, vernode->req, "master"); business_set_sync_peer_abnormal(vernode->business); } break; default: assert(0);break; } } void concurrency_send_busy_reply(struct doris_business *business, struct evhttp_request *req) { char *p; struct evbuffer *evbuf; /*有其他正在version start过程,即同步未获得token的在途版本,此时不能接受其他请求*/ if(business->cur_vernode==NULL || business->cur_vernode->token[0]=='\0') { evhttp_send_error(req, 400, "another empty uploading busy"); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_DEBUG, "business: %s busy starting, posts-on-the-way: %d", business->bizname, business->posts_on_the_way); return; } /*有其他已完成version start操作的在途版本,已同步得到对方的token*/ p = vernode_print_json_meta(business->cur_vernode); evbuf = evbuffer_new(); evbuffer_add(evbuf, p, strlen(p)); evhttp_send_reply(req, 300, "another uploading busy", evbuf); evbuffer_free(evbuf); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s busy, posts-on-the-way: %d, reply: %s", business->bizname, business->posts_on_the_way, p); free(p); } void http_prod_server_verion_start_cb(struct evhttp_request *req, void *arg) { struct doris_business *argbiz=(struct doris_business *)arg, *business; struct evkeyvalq params; const char *type; int cfgtype; if(evhttp_parse_query(evhttp_request_get_uri(req), ¶ms)) { FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid"); return; } if(NULL == (business = lookup_bizstruct_from_name(¶ms)) || business!=argbiz) { evhttp_clear_headers(¶ms); evhttp_send_error(req, HTTP_BADREQUEST, "Parameter business invalid"); return; } if(NULL == (type = evhttp_find_header(¶ms, "type")) || ((cfgtype=atoi(type))!=1 && cfgtype!=2)) { FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_clear_headers(¶ms); evhttp_send_error(req, HTTP_BADREQUEST, "Parameter type invalid"); return ; } evhttp_clear_headers(¶ms); if(!business->concurrency_allowed && business->posts_on_the_way>0) { return concurrency_send_busy_reply(business, req); } if(business->posts_on_the_way > g_doris_server_info.max_concurrent_reqs) { evhttp_send_error(req, HTTP_SERVUNAVAIL, "Too many concurrent requests, service unavailable"); return ; } /*对于不允许并发的情况,business->cur_vernode始终不变;否则会根据情况调整*/ business->cur_vernode = doris_config_post_version_prepare(business, cfgtype); business->posts_on_the_way++; business->type = cfgtype; FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_POST_ON_THE_WAY], FS_OP_SET, business->posts_on_the_way); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s receives a version start request, posts-on-the-way: %d", business->bizname, business->posts_on_the_way); if(NULL != evhttp_find_header(evhttp_request_get_input_headers(req), "X-Doris-Master-Slave-Sync")) //内部标记主从同步请求 { return http_post_direct_version_start(business->cur_vernode, req, "slave"); } if(atomic_read(&business->ready_to_sync) && (NULL!=(business->cur_vernode->synctx=doris_prod_upload_ctx_new(business->instance, business->bizname, cfgtype)))) { business->cur_vernode->retry_times = 0; business->cur_vernode->req = req; business->cur_vernode->syncing = 1; doris_prod_version_start_with_cb(business->cur_vernode->synctx, prod_sync_verstart_result_cb, business->cur_vernode); } else { http_post_direct_version_start(business->cur_vernode, req, "master"); business_set_sync_peer_abnormal(business); } } bool upload_frag_argument_check_offset(struct evhttp_request *req, struct evkeyvalq *params, struct version_list_node *vernode, struct internal_tablemeta *tablemeta) { const char *tmparg; char *endptr=NULL, curoffset[32]; size_t length; tablemeta->islast = 0; if(NULL!=(tmparg=evhttp_find_header(params, "last")) && !strcasecmp(tmparg, "true")) { tablemeta->islast = 1; } if((length=evbuffer_get_length(evhttp_request_get_input_buffer(req))) > 0) { if(NULL == (tmparg = evhttp_find_header(params, "offset"))) { evhttp_send_error(req, 401, "Parameter offset not found"); return false; } tablemeta->offset = strtol(tmparg, &endptr, 10); if(*endptr != '\0') { evhttp_send_reply(req, 401, "Parameter offset invalid", NULL); return false; } if(vernode->cur_table == NULL) { if(tablemeta->offset != 0) { evhttp_send_reply(req, 401, "Parameter offset is not starting from 0", NULL); return false; } } else if(tablemeta->offset+length <= vernode->cur_table->cur_totallen) { evhttp_send_reply(req, 201, "Parameter offset already uploaded", NULL); return false; } else if(tablemeta->offset != vernode->cur_table->cur_totallen) { sprintf(curoffset, "%lu", vernode->cur_table->cur_totallen); evhttp_add_header(evhttp_request_get_output_headers(req), "X-Current-Offset", curoffset); evhttp_send_reply(req, 401, "Parameter offset invalid", NULL); return false; } } else if(!tablemeta->islast || vernode->cur_table==NULL) //如果是last,但是未创建表,说明未上传内容 { evhttp_send_error(req, 400, "Content length is zero, but parameter last!=true; or total length is zero, but parameter last=true"); return false; } return true; } struct version_list_node *upload_file_arguments_valid_check(struct evhttp_request *req, struct doris_business *business, struct internal_tablemeta *tablemeta, bool fragcheck) { struct evkeyvalq params; struct version_list_node *vernode; struct table_list_node *tablenode; const char *tablename, *tmparg; if(evhttp_parse_query(evhttp_request_get_uri(req), ¶ms)) { FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid"); return NULL; } if(NULL==(vernode = lookup_vernode_struct_from_name_renew(business, ¶ms))) { evhttp_send_error(req, HTTP_NOTFOUND, "Parameter token invalid"); evhttp_clear_headers(¶ms); return NULL; } if(NULL == (tablename = evhttp_find_header(¶ms, "tablename"))) { evhttp_send_error(req, HTTP_BADREQUEST, "Parameter tablename invalid"); evhttp_clear_headers(¶ms); return NULL; } /*上个表未结束,不允许多张表并发上传*/ if(vernode->cur_table!=NULL && (vernode->syncing || strcmp(vernode->cur_table->tablename, tablename))) { evhttp_send_error(req, 300, "tablename busy"); evhttp_clear_headers(¶ms); return NULL; } /*finished表才会加入链表,查看是否已有该表结束,不支持重复的表名*/ tablenode = TAILQ_FIRST(&vernode->table_head); while(tablenode!=NULL && strcmp(tablename, tablenode->tablename)) { tablenode = TAILQ_NEXT(tablenode, table_node); } if(tablenode != NULL) { evhttp_send_error(req, HTTP_BADREQUEST, "tablename already finished"); evhttp_clear_headers(¶ms); return NULL; } if(fragcheck && !upload_frag_argument_check_offset(req, ¶ms, vernode, tablemeta)) { evhttp_clear_headers(¶ms); return NULL; } snprintf(tablemeta->tablename, 64, "%s", tablename); if(NULL == (tmparg = evhttp_find_header(¶ms, "filename"))) { tablemeta->filename[0] = '\0'; } else { snprintf(tablemeta->filename, 64, "%s", tmparg); } evhttp_clear_headers(¶ms); return vernode; } bool upload_frag_check_content_md5(struct evhttp_request *req, const char *content, size_t len, char *md5str, int md5size) { const char *md5; MD5_CTX ctx; if(NULL == (md5=evhttp_find_header(evhttp_request_get_input_headers(req), "Content-MD5"))) { FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_send_error(req, 402, "Content-MD5 header not found"); return false; } MD5_Init(&ctx); MD5_Update(&ctx, content, len); scandir_md5_final_string(&ctx, md5str, md5size); if(strcasecmp(md5, md5str)) { FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1); evhttp_send_error(req, 402, "Content-MD5 not match"); return false; } return true; } void doris_config_post_cfgfile_prepare(struct version_list_node *cur_vernode, struct internal_tablemeta *tablemeta, const char *md5, u_int32_t cfgnum, char *content, size_t size) { if(cur_vernode->cur_table == NULL) { cur_vernode->cur_table = (struct table_list_node *)calloc(1, sizeof(struct table_list_node)); cur_vernode->cur_table->cfgnum = cfgnum; cur_vernode->total_cfgs += cfgnum; sprintf(cur_vernode->cur_table->tablename, "%s", tablemeta->tablename); if(tablemeta->filename[0] != '\0') //Client指定文件名 { sprintf(cur_vernode->cur_table->filename, "%s", tablemeta->filename); } else { snprintf(cur_vernode->cur_table->filename, 128, "%s.%s", tablemeta->tablename, cur_vernode->token); } MD5_Init(&cur_vernode->cur_table->md5ctx); TAILQ_INIT(&cur_vernode->cur_table->frag_head); } cur_vernode->cur_table->fragcontent = content; cur_vernode->cur_table->fragsize = size; cur_vernode->cur_table->finished = tablemeta->islast; sprintf(cur_vernode->cur_table->fragmd5, "%s", md5); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_DEBUG, "business: %s, table %s receives a file part, offset: %lu, size: %lu", cur_vernode->business->bizname, tablemeta->tablename, tablemeta->offset, size); } void doris_config_post_cfgfile_start(struct version_list_node *vernode, struct evhttp_request *req) { struct tablemeta meta; MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s start...", vernode->business->bizname, vernode->cur_table->filename); FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_START_FILES], 0, FS_OP_ADD, 1); meta.tablename = vernode->cur_table->tablename; meta.filename = vernode->cur_table->filename; meta.userregion = evhttp_find_header(evhttp_request_get_input_headers(req), "X-User-Info"); meta.cfgnum = vernode->cur_table->cfgnum; meta.size = 0; vernode->business->type = vernode->cfg_type; vernode->business->fp_idx_file = vernode->fp_idx_file; doris_config_file_cfgfile_start(NULL, &meta, NULL, vernode->business); sprintf(vernode->cur_table->localpath, "%s", vernode->business->cfg_file_path); vernode->cur_table->fp_cfg_file = vernode->business->fp_cfg_file; if(g_doris_server_info.consumer_port) { vernode->cur_table->table_meta = cJSON_CreateObject(); cJSON_AddStringToObject(vernode->cur_table->table_meta, "tablename", meta.tablename); cJSON_AddStringToObject(vernode->cur_table->table_meta, "filename", meta.filename); cJSON_AddNumberToObject(vernode->cur_table->table_meta, "cfg_num", meta.cfgnum); if(meta.userregion != NULL) { cJSON_AddStringToObject(vernode->cur_table->table_meta, "user_region", meta.userregion); } } } void doris_config_post_cfgfile_finish(struct version_list_node *vernode, const char *md5str) { doris_config_common_cfgfile_finish(vernode->business); fclose(vernode->cur_table->fp_cfg_file); assert(vernode->cur_table->filesize == 0); vernode->cur_table->filesize = vernode->cur_table->cur_totallen; if(g_doris_server_info.consumer_port) { cJSON_AddNumberToObject(vernode->cur_table->table_meta, "size", vernode->cur_table->filesize); cJSON_AddStringToObject(vernode->cur_table->table_meta, "md5", md5str); cJSON_AddItemToArray(vernode->arrayjson, vernode->cur_table->table_meta); vernode->cur_table->table_meta = NULL; if(vernode->cur_table->cur_frag != NULL) { if(vernode->cur_table->cur_frag->totalsize > vernode->cur_table->cur_frag->cur_fraglen) { char *content = (char *)malloc(vernode->cur_table->cur_frag->cur_fraglen); memcpy(content, vernode->cur_table->cur_frag->content, vernode->cur_table->cur_frag->cur_fraglen); free(vernode->cur_table->cur_frag->content); vernode->cur_table->cur_frag->content = content; vernode->cur_table->cur_frag->totalsize = vernode->cur_table->cur_frag->cur_fraglen; vernode->cur_table->cur_frag->end = vernode->cur_table->filesize - 1; } TAILQ_INSERT_TAIL(&vernode->cur_table->frag_head, vernode->cur_table->cur_frag, frag_node); assert(vernode->cur_table->cur_frag->cur_fraglen == vernode->cur_table->cur_frag->end - vernode->cur_table->cur_frag->start + 1); vernode->cur_table->cur_frag = NULL; } } MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s finished", vernode->business->bizname, vernode->cur_table->filename); TAILQ_INSERT_TAIL(&vernode->table_head, vernode->cur_table, table_node); vernode->cur_table = NULL; //清空,准备下一张表 } void http_config_direct_cfgfile_update(struct version_list_node *vernode, struct evhttp_request *req) { size_t writen_len; char md5str[40]; if(vernode->cur_table->cur_totallen == 0) //start { doris_config_post_cfgfile_start(vernode, req); } if(vernode->cur_table->fragsize > 0) { writen_len = fwrite(vernode->cur_table->fragcontent, 1, vernode->cur_table->fragsize, vernode->cur_table->fp_cfg_file); if(writen_len != vernode->cur_table->fragsize) { MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", vernode->business->bizname, vernode->cur_table->localpath, strerror(errno)); assert(0); } if(g_doris_server_info.consumer_port) { vernode->business->cur_vernode = vernode; doris_config_mem_cfgfile_update(NULL, vernode->cur_table->fragcontent, vernode->cur_table->fragsize, vernode->business); } else { vernode->cur_table->cur_totallen += vernode->cur_table->fragsize; } if(!vernode->cur_table->onceupload) { MD5_Update(&vernode->cur_table->md5ctx, vernode->cur_table->fragcontent, vernode->cur_table->fragsize); } free(vernode->cur_table->fragcontent); } if(vernode->cur_table->finished) //end { if(!vernode->cur_table->onceupload) { scandir_md5_final_string(&vernode->cur_table->md5ctx, md5str, 40); doris_config_post_cfgfile_finish(vernode, md5str); evhttp_add_header(evhttp_request_get_output_headers(req), "X-Content-MD5", md5str); } else { doris_config_post_cfgfile_finish(vernode, vernode->cur_table->fragmd5); } } evhttp_send_reply(req, HTTP_OK, "OK", NULL); } void prod_sync_upload_frag_cb(enum PROD_VEROP_RES result,void * userdata) { struct version_list_node *vernode=(struct version_list_node *)userdata; struct table_meta meta; vernode->retry_times++; vernode->syncing = 0; switch(result) { case VERSIONOP_RES_OK: http_config_direct_cfgfile_update(vernode, vernode->req); break; case VERSIONOP_RES_ERROR: evhttp_send_error(vernode->req, 500, "frag sync error res_code"); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, frag sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); break; case VERSIONOP_CURL_ERROR: if(atomic_read(&vernode->business->ready_to_sync) && (vernode->retry_times < 3)) { vernode->syncing = 1; meta.md5 = vernode->cur_table->fragmd5; meta.cfgnum = vernode->cur_table->cfgnum; meta.tablename = vernode->cur_table->tablename; meta.filename = vernode->cur_table->filename; meta.userregion = evhttp_find_header(evhttp_request_get_input_headers(vernode->req), "X-User-Info"); if(vernode->cur_table->onceupload) { doris_prod_upload_once_with_cb(vernode->synctx, vernode->cur_table->fragcontent, vernode->cur_table->fragsize, &meta, prod_sync_upload_frag_cb, vernode); } else { doris_prod_upload_frag_with_cb(vernode->synctx, vernode->cur_table->fragcontent, vernode->cur_table->fragsize, vernode->cur_table->cur_totallen, vernode->cur_table->finished?true:false, &meta, prod_sync_upload_frag_cb, vernode); } } else { http_config_direct_cfgfile_update(vernode, vernode->req); business_set_sync_peer_abnormal(vernode->business); } break; default: assert(0);break; } } void http_prod_server_file_once_cb(struct evhttp_request *req, void *arg) { struct doris_business *business=(struct doris_business *)arg; struct version_list_node *vernode; char *content, md5str[64]; const char *tmp; struct internal_tablemeta tablemeta; size_t size; struct table_meta meta; int32_t cfgnum=0, need_sync=0; if(NULL == (vernode=upload_file_arguments_valid_check(req, business, &tablemeta, false))) { return; } tablemeta.islast = 1; tablemeta.offset = 0; /*原始Client不带X-Doris-Master-Slave-Sync请求头,同一版本请求不允许跨机器*/ if(atomic_read(&business->ready_to_sync) && NULL==evhttp_find_header(evhttp_request_get_input_headers(req), "X-Doris-Master-Slave-Sync")) { need_sync = 1; } if(need_sync && vernode->synctx==NULL) { evhttp_send_error(req, 400, "illegal server host, cannt change server durain version life cycle"); return ; } if((size=evbuffer_get_length(evhttp_request_get_input_buffer(req))) == 0) { evhttp_send_error(req, 400, "no content"); return ; } content = (char*)malloc(size); if(size != (size_t)evbuffer_copyout(evhttp_request_get_input_buffer(req), content, size)) { assert(0); } if(!upload_frag_check_content_md5(req, content, size, md5str, 64)) { free(content); return ; } if(NULL != (tmp=evhttp_find_header(evhttp_request_get_input_headers(req), "X-Config-Num"))) { cfgnum = atoi(tmp);; } doris_config_post_cfgfile_prepare(vernode, &tablemeta, md5str, cfgnum, content, size); meta.md5 = md5str; meta.cfgnum = cfgnum; meta.tablename = tablemeta.tablename; meta.userregion = evhttp_find_header(evhttp_request_get_input_headers(req), "X-User-Info"); meta.filename = vernode->cur_table->filename; vernode->cur_table->onceupload = true; if(!need_sync) { return http_config_direct_cfgfile_update(vernode, req); } vernode->retry_times = 0; vernode->req = req; vernode->syncing = 1; doris_prod_upload_once_with_cb(vernode->synctx, content, size, &meta, prod_sync_upload_frag_cb, vernode); } void http_prod_server_file_frag_cb(struct evhttp_request *req, void *arg) { struct doris_business *business=(struct doris_business *)arg; struct version_list_node *vernode; char *content=NULL, md5str[64]; const char *tmp; struct internal_tablemeta tablemeta; size_t size=0; struct table_meta meta; int32_t cfgnum=0, need_sync=0; if(NULL == (vernode=upload_file_arguments_valid_check(req, business, &tablemeta, true))) { return; } if((size=evbuffer_get_length(evhttp_request_get_input_buffer(req)))==0 && !tablemeta.islast) { evhttp_send_error(req, 400, "no content"); return ; } if(atomic_read(&business->ready_to_sync) && NULL==evhttp_find_header(evhttp_request_get_input_headers(req), "X-Doris-Master-Slave-Sync")) { need_sync = 1; } if(need_sync && vernode->synctx==NULL) { evhttp_send_error(req, 400, "illegal server host, cannt change server durain version life cycle"); return ; } if(size > 0) { content = (char *)malloc(size); if(size != (size_t)evbuffer_copyout(evhttp_request_get_input_buffer(req), content, size)) { assert(0); } if(!upload_frag_check_content_md5(req, content, size, md5str, 64)) { free(content); return ; } } if(NULL != (tmp=evhttp_find_header(evhttp_request_get_input_headers(req), "X-Config-Num"))) { cfgnum = atoi(tmp);; } doris_config_post_cfgfile_prepare(vernode, &tablemeta, md5str, cfgnum, content, size); meta.md5 = md5str; meta.cfgnum = cfgnum; meta.tablename = tablemeta.tablename; meta.userregion = evhttp_find_header(evhttp_request_get_input_headers(req), "X-User-Info"); meta.filename = vernode->cur_table->filename; if(tablemeta.islast && tablemeta.offset==0) { vernode->cur_table->onceupload = true; } if(!need_sync) { return http_config_direct_cfgfile_update(vernode, req); } vernode->retry_times = 0; vernode->req = req; vernode->syncing = 1; doris_prod_upload_frag_with_cb(vernode->synctx, content, size, vernode->cur_table->cur_totallen, tablemeta.islast?true:false, &meta, prod_sync_upload_frag_cb, vernode); } void start_business_http_post_server(struct doris_business *business) { struct evhttp *worker_http; if((business->listener_prod = doris_create_listen_socket(business->producer_port)) < 0) { assert(0);return; } business->source_from = RECV_WAY_HTTP_POST; worker_http = evhttp_new(business->worker_evbase); if(g_doris_server_info.ssl_conn_on) { evhttp_set_bevcb(worker_http, doris_https_bufferevent_cb, g_doris_server_info.ssl_instance); } evhttp_set_cb(worker_http, "/version/start", http_prod_server_verion_start_cb, business); evhttp_set_cb(worker_http, "/version/finish", http_prod_server_verion_end_cb, business); evhttp_set_cb(worker_http, "/version/cancel", http_prod_server_verion_cancel_cb, business); evhttp_set_cb(worker_http, "/version/check", http_prod_server_verion_check_cb, business); evhttp_set_cb(worker_http, "/fileonce/upload", http_prod_server_file_once_cb, business); evhttp_set_cb(worker_http, "/filefrag/upload", http_prod_server_file_frag_cb, business); evhttp_set_allowed_methods(worker_http, EVHTTP_REQ_POST|EVHTTP_REQ_PUT|EVHTTP_REQ_HEAD); evhttp_set_max_body_size(worker_http, g_doris_server_info.max_http_body_size); if(evhttp_accept_socket(worker_http, business->listener_prod)) { printf("evhttp_accept_socket %d error!\n", business->listener_prod); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "evhttp_accept_socket %d error!\n", business->listener_prod); assert(0); } } void doris_config_version_sync_updated(struct doris_csum_instance *instance, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; struct doris_csum_param *param; u_int32_t references, business_post_ups; /*销毁consuemer,同时确保本函数只执行一次*/ param = doris_csum_instance_get_param(instance); doris_csum_instance_destroy(instance); references = doris_csum_param_get_refernces(param); if(references == 0) { doris_csum_parameter_destroy(param); } /*init sync instance*/ business->instance = doris_prod_instance_new(business->param_prod, business->worker_evbase, g_doris_server_info.log_runtime); if(business->instance == NULL) { MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "doris_prod_instance_new for %s failed", business->bizname); assert(0);return; } /*start worker*/ start_business_http_post_server(business); /*同步完成,表示本机版本与server已一致(注意是一致,而不是本机更新, 本机更新时server不会返回304)*/ atomic_set(&business->ready_to_sync, 1); pthread_mutex_lock(&g_doris_server_info.mutex_lock); business_post_ups = ++g_doris_server_info.business_post_ups; pthread_mutex_unlock(&g_doris_server_info.mutex_lock); if(business_post_ups == g_doris_server_info.business_post_num) { MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_OK); } else { MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_UPING); } assert(business_post_ups <= g_doris_server_info.business_post_num); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[32m******Doris Producer worker for %s starts******\033[0m", business->bizname); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "HttpProducer, doris ready to sync for business: %s\n", business->bizname); } /*与thread_doris_client_recv_cfg差别仅在于version_updated函数*/ void* thread_http_post_recv_cfg(void *arg) { struct doris_business *business=(struct doris_business *)arg; struct event_base *client_evbase; struct doris_csum_instance *instance; struct doris_callbacks doris_cbs; struct doris_arguments doris_args; struct doris_idxfile_scanner *scanner; enum DORIS_UPDATE_TYPE update_type; char stored_path[512]; prctl(PR_SET_NAME, "http_post"); client_evbase = event_base_new(); business->source_from = RECV_WAY_IDX_FILE; business->worker_evbase = client_evbase; scanner = doris_index_file_scanner(0); /*Retaive latest config to memory from Stored configs*/ doris_cbs.version_start = doris_config_localmem_version_start; doris_cbs.version_finish = doris_config_localmem_version_finish; doris_cbs.version_error = doris_config_localmem_version_error; doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start; doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update; doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish; doris_cbs.version_updated= NULL; doris_cbs.userdata = business; snprintf(stored_path, 512, "%s/full/index", business->store_path_root); if(business->saves_when_fulldel > 0) { get_full_topN_max_versions(stored_path, business->full_version_inc, business->saves_when_fulldel); } update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); snprintf(stored_path, 512, "%s/inc/index", business->store_path_root); do { update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); }while(update_type != CFG_UPDATE_TYPE_NONE); if(g_doris_server_info.cluster_sync_mode) /*Check new configs*/ { doris_cbs.version_start = doris_config_version_start; doris_cbs.version_finish = doris_config_version_finish; doris_cbs.version_error = doris_config_version_error; doris_cbs.cfgfile_start = doris_config_cfgfile_start; doris_cbs.cfgfile_update = doris_config_cfgfile_update; doris_cbs.cfgfile_finish = doris_config_cfgfile_finish; doris_cbs.version_updated= doris_config_version_sync_updated; business->source_from = RECV_WAY_DRS_CLIENT; memset(&doris_args, 0, sizeof(struct doris_arguments)); doris_args.current_version = scanner->cur_version; sprintf(doris_args.bizname, "%s", business->bizname); instance = doris_csum_instance_new(business->param_csum, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime); if(instance == NULL) { assert(0);return NULL; } } else { start_business_http_post_server(business); } event_base_dispatch(client_evbase); printf("Libevent dispath error, should not run here.\n"); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here."); assert(0);return NULL; }