Partition the multi-threaded reads by partition_num

yinjiangyi
2021-08-09 15:47:42 +08:00
parent ef1f0fd826
commit 357fc2eabc
7 changed files with 125 additions and 138 deletions


@@ -27,9 +27,10 @@ public class ApplicationConfig {
public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood");
public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood");
public static final String DRUID_ATTACKTYPE_DNS_AMPL = ConfigUtils.getStringProperty("druid.attacktype.dnsamplification");
public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.serverip.columnname");
public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.attacktype.columnname");
public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.recvtime.columnname");
public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.serverip");
public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.attacktype");
public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.columnname.recvtime");
public static final String DRUID_COLUMNNAME_PARTITION_NUM = ConfigUtils.getStringProperty("druid.columnname.partition.num");
// Threshold on the periodicity correlation coefficient
public static final Float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold");
@@ -54,16 +55,10 @@ public class ApplicationConfig {
public static final Double BASELINE_KALMAN_P = ConfigUtils.getDoubleProperty("baseline.kalman.p");
public static final Double BASELINE_KALMAN_M = ConfigUtils.getDoubleProperty("baseline.kalman.m");
public static final Integer BASELINE_GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("baseline.generate.batch.size");
public static final Long DRUID_READ_BATCH_TIME_GRAD_HOUR = ConfigUtils.getLongProperty("druid.read.batch.time.grad.hour");
public static final Integer THREAD_POOL_NUM = ConfigUtils.getIntProperty("thread.pool.num");
public static final Integer PARTITION_NUM_MAX = ConfigUtils.getIntProperty("druid.partition.num.max");
// http config
public static final Integer HTTP_REQUEST_TIMEOUT = ConfigUtils.getIntProperty("http.request.timeout");
public static final Integer HTTP_RESPONSE_TIMEOUT = ConfigUtils.getIntProperty("http.response.timeout");
public static final Integer HTTP_CONNECTION_TIMEOUT = ConfigUtils.getIntProperty("http.connection.timeout");
public static final Integer HTTP_MAX_CONNECTION_NUM = ConfigUtils.getIntProperty("http.max.connection.num");
public static final Integer HTTP_MAX_PER_ROUTE = ConfigUtils.getIntProperty("http.max.per.route");
}
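The druid.columnname.* keys are renamed for consistency, and the new druid.columnname.partition.num, thread.pool.num and druid.partition.num.max entries replace the time-based batching key druid.read.batch.time.grad.hour. Below is a minimal sketch of how the latter two values carve the partition_num space into per-thread slices, using the same ceiling division as BaselineGeneration further down; the numeric values are illustrative assumptions, not taken from the commit.

    // Minimal sketch: how druid.partition.num.max and thread.pool.num define each
    // worker's partition_num slice. The values 100 and 16 are assumptions.
    public class PartitionRangeSketch {
        public static void main(String[] args) {
            int partitionNumMax = 100; // assumed value of druid.partition.num.max
            int threadPoolNum = 16;    // assumed value of thread.pool.num
            // Same rounding as BaselineGeneration: ceil(100 / 16) = 7 values per thread.
            int batchPartitionRange = (int) Math.ceil(partitionNumMax / (double) threadPoolNum);
            for (int t = 0; t < threadPoolNum; t++) {
                int start = t * batchPartitionRange;      // inclusive, thread 0 -> 0
                int end = (t + 1) * batchPartitionRange;  // exclusive, thread 0 -> 7
                System.out.println("thread " + t + " scans partition_num in [" + start + ", " + end + ")");
            }
            // Assuming partition_num ranges over 0..99, the last slice [105, 112)
            // matches no rows, so the rounding only costs an idle worker, not data.
        }
    }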


@@ -109,7 +109,7 @@ public class DruidData {
}
public static String getDruidQuerySql(List<String> attackTypeList, Long originBeginTime, int currentPart, long timeGrad){
public static String getBatchDruidQuerySql(List<String> attackTypeList, Long originBeginTime, int currentPart, long timeGrad){
long startTime = originBeginTime + currentPart * timeGrad;
long endTime = originBeginTime + (currentPart+1) * timeGrad;
attackTypeList = attackTypeList.stream().map(attack -> "'"+attack+"'").collect(Collectors.toList());
@@ -129,6 +129,26 @@ public class DruidData {
+ " AND " + timeFilter;
}
public static String getBatchDruidQuerySql(List<String> attackTypeList, int currentPart, int partitionNumGrad){
int startPartitionNum = currentPart * partitionNumGrad;
int endPartitionNum = (currentPart + 1) * partitionNumGrad;
attackTypeList = attackTypeList.stream().map(attack -> "'"+attack+"'").collect(Collectors.toList());
String attackList = "(" + StringUtils.join(attackTypeList, ",") + ")";
String partitionFilter = ApplicationConfig.DRUID_COLUMNNAME_PARTITION_NUM
+ " >= " + startPartitionNum
+ " AND " + ApplicationConfig.DRUID_COLUMNNAME_PARTITION_NUM
+ " < " + endPartitionNum;
return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+ ", "+ ApplicationConfig.BASELINE_METRIC_TYPE
+ ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " FROM " + ApplicationConfig.DRUID_TABLE
+ " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+ " IN " + attackList
+ " AND " + partitionFilter;
}
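For illustration, assuming placeholder column names server_ip, attack_type, flow and recv_time, a partition column partition_num, and a datasource ddos_flow (none of these names appear in the commit), the new overload would produce a query along the lines sketched in the comments:

    // Placeholder names, not from the commit; currentPart = 2, partitionNumGrad = 8.
    String sql = DruidData.getBatchDruidQuerySql(Arrays.asList("tcp_syn_flood"), 2, 8);
    // sql would then read, give or take whitespace:
    // SELECT server_ip, attack_type, flow, recv_time
    //   FROM ddos_flow
    //   WHERE attack_type IN ('tcp_syn_flood')
    //   AND partition_num >= 16 AND partition_num < 24

Each thread thus scans only its own partition_num slice over the full time range, replacing the earlier per-time-window queries built by getDruidQuerySql.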
/**
* Splits a Map into batches
* @param map the original data


@@ -9,6 +9,6 @@ import cn.mesalab.service.BaselineGeneration;
*/
public class BaselineApplication {
public static void main(String[] args) {
new BaselineGeneration().perform();
BaselineGeneration.perform();
}
}


@@ -23,7 +23,6 @@ import java.util.concurrent.*;
public class BaselineGeneration {
private static final Logger LOG = LoggerFactory.getLogger(BaselineGeneration.class);
private static final Table hbaseTable = HbaseUtils.getInstance().getHbaseTable();
private static final List<String> ATTACK_TYPE_LIST = Arrays.asList(
ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD
// ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
@@ -34,18 +33,18 @@ public class BaselineGeneration {
ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
private static final Tuple2<Long, Long> START_END_TIMES = DruidData.getTimeLimit();
private static final Map<String, List<Map<String, Object>>> allFromDruid = new HashMap<>();
private static final int threadPoolNum = ApplicationConfig.THREAD_POOL_NUM;
// Number of partition_num values covered by each thread's read
private static final int batchPartitionRange = (int) Math.ceil(ApplicationConfig.PARTITION_NUM_MAX /(double)threadPoolNum);
/**
* Runs the job
*/
public void perform() {
public static void perform() {
long start = System.currentTimeMillis();
try{
loadFromDruid();
baselineGenration();
hbaseTable.close();
baselineGeneration();
} catch (Exception e){
e.printStackTrace();
} finally {
@@ -55,92 +54,32 @@ public class BaselineGeneration {
System.exit(0);
}
/**
* Loads data from Druid
* @throws InterruptedException
*/
private void loadFromDruid() throws InterruptedException {
LOG.info("Start reading data");
long start = System.currentTimeMillis();
long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR;
int loadDataThreadPoolNum = (int) ((START_END_TIMES._2-START_END_TIMES._1)/timeGrad);
ArrayList<Future<Map<String, List<Map<String, Object>>>>> resultList = new ArrayList<>();
CountDownLatch loadDataCountDownLatch = new CountDownLatch(loadDataThreadPoolNum);
ThreadFactory loadDataThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("baseline-load-data-%d").build();
ThreadPoolExecutor loadDataExecutor = new ThreadPoolExecutor(
loadDataThreadPoolNum, loadDataThreadPoolNum, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), loadDataThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
// Partition by IP count
for (int i = 0; i < loadDataThreadPoolNum; i++) {
String sql = DruidData.getDruidQuerySql(ATTACK_TYPE_LIST, START_END_TIMES._1, i, timeGrad);
ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData(
sql,
loadDataCountDownLatch
);
Future<Map<String, List<Map<String, Object>>>> future = loadDataExecutor.submit(readHistoricalDruidData);
resultList.add(future);
}
loadDataExecutor.shutdown();
loadDataCountDownLatch.await();
// Merge the returned results
for(Future<Map<String, List<Map<String, Object>>>> future: resultList){
try {
Map<String, List<Map<String, Object>>> queryBatchIpData = future.get();
if(queryBatchIpData !=null){
queryBatchIpData.forEach((ip, data)->
allFromDruid.merge(ip, data, ListUtils::union));
}else{
LOG.error("future.get() returned no result");
}
} catch (ExecutionException e) {
e.printStackTrace();
}
}
LOG.info("Number of server IPs returned by this query: " + allFromDruid.size());
LOG.info("Query range: " + START_END_TIMES._1 + " - " + START_END_TIMES._2);
long last = System.currentTimeMillis();
LOG.info("Total time to load data from Druid: " + (last - start));
}
/**
* Baseline generation and write-out
* @throws InterruptedException
*/
private static void baselineGenration() throws InterruptedException {
List<Map<String, List<Map<String, Object>>>> batchDruidDataLists = DruidData.splitMap(allFromDruid, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
int generationThreadPoolNum = batchDruidDataLists.size();
CountDownLatch generateCountDownLatch = new CountDownLatch(generationThreadPoolNum);
private static void baselineGeneration() throws InterruptedException {
CountDownLatch generateCountDownLatch = new CountDownLatch(threadPoolNum);
ThreadFactory generationThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("baseline-generate-%d").build();
ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor(
generationThreadPoolNum, generationThreadPoolNum, 0L,
threadPoolNum, threadPoolNum, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), generationThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
LOG.info("Baseline batch size: " + ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
for (Map<String, List<Map<String, Object>>>batchDruidData: batchDruidDataLists){
if(batchDruidData.size()>0){
BaselineSingleThread baselineSingleThread = new BaselineSingleThread(
hbaseTable,
ATTACK_TYPE_LIST,
BASELINE_POINT_NUM,
batchDruidData,
generateCountDownLatch
);
generationExecutor.execute(baselineSingleThread);
}
for(int threadCount = 0; threadCount<threadPoolNum; threadCount++){
BaselineSingleThread baselineSingleThread = new BaselineSingleThread(
ATTACK_TYPE_LIST,
BASELINE_POINT_NUM,
batchPartitionRange,
threadCount,
generateCountDownLatch
);
generationExecutor.execute(baselineSingleThread);
}
generationExecutor.shutdown();
generateCountDownLatch.await();
}
public static void main(String[] args) {
perform();
}
}
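The rewritten baselineGeneration() uses a fixed pool of threadPoolNum workers, one task per partition slice, and a CountDownLatch so perform() blocks until every slice is done. A generic sketch of that coordination pattern, with an assumed pool size of 4 and placeholder work:

    // Generic sketch of the pool + latch coordination used by baselineGeneration().
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class LatchPoolSketch {
        public static void main(String[] args) throws InterruptedException {
            int threadPoolNum = 4; // assumed thread.pool.num
            CountDownLatch latch = new CountDownLatch(threadPoolNum);
            ExecutorService pool = Executors.newFixedThreadPool(threadPoolNum);
            for (int t = 0; t < threadPoolNum; t++) {
                final int slice = t;
                pool.execute(() -> {
                    try {
                        // read this slice's partition_num range, build baselines, write them out
                        System.out.println("worker " + slice + " done");
                    } finally {
                        latch.countDown(); // count down even if the slice fails
                    }
                });
            }
            pool.shutdown(); // stop accepting tasks; running ones continue
            latch.await();   // block until every worker has counted down
        }
    }

Note that BaselineSingleThread extends Thread but is handed to the executor via execute(), so only its run() body executes on a pool thread; its own start() is never called.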


@@ -1,9 +1,14 @@
package cn.mesalab.service;
import cn.mesalab.config.ApplicationConfig;
import cn.mesalab.dao.DruidData;
import cn.mesalab.service.algorithm.KalmanFilter;
import cn.mesalab.utils.DruidUtils;
import cn.mesalab.utils.HbaseUtils;
import cn.mesalab.utils.SeriesUtils;
import org.apache.calcite.avatica.AvaticaClientRuntimeException;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.commons.math3.stat.StatUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
@@ -11,10 +16,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
@@ -26,28 +28,41 @@ import java.util.stream.Collectors;
public class BaselineSingleThread extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(BaselineSingleThread.class);
private final HbaseUtils hbaseUtils;
private final Table hbaseTable;
private final List<String> attackTypeList;
private final Integer baselinePointNum;
private final Map<String,List<Map<String, Object>>> batchDruidData;
private final int batchPartitionRange;
private final int currentBatch;
private final CountDownLatch countDownLatch;
public BaselineSingleThread(
Table hbaseTable,
List<String> attackTypeList,
Integer baselinePointNum,
Map<String,List<Map<String, Object>>> batchDruidData,
int baselinePointNum,
int batchPartitionRange,
int currentBatch,
CountDownLatch countDownLatch
){
this.hbaseTable = hbaseTable;
hbaseUtils = HbaseUtils.getInstance();
this.hbaseTable = hbaseUtils.getHbaseTable();
this.attackTypeList = attackTypeList;
this.baselinePointNum = baselinePointNum;
this.batchDruidData = batchDruidData;
this.batchPartitionRange = batchPartitionRange;
this.currentBatch = currentBatch;
this.countDownLatch = countDownLatch;
}
@Override
public void run(){
long start = System.currentTimeMillis();
// Read data from Druid
LOG.info("Start reading data");
Map<String, List<Map<String, Object>>> batchDruidData = getBatchDruidData();
LOG.info("Finished reading data, server IPs fetched: " + batchDruidData.size() +
" elapsed time: " + (System.currentTimeMillis() - start));
// Generate baselines
List<Put> putList = new ArrayList<>();
for(String attackType: attackTypeList){
for(String ip: batchDruidData.keySet()){
@@ -57,7 +72,7 @@ public class BaselineSingleThread extends Thread {
// generate the baseline
int[] ipBaseline = generateSingleIpBaseline(ip, ipDruidData);
if (ipBaseline!= null){
HbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
}
}
}
@@ -66,11 +81,27 @@ public class BaselineSingleThread extends Thread {
} catch (IOException e) {
e.printStackTrace();
} finally {
hbaseUtils.close();
countDownLatch.countDown();
LOG.info("Successfully wrote " + putList.size() + " baseline rows in total; remaining threads: " + countDownLatch.getCount());
}
}
private Map<String, List<Map<String, Object>>> getBatchDruidData() {
Map<String, List<Map<String, Object>>> readFromDruid = new HashMap<>();
try {
AvaticaConnection connection = DruidUtils.getConn();
AvaticaStatement stat = connection.createStatement();
String sql = DruidData.getBatchDruidQuerySql(attackTypeList, currentBatch, batchPartitionRange);
readFromDruid = DruidData.readFromDruid(sql, stat);
stat.close();
connection.close();
} catch (Exception e){
e.printStackTrace();
}
return readFromDruid;
}
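For reference, a minimal alternative sketch of getBatchDruidData() (not the committed code) using try-with-resources, so the AvaticaStatement and the AvaticaConnection are always released, statement first, even when the read throws; it assumes, as the committed code does, that DruidUtils.getConn() returns a JDBC-compliant AvaticaConnection.

    private Map<String, List<Map<String, Object>>> getBatchDruidData() {
        Map<String, List<Map<String, Object>>> readFromDruid = new HashMap<>();
        String sql = DruidData.getBatchDruidQuerySql(attackTypeList, currentBatch, batchPartitionRange);
        // Resources close in reverse declaration order: statement first, then connection.
        try (AvaticaConnection connection = DruidUtils.getConn();
             AvaticaStatement stat = connection.createStatement()) {
            readFromDruid = DruidData.readFromDruid(sql, stat);
        } catch (Exception e) {
            LOG.error("Failed to read partition batch " + currentBatch + " from Druid", e);
        }
        return readFromDruid;
    }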
/**
* Baseline generation logic for a single IP
* @return the baseline series, of length 60/HISTORICAL_GRAD*24


@@ -1 +1 @@
package cn.mesalab.utils;
package cn.mesalab.utils;