From 77ec061844374f7e1929b1224ae858665deb4bb5 Mon Sep 17 00:00:00 2001 From: wangchengcheng Date: Fri, 12 Jan 2024 18:42:21 +0800 Subject: [PATCH] fix:refactor the project and optimize the configuration loading method --- pom.xml | 2 +- .../com/zdjizhi/common/FlowWriteConfig.java | 105 ------- .../java/com/zdjizhi/conf/DosConfigs.java | 259 +++++++++++++++++ .../com/zdjizhi/conf/DosConfiguration.java | 36 +++ .../com/zdjizhi/etl/EtlProcessFunction.java | 108 ------- .../java/com/zdjizhi/etl/ParseSketchLog.java | 84 ------ .../zdjizhi/etl/TrafficServerIpMetrics.java | 36 --- .../DosDetectionFunction.java} | 141 +++++---- .../zdjizhi/function/EtlProcessFunction.java | 141 +++++++++ .../zdjizhi/function/FlatSketchFunction.java | 53 ++++ .../zdjizhi/function/SketchKeysSelector.java | 15 + .../zdjizhi/main/DosDetectionApplication.java | 79 ++++- .../java/com/zdjizhi/sink/DosEventSink.java | 23 -- .../com/zdjizhi/sink/OutputStreamSink.java | 65 ----- .../sink/TrafficServerIpMetricsSink.java | 25 -- .../com/zdjizhi/source/DosSketchSource.java | 37 --- .../zdjizhi/utils/CommonConfigurations.java | 45 --- .../com/zdjizhi/utils/DistributedLock.java | 200 ------------- .../zdjizhi/utils/FlinkEnvironmentUtils.java | 44 --- .../com/zdjizhi/utils/HttpClientUtils.java | 274 ------------------ .../java/com/zdjizhi/utils/KafkaUtils.java | 35 --- .../utils/{ => Snowflakeld}/SnowflakeId.java | 67 ++--- .../Threshold}/ParseBaselineThreshold.java | 48 +-- .../Threshold}/ParseStaticThreshold.java | 71 ++--- .../com/zdjizhi/utils/ZookeeperUtils.java | 136 --------- .../{ => connections/hbase}/HbaseUtils.java | 2 +- .../connections/http/HttpClientService.java | 107 ++++++- .../connections/kafka/KafkaConsumer.java | 17 ++ .../connections/kafka/KafkaProducer.java | 23 ++ .../utils/exception/FlowWriteException.java | 2 - .../{ => knowledgebase}/IpLookupUtils.java | 66 ++--- src/main/resources/common.properties | 149 ---------- .../resources/detection_dos_attack.properties | 44 +++ src/test/java/com/zdjizhi/Http/HttpTest.java | 50 ---- .../java/com/zdjizhi/common/HbaseTest.java | 12 +- .../java/com/zdjizhi/common/NacosTest.java | 2 +- .../com/zdjizhi/etl/DosDetectionTest.java | 10 +- 37 files changed, 978 insertions(+), 1635 deletions(-) delete mode 100644 src/main/java/com/zdjizhi/common/FlowWriteConfig.java create mode 100644 src/main/java/com/zdjizhi/conf/DosConfigs.java create mode 100644 src/main/java/com/zdjizhi/conf/DosConfiguration.java delete mode 100644 src/main/java/com/zdjizhi/etl/EtlProcessFunction.java delete mode 100644 src/main/java/com/zdjizhi/etl/ParseSketchLog.java delete mode 100644 src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java rename src/main/java/com/zdjizhi/{etl/DosDetection.java => function/DosDetectionFunction.java} (71%) create mode 100644 src/main/java/com/zdjizhi/function/EtlProcessFunction.java create mode 100644 src/main/java/com/zdjizhi/function/FlatSketchFunction.java create mode 100644 src/main/java/com/zdjizhi/function/SketchKeysSelector.java delete mode 100644 src/main/java/com/zdjizhi/sink/DosEventSink.java delete mode 100644 src/main/java/com/zdjizhi/sink/OutputStreamSink.java delete mode 100644 src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java delete mode 100644 src/main/java/com/zdjizhi/source/DosSketchSource.java delete mode 100644 src/main/java/com/zdjizhi/utils/CommonConfigurations.java delete mode 100644 src/main/java/com/zdjizhi/utils/DistributedLock.java delete mode 100644 src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java delete mode 100644 src/main/java/com/zdjizhi/utils/HttpClientUtils.java delete mode 100644 src/main/java/com/zdjizhi/utils/KafkaUtils.java rename src/main/java/com/zdjizhi/utils/{ => Snowflakeld}/SnowflakeId.java (71%) rename src/main/java/com/zdjizhi/{etl => utils/Threshold}/ParseBaselineThreshold.java (69%) rename src/main/java/com/zdjizhi/{etl => utils/Threshold}/ParseStaticThreshold.java (82%) delete mode 100644 src/main/java/com/zdjizhi/utils/ZookeeperUtils.java rename src/main/java/com/zdjizhi/utils/{ => connections/hbase}/HbaseUtils.java (97%) create mode 100644 src/main/java/com/zdjizhi/utils/connections/kafka/KafkaConsumer.java create mode 100644 src/main/java/com/zdjizhi/utils/connections/kafka/KafkaProducer.java rename src/main/java/com/zdjizhi/utils/{ => knowledgebase}/IpLookupUtils.java (77%) delete mode 100644 src/main/resources/common.properties create mode 100644 src/main/resources/detection_dos_attack.properties delete mode 100644 src/test/java/com/zdjizhi/Http/HttpTest.java diff --git a/pom.xml b/pom.xml index 17e482f..6d32e6b 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi flink-dos-detection - 23.12 + 24.01 flink-dos-detection http://www.example.com diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java deleted file mode 100644 index 1a88d40..0000000 --- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java +++ /dev/null @@ -1,105 +0,0 @@ -package com.zdjizhi.common; - -import com.zdjizhi.utils.CommonConfigurations; -import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; - -/** - * @author wlh - * @date 2021/1/6 - */ -public class FlowWriteConfig { - - /** - * 定位库默认分隔符 - */ - - private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); - - static { - encryptor.setPassword("galaxy"); - } - - public static final int STREAM_EXECUTION_ENVIRONMENT_PARALLELISM = CommonConfigurations.getIntProperty("stream.execution.environment.parallelism"); - public static final String STREAM_EXECUTION_JOB_NAME = CommonConfigurations.getStringProperty("stream.execution.job.name"); - - public static final int KAFKA_INPUT_PARALLELISM = CommonConfigurations.getIntProperty("kafka.input.parallelism"); - public static final String KAFKA_INPUT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.input.topic.name"); - public static final String KAFKA_INPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.input.bootstrap.servers"); - public static final String KAFKA_GROUP_ID = CommonConfigurations.getStringProperty("kafka.input.group.id"); - - public static final int KAFKA_OUTPUT_METRIC_PARALLELISM = CommonConfigurations.getIntProperty("kafka.output.metric.parallelism"); - public static final String KAFKA_OUTPUT_METRIC_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.metric.topic.name"); - public static final int KAFKA_OUTPUT_EVENT_PARALLELISM = CommonConfigurations.getIntProperty("kafka.output.event.parallelism"); - public static final String KAFKA_OUTPUT_EVENT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.event.topic.name"); - public static final String KAFKA_OUTPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.output.bootstrap.servers"); - - public static final String HBASE_ZOOKEEPER_QUORUM = CommonConfigurations.getStringProperty("hbase.zookeeper.quorum"); - public static final int HBASE_CLIENT_OPERATION_TIMEOUT = CommonConfigurations.getIntProperty("hbase.client.operation.timeout"); - public static final int HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = CommonConfigurations.getIntProperty("hbase.client.scanner.timeout.period"); - - public static final String HBASE_BASELINE_TABLE_NAME = CommonConfigurations.getStringProperty("hbase.baseline.table.name"); - public static final int HBASE_BASELINE_TOTAL_NUM = CommonConfigurations.getIntProperty("hbase.baseline.total.num"); - public static final int HBASE_BASELINE_TTL = CommonConfigurations.getIntProperty("hbase.baseline.ttl"); - - public static final int FLINK_FIRST_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.first.agg.parallelism"); - public static final int FLINK_DETECTION_MAP_PARALLELISM = CommonConfigurations.getIntProperty("flink.detection.map.parallelism"); - public static final int FLINK_WATERMARK_MAX_ORDERNESS = CommonConfigurations.getIntProperty("flink.watermark.max.orderness"); - public static final int FLINK_WINDOW_MAX_TIME = CommonConfigurations.getIntProperty("flink.window.max.time"); - - public static final int SOURCE_IP_LIST_LIMIT = CommonConfigurations.getIntProperty("source.ip.list.limit"); - public static final int DESTINATION_IP_PARTITION_NUM = CommonConfigurations.getIntProperty("destination.ip.partition.num"); - public static final int DATA_CENTER_ID_NUM = CommonConfigurations.getIntProperty("data.center.id.num"); - - - public static final String BIFANG_SERVER_URI = CommonConfigurations.getStringProperty("bifang.server.uri"); - public static final String BIFANG_SERVER_ENCRYPTPWD_PATH = CommonConfigurations.getStringProperty("bifang.server.encryptpwd.path"); - public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path"); - public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path"); - - public static final String BIFANG_SERVER_POLICY_VSYSID_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.vaysid.path"); - - public static final int HTTP_POOL_MAX_CONNECTION = CommonConfigurations.getIntProperty("http.pool.max.connection"); - public static final int HTTP_POOL_MAX_PER_ROUTE = CommonConfigurations.getIntProperty("http.pool.max.per.route"); - public static final int HTTP_POOL_REQUEST_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.request.timeout"); - public static final int HTTP_POOL_CONNECT_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.connect.timeout"); - public static final int HTTP_POOL_RESPONSE_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.response.timeout"); - - public static final int STATIC_THRESHOLD_SCHEDULE_MINUTES = CommonConfigurations.getIntProperty("static.threshold.schedule.minutes"); - public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = CommonConfigurations.getIntProperty("baseline.threshold.schedule.days"); - - public static final String SASL_JAAS_CONFIG_USER = CommonConfigurations.getStringProperty("sasl.jaas.config.user"); - public static final String SASL_JAAS_CONFIG_PASSWORD = encryptor.decrypt(CommonConfigurations.getStringProperty("sasl.jaas.config.password")); - - public static final int SASL_JAAS_CONFIG_FLAG = CommonConfigurations.getIntProperty("sasl.jaas.config.flag"); - - public static final Integer HTTP_SOCKET_TIMEOUT = CommonConfigurations.getIntProperty("http.socket.timeout"); - - public static final Long KNOWLEDGE_EXECUTION_INTERVAL = CommonConfigurations.getLongProperty("knowledge.execution.interval"); - - - public static final String KNOWLEDGE_BASE_URL = CommonConfigurations.getStringProperty("knowledge.base.uri"); - public static final String KNOWLEDGE_BASE_PATH = CommonConfigurations.getStringProperty("knowledge.base.path"); - public static final String IP_USER_DEFINED_KD_ID = CommonConfigurations.getStringProperty("ip.user.defined.kd.id"); - public static final String IP_BUILTIN_KD_ID = CommonConfigurations.getStringProperty("ip.builtin.kd.id"); - - - public static final String BIFANG_SERVER_TOKEN = CommonConfigurations.getStringProperty("bifang.server.token"); - - - public static final Integer STATIC_SENSITIVITY_THRESHOLD = CommonConfigurations.getIntProperty("static.sensitivity.threshold"); - public static final Double BASELINE_SENSITIVITY_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sensitivity.threshold"); - - public static final Double BASELINE_SESSIONS_MINOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.minor.threshold"); - - - public static final Double BASELINE_SESSIONS_WARNING_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.warning.threshold"); - - public static final Double BASELINE_SESSIONS_MAJOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.major.threshold"); - - public static final Double BASELINE_SESSIONS_SEVERE_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.severe.threshold"); - public static final Double BASELINE_SESSIONS_CRITICAL_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.critical.threshold"); - - -} - - diff --git a/src/main/java/com/zdjizhi/conf/DosConfigs.java b/src/main/java/com/zdjizhi/conf/DosConfigs.java new file mode 100644 index 0000000..03edd60 --- /dev/null +++ b/src/main/java/com/zdjizhi/conf/DosConfigs.java @@ -0,0 +1,259 @@ +package com.zdjizhi.conf; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +public class DosConfigs { + + + /** + * The prefix for Kafka properties used in the source. + */ + public static final String SOURCE_KAFKA_PROPERTIES_PREFIX = "source.kafka.props."; + + /** + * The prefix for Kafka properties used in the sink. + */ + public static final String SINK_KAFKA_PROPERTIES_PREFIX = "sink.kafka.props."; + + /** + * Configuration option for the Kafka topic used in the source. + */ + public static final ConfigOption SOURCE_KAFKA_TOPIC = + ConfigOptions.key("source.kafka.topic") + .stringType() + .noDefaultValue(); + + public static final ConfigOption FLINK_WINDOW_MAX_TIME = + ConfigOptions.key("flink.window.max.time") + .longType() + .noDefaultValue(); + + public static final ConfigOption FLINK_WATERMARK_MAX_ORDERNESS = + ConfigOptions.key("flink.watermark.max.orderness") + .longType() + .noDefaultValue(); + /** + * Configuration option for the Kafka topic used in the sink. + */ + public static final ConfigOption KAFKA_SINK_EVENT_TOPIC = + ConfigOptions.key("kafka.sink.event.topic.name") + .stringType() + .noDefaultValue(); + + public static final ConfigOption KAFKA_SINK_METRIC_TOPIC = + ConfigOptions.key("kafka.sink.metric.topic") + .stringType() + .noDefaultValue(); + + + public static final ConfigOption HBASE_ZOOKEEPER_QUORUM = + ConfigOptions.key("hbase.zookeeper.quorum") + .stringType() + .noDefaultValue(); + + public static final ConfigOption BIFANG_SERVER_URI = + ConfigOptions.key("bifang.server.uri") + .stringType() + .noDefaultValue(); + + public static final ConfigOption KNOWLEDGE_BASE_URL = + ConfigOptions.key("knowledge.base.uri") + .stringType() + .noDefaultValue(); + + + //==============================The following variables have default values===================================== + /** + * Configuration option for the source parallelism used in the source. + */ + public static final ConfigOption SOURCE_PARALLELISM = + ConfigOptions.key("source.parallelism") + .intType() + .defaultValue(1); + + public static final ConfigOption Flink_FIRST_AGG_PATALLELISM = + ConfigOptions.key("flink.first.agg.parallelism") + .intType() + .defaultValue(1); + + public static final ConfigOption FLINK_DETECTION_MAP_PARALLELISM = + ConfigOptions.key("flink.detection.map.parallelism") + .intType() + .defaultValue(1); + + public static final ConfigOption KAFKA_SINK_EVENT_PARALLELISM = + ConfigOptions.key("kafka.sink.event.parallelism") + .intType() + .defaultValue(1); + + public static final ConfigOption KAFKA_SINK_METRIC_PARALLELISM = + ConfigOptions.key("kafka.sink.metric.parallelism") + .intType() + .defaultValue(1); + + public static final ConfigOption IP_BUILTIN_KD_ID = + ConfigOptions.key("ip.builtin.kd.id") + .stringType() + .defaultValue("64af7077-eb9b-4b8f-80cf-2ceebc89bea9"); + + public static final ConfigOption IP_USER_DEFINED_KD_ID = + ConfigOptions.key("ip.user.defined.kd.id") + .stringType() + .defaultValue("004390bc-3135-4a6f-a492-3662ecb9e289"); + + public static final ConfigOption HTTP_SOCKET_TIMEOUT = + ConfigOptions.key("http.socket.timeout") + .intType() + .defaultValue(90000); + + + public static final ConfigOption KNOWLEDGE_BASE_PATH = + ConfigOptions.key("knowledge.base.path") + .stringType() + .defaultValue("/v1/knowledge_base"); + + + public static final ConfigOption STATIC_THRESHOLD_SCHEDULE_MINUTES = + ConfigOptions.key("static.threshold.schedule.minutes") + .intType() + .defaultValue(10); + + + public static final ConfigOption BASELINE_THRESHOLD_SCHEDULE_DAYS = + ConfigOptions.key("baseline.threshold.schedule.days") + .intType() + .defaultValue(7); + + public static final ConfigOption STATIC_SENSITIVITY_THRESHOLD = + ConfigOptions.key("static.sensitivity.threshold") + .intType() + .defaultValue(1); + + public static final ConfigOption BASELINE_SENSITIVITY_THRESHOLD = + ConfigOptions.key("baseline.sensitivity.threshold") + .doubleType() + .defaultValue(0.2); + + public static final ConfigOption BASELINE_SESSIONS_MINOR_THRESHOLD = + ConfigOptions.key("baseline.sessions.minor.threshold") + .doubleType() + .defaultValue(0.2); + + public static final ConfigOption BASELINE_SESSIONS_WARNING_THRESHOLD = + ConfigOptions.key("baseline.sessions.warning.threshold") + .doubleType() + .defaultValue(1.0); + + public static final ConfigOption BASELINE_SESSIONS_MAJOR_THRESHOLD = + ConfigOptions.key("baseline.sessions.major.threshold") + .doubleType() + .defaultValue(2.5); + + public static final ConfigOption BASELINE_SESSIONS_SEVERE_THRESHOLD = + ConfigOptions.key("baseline.sessions.severe.threshold") + .doubleType() + .defaultValue(5.0); + + public static final ConfigOption BASELINE_SESSIONS_CRITICAL_THRESHOLD = + ConfigOptions.key("baseline.sessions.critical.threshold") + .doubleType() + .defaultValue(8.0); + + + public static final ConfigOption BIFANG_SERVER_ENCRYPTPWD_PATH = + ConfigOptions.key("bifang.server.encryptpwd.path") + .stringType() + .defaultValue("/v1/user/encryptpwd"); + + public static final ConfigOption BIFANG_SERVER_POLICY_VSYSID_PATH = + ConfigOptions.key("bifang.server.policy.vaysid.path") + .stringType() + .defaultValue("/v1/admin/vsys"); + + public static final ConfigOption BIFANG_SERVER_TOKEN = + ConfigOptions.key("bifang.server.token") + .stringType() + .defaultValue("aa2bdec5518ad131f71944b13ce5c298&1&"); + + public static final ConfigOption BIFANG_SERVER_POLICY_THRESHOLD_PATH = + ConfigOptions.key("bifang.server.policy.threshold.path") + .stringType() + .defaultValue("/v1/policy/profile/dos_detection"); + + public static final ConfigOption BIFANG_SERVER_LOGIN_PATH = + ConfigOptions.key("bifang.server.login.path") + .stringType() + .defaultValue("/v1/user/login"); + + public static final ConfigOption HTTP_POOL_MAX_CONNECTION = + ConfigOptions.key("http.pool.max.connection") + .intType() + .defaultValue(400); + + public static final ConfigOption HTTP_POOL_MAX_PER_ROUTE = + ConfigOptions.key("http.pool.max.per.route") + .intType() + .defaultValue(80); + + public static final ConfigOption HTTP_POOL_REQUEST_TIMEOUT = + ConfigOptions.key("http.pool.request.timeout") + .intType() + .defaultValue(60000); + + public static final ConfigOption HTTP_POOL_CONNECT_TIMEOUT = + ConfigOptions.key("http.pool.connect.timeout") + .intType() + .defaultValue(60000); + + public static final ConfigOption DATA_CENTER_ID_NUM = + ConfigOptions.key("data.center.id.num") + .intType() + .defaultValue(15); + + public static final ConfigOption HBASE_CLIENT_OPERATION_TIMEOUT = + ConfigOptions.key("hbase.client.operation.timeout") + .intType() + .defaultValue(30000); + + public static final ConfigOption HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = + ConfigOptions.key("hbase.client.scanner.timeout.period") + .intType() + .defaultValue(30000); + + public static final ConfigOption HBASE_BASELINE_TABLE_NAME = + ConfigOptions.key("hbase.baseline.table.name") + .stringType() + .defaultValue("dos:ddos_traffic_baselines"); + + public static final ConfigOption HBASE_BASELINE_TTL = + ConfigOptions.key("hbase.baseline.ttl") + .intType() + .defaultValue(10); + + + public static final ConfigOption HBASE_BASELINE_TOTAL_NUM = + ConfigOptions.key("hbase.baseline.total.num") + .intType() + .defaultValue(1000000); + + public static final ConfigOption DESTINATION_IP_PARTITION_NUM = + ConfigOptions.key("destination.ip.partition.num") + .intType() + .defaultValue(10000); + + public static final ConfigOption SOURCE_IP_LIST_LIMIT = + ConfigOptions.key("source.ip.list.limit") + .intType() + .defaultValue(10000); + + /** + * Knowledge base scheduling cycle, in minutes + */ + public static final ConfigOption KNOWLEDGE_BASE_SCHEDULE_MINUTES = + ConfigOptions.key("knowledge.base.schedule.minutes") + .longType() + .defaultValue(60L); + + +} \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/conf/DosConfiguration.java b/src/main/java/com/zdjizhi/conf/DosConfiguration.java new file mode 100644 index 0000000..2971d54 --- /dev/null +++ b/src/main/java/com/zdjizhi/conf/DosConfiguration.java @@ -0,0 +1,36 @@ +package com.zdjizhi.conf; + +import org.apache.flink.configuration.Configuration; + +import java.util.Properties; + + +public class DosConfiguration { + private final Configuration config; + + public DosConfiguration(final Configuration config) { + this.config = config; + } + + /** + * Retrieves properties from the underlying `Configuration` instance that start with the specified + * `prefix`. The properties are then converted into a `java.util.Properties` object and returned. + * + * @param prefix The prefix to filter properties. + * @return A `java.util.Properties` object containing the properties with the specified prefix. + */ + public Properties getProperties(final String prefix) { + if (prefix == null) { + final Properties props = new Properties(); + props.putAll(config.toMap()); + return props; + } + return config.toMap() + .entrySet() + .stream() + .filter(entry -> entry.getKey().startsWith(prefix)) + .collect(Properties::new, (props, e) -> + props.setProperty(e.getKey().substring(prefix.length()), e.getValue()), + Properties::putAll); + } +} diff --git a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java b/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java deleted file mode 100644 index a163036..0000000 --- a/src/main/java/com/zdjizhi/etl/EtlProcessFunction.java +++ /dev/null @@ -1,108 +0,0 @@ -package com.zdjizhi.etl; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.common.DosSketchLog; -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple7; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; - -import java.util.HashSet; - -import static com.zdjizhi.sink.OutputStreamSink.outputTag; - -/** - * @author 94976 - */ -public class EtlProcessFunction extends ProcessWindowFunction, TimeWindow> { - -// private static final Logger logger = LoggerFactory.getLogger(EtlProcessFunction.class); - private static final Log logger = LogFactory.get(); - private static final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0"; - private static final String EMPTY_SOURCE_IP_IPV6 = "::"; - - @Override - public void process(Tuple3 keys, - Context context, Iterable elements, - Collector out) { - DosSketchLog middleResult = getMiddleResult(keys, elements); - try { - if (middleResult != null){ - out.collect(middleResult); - logger.debug("获取中间聚合结果:{}",middleResult.toString()); - context.output(outputTag,TrafficServerIpMetrics.getOutputMetric(middleResult)); - } - }catch (Exception e){ - logger.error("获取中间聚合结果失败,middleResult: {}\n{}",middleResult.toString(),e); - } - } - - private DosSketchLog getMiddleResult(Tuple3 keys,Iterable elements){ - - DosSketchLog midResuleLog = new DosSketchLog(); - Tuple7 values = sketchAggregate(elements); - try { - if (values != null){ - midResuleLog.setAttack_type(keys.f0); - midResuleLog.setDestination_ip(keys.f1); - midResuleLog.setVsys_id(keys.f2); - midResuleLog.setSketch_start_time(values.f4); - midResuleLog.setSketch_duration(values.f5); - midResuleLog.setSource_ip(values.f3); - midResuleLog.setSketch_sessions(values.f0); - midResuleLog.setSketch_packets(values.f1); - midResuleLog.setSketch_bytes(values.f2); - midResuleLog.setCommon_recv_time(values.f6); - return midResuleLog; - } - } catch (Exception e){ - logger.error("加载中间结果集失败,keys: {} values: {}\n{}",keys,values,e); - } - return null; - } - - private Tuple7 sketchAggregate(Iterable elements){ - long sessions = 0; - long packets = 0 ; - long bytes = 0; - long startTime = System.currentTimeMillis()/1000; - long endTime = System.currentTimeMillis()/1000; - long duration = 0; - long recvtime = 0; - HashSet sourceIpSet = new HashSet<>(); - try { - for (DosSketchLog newSketchLog : elements){ - if (recvtime == 0){ - recvtime = newSketchLog.getCommon_recv_time(); - }else if (recvtime > newSketchLog.getCommon_recv_time()){ - recvtime = newSketchLog.getCommon_recv_time(); - } - String sourceIp = newSketchLog.getSource_ip(); - if (StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV4) || StringUtils.equals(sourceIp,EMPTY_SOURCE_IP_IPV6)){ - sessions += newSketchLog.getSketch_sessions(); - packets += newSketchLog.getSketch_packets(); - bytes += newSketchLog.getSketch_bytes(); - startTime = newSketchLog.getSketch_start_time() > startTime ? startTime : newSketchLog.getSketch_start_time(); - endTime = newSketchLog.getSketch_start_time() > endTime ? newSketchLog.getSketch_start_time() : endTime; - duration = endTime - startTime == 0 ? 5 : endTime - startTime; - }else { - if (sourceIpSet.size() < FlowWriteConfig.SOURCE_IP_LIST_LIMIT){ - sourceIpSet.add(sourceIp); - } - } - } - - String sourceIpList = StringUtils.join(sourceIpSet, ","); - return Tuple7.of(sessions/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,packets/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME, - bytes*8/ FlowWriteConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration,recvtime); - }catch (Exception e){ - logger.error("聚合中间结果集失败 {}",e); - } - return null; - } - -} diff --git a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java b/src/main/java/com/zdjizhi/etl/ParseSketchLog.java deleted file mode 100644 index 08c91a5..0000000 --- a/src/main/java/com/zdjizhi/etl/ParseSketchLog.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.zdjizhi.etl; - -import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.utils.StringUtil; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.common.DosSketchLog; -import com.zdjizhi.source.DosSketchSource; -import com.zdjizhi.utils.FlinkEnvironmentUtils; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.util.*; - -/** - * @author wlh - */ -public class ParseSketchLog { - - private static Logger logger = LoggerFactory.getLogger(ParseSketchLog.class); - - - public static SingleOutputStreamOperator getSketchSource() { - return flatSketchSource().assignTimestampsAndWatermarks(createWatermarkStrategy()); - } - - private static SingleOutputStreamOperator flatSketchSource() { - return DosSketchSource.createDosSketchSource().flatMap(new FlatSketchLog()); - } - - private static WatermarkStrategy createWatermarkStrategy() { - return WatermarkStrategy - .forBoundedOutOfOrderness(Duration.ofSeconds(FlowWriteConfig.FLINK_WATERMARK_MAX_ORDERNESS)) - .withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000); - } - - private static class FlatSketchLog implements FlatMapFunction { - @Override - public void flatMap(String s, Collector collector) { - try { - if (StringUtil.isNotBlank(s)) { - - final long recv_time = System.currentTimeMillis()/1000; - - HashMap sketchSource = JSONObject.parseObject(s, HashMap.class); - long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString()); - long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString()); - String attackType = sketchSource.get("attack_type").toString(); - int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString()); - String report_ip_list = JSONObject.toJSONString(sketchSource.get("report_ip_list")); - - ArrayList> reportIpList = JSONObject.parseObject(report_ip_list, ArrayList.class); - - for (HashMap obj : reportIpList) { - DosSketchLog dosSketchLog = new DosSketchLog(); - dosSketchLog.setCommon_recv_time(recv_time); - dosSketchLog.setSketch_start_time(sketchStartTime); - dosSketchLog.setSketch_duration(sketchDuration); - dosSketchLog.setAttack_type(attackType); - dosSketchLog.setVsys_id(vsysId); - String sourceIp = obj.get("source_ip").toString(); - String destinationIp = obj.get("destination_ip").toString(); - long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString()); - long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString()); - long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString()); - dosSketchLog.setSource_ip(sourceIp); - dosSketchLog.setDestination_ip(destinationIp); - dosSketchLog.setSketch_sessions(sketchSessions); - dosSketchLog.setSketch_packets(sketchPackets); - dosSketchLog.setSketch_bytes(sketchBytes); - collector.collect(dosSketchLog); - logger.debug("数据解析成功:{}", dosSketchLog.toString()); - } - } - } catch (Exception e) { - logger.error("数据解析错误:{} \n{}", s, e); - } - } - } -} diff --git a/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java b/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java deleted file mode 100644 index a4e5bdd..0000000 --- a/src/main/java/com/zdjizhi/etl/TrafficServerIpMetrics.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.zdjizhi.etl; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.common.DosMetricsLog; -import com.zdjizhi.common.DosSketchLog; - -class TrafficServerIpMetrics { - -// private static final Logger logger = LoggerFactory.getLogger(TrafficServerIpMetrics.class); - private static final Log logger = LogFactory.get(); - - static DosMetricsLog getOutputMetric(DosSketchLog midResuleLog) { - DosMetricsLog dosMetricsLog = new DosMetricsLog(); - dosMetricsLog.setSketch_start_time(timeFloor(System.currentTimeMillis()/1000)); - dosMetricsLog.setDestination_ip(midResuleLog.getDestination_ip()); - dosMetricsLog.setAttack_type(midResuleLog.getAttack_type()); - dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions()); - dosMetricsLog.setPacket_rate(midResuleLog.getSketch_packets()); - dosMetricsLog.setBit_rate(midResuleLog.getSketch_bytes()); - dosMetricsLog.setVsys_id(midResuleLog.getVsys_id()); - dosMetricsLog.setPartition_num(getPartitionNumByIp(midResuleLog.getDestination_ip())); - logger.debug("metric 结果已加载:{}",dosMetricsLog.toString()); - return dosMetricsLog; - } - - private static long timeFloor(long sketchStartTime){ - return sketchStartTime / FlowWriteConfig.FLINK_WINDOW_MAX_TIME * FlowWriteConfig.FLINK_WINDOW_MAX_TIME; - } - - private static int getPartitionNumByIp(String destinationIp){ - return Math.abs(destinationIp.hashCode()) % FlowWriteConfig.DESTINATION_IP_PARTITION_NUM; - } - -} diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/function/DosDetectionFunction.java similarity index 71% rename from src/main/java/com/zdjizhi/etl/DosDetection.java rename to src/main/java/com/zdjizhi/function/DosDetectionFunction.java index 3fb5a7c..e556124 100644 --- a/src/main/java/com/zdjizhi/etl/DosDetection.java +++ b/src/main/java/com/zdjizhi/function/DosDetectionFunction.java @@ -1,15 +1,18 @@ -package com.zdjizhi.etl; +package com.zdjizhi.function; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.geedgenetworks.utils.DateUtils; import com.geedgenetworks.utils.StringUtil; import com.zdjizhi.common.*; -import com.zdjizhi.utils.*; +import com.zdjizhi.utils.Snowflakeld.SnowflakeId; +import com.zdjizhi.utils.Threshold.ParseBaselineThreshold; +import com.zdjizhi.utils.Threshold.ParseStaticThreshold; +import com.zdjizhi.utils.connections.http.HttpClientService; +import com.zdjizhi.utils.knowledgebase.IpLookupUtils; import inet.ipaddr.IPAddress; import inet.ipaddr.IPAddressString; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap; import org.apache.flink.streaming.api.functions.ProcessFunction; @@ -18,48 +21,88 @@ import org.apache.flink.util.Collector; import java.math.BigDecimal; import java.text.NumberFormat; import java.util.*; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; + +import static com.zdjizhi.conf.DosConfigs.*; /** * @author wlh */ -public class DosDetection extends ProcessFunction { - +public class DosDetectionFunction extends ProcessFunction { private static final Log logger = LogFactory.get(); - private static Map> baselineMap = new HashMap<>(); - private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); + private Map> baselineMap = new HashMap<>(); + private final NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance(); private HashMap>> thresholdRangeMap; - - private final static int BASELINE_SIZE = 144; - private final static int STATIC_CONDITION_TYPE = 1; - private final static int BASELINE_CONDITION_TYPE = 2; - private final static int SENSITIVITY_CONDITION_TYPE = 3; - - private final static String SESSIONS_TAG = "sessions"; - private final static String PACKETS_TAG = "packets"; - private final static String BITS_TAG = "bits"; - - private final static int OTHER_BASELINE_TYPE = 3; - - - + private final int BASELINE_SIZE = 144; + private final int STATIC_CONDITION_TYPE = 1; + private final int BASELINE_CONDITION_TYPE = 2; + private final int SENSITIVITY_CONDITION_TYPE = 3; + private final String SESSIONS_TAG = "sessions"; + private final String PACKETS_TAG = "packets"; + private final String BITS_TAG = "bits"; + private final int OTHER_BASELINE_TYPE = 3; + private SnowflakeId snowflakeId; + private Configuration configuration; + private HttpClientService httpClientService; + private IpLookupUtils ipLookupUtils; + private ParseBaselineThreshold parseBaselineThresholdld; + private ParseStaticThreshold parseStaticThreshold; @Override public void open(Configuration parameters) { - ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, - new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build()); - try { - super.open(parameters); - executorService.scheduleAtFixedRate(() -> thresholdRangeMap = ParseStaticThreshold.createStaticThreshold(), 0, - FlowWriteConfig.STATIC_THRESHOLD_SCHEDULE_MINUTES, TimeUnit.MINUTES); - executorService.scheduleAtFixedRate(() -> baselineMap = ParseBaselineThreshold.readFromHbase(), 0, - FlowWriteConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS); + configuration = (Configuration) getRuntimeContext() + .getExecutionConfig().getGlobalJobParameters(); + httpClientService = new HttpClientService(configuration); + + snowflakeId = new SnowflakeId(configuration.get(DATA_CENTER_ID_NUM), getRuntimeContext().getIndexOfThisSubtask()); + + try { + ipLookupUtils = new IpLookupUtils(configuration, httpClientService); + ipLookupUtils.stuffKnowledgeMetaCache(); + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + ipLookupUtils.stuffKnowledgeMetaCache(); + logger.info("定位库定时调度成功"); + } + }, configuration.get(KNOWLEDGE_BASE_SCHEDULE_MINUTES) * 60 * 1000, configuration.get(KNOWLEDGE_BASE_SCHEDULE_MINUTES) * 60 * 1000); } catch (Exception e) { - logger.error("定时器任务执行失败", e); + logger.error("定位库加载失败,具体原因为" + e); } + + try { + parseStaticThreshold = new ParseStaticThreshold(configuration, httpClientService); + thresholdRangeMap = parseStaticThreshold.createStaticThreshold(); + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + thresholdRangeMap = parseStaticThreshold.createStaticThreshold(); + logger.info("基于静态阈值构建threshold RangeMap成功,Threshold RangeMap:" + thresholdRangeMap.toString()); + } + }, configuration.get(STATIC_THRESHOLD_SCHEDULE_MINUTES) * 60 * 1000, configuration.get(STATIC_THRESHOLD_SCHEDULE_MINUTES) * 60 * 1000); + + } catch (Exception e) { + logger.error("基于静态阈值构建threshold RangeMap失败,失败原因为:" + e); + } + + try { + parseBaselineThresholdld = new ParseBaselineThreshold(configuration); + baselineMap = parseBaselineThresholdld.readFromHbase(); + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + baselineMap = parseBaselineThresholdld.readFromHbase(); + logger.info("从Hbase获取baselineMap成功,baselineMap:" + thresholdRangeMap.toString()); + } + }, configuration.get(BASELINE_THRESHOLD_SCHEDULE_DAYS) * 24 * 60 * 60 * 1000, configuration.get(BASELINE_THRESHOLD_SCHEDULE_DAYS) * 24 * 60 * 60 * 1000); + + } catch (Exception e) { + logger.error("从Hbase获取baselineMap失败,失败原因为:" + e); + } + PERCENT_INSTANCE.setMinimumFractionDigits(2); } @@ -92,7 +135,6 @@ public class DosDetection extends ProcessFunction { } catch (Exception e) { logger.error("判定失败\n {} \n{}", value, e); } - if (finalResult != null) { out.collect(finalResult); } @@ -101,7 +143,7 @@ public class DosDetection extends ProcessFunction { private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) { long sketchSessions = value.getSketch_sessions(); - Integer staticSensitivityThreshold = FlowWriteConfig.STATIC_SENSITIVITY_THRESHOLD; + Integer staticSensitivityThreshold = configuration.get(STATIC_SENSITIVITY_THRESHOLD); long diff = sketchSessions - staticSensitivityThreshold; return getDosEventLog(value, staticSensitivityThreshold, diff, 0, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG); } @@ -127,7 +169,7 @@ public class DosDetection extends ProcessFunction { double diffSessionPercent = 0.0; double diffPktPercent = 0.0; double diffBitPercent = 0.0; - //todo 代码Review发现该部分存在bug,23.11版本做修复,需测试。 + if (sessionBase > 0) { diffSessionPercent = getDiffPercent(diffSession, sessionBase) * 100; } @@ -161,14 +203,13 @@ public class DosDetection extends ProcessFunction { if (diff > 0 && base != 0) { double percent = getDiffPercent(diff, base); Severity severity = judgeSeverity(percent); - Integer staticSensitivityThreshold = FlowWriteConfig.STATIC_SENSITIVITY_THRESHOLD; + Integer staticSensitivityThreshold = configuration.get(STATIC_SENSITIVITY_THRESHOLD); if (severity != Severity.NORMAL) { - if (type == BASELINE_CONDITION_TYPE && percent < FlowWriteConfig.BASELINE_SENSITIVITY_THRESHOLD) { + if (type == BASELINE_CONDITION_TYPE && percent < configuration.get(BASELINE_SENSITIVITY_THRESHOLD)) { logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value); } else if ((type == BASELINE_CONDITION_TYPE || type == SENSITIVITY_CONDITION_TYPE) && value.getSketch_sessions() < staticSensitivityThreshold) { logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过静态敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value); } else { -// result = getResult(value, base, profileId, severity, percent+1, type, tag); result = getResult(value, base, profileId, severity, percent, type, tag); if (type == SENSITIVITY_CONDITION_TYPE) { result.setSeverity(Severity.MAJOR.severity); @@ -185,7 +226,7 @@ public class DosDetection extends ProcessFunction { private DosEventLog getResult(DosSketchLog value, long base, long profileId, Severity severity, double percent, int type, String tag) { DosEventLog dosEventLog = new DosEventLog(); dosEventLog.setRecv_time(value.getCommon_recv_time()); - dosEventLog.setLog_id(SnowflakeId.generateId()); + dosEventLog.setLog_id(snowflakeId.nextId()); dosEventLog.setVsys_id(value.getVsys_id()); dosEventLog.setStart_time(value.getSketch_start_time()); dosEventLog.setEnd_time(value.getSketch_start_time() + value.getSketch_duration()); @@ -193,9 +234,8 @@ public class DosDetection extends ProcessFunction { dosEventLog.setAttack_type(value.getAttack_type()); dosEventLog.setSeverity(severity.severity); dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent), base, value.getSketch_sessions(), type, tag, dosEventLog)); -// dosEventLog.setConditions(getConditions(percent, base, value.getSketch_sessions(), type, tag,dosEventLog)); dosEventLog.setDestination_ip(value.getDestination_ip()); - dosEventLog.setDestination_country(IpLookupUtils.getCountryLookup(value.getDestination_ip())); + dosEventLog.setDestination_country(ipLookupUtils.getCountryLookup(value.getDestination_ip())); String ipList = value.getSource_ip(); dosEventLog.setSource_ip_list(ipList); dosEventLog.setSource_country_list(getSourceCountryList(ipList)); @@ -219,8 +259,8 @@ public class DosDetection extends ProcessFunction { logger.debug("获取到当前IP: {},类型: {} baseline值为0,替换为P95观测值{}", value.getDestination_ip(), value.getAttack_type(), defaultVaule); base = defaultVaule; } - if (sessionRateBaselineType == OTHER_BASELINE_TYPE && base < FlowWriteConfig.STATIC_SENSITIVITY_THRESHOLD) { - base = FlowWriteConfig.STATIC_SENSITIVITY_THRESHOLD; + if (sessionRateBaselineType == OTHER_BASELINE_TYPE && base < configuration.get(STATIC_SENSITIVITY_THRESHOLD)) { + base = configuration.get(STATIC_SENSITIVITY_THRESHOLD); } } } @@ -267,7 +307,7 @@ public class DosDetection extends ProcessFunction { String[] ipArr = sourceIpList.split(","); HashSet countrySet = new HashSet<>(); for (String ip : ipArr) { - String country = IpLookupUtils.getCountryLookup(ip); + String country = ipLookupUtils.getCountryLookup(ip); if (StringUtil.isNotBlank(country)) { countrySet.add(country); } @@ -296,7 +336,6 @@ public class DosDetection extends ProcessFunction { } - private Double getDiffPercent(long diff, long base) { try { return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue(); @@ -308,15 +347,15 @@ public class DosDetection extends ProcessFunction { } private Severity judgeSeverity(double diffPercent) { - if (diffPercent >= FlowWriteConfig.BASELINE_SESSIONS_MINOR_THRESHOLD && diffPercent < FlowWriteConfig.BASELINE_SESSIONS_WARNING_THRESHOLD) { + if (diffPercent >= configuration.get(BASELINE_SESSIONS_MINOR_THRESHOLD) && diffPercent < configuration.get(BASELINE_SESSIONS_WARNING_THRESHOLD)) { return Severity.MINOR; - } else if (diffPercent >= FlowWriteConfig.BASELINE_SESSIONS_WARNING_THRESHOLD && diffPercent < FlowWriteConfig.BASELINE_SESSIONS_MAJOR_THRESHOLD) { + } else if (diffPercent >= configuration.get(BASELINE_SESSIONS_WARNING_THRESHOLD) && diffPercent < configuration.get(BASELINE_SESSIONS_MAJOR_THRESHOLD)) { return Severity.WARNING; - } else if (diffPercent >= FlowWriteConfig.BASELINE_SESSIONS_MAJOR_THRESHOLD && diffPercent < FlowWriteConfig.BASELINE_SESSIONS_SEVERE_THRESHOLD) { + } else if (diffPercent >= configuration.get(BASELINE_SESSIONS_MAJOR_THRESHOLD) && diffPercent < configuration.get(BASELINE_SESSIONS_SEVERE_THRESHOLD)) { return Severity.MAJOR; - } else if (diffPercent >= FlowWriteConfig.BASELINE_SESSIONS_SEVERE_THRESHOLD && diffPercent < FlowWriteConfig.BASELINE_SESSIONS_CRITICAL_THRESHOLD) { + } else if (diffPercent >= configuration.get(BASELINE_SESSIONS_SEVERE_THRESHOLD) && diffPercent < configuration.get(BASELINE_SESSIONS_CRITICAL_THRESHOLD)) { return Severity.SEVERE; - } else if (diffPercent >= FlowWriteConfig.BASELINE_SESSIONS_CRITICAL_THRESHOLD) { + } else if (diffPercent >= configuration.get(BASELINE_SESSIONS_CRITICAL_THRESHOLD)) { return Severity.CRITICAL; } else { return Severity.NORMAL; diff --git a/src/main/java/com/zdjizhi/function/EtlProcessFunction.java b/src/main/java/com/zdjizhi/function/EtlProcessFunction.java new file mode 100644 index 0000000..5cb5e4d --- /dev/null +++ b/src/main/java/com/zdjizhi/function/EtlProcessFunction.java @@ -0,0 +1,141 @@ +package com.zdjizhi.function; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zdjizhi.common.DosMetricsLog; +import com.zdjizhi.common.DosSketchLog; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import java.util.HashSet; + +import static com.zdjizhi.conf.DosConfigs.*; + +/** + * @author 94976 + */ +public class EtlProcessFunction extends ProcessWindowFunction, TimeWindow> { + + private static final Log logger = LogFactory.get(); + private final String EMPTY_SOURCE_IP_IPV4 = "0.0.0.0"; + private final String EMPTY_SOURCE_IP_IPV6 = "::"; + public static OutputTag outputTag = new OutputTag("traffic server ip metrics") { + }; + private Configuration configuration; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + configuration = (Configuration) getRuntimeContext() + .getExecutionConfig().getGlobalJobParameters(); + } + + @Override + public void process(Tuple3 keys, + Context context, Iterable elements, + Collector out) { + DosSketchLog middleResult = getMiddleResult(keys, elements); + try { + if (middleResult != null) { + out.collect(middleResult); + logger.debug("获取中间聚合结果:{}", middleResult.toString()); + context.output(outputTag, getOutputMetric(middleResult)); + } + } catch (Exception e) { + logger.error("获取中间聚合结果失败,middleResult: {}\n{}", middleResult.toString(), e); + } + } + + + private DosMetricsLog getOutputMetric(DosSketchLog midResuleLog) { + DosMetricsLog dosMetricsLog = new DosMetricsLog(); + dosMetricsLog.setSketch_start_time(timeFloor(System.currentTimeMillis() / 1000)); + dosMetricsLog.setDestination_ip(midResuleLog.getDestination_ip()); + dosMetricsLog.setAttack_type(midResuleLog.getAttack_type()); + dosMetricsLog.setSession_rate(midResuleLog.getSketch_sessions()); + dosMetricsLog.setPacket_rate(midResuleLog.getSketch_packets()); + dosMetricsLog.setBit_rate(midResuleLog.getSketch_bytes()); + dosMetricsLog.setVsys_id(midResuleLog.getVsys_id()); + dosMetricsLog.setPartition_num(getPartitionNumByIp(midResuleLog.getDestination_ip())); + logger.debug("metric 结果已加载:{}", dosMetricsLog.toString()); + return dosMetricsLog; + } + + private long timeFloor(long sketchStartTime) { + return sketchStartTime / configuration.get(FLINK_WINDOW_MAX_TIME) * configuration.get(FLINK_WINDOW_MAX_TIME); + } + + private int getPartitionNumByIp(String destinationIp) { + return Math.abs(destinationIp.hashCode()) % configuration.get(DESTINATION_IP_PARTITION_NUM); + } + + private DosSketchLog getMiddleResult(Tuple3 keys, Iterable elements) { + + DosSketchLog midResuleLog = new DosSketchLog(); + Tuple7 values = sketchAggregate(elements); + try { + if (values != null) { + midResuleLog.setAttack_type(keys.f0); + midResuleLog.setDestination_ip(keys.f1); + midResuleLog.setVsys_id(keys.f2); + midResuleLog.setSketch_start_time(values.f4); + midResuleLog.setSketch_duration(values.f5); + midResuleLog.setSource_ip(values.f3); + midResuleLog.setSketch_sessions(values.f0); + midResuleLog.setSketch_packets(values.f1); + midResuleLog.setSketch_bytes(values.f2); + midResuleLog.setCommon_recv_time(values.f6); + return midResuleLog; + } + } catch (Exception e) { + logger.error("加载中间结果集失败,keys: {} values: {}\n{}", keys, values, e); + } + return null; + } + + private Tuple7 sketchAggregate(Iterable elements) { + long sessions = 0; + long packets = 0; + long bytes = 0; + long startTime = System.currentTimeMillis() / 1000; + long endTime = System.currentTimeMillis() / 1000; + long duration = 0; + long recvtime = 0; + HashSet sourceIpSet = new HashSet<>(); + try { + for (DosSketchLog newSketchLog : elements) { + if (recvtime == 0) { + recvtime = newSketchLog.getCommon_recv_time(); + } else if (recvtime > newSketchLog.getCommon_recv_time()) { + recvtime = newSketchLog.getCommon_recv_time(); + } + String sourceIp = newSketchLog.getSource_ip(); + if (StringUtils.equals(sourceIp, EMPTY_SOURCE_IP_IPV4) || StringUtils.equals(sourceIp, EMPTY_SOURCE_IP_IPV6)) { + sessions += newSketchLog.getSketch_sessions(); + packets += newSketchLog.getSketch_packets(); + bytes += newSketchLog.getSketch_bytes(); + startTime = newSketchLog.getSketch_start_time() > startTime ? startTime : newSketchLog.getSketch_start_time(); + endTime = newSketchLog.getSketch_start_time() > endTime ? newSketchLog.getSketch_start_time() : endTime; + duration = endTime - startTime == 0 ? 5 : endTime - startTime; + } else { + if (sourceIpSet.size() < configuration.get(SOURCE_IP_LIST_LIMIT)) { + sourceIpSet.add(sourceIp); + } + } + } + String sourceIpList = StringUtils.join(sourceIpSet, ","); + return Tuple7.of(sessions / configuration.get(FLINK_WINDOW_MAX_TIME), packets / configuration.get(FLINK_WINDOW_MAX_TIME), + bytes * 8 / configuration.get(FLINK_WINDOW_MAX_TIME), sourceIpList, startTime, duration, recvtime); + } catch (Exception e) { + logger.error("聚合中间结果集失败 {}", e); + } + return null; + } + +} diff --git a/src/main/java/com/zdjizhi/function/FlatSketchFunction.java b/src/main/java/com/zdjizhi/function/FlatSketchFunction.java new file mode 100644 index 0000000..4b3777b --- /dev/null +++ b/src/main/java/com/zdjizhi/function/FlatSketchFunction.java @@ -0,0 +1,53 @@ +package com.zdjizhi.function; + +import com.alibaba.fastjson2.JSONObject; +import com.geedgenetworks.utils.StringUtil; +import com.zdjizhi.common.DosSketchLog; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; + +public class FlatSketchFunction implements FlatMapFunction { + private static Logger logger = LoggerFactory.getLogger(FlatSketchFunction.class); + @Override + public void flatMap(String value, Collector out) { + try { + if (StringUtil.isNotBlank(value)) { + final long recv_time = System.currentTimeMillis()/1000; + HashMap sketchSource = JSONObject.parseObject(value, HashMap.class); + long sketchStartTime = Long.parseLong(sketchSource.get("sketch_start_time").toString()); + long sketchDuration = Long.parseLong(sketchSource.get("sketch_duration").toString()); + String attackType = sketchSource.get("attack_type").toString(); + int vsysId = Integer.parseInt(sketchSource.getOrDefault("common_vsys_id", 1).toString()); + String report_ip_list = JSONObject.toJSONString(sketchSource.get("report_ip_list")); + ArrayList> reportIpList = JSONObject.parseObject(report_ip_list, ArrayList.class); + for (HashMap obj : reportIpList) { + DosSketchLog dosSketchLog = new DosSketchLog(); + dosSketchLog.setCommon_recv_time(recv_time); + dosSketchLog.setSketch_start_time(sketchStartTime); + dosSketchLog.setSketch_duration(sketchDuration); + dosSketchLog.setAttack_type(attackType); + dosSketchLog.setVsys_id(vsysId); + String sourceIp = obj.get("source_ip").toString(); + String destinationIp = obj.get("destination_ip").toString(); + long sketchSessions = Long.parseLong(obj.get("sketch_sessions").toString()); + long sketchPackets = Long.parseLong(obj.get("sketch_packets").toString()); + long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString()); + dosSketchLog.setSource_ip(sourceIp); + dosSketchLog.setDestination_ip(destinationIp); + dosSketchLog.setSketch_sessions(sketchSessions); + dosSketchLog.setSketch_packets(sketchPackets); + dosSketchLog.setSketch_bytes(sketchBytes); + out.collect(dosSketchLog); + logger.debug("数据解析成功:{}", dosSketchLog); + } + } + } catch (Exception e) { + logger.error("数据解析错误:{} \n{}", value, e); + } + } +} diff --git a/src/main/java/com/zdjizhi/function/SketchKeysSelector.java b/src/main/java/com/zdjizhi/function/SketchKeysSelector.java new file mode 100644 index 0000000..edc3f0a --- /dev/null +++ b/src/main/java/com/zdjizhi/function/SketchKeysSelector.java @@ -0,0 +1,15 @@ +package com.zdjizhi.function; + +import com.zdjizhi.common.DosSketchLog; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple3; + +public class SketchKeysSelector implements KeySelector> { + @Override + public Tuple3 getKey(DosSketchLog dosSketchLog){ + return Tuple3.of( + dosSketchLog.getAttack_type(), + dosSketchLog.getDestination_ip(), + dosSketchLog.getVsys_id()); + } +} diff --git a/src/main/java/com/zdjizhi/main/DosDetectionApplication.java b/src/main/java/com/zdjizhi/main/DosDetectionApplication.java index e78d462..3b5d770 100644 --- a/src/main/java/com/zdjizhi/main/DosDetectionApplication.java +++ b/src/main/java/com/zdjizhi/main/DosDetectionApplication.java @@ -1,15 +1,84 @@ package com.zdjizhi.main; -import com.zdjizhi.sink.OutputStreamSink; +import com.alibaba.fastjson2.JSONObject; +import com.zdjizhi.common.DosEventLog; +import com.zdjizhi.common.DosSketchLog; +import com.zdjizhi.conf.DosConfiguration; +import com.zdjizhi.function.DosDetectionFunction; +import com.zdjizhi.function.EtlProcessFunction; +import com.zdjizhi.function.FlatSketchFunction; +import com.zdjizhi.function.SketchKeysSelector; +import com.zdjizhi.utils.connections.kafka.KafkaConsumer; +import com.zdjizhi.utils.connections.kafka.KafkaProducer; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; + +import java.time.Duration; +import java.util.Objects; + +import static com.zdjizhi.conf.DosConfigs.*; /** - * @author wlh + * @author wangchengcheng * 程序主类入口 */ public class DosDetectionApplication { - public static void main(String[] args) { - OutputStreamSink.finalOutputSink(); - } + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // param check + if (args.length < 1) { + throw new IllegalArgumentException("Error: Not found properties path. " + + "\nUsage: flink -c xxx xxx.jar app.properties."); + } + final ParameterTool tool = ParameterTool.fromPropertiesFile(args[0]); + + final Configuration config = tool.getConfiguration(); + env.getConfig().setGlobalJobParameters(config); + + final DosConfiguration DosConfiguration = new DosConfiguration(config); + + //Source settings + final DataStreamSource dosStreamSource = env.addSource(KafkaConsumer.getKafkaConsumer(config.get(SOURCE_KAFKA_TOPIC), DosConfiguration + .getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX))).setParallelism(config.get(SOURCE_PARALLELISM)); + + //Watermark settings + final WatermarkStrategy dosSketchLogWatermarkStrategy = WatermarkStrategy. + forBoundedOutOfOrderness(Duration.ofSeconds(config.get(FLINK_WATERMARK_MAX_ORDERNESS))) + .withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000); + + //Data preprocessing + final SingleOutputStreamOperator sketchSource = dosStreamSource.flatMap(new FlatSketchFunction()) + .assignTimestampsAndWatermarks(dosSketchLogWatermarkStrategy); + + //windowed aggregation + final SingleOutputStreamOperator middleStream = sketchSource.keyBy(new SketchKeysSelector()) + .window(TumblingEventTimeWindows.of(Time.seconds(config.get(FLINK_WINDOW_MAX_TIME)))).process(new EtlProcessFunction()) + .setParallelism(config.get(Flink_FIRST_AGG_PATALLELISM)); + + //dos detection + final SingleOutputStreamOperator dosEventLogOutputStream = middleStream.process(new DosDetectionFunction()) + .setParallelism(config.get(FLINK_DETECTION_MAP_PARALLELISM)); + + //dos event output + dosEventLogOutputStream.filter(Objects::nonNull) + .map(JSONObject::toJSONString) + .addSink(KafkaProducer.getKafkaProducer(config.get(KAFKA_SINK_EVENT_TOPIC), DosConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX))) + .setParallelism(config.get(KAFKA_SINK_EVENT_PARALLELISM)); + + //traffic server ip metrics output + middleStream.getSideOutput(EtlProcessFunction.outputTag).map(JSONObject::toJSONString) + .addSink(KafkaProducer.getKafkaProducer(config.get(KAFKA_SINK_METRIC_TOPIC), DosConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX))) + .setParallelism(config.get(KAFKA_SINK_METRIC_PARALLELISM)); + + env.execute(args[0]); + } } diff --git a/src/main/java/com/zdjizhi/sink/DosEventSink.java b/src/main/java/com/zdjizhi/sink/DosEventSink.java deleted file mode 100644 index 87795e6..0000000 --- a/src/main/java/com/zdjizhi/sink/DosEventSink.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.zdjizhi.sink; - -import com.alibaba.fastjson2.JSONObject; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.common.DosEventLog; -//import com.zdjizhi.utils.JsonMapper; -import com.zdjizhi.utils.KafkaUtils; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; - -import java.util.Objects; - -class DosEventSink { - - static void dosEventOutputSink(SingleOutputStreamOperator dosEventLogOutputStream){ - dosEventLogOutputStream - .filter(Objects::nonNull) -// .map(JsonMapper::toJsonString) - .map(JSONObject::toJSONString) - .addSink(KafkaUtils.getKafkaSink(FlowWriteConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME)) - .setParallelism(FlowWriteConfig.KAFKA_OUTPUT_EVENT_PARALLELISM); - } - -} diff --git a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java deleted file mode 100644 index 2075395..0000000 --- a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.zdjizhi.sink; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.common.DosEventLog; -import com.zdjizhi.common.DosMetricsLog; -import com.zdjizhi.common.DosSketchLog; -import com.zdjizhi.etl.DosDetection; -import com.zdjizhi.etl.EtlProcessFunction; -import com.zdjizhi.etl.ParseSketchLog; -import com.zdjizhi.utils.FlinkEnvironmentUtils; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.datastream.*; -import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.util.OutputTag; - - -/** - * @author 94976 - */ -public class OutputStreamSink { - // private static final Logger logger = LoggerFactory.getLogger(OutputStreamSink.class); - private static final Log logger = LogFactory.get(); - - public static OutputTag outputTag = new OutputTag("traffic server ip metrics"){}; - - public static void finalOutputSink(){ - try { - SingleOutputStreamOperator middleStream = getMiddleStream(); - DosEventSink.dosEventOutputSink(getEventSinkStream(middleStream)); - TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream); - FlinkEnvironmentUtils.streamExeEnv.execute(FlowWriteConfig.STREAM_EXECUTION_JOB_NAME); - } catch (Exception e) { - logger.error("任务启动失败 {}",e); - } - } - - private static SingleOutputStreamOperator getEventSinkStream(SingleOutputStreamOperator middleStream){ - return middleStream - .process(new DosDetection()).setParallelism(FlowWriteConfig.FLINK_DETECTION_MAP_PARALLELISM); - - } - - private static SingleOutputStreamOperator getMiddleStream(){ - return ParseSketchLog.getSketchSource() - .keyBy(new KeysSelector()) - .window(TumblingEventTimeWindows.of(Time.seconds(FlowWriteConfig.FLINK_WINDOW_MAX_TIME))) - .process(new EtlProcessFunction()) - .setParallelism(FlowWriteConfig.FLINK_FIRST_AGG_PARALLELISM); - } - - private static class KeysSelector implements KeySelector>{ - @Override - public Tuple3 getKey(DosSketchLog dosSketchLog){ - return Tuple3.of( - dosSketchLog.getAttack_type(), - dosSketchLog.getDestination_ip(), - dosSketchLog.getVsys_id()); - } - } - -} diff --git a/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java b/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java deleted file mode 100644 index 0025544..0000000 --- a/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.zdjizhi.sink; - -import com.alibaba.fastjson2.JSONObject; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.common.DosMetricsLog; -import com.zdjizhi.common.DosSketchLog; - -import com.zdjizhi.utils.KafkaUtils; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; - -import static com.zdjizhi.sink.OutputStreamSink.outputTag; - -class TrafficServerIpMetricsSink { - - static void sideOutputMetricsSink(SingleOutputStreamOperator outputStream){ - DataStream sideOutput = outputStream.getSideOutput(outputTag); -// sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME)) - sideOutput.map(JSONObject::toJSONString).addSink(KafkaUtils.getKafkaSink(FlowWriteConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME)) - .setParallelism(FlowWriteConfig.KAFKA_OUTPUT_METRIC_PARALLELISM); - - - } - -} diff --git a/src/main/java/com/zdjizhi/source/DosSketchSource.java b/src/main/java/com/zdjizhi/source/DosSketchSource.java deleted file mode 100644 index 238e350..0000000 --- a/src/main/java/com/zdjizhi/source/DosSketchSource.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.zdjizhi.source; - - - -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.FlinkEnvironmentUtils; -import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; - -import java.util.Properties; - -/** - * @author wlh - */ -public class DosSketchSource { - - private static StreamExecutionEnvironment streamExeEnv = FlinkEnvironmentUtils.streamExeEnv; - - public static DataStreamSource createDosSketchSource(){ - Properties properties = new Properties(); - properties.setProperty("bootstrap.servers", FlowWriteConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS); - properties.setProperty("group.id", FlowWriteConfig.KAFKA_GROUP_ID); - if (FlowWriteConfig.SASL_JAAS_CONFIG_FLAG == 1){ - properties.put("security.protocol", "SASL_PLAINTEXT"); - properties.put("sasl.mechanism", "PLAIN"); - properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+ FlowWriteConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+ FlowWriteConfig.SASL_JAAS_CONFIG_PASSWORD+"\";"); - } - - return streamExeEnv.addSource(new FlinkKafkaConsumer( - FlowWriteConfig.KAFKA_INPUT_TOPIC_NAME, - new SimpleStringSchema(), properties)) - .setParallelism(FlowWriteConfig.KAFKA_INPUT_PARALLELISM); - } - -} diff --git a/src/main/java/com/zdjizhi/utils/CommonConfigurations.java b/src/main/java/com/zdjizhi/utils/CommonConfigurations.java deleted file mode 100644 index 221a1de..0000000 --- a/src/main/java/com/zdjizhi/utils/CommonConfigurations.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.zdjizhi.utils; - -import java.util.Properties; - -public final class CommonConfigurations { - - private static Properties propService = new Properties(); - - - public static String getStringProperty(String key) { - - return propService.getProperty(key); - - - } - - public static Integer getIntProperty(String key) { - - return Integer.parseInt(propService.getProperty(key)); - - } - - public static Double getDoubleProperty(String key) { - - return Double.parseDouble(propService.getProperty(key)); - - } - - public static Long getLongProperty(String key) { - return Long.parseLong(propService.getProperty(key)); - - } - - public static Boolean getBooleanProperty(Integer type, String key) { - return "true".equals(propService.getProperty(key).toLowerCase().trim()); - } - - static { - try { - propService.load(CommonConfigurations.class.getClassLoader().getResourceAsStream("common.properties")); - } catch (Exception e) { - propService = null; - } - } -} diff --git a/src/main/java/com/zdjizhi/utils/DistributedLock.java b/src/main/java/com/zdjizhi/utils/DistributedLock.java deleted file mode 100644 index 99932a4..0000000 --- a/src/main/java/com/zdjizhi/utils/DistributedLock.java +++ /dev/null @@ -1,200 +0,0 @@ -package com.zdjizhi.utils; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import org.apache.zookeeper.*; -import org.apache.zookeeper.data.Stat; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - - - -public class DistributedLock implements Lock, Watcher { -// private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class); - private static final Log logger = LogFactory.get(); - - private ZooKeeper zk = null; - /** - * 根节点 - */ - private final String ROOT_LOCK = "/locks"; - /** - * 竞争的资源 - */ - private String lockName; - /** - * 等待的前一个锁 - */ - private String waitLock; - /** - * 当前锁 - */ - private String currentLock; - /** - * 计数器 - */ - private CountDownLatch countDownLatch; - - private int sessionTimeout = 2000; - - private List exceptionList = new ArrayList(); - - /** - * 配置分布式锁 - * - * @param config 连接的url - * @param lockName 竞争资源 - */ - public DistributedLock(String config, String lockName) { - this.lockName = lockName; - try { - // 连接zookeeper - zk = new ZooKeeper(config, sessionTimeout, this); - Stat stat = zk.exists(ROOT_LOCK, false); - if (stat == null) { - // 如果根节点不存在,则创建根节点 - zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - } catch (IOException | InterruptedException | KeeperException e) { - logger.error("Node already exists!"); - } - } - - /** - * 节点监视器 - */ - @Override - public void process(WatchedEvent event) { - if (this.countDownLatch != null) { - this.countDownLatch.countDown(); - } - } - - @Override - public void lock() { - if (exceptionList.size() > 0) { - throw new LockException(exceptionList.get(0)); - } - try { - if (this.tryLock()) { - logger.info(Thread.currentThread().getName() + " " + lockName + "获得了锁"); - } else { - // 等待锁 - waitForLock(waitLock, sessionTimeout); - } - } catch (InterruptedException | KeeperException e) { - logger.error("获取锁异常" + e); - } - } - - @Override - public boolean tryLock() { - try { - String splitStr = "_lock_"; - if (lockName.contains(splitStr)) { - throw new LockException("锁名有误"); - } - // 创建临时有序节点 - currentLock = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0], - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); - // 取所有子节点 - List subNodes = zk.getChildren(ROOT_LOCK, false); - // 取出所有lockName的锁 - List lockObjects = new ArrayList(); - for (String node : subNodes) { - String tmpNode = node.split(splitStr)[0]; - if (tmpNode.equals(lockName)) { - lockObjects.add(node); - } - } - Collections.sort(lockObjects); - // 若当前节点为最小节点,则获取锁成功 - if (currentLock.equals(ROOT_LOCK + "/" + lockObjects.get(0))) { - return true; - } - // 若不是最小节点,则找到自己的前一个节点 - String prevNode = currentLock.substring(currentLock.lastIndexOf("/") + 1); - waitLock = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1); - } catch (InterruptedException | KeeperException e) { - logger.error("获取锁过程异常" + e); - } - return false; - } - - - @Override - public boolean tryLock(long timeout, TimeUnit unit) { - try { - if (this.tryLock()) { - return true; - } - return waitForLock(waitLock, timeout); - } catch (KeeperException | InterruptedException | RuntimeException e) { - logger.error("判断是否锁定异常" + e); - } - return false; - } - - /** - * 等待锁 - * - * @param prev 锁名称 - * @param waitTime 等待时间 - * @return - * @throws KeeperException - * @throws InterruptedException - */ - private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException { - Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true); - - if (stat != null) { - this.countDownLatch = new CountDownLatch(1); - // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁 - this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS); - this.countDownLatch = null; - } - return true; - } - - @Override - public void unlock() { - try { - zk.delete(currentLock, -1); - currentLock = null; - zk.close(); - } catch (InterruptedException | KeeperException e) { - logger.error("关闭锁异常" + e); - } - } - - @Override - public Condition newCondition() { - return null; - } - - @Override - public void lockInterruptibly() throws InterruptedException { - this.lock(); - } - - - public class LockException extends RuntimeException { - private static final long serialVersionUID = 1L; - - public LockException(String e) { - super(e); - } - - public LockException(Exception e) { - super(e); - } - } - -} diff --git a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java deleted file mode 100644 index f3d6d11..0000000 --- a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.zdjizhi.utils; - -import com.zdjizhi.common.FlowWriteConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - - -/** - * @author wlh - */ -public class FlinkEnvironmentUtils { - public static StreamExecutionEnvironment streamExeEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - - static { - streamExeEnv.setParallelism(FlowWriteConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM); - - /* - // 每 1000ms 开始一次 checkpoint - streamExeEnv.enableCheckpointing(CommonConfig.FLINK_WINDOW_MAX_TIME * 1000); - - // 设置模式为精确一次 (这是默认值) - streamExeEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); - - // 确认 checkpoints 之间的时间会进行 500 ms - streamExeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); - - // Checkpoint 必须在一分钟内完成,否则就会被抛弃 - streamExeEnv.getCheckpointConfig().setCheckpointTimeout(60000); - - // 允许两个连续的 checkpoint 错误 - streamExeEnv.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); - - // 同一时间只允许一个 checkpoint 进行 - streamExeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); - - // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留 - streamExeEnv.getCheckpointConfig().enableExternalizedCheckpoints( - CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); - - // 开启实验性的 unaligned checkpoints - streamExeEnv.getCheckpointConfig().enableUnalignedCheckpoints(); - */ - } - -} diff --git a/src/main/java/com/zdjizhi/utils/HttpClientUtils.java b/src/main/java/com/zdjizhi/utils/HttpClientUtils.java deleted file mode 100644 index b69e38b..0000000 --- a/src/main/java/com/zdjizhi/utils/HttpClientUtils.java +++ /dev/null @@ -1,274 +0,0 @@ -package com.zdjizhi.utils; - -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import com.geedgenetworks.utils.StringUtil; - -import com.zdjizhi.common.FlowWriteConfig; -import org.apache.http.*; -import org.apache.http.client.ClientProtocolException; -import org.apache.http.client.HttpRequestRetryHandler; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.protocol.HttpClientContext; -import org.apache.http.client.utils.URIBuilder; -import org.apache.http.conn.ConnectTimeoutException; -import org.apache.http.conn.ConnectionKeepAliveStrategy; -import org.apache.http.conn.HttpHostConnectException; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; -import org.apache.http.message.BasicHeaderElementIterator; -import org.apache.http.protocol.HTTP; -import org.apache.http.util.EntityUtils; - -import javax.net.ssl.SSLException; -import javax.net.ssl.SSLHandshakeException; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.URI; -import java.net.UnknownHostException; -import java.nio.charset.StandardCharsets; -import java.util.Map; - -/** - * http client工具类 - * @author wlh - */ -public class HttpClientUtils { - /** 全局连接池对象 */ - private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager(); - -// private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); - private static final Log logger = LogFactory.get(); - public static final String ERROR_MESSAGE = "-1"; - - /* - * 静态代码块配置连接池信息 - */ - static { - - // 设置最大连接数 - CONN_MANAGER.setMaxTotal(FlowWriteConfig.HTTP_POOL_MAX_CONNECTION); - // 设置每个连接的路由数 - CONN_MANAGER.setDefaultMaxPerRoute(FlowWriteConfig.HTTP_POOL_MAX_PER_ROUTE); - - } - - /** - * 获取Http客户端连接对象 - * @return Http客户端连接对象 - */ - private static CloseableHttpClient getHttpClient() { - // 创建Http请求配置参数 - RequestConfig requestConfig = RequestConfig.custom() - // 获取连接超时时间 - .setConnectionRequestTimeout(FlowWriteConfig.HTTP_POOL_REQUEST_TIMEOUT) - // 请求超时时间 - .setConnectTimeout(FlowWriteConfig.HTTP_POOL_CONNECT_TIMEOUT) - // 响应超时时间 - .setSocketTimeout(FlowWriteConfig.HTTP_POOL_RESPONSE_TIMEOUT) - .build(); - - /* - * 测出超时重试机制为了防止超时不生效而设置 - * 如果直接放回false,不重试 - * 这里会根据情况进行判断是否重试 - */ - HttpRequestRetryHandler retry = (exception, executionCount, context) -> { - if (executionCount >= 3) {// 如果已经重试了3次,就放弃 - return false; - } - if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 - return true; - } - if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 - return false; - } - if (exception instanceof UnknownHostException) {// 目标服务器不可达 - return false; - } - if (exception instanceof ConnectTimeoutException) {// 连接被拒绝 - return false; - } - if (exception instanceof HttpHostConnectException) {// 连接被拒绝 - return false; - } - if (exception instanceof SSLException) {// ssl握手异常 - return false; - } - if (exception instanceof InterruptedIOException) {// 超时 - return true; - } - HttpClientContext clientContext = HttpClientContext.adapt(context); - HttpRequest request = clientContext.getRequest(); - // 如果请求是幂等的,就再次尝试 - return !(request instanceof HttpEntityEnclosingRequest); - }; - - - ConnectionKeepAliveStrategy myStrategy = (response, context) -> { - HeaderElementIterator it = new BasicHeaderElementIterator - (response.headerIterator(HTTP.CONN_KEEP_ALIVE)); - while (it.hasNext()) { - HeaderElement he = it.nextElement(); - String param = he.getName(); - String value = he.getValue(); - if (value != null && "timeout".equalsIgnoreCase(param)) { - return Long.parseLong(value) * 1000; - } - } - return 60 * 1000;//如果没有约定,则默认定义时长为60s - }; - - // 创建httpClient - return HttpClients.custom() - // 把请求相关的超时信息设置到连接客户端 - .setDefaultRequestConfig(requestConfig) - // 把请求重试设置到连接客户端 - .setRetryHandler(retry) - .setKeepAliveStrategy(myStrategy) - // 配置连接池管理对象 - .setConnectionManager(CONN_MANAGER) - .build(); - } - - - /** - * GET请求 - * - * @param uri 请求地 - * @return message - */ - public static String httpGet(URI uri, Header... headers) { - String msg = ERROR_MESSAGE; - - // 获取客户端连接对象 - CloseableHttpClient httpClient = getHttpClient(); - CloseableHttpResponse response = null; - - try { - logger.info("http get uri {}",uri); - // 创建GET请求对象 - HttpGet httpGet = new HttpGet(uri); - - if (StringUtil.isNotEmpty(headers)) { - for (Header h : headers) { - httpGet.addHeader(h); - logger.info("request header : {}",h); - } - } - // 执行请求 - response = httpClient.execute(httpGet); - int statusCode = response.getStatusLine().getStatusCode(); - // 获取响应实体 - HttpEntity entity = response.getEntity(); - // 获取响应信息 - msg = EntityUtils.toString(entity, "UTF-8"); - - if (statusCode != HttpStatus.SC_OK) { - logger.error("Http get content is :{}" , msg); - } - - } catch (ClientProtocolException e) { - logger.error("协议错误: {}", e.getMessage()); - } catch (ParseException e) { - logger.error("解析错误: {}", e.getMessage()); - } catch (IOException e) { - logger.error("IO错误: {}",e.getMessage()); - } finally { - if (null != response) { - try { - EntityUtils.consume(response.getEntity()); - response.close(); - } catch (IOException e) { - logger.error("释放链接错误: {}", e.getMessage()); - - } - } - } - - return msg; - } - /** - * POST 请求 - * @param uri uri参数 - * @param requestBody 请求体 - * @return post请求返回结果 - */ - public static String httpPost(URI uri, String requestBody, Header... headers) { - String msg = ERROR_MESSAGE; - // 获取客户端连接对象 - CloseableHttpClient httpClient = getHttpClient(); - - // 创建POST请求对象 - CloseableHttpResponse response = null; - try { - - logger.info("http post uri:{}, http post body:{}", uri, requestBody); - - HttpPost httpPost = new HttpPost(uri); - httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded"); - if (StringUtil.isNotEmpty(headers)) { - for (Header h : headers) { - httpPost.addHeader(h); - logger.info("request header : {}",h); - } - } - - if(StringUtil.isNotBlank(requestBody)) { - byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8); - httpPost.setEntity(new ByteArrayEntity(bytes)); - } - - response = httpClient.execute(httpPost); - int statusCode = response.getStatusLine().getStatusCode(); - // 获取响应实体 - HttpEntity entity = response.getEntity(); - // 获取响应信息 - msg = EntityUtils.toString(entity, "UTF-8"); - - if (statusCode != HttpStatus.SC_OK) { - logger.error("Http post content is :{}" , msg); - } - } catch (ClientProtocolException e) { - logger.error("协议错误: {}", e.getMessage()); - } catch (ParseException e) { - logger.error("解析错误: {}", e.getMessage()); - } catch (IOException e) { - logger.error("IO错误: {}", e.getMessage()); - } finally { - if (null != response) { - try { - EntityUtils.consumeQuietly(response.getEntity()); - response.close(); - } catch (IOException e) { - logger.error("释放链接错误: {}", e.getMessage()); - - } - } - } - return msg; - } - - /** - * 拼装url - * url ,参数map - */ - public static void setUrlWithParams(URIBuilder uriBuilder,String path, Map params) { - try { - uriBuilder.setPath(path); - if (params != null && !params.isEmpty()){ - for (Map.Entry kv : params.entrySet()) { - uriBuilder.setParameter(kv.getKey(),kv.getValue().toString()); - } - } - } catch (Exception e) { - logger.error("拼接url出错,uri : {}, path : {},参数: {}",uriBuilder.toString(),path,params); - } - } - -} diff --git a/src/main/java/com/zdjizhi/utils/KafkaUtils.java b/src/main/java/com/zdjizhi/utils/KafkaUtils.java deleted file mode 100644 index 4298c40..0000000 --- a/src/main/java/com/zdjizhi/utils/KafkaUtils.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.zdjizhi.utils; - -import com.zdjizhi.common.FlowWriteConfig; -import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; - -import java.util.Optional; -import java.util.Properties; - -public class KafkaUtils { - - private static Properties getKafkaSinkProperty(){ - Properties properties = new Properties(); - properties.setProperty("bootstrap.servers", FlowWriteConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS); - if (FlowWriteConfig.SASL_JAAS_CONFIG_FLAG == 1){ - properties.put("security.protocol", "SASL_PLAINTEXT"); - properties.put("sasl.mechanism", "PLAIN"); - properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+ FlowWriteConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+ FlowWriteConfig.SASL_JAAS_CONFIG_PASSWORD+"\";"); - } - - return properties; - } - - public static FlinkKafkaProducer getKafkaSink(String topic){ - FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>( - topic, - new SimpleStringSchema(), - getKafkaSinkProperty(), - Optional.empty() - ); - kafkaProducer.setLogFailuresOnly(true); - return kafkaProducer; - } - -} diff --git a/src/main/java/com/zdjizhi/utils/SnowflakeId.java b/src/main/java/com/zdjizhi/utils/Snowflakeld/SnowflakeId.java similarity index 71% rename from src/main/java/com/zdjizhi/utils/SnowflakeId.java rename to src/main/java/com/zdjizhi/utils/Snowflakeld/SnowflakeId.java index 0af5582..6061b50 100644 --- a/src/main/java/com/zdjizhi/utils/SnowflakeId.java +++ b/src/main/java/com/zdjizhi/utils/Snowflakeld/SnowflakeId.java @@ -1,13 +1,18 @@ -package com.zdjizhi.utils; +package com.zdjizhi.utils.Snowflakeld; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; -import com.zdjizhi.common.FlowWriteConfig; +import org.apache.flink.configuration.Configuration; + +import static com.zdjizhi.conf.DosConfigs.DATA_CENTER_ID_NUM; +import static com.zdjizhi.conf.DosConfigs.HBASE_ZOOKEEPER_QUORUM; public class SnowflakeId { -// private static final Logger logger = LoggerFactory.getLogger(SnowflakeId.class); + private static final Log logger = LogFactory.get(); + private Configuration configuration; + /** * 共64位 第一位为符号位 默认0 * 时间戳 39位(17 year), centerId:(关联每个环境或任务数) :7位(0-127), @@ -87,46 +92,32 @@ public class SnowflakeId { */ private long lastTimestamp = -1L; - - /** - * 设置允许时间回拨的最大限制10s - */ - private static final long ROLL_BACK_TIME = 10000L; + private static final long rollBackTime = 10000L; - private static SnowflakeId idWorker; - private static ZookeeperUtils zookeeperUtils = new ZookeeperUtils(); - static { - idWorker = new SnowflakeId(FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM, FlowWriteConfig.DATA_CENTER_ID_NUM); - } //==============================Constructors===================================== /** - * 构造函数 + * 初始化雪花ID + * + * @param dataCenterIdNum 数据中心编号 + * @param tmpWorkerId worker编号 */ - private SnowflakeId(String zookeeperIp, long dataCenterIdNum) { - DistributedLock lock = new DistributedLock(FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM, "disLocks1"); - try { - lock.lock(); - int tmpWorkerId = zookeeperUtils.modifyNode("/Snowflake/" + "worker" + dataCenterIdNum, zookeeperIp); - if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { - throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); - } - if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) { - throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId)); - } - this.workerId = tmpWorkerId; - this.dataCenterId = dataCenterIdNum; - } catch (RuntimeException e) { - logger.error("This is not usual error!!!===>>>" + e + "<<<==="); - }finally { - lock.unlock(); + public SnowflakeId(long dataCenterIdNum, long tmpWorkerId) { + if (tmpWorkerId > maxWorkerId || tmpWorkerId < 0) { + throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); } - } + if (dataCenterIdNum > maxDataCenterId || dataCenterIdNum < 0) { + throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than ", maxDataCenterId)); + } + this.workerId = tmpWorkerId; + this.dataCenterId = dataCenterIdNum; + + } // ==============================Methods========================================== /** @@ -134,10 +125,10 @@ public class SnowflakeId { * * @return SnowflakeId */ - private synchronized long nextId() { + public synchronized long nextId() { long timestamp = timeGen(); //设置一个允许回拨限制时间,系统时间回拨范围在rollBackTime内可以等待校准 - if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < ROLL_BACK_TIME) { + if (lastTimestamp - timestamp > 0 && lastTimestamp - timestamp < rollBackTime) { timestamp = tilNextMillis(lastTimestamp); } //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 @@ -194,12 +185,4 @@ public class SnowflakeId { } - /** - * 静态工具类 - */ - public static Long generateId() { - return idWorker.nextId(); - } - - } diff --git a/src/main/java/com/zdjizhi/etl/ParseBaselineThreshold.java b/src/main/java/com/zdjizhi/utils/Threshold/ParseBaselineThreshold.java similarity index 69% rename from src/main/java/com/zdjizhi/etl/ParseBaselineThreshold.java rename to src/main/java/com/zdjizhi/utils/Threshold/ParseBaselineThreshold.java index 8d5b9ca..8183c5f 100644 --- a/src/main/java/com/zdjizhi/etl/ParseBaselineThreshold.java +++ b/src/main/java/com/zdjizhi/utils/Threshold/ParseBaselineThreshold.java @@ -1,11 +1,12 @@ -package com.zdjizhi.etl; +package com.zdjizhi.utils.Threshold; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.geedgenetworks.utils.DateUtils; -import com.zdjizhi.common.FlowWriteConfig; + import com.zdjizhi.common.DosBaselineThreshold; -import com.zdjizhi.utils.HbaseUtils; +import com.zdjizhi.utils.connections.hbase.HbaseUtils; +import org.apache.flink.configuration.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -15,47 +16,52 @@ import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.util.*; +import static com.zdjizhi.conf.DosConfigs.*; + public class ParseBaselineThreshold { - private static final Log logger = LogFactory.get(); - private static ArrayList floodTypeList = new ArrayList<>(); + private ArrayList floodTypeList = new ArrayList<>(); + + private Configuration configuration; private static Table table = null; private static Scan scan = null; - static { - floodTypeList.add("TCP SYN Flood"); - floodTypeList.add("UDP Flood"); - floodTypeList.add("ICMP Flood"); - floodTypeList.add("DNS Flood"); + public ParseBaselineThreshold(Configuration configuration) { + this.configuration = configuration; + this.floodTypeList.add("TCP SYN Flood"); + this.floodTypeList.add("UDP Flood"); + this.floodTypeList.add("ICMP Flood"); + this.floodTypeList.add("DNS Flood"); } - private static void prepareHbaseEnv() throws IOException { + + private void prepareHbaseEnv() throws IOException { org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create(); - config.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM); + config.set("hbase.zookeeper.quorum", configuration.get(HBASE_ZOOKEEPER_QUORUM)); config.set("hbase.client.retries.number", "3"); config.set("hbase.bulkload.retries.number", "3"); config.set("zookeeper.recovery.retry", "3"); config.set("hbase.defaults.for.version", "2.2.3"); config.set("hbase.defaults.for.version.skip", "true"); - config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, FlowWriteConfig.HBASE_CLIENT_OPERATION_TIMEOUT); - config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, FlowWriteConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, configuration.get(HBASE_CLIENT_OPERATION_TIMEOUT)); + config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, configuration.get(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)); - TableName tableName = TableName.valueOf(FlowWriteConfig.HBASE_BASELINE_TABLE_NAME); + TableName tableName = TableName.valueOf(configuration.get(HBASE_BASELINE_TABLE_NAME)); Connection conn = ConnectionFactory.createConnection(config); table = conn.getTable(tableName); long currentTimeMillis = System.currentTimeMillis(); scan = new Scan() .setAllowPartialResults(true) - .setTimeRange(DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(FlowWriteConfig.HBASE_BASELINE_TTL)).getTime(), currentTimeMillis) - .setLimit(FlowWriteConfig.HBASE_BASELINE_TOTAL_NUM); + .setTimeRange(DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(configuration.get(HBASE_BASELINE_TTL))).getTime(), currentTimeMillis) + .setLimit(configuration.get(HBASE_BASELINE_TOTAL_NUM)); logger.info("连接hbase成功,正在读取baseline数据"); } - static Map> readFromHbase() { + public Map> readFromHbase() { Map> baselineMap = new HashMap<>(); try { prepareHbaseEnv(); @@ -64,16 +70,16 @@ public class ParseBaselineThreshold { for (Result result : rs) { Map floodTypeMap = new HashMap<>(); String rowkey = Bytes.toString(result.getRow()); - for (String type:floodTypeList){ + for (String type : floodTypeList) { DosBaselineThreshold baselineThreshold = new DosBaselineThreshold(); ArrayList sessionRate = HbaseUtils.getArraylist(result, type, "session_rate"); - if (sessionRate != null && !sessionRate.isEmpty()){ + if (sessionRate != null && !sessionRate.isEmpty()) { Integer defaultValue = HbaseUtils.getIntegerValue(result, type, "session_rate_default_value"); Integer rateBaselineType = HbaseUtils.getIntegerValue(result, type, "session_rate_baseline_type"); baselineThreshold.setSession_rate(sessionRate); baselineThreshold.setSession_rate_default_value(defaultValue); baselineThreshold.setSession_rate_baseline_type(rateBaselineType); - floodTypeMap.put(type,baselineThreshold); + floodTypeMap.put(type, baselineThreshold); } } baselineMap.put(rowkey, floodTypeMap); diff --git a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java b/src/main/java/com/zdjizhi/utils/Threshold/ParseStaticThreshold.java similarity index 82% rename from src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java rename to src/main/java/com/zdjizhi/utils/Threshold/ParseStaticThreshold.java index 36d4ce4..3875e9a 100644 --- a/src/main/java/com/zdjizhi/etl/ParseStaticThreshold.java +++ b/src/main/java/com/zdjizhi/utils/Threshold/ParseStaticThreshold.java @@ -1,18 +1,17 @@ -package com.zdjizhi.etl; +package com.zdjizhi.utils.Threshold; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; -import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.common.DosDetectionThreshold; import com.zdjizhi.common.DosVsysId; -import com.zdjizhi.utils.HttpClientUtils; +import com.zdjizhi.utils.connections.http.HttpClientService; import inet.ipaddr.IPAddress; import inet.ipaddr.IPAddressString; +import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.Range; import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap; - import org.apache.http.client.utils.URIBuilder; import org.apache.http.message.BasicHeader; @@ -27,30 +26,35 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static com.zdjizhi.conf.DosConfigs.*; + /** * @author wlh */ public class ParseStaticThreshold { private static final Log logger = LogFactory.get(); - private static String encryptpwd; + public Configuration configuration; + private String encryptpwd; + private HttpClientService httpClientService; - static { - //加载加密登录密码 - encryptpwd = getEncryptpwd(); + + public ParseStaticThreshold(Configuration configuration, HttpClientService httpClientService) { + this.configuration = configuration; + this.httpClientService = httpClientService; } /** * 获取加密密码 */ - private static String getEncryptpwd() { - String psw = HttpClientUtils.ERROR_MESSAGE; + private String getEncryptpwd() { + String psw = httpClientService.ERROR_MESSAGE; try { - URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.BIFANG_SERVER_URI); + URIBuilder uriBuilder = new URIBuilder(configuration.get(BIFANG_SERVER_URI)); HashMap parms = new HashMap<>(); parms.put("password", "admin"); - HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.BIFANG_SERVER_ENCRYPTPWD_PATH, parms); - String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build()); - if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { + httpClientService.setUrlWithParams(uriBuilder, configuration.get(BIFANG_SERVER_ENCRYPTPWD_PATH), parms); + String resposeJsonStr = httpClientService.httpGet(uriBuilder.build(),configuration.get(HTTP_SOCKET_TIMEOUT)); + if (!httpClientService.ERROR_MESSAGE.equals(resposeJsonStr)) { HashMap resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class); boolean success = (boolean) resposeMap.get("success"); String msg = resposeMap.get("msg").toString(); @@ -76,21 +80,21 @@ public class ParseStaticThreshold { * * @return vsysIdList */ - private static ArrayList getVsysId() { + private ArrayList getVsysId() { ArrayList vsysIdList = null; try { - URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.BIFANG_SERVER_URI); + URIBuilder uriBuilder = new URIBuilder(configuration.get(BIFANG_SERVER_URI)); HashMap parms = new HashMap<>(); parms.put("page_size", -1); // parms.put("orderBy", "vsysId desc"); parms.put("type", 1); - HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms); - String token = FlowWriteConfig.BIFANG_SERVER_TOKEN; - if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { + httpClientService.setUrlWithParams(uriBuilder, configuration.get(BIFANG_SERVER_POLICY_VSYSID_PATH), parms); + String token = configuration.get(BIFANG_SERVER_TOKEN); + if (!httpClientService.ERROR_MESSAGE.equals(token)) { BasicHeader authorization = new BasicHeader("Authorization", token); BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded"); - String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1); - if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { + String resposeJsonStr = httpClientService.httpGet(uriBuilder.build(),configuration.get(HTTP_SOCKET_TIMEOUT),authorization, authorization1); + if (!httpClientService.ERROR_MESSAGE.equals(resposeJsonStr)) { HashMap resposeMap = JSONObject.parseObject(resposeJsonStr, HashMap.class); boolean success = (boolean) resposeMap.get("success"); String msg = resposeMap.get("msg").toString(); @@ -119,7 +123,7 @@ public class ParseStaticThreshold { * 根据vsysId获取静态阈值配置列表 * @return thresholds */ - private static ArrayList getDosDetectionThreshold() { + private ArrayList getDosDetectionThreshold() { ArrayList vsysThresholds = new ArrayList<>(); ArrayList vsysIds = getVsysId(); try { @@ -127,19 +131,19 @@ public class ParseStaticThreshold { for (DosVsysId dosVsysId : vsysIds) { Integer vsysId = dosVsysId.getId() == null ? 1 : dosVsysId.getId(); Integer[] superiorIds = dosVsysId.getSuperior_ids(); - URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.BIFANG_SERVER_URI); + URIBuilder uriBuilder = new URIBuilder(configuration.get(BIFANG_SERVER_URI)); HashMap parms = new HashMap<>(); parms.put("page_size", -1); // parms.put("order_by", "profileId asc"); parms.put("is_valid", 1); parms.put("vsys_id", vsysId); - HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms); - String token = FlowWriteConfig.BIFANG_SERVER_TOKEN; - if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) { + httpClientService.setUrlWithParams(uriBuilder, configuration.get(BIFANG_SERVER_POLICY_THRESHOLD_PATH), parms); + String token = configuration.get(BIFANG_SERVER_TOKEN); + if (!httpClientService.ERROR_MESSAGE.equals(token)) { BasicHeader authorization = new BasicHeader("Authorization", token); BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded"); - String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1); - if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) { + String resposeJsonStr = httpClientService.httpGet(uriBuilder.build(),configuration.get(HTTP_SOCKET_TIMEOUT),authorization, authorization1); + if (!httpClientService.ERROR_MESSAGE.equals(resposeJsonStr)) { HashMap resposeMap = JSONObject.parseObject(resposeJsonStr,HashMap.class); boolean success = (boolean) resposeMap.get("success"); String msg = resposeMap.get("msg").toString(); @@ -175,7 +179,7 @@ public class ParseStaticThreshold { * * @return threshold RangeMap */ - static HashMap>> createStaticThreshold() { + public HashMap>> createStaticThreshold() { HashMap>> thresholdRangeMap = new HashMap<>(4); try { ArrayList dosDetectionThreshold = getDosDetectionThreshold(); @@ -185,10 +189,7 @@ public class ParseStaticThreshold { String attackType = threshold.getAttack_type(); int vsysId = threshold.getVsys_id(); HashMap> rangeMap = thresholdRangeMap.getOrDefault(vsysId, new HashMap<>()); - TreeRangeMap treeRangeMap = rangeMap.getOrDefault(attackType, TreeRangeMap.create()); - - ArrayList serverIpList = threshold.getServer_ip_list(); for (String sip : serverIpList) { @@ -239,11 +240,11 @@ public class ParseStaticThreshold { * * @return token */ - private static String loginBifangServer() { - String token = HttpClientUtils.ERROR_MESSAGE; + private String loginBifangServer() { + String token = httpClientService.ERROR_MESSAGE; try { final HashMap parmsMap = new HashMap<>(); - String urlString = FlowWriteConfig.BIFANG_SERVER_URI+FlowWriteConfig.BIFANG_SERVER_LOGIN_PATH; + String urlString = configuration.get(BIFANG_SERVER_URI)+configuration.get(BIFANG_SERVER_LOGIN_PATH); parmsMap.put("username","admin"); parmsMap.put("password",encryptpwd); parmsMap.put("auth_mode",""); diff --git a/src/main/java/com/zdjizhi/utils/ZookeeperUtils.java b/src/main/java/com/zdjizhi/utils/ZookeeperUtils.java deleted file mode 100644 index 7007d8e..0000000 --- a/src/main/java/com/zdjizhi/utils/ZookeeperUtils.java +++ /dev/null @@ -1,136 +0,0 @@ -package com.zdjizhi.utils; - -import cn.hutool.core.util.StrUtil; -import cn.hutool.log.Log; -import cn.hutool.log.LogFactory; -import org.apache.zookeeper.*; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; - - -public class ZookeeperUtils implements Watcher { -// private static final Logger logger = LoggerFactory.getLogger(ZookeeperUtils.class); - private static final Log logger = LogFactory.get(); - - private ZooKeeper zookeeper; - - private static final int SESSION_TIME_OUT = 20000; - - private CountDownLatch countDownLatch = new CountDownLatch(1); - - @Override - public void process(WatchedEvent event) { - if (event.getState() == Event.KeeperState.SyncConnected) { - countDownLatch.countDown(); - } - } - - - /** - * 修改节点信息 - * - * @param path 节点路径 - */ - int modifyNode(String path, String zookeeperIp) { - createNode(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, zookeeperIp); - int workerId = 0; - try { - connectZookeeper(zookeeperIp); - Stat stat = zookeeper.exists(path, true); - workerId = Integer.parseInt(getNodeDate(path)); - if (workerId > 63) { - workerId = 0; - zookeeper.setData(path, "1".getBytes(), stat.getVersion()); - } else { - String result = String.valueOf(workerId + 1); - if (stat != null) { - zookeeper.setData(path, result.getBytes(), stat.getVersion()); - } else { - logger.error("Node does not exist!,Can't modify"); - } - } - } catch (KeeperException | InterruptedException e) { - logger.error("modify error Can't modify," + e); - } finally { - closeConn(); - } - logger.warn("workerID is:" + workerId); - return workerId; - } - - /** - * 连接zookeeper - * - * @param host 地址 - */ - private void connectZookeeper(String host) { - try { - zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this); - countDownLatch.await(); - } catch (IOException | InterruptedException e) { - logger.error("Connection to the Zookeeper Exception! message:" + e); - } - } - - /** - * 关闭连接 - */ - private void closeConn() { - try { - if (zookeeper != null) { - zookeeper.close(); - } - } catch (InterruptedException e) { - logger.error("Close the Zookeeper connection Exception! message:" + e); - } - } - - /** - * 获取节点内容 - * - * @param path 节点路径 - * @return 内容/异常null - */ - private String getNodeDate(String path) { - String result = null; - Stat stat = new Stat(); - try { - byte[] resByte = zookeeper.getData(path, true, stat); - - result = StrUtil.str(resByte, "UTF-8"); - } catch (KeeperException | InterruptedException e) { - logger.error("Get node information exception" + e); - } - return result; - } - - /** - * @param path 节点创建的路径 - * @param date 节点所存储的数据的byte[] - * @param acls 控制权限策略 - */ - private void createNode(String path, byte[] date, List acls, String zookeeperIp) { - try { - connectZookeeper(zookeeperIp); - Stat exists = zookeeper.exists(path, true); - if (exists == null) { - Stat existsSnowflakeld = zookeeper.exists("/Snowflake", true); - if (existsSnowflakeld == null) { - zookeeper.create("/Snowflake", null, acls, CreateMode.PERSISTENT); - } - zookeeper.create(path, date, acls, CreateMode.PERSISTENT); - } else { - logger.warn("Node already exists ! Don't need to create"); - } - } catch (KeeperException | InterruptedException e) { - logger.error(e.toString()); - } finally { - closeConn(); - } - } - -} diff --git a/src/main/java/com/zdjizhi/utils/HbaseUtils.java b/src/main/java/com/zdjizhi/utils/connections/hbase/HbaseUtils.java similarity index 97% rename from src/main/java/com/zdjizhi/utils/HbaseUtils.java rename to src/main/java/com/zdjizhi/utils/connections/hbase/HbaseUtils.java index 428298b..8e8cdd2 100644 --- a/src/main/java/com/zdjizhi/utils/HbaseUtils.java +++ b/src/main/java/com/zdjizhi/utils/connections/hbase/HbaseUtils.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils; +package com.zdjizhi.utils.connections.hbase; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; diff --git a/src/main/java/com/zdjizhi/utils/connections/http/HttpClientService.java b/src/main/java/com/zdjizhi/utils/connections/http/HttpClientService.java index 280c0a2..413350f 100644 --- a/src/main/java/com/zdjizhi/utils/connections/http/HttpClientService.java +++ b/src/main/java/com/zdjizhi/utils/connections/http/HttpClientService.java @@ -4,9 +4,9 @@ import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.geedgenetworks.utils.StringUtil; -import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.utils.exception.FlowWriteException; import org.apache.commons.io.IOUtils; +import org.apache.flink.configuration.Configuration; import org.apache.http.*; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.HttpRequestRetryHandler; @@ -14,6 +14,7 @@ import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.client.utils.URIBuilder; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; import org.apache.http.conn.ConnectTimeoutException; @@ -34,14 +35,23 @@ import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; import java.net.SocketTimeoutException; +import java.net.URI; import java.net.UnknownHostException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.cert.X509Certificate; +import java.util.Map; + +import static com.zdjizhi.conf.DosConfigs.*; public class HttpClientService { + private static final Log logger = LogFactory.get(); + public static final String ERROR_MESSAGE = "-1"; + private Configuration configuration; - private static final Log log = LogFactory.get(); + public HttpClientService(Configuration configuration) { + this.configuration = configuration; + } /** * 在调用SSL之前需要重写验证方法,取消检测SSL @@ -75,9 +85,9 @@ public class HttpClientService { // 创建ConnectionManager,添加Connection配置信息 PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry); // 设置最大连接数 - connManager.setMaxTotal(FlowWriteConfig.HTTP_POOL_MAX_CONNECTION); + connManager.setMaxTotal(configuration.get(HTTP_POOL_MAX_CONNECTION)); // 设置每个连接的路由数 - connManager.setDefaultMaxPerRoute(FlowWriteConfig.HTTP_POOL_MAX_PER_ROUTE); + connManager.setDefaultMaxPerRoute(configuration.get(HTTP_POOL_MAX_PER_ROUTE)); return connManager; } catch (KeyManagementException | NoSuchAlgorithmException e) { throw new FlowWriteException(e.getMessage()); @@ -94,9 +104,9 @@ public class HttpClientService { // 创建Http请求配置参数 RequestConfig requestConfig = RequestConfig.custom() // 获取连接超时时间 - .setConnectionRequestTimeout(FlowWriteConfig.HTTP_POOL_REQUEST_TIMEOUT) + .setConnectionRequestTimeout(configuration.get(HTTP_POOL_REQUEST_TIMEOUT)) // 请求超时时间 - .setConnectTimeout(FlowWriteConfig.HTTP_POOL_CONNECT_TIMEOUT) + .setConnectTimeout(configuration.get(HTTP_POOL_CONNECT_TIMEOUT)) // 响应超时时间 .setSocketTimeout(socketTimeOut) .build(); @@ -192,13 +202,13 @@ public class HttpClientService { // 获取响应信息 EntityUtils.consume(response.getEntity()); } catch (ClientProtocolException e) { - log.error("current file: {},Protocol error:{}", url, e.getMessage()); + logger.error("current file: {},Protocol error:{}", url, e.getMessage()); } catch (ParseException e) { - log.error("current file: {}, Parser error:{}", url, e.getMessage()); + logger.error("current file: {}, Parser error:{}", url, e.getMessage()); } catch (IOException e) { - log.error("current file: {},IO error:{}", url, e.getMessage()); + logger.error("current file: {},IO error:{}", url, e.getMessage()); } finally { if (null != response) { @@ -206,7 +216,7 @@ public class HttpClientService { EntityUtils.consume(response.getEntity()); response.close(); } catch (IOException e) { - log.error("Release Connection error:{}", e.getMessage()); + logger.error("Release Connection error:{}", e.getMessage()); } } @@ -236,13 +246,13 @@ public class HttpClientService { // 获取响应信息 EntityUtils.consume(response.getEntity()); } catch (ClientProtocolException e) { - log.error("current file: {},Protocol error:{}", url, e.getMessage()); + logger.error("current file: {},Protocol error:{}", url, e.getMessage()); } catch (ParseException e) { - log.error("current file: {}, Parser error:{}", url, e.getMessage()); + logger.error("current file: {}, Parser error:{}", url, e.getMessage()); } catch (IOException e) { - log.error("current file: {},IO error:{}", url, e.getMessage()); + logger.error("current file: {},IO error:{}", url, e.getMessage()); } finally { if (null != response) { @@ -250,7 +260,7 @@ public class HttpClientService { EntityUtils.consume(response.getEntity()); response.close(); } catch (IOException e) { - log.error("Release Connection error:{}", e.getMessage()); + logger.error("Release Connection error:{}", e.getMessage()); } } @@ -258,4 +268,73 @@ public class HttpClientService { } } + /** + * GET请求 + * + * @param uri 请求地 + * @return message + */ + public String httpGet(URI uri, int socketTimeout, Header... headers) { + String msg = ERROR_MESSAGE; + + // 获取客户端连接对象 + CloseableHttpClient httpClient = getHttpClient(socketTimeout); + CloseableHttpResponse response = null; + + try { + logger.info("http get uri {}", uri); + // 创建GET请求对象 + HttpGet httpGet = new HttpGet(uri); + + if (StringUtil.isNotEmpty(headers)) { + for (Header h : headers) { + httpGet.addHeader(h); + logger.info("request header : {}", h); + } + } + // 执行请求 + response = httpClient.execute(httpGet); + int statusCode = response.getStatusLine().getStatusCode(); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + msg = EntityUtils.toString(entity, "UTF-8"); + + if (statusCode != HttpStatus.SC_OK) { + logger.error("Http get content is :{}", msg); + } + } catch (ClientProtocolException e) { + logger.error("协议错误: {}", e.getMessage()); + } catch (ParseException e) { + logger.error("解析错误: {}", e.getMessage()); + } catch (IOException e) { + logger.error("IO错误: {}", e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consume(response.getEntity()); + response.close(); + } catch (IOException e) { + logger.error("释放链接错误: {}", e.getMessage()); + + } + } + } + + return msg; + } + + public void setUrlWithParams(URIBuilder uriBuilder, String path, Map params) { + try { + uriBuilder.setPath(path); + if (params != null && !params.isEmpty()) { + for (Map.Entry kv : params.entrySet()) { + uriBuilder.setParameter(kv.getKey(), kv.getValue().toString()); + } + } + } catch (Exception e) { + logger.error("拼接url出错,uri : {}, path : {},参数: {}", uriBuilder.toString(), path, params); + } + } + } diff --git a/src/main/java/com/zdjizhi/utils/connections/kafka/KafkaConsumer.java b/src/main/java/com/zdjizhi/utils/connections/kafka/KafkaConsumer.java new file mode 100644 index 0000000..01ef035 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/connections/kafka/KafkaConsumer.java @@ -0,0 +1,17 @@ +package com.zdjizhi.utils.connections.kafka; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; + +import java.util.Properties; + + +public class KafkaConsumer { + + public static FlinkKafkaConsumer getKafkaConsumer(String topic, Properties Properties){ + final FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), Properties); + return kafkaConsumer; + } + +} + diff --git a/src/main/java/com/zdjizhi/utils/connections/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/utils/connections/kafka/KafkaProducer.java new file mode 100644 index 0000000..717570c --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/connections/kafka/KafkaProducer.java @@ -0,0 +1,23 @@ +package com.zdjizhi.utils.connections.kafka; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; + +import java.util.Optional; +import java.util.Properties; + + +public class KafkaProducer { + + public static FlinkKafkaProducer getKafkaProducer(String topic, Properties Properties){ + FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>( + topic, + new SimpleStringSchema(), + Properties, + Optional.empty() + ); + kafkaProducer.setLogFailuresOnly(true); + + return kafkaProducer; + } +} diff --git a/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java index 44fdef8..da4dfe1 100644 --- a/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java +++ b/src/main/java/com/zdjizhi/utils/exception/FlowWriteException.java @@ -1,8 +1,6 @@ package com.zdjizhi.utils.exception; - public class FlowWriteException extends RuntimeException { - public FlowWriteException() { } diff --git a/src/main/java/com/zdjizhi/utils/IpLookupUtils.java b/src/main/java/com/zdjizhi/utils/knowledgebase/IpLookupUtils.java similarity index 77% rename from src/main/java/com/zdjizhi/utils/IpLookupUtils.java rename to src/main/java/com/zdjizhi/utils/knowledgebase/IpLookupUtils.java index 2939f63..d219aa7 100644 --- a/src/main/java/com/zdjizhi/utils/IpLookupUtils.java +++ b/src/main/java/com/zdjizhi/utils/knowledgebase/IpLookupUtils.java @@ -1,4 +1,4 @@ -package com.zdjizhi.utils; +package com.zdjizhi.utils.knowledgebase; import cn.hutool.crypto.digest.DigestUtil; import cn.hutool.log.Log; @@ -7,17 +7,17 @@ import com.alibaba.fastjson2.*; import com.geedgenetworks.utils.IpLookupV2; import com.geedgenetworks.utils.StringUtil; import com.google.common.base.Joiner; -import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.common.pojo.KnowlegeBaseMeta; import com.zdjizhi.utils.connections.http.HttpClientService; +import org.apache.flink.configuration.Configuration; import org.apache.http.client.utils.URIBuilder; import java.io.ByteArrayInputStream; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; + +import static com.zdjizhi.conf.DosConfigs.*; /** @@ -26,9 +26,10 @@ import java.util.TimerTask; */ public class IpLookupUtils { private static final Log logger = LogFactory.get(); - private static final String ipBuiltInName = "ip_builtin.mmdb"; - private static final String ipUserDefinedName = "ip_user_defined.mmdb"; + private final String ipBuiltInName = "ip_builtin.mmdb"; + private final String ipUserDefinedName = "ip_user_defined.mmdb"; + private Configuration configuration; /** * ip定位库 */ @@ -47,7 +48,7 @@ public class IpLookupUtils { /** * http connections */ - private static final HttpClientService httpClientService; + private final HttpClientService httpClientService; /** * 定位库元数据缓存 @@ -58,27 +59,19 @@ public class IpLookupUtils { private static String currentSha256IpBuiltin = ""; - static { - httpClientService = new HttpClientService(); - stuffKnowledgeMetaCache(); - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - stuffKnowledgeMetaCache(); - } - }, 0, FlowWriteConfig.KNOWLEDGE_EXECUTION_INTERVAL); + public IpLookupUtils(Configuration configuration, HttpClientService httpClientService) { + this.configuration = configuration; + this.httpClientService = httpClientService; } + public void stuffKnowledgeMetaCache() { + KnowlegeBaseMeta ipBuiltinknowlegeBaseMeta = getKnowlegeBaseMeta(configuration.get(IP_BUILTIN_KD_ID)); - - private static void stuffKnowledgeMetaCache(){ - final KnowlegeBaseMeta ipBuiltinknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.IP_BUILTIN_KD_ID); if (!currentSha256IpBuiltin.equals(ipBuiltinknowlegeBaseMeta.getSha256())) { String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipBuiltinknowlegeBaseMeta.getName(), ipBuiltinknowlegeBaseMeta.getFormat()); knowledgeMetaCache.put(fileName, ipBuiltinknowlegeBaseMeta); } - final KnowlegeBaseMeta ipUserDefinedknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.IP_USER_DEFINED_KD_ID); + final KnowlegeBaseMeta ipUserDefinedknowlegeBaseMeta = getKnowlegeBaseMeta(configuration.get(IP_USER_DEFINED_KD_ID)); if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256())) { String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipUserDefinedknowlegeBaseMeta.getName(), ipUserDefinedknowlegeBaseMeta.getFormat()); knowledgeMetaCache.put(fileName, ipUserDefinedknowlegeBaseMeta); @@ -87,13 +80,14 @@ public class IpLookupUtils { currentSha256IpBuiltin = ipBuiltinknowlegeBaseMeta.getSha256(); currentSha256IpUserDefined = ipUserDefinedknowlegeBaseMeta.getSha256(); reloadIpLookup(); + logger.debug("定位库信息重新加载成功,当前ip_builtin.mmdb的Sha256编码为:" + currentSha256IpBuiltin, "ip_user_defined.mmdb的Sha256编码为" + currentSha256IpUserDefined); } } /** * 从HDFS下载文件更新IpLookup */ - private static void reloadIpLookup() { + private void reloadIpLookup() { IpLookupV2.Builder builder = new IpLookupV2.Builder(false); for (String fileName : knowledgeMetaCache.keySet()) { int retryNum = 0; @@ -102,7 +96,7 @@ public class IpLookupUtils { while (retryNum < TRY_TIMES) { System.out.println("download file :" + fileName + ",HOS path :" + knowlegeBaseMeta.getPath()); Long startTime = System.currentTimeMillis(); - byte[] httpGetByte = httpClientService.httpGetByte(knowlegeBaseMeta.getPath(), FlowWriteConfig.HTTP_SOCKET_TIMEOUT); + byte[] httpGetByte = httpClientService.httpGetByte(knowlegeBaseMeta.getPath(), configuration.get(HTTP_SOCKET_TIMEOUT)); if (httpGetByte != null && httpGetByte.length > 0) { String downloadFileSha256 = DigestUtil.sha256Hex(httpGetByte); if (metaSha256.equals(downloadFileSha256)) { @@ -137,28 +131,23 @@ public class IpLookupUtils { * * @return 过滤参数 */ - private static String getFilterParameter() { - + private String getFilterParameter() { String expr = "[?(@.version=='latest')][?(@.name in ('ip_builtin','ip_user_defined'))]"; - return expr; } - public static String getCountryLookup(String ip) { - return ipLookup.countryLookup(ip); - } - - private static KnowlegeBaseMeta getKnowlegeBaseMeta(String kd_id) { + private KnowlegeBaseMeta getKnowlegeBaseMeta(String kd_id) { KnowlegeBaseMeta knowlegeBaseMeta = null; String knowledgeInfo = null; try { - URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.KNOWLEDGE_BASE_URL); + URIBuilder uriBuilder = new URIBuilder(configuration.get(KNOWLEDGE_BASE_URL)); HashMap parms = new HashMap<>(); parms.put("kb_id", kd_id); - HttpClientUtils.setUrlWithParams(uriBuilder, FlowWriteConfig.KNOWLEDGE_BASE_PATH, parms); - knowledgeInfo = HttpClientUtils.httpGet(uriBuilder.build()); + httpClientService.setUrlWithParams(uriBuilder, configuration.get(KNOWLEDGE_BASE_PATH), parms); + knowledgeInfo = httpClientService.httpGet(uriBuilder.build(), configuration.get(HTTP_SOCKET_TIMEOUT)); if (knowledgeInfo.contains("200")) { final Map jsonObject = JSONObject.parseObject(knowledgeInfo, Map.class); + logger.debug("获取kd_id为[" + kd_id + "]的knowledge_base成功,响应信息为" + jsonObject); JSONPath jsonPath = JSONPath.of(getFilterParameter()); String extract = jsonPath.extract(JSONReader.of(jsonObject.get("data").toString())).toString(); if (StringUtil.isNotBlank(extract)) { @@ -179,4 +168,13 @@ public class IpLookupUtils { } return knowlegeBaseMeta; } + + public String getCountryLookup(String ip) { + if (ipLookup != null) { + return ipLookup.countryLookup(ip); + } else { + return null; + } + } + } diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties deleted file mode 100644 index 060756e..0000000 --- a/src/main/resources/common.properties +++ /dev/null @@ -1,149 +0,0 @@ -#flink运行环境并行度,其优先级低于算子并行度,如果未设置算子并行度,则使用该数值 -stream.execution.environment.parallelism=1 - -#flink任务名,一般不变 -stream.execution.job.name=DOS-DETECTION-APPLICATION - -#输入kafka并行度大小 -kafka.input.parallelism=3 - -#输入kafka topic名 -kafka.input.topic.name=DOS-SKETCH-RECORD - -#输入kafka地址 -#kafka.input.bootstrap.servers=192.168.44.12:9094 -kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 - -#读取kafka group id -kafka.input.group.id=dos-detection-job-221125-23132 -#kafka.input.group.id=dos-detection-job-210813-1 - -#发送kafka metrics并行度大小 -kafka.output.metric.parallelism=3 - -#发送kafka metrics topic名 -#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS -kafka.output.metric.topic.name=test - -#发送kafka event并行度大小 -kafka.output.event.parallelism=3 - -#发送kafka event topic名 -#kafka.output.event.topic.name=DOS-EVENT -kafka.output.event.topic.name=dos-test - -#kafka输出地址 -kafka.output.bootstrap.servers=192.168.44.12:9094 -#kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092 - -#zookeeper地址 -hbase.zookeeper.quorum=192.168.44.12:2181 -#hbase.zookeeper.quorum=192.168.40.151:2181,192.168.40.152:2181,192.168.40.203:2181 -#hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181 - -#hbase客户端处理时间 -hbase.client.operation.timeout=30000 -hbase.client.scanner.timeout.period=30000 - -##hbase baseline表名 -hbase.baseline.table.name=dos:ddos_traffic_baselines - -#读取baseline限制 -hbase.baseline.total.num=1000000 - -#baseline ttl,单位:天 -hbase.baseline.ttl=10 - -#设置聚合并行度,2个key -flink.first.agg.parallelism=1 - -#设置结果判定并行度 -flink.detection.map.parallelism=1 - -#watermark延迟 -flink.watermark.max.orderness=300 - -#计算窗口大小,默认600s -flink.window.max.time=60 - -#dos event结果中distinct source IP限制 -source.ip.list.limit=10000 - -#基于目的IP的分区数,默认为10000,一般不变 -destination.ip.partition.num=10000 - -data.center.id.num=15 - -#http请求相关参数 -#最大连接数 -http.pool.max.connection=400 - -#单路由最大连接数 -http.pool.max.per.route=80 - -#向服务端请求超时时间设置(单位:毫秒) -http.pool.request.timeout=60000 - -#向服务端连接超时时间设置(单位:毫秒) -http.pool.connect.timeout=60000 - -#服务端响应超时时间设置(单位:毫秒) -http.pool.response.timeout=60000 - -#获取静态阈值周期,默认十分钟 -static.threshold.schedule.minutes=10 - -#获取baseline周期,默认7天 -baseline.threshold.schedule.days=1 - -#kafka用户认证配置参数 -sasl.jaas.config.user=admin -#sasl.jaas.config.password=galaxy2019 -sasl.jaas.config.password=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ - -#是否开启kafka用户认证配置,1:是;0:否 -sasl.jaas.config.flag=1 - -http.socket.timeout=90000 - -############################## Knowledge Base 配置 ###################################### -knowledge.execution.interval=30000 -knowledge.base.uri=http://192.168.44.12:9999 -knowledge.base.path=/v1/knowledge_base -ip.user.defined.kd.id=004390bc-3135-4a6f-a492-3662ecb9e289 -ip.builtin.kd.id=64af7077-eb9b-4b8f-80cf-2ceebc89bea9 - -############################## Bifang Server 配置 ###################################### -bifang.server.token=aa2bdec5518ad131f71944b13ce5c298&1& -#bifang服务访问地址 -bifang.server.uri=http://192.168.44.72 -#bifang.server.uri=http://192.168.44.3:80 - -#加密密码路径信息 -bifang.server.encryptpwd.path=/v1/user/encryptpwd - -#登录bifang服务路径信息 -bifang.server.login.path=/v1/user/login - -#获取vaysId路径信息 -bifang.server.policy.vaysid.path=/v1/admin/vsys - -#获取静态阈值路径信息 -bifang.server.policy.threshold.path=/v1/policy/profile/dos_detection - - -############################## 基线 配置 ###################################### -static.sensitivity.threshold=1 -#基线敏感阈值 -baseline.sensitivity.threshold=0.2 - -#基于baseline判定dos攻击的上下限 -baseline.sessions.minor.threshold=0.2 -baseline.sessions.warning.threshold=1 -baseline.sessions.major.threshold=2.5 -baseline.sessions.severe.threshold=5 -baseline.sessions.critical.threshold=8 - - - - diff --git a/src/main/resources/detection_dos_attack.properties b/src/main/resources/detection_dos_attack.properties new file mode 100644 index 0000000..e0f8191 --- /dev/null +++ b/src/main/resources/detection_dos_attack.properties @@ -0,0 +1,44 @@ +#kafka source +source.kafka.topic=DOS-SKETCH-RECORD +source.kafka.props.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 +source.kafka.props.group.id=dsadsadsad +source.kafka.props.session.timeout.ms=60000 +source.kafka.props.max.poll.records=5000 +source.kafka.props.max.partition.fetch.bytes=31457280 +source.kafka.props.security.protocol=SASL_PLAINTEXT +source.kafka.props.sasl.mechanism=PLAIN +source.kafka.props.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin"password="galaxy2019"; +#kafka sink +kafka.sink.event.topic.name=dos-test +kafka.sink.metric.topic=test +sink.kafka.props.bootstrap.servers=192.168.44.12:9092 +sink.kafka.props.acks=1 +sink.kafka.props.retries=0 +sink.kafka.props.linger.ms=10 +sink.kafka.props.request.timeout.ms=30000 +sink.kafka.props.batch.size=262144 +sink.kafka.props.buffer.memory=134217728 +sink.kafka.props.max.request.size=10485760 +sink.kafka.props.compression.type=none +#zookeeper地址 +hbase.zookeeper.quorum=192.168.44.12:2181 +flink.watermark.max.orderness=300 +#计算窗口大小,默认600s +flink.window.max.time=600 +#bifang服务访问地址 +bifang.server.uri=http://192.168.44.72 +knowledge.base.uri=http://192.168.44.67:9999 +############################## 阈值 配置 ###################################### +static.sensitivity.threshold=1 +#基线敏感阈值 +baseline.sensitivity.threshold=0.2 +#基于baseline判定dos攻击的上下限 +baseline.sessions.minor.threshold=0.2 +baseline.sessions.warning.threshold=1 +baseline.sessions.major.threshold=2.5 +baseline.sessions.severe.threshold=5 +baseline.sessions.critical.threshold=8 + + + + diff --git a/src/test/java/com/zdjizhi/Http/HttpTest.java b/src/test/java/com/zdjizhi/Http/HttpTest.java deleted file mode 100644 index 3e747fa..0000000 --- a/src/test/java/com/zdjizhi/Http/HttpTest.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.zdjizhi.Http; - -import com.alibaba.fastjson2.JSON; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.utils.HttpClientUtils; - -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.HashMap; - -public class HttpTest { - public static void main(String[] args) { - String token = HttpClientUtils.ERROR_MESSAGE; - try { - - String urlString = FlowWriteConfig.BIFANG_SERVER_URI+"/v1/user/encryptpwd"; - final HashMap parmsMap = new HashMap<>(); - parmsMap.put("username","admin"); - - final String jsonInputString = JSON.toJSONString(parmsMap); - System.out.println("URL:"+urlString); - System.out.println("parmsString:"+jsonInputString); - - - final URL url = new URL(urlString); - - final HttpURLConnection connection = (HttpURLConnection)url.openConnection(); - - connection.setRequestMethod("POST"); - - connection.setRequestProperty("Content-Type", "application/json"); - connection.setRequestProperty("Accept", "application/json"); - connection.setDoOutput(true); - OutputStream os = connection.getOutputStream(); - os.write(jsonInputString.getBytes()); - os.flush(); - os.close(); - - - int responseCode = connection.getResponseCode(); - System.out.println("Response Code: " + responseCode); - - - - } catch (Exception e) { - System.out.println("失败"); - } - } -} diff --git a/src/test/java/com/zdjizhi/common/HbaseTest.java b/src/test/java/com/zdjizhi/common/HbaseTest.java index 3467cc3..18678e1 100644 --- a/src/test/java/com/zdjizhi/common/HbaseTest.java +++ b/src/test/java/com/zdjizhi/common/HbaseTest.java @@ -19,12 +19,12 @@ public class HbaseTest { public static void main(String[] args) throws IOException { org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create(); - config.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM); - config.set("hbase.client.retries.number", "3"); - config.set("hbase.bulkload.retries.number", "3"); - config.set("zookeeper.recovery.retry", "3"); - config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, FlowWriteConfig.HBASE_CLIENT_OPERATION_TIMEOUT); - config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, FlowWriteConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); +// config.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_QUORUM); +// config.set("hbase.client.retries.number", "3"); +// config.set("hbase.bulkload.retries.number", "3"); +// config.set("zookeeper.recovery.retry", "3"); +// config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, FlowWriteConfig.HBASE_CLIENT_OPERATION_TIMEOUT); +// config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, FlowWriteConfig.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); TableName tableName = TableName.valueOf("dos_test"); Connection conn = ConnectionFactory.createConnection(config); diff --git a/src/test/java/com/zdjizhi/common/NacosTest.java b/src/test/java/com/zdjizhi/common/NacosTest.java index 0fe6cb2..00fe64b 100644 --- a/src/test/java/com/zdjizhi/common/NacosTest.java +++ b/src/test/java/com/zdjizhi/common/NacosTest.java @@ -55,7 +55,7 @@ public class NacosTest { String content = configService.getConfig(DATA_ID, GROUP, 5000); Properties nacosConfigMap = new Properties(); nacosConfigMap.load(new StringReader(content)); - System.out.println(FlowWriteConfig.STATIC_SENSITIVITY_THRESHOLD); +// System.out.println(FlowWriteConfig.STATIC_SENSITIVITY_THRESHOLD); } catch (Exception e) { e.printStackTrace(); } diff --git a/src/test/java/com/zdjizhi/etl/DosDetectionTest.java b/src/test/java/com/zdjizhi/etl/DosDetectionTest.java index 4428dc0..5634e4a 100644 --- a/src/test/java/com/zdjizhi/etl/DosDetectionTest.java +++ b/src/test/java/com/zdjizhi/etl/DosDetectionTest.java @@ -5,8 +5,6 @@ import com.zdjizhi.common.DosDetectionThreshold; import com.zdjizhi.common.DosEventLog; import com.zdjizhi.common.DosSketchLog; -import com.zdjizhi.utils.IpLookupUtils; -import com.zdjizhi.utils.SnowflakeId; import org.apache.commons.lang3.StringUtils; import org.junit.Test; @@ -174,10 +172,10 @@ public class DosDetectionTest { String[] ipArr = sourceIpList.split(","); HashSet countrySet = new HashSet<>(); for (String ip : ipArr) { - String country = IpLookupUtils.getCountryLookup(ip); - if (StringUtil.isNotBlank(country)){ - countrySet.add(country); - } + // String country = IpLookupUtils.getCountryLookup(ip); + // if (StringUtil.isNotBlank(country)){ + // countrySet.add(country); + // } } countryList = StringUtils.join(countrySet, ", "); return countryList;