From 696fcecb7c2a4d0bb5f908e3c4fd8b77ba830b0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=AE=A3=E6=AD=A3?= Date: Thu, 24 Jun 2021 16:26:39 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(gtest=E4=B8=8Esrc):=20?= =?UTF-8?q?=E4=BF=AE=E6=94=B9fd=E7=AE=A1=E7=90=86=E6=B5=81=E7=A8=8B?= =?UTF-8?q?=EF=BC=8CTSG-6760?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gtest/CheckHosClient.cpp | 5 +- gtest/gtest_hos_close_fd.cpp | 11 +- gtest/gtest_hos_get_instance.cpp | 3 +- gtest/gtest_hos_init_instance.cpp | 3 +- gtest/gtest_hos_open_fd.cpp | 17 +- gtest/gtest_hos_shutdown_instance.cpp | 7 +- gtest/gtest_hos_upload_buff.cpp | 3 +- gtest/gtest_hos_upload_file.cpp | 3 +- gtest/gtest_hos_verify_bucket.cpp | 3 +- gtest/gtest_hos_write.cpp | 48 ++-- src/hos_client.cpp | 372 +++++++++----------------- src/hos_common.h | 8 +- src/hos_hash.cpp | 2 - src/hos_hash.h | 7 +- 14 files changed, 185 insertions(+), 307 deletions(-) diff --git a/gtest/CheckHosClient.cpp b/gtest/CheckHosClient.cpp index 67afb015..354752cc 100644 --- a/gtest/CheckHosClient.cpp +++ b/gtest/CheckHosClient.cpp @@ -16,7 +16,8 @@ void CheckStructHosConfigT(hos_config_t *actual, hos_config_t *expect) EXPECT_STREQ(actual->ip, expect->ip); EXPECT_EQ(actual->port, expect->port); EXPECT_EQ(actual->thread_num, expect->thread_num); - EXPECT_EQ(actual->timeout, expect->timeout); + EXPECT_EQ(actual->max_request_context, expect->max_request_context); + EXPECT_EQ(actual->max_request_num, expect->max_request_num); } void CheckStructFs2DataInfo(data_info_t *actual, data_info_t *expect, int thread_num) @@ -101,10 +102,8 @@ void CheckStructGHosFdContext(hos_fd_context_t *actual, hos_fd_context_t *expect EXPECT_EQ(actual->fd_status, expect->fd_status); EXPECT_EQ(actual->mode, expect->mode); EXPECT_STREQ(actual->object, expect->object); - EXPECT_EQ(actual->overtime, expect->overtime); EXPECT_EQ(actual->position, expect->position); EXPECT_EQ(actual->recive_cnt, expect->recive_cnt); - EXPECT_EQ(actual->timeout, expect->timeout); EXPECT_EQ(actual->userdata, expect->userdata); } } diff --git a/gtest/gtest_hos_close_fd.cpp b/gtest/gtest_hos_close_fd.cpp index 9f0cabd3..502ed862 100644 --- a/gtest/gtest_hos_close_fd.cpp +++ b/gtest/gtest_hos_close_fd.cpp @@ -24,7 +24,8 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_config.pool_thread_size = 10; hos_handle->hos_config.port = 9098; hos_handle->hos_config.thread_num = thread_num; - hos_handle->hos_config.timeout = 1000; + hos_handle->hos_config.max_request_num = 100; + hos_handle->hos_config.max_request_context = 10240000; hos_handle->hos_func.fd_thread_status = 0; hos_handle->hos_func.fs2_status = 1; @@ -56,13 +57,11 @@ static void gtest_hos_fd_init(hos_fd_context_t *fd_info) fd_info->cache_count = 10; fd_info->cache_rest = g_hos_handle.hos_config.cache_size; fd_info->callback = NULL; - fd_info->fd = 3; + fd_info->fd = 1; fd_info->fd_status = HOS_FD_REGISTER; fd_info->mode = BUFF_MODE; - fd_info->overtime = 0; fd_info->position = 0; fd_info->recive_cnt = 0; - fd_info->timeout = g_hos_handle.hos_config.timeout; fd_info->userdata = NULL; } @@ -80,7 +79,7 @@ TEST(hos_close_fd, normal) CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); size_t fd = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 0, BUFF_MODE); - EXPECT_EQ(fd, 3); + EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info); @@ -131,7 +130,7 @@ TEST(hos_close_fd, paramer_error) CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); CheckStructGHosFdContext(g_fd_context[0], NULL); - int ret = hos_close_fd(fd, 0); + int ret = hos_close_fd(fd, thread_num + 1); EXPECT_EQ(ret, HOS_PARAMETER_ERROR); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); diff --git a/gtest/gtest_hos_get_instance.cpp b/gtest/gtest_hos_get_instance.cpp index 5a283111..c2fccb22 100644 --- a/gtest/gtest_hos_get_instance.cpp +++ b/gtest/gtest_hos_get_instance.cpp @@ -24,7 +24,8 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_config.pool_thread_size = 10; hos_handle->hos_config.port = 9098; hos_handle->hos_config.thread_num = thread_num; - hos_handle->hos_config.timeout = 1000; + hos_handle->hos_config.max_request_num = 100; + hos_handle->hos_config.max_request_context = 10240000; hos_handle->hos_func.fd_thread_status = 0; hos_handle->hos_func.fs2_status = 1; diff --git a/gtest/gtest_hos_init_instance.cpp b/gtest/gtest_hos_init_instance.cpp index 0c2a1c01..f44a3842 100644 --- a/gtest/gtest_hos_init_instance.cpp +++ b/gtest/gtest_hos_init_instance.cpp @@ -29,7 +29,8 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_config.pool_thread_size = 10; hos_handle->hos_config.port = 9098; hos_handle->hos_config.thread_num = thread_num; - hos_handle->hos_config.timeout = 1000; + hos_handle->hos_config.max_request_num = 100; + hos_handle->hos_config.max_request_context = 10240000; hos_handle->hos_func.fd_thread_status = 0; hos_handle->hos_func.fs2_status = 1; diff --git a/gtest/gtest_hos_open_fd.cpp b/gtest/gtest_hos_open_fd.cpp index dae38b7c..5c175737 100644 --- a/gtest/gtest_hos_open_fd.cpp +++ b/gtest/gtest_hos_open_fd.cpp @@ -24,7 +24,8 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_config.pool_thread_size = 10; hos_handle->hos_config.port = 9098; hos_handle->hos_config.thread_num = thread_num; - hos_handle->hos_config.timeout = 1000; + hos_handle->hos_config.max_request_num = 100; + hos_handle->hos_config.max_request_context = 10240000; hos_handle->hos_func.fd_thread_status = 0; hos_handle->hos_func.fs2_status = 1; @@ -56,13 +57,11 @@ static void gtest_hos_fd_init(hos_fd_context_t *fd_info) fd_info->cache_count = 10; fd_info->cache_rest = g_hos_handle.hos_config.cache_size; fd_info->callback = NULL; - fd_info->fd = 3; + fd_info->fd = 1; fd_info->fd_status = HOS_FD_REGISTER; fd_info->mode = BUFF_MODE; - fd_info->overtime = 0; fd_info->position = 0; fd_info->recive_cnt = 0; - fd_info->timeout = g_hos_handle.hos_config.timeout; fd_info->userdata = NULL; } @@ -79,14 +78,14 @@ TEST(hos_open_fd, normal) CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); size_t fd = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 0, BUFF_MODE); - EXPECT_EQ(fd, 3); + EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[0]); CheckStructGHosFdContext(g_fd_context[0], &expect_fd_info[0]); size_t fd1 = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 1, BUFF_MODE); - EXPECT_EQ(fd1, 3); + EXPECT_EQ(fd1, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[1]); @@ -198,6 +197,7 @@ TEST(hos_open_fd, over_threadnums) EXPECT_EQ((void *)g_fd_context, (void *)NULL); } +#if 0 //修改fd管理逻辑, fd不再有大小限制 TEST(hos_open_fd, fd_not_enough) { int i = 0, fd = 0; @@ -217,8 +217,8 @@ TEST(hos_open_fd, fd_not_enough) for (i = 0; i < 65533; i++) { fd = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 0, BUFF_MODE); - EXPECT_EQ(fd, i+3); - expect_fd_info.fd = i+3; + EXPECT_EQ(fd, i+1); + expect_fd_info.fd = i+1; if (i == 0) { current = g_fd_context[0]; @@ -257,6 +257,7 @@ TEST(hos_open_fd, fd_not_enough) CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); EXPECT_EQ((void *)g_fd_context, (void *)NULL); } +#endif TEST(hos_open_fd, not_init_instance) { diff --git a/gtest/gtest_hos_shutdown_instance.cpp b/gtest/gtest_hos_shutdown_instance.cpp index 0b2bcc98..4b53a120 100644 --- a/gtest/gtest_hos_shutdown_instance.cpp +++ b/gtest/gtest_hos_shutdown_instance.cpp @@ -29,7 +29,8 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_config.pool_thread_size = 10; hos_handle->hos_config.port = 9098; hos_handle->hos_config.thread_num = thread_num; - hos_handle->hos_config.timeout = 1000; + hos_handle->hos_config.max_request_num = 100; + hos_handle->hos_config.max_request_context = 10240000; hos_handle->hos_func.fd_thread_status = 0; hos_handle->hos_func.fs2_status = 1; @@ -89,7 +90,7 @@ TEST(hos_shutdown_instance, normal) TEST(hos_shutdown_instance, no_init) { int ret = hos_shutdown_instance(); - EXPECT_EQ(ret, HOS_CLIENT_OK); + EXPECT_EQ(ret, HOS_INSTANCE_NOT_INIT); } TEST(hos_shutdown_instance, shutdown_more) @@ -124,7 +125,7 @@ TEST(hos_shutdown_instance, shutdown_more) EXPECT_EQ((void *)g_fd_context, (void *)NULL); ret = hos_shutdown_instance(); - EXPECT_EQ(ret, HOS_CLIENT_OK); + EXPECT_EQ(ret, HOS_INSTANCE_NOT_INIT); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); EXPECT_EQ((void *)g_fd_context, (void *)NULL); diff --git a/gtest/gtest_hos_upload_buff.cpp b/gtest/gtest_hos_upload_buff.cpp index a0b239df..e5858ab6 100644 --- a/gtest/gtest_hos_upload_buff.cpp +++ b/gtest/gtest_hos_upload_buff.cpp @@ -25,7 +25,8 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_config.pool_thread_size = 10; hos_handle->hos_config.port = 9098; hos_handle->hos_config.thread_num = thread_num; - hos_handle->hos_config.timeout = 1000; + hos_handle->hos_config.max_request_num = 100; + hos_handle->hos_config.max_request_context = 10240000; hos_handle->hos_func.fd_thread_status = 0; hos_handle->hos_func.fs2_status = 1; diff --git a/gtest/gtest_hos_upload_file.cpp b/gtest/gtest_hos_upload_file.cpp index aaea4c4e..d2dbb247 100644 --- a/gtest/gtest_hos_upload_file.cpp +++ b/gtest/gtest_hos_upload_file.cpp @@ -25,7 +25,8 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_config.pool_thread_size = 10; hos_handle->hos_config.port = 9098; hos_handle->hos_config.thread_num = thread_num; - hos_handle->hos_config.timeout = 1000; + hos_handle->hos_config.max_request_num = 100; + hos_handle->hos_config.max_request_context = 10240000; hos_handle->hos_func.fd_thread_status = 0; hos_handle->hos_func.fs2_status = 1; diff --git a/gtest/gtest_hos_verify_bucket.cpp b/gtest/gtest_hos_verify_bucket.cpp index 890e8955..57fad317 100644 --- a/gtest/gtest_hos_verify_bucket.cpp +++ b/gtest/gtest_hos_verify_bucket.cpp @@ -29,7 +29,8 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_config.pool_thread_size = 10; hos_handle->hos_config.port = 9098; hos_handle->hos_config.thread_num = thread_num; - hos_handle->hos_config.timeout = 1000; + hos_handle->hos_config.max_request_num = 100; + hos_handle->hos_config.max_request_context = 10240000; hos_handle->hos_func.fd_thread_status = 0; hos_handle->hos_func.fs2_status = 1; diff --git a/gtest/gtest_hos_write.cpp b/gtest/gtest_hos_write.cpp index 456055f1..9c29cb50 100644 --- a/gtest/gtest_hos_write.cpp +++ b/gtest/gtest_hos_write.cpp @@ -26,7 +26,8 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_config.pool_thread_size = 10; hos_handle->hos_config.port = 9098; hos_handle->hos_config.thread_num = thread_num; - hos_handle->hos_config.timeout = 1000; + hos_handle->hos_config.max_request_num = 100; + hos_handle->hos_config.max_request_context = 10240000; hos_handle->hos_func.fd_thread_status = 0; hos_handle->hos_func.fs2_status = 1; @@ -58,13 +59,11 @@ static void gtest_hos_fd_init(hos_fd_context_t *fd_info) fd_info->cache_count = 10; fd_info->cache_rest = g_hos_handle.hos_config.cache_size; fd_info->callback = NULL; - fd_info->fd = 3; + fd_info->fd = 1; fd_info->fd_status = HOS_FD_REGISTER; fd_info->mode = BUFF_MODE; - fd_info->overtime = 0; fd_info->position = 0; fd_info->recive_cnt = 0; - fd_info->timeout = g_hos_handle.hos_config.timeout; fd_info->userdata = NULL; } @@ -129,7 +128,7 @@ TEST(hos_write, normal) CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); size_t fd = hos_open_fd(HOS_BUCKET, "object_buff", hos_write_buff_cb, (void *)"object_buff", 0, BUFF_MODE); - EXPECT_EQ(fd, 3); + EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[0]); @@ -149,10 +148,9 @@ TEST(hos_write, normal) CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); //CheckStructGHosFdContext(g_fd_context[0], &expect_fd_info[0]); - //EXPECT_TRUE(g_fd_context[0][0].cache == NULL); size_t fd1 = hos_open_fd(HOS_BUCKET, "object_append", hos_write_append_cb, (void *)"object_append", 1, BUFF_MODE | APPEND_MODE); - EXPECT_EQ(fd1, 3); + EXPECT_EQ(fd1, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[1]); @@ -187,7 +185,7 @@ TEST(hos_write, normal) EXPECT_TRUE(g_fd_context[1][0].cache != NULL); size_t fd2 = hos_open_fd(HOS_BUCKET, "object_file", hos_write_file_cb, (void *)"object_file", 2, FILE_MODE); - EXPECT_EQ(fd2, 3); + EXPECT_EQ(fd2, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[2]); @@ -262,7 +260,7 @@ TEST(hos_write, bucket_not_exits) CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); size_t fd = hos_open_fd("bucket_not_exits", "object_buff", hos_bucket_not_exits_cb, (void *)"object_buff", 0, BUFF_MODE); - EXPECT_EQ(fd, 3); + EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[0]); @@ -280,10 +278,9 @@ TEST(hos_write, bucket_not_exits) data_info->tx_failed_pkts[0] += 1; CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - EXPECT_TRUE(g_fd_context[0][0].cache == NULL); size_t fd1 = hos_open_fd("bucket_not_exits", "object_append", hos_bucket_not_exits_cb, (void *)"object_append", 1, BUFF_MODE | APPEND_MODE); - EXPECT_EQ(fd1, 3); + EXPECT_EQ(fd1, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[1]); @@ -319,7 +316,7 @@ TEST(hos_write, bucket_not_exits) EXPECT_TRUE(g_fd_context[1][0].cache != NULL); size_t fd2 = hos_open_fd("bucket_not_exits", "object_file", hos_bucket_not_exits_cb, (void *)"object_file", 2, FILE_MODE); - EXPECT_EQ(fd2, 3); + EXPECT_EQ(fd2, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[2]); @@ -342,7 +339,6 @@ TEST(hos_write, bucket_not_exits) CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); //CheckStructGHosFdContext(g_fd_context[2], &expect_fd_info[2]); - EXPECT_TRUE(g_fd_context[2][0].cache == NULL); ret = hos_close_fd(fd, 0); EXPECT_EQ(ret, HOS_CLIENT_OK); @@ -396,7 +392,7 @@ TEST(hos_write, sync_mode) CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); size_t fd = hos_open_fd(HOS_BUCKET, "object_buff", NULL, NULL, 0, BUFF_MODE); - EXPECT_EQ(fd, 3); + EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[0]); @@ -412,10 +408,9 @@ TEST(hos_write, sync_mode) CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); //CheckStructGHosFdContext(g_fd_context[0], &expect_fd_info[0]); - EXPECT_TRUE(g_fd_context[0][0].cache == NULL); size_t fd1 = hos_open_fd(HOS_BUCKET, "object_append", NULL, NULL, 1, BUFF_MODE | APPEND_MODE); - EXPECT_EQ(fd1, 3); + EXPECT_EQ(fd1, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[1]); @@ -448,7 +443,7 @@ TEST(hos_write, sync_mode) EXPECT_TRUE(g_fd_context[1][0].cache != NULL); size_t fd2 = hos_open_fd(HOS_BUCKET, "object_buff", NULL, NULL, 2, FILE_MODE); - EXPECT_EQ(fd2, 3); + EXPECT_EQ(fd2, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[2]); @@ -467,7 +462,6 @@ TEST(hos_write, sync_mode) CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); expect_fd_info[2].fd_status = 2; //CheckStructGHosFdContext(g_fd_context[1], &expect_fd_info[1]); - EXPECT_TRUE(g_fd_context[2][0].cache == NULL); ret = hos_close_fd(fd, 0); EXPECT_EQ(ret, HOS_CLIENT_OK); @@ -521,7 +515,7 @@ TEST(hos_write, sync_mode_bucket_not_exits) CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); size_t fd = hos_open_fd(HOS_CONF, "object_buff", NULL, NULL, 0, BUFF_MODE); - EXPECT_EQ(fd, 3); + EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[0]); @@ -540,10 +534,9 @@ TEST(hos_write, sync_mode_bucket_not_exits) CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); //CheckStructGHosFdContext(g_fd_context[0], &expect_fd_info[0]); - EXPECT_TRUE(g_fd_context[0][0].cache == NULL); size_t fd1 = hos_open_fd(HOS_CONF, "object_append", NULL, NULL, 1, BUFF_MODE | APPEND_MODE); - EXPECT_EQ(fd1, 3); + EXPECT_EQ(fd1, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[1]); @@ -577,7 +570,7 @@ TEST(hos_write, sync_mode_bucket_not_exits) EXPECT_TRUE(g_fd_context[1][0].cache != NULL); size_t fd2 = hos_open_fd(HOS_CONF, "object_file", NULL, NULL, 2, FILE_MODE); - EXPECT_EQ(fd2, 3); + EXPECT_EQ(fd2, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[2]); @@ -597,7 +590,6 @@ TEST(hos_write, sync_mode_bucket_not_exits) CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); //CheckStructGHosFdContext(g_fd_context[2], &expect_fd_info[2]); - EXPECT_TRUE(g_fd_context[2][0].cache == NULL); ret = hos_close_fd(fd, 0); EXPECT_EQ(ret, HOS_CLIENT_OK); @@ -648,7 +640,7 @@ TEST(hos_write, paramer_error) CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); int fd = hos_open_fd(HOS_BUCKET, "object_buff", hos_callback, NULL, 0, BUFF_MODE); - EXPECT_EQ(fd, 3); + EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info); @@ -657,7 +649,7 @@ TEST(hos_write, paramer_error) expect_fd_info.mode = BUFF_MODE; CheckStructGHosFdContext(g_fd_context[0], &expect_fd_info); - int ret = hos_write(0, HOS_BUFF, strlen(HOS_BUFF), 0); + int ret = hos_write(fd, NULL, strlen(HOS_BUFF), 0); EXPECT_EQ(ret, HOS_PARAMETER_ERROR); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -736,7 +728,7 @@ TEST(hos_write, file_not_exit) CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); int fd = hos_open_fd(HOS_CONF, "object_file", NULL, NULL, 0, FILE_MODE); - EXPECT_EQ(fd, 3); + EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info); @@ -784,7 +776,7 @@ TEST(hos_write, over_threadnums) CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); int fd = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 0, BUFF_MODE); - EXPECT_EQ(fd, 3); + EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info); @@ -857,7 +849,7 @@ static void *hos_function(void *ptr) { snprintf(object[i], 1024, "object_%lu_%d", thread_id, i); fd[i] = hos_open_fd(HOS_BUCKET, object[i], hos_callback, object[i], 0, BUFF_MODE | APPEND_MODE); - EXPECT_EQ(fd[i], i + 3); + EXPECT_EQ(fd[i], i + 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[thread_id][i]); diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 2a5438b6..89871232 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -47,11 +47,9 @@ struct hos_instance_s g_hos_instance; hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle static std::mutex m_client_lock; hos_fd_context_t **g_fd_context; -size_t (*g_fd_info)[MAX_HOS_CLIENT_FD_NUM + 1]; //fd 实际从3开始, fd[thread_id][0]记录register的fd,fd[thread_id][1]记录inject的fd +size_t *g_fd_info; //fd 实际从1开始,每个线程有独立的fd static Aws::SDKOptions g_options; -static void *hos_fd_manage(void *ptr); - static inline size_t get_current_ms() { struct timespec timenow; @@ -59,30 +57,12 @@ static inline size_t get_current_ms() return (timenow.tv_sec * 1000 + timenow.tv_nsec / 1000 / 1000 ); } -static size_t hash_get_min_free_fd(size_t thread_id) -{ - size_t i = 0; - for (i = 3; i < MAX_HOS_CLIENT_FD_NUM + 1; i++) - { - if (!g_fd_info[thread_id][i]) - { - g_fd_info[thread_id][i] = 1; - g_fd_info[thread_id][HOS_FD_REGISTER]++; - g_fd_info[thread_id][HOS_FD_FREE]--; - - return i; - } - } - return 0; -} - static int hos_delete_fd(size_t thread_id, hos_fd_context_t *context) { if (context == NULL) { return HOS_PARAMETER_ERROR; } - size_t fd = context->fd; if (context) { @@ -99,10 +79,6 @@ static int hos_delete_fd(size_t thread_id, hos_fd_context_t *context) HASH_DEL(g_fd_context[thread_id], context); free(context); } - - g_fd_info[thread_id][fd] = 0; - g_fd_info[thread_id][HOS_FD_FREE]++; - g_fd_info[thread_id][HOS_FD_INJECT]--; return HOS_CLIENT_OK; } @@ -118,19 +94,17 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, data_info_t *data_info = NULL; const Aws::String& uuid = context->GetUUID(); size_t thread_id, fd, stream_len; + sscanf(uuid.c_str(), "%lu %lu %lu", &thread_id, &fd, &stream_len); - if (g_fd_info[thread_id][fd]) - { - a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd); - } + a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd); if (a_fd_context == NULL) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: Not find the info of [thread_id:%lu fd:%lu]", thread_id, fd); - if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) + if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved) { - data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; + data_info = (data_info_t *)hos_func->fs2_info.reserved; data_info->tx_failed_pkts[thread_id]++; data_info->tx_failed_bytes[thread_id] += stream_len; } @@ -144,18 +118,18 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: [%s:%s] upload failed. error:%s", a_fd_context->bucket, a_fd_context->object, error); - if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) + if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved) { - data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; + data_info = (data_info_t *)hos_func->fs2_info.reserved; data_info->tx_failed_pkts[thread_id]++; data_info->tx_failed_bytes[thread_id] += stream_len; } } else { - if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) + if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved) { - data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; + data_info = (data_info_t *)hos_func->fs2_info.reserved; data_info->tx_pkts[thread_id]++; data_info->tx_bytes[thread_id] += stream_len; MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, @@ -175,14 +149,28 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, { //APPEND MODE 保留fd atomic_add(&(a_fd_context->recive_cnt), 1); + if (a_fd_context->fd_status == HOS_FD_INJECT) + { + if (a_fd_context->position == a_fd_context->recive_cnt) + { + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", + a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); + hos_delete_fd(thread_id, a_fd_context); + } + } } else { //完整上传 删除fd - hos_close_fd(fd, thread_id); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", + a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); + hos_delete_fd(thread_id, a_fd_context); } } g_hos_handle.task_num[thread_id]--; + g_hos_handle.task_context[thread_id]--; } static void hos_client_create() @@ -243,20 +231,18 @@ static void hos_client_create() g_hos_handle.count++; g_hos_handle.executor = std::dynamic_pointer_cast(config.executor); g_hos_handle.task_num = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + g_hos_handle.task_context = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); g_fd_context = (hos_fd_context_t **)calloc(hos_conf->thread_num, sizeof(hos_fd_context_t *)); - g_fd_info = (size_t (*)[MAX_HOS_CLIENT_FD_NUM + 1])calloc(hos_conf->thread_num, sizeof(size_t [MAX_HOS_CLIENT_FD_NUM + 1])); - - for (size_t i = 0; i < hos_conf->thread_num; i++) - { - g_fd_info[i][0] = 65533; - } + g_fd_info = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); + #if 0 if (g_hos_handle.hos_func.fd_thread == 0) { g_hos_handle.hos_func.fd_thread_status = 0; pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL); } + #endif MESA_HANDLE_RUNTIME_LOG(log, RLOG_LV_DEBUG, "hos_client_create", "debug: hos s3client create success, url:%s.",endpoint); g_hos_instance.result = true; @@ -272,9 +258,6 @@ bool hos_verify_bucket(const char *bucket) } if (g_hos_instance.result != true || g_hos_handle.S3Client == NULL) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: g_hos_instance.result:%d, g_hos_handle.S3Client:%s", - g_hos_instance.result, (g_hos_handle.S3Client==NULL)?("null"):("not null")); return false; } Aws::S3::Model::ListBucketsOutcome outcome = g_hos_handle.S3Client->ListBuckets(); @@ -324,14 +307,10 @@ static void *fs2_statistics(void *ptr) size_t tx_failed_pkts_last = 0; size_t cache_last = 0; fs2_info_t *fs2_info = NULL; - int PoolThread_state[3] = {0, 0, 0};//{PoolSize, Busy, TopBusy} - int *busy = &PoolThread_state[1]; - int *top_busy = &PoolThread_state[2]; - int pool_history_sum = 0; hos_config_t *hos_conf = &g_hos_handle.hos_config; hos_func_thread_t *hos_func = &g_hos_handle.hos_func; + size_t task_num = 0; - PoolThread_state[0] = hos_conf->pool_thread_size; while(1) { if (hos_func->fs2_status == HOS_FS2_STOP) @@ -348,7 +327,7 @@ static void *fs2_statistics(void *ptr) tx_failed_pkts_sum = 0; cache_sum = 0; - fs2_info = &hos_func->fs2_info[0]; + fs2_info = &hos_func->fs2_info; data_info_t *data_info = (data_info_t *)fs2_info->reserved; for (i = 0; i < hos_conf->thread_num; i++) { @@ -359,6 +338,8 @@ static void *fs2_statistics(void *ptr) tx_failed_bytes_sum += data_info->tx_failed_bytes[i]; tx_failed_pkts_sum += data_info->tx_failed_pkts[i]; cache_sum += data_info->cache[i]; + + task_num += g_hos_handle.task_num[i]; } rx_pkts_interval = rx_pkts_sum - rx_pkts_last; @@ -393,20 +374,6 @@ static void *fs2_statistics(void *ptr) FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[5], FS_OP_SET, tx_failed_bytes_sum); FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[6], FS_OP_SET, cache_sum); - //PoolThread State - if (hos_conf->pool_thread_size > 0) - { - *busy = g_hos_handle.executor->GetTaskSize(); - *top_busy = (*busy) > (*top_busy) ? (*busy) : (*top_busy); - pool_history_sum += *busy; - - fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE]; - for (i = 0; i < 3; i++) - { - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[i], FS_OP_SET, PoolThread_state[i]); - } - } - sleep(1); } pthread_exit(NULL); @@ -458,7 +425,7 @@ static void hos_expand_fs2() hos_func_thread_t *hos_func = &g_hos_handle.hos_func; size_t i = 0; - if (hos_func->fs2_info[0].fs2_handle) + if (hos_func->fs2_info.fs2_handle) return; //data info /********************************************************************************************************** @@ -466,9 +433,9 @@ static void hos_expand_fs2() * current 10 100 1 100 0 0 100 * total 100 1000 10 1000 0 0 100(无实意) ***********************************************************************************************************/ - fs2_info = &hos_func->fs2_info[FS2_DATA_FLOW_STATE]; - hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle = hos_init_fs2((char *)"hos-data", strlen("hos-data")); - fs2_handle = hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle; + fs2_info = &hos_func->fs2_info; + hos_func->fs2_info.fs2_handle = hos_init_fs2((char *)"hos-data", strlen("hos-data")); + fs2_handle = hos_func->fs2_info.fs2_handle; fs2_info->line_ids = (int *)calloc(2, sizeof(int)); fs2_info->column_ids = (int *)calloc(7, sizeof(int)); @@ -492,29 +459,6 @@ static void hos_expand_fs2() data_info->cache = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); FS_start(fs2_handle); - if (hos_conf->pool_thread_size > 0) - { - //PoolThread state - /******************************************************* - * PoolSize Busy TopBusy AveBusy - * ThreadNum 1000 500 800 650 - ********************************************************/ - fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE]; - hos_func->fs2_info[FS2_POOL_THREAD_STATE].fs2_handle = hos_init_fs2((char *)"hos-poolthread", strlen("hos-poolthread")); - fs2_handle = hos_func->fs2_info[FS2_POOL_THREAD_STATE].fs2_handle; - fs2_info->line_ids = (int *)calloc(1, sizeof(int)); - fs2_info->column_ids = (int *)calloc(3, sizeof(int)); - - const char *poolthread_col[3] = {"PoolSize", "Busy", "TopBusy"}; - for (i = 0; i < sizeof(poolthread_col) / sizeof(const char *); i++) - { - fs2_info->column_ids[i] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, poolthread_col[i]); - } - fs2_info->line_ids[0] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "ThreadNum"); - - FS_start(fs2_handle); - } - pthread_create(&hos_func->fs2_thread, NULL, fs2_statistics, NULL); return ; @@ -527,17 +471,31 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t int ret = 0; hos_func_thread_t *hos_func = &g_hos_handle.hos_func; data_info_t *data_info = NULL; + hos_config_t *hos_conf = &g_hos_handle.hos_config; + //设置回调函数 std::shared_ptr context = Aws::MakeShared(""); sprintf(buf, "%lu %lu %lu", thread_id, fd, stream_len); context->SetUUID(buf); + if (hos_conf->max_request_num && hos_conf->max_request_context && + (g_hos_handle.task_num[thread_id] >= hos_conf->max_request_num || + g_hos_handle.task_context[thread_id] >= hos_conf->max_request_context)) + { + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: PutObjectAsync failed. [%s:%s]. task_num:%lu, task_context:%lu", + bucket, object, g_hos_handle.task_num[thread_id], g_hos_handle.task_context[thread_id]); + + return HOS_SEND_FAILED; + } + auto &S3Client = *(g_hos_handle.S3Client); ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); if (ret) { g_hos_handle.task_num[thread_id]++; + g_hos_handle.task_context[thread_id] += stream_len; //不算真正成功,需要等到PutObjectAsyncFinished的结果 MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: PutObjectAsync success. [%s:%s]", bucket, object); @@ -549,11 +507,11 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: PutObjectAsync failed. [%s:%s]", bucket, object); - if (hos_func->fs2_info[0].fs2_handle) + if (hos_func->fs2_info.fs2_handle) { - if (hos_func->fs2_info[0].reserved) + if (hos_func->fs2_info.reserved) { - data_info = (data_info_t *)hos_func->fs2_info[0].reserved; + data_info = (data_info_t *)hos_func->fs2_info.reserved; data_info->tx_failed_pkts[thread_id]++; data_info->tx_failed_bytes[thread_id] += stream_len; } @@ -572,9 +530,9 @@ static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t Aws::S3::Model::PutObjectOutcome Outcome = S3Client.PutObject(request); if (Outcome.IsSuccess()) { - if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) + if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved) { - data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; + data_info = (data_info_t *)hos_func->fs2_info.reserved; data_info->tx_pkts[thread_id]++; data_info->tx_bytes[thread_id] += stream_len; MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, @@ -594,9 +552,9 @@ static int hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: PutObject failed. [%s:%s] cause:%s", bucket, object, Outcome.GetError().GetMessage().c_str()); - if (hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle && hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved) + if (hos_func->fs2_info.fs2_handle && hos_func->fs2_info.reserved) { - data_info = (data_info_t *)hos_func->fs2_info[FS2_DATA_FLOW_STATE].reserved; + data_info = (data_info_t *)hos_func->fs2_info.reserved; data_info->tx_failed_pkts[thread_id]++; data_info->tx_failed_bytes[thread_id] += stream_len; } @@ -641,15 +599,16 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t MESA_load_profile_uint_def(conf_path, module, "hos_poolsize", &hos_conf->pool_thread_size, 0); MESA_load_profile_uint_def(conf_path, module, "hos_cache_size", &hos_conf->cache_size, 102400); MESA_load_profile_uint_def(conf_path, module, "hos_cache_count", &hos_conf->cache_count, 10); - MESA_load_profile_uint_def(conf_path, module, "hos_fd_live_time_ms", &hos_conf->timeout, 1000); MESA_load_profile_string_nodef(conf_path, module, "hos_fs2_serverip", hos_conf->fs2_ip, INET6_ADDRSTRLEN); MESA_load_profile_uint_nodef(conf_path, module, "hos_fs2_serverport", &hos_conf->fs2_port); MESA_load_profile_string_def(conf_path, module, "hos_fs2_path", hos_conf->fs2_path, sizeof(hos_conf->fs2_path), "./hos_fs2.stat"); MESA_load_profile_uint_def(conf_path, module, "hos_fs2_format", &hos_conf->fs2_fmt, 0); + MESA_load_profile_uint_def(conf_path, module, "hos_request_num", &hos_conf->max_request_num, 100); + MESA_load_profile_uint_def(conf_path, module, "hos_request_context", &hos_conf->max_request_context, 10240000); if (hos_conf->ip && hos_conf->port && strlen(hos_conf->accesskeyid) && strlen(hos_conf->secretkey)) { g_hos_handle.log = MESA_create_runtime_log_handle(hos_conf->log_path, hos_conf->log_level); - if (log == NULL) + if (g_hos_handle.log == NULL) { g_hos_instance.result = false; g_hos_instance.error_code = HOS_RUNTIME_LOG_FAILED; @@ -693,17 +652,20 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t g_hos_instance.error_code = HOS_CONF_ERROR; snprintf(g_hos_instance.error_message, HOS_ERROR_MESSAGE_SIZE, "hos param error:hos ip:%s, hos port:%u, accesskeyid:%s, secretkey:%s", hos_conf->ip, hos_conf->port, hos_conf->accesskeyid, hos_conf->secretkey); - MESA_destroy_runtime_log_handle(g_hos_handle.log); return &g_hos_instance; } } int hos_create_bucket(const char *bucket) { - if ((bucket == NULL) || (g_hos_handle.S3Client == NULL)) + if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + { + return HOS_INSTANCE_NOT_INIT; + } + if (bucket == NULL) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_create_bucket", - "error:bucket:%s, s3client:%s", bucket, g_hos_handle.S3Client?"not null":"null"); + "error:bucket:%s", bucket); return HOS_PARAMETER_ERROR; } auto& S3Client = *g_hos_handle.S3Client; @@ -747,7 +709,12 @@ static int hos_upload_stream(const char *bucket, const char *object, const char int ret; int mode = 0; - if ((g_hos_handle.S3Client == NULL) || (bucket == NULL) || (object == NULL) || (thread_id > hos_conf->thread_num)) + if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) + { + return HOS_INSTANCE_NOT_INIT; + } + + if ((bucket == NULL) || (object == NULL) || (thread_id > hos_conf->thread_num)) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_upload_stream", "error: s3client:%s, bucket:%s, object:%s, thread_id:%lu, thread_num:%u", @@ -780,31 +747,22 @@ static int hos_upload_stream(const char *bucket, const char *object, const char request.SetBody(input_data); } //field_stat2 record - if (hos_func->fs2_info[0].fs2_handle) + if (hos_func->fs2_info.fs2_handle) { - if (hos_func->fs2_info[0].reserved) + if (hos_func->fs2_info.reserved) { - data_info = (data_info_t *)hos_func->fs2_info[0].reserved; + data_info = (data_info_t *)hos_func->fs2_info.reserved; data_info->rx_pkts[thread_id]++; data_info->rx_bytes[thread_id] += data_len; } } //设置回调函数 - size_t fd = hash_get_min_free_fd(thread_id); + size_t fd = ++g_fd_info[thread_id]; hos_fd_context_t info = {fd, 0, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 }; add_fd_context(&g_fd_context[thread_id], &info); - - { - std::lock_guard locker(m_client_lock); - if (g_hos_handle.hos_func.fd_thread == 0) - { - g_hos_handle.hos_func.fd_thread_status = 0; - pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL); - } - } - + if (hos_conf->pool_thread_size > 0) { ret = hos_putobject_async(request, data_len, thread_id, fd, bucket, object); @@ -822,9 +780,6 @@ int hos_upload_file(const char *bucket, const char *file_path, put_finished_call struct stat buffer; if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, - "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s", - g_hos_instance.result, (g_hos_handle.S3Client == NULL)?(NULL):("not null")); return HOS_INSTANCE_NOT_INIT; } @@ -848,9 +803,6 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size { if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, - "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s", - g_hos_instance.result, (g_hos_handle.S3Client == NULL)?(NULL):("not null")); return HOS_INSTANCE_NOT_INIT; } @@ -865,102 +817,28 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size return hos_upload_stream(bucket, object, buf, buf_len, callback, userdata, thread_id); } -static void *hos_fd_manage(void *ptr) -{ - hos_fd_context_t *a_fd_context; - size_t thread_sum = g_hos_handle.hos_config.thread_num; - size_t thread_num; - //size_t fd; - while(1) - { - if (g_hos_handle.hos_func.fd_thread_status) - break; - for (thread_num = 0; thread_num < thread_sum; thread_num++) - { -#if 0 - a_fd_context = find_context_by_fd(g_fd_context[thread_num], fd); - if (!a_fd_context) - continue; -#endif - hos_fd_context_t *tmp = NULL; - HASH_ITER(hh, g_fd_context[thread_num], a_fd_context, tmp) - { - if (!a_fd_context) - break; - - if (a_fd_context->fd_status == HOS_FD_INJECT) - { - if (a_fd_context->position == a_fd_context->recive_cnt) - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, - "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", - a_fd_context->bucket, a_fd_context->object, thread_num, a_fd_context->fd); - hos_delete_fd(thread_num, a_fd_context); - } - else if (a_fd_context->overtime <= get_current_ms()) - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, - "error: [%s:%s] upload not completed, but the live-time of [thread_id:%lu fd:%lu] is over.", - a_fd_context->bucket, a_fd_context->object, thread_num, a_fd_context->fd); - hos_delete_fd(thread_num, a_fd_context); - } - } - } - } - usleep(500000); - } - pthread_exit(NULL); -} - int hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode) { if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, - "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s", - g_hos_instance.result, (g_hos_handle.S3Client == NULL)?("null"):("not null")); return HOS_INSTANCE_NOT_INIT; } if ((bucket == NULL) || (object == NULL) || (thread_id > g_hos_handle.hos_config.thread_num) || strlen(bucket) == 0 || strlen(object) == 0) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_open_fd", "error: bucket:%s, obejct:%s, thread_id:%lu", - //(bucket == NULL)?"null":bucket, (object == NULL)?"null":object, thread_id); - bucket, object, thread_id); + (bucket == NULL)?"null":bucket, (object == NULL)?"null":object, thread_id); return HOS_PARAMETER_ERROR; } - size_t fd = hash_get_min_free_fd(thread_id); - if (fd == 0) - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_open_fd", - "error:fd not enough, thread_id:%lu, fd free: %lu, fd register:%lu, fd inject:%lu", - thread_id, - g_fd_info[thread_id][HOS_FD_FREE], - g_fd_info[thread_id][HOS_FD_REGISTER], - g_fd_info[thread_id][HOS_FD_INJECT]); - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: thread_id:%lu, fd:%lu", thread_id, fd); - return HOS_FD_NOT_ENOUGH; - } + size_t fd = ++g_fd_info[thread_id]; MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: thread_id:%lu, fd:%lu", thread_id, fd); hos_fd_context_t info = {fd, mode, (char *)bucket, (char *)object, (void *)callback, userdata, NULL,/*cache*/ g_hos_handle.hos_config.cache_count, 0,/*position*/ 0,/*recive_cnt*/ - (long)g_hos_handle.hos_config.cache_size,/*cache_rest*/ HOS_FD_REGISTER,/*fd_status*/ - 0,/*overtime*/ g_hos_handle.hos_config.timeout,}; + (long)g_hos_handle.hos_config.cache_size,/*cache_rest*/ HOS_FD_REGISTER,/*fd_status*/}; add_fd_context(&g_fd_context[thread_id], &info); -#if 0 - { - std::lock_guard locker(m_client_lock); - if (g_hos_handle.hos_func.fd_thread == 0) - { - g_hos_handle.hos_func.fd_thread_status = 0; - pthread_create(&g_hos_handle.hos_func.fd_thread, NULL, hos_fd_manage, NULL); - } - } -#endif - return fd; } @@ -977,13 +855,10 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, - "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s", - g_hos_instance.result, (g_hos_handle.S3Client == NULL)?(NULL):("not null")); return HOS_INSTANCE_NOT_INIT; } - if ((fd < 3) || fd > MAX_HOS_CLIENT_FD_NUM || (stream == NULL) || (thread_id > hos_conf->thread_num)) + if ((stream == NULL) || (thread_id > hos_conf->thread_num)) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_write", "error: fd:%lu, stream:%s, stream_len:%lu, thread_id:%lu.", @@ -991,10 +866,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id return HOS_PARAMETER_ERROR; } - if (g_fd_info[thread_id][fd]) - { - a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd); - } + a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd); if (a_fd_context == NULL) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: fd info not find. thread_id:%lu, fd:%lu", thread_id, fd); @@ -1011,11 +883,11 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id //BUFF_MODE //field_stat2 record - if (hos_func->fs2_info[0].fs2_handle) + if (hos_func->fs2_info.fs2_handle) { - if (hos_func->fs2_info[0].reserved) + if (hos_func->fs2_info.reserved) { - data_info = (data_info_t *)hos_func->fs2_info[0].reserved; + data_info = (data_info_t *)hos_func->fs2_info.reserved; data_info->rx_pkts[thread_id]++; data_info->rx_bytes[thread_id] += stream_len; } @@ -1079,11 +951,11 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id request.SetBody(input_data); upload_len = buffer.st_size; //field_stat2 record - if (hos_func->fs2_info[0].fs2_handle) + if (hos_func->fs2_info.fs2_handle) { - if (hos_func->fs2_info[0].reserved) + if (hos_func->fs2_info.reserved) { - data_info = (data_info_t *)hos_func->fs2_info[0].reserved; + data_info = (data_info_t *)hos_func->fs2_info.reserved; data_info->rx_pkts[thread_id]++; data_info->rx_bytes[thread_id] += upload_len; } @@ -1124,28 +996,22 @@ int hos_close_fd(size_t fd, size_t thread_id) if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, - "error:g_hos_instance.result:%d, g_hos_handle.S3CLient:%s", - g_hos_instance.result, (g_hos_handle.S3Client == NULL)?("null"):("not null")); return HOS_INSTANCE_NOT_INIT; } - if (fd < 3 || fd > 65533 || thread_id > hos_conf->thread_num) + if (thread_id > hos_conf->thread_num) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, "hos_close_fd", "error:fd:%lu, thread_id:%lu, thread_sum:%u.", fd, thread_id, hos_conf->thread_num); return HOS_PARAMETER_ERROR; } - if (g_fd_info[thread_id][fd]) - { - a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd); - } + a_fd_context = find_context_by_fd(g_fd_context[thread_id], fd); if (a_fd_context == NULL) { MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, - "hos_close_fd", "debug: not find the a_fd_context of [fd:%lu thread:%lu]", - fd, thread_id); + "hos_close_fd", "debug: not find the a_fd_context of [thread:%lu fd:%lu]", + thread_id, fd); return HOS_CLIENT_OK; } @@ -1179,18 +1045,32 @@ int hos_close_fd(size_t fd, size_t thread_id) { hos_putobject_sync(request, upload_len, thread_id, fd, a_fd_context->bucket, a_fd_context->object); } - ((data_info_t *)(g_hos_handle.hos_func.fs2_info->reserved))->cache[thread_id] = 0; + ((data_info_t *)(g_hos_handle.hos_func.fs2_info.reserved))->cache[thread_id] = 0; } } a_fd_context->fd_status = HOS_FD_INJECT; a_fd_context->cache.reset(); a_fd_context->cache = NULL; - a_fd_context->overtime = get_current_ms() + a_fd_context->timeout; a_fd_context->cache_rest = hos_conf->cache_size; a_fd_context->cache_count = hos_conf->cache_count; - g_fd_info[thread_id][HOS_FD_REGISTER]--; - g_fd_info[thread_id][HOS_FD_INJECT]++; + if (hos_conf->pool_thread_size == 0) + { + //同步模式,立即释放fd + hos_delete_fd(thread_id, a_fd_context); + } + else + { + //异步APPEND 模式,判断是否可以释放 + //异步其他模式,在PutObjectAsyncFinished出释放fd + if (a_fd_context->mode == (BUFF_MODE | APPEND_MODE) && a_fd_context->position == a_fd_context->recive_cnt) + { + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, + "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", + a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); + hos_delete_fd(thread_id, a_fd_context); + } + } return HOS_CLIENT_OK; } @@ -1203,10 +1083,9 @@ int hos_shutdown_instance() hos_func_thread_t *hos_func = &g_hos_handle.hos_func; size_t task_num = 0; - if (g_hos_handle.S3Client == NULL) + if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_shutdown_instance", "debug: There is no hos client."); - return HOS_CLIENT_OK; + return HOS_INSTANCE_NOT_INIT; } if (g_hos_handle.count > 0 && --g_hos_handle.count) @@ -1241,17 +1120,17 @@ int hos_shutdown_instance() pthread_join(hos_func->fs2_thread, NULL); for (i = 0; i < FS2_RECORD_EVENTS; i++) { - screen_stat_handle_t *fs2_handle = &hos_func->fs2_info[i].fs2_handle; + screen_stat_handle_t *fs2_handle = &hos_func->fs2_info.fs2_handle; if (*fs2_handle) { FS_stop(fs2_handle); *fs2_handle = NULL; } - if (hos_func->fs2_info[i].reserved) + if (hos_func->fs2_info.reserved) { if (i == 0) { - data_info_t * data_info = (data_info_t *)hos_func->fs2_info[i].reserved; + data_info_t * data_info = (data_info_t *)hos_func->fs2_info.reserved; if (data_info->rx_pkts) free(data_info->rx_pkts); if (data_info->rx_bytes) @@ -1267,18 +1146,18 @@ int hos_shutdown_instance() if (data_info->cache) free(data_info->cache); } - free(hos_func->fs2_info[i].reserved); - hos_func->fs2_info[i].reserved = NULL; + free(hos_func->fs2_info.reserved); + hos_func->fs2_info.reserved = NULL; } - if (hos_func->fs2_info[i].line_ids) + if (hos_func->fs2_info.line_ids) { - free(hos_func->fs2_info[i].line_ids); - hos_func->fs2_info[i].line_ids=NULL; + free(hos_func->fs2_info.line_ids); + hos_func->fs2_info.line_ids=NULL; } - if (hos_func->fs2_info[i].column_ids) + if (hos_func->fs2_info.column_ids) { - free(hos_func->fs2_info[i].column_ids); - hos_func->fs2_info[i].column_ids=NULL; + free(hos_func->fs2_info.column_ids); + hos_func->fs2_info.column_ids=NULL; } } } @@ -1290,6 +1169,11 @@ int hos_shutdown_instance() free(g_hos_handle.task_num); g_hos_handle.task_num = NULL; } + if (g_hos_handle.task_context != NULL) + { + free(g_hos_handle.task_context); + g_hos_handle.task_context = NULL; + } MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: delete s3client."); if (g_fd_info) diff --git a/src/hos_common.h b/src/hos_common.h index 9c9db6a0..29d9dce0 100644 --- a/src/hos_common.h +++ b/src/hos_common.h @@ -68,7 +68,8 @@ typedef struct hos_config_s uint32_t thread_num; uint32_t cache_size; uint32_t cache_count; - uint32_t timeout; + uint32_t max_request_num; + uint32_t max_request_context; }hos_config_t; typedef struct hos_func_thread_s @@ -77,7 +78,7 @@ typedef struct hos_func_thread_s pthread_t fd_thread; int fd_thread_status; /* fs2 管理线程 */ - fs2_info_t fs2_info[FS2_RECORD_EVENTS]; //0: data info; 1: fd info; 2 cache info; 3 PoolThread state + fs2_info_t fs2_info; pthread_t fs2_thread; int fs2_status; #define HOS_FS2_START 1 @@ -98,11 +99,12 @@ typedef struct hos_client_handle_s hos_func_thread_t hos_func; void *log; size_t *task_num; + size_t *task_context; }hos_client_handle_t; extern struct hos_instance_s g_hos_instance; extern hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle extern hos_fd_context_t **g_fd_context; -extern size_t (*g_fd_info)[MAX_HOS_CLIENT_FD_NUM + 1]; //fd 实际从3开始, fd[thread_id][0]记录register的fd,fd[thread_id][1]记录inject的fd +extern size_t *g_fd_info; //fd 实际从1开始,每个线程有独立的fd #endif \ No newline at end of file diff --git a/src/hos_hash.cpp b/src/hos_hash.cpp index fff9b474..bb7e2538 100644 --- a/src/hos_hash.cpp +++ b/src/hos_hash.cpp @@ -44,8 +44,6 @@ void add_fd_context(hos_fd_context_t **handle, hos_fd_context_t *input) value->position = input->position; value->recive_cnt = input->recive_cnt; value->fd_status = value->fd_status; - value->overtime = value->overtime; - value->timeout = value->timeout; } } diff --git a/src/hos_hash.h b/src/hos_hash.h index 1b196df3..c23e3d51 100644 --- a/src/hos_hash.h +++ b/src/hos_hash.h @@ -25,11 +25,8 @@ typedef struct hos_fd_context_s size_t recive_cnt; long cache_rest; int fd_status; -#define HOS_FD_FREE 0 -#define HOS_FD_REGISTER 1 -#define HOS_FD_INJECT 2 - size_t overtime; //计算后的时间点,超过即inject fd - size_t timeout; //配置的超时时间,从status变成INJECT开始计时 +#define HOS_FD_REGISTER 0 +#define HOS_FD_INJECT 1 UT_hash_handle hh; }hos_fd_context_t;