/*
 * This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
 * Files
 * doris-doris-dispatch/server/doris_server_receive.cpp
 *
 * 764 lines
 * 29 KiB
 * C++
 * Raw Blame History
 *
 * This file contains ambiguous Unicode characters
 *
 * This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
 */

#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <sys/prctl.h>
#include <time.h>
#include "doris_server_main.h"
#include "doris_server_scandir.h"
#include "doris_server_receive.h"
/* State handed to doris_scanner_timer_cb() through the timer's user pointer. */
struct scanner_timer_priv
{
struct doris_business *business; /* business whose recv_path_inc is scanned by the timer callback */
struct doris_callbacks doris_cbs; /* callback table passed to doris_index_file_traverse() */
struct doris_arguments doris_args; /* not referenced by the code visible in this file section */
struct doris_idxfile_scanner *scanner; /* scanner obtained from doris_index_file_scanner(0) */
struct event timer_scanner; /* timer event re-armed at the end of every scan */
};
extern struct doris_global_info g_doris_server_info;
/*
 * Release one content fragment.
 * Subtracts fragnode->totalsize from the DRS_FSSTAT_MEMORY_USED statistic,
 * then frees the content buffer and the node itself. NULL is ignored.
 */
void config_frag_node_cleanup(struct cont_frag_node *fragnode)
{
    if(fragnode != NULL)
    {
        FS_operate(g_doris_server_info.fsstat_handle,
                   g_doris_server_info.fsstat_status[DRS_FSSTAT_MEMORY_USED],
                   0, FS_OP_SUB, fragnode->totalsize);
        free(fragnode->content);
        free(fragnode);
    }
}
void config_table_node_cleanup(struct table_list_node *table_node)
{
struct cont_frag_node *fragnode;
if(table_node == NULL) return;
while(NULL != (fragnode = TAILQ_FIRST(&table_node->frag_head)))
{
TAILQ_REMOVE(&table_node->frag_head, fragnode, frag_node);
config_frag_node_cleanup(fragnode);
}
free(table_node);
}
void config_version_node_cleanup(struct version_list_node *vernode)
{
struct table_list_node *tablenode;
if(vernode == NULL) return;
while(NULL != (tablenode = TAILQ_FIRST(&vernode->table_head)))
{
TAILQ_REMOVE(&vernode->table_head, tablenode, table_node);
config_table_node_cleanup(tablenode);
}
free(vernode->metacont);
cJSON_Delete(vernode->metajson);
cJSON_Delete(vernode->arrayjson);
cJSON_Delete(vernode->table_meta);
free(vernode);
}
void config_version_handle_free(struct version_list_handle *version)
{
struct version_list_node *vernode;
while(NULL != (vernode = TAILQ_FIRST(&version->version_head)))
{
TAILQ_REMOVE(&version->version_head, vernode, version_node);
config_version_node_cleanup(vernode);
}
delete version->version2node;
free(version);
}
/*
 * Allocate an empty, zero-initialized version list handle with an empty
 * version list and a fresh version -> node map.
 * The result is released with config_version_handle_free().
 */
struct version_list_handle *config_version_handle_new(void)
{
    struct version_list_handle *fresh =
        (struct version_list_handle *)calloc(1, sizeof(struct version_list_handle));

    TAILQ_INIT(&fresh->version_head);
    fresh->version2node = new map<int64_t, struct version_list_node*>;
    return fresh;
}
void config_version_node_free_content(struct version_list_node *vernode)
{
struct table_list_node *tablenode;
struct cont_frag_node *fragnode;
TAILQ_FOREACH(tablenode, &vernode->table_head, table_node)
{
while(NULL != (fragnode = TAILQ_FIRST(&tablenode->frag_head)))
{
TAILQ_REMOVE(&tablenode->frag_head, fragnode, frag_node);
config_frag_node_cleanup(fragnode);
}
}
vernode->cont_in_disk = 1;
}
/* Arm (or re-arm) the given timer event so that it fires after 2 seconds. */
static void doris_common_timer_start(struct event *time_event)
{
    struct timeval delay;

    delay.tv_usec = 0;
    delay.tv_sec = 2;
    evtimer_add(time_event, &delay);
}
/*
 * Timer callback for the delayed destruction of a replaced version list.
 * userp is the common_timer_event created by cfgver_handle_delay_destroy();
 * its data member is the version_list_handle to destroy.
 * While the handle is still referenced the timer is simply re-armed; once
 * references drops to 0 the handle and the timer event are freed.
 */
static void cfgver_delay_destroy_timer_cb(int fd, short kind, void *userp)
{
    struct common_timer_event *pending = (struct common_timer_event *)userp;
    struct version_list_handle *old_list = (struct version_list_handle *)pending->data;

    if(old_list->references == 0)
    {
        config_version_handle_free(old_list);
        free(pending);
        return;
    }
    /* Still in use by a reader: check again later. */
    doris_common_timer_start(&pending->timer_event);
}
/*
 * Schedule the destruction of a version list that has just been replaced.
 * A timer event is allocated on evbase; its callback frees `version` once
 * the list is no longer referenced. `save` is not used.
 */
static void cfgver_handle_delay_destroy(struct confile_save *save, struct event_base *evbase, struct version_list_handle *version)
{
    struct common_timer_event *pending =
        (struct common_timer_event *)malloc(sizeof(struct common_timer_event));

    pending->data = version;
    evtimer_assign(&pending->timer_event, evbase, cfgver_delay_destroy_timer_cb, pending);
    doris_common_timer_start(&pending->timer_event);
}
/* file-series callbacks: persist received configs and their index files to local disk */
/*
 * file-series version_start callback.
 * Builds the index file paths for save->version under store_path_root and
 * opens the temporary index file (tmp_index_path) for writing.
 * full_index_path is only filled in for a full update.
 * A failing fopen is logged at FATAL level and asserted.
 */
void doris_config_file_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;
    const char *root = save->business->store_path_root;

    if(save->type != CFG_UPDATE_TYPE_FULL)
    {
        snprintf(save->inc_index_path, 512, "%s/inc/index/inc_config_index.%010lu", root, save->version);
        /* NOTE(review): the temporary name of an incremental index also uses the
         * "full_config_index" prefix - looks like copy/paste, confirm before changing. */
        snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", root, save->version);
    }
    else
    {
        snprintf(save->inc_index_path, 512, "%s/inc/index/full_config_index.%010lu", root, save->version);
        snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", root, save->version);
        snprintf(save->full_index_path, 512, "%s/full/index/full_config_index.%010lu", root, save->version);
    }

    save->fp_idx_file = fopen(save->tmp_index_path, "w+");
    if(save->fp_idx_file == NULL)
    {
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", save->business->bizname, save->tmp_index_path, strerror(errno));
        assert(0);
    }
}
/*
 * file-series version_finish callback.
 * Closes the temporary index file and renames it to inc_index_path. For a
 * full update the index is additionally hard-linked to full_index_path
 * (an already existing link is tolerated).
 * Failures are logged at FATAL level and asserted.
 */
void doris_config_file_version_finish(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save=(struct confile_save *)userdata;

    fclose(save->fp_idx_file);
    /* Clear the handle so that a later doris_config_file_version_error()
     * does not fclose() the same stream a second time. */
    save->fp_idx_file = NULL;

    if(rename(save->tmp_index_path, save->inc_index_path))
    {
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, rename %s to %s failed: %s", save->business->bizname, save->tmp_index_path, save->inc_index_path, strerror(errno));
        assert(0);
    }
    if(save->type == CFG_UPDATE_TYPE_FULL)
    {
        /* Create a hard link of the index in the full index directory. */
        if(link(save->inc_index_path, save->full_index_path) && errno!=EEXIST)
        {
            /* Report the operation and the paths that actually failed. */
            MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, link %s to %s failed: %s", save->business->bizname, save->inc_index_path, save->full_index_path, strerror(errno));
            assert(0);
        }
    }
    MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu write finished, index file: %s", save->business->bizname, save->version, save->inc_index_path);
}
/*
 * file-series version_error callback: roll back a partially written version.
 * Any index or config file that is still open is closed and removed from
 * disk. The stream handles are reset to NULL afterwards so that a repeated
 * error callback cannot fclose() the same stream twice.
 */
void doris_config_file_version_error(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save=(struct confile_save *)userdata;

    if(save->fp_idx_file != NULL)
    {
        fclose(save->fp_idx_file);
        save->fp_idx_file = NULL;
        remove(save->tmp_index_path);
    }
    if(save->fp_cfg_file != NULL)
    {
        fclose(save->fp_cfg_file);
        save->fp_cfg_file = NULL;
        remove(save->cfg_file_path);
    }
    MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", save->business->bizname, save->version);
}
/*
 * file-series cfgfile_start callback.
 * Makes sure the per-day directory <store_path_root>/<full|inc>/<YYYY-MM-DD>
 * exists, records the new config file as one line in the open index file
 * and opens the config file itself for writing (save->fp_cfg_file).
 * `localpath` is not used here.
 */
void doris_config_file_cfgfile_start(struct doris_instance *instance,
    const struct tablemeta *meta, const char *localpath, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;
    struct tm tm_buf, *tm_now;
    time_t ts;
    const char *subdir;
    char day_dir[256];

    if(save->type == CFG_UPDATE_TYPE_FULL)
    {
        subdir = "full";
    }
    else
    {
        subdir = "inc";
    }

    ts = time(NULL);
    tm_now = localtime_r(&ts, &tm_buf);
    snprintf(day_dir, 256, "%s/%s/%04d-%02d-%02d", save->business->store_path_root, subdir, tm_now->tm_year+1900, tm_now->tm_mon+1, tm_now->tm_mday);
    if(access(day_dir, F_OK) != 0)
    {
        doris_mkdir_according_path(day_dir);
    }
    snprintf(save->cfg_file_path, 256, "%s/%s", day_dir, meta->filename);

    if(!g_doris_server_info.idx_file_maat)
    {
        /* Non-MAAT index line: a fourth column carries the user region.
         * NOTE(review): meta->userregion may be NULL (the mem-series callback
         * checks for it); passing NULL to %s is undefined - confirm with the producer. */
        fprintf(save->fp_idx_file, "%s\t%u\t%s\t%s\n", meta->tablename, meta->cfgnum, save->cfg_file_path, meta->userregion);
    }
    else
    {
        /* MAAT-format index line: table name, config count, file path. */
        fprintf(save->fp_idx_file, "%s\t%u\t%s\n", meta->tablename, meta->cfgnum, save->cfg_file_path);
    }

    save->fp_cfg_file = fopen(save->cfg_file_path, "w+");
    if(save->fp_cfg_file == NULL)
    {
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", save->business->bizname, save->cfg_file_path, strerror(errno));
        assert(0);
    }
    else
    {
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, File %s start writing...", save->business->bizname, save->cfg_file_path);
    }
}
/*
 * file-series cfgfile_update callback: append `len` bytes of `data` to the
 * config file that is currently open. A short write is logged at FATAL
 * level and asserted.
 */
void doris_config_file_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    if(fwrite(data, 1, len, save->fp_cfg_file) != len)
    {
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", save->business->bizname, save->cfg_file_path, strerror(errno));
        assert(0);
    }
}
/*
 * file-series cfgfile_finish callback: close the config file that has just
 * been written completely.
 * The handle is reset to NULL afterwards; doris_config_file_version_error()
 * treats a non-NULL fp_cfg_file as "file still being written" and would
 * otherwise fclose() the stream again and remove the finished file.
 */
void doris_config_file_cfgfile_finish(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save=(struct confile_save *)userdata;

    fclose(save->fp_cfg_file);
    save->fp_cfg_file = NULL;
    MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, File %s write finished", save->business->bizname, save->cfg_file_path);
}
/* mem-series callbacks: cache received configs in memory */
void doris_config_mem_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
save->cur_vernode = (struct version_list_node *)calloc(1, sizeof(struct version_list_node));
TAILQ_INIT(&save->cur_vernode->table_head);
save->cur_vernode->metajson = cJSON_CreateObject();
save->cur_vernode->arrayjson= cJSON_CreateArray();
save->cur_vernode->version = save->version;
cJSON_AddNumberToObject(save->cur_vernode->metajson, "version", save->cur_vernode->version);
save->cur_vernode->cfg_type = save->type;
cJSON_AddNumberToObject(save->cur_vernode->metajson, "type", save->cur_vernode->cfg_type);
}
/*
 * mem-series version_finish callback: publish save->cur_vernode into the
 * in-memory version list of save->business.
 *
 * The meta JSON is completed with the "configs" array and serialized once
 * into metacont/metalen. Then either
 *  - a full version replaces the whole list (when the current list already
 *    has a non-zero latest_version): a new handle holding only this version
 *    is swapped in under the write lock and the old handle is handed to
 *    cfgver_handle_delay_destroy(), or
 *  - the version is appended to the current list under the write lock.
 * save->cur_vernode is cleared at the end; the node is owned by the list.
 */
void doris_config_mem_version_finish(struct doris_instance *instance, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
struct version_list_handle *cur_version;
struct version_list_handle *tmplist;
struct version_list_handle *cfgver_handle;
/* The array becomes a member of metajson, so the separate pointer is dropped. */
cJSON_AddItemToObject(save->cur_vernode->metajson, "configs", save->cur_vernode->arrayjson);
save->cur_vernode->arrayjson = NULL;
/* Keep only the serialized form of the meta; the JSON tree is released. */
save->cur_vernode->metacont = cJSON_PrintUnformatted(save->cur_vernode->metajson);
save->cur_vernode->metalen = strlen(save->cur_vernode->metacont);
cJSON_Delete(save->cur_vernode->metajson);
save->cur_vernode->metajson = NULL;
/* NOTE(review): this informational message is logged at FATAL level - confirm that is intended. */
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %lu mem finished, info: %s", save->business->bizname, save->version, save->cur_vernode->metacont);
if(save->cur_vernode->cfg_type==CFG_UPDATE_TYPE_FULL && save->business->cfgver_head->latest_version!=0)
{
/* Build the replacement list outside the lock; it contains just this version. */
cur_version = config_version_handle_new();
cur_version->latest_version = save->cur_vernode->version;
cur_version->version_num = 1;
TAILQ_INSERT_TAIL(&cur_version->version_head, save->cur_vernode, version_node);
cur_version->oldest_vernode = TAILQ_FIRST(&cur_version->version_head);
cur_version->version2node->insert(make_pair(cur_version->latest_version, save->cur_vernode));
/* Only the pointer swap happens under the write lock. */
pthread_rwlock_wrlock(&save->business->rwlock);
tmplist = save->business->cfgver_head;
save->business->cfgver_head = cur_version;
pthread_rwlock_unlock(&save->business->rwlock);
/* The old list is freed by a timer once its reference count reaches 0. */
cfgver_handle_delay_destroy(save, save->evbase, tmplist);
}
else
{
pthread_rwlock_wrlock(&save->business->rwlock);
cfgver_handle = save->business->cfgver_head;
TAILQ_INSERT_TAIL(&cfgver_handle->version_head, save->cur_vernode, version_node);
cfgver_handle->latest_version = save->cur_vernode->version;
cfgver_handle->version2node->insert(make_pair(save->cur_vernode->version, save->cur_vernode));
if(cfgver_handle->oldest_vernode == NULL)
{
cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head);
}
/* At most cache_max_versions versions keep their file content in memory
 * (0 disables the limit); the nodes and their meta stay in the list.
 * When the limit is reached the content of oldest_vernode is dropped and
 * oldest_vernode advances, so version_num is only incremented while below
 * the limit. (Original comment was mis-encoded; reconstructed from the code.) */
if(save->business->cache_max_versions!=0 && cfgver_handle->version_num>=save->business->cache_max_versions)
{
config_version_node_free_content(cfgver_handle->oldest_vernode);
cfgver_handle->oldest_vernode = TAILQ_NEXT(cfgver_handle->oldest_vernode, version_node);
}
else
{
cfgver_handle->version_num += 1;
}
pthread_rwlock_unlock(&save->business->rwlock);
}
save->cur_vernode = NULL;
}
/*
 * mem-series version_error callback: discard everything built so far for
 * the failed version (current fragment, current table, version node) and
 * clear the corresponding pointers in the save context.
 */
void doris_config_mem_version_error(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    config_frag_node_cleanup(save->cur_frag);
    save->cur_frag = NULL;

    config_table_node_cleanup(save->cur_table);
    save->cur_table = NULL;

    config_version_node_cleanup(save->cur_vernode);
    save->cur_vernode = NULL;
}
void doris_config_mem_cfgfile_start(struct doris_instance *instance,
const struct tablemeta *meta, const char *localpath, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
save->cur_vernode->table_meta = cJSON_CreateObject();
cJSON_AddStringToObject(save->cur_vernode->table_meta, "tablename", meta->tablename);
cJSON_AddStringToObject(save->cur_vernode->table_meta, "filename", meta->filename);
cJSON_AddNumberToObject(save->cur_vernode->table_meta, "cfg_num", meta->cfgnum);
cJSON_AddNumberToObject(save->cur_vernode->table_meta, "size", meta->size);
if(meta->userregion != NULL)
{
cJSON_AddStringToObject(save->cur_vernode->table_meta, "user_region", meta->userregion);
}
save->cur_table = (struct table_list_node *)calloc(1, sizeof(struct table_list_node));
save->cur_table->filesize = meta->size;
snprintf(save->cur_table->tablename, 64, "%s", meta->tablename);
snprintf(save->cur_table->localpath, 256, "%s", localpath);
TAILQ_INIT(&save->cur_table->frag_head);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu start loading to memory...",
save->business->bizname, meta->tablename, save->version);
}
void doris_config_mem_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
size_t cache_len, offset=0;
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_status[DRS_FSSTAT_MEMORY_USED], 0, FS_OP_ADD, len);
while(len > 0)
{
if(save->cur_frag == NULL)
{
save->cur_frag = (struct cont_frag_node *)calloc(1, sizeof(struct cont_frag_node));
save->cur_frag->start = save->cur_table->cur_totallen;
save->cur_frag->totalsize = save->cur_table->filesize - save->cur_table->cur_totallen;
if(save->cur_frag->totalsize > g_doris_server_info.cache_frag_size)
{
save->cur_frag->totalsize = g_doris_server_info.cache_frag_size;
}
save->cur_frag->end = save->cur_frag->start + save->cur_frag->totalsize - 1;
save->cur_frag->content = (char *)malloc(save->cur_frag->totalsize);
}
if(save->cur_frag->totalsize > save->cur_frag->cur_fraglen + len)
{
memcpy(save->cur_frag->content+save->cur_frag->cur_fraglen, data+offset, len);
save->cur_frag->cur_fraglen += len;
save->cur_table->cur_totallen += len;
offset += len;
len = 0;
}
else
{
cache_len = save->cur_frag->totalsize - save->cur_frag->cur_fraglen;
memcpy(save->cur_frag->content+save->cur_frag->cur_fraglen, data+offset, cache_len);
save->cur_frag->cur_fraglen += cache_len;
save->cur_table->cur_totallen += cache_len;
offset += cache_len;
len -= cache_len;
TAILQ_INSERT_TAIL(&save->cur_table->frag_head, save->cur_frag, frag_node);
assert(save->cur_frag->cur_fraglen == save->cur_frag->end - save->cur_frag->start + 1);
save->cur_frag = NULL;
}
}
assert(save->cur_table->cur_totallen <= save->cur_table->filesize);
}
void doris_config_mem_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
cJSON_AddStringToObject(save->cur_vernode->table_meta, "md5", md5);
cJSON_AddItemToArray(save->cur_vernode->arrayjson, save->cur_vernode->table_meta);
save->cur_vernode->table_meta = NULL;
if(save->cur_frag != NULL)
{
TAILQ_INSERT_TAIL(&save->cur_table->frag_head, save->cur_frag, frag_node);
assert(save->cur_frag->cur_fraglen == save->cur_frag->end - save->cur_frag->start + 1);
save->cur_frag = NULL;
}
assert(save->cur_table->cur_totallen == save->cur_table->filesize);
TAILQ_INSERT_TAIL(&save->cur_vernode->table_head, save->cur_table, table_node);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu load to memory finished", save->business->bizname, save->cur_table->tablename, save->version);
save->cur_table = NULL;
}
/* common-series helpers: bookkeeping shared by all callback families (version/type parsing, statistics, monitor status) */
void doris_config_common_version_start(struct confile_save *save, cJSON *meta)
{
cJSON *sub;
sub = cJSON_GetObjectItem(meta, "version");
save->version = sub->valuedouble;
sub = cJSON_GetObjectItem(meta, "type");
save->type = sub->valueint;
assert(save->type==CFG_UPDATE_TYPE_FULL || save->type==CFG_UPDATE_TYPE_INC);
save->version_cfgnum = 0;
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu start updating...", save->business->bizname, save->version);
}
void doris_config_common_version_finish(struct confile_save *save)
{
if(save->type == CFG_UPDATE_TYPE_FULL)
{
save->business->total_cfgnum = save->version_cfgnum;
FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CUR_FULL_VERSION], FS_OP_SET, save->version);
FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CONFIG_TOTAL_NUM], FS_OP_SET, save->version_cfgnum);
FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FULL_VER], FS_OP_ADD, 1);
}
else
{
save->business->total_cfgnum += save->version_cfgnum;
FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CUR_INC_VERSION], FS_OP_SET, save->version);
FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CONFIG_TOTAL_NUM], FS_OP_ADD, save->version_cfgnum);
FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_INC_VER], FS_OP_ADD, 1);
}
MESA_Monitor_operation(g_doris_server_info.monitor, save->business->mm_latest_ver, MONITOR_VALUE_SET, save->version);
MESA_Monitor_operation(g_doris_server_info.monitor, save->business->mm_total_cfgnum, MONITOR_VALUE_SET, save->business->total_cfgnum);
MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_CLEAR, save->business->mm_status_codeid, NULL, NULL);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu update finished", save->business->bizname, save->version);
}
/*
 * Common version_error step: count the failed version and raise the
 * business' monitor status code.
 */
void doris_config_common_version_error(struct confile_save *save)
{
    FS_operate(g_doris_server_info.fsstat_handle,
               g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_ERR_VER],
               0, FS_OP_ADD, 1);

    /* Set the status code so the internal error state is visible to monitoring
     * (the original comment mentions Grafana + Prometheus). */
    MESA_Monitor_set_status_code(g_doris_server_info.monitor,
                                 MONITOR_STATUS_OP_SET,
                                 save->business->mm_status_codeid,
                                 "Version receive error",
                                 "Receive config file error, please check producer");
}
/*
 * Common cfgfile_start step: count one started file in the statistics and
 * add the file's config count to the running total of the current version.
 */
void doris_config_common_cfgfile_start(struct confile_save *save, u_int32_t cfgnum)
{
    FS_operate(g_doris_server_info.fsstat_handle,
               g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_START_FILES],
               0, FS_OP_ADD, 1);
    save->version_cfgnum += cfgnum;
}
/*
 * Common cfgfile_finish step: count one completed file, both in the global
 * statistic field and in the business' own statistics line.
 */
void doris_config_common_cfgfile_finish(struct confile_save *save)
{
    FS_operate(g_doris_server_info.fsstat_handle,
               g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_CMPLT_FILES],
               0, FS_OP_ADD, 1);
    FS_operate(g_doris_server_info.fsstat_handle,
               save->business->fs_lineid,
               g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FILES],
               FS_OP_ADD, 1);
}
/* localmem-series callbacks: used when reloading already stored configs from local disk into memory (nothing is written back to disk) */
/*
 * localmem version_start callback: common bookkeeping, plus the mem-series
 * step when server_role_sw is enabled.
 */
void doris_config_localmem_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_common_version_start(save, meta);
    if(!g_doris_server_info.server_role_sw)
    {
        return;
    }
    doris_config_mem_version_start(instance, meta, userdata);
}
/*
 * localmem version_finish callback: the mem-series step (only when
 * server_role_sw is enabled) followed by the common bookkeeping.
 */
void doris_config_localmem_version_finish(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    if(g_doris_server_info.server_role_sw)
    {
        doris_config_mem_version_finish(instance, userdata);
    }

    doris_config_common_version_finish(save);
}
/*
 * localmem version_error callback: common error bookkeeping, plus the
 * mem-series rollback when server_role_sw is enabled.
 */
void doris_config_localmem_version_error(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_common_version_error(save);
    if(!g_doris_server_info.server_role_sw)
    {
        return;
    }
    doris_config_mem_version_error(instance, userdata);
}
/*
 * localmem cfgfile_start callback: common bookkeeping, plus the mem-series
 * step (with the caller's localpath) when server_role_sw is enabled.
 */
void doris_config_localmem_cfgfile_start(struct doris_instance *instance,
    const struct tablemeta *meta, const char *localpath, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_common_cfgfile_start(save, meta->cfgnum);
    if(!g_doris_server_info.server_role_sw)
    {
        return;
    }
    doris_config_mem_cfgfile_start(instance, meta, localpath, userdata);
}
/*
 * localmem cfgfile_update callback: forwards the data to the mem-series
 * step when server_role_sw is enabled; otherwise the data is ignored.
 */
void doris_config_localmem_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
{
    if(!g_doris_server_info.server_role_sw)
    {
        return;
    }
    doris_config_mem_cfgfile_update(instance, data, len, userdata);
}
/*
 * localmem cfgfile_finish callback: common bookkeeping, plus the mem-series
 * step when server_role_sw is enabled.
 */
void doris_config_localmem_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_common_cfgfile_finish(save);
    if(!g_doris_server_info.server_role_sw)
    {
        return;
    }
    doris_config_mem_cfgfile_finish(instance, md5, userdata);
}
/* Callbacks for newly received configs: always written to disk and, when server_role_sw is enabled, also cached in memory */
/*
 * version_start callback for new configs: common bookkeeping, then the
 * file-series step, then the mem-series step if server_role_sw is enabled.
 */
void doris_config_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_common_version_start(save, meta);
    doris_config_file_version_start(instance, meta, userdata);
    if(!g_doris_server_info.server_role_sw)
    {
        return;
    }
    doris_config_mem_version_start(instance, meta, userdata);
}
/*
 * version_finish callback for new configs: file-series step first, then
 * the mem-series step if server_role_sw is enabled, and finally the common
 * bookkeeping.
 */
void doris_config_version_finish(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_file_version_finish(instance, userdata);

    if(g_doris_server_info.server_role_sw)
    {
        doris_config_mem_version_finish(instance, userdata);
    }

    doris_config_common_version_finish(save);
}
/*
 * version_error callback for new configs: common error bookkeeping, the
 * file-series rollback, then the mem-series rollback if server_role_sw is
 * enabled.
 */
void doris_config_version_error(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_common_version_error(save);
    doris_config_file_version_error(instance, userdata);
    if(!g_doris_server_info.server_role_sw)
    {
        return;
    }
    doris_config_mem_version_error(instance, userdata);
}
/*
 * cfgfile_start callback for new configs: common bookkeeping, then the
 * file-series step, then the mem-series step if server_role_sw is enabled.
 * The mem-series step receives the path chosen by the file-series step
 * (save->cfg_file_path), not the caller's localpath.
 */
void doris_config_cfgfile_start(struct doris_instance *instance,
    const struct tablemeta *meta, const char *localpath, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_common_cfgfile_start(save, meta->cfgnum);
    doris_config_file_cfgfile_start(instance, meta, localpath, userdata);
    if(!g_doris_server_info.server_role_sw)
    {
        return;
    }
    doris_config_mem_cfgfile_start(instance, meta, save->cfg_file_path, userdata);
}
/*
 * cfgfile_update callback for new configs: write the data to the config
 * file and, if server_role_sw is enabled, also append it to the in-memory
 * copy.
 */
void doris_config_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
{
    doris_config_file_cfgfile_update(instance, data, len, userdata);
    if(!g_doris_server_info.server_role_sw)
    {
        return;
    }
    doris_config_mem_cfgfile_update(instance, data, len, userdata);
}
/*
 * cfgfile_finish callback for new configs: common bookkeeping, then the
 * file-series step, then the mem-series step if server_role_sw is enabled.
 */
void doris_config_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_common_cfgfile_finish(save);
    doris_config_file_cfgfile_finish(instance, userdata);
    if(!g_doris_server_info.server_role_sw)
    {
        return;
    }
    doris_config_mem_cfgfile_finish(instance, md5, userdata);
}
/*
 * Thread entry for a business that receives its configs through a doris
 * client instance. `arg` is the struct doris_business to serve.
 *
 * 1. Reload the stored configs (full index first, then every incremental
 *    index) with the localmem callbacks.
 * 2. Switch to the callbacks for new configs, create the doris instance
 *    starting from the scanner's current version and run the event loop.
 *
 * The event loop is not expected to return; every return path yields NULL.
 */
void* thread_doris_client_recv_cfg(void *arg)
{
    struct doris_business *business=(struct doris_business *)arg;
    struct event_base *client_evbase;
    struct doris_instance *instance;
    struct doris_callbacks doris_cbs;
    struct doris_arguments doris_args;
    struct doris_idxfile_scanner *scanner;
    enum DORIS_UPDATE_TYPE update_type;
    struct confile_save save;
    char stored_path[512];

    prctl(PR_SET_NAME, "client_recv");

    client_evbase = event_base_new();

    memset(&save, 0, sizeof(struct confile_save));
    save.source_from = RECV_WAY_IDX_FILE;
    save.evbase = client_evbase;
    save.business = business;

    scanner = doris_index_file_scanner(0);

    /* Retrieve the latest config into memory from the stored configs */
    doris_cbs.version_start = doris_config_localmem_version_start;
    doris_cbs.version_finish = doris_config_localmem_version_finish;
    doris_cbs.version_error = doris_config_localmem_version_error;
    doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start;
    doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update;
    doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish;
    doris_cbs.userdata = &save;

    snprintf(stored_path, sizeof(stored_path), "%s/full/index", business->store_path_root);
    update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
    assert(update_type!=CFG_UPDATE_TYPE_ERR);

    snprintf(stored_path, sizeof(stored_path), "%s/inc/index", business->store_path_root);
    do {
        update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
        assert(update_type!=CFG_UPDATE_TYPE_ERR);
    }while(update_type != CFG_UPDATE_TYPE_NONE);

    /* Check new configs */
    doris_cbs.version_start = doris_config_version_start;
    doris_cbs.version_finish = doris_config_version_finish;
    doris_cbs.version_error = doris_config_version_error;
    doris_cbs.cfgfile_start = doris_config_cfgfile_start;
    doris_cbs.cfgfile_update = doris_config_cfgfile_update;
    doris_cbs.cfgfile_finish = doris_config_cfgfile_finish;
    save.source_from = RECV_WAY_DRS_CLIENT;

    memset(&doris_args, 0, sizeof(struct doris_arguments));
    doris_args.current_version = scanner->cur_version;
    /* Bounded copy: the business name must never overrun the argument buffer. */
    snprintf(doris_args.bizname, sizeof(doris_args.bizname), "%s", business->bizname);
    instance = doris_instance_new(business->param, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime);
    if(instance == NULL)
    {
        assert(0);return NULL;
    }

    event_base_dispatch(client_evbase);
    printf("Libevent dispath error, should not run here.\n");
    MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
    assert(0);return NULL;
}
/*
 * Periodic scan timer callback. userp is the scanner_timer_priv of the
 * owning thread.
 * Traverses the business' recv_path_inc directory repeatedly until the
 * traversal reports CFG_UPDATE_TYPE_NONE, then re-arms the timer with the
 * configured scan_idx_interval (seconds).
 */
static void doris_scanner_timer_cb(int fd, short kind, void *userp)
{
    struct scanner_timer_priv *priv = (struct scanner_timer_priv *)userp;
    struct timeval next;

    for(;;)
    {
        enum DORIS_UPDATE_TYPE result = doris_index_file_traverse(priv->scanner, priv->business->recv_path_inc,
            &priv->doris_cbs, NULL, g_doris_server_info.log_runtime);
        if(result == CFG_UPDATE_TYPE_NONE)
        {
            break;
        }
    }

    next.tv_usec = 0;
    next.tv_sec = g_doris_server_info.scan_idx_interval;
    evtimer_add(&priv->timer_scanner, &next);
}
void* thread_index_file_recv_cfg(void *arg)
{
struct doris_business *business=(struct doris_business *)arg;
struct event_base *client_evbase;
struct confile_save save;
struct timeval tv;
struct scanner_timer_priv timer_priv;
enum DORIS_UPDATE_TYPE update_type;
char stored_path[256];
prctl(PR_SET_NAME, "index_file");
memset(&save, 0, sizeof(struct confile_save));
memset(&timer_priv, 0, sizeof(struct scanner_timer_priv));
client_evbase = event_base_new();
save.source_from = RECV_WAY_IDX_FILE;
save.evbase = client_evbase;
save.business = business;
timer_priv.scanner = doris_index_file_scanner(0);
timer_priv.business = business;
/*Retaive latest config to memory from Stored configs*/
timer_priv.doris_cbs.version_start = doris_config_localmem_version_start;
timer_priv.doris_cbs.version_finish = doris_config_localmem_version_finish;
timer_priv.doris_cbs.version_error = doris_config_localmem_version_error;
timer_priv.doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start;
timer_priv.doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update;
timer_priv.doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish;
timer_priv.doris_cbs.userdata = &save;
snprintf(stored_path, 512, "%s/full/index", business->store_path_root);
update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
assert(update_type!=CFG_UPDATE_TYPE_ERR);
snprintf(stored_path, 512, "%s/inc/index", business->store_path_root);
do{
update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
assert(update_type!=CFG_UPDATE_TYPE_ERR);
}while(update_type!=CFG_UPDATE_TYPE_NONE && update_type!=CFG_UPDATE_TYPE_ERR);
/*Check new configs*/
timer_priv.doris_cbs.version_start = doris_config_version_start;
timer_priv.doris_cbs.version_finish = doris_config_version_finish;
timer_priv.doris_cbs.version_error = doris_config_version_error;
timer_priv.doris_cbs.cfgfile_start = doris_config_cfgfile_start;
timer_priv.doris_cbs.cfgfile_update = doris_config_cfgfile_update;
timer_priv.doris_cbs.cfgfile_finish = doris_config_cfgfile_finish;
update_type = doris_index_file_traverse(timer_priv.scanner, business->recv_path_full,
&timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
assert(update_type!=CFG_UPDATE_TYPE_ERR);
if(update_type!=CFG_UPDATE_TYPE_NONE && update_type!=CFG_UPDATE_TYPE_ERR)
{
tv.tv_sec = 0;
}
else
{
tv.tv_sec = g_doris_server_info.scan_idx_interval;
}
tv.tv_usec = 0;
evtimer_assign(&timer_priv.timer_scanner, client_evbase, doris_scanner_timer_cb, &timer_priv);
evtimer_add(&timer_priv.timer_scanner, &tv);
event_base_dispatch(client_evbase);
printf("Libevent dispath error, should not run here.\n");
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
return NULL;
}