From 1fcdb79739c396727207becb24358221edcc94f7 Mon Sep 17 00:00:00 2001 From: wanglihui <949764788@qq.com> Date: Tue, 17 May 2022 13:54:43 +0800 Subject: [PATCH 1/3] =?UTF-8?q?DoS=E4=BA=8B=E4=BB=B6=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E5=AF=B9Conditions=E5=9F=BA=E4=BA=8E=E9=80=9F=E7=8E=87?= =?UTF-8?q?=E6=A3=80=E6=B5=8B=E5=B1=9E=E6=80=A7=E5=80=BC=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/zdjizhi/etl/DosDetection.java | 2 +- .../zdjizhi/utils/FlinkEnvironmentUtils.java | 29 +++++++++++++++++++ .../java/com/zdjizhi/utils/KafkaUtils.java | 4 ++- src/main/resources/common.properties | 14 ++++----- 4 files changed, 40 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index 1f5568d..af5bafb 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -193,7 +193,7 @@ public class DosDetection extends RichMapFunction { switch (type) { case STATIC_CONDITION_TYPE: return new StrBuilder() - .append(tag).append(" > ") + .append("Rate > ") .append(base).append(" ") .append(tag).append("/s") .toString(); diff --git a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java index e34ce28..cd628c5 100644 --- a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java +++ b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java @@ -1,6 +1,8 @@ package com.zdjizhi.utils; import com.zdjizhi.common.CommonConfig; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -12,6 +14,33 @@ public class FlinkEnvironmentUtils { static { streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM); + + /* + // 每 1000ms 开始一次 checkpoint + streamExeEnv.enableCheckpointing(CommonConfig.FLINK_WINDOW_MAX_TIME * 1000); + + // 设置模式为精确一次 (这是默认值) + streamExeEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + + // 确认 checkpoints 之间的时间会进行 500 ms + streamExeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); + + // Checkpoint 必须在一分钟内完成,否则就会被抛弃 + streamExeEnv.getCheckpointConfig().setCheckpointTimeout(60000); + + // 允许两个连续的 checkpoint 错误 + streamExeEnv.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); + + // 同一时间只允许一个 checkpoint 进行 + streamExeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + + // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留 + streamExeEnv.getCheckpointConfig().enableExternalizedCheckpoints( + CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + + // 开启实验性的 unaligned checkpoints + streamExeEnv.getCheckpointConfig().enableUnalignedCheckpoints(); + */ } } diff --git a/src/main/java/com/zdjizhi/utils/KafkaUtils.java b/src/main/java/com/zdjizhi/utils/KafkaUtils.java index 6e6167a..b0312a5 100644 --- a/src/main/java/com/zdjizhi/utils/KafkaUtils.java +++ b/src/main/java/com/zdjizhi/utils/KafkaUtils.java @@ -22,12 +22,14 @@ public class KafkaUtils { } public static FlinkKafkaProducer getKafkaSink(String topic){ - return new FlinkKafkaProducer( + FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>( topic, new SimpleStringSchema(), getKafkaSinkProperty(), Optional.empty() ); + kafkaProducer.setLogFailuresOnly(true); + return kafkaProducer; } } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 1331475..92d0520 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -15,7 +15,7 @@ kafka.input.topic.name=DOS-SKETCH-RECORD kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 #读取kafka group id -kafka.input.group.id=2112080949 +kafka.input.group.id=dos-detection-job-220516-1 #kafka.input.group.id=dos-detection-job-210813-1 #发送kafka metrics并行度大小 @@ -37,8 +37,8 @@ kafka.output.bootstrap.servers=192.168.44.12:9094 #kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092 #zookeeper地址 -hbase.zookeeper.quorum=192.168.44.12:2181 -#hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181 +#hbase.zookeeper.quorum=192.168.44.12:2181 +hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181 #hbase客户端处理时间 hbase.client.operation.timeout=30000 @@ -74,12 +74,12 @@ destination.ip.partition.num=10000 data.center.id.num=15 #IP mmdb库路径 -ip.mmdb.path=D:\\data\\dat\\ +ip.mmdb.path=D:\\data\\dat\\bak\\ #ip.mmdb.path=/home/bigdata/topology/dat/ #ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/ #静态敏感阈值,速率小于此值不报警 -static.sensitivity.threshold=500 +static.sensitivity.threshold=10 #基线敏感阈值 baseline.sensitivity.threshold=0.2 @@ -92,8 +92,8 @@ baseline.sessions.severe.threshold=5 baseline.sessions.critical.threshold=8 #bifang服务访问地址 -#bifang.server.uri=http://192.168.44.72:80 -bifang.server.uri=http://192.168.44.3:80 +bifang.server.uri=http://192.168.44.72:80 +#bifang.server.uri=http://192.168.44.3:80 #访问bifang只读权限token,bifang内置,无需修改 bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867 From 3dc29a07befd0383b286b729606bdc74b7f07e29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E9=B9=8F=E9=A3=9E?= Date: Tue, 31 May 2022 18:00:36 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=A0=B9=E6=8D=AEvsysId?= =?UTF-8?q?=E4=BB=8Ebifang=E8=8E=B7=E5=8F=96=E9=9D=99=E6=80=81=E9=98=88?= =?UTF-8?q?=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/zdjizhi/common/CommonConfig.java | 2 + .../java/com/zdjizhi/common/DosVsysId.java | 22 ++++++ .../com/zdjizhi/etl/ParseStaticThreshold.java | 77 ++++++++++++++++--- src/main/resources/common.properties | 15 ++-- 4 files changed, 98 insertions(+), 18 deletions(-) create mode 100644 src/main/java/com/zdjizhi/common/DosVsysId.java diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java index 325bc29..846a2f5 100644 --- a/src/main/java/com/zdjizhi/common/CommonConfig.java +++ b/src/main/java/com/zdjizhi/common/CommonConfig.java @@ -63,6 +63,8 @@ public class CommonConfig { public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path"); public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path"); + public static final String BIFANG_SERVER_POLICY_VSYSID_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.vaysid.path"); + public static final int HTTP_POOL_MAX_CONNECTION = CommonConfigurations.getIntProperty("http.pool.max.connection"); public static final int HTTP_POOL_MAX_PER_ROUTE = CommonConfigurations.getIntProperty("http.pool.max.per.route"); public static final int HTTP_POOL_REQUEST_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.request.timeout"); diff --git a/src/main/java/com/zdjizhi/common/DosVsysId.java b/src/main/java/com/zdjizhi/common/DosVsysId.java new file mode 100644 index 0000000..27c0eaf --- /dev/null +++ b/src/main/java/com/zdjizhi/common/DosVsysId.java @@ -0,0 +1,22 @@ +package com.zdjizhi.common; + +import java.util.Objects; + +public class DosVsysId { + private int vsysId; + + public int getVsysId() { + return vsysId; + } + + public void setVsysId(int vsysId) { + this.vsysId = vsysId; + } + + @Override + public String toString() { + return "DosVsysId{" + + "vsysId=" + vsysId + + '}'; + } +} diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java index 77ff8e9..8bb5ff3 100644 --- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java +++ b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java @@ -3,6 +3,7 @@ package com.zdjizhi.etl; import com.fasterxml.jackson.databind.JavaType; import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.DosDetectionThreshold; +import com.zdjizhi.common.DosVsysId; import com.zdjizhi.utils.HttpClientUtils; import com.zdjizhi.utils.JsonMapper; import inet.ipaddr.IPAddress; @@ -29,6 +30,7 @@ public class ParseStaticThreshold { private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class); private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class); + private static JavaType vsysIDType = jsonMapperInstance.createCollectionType(ArrayList.class, DosVsysId.class); static { //加载加密登录密码 @@ -99,19 +101,18 @@ public class ParseStaticThreshold { } /** - * 获取静态阈值配置列表 + * 获取vsysId配置列表,只需要拿到id的集合,输出出去即可 * - * @return thresholds + * @return vsysIdList */ - private static ArrayList getDosDetectionThreshold() { - ArrayList thresholds = null; + private static ArrayList getVsysId() { + ArrayList vsysIdList = null; try { URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); HashMap parms = new HashMap<>(); parms.put("pageSize", -1); - parms.put("orderBy", "profileId asc"); - parms.put("isValid", 1); - HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms); + parms.put("orderBy", "vsysId desc"); + HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms); String token = CommonConfig.BIFANG_SERVER_TOKEN; if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { BasicHeader authorization = new BasicHeader("Authorization", token); @@ -125,19 +126,69 @@ public class ParseStaticThreshold { HashMap data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType); Object list = data.get("list"); if (list != null) { - thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType); - logger.info("获取到静态阈值配置{}条", thresholds.size()); + vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType); + logger.info("获取到vsysId{}条", vsysIdList.size()); } else { - logger.warn("静态阈值配置为空"); + logger.warn("vsysIdList为空"); } } else { logger.error(msg); } } } + } catch (Exception e) { + logger.error("获取vsysId失败,请检查bifang服务或登录配置信息 ", e); + } + return vsysIdList; + } + + /** + * 获取静态阈值配置列表,遍历id的集合,并将每个id放入请求体中 + * + * @return thresholds + */ + private static ArrayList getDosDetectionThreshold() { + ArrayList thresholds = null; + ArrayList vsysId = getVsysId(); + try { + if (vsysId != null){ + for (DosVsysId dosVsysId : vsysId) { + URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); + HashMap parms = new HashMap<>(); + parms.put("pageSize", -1); + parms.put("orderBy", "profileId asc"); + parms.put("isValid", 1); + parms.put("vsysId", dosVsysId.getVsysId()); + HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms); + String token = CommonConfig.BIFANG_SERVER_TOKEN; + if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { + BasicHeader authorization = new BasicHeader("Authorization", token); + BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded"); + String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1); + if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { + HashMap resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType); + boolean success = (boolean) resposeMap.get("success"); + String msg = resposeMap.get("msg").toString(); + if (success) { + HashMap data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType); + Object list = data.get("list"); + if (list != null) { + thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType); + logger.info("获取到静态阈值配置{}条", thresholds.size()); + } else { + logger.warn("静态阈值配置为空"); + } + } else { + logger.error(msg); + } + } + } + } + } } catch (Exception e) { logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e); } + return thresholds; } @@ -196,7 +247,8 @@ public class ParseStaticThreshold { } public static void main(String[] args) { - +// ArrayList vsysId = getVsysId(); +// vsysId.forEach(System.out::println); ArrayList dosDetectionThreshold = getDosDetectionThreshold(); dosDetectionThreshold.forEach(System.out::println); @@ -214,7 +266,8 @@ public class ParseStaticThreshold { } System.out.println("------------------------"); } - +// String s = loginBifangServer(); +// System.out.println(s); } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 92d0520..5939a6c 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -22,15 +22,15 @@ kafka.input.group.id=dos-detection-job-220516-1 kafka.output.metric.parallelism=1 #发送kafka metrics topic名 -#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS -kafka.output.metric.topic.name=test +kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS +#kafka.output.metric.topic.name=test #发送kafka event并行度大小 kafka.output.event.parallelism=1 #发送kafka event topic名 -#kafka.output.event.topic.name=DOS-EVENT -kafka.output.event.topic.name=storm-dos-test +kafka.output.event.topic.name=DOS-EVENT +#kafka.output.event.topic.name=storm-dos-test #kafka输出地址 kafka.output.bootstrap.servers=192.168.44.12:9094 @@ -92,8 +92,8 @@ baseline.sessions.severe.threshold=5 baseline.sessions.critical.threshold=8 #bifang服务访问地址 -bifang.server.uri=http://192.168.44.72:80 -#bifang.server.uri=http://192.168.44.3:80 +#bifang.server.uri=http://192.168.44.72:80 +bifang.server.uri=http://192.168.44.3:80 #访问bifang只读权限token,bifang内置,无需修改 bifang.server.token=ed04b942-7df4-4e3d-b9a9-a881ca98a867 @@ -104,6 +104,9 @@ bifang.server.encryptpwd.path=/v1/user/encryptpwd #登录bifang服务路径信息 bifang.server.login.path=/v1/user/login +#获取vaysId路径信息 +bifang.server.policy.vaysid.path=/v1/system/vsys/ + #获取静态阈值路径信息 bifang.server.policy.threshold.path=/v1/policy/profile/DoS/detection/threshold From 2d98c3b6e688c31aff7612e5d7e632763932d4b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E9=B9=8F=E9=A3=9E?= Date: Mon, 13 Jun 2022 18:00:57 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E9=9D=99=E6=80=81=E9=98=88=E5=80=BC=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E6=A0=B9=E6=8D=AE=E9=BB=98=E8=AE=A4=E6=8C=87=E5=AE=9AvsysID?= =?UTF-8?q?=E8=8E=B7=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/zdjizhi/etl/ParseStaticThreshold.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java index 8bb5ff3..cfaea86 100644 --- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java +++ b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java @@ -101,7 +101,7 @@ public class ParseStaticThreshold { } /** - * 获取vsysId配置列表,只需要拿到id的集合,输出出去即可 + * 获取vsysId配置列表 * * @return vsysIdList */ @@ -143,22 +143,23 @@ public class ParseStaticThreshold { } /** - * 获取静态阈值配置列表,遍历id的集合,并将每个id放入请求体中 + * 根据vsysId获取静态阈值配置列表 * * @return thresholds */ private static ArrayList getDosDetectionThreshold() { ArrayList thresholds = null; - ArrayList vsysId = getVsysId(); +// ArrayList vsysId = getVsysId(); try { - if (vsysId != null){ - for (DosVsysId dosVsysId : vsysId) { +// if (vsysId != null){ +// for (DosVsysId dosVsysId : vsysId) { URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); HashMap parms = new HashMap<>(); parms.put("pageSize", -1); parms.put("orderBy", "profileId asc"); parms.put("isValid", 1); - parms.put("vsysId", dosVsysId.getVsysId()); +// parms.put("vsysId", dosVsysId.getVsysId()); + parms.put("vsysId", 1); HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms); String token = CommonConfig.BIFANG_SERVER_TOKEN; if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { @@ -183,8 +184,8 @@ public class ParseStaticThreshold { } } } - } - } +// } +// } } catch (Exception e) { logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e); } @@ -247,8 +248,6 @@ public class ParseStaticThreshold { } public static void main(String[] args) { -// ArrayList vsysId = getVsysId(); -// vsysId.forEach(System.out::println); ArrayList dosDetectionThreshold = getDosDetectionThreshold(); dosDetectionThreshold.forEach(System.out::println);