This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
doris-doris-dispatch/server/doris_server_receive.cpp

747 lines
27 KiB
C++
Raw Normal View History

2021-07-16 16:06:59 +08:00
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <sys/prctl.h>
#include <time.h>
#include "doris_server_main.h"
#include "doris_server_scandir.h"
#include "doris_server_receive.h"
/* Private state handed to doris_scanner_timer_cb() through the timer's user pointer. */
struct scanner_timer_priv
{
struct doris_callbacks doris_cbs; /* callback set passed to doris_index_file_traverse() */
struct doris_arguments doris_args; /* not referenced by the code visible in this part of the file */
struct doris_idxfile_scanner *scanner; /* scanner obtained from doris_index_file_scanner() */
struct event timer_scanner; /* libevent timer that re-arms the periodic index-file scan */
};
extern struct doris_global_info g_doris_server_info;
2021-07-16 16:06:59 +08:00
void doris_worker_statistic_timer_cb(int fd, short kind, void *userp)
{
struct worker_statistic_info *statistic = (struct worker_statistic_info *)userp;
struct timeval tv;
struct doris_srv_statistics incr_statistic;
long long *plast_statistic = (long long*)&statistic->statistic_last;
long long *pnow_statistic = (long long*)&statistic->statistic;
long long *pinc_statistic = (long long*)&incr_statistic;
for(u_int32_t i=0; i<sizeof(struct doris_srv_statistics)/sizeof(long long); i++)
{
pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i];
}
statistic->statistic_last = statistic->statistic;
for(u_int32_t i=0; i<DRS_FSSTAT_FIELD_NUM; i++)
{
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[i], 0, FS_OP_ADD, incr_statistic.field[i]);
}
for(u_int32_t i=0; i<DRS_FSSTAT_STATUS_NUM; i++)
{
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_status[i], 0, FS_OP_ADD, incr_statistic.status[i]);
}
tv.tv_sec = g_doris_server_info.fsstat_period;
tv.tv_usec = 0;
event_add(&statistic->timer_statistic, &tv);
}
/*
 * Releases one content fragment and its buffer.
 * The fragment's totalsize is subtracted from the DRS_FSSTAT_MEMORY_USED
 * status counter kept in save. A NULL fragnode is ignored.
 */
void config_frag_node_cleanup(struct confile_save *save, struct cont_frag_node *fragnode)
{
    if(fragnode != NULL)
    {
        save->statistic.statistic.status[DRS_FSSTAT_MEMORY_USED] -= fragnode->totalsize;
        free(fragnode->content);
        free(fragnode);
    }
}
/*
 * Releases a table node together with every fragment queued on it.
 * Each fragment is unlinked from frag_head and handed to
 * config_frag_node_cleanup(). A NULL table_node is ignored.
 */
void config_table_node_cleanup(struct confile_save *save, struct table_list_node *table_node)
{
    struct cont_frag_node *frag;

    if(table_node == NULL)
    {
        return;
    }
    for(frag = TAILQ_FIRST(&table_node->frag_head); frag != NULL; frag = TAILQ_FIRST(&table_node->frag_head))
    {
        TAILQ_REMOVE(&table_node->frag_head, frag, frag_node);
        config_frag_node_cleanup(save, frag);
    }
    free(table_node);
}
/*
 * Releases a version node: all queued table nodes, the serialized meta
 * string and the three cJSON handles it may still own.
 * A NULL vernode is ignored.
 */
void config_version_node_cleanup(struct confile_save *save, struct version_list_node *vernode)
{
    struct table_list_node *table;

    if(vernode == NULL)
    {
        return;
    }
    for(table = TAILQ_FIRST(&vernode->table_head); table != NULL; table = TAILQ_FIRST(&vernode->table_head))
    {
        TAILQ_REMOVE(&vernode->table_head, table, table_node);
        config_table_node_cleanup(save, table);
    }
    free(vernode->metacont);
    cJSON_Delete(vernode->metajson);
    cJSON_Delete(vernode->arrayjson);
    cJSON_Delete(vernode->table_meta);
    free(vernode);
}
/*
 * Releases a version list handle and every version node linked on it.
 * version must not be NULL (it is dereferenced unconditionally).
 */
void config_version_handle_cleanup(struct confile_save *save, struct version_list_handle *version)
{
    struct version_list_node *node;

    for(node = TAILQ_FIRST(&version->version_head); node != NULL; node = TAILQ_FIRST(&version->version_head))
    {
        TAILQ_REMOVE(&version->version_head, node, version_node);
        config_version_node_cleanup(save, node);
    }
    free(version);
}
/*
 * Allocates a zero-filled version list handle with an empty version queue.
 * The allocation result is not checked before use.
 */
struct version_list_handle *config_version_handle_new(void)
{
    struct version_list_handle *created = (struct version_list_handle *)calloc(1, sizeof(struct version_list_handle));

    TAILQ_INIT(&created->version_head);
    return created;
}
/* Arms time_event to fire once, two seconds from now. */
static void doris_common_timer_start(struct event *time_event)
{
    struct timeval delay;

    delay.tv_usec = 0;
    delay.tv_sec = 2;
    evtimer_add(time_event, &delay);
}
/*
 * Timer callback that destroys a retired version list once nobody uses it.
 * While handle->references is non-zero the timer is simply re-armed;
 * otherwise the list and the timer context itself are freed.
 * fd and kind are unused.
 */
static void cfgver_delay_destroy_timer_cb(int fd, short kind, void *userp)
{
    struct common_timer_event *pending = (struct common_timer_event *)userp;
    struct version_list_handle *retired = (struct version_list_handle *)pending->data;

    if(retired->references == 0)
    {
        config_version_handle_cleanup(pending->save, retired);
        free(pending);
        return;
    }
    /* Still referenced: try again after the common delay. */
    doris_common_timer_start(&pending->timer_event);
}
/*
 * Schedules deferred destruction of a version list.
 * A heap-allocated timer context carrying version and save is bound to
 * cfgver_delay_destroy_timer_cb() on evbase and armed immediately.
 * The allocation result is not checked before use.
 */
static void cfgver_handle_delay_destroy(struct confile_save *save, struct event_base *evbase, struct version_list_handle *version)
{
    struct common_timer_event *ctx = (struct common_timer_event *)malloc(sizeof(struct common_timer_event));

    ctx->save = save;
    ctx->data = version;
    evtimer_assign(&ctx->timer_event, evbase, cfgver_delay_destroy_timer_cb, ctx);
    doris_common_timer_start(&ctx->timer_event);
}
/* file-series callbacks: write received configs to local files */
/*
 * File-backend "version start" callback.
 * Derives the index-file paths for save->version under store_path_root and
 * opens the temporary ".ing" index file for writing. The full-index path is
 * only filled in for full updates. instance and meta are unused.
 * The fopen() result is stored without being checked.
 */
void doris_config_file_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;
    const char *root = g_doris_server_info.store_path_root;

    /* The temporary file name is the same for both update types. */
    snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", root, save->version);
    if(save->type != CFG_UPDATE_TYPE_FULL)
    {
        snprintf(save->inc_index_path, 512, "%s/inc/index/inc_config_index.%010lu", root, save->version);
    }
    else
    {
        snprintf(save->inc_index_path, 512, "%s/inc/index/full_config_index.%010lu", root, save->version);
        snprintf(save->full_index_path, 512, "%s/full/index/full_config_index.%010lu", root, save->version);
    }
    save->fp_idx_file = fopen(save->tmp_index_path, "w+");
}
/*
 * File-backend "version finish" callback.
 * Closes the temporary index file, renames it to its final name under
 * inc/index and, for full updates, hard-links it into full/index as well
 * (an already existing link is tolerated).
 *
 * Failures are logged first and then trip assert(0), so the diagnostic is
 * written even in builds where assertions abort. The index FILE pointer is
 * reset to NULL after closing so that a later error callback cannot close
 * it a second time. instance is unused.
 */
void doris_config_file_version_finish(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save=(struct confile_save *)userdata;

    if(save->fp_idx_file != NULL)
    {
        fclose(save->fp_idx_file);
        save->fp_idx_file = NULL;
    }
    if(rename(save->tmp_index_path, save->inc_index_path))
    {
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "rename %s to %s failed: %s", save->tmp_index_path, save->inc_index_path, strerror(errno));
        assert(0);
    }
    if(save->type == CFG_UPDATE_TYPE_FULL)
    {
        /* Create a hard link so the full index is visible under full/index too. */
        if(link(save->inc_index_path, save->full_index_path) && errno!=EEXIST)
        {
            MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "link %s to %s failed: %s", save->inc_index_path, save->full_index_path, strerror(errno));
            assert(0);
        }
    }
    MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "Version %lu write finished, index file: %s", save->version, save->inc_index_path);
}
/*
 * File-backend "version error" callback: rolls back partially written files.
 * Any index or config file that is still open is closed and removed.
 * The FILE pointers are reset to NULL afterwards so that a repeated error
 * callback (or a later finish callback) cannot close the same stream twice.
 * instance is unused.
 */
void doris_config_file_version_error(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save=(struct confile_save *)userdata;

    if(save->fp_idx_file != NULL)
    {
        fclose(save->fp_idx_file);
        save->fp_idx_file = NULL;
        remove(save->tmp_index_path);
    }
    if(save->fp_cfg_file != NULL)
    {
        fclose(save->fp_cfg_file);
        save->fp_cfg_file = NULL;
        remove(save->cfg_file_path);
    }
    MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Version %llu error, rolling back...", save->version);
}
/*
 * File-backend "config file start" callback.
 * Builds the dated directory <store_path_root>/<full|inc>/<YYYY-MM-DD>
 * (created when access() reports it missing), derives the config file path
 * from tablename and save->version, appends "tablename<TAB>cfgnum<TAB>path"
 * to the open index file and opens the config file for writing.
 * instance and size are unused. The fopen() result is not checked.
 */
void doris_config_file_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;
    const char *subdir = "inc";
    struct tm broken_down;
    struct tm *today;
    time_t current;
    char day_dir[256];

    if(save->type == CFG_UPDATE_TYPE_FULL)
    {
        subdir = "full";
    }
    current = time(NULL);
    today = localtime_r(&current, &broken_down);
    snprintf(day_dir, sizeof(day_dir), "%s/%s/%04d-%02d-%02d", g_doris_server_info.store_path_root, subdir,
        today->tm_year+1900, today->tm_mon+1, today->tm_mday);
    if(access(day_dir, F_OK) != 0)
    {
        doris_mkdir_according_path(day_dir);
    }

    snprintf(save->cfg_file_path, 256, "%s/%s.%010lu", day_dir, tablename, save->version);
    fprintf(save->fp_idx_file, "%s\t%u\t%s\n", tablename, cfgnum, save->cfg_file_path);
    save->fp_cfg_file = fopen(save->cfg_file_path, "w+");
    MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "File %s start writing...", save->cfg_file_path);
}
/*
 * File-backend "config file data" callback: appends len bytes of data to the
 * currently open config file. A short write only trips an assertion.
 * instance is unused.
 */
void doris_config_file_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;
    size_t stored = fwrite(data, 1, len, save->fp_cfg_file);

    assert(stored == len);
}
/*
 * File-backend "config file finish" callback: closes the current config file.
 * The FILE pointer is reset to NULL after closing; otherwise a later
 * doris_config_file_version_error() would see a stale non-NULL pointer,
 * close the stream a second time and remove this already completed file.
 * instance is unused.
 */
void doris_config_file_cfgfile_finish(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save=(struct confile_save *)userdata;

    if(save->fp_cfg_file != NULL)
    {
        fclose(save->fp_cfg_file);
        save->fp_cfg_file = NULL;
    }
    MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "File %s write finished", save->cfg_file_path);
}
/* mem-series callbacks: cache received configs in memory */
void doris_config_mem_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
save->cur_vernode = (struct version_list_node *)calloc(1, sizeof(struct version_list_node));
TAILQ_INIT(&save->cur_vernode->table_head);
save->cur_vernode->metajson = cJSON_CreateObject();
save->cur_vernode->arrayjson= cJSON_CreateArray();
save->cur_vernode->version = save->version;
cJSON_AddNumberToObject(save->cur_vernode->metajson, "version", save->cur_vernode->version);
save->cur_vernode->cfg_type = save->type;
cJSON_AddNumberToObject(save->cur_vernode->metajson, "type", save->cur_vernode->cfg_type);
}
void doris_config_mem_version_finish(struct doris_instance *instance, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
struct version_list_handle *cur_version;
struct version_list_handle *tmplist;
cJSON_AddItemToObject(save->cur_vernode->metajson, "configs", save->cur_vernode->arrayjson);
save->cur_vernode->arrayjson = NULL;
save->cur_vernode->metacont = cJSON_PrintUnformatted(save->cur_vernode->metajson);
save->cur_vernode->metalen = strlen(save->cur_vernode->metacont);
cJSON_Delete(save->cur_vernode->metajson);
save->cur_vernode->metajson = NULL;
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Version %lu mem finished, info: %s", save->version, save->cur_vernode->metacont);
if(save->cur_vernode->cfg_type==CFG_UPDATE_TYPE_FULL && g_doris_server_info.cfgver_head->latest_version!=0)
{
cur_version = config_version_handle_new();
cur_version->latest_version = save->cur_vernode->version;
TAILQ_INSERT_TAIL(&cur_version->version_head, save->cur_vernode, version_node);
pthread_rwlock_wrlock(&g_doris_server_info.rwlock);
tmplist = g_doris_server_info.cfgver_head;
g_doris_server_info.cfgver_head = cur_version;
pthread_rwlock_unlock(&g_doris_server_info.rwlock);
cfgver_handle_delay_destroy(save, save->evbase, tmplist);
}
else
{
TAILQ_INSERT_TAIL(&g_doris_server_info.cfgver_head->version_head, save->cur_vernode, version_node);
g_doris_server_info.cfgver_head->latest_version = save->cur_vernode->version;
}
save->cur_vernode = NULL;
}
/*
 * Memory-backend "version error" callback: discards everything accumulated
 * for the version in progress (open fragment, open table, version node) and
 * clears the corresponding pointers in save. instance is unused.
 */
void doris_config_mem_version_error(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    config_frag_node_cleanup(save, save->cur_frag);
    save->cur_frag = NULL;

    config_table_node_cleanup(save, save->cur_table);
    save->cur_table = NULL;

    config_version_node_cleanup(save, save->cur_vernode);
    save->cur_vernode = NULL;
}
void doris_config_mem_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
save->cur_vernode->table_meta = cJSON_CreateObject();
cJSON_AddStringToObject(save->cur_vernode->table_meta, "tablename", tablename);
cJSON_AddNumberToObject(save->cur_vernode->table_meta, "cfg_num", cfgnum);
cJSON_AddNumberToObject(save->cur_vernode->table_meta, "size", size);
2021-07-16 16:06:59 +08:00
save->cur_table = (struct table_list_node *)calloc(1, sizeof(struct table_list_node));
snprintf(save->cur_table->tablename, 64, "%s", tablename);
save->cur_table->filesize = size;
TAILQ_INIT(&save->cur_table->frag_head);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "table %s.%010llu start loading to memory...", tablename, save->version);
}
/*
 * Memory-backend "config file data" callback.
 * Copies len bytes of data into the fragment chain of save->cur_table.
 * Fragments are sized to the remaining bytes of the table, capped at
 * g_doris_server_info.cache_frag_size; a fragment is queued on the table as
 * soon as it is completely filled. A partially filled fragment stays in
 * save->cur_frag for the next call. instance is unused.
 *
 * NOTE(review): if data arrives after cur_totallen has already reached
 * filesize, the new fragment gets totalsize 0 and the loop below appears to
 * make no progress -- confirm callers never deliver more than the announced
 * size.
 */
void doris_config_mem_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
size_t cache_len, offset=0;
/* Account for the incoming bytes up front; cleanup subtracts per fragment totalsize. */
save->statistic.statistic.status[DRS_FSSTAT_MEMORY_USED] += len;
while(len > 0)
{
if(save->cur_frag == NULL)
{
/* Open a new fragment covering [start, end] of the table content. */
save->cur_frag = (struct cont_frag_node *)calloc(1, sizeof(struct cont_frag_node));
save->cur_frag->start = save->cur_table->cur_totallen;
save->cur_frag->totalsize = save->cur_table->filesize - save->cur_table->cur_totallen;
if(save->cur_frag->totalsize > g_doris_server_info.cache_frag_size)
{
save->cur_frag->totalsize = g_doris_server_info.cache_frag_size;
}
save->cur_frag->end = save->cur_frag->start + save->cur_frag->totalsize - 1;
save->cur_frag->content = (char *)malloc(save->cur_frag->totalsize);
}
if(save->cur_frag->totalsize > save->cur_frag->cur_fraglen + len)
{
/* Everything that is left fits and the fragment stays open. */
memcpy(save->cur_frag->content+save->cur_frag->cur_fraglen, data+offset, len);
save->cur_frag->cur_fraglen += len;
save->cur_table->cur_totallen += len;
offset += len;
len = 0;
}
else
{
/* Fill the fragment to capacity, queue it, and continue with the remainder. */
cache_len = save->cur_frag->totalsize - save->cur_frag->cur_fraglen;
memcpy(save->cur_frag->content+save->cur_frag->cur_fraglen, data+offset, cache_len);
save->cur_frag->cur_fraglen += cache_len;
save->cur_table->cur_totallen += cache_len;
offset += cache_len;
len -= cache_len;
TAILQ_INSERT_TAIL(&save->cur_table->frag_head, save->cur_frag, frag_node);
assert(save->cur_frag->cur_fraglen == save->cur_frag->end - save->cur_frag->start + 1);
save->cur_frag = NULL;
}
}
assert(save->cur_table->cur_totallen <= save->cur_table->filesize);
}
void doris_config_mem_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata)
2021-07-16 16:06:59 +08:00
{
struct confile_save *save=(struct confile_save *)userdata;
cJSON_AddStringToObject(save->cur_vernode->table_meta, "md5", md5);
cJSON_AddItemToArray(save->cur_vernode->arrayjson, save->cur_vernode->table_meta);
save->cur_vernode->table_meta = NULL;
2021-07-16 16:06:59 +08:00
if(save->cur_frag != NULL)
{
TAILQ_INSERT_TAIL(&save->cur_table->frag_head, save->cur_frag, frag_node);
assert(save->cur_frag->cur_fraglen == save->cur_frag->end - save->cur_frag->start + 1);
save->cur_frag = NULL;
}
assert(save->cur_table->cur_totallen == save->cur_table->filesize);
TAILQ_INSERT_TAIL(&save->cur_vernode->table_head, save->cur_table, table_node);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "table %s.%010llu load to memory finished", save->cur_table->tablename, save->version);
save->cur_table = NULL;
}
/* common-series helpers: bookkeeping shared by every receive path */
void doris_config_common_version_start(struct confile_save *save, cJSON *meta)
{
cJSON *sub;
sub = cJSON_GetObjectItem(meta, "version");
save->version = sub->valuedouble;
sub = cJSON_GetObjectItem(meta, "type");
save->type = sub->valueint;
assert(save->type==CFG_UPDATE_TYPE_FULL || save->type==CFG_UPDATE_TYPE_INC);
save->version_cfgnum = 0;
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "Version %lu start updating...", save->version);
}
/*
 * Common "version finish" bookkeeping.
 * Updates the worker statistics (a full version resets the total config
 * count, an incremental one adds to it), pushes the latest version and total
 * config count to the monitor, and clears the version-error status code.
 */
void doris_config_common_version_finish(struct confile_save *save)
{
    if(save->type != CFG_UPDATE_TYPE_FULL)
    {
        save->statistic.statistic.status[DRS_FSSTAT_CUR_INC_VERSION] = save->version;
        save->statistic.statistic.status[DRS_FSSTAT_CONFIG_TOTAL_NUM] += save->version_cfgnum;
        save->statistic.statistic.field[DRS_FSSTAT_RECV_INC_VER] += 1;
    }
    else
    {
        save->statistic.statistic.status[DRS_FSSTAT_CUR_FULL_VERSION] = save->version;
        save->statistic.statistic.status[DRS_FSSTAT_CONFIG_TOTAL_NUM] = save->version_cfgnum;
        save->statistic.statistic.field[DRS_FSSTAT_RECV_FULL_VER] += 1;
    }

    MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mm_latest_ver, MONITOR_VALUE_SET, save->version);
    MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mm_total_cfgnum, MONITOR_VALUE_SET,
        save->statistic.statistic.status[DRS_FSSTAT_CONFIG_TOTAL_NUM]);
    MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_CLEAR, MONITOR_STATUS_VERSION_ERR, NULL, NULL);

    MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "Version %lu update finished", save->version);
}
/*
 * Common "version error" bookkeeping: counts the failed version and raises
 * the version-error status code on the monitor.
 */
void doris_config_common_version_error(struct confile_save *save)
{
    save->statistic.statistic.field[DRS_FSSTAT_RECV_ERR_VER]++;

    /* Publish the internal error state through the monitor. */
    MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_SET, MONITOR_STATUS_VERSION_ERR,
        "Version receive error", "Receive config file error, please check producer");
}
/*
 * Common "config file start" bookkeeping: adds cfgnum to the per-version
 * config count and counts one started file.
 */
void doris_config_common_cfgfile_start(struct confile_save *save, u_int32_t cfgnum)
{
    save->statistic.statistic.field[DRS_FSSTAT_RECV_START_FILES]++;
    save->version_cfgnum += cfgnum;
}
/* Common "config file finish" bookkeeping: counts one completed file. */
void doris_config_common_cfgfile_finish(struct confile_save *save)
{
    save->statistic.statistic.field[DRS_FSSTAT_RECV_CMPLT_FILES]++;
}
/* localmem-series callbacks: used at startup to load locally stored configs into memory */
/*
 * Startup-load "version start" callback: runs the common bookkeeping, then
 * the memory backend when g_doris_server_info.server_role_sw is set.
 */
void doris_config_localmem_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_common_version_start(save, meta);
    if(!g_doris_server_info.server_role_sw)
    {
        return;
    }
    doris_config_mem_version_start(instance, meta, userdata);
}
/*
 * Startup-load "version finish" callback: finishes the memory backend when
 * g_doris_server_info.server_role_sw is set, then runs the common bookkeeping.
 */
void doris_config_localmem_version_finish(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    if(g_doris_server_info.server_role_sw)
    {
        doris_config_mem_version_finish(instance, userdata);
    }
    doris_config_common_version_finish(save);
}
/*
 * Startup-load "version error" callback: runs the common error bookkeeping,
 * then rolls back the memory backend when g_doris_server_info.server_role_sw
 * is set.
 */
void doris_config_localmem_version_error(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_common_version_error(save);
    if(!g_doris_server_info.server_role_sw)
    {
        return;
    }
    doris_config_mem_version_error(instance, userdata);
}
/*
 * Startup-load "config file start" callback: runs the common bookkeeping,
 * then the memory backend when g_doris_server_info.server_role_sw is set.
 */
void doris_config_localmem_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_common_cfgfile_start(save, cfgnum);
    if(!g_doris_server_info.server_role_sw)
    {
        return;
    }
    doris_config_mem_cfgfile_start(instance, tablename, size, cfgnum, userdata);
}
/*
 * Startup-load "config file data" callback: forwards the data to the memory
 * backend when g_doris_server_info.server_role_sw is set; otherwise a no-op.
 */
void doris_config_localmem_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
{
    if(!g_doris_server_info.server_role_sw)
    {
        return;
    }
    doris_config_mem_cfgfile_update(instance, data, len, userdata);
}
void doris_config_localmem_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata)
2021-07-16 16:06:59 +08:00
{
doris_config_common_cfgfile_finish((struct confile_save *)userdata);
if(g_doris_server_info.server_role_sw)
{
doris_config_mem_cfgfile_finish(instance, md5, userdata);
2021-07-16 16:06:59 +08:00
}
}
/* unprefixed-series callbacks: invoked when new configs are received at runtime */
/*
 * Runtime "version start" callback: runs the common bookkeeping, then the
 * file backend (write_file_sw) and the memory backend (server_role_sw),
 * each only when its switch in g_doris_server_info is set.
 */
void doris_config_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_common_version_start(save, meta);

    if(g_doris_server_info.write_file_sw)
    {
        doris_config_file_version_start(instance, meta, userdata);
    }

    if(g_doris_server_info.server_role_sw)
    {
        doris_config_mem_version_start(instance, meta, userdata);
    }
}
/*
 * Runtime "version finish" callback: finishes the file backend
 * (write_file_sw) and the memory backend (server_role_sw) when enabled,
 * then runs the common bookkeeping.
 */
void doris_config_version_finish(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    if(g_doris_server_info.write_file_sw)
    {
        doris_config_file_version_finish(instance, userdata);
    }

    if(g_doris_server_info.server_role_sw)
    {
        doris_config_mem_version_finish(instance, userdata);
    }

    doris_config_common_version_finish(save);
}
/*
 * Runtime "version error" callback: runs the common error bookkeeping, then
 * rolls back the file backend (write_file_sw) and the memory backend
 * (server_role_sw) when enabled.
 */
void doris_config_version_error(struct doris_instance *instance, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_common_version_error(save);

    if(g_doris_server_info.write_file_sw)
    {
        doris_config_file_version_error(instance, userdata);
    }

    if(g_doris_server_info.server_role_sw)
    {
        doris_config_mem_version_error(instance, userdata);
    }
}
/*
 * Runtime "config file start" callback: runs the common bookkeeping, then the
 * file backend (write_file_sw) and the memory backend (server_role_sw) when
 * enabled.
 */
void doris_config_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata)
{
    struct confile_save *save = (struct confile_save *)userdata;

    doris_config_common_cfgfile_start(save, cfgnum);

    if(g_doris_server_info.write_file_sw)
    {
        doris_config_file_cfgfile_start(instance, tablename, size, cfgnum, userdata);
    }

    if(g_doris_server_info.server_role_sw)
    {
        doris_config_mem_cfgfile_start(instance, tablename, size, cfgnum, userdata);
    }
}
/*
 * Runtime "config file data" callback: hands the data chunk to the file
 * backend (write_file_sw) and to the memory backend (server_role_sw), each
 * only when its switch is set.
 */
void doris_config_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
{
    const int to_file = g_doris_server_info.write_file_sw ? 1 : 0;
    const int to_memory = g_doris_server_info.server_role_sw ? 1 : 0;

    if(to_file)
    {
        doris_config_file_cfgfile_update(instance, data, len, userdata);
    }
    if(to_memory)
    {
        doris_config_mem_cfgfile_update(instance, data, len, userdata);
    }
}
void doris_config_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata)
2021-07-16 16:06:59 +08:00
{
doris_config_common_cfgfile_finish((struct confile_save *)userdata);
if(g_doris_server_info.write_file_sw)
{
doris_config_file_cfgfile_finish(instance, userdata);
}
if(g_doris_server_info.server_role_sw)
{
doris_config_mem_cfgfile_finish(instance, md5, userdata);
2021-07-16 16:06:59 +08:00
}
}
/*
 * Thread entry: receives configs through a doris client instance.
 *
 * arg is the management event_base handed to doris_parameter_new(). The
 * thread first replays the locally stored full index and then all stored
 * incremental indexes through the localmem callbacks, then switches to the
 * runtime callbacks, creates the doris instance starting from the scanner's
 * current version, arms the statistics timer and enters the event loop.
 * The function is not expected to return; every exit path returns NULL.
 */
void* thread_doris_client_recv_cfg(void *arg)
{
    struct event_base *manage_evbase = (struct event_base *)arg;
    struct event_base *client_evbase;
    struct doris_parameter *param;
    struct doris_instance *instance;
    struct doris_callbacks doris_cbs;
    struct doris_arguments doris_args={0, 0, 0};
    struct doris_idxfile_scanner *scanner;
    enum DORIS_UPDATE_TYPE result;
    struct confile_save save;
    struct timeval period;
    char index_dir[512];

    prctl(PR_SET_NAME, "client_recv");

    client_evbase = event_base_new();
    memset(&save, 0, sizeof(save));
    save.source_from = RECV_WAY_IDX_FILE;
    save.evbase = client_evbase;
    scanner = doris_index_file_scanner(0);

    /* Phase 1: load the latest stored configs into memory. */
    doris_cbs.version_start = doris_config_localmem_version_start;
    doris_cbs.version_finish = doris_config_localmem_version_finish;
    doris_cbs.version_error = doris_config_localmem_version_error;
    doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start;
    doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update;
    doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish;
    doris_cbs.userdata = &save;

    snprintf(index_dir, sizeof(index_dir), "%s/full/index", g_doris_server_info.store_path_root);
    result = doris_index_file_traverse(scanner, index_dir, &doris_cbs, NULL, g_doris_server_info.log_runtime);
    assert(result!=CFG_UPDATE_TYPE_ERR);

    snprintf(index_dir, sizeof(index_dir), "%s/inc/index", g_doris_server_info.store_path_root);
    for(;;)
    {
        result = doris_index_file_traverse(scanner, index_dir, &doris_cbs, NULL, g_doris_server_info.log_runtime);
        assert(result!=CFG_UPDATE_TYPE_ERR);
        if(result == CFG_UPDATE_TYPE_NONE)
        {
            break;
        }
    }

    /* Phase 2: switch to the runtime callbacks and start receiving new configs. */
    doris_cbs.version_start = doris_config_version_start;
    doris_cbs.version_finish = doris_config_version_finish;
    doris_cbs.version_error = doris_config_version_error;
    doris_cbs.cfgfile_start = doris_config_cfgfile_start;
    doris_cbs.cfgfile_update = doris_config_cfgfile_update;
    doris_cbs.cfgfile_finish = doris_config_cfgfile_finish;
    save.source_from = RECV_WAY_DRS_CLIENT;
    doris_args.current_version = scanner->cur_version;

    param = doris_parameter_new(NIRVANA_CONFIG_FILE, manage_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime);
    if(param == NULL)
    {
        assert(0);
        return NULL;
    }
    instance = doris_instance_new(param, client_evbase, g_doris_server_info.log_runtime);
    if(instance == NULL)
    {
        assert(0);
        return NULL;
    }

    evtimer_assign(&save.statistic.timer_statistic, client_evbase, doris_worker_statistic_timer_cb, &save.statistic);
    period.tv_usec = 0;
    period.tv_sec = g_doris_server_info.fsstat_period;
    evtimer_add(&save.statistic.timer_statistic, &period);

    event_base_dispatch(client_evbase);

    /* The dispatch loop is not expected to return. */
    printf("Libevent dispath error, should not run here.\n");
    MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
    assert(0);
    return NULL;
}
/*
 * Timer callback for the index-file receiver: traverses
 * g_doris_server_info.recv_path_inc repeatedly until the scanner reports
 * CFG_UPDATE_TYPE_NONE, then re-arms itself for scan_idx_interval seconds.
 * fd and kind are unused.
 */
static void doris_scanner_timer_cb(int fd, short kind, void *userp)
{
    struct scanner_timer_priv *ctx = (struct scanner_timer_priv *)userp;
    enum DORIS_UPDATE_TYPE result;
    struct timeval interval;

    for(;;)
    {
        result = doris_index_file_traverse(ctx->scanner, g_doris_server_info.recv_path_inc,
            &ctx->doris_cbs, NULL, g_doris_server_info.log_runtime);
        if(result == CFG_UPDATE_TYPE_NONE)
        {
            break;
        }
    }

    interval.tv_usec = 0;
    interval.tv_sec = g_doris_server_info.scan_idx_interval;
    evtimer_add(&ctx->timer_scanner, &interval);
}
/*
 * Thread entry: receives configs by scanning index files on disk.
 *
 * The thread first replays the locally stored full index and then the stored
 * incremental indexes through the localmem callbacks, switches to the runtime
 * callbacks, traverses g_doris_server_info.recv_path_full once, and then
 * arms the periodic scanner timer (immediately if that traversal found an
 * update, otherwise after scan_idx_interval seconds) plus the statistics
 * timer before entering the event loop. arg is unused. Returns NULL if the
 * event loop ever exits.
 *
 * stored_path is sized 512 bytes and every snprintf() is bounded with
 * sizeof(stored_path); the previous 256-byte buffer was written with a
 * 512-byte limit and could overflow the stack for long store paths.
 */
void* thread_index_file_recv_cfg(void *arg)
{
    struct event_base *client_evbase;
    struct confile_save save;
    struct timeval tv;
    struct scanner_timer_priv timer_priv;
    enum DORIS_UPDATE_TYPE update_type;
    char stored_path[512];

    prctl(PR_SET_NAME, "index_file");

    memset(&save, 0, sizeof(struct confile_save));
    memset(&timer_priv, 0, sizeof(struct scanner_timer_priv));

    client_evbase = event_base_new();

    save.source_from = RECV_WAY_IDX_FILE;
    save.evbase = client_evbase;
    timer_priv.scanner = doris_index_file_scanner(0);

    /* Phase 1: load the latest stored configs into memory. */
    timer_priv.doris_cbs.version_start = doris_config_localmem_version_start;
    timer_priv.doris_cbs.version_finish = doris_config_localmem_version_finish;
    timer_priv.doris_cbs.version_error = doris_config_localmem_version_error;
    timer_priv.doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start;
    timer_priv.doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update;
    timer_priv.doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish;
    timer_priv.doris_cbs.userdata = &save;

    snprintf(stored_path, sizeof(stored_path), "%s/full/index", g_doris_server_info.store_path_root);
    update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
    assert(update_type!=CFG_UPDATE_TYPE_ERR);

    snprintf(stored_path, sizeof(stored_path), "%s/inc/index", g_doris_server_info.store_path_root);
    do{
        update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
        assert(update_type!=CFG_UPDATE_TYPE_ERR);
    }while(update_type!=CFG_UPDATE_TYPE_NONE && update_type!=CFG_UPDATE_TYPE_ERR);

    /* Phase 2: switch to the runtime callbacks and check for new configs. */
    timer_priv.doris_cbs.version_start = doris_config_version_start;
    timer_priv.doris_cbs.version_finish = doris_config_version_finish;
    timer_priv.doris_cbs.version_error = doris_config_version_error;
    timer_priv.doris_cbs.cfgfile_start = doris_config_cfgfile_start;
    timer_priv.doris_cbs.cfgfile_update = doris_config_cfgfile_update;
    timer_priv.doris_cbs.cfgfile_finish = doris_config_cfgfile_finish;

    update_type = doris_index_file_traverse(timer_priv.scanner, g_doris_server_info.recv_path_full,
        &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
    assert(update_type!=CFG_UPDATE_TYPE_ERR);
    if(update_type!=CFG_UPDATE_TYPE_NONE && update_type!=CFG_UPDATE_TYPE_ERR)
    {
        /* An update was just consumed: run the first periodic scan right away. */
        tv.tv_sec = 0;
    }
    else
    {
        tv.tv_sec = g_doris_server_info.scan_idx_interval;
    }
    tv.tv_usec = 0;
    evtimer_assign(&timer_priv.timer_scanner, client_evbase, doris_scanner_timer_cb, &timer_priv);
    evtimer_add(&timer_priv.timer_scanner, &tv);

    evtimer_assign(&save.statistic.timer_statistic, client_evbase, doris_worker_statistic_timer_cb, &save.statistic);
    tv.tv_sec = g_doris_server_info.fsstat_period;
    tv.tv_usec = 0;
    evtimer_add(&save.statistic.timer_statistic, &tv);

    event_base_dispatch(client_evbase);

    /* The dispatch loop is not expected to return. */
    printf("Libevent dispath error, should not run here.\n");
    MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
    return NULL;
}