🦄 refactor(TSG-7599): fd 保存thread_id

This commit is contained in:
pengxuanzheng
2021-10-13 18:46:17 +08:00
parent 20bc7e176c
commit 0210e679f4
9 changed files with 76 additions and 63 deletions

View File

@@ -917,13 +917,14 @@ long hos_open_fd(const char *bucket, const char *object, put_finished_callback c
hos_fd->cache_rest = g_hos_handle.hos_config.cache_size;
hos_fd->fd_status = HOS_FD_REGISTER;
hos_fd->reslut = true;
hos_fd->thread_id = thread_id;
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: [%s] thread_id:%lu, fd:%lu", g_hos_instance.hos_url_prefix, thread_id, (long)&hos_fd);
return (long)hos_fd;
}
int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id)
int hos_write(size_t fd, const char *stream, size_t stream_len)
{
hos_fd_context_t *a_fd_context = NULL;
char num[128];
@@ -932,12 +933,22 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
hos_config_t *hos_conf = &g_hos_handle.hos_config;
hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
size_t upload_len = 0;
size_t thread_id = 0;
if (g_hos_instance.status != INSTANCE_ENABLE_STATE)
{
return HOS_INSTANCE_NOT_ENABLE;
}
a_fd_context = (hos_fd_context_t *)fd;
if (a_fd_context == NULL)
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: [%s] fd is NULL", g_hos_instance.hos_url_prefix);
return HOS_FD_IS_INVALID;
}
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s] Get fd_context", g_hos_instance.hos_url_prefix);
thread_id = a_fd_context->thread_id;
if ((stream == NULL) || (thread_id > hos_conf->thread_num))
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL,
@@ -946,14 +957,6 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
return HOS_PARAMETER_ERROR;
}
a_fd_context = (hos_fd_context_t *)fd;
if (a_fd_context == NULL)
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: [%s] fd is NULL", g_hos_instance.hos_url_prefix);
return HOS_HASH_NOT_FIND;
}
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s] Get fd_context", g_hos_instance.hos_url_prefix);
// create and configure the asynchronous put object request.
Aws::S3::Model::PutObjectRequest request;
@@ -990,7 +993,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
request.SetBody(a_fd_context->cache);
// add headers
atomic_add(&(a_fd_context->position), 1);
atomic_add(&(a_fd_context->position), 100001);
snprintf(num, 128, "%lu", atomic_read(&(a_fd_context->position)));
Aws::Map<Aws::String, Aws::String> headers;
headers["x-hos-upload-type"] = "append";
@@ -1027,25 +1030,19 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
return ret;
}
int hos_close_fd(size_t fd, size_t thread_id)
int hos_close_fd(size_t fd)
{
hos_fd_context_t *a_fd_context = NULL;
char num[128];
hos_config_t *hos_conf = &g_hos_handle.hos_config;
size_t upload_len = 0;
size_t thread_id = 0;
if (g_hos_instance.status == INSTANCE_UNINIT_STATE)
{
return HOS_INSTANCE_NOT_INIT;
}
if (thread_id > hos_conf->thread_num)
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_close_fd",
"error: [%s] fd:%lu, thread_id:%lu, thread_sum:%u.",
g_hos_instance.hos_url_prefix, fd, thread_id, hos_conf->thread_num);
return HOS_PARAMETER_ERROR;
}
a_fd_context = (hos_fd_context_t *)fd;
if (a_fd_context == NULL)
{
@@ -1055,6 +1052,15 @@ int hos_close_fd(size_t fd, size_t thread_id)
return HOS_CLIENT_OK;
}
thread_id = a_fd_context->thread_id;
if (thread_id > hos_conf->thread_num)
{
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_close_fd",
"error: [%s] fd:%lu, thread_id:%lu, thread_sum:%u.",
g_hos_instance.hos_url_prefix, fd, thread_id, hos_conf->thread_num);
return HOS_PARAMETER_ERROR;
}
//close fd 之前发送append的缓存中内容
if ((a_fd_context->mode & BUFF_MODE) && (a_fd_context->mode & APPEND_MODE))
{
@@ -1067,7 +1073,7 @@ int hos_close_fd(size_t fd, size_t thread_id)
request.SetBody(a_fd_context->cache);
// add headers
atomic_add(&(a_fd_context->position), 1);
atomic_add(&(a_fd_context->position), 100001);
snprintf(num, 128, "%lu", atomic_read(&(a_fd_context->position)));
Aws::Map<Aws::String, Aws::String> headers;
headers["x-hos-upload-type"] = "append";