From d38435396918033925629e8b02d2c2f6ba9b4b9f Mon Sep 17 00:00:00 2001 From: pengxuanzheng Date: Mon, 19 Oct 2020 15:35:16 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9API=EF=BC=8C=E6=8F=90?= =?UTF-8?q?=E4=BE=9Bset=20options=E7=9A=84=E6=96=B9=E5=BC=8F=E8=AE=BE?= =?UTF-8?q?=E7=BD=AE=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/hos_client.cpp | 140 ++++++++++++++++++++++++++++++++------------- src/hos_client.h | 22 ++++++- src/hos_hash.cpp | 4 ++ src/hos_hash.h | 9 ++- 4 files changed, 131 insertions(+), 44 deletions(-) diff --git a/src/hos_client.cpp b/src/hos_client.cpp index b4e91867..805450b3 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -20,24 +20,27 @@ extern "C" #include "hos_client.h" #include "hos_hash.h" +#define MAX_HOS_CLIENT_THREAD_NUM 255 +#define MAX_HOS_CLIENT_FD_NUM 65535 + typedef struct hos_client_handle_s { Aws::S3::S3Client *S3Client; - size_t append_size; - size_t thread_sum; Aws::SDKOptions options; Aws::Vector buckets; + /* options */ + size_t cache_size; + size_t cache_times; + size_t thread_sum; }hos_client_handle_t; -#define MAX_THREAD_NUM 255 -#define MAX_FD_NUM 65535 -hos_info_t *hash_hos_info[MAX_THREAD_NUM]; -size_t fd_info[MAX_THREAD_NUM][MAX_FD_NUM]; +hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM]; +size_t fd_info[MAX_HOS_CLIENT_THREAD_NUM][MAX_HOS_CLIENT_FD_NUM]; static size_t hash_get_min_free_fd(size_t thread_id) { size_t i = 0; - for (i = 1; i < MAX_FD_NUM; i++) + for (i = 1; i < MAX_HOS_CLIENT_FD_NUM; i++) { if (!fd_info[thread_id][i]) return i; @@ -80,12 +83,31 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, } } -hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t thread_sum, size_t pool_size) +void set_cache_size(hos_client_handle client, size_t cache_size) { - if (!endpoint || !accesskeyid || !secretkey || thread_sum > MAX_THREAD_NUM) + client->cache_size = cache_size; + return ; +} + +void set_cache_times(hos_client_handle client, size_t cache_times) +{ + client->cache_times = cache_times; + return ; +} + +void set_thread_sum(hos_client_handle client, size_t thread_sum) +{ + client->thread_sum = thread_sum; + return ; +} + +hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t pool_size) +{ + if (!endpoint || !accesskeyid || !secretkey) { return NULL; } + Aws::SDKOptions options; //options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Debug; Aws::InitAPI(options); @@ -102,8 +124,6 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi config.executor = std::shared_ptr(std::make_shared(pool_size));//支持线程池 handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); - handle->append_size = 30 * 1024 * 1024; - handle->thread_sum = thread_sum; handle->options = options; /* 获取当前用户的所有的buckets */ Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); @@ -113,6 +133,10 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi handle->buckets = outcome.GetResult().GetBuckets(); } + handle->cache_size = 0; + handle->cache_times = 1; + handle->thread_sum = 1; + return handle; } @@ -166,8 +190,6 @@ int hos_create_bucket(hos_client_handle handle, const char *bucket) } } - //handle->buckets.push_back(); - return HOS_CLIENT_OK; } @@ -217,7 +239,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const sprintf(buf, "%lu %lu", thread_id, fd); context->SetUUID(buf); - hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, }; + hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, NULL, 0, 0, 0 }; add_hos_info(&hash_hos_info[thread_id], &info); fd_info[thread_id][fd] = 1; @@ -250,21 +272,23 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object return HOS_FD_NOT_ENOUGH; } - hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, }; + hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, NULL, handle->cache_size, handle->cache_times, 0, }; add_hos_info(&hash_hos_info[thread_id], &info); fd_info[thread_id][fd] = 1; return fd; } -int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id, size_t position) +int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id) { struct stat buffer; hos_info_t *hos_info = NULL; hos_client_handle handle = NULL; char num[128]; char buf[128]; - if ((fd == 0) || (stream == NULL) || (thread_id > MAX_THREAD_NUM)) + int flag = 0; // 0, 一次处理就可以完成;1,需要多次处理才能处理完 + int rest; // stream 剩余未处理的数据长度 + if ((fd == 0) || (stream == NULL) || (thread_id > MAX_HOS_CLIENT_THREAD_NUM)) { return HOS_PARAMETER_ERROR; } @@ -286,34 +310,56 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id request.SetBucket(hos_info->bucket); request.SetKey(hos_info->object); - //TODO APPEND MODE - snprintf(num, 128, "%lu", position); - Aws::Map headers; - if (hos_info->mode & APPEND_MODE) - { - headers["x-hos-upload-type"] = "append"; - headers["x-hos-position"] = num; - request.SetMetadata(headers); -#if 0 - request.AddMetadata("x-hos-upload-type", "append"); - request.AddMetadata("x-hos-position", num); -#endif - } - //设置上传数据类型 if (hos_info->mode & BUFF_MODE) { //BUFF_MODE -#if 1 - const std::shared_ptr input_data = - Aws::MakeShared(stream, stream + stream_len); - Aws::String buffer (stream, stream_len); - *input_data << buffer; -#else - Aws::StringStream *buffer = new Aws::StringStream(Aws::String(stream, stream_len)); - const std::shared_ptr input_data(buffer); -#endif - request.SetBody(input_data); + if (hos_info->mode & APPEND_MODE) + { + //APPEND_MODE + if (hos_info->cache == NULL) + { + hos_info->cache = Aws::MakeShared("append mode"); + } + if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest)) + { + // cache + Aws::String buffer (stream, stream_len); + hos_info->cache_rest -= stream_len; + if (hos_info->cache_rest > 0) + { + return HOS_CLIENT_OK; + } + }else if (stream_len > hos_info->cache_rest) + { + // multi handle + flag = 1; + Aws::String buffer (stream, hos_info->cache_rest); + *hos_info->cache << buffer; + rest = stream_len - hos_info->cache_rest; + }else + { + //over cache_times + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + } + request.SetBody(hos_info->cache); + + // add headers + snprintf(num, 128, "%lu", ++hos_info->position); + Aws::Map headers; + if (hos_info->mode & APPEND_MODE) + { + headers["x-hos-upload-type"] = "append"; + headers["x-hos-position"] = num; + request.SetMetadata(headers); + } + }else + { + const std::shared_ptr input_data = + Aws::MakeShared("buffer mode"); + request.SetBody(input_data); + } } else { @@ -334,6 +380,18 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id context->SetUUID(buf); S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); + + //恢复fd 的cache设置 + if (hos_info->mode & APPEND_MODE) + { + hos_info->cache = NULL; + hos_info->cache_rest = hos_info->handle->cache_size; + hos_info->cache_times = hos_info->handle->cache_times; + } + while (flag == 1) + { + return hos_write(fd, &stream[hos_info->cache_rest], rest, thread_id); + } return HOS_CLIENT_OK; } diff --git a/src/hos_client.h b/src/hos_client.h index 668af9fd..adacbb90 100644 --- a/src/hos_client.h +++ b/src/hos_client.h @@ -82,7 +82,7 @@ typedef void (*put_finished_callback)(bool, const char *, void *); * size_t thread_sum 线程总数 * 返回值: 成功返回一个非空句柄,失败返回NULL。(失败原因都是因为输入参数不合法) *************************************************************************************/ -hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t thread_sum, size_t pool_size); +hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t pool_size); /************************************************************************************* * 函数名: hos_create_bucket * 参数: hos_client_handle handle 非空句柄 @@ -97,6 +97,24 @@ bool hos_verify_bucket(hos_client_handle handle, const char *bucket); * 返回值: int 成功返回0,S3错误返回s3errors错误码,hos client错误返回hoserrors错误码 *************************************************************************************/ int hos_create_bucket(hos_client_handle handle, const char *bucket); +/************************************************************************************* + * 函数名: set_cache_size + * 参数: hos_client_handle handle 非空句柄 + * size_t cache_size append 模式每次追加的buffer大小 +*************************************************************************************/ +void set_cache_size(hos_client_handle handle, size_t cache_size); +/************************************************************************************* + * 函数名: set_cache_times + * 参数: hos_client_handle handle 非空句柄 + * size_t cache_times append 模式追加次数 +*************************************************************************************/ +void set_cache_times(hos_client_handle handle, size_t cache_times); +/************************************************************************************* + * 函数名: set_thread_sum + * 参数: hos_client_handle handle 非空句柄 + * size_t thread_sum append 模式追加次数 +*************************************************************************************/ +void set_cache_times(hos_client_handle handle, size_t thread_sum); /************************************************************************************* * 函数名: hos_upload_async * 参数: hos_client_handle handle 非空句柄 @@ -142,7 +160,7 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object * size_t position append模式下的每段内容编号 * 返回值 int 成功返回0,失败返回hoserros错误码 *************************************************************************************/ -int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id, size_t position); +int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id); /************************************************************************************* * 函数名: hos_close_fd * 参数: size_t fd fd diff --git a/src/hos_hash.cpp b/src/hos_hash.cpp index 592f041b..f6bc8838 100644 --- a/src/hos_hash.cpp +++ b/src/hos_hash.cpp @@ -23,6 +23,10 @@ void add_hos_info(hos_info_t **handle, hos_info_t *input) value->object = input->object; value->callback = input->callback; value->userdata = input->userdata; + value->cache = input->cache; + value->cache_times = input->cache_times; + value->cache_rest = input->cache_rest; + value->position = input->position; } } diff --git a/src/hos_hash.h b/src/hos_hash.h index 7daf1b91..51e2a6b7 100644 --- a/src/hos_hash.h +++ b/src/hos_hash.h @@ -6,17 +6,24 @@ #ifndef __HOS_HASH_H__ #define __HOS_HASH_H__ +#include +#include "hos_client.h" #include "uthash.h" typedef struct hos_info_s { size_t fd; int mode; - void *handle; + hos_client_handle handle; const char *bucket; const char *object; void *callback; void *userdata; + std::shared_ptr cache; + //void *cache; + size_t cache_times; + size_t cache_rest; + size_t position; UT_hash_handle hh; }hos_info_t;