🐞 fix(TSG-7599): hos client 初始化失败,定期尝试重连hos服务

This commit is contained in:
pengxuanzheng
2021-09-27 18:49:27 +08:00
parent 7b6a1281cf
commit 8c49f4d2c2
5 changed files with 182 additions and 150 deletions

View File

@@ -96,14 +96,14 @@ int main(int argc, char *argv[])
printf("hos_init_instance start ...\n"); printf("hos_init_instance start ...\n");
hos_instance = hos_get_instance(); hos_instance = hos_get_instance();
if (hos_instance->result == false) if (hos_instance == NULL)
{ {
hos_instance = hos_init_instance(conf_file, module_name, 1, bucket); hos_instance = hos_init_instance(conf_file, module_name, 1, bucket);
} }
if (hos_instance->result == false) if (hos_instance == NULL)
{ {
printf("error:hos_init_instance\n"); printf("error:hos_init_instance\n");
printf("error:%s", hos_instance->error_message); printf("error:[%d]%s", hos_get_init_instance_errorcode, hos_get_init_instance_errormsg);
return -1; return -1;
} }
printf("hos_init_instance success ... \n"); printf("hos_init_instance success ... \n");

View File

@@ -15,7 +15,6 @@ extern "C"
#include<sys/stat.h> #include<sys/stat.h>
#include<math.h> #include<math.h>
#include<netinet/in.h> #include<netinet/in.h>
#include<zlog.h>
} }
#include"../../src/hos_client.h" #include"../../src/hos_client.h"
#include "MESA_handle_logger.h" #include "MESA_handle_logger.h"
@@ -474,7 +473,7 @@ int main(int argc, char *argv[])
hos_instance hos_instance = hos_init_instance(conf_path, module, thread_sum, bucket); hos_instance hos_instance = hos_init_instance(conf_path, module, thread_sum, bucket);
if (hos_instance == NULL) if (hos_instance == NULL)
{ {
printf("error:hos_client_handle\n %s\n", hos_instance->error_message); printf("error:hos_client_handle\n error:[%d]%s\n", hos_get_init_instance_errorcode(), hos_get_init_instance_errormsg());
fclose(log); fclose(log);
return -1; return -1;
} }

View File

@@ -49,13 +49,8 @@ static std::mutex m_client_lock;
static std::mutex m_instance_lock; static std::mutex m_instance_lock;
static std::mutex m_delete_lock; static std::mutex m_delete_lock;
static Aws::SDKOptions g_options; static Aws::SDKOptions g_options;
Aws::Auth::AWSCredentials g_credentials;
static inline size_t get_current_ms() Aws::Client::ClientConfiguration *g_client_config;
{
struct timespec timenow;
clock_gettime(CLOCK_MONOTONIC, &timenow);
return (timenow.tv_sec * 1000 + timenow.tv_nsec / 1000 / 1000 );
}
static int hos_delete_fd(size_t fd, size_t thread_id) static int hos_delete_fd(size_t fd, size_t thread_id)
{ {
@@ -177,112 +172,47 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
atomic_sub(&g_hos_handle.task_context[thread_id], stream_len); atomic_sub(&g_hos_handle.task_context[thread_id], stream_len);
} }
static void hos_client_create() static int hos_attempt_connection()
{ {
hos_config_t *hos_conf = &g_hos_handle.hos_config;
void *log = g_hos_handle.log;
if (g_hos_handle.S3Client != NULL)
{
g_hos_handle.count++;
g_hos_instance.result = true;
return ;
}
Aws::InitAPI(g_options);
Aws::Client::ClientConfiguration config;
Aws::Auth::AWSCredentials credentials(hos_conf->accesskeyid, hos_conf->secretkey);
//初始化
char endpoint[128];
snprintf(endpoint, 128, "http://%s:%d/hos/", hos_conf->ip, hos_conf->port);
config.endpointOverride = endpoint;
config.verifySSL = false;
config.enableEndpointDiscovery = true;
if (hos_conf->pool_thread_size > 0)
{
//异步模式
//config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY)); //支持线程池
config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::QUEUE_TASKS_EVENLY_ACCROSS_THREADS)); //支持线程池
}
else
{
//同步模式
}
#ifndef HOS_MOCK
g_hos_handle.S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
#else
g_hos_handle.S3Client = new Aws::S3::S3ClientMock(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
#endif
/* 获取当前用户的所有的buckets */ /* 获取当前用户的所有的buckets */
Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle.S3Client->ListBuckets(); Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle.S3Client->ListBuckets();
if (!outcome.IsSuccess()) if (!outcome.IsSuccess())
{ {
delete g_hos_handle.S3Client;
g_hos_handle.S3Client = NULL;
Aws::ShutdownAPI(g_options);
g_hos_instance.error_code = (size_t)outcome.GetError().GetErrorType() + 1; g_hos_instance.error_code = (size_t)outcome.GetError().GetErrorType() + 1;
snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, outcome.GetError().GetMessage().c_str()); snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, outcome.GetError().GetMessage().c_str());
g_hos_instance.result = false; atomic_set(&g_hos_instance.status, INSTANCE_ATTEMPT_STATE);
MESA_HANDLE_RUNTIME_LOG(log, RLOG_LV_FATAL, "hos_client_create", "error: %s", g_hos_instance.error_message); MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "[%s] ErrorCode:%d, Error: %s",
return; g_client_config->endpointOverride.c_str(), g_hos_instance.error_code, g_hos_instance.error_message);
}
g_hos_handle.buckets = outcome.GetResult().GetBuckets(); if (g_hos_instance.error_code == NETWORK_CONNECTION)
g_hos_handle.count++;
g_hos_handle.task_num = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
g_hos_handle.task_context = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
#if 0
if (g_hos_handle.hos_func.fd_thread == 0)
{ {
g_hos_handle.hos_func.fd_thread_status = 0; atomic_set(&g_hos_instance.status, INSTANCE_ATTEMPT_STATE);
pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL);
}
#endif
MESA_HANDLE_RUNTIME_LOG(log, RLOG_LV_DEBUG, "hos_client_create", "debug: hos s3client create success, url:%s.",endpoint);
g_hos_instance.result = true;
}
bool hos_verify_bucket(const char *bucket)
{
if (bucket == NULL)
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
"debug: bucket is null");
return false;
}
if (g_hos_instance.result != true || g_hos_handle.S3Client == NULL)
{
return false;
}
Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle.S3Client->ListBuckets();
if (outcome.IsSuccess())
{
g_hos_handle.buckets = outcome.GetResult().GetBuckets();
for (Aws::S3::Model::Bucket& new_bucket : g_hos_handle.buckets)
{
if (strcmp(new_bucket.GetName().c_str(), bucket) == 0)
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","debug: bucket:%s exits", bucket);
return true;
} }
else else
{ {
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","debug: Get bucket list:%s", new_bucket.GetName().c_str()); atomic_set(&g_hos_handle.count, 0);//立即shutdown
hos_shutdown_instance();
g_hos_instance.status = INSTANCE_UNINIT_STATE;
g_hos_instance.error_code = (size_t)outcome.GetError().GetErrorType() + 1;
snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, outcome.GetError().GetMessage().c_str());
} }
return g_hos_instance.error_code;
} }
g_hos_handle.buckets = outcome.GetResult().GetBuckets();
atomic_set(&g_hos_instance.status, INSTANCE_ENABLE_STATE);
return HOS_CLIENT_OK;
} }
else
//检测hos 是否可用
static void *hos_attempt_connection_exhaustively(void *ptr)
{ {
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_verify_bucket","error:%s", outcome.GetError().GetMessage().c_str()); while(atomic_read(&g_hos_handle.hos_func.hos_client_retry_thread_status) == 0)
{
hos_attempt_connection();
sleep(g_hos_handle.hos_config.reconnection_time);
} }
return false; pthread_exit(NULL);
} }
static void *fs2_statistics(void *ptr) static void *fs2_statistics(void *ptr)
@@ -474,6 +404,94 @@ static void hos_expand_fs2()
return ; return ;
} }
static void hos_client_create()
{
hos_config_t *hos_conf = &g_hos_handle.hos_config;
Aws::InitAPI(g_options);
g_client_config = new Aws::Client::ClientConfiguration();
g_credentials.SetAWSAccessKeyId(hos_conf->accesskeyid);
g_credentials.SetAWSSecretKey(hos_conf->secretkey);
//初始化
char endpoint[128];
snprintf(endpoint, 128, "http://%s:%d/hos/", hos_conf->ip, hos_conf->port);
g_client_config->endpointOverride.append(endpoint);
g_client_config->verifySSL = false;
g_client_config->enableEndpointDiscovery = true;
if (hos_conf->pool_thread_size > 0)
{
//异步模式
//config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY)); //支持线程池
g_client_config->executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::QUEUE_TASKS_EVENLY_ACCROSS_THREADS)); //支持线程池
}
else
{
//同步模式
}
#ifndef HOS_MOCK
g_hos_handle.S3Client = new Aws::S3::S3Client(g_credentials, *g_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
#else
g_hos_handle.S3Client = new Aws::S3::S3ClientMock(g_credentials, *g_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
#endif
atomic_add(&g_hos_handle.count, 1);
g_hos_handle.task_num = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
g_hos_handle.task_context = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_INFO, __FUNCTION__, "[%s] debug: hos s3client create success.",g_client_config->endpointOverride.c_str());
g_hos_instance.hos_url_prefix = g_client_config->endpointOverride.c_str();
hos_expand_fs2();
//hos 检测服务端是否可以连接上
hos_attempt_connection();
if (g_hos_handle.hos_func.hos_client_retry_thread_id == 0)
{
g_hos_handle.hos_func.hos_client_retry_thread_status = 0;
pthread_create(&g_hos_handle.hos_func.hos_client_retry_thread_id, NULL, hos_attempt_connection_exhaustively, NULL);
}
}
bool hos_verify_bucket(const char *bucket)
{
if (bucket == NULL)
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
"debug: bucket is null");
return false;
}
if (g_hos_instance.status != INSTANCE_ENABLE_STATE)
{
return false;
}
Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle.S3Client->ListBuckets();
if (outcome.IsSuccess())
{
g_hos_handle.buckets = outcome.GetResult().GetBuckets();
for (Aws::S3::Model::Bucket& new_bucket : g_hos_handle.buckets)
{
if (strcmp(new_bucket.GetName().c_str(), bucket) == 0)
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","debug: bucket:%s exits", bucket);
return true;
}
else
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","debug: Get bucket list:%s", new_bucket.GetName().c_str());
}
}
}
else
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_verify_bucket","error:%s", outcome.GetError().GetMessage().c_str());
}
return false;
}
static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t stream_len, size_t thread_id, hos_fd_context_t **fd) static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t stream_len, size_t thread_id, hos_fd_context_t **fd)
{ {
char buf[128]; char buf[128];
@@ -590,26 +608,38 @@ static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t
hos_instance hos_get_instance() hos_instance hos_get_instance()
{ {
if (g_hos_handle.S3Client != NULL) switch (atomic_read(&g_hos_instance.status))
{ {
g_hos_handle.count++; case INSTANCE_UNINIT_STATE:
g_hos_instance.result = true; return NULL;
default:
atomic_add(&g_hos_handle.count, 1);
return &g_hos_instance; return &g_hos_instance;
} }
memset(&g_hos_instance, 0, sizeof(g_hos_instance)); }
g_hos_instance.result = false;
return &g_hos_instance; int hos_get_init_instance_errorcode()
{
return g_hos_instance.error_code;
}
const char *hos_get_init_instance_errormsg()
{
return g_hos_instance.error_message;
}
const char *hos_get_upload_endpoint()
{
return g_hos_instance.hos_url_prefix;
} }
hos_instance hos_init_instance(const char *conf_path, const char *module, size_t thread_num, const char *bucket) hos_instance hos_init_instance(const char *conf_path, const char *module, size_t thread_num, const char *bucket)
{ {
std::lock_guard<std::mutex> locker(m_client_lock); std::lock_guard<std::mutex> locker(m_client_lock);
hos_config_t *hos_conf = &g_hos_handle.hos_config; hos_config_t *hos_conf = &g_hos_handle.hos_config;
char hos_url[1024];
if (conf_path == NULL || thread_num == 0 || module == NULL || bucket == NULL) if (conf_path == NULL || thread_num == 0 || module == NULL || bucket == NULL)
{ {
g_hos_instance.result = false;
g_hos_instance.error_code = HOS_PARAMETER_ERROR; g_hos_instance.error_code = HOS_PARAMETER_ERROR;
snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE,
"param error:conf_path:%s, module:%s, thread_num:%lu, bucket:%s", conf_path, module, thread_num, bucket); "param error:conf_path:%s, module:%s, thread_num:%lu, bucket:%s", conf_path, module, thread_num, bucket);
@@ -636,50 +666,33 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t
g_hos_handle.log = MESA_create_runtime_log_handle(hos_conf->log_path, hos_conf->log_level); g_hos_handle.log = MESA_create_runtime_log_handle(hos_conf->log_path, hos_conf->log_level);
if (g_hos_handle.log == NULL) if (g_hos_handle.log == NULL)
{ {
g_hos_instance.result = false;
g_hos_instance.error_code = HOS_RUNTIME_LOG_FAILED; g_hos_instance.error_code = HOS_RUNTIME_LOG_FAILED;
snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "runtime log create failed."); snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "runtime log create failed.");
return &g_hos_instance; return NULL;
} }
snprintf(hos_url, sizeof(hos_url), "http://%s:%d/hos/", hos_conf->ip, hos_conf->port);
hos_conf->thread_num = thread_num; hos_conf->thread_num = thread_num;
hos_client_create(); hos_client_create();
if (g_hos_instance.result == true) if (atomic_read(&g_hos_instance.status) == INSTANCE_UNINIT_STATE)
{ {
if(hos_verify_bucket(bucket) == false) return NULL;
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: bucket:%s not exist.", bucket);
hos_shutdown_instance();
g_hos_instance.result = false;
g_hos_instance.error_code = HOS_BUCKET_NOT_EXIST;
snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "bucket:%s not exits.", bucket);
return &g_hos_instance;
}
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug:%s","Instance init completed");
hos_expand_fs2();
g_hos_instance.error_code = 0;
g_hos_instance.error_message[0]='\0';
g_hos_instance.hos_url_prefix = (const char *)calloc(1, strlen(hos_url) + 1);
memcpy((void *)g_hos_instance.hos_url_prefix, hos_url, strlen(hos_url));
} }
return &g_hos_instance; return &g_hos_instance;
} }
else else
{ {
g_hos_instance.result = false;
g_hos_instance.error_code = HOS_CONF_ERROR; g_hos_instance.error_code = HOS_CONF_ERROR;
snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "hos param error:hos ip:%s, hos port:%u, accesskeyid:%s, secretkey:%s", snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "hos param error:hos ip:%s, hos port:%u, accesskeyid:%s, secretkey:%s",
hos_conf->ip, hos_conf->port, hos_conf->accesskeyid, hos_conf->secretkey); hos_conf->ip, hos_conf->port, hos_conf->accesskeyid, hos_conf->secretkey);
return &g_hos_instance; return NULL;
} }
} }
int hos_create_bucket(const char *bucket) int hos_create_bucket(const char *bucket)
{ {
if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) if (g_hos_instance.status != INSTANCE_ENABLE_STATE)
{ {
return HOS_INSTANCE_NOT_INIT; return HOS_INSTANCE_NOT_ENABLE;
} }
if (bucket == NULL) if (bucket == NULL)
{ {
@@ -728,7 +741,7 @@ static int hos_upload_stream(const char *bucket, const char *object, const char
int ret; int ret;
int mode = 0; int mode = 0;
if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) if (g_hos_instance.status != INSTANCE_ENABLE_STATE)
{ {
return HOS_INSTANCE_NOT_INIT; return HOS_INSTANCE_NOT_INIT;
} }
@@ -801,7 +814,7 @@ static int hos_upload_stream(const char *bucket, const char *object, const char
int hos_upload_file(const char *bucket, const char *file_path, put_finished_callback callback, void *userdata, size_t thread_id) int hos_upload_file(const char *bucket, const char *file_path, put_finished_callback callback, void *userdata, size_t thread_id)
{ {
struct stat buffer; struct stat buffer;
if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) if (g_hos_instance.status != INSTANCE_ENABLE_STATE)
{ {
return HOS_INSTANCE_NOT_INIT; return HOS_INSTANCE_NOT_INIT;
} }
@@ -824,7 +837,7 @@ int hos_upload_file(const char *bucket, const char *file_path, put_finished_call
int hos_upload_buf(const char *bucket, const char *object, const char *buf, size_t buf_len, put_finished_callback callback, void *userdata, size_t thread_id) int hos_upload_buf(const char *bucket, const char *object, const char *buf, size_t buf_len, put_finished_callback callback, void *userdata, size_t thread_id)
{ {
if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) if (g_hos_instance.status != INSTANCE_ENABLE_STATE)
{ {
return HOS_INSTANCE_NOT_INIT; return HOS_INSTANCE_NOT_INIT;
} }
@@ -842,7 +855,7 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size
long hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id) long hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id)
{ {
if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) if (g_hos_instance.status != INSTANCE_ENABLE_STATE)
{ {
return HOS_INSTANCE_NOT_INIT; return HOS_INSTANCE_NOT_INIT;
} }
@@ -882,7 +895,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
hos_func_thread_t *hos_func = &g_hos_handle.hos_func; hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
size_t upload_len = 0; size_t upload_len = 0;
if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) if (g_hos_instance.status != INSTANCE_ENABLE_STATE)
{ {
return HOS_INSTANCE_NOT_INIT; return HOS_INSTANCE_NOT_INIT;
} }
@@ -983,7 +996,7 @@ int hos_close_fd(size_t fd, size_t thread_id)
hos_config_t *hos_conf = &g_hos_handle.hos_config; hos_config_t *hos_conf = &g_hos_handle.hos_config;
size_t upload_len = 0; size_t upload_len = 0;
if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) if (g_hos_instance.status == INSTANCE_ENABLE_STATE)
{ {
return HOS_INSTANCE_NOT_INIT; return HOS_INSTANCE_NOT_INIT;
} }
@@ -1074,7 +1087,7 @@ int hos_shutdown_instance()
hos_func_thread_t *hos_func = &g_hos_handle.hos_func; hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
size_t task_num = 0; size_t task_num = 0;
if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) if (atomic_read(&g_hos_instance.status) == INSTANCE_UNINIT_STATE)
{ {
return HOS_INSTANCE_NOT_INIT; return HOS_INSTANCE_NOT_INIT;
} }
@@ -1085,6 +1098,13 @@ int hos_shutdown_instance()
return HOS_CLIENT_OK; return HOS_CLIENT_OK;
} }
//退出hos服务检测线程
if (g_hos_handle.hos_func.hos_client_retry_thread_id != 0)
{
atomic_set(&g_hos_handle.hos_func.hos_client_retry_thread_status, 1);
pthread_join(g_hos_handle.hos_func.hos_client_retry_thread_id, NULL);
}
//先等待所有的task完成 //先等待所有的task完成
while(1) while(1)
{ {
@@ -1169,8 +1189,6 @@ int hos_shutdown_instance()
MESA_destroy_runtime_log_handle(g_hos_handle.log); MESA_destroy_runtime_log_handle(g_hos_handle.log);
g_hos_handle.log = NULL; g_hos_handle.log = NULL;
memset(&g_hos_handle, 0 , sizeof(g_hos_handle)); memset(&g_hos_handle, 0 , sizeof(g_hos_handle));
if (g_hos_instance.hos_url_prefix)
free((void *)g_hos_instance.hos_url_prefix);
memset(&g_hos_instance, 0, sizeof(g_hos_instance)); memset(&g_hos_instance, 0, sizeof(g_hos_instance));
return HOS_CLIENT_OK; return HOS_CLIENT_OK;

View File

@@ -6,13 +6,8 @@
#ifndef __HOS_CLIENT_INIT__ #ifndef __HOS_CLIENT_INIT__
#define __HOS_CLIENT_INIT__ #define __HOS_CLIENT_INIT__
/*hos instance */ struct hos_instance_s;
typedef struct hos_instance_s{ typedef struct hos_instance_s *hos_instance;
bool result;
int error_code;
char error_message[1024];
const char *hos_url_prefix;
}* hos_instance;
#define HOS_CLIENT_OK 0 #define HOS_CLIENT_OK 0
@@ -33,6 +28,7 @@ enum hoserrors
HOS_CONF_ERROR = -7, HOS_CONF_ERROR = -7,
HOS_BUCKET_NOT_EXIST = -8, HOS_BUCKET_NOT_EXIST = -8,
HOS_INSTANCE_NOT_INIT = -9, HOS_INSTANCE_NOT_INIT = -9,
HOS_INSTANCE_NOT_ENABLE = -10,
}; };
@@ -97,6 +93,9 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t
* 返回值: hos_instance 成功result 为true * 返回值: hos_instance 成功result 为true
*************************************************************************************/ *************************************************************************************/
hos_instance hos_get_instance(); hos_instance hos_get_instance();
int hos_get_init_instance_errorcode();
const char *hos_get_init_instance_errormsg();
const char *hos_get_upload_endpoint();
/************************************************************************************* /*************************************************************************************
* 函数名: hos_upload_file * 函数名: hos_upload_file
* 参数: hos_instance instance 非空句柄 * 参数: hos_instance instance 非空句柄

View File

@@ -13,10 +13,12 @@
#define atomic_add(x,y) __sync_add_and_fetch((x),(y)) #define atomic_add(x,y) __sync_add_and_fetch((x),(y))
#define atomic_read(x) __sync_add_and_fetch((x),0) #define atomic_read(x) __sync_add_and_fetch((x),0)
#define atomic_sub(x,y) __sync_sub_and_fetch((x),(y)) #define atomic_sub(x,y) __sync_sub_and_fetch((x),(y))
#define atomic_set(x,y) __sync_lock_test_and_set((x),(y))
#else #else
#define atomic_add(x,y) ((*(x))+=(y)) #define atomic_add(x,y) ((*(x))+=(y))
#define atomic_read(x) (*(x)) #define atomic_read(x) (*(x))
#define atomic_sub(x,y) ((*(x))-=(y)) #define atomic_sub(x,y) ((*(x))-=(y))
#define atomic_set(x,y) (*(x)=(y))
#endif #endif
#define MAX_HOS_STRING_LEN 1024 #define MAX_HOS_STRING_LEN 1024
@@ -24,6 +26,17 @@
#define MAX_HOS_CLIENT_FD_NUM 65535 #define MAX_HOS_CLIENT_FD_NUM 65535
#define HOS_LOG_PATH "./tsglog/hoslog" #define HOS_LOG_PATH "./tsglog/hoslog"
/*hos instance */
typedef struct hos_instance_s{
#define INSTANCE_UNINIT_STATE 0
#define INSTANCE_ATTEMPT_STATE 1
#define INSTANCE_ENABLE_STATE 2
int status;
int error_code;
char error_message[1024];
const char *hos_url_prefix;
}* hos_instance;
typedef struct data_info_s typedef struct data_info_s
{ {
size_t *tx_pkts; size_t *tx_pkts;
@@ -70,6 +83,7 @@ typedef struct hos_config_s
uint32_t cache_count; uint32_t cache_count;
uint32_t max_request_num; uint32_t max_request_num;
uint32_t max_request_context; uint32_t max_request_context;
uint32_t reconnection_time;
}hos_config_t; }hos_config_t;
typedef struct hos_func_thread_s typedef struct hos_func_thread_s
@@ -81,6 +95,8 @@ typedef struct hos_func_thread_s
fs2_info_t fs2_info; fs2_info_t fs2_info;
pthread_t fs2_thread; pthread_t fs2_thread;
int fs2_status; int fs2_status;
pthread_t hos_client_retry_thread_id;
int hos_client_retry_thread_status;
#define HOS_FS2_START 1 #define HOS_FS2_START 1
#define HOS_FS2_STOP 2 #define HOS_FS2_STOP 2
}hos_func_thread_t; }hos_func_thread_t;