From a9bbbc320cfe433ccb566552c2e7f43916eb67bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=AE=A3=E6=AD=A3?= Date: Tue, 1 Dec 2020 16:12:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0fs2=E5=92=8Chos=5Fclient=5Fcr?= =?UTF-8?q?eate=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- example/demo/hos_upload_complete.cpp | 2 +- example/demo/hos_write_complete.cpp | 2 +- example/performance/HosClientPerformance.cpp | 23 +- src/hos_client.cpp | 489 +++++++++++++------ src/hos_client.h | 11 +- src/hos_hash.cpp | 2 +- src/hos_hash.h | 7 +- 7 files changed, 355 insertions(+), 181 deletions(-) diff --git a/example/demo/hos_upload_complete.cpp b/example/demo/hos_upload_complete.cpp index bb76db52..efe8c47c 100644 --- a/example/demo/hos_upload_complete.cpp +++ b/example/demo/hos_upload_complete.cpp @@ -82,7 +82,7 @@ int main(int argc, char *argv[]) file_to_buffer(object, buf, &buf_size); debuginfo("hos_client_init start ...\n"); - hos_client_handle handle = hos_client_create("http://192.168.44.10:9098/hos/", "default", "default", 3000); + hos_client_handle handle = hos_client_create("192.168.44.10", 9098, "default", "default", 3000); if (handle == NULL) { debuginfo("error:hos_client_handle\n"); diff --git a/example/demo/hos_write_complete.cpp b/example/demo/hos_write_complete.cpp index 8cba751a..490cbdd2 100644 --- a/example/demo/hos_write_complete.cpp +++ b/example/demo/hos_write_complete.cpp @@ -79,7 +79,7 @@ int main(int argc, char *argv[]) file_to_buffer(object, buf, &buf_size); debuginfo("hos_client_init start ...\n"); - hos_client_handle handle = hos_client_create("http://192.168.40.223:9098/hos/", "default", "default", 400); + hos_client_handle handle = hos_client_create("192.168.40.223", 9098, "default", "default", 400); //hos_client_handle handle = hos_client_create("http://192.168.32.10:9098/hos/", "default", "default", 4); if (handle == NULL) { diff --git a/example/performance/HosClientPerformance.cpp b/example/performance/HosClientPerformance.cpp index e4d9f967..8c115cd0 100644 --- a/example/performance/HosClientPerformance.cpp +++ b/example/performance/HosClientPerformance.cpp @@ -14,6 +14,7 @@ extern "C" #include #include #include +#include } #include"../../src/hos_client.h" @@ -25,10 +26,11 @@ extern "C" typedef struct conf_s { #define STRING_SIZE 128 - char endpoint[STRING_SIZE]; + char serverip[INET_ADDRSTRLEN]; char bucket[STRING_SIZE]; char object[STRING_SIZE]; char file[STRING_SIZE]; + size_t port; size_t pool_size; size_t thread_sum; size_t size; @@ -46,10 +48,11 @@ typedef struct thread_info_s static void configuration_init(conf_t *conf) { - strcpy(conf->endpoint, "http://192.168.40.223:9098/hos/"); + strcpy(conf->serverip, "192.168.40.223"); strcpy(conf->bucket, "mybucket"); strcpy(conf->object, "myobject"); strcpy(conf->file, "./file/test.txt"); + conf->port = 9098; conf->pool_size = 4000; conf->append_size = 1024; conf->thread_sum = 1; @@ -456,17 +459,17 @@ int main(int argc, char *argv[]) #endif configuration_init(&conf); //读取命令行配置 - while((ch = getopt(argc, argv, "a:e:b:o:f:p:t:k:s:S:BFAh")) != -1) + while((ch = getopt(argc, argv, "a:b:o:f:p:t:k:s:i:P:S:BFAh")) != -1) { switch(ch) { case 'a': conf.append_size = 1024 * atof(optarg); break; - case 'e': + case 'i': //endpoint buf_size = MIN(STRING_SIZE, strlen(optarg)); - strncpy((char *)conf.endpoint, optarg, buf_size); + strncpy((char *)conf.serverip, optarg, buf_size); break; case 'b': buf_size = MIN(STRING_SIZE, strlen(optarg)); @@ -483,7 +486,7 @@ int main(int argc, char *argv[]) strncpy(conf.file, optarg, buf_size); conf.file[buf_size] = '\0'; break; - case 'p': + case 'P': conf.pool_size = atoi(optarg); conf.pool_size = MIN(4000, conf.pool_size); break; @@ -508,6 +511,9 @@ int main(int argc, char *argv[]) case 'S': conf.slice = atoi(optarg); break; + case 'p': + conf.port = atoi(optarg); + break; case 'h': default: printf("usage: HosClientPerformance \n[-e set endpoint] \n[-b set bucket] \n" @@ -530,7 +536,7 @@ int main(int argc, char *argv[]) return -1; } //创建client - hos_client_handle handle = hos_client_create(conf.endpoint, "default", "default", conf.pool_size); + hos_client_handle handle = hos_client_create(conf.serverip, conf.port, "default", "default", conf.pool_size); if (handle == NULL) { printf("error:hos_client_handle\n"); @@ -540,7 +546,7 @@ int main(int argc, char *argv[]) hos_set_thread_sum(handle, conf.thread_sum); hos_set_cache_size(handle, conf.append_size); - hos_set_cache_times(handle, 0); + hos_set_cache_count(handle, 0); //创建bucket if (hos_create_bucket(handle, conf.bucket)) @@ -560,6 +566,7 @@ int main(int argc, char *argv[]) printf("%-20s%-20s%-20s%-20s%-20s\n", "thread_id", "file_size", "write_time", "upload_time", "total_time"); } hos_expand_fs2(handle, "./log/fs2.log", 0, "127.0.0.1", 8001); + //hos_expand_fs2(handle, NULL, 0, "127.0.0.1", 8001); for ( thread_num = 0; thread_num < conf.thread_sum; thread_num++ ) { thread_info[thread_num].conf = conf; diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 7b4cb180..564bd496 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -25,6 +25,35 @@ extern "C" #define MAX_HOS_CLIENT_THREAD_NUM 255 #define MAX_HOS_CLIENT_FD_NUM 65535 +#if(__GNUC__ * 100 + __GNUC_MINOR__ * 10 + __GNUC_PATCHLEVEL__ >= 410) +#define atomic_add(x,y) __sync_add_and_fetch((x),(y)) +#define atomic_read(x) __sync_add_and_fetch((x),0) +#else +#define atomic_add(x,y) ((*(x))+=(y)) +#define atomic_read(x) (*(x)) +#endif + +typedef struct data_info_s +{ + int *tx_pkts; + int *tx_bytes; + int *rx_pkts; + int *rx_bytes; + int *tx_pkts_last; + int *tx_bytes_last; + int *rx_pkts_last; + int *rx_bytes_last; +}data_info_t; + + +typedef struct fs2_info_s +{ + screen_stat_handle_t fs2_handle; + int *line_ids; + int *column_ids; + void *reserved; //预留给每个fs2 handle用来存储自定义的数据 +}fs2_info_t; + typedef struct hos_client_handle_s { Aws::S3::S3Client *S3Client; @@ -34,30 +63,21 @@ typedef struct hos_client_handle_s int fd_thread_status; /* options */ size_t cache_size; - size_t cache_times; + size_t cache_count; size_t thread_sum; size_t timeout; /* expand */ - screen_stat_handle_t fs2_handle; + fs2_info_t fs2_info[3]; //0: data info; 1: fd info; 2 cache info pthread_t fs2_thread; int fs2_status; #define HOS_FS2_START 1 #define HOS_FS2_STOP 2 - int *line_ids; - int *column_ids; - int *tx_pkts; - int *tx_bytes; - int *rx_pkts; - int *rx_bytes; - int *tx_pkts_last; - int *tx_bytes_last; - int *rx_pkts_last; - int *rx_bytes_last; }hos_client_handle_t; hos_client_handle hos_handle;//一个进程只允许有一个hos_handle hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM]; -size_t fd_info[MAX_HOS_CLIENT_THREAD_NUM][MAX_HOS_CLIENT_FD_NUM]; +size_t *hos_cache;//记录当前hos缓存了多少数据 +size_t fd_info[MAX_HOS_CLIENT_THREAD_NUM][MAX_HOS_CLIENT_FD_NUM + 1]; //fd 实际从3开始, fd[thread_id][0]记录register的fd,fd[thread_id][1]记录inject的fd Aws::SDKOptions options; static inline size_t get_current_ms() @@ -70,10 +90,16 @@ static inline size_t get_current_ms() static size_t hash_get_min_free_fd(size_t thread_id) { size_t i = 0; - for (i = 1; i < MAX_HOS_CLIENT_FD_NUM; i++) + for (i = 3; i < MAX_HOS_CLIENT_FD_NUM + 1; i++) { if (!fd_info[thread_id][i]) + { + fd_info[thread_id][i] = 1; + fd_info[thread_id][HOS_FD_REGISTER]++; + fd_info[thread_id][HOS_FD_FREE]--; + return i; + } } return 0; } @@ -86,6 +112,8 @@ static int hos_delete_fd(size_t fd, size_t thread_id) } delete_info_by_fd(&hash_hos_info[thread_id], fd); fd_info[thread_id][fd] = 0; + fd_info[thread_id][HOS_FD_FREE]++; + fd_info[thread_id][HOS_FD_INJECT]--; return HOS_CLIENT_OK; } @@ -118,43 +146,44 @@ static void PutObjectAsyncFinished(const Aws::S3::S3Client* S3Client, if (hos_info->mode & APPEND_MODE) { //APPEND MODE 保留fd - hos_info->recive_cnt++; -#if 0 - if (hos_info->fd_status == HOS_FD_INJECT) - { - if (hos_info->recive_cnt == hos_info->position) - hos_delete_fd(fd, thread_id); - } -#endif + atomic_add(&(hos_info->recive_cnt), 1); }else { //完整上传 删除fd - //hos_delete_fd(fd, thread_id); - hos_info->fd_status = HOS_FD_INJECT; + hos_close_fd(fd, thread_id); } } void hos_set_cache_size(hos_client_handle client, size_t cache_size) { client->cache_size = cache_size; + hos_cache = (size_t *)calloc(client->thread_sum, sizeof(size_t)); return ; } -void hos_set_cache_times(hos_client_handle client, size_t cache_times) +void hos_set_cache_count(hos_client_handle client, size_t cache_count) { - client->cache_times = cache_times; + client->cache_count = cache_count; return ; } void hos_set_thread_sum(hos_client_handle client, size_t thread_sum) { client->thread_sum = thread_sum; + for (size_t i = 0; i < thread_sum; i++) + { + fd_info[i][0] = 65533; + } + if (hos_cache) + { + hos_cache = (size_t *)realloc(hos_cache, thread_sum * sizeof(size_t)); + } return ; } -hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t pool_size) +hos_client_handle hos_client_create(const char *serverip, size_t port, const char *accesskeyid, const char *secretkey, size_t pool_size) { - if (!endpoint || !accesskeyid || !secretkey) + if (!serverip || !accesskeyid || !secretkey) { return NULL; } @@ -171,6 +200,8 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi Aws::Auth::AWSCredentials credentials(accesskeyid, secretkey); //初始化 + char endpoint[128]; + snprintf(endpoint, 128, "http://%s:%lu/hos/", serverip, port); config.endpointOverride = endpoint; config.verifySSL = false; config.enableEndpointDiscovery = true; @@ -188,17 +219,21 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi hos_handle->buckets = outcome.GetResult().GetBuckets(); hos_handle->cache_size = 0; - hos_handle->cache_times = 1; + hos_handle->cache_count = 0; hos_handle->thread_sum = 1; hos_handle->timeout = 1000; + fd_info[0][0] = 65533; + fd_info[0][1] = 0; + fd_info[0][2] = 0; + return hos_handle; } static void *fs2_statistics(void *ptr) { hos_client_handle handle = (hos_client_handle)ptr; - int i = 0; + size_t i = 0; int rx_pkts_sum = 0; int rx_bytes_sum = 0; int tx_pkts_sum = 0; @@ -207,6 +242,7 @@ static void *fs2_statistics(void *ptr) int rx_bytes_sum_interval = 0; int tx_pkts_sum_interval = 0; int tx_bytes_sum_interval = 0; + fs2_info_t *fs2_info = NULL; while(1) { @@ -224,42 +260,60 @@ static void *fs2_statistics(void *ptr) tx_pkts_sum_interval = 0; tx_bytes_sum_interval = 0; - for (i = 0; i < (int)handle->thread_sum; i++) + //pkts and bytes info + fs2_info = &handle->fs2_info[0]; + for (i = 0; i < handle->thread_sum; i++) { - rx_pkts_sum += handle->rx_pkts[i]; - rx_bytes_sum += handle->rx_bytes[i]; - tx_pkts_sum += handle->tx_pkts[i]; - tx_bytes_sum += handle->tx_bytes[i]; - rx_pkts_sum_interval += (handle->rx_pkts[i] - handle->rx_pkts_last[i]); - rx_bytes_sum_interval += (handle->rx_bytes[i] - handle->rx_bytes_last[i]); - tx_pkts_sum_interval += (handle->tx_pkts[i] - handle->tx_pkts_last[i]); - tx_bytes_sum_interval += (handle->tx_bytes[i] - handle->tx_bytes_last[i]); + data_info_t *data_info = (data_info_t *)fs2_info->reserved; + rx_pkts_sum += data_info->rx_pkts[i]; + rx_bytes_sum += data_info->rx_bytes[i]; + tx_pkts_sum += data_info->tx_pkts[i]; + tx_bytes_sum += data_info->tx_bytes[i]; + rx_pkts_sum_interval += (data_info->rx_pkts[i] - data_info->rx_pkts_last[i]); + rx_bytes_sum_interval += (data_info->rx_bytes[i] - data_info->rx_bytes_last[i]); + tx_pkts_sum_interval += (data_info->tx_pkts[i] - data_info->tx_pkts_last[i]); + tx_bytes_sum_interval += (data_info->tx_bytes[i] - data_info->tx_bytes_last[i]); - FS_operate(handle->fs2_handle, handle->line_ids[2 * i], handle->column_ids[0], FS_OP_SET, handle->rx_pkts[i]); - FS_operate(handle->fs2_handle, handle->line_ids[2 * i], handle->column_ids[1], FS_OP_SET, handle->rx_bytes[i]); - FS_operate(handle->fs2_handle, handle->line_ids[2 * i], handle->column_ids[2], FS_OP_SET, handle->tx_pkts[i]); - FS_operate(handle->fs2_handle, handle->line_ids[2 * i], handle->column_ids[3], FS_OP_SET, handle->tx_bytes[i]); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i], fs2_info->column_ids[0], FS_OP_SET, data_info->rx_pkts[i]); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i], fs2_info->column_ids[1], FS_OP_SET, data_info->rx_bytes[i]); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i], fs2_info->column_ids[2], FS_OP_SET, data_info->tx_pkts[i]); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i], fs2_info->column_ids[3], FS_OP_SET, data_info->tx_bytes[i]); - FS_operate(handle->fs2_handle, handle->line_ids[2 * i + 1], handle->column_ids[0], FS_OP_SET, (handle->rx_pkts[i] - handle->rx_pkts_last[i])); - FS_operate(handle->fs2_handle, handle->line_ids[2 * i + 1], handle->column_ids[1], FS_OP_SET, (handle->rx_bytes[i] - handle->rx_bytes_last[i])); - FS_operate(handle->fs2_handle, handle->line_ids[2 * i + 1], handle->column_ids[2], FS_OP_SET, (handle->tx_pkts[i] - handle->tx_pkts_last[i])); - FS_operate(handle->fs2_handle, handle->line_ids[2 * i + 1], handle->column_ids[3], FS_OP_SET, (handle->tx_bytes[i] - handle->tx_bytes_last[i])); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i + 1], fs2_info->column_ids[0], FS_OP_SET, (data_info->rx_pkts[i] - data_info->rx_pkts_last[i])); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i + 1], fs2_info->column_ids[1], FS_OP_SET, (data_info->rx_bytes[i] - data_info->rx_bytes_last[i])); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i + 1], fs2_info->column_ids[2], FS_OP_SET, (data_info->tx_pkts[i] - data_info->tx_pkts_last[i])); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * i + 1], fs2_info->column_ids[3], FS_OP_SET, (data_info->tx_bytes[i] - data_info->tx_bytes_last[i])); - handle->rx_pkts_last[i] = handle->rx_pkts[i]; - handle->rx_bytes_last[i] = handle->rx_bytes[i]; - handle->tx_pkts_last[i] = handle->tx_pkts[i]; - handle->tx_bytes_last[i] = handle->tx_bytes[i]; + data_info->rx_pkts_last[i] = data_info->rx_pkts[i]; + data_info->rx_bytes_last[i] = data_info->rx_bytes[i]; + data_info->tx_pkts_last[i] = data_info->tx_pkts[i]; + data_info->tx_bytes_last[i] = data_info->tx_bytes[i]; } - FS_operate(handle->fs2_handle, handle->line_ids[2 * handle->thread_sum], handle->column_ids[0], FS_OP_SET, rx_pkts_sum); - FS_operate(handle->fs2_handle, handle->line_ids[2 * handle->thread_sum], handle->column_ids[1], FS_OP_SET, rx_bytes_sum); - FS_operate(handle->fs2_handle, handle->line_ids[2 * handle->thread_sum], handle->column_ids[2], FS_OP_SET, tx_pkts_sum); - FS_operate(handle->fs2_handle, handle->line_ids[2 * handle->thread_sum], handle->column_ids[3], FS_OP_SET, tx_bytes_sum); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum], fs2_info->column_ids[0], FS_OP_SET, rx_pkts_sum); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum], fs2_info->column_ids[1], FS_OP_SET, rx_bytes_sum); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum], fs2_info->column_ids[2], FS_OP_SET, tx_pkts_sum); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum], fs2_info->column_ids[3], FS_OP_SET, tx_bytes_sum); - FS_operate(handle->fs2_handle, handle->line_ids[2 * handle->thread_sum + 1], handle->column_ids[0], FS_OP_SET, rx_pkts_sum_interval); - FS_operate(handle->fs2_handle, handle->line_ids[2 * handle->thread_sum + 1], handle->column_ids[1], FS_OP_SET, rx_bytes_sum_interval); - FS_operate(handle->fs2_handle, handle->line_ids[2 * handle->thread_sum + 1], handle->column_ids[2], FS_OP_SET, tx_pkts_sum_interval); - FS_operate(handle->fs2_handle, handle->line_ids[2 * handle->thread_sum + 1], handle->column_ids[3], FS_OP_SET, tx_bytes_sum_interval); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum + 1], fs2_info->column_ids[0], FS_OP_SET, rx_pkts_sum_interval); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum + 1], fs2_info->column_ids[1], FS_OP_SET, rx_bytes_sum_interval); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum + 1], fs2_info->column_ids[2], FS_OP_SET, tx_pkts_sum_interval); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[2 * handle->thread_sum + 1], fs2_info->column_ids[3], FS_OP_SET, tx_bytes_sum_interval); + //fd info + fs2_info = &handle->fs2_info[1]; + for (i = 0; i < handle->thread_sum; i++) + { + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[i], fs2_info->column_ids[0], FS_OP_SET, fd_info[i][1]); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[i], fs2_info->column_ids[1], FS_OP_SET, fd_info[i][2]); + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[i], fs2_info->column_ids[2], FS_OP_SET, fd_info[i][0]); + } + + //cache info + fs2_info = &handle->fs2_info[2]; + for (i = 0; i < handle->thread_sum; i++) + { + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[i], fs2_info->column_ids[0], FS_OP_SET, hos_cache[i]); + } sleep(1); } pthread_exit(NULL); @@ -267,81 +321,127 @@ static void *fs2_statistics(void *ptr) void hos_expand_fs2(hos_client_handle handle, const char * path, int format, char *server_ip, int port) { - screen_stat_handle_t fs2_handle = NULL; + fs2_info_t *fs2_info = NULL; + screen_stat_handle_t *fs2_handle = NULL; const char *app_name = "hos-sdk-client-cpp"; int value = 0; char buff[128]; int i = 0; - fs2_handle = FS_create_handle(); + //fs2 init + for (i = 0; i < 3; i++) + { + fs2_handle = &handle->fs2_info[i].fs2_handle; + *fs2_handle = FS_create_handle(); - FS_set_para(fs2_handle, APP_NAME, app_name, strlen(app_name) + 1); - value = 1;//true - FS_set_para(fs2_handle, FLUSH_BY_DATE, &value, sizeof(value)); - if (path != NULL) - { - FS_set_para(fs2_handle, OUTPUT_DEVICE, path, strlen(path) + 1); - } - value = 2;//append - FS_set_para(fs2_handle, PRINT_MODE, &value, sizeof(value)); - value = 1; - FS_set_para(fs2_handle, CREATE_THREAD, &value, sizeof(value)); - FS_set_para(fs2_handle, METRIS_FORMAT, &format, sizeof(format)); - FS_set_para(fs2_handle, STAT_CYCLE, &value, sizeof(value)); - value = 4096; - FS_set_para(fs2_handle, MAX_STAT_FIELD_NUM, &value, sizeof(value)); - if (server_ip == NULL) - { - FS_set_para(fs2_handle, STATS_SERVER_IP, "127.0.0.1", strlen("127.0.0.1")); - }else - { - FS_set_para(fs2_handle, STATS_SERVER_IP, server_ip, strlen(server_ip)); + FS_set_para(*fs2_handle, APP_NAME, app_name, strlen(app_name) + 1); + value = 1;//true + FS_set_para(*fs2_handle, FLUSH_BY_DATE, &value, sizeof(value)); + if (path != NULL) + { + FS_set_para(*fs2_handle, OUTPUT_DEVICE, path, strlen(path) + 1); + } + value = 2;//append + FS_set_para(*fs2_handle, PRINT_MODE, &value, sizeof(value)); + value = 1; + FS_set_para(*fs2_handle, CREATE_THREAD, &value, sizeof(value)); + FS_set_para(*fs2_handle, METRIS_FORMAT, &format, sizeof(format)); + FS_set_para(*fs2_handle, STAT_CYCLE, &value, sizeof(value)); + value = 4096; + FS_set_para(*fs2_handle, MAX_STAT_FIELD_NUM, &value, sizeof(value)); + if (server_ip == NULL) + { + FS_set_para(*fs2_handle, STATS_SERVER_IP, "127.0.0.1", strlen("127.0.0.1")); + }else + { + FS_set_para(*fs2_handle, STATS_SERVER_IP, server_ip, strlen(server_ip)); + } + + FS_set_para(*fs2_handle, STATS_SERVER_PORT, &port, sizeof(port)); + + value = FS_OUTPUT_STATSD; + FS_set_para(*fs2_handle, STATS_FORMAT, &value, sizeof(value)); } - FS_set_para(fs2_handle, STATS_SERVER_PORT, &port, sizeof(port)); - value = FS_OUTPUT_STATSD; - FS_set_para(fs2_handle, STATS_FORMAT, &value, sizeof(value)); - - int *line_ids = (int *)calloc(2 * handle->thread_sum + 2, sizeof(int)); - int *column_ids = (int *)calloc(4, sizeof(int)); + //pkts and bytes info + fs2_info = &handle->fs2_info[0]; + fs2_handle = &handle->fs2_info[0].fs2_handle; + fs2_info->line_ids = (int *)calloc(2 * handle->thread_sum + 2, sizeof(int)); + fs2_info->column_ids = (int *)calloc(4, sizeof(int)); //line info snprintf(buff, sizeof(buff), "rx_pkts"); - column_ids[0] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); + fs2_info->column_ids[0] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); snprintf(buff, sizeof(buff), "rx_bytes"); - column_ids[1] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); + fs2_info->column_ids[1] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); snprintf(buff, sizeof(buff), "tx_pkts"); - column_ids[2] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); + fs2_info->column_ids[2] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); snprintf(buff, sizeof(buff), "tx_bytes"); - column_ids[3] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); + fs2_info->column_ids[3] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); for (i = 0; i < (int)handle->thread_sum; i++) { snprintf(buff, sizeof(buff), "total(%d)", i); - line_ids[2 * i] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); + fs2_info->line_ids[2 * i] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); snprintf(buff, sizeof(buff), "rate(%d)", i); - line_ids[2 * i + 1] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); + fs2_info->line_ids[2 * i + 1] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); } snprintf(buff, sizeof(buff), "total"); - line_ids[2 * handle->thread_sum] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); + fs2_info->line_ids[2 * handle->thread_sum] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); snprintf(buff, sizeof(buff), "rate"); - line_ids[2 * handle->thread_sum + 1] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); + fs2_info->line_ids[2 * handle->thread_sum + 1] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); - handle->fs2_handle = fs2_handle; - handle->line_ids = line_ids; - handle->column_ids = column_ids; handle->fs2_status = HOS_FS2_START; - handle->tx_pkts = (int *)calloc(handle->thread_sum, sizeof(int)); - handle->tx_bytes = (int *)calloc(handle->thread_sum, sizeof(int)); - handle->rx_pkts = (int *)calloc(handle->thread_sum, sizeof(int)); - handle->rx_bytes = (int *)calloc(handle->thread_sum, sizeof(int)); - handle->tx_pkts_last = (int *)calloc(handle->thread_sum, sizeof(int)); - handle->tx_bytes_last = (int *)calloc(handle->thread_sum, sizeof(int)); - handle->rx_pkts_last = (int *)calloc(handle->thread_sum, sizeof(int)); - handle->rx_bytes_last = (int *)calloc(handle->thread_sum, sizeof(int)); + data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t)); + fs2_info->reserved = (void *)data_info; + data_info->tx_pkts = (int *)calloc(handle->thread_sum, sizeof(int)); + data_info->tx_bytes = (int *)calloc(handle->thread_sum, sizeof(int)); + data_info->rx_pkts = (int *)calloc(handle->thread_sum, sizeof(int)); + data_info->rx_bytes = (int *)calloc(handle->thread_sum, sizeof(int)); + data_info->tx_pkts_last = (int *)calloc(handle->thread_sum, sizeof(int)); + data_info->tx_bytes_last = (int *)calloc(handle->thread_sum, sizeof(int)); + data_info->rx_pkts_last = (int *)calloc(handle->thread_sum, sizeof(int)); + data_info->rx_bytes_last = (int *)calloc(handle->thread_sum, sizeof(int)); + FS_start(*fs2_handle); - FS_start(fs2_handle); + //fd info + fs2_info = &handle->fs2_info[1]; + fs2_handle = &handle->fs2_info[1].fs2_handle; + fs2_info->line_ids = (int *)calloc(handle->thread_sum, sizeof(int)); + fs2_info->column_ids = (int *)calloc(3, sizeof(int)); + + snprintf(buff, sizeof(buff), "REGISTER"); + fs2_info->column_ids[0] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); + snprintf(buff, sizeof(buff), "INJECT"); + fs2_info->column_ids[1] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); + snprintf(buff, sizeof(buff), "FREE"); + fs2_info->column_ids[2] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); + + for (i = 0; i < (int)handle->thread_sum; i++) + { + snprintf(buff, sizeof(buff), "num(%d)", i); + fs2_info->line_ids[i] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); + } + FS_start(*fs2_handle); + + //cache info + fs2_info = &handle->fs2_info[2]; + fs2_handle = &handle->fs2_info[2].fs2_handle; + fs2_info->line_ids = (int *)calloc(handle->thread_sum + 1, sizeof(int)); + fs2_info->column_ids = (int *)calloc(1, sizeof(int)); + + snprintf(buff, sizeof(buff), "cached"); + fs2_info->column_ids[0] = FS_register(*fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); + + for (i = 0; i < (int)handle->thread_sum; i++) + { + snprintf(buff, sizeof(buff), "Bytes(%d)", i); + fs2_info->line_ids[i] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); + } + snprintf(buff, sizeof(buff), "total"); + fs2_info->line_ids[i] = FS_register(*fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); + FS_start(*fs2_handle); pthread_create(&handle->fs2_thread, NULL, fs2_statistics, handle); @@ -406,7 +506,9 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const { struct stat buffer; char buf[128]; - size_t fd = hash_get_min_free_fd(thread_id); + size_t stream_len = 0; + data_info_t *data_info = NULL; + int ret; if ((handle == NULL) || (bucket == NULL) || (object == NULL) || (callback == NULL) || (thread_id > handle->thread_sum)) { @@ -426,6 +528,8 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const { return HOS_FILE_NOT_EXITS; } + + stream_len = buffer.st_size; //文件类型 const std::shared_ptr input_data = Aws::MakeShared("SampleAllocationTag", object, std::ios_base::in | std::ios_base::binary); @@ -434,14 +538,26 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const else { //内存块 + stream_len = data_len; const std::shared_ptr input_data = Aws::MakeShared(data); Aws::String stream (data, data_len); *input_data << stream; request.SetBody(input_data); } + //field_stat2 record + if (handle->fs2_info[0].fs2_handle) + { + if (handle->fs2_info[0].reserved) + { + data_info = (data_info_t *)handle->fs2_info[0].reserved; + data_info->rx_pkts[thread_id]++; + data_info->rx_bytes[thread_id] += stream_len; + } + } //设置回调函数 + size_t fd = hash_get_min_free_fd(thread_id); std::shared_ptr context = Aws::MakeShared(""); sprintf(buf, "%lu %lu", thread_id, fd); @@ -449,10 +565,23 @@ static int hos_upload_stream(hos_client_handle handle, const char *bucket, const hos_info_t info = {fd, 0, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, 0, 0, 0 }; add_hos_info(&hash_hos_info[thread_id], &info); - fd_info[thread_id][fd] = 1; - S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); - return HOS_CLIENT_OK; + ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); + if (ret) + { + //field_stat2 record + if (handle->fs2_info[0].fs2_handle) + { + if (handle->fs2_info[0].reserved) + { + data_info = (data_info_t *)handle->fs2_info[0].reserved; + data_info->tx_pkts[thread_id]++; + data_info->tx_bytes[thread_id] += stream_len; + } + } + return HOS_CLIENT_OK; + } + return HOS_SEND_FAILED; } int hos_upload_file(hos_client_handle handle, const char *bucket, const char *file_path, @@ -480,17 +609,18 @@ static void *hos_fd_manage(void *ptr) break; for (thread_num = 0; thread_num < thread_sum; thread_num++) { - for(fd = 0; fd < MAX_HOS_CLIENT_FD_NUM; fd++) + for(fd = 3; fd < MAX_HOS_CLIENT_FD_NUM + 1; fd++) { if (!fd_info[thread_num][fd]) - break; + continue; hos_info = find_info_by_fd(hash_hos_info[thread_num], fd); if (!hos_info) - break; - if (hos_info->fd_status == HOS_FD_REGISTER) continue; - if ((hos_info->position == hos_info->recive_cnt) || (hos_info->overtime <= get_current_ms())) - hos_delete_fd(fd, thread_num); + if (hos_info->fd_status == HOS_FD_INJECT) + { + if ((hos_info->position == hos_info->recive_cnt) || (hos_info->overtime <= get_current_ms())) + hos_delete_fd(fd, thread_num); + } } } usleep(1000); @@ -511,9 +641,8 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object return HOS_FD_NOT_ENOUGH; } - hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, handle->cache_times, handle->cache_size, 0, 0, HOS_FD_REGISTER, 0, handle->timeout,}; + hos_info_t info = {fd, mode, handle, (char *)bucket, (char *)object, (void *)callback, userdata, NULL, handle->cache_count, handle->cache_size, 0, 0, HOS_FD_REGISTER, 0, handle->timeout,}; add_hos_info(&hash_hos_info[thread_id], &info); - fd_info[thread_id][fd] = 1; #if 1 if (handle->fd_thread == 0) { @@ -534,6 +663,8 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id int flag = 0; // 0, 一次处理就可以完成;1,需要多次处理才能处理完 int rest; // stream 剩余未处理的数据长度 int ret = 0; + data_info_t *data_info = NULL; + if ((fd == 0) || (stream == NULL) || (thread_id > MAX_HOS_CLIENT_THREAD_NUM)) { return HOS_PARAMETER_ERROR; @@ -551,10 +682,14 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id handle = (hos_client_handle)hos_info->handle; //field_stat2 record - if (handle->fs2_handle) + if (handle->fs2_info[0].fs2_handle) { - handle->rx_pkts[thread_id]++; - handle->rx_bytes[thread_id] += stream_len; + if (handle->fs2_info[0].reserved) + { + data_info = (data_info_t *)handle->fs2_info[0].reserved; + data_info->rx_pkts[thread_id]++; + data_info->rx_bytes[thread_id] += stream_len; + } } Aws::S3::S3Client& S3Client = *(handle->S3Client); @@ -573,9 +708,9 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id { hos_info->cache = Aws::MakeShared("append mode"); } - if (hos_info->cache_times == 0) + if (hos_info->cache_count == 0) { - //不设置cache_times的情况下 + //不设置cache_count的情况下 if (stream_len < hos_info->cache_rest) { // cache @@ -584,6 +719,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id hos_info->cache_rest -= stream_len; if (hos_info->cache_rest > 0) { + hos_cache[thread_id] += stream_len; return HOS_CLIENT_OK; } }else if (stream_len >= hos_info->cache_rest) @@ -597,7 +733,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id }else { //设置cache times的情况下 - if ((--hos_info->cache_times) && (stream_len <= hos_info->cache_rest)) + if ((--hos_info->cache_count) && (stream_len <= hos_info->cache_rest)) { // cache Aws::String buffer (stream, stream_len); @@ -605,6 +741,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id hos_info->cache_rest -= stream_len; if (hos_info->cache_rest > 0) { + hos_cache[thread_id] += stream_len; return HOS_CLIENT_OK; } }else if (stream_len > hos_info->cache_rest) @@ -616,7 +753,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id rest = stream_len - hos_info->cache_rest; }else { - //over cache_times + //over cache_count Aws::String buffer (stream, stream_len); *hos_info->cache << buffer; } @@ -663,30 +800,32 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id context->SetUUID(buf); ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); + + hos_cache[thread_id] = 0; //恢复fd 的cache设置 if (hos_info->mode & APPEND_MODE) { hos_info->cache = NULL; hos_info->cache_rest = hos_info->handle->cache_size; - hos_info->cache_times = hos_info->handle->cache_times; + hos_info->cache_count = hos_info->handle->cache_count; } if (ret) { - if (handle->fs2_handle) + if (data_info) { - handle->tx_pkts[thread_id]++; + data_info->tx_pkts[thread_id]++; if (hos_info->mode & BUFF_MODE) { if (hos_info->mode & APPEND_MODE) { - handle->tx_bytes[thread_id] += handle->cache_size; + data_info->tx_bytes[thread_id] += handle->cache_size; }else { - handle->tx_bytes[thread_id] += stream_len; + data_info->tx_bytes[thread_id] += stream_len; } }else { - handle->tx_bytes[thread_id] += buffer.st_size; + data_info->tx_bytes[thread_id] += buffer.st_size; } } while (flag == 1) @@ -703,10 +842,12 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id int hos_close_fd(size_t fd, size_t thread_id) { - hos_info_t *hos_info; + hos_info_t *hos_info = NULL; char num[128]; char buf[128]; - if (fd == 0) + data_info_t *data_info = NULL; + + if (fd < 3) { return HOS_PARAMETER_ERROR; } @@ -746,12 +887,24 @@ int hos_close_fd(size_t fd, size_t thread_id) context->SetUUID(buf); S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); + + if (hos_info->handle->fs2_info[0].fs2_handle) + { + if (hos_info->handle->fs2_info[0].reserved) + data_info = (data_info_t *)hos_info->handle->fs2_info[0].reserved; + + data_info->tx_pkts[thread_id]++; + data_info->tx_bytes[thread_id] += hos_info->handle->cache_size - hos_info->cache_rest; + } + hos_cache[thread_id] = 0; } } hos_info->fd_status = HOS_FD_INJECT; - hos_info->cache.reset(); hos_info->overtime = get_current_ms() + hos_info->timeout; + fd_info[thread_id][HOS_FD_REGISTER]--; + fd_info[thread_id][HOS_FD_INJECT]++; + return HOS_CLIENT_OK; } @@ -772,35 +925,47 @@ int hos_client_destory(hos_client_handle handle) handle->fd_thread_status = 1; pthread_join(handle->fd_thread, NULL); } - for (i = 0; i < handle->thread_sum; i++) - { - delete_all(&hash_hos_info[i]); - } - if (handle->fs2_handle) + if (handle->fs2_thread) { handle->fs2_status = HOS_FS2_STOP; pthread_join(handle->fs2_thread, NULL); - FS_stop(&handle->fs2_handle); - if (handle->rx_pkts) - free(handle->rx_pkts); - if (handle->rx_bytes) - free(handle->rx_bytes); - if (handle->tx_pkts) - free(handle->tx_pkts); - if (handle->tx_bytes) - free(handle->tx_bytes); - if (handle->rx_pkts_last) - free(handle->rx_pkts_last); - if (handle->rx_bytes_last) - free(handle->rx_bytes_last); - if (handle->tx_pkts_last) - free(handle->tx_pkts_last); - if (handle->tx_bytes_last) - free(handle->tx_bytes_last); - if (handle->line_ids) - free(handle->line_ids); - if (handle->column_ids) - free(handle->column_ids); + for (i = 0; i < 3; i++) + { + screen_stat_handle_t *fs2_handle = &handle->fs2_info[i].fs2_handle; + FS_stop(fs2_handle); + if (handle->fs2_info[i].reserved) + { + if (i == 0) + { + data_info_t * data_info = (data_info_t *)handle->fs2_info[i].reserved; + if (data_info->rx_pkts) + free(data_info->rx_pkts); + if (data_info->rx_bytes) + free(data_info->rx_bytes); + if (data_info->tx_pkts) + free(data_info->tx_pkts); + if (data_info->tx_bytes) + free(data_info->tx_bytes); + if (data_info->rx_pkts_last) + free(data_info->rx_pkts_last); + if (data_info->rx_bytes_last) + free(data_info->rx_bytes_last); + if (data_info->tx_pkts_last) + free(data_info->tx_pkts_last); + if (data_info->tx_bytes_last) + free(data_info->tx_bytes_last); + } + } + if (handle->fs2_info[i].line_ids) + free(handle->fs2_info[i].line_ids); + if (handle->fs2_info[i].column_ids) + free(handle->fs2_info[i].column_ids); + } + } + + for (i = 0; i < handle->thread_sum; i++) + { + delete_all(&hash_hos_info[i]); } free(handle); diff --git a/src/hos_client.h b/src/hos_client.h index c549f372..730df87c 100644 --- a/src/hos_client.h +++ b/src/hos_client.h @@ -77,13 +77,14 @@ typedef void (*put_finished_callback)(bool, const char *, const char *, const ch /************************************************************************************* * 函数名: hos_client_init - * 参数: const char *endpoint 目的地址,如”http://192.168.44.12:9098/hos“ + * 参数: const char *serverip 目的地址,如"192.168.44.12" + * size_t port 端口号 * const char *accesskeyid AWS access key ID,如”default“ * const char *secretkey AWS secret key,如”default“ * size_t thread_sum 线程总数 * 返回值: 成功返回一个非空句柄,失败返回NULL。(失败原因都是因为输入参数不合法) *************************************************************************************/ -hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyid, const char *secretkey, size_t pool_size); +hos_client_handle hos_client_create(const char *serverip, size_t port, const char *accesskeyid, const char *secretkey, size_t pool_size); /************************************************************************************* * 函数名: hos_create_bucket * 参数: hos_client_handle handle 非空句柄 @@ -105,11 +106,11 @@ int hos_create_bucket(hos_client_handle handle, const char *bucket); *************************************************************************************/ void hos_set_cache_size(hos_client_handle handle, size_t cache_size); /************************************************************************************* - * 函数名: hos_set_cache_times + * 函数名: hos_set_cache_count * 参数: hos_client_handle handle 非空句柄 - * size_t cache_times append 模式追加次数 + * size_t cache_count append 模式追加次数 *************************************************************************************/ -void hos_set_cache_times(hos_client_handle handle, size_t cache_times); +void hos_set_cache_count(hos_client_handle handle, size_t cache_count); /************************************************************************************* * 函数名: hos_set_thread_sum * 参数: hos_client_handle handle 非空句柄 diff --git a/src/hos_hash.cpp b/src/hos_hash.cpp index c10e9494..fe6662e9 100644 --- a/src/hos_hash.cpp +++ b/src/hos_hash.cpp @@ -28,7 +28,7 @@ void add_hos_info(hos_info_t **handle, hos_info_t *input) value->callback = input->callback; value->userdata = input->userdata; value->cache = input->cache; - value->cache_times = input->cache_times; + value->cache_count = input->cache_count; value->cache_rest = input->cache_rest; value->position = input->position; value->recive_cnt = input->recive_cnt; diff --git a/src/hos_hash.h b/src/hos_hash.h index 4bdf7b0c..24dac240 100644 --- a/src/hos_hash.h +++ b/src/hos_hash.h @@ -20,13 +20,14 @@ typedef struct hos_info_s void *callback; void *userdata; std::shared_ptr cache; - size_t cache_times; + size_t cache_count; size_t cache_rest; size_t position; size_t recive_cnt; int fd_status; -#define HOS_FD_REGISTER 0 -#define HOS_FD_INJECT 1 +#define HOS_FD_FREE 0 +#define HOS_FD_REGISTER 1 +#define HOS_FD_INJECT 2 size_t overtime; //计算后超时的时间 size_t timeout; //配置的超时时间,从status变成INJECT开始计时 UT_hash_handle hh;