diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 659db6c5..f49bf9ff 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -25,20 +25,21 @@ typedef struct hos_client_handle_s Aws::S3::S3Client *S3Client; size_t append_size; size_t thread_sum; - Aws::SDKOptions *options; + Aws::SDKOptions options; Aws::Vector buckets; }hos_client_handle_t; #define MAX_THREAD_NUM 255 #define MAX_FD_NUM 65535 hos_info_t *hash_hos_info[MAX_THREAD_NUM]; +size_t fd_info[MAX_THREAD_NUM][MAX_FD_NUM]; -static size_t hash_get_min_free_fd(hos_info_t *handle) +static size_t hash_get_min_free_fd(size_t thread_id) { size_t i = 0; for (i = 1; i < MAX_FD_NUM; i++) { - if (!find_info_by_fd(handle, i)) + if (!fd_info[thread_id][i]) return i; } return 0; @@ -50,6 +51,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, const std::shared_ptr& context) { const char *error = NULL; + hos_info_t *hos_info = NULL; bool result = outcome.IsSuccess(); if (!result) { @@ -58,8 +60,14 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, const Aws::String& uuid = context->GetUUID(); size_t thread_id, fd; sscanf(uuid.c_str(), "%lu %lu", &thread_id, &fd); - hos_info_t *hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); - //put_finished_callback& callback = *(put_finished_callback *)hos_info->callback; + if (fd_info[thread_id][fd]) + { + hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); + } + if (hos_info == NULL) + { + return ; + } put_finished_callback callback = (put_finished_callback)hos_info->callback; callback(result, error, hos_info->userdata); if (hos_info->mode & APPEND_MODE) @@ -78,8 +86,8 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi { return NULL; } - //Aws::SDKOptions *options = (Aws::SDKOptions *)malloc(sizeof(Aws::SDKOptions)); Aws::SDKOptions options; + //auto options = new Aws::SDKOptions; Aws::InitAPI(options); hos_client_handle handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t)); @@ -96,7 +104,7 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); handle->append_size = 30 * 1024 * 1024; handle->thread_sum = thread_sum; - handle->options = &options; + handle->options = options; /* 获取当前用户的所有的buckets */ Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); @@ -168,7 +176,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const { struct stat buffer; char buf[128]; - size_t fd = hash_get_min_free_fd(hash_hos_info[thread_id]); + size_t fd = hash_get_min_free_fd(thread_id); if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (callback == NULL) || (thread_id > handle->thread_sum)) { @@ -211,6 +219,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, }; add_hos_info(&hash_hos_info[thread_id], &info); + fd_info[thread_id][fd] = 1; S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); return HOS_CLIENT_OK; @@ -235,7 +244,7 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object return HOS_PARAMETER_ERROR; } - size_t fd = hash_get_min_free_fd(hash_hos_info[thread_id]); + size_t fd = hash_get_min_free_fd(thread_id); if (fd == 0) { return HOS_FD_NOT_ENOUGH; @@ -243,6 +252,7 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, }; add_hos_info(&hash_hos_info[thread_id], &info); + fd_info[thread_id][fd] = 1; return fd; } @@ -259,7 +269,10 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id return HOS_PARAMETER_ERROR; } - hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); + if (fd_info[thread_id][fd]) + { + hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); + } if (hos_info == NULL) { return HOS_HASH_NOT_FIND; @@ -279,7 +292,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id if (hos_info->mode & APPEND_MODE) { headers["x-hos-upload-type"] = "append"; - headers["x-hos_position"] = num; + headers["x-hos-position"] = num; request.SetMetadata(headers); #if 0 request.AddMetadata("x-hos-upload-type", "append"); @@ -333,6 +346,7 @@ int hos_close_fd(size_t fd, size_t thread_id) } delete_info_by_fd(&hash_hos_info[thread_id], fd); + fd_info[thread_id][fd] = 0; return HOS_CLIENT_OK; } @@ -348,13 +362,13 @@ int hos_client_destory(hos_client_handle handle) delete handle->S3Client; Aws::Vector().swap(handle->buckets); - Aws::ShutdownAPI(*(handle->options)); + Aws::ShutdownAPI((handle->options)); for (i = 0; i < handle->thread_sum; i++) { delete_all(&hash_hos_info[i]); } - //free(handle->options); + //delete(handle->options); free(handle); return HOS_CLIENT_OK; diff --git a/src/hos_client.h b/src/hos_client.h index 961539ea..d1cb7db5 100644 --- a/src/hos_client.h +++ b/src/hos_client.h @@ -79,9 +79,10 @@ typedef void (*put_finished_callback)(bool, const char *, void *); * 参数: const char *endpoint 目的地址,如”http://192.168.44.12:9098/hos“ * const char *accesskeyid AWS access key ID,如”default“ * const char *secretkey AWS secret key,如”default“ + * size_t thread_sum 线程总数 * 返回值: 成功返回一个非空句柄,失败返回NULL。(失败原因都是因为输入参数为空) *************************************************************************************/ -hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t thread_id); +hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t thread_sum); /************************************************************************************* * 函数名: hos_create_bucket * 参数: hos_client_handle handle 非空句柄 @@ -100,8 +101,10 @@ int hos_create_bucket(hos_client_handle handle, const char *bucket); * 函数名: hos_upload_async * 参数: hos_client_handle handle 非空句柄 * const char * bucket 桶名称 - * const char * object 上传对象名称 + * const char * file_path 上传对象路径 * put_finished_callback callback upload操作结束时调用的回调函数 + * void *userdata 用户自定义数据 + * size_t thread_id 当前线程id * 返回值 int 成功返回0,失败返回hoserros错误码 *************************************************************************************/ int hos_upload_file(hos_client_handle handle, const char *bucket, const char *file_path, put_finished_callback callback, void* userdata, size_t thread_id); @@ -110,7 +113,11 @@ int hos_upload_file(hos_client_handle handle, const char *bucket, const char *fi * 参数: hos_client_handle handle 非空句柄 * const char * bucket 桶名称 * const char * object 上传对象名称 + * const char *buf 上传的buf + * size_t buf_len 上传的buf的长度 * put_finished_callback callback upload操作结束时调用的回调函数 + * void *userdata 用户自定义数据 + * size_t thread_id 当前线程id * 返回值 int 成功返回0,失败返回hoserros错误码 *************************************************************************************/ int hos_upload_buf(hos_client_handle handle, const char *bucket, const char *object, const char *buf, size_t buf_len, put_finished_callback callback, void *userdata, size_t thread_id); @@ -132,6 +139,7 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object * const char * stream 待上传的数据 * size_t stream 待上传的数据长度 * size_t thread_id 线程ID + * size_t position append模式下的每段内容编号 * 返回值 int 成功返回0,失败返回hoserros错误码 *************************************************************************************/ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id, size_t position);