diff --git a/CMakeLists.txt b/CMakeLists.txt index 9c691f24..78d82f3e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,7 +20,7 @@ add_subdirectory(src) install(FILES ${CMAKE_CURRENT_BINARY_DIR}/src/libhos-client-cpp.so DESTINATION ${CMAKE_INSTALL_PREFIX}/lib COMPONENT LIBRARIES) install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/src/hos_client.h DESTINATION ${CMAKE_INSTALL_PREFIX}/include COMPONENT HEADER) -install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/conf/hos.conf DESTINATION /etc/ld.so.conf.d COMPONENT PROFILE) +#install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/conf/hos.conf DESTINATION /etc/ld.so.conf.d COMPONENT PROFILE) #install(FILES ${CMAKE_CURRENT_BINARY_DIR}/src/libhos-client-cpp.a DESTINATION ${CMAKE_INSTALL_PREFIX} COMPONENT PROFILE) include(Package) diff --git a/conf/hos.conf b/conf/hos.conf deleted file mode 100644 index 04ecf183..00000000 --- a/conf/hos.conf +++ /dev/null @@ -1 +0,0 @@ -/usr/local/lib64/ \ No newline at end of file diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 80c8207a..6892b387 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -5,9 +5,10 @@ ************************************************************************/ extern "C" { -#include +#include #include #include +#include } #include #include @@ -21,31 +22,35 @@ extern "C" #include "hos_client.h" #include "hos_hash.h" #include "field_stat2.h" - -#define MAX_HOS_CLIENT_THREAD_NUM 255 -#define MAX_HOS_CLIENT_FD_NUM 65535 +#include "MESA_handle_logger.h" +#include "MESA_prof_load.h" #if(__GNUC__ * 100 + __GNUC_MINOR__ * 10 + __GNUC_PATCHLEVEL__ >= 410) #define atomic_add(x,y) __sync_add_and_fetch((x),(y)) #define atomic_read(x) __sync_add_and_fetch((x),0) +#define atomic_sub(x,y) __sync_sub_and_fetch((x),(y)) #else #define atomic_add(x,y) ((*(x))+=(y)) #define atomic_read(x) (*(x)) +#define atomic_sub(x,y) ((*(x))-=(y)) #endif +#define MAX_HOS_STRING_LEN 1024 +#define HOS_ERROR_MESSAGE_SIZE (MAX_HOS_STRING_LEN - 1) +#define MAX_HOS_CLIENT_FD_NUM 65535 +#define HOS_LOG_PATH "./tsglog/hoslog" + typedef struct data_info_s { - int *tx_pkts; - int *tx_bytes; - int *rx_pkts; - int *rx_bytes; - int *tx_pkts_last; - int *tx_bytes_last; - int *rx_pkts_last; - int *rx_bytes_last; + volatile size_t tx_pkts; + volatile size_t tx_bytes; + volatile size_t rx_pkts; + volatile size_t rx_bytes; + volatile size_t tx_failed_pkts; + volatile size_t tx_failed_bytes; + volatile size_t cache; }data_info_t; - typedef struct fs2_info_s { screen_stat_handle_t fs2_handle; @@ -54,50 +59,63 @@ typedef struct fs2_info_s void *reserved; //预留给每个fs2 handle用来存储自定义的数据 }fs2_info_t; -typedef struct hos_client_handle_s +enum { - Aws::S3::S3Client *S3Client; - Aws::Vector buckets; - std::shared_ptr executor; + FS2_DATA_FLOW_STATE = 0, + FS2_POOL_THREAD_STATE, + FS2_RECORD_EVENTS, +}; + +typedef struct hos_config_s +{ + char ip[INET6_ADDRSTRLEN]; + char fs2_ip[INET6_ADDRSTRLEN]; + char accesskeyid[MAX_HOS_STRING_LEN]; + char secretkey[MAX_HOS_STRING_LEN]; + char log_path[MAX_HOS_STRING_LEN]; + char fs2_path[MAX_HOS_STRING_LEN]; + + uint32_t port; + uint32_t fs2_port; + uint32_t fs2_fmt; + uint32_t log_level; + uint32_t pool_thread_size; + uint32_t thread_num; + uint32_t cache_size; + uint32_t cache_count; + uint32_t timeout; +}hos_config_t; + +typedef struct hos_func_thread_s +{ + /* fd 管理线程 */ pthread_t fd_thread; int fd_thread_status; - int count; /* 记录了有多少个对象在使用hos */ - size_t pool_thread_size; - /* options */ - size_t cache_size; - size_t cache_count; - size_t thread_sum; - size_t timeout; - /* expand */ -#ifndef FS2_RECORD_EVENTS 
-#define FS2_RECORD_EVENTS 4 -#endif + /* fs2 管理线程 */ fs2_info_t fs2_info[FS2_RECORD_EVENTS]; //0: data info; 1: fd info; 2 cache info; 3 PoolThread state pthread_t fs2_thread; int fs2_status; #define HOS_FS2_START 1 #define HOS_FS2_STOP 2 +}hos_func_thread_t; + +typedef struct hos_client_handle_s +{ + Aws::S3::S3Client *S3Client; + Aws::Vector buckets; + std::shared_ptr executor; + size_t count; /* 记录了有多少个对象在使用hos */ + hos_config_t hos_config; + hos_func_thread_t hos_func; + void *log; }hos_client_handle_t; -std::mutex m_client_lock; /* create和destroy操作时使用的锁 */ -hos_client_handle g_hos_handle;//一个进程只允许有一个g_hos_handle -//hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM]; -hos_info_t **hash_hos_info; -size_t *hos_cache;//记录当前hos缓存了多少数据 -size_t (*fd_info)[MAX_HOS_CLIENT_FD_NUM + 1]; //fd 实际从3开始, fd[thread_id][0]记录register的fd,fd[thread_id][1]记录inject的fd -Aws::SDKOptions g_options; -static char g_hos_error_msg[1024]; -static size_t g_hos_error_num; - -char *hos_get_error_msg() -{ - return g_hos_error_msg; -} - -size_t hos_get_error_num() -{ - return g_hos_error_num; -} +static struct hos_instance_s g_hos_instance; +static hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle +static std::mutex m_client_lock; +static hos_fd_context_t **fd_context; +static size_t (*fd_info)[MAX_HOS_CLIENT_FD_NUM + 1]; //fd 实际从3开始, fd[thread_id][0]记录register的fd,fd[thread_id][1]记录inject的fd +static Aws::SDKOptions g_options; static inline size_t get_current_ms() { @@ -129,7 +147,7 @@ static int hos_delete_fd(size_t fd, size_t thread_id) { return HOS_PARAMETER_ERROR; } - delete_info_by_fd(&hash_hos_info[thread_id], fd); + delete_context_by_fd(&fd_context[thread_id], fd); fd_info[thread_id][fd] = 0; fd_info[thread_id][HOS_FD_FREE]++; fd_info[thread_id][HOS_FD_INJECT]--; @@ -143,266 +161,221 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, const std::shared_ptr& context) { const char *error = NULL; - hos_info_t *hos_info = NULL; - bool result = outcome.IsSuccess(); - if (!result) - { - error = outcome.GetError().GetMessage().c_str(); - } + hos_fd_context_t *a_fd_context = NULL; + hos_func_thread_t *hos_func = &g_hos_handle.hos_func; + data_info_t *data_info = NULL; const Aws::String& uuid = context->GetUUID(); size_t thread_id, fd; sscanf(uuid.c_str(), "%lu %lu", &thread_id, &fd); if (fd_info[thread_id][fd]) { - hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); + a_fd_context = find_context_by_fd(fd_context[thread_id], fd); } - if (hos_info == NULL) + if (a_fd_context == NULL) { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, + "Not find the info of [thread_id:%d fd:%d]", thread_id, fd); + + if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) + { + data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; + atomic_add(&(data_info->tx_failed_pkts), 1); + atomic_add(&(data_info->tx_failed_bytes), stream_len); + } return ; } - put_finished_callback callback = (put_finished_callback)hos_info->callback; - callback(result, hos_info->bucket, hos_info->object, error, hos_info->userdata); - if (hos_info->mode & APPEND_MODE) + + bool result = outcome.IsSuccess(); + if (!result) + { + error = outcome.GetError().GetMessage().c_str(); + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, + "[%s:%s] upload failed. 
error:%s",a_fd_context->bucket, a_fd_context->object, error); + + if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) + { + data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; + atomic_add(&(data_info->tx_failed_pkts), 1); + atomic_add(&(data_info->tx_failed_bytes), stream_len); + } + } + else + { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "[%s:%s] upload success", a_fd_context->bucket, a_fd_context->object); + + if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) + { + data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; + atomic_add(&(data_info->tx_pkts), 1); + atomic_add(&(data_info->tx_bytes), stream_len); + } + } + put_finished_callback callback = (put_finished_callback)a_fd_context->callback; + callback(result, a_fd_context->bucket, a_fd_context->object, error, a_fd_context->userdata); + if (a_fd_context->mode & APPEND_MODE) { //APPEND MODE 保留fd - atomic_add(&(hos_info->recive_cnt), 1); - }else + atomic_add(&(a_fd_context->recive_cnt), 1); + } + else { //完整上传 删除fd hos_close_fd(fd, thread_id); } } -void hos_set_cache_size(hos_client_handle client, size_t cache_size) +static void hos_client_create() { - if (client == NULL) - { - return; - } - client->cache_size = cache_size; - return ; -} + std::lock_guard locker(m_client_lock); + hos_config_t *hos_conf = &g_hos_handle.hos_config; + void *log = g_hos_handle.log; -void hos_set_cache_count(hos_client_handle client, size_t cache_count) -{ - if (client == NULL) - { - return; - } - client->cache_count = cache_count; - return ; -} + Aws::InitAPI(g_options); + Aws::Client::ClientConfiguration config; + Aws::Auth::AWSCredentials credentials(hos_conf->accesskeyid, hos_conf->secretkey); -void hos_set_thread_sum(hos_client_handle client, size_t thread_sum) -{ - if (client == NULL) + //初始化 + char endpoint[128]; + snprintf(endpoint, 128, "http://%s:%d/hos/", hos_conf->ip, hos_conf->port); + config.endpointOverride = endpoint; + config.verifySSL = false; + config.enableEndpointDiscovery = true; + if (hos_conf->pool_thread_size > 0) { - return; - } - if (client->thread_sum >= thread_sum ) - { - return; - } - - if (hos_cache) - { - hos_cache = (size_t *)realloc(hos_cache, thread_sum * sizeof(size_t)); - } - if (hash_hos_info) - { - hash_hos_info = (hos_info_t **)realloc(hash_hos_info, thread_sum * sizeof(hos_info_t *)); - memset(&hash_hos_info[client->thread_sum], 0, (thread_sum - client->thread_sum) * sizeof(hos_info_t *)); + //异步模式 + config.executor = std::shared_ptr(std::make_shared(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY)); //支持线程池 } else { - hash_hos_info = (hos_info_t **)calloc(thread_sum, sizeof(hos_info_t*)); + //同步模式 } - if (fd_info) + g_hos_handle.S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + /* 获取当前用户的所有的buckets */ + Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle.S3Client->ListBuckets(); + + if (!outcome.IsSuccess()) { - fd_info = (size_t (*) [MAX_HOS_CLIENT_FD_NUM + 1])realloc(fd_info, thread_sum * sizeof(size_t [MAX_HOS_CLIENT_FD_NUM + 1])); - memset(&fd_info[client->thread_sum], 0, (thread_sum - client->thread_sum) * sizeof(size_t [MAX_HOS_CLIENT_FD_NUM + 1])); - }else - { - fd_info = (size_t (*) [MAX_HOS_CLIENT_FD_NUM + 1])calloc(thread_sum, sizeof(size_t [MAX_HOS_CLIENT_FD_NUM + 1])); + delete 
g_hos_handle.S3Client; + g_hos_handle.S3Client = NULL; + Aws::ShutdownAPI(g_options); + g_hos_instance.error_code = (size_t)outcome.GetError().GetErrorType() + 1; + snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, outcome.GetError().GetMessage().c_str()); + g_hos_instance.result = false; + MESA_handle_runtime_log(log, RLOG_LV_FATAL, "hos_client_create", g_hos_instance.error_message); + return; } - for (size_t i = 0; i < thread_sum; i++) + g_hos_handle.buckets = outcome.GetResult().GetBuckets(); + g_hos_handle.count++; + g_hos_handle.executor = std::dynamic_pointer_cast(config.executor); + + fd_context = (hos_fd_context_t **)calloc(1, sizeof(hos_fd_context_t *)); + fd_info = (size_t (*)[MAX_HOS_CLIENT_FD_NUM + 1])calloc(1, sizeof(size_t [MAX_HOS_CLIENT_FD_NUM + 1])); + + for (size_t i = 0; i < hos_conf->thread_num; i++) { fd_info[i][0] = 65533; } - client->thread_sum = thread_sum; - return ; + MESA_handle_runtime_log(log, RLOG_LV_DEBUG, "hos_client_create", "hos s3client create success, url:%s.",endpoint); + g_hos_instance.result = true; } -hos_client_handle hos_client_create(const char *serverip, size_t port, const char *accesskeyid, const char *secretkey, size_t pool_size) +static bool hos_verify_bucket(const char *bucket) { - std::lock_guard locker(m_client_lock); - if (!serverip || !accesskeyid || !secretkey) + Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle.S3Client->ListBuckets(); + + if (outcome.IsSuccess()) { - g_hos_error_num = HOS_PARAMETER_ERROR; - snprintf(g_hos_error_msg, sizeof(g_hos_error_msg) - 1, "Error: Parameter is null. serverip:%s,port:%lu, accesskeyid:%s, secretkey:%s, pool_size:%lu\n", - serverip, port, accesskeyid, secretkey, pool_size); - return NULL; - } + g_hos_handle.buckets = outcome.GetResult().GetBuckets(); - if (g_hos_handle) + for (Aws::S3::Model::Bucket& new_bucket : g_hos_handle.buckets) + { + if (strcmp(new_bucket.GetName().c_str(), bucket) == 0) + { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","bucket:%s exits", bucket); + return true; + } + } + } + else { - g_hos_handle->count++; - return g_hos_handle; + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_verify_bucket","error:%s", outcome.GetError().GetMessage().c_str()); } - - Aws::InitAPI(g_options); - g_hos_handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t)); - memset(g_hos_handle, 0, sizeof(hos_client_handle_t)); - Aws::Client::ClientConfiguration config; - Aws::Auth::AWSCredentials credentials(accesskeyid, secretkey); - - //初始化 - char endpoint[128]; - snprintf(endpoint, 128, "http://%s:%lu/hos/", serverip, port); - config.endpointOverride = endpoint; - config.verifySSL = false; - config.enableEndpointDiscovery = true; - config.executor = std::shared_ptr(std::make_shared(pool_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY));//支持线程池 - - g_hos_handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); - /* 获取当前用户的所有的buckets */ - Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle->S3Client->ListBuckets(); - - if (!outcome.IsSuccess()) - { - delete g_hos_handle->S3Client; - Aws::ShutdownAPI(g_options); - free(g_hos_handle); - g_hos_handle = NULL; - g_hos_error_num = (size_t)outcome.GetError().GetErrorType() + 1; - snprintf(g_hos_error_msg, sizeof(g_hos_error_msg) - 1, outcome.GetError().GetMessage().c_str()); - return NULL; - } - - g_hos_handle->buckets = outcome.GetResult().GetBuckets(); - g_hos_handle->cache_size = 0; - 
g_hos_handle->cache_count = 0; - g_hos_handle->thread_sum = 1; - g_hos_handle->timeout = 1000; - g_hos_handle->count++; - g_hos_handle->pool_thread_size = pool_size; - g_hos_handle->executor = std::dynamic_pointer_cast(config.executor); - - hos_cache = (size_t *)calloc(g_hos_handle->thread_sum, sizeof(size_t)); - hash_hos_info = (hos_info_t **)calloc(1, sizeof(hos_info_t *)); - fd_info = (size_t (*)[MAX_HOS_CLIENT_FD_NUM + 1])calloc(1, sizeof(size_t [MAX_HOS_CLIENT_FD_NUM + 1])); - - fd_info[0][0] = 65533; - fd_info[0][1] = 0; - fd_info[0][2] = 0; - - return g_hos_handle; + return false; } static void *fs2_statistics(void *ptr) { - hos_client_handle handle = (hos_client_handle)ptr; size_t i = 0; - int rx_pkts_sum = 0; - int rx_bytes_sum = 0; - int tx_pkts_sum = 0; - int tx_bytes_sum = 0; - int rx_pkts_sum_interval = 0; - int rx_bytes_sum_interval = 0; - int tx_pkts_sum_interval = 0; - int tx_bytes_sum_interval = 0; + size_t rx_pkts_sum = 0; + size_t rx_bytes_sum = 0; + size_t tx_pkts_sum = 0; + size_t tx_bytes_sum = 0; + size_t tx_failed_bytes_sum = 0; + size_t cache_sum = 0; + size_t rx_pkts_interval = 0; + size_t rx_bytes_interval = 0; + size_t tx_pkts_interval = 0; + size_t tx_bytes_interval = 0; + size_t tx_failed_bytes_interval = 0; + size_t cache_interval = 0; fs2_info_t *fs2_info = NULL; - int PoolThread_state[4] = {0, 0, 0, 0};//{PoolSize, Busy, TopBusy, AveBusy} + int PoolThread_state[3] = {0, 0, 0};//{PoolSize, Busy, TopBusy} int *busy = &PoolThread_state[1]; int *top_busy = &PoolThread_state[2]; - int *ave_busy = &PoolThread_state[3]; int pool_history_sum = 0; - size_t time = 0; + hos_config_t *hos_conf = &g_hos_handle.hos_config; + hos_func_thread_t *hos_func = &g_hos_handle.hos_func; - PoolThread_state[0] = g_hos_handle->pool_thread_size; + PoolThread_state[0] = hos_conf->pool_thread_size; while(1) { - if (handle->fs2_status == HOS_FS2_STOP) + if (hos_func->fs2_status == HOS_FS2_STOP) { break; } - rx_pkts_sum = 0; - rx_bytes_sum = 0; - tx_pkts_sum = 0; - tx_bytes_sum = 0; - rx_pkts_sum_interval = 0; - rx_bytes_sum_interval = 0; - tx_pkts_sum_interval = 0; - tx_bytes_sum_interval = 0; - - *busy = g_hos_handle->executor->GetTaskSize(); - *top_busy = (*busy) > (*top_busy) ? 
(*busy) : (*top_busy); - pool_history_sum += *busy; - time++; - *ave_busy = pool_history_sum / time; - //pkts and bytes info - fs2_info = &handle->fs2_info[0]; - for (i = 0; i < handle->thread_sum; i++) - { - data_info_t *data_info = (data_info_t *)fs2_info->reserved; - rx_pkts_sum += data_info->rx_pkts[i]; - rx_bytes_sum += data_info->rx_bytes[i]; - tx_pkts_sum += data_info->tx_pkts[i]; - tx_bytes_sum += data_info->tx_bytes[i]; - rx_pkts_sum_interval += (data_info->rx_pkts[i] - data_info->rx_pkts_last[i]); - rx_bytes_sum_interval += (data_info->rx_bytes[i] - data_info->rx_bytes_last[i]); - tx_pkts_sum_interval += (data_info->tx_pkts[i] - data_info->tx_pkts_last[i]); - tx_bytes_sum_interval += (data_info->tx_bytes[i] - data_info->tx_bytes_last[i]); + fs2_info = &hos_func->fs2_info[0]; + data_info_t *data_info = (data_info_t *)fs2_info->reserved; + rx_pkts_interval = atomic_read(&(data_info->rx_pkts)); + rx_bytes_interval = atomic_read(&(data_info->rx_bytes)); + tx_pkts_interval = atomic_read(&(data_info->tx_pkts)); + tx_bytes_interval = atomic_read(&(data_info->tx_bytes)); + tx_failed_bytes_interval = atomic_read(&(data_info->tx_failed_bytes)); + cache_interval = atomic_read(&(data_info->cache)); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i], fs2_info->column_ids[0], FS_OP_SET, data_info->rx_pkts[i]); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i], fs2_info->column_ids[1], FS_OP_SET, data_info->rx_bytes[i]); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i], fs2_info->column_ids[2], FS_OP_SET, data_info->tx_pkts[i]); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i], fs2_info->column_ids[3], FS_OP_SET, data_info->tx_bytes[i]); + rx_pkts_sum += rx_pkts_interval; + rx_bytes_sum += rx_bytes_interval; + tx_pkts_sum += tx_pkts_interval; + tx_bytes_sum += tx_bytes_interval; + tx_failed_bytes_sum += tx_failed_bytes_interval; + //cache_sum += cache_interval; - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i + 1], fs2_info->column_ids[0], FS_OP_SET, (data_info->rx_pkts[i] - data_info->rx_pkts_last[i])); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i + 1], fs2_info->column_ids[1], FS_OP_SET, (data_info->rx_bytes[i] - data_info->rx_bytes_last[i])); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i + 1], fs2_info->column_ids[2], FS_OP_SET, (data_info->tx_pkts[i] - data_info->tx_pkts_last[i])); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i + 1], fs2_info->column_ids[3], FS_OP_SET, (data_info->tx_bytes[i] - data_info->tx_bytes_last[i])); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[0], FS_OP_SET, rx_pkts_interval); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[1], FS_OP_SET, rx_bytes_interval); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[2], FS_OP_SET, tx_pkts_interval); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[3], FS_OP_SET, tx_bytes_interval); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[4], FS_OP_SET, tx_failed_bytes_interval); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[5], FS_OP_SET, cache_interval); - data_info->rx_pkts_last[i] = data_info->rx_pkts[i]; - data_info->rx_bytes_last[i] = data_info->rx_bytes[i]; - data_info->tx_pkts_last[i] = data_info->tx_pkts[i]; - data_info->tx_bytes_last[i] = data_info->tx_bytes[i]; - } - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum], 
fs2_info->column_ids[0], FS_OP_SET, rx_pkts_sum); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum], fs2_info->column_ids[1], FS_OP_SET, rx_bytes_sum); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum], fs2_info->column_ids[2], FS_OP_SET, tx_pkts_sum); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum], fs2_info->column_ids[3], FS_OP_SET, tx_bytes_sum); - - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum + 1], fs2_info->column_ids[0], FS_OP_SET, rx_pkts_sum_interval); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum + 1], fs2_info->column_ids[1], FS_OP_SET, rx_bytes_sum_interval); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum + 1], fs2_info->column_ids[2], FS_OP_SET, tx_pkts_sum_interval); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum + 1], fs2_info->column_ids[3], FS_OP_SET, tx_bytes_sum_interval); - - //fd info - fs2_info = &handle->fs2_info[1]; - for (i = 0; i < handle->thread_sum; i++) - { - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[i], fs2_info->column_ids[0], FS_OP_SET, fd_info[i][1]); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[i], fs2_info->column_ids[1], FS_OP_SET, fd_info[i][2]); - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[i], fs2_info->column_ids[2], FS_OP_SET, fd_info[i][0]); - } - - //cache info - fs2_info = &handle->fs2_info[2]; - for (i = 0; i < handle->thread_sum; i++) - { - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[i], fs2_info->column_ids[0], FS_OP_SET, hos_cache[i]); - } + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[0], FS_OP_SET, rx_pkts_sum); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[1], FS_OP_SET, rx_bytes_sum); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[2], FS_OP_SET, tx_pkts_sum); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[3], FS_OP_SET, tx_bytes_sum); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[4], FS_OP_SET, tx_failed_bytes_sum); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[5], FS_OP_SET, cache_sum); //PoolThread State - fs2_info = &handle->fs2_info[3]; - for (i = 0; i < 4; i++) + *busy = g_hos_handle.executor->GetTaskSize(); + *top_busy = (*busy) > (*top_busy) ? 
(*busy) : (*top_busy); + pool_history_sum += *busy; + + fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE]; + for (i = 0; i < 3; i++) { FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[i], FS_OP_SET, PoolThread_state[i]); } @@ -412,188 +385,287 @@ static void *fs2_statistics(void *ptr) pthread_exit(NULL); } -void hos_expand_fs2(hos_client_handle handle, const char * path, int format, char *server_ip, int port) +static void hos_expand_fs2(const char * path, int format, char *server_ip, int port) { fs2_info_t *fs2_info = NULL; - screen_stat_handle_t *fs2_handle = NULL; - const char *app_name = "hos-sdk-client-cpp"; + screen_stat_handle_t fs2_handle = NULL; + const char *app_name[] = {"hos-data", "hos-poolthread"}; int value = 0; - char buff[128]; - int i = 0; + //hos_config_t *hos_conf = &g_hos_handle.hos_config; + hos_func_thread_t *hos_func = &g_hos_handle.hos_func; + size_t i = 0; - if (handle->fs2_info[0].fs2_handle) + if (hos_func->fs2_info[0].fs2_handle) return; //fs2 init for (i = 0; i < FS2_RECORD_EVENTS; i++) { - fs2_handle = &handle->fs2_info[i].fs2_handle; - *fs2_handle = FS_create_handle(); + hos_func->fs2_info[i].fs2_handle = FS_create_handle(); + fs2_handle = hos_func->fs2_info[i].fs2_handle; - FS_set_para(*fs2_handle, APP_NAME, app_name, strlen(app_name) + 1); + FS_set_para(fs2_handle, APP_NAME, app_name[i], strlen(app_name[i]) + 1); value = 1;//true - FS_set_para(*fs2_handle, FLUSH_BY_DATE, &value, sizeof(value)); + FS_set_para(fs2_handle, FLUSH_BY_DATE, &value, sizeof(value)); if (path != NULL) { - FS_set_para(*fs2_handle, OUTPUT_DEVICE, path, strlen(path) + 1); + if (FS_set_para(fs2_handle, OUTPUT_DEVICE, path, strlen(path) + 1) != 0) + { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: fs2 OUTOUT_DEVICE:%s", path); + return; + } } - value = 2;//append - FS_set_para(*fs2_handle, PRINT_MODE, &value, sizeof(value)); + value = 2; + FS_set_para(fs2_handle, PRINT_MODE, &value, sizeof(value)); value = 1; - FS_set_para(*fs2_handle, CREATE_THREAD, &value, sizeof(value)); - FS_set_para(*fs2_handle, METRIS_FORMAT, &format, sizeof(format)); - FS_set_para(*fs2_handle, STAT_CYCLE, &value, sizeof(value)); + FS_set_para(fs2_handle, CREATE_THREAD, &value, sizeof(value)); + FS_set_para(fs2_handle, METRIS_FORMAT, &format, sizeof(format)); + FS_set_para(fs2_handle, STAT_CYCLE, &value, sizeof(value)); value = 4096; - FS_set_para(*fs2_handle, MAX_STAT_FIELD_NUM, &value, sizeof(value)); + FS_set_para(fs2_handle, MAX_STAT_FIELD_NUM, &value, sizeof(value)); if (server_ip == NULL) { - FS_set_para(*fs2_handle, STATS_SERVER_IP, "127.0.0.1", strlen("127.0.0.1")); - }else + FS_set_para(fs2_handle, STATS_SERVER_IP, "127.0.0.1", strlen("127.0.0.1")); + } + else { - FS_set_para(*fs2_handle, STATS_SERVER_IP, server_ip, strlen(server_ip)); + FS_set_para(fs2_handle, STATS_SERVER_IP, server_ip, strlen(server_ip)); } - FS_set_para(*fs2_handle, STATS_SERVER_PORT, &port, sizeof(port)); + FS_set_para(fs2_handle, STATS_SERVER_PORT, &port, sizeof(port)); value = FS_OUTPUT_STATSD; - FS_set_para(*fs2_handle, STATS_FORMAT, &value, sizeof(value)); + FS_set_para(fs2_handle, STATS_FORMAT, &value, sizeof(value)); } - //pkts and bytes info - fs2_info = &handle->fs2_info[0]; - fs2_handle = &handle->fs2_info[0].fs2_handle; - fs2_info->line_ids = (int *)calloc(2 * handle->thread_sum + 2, sizeof(int)); - fs2_info->column_ids = (int *)calloc(4, sizeof(int)); + fs2_info = &hos_func->fs2_info[FS2_DATA_FLOW_STATE]; + fs2_handle = 
hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle; + fs2_info->line_ids = (int *)calloc(2, sizeof(int)); + fs2_info->column_ids = (int *)calloc(6, sizeof(int)); //line info - snprintf(buff, sizeof(buff), "rx_pkts"); - fs2_info->column_ids[0] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); - snprintf(buff, sizeof(buff), "rx_bytes"); - fs2_info->column_ids[1] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); - snprintf(buff, sizeof(buff), "tx_pkts"); - fs2_info->column_ids[2] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); - snprintf(buff, sizeof(buff), "tx_bytes"); - fs2_info->column_ids[3] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); - for (i = 0; i < (int)handle->thread_sum; i++) + const char *data_col[] = {"rx_pkts", "rx_bytes", "tx_pkts", "tx_bytes", "tx_failed_b", "cache_bytes"}; + for (i = 0; i < sizeof(data_col) / sizeof(const char *); i++) { - snprintf(buff, sizeof(buff), "total(%d)", i); - fs2_info->line_ids[2 * i] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); - snprintf(buff, sizeof(buff), "rate(%d)", i); - fs2_info->line_ids[2 * i + 1] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); + fs2_info->column_ids[i] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, data_col[i]); } - snprintf(buff, sizeof(buff), "total"); - fs2_info->line_ids[2 * handle->thread_sum] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); - snprintf(buff, sizeof(buff), "rate"); - fs2_info->line_ids[2 * handle->thread_sum + 1] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); + fs2_info->line_ids[0] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "current"); + fs2_info->line_ids[1] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "total"); - handle->fs2_status = HOS_FS2_START; + hos_func->fs2_status = HOS_FS2_START; data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t)); fs2_info->reserved = (void *)data_info; - data_info->tx_pkts = (int *)calloc(handle->thread_sum, sizeof(int)); - data_info->tx_bytes = (int *)calloc(handle->thread_sum, sizeof(int)); - data_info->rx_pkts = (int *)calloc(handle->thread_sum, sizeof(int)); - data_info->rx_bytes = (int *)calloc(handle->thread_sum, sizeof(int)); - data_info->tx_pkts_last = (int *)calloc(handle->thread_sum, sizeof(int)); - data_info->tx_bytes_last = (int *)calloc(handle->thread_sum, sizeof(int)); - data_info->rx_pkts_last = (int *)calloc(handle->thread_sum, sizeof(int)); - data_info->rx_bytes_last = (int *)calloc(handle->thread_sum, sizeof(int)); - FS_start(*fs2_handle); - - //fd info - fs2_info = &handle->fs2_info[1]; - fs2_handle = &handle->fs2_info[1].fs2_handle; - fs2_info->line_ids = (int *)calloc(handle->thread_sum, sizeof(int)); - fs2_info->column_ids = (int *)calloc(3, sizeof(int)); - - snprintf(buff, sizeof(buff), "REGISTER"); - fs2_info->column_ids[0] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); - snprintf(buff, sizeof(buff), "INJECT"); - fs2_info->column_ids[1] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); - snprintf(buff, sizeof(buff), "FREE"); - fs2_info->column_ids[2] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); - - for (i = 0; i < (int)handle->thread_sum; i++) - { - snprintf(buff, sizeof(buff), "num(%d)", i); - fs2_info->line_ids[i] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); - } - FS_start(*fs2_handle); - - //cache info - fs2_info = &handle->fs2_info[2]; - fs2_handle = 
&handle->fs2_info[2].fs2_handle; - fs2_info->line_ids = (int *)calloc(handle->thread_sum + 1, sizeof(int)); - fs2_info->column_ids = (int *)calloc(1, sizeof(int)); - - snprintf(buff, sizeof(buff), "cached"); - fs2_info->column_ids[0] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); - - for (i = 0; i < (int)handle->thread_sum; i++) - { - snprintf(buff, sizeof(buff), "Bytes(%d)", i); - fs2_info->line_ids[i] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); - } - snprintf(buff, sizeof(buff), "total"); - fs2_info->line_ids[i] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); - FS_start(*fs2_handle); + #if 0 + data_info->tx_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + data_info->tx_bytes = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + data_info->rx_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + data_info->rx_bytes = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + data_info->tx_failed_bytes = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + data_info->tx_pkts_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + data_info->tx_bytes_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + data_info->rx_pkts_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + data_info->rx_bytes_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + data_info->tx_failed_bytes_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + #endif + //FS_start(hos_func->fs2_info[0].fs2_handle); + FS_start(fs2_handle); //PoolThread state /******************************************************* * PoolSize Busy TopBusy AveBusy * ThreadNum 1000 500 800 650 ********************************************************/ - fs2_info = &handle->fs2_info[3]; - fs2_handle = &handle->fs2_info[3].fs2_handle; + fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE]; + fs2_handle = hos_func->fs2_info[FS2_POOL_THREAD_STATE].fs2_handle; fs2_info->line_ids = (int *)calloc(1, sizeof(int)); - fs2_info->column_ids = (int *)calloc(4, sizeof(int)); + fs2_info->column_ids = (int *)calloc(3, sizeof(int)); - const char *poolthread_col[4] = {"PoolSize", "Busy", "TopBusy", "AveBusy"}; - for (i = 0; i < 4; i++) + const char *poolthread_col[3] = {"PoolSize", "Busy", "TopBusy"}; + for (i = 0; i < sizeof(poolthread_col) / sizeof(const char *); i++) { - fs2_info->column_ids[i] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, poolthread_col[i]); + fs2_info->column_ids[i] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, poolthread_col[i]); } - fs2_info->line_ids[0] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "ThreadNum"); + fs2_info->line_ids[0] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "ThreadNum"); - FS_start(*fs2_handle); + FS_start(fs2_handle); - pthread_create(&handle->fs2_thread, NULL, fs2_statistics, handle); + pthread_create(&hos_func->fs2_thread, NULL, fs2_statistics, NULL); return ; } -bool hos_verify_bucket(hos_client_handle handle, const char *bucket) +static bool hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t stream_len, + size_t thread_id, size_t fd, const char *bucket, const char *object) { - Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); + char buf[128]; + int ret = 0; + hos_func_thread_t *hos_func = &g_hos_handle.hos_func; + data_info_t *data_info = NULL; + //设置回调函数 + std::shared_ptr context = + Aws::MakeShared(""); + sprintf(buf, "%lu %lu", thread_id, fd); + 
context->SetUUID(buf); - if (outcome.IsSuccess()) + Aws::S3::S3Client& S3Client = *(g_hos_handle.S3Client); + ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); + if (ret) { - handle->buckets = outcome.GetResult().GetBuckets(); + //不算真正成功,需要等到PutObjectAsyncFinished的结果 + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "PutObjectAsync success. [%s:%s]", bucket, object); + } + else + { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "PutObjectAsync failed. [%s:%s]", bucket, object); - for (Aws::S3::Model::Bucket& new_bucket : handle->buckets) + if (hos_func->fs2_info[0].fs2_handle) { - if (strcmp(new_bucket.GetName().c_str(), bucket) == 0) + if (hos_func->fs2_info[0].reserved) { - return true; + data_info = (data_info_t *)hos_func->fs2_info[0].reserved; + atomic_add(&(data_info->tx_failed_pkts), 1); + atomic_add(&(data_info->tx_failed_bytes), stream_len); } } } - return false; + + return ret; } -int hos_create_bucket(hos_client_handle handle, const char *bucket) +static bool hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t stream_len, size_t thread_id, size_t fd, + const char *bucket, const char *object) { - if ((bucket == NULL) || (handle == NULL)) + hos_func_thread_t *hos_func = &g_hos_handle.hos_func; + data_info_t *data_info = NULL; + + Aws::S3::S3Client& S3Client = *(g_hos_handle.S3Client); + Aws::S3::Model::PutObjectOutcome Outcome = S3Client.PutObject(request); + if (Outcome.IsSuccess()) { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "PutObject success. [%s:%s]", bucket, object); + + if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) + { + data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; + atomic_add(&(data_info->tx_pkts), 1); + atomic_add(&(data_info->tx_bytes), stream_len); + } + + return true; + } + else + { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "PutObject failed. 
[%s:%s]", bucket, object); + + if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) + { + data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; + atomic_add(&(data_info->tx_failed_pkts), 1); + atomic_add(&(data_info->tx_failed_bytes), stream_len); + } + + return false; + } +} + +hos_instance hos_init_instance(const char *conf_path, const char *module, size_t thread_num, const char *bucket) +{ + hos_config_t *hos_conf = &g_hos_handle.hos_config; + char hos_url[1024]; + + if (conf_path == NULL || thread_num == 0) + { + g_hos_instance.result = false; + g_hos_instance.error_code = HOS_PARAMETER_ERROR; + snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "param error:conf_path:%s, thread_num:%lu", conf_path, thread_num); + return &g_hos_instance; + } + + MESA_load_profile_string_nodef(conf_path, module, "hos_serverip", hos_conf->ip, MAX_HOS_STRING_LEN); + MESA_load_profile_uint_nodef(conf_path, module, "hos_serverport", &hos_conf->port); + MESA_load_profile_string_nodef(conf_path, module, "hos_accesskeyid", hos_conf->accesskeyid, MAX_HOS_STRING_LEN); + MESA_load_profile_string_nodef(conf_path, module, "hos_secretkey", hos_conf->secretkey, MAX_HOS_STRING_LEN); + MESA_load_profile_string_def(conf_path, module, "hos_log_path", hos_conf->log_path, MAX_HOS_STRING_LEN, HOS_LOG_PATH); + MESA_load_profile_uint_def(conf_path, module, "hos_log_level", &hos_conf->log_level, 30); + MESA_load_profile_uint_def(conf_path, module, "hos_poolsize", &hos_conf->pool_thread_size, 0); + MESA_load_profile_uint_def(conf_path, module, "hos_thread_sum", &hos_conf->thread_num, 32); + MESA_load_profile_uint_def(conf_path, module, "hos_cache_size", &hos_conf->cache_size, 102400); + MESA_load_profile_uint_def(conf_path, module, "hos_cache_count", &hos_conf->cache_count, 10); + MESA_load_profile_uint_def(conf_path, module, "hos_fd_live_time_ms", &hos_conf->timeout, 1000); + MESA_load_profile_string_nodef(conf_path, module, "hos_fs2_serverip", hos_conf->fs2_ip, MAX_HOS_STRING_LEN); + MESA_load_profile_uint_nodef(conf_path, module, "hos_fs2_serverport", &hos_conf->fs2_port); + MESA_load_profile_string_def(conf_path, module, "hos_fs2_path", hos_conf->fs2_path, sizeof(hos_conf->fs2_path), "./hos_fs2.stat"); + MESA_load_profile_uint_def(conf_path, module, "hos_fs2_format", &hos_conf->fs2_fmt, 0); + if (hos_conf->ip && hos_conf->port && hos_conf->accesskeyid && hos_conf->secretkey) + { + g_hos_handle.log = MESA_create_runtime_log_handle(hos_conf->log_path, hos_conf->log_level); + if (log == NULL) + { + g_hos_instance.result = false; + g_hos_instance.error_code = HOS_RUNTIME_LOG_FAILED; + snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "runtime log create failed."); + return &g_hos_instance; + } + + snprintf(hos_url, sizeof(hos_url), "http://%s:%d/hos/", hos_conf->ip, hos_conf->port); + hos_client_create(); + if (g_hos_instance.result == true) + { + if(hos_verify_bucket(bucket) == false) + { + g_hos_instance.result = false; + g_hos_instance.error_code = HOS_BUCKET_NOT_EXIST; + snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "runtime log create failed."); + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "bucket:%s not exist.", bucket); + hos_shutdown_instance(); + return &g_hos_instance; + } + g_hos_instance.hos_url_prefix = (const char *)calloc(1, strlen(hos_url) + 1); + memcpy((void *)g_hos_instance.hos_url_prefix, hos_url, strlen(hos_url)); + MESA_handle_runtime_log(g_hos_handle.log, 
RLOG_LV_DEBUG, __FUNCTION__, "Instance init completed");
+            if (hos_conf->fs2_ip[0] && hos_conf->fs2_port)
+            {
+                hos_expand_fs2(hos_conf->fs2_path, hos_conf->fs2_fmt, hos_conf->fs2_ip, hos_conf->fs2_port);
+            }
+            else
+            {
+                MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "hos fs2 function not started");
+            }
+        }
+        return &g_hos_instance;
+    }
+    else
+    {
+        g_hos_instance.result = false;
+        g_hos_instance.error_code = HOS_CONF_ERROR;
+        snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "hos param error:hos ip:%s, hos port:%u, accesskeyid:%s, secretkey:%s",
+                hos_conf->ip, hos_conf->port, hos_conf->accesskeyid, hos_conf->secretkey);
+        MESA_destroy_runtime_log_handle(g_hos_handle.log);
+        return &g_hos_instance;
+    }
+}
+
+int hos_create_bucket(const char *bucket)
+{
+    if ((bucket == NULL) || (g_hos_handle.S3Client == NULL))
+    {
+        MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_create_bucket",
+                "error:bucket:%s, s3client:%s", bucket, g_hos_handle.S3Client?"not null":"null");
         return HOS_PARAMETER_ERROR;
     }
-    Aws::S3::S3Client& S3Client = *handle->S3Client;
+    Aws::S3::S3Client& S3Client = *g_hos_handle.S3Client;
 
     /* 本地检查是否已经存在该bucket */
-    for (Aws::S3::Model::Bucket& new_bucket : handle->buckets)
+    for (Aws::S3::Model::Bucket& new_bucket : g_hos_handle.buckets)
     {
         if (strcmp(new_bucket.GetName().c_str(), bucket) == 0)
         {
+            MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "%s already exists", bucket);
             return HOS_CLIENT_OK;
         }
     }
@@ -608,27 +680,35 @@ int hos_create_bucket(hos_client_handle handle, const char *bucket)
         Aws::S3::S3Errors errorcode = createBucketOutcome.GetError().GetErrorType();
         if (errorcode != Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU)
         {
+            MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: %s create failed. %s",
+                    bucket, createBucketOutcome.GetError().GetMessage().c_str());
             return (int)errorcode + 1;
         }
     }
+    MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "%s create successful", bucket);
     return HOS_CLIENT_OK;
 }
 
-static int hos_upload_stream(hos_client_handle handle, const char *bucket, const char *object,
-        const char *data, size_t data_len, put_finished_callback callback, void *userdata, size_t thread_id, int file_type)
+static int hos_upload_stream(const char *bucket, const char *object, const char *data, size_t data_len,
+        put_finished_callback callback, void *userdata, size_t thread_id)
 {
-    struct stat buffer;
     char buf[128];
-    size_t stream_len = 0;
     data_info_t *data_info = NULL;
+    hos_config_t *hos_conf = &g_hos_handle.hos_config;
+    hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
     int ret;
+    int mode = 0;
 
-    if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (callback == NULL) || (thread_id > handle->thread_sum))
+    if ((g_hos_handle.S3Client == NULL) || (bucket == NULL) || (object == NULL) || (thread_id > hos_conf->thread_num))
    {
+        MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_stream",
+                "s3client:%s, bucket:%s, object:%s, thread_id:%lu, thread_num:%u",
+                g_hos_handle.S3Client?"not null":"null", bucket, object, thread_id, hos_conf->thread_num);
         return HOS_PARAMETER_ERROR;
     }
-    Aws::S3::S3Client& S3Client = *handle->S3Client;
+
+    mode = data?1:0; // 1: buf mode; 0: file mode
 
     // Create and configure the asynchronous put object request. 
Aws::S3::Model::PutObjectRequest request; @@ -636,37 +716,30 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const request.SetKey(object); //设置上传数据类型 - if (file_type == 0) + if (mode == 0) { - if (stat(data, &buffer) == -1) - { - return HOS_FILE_NOT_EXITS; - } - - stream_len = buffer.st_size; //文件类型 const std::shared_ptr input_data = - Aws::MakeShared("SampleAllocationTag", object, std::ios_base::in | std::ios_base::binary); + Aws::MakeShared("hos_upload_file_tag", object, std::ios_base::in | std::ios_base::binary); request.SetBody(input_data); } else { //内存块 - stream_len = data_len; const std::shared_ptr input_data = - Aws::MakeShared(data); + Aws::MakeShared("hos_upload_buf_tag"); Aws::String stream (data, data_len); *input_data << stream; request.SetBody(input_data); } //field_stat2 record - if (handle->fs2_info[0].fs2_handle) + if (hos_func->fs2_info[0].fs2_handle) { - if (handle->fs2_info[0].reserved) + if (hos_func->fs2_info[0].reserved) { - data_info = (data_info_t *)handle->fs2_info[0].reserved; - data_info->rx_pkts[thread_id]++; - data_info->rx_bytes[thread_id] += stream_len; + data_info = (data_info_t *)hos_func->fs2_info[0].reserved; + atomic_add(&(data_info->rx_pkts), 1); + atomic_add(&(data_info->rx_bytes), data_len); } } @@ -677,49 +750,72 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const sprintf(buf, "%lu %lu", thread_id, fd); context->SetUUID(buf); - hos_info_t info = {fd, 0, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 }; - add_hos_info(&hash_hos_info[thread_id], &info); + hos_fd_context_t info = {fd, 0, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 }; + add_fd_context(&fd_context[thread_id], &info); - ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); - if (ret) + if (hos_conf->pool_thread_size > 0) + { + ret = hos_putobject_async(request, data_len, thread_id, fd, bucket, object); + } + else + { + ret = hos_putobject_sync(request, data_len, thread_id, fd, bucket, object); + } + + if (ret == true) { - //field_stat2 record - if (handle->fs2_info[0].fs2_handle) - { - if (handle->fs2_info[0].reserved) - { - data_info = (data_info_t *)handle->fs2_info[0].reserved; - data_info->tx_pkts[thread_id]++; - data_info->tx_bytes[thread_id] += stream_len; - } - } return HOS_CLIENT_OK; } - return HOS_SEND_FAILED; + else + { + return HOS_SEND_FAILED; + } } -int hos_upload_file(hos_client_handle handle, const char *bucket, const char *file_path, - put_finished_callback callback, void *userdata, size_t thread_id) +int hos_upload_file(const char *bucket, const char *file_path, put_finished_callback callback, void *userdata, size_t thread_id) { - return hos_upload_stream(handle, bucket, file_path, NULL, 0, callback, userdata, thread_id, 0); + struct stat buffer; + + if ((g_hos_handle.S3Client == NULL) || (bucket == NULL) || (file_path == NULL) || (thread_id > g_hos_handle.hos_config.thread_num)) + { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_file", + "s3client:%s, bucket:%s, file_path:%s, thread_id:%d, thread_num:%d", + g_hos_handle.S3Client?"not null":"null", bucket, file_path, thread_id, g_hos_handle.hos_config.thread_num); + return HOS_PARAMETER_ERROR; + } + + if (stat(file_path, &buffer) == -1) + { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_file", "The file:%s not exist", file_path); + return HOS_FILE_NOT_EXIST; + } + return hos_upload_stream(bucket, file_path, NULL, buffer.st_size, 
callback, userdata, thread_id); } -int hos_upload_buf(hos_client_handle handle, const char *bucket, const char *object, - const char *buf, size_t buf_len, put_finished_callback callback, void *userdata, size_t thread_id) +int hos_upload_buf(const char *bucket, const char *object, const char *buf, size_t buf_len, put_finished_callback callback, void *userdata, size_t thread_id) { - return hos_upload_stream(handle, bucket, object, buf, buf_len, callback, userdata, thread_id, 1); + if ((g_hos_handle.S3Client == NULL) || (bucket == NULL) || (object == NULL) + || (buf == NULL) || (buf_len == 0) + || (thread_id > g_hos_handle.hos_config.thread_num)) + { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_buf", + "s3client:%s, bucket:%s, object:%s, buf:%s, buf_len:%d, thread_id:%d, thread_num:%d", + g_hos_handle.S3Client?"not null":"null", bucket, object, + buf?"not null":"null", buf_len, thread_id, g_hos_handle.hos_config.thread_num); + return HOS_PARAMETER_ERROR; + } + return hos_upload_stream(bucket, object, buf, buf_len, callback, userdata, thread_id); } static void *hos_fd_manage(void *ptr) { - hos_info_t *hos_info; - hos_client_handle handle = (hos_client_handle)ptr; - size_t thread_sum = handle->thread_sum; + hos_fd_context_t *a_fd_context; + size_t thread_sum = g_hos_handle.hos_config.thread_num; size_t thread_num; size_t fd; while(1) { - if (handle->fd_thread_status) + if (g_hos_handle.hos_func.fd_thread_status) break; for (thread_num = 0; thread_num < thread_sum; thread_num++) { @@ -727,13 +823,25 @@ static void *hos_fd_manage(void *ptr) { if (!fd_info[thread_num][fd]) continue; - hos_info = find_info_by_fd(hash_hos_info[thread_num], fd); - if (!hos_info) + a_fd_context = find_context_by_fd(fd_context[thread_num], fd); + if (!a_fd_context) continue; - if (hos_info->fd_status == HOS_FD_INJECT) + if (a_fd_context->fd_status == HOS_FD_INJECT) { - if ((hos_info->position == hos_info->recive_cnt) || (hos_info->overtime <= get_current_ms())) + if (a_fd_context->position == a_fd_context->recive_cnt) + { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "[%s:%s] upload completed. 
[thread:%lu fd:%lu] delete",
+                            a_fd_context->bucket, a_fd_context->object, thread_num, fd);
                         hos_delete_fd(fd, thread_num);
+                    }
+                    else if (a_fd_context->overtime <= get_current_ms())
+                    {
+                        MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__,
+                            "[%s:%s] upload not completed, but the live-time of [thread_id:%lu fd:%lu] is over.",
+                            a_fd_context->bucket, a_fd_context->object, thread_num, fd);
+                        hos_delete_fd(fd, thread_num);
+                    }
                 }
             }
         }
@@ -742,230 +850,230 @@ static void *hos_fd_manage(void *ptr)
     pthread_exit(NULL);
 }
 
-int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode)
+int hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode)
 {
-    if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (thread_id > handle->thread_sum) || strlen(bucket) == 0 || strlen(object) == 0)
+    if ((g_hos_handle.S3Client == NULL) || (bucket == NULL) || (object == NULL) || (thread_id > g_hos_handle.hos_config.thread_num) || strlen(bucket) == 0 || strlen(object) == 0)
     {
+        MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_open_fd",
+                "parameter error:s3client:%s, bucket:%s, object:%s, thread_id:%lu",
+                g_hos_handle.S3Client?"not null":"null", bucket, object, thread_id);
         return HOS_PARAMETER_ERROR;
     }
     size_t fd = hash_get_min_free_fd(thread_id);
     if (fd == 0)
     {
+        MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_open_fd",
+                "error:fd not enough, thread_id:%lu, fd free: %lu, fd register:%lu, fd inject:%lu",
+                thread_id,
+                fd_info[thread_id][HOS_FD_FREE],
+                fd_info[thread_id][HOS_FD_REGISTER],
+                fd_info[thread_id][HOS_FD_INJECT]);
+        MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "thread_id:%lu, fd:%lu", thread_id, fd);
         return HOS_FD_NOT_ENOUGH;
     }
+    MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "thread_id:%lu, fd:%lu", thread_id, fd);
 
-    hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata,
-            NULL,/*cache*/ handle->cache_count, 0,/*position*/ 0,/*recive_cnt*/(long)handle->cache_size,/*cache_rest*/ HOS_FD_REGISTER,/*fd_status*/ 0,/*overtime*/ handle->timeout,};
-    add_hos_info(&hash_hos_info[thread_id], &info);
-#if 1
+    hos_fd_context_t info = {fd, mode, (char *)bucket, (char *)object, (void *)callback, userdata,
+            NULL,/*cache*/ g_hos_handle.hos_config.cache_count, 0,/*position*/ 0,/*recive_cnt*/
+            (long)g_hos_handle.hos_config.cache_size,/*cache_rest*/ HOS_FD_REGISTER,/*fd_status*/
+            0,/*overtime*/ g_hos_handle.hos_config.timeout,};
+    add_fd_context(&fd_context[thread_id], &info);
+
     {
         std::lock_guard locker(m_client_lock);
-        if (handle->fd_thread == 0)
+        if (g_hos_handle.hos_func.fd_thread == 0)
         {
-            handle->fd_thread_status = 0;
-            pthread_create(&handle->fd_thread, NULL, hos_fd_manage, handle);
+            g_hos_handle.hos_func.fd_thread_status = 0;
+            pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL);
         }
     }
-#endif
+
     return fd;
 }
 
 int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id)
 {
     struct stat buffer;
-    hos_info_t *hos_info = NULL;
-    hos_client_handle handle = NULL;
+    hos_fd_context_t *a_fd_context = NULL;
     char num[128];
-    char buf[128];
     int ret = 0;
     data_info_t *data_info = NULL;
+    hos_config_t *hos_conf = &g_hos_handle.hos_config;
+    hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
+    size_t upload_len = 0;
 
-    if ((fd == 0) || (stream == NULL) || (thread_id > g_hos_handle->thread_sum))
+    if ((fd 
< 3) || (stream == NULL) || (thread_id > hos_conf->thread_num)) { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, + "hos_write", "error: fd:%d, stream:%s, stream_len:%s, thread_id:%d.", + fd, stream?"not null":"null", stream_len, thread_id); return HOS_PARAMETER_ERROR; } if (fd_info[thread_id][fd]) { - hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); + a_fd_context = find_context_by_fd(fd_context[thread_id], fd); } - if (hos_info == NULL) + if (a_fd_context == NULL) { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "fd info not find. thread_id:%d, fd:%d", thread_id, fd); return HOS_HASH_NOT_FIND; } - - handle = (hos_client_handle)hos_info->handle; + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "Get fd_context, thread_id:%d, fd:%d", thread_id, fd); //field_stat2 record - if (handle->fs2_info[0].fs2_handle) + if (hos_func->fs2_info[0].fs2_handle) { - if (handle->fs2_info[0].reserved) + if (hos_func->fs2_info[0].reserved) { - data_info = (data_info_t *)handle->fs2_info[0].reserved; - data_info->rx_pkts[thread_id]++; - data_info->rx_bytes[thread_id] += stream_len; + data_info = (data_info_t *)hos_func->fs2_info[0].reserved; + atomic_add(&(data_info->rx_pkts), 1); + atomic_add(&(data_info->rx_bytes), stream_len); } } - Aws::S3::S3Client& S3Client = *(handle->S3Client); - // create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; //设置上传数据类型 - if (hos_info->mode & BUFF_MODE) + if (a_fd_context->mode & BUFF_MODE) { //BUFF_MODE - if (hos_info->mode & APPEND_MODE) + if (a_fd_context->mode & APPEND_MODE) { //APPEND_MODE - if (hos_info->cache == NULL) + if (a_fd_context->cache == NULL) { - hos_info->cache = Aws::MakeShared("append mode"); + a_fd_context->cache = Aws::MakeShared("hos_write append mode"); } - if (hos_info->cache_count == 0) + Aws::String buffer(stream, stream_len); + *a_fd_context->cache << buffer; + a_fd_context->cache_rest -= stream_len; + atomic_add(&(data_info->cache), stream_len); + if (a_fd_context->cache_count == 0 || --a_fd_context->cache_count) { - //不设置cache_count的情况下 - Aws::String buffer (stream, stream_len); - *hos_info->cache << buffer; - hos_info->cache_rest -= stream_len; - hos_cache[thread_id] += stream_len; - if (hos_info->cache_rest > 0) + //cache_count == 0,不设置cache_count的情况 + //cache_count > 0,设置cache_count的情况 + if (a_fd_context->cache_rest > 0) { return HOS_CLIENT_OK; } - }else - { - // cache - Aws::String buffer (stream, stream_len); - *hos_info->cache << buffer; - hos_info->cache_rest -= stream_len; - hos_cache[thread_id] += stream_len; - //设置cache times的情况下 - if (--hos_info->cache_count) - { - if (hos_info->cache_rest > 0) - { - return HOS_CLIENT_OK; - } - } } - request.SetBody(hos_info->cache); + request.SetBody(a_fd_context->cache); // add headers - snprintf(num, 128, "%lu", ++hos_info->position); + atomic_add(&(a_fd_context->position), 1); + snprintf(num, 128, "%lu", atomic_read(&(a_fd_context->position))); Aws::Map headers; - if (hos_info->mode & APPEND_MODE) - { - headers["x-hos-upload-type"] = "append"; - headers["x-hos-position"] = num; - request.SetMetadata(headers); - } - }else + headers["x-hos-upload-type"] = "append"; + headers["x-hos-position"] = num; + request.SetMetadata(headers); + + upload_len = a_fd_context->cache->gcount(); + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "x-hos-posotion:%s", num); + } + else { const std::shared_ptr input_data = - Aws::MakeShared("buffer mode"); + Aws::MakeShared("hos_write 
buffer mode"); Aws::String buffer (stream, stream_len); *input_data << buffer; request.SetBody(input_data); + upload_len = stream_len; } } else { if (stat(stream, &buffer) == -1) { - return HOS_FILE_NOT_EXITS; + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "The file:%s not exist", stream); + return HOS_FILE_NOT_EXIST; } //文件类型 const std::shared_ptr input_data = - Aws::MakeShared("SampleAllocationTag", hos_info->object, std::ios_base::in | std::ios_base::binary); + Aws::MakeShared("hos_write file mode", a_fd_context->object, std::ios_base::in | std::ios_base::binary); request.SetBody(input_data); + upload_len = buffer.st_size; } - request.SetBucket(hos_info->bucket); - request.SetKey(hos_info->object); + request.SetBucket(a_fd_context->bucket); + request.SetKey(a_fd_context->object); - //设置回调函数 - std::shared_ptr context = - Aws::MakeShared(""); - sprintf(buf, "%lu %lu", thread_id, fd); - context->SetUUID(buf); + if (hos_conf->pool_thread_size > 0) + { + ret = hos_putobject_async(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object); + } + else + { + ret = hos_putobject_sync(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object); + } - ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); - - hos_cache[thread_id] = 0; //恢复fd 的cache设置 - if (hos_info->mode & APPEND_MODE) + if (a_fd_context->mode & APPEND_MODE) { - hos_info->cache.reset(); - hos_info->cache = NULL; - hos_info->cache_rest = hos_info->handle->cache_size; - hos_info->cache_count = hos_info->handle->cache_count; + atomic_sub(&(data_info->cache), a_fd_context->cache->gcount()); + a_fd_context->cache.reset(); + a_fd_context->cache = NULL; + a_fd_context->cache_rest = hos_conf->cache_size; + a_fd_context->cache_count = hos_conf->cache_count; } - if (ret) + + if (ret == true) { - if (data_info) - { - data_info->tx_pkts[thread_id]++; - if (hos_info->mode & BUFF_MODE) - { - if (hos_info->mode & APPEND_MODE) - { - data_info->tx_bytes[thread_id] += handle->cache_size; - }else - { - data_info->tx_bytes[thread_id] += stream_len; - } - }else - { - data_info->tx_bytes[thread_id] += buffer.st_size; - } - } - }else + return HOS_CLIENT_OK; + } + else { return HOS_SEND_FAILED; } - - return HOS_CLIENT_OK; } int hos_close_fd(size_t fd, size_t thread_id) { - hos_info_t *hos_info = NULL; + hos_fd_context_t *a_fd_context = NULL; char num[128]; char buf[128]; + int ret = 0; data_info_t *data_info = NULL; + hos_config_t *hos_conf = &g_hos_handle.hos_config; + hos_func_thread_t *hos_func = &g_hos_handle.hos_func; - if (fd < 3) + if (fd < 3 || thread_id > hos_conf->thread_num) { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, "hos_close_fd", + "error:fd:%d, thread_id:%d, thread_sum:%d.", + fd, thread_id, hos_conf->thread_num); return HOS_PARAMETER_ERROR; } if (fd_info[thread_id][fd]) { - hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); + a_fd_context = find_context_by_fd(fd_context[thread_id], fd); } - if (hos_info == NULL) + if (a_fd_context == NULL) { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, + "hos_close_fd", "not find the a_fd_context of [fd:%d thread:%d]", + fd, thread_id); return HOS_CLIENT_OK; } //close fd 之前发送append的缓存中内容 - if ((hos_info->mode & BUFF_MODE) && (hos_info->mode & APPEND_MODE)) + if ((a_fd_context->mode & BUFF_MODE) && (a_fd_context->mode & APPEND_MODE)) { - if (hos_info->cache_rest != (long)hos_info->handle->cache_size) + if (a_fd_context->cache_rest != (long)hos_conf->cache_size && a_fd_context->cache 
!= NULL) { - //handle = (hos_client_handle)hos_info->handle; - Aws::S3::S3Client& S3Client = *(hos_info->handle->S3Client); + Aws::S3::S3Client& S3Client = *(g_hos_handle.S3Client); // Create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; - request.SetBucket(hos_info->bucket); - request.SetKey(hos_info->object); - request.SetBody(hos_info->cache); + request.SetBucket(a_fd_context->bucket); + request.SetKey(a_fd_context->object); + request.SetBody(a_fd_context->cache); // add headers - snprintf(num, 128, "%lu", ++hos_info->position); + atomic_add(&(a_fd_context->position), 1); + snprintf(num, 128, "%lu", atomic_read(&(a_fd_context->position))); Aws::Map headers; headers["x-hos-upload-type"] = "append"; headers["x-hos-position"] = num; @@ -976,22 +1084,46 @@ int hos_close_fd(size_t fd, size_t thread_id) sprintf(buf, "%lu %lu", thread_id, fd); context->SetUUID(buf); - S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); - - if (hos_info->handle->fs2_info[0].fs2_handle) + ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); + if (ret) { - if (hos_info->handle->fs2_info[0].reserved) - data_info = (data_info_t *)hos_info->handle->fs2_info[0].reserved; + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "PutObjectAsync success."); + if (hos_func->fs2_info[0].fs2_handle) + { + if (hos_func->fs2_info[0].reserved) + data_info = (data_info_t *)hos_func->fs2_info[0].reserved; - data_info->tx_pkts[thread_id]++; - data_info->tx_bytes[thread_id] += hos_info->handle->cache_size - hos_info->cache_rest; + if (data_info) + { + atomic_add(&(data_info->tx_pkts), 1); + atomic_add(&(data_info->tx_bytes), a_fd_context->cache->gcount()); + } + } } - hos_cache[thread_id] = 0; + else + { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "PutObjectAsync failed"); + if (hos_func->fs2_info[0].fs2_handle) + { + if (hos_func->fs2_info[0].reserved) + data_info = (data_info_t *)hos_func->fs2_info[0].reserved; + + if (data_info) + { + atomic_add(&(data_info->tx_failed_pkts), 1); + atomic_add(&(data_info->tx_failed_bytes), a_fd_context->cache->gcount()); + } + } + } + atomic_sub(&(data_info->cache), a_fd_context->cache->gcount()); } } - hos_info->fd_status = HOS_FD_INJECT; - hos_info->cache.reset(); - hos_info->overtime = get_current_ms() + hos_info->timeout; + a_fd_context->fd_status = HOS_FD_INJECT; + a_fd_context->cache.reset(); + a_fd_context->cache = NULL; + a_fd_context->overtime = get_current_ms() + a_fd_context->timeout; + a_fd_context->cache_rest = hos_conf->cache_size; + a_fd_context->cache_count = hos_conf->cache_count; fd_info[thread_id][HOS_FD_REGISTER]--; fd_info[thread_id][HOS_FD_INJECT]++; @@ -999,40 +1131,46 @@ int hos_close_fd(size_t fd, size_t thread_id) return HOS_CLIENT_OK; } -int hos_client_destory(hos_client_handle handle) +int hos_shutdown_instance() { std::lock_guard locker(m_client_lock); size_t i = 0; - if (handle == NULL) - { - return HOS_PARAMETER_ERROR; - } + hos_config_t *hos_conf = &g_hos_handle.hos_config; + hos_func_thread_t *hos_func = &g_hos_handle.hos_func; - if (--handle->count) + if (g_hos_handle.S3Client == NULL) { + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, "hos_shutdown_instance", "There is no hos client."); return HOS_CLIENT_OK; } - Aws::Vector().swap(handle->buckets); - - if (handle->fd_thread) + if (g_hos_handle.count > 0 && --g_hos_handle.count) { - handle->fd_thread_status = 1; - pthread_join(handle->fd_thread, NULL); + 
MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, "hos_shutdown_instance", "hos client count:%d.", g_hos_handle.count); + return HOS_CLIENT_OK; } - if (handle->fs2_thread) + + Aws::Vector().swap(g_hos_handle.buckets); + + if (hos_func->fd_thread) { - handle->fs2_status = HOS_FS2_STOP; - pthread_join(handle->fs2_thread, NULL); + hos_func->fd_thread_status = 1; + pthread_join(hos_func->fd_thread, NULL); + } + if (hos_func->fs2_thread) + { + hos_func->fs2_status = HOS_FS2_STOP; + pthread_join(hos_func->fs2_thread, NULL); for (i = 0; i < FS2_RECORD_EVENTS; i++) { - screen_stat_handle_t *fs2_handle = &handle->fs2_info[i].fs2_handle; + screen_stat_handle_t *fs2_handle = &hos_func->fs2_info[i].fs2_handle; FS_stop(fs2_handle); - if (handle->fs2_info[i].reserved) + if (hos_func->fs2_info[i].reserved) { + #if 0 if (i == 0) { - data_info_t * data_info = (data_info_t *)handle->fs2_info[i].reserved; + data_info_t * data_info = (data_info_t *)hos_func->fs2_info[i].reserved; if (data_info->rx_pkts) free(data_info->rx_pkts); if (data_info->rx_bytes) @@ -1050,40 +1188,35 @@ int hos_client_destory(hos_client_handle handle) if (data_info->tx_bytes_last) free(data_info->tx_bytes_last); } - free(handle->fs2_info[i].reserved); + #endif + free(hos_func->fs2_info[i].reserved); } - if (handle->fs2_info[i].line_ids) - free(handle->fs2_info[i].line_ids); - if (handle->fs2_info[i].column_ids) - free(handle->fs2_info[i].column_ids); + if (hos_func->fs2_info[i].line_ids) + free(hos_func->fs2_info[i].line_ids); + if (hos_func->fs2_info[i].column_ids) + free(hos_func->fs2_info[i].column_ids); } } - delete handle->S3Client; - - if (hos_cache) - { - free(hos_cache); - } + delete g_hos_handle.S3Client; + g_hos_handle.S3Client = NULL; + MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "delete s3client."); if (fd_info) { free(fd_info); } - for (i = 0; i < handle->thread_sum; i++) + for (i = 0; i < hos_conf->thread_num; i++) { - delete_all(&hash_hos_info[i]); + delete_all(&fd_context[i]); } - if (hash_hos_info) + if (fd_context) { - free(hash_hos_info); + free(fd_context); } - free(handle); - g_hos_handle = NULL; - Aws::ShutdownAPI(g_options); return HOS_CLIENT_OK; diff --git a/src/hos_client.h b/src/hos_client.h index 710ddf4d..657a51c2 100644 --- a/src/hos_client.h +++ b/src/hos_client.h @@ -1,13 +1,18 @@ /************************************************************************* - > File Name: hos_client_api.h + > File Name: hos_client.h > Author: pxz > Created Time: Thu 10 Sep 2020 03:13:59 PM CST ************************************************************************/ #ifndef __HOS_CLIENT_INIT__ #define __HOS_CLIENT_INIT__ -/*hos client 句柄*/ -typedef struct hos_client_handle_s* hos_client_handle; +/*hos instance */ +typedef struct hos_instance_s{ + bool result; + int error_code; + char error_message[1024]; + const char *hos_url_prefix; +}* hos_instance; #define HOS_CLIENT_OK 0 @@ -20,10 +25,13 @@ typedef struct hos_client_handle_s* hos_client_handle; enum hoserrors { HOS_PARAMETER_ERROR = -1, - HOS_FILE_NOT_EXITS = -2, + HOS_FILE_NOT_EXIST = -2, HOS_HASH_NOT_FIND = -3, HOS_FD_NOT_ENOUGH = -4, HOS_SEND_FAILED = -5, + HOS_RUNTIME_LOG_FAILED = -6, + HOS_CONF_ERROR = -7, + HOS_BUCKET_NOT_EXIST = -8, }; @@ -75,61 +83,42 @@ enum s3errors typedef void (*put_finished_callback)(bool, const char *, const char *, const char *, void *); -/************************************************************************************* - * 函数名: hos_client_handle +/*//FIXME 改为static,不再对外提供 
+*************************************************************************************
+ * 函数名: hos_instance
  * 参数: const char *serverip 目的地址,如"192.168.44.12"
  *       size_t port 端口号
- *       const char *accesskeyid AWS access key ID,如”default“
- *       const char *secretkey AWS secret key,如”default“
+ *       const char *accesskeyid AWS access key ID,如"default"
+ *       const char *secretkey AWS secret key,如"default"
+ *       size_t pool_size 线程池大小
  *       size_t thread_sum 线程总数
- * 返回值: 成功返回一个非空句柄,失败返回NULL。
-*************************************************************************************/
-hos_client_handle hos_client_create(const char *serverip, size_t port, const char *accesskeyid, const char *secretkey, size_t pool_size);
+ * 返回值: 成功返回一个实例,失败返回NULL。
+*************************************************************************
+hos_instance hos_client_create(const char *serverip, size_t port, const char *accesskeyid, const char *secretkey, size_t pool_size);
+*/
+
 /*************************************************************************************
- * 函数名: hos_get_error_msg
- * 返回值: hos_client_create创建失败的原因
+ * 函数名: hos_init_instance
+ * 输入参数: conf_path 配置文件路径
+ *           thread_num 线程数
+ * 返回值: hos 实例创建结果
 *************************************************************************************/
-char *hos_get_error_msg();
-/*************************************************************************************
- * 函数名: hos_get_error_num
- * 返回值: hos_client_create创建失败的错误码
-*************************************************************************************/
-size_t hos_get_error_num();
+hos_instance hos_init_instance(const char *conf_path, const char *module, size_t thread_num, const char *bucket);
 /*************************************************************************************
  * 函数名: hos_create_bucket
- * 参数: hos_client_handle handle 非空句柄
- *       const char * bucket 桶名称
- * 返回值: int 成功返回0,S3错误返回s3errors错误码,hos client错误返回hoserrors错误码
+ * 参数: const char *bucket 桶名称
+ * 返回值: bool 成功返回true,失败返回false
 *************************************************************************************/
-bool hos_verify_bucket(hos_client_handle handle, const char *bucket);
+//bool hos_verify_bucket(const char *bucket);
 /*************************************************************************************
  * 函数名: hos_create_bucket
- * 参数: hos_client_handle handle 非空句柄
- *       const char * bucket 桶名称
- * 返回值: int 成功返回0,S3错误返回s3errors错误码,hos client错误返回hoserrors错误码
+ * 参数: const char * bucket 桶名称
+ * 返回值: int 成功返回0,S3错误返回s3errors错误码,hos client错误返回hoserrors错误码
 *************************************************************************************/
-int hos_create_bucket(hos_client_handle handle, const char *bucket);
+int hos_create_bucket(hos_instance instance, const char *bucket);
 /*************************************************************************************
- * 函数名: hos_set_cache_size
- * 参数: hos_client_handle handle 非空句柄
- *       size_t cache_size append 模式每次追加的buffer大小
-*************************************************************************************/
-void hos_set_cache_size(hos_client_handle handle, size_t cache_size);
-/*************************************************************************************
- * 函数名: hos_set_cache_count
- * 参数: hos_client_handle handle 非空句柄
- *       size_t cache_count append 模式追加次数
-*************************************************************************************/
-void hos_set_cache_count(hos_client_handle handle, size_t cache_count);
-/*************************************************************************************
- * 函数名: hos_set_thread_sum
- * 参数: hos_client_handle handle 非空句柄
- *       size_t thread_sum append 模式追加次数
-*************************************************************************************/
-void hos_set_thread_sum(hos_client_handle handle, size_t thread_sum);
-/*************************************************************************************
- * 函数名: hos_upload_async
- * 参数: hos_client_handle handle 非空句柄
+ * 函数名: hos_upload_file
+ * 参数: hos_instance instance 非空句柄
  *       const char * bucket 桶名称
  *       const char * file_path 上传对象路径
  *       put_finished_callback callback upload操作结束时调用的回调函数
@@ -137,10 +126,10 @@ void hos_set_thread_sum(hos_client_handle handle, size_t thread_sum);
  *       size_t thread_id 当前线程id
  * 返回值 int 成功返回0,失败返回hoserros错误码
 *************************************************************************************/
-int hos_upload_file(hos_client_handle handle, const char *bucket, const char *file_path, put_finished_callback callback, void* userdata, size_t thread_id);
+int hos_upload_file(hos_instance instance, const char *bucket, const char *file_path, put_finished_callback callback, void* userdata, size_t thread_id);
 /*************************************************************************************
- * 函数名: hos_upload_async
- * 参数: hos_client_handle handle 非空句柄
+ * 函数名: hos_upload_buf
+ * 参数: hos_instance instance 非空句柄
  *       const char * bucket 桶名称
  *       const char * object 上传对象名称
  *       const char *buf 上传的buf
@@ -150,38 +139,28 @@ int hos_upload_file(hos_client_handle handle, const char *bucket, const char *fi
  *       size_t thread_id 当前线程id
  * 返回值 int 成功返回0,失败返回hoserros错误码
 *************************************************************************************/
-int hos_upload_buf(hos_client_handle handle, const char *bucket, const char *object, const char *buf, size_t buf_len, put_finished_callback callback, void *userdata, size_t thread_id);
+int hos_upload_buf(hos_instance instance, const char *bucket, const char *object, const char *buf, size_t buf_len, put_finished_callback callback, void *userdata, size_t thread_id);
 /*************************************************************************************
  * 函数名: hos_open_fd
- * 参数: hos_client_handle handle 非空句柄
- *       const char * bucket 桶名称
+ * 参数: const char * bucket 桶名称
  *       const char * object 上传对象名称
  *       put_finished_callback callback upload操作结束时调用的回调函数
- *       void *data 用户自定义数据
+ *       void *data 用户回调函数自定义数据
  *       size_t thread_id 线程ID
  *       int mode 模式 (FILE OR BUFFER, APPEND OR NOT)
- * 返回值 int 成功返回0,失败返回hoserros错误码
+ * 返回值 int 成功返回fd(fd >=3),失败返回hoserros错误码
 *************************************************************************************/
-int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode);
+int hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode);
 /*************************************************************************************
- * 函数名: hos_upload_stream_async
- * 参数: hos_client_handle handle 非空句柄
+ * 函数名: hos_write
+ * 参数: size_t fd hos_open_fd返回的fd
  *       const char * stream 待上传的数据
  *       size_t stream 待上传的数据长度
  *       size_t thread_id 线程ID
  *       size_t position append模式下的每段内容编号
- * 返回值 int 成功返回0,失败返回hoserros错误码
+ * 返回值 int 成功返回0,失败返回hoserrors错误码
 *************************************************************************************/
 int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id);
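/* Usage sketch: a minimal caller of the fd-based interface declared above.
 * Assumptions: hos_init_instance() has already succeeded in this process,
 * thread_id 0 is a valid worker thread, "my-bucket"/"my-object" are placeholder
 * names, and BUFF_MODE/APPEND_MODE are the mode flags tested in hos_client.cpp. */
#include <stdio.h>
#include "hos_client.h"

static void on_put_finished(bool ok, const char *bucket, const char *object,
                            const char *error, void *userdata)
{
    /* Signature matches put_finished_callback. */
    (void)userdata;
    if (!ok)
        fprintf(stderr, "upload %s/%s failed: %s\n", bucket, object, error ? error : "unknown");
}

static int upload_one_buffer(const char *data, size_t len, size_t thread_id)
{
    /* Open a logical fd bound to bucket/object, stream the buffer, then close.
     * hos_close_fd() flushes whatever is still sitting in the append cache. */
    int fd = hos_open_fd("my-bucket", "my-object", on_put_finished, NULL,
                         thread_id, BUFF_MODE | APPEND_MODE);
    if (fd < 3)
        return fd; /* negative hoserrors code */

    int ret = hos_write((size_t)fd, data, len, thread_id);
    if (ret != HOS_CLIENT_OK)
        return ret;

    return hos_close_fd((size_t)fd, thread_id);
}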
-/*************************************************************************************
- * 函数名: hos_expand_fs2
- * 参数: hos_client_handle handle 非空句柄
- *       const char * path log 路径
- *       int format 0:default; 1: Json
- *       char *server_ip 服务IP地址
- *       int port 服务端口
-*************************************************************************************/
-void hos_expand_fs2(hos_client_handle handle, const char * path, int format, char *server_ip, int port);
 /*************************************************************************************
  * 函数名: hos_close_fd
  * 参数: size_t fd fd
@@ -190,9 +169,8 @@ void hos_expand_fs2(hos_client_handle handle, const char * path, int format, cha
 *************************************************************************************/
 int hos_close_fd(size_t fd, size_t thread_id);
 /*************************************************************************************
- * 函数名: hos_client_destory
- * 参数: hos_client_handle handle 非空句柄
+ * 函数名: hos_shutdown_instance
  * 返回值 int 成功返回0,失败返回hoserros错误码
 *************************************************************************************/
-int hos_client_destory(hos_client_handle handle);
+int hos_shutdown_instance();
 #endif
diff --git a/src/hos_hash.cpp b/src/hos_hash.cpp
index a6c6d51d..fff9b474 100644
--- a/src/hos_hash.cpp
+++ b/src/hos_hash.cpp
@@ -5,14 +5,14 @@
 ************************************************************************/

 #include "hos_hash.h"
-void add_hos_info(hos_info_t **handle, hos_info_t *input)
+void add_fd_context(hos_fd_context_t **handle, hos_fd_context_t *input)
 {
-    hos_info_t *value = NULL;
+    hos_fd_context_t *value = NULL;
     HASH_FIND_INT(*handle, (int *)&input->fd, value);
     if (value == NULL)
     {
-        value = (hos_info_t *)malloc(sizeof(hos_info_t));
-        memcpy(value, input, sizeof(hos_info_t));
+        value = (hos_fd_context_t *)malloc(sizeof(hos_fd_context_t));
+        memcpy(value, input, sizeof(hos_fd_context_t));
         value->object = (char *)calloc(1, strlen(input->object) + 1);
         value->bucket = (char *)calloc(1, strlen(input->bucket) + 1);
         memcpy(value->bucket, input->bucket, strlen(input->bucket));
@@ -22,7 +22,6 @@ void add_hos_info(hos_info_t **handle, hos_info_t *input)
     else
     {
         value->mode = input->mode;
-        value->handle = input->handle;
         if (value->object != NULL)
         {
             free(value->object);
@@ -50,16 +49,16 @@ void add_hos_info(hos_info_t **handle, hos_info_t *input)
     }
 }

-hos_info_t *find_info_by_fd(hos_info_t *handle, size_t fd)
+hos_fd_context_t *find_context_by_fd(hos_fd_context_t *handle, size_t fd)
 {
-    hos_info_t *value = NULL;
+    hos_fd_context_t *value = NULL;
     HASH_FIND_INT(handle, &fd, value);
     return value;
 }

-void delete_info_by_fd(hos_info_t **handle, size_t fd)
+void delete_context_by_fd(hos_fd_context_t **handle, size_t fd)
 {
-    hos_info_t *value = NULL;
+    hos_fd_context_t *value = NULL;
     HASH_FIND_INT(*handle, &fd, value);

     if (value)
@@ -79,9 +78,9 @@ void delete_info_by_fd(hos_info_t **handle, size_t fd)
     }
 }

-void delete_all(hos_info_t **handle)
+void delete_all(hos_fd_context_t **handle)
 {
-    hos_info_t *current, *tmp;
+    hos_fd_context_t *current, *tmp;
     HASH_ITER(hh, *handle, current, tmp)
     {
         if (current->bucket)
diff --git a/src/hos_hash.h b/src/hos_hash.h
index 3936b3ec..1b196df3 100644
--- a/src/hos_hash.h
+++ b/src/hos_hash.h
@@ -7,14 +7,14 @@
 #define __HOS_HASH_H__

 #include
-#include "hos_client.h"
+//#include "hos_client.h"
 #include "uthash.h"

-typedef struct hos_info_s
+typedef struct hos_fd_context_s
 {
     size_t fd;
     int mode;
-    hos_client_handle handle;
+    //hos_client_handle handle;
     char *bucket;
     char *object;
     void *callback;
@@ -28,13 +28,13 @@ typedef struct hos_info_s
     #define HOS_FD_FREE 0
     #define HOS_FD_REGISTER 1
     #define HOS_FD_INJECT 2
-    size_t overtime; //计算后超时的时间
+    size_t overtime; //计算后的时间点,超过即inject fd
     size_t timeout;  //配置的超时时间,从status变成INJECT开始计时
     UT_hash_handle hh;
-}hos_info_t;
+}hos_fd_context_t;

-void add_hos_info(hos_info_t **handle, hos_info_t *input);
-hos_info_t *find_info_by_fd(hos_info_t *handle, size_t fd);
-void delete_info_by_fd(hos_info_t **handle, size_t fd);
-void delete_all(hos_info_t **handle);
+void add_fd_context(hos_fd_context_t **handle, hos_fd_context_t *input);
+hos_fd_context_t *find_context_by_fd(hos_fd_context_t *handle, size_t fd);
+void delete_context_by_fd(hos_fd_context_t **handle, size_t fd);
+void delete_all(hos_fd_context_t **handle);
 #endif
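A short end-to-end sketch of the reworked public API in hos_client.h, assuming a valid configuration file at an illustrative path ("./conf/hos_client.conf"), module name "demo", one worker thread, and an existing bucket; every name and path here is a placeholder, and in real use the upload callback should be allowed to complete before shutdown:

// Illustrative lifecycle of the instance API (placeholder names and paths).
#include <stdio.h>
#include <string.h>
#include "hos_client.h"

static void demo_cb(bool ok, const char *bucket, const char *object,
                    const char *error, void *userdata)
{
    (void)userdata;
    printf("put %s/%s -> %s\n", bucket, object, ok ? "ok" : (error ? error : "failed"));
}

int main(void)
{
    // One instance per process; the returned struct reports init success/failure.
    hos_instance ins = hos_init_instance("./conf/hos_client.conf", "demo", 1, "my-bucket");
    if (ins == NULL || !ins->result)
    {
        fprintf(stderr, "hos_init_instance failed (%d): %s\n",
                ins ? ins->error_code : -1,
                ins ? ins->error_message : "no instance returned");
        return -1;
    }

    const char *payload = "hello hos";
    int ret = hos_upload_buf(ins, "my-bucket", "demo-object", payload,
                             strlen(payload), demo_cb, NULL, 0 /* thread_id */);
    if (ret != HOS_CLIENT_OK)
        fprintf(stderr, "hos_upload_buf returned %d\n", ret);

    // The upload is asynchronous: wait for demo_cb before tearing down in real code.
    return hos_shutdown_instance();
}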
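For reviewers of the append path in hos_write()/hos_close_fd(), the sketch below restates the request shape with the same AWS SDK for C++ types: each flushed cache chunk is a PutObject tagged with upload-type "append" and a monotonically increasing position (presumably reassembled server-side from that sequence). make_append_chunk is a hypothetical helper; bucket, key, and position are placeholders.

// Sketch of the append-mode metadata contract mirrored from hos_write().
#include <string>
#include <aws/core/utils/memory/AWSMemory.h>
#include <aws/core/utils/memory/stl/AWSMap.h>
#include <aws/core/utils/memory/stl/AWSStringStream.h>
#include <aws/s3/model/PutObjectRequest.h>

static Aws::S3::Model::PutObjectRequest make_append_chunk(const Aws::String &bucket,
                                                          const Aws::String &key,
                                                          const Aws::String &chunk,
                                                          size_t position)
{
    Aws::S3::Model::PutObjectRequest request;
    request.SetBucket(bucket);
    request.SetKey(key);

    // Body carries the buffered cache content for this chunk.
    auto body = Aws::MakeShared<Aws::StringStream>("append chunk");
    *body << chunk;
    request.SetBody(body);

    // x-hos-upload-type / x-hos-position are the metadata keys used by the patch.
    Aws::Map<Aws::String, Aws::String> headers;
    headers["x-hos-upload-type"] = "append";
    headers["x-hos-position"] = std::to_string(position).c_str();
    request.SetMetadata(headers);
    return request;
}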