Merge branch 'develop-10.19-fs2'

pengxuanzheng
2020-11-11 16:01:04 +08:00
10 changed files with 266 additions and 51 deletions

View File

@@ -14,8 +14,8 @@ set(CPACK_PACKAGING_INSTALL_PREFIX ${CMAKE_INSTALL_PREFIX})
set(CPACK_PACKAGE_VERSION "${VERSION_MAJOR}.${VERSION_MINOR}.${VERSION_PATCH}.${VERSION_BUILD}")
execute_process(COMMAND bash -c "echo -ne \"`uname -r | awk -F'.' '{print $5\".\"$6\".\"$7}'`\"" OUTPUT_VARIABLE SYSTEM_VERSION)
execute_process(COMMAND sh changelog.sh WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/cmake)
SET(CPACK_RPM_CHANGELOG_FILE ${PROJECT_SOURCE_DIR}/cmake/changelog.txt)
#execute_process(COMMAND sh changelog.sh WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/cmake)
#SET(CPACK_RPM_CHANGELOG_FILE ${PROJECT_SOURCE_DIR}/cmake/changelog.txt)
# RPM Build
set(CPACK_GENERATOR "RPM")

View File

@@ -50,7 +50,7 @@ int file_to_buffer(const char *file, char *buffer, size_t *len)
return 0;
}
void callback(bool result, const char *error, void *userdata)
void callback(bool result, const char *error, const char *bucket, const char *object, void *userdata)
{
userdata_t *data = (userdata_t *)userdata;
clock_gettime(CLOCK_MONOTONIC, data->finished);

View File

@@ -50,7 +50,7 @@ int file_to_buffer(const char *file, char *buffer, size_t *len)
return 0;
}
void callback(bool result, const char *error, void *userdata)
void callback(bool result, const char *error, const char *bucket, const char *object, void *userdata)
{
userdata_t *data = (userdata_t *)userdata;
clock_gettime(CLOCK_MONOTONIC, data->finished);
@@ -80,7 +80,7 @@ int main(int argc, char *argv[])
hos_init_api();
debuginfo("hos_client_init start ...\n");
hos_client_handle handle = hos_client_create("http://192.168.40.223:9098/hos/", "default", "default", 4000);
hos_client_handle handle = hos_client_create("http://192.168.40.223:9098/hos/", "default", "default", 400);
//hos_client_handle handle = hos_client_create("http://192.168.32.10:9098/hos/", "default", "default", 4);
if (handle == NULL)
{

View File

@@ -8,6 +8,24 @@ link_directories(/opt/MESA/lib/)
link_libraries(hos-client-cpp pthread)
include_directories(/opt/MESA/include)
#for ASAN
set(ASAN_OPTION "OFF" CACHE STRING " set asan type chosen by the user, using OFF as default")
set_property(CACHE ASAN_OPTION PROPERTY STRINGS OFF ADDRESS THREAD)
message(STATUS "ASAN_OPTION='${ASAN_OPTION}'")
if(ASAN_OPTION MATCHES "ADDRESS")
set(CMAKE_C_FLAGS "${CMAKADDRESS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=address -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=address -fno-omit-frame-pointer")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lasan")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -lasan")
elseif(ASAN_OPTION MATCHES "THREAD")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=thread -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=thread -fno-omit-frame-pointer")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -ltsan")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -ltsan")
endif()
# end of for ASAN
add_executable(HosClientPerformance HosClientPerformance.cpp)
target_link_libraries(HosClientPerformance hos-client-cpp)

View File

@@ -102,7 +102,7 @@ int read_file_list(const char *path, char file_name[][256])
return 0;
}
static void callback(bool result, const char *error, void *userdata)
static void callback(bool result, const char *error, const char *bucket, const char *object, void *userdata)
{
#if 0
userdata_t *data = (userdata_t *)userdata;
@@ -541,6 +541,7 @@ int main(int argc, char *argv[])
hos_set_thread_sum(handle, conf.thread_sum);
hos_set_cache_size(handle, conf.append_size);
hos_set_cache_times(handle, 0);
//create the bucket
if (hos_create_bucket(handle, conf.bucket))

View File

@@ -12,6 +12,24 @@ add_library(${lib_name}_shared SHARED hos_client.cpp hos_hash.cpp)
target_link_libraries(${lib_name}_shared libaws-cpp-sdk-s3.so libaws-cpp-sdk-core.so libMESA_field_stat2.so libcurl.so)
set_target_properties(${lib_name}_shared PROPERTIES OUTPUT_NAME ${lib_name})
#for ASAN
set(ASAN_OPTION "OFF" CACHE STRING " set asan type chosen by the user, using OFF as default")
set_property(CACHE ASAN_OPTION PROPERTY STRINGS OFF ADDRESS THREAD)
message(STATUS "ASAN_OPTION='${ASAN_OPTION}'")
if(ASAN_OPTION MATCHES "ADDRESS")
set(CMAKE_C_FLAGS "${CMAKADDRESS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=address -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=address -fno-omit-frame-pointer")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lasan")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -lasan")
elseif(ASAN_OPTION MATCHES "THREAD")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=thread -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -DCMAKE_BUILD_TYPE=Debug -fsanitize=thread -fno-omit-frame-pointer")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -ltsan")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -ltsan")
endif()
# end of for ASAN
add_dependencies(${lib_name}_shared aws-sdk-cpp-master-static)
#install(TARGETS ${lib_name}_shared LIBRARY DESTINATION /opt/MESA/lib)

View File

@@ -30,10 +30,13 @@ typedef struct hos_client_handle_s
Aws::S3::S3Client *S3Client;
Aws::SDKOptions options;
Aws::Vector<Aws::S3::Model::Bucket> buckets;
pthread_t fd_thread;
int fd_thread_status;
/* options */
size_t cache_size;
size_t cache_times;
size_t thread_sum;
size_t timeout;
/* expand */
screen_stat_handle_t fs2_handle;
pthread_t fs2_thread;
@@ -52,10 +55,18 @@ typedef struct hos_client_handle_s
int *rx_bytes_last;
}hos_client_handle_t;
hos_client_handle hos_handle; //only one hos_handle is allowed per process
hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM];
size_t fd_info[MAX_HOS_CLIENT_THREAD_NUM][MAX_HOS_CLIENT_FD_NUM];
Aws::SDKOptions options;
static inline size_t get_current_ms()
{
struct timespec timenow;
clock_gettime(CLOCK_MONOTONIC, &timenow);
return (timenow.tv_sec * 1000 + timenow.tv_nsec / 1000 / 1000 );
}
static size_t hash_get_min_free_fd(size_t thread_id)
{
size_t i = 0;
@@ -67,6 +78,18 @@ static size_t hash_get_min_free_fd(size_t thread_id)
return 0;
}
static int hos_delete_fd(size_t fd, size_t thread_id)
{
if (fd == 0)
{
return HOS_PARAMETER_ERROR;
}
delete_info_by_fd(&hash_hos_info[thread_id], fd);
fd_info[thread_id][fd] = 0;
return HOS_CLIENT_OK;
}
static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
const Aws::S3::Model::PutObjectRequest& request,
const Aws::S3::Model::PutObjectOutcome& outcome,
@@ -91,14 +114,23 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client,
return ;
}
put_finished_callback callback = (put_finished_callback)hos_info->callback;
callback(result, error, hos_info->userdata);
callback(result, error, hos_info->bucket, hos_info->object, hos_info->userdata);
if (hos_info->mode & APPEND_MODE)
{
//APPEND MODE: keep the fd
hos_info->recive_cnt++;
#if 0
if (hos_info->fd_status == HOS_FD_INJECT)
{
if (hos_info->recive_cnt == hos_info->position)
hos_delete_fd(fd, thread_id);
}
#endif
}else
{
//full upload: release the fd
hos_close_fd(fd, thread_id);
//hos_delete_fd(fd, thread_id);
hos_info->fd_status = HOS_FD_INJECT;
}
}
@@ -139,9 +171,12 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi
return NULL;
}
hos_client_handle handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t));
memset(handle, 0, sizeof(hos_client_handle_t));
if (hos_handle)
{
return hos_handle;
}
hos_handle = (hos_client_handle)malloc(sizeof(hos_client_handle_t));
memset(hos_handle, 0, sizeof(hos_client_handle_t));
Aws::Client::ClientConfiguration config;
Aws::Auth::AWSCredentials credentials(accesskeyid, secretkey);
@@ -151,21 +186,23 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi
config.enableEndpointDiscovery = true;
config.executor = std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor>(std::make_shared<Aws::Utils::Threading::PooledThreadExecutor>(pool_size, Aws::Utils::Threading::OverflowPolicy::REJECT_IMMEDIATELY));//enable thread pool support
handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
handle->options = options;
hos_handle->S3Client = new Aws::S3::S3Client(credentials, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
hos_handle->options = options;
/* fetch all buckets owned by the current user */
Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets();
Aws::S3::Model::ListBucketsOutcome outcome = hos_handle->S3Client->ListBuckets();
if (outcome.IsSuccess())
if (!outcome.IsSuccess())
{
handle->buckets = outcome.GetResult().GetBuckets();
return NULL;
}
handle->cache_size = 0;
handle->cache_times = 1;
handle->thread_sum = 1;
hos_handle->buckets = outcome.GetResult().GetBuckets();
hos_handle->cache_size = 0;
hos_handle->cache_times = 1;
hos_handle->thread_sum = 1;
hos_handle->timeout = 1000;
return handle;
return hos_handle;
}
static void *fs2_statistics(void *ptr)
@@ -420,7 +457,7 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const
sprintf(buf, "%lu %lu", thread_id, fd);
context->SetUUID(buf);
hos_info_t info = {fd, 0, handle, bucket, object, (void *)callback, userdata, NULL, 0, 0, 0 };
hos_info_t info = {fd, 0, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 };
add_hos_info(&hash_hos_info[thread_id], &info);
fd_info[thread_id][fd] = 1;
@@ -440,6 +477,37 @@ int hos_upload_buf(hos_client_handle handle, const char *bucket, const char *obj
return hos_upload_stream(handle, bucket, object, buf, buf_len, callback, userdata, thread_id, 1);
}
static void *hos_fd_manage(void *ptr)
{
hos_info_t *hos_info;
hos_client_handle handle = (hos_client_handle)ptr;
size_t thread_sum = handle->thread_sum;
size_t thread_num;
size_t fd;
while(1)
{
if (handle->fd_thread_status)
break;
for (thread_num = 0; thread_num < thread_sum; thread_num++)
{
for(fd = 0; fd < MAX_HOS_CLIENT_FD_NUM; fd++)
{
if (!fd_info[thread_num][fd])
break;
hos_info = find_info_by_fd(hash_hos_info[thread_num], fd);
if (!hos_info)
break;
if (hos_info->fd_status == HOS_FD_REGISTER)
continue;
if ((hos_info->position == hos_info->recive_cnt) || (hos_info->overtime <= get_current_ms()))
hos_delete_fd(fd, thread_num);
}
}
usleep(1000);
}
pthread_exit(NULL);
}
int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object, put_finished_callback callback, void *userdata, size_t thread_id, int mode)
{
if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (thread_id > handle->thread_sum))
@@ -453,10 +521,16 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object
return HOS_FD_NOT_ENOUGH;
}
hos_info_t info = {fd, mode, handle, bucket, object, (void *)callback, userdata, NULL, handle->cache_times, handle->cache_size, 0, };
hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, handle->cache_times, handle->cache_size, 0, 0, HOS_FD_REGISTER, 0, handle->timeout,};
add_hos_info(&hash_hos_info[thread_id], &info);
fd_info[thread_id][fd] = 1;
#if 1
if (handle->fd_thread == 0)
{
handle->fd_thread_status = 0;
pthread_create(&handle->fd_thread, NULL, hos_fd_manage, handle);
}
#endif
return fd;
}
@@ -495,10 +569,8 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
Aws::S3::S3Client& S3Client = *(handle->S3Client);
// Create and configure the asynchronous put object request.
// create and configure the asynchronous put object request.
Aws::S3::Model::PutObjectRequest request;
request.SetBucket(hos_info->bucket);
request.SetKey(hos_info->object);
//set the upload data type
if (hos_info->mode & BUFF_MODE)
@@ -511,27 +583,53 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
{
hos_info->cache = Aws::MakeShared<Aws::StringStream>("append mode");
}
if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest))
if (hos_info->cache_times == 0)
{
// cache
Aws::String buffer (stream, stream_len);
hos_info->cache_rest -= stream_len;
if (hos_info->cache_rest > 0)
//case: cache_times not set
if (stream_len < hos_info->cache_rest)
{
return HOS_CLIENT_OK;
// cache
Aws::String buffer (stream, stream_len);
*hos_info->cache << buffer;
hos_info->cache_rest -= stream_len;
if (hos_info->cache_rest > 0)
{
return HOS_CLIENT_OK;
}
}else if (stream_len >= hos_info->cache_rest)
{
// multi handle
flag = 1;
Aws::String buffer (stream, hos_info->cache_rest);
*hos_info->cache << buffer;
rest = stream_len - hos_info->cache_rest;
}
}else if (stream_len > hos_info->cache_rest)
}else
{
// multi handle
flag = 1;
Aws::String buffer (stream, hos_info->cache_rest);
*hos_info->cache << buffer;
rest = stream_len - hos_info->cache_rest;
}else
{
//over cache_times
Aws::String buffer (stream, stream_len);
*hos_info->cache << buffer;
//case: cache_times set
if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest))
{
// cache
Aws::String buffer (stream, stream_len);
*hos_info->cache << buffer;
hos_info->cache_rest -= stream_len;
if (hos_info->cache_rest > 0)
{
return HOS_CLIENT_OK;
}
}else if (stream_len > hos_info->cache_rest)
{
// multi handle
flag = 1;
Aws::String buffer (stream, hos_info->cache_rest);
*hos_info->cache << buffer;
rest = stream_len - hos_info->cache_rest;
}else
{
//over cache_times
Aws::String buffer (stream, stream_len);
*hos_info->cache << buffer;
}
}
request.SetBody(hos_info->cache);
@@ -565,6 +663,9 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
request.SetBody(input_data);
}
request.SetBucket(hos_info->bucket);
request.SetKey(hos_info->object);
//set the completion callback
std::shared_ptr<Aws::Client::AsyncCallerContext> context =
Aws::MakeShared<Aws::Client::AsyncCallerContext>("");
@@ -612,13 +713,54 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
int hos_close_fd(size_t fd, size_t thread_id)
{
hos_info_t *hos_info = NULL;
char num[128];
char buf[128];
if (fd == 0)
{
return HOS_PARAMETER_ERROR;
}
if (fd_info[thread_id][fd])
{
hos_info = find_info_by_fd(hash_hos_info[thread_id], fd);
}
if (hos_info == NULL)
{
return HOS_CLIENT_OK;
}
delete_info_by_fd(&hash_hos_info[thread_id], fd);
fd_info[thread_id][fd] = 0;
//flush the buffered append data before closing the fd
if ((hos_info->mode & BUFF_MODE) && (hos_info->mode & APPEND_MODE))
{
if (hos_info->cache_rest != hos_info->handle->cache_size)
{
//handle = (hos_client_handle)hos_info->handle;
Aws::S3::S3Client& S3Client = *(hos_info->handle->S3Client);
// Create and configure the asynchronous put object request.
Aws::S3::Model::PutObjectRequest request;
request.SetBucket(hos_info->bucket);
request.SetKey(hos_info->object);
request.SetBody(hos_info->cache);
// add headers
snprintf(num, 128, "%lu", ++hos_info->position);
Aws::Map<Aws::String, Aws::String> headers;
headers["x-hos-upload-type"] = "append";
headers["x-hos-position"] = num;
request.SetMetadata(headers);
std::shared_ptr<Aws::Client::AsyncCallerContext> context =
Aws::MakeShared<Aws::Client::AsyncCallerContext>("");
sprintf(buf, "%lu %lu", thread_id, fd);
context->SetUUID(buf);
S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context);
}
}
hos_info->fd_status = HOS_FD_INJECT;
hos_info->cache.reset();
hos_info->overtime = get_current_ms() + hos_info->timeout;
return HOS_CLIENT_OK;
}
@@ -635,6 +777,11 @@ int hos_client_destory(hos_client_handle handle)
Aws::Vector<Aws::S3::Model::Bucket>().swap(handle->buckets);
if (handle->fd_thread)
{
handle->fd_thread_status = 1;
pthread_join(handle->fd_thread, NULL);
}
for (i = 0; i < handle->thread_sum; i++)
{
delete_all(&hash_hos_info[i]);

View File

@@ -73,7 +73,7 @@ enum s3errors
OBJECT_NOT_IN_ACTIVE_TIER
};
typedef void (*put_finished_callback)(bool, const char *, void *);
typedef void (*put_finished_callback)(bool, const char *, const char *, const char *, void *);
/*************************************************************************************
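put_finished_callback now also carries the bucket and object of the finished upload, so existing callbacks need the two extra const char * parameters. Below is a minimal sketch of the append-mode fd lifecycle against the updated API; it assumes the public header is named "hos-client.h" and that APPEND_MODE/BUFF_MODE are exported there, while the endpoint, credentials, and setter calls mirror the test programs in this commit (the bucket/object names and cache size are illustrative):

#include <cstdio>
#include <cstring>
#include "hos-client.h" // assumed public header name

// Updated signature: result, error text, bucket, object, user data.
static void on_put_finished(bool result, const char *error,
                            const char *bucket, const char *object, void *userdata)
{
    if (!result)
        fprintf(stderr, "upload of %s/%s failed: %s\n", bucket, object, error);
}

int main()
{
    hos_init_api();
    hos_client_handle handle = hos_client_create("http://192.168.40.223:9098/hos/", "default", "default", 400);
    if (handle == NULL)
        return 1;
    hos_set_thread_sum(handle, 1);
    hos_set_cache_size(handle, 4 * 1024 * 1024); // flush the append cache every 4 MiB (illustrative size)
    hos_set_cache_times(handle, 0);              // 0 = size-driven flushing only
    // Opening an append-mode fd also starts the hos_fd_manage thread, which
    // reclaims the fd once every part is acknowledged or its timeout expires.
    int fd = hos_open_fd(handle, "default", "demo-object", on_put_finished, NULL, 0, APPEND_MODE | BUFF_MODE);
    if (fd <= 0) // error handling simplified
        return 1;
    const char *chunk = "hello hos";
    hos_write((size_t)fd, chunk, strlen(chunk), 0);
    // hos_close_fd flushes any buffered append data, marks the fd HOS_FD_INJECT,
    // and leaves the actual cleanup to the manage thread.
    hos_close_fd((size_t)fd, 0);
    hos_client_destory(handle);
    return 0;
}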

View File

@@ -13,20 +13,28 @@ void add_hos_info(hos_info_t **handle, hos_info_t *input)
{
value = (hos_info_t *)malloc(sizeof(hos_info_t));
memcpy(value, input, sizeof(hos_info_t));
value->object = (char *)malloc(strlen(input->object) + 1);
value->bucket = (char *)malloc(strlen(input->bucket) + 1);
memcpy(value->bucket, input->bucket, strlen(input->bucket) + 1);
memcpy(value->object, input->object, strlen(input->object) + 1);
HASH_ADD_INT(*handle, fd, value);
}
else
{
value->mode = input->mode;
value->handle = input->handle;
value->bucket = input->bucket;
value->object = input->object;
memcpy(value->bucket, input->bucket, strlen(input->bucket) + 1);
memcpy(value->object, input->object, strlen(input->object) + 1);
value->callback = input->callback;
value->userdata = input->userdata;
value->cache = input->cache;
value->cache_times = input->cache_times;
value->cache_rest = input->cache_rest;
value->position = input->position;
value->recive_cnt = input->recive_cnt;
value->fd_status = input->fd_status;
value->overtime = input->overtime;
value->timeout = input->timeout;
}
}
@@ -40,9 +48,18 @@ hos_info_t *find_info_by_fd(hos_info_t *handle, size_t fd)
void delete_info_by_fd(hos_info_t **handle, size_t fd)
{
hos_info_t *value = NULL;
HASH_FIND_INT(*handle, &fd, value);
if (value)
{
if (value->bucket)
{
free(value->bucket);
}
if (value->object)
{
free(value->object);
}
HASH_DEL(*handle, value);
free(value);
}
@@ -53,6 +70,15 @@ void delete_all(hos_info_t **handle)
hos_info_t *current, *tmp;
HASH_ITER(hh, *handle, current, tmp)
{
if (current->bucket)
{
free(current->bucket);
}
if (current->object)
{
free(current->object);
}
HASH_DEL(*handle, current);
free(current);
}
}
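add_hos_info now deep-copies the bucket and object strings and the delete paths free them, so a hash entry no longer aliases caller-owned pointers that may be gone by the time the asynchronous PutObject callback fires. A small sketch of that ownership pattern with uthash stripped out (the entry type and helper names are illustrative, not part of the library):

#include <cstdlib>
#include <cstring>

struct entry {
    char *bucket; // owned copy, released by entry_free()
    char *object; // owned copy, released by entry_free()
};

// Copy the caller's strings so the entry stays valid after the caller's
// buffers go out of scope, e.g. while an async upload is still in flight.
static void entry_set(struct entry *e, const char *bucket, const char *object)
{
    e->bucket = (char *)malloc(strlen(bucket) + 1);
    e->object = (char *)malloc(strlen(object) + 1);
    memcpy(e->bucket, bucket, strlen(bucket) + 1);
    memcpy(e->object, object, strlen(object) + 1);
}

static void entry_free(struct entry *e)
{
    free(e->bucket);
    free(e->object);
    e->bucket = NULL;
    e->object = NULL;
}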

View File

@@ -15,15 +15,20 @@ typedef struct hos_info_s
size_t fd;
int mode;
hos_client_handle handle;
const char *bucket;
const char *object;
char *bucket;
char *object;
void *callback;
void *userdata;
std::shared_ptr<Aws::IOStream> cache;
//void *cache;
size_t cache_times;
size_t cache_rest;
size_t position;
size_t recive_cnt;
int fd_status;
#define HOS_FD_REGISTER 0
#define HOS_FD_INJECT 1
size_t overtime; //absolute expiry time, computed when the fd is closed
size_t timeout; //configured timeout, counted from when fd_status becomes HOS_FD_INJECT
UT_hash_handle hh;
}hos_info_t;