diff --git a/cmake/Package.cmake b/cmake/Package.cmake index 5b2d61c5..dce310d7 100644 --- a/cmake/Package.cmake +++ b/cmake/Package.cmake @@ -14,8 +14,8 @@ set(CPACK_PACKAGING_INSTALL_PREFIX ${CMAKE_INSTALL_PREFIX}) set(CPACK_PACKAGE_VERSION "${VERSION_MAJOR}.${VERSION_MINOR}.${VERSION_PATCH}.${VERSION_BUILD}") execute_process(COMMAND bash -c "echo -ne \"`uname -r | awk -F'.' '{print $5\".\"$6\".\"$7}'`\"" OUTPUT_VARIABLE SYSTEM_VERSION) -execute_process(COMMAND sh changelog.sh WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/cmake) -SET(CPACK_RPM_CHANGELOG_FILE ${PROJECT_SOURCE_DIR}/cmake/changelog.txt) +#execute_process(COMMAND sh changelog.sh WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/cmake) +#SET(CPACK_RPM_CHANGELOG_FILE ${PROJECT_SOURCE_DIR}/cmake/changelog.txt) # RPM Build set(CPACK_GENERATOR "RPM") diff --git a/example/demo/hos_upload_complete.cpp b/example/demo/hos_upload_complete.cpp index c53f50e8..bb76db52 100644 --- a/example/demo/hos_upload_complete.cpp +++ b/example/demo/hos_upload_complete.cpp @@ -50,7 +50,7 @@ int file_to_buffer(const char *file, char *buffer, size_t *len) return 0; } -void callback(bool result, const char *error, void *userdata) +void callback(bool result, const char *error, const char *bucket, const char *object, void *userdata) { userdata_t *data = (userdata_t *)userdata; clock_gettime(CLOCK_MONOTONIC, data->finished); diff --git a/example/demo/hos_write_complete.cpp b/example/demo/hos_write_complete.cpp index 00a78109..e78ed694 100644 --- a/example/demo/hos_write_complete.cpp +++ b/example/demo/hos_write_complete.cpp @@ -50,7 +50,7 @@ int file_to_buffer(const char *file, char *buffer, size_t *len) return 0; } -void callback(bool result, const char *error, void *userdata) +void callback(bool result, const char *error, const char *bucket, const char *object, void *userdata) { userdata_t *data = (userdata_t *)userdata; clock_gettime(CLOCK_MONOTONIC, data->finished); @@ -80,7 +80,7 @@ int main(int argc, char *argv[]) hos_init_api(); debuginfo("hos_client_init start ...\n"); - hos_client_handle handle = hos_client_create("http://192.168.40.223:9098/hos/", "default", "default", 4000); + hos_client_handle handle = hos_client_create("http://192.168.40.223:9098/hos/", "default", "default", 400); //hos_client_handle handle = hos_client_create("http://192.168.32.10:9098/hos/", "default", "default", 4); if (handle == NULL) { diff --git a/example/performance/CMakeLists.txt b/example/performance/CMakeLists.txt index 1eea1e3a..58d2c7cf 100644 --- a/example/performance/CMakeLists.txt +++ b/example/performance/CMakeLists.txt @@ -8,6 +8,24 @@ link_directories(/opt/MESA/lib/) link_libraries(hos-client-cpp pthread) include_directories(/opt/MESA/include) +#for ASAN +set(ASAN_OPTION "OFF" CACHE STRING " set asan type chosen by the user, using OFF as default") +set_property(CACHE ASAN_OPTION PROPERTY STRINGS OFF ADDRESS THREAD) +message(STATUS "ASAN_OPTION='${ASAN_OPTION}'") + +if(ASAN_OPTION MATCHES "ADDRESS") + set(CMAKE_C_FLAGS "${CMAKADDRESS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=address -fno-omit-frame-pointer") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=address -fno-omit-frame-pointer") + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lasan") + set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lasan") +elseif(ASAN_OPTION MATCHES "THREAD") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=thread -fno-omit-frame-pointer") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=thread -fno-omit-frame-pointer") + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lasan") + set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lasan") +endif() +# end of for ASAN + add_executable(HosClientPerformance HosClientPerformance.cpp) target_link_libraries(HosClientPerformance hos-client-cpp) diff --git a/example/performance/HosClientPerformance.cpp b/example/performance/HosClientPerformance.cpp index e2aa7b23..3cd88ef8 100644 --- a/example/performance/HosClientPerformance.cpp +++ b/example/performance/HosClientPerformance.cpp @@ -102,7 +102,7 @@ int read_file_list(const char *path, char file_name[][256]) return 0; } -static void callback(bool result, const char *error, void *userdata) +static void callback(bool result, const char *error, const char *bucket, const char *object, void *userdata) { #if 0 userdata_t *data = (userdata_t *)userdata; @@ -541,6 +541,7 @@ int main(int argc, char *argv[]) hos_set_thread_sum(handle, conf.thread_sum); hos_set_cache_size(handle, conf.append_size); + hos_set_cache_times(handle, 0); //创建bucket if (hos_create_bucket(handle, conf.bucket)) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2228c01c..d4fa97b0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -12,6 +12,24 @@ add_library(${lib_name}_shared SHARED hos_client.cpp hos_hash.cpp) target_link_libraries(${lib_name}_shared libaws-cpp-sdk-s3.so libaws-cpp-sdk-core.so libMESA_field_stat2.so libcurl.so) set_target_properties(${lib_name}_shared PROPERTIES OUTPUT_NAME ${lib_name}) +#for ASAN +set(ASAN_OPTION "OFF" CACHE STRING " set asan type chosen by the user, using OFF as default") +set_property(CACHE ASAN_OPTION PROPERTY STRINGS OFF ADDRESS THREAD) +message(STATUS "ASAN_OPTION='${ASAN_OPTION}'") + +if(ASAN_OPTION MATCHES "ADDRESS") + set(CMAKE_C_FLAGS "${CMAKADDRESS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=address -fno-omit-frame-pointer") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=address -fno-omit-frame-pointer") + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lasan") + set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lasan") +elseif(ASAN_OPTION MATCHES "THREAD") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=thread -fno-omit-frame-pointer") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=thread -fno-omit-frame-pointer") + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lasan") + set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lasan") +endif() +# end of for ASAN + add_dependencies(${lib_name}_shared aws-sdk-cpp-master-static) #install(TARGETS ${lib_name}_shared LIBRARY DESTINATION /opt/MESA/lib) diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 5d251aeb..27cced9a 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -30,10 +30,13 @@ typedef struct hos_client_handle_s Aws::S3::S3Client *S3Client; Aws::SDKOptions options; Aws::Vector buckets; + pthread_t fd_thread; + int fd_thread_status; /* options */ size_t cache_size; size_t cache_times; size_t thread_sum; + size_t timeout; /* expand */ screen_stat_handle_t fs2_handle; pthread_t fs2_thread; @@ -52,10 +55,18 @@ typedef struct hos_client_handle_s int *rx_bytes_last; }hos_client_handle_t; +hos_client_handle hos_handle;//一个进程只允许有一个hos_handle hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM]; size_t fd_info[MAX_HOS_CLIENT_THREAD_NUM][MAX_HOS_CLIENT_FD_NUM]; Aws::SDKOptions options; +static inline size_t get_current_ms() +{ + struct timespec timenow; + clock_gettime(CLOCK_MONOTONIC, &timenow); + return (timenow.tv_sec * 1000 + timenow.tv_nsec / 1000 / 1000 ); +} + static size_t hash_get_min_free_fd(size_t thread_id) { size_t i = 0; @@ -67,6 +78,18 @@ static size_t hash_get_min_free_fd(size_t thread_id) return 0; } +static int hos_delete_fd(size_t fd, size_t thread_id) +{ + if (fd == 0) + { + return HOS_PARAMETER_ERROR; + } + delete_info_by_fd(&hash_hos_info[thread_id], fd); + fd_info[thread_id][fd] = 0; + + return HOS_CLIENT_OK; +} + static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, const Aws::S3::Model::PutObjectRequest& request, const Aws::S3::Model::PutObjectOutcome& outcome, @@ -91,14 +114,23 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, return ; } put_finished_callback callback = (put_finished_callback)hos_info->callback; - callback(result, error, hos_info->userdata); + callback(result, hos_info->bucket, hos_info->object, error, hos_info->userdata); if (hos_info->mode & APPEND_MODE) { //APPEND MODE 保留fd + hos_info->recive_cnt++; +#if 0 + if (hos_info->fd_status == HOS_FD_INJECT) + { + if (hos_info->recive_cnt == hos_info->position) + hos_delete_fd(fd, thread_id); + } +#endif }else { //完整上传 删除fd - hos_close_fd(fd, thread_id); + //hos_delete_fd(fd, thread_id); + hos_info->fd_status = HOS_FD_INJECT; } } @@ -139,9 +171,12 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi return NULL; } - - hos_client_handle handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t)); - memset(handle, 0, sizeof(hos_client_handle_t)); + if (hos_handle) + { + return hos_handle; + } + hos_handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t)); + memset(hos_handle, 0, sizeof(hos_client_handle_t)); Aws::Client::ClientConfiguration config; Aws::Auth::AWSCredentials credentials(accesskeyid, secretkey); @@ -151,21 +186,23 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi config.enableEndpointDiscovery = true; config.executor = std::shared_ptr(std::make_shared(pool_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY));//支持线程池 - handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); - handle->options = options; + hos_handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + hos_handle->options = options; /* 获取当前用户的所有的buckets */ - Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); + Aws::S3::Model::ListBucketsOutcome outcome = hos_handle->S3Client->ListBuckets(); - if (outcome.IsSuccess()) + if (!outcome.IsSuccess()) { - handle->buckets = outcome.GetResult().GetBuckets(); + return NULL; } - handle->cache_size = 0; - handle->cache_times = 1; - handle->thread_sum = 1; + hos_handle->buckets = outcome.GetResult().GetBuckets(); + hos_handle->cache_size = 0; + hos_handle->cache_times = 1; + hos_handle->thread_sum = 1; + hos_handle->timeout = 1000; - return handle; + return hos_handle; } static void *fs2_statistics(void *ptr) @@ -420,7 +457,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const sprintf(buf, "%lu %lu", thread_id, fd); context->SetUUID(buf); - hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, NULL, 0, 0, 0 }; + hos_info_t info = {fd, 0, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 }; add_hos_info(&hash_hos_info[thread_id], &info); fd_info[thread_id][fd] = 1; @@ -440,6 +477,37 @@ int hos_upload_buf(hos_client_handle handle, const char *bucket, const char *obj return hos_upload_stream(handle, bucket, object, buf, buf_len, callback, userdata, thread_id, 1); } +static void *hos_fd_manage(void *ptr) +{ + hos_info_t *hos_info; + hos_client_handle handle = (hos_client_handle)ptr; + size_t thread_sum = handle->thread_sum; + size_t thread_num; + size_t fd; + while(1) + { + if (handle->fd_thread_status) + break; + for (thread_num = 0; thread_num < thread_sum; thread_num++) + { + for(fd = 0; fd < MAX_HOS_CLIENT_FD_NUM; fd++) + { + if (!fd_info[thread_num][fd]) + break; + hos_info = find_info_by_fd(hash_hos_info[thread_num], fd); + if (!hos_info) + break; + if (hos_info->fd_status == HOS_FD_REGISTER) + continue; + if ((hos_info->position == hos_info->recive_cnt) || (hos_info->overtime <= get_current_ms())) + hos_delete_fd(fd, thread_num); + } + } + usleep(1000); + } + pthread_exit(NULL); +} + int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode) { if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (thread_id > handle->thread_sum)) @@ -453,10 +521,16 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object return HOS_FD_NOT_ENOUGH; } - hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, NULL, handle->cache_times, handle->cache_size, 0, }; + hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, handle->cache_times, handle->cache_size, 0, 0, HOS_FD_REGISTER, 0, handle->timeout,}; add_hos_info(&hash_hos_info[thread_id], &info); fd_info[thread_id][fd] = 1; - +#if 1 + if (handle->fd_thread == 0) + { + handle->fd_thread_status = 0; + pthread_create(&handle->fd_thread, NULL, hos_fd_manage, handle); + } +#endif return fd; } @@ -495,10 +569,8 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id Aws::S3::S3Client& S3Client = *(handle->S3Client); - // Create and configure the asynchronous put object request. + // create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; - request.SetBucket(hos_info->bucket); - request.SetKey(hos_info->object); //设置上传数据类型 if (hos_info->mode & BUFF_MODE) @@ -511,27 +583,53 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id { hos_info->cache = Aws::MakeShared("append mode"); } - if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest)) + if (hos_info->cache_times == 0) { - // cache - Aws::String buffer (stream, stream_len); - hos_info->cache_rest -= stream_len; - if (hos_info->cache_rest > 0) + //不设置cache_times的情况下 + if (stream_len < hos_info->cache_rest) { - return HOS_CLIENT_OK; + // cache + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + hos_info->cache_rest -= stream_len; + if (hos_info->cache_rest > 0) + { + return HOS_CLIENT_OK; + } + }else if (stream_len >= hos_info->cache_rest) + { + // multi handle + flag = 1; + Aws::String buffer (stream, hos_info->cache_rest); + *hos_info->cache << buffer; + rest = stream_len - hos_info->cache_rest; } - }else if (stream_len > hos_info->cache_rest) + }else { - // multi handle - flag = 1; - Aws::String buffer (stream, hos_info->cache_rest); - *hos_info->cache << buffer; - rest = stream_len - hos_info->cache_rest; - }else - { - //over cache_times - Aws::String buffer (stream, stream_len); - *hos_info->cache << buffer; + //设置cache times的情况下 + if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest)) + { + // cache + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + hos_info->cache_rest -= stream_len; + if (hos_info->cache_rest > 0) + { + return HOS_CLIENT_OK; + } + }else if (stream_len > hos_info->cache_rest) + { + // multi handle + flag = 1; + Aws::String buffer (stream, hos_info->cache_rest); + *hos_info->cache << buffer; + rest = stream_len - hos_info->cache_rest; + }else + { + //over cache_times + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + } } request.SetBody(hos_info->cache); @@ -565,6 +663,9 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id request.SetBody(input_data); } + request.SetBucket(hos_info->bucket); + request.SetKey(hos_info->object); + //设置回调函数 std::shared_ptr context = Aws::MakeShared(""); @@ -612,13 +713,54 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id int hos_close_fd(size_t fd, size_t thread_id) { + hos_info_t *hos_info; + char num[128]; + char buf[128]; if (fd == 0) { return HOS_PARAMETER_ERROR; } + if (fd_info[thread_id][fd]) + { + hos_info = find_info_by_fd(hash_hos_info[thread_id], fd); + } + if (hos_info == NULL) + { + return HOS_CLIENT_OK; + } - delete_info_by_fd(&hash_hos_info[thread_id], fd); - fd_info[thread_id][fd] = 0; + //close fd 之前发送append的缓存中内容 + if ((hos_info->mode & BUFF_MODE) && (hos_info->mode & APPEND_MODE)) + { + if (hos_info->cache_rest != hos_info->handle->cache_size) + { + //handle = (hos_client_handle)hos_info->handle; + Aws::S3::S3Client& S3Client = *(hos_info->handle->S3Client); + + // Create and configure the asynchronous put object request. + Aws::S3::Model::PutObjectRequest request; + request.SetBucket(hos_info->bucket); + request.SetKey(hos_info->object); + request.SetBody(hos_info->cache); + + // add headers + snprintf(num, 128, "%lu", ++hos_info->position); + Aws::Map headers; + headers["x-hos-upload-type"] = "append"; + headers["x-hos-position"] = num; + request.SetMetadata(headers); + + std::shared_ptr context = + Aws::MakeShared(""); + sprintf(buf, "%lu %lu", thread_id, fd); + context->SetUUID(buf); + + S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); + } + } + hos_info->fd_status = HOS_FD_INJECT; + hos_info->cache.reset(); + hos_info->overtime = get_current_ms() + hos_info->timeout; return HOS_CLIENT_OK; } @@ -635,6 +777,11 @@ int hos_client_destory(hos_client_handle handle) Aws::Vector().swap(handle->buckets); + if (handle->fd_thread) + { + handle->fd_thread_status = 1; + pthread_join(handle->fd_thread, NULL); + } for (i = 0; i < handle->thread_sum; i++) { delete_all(&hash_hos_info[i]); diff --git a/src/hos_client.h b/src/hos_client.h index 438b5be7..a53d23b5 100644 --- a/src/hos_client.h +++ b/src/hos_client.h @@ -73,7 +73,7 @@ enum s3errors OBJECT_NOT_IN_ACTIVE_TIER }; -typedef void (*put_finished_callback)(bool, const char *, void *); +typedef void (*put_finished_callback)(bool, const char *, const char *, const char *, void *); /************************************************************************************* diff --git a/src/hos_hash.cpp b/src/hos_hash.cpp index f6bc8838..c10e9494 100644 --- a/src/hos_hash.cpp +++ b/src/hos_hash.cpp @@ -13,20 +13,28 @@ void add_hos_info(hos_info_t **handle, hos_info_t *input) { value = (hos_info_t *)malloc(sizeof(hos_info_t)); memcpy(value, input, sizeof(hos_info_t)); + value->object = (char *)malloc(strlen(input->object) + 1); + value->bucket = (char *)malloc(strlen(input->bucket) + 1); + memcpy(value->bucket, input->bucket, strlen(input->bucket) + 1); + memcpy(value->object, input->object, strlen(input->object) + 1); HASH_ADD_INT(*handle, fd, value); } else { value->mode = input->mode; value->handle = input->handle; - value->bucket = input->bucket; - value->object = input->object; + memcpy(value->bucket, input->bucket, strlen(input->bucket) + 1); + memcpy(value->object, input->object, strlen(input->object) + 1); value->callback = input->callback; value->userdata = input->userdata; value->cache = input->cache; value->cache_times = input->cache_times; value->cache_rest = input->cache_rest; value->position = input->position; + value->recive_cnt = input->recive_cnt; + value->fd_status = value->fd_status; + value->overtime = value->overtime; + value->timeout = value->timeout; } } @@ -40,9 +48,18 @@ hos_info_t *find_info_by_fd(hos_info_t *handle, size_t fd) void delete_info_by_fd(hos_info_t **handle, size_t fd) { hos_info_t *value = NULL; + HASH_FIND_INT(*handle, &fd, value); if (value) { + if (value->bucket) + { + free(value->bucket); + } + if (value->object) + { + free(value->object); + } HASH_DEL(*handle, value); free(value); } @@ -53,6 +70,15 @@ void delete_all(hos_info_t **handle) hos_info_t *current, *tmp; HASH_ITER(hh, *handle, current, tmp) { + if (current->bucket) + { + free(current->bucket); + } + if (current->object) + { + free(current->object); + } HASH_DEL(*handle, current); + free(current); } } diff --git a/src/hos_hash.h b/src/hos_hash.h index 51e2a6b7..4bdf7b0c 100644 --- a/src/hos_hash.h +++ b/src/hos_hash.h @@ -15,15 +15,20 @@ typedef struct hos_info_s size_t fd; int mode; hos_client_handle handle; - const char *bucket; - const char *object; + char *bucket; + char *object; void *callback; void *userdata; std::shared_ptr cache; - //void *cache; size_t cache_times; size_t cache_rest; size_t position; + size_t recive_cnt; + int fd_status; +#define HOS_FD_REGISTER 0 +#define HOS_FD_INJECT 1 + size_t overtime; //计算后超时的时间 + size_t timeout; //配置的超时时间,从status变成INJECT开始计时 UT_hash_handle hh; }hos_info_t;