支持field_stat2
This commit is contained in:
@@ -6,6 +6,8 @@
|
||||
extern "C"
|
||||
{
|
||||
#include<string.h>
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
}
|
||||
#include <aws/core/Aws.h>
|
||||
#include <aws/s3/S3Client.h>
|
||||
@@ -16,9 +18,9 @@ extern "C"
|
||||
#include <fstream>
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <sys/stat.h>
|
||||
#include "hos_client.h"
|
||||
#include "hos_hash.h"
|
||||
#include "field_stat2.h"
|
||||
|
||||
#define MAX_HOS_CLIENT_THREAD_NUM 255
|
||||
#define MAX_HOS_CLIENT_FD_NUM 65535
|
||||
@@ -32,6 +34,18 @@ typedef struct hos_client_handle_s
|
||||
size_t cache_size;
|
||||
size_t cache_times;
|
||||
size_t thread_sum;
|
||||
/* expand */
|
||||
screen_stat_handle_t fs2_handle;
|
||||
pthread_t fs2_thread;
|
||||
int fs2_status;
|
||||
#define HOS_FS2_START 1
|
||||
#define HOS_FS2_STOP 2
|
||||
int *line_ids;
|
||||
int *column_ids;
|
||||
size_t tx_pkts;
|
||||
size_t tx_bytes;
|
||||
size_t rx_pkts;
|
||||
size_t rx_bytes;
|
||||
}hos_client_handle_t;
|
||||
|
||||
hos_info_t *hash_hos_info[MAX_HOS_CLIENT_THREAD_NUM];
|
||||
@@ -140,6 +154,93 @@ hos_client_handle hos_client_create(const char *endpoint, const char *accesskeyi
|
||||
return handle;
|
||||
}
|
||||
|
||||
static void *fs2_statistics(void *ptr)
|
||||
{
|
||||
hos_client_handle handle = (hos_client_handle)ptr;
|
||||
|
||||
while(1)
|
||||
{
|
||||
if (handle->fs2_status == HOS_FS2_STOP)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[0], FS_OP_SET, handle->tx_pkts);
|
||||
FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[1], FS_OP_SET, handle->tx_bytes);
|
||||
FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[2], FS_OP_SET, handle->rx_pkts);
|
||||
FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[3], FS_OP_SET, handle->rx_bytes);
|
||||
|
||||
FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[0], FS_OP_SET, handle->tx_pkts);
|
||||
FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[1], FS_OP_SET, handle->tx_bytes);
|
||||
FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[2], FS_OP_SET, handle->rx_pkts);
|
||||
FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[3], FS_OP_SET, handle->rx_bytes);
|
||||
|
||||
sleep(1);
|
||||
}
|
||||
pthread_exit(NULL);
|
||||
}
|
||||
|
||||
void hos_expand_fs2(hos_client_handle handle, const char * path, int format, char *server_ip, int port)
|
||||
{
|
||||
screen_stat_handle_t fs2_handle = NULL;
|
||||
const char *app_name = "hos-sdk-client-cpp";
|
||||
int *line_ids = (int *)malloc(sizeof(int) * 2);
|
||||
int *column_ids = (int *)malloc(sizeof(int) * 4);
|
||||
int value = 0;
|
||||
char buff[128];
|
||||
|
||||
fs2_handle = FS_create_handle();
|
||||
|
||||
FS_set_para(fs2_handle, APP_NAME, app_name, strlen(app_name) + 1);
|
||||
value = 1;//true
|
||||
FS_set_para(fs2_handle, FLUSH_BY_DATE, &value, sizeof(value));
|
||||
if (path != NULL)
|
||||
{
|
||||
FS_set_para(fs2_handle, OUTPUT_DEVICE, path, strlen(path) + 1);
|
||||
}
|
||||
value = 2;//append
|
||||
FS_set_para(fs2_handle, PRINT_MODE, &value, sizeof(value));
|
||||
value = 1;
|
||||
FS_set_para(fs2_handle, CREATE_THREAD, &value, sizeof(value));
|
||||
FS_set_para(fs2_handle, METRIS_FORMAT, &format, sizeof(format));
|
||||
value = 4096;
|
||||
FS_set_para(fs2_handle, MAX_STAT_FIELD_NUM, &value, sizeof(value));
|
||||
if (server_ip == NULL)
|
||||
{
|
||||
FS_set_para(fs2_handle, STATS_SERVER_IP, "127.0.0.1", strlen("127.0.0.1"));
|
||||
}else
|
||||
{
|
||||
FS_set_para(fs2_handle, STATS_SERVER_IP, server_ip, strlen(server_ip));
|
||||
}
|
||||
|
||||
FS_set_para(fs2_handle, STATS_SERVER_PORT, &port, sizeof(port));
|
||||
|
||||
//line info
|
||||
snprintf(buff, sizeof(buff), "tx_pkts(MB)");
|
||||
line_ids[0] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff);
|
||||
snprintf(buff, sizeof(buff), "tx_bytes(MB)");
|
||||
line_ids[1] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff);
|
||||
snprintf(buff, sizeof(buff), "rx_pkts(MB)");
|
||||
line_ids[2] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff);
|
||||
snprintf(buff, sizeof(buff), "rx_bytes(MB)");
|
||||
line_ids[3] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff);
|
||||
snprintf(buff, sizeof(buff), "total");
|
||||
column_ids[0] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff);
|
||||
snprintf(buff, sizeof(buff), "per-second");
|
||||
column_ids[1] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_SPEED, buff);
|
||||
|
||||
handle->fs2_handle = fs2_handle;
|
||||
handle->line_ids = line_ids;
|
||||
handle->column_ids = column_ids;
|
||||
handle->fs2_status = HOS_FS2_START;
|
||||
|
||||
FS_start(fs2_handle);
|
||||
|
||||
pthread_create(&handle->fs2_thread, NULL, fs2_statistics, handle);
|
||||
|
||||
return ;
|
||||
}
|
||||
|
||||
bool hos_verify_bucket(hos_client_handle handle, const char *bucket)
|
||||
{
|
||||
Aws::S3::Model::ListBucketsOutcome outcome = handle->S3Client->ListBuckets();
|
||||
@@ -288,6 +389,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
|
||||
char buf[128];
|
||||
int flag = 0; // 0, 一次处理就可以完成;1,需要多次处理才能处理完
|
||||
int rest; // stream 剩余未处理的数据长度
|
||||
int ret = 0;
|
||||
if ((fd == 0) || (stream == NULL) || (thread_id > MAX_HOS_CLIENT_THREAD_NUM))
|
||||
{
|
||||
return HOS_PARAMETER_ERROR;
|
||||
@@ -303,6 +405,11 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
|
||||
}
|
||||
|
||||
handle = (hos_client_handle)hos_info->handle;
|
||||
|
||||
//field_stat2 record
|
||||
handle->tx_pkts++;
|
||||
handle->tx_bytes += stream_len;
|
||||
|
||||
Aws::S3::S3Client& S3Client = *(handle->S3Client);
|
||||
|
||||
// Create and configure the asynchronous put object request.
|
||||
@@ -379,8 +486,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
|
||||
sprintf(buf, "%lu %lu", thread_id, fd);
|
||||
context->SetUUID(buf);
|
||||
|
||||
S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context);
|
||||
|
||||
ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context);
|
||||
//恢复fd 的cache设置
|
||||
if (hos_info->mode & APPEND_MODE)
|
||||
{
|
||||
@@ -388,11 +494,17 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id
|
||||
hos_info->cache_rest = hos_info->handle->cache_size;
|
||||
hos_info->cache_times = hos_info->handle->cache_times;
|
||||
}
|
||||
if (ret == HOS_CLIENT_OK)
|
||||
{
|
||||
handle->rx_bytes += handle->cache_size;
|
||||
handle->rx_pkts++;
|
||||
while (flag == 1)
|
||||
{
|
||||
return hos_write(fd, &stream[hos_info->cache_rest], rest, thread_id);
|
||||
}
|
||||
return HOS_CLIENT_OK;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int hos_close_fd(size_t fd, size_t thread_id)
|
||||
@@ -425,7 +537,18 @@ int hos_client_destory(hos_client_handle handle)
|
||||
{
|
||||
delete_all(&hash_hos_info[i]);
|
||||
}
|
||||
//delete(handle->options);
|
||||
if (handle->fs2_handle)
|
||||
{
|
||||
FS_stop(&handle->fs2_handle);
|
||||
}
|
||||
if (handle->line_ids)
|
||||
{
|
||||
free(handle->line_ids);
|
||||
}
|
||||
if (handle->column_ids)
|
||||
{
|
||||
free(handle->column_ids);
|
||||
}
|
||||
free(handle);
|
||||
|
||||
return HOS_CLIENT_OK;
|
||||
|
||||
@@ -23,6 +23,7 @@ enum hoserrors
|
||||
HOS_FILE_NOT_EXITS = -2,
|
||||
HOS_HASH_NOT_FIND = -3,
|
||||
HOS_FD_NOT_ENOUGH = -4,
|
||||
HOS_SEND_FAILED = -5,
|
||||
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user