diff --git a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java index d540700..ffbfafb 100644 --- a/src/main/java/cn/mesalab/config/ApplicationConfig.java +++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java @@ -65,5 +65,7 @@ public class ApplicationConfig { public static final Integer DRUID_CONNECTION_RETRY_TIME_MAX = ConfigUtils.getIntProperty("druid.connection.retry.time.max"); public static final Integer DRUID_CONNECTION_RETRY_SLEEP_TIME = ConfigUtils.getIntProperty("druid.connection.retry.sleep.time"); + public static final Integer MONITOR_FREQUENCY_BIN_NUM = ConfigUtils.getIntProperty("monitor.frequency.bin.num"); + } diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java index 6e0d396..62bec71 100644 --- a/src/main/java/cn/mesalab/dao/DruidData.java +++ b/src/main/java/cn/mesalab/dao/DruidData.java @@ -109,27 +109,9 @@ public class DruidData { } - public static String getBatchDruidQuerySql(List attackTypeList, Long originBeginTime, int currentPart, long timeGrad){ - long startTime = originBeginTime + currentPart * timeGrad; - long endTime = originBeginTime + (currentPart+1) * timeGrad; - attackTypeList = attackTypeList.stream().map(attack -> "'"+attack+"'").collect(Collectors.toList()); - String attackList = "(" + StringUtils.join(attackTypeList, ",") + ")"; - String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME - + " >= MILLIS_TO_TIMESTAMP(" + startTime - + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME - + " < MILLIS_TO_TIMESTAMP(" + endTime + ")"; - - return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME - + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME - + ", "+ ApplicationConfig.BASELINE_METRIC_TYPE - + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME - + " FROM " + ApplicationConfig.DRUID_TABLE - + " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME - + " IN " + attackList - + " AND " + timeFilter; - } - public static String getBatchDruidQuerySql(List attackTypeList, int currentPart, int partitionNumGrad){ + long startTime = getTimeLimit()._1; + long endTime = getTimeLimit()._2; int startPartitionNum = currentPart * partitionNumGrad; int endPartitionNum = (currentPart + 1) * partitionNumGrad; attackTypeList = attackTypeList.stream().map(attack -> "'"+attack+"'").collect(Collectors.toList()); @@ -138,15 +120,23 @@ public class DruidData { + " >= " + startPartitionNum + " AND " + ApplicationConfig.DRUID_COLUMNNAME_PARTITION_NUM + " < " + endPartitionNum; + String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + + " >= MILLIS_TO_TIMESTAMP(" + startTime + + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + + " < MILLIS_TO_TIMESTAMP(" + endTime + ")"; return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME - + ", "+ ApplicationConfig.BASELINE_METRIC_TYPE + + ", AVG("+ ApplicationConfig.BASELINE_METRIC_TYPE + ") as " + ApplicationConfig.BASELINE_METRIC_TYPE + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + " FROM " + ApplicationConfig.DRUID_TABLE + " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + " IN " + attackList - + " AND " + partitionFilter; + + " AND " + timeFilter + + " AND " + partitionFilter + + " GROUP BY (" + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME + + ", " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + ")"; } /** diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java index da90f74..58a14e3 100644 --- a/src/main/java/cn/mesalab/service/BaselineGeneration.java +++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java @@ -49,7 +49,7 @@ public class BaselineGeneration { e.printStackTrace(); } finally { long last = System.currentTimeMillis(); - LOG.warn("运行时间:" + (last - start)); + LOG.info("运行时间:" + (last - start)); } System.exit(0); } diff --git a/src/main/java/cn/mesalab/service/BaselineSingleThread.java b/src/main/java/cn/mesalab/service/BaselineSingleThread.java index 329bcde..a801a88 100644 --- a/src/main/java/cn/mesalab/service/BaselineSingleThread.java +++ b/src/main/java/cn/mesalab/service/BaselineSingleThread.java @@ -7,7 +7,6 @@ import cn.mesalab.utils.DruidUtils; import cn.mesalab.utils.HbaseUtils; import cn.mesalab.utils.RetryUtils; import cn.mesalab.utils.SeriesUtils; -import io.vavr.Tuple2; import io.vavr.Tuple3; import org.apache.calcite.avatica.AvaticaConnection; import org.apache.calcite.avatica.AvaticaStatement; @@ -38,6 +37,9 @@ public class BaselineSingleThread extends Thread { private final int currentBatch; private final CountDownLatch countDownLatch; + private final ArrayList frequencyBinCounter = new ArrayList<>(Collections.nCopies(ApplicationConfig.MONITOR_FREQUENCY_BIN_NUM, 0)); + private final ArrayList generateTypeCounter = new ArrayList<>(Collections.nCopies(3, 0)); + public BaselineSingleThread( List attackTypeList, int baselinePointNum, @@ -105,12 +107,14 @@ public class BaselineSingleThread extends Thread { } try { hbaseTable.put(putList); + LOG.info("MONITOR-IP频率分段统计:" + frequencyBinCounter); + LOG.info("MONITOR-生成类别统计:" + generateTypeCounter); } catch (IOException e) { e.printStackTrace(); } finally { hbaseUtils.close(); countDownLatch.countDown(); - LOG.info("成功写入Baseline条数共计 " + putList.size() + " 剩余线程数量:" + countDownLatch.getCount()); + LOG.info("本线程更新字段数:" + putList.size() + " 剩余线程数量:" + countDownLatch.getCount()); } } @@ -164,6 +168,7 @@ public class BaselineSingleThread extends Thread { // 判断ip出现频率 float ipFrequency = ipDruidData.size() / (float) completSeries.size(); + updateLogFrequencyCounter(ipFrequency); if(ipFrequency >ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD){ // 异常值剔除 baselineGenerationType = 1; @@ -190,6 +195,7 @@ public class BaselineSingleThread extends Thread { // System.out.println("type-03:" + ipPercentile + " " + ip + " " + Arrays.toString(baselineArr)); } } + updateLogGenerateTypeCounter(baselineGenerationType); return new Tuple3<>(baselineArr, baselineGenerationType, ipPercentile); } @@ -212,4 +218,18 @@ public class BaselineSingleThread extends Thread { } return result; } + + private void updateLogFrequencyCounter(float frequency){ + int binIndex = (int) Math.floor(frequency * ApplicationConfig.MONITOR_FREQUENCY_BIN_NUM); + if(frequency==1){ + binIndex = frequencyBinCounter.size()-1; + } + frequencyBinCounter.set(binIndex, frequencyBinCounter.get(binIndex)+1); + } + + private void updateLogGenerateTypeCounter(int generateType){ + int index = generateType-1; + generateTypeCounter.set(index, generateTypeCounter.get(index)+1); + } + } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index cb915b5..3593ae0 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -8,7 +8,7 @@ druid.table=traffic_top_destination_ip_metrics_log #HBase配置 hbase.table=ddos_traffic_baselines -hbase.zookeeper.quorum=192.168.44.11 +hbase.zookeeper.quorum=192.168.44.12 hbase.zookeeper.client.port=2181 ########################################## @@ -63,6 +63,8 @@ baseline.kalman.r=0.1 baseline.kalman.p=8 baseline.kalman.m=2 +monitor.frequency.bin.num=100 + ########################################## ################ 并发参数 ################# ##########################################