diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java index c47361b..dd6a6f8 100644 --- a/src/main/java/com/zdjizhi/common/CommonConfig.java +++ b/src/main/java/com/zdjizhi/common/CommonConfig.java @@ -64,6 +64,8 @@ public class CommonConfig { public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path"); public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path"); + public static final String BIFANG_SERVER_POLICY_VSYSID_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.vaysid.path"); + public static final int HTTP_POOL_MAX_CONNECTION = CommonConfigurations.getIntProperty("http.pool.max.connection"); public static final int HTTP_POOL_MAX_PER_ROUTE = CommonConfigurations.getIntProperty("http.pool.max.per.route"); public static final int HTTP_POOL_REQUEST_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.request.timeout"); diff --git a/src/main/java/com/zdjizhi/common/DosVsysId.java b/src/main/java/com/zdjizhi/common/DosVsysId.java new file mode 100644 index 0000000..27c0eaf --- /dev/null +++ b/src/main/java/com/zdjizhi/common/DosVsysId.java @@ -0,0 +1,22 @@ +package com.zdjizhi.common; + +import java.util.Objects; + +public class DosVsysId { + private int vsysId; + + public int getVsysId() { + return vsysId; + } + + public void setVsysId(int vsysId) { + this.vsysId = vsysId; + } + + @Override + public String toString() { + return "DosVsysId{" + + "vsysId=" + vsysId + + '}'; + } +} diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java index 1f5568d..af5bafb 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/etl/DosDetection.java @@ -193,7 +193,7 @@ public class DosDetection extends RichMapFunction { switch (type) { case STATIC_CONDITION_TYPE: return new StrBuilder() - .append(tag).append(" > ") + .append("Rate > ") .append(base).append(" ") .append(tag).append("/s") .toString(); diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java index 77ff8e9..cfaea86 100644 --- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java +++ b/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java @@ -3,6 +3,7 @@ package com.zdjizhi.etl; import com.fasterxml.jackson.databind.JavaType; import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.DosDetectionThreshold; +import com.zdjizhi.common.DosVsysId; import com.zdjizhi.utils.HttpClientUtils; import com.zdjizhi.utils.JsonMapper; import inet.ipaddr.IPAddress; @@ -29,6 +30,7 @@ public class ParseStaticThreshold { private static JsonMapper jsonMapperInstance = JsonMapper.getInstance(); private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class); private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class); + private static JavaType vsysIDType = jsonMapperInstance.createCollectionType(ArrayList.class, DosVsysId.class); static { //加载加密登录密码 @@ -99,19 +101,18 @@ public class ParseStaticThreshold { } /** - * 获取静态阈值配置列表 + * 获取vsysId配置列表 * - * @return thresholds + * @return vsysIdList */ - private static ArrayList getDosDetectionThreshold() { - ArrayList thresholds = null; + private static ArrayList getVsysId() { + ArrayList vsysIdList = null; try { URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); HashMap parms = new HashMap<>(); parms.put("pageSize", -1); - parms.put("orderBy", "profileId asc"); - parms.put("isValid", 1); - HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms); + parms.put("orderBy", "vsysId desc"); + HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms); String token = CommonConfig.BIFANG_SERVER_TOKEN; if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { BasicHeader authorization = new BasicHeader("Authorization", token); @@ -125,19 +126,70 @@ public class ParseStaticThreshold { HashMap data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType); Object list = data.get("list"); if (list != null) { - thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType); - logger.info("获取到静态阈值配置{}条", thresholds.size()); + vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType); + logger.info("获取到vsysId{}条", vsysIdList.size()); } else { - logger.warn("静态阈值配置为空"); + logger.warn("vsysIdList为空"); } } else { logger.error(msg); } } } + } catch (Exception e) { + logger.error("获取vsysId失败,请检查bifang服务或登录配置信息 ", e); + } + return vsysIdList; + } + + /** + * 根据vsysId获取静态阈值配置列表 + * + * @return thresholds + */ + private static ArrayList getDosDetectionThreshold() { + ArrayList thresholds = null; +// ArrayList vsysId = getVsysId(); + try { +// if (vsysId != null){ +// for (DosVsysId dosVsysId : vsysId) { + URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI); + HashMap parms = new HashMap<>(); + parms.put("pageSize", -1); + parms.put("orderBy", "profileId asc"); + parms.put("isValid", 1); +// parms.put("vsysId", dosVsysId.getVsysId()); + parms.put("vsysId", 1); + HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms); + String token = CommonConfig.BIFANG_SERVER_TOKEN; + if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { + BasicHeader authorization = new BasicHeader("Authorization", token); + BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded"); + String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1); + if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { + HashMap resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType); + boolean success = (boolean) resposeMap.get("success"); + String msg = resposeMap.get("msg").toString(); + if (success) { + HashMap data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType); + Object list = data.get("list"); + if (list != null) { + thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType); + logger.info("获取到静态阈值配置{}条", thresholds.size()); + } else { + logger.warn("静态阈值配置为空"); + } + } else { + logger.error(msg); + } + } + } +// } +// } } catch (Exception e) { logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e); } + return thresholds; } @@ -196,7 +248,6 @@ public class ParseStaticThreshold { } public static void main(String[] args) { - ArrayList dosDetectionThreshold = getDosDetectionThreshold(); dosDetectionThreshold.forEach(System.out::println); @@ -214,7 +265,8 @@ public class ParseStaticThreshold { } System.out.println("------------------------"); } - +// String s = loginBifangServer(); +// System.out.println(s); } diff --git a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java index e34ce28..cd628c5 100644 --- a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java +++ b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java @@ -1,6 +1,8 @@ package com.zdjizhi.utils; import com.zdjizhi.common.CommonConfig; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -12,6 +14,33 @@ public class FlinkEnvironmentUtils { static { streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM); + + /* + // 每 1000ms 开始一次 checkpoint + streamExeEnv.enableCheckpointing(CommonConfig.FLINK_WINDOW_MAX_TIME * 1000); + + // 设置模式为精确一次 (这是默认值) + streamExeEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + + // 确认 checkpoints 之间的时间会进行 500 ms + streamExeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); + + // Checkpoint 必须在一分钟内完成,否则就会被抛弃 + streamExeEnv.getCheckpointConfig().setCheckpointTimeout(60000); + + // 允许两个连续的 checkpoint 错误 + streamExeEnv.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); + + // 同一时间只允许一个 checkpoint 进行 + streamExeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + + // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留 + streamExeEnv.getCheckpointConfig().enableExternalizedCheckpoints( + CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + + // 开启实验性的 unaligned checkpoints + streamExeEnv.getCheckpointConfig().enableUnalignedCheckpoints(); + */ } } diff --git a/src/main/java/com/zdjizhi/utils/KafkaUtils.java b/src/main/java/com/zdjizhi/utils/KafkaUtils.java index 6e6167a..b0312a5 100644 --- a/src/main/java/com/zdjizhi/utils/KafkaUtils.java +++ b/src/main/java/com/zdjizhi/utils/KafkaUtils.java @@ -22,12 +22,14 @@ public class KafkaUtils { } public static FlinkKafkaProducer getKafkaSink(String topic){ - return new FlinkKafkaProducer( + FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>( topic, new SimpleStringSchema(), getKafkaSinkProperty(), Optional.empty() ); + kafkaProducer.setLogFailuresOnly(true); + return kafkaProducer; } } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 16d0fec..7237611 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -15,23 +15,25 @@ kafka.input.topic.name=DOS-SKETCH-RECORD kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 #读取kafka group id -kafka.input.group.id=2203241552 +kafka.input.group.id=2112080949 #kafka.input.group.id=dos-detection-job-210813-1 #发送kafka metrics并行度大小 kafka.output.metric.parallelism=1 #发送kafka metrics topic名 -kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS +#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS +kafka.output.metric.topic.name=test #发送kafka event并行度大小 kafka.output.event.parallelism=1 #发送kafka event topic名 -kafka.output.event.topic.name=DOS-EVENT +#kafka.output.event.topic.name=DOS-EVENT +kafka.output.event.topic.name=storm-dos-test #kafka输出地址 -kafka.output.bootstrap.servers=192.168.40.223:9094 +kafka.output.bootstrap.servers=192.168.44.12:9094 #kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092 #zookeeper地址 @@ -102,6 +104,9 @@ bifang.server.encryptpwd.path=/v1/user/encryptpwd #登录bifang服务路径信息 bifang.server.login.path=/v1/user/login +#获取vaysId路径信息 +bifang.server.policy.vaysid.path=/v1/system/vsys/ + #获取静态阈值路径信息 bifang.server.policy.threshold.path=/v1/policy/profile/DoS/detection/threshold @@ -130,6 +135,7 @@ baseline.threshold.schedule.days=1 #kafka用户认证配置参数 sasl.jaas.config.user=admin #sasl.jaas.config.password=galaxy2019 +#sasl.jaas.config.password=ENC(6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ) sasl.jaas.config.password=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ #是否开启kafka用户认证配置,1:是;0:否