@@ -47,11 +47,9 @@ struct hos_instance_s g_hos_instance;
hos_client_handle_t g_hos_handle ; //一个进程只允许有一个g_hos_handle
static std : : mutex m_client_lock ;
hos_fd_context_t * * g_fd_context ;
size_t ( * g_fd_info ) [ MAX_HOS_CLIENT_FD_NUM + 1 ] ; //fd 实际从3 开始, fd[thread_id][0]记录register的fd, fd[thread_id][1]记录inject 的fd
size_t * g_fd_info ; //fd 实际从1 开始,每个线程有独立 的fd
static Aws : : SDKOptions g_options ;
static void * hos_fd_manage ( void * ptr ) ;
static inline size_t get_current_ms ( )
{
struct timespec timenow ;
@@ -59,30 +57,12 @@ static inline size_t get_current_ms()
return ( timenow . tv_sec * 1000 + timenow . tv_nsec / 1000 / 1000 ) ;
}
static size_t hash_get_min_free_fd ( size_t thread_id )
{
size_t i = 0 ;
for ( i = 3 ; i < MAX_HOS_CLIENT_FD_NUM + 1 ; i + + )
{
if ( ! g_fd_info [ thread_id ] [ i ] )
{
g_fd_info [ thread_id ] [ i ] = 1 ;
g_fd_info [ thread_id ] [ HOS_FD_REGISTER ] + + ;
g_fd_info [ thread_id ] [ HOS_FD_FREE ] - - ;
return i ;
}
}
return 0 ;
}
static int hos_delete_fd ( size_t thread_id , hos_fd_context_t * context )
{
if ( context = = NULL )
{
return HOS_PARAMETER_ERROR ;
}
size_t fd = context - > fd ;
if ( context )
{
@@ -99,10 +79,6 @@ static int hos_delete_fd(size_t thread_id, hos_fd_context_t *context)
HASH_DEL ( g_fd_context [ thread_id ] , context ) ;
free ( context ) ;
}
g_fd_info [ thread_id ] [ fd ] = 0 ;
g_fd_info [ thread_id ] [ HOS_FD_FREE ] + + ;
g_fd_info [ thread_id ] [ HOS_FD_INJECT ] - - ;
return HOS_CLIENT_OK ;
}
@@ -118,19 +94,17 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
data_info_t * data_info = NULL ;
const Aws : : String & uuid = context - > GetUUID ( ) ;
size_t thread_id , fd , stream_len ;
sscanf ( uuid . c_str ( ) , " %lu %lu %lu " , & thread_id , & fd , & stream_len ) ;
if ( g_fd_info [ thread_id ] [ fd ] )
{
a_fd_context = find_context_by_fd ( g_fd_context [ thread_id ] , fd ) ;
}
a_fd_context = find_context_by_fd ( g_fd_context [ thread_id ] , fd ) ;
if ( a_fd_context = = NULL )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL , __FUNCTION__ ,
" error: Not find the info of [thread_id:%lu fd:%lu] " , thread_id , fd ) ;
if ( hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .fs2_handle & & hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .reserved )
if ( hos_func - > fs2_info . fs2_handle & & hos_func - > fs2_info . reserved )
{
data_info = ( data_info_t * ) hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .reserved ;
data_info = ( data_info_t * ) hos_func - > fs2_info . reserved ;
data_info - > tx_failed_pkts [ thread_id ] + + ;
data_info - > tx_failed_bytes [ thread_id ] + = stream_len ;
}
@@ -144,18 +118,18 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL , __FUNCTION__ ,
" error: [%s:%s] upload failed. error:%s " , a_fd_context - > bucket , a_fd_context - > object , error ) ;
if ( hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .fs2_handle & & hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .reserved )
if ( hos_func - > fs2_info . fs2_handle & & hos_func - > fs2_info . reserved )
{
data_info = ( data_info_t * ) hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .reserved ;
data_info = ( data_info_t * ) hos_func - > fs2_info . reserved ;
data_info - > tx_failed_pkts [ thread_id ] + + ;
data_info - > tx_failed_bytes [ thread_id ] + = stream_len ;
}
}
else
{
if ( hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .fs2_handle & & hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .reserved )
if ( hos_func - > fs2_info . fs2_handle & & hos_func - > fs2_info . reserved )
{
data_info = ( data_info_t * ) hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .reserved ;
data_info = ( data_info_t * ) hos_func - > fs2_info . reserved ;
data_info - > tx_pkts [ thread_id ] + + ;
data_info - > tx_bytes [ thread_id ] + = stream_len ;
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , __FUNCTION__ ,
@@ -175,14 +149,28 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
{
//APPEND MODE 保留fd
atomic_add ( & ( a_fd_context - > recive_cnt ) , 1 ) ;
if ( a_fd_context - > fd_status = = HOS_FD_INJECT )
{
if ( a_fd_context - > position = = a_fd_context - > recive_cnt )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , __FUNCTION__ ,
" debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete " ,
a_fd_context - > bucket , a_fd_context - > object , thread_id , a_fd_context - > fd ) ;
hos_delete_fd ( thread_id , a_fd_context ) ;
}
}
}
else
{
//完整上传 删除fd
hos_close_fd ( fd , thread_id ) ;
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , __FUNCTION__ ,
" debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete " ,
a_fd_context - > bucket , a_fd_context - > object , thread_id , a_fd_context - > fd ) ;
hos_delete_fd ( thread_id , a_fd_context ) ;
}
}
g_hos_handle . task_num [ thread_id ] - - ;
g_hos_handle . task_context [ thread_id ] - - ;
}
static void hos_client_create ( )
@@ -243,20 +231,18 @@ static void hos_client_create()
g_hos_handle . count + + ;
g_hos_handle . executor = std : : dynamic_pointer_cast < Aws : : Utils : : Threading : : PooledThreadExecutor > ( config . executor ) ;
g_hos_handle . task_num = ( size_t * ) calloc ( hos_conf - > thread_num , sizeof ( size_t ) ) ;
g_hos_handle . task_context = ( size_t * ) calloc ( hos_conf - > thread_num , sizeof ( size_t ) ) ;
g_fd_context = ( hos_fd_context_t * * ) calloc ( hos_conf - > thread_num , sizeof ( hos_fd_context_t * ) ) ;
g_fd_info = ( size_t ( * ) [ MAX_HOS_CLIENT_FD_NUM + 1 ] ) calloc ( hos_conf - > thread_num , sizeof ( size_t [ MAX_HOS_CLIENT_FD_NUM + 1 ] ) ) ;
for ( size_t i = 0 ; i < hos_conf - > thread_num ; i + + )
{
g_fd_info [ i ] [ 0 ] = 65533 ;
}
g_fd_info = ( size_t * ) calloc ( hos_conf - > thread_num , sizeof ( size_t ) ) ;
#if 0
if (g_hos_handle.hos_func.fd_thread == 0)
{
g_hos_handle.hos_func.fd_thread_status = 0;
pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL);
}
#endif
MESA_HANDLE_RUNTIME_LOG ( log , RLOG_LV_DEBUG , " hos_client_create " , " debug: hos s3client create success, url:%s. " , endpoint ) ;
g_hos_instance . result = true ;
@@ -272,9 +258,6 @@ bool hos_verify_bucket(const char *bucket)
}
if ( g_hos_instance . result ! = true | | g_hos_handle . S3Client = = NULL )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , __FUNCTION__ ,
" debug: g_hos_instance.result:%d, g_hos_handle.S3Client:%s " ,
g_hos_instance . result , ( g_hos_handle . S3Client = = NULL ) ? ( " null " ) : ( " not null " ) ) ;
return false ;
}
Aws : : S3 : : Model : : ListBucketsOutcome outcome = g_hos_handle . S3Client - > ListBuckets ( ) ;
@@ -324,14 +307,10 @@ static void *fs2_statistics(void *ptr)
size_t tx_failed_pkts_last = 0 ;
size_t cache_last = 0 ;
fs2_info_t * fs2_info = NULL ;
int PoolThread_state [ 3 ] = { 0 , 0 , 0 } ; //{PoolSize, Busy, TopBusy}
int * busy = & PoolThread_state [ 1 ] ;
int * top_busy = & PoolThread_state [ 2 ] ;
int pool_history_sum = 0 ;
hos_config_t * hos_conf = & g_hos_handle . hos_config ;
hos_func_thread_t * hos_func = & g_hos_handle . hos_func ;
size_t task_num = 0 ;
PoolThread_state [ 0 ] = hos_conf - > pool_thread_size ;
while ( 1 )
{
if ( hos_func - > fs2_status = = HOS_FS2_STOP )
@@ -348,7 +327,7 @@ static void *fs2_statistics(void *ptr)
tx_failed_pkts_sum = 0 ;
cache_sum = 0 ;
fs2_info = & hos_func - > fs2_info [ 0 ] ;
fs2_info = & hos_func - > fs2_info ;
data_info_t * data_info = ( data_info_t * ) fs2_info - > reserved ;
for ( i = 0 ; i < hos_conf - > thread_num ; i + + )
{
@@ -359,6 +338,8 @@ static void *fs2_statistics(void *ptr)
tx_failed_bytes_sum + = data_info - > tx_failed_bytes [ i ] ;
tx_failed_pkts_sum + = data_info - > tx_failed_pkts [ i ] ;
cache_sum + = data_info - > cache [ i ] ;
task_num + = g_hos_handle . task_num [ i ] ;
}
rx_pkts_interval = rx_pkts_sum - rx_pkts_last ;
@@ -393,20 +374,6 @@ static void *fs2_statistics(void *ptr)
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 1 ] , fs2_info - > column_ids [ 5 ] , FS_OP_SET , tx_failed_bytes_sum ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 1 ] , fs2_info - > column_ids [ 6 ] , FS_OP_SET , cache_sum ) ;
//PoolThread State
if ( hos_conf - > pool_thread_size > 0 )
{
* busy = g_hos_handle . executor - > GetTaskSize ( ) ;
* top_busy = ( * busy ) > ( * top_busy ) ? ( * busy ) : ( * top_busy ) ;
pool_history_sum + = * busy ;
fs2_info = & hos_func - > fs2_info [ FS2_POOL_THREAD_STATE ] ;
for ( i = 0 ; i < 3 ; i + + )
{
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 0 ] , fs2_info - > column_ids [ i ] , FS_OP_SET , PoolThread_state [ i ] ) ;
}
}
sleep ( 1 ) ;
}
pthread_exit ( NULL ) ;
@@ -458,7 +425,7 @@ static void hos_expand_fs2()
hos_func_thread_t * hos_func = & g_hos_handle . hos_func ;
size_t i = 0 ;
if ( hos_func - > fs2_info [ 0 ] .fs2_handle )
if ( hos_func - > fs2_info . fs2_handle )
return ;
//data info
/**********************************************************************************************************
@@ -466,9 +433,9 @@ static void hos_expand_fs2()
* current 10 100 1 100 0 0 100
* total 100 1000 10 1000 0 0 100(无实意)
***********************************************************************************************************/
fs2_info = & hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] ;
hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .fs2_handle = hos_init_fs2 ( ( char * ) " hos-data " , strlen ( " hos-data " ) ) ;
fs2_handle = hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .fs2_handle ;
fs2_info = & hos_func - > fs2_info ;
hos_func - > fs2_info . fs2_handle = hos_init_fs2 ( ( char * ) " hos-data " , strlen ( " hos-data " ) ) ;
fs2_handle = hos_func - > fs2_info . fs2_handle ;
fs2_info - > line_ids = ( int * ) calloc ( 2 , sizeof ( int ) ) ;
fs2_info - > column_ids = ( int * ) calloc ( 7 , sizeof ( int ) ) ;
@@ -492,29 +459,6 @@ static void hos_expand_fs2()
data_info - > cache = ( size_t * ) calloc ( hos_conf - > thread_num , sizeof ( size_t ) ) ;
FS_start ( fs2_handle ) ;
if ( hos_conf - > pool_thread_size > 0 )
{
//PoolThread state
/*******************************************************
* PoolSize Busy TopBusy AveBusy
* ThreadNum 1000 500 800 650
********************************************************/
fs2_info = & hos_func - > fs2_info [ FS2_POOL_THREAD_STATE ] ;
hos_func - > fs2_info [ FS2_POOL_THREAD_STATE ] . fs2_handle = hos_init_fs2 ( ( char * ) " hos-poolthread " , strlen ( " hos-poolthread " ) ) ;
fs2_handle = hos_func - > fs2_info [ FS2_POOL_THREAD_STATE ] . fs2_handle ;
fs2_info - > line_ids = ( int * ) calloc ( 1 , sizeof ( int ) ) ;
fs2_info - > column_ids = ( int * ) calloc ( 3 , sizeof ( int ) ) ;
const char * poolthread_col [ 3 ] = { " PoolSize " , " Busy " , " TopBusy " } ;
for ( i = 0 ; i < sizeof ( poolthread_col ) / sizeof ( const char * ) ; i + + )
{
fs2_info - > column_ids [ i ] = FS_register ( fs2_handle , FS_STYLE_COLUMN , FS_CALC_CURRENT , poolthread_col [ i ] ) ;
}
fs2_info - > line_ids [ 0 ] = FS_register ( fs2_handle , FS_STYLE_LINE , FS_CALC_CURRENT , " ThreadNum " ) ;
FS_start ( fs2_handle ) ;
}
pthread_create ( & hos_func - > fs2_thread , NULL , fs2_statistics , NULL ) ;
return ;
@@ -527,17 +471,31 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t
int ret = 0 ;
hos_func_thread_t * hos_func = & g_hos_handle . hos_func ;
data_info_t * data_info = NULL ;
hos_config_t * hos_conf = & g_hos_handle . hos_config ;
//设置回调函数
std : : shared_ptr < Aws : : Client : : AsyncCallerContext > context =
Aws : : MakeShared < Aws : : Client : : AsyncCallerContext > ( " " ) ;
sprintf ( buf , " %lu %lu %lu " , thread_id , fd , stream_len ) ;
context - > SetUUID ( buf ) ;
if ( hos_conf - > max_request_num & & hos_conf - > max_request_context & &
( g_hos_handle . task_num [ thread_id ] > = hos_conf - > max_request_num | |
g_hos_handle . task_context [ thread_id ] > = hos_conf - > max_request_context ) )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , __FUNCTION__ ,
" debug: PutObjectAsync failed. [%s:%s]. task_num:%lu, task_context:%lu " ,
bucket , object , g_hos_handle . task_num [ thread_id ] , g_hos_handle . task_context [ thread_id ] ) ;
return HOS_SEND_FAILED ;
}
auto & S3Client = * ( g_hos_handle . S3Client ) ;
ret = S3Client . PutObjectAsync ( request , PutObjectAsyncFinished , context ) ;
if ( ret )
{
g_hos_handle . task_num [ thread_id ] + + ;
g_hos_handle . task_context [ thread_id ] + = stream_len ;
//不算真正成功, 需要等到PutObjectAsyncFinished的结果
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , __FUNCTION__ ,
" debug: PutObjectAsync success. [%s:%s] " , bucket , object ) ;
@@ -549,11 +507,11 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , __FUNCTION__ ,
" debug: PutObjectAsync failed. [%s:%s] " , bucket , object ) ;
if ( hos_func - > fs2_info [ 0 ] .fs2_handle )
if ( hos_func - > fs2_info . fs2_handle )
{
if ( hos_func - > fs2_info [ 0 ] .reserved )
if ( hos_func - > fs2_info . reserved )
{
data_info = ( data_info_t * ) hos_func - > fs2_info [ 0 ] .reserved ;
data_info = ( data_info_t * ) hos_func - > fs2_info . reserved ;
data_info - > tx_failed_pkts [ thread_id ] + + ;
data_info - > tx_failed_bytes [ thread_id ] + = stream_len ;
}
@@ -572,9 +530,9 @@ static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t
Aws : : S3 : : Model : : PutObjectOutcome Outcome = S3Client . PutObject ( request ) ;
if ( Outcome . IsSuccess ( ) )
{
if ( hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .fs2_handle & & hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .reserved )
if ( hos_func - > fs2_info . fs2_handle & & hos_func - > fs2_info . reserved )
{
data_info = ( data_info_t * ) hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .reserved ;
data_info = ( data_info_t * ) hos_func - > fs2_info . reserved ;
data_info - > tx_pkts [ thread_id ] + + ;
data_info - > tx_bytes [ thread_id ] + = stream_len ;
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , __FUNCTION__ ,
@@ -594,9 +552,9 @@ static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , __FUNCTION__ ,
" debug: PutObject failed. [%s:%s] cause:%s " , bucket , object , Outcome . GetError ( ) . GetMessage ( ) . c_str ( ) ) ;
if ( hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .fs2_handle & & hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .reserved )
if ( hos_func - > fs2_info . fs2_handle & & hos_func - > fs2_info . reserved )
{
data_info = ( data_info_t * ) hos_func - > fs2_info [ FS2_DATA_FLOW_STATE ] .reserved ;
data_info = ( data_info_t * ) hos_func - > fs2_info . reserved ;
data_info - > tx_failed_pkts [ thread_id ] + + ;
data_info - > tx_failed_bytes [ thread_id ] + = stream_len ;
}
@@ -641,15 +599,16 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t
MESA_load_profile_uint_def ( conf_path , module , " hos_poolsize " , & hos_conf - > pool_thread_size , 0 ) ;
MESA_load_profile_uint_def ( conf_path , module , " hos_cache_size " , & hos_conf - > cache_size , 102400 ) ;
MESA_load_profile_uint_def ( conf_path , module , " hos_cache_count " , & hos_conf - > cache_count , 10 ) ;
MESA_load_profile_uint_def ( conf_path , module , " hos_fd_live_time_ms " , & hos_conf - > timeout , 1000 ) ;
MESA_load_profile_string_nodef ( conf_path , module , " hos_fs2_serverip " , hos_conf - > fs2_ip , INET6_ADDRSTRLEN ) ;
MESA_load_profile_uint_nodef ( conf_path , module , " hos_fs2_serverport " , & hos_conf - > fs2_port ) ;
MESA_load_profile_string_def ( conf_path , module , " hos_fs2_path " , hos_conf - > fs2_path , sizeof ( hos_conf - > fs2_path ) , " ./hos_fs2.stat " ) ;
MESA_load_profile_uint_def ( conf_path , module , " hos_fs2_format " , & hos_conf - > fs2_fmt , 0 ) ;
MESA_load_profile_uint_def ( conf_path , module , " hos_request_num " , & hos_conf - > max_request_num , 100 ) ;
MESA_load_profile_uint_def ( conf_path , module , " hos_request_context " , & hos_conf - > max_request_context , 10240000 ) ;
if ( hos_conf - > ip & & hos_conf - > port & & strlen ( hos_conf - > accesskeyid ) & & strlen ( hos_conf - > secretkey ) )
{
g_hos_handle . log = MESA_create_runtime_log_handle ( hos_conf - > log_path , hos_conf - > log_level ) ;
if ( log = = NULL )
if ( g_hos_handle . log = = NULL )
{
g_hos_instance . result = false ;
g_hos_instance . error_code = HOS_RUNTIME_LOG_FAILED ;
@@ -693,17 +652,20 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t
g_hos_instance . error_code = HOS_CONF_ERROR ;
snprintf ( g_hos_instance . error_message , HOS_ERROR_MESSAGE_SIZE , " hos param error:hos ip:%s, hos port:%u, accesskeyid:%s, secretkey:%s " ,
hos_conf - > ip , hos_conf - > port , hos_conf - > accesskeyid , hos_conf - > secretkey ) ;
MESA_destroy_runtime_log_handle ( g_hos_handle . log ) ;
return & g_hos_instance ;
}
}
int hos_create_bucket ( const char * bucket )
{
if ( ( bucke t = = NULL ) | | ( g_hos_handle . S3Client = = NULL ) )
if ( g_hos_instance . resul t = = false | | g_hos_handle . S3Client = = NULL )
{
return HOS_INSTANCE_NOT_INIT ;
}
if ( bucket = = NULL )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL , " hos_create_bucket " ,
" error:bucket:%s, s3client:%s " , bucket , g_hos_handle . S3Client ? " not null " : " null " );
" error:bucket:%s " , bucket ) ;
return HOS_PARAMETER_ERROR ;
}
auto & S3Client = * g_hos_handle . S3Client ;
@@ -747,7 +709,12 @@ static int hos_upload_stream(const char *bucket, const char *object, const char
int ret ;
int mode = 0 ;
if ( ( g_hos_handle . S3Clien t = = NULL ) | | ( bucket = = NULL ) | | ( object = = NULL ) | | ( thread_id > hos_conf - > thread_num ) )
if ( g_hos_instance . resul t = = false | | g_hos_handle . S3Client = = NULL )
{
return HOS_INSTANCE_NOT_INIT ;
}
if ( ( bucket = = NULL ) | | ( object = = NULL ) | | ( thread_id > hos_conf - > thread_num ) )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL , " hos_upload_stream " ,
" error: s3client:%s, bucket:%s, object:%s, thread_id:%lu, thread_num:%u " ,
@@ -780,31 +747,22 @@ static int hos_upload_stream(const char *bucket, const char *object, const char
request . SetBody ( input_data ) ;
}
//field_stat2 record
if ( hos_func - > fs2_info [ 0 ] .fs2_handle )
if ( hos_func - > fs2_info . fs2_handle )
{
if ( hos_func - > fs2_info [ 0 ] .reserved )
if ( hos_func - > fs2_info . reserved )
{
data_info = ( data_info_t * ) hos_func - > fs2_info [ 0 ] .reserved ;
data_info = ( data_info_t * ) hos_func - > fs2_info . reserved ;
data_info - > rx_pkts [ thread_id ] + + ;
data_info - > rx_bytes [ thread_id ] + = data_len ;
}
}
//设置回调函数
size_t fd = hash_get_min_free_fd ( thread_id ) ;
size_t fd = + + g_fd_info [ thread_id ] ;
hos_fd_context_t info = { fd , 0 , ( char * ) bucket , ( char * ) object , ( void * ) callback , userdata , NULL , 0 , 0 , 0 } ;
add_fd_context ( & g_fd_context [ thread_id ] , & info ) ;
{
std : : lock_guard < std : : mutex > locker ( m_client_lock ) ;
if ( g_hos_handle . hos_func . fd_thread = = 0 )
{
g_hos_handle . hos_func . fd_thread_status = 0 ;
pthread_create ( & g_hos_handle . hos_func . fd_thread , NULL , hos_fd_manage , NULL ) ;
}
}
if ( hos_conf - > pool_thread_size > 0 )
{
ret = hos_putobject_async ( request , data_len , thread_id , fd , bucket , object ) ;
@@ -822,9 +780,6 @@ int hos_upload_file(const char *bucket, const char *file_path, put_finished_call
struct stat buffer ;
if ( g_hos_instance . result = = false | | g_hos_handle . S3Client = = NULL )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL , __FUNCTION__ ,
" error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s " ,
g_hos_instance . result , ( g_hos_handle . S3Client = = NULL ) ? ( NULL ) : ( " not null " ) ) ;
return HOS_INSTANCE_NOT_INIT ;
}
@@ -848,9 +803,6 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size
{
if ( g_hos_instance . result = = false | | g_hos_handle . S3Client = = NULL )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL , __FUNCTION__ ,
" error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s " ,
g_hos_instance . result , ( g_hos_handle . S3Client = = NULL ) ? ( NULL ) : ( " not null " ) ) ;
return HOS_INSTANCE_NOT_INIT ;
}
@@ -865,102 +817,28 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size
return hos_upload_stream ( bucket , object , buf , buf_len , callback , userdata , thread_id ) ;
}
static void * hos_fd_manage ( void * ptr )
{
hos_fd_context_t * a_fd_context ;
size_t thread_sum = g_hos_handle . hos_config . thread_num ;
size_t thread_num ;
//size_t fd;
while ( 1 )
{
if ( g_hos_handle . hos_func . fd_thread_status )
break ;
for ( thread_num = 0 ; thread_num < thread_sum ; thread_num + + )
{
#if 0
a_fd_context = find_context_by_fd(g_fd_context[thread_num], fd);
if (!a_fd_context)
continue;
#endif
hos_fd_context_t * tmp = NULL ;
HASH_ITER ( hh , g_fd_context [ thread_num ] , a_fd_context , tmp )
{
if ( ! a_fd_context )
break ;
if ( a_fd_context - > fd_status = = HOS_FD_INJECT )
{
if ( a_fd_context - > position = = a_fd_context - > recive_cnt )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , __FUNCTION__ ,
" debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete " ,
a_fd_context - > bucket , a_fd_context - > object , thread_num , a_fd_context - > fd ) ;
hos_delete_fd ( thread_num , a_fd_context ) ;
}
else if ( a_fd_context - > overtime < = get_current_ms ( ) )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL , __FUNCTION__ ,
" error: [%s:%s] upload not completed, but the live-time of [thread_id:%lu fd:%lu] is over. " ,
a_fd_context - > bucket , a_fd_context - > object , thread_num , a_fd_context - > fd ) ;
hos_delete_fd ( thread_num , a_fd_context ) ;
}
}
}
}
usleep ( 500000 ) ;
}
pthread_exit ( NULL ) ;
}
int hos_open_fd ( const char * bucket , const char * object , put_finished_callback callback , void * userdata , size_t thread_id , int mode )
{
if ( g_hos_instance . result = = false | | g_hos_handle . S3Client = = NULL )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL , __FUNCTION__ ,
" error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s " ,
g_hos_instance . result , ( g_hos_handle . S3Client = = NULL ) ? ( " null " ) : ( " not null " ) ) ;
return HOS_INSTANCE_NOT_INIT ;
}
if ( ( bucket = = NULL ) | | ( object = = NULL ) | | ( thread_id > g_hos_handle . hos_config . thread_num ) | | strlen ( bucket ) = = 0 | | strlen ( object ) = = 0 )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL , " hos_open_fd " ,
" error: bucket:%s, obejct:%s, thread_id:%lu " ,
//( bucket == NULL)?"null":bucket, (object == NULL)?"null":object, thread_id);
bucket , object , thread_id ) ;
( bucket = = NULL ) ? " null " : bucket , ( object = = NULL ) ? " null " : object , thread_id) ;
return HOS_PARAMETER_ERROR ;
}
size_t fd = hash_get_min_free_fd ( thread_id ) ;
if ( fd = = 0 )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL , " hos_open_fd " ,
" error:fd not enough, thread_id:%lu, fd free: %lu, fd register:%lu, fd inject:%lu " ,
thread_id ,
g_fd_info [ thread_id ] [ HOS_FD_FREE ] ,
g_fd_info [ thread_id ] [ HOS_FD_REGISTER ] ,
g_fd_info [ thread_id ] [ HOS_FD_INJECT ] ) ;
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , " hos_open_fd " , " debug: thread_id:%lu, fd:%lu " , thread_id , fd ) ;
return HOS_FD_NOT_ENOUGH ;
}
size_t fd = + + g_fd_info [ thread_id ] ;
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , " hos_open_fd " , " debug: thread_id:%lu, fd:%lu " , thread_id , fd ) ;
hos_fd_context_t info = { fd , mode , ( char * ) bucket , ( char * ) object , ( void * ) callback , userdata ,
NULL , /*cache*/ g_hos_handle . hos_config . cache_count , 0 , /*position*/ 0 , /*recive_cnt*/
( long ) g_hos_handle . hos_config . cache_size , /*cache_rest*/ HOS_FD_REGISTER , /*fd_status*/
0 , /*overtime*/ g_hos_handle . hos_config . timeout , } ;
( long ) g_hos_handle . hos_config . cache_size , /*cache_rest*/ HOS_FD_REGISTER , /*fd_status*/ } ;
add_fd_context ( & g_fd_context [ thread_id ] , & info ) ;
#if 0
{
std::lock_guard<std::mutex> locker(m_client_lock);
if (g_hos_handle.hos_func.fd_thread == 0)
{
g_hos_handle.hos_func.fd_thread_status = 0;
pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL);
}
}
#endif
return fd ;
}
@@ -977,13 +855,10 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
if ( g_hos_instance . result = = false | | g_hos_handle . S3Client = = NULL )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL , __FUNCTION__ ,
" error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s " ,
g_hos_instance . result , ( g_hos_handle . S3Client = = NULL ) ? ( NULL ) : ( " not null " ) ) ;
return HOS_INSTANCE_NOT_INIT ;
}
if ( ( fd < 3 ) | | fd > MAX_HOS_CLIENT_FD_NUM | | ( stream = = NULL ) | | ( thread_id > hos_conf - > thread_num ) )
if ( ( stream = = NULL ) | | ( thread_id > hos_conf - > thread_num ) )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL ,
" hos_write " , " error: fd:%lu, stream:%s, stream_len:%lu, thread_id:%lu. " ,
@@ -991,10 +866,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
return HOS_PARAMETER_ERROR ;
}
if ( g_fd_info [ thread_id ] [ fd ] )
{
a_fd_context = find_context_by_fd ( g_fd_context [ thread_id ] , fd ) ;
}
a_fd_context = find_context_by_fd ( g_fd_context [ thread_id ] , fd ) ;
if ( a_fd_context = = NULL )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL , __FUNCTION__ , " error: fd info not find. thread_id:%lu, fd:%lu " , thread_id , fd ) ;
@@ -1011,11 +883,11 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
//BUFF_MODE
//field_stat2 record
if ( hos_func - > fs2_info [ 0 ] .fs2_handle )
if ( hos_func - > fs2_info . fs2_handle )
{
if ( hos_func - > fs2_info [ 0 ] .reserved )
if ( hos_func - > fs2_info . reserved )
{
data_info = ( data_info_t * ) hos_func - > fs2_info [ 0 ] .reserved ;
data_info = ( data_info_t * ) hos_func - > fs2_info . reserved ;
data_info - > rx_pkts [ thread_id ] + + ;
data_info - > rx_bytes [ thread_id ] + = stream_len ;
}
@@ -1079,11 +951,11 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
request . SetBody ( input_data ) ;
upload_len = buffer . st_size ;
//field_stat2 record
if ( hos_func - > fs2_info [ 0 ] .fs2_handle )
if ( hos_func - > fs2_info . fs2_handle )
{
if ( hos_func - > fs2_info [ 0 ] .reserved )
if ( hos_func - > fs2_info . reserved )
{
data_info = ( data_info_t * ) hos_func - > fs2_info [ 0 ] .reserved ;
data_info = ( data_info_t * ) hos_func - > fs2_info . reserved ;
data_info - > rx_pkts [ thread_id ] + + ;
data_info - > rx_bytes [ thread_id ] + = upload_len ;
}
@@ -1124,28 +996,22 @@ int hos_close_fd(size_t fd, size_t thread_id)
if ( g_hos_instance . result = = false | | g_hos_handle . S3Client = = NULL )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL , __FUNCTION__ ,
" error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s " ,
g_hos_instance . result , ( g_hos_handle . S3Client = = NULL ) ? ( " null " ) : ( " not null " ) ) ;
return HOS_INSTANCE_NOT_INIT ;
}
if ( fd < 3 | | fd > 65533 | | thread_id > hos_conf - > thread_num )
if ( thread_id > hos_conf - > thread_num )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_FATAL , " hos_close_fd " ,
" error:fd:%lu, thread_id:%lu, thread_sum:%u. " ,
fd , thread_id , hos_conf - > thread_num ) ;
return HOS_PARAMETER_ERROR ;
}
if ( g_fd_info [ thread_id ] [ fd ] )
{
a_fd_context = find_context_by_fd ( g_fd_context [ thread_id ] , fd ) ;
}
a_fd_context = find_context_by_fd ( g_fd_context [ thread_id ] , fd ) ;
if ( a_fd_context = = NULL )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG ,
" hos_close_fd " , " debug: not find the a_fd_context of [f d:%lu threa d:%lu] " ,
fd , thread_id) ;
" hos_close_fd " , " debug: not find the a_fd_context of [threa d:%lu f d:%lu] " ,
thread_id , f d ) ;
return HOS_CLIENT_OK ;
}
@@ -1179,18 +1045,32 @@ int hos_close_fd(size_t fd, size_t thread_id)
{
hos_putobject_sync ( request , upload_len , thread_id , fd , a_fd_context - > bucket , a_fd_context - > object ) ;
}
( ( data_info_t * ) ( g_hos_handle . hos_func . fs2_info - > reserved ) ) - > cache [ thread_id ] = 0 ;
( ( data_info_t * ) ( g_hos_handle . hos_func . fs2_info . reserved ) ) - > cache [ thread_id ] = 0 ;
}
}
a_fd_context - > fd_status = HOS_FD_INJECT ;
a_fd_context - > cache . reset ( ) ;
a_fd_context - > cache = NULL ;
a_fd_context - > overtime = get_current_ms ( ) + a_fd_context - > timeout ;
a_fd_context - > cache_rest = hos_conf - > cache_size ;
a_fd_context - > cache_count = hos_conf - > cache_count ;
g_fd_info [ thread_id ] [ HOS_FD_REGISTER ] - - ;
g_fd_info [ thread_id ] [ HOS_FD_INJECT ] + + ;
if ( hos_conf - > pool_thread_size = = 0 )
{
//同步模式, 立即释放fd
hos_delete_fd ( thread_id , a_fd_context ) ;
}
else
{
//异步APPEND 模式,判断是否可以释放
//异步其他模式, 在PutObjectAsyncFinished出释放fd
if ( a_fd_context - > mode = = ( BUFF_MODE | APPEND_MODE ) & & a_fd_context - > position = = a_fd_context - > recive_cnt )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , __FUNCTION__ ,
" debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete " ,
a_fd_context - > bucket , a_fd_context - > object , thread_id , a_fd_context - > fd ) ;
hos_delete_fd ( thread_id , a_fd_context ) ;
}
}
return HOS_CLIENT_OK ;
}
@@ -1203,10 +1083,9 @@ int hos_shutdown_instance()
hos_func_thread_t * hos_func = & g_hos_handle . hos_func ;
size_t task_num = 0 ;
if ( g_hos_handle . S3Client = = NULL )
if ( g_hos_instance . result = = false | | g_hos_ handle . S3Client = = NULL )
{
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , " hos_shutdown_instance " , " debug: There is no hos client. " ) ;
return HOS_CLIENT_OK ;
return HOS_INSTANCE_NOT_INIT ;
}
if ( g_hos_handle . count > 0 & & - - g_hos_handle . count )
@@ -1241,17 +1120,17 @@ int hos_shutdown_instance()
pthread_join ( hos_func - > fs2_thread , NULL ) ;
for ( i = 0 ; i < FS2_RECORD_EVENTS ; i + + )
{
screen_stat_handle_t * fs2_handle = & hos_func - > fs2_info [ i ] .fs2_handle ;
screen_stat_handle_t * fs2_handle = & hos_func - > fs2_info . fs2_handle ;
if ( * fs2_handle )
{
FS_stop ( fs2_handle ) ;
* fs2_handle = NULL ;
}
if ( hos_func - > fs2_info [ i ] .reserved )
if ( hos_func - > fs2_info . reserved )
{
if ( i = = 0 )
{
data_info_t * data_info = ( data_info_t * ) hos_func - > fs2_info [ i ] .reserved ;
data_info_t * data_info = ( data_info_t * ) hos_func - > fs2_info . reserved ;
if ( data_info - > rx_pkts )
free ( data_info - > rx_pkts ) ;
if ( data_info - > rx_bytes )
@@ -1267,18 +1146,18 @@ int hos_shutdown_instance()
if ( data_info - > cache )
free ( data_info - > cache ) ;
}
free ( hos_func - > fs2_info [ i ] .reserved ) ;
hos_func - > fs2_info [ i ] .reserved = NULL ;
free ( hos_func - > fs2_info . reserved ) ;
hos_func - > fs2_info . reserved = NULL ;
}
if ( hos_func - > fs2_info [ i ] .line_ids )
if ( hos_func - > fs2_info . line_ids )
{
free ( hos_func - > fs2_info [ i ] .line_ids ) ;
hos_func - > fs2_info [ i ] .line_ids = NULL ;
free ( hos_func - > fs2_info . line_ids ) ;
hos_func - > fs2_info . line_ids = NULL ;
}
if ( hos_func - > fs2_info [ i ] .column_ids )
if ( hos_func - > fs2_info . column_ids )
{
free ( hos_func - > fs2_info [ i ] .column_ids ) ;
hos_func - > fs2_info [ i ] .column_ids = NULL ;
free ( hos_func - > fs2_info . column_ids ) ;
hos_func - > fs2_info . column_ids = NULL ;
}
}
}
@@ -1290,6 +1169,11 @@ int hos_shutdown_instance()
free ( g_hos_handle . task_num ) ;
g_hos_handle . task_num = NULL ;
}
if ( g_hos_handle . task_context ! = NULL )
{
free ( g_hos_handle . task_context ) ;
g_hos_handle . task_context = NULL ;
}
MESA_HANDLE_RUNTIME_LOG ( g_hos_handle . log , RLOG_LV_DEBUG , __FUNCTION__ , " debug: delete s3client. " ) ;
if ( g_fd_info )