From e5f5062f51ea4ea7af418e1161e3139ae9fea984 Mon Sep 17 00:00:00 2001 From: pengxuanzheng Date: Tue, 20 Oct 2020 17:20:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81field=5Fstat2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/hos_client.cpp | 137 ++++++++++++++++++++++++++++++++++++++++++--- src/hos_client.h | 1 + 2 files changed, 131 insertions(+), 7 deletions(-) diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 805450b3..8398009d 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -6,6 +6,8 @@ extern "C" { #include +#include +#include } #include #include @@ -16,9 +18,9 @@ extern "C" #include #include #include -#include #include "hos_client.h" #include "hos_hash.h" +#include "field_stat2.h" #define MAX_HOS_CLIENT_THREAD_NUM 255 #define MAX_HOS_CLIENT_FD_NUM 65535 @@ -32,6 +34,18 @@ typedef struct hos_client_handle_s size_t cache_size; size_t cache_times; size_t thread_sum; + /* expand */ + screen_stat_handle_t fs2_handle; + pthread_t fs2_thread; + int fs2_status; +#define HOS_FS2_START 1 +#define HOS_FS2_STOP 2 + int *line_ids; + int *column_ids; + size_t tx_pkts; + size_t tx_bytes; + size_t rx_pkts; + size_t rx_bytes; }hos_client_handle_t; hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM]; @@ -140,6 +154,93 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi return handle; } +static void *fs2_statistics(void *ptr) +{ + hos_client_handle handle = (hos_client_handle)ptr; + + while(1) + { + if (handle->fs2_status == HOS_FS2_STOP) + { + break; + } + + FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[0], FS_OP_SET, handle->tx_pkts); + FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[1], FS_OP_SET, handle->tx_bytes); + FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[2], FS_OP_SET, handle->rx_pkts); + FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[3], FS_OP_SET, handle->rx_bytes); + + FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[0], FS_OP_SET, handle->tx_pkts); + FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[1], FS_OP_SET, handle->tx_bytes); + FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[2], FS_OP_SET, handle->rx_pkts); + FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[3], FS_OP_SET, handle->rx_bytes); + + sleep(1); + } + pthread_exit(NULL); +} + +void hos_expand_fs2(hos_client_handle handle, const char * path, int format, char *server_ip, int port) +{ + screen_stat_handle_t fs2_handle = NULL; + const char *app_name = "hos-sdk-client-cpp"; + int *line_ids = (int *)malloc(sizeof(int) * 2); + int *column_ids = (int *)malloc(sizeof(int) * 4); + int value = 0; + char buff[128]; + + fs2_handle = FS_create_handle(); + + FS_set_para(fs2_handle, APP_NAME, app_name, strlen(app_name) + 1); + value = 1;//true + FS_set_para(fs2_handle, FLUSH_BY_DATE, &value, sizeof(value)); + if (path != NULL) + { + FS_set_para(fs2_handle, OUTPUT_DEVICE, path, strlen(path) + 1); + } + value = 2;//append + FS_set_para(fs2_handle, PRINT_MODE, &value, sizeof(value)); + value = 1; + FS_set_para(fs2_handle, CREATE_THREAD, &value, sizeof(value)); + FS_set_para(fs2_handle, METRIS_FORMAT, &format, sizeof(format)); + value = 4096; + FS_set_para(fs2_handle, MAX_STAT_FIELD_NUM, &value, sizeof(value)); + if (server_ip == NULL) + { + FS_set_para(fs2_handle, STATS_SERVER_IP, "127.0.0.1", strlen("127.0.0.1")); + }else + { + FS_set_para(fs2_handle, STATS_SERVER_IP, server_ip, strlen(server_ip)); + } + + FS_set_para(fs2_handle, STATS_SERVER_PORT, &port, sizeof(port)); + + //line info + snprintf(buff, sizeof(buff), "tx_pkts(MB)"); + line_ids[0] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff); + snprintf(buff, sizeof(buff), "tx_bytes(MB)"); + line_ids[1] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff); + snprintf(buff, sizeof(buff), "rx_pkts(MB)"); + line_ids[2] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff); + snprintf(buff, sizeof(buff), "rx_bytes(MB)"); + line_ids[3] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff); + snprintf(buff, sizeof(buff), "total"); + column_ids[0] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); + snprintf(buff, sizeof(buff), "per-second"); + column_ids[1] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_SPEED, buff); + + handle->fs2_handle = fs2_handle; + handle->line_ids = line_ids; + handle->column_ids = column_ids; + handle->fs2_status = HOS_FS2_START; + + FS_start(fs2_handle); + + pthread_create(&handle->fs2_thread, NULL, fs2_statistics, handle); + + return ; +} + bool hos_verify_bucket(hos_client_handle handle, const char *bucket) { Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets(); @@ -288,6 +389,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id char buf[128]; int flag = 0; // 0, 一次处理就可以完成;1,需要多次处理才能处理完 int rest; // stream 剩余未处理的数据长度 + int ret = 0; if ((fd == 0) || (stream == NULL) || (thread_id > MAX_HOS_CLIENT_THREAD_NUM)) { return HOS_PARAMETER_ERROR; @@ -303,6 +405,11 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id } handle = (hos_client_handle)hos_info->handle; + + //field_stat2 record + handle->tx_pkts++; + handle->tx_bytes += stream_len; + Aws::S3::S3Client& S3Client = *(handle->S3Client); // Create and configure the asynchronous put object request. @@ -379,8 +486,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id sprintf(buf, "%lu %lu", thread_id, fd); context->SetUUID(buf); - S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); - + ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); //恢复fd 的cache设置 if (hos_info->mode & APPEND_MODE) { @@ -388,11 +494,17 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id hos_info->cache_rest = hos_info->handle->cache_size; hos_info->cache_times = hos_info->handle->cache_times; } - while (flag == 1) + if (ret == HOS_CLIENT_OK) { - return hos_write(fd, &stream[hos_info->cache_rest], rest, thread_id); + handle->rx_bytes += handle->cache_size; + handle->rx_pkts++; + while (flag == 1) + { + return hos_write(fd, &stream[hos_info->cache_rest], rest, thread_id); + } } - return HOS_CLIENT_OK; + + return ret; } int hos_close_fd(size_t fd, size_t thread_id) @@ -425,7 +537,18 @@ int hos_client_destory(hos_client_handle handle) { delete_all(&hash_hos_info[i]); } - //delete(handle->options); + if (handle->fs2_handle) + { + FS_stop(&handle->fs2_handle); + } + if (handle->line_ids) + { + free(handle->line_ids); + } + if (handle->column_ids) + { + free(handle->column_ids); + } free(handle); return HOS_CLIENT_OK; diff --git a/src/hos_client.h b/src/hos_client.h index adacbb90..518d8af8 100644 --- a/src/hos_client.h +++ b/src/hos_client.h @@ -23,6 +23,7 @@ enum hoserrors HOS_FILE_NOT_EXITS = -2, HOS_HASH_NOT_FIND = -3, HOS_FD_NOT_ENOUGH = -4, + HOS_SEND_FAILED = -5, };