修复堆栈空间使用不当导致的object丢失

This commit is contained in:
pengxuanzheng
2020-11-11 11:20:19 +08:00
parent 7bf190553b
commit 5df4522050
3 changed files with 181 additions and 44 deletions

View File

@@ -30,10 +30,13 @@ typedef struct hos_client_handle_s
Aws::S3::S3Client *S3Client; Aws::S3::S3Client *S3Client;
Aws::SDKOptions options; Aws::SDKOptions options;
Aws::Vector<Aws::S3::Model::Bucket> buckets; Aws::Vector<Aws::S3::Model::Bucket> buckets;
pthread_t fd_thread;
int fd_thread_status;
/* options */ /* options */
size_t cache_size; size_t cache_size;
size_t cache_times; size_t cache_times;
size_t thread_sum; size_t thread_sum;
size_t timeout;
/* expand */ /* expand */
screen_stat_handle_t fs2_handle; screen_stat_handle_t fs2_handle;
pthread_t fs2_thread; pthread_t fs2_thread;
@@ -52,10 +55,18 @@ typedef struct hos_client_handle_s
int *rx_bytes_last; int *rx_bytes_last;
}hos_client_handle_t; }hos_client_handle_t;
hos_client_handle hos_handle;//一个进程只允许有一个hos_handle
hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM]; hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM];
size_t fd_info[MAX_HOS_CLIENT_THREAD_NUM][MAX_HOS_CLIENT_FD_NUM]; size_t fd_info[MAX_HOS_CLIENT_THREAD_NUM][MAX_HOS_CLIENT_FD_NUM];
Aws::SDKOptions options; Aws::SDKOptions options;
static inline size_t get_current_ms()
{
struct timespec timenow;
clock_gettime(CLOCK_MONOTONIC, &timenow);
return (timenow.tv_sec * 1000 + timenow.tv_nsec / 1000 / 1000 );
}
static size_t hash_get_min_free_fd(size_t thread_id) static size_t hash_get_min_free_fd(size_t thread_id)
{ {
size_t i = 0; size_t i = 0;
@@ -67,6 +78,18 @@ static size_t hash_get_min_free_fd(size_t thread_id)
return 0; return 0;
} }
static int hos_delete_fd(size_t fd, size_t thread_id)
{
if (fd == 0)
{
return HOS_PARAMETER_ERROR;
}
delete_info_by_fd(&hash_hos_info[thread_id], fd);
fd_info[thread_id][fd] = 0;
return HOS_CLIENT_OK;
}
static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
const Aws::S3::Model::PutObjectRequest& request, const Aws::S3::Model::PutObjectRequest& request,
const Aws::S3::Model::PutObjectOutcome& outcome, const Aws::S3::Model::PutObjectOutcome& outcome,
@@ -95,10 +118,19 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
if (hos_info->mode & APPEND_MODE) if (hos_info->mode & APPEND_MODE)
{ {
//APPEND MODE 保留fd //APPEND MODE 保留fd
hos_info->recive_cnt++;
#if 0
if (hos_info->fd_status == HOS_FD_INJECT)
{
if (hos_info->recive_cnt == hos_info->position)
hos_delete_fd(fd, thread_id);
}
#endif
}else }else
{ {
//完整上传 删除fd //完整上传 删除fd
hos_close_fd(fd, thread_id); //hos_delete_fd(fd, thread_id);
hos_info->fd_status = HOS_FD_INJECT;
} }
} }
@@ -139,9 +171,12 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi
return NULL; return NULL;
} }
if (hos_handle)
hos_client_handle handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t)); {
memset(handle, 0, sizeof(hos_client_handle_t)); return hos_handle;
}
hos_handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t));
memset(hos_handle, 0, sizeof(hos_client_handle_t));
Aws::Client::ClientConfiguration config; Aws::Client::ClientConfiguration config;
Aws::Auth::AWSCredentials credentials(accesskeyid, secretkey); Aws::Auth::AWSCredentials credentials(accesskeyid, secretkey);
@@ -151,21 +186,23 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi
config.enableEndpointDiscovery = true; config.enableEndpointDiscovery = true;
config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(pool_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY));//支持线程池 config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(pool_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY));//支持线程池
handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); hos_handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
handle->options = options; hos_handle->options = options;
/* 获取当前用户的所有的buckets */ /* 获取当前用户的所有的buckets */
Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); Aws::S3::Model::ListBucketsOutcome outcome = hos_handle->S3Client->ListBuckets();
if (outcome.IsSuccess()) if (!outcome.IsSuccess())
{ {
handle->buckets = outcome.GetResult().GetBuckets(); return NULL;
} }
handle->cache_size = 0; hos_handle->buckets = outcome.GetResult().GetBuckets();
handle->cache_times = 1; hos_handle->cache_size = 0;
handle->thread_sum = 1; hos_handle->cache_times = 1;
hos_handle->thread_sum = 1;
hos_handle->timeout = 1000;
return handle; return hos_handle;
} }
static void *fs2_statistics(void *ptr) static void *fs2_statistics(void *ptr)
@@ -420,7 +457,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const
sprintf(buf, "%lu %lu", thread_id, fd); sprintf(buf, "%lu %lu", thread_id, fd);
context->SetUUID(buf); context->SetUUID(buf);
hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, NULL, 0, 0, 0 }; hos_info_t info = {fd, 0, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 };
add_hos_info(&hash_hos_info[thread_id], &info); add_hos_info(&hash_hos_info[thread_id], &info);
fd_info[thread_id][fd] = 1; fd_info[thread_id][fd] = 1;
@@ -440,6 +477,37 @@ int hos_upload_buf(hos_client_handle handle, const char *bucket, const char *obj
return hos_upload_stream(handle, bucket, object, buf, buf_len, callback, userdata, thread_id, 1); return hos_upload_stream(handle, bucket, object, buf, buf_len, callback, userdata, thread_id, 1);
} }
static void *hos_fd_manage(void *ptr)
{
hos_info_t *hos_info;
hos_client_handle handle = (hos_client_handle)ptr;
size_t thread_sum = handle->thread_sum;
size_t thread_num;
size_t fd;
while(1)
{
if (handle->fd_thread_status)
break;
for (thread_num = 0; thread_num < thread_sum; thread_num++)
{
for(fd = 0; fd < MAX_HOS_CLIENT_FD_NUM; fd++)
{
if (!fd_info[thread_num][fd])
break;
hos_info = find_info_by_fd(hash_hos_info[thread_num], fd);
if (!hos_info)
break;
if (hos_info->fd_status == HOS_FD_REGISTER)
continue;
if ((hos_info->position == hos_info->recive_cnt) || (hos_info->overtime <= get_current_ms()))
hos_delete_fd(fd, thread_num);
}
}
usleep(1000);
}
pthread_exit(NULL);
}
int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode) int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode)
{ {
if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (thread_id > handle->thread_sum)) if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (thread_id > handle->thread_sum))
@@ -453,10 +521,16 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object
return HOS_FD_NOT_ENOUGH; return HOS_FD_NOT_ENOUGH;
} }
hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, NULL, handle->cache_times, handle->cache_size, 0, }; hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, handle->cache_times, handle->cache_size, 0, 0, HOS_FD_REGISTER, 0, handle->timeout,};
add_hos_info(&hash_hos_info[thread_id], &info); add_hos_info(&hash_hos_info[thread_id], &info);
fd_info[thread_id][fd] = 1; fd_info[thread_id][fd] = 1;
#if 1
if (handle->fd_thread == 0)
{
handle->fd_thread_status = 0;
pthread_create(&handle->fd_thread, NULL, hos_fd_manage, handle);
}
#endif
return fd; return fd;
} }
@@ -495,10 +569,8 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
Aws::S3::S3Client& S3Client = *(handle->S3Client); Aws::S3::S3Client& S3Client = *(handle->S3Client);
// Create and configure the asynchronous put object request. // create and configure the asynchronous put object request.
Aws::S3::Model::PutObjectRequest request; Aws::S3::Model::PutObjectRequest request;
request.SetBucket(hos_info->bucket);
request.SetKey(hos_info->object);
//设置上传数据类型 //设置上传数据类型
if (hos_info->mode & BUFF_MODE) if (hos_info->mode & BUFF_MODE)
@@ -511,27 +583,53 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
{ {
hos_info->cache = Aws::MakeShared<Aws::StringStream>("append mode"); hos_info->cache = Aws::MakeShared<Aws::StringStream>("append mode");
} }
if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest)) if (hos_info->cache_times == 0)
{ {
// cache //不设置cache_times的情况下
Aws::String buffer (stream, stream_len); if (stream_len < hos_info->cache_rest)
hos_info->cache_rest -= stream_len;
if (hos_info->cache_rest > 0)
{ {
return HOS_CLIENT_OK; // cache
Aws::String buffer (stream, stream_len);
*hos_info->cache << buffer;
hos_info->cache_rest -= stream_len;
if (hos_info->cache_rest > 0)
{
return HOS_CLIENT_OK;
}
}else if (stream_len >= hos_info->cache_rest)
{
// multi handle
flag = 1;
Aws::String buffer (stream, hos_info->cache_rest);
*hos_info->cache << buffer;
rest = stream_len - hos_info->cache_rest;
} }
}else if (stream_len > hos_info->cache_rest) }else
{ {
// multi handle //设置cache times的情况下
flag = 1; if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest))
Aws::String buffer (stream, hos_info->cache_rest); {
*hos_info->cache << buffer; // cache
rest = stream_len - hos_info->cache_rest; Aws::String buffer (stream, stream_len);
}else *hos_info->cache << buffer;
{ hos_info->cache_rest -= stream_len;
//over cache_times if (hos_info->cache_rest > 0)
Aws::String buffer (stream, stream_len); {
*hos_info->cache << buffer; return HOS_CLIENT_OK;
}
}else if (stream_len > hos_info->cache_rest)
{
// multi handle
flag = 1;
Aws::String buffer (stream, hos_info->cache_rest);
*hos_info->cache << buffer;
rest = stream_len - hos_info->cache_rest;
}else
{
//over cache_times
Aws::String buffer (stream, stream_len);
*hos_info->cache << buffer;
}
} }
request.SetBody(hos_info->cache); request.SetBody(hos_info->cache);
@@ -565,6 +663,9 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
request.SetBody(input_data); request.SetBody(input_data);
} }
request.SetBucket(hos_info->bucket);
request.SetKey(hos_info->object);
//设置回调函数 //设置回调函数
std::shared_ptr<Aws::Client::AsyncCallerContext> context = std::shared_ptr<Aws::Client::AsyncCallerContext> context =
Aws::MakeShared<Aws::Client::AsyncCallerContext>(""); Aws::MakeShared<Aws::Client::AsyncCallerContext>("");
@@ -657,9 +758,9 @@ int hos_close_fd(size_t fd, size_t thread_id)
S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context);
} }
} }
hos_info->fd_status = HOS_FD_INJECT;
delete_info_by_fd(&hash_hos_info[thread_id], fd); hos_info->cache.reset();
fd_info[thread_id][fd] = 0; hos_info->overtime = get_current_ms() + hos_info->timeout;
return HOS_CLIENT_OK; return HOS_CLIENT_OK;
} }
@@ -676,6 +777,11 @@ int hos_client_destory(hos_client_handle handle)
Aws::Vector<Aws::S3::Model::Bucket>().swap(handle->buckets); Aws::Vector<Aws::S3::Model::Bucket>().swap(handle->buckets);
if (handle->fd_thread)
{
handle->fd_thread_status = 1;
pthread_join(handle->fd_thread, NULL);
}
for (i = 0; i < handle->thread_sum; i++) for (i = 0; i < handle->thread_sum; i++)
{ {
delete_all(&hash_hos_info[i]); delete_all(&hash_hos_info[i]);

View File

@@ -13,20 +13,28 @@ void add_hos_info(hos_info_t **handle, hos_info_t *input)
{ {
value = (hos_info_t *)malloc(sizeof(hos_info_t)); value = (hos_info_t *)malloc(sizeof(hos_info_t));
memcpy(value, input, sizeof(hos_info_t)); memcpy(value, input, sizeof(hos_info_t));
value->object = (char *)malloc(strlen(input->object) + 1);
value->bucket = (char *)malloc(strlen(input->bucket) + 1);
memcpy(value->bucket, input->bucket, strlen(input->bucket) + 1);
memcpy(value->object, input->object, strlen(input->object) + 1);
HASH_ADD_INT(*handle, fd, value); HASH_ADD_INT(*handle, fd, value);
} }
else else
{ {
value->mode = input->mode; value->mode = input->mode;
value->handle = input->handle; value->handle = input->handle;
value->bucket = input->bucket; memcpy(value->bucket, input->bucket, strlen(input->bucket) + 1);
value->object = input->object; memcpy(value->object, input->object, strlen(input->object) + 1);
value->callback = input->callback; value->callback = input->callback;
value->userdata = input->userdata; value->userdata = input->userdata;
value->cache = input->cache; value->cache = input->cache;
value->cache_times = input->cache_times; value->cache_times = input->cache_times;
value->cache_rest = input->cache_rest; value->cache_rest = input->cache_rest;
value->position = input->position; value->position = input->position;
value->recive_cnt = input->recive_cnt;
value->fd_status = value->fd_status;
value->overtime = value->overtime;
value->timeout = value->timeout;
} }
} }
@@ -40,9 +48,18 @@ hos_info_t *find_info_by_fd(hos_info_t *handle, size_t fd)
void delete_info_by_fd(hos_info_t **handle, size_t fd) void delete_info_by_fd(hos_info_t **handle, size_t fd)
{ {
hos_info_t *value = NULL; hos_info_t *value = NULL;
HASH_FIND_INT(*handle, &fd, value); HASH_FIND_INT(*handle, &fd, value);
if (value) if (value)
{ {
if (value->bucket)
{
free(value->bucket);
}
if (value->object)
{
free(value->object);
}
HASH_DEL(*handle, value); HASH_DEL(*handle, value);
free(value); free(value);
} }
@@ -53,6 +70,15 @@ void delete_all(hos_info_t **handle)
hos_info_t *current, *tmp; hos_info_t *current, *tmp;
HASH_ITER(hh, *handle, current, tmp) HASH_ITER(hh, *handle, current, tmp)
{ {
if (current->bucket)
{
free(current->bucket);
}
if (current->object)
{
free(current->object);
}
HASH_DEL(*handle, current); HASH_DEL(*handle, current);
free(current);
} }
} }

View File

@@ -15,15 +15,20 @@ typedef struct hos_info_s
size_t fd; size_t fd;
int mode; int mode;
hos_client_handle handle; hos_client_handle handle;
const char *bucket; char *bucket;
const char *object; char *object;
void *callback; void *callback;
void *userdata; void *userdata;
std::shared_ptr<Aws::IOStream> cache; std::shared_ptr<Aws::IOStream> cache;
//void *cache;
size_t cache_times; size_t cache_times;
size_t cache_rest; size_t cache_rest;
size_t position; size_t position;
size_t recive_cnt;
int fd_status;
#define HOS_FD_REGISTER 0
#define HOS_FD_INJECT 1
size_t overtime; //计算后超时的时间
size_t timeout; //配置的超时时间从status变成INJECT开始计时
UT_hash_handle hh; UT_hash_handle hh;
}hos_info_t; }hos_info_t;