Fix bugs; load data with multiple threads; run baseline processing with multiple threads

Historical data is now read from Druid in parallel time partitions
(ReadHistoricalDruidData), and each baseline batch filters the preloaded
rows via DruidData.getBatchData instead of issuing its own Druid query;
the pool size comes from the new thread.max.num setting.

This commit is contained in:
yinjiangyi
2021-08-03 21:44:05 +08:00
parent 44584b1139
commit d562d3db99
6 changed files with 159 additions and 29 deletions

View File

@@ -50,6 +50,8 @@ public class ApplicationConfig {
public static final Integer LOG_WRITE_COUNT = ConfigUtils.getIntProperty("log.write.count");
public static final Integer GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("generate.batch.size");
public static final Integer THREAD_MAX_NUM = ConfigUtils.getIntProperty("thread.max.num");
// http config

View File

@@ -13,10 +13,7 @@ import org.slf4j.LoggerFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
@@ -87,6 +84,14 @@ public class DruidData {
return serverIps;
}
public static List<String> getServerIpList(List<Map<String, Object>> dataFromDruid) {
return dataFromDruid.stream()
.map(i -> i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME).toString())
.distinct()
.collect(Collectors.toList());
}
/**
* Read data for the target IPs from Druid
* @param ipList list of IPs
@@ -114,6 +119,29 @@ public class DruidData {
return rsList;
}
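/**
* Execute the given SQL on the shared Avatica statement and convert the
* ResultSet into a list of row maps.
*/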
public static List<Map<String, Object>> readFromDruid(String sql, AvaticaStatement statement){
List<Map<String, Object>> rsList = null;
try{
ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
ResultSetToListService service = new ResultSetToListServiceImp();
rsList = service.selectAll(resultSet);
} catch (Exception e){
e.printStackTrace();
}
return rsList;
}
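/**
* Filter the preloaded Druid rows down to the given server IPs, so each
* baseline batch reuses the bulk-loaded data instead of querying Druid again.
*/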
public static List<Map<String, Object>> getBatchData(List<Map<String, Object>> allData, List<String> ipList){
ArrayList<Map<String, Object>> rsList = new ArrayList<>();
for(Map<String, Object> record: allData){
if(ipList.contains(record.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME))){
rsList.add(record);
}
}
return rsList;
}
/**
* Filter records for the given IPs and attack type from the database result
* @param allData rows read from the database
@@ -180,4 +208,22 @@ public class DruidData {
exception.printStackTrace();
}
}
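/**
* Build the Druid SQL for one time partition: worker currentPart reads the
* half-open window [originBeginTime + currentPart*timeGrad, originBeginTime + (currentPart+1)*timeGrad).
* For example, with originBeginTime = 1625414400000 and timeGrad = 3600000,
* currentPart = 2 selects rows in [1625421600000, 1625425200000).
*/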
public static String getDruidQuerySql(Long originBeginTime, int currentPart, long timeGrad){
long startTime = originBeginTime + currentPart * timeGrad;
long endTime = originBeginTime + (currentPart+1) * timeGrad;
String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " >= MILLIS_TO_TIMESTAMP(" + startTime
+ ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " < MILLIS_TO_TIMESTAMP(" + endTime + ")";
String sql = "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+ ", "+ ApplicationConfig.BASELINE_METRIC_TYPE
+ ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " FROM " + ApplicationConfig.DRUID_TABLE
+ " WHERE " + timeFilter; // FOR TEST
return sql;
}
}

View File

@@ -0,0 +1,46 @@
package cn.mesalab.dao;
import org.apache.calcite.avatica.AvaticaStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
/**
* @author yjy
* @version 1.0
* @date 2021/8/3 8:10 PM
*/
public class ReadHistoricalDruidData implements Callable<List<Map<String, Object>>> {
private static final Logger LOG = LoggerFactory.getLogger(ReadHistoricalDruidData.class);
private String sql;
private AvaticaStatement statement;
public ReadHistoricalDruidData(
String sql,
AvaticaStatement statement
){
this.sql = sql;
this.statement = statement;
}
@Override
public List<Map<String, Object>> call() {
ArrayList<Map<String, Object>> resultData = new ArrayList<>();
try {
long start = System.currentTimeMillis();
resultData.addAll(DruidData.readFromDruid(sql, statement));
long end = System.currentTimeMillis();
LOG.info(sql + "\nRead " + resultData.size() + " rows, elapsed: " + (end - start) + " ms");
} catch (Exception e) {
e.printStackTrace();
}
return resultData;
}
}

View File

@@ -2,10 +2,12 @@ package cn.mesalab.service;
import cn.mesalab.config.ApplicationConfig;
import cn.mesalab.dao.DruidData;
import cn.mesalab.dao.ReadHistoricalDruidData;
import cn.mesalab.utils.DruidUtils;
import cn.mesalab.utils.HbaseUtils;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.vavr.Tuple2;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.hadoop.hbase.client.Table;
@@ -47,10 +49,13 @@ public class BaselineGeneration {
private static final Integer BASELINE_POINT_NUM =
ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
private static Tuple2<Long, Long> startEndTimes = DruidData.getTimeLimit();
private static String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " >= MILLIS_TO_TIMESTAMP(" + DruidData.getTimeLimit()._2
+ " >= MILLIS_TO_TIMESTAMP(" + startEndTimes._2
+ ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " < MILLIS_TO_TIMESTAMP(" + DruidData.getTimeLimit()._1 + ")";
+ " < MILLIS_TO_TIMESTAMP(" + startEndTimes._1 + ")";
private static ArrayList<Map<String, Object>> allFromDruid = new ArrayList<>();
/**
* Program execution
@@ -80,23 +85,47 @@ public class BaselineGeneration {
* @throws InterruptedException
*/
private void generateBaselinesThread() throws InterruptedException {
int threadNum = Runtime.getRuntime().availableProcessors();
int threadNum = ApplicationConfig.THREAD_MAX_NUM;
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("baseline-demo-%d").build();
// create the thread pool
ThreadPoolExecutor executor = new ThreadPoolExecutor(
threadNum,
threadNum,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
namedThreadFactory,
// data loading
LOG.info("Druid: start reading data");
long start = System.currentTimeMillis();
// allFromDruid = DruidData.readAllFromDruid(druidConn, druidStatement, timeFilter);
ThreadFactory loadDataThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("baseline-load-data-%d").build();
ThreadPoolExecutor loadDataExecutor = new ThreadPoolExecutor(
threadNum, threadNum, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), loadDataThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
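// bounded 1024-task queue with AbortPolicy: submissions beyond capacity are rejected rather than blocking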
long timeGrad = (startEndTimes._1 - startEndTimes._2) / threadNum;
List<Future<List<Map<String, Object>>>> loadFutures = new ArrayList<>();
for (int i = 0; i < threadNum; i++) {
// each task loads one slice of the [begin, end) range, starting from the
// lower bound startEndTimes._2; integer division may drop up to
// threadNum-1 ms at the tail of the range
String sql = DruidData.getDruidQuerySql(startEndTimes._2, i, timeGrad);
loadFutures.add(loadDataExecutor.submit(new ReadHistoricalDruidData(sql, druidStatement)));
}
// collect results only after all tasks are submitted, so the reads overlap
for (Future<List<Map<String, Object>>> future : loadFutures) {
try {
allFromDruid.addAll(future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
long last = System.currentTimeMillis();
LOG.info("Druid 加载数据共耗时:"+(last-start));
loadDataExecutor.shutdown();
loadDataExecutor.awaitTermination(10L, TimeUnit.HOURS);
// IP list retrieval
ArrayList<String> destinationIps = DruidData.getServerIpList(druidStatement, timeFilter);
// baseline generation
// retrieve the server IP list
List<String> destinationIps = DruidData.getServerIpList(allFromDruid);
ThreadFactory generationThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("baseline-load-data-%d").build();
ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor(
threadNum, threadNum, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), generationThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
LOG.info("共查询到服务端ip " +destinationIps.size() + "");
LOG.info("Baseline batch 大小: " + ApplicationConfig.GENERATE_BATCH_SIZE);
@@ -106,21 +135,22 @@ public class BaselineGeneration {
for (List<String> batchIps: batchIpLists){
if(batchIps.size()>0){
BaselineSingleThread testForInsider = new BaselineSingleThread(
BaselineSingleThread baselineSingleThread = new BaselineSingleThread(
batchIps,
druidConn,
druidStatement,
hbaseTable,
attackTypeList,
BASELINE_POINT_NUM,
timeFilter
timeFilter,
allFromDruid
);
executor.execute(testForInsider);
generationExecutor.execute(baselineSingleThread);
}
}
executor.shutdown();
executor.awaitTermination(10L, TimeUnit.HOURS);
generationExecutor.shutdown();
generationExecutor.awaitTermination(10L, TimeUnit.HOURS);
}
}

View File

@@ -36,6 +36,7 @@ public class BaselineSingleThread extends Thread {
private Integer BASELINE_POINT_NUM;
private String timeFilter;
private List<Map<String, Object>> batchDruidData;
private List<Map<String, Object>> historicalData; // preloaded Druid rows shared read-only across baseline threads
public BaselineSingleThread(
List<String> batchIpList,
@@ -44,7 +45,8 @@ public class BaselineSingleThread extends Thread {
Table hbaseTable,
List<String> attackTypeList,
Integer BASELINE_POINT_NUM,
String timeFilter
String timeFilter,
List<Map<String, Object>> historicalData
){
this.ipList = batchIpList;
this.druidConn = druidConn;
@@ -53,11 +55,13 @@ public class BaselineSingleThread extends Thread {
this.attackTypeList = attackTypeList;
this.BASELINE_POINT_NUM = BASELINE_POINT_NUM;
this.timeFilter = timeFilter;
this.historicalData = historicalData;
}
@Override
public void run(){
batchDruidData = DruidData.readFromDruid(druidConn, druidStatement, ipList, timeFilter);
// batchDruidData = DruidData.readFromDruid(druidConn, druidStatement, ipList, timeFilter);
batchDruidData = DruidData.getBatchData(historicalData, ipList);
List<Put> putList = new ArrayList<>();
for(String attackType: attackTypeList){

View File

@@ -23,14 +23,16 @@ hbase.zookeeper.client.port=2181
# Druid time-range mode: 0 = read the default range of read.druid.time.range days, 1 = use an explicit time range
read.druid.time.limit.type=1
#07-01
read.druid.min.time=1625068800000
read.druid.min.time=1625414400000
#06-01
#read.druid.min.time=1622476800000
read.druid.max.time=1625673600000
thread.max.num=5
# Read the past N days of data; the minimum is 3 days, since periodicity detection requires it
read.historical.days=7
read.historical.days=3
# Historical data aggregation granularity: 10 minutes
historical.grad=10
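# e.g. historical.grad=10 gives 24*(60/10)=144 aggregated points per day (cf. BASELINE_POINT_NUM)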
# baseline generation method