diff --git a/example/performance/HosClientPerformance.cpp b/example/performance/HosClientPerformance.cpp index 35dfc4e1..08bbc945 100644 --- a/example/performance/HosClientPerformance.cpp +++ b/example/performance/HosClientPerformance.cpp @@ -214,7 +214,7 @@ static int upload_buff(char * buff, int buff_len, thread_info_t *thread_info, ch if (g_mode & APPEND_MODE) { - fd[0] = hos_open_fd(thread_info->bucket, thread_info->object, callback, NULL, thread_info->thread_num, g_mode); + fd[0] = hos_open_fd(thread_info->bucket, thread_info->object, callback, NULL, thread_info->thread_num); for (i = 0; i < g_test_count; i++) { @@ -224,7 +224,7 @@ static int upload_buff(char * buff, int buff_len, thread_info_t *thread_info, ch { tmp = j * g_append_size; rest = buff_len - tmp; - if (rest < g_append_size) + if (rest <= g_append_size) { hos_write(fd[0], &buff[tmp], rest, thread_info->thread_num); break; @@ -266,8 +266,7 @@ static int upload_buff(char * buff, int buff_len, thread_info_t *thread_info, ch for (i = 0; i < g_test_count; i++) { clock_gettime(CLOCK_MONOTONIC, &tstart); - fd[i] = hos_open_fd(thread_info->bucket, thread_info->object, callback, NULL, thread_info->thread_num, g_mode); - ret = hos_write(fd[i], buff, buff_len, thread_info->thread_num); + ret = hos_upload_buf(thread_info->bucket, thread_info->object, buff, buff_len, callback, NULL, thread_info->thread_num); if (ret == HOS_CLIENT_OK) { success_cnt++; diff --git a/gtest/CMakeLists.txt b/gtest/CMakeLists.txt index 3dabf565..db8c5f56 100644 --- a/gtest/CMakeLists.txt +++ b/gtest/CMakeLists.txt @@ -13,7 +13,7 @@ SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fprofile-arcs -ftest-cove add_definitions(-g -W -Wall -std=c++11) #add_executable(gtest_hos_client gtest_hos_init_instance.cpp gtest_hos_get_instance.cpp gtest_hos_close_fd.cpp gtest_hos_open_fd.cpp) -#add_executable(gtest_hos_client CheckHosClient.cpp gtest_hos_write.cpp) -add_executable(gtest_hos_client ${SRCS}) +add_executable(gtest_hos_client CheckHosClient.cpp gtest_hos_write.cpp) +#add_executable(gtest_hos_client ${SRCS}) target_link_libraries(gtest_hos_client hos-client-cpp gtest gtest_main pthread) diff --git a/gtest/CheckHosClient.cpp b/gtest/CheckHosClient.cpp index 354752cc..75787eed 100644 --- a/gtest/CheckHosClient.cpp +++ b/gtest/CheckHosClient.cpp @@ -61,10 +61,10 @@ void CheckStructHosFunc(hos_func_thread_t *actual, hos_func_thread_t *expect, in { //EXPECT_EQ(actual->fd_thread, expect->fd_thread); EXPECT_EQ(actual->fd_thread_status, expect->fd_thread_status); - if (actual->fs2_info[0].reserved != NULL) + if (actual->fs2_info.reserved != NULL) { - CheckStructFs2DataInfo((data_info_t *)actual->fs2_info[0].reserved, - (data_info_t *)expect->fs2_info[0].reserved, thread_num); + CheckStructFs2DataInfo((data_info_t *)actual->fs2_info.reserved, + (data_info_t *)expect->fs2_info.reserved, thread_num); } EXPECT_EQ(actual->fs2_status, expect->fs2_status); //EXPECT_EQ(actual->fs2_thread, expect->fs2_thread); diff --git a/gtest/gtest_hos_close_fd.cpp b/gtest/gtest_hos_close_fd.cpp index 502ed862..dbedf6a6 100644 --- a/gtest/gtest_hos_close_fd.cpp +++ b/gtest/gtest_hos_close_fd.cpp @@ -30,7 +30,7 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_func.fs2_status = 1; data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t)); - hos_handle->hos_func.fs2_info[0].reserved = (void *)data_info; + hos_handle->hos_func.fs2_info.reserved = (void *)data_info; data_info->tx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->tx_bytes = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->rx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); @@ -59,7 +59,7 @@ static void gtest_hos_fd_init(hos_fd_context_t *fd_info) fd_info->callback = NULL; fd_info->fd = 1; fd_info->fd_status = HOS_FD_REGISTER; - fd_info->mode = BUFF_MODE; + fd_info->mode = BUFF_MODE | APPEND_MODE; fd_info->position = 0; fd_info->recive_cnt = 0; fd_info->userdata = NULL; @@ -78,7 +78,7 @@ TEST(hos_close_fd, normal) gtest_hos_handle_init(&expect_hos_handle, thread_num); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - size_t fd = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 0, BUFF_MODE); + size_t fd = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 0); EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -97,7 +97,7 @@ TEST(hos_close_fd, normal) CheckHosInstance(hos_instance, &expect_hos_instance); Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; free(data_info->cache); free(data_info->rx_bytes); free(data_info->rx_pkts); @@ -124,7 +124,7 @@ TEST(hos_close_fd, paramer_error) expect_hos_handle.hos_config.thread_num=2; CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - int fd = hos_open_fd(NULL, "object", NULL, NULL, 0, BUFF_MODE); + int fd = hos_open_fd(NULL, "object", NULL, NULL, 0); EXPECT_EQ(fd, HOS_PARAMETER_ERROR); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -142,7 +142,7 @@ TEST(hos_close_fd, paramer_error) CheckHosInstance(hos_instance, &expect_hos_instance); Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; free(data_info->cache); free(data_info->rx_bytes); free(data_info->rx_pkts); @@ -186,7 +186,7 @@ TEST(hos_close_fd, fd_not_exits) CheckHosInstance(hos_instance, &expect_hos_instance); Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; free(data_info->cache); free(data_info->rx_bytes); free(data_info->rx_pkts); diff --git a/gtest/gtest_hos_get_instance.cpp b/gtest/gtest_hos_get_instance.cpp index c2fccb22..03528aea 100644 --- a/gtest/gtest_hos_get_instance.cpp +++ b/gtest/gtest_hos_get_instance.cpp @@ -30,7 +30,7 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_func.fs2_status = 1; data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t)); - hos_handle->hos_func.fs2_info[0].reserved = (void *)data_info; + hos_handle->hos_func.fs2_info.reserved = (void *)data_info; data_info->tx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->tx_bytes = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->rx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); @@ -82,7 +82,7 @@ TEST(hos_get_instance, normal) CheckHosInstance(hos_instance, &expect_hos_instance); Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; free(data_info->cache); free(data_info->rx_bytes); free(data_info->rx_pkts); diff --git a/gtest/gtest_hos_init_instance.cpp b/gtest/gtest_hos_init_instance.cpp index f44a3842..bf27c431 100644 --- a/gtest/gtest_hos_init_instance.cpp +++ b/gtest/gtest_hos_init_instance.cpp @@ -35,7 +35,7 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_func.fs2_status = 1; data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t)); - hos_handle->hos_func.fs2_info[0].reserved = (void *)data_info; + hos_handle->hos_func.fs2_info.reserved = (void *)data_info; data_info->tx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->tx_bytes = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->rx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); @@ -71,7 +71,7 @@ TEST(hos_init_instance, normal) CheckHosInstance(hos_instance, &expect_hos_instance); Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; free(data_info->cache); free(data_info->rx_bytes); free(data_info->rx_pkts); @@ -96,38 +96,6 @@ TEST(hos_init_instance, param_error) CheckHosInstance(hos_instance, &expect_hos_instance); } -TEST(hos_init_instance, no_fs2) -{ - hos_instance_s expect_hos_instance; - hos_client_handle_t expect_hos_handle; - - hos_instance hos_instance = hos_init_instance(HOS_CONF, "hos_no_fs2_conf", 1, HOS_BUCKET); - gtest_hos_instance_init(&expect_hos_instance); - CheckHosInstance(hos_instance, &expect_hos_instance); - gtest_hos_handle_init(&expect_hos_handle, 1); - expect_hos_handle.hos_config.fs2_ip[0] = '\0'; - expect_hos_handle.hos_config.fs2_port = 0; - expect_hos_handle.hos_func.fs2_status = 0; - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - - int ret = hos_shutdown_instance(); - EXPECT_EQ(ret, HOS_CLIENT_OK); - memset(&expect_hos_instance, 0, sizeof(expect_hos_instance)); - CheckHosInstance(hos_instance, &expect_hos_instance); - - Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; - free(data_info->cache); - free(data_info->rx_bytes); - free(data_info->rx_pkts); - free(data_info->tx_bytes); - free(data_info->tx_pkts); - free(data_info->tx_failed_bytes); - free(data_info->tx_failed_pkts); - memset(&expect_hos_handle, 0, sizeof(hos_client_handle_s)); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); -} - TEST(hos_init_instance, bucket_not_exits) { hos_instance hos_instance = hos_init_instance(HOS_CONF, "hos_default_conf", 1, "hos_bucket_not_exits"); diff --git a/gtest/gtest_hos_open_fd.cpp b/gtest/gtest_hos_open_fd.cpp index 5c175737..121cdcef 100644 --- a/gtest/gtest_hos_open_fd.cpp +++ b/gtest/gtest_hos_open_fd.cpp @@ -30,7 +30,7 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_func.fs2_status = 1; data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t)); - hos_handle->hos_func.fs2_info[0].reserved = (void *)data_info; + hos_handle->hos_func.fs2_info.reserved = (void *)data_info; data_info->tx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->tx_bytes = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->rx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); @@ -59,7 +59,7 @@ static void gtest_hos_fd_init(hos_fd_context_t *fd_info) fd_info->callback = NULL; fd_info->fd = 1; fd_info->fd_status = HOS_FD_REGISTER; - fd_info->mode = BUFF_MODE; + fd_info->mode = BUFF_MODE | APPEND_MODE; fd_info->position = 0; fd_info->recive_cnt = 0; fd_info->userdata = NULL; @@ -77,14 +77,14 @@ TEST(hos_open_fd, normal) gtest_hos_handle_init(&expect_hos_handle, 2); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - size_t fd = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 0, BUFF_MODE); + size_t fd = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 0); EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[0]); CheckStructGHosFdContext(g_fd_context[0], &expect_fd_info[0]); - size_t fd1 = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 1, BUFF_MODE); + size_t fd1 = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 1); EXPECT_EQ(fd1, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -110,7 +110,7 @@ TEST(hos_open_fd, normal) CheckHosInstance(hos_instance, &expect_hos_instance); Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; free(data_info->cache); free(data_info->rx_bytes); free(data_info->rx_pkts); @@ -134,7 +134,7 @@ TEST(hos_open_fd, paramer_error) gtest_hos_handle_init(&expect_hos_handle, 2); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - int fd = hos_open_fd(NULL, "object", NULL, NULL, 0, BUFF_MODE); + int fd = hos_open_fd(NULL, "object", NULL, NULL, 0); EXPECT_EQ(fd, HOS_PARAMETER_ERROR); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -147,7 +147,7 @@ TEST(hos_open_fd, paramer_error) CheckHosInstance(hos_instance, &expect_hos_instance); Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; free(data_info->cache); free(data_info->rx_bytes); free(data_info->rx_pkts); @@ -171,7 +171,7 @@ TEST(hos_open_fd, over_threadnums) gtest_hos_handle_init(&expect_hos_handle, 2); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - int fd = hos_open_fd(HOS_CONF, "object", NULL, NULL, 3, BUFF_MODE); + int fd = hos_open_fd(HOS_CONF, "object", NULL, NULL, 3); EXPECT_EQ(fd, HOS_PARAMETER_ERROR); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -184,7 +184,7 @@ TEST(hos_open_fd, over_threadnums) CheckHosInstance(hos_instance, &expect_hos_instance); Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; free(data_info->cache); free(data_info->rx_bytes); free(data_info->rx_pkts); @@ -197,70 +197,8 @@ TEST(hos_open_fd, over_threadnums) EXPECT_EQ((void *)g_fd_context, (void *)NULL); } -#if 0 //修改fd管理逻辑, fd不再有大小限制 -TEST(hos_open_fd, fd_not_enough) -{ - int i = 0, fd = 0; - hos_instance_s expect_hos_instance; - hos_client_handle_t expect_hos_handle; - hos_fd_context_t expect_fd_info; - - hos_instance hos_instance = hos_init_instance(HOS_CONF, "hos_default_conf", 2, HOS_BUCKET); - gtest_hos_instance_init(&expect_hos_instance); - CheckHosInstance(hos_instance, &expect_hos_instance); - gtest_hos_handle_init(&expect_hos_handle, 2); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - - gtest_hos_fd_init(&expect_fd_info); - hos_fd_context_t * prev = NULL; - hos_fd_context_t * current = NULL; - for (i = 0; i < 65533; i++) - { - fd = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 0, BUFF_MODE); - EXPECT_EQ(fd, i+1); - expect_fd_info.fd = i+1; - if (i == 0) - { - current = g_fd_context[0]; - } - else - { - current = (hos_fd_context_t *)prev->hh.next; - } - CheckStructGHosFdContext(current, &expect_fd_info); - prev = current; - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - } - fd = hos_open_fd(HOS_CONF, "object", NULL, NULL, 0, BUFF_MODE); - EXPECT_EQ(fd, HOS_FD_NOT_ENOUGH); - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - EXPECT_EQ(prev->hh.next, (void *)NULL); - - int ret = hos_shutdown_instance(); - EXPECT_EQ(ret, HOS_CLIENT_OK); - expect_hos_instance.result = 0; - expect_hos_instance.hos_url_prefix = NULL; - CheckHosInstance(hos_instance, &expect_hos_instance); - - Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; - free(data_info->cache); - free(data_info->rx_bytes); - free(data_info->rx_pkts); - free(data_info->tx_bytes); - free(data_info->tx_pkts); - free(data_info->tx_failed_bytes); - free(data_info->tx_failed_pkts); - memset(&expect_hos_handle, 0, sizeof(hos_client_handle_s)); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - EXPECT_EQ((void *)g_fd_context, (void *)NULL); -} -#endif - TEST(hos_open_fd, not_init_instance) { - int fd = hos_open_fd(HOS_CONF, "object", NULL, NULL, 0, BUFF_MODE); + int fd = hos_open_fd(HOS_CONF, "object", NULL, NULL, 0); EXPECT_EQ(fd, HOS_INSTANCE_NOT_INIT); } \ No newline at end of file diff --git a/gtest/gtest_hos_shutdown_instance.cpp b/gtest/gtest_hos_shutdown_instance.cpp index 4b53a120..b371029a 100644 --- a/gtest/gtest_hos_shutdown_instance.cpp +++ b/gtest/gtest_hos_shutdown_instance.cpp @@ -35,7 +35,7 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_func.fs2_status = 1; data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t)); - hos_handle->hos_func.fs2_info[0].reserved = (void *)data_info; + hos_handle->hos_func.fs2_info.reserved = (void *)data_info; data_info->tx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->tx_bytes = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->rx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); @@ -74,7 +74,7 @@ TEST(hos_shutdown_instance, normal) CheckHosInstance(hos_instance, &expect_hos_instance); Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; free(data_info->cache); free(data_info->rx_bytes); free(data_info->rx_pkts); @@ -112,7 +112,7 @@ TEST(hos_shutdown_instance, shutdown_more) CheckHosInstance(hos_instance, &expect_hos_instance); Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; free(data_info->cache); free(data_info->rx_bytes); free(data_info->rx_pkts); diff --git a/gtest/gtest_hos_upload_buff.cpp b/gtest/gtest_hos_upload_buff.cpp index e5858ab6..2edca10a 100644 --- a/gtest/gtest_hos_upload_buff.cpp +++ b/gtest/gtest_hos_upload_buff.cpp @@ -31,7 +31,7 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_func.fs2_status = 1; data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t)); - hos_handle->hos_func.fs2_info[0].reserved = (void *)data_info; + hos_handle->hos_func.fs2_info.reserved = (void *)data_info; data_info->tx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->tx_bytes = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->rx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); @@ -79,7 +79,7 @@ TEST(hos_upload_buff, normal) gtest_hos_instance_init(&expect_hos_instance); CheckHosInstance(hos_instance, &expect_hos_instance); gtest_hos_handle_init(&expect_hos_handle, 2); - data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); int ret = hos_upload_buf(HOS_BUCKET, "object", HOS_BUFF, strlen(HOS_BUFF), hos_callback, (void *)"object", 0); @@ -121,7 +121,7 @@ TEST(hos_upload_buff, bucket_not_exits) gtest_hos_instance_init(&expect_hos_instance); CheckHosInstance(hos_instance, &expect_hos_instance); gtest_hos_handle_init(&expect_hos_handle, 2); - data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); int ret = hos_upload_buf("bucket_not_exits", "object", HOS_BUFF, strlen(HOS_BUFF), hos_bucket_not_exits_cb, (void *)"object", 0); @@ -163,7 +163,7 @@ TEST(hos_upload_buff, param_error) gtest_hos_instance_init(&expect_hos_instance); CheckHosInstance(hos_instance, &expect_hos_instance); gtest_hos_handle_init(&expect_hos_handle, 2); - data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); int ret = hos_upload_buf(NULL, "object", HOS_BUFF, strlen(HOS_BUFF), hos_callback, (void *)"object", 0); diff --git a/gtest/gtest_hos_upload_file.cpp b/gtest/gtest_hos_upload_file.cpp index d2dbb247..4f886fa3 100644 --- a/gtest/gtest_hos_upload_file.cpp +++ b/gtest/gtest_hos_upload_file.cpp @@ -31,7 +31,7 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_func.fs2_status = 1; data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t)); - hos_handle->hos_func.fs2_info[0].reserved = (void *)data_info; + hos_handle->hos_func.fs2_info.reserved = (void *)data_info; data_info->tx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->tx_bytes = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->rx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); @@ -79,7 +79,7 @@ TEST(hos_upload_file, normal) gtest_hos_instance_init(&expect_hos_instance); CheckHosInstance(hos_instance, &expect_hos_instance); gtest_hos_handle_init(&expect_hos_handle, 2); - data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); int ret = hos_upload_file(HOS_BUCKET, HOS_BUFF, hos_callback, (void *)HOS_BUFF, 0); @@ -123,7 +123,7 @@ TEST(hos_upload_file, param_error) gtest_hos_instance_init(&expect_hos_instance); CheckHosInstance(hos_instance, &expect_hos_instance); gtest_hos_handle_init(&expect_hos_handle, 2); - data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); int ret = hos_upload_file(NULL, "object", hos_callback, (void *)"object", 0); @@ -161,7 +161,7 @@ TEST(hos_upload_file, file_not_exits) gtest_hos_instance_init(&expect_hos_instance); CheckHosInstance(hos_instance, &expect_hos_instance); gtest_hos_handle_init(&expect_hos_handle, 2); - data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); int ret = hos_upload_file(HOS_BUCKET, "file_not_exits", hos_callback, (void *)"object", 0); @@ -205,7 +205,7 @@ TEST(hos_upload_file, bucket_not_exits) gtest_hos_instance_init(&expect_hos_instance); CheckHosInstance(hos_instance, &expect_hos_instance); gtest_hos_handle_init(&expect_hos_handle, 2); - data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); int ret = hos_upload_file(HOS_CONF, HOS_CONF, hos_bucket_not_exits_cb, (void *)HOS_CONF, 0); diff --git a/gtest/gtest_hos_verify_bucket.cpp b/gtest/gtest_hos_verify_bucket.cpp deleted file mode 100644 index 57fad317..00000000 --- a/gtest/gtest_hos_verify_bucket.cpp +++ /dev/null @@ -1,135 +0,0 @@ -/************************************************************************* - > File Name: gtest_hos_verify_bucket.cpp - > Author: pxz - > Created Time: Tue 29 Sep 2020 10:32:14 AM CST - ************************************************************************/ -#include -#include "CheckHosClient.h" - -#define HOS_CONF "../conf/default.conf" -#define HOS_BUCKET "firewall_hos_bucket" - -static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_num) -{ - memset(hos_handle, 0, sizeof(hos_client_handle_t)); - hos_handle->buckets.push_back(Aws::S3::Model::Bucket().WithName("session_record_hos_bucket")); - hos_handle->buckets.push_back(Aws::S3::Model::Bucket().WithName("firewall_hos_bucket")); - hos_handle->count = 1; - memcpy(hos_handle->hos_config.accesskeyid, "default", strlen("default")+1); - memcpy(hos_handle->hos_config.secretkey, "default", strlen("default")+1); - hos_handle->hos_config.cache_count = 10; - hos_handle->hos_config.cache_size = 102400; - hos_handle->hos_config.fs2_fmt = 0; - memcpy(hos_handle->hos_config.fs2_ip, "127.0.0.1", strlen("127.0.0.1")+1); - memcpy(hos_handle->hos_config.fs2_path, "./log/hos_fs2_log", strlen("./log/hos_fs2_log")+1); - hos_handle->hos_config.fs2_port = 10086; - memcpy(hos_handle->hos_config.ip, "127.0.0.1", strlen("127.0.0.1")+1); - hos_handle->hos_config.log_level = 30; - memcpy(hos_handle->hos_config.log_path, "./hoslog", strlen("./hoslog")+1); - hos_handle->hos_config.pool_thread_size = 10; - hos_handle->hos_config.port = 9098; - hos_handle->hos_config.thread_num = thread_num; - hos_handle->hos_config.max_request_num = 100; - hos_handle->hos_config.max_request_context = 10240000; - hos_handle->hos_func.fd_thread_status = 0; - hos_handle->hos_func.fs2_status = 1; - - data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t)); - hos_handle->hos_func.fs2_info[0].reserved = (void *)data_info; - data_info->tx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); - data_info->tx_bytes = (size_t *)calloc(thread_num, sizeof(size_t)); - data_info->rx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); - data_info->rx_bytes = (size_t *)calloc(thread_num, sizeof(size_t)); - data_info->tx_failed_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); - data_info->tx_failed_bytes = (size_t *)calloc(thread_num, sizeof(size_t)); - data_info->cache = (size_t *)calloc(thread_num, sizeof(size_t)); -} - -static void gtest_hos_instance_init(hos_instance instance) -{ - memset(instance, 0, sizeof(hos_instance_s)); - instance->result = true; - instance->error_code = 0; - instance->error_message[0] ='\0'; - instance->hos_url_prefix = "http://127.0.0.1:9098/hos/"; -} - -TEST(hos_verify_bucket, normal) -{ - hos_instance_s expect_hos_instance; - hos_client_handle_t expect_hos_handle; - hos_fd_context_t expect_fd_info; - - hos_instance hos_instance = hos_init_instance(HOS_CONF, "hos_default_conf", 1, HOS_BUCKET); - gtest_hos_instance_init(&expect_hos_instance); - CheckHosInstance(hos_instance, &expect_hos_instance); - gtest_hos_handle_init(&expect_hos_handle, 1); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - - bool result = hos_verify_bucket(HOS_BUCKET); - EXPECT_EQ(result, true); - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - - int ret = hos_shutdown_instance(); - EXPECT_EQ(ret, HOS_CLIENT_OK); - expect_hos_instance.result = 0; - expect_hos_instance.hos_url_prefix = NULL; - CheckHosInstance(hos_instance, &expect_hos_instance); - - Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; - free(data_info->cache); - free(data_info->rx_bytes); - free(data_info->rx_pkts); - free(data_info->tx_bytes); - free(data_info->tx_pkts); - free(data_info->tx_failed_bytes); - free(data_info->tx_failed_pkts); - memset(&expect_hos_handle, 0, sizeof(hos_client_handle_s)); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - EXPECT_EQ((void *)g_fd_context, (void *)NULL); -} - -TEST(hos_verify_bucket, not_exits) -{ - hos_instance_s expect_hos_instance; - hos_client_handle_t expect_hos_handle; - hos_fd_context_t expect_fd_info; - - hos_instance hos_instance = hos_init_instance(HOS_CONF, "hos_default_conf", 1, HOS_BUCKET); - gtest_hos_instance_init(&expect_hos_instance); - CheckHosInstance(hos_instance, &expect_hos_instance); - gtest_hos_handle_init(&expect_hos_handle, 1); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - - bool result = hos_verify_bucket("hos_not_exits_bucket"); - EXPECT_EQ(result, false); - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - - int ret = hos_shutdown_instance(); - EXPECT_EQ(ret, HOS_CLIENT_OK); - expect_hos_instance.result = 0; - expect_hos_instance.hos_url_prefix = NULL; - CheckHosInstance(hos_instance, &expect_hos_instance); - - Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; - free(data_info->cache); - free(data_info->rx_bytes); - free(data_info->rx_pkts); - free(data_info->tx_bytes); - free(data_info->tx_pkts); - free(data_info->tx_failed_bytes); - free(data_info->tx_failed_pkts); - memset(&expect_hos_handle, 0, sizeof(hos_client_handle_s)); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - EXPECT_EQ((void *)g_fd_context, (void *)NULL); -} - -TEST(hos_verify_bucket, no_instance) -{ - bool result = hos_verify_bucket("hos_not_exits_bucket"); - EXPECT_EQ(result, false); -} diff --git a/gtest/gtest_hos_write.cpp b/gtest/gtest_hos_write.cpp index 9c29cb50..e08445a3 100644 --- a/gtest/gtest_hos_write.cpp +++ b/gtest/gtest_hos_write.cpp @@ -32,7 +32,7 @@ static void gtest_hos_handle_init(hos_client_handle_t *hos_handle, int thread_nu hos_handle->hos_func.fs2_status = 1; data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t)); - hos_handle->hos_func.fs2_info[0].reserved = (void *)data_info; + hos_handle->hos_func.fs2_info.reserved = (void *)data_info; data_info->tx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->tx_bytes = (size_t *)calloc(thread_num, sizeof(size_t)); data_info->rx_pkts = (size_t *)calloc(thread_num, sizeof(size_t)); @@ -61,7 +61,7 @@ static void gtest_hos_fd_init(hos_fd_context_t *fd_info) fd_info->callback = NULL; fd_info->fd = 1; fd_info->fd_status = HOS_FD_REGISTER; - fd_info->mode = BUFF_MODE; + fd_info->mode = BUFF_MODE | APPEND_MODE; fd_info->position = 0; fd_info->recive_cnt = 0; fd_info->userdata = NULL; @@ -94,15 +94,6 @@ static void hos_write_append_cb(bool result, const char *bucket, const char *obj EXPECT_STREQ(error, NULL); } -static void hos_write_file_cb(bool result, const char *bucket, const char *object, const char *error, void *userdata) -{ - SUCCEED(); - EXPECT_EQ(result, true); - EXPECT_STREQ(bucket, HOS_BUCKET); - EXPECT_STREQ(object, (char *)userdata); - EXPECT_STREQ(error, NULL); -} - static void hos_bucket_not_exits_cb(bool result, const char *bucket, const char *object, const char *error, void *userdata) { SUCCEED(); @@ -117,17 +108,17 @@ TEST(hos_write, normal) int thread_num = 3; hos_instance_s expect_hos_instance; hos_client_handle_t expect_hos_handle; - hos_fd_context_t expect_fd_info[3]; + hos_fd_context_t expect_fd_info[2]; data_info_t *data_info = NULL; hos_instance hos_instance = hos_init_instance(HOS_CONF, "hos_default_conf", thread_num, HOS_BUCKET); gtest_hos_instance_init(&expect_hos_instance); CheckHosInstance(hos_instance, &expect_hos_instance); gtest_hos_handle_init(&expect_hos_handle, thread_num); - data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - size_t fd = hos_open_fd(HOS_BUCKET, "object_buff", hos_write_buff_cb, (void *)"object_buff", 0, BUFF_MODE); + size_t fd = hos_open_fd(HOS_BUCKET, "object_buff", hos_write_buff_cb, (void *)"object_buff", 0); EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -142,14 +133,13 @@ TEST(hos_write, normal) expect_fd_info[0].fd_status = 2; data_info->rx_bytes[0] += strlen(HOS_BUFF); data_info->rx_pkts[0] +=1; - data_info->tx_bytes[0] += strlen(HOS_BUFF); - data_info->tx_pkts[0] += 1; + data_info->cache[0] += strlen(HOS_BUFF); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); //CheckStructGHosFdContext(g_fd_context[0], &expect_fd_info[0]); - size_t fd1 = hos_open_fd(HOS_BUCKET, "object_append", hos_write_append_cb, (void *)"object_append", 1, BUFF_MODE | APPEND_MODE); + size_t fd1 = hos_open_fd(HOS_BUCKET, "object_append", hos_write_append_cb, (void *)"object_append", 1); EXPECT_EQ(fd1, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -157,7 +147,6 @@ TEST(hos_write, normal) expect_fd_info[1].callback = (void *)hos_write_append_cb; expect_fd_info[1].object = (char *)"object_append"; expect_fd_info[1].userdata = (void *)"object_append"; - expect_fd_info[1].mode = BUFF_MODE | APPEND_MODE; CheckStructGHosFdContext(g_fd_context[1], &expect_fd_info[1]); ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF), 1); @@ -184,33 +173,12 @@ TEST(hos_write, normal) CheckStructGHosFdContext(g_fd_context[1], &expect_fd_info[1]); EXPECT_TRUE(g_fd_context[1][0].cache != NULL); - size_t fd2 = hos_open_fd(HOS_BUCKET, "object_file", hos_write_file_cb, (void *)"object_file", 2, FILE_MODE); - EXPECT_EQ(fd2, 1); - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - gtest_hos_fd_init(&expect_fd_info[2]); - expect_fd_info[2].callback = (void *)hos_write_file_cb; - expect_fd_info[2].object = (char *)"object_file"; - expect_fd_info[2].userdata = (void *)"object_file"; - expect_fd_info[2].mode = FILE_MODE; - CheckStructGHosFdContext(g_fd_context[2], &expect_fd_info[2]); - - ret = hos_write(fd2, HOS_FILE, strlen(HOS_CONF), 2); - EXPECT_EQ(ret, HOS_CLIENT_OK); - struct stat buffer; - stat(HOS_CONF, &buffer); - data_info->rx_bytes[2] += buffer.st_size; - data_info->rx_pkts[2] +=1; - data_info->tx_bytes[2] += buffer.st_size; - data_info->tx_pkts[2] += 1; - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - expect_fd_info[2].fd_status = 2; - //CheckStructGHosFdContext(g_fd_context[1], &expect_fd_info[1]); - //EXPECT_TRUE(g_fd_context[2][0].cache != NULL); ret = hos_close_fd(fd, 0); EXPECT_EQ(ret, HOS_CLIENT_OK); + data_info->tx_bytes[0] += data_info->cache[0]; + data_info->tx_pkts[0] += 1; + data_info->cache[0] = 0; CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); ret = hos_close_fd(fd1, 1); @@ -220,10 +188,6 @@ TEST(hos_write, normal) data_info->cache[1] = 0; CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - ret = hos_close_fd(fd2, 2); - EXPECT_EQ(ret, HOS_CLIENT_OK); - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); ret = hos_shutdown_instance(); EXPECT_EQ(ret, HOS_CLIENT_OK); @@ -256,10 +220,10 @@ TEST(hos_write, bucket_not_exits) gtest_hos_instance_init(&expect_hos_instance); CheckHosInstance(hos_instance, &expect_hos_instance); gtest_hos_handle_init(&expect_hos_handle, thread_num); - data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - size_t fd = hos_open_fd("bucket_not_exits", "object_buff", hos_bucket_not_exits_cb, (void *)"object_buff", 0, BUFF_MODE); + size_t fd = hos_open_fd("bucket_not_exits", "object_buff", hos_bucket_not_exits_cb, (void *)"object_buff", 0); EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -274,12 +238,11 @@ TEST(hos_write, bucket_not_exits) EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->rx_bytes[0] = strlen(HOS_BUFF); data_info->rx_pkts[0] += 1; - data_info->tx_failed_bytes[0] = strlen(HOS_BUFF); - data_info->tx_failed_pkts[0] += 1; + data_info->cache[0] = strlen(HOS_BUFF); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - size_t fd1 = hos_open_fd("bucket_not_exits", "object_append", hos_bucket_not_exits_cb, (void *)"object_append", 1, BUFF_MODE | APPEND_MODE); + size_t fd1 = hos_open_fd("bucket_not_exits", "object_append", hos_bucket_not_exits_cb, (void *)"object_append", 1); EXPECT_EQ(fd1, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -288,7 +251,6 @@ TEST(hos_write, bucket_not_exits) expect_fd_info[1].userdata = (void *)"object_append"; expect_fd_info[1].bucket = (char *)"bucket_not_exits"; expect_fd_info[1].object = (char *)"object_append"; - expect_fd_info[1].mode = BUFF_MODE | APPEND_MODE; CheckStructGHosFdContext(g_fd_context[1], &expect_fd_info[1]); ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF), 1); @@ -315,33 +277,11 @@ TEST(hos_write, bucket_not_exits) CheckStructGHosFdContext(g_fd_context[1], &expect_fd_info[1]); EXPECT_TRUE(g_fd_context[1][0].cache != NULL); - size_t fd2 = hos_open_fd("bucket_not_exits", "object_file", hos_bucket_not_exits_cb, (void *)"object_file", 2, FILE_MODE); - EXPECT_EQ(fd2, 1); - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - gtest_hos_fd_init(&expect_fd_info[2]); - expect_fd_info[2].callback = (void *) hos_bucket_not_exits_cb; - expect_fd_info[2].userdata = (void *)"object_file"; - expect_fd_info[2].bucket = (char *)"bucket_not_exits"; - expect_fd_info[2].object = (char *)"object_file"; - expect_fd_info[2].mode = FILE_MODE; - CheckStructGHosFdContext(g_fd_context[2], &expect_fd_info[2]); - - ret = hos_write(fd2, HOS_FILE, strlen(HOS_CONF), 2); - EXPECT_EQ(ret, HOS_CLIENT_OK); - expect_fd_info[2].fd_status = 2; - struct stat buffer; - stat(HOS_CONF, &buffer); - data_info->rx_bytes[2] += buffer.st_size; - data_info->rx_pkts[2] += 1; - data_info->tx_failed_bytes[2] += buffer.st_size; - data_info->tx_failed_pkts[2] += 1; - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - //CheckStructGHosFdContext(g_fd_context[2], &expect_fd_info[2]); - ret = hos_close_fd(fd, 0); EXPECT_EQ(ret, HOS_CLIENT_OK); + data_info->tx_failed_bytes[0] += data_info->cache[0]; + data_info->tx_failed_pkts[0] += 1; + data_info->cache[0] = 0; CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); ret = hos_close_fd(fd1, 1); @@ -351,10 +291,6 @@ TEST(hos_write, bucket_not_exits) data_info->cache[1] = 0; CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - ret = hos_close_fd(fd2, 2); - EXPECT_EQ(ret, HOS_CLIENT_OK); - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); ret = hos_shutdown_instance(); EXPECT_EQ(ret, HOS_CLIENT_OK); @@ -387,11 +323,11 @@ TEST(hos_write, sync_mode) gtest_hos_instance_init(&expect_hos_instance); CheckHosInstance(hos_instance, &expect_hos_instance); gtest_hos_handle_init(&expect_hos_handle, thread_num); - data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; expect_hos_handle.hos_config.pool_thread_size = 0; CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - size_t fd = hos_open_fd(HOS_BUCKET, "object_buff", NULL, NULL, 0, BUFF_MODE); + size_t fd = hos_open_fd(HOS_BUCKET, "object_buff", NULL, NULL, 0); EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -403,19 +339,17 @@ TEST(hos_write, sync_mode) EXPECT_EQ(ret, HOS_CLIENT_OK); data_info->rx_bytes[0] += strlen(HOS_BUFF); data_info->rx_pkts[0] += 1; - data_info->tx_bytes[0] += strlen(HOS_BUFF); - data_info->tx_pkts[0] += 1; + data_info->cache[0] += strlen(HOS_BUFF); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); //CheckStructGHosFdContext(g_fd_context[0], &expect_fd_info[0]); - size_t fd1 = hos_open_fd(HOS_BUCKET, "object_append", NULL, NULL, 1, BUFF_MODE | APPEND_MODE); + size_t fd1 = hos_open_fd(HOS_BUCKET, "object_append", NULL, NULL, 1); EXPECT_EQ(fd1, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[1]); expect_fd_info[1].object = (char *)"object_append"; - expect_fd_info[1].mode = BUFF_MODE | APPEND_MODE; CheckStructGHosFdContext(g_fd_context[1], &expect_fd_info[1]); ret = hos_write(fd1, HOS_BUFF, strlen(HOS_BUFF), 1); @@ -442,29 +376,11 @@ TEST(hos_write, sync_mode) CheckStructGHosFdContext(g_fd_context[1], &expect_fd_info[1]); EXPECT_TRUE(g_fd_context[1][0].cache != NULL); - size_t fd2 = hos_open_fd(HOS_BUCKET, "object_buff", NULL, NULL, 2, FILE_MODE); - EXPECT_EQ(fd2, 1); - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - gtest_hos_fd_init(&expect_fd_info[2]); - expect_fd_info[2].object = (char *)"object_buff"; - expect_fd_info[2].mode = FILE_MODE; - CheckStructGHosFdContext(g_fd_context[2], &expect_fd_info[2]); - ret = hos_write(fd2, HOS_FILE, strlen(HOS_BUFF), 2); - EXPECT_EQ(ret, HOS_CLIENT_OK); - struct stat buffer; - stat(HOS_CONF, &buffer); - data_info->rx_bytes[2] += buffer.st_size; - data_info->rx_pkts[2] += 1; - data_info->tx_bytes[2] += buffer.st_size; - data_info->tx_pkts[2] += 1; - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - expect_fd_info[2].fd_status = 2; - //CheckStructGHosFdContext(g_fd_context[1], &expect_fd_info[1]); - ret = hos_close_fd(fd, 0); EXPECT_EQ(ret, HOS_CLIENT_OK); + data_info->tx_bytes[0] += data_info->cache[0]; + data_info->tx_pkts[0] += 1; + data_info->cache[0] = 0; CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); ret = hos_close_fd(fd1, 1); @@ -474,10 +390,6 @@ TEST(hos_write, sync_mode) data_info->cache[1] = 0; CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - ret = hos_close_fd(fd2, 2); - EXPECT_EQ(ret, HOS_CLIENT_OK); - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); ret = hos_shutdown_instance(); EXPECT_EQ(ret, HOS_CLIENT_OK); @@ -510,38 +422,35 @@ TEST(hos_write, sync_mode_bucket_not_exits) gtest_hos_instance_init(&expect_hos_instance); CheckHosInstance(hos_instance, &expect_hos_instance); gtest_hos_handle_init(&expect_hos_handle, thread_num); - data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; expect_hos_handle.hos_config.pool_thread_size = 0; CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - size_t fd = hos_open_fd(HOS_CONF, "object_buff", NULL, NULL, 0, BUFF_MODE); + size_t fd = hos_open_fd(HOS_CONF, "object_buff", NULL, NULL, 0); EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[0]); expect_fd_info[0].object = (char *)"object_buff"; - expect_fd_info[0].mode = BUFF_MODE; expect_fd_info[0].bucket = (char *)HOS_CONF; CheckStructGHosFdContext(g_fd_context[0], &expect_fd_info[0]); int ret = hos_write(fd, HOS_BUFF, strlen(HOS_BUFF), 0); - EXPECT_EQ(ret, NO_SUCH_BUCKET); + EXPECT_EQ(ret, HOS_CLIENT_OK); expect_fd_info[0].fd_status = 2; data_info->rx_bytes[0] += strlen(HOS_BUFF); data_info->rx_pkts[0] += 1; - data_info->tx_failed_bytes[0] += strlen(HOS_BUFF); - data_info->tx_failed_pkts[0] += 1; + data_info->cache[0] += strlen(HOS_BUFF); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); //CheckStructGHosFdContext(g_fd_context[0], &expect_fd_info[0]); - size_t fd1 = hos_open_fd(HOS_CONF, "object_append", NULL, NULL, 1, BUFF_MODE | APPEND_MODE); + size_t fd1 = hos_open_fd(HOS_CONF, "object_append", NULL, NULL, 1); EXPECT_EQ(fd1, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[1]); expect_fd_info[1].object = (char *)"object_append"; - expect_fd_info[1].mode = BUFF_MODE | APPEND_MODE; expect_fd_info[1].bucket = (char *)HOS_CONF; CheckStructGHosFdContext(g_fd_context[1], &expect_fd_info[1]); @@ -569,30 +478,11 @@ TEST(hos_write, sync_mode_bucket_not_exits) CheckStructGHosFdContext(g_fd_context[1], &expect_fd_info[1]); EXPECT_TRUE(g_fd_context[1][0].cache != NULL); - size_t fd2 = hos_open_fd(HOS_CONF, "object_file", NULL, NULL, 2, FILE_MODE); - EXPECT_EQ(fd2, 1); - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - gtest_hos_fd_init(&expect_fd_info[2]); - expect_fd_info[2].object = (char *)"object_file"; - expect_fd_info[2].mode = FILE_MODE; - expect_fd_info[2].bucket = (char *)HOS_CONF; - CheckStructGHosFdContext(g_fd_context[2], &expect_fd_info[2]); - ret = hos_write(fd2, HOS_FILE, strlen(HOS_FILE), 2); - EXPECT_EQ(ret, NO_SUCH_BUCKET); - struct stat buffer; - stat(HOS_CONF, &buffer); - data_info->rx_bytes[2] += buffer.st_size; - data_info->rx_pkts[2] += 1; - data_info->tx_failed_bytes[2] += buffer.st_size; - data_info->tx_failed_pkts[2] += 1; - expect_fd_info[2].fd_status = 2; - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - //CheckStructGHosFdContext(g_fd_context[2], &expect_fd_info[2]); - ret = hos_close_fd(fd, 0); EXPECT_EQ(ret, HOS_CLIENT_OK); + data_info->tx_failed_bytes[0] += data_info->cache[0]; + data_info->tx_failed_pkts[0] += 1; + data_info->cache[0] = 0; CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); ret = hos_close_fd(fd1, 1); @@ -602,10 +492,6 @@ TEST(hos_write, sync_mode_bucket_not_exits) data_info->cache[1] = 0; CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - ret = hos_close_fd(fd2, 2); - EXPECT_EQ(ret, HOS_CLIENT_OK); - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); ret = hos_shutdown_instance(); EXPECT_EQ(ret, HOS_CLIENT_OK); @@ -639,14 +525,13 @@ TEST(hos_write, paramer_error) gtest_hos_handle_init(&expect_hos_handle, thread_num); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - int fd = hos_open_fd(HOS_BUCKET, "object_buff", hos_callback, NULL, 0, BUFF_MODE); + int fd = hos_open_fd(HOS_BUCKET, "object_buff", hos_callback, NULL, 0); EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info); expect_fd_info.object = (char *)"object_buff"; expect_fd_info.callback = (void *)hos_callback; - expect_fd_info.mode = BUFF_MODE; CheckStructGHosFdContext(g_fd_context[0], &expect_fd_info); int ret = hos_write(fd, NULL, strlen(HOS_BUFF), 0); @@ -662,7 +547,7 @@ TEST(hos_write, paramer_error) CheckHosInstance(hos_instance, &expect_hos_instance); Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; free(data_info->cache); free(data_info->rx_bytes); free(data_info->rx_pkts); @@ -701,55 +586,7 @@ TEST(hos_write, fd_not_find) CheckHosInstance(hos_instance, &expect_hos_instance); Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; - free(data_info->cache); - free(data_info->rx_bytes); - free(data_info->rx_pkts); - free(data_info->tx_bytes); - free(data_info->tx_pkts); - free(data_info->tx_failed_bytes); - free(data_info->tx_failed_pkts); - memset(&expect_hos_handle, 0, sizeof(hos_client_handle_s)); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - EXPECT_EQ((void *)g_fd_context, (void *)NULL); -} - -TEST(hos_write, file_not_exit) -{ - hos_instance_s expect_hos_instance; - hos_client_handle_t expect_hos_handle; - hos_fd_context_t expect_fd_info; - int thread_num = 2; - - hos_instance hos_instance = hos_init_instance(HOS_CONF, "hos_default_conf", thread_num, HOS_BUCKET); - gtest_hos_instance_init(&expect_hos_instance); - CheckHosInstance(hos_instance, &expect_hos_instance); - gtest_hos_handle_init(&expect_hos_handle, thread_num); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - - int fd = hos_open_fd(HOS_CONF, "object_file", NULL, NULL, 0, FILE_MODE); - EXPECT_EQ(fd, 1); - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - gtest_hos_fd_init(&expect_fd_info); - expect_fd_info.bucket = (char *)HOS_CONF; - expect_fd_info.object = (char *)"object_file"; - expect_fd_info.mode = FILE_MODE; - CheckStructGHosFdContext(g_fd_context[0], &expect_fd_info); - - int ret = hos_write(fd, "not_exit_file", strlen(HOS_CONF), 0); - EXPECT_EQ(ret, HOS_FILE_NOT_EXIST); - CheckHosInstance(hos_instance, &expect_hos_instance); - CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - //CheckStructGHosFdContext(g_fd_context[0], &expect_fd_info); - ret = hos_shutdown_instance(); - EXPECT_EQ(ret, HOS_CLIENT_OK); - expect_hos_instance.result = 0; - expect_hos_instance.hos_url_prefix = NULL; - CheckHosInstance(hos_instance, &expect_hos_instance); - - Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; free(data_info->cache); free(data_info->rx_bytes); free(data_info->rx_pkts); @@ -775,7 +612,7 @@ TEST(hos_write, over_threadnums) gtest_hos_handle_init(&expect_hos_handle, thread_num); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); - int fd = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 0, BUFF_MODE); + int fd = hos_open_fd(HOS_BUCKET, "object", NULL, NULL, 0); EXPECT_EQ(fd, 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); @@ -796,7 +633,7 @@ TEST(hos_write, over_threadnums) CheckHosInstance(hos_instance, &expect_hos_instance); Aws::Vector().swap(g_hos_handle.buckets); - data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; free(data_info->cache); free(data_info->rx_bytes); free(data_info->rx_pkts); @@ -841,22 +678,21 @@ static void *hos_function(void *ptr) gtest_hos_instance_init(&expect_hos_instance); CheckHosInstance(hos_instance, &expect_hos_instance); gtest_hos_handle_init(&expect_hos_handle, thread_num); - data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info[0].reserved; + data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved; expect_hos_handle.count = thread_id + 1; CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); for (i = 0; i < HOS_FD_NUMS_LOCAL; i++) { snprintf(object[i], 1024, "object_%lu_%d", thread_id, i); - fd[i] = hos_open_fd(HOS_BUCKET, object[i], hos_callback, object[i], 0, BUFF_MODE | APPEND_MODE); + fd[i] = hos_open_fd(HOS_BUCKET, object[i], hos_callback, object[i], 0); EXPECT_EQ(fd[i], i + 1); CheckHosInstance(hos_instance, &expect_hos_instance); CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle); gtest_hos_fd_init(&expect_fd_info[thread_id][i]); expect_fd_info[thread_id][i].object = object[i]; - expect_fd_info[thread_id][i].mode = BUFF_MODE | APPEND_MODE; expect_fd_info[thread_id][i].callback = (void *)hos_callback; - CheckStructGHosFdContext(g_fd_context[0], &expect_fd_info[thread_id][i]); + CheckStructGHosFdContext(g_fd_context[thread_id], &expect_fd_info[thread_id][i]); } for (i = 0; i < HOS_FD_NUMS_LOCAL; i++) diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 0e9c41eb..6c02d181 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -46,6 +46,7 @@ extern "C" struct hos_instance_s g_hos_instance; hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle static std::mutex m_client_lock; +static std::mutex m_delete_lock; hos_fd_context_t **g_fd_context; size_t *g_fd_info; //fd 实际从1开始,每个线程有独立的fd static Aws::SDKOptions g_options; @@ -57,19 +58,24 @@ static inline size_t get_current_ms() return (timenow.tv_sec * 1000 + timenow.tv_nsec / 1000 / 1000 ); } -static int hos_delete_fd(size_t thread_id, hos_fd_context_t *context) +static int hos_delete_fd(size_t fd, size_t thread_id) { + std::lock_guard locker(m_delete_lock); + hos_fd_context_t* context = find_context_by_fd(g_fd_context[thread_id], fd); if (context == NULL) { return HOS_PARAMETER_ERROR; } - - if (context) + + put_finished_callback callback = (put_finished_callback)context->callback; + if (callback) { - if (context->bucket) - { - free(context->bucket); - context->bucket = NULL; + callback(context->reslut, context->bucket, context->object, context->error, context->userdata); + } + if (context->bucket) + { + free(context->bucket); + context->bucket = NULL; } if (context->object) { @@ -78,7 +84,6 @@ static int hos_delete_fd(size_t thread_id, hos_fd_context_t *context) } HASH_DEL(g_fd_context[thread_id], context); free(context); - } return HOS_CLIENT_OK; } @@ -143,8 +148,9 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, "debug: [%s:%s] upload success. stream size:%lu", a_fd_context->bucket, a_fd_context->object, stream_len); } } - put_finished_callback callback = (put_finished_callback)a_fd_context->callback; - callback(result, a_fd_context->bucket, a_fd_context->object, error, a_fd_context->userdata); + a_fd_context->reslut = result; + a_fd_context->error = error; + if (a_fd_context->mode & APPEND_MODE) { //APPEND MODE 保留fd @@ -156,7 +162,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); - hos_delete_fd(thread_id, a_fd_context); + hos_delete_fd(fd, thread_id); } } } @@ -166,7 +172,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); - hos_delete_fd(thread_id, a_fd_context); + hos_delete_fd(fd, thread_id); } } g_hos_handle.task_num[thread_id]--; @@ -175,7 +181,6 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, static void hos_client_create() { - std::lock_guard locker(m_client_lock); hos_config_t *hos_conf = &g_hos_handle.hos_config; void *log = g_hos_handle.log; @@ -577,6 +582,7 @@ hos_instance hos_get_instance() hos_instance hos_init_instance(const char *conf_path, const char *module, size_t thread_num, const char *bucket) { + std::lock_guard locker(m_client_lock); hos_config_t *hos_conf = &g_hos_handle.hos_config; char hos_url[1024]; @@ -809,7 +815,7 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size return hos_upload_stream(bucket, object, buf, buf_len, callback, userdata, thread_id); } -int hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode) +int hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id) { if (g_hos_instance.result == false || g_hos_handle.S3Client == NULL) { @@ -826,7 +832,7 @@ int hos_open_fd(const char *bucket, const char *object, put_finished_callback ca size_t fd = ++g_fd_info[thread_id]; MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, "hos_open_fd", "debug: thread_id:%lu, fd:%lu", thread_id, fd); - hos_fd_context_t info = {fd, mode, (char *)bucket, (char *)object, (void *)callback, userdata, + hos_fd_context_t info = {fd, BUFF_MODE | APPEND_MODE, (char *)bucket, (char *)object, (void *)callback, userdata, NULL,/*cache*/ g_hos_handle.hos_config.cache_count, 0,/*position*/ 0,/*recive_cnt*/ (long)g_hos_handle.hos_config.cache_size,/*cache_rest*/ HOS_FD_REGISTER,/*fd_status*/}; add_fd_context(&g_fd_context[thread_id], &info); @@ -836,7 +842,6 @@ int hos_open_fd(const char *bucket, const char *object, put_finished_callback ca int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id) { - struct stat buffer; hos_fd_context_t *a_fd_context = NULL; char num[128]; int ret = 0; @@ -869,92 +874,50 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id // create and configure the asynchronous put object request. Aws::S3::Model::PutObjectRequest request; - //设置上传数据类型 - if (a_fd_context->mode & BUFF_MODE) + //field_stat2 record + if (hos_func->fs2_info.fs2_handle) { - //BUFF_MODE - - //field_stat2 record - if (hos_func->fs2_info.fs2_handle) + if (hos_func->fs2_info.reserved) { - if (hos_func->fs2_info.reserved) - { - data_info = (data_info_t *)hos_func->fs2_info.reserved; - data_info->rx_pkts[thread_id]++; - data_info->rx_bytes[thread_id] += stream_len; - } - } - if (a_fd_context->mode & APPEND_MODE) - { - //APPEND_MODE - if (a_fd_context->cache == NULL) - { - //a_fd_context->cache = Aws::MakeShared("hos_write append mode"); - a_fd_context->cache = std::make_shared(); - } - Aws::String buffer(stream, stream_len); - *a_fd_context->cache << buffer; - a_fd_context->cache_rest -= stream_len; - if (data_info != NULL) - data_info->cache[thread_id] += stream_len; - if (a_fd_context->cache_count == 0 || --a_fd_context->cache_count) - { - //cache_count == 0,不设置cache_count的情况 - //cache_count > 0,设置cache_count的情况 - if (a_fd_context->cache_rest > 0) - { - return HOS_CLIENT_OK; - } - } - request.SetBody(a_fd_context->cache); - - // add headers - atomic_add(&(a_fd_context->position), 1); - snprintf(num, 128, "%lu", atomic_read(&(a_fd_context->position))); - Aws::Map headers; - headers["x-hos-upload-type"] = "append"; - headers["x-hos-position"] = num; - request.SetMetadata(headers); - - a_fd_context->cache->seekg(0, std::ios_base::end); - upload_len = a_fd_context->cache->tellg(); - a_fd_context->cache->seekg(0, std::ios_base::beg); - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: x-hos-posotion:%s", num); - } - else - { - const std::shared_ptr input_data = - Aws::MakeShared("hos_write buffer mode"); - Aws::String buffer (stream, stream_len); - *input_data << buffer; - request.SetBody(input_data); - upload_len = stream_len; - } - } - else - { - if (stat(stream, &buffer) == -1) - { - MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: The file:%s not exist", stream); - return HOS_FILE_NOT_EXIST; - } - //文件类型 - const std::shared_ptr input_data = - Aws::MakeShared("hos_write file mode", stream, std::ios_base::in | std::ios_base::binary); - request.SetBody(input_data); - upload_len = buffer.st_size; - //field_stat2 record - if (hos_func->fs2_info.fs2_handle) - { - if (hos_func->fs2_info.reserved) - { - data_info = (data_info_t *)hos_func->fs2_info.reserved; - data_info->rx_pkts[thread_id]++; - data_info->rx_bytes[thread_id] += upload_len; - } + data_info = (data_info_t *)hos_func->fs2_info.reserved; + data_info->rx_pkts[thread_id]++; + data_info->rx_bytes[thread_id] += stream_len; } } + if (a_fd_context->cache == NULL) + { + //a_fd_context->cache = Aws::MakeShared("hos_write append mode"); + a_fd_context->cache = std::make_shared(); + } + Aws::String buffer(stream, stream_len); + *a_fd_context->cache << buffer; + a_fd_context->cache_rest -= stream_len; + if (data_info != NULL) + data_info->cache[thread_id] += stream_len; + if (a_fd_context->cache_count == 0 || --a_fd_context->cache_count) + { + //cache_count == 0,不设置cache_count的情况 + //cache_count > 0,设置cache_count的情况 + if (a_fd_context->cache_rest > 0) + { + return HOS_CLIENT_OK; + } + } + request.SetBody(a_fd_context->cache); + + // add headers + atomic_add(&(a_fd_context->position), 1); + snprintf(num, 128, "%lu", atomic_read(&(a_fd_context->position))); + Aws::Map headers; + headers["x-hos-upload-type"] = "append"; + headers["x-hos-position"] = num; + request.SetMetadata(headers); + + a_fd_context->cache->seekg(0, std::ios_base::end); + upload_len = a_fd_context->cache->tellg(); + a_fd_context->cache->seekg(0, std::ios_base::beg); + MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: x-hos-posotion:%s", num); request.SetBucket(a_fd_context->bucket); request.SetKey(a_fd_context->object); @@ -1053,7 +1016,7 @@ int hos_close_fd(size_t fd, size_t thread_id) if (hos_conf->pool_thread_size == 0) { //同步模式,立即释放fd - hos_delete_fd(thread_id, a_fd_context); + hos_delete_fd(fd, thread_id); } else { @@ -1064,7 +1027,7 @@ int hos_close_fd(size_t fd, size_t thread_id) MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s:%s] upload completed. [thread:%lu fd:%lu] delete", a_fd_context->bucket, a_fd_context->object, thread_id, a_fd_context->fd); - hos_delete_fd(thread_id, a_fd_context); + hos_delete_fd(fd, thread_id); } } diff --git a/src/hos_client.h b/src/hos_client.h index 48b2c095..f1e5e51d 100644 --- a/src/hos_client.h +++ b/src/hos_client.h @@ -82,7 +82,7 @@ enum s3errors OBJECT_NOT_IN_ACTIVE_TIER }; -typedef void (*put_finished_callback)(bool, const char *, const char *, const char *, void *); +typedef void (*put_finished_callback)(bool result, const char *bucket, const char *object, const char *errmsg, void *userdata); /************************************************************************************* @@ -97,18 +97,6 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t * 返回值: hos_instance 成功,result 为true *************************************************************************************/ hos_instance hos_get_instance(); -/************************************************************************************* - * 函数名: hos_verify_bucket - * 参数: const char * bucket 桶名称 - * 返回值: bool 成功返回true,失败返回false -*************************************************************************************/ -bool hos_verify_bucket(const char *bucket); -/************************************************************************************* - * 函数名: hos_create_bucket - * 参数: const char * bucket 桶名称 - * 返回值: int 成功返回0,S3错误返回s3errors错误码,hos client错误返回hoserrors错误码 -*************************************************************************************/ -int hos_create_bucket(hos_instance instance, const char *bucket); /************************************************************************************* * 函数名: hos_upload_file * 参数: hos_instance instance 非空句柄 @@ -143,7 +131,7 @@ int hos_upload_buf(const char *bucket, const char *object, const char *buf, size * int mode 模式 (FILE OR BUFFER, APPEND OR NOT) * 返回值 int 成功返回fd(fd >=3),失败返回hoserros错误码 *************************************************************************************/ -int hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode); +int hos_open_fd(const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id); /************************************************************************************* * 函数名: hos_write * 参数: size_t fd hos_open_fd返回的fd diff --git a/src/hos_hash.h b/src/hos_hash.h index c23e3d51..81208f3f 100644 --- a/src/hos_hash.h +++ b/src/hos_hash.h @@ -27,6 +27,9 @@ typedef struct hos_fd_context_s int fd_status; #define HOS_FD_REGISTER 0 #define HOS_FD_INJECT 1 + bool reslut; /*PutObjectAsync result*/ + const char *error; /*PutObjectAsync error message*/ + UT_hash_handle hh; }hos_fd_context_t;