修改API,提供set options的方式设置参数

This commit is contained in:
pengxuanzheng
2020-10-19 15:35:16 +08:00
parent 59f8b200e4
commit d384353969
4 changed files with 131 additions and 44 deletions

View File

@@ -20,24 +20,27 @@ extern "C"
#include "hos_client.h" #include "hos_client.h"
#include "hos_hash.h" #include "hos_hash.h"
#define MAX_HOS_CLIENT_THREAD_NUM 255
#define MAX_HOS_CLIENT_FD_NUM 65535
typedef struct hos_client_handle_s typedef struct hos_client_handle_s
{ {
Aws::S3::S3Client *S3Client; Aws::S3::S3Client *S3Client;
size_t append_size;
size_t thread_sum;
Aws::SDKOptions options; Aws::SDKOptions options;
Aws::Vector<Aws::S3::Model::Bucket> buckets; Aws::Vector<Aws::S3::Model::Bucket> buckets;
/* options */
size_t cache_size;
size_t cache_times;
size_t thread_sum;
}hos_client_handle_t; }hos_client_handle_t;
#define MAX_THREAD_NUM 255 hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM];
#define MAX_FD_NUM 65535 size_t fd_info[MAX_HOS_CLIENT_THREAD_NUM][MAX_HOS_CLIENT_FD_NUM];
hos_info_t *hash_hos_info[MAX_THREAD_NUM];
size_t fd_info[MAX_THREAD_NUM][MAX_FD_NUM];
static size_t hash_get_min_free_fd(size_t thread_id) static size_t hash_get_min_free_fd(size_t thread_id)
{ {
size_t i = 0; size_t i = 0;
for (i = 1; i < MAX_FD_NUM; i++) for (i = 1; i < MAX_HOS_CLIENT_FD_NUM; i++)
{ {
if (!fd_info[thread_id][i]) if (!fd_info[thread_id][i])
return i; return i;
@@ -80,12 +83,31 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
} }
} }
hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t thread_sum, size_t pool_size) void set_cache_size(hos_client_handle client, size_t cache_size)
{ {
if (!endpoint || !accesskeyid || !secretkey || thread_sum > MAX_THREAD_NUM) client->cache_size = cache_size;
return ;
}
void set_cache_times(hos_client_handle client, size_t cache_times)
{
client->cache_times = cache_times;
return ;
}
void set_thread_sum(hos_client_handle client, size_t thread_sum)
{
client->thread_sum = thread_sum;
return ;
}
hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t pool_size)
{
if (!endpoint || !accesskeyid || !secretkey)
{ {
return NULL; return NULL;
} }
Aws::SDKOptions options; Aws::SDKOptions options;
//options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Debug; //options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Debug;
Aws::InitAPI(options); Aws::InitAPI(options);
@@ -102,8 +124,6 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi
config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(pool_size));//支持线程池 config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(pool_size));//支持线程池
handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
handle->append_size = 30 * 1024 * 1024;
handle->thread_sum = thread_sum;
handle->options = options; handle->options = options;
/* 获取当前用户的所有的buckets */ /* 获取当前用户的所有的buckets */
Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets();
@@ -113,6 +133,10 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi
handle->buckets = outcome.GetResult().GetBuckets(); handle->buckets = outcome.GetResult().GetBuckets();
} }
handle->cache_size = 0;
handle->cache_times = 1;
handle->thread_sum = 1;
return handle; return handle;
} }
@@ -166,8 +190,6 @@ int hos_create_bucket(hos_client_handle handle, const char *bucket)
} }
} }
//handle->buckets.push_back();
return HOS_CLIENT_OK; return HOS_CLIENT_OK;
} }
@@ -217,7 +239,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const
sprintf(buf, "%lu %lu", thread_id, fd); sprintf(buf, "%lu %lu", thread_id, fd);
context->SetUUID(buf); context->SetUUID(buf);
hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, }; hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, NULL, 0, 0, 0 };
add_hos_info(&hash_hos_info[thread_id], &info); add_hos_info(&hash_hos_info[thread_id], &info);
fd_info[thread_id][fd] = 1; fd_info[thread_id][fd] = 1;
@@ -250,21 +272,23 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object
return HOS_FD_NOT_ENOUGH; return HOS_FD_NOT_ENOUGH;
} }
hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, }; hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, NULL, handle->cache_size, handle->cache_times, 0, };
add_hos_info(&hash_hos_info[thread_id], &info); add_hos_info(&hash_hos_info[thread_id], &info);
fd_info[thread_id][fd] = 1; fd_info[thread_id][fd] = 1;
return fd; return fd;
} }
int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id, size_t position) int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id)
{ {
struct stat buffer; struct stat buffer;
hos_info_t *hos_info = NULL; hos_info_t *hos_info = NULL;
hos_client_handle handle = NULL; hos_client_handle handle = NULL;
char num[128]; char num[128];
char buf[128]; char buf[128];
if ((fd == 0) || (stream == NULL) || (thread_id > MAX_THREAD_NUM)) int flag = 0; // 0, 一次处理就可以完成1需要多次处理才能处理完
int rest; // stream 剩余未处理的数据长度
if ((fd == 0) || (stream == NULL) || (thread_id > MAX_HOS_CLIENT_THREAD_NUM))
{ {
return HOS_PARAMETER_ERROR; return HOS_PARAMETER_ERROR;
} }
@@ -286,34 +310,56 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
request.SetBucket(hos_info->bucket); request.SetBucket(hos_info->bucket);
request.SetKey(hos_info->object); request.SetKey(hos_info->object);
//TODO APPEND MODE
snprintf(num, 128, "%lu", position);
Aws::Map<Aws::String, Aws::String> headers;
if (hos_info->mode & APPEND_MODE)
{
headers["x-hos-upload-type"] = "append";
headers["x-hos-position"] = num;
request.SetMetadata(headers);
#if 0
request.AddMetadata("x-hos-upload-type", "append");
request.AddMetadata("x-hos-position", num);
#endif
}
//设置上传数据类型 //设置上传数据类型
if (hos_info->mode & BUFF_MODE) if (hos_info->mode & BUFF_MODE)
{ {
//BUFF_MODE //BUFF_MODE
#if 1 if (hos_info->mode & APPEND_MODE)
const std::shared_ptr<Aws::IOStream> input_data = {
Aws::MakeShared<Aws::StringStream>(stream, stream + stream_len); //APPEND_MODE
Aws::String buffer (stream, stream_len); if (hos_info->cache == NULL)
*input_data << buffer; {
#else hos_info->cache = Aws::MakeShared<Aws::StringStream>("append mode");
Aws::StringStream *buffer = new Aws::StringStream(Aws::String(stream, stream_len)); }
const std::shared_ptr<Aws::IOStream> input_data(buffer); if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest))
#endif {
request.SetBody(input_data); // cache
Aws::String buffer (stream, stream_len);
hos_info->cache_rest -= stream_len;
if (hos_info->cache_rest > 0)
{
return HOS_CLIENT_OK;
}
}else if (stream_len > hos_info->cache_rest)
{
// multi handle
flag = 1;
Aws::String buffer (stream, hos_info->cache_rest);
*hos_info->cache << buffer;
rest = stream_len - hos_info->cache_rest;
}else
{
//over cache_times
Aws::String buffer (stream, stream_len);
*hos_info->cache << buffer;
}
request.SetBody(hos_info->cache);
// add headers
snprintf(num, 128, "%lu", ++hos_info->position);
Aws::Map<Aws::String, Aws::String> headers;
if (hos_info->mode & APPEND_MODE)
{
headers["x-hos-upload-type"] = "append";
headers["x-hos-position"] = num;
request.SetMetadata(headers);
}
}else
{
const std::shared_ptr<Aws::IOStream> input_data =
Aws::MakeShared<Aws::StringStream>("buffer mode");
request.SetBody(input_data);
}
} }
else else
{ {
@@ -334,6 +380,18 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
context->SetUUID(buf); context->SetUUID(buf);
S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context);
//恢复fd 的cache设置
if (hos_info->mode & APPEND_MODE)
{
hos_info->cache = NULL;
hos_info->cache_rest = hos_info->handle->cache_size;
hos_info->cache_times = hos_info->handle->cache_times;
}
while (flag == 1)
{
return hos_write(fd, &stream[hos_info->cache_rest], rest, thread_id);
}
return HOS_CLIENT_OK; return HOS_CLIENT_OK;
} }

View File

@@ -82,7 +82,7 @@ typedef void (*put_finished_callback)(bool, const char *, void *);
* size_t thread_sum 线程总数 * size_t thread_sum 线程总数
* 返回值: 成功返回一个非空句柄失败返回NULL。失败原因都是因为输入参数不合法 * 返回值: 成功返回一个非空句柄失败返回NULL。失败原因都是因为输入参数不合法
*************************************************************************************/ *************************************************************************************/
hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t thread_sum, size_t pool_size); hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t pool_size);
/************************************************************************************* /*************************************************************************************
* 函数名: hos_create_bucket * 函数名: hos_create_bucket
* 参数: hos_client_handle handle 非空句柄 * 参数: hos_client_handle handle 非空句柄
@@ -97,6 +97,24 @@ bool hos_verify_bucket(hos_client_handle handle, const char *bucket);
* 返回值: int 成功返回0S3错误返回s3errors错误码hos client错误返回hoserrors错误码 * 返回值: int 成功返回0S3错误返回s3errors错误码hos client错误返回hoserrors错误码
*************************************************************************************/ *************************************************************************************/
int hos_create_bucket(hos_client_handle handle, const char *bucket); int hos_create_bucket(hos_client_handle handle, const char *bucket);
/*************************************************************************************
* 函数名: set_cache_size
* 参数: hos_client_handle handle 非空句柄
* size_t cache_size append 模式每次追加的buffer大小
*************************************************************************************/
void set_cache_size(hos_client_handle handle, size_t cache_size);
/*************************************************************************************
* 函数名: set_cache_times
* 参数: hos_client_handle handle 非空句柄
* size_t cache_times append 模式追加次数
*************************************************************************************/
void set_cache_times(hos_client_handle handle, size_t cache_times);
/*************************************************************************************
* 函数名: set_thread_sum
* 参数: hos_client_handle handle 非空句柄
* size_t thread_sum append 模式追加次数
*************************************************************************************/
void set_cache_times(hos_client_handle handle, size_t thread_sum);
/************************************************************************************* /*************************************************************************************
* 函数名: hos_upload_async * 函数名: hos_upload_async
* 参数: hos_client_handle handle 非空句柄 * 参数: hos_client_handle handle 非空句柄
@@ -142,7 +160,7 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object
* size_t position append模式下的每段内容编号 * size_t position append模式下的每段内容编号
* 返回值 int 成功返回0失败返回hoserros错误码 * 返回值 int 成功返回0失败返回hoserros错误码
*************************************************************************************/ *************************************************************************************/
int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id, size_t position); int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id);
/************************************************************************************* /*************************************************************************************
* 函数名: hos_close_fd * 函数名: hos_close_fd
* 参数: size_t fd fd * 参数: size_t fd fd

View File

@@ -23,6 +23,10 @@ void add_hos_info(hos_info_t **handle, hos_info_t *input)
value->object = input->object; value->object = input->object;
value->callback = input->callback; value->callback = input->callback;
value->userdata = input->userdata; value->userdata = input->userdata;
value->cache = input->cache;
value->cache_times = input->cache_times;
value->cache_rest = input->cache_rest;
value->position = input->position;
} }
} }

View File

@@ -6,17 +6,24 @@
#ifndef __HOS_HASH_H__ #ifndef __HOS_HASH_H__
#define __HOS_HASH_H__ #define __HOS_HASH_H__
#include <aws/core/Aws.h>
#include "hos_client.h"
#include "uthash.h" #include "uthash.h"
typedef struct hos_info_s typedef struct hos_info_s
{ {
size_t fd; size_t fd;
int mode; int mode;
void *handle; hos_client_handle handle;
const char *bucket; const char *bucket;
const char *object; const char *object;
void *callback; void *callback;
void *userdata; void *userdata;
std::shared_ptr<Aws::IOStream> cache;
//void *cache;
size_t cache_times;
size_t cache_rest;
size_t position;
UT_hash_handle hh; UT_hash_handle hh;
}hos_info_t; }hos_info_t;