642 lines
22 KiB
C++
642 lines
22 KiB
C++
#include <sys/types.h>
|
||
#include <sys/stat.h>
|
||
#include <unistd.h>
|
||
#include <stdio.h>
|
||
#include <stdlib.h>
|
||
#include <assert.h>
|
||
#include <errno.h>
|
||
#include <sys/time.h>
|
||
#include <time.h>
|
||
#include <string.h>
|
||
#include <openssl/sha.h>
|
||
#include <openssl/md5.h>
|
||
|
||
#include <MESA/MESA_prof_load.h>
|
||
|
||
#include "doris_client_fetch.h"
|
||
|
||
/*
 * Finalize an MD5 context and write the digest as a lowercase hex string.
 *
 * c      : MD5 context to finalize.
 * result : output buffer receiving 32 hex characters plus a terminating NUL.
 * size   : capacity of `result` in bytes; must be at least 33.
 *
 * Returns 0 on success, -1 if the buffer is missing/too small or MD5_Final fails.
 */
static int doris_md5_final_string(MD5_CTX *c, char *result, unsigned int size)
{
	unsigned char md5[MD5_DIGEST_LENGTH]={0};
	int i;

	/* Validate the destination first so a bad buffer does not consume the context. */
	if(result == NULL || size < 2*MD5_DIGEST_LENGTH+1)
	{
		return -1;
	}
	if(MD5_Final(md5, c) != 1)
	{
		return -1;
	}

	for(i=0; i<MD5_DIGEST_LENGTH; i++)
	{
		/* Two hex digits plus the NUL written by snprintf. */
		snprintf(result + i*2, 3, "%02x", md5[i]);
	}
	result[2*MD5_DIGEST_LENGTH] = '\0';
	return 0;
}
|
||
|
||
void easy_string_destroy(struct easy_string *estr)
|
||
{
|
||
if(estr->buff != NULL)
|
||
{
|
||
free(estr->buff);
|
||
estr->buff = NULL;
|
||
estr->len = estr->size = 0;
|
||
}
|
||
}
|
||
|
||
void easy_string_savedata(struct easy_string *estr, const char *data, size_t len)
|
||
{
|
||
if(estr->size-estr->len < len+1)
|
||
{
|
||
estr->size += len*4+1;
|
||
estr->buff = (char*)realloc(estr->buff, estr->size);
|
||
}
|
||
|
||
memcpy(estr->buff+estr->len, data, len);
|
||
estr->len += len;
|
||
estr->buff[estr->len]='\0';
|
||
}
|
||
|
||
void doris_confile_ctx_reset(struct doris_confile_ctx *ctx)
|
||
{
|
||
struct doris_http_ctx *httpctx=ctx->httpctx;
|
||
memset(ctx, 0, sizeof(struct doris_confile_ctx));
|
||
ctx->httpctx = httpctx;
|
||
}
|
||
|
||
void doris_confile_ctx_destry(struct doris_confile_ctx *ctx)
|
||
{
|
||
doris_confile_ctx_reset(ctx);
|
||
doris_http_ctx_destroy(ctx->httpctx);
|
||
ctx->httpctx = NULL;
|
||
}
|
||
|
||
void doris_update_new_version(struct doris_instance *instance)
|
||
{
|
||
instance->cur_version = instance->new_version;
|
||
}
|
||
|
||
void doris_request_restart_timer(struct doris_instance *instance, time_t wait_s)
|
||
{
|
||
struct timeval tv;
|
||
|
||
tv.tv_sec = wait_s;
|
||
tv.tv_usec = 0;
|
||
event_add(&instance->timer_fetch, &tv);
|
||
}
|
||
|
||
void doris_fetch_next_confile_meta(struct doris_instance *instance)
|
||
{
|
||
cJSON *cur_a_item, *sub;
|
||
|
||
memset(&instance->curmeta, 0, sizeof(struct fetch_file_meta));
|
||
|
||
cur_a_item = cJSON_GetArrayItem(instance->array, instance->array_index);
|
||
instance->array_index++;
|
||
|
||
sub = cJSON_GetObjectItem(cur_a_item, "tablename");
|
||
instance->curmeta.table_name = sub->valuestring;
|
||
|
||
sub = cJSON_GetObjectItem(cur_a_item, "size");
|
||
instance->curmeta.size = sub->valuedouble;
|
||
|
||
sub = cJSON_GetObjectItem(cur_a_item, "cfg_num");
|
||
instance->curmeta.cfg_num = sub->valueint;
|
||
|
||
if(NULL != (sub = cJSON_GetObjectItem(cur_a_item, "md5")))
|
||
{
|
||
instance->curmeta.validate_md5 = 1;
|
||
snprintf(instance->curmeta.md5str, 36, "%s", sub->valuestring);
|
||
}
|
||
else
|
||
{
|
||
instance->curmeta.validate_md5 = 0;
|
||
}
|
||
}
|
||
|
||
void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp)
|
||
{
|
||
struct doris_instance *instance = (struct doris_instance *)userp;
|
||
const char *pos_colon;
|
||
size_t datalen;
|
||
char buffer[64];
|
||
int ret;
|
||
|
||
if(instance->ctx.res_code == 0) //check code only once
|
||
{
|
||
instance->ctx.res_code = res_code;
|
||
assert(res_code != 0);
|
||
|
||
if(res_code != 200 && res_code!=206)
|
||
{
|
||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s failed, req_version=%lu, curlcode = %d",
|
||
instance->curmeta.table_name, instance->req_version, code);
|
||
return;
|
||
}
|
||
instance->retry_times = 0;
|
||
if(instance->curmeta.curoffset == 0)
|
||
{
|
||
instance->param->cbs.cfgfile_start(instance, instance->curmeta.table_name,
|
||
instance->curmeta.size, instance->curmeta.cfg_num, instance->param->cbs.userdata);
|
||
MD5_Init(&instance->ctx.md5ctx);
|
||
}
|
||
}
|
||
|
||
if((pos_colon=(const char*)memchr(start, ':', bytes)) == NULL)
|
||
{
|
||
return ;
|
||
}
|
||
datalen = pos_colon - start;
|
||
switch(datalen)
|
||
{
|
||
case 14:
|
||
if(!strncasecmp(start, "Content-Length:", 15))
|
||
{
|
||
memcpy(buffer, start+15, bytes-15);
|
||
buffer[bytes-15] = '\0';
|
||
instance->ctx.contlength = atol(buffer);
|
||
}
|
||
break;
|
||
case 13:
|
||
if(!strncasecmp(start, "Content-Range:", 14))
|
||
{
|
||
memcpy(buffer, start+13, bytes-13);
|
||
buffer[bytes-13] = '\0';
|
||
ret = sscanf(buffer, "%*[^0-9]%lu-%lu/%lu", &instance->ctx.contl_start, &instance->ctx.contl_end, &instance->ctx.contl_total);
|
||
assert(ret == 3 && instance->ctx.contl_total == instance->curmeta.size && instance->ctx.contl_start==instance->curmeta.curoffset);
|
||
}
|
||
break;
|
||
default: break;
|
||
}
|
||
}
|
||
|
||
void doris_http_confile_body_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp)
|
||
{
|
||
struct doris_instance *instance = (struct doris_instance *)userp;
|
||
|
||
if(code!=CURLE_OK || (instance->ctx.res_code!=200 && instance->ctx.res_code!=206) || (res_code!=200 && res_code!=206))
|
||
{
|
||
return;
|
||
}
|
||
|
||
instance->param->cbs.cfgfile_update(instance, ptr, bytes, instance->param->cbs.userdata);
|
||
MD5_Update(&instance->ctx.md5ctx, ptr, bytes);
|
||
instance->curmeta.curoffset += bytes;
|
||
instance->statistic.field[DRS_FS_FILED_RES_BYTES] += bytes;
|
||
}
|
||
|
||
void doris_http_fetch_confile(struct doris_instance *instance);
|
||
/*
 * Completion callback for one config-file request (a whole file or one fragment).
 *
 * Success: counts the fragment. If the file is complete, its MD5 is checked
 * (when the meta supplied one), cfgfile_finish is invoked, and either the
 * next file is requested or, after the last file, version_finish is invoked,
 * the version is promoted and the next meta poll is scheduled immediately.
 * If the file is incomplete, the next fragment is requested.
 *
 * Failure: a 404 or an MD5 mismatch fails the version at once; other errors
 * bump retry_times and fail the version once fetch_max_tries is reached.
 * In every failure case the fetch timer is re-armed with retry_interval.
 */
void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, void *userp)
{
	struct doris_instance *instance = (struct doris_instance *)userp;
	char md5buffer[64]={0}; /* stays an empty string if finalization fails */
	bool direct_fail=false;

	if(res!=CURLE_OK)
	{
		/* Log the curl result code; the format says "curlcode" and uses %d. */
		MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s failed, req_version=%lu, curlcode = %d, error: %s", 
			instance->curmeta.table_name, instance->req_version, (int)res, err);
		goto out_error;
	}

	if((instance->ctx.res_code != 200 && instance->ctx.res_code!=206) || (res_code!=200 && res_code!=206))
	{
		goto out_error;
	}

	if(instance->ctx.contl_total != 0)
	{
		MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s success, req_version=%lu, Content-Range: %lu-%lu/%lu", 
			instance->curmeta.table_name, instance->req_version, instance->ctx.contl_start, instance->ctx.contl_end, instance->ctx.contl_total);
	}
	else
	{
		MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s success, req_version=%lu, Content-Length: %lu/%lu", 
			instance->curmeta.table_name, instance->req_version, instance->ctx.contlength, instance->curmeta.size);
	}

	instance->statistic.field[DRS_FS_FILED_RES_FRAGS] += 1;
	if(instance->curmeta.curoffset >= instance->curmeta.size) /* the whole file has been received */
	{
		doris_md5_final_string(&instance->ctx.md5ctx, md5buffer, 64);
		if(instance->curmeta.validate_md5 && strcasecmp(instance->curmeta.md5str, md5buffer))
		{
			MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s over, version=%lu, md5 validate fail, real: %s, expect: %s", 
				instance->curmeta.table_name, instance->req_version, md5buffer, instance->curmeta.md5str);
			direct_fail=true;goto out_md5;
		}
		else
		{
			/* Zero-padded version suffix; the previous ".010%lu" printed a literal "010". */
			MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s.%010lu over, md5: %s", 
				instance->curmeta.table_name, instance->req_version, md5buffer);
		}
		instance->statistic.field[DRS_FS_FILED_RES_FILES] += 1;
		instance->param->cbs.cfgfile_finish(instance, md5buffer, instance->param->cbs.userdata);
		if(instance->array_index == instance->array_size)
		{
			instance->param->cbs.version_finish(instance, instance->param->cbs.userdata);
			instance->status = FETCH_STATUS_META;
			doris_update_new_version(instance);
			cJSON_Delete(instance->meta);
			/* Do not keep pointers into the freed JSON tree. */
			instance->meta = NULL;
			instance->array = NULL;
			doris_confile_ctx_destry(&instance->ctx);
			doris_request_restart_timer(instance, 0);
		}
		else
		{
			doris_fetch_next_confile_meta(instance);
			doris_http_fetch_confile(instance);
		}
	}
	else
	{
		doris_http_fetch_confile(instance);
	}
	return;

out_error:
	if(instance->ctx.res_code == 404) /* a 404 is not retried; the version fails immediately */
	{
		direct_fail = true;
	}
	else
	{
		instance->retry_times++;
	}
out_md5:
	instance->statistic.field[DRS_FS_FILED_RES_FRAGERR] += 1;
	if(instance->retry_times >= instance->param->fetch_max_tries || direct_fail)
	{
		instance->statistic.field[DRS_FS_FILED_RES_VERERR] += 1;
		instance->param->cbs.version_error(instance, instance->param->cbs.userdata);
		instance->retry_times = 0;
		instance->status = FETCH_STATUS_META;
		cJSON_Delete(instance->meta);
		/* Do not keep pointers into the freed JSON tree. */
		instance->meta = NULL;
		instance->array = NULL;
		doris_confile_ctx_destry(&instance->ctx);
	}
	doris_request_restart_timer(instance, instance->param->retry_interval);
}
|
||
|
||
void doris_http_fetch_confile(struct doris_instance *instance)
|
||
{
|
||
struct doris_http_callback curlcbs;
|
||
char metauri[128], range[64]={0};
|
||
|
||
curlcbs.header_cb = doris_http_confile_header_cb;
|
||
curlcbs.write_cb = doris_http_confile_body_cb;
|
||
curlcbs.transfer_done_cb = doris_http_confile_done_cb;
|
||
curlcbs.userp = instance;
|
||
doris_confile_ctx_reset(&instance->ctx);
|
||
doris_http_ctx_reset(instance->ctx.httpctx, &curlcbs);
|
||
|
||
//<2F><><EFBFBD><EFBFBD><EFBFBD>ļ<EFBFBD><C4BC>ֶ<EFBFBD><D6B6><EFBFBD><EFBFBD>أ<EFBFBD><D8A3>ϴ<EFBFBD>δ<EFBFBD><CEB4><EFBFBD>ɵļ<C9B5><C4BC><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
|
||
if((instance->curmeta.size > instance->param->fetch_frag_size) || instance->curmeta.curoffset!=0)
|
||
{
|
||
sprintf(range, "Range: bytes=%lu-%lu", instance->curmeta.curoffset, instance->curmeta.curoffset + instance->param->fetch_frag_size - 1);
|
||
doris_http_ctx_add_header(instance->ctx.httpctx, range);
|
||
}
|
||
|
||
snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&businessid=%u", instance->curmeta.table_name, instance->req_version, instance->param->args.businessid);
|
||
if(doris_http_launch_get_request(instance->ctx.httpctx, metauri))
|
||
{
|
||
instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
|
||
doris_request_restart_timer(instance, instance->param->retry_interval);
|
||
}
|
||
else
|
||
{
|
||
instance->statistic.field[DRS_FS_FILED_REQ_FILES] += 1;
|
||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Launch confile %s GET, req_version=%lu, %s",
|
||
instance->curmeta.table_name, instance->req_version, range);
|
||
}
|
||
}
|
||
|
||
void doris_http_meta_header_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp)
|
||
{
|
||
struct doris_instance *instance = (struct doris_instance *)userp;
|
||
|
||
//check code only once
|
||
if(instance->ctx.res_code != 0)
|
||
{
|
||
return;
|
||
}
|
||
instance->ctx.res_code = res_code;
|
||
assert(res_code != 0);
|
||
|
||
if(res_code != 200)
|
||
{
|
||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "No new meta found, cur_version=%lu, req_version=%lu, curlcode = %d",
|
||
instance->cur_version, instance->req_version, code);
|
||
}
|
||
}
|
||
|
||
void doris_http_meta_body_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp)
|
||
{
|
||
struct doris_instance *instance = (struct doris_instance *)userp;
|
||
|
||
if(code!=CURLE_OK || res_code!=200)
|
||
{
|
||
return;
|
||
}
|
||
|
||
easy_string_savedata(&instance->estr, (const char*)ptr, bytes);
|
||
}
|
||
|
||
/*
 * Completion callback for a meta request.
 *
 * On success the accumulated body is parsed as JSON; "version" becomes the
 * new and requested version, version_start is invoked, and the first entry
 * of "configs" is fetched. A transfer error, a non-200 status, unparsable
 * JSON, or a meta without "version" or with an empty "configs" array counts
 * as "no meta": the body buffer and fetch context are released and the
 * fetch timer is re-armed with retry_interval.
 */
void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void *userp)
{
	struct doris_instance *instance = (struct doris_instance *)userp;
	cJSON *sub, *configs;

	if(res!=CURLE_OK)
	{
		/* Log the curl result code; the format says "curlcode" and uses %d. */
		MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Request meta failed, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s", 
			instance->cur_version, instance->req_version, (int)res, err);
		goto out_error;
	}

	if(instance->ctx.res_code != 200 || res_code!=200)
	{
		goto out_error;
	}

	/* An empty body leaves estr.buff NULL; treat that as unparsable. */
	instance->meta = (instance->estr.buff != NULL) ? cJSON_Parse(instance->estr.buff) : NULL;
	if(instance->meta == NULL)
	{
		MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, req_version=%lu, invalid json: %s", instance->req_version, 
			(instance->estr.buff != NULL) ? instance->estr.buff : "");
		goto out_error;
	}

	/* Validate required members before touching instance state or invoking callbacks. */
	sub = cJSON_GetObjectItem(instance->meta, "version");
	configs = cJSON_GetObjectItem(instance->meta, "configs");
	if(sub == NULL || configs == NULL || cJSON_GetArraySize(configs) <= 0)
	{
		MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, req_version=%lu, invalid json: %s", instance->req_version, instance->estr.buff);
		cJSON_Delete(instance->meta);
		instance->meta = NULL;
		goto out_error;
	}

	instance->new_version = sub->valuedouble;
	instance->req_version = instance->new_version;
	instance->statistic.field[DRS_FS_FILED_RES_META] += 1;
	MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, cur_version=%lu, newjson: %s", 
		instance->cur_version, instance->estr.buff);

	instance->param->cbs.version_start(instance, instance->meta, instance->param->cbs.userdata);
	/* Looked up again after the callback, in the same order as before. */
	instance->array = cJSON_GetObjectItem(instance->meta, "configs");
	instance->array_size = cJSON_GetArraySize(instance->array);
	assert(instance->array_size > 0);

	easy_string_destroy(&instance->estr);
	instance->status = FETCH_STATUS_FILE;
	doris_fetch_next_confile_meta(instance);
	doris_http_fetch_confile(instance);
	return;

out_error:
	instance->statistic.field[DRS_FS_FILED_RES_NOMETA] += 1;
	doris_request_restart_timer(instance, instance->param->retry_interval);
	easy_string_destroy(&instance->estr);
	doris_confile_ctx_destry(&instance->ctx);
}
|
||
|
||
static void doris_http_fetch_meta(struct doris_instance *instance)
|
||
{
|
||
u_int64_t balance_seed;
|
||
struct doris_http_callback curlcbs;
|
||
char metauri[128];
|
||
|
||
balance_seed = (((u_int64_t)rand()) << 32) | rand();
|
||
|
||
memset(&curlcbs, 0, sizeof(struct doris_http_callback));
|
||
curlcbs.header_cb = doris_http_meta_header_cb;
|
||
curlcbs.write_cb = doris_http_meta_body_cb;
|
||
curlcbs.transfer_done_cb = doris_http_meta_done_cb;
|
||
curlcbs.userp = instance;
|
||
|
||
instance->array_index = 0;
|
||
instance->cur_httpins = instance->httpins_master;
|
||
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed);
|
||
if(instance->ctx.httpctx==NULL && instance->httpins_backup1!=NULL)
|
||
{
|
||
instance->cur_httpins = instance->httpins_backup1;
|
||
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed);
|
||
}
|
||
if(instance->ctx.httpctx==NULL && instance->httpins_backup2!=NULL)
|
||
{
|
||
instance->cur_httpins = instance->httpins_backup2;
|
||
instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed);
|
||
}
|
||
|
||
instance->req_version = instance->cur_version + 1; //ֻ<>а汾<D0B0><E6B1BE><EFBFBD>³ɹ<C2B3><C9B9><EFBFBD><EFBFBD><EFBFBD>cur_version<6F>Ż<EFBFBD><C5BB><EFBFBD><EFBFBD><EFBFBD>
|
||
if(instance->ctx.httpctx != NULL)
|
||
{
|
||
snprintf(metauri, 128, "configmeta?version=%lu&businessid=%u", instance->req_version, instance->param->args.businessid);
|
||
if(!doris_http_launch_get_request(instance->ctx.httpctx, metauri))
|
||
{
|
||
instance->status = FETCH_STATUS_META;
|
||
instance->statistic.field[DRS_FS_FILED_REQ_META] += 1;
|
||
}
|
||
else
|
||
{
|
||
instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
|
||
doris_confile_ctx_destry(&instance->ctx);
|
||
doris_request_restart_timer(instance, instance->param->retry_interval);
|
||
}
|
||
if(instance->cur_httpins == instance->httpins_backup1) instance->statistic.field[DRS_FS_FILED_BACKUP1_REQ] += 1;
|
||
else if(instance->cur_httpins == instance->httpins_backup2) instance->statistic.field[DRS_FS_FILED_BACKUP2_REQ] += 1;
|
||
}
|
||
else
|
||
{
|
||
instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1;
|
||
doris_request_restart_timer(instance, instance->param->retry_interval);
|
||
MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch meta GET failed: no active host found,req_version=%lu", instance->req_version);
|
||
}
|
||
}
|
||
|
||
static void instance_fetch_cfg_timer_cb(int fd, short kind, void *userp)
|
||
{
|
||
struct doris_instance *instance = (struct doris_instance *)userp;
|
||
|
||
switch(instance->status)
|
||
{
|
||
case FETCH_STATUS_IDLE:
|
||
case FETCH_STATUS_META:
|
||
doris_http_fetch_meta(instance);
|
||
break;
|
||
|
||
case FETCH_STATUS_FILE:
|
||
doris_http_fetch_confile(instance);
|
||
break;
|
||
default: assert(0);break;
|
||
}
|
||
}
|
||
|
||
static void doris_client_fs_output_timer_cb(int fd, short kind, void *userp)
|
||
{
|
||
struct doris_parameter *param=(struct doris_parameter *)userp;
|
||
struct timeval tv;
|
||
|
||
FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_MST_CNN_SRV], 0, FS_OP_SET, param->param_master->connected_hosts);
|
||
FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_MST_FAIL_SRV], 0, FS_OP_SET, param->param_master->failed_hosts);
|
||
if(param->param_backup1 != NULL)
|
||
{
|
||
FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_BCK1_CNN_SRV], 0, FS_OP_SET, param->param_backup1->connected_hosts);
|
||
FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_BCK1_FAIL_SRV], 0, FS_OP_SET, param->param_backup1->failed_hosts);
|
||
}
|
||
if(param->param_backup2 != NULL)
|
||
{
|
||
FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_BCK2_CNN_SRV], 0, FS_OP_SET, param->param_backup2->connected_hosts);
|
||
FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_BCK2_FAIL_SRV], 0, FS_OP_SET, param->param_backup2->failed_hosts);
|
||
}
|
||
FS_passive_output(param->fsstat_handle);
|
||
tv.tv_sec = param->fsstat_period;
|
||
tv.tv_usec = 0;
|
||
evtimer_add(¶m->fs_timer_output, &tv);
|
||
}
|
||
|
||
static int doris_client_register_field_stat(struct doris_parameter *param, void *runtime_log, struct event_base *evbase)
|
||
{
|
||
const char *field_names[FSSTAT_DORIS_FILED_NUM]={"ReqFail", "ReqMetas", "ResMetas", "ResNoNew", "ReqFiles",
|
||
"ResFiles", "ResFrags", "ResFragErr", "ResBytes", "ResVerErr", "ReqBackup1", "ReqBackup2"};
|
||
const char *status_names[FSSTAT_DORIS_STATUS_NUM]={"MasSrvCned", "MasSrvFail",
|
||
"Bck1SrvCned", "Bck1SrvFail", "Bck2SrvCned", "Bck2SrvFail", "MemoryUsed", "HttpSession"};
|
||
struct timeval tv;
|
||
int value;
|
||
|
||
param->fsstat_handle = FS_create_handle();
|
||
FS_set_para(param->fsstat_handle, OUTPUT_DEVICE, param->fsstat_filepath, strlen(param->fsstat_filepath)+1);
|
||
if(param->fsstat_print_mode == 1)
|
||
{
|
||
FS_set_para(param->fsstat_handle, PRINT_MODE, ¶m->fsstat_print_mode, sizeof(param->fsstat_print_mode));
|
||
}
|
||
else
|
||
{
|
||
FS_set_para(param->fsstat_handle, PRINT_MODE, ¶m->fsstat_print_mode, sizeof(param->fsstat_print_mode));
|
||
value = 1;
|
||
FS_set_para(param->fsstat_handle, FLUSH_BY_DATE, &value, sizeof(value));
|
||
}
|
||
value = param->fsstat_period;
|
||
FS_set_para(param->fsstat_handle, STAT_CYCLE, &value, sizeof(value));
|
||
value = 0;
|
||
FS_set_para(param->fsstat_handle, CREATE_THREAD, &value, sizeof(value));
|
||
FS_set_para(param->fsstat_handle, APP_NAME, param->fsstat_appname, strlen(param->fsstat_appname)+1);
|
||
FS_set_para(param->fsstat_handle, STATS_SERVER_IP, param->fsstat_dst_ip, strlen(param->fsstat_dst_ip)+1);
|
||
FS_set_para(param->fsstat_handle, STATS_SERVER_PORT, ¶m->fsstat_dst_port, sizeof(param->fsstat_dst_port));
|
||
|
||
for(int i=0; i<FSSTAT_DORIS_FILED_NUM; i++)
|
||
{
|
||
param->fsstat_field[i] = FS_register(param->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]);
|
||
}
|
||
for(int i=0; i<FSSTAT_DORIS_STATUS_NUM; i++)
|
||
{
|
||
param->fsstat_status[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]);
|
||
}
|
||
FS_start(param->fsstat_handle);
|
||
|
||
evtimer_assign(¶m->fs_timer_output, evbase, doris_client_fs_output_timer_cb, param);
|
||
tv.tv_sec = param->fsstat_period;
|
||
tv.tv_usec = 0;
|
||
evtimer_add(¶m->fs_timer_output, &tv);
|
||
return 0;
|
||
}
|
||
|
||
struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, struct doris_callbacks *cbs,
|
||
struct doris_arguments *args, void *runtimelog)
|
||
{
|
||
struct doris_parameter *param;
|
||
|
||
param = (struct doris_parameter *)calloc(1, sizeof(struct doris_parameter));
|
||
param->manage_evbase = manage_evbase;
|
||
param->cbs = *cbs;
|
||
param->args= *args;
|
||
|
||
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fail_retry_interval", ¶m->retry_interval, 10);
|
||
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fragmet_size", ¶m->fetch_frag_size, 5242880);
|
||
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_confile_max_tries", ¶m->fetch_max_tries, 3);
|
||
|
||
MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_appname", param->fsstat_appname, 16, "DorisClient");
|
||
MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_filepath", param->fsstat_filepath, 256, "./log/doris_client.fs");
|
||
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fsstat_log_interval", ¶m->fsstat_period, 10);
|
||
MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_print_mode", ¶m->fsstat_print_mode, 1);
|
||
MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_dst_ip", param->fsstat_dst_ip, 64, "127.0.0.1");
|
||
MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_dst_port", ¶m->fsstat_dst_port, 8125);
|
||
|
||
param->param_master = doris_http_parameter_new(confile, "DORIS_CLIENT.master_server", manage_evbase, runtimelog);
|
||
if(param->param_master == NULL)
|
||
{
|
||
return NULL;
|
||
}
|
||
if(doris_client_register_field_stat(param, runtimelog, manage_evbase))
|
||
{
|
||
return NULL;
|
||
}
|
||
param->param_backup1 = doris_http_parameter_new(confile, "DORIS_CLIENT.backup1_server", manage_evbase, runtimelog);
|
||
param->param_backup2 = doris_http_parameter_new(confile, "DORIS_CLIENT.backup2_server", manage_evbase, runtimelog);
|
||
return param;
|
||
}
|
||
|
||
static void doris_instance_statistic_timer_cb(int fd, short kind, void *userp)
|
||
{
|
||
struct doris_instance *instance = (struct doris_instance *)userp;
|
||
struct timeval tv;
|
||
struct doris_statistics incr_statistic;
|
||
long long *plast_statistic = (long long*)&instance->statistic_last;
|
||
long long *pnow_statistic = (long long*)&instance->statistic;
|
||
long long *pinc_statistic = (long long*)&incr_statistic;
|
||
long long http_sessions=0;
|
||
|
||
http_sessions += caculate_http_sessions_sum(instance->httpins_master);
|
||
http_sessions += caculate_http_sessions_sum(instance->httpins_backup1);
|
||
http_sessions += caculate_http_sessions_sum(instance->httpins_backup2);
|
||
instance->statistic.field[DRS_FS_STAT_HTTP_SESSIONS] = http_sessions;
|
||
|
||
for(u_int32_t i=0; i<sizeof(struct doris_statistics)/sizeof(long long); i++)
|
||
{
|
||
pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i];
|
||
}
|
||
instance->statistic_last = instance->statistic;
|
||
|
||
for(u_int32_t i=0; i<FSSTAT_DORIS_FILED_NUM; i++)
|
||
{
|
||
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field[i], 0, FS_OP_ADD, incr_statistic.field[i]);
|
||
}
|
||
for(u_int32_t i=0; i<FSSTAT_DORIS_STATUS_NUM; i++)
|
||
{
|
||
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_status[i], 0, FS_OP_ADD, incr_statistic.status[i]);
|
||
}
|
||
tv.tv_sec = instance->param->fsstat_period;
|
||
tv.tv_usec = 0;
|
||
event_add(&instance->timer_statistic, &tv);
|
||
}
|
||
|
||
struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase, void *runtimelog)
|
||
{
|
||
struct doris_instance *instance;
|
||
struct timeval tv;
|
||
|
||
instance = (struct doris_instance *)calloc(1, sizeof(struct doris_instance));
|
||
instance->param = param;
|
||
instance->worker_evbase = worker_evbase;
|
||
instance->runtime_log = runtimelog;
|
||
instance->cur_version = param->args.current_version;
|
||
instance->req_version = instance->cur_version + 1; //TODO
|
||
|
||
instance->httpins_master = doris_http_instance_new(param->param_master, worker_evbase, runtimelog);
|
||
if(instance->httpins_master == NULL)
|
||
{
|
||
return NULL;
|
||
}
|
||
srand(time(NULL));
|
||
|
||
if(param->param_backup1 != NULL)
|
||
{
|
||
instance->httpins_backup1 = doris_http_instance_new(param->param_backup1, worker_evbase, runtimelog);
|
||
}
|
||
if(param->param_backup2 != NULL)
|
||
{
|
||
instance->httpins_backup2 = doris_http_instance_new(param->param_backup2, worker_evbase, runtimelog);
|
||
}
|
||
|
||
evtimer_assign(&instance->timer_statistic, worker_evbase, doris_instance_statistic_timer_cb, instance);
|
||
tv.tv_sec = param->fsstat_period;
|
||
tv.tv_usec = 0;
|
||
evtimer_add(&instance->timer_statistic, &tv);
|
||
|
||
evtimer_assign(&instance->timer_fetch, worker_evbase, instance_fetch_cfg_timer_cb, instance);
|
||
tv.tv_sec = 3;
|
||
tv.tv_usec = 0;
|
||
evtimer_add(&instance->timer_fetch, &tv);
|
||
return instance;
|
||
}
|
||
|