Organize Druid data keyed by IP; remove ResultSetToListService and the deprecated DataUtil methods

yinjiangyi
2021-08-05 17:15:21 +08:00
parent 307f283134
commit 0d3d2aaded
9 changed files with 126 additions and 296 deletions

View File

@@ -51,8 +51,6 @@ public class ApplicationConfig {
     public static final Long DRUID_READ_BATCH_TIME_GRAD_HOUR = ConfigUtils.getLongProperty("druid.read.batch.time.grad.hour");
     public static final Integer THREAD_MAX_NUM = ConfigUtils.getIntProperty("thread.max.num");
     // http config
     public static final Integer HTTP_REQUEST_TIMEOUT = ConfigUtils.getIntProperty("http.request.timeout");
     public static final Integer HTTP_RESPONSE_TIMEOUT = ConfigUtils.getIntProperty("http.response.timeout");

View File

@@ -1,18 +1,16 @@
 package cn.mesalab.dao;
 import cn.mesalab.config.ApplicationConfig;
-import cn.mesalab.dao.Impl.ResultSetToListServiceImp;
 import cn.mesalab.utils.DruidUtils;
 import io.vavr.Tuple;
 import io.vavr.Tuple2;
-import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.sql.ResultSet;
-import java.sql.SQLException;
+import java.sql.ResultSetMetaData;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -26,139 +24,51 @@ import java.util.stream.Collectors;
 public class DruidData {
     private static final Logger LOG = LoggerFactory.getLogger(DruidData.class);
-    private static DruidData druidData;
-    private AvaticaConnection connection;
-    private AvaticaStatement statement;
-    {
-        connectionInit();
-    }
-    /**
-     * Initialize the connection
-     */
-    private void connectionInit(){
-        try {
-            connection = DruidUtils.getConn();
-            statement = connection.createStatement();
-            statement.setQueryTimeout(0);
-        } catch (SQLException exception) {
-            exception.printStackTrace();
-        }
-    }
-    /**
-     * Get the instance
-     * @return the DruidData instance
-     */
-    public static DruidData getInstance() {
-        druidData = new DruidData();
-        return druidData;
-    }
-    /**
-     * Get distinct server IPs
-     * @return ArrayList<String> list of IPs
-     */
-    public static ArrayList<String> getServerIpList(AvaticaStatement statement, String timeFilter) {
-        Long startQueryIpLIstTime = System.currentTimeMillis();
-        ArrayList<String> serverIps = new ArrayList<String>();
-        String sql = "SELECT distinct " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
-                + " FROM " + ApplicationConfig.DRUID_TABLE
-                + " WHERE " + timeFilter
-                + " LIMIT 200";// FOR TEST
-        try{
-            ResultSet resultSet = DruidUtils.executeQuery(statement,sql);
-            while(resultSet.next()){
-                String ip = resultSet.getString(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
-                serverIps.add(ip);
-            }
-        } catch (Exception e){
-            e.printStackTrace();
-        }
-        Long endQueryIpListTime = System.currentTimeMillis();
-        LOG.info("Performance test: ip list query took " + (endQueryIpListTime-startQueryIpLIstTime));
-        return serverIps;
-    }
-    public static List<String> getServerIpList(List<Map<String, Object>> dataFromDruid) {
-        List<String> serverIps = new ArrayList<>();
-        List<String> collect = dataFromDruid.stream().map(i -> i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME).toString())
-                .collect(Collectors.toList());
-        serverIps = collect.stream().distinct().collect(Collectors.toList());
-        return serverIps;
-    }
-    /**
-     * Read data for the target IPs from Druid
-     * @param ipList list of IPs
-     * @return rows read from the database
-     */
-    public static List<Map<String, Object>> readFromDruid(AvaticaConnection connection, AvaticaStatement statement, List<String> ipList, String timeFilter){
-        List<Map<String, Object>> rsList = null;
-        ipList = ipList.stream().map( ip -> "\'"+ip+"\'").collect(Collectors.toList());
-        String ipString = "(" + StringUtils.join(ipList, ",").toString() + ")";
-        String sql = "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
-                + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
-                + ", "+ ApplicationConfig.BASELINE_METRIC_TYPE
-                + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
-                + " FROM " + ApplicationConfig.DRUID_TABLE
-                + " WHERE " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
-                + " IN " + ipString
-                + " AND " + timeFilter;
-        try{
-            ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
-            ResultSetToListService service = new ResultSetToListServiceImp();
-            rsList = service.selectAll(resultSet);
-        } catch (Exception e){
-            e.printStackTrace();
-        }
-        return rsList;
-    }
-    public static List<Map<String, Object>> readFromDruid(String sql, AvaticaStatement statement){
-        List<Map<String, Object>> rsList = null;
-        try{
-            ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
-            ResultSetToListService service = new ResultSetToListServiceImp();
-            rsList = service.selectAll(resultSet);
-        } catch (Exception e){
-            e.printStackTrace();
-        }
-        return rsList;
-    }
-    public static List<Map<String, Object>> getBatchData(List<Map<String, Object>> allData, List<String> ipList){
-        ArrayList<Map<String, Object>> rsList = new ArrayList<>();
-        for(Map<String, Object> record: allData){
-            if(ipList.contains(record.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME))){
-                rsList.add(record);
-            }
-        }
-        return rsList;
-    }
-    /**
-     * Filter the rows of a given IP and attack type out of the database result
-     * @param allData rows read from the database
-     * @param ip target IP
-     * @param attackType target attack type
-     * @return filtered rows
-     */
-    public static List<Map<String, Object>> getTimeSeriesData(List<Map<String, Object>> allData, String ip, String attackType){
-        List<Map<String, Object>> rsList = new ArrayList<>();
-        try{
-            rsList = allData.stream().
-                    filter(i->((i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME).equals(ip))
-                    )&&(i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)))
-                    .collect(Collectors.toList());
-        } catch (NullPointerException e){
-        }
-        return rsList;
-    }
+    public static Map<String, List<Map<String, Object>>> readFromDruid(String sql, AvaticaStatement statement){
+        Map<String, List<Map<String, Object>>> rsList = null;
+        try{
+            ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
+            rsList = selectAll(resultSet);
+        } catch (Exception e){
+            e.printStackTrace();
+        }
+        return rsList;
+    }
+    /**
+     * Convert the rows returned from Druid into Map<String, List<Map<String, Object>>> form.
+     * The outer map is keyed by IP; each element of the inner list is one log record of that IP.
+     * @param rs
+     * @return
+     */
+    public static Map<String, List<Map<String, Object>>> selectAll(ResultSet rs) {
+        Map<String, List<Map<String, Object>>> allIpDataList = new HashMap<>();
+        ArrayList<String> ipList = new ArrayList<>();
+        try {
+            ResultSetMetaData rmd = rs.getMetaData();
+            int columnCount = rmd.getColumnCount();
+            while (rs.next()) {
+                Map<String, Object> rowData = new HashMap<>();
+                for (int i = 1; i <= columnCount; ++i) {
+                    rowData.put(rmd.getColumnName(i), rs.getObject(i));
+                }
+                String ip = (String) rowData.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
+                if(!ipList.contains(ip)){
+                    ipList.add(ip);
+                    List<Map<String, Object>> ipData = new ArrayList<>();
+                    allIpDataList.put(ip, ipData);
+                }
+                rowData.remove(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
+                allIpDataList.get(ip).add(rowData);
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+        }
+        return allIpDataList;
+    }

     /**
@@ -180,7 +90,7 @@ public class DruidData {
             default:
                 LOG.warn("No Druid data read mode configured");
         }
-        return Tuple.of(maxTime, minTime);
+        return Tuple.of(minTime, maxTime);
     }
     private static long getCurrentDay(int bias) {
@@ -198,32 +108,46 @@ public class DruidData {
         return getCurrentDay(0);
     }
-    /**
-     * Close the current DruidData connection
-     */
-    public void closeConn(){
-        try {
-            DruidUtils.closeConnection();
-        } catch (SQLException exception) {
-            exception.printStackTrace();
-        }
-    }
-    public static String getDruidQuerySql(Long originBeginTime, int currentPart, long timeGrad){
+    public static String getDruidQuerySql(List<String> attackTypeList, Long originBeginTime, int currentPart, long timeGrad){
         long startTime = originBeginTime + currentPart * timeGrad;
         long endTime = originBeginTime + (currentPart+1) * timeGrad;
+        attackTypeList = attackTypeList.stream().map(attack -> "'"+attack+"'").collect(Collectors.toList());
+        String attackList = "(" + StringUtils.join(attackTypeList, ",") + ")";
         String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
                 + " >= MILLIS_TO_TIMESTAMP(" + startTime
                 + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
                 + " < MILLIS_TO_TIMESTAMP(" + endTime + ")";
-        String sql = "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+        return "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
                 + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
                 + ", "+ ApplicationConfig.BASELINE_METRIC_TYPE
                 + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
                 + " FROM " + ApplicationConfig.DRUID_TABLE
-                + " WHERE " + timeFilter; // FOR TEST
-        return sql;
+                + " WHERE " + ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+                + " IN " + attackList
+                + " AND " + timeFilter;
     }
+    /**
+     * Description: split a Map into smaller maps
+     * @param map original data
+     * @param pageSize number of entries per map
+     * @return List<Map<K, V>>
+     */
+    public static <K, V> List<Map<K, V>> splitMap(Map<K, V> map, int pageSize){
+        if(map == null || map.isEmpty()){
+            return Collections.emptyList();
+        }
+        List<Map<K, V>> newList = new ArrayList<>();
+        int j = 0;
+        for(K k :map.keySet()){
+            if(j%pageSize == 0) {
+                newList.add(new HashMap<>());
+            }
+            newList.get(newList.size()-1).put(k, map.get(k));
+            j++;
+        }
+        return newList;
+    }
 }
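For orientation, a minimal, self-contained sketch (not part of the commit) of how the reorganized DruidData API is consumed after this change. The class name DruidDataUsageSketch, the SQL literal and the batch size 100 are invented for illustration; readFromDruid, selectAll's per-IP grouping and splitMap are the methods introduced above, and DruidUtils.getConn() is the connection helper used elsewhere in this commit.

import cn.mesalab.dao.DruidData;
import cn.mesalab.utils.DruidUtils;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;
import java.util.List;
import java.util.Map;

public class DruidDataUsageSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder query; production code builds it with DruidData.getDruidQuerySql(...).
        String sql = "SELECT * FROM demo_datasource LIMIT 10";
        AvaticaConnection connection = DruidUtils.getConn();
        AvaticaStatement statement = connection.createStatement();
        // One entry per server IP; each value is the list of that IP's log rows
        // (the IP column itself is removed from every row by selectAll).
        Map<String, List<Map<String, Object>>> byIp = DruidData.readFromDruid(sql, statement);
        // Partition the per-IP map into batches for the baseline worker threads.
        List<Map<String, List<Map<String, Object>>>> batches = DruidData.splitMap(byIp, 100);
        System.out.println(byIp.size() + " IPs split into " + batches.size() + " batches");
    }
}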

View File

@@ -1,44 +0,0 @@
-package cn.mesalab.dao.Impl;
-
-import cn.mesalab.dao.ResultSetToListService;
-
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author yjy
- * @version 1.0
- * @date 2021/7/24 4:29 PM
- */
-public class ResultSetToListServiceImp implements ResultSetToListService {
-    /**
-     * SELECT query results are returned as a List; each element is one record.
-     * Each record is stored in a Map<String, Object>, where the String is the column name and the Object is the column value.
-     *
-     * @param rs
-     * @return List<Map<String, Object>>
-     */
-    @Override
-    public List<Map<String, Object>> selectAll(ResultSet rs) {
-        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
-        try {
-            ResultSetMetaData rmd = rs.getMetaData();
-            int columnCount = rmd.getColumnCount();
-            while (rs.next()) {
-                Map<String, Object> rowData = new HashMap<String, Object>();
-                for (int i = 1; i <= columnCount; ++i) {
-                    rowData.put(rmd.getColumnName(i), rs.getObject(i));
-                }
-                list.add(rowData);
-            }
-        } catch (Exception ex) {
-            ex.printStackTrace();
-        }
-        return list;
-    }
-}

View File

@@ -6,7 +6,8 @@ import org.apache.calcite.avatica.AvaticaStatement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -16,11 +17,11 @@ import java.util.concurrent.CountDownLatch;
  * @version 1.0
  * @date 2021/8/3 8:10 PM
  */
-public class ReadHistoricalDruidData implements Callable<ArrayList<Map<String, Object>>> {
+public class ReadHistoricalDruidData implements Callable<Map<String, List<Map<String, Object>>>> {
     private static final Logger LOG = LoggerFactory.getLogger(ReadHistoricalDruidData.class);
-    private String sql;
-    private CountDownLatch countDownLatch;
+    private final String sql;
+    private final CountDownLatch countDownLatch;

     public ReadHistoricalDruidData(
             String sql,
@@ -31,15 +32,14 @@ public class ReadHistoricalDruidData implements Callable<ArrayList<Map<String, O
     }

     @Override
-    public ArrayList<Map<String, Object>> call() {
-        ArrayList<Map<String, Object>> resultData = new ArrayList<>();
+    public Map<String, List<Map<String, Object>>> call() {
+        Map<String, List<Map<String, Object>>> resultData = new HashMap<>();
         try {
             long start = System.currentTimeMillis();
             AvaticaConnection connection = DruidUtils.getConn();
             AvaticaStatement stat = connection.createStatement();
-            resultData.addAll(DruidData.readFromDruid(sql, stat));
+            Map<String, List<Map<String, Object>>> readFromDruid = DruidData.readFromDruid(sql, stat);
+            resultData.putAll(readFromDruid);
             long end = System.currentTimeMillis();
             LOG.info(sql + "\nRead " + resultData.size() + " entries, run time: " + (end - start));

View File

@@ -1,24 +0,0 @@
-package cn.mesalab.dao;
-
-import java.sql.ResultSet;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author yjy
- * @version 1.0
- * @date 2021/7/24 4:27 PM
- */
-public interface ResultSetToListService {
-    /**
-     * SELECT * FROM websites
-     * Query all records and return them as a List;
-     * each element of the list is one record.
-     * Each record is stored in a Map<String, Object>, where the String is the column name and the Object is the column value.
-     *
-     * @param rs
-     * @return List<Map<String, Object>>
-     */
-    public List<Map<String, Object>> selectAll(ResultSet rs);
-}

View File

@@ -3,18 +3,14 @@ package cn.mesalab.service;
 import cn.mesalab.config.ApplicationConfig;
 import cn.mesalab.dao.DruidData;
 import cn.mesalab.dao.ReadHistoricalDruidData;
-import cn.mesalab.utils.DruidUtils;
 import cn.mesalab.utils.HbaseUtils;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.vavr.Tuple2;
-import org.apache.calcite.avatica.AvaticaConnection;
-import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.commons.collections.ListUtils;
 import org.apache.hadoop.hbase.client.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.sql.SQLException;
 import java.util.*;
 import java.util.concurrent.*;
@@ -30,21 +26,16 @@ public class BaselineGeneration {
     private static final Table hbaseTable = HbaseUtils.getInstance().getHbaseTable();
     private static final List<String> ATTACK_TYPE_LIST = Arrays.asList(
-            ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD,
-            ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
-            ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD,
-            ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL
+            ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD
+//            ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
+//            ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD,
+//            ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL
     );
     private static final Integer BASELINE_POINT_NUM =
             ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
     private static final Tuple2<Long, Long> START_END_TIMES = DruidData.getTimeLimit();
-    private static final String TIME_FILTER = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
-            + " >= MILLIS_TO_TIMESTAMP(" + START_END_TIMES._2
-            + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
-            + " < MILLIS_TO_TIMESTAMP(" + START_END_TIMES._1 + ")";
-    private static final ArrayList<Map<String, Object>> allFromDruid = new ArrayList<>();
+    private static final Map<String, List<Map<String, Object>>> allFromDruid = new HashMap<>();
     /**
      * Program execution
@@ -85,25 +76,27 @@ public class BaselineGeneration {
                 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), loadDataThreadFactory,
                 new ThreadPoolExecutor.AbortPolicy());
         long timeGrad = 3600000 * ApplicationConfig.DRUID_READ_BATCH_TIME_GRAD_HOUR;
-        int threadPoolNum = (int) ((START_END_TIMES._1-START_END_TIMES._2)/timeGrad);
-        ArrayList<Future<ArrayList<Map<String, Object>>>> resultList = new ArrayList<>();
+        int threadPoolNum = (int) ((START_END_TIMES._2-START_END_TIMES._1)/timeGrad);
+        ArrayList<Future<Map<String, List<Map<String, Object>>>>> resultList = new ArrayList<>();
         CountDownLatch loadDataCountDownLatch = new CountDownLatch(threadPoolNum);
-        for (int i = 0; i < threadNum; i++) {
-            String sql = DruidData.getDruidQuerySql(START_END_TIMES._1, i, timeGrad);
+        for (int i = 0; i < threadPoolNum; i++) {
+            String sql = DruidData.getDruidQuerySql(ATTACK_TYPE_LIST, START_END_TIMES._1, i, timeGrad);
             ReadHistoricalDruidData readHistoricalDruidData = new ReadHistoricalDruidData(
                     sql,
                     loadDataCountDownLatch
             );
-            Future<ArrayList<Map<String, Object>>> future = loadDataExecutor.submit(readHistoricalDruidData);
+            Future<Map<String, List<Map<String, Object>>>> future = loadDataExecutor.submit(readHistoricalDruidData);
             resultList.add(future);
         }
         loadDataExecutor.shutdown();
         loadDataCountDownLatch.await();
-        for(Future<ArrayList<Map<String, Object>>> future: resultList){
+        for(Future<Map<String, List<Map<String, Object>>>> future: resultList){
             try {
-                if(future.get()!=null){
-                    allFromDruid.addAll(future.get());
+                Map<String, List<Map<String, Object>>> queryBatchIpData = future.get();
+                if(queryBatchIpData !=null){
+                    queryBatchIpData.forEach((ip, data)->
+                            allFromDruid.merge(ip, data, ListUtils::union));
                 }else{
                     LOG.error("future.get() returned no result");
                 }
@@ -115,8 +108,6 @@ public class BaselineGeneration {
LOG.info("Druid 加载数据共耗时:"+(last-start)); LOG.info("Druid 加载数据共耗时:"+(last-start));
// BaseLine生成 // BaseLine生成
// 获取IP列表
List<String> destinationIps = DruidData.getServerIpList(allFromDruid);
ThreadFactory generationThreadFactory = new ThreadFactoryBuilder() ThreadFactory generationThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("baseline-generate-%d").build(); .setNameFormat("baseline-generate-%d").build();
ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor( ThreadPoolExecutor generationExecutor = new ThreadPoolExecutor(
@@ -124,21 +115,19 @@ public class BaselineGeneration {
                 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), generationThreadFactory,
                 new ThreadPoolExecutor.AbortPolicy());
-        LOG.info("Number of server IPs found: " + destinationIps.size());
+        LOG.info("Number of server IPs found: " + allFromDruid.size());
         LOG.info("Baseline batch size: " + ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
-        // Generate and process IP baselines in batches
-        List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
-        CountDownLatch generateCountDownLatch = new CountDownLatch(threadPoolNum);
-        for (List<String> batchIps: batchIpLists){
-            if(batchIps.size()>0){
+        List<Map<String, List<Map<String, Object>>>> batchDruidDataLists = DruidData.splitMap(allFromDruid, ApplicationConfig.BASELINE_GENERATE_BATCH_SIZE);
+        CountDownLatch generateCountDownLatch = new CountDownLatch(batchDruidDataLists.size());
+        for (Map<String, List<Map<String, Object>>> batchDruidData: batchDruidDataLists){
+            if(batchDruidData.size()>0){
                 BaselineSingleThread baselineSingleThread = new BaselineSingleThread(
-                        batchIps,
                         hbaseTable,
                         ATTACK_TYPE_LIST,
                         BASELINE_POINT_NUM,
-                        TIME_FILTER,
-                        allFromDruid,
+                        batchDruidData,
                         generateCountDownLatch
                 );
                 generationExecutor.execute(baselineSingleThread);
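The per-batch results above are folded into allFromDruid with Map.merge and ListUtils::union, so an IP that appears in several time slices ends up with a single combined row list. A standalone sketch of just that merging step (the class name MergeSketch and the sample IP and rows are invented for illustration):

import org.apache.commons.collections.ListUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        Map<String, List<Map<String, Object>>> all = new HashMap<>();
        Map<String, List<Map<String, Object>>> batch = new HashMap<>();

        // Result of a first time-slice query: one row for 10.0.0.1.
        Map<String, Object> row1 = new HashMap<>();
        row1.put("flow", 12);
        all.put("10.0.0.1", new ArrayList<>(Collections.singletonList(row1)));

        // Result of a second time-slice query: another row for the same IP.
        Map<String, Object> row2 = new HashMap<>();
        row2.put("flow", 7);
        batch.put("10.0.0.1", new ArrayList<>(Collections.singletonList(row2)));

        // merge keeps the existing rows and appends the new batch's rows for the same key.
        batch.forEach((ip, data) -> all.merge(ip, data, ListUtils::union));
        System.out.println(all.get("10.0.0.1").size()); // prints 2
    }
}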

View File

@@ -1,12 +1,9 @@
 package cn.mesalab.service;
 import cn.mesalab.config.ApplicationConfig;
-import cn.mesalab.dao.DruidData;
 import cn.mesalab.service.algorithm.KalmanFilter;
 import cn.mesalab.utils.HbaseUtils;
 import cn.mesalab.utils.SeriesUtils;
-import org.apache.calcite.avatica.AvaticaConnection;
-import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.commons.math3.stat.StatUtils;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
@@ -29,43 +26,38 @@ import java.util.stream.Collectors;
 public class BaselineSingleThread extends Thread {
     private static final Logger LOG = LoggerFactory.getLogger(BaselineSingleThread.class);
-    private List<String> ipList;
-    private Table hbaseTable;
-    private List<String> attackTypeList;
-    private Integer BASELINE_POINT_NUM;
-    private String timeFilter;
-    private List<Map<String, Object>> batchDruidData;
-    private List<Map<String, Object>> historicalData;
-    private CountDownLatch countDownLatch;
+    private final Table hbaseTable;
+    private final List<String> attackTypeList;
+    private final Integer historicalPointNum;
+    private final Map<String,List<Map<String, Object>>> batchDruidData;
+    private final CountDownLatch countDownLatch;

     public BaselineSingleThread(
-            List<String> batchIpList,
             Table hbaseTable,
             List<String> attackTypeList,
-            Integer BASELINE_POINT_NUM,
-            String timeFilter,
-            List<Map<String, Object>> historicalData,
+            Integer baselinePointNum,
+            Map<String,List<Map<String, Object>>> batchDruidData,
             CountDownLatch countDownLatch
     ){
-        this.ipList = batchIpList;
         this.hbaseTable = hbaseTable;
         this.attackTypeList = attackTypeList;
-        this.BASELINE_POINT_NUM = BASELINE_POINT_NUM;
-        this.timeFilter = timeFilter;
-        this.historicalData = historicalData;
+        this.historicalPointNum = baselinePointNum;
+        this.batchDruidData = batchDruidData;
         this.countDownLatch = countDownLatch;
     }

     @Override
     public void run(){
-        batchDruidData = DruidData.getBatchData(historicalData, ipList);
         List<Put> putList = new ArrayList<>();
         for(String attackType: attackTypeList){
-            for(String ip: ipList){
-                int[] ipBaseline = generateSingleIpBaseline(ip, attackType);
+            for(String ip: batchDruidData.keySet()){
+                // Filter this IP's rows for the given attack type
+                List<Map<String, Object>> ipDruidData = batchDruidData.get(ip).stream()
+                        .filter(i -> i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)).collect(Collectors.toList());
+                // Generate the baseline
+                int[] ipBaseline = generateSingleIpBaseline(ipDruidData);
                 if (ipBaseline!= null){
-                    putList = HbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
+                    HbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
                 }
             }
         }
@@ -76,46 +68,40 @@ public class BaselineSingleThread extends Thread {
             e.printStackTrace();
         } finally {
             countDownLatch.countDown();
-            LOG.info("This thread finished reading; remaining threads: " + countDownLatch.getCount());
+            LOG.info("This thread finished processing; remaining threads: " + countDownLatch.getCount());
         }
     }

     /**
      * Baseline generation logic for a single IP
-     * @param ip ip
-     * @param attackType attack type
      * @return baseline series, of length 60/HISTORICAL_GRAD*24
      */
-    private int[] generateSingleIpBaseline(String ip, String attackType){
-        // Query
-        List<Map<String, Object>> originSeries = DruidData.getTimeSeriesData(batchDruidData, ip, attackType);
-        if (originSeries.size()==0){
+    private int[] generateSingleIpBaseline(List<Map<String, Object>> ipDruidData){
+        if (ipDruidData.size()==0){
             return null;
         }
         // Fill missing points of the time series with 0
-        List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(originSeries);
-        int[] baselineArr = new int[BASELINE_POINT_NUM];
+        List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(ipDruidData);
+        int[] baselineArr = new int[historicalPointNum];
         List<Integer>series = completSeries.stream().map(
                 i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
         // Check how frequently the IP appears
-        if(originSeries.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_RATIO){
+        if(ipDruidData.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_RATIO){
             // High frequency
             double percentile = StatUtils.percentile(series.stream().mapToDouble(Double::valueOf).toArray(),
                     ApplicationConfig.BASELINE_SPARSE_FILL_PERCENTILE);
             Arrays.fill(baselineArr, (int)percentile);
             baselineArr = baselineFunction(series);
         } else {
             // Check for periodicity
             if (SeriesUtils.isPeriod(series)){
                 baselineArr = baselineFunction(series);
             } else {
                 int ipPercentile = SeriesUtils.percentile(
-                        originSeries.stream().map(i ->
+                        ipDruidData.stream().map(i ->
                                 Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()),
                         ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
                 Arrays.fill(baselineArr, ipPercentile);
@@ -135,11 +121,11 @@ public class BaselineSingleThread extends Thread {
         switch (ApplicationConfig.BASELINE_FUNCTION){
             case "KalmanFilter":
                 KalmanFilter kalmanFilter = new KalmanFilter();
-                kalmanFilter.forcast(timeSeries, BASELINE_POINT_NUM);
+                kalmanFilter.forcast(timeSeries, historicalPointNum);
                 result = kalmanFilter.getForecastSeries().stream().mapToInt(Integer::valueOf).toArray();
                 break;
             default:
-                result = timeSeries.subList(0, BASELINE_POINT_NUM).stream().mapToInt(Integer::valueOf).toArray();
+                result = timeSeries.subList(0, historicalPointNum).stream().mapToInt(Integer::valueOf).toArray();
         }
         return result;
     }

View File

@@ -50,9 +50,9 @@ public class SeriesUtils {
      * Pad the time-series data to the full window
      */
     public static List<Map<String, Object>> complementSeries(List<Map<String, Object>> originSeries){
-        LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(druidData.getTimeLimit()._2), TimeZone
+        LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(DruidData.getTimeLimit()._1), TimeZone
                 .getDefault().toZoneId());
-        LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(druidData.getTimeLimit()._1), TimeZone
+        LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(DruidData.getTimeLimit()._2), TimeZone
                 .getDefault().toZoneId());
         List<String> dateList = completionDate(startTime, endTime);
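The call-site swaps above follow from the change in DruidData.getTimeLimit(), which now returns Tuple.of(minTime, maxTime). A tiny sketch of the ordering callers assume after this commit (the class name TimeLimitSketch is invented for illustration):

import cn.mesalab.dao.DruidData;
import io.vavr.Tuple2;

public class TimeLimitSketch {
    public static void main(String[] args) {
        // After this commit, _1 is the start (min) and _2 the end (max) of the read window.
        Tuple2<Long, Long> window = DruidData.getTimeLimit();
        long startMillis = window._1;
        long endMillis = window._2;
        System.out.println("Druid read window: " + startMillis + " .. " + endMillis);
    }
}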

View File

@@ -26,6 +26,7 @@ read.druid.time.limit.type=1
 read.druid.min.time=1625414400000
 #06-01
 #read.druid.min.time=1622476800000
+#07-08
 read.druid.max.time=1625673600000
 # Read the past N days of data; minimum is 3 days (needed for the periodicity check)