diff --git a/pom.xml b/pom.xml
index 0510a9e..52bd475 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,10 @@
jdk.tools
jdk.tools
+
+ guava
+ com.google.guava
+
@@ -222,6 +226,25 @@
+
+
+
+
+
+
+
+
+ com.alibaba.nacos
+ nacos-client
+ 1.2.0
+
+
+
+ junit
+ junit
+ 4.12
+
+
diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java
index 325bc29..c47361b 100644
--- a/src/main/java/com/zdjizhi/common/CommonConfig.java
+++ b/src/main/java/com/zdjizhi/common/CommonConfig.java
@@ -1,6 +1,7 @@
package com.zdjizhi.common;
import com.zdjizhi.utils.CommonConfigurations;
+import com.zdjizhi.utils.NacosUtils;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
@@ -48,14 +49,14 @@ public class CommonConfig {
public static final String IP_MMDB_PATH = CommonConfigurations.getStringProperty("ip.mmdb.path");
- public static final int STATIC_SENSITIVITY_THRESHOLD = CommonConfigurations.getIntProperty("static.sensitivity.threshold");
- public static final double BASELINE_SENSITIVITY_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sensitivity.threshold");
+ public static final int STATIC_SENSITIVITY_THRESHOLD = NacosUtils.getIntProperty("static.sensitivity.threshold");
+ public static final double BASELINE_SENSITIVITY_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sensitivity.threshold");
- public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.minor.threshold");
- public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.warning.threshold");
- public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.major.threshold");
- public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.severe.threshold");
- public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.critical.threshold");
+ public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.minor.threshold");
+ public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold");
+ public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.major.threshold");
+ public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold");
+ public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold");
public static final String BIFANG_SERVER_URI = CommonConfigurations.getStringProperty("bifang.server.uri");
public static final String BIFANG_SERVER_TOKEN = CommonConfigurations.getStringProperty("bifang.server.token");
@@ -82,7 +83,8 @@ public class CommonConfig {
// 配置加密解密的密码/salt值
encryptor.setPassword("galaxy");
// 对"raw_password"进行加密:S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p
- String password = "galaxy2019";
+// String password = "galaxy2019";
+ String password = "nacos";
String encPwd = encryptor.encrypt(password);
System.out.println(encPwd);
// 再进行解密:raw_password
diff --git a/src/main/java/com/zdjizhi/utils/NacosUtils.java b/src/main/java/com/zdjizhi/utils/NacosUtils.java
new file mode 100644
index 0000000..5dd753c
--- /dev/null
+++ b/src/main/java/com/zdjizhi/utils/NacosUtils.java
@@ -0,0 +1,89 @@
+package com.zdjizhi.utils;
+
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.listener.Listener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+public class NacosUtils {
+ private static final Logger logger = LoggerFactory.getLogger(NacosUtils.class);
+ private static Properties nacosProperties = new Properties();
+ private static Properties commonProperties = new Properties();
+
+
+ private static final String NACOS_SERVER_ADDR = CommonConfigurations.getStringProperty("nacos.server.addr");
+ private static final String NACOS_NAMESPACE = CommonConfigurations.getStringProperty("nacos.namespace");
+ private static final String NACOS_USERNAME = CommonConfigurations.getStringProperty("nacos.username");
+ private static final String NACOS_PASSWORD = CommonConfigurations.getStringProperty("nacos.password");
+ private static final String NACOS_DATA_ID = CommonConfigurations.getStringProperty("nacos.data.id");
+ private static final String NACOS_GROUP = CommonConfigurations.getStringProperty("nacos.group");
+ private static final long NACOS_READ_TIMEOUT = CommonConfigurations.getLongProperty("nacos.read.timeout");
+
+ static {
+ createConfigService();
+ }
+
+ private static void getProperties() {
+ nacosProperties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_SERVER_ADDR);
+ nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, NACOS_NAMESPACE);
+ nacosProperties.setProperty(PropertyKeyConst.USERNAME, NACOS_USERNAME);
+ nacosProperties.setProperty(PropertyKeyConst.PASSWORD, NACOS_PASSWORD);
+ }
+
+ private static void createConfigService() {
+ try {
+ getProperties();
+ ConfigService configService = NacosFactory.createConfigService(nacosProperties);
+ String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
+ commonProperties.load(new StringReader(config));
+
+ configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void receiveConfigInfo(String configMsg) {
+ try {
+ commonProperties.load(new StringReader(configMsg));
+ } catch (IOException e) {
+ logger.error("监听nacos配置失败", e);
+ }
+ System.out.println(configMsg);
+ }
+ });
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error("获取nacos配置失败", e);
+ }
+ }
+
+ public static String getStringProperty(String key) {
+ return commonProperties.getProperty(key);
+ }
+
+ public static Integer getIntProperty(String key) {
+ return Integer.parseInt(commonProperties.getProperty(key));
+ }
+
+ public static Double getDoubleProperty(String key) {
+ return Double.parseDouble(commonProperties.getProperty(key));
+ }
+
+ public static Long getLongProperty(String key) {
+ return Long.parseLong(commonProperties.getProperty(key));
+ }
+
+ public static Boolean getBooleanProperty(String key) {
+ return "true".equals(commonProperties.getProperty(key).toLowerCase().trim());
+ }
+
+}
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index 1331475..16d0fec 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -15,25 +15,23 @@ kafka.input.topic.name=DOS-SKETCH-RECORD
kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#读取kafka group id
-kafka.input.group.id=2112080949
+kafka.input.group.id=2203241552
#kafka.input.group.id=dos-detection-job-210813-1
#发送kafka metrics并行度大小
kafka.output.metric.parallelism=1
#发送kafka metrics topic名
-#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
-kafka.output.metric.topic.name=test
+kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
#发送kafka event并行度大小
kafka.output.event.parallelism=1
#发送kafka event topic名
-#kafka.output.event.topic.name=DOS-EVENT
-kafka.output.event.topic.name=storm-dos-test
+kafka.output.event.topic.name=DOS-EVENT
#kafka输出地址
-kafka.output.bootstrap.servers=192.168.44.12:9094
+kafka.output.bootstrap.servers=192.168.40.223:9094
#kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
#zookeeper地址
@@ -132,8 +130,16 @@ baseline.threshold.schedule.days=1
#kafka用户认证配置参数
sasl.jaas.config.user=admin
#sasl.jaas.config.password=galaxy2019
-#sasl.jaas.config.password=ENC(6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ)
sasl.jaas.config.password=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
#是否开启kafka用户认证配置,1:是;0:否
-sasl.jaas.config.flag=1
\ No newline at end of file
+sasl.jaas.config.flag=1
+
+#nacos配置
+nacos.server.addr=192.168.44.12:8848
+nacos.namespace=flink
+nacos.username=nacos
+nacos.password=nacos
+nacos.data.id=dos_baseline.properties
+nacos.group=Galaxy
+nacos.read.timeout=5000
\ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/common/IpTest.java b/src/test/java/com/zdjizhi/common/IpTest.java
index 4463f84..219bdb2 100644
--- a/src/test/java/com/zdjizhi/common/IpTest.java
+++ b/src/test/java/com/zdjizhi/common/IpTest.java
@@ -41,14 +41,18 @@ public class IpTest {
IPAddress pv43 = new IPAddressString("fc00::").getAddress();
IPAddress pv44 = new IPAddressString("fc00::10:1").getAddress();
- IPAddress pv45 = new IPAddressString("192.168.42.1/32").getAddress();
+ IPAddress pv45 = new IPAddressString("192.168.42.1").getAddress();
IPAddress pv46 = new IPAddressString("192.168.42.1/32").getAddress();
IPAddress pv47 = new IPAddressString("12.56.4.0").getAddress();
+
+ IPAddress mask = pv45.getNetwork().getNetworkMask(24, false);
+
System.out.println(pv45.isMultiple());
System.out.println(pv46.isMultiple());
System.out.println(pv46.isPrefixed());
System.out.println(pv47.isPrefixed());
System.out.println(pv45+"---"+pv45.toMaxHost().withoutPrefixLength()+"---"+pv45.adjustPrefixLength(pv45.getBitCount()));
+ System.out.println(pv45+"---mask:"+pv45.mask(mask).toString());
System.out.println(pv45.adjustPrefixLength(pv45.getBitCount())+"---"+pv45.toMaxHost().withoutPrefixLength());
diff --git a/src/test/java/com/zdjizhi/common/NacosTest.java b/src/test/java/com/zdjizhi/common/NacosTest.java
new file mode 100644
index 0000000..e7aa00b
--- /dev/null
+++ b/src/test/java/com/zdjizhi/common/NacosTest.java
@@ -0,0 +1,101 @@
+package com.zdjizhi.common;
+
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.listener.Listener;
+import com.alibaba.nacos.api.exception.NacosException;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+
+/**
+ * @author qidaijie
+ * @Package com.zdjizhi
+ * @Description:
+ * @date 2022/3/1016:58
+ */
+public class NacosTest {
+
+ /**
+ *
+ * com.alibaba.nacos
+ * nacos-client
+ * 1.2.0
+ *
+ */
+
+ private static Properties properties = new Properties();
+ /**
+ * config data id = config name
+ */
+ private static final String DATA_ID = "dos_baseline.properties";
+ /**
+ * config group
+ */
+ private static final String GROUP = "Galaxy";
+
+ private void getProperties() {
+ properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
+ properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
+ properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
+ properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
+ }
+
+
+ @Test
+ public void GetConfigurationTest() {
+ try {
+ getProperties();
+ ConfigService configService = NacosFactory.createConfigService(properties);
+ String content = configService.getConfig(DATA_ID, GROUP, 5000);
+ Properties nacosConfigMap = new Properties();
+ nacosConfigMap.load(new StringReader(content));
+ System.out.println(nacosConfigMap.getProperty("static.sensitivity.threshold"));
+ } catch (NacosException | IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ @Test
+ public void ListenerConfigurationTest() {
+ getProperties();
+ try {
+ //first get config
+ ConfigService configService = NacosFactory.createConfigService(properties);
+ String config = configService.getConfig(DATA_ID, GROUP, 5000);
+// System.out.println(config);
+
+ //start listenner
+ configService.addListener(DATA_ID, GROUP, new Listener() {
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void receiveConfigInfo(String configMsg) {
+ System.out.println(configMsg);
+ }
+ });
+ } catch (NacosException e) {
+ e.printStackTrace();
+ }
+
+ //keep running,change nacos config,print new config
+ /*
+ while (true) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ */
+ }
+}