diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c446418 --- /dev/null +++ b/.gitignore @@ -0,0 +1,43 @@ +# Created by .ignore support plugin (hsz.mobi) +*.class + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.ear +*.zip + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + + +target/ +!.mvn/wrapper/maven-wrapper.jar + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +nbproject/private/ +builds/ +nbbuild/ +dist/ +nbdist/ +.nb-gradle/ + +log/ +logs/ \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..20e75ff --- /dev/null +++ b/pom.xml @@ -0,0 +1,114 @@ + + + 4.0.0 + + cn.mesalab + generate-baselines + 1.0-SNAPSHOT + + + + org.apache.maven.plugins + maven-jar-plugin + 3.0.2 + + + + true + cn.mesalab.main.BaselineApplication + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + + + + nexus + Team Nexus Repository + http://192.168.40.125:8099/content/groups/public + + + + + nexus + Team Nexus Repository + http://192.168.40.125:8099/content/groups/public + + + + + + org.apache.hbase + hbase-client + 2.2.3 + + + + org.jfree + jfreechart + 1.0.18 + + + + org.apache.calcite.avatica + avatica-core + 1.15.0 + + + + com.typesafe + config + 1.2.1 + + + + com.google.protobuf + protobuf-java + 3.5.1 + + + + io.vavr + vavr + 0.10.2 + + + + org.springframework + spring-core + 5.1.4.RELEASE + + + org.slf4j + slf4j-api + 1.7.26 + + + com.zdjizhi + galaxy + 1.0.6 + + + org.slf4j + slf4j-log4j12 + + + + + + + + + \ No newline at end of file diff --git a/src/META-INF/MANIFEST.MF b/src/META-INF/MANIFEST.MF new file mode 100644 index 0000000..4993e7c --- /dev/null +++ b/src/META-INF/MANIFEST.MF @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +Main-Class: cn.mesalab.main.BaselineApplication + diff --git 
a/src/main/java/cn/mesalab/config/ApplicationConfig.java b/src/main/java/cn/mesalab/config/ApplicationConfig.java new file mode 100644 index 0000000..68b8c50 --- /dev/null +++ b/src/main/java/cn/mesalab/config/ApplicationConfig.java @@ -0,0 +1,64 @@
+package cn.mesalab.config;
+
+import cn.mesalab.utils.ConfigUtils;
+
+/**
+ * Central, eagerly-loaded configuration constants. Every field is read once
+ * from application.properties via ConfigUtils at class-load time.
+ * NOTE(review): if ConfigUtils failed to load the properties file, these
+ * initializers throw at class load — confirm startup behaviour on a missing file.
+ *
+ * @author yjy
+ * @version 1.0
+ * @date 2021/7/24 10:23 AM
+ */
+public class ApplicationConfig {
+
+    // Druid (Avatica) connection and source table
+    public static final String DRUID_URL= ConfigUtils.getStringProperty("druid.url");
+    public static final String DRUID_DRIVER = ConfigUtils.getStringProperty("druid.driver");
+    public static final String DRUID_TABLE = ConfigUtils.getStringProperty("druid.table");
+
+    // Query-window selection: 0 = rolling window of READ_HISTORICAL_DAYS days,
+    // 1 = fixed [min, max] bounds below (used by DruidData.getTimeLimit)
+    public static final Integer DRUID_TIME_LIMIT_TYPE = ConfigUtils.getIntProperty("read.druid.time.limit.type");
+    public static final Long READ_DRUID_MAX_TIME = ConfigUtils.getLongProperty("read.druid.max.time");
+    public static final Long READ_DRUID_MIN_TIME = ConfigUtils.getLongProperty("read.druid.min.time");
+
+    public static final Integer READ_HISTORICAL_DAYS = ConfigUtils.getIntProperty("read.historical.days");
+    // bucket granularity in minutes (BaselineGeneration derives points/day as 24 * 60/grad)
+    public static final Integer HISTORICAL_GRAD = ConfigUtils.getIntProperty("historical.grad");
+    public static final String TIME_FORMAT = ConfigUtils.getStringProperty("time.format");
+    // name of the metric column read from Druid and used as the HBase qualifier
+    public static final String BASELINE_METRIC_TYPE = ConfigUtils.getStringProperty("baseline.metric.type");
+
+    // attack-type values as stored in the Druid attack-type column
+    public static final String DRUID_ATTACKTYPE_TCP_SYN_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.tcpsynflood");
+    public static final String DRUID_ATTACKTYPE_UDP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.udpflood");
+    public static final String DRUID_ATTACKTYPE_ICMP_FLOOD = ConfigUtils.getStringProperty("druid.attacktype.icmpflood");
+    public static final String DRUID_ATTACKTYPE_DNS_AMPL = ConfigUtils.getStringProperty("druid.attacktype.dnsamplification");
+    // column names in the Druid table
+    public static final String DRUID_SERVERIP_COLUMN_NAME = ConfigUtils.getStringProperty("druid.serverip.columnname");
+    public static final String DRUID_ATTACKTYPE_COLUMN_NAME = ConfigUtils.getStringProperty("druid.attacktype.columnname");
+    public static final String DRUID_RECVTIME_COLUMN_NAME = ConfigUtils.getStringProperty("druid.recvtime.columnname");
+
+    // baseline algorithm thresholds (see BaselineGeneration.generateSingleIpBaseline)
+    public static final float BASELINE_PERIOD_CORR_THRE = ConfigUtils.getFloatProperty("baseline.period.correlative.threshold");
+    public static final float BASELINE_HISTORICAL_RATIO = ConfigUtils.getFloatProperty("baseline.historical.ratio.threshold");
+    public static final float BASELINE_SPARSE_FILL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.historical.sparse.fill.percentile");
+    public static final String BASELINE_FUNCTION = ConfigUtils.getStringProperty("baseline.function");
+    public static final Integer BASELINE_RANGE_DAYS = ConfigUtils.getIntProperty("baseline.range.days");
+    public static final float BASELINE_RATIONAL_PERCENTILE = ConfigUtils.getFloatProperty("baseline.rational.percentile");
+
+    // HBase sink
+    public static final String HBASE_TABLE = ConfigUtils.getStringProperty("hbase.table");
+    public static final String HBASE_ZOOKEEPER_QUORUM= ConfigUtils.getStringProperty("hbase.zookeeper.quorum");
+    public static final String HBASE_ZOOKEEPER_CLIENT_PORT= ConfigUtils.getStringProperty("hbase.zookeeper.client.port");
+
+    // Kalman filter noise terms (see KalmanFilter)
+    public static final Double BASELINE_KALMAN_Q = ConfigUtils.getDoubleProperty("baseline.kalman.q");
+    public static final Double BASELINE_KALMAN_R = ConfigUtils.getDoubleProperty("baseline.kalman.r");
+
+    public static final Integer LOG_WRITE_COUNT = ConfigUtils.getIntProperty("log.write.count");
+    // number of IPs handled per worker-thread batch
+    public static final Integer GENERATE_BATCH_SIZE = ConfigUtils.getIntProperty("generate.batch.size");
+
+
+
+    // http config
+
+    public static final Integer HTTP_REQUEST_TIMEOUT = ConfigUtils.getIntProperty("http.request.timeout");
+    public static final Integer HTTP_RESPONSE_TIMEOUT = ConfigUtils.getIntProperty("http.response.timeout");
+    public static final Integer HTTP_CONNECTION_TIMEOUT = ConfigUtils.getIntProperty("http.connection.timeout");
+    public static final Integer HTTP_MAX_CONNECTION_NUM = ConfigUtils.getIntProperty("http.max.connection.num");
+    public static final Integer HTTP_MAX_PER_ROUTE = ConfigUtils.getIntProperty("http.max.per.route");
+
+}
+
 diff --git a/src/main/java/cn/mesalab/dao/DruidData.java b/src/main/java/cn/mesalab/dao/DruidData.java new file mode 100644 index 0000000..ec28278 --- /dev/null +++ b/src/main/java/cn/mesalab/dao/DruidData.java @@ -0,0 +1,187 @@
+package cn.mesalab.dao;
+
+import cn.mesalab.config.ApplicationConfig;
+import cn.mesalab.dao.Impl.ResultSetToListServiceImp;
+import cn.mesalab.utils.DruidUtils;
+import io.vavr.Tuple;
+import io.vavr.Tuple2;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+
+/**
+ * Druid database access (reads the attack-metric table via Avatica JDBC).
+ *
+ * @author yjy
+ * @version 1.0
+ * @date 2021/7/23 4:56 PM
+ */
+public class DruidData {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DruidData.class);
+    private static DruidData druidData;
+    private AvaticaConnection connection;
+    private AvaticaStatement statement;
+    // SQL fragment bounding rows to [minTime, maxTime); note getTimeLimit()
+    // returns (max, min), so _2 is the lower bound and _1 the upper bound.
+    // NOTE(review): getTimeLimit() is evaluated twice here — for mode 0 the two
+    // calls could straddle midnight; confirm this is acceptable.
+    private String timeFilter = ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+            + " >= MILLIS_TO_TIMESTAMP(" + getTimeLimit()._2
+            + ") AND " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+            + " < MILLIS_TO_TIMESTAMP(" + getTimeLimit()._1 + ")";
+
+    // instance initializer: every new DruidData opens its own connection/statement
+    {
+        connectionInit();
+    }
+
+    /**
+     * Opens the Avatica connection and creates an unbounded-timeout statement.
+     */
+    private void connectionInit(){
+        try {
+            connection = DruidUtils.getConn();
+            statement = connection.createStatement();
+            // 0 = no query timeout
+            statement.setQueryTimeout(0);
+
+        } catch (SQLException exception) {
+
LOG.error("Druid connection init failed", exception);
+        }
+    }
+
+    /**
+     * Factory method. NOTE: despite the singleton-style name this creates a
+     * fresh DruidData (with its own connection/statement) on every call; each
+     * worker thread is expected to obtain and close its own instance.
+     * @return a new DruidData instance
+     */
+    public static DruidData getInstance() {
+        druidData = new DruidData();
+        return druidData;
+    }
+
+    /**
+     * Queries the distinct server IPs seen inside the configured time window.
+     * @return distinct server IPs (capped at 1000 — test limit left in the SQL)
+     */
+    public ArrayList<String> getServerIpList() {
+        long startQueryIpListTime = System.currentTimeMillis();
+        ArrayList<String> serverIps = new ArrayList<>();
+        String sql = "SELECT distinct " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+                + " FROM " + ApplicationConfig.DRUID_TABLE
+                + " WHERE " + timeFilter
+                + " LIMIT 1000";// FOR TEST
+        // try-with-resources so the ResultSet is always released (was leaked before)
+        try (ResultSet resultSet = DruidUtils.executeQuery(statement, sql)) {
+            while (resultSet.next()) {
+                serverIps.add(resultSet.getString(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME));
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to query server ip list", e);
+        }
+        long endQueryIpListTime = System.currentTimeMillis();
+        LOG.info("性能测试:ip list查询耗时——"+(endQueryIpListTime-startQueryIpListTime));
+
+        return serverIps;
+    }
+
+    /**
+     * Reads metric rows for the given server IPs inside the time window.
+     * @param ipList server IPs to query
+     * @return one Map per row keyed by column name, or null when the query failed
+     */
+    public List<Map<String, Object>> readFromDruid(List<String> ipList){
+        List<Map<String, Object>> rsList = null;
+        ipList = ipList.stream().map(ip -> "\'" + ip + "\'").collect(Collectors.toList());
+        String ipString = "(" + StringUtils.join(ipList, ",") + ")";
+        String sql = "SELECT "+ ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+                + ", "+ ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME
+                + ", "+ ApplicationConfig.BASELINE_METRIC_TYPE
+                + ", " + ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME
+                + " FROM " + ApplicationConfig.DRUID_TABLE
+                + " WHERE " + ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME
+                + " IN " + ipString
+                + " AND " + timeFilter;
+        // close the ResultSet deterministically (was leaked before)
+        try (ResultSet resultSet = DruidUtils.executeQuery(statement, sql)) {
+            ResultSetToListService service = new ResultSetToListServiceImp();
+            rsList = service.selectAll(resultSet);
+        } catch (Exception e) {
+            LOG.error("Failed to read metric rows from Druid", e);
+        }
+        return rsList;
+    }
+
+    /**
+     * Filters one (ip, attackType) pair out of a previously fetched batch.
+     * Null-safe: a null batch yields an empty list and rows lacking either
+     * column are skipped (the old version swallowed a NullPointerException).
+     * @param allData rows previously returned by readFromDruid
+     * @param ip target server ip
+     * @param attackType target attack type
+     * @return matching rows (possibly empty, never null)
+     */
+    public List<Map<String, Object>> getTimeSeriesData(List<Map<String, Object>> allData, String ip, String attackType){
+        if (allData == null) {
+            return new ArrayList<>();
+        }
+        return allData.stream()
+                .filter(i -> ip.equals(i.get(ApplicationConfig.DRUID_SERVERIP_COLUMN_NAME))
+                        && attackType.equals(i.get(ApplicationConfig.DRUID_ATTACKTYPE_COLUMN_NAME)))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Computes the query window. Mode 0: the last READ_HISTORICAL_DAYS full
+     * days up to today 00:00; mode 1: fixed bounds from configuration (tests).
+     * @return (maxTime, minTime) in epoch millis — _1 upper bound, _2 lower bound
+     */
+    public Tuple2<Long, Long> getTimeLimit(){
+        long maxTime = 0L;
+        long minTime = 0L;
+        switch (ApplicationConfig.DRUID_TIME_LIMIT_TYPE) {
+            case 0:
+                maxTime = getCurrentDay();
+                minTime = getCurrentDay(-ApplicationConfig.READ_HISTORICAL_DAYS);
+                break;
+            case 1:
+                maxTime = ApplicationConfig.READ_DRUID_MAX_TIME;
+                minTime = ApplicationConfig.READ_DRUID_MIN_TIME;
+                break;
+            default:
+                LOG.warn("没有设置Druid数据读取方式");
+        }
+        return Tuple.of(maxTime, minTime);
+    }
+
+    /**
+     * Midnight (00:00:00.000) of today shifted by {@code bias} days, epoch ms.
+     * Uses Calendar.add so negative shifts roll correctly across month/year
+     * boundaries (the old set(DAY_OF_YEAR, get()+bias) underflowed near Jan 1);
+     * also drops the duplicated HOUR_OF_DAY reset.
+     */
+    private long getCurrentDay(int bias) {
+        Calendar calendar = Calendar.getInstance();
+        calendar.add(Calendar.DAY_OF_YEAR, bias);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+        return calendar.getTimeInMillis();
+    }
+
+    private long getCurrentDay(){
+        return getCurrentDay(0);
+    }
+
+    /**
+     * Closes the calling thread's Druid connection (thread-local in DruidUtils).
+     */
+    public void closeConn(){
+        try {
+            DruidUtils.closeConnection();
+        } catch (SQLException exception) {
+            LOG.error("Failed to close Druid connection", exception);
+        }
+    }
+}
 diff --git a/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java b/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java new file mode 100644 index 0000000..7867353 --- /dev/null +++ b/src/main/java/cn/mesalab/dao/Impl/ResultSetToListServiceImp.java @@ -0,0 +1,44 @@
+package cn.mesalab.dao.Impl;
+
+import cn.mesalab.dao.ResultSetToListService;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author yjy
+ * @version 1.0
+ * @date 2021/7/24 4:29 PM
+ */
+public class ResultSetToListServiceImp implements ResultSetToListService {
+
+    /**
+     * Materialises a SELECT ResultSet as a List: each element is one record,
+     * stored as a Map from column name to column value.
+     *
+     * @param rs result set to drain (the caller owns and closes it)
+     * @return rows read so far; empty when the cursor is empty or reading failed
+     */
+    @Override
+    public List<Map<String, Object>> selectAll(ResultSet rs) {
+        List<Map<String, Object>> list = new ArrayList<>();
+        try {
+            ResultSetMetaData rmd = rs.getMetaData();
+            int columnCount = rmd.getColumnCount();
+            while (rs.next()) {
+                // presized: one entry per column
+                Map<String, Object> rowData = new HashMap<>(columnCount);
+                for (int i = 1; i <= columnCount; ++i) {
+                    rowData.put(rmd.getColumnName(i), rs.getObject(i));
+                }
+                list.add(rowData);
+            }
+        } catch (Exception ex) {
+            // best-effort contract: return whatever was read; no logger exists
+            // in this class, so the original stderr dump is kept deliberately
+            ex.printStackTrace();
+        }
+        return list;
+    }
+}
\ No newline at end of file
 diff --git a/src/main/java/cn/mesalab/dao/ResultSetToListService.java b/src/main/java/cn/mesalab/dao/ResultSetToListService.java new file mode 100644 index 0000000..103e330 --- /dev/null +++ b/src/main/java/cn/mesalab/dao/ResultSetToListService.java @@ -0,0 +1,24 @@
+package cn.mesalab.dao;
+
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * @author yjy
+ * @version 1.0
+ * @date 2021/7/24 4:27 PM
+ */
+public interface ResultSetToListService {
+    /**
+     * Materialises every record of a ResultSet as a List; each element is one
+     * record stored as a Map from column name (String) to value (Object).
+     *
+     * @param rs result set to read
+     * @return List of row maps
+     */
+    List<Map<String, Object>> selectAll(ResultSet rs);
+}
\ No newline at end of file
 diff --git a/src/main/java/cn/mesalab/main/BaselineApplication.java b/src/main/java/cn/mesalab/main/BaselineApplication.java new file mode 100644 index 0000000..8bd6f13 --- /dev/null +++ b/src/main/java/cn/mesalab/main/BaselineApplication.java @@ -0,0 +1,15 @@
+package cn.mesalab.main;
+
+import cn.mesalab.service.BaselineGeneration;
+
+/**
+ * Entry point: generates per-IP attack baselines and writes them to HBase.
+ * (Unused import of the internal API sun.rmi.runtime.Log removed.)
+ *
+ * @author yjy
+ * @version 1.0
+ * @date 2021/7/23 5:34 PM
+ */
+public class BaselineApplication {
+    public static void main(String[] args) {
+        BaselineGeneration.perform();
+    }
+}
 diff --git a/src/main/java/cn/mesalab/service/BaselineGeneration.java b/src/main/java/cn/mesalab/service/BaselineGeneration.java new file mode 100644 index 0000000..e72a0e6 --- /dev/null +++ b/src/main/java/cn/mesalab/service/BaselineGeneration.java @@ -0,0 +1,206 @@
+package cn.mesalab.service;
+
+import cn.mesalab.config.ApplicationConfig;
+import cn.mesalab.dao.DruidData;
+import cn.mesalab.service.algorithm.KalmanFilter;
+import cn.mesalab.utils.HbaseUtils;
+import cn.mesalab.utils.SeriesUtils;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.math3.stat.StatUtils;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
+/**
+ * Baseline generation and persistence.
+ *
+ * @author yjy
+ * @version 1.0
+ * @date 2021/7/23 5:38 PM
+ */
+public class BaselineGeneration {
+    private static final Logger LOG = LoggerFactory.getLogger(BaselineGeneration.class);
+
+    private static DruidData druidData;
+    private static HbaseUtils hbaseUtils;
+    private static Table hbaseTable;
+    // NOTE(review): shared mutable static written by every worker thread in
+    // generateBaselines(); concurrent batches can overwrite each other's rows.
+    // Prefer a per-batch local instead of this field.
+    private static List<Map<String, Object>> batchDruidData = new ArrayList<>();
+
+    // attack types for which a baseline is produced, per IP
+    private static List<String> attackTypeList = Arrays.asList(
+            ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD,
+            ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD,
+            ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD,
+            ApplicationConfig.DRUID_ATTACKTYPE_DNS_AMPL
+    );
+    // points per baseline: RANGE_DAYS days of (60 / HISTORICAL_GRAD)-per-hour buckets
+    private static final Integer BASELINE_POINT_NUM =
+            ApplicationConfig.BASELINE_RANGE_DAYS * 24 * (60/ApplicationConfig.HISTORICAL_GRAD);
+
+    /**
+     * Program body: wires up Druid + HBase, runs the generation, then exits.
+     */
+    public static void perform() {
+        long start = System.currentTimeMillis();
+
druidData = DruidData.getInstance();
+        hbaseUtils = HbaseUtils.getInstance();
+        hbaseTable = hbaseUtils.getHbaseTable();
+        LOG.info("Druid 成功建立连接");
+
+        try {
+            // generate and persist all baselines
+            generateBaselinesThread();
+
+            long last = System.currentTimeMillis();
+            LOG.warn("运行时间:" + (last - start));
+
+            druidData.closeConn();
+            hbaseTable.close();
+            LOG.info("Druid 关闭连接");
+
+        } catch (Exception e) {
+            LOG.error("Baseline generation failed", e);
+        }
+        System.exit(0);
+    }
+
+    /**
+     * Fans batches of IPs out to a fixed-size thread pool and waits for completion.
+     * @throws InterruptedException if interrupted while waiting for the pool
+     */
+    private static void generateBaselinesThread() throws InterruptedException {
+        int threadNum = Runtime.getRuntime().availableProcessors();
+
+        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("baseline-demo-%d").build();
+
+        // bounded queue + AbortPolicy: more than 1024 pending batches fails fast
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(
+                threadNum,
+                threadNum,
+                0L,
+                TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(1024),
+                namedThreadFactory,
+                new ThreadPoolExecutor.AbortPolicy());
+
+        // fetch the IP universe once
+        ArrayList<String> destinationIps = druidData.getServerIpList();
+
+        LOG.info("共查询到服务端ip " +destinationIps.size() + " 个");
+        LOG.info("Baseline batch 大小: " + ApplicationConfig.GENERATE_BATCH_SIZE);
+
+        // process IPs in fixed-size batches
+        List<List<String>> batchIpLists = Lists.partition(destinationIps, ApplicationConfig.GENERATE_BATCH_SIZE);
+        for (List<String> batchIps : batchIpLists) {
+            if (!batchIps.isEmpty()) {
+                executor.execute(() -> generateBaselines(batchIps));
+            }
+        }
+
+        executor.shutdown();
+        // surface (instead of silently dropping) a timeout of the whole run
+        if (!executor.awaitTermination(10L, TimeUnit.HOURS)) {
+            LOG.warn("Baseline generation did not finish within 10 hours");
+        }
+    }
+
+    /**
+     * Generates and writes baselines for one batch of IPs. Runs on a worker
+     * thread and uses only locals for the batch state — the previous version
+     * wrote the shared statics druidData/batchDruidData from every thread, so
+     * concurrent batches could read each other's data.
+     * @param ipList IPs in this batch
+     */
+    public static void generateBaselines(List<String> ipList){
+        DruidData batchDruid = DruidData.getInstance();
+        List<Map<String, Object>> batchData = batchDruid.readFromDruid(ipList);
+
+        List<Put> putList = new ArrayList<>();
+        for (String attackType : attackTypeList) {
+            for (String ip : ipList) {
+                int[] ipBaseline = generateSingleIpBaseline(batchDruid, batchData, ip, attackType);
+                if (ipBaseline != null) {
+                    putList = hbaseUtils.cachedInPut(putList, ip, ipBaseline, attackType,
+                            ApplicationConfig.BASELINE_METRIC_TYPE);
+                }
+            }
+        }
+
+        try {
+            hbaseTable.put(putList);
+            LOG.info("Baseline 线程 " + Thread.currentThread().getId() + " 成功写入Baseline条数共计 " + putList.size());
+        } catch (IOException e) {
+            LOG.error("HBase put failed", e);
+        }
+
+        batchDruid.closeConn();
+    }
+
+    /**
+     * Baseline for a single (ip, attackType) pair.
+     * @param druid accessor used to filter the batch
+     * @param batchData rows previously read for this batch
+     * @param ip target ip
+     * @param attackType target attack type
+     * @return baseline of BASELINE_POINT_NUM points, or null when the batch
+     *         holds no rows for this pair
+     */
+    private static int[] generateSingleIpBaseline(DruidData druid, List<Map<String, Object>> batchData,
+                                                  String ip, String attackType){
+        List<Map<String, Object>> originSeries = druid.getTimeSeriesData(batchData, ip, attackType);
+
+        if (originSeries.isEmpty()) {
+            return null;
+        }
+
+        // fill gaps in the time series with zeros
+        List<Map<String, Object>> completSeries = SeriesUtils.complementSeries(originSeries);
+
+        int[] baselineArr = new int[BASELINE_POINT_NUM];
+        List<Integer> series = completSeries.stream().map(
+                i -> Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList());
+
+        // appearance frequency decides the strategy
+        if (originSeries.size() / (float) completSeries.size() > ApplicationConfig.BASELINE_HISTORICAL_RATIO) {
+            // frequent ip: model the full series.
+            // (The old percentile pre-fill via StatUtils here was dead code —
+            // its result was immediately overwritten by baselineFunction.)
+            baselineArr = baselineFunction(series);
+
+        } else {
+            if (SeriesUtils.isPeriod(series)) {
+                // periodic ip: model the full series
+                baselineArr = baselineFunction(series);
+            } else {
+                // aperiodic sparse ip: flat baseline at a percentile of observed points
+                int ipPercentile = SeriesUtils.percentile(
+                        originSeries.stream().map(i ->
+                                Integer.valueOf(i.get(ApplicationConfig.BASELINE_METRIC_TYPE).toString())).collect(Collectors.toList()),
+                        ApplicationConfig.BASELINE_RATIONAL_PERCENTILE);
+                Arrays.fill(baselineArr, ipPercentile);
+            }
+        }
+
+        return baselineArr;
+    }
+
+    /**
+     * Dispatches to the configured baseline algorithm.
+     * NOTE(review): the default branch assumes timeSeries has at least
+     * BASELINE_POINT_NUM points; shorter input throws IndexOutOfBoundsException
+     * — confirm the intended behaviour before guarding.
+     * @param timeSeries input series
+     * @return baseline of BASELINE_POINT_NUM points
+     */
+    private static int[] baselineFunction(List<Integer> timeSeries){
+        int[] result;
+        switch (ApplicationConfig.BASELINE_FUNCTION) {
+            case "KalmanFilter":
+                KalmanFilter kalmanFilter = new KalmanFilter();
+                kalmanFilter.forcast(timeSeries, BASELINE_POINT_NUM);
+                result = kalmanFilter.getForecastSeries().stream().mapToInt(Integer::valueOf).toArray();
+                break;
+            default:
+                result = timeSeries.subList(0, BASELINE_POINT_NUM).stream().mapToInt(Integer::valueOf).toArray();
+        }
+        return result;
+    }
+
+    public static void main(String[] args) {
+        perform();
+    }
+
+}
 diff --git a/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java b/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java new file mode 100644 index 0000000..11a40c3 --- /dev/null +++ b/src/main/java/cn/mesalab/service/algorithm/KalmanFilter.java @@ -0,0 +1,90 @@
+package cn.mesalab.service.algorithm;
+
+import cn.mesalab.config.ApplicationConfig;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Simple scalar Kalman-style smoother/forecaster.
+ * NOTE(review): Q and R are added onto the deviation/gain rather than used as
+ * process/measurement variances in the canonical Kalman equations — confirm
+ * this is intentional before "fixing" the math.
+ *
+ * @author yjy
+ * @version 1.0
+ * @date 2021/7/25 1:42 PM
+ */
+
+public class KalmanFilter {
+    private Integer predict;
+    private Integer current;
+    private Integer estimate;
+    private double pdelt;       // prediction deviation
+    private double mdelt;       // measurement deviation
+    private double Gauss;       // combined (gaussian) deviation
+    private double kalmanGain;
+    private final static double Q = ApplicationConfig.BASELINE_KALMAN_Q;
+    private final static double R = ApplicationConfig.BASELINE_KALMAN_R;
+
+    public KalmanFilter() {
+        initial();
+    }
+
+    public void initial(){
+        // TODO tune initial deviations
+        pdelt = 1;
+        mdelt = 1;
+    }
+
+    private ArrayList<Integer> smoothSeries;
+    private ArrayList<Integer> forecastSeries;
+
+    /**
+     * One filter step: blends the previous estimate with the new observation.
+     * @param oldValue previous estimate
+     * @param value new observation
+     * @return new estimate
+     */
+    public Integer calSingleKalPoint(Integer oldValue, Integer value){
+        // previous estimate
+        predict = oldValue;
+        current = value;
+        // combined deviation plus configured process-noise term
+        Gauss = Math.sqrt(pdelt * pdelt + mdelt * mdelt) + Q;
+        // gain plus configured measurement-noise term
+        kalmanGain = Math.sqrt((Gauss * Gauss)/(Gauss * Gauss + pdelt * pdelt)) + R;
+        // new estimate
+        estimate = (int) (kalmanGain * (current - predict) + predict);
+        // updated measurement deviation
+        mdelt = Math.sqrt((1-kalmanGain) * Gauss * Gauss);
+
+        return estimate;
+    }
+
+
+    public void forcast(List<Integer> historicalSeries, Integer length){
+        int oldvalue =
(historicalSeries.stream().mapToInt(Integer::intValue).sum())/historicalSeries.size(); + smoothSeries = new ArrayList(); + for(int i = 0; i < historicalSeries.size(); i++){ + int value = historicalSeries.get(i); + oldvalue = calSingleKalPoint(oldvalue,value); + smoothSeries.add(oldvalue); + } + + forecastSeries = new ArrayList<>(); + Integer partitonNum = historicalSeries.size()/length; + for(int i = 0; i getSmoothSeries() { + return smoothSeries; + } + + public ArrayList getAllRangeSeries() { + ArrayList results = new ArrayList<>(); + results.addAll(smoothSeries); + results.addAll(forecastSeries); + return results; + } + + public ArrayList getForecastSeries() { + return forecastSeries; + } +} diff --git a/src/main/java/cn/mesalab/utils/ConfigUtils.java b/src/main/java/cn/mesalab/utils/ConfigUtils.java new file mode 100644 index 0000000..718648b --- /dev/null +++ b/src/main/java/cn/mesalab/utils/ConfigUtils.java @@ -0,0 +1,45 @@ +package cn.mesalab.utils; + + +import org.apache.log4j.Logger; + +import java.util.Properties; + +public class ConfigUtils { + private static final Logger LOG = Logger.getLogger(ConfigUtils.class); + private static Properties propCommon = new Properties(); + + public static String getStringProperty(String key) { + return propCommon.getProperty(key); + } + public static Float getFloatProperty(String key) { + return Float.parseFloat(propCommon.getProperty(key)); + } + + + public static Integer getIntProperty(String key) { + return Integer.parseInt(propCommon.getProperty(key)); + } + + public static Long getLongProperty(String key) { + return Long.parseLong(propCommon.getProperty(key)); + } + + public static Double getDoubleProperty(String key) { + return Double.parseDouble(propCommon.getProperty(key)); + } + + public static Boolean getBooleanProperty(String key) { + return "true".equals(propCommon.getProperty(key).toLowerCase().trim()); + } + + static { + try { + 
propCommon.load(ConfigUtils.class.getClassLoader().getResourceAsStream("application.properties"));
+
+        } catch (Exception e) {
+            // keep the (empty) Properties instead of nulling it out — a null
+            // propCommon turned every later getter into a NullPointerException
+            LOG.error("配置加载失败", e);
+        }
+    }
+}
 diff --git a/src/main/java/cn/mesalab/utils/DruidUtils.java b/src/main/java/cn/mesalab/utils/DruidUtils.java new file mode 100644 index 0000000..8224d37 --- /dev/null +++ b/src/main/java/cn/mesalab/utils/DruidUtils.java @@ -0,0 +1,55 @@
+package cn.mesalab.utils;
+
+import cn.mesalab.config.ApplicationConfig;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.hadoop.hbase.client.Table;
+
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Thread-local Druid (Avatica) connection helper.
+ * (Unused private field "statement" removed.)
+ *
+ * @author yjy
+ * @version 1.0
+ * @date 2021/7/23 4:50 PM
+ */
+public class DruidUtils {
+    // one connection per worker thread; released via closeConnection()
+    private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<>();
+
+    private static final String DRUID_URL = ApplicationConfig.DRUID_URL;
+
+    /**
+     * Opens a new connection for the calling thread and stores it in the
+     * thread-local slot. NOTE(review): any previous connection in the slot is
+     * replaced without being closed — callers must closeConnection() first.
+     * @throws SQLException on connection failure
+     */
+    public static AvaticaConnection getConn() throws SQLException {
+        Properties properties = new Properties();
+        // NOTE(review): 10*60*60 = 36000 — units (seconds vs millis) unverified;
+        // confirm against the Avatica driver's connectTimeout contract
+        properties.setProperty("connectTimeout", String.valueOf(10*60*60));
+        AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties);
+        threadLocal.set(connection);
+        return connection;
+    }
+
+    /**
+     * Closes and clears the calling thread's connection, if any.
+     */
+    public static void closeConnection() throws SQLException{
+        AvaticaConnection conn = threadLocal.get();
+        if(conn != null){
+            conn.close();
+            threadLocal.remove();
+        }
+    }
+
+    /**
+     * Executes {@code sql} on the supplied statement.
+     */
+    public static ResultSet executeQuery (AvaticaStatement statement, String sql) throws SQLException{
+        return statement.executeQuery(sql);
+    }
+
+}
 diff --git a/src/main/java/cn/mesalab/utils/HbaseUtils.java b/src/main/java/cn/mesalab/utils/HbaseUtils.java new file mode 100644 index
0000000..86123bb --- /dev/null +++ b/src/main/java/cn/mesalab/utils/HbaseUtils.java @@ -0,0 +1 @@ +package cn.mesalab.utils; import cn.mesalab.config.ApplicationConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * @author yjy * @version 1.0 * @date 2021/7/23 4:56 下午 */ public class HbaseUtils { private static final Logger LOG = LoggerFactory.getLogger(HbaseUtils.class); private static HbaseUtils hbaseUtils; static { hbaseUtils = HbaseUtils.getInstance(); } public static HbaseUtils getInstance(){ if (hbaseUtils == null) { hbaseUtils = new HbaseUtils(); } return hbaseUtils; } public Table getHbaseTable(){ Table hbaseTable = null; try{ Configuration config = HBaseConfiguration.create(); config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM); config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT); TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE); Connection conn = ConnectionFactory.createConnection(config); hbaseTable = conn.getTable(tableName); } catch (IOException e){ LOG.error("HBase 创建HBase table失败!"); e.printStackTrace(); } return hbaseTable; } public List cachedInPut(List putList, String ip, int[] baseline, String attackType, String metricType){ Put rowPut = new Put(Bytes.toBytes(ip)); // FOR TEST // start 
if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_TCP_SYN_FLOOD)){ attackType = "TCP SYN Flood"; } else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_UDP_FLOOD)){ attackType = "UDP Flood"; } else if(attackType.equals(ApplicationConfig.DRUID_ATTACKTYPE_ICMP_FLOOD)){ attackType = "ICMP Flood"; } else { attackType = "DNS Amplification"; } // end rowPut.addColumn( Bytes.toBytes(attackType), Bytes.toBytes(metricType), WritableUtils.toByteArray(toWritable(baseline))); putList.add(rowPut); return putList; } private static Writable toWritable(int[] arr) { Writable[] content = new Writable[arr.length]; for (int i = 0; i < content.length; i++) { content[i] = new IntWritable(arr[i]); } return new ArrayWritable(IntWritable.class, content); } public static ArrayList fromWritable(ArrayWritable writable) { Writable[] writables = ((ArrayWritable) writable).get(); ArrayList list = new ArrayList(writables.length); for (Writable wrt : writables) { list.add(((IntWritable)wrt).get()); } return list; } } \ No newline at end of file diff --git a/src/main/java/cn/mesalab/utils/HttpClientUtils.java b/src/main/java/cn/mesalab/utils/HttpClientUtils.java new file mode 100644 index 0000000..b5b7382 --- /dev/null +++ b/src/main/java/cn/mesalab/utils/HttpClientUtils.java @@ -0,0 +1,485 @@ +package cn.mesalab.utils; + +import cn.mesalab.config.ApplicationConfig; +import com.google.common.collect.Maps; +import org.apache.http.*; +import com.zdjizhi.utils.StringUtil; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.HttpRequestRetryHandler; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.conn.ConnectTimeoutException; +import 
org.apache.http.conn.ConnectionKeepAliveStrategy; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.message.BasicHeaderElementIterator; +import org.apache.http.protocol.HTTP; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.PostConstruct; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLHandshakeException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +/** + * @author yjy + * @version 1.0 + * @date 2021/8/3 3:57 下午 + */ + +public class HttpClientUtils { + private static final Logger LOG = LoggerFactory.getLogger(HttpClientUtils.class); + + //全局连接池对象 + private PoolingHttpClientConnectionManager connectionManager; + + /** + * 初始化连接池信息 + */ + @PostConstruct + public void initConnectionManager() { + if (connectionManager == null) { + connectionManager = new PoolingHttpClientConnectionManager(); + // 整个连接池最大连接数 + connectionManager.setMaxTotal(ApplicationConfig.HTTP_MAX_CONNECTION_NUM); + // 每路由最大连接数,默认值是2 + connectionManager.setDefaultMaxPerRoute(ApplicationConfig.HTTP_MAX_PER_ROUTE); + } + LOG.info("Initializing PoolingHttpClientConnectionManager Complete"); + } + + /** + * 获取Http客户端连接对象 + * + * @param socketTimeOut 响应超时时间 + * @return Http客户端连接对象 + */ + public CloseableHttpClient getHttpClient(int socketTimeOut) { + // 创建Http请求配置参数 + RequestConfig requestConfig = RequestConfig.custom() + // 获取连接超时时间 + .setConnectionRequestTimeout(ApplicationConfig.HTTP_CONNECTION_TIMEOUT) + // 请求超时时间 + .setConnectTimeout(ApplicationConfig.HTTP_REQUEST_TIMEOUT) + // 响应超时时间 + .setSocketTimeout(socketTimeOut) + 
.build(); + + /** + * 测出超时重试机制为了防止超时不生效而设置 + * 如果直接放回false,不重试 + * 这里会根据情况进行判断是否重试 + */ + HttpRequestRetryHandler retry = (exception, executionCount, context) -> { + if (executionCount >= 3) {// 如果已经重试了3次,就放弃 + return false; + } + if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 + return true; + } + if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 + return false; + } + if (exception instanceof InterruptedIOException) {// 超时 + return true; + } + if (exception instanceof UnknownHostException) {// 目标服务器不可达 + return false; + } + if (exception instanceof ConnectTimeoutException) {// 连接被拒绝 + return false; + } + if (exception instanceof SSLException) {// ssl握手异常 + return false; + } + HttpClientContext clientContext = HttpClientContext.adapt(context); + HttpRequest request = clientContext.getRequest(); + // 如果请求是幂等的,就再次尝试 + if (!(request instanceof HttpEntityEnclosingRequest)) { + return true; + } + return false; + }; + + + ConnectionKeepAliveStrategy myStrategy = (response, context) -> { + HeaderElementIterator it = new BasicHeaderElementIterator + (response.headerIterator(HTTP.CONN_KEEP_ALIVE)); + while (it.hasNext()) { + HeaderElement he = it.nextElement(); + String param = he.getName(); + String value = he.getValue(); + if (value != null && param.equalsIgnoreCase("timeout")) { + return Long.parseLong(value) * 1000; + } + } + return 60 * 1000;//如果没有约定,则默认定义时长为60s + }; + + // 创建httpClient + return HttpClients.custom() + // 把请求相关的超时信息设置到连接客户端 + .setDefaultRequestConfig(requestConfig) + // 把请求重试设置到连接客户端 + .setRetryHandler(retry) + .setKeepAliveStrategy(myStrategy) + // 配置连接池管理对象 + .setConnectionManager(connectionManager) + .build(); + } + + /** + * Desc: 发起http delete请求,返回status code与response body + * @param url + * @param socketTimeout + * @return {@link Map< String, String>} + * @created by wWei + * @date 2021/1/8 3:29 下午 + */ + public Map httpDelete(String url, int socketTimeout) { + Map resultMap = Maps.newHashMap(); + // 创建GET请求对象 
+ CloseableHttpResponse response = null; + try { + HttpDelete httpDelete = new HttpDelete(url); + // 执行请求 + response = getHttpClient(socketTimeout).execute(httpDelete); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + resultMap.put("status", String.valueOf(response.getStatusLine().getStatusCode())); + resultMap.put("result", EntityUtils.toString(entity, "UTF-8")); + } catch (ClientProtocolException e) { + LOG.error("协议错误: {}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); + resultMap.put("message", e.getMessage()); + } catch (ParseException e) { + LOG.error("解析错误: {}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); + resultMap.put("message", e.getMessage()); + } catch (IOException e) { + LOG.error("IO错误: {}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_BAD_GATEWAY)); + resultMap.put("message", e.getMessage()); + } catch (Exception e) { + LOG.error("其它错误: {}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + resultMap.put("message", e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consumeQuietly(response.getEntity()); + response.close(); + } catch (IOException e) { + LOG.error("释放链接错误: {}", e.getMessage()); + } + } + } + return resultMap; + } + + /** + * 返回status code与response body + * @param url:请求地址 + * @param socketTimeout: 响应超时时间 + * + **/ + public Map httpGet(String url, int socketTimeout) { + Map resultMap = Maps.newHashMap(); + // 创建GET请求对象 + CloseableHttpResponse response = null; + try { + HttpGet httpGet = new HttpGet(url); + // 执行请求 + response = getHttpClient(socketTimeout).execute(httpGet); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + resultMap.put("status", String.valueOf(response.getStatusLine().getStatusCode())); + resultMap.put("result", EntityUtils.toString(entity, "UTF-8")); + } catch 
(ClientProtocolException e) { + LOG.error("ClientProtocolException:{}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); + resultMap.put("message", e.getMessage()); + } catch (ParseException e) { + LOG.error("ParseException:{}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); + resultMap.put("message", e.getMessage()); + } catch (IOException e) { + LOG.error("IOException:{}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_BAD_GATEWAY)); + resultMap.put("message", e.getMessage()); + } catch (Exception e) { + LOG.error("Exception:{}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + resultMap.put("message", e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consumeQuietly(response.getEntity()); + response.close(); + } catch (IOException e) { + LOG.error("CloseConnectionException:{}", e.getMessage()); + } + } + } + return resultMap; + } + + /** + * 返回status code与response body + * @param url:请求地址 + * @param headers: Headers + * @param socketTimeOut: 响应超时时间 + * @return: java.util.Map + **/ + public Map httpGet(String url, Map headers, int socketTimeOut) { + Map resultMap = Maps.newHashMap(); + // 创建GET请求对象 + CloseableHttpResponse response = null; + try { + HttpGet httpGet = new HttpGet(url); + for (String key : headers.keySet()) { + httpGet.setHeader(key, headers.get(key)); + } + // 执行请求 + response = getHttpClient(socketTimeOut).execute(httpGet); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + resultMap.put("status", String.valueOf(response.getStatusLine().getStatusCode())); + resultMap.put("result", EntityUtils.toString(entity, "UTF-8")); + } catch (ClientProtocolException e) { + LOG.error("ClientProtocolException:{}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); + resultMap.put("message", e.getMessage()); 
+ } catch (ParseException e) { + LOG.error("ParseException:{}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); + resultMap.put("message", e.getMessage()); + } catch (IOException e) { + LOG.error("IOException:{}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_BAD_GATEWAY)); + resultMap.put("message", e.getMessage()); + } catch (Exception e) { + LOG.error("Exception:{}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + resultMap.put("message", e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consumeQuietly(response.getEntity()); + response.close(); + } catch (IOException e) { + LOG.error("CloseConnectionException:{}", e.getMessage()); + } + } + } + return resultMap; + } + + /** + * 返回status code与response body + * @param url:请求地址 + * @param jsonString:请求参数 + * @param socketTimeOut:响应超时时间 + **/ + public Map httpPost(String url, String jsonString, int socketTimeOut) { + Map resultMap = Maps.newHashMap(); + // 创建GET请求对象 + CloseableHttpResponse response = null; + try { + HttpPost httpPost = new HttpPost(url); + httpPost.setHeader("Content-Type", "application/json"); + httpPost.setEntity(new ByteArrayEntity(jsonString.getBytes("utf-8"))); + response = getHttpClient(socketTimeOut).execute(httpPost); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + resultMap.put("status", String.valueOf(response.getStatusLine().getStatusCode())); + resultMap.put("result", EntityUtils.toString(entity, "UTF-8")); + } catch (ClientProtocolException e) { + LOG.error("协议错误: {}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); + resultMap.put("message", e.getMessage()); + } catch (ParseException e) { + LOG.error("解析错误: {}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)); + resultMap.put("message", e.getMessage()); + } catch 
(IOException e) { + LOG.error("IO错误: {}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_BAD_GATEWAY)); + resultMap.put("message", e.getMessage()); + } catch (Exception e) { + LOG.error("其它错误: {}", e.getMessage()); + resultMap.put("status", String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + resultMap.put("message", e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consumeQuietly(response.getEntity()); + response.close(); + } catch (IOException e) { + LOG.error("释放链接错误: {}", e.getMessage()); + } + } + } + return resultMap; + } + + /** + * 返回status code与response body + * @param url:请求地址 + * @param headers: Headers + * @param socketTimeOut: 响应超时时间 + **/ + public Map getHttpPostResponseHeads(String url, Map headers, int socketTimeOut) { + CloseableHttpResponse response = null; + HashMap map = Maps.newHashMap(); + try { + HttpPost httpPost = new HttpPost(url); + for (Object k : headers.keySet()) { + httpPost.setHeader(k.toString(), headers.get(k).toString()); + } + response = getHttpClient(socketTimeOut).execute(httpPost); + Header[] Headers = response.getAllHeaders(); + for (Header h : Headers) { + map.put(h.getName().toUpperCase(), h.getValue()); + } + } catch (ClientProtocolException e) { + LOG.error("协议错误: {}", e.getMessage()); + } catch (ParseException e) { + LOG.error("解析错误: {}", e.getMessage()); + } catch (IOException e) { + LOG.error("IO错误: {}", e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consumeQuietly(response.getEntity()); + response.close(); + } catch (IOException e) { + LOG.error("释放链接错误: {}", e.getMessage()); + } + } + } + return map; + } + + /** + * @param url:请求地址 + **/ + public String httpGet(String url) { + String msg = "-1"; + // 获取客户端连接对象 + CloseableHttpClient httpClient = getHttpClient(ApplicationConfig.HTTP_RESPONSE_TIMEOUT); + CloseableHttpResponse response = null; + try { + URL ul = new URL(url); + URI uri = new URI(ul.getProtocol(), null, ul.getHost(), 
ul.getPort(), ul.getPath(), ul.getQuery(), null); + LOG.info("http get uri {}", uri); + // 创建GET请求对象 + HttpGet httpGet = new HttpGet(uri); + // 执行请求 + response = httpClient.execute(httpGet); + int statusCode = response.getStatusLine().getStatusCode(); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + msg = EntityUtils.toString(entity, "UTF-8"); + if (statusCode != HttpStatus.SC_OK) { + LOG.error("Http get content is :" + msg); + System.exit(1); + } + } catch (URISyntaxException e) { + LOG.error("URI 转换错误: {}", e.getMessage()); + } catch (ClientProtocolException e) { + LOG.error("协议错误: {}", e.getMessage()); + } catch (ParseException e) { + LOG.error("解析错误: {}", e.getMessage()); + } catch (IOException e) { + LOG.error("IO错误: {}", e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consume(response.getEntity()); + response.close(); + } catch (IOException e) { + LOG.error("释放链接错误: {}", e.getMessage()); + } + } + } + return msg; + } + + /** + * @param url: 请求地址 + * @param requestBody: 请求参数 + * @param headers: Header + **/ + public String httpPost(String url, String requestBody, Header... 
headers) { + String msg = "-1"; + // 获取客户端连接对象 + CloseableHttpClient httpClient = getHttpClient(ApplicationConfig.HTTP_RESPONSE_TIMEOUT); + // 创建POST请求对象 + CloseableHttpResponse response = null; + try { + + URL ul = new URL(url); + URI uri = new URI(ul.getProtocol(), null, ul.getHost(), ul.getPort(), ul.getPath(), ul.getQuery(), null); + LOG.debug("http post uri:{}, http post body:{}", uri, requestBody); + HttpPost httpPost = new HttpPost(uri); + httpPost.setHeader("Content-Type", "application/json"); + if (StringUtil.isNotEmpty(headers)) { + for (Header h : headers) { + httpPost.addHeader(h); + } + } + if (StringUtil.isNotBlank(requestBody)) { + httpPost.setEntity(new ByteArrayEntity(requestBody.getBytes("utf-8"))); + } + response = httpClient.execute(httpPost); + int statusCode = response.getStatusLine().getStatusCode(); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + msg = EntityUtils.toString(entity, "UTF-8"); + if (statusCode != HttpStatus.SC_OK && statusCode != HttpStatus.SC_CREATED) { + LOG.error(msg); + System.exit(1); + } + } catch (URISyntaxException e) { + LOG.error("URI 转换错误: {}", e.getMessage()); + } catch (ClientProtocolException e) { + LOG.error("协议错误: {}", e.getMessage()); + } catch (ParseException e) { + LOG.error("解析错误: {}", e.getMessage()); + } catch (IOException e) { + LOG.error("IO错误: {}", e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consumeQuietly(response.getEntity()); + response.close(); + } catch (IOException e) { + LOG.error("释放链接错误: {}", e.getMessage()); + } + } + } + return msg; + } +} diff --git a/src/main/java/cn/mesalab/utils/SeriesUtils.java b/src/main/java/cn/mesalab/utils/SeriesUtils.java new file mode 100644 index 0000000..17f84b3 --- /dev/null +++ b/src/main/java/cn/mesalab/utils/SeriesUtils.java @@ -0,0 +1,212 @@ +package cn.mesalab.utils; + +import cn.mesalab.config.ApplicationConfig; +import cn.mesalab.dao.DruidData; +import cn.mesalab.service.BaselineGeneration; +import 
com.google.common.collect.Lists; +import org.jfree.util.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.lang.reflect.Array; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.stream.Stream; + + +/** + * @author joy + */ +public class SeriesUtils { + private static final Logger LOG = LoggerFactory.getLogger(SeriesUtils.class); + + private static DruidData druidData = new DruidData(); + + public static List> readCsvToList(String filePath) { + List> list = new ArrayList>(); + + String line; + try (BufferedReader br = new BufferedReader(new FileReader(filePath))) { + br.readLine(); + while ((line = br.readLine()) != null) { + List column = Arrays.asList(line.split(",")); + // 保存记录中的每个<字段名-字段值> + Map rowData = new HashMap(); + rowData.put("__time", column.get(0)); + rowData.put(ApplicationConfig.BASELINE_METRIC_TYPE, Integer.valueOf(column.get(1))); + + list.add(rowData); + } + } catch (Exception e) { + e.printStackTrace(); + } + return list; + } + + + /** + * 时序数据补齐 + */ + public static List> complementSeries(List> originSeries){ + LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(druidData.getTimeLimit()._2), TimeZone + .getDefault().toZoneId()); + LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(druidData.getTimeLimit()._1), TimeZone + .getDefault().toZoneId()); + List dateList = completionDate(startTime, endTime); + + // 补全后的结果 + List> result = new ArrayList<>(); + boolean dbDateExist = false; + for (String date : dateList) { + //table为数据库查询出来的对象列表,结构为List> + for (Map row : originSeries) { + if (row.get(ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME).toString().substring(0,19).equals(date)) { + //集合已包含该日期 + dbDateExist = true; + result.add(row); + break; + } + } + //添加补全的数据到最后结果列表 + if (!dbDateExist) { + Map temp = new 
HashMap<>(2); + temp.put(ApplicationConfig.DRUID_RECVTIME_COLUMN_NAME, date); + temp.put(ApplicationConfig.BASELINE_METRIC_TYPE, 0); + result.add(temp); + } + dbDateExist = false; + } + + return result; + } + + private static List completionDate(LocalDateTime startTime, LocalDateTime endTime) { + //日期格式化 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(ApplicationConfig.TIME_FORMAT); + List timeList = new ArrayList<>(); + //遍历给定的日期期间的每一天 + for (int i = 0; !Duration.between(startTime.plusMinutes(i+1), endTime).isNegative(); i+= ApplicationConfig.HISTORICAL_GRAD) { + //添加日期 + timeList.add(startTime.plusMinutes(i).format(formatter)); + } + return timeList; + } + + /** + * 判断是否存在以天为单位的周期特征 + * @param historicalSeries + * @return + */ + public static Boolean isPeriod(List historicalSeries){ + Boolean result = true; + List> partitions = Lists.partition(historicalSeries, 24*60/ApplicationConfig.HISTORICAL_GRAD); + List aggregatedPart = Arrays.asList(); + try{ + aggregatedPart = columnAverage(partitions.subList(0, ApplicationConfig.READ_HISTORICAL_DAYS-1)); + } catch (IndexOutOfBoundsException e){ + Log.error("历史"); + } + + // Pearson corrcoef + double pearsonCorrelationScore = getPearsonCorrelationScore(aggregatedPart.stream().mapToInt(Integer::valueOf).toArray(), + partitions.get(partitions.size() - 1).stream().mapToInt(Integer::valueOf).toArray()); + + if (pearsonCorrelationScore < ApplicationConfig.BASELINE_PERIOD_CORR_THRE){ + result=false; + } + return result; + } + + public static double getPearsonCorrelationScore(int[] xData, int[] yData) { + if (xData.length != yData.length) { + Log.error("Pearson CorrelationScore 数组长度不相等!"); + } + int xMeans; + int yMeans; + double numerator = 0; + double denominator = 0; + + double result = 0; + // 拿到两个数据的平均值 + xMeans = (int) getMeans(xData); + yMeans = (int) getMeans(yData); + // 计算皮尔逊系数的分子 + numerator = generateNumerator(xData, xMeans, yData, yMeans); + // 计算皮尔逊系数的分母 + denominator = generateDenomiator(xData, xMeans, 
yData, yMeans); + // 计算皮尔逊系数 + if(denominator>0) { + result = numerator / denominator; + } + return result; + } + + private static int generateNumerator(int[] xData, int xMeans, int[] yData, int yMeans) { + int numerator = 0; + for (int i = 0; i < xData.length; i++) { + numerator += (xData[i] - xMeans) * (yData[i] - yMeans); + } + return numerator; + } + + private static double generateDenomiator(int[] xData, int xMeans, int[] yData, int yMeans) { + double xSum = 0.0; + for (int i = 0; i < xData.length; i++) { + xSum += (xData[i] - xMeans) * (xData[i] - xMeans); + } + double ySum = 0.0; + for (int i = 0; i < yData.length; i++) { + ySum += (yData[i] - yMeans) * (yData[i] - yMeans); + } + return Math.sqrt(xSum) * Math.sqrt(ySum); + } + + private static double getMeans(int[] datas) { + double sum = 0.0; + for (int i = 0; i < datas.length; i++) { + sum += datas[i]; + } + return sum / datas.length; + } + + public static List columnAverage(List> list){ + ArrayList averages = new ArrayList<>(); + for(int i=0; i latencies, double percentile) { + Collections.sort(latencies); + int index = (int) Math.ceil(percentile * latencies.size()); + return latencies.get(index-1); + } + + public static void main(String[] args) { + List test = Arrays.asList( + 1,2,3,4,5, + 1,2,3,4,5, + 1,2,3,4,5, + 1,2,3,4,5, + 1,2,3,4,5, + 1,2,3,4,5, + 1,2,3,4,5); + System.out.println(columnAverage(Lists.partition(test, 5))); + + + + } + + +} + diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties new file mode 100644 index 0000000..8a705ec --- /dev/null +++ b/src/main/resources/application.properties @@ -0,0 +1,68 @@ + +#Druid配置 +druid.url=jdbc:avatica:remote:url=http://192.168.44.12:8082/druid/v2/sql/avatica/ +druid.driver=org.apache.calcite.avatica.remote.Driver +druid.table=top_server_ip_test_log + +#字段映射 +druid.attacktype.tcpsynflood=sessions +druid.attacktype.udpflood=bytes +druid.attacktype.icmpflood=packets +druid.attacktype.dnsamplification=packets 
+druid.serverip.columnname=destination +druid.attacktype.columnname=order_by +druid.recvtime.columnname=__time +#baseline生成metric +baseline.metric.type=session_num + +#HBase配置 +hbase.table=ddos_traffic_baselines +hbase.zookeeper.quorum=192.168.44.12 +hbase.zookeeper.client.port=2181 + +#读取druid时间范围方式,0:读取默认范围read.druid.time.range天数;1:指定时间范围 +read.druid.time.limit.type=1 +#07-01 +read.druid.min.time=1625068800000 +#06-01 +#read.druid.min.time=1622476800000 +read.druid.max.time=1625673600000 + + +#读取过去N天数据,最小值为3天(需要判断周期性) +read.historical.days=7 +#历史数据汇聚粒度为10分钟 +historical.grad=10 +#baseline生成方法 +baseline.function=KalmanFilter +#baseline时间1天 +baseline.range.days=1 +# 数据库Time格式 +time.format=yyyy-MM-dd HH:mm:ss + + +#算法参数 +baseline.period.correlative.threshold=0.5 +baseline.historical.ratio.threshold=0.1 +baseline.historical.sparse.fill.percentile=0.95 +baseline.rational.percentile=0.95 +#Kalman Filter +baseline.kalman.q=0.000001 +baseline.kalman.r=0.002 + + +# 每更新1000个记录打印log +log.write.count=10000 +# FOR TEST +generate.batch.size=10 + + +# http client配置 +http.request.timeout=10000 +http.response.timeout=60000 +http.connection.timeout=500 +http.max.connection.num=500 +http.max.per.route=200 + + + diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties new file mode 100644 index 0000000..ac2c528 --- /dev/null +++ b/src/main/resources/log4j.properties @@ -0,0 +1,19 @@ +######################### logger ############################## +log4j.logger.org.apache.http=OFF +log4j.logger.org.apache.http.wire=OFF + +#Log4j +log4j.rootLogger=debug,console,file +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=info +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n + +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.Threshold=info 
+log4j.appender.file.encoding=UTF-8 +log4j.appender.file.Append=true +log4j.appender.file.file=./logs/ddos_baselines.log +log4j.appender.file.DatePattern='.'yyyy-MM-dd +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n diff --git a/src/test/java/cn/mesalab/service/HBaseTest.java b/src/test/java/cn/mesalab/service/HBaseTest.java new file mode 100644 index 0000000..9598865 --- /dev/null +++ b/src/test/java/cn/mesalab/service/HBaseTest.java @@ -0,0 +1,92 @@ +package cn.mesalab.service; + +/** + * @author yjy + * @version 1.0 + * @date 2021/8/3 11:21 上午 + */ + +import cn.mesalab.config.ApplicationConfig; +import cn.mesalab.dao.DruidData; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class HBaseTest { + public static void main(String[] args) throws IOException { + org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create(); + + config.set(HConstants.ZOOKEEPER_QUORUM, ApplicationConfig.HBASE_ZOOKEEPER_QUORUM); + config.set(HConstants.ZOOKEEPER_CLIENT_PORT, ApplicationConfig.HBASE_ZOOKEEPER_CLIENT_PORT); + + TableName tableName = TableName.valueOf(ApplicationConfig.HBASE_TABLE); + Connection conn = ConnectionFactory.createConnection(config); + Table table = conn.getTable(tableName); + + + DruidData druidData = DruidData.getInstance(); + ArrayList destinationIps = druidData.getServerIpList(); + + for 
(String ip : destinationIps){ + Get abcGet = new Get(Bytes.toBytes(ip)); + Result r = table.get(abcGet); + ArrayWritable w = new ArrayWritable(IntWritable.class); + List attackTypeList = Arrays.asList( + "TCP SYN Flood", + "ICMP Flood", + "UDP Flood", + "DNS Amplification" + ); + for (String attackType : attackTypeList){ + byte[] session_nums = r.getValue(Bytes.toBytes(attackType), Bytes.toBytes("session_num")); + if (session_nums==null){ + continue; + } + w.readFields(new DataInputStream(new ByteArrayInputStream(session_nums))); + ArrayList arr2 = fromWritable(w); + System.out.println(ip + "-" + attackType + ": " + arr2.toString()); + } + + } + +// Get abcGet = new Get(Bytes.toBytes("1.0.0.1")); +// Result r = table.get(abcGet); +// ArrayWritable w = new ArrayWritable(IntWritable.class); +// w.readFields(new DataInputStream(new ByteArrayInputStream(r.getValue(Bytes.toBytes("TCP SYN Flood"), Bytes.toBytes("session_num"))))); +// ArrayList arr2 = fromWritable(w); +// System.out.println(arr2.toString()); + + + } + + public static Writable toWritable(int[] arr) { + Writable[] content = new Writable[arr.length]; + for (int i = 0; i < content.length; i++) { + content[i] = new IntWritable(arr[i]); + } + return new ArrayWritable(IntWritable.class, content); + } + + public static ArrayList fromWritable(ArrayWritable writable) { + Writable[] writables = ((ArrayWritable) writable).get(); + ArrayList list = new ArrayList(writables.length); + for (Writable wrt : writables) { + list.add(((IntWritable)wrt).get()); + } + return list; + } + +} diff --git a/src/test/java/cn/mesalab/utils/HttpClientUtilsTest.java b/src/test/java/cn/mesalab/utils/HttpClientUtilsTest.java new file mode 100644 index 0000000..a2d713e --- /dev/null +++ b/src/test/java/cn/mesalab/utils/HttpClientUtilsTest.java @@ -0,0 +1,14 @@ +package cn.mesalab.utils; + +import com.zdjizhi.utils.JsonMapper; + +/** + * @author yjy + * @version 1.0 + * @date 2021/8/3 4:43 下午 + */ +public class HttpClientUtilsTest { + + + 
+} \ No newline at end of file