diff --git a/gtest/gtest_hos_write.cpp b/gtest/gtest_hos_write.cpp index 574bb004..7ebacd54 100644 --- a/gtest/gtest_hos_write.cpp +++ b/gtest/gtest_hos_write.cpp @@ -137,7 +137,7 @@ TEST(hos_write, normal) CheckStructGHosFdContext((hos_fd_context_t *)fd, &expect_fd_info[0]); int ret = hos_write(fd, HOS_BUFF, strlen(HOS_BUFF)); - ASSERT_EQ(ret, HOS_CLIENT_OK); + ASSERT_EQ(ret, HOS_IN_CACHE); expect_fd_info[0].fd_status = 2; data_info->rx_bytes[0] += strlen(HOS_BUFF); data_info->rx_pkts[0] +=1; @@ -157,7 +157,7 @@ TEST(hos_write, normal) CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); - ASSERT_EQ(ret, HOS_CLIENT_OK); + ASSERT_EQ(ret, HOS_IN_CACHE); data_info->rx_bytes[1] += strlen(HOS_BUFF); data_info->rx_pkts[1] +=1; data_info->cache[1] += strlen(HOS_BUFF); @@ -169,7 +169,7 @@ TEST(hos_write, normal) ASSERT_TRUE(((hos_fd_context_t *)fd1)->cache != NULL); ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); - ASSERT_EQ(ret, HOS_CLIENT_OK); + ASSERT_EQ(ret, HOS_IN_CACHE); data_info->rx_bytes[1] += strlen(HOS_BUFF); data_info->rx_pkts[1] +=1; data_info->cache[1] += strlen(HOS_BUFF); @@ -241,7 +241,7 @@ TEST(hos_write, bucket_not_exits) CheckStructGHosFdContext((hos_fd_context_t *)fd, &expect_fd_info[0]); int ret = hos_write(fd, HOS_BUFF, strlen(HOS_BUFF)); - ASSERT_EQ(ret, HOS_CLIENT_OK); + ASSERT_EQ(ret, HOS_IN_CACHE); data_info->rx_bytes[0] = strlen(HOS_BUFF); data_info->rx_pkts[0] += 1; data_info->cache[0] = strlen(HOS_BUFF); @@ -260,7 +260,7 @@ TEST(hos_write, bucket_not_exits) CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); - ASSERT_EQ(ret, HOS_CLIENT_OK); + ASSERT_EQ(ret, HOS_IN_CACHE); data_info->cache[1] += strlen(HOS_BUFF); data_info->rx_bytes[1] += strlen(HOS_BUFF); data_info->rx_pkts[1] += 1; @@ -272,7 +272,7 @@ TEST(hos_write, bucket_not_exits) ASSERT_TRUE(((hos_fd_context_t *)fd1)->cache != NULL); ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); - ASSERT_EQ(ret, HOS_CLIENT_OK); + ASSERT_EQ(ret, HOS_IN_CACHE); data_info->cache[1] += strlen(HOS_BUFF); data_info->rx_bytes[1] += strlen(HOS_BUFF); data_info->rx_pkts[1] += 1; @@ -341,7 +341,7 @@ TEST(hos_write, sync_mode) CheckStructGHosFdContext((hos_fd_context_t *)fd, &expect_fd_info[0]); int ret = hos_write(fd, HOS_BUFF, strlen(HOS_BUFF)); - ASSERT_EQ(ret, HOS_CLIENT_OK); + ASSERT_EQ(ret, HOS_IN_CACHE); data_info->rx_bytes[0] += strlen(HOS_BUFF); data_info->rx_pkts[0] += 1; data_info->cache[0] += strlen(HOS_BUFF); @@ -357,7 +357,7 @@ TEST(hos_write, sync_mode) CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); - ASSERT_EQ(ret, HOS_CLIENT_OK); + ASSERT_EQ(ret, HOS_IN_CACHE); data_info->rx_bytes[1] += strlen(HOS_BUFF); data_info->rx_pkts[1] += 1; data_info->cache[1] += strlen(HOS_BUFF); @@ -369,7 +369,7 @@ TEST(hos_write, sync_mode) ASSERT_TRUE(((hos_fd_context_t *)fd1)->cache != NULL); ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); - ASSERT_EQ(ret, HOS_CLIENT_OK); + ASSERT_EQ(ret, HOS_IN_CACHE); data_info->rx_bytes[1] += strlen(HOS_BUFF); data_info->rx_pkts[1] += 1; data_info->cache[1] += strlen(HOS_BUFF); @@ -439,7 +439,7 @@ TEST(hos_write, sync_mode_bucket_not_exits) CheckStructGHosFdContext((hos_fd_context_t *)fd, &expect_fd_info[0]); int ret = hos_write(fd, HOS_BUFF, strlen(HOS_BUFF)); - ASSERT_EQ(ret, HOS_CLIENT_OK); + ASSERT_EQ(ret, HOS_IN_CACHE); expect_fd_info[0].fd_status = 2; data_info->rx_bytes[0] += strlen(HOS_BUFF); data_info->rx_pkts[0] += 1; @@ -457,7 +457,7 @@ TEST(hos_write, sync_mode_bucket_not_exits) CheckStructGHosFdContext((hos_fd_context_t *)fd1, &expect_fd_info[1]); ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); - ASSERT_EQ(ret, HOS_CLIENT_OK); + ASSERT_EQ(ret, HOS_IN_CACHE); data_info->rx_bytes[1] += strlen(HOS_BUFF); data_info->rx_pkts[1] += 1; data_info->cache[1] += strlen(HOS_BUFF); @@ -469,7 +469,7 @@ TEST(hos_write, sync_mode_bucket_not_exits) ASSERT_TRUE(((hos_fd_context_t *)fd1)->cache != NULL); ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF)); - ASSERT_EQ(ret, HOS_CLIENT_OK); + ASSERT_EQ(ret, HOS_IN_CACHE); data_info->rx_bytes[1] += strlen(HOS_BUFF); data_info->rx_pkts[1] += 1; data_info->cache[1] += strlen(HOS_BUFF); diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 0d350d43..81989cba 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -137,14 +137,14 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, data_info->tx_pkts[thread_id]++; data_info->tx_bytes[thread_id] += stream_len; MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: [%s/%s/%s] upload success. tx_pkts:%lu, tx_bytes:%lu", + "debug: [%s/%s/%s] upload success. tx_pkts:%zu, tx_bytes:%zu", g_hos_instance.hos_url_prefix, a_fd_context->bucket, a_fd_context->object, data_info->tx_pkts[thread_id], data_info->tx_bytes[thread_id]); } else { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: [%s/%s/%s] upload success. stream size:%lu", g_hos_instance.hos_url_prefix, a_fd_context->bucket, a_fd_context->object, stream_len); + "debug: [%s/%s/%s] upload success. stream size:%zu", g_hos_instance.hos_url_prefix, a_fd_context->bucket, a_fd_context->object, stream_len); } a_fd_context->error = NULL; a_fd_context->errorcode = 0; @@ -160,7 +160,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, if (a_fd_context->position == a_fd_context->recive_cnt) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: [%s/%s/%s] upload completed. [thread:%lu fd:%lu] delete", + "debug: [%s/%s/%s] upload completed. [thread:%zu fd:%zu] delete", g_hos_instance.hos_url_prefix, a_fd_context->bucket, a_fd_context->object, thread_id, fd); hos_delete_fd(fd, thread_id); } @@ -170,7 +170,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, { //完整上传 删除fd MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: [%s/%s/%s] upload completed. [thread:%lu fd:%lu] delete", + "debug: [%s/%s/%s] upload completed. [thread:%zu fd:%zu] delete", g_hos_instance.hos_url_prefix, a_fd_context->bucket, a_fd_context->object, thread_id, fd); hos_delete_fd(fd, thread_id); } @@ -597,7 +597,7 @@ static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t data_info->tx_pkts[thread_id]++; data_info->tx_bytes[thread_id] += stream_len; MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: [%s/%s/%s] PutObject success. tx_pkts:%lu, tx_bytes:%lu", + "debug: [%s/%s/%s] PutObject success. tx_pkts:%zu, tx_bytes:%zu", g_hos_instance.hos_url_prefix, bucket, object, data_info->tx_pkts[thread_id], data_info->tx_bytes[thread_id]); } else @@ -721,7 +721,7 @@ int hos_create_bucket(const char *bucket) if (bucket == NULL) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_create_bucket", - "error: [%s] bucket:%s", g_hos_instance.hos_url_prefix, bucket); + "error: [%s] bucket:%s", g_hos_instance.hos_url_prefix, "NULL"); return HOS_PARAMETER_ERROR; } auto& S3Client = *g_hos_handle.S3Client; @@ -773,7 +773,7 @@ static int hos_upload_stream(const char *bucket, const char *object, const char if ((bucket == NULL) || (object == NULL) || (thread_id > hos_conf->thread_num)) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_stream", - "error: [%s] s3client:%s, bucket:%s, object:%s, thread_id:%lu, thread_num:%u", + "error: [%s] s3client:%s, bucket:%s, object:%s, thread_id:%zu, thread_num:%u", g_hos_instance.hos_url_prefix, g_hos_handle.S3Client?"not null":"null", bucket, object, thread_id, hos_conf->thread_num); return HOS_PARAMETER_ERROR; } @@ -846,8 +846,8 @@ int hos_upload_file(const char *bucket, const char *file_path, put_finished_call if ((bucket == NULL) || (file_path == NULL) || (thread_id > g_hos_handle.hos_config.thread_num)) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_file", - "error: [%s] bucket:%s, file_path:%s, thread_id:%lu, thread_num:%u", - g_hos_instance.hos_url_prefix, bucket, file_path, thread_id, g_hos_handle.hos_config.thread_num); + "error: [%s] bucket:%s, file_path:%s, thread_id:%zu, thread_num:%u", + g_hos_instance.hos_url_prefix, (bucket == NULL)?"NULL":bucket, (file_path == NULL)?"NULL":file_path, thread_id, g_hos_handle.hos_config.thread_num); return HOS_PARAMETER_ERROR; } @@ -870,8 +870,8 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size || (thread_id > g_hos_handle.hos_config.thread_num)) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_buf", - "error:[%s] bucket:%s, object:%s, buf:%s, buf_len:%lu, thread_id:%lu, thread_num:%u", - g_hos_instance.hos_url_prefix, bucket, object, buf?"not null":"null", buf_len, thread_id, g_hos_handle.hos_config.thread_num); + "error:[%s] bucket:%s, object:%s, buf:%s, buf_len:%zu, thread_id:%zu, thread_num:%u", + g_hos_instance.hos_url_prefix, bucket?bucket:"NULL", object?object:"NULL", buf?"not null":"null", buf_len, thread_id, g_hos_handle.hos_config.thread_num); return HOS_PARAMETER_ERROR; } return hos_upload_stream(bucket, object, buf, buf_len, callback, userdata, thread_id); @@ -887,7 +887,7 @@ int hos_open_fd(const char *bucket, const char *object, put_finished_callback ca if ((bucket == NULL) || (object == NULL) || (thread_id > g_hos_handle.hos_config.thread_num) || strlen(bucket) == 0 || strlen(object) == 0) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_open_fd", - "error: [%s] bucket:%s, obejct:%s, thread_id:%lu", g_hos_instance.hos_url_prefix, + "error: [%s] bucket:%s, obejct:%s, thread_id:%zu", g_hos_instance.hos_url_prefix, (bucket == NULL)?"null":bucket, (object == NULL)?"null":object, thread_id); *fd = 0; return HOS_PARAMETER_ERROR; @@ -907,7 +907,7 @@ int hos_open_fd(const char *bucket, const char *object, put_finished_callback ca hos_fd->reslut = true; hos_fd->thread_id = thread_id; - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: [%s] thread_id:%lu, fd:%lu", g_hos_instance.hos_url_prefix, thread_id, (long)&hos_fd); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: [%s] thread_id:%zu, fd:%ld", g_hos_instance.hos_url_prefix, thread_id, (long)hos_fd); *fd = (size_t)hos_fd; return HOS_CLIENT_OK; @@ -940,7 +940,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len) if ((stream == NULL) || (thread_id > hos_conf->thread_num)) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, - "hos_write", "error: [%s] fd:%lu, stream:%s, stream_len:%lu, thread_id:%lu.", + "hos_write", "error: [%s] fd:%zu, stream:%s, stream_len:%zu, thread_id:%zu.", g_hos_instance.hos_url_prefix, fd, stream?"not null":"null", stream_len, thread_id); return HOS_PARAMETER_ERROR; } @@ -948,7 +948,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len) if (a_fd_context->position >= hos_conf->max_position) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, - __FUNCTION__, "error: [%s%s/%s] upload times over max times[%d] ", + __FUNCTION__, "error: [%s%s/%s] upload times over max times[%u] ", g_hos_instance.hos_url_prefix, a_fd_context->bucket, a_fd_context->object, hos_conf->max_position); return HOS_FD_OVER_POSITION; @@ -983,7 +983,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len) //cache_count > 0,设置cache_count的情况 if (a_fd_context->cache_rest > 0) { - return HOS_CLIENT_OK; + return HOS_IN_CACHE; } } request.SetBody(a_fd_context->cache); @@ -1032,6 +1032,7 @@ int hos_close_fd(size_t fd) char num[128]; hos_config_t *hos_conf = &g_hos_handle.hos_config; size_t thread_id = 0; + int ret = 0; if (g_hos_instance.status == INSTANCE_UNINIT_STATE) { @@ -1042,7 +1043,7 @@ int hos_close_fd(size_t fd) if (a_fd_context == NULL) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, - "hos_close_fd", "debug: [%s] not find the a_fd_context of [thread:%lu fd:%lu]", + "hos_close_fd", "debug: [%s] not find the a_fd_context of [thread:%zu fd:%zu]", g_hos_instance.hos_url_prefix, thread_id, fd); return HOS_CLIENT_OK; } @@ -1051,7 +1052,7 @@ int hos_close_fd(size_t fd) if (thread_id > hos_conf->thread_num) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_close_fd", - "error: [%s] fd:%lu, thread_id:%lu, thread_sum:%u.", + "error: [%s] fd:%zu, thread_id:%zu, thread_sum:%u.", g_hos_instance.hos_url_prefix, fd, thread_id, hos_conf->thread_num); return HOS_PARAMETER_ERROR; } @@ -1080,11 +1081,11 @@ int hos_close_fd(size_t fd) if (hos_conf->pool_thread_size > 0) { - hos_putobject_async(request, upload_len, thread_id, &a_fd_context); + ret = hos_putobject_async(request, upload_len, thread_id, &a_fd_context); } else { - hos_putobject_sync(request, upload_len, thread_id, &a_fd_context); + ret = hos_putobject_sync(request, upload_len, thread_id, &a_fd_context); } data_info_t *data_info = (data_info_t *)(g_hos_handle.hos_func.fs2_info.reserved); if (data_info) @@ -1110,13 +1111,20 @@ int hos_close_fd(size_t fd) if (a_fd_context->mode == (BUFF_MODE | APPEND_MODE) && a_fd_context->position == a_fd_context->recive_cnt) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: [%s/%s/%s] upload completed. [thread:%lu fd:%lu] delete", + "debug: [%s/%s/%s] upload completed. [thread:%zu fd:%zu] delete", g_hos_instance.hos_url_prefix, a_fd_context->bucket, a_fd_context->object, thread_id, fd); hos_delete_fd(fd, thread_id); } } - return HOS_CLIENT_OK; + if (ret == HOS_CLIENT_OK) + { + return HOS_CLIENT_OK; + } + else + { + return HOS_FD_CLOSE_BUT_SEND_FAILED; + } } int hos_shutdown_instance() @@ -1131,7 +1139,7 @@ int hos_shutdown_instance() if (g_hos_handle.count > 0 && --g_hos_handle.count) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_shutdown_instance", "debug: [%s] hos client count:%lu.", g_hos_instance.hos_url_prefix, g_hos_handle.count); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_shutdown_instance", "debug: [%s] hos client count:%zu.", g_hos_instance.hos_url_prefix, g_hos_handle.count); return HOS_CLIENT_OK; } diff --git a/src/hos_client.h b/src/hos_client.h index 7e3b048a..4b2c13c3 100644 --- a/src/hos_client.h +++ b/src/hos_client.h @@ -10,6 +10,7 @@ struct hos_instance_s; typedef struct hos_instance_s *hos_instance; #define HOS_CLIENT_OK 0 +#define HOS_IN_CACHE 0xFF /* fd mode */ #define FILE_MODE 0x00 @@ -30,6 +31,7 @@ enum hoserrors HOS_INSTANCE_NOT_INIT = -9, HOS_INSTANCE_NOT_ENABLE = -10, HOS_FD_OVER_POSITION = -11, + HOS_FD_CLOSE_BUT_SEND_FAILED = -12, }; /* s3 的错误码 */