This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
doris-doris-dispatch/server/doris_server_http.cpp

454 lines
16 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <net/if.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#include <sys/prctl.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "doris_server_main.h"
#include "doris_server_http.h"
extern struct doris_global_info g_doris_server_info;
/*
 * Find the oldest stored version node whose version number is >= `version`.
 *
 * handle  - version list; its TAILQ (version_head), its version->node map
 *           (version2node) and latest_version are consulted.
 * version - lowest version number the caller is interested in.
 *
 * Returns the matching node, or NULL when no stored version lies within
 * [version, handle->latest_version].
 */
struct version_list_node *lookup_vernode_accord_version(struct version_list_handle *handle, int64_t version)
{
    struct version_list_node *vernode;
    map<int64_t, struct version_list_node*>::iterator iter;

    /* The first list node carries the smallest version currently held;
     * requests below it are clamped up to that version. */
    vernode = TAILQ_FIRST(&handle->version_head);
    if(vernode!=NULL && version<vernode->version)
    {
        version = vernode->version;
    }

    /* A single ordered lookup replaces probing every integer between
     * `version` and latest_version, which could spin for an enormous number
     * of iterations when the list is empty and the client sends a very
     * small (negative) version. */
    iter = handle->version2node->lower_bound(version);
    if(iter != handle->version2node->end() && iter->first <= handle->latest_version)
    {
        return iter->second;
    }
    return NULL;
}
/*
 * Choose the status code for a meta request that found no newer version.
 *
 * Return value (translated from the original comment):
 *   304 (HTTP_NOTMODIFIED) - the client has reached the latest version and
 *        configs are in sync; the consumer can be destroyed and config
 *        pushing can be prepared.
 *   300 - configs are not yet in sync, or the client's config version is
 *        newer than ours.
 *
 * business    - business whose sync state is inspected/updated.
 * req         - request; only its "X-Doris-Sync-Current-Version" header is read.
 * cur_version - version this server currently holds for the business.
 */
static int32_t check_producer_ready_sync(struct doris_business *business, struct evhttp_request *req, int64_t cur_version)
{
    const char *client_version;
    int64_t clientversion;

    /* Requests without the sync header always get 304. */
    if(NULL == (client_version=evhttp_find_header(evhttp_request_get_input_headers(req), "X-Doris-Sync-Current-Version")))
    {
        return HTTP_NOTMODIFIED;
    }
    /*request from sync client, check http posts-on-the-way first*/
    if(business->posts_on_the_way)
    {
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_DEBUG, "HttpProducer, business: %s, posts-on-the-way: %d, meta response 300", business->bizname, business->posts_on_the_way);
        return 300;
    }
    /* Is the client's version newer? (translated from the original comment)
     * This happens after a slave has been swapped in: the slave pulls from
     * the master while the master, acting as client, pulls from the slave;
     * neither side may push while they are pulling from each other.
     * The master gets 300 while pulling; once the slave has synced up to the
     * master's version and started, the master's next pull sees equal
     * versions and gets 304. */
    /* strtol is used instead of atol: same result for valid input, but
     * defined behaviour when the header value overflows. */
    clientversion = strtol(client_version, NULL, 10);
    if(clientversion > cur_version)
    {
        business_set_sync_peer_abnormal(business);
        /* Versions are signed 64-bit; print them as such rather than with %lu. */
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_INFO, "HttpProducer, business: %s, client version(%lld) is newer than server(%lld)", business->bizname, (long long)clientversion, (long long)cur_version);
        return 300;
    }
    business_resume_sync_peer_normal(business);
    /* NOTE(review): logged at FATAL level although this is the success path - confirm this is intentional. */
    MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "HttpProducer, doris client is OK to sync for business: %s", business->bizname);
    return HTTP_NOTMODIFIED;
}
/*
 * HTTP handler registered for "/configmeta".
 *
 * Reads the "version" query parameter and the business selected by the query,
 * then replies with the meta content (JSON) of the first stored version that
 * is >= the requested one. When no such version exists the reply code comes
 * from check_producer_ready_sync() (300 or 304). Malformed requests get 400.
 *
 * req - the libevent request being served.
 * arg - unused.
 */
void doris_http_server_meta_cb(struct evhttp_request *req, void *arg)
{
    struct evkeyvalq params;
    const char *version;
    int64_t verlong;
    char *endptr=NULL, length[64];
    struct version_list_node *vernode;
    struct evbuffer *evbuf;
    struct doris_business *business;

    FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_META_REQ], 0, FS_OP_ADD, 1);
    if(evhttp_parse_query(evhttp_request_get_uri(req), &params))
    {
        FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
        evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid");
        return;
    }
    if(NULL == (version = evhttp_find_header(&params, "version")))
    {
        FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
        evhttp_clear_headers(&params);
        evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no version found");
        return;
    }
    /* Version 0 and anything with trailing non-digits is rejected. */
    if(0==(verlong = strtol(version, &endptr, 10)) || *endptr!='\0')
    {
        FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
        evhttp_clear_headers(&params);
        evhttp_send_error(req, HTTP_BADREQUEST, "Parameter version invalid");
        return;
    }
    if(NULL == (business = lookup_bizstruct_from_name(&params)))
    {
        evhttp_clear_headers(&params);
        evhttp_send_error(req, HTTP_BADREQUEST, "Parameter business invalid");
        return;
    }
    evhttp_clear_headers(&params);

    pthread_rwlock_rdlock(&business->rwlock);
    if(NULL == (vernode = lookup_vernode_accord_version(business->cfgver_head, verlong)))
    {
        int code = check_producer_ready_sync(business, req, business->cfgver_head->latest_version);
        pthread_rwlock_unlock(&business->rwlock);
        FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_META_NONEW], 0, FS_OP_ADD, 1);
        evhttp_send_error(req, code, "No new configs found");
        return;
    }

    /* Copy the meta content while the read lock is held; bail out cleanly
     * (lock released, buffer freed) if the buffer cannot be allocated or filled. */
    evbuf = evbuffer_new();
    if(evbuf == NULL || evbuffer_add(evbuf, vernode->metacont, vernode->metalen) != 0)
    {
        pthread_rwlock_unlock(&business->rwlock);
        if(evbuf != NULL)
        {
            evbuffer_free(evbuf);
        }
        evhttp_send_error(req, HTTP_INTERNAL, "Out of memory");
        return;
    }
    snprintf(length, sizeof(length), "%u", vernode->metalen);
    pthread_rwlock_unlock(&business->rwlock);

    FS_operate(g_doris_server_info.fsstat_handle, business->fs_lineid, g_doris_server_info.fsstat_column[DRS_FSCLM_SEND_META_RES], FS_OP_ADD, 1);
    evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Type", "application/json");
    evhttp_add_header(evhttp_request_get_output_headers(req), "Connection", "keep-alive");
    evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Length", length);
    evhttp_send_reply(req, HTTP_OK, "OK", evbuf);
    evbuffer_free(evbuf);
}
/*
 * Build an evbuffer holding bytes [start, end] of a table whose content is
 * kept as a list of in-memory fragments.
 *
 * tablenode - table whose frag_head list is walked in order.
 * start/end - inclusive byte offsets of the wanted range.
 * length    - out: number of bytes actually appended to the buffer.
 *
 * Returns the newly created evbuffer; the caller frees it.
 */
struct evbuffer *evbuf_content_from_memory(struct table_list_node *tablenode, size_t start, size_t end, size_t *length)
{
    struct evbuffer *out = evbuffer_new();
    struct cont_frag_node *frag = TAILQ_FIRST(&tablenode->frag_head);
    size_t cursor = start;   /* next byte offset still to be copied */
    size_t total = 0;        /* bytes appended so far */
    size_t chunk;

    /* Stop at the first fragment that begins after the requested range. */
    while(frag != NULL && frag->start <= end)
    {
        /* Fragments that end before the cursor contribute nothing. */
        if(cursor <= frag->end)
        {
            /* Copy up to the end of this fragment or of the range,
             * whichever comes first. */
            if(end > frag->end)
            {
                chunk = frag->end-cursor + 1;
            }
            else
            {
                chunk = end-cursor + 1;
            }
            evbuffer_add(out, frag->content+(cursor-frag->start), chunk);
            cursor += chunk;
            total += chunk;
        }
        frag = TAILQ_NEXT(frag, frag_node);
    }

    *length = total;
    return out;
}
/*
 * Build an evbuffer that serves bytes [start, end] of a table straight from
 * its file on disk (tablenode->localpath) via evbuffer_add_file().
 *
 * tablenode - table whose localpath is opened read-only.
 * start/end - inclusive byte offsets of the wanted range.
 * length    - out: number of bytes in the returned buffer; 0 on failure.
 *
 * Returns the new evbuffer (caller frees it), or NULL when the file cannot be
 * opened, the buffer cannot be allocated, or the file cannot be attached.
 */
struct evbuffer *evbuf_content_from_disk(struct table_list_node *tablenode, size_t start, size_t end, size_t *length)
{
    struct evbuffer *evbuf;
    int32_t fd;

    /* Give the caller a defined value on every failure path. */
    *length = 0;

    if((fd = open(tablenode->localpath, O_RDONLY, 0)) < 0)
    {
        /* The format has two %s conversions; both arguments must be supplied. */
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Send response, open %s failed: %s", tablenode->localpath, strerror(errno));
        return NULL;
    }
    if(NULL == (evbuf = evbuffer_new()))
    {
        /* Do not leak the descriptor when the buffer cannot be allocated. */
        close(fd);
        return NULL;
    }
    if(evbuffer_add_file(evbuf, fd, start, end-start+1))
    {
        evbuffer_free(evbuf);
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Send response, evbuffer_add_file %s failed", tablenode->localpath);
        close(fd);
        return NULL;
    }
    /* On success the descriptor is not closed here; it stays attached to the buffer. */
    *length = evbuffer_get_length(evbuf);
    return evbuf;
}
/*
 * Send (part of) one table file of a business as the HTTP reply.
 *
 * req       - request to answer.
 * business  - business whose version list is searched (under its read lock).
 * tablename - name of the table inside the selected version.
 * verlong   - requested version; the first stored version >= verlong is used.
 * start/end - inclusive byte range; end==0 or end past the file means
 *             "up to the last byte".
 * range     - true when the client sent a Range header; the reply is then
 *             206 with a Content-Range header, otherwise 200.
 *
 * Replies 404 when the version/table is not found or start is beyond the
 * file, 416 for an inverted range, 500 when the content cannot be loaded.
 */
void doris_response_file_range(struct evhttp_request *req, struct doris_business *business,
    const char *tablename, int64_t verlong, size_t start, size_t end, bool range)
{
    struct version_list_node *vernode;
    struct table_list_node *tablenode;
    struct evbuffer *evbuf;
    char length[128];
    size_t filesize, res_length = 0;

    pthread_rwlock_rdlock(&business->rwlock);
    if(NULL == (vernode = lookup_vernode_accord_version(business->cfgver_head, verlong)))
    {
        pthread_rwlock_unlock(&business->rwlock);
        FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_RES_404], 0, FS_OP_ADD, 1);
        evhttp_send_error(req, HTTP_NOTFOUND, "Version too old");
        return;
    }
    /* Linear search of the version's table list by name. */
    tablenode = TAILQ_FIRST(&vernode->table_head);
    while(tablenode!=NULL && strcmp(tablename, tablenode->tablename))
    {
        tablenode = TAILQ_NEXT(tablenode, table_node);
    }
    if(tablenode==NULL || start>tablenode->filesize)
    {
        pthread_rwlock_unlock(&business->rwlock);
        FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_RES_404], 0, FS_OP_ADD, 1);
        evhttp_send_error(req, HTTP_NOTFOUND, "No valid content found");
        return;
    }
    filesize = tablenode->filesize;
    if(end==0 || end >= tablenode->filesize)
    {
        end = tablenode->filesize - 1;
    }
    /* An inverted range (e.g. "bytes=100-50") would make end+1-start wrap to
     * a huge count and read far outside the content. start==end+1 is still
     * accepted: it yields the zero-length reply the original code produced. */
    if(start > end && start - end > 1)
    {
        pthread_rwlock_unlock(&business->rwlock);
        FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
        evhttp_send_error(req, 416, "Requested Range Not Satisfiable");
        return;
    }
    if(vernode->cont_in_disk)
    {
        evbuf = evbuf_content_from_disk(tablenode, start, end, &res_length);
    }
    else
    {
        evbuf = evbuf_content_from_memory(tablenode, start, end, &res_length);
    }
    pthread_rwlock_unlock(&business->rwlock);

    /* The disk loader returns NULL on failure; never touch res_length or
     * free the buffer in that case. */
    if(evbuf == NULL)
    {
        evhttp_send_error(req, HTTP_INTERNAL, "Load content failed");
        return;
    }
    assert(res_length == end + 1 - start);

    snprintf(length, sizeof(length), "%zu", res_length);
    FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILES], 0, FS_OP_ADD, 1);
    FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_SEND_FILE_BYTES], 0, FS_OP_ADD, res_length);
    evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Length", length);
    evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Type", "application/stream");
    evhttp_add_header(evhttp_request_get_output_headers(req), "Connection", "keep-alive");
    if(range)
    {
        /* `length` is reused as scratch space for the Content-Range value. */
        snprintf(length, sizeof(length), "bytes %zu-%zu/%zu", start, end, filesize);
        evhttp_add_header(evhttp_request_get_output_headers(req), "Content-Range", length);
        evhttp_send_reply(req, 206, "Partial Content", evbuf);
    }
    else
    {
        evhttp_send_reply(req, HTTP_OK, "OK", evbuf);
    }
    evbuffer_free(evbuf);
}
void doris_http_server_file_cb(struct evhttp_request *req, void *arg)
{
struct evkeyvalq params;
struct doris_business *business;
const char *version, *tablename, *content_range;
int64_t verlong;
char *endptr=NULL;
size_t req_start=0, req_end=0;
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_FILE_REQ], 0, FS_OP_ADD, 1);
if(evhttp_parse_query(evhttp_request_get_uri(req), &params))
{
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid");
return;
}
if(NULL==(version=evhttp_find_header(&params, "version")) || NULL==(tablename=evhttp_find_header(&params, "tablename")))
{
evhttp_clear_headers(&params);
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid, no version/tablename found");
return;
}
if(0==(verlong = strtol(version, &endptr, 10)) || *endptr!='\0')
{
evhttp_clear_headers(&params);
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameter version invalid");
return;
}
if(NULL!=(content_range = evhttp_find_header(evhttp_request_get_input_headers(req), "Range")) &&
sscanf(content_range, "%*[^0-9]%lu-%lu", &req_start, &req_end)<1)
{
evhttp_clear_headers(&params);
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Header Range invalid");
return;
}
if(NULL == (business = lookup_bizstruct_from_name(&params)))
{
evhttp_clear_headers(&params);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameter business invalid");
return;
}
doris_response_file_range(req, business, tablename, verlong, req_start, req_end, (content_range==NULL)?false:true);
evhttp_clear_headers(&params);
}
void doris_http_server_version_cb(struct evhttp_request *req, void *arg)
{
struct evkeyvalq params;
struct doris_business *business;
char verbuf[32];
if(evhttp_parse_query(evhttp_request_get_uri(req), &params))
{
FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameters invalid");
return;
}
if(NULL == (business = lookup_bizstruct_from_name(&params)))
{
evhttp_clear_headers(&params);
evhttp_send_error(req, HTTP_BADREQUEST, "Parameter business invalid");
return;
}
evhttp_clear_headers(&params);
pthread_rwlock_rdlock(&business->rwlock);
sprintf(verbuf, "%lu", business->cfgver_head->latest_version);
pthread_rwlock_unlock(&business->rwlock);
evhttp_add_header(evhttp_request_get_output_headers(req), "X-Latest-Version", verbuf);
evhttp_send_reply(req, HTTP_OK, "OK", NULL);
}
/*
 * Catch-all HTTP handler: every request that reaches it is answered with
 * 400 "Not Supported.".
 *
 * req - the libevent request being served.
 * arg - unused.
 */
void doris_http_server_generic_cb(struct evhttp_request *req, void *arg)
{
    (void)arg;
    evhttp_send_error(req, HTTP_BADREQUEST, "Not Supported.");
}
/*
 * Thread-id callback handed to OpenSSL (CRYPTO_set_id_callback).
 *
 * Returns the pthread identifier of the calling thread.
 */
pthread_t nirvana_pthreads_thread_id(void)
{
    pthread_t self = pthread_self();

    return self;
}
/*
 * Locking callback handed to OpenSSL (CRYPTO_set_locking_callback).
 *
 * mode - when the CRYPTO_LOCK bit is set the mutex is locked, otherwise unlocked.
 * type - index into g_doris_server_info.lock_cs selecting the mutex.
 * file - unused.
 * line - unused.
 */
void nirvana_pthreads_locking_callback(int mode, int type, const char *file, int line)
{
    pthread_mutex_t *slot = &g_doris_server_info.lock_cs[type];

    if((mode & CRYPTO_LOCK) == 0)
    {
        pthread_mutex_unlock(slot);
        return;
    }
    pthread_mutex_lock(slot);
}
/*
 * Certificate-verification callback: logs the subject and issuer of the
 * certificate currently being checked and passes OpenSSL's verdict through
 * unchanged.
 *
 * ok  - OpenSSL's own verification result for this certificate.
 * ctx - verification context the current certificate is read from.
 *
 * Returns `ok` unmodified.
 */
int server_verify_callback(int ok, X509_STORE_CTX *ctx)
{
    X509 *client_cert;
    char *subject = NULL, *issuer = NULL;

    /* The current certificate can be NULL when no certificate is relevant
     * to the verification step; do not dereference it in that case. */
    client_cert = X509_STORE_CTX_get_current_cert(ctx);
    if(client_cert != NULL)
    {
        /* A NULL buffer makes X509_NAME_oneline allocate the result. */
        subject = X509_NAME_oneline(X509_get_subject_name(client_cert), NULL, 0);
        issuer = X509_NAME_oneline(X509_get_issuer_name(client_cert), NULL, 0);
    }
    /* Never hand a NULL pointer to a %s conversion. */
    MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_DEBUG, "ClientCert suject: %s, issuer: %s, state: %d.", (subject!=NULL)?subject:"(unknown)", (issuer!=NULL)?issuer:"(unknown)", ok);
    OPENSSL_free(subject);
    OPENSSL_free(issuer);
    return ok;
}
/*
 * Initialise OpenSSL and build the server-side SSL_CTX.
 *
 * Sets up the library, the locking/thread-id callbacks (with their mutex
 * array stored in g_doris_server_info.lock_cs), the session cache, and loads
 * the CA path, certificate file and private key file named in
 * g_doris_server_info.
 *
 * Returns the configured context, or NULL (after logging) when any step fails.
 */
SSL_CTX *doris_connections_create_ssl_ctx(void)
{
    int crypto_num;
    SSL_CTX *ssl_ctx;
    char session_id_appname[] = "DorisServer";

    SSL_library_init();
    SSLeay_add_ssl_algorithms();
    OpenSSL_add_all_algorithms();
    SSL_load_error_strings();
    ERR_load_BIO_strings();

    /* One mutex per lock slot requested by OpenSSL. */
    crypto_num = CRYPTO_num_locks();
    g_doris_server_info.lock_cs = (pthread_mutex_t *)OPENSSL_malloc(crypto_num * sizeof(pthread_mutex_t));
    if(g_doris_server_info.lock_cs == NULL)
    {
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "OPENSSL_malloc for %d locks failed.", crypto_num);
        return NULL;
    }
    for(int i=0; i<crypto_num; i++)
    {
        pthread_mutex_init(&g_doris_server_info.lock_cs[i], NULL);
    }
    CRYPTO_set_id_callback(nirvana_pthreads_thread_id);
    CRYPTO_set_locking_callback(nirvana_pthreads_locking_callback);

    ssl_ctx = SSL_CTX_new(SSLv23_server_method());
    if(ssl_ctx == NULL)
    {
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "SSL_CTX_new error: %s.", ERR_reason_error_string(ERR_get_error()));
        return NULL;
    }
    //SSL_CTX_set_verify(ssl_ctx, SSL_VERIFY_PEER|SSL_VERIFY_FAIL_IF_NO_PEER_CERT|SSL_VERIFY_CLIENT_ONCE, server_verify_callback);
    //Enable session resumption caching in both directions, because authentication is mutual
    SSL_CTX_set_session_cache_mode(ssl_ctx, SSL_SESS_CACHE_BOTH);
    //Maximum number of sessions held: the default SSL_SESSION_CACHE_MAX_SIZE_DEFAULT (1024*20); 0 would mean unlimited
    SSL_CTX_sess_set_cache_size(ssl_ctx, SSL_SESSION_CACHE_MAX_SIZE_DEFAULT);
    SSL_CTX_set_session_id_context(ssl_ctx, (unsigned char*)session_id_appname, strlen(session_id_appname));
    SSL_CTX_set_default_passwd_cb_userdata(ssl_ctx, g_doris_server_info.ssl_key_passwd);

    if(!SSL_CTX_load_verify_locations(ssl_ctx, NULL, g_doris_server_info.ssl_CA_path))
    {
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "SSL_CTX_load_verify_locations error: %s.", ERR_reason_error_string(ERR_get_error()));
        SSL_CTX_free(ssl_ctx);
        return NULL;
    }
    if(!SSL_CTX_use_certificate_file(ssl_ctx, g_doris_server_info.ssl_cert_file, SSL_FILETYPE_PEM))
    {
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "SSL_CTX_use_certificate_file error: %s.", ERR_reason_error_string(ERR_get_error()));
        SSL_CTX_free(ssl_ctx);
        return NULL;
    }
    /* SSL_CTX_use_PrivateKey_file returns 1 on success and 0 on failure,
     * so a "< 0" test would never detect an error. */
    if(SSL_CTX_use_PrivateKey_file(ssl_ctx, g_doris_server_info.ssl_key_file, SSL_FILETYPE_PEM) <= 0)
    {
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "SSL_CTX_use_PrivateKey_file_pass error: %s.", ERR_reason_error_string(ERR_get_error()));
        SSL_CTX_free(ssl_ctx);
        return NULL;
    }
    if(!SSL_CTX_check_private_key(ssl_ctx))
    {
        MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "SSL_CTX_check_private_key error: %s.", ERR_reason_error_string(ERR_get_error()));
        SSL_CTX_free(ssl_ctx);
        return NULL;
    }
    return ssl_ctx;
}
void* thread_doris_http_server(void *arg)
{
struct event_base *worker_evbase;
struct evhttp *worker_http;
prctl(PR_SET_NAME, "http_server");
worker_evbase = event_base_new();
worker_http = evhttp_new(worker_evbase);
if(g_doris_server_info.ssl_conn_on)
{
evhttp_set_bevcb(worker_http, doris_https_bufferevent_cb, g_doris_server_info.ssl_instance);
}
evhttp_set_cb(worker_http, "/configmeta", doris_http_server_meta_cb, NULL);
evhttp_set_cb(worker_http, "/configfile", doris_http_server_file_cb, NULL);
evhttp_set_cb(worker_http, "/latestversion", doris_http_server_version_cb, NULL);
evhttp_set_gencb(worker_http, doris_http_server_generic_cb, NULL);
evhttp_set_allowed_methods(worker_http, EVHTTP_REQ_GET|EVHTTP_REQ_HEAD);
if(evhttp_accept_socket(worker_http, g_doris_server_info.listener_csum))
{
printf("evhttp_accept_socket %d error!\n", g_doris_server_info.listener_csum);
assert(0); return NULL;
}
event_base_dispatch(worker_evbase);
printf("Libevent dispath error, should not run here.\n");
MESA_RUNTIME_LOGV3(g_doris_server_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
return NULL;
}