From a5b96484152a5a6321ea373a0e550da68712aa04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=AE=A3=E6=AD=A3?= Date: Tue, 1 Dec 2020 18:24:20 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=96=E6=B6=88cache=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E4=B8=8B=EF=BC=8C=E5=A4=9A=E6=AC=A1=E5=8F=91=E9=80=81=E7=9A=84?= =?UTF-8?q?=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/hos_client.cpp | 61 ++++++++++++++-------------------------------- src/hos_hash.h | 2 +- 2 files changed, 19 insertions(+), 44 deletions(-) diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 3497e97e..18207471 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -641,7 +641,8 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object return HOS_FD_NOT_ENOUGH; } - hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, handle->cache_count, handle->cache_size, 0, 0, HOS_FD_REGISTER, 0, handle->timeout,}; + hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata, + NULL,/*cache*/ handle->cache_count, 0,/*position*/ 0,/*recive_cnt*/(long)handle->cache_size,/*cache_rest*/ HOS_FD_REGISTER,/*fd_status*/ 0,/*overtime*/ handle->timeout,}; add_hos_info(&hash_hos_info[thread_id], &info); #if 1 if (handle->fd_thread == 0) @@ -660,8 +661,6 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id hos_client_handle handle = NULL; char num[128]; char buf[128]; - int flag = 0; // 0, 一次处理就可以完成;1,需要多次处理才能处理完 - int rest; // stream 剩余未处理的数据长度 int ret = 0; data_info_t *data_info = NULL; @@ -711,51 +710,28 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id if (hos_info->cache_count == 0) { //不设置cache_count的情况下 - if (stream_len < hos_info->cache_rest) + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + hos_info->cache_rest -= stream_len; + hos_cache[thread_id] += stream_len; + if (hos_info->cache_rest > 0) { - // cache - Aws::String buffer (stream, stream_len); - *hos_info->cache << buffer; - hos_info->cache_rest -= stream_len; - if (hos_info->cache_rest > 0) - { - hos_cache[thread_id] += stream_len; - return HOS_CLIENT_OK; - } - }else if (stream_len >= hos_info->cache_rest) - { - // multi handle - flag = 1; - Aws::String buffer (stream, hos_info->cache_rest); - *hos_info->cache << buffer; - rest = stream_len - hos_info->cache_rest; + return HOS_CLIENT_OK; } }else { + // cache + Aws::String buffer (stream, stream_len); + *hos_info->cache << buffer; + hos_info->cache_rest -= stream_len; + hos_cache[thread_id] += stream_len; //设置cache times的情况下 - if ((--hos_info->cache_count) && (stream_len <= hos_info->cache_rest)) + if (--hos_info->cache_count) { - // cache - Aws::String buffer (stream, stream_len); - *hos_info->cache << buffer; - hos_info->cache_rest -= stream_len; if (hos_info->cache_rest > 0) { - hos_cache[thread_id] += stream_len; return HOS_CLIENT_OK; } - }else if (stream_len > hos_info->cache_rest) - { - // multi handle - flag = 1; - Aws::String buffer (stream, hos_info->cache_rest); - *hos_info->cache << buffer; - rest = stream_len - hos_info->cache_rest; - }else - { - //over cache_count - Aws::String buffer (stream, stream_len); - *hos_info->cache << buffer; } } request.SetBody(hos_info->cache); @@ -805,6 +781,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id //恢复fd 的cache设置 if (hos_info->mode & APPEND_MODE) { + hos_info->cache.reset(); hos_info->cache = NULL; hos_info->cache_rest = hos_info->handle->cache_size; hos_info->cache_count = hos_info->handle->cache_count; @@ -828,10 +805,6 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id data_info->tx_bytes[thread_id] += buffer.st_size; } } - while (flag == 1) - { - return hos_write(fd, &stream[hos_info->cache_rest], rest, thread_id); - } }else { return HOS_SEND_FAILED; @@ -863,7 +836,7 @@ int hos_close_fd(size_t fd, size_t thread_id) //close fd 之前发送append的缓存中内容 if ((hos_info->mode & BUFF_MODE) && (hos_info->mode & APPEND_MODE)) { - if (hos_info->cache_rest != hos_info->handle->cache_size) + if (hos_info->cache_rest != (long)hos_info->handle->cache_size) { //handle = (hos_client_handle)hos_info->handle; Aws::S3::S3Client& S3Client = *(hos_info->handle->S3Client); @@ -900,6 +873,7 @@ int hos_close_fd(size_t fd, size_t thread_id) } } hos_info->fd_status = HOS_FD_INJECT; + hos_info->cache.reset(); hos_info->overtime = get_current_ms() + hos_info->timeout; fd_info[thread_id][HOS_FD_REGISTER]--; @@ -955,6 +929,7 @@ int hos_client_destory(hos_client_handle handle) if (data_info->tx_bytes_last) free(data_info->tx_bytes_last); } + free(handle->fs2_info[i].reserved); } if (handle->fs2_info[i].line_ids) free(handle->fs2_info[i].line_ids); diff --git a/src/hos_hash.h b/src/hos_hash.h index 24dac240..3936b3ec 100644 --- a/src/hos_hash.h +++ b/src/hos_hash.h @@ -21,9 +21,9 @@ typedef struct hos_info_s void *userdata; std::shared_ptr cache; size_t cache_count; - size_t cache_rest; size_t position; size_t recive_cnt; + long cache_rest; int fd_status; #define HOS_FD_FREE 0 #define HOS_FD_REGISTER 1