diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index fb6a20e0..c8f32e32 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.5) set(CMAKE_BUILD_TYPE Debug) -project(singleThread) +project(hos_write_complete) SET(CMAKE_BUILD_TYPE Debug) link_directories(/usr/local/lib64/) @@ -8,6 +8,6 @@ link_directories(/opt/MESA/lib/) link_libraries(hos-client-cpp) include_directories(/opt/MESA/include) -add_executable(singleThread single_thread.cpp) -target_link_libraries(singleThread hos-client-cpp) +add_executable(hos_write_complete hos_write_complete.cpp) +target_link_libraries(hos_write_complete hos-client-cpp) diff --git a/example/data/test_size.sh b/example/data/test_size.sh index 7dd3b3d1..6a3c3cf7 100755 --- a/example/data/test_size.sh +++ b/example/data/test_size.sh @@ -9,7 +9,8 @@ num=0 while((${num} < 7)) do - ./singleThread mybucket ${test_size[$num]}.data 1000 + echo ./hos_write_complete mybucket ${test_size[$num]}.data 1000 + ./hos_write_complete mybucket ${test_size[$num]}.data 1000 let "num++" done diff --git a/example/hos_upload_complete.cpp b/example/hos_upload_complete.cpp new file mode 100644 index 00000000..f47523bb --- /dev/null +++ b/example/hos_upload_complete.cpp @@ -0,0 +1,153 @@ +/************************************************************************* + > File Name: single_thread.cpp + > Author: pxz + > Created Time: Fri 11 Sep 2020 09:52:05 AM CST + ************************************************************************/ +extern "C" +{ +#include +#include +#include +#include +#include +} +#include"../src/hos_client.h" + +//#define test_times 10000 + +#define debuginfo (void) + +typedef struct userdata_s +{ + struct timespec *finished; +}userdata_t; + +static size_t calc_time(struct timespec start, struct timespec end) +{ + return (end.tv_sec * 1000 * 1000 * 1000 + end.tv_nsec - + (start.tv_sec * 1000 * 1000 * 1000 + start.tv_nsec)); +} + +int file_to_buffer(const char *file, char *buffer, size_t *len) +{ + FILE *fp = fopen(file, "r"); + int num = 0; + *len = 0; + if (fp == NULL) + { + debuginfo("fopen file failed:%s\n", file); + return -1; + } + do{ + num = fread(&buffer[*len], 1, 4096, fp); + if (num < 0) + { + return -1; + } + *len += num; + }while(num == 4096); + fclose(fp); + return 0; +} + +void callback(bool result, const char *error, void *userdata) +{ + userdata_t *data = (userdata_t *)userdata; + clock_gettime(CLOCK_MONOTONIC, data->finished); + return ; +} + +int main(int argc, char *argv[]) +{ + if (argc != 4) + { + debuginfo("usege: singThread [bucket name] [object name]\n"); + return -1; + } + struct timespec start, end, finished; + size_t time; + int i = 0; + char *bucket = argv[1]; + char *object = argv[2]; + int test_times = atoi(argv[3]); + //int test_times = 10000; + //char *buf = (char *)malloc(1024 * 1024 * 4); + char buf[1024 * 1024 * 4]; + //char buf[1024 * 4]; + size_t buf_size; + int mode = FILE_MODE; + size_t fd[10000] = {0}; + userdata_t data = {&finished}; + + file_to_buffer(object, buf, &buf_size); + + debuginfo("hos_client_init start ...\n"); + hos_client_handle handle = hos_client_create("http://192.168.44.10:9098/hos/", "default", "default", 4); + if (handle == NULL) + { + debuginfo("error:hos_client_handle\n"); + return -1; + } + debuginfo("hos_client_init success ... \n"); + + debuginfo("hos_create_bucket start ... \n"); + if(hos_create_bucket(handle, bucket)) + { + debuginfo("hos_create_bucket failed ... \n"); + return -1; + } + debuginfo("hos_create_bucket success ... \n"); + + debuginfo("hos_verify_bucket start ... \n"); + if(!hos_verify_bucket(handle, bucket)) + { + debuginfo("hos_verify_bucket failed ... \n"); + return -1; + } + debuginfo("hos_verify_bucket success ... \n"); + +#if 0 + fd = hos_open_fd(handle, bucket, object, callback, (void *)&data, 0, mode); + debuginfo("hos_upload_file start ...\n"); + clock_gettime(CLOCK_MONOTONIC, &start); + for (i = 0; i < test_times; i++) + { + hos_write(fd, object, 0, 0); + } + clock_gettime(CLOCK_MONOTONIC, &end); + time = calc_time(start, end); + time /= test_times; + printf("hos_upload_file spent %llu ns\n", time); + debuginfo("hos_upload_file end ...\n"); +#else + + mode = BUFF_MODE; + for (i = 0; i < 10000; i++) + { + fd[i] = hos_open_fd(handle, bucket, object, callback, (void *)&data, 0, mode); + } + debuginfo("hos_upload_buf start ...\n"); + clock_gettime(CLOCK_MONOTONIC, &start); + for (i = 0; i < test_times; i++) + { + hos_write(fd[i], buf, buf_size, 0); + } + clock_gettime(CLOCK_MONOTONIC, &end); + time = calc_time(start, end); + time /= test_times; + printf("hos_upload_buf spent %llu ns\n", time); + debuginfo("hos_upload_buf end ...\n"); + +#endif + debuginfo("hos_client_close start ...\n"); + if (hos_client_destory(handle) == 0) + { + time = calc_time(start, finished); + time /= test_times; + printf("hos upload finished spent %llu ns\n", time); + } + + debuginfo("hos_client_close end ...\n"); + + return 0; +} diff --git a/example/hos_write_complete.cpp b/example/hos_write_complete.cpp new file mode 100644 index 00000000..f47523bb --- /dev/null +++ b/example/hos_write_complete.cpp @@ -0,0 +1,153 @@ +/************************************************************************* + > File Name: single_thread.cpp + > Author: pxz + > Created Time: Fri 11 Sep 2020 09:52:05 AM CST + ************************************************************************/ +extern "C" +{ +#include +#include +#include +#include +#include +} +#include"../src/hos_client.h" + +//#define test_times 10000 + +#define debuginfo (void) + +typedef struct userdata_s +{ + struct timespec *finished; +}userdata_t; + +static size_t calc_time(struct timespec start, struct timespec end) +{ + return (end.tv_sec * 1000 * 1000 * 1000 + end.tv_nsec - + (start.tv_sec * 1000 * 1000 * 1000 + start.tv_nsec)); +} + +int file_to_buffer(const char *file, char *buffer, size_t *len) +{ + FILE *fp = fopen(file, "r"); + int num = 0; + *len = 0; + if (fp == NULL) + { + debuginfo("fopen file failed:%s\n", file); + return -1; + } + do{ + num = fread(&buffer[*len], 1, 4096, fp); + if (num < 0) + { + return -1; + } + *len += num; + }while(num == 4096); + fclose(fp); + return 0; +} + +void callback(bool result, const char *error, void *userdata) +{ + userdata_t *data = (userdata_t *)userdata; + clock_gettime(CLOCK_MONOTONIC, data->finished); + return ; +} + +int main(int argc, char *argv[]) +{ + if (argc != 4) + { + debuginfo("usege: singThread [bucket name] [object name]\n"); + return -1; + } + struct timespec start, end, finished; + size_t time; + int i = 0; + char *bucket = argv[1]; + char *object = argv[2]; + int test_times = atoi(argv[3]); + //int test_times = 10000; + //char *buf = (char *)malloc(1024 * 1024 * 4); + char buf[1024 * 1024 * 4]; + //char buf[1024 * 4]; + size_t buf_size; + int mode = FILE_MODE; + size_t fd[10000] = {0}; + userdata_t data = {&finished}; + + file_to_buffer(object, buf, &buf_size); + + debuginfo("hos_client_init start ...\n"); + hos_client_handle handle = hos_client_create("http://192.168.44.10:9098/hos/", "default", "default", 4); + if (handle == NULL) + { + debuginfo("error:hos_client_handle\n"); + return -1; + } + debuginfo("hos_client_init success ... \n"); + + debuginfo("hos_create_bucket start ... \n"); + if(hos_create_bucket(handle, bucket)) + { + debuginfo("hos_create_bucket failed ... \n"); + return -1; + } + debuginfo("hos_create_bucket success ... \n"); + + debuginfo("hos_verify_bucket start ... \n"); + if(!hos_verify_bucket(handle, bucket)) + { + debuginfo("hos_verify_bucket failed ... \n"); + return -1; + } + debuginfo("hos_verify_bucket success ... \n"); + +#if 0 + fd = hos_open_fd(handle, bucket, object, callback, (void *)&data, 0, mode); + debuginfo("hos_upload_file start ...\n"); + clock_gettime(CLOCK_MONOTONIC, &start); + for (i = 0; i < test_times; i++) + { + hos_write(fd, object, 0, 0); + } + clock_gettime(CLOCK_MONOTONIC, &end); + time = calc_time(start, end); + time /= test_times; + printf("hos_upload_file spent %llu ns\n", time); + debuginfo("hos_upload_file end ...\n"); +#else + + mode = BUFF_MODE; + for (i = 0; i < 10000; i++) + { + fd[i] = hos_open_fd(handle, bucket, object, callback, (void *)&data, 0, mode); + } + debuginfo("hos_upload_buf start ...\n"); + clock_gettime(CLOCK_MONOTONIC, &start); + for (i = 0; i < test_times; i++) + { + hos_write(fd[i], buf, buf_size, 0); + } + clock_gettime(CLOCK_MONOTONIC, &end); + time = calc_time(start, end); + time /= test_times; + printf("hos_upload_buf spent %llu ns\n", time); + debuginfo("hos_upload_buf end ...\n"); + +#endif + debuginfo("hos_client_close start ...\n"); + if (hos_client_destory(handle) == 0) + { + time = calc_time(start, finished); + time /= test_times; + printf("hos upload finished spent %llu ns\n", time); + } + + debuginfo("hos_client_close end ...\n"); + + return 0; +} diff --git a/example/test_size.sh b/example/test_size.sh index dc11d9a0..3884091e 100755 --- a/example/test_size.sh +++ b/example/test_size.sh @@ -6,10 +6,11 @@ #!/bin/bash test_size=("1k" "10k" "100k" "1M" "2M" "3M" "4M") num=0 +echo ${test_size[${num}]} while((${num} < 7)) do - ./singleThread mybucket ./data//${test_size[$num]}.data 1000 + ./hos_write_complete mybucket ./data/${test_size[$num]}.data 1000 let "num++" done diff --git a/example/test_times.sh b/example/test_times.sh index cc74136f..6e1adbc4 100755 --- a/example/test_times.sh +++ b/example/test_times.sh @@ -9,7 +9,7 @@ num=0 while((${num} < 5)) do - ./singleThread mybucket my-file.txt ${test_times[$num]} + ./hos_write_complete mybucket my-file.txt ${test_times[$num]} let "num++" done diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 7c19b8d6..797a075d 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -12,6 +12,7 @@ extern "C" #include #include #include +#include #include #include #include @@ -24,6 +25,7 @@ typedef struct hos_client_handle_s Aws::S3::S3Client *S3Client; size_t append_size; size_t thread_sum; + Aws::SDKOptions *options; Aws::Vector buckets; }hos_client_handle_t; @@ -60,6 +62,14 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, //put_finished_callback& callback = *(put_finished_callback *)hos_info->callback; put_finished_callback callback = (put_finished_callback)hos_info->callback; callback(result, error, hos_info->userdata); + if (hos_info->mode & APPEND_MODE) + { + //APPEND MODE 保留fd + }else + { + //完整上传 删除fd + hos_close_fd(fd, thread_id); + } } hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t thread_sum) @@ -68,6 +78,7 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi { return NULL; } + //Aws::SDKOptions *options = (Aws::SDKOptions *)malloc(sizeof(Aws::SDKOptions)); Aws::SDKOptions options; Aws::InitAPI(options); @@ -79,10 +90,16 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi config.endpointOverride = endpoint; config.verifySSL = false; config.enableEndpointDiscovery = true; + //std::shared_ptr pooled_thread = Aws::MakeShared("ClientConfigration"); + //std::shared_ptr test = std::make_shared(1000); + //config.executor(Aws::MakeShared("ClientConfiguration")); + //config.executor = std::dynamic_pointer_cast(config.executor); + config.executor = std::shared_ptr(std::make_shared(100)); handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); handle->append_size = 30 * 1024 * 1024; handle->thread_sum = thread_sum; + handle->options = &options; /* 获取当前用户的所有的buckets */ Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); @@ -305,7 +322,7 @@ int hos_close_fd(size_t fd, size_t thread_id) return HOS_PARAMETER_ERROR; } - delete_info_by_fd(hash_hos_info[thread_id], fd); + delete_info_by_fd(&hash_hos_info[thread_id], fd); return HOS_CLIENT_OK; } @@ -320,11 +337,14 @@ int hos_client_destory(hos_client_handle handle) delete handle->S3Client; + Aws::Vector().swap(handle->buckets); + Aws::ShutdownAPI(*(handle->options)); + for (i = 0; i < handle->thread_sum; i++) { - delete_all(hash_hos_info[i]); + delete_all(&hash_hos_info[i]); } - + //free(handle->options); free(handle); return HOS_CLIENT_OK; diff --git a/src/hos_hash.cpp b/src/hos_hash.cpp index 5bbc7b5b..592f041b 100644 --- a/src/hos_hash.cpp +++ b/src/hos_hash.cpp @@ -17,6 +17,8 @@ void add_hos_info(hos_info_t **handle, hos_info_t *input) } else { + value->mode = input->mode; + value->handle = input->handle; value->bucket = input->bucket; value->object = input->object; value->callback = input->callback; @@ -31,22 +33,22 @@ hos_info_t *find_info_by_fd(hos_info_t *handle, size_t fd) return value; } -void delete_info_by_fd(hos_info_t *handle, size_t fd) +void delete_info_by_fd(hos_info_t **handle, size_t fd) { hos_info_t *value = NULL; - HASH_FIND_INT(handle, &fd, value); + HASH_FIND_INT(*handle, &fd, value); if (value) { - HASH_DEL(handle, value); + HASH_DEL(*handle, value); free(value); } } -void delete_all(hos_info_t *handle) +void delete_all(hos_info_t **handle) { hos_info_t *current, *tmp; - HASH_ITER(hh, handle, current, tmp) + HASH_ITER(hh, *handle, current, tmp) { - HASH_DEL(handle, current); + HASH_DEL(*handle, current); } } diff --git a/src/hos_hash.h b/src/hos_hash.h index 66e00f47..7daf1b91 100644 --- a/src/hos_hash.h +++ b/src/hos_hash.h @@ -22,6 +22,6 @@ typedef struct hos_info_s void add_hos_info(hos_info_t **handle, hos_info_t *input); hos_info_t *find_info_by_fd(hos_info_t *handle, size_t fd); -void delete_info_by_fd(hos_info_t *handle, size_t fd); -void delete_all(hos_info_t *handle); +void delete_info_by_fd(hos_info_t **handle, size_t fd); +void delete_all(hos_info_t **handle); #endif