增加数据分布LOG;修改数据读取sql,解决数据重复问题

This commit is contained in:
yinjiangyi
2021-08-17 19:17:33 +08:00
parent bb3f6299ca
commit 8e79c9da54
5 changed files with 40 additions and 26 deletions

View File

@@ -65,5 +65,7 @@ public class ApplicationConfig {
public static final Integer DRUID_CONNECTION_RETRY_TIME_MAX = ConfigUtils.getIntProperty("druid.connection.retry.time.max"); public static final Integer DRUID_CONNECTION_RETRY_TIME_MAX = ConfigUtils.getIntProperty("druid.connection.retry.time.max");
public static final Integer DRUID_CONNECTION_RETRY_SLEEP_TIME = ConfigUtils.getIntProperty("druid.connection.retry.sleep.time"); public static final Integer DRUID_CONNECTION_RETRY_SLEEP_TIME = ConfigUtils.getIntProperty("druid.connection.retry.sleep.time");
public static final Integer MONITOR_FREQUENCY_BIN_NUM = ConfigUtils.getIntProperty("monitor.frequency.bin.num");
} }

View File

@@ -109,27 +109,9 @@ public class DruidData {
} }
/**
 * Builds the Druid SQL statement that reads the raw metric rows of one time slice
 * for the given attack types.
 *
 * <p>The slice is the half-open interval
 * {@code [originBeginTime + currentPart * timeGrad, originBeginTime + (currentPart + 1) * timeGrad)}.
 * Both bounds are handed to Druid's {@code MILLIS_TO_TIMESTAMP}, so they are
 * presumably epoch milliseconds (confirm against the caller).
 *
 * @param attackTypeList  attack type names; each one is wrapped in single quotes and
 *                        placed in the {@code IN (...)} list without further escaping
 * @param originBeginTime start of the whole read window; must not be {@code null}
 * @param currentPart     zero-based index of the slice to read
 * @param timeGrad        width of one slice, in the same unit as {@code originBeginTime}
 * @return the SELECT statement for this slice
 */
public static String getBatchDruidQuerySql(List<String> attackTypeList, Long originBeginTime, int currentPart, long timeGrad){
    // Slice bounds first, exactly as before (unboxing originBeginTime happens here).
    long sliceStart = originBeginTime + currentPart * timeGrad;
    long sliceEnd = originBeginTime + (currentPart+1) * timeGrad;

    // 'a','b','c' wrapped in parentheses for the IN clause.
    String quotedTypes = attackTypeList.stream()
            .map(type -> "'" + type + "'")
            .collect(Collectors.joining(",", "(", ")"));

    StringBuilder sql = new StringBuilder();

    // Projection.
    sql.append("SELECT ").append(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME)
            .append(", ").append(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME)
            .append(", ").append(ApplicationConfig.BASELINE_METRIC_TYPE)
            .append(", ").append(ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME);

    // Source table and attack type filter.
    sql.append(" FROM ").append(ApplicationConfig.DRUID_TABLE)
            .append(" WHERE ").append(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME)
            .append(" IN ").append(quotedTypes);

    // Time filter: start inclusive, end exclusive.
    sql.append(" AND ").append(ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME)
            .append(" >= MILLIS_TO_TIMESTAMP(").append(sliceStart)
            .append(") AND ").append(ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME)
            .append(" < MILLIS_TO_TIMESTAMP(").append(sliceEnd).append(")");

    return sql.toString();
}
public static String getBatchDruidQuerySql(List<String> attackTypeList, int currentPart, int partitionNumGrad){ public static String getBatchDruidQuerySql(List<String> attackTypeList, int currentPart, int partitionNumGrad){
long startTime = getTimeLimit()._1;
long endTime = getTimeLimit()._2;
int startPartitionNum = currentPart * partitionNumGrad; int startPartitionNum = currentPart * partitionNumGrad;
int endPartitionNum = (currentPart + 1) * partitionNumGrad; int endPartitionNum = (currentPart + 1) * partitionNumGrad;
attackTypeList = attackTypeList.stream().map(attack -> "'"+attack+"'").collect(Collectors.toList()); attackTypeList = attackTypeList.stream().map(attack -> "'"+attack+"'").collect(Collectors.toList());
@@ -138,15 +120,23 @@ public class DruidData {
+ " >= " + startPartitionNum + " >= " + startPartitionNum
+ " AND " + ApplicationConfig.DRUID_COLUMNNAME_PARTITION_NUM + " AND " + ApplicationConfig.DRUID_COLUMNNAME_PARTITION_NUM
+ " < " + endPartitionNum; + " < " + endPartitionNum;
String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " >= MILLIS_TO_TIMESTAMP(" + startTime
+ ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " < MILLIS_TO_TIMESTAMP(" + endTime + ")";
return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+ ", "+ ApplicationConfig.BASELINE_METRIC_TYPE + ", AVG("+ ApplicationConfig.BASELINE_METRIC_TYPE + ") as " + ApplicationConfig.BASELINE_METRIC_TYPE
+ ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " FROM " + ApplicationConfig.DRUID_TABLE + " FROM " + ApplicationConfig.DRUID_TABLE
+ " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME + " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+ " IN " + attackList + " IN " + attackList
+ " AND " + partitionFilter; + " AND " + timeFilter
+ " AND " + partitionFilter
+ " GROUP BY (" + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ ", " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+ ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME + ")";
} }
/** /**

View File

@@ -49,7 +49,7 @@ public class BaselineGeneration {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
long last = System.currentTimeMillis(); long last = System.currentTimeMillis();
LOG.warn("运行时间:" + (last - start)); LOG.info("运行时间:" + (last - start));
} }
System.exit(0); System.exit(0);
} }

View File

@@ -7,7 +7,6 @@ import cn.mesalab.utils.DruidUtils;
import cn.mesalab.utils.HbaseUtils; import cn.mesalab.utils.HbaseUtils;
import cn.mesalab.utils.RetryUtils; import cn.mesalab.utils.RetryUtils;
import cn.mesalab.utils.SeriesUtils; import cn.mesalab.utils.SeriesUtils;
import io.vavr.Tuple2;
import io.vavr.Tuple3; import io.vavr.Tuple3;
import org.apache.calcite.avatica.AvaticaConnection; import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement; import org.apache.calcite.avatica.AvaticaStatement;
@@ -38,6 +37,9 @@ public class BaselineSingleThread extends Thread {
private final int currentBatch; private final int currentBatch;
private final CountDownLatch countDownLatch; private final CountDownLatch countDownLatch;
private final ArrayList<Integer> frequencyBinCounter = new ArrayList<>(Collections.nCopies(ApplicationConfig.MONITOR_FREQUENCY_BIN_NUM, 0));
private final ArrayList<Integer> generateTypeCounter = new ArrayList<>(Collections.nCopies(3, 0));
public BaselineSingleThread( public BaselineSingleThread(
List<String> attackTypeList, List<String> attackTypeList,
int baselinePointNum, int baselinePointNum,
@@ -105,12 +107,14 @@ public class BaselineSingleThread extends Thread {
} }
try { try {
hbaseTable.put(putList); hbaseTable.put(putList);
LOG.info("MONITOR-IP频率分段统计:" + frequencyBinCounter);
LOG.info("MONITOR-生成类别统计:" + generateTypeCounter);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
hbaseUtils.close(); hbaseUtils.close();
countDownLatch.countDown(); countDownLatch.countDown();
LOG.info("成功写入Baseline条数共计 " + putList.size() + " 剩余线程数量:" + countDownLatch.getCount()); LOG.info("本线程更新字段数:" + putList.size() + " 剩余线程数量:" + countDownLatch.getCount());
} }
} }
@@ -164,6 +168,7 @@ public class BaselineSingleThread extends Thread {
// 判断ip出现频率 // 判断ip出现频率
float ipFrequency = ipDruidData.size() / (float) completSeries.size(); float ipFrequency = ipDruidData.size() / (float) completSeries.size();
updateLogFrequencyCounter(ipFrequency);
if(ipFrequency >ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD){ if(ipFrequency >ApplicationConfig.BASELINE_HISTORICAL_FREQUENCY_THREAD){
// 异常值剔除 // 异常值剔除
baselineGenerationType = 1; baselineGenerationType = 1;
@@ -190,6 +195,7 @@ public class BaselineSingleThread extends Thread {
// System.out.println("type-03" + ipPercentile + " " + ip + " " + Arrays.toString(baselineArr)); // System.out.println("type-03" + ipPercentile + " " + ip + " " + Arrays.toString(baselineArr));
} }
} }
updateLogGenerateTypeCounter(baselineGenerationType);
return new Tuple3<>(baselineArr, baselineGenerationType, ipPercentile); return new Tuple3<>(baselineArr, baselineGenerationType, ipPercentile);
} }
@@ -212,4 +218,18 @@ public class BaselineSingleThread extends Thread {
} }
return result; return result;
} }
/**
 * Adds one observation to the monitoring histogram of IP occurrence frequencies.
 *
 * <p>The histogram {@code frequencyBinCounter} splits {@code [0, 1]} into equally wide
 * bins; a frequency of exactly 1 is counted in the last bin. This method only feeds
 * a log statistic, so it never throws for an unexpected value: the computed bin index
 * is clamped into the valid range instead.
 *
 * @param frequency share of time points at which the IP appeared; normally within
 *                  {@code [0, 1]}, but values above 1, infinities and NaN are tolerated
 */
private void updateLogFrequencyCounter(float frequency){
    int binCount = frequencyBinCounter.size();
    if (binCount == 0) {
        // No bins configured: nothing to count, and any index would be out of range.
        return;
    }
    // The cast saturates: NaN becomes 0 and +Infinity becomes Integer.MAX_VALUE.
    int binIndex = (int) Math.floor(frequency * binCount);
    // Clamp covers frequency == 1, ratios above 1 (or infinite when the reference
    // series is empty) and float products that round up to binCount.
    binIndex = Math.max(0, Math.min(binIndex, binCount - 1));
    frequencyBinCounter.set(binIndex, frequencyBinCounter.get(binIndex) + 1);
}
/**
 * Increments the monitoring counter of the given baseline generation type.
 *
 * <p>Types are 1-based while {@code generateTypeCounter} is 0-based, so type
 * {@code t} is counted in slot {@code t - 1}. A type without a matching slot
 * raises {@link IndexOutOfBoundsException}.
 *
 * @param generateType 1-based baseline generation type
 */
private void updateLogGenerateTypeCounter(int generateType){
    final int slot = generateType - 1;
    final int seenSoFar = generateTypeCounter.get(slot);
    generateTypeCounter.set(slot, seenSoFar + 1);
}
} }

View File

@@ -8,7 +8,7 @@ druid.table=traffic_top_destination_ip_metrics_log
#HBase配置 #HBase配置
hbase.table=ddos_traffic_baselines hbase.table=ddos_traffic_baselines
hbase.zookeeper.quorum=192.168.44.11 hbase.zookeeper.quorum=192.168.44.12
hbase.zookeeper.client.port=2181 hbase.zookeeper.client.port=2181
########################################## ##########################################
@@ -63,6 +63,8 @@ baseline.kalman.r=0.1
baseline.kalman.p=8 baseline.kalman.p=8
baseline.kalman.m=2 baseline.kalman.m=2
monitor.frequency.bin.num=100
########################################## ##########################################
################ 并发参数 ################# ################ 并发参数 #################
########################################## ##########################################