This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
doris-doris-dispatch/client/doris_client_produce.cpp

519 lines
17 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <sys/time.h>
#include <time.h>
#include <string.h>
#include <openssl/md5.h>
#include <cjson/cJSON.h>
#include <MESA/MESA_prof_load.h>
#include "doris_client_produce.h"
void doris_prod_upload_ctx_destroy(struct doris_upload_ctx *ctx)
{
doris_http_ctx_destroy(ctx->httpctx);
free(ctx);
}
static enum PROD_VEROP_RES version_common_result_assign_val(CURLcode res, long res_code)
{
if(res != CURLE_OK)
{
return VERSIONOP_CURL_ERROR;
}
switch(res_code)
{
case 200: return VERSIONOP_RES_OK;
case 201: return VERSIONOP_RES_OK; //文件分段重复上传才会返回201
default: return VERSIONOP_RES_ERROR;
}
}
void version_cancel_transfer_done_cb(CURLcode res, long res_code, const char *err, void *userp)
{
struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp;
enum PROD_VEROP_RES result;
ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] -= 1;
result = version_common_result_assign_val(res, res_code);
if(result != VERSIONOP_RES_OK)
{
MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_FATAL, "business: %s, version cancel sync failed, res_code: %ld, err: %s", ctx->business, res_code, err);
}
else
{
MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "business: %s, version cancel sync succ, res_code: %ld", ctx->business, res_code);
}
if(ctx->vercancel_cb != NULL)
{
ctx->vercancel_cb(result, ctx->userdata);
}
}
int32_t doris_prod_version_cancel(struct doris_upload_ctx *ctx, void (*vercancel_cb)(enum PROD_VEROP_RES result, void *userdata), void *userdata)
{
struct doris_http_callback cb;
char uri[256];
ctx->instance->statistic.field[DRS_FSPRD_FILED_VERCANCEL] += 1;
ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] += 1;
ctx->vercancel_cb = vercancel_cb;
ctx->userdata = userdata;
cb.userp = ctx;
cb.header_cb = NULL;
cb.write_cb = NULL;
cb.read_process_cb = NULL;
cb.transfer_done_cb = version_cancel_transfer_done_cb;
doris_http_ctx_reset(ctx->httpctx, &cb);
if(ctx->instance->param->client_sync_on)
{
doris_http_ctx_add_header(ctx->httpctx, "X-Doris-Master-Slave-Sync: 1");
}
snprintf(uri, sizeof(uri), "version/cancel?token=%s", ctx->token);
return doris_http_launch_post_request(ctx->httpctx, uri, NULL, 0);
}
void version_end_header_cb(const char *start, size_t bytes, CURLcode code, long res_code, void *userp)
{
struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp;
const char *pos_colon;
char buffer[64];
int datalen;
if((pos_colon=(const char*)memchr(start, ':', bytes)) == NULL)
{
return ;
}
datalen = pos_colon - start;
switch(datalen)
{
case 13:
if(!strncasecmp(start, "X-Set-Version:", 14))
{
memcpy(buffer, start+14, bytes-14);
buffer[bytes-14] = '\0';
ctx->res_version = atol(buffer);
}
break;
default: break;
}
}
void version_end_transfer_done_cb(CURLcode res, long res_code, const char *err, void *userp)
{
struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp;
enum PROD_VEROP_RES result;
ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] -= 1;
result = version_common_result_assign_val(res, res_code);
if(result != VERSIONOP_RES_OK)
{
MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_FATAL, "business: %s, version end sync failed, res_code: %ld, err: %s", ctx->business, res_code, err);
}
else
{
assert(ctx->res_version != 0);
MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_INFO, "business: %s, version end sync succ, res_code: %ld, new version: %lu", ctx->business, res_code, ctx->res_version);
}
if(ctx->verend_cb != NULL)
{
ctx->verend_cb(result, ctx->res_version, ctx->userdata);
}
}
int32_t doris_prod_version_end(struct doris_upload_ctx *ctx,
void (*verend_cb)(enum PROD_VEROP_RES result, int64_t version, void *userdata), void *userdata)
{
struct doris_http_callback cb;
char uri[256];
ctx->instance->statistic.field[DRS_FSPRD_FILED_VEREND] += 1;
ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] += 1;
ctx->verend_cb = verend_cb;
ctx->userdata = userdata;
cb.userp = ctx;
cb.header_cb = version_end_header_cb;
cb.write_cb = NULL;
cb.read_process_cb = NULL;
cb.transfer_done_cb = version_end_transfer_done_cb;
doris_http_ctx_reset(ctx->httpctx, &cb);
if(ctx->instance->param->client_sync_on)
{
doris_http_ctx_add_header(ctx->httpctx, "X-Doris-Master-Slave-Sync: 1");
}
snprintf(uri, sizeof(uri), "version/finish?token=%s", ctx->token);
return doris_http_launch_post_request(ctx->httpctx, uri, NULL, 0);
}
void upload_frag_transfer_done_cb(CURLcode res, long res_code, const char *err, void *userp)
{
struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp;
enum PROD_VEROP_RES result;
ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] -= 1;
result = version_common_result_assign_val(res, res_code);
if(result != VERSIONOP_RES_OK)
{
MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_FATAL, "business: %s, upload frag sync failed, res_code: %ld, err: %s", ctx->business, res_code, err);
}
else
{
MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "business: %s, upload frag sync succ, filename: %s, offset: %lu", ctx->business, ctx->filename, ctx->req_offset);
}
if(ctx->upfrag_cb != NULL)
{
ctx->upfrag_cb(result, ctx->userdata);
}
}
int32_t do_doris_prod_upload_with_cb(struct doris_upload_ctx *ctx, char *data, size_t size,
struct table_meta *meta, const char *uri)
{
struct doris_http_callback cb;
ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] += 1;
cb.userp = ctx;
cb.header_cb = NULL;
cb.write_cb = NULL;
cb.read_process_cb = NULL;
cb.transfer_done_cb = upload_frag_transfer_done_cb;
doris_http_ctx_reset(ctx->httpctx, &cb);
if(ctx->instance->param->client_sync_on)
{
doris_http_ctx_add_header(ctx->httpctx, "X-Doris-Master-Slave-Sync: 1");
}
if(meta->userregion != NULL)
{
doris_http_ctx_add_header_kvstr(ctx->httpctx, "X-User-Info", meta->userregion);
}
doris_http_ctx_add_header_kvint(ctx->httpctx, "X-Config-Num", meta->cfgnum);
doris_http_ctx_add_header_kvstr(ctx->httpctx, "Content-MD5", meta->md5);
return doris_http_launch_put_request_data(ctx->httpctx, uri, data, size);
}
int32_t doris_prod_upload_frag_with_cb(struct doris_upload_ctx *ctx, char *data, size_t size, size_t offset,
bool last, struct table_meta *meta, void (*upfrag_cb)(enum PROD_VEROP_RES result, void *userdata), void *userdata)
{
char uri[256];
ctx->instance->statistic.field[DRS_FSPRD_FILED_FILEFRAG] += 1;
ctx->upfrag_cb = upfrag_cb;
ctx->userdata = userdata;
ctx->req_offset = offset;
snprintf(ctx->filename, 256, "%s", ctx->filename);
if(last)
{
snprintf(uri, sizeof(uri), "filefrag/upload?token=%s&tablename=%s&filename=%s&offset=%lu&last=true",
ctx->token, meta->tablename, meta->filename, offset);
}
else
{
snprintf(uri, sizeof(uri), "filefrag/upload?token=%s&tablename=%s&filename=%s&offset=%lu",
ctx->token, meta->tablename, meta->filename, offset);
}
return do_doris_prod_upload_with_cb(ctx, data, size, meta, uri);
}
int32_t doris_prod_upload_once_with_cb(struct doris_upload_ctx *ctx, char *data, size_t size,
struct table_meta *meta, void (*upfrag_cb)(enum PROD_VEROP_RES result, void *userdata), void *userdata)
{
char uri[256];
ctx->instance->statistic.field[DRS_FSPRD_FILED_FILEONCE] += 1;
ctx->upfrag_cb = upfrag_cb;
ctx->userdata = userdata;
ctx->req_offset = 0;
snprintf(ctx->filename, 256, "%s", ctx->filename);
snprintf(uri, sizeof(uri), "fileonce/upload?token=%s&tablename=%s&filename=%s",
ctx->token, meta->tablename, meta->filename);
return do_doris_prod_upload_with_cb(ctx, data, size, meta, uri);
}
void verstart_body_write_cb(const char *ptr, size_t bytes, CURLcode code, long res_code, void *userp)
{
struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp;
easy_string_savedata(&ctx->estr, (const char*)ptr, bytes);
}
static enum PROD_VERSTART_RES version_start_result_assign_val(CURLcode res, long res_code)
{
if(res != CURLE_OK)
{
return VERSTART_CURL_ERROR;
}
switch(res_code)
{
case 200: return VERSTART_RES_OK;
case 300: return VERSTART_RES_BUSY;
default: return VERSTART_RES_ERROR;
}
}
void verstart_transfer_done_cb(CURLcode res, long res_code, const char *err, void *userp)
{
struct doris_upload_ctx *ctx=(struct doris_upload_ctx *)userp;
cJSON *meta, *token;
enum PROD_VERSTART_RES result;
ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] -= 1;
result = version_start_result_assign_val(res, res_code);
switch(result)
{
case VERSTART_RES_OK:
case VERSTART_RES_BUSY: //server返回300必有token而且只有自己一个请求有效了(自己之前宕了恢复)
meta = cJSON_Parse(ctx->estr.buff);
token = cJSON_GetObjectItem(meta, "token");
assert(token->valuestring != NULL);
snprintf(ctx->token, 64, "%s", token->valuestring);
cJSON_Delete(meta);
MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_DEBUG, "business: %s, version start sync %s, res_code: %ld, body: %s",
(result==VERSTART_RES_OK)?"succ":"busy", ctx->business, res_code, ctx->estr.buff);
break;
case VERSTART_RES_ERROR:
case VERSTART_CURL_ERROR:
MESA_HANDLE_RUNTIME_LOGV2(ctx->instance->runtime_log, RLOG_LV_FATAL, "business: %s, version start sync failed, res_code: %ld, err: %s",
ctx->business, res_code, err);
break;
default: assert(0);break;
}
if(ctx->verstart_cb != NULL)
{
ctx->verstart_cb(result, ctx->estr.buff, ctx->userdata);
}
easy_string_destroy(&ctx->estr);
}
struct doris_upload_ctx *doris_prod_upload_ctx_new(struct doris_prod_instance *instance,const char *business, int32_t cfgtype)
{
struct doris_upload_ctx *ctx;
struct doris_http_callback cb;
if(cfgtype!=1 && cfgtype!=2)
{
return NULL;
}
ctx = (struct doris_upload_ctx *)calloc(1, sizeof(struct doris_upload_ctx));
snprintf(ctx->business, 32, "%s", business);
ctx->instance = instance;
ctx->cfg_type = cfgtype;
cb.userp = ctx;
cb.header_cb = NULL;
cb.write_cb = verstart_body_write_cb;
cb.read_process_cb = NULL;
cb.transfer_done_cb = verstart_transfer_done_cb;
if(NULL == (ctx->httpctx = doris_http_ctx_new(instance->http_instance, &cb, rand(), NULL, 0)))
{
free(ctx);
return NULL;
}
return ctx;
}
int32_t doris_prod_version_start_with_cb(struct doris_upload_ctx *ctx,
void (*verstart_cb)(enum PROD_VERSTART_RES result, const char *body, void *userdata), void *userdata)
{
char uri[256];
ctx->userdata = userdata;
ctx->verstart_cb = verstart_cb;
if(ctx->instance->param->client_sync_on)
{
doris_http_ctx_add_header(ctx->httpctx, "X-Doris-Master-Slave-Sync: 1");
}
snprintf(uri, sizeof(uri), "version/start?business=%s&type=%d", ctx->business, ctx->cfg_type);
if(doris_http_launch_post_request(ctx->httpctx, uri, NULL, 0))
{
free(ctx);
return -1;
}
ctx->instance->statistic.field[DRS_FSPRD_FILED_VERSTART] += 1;
ctx->instance->statistic.status[DRS_FSPRD_STAT_REQ_SESSIONS] += 1;
return 0;
}
int32_t doris_prod_version_start(struct doris_upload_ctx *ctx)
{
return doris_prod_version_start_with_cb(ctx, NULL, NULL);
}
static void doris_prod_fsoutput_timer_cb(int fd, short kind, void *userp)
{
struct doris_prod_param *param=(struct doris_prod_param *)userp;
struct timeval tv;
FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FSPRD_STAT_CNNED_SERVERS], 0, FS_OP_SET, param->param->connected_hosts);
FS_operate(param->fsstat_handle, param->fsstat_status[DRS_FSPRD_STAT_FAILED_SERVERS], 0, FS_OP_SET, param->param->failed_hosts);
FS_passive_output(param->fsstat_handle);
tv.tv_sec = param->fsstat_period;
tv.tv_usec = 0;
evtimer_add(&param->fs_timer_output, &tv);
}
static int doris_prod_register_fsstat(struct doris_prod_param *param, void *runtime_log, struct event_base *evbase)
{
const char *field_names[FSSTAT_DORIS_PRD_FILED_NUM]={"VerStart", "VerEnd", "VerCancel", "FileFrag", "FileOnce"};
const char *status_names[FSSTAT_DORIS_PRD_STATUS_NUM]={"ServerCnned", "ServerFail", "MemoryUsed", "HttpSession", "ReqSession"};
struct timeval tv;
int value;
param->fsstat_handle = FS_create_handle();
FS_set_para(param->fsstat_handle, OUTPUT_DEVICE, param->fsstat_filepath, strlen(param->fsstat_filepath)+1);
if(param->fsstat_print_mode == 1)
{
FS_set_para(param->fsstat_handle, PRINT_MODE, &param->fsstat_print_mode, sizeof(param->fsstat_print_mode));
}
else
{
FS_set_para(param->fsstat_handle, PRINT_MODE, &param->fsstat_print_mode, sizeof(param->fsstat_print_mode));
value = 1;
FS_set_para(param->fsstat_handle, FLUSH_BY_DATE, &value, sizeof(value));
}
value = param->fsstat_period;
FS_set_para(param->fsstat_handle, STAT_CYCLE, &value, sizeof(value));
value = 0;
FS_set_para(param->fsstat_handle, CREATE_THREAD, &value, sizeof(value));
FS_set_para(param->fsstat_handle, APP_NAME, param->fsstat_appname, strlen(param->fsstat_appname)+1);
FS_set_para(param->fsstat_handle, STATS_SERVER_IP, param->fsstat_dst_ip, strlen(param->fsstat_dst_ip)+1);
FS_set_para(param->fsstat_handle, STATS_SERVER_PORT, &param->fsstat_dst_port, sizeof(param->fsstat_dst_port));
for(int i=0; i<FSSTAT_DORIS_PRD_FILED_NUM; i++)
{
param->fsstat_field[i] = FS_register(param->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]);
}
for(int i=0; i<FSSTAT_DORIS_PRD_STATUS_NUM; i++)
{
param->fsstat_status[i] = FS_register(param->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]);
}
FS_start(param->fsstat_handle);
evtimer_assign(&param->fs_timer_output, evbase, doris_prod_fsoutput_timer_cb, param);
tv.tv_sec = param->fsstat_period;
tv.tv_usec = 0;
evtimer_add(&param->fs_timer_output, &tv);
return 0;
}
struct doris_prod_param *doris_prod_parameter_new(const char *confile, struct event_base *manage_evbase, void *runtimelog)
{
struct doris_prod_param *param;
param = (struct doris_prod_param *)calloc(1, sizeof(struct doris_prod_param));
param->manage_evbase = manage_evbase;
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "upload_fragmet_size", &param->upload_frag_size, 5242880);
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "master_slave_sync_on", &param->client_sync_on, 0);
MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_appname", param->fsstat_appname, 16, "DrsPrdClient");
MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_filepath_p", param->fsstat_filepath, 256, "./log/doris_client_prd.fs");
MESA_load_profile_uint_def(confile, "DORIS_CLIENT", "fsstat_log_interval", &param->fsstat_period, 10);
MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_print_mode", &param->fsstat_print_mode, 1);
MESA_load_profile_string_def(confile, "DORIS_CLIENT", "fsstat_log_dst_ip", param->fsstat_dst_ip, 64, "127.0.0.1");
MESA_load_profile_int_def(confile, "DORIS_CLIENT", "fsstat_log_dst_port", &param->fsstat_dst_port, 8125);
/*同步时只有双机备份*/
param->param = doris_http_parameter_new(confile, "DORIS_CLIENT.produce", manage_evbase, runtimelog);
if(param->param == NULL)
{
return NULL;
}
assert(param->param->ipgroup.dstaddr_num == 1);
if(doris_prod_register_fsstat(param, runtimelog, manage_evbase))
{
return NULL;
}
return param;
}
static void doris_prod_statistic_timer_cb(int fd, short kind, void *userp)
{
struct doris_prod_instance *instance = (struct doris_prod_instance *)userp;
struct timeval tv;
struct doris_prod_statistics incr_statistic;
long long *plast_statistic = (long long*)&instance->statistic_last;
long long *pnow_statistic = (long long*)&instance->statistic;
long long *pinc_statistic = (long long*)&incr_statistic;
instance->statistic.status[DRS_FSPRD_STAT_HTTP_SESSIONS] = caculate_http_sessions_sum(instance->http_instance);
for(u_int32_t i=0; i<sizeof(struct doris_prod_statistics)/sizeof(long long); i++)
{
pinc_statistic[i] = pnow_statistic[i] - plast_statistic[i];
}
instance->statistic_last = instance->statistic;
for(u_int32_t i=0; i<FSSTAT_DORIS_PRD_FILED_NUM; i++)
{
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_field[i], 0, FS_OP_ADD, incr_statistic.field[i]);
}
for(u_int32_t i=0; i<FSSTAT_DORIS_PRD_STATUS_NUM; i++)
{
FS_operate(instance->param->fsstat_handle, instance->param->fsstat_status[i], 0, FS_OP_ADD, incr_statistic.status[i]);
}
tv.tv_sec = instance->param->fsstat_period;
tv.tv_usec = 0;
event_add(&instance->timer_statistic, &tv);
}
struct doris_prod_instance *doris_prod_instance_new(struct doris_prod_param *param, struct event_base *worker_evbase, void *runtimelog)
{
struct doris_prod_instance *instance;
struct timeval tv;
instance = (struct doris_prod_instance *)calloc(1, sizeof(struct doris_prod_instance));
instance->param = param;
instance->worker_evbase = worker_evbase;
instance->runtime_log = runtimelog;
instance->http_instance = doris_http_instance_new(param->param, worker_evbase, runtimelog);
if(instance->http_instance == NULL)
{
return NULL;
}
srand((int64_t)param);
evtimer_assign(&instance->timer_statistic, worker_evbase, doris_prod_statistic_timer_cb, instance);
tv.tv_sec = param->fsstat_period;
tv.tv_usec = 0;
evtimer_add(&instance->timer_statistic, &tv);
return instance;
}