From b4f919647a8e4c7dfbcfeb458005d8953495042c Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Tue, 24 Aug 2021 16:35:31 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=A0=B9=E6=8D=AE=E9=9D=99?= =?UTF-8?q?=E6=80=81=E9=98=88=E5=80=BC=E5=88=A4=E5=AE=9Ados=E6=94=BB?= =?UTF-8?q?=E5=87=BB=E9=80=BB=E8=BE=91=20=E6=96=B0=E5=A2=9E=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E5=99=A8=EF=BC=8C=E5=AE=9A=E6=97=B6=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E9=9D=99=E6=80=81=E9=98=88=E5=80=BC=E4=B8=8Ebaseline?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/zdjizhi/common/CommonConfig.java | 3 + .../java/com/zdjizhi/etl/DosDetection.java | 114 +++++++++++------ .../java/com/zdjizhi/etl/ParseSketchLog.java | 9 +- .../com/zdjizhi/etl/ParseStaticThreshold.java | 116 +++++++++++------- .../java/com/zdjizhi/utils/HbaseUtils.java | 7 +- .../com/zdjizhi/utils/HttpClientUtils.java | 1 + src/main/resources/common.properties | 10 +- 7 files changed, 173 insertions(+), 87 deletions(-) diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java index b0b1f97..2da067b 100644 --- a/src/main/java/com/zdjizhi/common/CommonConfig.java +++ b/src/main/java/com/zdjizhi/common/CommonConfig.java @@ -58,4 +58,7 @@ public class CommonConfig { public static final int HTTP_POOL_CONNECT_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.connect.timeout"); public static final int HTTP_POOL_RESPONSE_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.response.timeout"); + public static final int STATIC_THRESHOLD_SCHEDULE_MINUTES = CommonConfigurations.getIntProperty("static.threshold.schedule.minutes"); + public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = CommonConfigurations.getIntProperty("baseline.threshold.schedule.days"); + } diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index f572107..7175ae7 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -10,6 +10,7 @@ import com.zdjizhi.utils.SnowflakeId; import inet.ipaddr.IPAddress; import inet.ipaddr.IPAddressString; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; @@ -20,6 +21,9 @@ import org.slf4j.LoggerFactory; import java.text.NumberFormat; import java.text.ParseException; import java.util.*; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * @author wlh @@ -27,15 +31,28 @@ import java.util.*; public class DosDetection extends RichMapFunction { private static final Logger logger = LoggerFactory.getLogger(DosDetection.class); - private static Map, Integer>>> baselineMap; + private static Map, Integer>>> baselineMap = new HashMap<>(); private final static int BASELINE_SIZE = 144; private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); private TreeRangeMap> thresholdRangeMap; @Override public void open(Configuration parameters) { - baselineMap = HbaseUtils.baselineMap; - thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(); + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, + new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build()); + try { + executorService.scheduleAtFixedRate(() -> { + //do something + thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(); + }, 0, CommonConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES); + + executorService.scheduleAtFixedRate(() -> { + //do something + baselineMap = HbaseUtils.readFromHbase(); + }, 0, CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS); + } catch (Exception e) { + logger.error("定时器任务执行失败", e); + } PERCENT_INSTANCE.setMinimumFractionDigits(2); } @@ -48,72 +65,90 @@ public class DosDetection extends RichMapFunction { IPAddress destinationIpAddress = new IPAddressString(destinationIp).getAddress(); Map thresholdMap = thresholdRangeMap.get(destinationIpAddress); logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType); - if (baselineMap.containsKey(destinationIp) && thresholdMap == null) { - finalResult = getDosEventLogByBaseline(value, destinationIp, attackType); - } else if (!baselineMap.containsKey(destinationIp) && thresholdMap != null) { - finalResult = getDosEventLogByStaticThreshold(value,thresholdMap); - }else if (baselineMap.containsKey(destinationIp) && thresholdMap != null){ - DosEventLog eventLogByBaseline = getDosEventLogByBaseline(value, destinationIp, attackType); - DosEventLog eventLogByStaticThreshold = getDosEventLogByStaticThreshold(value, thresholdMap); - finalResult = mergeFinalResult(eventLogByBaseline,eventLogByStaticThreshold); - }else { + if (baselineMap != null && baselineMap.containsKey(destinationIp) && thresholdMap == null) { + finalResult = getDosEventLogByBaseline(value, destinationIp, attackType).f1; + } else if (baselineMap != null && !baselineMap.containsKey(destinationIp) && thresholdMap != null) { + finalResult = getDosEventLogByStaticThreshold(value, thresholdMap).f1; + } else if (baselineMap != null && baselineMap.containsKey(destinationIp) && thresholdMap != null) { + Tuple2 eventLogByBaseline = getDosEventLogByBaseline(value, destinationIp, attackType); + Tuple2 eventLogByStaticThreshold = getDosEventLogByStaticThreshold(value, thresholdMap); + finalResult = mergeFinalResult(eventLogByBaseline, eventLogByStaticThreshold); + } else { logger.debug("未获取到当前server IP:{} 类型 {} 静态阈值 和 baseline", destinationIp, attackType); } + } catch (Exception e) { logger.error("判定失败\n {} \n{}", value, e); } return finalResult; } - private DosEventLog mergeFinalResult(DosEventLog eventLogByBaseline,DosEventLog eventLogByStaticThreshold){ - return eventLogByStaticThreshold; + private DosEventLog mergeFinalResult(Tuple2 eventLogByBaseline, Tuple2 eventLogByStaticThreshold) { + if (eventLogByBaseline.f0.score > eventLogByStaticThreshold.f0.score) { + mergeCondition(eventLogByBaseline.f1, eventLogByStaticThreshold.f1); + logger.info("merge eventLogByBaseline {} \neventLogByStaticThreshold {}",eventLogByBaseline,eventLogByStaticThreshold); + return eventLogByBaseline.f1; + } else { + mergeCondition(eventLogByStaticThreshold.f1, eventLogByBaseline.f1); + logger.info("merge eventLogByStaticThreshold {} \neventLogByBaseline {}",eventLogByStaticThreshold,eventLogByBaseline); + return eventLogByStaticThreshold.f1; + } } - private DosEventLog getDosEventLogByBaseline(DosSketchLog value, String destinationIp, String attackType) throws ParseException { + private void mergeCondition(DosEventLog log1, DosEventLog log2) { + if (log1 != null && log2 != null) { + String conditions1 = log1.getConditions(); + String conditions2 = log2.getConditions(); + log1.setConditions(conditions1 + " and " + conditions2); + } + } + + private Tuple2 getDosEventLogByBaseline(DosSketchLog value, String destinationIp, String attackType) throws ParseException { Tuple2, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType); Integer base = getBaseValue(floodTypeTup, value); long diff = value.getSketch_sessions() - base; - return getDosEventLog(value, base, diff); + return getDosEventLog(value, base, diff, "baseline"); } - private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, Map thresholdMap) throws ParseException { - DosEventLog result = null; + private Tuple2 getDosEventLogByStaticThreshold(DosSketchLog value, Map thresholdMap) throws ParseException { + Tuple2 result = Tuple2.of(Severity.NORMAL, null); String attackType = value.getAttack_type(); if (thresholdMap.containsKey(attackType)) { DosDetectionThreshold threshold = thresholdMap.get(attackType); long base = threshold.getSessionsPerSec(); long diff = value.getSketch_sessions() - base; - result = getDosEventLog(value, base, diff); + result = getDosEventLog(value, base, diff, "static"); } return result; } - private DosEventLog getDosEventLog(DosSketchLog value, long base, long diff) throws ParseException { + private Tuple2 getDosEventLog(DosSketchLog value, long base, long diff, String tag) throws ParseException { DosEventLog result = null; String destinationIp = value.getDestination_ip(); String attackType = value.getAttack_type(); + Severity severity = Severity.NORMAL; if (diff > 0 && base != 0) { String percent = getDiffPercent(diff, base); double diffPercentDouble = getDiffPercentDouble(percent); - Severity severity = judgeSeverity(diffPercentDouble); + severity = judgeSeverity(diffPercentDouble); if (severity != Severity.NORMAL) { - result = getResult(value, severity, percent); + result = getResult(value, severity, percent, tag); logger.info("检测到当前server IP {} 存在 {} 异常,日志详情\n {}", destinationIp, attackType, result.toString()); } else { logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value.toString()); } } - return result; + return Tuple2.of(severity, result); } - private DosEventLog getResult(DosSketchLog value, Severity severity, String percent) { + private DosEventLog getResult(DosSketchLog value, Severity severity, String percent, String tag) { DosEventLog dosEventLog = new DosEventLog(); dosEventLog.setLog_id(SnowflakeId.generateId()); dosEventLog.setStart_time(value.getSketch_start_time()); dosEventLog.setEnd_time(value.getSketch_start_time() + CommonConfig.FLINK_WINDOW_MAX_TIME); dosEventLog.setAttack_type(value.getAttack_type()); - dosEventLog.setSeverity(severity.toString()); - dosEventLog.setConditions(getConditions(percent)); + dosEventLog.setSeverity(severity.severity); + dosEventLog.setConditions(getConditions(percent, value.getSketch_sessions(), tag)); dosEventLog.setDestination_ip(value.getDestination_ip()); dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip())); String ipList = value.getSource_ip(); @@ -146,8 +181,15 @@ public class DosDetection extends RichMapFunction { return base; } - private String getConditions(String percent) { - return "sessions > " + percent + " of baseline"; + private String getConditions(String percent, long sessions, String tag) { + switch (tag) { + case "baseline": + return "sessions > " + percent + " of baseline"; + case "static": + return "sessions > " + sessions + " sessions/s"; + default: + return null; + } } private String getSourceCountryList(String sourceIpList) { @@ -195,22 +237,24 @@ public class DosDetection extends RichMapFunction { /** * 判断严重程度枚举类型 */ - CRITICAL("Critical"), - SEVERE("Severe"), - MAJOR("Major"), - WARNING("Warning"), - MINOR("Minor"), - NORMAL("Normal"); + CRITICAL("Critical", 5), + SEVERE("Severe", 4), + MAJOR("Major", 3), + WARNING("Warning", 2), + MINOR("Minor", 1), + NORMAL("Normal", 0); private final String severity; + private final int score; @Override public String toString() { return this.severity; } - Severity(String severity) { + Severity(String severity, int score) { this.severity = severity; + this.score = score; } } diff --git a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java index 8560aa2..3059f78 100644 --- a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java +++ b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java @@ -1,5 +1,6 @@ package com.zdjizhi.etl; +import com.fasterxml.jackson.databind.JavaType; import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.DosSketchLog; import com.zdjizhi.source.DosSketchSource; @@ -23,6 +24,10 @@ import java.util.HashMap; public class ParseSketchLog { private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class); + private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); + private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class); + private static JavaType listType = jsonMapperInstance.createCollectionType(ArrayList.class, HashMap.class); + public static SingleOutputStreamOperator getSketchSource(){ return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy()); @@ -43,11 +48,11 @@ public class ParseSketchLog { public void flatMap(String s, Collector collector) { try { if (StringUtil.isNotBlank(s)){ - HashMap sketchSource = (HashMap) JsonMapper.fromJsonString(s, Object.class); + HashMap sketchSource = jsonMapperInstance.fromJson(s, hashmapJsonType); long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString()); long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString()); String attackType = sketchSource.get("attack_type").toString(); - ArrayList> reportIpList = (ArrayList>) sketchSource.get("report_ip_list"); + ArrayList> reportIpList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(sketchSource.get("report_ip_list")), listType); for (HashMap obj : reportIpList) { DosSketchLog dosSketchLog = new DosSketchLog(); dosSketchLog.setSketch_start_time(sketchStartTime); diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java index 1a246f7..3c2d1b4 100644 --- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java +++ b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java @@ -18,7 +18,6 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import java.util.Set; /** * @author wlh @@ -39,116 +38,143 @@ public class ParseStaticThreshold { /** * 获取加密密码 */ - private static String getEncryptpwd(){ + private static String getEncryptpwd() { String psw = HttpClientUtils.ERROR_MESSAGE; try { URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); HashMap parms = new HashMap<>(); - parms.put("password",CommonConfig.BIFANG_SERVER_PASSWORD); - HttpClientUtils.setUrlWithParams(uriBuilder,CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH,parms); + parms.put("password", CommonConfig.BIFANG_SERVER_PASSWORD); + HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms); String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build()); - if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)){ + if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { HashMap resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType); - boolean success = (boolean)resposeMap.get("success"); - if (success){ + boolean success = (boolean) resposeMap.get("success"); + String msg = resposeMap.get("msg").toString(); + if (success) { HashMap data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType); psw = data.get("encryptpwd").toString(); + } else { + logger.error(msg); } } - }catch (URISyntaxException e){ - logger.error("构造URI异常",e); - }catch (Exception e){ - logger.error("获取encryptpwd失败",e); + } catch (URISyntaxException e) { + logger.error("构造URI异常", e); + } catch (Exception e) { + logger.error("获取encryptpwd失败", e); } return psw; } /** * 登录bifang服务,获取token + * * @return token */ - private static String loginBifangServer(){ + private static String loginBifangServer() { String token = HttpClientUtils.ERROR_MESSAGE; try { - if (!HttpClientUtils.ERROR_MESSAGE.equals(encryptpwd)){ + if (!HttpClientUtils.ERROR_MESSAGE.equals(encryptpwd)) { URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); HashMap parms = new HashMap<>(); - parms.put("username",CommonConfig.BIFANG_SERVER_USER); - parms.put("password",encryptpwd); - HttpClientUtils.setUrlWithParams(uriBuilder,CommonConfig.BIFANG_SERVER_LOGIN_PATH,parms); + parms.put("username", CommonConfig.BIFANG_SERVER_USER); + parms.put("password", encryptpwd); + HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_LOGIN_PATH, parms); String resposeJsonStr = HttpClientUtils.httpPost(uriBuilder.build(), null); - if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)){ + if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { HashMap resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType); - boolean success = (boolean)resposeMap.get("success"); - if (success){ + boolean success = (boolean) resposeMap.get("success"); + String msg = resposeMap.get("msg").toString(); + if (success) { HashMap data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType); token = data.get("token").toString(); + } else { + logger.error(msg); } } } - }catch (Exception e){ - logger.error("登录失败,未获取到token ",e); + } catch (Exception e) { + logger.error("登录失败,未获取到token ", e); } return token; } /** * 获取静态阈值配置列表 + * * @return thresholds */ - private static ArrayList getDosDetectionThreshold(){ + private static ArrayList getDosDetectionThreshold() { ArrayList thresholds = null; try { URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); - HttpClientUtils.setUrlWithParams(uriBuilder,CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH,null); + HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, null); String token = loginBifangServer(); - if (!HttpClientUtils.ERROR_MESSAGE.equals(token)){ + if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { BasicHeader authorization = new BasicHeader("Authorization", token); String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization); - if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)){ + if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { HashMap resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType); - boolean success = (boolean)resposeMap.get("success"); - if (success){ + boolean success = (boolean) resposeMap.get("success"); + String msg = resposeMap.get("msg").toString(); + if (success) { HashMap data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType); thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(data.get("list")), thresholdType); - logger.info("获取到静态阈值配置{}条",thresholds.size()); + logger.info("获取到静态阈值配置{}条", thresholds.size()); + } else { + logger.error(msg); } } } - }catch (Exception e){ - logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ",e); + } catch (Exception e) { + logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e); } return thresholds; } /** * 基于静态阈值构建threshold RangeMap,k:IP段或具体IP,v:配置信息 + * * @return threshold RangeMap */ - public static TreeRangeMap> createStaticThreshold(){ - TreeRangeMap> thresholdRangeMap = null; + static TreeRangeMap> createStaticThreshold() { + TreeRangeMap> thresholdRangeMap = TreeRangeMap.create(); try { ArrayList dosDetectionThreshold = getDosDetectionThreshold(); - if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()){ - thresholdRangeMap = TreeRangeMap.create(); - for (DosDetectionThreshold threshold:dosDetectionThreshold){ + if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()) { + for (DosDetectionThreshold threshold : dosDetectionThreshold) { + String attackType = threshold.getAttackType(); + switch (attackType) { + case "tcp_syn_flood": + threshold.setAttackType("TCP SYN Flood"); + break; + case "udp_flood": + threshold.setAttackType("UDP Flood"); + break; + case "icmp_flood": + threshold.setAttackType("ICMP Flood"); + break; + case "dns_amplification": + threshold.setAttackType("DNS Amplification"); + break; + default: + } ArrayList serverIpList = threshold.getServerIpList(); - for (String sip:serverIpList){ + for (String sip : serverIpList) { IPAddressString ipAddressString = new IPAddressString(sip); - if (ipAddressString.isIPAddress()){ + if (ipAddressString.isIPAddress()) { IPAddress address = ipAddressString.getAddress(); Map floodTypeThresholdMap = thresholdRangeMap.get(address); - if (floodTypeThresholdMap == null){ + if (floodTypeThresholdMap == null) { floodTypeThresholdMap = new HashMap<>(); } - floodTypeThresholdMap.put(threshold.getAttackType(),threshold); - thresholdRangeMap.put(Range.closed(address.getLower(),address.getUpper()),floodTypeThresholdMap); + floodTypeThresholdMap.put(threshold.getAttackType(), threshold); + thresholdRangeMap.put(Range.closed(address.getLower(), address.getUpper()), floodTypeThresholdMap); } } } } - }catch (Exception e){ - logger.error("构建threshold RangeMap失败",e); + } catch (Exception e) { + logger.error("构建threshold RangeMap失败", e); } return thresholdRangeMap; } @@ -156,11 +182,11 @@ public class ParseStaticThreshold { public static void main(String[] args) { TreeRangeMap> staticThreshold = createStaticThreshold(); Map, Map> rangeMapMap = staticThreshold.asMapOfRanges(); - for (Range range:rangeMapMap.keySet()){ + for (Range range : rangeMapMap.keySet()) { Map thresholdMap = rangeMapMap.get(range); - for (String type:thresholdMap.keySet()){ + for (String type : thresholdMap.keySet()) { DosDetectionThreshold threshold = thresholdMap.get(type); - System.out.println(range+"---"+type+"---"+threshold); + System.out.println(range + "---" + type + "---" + threshold); } } } diff --git a/src/main/java/com/zdjizhi/utils/HbaseUtils.java b/src/main/java/com/zdjizhi/utils/HbaseUtils.java index bf9ced5..c147e61 100644 --- a/src/main/java/com/zdjizhi/utils/HbaseUtils.java +++ b/src/main/java/com/zdjizhi/utils/HbaseUtils.java @@ -25,7 +25,6 @@ public class HbaseUtils { private static final Logger logger = LoggerFactory.getLogger(HbaseUtils.class); private static Table table = null; private static Scan scan = null; - public static Map, Integer>>> baselineMap = new HashMap<>(); private static ArrayList floodTypeList = new ArrayList<>(); static { @@ -33,7 +32,6 @@ public class HbaseUtils { floodTypeList.add("UDP Flood"); floodTypeList.add("ICMP Flood"); floodTypeList.add("DNS Amplification"); - readFromHbase(); } private static void prepareHbaseEnv() throws IOException { @@ -54,6 +52,7 @@ public class HbaseUtils { } public static void main(String[] args) { + Map, Integer>>> baselineMap = readFromHbase(); Set keySet = baselineMap.keySet(); for (String key : keySet) { Map, Integer>> stringTuple2Map = baselineMap.get(key); @@ -66,7 +65,8 @@ public class HbaseUtils { System.out.println(baselineMap.size()); } - private static void readFromHbase() { + public static Map, Integer>>> readFromHbase() { + Map, Integer>>> baselineMap = new HashMap<>(); try { prepareHbaseEnv(); logger.info("开始读取baseline数据"); @@ -87,6 +87,7 @@ public class HbaseUtils { } catch (Exception e) { logger.error("读取hbase数据失败", e); } + return baselineMap; } private static Integer getDefaultValue(Result result, String family, String qualifier) { diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtils.java b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java index d358300..6a9af77 100644 --- a/src/main/java/com/zdjizhi/utils/HttpClientUtils.java +++ b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java @@ -32,6 +32,7 @@ import java.util.Map; /** * http client工具类 + * @author wlh */ public class HttpClientUtils { /** 全局连接池对象 */ diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index cde96d8..82cea3e 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -15,7 +15,7 @@ kafka.input.topic.name=DOS-SKETCH-LOG kafka.input.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092 #读取kafka group id -kafka.input.group.id=2108161121 +kafka.input.group.id=2108231709 #kafka.input.group.id=dos-detection-job-210813-1 #发送kafka metrics并行度大小 @@ -112,4 +112,10 @@ http.pool.request.timeout=60000 http.pool.connect.timeout=60000 #服务端响应超时时间设置(单位:毫秒) -http.pool.response.timeout=60000 \ No newline at end of file +http.pool.response.timeout=60000 + +#获取静态阈值周期,默认十分钟 +static.threshold.schedule.minutes=10 + +#获取baseline周期,默认7天 +baseline.threshold.schedule.days=7 \ No newline at end of file