diff --git a/example/demo/hos_write_demo.cpp b/example/demo/hos_write_demo.cpp index 6cdc1aff..95402a17 100644 --- a/example/demo/hos_write_demo.cpp +++ b/example/demo/hos_write_demo.cpp @@ -96,14 +96,14 @@ int main(int argc, char *argv[]) printf("hos_init_instance start ...\n"); hos_instance = hos_get_instance(); - if (hos_instance->result == false) + if (hos_instance == NULL) { hos_instance = hos_init_instance(conf_file, module_name, 1, bucket); } - if (hos_instance->result == false) + if (hos_instance == NULL) { printf("error:hos_init_instance\n"); - printf("error:%s", hos_instance->error_message); + printf("error:[%d]%s", hos_get_init_instance_errorcode, hos_get_init_instance_errormsg); return -1; } printf("hos_init_instance success ... \n"); diff --git a/example/performance/HosClientPerformance.cpp b/example/performance/HosClientPerformance.cpp index 244c16c0..fbc95790 100644 --- a/example/performance/HosClientPerformance.cpp +++ b/example/performance/HosClientPerformance.cpp @@ -15,7 +15,6 @@ extern "C" #include #include #include -#include } #include"../../src/hos_client.h" #include "MESA_handle_logger.h" @@ -474,7 +473,7 @@ int main(int argc, char *argv[]) hos_instance hos_instance = hos_init_instance(conf_path, module, thread_sum, bucket); if (hos_instance == NULL) { - printf("error:hos_client_handle\n %s\n", hos_instance->error_message); + printf("error:hos_client_handle\n error:[%d]%s\n", hos_get_init_instance_errorcode(), hos_get_init_instance_errormsg()); fclose(log); return -1; } diff --git a/src/hos_client.cpp b/src/hos_client.cpp index cfb9bc4d..36056d3d 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -49,13 +49,8 @@ static std::mutex m_client_lock; static std::mutex m_instance_lock; static std::mutex m_delete_lock; static Aws::SDKOptions g_options; - -static inline size_t get_current_ms() -{ - struct timespec timenow; - clock_gettime(CLOCK_MONOTONIC, &timenow); - return (timenow.tv_sec * 1000 + timenow.tv_nsec / 1000 / 1000 ); -} +Aws::Auth::AWSCredentials g_credentials; +Aws::Client::ClientConfiguration *g_client_config; static int hos_delete_fd(size_t fd, size_t thread_id) { @@ -177,112 +172,47 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, atomic_sub(&g_hos_handle.task_context[thread_id], stream_len); } -static void hos_client_create() +static int hos_attempt_connection() { - hos_config_t *hos_conf = &g_hos_handle.hos_config; - void *log = g_hos_handle.log; - - if (g_hos_handle.S3Client != NULL) - { - g_hos_handle.count++; - g_hos_instance.result = true; - return ; - } - - Aws::InitAPI(g_options); - Aws::Client::ClientConfiguration config; - Aws::Auth::AWSCredentials credentials(hos_conf->accesskeyid, hos_conf->secretkey); - - //初始化 - char endpoint[128]; - snprintf(endpoint, 128, "http://%s:%d/hos/", hos_conf->ip, hos_conf->port); - config.endpointOverride = endpoint; - config.verifySSL = false; - config.enableEndpointDiscovery = true; - if (hos_conf->pool_thread_size > 0) - { - //异步模式 - //config.executor = std::shared_ptr(std::make_shared(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY)); //支持线程池 - config.executor = std::shared_ptr(std::make_shared(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::QUEUE_TASKS_EVENLY_ACCROSS_THREADS)); //支持线程池 - } - else - { - //同步模式 - } - - #ifndef HOS_MOCK - g_hos_handle.S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); - #else - g_hos_handle.S3Client = new Aws::S3::S3ClientMock(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); - #endif /* 获取当前用户的所有的buckets */ Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle.S3Client->ListBuckets(); if (!outcome.IsSuccess()) { - delete g_hos_handle.S3Client; - g_hos_handle.S3Client = NULL; - Aws::ShutdownAPI(g_options); g_hos_instance.error_code = (size_t)outcome.GetError().GetErrorType() + 1; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, outcome.GetError().GetMessage().c_str()); - g_hos_instance.result = false; - MESA_HANDLE_RUNTIME_LOG(log, RLOG_LV_FATAL, "hos_client_create", "error: %s", g_hos_instance.error_message); - return; - } + atomic_set(&g_hos_instance.status, INSTANCE_ATTEMPT_STATE); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "[%s] ErrorCode:%d, Error: %s", + g_client_config->endpointOverride.c_str(), g_hos_instance.error_code, g_hos_instance.error_message); + if (g_hos_instance.error_code == NETWORK_CONNECTION) + { + atomic_set(&g_hos_instance.status, INSTANCE_ATTEMPT_STATE); + } + else + { + atomic_set(&g_hos_handle.count, 0);//立即shutdown + hos_shutdown_instance(); + g_hos_instance.status = INSTANCE_UNINIT_STATE; + g_hos_instance.error_code = (size_t)outcome.GetError().GetErrorType() + 1; + snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, outcome.GetError().GetMessage().c_str()); + } + return g_hos_instance.error_code; + } g_hos_handle.buckets = outcome.GetResult().GetBuckets(); - g_hos_handle.count++; - g_hos_handle.task_num = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - g_hos_handle.task_context = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - - #if 0 - if (g_hos_handle.hos_func.fd_thread == 0) - { - g_hos_handle.hos_func.fd_thread_status = 0; - pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL); - } - #endif - - MESA_HANDLE_RUNTIME_LOG(log, RLOG_LV_DEBUG, "hos_client_create", "debug: hos s3client create success, url:%s.",endpoint); - g_hos_instance.result = true; + atomic_set(&g_hos_instance.status, INSTANCE_ENABLE_STATE); + return HOS_CLIENT_OK; } -bool hos_verify_bucket(const char *bucket) +//检测hos 是否可用 +static void *hos_attempt_connection_exhaustively(void *ptr) { - if (bucket == NULL) + while(atomic_read(&g_hos_handle.hos_func.hos_client_retry_thread_status) == 0) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: bucket is null"); - return false; - } - if (g_hos_instance.result != true || g_hos_handle.S3Client == NULL) - { - return false; + hos_attempt_connection(); + sleep(g_hos_handle.hos_config.reconnection_time); } - Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle.S3Client->ListBuckets(); - - if (outcome.IsSuccess()) - { - g_hos_handle.buckets = outcome.GetResult().GetBuckets(); - - for (Aws::S3::Model::Bucket& new_bucket : g_hos_handle.buckets) - { - if (strcmp(new_bucket.GetName().c_str(), bucket) == 0) - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","debug: bucket:%s exits", bucket); - return true; - } - else - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","debug: Get bucket list:%s", new_bucket.GetName().c_str()); - } - } - } - else - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_verify_bucket","error:%s", outcome.GetError().GetMessage().c_str()); - } - return false; + pthread_exit(NULL); } static void *fs2_statistics(void *ptr) @@ -474,6 +404,94 @@ static void hos_expand_fs2() return ; } +static void hos_client_create() +{ + hos_config_t *hos_conf = &g_hos_handle.hos_config; + + Aws::InitAPI(g_options); + g_client_config = new Aws::Client::ClientConfiguration(); + g_credentials.SetAWSAccessKeyId(hos_conf->accesskeyid); + g_credentials.SetAWSSecretKey(hos_conf->secretkey); + + //初始化 + char endpoint[128]; + snprintf(endpoint, 128, "http://%s:%d/hos/", hos_conf->ip, hos_conf->port); + g_client_config->endpointOverride.append(endpoint); + g_client_config->verifySSL = false; + g_client_config->enableEndpointDiscovery = true; + if (hos_conf->pool_thread_size > 0) + { + //异步模式 + //config.executor = std::shared_ptr(std::make_shared(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY)); //支持线程池 + g_client_config->executor = std::shared_ptr(std::make_shared(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::QUEUE_TASKS_EVENLY_ACCROSS_THREADS)); //支持线程池 + } + else + { + //同步模式 + } + + #ifndef HOS_MOCK + g_hos_handle.S3Client = new Aws::S3::S3Client(g_credentials, *g_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + #else + g_hos_handle.S3Client = new Aws::S3::S3ClientMock(g_credentials, *g_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + #endif + + atomic_add(&g_hos_handle.count, 1); + g_hos_handle.task_num = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + g_hos_handle.task_context = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_INFO, __FUNCTION__, "[%s] debug: hos s3client create success.",g_client_config->endpointOverride.c_str()); + g_hos_instance.hos_url_prefix = g_client_config->endpointOverride.c_str(); + hos_expand_fs2(); + + //hos 检测服务端是否可以连接上 + hos_attempt_connection(); + + if (g_hos_handle.hos_func.hos_client_retry_thread_id == 0) + { + g_hos_handle.hos_func.hos_client_retry_thread_status = 0; + pthread_create(&g_hos_handle.hos_func.hos_client_retry_thread_id, NULL, hos_attempt_connection_exhaustively, NULL); + } +} + +bool hos_verify_bucket(const char *bucket) +{ + if (bucket == NULL) + { + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: bucket is null"); + return false; + } + if (g_hos_instance.status != INSTANCE_ENABLE_STATE) + { + return false; + } + Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle.S3Client->ListBuckets(); + + if (outcome.IsSuccess()) + { + g_hos_handle.buckets = outcome.GetResult().GetBuckets(); + + for (Aws::S3::Model::Bucket& new_bucket : g_hos_handle.buckets) + { + if (strcmp(new_bucket.GetName().c_str(), bucket) == 0) + { + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","debug: bucket:%s exits", bucket); + return true; + } + else + { + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","debug: Get bucket list:%s", new_bucket.GetName().c_str()); + } + } + } + else + { + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_verify_bucket","error:%s", outcome.GetError().GetMessage().c_str()); + } + return false; +} + static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t stream_len, size_t thread_id, hos_fd_context_t **fd) { char buf[128]; @@ -590,26 +608,38 @@ static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t hos_instance hos_get_instance() { - if (g_hos_handle.S3Client != NULL) + switch (atomic_read(&g_hos_instance.status)) { - g_hos_handle.count++; - g_hos_instance.result = true; + case INSTANCE_UNINIT_STATE: + return NULL; + default: + atomic_add(&g_hos_handle.count, 1); return &g_hos_instance; } - memset(&g_hos_instance, 0, sizeof(g_hos_instance)); - g_hos_instance.result = false; - return &g_hos_instance; +} + +int hos_get_init_instance_errorcode() +{ + return g_hos_instance.error_code; +} + +const char *hos_get_init_instance_errormsg() +{ + return g_hos_instance.error_message; +} + +const char *hos_get_upload_endpoint() +{ + return g_hos_instance.hos_url_prefix; } hos_instance hos_init_instance(const char *conf_path, const char *module, size_t thread_num, const char *bucket) { std::lock_guard locker(m_client_lock); hos_config_t *hos_conf = &g_hos_handle.hos_config; - char hos_url[1024]; if (conf_path == NULL || thread_num == 0 || module == NULL || bucket == NULL) { - g_hos_instance.result = false; g_hos_instance.error_code = HOS_PARAMETER_ERROR; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "param error:conf_path:%s, module:%s, thread_num:%lu, bucket:%s", conf_path, module, thread_num, bucket); @@ -636,50 +666,33 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t g_hos_handle.log = MESA_create_runtime_log_handle(hos_conf->log_path, hos_conf->log_level); if (g_hos_handle.log == NULL) { - g_hos_instance.result = false; g_hos_instance.error_code = HOS_RUNTIME_LOG_FAILED; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "runtime log create failed."); - return &g_hos_instance; + return NULL; } - snprintf(hos_url, sizeof(hos_url), "http://%s:%d/hos/", hos_conf->ip, hos_conf->port); hos_conf->thread_num = thread_num; hos_client_create(); - if (g_hos_instance.result == true) + if (atomic_read(&g_hos_instance.status) == INSTANCE_UNINIT_STATE) { - if(hos_verify_bucket(bucket) == false) - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: bucket:%s not exist.", bucket); - hos_shutdown_instance(); - g_hos_instance.result = false; - g_hos_instance.error_code = HOS_BUCKET_NOT_EXIST; - snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "bucket:%s not exits.", bucket); - return &g_hos_instance; - } - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug:%s","Instance init completed"); - hos_expand_fs2(); - g_hos_instance.error_code = 0; - g_hos_instance.error_message[0]='\0'; - g_hos_instance.hos_url_prefix = (const char *)calloc(1, strlen(hos_url) + 1); - memcpy((void *)g_hos_instance.hos_url_prefix, hos_url, strlen(hos_url)); + return NULL; } return &g_hos_instance; } else { - g_hos_instance.result = false; g_hos_instance.error_code = HOS_CONF_ERROR; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "hos param error:hos ip:%s, hos port:%u, accesskeyid:%s, secretkey:%s", hos_conf->ip, hos_conf->port, hos_conf->accesskeyid, hos_conf->secretkey); - return &g_hos_instance; + return NULL; } } int hos_create_bucket(const char *bucket) { - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_instance.status != INSTANCE_ENABLE_STATE) { - return HOS_INSTANCE_NOT_INIT; + return HOS_INSTANCE_NOT_ENABLE; } if (bucket == NULL) { @@ -728,7 +741,7 @@ static int hos_upload_stream(const char *bucket, const char *object, const char int ret; int mode = 0; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_instance.status != INSTANCE_ENABLE_STATE) { return HOS_INSTANCE_NOT_INIT; } @@ -801,7 +814,7 @@ static int hos_upload_stream(const char *bucket, const char *object, const char int hos_upload_file(const char *bucket, const char *file_path, put_finished_callback callback, void *userdata, size_t thread_id) { struct stat buffer; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_instance.status != INSTANCE_ENABLE_STATE) { return HOS_INSTANCE_NOT_INIT; } @@ -824,7 +837,7 @@ int hos_upload_file(const char *bucket, const char *file_path, put_finished_call int hos_upload_buf(const char *bucket, const char *object, const char *buf, size_t buf_len, put_finished_callback callback, void *userdata, size_t thread_id) { - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_instance.status != INSTANCE_ENABLE_STATE) { return HOS_INSTANCE_NOT_INIT; } @@ -842,7 +855,7 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size long hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id) { - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_instance.status != INSTANCE_ENABLE_STATE) { return HOS_INSTANCE_NOT_INIT; } @@ -882,7 +895,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id hos_func_thread_t *hos_func = &g_hos_handle.hos_func; size_t upload_len = 0; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_instance.status != INSTANCE_ENABLE_STATE) { return HOS_INSTANCE_NOT_INIT; } @@ -983,7 +996,7 @@ int hos_close_fd(size_t fd, size_t thread_id) hos_config_t *hos_conf = &g_hos_handle.hos_config; size_t upload_len = 0; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (g_hos_instance.status == INSTANCE_ENABLE_STATE) { return HOS_INSTANCE_NOT_INIT; } @@ -1074,7 +1087,7 @@ int hos_shutdown_instance() hos_func_thread_t *hos_func = &g_hos_handle.hos_func; size_t task_num = 0; - if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + if (atomic_read(&g_hos_instance.status) == INSTANCE_UNINIT_STATE) { return HOS_INSTANCE_NOT_INIT; } @@ -1083,6 +1096,13 @@ int hos_shutdown_instance() { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_shutdown_instance", "debug: hos client count:%lu.", g_hos_handle.count); return HOS_CLIENT_OK; + } + + //退出hos服务检测线程 + if (g_hos_handle.hos_func.hos_client_retry_thread_id != 0) + { + atomic_set(&g_hos_handle.hos_func.hos_client_retry_thread_status, 1); + pthread_join(g_hos_handle.hos_func.hos_client_retry_thread_id, NULL); } //先等待所有的task完成 @@ -1169,8 +1189,6 @@ int hos_shutdown_instance() MESA_destroy_runtime_log_handle(g_hos_handle.log); g_hos_handle.log = NULL; memset(&g_hos_handle, 0 , sizeof(g_hos_handle)); - if (g_hos_instance.hos_url_prefix) - free((void *)g_hos_instance.hos_url_prefix); memset(&g_hos_instance, 0, sizeof(g_hos_instance)); return HOS_CLIENT_OK; diff --git a/src/hos_client.h b/src/hos_client.h index e235df5f..e695c453 100644 --- a/src/hos_client.h +++ b/src/hos_client.h @@ -6,13 +6,8 @@ #ifndef __HOS_CLIENT_INIT__ #define __HOS_CLIENT_INIT__ -/*hos instance */ -typedef struct hos_instance_s{ - bool result; - int error_code; - char error_message[1024]; - const char *hos_url_prefix; -}* hos_instance; +struct hos_instance_s; +typedef struct hos_instance_s *hos_instance; #define HOS_CLIENT_OK 0 @@ -33,6 +28,7 @@ enum hoserrors HOS_CONF_ERROR = -7, HOS_BUCKET_NOT_EXIST = -8, HOS_INSTANCE_NOT_INIT = -9, + HOS_INSTANCE_NOT_ENABLE = -10, }; @@ -97,6 +93,9 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t * 返回值: hos_instance 成功,result 为true *************************************************************************************/ hos_instance hos_get_instance(); +int hos_get_init_instance_errorcode(); +const char *hos_get_init_instance_errormsg(); +const char *hos_get_upload_endpoint(); /************************************************************************************* * 函数名: hos_upload_file * 参数: hos_instance instance 非空句柄 diff --git a/src/hos_common.h b/src/hos_common.h index fdc7c654..b6583053 100644 --- a/src/hos_common.h +++ b/src/hos_common.h @@ -13,10 +13,12 @@ #define atomic_add(x,y) __sync_add_and_fetch((x),(y)) #define atomic_read(x) __sync_add_and_fetch((x),0) #define atomic_sub(x,y) __sync_sub_and_fetch((x),(y)) +#define atomic_set(x,y) __sync_lock_test_and_set((x),(y)) #else #define atomic_add(x,y) ((*(x))+=(y)) #define atomic_read(x) (*(x)) #define atomic_sub(x,y) ((*(x))-=(y)) +#define atomic_set(x,y) (*(x)=(y)) #endif #define MAX_HOS_STRING_LEN 1024 @@ -24,6 +26,17 @@ #define MAX_HOS_CLIENT_FD_NUM 65535 #define HOS_LOG_PATH "./tsglog/hoslog" +/*hos instance */ +typedef struct hos_instance_s{ +#define INSTANCE_UNINIT_STATE 0 +#define INSTANCE_ATTEMPT_STATE 1 +#define INSTANCE_ENABLE_STATE 2 + int status; + int error_code; + char error_message[1024]; + const char *hos_url_prefix; +}* hos_instance; + typedef struct data_info_s { size_t *tx_pkts; @@ -70,6 +83,7 @@ typedef struct hos_config_s uint32_t cache_count; uint32_t max_request_num; uint32_t max_request_context; + uint32_t reconnection_time; }hos_config_t; typedef struct hos_func_thread_s @@ -81,6 +95,8 @@ typedef struct hos_func_thread_s fs2_info_t fs2_info; pthread_t fs2_thread; int fs2_status; + pthread_t hos_client_retry_thread_id; + int hos_client_retry_thread_status; #define HOS_FS2_START 1 #define HOS_FS2_STOP 2 }hos_func_thread_t;