From 87537258b3568ec89dd0082a9ed6d5c3399bc7fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=AE=A3=E6=AD=A3?= Date: Mon, 26 Apr 2021 18:17:37 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96shutdown=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- example/demo/hos_write_demo.cpp | 32 +++--- src/hos_client.cpp | 190 +++++++++++++++++--------------- 2 files changed, 119 insertions(+), 103 deletions(-) diff --git a/example/demo/hos_write_demo.cpp b/example/demo/hos_write_demo.cpp index 9a8ecb1f..c4520614 100644 --- a/example/demo/hos_write_demo.cpp +++ b/example/demo/hos_write_demo.cpp @@ -27,23 +27,23 @@ static size_t calc_time(struct timespec start, struct timespec end) (start.tv_sec * 1000 * 1000 * 1000 + start.tv_nsec)); } -int file_to_buffer(const char *file, char *buffer, size_t *len) +int file_to_buffer(const char *file, char *buffer) { FILE *fp = fopen(file, "r"); int num = 0; - *len = 0; + int len = 0; if (fp == NULL) { printf("fopen file failed:%s\n", file); return -1; } do{ - num = fread(&buffer[*len], 1, 4096, fp); + num = fread(&buffer[len], 1, 4096, fp); if (num < 0) { return -1; } - *len += num; + len += num; }while(num == 4096); fclose(fp); return 0; @@ -73,7 +73,7 @@ int main(int argc, char *argv[]) char *buf = NULL; size_t buf_size; int mode = FILE_MODE; - size_t fd[10001] = {0}; + size_t fd = 0; userdata_t data = {&finished}; hos_instance hos_instance = NULL; char object[1024]; @@ -86,7 +86,7 @@ int main(int argc, char *argv[]) buf = (char *)calloc(1, buffer.st_size); - if (file_to_buffer(file_name, buf, &buf_size) == -1) + if (file_to_buffer(file_name, buf) == -1) { free(buf); return -1; @@ -109,38 +109,38 @@ int main(int argc, char *argv[]) mode = FILE_MODE; printf("hos_write file start ...\n"); snprintf(object, 1023, "%s_write_file", file_name); - fd[0] = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode); - if (hos_write(fd[i], file_name, 0, 0) != HOS_CLIENT_OK) + fd = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode); + if (hos_write(fd, file_name, 0, 0) != HOS_CLIENT_OK) { printf("error: hos_write fialed!\n"); } - hos_close_fd(fd[1], 0); + hos_close_fd(fd, 0); printf("hos_write file end ...\n"); mode = BUFF_MODE; printf("hos_write buff start ...\n"); snprintf(object, 1023, "%s_write_buff", file_name); - fd[1] = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode); - if (hos_write(fd[i], buf, buffer.st_size, 0) != HOS_CLIENT_OK) + fd = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode); + if (hos_write(fd, buf, buffer.st_size, 0) != HOS_CLIENT_OK) { printf("error: hos_write failed!\n"); } - hos_close_fd(fd[1], 0); + hos_close_fd(fd, 0); printf("hos_write buff end ...\n"); mode = BUFF_MODE | APPEND_MODE; printf("hos_write buff start ...\n"); snprintf(object, 1023, "%s_write_APPEND", file_name); - fd[2] = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode); - if (hos_write(fd[i], buf, buffer.st_size, 0) != HOS_CLIENT_OK) + fd = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode); + if (hos_write(fd, buf, buffer.st_size, 0) != HOS_CLIENT_OK) { printf("error: hos_write failed 1st!\n"); } - if (hos_write(fd[i], buf, buffer.st_size, 0) != HOS_CLIENT_OK) + if (hos_write(fd, buf, buffer.st_size, 0) != HOS_CLIENT_OK) { printf("error: hos_write failed 2nd!\n"); } - hos_close_fd(fd[2], 0); + hos_close_fd(fd, 0); printf("hos_write buff end ...\n"); printf("hos_shutdown_instance start ...\n"); diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 4d5dbd4d..d290081e 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -415,86 +415,85 @@ static void *fs2_statistics(void *ptr) FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[6], FS_OP_SET, cache_sum); //PoolThread State - *busy = g_hos_handle.executor->GetTaskSize(); - *top_busy = (*busy) > (*top_busy) ? (*busy) : (*top_busy); - pool_history_sum += *busy; - - fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE]; - for (i = 0; i < 3; i++) + if (hos_conf->pool_thread_size > 0) { - FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[i], FS_OP_SET, PoolThread_state[i]); + *busy = g_hos_handle.executor->GetTaskSize(); + *top_busy = (*busy) > (*top_busy) ? (*busy) : (*top_busy); + pool_history_sum += *busy; + + fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE]; + for (i = 0; i < 3; i++) + { + FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[i], FS_OP_SET, PoolThread_state[i]); + } } - + sleep(1); } pthread_exit(NULL); } -static void hos_expand_fs2(const char * path, int format, char *server_ip, int port) +static screen_stat_handle_t hos_init_fs2(char *app_name, int app_name_size) +{ + int value = 0; + screen_stat_handle_t fs2_handle = FS_create_handle(); + hos_config_t *hos_conf = &g_hos_handle.hos_config; + + FS_set_para(fs2_handle, APP_NAME, app_name, app_name_size + 1); + value = 1; //true + FS_set_para(fs2_handle, FLUSH_BY_DATE, &value, sizeof(value)); + if (hos_conf->fs2_path != NULL) + { + FS_set_para(fs2_handle, OUTPUT_DEVICE, hos_conf->fs2_path, strlen(hos_conf->fs2_path) + 1); + } + value = 2; + FS_set_para(fs2_handle, PRINT_MODE, &value, sizeof(value)); + value = 1; + FS_set_para(fs2_handle, CREATE_THREAD, &value, sizeof(value)); + FS_set_para(fs2_handle, METRIS_FORMAT, &hos_conf->fs2_fmt, sizeof(hos_conf->fs2_fmt)); + FS_set_para(fs2_handle, STAT_CYCLE, &value, sizeof(value)); + value = 4096; + FS_set_para(fs2_handle, MAX_STAT_FIELD_NUM, &value, sizeof(value)); + if (hos_conf->fs2_ip == NULL) + { + FS_set_para(fs2_handle, STATS_SERVER_IP, "127.0.0.1", strlen("127.0.0.1")); + } + else + { + FS_set_para(fs2_handle, STATS_SERVER_IP, hos_conf->fs2_ip, strlen(hos_conf->fs2_ip)); + } + + FS_set_para(fs2_handle, STATS_SERVER_PORT, &hos_conf->fs2_port, sizeof(hos_conf->fs2_port)); + + value = FS_OUTPUT_STATSD; + FS_set_para(fs2_handle, STATS_FORMAT, &value, sizeof(value)); + + return fs2_handle; +} + +static void hos_expand_fs2() { fs2_info_t *fs2_info = NULL; screen_stat_handle_t fs2_handle = NULL; - const char *app_name[] = {"hos-data", "hos-poolthread"}; - int value = 0; hos_config_t *hos_conf = &g_hos_handle.hos_config; hos_func_thread_t *hos_func = &g_hos_handle.hos_func; size_t i = 0; if (hos_func->fs2_info[0].fs2_handle) return; - //fs2 init - for (i = 0; i < FS2_RECORD_EVENTS; i++) - { - hos_func->fs2_info[i].fs2_handle = FS_create_handle(); - fs2_handle = hos_func->fs2_info[i].fs2_handle; - - FS_set_para(fs2_handle, APP_NAME, app_name[i], strlen(app_name[i]) + 1); - value = 1;//true - FS_set_para(fs2_handle, FLUSH_BY_DATE, &value, sizeof(value)); - if (path != NULL) - { - if (FS_set_para(fs2_handle, OUTPUT_DEVICE, path, strlen(path) + 1) != 0) - { - MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: fs2 OUTOUT_DEVICE:%s", path); - return; - } - } - value = 2; - FS_set_para(fs2_handle, PRINT_MODE, &value, sizeof(value)); - value = 1; - FS_set_para(fs2_handle, CREATE_THREAD, &value, sizeof(value)); - FS_set_para(fs2_handle, METRIS_FORMAT, &format, sizeof(format)); - FS_set_para(fs2_handle, STAT_CYCLE, &value, sizeof(value)); - value = 4096; - FS_set_para(fs2_handle, MAX_STAT_FIELD_NUM, &value, sizeof(value)); - if (server_ip == NULL) - { - FS_set_para(fs2_handle, STATS_SERVER_IP, "127.0.0.1", strlen("127.0.0.1")); - } - else - { - FS_set_para(fs2_handle, STATS_SERVER_IP, server_ip, strlen(server_ip)); - } - - FS_set_para(fs2_handle, STATS_SERVER_PORT, &port, sizeof(port)); - - value = FS_OUTPUT_STATSD; - FS_set_para(fs2_handle, STATS_FORMAT, &value, sizeof(value)); - } - - //pkts and bytes info - fs2_info = &hos_func->fs2_info[FS2_DATA_FLOW_STATE]; - fs2_handle = hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle; - fs2_info->line_ids = (int *)calloc(2, sizeof(int)); - fs2_info->column_ids = (int *)calloc(6, sizeof(int)); - //data info /********************************************************************************************************** * rx_pkts rx_bytes tx_pkts tx_bytes tx_failed_p tx_failed_b cache_bytes * current 10 100 1 100 0 0 100 * total 100 1000 10 1000 0 0 100(无实意) ***********************************************************************************************************/ - const char *data_col[] = {"rx_pkts", "rx_bytes", "tx_pkts", "tx_bytes", "tx_failed_b", "cache_bytes"}; + fs2_info = &hos_func->fs2_info[FS2_DATA_FLOW_STATE]; + hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle = hos_init_fs2((char *)"hos-data", strlen("hos-data")); + fs2_handle = hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle; + fs2_info->line_ids = (int *)calloc(2, sizeof(int)); + fs2_info->column_ids = (int *)calloc(7, sizeof(int)); + + const char *data_col[] = {"rx_pkts", "rx_bytes", "tx_pkts", "tx_bytes", "tx_failed_p", "tx_failed_b", "cache_bytes"}; for (i = 0; i < sizeof(data_col) / sizeof(const char *); i++) { fs2_info->column_ids[i] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, data_col[i]); @@ -503,10 +502,8 @@ static void hos_expand_fs2(const char * path, int format, char *server_ip, int p fs2_info->line_ids[1] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "total"); hos_func->fs2_status = HOS_FS2_START; - data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t)); fs2_info->reserved = (void *)data_info; - #if 1 data_info->tx_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); data_info->tx_bytes = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); data_info->rx_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); @@ -514,34 +511,30 @@ static void hos_expand_fs2(const char * path, int format, char *server_ip, int p data_info->tx_failed_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); data_info->tx_failed_bytes = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); data_info->cache = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - #else - data_info->tx_pkts_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - data_info->tx_bytes_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - data_info->rx_pkts_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - data_info->rx_bytes_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - data_info->tx_failed_bytes_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); - #endif - //FS_start(hos_func->fs2_info[0].fs2_handle); FS_start(fs2_handle); - //PoolThread state - /******************************************************* - * PoolSize Busy TopBusy AveBusy - * ThreadNum 1000 500 800 650 - ********************************************************/ - fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE]; - fs2_handle = hos_func->fs2_info[FS2_POOL_THREAD_STATE].fs2_handle; - fs2_info->line_ids = (int *)calloc(1, sizeof(int)); - fs2_info->column_ids = (int *)calloc(3, sizeof(int)); - - const char *poolthread_col[3] = {"PoolSize", "Busy", "TopBusy"}; - for (i = 0; i < sizeof(poolthread_col) / sizeof(const char *); i++) + if (hos_conf->pool_thread_size > 0) { - fs2_info->column_ids[i] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, poolthread_col[i]); - } - fs2_info->line_ids[0] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "ThreadNum"); + //PoolThread state + /******************************************************* + * PoolSize Busy TopBusy AveBusy + * ThreadNum 1000 500 800 650 + ********************************************************/ + fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE]; + hos_func->fs2_info[FS2_POOL_THREAD_STATE].fs2_handle = hos_init_fs2((char *)"hos-poolthread", strlen("hos-poolthread")); + fs2_handle = hos_func->fs2_info[FS2_POOL_THREAD_STATE].fs2_handle; + fs2_info->line_ids = (int *)calloc(1, sizeof(int)); + fs2_info->column_ids = (int *)calloc(3, sizeof(int)); - FS_start(fs2_handle); + const char *poolthread_col[3] = {"PoolSize", "Busy", "TopBusy"}; + for (i = 0; i < sizeof(poolthread_col) / sizeof(const char *); i++) + { + fs2_info->column_ids[i] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, poolthread_col[i]); + } + fs2_info->line_ids[0] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "ThreadNum"); + + FS_start(fs2_handle); + } pthread_create(&hos_func->fs2_thread, NULL, fs2_statistics, NULL); @@ -633,6 +626,11 @@ static bool hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t } } +void hos_init_log() +{ + MESA_handle_runtime_log_creation("./log"); +} + hos_instance hos_get_instance() { if (g_hos_handle.S3Client != NULL) @@ -675,7 +673,6 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t MESA_load_profile_uint_def(conf_path, module, "hos_fs2_format", &hos_conf->fs2_fmt, 0); if (hos_conf->ip && hos_conf->port && strlen(hos_conf->accesskeyid) && strlen(hos_conf->secretkey)) { - MESA_handle_runtime_log_creation("./log"); g_hos_handle.log = MESA_create_runtime_log_handle(hos_conf->log_path, hos_conf->log_level); if (log == NULL) { @@ -703,7 +700,7 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "Instance init completed"); if (hos_conf->fs2_ip && hos_conf->fs2_port) { - hos_expand_fs2(hos_conf->fs2_path, hos_conf->fs2_fmt, hos_conf->fs2_ip, hos_conf->fs2_port); + hos_expand_fs2(); } else { @@ -1211,7 +1208,11 @@ int hos_shutdown_instance() for (i = 0; i < FS2_RECORD_EVENTS; i++) { screen_stat_handle_t *fs2_handle = &hos_func->fs2_info[i].fs2_handle; - FS_stop(fs2_handle); + if (*fs2_handle) + { + FS_stop(fs2_handle); + *fs2_handle = NULL; + } if (hos_func->fs2_info[i].reserved) { if (i == 0) @@ -1244,11 +1245,18 @@ int hos_shutdown_instance() #endif } free(hos_func->fs2_info[i].reserved); + hos_func->fs2_info[i].reserved = NULL; } if (hos_func->fs2_info[i].line_ids) + { free(hos_func->fs2_info[i].line_ids); + hos_func->fs2_info[i].line_ids=NULL; + } if (hos_func->fs2_info[i].column_ids) + { free(hos_func->fs2_info[i].column_ids); + hos_func->fs2_info[i].column_ids=NULL; + } } } @@ -1259,6 +1267,7 @@ int hos_shutdown_instance() if (g_fd_info) { free(g_fd_info); + g_fd_info = NULL; } for (i = 0; i < hos_conf->thread_num; i++) @@ -1269,9 +1278,16 @@ int hos_shutdown_instance() if (g_fd_context) { free(g_fd_context); + g_fd_context = NULL; } Aws::ShutdownAPI(g_options); + MESA_destroy_runtime_log_handle(g_hos_handle.log); + g_hos_handle.log = NULL; + memset(&g_hos_handle, 0 , sizeof(g_hos_handle)); + if (g_hos_instance.hos_url_prefix) + free((void *)g_hos_instance.hos_url_prefix); + memset(&g_hos_instance, 0, sizeof(g_hos_handle)); return HOS_CLIENT_OK; }