diff --git a/CMakeLists.txt b/CMakeLists.txt index 26dcb39..47c095e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,3 +18,5 @@ add_subdirectory (support) add_subdirectory (client) add_subdirectory (server) +INSTALL (DIRECTORY ${PROJECT_SOURCE_DIR}/include/ DESTINATION include) + diff --git a/client/CMakeLists.txt b/client/CMakeLists.txt index e5beafe..9c13e61 100644 --- a/client/CMakeLists.txt +++ b/client/CMakeLists.txt @@ -24,8 +24,8 @@ target_include_directories(doris_client_dynamic PUBLIC ${PROJECT_SOURCE_DIR}/inc target_include_directories(doris_client_dynamic PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) set_property(TARGET doris_client_dynamic PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_SOURCE_DIR}/include) -#INSTALL (TARGETS doris_client_static doris_client_dynamic -# LIBRARY DESTINATION lib -# ARCHIVE DESTINATION lib) +INSTALL (TARGETS doris_client_static doris_client_dynamic + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib) #INSTALL (DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/ DESTINATION include) #INSTALL (FILES doris_client_threads.h doris_conhash.h doris_murmurhash.h DESTINATION include) diff --git a/server/bin/conf/doris_main.conf b/server/bin/conf/doris_main.conf index 518d694..1220cc5 100644 --- a/server/bin/conf/doris_main.conf +++ b/server/bin/conf/doris_main.conf @@ -4,6 +4,8 @@ server_listen_port=9898 manage_listen_port=2233 https_connection_on=1 cache_file_frag_size=100 +#doris_server_role_on=1 +#index_file_format_maat=0 business_system_list=T1_1;VoIP @@ -27,6 +29,7 @@ receive_config_path_inc=./doris_receive_t1/inc/index [VoIP] receive_config_way=2 +mem_cache_max_versions=2 grafana_monitor_status_id=4 store_config_path=./doris_store_voip receive_config_path_full=./doris_receive_voip/full/index diff --git a/server/doris_server_http.cpp b/server/doris_server_http.cpp index aa81c7a..8a82518 100644 --- a/server/doris_server_http.cpp +++ b/server/doris_server_http.cpp @@ -12,6 +12,9 @@ #include #include #include +#include +#include +#include #include @@ -71,6 +74,28 @@ int doris_create_listen_socket(int bind_port) return listener; } +struct version_list_node *lookup_vernode_accord_version(struct version_list_handle *handle, int64_t version) +{ + struct version_list_node *vernode; + map::iterator iter; + + vernode = TAILQ_FIRST(&handle->version_head); + if(vernode!=NULL && versionversion) //当前全量里最小的版本号 + { + version = vernode->version; + } + + for(; version <= handle->latest_version; version++) + { + if((iter = handle->version2node->find(version)) != handle->version2node->end()) + { + vernode = iter->second; + return vernode; + } + } + return NULL; +} + void doris_http_server_meta_cb(struct evhttp_request *req, void *arg) { struct evkeyvalq params; @@ -121,19 +146,13 @@ void doris_http_server_meta_cb(struct evhttp_request *req, void *arg) business = iter->second; pthread_rwlock_rdlock(&business->rwlock); - if(verlong > business->cfgver_head->latest_version) + if(NULL == (vernode = lookup_vernode_accord_version(business->cfgver_head, verlong))) { pthread_rwlock_unlock(&business->rwlock); FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_META_NONEW], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_NOTMODIFIED, "No new configs found"); return; } - - vernode = TAILQ_FIRST(&business->cfgver_head->version_head); - while(vernode->version < verlong) - { - vernode = TAILQ_NEXT(vernode, version_node); - } evbuf = evbuffer_new(); evbuffer_add(evbuf, vernode->metacont, vernode->metalen); sprintf(length, "%u", vernode->metalen); @@ -147,15 +166,58 @@ void doris_http_server_meta_cb(struct evhttp_request *req, void *arg) evbuffer_free(evbuf); } +struct evbuffer *evbuf_content_from_memory(struct table_list_node *tablenode, size_t start, size_t end, size_t *length) +{ + struct evbuffer *evbuf; + struct cont_frag_node *fragnode; + size_t copy_len, offset=start, res_length=0; + + evbuf = evbuffer_new(); + for(fragnode=TAILQ_FIRST(&tablenode->frag_head); fragnode!=NULL && fragnode->start<=end; fragnode=TAILQ_NEXT(fragnode, frag_node)) + { + if(offset > fragnode->end) + { + continue; + } + copy_len = (end>fragnode->end)?(fragnode->end-offset + 1):(end-offset + 1); + evbuffer_add(evbuf, fragnode->content+(offset-fragnode->start), copy_len); + offset += copy_len; + res_length += copy_len; + } + *length = res_length; + return evbuf; +} + +struct evbuffer *evbuf_content_from_disk(struct table_list_node *tablenode, size_t start, size_t end, size_t *length) +{ + struct evbuffer *evbuf; + int32_t fd; + + if((fd = open(tablenode->localpath, O_RDONLY, 0)) < 0) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Send response, open %s failed: %s", strerror(errno)); + return NULL; + } + evbuf = evbuffer_new(); + if(evbuffer_add_file(evbuf, fd, start, end-start+1)) + { + evbuffer_free(evbuf); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Send response, evbuffer_add_file %s failed"); + close(fd); + return NULL; + } + *length = evbuffer_get_length(evbuf); + return evbuf; +} + void doris_response_file_range(struct evhttp_request *req, const char *bizname, const char *tablename, int64_t verlong, size_t start, size_t end, bool range) { struct version_list_node *vernode; struct table_list_node *tablenode; - struct cont_frag_node *fragnode; struct evbuffer *evbuf; char length[128]; - size_t filesize, res_length=0, copy_len, offset=start; + size_t filesize, res_length; struct doris_business *business; map::iterator iter; @@ -168,18 +230,13 @@ void doris_response_file_range(struct evhttp_request *req, const char *bizname, business = iter->second; pthread_rwlock_rdlock(&business->rwlock); - if(verlong > business->cfgver_head->latest_version) + if(NULL == (vernode = lookup_vernode_accord_version(business->cfgver_head, verlong))) { pthread_rwlock_unlock(&business->rwlock); FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_RES_404], 0, FS_OP_ADD, 1); evhttp_send_error(req, HTTP_NOTFOUND, "Version too old"); return; } - vernode = TAILQ_FIRST(&business->cfgver_head->version_head); - while(vernode->version < verlong) - { - vernode = TAILQ_NEXT(vernode, version_node); - } tablenode = TAILQ_FIRST(&vernode->table_head); while(tablenode!=NULL && strcmp(tablename, tablenode->tablename)) { @@ -197,17 +254,13 @@ void doris_response_file_range(struct evhttp_request *req, const char *bizname, { end = tablenode->filesize - 1; } - evbuf = evbuffer_new(); - for(fragnode=TAILQ_FIRST(&tablenode->frag_head); fragnode!=NULL && fragnode->start<=end; fragnode=TAILQ_NEXT(fragnode, frag_node)) + if(vernode->cont_in_disk) { - if(offset > fragnode->end) - { - continue; - } - copy_len = (end>fragnode->end)?(fragnode->end-offset + 1):(end-offset + 1); - evbuffer_add(evbuf, fragnode->content+(offset-fragnode->start), copy_len); - offset += copy_len; - res_length += copy_len; + evbuf = evbuf_content_from_disk(tablenode, start, end, &res_length); + } + else + { + evbuf = evbuf_content_from_memory(tablenode, start, end, &res_length); } pthread_rwlock_unlock(&business->rwlock); @@ -390,11 +443,6 @@ void* thread_doris_http_server(void *arg) if(g_doris_server_info.ssl_conn_on) { - g_doris_server_info.ssl_instance = doris_connections_create_ssl_ctx(); - if(g_doris_server_info.ssl_instance == NULL) - { - assert(0);return NULL; - } evhttp_set_bevcb(worker_http, doris_https_bufferevent_cb, g_doris_server_info.ssl_instance); } diff --git a/server/doris_server_http.h b/server/doris_server_http.h index 755df29..3092a55 100644 --- a/server/doris_server_http.h +++ b/server/doris_server_http.h @@ -1,7 +1,10 @@ #ifndef __DORIS_SERVER_HTTP_H__ #define __DORIS_SERVER_HTTP_H__ +#include + int doris_create_listen_socket(int bind_port); +SSL_CTX *doris_connections_create_ssl_ctx(void); void* thread_doris_http_server(void *arg); #endif diff --git a/server/doris_server_main.cpp b/server/doris_server_main.cpp index f68e4fc..c7703cf 100644 --- a/server/doris_server_main.cpp +++ b/server/doris_server_main.cpp @@ -17,7 +17,7 @@ #include "doris_server_http.h" struct doris_global_info g_doris_server_info; -static unsigned long doris_vesion_20210803=20210803L; +static unsigned long doris_vesion_20210804=20210804L; int doris_mkdir_according_path(const char * path) { @@ -215,7 +215,7 @@ static int32_t doris_init_config_for_business(struct doris_global_info *info, st info->name2business->insert(make_pair(string(business->bizname), business)); MESA_load_profile_uint_def(config_file, business->bizname, "grafana_monitor_status_id", &business->mm_status_codeid, 3); - MESA_load_profile_uint_def(config_file, business->bizname, "doris_write_file_on", &business->write_file_sw, 1); + MESA_load_profile_uint_def(config_file, business->bizname, "mem_cache_max_versions", &business->cache_max_versions, 0); if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "store_config_path", business->store_path_root, sizeof(business->store_path_root))) { MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]store_config_path not found!", bizname, config_file); @@ -327,18 +327,18 @@ int main(int argc, char **argv) evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL); evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL); evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL); - g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210803); + g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210804); if(evhttp_accept_socket(manager_http, g_doris_server_info.manager)) { printf("evhttp_accept_socket %d error!\n", g_doris_server_info.manager); MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "evhttp_accept_socket %d error!\n", g_doris_server_info.manager); - assert(0); return -7; + assert(0); return -3; } //为每个业务系统初始化拉取配置的结构 if(doris_init_config_for_business(&g_doris_server_info, manage_evbase, NIRVANA_CONFIG_FILE)) { - return -8; + return -4; } pthread_attr_init(&attr); @@ -350,7 +350,7 @@ int main(int argc, char **argv) if(pthread_create(&thread_desc, &attr, thread_doris_client_recv_cfg, &g_doris_server_info.business[i])) { MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno)); - assert(0);return -4; + assert(0);return -5; } } else @@ -358,7 +358,7 @@ int main(int argc, char **argv) if(pthread_create(&thread_desc, &attr, thread_index_file_recv_cfg, &g_doris_server_info.business[i])) { MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno)); - assert(0);return -4; + assert(0);return -6; } } } @@ -369,14 +369,18 @@ int main(int argc, char **argv) g_doris_server_info.listener = doris_create_listen_socket(g_doris_server_info.server_port); if(g_doris_server_info.listener < 0) { - return -5; + return -7; + } + if(g_doris_server_info.ssl_conn_on && NULL==(g_doris_server_info.ssl_instance=doris_connections_create_ssl_ctx())) + { + assert(0);return -8; } for(u_int32_t i=0; iversion_head, vernode, version_node); config_version_node_cleanup(vernode); } + delete version->version2node; free(version); } @@ -83,10 +84,27 @@ struct version_list_handle *config_version_handle_new(void) struct version_list_handle *handle; handle = (struct version_list_handle *)calloc(1, sizeof(struct version_list_handle)); + handle->version2node = new map; TAILQ_INIT(&handle->version_head); return handle; } +void config_version_node_free_content(struct version_list_node *vernode) +{ + struct table_list_node *tablenode; + struct cont_frag_node *fragnode; + + TAILQ_FOREACH(tablenode, &vernode->table_head, table_node) + { + while(NULL != (fragnode = TAILQ_FIRST(&tablenode->frag_head))) + { + TAILQ_REMOVE(&tablenode->frag_head, fragnode, frag_node); + config_frag_node_cleanup(fragnode); + } + } + vernode->cont_in_disk = 1; +} + static void doris_common_timer_start(struct event *time_event) { struct timeval tv; @@ -106,7 +124,7 @@ static void cfgver_delay_destroy_timer_cb(int fd, short kind, void *userp) doris_common_timer_start(&delay_event->timer_event); return; } - config_version_handle_cleanup(handle); + config_version_handle_free(handle); free(delay_event); } @@ -262,6 +280,7 @@ void doris_config_mem_version_finish(struct doris_instance *instance, void *user struct confile_save *save=(struct confile_save *)userdata; struct version_list_handle *cur_version; struct version_list_handle *tmplist; + struct version_list_handle *cfgver_handle; cJSON_AddItemToObject(save->cur_vernode->metajson, "configs", save->cur_vernode->arrayjson); save->cur_vernode->arrayjson = NULL; @@ -276,7 +295,10 @@ void doris_config_mem_version_finish(struct doris_instance *instance, void *user { cur_version = config_version_handle_new(); cur_version->latest_version = save->cur_vernode->version; + cur_version->version_num = 1; TAILQ_INSERT_TAIL(&cur_version->version_head, save->cur_vernode, version_node); + cur_version->oldest_vernode = TAILQ_FIRST(&cur_version->version_head); + cur_version->version2node->insert(make_pair(cur_version->latest_version, save->cur_vernode)); pthread_rwlock_wrlock(&save->business->rwlock); tmplist = save->business->cfgver_head; @@ -285,10 +307,26 @@ void doris_config_mem_version_finish(struct doris_instance *instance, void *user cfgver_handle_delay_destroy(save, save->evbase, tmplist); } else - { + { pthread_rwlock_wrlock(&save->business->rwlock); - TAILQ_INSERT_TAIL(&save->business->cfgver_head->version_head, save->cur_vernode, version_node); - save->business->cfgver_head->latest_version = save->cur_vernode->version; + cfgver_handle = save->business->cfgver_head; + TAILQ_INSERT_TAIL(&cfgver_handle->version_head, save->cur_vernode, version_node); + cfgver_handle->latest_version = save->cur_vernode->version; + cfgver_handle->version2node->insert(make_pair(save->cur_vernode->version, save->cur_vernode)); + if(cfgver_handle->oldest_vernode == NULL) + { + cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head); + } + /*配置文件内容最多缓存N个版本,元信息全保留*/ + if(save->business->cache_max_versions!=0 && cfgver_handle->version_num>=save->business->cache_max_versions) + { + config_version_node_free_content(cfgver_handle->oldest_vernode); + cfgver_handle->oldest_vernode = TAILQ_NEXT(cfgver_handle->oldest_vernode, version_node); + } + else + { + cfgver_handle->version_num += 1; + } pthread_rwlock_unlock(&save->business->rwlock); } save->cur_vernode = NULL; @@ -509,13 +547,8 @@ void doris_config_localmem_cfgfile_finish(struct doris_instance *instance, const /*无标记系列函数,新来配置时回调*/ void doris_config_version_start(struct doris_instance *instance, cJSON *meta, void *userdata) { - struct confile_save *save=(struct confile_save *)userdata; - doris_config_common_version_start((struct confile_save *)userdata, meta); - if(save->business->write_file_sw) - { - doris_config_file_version_start(instance, meta, userdata); - } + doris_config_file_version_start(instance, meta, userdata); if(g_doris_server_info.server_role_sw) { doris_config_mem_version_start(instance, meta, userdata); @@ -524,12 +557,7 @@ void doris_config_version_start(struct doris_instance *instance, cJSON *meta, vo void doris_config_version_finish(struct doris_instance *instance, void *userdata) { - struct confile_save *save=(struct confile_save *)userdata; - - if(save->business->write_file_sw) - { - doris_config_file_version_finish(instance, userdata); - } + doris_config_file_version_finish(instance, userdata); if(g_doris_server_info.server_role_sw) { doris_config_mem_version_finish(instance, userdata); @@ -539,13 +567,8 @@ void doris_config_version_finish(struct doris_instance *instance, void *userdata void doris_config_version_error(struct doris_instance *instance, void *userdata) { - struct confile_save *save=(struct confile_save *)userdata; - doris_config_common_version_error((struct confile_save *)userdata); - if(save->business->write_file_sw) - { - doris_config_file_version_error(instance, userdata); - } + doris_config_file_version_error(instance, userdata); if(g_doris_server_info.server_role_sw) { doris_config_mem_version_error(instance, userdata); @@ -558,10 +581,7 @@ void doris_config_cfgfile_start(struct doris_instance *instance, struct confile_save *save=(struct confile_save *)userdata; doris_config_common_cfgfile_start((struct confile_save *)userdata, meta->cfgnum); - if(save->business->write_file_sw) - { - doris_config_file_cfgfile_start(instance, meta, localpath, userdata); - } + doris_config_file_cfgfile_start(instance, meta, localpath, userdata); if(g_doris_server_info.server_role_sw) { doris_config_mem_cfgfile_start(instance, meta, save->cfg_file_path, userdata); @@ -570,12 +590,7 @@ void doris_config_cfgfile_start(struct doris_instance *instance, void doris_config_cfgfile_update(struct doris_instance *instance, const char *data, size_t len, void *userdata) { - struct confile_save *save=(struct confile_save *)userdata; - - if(save->business->write_file_sw) - { - doris_config_file_cfgfile_update(instance, data, len, userdata); - } + doris_config_file_cfgfile_update(instance, data, len, userdata); if(g_doris_server_info.server_role_sw) { doris_config_mem_cfgfile_update(instance, data, len, userdata); @@ -584,13 +599,8 @@ void doris_config_cfgfile_update(struct doris_instance *instance, const char *da void doris_config_cfgfile_finish(struct doris_instance *instance, const char *md5, void *userdata) { - struct confile_save *save=(struct confile_save *)userdata; - doris_config_common_cfgfile_finish((struct confile_save *)userdata); - if(save->business->write_file_sw) - { - doris_config_file_cfgfile_finish(instance, userdata); - } + doris_config_file_cfgfile_finish(instance, userdata); if(g_doris_server_info.server_role_sw) { doris_config_mem_cfgfile_finish(instance, md5, userdata); diff --git a/server/doris_server_receive.h b/server/doris_server_receive.h index 8926f25..846ae78 100644 --- a/server/doris_server_receive.h +++ b/server/doris_server_receive.h @@ -7,6 +7,9 @@ #include +#include +using namespace std; + enum DORIS_SERVER_FS_FILED { DRS_FSSTAT_RECV_ERR_VER=0, @@ -71,7 +74,8 @@ struct version_list_node int64_t version; char *metacont; int32_t metalen; - int32_t cfg_type; //1-full, 2-inc + int16_t cfg_type; //1-full, 2-inc + int16_t cont_in_disk; cJSON *metajson, *arrayjson; cJSON *table_meta; @@ -84,6 +88,9 @@ struct version_list_handle TAILQ_HEAD(__version_list_node, version_list_node) version_head; int64_t latest_version; int32_t references; + u_int32_t version_num; + map *version2node; + struct version_list_node *oldest_vernode; //未进行内存淘汰的最老版本 }; struct version_list_handle *config_version_handle_new(void);