#include #include #include #include #include #include #include #include #include #include #include #include #include #include "doris_client_fetch.h" void easy_string_destroy(struct easy_string *estr) { if(estr->buff != NULL) { free(estr->buff); estr->buff = NULL; estr->len = estr->size = 0; } } void easy_string_savedata(struct easy_string *estr, const char *data, size_t len) { if(estr->size-estr->len < len+1) { estr->size += len*4+1; estr->buff = (char*)realloc(estr->buff, estr->size); } memcpy(estr->buff+estr->len, data, len); estr->len += len; estr->buff[estr->len]='\0'; } void doris_confile_ctx_reset(struct doris_confile_ctx *ctx) { struct doris_http_ctx *httpctx=ctx->httpctx; memset(ctx, 0, sizeof(struct doris_confile_ctx)); ctx->httpctx = httpctx; } void doris_confile_ctx_destry(struct doris_confile_ctx *ctx) { doris_confile_ctx_reset(ctx); doris_http_ctx_destroy(ctx->httpctx); ctx->httpctx = NULL; } void doris_update_new_version(struct doris_instance *instance) { instance->cur_version = instance->new_version; instance->new_version += 1; } void doris_request_restart_timer(struct doris_instance *instance, time_t wait_s) { struct timeval tv; tv.tv_sec = wait_s; tv.tv_usec = 0; event_add(&instance->timer_fetch, &tv); } void doris_fetch_next_confile_meta(struct doris_instance *instance) { cJSON *cur_a_item, *sub; memset(&instance->curmeta, 0, sizeof(struct fetch_file_meta)); cur_a_item = cJSON_GetArrayItem(instance->array, instance->array_index); instance->array_index++; sub = cJSON_GetObjectItem(cur_a_item, "tablename"); instance->curmeta.table_name = sub->valuestring; sub = cJSON_GetObjectItem(cur_a_item, "size"); instance->curmeta.size = sub->valuedouble; sub = cJSON_GetObjectItem(cur_a_item, "cfg_num"); instance->curmeta.cfg_num = sub->valueint; } void doris_http_confile_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp) { struct doris_instance *instance = (struct doris_instance *)userp; const char *pos_colon; size_t datalen; char buffer[64]; int ret; if(instance->ctx.res_code == 0) //check code only once { instance->ctx.res_code = res_code; assert(res_code != 0); if(res_code != 200 && res_code!=206) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "Fetch confile %s failed, req_version=%lu, curlcode = %d", instance->curmeta.table_name, instance->new_version, code); return; } instance->retry_times = 0; if(instance->curmeta.curoffset == 0) { instance->param->cbs.cfgfile_start(instance, instance->curmeta.table_name, instance->curmeta.size, instance->curmeta.cfg_num, instance->param->cbs.userdata); } } if((pos_colon=(const char*)memchr(start, ':', bytes)) == NULL) { return ; } datalen = pos_colon - start; switch(datalen) { case 14: if(!strncasecmp(start, "Content-Length:", 15)) { memcpy(buffer, start+15, bytes-15); buffer[bytes-15] = '\0'; instance->ctx.contlength = atol(buffer); } break; case 13: if(!strncasecmp(start, "Content-Range:", 14)) { memcpy(buffer, start+13, bytes-13); buffer[bytes-13] = '\0'; ret = sscanf(buffer, "%*[^0-9]%lu-%lu/%lu", &instance->ctx.contl_start, &instance->ctx.contl_end, &instance->ctx.contl_total); assert(ret == 3 && instance->ctx.contl_total == instance->curmeta.size && instance->ctx.contl_start==instance->curmeta.curoffset); } break; default: break; } } void doris_http_confile_body_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp) { struct doris_instance *instance = (struct doris_instance *)userp; if(code!=CURLE_OK || (instance->ctx.res_code!=200 && instance->ctx.res_code!=206) || (res_code!=200 && res_code!=206)) { return; } instance->param->cbs.cfgfile_update(instance, ptr, bytes, instance->param->cbs.userdata); instance->curmeta.curoffset += bytes; instance->statistic.field[DRS_FS_FILED_RES_BYTES] += bytes; } void doris_http_fetch_confile(struct doris_instance *instance); void doris_http_confile_done_cb(CURLcode res, long res_code, const char *err, void *userp) { struct doris_instance *instance = (struct doris_instance *)userp; if(instance->ctx.res_code != 200 && instance->ctx.res_code!=206) { goto out_error; } if(res!=CURLE_OK || (res_code!=200 && res_code!=206)) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Fetch confile %s failed, req_version=%lu, curlcode = %d, error: %s", instance->curmeta.table_name, instance->new_version, res_code, err); goto out_error; } if(instance->ctx.contl_total != 0) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s success, req_version=%lu, Content-Range: %lu-%lu/%lu", instance->curmeta.table_name, instance->new_version, instance->ctx.contl_start, instance->ctx.contl_end, instance->ctx.contl_total); } else { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Fetch confile %s success, req_version=%lu, Content-Length: %lu/%lu", instance->curmeta.table_name, instance->new_version, instance->ctx.contlength, instance->curmeta.size); } instance->statistic.field[DRS_FS_FILED_RES_FRAGS] += 1; if(instance->curmeta.curoffset >= instance->curmeta.size) //该文件下载完毕 { instance->statistic.field[DRS_FS_FILED_RES_FILES] += 1; instance->param->cbs.cfgfile_finish(instance, instance->param->cbs.userdata); if(instance->array_index == instance->array_size) { instance->param->cbs.version_finish(instance, instance->param->cbs.userdata); instance->status = FETCH_STATUS_META; doris_update_new_version(instance); cJSON_Delete(instance->meta); doris_confile_ctx_destry(&instance->ctx); doris_request_restart_timer(instance, 0); } else { doris_fetch_next_confile_meta(instance); doris_http_fetch_confile(instance); } } else { doris_http_fetch_confile(instance); } return; out_error: instance->statistic.field[DRS_FS_FILED_RES_FRAGERR] += 1; if(instance->ctx.res_code == 404) //404应答的重新开始请求 { instance->retry_times = instance->param->fetch_max_tries; } else { instance->retry_times++; } if(instance->retry_times >= instance->param->fetch_max_tries) { instance->statistic.field[DRS_FS_FILED_RES_VERERR] += 1; instance->param->cbs.version_error(instance, instance->param->cbs.userdata); instance->retry_times = 0; instance->status = FETCH_STATUS_META; cJSON_Delete(instance->meta); doris_confile_ctx_destry(&instance->ctx); } doris_request_restart_timer(instance, instance->param->retry_interval); } void doris_http_fetch_confile(struct doris_instance *instance) { struct doris_http_callback curlcbs; char metauri[128], range[64]={0}; curlcbs.header_cb = doris_http_confile_header_cb; curlcbs.write_cb = doris_http_confile_body_cb; curlcbs.transfer_done_cb = doris_http_confile_done_cb; curlcbs.userp = instance; doris_confile_ctx_reset(&instance->ctx); doris_http_ctx_reset(instance->ctx.httpctx, &curlcbs); //超大文件分段下载;上次未完成的继续下载 if((instance->curmeta.size > instance->param->fetch_frag_size) || instance->curmeta.curoffset!=0) { sprintf(range, "Range: bytes=%lu-%lu", instance->curmeta.curoffset, instance->curmeta.curoffset + instance->param->fetch_frag_size - 1); doris_http_ctx_add_header(instance->ctx.httpctx, range); } snprintf(metauri, 128, "configfile?tablename=%s&version=%lu&businessid=%u", instance->curmeta.table_name, instance->new_version, instance->param->args.businessid); if(doris_http_launch_get_request(instance->ctx.httpctx, metauri)) { instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; doris_request_restart_timer(instance, instance->param->retry_interval); } else { instance->statistic.field[DRS_FS_FILED_REQ_FILES] += 1; MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "Launch confile %s GET, req_version=%lu, %s", instance->curmeta.table_name, instance->new_version, range); } } void doris_http_meta_header_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp) { struct doris_instance *instance = (struct doris_instance *)userp; //check code only once if(instance->ctx.res_code != 0) { return; } instance->ctx.res_code = res_code; assert(res_code != 0); if(res_code != 200) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_DEBUG, "No new meta found, cur_version=%lu, req_version=%lu, curlcode = %d", instance->cur_version, instance->new_version, code); } } void doris_http_meta_body_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp) { struct doris_instance *instance = (struct doris_instance *)userp; if(code!=CURLE_OK || res_code!=200) { return; } easy_string_savedata(&instance->estr, (const char*)ptr, bytes); } void doris_http_meta_done_cb(CURLcode res, long res_code, const char *err, void *userp) { struct doris_instance *instance = (struct doris_instance *)userp; if(instance->ctx.res_code != 200) { goto out_error; } if(res!=CURLE_OK || res_code!=200) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "No new meta found, cur_version=%lu, req_version=%lu, curlcode = %d, error: %s", instance->cur_version, instance->new_version, res_code, err); goto out_error; } instance->meta = cJSON_Parse(instance->estr.buff); if(instance->meta == NULL) { MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Parse meta failed, req_version=%lu, invalid json: %s", instance->new_version, instance->estr.buff); goto out_error; } instance->statistic.field[DRS_FS_FILED_RES_META] += 1; MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_INFO, "NEW_META found, cur_version=%lu, newjson: %s", instance->cur_version, instance->estr.buff); instance->param->cbs.version_start(instance, instance->meta, instance->param->cbs.userdata); instance->array = cJSON_GetObjectItem(instance->meta, "configs"); instance->array_size = cJSON_GetArraySize(instance->array); assert(instance->array_size > 0); easy_string_destroy(&instance->estr); instance->status = FETCH_STATUS_FILE; doris_fetch_next_confile_meta(instance); doris_http_fetch_confile(instance); return; out_error: instance->statistic.field[DRS_FS_FILED_RES_NOMETA] += 1; doris_request_restart_timer(instance, instance->param->retry_interval); easy_string_destroy(&instance->estr); doris_confile_ctx_destry(&instance->ctx); } static void doris_http_fetch_meta(struct doris_instance *instance) { u_int64_t balance_seed; struct doris_http_callback curlcbs; char metauri[128]; balance_seed = (((u_int64_t)rand()) << 32) | rand(); memset(&curlcbs, 0, sizeof(struct doris_http_callback)); curlcbs.header_cb = doris_http_meta_header_cb; curlcbs.write_cb = doris_http_meta_body_cb; curlcbs.transfer_done_cb = doris_http_meta_done_cb; curlcbs.userp = instance; instance->array_index = 0; instance->cur_httpins = instance->httpins_master; instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed); if(instance->ctx.httpctx==NULL && instance->httpins_backup1!=NULL) { instance->cur_httpins = instance->httpins_backup1; instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed); } if(instance->ctx.httpctx==NULL && instance->httpins_backup2!=NULL) { instance->cur_httpins = instance->httpins_backup2; instance->ctx.httpctx = doris_http_ctx_new(instance->cur_httpins, &curlcbs, balance_seed); } if(instance->ctx.httpctx != NULL) { snprintf(metauri, 128, "configmeta?version=%lu&businessid=%u", instance->new_version, instance->param->args.businessid); if(!doris_http_launch_get_request(instance->ctx.httpctx, metauri)) { instance->status = FETCH_STATUS_META; instance->statistic.field[DRS_FS_FILED_REQ_META] += 1; } else { instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; doris_confile_ctx_destry(&instance->ctx); doris_request_restart_timer(instance, instance->param->retry_interval); } if(instance->cur_httpins == instance->httpins_backup1) instance->statistic.field[DRS_FS_FILED_BACKUP1_REQ] += 1; else if(instance->cur_httpins == instance->httpins_backup2) instance->statistic.field[DRS_FS_FILED_BACKUP2_REQ] += 1; } else { instance->statistic.field[DRS_FS_FILED_REQ_FAIL] += 1; doris_request_restart_timer(instance, instance->param->retry_interval); MESA_HANDLE_RUNTIME_LOGV2(instance->runtime_log, RLOG_LV_FATAL, "Launch meta GET failed: no active host found,req_version=%lu", instance->new_version); } } static void instance_fetch_cfg_timer_cb(int fd, short kind, void *userp) { struct doris_instance *instance = (struct doris_instance *)userp; switch(instance->status) { case FETCH_STATUS_IDLE: case FETCH_STATUS_META: doris_http_fetch_meta(instance); break; case FETCH_STATUS_FILE: doris_http_fetch_confile(instance); break; default: assert(0);break; } } static void doris_client_fs_output_timer_cb(int fd, short kind, void *userp) { struct doris_parameter *param=(struct doris_parameter *)userp; struct timeval tv; FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_MST_CNN_SRV], 0, FS_OP_SET, param->param_master->connected_hosts); FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_MST_FAIL_SRV], 0, FS_OP_SET, param->param_master->failed_hosts); if(param->param_backup1 != NULL) { FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_BCK1_CNN_SRV], 0, FS_OP_SET, param->param_backup1->connected_hosts); FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_BCK1_FAIL_SRV], 0, FS_OP_SET, param->param_backup1->failed_hosts); } if(param->param_backup2 != NULL) { FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_BCK2_CNN_SRV], 0, FS_OP_SET, param->param_backup2->connected_hosts); FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FS_STAT_BCK2_FAIL_SRV], 0, FS_OP_SET, param->param_backup2->failed_hosts); } FS_passive_output(param->fsstat_handle); tv.tv_sec = param->fsstat_period; tv.tv_usec = 0; evtimer_add(¶m->fs_timer_output, &tv); } static int doris_client_register_field_stat(struct doris_parameter *param, void *runtime_log, struct event_base *evbase) { const char *field_names[FSSTAT_DORIS_FILED_NUM]={"ReqFail", "ReqMetas", "ResMetas", "ResNoNew", "ReqFiles", "ResFiles", "ResFrags", "ResFragErr", "ResBytes", "ResVerErr", "ReqBackup1", "ReqBackup2"}; const char *status_names[FSSTAT_DORIS_STATUS_NUM]={"MasSrvCned", "MasSrvFail", "Bck1SrvCned", "Bck1SrvFail", "Bck2SrvCned", "Bck2SrvFail", "MemoryUsed", "HttpSession"}; struct timeval tv; int value; param->fsstat_handle = FS_create_handle(); FS_set_para(param->fsstat_handle, OUTPUT_DEVICE, param->fsstat_filepath, strlen(param->fsstat_filepath)+1); if(param->fsstat_print_mode == 1) { FS_set_para(param->fsstat_handle, PRINT_MODE, ¶m->fsstat_print_mode, sizeof(param->fsstat_print_mode)); } else { FS_set_para(param->fsstat_handle, PRINT_MODE, ¶m->fsstat_print_mode, sizeof(param->fsstat_print_mode)); value = 1; FS_set_para(param->fsstat_handle, FLUSH_BY_DATE, &value, sizeof(value)); } value = param->fsstat_period; FS_set_para(param->fsstat_handle, STAT_CYCLE, &value, sizeof(value)); value = 0; FS_set_para(param->fsstat_handle, CREATE_THREAD, &value, sizeof(value)); FS_set_para(param->fsstat_handle, APP_NAME, param->fsstat_appname, strlen(param->fsstat_appname)+1); FS_set_para(param->fsstat_handle, STATS_SERVER_IP, param->fsstat_dst_ip, strlen(param->fsstat_dst_ip)+1); FS_set_para(param->fsstat_handle, STATS_SERVER_PORT, ¶m->fsstat_dst_port, sizeof(param->fsstat_dst_port)); for(int i=0; ifsstat_field[i] = FS_register(param->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]); } for(int i=0; ifsstat_status[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]); } FS_start(param->fsstat_handle); evtimer_assign(¶m->fs_timer_output, evbase, doris_client_fs_output_timer_cb, param); tv.tv_sec = param->fsstat_period; tv.tv_usec = 0; evtimer_add(¶m->fs_timer_output, &tv); return 0; } struct doris_parameter *doris_parameter_new(const char *confile, struct event_base *manage_evbase, struct doris_callbacks *cbs, struct doris_arguments *args, void *runtimelog) { struct doris_parameter *param; param = (struct doris_parameter *)calloc(1, sizeof(struct doris_parameter)); param->manage_evbase = manage_evbase; param->cbs = *cbs; param->args= *args; MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fail_retry_interval", ¶m->retry_interval, 10); MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_fragmet_size", ¶m->fetch_frag_size, 5242880); MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fetch_confile_max_tries", ¶m->fetch_max_tries, 3); MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_appname", param->fsstat_appname, 16, "DorisClient"); MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_filepath", param->fsstat_filepath, 256, "./log/doris_client.fs"); MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fsstat_log_interval", ¶m->fsstat_period, 10); MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_print_mode", ¶m->fsstat_print_mode, 1); MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_dst_ip", param->fsstat_dst_ip, 64, "127.0.0.1"); MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_dst_port", ¶m->fsstat_dst_port, 8125); param->param_master = doris_http_parameter_new(confile, "DORIS_CLIENT.master_server", manage_evbase, runtimelog); if(param->param_master == NULL) { return NULL; } if(doris_client_register_field_stat(param, runtimelog, manage_evbase)) { return NULL; } param->param_backup1 = doris_http_parameter_new(confile, "DORIS_CLIENT.backup1_server", manage_evbase, runtimelog); param->param_backup2 = doris_http_parameter_new(confile, "DORIS_CLIENT.backup2_server", manage_evbase, runtimelog); return param; } static void doris_instance_statistic_timer_cb(int fd, short kind, void *userp) { struct doris_instance *instance = (struct doris_instance *)userp; struct timeval tv; struct doris_statistics incr_statistic; long long *plast_statistic = (long long*)&instance->statistic_last; long long *pnow_statistic = (long long*)&instance->statistic; long long *pinc_statistic = (long long*)&incr_statistic; long long http_sessions=0; http_sessions += caculate_http_sessions_sum(instance->httpins_master); http_sessions += caculate_http_sessions_sum(instance->httpins_backup1); http_sessions += caculate_http_sessions_sum(instance->httpins_backup2); instance->statistic.field[DRS_FS_STAT_HTTP_SESSIONS] = http_sessions; for(u_int32_t i=0; istatistic_last = instance->statistic; for(u_int32_t i=0; iparam->fsstat_handle, instance->param->fsstat_field[i], 0, FS_OP_ADD, incr_statistic.field[i]); } for(u_int32_t i=0; iparam->fsstat_handle, instance->param->fsstat_status[i], 0, FS_OP_ADD, incr_statistic.status[i]); } tv.tv_sec = instance->param->fsstat_period; tv.tv_usec = 0; event_add(&instance->timer_statistic, &tv); } struct doris_instance *doris_instance_new(struct doris_parameter *param, struct event_base *worker_evbase, void *runtimelog) { struct doris_instance *instance; struct timeval tv; instance = (struct doris_instance *)calloc(1, sizeof(struct doris_instance)); instance->param = param; instance->worker_evbase = worker_evbase; instance->runtime_log = runtimelog; instance->cur_version = param->args.current_version; instance->new_version = instance->cur_version + 1; //TODO instance->httpins_master = doris_http_instance_new(param->param_master, worker_evbase, runtimelog); if(instance->httpins_master == NULL) { return NULL; } srand(time(NULL)); if(param->param_backup1 != NULL) { instance->httpins_backup1 = doris_http_instance_new(param->param_backup1, worker_evbase, runtimelog); } if(param->param_backup2 != NULL) { instance->httpins_backup2 = doris_http_instance_new(param->param_backup2, worker_evbase, runtimelog); } evtimer_assign(&instance->timer_statistic, worker_evbase, doris_instance_statistic_timer_cb, instance); tv.tv_sec = param->fsstat_period; tv.tv_usec = 0; evtimer_add(&instance->timer_statistic, &tv); evtimer_assign(&instance->timer_fetch, worker_evbase, instance_fetch_cfg_timer_cb, instance); tv.tv_sec = 3; tv.tv_usec = 0; evtimer_add(&instance->timer_fetch, &tv); return instance; }