11 Commits
tsg-v11 ... dev

Author SHA1 Message Date
unknown
84a1e6879a 修复动态获取nacos配置 2022-06-28 11:15:49 +08:00
unknown
ab8f6aba81 修复动态获取nacos配置 2022-06-28 11:14:54 +08:00
unknown
94e8fb807a 集成nacos,修复依赖冲突,启动报错 2022-06-22 15:58:49 +08:00
wanglihui
cead1d4d99 Merge branch 'tsg-22.06' of git.mesalab.cn:bigdata/tsg/flink-dos-detection 2022-06-20 18:15:24 +08:00
徐鹏飞
2d98c3b6e6 修改获取静态阈值逻辑,根据默认指定vsysID获取 2022-06-13 18:00:57 +08:00
徐鹏飞
3dc29a07be 新增根据vsysId从bifang获取静态阈值 2022-05-31 18:00:36 +08:00
wanglihui
1fcdb79739 DoS事件日志对Conditions基于速率检测属性值修正 2022-05-17 13:54:43 +08:00
wanglihui
3d974217d9 集成nacos,将部分配置放到nacos中管理。 2022-04-08 10:16:29 +08:00
wanglihui
db17064f73 更新工具库到1.0.8适配新版MMDB定位库 2022-02-11 16:06:24 +08:00
wanglihui
065e5abb09 增加jasypt配置加密方式 2021-12-20 13:50:18 +08:00
wanglihui
75bbdd2962 更新DoS检测程序,新增读取baseline TTL配置。
修复DoS检测-Conditions阈值描述语言逻辑问题。
2021-12-08 13:51:09 +08:00
13 changed files with 495 additions and 81 deletions

58
pom.xml
View File

@@ -102,6 +102,13 @@
<dependencies>
<!-- https://mvnrepository.com/artifact/org.jasypt/jasypt -->
<dependency>
<groupId>org.jasypt</groupId>
<artifactId>jasypt</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -141,6 +148,10 @@
<artifactId>jdk.tools</artifactId>
<groupId>jdk.tools</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
@@ -199,7 +210,7 @@
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.0.6</version>
<version>1.0.8</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@@ -215,6 +226,51 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>1.2.0</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.11</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.10</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
</dependencies>

View File

@@ -1,6 +1,8 @@
package com.zdjizhi.common;
import com.zdjizhi.utils.CommonConfigurations;
import com.zdjizhi.utils.NacosUtils;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
/**
* @author wlh
@@ -8,6 +10,12 @@ import com.zdjizhi.utils.CommonConfigurations;
*/
public class CommonConfig {
private static StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
static {
encryptor.setPassword("galaxy");
}
public static final int STREAM_EXECUTION_ENVIRONMENT_PARALLELISM = CommonConfigurations.getIntProperty("stream.execution.environment.parallelism");
public static final String STREAM_EXECUTION_JOB_NAME = CommonConfigurations.getStringProperty("stream.execution.job.name");
@@ -28,6 +36,7 @@ public class CommonConfig {
public static final String HBASE_BASELINE_TABLE_NAME = CommonConfigurations.getStringProperty("hbase.baseline.table.name");
public static final int HBASE_BASELINE_TOTAL_NUM = CommonConfigurations.getIntProperty("hbase.baseline.total.num");
public static final int HBASE_BASELINE_TTL = CommonConfigurations.getIntProperty("hbase.baseline.ttl");
public static final int FLINK_FIRST_AGG_PARALLELISM = CommonConfigurations.getIntProperty("flink.first.agg.parallelism");
public static final int FLINK_DETECTION_MAP_PARALLELISM = CommonConfigurations.getIntProperty("flink.detection.map.parallelism");
@@ -40,14 +49,14 @@ public class CommonConfig {
public static final String IP_MMDB_PATH = CommonConfigurations.getStringProperty("ip.mmdb.path");
public static final int STATIC_SENSITIVITY_THRESHOLD = CommonConfigurations.getIntProperty("static.sensitivity.threshold");
public static final double BASELINE_SENSITIVITY_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sensitivity.threshold");
public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.minor.threshold");
public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.warning.threshold");
public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.major.threshold");
public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.severe.threshold");
public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.critical.threshold");
// public static final int STATIC_SENSITIVITY_THRESHOLD = NacosUtils.getIntProperty("static.sensitivity.threshold");
// public static final double BASELINE_SENSITIVITY_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sensitivity.threshold");
//
// public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.minor.threshold");
// public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold");
// public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.major.threshold");
// public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold");
// public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold");
public static final String BIFANG_SERVER_URI = CommonConfigurations.getStringProperty("bifang.server.uri");
public static final String BIFANG_SERVER_TOKEN = CommonConfigurations.getStringProperty("bifang.server.token");
@@ -55,6 +64,8 @@ public class CommonConfig {
public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path");
public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path");
public static final String BIFANG_SERVER_POLICY_VSYSID_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.vaysid.path");
public static final int HTTP_POOL_MAX_CONNECTION = CommonConfigurations.getIntProperty("http.pool.max.connection");
public static final int HTTP_POOL_MAX_PER_ROUTE = CommonConfigurations.getIntProperty("http.pool.max.per.route");
public static final int HTTP_POOL_REQUEST_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.request.timeout");
@@ -65,7 +76,21 @@ public class CommonConfig {
public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = CommonConfigurations.getIntProperty("baseline.threshold.schedule.days");
public static final String SASL_JAAS_CONFIG_USER = CommonConfigurations.getStringProperty("sasl.jaas.config.user");
public static final String SASL_JAAS_CONFIG_PASSWORD = CommonConfigurations.getStringProperty("sasl.jaas.config.password");
public static final String SASL_JAAS_CONFIG_PASSWORD = encryptor.decrypt(CommonConfigurations.getStringProperty("sasl.jaas.config.password"));
public static final int SASL_JAAS_CONFIG_FLAG = CommonConfigurations.getIntProperty("sasl.jaas.config.flag");
public static void main(String[] args) {
StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();
// 配置加密解密的密码/salt值
encryptor.setPassword("galaxy");
// 对"raw_password"进行加密S5kR+Y7CI8k7MaecZpde25yK8NKUnd6p
// String password = "galaxy2019";
String password = "nacos";
String encPwd = encryptor.encrypt(password);
System.out.println(encPwd);
// 再进行解密raw_password
String rawPwd = encryptor.decrypt(encPwd);
System.out.println(rawPwd);
}
}

View File

@@ -0,0 +1,22 @@
package com.zdjizhi.common;
import java.util.Objects;
public class DosVsysId {
private int vsysId;
public int getVsysId() {
return vsysId;
}
public void setVsysId(int vsysId) {
this.vsysId = vsysId;
}
@Override
public String toString() {
return "DosVsysId{" +
"vsysId=" + vsysId +
'}';
}
}

View File

@@ -35,6 +35,10 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
private final static int BASELINE_CONDITION_TYPE = 2;
private final static int SENSITIVITY_CONDITION_TYPE = 3;
private final static String SESSIONS_TAG = "sessions";
private final static String PACKETS_TAG = "packets";
private final static String BITS_TAG = "bits";
private final static int OTHER_BASELINE_TYPE = 3;
@Override
@@ -81,8 +85,9 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
private DosEventLog getDosEventLogBySensitivityThreshold(DosSketchLog value) {
DosEventLog result = null;
long sketchSessions = value.getSketch_sessions();
if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD) {
result = getDosEventLog(value, CommonConfig.STATIC_SENSITIVITY_THRESHOLD, sketchSessions - CommonConfig.STATIC_SENSITIVITY_THRESHOLD, 3, "sessions");
if (sketchSessions > NacosUtils.getIntProperty("static.sensitivity.threshold")) {
long diff = sketchSessions - NacosUtils.getIntProperty("static.sensitivity.threshold");
result = getDosEventLog(value, NacosUtils.getIntProperty("static.sensitivity.threshold"), diff, SENSITIVITY_CONDITION_TYPE, SESSIONS_TAG);
result.setSeverity(Severity.MAJOR.severity);
}
return result;
@@ -93,10 +98,11 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
long sketchSessions = value.getSketch_sessions();
if (sketchSessions > CommonConfig.STATIC_SENSITIVITY_THRESHOLD) {
if (sketchSessions > NacosUtils.getIntProperty("static.sensitivity.threshold")) {
DosBaselineThreshold dosBaselineThreshold = baselineMap.get(destinationIp).get(attackType);
Integer base = getBaseValue(dosBaselineThreshold, value);
result = getDosEventLog(value, base, sketchSessions - base, 2, "sessions");
long diff = sketchSessions - base;
result = getDosEventLog(value, base, diff, BASELINE_CONDITION_TYPE, SESSIONS_TAG);
}
return result;
}
@@ -104,15 +110,15 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
private DosEventLog getDosEventLogByStaticThreshold(DosSketchLog value, DosDetectionThreshold threshold) {
long base = threshold.getSessionsPerSec();
long diff = value.getSketch_sessions() - base;
DosEventLog result = getDosEventLog(value, base, diff, 1, "sessions");
DosEventLog result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, SESSIONS_TAG);
if (result == null) {
base = threshold.getPacketsPerSec();
diff = value.getSketch_packets() - base;
result = getDosEventLog(value, base, diff, 1, "packets");
result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, PACKETS_TAG);
if (result == null) {
base = threshold.getBitsPerSec();
diff = value.getSketch_bytes() - base;
result = getDosEventLog(value, base, diff, 1, "bits");
result = getDosEventLog(value, base, diff, STATIC_CONDITION_TYPE, BITS_TAG);
}
}
return result;
@@ -126,10 +132,10 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
double percent = getDiffPercent(diff, base);
Severity severity = judgeSeverity(percent);
if (severity != Severity.NORMAL) {
if (type == BASELINE_CONDITION_TYPE && percent < CommonConfig.BASELINE_SENSITIVITY_THRESHOLD) {
if (type == BASELINE_CONDITION_TYPE && percent < NacosUtils.getDoubleProperty("baseline.sensitivity.threshold")) {
logger.debug("当前server IP:{},类型:{},基线值{}百分比{}未超过基线敏感阈值,日志详情\n{}", destinationIp, attackType, base, percent, value);
} else {
result = getResult(value, base, severity, percent, type, tag);
result = getResult(value, base, severity, percent+1, type, tag);
logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,基于{}:{}检测,日志详情\n {}", destinationIp,attackType,base,percent,type,tag,result);
}
} else {
@@ -172,8 +178,8 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
logger.debug("获取到当前IP: {},类型: {} baseline值为0,替换为P95观测值{}", value.getDestination_ip(), value.getAttack_type(), defaultVaule);
base = defaultVaule;
}
if (sessionRateBaselineType == OTHER_BASELINE_TYPE && base < CommonConfig.STATIC_SENSITIVITY_THRESHOLD){
base = CommonConfig.STATIC_SENSITIVITY_THRESHOLD;
if (sessionRateBaselineType == OTHER_BASELINE_TYPE && base < NacosUtils.getIntProperty("static.sensitivity.threshold")){
base = NacosUtils.getIntProperty("static.sensitivity.threshold");
}
}
}
@@ -187,7 +193,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
switch (type) {
case STATIC_CONDITION_TYPE:
return new StrBuilder()
.append(tag).append(" > ")
.append("Rate > ")
.append(base).append(" ")
.append(tag).append("/s")
.toString();
@@ -214,9 +220,12 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
String[] ipArr = sourceIpList.split(",");
HashSet<String> countrySet = new HashSet<>();
for (String ip : ipArr) {
countrySet.add(IpUtils.ipLookup.countryLookup(ip));
String country = IpUtils.ipLookup.countryLookup(ip);
if (StringUtil.isNotBlank(country)){
countrySet.add(country);
}
}
countryList = StringUtils.join(countrySet, ",");
countryList = StringUtils.join(countrySet, ", ");
return countryList;
} catch (Exception e) {
logger.error("{} source IP lists 获取国家失败", sourceIpList, e);
@@ -240,13 +249,11 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
}
public static void main(String[] args) {
Date date = new Date(1631548860 * 1000L);
System.out.println(date);
Date p1D = DateUtils.getTimeFloor(date, "P1D");
System.out.println(p1D + " " + p1D.getTime() / 1000);
System.out.println(new DosDetection().getCurrentTimeIndex(1634659080));
System.out.println(new DosDetection().getConditions(PERCENT_INSTANCE.format(1.64862), 100, 100, 3, "packets"));
System.out.println(10 + 10 * 0.2);
// System.out.println(new DosDetection().getSourceCountryList("192.0.2.3,138.199.14.31,255.255.255.255,121.14.89.209," +
// "23.200.74.224,161.117.68.253"));
// DosDetection dosDetection = new DosDetection();
// System.out.println(dosDetection.judgeSeverity(dosDetection.getDiffPercent(499, 1000)));
}
private Double getDiffPercent(long diff, long base) {
@@ -254,15 +261,15 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
}
private Severity judgeSeverity(double diffPercent) {
if (diffPercent >= CommonConfig.BASELINE_SESSIONS_MINOR_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_WARNING_THRESHOLD) {
if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.minor.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold")) {
return Severity.MINOR;
} else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_WARNING_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_MAJOR_THRESHOLD) {
} else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.warning.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.major.threshold")) {
return Severity.WARNING;
} else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_MAJOR_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_SEVERE_THRESHOLD) {
} else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.major.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold")) {
return Severity.MAJOR;
} else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_SEVERE_THRESHOLD && diffPercent < CommonConfig.BASELINE_SESSIONS_CRITICAL_THRESHOLD) {
} else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.severe.threshold") && diffPercent < NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold")) {
return Severity.SEVERE;
} else if (diffPercent >= CommonConfig.BASELINE_SESSIONS_CRITICAL_THRESHOLD) {
} else if (diffPercent >= NacosUtils.getDoubleProperty("baseline.sessions.critical.threshold")) {
return Severity.CRITICAL;
} else {
return Severity.NORMAL;

View File

@@ -2,6 +2,7 @@ package com.zdjizhi.etl;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosBaselineThreshold;
import com.zdjizhi.utils.DateUtils;
import com.zdjizhi.utils.HbaseUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -12,10 +13,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.*;
public class ParseBaselineThreshold {
@@ -45,7 +43,11 @@ public class ParseBaselineThreshold {
TableName tableName = TableName.valueOf(CommonConfig.HBASE_BASELINE_TABLE_NAME);
Connection conn = ConnectionFactory.createConnection(config);
table = conn.getTable(tableName);
scan = new Scan().setAllowPartialResults(true).setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
long currentTimeMillis = System.currentTimeMillis();
scan = new Scan()
.setAllowPartialResults(true)
.setTimeRange(DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(CommonConfig.HBASE_BASELINE_TTL)).getTime(), currentTimeMillis)
.setLimit(CommonConfig.HBASE_BASELINE_TOTAL_NUM);
logger.info("连接hbase成功正在读取baseline数据");
}
@@ -81,6 +83,13 @@ public class ParseBaselineThreshold {
}
public static void main(String[] args) {
long currentTimeMillis = System.currentTimeMillis();
long p200D = DateUtils.getSomeDate(new Date(currentTimeMillis), Math.negateExact(CommonConfig.HBASE_BASELINE_TTL)).getTime();
System.out.println(p200D);
System.out.println(currentTimeMillis);
System.out.println(currentTimeMillis - p200D);
Map<String, Map<String, DosBaselineThreshold>> baselineMap = readFromHbase();
Set<String> keySet = baselineMap.keySet();
for (String key : keySet) {

View File

@@ -3,6 +3,7 @@ package com.zdjizhi.etl;
import com.fasterxml.jackson.databind.JavaType;
import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.DosDetectionThreshold;
import com.zdjizhi.common.DosVsysId;
import com.zdjizhi.utils.HttpClientUtils;
import com.zdjizhi.utils.JsonMapper;
import inet.ipaddr.IPAddress;
@@ -29,6 +30,7 @@ public class ParseStaticThreshold {
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class);
private static JavaType vsysIDType = jsonMapperInstance.createCollectionType(ArrayList.class, DosVsysId.class);
static {
//加载加密登录密码
@@ -99,19 +101,18 @@ public class ParseStaticThreshold {
}
/**
* 获取静态阈值配置列表
* 获取vsysId配置列表
*
* @return thresholds
* @return vsysIdList
*/
private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() {
ArrayList<DosDetectionThreshold> thresholds = null;
private static ArrayList<DosVsysId> getVsysId() {
ArrayList<DosVsysId> vsysIdList = null;
try {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("pageSize", -1);
parms.put("orderBy", "profileId asc");
parms.put("isValid", 1);
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
parms.put("orderBy", "vsysId desc");
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms);
String token = CommonConfig.BIFANG_SERVER_TOKEN;
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
BasicHeader authorization = new BasicHeader("Authorization", token);
@@ -125,19 +126,70 @@ public class ParseStaticThreshold {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
Object list = data.get("list");
if (list != null) {
thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
logger.info("获取到静态阈值配置{}条", thresholds.size());
vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType);
logger.info("获取到vsysId{}条", vsysIdList.size());
} else {
logger.warn("静态阈值配置为空");
logger.warn("vsysIdList为空");
}
} else {
logger.error(msg);
}
}
}
} catch (Exception e) {
logger.error("获取vsysId失败,请检查bifang服务或登录配置信息 ", e);
}
return vsysIdList;
}
/**
* 根据vsysId获取静态阈值配置列表
*
* @return thresholds
*/
private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() {
ArrayList<DosDetectionThreshold> thresholds = null;
// ArrayList<DosVsysId> vsysId = getVsysId();
try {
// if (vsysId != null){
// for (DosVsysId dosVsysId : vsysId) {
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
HashMap<String, Object> parms = new HashMap<>();
parms.put("pageSize", -1);
parms.put("orderBy", "profileId asc");
parms.put("isValid", 1);
// parms.put("vsysId", dosVsysId.getVsysId());
parms.put("vsysId", 1);
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
String token = CommonConfig.BIFANG_SERVER_TOKEN;
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
BasicHeader authorization = new BasicHeader("Authorization", token);
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
boolean success = (boolean) resposeMap.get("success");
String msg = resposeMap.get("msg").toString();
if (success) {
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
Object list = data.get("list");
if (list != null) {
thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
logger.info("获取到静态阈值配置{}条", thresholds.size());
} else {
logger.warn("静态阈值配置为空");
}
} else {
logger.error(msg);
}
}
}
// }
// }
} catch (Exception e) {
logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e);
}
return thresholds;
}
@@ -196,7 +248,6 @@ public class ParseStaticThreshold {
}
public static void main(String[] args) {
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
dosDetectionThreshold.forEach(System.out::println);
@@ -214,7 +265,8 @@ public class ParseStaticThreshold {
}
System.out.println("------------------------");
}
// String s = loginBifangServer();
// System.out.println(s);
}

View File

@@ -1,6 +1,8 @@
package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -12,6 +14,33 @@ public class FlinkEnvironmentUtils {
static {
streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
/*
// 每 1000ms 开始一次 checkpoint
streamExeEnv.enableCheckpointing(CommonConfig.FLINK_WINDOW_MAX_TIME * 1000);
// 设置模式为精确一次 (这是默认值)
streamExeEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确认 checkpoints 之间的时间会进行 500 ms
streamExeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
streamExeEnv.getCheckpointConfig().setCheckpointTimeout(60000);
// 允许两个连续的 checkpoint 错误
streamExeEnv.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
// 同一时间只允许一个 checkpoint 进行
streamExeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 使用 externalized checkpoints这样 checkpoint 在作业取消后仍就会被保留
streamExeEnv.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 开启实验性的 unaligned checkpoints
streamExeEnv.getCheckpointConfig().enableUnalignedCheckpoints();
*/
}
}

View File

@@ -7,20 +7,20 @@ public class IpUtils {
/**
* IP定位库工具类
*/
public static IpLookup ipLookup = new IpLookup.Builder(false)
.loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4.mmdb")
.loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6.mmdb")
.loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_private_v4.mmdb")
.loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_private_v6.mmdb")
public static IpLookupV2 ipLookup = new IpLookupV2.Builder(false)
.loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4_built_in.mmdb")
.loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6_built_in.mmdb")
.loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_v4_user_defined.mmdb")
.loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_v6_user_defined.mmdb")
.build();
public static void main(String[] args) {
System.out.println(ipLookup.countryLookup("49.7.115.37"));
// String ips = "192.168.50.23,192.168.50.45,192.168.56.9,192.168.56.8,192.168.50.58,192.168.56.7,192.168.56.6,192.168.50.40,192.168.50.19,192.168.50.6,192.168.50.4,192.168.56.17,192.168.50.27,192.168.50.26,192.168.50.18,192.168.56.3,192.168.56.10";
// for (String ip:ips.split(",")){
// System.out.println(ip+"--"+ipLookup.countryLookup(ip));
// }
String ips = "182.168.50.23,182.168.50.45,182.168.56.9,182.168.56.8,92.168.50.58,19.168.56.7,12.168.56.6,2.168.50.40,1.168.50.19,9.168.50.6,2.168.50.4,192.168.56.17,192.168.50.27,192.168.50.26,192.168.50.18,192.168.56.3,192.168.56.10";
for (String ip:ips.split(",")){
System.out.println(ip+"--"+ipLookup.countryLookup(ip));
}
}

View File

@@ -22,12 +22,14 @@ public class KafkaUtils {
}
public static FlinkKafkaProducer<String> getKafkaSink(String topic){
return new FlinkKafkaProducer<String>(
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
topic,
new SimpleStringSchema(),
getKafkaSinkProperty(),
Optional.empty()
);
kafkaProducer.setLogFailuresOnly(true);
return kafkaProducer;
}
}

View File

@@ -0,0 +1,90 @@
package com.zdjizhi.utils;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.concurrent.Executor;
public class NacosUtils {
private static final Logger logger = LoggerFactory.getLogger(NacosUtils.class);
private static Properties nacosProperties = new Properties();
private static Properties commonProperties = new Properties();
private static final String NACOS_SERVER_ADDR = CommonConfigurations.getStringProperty("nacos.server.addr");
private static final String NACOS_NAMESPACE = CommonConfigurations.getStringProperty("nacos.namespace");
private static final String NACOS_USERNAME = CommonConfigurations.getStringProperty("nacos.username");
private static final String NACOS_PASSWORD = CommonConfigurations.getStringProperty("nacos.password");
private static final String NACOS_DATA_ID = CommonConfigurations.getStringProperty("nacos.data.id");
private static final String NACOS_GROUP = CommonConfigurations.getStringProperty("nacos.group");
private static final long NACOS_READ_TIMEOUT = CommonConfigurations.getLongProperty("nacos.read.timeout");
static {
createConfigService();
}
private static void getProperties() {
nacosProperties.setProperty(PropertyKeyConst.SERVER_ADDR, NACOS_SERVER_ADDR);
nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, NACOS_NAMESPACE);
nacosProperties.setProperty(PropertyKeyConst.USERNAME, NACOS_USERNAME);
nacosProperties.setProperty(PropertyKeyConst.PASSWORD, NACOS_PASSWORD);
}
private static void createConfigService() {
try {
getProperties();
ConfigService configService = NacosFactory.createConfigService(nacosProperties);
String config = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_READ_TIMEOUT);
commonProperties.load(new StringReader(config));
configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configMsg) {
try {
commonProperties.clear();
commonProperties.load(new StringReader(configMsg));
} catch (IOException e) {
logger.error("监听nacos配置失败", e);
}
System.out.println(configMsg);
}
});
} catch (Exception e) {
e.printStackTrace();
logger.error("获取nacos配置失败", e);
}
}
public static String getStringProperty(String key) {
return commonProperties.getProperty(key);
}
public static Integer getIntProperty(String key) {
return Integer.parseInt(commonProperties.getProperty(key));
}
public static Double getDoubleProperty(String key) {
return Double.parseDouble(commonProperties.getProperty(key));
}
public static Long getLongProperty(String key) {
return Long.parseLong(commonProperties.getProperty(key));
}
public static Boolean getBooleanProperty(String key) {
return "true".equals(commonProperties.getProperty(key).toLowerCase().trim());
}
}

View File

@@ -15,7 +15,7 @@ kafka.input.topic.name=DOS-SKETCH-RECORD
kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#读取kafka group id
kafka.input.group.id=2109160928
kafka.input.group.id=2112080949
#kafka.input.group.id=dos-detection-job-210813-1
#发送kafka metrics并行度大小
@@ -37,8 +37,8 @@ kafka.output.bootstrap.servers=192.168.44.12:9094
#kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
#zookeeper地址
#hbase.zookeeper.quorum=192.168.44.12:2181
hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
hbase.zookeeper.quorum=192.168.44.12:2181
#hbase.zookeeper.quorum=192.168.44.11:2181,192.168.44.14:2181,192.168.44.15:2181
#hbase客户端处理时间
hbase.client.operation.timeout=30000
@@ -50,6 +50,9 @@ hbase.baseline.table.name=dos:ddos_traffic_baselines
#读取baseline限制
hbase.baseline.total.num=1000000
#baseline ttl单位
hbase.baseline.ttl=30
#设置聚合并行度2个key
flink.first.agg.parallelism=1
@@ -75,18 +78,18 @@ ip.mmdb.path=D:\\data\\dat\\
#ip.mmdb.path=/home/bigdata/topology/dat/
#ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/
#静态敏感阈值,速率小于此值不报警
static.sensitivity.threshold=500
#基线敏感阈值
baseline.sensitivity.threshold=0.2
#基于baseline判定dos攻击的上下限
baseline.sessions.minor.threshold=0.5
baseline.sessions.warning.threshold=1
baseline.sessions.major.threshold=2.5
baseline.sessions.severe.threshold=5
baseline.sessions.critical.threshold=8
##静态敏感阈值,速率小于此值不报警
#static.sensitivity.threshold=500
#
##基线敏感阈值
#baseline.sensitivity.threshold=0.2
#
##基于baseline判定dos攻击的上下限
#baseline.sessions.minor.threshold=0.5
#baseline.sessions.warning.threshold=1
#baseline.sessions.major.threshold=2.5
#baseline.sessions.severe.threshold=5
#baseline.sessions.critical.threshold=8
#bifang服务访问地址
bifang.server.uri=http://192.168.44.72:80
@@ -101,6 +104,9 @@ bifang.server.encryptpwd.path=/v1/user/encryptpwd
#登录bifang服务路径信息
bifang.server.login.path=/v1/user/login
#获取vaysId路径信息
bifang.server.policy.vaysid.path=/v1/system/vsys/
#获取静态阈值路径信息
bifang.server.policy.threshold.path=/v1/policy/profile/DoS/detection/threshold
@@ -128,7 +134,18 @@ baseline.threshold.schedule.days=1
#kafka用户认证配置参数
sasl.jaas.config.user=admin
sasl.jaas.config.password=galaxy2019
#sasl.jaas.config.password=galaxy2019
#sasl.jaas.config.password=ENC(6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ)
sasl.jaas.config.password=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
#是否开启kafka用户认证配置10
sasl.jaas.config.flag=1
sasl.jaas.config.flag=1
#nacos配置
nacos.server.addr=192.168.40.42:8848
nacos.namespace=prod
nacos.username=nacos
nacos.password=nacos
nacos.data.id=dos_detection.properties
nacos.group=Galaxy
nacos.read.timeout=5000

View File

@@ -41,14 +41,18 @@ public class IpTest {
IPAddress pv43 = new IPAddressString("fc00::").getAddress();
IPAddress pv44 = new IPAddressString("fc00::10:1").getAddress();
IPAddress pv45 = new IPAddressString("192.168.42.1/32").getAddress();
IPAddress pv45 = new IPAddressString("192.168.42.1").getAddress();
IPAddress pv46 = new IPAddressString("192.168.42.1/32").getAddress();
IPAddress pv47 = new IPAddressString("12.56.4.0").getAddress();
IPAddress mask = pv45.getNetwork().getNetworkMask(24, false);
System.out.println(pv45.isMultiple());
System.out.println(pv46.isMultiple());
System.out.println(pv46.isPrefixed());
System.out.println(pv47.isPrefixed());
System.out.println(pv45+"---"+pv45.toMaxHost().withoutPrefixLength()+"---"+pv45.adjustPrefixLength(pv45.getBitCount()));
System.out.println(pv45+"---mask:"+pv45.mask(mask).toString());
System.out.println(pv45.adjustPrefixLength(pv45.getBitCount())+"---"+pv45.toMaxHost().withoutPrefixLength());

View File

@@ -0,0 +1,101 @@
package com.zdjizhi.common;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import org.junit.Test;
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
* @author qidaijie
* @Package com.zdjizhi
* @Description:
* @date 2022/3/1016:58
*/
public class NacosTest {
/**
* <dependency>
* <groupId>com.alibaba.nacos</groupId>
* <artifactId>nacos-client</artifactId>
* <version>1.2.0</version>
* </dependency>
*/
private static Properties properties = new Properties();
/**
* config data id = config name
*/
private static final String DATA_ID = "dos_baseline.properties";
/**
* config group
*/
private static final String GROUP = "Galaxy";
private void getProperties() {
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "192.168.44.12:8848");
properties.setProperty(PropertyKeyConst.NAMESPACE, "flink");
properties.setProperty(PropertyKeyConst.USERNAME, "nacos");
properties.setProperty(PropertyKeyConst.PASSWORD, "nacos");
}
@Test
public void GetConfigurationTest() {
try {
getProperties();
ConfigService configService = NacosFactory.createConfigService(properties);
String content = configService.getConfig(DATA_ID, GROUP, 5000);
Properties nacosConfigMap = new Properties();
nacosConfigMap.load(new StringReader(content));
System.out.println(nacosConfigMap.getProperty("static.sensitivity.threshold"));
} catch (NacosException | IOException e) {
e.printStackTrace();
}
}
@Test
public void ListenerConfigurationTest() {
getProperties();
try {
//first get config
ConfigService configService = NacosFactory.createConfigService(properties);
String config = configService.getConfig(DATA_ID, GROUP, 5000);
// System.out.println(config);
//start listenner
configService.addListener(DATA_ID, GROUP, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configMsg) {
System.out.println(configMsg);
}
});
} catch (NacosException e) {
e.printStackTrace();
}
//keep running,change nacos config,print new config
/*
while (true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
*/
}
}