🦄 refactor(TSG-7851): 重构fd,使用内存地址作为fd

This commit is contained in:
“pengxuanzheng”
2021-09-24 10:44:30 +08:00
committed by pengxuanzheng
parent 9250031fac
commit f9bce9d590
23 changed files with 138 additions and 1432 deletions

View File

@@ -48,8 +48,6 @@ hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle
static std::mutex m_client_lock;
static std::mutex m_instance_lock;
static std::mutex m_delete_lock;
hos_fd_context_t **g_fd_context;
size_t *g_fd_info; //fd 实际从1开始,每个线程有独立的fd
static Aws::SDKOptions g_options;
static inline size_t get_current_ms()
@@ -62,7 +60,7 @@ static inline size_t get_current_ms()
static int hos_delete_fd(size_t fd, size_t thread_id)
{
std::lock_guard<std::mutex> locker(m_delete_lock);
hos_fd_context_t* context = find_context_by_fd(g_fd_context[thread_id], fd);
hos_fd_context_t* context = (hos_fd_context_t *)fd;
if (context == NULL)
{
return HOS_PARAMETER_ERROR;
@@ -83,7 +81,6 @@ static int hos_delete_fd(size_t fd, size_t thread_id)
free(context->object);
context->object = NULL;
}
HASH_DEL(g_fd_context[thread_id], context);
free(context);
return HOS_CLIENT_OK;
@@ -102,7 +99,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
size_t thread_id, fd, stream_len;
sscanf(uuid.c_str(), "%lu %lu %lu", &thread_id, &fd, &stream_len);
a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd);
a_fd_context = (hos_fd_context_t *)fd;
if (a_fd_context == NULL)
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__,
@@ -156,13 +153,13 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
{
//APPEND MODE 保留fd
atomic_add(&(a_fd_context->recive_cnt), 1);
if (a_fd_context->fd_status == HOS_FD_INJECT)
if (a_fd_context->fd_status == HOS_FD_CANCEL)
{
if (a_fd_context->position == a_fd_context->recive_cnt)
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
"debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete",
a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd);
a_fd_context->bucket, a_fd_context->object, thread_id, fd);
hos_delete_fd(fd, thread_id);
}
}
@@ -172,7 +169,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
//完整上传 删除fd
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
"debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete",
a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd);
a_fd_context->bucket, a_fd_context->object, thread_id, fd);
hos_delete_fd(fd, thread_id);
}
}
@@ -239,9 +236,6 @@ static void hos_client_create()
g_hos_handle.task_num = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
g_hos_handle.task_context = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
g_fd_context = (hos_fd_context_t **)calloc(hos_conf->thread_num, sizeof(hos_fd_context_t *));
g_fd_info = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
#if 0
if (g_hos_handle.hos_func.fd_thread == 0)
{
@@ -279,6 +273,10 @@ bool hos_verify_bucket(const char *bucket)
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","debug: bucket:%s exits", bucket);
return true;
}
else
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_verify_bucket","debug: Get bucket list:%s", new_bucket.GetName().c_str());
}
}
}
else
@@ -477,19 +475,20 @@ static void hos_expand_fs2()
return ;
}
static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t stream_len,
size_t thread_id, size_t fd, const char *bucket, const char *object)
static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t stream_len, size_t thread_id, hos_fd_context_t **fd)
{
char buf[128];
int ret = 0;
hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
data_info_t *data_info = NULL;
hos_config_t *hos_conf = &g_hos_handle.hos_config;
char *bucket = (*fd)->bucket;
char *object = (*fd)->object;
//设置回调函数
std::shared_ptr<Aws::Client::AsyncCallerContext> context =
Aws::MakeShared<Aws::Client::AsyncCallerContext>("");
sprintf(buf, "%lu %lu %lu", thread_id, fd, stream_len);
sprintf(buf, "%lu %lu %lu", thread_id, (long)*fd, stream_len);
context->SetUUID(buf);
if (hos_conf->max_request_num && hos_conf->max_request_context &&
@@ -544,11 +543,12 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t
}
}
static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t stream_len, size_t thread_id, size_t fd,
const char *bucket, const char *object)
static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t stream_len, size_t thread_id, hos_fd_context_t **fd)
{
hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
data_info_t *data_info = NULL;
char *bucket = (*fd)->bucket;
char *object = (*fd)->object;
auto& S3Client = *(g_hos_handle.S3Client);
Aws::S3::Model::PutObjectOutcome Outcome = S3Client.PutObject(request);
@@ -748,7 +748,7 @@ static int hos_upload_stream(const char *bucket, const char *object, const char
request.SetKey(object);
//设置上传数据类型
if (mode == 0)
if (mode == FILE_MODE)
{
//文件类型
const std::shared_ptr<Aws::IOStream> input_data =
@@ -776,18 +776,22 @@ static int hos_upload_stream(const char *bucket, const char *object, const char
}
//设置回调函数
size_t fd = ++g_fd_info[thread_id];
hos_fd_context_t info = {fd, 0, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 };
add_fd_context(&g_fd_context[thread_id], &info);
hos_fd_context_t *hos_fd = (hos_fd_context_t *)calloc(1, sizeof(hos_fd_context_t));
hos_fd->mode = mode;
hos_fd->bucket = (char *)malloc(strlen(bucket) + 1);
memcpy(hos_fd->bucket, bucket, strlen(bucket) + 1);
hos_fd->object = (char *)malloc(strlen(object) + 1);
memcpy(hos_fd->object, object, strlen(object) + 1);
hos_fd->callback = (void *)callback;
hos_fd->userdata = userdata;
if (hos_conf->pool_thread_size > 0)
{
ret = hos_putobject_async(request, data_len, thread_id, fd, bucket, object);
ret = hos_putobject_async(request, data_len, thread_id, &hos_fd);
}
else
{
ret = hos_putobject_sync(request, data_len, thread_id, fd, bucket, object);
ret = hos_putobject_sync(request, data_len, thread_id, &hos_fd);
}
return ret;
@@ -835,7 +839,7 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size
return hos_upload_stream(bucket, object, buf, buf_len, callback, userdata, thread_id);
}
int hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id)
long hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id)
{
if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL)
{
@@ -849,15 +853,22 @@ int hos_open_fd(const char *bucket, const char *object, put_finished_callback ca
return HOS_PARAMETER_ERROR;
}
size_t fd = ++g_fd_info[thread_id];
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: thread_id:%lu, fd:%lu", thread_id, fd);
hos_fd_context_t info = {fd, BUFF_MODE | APPEND_MODE, (char *)bucket, (char *)object, (void *)callback, userdata,
NULL,/*cache*/ g_hos_handle.hos_config.cache_count, 0,/*position*/ 0,/*recive_cnt*/
(long)g_hos_handle.hos_config.cache_size,/*cache_rest*/ HOS_FD_REGISTER,/*fd_status*/};
add_fd_context(&g_fd_context[thread_id], &info);
hos_fd_context_t *hos_fd = (hos_fd_context_t *)calloc(1, sizeof(hos_fd_context_t));
hos_fd->mode = BUFF_MODE | APPEND_MODE;
hos_fd->bucket = (char *)malloc(strlen(bucket) + 1);
memcpy(hos_fd->bucket, bucket, strlen(bucket) + 1);
hos_fd->object = (char *)malloc(strlen(object) + 1);
memcpy(hos_fd->object, object, strlen(object) + 1);
hos_fd->callback = (void *)callback;
hos_fd->userdata = userdata;
hos_fd->cache_count = g_hos_handle.hos_config.cache_count;
hos_fd->cache_rest = g_hos_handle.hos_config.cache_size;
hos_fd->fd_status = HOS_FD_REGISTER;
hos_fd->reslut = true;
return fd;
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: thread_id:%lu, fd:%lu", thread_id, (long)&hos_fd);
return (long)hos_fd;
}
int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id)
@@ -883,7 +894,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
return HOS_PARAMETER_ERROR;
}
a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd);
a_fd_context = (hos_fd_context_t *)fd;
if (a_fd_context == NULL)
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: fd info not find. thread_id:%lu, fd:%lu", thread_id, fd);
@@ -943,11 +954,11 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
if (hos_conf->pool_thread_size > 0)
{
ret = hos_putobject_async(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object);
ret = hos_putobject_async(request, upload_len, thread_id, &a_fd_context);
}
else
{
ret = hos_putobject_sync(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object);
ret = hos_putobject_sync(request, upload_len, thread_id, &a_fd_context);
}
//恢复fd 的cache设置
@@ -983,7 +994,7 @@ int hos_close_fd(size_t fd, size_t thread_id)
fd, thread_id, hos_conf->thread_num);
return HOS_PARAMETER_ERROR;
}
a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd);
a_fd_context = (hos_fd_context_t *)fd;
if (a_fd_context == NULL)
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG,
@@ -1016,18 +1027,18 @@ int hos_close_fd(size_t fd, size_t thread_id)
if (hos_conf->pool_thread_size > 0)
{
hos_putobject_async(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object);
hos_putobject_async(request, upload_len, thread_id, &a_fd_context);
}
else
{
hos_putobject_sync(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object);
hos_putobject_sync(request, upload_len, thread_id, &a_fd_context);
}
data_info_t *data_info = (data_info_t *)(g_hos_handle.hos_func.fs2_info.reserved);
if (data_info)
data_info->cache[thread_id] = 0;
}
}
a_fd_context->fd_status = HOS_FD_INJECT;
a_fd_context->fd_status = HOS_FD_CANCEL;
a_fd_context->cache.reset();
a_fd_context->cache = NULL;
a_fd_context->cache_rest = hos_conf->cache_size;
@@ -1046,7 +1057,7 @@ int hos_close_fd(size_t fd, size_t thread_id)
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
"debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete",
a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd);
a_fd_context->bucket, a_fd_context->object, thread_id, fd);
hos_delete_fd(fd, thread_id);
}
}
@@ -1058,7 +1069,6 @@ int hos_shutdown_instance()
{
std::lock_guard<std::mutex> locker(m_instance_lock);
size_t i = 0;
hos_config_t *hos_conf = &g_hos_handle.hos_config;
hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
size_t task_num = 0;
@@ -1153,23 +1163,6 @@ int hos_shutdown_instance()
}
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: delete s3client.");
if (g_fd_info)
{
free(g_fd_info);
g_fd_info = NULL;
}
for (i = 0; i < hos_conf->thread_num; i++)
{
delete_all(&g_fd_context[i]);
}
if (g_fd_context)
{
free(g_fd_context);
g_fd_context = NULL;
}
Aws::ShutdownAPI(g_options);
MESA_destroy_runtime_log_handle(g_hos_handle.log);
g_hos_handle.log = NULL;