update .gitignore

This commit is contained in:
yinjiangyi
2021-08-03 16:56:37 +08:00
parent a39609557c
commit 03849d5f3f
22 changed files with 0 additions and 22583 deletions

.gitignore

@@ -1,43 +0,0 @@
# Created by .ignore support plugin (hsz.mobi)
*.class
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.ear
*.zip
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
nbproject/private/
builds/
nbbuild/
dist/
nbdist/
.nb-gradle/
log/
logs/

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

pom.xml

@@ -1,89 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.mesalab</groupId>
<artifactId>generate-baselines</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>cn.mesalab.main.BaselineApplication</mainClass> <!-- main entry point -->
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.jfree</groupId>
<artifactId>jfreechart</artifactId>
<version>1.0.18</version>
</dependency>
<dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>io.vavr</groupId>
<artifactId>vavr</artifactId>
<version>0.10.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.1.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
</dependency>
</dependencies>
</project>

MANIFEST.MF

@@ -1,3 +0,0 @@
Manifest-Version: 1.0
Main-Class: cn.mesalab.main.BaselineApplication

cn/mesalab/config/ApplicationConfig.java

@@ -1,53 +0,0 @@
package cn.mesalab.config;
import cn.mesalab.utils.ConfigUtils;
/**
* @author yjy
* @version 1.0
* @date 2021/7/24 10:23 AM
*/
public class ApplicationConfig {
public static final String DRUID_URL = ConfigUtils.getStringProperty("druid.url");
public static final String DRUID_DRIVER = ConfigUtils.getStringProperty("druid.driver");
public static final String DRUID_TABLE = ConfigUtils.getStringProperty("druid.table");
public static final Integer DRUID_TIME_LIMIT_TYPE = ConfigUtils.getIntProperty("read.druid.time.limit.type");
public static final Long READ_DRUID_MAX_TIME = ConfigUtils.getLongProperty("read.druid.max.time");
public static final Long READ_DRUID_MIN_TIME = ConfigUtils.getLongProperty("read.druid.min.time");
public static final Integer READ_HISTORICAL_DAYS = ConfigUtils.getIntProperty("read.historical.days");
public static final Integer HISTORICAL_GRAD = ConfigUtils.getIntProperty("historical.grad");
public static final String TIME_FORMAT = ConfigUtils.getStringProperty("time.format");
public static final String BASELINE_METRIC_TYPE = ConfigUtils.getStringProperty("baseline.metric.type");
public static final String DRUID_ATTACKTYPE_TCP_SYN_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.tcpsynflood");
public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood");
public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood");
public static final String DRUID_ATTACKTYPE_DNS_AMPL = ConfigUtils.getStringProperty("druid.attacktype.dnsamplification");
public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.serverip.columnname");
public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.attacktype.columnname");
public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.recvtime.columnname");
public static final float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold");
public static final float BASELINE_HISTORICAL_RATIO = ConfigUtils.getFloatProperty("baseline.historical.ratio.threshold");
public static final float BASELINE_SPARSE_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.historical.sparse.fill.percentile");
public static final String BASELINE_FUNCTION = ConfigUtils.getStringProperty("baseline.function");
public static final Integer BASELINE_RANGE_DAYS = ConfigUtils.getIntProperty("baseline.range.days");
public static final float BASELINE_RATIONAL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.rational.percentile");
public static final String HBASE_TABLE = ConfigUtils.getStringProperty("hbase.table");
public static final String HBASE_ZOOKEEPER_QUORUM = ConfigUtils.getStringProperty("hbase.zookeeper.quorum");
public static final String HBASE_ZOOKEEPER_CLIENT_PORT = ConfigUtils.getStringProperty("hbase.zookeeper.client.port");
public static final Double BASELINE_KALMAN_Q = ConfigUtils.getDoubleProperty("baseline.kalman.q");
public static final Double BASELINE_KALMAN_R = ConfigUtils.getDoubleProperty("baseline.kalman.r");
public static final Integer LOG_WRITE_COUNT = ConfigUtils.getIntProperty("log.write.count");
public static final Integer GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("generate.batch.size");
}

cn/mesalab/dao/DruidData.java

@@ -1,187 +0,0 @@
package cn.mesalab.dao;
import cn.mesalab.config.ApplicationConfig;
import cn.mesalab.dao.Impl.ResultSetToListServiceImp;
import cn.mesalab.utils.DruidUtils;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author yjy
* @version 1.0
* Druid database operations
* @date 2021/7/23 4:56 PM
*/
public class DruidData {
private static final Logger LOG = LoggerFactory.getLogger(DruidData.class);
private static DruidData druidData;
private AvaticaConnection connection;
private AvaticaStatement statement;
private String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " >= MILLIS_TO_TIMESTAMP(" + getTimeLimit()._2
+ ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " < MILLIS_TO_TIMESTAMP(" + getTimeLimit()._1 + ")";
// instance initializer: opens the Druid connection as soon as an instance is created
{
connectionInit();
}
/**
* Initialize the Druid JDBC connection
*/
private void connectionInit(){
try {
connection = DruidUtils.getConn();
statement = connection.createStatement();
statement.setQueryTimeout(0);
} catch (SQLException exception) {
exception.printStackTrace();
}
}
/**
* Get an instance (note: a new instance is created on every call)
* @return DruidData instance
*/
public static DruidData getInstance() {
druidData = new DruidData();
return druidData;
}
/**
* Get the distinct server IPs
* @return ArrayList<String> of IPs
*/
public ArrayList<String> getServerIpList() {
Long startQueryIpListTime = System.currentTimeMillis();
ArrayList<String> serverIps = new ArrayList<String>();
String sql = "SELECT distinct " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ " FROM " + ApplicationConfig.DRUID_TABLE
+ " WHERE " + timeFilter
+ " LIMIT 10";// FOR TEST
try{
ResultSet resultSet = DruidUtils.executeQuery(statement,sql);
while(resultSet.next()){
String ip = resultSet.getString(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME);
serverIps.add(ip);
}
} catch (Exception e){
e.printStackTrace();
}
Long endQueryIpListTime = System.currentTimeMillis();
LOG.info("性能测试ip list查询耗时——"+(endQueryIpListTime-startQueryIpLIstTime));
return serverIps;
}
/**
* Read data for the target IPs from Druid
* @param ipList IP list
* @return query result rows
*/
public List<Map<String, Object>> readFromDruid(List<String> ipList){
List<Map<String, Object>> rsList = null;
ipList = ipList.stream().map(ip -> "'" + ip + "'").collect(Collectors.toList());
String ipString = "(" + StringUtils.join(ipList, ",") + ")";
String sql = "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+ ", "+ ApplicationConfig.BASELINE_METRIC_TYPE
+ ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+ " FROM " + ApplicationConfig.DRUID_TABLE
+ " WHERE " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+ " IN " + ipString
+ " AND " + timeFilter;
try{
ResultSet resultSet = DruidUtils.executeQuery(statement, sql);
ResultSetToListService service = new ResultSetToListServiceImp();
rsList = service.selectAll(resultSet);
} catch (Exception e){
e.printStackTrace();
}
return rsList;
}
/**
* Filter the query result down to one IP and one attack type
* @param allData query result rows
* @param ip target IP
* @param attackType target attack type
* @return filtered rows
*/
public List<Map<String, Object>> getTimeSeriesData(List<Map<String, Object>> allData, String ip, String attackType){
List<Map<String, Object>> rsList = new ArrayList<>();
try{
rsList = allData.stream().
filter(i->((i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME).equals(ip))
)&&(i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME).equals(attackType)))
.collect(Collectors.toList());
} catch (NullPointerException e){
// allData is null when the Druid query failed; fall through to the empty list
}
return rsList;
}
/**
* Compute the query time range: either an explicit range (for testing) or the default trailing window
* @return tuple of (max time, min time) in epoch millis
*/
public Tuple2<Long, Long> getTimeLimit(){
long maxTime = 0L;
long minTime = 0L;
switch(ApplicationConfig.DRUID_TIME_LIMIT_TYPE){
case 0:
maxTime = getCurrentDay();
minTime = getCurrentDay(-ApplicationConfig.READ_HISTORICAL_DAYS);
break;
case 1:
maxTime = ApplicationConfig.READ_DRUID_MAX_TIME;
minTime = ApplicationConfig.READ_DRUID_MIN_TIME;
break;
default:
LOG.warn("没有设置Druid数据读取方式");
}
return Tuple.of(maxTime, minTime);
}
private long getCurrentDay(int bias) {
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.DAY_OF_YEAR, bias);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
return calendar.getTimeInMillis();
}
private long getCurrentDay(){
return getCurrentDay(0);
}
/**
* Close the current DruidData connection
*/
public void closeConn(){
try {
DruidUtils.closeConnection();
} catch (SQLException exception) {
exception.printStackTrace();
}
}
}
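
For orientation, a minimal usage sketch of this class (not part of the commit; it assumes application.properties is on the classpath and the Druid broker configured in druid.url is reachable):

DruidData druid = DruidData.getInstance();
List<String> ips = druid.getServerIpList();
if (!ips.isEmpty()) {
    // read one small batch, mirroring what BaselineGeneration does per worker
    List<Map<String, Object>> rows = druid.readFromDruid(ips.subList(0, Math.min(10, ips.size())));
    System.out.println("rows fetched: " + (rows == null ? 0 : rows.size()));
}
druid.closeConn();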

cn/mesalab/dao/Impl/ResultSetToListServiceImp.java

@@ -1,44 +0,0 @@
package cn.mesalab.dao.Impl;
import cn.mesalab.dao.ResultSetToListService;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author yjy
* @version 1.0
* @date 2021/7/24 4:29 PM
*/
public class ResultSetToListServiceImp implements ResultSetToListService {
/**
* Return the SELECT result as a List: each element is one row.
* Each row is a Map<String, Object> keyed by column name, with the column value as the value.
*
* @param rs result set
* @return List<Map<String, Object>>
*/
@Override
public List<Map<String, Object>> selectAll(ResultSet rs) {
List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
try {
ResultSetMetaData rmd = rs.getMetaData();
int columnCount = rmd.getColumnCount();
while (rs.next()) {
Map<String, Object> rowData = new HashMap<String, Object>();
for (int i = 1; i <= columnCount; ++i) {
rowData.put(rmd.getColumnName(i), rs.getObject(i));
}
list.add(rowData);
}
} catch (Exception ex) {
ex.printStackTrace();
}
return list;
}
}

cn/mesalab/dao/ResultSetToListService.java

@@ -1,24 +0,0 @@
package cn.mesalab.dao;
import java.sql.ResultSet;
import java.util.List;
import java.util.Map;
/**
* @author yjy
* @version 1.0
* @date 2021/7/24 4:27 PM
*/
public interface ResultSetToListService {
/**
* e.g. SELECT * FROM websites
* Returns all records as a List;
* each element of the list is one record,
* stored as a Map<String, Object> keyed by column name, with the column value as the value.
*
* @param rs result set
* @return List<Map<String, Object>>
*/
public List<Map<String, Object>> selectAll(ResultSet rs);
}

cn/mesalab/main/BaselineApplication.java

@@ -1,15 +0,0 @@
package cn.mesalab.main;
import cn.mesalab.service.BaselineGeneration;
/**
* @author yjy
* @version 1.0
* @date 2021/7/23 5:34 PM
*/
public class BaselineApplication {
public static void main(String[] args) {
BaselineGeneration.perform();
}
}

cn/mesalab/service/BaselineGeneration.java

@@ -1,206 +0,0 @@
package cn.mesalab.service;
import cn.mesalab.config.ApplicationConfig;
import cn.mesalab.dao.DruidData;
import cn.mesalab.service.algorithm.KalmanFilter;
import cn.mesalab.utils.HbaseUtils;
import cn.mesalab.utils.SeriesUtils;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* @author yjy
* @version 1.0
* Baseline generation and persistence
* @date 2021/7/23 5:38 PM
*/
public class BaselineGeneration {
private static final Logger LOG = LoggerFactory.getLogger(BaselineGeneration.class);
private static DruidData druidData;
private static HbaseUtils hbaseUtils;
private static Table hbaseTable;
// NOTE: shared across worker threads and overwritten by each generateBaselines call
private static List<Map<String, Object>> batchDruidData = new ArrayList<>();
private static List<String> attackTypeList = Arrays.asList(
ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD,
ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD,
ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL
);
private static final Integer BASELINE_POINT_NUM =
ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
/**
* Entry point: generate all baselines and write them to HBase
*/
public static void perform() {
long start = System.currentTimeMillis();
druidData = DruidData.getInstance();
hbaseUtils = HbaseUtils.getInstance();
hbaseTable = hbaseUtils.getHbaseTable();
LOG.info("Druid 成功建立连接");
try{
// baseline生成并写入
generateBaselinesThread();
long last = System.currentTimeMillis();
LOG.warn("运行时间:" + (last - start));
druidData.closeConn();
hbaseTable.close();
LOG.info("Druid 关闭连接");
} catch (Exception e){
e.printStackTrace();
}
System.exit(0);
}
/**
* Multi-threaded baseline generation entry point
* @throws InterruptedException
*/
private static void generateBaselinesThread() throws InterruptedException {
int threadNum = Runtime.getRuntime().availableProcessors();
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("baseline-demo-%d").build();
// create the thread pool
ThreadPoolExecutor executor = new ThreadPoolExecutor(
threadNum,
threadNum,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
// fetch the server-IP list
ArrayList<String> destinationIps = druidData.getServerIpList();
LOG.info("Found " + destinationIps.size() + " server IPs");
LOG.info("Baseline batch size: " + ApplicationConfig.GENERATE_BATCH_SIZE);
// generate and persist baselines batch by batch
List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.GENERATE_BATCH_SIZE);
for (List<String> batchIps: batchIpLists){
if(batchIps.size()>0){
executor.execute(() -> generateBaselines(batchIps));
}
}
executor.shutdown();
executor.awaitTermination(10L, TimeUnit.HOURS);
}
/**
* Generate baselines for a batch of IPs
* @param ipList IP list
*/
public static void generateBaselines(List<String> ipList){
druidData = DruidData.getInstance();
batchDruidData = druidData.readFromDruid(ipList);
List<Put> putList = new ArrayList<>();
for(String attackType: attackTypeList){
for(String ip: ipList){
int[] ipBaseline = generateSingleIpBaseline(ip, attackType);
if (ipBaseline!= null){
putList = hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType, ApplicationConfig.BASELINE_METRIC_TYPE);
}
}
}
try {
hbaseTable.put(putList);
LOG.info("Baseline 线程 " + Thread.currentThread().getId() + " 成功写入Baseline条数共计 " + putList.size());
} catch (IOException e) {
e.printStackTrace();
}
druidData.closeConn();
}
/**
* Baseline generation for a single IP
* @param ip IP
* @param attackType attack type
* @return baseline series of length 60/HISTORICAL_GRAD*24
*/
private static int[] generateSingleIpBaseline(String ip, String attackType){
// filter the batch data down to this IP and attack type
List<Map<String, Object>> originSeries = druidData.getTimeSeriesData(batchDruidData, ip, attackType);
if (originSeries.size()==0){
return null;
}
// fill missing points in the time series with 0
List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(originSeries);
int[] baselineArr = new int[BASELINE_POINT_NUM];
List<Integer>series = completSeries.stream().map(
i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
// branch on how often this IP appears in the window
if(originSeries.size()/(float)completSeries.size()>ApplicationConfig.BASELINE_HISTORICAL_RATIO){
// high-frequency IP: smooth and forecast the full series
baselineArr = baselineFunction(series);
} else {
// low frequency: check for a daily period
if (SeriesUtils.isPeriod(series)){
baselineArr = baselineFunction(series);
} else {
int ipPercentile = SeriesUtils.percentile(
originSeries.stream().map(i ->
Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()),
ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
Arrays.fill(baselineArr, ipPercentile);
}
}
return baselineArr;
}
/**
* Baseline generation algorithm
* @param timeSeries input series
* @return generated baseline series
*/
private static int[] baselineFunction(List<Integer> timeSeries){
int[] result;
switch (ApplicationConfig.BASELINE_FUNCTION){
case "KalmanFilter":
KalmanFilter kalmanFilter = new KalmanFilter();
kalmanFilter.forecast(timeSeries, BASELINE_POINT_NUM);
result = kalmanFilter.getForecastSeries().stream().mapToInt(Integer::valueOf).toArray();
break;
default:
result = timeSeries.subList(0, BASELINE_POINT_NUM).stream().mapToInt(Integer::valueOf).toArray();
}
return result;
}
public static void main(String[] args) {
perform();
}
}

cn/mesalab/service/algorithm/KalmanFilter.java

@@ -1,90 +0,0 @@
package cn.mesalab.service.algorithm;
import cn.mesalab.config.ApplicationConfig;
import java.util.ArrayList;
import java.util.List;
/**
* @author yjy
* @version 1.0
* Kalman filter
* @date 2021/7/25 1:42 PM
*/
public class KalmanFilter {
private Integer predict;
private Integer current;
private Integer estimate;
private double pdelt;
private double mdelt;
private double Gauss;
private double kalmanGain;
private final static double Q = ApplicationConfig.BASELINE_KALMAN_Q;
private final static double R = ApplicationConfig.BASELINE_KALMAN_R;
public KalmanFilter() {
initial();
}
public void initial(){
// TODO: tune the initial variances
pdelt = 1;
mdelt = 1;
}
private ArrayList<Integer> smoothSeries;
private ArrayList<Integer> forecastSeries;
public Integer calSingleKalPoint(Integer oldValue, Integer value){
// previous estimate
predict = oldValue;
current = value;
// Gaussian noise variance
Gauss = Math.sqrt(pdelt * pdelt + mdelt * mdelt) + Q;
// Kalman gain
kalmanGain = Math.sqrt((Gauss * Gauss)/(Gauss * Gauss + pdelt * pdelt)) + R;
// new estimate
estimate = (int) (kalmanGain * (current - predict) + predict);
// updated estimate variance
mdelt = Math.sqrt((1-kalmanGain) * Gauss * Gauss);
return estimate;
}
public void forecast(List<Integer> historicalSeries, Integer length){
int oldValue = (historicalSeries.stream().mapToInt(Integer::intValue).sum())/historicalSeries.size();
smoothSeries = new ArrayList<Integer>();
for(int i = 0; i < historicalSeries.size(); i++){
int value = historicalSeries.get(i);
oldValue = calSingleKalPoint(oldValue, value);
smoothSeries.add(oldValue);
}
forecastSeries = new ArrayList<>();
// average the smoothed series across whole periods of the requested length
int partitionNum = historicalSeries.size()/length;
for(int i = 0; i<length; i++){
long sum = 0;
for (int period=0; period<partitionNum; period++){
sum += smoothSeries.get(length*period+i);
}
forecastSeries.add((int)(sum/partitionNum));
}
}
public ArrayList<Integer> getSmoothSeries() {
return smoothSeries;
}
public ArrayList<Integer> getAllRangeSeries() {
ArrayList<Integer> results = new ArrayList<>();
results.addAll(smoothSeries);
results.addAll(forecastSeries);
return results;
}
public ArrayList<Integer> getForecastSeries() {
return forecastSeries;
}
}
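
A minimal sketch of the filter's contract, on hypothetical data (Q and R still come from application.properties via ApplicationConfig):

// three days of a repeating daily pattern at 10-minute granularity (144 points per day)
List<Integer> history = new ArrayList<>();
for (int i = 0; i < 3 * 144; i++) {
    history.add(100 + (i % 144));
}
KalmanFilter kf = new KalmanFilter();
kf.forecast(history, 144);                          // smooth, then average across the 3 daily periods
System.out.println(kf.getForecastSeries().size());  // 144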

cn/mesalab/utils/ConfigUtils.java

@@ -1,45 +0,0 @@
package cn.mesalab.utils;
import org.apache.log4j.Logger;
import java.util.Properties;
public class ConfigUtils {
private static final Logger LOG = Logger.getLogger(ConfigUtils.class);
private static Properties propCommon = new Properties();
public static String getStringProperty(String key) {
return propCommon.getProperty(key);
}
public static Float getFloatProperty(String key) {
return Float.parseFloat(propCommon.getProperty(key));
}
public static Integer getIntProperty(String key) {
return Integer.parseInt(propCommon.getProperty(key));
}
public static Long getLongProperty(String key) {
return Long.parseLong(propCommon.getProperty(key));
}
public static Double getDoubleProperty(String key) {
return Double.parseDouble(propCommon.getProperty(key));
}
public static Boolean getBooleanProperty(String key) {
return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
}
static {
try {
propCommon.load(ConfigUtils.class.getClassLoader().getResourceAsStream("application.properties"));
} catch (Exception e) {
propCommon = null;
LOG.error("Failed to load application.properties", e);
}
}
}
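
Usage is one static call per key; for example (key taken from the application.properties later in this diff):

Integer grad = ConfigUtils.getIntProperty("historical.grad"); // aggregation granularity in minutes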

cn/mesalab/utils/DruidUtils.java

@@ -1,55 +0,0 @@
package cn.mesalab.utils;
import cn.mesalab.config.ApplicationConfig;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
/**
* @author yjy
* @version 1.0
* @date 2021/7/23 4:50 PM
*/
public class DruidUtils {
private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<AvaticaConnection>();
private static final String DRUID_URL = ApplicationConfig.DRUID_URL;
/**
* Open a connection for the current thread
* @throws SQLException
*/
public static AvaticaConnection getConn() throws SQLException {
Properties properties = new Properties();
properties.setProperty("connectTimeout", String.valueOf(10*60*60));
AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties);
threadLocal.set(connection);
return connection;
}
/**
* Close the current thread's connection
*/
public static void closeConnection() throws SQLException{
AvaticaConnection conn = threadLocal.get();
if(conn != null){
conn.close();
threadLocal.remove();
}
}
/**
* Execute a SQL query
*/
public static ResultSet executeQuery (AvaticaStatement statement, String sql) throws SQLException{
return statement.executeQuery(sql);
}
}

(unnamed file under cn.mesalab.utils)

@@ -1 +0,0 @@
package cn.mesalab.utils;

cn/mesalab/utils/HttpClientService.java

@@ -1,6 +0,0 @@
package cn.mesalab.utils;

/**
* @author yjy
* @date 2021/8/3 3:57 PM
* @version 1.0
*/
public class HttpClientService {
}

cn/mesalab/utils/SeriesUtils.java

@@ -1,212 +0,0 @@
package cn.mesalab.utils;
import cn.mesalab.config.ApplicationConfig;
import cn.mesalab.dao.DruidData;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.FileReader;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
/**
* @author joy
*/
public class SeriesUtils {
private static final Logger LOG = LoggerFactory.getLogger(SeriesUtils.class);
// note: constructing DruidData opens a Druid connection when this class initializes
private static DruidData druidData = new DruidData();
public static List<Map<String, Object>> readCsvToList(String filePath) {
List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
String line;
try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
br.readLine();
while ((line = br.readLine()) != null) {
List<String> column = Arrays.asList(line.split(","));
// store each <column name, column value> pair of the record
Map<String, Object> rowData = new HashMap<String, Object>();
rowData.put("__time", column.get(0));
rowData.put(ApplicationConfig.BASELINE_METRIC_TYPE, Integer.valueOf(column.get(1)));
list.add(rowData);
}
} catch (Exception e) {
e.printStackTrace();
}
return list;
}
/**
* Complete the time series: insert zero-valued points for missing timestamps
*/
public static List<Map<String, Object>> complementSeries(List<Map<String, Object>> originSeries){
LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(druidData.getTimeLimit()._2), TimeZone
.getDefault().toZoneId());
LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(druidData.getTimeLimit()._1), TimeZone
.getDefault().toZoneId());
List<String> dateList = completionDate(startTime, endTime);
// the completed series
List<Map<String, Object>> result = new ArrayList<>();
boolean dbDateExist = false;
for (String date : dateList) {
//originSeries holds the DB query rows (List<Map<String, Object>>)
for (Map<String, Object> row : originSeries) {
if (row.get(ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME).toString().substring(0,19).equals(date)) {
//the series already contains this timestamp
dbDateExist = true;
result.add(row);
break;
}
}
//append a zero-filled point for the missing timestamp
if (!dbDateExist) {
Map<String, Object> temp = new HashMap<>(2);
temp.put(ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME, date);
temp.put(ApplicationConfig.BASELINE_METRIC_TYPE, 0);
result.add(temp);
}
dbDateExist = false;
}
return result;
}
private static List<String> completionDate(LocalDateTime startTime, LocalDateTime endTime) {
//timestamp formatting
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(ApplicationConfig.TIME_FORMAT);
List<String> timeList = new ArrayList<>();
//walk the given range in HISTORICAL_GRAD-minute steps
for (int i = 0; !Duration.between(startTime.plusMinutes(i+1), endTime).isNegative(); i+= ApplicationConfig.HISTORICAL_GRAD) {
//add the timestamp
timeList.add(startTime.plusMinutes(i).format(formatter));
}
return timeList;
}
/**
* Check whether the series shows a daily period
* @param historicalSeries input series
* @return true if the last day correlates with the average of the earlier days
*/
public static Boolean isPeriod(List<Integer> historicalSeries){
Boolean result = true;
List<List<Integer>> partitions = Lists.partition(historicalSeries, 24*60/ApplicationConfig.HISTORICAL_GRAD);
List<Integer> aggregatedPart = Arrays.asList();
try{
aggregatedPart = columnAverage(partitions.subList(0, ApplicationConfig.READ_HISTORICAL_DAYS-1));
} catch (IndexOutOfBoundsException e){
LOG.error("Not enough historical partitions to build the daily aggregate");
}
// Pearson corrcoef
double pearsonCorrelationScore = getPearsonCorrelationScore(aggregatedPart.stream().mapToInt(Integer::valueOf).toArray(),
partitions.get(partitions.size() - 1).stream().mapToInt(Integer::valueOf).toArray());
if (pearsonCorrelationScore < ApplicationConfig.BASELINE_PERIOD_CORR_THRE){
result=false;
}
return result;
}
public static double getPearsonCorrelationScore(int[] xData, int[] yData) {
if (xData.length != yData.length) {
Log.error("Pearson CorrelationScore 数组长度不相等!");
}
int xMeans;
int yMeans;
double numerator = 0;
double denominator = 0;
double result = 0;
// means of the two series
xMeans = (int) getMeans(xData);
yMeans = (int) getMeans(yData);
// numerator of the Pearson coefficient
numerator = generateNumerator(xData, xMeans, yData, yMeans);
// denominator of the Pearson coefficient
denominator = generateDenominator(xData, xMeans, yData, yMeans);
// the coefficient itself
if(denominator>0) {
result = numerator / denominator;
}
return result;
}
private static int generateNumerator(int[] xData, int xMeans, int[] yData, int yMeans) {
int numerator = 0;
for (int i = 0; i < xData.length; i++) {
numerator += (xData[i] - xMeans) * (yData[i] - yMeans);
}
return numerator;
}
private static double generateDenominator(int[] xData, int xMeans, int[] yData, int yMeans) {
double xSum = 0.0;
for (int i = 0; i < xData.length; i++) {
xSum += (xData[i] - xMeans) * (xData[i] - xMeans);
}
double ySum = 0.0;
for (int i = 0; i < yData.length; i++) {
ySum += (yData[i] - yMeans) * (yData[i] - yMeans);
}
return Math.sqrt(xSum) * Math.sqrt(ySum);
}
private static double getMeans(int[] datas) {
double sum = 0.0;
for (int i = 0; i < datas.length; i++) {
sum += datas[i];
}
return sum / datas.length;
}
public static List<Integer> columnAverage(List<List<Integer>> list){
ArrayList<Integer> averages = new ArrayList<>();
for(int i=0; i<list.get(0).size(); i++){
int columnSum = 0;
for(int j = 0; j< list.size(); j++){
columnSum += list.get(j).get(i);
}
averages.add(columnSum / list.size());
}
return averages;
}
/**
* Nearest-rank percentile; expects percentile in (0, 1]. Note: sorts the input list in place.
*/
public static int percentile(List<Integer> latencies, double percentile) {
Collections.sort(latencies);
int index = (int) Math.ceil(percentile * latencies.size());
return latencies.get(index-1);
}
public static void main(String[] args) {
List<Integer> test = Arrays.asList(
1,2,3,4,5,
1,2,3,4,5,
1,2,3,4,5,
1,2,3,4,5,
1,2,3,4,5,
1,2,3,4,5,
1,2,3,4,5);
System.out.println(columnAverage(Lists.partition(test, 5)));
}
}

application.properties

@@ -1,58 +0,0 @@
# Druid configuration
druid.url=jdbc:avatica:remote:url=http://192.168.44.12:8082/druid/v2/sql/avatica/
druid.driver=org.apache.calcite.avatica.remote.Driver
druid.table=top_server_ip_test_log
# column mappings
druid.attacktype.tcpsynflood=sessions
druid.attacktype.udpflood=bytes
druid.attacktype.icmpflood=packets
druid.attacktype.dnsamplification=packets
druid.serverip.columnname=destination
druid.attacktype.columnname=order_by
druid.recvtime.columnname=__time
# metric used for baseline generation
baseline.metric.type=session_num
# HBase configuration
hbase.table=ddos_traffic_baselines
hbase.zookeeper.quorum=192.168.44.12
hbase.zookeeper.client.port=2181
# Druid read-range mode: 0 = trailing window of read.historical.days days; 1 = explicit min/max time
read.druid.time.limit.type=1
# 07-01
read.druid.min.time=1625068800000
# 06-01
#read.druid.min.time=1622476800000
read.druid.max.time=1625673600000
# read the past N days of data; minimum 3 days (the periodicity check needs them)
read.historical.days=7
# aggregation granularity of the historical data, in minutes
historical.grad=10
# baseline generation algorithm
baseline.function=KalmanFilter
# baseline span: 1 day
baseline.range.days=1
# time format of the DB time column
time.format=yyyy-MM-dd HH:mm:ss
# algorithm parameters
baseline.period.correlative.threshold=0.5
baseline.historical.ratio.threshold=0.1
baseline.historical.sparse.fill.percentile=0.95
baseline.rational.percentile=0.95
# Kalman filter
baseline.kalman.q=0.000001
baseline.kalman.r=0.002
# print a log line every log.write.count written records
log.write.count=10000
# FOR TEST
generate.batch.size=1
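
As a cross-check of these values, the baseline length that BaselineGeneration derives from them works out as follows (a sketch, not code from the commit):

// baseline.range.days=1 and historical.grad=10 give 1 * 24 * (60 / 10) = 144 points,
// i.e. one baseline value per 10-minute bucket over one day
int points = ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60 / ApplicationConfig.HISTORICAL_GRAD);
System.out.println(points); // 144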

log4j.properties

@@ -1,19 +0,0 @@
######################### logger ##############################
log4j.logger.org.apache.http=OFF
log4j.logger.org.apache.http.wire=OFF
#Log4j
log4j.rootLogger=debug,console,file
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=info
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=info
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
log4j.appender.file.file=./logs/ddos_baselines.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n

cn/mesalab/service/HBaseTest.java

@@ -1,92 +0,0 @@
package cn.mesalab.service;
/**
* @author yjy
* @version 1.0
* @date 2021/8/3 11:21 AM
*/
import cn.mesalab.config.ApplicationConfig;
import cn.mesalab.dao.DruidData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class HBaseTest {
public static void main(String[] args) throws IOException {
Configuration config = HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM);
config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT);
TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE);
Connection conn = ConnectionFactory.createConnection(config);
Table table = conn.getTable(tableName);
DruidData druidData = DruidData.getInstance();
ArrayList<String> destinationIps = druidData.getServerIpList();
for (String ip : destinationIps){
Get abcGet = new Get(Bytes.toBytes(ip));
Result r = table.get(abcGet);
ArrayWritable w = new ArrayWritable(IntWritable.class);
List<String> attackTypeList = Arrays.asList(
"TCP SYN Flood",
"ICMP Flood",
"UDP Flood",
"DNS Amplification"
);
for (String attackType : attackTypeList){
byte[] session_nums = r.getValue(Bytes.toBytes(attackType), Bytes.toBytes("session_num"));
if (session_nums==null){
continue;
}
w.readFields(new DataInputStream(new ByteArrayInputStream(session_nums)));
ArrayList<Integer> arr2 = fromWritable(w);
System.out.println(ip + "-" + attackType + ": " + arr2.toString());
}
}
// Get abcGet = new Get(Bytes.toBytes("1.0.0.1"));
// Result r = table.get(abcGet);
// ArrayWritable w = new ArrayWritable(IntWritable.class);
// w.readFields(new DataInputStream(new ByteArrayInputStream(r.getValue(Bytes.toBytes("TCP SYN Flood"), Bytes.toBytes("session_num")))));
// ArrayList<Integer> arr2 = fromWritable(w);
// System.out.println(arr2.toString());
}
public static Writable toWritable(int[] arr) {
Writable[] content = new Writable[arr.length];
for (int i = 0; i < content.length; i++) {
content[i] = new IntWritable(arr[i]);
}
return new ArrayWritable(IntWritable.class, content);
}
public static ArrayList<Integer> fromWritable(ArrayWritable writable) {
Writable[] writables = writable.get();
ArrayList<Integer> list = new ArrayList<Integer>(writables.length);
for (Writable wrt : writables) {
list.add(((IntWritable)wrt).get());
}
return list;
}
}