优化fd的生成及记录

This commit is contained in:
pengxuanzheng
2020-10-09 14:20:39 +08:00
parent 76477b253d
commit a6bc240ec0
2 changed files with 37 additions and 15 deletions

View File

@@ -25,20 +25,21 @@ typedef struct hos_client_handle_s
Aws::S3::S3Client *S3Client; Aws::S3::S3Client *S3Client;
size_t append_size; size_t append_size;
size_t thread_sum; size_t thread_sum;
Aws::SDKOptions *options; Aws::SDKOptions options;
Aws::Vector<Aws::S3::Model::Bucket> buckets; Aws::Vector<Aws::S3::Model::Bucket> buckets;
}hos_client_handle_t; }hos_client_handle_t;
#define MAX_THREAD_NUM 255 #define MAX_THREAD_NUM 255
#define MAX_FD_NUM 65535 #define MAX_FD_NUM 65535
hos_info_t *hash_hos_info[MAX_THREAD_NUM]; hos_info_t *hash_hos_info[MAX_THREAD_NUM];
size_t fd_info[MAX_THREAD_NUM][MAX_FD_NUM];
static size_t hash_get_min_free_fd(hos_info_t *handle) static size_t hash_get_min_free_fd(size_t thread_id)
{ {
size_t i = 0; size_t i = 0;
for (i = 1; i < MAX_FD_NUM; i++) for (i = 1; i < MAX_FD_NUM; i++)
{ {
if (!find_info_by_fd(handle, i)) if (!fd_info[thread_id][i])
return i; return i;
} }
return 0; return 0;
@@ -50,6 +51,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context) const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context)
{ {
const char *error = NULL; const char *error = NULL;
hos_info_t *hos_info = NULL;
bool result = outcome.IsSuccess(); bool result = outcome.IsSuccess();
if (!result) if (!result)
{ {
@@ -58,8 +60,14 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
const Aws::String& uuid = context->GetUUID(); const Aws::String& uuid = context->GetUUID();
size_t thread_id, fd; size_t thread_id, fd;
sscanf(uuid.c_str(), "%lu %lu", &thread_id, &fd); sscanf(uuid.c_str(), "%lu %lu", &thread_id, &fd);
hos_info_t *hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); if (fd_info[thread_id][fd])
//put_finished_callback& callback = *(put_finished_callback *)hos_info->callback; {
hos_info = find_info_by_fd(hash_hos_info[thread_id], fd);
}
if (hos_info == NULL)
{
return ;
}
put_finished_callback callback = (put_finished_callback)hos_info->callback; put_finished_callback callback = (put_finished_callback)hos_info->callback;
callback(result, error, hos_info->userdata); callback(result, error, hos_info->userdata);
if (hos_info->mode & APPEND_MODE) if (hos_info->mode & APPEND_MODE)
@@ -78,8 +86,8 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi
{ {
return NULL; return NULL;
} }
//Aws::SDKOptions *options = (Aws::SDKOptions *)malloc(sizeof(Aws::SDKOptions));
Aws::SDKOptions options; Aws::SDKOptions options;
//auto options = new Aws::SDKOptions;
Aws::InitAPI(options); Aws::InitAPI(options);
hos_client_handle handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t)); hos_client_handle handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t));
@@ -96,7 +104,7 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi
handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
handle->append_size = 30 * 1024 * 1024; handle->append_size = 30 * 1024 * 1024;
handle->thread_sum = thread_sum; handle->thread_sum = thread_sum;
handle->options = &options; handle->options = options;
/* 获取当前用户的所有的buckets */ /* 获取当前用户的所有的buckets */
Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets();
@@ -168,7 +176,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const
{ {
struct stat buffer; struct stat buffer;
char buf[128]; char buf[128];
size_t fd = hash_get_min_free_fd(hash_hos_info[thread_id]); size_t fd = hash_get_min_free_fd(thread_id);
if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (callback == NULL) || (thread_id > handle->thread_sum)) if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (callback == NULL) || (thread_id > handle->thread_sum))
{ {
@@ -211,6 +219,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const
hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, }; hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, };
add_hos_info(&hash_hos_info[thread_id], &info); add_hos_info(&hash_hos_info[thread_id], &info);
fd_info[thread_id][fd] = 1;
S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context);
return HOS_CLIENT_OK; return HOS_CLIENT_OK;
@@ -235,7 +244,7 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object
return HOS_PARAMETER_ERROR; return HOS_PARAMETER_ERROR;
} }
size_t fd = hash_get_min_free_fd(hash_hos_info[thread_id]); size_t fd = hash_get_min_free_fd(thread_id);
if (fd == 0) if (fd == 0)
{ {
return HOS_FD_NOT_ENOUGH; return HOS_FD_NOT_ENOUGH;
@@ -243,6 +252,7 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object
hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, }; hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, };
add_hos_info(&hash_hos_info[thread_id], &info); add_hos_info(&hash_hos_info[thread_id], &info);
fd_info[thread_id][fd] = 1;
return fd; return fd;
} }
@@ -259,7 +269,10 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
return HOS_PARAMETER_ERROR; return HOS_PARAMETER_ERROR;
} }
if (fd_info[thread_id][fd])
{
hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); hos_info = find_info_by_fd(hash_hos_info[thread_id], fd);
}
if (hos_info == NULL) if (hos_info == NULL)
{ {
return HOS_HASH_NOT_FIND; return HOS_HASH_NOT_FIND;
@@ -279,7 +292,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
if (hos_info->mode & APPEND_MODE) if (hos_info->mode & APPEND_MODE)
{ {
headers["x-hos-upload-type"] = "append"; headers["x-hos-upload-type"] = "append";
headers["x-hos_position"] = num; headers["x-hos-position"] = num;
request.SetMetadata(headers); request.SetMetadata(headers);
#if 0 #if 0
request.AddMetadata("x-hos-upload-type", "append"); request.AddMetadata("x-hos-upload-type", "append");
@@ -333,6 +346,7 @@ int hos_close_fd(size_t fd, size_t thread_id)
} }
delete_info_by_fd(&hash_hos_info[thread_id], fd); delete_info_by_fd(&hash_hos_info[thread_id], fd);
fd_info[thread_id][fd] = 0;
return HOS_CLIENT_OK; return HOS_CLIENT_OK;
} }
@@ -348,13 +362,13 @@ int hos_client_destory(hos_client_handle handle)
delete handle->S3Client; delete handle->S3Client;
Aws::Vector<Aws::S3::Model::Bucket>().swap(handle->buckets); Aws::Vector<Aws::S3::Model::Bucket>().swap(handle->buckets);
Aws::ShutdownAPI(*(handle->options)); Aws::ShutdownAPI((handle->options));
for (i = 0; i < handle->thread_sum; i++) for (i = 0; i < handle->thread_sum; i++)
{ {
delete_all(&hash_hos_info[i]); delete_all(&hash_hos_info[i]);
} }
//free(handle->options); //delete(handle->options);
free(handle); free(handle);
return HOS_CLIENT_OK; return HOS_CLIENT_OK;

View File

@@ -79,9 +79,10 @@ typedef void (*put_finished_callback)(bool, const char *, void *);
* 参数: const char *endpoint 目的地址如”http://192.168.44.12:9098/hos“ * 参数: const char *endpoint 目的地址如”http://192.168.44.12:9098/hos“
* const char *accesskeyid AWS access key ID如”default“ * const char *accesskeyid AWS access key ID如”default“
* const char *secretkey AWS secret key如”default“ * const char *secretkey AWS secret key如”default“
* size_t thread_sum 线程总数
* 返回值: 成功返回一个非空句柄失败返回NULL。失败原因都是因为输入参数为空 * 返回值: 成功返回一个非空句柄失败返回NULL。失败原因都是因为输入参数为空
*************************************************************************************/ *************************************************************************************/
hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t thread_id); hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t thread_sum);
/************************************************************************************* /*************************************************************************************
* 函数名: hos_create_bucket * 函数名: hos_create_bucket
* 参数: hos_client_handle handle 非空句柄 * 参数: hos_client_handle handle 非空句柄
@@ -100,8 +101,10 @@ int hos_create_bucket(hos_client_handle handle, const char *bucket);
* 函数名: hos_upload_async * 函数名: hos_upload_async
* 参数: hos_client_handle handle 非空句柄 * 参数: hos_client_handle handle 非空句柄
* const char * bucket 桶名称 * const char * bucket 桶名称
* const char * object 上传对象名称 * const char * file_path 上传对象路径
* put_finished_callback callback upload操作结束时调用的回调函数 * put_finished_callback callback upload操作结束时调用的回调函数
* void *userdata 用户自定义数据
* size_t thread_id 当前线程id
* 返回值 int 成功返回0失败返回hoserros错误码 * 返回值 int 成功返回0失败返回hoserros错误码
*************************************************************************************/ *************************************************************************************/
int hos_upload_file(hos_client_handle handle, const char *bucket, const char *file_path, put_finished_callback callback, void* userdata, size_t thread_id); int hos_upload_file(hos_client_handle handle, const char *bucket, const char *file_path, put_finished_callback callback, void* userdata, size_t thread_id);
@@ -110,7 +113,11 @@ int hos_upload_file(hos_client_handle handle, const char *bucket, const char *fi
* 参数: hos_client_handle handle 非空句柄 * 参数: hos_client_handle handle 非空句柄
* const char * bucket 桶名称 * const char * bucket 桶名称
* const char * object 上传对象名称 * const char * object 上传对象名称
* const char *buf 上传的buf
* size_t buf_len 上传的buf的长度
* put_finished_callback callback upload操作结束时调用的回调函数 * put_finished_callback callback upload操作结束时调用的回调函数
* void *userdata 用户自定义数据
* size_t thread_id 当前线程id
* 返回值 int 成功返回0失败返回hoserros错误码 * 返回值 int 成功返回0失败返回hoserros错误码
*************************************************************************************/ *************************************************************************************/
int hos_upload_buf(hos_client_handle handle, const char *bucket, const char *object, const char *buf, size_t buf_len, put_finished_callback callback, void *userdata, size_t thread_id); int hos_upload_buf(hos_client_handle handle, const char *bucket, const char *object, const char *buf, size_t buf_len, put_finished_callback callback, void *userdata, size_t thread_id);
@@ -132,6 +139,7 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object
* const char * stream 待上传的数据 * const char * stream 待上传的数据
* size_t stream 待上传的数据长度 * size_t stream 待上传的数据长度
* size_t thread_id 线程ID * size_t thread_id 线程ID
* size_t position append模式下的每段内容编号
* 返回值 int 成功返回0失败返回hoserros错误码 * 返回值 int 成功返回0失败返回hoserros错误码
*************************************************************************************/ *************************************************************************************/
int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id, size_t position); int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id, size_t position);