diff --git a/src/hos_client.cpp b/src/hos_client.cpp index 8398009d..8879a8c6 100644 --- a/src/hos_client.cpp +++ b/src/hos_client.cpp @@ -164,18 +164,17 @@ static void *fs2_statistics(void *ptr) { break; } + FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[0], FS_OP_ADD, handle->tx_pkts); + FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[1], FS_OP_ADD, handle->tx_bytes); + FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[2], FS_OP_ADD, handle->rx_pkts); + FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[3], FS_OP_ADD, handle->rx_bytes); - FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[0], FS_OP_SET, handle->tx_pkts); - FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[1], FS_OP_SET, handle->tx_bytes); - FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[2], FS_OP_SET, handle->rx_pkts); - FS_operate(handle->fs2_handle, handle->line_ids[0], handle->column_ids[3], FS_OP_SET, handle->rx_bytes); + FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[0], FS_OP_SET, handle->tx_pkts * 10); + FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[1], FS_OP_SET, handle->tx_bytes * 10); + FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[2], FS_OP_SET, handle->rx_pkts * 10); + FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[3], FS_OP_SET, handle->rx_bytes * 10); - FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[0], FS_OP_SET, handle->tx_pkts); - FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[1], FS_OP_SET, handle->tx_bytes); - FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[2], FS_OP_SET, handle->rx_pkts); - FS_operate(handle->fs2_handle, handle->line_ids[1], handle->column_ids[3], FS_OP_SET, handle->rx_bytes); - - sleep(1); + usleep(100); } pthread_exit(NULL); } @@ -216,18 +215,18 @@ void hos_expand_fs2(hos_client_handle handle, const char * path, int format, cha FS_set_para(fs2_handle, STATS_SERVER_PORT, &port, sizeof(port)); //line info - snprintf(buff, sizeof(buff), "tx_pkts(MB)"); - line_ids[0] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff); - snprintf(buff, sizeof(buff), "tx_bytes(MB)"); - line_ids[1] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff); - snprintf(buff, sizeof(buff), "rx_pkts(MB)"); - line_ids[2] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff); - snprintf(buff, sizeof(buff), "rx_bytes(MB)"); - line_ids[3] = FS_register(fs2_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, buff); - snprintf(buff, sizeof(buff), "total"); + snprintf(buff, sizeof(buff), "tx_pkts"); column_ids[0] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); - snprintf(buff, sizeof(buff), "per-second"); - column_ids[1] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_SPEED, buff); + snprintf(buff, sizeof(buff), "tx_bytes"); + column_ids[1] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); + snprintf(buff, sizeof(buff), "rx_pkts"); + column_ids[2] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); + snprintf(buff, sizeof(buff), "rx_bytes"); + column_ids[3] = FS_register(fs2_handle, FS_STYLE_COLUMN, FS_CALC_CURRENT, buff); + snprintf(buff, sizeof(buff), "total"); + line_ids[0] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); + snprintf(buff, sizeof(buff), "speed/s"); + line_ids[1] = FS_register(fs2_handle, FS_STYLE_LINE, FS_CALC_CURRENT, buff); handle->fs2_handle = fs2_handle; handle->line_ids = line_ids; @@ -407,8 +406,8 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id handle = (hos_client_handle)hos_info->handle; //field_stat2 record - handle->tx_pkts++; - handle->tx_bytes += stream_len; + handle->rx_pkts++; + handle->rx_bytes += stream_len; Aws::S3::S3Client& S3Client = *(handle->S3Client); @@ -465,6 +464,8 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id { const std::shared_ptr input_data = Aws::MakeShared("buffer mode"); + Aws::String buffer (stream, stream_len); + *input_data << buffer; request.SetBody(input_data); } } @@ -485,7 +486,7 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id Aws::MakeShared(""); sprintf(buf, "%lu %lu", thread_id, fd); context->SetUUID(buf); - + ret = S3Client.PutObjectAsync(request, PutObjectAsyncFinished, context); //恢复fd 的cache设置 if (hos_info->mode & APPEND_MODE) @@ -494,17 +495,32 @@ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id hos_info->cache_rest = hos_info->handle->cache_size; hos_info->cache_times = hos_info->handle->cache_times; } - if (ret == HOS_CLIENT_OK) + if (ret) { - handle->rx_bytes += handle->cache_size; - handle->rx_pkts++; + handle->tx_pkts++; + if (hos_info->mode & BUFF_MODE) + { + if (hos_info->mode & APPEND_MODE) + { + handle->tx_bytes += handle->cache_size; + }else + { + handle->tx_bytes += stream_len; + } + }else + { + handle->tx_bytes += buffer.st_size; + } while (flag == 1) { return hos_write(fd, &stream[hos_info->cache_rest], rest, thread_id); } + }else + { + return HOS_SEND_FAILED; } - return ret; + return HOS_CLIENT_OK; } int hos_close_fd(size_t fd, size_t thread_id) @@ -537,6 +553,8 @@ int hos_client_destory(hos_client_handle handle) { delete_all(&hash_hos_info[i]); } + handle->fs2_status = HOS_FS2_STOP; + pthread_join(handle->fs2_thread, NULL); if (handle->fs2_handle) { FS_stop(&handle->fs2_handle); diff --git a/src/hos_client.h b/src/hos_client.h index 518d8af8..bf3cde94 100644 --- a/src/hos_client.h +++ b/src/hos_client.h @@ -115,7 +115,7 @@ void set_cache_times(hos_client_handle handle, size_t cache_times); * 参数: hos_client_handle handle 非空句柄 * size_t thread_sum append 模式追加次数 *************************************************************************************/ -void set_cache_times(hos_client_handle handle, size_t thread_sum); +void set_thread_sum(hos_client_handle handle, size_t thread_sum); /************************************************************************************* * 函数名: hos_upload_async * 参数: hos_client_handle handle 非空句柄 @@ -162,6 +162,15 @@ int hos_open_fd(hos_client_handle handle, const char *bucket, const char *object * 返回值 int 成功返回0,失败返回hoserros错误码 *************************************************************************************/ int hos_write(size_t fd, const char *stream, size_t stream_len, size_t thread_id); +/************************************************************************************* + * 函数名: hos_expand_fs2 + * 参数: hos_client_handle handle 非空句柄 + * const char * path log 路径 + * int format 0:default; 1: Json + * char *server_ip 服务IP地址 + * int port 服务端口 +*************************************************************************************/ +void hos_expand_fs2(hos_client_handle handle, const char * path, int format, char *server_ip, int port); /************************************************************************************* * 函数名: hos_close_fd * 参数: size_t fd fd