From 5df452205030b36e5fe76eec69c7cd32229a5697 Mon Sep 17 00:00:00 2001 From: pengxuanzheng Date: Wed, 11 Nov 2020 11:20:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A0=86=E6=A0=88=E7=A9=BA?= =?UTF-8?q?=E9=97=B4=E4=BD=BF=E7=94=A8=E4=B8=8D=E5=BD=93=E5=AF=BC=E8=87=B4?= =?UTF-8?q?=E7=9A=84object=E4=B8=A2=E5=A4=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/hos_client.cpp | 184 +++++++++++++++++++++++++++++++++++---------- src/hos_hash.cpp | 30 +++++++- src/hos_hash.h | 11 ++- 3 files changed, 181 insertions(+), 44 deletions(-) diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 9b1fa454..27cced9a 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -30,10 +30,13 @@ typedef struct hos_client_handle_s Aws::S3::S3Client *S3Client; Aws::SDKOptions options; Aws::Vector buckets; + pthread_t fd_thread; + int fd_thread_status; /* options */ size_t cache_size; size_t cache_times; size_t thread_sum; + size_t timeout; /* expand */ screen_stat_handle_t fs2_handle; pthread_t fs2_thread; @@ -52,10 +55,18 @@ typedef struct hos_client_handle_s int *rx_bytes_last; }hos_client_handle_t; +hos_client_handle hos_handle;//一个进程只允许有一个hos_handle hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM]; size_t fd_info[MAX_HOS_CLIENT_THREAD_NUM][MAX_HOS_CLIENT_FD_NUM]; Aws::SDKOptions options; +static inline size_t get_current_ms() +{ + struct timespec timenow; + clock_gettime(CLOCK_MONOTONIC, &timenow); + return (timenow.tv_sec * 1000 + timenow.tv_nsec / 1000 / 1000 ); +} + static size_t hash_get_min_free_fd(size_t thread_id) { size_t i = 0; @@ -67,6 +78,18 @@ static size_t hash_get_min_free_fd(size_t thread_id) return 0; } +static int hos_delete_fd(size_t fd, size_t thread_id) +{ + if (fd == 0) + { + return HOS_PARAMETER_ERROR; + } + delete_info_by_fd(&hash_hos_info[thread_id], fd); + fd_info[thread_id][fd] = 0; + + return HOS_CLIENT_OK; +} + static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, const Aws::S3::Model::PutObjectRequest& request, const Aws::S3::Model::PutObjectOutcome& outcome, @@ -95,10 +118,19 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, if (hos_info->mode & APPEND_MODE) { //APPEND MODE 保留fd + hos_info->recive_cnt++; +#if 0 + if (hos_info->fd_status == HOS_FD_INJECT) + { + if (hos_info->recive_cnt == hos_info->position) + hos_delete_fd(fd, thread_id); + } +#endif }else { //完整上传 删除fd - hos_close_fd(fd, thread_id); + //hos_delete_fd(fd, thread_id); + hos_info->fd_status = HOS_FD_INJECT; } } @@ -139,9 +171,12 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi return NULL; } - - hos_client_handle handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t)); - memset(handle, 0, sizeof(hos_client_handle_t)); + if (hos_handle) + { + return hos_handle; + } + hos_handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t)); + memset(hos_handle, 0, sizeof(hos_client_handle_t)); Aws::Client::ClientConfiguration config; Aws::Auth::AWSCredentials credentials(accesskeyid, secretkey); @@ -151,21 +186,23 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi config.enableEndpointDiscovery = true; config.executor = std::shared_ptr(std::make_shared(pool_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY));//支持线程池 - handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); - handle->options = options; + hos_handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + hos_handle->options = options; /* 获取当前用户的所有的buckets */ - Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); + Aws::S3::Model::ListBucketsOutcome outcome = hos_handle->S3Client->ListBuckets(); - if (outcome.IsSuccess()) + if (!outcome.IsSuccess()) { - handle->buckets = outcome.GetResult().GetBuckets(); + return NULL; } - handle->cache_size = 0; - handle->cache_times = 1; - handle->thread_sum = 1; + hos_handle->buckets = outcome.GetResult().GetBuckets(); + hos_handle->cache_size = 0; + hos_handle->cache_times = 1; + hos_handle->thread_sum = 1; + hos_handle->timeout = 1000; - return handle; + return hos_handle; } static void *fs2_statistics(void *ptr) @@ -420,7 +457,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const sprintf(buf, "%lu %lu", thread_id, fd); context->SetUUID(buf); - hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, NULL, 0, 0, 0 }; + hos_info_t info = {fd, 0, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 }; add_hos_info(&hash_hos_info[thread_id], &info); fd_info[thread_id][fd] = 1; @@ -440,6 +477,37 @@ int hos_upload_buf(hos_client_handle handle, const char *bucket, const char *obj return hos_upload_stream(handle, bucket, object, buf, buf_len, callback, userdata, thread_id, 1); } +static void *hos_fd_manage(void *ptr) +{ + hos_info_t *hos_info; + hos_client_handle handle = (hos_client_handle)ptr; + size_t thread_sum = handle->thread_sum; + size_t thread_num; + size_t fd; + while(1) + { + if (handle->fd_thread_status) + break; + for (thread_num = 0; thread_num < thread_sum; thread_num++) + { + for(fd = 0; fd < MAX_HOS_CLIENT_FD_NUM; fd++) + { + if (!fd_info[thread_num][fd]) + break; + hos_info = find_info_by_fd(hash_hos_info[thread_num], fd); + if (!hos_info) + break; + if (hos_info->fd_status == HOS_FD_REGISTER) + continue; + if ((hos_info->position == hos_info->recive_cnt) || (hos_info->overtime <= get_current_ms())) + hos_delete_fd(fd, thread_num); + } + } + usleep(1000); + } + pthread_exit(NULL); +} + int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode) { if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (thread_id > handle->thread_sum)) @@ -453,10 +521,16 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object return HOS_FD_NOT_ENOUGH; } - hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, NULL, handle->cache_times, handle->cache_size, 0, }; + hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, handle->cache_times, handle->cache_size, 0, 0, HOS_FD_REGISTER, 0, handle->timeout,}; add_hos_info(&hash_hos_info[thread_id], &info); fd_info[thread_id][fd] = 1; - +#if 1 + if (handle->fd_thread == 0) + { + handle->fd_thread_status = 0; + pthread_create(&handle->fd_thread, NULL, hos_fd_manage, handle); + } +#endif return fd; } @@ -495,10 +569,8 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id Aws::S3::S3Client& S3Client = *(handle->S3Client); - // Create and configure the asynchronous put object request. + // create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; - request.SetBucket(hos_info->bucket); - request.SetKey(hos_info->object); //设置上传数据类型 if (hos_info->mode & BUFF_MODE) @@ -511,27 +583,53 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id { hos_info->cache = Aws::MakeShared("append mode"); } - if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest)) + if (hos_info->cache_times == 0) { - // cache - Aws::String buffer (stream, stream_len); - hos_info->cache_rest -= stream_len; - if (hos_info->cache_rest > 0) + //不设置cache_times的情况下 + if (stream_len < hos_info->cache_rest) { - return HOS_CLIENT_OK; + // cache + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + hos_info->cache_rest -= stream_len; + if (hos_info->cache_rest > 0) + { + return HOS_CLIENT_OK; + } + }else if (stream_len >= hos_info->cache_rest) + { + // multi handle + flag = 1; + Aws::String buffer (stream, hos_info->cache_rest); + *hos_info->cache << buffer; + rest = stream_len - hos_info->cache_rest; } - }else if (stream_len > hos_info->cache_rest) + }else { - // multi handle - flag = 1; - Aws::String buffer (stream, hos_info->cache_rest); - *hos_info->cache << buffer; - rest = stream_len - hos_info->cache_rest; - }else - { - //over cache_times - Aws::String buffer (stream, stream_len); - *hos_info->cache << buffer; + //设置cache times的情况下 + if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest)) + { + // cache + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + hos_info->cache_rest -= stream_len; + if (hos_info->cache_rest > 0) + { + return HOS_CLIENT_OK; + } + }else if (stream_len > hos_info->cache_rest) + { + // multi handle + flag = 1; + Aws::String buffer (stream, hos_info->cache_rest); + *hos_info->cache << buffer; + rest = stream_len - hos_info->cache_rest; + }else + { + //over cache_times + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + } } request.SetBody(hos_info->cache); @@ -565,6 +663,9 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id request.SetBody(input_data); } + request.SetBucket(hos_info->bucket); + request.SetKey(hos_info->object); + //设置回调函数 std::shared_ptr context = Aws::MakeShared(""); @@ -657,9 +758,9 @@ int hos_close_fd(size_t fd, size_t thread_id) S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); } } - - delete_info_by_fd(&hash_hos_info[thread_id], fd); - fd_info[thread_id][fd] = 0; + hos_info->fd_status = HOS_FD_INJECT; + hos_info->cache.reset(); + hos_info->overtime = get_current_ms() + hos_info->timeout; return HOS_CLIENT_OK; } @@ -676,6 +777,11 @@ int hos_client_destory(hos_client_handle handle) Aws::Vector().swap(handle->buckets); + if (handle->fd_thread) + { + handle->fd_thread_status = 1; + pthread_join(handle->fd_thread, NULL); + } for (i = 0; i < handle->thread_sum; i++) { delete_all(&hash_hos_info[i]); diff --git a/src/hos_hash.cpp b/src/hos_hash.cpp index f6bc8838..c10e9494 100644 --- a/src/hos_hash.cpp +++ b/src/hos_hash.cpp @@ -13,20 +13,28 @@ void add_hos_info(hos_info_t **handle, hos_info_t *input) { value = (hos_info_t *)malloc(sizeof(hos_info_t)); memcpy(value, input, sizeof(hos_info_t)); + value->object = (char *)malloc(strlen(input->object) + 1); + value->bucket = (char *)malloc(strlen(input->bucket) + 1); + memcpy(value->bucket, input->bucket, strlen(input->bucket) + 1); + memcpy(value->object, input->object, strlen(input->object) + 1); HASH_ADD_INT(*handle, fd, value); } else { value->mode = input->mode; value->handle = input->handle; - value->bucket = input->bucket; - value->object = input->object; + memcpy(value->bucket, input->bucket, strlen(input->bucket) + 1); + memcpy(value->object, input->object, strlen(input->object) + 1); value->callback = input->callback; value->userdata = input->userdata; value->cache = input->cache; value->cache_times = input->cache_times; value->cache_rest = input->cache_rest; value->position = input->position; + value->recive_cnt = input->recive_cnt; + value->fd_status = value->fd_status; + value->overtime = value->overtime; + value->timeout = value->timeout; } } @@ -40,9 +48,18 @@ hos_info_t *find_info_by_fd(hos_info_t *handle, size_t fd) void delete_info_by_fd(hos_info_t **handle, size_t fd) { hos_info_t *value = NULL; + HASH_FIND_INT(*handle, &fd, value); if (value) { + if (value->bucket) + { + free(value->bucket); + } + if (value->object) + { + free(value->object); + } HASH_DEL(*handle, value); free(value); } @@ -53,6 +70,15 @@ void delete_all(hos_info_t **handle) hos_info_t *current, *tmp; HASH_ITER(hh, *handle, current, tmp) { + if (current->bucket) + { + free(current->bucket); + } + if (current->object) + { + free(current->object); + } HASH_DEL(*handle, current); + free(current); } } diff --git a/src/hos_hash.h b/src/hos_hash.h index 51e2a6b7..4bdf7b0c 100644 --- a/src/hos_hash.h +++ b/src/hos_hash.h @@ -15,15 +15,20 @@ typedef struct hos_info_s size_t fd; int mode; hos_client_handle handle; - const char *bucket; - const char *object; + char *bucket; + char *object; void *callback; void *userdata; std::shared_ptr cache; - //void *cache; size_t cache_times; size_t cache_rest; size_t position; + size_t recive_cnt; + int fd_status; +#define HOS_FD_REGISTER 0 +#define HOS_FD_INJECT 1 + size_t overtime; //计算后超时的时间 + size_t timeout; //配置的超时时间,从status变成INJECT开始计时 UT_hash_handle hh; }hos_info_t;