取消cache模式下,多次发送的机制

This commit is contained in:
彭宣正
2020-12-01 18:24:20 +08:00
parent 63d45c50b7
commit a5b9648415
2 changed files with 19 additions and 44 deletions

View File

@@ -641,7 +641,8 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object
return HOS_FD_NOT_ENOUGH;
}
hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, handle->cache_count, handle->cache_size, 0, 0, HOS_FD_REGISTER, 0, handle->timeout,};
hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata,
NULL,/*cache*/ handle->cache_count, 0,/*position*/ 0,/*recive_cnt*/(long)handle->cache_size,/*cache_rest*/ HOS_FD_REGISTER,/*fd_status*/ 0,/*overtime*/ handle->timeout,};
add_hos_info(&hash_hos_info[thread_id], &info);
#if 1
if (handle->fd_thread == 0)
@@ -660,8 +661,6 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
hos_client_handle handle = NULL;
char num[128];
char buf[128];
int flag = 0; // 0, 一次处理就可以完成1需要多次处理才能处理完
int rest; // stream 剩余未处理的数据长度
int ret = 0;
data_info_t *data_info = NULL;
@@ -711,51 +710,28 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
if (hos_info->cache_count == 0)
{
//不设置cache_count的情况下
if (stream_len < hos_info->cache_rest)
Aws::String buffer (stream, stream_len);
*hos_info->cache << buffer;
hos_info->cache_rest -= stream_len;
hos_cache[thread_id] += stream_len;
if (hos_info->cache_rest > 0)
{
// cache
Aws::String buffer (stream, stream_len);
*hos_info->cache << buffer;
hos_info->cache_rest -= stream_len;
if (hos_info->cache_rest > 0)
{
hos_cache[thread_id] += stream_len;
return HOS_CLIENT_OK;
}
}else if (stream_len >= hos_info->cache_rest)
{
// multi handle
flag = 1;
Aws::String buffer (stream, hos_info->cache_rest);
*hos_info->cache << buffer;
rest = stream_len - hos_info->cache_rest;
return HOS_CLIENT_OK;
}
}else
{
// cache
Aws::String buffer (stream, stream_len);
*hos_info->cache << buffer;
hos_info->cache_rest -= stream_len;
hos_cache[thread_id] += stream_len;
//设置cache times的情况下
if ((--hos_info->cache_count) && (stream_len <= hos_info->cache_rest))
if (--hos_info->cache_count)
{
// cache
Aws::String buffer (stream, stream_len);
*hos_info->cache << buffer;
hos_info->cache_rest -= stream_len;
if (hos_info->cache_rest > 0)
{
hos_cache[thread_id] += stream_len;
return HOS_CLIENT_OK;
}
}else if (stream_len > hos_info->cache_rest)
{
// multi handle
flag = 1;
Aws::String buffer (stream, hos_info->cache_rest);
*hos_info->cache << buffer;
rest = stream_len - hos_info->cache_rest;
}else
{
//over cache_count
Aws::String buffer (stream, stream_len);
*hos_info->cache << buffer;
}
}
request.SetBody(hos_info->cache);
@@ -805,6 +781,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
//恢复fd 的cache设置
if (hos_info->mode & APPEND_MODE)
{
hos_info->cache.reset();
hos_info->cache = NULL;
hos_info->cache_rest = hos_info->handle->cache_size;
hos_info->cache_count = hos_info->handle->cache_count;
@@ -828,10 +805,6 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
data_info->tx_bytes[thread_id] += buffer.st_size;
}
}
while (flag == 1)
{
return hos_write(fd, &stream[hos_info->cache_rest], rest, thread_id);
}
}else
{
return HOS_SEND_FAILED;
@@ -863,7 +836,7 @@ int hos_close_fd(size_t fd, size_t thread_id)
//close fd 之前发送append的缓存中内容
if ((hos_info->mode & BUFF_MODE) && (hos_info->mode & APPEND_MODE))
{
if (hos_info->cache_rest != hos_info->handle->cache_size)
if (hos_info->cache_rest != (long)hos_info->handle->cache_size)
{
//handle = (hos_client_handle)hos_info->handle;
Aws::S3::S3Client& S3Client = *(hos_info->handle->S3Client);
@@ -900,6 +873,7 @@ int hos_close_fd(size_t fd, size_t thread_id)
}
}
hos_info->fd_status = HOS_FD_INJECT;
hos_info->cache.reset();
hos_info->overtime = get_current_ms() + hos_info->timeout;
fd_info[thread_id][HOS_FD_REGISTER]--;
@@ -955,6 +929,7 @@ int hos_client_destory(hos_client_handle handle)
if (data_info->tx_bytes_last)
free(data_info->tx_bytes_last);
}
free(handle->fs2_info[i].reserved);
}
if (handle->fs2_info[i].line_ids)
free(handle->fs2_info[i].line_ids);