@@ -25,6 +25,35 @@ extern "C"
# define MAX_HOS_CLIENT_THREAD_NUM 255
# define MAX_HOS_CLIENT_FD_NUM 65535
# if(__GNUC__ * 100 + __GNUC_MINOR__ * 10 + __GNUC_PATCHLEVEL__ >= 410)
# define atomic_add(x,y) __sync_add_and_fetch((x),(y))
# define atomic_read(x) __sync_add_and_fetch((x),0)
# else
# define atomic_add(x,y) ((*(x))+=(y))
# define atomic_read(x) (*(x))
# endif
typedef struct data_info_s
{
int * tx_pkts ;
int * tx_bytes ;
int * rx_pkts ;
int * rx_bytes ;
int * tx_pkts_last ;
int * tx_bytes_last ;
int * rx_pkts_last ;
int * rx_bytes_last ;
} data_info_t ;
typedef struct fs2_info_s
{
screen_stat_handle_t fs2_handle ;
int * line_ids ;
int * column_ids ;
void * reserved ; //预留给每个fs2 handle用来存储自定义的数据
} fs2_info_t ;
typedef struct hos_client_handle_s
{
Aws : : S3 : : S3Client * S3Client ;
@@ -34,30 +63,21 @@ typedef struct hos_client_handle_s
int fd_thread_status ;
/* options */
size_t cache_size ;
size_t cache_times ;
size_t cache_count ;
size_t thread_sum ;
size_t timeout ;
/* expand */
screen_stat_handle_t fs2_handle ;
fs2_info_t fs2_info [ 3 ] ; //0: data info; 1: fd info; 2 cache info
pthread_t fs2_thread ;
int fs2_status ;
# define HOS_FS2_START 1
# define HOS_FS2_STOP 2
int * line_ids ;
int * column_ids ;
int * tx_pkts ;
int * tx_bytes ;
int * rx_pkts ;
int * rx_bytes ;
int * tx_pkts_last ;
int * tx_bytes_last ;
int * rx_pkts_last ;
int * rx_bytes_last ;
} hos_client_handle_t ;
hos_client_handle hos_handle ; //一个进程只允许有一个hos_handle
hos_info_t * hash_hos_info [ MAX_HOS_CLIENT_THREAD_NUM ] ;
size_t fd_info [ MAX_HOS_CLIENT_THREAD_NUM ] [ MAX_HOS_CLIENT_FD_NUM ] ;
size_t * hos_cache ; //记录当前hos缓存了多少数据
size_t fd_info [ MAX_HOS_CLIENT_THREAD_NUM ] [ MAX_HOS_CLIENT_FD_NUM + 1 ] ; //fd 实际从3开始, fd[thread_id][0]记录register的fd, fd[thread_id][1]记录inject的fd
Aws : : SDKOptions options ;
static inline size_t get_current_ms ( )
@@ -70,11 +90,17 @@ static inline size_t get_current_ms()
static size_t hash_get_min_free_fd ( size_t thread_id )
{
size_t i = 0 ;
for ( i = 1 ; i < MAX_HOS_CLIENT_FD_NUM ; i + + )
for ( i = 3 ; i < MAX_HOS_CLIENT_FD_NUM + 1 ; i + + )
{
if ( ! fd_info [ thread_id ] [ i ] )
{
fd_info [ thread_id ] [ i ] = 1 ;
fd_info [ thread_id ] [ HOS_FD_REGISTER ] + + ;
fd_info [ thread_id ] [ HOS_FD_FREE ] - - ;
return i ;
}
}
return 0 ;
}
@@ -86,6 +112,8 @@ static int hos_delete_fd(size_t fd, size_t thread_id)
}
delete_info_by_fd ( & hash_hos_info [ thread_id ] , fd ) ;
fd_info [ thread_id ] [ fd ] = 0 ;
fd_info [ thread_id ] [ HOS_FD_FREE ] + + ;
fd_info [ thread_id ] [ HOS_FD_INJECT ] - - ;
return HOS_CLIENT_OK ;
}
@@ -118,43 +146,44 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
if ( hos_info - > mode & APPEND_MODE )
{
//APPEND MODE 保留fd
hos_info - > recive_cnt + + ;
#if 0
if (hos_info->fd_status == HOS_FD_INJECT)
{
if (hos_info->recive_cnt == hos_info->position)
hos_delete_fd(fd, thread_id);
}
#endif
atomic_add ( & ( hos_info- > recive_cnt ) , 1 ) ;
} else
{
//完整上传 删除fd
// hos_delete_fd(fd, thread_id);
hos_info - > fd_status = HOS_FD_INJECT ;
hos_close_fd ( fd , thread_id) ;
}
}
void hos_set_cache_size ( hos_client_handle client , size_t cache_size )
{
client - > cache_size = cache_size ;
hos_cache = ( size_t * ) calloc ( client - > thread_sum , sizeof ( size_t ) ) ;
return ;
}
void hos_set_cache_times ( hos_client_handle client , size_t cache_times )
void hos_set_cache_count ( hos_client_handle client , size_t cache_count )
{
client - > cache_times = cache_times ;
client - > cache_count = cache_count ;
return ;
}
void hos_set_thread_sum ( hos_client_handle client , size_t thread_sum )
{
client - > thread_sum = thread_sum ;
for ( size_t i = 0 ; i < thread_sum ; i + + )
{
fd_info [ i ] [ 0 ] = 65533 ;
}
if ( hos_cache )
{
hos_cache = ( size_t * ) realloc ( hos_cache , thread_sum * sizeof ( size_t ) ) ;
}
return ;
}
hos_client_handle hos_client_create ( const char * endpoin t, const char * accesskeyid , const char * secretkey , size_t pool_size )
hos_client_handle hos_client_create ( const char * serverip , size_t por t, const char * accesskeyid , const char * secretkey , size_t pool_size )
{
if ( ! endpoint | | ! accesskeyid | | ! secretkey )
if ( ! serverip | | ! accesskeyid | | ! secretkey )
{
return NULL ;
}
@@ -171,6 +200,8 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi
Aws : : Auth : : AWSCredentials credentials ( accesskeyid , secretkey ) ;
//初始化
char endpoint [ 128 ] ;
snprintf ( endpoint , 128 , " http://%s:%lu/hos/ " , serverip , port ) ;
config . endpointOverride = endpoint ;
config . verifySSL = false ;
config . enableEndpointDiscovery = true ;
@@ -188,17 +219,21 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi
hos_handle - > buckets = outcome . GetResult ( ) . GetBuckets ( ) ;
hos_handle - > cache_size = 0 ;
hos_handle - > cache_times = 1 ;
hos_handle - > cache_count = 0 ;
hos_handle - > thread_sum = 1 ;
hos_handle - > timeout = 1000 ;
fd_info [ 0 ] [ 0 ] = 65533 ;
fd_info [ 0 ] [ 1 ] = 0 ;
fd_info [ 0 ] [ 2 ] = 0 ;
return hos_handle ;
}
static void * fs2_statistics ( void * ptr )
{
hos_client_handle handle = ( hos_client_handle ) ptr ;
in t i = 0 ;
size_ t i = 0 ;
int rx_pkts_sum = 0 ;
int rx_bytes_sum = 0 ;
int tx_pkts_sum = 0 ;
@@ -207,6 +242,7 @@ static void *fs2_statistics(void *ptr)
int rx_bytes_sum_interval = 0 ;
int tx_pkts_sum_interval = 0 ;
int tx_bytes_sum_interval = 0 ;
fs2_info_t * fs2_info = NULL ;
while ( 1 )
{
@@ -224,42 +260,60 @@ static void *fs2_statistics(void *ptr)
tx_pkts_sum_interval = 0 ;
tx_bytes_sum_interval = 0 ;
for ( i = 0 ; i < ( int ) handle - > thread_sum ; i + + )
//pkts and bytes info
fs2_info = & handle - > fs2_info [ 0 ] ;
for ( i = 0 ; i < handle - > thread_sum ; i + + )
{
rx_pkts_sum + = handle - > rx_pkts [ i ] ;
rx_byte s_sum + = handle - > rx_byte s [ i ] ;
t x_pkt s_sum + = handle - > t x_pkt s[ i ] ;
tx_byte s_sum + = handle - > tx_byte s [ i ] ;
r x_pkt s_sum_interval + = ( handle - > rx_pkts [ i ] - handle - > r x_pkts_last [ i ] ) ;
rx_byte s_sum_interval + = ( handle - > rx_byte s [ i ] - handle - > rx_byte s_last [ i ] ) ;
t x_pkt s_sum_interval + = ( handle - > t x_pkt s[ i ] - handle - > t x_pkt s_last[ i ] ) ;
tx_byte s_sum_interval + = ( handle - > tx_byte s [ i ] - handle - > tx_byte s_last [ i ] ) ;
data_info_t * data_info = ( data_info_t * ) fs2_info - > reserved ;
rx_pkt s_sum + = data_info - > rx_pkt s [ i ] ;
r x_byte s_sum + = data_info - > r x_byte s[ i ] ;
tx_pkt s_sum + = data_info - > tx_pkt s [ i ] ;
t x_byte s_sum + = data_info - > t x_bytes [ i ] ;
rx_pkt s_sum_interval + = ( data_info - > rx_pkt s [ i ] - data_info - > rx_pkt s_last [ i ] ) ;
r x_byte s_sum_interval + = ( data_info - > r x_byte s[ i ] - data_info - > r x_byte s_last[ i ] ) ;
tx_pkt s_sum_interval + = ( data_info - > tx_pkt s [ i ] - data_info - > tx_pkt s_last [ i ] ) ;
tx_bytes_sum_interval + = ( data_info - > tx_bytes [ i ] - data_info - > tx_bytes_last [ i ] ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * i ] , handle - > column_ids [ 0 ] , FS_OP_SET , handle - > rx_pkts [ i ] ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * i ] , handle - > column_ids [ 1 ] , FS_OP_SET , handle - > rx_bytes [ i ] ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * i ] , handle - > column_ids [ 2 ] , FS_OP_SET , handle - > tx_pkts [ i ] ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * i ] , handle - > column_ids [ 3 ] , FS_OP_SET , handle - > tx_bytes [ i ] ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * i ] , fs2_info - > column_ids [ 0 ] , FS_OP_SET , data_info - > rx_pkts [ i ] ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * i ] , fs2_info - > column_ids [ 1 ] , FS_OP_SET , data_info - > rx_bytes [ i ] ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * i ] , fs2_info - > column_ids [ 2 ] , FS_OP_SET , data_info - > tx_pkts [ i ] ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * i ] , fs2_info - > column_ids [ 3 ] , FS_OP_SET , data_info - > tx_bytes [ i ] ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * i + 1 ] , handle - > column_ids [ 0 ] , FS_OP_SET , ( handle - > rx_pkts [ i ] - handle - > rx_pkts_last [ i ] ) ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * i + 1 ] , handle - > column_ids [ 1 ] , FS_OP_SET , ( handle - > rx_bytes [ i ] - handle - > rx_bytes_last [ i ] ) ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * i + 1 ] , handle - > column_ids [ 2 ] , FS_OP_SET , ( handle - > tx_pkts [ i ] - handle - > tx_pkts_last [ i ] ) ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * i + 1 ] , handle - > column_ids [ 3 ] , FS_OP_SET , ( handle - > tx_bytes [ i ] - handle - > tx_bytes_last [ i ] ) ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * i + 1 ] , fs2_info - > column_ids [ 0 ] , FS_OP_SET , ( data_info - > rx_pkts [ i ] - data_info - > rx_pkts_last [ i ] ) ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * i + 1 ] , fs2_info - > column_ids [ 1 ] , FS_OP_SET , ( data_info - > rx_bytes [ i ] - data_info - > rx_bytes_last [ i ] ) ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * i + 1 ] , fs2_info - > column_ids [ 2 ] , FS_OP_SET , ( data_info - > tx_pkts [ i ] - data_info - > tx_pkts_last [ i ] ) ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * i + 1 ] , fs2_info - > column_ids [ 3 ] , FS_OP_SET , ( data_info - > tx_bytes [ i ] - data_info - > tx_bytes_last [ i ] ) ) ;
handle - > rx_pkts_last [ i ] = handle - > rx_pkts [ i ] ;
handle - > rx_bytes_last [ i ] = handle - > rx_bytes [ i ] ;
handle - > tx_pkts_last [ i ] = handle - > tx_pkts [ i ] ;
handle - > tx_bytes_last [ i ] = handle - > tx_bytes [ i ] ;
data_info - > rx_pkts_last [ i ] = data_info - > rx_pkts [ i ] ;
data_info - > rx_bytes_last [ i ] = data_info - > rx_bytes [ i ] ;
data_info - > tx_pkts_last [ i ] = data_info - > tx_pkts [ i ] ;
data_info - > tx_bytes_last [ i ] = data_info - > tx_bytes [ i ] ;
}
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * handle - > thread_sum ] , handle - > column_ids [ 0 ] , FS_OP_SET , rx_pkts_sum ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * handle - > thread_sum ] , handle - > column_ids [ 1 ] , FS_OP_SET , rx_bytes_sum ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * handle - > thread_sum ] , handle - > column_ids [ 2 ] , FS_OP_SET , tx_pkts_sum ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * handle - > thread_sum ] , handle - > column_ids [ 3 ] , FS_OP_SET , tx_bytes_sum ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * handle - > thread_sum ] , fs2_info - > column_ids [ 0 ] , FS_OP_SET , rx_pkts_sum ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * handle - > thread_sum ] , fs2_info - > column_ids [ 1 ] , FS_OP_SET , rx_bytes_sum ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * handle - > thread_sum ] , fs2_info - > column_ids [ 2 ] , FS_OP_SET , tx_pkts_sum ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * handle - > thread_sum ] , fs2_info - > column_ids [ 3 ] , FS_OP_SET , tx_bytes_sum ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * handle - > thread_sum + 1 ] , handle - > column_ids [ 0 ] , FS_OP_SET , rx_pkts_sum_interval ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * handle - > thread_sum + 1 ] , handle - > column_ids [ 1 ] , FS_OP_SET , rx_bytes_sum_interval ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * handle - > thread_sum + 1 ] , handle - > column_ids [ 2 ] , FS_OP_SET , tx_pkts_sum_interval ) ;
FS_operate ( handle - > fs2_handle , handle - > line_ids [ 2 * handle - > thread_sum + 1 ] , handle - > column_ids [ 3 ] , FS_OP_SET , tx_bytes_sum_interval ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * handle - > thread_sum + 1 ] , fs2_info - > column_ids [ 0 ] , FS_OP_SET , rx_pkts_sum_interval ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * handle - > thread_sum + 1 ] , fs2_info - > column_ids [ 1 ] , FS_OP_SET , rx_bytes_sum_interval ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * handle - > thread_sum + 1 ] , fs2_info - > column_ids [ 2 ] , FS_OP_SET , tx_pkts_sum_interval ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ 2 * handle - > thread_sum + 1 ] , fs2_info - > column_ids [ 3 ] , FS_OP_SET , tx_bytes_sum_interval ) ;
//fd info
fs2_info = & handle - > fs2_info [ 1 ] ;
for ( i = 0 ; i < handle - > thread_sum ; i + + )
{
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ i ] , fs2_info - > column_ids [ 0 ] , FS_OP_SET , fd_info [ i ] [ 1 ] ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ i ] , fs2_info - > column_ids [ 1 ] , FS_OP_SET , fd_info [ i ] [ 2 ] ) ;
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ i ] , fs2_info - > column_ids [ 2 ] , FS_OP_SET , fd_info [ i ] [ 0 ] ) ;
}
//cache info
fs2_info = & handle - > fs2_info [ 2 ] ;
for ( i = 0 ; i < handle - > thread_sum ; i + + )
{
FS_operate ( fs2_info - > fs2_handle , fs2_info - > line_ids [ i ] , fs2_info - > column_ids [ 0 ] , FS_OP_SET , hos_cache [ i ] ) ;
}
sleep ( 1 ) ;
}
pthread_exit ( NULL ) ;
@@ -267,81 +321,127 @@ static void *fs2_statistics(void *ptr)
void hos_expand_fs2 ( hos_client_handle handle , const char * path , int format , char * server_ip , int port )
{
screen_stat_handle _t fs2_handle = NULL ;
fs2_info _t * fs2_info = NULL ;
screen_stat_handle_t * fs2_handle = NULL ;
const char * app_name = " hos-sdk-client-cpp " ;
int value = 0 ;
char buff [ 128 ] ;
int i = 0 ;
fs2_handle = FS_create_handle ( ) ;
//fs2 init
for ( i = 0 ; i < 3 ; i + + )
{
fs2_handle = & handle - > fs2_info [ i ] . fs2_handle ;
* fs2_handle = FS_create_handle ( ) ;
FS_set_para ( fs2_handle , APP_NAME , app_name , strlen ( app_name ) + 1 ) ;
FS_set_para ( * fs2_handle , APP_NAME , app_name , strlen ( app_name ) + 1 ) ;
value = 1 ; //true
FS_set_para ( fs2_handle , FLUSH_BY_DATE , & value , sizeof ( value ) ) ;
FS_set_para ( * fs2_handle , FLUSH_BY_DATE , & value , sizeof ( value ) ) ;
if ( path ! = NULL )
{
FS_set_para ( fs2_handle , OUTPUT_DEVICE , path , strlen ( path ) + 1 ) ;
FS_set_para ( * fs2_handle , OUTPUT_DEVICE , path , strlen ( path ) + 1 ) ;
}
value = 2 ; //append
FS_set_para ( fs2_handle , PRINT_MODE , & value , sizeof ( value ) ) ;
FS_set_para ( * fs2_handle , PRINT_MODE , & value , sizeof ( value ) ) ;
value = 1 ;
FS_set_para ( fs2_handle , CREATE_THREAD , & value , sizeof ( value ) ) ;
FS_set_para ( fs2_handle , METRIS_FORMAT , & format , sizeof ( format ) ) ;
FS_set_para ( fs2_handle , STAT_CYCLE , & value , sizeof ( value ) ) ;
FS_set_para ( * fs2_handle , CREATE_THREAD , & value , sizeof ( value ) ) ;
FS_set_para ( * fs2_handle , METRIS_FORMAT , & format , sizeof ( format ) ) ;
FS_set_para ( * fs2_handle , STAT_CYCLE , & value , sizeof ( value ) ) ;
value = 4096 ;
FS_set_para ( fs2_handle , MAX_STAT_FIELD_NUM , & value , sizeof ( value ) ) ;
FS_set_para ( * fs2_handle , MAX_STAT_FIELD_NUM , & value , sizeof ( value ) ) ;
if ( server_ip = = NULL )
{
FS_set_para ( fs2_handle , STATS_SERVER_IP , " 127.0.0.1 " , strlen ( " 127.0.0.1 " ) ) ;
FS_set_para ( * fs2_handle , STATS_SERVER_IP , " 127.0.0.1 " , strlen ( " 127.0.0.1 " ) ) ;
} else
{
FS_set_para ( fs2_handle , STATS_SERVER_IP , server_ip , strlen ( server_ip ) ) ;
FS_set_para ( * fs2_handle , STATS_SERVER_IP , server_ip , strlen ( server_ip ) ) ;
}
FS_set_para ( fs2_handle , STATS_SERVER_PORT , & port , sizeof ( port ) ) ;
FS_set_para ( * fs2_handle , STATS_SERVER_PORT , & port , sizeof ( port ) ) ;
value = FS_OUTPUT_STATSD ;
FS_set_para ( fs2_handle , STATS_FORMAT , & value , sizeof ( value ) ) ;
FS_set_para ( * fs2_handle , STATS_FORMAT , & value , sizeof ( value ) ) ;
}
int * line_ids = ( int * ) calloc ( 2 * handle - > thread_sum + 2 , sizeof ( int ) ) ;
int * column_ids = ( int * ) calloc ( 4 , sizeof ( int ) ) ;
//pkts and bytes info
fs2_info = & handle - > fs2_info [ 0 ] ;
fs2_handle = & handle - > fs2_info [ 0 ] . fs2_handle ;
fs2_info - > line_ids = ( int * ) calloc ( 2 * handle - > thread_sum + 2 , sizeof ( int ) ) ;
fs2_info - > column_ids = ( int * ) calloc ( 4 , sizeof ( int ) ) ;
//line info
snprintf ( buff , sizeof ( buff ) , " rx_pkts " ) ;
column_ids [ 0 ] = FS_register ( fs2_handle , FS_STYLE_COLUMN , FS_CALC_CURRENT , buff ) ;
fs2_info - > column_ids[ 0 ] = FS_register ( * fs2_handle , FS_STYLE_COLUMN , FS_CALC_CURRENT , buff ) ;
snprintf ( buff , sizeof ( buff ) , " rx_bytes " ) ;
column_ids [ 1 ] = FS_register ( fs2_handle , FS_STYLE_COLUMN , FS_CALC_CURRENT , buff ) ;
fs2_info - > column_ids[ 1 ] = FS_register ( * fs2_handle , FS_STYLE_COLUMN , FS_CALC_CURRENT , buff ) ;
snprintf ( buff , sizeof ( buff ) , " tx_pkts " ) ;
column_ids [ 2 ] = FS_register ( fs2_handle , FS_STYLE_COLUMN , FS_CALC_CURRENT , buff ) ;
fs2_info - > column_ids[ 2 ] = FS_register ( * fs2_handle , FS_STYLE_COLUMN , FS_CALC_CURRENT , buff ) ;
snprintf ( buff , sizeof ( buff ) , " tx_bytes " ) ;
column_ids [ 3 ] = FS_register ( fs2_handle , FS_STYLE_COLUMN , FS_CALC_CURRENT , buff ) ;
fs2_info - > column_ids[ 3 ] = FS_register ( * fs2_handle , FS_STYLE_COLUMN , FS_CALC_CURRENT , buff ) ;
for ( i = 0 ; i < ( int ) handle - > thread_sum ; i + + )
{
snprintf ( buff , sizeof ( buff ) , " total(%d) " , i ) ;
line_ids [ 2 * i ] = FS_register ( fs2_handle , FS_STYLE_LINE , FS_CALC_CURRENT , buff ) ;
fs2_info - > line_ids[ 2 * i ] = FS_register ( * fs2_handle , FS_STYLE_LINE , FS_CALC_CURRENT , buff ) ;
snprintf ( buff , sizeof ( buff ) , " rate(%d) " , i ) ;
line_ids [ 2 * i + 1 ] = FS_register ( fs2_handle , FS_STYLE_LINE , FS_CALC_CURRENT , buff ) ;
fs2_info - > line_ids[ 2 * i + 1 ] = FS_register ( * fs2_handle , FS_STYLE_LINE , FS_CALC_CURRENT , buff ) ;
}
snprintf ( buff , sizeof ( buff ) , " total " ) ;
line_ids [ 2 * handle - > thread_sum ] = FS_register ( fs2_handle , FS_STYLE_LINE , FS_CALC_CURRENT , buff ) ;
fs2_info - > line_ids[ 2 * handle - > thread_sum ] = FS_register ( * fs2_handle , FS_STYLE_LINE , FS_CALC_CURRENT , buff ) ;
snprintf ( buff , sizeof ( buff ) , " rate " ) ;
line_ids [ 2 * handle - > thread_sum + 1 ] = FS_register ( fs2_handle , FS_STYLE_LINE , FS_CALC_CURRENT , buff ) ;
fs2_info - > line_ids[ 2 * handle - > thread_sum + 1 ] = FS_register ( * fs2_handle , FS_STYLE_LINE , FS_CALC_CURRENT , buff ) ;
handle - > fs2_handle = fs2_handle ;
handle - > line_ids = line_ids ;
handle - > column_ids = column_ids ;
handle - > fs2_status = HOS_FS2_START ;
handle - > tx_pkts = ( int * ) calloc ( handle - > thread_sum , sizeof ( in t) ) ;
handle - > tx_bytes = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
handle - > r x_pkts = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
handle - > r x_bytes = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
handle - > t x_pkts_last = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
handle - > t x_bytes_last = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
handle - > r x_pkts_last = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
handle - > r x_bytes_last = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
data_info_t * data_info = ( data_info_t * ) calloc ( 1 , sizeof ( data_info_ t) ) ;
fs2_info - > reserved = ( void * ) data_info ;
data_info - > t x_pkts = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
data_info - > t x_bytes = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
data_info - > r x_pkts = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
data_info - > r x_bytes = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
data_info - > t x_pkts_last = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
data_info - > t x_bytes_last = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
data_info - > rx_pkts_last = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
data_info - > rx_bytes_last = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
FS_start ( * fs2_handle ) ;
FS_start ( fs2_handle ) ;
//fd info
fs2_info = & handle - > fs2_info [ 1 ] ;
fs2_handle = & handle - > fs2_info [ 1 ] . fs2_handle ;
fs2_info - > line_ids = ( int * ) calloc ( handle - > thread_sum , sizeof ( int ) ) ;
fs2_info - > column_ids = ( int * ) calloc ( 3 , sizeof ( int ) ) ;
snprintf ( buff , sizeof ( buff ) , " REGISTER " ) ;
fs2_info - > column_ids [ 0 ] = FS_register ( * fs2_handle , FS_STYLE_COLUMN , FS_CALC_CURRENT , buff ) ;
snprintf ( buff , sizeof ( buff ) , " INJECT " ) ;
fs2_info - > column_ids [ 1 ] = FS_register ( * fs2_handle , FS_STYLE_COLUMN , FS_CALC_CURRENT , buff ) ;
snprintf ( buff , sizeof ( buff ) , " FREE " ) ;
fs2_info - > column_ids [ 2 ] = FS_register ( * fs2_handle , FS_STYLE_COLUMN , FS_CALC_CURRENT , buff ) ;
for ( i = 0 ; i < ( int ) handle - > thread_sum ; i + + )
{
snprintf ( buff , sizeof ( buff ) , " num(%d) " , i ) ;
fs2_info - > line_ids [ i ] = FS_register ( * fs2_handle , FS_STYLE_LINE , FS_CALC_CURRENT , buff ) ;
}
FS_start ( * fs2_handle ) ;
//cache info
fs2_info = & handle - > fs2_info [ 2 ] ;
fs2_handle = & handle - > fs2_info [ 2 ] . fs2_handle ;
fs2_info - > line_ids = ( int * ) calloc ( handle - > thread_sum + 1 , sizeof ( int ) ) ;
fs2_info - > column_ids = ( int * ) calloc ( 1 , sizeof ( int ) ) ;
snprintf ( buff , sizeof ( buff ) , " cached " ) ;
fs2_info - > column_ids [ 0 ] = FS_register ( * fs2_handle , FS_STYLE_COLUMN , FS_CALC_CURRENT , buff ) ;
for ( i = 0 ; i < ( int ) handle - > thread_sum ; i + + )
{
snprintf ( buff , sizeof ( buff ) , " Bytes(%d) " , i ) ;
fs2_info - > line_ids [ i ] = FS_register ( * fs2_handle , FS_STYLE_LINE , FS_CALC_CURRENT , buff ) ;
}
snprintf ( buff , sizeof ( buff ) , " total " ) ;
fs2_info - > line_ids [ i ] = FS_register ( * fs2_handle , FS_STYLE_LINE , FS_CALC_CURRENT , buff ) ;
FS_start ( * fs2_handle ) ;
pthread_create ( & handle - > fs2_thread , NULL , fs2_statistics , handle ) ;
@@ -406,7 +506,9 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const
{
struct stat buffer ;
char buf [ 128 ] ;
size_t fd = hash_get_min_free_fd ( thread_id ) ;
size_t stream_len = 0 ;
data_info_t * data_info = NULL ;
int ret ;
if ( ( handle = = NULL ) | | ( bucket = = NULL ) | | ( object = = NULL ) | | ( callback = = NULL ) | | ( thread_id > handle - > thread_sum ) )
{
@@ -426,6 +528,8 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const
{
return HOS_FILE_NOT_EXITS ;
}
stream_len = buffer . st_size ;
//文件类型
const std : : shared_ptr < Aws : : IOStream > input_data =
Aws : : MakeShared < Aws : : FStream > ( " SampleAllocationTag " , object , std : : ios_base : : in | std : : ios_base : : binary ) ;
@@ -434,14 +538,26 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const
else
{
//内存块
stream_len = data_len ;
const std : : shared_ptr < Aws : : IOStream > input_data =
Aws : : MakeShared < Aws : : StringStream > ( data ) ;
Aws : : String stream ( data , data_len ) ;
* input_data < < stream ;
request . SetBody ( input_data ) ;
}
//field_stat2 record
if ( handle - > fs2_info [ 0 ] . fs2_handle )
{
if ( handle - > fs2_info [ 0 ] . reserved )
{
data_info = ( data_info_t * ) handle - > fs2_info [ 0 ] . reserved ;
data_info - > rx_pkts [ thread_id ] + + ;
data_info - > rx_bytes [ thread_id ] + = stream_len ;
}
}
//设置回调函数
size_t fd = hash_get_min_free_fd ( thread_id ) ;
std : : shared_ptr < Aws : : Client : : AsyncCallerContext > context =
Aws : : MakeShared < Aws : : Client : : AsyncCallerContext > ( " " ) ;
sprintf ( buf , " %lu %lu " , thread_id , fd ) ;
@@ -449,10 +565,23 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const
hos_info_t info = { fd , 0 , handle , ( char * ) bucket , ( char * ) object , ( void * ) callback , userdata , NULL , 0 , 0 , 0 } ;
add_hos_info ( & hash_hos_info [ thread_id ] , & info ) ;
fd_info [ thread_id ] [ fd ] = 1 ;
S3Client . PutObjectAsync ( request , PutObjectAsyncFinished , context ) ;
ret = S3Client. PutObjectAsync ( request , PutObjectAsyncFinished , context ) ;
if ( ret )
{
//field_stat2 record
if ( handle - > fs2_info [ 0 ] . fs2_handle )
{
if ( handle - > fs2_info [ 0 ] . reserved )
{
data_info = ( data_info_t * ) handle - > fs2_info [ 0 ] . reserved ;
data_info - > tx_pkts [ thread_id ] + + ;
data_info - > tx_bytes [ thread_id ] + = stream_len ;
}
}
return HOS_CLIENT_OK ;
}
return HOS_SEND_FAILED ;
}
int hos_upload_file ( hos_client_handle handle , const char * bucket , const char * file_path ,
@@ -480,19 +609,20 @@ static void *hos_fd_manage(void *ptr)
break ;
for ( thread_num = 0 ; thread_num < thread_sum ; thread_num + + )
{
for ( fd = 0 ; fd < MAX_HOS_CLIENT_FD_NUM ; fd + + )
for ( fd = 3 ; fd < MAX_HOS_CLIENT_FD_NUM + 1 ; fd + + )
{
if ( ! fd_info [ thread_num ] [ fd ] )
break ;
continue ;
hos_info = find_info_by_fd ( hash_hos_info [ thread_num ] , fd ) ;
if ( ! hos_info )
break ;
if ( hos_info - > fd_status = = HOS_FD_REGISTER )
continue ;
if ( hos_info - > fd_status = = HOS_FD_INJECT )
{
if ( ( hos_info - > position = = hos_info - > recive_cnt ) | | ( hos_info - > overtime < = get_current_ms ( ) ) )
hos_delete_fd ( fd , thread_num ) ;
}
}
}
usleep ( 1000 ) ;
}
pthread_exit ( NULL ) ;
@@ -511,9 +641,8 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object
return HOS_FD_NOT_ENOUGH ;
}
hos_info_t info = { fd , mode , handle , ( char * ) bucket , ( char * ) object , ( void * ) callback , userdata , NULL , handle - > cache_times , handle - > cache_size , 0 , 0 , HOS_FD_REGISTER , 0 , handle - > timeout , } ;
hos_info_t info = { fd , mode , handle , ( char * ) bucket , ( char * ) object , ( void * ) callback , userdata , NULL , handle - > cache_count , handle - > cache_size , 0 , 0 , HOS_FD_REGISTER , 0 , handle - > timeout , } ;
add_hos_info ( & hash_hos_info [ thread_id ] , & info ) ;
fd_info [ thread_id ] [ fd ] = 1 ;
# if 1
if ( handle - > fd_thread = = 0 )
{
@@ -534,6 +663,8 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
int flag = 0 ; // 0, 一次处理就可以完成; 1, 需要多次处理才能处理完
int rest ; // stream 剩余未处理的数据长度
int ret = 0 ;
data_info_t * data_info = NULL ;
if ( ( fd = = 0 ) | | ( stream = = NULL ) | | ( thread_id > MAX_HOS_CLIENT_THREAD_NUM ) )
{
return HOS_PARAMETER_ERROR ;
@@ -551,10 +682,14 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
handle = ( hos_client_handle ) hos_info - > handle ;
//field_stat2 record
if ( handle - > fs2_handle )
if ( handle - > fs2_info [ 0 ] . fs2_ handle )
{
handle - > rx_pkts [ thread_id ] + + ;
handle - > rx_bytes [ thread_id ] + = stream_len ;
if ( handle - > fs2_info [ 0 ] . reserved )
{
data_info = ( data_info_t * ) handle - > fs2_info [ 0 ] . reserved ;
data_info - > rx_pkts [ thread_id ] + + ;
data_info - > rx_bytes [ thread_id ] + = stream_len ;
}
}
Aws : : S3 : : S3Client & S3Client = * ( handle - > S3Client ) ;
@@ -573,9 +708,9 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
{
hos_info - > cache = Aws : : MakeShared < Aws : : StringStream > ( " append mode " ) ;
}
if ( hos_info - > cache_times = = 0 )
if ( hos_info - > cache_count = = 0 )
{
//不设置cache_times 的情况下
//不设置cache_count 的情况下
if ( stream_len < hos_info - > cache_rest )
{
// cache
@@ -584,6 +719,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
hos_info - > cache_rest - = stream_len ;
if ( hos_info - > cache_rest > 0 )
{
hos_cache [ thread_id ] + = stream_len ;
return HOS_CLIENT_OK ;
}
} else if ( stream_len > = hos_info - > cache_rest )
@@ -597,7 +733,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
} else
{
//设置cache times的情况下
if ( ( - - hos_info - > cache_times ) & & ( stream_len < = hos_info - > cache_rest ) )
if ( ( - - hos_info - > cache_count ) & & ( stream_len < = hos_info - > cache_rest ) )
{
// cache
Aws : : String buffer ( stream , stream_len ) ;
@@ -605,6 +741,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
hos_info - > cache_rest - = stream_len ;
if ( hos_info - > cache_rest > 0 )
{
hos_cache [ thread_id ] + = stream_len ;
return HOS_CLIENT_OK ;
}
} else if ( stream_len > hos_info - > cache_rest )
@@ -616,7 +753,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
rest = stream_len - hos_info - > cache_rest ;
} else
{
//over cache_times
//over cache_count
Aws : : String buffer ( stream , stream_len ) ;
* hos_info - > cache < < buffer ;
}
@@ -663,30 +800,32 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
context - > SetUUID ( buf ) ;
ret = S3Client . PutObjectAsync ( request , PutObjectAsyncFinished , context ) ;
hos_cache [ thread_id ] = 0 ;
//恢复fd 的cache设置
if ( hos_info - > mode & APPEND_MODE )
{
hos_info - > cache = NULL ;
hos_info - > cache_rest = hos_info - > handle - > cache_size ;
hos_info - > cache_times = hos_info - > handle - > cache_times ;
hos_info - > cache_count = hos_info - > handle - > cache_count ;
}
if ( ret )
{
if ( handle - > fs2_handle )
if ( data_info )
{
handle - > tx_pkts [ thread_id ] + + ;
data_info - > tx_pkts [ thread_id ] + + ;
if ( hos_info - > mode & BUFF_MODE )
{
if ( hos_info - > mode & APPEND_MODE )
{
handle - > tx_bytes [ thread_id ] + = handle - > cache_size ;
data_info - > tx_bytes [ thread_id ] + = handle - > cache_size ;
} else
{
handle - > tx_bytes [ thread_id ] + = stream_len ;
data_info - > tx_bytes [ thread_id ] + = stream_len ;
}
} else
{
handle - > tx_bytes [ thread_id ] + = buffer . st_size ;
data_info - > tx_bytes [ thread_id ] + = buffer . st_size ;
}
}
while ( flag = = 1 )
@@ -703,10 +842,12 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
int hos_close_fd ( size_t fd , size_t thread_id )
{
hos_info_t * hos_info ;
hos_info_t * hos_info = NULL ;
char num [ 128 ] ;
char buf [ 128 ] ;
if ( fd = = 0 )
data_info_t * data_info = NULL ;
if ( fd < 3 )
{
return HOS_PARAMETER_ERROR ;
}
@@ -746,12 +887,24 @@ int hos_close_fd(size_t fd, size_t thread_id)
context - > SetUUID ( buf ) ;
S3Client . PutObjectAsync ( request , PutObjectAsyncFinished , context ) ;
if ( hos_info - > handle - > fs2_info [ 0 ] . fs2_handle )
{
if ( hos_info - > handle - > fs2_info [ 0 ] . reserved )
data_info = ( data_info_t * ) hos_info - > handle - > fs2_info [ 0 ] . reserved ;
data_info - > tx_pkts [ thread_id ] + + ;
data_info - > tx_bytes [ thread_id ] + = hos_info - > handle - > cache_size - hos_info - > cache_rest ;
}
hos_cache [ thread_id ] = 0 ;
}
}
hos_info - > fd_status = HOS_FD_INJECT ;
hos_info - > cache . reset ( ) ;
hos_info - > overtime = get_current_ms ( ) + hos_info - > timeout ;
fd_info [ thread_id ] [ HOS_FD_REGISTER ] - - ;
fd_info [ thread_id ] [ HOS_FD_INJECT ] + + ;
return HOS_CLIENT_OK ;
}
@@ -772,35 +925,47 @@ int hos_client_destory(hos_client_handle handle)
handle - > fd_thread_status = 1 ;
pthread_join ( handle - > fd_thread , NULL ) ;
}
for ( i = 0 ; i < handle- > thread_sum ; i + + )
{
delete_all ( & hash_hos_info [ i ] ) ;
}
if ( handle - > fs2_handle )
i f ( handle - > fs2_ thread)
{
handle - > fs2_status = HOS_FS2_STOP ;
pthread_join ( handle - > fs2_thread , NULL ) ;
FS_stop ( & handle - > fs2_handle ) ;
if ( handle - > rx_pkts )
f ree( handle - > rx_pkts ) ;
if ( handle - > rx_bytes )
free ( handle - > rx_bytes ) ;
if ( handle - > tx_pkts )
free ( handle - > tx_pkts ) ;
if ( handle - > tx_bytes )
free ( handle - > tx_bytes ) ;
if ( handle - > rx_pkts_last )
free ( handle - > rx_pkts_last ) ;
if ( handle - > rx_bytes_last )
free ( handle - > rx_bytes_last ) ;
if ( handle - > tx_pkts_last )
free ( handle - > tx_pkts_last ) ;
if ( handle - > tx_bytes_last )
free ( handle - > tx_bytes_last ) ;
if ( handle - > line_ids )
free ( handle - > line_ids ) ;
if ( handle - > column_ids )
free ( handle - > column_ids ) ;
for ( i = 0 ; i < 3 ; i + + )
{
sc reen_stat_handle_t * fs2_handle = & handle - > fs2_info [ i ] . fs2_handle ;
FS_stop ( fs2_handle ) ;
if ( handle - > fs2_info [ i ] . reserved )
{
if ( i = = 0 )
{
data_info_t * data_info = ( data_info_t * ) handle - > fs2_info [ i ] . reserved ;
if ( data_info - > rx_pkts )
free ( data_info - > rx_pkts ) ;
if ( data_info - > rx_bytes )
free ( data_info - > rx_bytes ) ;
if ( data_info - > tx_pkts )
free ( data_info - > tx_pkts ) ;
if ( data_info - > tx_bytes )
free ( data_info - > tx_bytes ) ;
if ( data_info - > rx_pkts_last )
free ( data_info - > rx_pkts_last ) ;
if ( data_info - > rx_bytes_last )
free ( data_info - > rx_bytes_last ) ;
if ( data_info - > tx_pkts_last )
free ( data_info - > tx_pkts_last ) ;
if ( data_info - > tx_bytes_last )
free ( data_info - > tx_bytes_last ) ;
}
}
if ( handle - > fs2_info [ i ] . line_ids )
free ( handle - > fs2_info [ i ] . line_ids ) ;
if ( handle - > fs2_info [ i ] . column_ids )
free ( handle - > fs2_info [ i ] . column_ids ) ;
}
}
for ( i = 0 ; i < handle - > thread_sum ; i + + )
{
delete_all ( & hash_hos_info [ i ] ) ;
}
free ( handle ) ;