diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java
index 27d6dc4..d0f486f 100644
--- a/src/main/java/cn/mesalab/dao/DruidData.java
+++ b/src/main/java/cn/mesalab/dao/DruidData.java
@@ -4,6 +4,7 @@ import cn.mesalab.config.ApplicationConfig;
 import cn.mesalab.utils.DruidUtils;
 import io.vavr.Tuple;
 import io.vavr.Tuple2;
+import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -25,18 +26,46 @@ public class DruidData {
 
     private static final Logger LOG = LoggerFactory.getLogger(DruidData.class);
 
-    public static Map<String, List<Map<String, Object>>> readFromDruid(String sql, AvaticaStatement statement){
-        Map<String, List<Map<String, Object>>> rsList = null;
-        try{
-            ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
-            LOG.info("Finished reading data, starting processing...");
-            rsList = selectAll(resultSet);
-        } catch (Exception e){
-            e.printStackTrace();
+    public static ArrayList<Map<String, Object>> loadFromIterator(ResultSet rs) {
+        ArrayList<Map<String, Object>> result = new ArrayList<>();
+
+        try {
+            ResultSetMetaData rmd = rs.getMetaData();
+            int columnCount = rmd.getColumnCount();
+
+            while (rs.next()) {
+                Map<String, Object> rowData = new HashMap<>();
+                for (int i = 1; i <= columnCount; ++i) {
+                    rowData.put(rmd.getColumnName(i), rs.getObject(i));
+                }
+                result.add(rowData);
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
         }
-        return rsList;
+
+        return result;
     }
 
+    public static Map<String, List<Map<String, Object>>> selectAll(List<Map<String, Object>> result) {
+        Map<String, List<Map<String, Object>>> allIpDataList = new HashMap<>();
+        ArrayList<String> ipList = new ArrayList<>();
+
+        for (Map<String, Object> rowData : result) {
+            String ip = (String) rowData.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
+            if (!ipList.contains(ip)) {
+                ipList.add(ip);
+                List<Map<String, Object>> ipData = new ArrayList<>();
+                allIpDataList.put(ip, ipData);
+            }
+            rowData.remove(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
+            allIpDataList.get(ip).add(rowData);
+        }
+
+        return allIpDataList;
+    }
+
+
     /**
      * Transform the data returned from a Druid read into a Map<String, List<Map<String, Object>>>:
      * the outer map is keyed by IP, and each inner map is one log record of that IP.
@@ -72,6 +101,9 @@ public class DruidData {
         return allIpDataList;
     }
 
+
+
+
     /**
      * Compute the query time range; an explicit range can be specified (for testing) or the default configuration used.
      * @return the start and end points of the time range
diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
index 9829b05..021364d 100644
--- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java
+++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java
@@ -16,6 +16,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
@@ -66,7 +67,6 @@ public class BaselineSingleThread extends Thread {
     public void run(){
         long start = System.currentTimeMillis();
         // Data read
-        LOG.info("Starting data read");
         Map<String, List<Map<String, Object>>> batchDruidData = new HashMap<>();
         // Druid retry
         try {
@@ -113,10 +113,11 @@ public class BaselineSingleThread extends Thread {
             }
         }
         try {
-            hbaseTable.put(putList);
             LOG.info("MONITOR - IP frequency bin stats: " + frequencyBinCounter);
             LOG.info("MONITOR - generated category stats: " + generateTypeCounter);
             LOG.info("MONITOR - entries with no baseline generated: " + discardBaselineCounter + ", covering IPs: " + discardIpList.size());
+            hbaseTable.put(putList);
+            LOG.info("MONITOR - finished HBase write");
         } catch (IOException e) {
             e.printStackTrace();
         } finally {
@@ -130,13 +131,21 @@ public class BaselineSingleThread extends Thread {
         Map<String, List<Map<String, Object>>> readFromDruid = new HashMap<>();
         try {
             AvaticaConnection connection = DruidUtils.getConn();
-            AvaticaStatement stat = connection.createStatement();
-            stat.setQueryTimeout(ApplicationConfig.DRUID_STATEMENT_QUERY_TIMEOUT);
+            AvaticaStatement statement = connection.createStatement();
+            statement.setQueryTimeout(ApplicationConfig.DRUID_STATEMENT_QUERY_TIMEOUT);
             String sql = DruidData.getBatchDruidQuerySql(attackTypeList, currentBatch, batchPartitionRange);
             LOG.debug("Read Druid SQL: " + sql);
-            readFromDruid = DruidData.readFromDruid(sql, stat);
+
+            LOG.info("Starting data read");
+            ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
+            ArrayList<Map<String, Object>> maps = DruidData.loadFromIterator(resultSet);
             connection.close();
-            stat.close();
+            statement.close();
+
+            LOG.info("Starting data processing");
+            readFromDruid = DruidData.selectAll(maps);
+            LOG.info("Finished data processing");
+
         } catch (SQLException e){
             e.printStackTrace();
         }
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 06d03cc..fd60e8b 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -17,9 +17,9 @@ hbase.zookeeper.client.port=2181
 # Druid read time-range mode:
 # 0: read the default number of days, read.historical.days;
 # 1: use the explicit time range below
-read.druid.time.limit.type=0
-read.druid.min.time=1630771200000
-read.druid.max.time=1631030400000
+read.druid.time.limit.type=1
+read.druid.min.time=1630080000000
+read.druid.max.time=1630425600000
 
 # Druid field mapping
 druid.attacktype.tcpsynflood=TCP SYN Flood
@@ -68,7 +68,7 @@ monitor.frequency.bin.num=100
 ##########################################
 ############## Concurrency settings ##############
 ##########################################
-thread.pool.num=100
+thread.pool.num=10
 # the Druid partition field partition_num has a maximum of 9999
 druid.statement.query.timeout=36000
 druid.partition.num.max=10000
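
This change replaces DruidData.readFromDruid with two composable steps: loadFromIterator drains the
JDBC ResultSet into an ArrayList of column-name -> value maps, and selectAll then groups those rows by
the server-IP column (removing that column from each row map), so the read and processing phases can be
logged and timed separately. Below is a minimal caller sketch, assuming only the signatures visible in
this diff; the class name, the placeholder SQL, and the use of try-with-resources are illustrative and
not part of the change:

    import java.sql.ResultSet;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.calcite.avatica.AvaticaConnection;
    import org.apache.calcite.avatica.AvaticaStatement;

    import cn.mesalab.config.ApplicationConfig;
    import cn.mesalab.dao.DruidData;
    import cn.mesalab.utils.DruidUtils;

    public class DruidReadSketch {
        public static void main(String[] args) throws Exception {
            // try-with-resources closes the statement and then the connection even if
            // the query throws, unlike the explicit close() calls in readDruidDataBatch.
            try (AvaticaConnection connection = DruidUtils.getConn();
                 AvaticaStatement statement = connection.createStatement()) {
                statement.setQueryTimeout(ApplicationConfig.DRUID_STATEMENT_QUERY_TIMEOUT);

                // Step 1: run the query and materialize every row into a
                // column-name -> value map.
                ResultSet resultSet = DruidUtils.executeQuery(statement, "SELECT ... /* placeholder */");
                ArrayList<Map<String, Object>> rows = DruidData.loadFromIterator(resultSet);

                // Step 2: group the rows by server IP; selectAll also strips the
                // DRUID_SERVERIP_COLUMN_NAME entry from each row map.
                Map<String, List<Map<String, Object>>> byIp = DruidData.selectAll(rows);
                byIp.forEach((ip, logs) -> System.out.println(ip + " -> " + logs.size() + " rows"));
            }
        }
    }

One design note: selectAll tracks seen IPs with ipList.contains(ip), a linear scan per row;
allIpDataList.containsKey(ip) would perform the same check in constant time.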