diff --git a/client/doris_client_fetch.cpp b/client/doris_client_fetch.cpp index ae7cc8e..fee1648 100644 --- a/client/doris_client_fetch.cpp +++ b/client/doris_client_fetch.cpp @@ -385,7 +385,7 @@ out_error: doris_confile_ctx_destry(&instance->ctx); if(res_code==304 && instance->cbs.version_updated!=NULL) //版本已完成同步 { - instance->cbs.version_updated(instance, instance->cbs.userdata); + instance->cbs.version_updated(instance, instance->cur_version, instance->cbs.userdata); } if(res_code==300 && instance->param->client_sync_on) //服务端有半途中的版本上传 { @@ -482,7 +482,154 @@ static void instance_meta_expire_timer_cb(int fd, short kind, void *userp) doris_confile_ctx_destry(&instance->ctx); doris_http_fetch_meta(instance); - MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33mbusiness: %s, launch meta-get wired expired, retry....\033[0m", instance->args.bizname); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33m[Warning]business: %s, launch meta-get wired expired, retry....\033[0m", instance->args.bizname); +} + +void doris_http_head_version_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp) +{ + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; + const char *pos_colon; + char buffer[64]; + int datalen; + + if((pos_colon=(const char*)memchr(start, ':', bytes)) == NULL) + { + return ; + } + datalen = pos_colon - start; + switch(datalen) + { + case 16: + if(!strncasecmp(start, "X-Latest-Version:", 17)) + { + memcpy(buffer, start+17, bytes-17); + buffer[bytes-17] = '\0'; + instance->head_version = atol(buffer); + } + break; + default: break; + } + + //check code only once + if(instance->ctx.res_code != 0) + { + return; + } + instance->ctx.res_code = res_code; + assert(res_code != 0); + + if(res_code != 200) + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, head version failed, server: %s, curlcode = %d", + instance->args.bizname, instance->ctx.server, code); + } +} + +void doris_http_head_version_done_cb(CURLcode res, long res_code, const char *err, void *userp) +{ + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; + + evtimer_del(&instance->ctx.timer_expires); + if(res != CURLE_OK) + { + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Head version failed, server: %s, curlcode = %d, error: %s", + instance->args.bizname, instance->ctx.server, res_code, err); + goto out_herror; + } + + if(instance->ctx.res_code != 200 || res_code!=200) + { + goto out_herror; + } + + instance->cur_version = instance->head_version + instance->cur_version; + if(instance->cur_version < 0) + { + instance->cur_version = 0; + } + instance->req_version = instance->cur_version + 1; //TODO + evtimer_assign(&instance->timer_fetch, instance->worker_evbase, instance_fetch_cfg_timer_cb, instance); + evtimer_assign(&instance->ctx.timer_expires, instance->worker_evbase, instance_meta_expire_timer_cb, instance); + doris_http_fetch_meta(instance); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Head version succ, server: %s, next request meta of version: %ld", + instance->args.bizname, instance->ctx.server, instance->req_version); + return; + +out_herror: + instance->statistic.field[DRS_FS_FILED_RES_NOMETA] += 1; + doris_request_restart_timer(instance, instance->param->retry_interval); + doris_confile_ctx_destry(&instance->ctx); +} + +static void doris_http_head_version(struct doris_csum_instance *instance) +{ + u_int64_t balance_seed; + struct doris_http_callback curlcbs; + char metauri[128]; + struct timeval tv={10, 0}; + + balance_seed = (((u_int64_t)rand()&0xFFFF) << 48) | (((u_int64_t)rand()&0xFFFF) << 32) | + (((u_int64_t)rand()&0xFFFF) << 16) | ((u_int64_t)rand()&0xFFFF); + + memset(&curlcbs, 0, sizeof(struct doris_http_callback)); + curlcbs.header_cb = doris_http_head_version_header_cb; + curlcbs.write_cb = NULL; + curlcbs.transfer_done_cb = doris_http_head_version_done_cb; + curlcbs.userp = instance; + + instance->array_index = 0; + instance->cur_httpins = instance->httpins_master; + instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); + if(instance->ctx.httpctx==NULL && instance->httpins_backup1!=NULL) + { + instance->cur_httpins = instance->httpins_backup1; + instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); + } + if(instance->ctx.httpctx==NULL && instance->httpins_backup2!=NULL) + { + instance->cur_httpins = instance->httpins_backup2; + instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); + } + + if(instance->ctx.httpctx != NULL) + { + snprintf(metauri, 128, "latestversion?business=%s", instance->args.bizname); + if(!doris_http_launch_head_request(instance->ctx.httpctx, metauri)) + { + instance->status = FETCH_STATUS_META; + instance->statistic.field[DRS_FS_FILED_REQ_META] += 1; + evtimer_add(&instance->ctx.timer_expires, &tv); + } + else + { + instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; + doris_confile_ctx_destry(&instance->ctx); + doris_request_restart_timer(instance, instance->param->retry_interval); + } + if(instance->cur_httpins == instance->httpins_backup1) instance->statistic.field[DRS_FS_FILED_BACKUP1_REQ] += 1; + else if(instance->cur_httpins == instance->httpins_backup2) instance->statistic.field[DRS_FS_FILED_BACKUP2_REQ] += 1; + } + else + { + instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; + doris_request_restart_timer(instance, instance->param->retry_interval); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch version HEAD failed: no active host found"); + } +} + +static void instance_head_version_timer_cb(int fd, short kind, void *userp) +{ + doris_http_head_version((struct doris_csum_instance *)userp); +} + +/*https模式下,使用valgrind运行,发现发起GET请求后,done_cb函数始终无法得到调用*/ +static void instance_version_expire_timer_cb(int fd, short kind, void *userp) +{ + struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; + + doris_confile_ctx_destry(&instance->ctx); + doris_http_head_version(instance); + MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33m[Warning]business: %s, launch version-head wired expired, retry....\033[0m", instance->args.bizname); } static void doris_client_fs_output_timer_cb(int fd, short kind, void *userp) @@ -701,21 +848,31 @@ struct doris_csum_instance *doris_csum_instance_new(struct doris_csum_param *par { instance->httpins_backup2 = doris_http_instance_new(param->param_backup2, worker_evbase, runtimelog); } - evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_meta_expire_timer_cb, instance); - pthread_mutex_lock(¶m->mutex_lock); param->references++; pthread_mutex_unlock(¶m->mutex_lock); + if(instance->cur_version >= 0) + { + evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_meta_expire_timer_cb, instance); + evtimer_assign(&instance->timer_fetch, worker_evbase, instance_fetch_cfg_timer_cb, instance); + tv.tv_sec = 3; + tv.tv_usec = 0; + evtimer_add(&instance->timer_fetch, &tv); + } + else + { + evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_version_expire_timer_cb, instance); + evtimer_assign(&instance->timer_fetch, worker_evbase, instance_head_version_timer_cb, instance); + tv.tv_sec = 3; + tv.tv_usec = 0; + evtimer_add(&instance->timer_fetch, &tv); + } + evtimer_assign(&instance->timer_statistic, worker_evbase, doris_instance_statistic_timer_cb, instance); tv.tv_sec = param->fsstat_period; tv.tv_usec = 0; evtimer_add(&instance->timer_statistic, &tv); - - evtimer_assign(&instance->timer_fetch, worker_evbase, instance_fetch_cfg_timer_cb, instance); - tv.tv_sec = 3; - tv.tv_usec = 0; - evtimer_add(&instance->timer_fetch, &tv); return instance; } diff --git a/client/doris_client_fetch.h b/client/doris_client_fetch.h index 4ef51d9..e15bd27 100644 --- a/client/doris_client_fetch.h +++ b/client/doris_client_fetch.h @@ -86,6 +86,7 @@ struct doris_csum_instance int64_t cur_version; //元信息 int64_t req_version; //文件 int64_t new_version; //新的元信息 + int64_t head_version; struct easy_string estr; cJSON *meta, *array; u_int32_t array_size; diff --git a/client/doris_client_transfer.cpp b/client/doris_client_transfer.cpp index b0edcf2..a71313f 100644 --- a/client/doris_client_transfer.cpp +++ b/client/doris_client_transfer.cpp @@ -153,6 +153,46 @@ void doris_http_ctx_add_header_kvint(struct doris_http_ctx *ctx, const char *hea ctx->headers = curl_slist_append(ctx->headers, header); } +int doris_http_launch_head_request(struct doris_http_ctx *ctx, const char *uri) +{ + char minio_url[2048]; + + assert(ctx->curl == NULL); + if(NULL == (ctx->curl=curl_easy_init())) + { + assert(0);return -1; + } + + curl_easy_setopt(ctx->curl, CURLOPT_NOBODY, 1L); //HEAD请求 + if(ctx->instance->param->ssl_connection) + { + snprintf(minio_url, sizeof(minio_url), "https://%s/%s", ctx->multidata->host->srvaddr, uri); + curl_easy_setopt(ctx->curl, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(ctx->curl, CURLOPT_SSL_VERIFYHOST, 0L); + } + else + { + snprintf(minio_url, sizeof(minio_url), "http://%s/%s", ctx->multidata->host->srvaddr, uri); + } + curl_easy_setopt(ctx->curl, CURLOPT_URL, minio_url); + + if(ctx->headers != NULL) + { + curl_easy_setopt(ctx->curl, CURLOPT_HTTPHEADER, ctx->headers); + } + curl_easy_setopt(ctx->curl, CURLOPT_HEADERFUNCTION, curl_response_header_cb); + curl_easy_setopt(ctx->curl, CURLOPT_HEADERDATA, ctx); + curl_easy_setopt(ctx->curl, CURLOPT_PRIVATE, ctx); + curl_set_common_options(ctx->curl, ctx->instance->param->transfer_timeout, ctx->error); + + if(CURLM_OK != curl_multi_add_handle(ctx->multidata->multi_hd, ctx->curl)) + { + assert(0); return -2; + } + ctx->transfering = 1; + return 0; +} + int doris_http_launch_get_request(struct doris_http_ctx *ctx, const char *uri) { char minio_url[2048]; diff --git a/client/doris_client_transfer.h b/client/doris_client_transfer.h index 324c9aa..309f3fe 100644 --- a/client/doris_client_transfer.h +++ b/client/doris_client_transfer.h @@ -52,6 +52,7 @@ void doris_http_ctx_add_header(struct doris_http_ctx *ctx, const char *header); void doris_http_ctx_add_header_kvstr(struct doris_http_ctx *ctx, const char *headername, const char *value); void doris_http_ctx_add_header_kvint(struct doris_http_ctx *ctx, const char *headername, u_int64_t value); +int doris_http_launch_head_request(struct doris_http_ctx *ctx, const char *uri); int doris_http_launch_get_request(struct doris_http_ctx *ctx, const char *uri); int doris_http_launch_post_request(struct doris_http_ctx *ctx, const char *uri, const char *data, size_t data_len); int doris_http_launch_put_request_data(struct doris_http_ctx *ctx, const char *uri, char *data, size_t data_len); diff --git a/include/doris_consumer_client.h b/include/doris_consumer_client.h index f772626..4150708 100644 --- a/include/doris_consumer_client.h +++ b/include/doris_consumer_client.h @@ -46,7 +46,7 @@ struct doris_csum_statistics struct doris_arguments { char bizname[32]; - int64_t current_version; //当前已获取完毕的最新版本号,将从它下一个版本取配置 + int64_t current_version; //当前已获取完毕的最新版本号,将从它下一个版本取配置; 负数表示获取最新的N个版本; int32_t judian_id; }; @@ -70,7 +70,7 @@ struct doris_callbacks void (*cfgfile_finish)(struct doris_csum_instance *instance, const char *md5, void *userdata); void (*version_error)(struct doris_csum_instance *instance, void *userdata); //下载文件失败,该版本需要回滚 void (*version_finish)(struct doris_csum_instance *instance, void *userdata); - void (*version_updated)(struct doris_csum_instance *instance, void *userdata); //暂时没有新配置 + void (*version_updated)(struct doris_csum_instance *instance, int64_t lastest_version, void *userdata); //暂时没有新配置 }; struct doris_csum_param *doris_csum_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog); diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 92077dc..2032131 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -1,10 +1,10 @@ -set (NIRVANA_PLATFORM_SRC doris_server_scandir.cpp doris_server_receive.cpp doris_server_http.cpp doris_server_main.cpp) +set (NIRVANA_PLATFORM_SRC doris_server_kvdb.cpp doris_server_scandir.cpp doris_server_receive.cpp doris_server_http.cpp doris_server_main.cpp) add_definitions(-fPIC -Wall -g) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D__FILENAME__='\"$(subst ${CMAKE_CURRENT_SOURCE_DIR}/,,$(abspath $<))\"'") add_executable(doris ${NIRVANA_PLATFORM_SRC}) -target_link_libraries(doris doris_client_static libMesaMonitor libevent-static libevent-pthreads-static libcurl-static libevent-openssl-static openssl-ssl-static openssl-crypto-static cjson) +target_link_libraries(doris doris_client_static libMesaMonitor libevent-static libevent-pthreads-static libcurl-static libevent-openssl-static openssl-ssl-static openssl-crypto-static cjson libLevelDB) target_link_libraries(doris MESA_handle_logger MESA_htable MESA_prof_load MESA_field_stat2 pthread z dl) set_target_properties(doris PROPERTIES CLEAN_DIRECT_OUTPUT 1) target_include_directories(doris PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) diff --git a/server/bin/conf/doris_main.conf b/server/bin/conf/doris_main.conf index 7816131..da962da 100644 --- a/server/bin/conf/doris_main.conf +++ b/server/bin/conf/doris_main.conf @@ -35,6 +35,7 @@ doris_client_confile=./conf/doris_client_csum.conf [VoIP] receive_config_way=3 +#persistence_write_on=1 producer_listen_port=9801 producer_concurrence_allowed=1 mem_cache_max_versions=2 diff --git a/server/doris_server_kvdb.cpp b/server/doris_server_kvdb.cpp new file mode 100644 index 0000000..19b6638 --- /dev/null +++ b/server/doris_server_kvdb.cpp @@ -0,0 +1,97 @@ +#include +#include +#include + +#include + +#include "leveldb/db.h" +#include "leveldb/comparator.h" +#include "leveldb/cache.h" + +#include "doris_server_kvdb.h" + +struct doris_kvhandle +{ + leveldb::DB *kvdb; +}; + +struct doris_kvhandle *doris_kvdb_hanlde_new(const char *dir) +{ + struct doris_kvhandle *handle; + leveldb::Options options; + + handle = (struct doris_kvhandle *)malloc(sizeof(struct doris_kvhandle)); + options.create_if_missing = true; + leveldb::Status status = leveldb::DB::Open(options, std::string(dir), &(handle->kvdb)); + if(!status.ok()) + { + free(handle); + return NULL; + } + return handle; +} + +bool doris_kvdb_update_keyint_valint(struct doris_kvhandle *handle, u_int64_t key, int64_t value) +{ + leveldb::WriteOptions wop; + leveldb::Slice _key((const char *)&key, sizeof(key)); + leveldb::Slice _value((const char *)&value, sizeof(value)); + wop.sync = true; + leveldb::Status s = handle->kvdb->Put(wop, _key, _value); + return s.ok(); +} + +bool doris_kvdb_update_keystr_valint(struct doris_kvhandle *handle, const char *key, int64_t value) +{ + leveldb::WriteOptions wop; + leveldb::Slice _key((const char *)key, strlen(key)); + leveldb::Slice _value((const char *)&value, sizeof(value)); + wop.sync = true; + leveldb::Status s = handle->kvdb->Put(wop, _key, _value); + return s.ok(); +} + +int doris_kvdb_delete_keyint(struct doris_kvhandle *handle, u_int64_t key) +{ + leveldb::Slice _key((char *)&key, sizeof(key)); + leveldb::Status s = handle->kvdb->Delete(leveldb::WriteOptions(), _key); + return s.ok(); +} + +int doris_kvdb_delete_keystr(struct doris_kvhandle *handle, const char *key) +{ + leveldb::Slice _key((const char *)key, strlen(key)); + leveldb::Status s = handle->kvdb->Delete(leveldb::WriteOptions(), _key); + return s.ok(); +} + +void doris_kvdb_handle_destroy(struct doris_kvhandle *handle) +{ + delete handle->kvdb; + free(handle); +} + +int64_t doris_kvdb_get_keyint_valint(struct doris_kvhandle *handle, u_int64_t key) +{ + std::string value; + leveldb::Slice off_key((const char *)&key, sizeof(key)); + leveldb::Status s = handle->kvdb->Get(leveldb::ReadOptions(), off_key, &value); + if(!s.ok()) + { + return 0; + } + return *(int64_t *)value.data(); +} + +int64_t doris_kvdb_get_keystr_valint(struct doris_kvhandle *handle, const char *key) +{ + std::string value; + leveldb::Slice off_key((const char *)key, strlen(key)); + leveldb::Status s = handle->kvdb->Get(leveldb::ReadOptions(), off_key, &value); + if(!s.ok()) + { + return 0; + } + return *(int64_t *)value.data(); +} + diff --git a/server/doris_server_kvdb.h b/server/doris_server_kvdb.h new file mode 100644 index 0000000..6608e5c --- /dev/null +++ b/server/doris_server_kvdb.h @@ -0,0 +1,18 @@ +#ifndef __DORIS_KVDB_H__ +#define __DORIS_KVDB_H__ + +struct doris_kvhandle; + +struct doris_kvhandle *doris_kvdb_hanlde_new(const char *dir); +void doris_kvdb_handle_destroy(struct doris_kvhandle *handle); + +bool doris_kvdb_update_keyint_valint(struct doris_kvhandle *handle, u_int64_t key, int64_t value); +bool doris_kvdb_update_keystr_valint(struct doris_kvhandle *handle, const char *key, int64_t value); + +int doris_kvdb_delete_keyint(struct doris_kvhandle *handle, u_int64_t key); +int doris_kvdb_delete_keystr(struct doris_kvhandle *handle, const char *key); + +int64_t doris_kvdb_get_keyint_valint(struct doris_kvhandle *handle, u_int64_t key); +int64_t doris_kvdb_get_keystr_valint(struct doris_kvhandle *handle, const char *key); + +#endif diff --git a/server/doris_server_main.cpp b/server/doris_server_main.cpp index 480e904..4e4e20e 100644 --- a/server/doris_server_main.cpp +++ b/server/doris_server_main.cpp @@ -302,24 +302,32 @@ static int32_t doris_init_config_for_business(struct doris_global_info *info, st MESA_load_profile_uint_def(config_file, business->bizname, "max_store_full_versions", &business->saves_when_fulldel, 0); if(business->saves_when_fulldel > 16) { - MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "\033[1;31;40mAlert! %s [%s]max_store_full_versions support max 16!!!!\033[0m\n", config_file, business->bizname); + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert] %s [%s]max_store_full_versions support max 16!!!!\033[0m\n", config_file, business->bizname); business->saves_when_fulldel = 16; } MESA_load_profile_uint_def(config_file, business->bizname, "grafana_monitor_status_id", &business->mmval_status_codeid, 3); - MESA_load_profile_uint_def(config_file, business->bizname, "mem_cache_max_versions", &business->cache_max_versions, 0); + MESA_load_profile_uint_def(config_file, business->bizname, "persistence_write_on", &business->persistence_write_on, 1); + MESA_load_profile_int_def(config_file, business->bizname, "mem_cache_max_versions", &business->cache_max_versions, 0); + if(business->persistence_write_on==0 && business->cache_max_versions==0) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert] %s [%s], you must set mem_cache_max_versions if you disable persistence_write_on!\033[0m\n", config_file, business->bizname); + return -2; + } if(0>MESA_load_profile_string_nodef(config_file, business->bizname, "store_config_path", business->store_path_root, sizeof(business->store_path_root))) { MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]store_config_path not found!", bizname, config_file); assert(0);return -1; } - snprintf(tmp_dir, 512, "%s/full/index", business->store_path_root); - snprintf(tmp_dir2,512, "%s/inc/index", business->store_path_root); - if(doris_mkdir_according_path(tmp_dir) || doris_mkdir_according_path(tmp_dir2)) + if(business->persistence_write_on) { - MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "mkdir %s failed: %s\n", tmp_dir, strerror(errno)); - return -1; + snprintf(tmp_dir, 512, "%s/full/index", business->store_path_root); + snprintf(tmp_dir2,512, "%s/inc/index", business->store_path_root); + if(doris_mkdir_according_path(tmp_dir) || doris_mkdir_according_path(tmp_dir2)) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "mkdir %s failed: %s\n", tmp_dir, strerror(errno)); + return -1; + } } - MESA_load_profile_uint_def(config_file, business->bizname, "receive_config_way", &business->recv_way, RECV_WAY_DRS_CLIENT); assert(business->recv_way==RECV_WAY_IDX_FILE || business->recv_way==RECV_WAY_DRS_CLIENT || business->recv_way==RECV_WAY_HTTP_POST); if(business->recv_way == RECV_WAY_IDX_FILE) @@ -371,6 +379,10 @@ static int32_t doris_init_config_for_business(struct doris_global_info *info, st { assert(0);return -2; } + if(!business->persistence_write_on) + { + g_doris_server_info.business_post_nopersists++; + } g_doris_server_info.business_post_num++; business->token2node = new map; } @@ -466,7 +478,12 @@ int main(int argc, char **argv) "http_post_server_status", MONITOR_METRICS_GAUGE, "Running status of doris http post server."); MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_DOWN); } - + if(g_doris_server_info.business_post_nopersists>0 && NULL==(g_doris_server_info.kvdbhandle=doris_kvdb_hanlde_new("./leveldbdata"))) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mOpen levelDB ./leveldbdata failed.\033[0m"); + assert(0);return -11; + } + if(g_doris_server_info.ssl_conn_on && NULL==(g_doris_server_info.ssl_instance=doris_connections_create_ssl_ctx())) { assert(0);return -8; diff --git a/server/doris_server_main.h b/server/doris_server_main.h index b97f760..ea348b9 100644 --- a/server/doris_server_main.h +++ b/server/doris_server_main.h @@ -20,6 +20,7 @@ #include "doris_consumer_client.h" #include "doris_server_receive.h" +#include "doris_server_kvdb.h" #include #include @@ -48,9 +49,10 @@ struct doris_business /*first for configuration*/ char bizname[32]; u_int32_t recv_way; - u_int32_t cache_max_versions; + int32_t cache_max_versions; u_int32_t concurrency_allowed; u_int32_t saves_when_fulldel; //有全量到来时,最多保存几个最新的全量版本,0-全保存 + u_int32_t persistence_write_on; char recv_path_full[256]; char recv_path_inc[256]; char store_path_root[256]; @@ -74,6 +76,7 @@ struct doris_business struct doris_prod_instance *instance; map *token2node; int64_t version; + int64_t genversion_seq; //post模式,用于生成版本号的序列 int32_t source_from; int32_t type; int64_t version_cfgnum; @@ -117,12 +120,14 @@ struct doris_global_info u_int32_t business_num; u_int32_t business_post_num; //post模式有几个 int32_t business_post_ups; //启动了几个 + int32_t business_post_nopersists; int32_t mmid_post_server; //value=PROMETHUES_POST_* map *name2business; map *confile2csmparam; struct MESA_MonitorHandler *monitor; pthread_mutex_t mutex_lock; + struct doris_kvhandle *kvdbhandle; /*logs*/ u_int32_t log_level; diff --git a/server/doris_server_receive.cpp b/server/doris_server_receive.cpp index 6d30623..56643a7 100644 --- a/server/doris_server_receive.cpp +++ b/server/doris_server_receive.cpp @@ -222,7 +222,6 @@ void doris_config_file_version_error(struct doris_csum_instance *instance, void fclose(business->fp_cfg_file); remove(business->cfg_file_path); } - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", business->bizname, business->version); } void doris_config_file_cfgfile_start(struct doris_csum_instance *instance, @@ -320,7 +319,7 @@ void doris_config_mem_version_finish(struct doris_csum_instance *instance, void { cfgver_handle = config_version_handle_new(); cfgver_handle->latest_version = business->cur_vernode->version; - cfgver_handle->version_num = 1; + cfgver_handle->version_mem_num = 1; TAILQ_INSERT_TAIL(&cfgver_handle->version_head, business->cur_vernode, version_node); cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head); cfgver_handle->version2node->insert(make_pair(cfgver_handle->latest_version, business->cur_vernode)); @@ -343,14 +342,23 @@ void doris_config_mem_version_finish(struct doris_csum_instance *instance, void cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head); } /*配置文件内容最多缓存N个版本,元信息全保留*/ - if(business->cache_max_versions!=0 && cfgver_handle->version_num>=business->cache_max_versions) + if(business->cache_max_versions!=0 && cfgver_handle->version_mem_num>=business->cache_max_versions) { - config_version_node_free_content(cfgver_handle->oldest_vernode); - cfgver_handle->oldest_vernode = TAILQ_NEXT(cfgver_handle->oldest_vernode, version_node); + if(!business->persistence_write_on) + { + TAILQ_REMOVE(&cfgver_handle->version_head, cfgver_handle->oldest_vernode, version_node); + config_version_node_cleanup(cfgver_handle->oldest_vernode); + cfgver_handle->oldest_vernode = TAILQ_FIRST(&cfgver_handle->version_head); + } + else + { + config_version_node_free_content(cfgver_handle->oldest_vernode); + cfgver_handle->oldest_vernode = TAILQ_NEXT(cfgver_handle->oldest_vernode, version_node); + } } else { - cfgver_handle->version_num += 1; + cfgver_handle->version_mem_num += 1; } pthread_rwlock_unlock(&business->rwlock); } @@ -578,8 +586,13 @@ void doris_config_localmem_cfgfile_finish(struct doris_csum_instance *instance, /*无标记系列函数,新来配置时回调*/ void doris_config_version_start(struct doris_csum_instance *instance, cJSON *meta, void *userdata) { - doris_config_common_version_start((struct doris_business *)userdata, meta); - doris_config_file_version_start(instance, meta, userdata); + struct doris_business *business=(struct doris_business *)userdata; + + doris_config_common_version_start(business, meta); + if(business->persistence_write_on) + { + doris_config_file_version_start(instance, meta, userdata); + } if(g_doris_server_info.consumer_port) { doris_config_mem_version_start(instance, meta, userdata); @@ -588,7 +601,12 @@ void doris_config_version_start(struct doris_csum_instance *instance, cJSON *met void doris_config_version_finish(struct doris_csum_instance *instance, void *userdata) { - doris_config_file_version_finish(instance, userdata); + struct doris_business *business=(struct doris_business *)userdata; + + if(business->persistence_write_on) + { + doris_config_file_version_finish(instance, userdata); + } if(g_doris_server_info.consumer_port) { doris_config_mem_version_finish(instance, userdata); @@ -598,12 +616,18 @@ void doris_config_version_finish(struct doris_csum_instance *instance, void *use void doris_config_version_error(struct doris_csum_instance *instance, void *userdata) { + struct doris_business *business=(struct doris_business *)userdata; + doris_config_common_version_error((struct doris_business *)userdata); - doris_config_file_version_error(instance, userdata); + if(business->persistence_write_on) + { + doris_config_file_version_error(instance, userdata); + } if(g_doris_server_info.consumer_port) { doris_config_mem_version_error(instance, userdata); } + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", business->bizname, business->version); } void doris_config_cfgfile_start(struct doris_csum_instance *instance, @@ -612,7 +636,10 @@ void doris_config_cfgfile_start(struct doris_csum_instance *instance, struct doris_business *business=(struct doris_business *)userdata; doris_config_common_cfgfile_start((struct doris_business *)userdata, meta->cfgnum); - doris_config_file_cfgfile_start(instance, meta, localpath, userdata); + if(business->persistence_write_on) + { + doris_config_file_cfgfile_start(instance, meta, localpath, userdata); + } if(g_doris_server_info.consumer_port) { doris_config_mem_cfgfile_start(instance, meta, business->cfg_file_path, userdata); @@ -621,7 +648,12 @@ void doris_config_cfgfile_start(struct doris_csum_instance *instance, void doris_config_cfgfile_update(struct doris_csum_instance *instance, const char *data, size_t len, void *userdata) { - doris_config_file_cfgfile_update(instance, data, len, userdata); + struct doris_business *business=(struct doris_business *)userdata; + + if(business->persistence_write_on) + { + doris_config_file_cfgfile_update(instance, data, len, userdata); + } if(g_doris_server_info.consumer_port) { doris_config_mem_cfgfile_update(instance, data, len, userdata); @@ -630,8 +662,13 @@ void doris_config_cfgfile_update(struct doris_csum_instance *instance, const cha void doris_config_cfgfile_finish(struct doris_csum_instance *instance, const char *md5, void *userdata) { + struct doris_business *business=(struct doris_business *)userdata; + doris_config_common_cfgfile_finish((struct doris_business *)userdata); - doris_config_file_cfgfile_finish(instance, userdata); + if(business->persistence_write_on) + { + doris_config_file_cfgfile_finish(instance, userdata); + } if(g_doris_server_info.consumer_port) { doris_config_mem_cfgfile_finish(instance, md5, userdata); @@ -656,29 +693,30 @@ void* thread_doris_client_recv_cfg(void *arg) business->source_from = RECV_WAY_IDX_FILE; business->worker_evbase = client_evbase; - scanner = doris_index_file_scanner(0); - - /*Retaive latest config to memory from Stored configs*/ - doris_cbs.version_start = doris_config_localmem_version_start; - doris_cbs.version_finish = doris_config_localmem_version_finish; - doris_cbs.version_error = doris_config_localmem_version_error; - doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start; - doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update; - doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish; - doris_cbs.version_updated= NULL; - doris_cbs.userdata = business; - - snprintf(stored_path, 512, "%s/full/index", business->store_path_root); - if(business->saves_when_fulldel > 0) + if(business->persistence_write_on) { - get_full_topN_max_versions(stored_path, business->full_version_inc, business->saves_when_fulldel); - } - update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); - snprintf(stored_path, 512, "%s/inc/index", business->store_path_root); - do { - update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); - }while(update_type != CFG_UPDATE_TYPE_NONE); + scanner = doris_index_file_scanner(0); + /*Retaive latest config to memory from Stored configs*/ + doris_cbs.version_start = doris_config_localmem_version_start; + doris_cbs.version_finish = doris_config_localmem_version_finish; + doris_cbs.version_error = doris_config_localmem_version_error; + doris_cbs.cfgfile_start = doris_config_localmem_cfgfile_start; + doris_cbs.cfgfile_update = doris_config_localmem_cfgfile_update; + doris_cbs.cfgfile_finish = doris_config_localmem_cfgfile_finish; + doris_cbs.version_updated= NULL; + doris_cbs.userdata = business; + snprintf(stored_path, 512, "%s/full/index", business->store_path_root); + if(business->saves_when_fulldel > 0) + { + get_full_topN_max_versions(stored_path, business->full_version_inc, business->saves_when_fulldel); + } + update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); + snprintf(stored_path, 512, "%s/inc/index", business->store_path_root); + do { + update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); + }while(update_type != CFG_UPDATE_TYPE_NONE); + } /*Check new configs*/ doris_cbs.version_start = doris_config_version_start; @@ -687,10 +725,12 @@ void* thread_doris_client_recv_cfg(void *arg) doris_cbs.cfgfile_start = doris_config_cfgfile_start; doris_cbs.cfgfile_update = doris_config_cfgfile_update; doris_cbs.cfgfile_finish = doris_config_cfgfile_finish; + doris_cbs.version_updated= NULL; + doris_cbs.userdata = business; business->source_from = RECV_WAY_DRS_CLIENT; memset(&doris_args, 0, sizeof(struct doris_arguments)); - doris_args.current_version = scanner->cur_version; + doris_args.current_version = (business->persistence_write_on)?scanner->cur_version:(0-business->cache_max_versions); sprintf(doris_args.bizname, "%s", business->bizname); instance = doris_csum_instance_new(business->param_csum, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime); if(instance == NULL) @@ -860,6 +900,12 @@ void prod_server_generate_token(struct doris_business *business, char *token/*OU pthread_mutex_unlock(&g_doris_server_info.mutex_lock); } +/*TODO: 不能生成比对方版本更小的版本*/ +int64_t prod_server_generate_version(struct doris_business *business) +{ + return ++business->genversion_seq; +} + void business_resume_sync_peer_normal(struct doris_business *business) { u_int32_t business_post_ups; @@ -896,7 +942,7 @@ void business_set_sync_peer_abnormal(struct doris_business *business) { return; } - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mcluster sync error, please check slave status!!!\033[0m\n"); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]cluster sync error, please check slave status!!!\033[0m\n"); if(0 == atomic_set(&business->ready_to_sync, 0) || business->listener_prod==0) { @@ -1009,19 +1055,22 @@ void http_config_direct_version_cancel(struct version_list_node *vernode, struct { doris_prod_upload_ctx_destroy(vernode->synctx); } - if(vernode->fp_idx_file != NULL) + if(business->persistence_write_on) { - fclose(vernode->fp_idx_file); - remove(vernode->tmp_index_path); - } - if(vernode->cur_table!=NULL && vernode->cur_table->fp_cfg_file != NULL) - { - fclose(vernode->cur_table->fp_cfg_file); - remove(vernode->cur_table->localpath); - } - TAILQ_FOREACH(tablenode, &vernode->table_head, table_node) - { - remove(tablenode->localpath); + if(vernode->fp_idx_file != NULL) + { + fclose(vernode->fp_idx_file); + remove(vernode->tmp_index_path); + } + if(vernode->cur_table!=NULL && vernode->cur_table->fp_cfg_file != NULL) + { + fclose(vernode->cur_table->fp_cfg_file); + remove(vernode->cur_table->localpath); + } + TAILQ_FOREACH(tablenode, &vernode->table_head, table_node) + { + remove(tablenode->localpath); + } } config_version_node_cleanup(vernode); if(business->concurrency_allowed && evtimer_pending(&vernode->timer_expire, NULL)) @@ -1050,7 +1099,7 @@ void prod_sync_vercancel_result_cb(enum PROD_VEROP_RES result, void *userdata) case VERSIONOP_RES_ERROR: evhttp_send_error(vernode->req, 500, "version cancel sync error res_code"); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version cancel sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]business: %s, version cancel sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); break; case VERSIONOP_CURL_ERROR: @@ -1115,24 +1164,27 @@ void doris_config_post_version_finish(struct doris_business *business, struct ve { assert(newversion > vernode->version); vernode->version = newversion; - - if(vernode->cfg_type == CFG_UPDATE_TYPE_FULL) - { - snprintf(business->inc_index_path, 256, "%s/inc/index/full_config_index.%010lu", business->store_path_root, vernode->version); - snprintf(business->full_index_path, 256, "%s/full/index/full_config_index.%010lu", business->store_path_root, vernode->version); - } - else - { - snprintf(business->inc_index_path, 256, "%s/inc/index/inc_config_index.%010lu", business->store_path_root, vernode->version); - } - /*HTTP post时,多版本并发每个都有自己的临时通知文件名,复用本地文件的关闭函数*/ - sprintf(business->tmp_index_path, "%s", vernode->tmp_index_path); business->version = vernode->version; business->type = vernode->cfg_type; - business->fp_idx_file = vernode->fp_idx_file; - doris_config_file_version_finish(NULL, business); - vernode->fp_idx_file = NULL; + if(business->persistence_write_on) + { + if(vernode->cfg_type == CFG_UPDATE_TYPE_FULL) + { + snprintf(business->inc_index_path, 256, "%s/inc/index/full_config_index.%010lu", business->store_path_root, vernode->version); + snprintf(business->full_index_path, 256, "%s/full/index/full_config_index.%010lu", business->store_path_root, vernode->version); + } + else + { + snprintf(business->inc_index_path, 256, "%s/inc/index/inc_config_index.%010lu", business->store_path_root, vernode->version); + } + /*HTTP post时,多版本并发每个都有自己的临时通知文件名,复用本地文件的关闭函数*/ + sprintf(business->tmp_index_path, "%s", vernode->tmp_index_path); + business->fp_idx_file = vernode->fp_idx_file; + doris_config_file_version_finish(NULL, business); + vernode->fp_idx_file = NULL; + } + if(g_doris_server_info.consumer_port) { business->cur_vernode = vernode; @@ -1158,7 +1210,7 @@ void doris_config_post_version_finish(struct doris_business *business, struct ve void http_config_direct_version_finish(struct version_list_node *vernode, struct evhttp_request *req, int64_t set_version) { struct doris_business *business=vernode->business; - char version[32], token[64]; + char version[32], token[64], lvdbkey[40]; int64_t new_version; if(business->concurrency_allowed && evtimer_pending(&vernode->timer_expire, NULL)) @@ -1168,12 +1220,22 @@ void http_config_direct_version_finish(struct version_list_node *vernode, struct if(set_version == 0) { - new_version = business->cfgver_head->latest_version + 1; + new_version = prod_server_generate_version(business); } else { - new_version = set_version; + new_version = business->genversion_seq = set_version; } + /*更新leveldb生成版本号的种子*/ + if(!business->persistence_write_on) + { + sprintf(lvdbkey, "%s_verseq", business->bizname); + if(!doris_kvdb_update_keystr_valint(g_doris_server_info.kvdbhandle, lvdbkey, new_version)) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert] business: %s, update levelDB failed!\033[0m\n", business->bizname); + } + } + sprintf(token, "%s", vernode->token); doris_config_post_version_finish(business, vernode, new_version); @@ -1198,7 +1260,7 @@ void prod_sync_verend_result_cb(enum PROD_VEROP_RES result, int64_t version, voi case VERSIONOP_RES_ERROR: evhttp_send_error(vernode->req, 500, "version end sync error res_code"); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version end sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]business: %s, version end sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); break; case VERSIONOP_CURL_ERROR: @@ -1332,20 +1394,23 @@ void doris_config_post_version_start(struct version_list_node *cur_vernode, cons struct doris_business *business=cur_vernode->business; snprintf(cur_vernode->token, 64, "%s", token); - if(cur_vernode->cfg_type == CFG_UPDATE_TYPE_FULL) - { - snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token); - } - else - { - snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token); - } - if(NULL==(cur_vernode->fp_idx_file = fopen(cur_vernode->tmp_index_path, "w+"))) - { - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", business->bizname, cur_vernode->tmp_index_path, strerror(errno)); - assert(0); - } + if(business->persistence_write_on) + { + if(cur_vernode->cfg_type == CFG_UPDATE_TYPE_FULL) + { + snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token); + } + else + { + snprintf(cur_vernode->tmp_index_path, 256, "%s/inc/full_config_index.%s.ing", business->store_path_root, token); + } + if(NULL==(cur_vernode->fp_idx_file = fopen(cur_vernode->tmp_index_path, "w+"))) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", business->bizname, cur_vernode->tmp_index_path, strerror(errno)); + assert(0); + } + } if(g_doris_server_info.consumer_port) { TAILQ_INIT(&cur_vernode->table_head); @@ -1404,7 +1469,7 @@ void try_restore_from_busy_peer(struct version_list_node *cur_vernode, const cha cur_vernode->req = NULL; if(busy) { - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "\033[33mbusiness: %s, restore from busy peer, post master server send response version start: %s\033[0m", business->bizname, body); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "\033[33m[Warning]business: %s, restore from busy peer, post master server send response version start: %s\033[0m", business->bizname, body); } else { @@ -1436,7 +1501,7 @@ void prod_sync_verstart_result_cb(enum PROD_VERSTART_RES result, const char *bod business->cur_vernode = NULL; business->posts_on_the_way--; FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_POST_ON_THE_WAY], FS_OP_SET, business->posts_on_the_way); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, version start sync error res_code, abandon it. Send 500 response to client.\033[0m", business->bizname); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]business: %s, version start sync error res_code, abandon it. Send 500 response to client.\033[0m", business->bizname); break; case VERSTART_CURL_ERROR: @@ -1727,12 +1792,14 @@ void doris_config_post_cfgfile_start(struct version_list_node *vernode, struct e meta.cfgnum = vernode->cur_table->cfgnum; meta.size = 0; - vernode->business->type = vernode->cfg_type; - vernode->business->fp_idx_file = vernode->fp_idx_file; - doris_config_file_cfgfile_start(NULL, &meta, NULL, vernode->business); - sprintf(vernode->cur_table->localpath, "%s", vernode->business->cfg_file_path); - vernode->cur_table->fp_cfg_file = vernode->business->fp_cfg_file; - + if(vernode->business->persistence_write_on) + { + vernode->business->type = vernode->cfg_type; + vernode->business->fp_idx_file = vernode->fp_idx_file; + doris_config_file_cfgfile_start(NULL, &meta, NULL, vernode->business); + sprintf(vernode->cur_table->localpath, "%s", vernode->business->cfg_file_path); + vernode->cur_table->fp_cfg_file = vernode->business->fp_cfg_file; + } if(g_doris_server_info.consumer_port) { vernode->cur_table->table_meta = cJSON_CreateObject(); @@ -1749,7 +1816,10 @@ void doris_config_post_cfgfile_start(struct version_list_node *vernode, struct e void doris_config_post_cfgfile_finish(struct version_list_node *vernode, const char *md5str) { doris_config_common_cfgfile_finish(vernode->business); - fclose(vernode->cur_table->fp_cfg_file); + if(vernode->business->persistence_write_on) + { + fclose(vernode->cur_table->fp_cfg_file); + } assert(vernode->cur_table->filesize == 0); vernode->cur_table->filesize = vernode->cur_table->cur_totallen; @@ -1791,11 +1861,14 @@ void http_config_direct_cfgfile_update(struct version_list_node *vernode, struct } if(vernode->cur_table->fragsize > 0) { - writen_len = fwrite(vernode->cur_table->fragcontent, 1, vernode->cur_table->fragsize, vernode->cur_table->fp_cfg_file); - if(writen_len != vernode->cur_table->fragsize) + if(vernode->business->persistence_write_on) { - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", vernode->business->bizname, vernode->cur_table->localpath, strerror(errno)); - assert(0); + writen_len = fwrite(vernode->cur_table->fragcontent, 1, vernode->cur_table->fragsize, vernode->cur_table->fp_cfg_file); + if(writen_len != vernode->cur_table->fragsize) + { + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fwrite %s failed: %s", vernode->business->bizname, vernode->cur_table->localpath, strerror(errno)); + assert(0); + } } if(g_doris_server_info.consumer_port) { @@ -1843,7 +1916,7 @@ void prod_sync_upload_frag_cb(enum PROD_VEROP_RES result,void * userdata) case VERSIONOP_RES_ERROR: evhttp_send_error(vernode->req, 500, "frag sync error res_code"); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40mbusiness: %s, frag sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[1;31;40m[Alert]business: %s, frag sync error res_code, abandon it. Send 500 response to client.\033[0m", vernode->business->bizname); break; case VERSIONOP_CURL_ERROR: @@ -2050,12 +2123,19 @@ void start_business_http_post_server(struct doris_business *business) } } -void doris_config_version_sync_updated(struct doris_csum_instance *instance, void *userdata) +void doris_config_version_sync_updated(struct doris_csum_instance *instance, int64_t latest_version, void *userdata) { struct doris_business *business=(struct doris_business *)userdata; struct doris_csum_param *param; u_int32_t references, business_post_ups; + if(latest_version) + { + business->genversion_seq = latest_version; + assert(business->cfgver_head->latest_version == latest_version); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[34m[Attention] business %s, HTTP Post server generate version change to: %lu\033[0m", business->bizname, latest_version); + } + /*销毁consuemer,同时确保本函数只执行一次*/ param = doris_csum_instance_get_param(instance); doris_csum_instance_destroy(instance); @@ -2091,8 +2171,7 @@ void doris_config_version_sync_updated(struct doris_csum_instance *instance, voi MESA_Monitor_operation(g_doris_server_info.monitor, g_doris_server_info.mmid_post_server, MONITOR_VALUE_SET, PROMETHUES_POST_SERVER_UPING); } assert(business_post_ups <= g_doris_server_info.business_post_num); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[32m******Doris Producer worker for %s starts******\033[0m", business->bizname); - MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "HttpProducer, doris ready to sync for business: %s\n", business->bizname); + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[32m[Info]******Doris Producer worker for %s starts******\033[0m", business->bizname); } /*与thread_doris_client_recv_cfg差别仅在于version_updated函数*/ @@ -2106,6 +2185,7 @@ void* thread_http_post_recv_cfg(void *arg) struct doris_idxfile_scanner *scanner; enum DORIS_UPDATE_TYPE update_type; char stored_path[512]; + int64_t genversion_seq; prctl(PR_SET_NAME, "http_post"); @@ -2116,7 +2196,7 @@ void* thread_http_post_recv_cfg(void *arg) scanner = doris_index_file_scanner(0); - /*Retaive latest config to memory from Stored configs*/ + /*不管是否开启持久化,都尝试读一下本地配置;可作为设置版本起始号使用*/ doris_cbs.version_start = doris_config_localmem_version_start; doris_cbs.version_finish = doris_config_localmem_version_finish; doris_cbs.version_error = doris_config_localmem_version_error; @@ -2137,6 +2217,18 @@ void* thread_http_post_recv_cfg(void *arg) update_type = doris_index_file_traverse(scanner, stored_path, &doris_cbs, NULL, g_doris_server_info.log_runtime); }while(update_type != CFG_UPDATE_TYPE_NONE); + /*设置生成版本号起始序列*/ + business->genversion_seq = scanner->cur_version; + if(!business->persistence_write_on) + { + sprintf(stored_path, "%s_verseq", business->bizname); + if((genversion_seq = doris_kvdb_get_keystr_valint(g_doris_server_info.kvdbhandle, stored_path)) != 0) + { + business->genversion_seq = genversion_seq; + } + } + MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "\033[34m[Attention] business %s, HTTP Post server generate version from: %lu\033[0m\n", business->bizname, business->genversion_seq); + if(g_doris_server_info.cluster_sync_mode) /*Check new configs*/ { doris_cbs.version_start = doris_config_version_start; @@ -2146,10 +2238,11 @@ void* thread_http_post_recv_cfg(void *arg) doris_cbs.cfgfile_update = doris_config_cfgfile_update; doris_cbs.cfgfile_finish = doris_config_cfgfile_finish; doris_cbs.version_updated= doris_config_version_sync_updated; + doris_cbs.userdata = business; business->source_from = RECV_WAY_DRS_CLIENT; memset(&doris_args, 0, sizeof(struct doris_arguments)); - doris_args.current_version = scanner->cur_version; + doris_args.current_version = (business->persistence_write_on)?scanner->cur_version:(0-business->cache_max_versions); sprintf(doris_args.bizname, "%s", business->bizname); instance = doris_csum_instance_new(business->param_csum, client_evbase, &doris_cbs, &doris_args, g_doris_server_info.log_runtime); if(instance == NULL) diff --git a/server/doris_server_receive.h b/server/doris_server_receive.h index 70b4ea6..44a1247 100644 --- a/server/doris_server_receive.h +++ b/server/doris_server_receive.h @@ -142,7 +142,7 @@ struct version_list_handle TAILQ_HEAD(__version_list_node, version_list_node) version_head; int64_t latest_version; int32_t references; - u_int32_t version_num; + int32_t version_mem_num; map *version2node; struct version_list_node *oldest_vernode; //未进行内存淘汰的最老版本 }; diff --git a/server/doris_server_scandir.cpp b/server/doris_server_scandir.cpp index 73d8f2e..5097185 100644 --- a/server/doris_server_scandir.cpp +++ b/server/doris_server_scandir.cpp @@ -231,7 +231,7 @@ enum DORIS_UPDATE_TYPE get_new_idx_path(long long current_version, const char *f if(n < 0) { MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "scan dir error"); - return update_type; + return CFG_UPDATE_TYPE_NONE; } tmpidx_ver_array = (struct index_version_array*)calloc(sizeof(struct index_version_array), n); @@ -439,7 +439,7 @@ enum DORIS_UPDATE_TYPE doris_index_file_traverse(struct doris_idxfile_scanner *s table_num=cm_read_cfg_index_file(idx_path_array[i].path, table_array, CM_MAX_TABLE_NUM, logger); if(table_num<=0) { - MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "\033[1;31;40mAlert! Load %s failed, skip this wrong version!!!!\033[0m\n", idx_path_array[i].path); + MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "\033[1;31;40m[Alert] Load %s failed, skip this wrong version!!!!\033[0m\n", idx_path_array[i].path); update_type = CFG_UPDATE_TYPE_ERR; scanner->cur_version = idx_path_array[i].version; //错误的版本跳过 break; @@ -460,7 +460,7 @@ enum DORIS_UPDATE_TYPE doris_index_file_traverse(struct doris_idxfile_scanner *s update_type = CFG_UPDATE_TYPE_ERR; doris_cbs->version_error(NULL, doris_cbs->userdata); cJSON_Delete(meta); - MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "\033[1;31;40mAlert! Load %s failed, skip this wrong version!!!!\033[0m\n", idx_path_array[i].path); + MESA_RUNTIME_LOGV4(logger,RLOG_LV_FATAL, "\033[1;31;40m[Alert] Load %s failed, skip this wrong version!!!!\033[0m\n", idx_path_array[i].path); break; } cJSON_Delete(meta); diff --git a/support/CMakeLists.txt b/support/CMakeLists.txt index 7247c49..f6dd070 100644 --- a/support/CMakeLists.txt +++ b/support/CMakeLists.txt @@ -145,19 +145,19 @@ set_property(TARGET libMesaMonitor PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/lib set_property(TARGET libMesaMonitor PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INSTALL_DIR}/include) -#### leveldb -#ExternalProject_Add(LevelDB PREFIX leveldb -# URL ${CMAKE_CURRENT_SOURCE_DIR}/leveldb-1.22.tar.gz -# URL_MD5 ada425fbd00dc0d3d892774bf71f6692 -# CMAKE_ARGS -DCMAKE_INSTALL_PREFIX= -# -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -# -DBUILD_SHARED_AND_STATIC_LIBS=1) -# -#ExternalProject_Get_Property(LevelDB INSTALL_DIR) -#file(MAKE_DIRECTORY ${INSTALL_DIR}/include) -# -#add_library(libLevelDB STATIC IMPORTED GLOBAL) -#add_dependencies(libLevelDB LevelDB) -#set_property(TARGET libLevelDB PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/lib64/libleveldb.a) -#set_property(TARGET libLevelDB PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INSTALL_DIR}/include) +### leveldb +ExternalProject_Add(LevelDB PREFIX leveldb + URL ${CMAKE_CURRENT_SOURCE_DIR}/leveldb-1.22.tar.gz + URL_MD5 ada425fbd00dc0d3d892774bf71f6692 + CMAKE_ARGS -DCMAKE_INSTALL_PREFIX= + -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} + -DBUILD_SHARED_AND_STATIC_LIBS=1) + +ExternalProject_Get_Property(LevelDB INSTALL_DIR) +file(MAKE_DIRECTORY ${INSTALL_DIR}/include) + +add_library(libLevelDB STATIC IMPORTED GLOBAL) +add_dependencies(libLevelDB LevelDB) +set_property(TARGET libLevelDB PROPERTY IMPORTED_LOCATION ${INSTALL_DIR}/lib64/libleveldb.a) +set_property(TARGET libLevelDB PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INSTALL_DIR}/include)