支持文件接口用户自定义信息;
This commit is contained in:
@@ -60,9 +60,12 @@ void easy_string_savedata(struct easy_string *estr, const char *data, size_t len
|
|||||||
|
|
||||||
void doris_confile_ctx_reset(struct doris_confile_ctx *ctx)
|
void doris_confile_ctx_reset(struct doris_confile_ctx *ctx)
|
||||||
{
|
{
|
||||||
struct doris_http_ctx *httpctx=ctx->httpctx;
|
//<2F><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ֲ<EFBFBD><D6B2><EFBFBD>
|
||||||
memset(ctx, 0, sizeof(struct doris_confile_ctx));
|
ctx->res_code = 0;
|
||||||
ctx->httpctx = httpctx;
|
ctx->contlength = 0;
|
||||||
|
ctx->contl_start = 0;
|
||||||
|
ctx->contl_end = 0;
|
||||||
|
ctx->contl_total = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void doris_confile_ctx_destry(struct doris_confile_ctx *ctx)
|
void doris_confile_ctx_destry(struct doris_confile_ctx *ctx)
|
||||||
@@ -104,6 +107,9 @@ void doris_fetch_next_confile_meta(struct doris_instance *instance)
|
|||||||
sub = cJSON_GetObjectItem(cur_a_item, "cfg_num");
|
sub = cJSON_GetObjectItem(cur_a_item, "cfg_num");
|
||||||
instance->curmeta.cfg_num = sub->valueint;
|
instance->curmeta.cfg_num = sub->valueint;
|
||||||
|
|
||||||
|
sub = cJSON_GetObjectItem(cur_a_item, "user_region");
|
||||||
|
instance->curmeta.user_region = (sub==NULL)?NULL:sub->valuestring;
|
||||||
|
|
||||||
if(NULL != (sub = cJSON_GetObjectItem(cur_a_item, "md5")))
|
if(NULL != (sub = cJSON_GetObjectItem(cur_a_item, "md5")))
|
||||||
{
|
{
|
||||||
instance->curmeta.validate_md5 = 1;
|
instance->curmeta.validate_md5 = 1;
|
||||||
@@ -137,8 +143,8 @@ void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code
|
|||||||
instance->retry_times = 0;
|
instance->retry_times = 0;
|
||||||
if(instance->curmeta.curoffset == 0)
|
if(instance->curmeta.curoffset == 0)
|
||||||
{
|
{
|
||||||
instance->cbs.cfgfile_start(instance, instance->curmeta.table_name,
|
instance->cbs.cfgfile_start(instance, instance->curmeta.table_name, instance->curmeta.size,
|
||||||
instance->curmeta.size, instance->curmeta.cfg_num, instance->cbs.userdata);
|
instance->curmeta.cfg_num, instance->curmeta.user_region, instance->cbs.userdata);
|
||||||
MD5_Init(&instance->ctx.md5ctx);
|
MD5_Init(&instance->ctx.md5ctx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -305,7 +311,7 @@ void doris_http_fetch_confile(struct doris_instance *instance)
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
instance->statistic.field[DRS_FS_FILED_REQ_FILES] += 1;
|
instance->statistic.field[DRS_FS_FILED_REQ_FILES] += 1;
|
||||||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Launch confile %s GET, req_version=%lu, %s",
|
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Launch confile %s GET, req_version=%lu, %s",
|
||||||
instance->curmeta.table_name, instance->req_version, range);
|
instance->curmeta.table_name, instance->req_version, range);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -324,8 +330,8 @@ void doris_http_meta_header_cb(const char *ptr, size_t bytes, CURLcode code, lon
|
|||||||
|
|
||||||
if(res_code != 200)
|
if(res_code != 200)
|
||||||
{
|
{
|
||||||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "No new meta found, cur_version=%lu, req_version=%lu, curlcode = %d",
|
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "No new meta found, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d",
|
||||||
instance->cur_version, instance->req_version, code);
|
instance->ctx.server, instance->cur_version, instance->req_version, code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -348,8 +354,8 @@ void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void
|
|||||||
|
|
||||||
if(res!=CURLE_OK)
|
if(res!=CURLE_OK)
|
||||||
{
|
{
|
||||||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Request meta failed, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s",
|
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Request meta failed, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s",
|
||||||
instance->cur_version, instance->req_version, res_code, err);
|
instance->ctx.server, instance->cur_version, instance->req_version, res_code, err);
|
||||||
goto out_error;
|
goto out_error;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -361,15 +367,16 @@ void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void
|
|||||||
instance->meta = cJSON_Parse(instance->estr.buff);
|
instance->meta = cJSON_Parse(instance->estr.buff);
|
||||||
if(instance->meta == NULL)
|
if(instance->meta == NULL)
|
||||||
{
|
{
|
||||||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, req_version=%lu, invalid json: %s", instance->req_version, instance->estr.buff);
|
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, server: %s, req_version=%lu, invalid json: %s",
|
||||||
|
instance->ctx.server, instance->req_version, instance->estr.buff);
|
||||||
goto out_error;
|
goto out_error;
|
||||||
}
|
}
|
||||||
sub = cJSON_GetObjectItem(instance->meta, "version");
|
sub = cJSON_GetObjectItem(instance->meta, "version");
|
||||||
instance->new_version = sub->valuedouble;
|
instance->new_version = sub->valuedouble;
|
||||||
instance->req_version = instance->new_version;
|
instance->req_version = instance->new_version;
|
||||||
instance->statistic.field[DRS_FS_FILED_RES_META] += 1;
|
instance->statistic.field[DRS_FS_FILED_RES_META] += 1;
|
||||||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, cur_version=%lu, newjson: %s",
|
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, server: %s, cur_version=%lu, newjson: %s",
|
||||||
instance->cur_version, instance->estr.buff);
|
instance->ctx.server, instance->cur_version, instance->estr.buff);
|
||||||
|
|
||||||
instance->cbs.version_start(instance, instance->meta, instance->cbs.userdata);
|
instance->cbs.version_start(instance, instance->meta, instance->cbs.userdata);
|
||||||
instance->array = cJSON_GetObjectItem(instance->meta, "configs");
|
instance->array = cJSON_GetObjectItem(instance->meta, "configs");
|
||||||
@@ -395,7 +402,8 @@ static void doris_http_fetch_meta(struct doris_instance *instance)
|
|||||||
struct doris_http_callback curlcbs;
|
struct doris_http_callback curlcbs;
|
||||||
char metauri[128];
|
char metauri[128];
|
||||||
|
|
||||||
balance_seed = (((u_int64_t)rand()) << 32) | rand();
|
balance_seed = (((u_int64_t)rand()&0xFFFF) << 48) | (((u_int64_t)rand()&0xFFFF) << 32) |
|
||||||
|
(((u_int64_t)rand()&0xFFFF) << 16) | ((u_int64_t)rand()&0xFFFF);
|
||||||
|
|
||||||
memset(&curlcbs, 0, sizeof(struct doris_http_callback));
|
memset(&curlcbs, 0, sizeof(struct doris_http_callback));
|
||||||
curlcbs.header_cb = doris_http_meta_header_cb;
|
curlcbs.header_cb = doris_http_meta_header_cb;
|
||||||
@@ -405,16 +413,16 @@ static void doris_http_fetch_meta(struct doris_instance *instance)
|
|||||||
|
|
||||||
instance->array_index = 0;
|
instance->array_index = 0;
|
||||||
instance->cur_httpins = instance->httpins_master;
|
instance->cur_httpins = instance->httpins_master;
|
||||||
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed);
|
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
|
||||||
if(instance->ctx.httpctx==NULL && instance->httpins_backup1!=NULL)
|
if(instance->ctx.httpctx==NULL && instance->httpins_backup1!=NULL)
|
||||||
{
|
{
|
||||||
instance->cur_httpins = instance->httpins_backup1;
|
instance->cur_httpins = instance->httpins_backup1;
|
||||||
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed);
|
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
|
||||||
}
|
}
|
||||||
if(instance->ctx.httpctx==NULL && instance->httpins_backup2!=NULL)
|
if(instance->ctx.httpctx==NULL && instance->httpins_backup2!=NULL)
|
||||||
{
|
{
|
||||||
instance->cur_httpins = instance->httpins_backup2;
|
instance->cur_httpins = instance->httpins_backup2;
|
||||||
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed);
|
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
|
||||||
}
|
}
|
||||||
|
|
||||||
instance->req_version = instance->cur_version + 1; //ֻ<>а汾<D0B0><E6B1BE><EFBFBD>³ɹ<C2B3><C9B9><EFBFBD><EFBFBD><EFBFBD>cur_version<6F>Ż<EFBFBD><C5BB><EFBFBD><EFBFBD><EFBFBD>
|
instance->req_version = instance->cur_version + 1; //ֻ<>а汾<D0B0><E6B1BE><EFBFBD>³ɹ<C2B3><C9B9><EFBFBD><EFBFBD><EFBFBD>cur_version<6F>Ż<EFBFBD><C5BB><EFBFBD><EFBFBD><EFBFBD>
|
||||||
@@ -616,7 +624,7 @@ struct doris_instance *doris_instance_new(struct doris_parameter *param, struct
|
|||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
srand(time(NULL));
|
srand((int64_t)param);
|
||||||
|
|
||||||
if(param->param_backup1 != NULL)
|
if(param->param_backup1 != NULL)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ struct fetch_file_meta
|
|||||||
u_int32_t cfg_num;
|
u_int32_t cfg_num;
|
||||||
u_int32_t validate_md5;
|
u_int32_t validate_md5;
|
||||||
char md5str[36];
|
char md5str[36];
|
||||||
|
const char *user_region;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct md5_long
|
struct md5_long
|
||||||
@@ -68,6 +69,7 @@ union doris_md5
|
|||||||
struct doris_confile_ctx
|
struct doris_confile_ctx
|
||||||
{
|
{
|
||||||
struct doris_http_ctx *httpctx;
|
struct doris_http_ctx *httpctx;
|
||||||
|
char server[64];
|
||||||
|
|
||||||
MD5_CTX md5ctx;
|
MD5_CTX md5ctx;
|
||||||
long res_code;
|
long res_code;
|
||||||
|
|||||||
@@ -49,7 +49,8 @@ void doris_http_ctx_destroy(struct doris_http_ctx *ctx)
|
|||||||
free(ctx);
|
free(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct doris_http_ctx *doris_http_ctx_new(struct doris_http_instance *instance, struct doris_http_callback *cb, u_int64_t balance_seed)
|
struct doris_http_ctx *doris_http_ctx_new(struct doris_http_instance *instance,
|
||||||
|
struct doris_http_callback *cb, u_int64_t balance_seed, char *host, int32_t size)
|
||||||
{
|
{
|
||||||
struct doris_http_ctx *ctx;
|
struct doris_http_ctx *ctx;
|
||||||
struct doris_curl_multihd *multidata;
|
struct doris_curl_multihd *multidata;
|
||||||
@@ -61,6 +62,7 @@ struct doris_http_ctx *doris_http_ctx_new(struct doris_http_instance *instance,
|
|||||||
}
|
}
|
||||||
assert(instance->server_hosts->find(result.bucket_id) != instance->server_hosts->end());
|
assert(instance->server_hosts->find(result.bucket_id) != instance->server_hosts->end());
|
||||||
multidata = instance->server_hosts->find(result.bucket_id)->second;
|
multidata = instance->server_hosts->find(result.bucket_id)->second;
|
||||||
|
snprintf(host, size, multidata->host->srvaddr);
|
||||||
|
|
||||||
ctx = (struct doris_http_ctx *)calloc(1, sizeof(struct doris_http_ctx));
|
ctx = (struct doris_http_ctx *)calloc(1, sizeof(struct doris_http_ctx));
|
||||||
ctx->instance = instance;
|
ctx->instance = instance;
|
||||||
|
|||||||
@@ -44,7 +44,7 @@ struct doris_curl_multihd
|
|||||||
};
|
};
|
||||||
|
|
||||||
struct doris_curl_multihd *doris_initialize_multihd_for_host(struct doris_http_instance *instance, struct dst_host_cnn_balance *host);
|
struct doris_curl_multihd *doris_initialize_multihd_for_host(struct doris_http_instance *instance, struct dst_host_cnn_balance *host);
|
||||||
struct doris_http_ctx *doris_http_ctx_new(struct doris_http_instance *instance, struct doris_http_callback *cb, u_int64_t balance_seed);
|
struct doris_http_ctx *doris_http_ctx_new(struct doris_http_instance *instance, struct doris_http_callback *cb, u_int64_t balance_seed, char *host, int32_t size);
|
||||||
void doris_http_ctx_destroy(struct doris_http_ctx *ctx);
|
void doris_http_ctx_destroy(struct doris_http_ctx *ctx);
|
||||||
void doris_http_ctx_reset(struct doris_http_ctx *ctx, struct doris_http_callback *cb);
|
void doris_http_ctx_reset(struct doris_http_ctx *ctx, struct doris_http_callback *cb);
|
||||||
|
|
||||||
|
|||||||
@@ -145,7 +145,7 @@ static u_int64_t bucket_gen_uniq_point(struct ch_bucket_inner *inner_bucket, u_i
|
|||||||
u_int32_t hash, i=0;
|
u_int32_t hash, i=0;
|
||||||
|
|
||||||
seed = (((u_int64_t)cur_point_index)<<32) | inner_bucket->bucket.bucket_id;
|
seed = (((u_int64_t)cur_point_index)<<32) | inner_bucket->bucket.bucket_id;
|
||||||
hash = murmurhash2(&seed, sizeof(u_int64_t), 23068673);
|
hash = murmurhash2(&seed, sizeof(u_int64_t), 515880193);
|
||||||
x = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id;
|
x = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id;
|
||||||
|
|
||||||
while(i != cur_point_index)
|
while(i != cur_point_index)
|
||||||
@@ -155,7 +155,7 @@ static u_int64_t bucket_gen_uniq_point(struct ch_bucket_inner *inner_bucket, u_i
|
|||||||
if(x == inner_bucket->point_array[i].point_val) //<2F><>ͻ
|
if(x == inner_bucket->point_array[i].point_val) //<2F><>ͻ
|
||||||
{
|
{
|
||||||
seed = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id;
|
seed = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id;
|
||||||
hash = murmurhash2(&seed, sizeof(u_int64_t), 23068673);
|
hash = murmurhash2(&seed, sizeof(u_int64_t), 515880193);
|
||||||
x = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id;
|
x = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id;
|
||||||
i = 0;
|
i = 0;
|
||||||
break;
|
break;
|
||||||
@@ -434,7 +434,7 @@ enum CONHASH_ERRCODE conhash_lookup_bucket(struct consistent_hash *ch, const voi
|
|||||||
return CONHASH_NO_VALID_BUCKETS;
|
return CONHASH_NO_VALID_BUCKETS;
|
||||||
}
|
}
|
||||||
|
|
||||||
hash = MurmurHash64A(key, len, 23068673);
|
hash = MurmurHash64A(key, len, 515880193);
|
||||||
idx = search_up_bound(hash, ch->point_array, ch->point_num, sizeof(struct ch_point), offsetof(struct ch_point, point_val));
|
idx = search_up_bound(hash, ch->point_array, ch->point_num, sizeof(struct ch_point), offsetof(struct ch_point, point_val));
|
||||||
ch->point_array[idx].hit_cnt++;
|
ch->point_array[idx].hit_cnt++;
|
||||||
bucket_index = ch->point_array[idx].bucket_index;
|
bucket_index = ch->point_array[idx].bucket_index;
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ struct doris_callbacks
|
|||||||
{
|
{
|
||||||
void *userdata;
|
void *userdata;
|
||||||
void (*version_start)(struct doris_instance *instance, cJSON *meta, void *userdata); //meta<74><61><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>汾<EFBFBD><E6B1BE><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ڶ<EFBFBD><DAB6><EFBFBD>Ч
|
void (*version_start)(struct doris_instance *instance, cJSON *meta, void *userdata); //meta<74><61><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>汾<EFBFBD><E6B1BE><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ڶ<EFBFBD><DAB6><EFBFBD>Ч
|
||||||
void (*cfgfile_start)(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata);
|
void (*cfgfile_start)(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, const char *userregion, void *userdata);
|
||||||
void (*cfgfile_update)(struct doris_instance *instance, const char *data, size_t len, void *userdata);
|
void (*cfgfile_update)(struct doris_instance *instance, const char *data, size_t len, void *userdata);
|
||||||
void (*cfgfile_finish)(struct doris_instance *instance, const char *md5, void *userdata);
|
void (*cfgfile_finish)(struct doris_instance *instance, const char *md5, void *userdata);
|
||||||
void (*version_error)(struct doris_instance *instance, void *userdata); //<2F><><EFBFBD><EFBFBD><EFBFBD>ļ<EFBFBD>ʧ<EFBFBD>ܣ<EFBFBD><DCA3>ð汾<C3B0><E6B1BE>Ҫ<EFBFBD>ع<EFBFBD>
|
void (*version_error)(struct doris_instance *instance, void *userdata); //<2F><><EFBFBD><EFBFBD><EFBFBD>ļ<EFBFBD>ʧ<EFBFBD>ܣ<EFBFBD><DCA3>ð汾<C3B0><E6B1BE>Ҫ<EFBFBD>ع<EFBFBD>
|
||||||
|
|||||||
@@ -17,7 +17,7 @@
|
|||||||
#include "doris_server_http.h"
|
#include "doris_server_http.h"
|
||||||
|
|
||||||
struct doris_global_info g_doris_server_info;
|
struct doris_global_info g_doris_server_info;
|
||||||
static unsigned long doris_vesion_20210722=20210722L;
|
static unsigned long doris_vesion_20210726=20210726L;
|
||||||
|
|
||||||
int doris_mkdir_according_path(const char * path)
|
int doris_mkdir_according_path(const char * path)
|
||||||
{
|
{
|
||||||
@@ -101,6 +101,7 @@ int32_t doris_read_profile_configs(const char *config_file)
|
|||||||
}
|
}
|
||||||
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "cache_file_frag_size", &g_doris_server_info.cache_frag_size, 67108864);
|
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "cache_file_frag_size", &g_doris_server_info.cache_frag_size, 67108864);
|
||||||
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "doris_server_role_on", &g_doris_server_info.server_role_sw, 1);
|
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "doris_server_role_on", &g_doris_server_info.server_role_sw, 1);
|
||||||
|
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "index_file_format_maat", &g_doris_server_info.idx_file_maat, 0);
|
||||||
|
|
||||||
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "scan_index_file_interval", &g_doris_server_info.scan_idx_interval, 10);
|
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "scan_index_file_interval", &g_doris_server_info.scan_idx_interval, 10);
|
||||||
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "https_connection_on", &g_doris_server_info.ssl_conn_on, 0);
|
MESA_load_profile_uint_def(config_file, "DORIS_SERVER", "https_connection_on", &g_doris_server_info.ssl_conn_on, 0);
|
||||||
@@ -326,7 +327,7 @@ int main(int argc, char **argv)
|
|||||||
evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL);
|
evhttp_set_cb(manager_http, "/doris/statistic/status", manager_statistic_status_requests_cb, NULL);
|
||||||
evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL);
|
evhttp_set_cb(manager_http, "/doris/statistic/threads", manager_statistic_threads_requests_cb, NULL);
|
||||||
evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL);
|
evhttp_set_gencb(manager_http, manager_generic_requests_cb, NULL);
|
||||||
g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210722);
|
g_doris_server_info.monitor = MESA_Monitor_instance_evhttp_new(manager_http, doris_vesion_20210726);
|
||||||
if(evhttp_accept_socket(manager_http, g_doris_server_info.manager))
|
if(evhttp_accept_socket(manager_http, g_doris_server_info.manager))
|
||||||
{
|
{
|
||||||
printf("evhttp_accept_socket %d error!\n", g_doris_server_info.manager);
|
printf("evhttp_accept_socket %d error!\n", g_doris_server_info.manager);
|
||||||
|
|||||||
@@ -81,6 +81,7 @@ struct doris_global_info
|
|||||||
|
|
||||||
struct doris_business business[MAX_BUSINESS_NUM];
|
struct doris_business business[MAX_BUSINESS_NUM];
|
||||||
u_int32_t business_num;
|
u_int32_t business_num;
|
||||||
|
u_int32_t idx_file_maat;
|
||||||
map<string, struct doris_business*> *name2business;
|
map<string, struct doris_business*> *name2business;
|
||||||
map<string, struct doris_parameter *> *confile2param;
|
map<string, struct doris_parameter *> *confile2param;
|
||||||
|
|
||||||
|
|||||||
@@ -181,7 +181,8 @@ void doris_config_file_version_error(struct doris_instance *instance, void *user
|
|||||||
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", save->business->bizname, save->version);
|
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, Version %llu error, rolling back...", save->business->bizname, save->version);
|
||||||
}
|
}
|
||||||
|
|
||||||
void doris_config_file_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata)
|
void doris_config_file_cfgfile_start(struct doris_instance *instance, const char *tablename,
|
||||||
|
size_t size, u_int32_t cfgnum, const char *userregion, void *userdata)
|
||||||
{
|
{
|
||||||
struct confile_save *save=(struct confile_save *)userdata;
|
struct confile_save *save=(struct confile_save *)userdata;
|
||||||
struct tm *localtm, savetime;
|
struct tm *localtm, savetime;
|
||||||
@@ -198,8 +199,14 @@ void doris_config_file_cfgfile_start(struct doris_instance *instance, const char
|
|||||||
doris_mkdir_according_path(dir);
|
doris_mkdir_according_path(dir);
|
||||||
}
|
}
|
||||||
snprintf(save->cfg_file_path, 256, "%s/%s.%010lu", dir, tablename, save->version);
|
snprintf(save->cfg_file_path, 256, "%s/%s.%010lu", dir, tablename, save->version);
|
||||||
fprintf(save->fp_idx_file, "%s\t%u\t%s\n", tablename, cfgnum, save->cfg_file_path);
|
if(g_doris_server_info.idx_file_maat) //MAAT<41><54>ʽ<EFBFBD><CABD>֪ͨ<CDA8>ļ<EFBFBD>
|
||||||
|
{
|
||||||
|
fprintf(save->fp_idx_file, "%s\t%u\t%s\n", tablename, cfgnum, save->cfg_file_path);
|
||||||
|
}
|
||||||
|
else //ת<><D7AA><EFBFBD><EFBFBD>ɫ<EFBFBD><C9AB><EFBFBD><EFBFBD><EFBFBD>û<EFBFBD><C3BB>Զ<EFBFBD><D4B6><EFBFBD><EFBFBD><EFBFBD>Ϣ
|
||||||
|
{
|
||||||
|
fprintf(save->fp_idx_file, "%s\t%u\t%s\t%s\n", tablename, cfgnum, save->cfg_file_path, userregion);
|
||||||
|
}
|
||||||
if(NULL == (save->fp_cfg_file = fopen(save->cfg_file_path, "w+")))
|
if(NULL == (save->fp_cfg_file = fopen(save->cfg_file_path, "w+")))
|
||||||
{
|
{
|
||||||
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", save->business->bizname, save->cfg_file_path, strerror(errno));
|
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "business: %s, fopen %s failed: %s", save->business->bizname, save->cfg_file_path, strerror(errno));
|
||||||
@@ -299,7 +306,8 @@ void doris_config_mem_version_error(struct doris_instance *instance, void *userd
|
|||||||
save->cur_vernode = NULL;
|
save->cur_vernode = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void doris_config_mem_cfgfile_start(struct doris_instance *instance, const char *tablename, size_t size, u_int32_t cfgnum, void *userdata)
|
void doris_config_mem_cfgfile_start(struct doris_instance *instance, const char *tablename,
|
||||||
|
size_t size, u_int32_t cfgnum, const char *userregion, void *userdata)
|
||||||
{
|
{
|
||||||
struct confile_save *save=(struct confile_save *)userdata;
|
struct confile_save *save=(struct confile_save *)userdata;
|
||||||
|
|
||||||
@@ -307,7 +315,10 @@ void doris_config_mem_cfgfile_start(struct doris_instance *instance, const char
|
|||||||
cJSON_AddStringToObject(save->cur_vernode->table_meta, "tablename", tablename);
|
cJSON_AddStringToObject(save->cur_vernode->table_meta, "tablename", tablename);
|
||||||
cJSON_AddNumberToObject(save->cur_vernode->table_meta, "cfg_num", cfgnum);
|
cJSON_AddNumberToObject(save->cur_vernode->table_meta, "cfg_num", cfgnum);
|
||||||
cJSON_AddNumberToObject(save->cur_vernode->table_meta, "size", size);
|
cJSON_AddNumberToObject(save->cur_vernode->table_meta, "size", size);
|
||||||
|
if(userregion != NULL)
|
||||||
|
{
|
||||||
|
cJSON_AddStringToObject(save->cur_vernode->table_meta, "user_region", userregion);
|
||||||
|
}
|
||||||
save->cur_table = (struct table_list_node *)calloc(1, sizeof(struct table_list_node));
|
save->cur_table = (struct table_list_node *)calloc(1, sizeof(struct table_list_node));
|
||||||
snprintf(save->cur_table->tablename, 64, "%s", tablename);
|
snprintf(save->cur_table->tablename, 64, "%s", tablename);
|
||||||
save->cur_table->filesize = size;
|
save->cur_table->filesize = size;
|
||||||
@@ -465,12 +476,13 @@ void doris_config_localmem_version_error(struct doris_instance *instance, void *
|
|||||||
doris_config_mem_version_error(instance, userdata);
|
doris_config_mem_version_error(instance, userdata);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void doris_config_localmem_cfgfile_start(struct doris_instance *instance, const char *tablename,
|
||||||
size_t size, u_int32_t cfgnum, const char *userregion, void *userdata)
|
size_t size, u_int32_t cfgnum, const char *userregion, void *userdata)
|
||||||
{
|
{
|
||||||
doris_config_common_cfgfile_start((struct confile_save *)userdata, cfgnum);
|
doris_config_common_cfgfile_start((struct confile_save *)userdata, cfgnum);
|
||||||
if(g_doris_server_info.server_role_sw)
|
if(g_doris_server_info.server_role_sw)
|
||||||
{
|
{
|
||||||
doris_config_mem_cfgfile_start(instance, tablename, size, cfgnum, userregion, userdata);
|
doris_config_mem_cfgfile_start(instance, tablename, size, cfgnum, userregion, userdata);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -537,18 +549,19 @@ void doris_config_version_error(struct doris_instance *instance, void *userdata)
|
|||||||
doris_config_mem_version_error(instance, userdata);
|
doris_config_mem_version_error(instance, userdata);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void doris_config_cfgfile_start(struct doris_instance *instance, const char *tablename,
|
||||||
size_t size, u_int32_t cfgnum, const char *userregion, void *userdata)
|
size_t size, u_int32_t cfgnum, const char *userregion, void *userdata)
|
||||||
{
|
{
|
||||||
struct confile_save *save=(struct confile_save *)userdata;
|
struct confile_save *save=(struct confile_save *)userdata;
|
||||||
|
|
||||||
doris_config_common_cfgfile_start((struct confile_save *)userdata, cfgnum);
|
doris_config_common_cfgfile_start((struct confile_save *)userdata, cfgnum);
|
||||||
if(save->business->write_file_sw)
|
if(save->business->write_file_sw)
|
||||||
{
|
{
|
||||||
doris_config_file_cfgfile_start(instance, tablename, size, cfgnum, userregion, userdata);
|
doris_config_file_cfgfile_start(instance, tablename, size, cfgnum, userregion, userdata);
|
||||||
}
|
}
|
||||||
if(g_doris_server_info.server_role_sw)
|
if(g_doris_server_info.server_role_sw)
|
||||||
{
|
{
|
||||||
doris_config_mem_cfgfile_start(instance, tablename, size, cfgnum, userregion, userdata);
|
doris_config_mem_cfgfile_start(instance, tablename, size, cfgnum, userregion, userdata);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -201,7 +201,7 @@ int cm_read_cfg_index_file(const char* path, struct cfg_table_info* idx/*OUT*/,
|
|||||||
{
|
{
|
||||||
memset(line, 0, sizeof(line));
|
memset(line, 0, sizeof(line));
|
||||||
fgets(line, sizeof(line), fp);
|
fgets(line, sizeof(line), fp);
|
||||||
ret=sscanf(line,"%s\t%d\t%s\t%s", idx[i].table_name, &(idx[i].cfg_num), idx[i].cfg_path, idx[i].encryp_algorithm);
|
ret=sscanf(line,"%[^ \t]%*[ \t]%d%*[ \t]%s%*[ \t]%s", idx[i].table_name, &(idx[i].cfg_num), idx[i].cfg_path, idx[i].user_region);
|
||||||
if((ret!=3 && ret!=4) || idx[i].cfg_num==0)//jump over empty line
|
if((ret!=3 && ret!=4) || idx[i].cfg_num==0)//jump over empty line
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
@@ -231,7 +231,7 @@ bool doris_read_table_file(struct doris_idxfile_scanner *scanner, struct cfg_tab
|
|||||||
FILE *fp;
|
FILE *fp;
|
||||||
size_t readlen, remainlen, oncesize;
|
size_t readlen, remainlen, oncesize;
|
||||||
MD5_CTX md5ctx;
|
MD5_CTX md5ctx;
|
||||||
char md5buffer[64];
|
char md5buffer[64], *user_region=NULL;
|
||||||
|
|
||||||
if((fp = fopen(table->cfg_path, "r")) == NULL)
|
if((fp = fopen(table->cfg_path, "r")) == NULL)
|
||||||
{
|
{
|
||||||
@@ -240,7 +240,11 @@ bool doris_read_table_file(struct doris_idxfile_scanner *scanner, struct cfg_tab
|
|||||||
}
|
}
|
||||||
MD5_Init(&md5ctx);
|
MD5_Init(&md5ctx);
|
||||||
|
|
||||||
doris_cbs->cfgfile_start(NULL, table->table_name, table->filesize, table->cfg_num, doris_cbs->userdata);
|
if(table->user_region[0] != '\0')
|
||||||
|
{
|
||||||
|
user_region = table->user_region;
|
||||||
|
}
|
||||||
|
doris_cbs->cfgfile_start(NULL, table->table_name, table->filesize, table->cfg_num, user_region, doris_cbs->userdata);
|
||||||
remainlen = table->filesize;
|
remainlen = table->filesize;
|
||||||
while(remainlen > 0)
|
while(remainlen > 0)
|
||||||
{
|
{
|
||||||
@@ -273,6 +277,10 @@ cJSON *doris_index_version_start(int64_t version, struct cfg_table_info *table_a
|
|||||||
cJSON_AddStringToObject(tmp, "tablename", table_array[i].table_name);
|
cJSON_AddStringToObject(tmp, "tablename", table_array[i].table_name);
|
||||||
cJSON_AddNumberToObject(tmp, "size", table_array[i].filesize);
|
cJSON_AddNumberToObject(tmp, "size", table_array[i].filesize);
|
||||||
cJSON_AddNumberToObject(tmp, "cfg_num", table_array[i].cfg_num);
|
cJSON_AddNumberToObject(tmp, "cfg_num", table_array[i].cfg_num);
|
||||||
|
if(table_array->user_region[0] != '\0')
|
||||||
|
{
|
||||||
|
cJSON_AddStringToObject(tmp, "user_region", table_array[i].user_region);
|
||||||
|
}
|
||||||
cJSON_AddItemToArray(array, tmp);
|
cJSON_AddItemToArray(array, tmp);
|
||||||
}
|
}
|
||||||
cJSON_AddItemToObject(meta, "configs", array);
|
cJSON_AddItemToObject(meta, "configs", array);
|
||||||
|
|||||||
@@ -6,6 +6,7 @@
|
|||||||
#define CM_MAX_TABLE_NUM 256
|
#define CM_MAX_TABLE_NUM 256
|
||||||
#define MAX_CONFIG_FN_LEN 256
|
#define MAX_CONFIG_FN_LEN 256
|
||||||
#define MAX_CONFIG_LINE (1024*16)
|
#define MAX_CONFIG_LINE (1024*16)
|
||||||
|
#define MAX_USERREGION_LEN 4096
|
||||||
|
|
||||||
#define ONCE_BUF_SIZE 1048576
|
#define ONCE_BUF_SIZE 1048576
|
||||||
|
|
||||||
@@ -35,7 +36,7 @@ struct cfg_table_info
|
|||||||
char cfg_path[MAX_CONFIG_FN_LEN];
|
char cfg_path[MAX_CONFIG_FN_LEN];
|
||||||
int cfg_num;
|
int cfg_num;
|
||||||
size_t filesize;
|
size_t filesize;
|
||||||
char encryp_algorithm[MAX_CONFIG_FN_LEN];
|
char user_region[MAX_USERREGION_LEN];
|
||||||
};
|
};
|
||||||
|
|
||||||
struct doris_idxfile_scanner
|
struct doris_idxfile_scanner
|
||||||
|
|||||||
Reference in New Issue
Block a user