diff --git a/example/demo/hos_write_demo.cpp b/example/demo/hos_write_demo.cpp index 7f794e20..7d757a7c 100644 --- a/example/demo/hos_write_demo.cpp +++ b/example/demo/hos_write_demo.cpp @@ -54,6 +54,10 @@ int file_to_buffer(const char *file, char *buffer, int size) void callback(bool result, const char *bucket, const char *object, const char *errormsg, size_t errorcode, void *userdata) { + if (!result) + { + printf("error:[%d], %s\n", errorcode, errormsg); + } return ; } @@ -128,15 +132,15 @@ int main(int argc, char *argv[]) printf("hos_write buff start ...\n"); snprintf(object, 1023, "%s_write_APPEND", file_name); fd = hos_open_fd(bucket, object, callback, NULL, 0); - if (hos_write(fd, buf, buffer.st_size, 0) != HOS_CLIENT_OK) + if (hos_write(fd, buf, buffer.st_size) != HOS_CLIENT_OK) { printf("error: hos_write failed 1st!\n"); } - if (hos_write(fd, buf, buffer.st_size, 0) != HOS_CLIENT_OK) + if (hos_write(fd, buf, buffer.st_size) != HOS_CLIENT_OK) { printf("error: hos_write failed 2nd!\n"); } - hos_close_fd(fd, 0); + hos_close_fd(fd); printf("hos_write buff end ...\n"); printf("hos_shutdown_instance start ...\n"); diff --git a/example/performance/HosClientPerformance.cpp b/example/performance/HosClientPerformance.cpp index cee829f7..59366474 100644 --- a/example/performance/HosClientPerformance.cpp +++ b/example/performance/HosClientPerformance.cpp @@ -155,7 +155,7 @@ static int upload_file(char *file, char *buff, int buff_len, thread_info_t *thre { clock_gettime(CLOCK_MONOTONIC, &tstart); fd[i] = hos_open_fd(thread_info->bucket, thread_info->object, callback, NULL, thread_info->thread_num); - if (hos_write(fd[i], file, 0, thread_info->thread_num) != HOS_CLIENT_OK) + if (hos_write(fd[i], file, 0) != HOS_CLIENT_OK) { printf("error:hos_write file:%s\n", file); return -1; @@ -225,10 +225,10 @@ static int upload_buff(char * buff, int buff_len, thread_info_t *thread_info, ch rest = buff_len - tmp; if (rest <= g_append_size) { - hos_write(fd[0], &buff[tmp], rest, thread_info->thread_num); + hos_write(fd[0], &buff[tmp], rest); break; } - hos_write(fd[0], &buff[tmp], g_append_size, thread_info->thread_num); + hos_write(fd[0], &buff[tmp], g_append_size); j++; } clock_gettime(CLOCK_MONOTONIC, &ttmp); @@ -312,7 +312,7 @@ static int upload_buff(char * buff, int buff_len, thread_info_t *thread_info, ch { if (fd[i] > 2) { - hos_close_fd(fd[i], thread_info->thread_num); + hos_close_fd(fd[i]); } } diff --git a/gtest/CMakeLists.txt b/gtest/CMakeLists.txt index 3dabf565..948bdc72 100644 --- a/gtest/CMakeLists.txt +++ b/gtest/CMakeLists.txt @@ -13,7 +13,7 @@ SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fprofile-arcs -ftest-cove add_definitions(-g -W -Wall -std=c++11) #add_executable(gtest_hos_client gtest_hos_init_instance.cpp gtest_hos_get_instance.cpp gtest_hos_close_fd.cpp gtest_hos_open_fd.cpp) -#add_executable(gtest_hos_client CheckHosClient.cpp gtest_hos_write.cpp) +#add_executable(gtest_hos_client CheckHosClient.cpp gtest_hos_close_fd.cpp) add_executable(gtest_hos_client ${SRCS}) target_link_libraries(gtest_hos_client hos-client-cpp gtest gtest_main pthread) diff --git a/gtest/gtest_hos_close_fd.cpp b/gtest/gtest_hos_close_fd.cpp index bfbdc3b7..160bda59 100644 --- a/gtest/gtest_hos_close_fd.cpp +++ b/gtest/gtest_hos_close_fd.cpp @@ -77,13 +77,13 @@ TEST(hos_close_fd, normal) gtest_hos_handle_init(&expect_hos_handle, thread_num); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - size_t fd = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 0); + long fd = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 0); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info); CheckStructGHosFdContext((hos_fd_context_t *)fd, &expect_fd_info); - int ret = hos_close_fd(fd, 0); + int ret = hos_close_fd(fd); EXPECT_EQ(ret, HOS_CLIENT_OK); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -121,12 +121,14 @@ TEST(hos_close_fd, paramer_error) expect_hos_handle.hos_config.thread_num=2; CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - int fd = hos_open_fd(NULL, "object", NULL, NULL, 0); - EXPECT_EQ(fd, HOS_PARAMETER_ERROR); + long fd = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 0); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); + gtest_hos_fd_init(&expect_fd_info); + CheckStructGHosFdContext((hos_fd_context_t *)fd, &expect_fd_info); - int ret = hos_close_fd(fd, thread_num + 1); + ((hos_fd_context_t *)fd)->thread_id = thread_num + 1; + int ret = hos_close_fd(fd); EXPECT_EQ(ret, HOS_PARAMETER_ERROR); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -152,7 +154,7 @@ TEST(hos_close_fd, paramer_error) TEST(hos_close_fd, not_init_instance) { - int ret = hos_close_fd(1,1); + int ret = hos_close_fd(1); EXPECT_EQ(ret, HOS_INSTANCE_NOT_INIT); } diff --git a/gtest/gtest_hos_open_fd.cpp b/gtest/gtest_hos_open_fd.cpp index 401e2ed9..ac2ec3e6 100644 --- a/gtest/gtest_hos_open_fd.cpp +++ b/gtest/gtest_hos_open_fd.cpp @@ -89,12 +89,12 @@ TEST(hos_open_fd, normal) gtest_hos_fd_init(&expect_fd_info[1]); CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); - int ret = hos_close_fd(fd, 0); + int ret = hos_close_fd(fd); EXPECT_EQ(ret, HOS_CLIENT_OK); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - ret = hos_close_fd(fd1, 1); + ret = hos_close_fd(fd1); EXPECT_EQ(ret, HOS_CLIENT_OK); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); diff --git a/gtest/gtest_hos_write.cpp b/gtest/gtest_hos_write.cpp index 256fe014..c149c4a9 100644 --- a/gtest/gtest_hos_write.cpp +++ b/gtest/gtest_hos_write.cpp @@ -130,7 +130,7 @@ TEST(hos_write, normal) expect_fd_info[0].userdata = (void *)"object_buff"; CheckStructGHosFdContext((hos_fd_context_t *)fd, &expect_fd_info[0]); - int ret = hos_write(fd, HOS_BUFF, strlen(HOS_BUFF), 0); + int ret = hos_write(fd, HOS_BUFF, strlen(HOS_BUFF)); EXPECT_EQ(ret, HOS_CLIENT_OK); expect_fd_info[0].fd_status = 2; data_info->rx_bytes[0] += strlen(HOS_BUFF); @@ -149,7 +149,7 @@ TEST(hos_write, normal) expect_fd_info[1].userdata = (void *)"object_append"; CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); - ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF), 1); + ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->rx_bytes[1] += strlen(HOS_BUFF); data_info->rx_pkts[1] +=1; @@ -161,7 +161,7 @@ TEST(hos_write, normal) CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); EXPECT_TRUE(((hos_fd_context_t *)fd1)->cache != NULL); - ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF), 1); + ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->rx_bytes[1] += strlen(HOS_BUFF); data_info->rx_pkts[1] +=1; @@ -174,14 +174,14 @@ TEST(hos_write, normal) EXPECT_TRUE(((hos_fd_context_t *)fd1)->cache != NULL); - ret = hos_close_fd(fd, 0); + ret = hos_close_fd(fd); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->tx_bytes[0] += data_info->cache[0]; data_info->tx_pkts[0] += 1; data_info->cache[0] = 0; CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - ret = hos_close_fd(fd1, 1); + ret = hos_close_fd(fd1); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->tx_bytes[1] += data_info->cache[1]; data_info->tx_pkts[1] += 1; @@ -232,7 +232,7 @@ TEST(hos_write, bucket_not_exits) expect_fd_info[0].object = (char *)"object_buff"; CheckStructGHosFdContext((hos_fd_context_t *)fd, &expect_fd_info[0]); - int ret = hos_write(fd, HOS_BUFF, strlen(HOS_BUFF), 0); + int ret = hos_write(fd, HOS_BUFF, strlen(HOS_BUFF)); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->rx_bytes[0] = strlen(HOS_BUFF); data_info->rx_pkts[0] += 1; @@ -250,7 +250,7 @@ TEST(hos_write, bucket_not_exits) expect_fd_info[1].object = (char *)"object_append"; CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); - ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF), 1); + ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->cache[1] += strlen(HOS_BUFF); data_info->rx_bytes[1] += strlen(HOS_BUFF); @@ -262,7 +262,7 @@ TEST(hos_write, bucket_not_exits) CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); EXPECT_TRUE(((hos_fd_context_t *)fd1)->cache != NULL); - ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF), 1); + ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->cache[1] += strlen(HOS_BUFF); data_info->rx_bytes[1] += strlen(HOS_BUFF); @@ -274,14 +274,14 @@ TEST(hos_write, bucket_not_exits) CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); EXPECT_TRUE(((hos_fd_context_t *)fd1)->cache != NULL); - ret = hos_close_fd(fd, 0); + ret = hos_close_fd(fd); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->tx_failed_bytes[0] += data_info->cache[0]; data_info->tx_failed_pkts[0] += 1; data_info->cache[0] = 0; CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - ret = hos_close_fd(fd1, 1); + ret = hos_close_fd(fd1); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->tx_failed_bytes[1] += data_info->cache[1]; data_info->tx_failed_pkts[1] += 1; @@ -330,7 +330,7 @@ TEST(hos_write, sync_mode) expect_fd_info[0].object = (char *)"object_buff"; CheckStructGHosFdContext((hos_fd_context_t *)fd, &expect_fd_info[0]); - int ret = hos_write(fd, HOS_BUFF, strlen(HOS_BUFF), 0); + int ret = hos_write(fd, HOS_BUFF, strlen(HOS_BUFF)); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->rx_bytes[0] += strlen(HOS_BUFF); data_info->rx_pkts[0] += 1; @@ -345,7 +345,7 @@ TEST(hos_write, sync_mode) expect_fd_info[1].object = (char *)"object_append"; CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); - ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF), 1); + ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->rx_bytes[1] += strlen(HOS_BUFF); data_info->rx_pkts[1] += 1; @@ -357,7 +357,7 @@ TEST(hos_write, sync_mode) CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); EXPECT_TRUE(((hos_fd_context_t *)fd1)->cache != NULL); - ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF), 1); + ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->rx_bytes[1] += strlen(HOS_BUFF); data_info->rx_pkts[1] += 1; @@ -369,14 +369,14 @@ TEST(hos_write, sync_mode) CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); EXPECT_TRUE(((hos_fd_context_t *)fd1)->cache != NULL); - ret = hos_close_fd(fd, 0); + ret = hos_close_fd(fd); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->tx_bytes[0] += data_info->cache[0]; data_info->tx_pkts[0] += 1; data_info->cache[0] = 0; CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - ret = hos_close_fd(fd1, 1); + ret = hos_close_fd(fd1); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->tx_bytes[1] += data_info->cache[1]; data_info->tx_pkts[1] += 1; @@ -426,7 +426,7 @@ TEST(hos_write, sync_mode_bucket_not_exits) expect_fd_info[0].bucket = (char *)HOS_CONF; CheckStructGHosFdContext((hos_fd_context_t *)fd, &expect_fd_info[0]); - int ret = hos_write(fd, HOS_BUFF, strlen(HOS_BUFF), 0); + int ret = hos_write(fd, HOS_BUFF, strlen(HOS_BUFF)); EXPECT_EQ(ret, HOS_CLIENT_OK); expect_fd_info[0].fd_status = 2; data_info->rx_bytes[0] += strlen(HOS_BUFF); @@ -443,7 +443,7 @@ TEST(hos_write, sync_mode_bucket_not_exits) expect_fd_info[1].bucket = (char *)HOS_CONF; CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); - ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF), 1); + ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->rx_bytes[1] += strlen(HOS_BUFF); data_info->rx_pkts[1] += 1; @@ -455,7 +455,7 @@ TEST(hos_write, sync_mode_bucket_not_exits) CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); EXPECT_TRUE(((hos_fd_context_t *)fd1)->cache != NULL); - ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF), 1); + ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->rx_bytes[1] += strlen(HOS_BUFF); data_info->rx_pkts[1] += 1; @@ -467,14 +467,14 @@ TEST(hos_write, sync_mode_bucket_not_exits) CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); EXPECT_TRUE(((hos_fd_context_t *)fd1)->cache != NULL); - ret = hos_close_fd(fd, 0); + ret = hos_close_fd(fd); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->tx_failed_bytes[0] += data_info->cache[0]; data_info->tx_failed_pkts[0] += 1; data_info->cache[0] = 0; CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - ret = hos_close_fd(fd1, 1); + ret = hos_close_fd(fd1); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->tx_failed_bytes[1] += data_info->cache[1]; data_info->tx_failed_pkts[1] += 1; @@ -521,7 +521,7 @@ TEST(hos_write, paramer_error) expect_fd_info.callback = (void *)hos_callback; CheckStructGHosFdContext((hos_fd_context_t *)fd, &expect_fd_info); - int ret = hos_write(fd, NULL, strlen(HOS_BUFF), 0); + int ret = hos_write(fd, NULL, strlen(HOS_BUFF)); EXPECT_EQ(ret, HOS_PARAMETER_ERROR); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -604,7 +604,7 @@ TEST(hos_write, over_threadnums) expect_fd_info.object = (char *)"object"; CheckStructGHosFdContext((hos_fd_context_t *)fd, &expect_fd_info); - int ret = hos_write(3, HOS_BUFF, strlen(HOS_CONF), 6); + int ret = hos_write(fd, NULL, strlen(HOS_CONF)); EXPECT_EQ(ret, HOS_PARAMETER_ERROR); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -630,7 +630,7 @@ TEST(hos_write, over_threadnums) TEST(hos_write, not_init_instance) { - int ret = hos_write(3, HOS_BUFF, strlen(HOS_CONF), 0); + int ret = hos_write(3, HOS_BUFF, strlen(HOS_CONF)); EXPECT_EQ(ret, HOS_INSTANCE_NOT_ENABLE); } @@ -700,7 +700,7 @@ static void *hos_function(void *ptr) int len = 0; snprintf(path, 256, "../file/test%d.file", i%10); read_file(path, &content, &len); - ret = hos_write(fd[i], content, len, i); + ret = hos_write(fd[i], content, len); free(content); EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->cache[i] = strlen(HOS_BUFF); @@ -716,7 +716,7 @@ static void *hos_function(void *ptr) for (i = 0; i < HOS_FD_NUMS_LOCAL; i++) { - ret = hos_close_fd(fd[i], i); + ret = hos_close_fd(fd[i]); EXPECT_EQ(ret, 0); data_info->rx_bytes[i] = data_info->cache[i]; data_info->rx_pkts[i] += 1; diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 52e82f81..fcd9742b 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -917,13 +917,14 @@ long hos_open_fd(const char *bucket, const char *object, put_finished_callback c hos_fd->cache_rest = g_hos_handle.hos_config.cache_size; hos_fd->fd_status = HOS_FD_REGISTER; hos_fd->reslut = true; + hos_fd->thread_id = thread_id; MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: [%s] thread_id:%lu, fd:%lu", g_hos_instance.hos_url_prefix, thread_id, (long)&hos_fd); return (long)hos_fd; } -int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id) +int hos_write(size_t fd, const char *stream, size_t stream_len) { hos_fd_context_t *a_fd_context = NULL; char num[128]; @@ -932,12 +933,22 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id hos_config_t *hos_conf = &g_hos_handle.hos_config; hos_func_thread_t *hos_func = &g_hos_handle.hos_func; size_t upload_len = 0; + size_t thread_id = 0; if (g_hos_instance.status != INSTANCE_ENABLE_STATE) { return HOS_INSTANCE_NOT_ENABLE; } + a_fd_context = (hos_fd_context_t *)fd; + if (a_fd_context == NULL) + { + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: [%s] fd is NULL", g_hos_instance.hos_url_prefix); + return HOS_FD_IS_INVALID; + } + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s] Get fd_context", g_hos_instance.hos_url_prefix); + + thread_id = a_fd_context->thread_id; if ((stream == NULL) || (thread_id > hos_conf->thread_num)) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, @@ -946,14 +957,6 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id return HOS_PARAMETER_ERROR; } - a_fd_context = (hos_fd_context_t *)fd; - if (a_fd_context == NULL) - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: [%s] fd is NULL", g_hos_instance.hos_url_prefix); - return HOS_HASH_NOT_FIND; - } - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s] Get fd_context", g_hos_instance.hos_url_prefix); - // create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; @@ -990,7 +993,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id request.SetBody(a_fd_context->cache); // add headers - atomic_add(&(a_fd_context->position), 1); + atomic_add(&(a_fd_context->position), 100001); snprintf(num, 128, "%lu", atomic_read(&(a_fd_context->position))); Aws::Map headers; headers["x-hos-upload-type"] = "append"; @@ -1027,25 +1030,19 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id return ret; } -int hos_close_fd(size_t fd, size_t thread_id) +int hos_close_fd(size_t fd) { hos_fd_context_t *a_fd_context = NULL; char num[128]; hos_config_t *hos_conf = &g_hos_handle.hos_config; size_t upload_len = 0; + size_t thread_id = 0; if (g_hos_instance.status == INSTANCE_UNINIT_STATE) { return HOS_INSTANCE_NOT_INIT; } - if (thread_id > hos_conf->thread_num) - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_close_fd", - "error: [%s] fd:%lu, thread_id:%lu, thread_sum:%u.", - g_hos_instance.hos_url_prefix, fd, thread_id, hos_conf->thread_num); - return HOS_PARAMETER_ERROR; - } a_fd_context = (hos_fd_context_t *)fd; if (a_fd_context == NULL) { @@ -1055,6 +1052,15 @@ int hos_close_fd(size_t fd, size_t thread_id) return HOS_CLIENT_OK; } + thread_id = a_fd_context->thread_id; + if (thread_id > hos_conf->thread_num) + { + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_close_fd", + "error: [%s] fd:%lu, thread_id:%lu, thread_sum:%u.", + g_hos_instance.hos_url_prefix, fd, thread_id, hos_conf->thread_num); + return HOS_PARAMETER_ERROR; + } + //close fd 之前发送append的缓存中内容 if ((a_fd_context->mode & BUFF_MODE) && (a_fd_context->mode & APPEND_MODE)) { @@ -1067,7 +1073,7 @@ int hos_close_fd(size_t fd, size_t thread_id) request.SetBody(a_fd_context->cache); // add headers - atomic_add(&(a_fd_context->position), 1); + atomic_add(&(a_fd_context->position), 100001); snprintf(num, 128, "%lu", atomic_read(&(a_fd_context->position))); Aws::Map headers; headers["x-hos-upload-type"] = "append"; diff --git a/src/hos_client.h b/src/hos_client.h index e5b775dc..713fd51c 100644 --- a/src/hos_client.h +++ b/src/hos_client.h @@ -21,7 +21,7 @@ enum hoserrors { HOS_PARAMETER_ERROR = -1, HOS_FILE_NOT_EXIST = -2, - HOS_HASH_NOT_FIND = -3, + HOS_FD_IS_INVALID = -3, HOS_FD_NOT_ENOUGH = -4, HOS_SEND_FAILED = -5, HOS_RUNTIME_LOG_FAILED = -6, @@ -140,14 +140,14 @@ long hos_open_fd(const char *bucket, const char *object, put_finished_callback c * size_t position append模式下的每段内容编号 * 返回值 int 成功返回0,失败返回hoserror *************************************************************************************/ -int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id); +int hos_write(size_t fd, const char *stream, size_t stream_len); /************************************************************************************* * 函数名: hos_close_fd * 参数: size_t fd fd * size_t thread_id 线程ID * 返回值 int 成功返回0,失败返回hoserros错误码 *************************************************************************************/ -int hos_close_fd(size_t fd, size_t thread_id); +int hos_close_fd(size_t fd); /************************************************************************************* * 函数名: hos_shutdown_instance * 返回值 int 成功返回0,失败返回hoserros错误码 diff --git a/src/hos_common.h b/src/hos_common.h index ec4a8467..f4829919 100644 --- a/src/hos_common.h +++ b/src/hos_common.h @@ -136,6 +136,7 @@ typedef struct hos_fd_context_s bool reslut; /*PutObjectAsync result*/ const char *error; /*PutObjectAsync error message*/ size_t errorcode; + size_t thread_id; }hos_fd_context_t;