🐞 fix(TSG-9807): 修复cache_size设置为0导致的内存快速消耗

This commit is contained in:
“pengxuanzheng”
2022-03-02 10:35:26 +00:00
parent d4e8b149c8
commit 16d71d2fe6
22 changed files with 214 additions and 148 deletions

View File

@@ -37,6 +37,9 @@ add_subdirectory(src)
enable_testing()
add_subdirectory(gtest)
add_subdirectory(example/demo)
add_subdirectory(example/performance)
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/src/libhos-client-cpp.so DESTINATION ${CMAKE_INSTALL_PREFIX}/lib COMPONENT LIBRARIES)
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/src/hos_client.h DESTINATION ${CMAKE_INSTALL_PREFIX}/include COMPONENT HEADER)
#install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/conf/hos.conf DESTINATION /etc/ld.so.conf.d COMPONENT PROFILE)

View File

@@ -5,7 +5,10 @@ project(hos_write_demo)
SET(CMAKE_BUILD_TYPE Debug)
link_directories(/opt/MESA/lib/)
include_directories(/opt/MESA/include)
include_directories("${CMAKE_SOURCE_DIR}/src/")
link_directories("${CMAKE_BINARY_DIR}/src")
add_executable(hos_write_demo hos_write_demo.cpp)
add_dependencies(hos_write_demo ${lib_name}_shared)
target_link_libraries(hos_write_demo hos-client-cpp)

View File

@@ -1,5 +1,5 @@
[hos_default_conf]
hos_serverip=192.168.10.1
hos_serverip=192.168.44.67
hos_serverport=9098
hos_accesskeyid="default"
hos_secretkey="default"

View File

@@ -18,19 +18,18 @@ extern "C"
typedef struct userdata_s
{
struct timespec *finished;
//struct timespec *finished;
}userdata_t;
static size_t calc_time(struct timespec start, struct timespec end)
{
return (end.tv_sec * 1000 * 1000 * 1000 + end.tv_nsec -
(start.tv_sec * 1000 * 1000 * 1000 + start.tv_nsec));
}
// static size_t calc_time(struct timespec start, struct timespec end)
// {
// return (end.tv_sec * 1000 * 1000 * 1000 + end.tv_nsec -
// (start.tv_sec * 1000 * 1000 * 1000 + start.tv_nsec));
// }
int file_to_buffer(const char *file, char *buffer, int size)
{
FILE *fp = fopen(file, "r");
int num = 0;
if (fp == NULL)
{
@@ -38,7 +37,7 @@ int file_to_buffer(const char *file, char *buffer, int size)
return -1;
}
do{
num = fread(buffer, 1, size, fp);
int num = fread(buffer, 1, size, fp);
if (num < 0)
{
return -1;
@@ -56,30 +55,24 @@ void callback(bool result, const char *bucket, const char *object, const char *e
{
if (!result)
{
printf("error:[%d], %s\n", errorcode, errormsg);
printf("error:[%zu], %s\n", errorcode, errormsg);
}
return ;
}
int main(int argc, char *argv[])
int main(const int argc, const char *argv[])
{
if (argc != 4)
{
printf("usege: [conf file] [module name] [file name]\n");
return -1;
}
struct timespec start, end, finished;
size_t time;
int i = 0;
char *conf_file = argv[1];
char *module_name = argv[2];
char *file_name = argv[3];
const char *conf_file = argv[1];
const char *module_name = argv[2];
const char *file_name = argv[3];
struct stat buffer;
char *buf = NULL;
size_t buf_size;
int mode = FILE_MODE;
size_t fd = 0;
userdata_t data = {&finished};
hos_instance hos_instance = NULL;
char object[1024];
const char *bucket = "hos_test_bucket";
@@ -107,7 +100,7 @@ int main(int argc, char *argv[])
if (hos_instance == NULL)
{
printf("error:hos_init_instance\n");
printf("error:[%d]%s", hos_get_init_instance_errorcode, hos_get_init_instance_errormsg);
printf("error:[%d]%s", hos_get_init_instance_errorcode(), hos_get_init_instance_errormsg());
return -1;
}
printf("hos_init_instance success ... \n");
@@ -128,19 +121,48 @@ int main(int argc, char *argv[])
}
printf("hos_upload_buff end ...\n");
mode = BUFF_MODE | APPEND_MODE;
printf("hos_write buff start ...\n");
snprintf(object, 1023, "%s_write_APPEND", file_name);
int err = hos_open_fd(bucket, object, callback, NULL, 0, &fd);
if (hos_write(fd, buf, buffer.st_size) != HOS_CLIENT_OK)
hos_open_fd(bucket, object, callback, NULL, 0, &fd);
int ret = hos_write(fd, buf, buffer.st_size);
if (ret == HOS_CLIENT_OK)
{
printf("debug: hos_write successed 1st.\n");
}
else if (ret == HOS_IN_CACHE)
{
printf("debug: hos_write in cache.\n");
}
else
{
printf("error: hos_write failed 1st!\n");
}
if (hos_write(fd, buf, buffer.st_size) != HOS_CLIENT_OK)
ret = hos_write(fd, buf, buffer.st_size);
if (ret == HOS_CLIENT_OK)
{
printf("debug: hos_write successed 2nd.\n");
}
else if (ret == HOS_IN_CACHE)
{
printf("debug: hos_write in cache.\n");
}
else
{
printf("error: hos_write failed 2nd!\n");
}
hos_close_fd(fd);
ret = hos_close_fd(fd);
if (ret == HOS_CLIENT_OK)
{
printf("debug: hos_close_fd successed.\n");
}
else if (ret == HOS_FD_CLOSE_BUT_SEND_FAILED)
{
printf("debug: hos send fialed, but close fd successed.\n");
}
else
{
printf("error: hos_close_fd failed.\n");
}
printf("hos_write buff end ...\n");
printf("hos_shutdown_instance start ...\n");

View File

@@ -7,6 +7,9 @@ link_directories(/usr/local/lib64/)
link_directories(/opt/MESA/lib/)
link_libraries(hos-client-cpp pthread)
include_directories(/opt/MESA/include)
include_directories(/opt/MESA/include/MESA)
include_directories("${CMAKE_SOURCE_DIR}/src/")
link_directories("${CMAKE_BINARY_DIR}/src")
#for ASAN
set(ASAN_OPTION "OFF" CACHE STRING " set asan type chosen by the user, using OFF as default")
@@ -27,5 +30,6 @@ endif()
# end of for ASAN
add_executable(HosClientPerformance HosClientPerformance.cpp)
add_dependencies(HosClientPerformance ${lib_name}_shared)
target_link_libraries(HosClientPerformance hos-client-cpp MESA_handle_logger)

View File

@@ -121,8 +121,6 @@ static int upload_file(char *file, char *buff, int buff_len, thread_info_t *thre
long record[1000] = {0};
double variance = 0.00;
double average = 0.00;
long time = 0;
int err = 0;
//写文件
//clock_gettime(CLOCK_MONOTONIC, &tstart);
@@ -155,14 +153,14 @@ static int upload_file(char *file, char *buff, int buff_len, thread_info_t *thre
for (i = 0; i < g_test_count; i++)
{
clock_gettime(CLOCK_MONOTONIC, &tstart);
err = hos_open_fd(thread_info->bucket, thread_info->object, callback, NULL, thread_info->thread_num, &fd[i]);
hos_open_fd(thread_info->bucket, thread_info->object, callback, NULL, thread_info->thread_num, &fd[i]);
if (hos_write(fd[i], file, 0) != HOS_CLIENT_OK)
{
printf("error:hos_write file:%s\n", file);
return -1;
}
clock_gettime(CLOCK_MONOTONIC, &tend);
time = calc_time(tstart, tend);
long time = calc_time(tstart, tend);
time_upload += time;
record[i] += time;
}
@@ -189,7 +187,7 @@ static int upload_file(char *file, char *buff, int buff_len, thread_info_t *thre
{
sprintf(file_size, "%dB", buff_len);
}
sprintf(&performance_info[len], "%-20lu%-20s%-20ld%-20ld%-20lf%-20lf\n",
sprintf(&performance_info[len], "%-20zu%-20s%-20ld%-20ld%-20lf%-20lf\n",
thread_info->thread_num, file_size, time_write, time_upload, average, sqrt(variance));
return 0;
@@ -198,33 +196,27 @@ static int upload_file(char *file, char *buff, int buff_len, thread_info_t *thre
static int upload_buff(char * buff, int buff_len, thread_info_t *thread_info, char *performance_info)
{
size_t i = 0;
int j = 0;
size_t fd[1000] = {0};
size_t tmp = 0;
size_t rest = 0;
struct timespec tstart, ttmp;
size_t len;
char file_size[128];
char append_size[128];
size_t success_cnt = 0;
int ret = 0;
double variance = 0.00;
double average = 0.00;
long record[30000] = {0};
int err = 0;
if (g_mode & APPEND_MODE)
{
err = hos_open_fd(thread_info->bucket, thread_info->object, callback, NULL, thread_info->thread_num, &fd[0]);
hos_open_fd(thread_info->bucket, thread_info->object, callback, NULL, thread_info->thread_num, &fd[0]);
for (i = 0; i < g_test_count; i++)
{
clock_gettime(CLOCK_MONOTONIC, &tstart);
j = 0;
int j = 0;
while (1)
{
tmp = j * g_append_size;
rest = buff_len - tmp;
size_t tmp = j * g_append_size;
size_t rest = buff_len - tmp;
if (rest <= g_append_size)
{
hos_write(fd[0], &buff[tmp], rest);
@@ -259,15 +251,16 @@ static int upload_buff(char * buff, int buff_len, thread_info_t *thread_info, ch
}
sprintf(append_size, "%gK", (double)g_append_size / 1024);
len = strlen(performance_info);
sprintf(&performance_info[len], "%-20lu%-20s%-20s%-20lu%-20lf%-20lf\n",
sprintf(&performance_info[len], "%-20zu%-20s%-20s%-20zu%-20lf%-20lf\n",
thread_info->thread_num, file_size, append_size, g_test_count, average, sqrt(variance));
}
else
{
size_t success_cnt = 0;
for (i = 0; i < g_test_count; i++)
{
clock_gettime(CLOCK_MONOTONIC, &tstart);
ret = hos_upload_buf(thread_info->bucket, thread_info->object, buff, buff_len, callback, NULL, thread_info->thread_num);
int ret = hos_upload_buf(thread_info->bucket, thread_info->object, buff, buff_len, callback, NULL, thread_info->thread_num);
if (ret == HOS_CLIENT_OK)
{
success_cnt++;
@@ -306,7 +299,7 @@ static int upload_buff(char * buff, int buff_len, thread_info_t *thread_info, ch
}
sprintf(append_size, "%gK", (double)g_append_size / 1024);
len = strlen(performance_info);
sprintf(&performance_info[len], "%-20lu%-20s%-20d%-20lu%-20lf%-20lf\n",
sprintf(&performance_info[len], "%-20zu%-20s%-20d%-20zu%-20lf%-20lf\n",
thread_info->thread_num, file_size, 0, g_test_count, average, sqrt(variance));
}
@@ -327,7 +320,6 @@ static void *put_object_thread(void *ptr)
thread_info_t *thread_info = (thread_info_t *)ptr;
char file[128];
size_t buff_len;
int ret;
int i;
char *buff = NULL;
@@ -351,7 +343,7 @@ static void *put_object_thread(void *ptr)
{
if (g_file_name[i][0] == '\0')
break;
ret = file_to_buffer(g_file_name[i], buff, &buff_len);
int ret = file_to_buffer(g_file_name[i], buff, &buff_len);
if (ret == -1)
{
free(buff);
@@ -364,7 +356,7 @@ static void *put_object_thread(void *ptr)
}
else
{
sprintf(file, "./file/file_%lu_%d", thread_info->thread_num, i);
sprintf(file, "./file/file_%zu_%d", thread_info->thread_num, i);
upload_file(file, buff, buff_len, thread_info, performance_info);
}
}
@@ -509,7 +501,7 @@ int main(int argc, char *argv[])
for ( thread_num = 0; thread_num < thread_sum; thread_num++ )
{
thread_info[thread_num].thread_num = thread_num;
sprintf(thread_info[thread_num].object, "%s-%lu", object, thread_num);
sprintf(thread_info[thread_num].object, "%s-%zu", object, thread_num);
sprintf(thread_info[thread_num].bucket, "%s", bucket);
if(pthread_create(&thread[thread_num], NULL, put_object_thread, (void *)&thread_info[thread_num]))

View File

@@ -4,21 +4,22 @@ aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SRCS)
include_directories("/opt/MESA/include")
include_directories("/opt/MESA/include/MESA")
include_directories("${CMAKE_BINARY_DIR}/support/GoogleTest/include/")
link_directories("/opt/MESA/lib")
link_directories("${CMAKE_BINARY_DIR}/support/GoogleTest/include/")
link_directories("${CMAKE_BINARY_DIR}/src/")
link_directories("${CMAKE_BINARY_DIR}/support/GoogleTest/lib/")
#link_libraries(hos-client-cpp gtest gtest_main pthread)
# coverage
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fprofile-arcs -ftest-coverage")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fprofile-arcs -ftest-coverage")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fprofile-arcs -ftest-coverage")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fprofile-arcs -ftest-coverage -fPIE")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fprofile-arcs -ftest-coverage -fPIE")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fprofile-arcs -ftest-coverage -fPIE")
add_definitions(-g -W -Wall -std=c++11)
add_definitions(-g -W -Wall -std=c++11 -fPIE)
#add_executable(gtest_hos_client gtest_hos_init_instance.cpp gtest_hos_get_instance.cpp gtest_hos_close_fd.cpp gtest_hos_open_fd.cpp)
#add_executable(gtest_hos_client CheckHosClient.cpp gtest_hos_init_instance.cpp)
add_executable(gtest_hos_client ${SRCS})
add_executable(gtest_hos_client gtest_hos_write.cpp CheckHosClient.cpp)
# add_executable(gtest_hos_client ${SRCS})
add_dependencies(gtest_hos_client ${lib_name}_shared gtest)
target_link_libraries(gtest_hos_client hos-client-cpp gtest gtest_main pthread)

View File

@@ -1,7 +1,7 @@
#ifndef __CHECKHOS_CLIENT_H__
#define __CHECKHOS_CLIENT_H__
#include <aws/external/gtest.h>
#include "gtest/gtest.h"
#include "../src/hos_client.h"
#include "../src/hos_common.h"

View File

@@ -68,26 +68,6 @@ hos_fs2_path="./log/hos_fs2_log"
hos_fs2_format=0
#default
[hos_default_conf]
hos_log_path="./hoslog"
#default
hos_log_level=30
#default
hos_poolsize=10
#default
hos_cache_size=102400
#default
hos_cache_count=10
#default
hos_fd_live_time_ms=1000
#default
hos_fs2_serverip=127.0.0.1
hos_fs2_serverport=10086
hos_fs2_path="./log/hos_fs2_log"
#default
hos_fs2_format=0
#default
[hos_error_server_conf]
hos_serverip=192.168.40.146
hos_serverport=9098
@@ -111,3 +91,27 @@ hos_fs2_path="./log/hos_fs2_log"
#default
hos_fs2_format=0
#default
[hos_cache_size_zero_conf]
hos_serverip=127.0.0.1
hos_serverport=9098
hos_accesskeyid="default"
hos_secretkey="default"
hos_log_path="./hoslog"
#default
hos_log_level=30
#default
hos_poolsize=10
#default
hos_cache_size=0
#default
hos_cache_count=10
#default
hos_fd_live_time_ms=1000
#default
hos_fs2_serverip=127.0.0.1
hos_fs2_serverport=10086
hos_fs2_path="./log/hos_fs2_log"
#default
hos_fs2_format=0
#defaul

View File

@@ -101,7 +101,6 @@ TEST(hos_close_fd, normal)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
free(data_info->cache);
free(data_info->rx_bytes);
@@ -147,7 +146,6 @@ TEST(hos_close_fd, paramer_error)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
free(data_info->cache);
free(data_info->rx_bytes);
@@ -191,7 +189,6 @@ TEST(hos_close_fd, fd_not_exits)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
free(data_info->cache);
free(data_info->rx_bytes);

View File

@@ -80,7 +80,6 @@ TEST(hos_get_instance, normal)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
free(data_info->cache);
free(data_info->rx_bytes);

View File

@@ -71,7 +71,6 @@ TEST(hos_init_instance, normal)
memset(&expect_hos_instance, 0, sizeof(expect_hos_instance));
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
free(data_info->cache);
free(data_info->rx_bytes);
@@ -133,7 +132,6 @@ TEST(hos_init_instance, server_conn_failed)
memset(&expect_hos_instance, 0, sizeof(expect_hos_instance));
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
free(data_info->cache);
free(data_info->rx_bytes);

View File

@@ -112,7 +112,6 @@ TEST(hos_open_fd, normal)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
free(data_info->cache);
free(data_info->rx_bytes);
@@ -148,7 +147,6 @@ TEST(hos_open_fd, paramer_error)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
free(data_info->cache);
free(data_info->rx_bytes);
@@ -184,7 +182,6 @@ TEST(hos_open_fd, over_threadnums)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
free(data_info->cache);
free(data_info->rx_bytes);

View File

@@ -72,7 +72,6 @@ TEST(hos_shutdown_instance, normal)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
free(data_info->cache);
free(data_info->rx_bytes);
@@ -108,7 +107,6 @@ TEST(hos_shutdown_instance, shutdown_more)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
free(data_info->cache);
free(data_info->rx_bytes);

View File

@@ -99,7 +99,6 @@ TEST(hos_upload_buff, normal)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
free(data_info->cache);
free(data_info->rx_bytes);
free(data_info->rx_pkts);
@@ -139,7 +138,6 @@ TEST(hos_upload_buff, bucket_not_exits)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
free(data_info->cache);
free(data_info->rx_bytes);
free(data_info->rx_pkts);
@@ -175,7 +173,6 @@ TEST(hos_upload_buff, param_error)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
free(data_info->cache);
free(data_info->rx_bytes);
free(data_info->rx_pkts);

View File

@@ -101,7 +101,6 @@ TEST(hos_upload_file, normal)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
free(data_info->cache);
free(data_info->rx_bytes);
free(data_info->rx_pkts);
@@ -137,7 +136,6 @@ TEST(hos_upload_file, param_error)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
free(data_info->cache);
free(data_info->rx_bytes);
free(data_info->rx_pkts);
@@ -173,7 +171,6 @@ TEST(hos_upload_file, file_not_exits)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
free(data_info->cache);
free(data_info->rx_bytes);
free(data_info->rx_pkts);
@@ -221,7 +218,6 @@ TEST(hos_upload_file, bucket_not_exits)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
free(data_info->cache);
free(data_info->rx_bytes);
free(data_info->rx_pkts);

View File

@@ -202,7 +202,6 @@ TEST(hos_write, normal)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
free(data_info->cache);
free(data_info->rx_bytes);
free(data_info->rx_pkts);
@@ -304,7 +303,6 @@ TEST(hos_write, bucket_not_exits)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
free(data_info->cache);
free(data_info->rx_bytes);
free(data_info->rx_pkts);
@@ -401,7 +399,6 @@ TEST(hos_write, sync_mode)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
free(data_info->cache);
free(data_info->rx_bytes);
free(data_info->rx_pkts);
@@ -481,14 +478,14 @@ TEST(hos_write, sync_mode_bucket_not_exits)
ASSERT_TRUE(((hos_fd_context_t *)fd1)->cache != NULL);
ret = hos_close_fd(fd);
ASSERT_EQ(ret, HOS_CLIENT_OK);
ASSERT_EQ(ret, HOS_FD_CLOSE_BUT_SEND_FAILED);
data_info->tx_failed_bytes[0] += data_info->cache[0];
data_info->tx_failed_pkts[0] += 1;
data_info->cache[0] = 0;
CheckHosInstance(hos_instance, &expect_hos_instance);
CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle);
ret = hos_close_fd(fd1);
ASSERT_EQ(ret, HOS_CLIENT_OK);
ASSERT_EQ(ret, HOS_FD_CLOSE_BUT_SEND_FAILED);
data_info->tx_failed_bytes[1] += data_info->cache[1];
data_info->tx_failed_pkts[1] += 1;
data_info->cache[1] = 0;
@@ -501,7 +498,6 @@ TEST(hos_write, sync_mode_bucket_not_exits)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
free(data_info->cache);
free(data_info->rx_bytes);
free(data_info->rx_pkts);
@@ -546,7 +542,6 @@ TEST(hos_write, paramer_error)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
free(data_info->cache);
free(data_info->rx_bytes);
@@ -584,7 +579,6 @@ TEST(hos_write, fd_not_find)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
free(data_info->cache);
free(data_info->rx_bytes);
@@ -630,7 +624,6 @@ TEST(hos_write, over_threadnums)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
data_info_t *data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
free(data_info->cache);
free(data_info->rx_bytes);
@@ -649,6 +642,65 @@ TEST(hos_write, not_init_instance)
ASSERT_EQ(ret, HOS_INSTANCE_NOT_ENABLE);
}
TEST(hos_write, cache_size_zero)
{
hos_instance_s expect_hos_instance;
hos_client_handle_t expect_hos_handle;
hos_fd_context_t expect_fd_info;
int thread_num = 1;
data_info_t *data_info = NULL;
hos_instance hos_instance = hos_init_instance(HOS_CONF, "hos_cache_size_zero_conf", thread_num);
gtest_hos_instance_init(&expect_hos_instance);
CheckHosInstance(hos_instance, &expect_hos_instance);
gtest_hos_handle_init(&expect_hos_handle, thread_num);
data_info = (data_info_t *)expect_hos_handle.hos_func.fs2_info.reserved;
expect_hos_handle.hos_config.cache_size = 0;
CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle);
size_t fd = 0;
hos_open_fd(HOS_BUCKET, "object_buff", hos_write_append_cb, (void *)"object_buff", 0, &fd);
CheckHosInstance(hos_instance, &expect_hos_instance);
CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle);
gtest_hos_fd_init(&expect_fd_info);
expect_fd_info.object = (char *)"object_buff";
expect_fd_info.cache_rest = 0;
expect_fd_info.callback = (void *)hos_write_append_cb;
expect_fd_info.userdata = (void *)"object_buff";
CheckStructGHosFdContext((hos_fd_context_t *)fd, &expect_fd_info);
int ret = hos_write(fd, HOS_BUFF, strlen(HOS_BUFF));
ASSERT_EQ(ret, HOS_CLIENT_OK);
data_info->rx_bytes[0] += strlen(HOS_BUFF);
data_info->rx_pkts[0] += 1;
CheckHosInstance(hos_instance, &expect_hos_instance);
//CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle);
ret = hos_close_fd(fd);
ASSERT_EQ(ret, HOS_CLIENT_OK);
data_info->tx_bytes[0] += strlen(HOS_BUFF);
data_info->tx_pkts[0] += 1;
data_info->cache[0] = 0;
CheckHosInstance(hos_instance, &expect_hos_instance);
CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle);
ret = hos_shutdown_instance();
ASSERT_EQ(ret, HOS_CLIENT_OK);
expect_hos_instance.status = 0;
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
free(data_info->cache);
free(data_info->rx_bytes);
free(data_info->rx_pkts);
free(data_info->tx_bytes);
free(data_info->tx_pkts);
free(data_info->tx_failed_bytes);
free(data_info->tx_failed_pkts);
memset(&expect_hos_handle, 0, sizeof(hos_client_handle_s));
CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle);
}
static void read_file(char *path, char **content, int *len)
{
FILE *fp;
@@ -699,7 +751,7 @@ static void *hos_function(void *ptr)
{
snprintf(object[i], 1024, "object_%zu_%d", thread_id, i);
int err = hos_open_fd(HOS_BUCKET, object[i], hos_callback, object[i], 0, &fd[i]);
EXPECT_EQ(err, i + 1);
EXPECT_EQ(err, HOS_CLIENT_OK);
CheckHosInstance(hos_instance, &expect_hos_instance);
CheckStructGHosHandle(&g_hos_handle, &expect_hos_handle);
gtest_hos_fd_init(&expect_fd_info[thread_id][i]);
@@ -746,7 +798,6 @@ static void *hos_function(void *ptr)
expect_hos_instance.hos_url_prefix = NULL;
CheckHosInstance(hos_instance, &expect_hos_instance);
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
free(data_info->cache);
free(data_info->rx_bytes);
free(data_info->rx_pkts);
@@ -769,4 +820,4 @@ TEST(hos_write, mutil_thread)
{
pthread_create(&thread_num[i], NULL, hos_function, (void *)i);
}
}
}

View File

@@ -5,8 +5,8 @@ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -shared -fPIC -std=c++11")
include_directories(${CMAKE_INSTALL_PREFIX}/include/MESA)
link_directories(${CMAKE_INSTALL_PREFIX}/lib)
option(HOS_MOCK "If enabled, the SDK will be built using a MOCK .cpp file for S3." OFF)
option(HOS_MESA_LOG "If enabled, the SDK will be built using a MOCK .cpp file for S3." ON)
option(HOS_MOCK "If enabled, the SDK will be built using a MOCK .cpp file for HOS." OFF)
option(HOS_MESA_LOG "If enabled, the SDK will be built using a MESA_LOG .cpp file for HOS." ON)
file(GLOB HOS_HEADERS "*.h")
file(GLOB HOS_SOURCE "*.cpp")
@@ -35,7 +35,6 @@ target_link_libraries(${lib_name}_shared
libaws-c-event-stream.a
libaws-cpp-sdk-core.a
libaws-cpp-sdk-s3.a
libtesting-resources.a
"-Wl,--no-whole-archive"
libcurl.so
libpthread.so

View File

@@ -14,10 +14,6 @@ extern "C"
#include <aws/s3/model/CreateBucketRequest.h>
#include <fstream>
#include <iostream>
#include <aws/external/gtest.h>
#include <aws/testing/platform/PlatformTesting.h>
#include <aws/testing/TestingEnvironment.h>
#include <aws/testing/MemoryTesting.h>
#ifdef HOS_MOCK
#include "mock/hos_mock.h"
#endif
@@ -45,12 +41,12 @@ extern "C"
struct hos_instance_s g_hos_instance;
hos_client_handle_t g_hos_handle;//一个进程只允许有一个g_hos_handle
static std::mutex m_client_lock;
static std::mutex m_instance_lock;
static std::mutex m_delete_lock;
static Aws::SDKOptions g_options;
Aws::Auth::AWSCredentials g_credentials;
Aws::Client::ClientConfiguration *g_client_config;
static std::mutex m_hos_client_lock;
static std::mutex m_hos_instance_lock;
static std::mutex m_hos_delete_lock;
static Aws::SDKOptions g_hos_options;
Aws::Auth::AWSCredentials g_hos_credentials;
Aws::Client::ClientConfiguration *g_hos_client_config;
static int hos_delete_fd(size_t fd, size_t thread_id)
{
@@ -152,7 +148,7 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
if (a_fd_context->mode & APPEND_MODE)
{
std::lock_guard<std::mutex> locker(m_delete_lock);
std::lock_guard<std::mutex> locker(m_hos_delete_lock);
//APPEND MODE 保留fd
atomic_add(&(a_fd_context->recive_cnt), 1);
if (a_fd_context->fd_status == HOS_FD_CANCEL)
@@ -177,6 +173,8 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
}
atomic_sub(&g_hos_handle.task_num[thread_id], 1);
atomic_sub(&g_hos_handle.task_context[thread_id], stream_len);
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
"thread_id:%zu, task_num:%zu, task_content:%zu", thread_id, g_hos_handle.task_num[thread_id], g_hos_handle.task_context[thread_id]);
}
static int hos_attempt_connection()
@@ -421,22 +419,22 @@ static void hos_client_create()
{
hos_config_t *hos_conf = &g_hos_handle.hos_config;
Aws::InitAPI(g_options);
g_client_config = new Aws::Client::ClientConfiguration();
g_credentials.SetAWSAccessKeyId(hos_conf->accesskeyid);
g_credentials.SetAWSSecretKey(hos_conf->secretkey);
Aws::InitAPI(g_hos_options);
g_hos_client_config = new Aws::Client::ClientConfiguration();
g_hos_credentials.SetAWSAccessKeyId(hos_conf->accesskeyid);
g_hos_credentials.SetAWSSecretKey(hos_conf->secretkey);
//初始化
char endpoint[128];
snprintf(endpoint, 128, "http://%s:%u/hos/", hos_conf->ip, hos_conf->port);
g_client_config->endpointOverride.append(endpoint);
g_client_config->verifySSL = false;
g_client_config->enableEndpointDiscovery = true;
g_hos_client_config->endpointOverride.append(endpoint);
g_hos_client_config->verifySSL = false;
g_hos_client_config->enableEndpointDiscovery = true;
if (hos_conf->pool_thread_size > 0)
{
//异步模式
//config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY)); //支持线程池
g_client_config->executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::QUEUE_TASKS_EVENLY_ACCROSS_THREADS)); //支持线程池
g_hos_client_config->executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(hos_conf->pool_thread_size, Aws::Utils::Threading::OverflowPolicy::QUEUE_TASKS_EVENLY_ACCROSS_THREADS)); //支持线程池
}
else
{
@@ -444,12 +442,12 @@ static void hos_client_create()
}
#ifndef HOS_MOCK
g_hos_handle.S3Client = new Aws::S3::S3Client(g_credentials, *g_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
g_hos_handle.S3Client = new Aws::S3::S3Client(g_hos_credentials, *g_hos_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
#else
g_hos_handle.S3Client = new Aws::S3::S3ClientMock(g_credentials, *g_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
g_hos_handle.S3Client = new Aws::S3::S3ClientMock(g_hos_credentials, *g_hos_client_config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
#endif
g_hos_instance.hos_url_prefix = g_client_config->endpointOverride.c_str();
g_hos_instance.hos_url_prefix = g_hos_client_config->endpointOverride.c_str();
//hos 检测服务端是否可以连接上
int ret = hos_attempt_connection();
if (ret != HOS_CLIENT_OK && ret != NETWORK_CONNECTION)
@@ -558,7 +556,9 @@ static int hos_putobject_async(Aws::S3::Model::PutObjectRequest& request, size_t
atomic_add(&g_hos_handle.task_context[thread_id], stream_len);
//不算真正成功需要等到PutObjectAsyncFinished的结果
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__,
"debug: [%s/%s/%s] PutObjectAsync success.", g_hos_instance.hos_url_prefix, bucket, object);
"debug: [%s/%s/%s] PutObjectAsync success. thread:%zu, task_num:%zu, task_content:%zu",
g_hos_instance.hos_url_prefix, bucket, object,
thread_id, g_hos_handle.task_num[thread_id], g_hos_handle.task_context[thread_id]);
return HOS_CLIENT_OK;
}
@@ -653,7 +653,7 @@ const char *hos_get_upload_endpoint()
hos_instance hos_init_instance(const char *conf_path, const char *module, size_t thread_num)
{
std::lock_guard<std::mutex> locker(m_client_lock);
std::lock_guard<std::mutex> locker(m_hos_client_lock);
hos_config_t *hos_conf = &g_hos_handle.hos_config;
memset(&g_hos_handle, 0, sizeof(g_hos_handle));
@@ -778,7 +778,7 @@ static int hos_upload_stream(const char *bucket, const char *object, const char
return HOS_PARAMETER_ERROR;
}
mode = data?1:0; // 0, file mode; 1 buf mode
mode = data?FILE_MODE:BUFF_MODE; // 0, file mode; 1 buf mode
// Create and configure the asynchronous put object request.
Aws::S3::Model::PutObjectRequest request;
@@ -830,6 +830,7 @@ static int hos_upload_stream(const char *bucket, const char *object, const char
else
{
ret = hos_putobject_sync(request, data_len, thread_id, &hos_fd);
hos_delete_fd((size_t)hos_fd, thread_id);
}
return ret;
@@ -974,18 +975,24 @@ int hos_write(size_t fd, const char *stream, size_t stream_len)
}
Aws::String buffer(stream, stream_len);
*a_fd_context->cache << buffer;
a_fd_context->cache_rest -= stream_len;
if (data_info != NULL)
data_info->cache[thread_id] += stream_len;
if (a_fd_context->cache_count == 0 || --a_fd_context->cache_count)
{
//cache_count == 0,不设置cache_count的情况
//cache_count > 0,设置cache_count的情况
if (a_fd_context->cache_rest > 0)
data_info->cache[thread_id] += stream_len;
}
if (a_fd_context->cache_rest > stream_len)
{
a_fd_context->cache_rest -= stream_len;
if (a_fd_context->cache_count > 0 && --a_fd_context->cache_count)
{
return HOS_IN_CACHE;
}
}
else
{
a_fd_context->cache_rest = 0;
}
request.SetBody(a_fd_context->cache);
// add headers
@@ -1015,7 +1022,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len)
//恢复fd 的cache设置
if (a_fd_context->mode & APPEND_MODE)
{
if (data_info)
if (data_info && data_info->cache[thread_id])
data_info->cache[thread_id] -= upload_len;
a_fd_context->cache.reset();
a_fd_context->cache = NULL;
@@ -1106,7 +1113,7 @@ int hos_close_fd(size_t fd)
{
//异步APPEND 模式,判断是否可以释放
//异步其他模式在PutObjectAsyncFinished出释放fd
std::lock_guard<std::mutex> locker(m_delete_lock);
std::lock_guard<std::mutex> locker(m_hos_delete_lock);
a_fd_context->fd_status = HOS_FD_CANCEL;
if (a_fd_context->mode == (BUFF_MODE | APPEND_MODE) && a_fd_context->position == a_fd_context->recive_cnt)
{
@@ -1129,7 +1136,7 @@ int hos_close_fd(size_t fd)
int hos_shutdown_instance()
{
std::lock_guard<std::mutex> locker(m_instance_lock);
std::lock_guard<std::mutex> locker(m_hos_instance_lock);
hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
if (atomic_read(&g_hos_instance.status) == INSTANCE_UNINIT_STATE)
@@ -1167,8 +1174,6 @@ int hos_shutdown_instance()
usleep(500 * 1000);
}
Aws::Vector<Aws::S3::Model::Bucket>().swap(g_hos_handle.buckets);
if (hos_func->fd_thread)
{
hos_func->fd_thread_status = 1;
@@ -1234,7 +1239,7 @@ int hos_shutdown_instance()
}
MESA_HANDLE_RUNTIME_LOG(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "debug: [%s] delete s3client.", g_hos_instance.hos_url_prefix);
Aws::ShutdownAPI(g_options);
Aws::ShutdownAPI(g_hos_options);
MESA_destroy_runtime_log_handle(g_hos_handle.log);
g_hos_handle.log = NULL;
memset(&g_hos_handle, 0 , sizeof(g_hos_handle));

View File

@@ -131,7 +131,7 @@ typedef struct hos_fd_context_s
size_t cache_count;
size_t position;
size_t recive_cnt;
long cache_rest;
size_t cache_rest;
int fd_status;
#define HOS_FD_REGISTER 0
#define HOS_FD_CANCEL 1

View File

@@ -2,8 +2,8 @@ include(ExternalProject)
set(AWSS3_ROOT ${CMAKE_CURRENT_BINARY_DIR})
set(AWSS3_URL ${CMAKE_CURRENT_SOURCE_DIR}/aws-sdk-cpp-master.zip)
set(AWSS3_URL_MD5 a38984c137d7768dec2a7cde02874c32)
set(AWSS3_CONFIGURE cd ${AWSS3_ROOT}/aws-sdk-cpp-master/src/aws-sdk-cpp-master && cmake . -DBUILD_ONLY=s3)
set(AWSS3_URL_MD5 fe191c53c566e3ec58c69d016e0a18f7)
set(AWSS3_CONFIGURE cd ${AWSS3_ROOT}/aws-sdk-cpp-master/src/aws-sdk-cpp-master && cmake . -DBUILD_ONLY=s3 -DENABLE_TESTING=OFF)
set(AWSS3_MAKE cd ${AWSS3_ROOT}/aws-sdk-cpp-master/src/aws-sdk-cpp-master && make)
set(AWSS3_INSTALL cd ${AWSS3_ROOT}/aws-sdk-cpp-master/src/aws-sdk-cpp-master && make install PREFIX=${SUPPORT_INSTALL_PREFIX})

Binary file not shown.