#include #include #include #include #include #include #include #include #include #include #include #include #include #include "doris_client_fetch.h" static int doris_md5_final_string(MD5_CTX *c, char *result, unsigned int size) { unsigned char md5[17]={0}; int i; if(MD5_Final(md5, c) != 1) { return -1; } if(size < 33) return -1; for(i=0; i<16; i++) { sprintf(result + i*2, "%02x", md5[i]); } result[32] = '\0'; return 0; } void doris_confile_ctx_reset(struct doris_confile_ctx *ctx) { //其他保持不变 ctx->res_code = 0; ctx->contlength = 0; ctx->contl_start = 0; ctx->contl_end = 0; ctx->contl_total = 0; } void doris_confile_ctx_destry(struct doris_confile_ctx *ctx) { doris_confile_ctx_reset(ctx); doris_http_ctx_destroy(ctx->httpctx); ctx->httpctx = NULL; } void doris_update_new_version(struct doris_csum_instance *instance) { instance->cur_version = instance->new_version; } void doris_request_restart_timer(struct doris_csum_instance *instance, time_t wait_s) { struct timeval tv; tv.tv_sec = wait_s; tv.tv_usec = 0; event_add(&instance->timer_fetch, &tv); } void doris_fetch_next_confile_meta(struct doris_csum_instance *instance) { cJSON *cur_a_item, *sub; memset(&instance->curmeta, 0, sizeof(struct fetch_file_meta)); cur_a_item = cJSON_GetArrayItem(instance->array, instance->array_index); instance->array_index++; sub = cJSON_GetObjectItem(cur_a_item, "tablename"); instance->curmeta.meta.tablename = sub->valuestring; sub = cJSON_GetObjectItem(cur_a_item, "filename"); instance->curmeta.meta.filename = sub->valuestring; sub = cJSON_GetObjectItem(cur_a_item, "size"); instance->curmeta.meta.size = sub->valuedouble; sub = cJSON_GetObjectItem(cur_a_item, "cfg_num"); instance->curmeta.meta.cfgnum = sub->valueint; sub = cJSON_GetObjectItem(cur_a_item, "user_region"); instance->curmeta.meta.userregion = (sub==NULL)?NULL:sub->valuestring; if(NULL != (sub = cJSON_GetObjectItem(cur_a_item, "md5"))) { instance->curmeta.validate_md5 = 1; snprintf(instance->curmeta.md5str, 36, "%s", sub->valuestring); } else { instance->curmeta.validate_md5 = 0; } } void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp) { struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; const char *pos_colon; size_t datalen; char buffer[64]; int ret; if(instance->ctx.res_code == 0) //check code only once { instance->ctx.res_code = res_code; assert(res_code != 0); if(res_code != 200 && res_code!=206) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s failed, req_version=%lu, curlcode = %d", instance->curmeta.meta.tablename, instance->req_version, code); return; } instance->retry_times = 0; if(instance->curmeta.curoffset == 0) { instance->cbs.cfgfile_start(instance, &instance->curmeta.meta, NULL, instance->cbs.userdata); MD5_Init(&instance->ctx.md5ctx); } } if((pos_colon=(const char*)memchr(start, ':', bytes)) == NULL) { return ; } datalen = pos_colon - start; switch(datalen) { case 14: if(!strncasecmp(start, "Content-Length:", 15)) { memcpy(buffer, start+15, bytes-15); buffer[bytes-15] = '\0'; instance->ctx.contlength = atol(buffer); } break; case 13: if(!strncasecmp(start, "Content-Range:", 14)) { memcpy(buffer, start+13, bytes-13); buffer[bytes-13] = '\0'; ret = sscanf(buffer, "%*[^0-9]%lu-%lu/%lu", &instance->ctx.contl_start, &instance->ctx.contl_end, &instance->ctx.contl_total); assert(ret == 3 && instance->ctx.contl_total == instance->curmeta.meta.size && instance->ctx.contl_start==instance->curmeta.curoffset); } break; default: break; } } void doris_http_confile_body_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp) { struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; if(code!=CURLE_OK || (instance->ctx.res_code!=200 && instance->ctx.res_code!=206) || (res_code!=200 && res_code!=206)) { return; } instance->cbs.cfgfile_update(instance, ptr, bytes, instance->cbs.userdata); MD5_Update(&instance->ctx.md5ctx, ptr, bytes); instance->curmeta.curoffset += bytes; instance->statistic.field[DRS_FS_FILED_RES_BYTES] += bytes; } void doris_http_fetch_confile(struct doris_csum_instance *instance); void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, void *userp) { struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; char md5buffer[64]; bool direct_fail=false; if(res!=CURLE_OK) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s failed, req_version=%lu, curlcode = %d, error: %s", instance->curmeta.meta.tablename, instance->req_version, res_code, err); goto out_error; } if((instance->ctx.res_code != 200 && instance->ctx.res_code!=206) || (res_code!=200 && res_code!=206)) { goto out_error; } if(instance->ctx.contl_total != 0) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s success, req_version=%lu, Content-Range: %lu-%lu/%lu", instance->curmeta.meta.tablename, instance->req_version, instance->ctx.contl_start, instance->ctx.contl_end, instance->ctx.contl_total); } else { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s success, req_version=%lu, Content-Length: %lu/%lu", instance->curmeta.meta.tablename, instance->req_version, instance->ctx.contlength, instance->curmeta.meta.size); } instance->statistic.field[DRS_FS_FILED_RES_FRAGS] += 1; if(instance->curmeta.curoffset >= instance->curmeta.meta.size) //该文件下载完毕 { doris_md5_final_string(&instance->ctx.md5ctx, md5buffer, 64); if(instance->curmeta.validate_md5 && strcasecmp(instance->curmeta.md5str, md5buffer)) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s over, version=%lu, md5 validate fail, real: %s, expect: %s", instance->curmeta.meta.tablename, instance->req_version, md5buffer, instance->curmeta.md5str); direct_fail=true;goto out_md5; } else { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s.%010lu over, md5: %s", instance->curmeta.meta.tablename, instance->req_version, md5buffer); } instance->statistic.field[DRS_FS_FILED_RES_FILES] += 1; instance->cbs.cfgfile_finish(instance, md5buffer, instance->cbs.userdata); if(instance->array_index == instance->array_size) { instance->cbs.version_finish(instance, instance->cbs.userdata); instance->status = FETCH_STATUS_META; doris_update_new_version(instance); cJSON_Delete(instance->meta); doris_confile_ctx_destry(&instance->ctx); doris_request_restart_timer(instance, 0); } else { doris_fetch_next_confile_meta(instance); doris_http_fetch_confile(instance); } } else { doris_http_fetch_confile(instance); } return; out_error: if(instance->ctx.res_code == 404) //404应答的重新开始请求 { direct_fail = true; } else { instance->retry_times++; } out_md5: instance->statistic.field[DRS_FS_FILED_RES_FRAGERR] += 1; if(instance->retry_times >= instance->param->fetch_max_tries || direct_fail) { instance->statistic.field[DRS_FS_FILED_RES_VERERR] += 1; instance->cbs.version_error(instance, instance->cbs.userdata); instance->retry_times = 0; instance->status = FETCH_STATUS_META; cJSON_Delete(instance->meta); doris_confile_ctx_destry(&instance->ctx); } doris_request_restart_timer(instance, instance->param->retry_interval); } void doris_http_fetch_confile(struct doris_csum_instance *instance) { struct doris_http_callback curlcbs; char metauri[128], range[64]={0}; curlcbs.header_cb = doris_http_confile_header_cb; curlcbs.write_cb = doris_http_confile_body_cb; curlcbs.transfer_done_cb = doris_http_confile_done_cb; curlcbs.userp = instance; doris_confile_ctx_reset(&instance->ctx); doris_http_ctx_reset(instance->ctx.httpctx, &curlcbs); //超大文件分段下载;上次未完成的继续下载 if((instance->curmeta.meta.size > instance->param->fetch_frag_size) || instance->curmeta.curoffset!=0) { sprintf(range, "Range: bytes=%lu-%lu", instance->curmeta.curoffset, instance->curmeta.curoffset + instance->param->fetch_frag_size - 1); doris_http_ctx_add_header(instance->ctx.httpctx, range); } snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&business=%s", instance->curmeta.meta.tablename, instance->req_version, instance->args.bizname); if(doris_http_launch_get_request(instance->ctx.httpctx, metauri)) { instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; doris_request_restart_timer(instance, instance->param->retry_interval); } else { instance->statistic.field[DRS_FS_FILED_REQ_FILES] += 1; MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Launch confile %s GET, req_version=%lu, %s", instance->curmeta.meta.tablename, instance->req_version, range); } } void doris_http_meta_header_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp) { struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; //check code only once if(instance->ctx.res_code != 0) { return; } instance->ctx.res_code = res_code; assert(res_code != 0); if(res_code != 200) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "business: %s, No new meta found, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d", instance->args.bizname, instance->ctx.server, instance->cur_version, instance->req_version, code); } } void doris_http_meta_body_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp) { struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; if(code!=CURLE_OK || res_code!=200) { return; } easy_string_savedata(&instance->estr, (const char*)ptr, bytes); } void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void *userp) { struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; cJSON *sub; int64_t new_version; evtimer_del(&instance->ctx.timer_expires); if(res!=CURLE_OK) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Request meta failed, server: %s, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s", instance->args.bizname, instance->ctx.server, instance->cur_version, instance->req_version, res_code, err); goto out_error; } if(instance->ctx.res_code != 200 || res_code!=200) { goto out_error; } instance->meta = cJSON_Parse(instance->estr.buff); if(instance->meta == NULL) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Parse meta failed, server: %s, req_version=%lu, invalid json: %s", instance->args.bizname, instance->ctx.server, instance->req_version, instance->estr.buff); goto out_error; } sub = cJSON_GetObjectItem(instance->meta, "version"); new_version = sub->valuedouble; if(new_version <= instance->cur_version) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, An older version received, abandon it. server: %s, cur_version=%lu, invalid json: %s", instance->args.bizname, instance->ctx.server, instance->cur_version, instance->estr.buff); cJSON_Delete(instance->meta); instance->meta = NULL; goto out_error; } instance->new_version = new_version; instance->req_version = instance->new_version; instance->statistic.field[DRS_FS_FILED_RES_META] += 1; MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "business: %s, NEW_META found, server: %s, cur_version=%lu, newjson: %s", instance->args.bizname, instance->ctx.server, instance->cur_version, instance->estr.buff); instance->cbs.version_start(instance, instance->meta, instance->cbs.userdata); instance->array = cJSON_GetObjectItem(instance->meta, "configs"); instance->array_size = cJSON_GetArraySize(instance->array); assert(instance->array_size > 0); easy_string_destroy(&instance->estr); instance->status = FETCH_STATUS_FILE; doris_fetch_next_confile_meta(instance); doris_http_fetch_confile(instance); return; out_error: instance->statistic.field[DRS_FS_FILED_RES_NOMETA] += 1; doris_request_restart_timer(instance, instance->param->retry_interval); easy_string_destroy(&instance->estr); doris_confile_ctx_destry(&instance->ctx); if(res_code==304 && instance->cbs.version_updated!=NULL) //版本已完成同步 { instance->cbs.version_updated(instance, instance->cbs.userdata); } if(res_code==300 && instance->param->client_sync_on) //服务端有半途中的版本上传 { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "business: %s, Server is busy processing version requests, waiting it done...", instance->args.bizname); } } static void doris_http_fetch_meta(struct doris_csum_instance *instance) { u_int64_t balance_seed; struct doris_http_callback curlcbs; char metauri[128], cur_version[128]; struct timeval tv={10, 0}; balance_seed = (((u_int64_t)rand()&0xFFFF) << 48) | (((u_int64_t)rand()&0xFFFF) << 32) | (((u_int64_t)rand()&0xFFFF) << 16) | ((u_int64_t)rand()&0xFFFF); memset(&curlcbs, 0, sizeof(struct doris_http_callback)); curlcbs.header_cb = doris_http_meta_header_cb; curlcbs.write_cb = doris_http_meta_body_cb; curlcbs.transfer_done_cb = doris_http_meta_done_cb; curlcbs.userp = instance; instance->array_index = 0; instance->cur_httpins = instance->httpins_master; instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); if(instance->ctx.httpctx==NULL && instance->httpins_backup1!=NULL) { instance->cur_httpins = instance->httpins_backup1; instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); } if(instance->ctx.httpctx==NULL && instance->httpins_backup2!=NULL) { instance->cur_httpins = instance->httpins_backup2; instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed, instance->ctx.server, 64); } instance->req_version = instance->cur_version + 1; //只有版本更新成功后,cur_version才会更新 if(instance->ctx.httpctx != NULL) { if(instance->param->client_sync_on) { //仅限于主从同步消费,用于标记是从Client请求 sprintf(cur_version, "X-Doris-Sync-Current-Version: %lu", instance->cur_version); doris_http_ctx_add_header(instance->ctx.httpctx, cur_version); } snprintf(metauri, 128, "configmeta?version=%lu&business=%s", instance->req_version, instance->args.bizname); if(!doris_http_launch_get_request(instance->ctx.httpctx, metauri)) { instance->status = FETCH_STATUS_META; instance->statistic.field[DRS_FS_FILED_REQ_META] += 1; evtimer_add(&instance->ctx.timer_expires, &tv); } else { instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; doris_confile_ctx_destry(&instance->ctx); doris_request_restart_timer(instance, instance->param->retry_interval); } if(instance->cur_httpins == instance->httpins_backup1) instance->statistic.field[DRS_FS_FILED_BACKUP1_REQ] += 1; else if(instance->cur_httpins == instance->httpins_backup2) instance->statistic.field[DRS_FS_FILED_BACKUP2_REQ] += 1; } else { instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; doris_request_restart_timer(instance, instance->param->retry_interval); MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch meta GET failed: no active host found,req_version=%lu", instance->req_version); } } static void instance_fetch_cfg_timer_cb(int fd, short kind, void *userp) { struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; switch(instance->status) { case FETCH_STATUS_IDLE: case FETCH_STATUS_META: doris_http_fetch_meta(instance); break; case FETCH_STATUS_FILE: doris_http_fetch_confile(instance); break; default: assert(0);break; } } /*https模式下,使用valgrind运行,发现发起GET请求后,done_cb函数始终无法得到调用*/ static void instance_meta_expire_timer_cb(int fd, short kind, void *userp) { struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; doris_confile_ctx_destry(&instance->ctx); doris_http_fetch_meta(instance); MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "\033[33mbusiness: %s, launch meta-get wired expired, retry....\033[0m", instance->args.bizname); } static void doris_client_fs_output_timer_cb(int fd, short kind, void *userp) { struct doris_csum_param *param=(struct doris_csum_param *)userp; struct timeval tv; FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_MST_CNN_SRV], 0, FS_OP_SET, param->param_master->connected_hosts); FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_MST_FAIL_SRV], 0, FS_OP_SET, param->param_master->failed_hosts); if(param->param_backup1 != NULL) { FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_BCK1_CNN_SRV], 0, FS_OP_SET, param->param_backup1->connected_hosts); FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_BCK1_FAIL_SRV], 0, FS_OP_SET, param->param_backup1->failed_hosts); } if(param->param_backup2 != NULL) { FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_BCK2_CNN_SRV], 0, FS_OP_SET, param->param_backup2->connected_hosts); FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_BCK2_FAIL_SRV], 0, FS_OP_SET, param->param_backup2->failed_hosts); } FS_passive_output(param->fsstat_handle); tv.tv_sec = param->fsstat_period; tv.tv_usec = 0; evtimer_add(¶m->fs_timer_output, &tv); } static int doris_client_register_field_stat(struct doris_csum_param *param, void *runtime_log, struct event_base *evbase) { const char *field_names[FSSTAT_DORIS_FILED_NUM]={"ReqFail", "ReqMetas", "ResMetas", "ResNoNew", "ReqFiles", "ResFiles", "ResFrags", "ResFragErr", "ResBytes", "ResVerErr", "ReqBackup1", "ReqBackup2"}; const char *status_names[FSSTAT_DORIS_STATUS_NUM]={"MasSrvCned", "MasSrvFail", "Bck1SrvCned", "Bck1SrvFail", "Bck2SrvCned", "Bck2SrvFail", "MemoryUsed", "HttpSession"}; struct timeval tv; int value; param->fsstat_handle = FS_create_handle(); FS_set_para(param->fsstat_handle, OUTPUT_DEVICE, param->fsstat_filepath, strlen(param->fsstat_filepath)+1); if(param->fsstat_print_mode == 1) { FS_set_para(param->fsstat_handle, PRINT_MODE, ¶m->fsstat_print_mode, sizeof(param->fsstat_print_mode)); } else { FS_set_para(param->fsstat_handle, PRINT_MODE, ¶m->fsstat_print_mode, sizeof(param->fsstat_print_mode)); value = 1; FS_set_para(param->fsstat_handle, FLUSH_BY_DATE, &value, sizeof(value)); } value = param->fsstat_period; FS_set_para(param->fsstat_handle, STAT_CYCLE, &value, sizeof(value)); value = 0; FS_set_para(param->fsstat_handle, CREATE_THREAD, &value, sizeof(value)); FS_set_para(param->fsstat_handle, APP_NAME, param->fsstat_appname, strlen(param->fsstat_appname)+1); FS_set_para(param->fsstat_handle, STATS_SERVER_IP, param->fsstat_dst_ip, strlen(param->fsstat_dst_ip)+1); FS_set_para(param->fsstat_handle, STATS_SERVER_PORT, ¶m->fsstat_dst_port, sizeof(param->fsstat_dst_port)); for(int i=0; ifsstat_field[i] = FS_register(param->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]); } for(int i=0; ifsstat_status[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]); } FS_start(param->fsstat_handle); evtimer_assign(¶m->fs_timer_output, evbase, doris_client_fs_output_timer_cb, param); tv.tv_sec = param->fsstat_period; tv.tv_usec = 0; evtimer_add(¶m->fs_timer_output, &tv); return 0; } u_int32_t doris_csum_param_get_refernces(struct doris_csum_param *param) { pthread_mutex_lock(¶m->mutex_lock); u_int32_t references = param->references; pthread_mutex_unlock(¶m->mutex_lock); return references; } void doris_csum_parameter_destroy(struct doris_csum_param *param) { evtimer_del(¶m->fs_timer_output); FS_stop(¶m->fsstat_handle); doris_http_parameter_destroy(param->param_master); if(param->param_backup1 != NULL) { doris_http_parameter_destroy(param->param_backup1); } if(param->param_backup2 != NULL) { doris_http_parameter_destroy(param->param_backup2); } free(param); } struct doris_csum_param *doris_csum_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog) { struct doris_csum_param *param; param = (struct doris_csum_param *)calloc(1, sizeof(struct doris_csum_param)); param->manage_evbase = manage_evbase; MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fail_retry_interval", ¶m->retry_interval, 10); MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fragmet_size", ¶m->fetch_frag_size, 5242880); MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_confile_max_tries", ¶m->fetch_max_tries, 3); MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "master_slave_sync_on", ¶m->client_sync_on, 0); MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_appname", param->fsstat_appname, 16, "DrsCsmClient"); MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_filepath", param->fsstat_filepath, 256, "./log/doris_client_csm.fs"); MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fsstat_log_interval", ¶m->fsstat_period, 10); MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_print_mode", ¶m->fsstat_print_mode, 1); MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_dst_ip", param->fsstat_dst_ip, 64, "127.0.0.1"); MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_dst_port", ¶m->fsstat_dst_port, 8125); param->param_master = doris_http_parameter_new(confile, "DORIS_CLIENT.master_server", manage_evbase, runtimelog); if(param->param_master == NULL) { return NULL; } if(doris_client_register_field_stat(param, runtimelog, manage_evbase)) { return NULL; } param->param_backup1 = doris_http_parameter_new(confile, "DORIS_CLIENT.backup1_server", manage_evbase, runtimelog); param->param_backup2 = doris_http_parameter_new(confile, "DORIS_CLIENT.backup2_server", manage_evbase, runtimelog); pthread_mutex_init(¶m->mutex_lock, NULL); return param; } static void doris_instance_statistic_timer_cb(int fd, short kind, void *userp) { struct doris_csum_instance *instance = (struct doris_csum_instance *)userp; struct timeval tv; struct doris_csum_statistics incr_statistic; long long *plast_statistic = (long long*)&instance->statistic_last; long long *pnow_statistic = (long long*)&instance->statistic; long long *pinc_statistic = (long long*)&incr_statistic; long long http_sessions=0; http_sessions += caculate_http_sessions_sum(instance->httpins_master); http_sessions += caculate_http_sessions_sum(instance->httpins_backup1); http_sessions += caculate_http_sessions_sum(instance->httpins_backup2); instance->statistic.status[DRS_FS_STAT_HTTP_SESSIONS] = http_sessions; for(u_int32_t i=0; istatistic_last = instance->statistic; for(u_int32_t i=0; iparam->fsstat_handle, instance->param->fsstat_field[i], 0, FS_OP_ADD, incr_statistic.field[i]); } for(u_int32_t i=0; iparam->fsstat_handle, instance->param->fsstat_status[i], 0, FS_OP_ADD, incr_statistic.status[i]); } tv.tv_sec = instance->param->fsstat_period; tv.tv_usec = 0; event_add(&instance->timer_statistic, &tv); } struct doris_csum_param *doris_csum_instance_get_param(struct doris_csum_instance *instance) { return instance->param; } void doris_csum_instance_destroy(struct doris_csum_instance *instance) { pthread_mutex_lock(&instance->param->mutex_lock); instance->param->references--; pthread_mutex_unlock(&instance->param->mutex_lock); evtimer_del(&instance->timer_fetch); evtimer_del(&instance->timer_statistic); /*doris_http_instance_destroy(instance->httpins_master); if(instance->httpins_backup1 != NULL) { doris_http_instance_destroy(instance->httpins_backup1); } if(instance->httpins_backup2 != NULL) { doris_http_instance_destroy(instance->httpins_backup2); }*/ free(instance); } struct doris_csum_instance *doris_csum_instance_new(struct doris_csum_param *param, struct event_base *worker_evbase, struct doris_callbacks *cbs, struct doris_arguments *args, void *runtimelog) { struct doris_csum_instance *instance; struct timeval tv; instance = (struct doris_csum_instance *)calloc(1, sizeof(struct doris_csum_instance)); instance->param = param; instance->worker_evbase = worker_evbase; instance->runtime_log = runtimelog; instance->cbs = *cbs; instance->args= *args; instance->cur_version = args->current_version; instance->req_version = instance->cur_version + 1; //TODO instance->httpins_master = doris_http_instance_new(param->param_master, worker_evbase, runtimelog); if(instance->httpins_master == NULL) { return NULL; } srand((int64_t)param); if(param->param_backup1 != NULL) { instance->httpins_backup1 = doris_http_instance_new(param->param_backup1, worker_evbase, runtimelog); } if(param->param_backup2 != NULL) { instance->httpins_backup2 = doris_http_instance_new(param->param_backup2, worker_evbase, runtimelog); } evtimer_assign(&instance->ctx.timer_expires, worker_evbase, instance_meta_expire_timer_cb, instance); pthread_mutex_lock(¶m->mutex_lock); param->references++; pthread_mutex_unlock(¶m->mutex_lock); evtimer_assign(&instance->timer_statistic, worker_evbase, doris_instance_statistic_timer_cb, instance); tv.tv_sec = param->fsstat_period; tv.tv_usec = 0; evtimer_add(&instance->timer_statistic, &tv); evtimer_assign(&instance->timer_fetch, worker_evbase, instance_fetch_cfg_timer_cb, instance); tv.tv_sec = 3; tv.tv_usec = 0; evtimer_add(&instance->timer_fetch, &tv); return instance; }