优化shutdown函数

This commit is contained in:
彭宣正
2021-04-26 18:17:37 +08:00
parent c5727b4c84
commit 87537258b3
2 changed files with 119 additions and 103 deletions

View File

@@ -27,23 +27,23 @@ static size_t calc_time(struct timespec start, struct timespec end)
(start.tv_sec * 1000 * 1000 * 1000 + start.tv_nsec)); (start.tv_sec * 1000 * 1000 * 1000 + start.tv_nsec));
} }
int file_to_buffer(const char *file, char *buffer, size_t *len) int file_to_buffer(const char *file, char *buffer)
{ {
FILE *fp = fopen(file, "r"); FILE *fp = fopen(file, "r");
int num = 0; int num = 0;
*len = 0; int len = 0;
if (fp == NULL) if (fp == NULL)
{ {
printf("fopen file failed:%s\n", file); printf("fopen file failed:%s\n", file);
return -1; return -1;
} }
do{ do{
num = fread(&buffer[*len], 1, 4096, fp); num = fread(&buffer[len], 1, 4096, fp);
if (num < 0) if (num < 0)
{ {
return -1; return -1;
} }
*len += num; len += num;
}while(num == 4096); }while(num == 4096);
fclose(fp); fclose(fp);
return 0; return 0;
@@ -73,7 +73,7 @@ int main(int argc, char *argv[])
char *buf = NULL; char *buf = NULL;
size_t buf_size; size_t buf_size;
int mode = FILE_MODE; int mode = FILE_MODE;
size_t fd[10001] = {0}; size_t fd = 0;
userdata_t data = {&finished}; userdata_t data = {&finished};
hos_instance hos_instance = NULL; hos_instance hos_instance = NULL;
char object[1024]; char object[1024];
@@ -86,7 +86,7 @@ int main(int argc, char *argv[])
buf = (char *)calloc(1, buffer.st_size); buf = (char *)calloc(1, buffer.st_size);
if (file_to_buffer(file_name, buf, &buf_size) == -1) if (file_to_buffer(file_name, buf) == -1)
{ {
free(buf); free(buf);
return -1; return -1;
@@ -109,38 +109,38 @@ int main(int argc, char *argv[])
mode = FILE_MODE; mode = FILE_MODE;
printf("hos_write file start ...\n"); printf("hos_write file start ...\n");
snprintf(object, 1023, "%s_write_file", file_name); snprintf(object, 1023, "%s_write_file", file_name);
fd[0] = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode); fd = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode);
if (hos_write(fd[i], file_name, 0, 0) != HOS_CLIENT_OK) if (hos_write(fd, file_name, 0, 0) != HOS_CLIENT_OK)
{ {
printf("error: hos_write fialed!\n"); printf("error: hos_write fialed!\n");
} }
hos_close_fd(fd[1], 0); hos_close_fd(fd, 0);
printf("hos_write file end ...\n"); printf("hos_write file end ...\n");
mode = BUFF_MODE; mode = BUFF_MODE;
printf("hos_write buff start ...\n"); printf("hos_write buff start ...\n");
snprintf(object, 1023, "%s_write_buff", file_name); snprintf(object, 1023, "%s_write_buff", file_name);
fd[1] = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode); fd = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode);
if (hos_write(fd[i], buf, buffer.st_size, 0) != HOS_CLIENT_OK) if (hos_write(fd, buf, buffer.st_size, 0) != HOS_CLIENT_OK)
{ {
printf("error: hos_write failed!\n"); printf("error: hos_write failed!\n");
} }
hos_close_fd(fd[1], 0); hos_close_fd(fd, 0);
printf("hos_write buff end ...\n"); printf("hos_write buff end ...\n");
mode = BUFF_MODE | APPEND_MODE; mode = BUFF_MODE | APPEND_MODE;
printf("hos_write buff start ...\n"); printf("hos_write buff start ...\n");
snprintf(object, 1023, "%s_write_APPEND", file_name); snprintf(object, 1023, "%s_write_APPEND", file_name);
fd[2] = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode); fd = hos_open_fd("hos_test_bucket", object, callback, NULL, 0, mode);
if (hos_write(fd[i], buf, buffer.st_size, 0) != HOS_CLIENT_OK) if (hos_write(fd, buf, buffer.st_size, 0) != HOS_CLIENT_OK)
{ {
printf("error: hos_write failed 1st!\n"); printf("error: hos_write failed 1st!\n");
} }
if (hos_write(fd[i], buf, buffer.st_size, 0) != HOS_CLIENT_OK) if (hos_write(fd, buf, buffer.st_size, 0) != HOS_CLIENT_OK)
{ {
printf("error: hos_write failed 2nd!\n"); printf("error: hos_write failed 2nd!\n");
} }
hos_close_fd(fd[2], 0); hos_close_fd(fd, 0);
printf("hos_write buff end ...\n"); printf("hos_write buff end ...\n");
printf("hos_shutdown_instance start ...\n"); printf("hos_shutdown_instance start ...\n");

View File

@@ -415,86 +415,85 @@ static void *fs2_statistics(void *ptr)
FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[6], FS_OP_SET, cache_sum); FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[1], fs2_info->column_ids[6], FS_OP_SET, cache_sum);
//PoolThread State //PoolThread State
*busy = g_hos_handle.executor->GetTaskSize(); if (hos_conf->pool_thread_size > 0)
*top_busy = (*busy) > (*top_busy) ? (*busy) : (*top_busy);
pool_history_sum += *busy;
fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE];
for (i = 0; i < 3; i++)
{ {
FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[i], FS_OP_SET, PoolThread_state[i]); *busy = g_hos_handle.executor->GetTaskSize();
*top_busy = (*busy) > (*top_busy) ? (*busy) : (*top_busy);
pool_history_sum += *busy;
fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE];
for (i = 0; i < 3; i++)
{
FS_operate(fs2_info->fs2_handle, fs2_info->line_ids[0], fs2_info->column_ids[i], FS_OP_SET, PoolThread_state[i]);
}
} }
sleep(1); sleep(1);
} }
pthread_exit(NULL); pthread_exit(NULL);
} }
static void hos_expand_fs2(const char * path, int format, char *server_ip, int port) static screen_stat_handle_t hos_init_fs2(char *app_name, int app_name_size)
{
int value = 0;
screen_stat_handle_t fs2_handle = FS_create_handle();
hos_config_t *hos_conf = &g_hos_handle.hos_config;
FS_set_para(fs2_handle, APP_NAME, app_name, app_name_size + 1);
value = 1; //true
FS_set_para(fs2_handle, FLUSH_BY_DATE, &value, sizeof(value));
if (hos_conf->fs2_path != NULL)
{
FS_set_para(fs2_handle, OUTPUT_DEVICE, hos_conf->fs2_path, strlen(hos_conf->fs2_path) + 1);
}
value = 2;
FS_set_para(fs2_handle, PRINT_MODE, &value, sizeof(value));
value = 1;
FS_set_para(fs2_handle, CREATE_THREAD, &value, sizeof(value));
FS_set_para(fs2_handle, METRIS_FORMAT, &hos_conf->fs2_fmt, sizeof(hos_conf->fs2_fmt));
FS_set_para(fs2_handle, STAT_CYCLE, &value, sizeof(value));
value = 4096;
FS_set_para(fs2_handle, MAX_STAT_FIELD_NUM, &value, sizeof(value));
if (hos_conf->fs2_ip == NULL)
{
FS_set_para(fs2_handle, STATS_SERVER_IP, "127.0.0.1", strlen("127.0.0.1"));
}
else
{
FS_set_para(fs2_handle, STATS_SERVER_IP, hos_conf->fs2_ip, strlen(hos_conf->fs2_ip));
}
FS_set_para(fs2_handle, STATS_SERVER_PORT, &hos_conf->fs2_port, sizeof(hos_conf->fs2_port));
value = FS_OUTPUT_STATSD;
FS_set_para(fs2_handle, STATS_FORMAT, &value, sizeof(value));
return fs2_handle;
}
static void hos_expand_fs2()
{ {
fs2_info_t *fs2_info = NULL; fs2_info_t *fs2_info = NULL;
screen_stat_handle_t fs2_handle = NULL; screen_stat_handle_t fs2_handle = NULL;
const char *app_name[] = {"hos-data", "hos-poolthread"};
int value = 0;
hos_config_t *hos_conf = &g_hos_handle.hos_config; hos_config_t *hos_conf = &g_hos_handle.hos_config;
hos_func_thread_t *hos_func = &g_hos_handle.hos_func; hos_func_thread_t *hos_func = &g_hos_handle.hos_func;
size_t i = 0; size_t i = 0;
if (hos_func->fs2_info[0].fs2_handle) if (hos_func->fs2_info[0].fs2_handle)
return; return;
//fs2 init
for (i = 0; i < FS2_RECORD_EVENTS; i++)
{
hos_func->fs2_info[i].fs2_handle = FS_create_handle();
fs2_handle = hos_func->fs2_info[i].fs2_handle;
FS_set_para(fs2_handle, APP_NAME, app_name[i], strlen(app_name[i]) + 1);
value = 1;//true
FS_set_para(fs2_handle, FLUSH_BY_DATE, &value, sizeof(value));
if (path != NULL)
{
if (FS_set_para(fs2_handle, OUTPUT_DEVICE, path, strlen(path) + 1) != 0)
{
MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_FATAL, __FUNCTION__, "error: fs2 OUTOUT_DEVICE:%s", path);
return;
}
}
value = 2;
FS_set_para(fs2_handle, PRINT_MODE, &value, sizeof(value));
value = 1;
FS_set_para(fs2_handle, CREATE_THREAD, &value, sizeof(value));
FS_set_para(fs2_handle, METRIS_FORMAT, &format, sizeof(format));
FS_set_para(fs2_handle, STAT_CYCLE, &value, sizeof(value));
value = 4096;
FS_set_para(fs2_handle, MAX_STAT_FIELD_NUM, &value, sizeof(value));
if (server_ip == NULL)
{
FS_set_para(fs2_handle, STATS_SERVER_IP, "127.0.0.1", strlen("127.0.0.1"));
}
else
{
FS_set_para(fs2_handle, STATS_SERVER_IP, server_ip, strlen(server_ip));
}
FS_set_para(fs2_handle, STATS_SERVER_PORT, &port, sizeof(port));
value = FS_OUTPUT_STATSD;
FS_set_para(fs2_handle, STATS_FORMAT, &value, sizeof(value));
}
//pkts and bytes info
fs2_info = &hos_func->fs2_info[FS2_DATA_FLOW_STATE];
fs2_handle = hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle;
fs2_info->line_ids = (int *)calloc(2, sizeof(int));
fs2_info->column_ids = (int *)calloc(6, sizeof(int));
//data info //data info
/********************************************************************************************************** /**********************************************************************************************************
* rx_pkts rx_bytes tx_pkts tx_bytes tx_failed_p tx_failed_b cache_bytes * rx_pkts rx_bytes tx_pkts tx_bytes tx_failed_p tx_failed_b cache_bytes
* current 10 100 1 100 0 0 100 * current 10 100 1 100 0 0 100
* total 100 1000 10 1000 0 0 100(无实意) * total 100 1000 10 1000 0 0 100(无实意)
***********************************************************************************************************/ ***********************************************************************************************************/
const char *data_col[] = {"rx_pkts", "rx_bytes", "tx_pkts", "tx_bytes", "tx_failed_b", "cache_bytes"}; fs2_info = &hos_func->fs2_info[FS2_DATA_FLOW_STATE];
hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle = hos_init_fs2((char *)"hos-data", strlen("hos-data"));
fs2_handle = hos_func->fs2_info[FS2_DATA_FLOW_STATE].fs2_handle;
fs2_info->line_ids = (int *)calloc(2, sizeof(int));
fs2_info->column_ids = (int *)calloc(7, sizeof(int));
const char *data_col[] = {"rx_pkts", "rx_bytes", "tx_pkts", "tx_bytes", "tx_failed_p", "tx_failed_b", "cache_bytes"};
for (i = 0; i < sizeof(data_col) / sizeof(const char *); i++) for (i = 0; i < sizeof(data_col) / sizeof(const char *); i++)
{ {
fs2_info->column_ids[i] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, data_col[i]); fs2_info->column_ids[i] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, data_col[i]);
@@ -503,10 +502,8 @@ static void hos_expand_fs2(const char * path, int format, char *server_ip, int p
fs2_info->line_ids[1] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "total"); fs2_info->line_ids[1] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "total");
hos_func->fs2_status = HOS_FS2_START; hos_func->fs2_status = HOS_FS2_START;
data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t)); data_info_t *data_info = (data_info_t *)calloc(1, sizeof(data_info_t));
fs2_info->reserved = (void *)data_info; fs2_info->reserved = (void *)data_info;
#if 1
data_info->tx_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); data_info->tx_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
data_info->tx_bytes = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); data_info->tx_bytes = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
data_info->rx_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); data_info->rx_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
@@ -514,34 +511,30 @@ static void hos_expand_fs2(const char * path, int format, char *server_ip, int p
data_info->tx_failed_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); data_info->tx_failed_pkts = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
data_info->tx_failed_bytes = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); data_info->tx_failed_bytes = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
data_info->cache = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t)); data_info->cache = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
#else
data_info->tx_pkts_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
data_info->tx_bytes_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
data_info->rx_pkts_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
data_info->rx_bytes_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
data_info->tx_failed_bytes_last = (size_t *)calloc(hos_conf->thread_num, sizeof(size_t));
#endif
//FS_start(hos_func->fs2_info[0].fs2_handle);
FS_start(fs2_handle); FS_start(fs2_handle);
//PoolThread state if (hos_conf->pool_thread_size > 0)
/*******************************************************
* PoolSize Busy TopBusy AveBusy
* ThreadNum 1000 500 800 650
********************************************************/
fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE];
fs2_handle = hos_func->fs2_info[FS2_POOL_THREAD_STATE].fs2_handle;
fs2_info->line_ids = (int *)calloc(1, sizeof(int));
fs2_info->column_ids = (int *)calloc(3, sizeof(int));
const char *poolthread_col[3] = {"PoolSize", "Busy", "TopBusy"};
for (i = 0; i < sizeof(poolthread_col) / sizeof(const char *); i++)
{ {
fs2_info->column_ids[i] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, poolthread_col[i]); //PoolThread state
} /*******************************************************
fs2_info->line_ids[0] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "ThreadNum"); * PoolSize Busy TopBusy AveBusy
* ThreadNum 1000 500 800 650
********************************************************/
fs2_info = &hos_func->fs2_info[FS2_POOL_THREAD_STATE];
hos_func->fs2_info[FS2_POOL_THREAD_STATE].fs2_handle = hos_init_fs2((char *)"hos-poolthread", strlen("hos-poolthread"));
fs2_handle = hos_func->fs2_info[FS2_POOL_THREAD_STATE].fs2_handle;
fs2_info->line_ids = (int *)calloc(1, sizeof(int));
fs2_info->column_ids = (int *)calloc(3, sizeof(int));
FS_start(fs2_handle); const char *poolthread_col[3] = {"PoolSize", "Busy", "TopBusy"};
for (i = 0; i < sizeof(poolthread_col) / sizeof(const char *); i++)
{
fs2_info->column_ids[i] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, poolthread_col[i]);
}
fs2_info->line_ids[0] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, "ThreadNum");
FS_start(fs2_handle);
}
pthread_create(&hos_func->fs2_thread, NULL, fs2_statistics, NULL); pthread_create(&hos_func->fs2_thread, NULL, fs2_statistics, NULL);
@@ -633,6 +626,11 @@ static bool hos_putobject_sync(Aws::S3::Model::PutObjectRequest& request, size_t
} }
} }
void hos_init_log()
{
MESA_handle_runtime_log_creation("./log");
}
hos_instance hos_get_instance() hos_instance hos_get_instance()
{ {
if (g_hos_handle.S3Client != NULL) if (g_hos_handle.S3Client != NULL)
@@ -675,7 +673,6 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t
MESA_load_profile_uint_def(conf_path, module, "hos_fs2_format", &hos_conf->fs2_fmt, 0); MESA_load_profile_uint_def(conf_path, module, "hos_fs2_format", &hos_conf->fs2_fmt, 0);
if (hos_conf->ip && hos_conf->port && strlen(hos_conf->accesskeyid) && strlen(hos_conf->secretkey)) if (hos_conf->ip && hos_conf->port && strlen(hos_conf->accesskeyid) && strlen(hos_conf->secretkey))
{ {
MESA_handle_runtime_log_creation("./log");
g_hos_handle.log = MESA_create_runtime_log_handle(hos_conf->log_path, hos_conf->log_level); g_hos_handle.log = MESA_create_runtime_log_handle(hos_conf->log_path, hos_conf->log_level);
if (log == NULL) if (log == NULL)
{ {
@@ -703,7 +700,7 @@ hos_instance hos_init_instance(const char *conf_path, const char *module, size_t
MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "Instance init completed"); MESA_handle_runtime_log(g_hos_handle.log, RLOG_LV_DEBUG, __FUNCTION__, "Instance init completed");
if (hos_conf->fs2_ip && hos_conf->fs2_port) if (hos_conf->fs2_ip && hos_conf->fs2_port)
{ {
hos_expand_fs2(hos_conf->fs2_path, hos_conf->fs2_fmt, hos_conf->fs2_ip, hos_conf->fs2_port); hos_expand_fs2();
} }
else else
{ {
@@ -1211,7 +1208,11 @@ int hos_shutdown_instance()
for (i = 0; i < FS2_RECORD_EVENTS; i++) for (i = 0; i < FS2_RECORD_EVENTS; i++)
{ {
screen_stat_handle_t *fs2_handle = &hos_func->fs2_info[i].fs2_handle; screen_stat_handle_t *fs2_handle = &hos_func->fs2_info[i].fs2_handle;
FS_stop(fs2_handle); if (*fs2_handle)
{
FS_stop(fs2_handle);
*fs2_handle = NULL;
}
if (hos_func->fs2_info[i].reserved) if (hos_func->fs2_info[i].reserved)
{ {
if (i == 0) if (i == 0)
@@ -1244,11 +1245,18 @@ int hos_shutdown_instance()
#endif #endif
} }
free(hos_func->fs2_info[i].reserved); free(hos_func->fs2_info[i].reserved);
hos_func->fs2_info[i].reserved = NULL;
} }
if (hos_func->fs2_info[i].line_ids) if (hos_func->fs2_info[i].line_ids)
{
free(hos_func->fs2_info[i].line_ids); free(hos_func->fs2_info[i].line_ids);
hos_func->fs2_info[i].line_ids=NULL;
}
if (hos_func->fs2_info[i].column_ids) if (hos_func->fs2_info[i].column_ids)
{
free(hos_func->fs2_info[i].column_ids); free(hos_func->fs2_info[i].column_ids);
hos_func->fs2_info[i].column_ids=NULL;
}
} }
} }
@@ -1259,6 +1267,7 @@ int hos_shutdown_instance()
if (g_fd_info) if (g_fd_info)
{ {
free(g_fd_info); free(g_fd_info);
g_fd_info = NULL;
} }
for (i = 0; i < hos_conf->thread_num; i++) for (i = 0; i < hos_conf->thread_num; i++)
@@ -1269,9 +1278,16 @@ int hos_shutdown_instance()
if (g_fd_context) if (g_fd_context)
{ {
free(g_fd_context); free(g_fd_context);
g_fd_context = NULL;
} }
Aws::ShutdownAPI(g_options); Aws::ShutdownAPI(g_options);
MESA_destroy_runtime_log_handle(g_hos_handle.log);
g_hos_handle.log = NULL;
memset(&g_hos_handle, 0 , sizeof(g_hos_handle));
if (g_hos_instance.hos_url_prefix)
free((void *)g_hos_instance.hos_url_prefix);
memset(&g_hos_instance, 0, sizeof(g_hos_handle));
return HOS_CLIENT_OK; return HOS_CLIENT_OK;
} }