限制缓存在内存的配置文件内容的版本数量,以适应全增量模式的配置下发

This commit is contained in:
linuxrc@163.com
2021-08-04 11:14:56 +08:00
parent dcca411ddc
commit 730a744229
9 changed files with 160 additions and 83 deletions

View File

@@ -18,3 +18,5 @@ add_subdirectory (support)
add_subdirectory (client)
add_subdirectory (server)
INSTALL (DIRECTORY ${PROJECT_SOURCE_DIR}/include/ DESTINATION include)

View File

@@ -24,8 +24,8 @@ target_include_directories(doris_client_dynamic PUBLIC ${PROJECT_SOURCE_DIR}/inc
target_include_directories(doris_client_dynamic PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
set_property(TARGET doris_client_dynamic PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_SOURCE_DIR}/include)
#INSTALL (TARGETS doris_client_static doris_client_dynamic
# LIBRARY DESTINATION lib
# ARCHIVE DESTINATION lib)
INSTALL (TARGETS doris_client_static doris_client_dynamic
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib)
#INSTALL (DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/ DESTINATION include)
#INSTALL (FILES doris_client_threads.h doris_conhash.h doris_murmurhash.h DESTINATION include)

View File

@@ -4,6 +4,8 @@ server_listen_port=9898
manage_listen_port=2233
https_connection_on=1
cache_file_frag_size=100
#doris_server_role_on=1
#index_file_format_maat=0
business_system_list=T1_1;VoIP
@@ -27,6 +29,7 @@ receive_config_path_inc=./doris_receive_t1/inc/index
[VoIP]
receive_config_way=2
mem_cache_max_versions=2
grafana_monitor_status_id=4
store_config_path=./doris_store_voip
receive_config_path_full=./doris_receive_voip/full/index

View File

@@ -12,6 +12,9 @@
#include <errno.h>
#include <sys/prctl.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <event2/bufferevent_ssl.h>
@@ -71,6 +74,28 @@ int doris_create_listen_socket(int bind_port)
return listener;
}
struct version_list_node *lookup_vernode_accord_version(struct version_list_handle *handle, int64_t version)
{
struct version_list_node *vernode;
map<int64_t, struct version_list_node*>::iterator iter;
vernode = TAILQ_FIRST(&handle->version_head);
if(vernode!=NULL && version<vernode->version) //<2F><>ǰȫ<C7B0><C8AB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>С<EFBFBD>İ汾<C4B0><E6B1BE>
{
version = vernode->version;
}
for(; version <= handle->latest_version; version++)
{
if((iter = handle->version2node->find(version)) != handle->version2node->end())
{
vernode = iter->second;
return vernode;
}
}
return NULL;
}
void doris_http_server_meta_cb(struct evhttp_request *req, void *arg)
{
struct evkeyvalq params;
@@ -121,19 +146,13 @@ void doris_http_server_meta_cb(struct evhttp_request *req, void *arg)
business = iter->second;
pthread_rwlock_rdlock(&business->rwlock);
if(verlong > business->cfgver_head->latest_version)
if(NULL == (vernode = lookup_vernode_accord_version(business->cfgver_head, verlong)))
{
pthread_rwlock_unlock(&business->rwlock);
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_META_NONEW], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_NOTMODIFIED, "No new configs found");
return;
}
vernode = TAILQ_FIRST(&business->cfgver_head->version_head);
while(vernode->version < verlong)
{
vernode = TAILQ_NEXT(vernode, version_node);
}
evbuf = evbuffer_new();
evbuffer_add(evbuf, vernode->metacont, vernode->metalen);
sprintf(length, "%u", vernode->metalen);
@@ -147,15 +166,58 @@ void doris_http_server_meta_cb(struct evhttp_request *req, void *arg)
evbuffer_free(evbuf);
}
struct evbuffer *evbuf_content_from_memory(struct table_list_node *tablenode, size_t start, size_t end, size_t *length)
{
struct evbuffer *evbuf;
struct cont_frag_node *fragnode;
size_t copy_len, offset=start, res_length=0;
evbuf = evbuffer_new();
for(fragnode=TAILQ_FIRST(&tablenode->frag_head); fragnode!=NULL && fragnode->start<=end; fragnode=TAILQ_NEXT(fragnode, frag_node))
{
if(offset > fragnode->end)
{
continue;
}
copy_len = (end>fragnode->end)?(fragnode->end-offset + 1):(end-offset + 1);
evbuffer_add(evbuf, fragnode->content+(offset-fragnode->start), copy_len);
offset += copy_len;
res_length += copy_len;
}
*length = res_length;
return evbuf;
}
struct evbuffer *evbuf_content_from_disk(struct table_list_node *tablenode, size_t start, size_t end, size_t *length)
{
struct evbuffer *evbuf;
int32_t fd;
if((fd = open(tablenode->localpath, O_RDONLY, 0)) < 0)
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Send response, open %s failed: %s", strerror(errno));
return NULL;
}
evbuf = evbuffer_new();
if(evbuffer_add_file(evbuf, fd, start, end-start+1))
{
evbuffer_free(evbuf);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Send response, evbuffer_add_file %s failed");
close(fd);
return NULL;
}
*length = evbuffer_get_length(evbuf);
return evbuf;
}
void doris_response_file_range(struct evhttp_request *req, const char *bizname, const char *tablename,
int64_t verlong, size_t start, size_t end, bool range)
{
struct version_list_node *vernode;
struct table_list_node *tablenode;
struct cont_frag_node *fragnode;
struct evbuffer *evbuf;
char length[128];
size_t filesize, res_length=0, copy_len, offset=start;
size_t filesize, res_length;
struct doris_business *business;
map<string, struct doris_business*>::iterator iter;
@@ -168,18 +230,13 @@ void doris_response_file_range(struct evhttp_request *req, const char *bizname,
business = iter->second;
pthread_rwlock_rdlock(&business->rwlock);
if(verlong > business->cfgver_head->latest_version)
if(NULL == (vernode = lookup_vernode_accord_version(business->cfgver_head, verlong)))
{
pthread_rwlock_unlock(&business->rwlock);
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_RES_404], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_NOTFOUND, "Version too old");
return;
}
vernode = TAILQ_FIRST(&business->cfgver_head->version_head);
while(vernode->version < verlong)
{
vernode = TAILQ_NEXT(vernode, version_node);
}
tablenode = TAILQ_FIRST(&vernode->table_head);
while(tablenode!=NULL && strcmp(tablename, tablenode->tablename))
{
@@ -197,17 +254,13 @@ void doris_response_file_range(struct evhttp_request *req, const char *bizname,
{
end = tablenode->filesize - 1;
}
evbuf = evbuffer_new();
for(fragnode=TAILQ_FIRST(&tablenode->frag_head); fragnode!=NULL && fragnode->start<=end; fragnode=TAILQ_NEXT(fragnode, frag_node))
if(vernode->cont_in_disk)
{
if(offset > fragnode->end)
{
continue;
}
copy_len = (end>fragnode->end)?(fragnode->end-offset + 1):(end-offset + 1);
evbuffer_add(evbuf, fragnode->content+(offset-fragnode->start), copy_len);
offset += copy_len;
res_length += copy_len;
evbuf = evbuf_content_from_disk(tablenode, start, end, &res_length);
}
else
{
evbuf = evbuf_content_from_memory(tablenode, start, end, &res_length);
}
pthread_rwlock_unlock(&business->rwlock);
@@ -390,11 +443,6 @@ void* thread_doris_http_server(void *arg)
if(g_doris_server_info.ssl_conn_on)
{
g_doris_server_info.ssl_instance = doris_connections_create_ssl_ctx();
if(g_doris_server_info.ssl_instance == NULL)
{
assert(0);return NULL;
}
evhttp_set_bevcb(worker_http, doris_https_bufferevent_cb, g_doris_server_info.ssl_instance);
}

View File

@@ -1,7 +1,10 @@
#ifndef __DORIS_SERVER_HTTP_H__
#define __DORIS_SERVER_HTTP_H__
#include <openssl/ssl.h>
int doris_create_listen_socket(int bind_port);
SSL_CTX *doris_connections_create_ssl_ctx(void);
void* thread_doris_http_server(void *arg);
#endif

View File

@@ -17,7 +17,7 @@
#include "doris_server_http.h"
struct doris_global_info g_doris_server_info;
static unsigned long doris_vesion_20210803=20210803L;
static unsigned long doris_vesion_20210804=20210804L;
int doris_mkdir_according_path(const char * path)
{
@@ -215,7 +215,7 @@ static int32_t doris_init_config_for_business(struct doris_global_info *info, st
info->name2business->insert(make_pair(string(business->bizname), business));
MESA_load_profile_uint_def(config_file, business->bizname, "grafana_monitor_status_id", &business->mm_status_codeid, 3);
MESA_load_profile_uint_def(config_file, business->bizname, "doris_write_file_on", &business->write_file_sw, 1);
MESA_load_profile_uint_def(config_file, business->bizname, "mem_cache_max_versions", &business->cache_max_versions, 0);
if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "store_config_path", business->store_path_root, sizeof(business->store_path_root)))
{
MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]store_config_path not found!", bizname, config_file);
@@ -327,18 +327,18 @@ int main(int argc, char **argv)
evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL);
evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL);
evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL);
g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210803);
g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210804);
if(evhttp_accept_socket(manager_http, g_doris_server_info.manager))
{
printf("evhttp_accept_socket %d error!\n", g_doris_server_info.manager);
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "evhttp_accept_socket %d error!\n", g_doris_server_info.manager);
assert(0); return -7;
assert(0); return -3;
}
//Ϊÿ<CEAA><C3BF>ҵ<EFBFBD><D2B5>ϵͳ<CFB5><CDB3>ʼ<EFBFBD><CABC><EFBFBD><EFBFBD>ȡ<EFBFBD><C8A1><EFBFBD>õĽṹ
if(doris_init_config_for_business(&g_doris_server_info, manage_evbase, NIRVANA_CONFIG_FILE))
{
return -8;
return -4;
}
pthread_attr_init(&attr);
@@ -350,7 +350,7 @@ int main(int argc, char **argv)
if(pthread_create(&thread_desc, &attr, thread_doris_client_recv_cfg, &g_doris_server_info.business[i]))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
assert(0);return -4;
assert(0);return -5;
}
}
else
@@ -358,7 +358,7 @@ int main(int argc, char **argv)
if(pthread_create(&thread_desc, &attr, thread_index_file_recv_cfg, &g_doris_server_info.business[i]))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
assert(0);return -4;
assert(0);return -6;
}
}
}
@@ -369,14 +369,18 @@ int main(int argc, char **argv)
g_doris_server_info.listener = doris_create_listen_socket(g_doris_server_info.server_port);
if(g_doris_server_info.listener < 0)
{
return -5;
return -7;
}
if(g_doris_server_info.ssl_conn_on && NULL==(g_doris_server_info.ssl_instance=doris_connections_create_ssl_ctx()))
{
assert(0);return -8;
}
for(u_int32_t i=0; i<g_doris_server_info.iothreads; i++)
{
if(pthread_create(&thread_desc, &attr, thread_doris_http_server, NULL))
{
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
assert(0);return -4;
assert(0);return -9;
}
}
}

View File

@@ -43,7 +43,7 @@ struct doris_business
{
char bizname[32];
u_int32_t recv_way;
u_int32_t write_file_sw;
u_int32_t cache_max_versions;
char recv_path_full[256];
char recv_path_inc[256];
char store_path_root[256];

View File

@@ -66,7 +66,7 @@ void config_version_node_cleanup(struct version_list_node *vernode)
free(vernode);
}
void config_version_handle_cleanup(struct version_list_handle *version)
void config_version_handle_free(struct version_list_handle *version)
{
struct version_list_node *vernode;
@@ -75,6 +75,7 @@ void config_version_handle_cleanup(struct version_list_handle *version)
TAILQ_REMOVE(&version->version_head, vernode, version_node);
config_version_node_cleanup(vernode);
}
delete version->version2node;
free(version);
}
@@ -83,10 +84,27 @@ struct version_list_handle *config_version_handle_new(void)
struct version_list_handle *handle;
handle = (struct version_list_handle *)calloc(1, sizeof(struct version_list_handle));
handle->version2node = new map<int64_t, struct version_list_node*>;
TAILQ_INIT(&handle->version_head);
return handle;
}
void config_version_node_free_content(struct version_list_node *vernode)
{
struct table_list_node *tablenode;
struct cont_frag_node *fragnode;
TAILQ_FOREACH(tablenode, &vernode->table_head, table_node)
{
while(NULL != (fragnode = TAILQ_FIRST(&tablenode->frag_head)))
{
TAILQ_REMOVE(&tablenode->frag_head, fragnode, frag_node);
config_frag_node_cleanup(fragnode);
}
}
vernode->cont_in_disk = 1;
}
static void doris_common_timer_start(struct event *time_event)
{
struct timeval tv;
@@ -106,7 +124,7 @@ static void cfgver_delay_destroy_timer_cb(int fd, short kind, void *userp)
doris_common_timer_start(&delay_event->timer_event);
return;
}
config_version_handle_cleanup(handle);
config_version_handle_free(handle);
free(delay_event);
}
@@ -262,6 +280,7 @@ void doris_config_mem_version_finish(struct doris_instance *instance, void *user
struct confile_save *save=(struct confile_save *)userdata;
struct version_list_handle *cur_version;
struct version_list_handle *tmplist;
struct version_list_handle *cfgver_handle;
cJSON_AddItemToObject(save->cur_vernode->metajson, "configs", save->cur_vernode->arrayjson);
save->cur_vernode->arrayjson = NULL;
@@ -276,7 +295,10 @@ void doris_config_mem_version_finish(struct doris_instance *instance, void *user
{
cur_version = config_version_handle_new();
cur_version->latest_version = save->cur_vernode->version;
cur_version->version_num = 1;
TAILQ_INSERT_TAIL(&cur_version->version_head, save->cur_vernode, version_node);
cur_version->oldest_vernode = TAILQ_FIRST(&cur_version->version_head);
cur_version->version2node->insert(make_pair(cur_version->latest_version, save->cur_vernode));
pthread_rwlock_wrlock(&save->business->rwlock);
tmplist = save->business->cfgver_head;
@@ -285,10 +307,26 @@ void doris_config_mem_version_finish(struct doris_instance *instance, void *user
cfgver_handle_delay_destroy(save, save->evbase, tmplist);
}
else
{
{
pthread_rwlock_wrlock(&save->business->rwlock);
TAILQ_INSERT_TAIL(&save->business->cfgver_head->version_head, save->cur_vernode, version_node);
save->business->cfgver_head->latest_version = save->cur_vernode->version;
cfgver_handle = save->business->cfgver_head;
TAILQ_INSERT_TAIL(&cfgver_handle->version_head, save->cur_vernode, version_node);
cfgver_handle->latest_version = save->cur_vernode->version;
cfgver_handle->version2node->insert(make_pair(save->cur_vernode->version, save->cur_vernode));
if(cfgver_handle->oldest_vernode == NULL)
{
cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head);
}
/*<2A><><EFBFBD><EFBFBD><EFBFBD>ļ<EFBFBD><C4BC><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><E0BBBA>N<EFBFBD><4E><EFBFBD><EFBFBD><E6B1BE>Ԫ<EFBFBD><D4AA>Ϣȫ<CFA2><C8AB><EFBFBD><EFBFBD>*/
if(save->business->cache_max_versions!=0 && cfgver_handle->version_num>=save->business->cache_max_versions)
{
config_version_node_free_content(cfgver_handle->oldest_vernode);
cfgver_handle->oldest_vernode = TAILQ_NEXT(cfgver_handle->oldest_vernode, version_node);
}
else
{
cfgver_handle->version_num += 1;
}
pthread_rwlock_unlock(&save->business->rwlock);
}
save->cur_vernode = NULL;
@@ -509,13 +547,8 @@ void doris_config_localmem_cfgfile_finish(struct doris_instance *instance, const
/*<2A>ޱ<EFBFBD><DEB1><EFBFBD>ϵ<EFBFBD>к<EFBFBD><D0BA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱ<EFBFBD>ص<EFBFBD>*/
void doris_config_version_start(struct doris_instance *instance, cJSON *meta, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
{
doris_config_common_version_start((struct confile_save *)userdata, meta);
if(save->business->write_file_sw)
{
doris_config_file_version_start(instance, meta, userdata);
doris_config_common_version_start((struct confile_save *)userdata, meta);
doris_config_file_version_start(instance, meta, userdata);
if(g_doris_server_info.server_role_sw)
{
@@ -524,12 +557,7 @@ void doris_config_version_start(struct doris_instance *instance, cJSON *meta, vo
}
void doris_config_version_finish(struct doris_instance *instance, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
if(save->business->write_file_sw)
{
doris_config_file_version_finish(instance, userdata);
{
doris_config_file_version_finish(instance, userdata);
if(g_doris_server_info.server_role_sw)
{
@@ -539,13 +567,8 @@ void doris_config_version_finish(struct doris_instance *instance, void *userdata
}
void doris_config_version_error(struct doris_instance *instance, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
{
doris_config_common_version_error((struct confile_save *)userdata);
if(save->business->write_file_sw)
{
doris_config_file_version_error(instance, userdata);
doris_config_common_version_error((struct confile_save *)userdata);
doris_config_file_version_error(instance, userdata);
if(g_doris_server_info.server_role_sw)
{
@@ -558,10 +581,7 @@ void doris_config_cfgfile_start(struct doris_instance *instance,
{
struct confile_save *save=(struct confile_save *)userdata;
doris_config_common_cfgfile_start((struct confile_save *)userdata, meta->cfgnum);
if(save->business->write_file_sw)
{
doris_config_file_cfgfile_start(instance, meta, localpath, userdata);
doris_config_common_cfgfile_start((struct confile_save *)userdata, meta->cfgnum);
doris_config_file_cfgfile_start(instance, meta, localpath, userdata);
if(g_doris_server_info.server_role_sw)
{
@@ -570,12 +590,7 @@ void doris_config_cfgfile_start(struct doris_instance *instance,
}
void doris_config_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
if(save->business->write_file_sw)
{
doris_config_file_cfgfile_update(instance, data, len, userdata);
{
doris_config_file_cfgfile_update(instance, data, len, userdata);
if(g_doris_server_info.server_role_sw)
{
@@ -584,13 +599,8 @@ void doris_config_cfgfile_update(struct doris_instance *instance, const char *da
}
void doris_config_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata)
{
struct confile_save *save=(struct confile_save *)userdata;
{
doris_config_common_cfgfile_finish((struct confile_save *)userdata);
if(save->business->write_file_sw)
{
doris_config_file_cfgfile_finish(instance, userdata);
doris_config_common_cfgfile_finish((struct confile_save *)userdata);
doris_config_file_cfgfile_finish(instance, userdata);
if(g_doris_server_info.server_role_sw)
{

View File

@@ -7,6 +7,9 @@
#include <cjson/cJSON.h>
#include <map>
using namespace std;
enum DORIS_SERVER_FS_FILED
{
DRS_FSSTAT_RECV_ERR_VER=0,
@@ -71,7 +74,8 @@ struct version_list_node
int64_t version;
char *metacont;
int32_t metalen;
int32_t cfg_type; //1-full, 2-inc
int16_t cfg_type; //1-full, 2-inc
int16_t cont_in_disk;
cJSON *metajson, *arrayjson;
cJSON *table_meta;
@@ -84,6 +88,9 @@ struct version_list_handle
TAILQ_HEAD(__version_list_node, version_list_node) version_head;
int64_t latest_version;
int32_t references;
u_int32_t version_num;
map<int64_t, struct version_list_node*> *version2node;
struct version_list_node *oldest_vernode; //δ<><CEB4><EFBFBD><EFBFBD><EFBFBD>ڴ<EFBFBD><DAB4><EFBFBD>̭<EFBFBD><CCAD><EFBFBD><EFBFBD><EFBFBD>ϰ汾
};
struct version_list_handle *config_version_handle_new(void);