This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
doris-doris-dispatch/client/doris_client_fetch.cpp

879 lines
31 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <sys/time.h>
#include <time.h>
#include <string.h>
#include <openssl/sha.h>
#include <openssl/md5.h>
#include <MESA/MESA_prof_load.h>
#include "doris_client_fetch.h"
/* Finalizes the MD5 context and writes the digest into result as a
 * 32-character lowercase hexadecimal string followed by a NUL terminator.
 *
 * c      - MD5 context to finalize.
 * result - output buffer.
 * size   - capacity of result in bytes; must be at least 33.
 *
 * Returns 0 on success, -1 when MD5_Final fails or the buffer is too small.
 * Note that the context is finalized before the size is checked. */
static int doris_md5_final_string(MD5_CTX *c, char *result, unsigned int size)
{
    static const char hexdigits[] = "0123456789abcdef";
    unsigned char digest[17] = {0};
    char *out = result;

    if(MD5_Final(digest, c) != 1)
    {
        return -1;
    }
    if(size < 33)
    {
        return -1;
    }

    for(int k = 0; k < 16; k++)
    {
        *out++ = hexdigits[digest[k] >> 4];
        *out++ = hexdigits[digest[k] & 0x0F];
    }
    *out = '\0';
    return 0;
}
/* Clears the per-request response bookkeeping (HTTP status code and the
 * Content-Length / Content-Range values) of a confile context. */
void doris_confile_ctx_reset(struct doris_confile_ctx *ctx)
{
    /* Every other member is left unchanged. */
    ctx->contl_total = 0;
    ctx->contl_end = 0;
    ctx->contl_start = 0;
    ctx->contlength = 0;
    ctx->res_code = 0;
}
/* Resets the response bookkeeping of ctx, destroys its HTTP context and
 * clears the pointer so that it is not reused afterwards. */
void doris_confile_ctx_destry(struct doris_confile_ctx *ctx)
{
    doris_confile_ctx_reset(ctx);

    doris_http_ctx_destroy(ctx->httpctx);
    ctx->httpctx = NULL;
}
/* Makes the version stored in new_version the instance's current version. */
void doris_update_new_version(struct doris_csum_instance *instance)
{
    instance->cur_version = instance->new_version;
}
/* Arms the instance's fetch timer so that it fires after wait_s seconds. */
void doris_request_restart_timer(struct doris_csum_instance *instance, time_t wait_s)
{
    struct timeval delay;

    delay.tv_usec = 0;
    delay.tv_sec = wait_s;
    event_add(&instance->timer_fetch, &delay);
}
/* Loads the description of the next config file into instance->curmeta.
 *
 * curmeta is zeroed first, then the entry at instance->array_index of
 * instance->array is read and array_index is advanced. The keys read are
 * "tablename", "filename", "size", "cfg_num", and the optional "user_region"
 * and "md5".
 *
 * A missing array entry or a missing key no longer dereferences NULL: the
 * corresponding field simply keeps its zeroed value. MD5 validation is only
 * enabled when "md5" is present and carries a string value. */
void doris_fetch_next_confile_meta(struct doris_csum_instance *instance)
{
    cJSON *cur_a_item, *sub;

    memset(&instance->curmeta, 0, sizeof(struct fetch_file_meta));
    cur_a_item = cJSON_GetArrayItem(instance->array, instance->array_index);
    instance->array_index++;
    if(cur_a_item == NULL)
    {
        /* Nothing to read; curmeta stays zeroed (validate_md5 == 0). */
        return;
    }

    if(NULL != (sub = cJSON_GetObjectItem(cur_a_item, "tablename")))
    {
        instance->curmeta.meta.tablename = sub->valuestring;
    }
    if(NULL != (sub = cJSON_GetObjectItem(cur_a_item, "filename")))
    {
        instance->curmeta.meta.filename = sub->valuestring;
    }
    if(NULL != (sub = cJSON_GetObjectItem(cur_a_item, "size")))
    {
        instance->curmeta.meta.size = sub->valuedouble;
    }
    if(NULL != (sub = cJSON_GetObjectItem(cur_a_item, "cfg_num")))
    {
        instance->curmeta.meta.cfgnum = sub->valueint;
    }
    sub = cJSON_GetObjectItem(cur_a_item, "user_region");
    instance->curmeta.meta.userregion = (sub==NULL)?NULL:sub->valuestring;

    sub = cJSON_GetObjectItem(cur_a_item, "md5");
    if(sub != NULL && sub->valuestring != NULL)
    {
        instance->curmeta.validate_md5 = 1;
        snprintf(instance->curmeta.md5str, 36, "%s", sub->valuestring);
    }
    else
    {
        instance->curmeta.validate_md5 = 0;
    }
}
/* Header callback for a config file download; called with one header line.
 *
 * start/bytes - the header line (not NUL-terminated) and its length.
 * code        - curl result code, only used for logging.
 * res_code    - HTTP status code of the response.
 * userp       - the struct doris_csum_instance that owns the request.
 *
 * On the first call of a request the status code is recorded. For 200/206
 * the retry counter is cleared and, when the file is at offset 0, the
 * cfgfile_start callback is invoked and the MD5 context is initialised.
 * Afterwards "Content-Length" and "Content-Range" headers are parsed into
 * the confile context.
 *
 * The header value is now copied with a length clamp: previously a header
 * line longer than the local buffer overflowed the stack. */
void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp)
{
    struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
    const char *pos_colon;
    size_t datalen, vallen;
    char buffer[128];
    int ret;

    if(instance->ctx.res_code == 0) //check code only once
    {
        instance->ctx.res_code = res_code;
        assert(res_code != 0);
        if(res_code != 200 && res_code!=206)
        {
            MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s failed, req_version=%lu, curlcode = %d",
                instance->curmeta.meta.tablename, instance->req_version, code);
            return;
        }
        instance->retry_times = 0;
        if(instance->curmeta.curoffset == 0)
        {
            /* First fragment of the file: announce it and start hashing. */
            instance->cbs.cfgfile_start(instance, &instance->curmeta.meta, NULL, instance->cbs.userdata);
            MD5_Init(&instance->ctx.md5ctx);
        }
    }

    if((pos_colon=(const char*)memchr(start, ':', bytes)) == NULL)
    {
        return ;
    }
    /* The header name length selects the candidate header cheaply. */
    datalen = pos_colon - start;
    switch(datalen)
    {
    case 14:
        if(!strncasecmp(start, "Content-Length:", 15))
        {
            vallen = bytes - 15;
            if(vallen >= sizeof(buffer))
            {
                vallen = sizeof(buffer) - 1;
            }
            memcpy(buffer, start+15, vallen);
            buffer[vallen] = '\0';
            instance->ctx.contlength = atol(buffer);
        }
        break;

    case 13:
        if(!strncasecmp(start, "Content-Range:", 14))
        {
            /* Copy starts at the colon; sscanf skips everything up to the first digit. */
            vallen = bytes - 13;
            if(vallen >= sizeof(buffer))
            {
                vallen = sizeof(buffer) - 1;
            }
            memcpy(buffer, start+13, vallen);
            buffer[vallen] = '\0';
            ret = sscanf(buffer, "%*[^0-9]%lu-%lu/%lu", &instance->ctx.contl_start, &instance->ctx.contl_end, &instance->ctx.contl_total);
            assert(ret == 3 && instance->ctx.contl_total == instance->curmeta.meta.size && instance->ctx.contl_start==instance->curmeta.curoffset);
            (void)ret; /* only read by the assert */
        }
        break;

    default: break;
    }
}
/* Body callback for a config file download.
 *
 * Data is ignored unless curl reports success and both the status recorded
 * from the headers and the status passed in are 200 or 206. Accepted data is
 * handed to the cfgfile_update callback, fed into the running MD5, and
 * counted in the current offset and the received-bytes statistic. */
void doris_http_confile_body_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp)
{
    struct doris_csum_instance *inst = (struct doris_csum_instance *)userp;
    bool header_status_ok = (inst->ctx.res_code == 200 || inst->ctx.res_code == 206);
    bool status_ok = (res_code == 200 || res_code == 206);

    if(code != CURLE_OK || !header_status_ok || !status_ok)
    {
        return;
    }

    inst->cbs.cfgfile_update(inst, ptr, bytes, inst->cbs.userdata);
    MD5_Update(&inst->ctx.md5ctx, ptr, bytes);
    inst->curmeta.curoffset += bytes;
    inst->statistic.field[DRS_FS_FILED_RES_BYTES] += bytes;
}
void doris_http_fetch_confile(struct doris_csum_instance *instance);
/* Transfer-done callback for a config file (fragment) download.
 *
 * res      - curl result of the transfer.
 * res_code - HTTP status code.
 * err      - curl error text, used for logging.
 * userp    - the struct doris_csum_instance that owns the request.
 *
 * On success the next fragment is requested until curoffset reaches the
 * file size; then the MD5 is finalised and optionally validated, the
 * cfgfile_finish callback runs, and either the next file is started or the
 * whole version is finished (version_finish, cur_version updated, meta JSON
 * and HTTP context released, fetch timer re-armed with 0 seconds).
 *
 * On failure the request is retried via the fetch timer; after
 * fetch_max_tries failures, on a 404 response, or on an MD5 mismatch the
 * version is abandoned (version_error) and the state returns to META.
 *
 * Fixes: the curl error log printed res_code (a long) through "%d" instead
 * of the curl result; md5buffer is now zero-initialised so it is a valid
 * string even if MD5 finalisation fails. */
void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, void *userp)
{
    struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
    char md5buffer[64] = {0};
    bool direct_fail=false;

    if(res!=CURLE_OK)
    {
        MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s failed, req_version=%lu, curlcode = %d, error: %s",
            instance->curmeta.meta.tablename, instance->req_version, (int)res, err);
        goto out_error;
    }
    if((instance->ctx.res_code != 200 && instance->ctx.res_code!=206) || (res_code!=200 && res_code!=206))
    {
        goto out_error;
    }

    if(instance->ctx.contl_total != 0)
    {
        MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s success, req_version=%lu, Content-Range: %lu-%lu/%lu",
            instance->curmeta.meta.tablename, instance->req_version, instance->ctx.contl_start, instance->ctx.contl_end, instance->ctx.contl_total);
    }
    else
    {
        MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s success, req_version=%lu, Content-Length: %lu/%lu",
            instance->curmeta.meta.tablename, instance->req_version, instance->ctx.contlength, instance->curmeta.meta.size);
    }
    instance->statistic.field[DRS_FS_FILED_RES_FRAGS] += 1;

    if(instance->curmeta.curoffset >= instance->curmeta.meta.size) // this file has been downloaded completely
    {
        doris_md5_final_string(&instance->ctx.md5ctx, md5buffer, 64);
        if(instance->curmeta.validate_md5 && strcasecmp(instance->curmeta.md5str, md5buffer))
        {
            MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s over, version=%lu, md5 validate fail, real: %s, expect: %s",
                instance->curmeta.meta.tablename, instance->req_version, md5buffer, instance->curmeta.md5str);
            direct_fail=true;goto out_md5;
        }
        else
        {
            MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s.%010lu over, md5: %s",
                instance->curmeta.meta.tablename, instance->req_version, md5buffer);
        }
        instance->statistic.field[DRS_FS_FILED_RES_FILES] += 1;
        instance->cbs.cfgfile_finish(instance, md5buffer, instance->cbs.userdata);
        if(instance->array_index == instance->array_size)
        {
            /* Last file of this version: finish the version and go back to polling meta. */
            instance->cbs.version_finish(instance, instance->cbs.userdata);
            instance->status = FETCH_STATUS_META;
            doris_update_new_version(instance);
            cJSON_Delete(instance->meta);
            doris_confile_ctx_destry(&instance->ctx);
            doris_request_restart_timer(instance, 0);
        }
        else
        {
            doris_fetch_next_confile_meta(instance);
            doris_http_fetch_confile(instance);
        }
    }
    else
    {
        /* More fragments of the same file remain. */
        doris_http_fetch_confile(instance);
    }
    return;

out_error:
    if(instance->ctx.res_code == 404) // a 404 response restarts the whole request
    {
        direct_fail = true;
    }
    else
    {
        instance->retry_times++;
    }
out_md5:
    instance->statistic.field[DRS_FS_FILED_RES_FRAGERR] += 1;
    if(instance->retry_times >= instance->param->fetch_max_tries || direct_fail)
    {
        instance->statistic.field[DRS_FS_FILED_RES_VERERR] += 1;
        instance->cbs.version_error(instance, instance->cbs.userdata);
        instance->retry_times = 0;
        instance->status = FETCH_STATUS_META;
        cJSON_Delete(instance->meta);
        doris_confile_ctx_destry(&instance->ctx);
    }
    doris_request_restart_timer(instance, instance->param->retry_interval);
}
void doris_http_fetch_confile(struct doris_csum_instance *instance)
{
struct doris_http_callback curlcbs;
char metauri[128], range[64]={0};
curlcbs.header_cb = doris_http_confile_header_cb;
curlcbs.write_cb = doris_http_confile_body_cb;
curlcbs.transfer_done_cb = doris_http_confile_done_cb;
curlcbs.userp = instance;
doris_confile_ctx_reset(&instance->ctx);
doris_http_ctx_reset(instance->ctx.httpctx, &curlcbs);
//³¬´óÎļþ·Ö¶ÎÏÂÔØ£»ÉÏ´ÎδÍê³ÉµÄ¼ÌÐøÏÂÔØ
if((instance->curmeta.meta.size > instance->param->fetch_frag_size) || instance->curmeta.curoffset!=0)
{
sprintf(range, "Range: bytes=%lu-%lu", instance->curmeta.curoffset, instance->curmeta.curoffset + instance->param->fetch_frag_size - 1);
doris_http_ctx_add_header(instance->ctx.httpctx, range);
}
snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&business=%s", instance->curmeta.meta.tablename, instance->req_version, instance->args.bizname);
if(doris_http_launch_get_request(instance->ctx.httpctx, metauri))
{
instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
doris_request_restart_timer(instance, instance->param->retry_interval);
}
else
{
instance->statistic.field[DRS_FS_FILED_REQ_FILES] += 1;
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Launch confile %s GET, req_version=%lu, %s",
instance->curmeta.meta.tablename, instance->req_version, range);
}
}
/* Header callback for a meta request.
 * Records the HTTP status code the first time it is called for a request and
 * logs at debug level when that status is not 200. Later calls are no-ops. */
void doris_http_meta_header_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp)
{
    struct doris_csum_instance *inst = (struct doris_csum_instance *)userp;

    /* the status code is examined only once per request */
    if(inst->ctx.res_code == 0)
    {
        inst->ctx.res_code = res_code;
        assert(res_code != 0);
        if(res_code != 200)
        {
            MESA_HANDLE_RUNTIME_LOGV2(inst->runtime_log, RLOG_LV_DEBUG, "business: %s, No new meta found, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d",
                inst->args.bizname, inst->ctx.server, inst->cur_version, inst->req_version, code);
        }
    }
}
/* Body callback for a meta request: appends the received bytes to the
 * instance's string buffer, but only for a successful transfer with
 * status 200. */
void doris_http_meta_body_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp)
{
    struct doris_csum_instance *inst = (struct doris_csum_instance *)userp;

    if(code == CURLE_OK && res_code == 200)
    {
        easy_string_savedata(&inst->estr, (const char*)ptr, bytes);
    }
}
/* Transfer-done callback for a meta request.
 *
 * res      - curl result of the transfer.
 * res_code - HTTP status code.
 * err      - curl error text, used for logging.
 * userp    - the struct doris_csum_instance that owns the request.
 *
 * Cancels the expiry timer, then on a 200 response parses the accumulated
 * body as JSON. When it announces a version newer than cur_version, the
 * version_start callback is invoked, the "configs" array is remembered and
 * the first config file download is started (status FILE).
 *
 * On any failure the fetch timer is re-armed with the retry interval and the
 * string buffer and HTTP context are released. A 304 status additionally
 * triggers the optional version_updated callback.
 *
 * Fixes: the curl error log printed res_code (a long) through "%d" instead
 * of the curl result; a JSON document without a "version" member no longer
 * dereferences NULL and is treated as a parse failure. */
void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void *userp)
{
    struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
    cJSON *sub;
    int64_t new_version;

    evtimer_del(&instance->ctx.timer_expires);
    if(res!=CURLE_OK)
    {
        MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Request meta failed, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s",
            instance->args.bizname, instance->ctx.server, instance->cur_version, instance->req_version, (int)res, err);
        goto out_error;
    }
    if(instance->ctx.res_code != 200 || res_code!=200)
    {
        goto out_error;
    }

    instance->meta = cJSON_Parse(instance->estr.buff);
    if(instance->meta == NULL)
    {
        MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Parse meta failed, server: %s, req_version=%lu, invalid json: %s",
            instance->args.bizname, instance->ctx.server, instance->req_version, instance->estr.buff);
        goto out_error;
    }

    sub = cJSON_GetObjectItem(instance->meta, "version");
    if(sub == NULL)
    {
        MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Parse meta failed, server: %s, req_version=%lu, no version in json: %s",
            instance->args.bizname, instance->ctx.server, instance->req_version, instance->estr.buff);
        cJSON_Delete(instance->meta);
        instance->meta = NULL;
        goto out_error;
    }
    new_version = sub->valuedouble;
    if(new_version <= instance->cur_version)
    {
        MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, An older version received, abandon it. server: %s, cur_version=%lu, invalid json: %s",
            instance->args.bizname, instance->ctx.server, instance->cur_version, instance->estr.buff);
        cJSON_Delete(instance->meta);
        instance->meta = NULL;
        goto out_error;
    }

    instance->new_version = new_version;
    instance->req_version = instance->new_version;
    instance->statistic.field[DRS_FS_FILED_RES_META] += 1;
    MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "business: %s, NEW_META found, server: %s, cur_version=%lu, newjson: %s",
        instance->args.bizname, instance->ctx.server, instance->cur_version, instance->estr.buff);

    instance->cbs.version_start(instance, instance->meta, instance->cbs.userdata);
    instance->array = cJSON_GetObjectItem(instance->meta, "configs");
    instance->array_size = cJSON_GetArraySize(instance->array);
    assert(instance->array_size > 0);
    easy_string_destroy(&instance->estr);

    /* Switch to file mode; the HTTP context is reused for the downloads. */
    instance->status = FETCH_STATUS_FILE;
    doris_fetch_next_confile_meta(instance);
    doris_http_fetch_confile(instance);
    return;

out_error:
    instance->statistic.field[DRS_FS_FILED_RES_NOMETA] += 1;
    doris_request_restart_timer(instance, instance->param->retry_interval);
    easy_string_destroy(&instance->estr);
    doris_confile_ctx_destry(&instance->ctx);
    if(res_code==304 && instance->cbs.version_updated!=NULL) // the version is already fully synchronized
    {
        instance->cbs.version_updated(instance, instance->cur_version, instance->cbs.userdata);
    }
    if(res_code==300 && instance->param->client_sync_on) // the server has a version upload in progress
    {
        MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Server is busy processing version requests, waiting it done...", instance->args.bizname);
    }
}
static void doris_http_fetch_meta(struct doris_csum_instance *instance)
{
u_int64_t balance_seed;
struct doris_http_callback curlcbs;
char metauri[128], cur_version[128];
struct timeval tv={10, 0};
balance_seed = (((u_int64_t)rand()&0xFFFF) << 48) | (((u_int64_t)rand()&0xFFFF) << 32) |
(((u_int64_t)rand()&0xFFFF) << 16) | ((u_int64_t)rand()&0xFFFF);
memset(&curlcbs, 0, sizeof(struct doris_http_callback));
curlcbs.header_cb = doris_http_meta_header_cb;
curlcbs.write_cb = doris_http_meta_body_cb;
curlcbs.transfer_done_cb = doris_http_meta_done_cb;
curlcbs.userp = instance;
instance->array_index = 0;
instance->cur_httpins = instance->httpins_master;
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
if(instance->ctx.httpctx==NULL && instance->httpins_backup1!=NULL)
{
instance->cur_httpins = instance->httpins_backup1;
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
}
if(instance->ctx.httpctx==NULL && instance->httpins_backup2!=NULL)
{
instance->cur_httpins = instance->httpins_backup2;
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
}
instance->req_version = instance->cur_version + 1; //Ö»Óа汾¸üгɹ¦ºó£¬cur_version²Å»á¸üÐÂ
if(instance->ctx.httpctx != NULL)
{
if(instance->param->client_sync_on)
{
//½öÏÞÓÚÖ÷´Óͬ²½Ïû·Ñ£¬ÓÃÓÚ±ê¼ÇÊÇ´ÓClientÇëÇó
sprintf(cur_version, "X-Doris-Sync-Current-Version: %lu", instance->cur_version);
doris_http_ctx_add_header(instance->ctx.httpctx, cur_version);
}
snprintf(metauri, 128, "configmeta?version=%lu&business=%s", instance->req_version, instance->args.bizname);
if(!doris_http_launch_get_request(instance->ctx.httpctx, metauri))
{
instance->status = FETCH_STATUS_META;
instance->statistic.field[DRS_FS_FILED_REQ_META] += 1;
evtimer_add(&instance->ctx.timer_expires, &tv);
}
else
{
instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
doris_confile_ctx_destry(&instance->ctx);
doris_request_restart_timer(instance, instance->param->retry_interval);
}
if(instance->cur_httpins == instance->httpins_backup1) instance->statistic.field[DRS_FS_FILED_BACKUP1_REQ] += 1;
else if(instance->cur_httpins == instance->httpins_backup2) instance->statistic.field[DRS_FS_FILED_BACKUP2_REQ] += 1;
}
else
{
instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
doris_request_restart_timer(instance, instance->param->retry_interval);
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch meta GET failed: no active host found,req_version=%lu", instance->req_version);
}
}
/* Fetch timer callback: continues with the step that matches the instance's
 * status. IDLE and META request the meta data, FILE requests the current
 * config file; any other status trips an assert. */
static void instance_fetch_cfg_timer_cb(int fd, short kind, void *userp)
{
    struct doris_csum_instance *inst = (struct doris_csum_instance *)userp;

    if(inst->status == FETCH_STATUS_IDLE || inst->status == FETCH_STATUS_META)
    {
        doris_http_fetch_meta(inst);
    }
    else if(inst->status == FETCH_STATUS_FILE)
    {
        doris_http_fetch_confile(inst);
    }
    else
    {
        assert(0);
    }
}
/* In https mode, when running under valgrind, it was observed that after a
 * GET request is issued the done_cb function is never called.
 *
 * Expiry timer callback for a meta request: drops the current HTTP context,
 * launches a fresh meta request and logs a warning. */
static void instance_meta_expire_timer_cb(int fd, short kind, void *userp)
{
    struct doris_csum_instance *inst = (struct doris_csum_instance *)userp;

    doris_confile_ctx_destry(&inst->ctx);
    doris_http_fetch_meta(inst);

    MESA_HANDLE_RUNTIME_LOGV2(inst->runtime_log, RLOG_LV_FATAL, "\033[33m[Warning]business: %s, launch meta-get wired expired, retry....\033[0m", inst->args.bizname);
}
/* Header callback for the "latest version" HEAD request; called with one
 * header line.
 *
 * start/bytes - the header line (not NUL-terminated) and its length.
 * code        - curl result code, only used for logging.
 * res_code    - HTTP status code of the response.
 * userp       - the struct doris_csum_instance that owns the request.
 *
 * Parses the "X-Latest-Version" header into instance->head_version and, on
 * the first header line that contains a colon, records the status code and
 * logs when it is not 200.
 *
 * The header value is now copied with a length clamp: previously a header
 * line longer than the local buffer overflowed the stack. */
void doris_http_head_version_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp)
{
    struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
    const char *pos_colon;
    char buffer[64];
    size_t vallen;
    int datalen;

    if((pos_colon=(const char*)memchr(start, ':', bytes)) == NULL)
    {
        return ;
    }
    datalen = pos_colon - start;
    switch(datalen)
    {
    case 16:
        if(!strncasecmp(start, "X-Latest-Version:", 17))
        {
            vallen = bytes - 17;
            if(vallen >= sizeof(buffer))
            {
                vallen = sizeof(buffer) - 1;
            }
            memcpy(buffer, start+17, vallen);
            buffer[vallen] = '\0';
            instance->head_version = atol(buffer);
        }
        break;

    default: break;
    }

    //check code only once
    if(instance->ctx.res_code != 0)
    {
        return;
    }
    instance->ctx.res_code = res_code;
    assert(res_code != 0);
    if(res_code != 200)
    {
        MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, head version failed, server: %s, curlcode = %d",
            instance->args.bizname, instance->ctx.server, code);
    }
}
/* Transfer-done callback for the "latest version" HEAD request.
 *
 * res      - curl result of the transfer.
 * res_code - HTTP status code.
 * err      - curl error text, used for logging.
 * userp    - the struct doris_csum_instance that owns the request.
 *
 * On success cur_version becomes head_version + cur_version (clamped to 0
 * when negative), the fetch and expiry timers are re-bound to the meta
 * handlers and the first meta request is launched. On failure the fetch
 * timer is re-armed with the retry interval and the context is released.
 *
 * Fixes: the curl error log printed res_code (a long) through "%d" instead
 * of the curl result; the success path now releases the HEAD request's
 * context before doris_http_fetch_meta() creates a new one, so the old HTTP
 * context is not overwritten and the recorded status code is cleared for
 * the meta request. */
void doris_http_head_version_done_cb(CURLcode res, long res_code, const char *err, void *userp)
{
    struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;

    evtimer_del(&instance->ctx.timer_expires);
    if(res != CURLE_OK)
    {
        MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Head version failed, server: %s, curlcode = %d, error: %s",
            instance->args.bizname, instance->ctx.server, (int)res, err);
        goto out_herror;
    }
    if(instance->ctx.res_code != 200 || res_code!=200)
    {
        goto out_herror;
    }

    instance->cur_version = instance->head_version + instance->cur_version;
    if(instance->cur_version < 0)
    {
        instance->cur_version = 0;
    }
    instance->req_version = instance->cur_version + 1; //TODO

    /* From now on the timers drive the regular meta/file fetch cycle. */
    evtimer_assign(&instance->timer_fetch, instance->worker_evbase, instance_fetch_cfg_timer_cb, instance);
    evtimer_assign(&instance->ctx.timer_expires, instance->worker_evbase, instance_meta_expire_timer_cb, instance);

    /* Same destroy-then-fetch sequence as the expiry timer callbacks. */
    doris_confile_ctx_destry(&instance->ctx);
    doris_http_fetch_meta(instance);
    MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Head version succ, server: %s, next request meta of version: %ld",
        instance->args.bizname, instance->ctx.server, instance->req_version);
    return;

out_herror:
    instance->statistic.field[DRS_FS_FILED_RES_NOMETA] += 1;
    doris_request_restart_timer(instance, instance->param->retry_interval);
    doris_confile_ctx_destry(&instance->ctx);
}
static void doris_http_head_version(struct doris_csum_instance *instance)
{
u_int64_t balance_seed;
struct doris_http_callback curlcbs;
char metauri[128];
struct timeval tv={10, 0};
balance_seed = (((u_int64_t)rand()&0xFFFF) << 48) | (((u_int64_t)rand()&0xFFFF) << 32) |
(((u_int64_t)rand()&0xFFFF) << 16) | ((u_int64_t)rand()&0xFFFF);
memset(&curlcbs, 0, sizeof(struct doris_http_callback));
curlcbs.header_cb = doris_http_head_version_header_cb;
curlcbs.write_cb = NULL;
curlcbs.transfer_done_cb = doris_http_head_version_done_cb;
curlcbs.userp = instance;
instance->array_index = 0;
instance->cur_httpins = instance->httpins_master;
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
if(instance->ctx.httpctx==NULL && instance->httpins_backup1!=NULL)
{
instance->cur_httpins = instance->httpins_backup1;
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
}
if(instance->ctx.httpctx==NULL && instance->httpins_backup2!=NULL)
{
instance->cur_httpins = instance->httpins_backup2;
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64);
}
if(instance->ctx.httpctx != NULL)
{
snprintf(metauri, 128, "latestversion?business=%s", instance->args.bizname);
if(!doris_http_launch_head_request(instance->ctx.httpctx, metauri))
{
instance->status = FETCH_STATUS_META;
instance->statistic.field[DRS_FS_FILED_REQ_META] += 1;
evtimer_add(&instance->ctx.timer_expires, &tv);
}
else
{
instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
doris_confile_ctx_destry(&instance->ctx);
doris_request_restart_timer(instance, instance->param->retry_interval);
}
if(instance->cur_httpins == instance->httpins_backup1) instance->statistic.field[DRS_FS_FILED_BACKUP1_REQ] += 1;
else if(instance->cur_httpins == instance->httpins_backup2) instance->statistic.field[DRS_FS_FILED_BACKUP2_REQ] += 1;
}
else
{
instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
doris_request_restart_timer(instance, instance->param->retry_interval);
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch version HEAD failed: no active host found");
}
}
/* Fetch timer callback used while the latest version is still unknown:
 * launches the HEAD request for the instance passed in userp. */
static void instance_head_version_timer_cb(int fd, short kind, void *userp)
{
    struct doris_csum_instance *inst = (struct doris_csum_instance *)userp;

    doris_http_head_version(inst);
}
/* In https mode, when running under valgrind, it was observed that after a
 * GET request is issued the done_cb function is never called.
 *
 * Expiry timer callback for the HEAD request: drops the current HTTP
 * context, launches a fresh HEAD request and logs a warning. */
static void instance_version_expire_timer_cb(int fd, short kind, void *userp)
{
    struct doris_csum_instance *inst = (struct doris_csum_instance *)userp;

    doris_confile_ctx_destry(&inst->ctx);
    doris_http_head_version(inst);

    MESA_HANDLE_RUNTIME_LOGV2(inst->runtime_log, RLOG_LV_FATAL, "\033[33m[Warning]business: %s, launch version-head wired expired, retry....\033[0m", inst->args.bizname);
}
/* Periodic statistics output callback.
 * Publishes the connected/failed host counts of the master and of each
 * configured backup server, triggers a passive output of the statistics
 * handle and re-arms itself with the configured period. */
static void doris_client_fs_output_timer_cb(int fd, short kind, void *userp)
{
    struct doris_csum_param *prm=(struct doris_csum_param *)userp;
    struct timeval next;

    FS_operate(prm->fsstat_handle, prm->fsstat_status[DRS_FS_STAT_MST_CNN_SRV], 0, FS_OP_SET, prm->param_master->connected_hosts);
    FS_operate(prm->fsstat_handle, prm->fsstat_status[DRS_FS_STAT_MST_FAIL_SRV], 0, FS_OP_SET, prm->param_master->failed_hosts);

    if(prm->param_backup1 != NULL)
    {
        FS_operate(prm->fsstat_handle, prm->fsstat_status[DRS_FS_STAT_BCK1_CNN_SRV], 0, FS_OP_SET, prm->param_backup1->connected_hosts);
        FS_operate(prm->fsstat_handle, prm->fsstat_status[DRS_FS_STAT_BCK1_FAIL_SRV], 0, FS_OP_SET, prm->param_backup1->failed_hosts);
    }

    if(prm->param_backup2 != NULL)
    {
        FS_operate(prm->fsstat_handle, prm->fsstat_status[DRS_FS_STAT_BCK2_CNN_SRV], 0, FS_OP_SET, prm->param_backup2->connected_hosts);
        FS_operate(prm->fsstat_handle, prm->fsstat_status[DRS_FS_STAT_BCK2_FAIL_SRV], 0, FS_OP_SET, prm->param_backup2->failed_hosts);
    }

    FS_passive_output(prm->fsstat_handle);

    next.tv_usec = 0;
    next.tv_sec = prm->fsstat_period;
    evtimer_add(&prm->fs_timer_output, &next);
}
static int doris_client_register_field_stat(struct doris_csum_param *param, void *runtime_log, struct event_base *evbase)
{
const char *field_names[FSSTAT_DORIS_FILED_NUM]={"ReqFail", "ReqMetas", "ResMetas", "ResNoNew", "ReqFiles",
"ResFiles", "ResFrags", "ResFragErr", "ResBytes", "ResVerErr", "ReqBackup1", "ReqBackup2"};
const char *status_names[FSSTAT_DORIS_STATUS_NUM]={"MasSrvCned", "MasSrvFail",
"Bck1SrvCned", "Bck1SrvFail", "Bck2SrvCned", "Bck2SrvFail", "MemoryUsed", "HttpSession"};
struct timeval tv;
int value;
param->fsstat_handle = FS_create_handle();
FS_set_para(param->fsstat_handle, OUTPUT_DEVICE, param->fsstat_filepath, strlen(param->fsstat_filepath)+1);
if(param->fsstat_print_mode == 1)
{
FS_set_para(param->fsstat_handle, PRINT_MODE, &param->fsstat_print_mode, sizeof(param->fsstat_print_mode));
}
else
{
FS_set_para(param->fsstat_handle, PRINT_MODE, &param->fsstat_print_mode, sizeof(param->fsstat_print_mode));
value = 1;
FS_set_para(param->fsstat_handle, FLUSH_BY_DATE, &value, sizeof(value));
}
value = param->fsstat_period;
FS_set_para(param->fsstat_handle, STAT_CYCLE, &value, sizeof(value));
value = 0;
FS_set_para(param->fsstat_handle, CREATE_THREAD, &value, sizeof(value));
FS_set_para(param->fsstat_handle, APP_NAME, param->fsstat_appname, strlen(param->fsstat_appname)+1);
FS_set_para(param->fsstat_handle, STATS_SERVER_IP, param->fsstat_dst_ip, strlen(param->fsstat_dst_ip)+1);
FS_set_para(param->fsstat_handle, STATS_SERVER_PORT, &param->fsstat_dst_port, sizeof(param->fsstat_dst_port));
for(int i=0; i<FSSTAT_DORIS_FILED_NUM; i++)
{
param->fsstat_field[i] = FS_register(param->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]);
}
for(int i=0; i<FSSTAT_DORIS_STATUS_NUM; i++)
{
param->fsstat_status[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]);
}
FS_start(param->fsstat_handle);
evtimer_assign(&param->fs_timer_output, evbase, doris_client_fs_output_timer_cb, param);
tv.tv_sec = param->fsstat_period;
tv.tv_usec = 0;
evtimer_add(&param->fs_timer_output, &tv);
return 0;
}
/* Returns the reference counter of param, read while holding its mutex. */
u_int32_t doris_csum_param_get_refernces(struct doris_csum_param *param)
{
    u_int32_t snapshot;

    pthread_mutex_lock(&param->mutex_lock);
    snapshot = param->references;
    pthread_mutex_unlock(&param->mutex_lock);

    return snapshot;
}
/* Releases a parameter block created by doris_csum_parameter_new():
 * cancels the statistics output timer, stops the statistics handle,
 * destroys the master and any backup HTTP parameters, destroys the mutex
 * and frees the block.
 *
 * Fix: the mutex initialised in doris_csum_parameter_new() is now destroyed
 * before the memory holding it is freed.
 * NOTE(review): the reference counter is not consulted here; presumably the
 * caller checks it first — confirm. */
void doris_csum_parameter_destroy(struct doris_csum_param *param)
{
    evtimer_del(&param->fs_timer_output);
    FS_stop(&param->fsstat_handle);

    doris_http_parameter_destroy(param->param_master);
    if(param->param_backup1 != NULL)
    {
        doris_http_parameter_destroy(param->param_backup1);
    }
    if(param->param_backup2 != NULL)
    {
        doris_http_parameter_destroy(param->param_backup2);
    }

    pthread_mutex_destroy(&param->mutex_lock);
    free(param);
}
/* Creates the shared client parameter block from the [DORIS_CLIENT] section
 * of confile.
 *
 * confile       - path of the profile to load.
 * manage_evbase - event base used for the HTTP parameters and the
 *                 statistics output timer.
 * runtimelog    - runtime log handle passed on to the helpers.
 *
 * Returns the new block, or NULL when memory cannot be allocated, the
 * master server parameters cannot be created or statistics registration
 * fails. The backup servers are optional.
 *
 * Fixes: the allocation result is checked, and the block (plus the master
 * parameters where already created) is released on the failure paths
 * instead of being leaked. */
struct doris_csum_param *doris_csum_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog)
{
    struct doris_csum_param *param;

    param = (struct doris_csum_param *)calloc(1, sizeof(struct doris_csum_param));
    if(param == NULL)
    {
        return NULL;
    }
    param->manage_evbase = manage_evbase;

    MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fail_retry_interval", &param->retry_interval, 10);
    MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fragmet_size", &param->fetch_frag_size, 5242880);
    MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_confile_max_tries", &param->fetch_max_tries, 3);
    MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "master_slave_sync_on", &param->client_sync_on, 0);

    MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_appname", param->fsstat_appname, 16, "DrsCsmClient");
    MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_filepath", param->fsstat_filepath, 256, "./log/doris_client_csm.fs");
    MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fsstat_log_interval", &param->fsstat_period, 10);
    MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_print_mode", &param->fsstat_print_mode, 1);
    MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_dst_ip", param->fsstat_dst_ip, 64, "127.0.0.1");
    MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_dst_port", &param->fsstat_dst_port, 8125);

    param->param_master = doris_http_parameter_new(confile, "DORIS_CLIENT.master_server", manage_evbase, runtimelog);
    if(param->param_master == NULL)
    {
        free(param);
        return NULL;
    }
    if(doris_client_register_field_stat(param, runtimelog, manage_evbase))
    {
        doris_http_parameter_destroy(param->param_master);
        free(param);
        return NULL;
    }

    param->param_backup1 = doris_http_parameter_new(confile, "DORIS_CLIENT.backup1_server", manage_evbase, runtimelog);
    param->param_backup2 = doris_http_parameter_new(confile, "DORIS_CLIENT.backup2_server", manage_evbase, runtimelog);
    pthread_mutex_init(&param->mutex_lock, NULL);
    return param;
}
static void doris_instance_statistic_timer_cb(int fd, short kind, void *userp)
{
struct doris_csum_instance *instance = (struct doris_csum_instance *)userp;
struct timeval tv;
struct doris_csum_statistics incr_statistic;
long long *plast_statistic = (long long*)&instance->statistic_last;
long long *pnow_statistic = (long long*)&instance->statistic;
long long *pinc_statistic = (long long*)&incr_statistic;
long long http_sessions=0;
http_sessions += caculate_http_sessions_sum(instance->httpins_master);
http_sessions += caculate_http_sessions_sum(instance->httpins_backup1);
http_sessions += caculate_http_sessions_sum(instance->httpins_backup2);
instance->statistic.status[DRS_FS_STAT_HTTP_SESSIONS] = http_sessions;
for(u_int32_t i=0; i<sizeof(struct doris_csum_statistics)/sizeof(long long); i++)
{
pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i];
}
instance->statistic_last = instance->statistic;
for(u_int32_t i=0; i<FSSTAT_DORIS_FILED_NUM; i++)
{
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field[i], 0, FS_OP_ADD, incr_statistic.field[i]);
}
for(u_int32_t i=0; i<FSSTAT_DORIS_STATUS_NUM; i++)
{
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_status[i], 0, FS_OP_ADD, incr_statistic.status[i]);
}
tv.tv_sec = instance->param->fsstat_period;
tv.tv_usec = 0;
event_add(&instance->timer_statistic, &tv);
}
/* Returns the parameter block the instance was created with. */
struct doris_csum_param *doris_csum_instance_get_param(struct doris_csum_instance *instance)
{
    struct doris_csum_param *owner = instance->param;

    return owner;
}
/* Destroys an instance created by doris_csum_instance_new(): drops its
 * reference on the shared parameter block, cancels its timers and frees it.
 *
 * Fix: the request expiry timer (ctx.timer_expires) is now cancelled as
 * well. It is armed whenever a meta/HEAD request is in flight, and leaving
 * it pending would let it fire with a pointer to the freed instance.
 *
 * NOTE(review): the HTTP instances and any HTTP context still in flight are
 * not released here; the destroy calls below were already commented out in
 * the original — confirm who owns them. */
void doris_csum_instance_destroy(struct doris_csum_instance *instance)
{
    pthread_mutex_lock(&instance->param->mutex_lock);
    instance->param->references--;
    pthread_mutex_unlock(&instance->param->mutex_lock);

    evtimer_del(&instance->timer_fetch);
    evtimer_del(&instance->ctx.timer_expires);
    evtimer_del(&instance->timer_statistic);
    /*doris_http_instance_destroy(instance->httpins_master);
    if(instance->httpins_backup1 != NULL)
    {
    doris_http_instance_destroy(instance->httpins_backup1);
    }
    if(instance->httpins_backup2 != NULL)
    {
    doris_http_instance_destroy(instance->httpins_backup2);
    }*/
    free(instance);
}
/* Creates a consumer instance bound to worker_evbase.
 *
 * param         - shared parameter block; its reference counter is
 *                 incremented on success.
 * worker_evbase - event base for the instance's timers and HTTP instances.
 * cbs           - callbacks, copied into the instance.
 * args          - arguments, copied into the instance; current_version
 *                 seeds cur_version.
 * runtimelog    - runtime log handle.
 *
 * When cur_version is >= 0 the instance starts with the meta fetch cycle;
 * otherwise it first issues a HEAD request for the latest version. In both
 * cases the first request is scheduled 3 seconds after creation, and the
 * statistics timer is armed with the configured period.
 *
 * Returns the new instance, or NULL when memory cannot be allocated or the
 * master HTTP instance cannot be created.
 *
 * Fixes: the allocation result is checked, and the instance is freed when
 * the master HTTP instance cannot be created instead of being leaked. */
struct doris_csum_instance *doris_csum_instance_new(struct doris_csum_param *param, struct event_base *worker_evbase,
    struct doris_callbacks *cbs, struct doris_arguments *args, void *runtimelog)
{
    struct doris_csum_instance *instance;
    struct timeval tv;

    instance = (struct doris_csum_instance *)calloc(1, sizeof(struct doris_csum_instance));
    if(instance == NULL)
    {
        return NULL;
    }
    instance->param = param;
    instance->worker_evbase = worker_evbase;
    instance->runtime_log = runtimelog;
    instance->cbs = *cbs;
    instance->args= *args;
    instance->cur_version = args->current_version;
    instance->req_version = instance->cur_version + 1; //TODO

    instance->httpins_master = doris_http_instance_new(param->param_master, worker_evbase, runtimelog);
    if(instance->httpins_master == NULL)
    {
        free(instance);
        return NULL;
    }
    srand((int64_t)param);
    if(param->param_backup1 != NULL)
    {
        instance->httpins_backup1 = doris_http_instance_new(param->param_backup1, worker_evbase, runtimelog);
    }
    if(param->param_backup2 != NULL)
    {
        instance->httpins_backup2 = doris_http_instance_new(param->param_backup2, worker_evbase, runtimelog);
    }

    pthread_mutex_lock(&param->mutex_lock);
    param->references++;
    pthread_mutex_unlock(&param->mutex_lock);

    if(instance->cur_version >= 0)
    {
        /* Version known: go straight to the meta/file fetch cycle. */
        evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_meta_expire_timer_cb, instance);
        evtimer_assign(&instance->timer_fetch, worker_evbase, instance_fetch_cfg_timer_cb, instance);
    }
    else
    {
        /* Negative version: ask the server for its latest version first. */
        evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_version_expire_timer_cb, instance);
        evtimer_assign(&instance->timer_fetch, worker_evbase, instance_head_version_timer_cb, instance);
    }
    tv.tv_sec = 3;
    tv.tv_usec = 0;
    evtimer_add(&instance->timer_fetch, &tv);

    evtimer_assign(&instance->timer_statistic, worker_evbase, doris_instance_statistic_timer_cb, instance);
    tv.tv_sec = param->fsstat_period;
    tv.tv_usec = 0;
    evtimer_add(&instance->timer_statistic, &tv);
    return instance;
}