支持多业务系统的配置并发进行同步

This commit is contained in:
linuxrc@163.com
2021-07-22 10:25:42 +08:00
parent 8b236b38cb
commit 5217e9188e
10 changed files with 453 additions and 327 deletions

View File

@@ -137,8 +137,8 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code
instance->retry_times = 0; instance->retry_times = 0;
if(instance->curmeta.curoffset == 0) if(instance->curmeta.curoffset == 0)
{ {
instance->param->cbs.cfgfile_start(instance, instance->curmeta.table_name, instance->cbs.cfgfile_start(instance, instance->curmeta.table_name,
instance->curmeta.size, instance->curmeta.cfg_num, instance->param->cbs.userdata); instance->curmeta.size, instance->curmeta.cfg_num, instance->cbs.userdata);
MD5_Init(&instance->ctx.md5ctx); MD5_Init(&instance->ctx.md5ctx);
} }
} }
@@ -180,7 +180,7 @@ void doris_http_confile_body_cb(const char *ptr, size_t bytes, CURLcode code, lo
return; return;
} }
instance->param->cbs.cfgfile_update(instance, ptr, bytes, instance->param->cbs.userdata); instance->cbs.cfgfile_update(instance, ptr, bytes, instance->cbs.userdata);
MD5_Update(&instance->ctx.md5ctx, ptr, bytes); MD5_Update(&instance->ctx.md5ctx, ptr, bytes);
instance->curmeta.curoffset += bytes; instance->curmeta.curoffset += bytes;
instance->statistic.field[DRS_FS_FILED_RES_BYTES] += bytes; instance->statistic.field[DRS_FS_FILED_RES_BYTES] += bytes;
@@ -228,14 +228,14 @@ void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, vo
} }
else else
{ {
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s.010%lu over, md5: %s", MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s.%010lu over, md5: %s",
instance->curmeta.table_name, instance->req_version, md5buffer); instance->curmeta.table_name, instance->req_version, md5buffer);
} }
instance->statistic.field[DRS_FS_FILED_RES_FILES] += 1; instance->statistic.field[DRS_FS_FILED_RES_FILES] += 1;
instance->param->cbs.cfgfile_finish(instance, md5buffer, instance->param->cbs.userdata); instance->cbs.cfgfile_finish(instance, md5buffer, instance->cbs.userdata);
if(instance->array_index == instance->array_size) if(instance->array_index == instance->array_size)
{ {
instance->param->cbs.version_finish(instance, instance->param->cbs.userdata); instance->cbs.version_finish(instance, instance->cbs.userdata);
instance->status = FETCH_STATUS_META; instance->status = FETCH_STATUS_META;
doris_update_new_version(instance); doris_update_new_version(instance);
cJSON_Delete(instance->meta); cJSON_Delete(instance->meta);
@@ -268,7 +268,7 @@ out_md5:
if(instance->retry_times >= instance->param->fetch_max_tries || direct_fail) if(instance->retry_times >= instance->param->fetch_max_tries || direct_fail)
{ {
instance->statistic.field[DRS_FS_FILED_RES_VERERR] += 1; instance->statistic.field[DRS_FS_FILED_RES_VERERR] += 1;
instance->param->cbs.version_error(instance, instance->param->cbs.userdata); instance->cbs.version_error(instance, instance->cbs.userdata);
instance->retry_times = 0; instance->retry_times = 0;
instance->status = FETCH_STATUS_META; instance->status = FETCH_STATUS_META;
cJSON_Delete(instance->meta); cJSON_Delete(instance->meta);
@@ -296,7 +296,7 @@ void doris_http_fetch_confile(struct doris_instance *instance)
doris_http_ctx_add_header(instance->ctx.httpctx, range); doris_http_ctx_add_header(instance->ctx.httpctx, range);
} }
snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&businessid=%u", instance->curmeta.table_name, instance->req_version, instance->param->args.businessid); snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&business=%s", instance->curmeta.table_name, instance->req_version, instance->args.bizname);
if(doris_http_launch_get_request(instance->ctx.httpctx, metauri)) if(doris_http_launch_get_request(instance->ctx.httpctx, metauri))
{ {
instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
@@ -371,7 +371,7 @@ void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, cur_version=%lu, newjson: %s", MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, cur_version=%lu, newjson: %s",
instance->cur_version, instance->estr.buff); instance->cur_version, instance->estr.buff);
instance->param->cbs.version_start(instance, instance->meta, instance->param->cbs.userdata); instance->cbs.version_start(instance, instance->meta, instance->cbs.userdata);
instance->array = cJSON_GetObjectItem(instance->meta, "configs"); instance->array = cJSON_GetObjectItem(instance->meta, "configs");
instance->array_size = cJSON_GetArraySize(instance->array); instance->array_size = cJSON_GetArraySize(instance->array);
assert(instance->array_size > 0); assert(instance->array_size > 0);
@@ -420,7 +420,7 @@ static void doris_http_fetch_meta(struct doris_instance *instance)
instance->req_version = instance->cur_version + 1; //ֻ<>а<D0B0><E6B1BE><EFBFBD>³ɹ<C2B3><C9B9><EFBFBD><EFBFBD><EFBFBD>cur_version<6F>Ż<EFBFBD><C5BB><EFBFBD><EFBFBD><EFBFBD> instance->req_version = instance->cur_version + 1; //ֻ<>а<D0B0><E6B1BE><EFBFBD>³ɹ<C2B3><C9B9><EFBFBD><EFBFBD><EFBFBD>cur_version<6F>Ż<EFBFBD><C5BB><EFBFBD><EFBFBD><EFBFBD>
if(instance->ctx.httpctx != NULL) if(instance->ctx.httpctx != NULL)
{ {
snprintf(metauri, 128, "configmeta?version=%lu&businessid=%u", instance->req_version, instance->param->args.businessid); snprintf(metauri, 128, "configmeta?version=%lu&business=%s", instance->req_version, instance->args.bizname);
if(!doris_http_launch_get_request(instance->ctx.httpctx, metauri)) if(!doris_http_launch_get_request(instance->ctx.httpctx, metauri))
{ {
instance->status = FETCH_STATUS_META; instance->status = FETCH_STATUS_META;
@@ -530,15 +530,12 @@ static int doris_client_register_field_stat(struct doris_parameter *param, void
return 0; return 0;
} }
struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, struct doris_callbacks *cbs, struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog)
struct doris_arguments *args, void *runtimelog)
{ {
struct doris_parameter *param; struct doris_parameter *param;
param = (struct doris_parameter *)calloc(1, sizeof(struct doris_parameter)); param = (struct doris_parameter *)calloc(1, sizeof(struct doris_parameter));
param->manage_evbase = manage_evbase; param->manage_evbase = manage_evbase;
param->cbs = *cbs;
param->args= *args;
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fail_retry_interval", &param->retry_interval, 10); MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fail_retry_interval", &param->retry_interval, 10);
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fragmet_size", &param->fetch_frag_size, 5242880); MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fragmet_size", &param->fetch_frag_size, 5242880);
@@ -599,7 +596,8 @@ static void doris_instance_statistic_timer_cb(int fd, short kind, void *userp)
event_add(&instance->timer_statistic, &tv); event_add(&instance->timer_statistic, &tv);
} }
struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase, void *runtimelog) struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase,
struct doris_callbacks *cbs, struct doris_arguments *args, void *runtimelog)
{ {
struct doris_instance *instance; struct doris_instance *instance;
struct timeval tv; struct timeval tv;
@@ -608,7 +606,9 @@ struct doris_instance *doris_instance_new(struct doris_parameter *param, struct
instance->param = param; instance->param = param;
instance->worker_evbase = worker_evbase; instance->worker_evbase = worker_evbase;
instance->runtime_log = runtimelog; instance->runtime_log = runtimelog;
instance->cur_version = param->args.current_version; instance->cbs = *cbs;
instance->args= *args;
instance->cur_version = args->current_version;
instance->req_version = instance->cur_version + 1; //TODO instance->req_version = instance->cur_version + 1; //TODO
instance->httpins_master = doris_http_instance_new(param->param_master, worker_evbase, runtimelog); instance->httpins_master = doris_http_instance_new(param->param_master, worker_evbase, runtimelog);

View File

@@ -23,9 +23,6 @@ enum FETCH_CFG_STATUS
struct doris_parameter struct doris_parameter
{ {
struct doris_callbacks cbs;
struct doris_arguments args;
u_int32_t retry_interval; u_int32_t retry_interval;
u_int32_t fetch_frag_size; u_int32_t fetch_frag_size;
u_int32_t fetch_max_tries; u_int32_t fetch_max_tries;
@@ -82,6 +79,9 @@ struct doris_confile_ctx
struct doris_instance struct doris_instance
{ {
struct doris_callbacks cbs;
struct doris_arguments args;
enum FETCH_CFG_STATUS status; enum FETCH_CFG_STATUS status;
u_int32_t retry_times; u_int32_t retry_times;

View File

@@ -45,8 +45,8 @@ struct doris_statistics
struct doris_arguments struct doris_arguments
{ {
char bizname[32];
int64_t current_version; //<2F><>ǰ<EFBFBD>ѻ<EFBFBD>ȡ<EFBFBD><C8A1><EFBFBD>ϵ<EFBFBD><CFB5><EFBFBD><EFBFBD>°汾<C2B0>ţ<EFBFBD><C5A3><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>һ<EFBFBD><D2BB><EFBFBD>汾ȡ<E6B1BE><C8A1><EFBFBD><EFBFBD> int64_t current_version; //<2F><>ǰ<EFBFBD>ѻ<EFBFBD>ȡ<EFBFBD><C8A1><EFBFBD>ϵ<EFBFBD><CFB5><EFBFBD><EFBFBD>°汾<C2B0>ţ<EFBFBD><C5A3><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>һ<EFBFBD><D2BB><EFBFBD>汾ȡ<E6B1BE><C8A1><EFBFBD><EFBFBD>
int32_t businessid;
int32_t judian_id; int32_t judian_id;
}; };
@@ -62,9 +62,9 @@ struct doris_callbacks
void (*version_finish)(struct doris_instance *instance, void *userdata); void (*version_finish)(struct doris_instance *instance, void *userdata);
}; };
struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, struct doris_callbacks *cbs, struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog);
struct doris_arguments *args, void *runtimelog); struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase,
struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase, void *runtimelog); struct doris_callbacks *cbs, struct doris_arguments *args, void *runtimelog);
#endif #endif

View File

@@ -0,0 +1,35 @@
[DORIS_CLIENT]
fetch_fail_retry_interval=5
fetch_fragmet_size=5242880
fetch_confile_max_tries=3
fsstat_log_appname=DorisClient
fsstat_log_filepath=./log/doris_client.fs
fsstat_log_interval=2
fsstat_log_print_mode=1
fsstat_log_dst_ip=192.168.10.90
fsstat_log_dst_port=8125
[DORIS_CLIENT.master_server]
max_connection_per_host=1
max_cnnt_pipeline_num=10
https_connection_on=0
max_curl_session_num=10
http_server_listen_port=9897
http_server_manage_port=9897
http_server_ip_list=192.168.10.8
[DORIS_CLIENT.backup1_server]
max_connection_per_host=1
max_cnnt_pipeline_num=10
https_connection_on=0
max_curl_session_num=10
http_server_listen_port=9897
http_server_manage_port=9897
http_server_ip_list=192.168.11.241
[DORIS_CLIENT.backup2_server]

View File

@@ -3,13 +3,9 @@ worker_thread_num=2
server_listen_port=9898 server_listen_port=9898
manage_listen_port=2233 manage_listen_port=2233
https_connection_on=1 https_connection_on=1
cache_file_frag_size=100
#1-Doris client; 2-local file business_system_list=T1_1;VoIP
receive_config_way=2
cache_file_frag_size=67108864
store_config_path=./doris_store_path
receive_config_path_full=./doris_receive_path/full/index
receive_config_path_inc=./doris_receive_path/inc/index
run_log_dir=./log run_log_dir=./log
run_log_lv=20 run_log_lv=20
@@ -20,40 +16,20 @@ fsstat_log_print_mode=1
fsstat_log_dst_ip=192.168.10.90 fsstat_log_dst_ip=192.168.10.90
fsstat_log_dst_port=8125 fsstat_log_dst_port=8125
[T1_1]
#1-Doris client; 2-local file
receive_config_way=2
grafana_monitor_status_id=3
store_config_path=./doris_store_t1
receive_config_path_full=./doris_receive_t1/full/index
receive_config_path_inc=./doris_receive_t1/inc/index
#doris_client_confile=./conf/doris_client.conf
[VoIP]
[DORIS_CLIENT] receive_config_way=2
fetch_fail_retry_interval=5 grafana_monitor_status_id=4
fetch_fragmet_size=5242880 store_config_path=./doris_store_voip
fetch_confile_max_tries=3 receive_config_path_full=./doris_receive_voip/full/index
receive_config_path_inc=./doris_receive_voip/inc/index
fsstat_log_appname=DorisClient #doris_client_confile=./conf/doris_client.conf
fsstat_log_filepath=./log/doris_client.fs
fsstat_log_interval=2
fsstat_log_print_mode=1
fsstat_log_dst_ip=192.168.10.90
fsstat_log_dst_port=8125
[DORIS_CLIENT.master_server]
max_connection_per_host=1
max_cnnt_pipeline_num=10
https_connection_on=1
max_curl_session_num=10
http_server_listen_port=9897
http_server_manage_port=9897
http_server_ip_list=192.168.10.8
[DORIS_CLIENT.backup1_server]
max_connection_per_host=1
max_cnnt_pipeline_num=10
https_connection_on=0
max_curl_session_num=10
http_server_listen_port=9897
http_server_manage_port=9897
http_server_ip_list=192.168.11.241
[DORIS_CLIENT.backup2_server]

View File

@@ -73,47 +73,63 @@ int doris_create_listen_socket(int bind_port)
void doris_http_server_meta_cb(struct evhttp_request *req, void *arg) void doris_http_server_meta_cb(struct evhttp_request *req, void *arg)
{ {
struct worker_statistic_info *statistic=(struct worker_statistic_info *)arg;
struct evkeyvalq params; struct evkeyvalq params;
const char *version; const char *version, *bizname;
int64_t verlong; int64_t verlong;
char *endptr=NULL, length[64]; char *endptr=NULL, length[64];
struct version_list_node *vernode; struct version_list_node *vernode;
struct evbuffer *evbuf; struct evbuffer *evbuf;
struct doris_business *business;
map<string, struct doris_business*>::iterator iter;
statistic->statistic.field[DRS_FSSTAT_CLIENT_META_REQ] += 1; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_META_REQ], 0, FS_OP_ADD, 1);
if(evhttp_parse_query(evhttp_request_get_uri(req), &params)) if(evhttp_parse_query(evhttp_request_get_uri(req), &params))
{ {
statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid"); evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid");
return; return;
} }
if(NULL == (version = evhttp_find_header(&params, "version"))) if(NULL == (version = evhttp_find_header(&params, "version")))
{ {
statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_clear_headers(&params); evhttp_clear_headers(&params);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no version found"); evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no version found");
return; return;
} }
if(0==(verlong = strtol(version, &endptr, 10)) || *endptr!='\0') if(0==(verlong = strtol(version, &endptr, 10)) || *endptr!='\0')
{ {
statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_clear_headers(&params); evhttp_clear_headers(&params);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameter version invalid"); evhttp_send_error(req, HTTP_BADREQUEST, "Parameter version invalid");
return; return;
} }
evhttp_clear_headers(&params); if(NULL == (bizname = evhttp_find_header(&params, "business")))
pthread_rwlock_rdlock(&g_doris_server_info.rwlock);
if(verlong > g_doris_server_info.cfgver_head->latest_version)
{ {
pthread_rwlock_unlock(&g_doris_server_info.rwlock); FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
statistic->statistic.field[DRS_FSSTAT_SEND_META_NONEW] += 1; evhttp_clear_headers(&params);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no business found");
return;
}
if((iter = g_doris_server_info.name2business->find(string(bizname)))==g_doris_server_info.name2business->end())
{
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_clear_headers(&params);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, business invalid");
return;
}
evhttp_clear_headers(&params);
business = iter->second;
pthread_rwlock_rdlock(&business->rwlock);
if(verlong > business->cfgver_head->latest_version)
{
pthread_rwlock_unlock(&business->rwlock);
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_META_NONEW], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_NOTMODIFIED, "No new configs found"); evhttp_send_error(req, HTTP_NOTMODIFIED, "No new configs found");
return; return;
} }
vernode = TAILQ_FIRST(&g_doris_server_info.cfgver_head->version_head); vernode = TAILQ_FIRST(&business->cfgver_head->version_head);
while(vernode->version < verlong) while(vernode->version < verlong)
{ {
vernode = TAILQ_NEXT(vernode, version_node); vernode = TAILQ_NEXT(vernode, version_node);
@@ -121,9 +137,9 @@ void doris_http_server_meta_cb(struct evhttp_request *req, void *arg)
evbuf = evbuffer_new(); evbuf = evbuffer_new();
evbuffer_add(evbuf, vernode->metacont, vernode->metalen); evbuffer_add(evbuf, vernode->metacont, vernode->metalen);
sprintf(length, "%u", vernode->metalen); sprintf(length, "%u", vernode->metalen);
pthread_rwlock_unlock(&g_doris_server_info.rwlock); pthread_rwlock_unlock(&business->rwlock);
statistic->statistic.field[DRS_FSSTAT_SEND_META_RES] += 1; FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_SEND_META_RES], FS_OP_ADD, 1);
evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Type", "application/json"); evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Type", "application/json");
evhttp_add_header(evhttp_request_get_output_headers(req), "Connection", "keep-alive"); evhttp_add_header(evhttp_request_get_output_headers(req), "Connection", "keep-alive");
evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Length", length); evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Length", length);
@@ -131,8 +147,8 @@ void doris_http_server_meta_cb(struct evhttp_request *req, void *arg)
evbuffer_free(evbuf); evbuffer_free(evbuf);
} }
void doris_response_file_range(struct evhttp_request *req, const char *tablename, void doris_response_file_range(struct evhttp_request *req, const char *bizname, const char *tablename,
int64_t verlong, size_t start, size_t end, bool range, struct worker_statistic_info *statistic) int64_t verlong, size_t start, size_t end, bool range)
{ {
struct version_list_node *vernode; struct version_list_node *vernode;
struct table_list_node *tablenode; struct table_list_node *tablenode;
@@ -140,16 +156,26 @@ void doris_response_file_range(struct evhttp_request *req, const char *tablename
struct evbuffer *evbuf; struct evbuffer *evbuf;
char length[128]; char length[128];
size_t filesize, res_length=0, copy_len, offset=start; size_t filesize, res_length=0, copy_len, offset=start;
struct doris_business *business;
map<string, struct doris_business*>::iterator iter;
pthread_rwlock_rdlock(&g_doris_server_info.rwlock); if((iter = g_doris_server_info.name2business->find(string(bizname)))==g_doris_server_info.name2business->end())
if(verlong > g_doris_server_info.cfgver_head->latest_version)
{ {
pthread_rwlock_unlock(&g_doris_server_info.rwlock); FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
statistic->statistic.field[DRS_FSSTAT_SEND_FILE_RES_404] += 1; evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, business invalid");
return;
}
business = iter->second;
pthread_rwlock_rdlock(&business->rwlock);
if(verlong > business->cfgver_head->latest_version)
{
pthread_rwlock_unlock(&business->rwlock);
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_RES_404], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_NOTFOUND, "Version too old"); evhttp_send_error(req, HTTP_NOTFOUND, "Version too old");
return; return;
} }
vernode = TAILQ_FIRST(&g_doris_server_info.cfgver_head->version_head); vernode = TAILQ_FIRST(&business->cfgver_head->version_head);
while(vernode->version < verlong) while(vernode->version < verlong)
{ {
vernode = TAILQ_NEXT(vernode, version_node); vernode = TAILQ_NEXT(vernode, version_node);
@@ -161,8 +187,8 @@ void doris_response_file_range(struct evhttp_request *req, const char *tablename
} }
if(tablenode==NULL || start>tablenode->filesize) if(tablenode==NULL || start>tablenode->filesize)
{ {
pthread_rwlock_unlock(&g_doris_server_info.rwlock); pthread_rwlock_unlock(&business->rwlock);
statistic->statistic.field[DRS_FSSTAT_SEND_FILE_RES_404] += 1; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_RES_404], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_NOTFOUND, "No valid content found"); evhttp_send_error(req, HTTP_NOTFOUND, "No valid content found");
return; return;
} }
@@ -183,12 +209,12 @@ void doris_response_file_range(struct evhttp_request *req, const char *tablename
offset += copy_len; offset += copy_len;
res_length += copy_len; res_length += copy_len;
} }
pthread_rwlock_unlock(&g_doris_server_info.rwlock); pthread_rwlock_unlock(&business->rwlock);
assert(res_length == end + 1 - start); assert(res_length == end + 1 - start);
sprintf(length, "%lu", res_length); sprintf(length, "%lu", res_length);
statistic->statistic.field[DRS_FSSTAT_SEND_FILE_RES] += 1; FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_SEND_FILE_RES], FS_OP_ADD, 1);
statistic->statistic.field[DRS_FSSTAT_SEND_FILE_BYTES] += res_length; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_BYTES], 0, FS_OP_ADD, res_length);
evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Length", length); evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Length", length);
if(range) if(range)
{ {
@@ -203,31 +229,30 @@ void doris_response_file_range(struct evhttp_request *req, const char *tablename
void doris_http_server_file_cb(struct evhttp_request *req, void *arg) void doris_http_server_file_cb(struct evhttp_request *req, void *arg)
{ {
struct worker_statistic_info *statistic=(struct worker_statistic_info *)arg;
struct evkeyvalq params; struct evkeyvalq params;
const char *version, *tablename, *content_range; const char *version, *tablename, *content_range, *bizname;
int64_t verlong; int64_t verlong;
char *endptr=NULL; char *endptr=NULL;
size_t req_start=0, req_end=0; size_t req_start=0, req_end=0;
statistic->statistic.field[DRS_FSSTAT_CLIENT_FILE_REQ] += 1; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_FILE_REQ], 0, FS_OP_ADD, 1);
if(evhttp_parse_query(evhttp_request_get_uri(req), &params)) if(evhttp_parse_query(evhttp_request_get_uri(req), &params))
{ {
statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid"); evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid");
return; return;
} }
if(NULL==(version=evhttp_find_header(&params, "version")) || NULL==(tablename=evhttp_find_header(&params, "tablename"))) if(NULL==(version=evhttp_find_header(&params, "version")) || NULL==(tablename=evhttp_find_header(&params, "tablename")))
{ {
evhttp_clear_headers(&params); evhttp_clear_headers(&params);
statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no version/tablename found"); evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no version/tablename found");
return; return;
} }
if(0==(verlong = strtol(version, &endptr, 10)) || *endptr!='\0') if(0==(verlong = strtol(version, &endptr, 10)) || *endptr!='\0')
{ {
evhttp_clear_headers(&params); evhttp_clear_headers(&params);
statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameter version invalid"); evhttp_send_error(req, HTTP_BADREQUEST, "Parameter version invalid");
return; return;
} }
@@ -235,12 +260,19 @@ void doris_http_server_file_cb(struct evhttp_request *req, void *arg)
sscanf(content_range, "%*[^0-9]%lu-%lu", &req_start, &req_end)<1) sscanf(content_range, "%*[^0-9]%lu-%lu", &req_start, &req_end)<1)
{ {
evhttp_clear_headers(&params); evhttp_clear_headers(&params);
statistic->statistic.field[DRS_FSSTAT_CLIENT_INVALID_REQ] += 1; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Header Range invalid"); evhttp_send_error(req, HTTP_BADREQUEST, "Header Range invalid");
return; return;
} }
if(NULL == (bizname = evhttp_find_header(&params, "business")))
{
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_clear_headers(&params);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no business found");
return;
}
doris_response_file_range(req, tablename, verlong, req_start, req_end, (content_range==NULL)?false:true, statistic); doris_response_file_range(req, bizname, tablename, verlong, req_start, req_end, (content_range==NULL)?false:true);
evhttp_clear_headers(&params); evhttp_clear_headers(&params);
} }
@@ -350,14 +382,10 @@ void* thread_doris_http_server(void *arg)
{ {
struct event_base *worker_evbase; struct event_base *worker_evbase;
struct evhttp *worker_http; struct evhttp *worker_http;
struct worker_statistic_info statistic;
struct timeval tv;
prctl(PR_SET_NAME, "http_server"); prctl(PR_SET_NAME, "http_server");
memset(&statistic, 0, sizeof(struct worker_statistic_info));
worker_evbase = event_base_new(); worker_evbase = event_base_new();
worker_http = evhttp_new(worker_evbase); worker_http = evhttp_new(worker_evbase);
if(g_doris_server_info.ssl_conn_on) if(g_doris_server_info.ssl_conn_on)
@@ -370,9 +398,9 @@ void* thread_doris_http_server(void *arg)
evhttp_set_bevcb(worker_http, doris_https_bufferevent_cb, g_doris_server_info.ssl_instance); evhttp_set_bevcb(worker_http, doris_https_bufferevent_cb, g_doris_server_info.ssl_instance);
} }
evhttp_set_cb(worker_http, "/configmeta", doris_http_server_meta_cb, &statistic); evhttp_set_cb(worker_http, "/configmeta", doris_http_server_meta_cb, NULL);
evhttp_set_cb(worker_http, "/configfile", doris_http_server_file_cb, &statistic); evhttp_set_cb(worker_http, "/configfile", doris_http_server_file_cb, NULL);
evhttp_set_gencb(worker_http, doris_http_server_generic_cb, &statistic); evhttp_set_gencb(worker_http, doris_http_server_generic_cb, NULL);
evhttp_set_allowed_methods(worker_http, EVHTTP_REQ_GET|EVHTTP_REQ_HEAD); evhttp_set_allowed_methods(worker_http, EVHTTP_REQ_GET|EVHTTP_REQ_HEAD);
if(evhttp_accept_socket(worker_http, g_doris_server_info.listener)) if(evhttp_accept_socket(worker_http, g_doris_server_info.listener))
@@ -381,11 +409,6 @@ void* thread_doris_http_server(void *arg)
assert(0); return NULL; assert(0); return NULL;
} }
evtimer_assign(&statistic.timer_statistic, worker_evbase, doris_worker_statistic_timer_cb, &statistic);
tv.tv_sec = g_doris_server_info.fsstat_period;
tv.tv_usec = 0;
evtimer_add(&statistic.timer_statistic, &tv);
event_base_dispatch(worker_evbase); event_base_dispatch(worker_evbase);
printf("Libevent dispath error, should not run here.\n"); printf("Libevent dispath error, should not run here.\n");
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here."); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");

View File

@@ -17,7 +17,7 @@
#include "doris_server_http.h" #include "doris_server_http.h"
struct doris_global_info g_doris_server_info; struct doris_global_info g_doris_server_info;
static unsigned long doris_vesion_20210719=20210719L; static unsigned long doris_vesion_20210722=20210722L;
int doris_mkdir_according_path(const char * path) int doris_mkdir_according_path(const char * path)
{ {
@@ -54,9 +54,24 @@ int doris_mkdir_according_path(const char * path)
return 0; return 0;
} }
static int doris_chech_name_valid(const char *name)
{
size_t i, namelen=strlen(name);
for(i=0; i<namelen; i++)
{
if(!((name[i]>='a' && name[i]<='z')||(name[i]>='A' && name[i]<='Z') ||
(name[i]>='0' && name[i]<='9') || name[i]=='_' || name[i]==':'))
{
return false;
}
}
return true;
}
int32_t doris_read_profile_configs(const char *config_file) int32_t doris_read_profile_configs(const char *config_file)
{ {
char tmp_buf[4096], tmp_dir[512], tmp_dir2[512]; char tmp_buf[4096], tmp_dir[512];
MESA_load_profile_string_def(config_file, "DORIS_SERVER", "run_log_dir", g_doris_server_info.root_log_dir, sizeof(g_doris_server_info.root_log_dir), "./log"); MESA_load_profile_string_def(config_file, "DORIS_SERVER", "run_log_dir", g_doris_server_info.root_log_dir, sizeof(g_doris_server_info.root_log_dir), "./log");
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "run_log_lv", &g_doris_server_info.log_level, 10); MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "run_log_lv", &g_doris_server_info.log_level, 10);
@@ -86,37 +101,8 @@ int32_t doris_read_profile_configs(const char *config_file)
} }
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "cache_file_frag_size", &g_doris_server_info.cache_frag_size, 67108864); MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "cache_file_frag_size", &g_doris_server_info.cache_frag_size, 67108864);
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "doris_server_role_on", &g_doris_server_info.server_role_sw, 1); MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "doris_server_role_on", &g_doris_server_info.server_role_sw, 1);
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "doris_write_file_on", &g_doris_server_info.write_file_sw, 1);
if(0>MESA_load_profile_string_nodef(config_file, "DORIS_SERVER", "store_config_path", g_doris_server_info.store_path_root, sizeof(g_doris_server_info.store_path_root)))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]store_config_path not found!", config_file);
assert(0);return -1;
}
snprintf(tmp_dir, 512, "%s/full/index", g_doris_server_info.store_path_root);
snprintf(tmp_dir2,512, "%s/inc/index", g_doris_server_info.store_path_root);
if(doris_mkdir_according_path(tmp_dir) || doris_mkdir_according_path(tmp_dir2))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "mkdir %s failed: %s\n", tmp_dir, strerror(errno));
return -1;
}
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "receive_config_way", &g_doris_server_info.recv_way, RECV_WAY_DRS_CLIENT);
if(g_doris_server_info.recv_way == RECV_WAY_IDX_FILE)
{
if(0>MESA_load_profile_string_nodef(config_file, "DORIS_SERVER", "receive_config_path_full", g_doris_server_info.recv_path_full, sizeof(g_doris_server_info.recv_path_full)))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]receive_config_path not found!", config_file);
assert(0);return -1;
}
if(0>MESA_load_profile_string_nodef(config_file, "DORIS_SERVER", "receive_config_path_inc", g_doris_server_info.recv_path_inc, sizeof(g_doris_server_info.recv_path_inc)))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]receive_config_path not found!", config_file);
assert(0);return -1;
}
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "scan_index_file_interval", &g_doris_server_info.scan_idx_interval, 10);
}
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "scan_index_file_interval", &g_doris_server_info.scan_idx_interval, 10);
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "https_connection_on", &g_doris_server_info.ssl_conn_on, 0); MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "https_connection_on", &g_doris_server_info.ssl_conn_on, 0);
if(g_doris_server_info.ssl_conn_on) if(g_doris_server_info.ssl_conn_on)
{ {
@@ -143,9 +129,11 @@ int32_t doris_read_profile_configs(const char *config_file)
static int doris_server_register_field_stat(struct doris_global_info *param) static int doris_server_register_field_stat(struct doris_global_info *param)
{ {
const char *field_names[DRS_FSSTAT_FIELD_NUM]={"RecvFullVer", "RecvIncVer", "RecvErrVer", "FileStarts", "FileComplete", const char *field_names[DRS_FSSTAT_FIELD_NUM]={"RecvErrVer", "FileStarts", "FileComplete", "ClientInvReq",
"ClientInvReq", "ClientMetaReq", "SendResMeta", "SendNoNewMeta", "ClientFileReq", "SendFiles", "SendBytes", "SendFile404"}; "ClientMetaReq", "SendNoNewMeta", "ClientFileReq", "SendBytes", "SendFile404"};
const char *status_names[DRS_FSSTAT_STATUS_NUM]={"MemoryUsed", "CurFullVer", "CurIncVer", "TotalCfgNum"}; const char *status_names[DRS_FSSTAT_STATUS_NUM]={"MemoryUsed"};
const char *column_names[DRS_FSSTAT_CLUMN_NUM]={"RecvFullVer", "RecvIncVer", "RecvFiles", "SendResMeta", "SendFiles",
"CurFullVer", "CurIncVer", "TotalCfgNum"};
int value; int value;
param->fsstat_handle = FS_create_handle(); param->fsstat_handle = FS_create_handle();
@@ -176,6 +164,10 @@ static int doris_server_register_field_stat(struct doris_global_info *param)
{ {
param->fsstat_status[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]); param->fsstat_status[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]);
} }
for(int i=0; i<DRS_FSSTAT_CLUMN_NUM; i++)
{
param->fsstat_column[i] = FS_register(param->fsstat_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, column_names[i]);
}
FS_start(param->fsstat_handle); FS_start(param->fsstat_handle);
return 0; return 0;
} }
@@ -191,6 +183,96 @@ static void instance_fsstat_output_timer_cb(int fd, short kind, void *userp)
event_add((struct event*)userp, &tv); event_add((struct event*)userp, &tv);
} }
static int32_t doris_init_config_for_business(struct doris_global_info *info, struct event_base *manage_evbase, const char *config_file)
{
char tmpbuffer[4096], tmp_dir[256], tmp_dir2[256], *bizname, *save=NULL;
struct doris_business *business;
map<string, struct doris_parameter *>::iterator iter;
if(0>=MESA_load_profile_string_nodef(config_file, "DORIS_SERVER", "business_system_list", tmpbuffer, sizeof(tmpbuffer)))
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]business_system_list not found!", config_file);
assert(0);return -1;
}
for(bizname=strtok_r(tmpbuffer, ";", &save); bizname!=NULL; bizname=strtok_r(NULL, ";", &save))
{
if(!doris_chech_name_valid(bizname))
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]business_system_list bizname %s invalid, name must match: [a-zA-Z_:][a-zA-Z0-9_:]", bizname, config_file);
assert(0);return -1;
}
business = &info->business[info->business_num++];
snprintf(business->bizname, sizeof(business->bizname), "%s", bizname);
pthread_rwlock_init(&business->rwlock, NULL);
business->cfgver_head = config_version_handle_new();
if(info->name2business->find(string(business->bizname)) != info->name2business->end())
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]business_system_list duplicate system name: %s!", bizname, config_file, business->bizname);
assert(0);return -1;
}
info->name2business->insert(make_pair(string(business->bizname), business));
MESA_load_profile_uint_def(config_file, business->bizname, "grafana_monitor_status_id", &business->mm_status_codeid, 3);
MESA_load_profile_uint_def(config_file, business->bizname, "doris_write_file_on", &business->write_file_sw, 1);
if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "store_config_path", business->store_path_root, sizeof(business->store_path_root)))
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]store_config_path not found!", bizname, config_file);
assert(0);return -1;
}
snprintf(tmp_dir, 512, "%s/full/index", business->store_path_root);
snprintf(tmp_dir2,512, "%s/inc/index", business->store_path_root);
if(doris_mkdir_according_path(tmp_dir) || doris_mkdir_according_path(tmp_dir2))
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "mkdir %s failed: %s\n", tmp_dir, strerror(errno));
return -1;
}
MESA_load_profile_uint_def(config_file, business->bizname, "receive_config_way", &business->recv_way, RECV_WAY_DRS_CLIENT);
if(business->recv_way == RECV_WAY_DRS_CLIENT)
{
if(0>=MESA_load_profile_string_nodef(config_file, business->bizname, "doris_client_confile", tmp_dir, sizeof(tmp_dir)))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "%s: [DORIS_SERVER]doris_client_confile not found!", config_file);
assert(0);return -1;
}
if((iter=info->confile2param->find(string(tmp_dir))) != info->confile2param->end())
{
business->param = iter->second;
}
else
{
business->param = doris_parameter_new(tmp_dir, manage_evbase, info->log_runtime);
if(business->param == NULL)
{
assert(0);return -2;
}
info->confile2param->insert(make_pair(string(tmp_dir), business->param));
}
}
else
{
if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "receive_config_path_full", business->recv_path_full, sizeof(business->recv_path_full)))
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]receive_config_path not found!", bizname, config_file);
assert(0);return -1;
}
if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "receive_config_path_inc", business->recv_path_inc, sizeof(business->recv_path_inc)))
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]receive_config_path not found!", bizname, config_file);
assert(0);return -1;
}
}
business->fs_lineid = FS_register(info->fsstat_handle, FS_STYLE_LINE, FS_CALC_CURRENT, business->bizname);;
snprintf(tmp_dir, 512, "latest_cfg_version_%s", business->bizname);
business->mm_latest_ver = MESA_Monitor_register(info->monitor, tmp_dir, MONITOR_METRICS_GAUGE, "Latest doris config version.");
snprintf(tmp_dir, 512, "total_config_num_%s", business->bizname);
business->mm_total_cfgnum = MESA_Monitor_register(info->monitor, tmp_dir, MONITOR_METRICS_GAUGE, "Total config num from latest full version till now.");
}
return 0;
}
static void manager_statistic_threads_requests_cb(struct evhttp_request *req, void *arg) static void manager_statistic_threads_requests_cb(struct evhttp_request *req, void *arg)
{ {
evhttp_send_error(req, HTTP_BADREQUEST, "Not Supported."); evhttp_send_error(req, HTTP_BADREQUEST, "Not Supported.");
@@ -224,30 +306,11 @@ int main(int argc, char **argv)
{ {
return -1; return -1;
} }
g_doris_server_info.cfgver_head = config_version_handle_new();
pthread_rwlock_init(&g_doris_server_info.rwlock, NULL);
evthread_use_pthreads(); evthread_use_pthreads();
g_doris_server_info.name2business = new map<string, struct doris_business*>;
g_doris_server_info.confile2param = new map<string, struct doris_parameter *>;
manage_evbase = event_base_new(); manage_evbase = event_base_new();
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if(g_doris_server_info.recv_way == RECV_WAY_DRS_CLIENT)
{
if(pthread_create(&thread_desc, &attr, thread_doris_client_recv_cfg, manage_evbase))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
assert(0);return -4;
}
}
else
{
if(pthread_create(&thread_desc, &attr, thread_index_file_recv_cfg, NULL))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
assert(0);return -4;
}
}
/*Doris manager server*/ /*Doris manager server*/
g_doris_server_info.manager = doris_create_listen_socket(g_doris_server_info.manager_port); g_doris_server_info.manager = doris_create_listen_socket(g_doris_server_info.manager_port);
if(g_doris_server_info.manager < 0) if(g_doris_server_info.manager < 0)
@@ -263,10 +326,7 @@ int main(int argc, char **argv)
evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL); evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL);
evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL); evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL);
evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL); evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL);
g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210719); g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210722);
g_doris_server_info.mm_latest_ver = MESA_Monitor_register(g_doris_server_info.monitor, "latest_cfg_version", MONITOR_METRICS_GAUGE, "Latest doris config version.");
g_doris_server_info.mm_total_cfgnum = MESA_Monitor_register(g_doris_server_info.monitor, "total_config_num", MONITOR_METRICS_GAUGE, "Total config num from latest full version till now.");
if(evhttp_accept_socket(manager_http, g_doris_server_info.manager)) if(evhttp_accept_socket(manager_http, g_doris_server_info.manager))
{ {
printf("evhttp_accept_socket %d error!\n", g_doris_server_info.manager); printf("evhttp_accept_socket %d error!\n", g_doris_server_info.manager);
@@ -274,6 +334,34 @@ int main(int argc, char **argv)
assert(0); return -7; assert(0); return -7;
} }
//Ϊÿ<CEAA><C3BF>ҵ<EFBFBD><D2B5>ϵͳ<CFB5><CDB3>ʼ<EFBFBD><CABC><EFBFBD><EFBFBD>ȡ<EFBFBD><C8A1><EFBFBD>õĽṹ
if(doris_init_config_for_business(&g_doris_server_info, manage_evbase, NIRVANA_CONFIG_FILE))
{
return -8;
}
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
for(u_int32_t i=0; i<g_doris_server_info.business_num; i++)
{
if(g_doris_server_info.business[i].recv_way == RECV_WAY_DRS_CLIENT)
{
if(pthread_create(&thread_desc, &attr, thread_doris_client_recv_cfg, &g_doris_server_info.business[i]))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
assert(0);return -4;
}
}
else
{
if(pthread_create(&thread_desc, &attr, thread_index_file_recv_cfg, &g_doris_server_info.business[i]))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
assert(0);return -4;
}
}
}
/*Doris http server*/ /*Doris http server*/
if(g_doris_server_info.server_role_sw) if(g_doris_server_info.server_role_sw)
{ {

View File

@@ -18,8 +18,14 @@
#include "MESA_Monitor.h" #include "MESA_Monitor.h"
#include "doris_client.h"
#include "doris_server_receive.h" #include "doris_server_receive.h"
#include <map>
#include <string>
using namespace std;
#ifndef __FILENAME__ #ifndef __FILENAME__
#define __FILENAME__ __FILE__ #define __FILENAME__ __FILE__
#endif #endif
@@ -27,9 +33,30 @@
MESA_handle_runtime_log((handle), (lv), "DorisServer", "%s:%d, " fmt, __FILENAME__, __LINE__, ##args) MESA_handle_runtime_log((handle), (lv), "DorisServer", "%s:%d, " fmt, __FILENAME__, __LINE__, ##args)
#define NIRVANA_CONFIG_FILE "./conf/doris_main.conf" #define NIRVANA_CONFIG_FILE "./conf/doris_main.conf"
#define MAX_BUSINESS_NUM 64
#define RECV_WAY_DRS_CLIENT 1 #define RECV_WAY_DRS_CLIENT 1
#define RECV_WAY_IDX_FILE 2 #define RECV_WAY_IDX_FILE 2
#define RECV_WAY_HTTP_POST 3
struct doris_business
{
char bizname[32];
u_int32_t recv_way;
u_int32_t write_file_sw;
char recv_path_full[256];
char recv_path_inc[256];
char store_path_root[256];
struct version_list_handle *cfgver_head;
struct doris_parameter *param;
int64_t total_cfgnum;
int32_t mm_latest_ver;
int32_t mm_total_cfgnum;
u_int32_t mm_status_codeid; //MM<4D>ڲ<EFBFBD><DAB2>쳣״̬id
u_int32_t fs_lineid;
pthread_rwlock_t rwlock;
};
struct doris_global_info struct doris_global_info
{ {
@@ -38,15 +65,9 @@ struct doris_global_info
int32_t manager_port; int32_t manager_port;
int32_t sock_recv_bufsize; int32_t sock_recv_bufsize;
u_int32_t ssl_conn_on; u_int32_t ssl_conn_on;
u_int32_t recv_way;
char recv_path_full[256];
char recv_path_inc[256];
char store_path_root[256];
char store_path_inc[256];
u_int32_t scan_idx_interval; u_int32_t scan_idx_interval;
u_int32_t cache_frag_size; u_int32_t cache_frag_size;
u_int32_t server_role_sw; u_int32_t server_role_sw;
u_int32_t write_file_sw;
char ssl_CA_path[256]; char ssl_CA_path[256];
char ssl_cert_file[256]; char ssl_cert_file[256];
@@ -55,15 +76,16 @@ struct doris_global_info
pthread_mutex_t *lock_cs; pthread_mutex_t *lock_cs;
SSL_CTX *ssl_instance; SSL_CTX *ssl_instance;
struct version_list_handle *cfgver_head;
evutil_socket_t listener; evutil_socket_t listener;
evutil_socket_t manager; evutil_socket_t manager;
pthread_rwlock_t rwlock;
struct doris_business business[MAX_BUSINESS_NUM];
u_int32_t business_num;
map<string, struct doris_business*> *name2business;
map<string, struct doris_parameter *> *confile2param;
struct MESA_MonitorHandler *monitor; struct MESA_MonitorHandler *monitor;
int32_t mm_latest_ver;
int32_t mm_total_cfgnum;
/*logs*/ /*logs*/
u_int32_t log_level; u_int32_t log_level;
u_int32_t statistic_period; u_int32_t statistic_period;
@@ -79,6 +101,7 @@ struct doris_global_info
int32_t fsstat_dst_port; int32_t fsstat_dst_port;
int32_t fsstat_field[DRS_FSSTAT_FIELD_NUM]; int32_t fsstat_field[DRS_FSSTAT_FIELD_NUM];
int32_t fsstat_status[DRS_FSSTAT_STATUS_NUM]; int32_t fsstat_status[DRS_FSSTAT_STATUS_NUM];
int32_t fsstat_column[DRS_FSSTAT_CLUMN_NUM];
}; };
int doris_mkdir_according_path(const char * path); int doris_mkdir_according_path(const char * path);

View File

@@ -16,6 +16,7 @@
struct scanner_timer_priv struct scanner_timer_priv
{ {
struct doris_business *business;
struct doris_callbacks doris_cbs; struct doris_callbacks doris_cbs;
struct doris_arguments doris_args; struct doris_arguments doris_args;
struct doris_idxfile_scanner *scanner; struct doris_idxfile_scanner *scanner;
@@ -24,45 +25,16 @@ struct scanner_timer_priv
extern struct doris_global_info g_doris_server_info; extern struct doris_global_info g_doris_server_info;
void config_frag_node_cleanup(struct cont_frag_node *fragnode)
void doris_worker_statistic_timer_cb(int fd, short kind, void *userp)
{
struct worker_statistic_info *statistic = (struct worker_statistic_info *)userp;
struct timeval tv;
struct doris_srv_statistics incr_statistic;
long long *plast_statistic = (long long*)&statistic->statistic_last;
long long *pnow_statistic = (long long*)&statistic->statistic;
long long *pinc_statistic = (long long*)&incr_statistic;
for(u_int32_t i=0; i<sizeof(struct doris_srv_statistics)/sizeof(long long); i++)
{
pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i];
}
statistic->statistic_last = statistic->statistic;
for(u_int32_t i=0; i<DRS_FSSTAT_FIELD_NUM; i++)
{
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[i], 0, FS_OP_ADD, incr_statistic.field[i]);
}
for(u_int32_t i=0; i<DRS_FSSTAT_STATUS_NUM; i++)
{
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_status[i], 0, FS_OP_ADD, incr_statistic.status[i]);
}
tv.tv_sec = g_doris_server_info.fsstat_period;
tv.tv_usec = 0;
event_add(&statistic->timer_statistic, &tv);
}
void config_frag_node_cleanup(struct confile_save *save, struct cont_frag_node *fragnode)
{ {
if(fragnode == NULL) return; if(fragnode == NULL) return;
save->statistic.statistic.status[DRS_FSSTAT_MEMORY_USED] -= fragnode->totalsize; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_status[DRS_FSSTAT_MEMORY_USED], 0, FS_OP_SUB, fragnode->totalsize);
free(fragnode->content); free(fragnode->content);
free(fragnode); free(fragnode);
} }
void config_table_node_cleanup(struct confile_save *save, struct table_list_node *table_node) void config_table_node_cleanup(struct table_list_node *table_node)
{ {
struct cont_frag_node *fragnode; struct cont_frag_node *fragnode;
@@ -71,12 +43,12 @@ void config_table_node_cleanup(struct confile_save *save, struct table_list_node
while(NULL != (fragnode = TAILQ_FIRST(&table_node->frag_head))) while(NULL != (fragnode = TAILQ_FIRST(&table_node->frag_head)))
{ {
TAILQ_REMOVE(&table_node->frag_head, fragnode, frag_node); TAILQ_REMOVE(&table_node->frag_head, fragnode, frag_node);
config_frag_node_cleanup(save, fragnode); config_frag_node_cleanup(fragnode);
} }
free(table_node); free(table_node);
} }
void config_version_node_cleanup(struct confile_save *save, struct version_list_node *vernode) void config_version_node_cleanup(struct version_list_node *vernode)
{ {
struct table_list_node *tablenode; struct table_list_node *tablenode;
@@ -85,7 +57,7 @@ void config_version_node_cleanup(struct confile_save *save, struct version_list_
while(NULL != (tablenode = TAILQ_FIRST(&vernode->table_head))) while(NULL != (tablenode = TAILQ_FIRST(&vernode->table_head)))
{ {
TAILQ_REMOVE(&vernode->table_head, tablenode, table_node); TAILQ_REMOVE(&vernode->table_head, tablenode, table_node);
config_table_node_cleanup(save, tablenode); config_table_node_cleanup(tablenode);
} }
free(vernode->metacont); free(vernode->metacont);
cJSON_Delete(vernode->metajson); cJSON_Delete(vernode->metajson);
@@ -94,14 +66,14 @@ void config_version_node_cleanup(struct confile_save *save, struct version_list_
free(vernode); free(vernode);
} }
void config_version_handle_cleanup(struct confile_save *save, struct version_list_handle *version) void config_version_handle_cleanup(struct version_list_handle *version)
{ {
struct version_list_node *vernode; struct version_list_node *vernode;
while(NULL != (vernode = TAILQ_FIRST(&version->version_head))) while(NULL != (vernode = TAILQ_FIRST(&version->version_head)))
{ {
TAILQ_REMOVE(&version->version_head, vernode, version_node); TAILQ_REMOVE(&version->version_head, vernode, version_node);
config_version_node_cleanup(save, vernode); config_version_node_cleanup(vernode);
} }
free(version); free(version);
} }
@@ -134,7 +106,7 @@ static void cfgver_delay_destroy_timer_cb(int fd, short kind, void *userp)
doris_common_timer_start(&delay_event->timer_event); doris_common_timer_start(&delay_event->timer_event);
return; return;
} }
config_version_handle_cleanup(delay_event->save, handle); config_version_handle_cleanup(handle);
free(delay_event); free(delay_event);
} }
@@ -144,7 +116,6 @@ static void cfgver_handle_delay_destroy(struct confile_save *save, struct event_
delay_event = (struct common_timer_event *)malloc(sizeof(struct common_timer_event)); delay_event = (struct common_timer_event *)malloc(sizeof(struct common_timer_event));
delay_event->data = version; delay_event->data = version;
delay_event->save = save;
evtimer_assign(&delay_event->timer_event, evbase, cfgver_delay_destroy_timer_cb, delay_event); evtimer_assign(&delay_event->timer_event, evbase, cfgver_delay_destroy_timer_cb, delay_event);
doris_common_timer_start(&delay_event->timer_event); doris_common_timer_start(&delay_event->timer_event);
} }
@@ -156,16 +127,20 @@ void doris_config_file_version_start(struct doris_instance *instance, cJSON *met
if(save->type == CFG_UPDATE_TYPE_FULL) if(save->type == CFG_UPDATE_TYPE_FULL)
{ {
snprintf(save->inc_index_path, 512, "%s/inc/index/full_config_index.%010lu", g_doris_server_info.store_path_root, save->version); snprintf(save->inc_index_path, 512, "%s/inc/index/full_config_index.%010lu", save->business->store_path_root, save->version);
snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", g_doris_server_info.store_path_root, save->version); snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", save->business->store_path_root, save->version);
snprintf(save->full_index_path, 512, "%s/full/index/full_config_index.%010lu", g_doris_server_info.store_path_root, save->version); snprintf(save->full_index_path, 512, "%s/full/index/full_config_index.%010lu", save->business->store_path_root, save->version);
} }
else else
{ {
snprintf(save->inc_index_path, 512, "%s/inc/index/inc_config_index.%010lu", g_doris_server_info.store_path_root, save->version); snprintf(save->inc_index_path, 512, "%s/inc/index/inc_config_index.%010lu", save->business->store_path_root, save->version);
snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", g_doris_server_info.store_path_root, save->version); snprintf(save->tmp_index_path, 512, "%s/inc/full_config_index.%010lu.ing", save->business->store_path_root, save->version);
}
if(NULL==(save->fp_idx_file = fopen(save->tmp_index_path, "w+")))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", save->business->bizname, save->tmp_index_path, strerror(errno));
assert(0);
} }
save->fp_idx_file = fopen(save->tmp_index_path, "w+");
} }
void doris_config_file_version_finish(struct doris_instance *instance, void *userdata) void doris_config_file_version_finish(struct doris_instance *instance, void *userdata)
@@ -175,18 +150,18 @@ void doris_config_file_version_finish(struct doris_instance *instance, void *use
fclose(save->fp_idx_file); fclose(save->fp_idx_file);
if(rename(save->tmp_index_path, save->inc_index_path)) if(rename(save->tmp_index_path, save->inc_index_path))
{ {
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, rename %s to %s failed: %s", save->business->bizname, save->tmp_index_path, save->inc_index_path, strerror(errno));
assert(0); assert(0);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "rename %s to %s failed: %s", save->tmp_index_path, save->inc_index_path, strerror(errno));
} }
if(save->type == CFG_UPDATE_TYPE_FULL) if(save->type == CFG_UPDATE_TYPE_FULL)
{ {
if(link(save->inc_index_path, save->full_index_path) && errno!=EEXIST) //<2F><><EFBFBD><EFBFBD>Ӳ<EFBFBD><D3B2><EFBFBD><EFBFBD> if(link(save->inc_index_path, save->full_index_path) && errno!=EEXIST) //<2F><><EFBFBD><EFBFBD>Ӳ<EFBFBD><D3B2><EFBFBD><EFBFBD>
{ {
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, rename %s to %s failed: %s", save->business->bizname, save->tmp_index_path, save->inc_index_path, strerror(errno));
assert(0); assert(0);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "rename %s to %s failed: %s", save->tmp_index_path, save->inc_index_path, strerror(errno));
} }
} }
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "Version %lu write finished, index file: %s", save->version, save->inc_index_path); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu write finished, index file: %s", save->business->bizname, save->version, save->inc_index_path);
} }
void doris_config_file_version_error(struct doris_instance *instance, void *userdata) void doris_config_file_version_error(struct doris_instance *instance, void *userdata)
@@ -203,7 +178,7 @@ void doris_config_file_version_error(struct doris_instance *instance, void *user
fclose(save->fp_cfg_file); fclose(save->fp_cfg_file);
remove(save->cfg_file_path); remove(save->cfg_file_path);
} }
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Version %llu error, rolling back...", save->version); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", save->business->bizname, save->version);
} }
void doris_config_file_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata) void doris_config_file_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata)
@@ -217,7 +192,7 @@ void doris_config_file_cfgfile_start(struct doris_instance *instance, const char
type = (save->type == CFG_UPDATE_TYPE_FULL)?"full":"inc"; type = (save->type == CFG_UPDATE_TYPE_FULL)?"full":"inc";
now = time(NULL); now = time(NULL);
localtm = localtime_r(&now, &savetime); localtm = localtime_r(&now, &savetime);
snprintf(dir, 256, "%s/%s/%04d-%02d-%02d", g_doris_server_info.store_path_root, type, localtm->tm_year+1900, localtm->tm_mon+1, localtm->tm_mday); snprintf(dir, 256, "%s/%s/%04d-%02d-%02d", save->business->store_path_root, type, localtm->tm_year+1900, localtm->tm_mon+1, localtm->tm_mday);
if(access(dir, F_OK)) if(access(dir, F_OK))
{ {
doris_mkdir_according_path(dir); doris_mkdir_according_path(dir);
@@ -225,8 +200,15 @@ void doris_config_file_cfgfile_start(struct doris_instance *instance, const char
snprintf(save->cfg_file_path, 256, "%s/%s.%010lu", dir, tablename, save->version); snprintf(save->cfg_file_path, 256, "%s/%s.%010lu", dir, tablename, save->version);
fprintf(save->fp_idx_file, "%s\t%u\t%s\n", tablename, cfgnum, save->cfg_file_path); fprintf(save->fp_idx_file, "%s\t%u\t%s\n", tablename, cfgnum, save->cfg_file_path);
save->fp_cfg_file = fopen(save->cfg_file_path, "w+"); if(NULL == (save->fp_cfg_file = fopen(save->cfg_file_path, "w+")))
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "File %s start writing...", save->cfg_file_path); {
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", save->business->bizname, save->cfg_file_path, strerror(errno));
assert(0);
}
else
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, File %s start writing...", save->business->bizname, save->cfg_file_path);
}
} }
@@ -236,7 +218,11 @@ void doris_config_file_cfgfile_update(struct doris_instance *instance, const cha
size_t writen_len; size_t writen_len;
writen_len = fwrite(data, 1, len, save->fp_cfg_file); writen_len = fwrite(data, 1, len, save->fp_cfg_file);
assert(writen_len==len); if(writen_len != len)
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", save->business->bizname, save->cfg_file_path, strerror(errno));
assert(0);
}
} }
void doris_config_file_cfgfile_finish(struct doris_instance *instance, void *userdata) void doris_config_file_cfgfile_finish(struct doris_instance *instance, void *userdata)
@@ -244,7 +230,7 @@ void doris_config_file_cfgfile_finish(struct doris_instance *instance, void *use
struct confile_save *save=(struct confile_save *)userdata; struct confile_save *save=(struct confile_save *)userdata;
fclose(save->fp_cfg_file); fclose(save->fp_cfg_file);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "File %s write finished", save->cfg_file_path); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, File %s write finished", save->business->bizname, save->cfg_file_path);
} }
/*memϵ<6D>к<EFBFBD><D0BA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ڴ<EFBFBD>*/ /*memϵ<6D>к<EFBFBD><D0BA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ڴ<EFBFBD>*/
@@ -277,24 +263,26 @@ void doris_config_mem_version_finish(struct doris_instance *instance, void *user
cJSON_Delete(save->cur_vernode->metajson); cJSON_Delete(save->cur_vernode->metajson);
save->cur_vernode->metajson = NULL; save->cur_vernode->metajson = NULL;
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Version %lu mem finished, info: %s", save->version, save->cur_vernode->metacont); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %lu mem finished, info: %s", save->business->bizname, save->version, save->cur_vernode->metacont);
if(save->cur_vernode->cfg_type==CFG_UPDATE_TYPE_FULL && g_doris_server_info.cfgver_head->latest_version!=0) if(save->cur_vernode->cfg_type==CFG_UPDATE_TYPE_FULL && save->business->cfgver_head->latest_version!=0)
{ {
cur_version = config_version_handle_new(); cur_version = config_version_handle_new();
cur_version->latest_version = save->cur_vernode->version; cur_version->latest_version = save->cur_vernode->version;
TAILQ_INSERT_TAIL(&cur_version->version_head, save->cur_vernode, version_node); TAILQ_INSERT_TAIL(&cur_version->version_head, save->cur_vernode, version_node);
pthread_rwlock_wrlock(&g_doris_server_info.rwlock); pthread_rwlock_wrlock(&save->business->rwlock);
tmplist = g_doris_server_info.cfgver_head; tmplist = save->business->cfgver_head;
g_doris_server_info.cfgver_head = cur_version; save->business->cfgver_head = cur_version;
pthread_rwlock_unlock(&g_doris_server_info.rwlock); pthread_rwlock_unlock(&save->business->rwlock);
cfgver_handle_delay_destroy(save, save->evbase, tmplist); cfgver_handle_delay_destroy(save, save->evbase, tmplist);
} }
else else
{ {
TAILQ_INSERT_TAIL(&g_doris_server_info.cfgver_head->version_head, save->cur_vernode, version_node); pthread_rwlock_wrlock(&save->business->rwlock);
g_doris_server_info.cfgver_head->latest_version = save->cur_vernode->version; TAILQ_INSERT_TAIL(&save->business->cfgver_head->version_head, save->cur_vernode, version_node);
save->business->cfgver_head->latest_version = save->cur_vernode->version;
pthread_rwlock_unlock(&save->business->rwlock);
} }
save->cur_vernode = NULL; save->cur_vernode = NULL;
} }
@@ -303,9 +291,9 @@ void doris_config_mem_version_error(struct doris_instance *instance, void *userd
{ {
struct confile_save *save=(struct confile_save *)userdata; struct confile_save *save=(struct confile_save *)userdata;
config_frag_node_cleanup(save, save->cur_frag); config_frag_node_cleanup(save->cur_frag);
config_table_node_cleanup(save, save->cur_table); config_table_node_cleanup(save->cur_table);
config_version_node_cleanup(save, save->cur_vernode); config_version_node_cleanup(save->cur_vernode);
save->cur_frag = NULL; save->cur_frag = NULL;
save->cur_table = NULL; save->cur_table = NULL;
save->cur_vernode = NULL; save->cur_vernode = NULL;
@@ -325,7 +313,7 @@ void doris_config_mem_cfgfile_start(struct doris_instance *instance, const char
save->cur_table->filesize = size; save->cur_table->filesize = size;
TAILQ_INIT(&save->cur_table->frag_head); TAILQ_INIT(&save->cur_table->frag_head);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "table %s.%010llu start loading to memory...", tablename, save->version); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu start loading to memory...", save->business->bizname, tablename, save->version);
} }
void doris_config_mem_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata) void doris_config_mem_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
@@ -333,7 +321,7 @@ void doris_config_mem_cfgfile_update(struct doris_instance *instance, const char
struct confile_save *save=(struct confile_save *)userdata; struct confile_save *save=(struct confile_save *)userdata;
size_t cache_len, offset=0; size_t cache_len, offset=0;
save->statistic.statistic.status[DRS_FSSTAT_MEMORY_USED] += len; FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_status[DRS_FSSTAT_MEMORY_USED], 0, FS_OP_ADD, len);
while(len > 0) while(len > 0)
{ {
if(save->cur_frag == NULL) if(save->cur_frag == NULL)
@@ -389,7 +377,7 @@ void doris_config_mem_cfgfile_finish(struct doris_instance *instance, const char
} }
assert(save->cur_table->cur_totallen == save->cur_table->filesize); assert(save->cur_table->cur_totallen == save->cur_table->filesize);
TAILQ_INSERT_TAIL(&save->cur_vernode->table_head, save->cur_table, table_node); TAILQ_INSERT_TAIL(&save->cur_vernode->table_head, save->cur_table, table_node);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "table %s.%010llu load to memory finished", save->cur_table->tablename, save->version); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, table %s.%010llu load to memory finished", save->business->bizname, save->cur_table->tablename, save->version);
save->cur_table = NULL; save->cur_table = NULL;
} }
@@ -404,47 +392,49 @@ void doris_config_common_version_start(struct confile_save *save, cJSON *meta)
save->type = sub->valueint; save->type = sub->valueint;
assert(save->type==CFG_UPDATE_TYPE_FULL || save->type==CFG_UPDATE_TYPE_INC); assert(save->type==CFG_UPDATE_TYPE_FULL || save->type==CFG_UPDATE_TYPE_INC);
save->version_cfgnum = 0; save->version_cfgnum = 0;
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "Version %lu start updating...", save->version); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu start updating...", save->business->bizname, save->version);
} }
void doris_config_common_version_finish(struct confile_save *save) void doris_config_common_version_finish(struct confile_save *save)
{ {
if(save->type == CFG_UPDATE_TYPE_FULL) if(save->type == CFG_UPDATE_TYPE_FULL)
{ {
save->statistic.statistic.status[DRS_FSSTAT_CUR_FULL_VERSION] = save->version; save->business->total_cfgnum = save->version_cfgnum;
save->statistic.statistic.status[DRS_FSSTAT_CONFIG_TOTAL_NUM] = save->version_cfgnum; FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CUR_FULL_VERSION], FS_OP_SET, save->version);
save->statistic.statistic.field[DRS_FSSTAT_RECV_FULL_VER] += 1; FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CONFIG_TOTAL_NUM], FS_OP_SET, save->version_cfgnum);
FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FULL_VER], FS_OP_ADD, 1);
} }
else else
{ {
save->statistic.statistic.status[DRS_FSSTAT_CUR_INC_VERSION] = save->version; save->business->total_cfgnum += save->version_cfgnum;
save->statistic.statistic.status[DRS_FSSTAT_CONFIG_TOTAL_NUM] += save->version_cfgnum; FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CUR_INC_VERSION], FS_OP_SET, save->version);
save->statistic.statistic.field[DRS_FSSTAT_RECV_INC_VER] += 1; FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_CONFIG_TOTAL_NUM], FS_OP_ADD, save->version_cfgnum);
FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_INC_VER], FS_OP_ADD, 1);
} }
MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mm_latest_ver, MONITOR_VALUE_SET, save->version); MESA_Monitor_operation(g_doris_server_info.monitor, save->business->mm_latest_ver, MONITOR_VALUE_SET, save->version);
MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mm_total_cfgnum, MONITOR_VALUE_SET, MESA_Monitor_operation(g_doris_server_info.monitor, save->business->mm_total_cfgnum, MONITOR_VALUE_SET, save->business->total_cfgnum);
save->statistic.statistic.status[DRS_FSSTAT_CONFIG_TOTAL_NUM]); MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_CLEAR, save->business->mm_status_codeid, NULL, NULL);
MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_CLEAR, MONITOR_STATUS_VERSION_ERR, NULL, NULL); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "business: %s, Version %lu update finished", save->business->bizname, save->version);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "Version %lu update finished", save->version);
} }
void doris_config_common_version_error(struct confile_save *save) void doris_config_common_version_error(struct confile_save *save)
{ {
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_ERR_VER], 0, FS_OP_ADD, 1); FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_ERR_VER], 0, FS_OP_ADD, 1);
//Grafana+Promethues<65><73>չʾ<D5B9>ڲ<EFBFBD><DAB2>쳣״̬ //Grafana+Promethues<65><73>չʾ<D5B9>ڲ<EFBFBD><DAB2>쳣״̬
MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_SET, save->business->mm_status_codeid, MESA_Monitor_set_status_code(g_doris_server_info.monitor, MONITOR_STATUS_OP_SET, save->business->mm_status_codeid,
"Version receive error", "Receive config file error, please check producer"); "Version receive error", "Receive config file error, please check producer");
} }
void doris_config_common_cfgfile_start(struct confile_save *save, u_int32_t cfgnum) void doris_config_common_cfgfile_start(struct confile_save *save, u_int32_t cfgnum)
{ {
save->version_cfgnum += cfgnum; save->version_cfgnum += cfgnum;
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_START_FILES], 0, FS_OP_ADD, 1); FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_START_FILES], 0, FS_OP_ADD, 1);
} }
void doris_config_common_cfgfile_finish(struct confile_save *save) void doris_config_common_cfgfile_finish(struct confile_save *save)
{ {
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_RECV_CMPLT_FILES], 0, FS_OP_ADD, 1);
FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FILES], FS_OP_ADD, 1); FS_operate(g_doris_server_info.fsstat_handle, save->business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_RECV_FILES], FS_OP_ADD, 1);
} }
@@ -504,8 +494,10 @@ void doris_config_localmem_cfgfile_finish(struct doris_instance *instance, const
/*<2A>ޱ<EFBFBD><DEB1><EFBFBD>ϵ<EFBFBD>к<EFBFBD><D0BA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱ<EFBFBD>ص<EFBFBD>*/ /*<2A>ޱ<EFBFBD><DEB1><EFBFBD>ϵ<EFBFBD>к<EFBFBD><D0BA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱ<EFBFBD>ص<EFBFBD>*/
void doris_config_version_start(struct doris_instance *instance, cJSON *meta, void *userdata) void doris_config_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
doris_config_common_version_start((struct confile_save *)userdata, meta); doris_config_common_version_start((struct confile_save *)userdata, meta);
if(save->business->write_file_sw) if(save->business->write_file_sw)
{ {
doris_config_file_version_start(instance, meta, userdata); doris_config_file_version_start(instance, meta, userdata);
@@ -517,7 +509,9 @@ void doris_config_version_start(struct doris_instance *instance, cJSON *meta, vo
} }
void doris_config_version_finish(struct doris_instance *instance, void *userdata) void doris_config_version_finish(struct doris_instance *instance, void *userdata)
{ {
struct confile_save *save=(struct confile_save *)userdata;
if(save->business->write_file_sw) if(save->business->write_file_sw)
{ {
doris_config_file_version_finish(instance, userdata); doris_config_file_version_finish(instance, userdata);
@@ -530,8 +524,10 @@ void doris_config_version_finish(struct doris_instance *instance, void *userdata
} }
void doris_config_version_error(struct doris_instance *instance, void *userdata) void doris_config_version_error(struct doris_instance *instance, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
doris_config_common_version_error((struct confile_save *)userdata); doris_config_common_version_error((struct confile_save *)userdata);
if(save->business->write_file_sw) if(save->business->write_file_sw)
{ {
doris_config_file_version_error(instance, userdata); doris_config_file_version_error(instance, userdata);
@@ -543,8 +539,10 @@ void doris_config_version_error(struct doris_instance *instance, void *userdata)
} }
void doris_config_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata) void doris_config_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
doris_config_common_cfgfile_start((struct confile_save *)userdata, cfgnum); doris_config_common_cfgfile_start((struct confile_save *)userdata, cfgnum);
if(save->business->write_file_sw) if(save->business->write_file_sw)
{ {
doris_config_file_cfgfile_start(instance, tablename, size, cfgnum, userdata); doris_config_file_cfgfile_start(instance, tablename, size, cfgnum, userdata);
@@ -556,7 +554,9 @@ void doris_config_cfgfile_start(struct doris_instance *instance, const char *tab
} }
void doris_config_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata) void doris_config_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
{ {
struct confile_save *save=(struct confile_save *)userdata;
if(save->business->write_file_sw) if(save->business->write_file_sw)
{ {
doris_config_file_cfgfile_update(instance, data, len, userdata); doris_config_file_cfgfile_update(instance, data, len, userdata);
@@ -568,8 +568,10 @@ void doris_config_cfgfile_update(struct doris_instance *instance, const char *da
} }
void doris_config_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata) void doris_config_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
doris_config_common_cfgfile_finish((struct confile_save *)userdata); doris_config_common_cfgfile_finish((struct confile_save *)userdata);
if(save->business->write_file_sw) if(save->business->write_file_sw)
{ {
doris_config_file_cfgfile_finish(instance, userdata); doris_config_file_cfgfile_finish(instance, userdata);
@@ -581,16 +583,15 @@ void doris_config_cfgfile_finish(struct doris_instance *instance, const char *md
} }
void* thread_doris_client_recv_cfg(void *arg) void* thread_doris_client_recv_cfg(void *arg)
{ {
struct event_base *manage_evbase=(struct event_base *)arg, *client_evbase; struct doris_business *business=(struct doris_business *)arg;
struct event_base *client_evbase; struct event_base *client_evbase;
struct doris_instance *instance; struct doris_instance *instance;
struct doris_callbacks doris_cbs; struct doris_callbacks doris_cbs;
struct doris_arguments doris_args; struct doris_arguments doris_args;
struct doris_idxfile_scanner *scanner; struct doris_idxfile_scanner *scanner;
enum DORIS_UPDATE_TYPE update_type; enum DORIS_UPDATE_TYPE update_type;
struct confile_save save; struct confile_save save;
char stored_path[512];
char stored_path[512]; char stored_path[512];
prctl(PR_SET_NAME, "client_recv"); prctl(PR_SET_NAME, "client_recv");
@@ -599,6 +600,7 @@ void* thread_doris_client_recv_cfg(void *arg)
memset(&save, 0, sizeof(struct confile_save)); memset(&save, 0, sizeof(struct confile_save));
save.source_from = RECV_WAY_IDX_FILE; save.source_from = RECV_WAY_IDX_FILE;
save.evbase = client_evbase;
save.business = business; save.business = business;
scanner = doris_index_file_scanner(0); scanner = doris_index_file_scanner(0);
@@ -611,10 +613,10 @@ void* thread_doris_client_recv_cfg(void *arg)
doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update; doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update;
doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish; doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish;
doris_cbs.userdata = &save; doris_cbs.userdata = &save;
snprintf(stored_path, 512, "%s/full/index", business->store_path_root); snprintf(stored_path, 512, "%s/full/index", business->store_path_root);
update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
assert(update_type!=CFG_UPDATE_TYPE_ERR); assert(update_type!=CFG_UPDATE_TYPE_ERR);
snprintf(stored_path, 512, "%s/inc/index", business->store_path_root); snprintf(stored_path, 512, "%s/inc/index", business->store_path_root);
do { do {
update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime);
@@ -630,24 +632,15 @@ void* thread_doris_client_recv_cfg(void *arg)
doris_cbs.cfgfile_update = doris_config_cfgfile_update; doris_cbs.cfgfile_update = doris_config_cfgfile_update;
doris_cbs.cfgfile_finish = doris_config_cfgfile_finish; doris_cbs.cfgfile_finish = doris_config_cfgfile_finish;
save.source_from = RECV_WAY_DRS_CLIENT;
memset(&doris_args, 0, sizeof(struct doris_arguments)); memset(&doris_args, 0, sizeof(struct doris_arguments));
doris_args.current_version = scanner->cur_version; doris_args.current_version = scanner->cur_version;
param = doris_parameter_new(NIRVANA_CONFIG_FILE, manage_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime); sprintf(doris_args.bizname, "%s", business->bizname);
if(param == NULL)
{
assert(0);return NULL;
}
instance = doris_instance_new(business->param, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime); instance = doris_instance_new(business->param, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime);
if(instance == NULL) if(instance == NULL)
{ {
assert(0);return NULL; assert(0);return NULL;
} }
evtimer_assign(&save.statistic.timer_statistic, client_evbase, doris_worker_statistic_timer_cb, &save.statistic);
tv.tv_sec = g_doris_server_info.fsstat_period;
tv.tv_usec = 0;
evtimer_add(&save.statistic.timer_statistic, &tv);
event_base_dispatch(client_evbase); event_base_dispatch(client_evbase);
printf("Libevent dispath error, should not run here.\n"); printf("Libevent dispath error, should not run here.\n");
@@ -661,7 +654,7 @@ static void doris_scanner_timer_cb(int fd, short kind, void *userp)
enum DORIS_UPDATE_TYPE update_type; enum DORIS_UPDATE_TYPE update_type;
struct timeval tv; struct timeval tv;
do { do {
update_type = doris_index_file_traverse(timer_priv->scanner, timer_priv->business->recv_path_inc, update_type = doris_index_file_traverse(timer_priv->scanner, timer_priv->business->recv_path_inc,
&timer_priv->doris_cbs, NULL, g_doris_server_info.log_runtime); &timer_priv->doris_cbs, NULL, g_doris_server_info.log_runtime);
}while(update_type != CFG_UPDATE_TYPE_NONE); }while(update_type != CFG_UPDATE_TYPE_NONE);
@@ -672,6 +665,7 @@ static void doris_scanner_timer_cb(int fd, short kind, void *userp)
} }
void* thread_index_file_recv_cfg(void *arg) void* thread_index_file_recv_cfg(void *arg)
{
struct doris_business *business=(struct doris_business *)arg; struct doris_business *business=(struct doris_business *)arg;
struct event_base *client_evbase; struct event_base *client_evbase;
struct confile_save save; struct confile_save save;
@@ -688,8 +682,10 @@ void* thread_index_file_recv_cfg(void *arg)
client_evbase = event_base_new(); client_evbase = event_base_new();
save.source_from = RECV_WAY_IDX_FILE; save.source_from = RECV_WAY_IDX_FILE;
save.evbase = client_evbase;
save.business = business; save.business = business;
timer_priv.scanner = doris_index_file_scanner(0);
timer_priv.business = business; timer_priv.business = business;
/*Retaive latest config to memory from Stored configs*/ /*Retaive latest config to memory from Stored configs*/
@@ -700,10 +696,10 @@ void* thread_index_file_recv_cfg(void *arg)
timer_priv.doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update; timer_priv.doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update;
timer_priv.doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish; timer_priv.doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish;
timer_priv.doris_cbs.userdata = &save; timer_priv.doris_cbs.userdata = &save;
snprintf(stored_path, 512, "%s/full/index", business->store_path_root); snprintf(stored_path, 512, "%s/full/index", business->store_path_root);
update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime); update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
assert(update_type!=CFG_UPDATE_TYPE_ERR); assert(update_type!=CFG_UPDATE_TYPE_ERR);
snprintf(stored_path, 512, "%s/inc/index", business->store_path_root); snprintf(stored_path, 512, "%s/inc/index", business->store_path_root);
do{ do{
update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime); update_type = doris_index_file_traverse(timer_priv.scanner, stored_path, &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
@@ -718,7 +714,7 @@ void* thread_index_file_recv_cfg(void *arg)
timer_priv.doris_cbs.cfgfile_start = doris_config_cfgfile_start; timer_priv.doris_cbs.cfgfile_start = doris_config_cfgfile_start;
timer_priv.doris_cbs.cfgfile_update = doris_config_cfgfile_update; timer_priv.doris_cbs.cfgfile_update = doris_config_cfgfile_update;
timer_priv.doris_cbs.cfgfile_finish = doris_config_cfgfile_finish; timer_priv.doris_cbs.cfgfile_finish = doris_config_cfgfile_finish;
update_type = doris_index_file_traverse(timer_priv.scanner, business->recv_path_full, update_type = doris_index_file_traverse(timer_priv.scanner, business->recv_path_full,
&timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime); &timer_priv.doris_cbs, NULL, g_doris_server_info.log_runtime);
assert(update_type!=CFG_UPDATE_TYPE_ERR); assert(update_type!=CFG_UPDATE_TYPE_ERR);
@@ -733,11 +729,6 @@ void* thread_index_file_recv_cfg(void *arg)
tv.tv_usec = 0; tv.tv_usec = 0;
evtimer_assign(&timer_priv.timer_scanner, client_evbase, doris_scanner_timer_cb, &timer_priv); evtimer_assign(&timer_priv.timer_scanner, client_evbase, doris_scanner_timer_cb, &timer_priv);
evtimer_add(&timer_priv.timer_scanner, &tv); evtimer_add(&timer_priv.timer_scanner, &tv);
evtimer_assign(&save.statistic.timer_statistic, client_evbase, doris_worker_statistic_timer_cb, &save.statistic);
tv.tv_sec = g_doris_server_info.fsstat_period;
tv.tv_usec = 0;
evtimer_add(&save.statistic.timer_statistic, &tv);
event_base_dispatch(client_evbase); event_base_dispatch(client_evbase);
printf("Libevent dispath error, should not run here.\n"); printf("Libevent dispath error, should not run here.\n");

View File

@@ -7,34 +7,39 @@
#include <cjson/cJSON.h> #include <cjson/cJSON.h>
#define MONITOR_STATUS_VERSION_ERR 3
enum DORIS_SERVER_FS_FILED enum DORIS_SERVER_FS_FILED
{ {
DRS_FSSTAT_RECV_FULL_VER=0, DRS_FSSTAT_RECV_ERR_VER=0,
DRS_FSSTAT_RECV_INC_VER,
DRS_FSSTAT_RECV_ERR_VER,
DRS_FSSTAT_RECV_START_FILES, DRS_FSSTAT_RECV_START_FILES,
DRS_FSSTAT_RECV_CMPLT_FILES, DRS_FSSTAT_RECV_CMPLT_FILES,
DRS_FSSTAT_CLIENT_INVALID_REQ, DRS_FSSTAT_CLIENT_INVALID_REQ,
DRS_FSSTAT_CLIENT_META_REQ, DRS_FSSTAT_CLIENT_META_REQ,
DRS_FSSTAT_SEND_META_RES,
DRS_FSSTAT_SEND_META_NONEW, DRS_FSSTAT_SEND_META_NONEW,
DRS_FSSTAT_CLIENT_FILE_REQ, DRS_FSSTAT_CLIENT_FILE_REQ,
DRS_FSSTAT_SEND_FILE_RES,
DRS_FSSTAT_SEND_FILE_BYTES, DRS_FSSTAT_SEND_FILE_BYTES,
DRS_FSSTAT_SEND_FILE_RES_404, DRS_FSSTAT_SEND_FILE_RES_404,
DRS_FSSTAT_FIELD_NUM, DRS_FSSTAT_FIELD_NUM,
}; };
enum DORIS_SERVER_FS_COLUMN
{
DRS_FSCLM_RECV_FULL_VER=0,
DRS_FSCLM_RECV_INC_VER,
DRS_FSCLM_RECV_FILES,
DRS_FSCLM_SEND_META_RES,
DRS_FSCLM_SEND_FILE_RES,
DRS_FSCLM_CUR_FULL_VERSION,
DRS_FSCLM_CUR_INC_VERSION,
DRS_FSCLM_CONFIG_TOTAL_NUM,
DRS_FSSTAT_CLUMN_NUM,
};
enum DORIS_SERVER_FS_STATUS enum DORIS_SERVER_FS_STATUS
{ {
DRS_FSSTAT_MEMORY_USED=0, DRS_FSSTAT_MEMORY_USED=0,
DRS_FSSTAT_CUR_FULL_VERSION,
DRS_FSSTAT_CUR_INC_VERSION,
DRS_FSSTAT_CONFIG_TOTAL_NUM,
DRS_FSSTAT_STATUS_NUM, DRS_FSSTAT_STATUS_NUM,
}; };
@@ -82,21 +87,11 @@ struct version_list_handle
struct version_list_handle *config_version_handle_new(void); struct version_list_handle *config_version_handle_new(void);
struct doris_srv_statistics struct doris_business;
{
long long field[DRS_FSSTAT_FIELD_NUM];
long long status[DRS_FSSTAT_STATUS_NUM];
};
struct worker_statistic_info
{
struct event timer_statistic;
struct doris_srv_statistics statistic, statistic_last;
};
struct confile_save struct confile_save
{ {
struct event_base *evbase; struct event_base *evbase;
struct doris_business *business;
int64_t version; int64_t version;
int32_t source_from; int32_t source_from;
int32_t type; int32_t type;
@@ -108,8 +103,6 @@ struct confile_save
FILE *fp_cfg_file; FILE *fp_cfg_file;
FILE *fp_idx_file; FILE *fp_idx_file;
struct worker_statistic_info statistic;
struct version_list_node *cur_vernode; struct version_list_node *cur_vernode;
struct table_list_node *cur_table; struct table_list_node *cur_table;
struct cont_frag_node *cur_frag; struct cont_frag_node *cur_frag;
@@ -118,12 +111,9 @@ struct confile_save
struct common_timer_event struct common_timer_event
{ {
struct event timer_event; struct event timer_event;
struct confile_save *save;
void *data; void *data;
}; };
void doris_worker_statistic_timer_cb(int fd, short kind, void *userp);
void* thread_doris_client_recv_cfg(void *arg); void* thread_doris_client_recv_cfg(void *arg);
void* thread_index_file_recv_cfg(void *arg); void* thread_index_file_recv_cfg(void *arg);