2021-07-16 16:06:59 +08:00
# include <sys/stat.h>
# include <sys/types.h>
# include <unistd.h>
# include <stdio.h>
# include <stdlib.h>
# include <assert.h>
# include <errno.h>
# include <pthread.h>
# include <string.h>
# include <sys/prctl.h>
# include <time.h>
2021-08-25 18:40:20 +08:00
# include <event2/bufferevent_ssl.h>
2021-07-16 16:06:59 +08:00
# include "doris_server_main.h"
# include "doris_server_scandir.h"
# include "doris_server_receive.h"
struct scanner_timer_priv
{
2021-07-22 10:25:42 +08:00
struct doris_business * business ;
2021-07-16 16:06:59 +08:00
struct doris_callbacks doris_cbs ;
struct doris_arguments doris_args ;
struct doris_idxfile_scanner * scanner ;
struct event timer_scanner ;
} ;
2021-07-19 17:21:38 +08:00
extern struct doris_global_info g_doris_server_info ;
2021-07-16 16:06:59 +08:00
2021-07-22 10:25:42 +08:00
void config_frag_node_cleanup ( struct cont_frag_node * fragnode )
2021-07-16 16:06:59 +08:00
{
if ( fragnode = = NULL ) return ;
2021-07-22 10:25:42 +08:00
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_status [ DRS_FSSTAT_MEMORY_USED ] , 0 , FS_OP_SUB , fragnode - > totalsize ) ;
2021-07-16 16:06:59 +08:00
free ( fragnode - > content ) ;
free ( fragnode ) ;
}
2021-07-22 10:25:42 +08:00
void config_table_node_cleanup ( struct table_list_node * table_node )
2021-07-16 16:06:59 +08:00
{
struct cont_frag_node * fragnode ;
if ( table_node = = NULL ) return ;
while ( NULL ! = ( fragnode = TAILQ_FIRST ( & table_node - > frag_head ) ) )
{
TAILQ_REMOVE ( & table_node - > frag_head , fragnode , frag_node ) ;
2021-07-22 10:25:42 +08:00
config_frag_node_cleanup ( fragnode ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
config_frag_node_cleanup ( table_node - > cur_frag ) ;
cJSON_Delete ( table_node - > table_meta ) ;
2021-07-16 16:06:59 +08:00
free ( table_node ) ;
}
2021-07-22 10:25:42 +08:00
void config_version_node_cleanup ( struct version_list_node * vernode )
2021-07-16 16:06:59 +08:00
{
struct table_list_node * tablenode ;
if ( vernode = = NULL ) return ;
while ( NULL ! = ( tablenode = TAILQ_FIRST ( & vernode - > table_head ) ) )
{
TAILQ_REMOVE ( & vernode - > table_head , tablenode , table_node ) ;
2021-07-22 10:25:42 +08:00
config_table_node_cleanup ( tablenode ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
config_table_node_cleanup ( vernode - > cur_table ) ;
2021-07-16 16:06:59 +08:00
free ( vernode - > metacont ) ;
cJSON_Delete ( vernode - > metajson ) ;
cJSON_Delete ( vernode - > arrayjson ) ;
2021-07-19 17:21:38 +08:00
cJSON_Delete ( vernode - > table_meta ) ;
2021-08-25 18:40:20 +08:00
if ( vernode - > business ! = NULL & & vernode - > business - > recv_way = = RECV_WAY_HTTP_POST )
{
vernode - > business - > token2node - > erase ( string ( vernode - > token ) ) ;
}
2021-07-16 16:06:59 +08:00
free ( vernode ) ;
}
2021-08-04 11:14:56 +08:00
void config_version_handle_free ( struct version_list_handle * version )
2021-07-16 16:06:59 +08:00
{
struct version_list_node * vernode ;
while ( NULL ! = ( vernode = TAILQ_FIRST ( & version - > version_head ) ) )
{
TAILQ_REMOVE ( & version - > version_head , vernode , version_node ) ;
2021-07-22 10:25:42 +08:00
config_version_node_cleanup ( vernode ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-04 11:14:56 +08:00
delete version - > version2node ;
2021-07-16 16:06:59 +08:00
free ( version ) ;
}
struct version_list_handle * config_version_handle_new ( void )
{
struct version_list_handle * handle ;
handle = ( struct version_list_handle * ) calloc ( 1 , sizeof ( struct version_list_handle ) ) ;
2021-08-04 11:14:56 +08:00
handle - > version2node = new map < int64_t , struct version_list_node * > ;
2021-07-16 16:06:59 +08:00
TAILQ_INIT ( & handle - > version_head ) ;
return handle ;
}
2021-08-04 11:14:56 +08:00
void config_version_node_free_content ( struct version_list_node * vernode )
{
struct table_list_node * tablenode ;
struct cont_frag_node * fragnode ;
TAILQ_FOREACH ( tablenode , & vernode - > table_head , table_node )
{
while ( NULL ! = ( fragnode = TAILQ_FIRST ( & tablenode - > frag_head ) ) )
{
TAILQ_REMOVE ( & tablenode - > frag_head , fragnode , frag_node ) ;
config_frag_node_cleanup ( fragnode ) ;
}
}
vernode - > cont_in_disk = 1 ;
}
2021-07-16 16:06:59 +08:00
/* Arm the given timer event so that it fires after 2 seconds. */
static void doris_common_timer_start(struct event *time_event)
{
    struct timeval delay;

    delay.tv_usec = 0;
    delay.tv_sec = 2;
    evtimer_add(time_event, &delay);
}
static void cfgver_delay_destroy_timer_cb ( int fd , short kind , void * userp )
{
struct common_timer_event * delay_event = ( struct common_timer_event * ) userp ;
struct version_list_handle * handle = ( struct version_list_handle * ) delay_event - > data ;
if ( handle - > references ! = 0 )
{
doris_common_timer_start ( & delay_event - > timer_event ) ;
return ;
}
2021-08-04 11:14:56 +08:00
config_version_handle_free ( handle ) ;
2021-07-16 16:06:59 +08:00
free ( delay_event ) ;
}
2021-08-25 18:40:20 +08:00
static void cfgver_handle_delay_destroy ( struct event_base * evbase , struct version_list_handle * version )
2021-07-16 16:06:59 +08:00
{
struct common_timer_event * delay_event ;
delay_event = ( struct common_timer_event * ) malloc ( sizeof ( struct common_timer_event ) ) ;
delay_event - > data = version ;
evtimer_assign ( & delay_event - > timer_event , evbase , cfgver_delay_destroy_timer_cb , delay_event ) ;
doris_common_timer_start ( & delay_event - > timer_event ) ;
}
/* file-series callbacks: write the received configuration to local files */
2021-08-25 18:40:20 +08:00
void doris_config_file_version_start ( struct doris_csum_instance * instance , cJSON * meta , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
2021-07-16 16:06:59 +08:00
2021-08-25 18:40:20 +08:00
if ( business - > type = = CFG_UPDATE_TYPE_FULL )
2021-07-16 16:06:59 +08:00
{
2021-08-30 14:22:48 +08:00
snprintf ( business - > inc_index_path , 256 , " %s/inc/index/full_config_index.%010lu " , business - > store_path_root , business - > version ) ;
snprintf ( business - > tmp_index_path , 256 , " %s/inc/full_config_index.%010lu.ing " , business - > store_path_root , business - > version ) ;
snprintf ( business - > full_index_path , 256 , " %s/full/index/full_config_index.%010lu " , business - > store_path_root , business - > version ) ;
2021-07-16 16:06:59 +08:00
}
else
{
2021-08-30 14:22:48 +08:00
snprintf ( business - > inc_index_path , 256 , " %s/inc/index/inc_config_index.%010lu " , business - > store_path_root , business - > version ) ;
snprintf ( business - > tmp_index_path , 256 , " %s/inc/full_config_index.%010lu.ing " , business - > store_path_root , business - > version ) ;
2021-07-22 10:25:42 +08:00
}
2021-08-25 18:40:20 +08:00
if ( NULL = = ( business - > fp_idx_file = fopen ( business - > tmp_index_path , " w+ " ) ) )
2021-07-22 10:25:42 +08:00
{
2021-08-25 18:40:20 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " business: %s, fopen %s failed: %s " , business - > bizname , business - > tmp_index_path , strerror ( errno ) ) ;
2021-07-22 10:25:42 +08:00
assert ( 0 ) ;
2021-07-16 16:06:59 +08:00
}
}
2021-08-25 18:40:20 +08:00
void doris_config_file_version_finish ( struct doris_csum_instance * instance , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
2021-08-30 14:22:48 +08:00
char tmp_index_dir [ 256 ] ;
2021-07-16 16:06:59 +08:00
2021-08-25 18:40:20 +08:00
fclose ( business - > fp_idx_file ) ;
if ( rename ( business - > tmp_index_path , business - > inc_index_path ) )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " business: %s, rename %s to %s failed: %s " ,
business - > bizname , business - > tmp_index_path , business - > inc_index_path , strerror ( errno ) ) ;
2021-07-16 16:06:59 +08:00
assert ( 0 ) ;
}
2021-08-25 18:40:20 +08:00
if ( business - > type = = CFG_UPDATE_TYPE_FULL )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
if ( link ( business - > inc_index_path , business - > full_index_path ) & & errno ! = EEXIST ) //<2F> <> <EFBFBD> <EFBFBD> Ӳ<EFBFBD> <D3B2> <EFBFBD> <EFBFBD>
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " business: %s, rename %s to %s failed: %s " ,
business - > bizname , business - > tmp_index_path , business - > inc_index_path , strerror ( errno ) ) ;
2021-07-16 16:06:59 +08:00
assert ( 0 ) ;
}
2021-08-30 14:22:48 +08:00
if ( business - > saves_when_fulldel > 0 )
{
for ( u_int32_t i = 1 ; i < business - > saves_when_fulldel ; i + + )
{
business - > full_version_inc [ i - 1 ] = business - > full_version_inc [ i ] ; //<2F> <> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <F2A3ACBD> µİ汾<C4B0> <E6B1BE> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ֵ
}
business - > full_version_inc [ business - > saves_when_fulldel - 1 ] = business - > version ;
snprintf ( tmp_index_dir , 256 , " %s/full/index " , business - > store_path_root ) ;
remove_configs_version_smaller ( tmp_index_dir , business - > full_version_inc [ 0 ] , 0 , g_doris_server_info . log_runtime ) ;
snprintf ( tmp_index_dir , 256 , " %s/inc/index " , business - > store_path_root ) ;
remove_configs_version_smaller ( tmp_index_dir , business - > full_version_inc [ 0 ] , 1 , g_doris_server_info . log_runtime ) ;
}
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s, Version %lu write finished, index file: %s " ,
business - > bizname , business - > version , business - > inc_index_path ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
void doris_config_file_version_error ( struct doris_csum_instance * instance , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
2021-07-16 16:06:59 +08:00
2021-08-25 18:40:20 +08:00
if ( business - > fp_idx_file ! = NULL )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
fclose ( business - > fp_idx_file ) ;
remove ( business - > tmp_index_path ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
if ( business - > fp_cfg_file ! = NULL )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
fclose ( business - > fp_cfg_file ) ;
remove ( business - > cfg_file_path ) ;
2021-07-16 16:06:59 +08:00
}
}
2021-08-25 18:40:20 +08:00
void doris_config_file_cfgfile_start ( struct doris_csum_instance * instance ,
2021-08-03 10:49:52 +08:00
const struct tablemeta * meta , const char * localpath , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
2021-07-16 16:06:59 +08:00
struct tm * localtm , savetime ;
time_t now ;
const char * type ;
char dir [ 256 ] ;
2021-08-25 18:40:20 +08:00
type = ( business - > type = = CFG_UPDATE_TYPE_FULL ) ? " full " : " inc " ;
2021-07-16 16:06:59 +08:00
now = time ( NULL ) ;
localtm = localtime_r ( & now , & savetime ) ;
2021-08-25 18:40:20 +08:00
snprintf ( dir , 256 , " %s/%s/%04d-%02d-%02d " , business - > store_path_root , type , localtm - > tm_year + 1900 , localtm - > tm_mon + 1 , localtm - > tm_mday ) ;
2021-07-16 16:06:59 +08:00
if ( access ( dir , F_OK ) )
{
doris_mkdir_according_path ( dir ) ;
}
2021-08-25 18:40:20 +08:00
snprintf ( business - > cfg_file_path , 256 , " %s/%s " , dir , meta - > filename ) ;
if ( g_doris_server_info . idx_file_maat | | meta - > userregion = = NULL ) //MAAT<41> <54> ʽ <EFBFBD> <CABD> ֪ͨ<CDA8> ļ<EFBFBD>
2021-07-27 16:25:13 +08:00
{
2021-08-25 18:40:20 +08:00
fprintf ( business - > fp_idx_file , " %s \t %u \t %s \n " , meta - > tablename , meta - > cfgnum , business - > cfg_file_path ) ;
2021-07-27 16:25:13 +08:00
}
else //ת<> <D7AA> <EFBFBD> <EFBFBD> ɫ<EFBFBD> <C9AB> <EFBFBD> <EFBFBD> <EFBFBD> û<EFBFBD> <C3BB> Զ<EFBFBD> <D4B6> <EFBFBD> <EFBFBD> <EFBFBD> Ϣ
{
2021-08-25 18:40:20 +08:00
fprintf ( business - > fp_idx_file , " %s \t %u \t %s \t %s \n " , meta - > tablename , meta - > cfgnum , business - > cfg_file_path , meta - > userregion ) ;
2021-07-27 16:25:13 +08:00
}
2021-08-25 18:40:20 +08:00
if ( NULL = = ( business - > fp_cfg_file = fopen ( business - > cfg_file_path , " w+ " ) ) )
2021-07-22 10:25:42 +08:00
{
2021-08-25 18:40:20 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " business: %s, fopen %s failed: %s " , business - > bizname , business - > cfg_file_path , strerror ( errno ) ) ;
2021-07-22 10:25:42 +08:00
assert ( 0 ) ;
}
else
{
2021-08-25 18:40:20 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s, File %s start writing... " , business - > bizname , business - > cfg_file_path ) ;
2021-07-22 10:25:42 +08:00
}
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
void doris_config_file_cfgfile_update ( struct doris_csum_instance * instance , const char * data , size_t len , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
2021-07-16 16:06:59 +08:00
size_t writen_len ;
2021-08-25 18:40:20 +08:00
writen_len = fwrite ( data , 1 , len , business - > fp_cfg_file ) ;
2021-07-22 10:25:42 +08:00
if ( writen_len ! = len )
{
2021-08-25 18:40:20 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " business: %s, fwrite %s failed: %s " , business - > bizname , business - > cfg_file_path , strerror ( errno ) ) ;
2021-07-22 10:25:42 +08:00
assert ( 0 ) ;
}
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
void doris_config_file_cfgfile_finish ( struct doris_csum_instance * instance , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
fclose ( business - > fp_cfg_file ) ;
2021-07-16 16:06:59 +08:00
2021-08-25 18:40:20 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s, File %s write finished " , business - > bizname , business - > cfg_file_path ) ;
2021-07-16 16:06:59 +08:00
}
/* mem-series callbacks: cache the received configuration in memory */
2021-08-25 18:40:20 +08:00
void doris_config_mem_version_start ( struct doris_csum_instance * instance , cJSON * meta , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
2021-07-16 16:06:59 +08:00
2021-08-25 18:40:20 +08:00
business - > cur_vernode = ( struct version_list_node * ) calloc ( 1 , sizeof ( struct version_list_node ) ) ;
TAILQ_INIT ( & business - > cur_vernode - > table_head ) ;
business - > cur_vernode - > metajson = cJSON_CreateObject ( ) ;
business - > cur_vernode - > arrayjson = cJSON_CreateArray ( ) ;
2021-07-16 16:06:59 +08:00
2021-08-25 18:40:20 +08:00
business - > cur_vernode - > version = business - > version ;
cJSON_AddNumberToObject ( business - > cur_vernode - > metajson , " version " , business - > cur_vernode - > version ) ;
2021-07-16 16:06:59 +08:00
2021-08-25 18:40:20 +08:00
business - > cur_vernode - > cfg_type = business - > type ;
cJSON_AddNumberToObject ( business - > cur_vernode - > metajson , " type " , business - > cur_vernode - > cfg_type ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
void doris_config_mem_version_finish ( struct doris_csum_instance * instance , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
2021-07-16 16:06:59 +08:00
struct version_list_handle * tmplist ;
2021-08-04 11:14:56 +08:00
struct version_list_handle * cfgver_handle ;
2021-07-16 16:06:59 +08:00
2021-08-25 18:40:20 +08:00
cJSON_AddItemToObject ( business - > cur_vernode - > metajson , " configs " , business - > cur_vernode - > arrayjson ) ;
business - > cur_vernode - > arrayjson = NULL ;
business - > cur_vernode - > metacont = cJSON_PrintUnformatted ( business - > cur_vernode - > metajson ) ;
business - > cur_vernode - > metalen = strlen ( business - > cur_vernode - > metacont ) ;
cJSON_Delete ( business - > cur_vernode - > metajson ) ;
business - > cur_vernode - > metajson = NULL ;
2021-07-16 16:06:59 +08:00
2021-08-25 18:40:20 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " business: %s, Version %lu mem finished, info: %s " , business - > bizname , business - > version , business - > cur_vernode - > metacont ) ;
2021-07-16 16:06:59 +08:00
2021-08-25 18:40:20 +08:00
if ( business - > cur_vernode - > cfg_type = = CFG_UPDATE_TYPE_FULL & & business - > cfgver_head - > latest_version ! = 0 )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
cfgver_handle = config_version_handle_new ( ) ;
cfgver_handle - > latest_version = business - > cur_vernode - > version ;
2021-09-08 10:45:47 +08:00
cfgver_handle - > version_mem_num = 1 ;
2021-08-25 18:40:20 +08:00
TAILQ_INSERT_TAIL ( & cfgver_handle - > version_head , business - > cur_vernode , version_node ) ;
cfgver_handle - > oldest_vernode = TAILQ_FIRST ( & cfgver_handle - > version_head ) ;
cfgver_handle - > version2node - > insert ( make_pair ( cfgver_handle - > latest_version , business - > cur_vernode ) ) ;
pthread_rwlock_wrlock ( & business - > rwlock ) ;
tmplist = business - > cfgver_head ;
business - > cfgver_head = cfgver_handle ;
pthread_rwlock_unlock ( & business - > rwlock ) ;
cfgver_handle_delay_destroy ( business - > worker_evbase , tmplist ) ;
2021-07-16 16:06:59 +08:00
}
else
2021-08-04 11:14:56 +08:00
{
2021-08-25 18:40:20 +08:00
pthread_rwlock_wrlock ( & business - > rwlock ) ;
cfgver_handle = business - > cfgver_head ;
TAILQ_INSERT_TAIL ( & cfgver_handle - > version_head , business - > cur_vernode , version_node ) ;
cfgver_handle - > latest_version = business - > cur_vernode - > version ;
cfgver_handle - > version2node - > insert ( make_pair ( business - > cur_vernode - > version , business - > cur_vernode ) ) ;
2021-08-04 11:14:56 +08:00
if ( cfgver_handle - > oldest_vernode = = NULL )
{
cfgver_handle - > oldest_vernode = TAILQ_FIRST ( & cfgver_handle - > version_head ) ;
}
/*<2A> <> <EFBFBD> <EFBFBD> <EFBFBD> ļ<EFBFBD> <C4BC> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <E0BBBA> N<EFBFBD> <4E> <EFBFBD> 汾<EFBFBD> <E6B1BE> Ԫ<EFBFBD> <D4AA> Ϣȫ<CFA2> <C8AB> <EFBFBD> <EFBFBD> */
2021-09-08 10:45:47 +08:00
if ( business - > cache_max_versions ! = 0 & & cfgver_handle - > version_mem_num > = business - > cache_max_versions )
2021-08-04 11:14:56 +08:00
{
2021-09-08 10:45:47 +08:00
if ( ! business - > persistence_write_on )
{
TAILQ_REMOVE ( & cfgver_handle - > version_head , cfgver_handle - > oldest_vernode , version_node ) ;
config_version_node_cleanup ( cfgver_handle - > oldest_vernode ) ;
cfgver_handle - > oldest_vernode = TAILQ_FIRST ( & cfgver_handle - > version_head ) ;
}
else
{
config_version_node_free_content ( cfgver_handle - > oldest_vernode ) ;
cfgver_handle - > oldest_vernode = TAILQ_NEXT ( cfgver_handle - > oldest_vernode , version_node ) ;
}
2021-08-04 11:14:56 +08:00
}
else
{
2021-09-08 10:45:47 +08:00
cfgver_handle - > version_mem_num + = 1 ;
2021-08-04 11:14:56 +08:00
}
2021-08-25 18:40:20 +08:00
pthread_rwlock_unlock ( & business - > rwlock ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
business - > cur_vernode = NULL ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
void doris_config_mem_version_error ( struct doris_csum_instance * instance , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
2021-07-16 16:06:59 +08:00
2021-08-25 18:40:20 +08:00
if ( business - > cur_vernode ! = NULL )
{
config_version_node_cleanup ( business - > cur_vernode ) ;
}
business - > cur_vernode = NULL ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
void doris_config_mem_cfgfile_start ( struct doris_csum_instance * instance ,
2021-08-03 10:49:52 +08:00
const struct tablemeta * meta , const char * localpath , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
struct table_list_node * cur_table ;
business - > cur_vernode - > table_meta = cJSON_CreateObject ( ) ;
cJSON_AddStringToObject ( business - > cur_vernode - > table_meta , " tablename " , meta - > tablename ) ;
cJSON_AddStringToObject ( business - > cur_vernode - > table_meta , " filename " , meta - > filename ) ;
cJSON_AddNumberToObject ( business - > cur_vernode - > table_meta , " cfg_num " , meta - > cfgnum ) ;
cJSON_AddNumberToObject ( business - > cur_vernode - > table_meta , " size " , meta - > size ) ;
2021-08-03 10:49:52 +08:00
if ( meta - > userregion ! = NULL )
2021-07-27 16:25:13 +08:00
{
2021-08-25 18:40:20 +08:00
cJSON_AddStringToObject ( business - > cur_vernode - > table_meta , " user_region " , meta - > userregion ) ;
2021-07-27 16:25:13 +08:00
}
2021-08-25 18:40:20 +08:00
cur_table = ( struct table_list_node * ) calloc ( 1 , sizeof ( struct table_list_node ) ) ;
cur_table - > filesize = meta - > size ;
snprintf ( cur_table - > tablename , 64 , " %s " , meta - > tablename ) ;
snprintf ( cur_table - > localpath , 256 , " %s " , localpath ) ;
TAILQ_INIT ( & cur_table - > frag_head ) ;
business - > cur_vernode - > cur_table = cur_table ;
2021-07-16 16:06:59 +08:00
2021-08-03 10:49:52 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s, table %s.%010llu start loading to memory... " ,
2021-08-25 18:40:20 +08:00
business - > bizname , meta - > tablename , business - > version ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
void doris_config_mem_cfgfile_update ( struct doris_csum_instance * instance , const char * data , size_t len , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
struct table_list_node * cur_table ;
2021-07-16 16:06:59 +08:00
size_t cache_len , offset = 0 ;
2021-07-22 10:25:42 +08:00
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_status [ DRS_FSSTAT_MEMORY_USED ] , 0 , FS_OP_ADD , len ) ;
2021-08-25 18:40:20 +08:00
cur_table = business - > cur_vernode - > cur_table ;
2021-07-16 16:06:59 +08:00
while ( len > 0 )
{
2021-08-25 18:40:20 +08:00
if ( cur_table - > cur_frag = = NULL )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
cur_table - > cur_frag = ( struct cont_frag_node * ) calloc ( 1 , sizeof ( struct cont_frag_node ) ) ;
cur_table - > cur_frag - > start = cur_table - > cur_totallen ;
cur_table - > cur_frag - > totalsize = cur_table - > filesize - cur_table - > cur_totallen ;
if ( cur_table - > filesize = = 0 | | cur_table - > cur_frag - > totalsize > g_doris_server_info . cache_frag_size )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
cur_table - > cur_frag - > totalsize = g_doris_server_info . cache_frag_size ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
cur_table - > cur_frag - > end = cur_table - > cur_frag - > start + cur_table - > cur_frag - > totalsize - 1 ;
cur_table - > cur_frag - > content = ( char * ) malloc ( cur_table - > cur_frag - > totalsize ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
if ( cur_table - > cur_frag - > totalsize > cur_table - > cur_frag - > cur_fraglen + len )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
memcpy ( cur_table - > cur_frag - > content + cur_table - > cur_frag - > cur_fraglen , data + offset , len ) ;
cur_table - > cur_frag - > cur_fraglen + = len ;
cur_table - > cur_totallen + = len ;
2021-07-16 16:06:59 +08:00
offset + = len ;
len = 0 ;
}
else
{
2021-08-25 18:40:20 +08:00
cache_len = cur_table - > cur_frag - > totalsize - cur_table - > cur_frag - > cur_fraglen ;
memcpy ( cur_table - > cur_frag - > content + cur_table - > cur_frag - > cur_fraglen , data + offset , cache_len ) ;
cur_table - > cur_frag - > cur_fraglen + = cache_len ;
cur_table - > cur_totallen + = cache_len ;
2021-07-16 16:06:59 +08:00
offset + = cache_len ;
len - = cache_len ;
2021-08-25 18:40:20 +08:00
TAILQ_INSERT_TAIL ( & cur_table - > frag_head , cur_table - > cur_frag , frag_node ) ;
assert ( cur_table - > cur_frag - > cur_fraglen = = cur_table - > cur_frag - > end - cur_table - > cur_frag - > start + 1 ) ;
cur_table - > cur_frag = NULL ;
2021-07-16 16:06:59 +08:00
}
}
2021-08-25 18:40:20 +08:00
assert ( cur_table - > cur_totallen < = cur_table - > filesize | | cur_table - > filesize = = 0 ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
void doris_config_mem_cfgfile_finish ( struct doris_csum_instance * instance , const char * md5 , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
struct table_list_node * cur_table ;
2021-07-16 16:06:59 +08:00
2021-08-25 18:40:20 +08:00
cJSON_AddStringToObject ( business - > cur_vernode - > table_meta , " md5 " , md5 ) ;
cJSON_AddItemToArray ( business - > cur_vernode - > arrayjson , business - > cur_vernode - > table_meta ) ;
business - > cur_vernode - > table_meta = NULL ;
2021-07-19 17:21:38 +08:00
2021-08-25 18:40:20 +08:00
cur_table = business - > cur_vernode - > cur_table ;
if ( cur_table - > cur_frag ! = NULL )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
TAILQ_INSERT_TAIL ( & cur_table - > frag_head , cur_table - > cur_frag , frag_node ) ;
assert ( cur_table - > cur_frag - > cur_fraglen = = cur_table - > cur_frag - > end - cur_table - > cur_frag - > start + 1 ) ;
cur_table - > cur_frag = NULL ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
assert ( cur_table - > cur_totallen = = cur_table - > filesize ) ;
TAILQ_INSERT_TAIL ( & business - > cur_vernode - > table_head , cur_table , table_node ) ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s, table %s.%010llu load to memory finished " , business - > bizname , cur_table - > tablename , business - > version ) ;
business - > cur_vernode - > cur_table = NULL ;
2021-07-16 16:06:59 +08:00
}
/* common-series functions: bookkeeping shared by the receive paths (version state, statistics, monitoring) */
2021-08-25 18:40:20 +08:00
void doris_config_common_version_start ( struct doris_business * business , cJSON * meta )
2021-07-16 16:06:59 +08:00
{
cJSON * sub ;
sub = cJSON_GetObjectItem ( meta , " version " ) ;
2021-08-25 18:40:20 +08:00
business - > version = sub - > valuedouble ;
2021-07-16 16:06:59 +08:00
sub = cJSON_GetObjectItem ( meta , " type " ) ;
2021-08-25 18:40:20 +08:00
business - > type = sub - > valueint ;
assert ( business - > type = = CFG_UPDATE_TYPE_FULL | | business - > type = = CFG_UPDATE_TYPE_INC ) ;
business - > version_cfgnum = 0 ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s, Version %lu start updating... " , business - > bizname , business - > version ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
void doris_config_common_version_finish ( struct doris_business * business )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
if ( business - > type = = CFG_UPDATE_TYPE_FULL )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
business - > total_cfgnum = business - > version_cfgnum ;
FS_operate ( g_doris_server_info . fsstat_handle , business - > fs_lineid , g_doris_server_info . fsstat_column [ DRS_FSCLM_CUR_FULL_VERSION ] , FS_OP_SET , business - > version ) ;
FS_operate ( g_doris_server_info . fsstat_handle , business - > fs_lineid , g_doris_server_info . fsstat_column [ DRS_FSCLM_CONFIG_TOTAL_NUM ] , FS_OP_SET , business - > version_cfgnum ) ;
FS_operate ( g_doris_server_info . fsstat_handle , business - > fs_lineid , g_doris_server_info . fsstat_column [ DRS_FSCLM_RECV_FULL_VER ] , FS_OP_ADD , 1 ) ;
2021-07-16 16:06:59 +08:00
}
else
{
2021-08-25 18:40:20 +08:00
business - > total_cfgnum + = business - > version_cfgnum ;
FS_operate ( g_doris_server_info . fsstat_handle , business - > fs_lineid , g_doris_server_info . fsstat_column [ DRS_FSCLM_CUR_INC_VERSION ] , FS_OP_SET , business - > version ) ;
FS_operate ( g_doris_server_info . fsstat_handle , business - > fs_lineid , g_doris_server_info . fsstat_column [ DRS_FSCLM_CONFIG_TOTAL_NUM ] , FS_OP_ADD , business - > version_cfgnum ) ;
FS_operate ( g_doris_server_info . fsstat_handle , business - > fs_lineid , g_doris_server_info . fsstat_column [ DRS_FSCLM_RECV_INC_VER ] , FS_OP_ADD , 1 ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
MESA_Monitor_operation ( g_doris_server_info . monitor , business - > mmid_latest_ver , MONITOR_VALUE_SET , business - > version ) ;
MESA_Monitor_operation ( g_doris_server_info . monitor , business - > mmid_total_cfgnum , MONITOR_VALUE_SET , business - > total_cfgnum ) ;
MESA_Monitor_set_status_code ( g_doris_server_info . monitor , MONITOR_STATUS_OP_CLEAR , business - > mmval_status_codeid , NULL , NULL ) ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s, Version %lu update finished " , business - > bizname , business - > version ) ;
}
2021-07-16 16:06:59 +08:00
2021-08-25 18:40:20 +08:00
void doris_config_common_version_error ( struct doris_business * business )
2021-07-16 16:06:59 +08:00
{
2021-07-22 10:25:42 +08:00
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_RECV_ERR_VER ] , 0 , FS_OP_ADD , 1 ) ;
2021-07-19 17:21:38 +08:00
//Grafana+Promethues<65> <73> չʾ <D5B9> ڲ<EFBFBD> <DAB2> 쳣״̬
2021-08-25 18:40:20 +08:00
MESA_Monitor_set_status_code ( g_doris_server_info . monitor , MONITOR_STATUS_OP_SET , business - > mmval_status_codeid ,
2021-07-19 17:21:38 +08:00
" Version receive error " , " Receive config file error, please check producer " ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
void doris_config_common_cfgfile_start ( struct doris_business * business , u_int32_t cfgnum )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
business - > version_cfgnum + = cfgnum ;
2021-07-22 10:25:42 +08:00
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_RECV_START_FILES ] , 0 , FS_OP_ADD , 1 ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
void doris_config_common_cfgfile_finish ( struct doris_business * business )
2021-07-16 16:06:59 +08:00
{
2021-07-22 10:25:42 +08:00
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_RECV_CMPLT_FILES ] , 0 , FS_OP_ADD , 1 ) ;
2021-08-25 18:40:20 +08:00
FS_operate ( g_doris_server_info . fsstat_handle , business - > fs_lineid , g_doris_server_info . fsstat_column [ DRS_FSCLM_RECV_FILES ] , FS_OP_ADD , 1 ) ;
2021-07-16 16:06:59 +08:00
}
/* localmem-series callbacks: used when configuration is loaded from local storage (original comment mis-encoded; presumably at startup - confirm) */
2021-08-25 18:40:20 +08:00
void doris_config_localmem_version_start ( struct doris_csum_instance * instance , cJSON * meta , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
doris_config_common_version_start ( ( struct doris_business * ) userdata , meta ) ;
if ( g_doris_server_info . consumer_port )
2021-07-16 16:06:59 +08:00
{
doris_config_mem_version_start ( instance , meta , userdata ) ;
}
}
2021-08-25 18:40:20 +08:00
void doris_config_localmem_version_finish ( struct doris_csum_instance * instance , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
if ( g_doris_server_info . consumer_port )
2021-07-16 16:06:59 +08:00
{
doris_config_mem_version_finish ( instance , userdata ) ;
}
2021-08-25 18:40:20 +08:00
doris_config_common_version_finish ( ( struct doris_business * ) userdata ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
void doris_config_localmem_version_error ( struct doris_csum_instance * instance , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
doris_config_common_version_error ( ( struct doris_business * ) userdata ) ;
if ( g_doris_server_info . consumer_port )
2021-07-16 16:06:59 +08:00
{
doris_config_mem_version_error ( instance , userdata ) ;
}
}
2021-08-25 18:40:20 +08:00
void doris_config_localmem_cfgfile_start ( struct doris_csum_instance * instance ,
2021-08-03 10:49:52 +08:00
const struct tablemeta * meta , const char * localpath , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
doris_config_common_cfgfile_start ( ( struct doris_business * ) userdata , meta - > cfgnum ) ;
if ( g_doris_server_info . consumer_port )
2021-07-16 16:06:59 +08:00
{
2021-08-03 10:49:52 +08:00
doris_config_mem_cfgfile_start ( instance , meta , localpath , userdata ) ;
2021-07-16 16:06:59 +08:00
}
}
2021-08-25 18:40:20 +08:00
void doris_config_localmem_cfgfile_update ( struct doris_csum_instance * instance , const char * data , size_t len , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
if ( g_doris_server_info . consumer_port )
2021-07-16 16:06:59 +08:00
{
doris_config_mem_cfgfile_update ( instance , data , len , userdata ) ;
}
}
2021-08-25 18:40:20 +08:00
void doris_config_localmem_cfgfile_finish ( struct doris_csum_instance * instance , const char * md5 , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
doris_config_common_cfgfile_finish ( ( struct doris_business * ) userdata ) ;
if ( g_doris_server_info . consumer_port )
2021-07-16 16:06:59 +08:00
{
2021-07-19 17:21:38 +08:00
doris_config_mem_cfgfile_finish ( instance , md5 , userdata ) ;
2021-07-16 16:06:59 +08:00
}
}
/* Callback family without the "localmem" prefix: used for newly received configs. Besides the statistics and the in-memory cache, these callbacks also invoke the local-file handlers when business->persistence_write_on is set. */
2021-08-25 18:40:20 +08:00
void doris_config_version_start ( struct doris_csum_instance * instance , cJSON * meta , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-09-08 10:45:47 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
doris_config_common_version_start ( business , meta ) ;
if ( business - > persistence_write_on )
{
doris_config_file_version_start ( instance , meta , userdata ) ;
}
2021-08-25 18:40:20 +08:00
if ( g_doris_server_info . consumer_port )
2021-07-16 16:06:59 +08:00
{
doris_config_mem_version_start ( instance , meta , userdata ) ;
}
}
2021-08-25 18:40:20 +08:00
void doris_config_version_finish ( struct doris_csum_instance * instance , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-09-08 10:45:47 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
if ( business - > persistence_write_on )
{
doris_config_file_version_finish ( instance , userdata ) ;
}
2021-08-25 18:40:20 +08:00
if ( g_doris_server_info . consumer_port )
2021-07-16 16:06:59 +08:00
{
doris_config_mem_version_finish ( instance , userdata ) ;
}
2021-08-25 18:40:20 +08:00
doris_config_common_version_finish ( ( struct doris_business * ) userdata ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
void doris_config_version_error ( struct doris_csum_instance * instance , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-09-08 10:45:47 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
2021-08-25 18:40:20 +08:00
doris_config_common_version_error ( ( struct doris_business * ) userdata ) ;
2021-09-08 10:45:47 +08:00
if ( business - > persistence_write_on )
{
doris_config_file_version_error ( instance , userdata ) ;
}
2021-08-25 18:40:20 +08:00
if ( g_doris_server_info . consumer_port )
2021-07-16 16:06:59 +08:00
{
doris_config_mem_version_error ( instance , userdata ) ;
}
2021-09-08 10:45:47 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " business: %s, Version %llu error, rolling back... " , business - > bizname , business - > version ) ;
2021-07-16 16:06:59 +08:00
}
2021-08-25 18:40:20 +08:00
void doris_config_cfgfile_start ( struct doris_csum_instance * instance ,
2021-08-03 10:49:52 +08:00
const struct tablemeta * meta , const char * localpath , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
2021-07-22 10:25:42 +08:00
2021-08-25 18:40:20 +08:00
doris_config_common_cfgfile_start ( ( struct doris_business * ) userdata , meta - > cfgnum ) ;
2021-09-08 10:45:47 +08:00
if ( business - > persistence_write_on )
{
doris_config_file_cfgfile_start ( instance , meta , localpath , userdata ) ;
}
2021-08-25 18:40:20 +08:00
if ( g_doris_server_info . consumer_port )
2021-07-16 16:06:59 +08:00
{
2021-08-25 18:40:20 +08:00
doris_config_mem_cfgfile_start ( instance , meta , business - > cfg_file_path , userdata ) ;
2021-07-16 16:06:59 +08:00
}
}
2021-08-25 18:40:20 +08:00
void doris_config_cfgfile_update ( struct doris_csum_instance * instance , const char * data , size_t len , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-09-08 10:45:47 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
if ( business - > persistence_write_on )
{
doris_config_file_cfgfile_update ( instance , data , len , userdata ) ;
}
2021-08-25 18:40:20 +08:00
if ( g_doris_server_info . consumer_port )
2021-07-16 16:06:59 +08:00
{
doris_config_mem_cfgfile_update ( instance , data , len , userdata ) ;
}
}
2021-08-25 18:40:20 +08:00
void doris_config_cfgfile_finish ( struct doris_csum_instance * instance , const char * md5 , void * userdata )
2021-07-16 16:06:59 +08:00
{
2021-09-08 10:45:47 +08:00
struct doris_business * business = ( struct doris_business * ) userdata ;
2021-08-25 18:40:20 +08:00
doris_config_common_cfgfile_finish ( ( struct doris_business * ) userdata ) ;
2021-09-08 10:45:47 +08:00
if ( business - > persistence_write_on )
{
doris_config_file_cfgfile_finish ( instance , userdata ) ;
}
2021-08-25 18:40:20 +08:00
if ( g_doris_server_info . consumer_port )
2021-07-16 16:06:59 +08:00
{
2021-07-19 17:21:38 +08:00
doris_config_mem_cfgfile_finish ( instance , md5 , userdata ) ;
2021-07-16 16:06:59 +08:00
}
}
void * thread_doris_client_recv_cfg ( void * arg )
{
2021-07-22 10:25:42 +08:00
struct doris_business * business = ( struct doris_business * ) arg ;
struct event_base * client_evbase ;
2021-08-25 18:40:20 +08:00
struct doris_csum_instance * instance ;
2021-07-16 16:06:59 +08:00
struct doris_callbacks doris_cbs ;
2021-07-22 10:25:42 +08:00
struct doris_arguments doris_args ;
2021-07-16 16:06:59 +08:00
struct doris_idxfile_scanner * scanner ;
enum DORIS_UPDATE_TYPE update_type ;
char stored_path [ 512 ] ;
prctl ( PR_SET_NAME , " client_recv " ) ;
client_evbase = event_base_new ( ) ;
2021-08-25 18:40:20 +08:00
business - > source_from = RECV_WAY_IDX_FILE ;
business - > worker_evbase = client_evbase ;
2021-07-16 16:06:59 +08:00
2021-09-08 10:45:47 +08:00
if ( business - > persistence_write_on )
2021-08-30 14:22:48 +08:00
{
2021-09-08 10:45:47 +08:00
scanner = doris_index_file_scanner ( 0 ) ;
/*Retaive latest config to memory from Stored configs*/
doris_cbs . version_start = doris_config_localmem_version_start ;
doris_cbs . version_finish = doris_config_localmem_version_finish ;
doris_cbs . version_error = doris_config_localmem_version_error ;
doris_cbs . cfgfile_start = doris_config_localmem_cfgfile_start ;
doris_cbs . cfgfile_update = doris_config_localmem_cfgfile_update ;
doris_cbs . cfgfile_finish = doris_config_localmem_cfgfile_finish ;
doris_cbs . version_updated = NULL ;
doris_cbs . userdata = business ;
2021-07-16 16:06:59 +08:00
2021-09-08 10:45:47 +08:00
snprintf ( stored_path , 512 , " %s/full/index " , business - > store_path_root ) ;
if ( business - > saves_when_fulldel > 0 )
{
get_full_topN_max_versions ( stored_path , business - > full_version_inc , business - > saves_when_fulldel ) ;
}
update_type = doris_index_file_traverse ( scanner , stored_path , & doris_cbs , NULL , g_doris_server_info . log_runtime ) ;
snprintf ( stored_path , 512 , " %s/inc/index " , business - > store_path_root ) ;
do {
update_type = doris_index_file_traverse ( scanner , stored_path , & doris_cbs , NULL , g_doris_server_info . log_runtime ) ;
} while ( update_type ! = CFG_UPDATE_TYPE_NONE ) ;
}
2021-07-16 16:06:59 +08:00
/*Check new configs*/
doris_cbs . version_start = doris_config_version_start ;
doris_cbs . version_finish = doris_config_version_finish ;
doris_cbs . version_error = doris_config_version_error ;
doris_cbs . cfgfile_start = doris_config_cfgfile_start ;
doris_cbs . cfgfile_update = doris_config_cfgfile_update ;
doris_cbs . cfgfile_finish = doris_config_cfgfile_finish ;
2021-09-08 10:45:47 +08:00
doris_cbs . version_updated = NULL ;
doris_cbs . userdata = business ;
2021-07-16 16:06:59 +08:00
2021-08-25 18:40:20 +08:00
business - > source_from = RECV_WAY_DRS_CLIENT ;
2021-07-22 10:25:42 +08:00
memset ( & doris_args , 0 , sizeof ( struct doris_arguments ) ) ;
2021-09-08 10:45:47 +08:00
doris_args . current_version = ( business - > persistence_write_on ) ? scanner - > cur_version : ( 0 - business - > cache_max_versions ) ;
2021-07-22 10:25:42 +08:00
sprintf ( doris_args . bizname , " %s " , business - > bizname ) ;
2021-08-25 18:40:20 +08:00
instance = doris_csum_instance_new ( business - > param_csum , client_evbase , & doris_cbs , & doris_args , g_doris_server_info . log_runtime ) ;
2021-07-16 16:06:59 +08:00
if ( instance = = NULL )
{
assert ( 0 ) ; return NULL ;
}
event_base_dispatch ( client_evbase ) ;
printf ( " Libevent dispath error, should not run here. \n " ) ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " Libevent dispath error, should not run here. " ) ;
assert ( 0 ) ; return NULL ;
}
static void doris_scanner_timer_cb ( int fd , short kind , void * userp )
{
struct scanner_timer_priv * timer_priv = ( struct scanner_timer_priv * ) userp ;
enum DORIS_UPDATE_TYPE update_type ;
struct timeval tv ;
do {
2021-07-22 10:25:42 +08:00
update_type = doris_index_file_traverse ( timer_priv - > scanner , timer_priv - > business - > recv_path_inc ,
2021-07-16 16:06:59 +08:00
& timer_priv - > doris_cbs , NULL , g_doris_server_info . log_runtime ) ;
} while ( update_type ! = CFG_UPDATE_TYPE_NONE ) ;
tv . tv_sec = g_doris_server_info . scan_idx_interval ;
tv . tv_usec = 0 ;
evtimer_add ( & timer_priv - > timer_scanner , & tv ) ;
}
void * thread_index_file_recv_cfg ( void * arg )
{
2021-07-22 10:25:42 +08:00
struct doris_business * business = ( struct doris_business * ) arg ;
2021-07-16 16:06:59 +08:00
struct event_base * client_evbase ;
struct timeval tv ;
struct scanner_timer_priv timer_priv ;
enum DORIS_UPDATE_TYPE update_type ;
char stored_path [ 256 ] ;
prctl ( PR_SET_NAME , " index_file " ) ;
memset ( & timer_priv , 0 , sizeof ( struct scanner_timer_priv ) ) ;
client_evbase = event_base_new ( ) ;
2021-08-25 18:40:20 +08:00
business - > source_from = RECV_WAY_IDX_FILE ;
business - > worker_evbase = client_evbase ;
2021-07-16 16:06:59 +08:00
timer_priv . scanner = doris_index_file_scanner ( 0 ) ;
2021-07-22 10:25:42 +08:00
timer_priv . business = business ;
2021-07-16 16:06:59 +08:00
/*Retaive latest config to memory from Stored configs*/
timer_priv . doris_cbs . version_start = doris_config_localmem_version_start ;
timer_priv . doris_cbs . version_finish = doris_config_localmem_version_finish ;
timer_priv . doris_cbs . version_error = doris_config_localmem_version_error ;
timer_priv . doris_cbs . cfgfile_start = doris_config_localmem_cfgfile_start ;
timer_priv . doris_cbs . cfgfile_update = doris_config_localmem_cfgfile_update ;
timer_priv . doris_cbs . cfgfile_finish = doris_config_localmem_cfgfile_finish ;
2021-08-25 18:40:20 +08:00
timer_priv . doris_cbs . version_updated = NULL ;
timer_priv . doris_cbs . userdata = business ;
2021-07-16 16:06:59 +08:00
2021-07-22 10:25:42 +08:00
snprintf ( stored_path , 512 , " %s/full/index " , business - > store_path_root ) ;
2021-07-16 16:06:59 +08:00
update_type = doris_index_file_traverse ( timer_priv . scanner , stored_path , & timer_priv . doris_cbs , NULL , g_doris_server_info . log_runtime ) ;
2021-08-30 14:22:48 +08:00
if ( business - > saves_when_fulldel > 0 )
{
get_full_topN_max_versions ( stored_path , business - > full_version_inc , business - > saves_when_fulldel ) ;
}
2021-07-22 10:25:42 +08:00
snprintf ( stored_path , 512 , " %s/inc/index " , business - > store_path_root ) ;
2021-07-16 16:06:59 +08:00
do {
update_type = doris_index_file_traverse ( timer_priv . scanner , stored_path , & timer_priv . doris_cbs , NULL , g_doris_server_info . log_runtime ) ;
2021-08-25 18:40:20 +08:00
} while ( update_type ! = CFG_UPDATE_TYPE_NONE ) ;
2021-07-16 16:06:59 +08:00
/*Check new configs*/
timer_priv . doris_cbs . version_start = doris_config_version_start ;
timer_priv . doris_cbs . version_finish = doris_config_version_finish ;
timer_priv . doris_cbs . version_error = doris_config_version_error ;
timer_priv . doris_cbs . cfgfile_start = doris_config_cfgfile_start ;
timer_priv . doris_cbs . cfgfile_update = doris_config_cfgfile_update ;
timer_priv . doris_cbs . cfgfile_finish = doris_config_cfgfile_finish ;
2021-07-22 10:25:42 +08:00
update_type = doris_index_file_traverse ( timer_priv . scanner , business - > recv_path_full ,
2021-07-16 16:06:59 +08:00
& timer_priv . doris_cbs , NULL , g_doris_server_info . log_runtime ) ;
2021-08-25 18:40:20 +08:00
if ( update_type ! = CFG_UPDATE_TYPE_NONE )
2021-07-16 16:06:59 +08:00
{
tv . tv_sec = 0 ;
}
else
{
tv . tv_sec = g_doris_server_info . scan_idx_interval ;
}
tv . tv_usec = 0 ;
evtimer_assign ( & timer_priv . timer_scanner , client_evbase , doris_scanner_timer_cb , & timer_priv ) ;
evtimer_add ( & timer_priv . timer_scanner , & tv ) ;
event_base_dispatch ( client_evbase ) ;
printf ( " Libevent dispath error, should not run here. \n " ) ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " Libevent dispath error, should not run here. " ) ;
return NULL ;
}
2021-08-25 18:40:20 +08:00
struct bufferevent * doris_https_bufferevent_cb ( struct event_base * evabse , void * arg )
{
SSL_CTX * ssl_instance = ( SSL_CTX * ) arg ;
return bufferevent_openssl_socket_new ( evabse , - 1 , SSL_new ( ssl_instance ) , BUFFEREVENT_SSL_ACCEPTING , BEV_OPT_CLOSE_ON_FREE ) ;
}
struct doris_business * lookup_bizstruct_from_name ( const struct evkeyvalq * params )
{
map < string , struct doris_business * > : : iterator iter ;
const char * bizname ;
if ( NULL = = ( bizname = evhttp_find_header ( params , " business " ) ) )
{
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_CLIENT_INVALID_REQ ] , 0 , FS_OP_ADD , 1 ) ;
return NULL ;
}
if ( ( iter = g_doris_server_info . name2business - > find ( string ( bizname ) ) ) = = g_doris_server_info . name2business - > end ( ) )
{
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_CLIENT_INVALID_REQ ] , 0 , FS_OP_ADD , 1 ) ;
return NULL ;
}
return iter - > second ;
}
struct version_list_node * lookup_vernode_struct_from_name ( struct doris_business * business , const struct evkeyvalq * params )
{
map < string , struct version_list_node * > : : iterator iter ;
const char * token ;
if ( NULL = = ( token = evhttp_find_header ( params , " token " ) ) )
{
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_CLIENT_INVALID_REQ ] , 0 , FS_OP_ADD , 1 ) ;
return NULL ;
}
if ( ( iter = business - > token2node - > find ( string ( token ) ) ) = = business - > token2node - > end ( ) )
{
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_CLIENT_INVALID_REQ ] , 0 , FS_OP_ADD , 1 ) ;
return NULL ;
}
return iter - > second ;
}
struct version_list_node * lookup_vernode_struct_from_name_renew ( struct doris_business * business , const struct evkeyvalq * params )
{
struct version_list_node * vernode ;
struct timeval tv ;
if ( NULL = = ( vernode = lookup_vernode_struct_from_name ( business , params ) ) )
{
return NULL ;
}
if ( vernode - > business - > concurrency_allowed )
{
tv . tv_sec = g_doris_server_info . post_vernode_ttl ;
tv . tv_usec = 0 ;
evtimer_add ( & vernode - > timer_expire , & tv ) ;
}
return vernode ;
}
/* Ensure that tokens generated for different businesses never conflict. */
/*
 * Write a new token into token (at most size bytes, NUL-terminated by
 * snprintf).
 *
 * The token is "<local_ip>-<time>-<rand>-<seq>", where seq is the global
 * token_seq counter after being incremented. The whole operation runs under
 * the global mutex. The business argument is not used.
 */
void prod_server_generate_token(struct doris_business *business, char *token/*OUT*/, size_t size)
{
	time_t now;
	int nonce;

	pthread_mutex_lock(&g_doris_server_info.mutex_lock);

	now = time(NULL);
	nonce = rand();
	snprintf(token, size, "%u-%lu-%u-%u",
		g_doris_server_info.local_ip, now, nonce, ++g_doris_server_info.token_seq);

	pthread_mutex_unlock(&g_doris_server_info.mutex_lock);
}
2021-09-08 10:45:47 +08:00
/* TODO: must not generate a version smaller than the peer's version number. */
int64_t prod_server_generate_version ( struct doris_business * business )
{
return + + business - > genversion_seq ;
}
2021-08-25 18:40:20 +08:00
void business_resume_sync_peer_normal ( struct doris_business * business )
{
u_int32_t business_post_ups ;
if ( ! g_doris_server_info . cluster_sync_mode )
{
return ;
}
if ( 1 = = atomic_set ( & business - > ready_to_sync , 1 ) | | business - > listener_prod = = 0 )
{
return ;
}
pthread_mutex_lock ( & g_doris_server_info . mutex_lock ) ;
business_post_ups = + + g_doris_server_info . business_post_ups ;
pthread_mutex_unlock ( & g_doris_server_info . mutex_lock ) ;
if ( business_post_ups = = g_doris_server_info . business_post_num )
{
MESA_Monitor_operation ( g_doris_server_info . monitor , g_doris_server_info . mmid_post_server , MONITOR_VALUE_SET , PROMETHUES_POST_SERVER_OK ) ;
}
else
{
MESA_Monitor_operation ( g_doris_server_info . monitor , g_doris_server_info . mmid_post_server , MONITOR_VALUE_SET , PROMETHUES_POST_SERVER_UPING ) ;
}
assert ( business_post_ups < = g_doris_server_info . business_post_num ) ;
}
void business_set_sync_peer_abnormal ( struct doris_business * business )
{
u_int32_t business_post_ups ;
if ( ! g_doris_server_info . cluster_sync_mode )
{
return ;
}
2021-09-08 10:45:47 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " \033 [1;31;40m[Alert]cluster sync error, please check slave status!!! \033 [0m \n " ) ;
2021-08-25 18:40:20 +08:00
if ( 0 = = atomic_set ( & business - > ready_to_sync , 0 ) | | business - > listener_prod = = 0 )
{
return ;
}
pthread_mutex_lock ( & g_doris_server_info . mutex_lock ) ;
business_post_ups = - - g_doris_server_info . business_post_ups ;
pthread_mutex_unlock ( & g_doris_server_info . mutex_lock ) ;
if ( business_post_ups = = 0 )
{
MESA_Monitor_operation ( g_doris_server_info . monitor , g_doris_server_info . mmid_post_server , MONITOR_VALUE_SET , PROMETHUES_POST_SERVER_DOWN ) ;
}
else
{
MESA_Monitor_operation ( g_doris_server_info . monitor , g_doris_server_info . mmid_post_server , MONITOR_VALUE_SET , PROMETHUES_POST_SERVER_UPING ) ;
}
assert ( business_post_ups < g_doris_server_info . business_post_num ) ;
}
char * vernode_print_json_meta ( struct version_list_node * vernode )
{
struct table_list_node * tablenode ;
cJSON * root , * array = NULL , * item ;
char * p ;
root = cJSON_CreateObject ( ) ;
cJSON_AddStringToObject ( root , " token " , vernode - > token ) ;
cJSON_AddNumberToObject ( root , " type " , vernode - > cfg_type ) ;
TAILQ_FOREACH ( tablenode , & vernode - > table_head , table_node )
{
if ( array = = NULL )
{
array = cJSON_CreateArray ( ) ;
}
item = cJSON_CreateObject ( ) ;
cJSON_AddStringToObject ( item , " tablename " , tablenode - > tablename ) ;
cJSON_AddNumberToObject ( item , " size " , tablenode - > cur_totallen ) ;
cJSON_AddItemToArray ( array , item ) ;
assert ( tablenode - > finished ) ; //<2F> ϴ<EFBFBD> <CFB4> <EFBFBD> <EFBFBD> ϵIJ<CFB5> <C4B2> ܼ<EFBFBD> <DCBC> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD>
}
if ( vernode - > cur_table ! = NULL )
{
if ( array = = NULL )
{
array = cJSON_CreateArray ( ) ;
}
item = cJSON_CreateObject ( ) ;
cJSON_AddStringToObject ( item , " tablename " , vernode - > cur_table - > tablename ) ;
cJSON_AddNumberToObject ( item , " offset " , vernode - > cur_table - > cur_totallen ) ;
cJSON_AddItemToArray ( array , item ) ;
}
cJSON_AddItemToObject ( root , " configs " , array ) ;
p = cJSON_PrintUnformatted ( root ) ;
cJSON_Delete ( root ) ;
return p ;
}
void http_prod_server_verion_check_cb ( struct evhttp_request * req , void * arg )
{
struct doris_business * business = ( struct doris_business * ) arg ;
struct version_list_node * vernode ;
struct evkeyvalq params ;
struct evbuffer * evbuf ;
char * p ;
if ( evhttp_parse_query ( evhttp_request_get_uri ( req ) , & params ) )
{
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_CLIENT_INVALID_REQ ] , 0 , FS_OP_ADD , 1 ) ;
evhttp_send_error ( req , HTTP_BADREQUEST , " Parameters invalid " ) ;
return ;
}
if ( NULL = = ( vernode = lookup_vernode_struct_from_name ( business , & params ) ) )
{
evhttp_clear_headers ( & params ) ;
evhttp_send_error ( req , HTTP_NOTFOUND , " Parameter token not found " ) ;
return ;
}
evhttp_clear_headers ( & params ) ;
if ( vernode - > syncing )
{
evhttp_send_error ( req , 310 , " table syncing now, retry later " ) ;
return ;
}
p = vernode_print_json_meta ( vernode ) ;
evbuf = evbuffer_new ( ) ;
evbuffer_add ( evbuf , p , strlen ( p ) ) ;
if ( vernode - > version_finished )
{
evhttp_send_reply ( req , HTTP_OK , " OK " , evbuf ) ;
}
else
{
evhttp_send_reply ( req , 300 , " version is posting " , evbuf ) ;
}
evbuffer_free ( evbuf ) ;
free ( p ) ;
}
void http_config_direct_version_cancel ( struct version_list_node * vernode , struct evhttp_request * req )
{
struct doris_business * business = vernode - > business ;
struct table_list_node * tablenode ;
char token [ 64 ] ;
sprintf ( token , " %s " , vernode - > token ) ;
if ( vernode - > synctx ! = NULL )
{
doris_prod_upload_ctx_destroy ( vernode - > synctx ) ;
}
2021-09-08 10:45:47 +08:00
if ( business - > persistence_write_on )
2021-08-25 18:40:20 +08:00
{
2021-09-08 10:45:47 +08:00
if ( vernode - > fp_idx_file ! = NULL )
{
fclose ( vernode - > fp_idx_file ) ;
remove ( vernode - > tmp_index_path ) ;
}
if ( vernode - > cur_table ! = NULL & & vernode - > cur_table - > fp_cfg_file ! = NULL )
{
fclose ( vernode - > cur_table - > fp_cfg_file ) ;
remove ( vernode - > cur_table - > localpath ) ;
}
TAILQ_FOREACH ( tablenode , & vernode - > table_head , table_node )
{
remove ( tablenode - > localpath ) ;
}
2021-08-25 18:40:20 +08:00
}
config_version_node_cleanup ( vernode ) ;
2021-08-30 14:22:48 +08:00
if ( business - > concurrency_allowed & & evtimer_pending ( & vernode - > timer_expire , NULL ) )
2021-08-25 18:40:20 +08:00
{
evtimer_del ( & vernode - > timer_expire ) ;
}
business - > cur_vernode = NULL ; //<2F> <> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD>
business - > posts_on_the_way - - ;
FS_operate ( g_doris_server_info . fsstat_handle , business - > fs_lineid , g_doris_server_info . fsstat_column [ DRS_FSCLM_POST_ON_THE_WAY ] , FS_OP_SET , business - > posts_on_the_way ) ;
evhttp_send_reply ( req , 200 , " OK " , NULL ) ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s, post server version cancel, token: %s " , business - > bizname , token ) ;
}
void prod_sync_vercancel_result_cb ( enum PROD_VEROP_RES result , void * userdata )
{
struct version_list_node * vernode = ( struct version_list_node * ) userdata ;
vernode - > syncing = 0 ;
vernode - > retry_times + + ;
switch ( result )
{
case VERSIONOP_RES_OK :
http_config_direct_version_cancel ( vernode , vernode - > req ) ;
break ;
case VERSIONOP_RES_ERROR :
evhttp_send_error ( vernode - > req , 500 , " version cancel sync error res_code " ) ;
2021-09-08 10:45:47 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " \033 [1;31;40m[Alert]business: %s, version cancel sync error res_code, abandon it. Send 500 response to client. \033 [0m " , vernode - > business - > bizname ) ;
2021-08-25 18:40:20 +08:00
break ;
case VERSIONOP_CURL_ERROR :
if ( atomic_read ( & vernode - > business - > ready_to_sync ) & & ( vernode - > retry_times < 3 ) )
{
vernode - > syncing = 1 ;
doris_prod_version_cancel ( vernode - > synctx , prod_sync_vercancel_result_cb , vernode ) ;
}
else
{
http_config_direct_version_cancel ( vernode , vernode - > req ) ;
business_set_sync_peer_abnormal ( vernode - > business ) ;
}
break ;
default : assert ( 0 ) ; break ;
}
}
void http_prod_server_verion_cancel_cb ( struct evhttp_request * req , void * arg )
{
struct doris_business * business = ( struct doris_business * ) arg ;
struct version_list_node * vernode ;
struct evkeyvalq params ;
if ( evhttp_parse_query ( evhttp_request_get_uri ( req ) , & params ) )
{
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_CLIENT_INVALID_REQ ] , 0 , FS_OP_ADD , 1 ) ;
evhttp_send_error ( req , HTTP_BADREQUEST , " Parameters invalid " ) ;
return ;
}
if ( NULL = = ( vernode = lookup_vernode_struct_from_name_renew ( business , & params ) ) )
{
evhttp_clear_headers ( & params ) ;
evhttp_send_error ( req , HTTP_OK , " Parameter token not found " ) ; //<2F> <> <EFBFBD> <EFBFBD> һ <EFBFBD> <D2BB> <EFBFBD> <EFBFBD>
return ;
}
evhttp_clear_headers ( & params ) ;
if ( vernode - > version_finished )
{
evhttp_send_error ( req , HTTP_BADREQUEST , " version already finished " ) ;
return ;
}
if ( vernode - > syncing )
{
evhttp_send_error ( req , 300 , " table syncing now, retry later " ) ;
return ;
}
if ( ! atomic_read ( & business - > ready_to_sync ) | |
NULL ! = evhttp_find_header ( evhttp_request_get_input_headers ( req ) , " X-Doris-Master-Slave-Sync " ) )
{
return http_config_direct_version_cancel ( vernode , req ) ;
}
vernode - > retry_times = 0 ;
vernode - > syncing = 1 ;
vernode - > req = req ;
doris_prod_version_cancel ( vernode - > synctx , prod_sync_vercancel_result_cb , vernode ) ;
}
void doris_config_post_version_finish ( struct doris_business * business , struct version_list_node * vernode , int64_t newversion )
{
assert ( newversion > vernode - > version ) ;
vernode - > version = newversion ;
business - > version = vernode - > version ;
business - > type = vernode - > cfg_type ;
2021-09-08 10:45:47 +08:00
if ( business - > persistence_write_on )
{
if ( vernode - > cfg_type = = CFG_UPDATE_TYPE_FULL )
{
snprintf ( business - > inc_index_path , 256 , " %s/inc/index/full_config_index.%010lu " , business - > store_path_root , vernode - > version ) ;
snprintf ( business - > full_index_path , 256 , " %s/full/index/full_config_index.%010lu " , business - > store_path_root , vernode - > version ) ;
}
else
{
snprintf ( business - > inc_index_path , 256 , " %s/inc/index/inc_config_index.%010lu " , business - > store_path_root , vernode - > version ) ;
}
/*HTTP postʱ<74> <CAB1> <EFBFBD> <EFBFBD> <EFBFBD> 汾<EFBFBD> <E6B1BE> <EFBFBD> <EFBFBD> ÿ<EFBFBD> <C3BF> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> Լ<EFBFBD> <D4BC> <EFBFBD> <EFBFBD> <EFBFBD> ʱ֪ͨ<CDA8> ļ<EFBFBD> <C4BC> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ñ<EFBFBD> <C3B1> <EFBFBD> <EFBFBD> ļ<EFBFBD> <C4BC> Ĺرպ<D8B1> <D5BA> <EFBFBD> */
sprintf ( business - > tmp_index_path , " %s " , vernode - > tmp_index_path ) ;
business - > fp_idx_file = vernode - > fp_idx_file ;
doris_config_file_version_finish ( NULL , business ) ;
vernode - > fp_idx_file = NULL ;
}
2021-08-25 18:40:20 +08:00
if ( g_doris_server_info . consumer_port )
{
business - > cur_vernode = vernode ;
cJSON_AddNumberToObject ( vernode - > metajson , " version " , vernode - > version ) ;
doris_config_mem_version_finish ( NULL , business ) ;
}
business - > version_cfgnum = vernode - > total_cfgs ;
doris_config_common_version_finish ( business ) ;
business - > cfgver_head - > latest_version = vernode - > version ;
if ( vernode - > synctx ! = NULL )
{
doris_prod_upload_ctx_destroy ( vernode - > synctx ) ;
vernode - > synctx = NULL ;
}
vernode - > version_finished = 1 ;
business - > posts_on_the_way - - ;
business - > cur_vernode = NULL ; //<2F> <> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD>
FS_operate ( g_doris_server_info . fsstat_handle , business - > fs_lineid , g_doris_server_info . fsstat_column [ DRS_FSCLM_POST_ON_THE_WAY ] , FS_OP_SET , business - > posts_on_the_way ) ;
}
void http_config_direct_version_finish ( struct version_list_node * vernode , struct evhttp_request * req , int64_t set_version )
{
struct doris_business * business = vernode - > business ;
2021-09-08 10:45:47 +08:00
char version [ 32 ] , token [ 64 ] , lvdbkey [ 40 ] ;
2021-08-25 18:40:20 +08:00
int64_t new_version ;
2021-08-30 14:22:48 +08:00
if ( business - > concurrency_allowed & & evtimer_pending ( & vernode - > timer_expire , NULL ) )
2021-08-25 18:40:20 +08:00
{
evtimer_del ( & vernode - > timer_expire ) ;
}
if ( set_version = = 0 )
{
2021-09-08 10:45:47 +08:00
new_version = prod_server_generate_version ( business ) ;
2021-08-25 18:40:20 +08:00
}
else
{
2021-09-08 10:45:47 +08:00
new_version = business - > genversion_seq = set_version ;
}
/*<2A> <> <EFBFBD> <EFBFBD> leveldb<64> <62> <EFBFBD> ɰ汾<C9B0> ŵ<EFBFBD> <C5B5> <EFBFBD> <EFBFBD> <EFBFBD> */
if ( ! business - > persistence_write_on )
{
sprintf ( lvdbkey , " %s_verseq " , business - > bizname ) ;
if ( ! doris_kvdb_update_keystr_valint ( g_doris_server_info . kvdbhandle , lvdbkey , new_version ) )
{
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " \033 [1;31;40m[Alert] business: %s, update levelDB failed! \033 [0m \n " , business - > bizname ) ;
}
2021-08-25 18:40:20 +08:00
}
2021-09-08 10:45:47 +08:00
2021-08-25 18:40:20 +08:00
sprintf ( token , " %s " , vernode - > token ) ;
doris_config_post_version_finish ( business , vernode , new_version ) ;
sprintf ( version , " %lu " , new_version ) ;
evhttp_add_header ( evhttp_request_get_output_headers ( req ) , " X-Set-Version " , version ) ;
evhttp_send_reply ( req , 200 , " OK " , NULL ) ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s, post server version finish, token: %s, version: %lu " , business - > bizname , token , new_version ) ;
}
void prod_sync_verend_result_cb ( enum PROD_VEROP_RES result , int64_t version , void * userdata )
{
struct version_list_node * vernode = ( struct version_list_node * ) userdata ;
vernode - > retry_times + + ;
vernode - > syncing = 0 ;
switch ( result )
{
case VERSIONOP_RES_OK :
http_config_direct_version_finish ( vernode , vernode - > req , version ) ;
break ;
case VERSIONOP_RES_ERROR :
evhttp_send_error ( vernode - > req , 500 , " version end sync error res_code " ) ;
2021-09-08 10:45:47 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " \033 [1;31;40m[Alert]business: %s, version end sync error res_code, abandon it. Send 500 response to client. \033 [0m " , vernode - > business - > bizname ) ;
2021-08-25 18:40:20 +08:00
break ;
case VERSIONOP_CURL_ERROR :
if ( atomic_read ( & vernode - > business - > ready_to_sync ) & & ( vernode - > retry_times < 3 ) )
{
vernode - > syncing = 1 ;
doris_prod_version_end ( vernode - > synctx , prod_sync_verend_result_cb , vernode ) ;
}
else
{
http_config_direct_version_finish ( vernode , vernode - > req , 0 ) ;
business_set_sync_peer_abnormal ( vernode - > business ) ;
}
break ;
default : assert ( 0 ) ; break ;
}
}
void http_prod_server_verion_end_cb ( struct evhttp_request * req , void * arg )
{
struct doris_business * business = ( struct doris_business * ) arg ;
struct version_list_node * vernode ;
struct evkeyvalq params ;
char version [ 32 ] ;
if ( evhttp_parse_query ( evhttp_request_get_uri ( req ) , & params ) )
{
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_CLIENT_INVALID_REQ ] , 0 , FS_OP_ADD , 1 ) ;
evhttp_send_error ( req , HTTP_BADREQUEST , " Parameters invalid " ) ;
return ;
}
if ( NULL = = ( vernode = lookup_vernode_struct_from_name_renew ( business , & params ) ) )
{
evhttp_clear_headers ( & params ) ;
evhttp_send_error ( req , HTTP_NOTFOUND , " Parameter token invalid " ) ;
return ;
}
evhttp_clear_headers ( & params ) ;
if ( vernode - > version_finished )
{
sprintf ( version , " %lu " , vernode - > version ) ;
evhttp_add_header ( evhttp_request_get_output_headers ( req ) , " X-Set-Version " , version ) ;
2021-08-30 14:22:48 +08:00
evhttp_send_reply ( req , HTTP_OK , " version already finished " , NULL ) ; //<2F> <> ֤<EFBFBD> <D6A4> <EFBFBD> <EFBFBD> һ <EFBFBD> <D2BB> <EFBFBD> <EFBFBD>
2021-08-25 18:40:20 +08:00
return ;
}
if ( vernode - > cur_table ! = NULL | | vernode - > syncing )
{
evhttp_send_error ( req , 300 , " table not finished yet " ) ;
return ;
}
if ( ! atomic_read ( & business - > ready_to_sync ) | |
NULL ! = evhttp_find_header ( evhttp_request_get_input_headers ( req ) , " X-Doris-Master-Slave-Sync " ) )
{
return http_config_direct_version_finish ( vernode , req , 0 ) ;
}
if ( vernode - > synctx = = NULL )
{
evhttp_send_error ( req , 400 , " illegal server host, cannt change server durain version life cycle " ) ;
return ;
}
vernode - > retry_times = 0 ;
vernode - > syncing = 1 ;
vernode - > req = req ;
doris_prod_version_end ( vernode - > synctx , prod_sync_verend_result_cb , vernode ) ;
}
static void post_vernode_expire_destroy_cb ( int fd , short kind , void * userp )
{
struct version_list_node * vernode = ( struct version_list_node * ) userp ;
struct table_list_node * tablenode ;
struct timeval tv ;
if ( vernode - > syncing )
{
tv . tv_sec = g_doris_server_info . post_vernode_ttl ;
tv . tv_usec = 0 ;
evtimer_add ( & vernode - > timer_expire , & tv ) ;
return ;
}
if ( vernode - > synctx ! = NULL )
{
doris_prod_upload_ctx_destroy ( vernode - > synctx ) ;
vernode - > synctx = NULL ;
}
if ( vernode - > fp_idx_file ! = NULL )
{
fclose ( vernode - > fp_idx_file ) ;
remove ( vernode - > tmp_index_path ) ;
}
if ( vernode - > cur_table ! = NULL & & vernode - > cur_table - > fp_cfg_file ! = NULL )
{
fclose ( vernode - > cur_table - > fp_cfg_file ) ;
remove ( vernode - > cur_table - > localpath ) ;
}
TAILQ_FOREACH ( tablenode , & vernode - > table_head , table_node )
{
remove ( tablenode - > localpath ) ;
}
vernode - > business - > posts_on_the_way - - ;
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_VERSION_EXPIRES ] , 0 , FS_OP_ADD , 1 ) ;
FS_operate ( g_doris_server_info . fsstat_handle , vernode - > business - > fs_lineid , g_doris_server_info . fsstat_column [ DRS_FSCLM_POST_ON_THE_WAY ] , FS_OP_SET , vernode - > business - > posts_on_the_way ) ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s, token %s expires " , vernode - > business - > bizname , vernode - > token ) ;
config_version_node_cleanup ( vernode ) ;
}
struct version_list_node * doris_config_post_version_prepare ( struct doris_business * business , int32_t cfgtype )
{
struct version_list_node * vernode ;
struct timeval tv ;
vernode = ( struct version_list_node * ) calloc ( 1 , sizeof ( struct version_list_node ) ) ;
vernode - > business = business ;
vernode - > cfg_type = cfgtype ;
if ( business - > concurrency_allowed )
{
tv . tv_sec = g_doris_server_info . post_vernode_ttl ;
tv . tv_usec = 0 ;
evtimer_assign ( & vernode - > timer_expire , business - > worker_evbase , post_vernode_expire_destroy_cb , vernode ) ;
evtimer_add ( & vernode - > timer_expire , & tv ) ;
}
return vernode ;
}
void doris_config_post_version_start ( struct version_list_node * cur_vernode , const char * token )
{
struct doris_business * business = cur_vernode - > business ;
snprintf ( cur_vernode - > token , 64 , " %s " , token ) ;
2021-09-08 10:45:47 +08:00
if ( business - > persistence_write_on )
2021-08-25 18:40:20 +08:00
{
2021-09-08 10:45:47 +08:00
if ( cur_vernode - > cfg_type = = CFG_UPDATE_TYPE_FULL )
{
snprintf ( cur_vernode - > tmp_index_path , 256 , " %s/inc/full_config_index.%s.ing " , business - > store_path_root , token ) ;
}
else
{
snprintf ( cur_vernode - > tmp_index_path , 256 , " %s/inc/full_config_index.%s.ing " , business - > store_path_root , token ) ;
}
if ( NULL = = ( cur_vernode - > fp_idx_file = fopen ( cur_vernode - > tmp_index_path , " w+ " ) ) )
{
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " business: %s, fopen %s failed: %s " , business - > bizname , cur_vernode - > tmp_index_path , strerror ( errno ) ) ;
assert ( 0 ) ;
}
2021-08-25 18:40:20 +08:00
}
if ( g_doris_server_info . consumer_port )
{
TAILQ_INIT ( & cur_vernode - > table_head ) ;
cur_vernode - > metajson = cJSON_CreateObject ( ) ;
cur_vernode - > arrayjson = cJSON_CreateArray ( ) ;
cJSON_AddNumberToObject ( cur_vernode - > metajson , " type " , cur_vernode - > cfg_type ) ;
}
business - > token2node - > insert ( make_pair ( string ( token ) , cur_vernode ) ) ;
}
void http_post_direct_version_start ( struct version_list_node * cur_vernode , struct evhttp_request * req , const char * role )
{
struct doris_business * business = cur_vernode - > business ;
char token [ 64 ] , * p ;
struct evbuffer * evbuf ;
cJSON * meta ;
prod_server_generate_token ( business , token , 64 ) ;
doris_config_post_version_start ( cur_vernode , token ) ;
meta = cJSON_CreateObject ( ) ;
cJSON_AddStringToObject ( meta , " token " , token ) ;
p = cJSON_PrintUnformatted ( meta ) ;
cJSON_Delete ( meta ) ;
evbuf = evbuffer_new ( ) ;
evbuffer_add ( evbuf , p , strlen ( p ) ) ;
evhttp_send_reply ( req , 200 , " OK " , evbuf ) ;
evbuffer_free ( evbuf ) ;
cur_vernode - > req = NULL ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s, post %s server send response version start: %s " , business - > bizname , role , p ) ;
free ( p ) ;
}
void try_restore_from_busy_peer ( struct version_list_node * cur_vernode , const char * body , bool busy )
{
struct doris_business * business = cur_vernode - > business ;
struct evbuffer * evbuf ;
cJSON * meta , * token ;
/*<2A> Զ˼<D4B6> Ȼbusy<73> <79> ˵<EFBFBD> <CBB5> <EFBFBD> <EFBFBD> һ <EFBFBD> <D2BB> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> token<65> <6E> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> token<65> IJ<EFBFBD> <C4B2> <EFBFBD> <EFBFBD> ǶԷ<C7B6> <D4B7> <EFBFBD> <EFBFBD> <EFBFBD> */
if ( ( NULL = = ( meta = cJSON_Parse ( body ) ) ) | | NULL = = ( token = ( cJSON_GetObjectItem ( meta , " token " ) ) ) )
{
assert ( 0 ) ;
}
/*<2A> <> <EFBFBD> <EFBFBD> һ <EFBFBD> <D2BB> û<EFBFBD> <C3BB> <EFBFBD> ϴ<EFBFBD> <CFB4> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ã<EFBFBD> <C3A3> <EFBFBD> Ϊ<EFBFBD> <CEAA> <EFBFBD> Է<EFBFBD> <D4B7> <EFBFBD> <EFBFBD> <EFBFBD> ; <EFBFBD> <CDBE> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> post server<65> <72> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> curlʧ<6C> ܵ<EFBFBD> <DCB5> <EFBFBD> <EFBFBD> <EFBFBD> */
assert ( NULL = = cJSON_GetObjectItem ( meta , " configs " ) ) ;
doris_config_post_version_start ( cur_vernode , token - > valuestring ) ;
cJSON_Delete ( meta ) ;
evbuf = evbuffer_new ( ) ;
evbuffer_add ( evbuf , body , strlen ( body ) ) ;
evhttp_send_reply ( cur_vernode - > req , 200 , " OK " , evbuf ) ;
evbuffer_free ( evbuf ) ;
cur_vernode - > req = NULL ;
if ( busy )
{
2021-09-08 10:45:47 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " \033 [33m[Warning]business: %s, restore from busy peer, post master server send response version start: %s \033 [0m " , business - > bizname , body ) ;
2021-08-25 18:40:20 +08:00
}
else
{
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s, post master server send response version start: %s " , business - > bizname , body ) ;
}
}
void prod_sync_verstart_result_cb ( enum PROD_VERSTART_RES result , const char * body , void * userdata )
{
struct version_list_node * vernode = ( struct version_list_node * ) userdata ;
struct doris_business * business = vernode - > business ;
vernode - > retry_times + + ;
vernode - > syncing = 0 ;
switch ( result )
{
case VERSTART_RES_OK :
try_restore_from_busy_peer ( vernode , body , false ) ;
break ;
case VERSTART_RES_BUSY : //һ <> <D2BB> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ǰ<EFBFBD> <C7B0> <EFBFBD> <EFBFBD> CURLE<4C> <45> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ģ<EFBFBD> <C4A3> <EFBFBD> rate limit
try_restore_from_busy_peer ( vernode , body , true ) ;
break ;
case VERSTART_RES_ERROR : //<2F> Ƿ<EFBFBD> <C7B7> <EFBFBD> <EFBFBD> <EFBFBD> ֱ<EFBFBD> ӷ<EFBFBD> <D3B7> ظ<EFBFBD> Client
evhttp_send_error ( vernode - > req , 500 , " version start sync error res_code " ) ;
doris_prod_upload_ctx_destroy ( vernode - > synctx ) ;
free ( vernode ) ;
business - > cur_vernode = NULL ;
business - > posts_on_the_way - - ;
FS_operate ( g_doris_server_info . fsstat_handle , business - > fs_lineid , g_doris_server_info . fsstat_column [ DRS_FSCLM_POST_ON_THE_WAY ] , FS_OP_SET , business - > posts_on_the_way ) ;
2021-09-08 10:45:47 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " \033 [1;31;40m[Alert]business: %s, version start sync error res_code, abandon it. Send 500 response to client. \033 [0m " , business - > bizname ) ;
2021-08-25 18:40:20 +08:00
break ;
case VERSTART_CURL_ERROR :
if ( atomic_read ( & business - > ready_to_sync ) & & ( vernode - > retry_times < 3 ) )
{
vernode - > syncing = 1 ;
doris_prod_version_start_with_cb ( vernode - > synctx , prod_sync_verstart_result_cb , vernode ) ;
}
else
{
http_post_direct_version_start ( vernode , vernode - > req , " master " ) ;
business_set_sync_peer_abnormal ( vernode - > business ) ;
}
break ;
default : assert ( 0 ) ; break ;
}
}
void concurrency_send_busy_reply ( struct doris_business * business , struct evhttp_request * req )
{
char * p ;
struct evbuffer * evbuf ;
/*<2A> <> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> version start<72> <74> <EFBFBD> ̣<EFBFBD> <CCA3> <EFBFBD> ͬ<EFBFBD> <CDAC> δ<EFBFBD> <CEB4> <EFBFBD> <EFBFBD> token<65> <6E> <EFBFBD> <EFBFBD> ; <EFBFBD> 汾<EFBFBD> <E6B1BE> <EFBFBD> <EFBFBD> ʱ<EFBFBD> <CAB1> <EFBFBD> ܽ<EFBFBD> <DCBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> */
if ( business - > cur_vernode = = NULL | | business - > cur_vernode - > token [ 0 ] = = ' \0 ' )
{
evhttp_send_error ( req , 400 , " another empty uploading busy " ) ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_DEBUG , " business: %s busy starting, posts-on-the-way: %d " , business - > bizname , business - > posts_on_the_way ) ;
return ;
}
/*<2A> <> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> version start<72> <74> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ; <EFBFBD> 汾<EFBFBD> <E6B1BE> <EFBFBD> <EFBFBD> ͬ<EFBFBD> <CDAC> <EFBFBD> õ<EFBFBD> <C3B5> Է<EFBFBD> <D4B7> <EFBFBD> token*/
p = vernode_print_json_meta ( business - > cur_vernode ) ;
evbuf = evbuffer_new ( ) ;
evbuffer_add ( evbuf , p , strlen ( p ) ) ;
evhttp_send_reply ( req , 300 , " another uploading busy " , evbuf ) ;
evbuffer_free ( evbuf ) ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s busy, posts-on-the-way: %d, reply: %s " , business - > bizname , business - > posts_on_the_way , p ) ;
free ( p ) ;
}
void http_prod_server_verion_start_cb ( struct evhttp_request * req , void * arg )
{
struct doris_business * argbiz = ( struct doris_business * ) arg , * business ;
struct evkeyvalq params ;
const char * type ;
int cfgtype ;
if ( evhttp_parse_query ( evhttp_request_get_uri ( req ) , & params ) )
{
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_CLIENT_INVALID_REQ ] , 0 , FS_OP_ADD , 1 ) ;
evhttp_send_error ( req , HTTP_BADREQUEST , " Parameters invalid " ) ;
return ;
}
if ( NULL = = ( business = lookup_bizstruct_from_name ( & params ) ) | | business ! = argbiz )
{
evhttp_clear_headers ( & params ) ;
evhttp_send_error ( req , HTTP_BADREQUEST , " Parameter business invalid " ) ;
return ;
}
if ( NULL = = ( type = evhttp_find_header ( & params , " type " ) ) | | ( ( cfgtype = atoi ( type ) ) ! = 1 & & cfgtype ! = 2 ) )
{
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_CLIENT_INVALID_REQ ] , 0 , FS_OP_ADD , 1 ) ;
evhttp_clear_headers ( & params ) ;
evhttp_send_error ( req , HTTP_BADREQUEST , " Parameter type invalid " ) ;
return ;
}
evhttp_clear_headers ( & params ) ;
if ( ! business - > concurrency_allowed & & business - > posts_on_the_way > 0 )
{
return concurrency_send_busy_reply ( business , req ) ;
}
if ( business - > posts_on_the_way > g_doris_server_info . max_concurrent_reqs )
{
evhttp_send_error ( req , HTTP_SERVUNAVAIL , " Too many concurrent requests, service unavailable " ) ;
return ;
}
/*<2A> <> <EFBFBD> ڲ<EFBFBD> <DAB2> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> business->cur_vernodeʼ <65> ղ<EFBFBD> <D5B2> 䣻<EFBFBD> <E4A3BB> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> */
business - > cur_vernode = doris_config_post_version_prepare ( business , cfgtype ) ;
business - > posts_on_the_way + + ;
business - > type = cfgtype ;
FS_operate ( g_doris_server_info . fsstat_handle , business - > fs_lineid , g_doris_server_info . fsstat_column [ DRS_FSCLM_POST_ON_THE_WAY ] , FS_OP_SET , business - > posts_on_the_way ) ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s receives a version start request, posts-on-the-way: %d " , business - > bizname , business - > posts_on_the_way ) ;
if ( NULL ! = evhttp_find_header ( evhttp_request_get_input_headers ( req ) , " X-Doris-Master-Slave-Sync " ) ) //<2F> ڲ<EFBFBD> <DAB2> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ͬ<EFBFBD> <CDAC> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD>
{
return http_post_direct_version_start ( business - > cur_vernode , req , " slave " ) ;
}
if ( atomic_read ( & business - > ready_to_sync ) & &
( NULL ! = ( business - > cur_vernode - > synctx = doris_prod_upload_ctx_new ( business - > instance , business - > bizname , cfgtype ) ) ) )
{
business - > cur_vernode - > retry_times = 0 ;
business - > cur_vernode - > req = req ;
business - > cur_vernode - > syncing = 1 ;
doris_prod_version_start_with_cb ( business - > cur_vernode - > synctx , prod_sync_verstart_result_cb , business - > cur_vernode ) ;
}
else
{
http_post_direct_version_start ( business - > cur_vernode , req , " master " ) ;
business_set_sync_peer_abnormal ( business ) ;
}
}
bool upload_frag_argument_check_offset ( struct evhttp_request * req , struct evkeyvalq * params ,
struct version_list_node * vernode , struct internal_tablemeta * tablemeta )
{
const char * tmparg ;
char * endptr = NULL , curoffset [ 32 ] ;
size_t length ;
tablemeta - > islast = 0 ;
if ( NULL ! = ( tmparg = evhttp_find_header ( params , " last " ) ) & & ! strcasecmp ( tmparg , " true " ) )
{
tablemeta - > islast = 1 ;
}
if ( ( length = evbuffer_get_length ( evhttp_request_get_input_buffer ( req ) ) ) > 0 )
{
if ( NULL = = ( tmparg = evhttp_find_header ( params , " offset " ) ) )
{
evhttp_send_error ( req , 401 , " Parameter offset not found " ) ;
return false ;
}
tablemeta - > offset = strtol ( tmparg , & endptr , 10 ) ;
if ( * endptr ! = ' \0 ' )
{
evhttp_send_reply ( req , 401 , " Parameter offset invalid " , NULL ) ;
return false ;
}
if ( vernode - > cur_table = = NULL )
{
if ( tablemeta - > offset ! = 0 )
{
evhttp_send_reply ( req , 401 , " Parameter offset is not starting from 0 " , NULL ) ;
return false ;
}
}
else if ( tablemeta - > offset + length < = vernode - > cur_table - > cur_totallen )
{
evhttp_send_reply ( req , 201 , " Parameter offset already uploaded " , NULL ) ;
return false ;
}
else if ( tablemeta - > offset ! = vernode - > cur_table - > cur_totallen )
{
sprintf ( curoffset , " %lu " , vernode - > cur_table - > cur_totallen ) ;
evhttp_add_header ( evhttp_request_get_output_headers ( req ) , " X-Current-Offset " , curoffset ) ;
evhttp_send_reply ( req , 401 , " Parameter offset invalid " , NULL ) ;
return false ;
}
}
else if ( ! tablemeta - > islast | | vernode - > cur_table = = NULL ) //<2F> <> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> last<73> <74> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> δ<EFBFBD> <CEB4> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ˵<EFBFBD> <CBB5> δ<EFBFBD> ϴ<EFBFBD> <CFB4> <EFBFBD> <EFBFBD> <EFBFBD>
{
evhttp_send_error ( req , 400 , " Content length is zero, but parameter last!=true; or total length is zero, but parameter last=true " ) ;
return false ;
}
return true ;
}
struct version_list_node * upload_file_arguments_valid_check ( struct evhttp_request * req ,
struct doris_business * business , struct internal_tablemeta * tablemeta , bool fragcheck )
{
struct evkeyvalq params ;
struct version_list_node * vernode ;
struct table_list_node * tablenode ;
const char * tablename , * tmparg ;
if ( evhttp_parse_query ( evhttp_request_get_uri ( req ) , & params ) )
{
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_CLIENT_INVALID_REQ ] , 0 , FS_OP_ADD , 1 ) ;
evhttp_send_error ( req , HTTP_BADREQUEST , " Parameters invalid " ) ;
return NULL ;
}
if ( NULL = = ( vernode = lookup_vernode_struct_from_name_renew ( business , & params ) ) )
{
evhttp_send_error ( req , HTTP_NOTFOUND , " Parameter token invalid " ) ;
evhttp_clear_headers ( & params ) ;
return NULL ;
}
if ( NULL = = ( tablename = evhttp_find_header ( & params , " tablename " ) ) )
{
evhttp_send_error ( req , HTTP_BADREQUEST , " Parameter tablename invalid " ) ;
evhttp_clear_headers ( & params ) ;
return NULL ;
}
/*<2A> ϸ<EFBFBD> <CFB8> <EFBFBD> δ<EFBFBD> <CEB4> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ű<EFBFBD> <C5B1> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ϴ<EFBFBD> */
if ( vernode - > cur_table ! = NULL & & ( vernode - > syncing | | strcmp ( vernode - > cur_table - > tablename , tablename ) ) )
{
evhttp_send_error ( req , 300 , " tablename busy " ) ;
evhttp_clear_headers ( & params ) ;
return NULL ;
}
/*finished<65> <64> <EFBFBD> Ż<EFBFBD> <C5BB> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> 鿴<EFBFBD> Ƿ<EFBFBD> <C7B7> <EFBFBD> <EFBFBD> иñ<D0B8> <C3B1> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ֧<EFBFBD> <D6A7> <EFBFBD> ظ<EFBFBD> <D8B8> ı <EFBFBD> <C4B1> <EFBFBD> */
tablenode = TAILQ_FIRST ( & vernode - > table_head ) ;
while ( tablenode ! = NULL & & strcmp ( tablename , tablenode - > tablename ) )
{
tablenode = TAILQ_NEXT ( tablenode , table_node ) ;
}
if ( tablenode ! = NULL )
{
evhttp_send_error ( req , HTTP_BADREQUEST , " tablename already finished " ) ;
evhttp_clear_headers ( & params ) ;
return NULL ;
}
if ( fragcheck & & ! upload_frag_argument_check_offset ( req , & params , vernode , tablemeta ) )
{
evhttp_clear_headers ( & params ) ;
return NULL ;
}
snprintf ( tablemeta - > tablename , 64 , " %s " , tablename ) ;
if ( NULL = = ( tmparg = evhttp_find_header ( & params , " filename " ) ) )
{
tablemeta - > filename [ 0 ] = ' \0 ' ;
}
else
{
snprintf ( tablemeta - > filename , 64 , " %s " , tmparg ) ;
}
evhttp_clear_headers ( & params ) ;
return vernode ;
}
/*
 * Verify the request body against its Content-MD5 header.
 *
 * req     - request whose Content-MD5 header is compared (case-insensitively)
 * content - body bytes
 * len     - number of bytes in content
 * md5str  - output buffer receiving the computed digest string
 * md5size - size of md5str
 *
 * Returns true on a match. On a missing header or a mismatch the invalid-request
 * counter is incremented, a 402 reply is sent and false is returned.
 */
bool upload_frag_check_content_md5(struct evhttp_request *req, const char *content, size_t len, char *md5str, int md5size)
{
	const char *expected;
	const char *reason = NULL;
	MD5_CTX ctx;

	expected = evhttp_find_header(evhttp_request_get_input_headers(req), "Content-MD5");
	if(expected == NULL)
	{
		reason = "Content-MD5 header not found";
	}
	else
	{
		MD5_Init(&ctx);
		MD5_Update(&ctx, content, len);
		scandir_md5_final_string(&ctx, md5str, md5size);
		if(strcasecmp(expected, md5str) != 0)
		{
			reason = "Content-MD5 not match";
		}
	}
	if(reason == NULL)
	{
		return true;
	}

	FS_operate(g_doris_server_info.fsstat_handle, g_doris_server_info.fsstat_field[DRS_FSSTAT_CLIENT_INVALID_REQ], 0, FS_OP_ADD, 1);
	evhttp_send_error(req, 402, reason);
	return false;
}
void doris_config_post_cfgfile_prepare ( struct version_list_node * cur_vernode ,
struct internal_tablemeta * tablemeta , const char * md5 , u_int32_t cfgnum , char * content , size_t size )
{
if ( cur_vernode - > cur_table = = NULL )
{
cur_vernode - > cur_table = ( struct table_list_node * ) calloc ( 1 , sizeof ( struct table_list_node ) ) ;
cur_vernode - > cur_table - > cfgnum = cfgnum ;
cur_vernode - > total_cfgs + = cfgnum ;
sprintf ( cur_vernode - > cur_table - > tablename , " %s " , tablemeta - > tablename ) ;
if ( tablemeta - > filename [ 0 ] ! = ' \0 ' ) //Clientָ<74> <D6B8> <EFBFBD> ļ<EFBFBD> <C4BC> <EFBFBD>
{
sprintf ( cur_vernode - > cur_table - > filename , " %s " , tablemeta - > filename ) ;
}
else
{
snprintf ( cur_vernode - > cur_table - > filename , 128 , " %s.%s " , tablemeta - > tablename , cur_vernode - > token ) ;
}
MD5_Init ( & cur_vernode - > cur_table - > md5ctx ) ;
TAILQ_INIT ( & cur_vernode - > cur_table - > frag_head ) ;
}
cur_vernode - > cur_table - > fragcontent = content ;
cur_vernode - > cur_table - > fragsize = size ;
cur_vernode - > cur_table - > finished = tablemeta - > islast ;
sprintf ( cur_vernode - > cur_table - > fragmd5 , " %s " , md5 ) ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_DEBUG , " business: %s, table %s receives a file part, offset: %lu, size: %lu " ,
cur_vernode - > business - > bizname , tablemeta - > tablename , tablemeta - > offset , size ) ;
}
void doris_config_post_cfgfile_start ( struct version_list_node * vernode , struct evhttp_request * req )
{
struct tablemeta meta ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s, table %s start... " , vernode - > business - > bizname , vernode - > cur_table - > filename ) ;
FS_operate ( g_doris_server_info . fsstat_handle , g_doris_server_info . fsstat_field [ DRS_FSSTAT_RECV_START_FILES ] , 0 , FS_OP_ADD , 1 ) ;
meta . tablename = vernode - > cur_table - > tablename ;
meta . filename = vernode - > cur_table - > filename ;
meta . userregion = evhttp_find_header ( evhttp_request_get_input_headers ( req ) , " X-User-Info " ) ;
meta . cfgnum = vernode - > cur_table - > cfgnum ;
meta . size = 0 ;
2021-09-08 10:45:47 +08:00
if ( vernode - > business - > persistence_write_on )
{
vernode - > business - > type = vernode - > cfg_type ;
vernode - > business - > fp_idx_file = vernode - > fp_idx_file ;
doris_config_file_cfgfile_start ( NULL , & meta , NULL , vernode - > business ) ;
sprintf ( vernode - > cur_table - > localpath , " %s " , vernode - > business - > cfg_file_path ) ;
vernode - > cur_table - > fp_cfg_file = vernode - > business - > fp_cfg_file ;
}
2021-08-25 18:40:20 +08:00
if ( g_doris_server_info . consumer_port )
{
vernode - > cur_table - > table_meta = cJSON_CreateObject ( ) ;
cJSON_AddStringToObject ( vernode - > cur_table - > table_meta , " tablename " , meta . tablename ) ;
cJSON_AddStringToObject ( vernode - > cur_table - > table_meta , " filename " , meta . filename ) ;
cJSON_AddNumberToObject ( vernode - > cur_table - > table_meta , " cfg_num " , meta . cfgnum ) ;
if ( meta . userregion ! = NULL )
{
cJSON_AddStringToObject ( vernode - > cur_table - > table_meta , " user_region " , meta . userregion ) ;
}
}
}
void doris_config_post_cfgfile_finish ( struct version_list_node * vernode , const char * md5str )
{
doris_config_common_cfgfile_finish ( vernode - > business ) ;
2021-09-08 10:45:47 +08:00
if ( vernode - > business - > persistence_write_on )
{
fclose ( vernode - > cur_table - > fp_cfg_file ) ;
}
2021-08-25 18:40:20 +08:00
assert ( vernode - > cur_table - > filesize = = 0 ) ;
vernode - > cur_table - > filesize = vernode - > cur_table - > cur_totallen ;
if ( g_doris_server_info . consumer_port )
{
cJSON_AddNumberToObject ( vernode - > cur_table - > table_meta , " size " , vernode - > cur_table - > filesize ) ;
cJSON_AddStringToObject ( vernode - > cur_table - > table_meta , " md5 " , md5str ) ;
cJSON_AddItemToArray ( vernode - > arrayjson , vernode - > cur_table - > table_meta ) ;
vernode - > cur_table - > table_meta = NULL ;
if ( vernode - > cur_table - > cur_frag ! = NULL )
{
if ( vernode - > cur_table - > cur_frag - > totalsize > vernode - > cur_table - > cur_frag - > cur_fraglen )
{
char * content = ( char * ) malloc ( vernode - > cur_table - > cur_frag - > cur_fraglen ) ;
memcpy ( content , vernode - > cur_table - > cur_frag - > content , vernode - > cur_table - > cur_frag - > cur_fraglen ) ;
free ( vernode - > cur_table - > cur_frag - > content ) ;
vernode - > cur_table - > cur_frag - > content = content ;
vernode - > cur_table - > cur_frag - > totalsize = vernode - > cur_table - > cur_frag - > cur_fraglen ;
vernode - > cur_table - > cur_frag - > end = vernode - > cur_table - > filesize - 1 ;
}
TAILQ_INSERT_TAIL ( & vernode - > cur_table - > frag_head , vernode - > cur_table - > cur_frag , frag_node ) ;
assert ( vernode - > cur_table - > cur_frag - > cur_fraglen = = vernode - > cur_table - > cur_frag - > end - vernode - > cur_table - > cur_frag - > start + 1 ) ;
vernode - > cur_table - > cur_frag = NULL ;
}
}
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_INFO , " business: %s, table %s finished " , vernode - > business - > bizname , vernode - > cur_table - > filename ) ;
TAILQ_INSERT_TAIL ( & vernode - > table_head , vernode - > cur_table , table_node ) ;
vernode - > cur_table = NULL ; //<2F> <> <EFBFBD> գ <EFBFBD> <EFBFBD> <D7BC> <EFBFBD> <EFBFBD> һ <EFBFBD> ű<EFBFBD>
}
void http_config_direct_cfgfile_update ( struct version_list_node * vernode , struct evhttp_request * req )
{
size_t writen_len ;
char md5str [ 40 ] ;
if ( vernode - > cur_table - > cur_totallen = = 0 ) //start
{
doris_config_post_cfgfile_start ( vernode , req ) ;
}
if ( vernode - > cur_table - > fragsize > 0 )
{
2021-09-08 10:45:47 +08:00
if ( vernode - > business - > persistence_write_on )
2021-08-25 18:40:20 +08:00
{
2021-09-08 10:45:47 +08:00
writen_len = fwrite ( vernode - > cur_table - > fragcontent , 1 , vernode - > cur_table - > fragsize , vernode - > cur_table - > fp_cfg_file ) ;
if ( writen_len ! = vernode - > cur_table - > fragsize )
{
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " business: %s, fwrite %s failed: %s " , vernode - > business - > bizname , vernode - > cur_table - > localpath , strerror ( errno ) ) ;
assert ( 0 ) ;
}
2021-08-25 18:40:20 +08:00
}
if ( g_doris_server_info . consumer_port )
{
vernode - > business - > cur_vernode = vernode ;
doris_config_mem_cfgfile_update ( NULL , vernode - > cur_table - > fragcontent , vernode - > cur_table - > fragsize , vernode - > business ) ;
}
else
{
vernode - > cur_table - > cur_totallen + = vernode - > cur_table - > fragsize ;
}
if ( ! vernode - > cur_table - > onceupload )
{
MD5_Update ( & vernode - > cur_table - > md5ctx , vernode - > cur_table - > fragcontent , vernode - > cur_table - > fragsize ) ;
}
free ( vernode - > cur_table - > fragcontent ) ;
}
if ( vernode - > cur_table - > finished ) //end
{
if ( ! vernode - > cur_table - > onceupload )
{
scandir_md5_final_string ( & vernode - > cur_table - > md5ctx , md5str , 40 ) ;
doris_config_post_cfgfile_finish ( vernode , md5str ) ;
evhttp_add_header ( evhttp_request_get_output_headers ( req ) , " X-Content-MD5 " , md5str ) ;
}
else
{
doris_config_post_cfgfile_finish ( vernode , vernode - > cur_table - > fragmd5 ) ;
}
}
evhttp_send_reply ( req , HTTP_OK , " OK " , NULL ) ;
}
void prod_sync_upload_frag_cb ( enum PROD_VEROP_RES result , void * userdata )
{
struct version_list_node * vernode = ( struct version_list_node * ) userdata ;
struct table_meta meta ;
vernode - > retry_times + + ;
vernode - > syncing = 0 ;
switch ( result )
{
case VERSIONOP_RES_OK :
http_config_direct_cfgfile_update ( vernode , vernode - > req ) ;
break ;
case VERSIONOP_RES_ERROR :
evhttp_send_error ( vernode - > req , 500 , " frag sync error res_code " ) ;
2021-09-08 10:45:47 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " \033 [1;31;40m[Alert]business: %s, frag sync error res_code, abandon it. Send 500 response to client. \033 [0m " , vernode - > business - > bizname ) ;
2021-08-25 18:40:20 +08:00
break ;
case VERSIONOP_CURL_ERROR :
if ( atomic_read ( & vernode - > business - > ready_to_sync ) & & ( vernode - > retry_times < 3 ) )
{
vernode - > syncing = 1 ;
meta . md5 = vernode - > cur_table - > fragmd5 ;
meta . cfgnum = vernode - > cur_table - > cfgnum ;
meta . tablename = vernode - > cur_table - > tablename ;
meta . filename = vernode - > cur_table - > filename ;
meta . userregion = evhttp_find_header ( evhttp_request_get_input_headers ( vernode - > req ) , " X-User-Info " ) ;
if ( vernode - > cur_table - > onceupload )
{
doris_prod_upload_once_with_cb ( vernode - > synctx , vernode - > cur_table - > fragcontent ,
vernode - > cur_table - > fragsize , & meta , prod_sync_upload_frag_cb , vernode ) ;
}
else
{
doris_prod_upload_frag_with_cb ( vernode - > synctx , vernode - > cur_table - > fragcontent , vernode - > cur_table - > fragsize , vernode - > cur_table - > cur_totallen ,
vernode - > cur_table - > finished ? true : false , & meta , prod_sync_upload_frag_cb , vernode ) ;
}
}
else
{
http_config_direct_cfgfile_update ( vernode , vernode - > req ) ;
business_set_sync_peer_abnormal ( vernode - > business ) ;
}
break ;
default : assert ( 0 ) ; break ;
}
}
void http_prod_server_file_once_cb ( struct evhttp_request * req , void * arg )
{
struct doris_business * business = ( struct doris_business * ) arg ;
struct version_list_node * vernode ;
char * content , md5str [ 64 ] ;
const char * tmp ;
struct internal_tablemeta tablemeta ;
size_t size ;
struct table_meta meta ;
int32_t cfgnum = 0 , need_sync = 0 ;
if ( NULL = = ( vernode = upload_file_arguments_valid_check ( req , business , & tablemeta , false ) ) )
{
return ;
}
tablemeta . islast = 1 ;
tablemeta . offset = 0 ;
/*ԭʼ Client<6E> <74> <EFBFBD> <EFBFBD> X-Doris-Master-Slave-Sync<6E> <63> <EFBFBD> <EFBFBD> ͷ<EFBFBD> <CDB7> ͬһ <CDAC> 汾<EFBFBD> <E6B1BE> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> */
if ( atomic_read ( & business - > ready_to_sync ) & &
NULL = = evhttp_find_header ( evhttp_request_get_input_headers ( req ) , " X-Doris-Master-Slave-Sync " ) )
{
need_sync = 1 ;
}
if ( need_sync & & vernode - > synctx = = NULL )
{
evhttp_send_error ( req , 400 , " illegal server host, cannt change server durain version life cycle " ) ;
return ;
}
if ( ( size = evbuffer_get_length ( evhttp_request_get_input_buffer ( req ) ) ) = = 0 )
{
evhttp_send_error ( req , 400 , " no content " ) ;
return ;
}
content = ( char * ) malloc ( size ) ;
if ( size ! = ( size_t ) evbuffer_copyout ( evhttp_request_get_input_buffer ( req ) , content , size ) )
{
assert ( 0 ) ;
}
if ( ! upload_frag_check_content_md5 ( req , content , size , md5str , 64 ) )
{
free ( content ) ;
return ;
}
if ( NULL ! = ( tmp = evhttp_find_header ( evhttp_request_get_input_headers ( req ) , " X-Config-Num " ) ) )
{
cfgnum = atoi ( tmp ) ; ;
}
doris_config_post_cfgfile_prepare ( vernode , & tablemeta , md5str , cfgnum , content , size ) ;
meta . md5 = md5str ;
meta . cfgnum = cfgnum ;
meta . tablename = tablemeta . tablename ;
meta . userregion = evhttp_find_header ( evhttp_request_get_input_headers ( req ) , " X-User-Info " ) ;
meta . filename = vernode - > cur_table - > filename ;
vernode - > cur_table - > onceupload = true ;
if ( ! need_sync )
{
return http_config_direct_cfgfile_update ( vernode , req ) ;
}
vernode - > retry_times = 0 ;
vernode - > req = req ;
vernode - > syncing = 1 ;
doris_prod_upload_once_with_cb ( vernode - > synctx , content , size , & meta , prod_sync_upload_frag_cb , vernode ) ;
}
void http_prod_server_file_frag_cb ( struct evhttp_request * req , void * arg )
{
struct doris_business * business = ( struct doris_business * ) arg ;
struct version_list_node * vernode ;
char * content = NULL , md5str [ 64 ] ;
const char * tmp ;
struct internal_tablemeta tablemeta ;
size_t size = 0 ;
struct table_meta meta ;
int32_t cfgnum = 0 , need_sync = 0 ;
if ( NULL = = ( vernode = upload_file_arguments_valid_check ( req , business , & tablemeta , true ) ) )
{
return ;
}
if ( ( size = evbuffer_get_length ( evhttp_request_get_input_buffer ( req ) ) ) = = 0 & & ! tablemeta . islast )
{
evhttp_send_error ( req , 400 , " no content " ) ;
return ;
}
if ( atomic_read ( & business - > ready_to_sync ) & &
NULL = = evhttp_find_header ( evhttp_request_get_input_headers ( req ) , " X-Doris-Master-Slave-Sync " ) )
{
need_sync = 1 ;
}
if ( need_sync & & vernode - > synctx = = NULL )
{
evhttp_send_error ( req , 400 , " illegal server host, cannt change server durain version life cycle " ) ;
return ;
}
if ( size > 0 )
{
content = ( char * ) malloc ( size ) ;
if ( size ! = ( size_t ) evbuffer_copyout ( evhttp_request_get_input_buffer ( req ) , content , size ) )
{
assert ( 0 ) ;
}
if ( ! upload_frag_check_content_md5 ( req , content , size , md5str , 64 ) )
{
free ( content ) ;
return ;
}
}
if ( NULL ! = ( tmp = evhttp_find_header ( evhttp_request_get_input_headers ( req ) , " X-Config-Num " ) ) )
{
cfgnum = atoi ( tmp ) ; ;
}
doris_config_post_cfgfile_prepare ( vernode , & tablemeta , md5str , cfgnum , content , size ) ;
meta . md5 = md5str ;
meta . cfgnum = cfgnum ;
meta . tablename = tablemeta . tablename ;
meta . userregion = evhttp_find_header ( evhttp_request_get_input_headers ( req ) , " X-User-Info " ) ;
meta . filename = vernode - > cur_table - > filename ;
if ( tablemeta . islast & & tablemeta . offset = = 0 )
{
vernode - > cur_table - > onceupload = true ;
}
if ( ! need_sync )
{
return http_config_direct_cfgfile_update ( vernode , req ) ;
}
vernode - > retry_times = 0 ;
vernode - > req = req ;
vernode - > syncing = 1 ;
doris_prod_upload_frag_with_cb ( vernode - > synctx , content , size , vernode - > cur_table - > cur_totallen ,
tablemeta . islast ? true : false , & meta , prod_sync_upload_frag_cb , vernode ) ;
}
void start_business_http_post_server ( struct doris_business * business )
{
struct evhttp * worker_http ;
if ( ( business - > listener_prod = doris_create_listen_socket ( business - > producer_port ) ) < 0 )
{
assert ( 0 ) ; return ;
}
business - > source_from = RECV_WAY_HTTP_POST ;
worker_http = evhttp_new ( business - > worker_evbase ) ;
if ( g_doris_server_info . ssl_conn_on )
{
evhttp_set_bevcb ( worker_http , doris_https_bufferevent_cb , g_doris_server_info . ssl_instance ) ;
}
evhttp_set_cb ( worker_http , " /version/start " , http_prod_server_verion_start_cb , business ) ;
evhttp_set_cb ( worker_http , " /version/finish " , http_prod_server_verion_end_cb , business ) ;
evhttp_set_cb ( worker_http , " /version/cancel " , http_prod_server_verion_cancel_cb , business ) ;
evhttp_set_cb ( worker_http , " /version/check " , http_prod_server_verion_check_cb , business ) ;
evhttp_set_cb ( worker_http , " /fileonce/upload " , http_prod_server_file_once_cb , business ) ;
evhttp_set_cb ( worker_http , " /filefrag/upload " , http_prod_server_file_frag_cb , business ) ;
evhttp_set_allowed_methods ( worker_http , EVHTTP_REQ_POST | EVHTTP_REQ_PUT | EVHTTP_REQ_HEAD ) ;
evhttp_set_max_body_size ( worker_http , g_doris_server_info . max_http_body_size ) ;
if ( evhttp_accept_socket ( worker_http , business - > listener_prod ) )
{
printf ( " evhttp_accept_socket %d error! \n " , business - > listener_prod ) ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " evhttp_accept_socket %d error! \n " , business - > listener_prod ) ;
assert ( 0 ) ;
}
}
2021-09-08 10:45:47 +08:00
void doris_config_version_sync_updated ( struct doris_csum_instance * instance , int64_t latest_version , void * userdata )
2021-08-25 18:40:20 +08:00
{
struct doris_business * business = ( struct doris_business * ) userdata ;
struct doris_csum_param * param ;
u_int32_t references , business_post_ups ;
2021-09-08 10:45:47 +08:00
if ( latest_version )
{
business - > genversion_seq = latest_version ;
assert ( business - > cfgver_head - > latest_version = = latest_version ) ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " \033 [34m[Attention] business %s, HTTP Post server generate version change to: %lu \033 [0m " , business - > bizname , latest_version ) ;
}
2021-08-25 18:40:20 +08:00
/*<2A> <> <EFBFBD> <EFBFBD> consuemer<65> <72> ͬʱȷ<CAB1> <C8B7> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ִֻ<D6BB> <D6B4> һ <EFBFBD> <D2BB> */
param = doris_csum_instance_get_param ( instance ) ;
doris_csum_instance_destroy ( instance ) ;
references = doris_csum_param_get_refernces ( param ) ;
if ( references = = 0 )
{
doris_csum_parameter_destroy ( param ) ;
}
/*init sync instance*/
business - > instance = doris_prod_instance_new ( business - > param_prod , business - > worker_evbase , g_doris_server_info . log_runtime ) ;
if ( business - > instance = = NULL )
{
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " doris_prod_instance_new for %s failed " , business - > bizname ) ;
assert ( 0 ) ; return ;
}
/*start worker*/
start_business_http_post_server ( business ) ;
/*ͬ<> <CDAC> <EFBFBD> <EFBFBD> <EFBFBD> ɣ <EFBFBD> <C9A3> <EFBFBD> ʾ <EFBFBD> <CABE> <EFBFBD> <EFBFBD> <EFBFBD> 汾<EFBFBD> <E6B1BE> server<65> <72> һ <EFBFBD> <D2BB> (ע<> <D7A2> <EFBFBD> <EFBFBD> һ <EFBFBD> £<EFBFBD> <C2A3> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> DZ<EFBFBD> <C7B1> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> , <20> <> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ʱserver<65> <72> <EFBFBD> ᷵<EFBFBD> <E1B7B5> 304)*/
atomic_set ( & business - > ready_to_sync , 1 ) ;
pthread_mutex_lock ( & g_doris_server_info . mutex_lock ) ;
business_post_ups = + + g_doris_server_info . business_post_ups ;
pthread_mutex_unlock ( & g_doris_server_info . mutex_lock ) ;
if ( business_post_ups = = g_doris_server_info . business_post_num )
{
MESA_Monitor_operation ( g_doris_server_info . monitor , g_doris_server_info . mmid_post_server , MONITOR_VALUE_SET , PROMETHUES_POST_SERVER_OK ) ;
}
else
{
MESA_Monitor_operation ( g_doris_server_info . monitor , g_doris_server_info . mmid_post_server , MONITOR_VALUE_SET , PROMETHUES_POST_SERVER_UPING ) ;
}
assert ( business_post_ups < = g_doris_server_info . business_post_num ) ;
2021-09-08 10:45:47 +08:00
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " \033 [32m[Info]******Doris Producer worker for %s starts****** \033 [0m " , business - > bizname ) ;
2021-08-25 18:40:20 +08:00
}
/* Counterpart of thread_doris_client_recv_cfg; the difference lies in the version_updated callback. NOTE(review): reconstructed from a mis-encoded comment; confirm against the original text. */
void * thread_http_post_recv_cfg ( void * arg )
{
struct doris_business * business = ( struct doris_business * ) arg ;
struct event_base * client_evbase ;
struct doris_csum_instance * instance ;
struct doris_callbacks doris_cbs ;
struct doris_arguments doris_args ;
struct doris_idxfile_scanner * scanner ;
enum DORIS_UPDATE_TYPE update_type ;
char stored_path [ 512 ] ;
2021-09-08 10:45:47 +08:00
int64_t genversion_seq ;
2021-08-25 18:40:20 +08:00
prctl ( PR_SET_NAME , " http_post " ) ;
client_evbase = event_base_new ( ) ;
business - > source_from = RECV_WAY_IDX_FILE ;
business - > worker_evbase = client_evbase ;
scanner = doris_index_file_scanner ( 0 ) ;
2021-09-08 10:45:47 +08:00
/*<2A> <> <EFBFBD> <EFBFBD> <EFBFBD> Ƿ<EFBFBD> <C7B7> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ־û<D6BE> <C3BB> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> Զ<EFBFBD> һ <EFBFBD> ±<EFBFBD> <C2B1> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ã<EFBFBD> <C3A3> <EFBFBD> <EFBFBD> <EFBFBD> Ϊ<EFBFBD> <CEAA> <EFBFBD> ð汾<C3B0> <E6B1BE> ʼ <EFBFBD> <CABC> ʹ <EFBFBD> <CAB9> */
2021-08-25 18:40:20 +08:00
doris_cbs . version_start = doris_config_localmem_version_start ;
doris_cbs . version_finish = doris_config_localmem_version_finish ;
doris_cbs . version_error = doris_config_localmem_version_error ;
doris_cbs . cfgfile_start = doris_config_localmem_cfgfile_start ;
doris_cbs . cfgfile_update = doris_config_localmem_cfgfile_update ;
doris_cbs . cfgfile_finish = doris_config_localmem_cfgfile_finish ;
doris_cbs . version_updated = NULL ;
doris_cbs . userdata = business ;
snprintf ( stored_path , 512 , " %s/full/index " , business - > store_path_root ) ;
2021-08-30 14:22:48 +08:00
if ( business - > saves_when_fulldel > 0 )
{
get_full_topN_max_versions ( stored_path , business - > full_version_inc , business - > saves_when_fulldel ) ;
}
2021-08-25 18:40:20 +08:00
update_type = doris_index_file_traverse ( scanner , stored_path , & doris_cbs , NULL , g_doris_server_info . log_runtime ) ;
snprintf ( stored_path , 512 , " %s/inc/index " , business - > store_path_root ) ;
do {
update_type = doris_index_file_traverse ( scanner , stored_path , & doris_cbs , NULL , g_doris_server_info . log_runtime ) ;
} while ( update_type ! = CFG_UPDATE_TYPE_NONE ) ;
2021-09-08 10:45:47 +08:00
/*<2A> <> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> <EFBFBD> ɰ汾<C9B0> <E6B1BE> <EFBFBD> <EFBFBD> ʼ <EFBFBD> <CABC> <EFBFBD> <EFBFBD> */
business - > genversion_seq = scanner - > cur_version ;
if ( ! business - > persistence_write_on )
{
sprintf ( stored_path , " %s_verseq " , business - > bizname ) ;
if ( ( genversion_seq = doris_kvdb_get_keystr_valint ( g_doris_server_info . kvdbhandle , stored_path ) ) ! = 0 )
{
business - > genversion_seq = genversion_seq ;
}
}
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " \033 [34m[Attention] business %s, HTTP Post server generate version from: %lu \033 [0m \n " , business - > bizname , business - > genversion_seq ) ;
2021-08-25 18:40:20 +08:00
if ( g_doris_server_info . cluster_sync_mode ) /*Check new configs*/
{
doris_cbs . version_start = doris_config_version_start ;
doris_cbs . version_finish = doris_config_version_finish ;
doris_cbs . version_error = doris_config_version_error ;
doris_cbs . cfgfile_start = doris_config_cfgfile_start ;
doris_cbs . cfgfile_update = doris_config_cfgfile_update ;
doris_cbs . cfgfile_finish = doris_config_cfgfile_finish ;
doris_cbs . version_updated = doris_config_version_sync_updated ;
2021-09-08 10:45:47 +08:00
doris_cbs . userdata = business ;
2021-08-25 18:40:20 +08:00
business - > source_from = RECV_WAY_DRS_CLIENT ;
memset ( & doris_args , 0 , sizeof ( struct doris_arguments ) ) ;
2021-09-08 10:45:47 +08:00
doris_args . current_version = ( business - > persistence_write_on ) ? scanner - > cur_version : ( 0 - business - > cache_max_versions ) ;
2021-08-25 18:40:20 +08:00
sprintf ( doris_args . bizname , " %s " , business - > bizname ) ;
instance = doris_csum_instance_new ( business - > param_csum , client_evbase , & doris_cbs , & doris_args , g_doris_server_info . log_runtime ) ;
if ( instance = = NULL )
{
assert ( 0 ) ; return NULL ;
}
}
else
{
start_business_http_post_server ( business ) ;
}
event_base_dispatch ( client_evbase ) ;
printf ( " Libevent dispath error, should not run here. \n " ) ;
MESA_RUNTIME_LOGV3 ( g_doris_server_info . log_runtime , RLOG_LV_FATAL , " Libevent dispath error, should not run here. " ) ;
assert ( 0 ) ; return NULL ;
}