This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
doris-doris-dispatch/server/doris_server_main.cpp
2021-08-03 10:49:52 +08:00

396 lines
16 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <event2/thread.h>
#include <MESA/MESA_prof_load.h>
#include "doris_client.h"
#include "doris_server_main.h"
#include "doris_server_http.h"
/* Process-wide server state: configuration values, log/statistic handles and
 * the per-business tables. Filled in by the init routines below and read by
 * the callbacks and by main(). */
struct doris_global_info g_doris_server_info;
/* Version stamp handed to MESA_Monitor_instance_evhttp_new() in main().
 * NOTE(review): the value looks like a release date (2021-08-03) -- confirm. */
static unsigned long doris_vesion_20210803=20210803L;
/* Create one directory unless something already exists at that path.
 * Returns 0 when the path exists afterwards, -1 (errno set by mkdir) otherwise. */
static int doris_mkdir_if_missing(const char *dir)
{
    if(access(dir, F_OK) == 0)
    {
        return 0;
    }
    /* EEXIST means somebody else created it between access() and mkdir(). */
    if(mkdir(dir, 0777) == 0 || errno == EEXIST)
    {
        return 0;
    }
    return -1;
}

/*
 * Create the directory `path` together with every missing parent directory
 * (similar to `mkdir -p`). New directories are requested with mode 0777.
 *
 * path: absolute or relative directory path; runs of '/' are tolerated.
 *
 * Returns 0 when every component exists afterwards, -1 on failure with errno
 * set (EINVAL for a NULL or empty path, otherwise left by malloc()/mkdir()).
 *
 * The intermediate prefix is built in a heap buffer sized from the input, so
 * paths of any length are handled (the previous fixed 256-byte stack buffer
 * was overrun by longer paths).
 */
int doris_mkdir_according_path(const char * path)
{
    char *prefix;
    const char *scan, *slash;
    size_t prefix_len;
    int saved_errno;

    if(path == NULL || path[0] == '\0')
    {
        errno = EINVAL;
        return -1;
    }
    prefix = (char *)malloc(strlen(path) + 1);
    if(prefix == NULL)
    {
        return -1;
    }

    /* Skip the root slash so that an empty prefix is never created. */
    scan = (path[0] == '/') ? path + 1 : path;
    while((slash = strchr(scan, '/')) != NULL)
    {
        /* Collapse "a//b": advance to the last slash of the run. */
        while(*(slash+1) == '/')
        {
            slash++;
        }
        prefix_len = (size_t)(slash - path);
        memcpy(prefix, path, prefix_len);
        prefix[prefix_len] = '\0';
        if(doris_mkdir_if_missing(prefix))
        {
            /* free() may clobber errno; keep the mkdir() reason for the caller. */
            saved_errno = errno;
            free(prefix);
            errno = saved_errno;
            return -1;
        }
        scan = slash + 1;
    }
    free(prefix);

    /* Finally the full path itself (covers paths without a trailing '/'). */
    return doris_mkdir_if_missing(path);
}
/*
 * Check that `name` consists only of ASCII letters, ASCII digits, '_' and ':'.
 *
 * Returns 1 when every character is allowed (an empty string therefore
 * passes), 0 as soon as a disallowed character is found.
 */
static int doris_chech_name_valid(const char *name)
{
    const char *cursor;

    for(cursor = name; *cursor != '\0'; ++cursor)
    {
        const char ch = *cursor;

        if(ch >= 'a' && ch <= 'z')
            continue;
        if(ch >= 'A' && ch <= 'Z')
            continue;
        if(ch >= '0' && ch <= '9')
            continue;
        if(ch == '_' || ch == ':')
            continue;
        return 0;
    }
    return 1;
}
int32_t doris_read_profile_configs(const char *config_file)
{
char tmp_buf[4096], tmp_dir[512];
MESA_load_profile_string_def(config_file, "DORIS_SERVER", "run_log_dir", g_doris_server_info.root_log_dir, sizeof(g_doris_server_info.root_log_dir), "./log");
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "run_log_lv", &g_doris_server_info.log_level, 10);
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "statistic_log_interval", &g_doris_server_info.statistic_period, 300);
/*runtimelog*/
snprintf(tmp_dir, 256, "%s/runtime_log", g_doris_server_info.root_log_dir);
if(doris_mkdir_according_path(tmp_dir))
{
printf("mkdir %s for runtimelog failed: %s\n", tmp_dir, strerror(errno));
return -1;
}
snprintf(tmp_buf, 256, "%s/doris_runtime.log", tmp_dir);
g_doris_server_info.log_runtime = MESA_create_runtime_log_handle(tmp_buf, g_doris_server_info.log_level);
if(NULL==g_doris_server_info.log_runtime)
{
printf("MESA_create_runtime_log_handle %s failed: %s\n", tmp_buf, strerror(errno));
return -1;
}
/*System*/
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "worker_thread_num", &g_doris_server_info.iothreads, 1);
if(g_doris_server_info.iothreads > 255)
{
printf("%s: [DORIS_SERVER]worker_thread_num=%d should not be bigger than 255!", config_file, g_doris_server_info.iothreads);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]worker_thread_num=%d should not be bigger than 255!", config_file, g_doris_server_info.iothreads);
assert(0);return -1;
}
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "cache_file_frag_size", &g_doris_server_info.cache_frag_size, 67108864);
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "doris_server_role_on", &g_doris_server_info.server_role_sw, 1);
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "index_file_format_maat", &g_doris_server_info.idx_file_maat, 0);
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "scan_index_file_interval", &g_doris_server_info.scan_idx_interval, 10);
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "https_connection_on", &g_doris_server_info.ssl_conn_on, 0);
if(g_doris_server_info.ssl_conn_on)
{
MESA_load_profile_string_def(config_file, "DORIS_SERVER", "ssl_trusted_ca_path", g_doris_server_info.ssl_CA_path, 256, "./conf/ssl_CA_path/");
MESA_load_profile_string_def(config_file, "DORIS_SERVER", "ssl_certificate_file", g_doris_server_info.ssl_cert_file, 256, "./conf/server_cert.pem");
MESA_load_profile_string_def(config_file, "DORIS_SERVER", "ssl_private_key_file", g_doris_server_info.ssl_key_file, 256, "./conf/server_key.pem");
MESA_load_profile_string_def(config_file, "DORIS_SERVER", "ssl_private_key_passwd", g_doris_server_info.ssl_key_passwd, 64, "12345678");
}
/*FiledStat*/
MESA_load_profile_string_def(config_file, "DORIS_SERVER", "fsstat_log_appname", g_doris_server_info.fsstat_appname, 16, "DORIS_SERVER_S");
MESA_load_profile_string_def(config_file, "DORIS_SERVER", "fsstat_log_filepath", g_doris_server_info.fsstat_filepath, 256, "./log/doris_server.fs");
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "fsstat_log_interval", &g_doris_server_info.fsstat_period, 10);
MESA_load_profile_int_def(config_file, "DORIS_SERVER", "fsstat_log_print_mode", &g_doris_server_info.fsstat_print_mode, 1);
MESA_load_profile_string_def(config_file, "DORIS_SERVER", "fsstat_log_dst_ip", g_doris_server_info.fsstat_dst_ip, 64, "127.0.0.1");
MESA_load_profile_int_def(config_file, "DORIS_SERVER", "fsstat_log_dst_port", &g_doris_server_info.fsstat_dst_port, 8125);
//LIBEVENT
MESA_load_profile_int_def(config_file, "DORIS_SERVER", "server_listen_port", &g_doris_server_info.server_port, 9898);
MESA_load_profile_int_def(config_file, "DORIS_SERVER", "manage_listen_port", &g_doris_server_info.manager_port, 2233);
MESA_load_profile_int_def(config_file, "DORIS_SERVER", "socket_recv_bufsize", &g_doris_server_info.sock_recv_bufsize, 524288);
return 0;
}
static int doris_server_register_field_stat(struct doris_global_info *param)
{
const char *field_names[DRS_FSSTAT_FIELD_NUM]={"RecvErrVer", "FileStarts", "FileComplete", "ClientInvReq",
"ClientMetaReq", "SendNoNewMeta", "ClientFileReq", "SendBytes", "SendFile404"};
const char *status_names[DRS_FSSTAT_STATUS_NUM]={"MemoryUsed"};
const char *column_names[DRS_FSSTAT_CLUMN_NUM]={"RecvFullVer", "RecvIncVer", "RecvFiles", "SendResMeta", "SendFiles",
"CurFullVer", "CurIncVer", "TotalCfgNum"};
int value;
param->fsstat_handle = FS_create_handle();
FS_set_para(param->fsstat_handle, OUTPUT_DEVICE, param->fsstat_filepath, strlen(param->fsstat_filepath)+1);
if(param->fsstat_print_mode == 1)
{
FS_set_para(param->fsstat_handle, PRINT_MODE, &param->fsstat_print_mode, sizeof(param->fsstat_print_mode));
}
else
{
FS_set_para(param->fsstat_handle, PRINT_MODE, &param->fsstat_print_mode, sizeof(param->fsstat_print_mode));
value = 1;
FS_set_para(param->fsstat_handle, FLUSH_BY_DATE, &value, sizeof(value));
}
value = param->fsstat_period;
FS_set_para(param->fsstat_handle, STAT_CYCLE, &value, sizeof(value));
value = 0;
FS_set_para(param->fsstat_handle, CREATE_THREAD, &value, sizeof(value));
FS_set_para(param->fsstat_handle, APP_NAME, param->fsstat_appname, strlen(param->fsstat_appname)+1);
FS_set_para(param->fsstat_handle, STATS_SERVER_IP, param->fsstat_dst_ip, strlen(param->fsstat_dst_ip)+1);
FS_set_para(param->fsstat_handle, STATS_SERVER_PORT, &param->fsstat_dst_port, sizeof(param->fsstat_dst_port));
for(int i=0; i<DRS_FSSTAT_FIELD_NUM; i++)
{
param->fsstat_field[i] = FS_register(param->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]);
}
for(int i=0; i<DRS_FSSTAT_STATUS_NUM; i++)
{
param->fsstat_status[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]);
}
for(int i=0; i<DRS_FSSTAT_CLUMN_NUM; i++)
{
param->fsstat_column[i] = FS_register(param->fsstat_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, column_names[i]);
}
FS_start(param->fsstat_handle);
return 0;
}
/*
 * Timer callback: emit one field-stat report, then re-add the timer event so
 * that it fires again after fsstat_period seconds.
 *
 * fd, kind: unused.
 * userp:    the `struct event` of this timer (it is passed to event_add()).
 */
static void instance_fsstat_output_timer_cb(int fd, short kind, void *userp)
{
    struct event *timer = (struct event *)userp;
    struct timeval interval;

    FS_passive_output(g_doris_server_info.fsstat_handle);

    interval.tv_usec = 0;
    interval.tv_sec = g_doris_server_info.fsstat_period;
    event_add(timer, &interval);
}
static int32_t doris_init_config_for_business(struct doris_global_info *info, struct event_base *manage_evbase, const char *config_file)
{
char tmpbuffer[4096], tmp_dir[256], tmp_dir2[256], *bizname, *save=NULL;
struct doris_business *business;
map<string, struct doris_parameter *>::iterator iter;
if(0>=MESA_load_profile_string_nodef(config_file, "DORIS_SERVER", "business_system_list", tmpbuffer, sizeof(tmpbuffer)))
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]business_system_list not found!", config_file);
assert(0);return -1;
}
for(bizname=strtok_r(tmpbuffer, ";", &save); bizname!=NULL; bizname=strtok_r(NULL, ";", &save))
{
if(!doris_chech_name_valid(bizname))
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]business_system_list bizname %s invalid, name must match: [a-zA-Z_:][a-zA-Z0-9_:]", bizname, config_file);
assert(0);return -1;
}
business = &info->business[info->business_num++];
snprintf(business->bizname, sizeof(business->bizname), "%s", bizname);
pthread_rwlock_init(&business->rwlock, NULL);
business->cfgver_head = config_version_handle_new();
if(info->name2business->find(string(business->bizname)) != info->name2business->end())
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]business_system_list duplicate system name: %s!", bizname, config_file, business->bizname);
assert(0);return -1;
}
info->name2business->insert(make_pair(string(business->bizname), business));
MESA_load_profile_uint_def(config_file, business->bizname, "grafana_monitor_status_id", &business->mm_status_codeid, 3);
MESA_load_profile_uint_def(config_file, business->bizname, "doris_write_file_on", &business->write_file_sw, 1);
if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "store_config_path", business->store_path_root, sizeof(business->store_path_root)))
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]store_config_path not found!", bizname, config_file);
assert(0);return -1;
}
snprintf(tmp_dir, 512, "%s/full/index", business->store_path_root);
snprintf(tmp_dir2,512, "%s/inc/index", business->store_path_root);
if(doris_mkdir_according_path(tmp_dir) || doris_mkdir_according_path(tmp_dir2))
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "mkdir %s failed: %s\n", tmp_dir, strerror(errno));
return -1;
}
MESA_load_profile_uint_def(config_file, business->bizname, "receive_config_way", &business->recv_way, RECV_WAY_DRS_CLIENT);
if(business->recv_way == RECV_WAY_DRS_CLIENT)
{
if(0>=MESA_load_profile_string_nodef(config_file, business->bizname, "doris_client_confile", tmp_dir, sizeof(tmp_dir)))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]doris_client_confile not found!", config_file);
assert(0);return -1;
}
if((iter=info->confile2param->find(string(tmp_dir))) != info->confile2param->end())
{
business->param = iter->second;
}
else
{
business->param = doris_parameter_new(tmp_dir, manage_evbase, info->log_runtime);
if(business->param == NULL)
{
assert(0);return -2;
}
info->confile2param->insert(make_pair(string(tmp_dir), business->param));
}
}
else
{
if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "receive_config_path_full", business->recv_path_full, sizeof(business->recv_path_full)))
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]receive_config_path not found!", bizname, config_file);
assert(0);return -1;
}
if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "receive_config_path_inc", business->recv_path_inc, sizeof(business->recv_path_inc)))
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]receive_config_path not found!", bizname, config_file);
assert(0);return -1;
}
}
business->fs_lineid = FS_register(info->fsstat_handle, FS_STYLE_LINE, FS_CALC_CURRENT, business->bizname);;
snprintf(tmp_dir, 512, "latest_cfg_version_%s", business->bizname);
business->mm_latest_ver = MESA_Monitor_register(info->monitor, tmp_dir, MONITOR_METRICS_GAUGE, "Latest doris config version.");
snprintf(tmp_dir, 512, "total_config_num_%s", business->bizname);
business->mm_total_cfgnum = MESA_Monitor_register(info->monitor, tmp_dir, MONITOR_METRICS_GAUGE, "Total config num from latest full version till now.");
}
return 0;
}
/* HTTP handler registered for "/doris/statistic/threads": the endpoint is not
 * implemented and always answers 400 "Not Supported.". `arg` is unused. */
static void manager_statistic_threads_requests_cb(struct evhttp_request *req, void *arg)
{
    static const char reason[] = "Not Supported.";

    (void)arg;
    evhttp_send_error(req, HTTP_BADREQUEST, reason);
}
/* HTTP handler registered for "/doris/statistic/status": the endpoint is not
 * implemented and always answers 400 "Not Supported.". `arg` is unused. */
static void manager_statistic_status_requests_cb(struct evhttp_request *req, void *arg)
{
    static const char reason[] = "Not Supported.";

    (void)arg;
    evhttp_send_error(req, HTTP_BADREQUEST, reason);
}
/* HTTP handler registered for "/doris/statistic/field": the endpoint is not
 * implemented and always answers 400 "Not Supported.". `arg` is unused. */
static void manager_statistic_field_requests_cb(struct evhttp_request *req, void *arg)
{
    static const char reason[] = "Not Supported.";

    (void)arg;
    evhttp_send_error(req, HTTP_BADREQUEST, reason);
}
/* Catch-all HTTP handler for the manager port (installed with
 * evhttp_set_gencb): every request without a dedicated handler is answered
 * with 400 "Not Supported.". `arg` is unused. */
void manager_generic_requests_cb(struct evhttp_request *req, void *arg)
{
    static const char reason[] = "Not Supported.";

    (void)arg;
    evhttp_send_error(req, HTTP_BADREQUEST, reason);
}
int main(int argc, char **argv)
{
struct event_base *manage_evbase;
pthread_t thread_desc;
pthread_attr_t attr;
struct timeval tv;
struct event timer_statistic_fsstat;
struct evhttp *manager_http;
if(doris_read_profile_configs(NIRVANA_CONFIG_FILE) || doris_server_register_field_stat(&g_doris_server_info))
{
return -1;
}
evthread_use_pthreads();
g_doris_server_info.name2business = new map<string, struct doris_business*>;
g_doris_server_info.confile2param = new map<string, struct doris_parameter *>;
manage_evbase = event_base_new();
/*Doris manager server*/
g_doris_server_info.manager = doris_create_listen_socket(g_doris_server_info.manager_port);
if(g_doris_server_info.manager < 0)
{
return -6;
}
if(NULL == (manager_http = evhttp_new(manage_evbase)))
{
printf("evhttp_new error!\n");
assert(0); return -2;
}
evhttp_set_cb(manager_http, "/doris/statistic/field", manager_statistic_field_requests_cb, NULL);
evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL);
evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL);
evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL);
g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210803);
if(evhttp_accept_socket(manager_http, g_doris_server_info.manager))
{
printf("evhttp_accept_socket %d error!\n", g_doris_server_info.manager);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "evhttp_accept_socket %d error!\n", g_doris_server_info.manager);
assert(0); return -7;
}
//Ϊÿ<CEAA><C3BF>ҵ<EFBFBD><D2B5>ϵͳ<CFB5><CDB3>ʼ<EFBFBD><CABC><EFBFBD><EFBFBD>ȡ<EFBFBD><C8A1><EFBFBD>õĽṹ
if(doris_init_config_for_business(&g_doris_server_info, manage_evbase, NIRVANA_CONFIG_FILE))
{
return -8;
}
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
for(u_int32_t i=0; i<g_doris_server_info.business_num; i++)
{
if(g_doris_server_info.business[i].recv_way == RECV_WAY_DRS_CLIENT)
{
if(pthread_create(&thread_desc, &attr, thread_doris_client_recv_cfg, &g_doris_server_info.business[i]))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
assert(0);return -4;
}
}
else
{
if(pthread_create(&thread_desc, &attr, thread_index_file_recv_cfg, &g_doris_server_info.business[i]))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
assert(0);return -4;
}
}
}
/*Doris http server*/
if(g_doris_server_info.server_role_sw)
{
g_doris_server_info.listener = doris_create_listen_socket(g_doris_server_info.server_port);
if(g_doris_server_info.listener < 0)
{
return -5;
}
for(u_int32_t i=0; i<g_doris_server_info.iothreads; i++)
{
if(pthread_create(&thread_desc, &attr, thread_doris_http_server, NULL))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
assert(0);return -4;
}
}
}
tv.tv_sec = g_doris_server_info.fsstat_period;
tv.tv_usec = 0;
evtimer_assign(&timer_statistic_fsstat, manage_evbase, instance_fsstat_output_timer_cb, &timer_statistic_fsstat);
evtimer_add(&timer_statistic_fsstat, &tv);
event_base_dispatch(manage_evbase);
printf("Libevent dispath error, should not run here.\n");
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
return 0;
}