Compare commits
5 Commits
feature/24
...
tsg-v08
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8c790354c0 | ||
|
|
ca19b6c0e6 | ||
|
|
b95ca44c01 | ||
|
|
b2936abe1a | ||
|
|
e4425f8116 |
66
pom.xml
66
pom.xml
@@ -114,13 +114,6 @@
|
||||
<version>1.7.21</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-sql-connector-kafka_2.11</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-connector-kafka_2.11</artifactId>
|
||||
@@ -128,36 +121,6 @@
|
||||
<!--<scope>provided</scope>-->
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
|
||||
<!--Flink modules-->
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-table-api-java</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<!--<scope>provided</scope>-->
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-table-planner-blink_2.11</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<!--<scope>provided</scope>-->
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-table-planner_2.11</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<!--<scope>provided</scope>-->
|
||||
</dependency>
|
||||
|
||||
<!-- CLI dependencies -->
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
@@ -181,6 +144,23 @@
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-client</artifactId>
|
||||
<version>2.2.3</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
<groupId>org.slf4j</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>log4j-over-slf4j</artifactId>
|
||||
<groupId>org.slf4j</groupId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
@@ -203,24 +183,12 @@
|
||||
<version>4.5.6</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-connector-hbase-2.2_2.11</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cn.hutool</groupId>
|
||||
<artifactId>hutool-all</artifactId>
|
||||
<version>5.5.2</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.github.seancfoley</groupId>
|
||||
<artifactId>ipaddress</artifactId>
|
||||
<version>5.3.3</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>galaxy</artifactId>
|
||||
|
||||
@@ -19,6 +19,8 @@ public class CommonConfig {
|
||||
public static final String KAFKA_OUTPUT_METRIC_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.metric.topic.name");
|
||||
public static final int KAFKA_OUTPUT_EVENT_PARALLELISM = CommonConfigurations.getIntProperty("kafka.output.event.parallelism");
|
||||
public static final String KAFKA_OUTPUT_EVENT_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.event.topic.name");
|
||||
public static final String KAFKA_OUTPUT_SKETCH_TOPIC_NAME = CommonConfigurations.getStringProperty("kafka.output.sketch.topic.name");
|
||||
public static final int KAFKA_OUTPUT_SKETCH_PARALLELISM = CommonConfigurations.getIntProperty("kafka.output.sketch.parallelism");
|
||||
public static final String KAFKA_OUTPUT_BOOTSTRAP_SERVERS = CommonConfigurations.getStringProperty("kafka.output.bootstrap.servers");
|
||||
|
||||
public static final String HBASE_ZOOKEEPER_QUORUM = CommonConfigurations.getStringProperty("hbase.zookeeper.quorum");
|
||||
@@ -39,23 +41,14 @@ public class CommonConfig {
|
||||
|
||||
public static final String IP_MMDB_PATH = CommonConfigurations.getStringProperty("ip.mmdb.path");
|
||||
|
||||
public static final int SENSITIVITY_THRESHOLD = CommonConfigurations.getIntProperty("sensitivity.threshold");
|
||||
|
||||
public static final double BASELINE_SESSIONS_MINOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.minor.threshold");
|
||||
public static final double BASELINE_SESSIONS_WARNING_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.warning.threshold");
|
||||
public static final double BASELINE_SESSIONS_MAJOR_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.major.threshold");
|
||||
public static final double BASELINE_SESSIONS_SEVERE_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.severe.threshold");
|
||||
public static final double BASELINE_SESSIONS_CRITICAL_THRESHOLD = CommonConfigurations.getDoubleProperty("baseline.sessions.critical.threshold");
|
||||
|
||||
public static final String BIFANG_SERVER_URI = CommonConfigurations.getStringProperty("bifang.server.uri");
|
||||
public static final String BIFANG_SERVER_USER = CommonConfigurations.getStringProperty("bifang.server.user");
|
||||
public static final String BIFANG_SERVER_PASSWORD = CommonConfigurations.getStringProperty("bifang.server.password");
|
||||
public static final String BIFANG_SERVER_ENCRYPTPWD_PATH = CommonConfigurations.getStringProperty("bifang.server.encryptpwd.path");
|
||||
public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path");
|
||||
public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path");
|
||||
|
||||
public static final int HTTP_POOL_MAX_CONNECTION = CommonConfigurations.getIntProperty("http.pool.max.connection");
|
||||
public static final int HTTP_POOL_MAX_PER_ROUTE = CommonConfigurations.getIntProperty("http.pool.max.per.route");
|
||||
public static final int HTTP_POOL_REQUEST_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.request.timeout");
|
||||
public static final int HTTP_POOL_CONNECT_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.connect.timeout");
|
||||
public static final int HTTP_POOL_RESPONSE_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.response.timeout");
|
||||
public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = CommonConfigurations.getIntProperty("baseline.threshold.schedule.days");
|
||||
|
||||
}
|
||||
|
||||
@@ -1,93 +0,0 @@
|
||||
package com.zdjizhi.common;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class DosDetectionThreshold implements Serializable {
|
||||
private String profileId;
|
||||
private String attackType;
|
||||
private ArrayList<String> serverIpList;
|
||||
private String serverIpAddr;
|
||||
private long packetsPerSec;
|
||||
private long bitsPerSec;
|
||||
private long sessionsPerSec;
|
||||
private int isValid;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DosDetectionThreshold{" +
|
||||
"profileId='" + profileId + '\'' +
|
||||
", attackType='" + attackType + '\'' +
|
||||
", serverIpList=" + serverIpList +
|
||||
", serverIpAddr='" + serverIpAddr + '\'' +
|
||||
", packetsPerSec=" + packetsPerSec +
|
||||
", bitsPerSec=" + bitsPerSec +
|
||||
", sessionsPerSec=" + sessionsPerSec +
|
||||
", isValid=" + isValid +
|
||||
'}';
|
||||
}
|
||||
|
||||
public String getProfileId() {
|
||||
return profileId;
|
||||
}
|
||||
|
||||
public void setProfileId(String profileId) {
|
||||
this.profileId = profileId;
|
||||
}
|
||||
|
||||
public String getAttackType() {
|
||||
return attackType;
|
||||
}
|
||||
|
||||
public void setAttackType(String attackType) {
|
||||
this.attackType = attackType;
|
||||
}
|
||||
|
||||
public ArrayList<String> getServerIpList() {
|
||||
return serverIpList;
|
||||
}
|
||||
|
||||
public void setServerIpList(ArrayList<String> serverIpList) {
|
||||
this.serverIpList = serverIpList;
|
||||
}
|
||||
|
||||
public String getServerIpAddr() {
|
||||
return serverIpAddr;
|
||||
}
|
||||
|
||||
public void setServerIpAddr(String serverIpAddr) {
|
||||
this.serverIpAddr = serverIpAddr;
|
||||
}
|
||||
|
||||
public long getPacketsPerSec() {
|
||||
return packetsPerSec;
|
||||
}
|
||||
|
||||
public void setPacketsPerSec(long packetsPerSec) {
|
||||
this.packetsPerSec = packetsPerSec;
|
||||
}
|
||||
|
||||
public long getBitsPerSec() {
|
||||
return bitsPerSec;
|
||||
}
|
||||
|
||||
public void setBitsPerSec(long bitsPerSec) {
|
||||
this.bitsPerSec = bitsPerSec;
|
||||
}
|
||||
|
||||
public long getSessionsPerSec() {
|
||||
return sessionsPerSec;
|
||||
}
|
||||
|
||||
public void setSessionsPerSec(long sessionsPerSec) {
|
||||
this.sessionsPerSec = sessionsPerSec;
|
||||
}
|
||||
|
||||
public int getIsValid() {
|
||||
return isValid;
|
||||
}
|
||||
|
||||
public void setIsValid(int isValid) {
|
||||
this.isValid = isValid;
|
||||
}
|
||||
}
|
||||
@@ -7,15 +7,19 @@ import com.zdjizhi.utils.HbaseUtils;
|
||||
import com.zdjizhi.utils.IpUtils;
|
||||
import com.zdjizhi.utils.SnowflakeId;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.apache.flink.api.common.functions.RichMapFunction;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.text.NumberFormat;
|
||||
import java.text.ParseException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @author wlh
|
||||
@@ -29,7 +33,17 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
|
||||
@Override
|
||||
public void open(Configuration parameters) {
|
||||
baselineMap = HbaseUtils.baselineMap;
|
||||
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2,
|
||||
new BasicThreadFactory.Builder().namingPattern("Dos-Detection-%d").daemon(true).build());
|
||||
try {
|
||||
executorService.scheduleAtFixedRate(() -> {
|
||||
//do something
|
||||
baselineMap = HbaseUtils.readFromHbase();
|
||||
}, 0, CommonConfig.BASELINE_THRESHOLD_SCHEDULE_DAYS, TimeUnit.DAYS);
|
||||
|
||||
}catch (Exception e){
|
||||
logger.error("定时器任务执行失败", e);
|
||||
}
|
||||
PERCENT_INSTANCE.setMinimumFractionDigits(2);
|
||||
}
|
||||
|
||||
@@ -39,20 +53,20 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
String destinationIp = value.getDestination_ip();
|
||||
String attackType = value.getAttack_type();
|
||||
logger.debug("当前判断IP:{}, 类型: {}", destinationIp, attackType);
|
||||
if (baselineMap.containsKey(destinationIp)) {
|
||||
long sketchSessions = value.getSketch_sessions();
|
||||
if (sketchSessions > CommonConfig.SENSITIVITY_THRESHOLD && baselineMap.containsKey(destinationIp)) {
|
||||
Tuple2<ArrayList<Integer>, Integer> floodTypeTup = baselineMap.get(destinationIp).get(attackType);
|
||||
Integer base = getBaseValue(floodTypeTup, value);
|
||||
long diff = value.getSketch_sessions() - base;
|
||||
long diff = sketchSessions - base;
|
||||
if (diff > 0 && base != 0) {
|
||||
String percent = getDiffPercent(diff, base);
|
||||
double diffPercentDouble = getDiffPercentDouble(percent);
|
||||
Severity severity = judgeSeverity(diffPercentDouble);
|
||||
double percent = getDiffPercent(diff, base);
|
||||
Severity severity = judgeSeverity(percent);
|
||||
if (severity != Severity.NORMAL) {
|
||||
DosEventLog result = getResult(value, severity, percent);
|
||||
logger.info("检测到当前server IP {} 存在 {} 异常,日志详情\n {}", destinationIp, attackType, result.toString());
|
||||
logger.info("检测到当前server IP {} 存在 {} 异常,超出基线{} {}倍,日志详情\n {}", destinationIp, attackType, base, percent, result);
|
||||
return result;
|
||||
} else {
|
||||
logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value.toString());
|
||||
logger.debug("当前server IP:{} 未出现 {} 异常,日志详情 {}", destinationIp, attackType, value);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -64,14 +78,14 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
return null;
|
||||
}
|
||||
|
||||
private DosEventLog getResult(DosSketchLog value, Severity severity, String percent) {
|
||||
private DosEventLog getResult(DosSketchLog value, Severity severity, double percent) {
|
||||
DosEventLog dosEventLog = new DosEventLog();
|
||||
dosEventLog.setLog_id(SnowflakeId.generateId());
|
||||
dosEventLog.setStart_time(value.getSketch_start_time());
|
||||
dosEventLog.setEnd_time(value.getSketch_start_time() + CommonConfig.FLINK_WINDOW_MAX_TIME);
|
||||
dosEventLog.setAttack_type(value.getAttack_type());
|
||||
dosEventLog.setSeverity(severity.toString());
|
||||
dosEventLog.setConditions(getConditions(percent));
|
||||
dosEventLog.setConditions(getConditions(PERCENT_INSTANCE.format(percent)));
|
||||
dosEventLog.setDestination_ip(value.getDestination_ip());
|
||||
dosEventLog.setDestination_country(IpUtils.ipLookup.countryLookup(value.getDestination_ip()));
|
||||
String ipList = value.getSource_ip();
|
||||
@@ -86,7 +100,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
private Integer getBaseValue(Tuple2<ArrayList<Integer>, Integer> floodTypeTup, DosSketchLog value) {
|
||||
Integer base = 0;
|
||||
try {
|
||||
if (floodTypeTup != null){
|
||||
if (floodTypeTup != null) {
|
||||
ArrayList<Integer> baselines = floodTypeTup.f0;
|
||||
Integer defaultVaule = floodTypeTup.f1;
|
||||
if (baselines != null && baselines.size() == BASELINE_SIZE) {
|
||||
@@ -123,19 +137,16 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
return Integer.parseInt(Long.toString(indexLong));
|
||||
}
|
||||
|
||||
private String getDiffPercent(long diff, long base) {
|
||||
double diffDou = Double.parseDouble(Long.toString(diff));
|
||||
double baseDou = Double.parseDouble(Long.toString(base));
|
||||
return PERCENT_INSTANCE.format(diffDou / baseDou);
|
||||
private Double getDiffPercent(long diff, long base) {
|
||||
return BigDecimal.valueOf((float) diff / base).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
System.out.println(new DosDetection().getDiffPercent(219, 0));
|
||||
System.out.println(new DosDetection().getDiffPercentDouble("∞%"));
|
||||
}
|
||||
|
||||
private double getDiffPercentDouble(String diffPercent) throws ParseException {
|
||||
return PERCENT_INSTANCE.parse(diffPercent).doubleValue();
|
||||
public static void main(String[] args) {
|
||||
DosDetection dosDetection = new DosDetection();
|
||||
double diffPercent = dosDetection.getDiffPercent(135, 17);
|
||||
System.out.println(diffPercent);
|
||||
System.out.println(dosDetection.judgeSeverity(4.2857142857142856E14));
|
||||
System.out.println(BigDecimal.valueOf((float) 10 / 3).setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue());
|
||||
}
|
||||
|
||||
private Severity judgeSeverity(double diffPercent) {
|
||||
|
||||
@@ -80,7 +80,9 @@ public class EtlProcessFunction extends ProcessWindowFunction<DosSketchLog, DosS
|
||||
}
|
||||
}
|
||||
String sourceIpList = StringUtils.join(sourceIpSet, ",");
|
||||
return Tuple6.of(sessions/cnt,packets/cnt,bytes/cnt,sourceIpList,startTime,duration);
|
||||
// return Tuple6.of(sessions/cnt/duration,packets/cnt/duration,bytes/cnt/duration,sourceIpList,startTime,duration);
|
||||
return Tuple6.of(sessions/CommonConfig.FLINK_WINDOW_MAX_TIME,packets/CommonConfig.FLINK_WINDOW_MAX_TIME,
|
||||
bytes/CommonConfig.FLINK_WINDOW_MAX_TIME,sourceIpList,startTime,duration);
|
||||
}catch (Exception e){
|
||||
logger.error("聚合中间结果集失败 {}",e);
|
||||
}
|
||||
|
||||
@@ -60,9 +60,9 @@ public class ParseSketchLog {
|
||||
long sketchBytes = Long.parseLong(obj.get("sketch_bytes").toString());
|
||||
dosSketchLog.setSource_ip(sourceIp);
|
||||
dosSketchLog.setDestination_ip(destinationIp);
|
||||
dosSketchLog.setSketch_sessions(sketchSessions/sketchDuration);
|
||||
dosSketchLog.setSketch_packets(sketchPackets/sketchDuration);
|
||||
dosSketchLog.setSketch_bytes(sketchBytes*8/sketchDuration);
|
||||
dosSketchLog.setSketch_sessions(sketchSessions);
|
||||
dosSketchLog.setSketch_packets(sketchPackets);
|
||||
dosSketchLog.setSketch_bytes(sketchBytes);
|
||||
collector.collect(dosSketchLog);
|
||||
logger.debug("数据解析成功:{}",dosSketchLog.toString());
|
||||
}
|
||||
|
||||
@@ -1,163 +0,0 @@
|
||||
package com.zdjizhi.etl;
|
||||
|
||||
import com.fasterxml.jackson.databind.JavaType;
|
||||
import com.zdjizhi.common.CommonConfig;
|
||||
import com.zdjizhi.common.DosDetectionThreshold;
|
||||
import com.zdjizhi.utils.HttpClientUtils;
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import inet.ipaddr.IPAddress;
|
||||
import inet.ipaddr.IPAddressString;
|
||||
import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
|
||||
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.apache.http.message.BasicHeader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* @author wlh
|
||||
*/
|
||||
public class ParseStaticThreshold {
|
||||
private static Logger logger = LoggerFactory.getLogger(ParseStaticThreshold.class);
|
||||
private static String encryptpwd;
|
||||
|
||||
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
|
||||
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
|
||||
private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class);
|
||||
|
||||
static {
|
||||
//加载加密登录密码
|
||||
encryptpwd = getEncryptpwd();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取加密密码
|
||||
*/
|
||||
private static String getEncryptpwd(){
|
||||
String psw = HttpClientUtils.ERROR_MESSAGE;
|
||||
try {
|
||||
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
|
||||
HashMap<String, String> parms = new HashMap<>();
|
||||
parms.put("password",CommonConfig.BIFANG_SERVER_PASSWORD);
|
||||
HttpClientUtils.setUrlWithParams(uriBuilder,CommonConfig.BIFANG_SERVER_ENCRYPTPWD_PATH,parms);
|
||||
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build());
|
||||
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)){
|
||||
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
|
||||
boolean success = (boolean)resposeMap.get("success");
|
||||
if (success){
|
||||
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
|
||||
psw = data.get("encryptpwd").toString();
|
||||
}
|
||||
}
|
||||
}catch (URISyntaxException e){
|
||||
logger.error("构造URI异常",e);
|
||||
}catch (Exception e){
|
||||
logger.error("获取encryptpwd失败",e);
|
||||
}
|
||||
return psw;
|
||||
}
|
||||
|
||||
/**
|
||||
* 登录bifang服务,获取token
|
||||
* @return token
|
||||
*/
|
||||
private static String loginBifangServer(){
|
||||
String token = HttpClientUtils.ERROR_MESSAGE;
|
||||
try {
|
||||
if (!HttpClientUtils.ERROR_MESSAGE.equals(encryptpwd)){
|
||||
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
|
||||
HashMap<String, String> parms = new HashMap<>();
|
||||
parms.put("username",CommonConfig.BIFANG_SERVER_USER);
|
||||
parms.put("password",encryptpwd);
|
||||
HttpClientUtils.setUrlWithParams(uriBuilder,CommonConfig.BIFANG_SERVER_LOGIN_PATH,parms);
|
||||
String resposeJsonStr = HttpClientUtils.httpPost(uriBuilder.build(), null);
|
||||
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)){
|
||||
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
|
||||
boolean success = (boolean)resposeMap.get("success");
|
||||
if (success){
|
||||
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
|
||||
token = data.get("token").toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
}catch (Exception e){
|
||||
logger.error("登录失败,未获取到token ",e);
|
||||
}
|
||||
return token;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取静态阈值配置列表
|
||||
* @return thresholds
|
||||
*/
|
||||
private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold(){
|
||||
ArrayList<DosDetectionThreshold> thresholds = null;
|
||||
try {
|
||||
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
|
||||
HttpClientUtils.setUrlWithParams(uriBuilder,CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH,null);
|
||||
String token = loginBifangServer();
|
||||
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)){
|
||||
BasicHeader authorization = new BasicHeader("Authorization", token);
|
||||
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization);
|
||||
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)){
|
||||
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
|
||||
boolean success = (boolean)resposeMap.get("success");
|
||||
if (success){
|
||||
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
|
||||
thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(data.get("list")), thresholdType);
|
||||
logger.info("获取到静态阈值配置{}条",thresholds.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
}catch (Exception e){
|
||||
logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ",e);
|
||||
}
|
||||
return thresholds;
|
||||
}
|
||||
|
||||
/**
|
||||
* 基于静态阈值构建threshold RangeMap,k:IP段或具体IP,v:配置信息
|
||||
* @return threshold RangeMap
|
||||
*/
|
||||
public static TreeRangeMap<IPAddress, DosDetectionThreshold> createStaticThreshold(){
|
||||
TreeRangeMap<IPAddress, DosDetectionThreshold> thresholdRangeMap = null;
|
||||
try {
|
||||
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
|
||||
if (dosDetectionThreshold != null && !dosDetectionThreshold.isEmpty()){
|
||||
thresholdRangeMap = TreeRangeMap.create();
|
||||
for (DosDetectionThreshold threshold:dosDetectionThreshold){
|
||||
ArrayList<String> serverIpList = threshold.getServerIpList();
|
||||
for (String sip:serverIpList){
|
||||
IPAddressString ipAddressString = new IPAddressString(sip);
|
||||
if (ipAddressString.isIPAddress()){
|
||||
IPAddress address = ipAddressString.getAddress();
|
||||
thresholdRangeMap.put(Range.closed(address.getLower(),address.getUpper()),threshold);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}catch (Exception e){
|
||||
logger.error("构建threshold RangeMap失败",e);
|
||||
}
|
||||
return thresholdRangeMap;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
TreeRangeMap<IPAddress, DosDetectionThreshold> staticThreshold = createStaticThreshold();
|
||||
Map<Range<IPAddress>, DosDetectionThreshold> rangeDosDetectionThresholdMap = staticThreshold.asMapOfRanges();
|
||||
Set<Range<IPAddress>> ranges = rangeDosDetectionThresholdMap.keySet();
|
||||
for (Range<IPAddress> range:ranges){
|
||||
System.out.println(range+"--"+rangeDosDetectionThresholdMap.get(range));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
21
src/main/java/com/zdjizhi/sink/DosSketchSink.java
Normal file
21
src/main/java/com/zdjizhi/sink/DosSketchSink.java
Normal file
@@ -0,0 +1,21 @@
|
||||
package com.zdjizhi.sink;
|
||||
|
||||
import com.zdjizhi.common.CommonConfig;
|
||||
import com.zdjizhi.common.DosSketchLog;
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import com.zdjizhi.utils.KafkaUtils;
|
||||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
class DosSketchSink {
|
||||
|
||||
static void dosSketchOutputSink(SingleOutputStreamOperator<DosSketchLog> sketchSource){
|
||||
sketchSource
|
||||
.filter(Objects::nonNull)
|
||||
.map(JsonMapper::toJsonString)
|
||||
.addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_SKETCH_TOPIC_NAME))
|
||||
.setParallelism(CommonConfig.KAFKA_OUTPUT_SKETCH_PARALLELISM);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -27,6 +27,9 @@ public class OutputStreamSink {
|
||||
|
||||
public static void finalOutputSink(){
|
||||
try {
|
||||
SingleOutputStreamOperator<DosSketchLog> sketchSource = ParseSketchLog.getSketchSource();
|
||||
DosSketchSink.dosSketchOutputSink(sketchSource);
|
||||
|
||||
SingleOutputStreamOperator<DosSketchLog> middleStream = getMiddleStream();
|
||||
DosEventSink.dosEventOutputSink(getEventSinkStream(middleStream));
|
||||
TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream);
|
||||
|
||||
@@ -2,8 +2,6 @@ package com.zdjizhi.utils;
|
||||
|
||||
import com.zdjizhi.common.CommonConfig;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.table.api.EnvironmentSettings;
|
||||
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
|
||||
|
||||
|
||||
/**
|
||||
@@ -12,16 +10,8 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
|
||||
public class FlinkEnvironmentUtils {
|
||||
public static StreamExecutionEnvironment streamExeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||
|
||||
|
||||
public static StreamTableEnvironment getStreamTableEnv() {
|
||||
static {
|
||||
streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
|
||||
|
||||
EnvironmentSettings settings = EnvironmentSettings.newInstance()
|
||||
.useBlinkPlanner()
|
||||
.inStreamingMode()
|
||||
.build();
|
||||
|
||||
return StreamTableEnvironment.create(streamExeEnv, settings);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -25,7 +25,6 @@ public class HbaseUtils {
|
||||
private static final Logger logger = LoggerFactory.getLogger(HbaseUtils.class);
|
||||
private static Table table = null;
|
||||
private static Scan scan = null;
|
||||
public static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
|
||||
private static ArrayList<String> floodTypeList = new ArrayList<>();
|
||||
|
||||
static {
|
||||
@@ -33,7 +32,6 @@ public class HbaseUtils {
|
||||
floodTypeList.add("UDP Flood");
|
||||
floodTypeList.add("ICMP Flood");
|
||||
floodTypeList.add("DNS Amplification");
|
||||
readFromHbase();
|
||||
}
|
||||
|
||||
private static void prepareHbaseEnv() throws IOException {
|
||||
@@ -54,6 +52,7 @@ public class HbaseUtils {
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = readFromHbase();
|
||||
Set<String> keySet = baselineMap.keySet();
|
||||
for (String key : keySet) {
|
||||
Map<String, Tuple2<ArrayList<Integer>, Integer>> stringTuple2Map = baselineMap.get(key);
|
||||
@@ -66,7 +65,8 @@ public class HbaseUtils {
|
||||
System.out.println(baselineMap.size());
|
||||
}
|
||||
|
||||
private static void readFromHbase() {
|
||||
public static Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> readFromHbase() {
|
||||
Map<String, Map<String, Tuple2<ArrayList<Integer>, Integer>>> baselineMap = new HashMap<>();
|
||||
try {
|
||||
prepareHbaseEnv();
|
||||
logger.info("开始读取baseline数据");
|
||||
@@ -87,6 +87,7 @@ public class HbaseUtils {
|
||||
} catch (Exception e) {
|
||||
logger.error("读取hbase数据失败", e);
|
||||
}
|
||||
return baselineMap;
|
||||
}
|
||||
|
||||
private static Integer getDefaultValue(Result result, String family, String qualifier) {
|
||||
|
||||
@@ -1,268 +0,0 @@
|
||||
package com.zdjizhi.utils;
|
||||
|
||||
import com.zdjizhi.common.CommonConfig;
|
||||
import org.apache.http.*;
|
||||
import org.apache.http.client.ClientProtocolException;
|
||||
import org.apache.http.client.HttpRequestRetryHandler;
|
||||
import org.apache.http.client.config.RequestConfig;
|
||||
import org.apache.http.client.methods.*;
|
||||
import org.apache.http.client.protocol.HttpClientContext;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.apache.http.conn.ConnectTimeoutException;
|
||||
import org.apache.http.conn.ConnectionKeepAliveStrategy;
|
||||
import org.apache.http.conn.HttpHostConnectException;
|
||||
import org.apache.http.entity.ByteArrayEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
|
||||
import org.apache.http.message.BasicHeaderElementIterator;
|
||||
import org.apache.http.protocol.HTTP;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.net.ssl.SSLException;
|
||||
import javax.net.ssl.SSLHandshakeException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.URI;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* http client工具类
|
||||
*/
|
||||
public class HttpClientUtils {
|
||||
/** 全局连接池对象 */
|
||||
private static final PoolingHttpClientConnectionManager CONN_MANAGER = new PoolingHttpClientConnectionManager();
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class);
|
||||
public static final String ERROR_MESSAGE = "-1";
|
||||
|
||||
/*
|
||||
* 静态代码块配置连接池信息
|
||||
*/
|
||||
static {
|
||||
|
||||
// 设置最大连接数
|
||||
CONN_MANAGER.setMaxTotal(CommonConfig.HTTP_POOL_MAX_CONNECTION);
|
||||
// 设置每个连接的路由数
|
||||
CONN_MANAGER.setDefaultMaxPerRoute(CommonConfig.HTTP_POOL_MAX_PER_ROUTE);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取Http客户端连接对象
|
||||
* @return Http客户端连接对象
|
||||
*/
|
||||
private static CloseableHttpClient getHttpClient() {
|
||||
// 创建Http请求配置参数
|
||||
RequestConfig requestConfig = RequestConfig.custom()
|
||||
// 获取连接超时时间
|
||||
.setConnectionRequestTimeout(CommonConfig.HTTP_POOL_REQUEST_TIMEOUT)
|
||||
// 请求超时时间
|
||||
.setConnectTimeout(CommonConfig.HTTP_POOL_CONNECT_TIMEOUT)
|
||||
// 响应超时时间
|
||||
.setSocketTimeout(CommonConfig.HTTP_POOL_RESPONSE_TIMEOUT)
|
||||
.build();
|
||||
|
||||
/*
|
||||
* 测出超时重试机制为了防止超时不生效而设置
|
||||
* 如果直接放回false,不重试
|
||||
* 这里会根据情况进行判断是否重试
|
||||
*/
|
||||
HttpRequestRetryHandler retry = (exception, executionCount, context) -> {
|
||||
if (executionCount >= 3) {// 如果已经重试了3次,就放弃
|
||||
return false;
|
||||
}
|
||||
if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
|
||||
return true;
|
||||
}
|
||||
if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
|
||||
return false;
|
||||
}
|
||||
if (exception instanceof UnknownHostException) {// 目标服务器不可达
|
||||
return false;
|
||||
}
|
||||
if (exception instanceof ConnectTimeoutException) {// 连接被拒绝
|
||||
return false;
|
||||
}
|
||||
if (exception instanceof HttpHostConnectException) {// 连接被拒绝
|
||||
return false;
|
||||
}
|
||||
if (exception instanceof SSLException) {// ssl握手异常
|
||||
return false;
|
||||
}
|
||||
if (exception instanceof InterruptedIOException) {// 超时
|
||||
return true;
|
||||
}
|
||||
HttpClientContext clientContext = HttpClientContext.adapt(context);
|
||||
HttpRequest request = clientContext.getRequest();
|
||||
// 如果请求是幂等的,就再次尝试
|
||||
return !(request instanceof HttpEntityEnclosingRequest);
|
||||
};
|
||||
|
||||
|
||||
ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
|
||||
HeaderElementIterator it = new BasicHeaderElementIterator
|
||||
(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
|
||||
while (it.hasNext()) {
|
||||
HeaderElement he = it.nextElement();
|
||||
String param = he.getName();
|
||||
String value = he.getValue();
|
||||
if (value != null && "timeout".equalsIgnoreCase(param)) {
|
||||
return Long.parseLong(value) * 1000;
|
||||
}
|
||||
}
|
||||
return 60 * 1000;//如果没有约定,则默认定义时长为60s
|
||||
};
|
||||
|
||||
// 创建httpClient
|
||||
return HttpClients.custom()
|
||||
// 把请求相关的超时信息设置到连接客户端
|
||||
.setDefaultRequestConfig(requestConfig)
|
||||
// 把请求重试设置到连接客户端
|
||||
.setRetryHandler(retry)
|
||||
.setKeepAliveStrategy(myStrategy)
|
||||
// 配置连接池管理对象
|
||||
.setConnectionManager(CONN_MANAGER)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* GET请求
|
||||
*
|
||||
* @param uri 请求地
|
||||
* @return message
|
||||
*/
|
||||
public static String httpGet(URI uri, Header... headers) {
|
||||
String msg = ERROR_MESSAGE;
|
||||
|
||||
// 获取客户端连接对象
|
||||
CloseableHttpClient httpClient = getHttpClient();
|
||||
CloseableHttpResponse response = null;
|
||||
|
||||
try {
|
||||
logger.info("http get uri {}",uri);
|
||||
// 创建GET请求对象
|
||||
HttpGet httpGet = new HttpGet(uri);
|
||||
|
||||
if (StringUtil.isNotEmpty(headers)) {
|
||||
for (Header h : headers) {
|
||||
httpGet.addHeader(h);
|
||||
logger.info("request header : {}",h);
|
||||
}
|
||||
}
|
||||
// 执行请求
|
||||
response = httpClient.execute(httpGet);
|
||||
int statusCode = response.getStatusLine().getStatusCode();
|
||||
// 获取响应实体
|
||||
HttpEntity entity = response.getEntity();
|
||||
// 获取响应信息
|
||||
msg = EntityUtils.toString(entity, "UTF-8");
|
||||
|
||||
if (statusCode != HttpStatus.SC_OK) {
|
||||
logger.error("Http get content is :{}" , msg);
|
||||
}
|
||||
|
||||
} catch (ClientProtocolException e) {
|
||||
logger.error("协议错误: {}", e.getMessage());
|
||||
} catch (ParseException e) {
|
||||
logger.error("解析错误: {}", e.getMessage());
|
||||
} catch (IOException e) {
|
||||
logger.error("IO错误: {}",e.getMessage());
|
||||
} finally {
|
||||
if (null != response) {
|
||||
try {
|
||||
EntityUtils.consume(response.getEntity());
|
||||
response.close();
|
||||
} catch (IOException e) {
|
||||
logger.error("释放链接错误: {}", e.getMessage());
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return msg;
|
||||
}
|
||||
/**
|
||||
* POST 请求
|
||||
* @param uri uri参数
|
||||
* @param requestBody 请求体
|
||||
* @return post请求返回结果
|
||||
*/
|
||||
public static String httpPost(URI uri, String requestBody, Header... headers) {
|
||||
String msg = ERROR_MESSAGE;
|
||||
// 获取客户端连接对象
|
||||
CloseableHttpClient httpClient = getHttpClient();
|
||||
|
||||
// 创建POST请求对象
|
||||
CloseableHttpResponse response = null;
|
||||
try {
|
||||
|
||||
logger.info("http post uri:{}, http post body:{}", uri, requestBody);
|
||||
|
||||
HttpPost httpPost = new HttpPost(uri);
|
||||
httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
|
||||
if (StringUtil.isNotEmpty(headers)) {
|
||||
for (Header h : headers) {
|
||||
httpPost.addHeader(h);
|
||||
logger.info("request header : {}",h);
|
||||
}
|
||||
}
|
||||
|
||||
if(StringUtil.isNotBlank(requestBody)) {
|
||||
byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8);
|
||||
httpPost.setEntity(new ByteArrayEntity(bytes));
|
||||
}
|
||||
|
||||
response = httpClient.execute(httpPost);
|
||||
int statusCode = response.getStatusLine().getStatusCode();
|
||||
// 获取响应实体
|
||||
HttpEntity entity = response.getEntity();
|
||||
// 获取响应信息
|
||||
msg = EntityUtils.toString(entity, "UTF-8");
|
||||
|
||||
if (statusCode != HttpStatus.SC_OK) {
|
||||
logger.error("Http post content is :{}" , msg);
|
||||
}
|
||||
} catch (ClientProtocolException e) {
|
||||
logger.error("协议错误: {}", e.getMessage());
|
||||
} catch (ParseException e) {
|
||||
logger.error("解析错误: {}", e.getMessage());
|
||||
} catch (IOException e) {
|
||||
logger.error("IO错误: {}", e.getMessage());
|
||||
} finally {
|
||||
if (null != response) {
|
||||
try {
|
||||
EntityUtils.consumeQuietly(response.getEntity());
|
||||
response.close();
|
||||
} catch (IOException e) {
|
||||
logger.error("释放链接错误: {}", e.getMessage());
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
return msg;
|
||||
}
|
||||
|
||||
/**
|
||||
* 拼装url
|
||||
* url ,参数map
|
||||
*/
|
||||
public static void setUrlWithParams(URIBuilder uriBuilder,String path, Map<String, String> params) {
|
||||
try {
|
||||
uriBuilder.setPath(path);
|
||||
if (params != null && !params.isEmpty()){
|
||||
for (Map.Entry<String, String> kv : params.entrySet()) {
|
||||
uriBuilder.setParameter(kv.getKey(),kv.getValue());
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("拼接url出错,uri : {}, path : {},参数: {}",uriBuilder.toString(),path,params);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -9,9 +9,9 @@ public class IpUtils {
|
||||
*/
|
||||
public static IpLookup ipLookup = new IpLookup.Builder(false)
|
||||
.loadDataFileV4(CommonConfig.IP_MMDB_PATH + "ip_v4.mmdb")
|
||||
// .loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6.mmdb")
|
||||
// .loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_private_v4.mmdb")
|
||||
// .loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_private_v6.mmdb")
|
||||
.loadDataFileV6(CommonConfig.IP_MMDB_PATH + "ip_v6.mmdb")
|
||||
.loadDataFilePrivateV4(CommonConfig.IP_MMDB_PATH + "ip_private_v4.mmdb")
|
||||
.loadDataFilePrivateV6(CommonConfig.IP_MMDB_PATH + "ip_private_v6.mmdb")
|
||||
.build();
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
@@ -11,6 +11,9 @@ public class KafkaUtils {
|
||||
private static Properties getKafkaSinkProperty(){
|
||||
Properties propertiesproducer = new Properties();
|
||||
propertiesproducer.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
|
||||
// propertiesproducer.setProperty("security.protocol", "SASL_PLAINTEXT");
|
||||
// propertiesproducer.setProperty("sasl.mechanism", "PLAIN");
|
||||
// propertiesproducer.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"galaxy2019\";");
|
||||
|
||||
return propertiesproducer;
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ kafka.input.topic.name=DOS-SKETCH-LOG
|
||||
kafka.input.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
|
||||
|
||||
#读取kafka group id
|
||||
kafka.input.group.id=2108161121
|
||||
kafka.input.group.id=2109061121
|
||||
#kafka.input.group.id=dos-detection-job-210813-1
|
||||
|
||||
#发送kafka metrics并行度大小
|
||||
@@ -32,8 +32,13 @@ kafka.output.event.parallelism=1
|
||||
#kafka.output.event.topic.name=DOS-EVENT-LOG
|
||||
kafka.output.event.topic.name=test
|
||||
|
||||
#sketch日志 topic名以及并行度
|
||||
#kafka.output.sketch.topic.name=FLATTEN-DOS-SKETCH-LOG
|
||||
kafka.output.sketch.topic.name=test
|
||||
kafka.output.sketch.parallelism=1
|
||||
|
||||
#kafka输出地址
|
||||
kafka.output.bootstrap.servers=192.168.44.12:9092
|
||||
kafka.output.bootstrap.servers=192.168.44.33:9092
|
||||
#kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
|
||||
|
||||
#zookeeper地址
|
||||
@@ -45,7 +50,7 @@ hbase.client.operation.timeout=30000
|
||||
hbase.client.scanner.timeout.period=30000
|
||||
|
||||
##hbase baseline表名
|
||||
hbase.baseline.table.name=ddos_traffic_baselines
|
||||
hbase.baseline.table.name=dos:ddos_traffic_baselines
|
||||
|
||||
#读取baseline限制
|
||||
hbase.baseline.total.num=1000000
|
||||
@@ -72,8 +77,10 @@ data.center.id.num=15
|
||||
|
||||
#IP mmdb库路径
|
||||
ip.mmdb.path=D:\\data\\dat\\
|
||||
#ip.mmdb.path=/home/bigdata/topology/dat/
|
||||
#ip.mmdb.path=/home/bigdata/wlh/topology/dos-detection/dat/
|
||||
#ip.mmdb.path=ip.mmdb.path=/home/ceiec/topology/dat/
|
||||
|
||||
#敏感阈值,速率小于此值不报警
|
||||
sensitivity.threshold=100
|
||||
|
||||
#基于baseline判定dos攻击的上下限
|
||||
baseline.sessions.minor.threshold=0.1
|
||||
@@ -82,34 +89,5 @@ baseline.sessions.major.threshold=1
|
||||
baseline.sessions.severe.threshold=3
|
||||
baseline.sessions.critical.threshold=8
|
||||
|
||||
#bifang服务访问地址
|
||||
bifang.server.uri=http://192.168.44.3:80
|
||||
|
||||
#访问bifang服务用户名密码
|
||||
bifang.server.user=admin
|
||||
bifang.server.password=admin
|
||||
|
||||
#加密密码路径信息
|
||||
bifang.server.encryptpwd.path=/v1/user/encryptpwd
|
||||
|
||||
#登录bifang服务路径信息
|
||||
bifang.server.login.path=/v1/user/login
|
||||
|
||||
#获取静态阈值路径信息
|
||||
bifang.server.policy.threshold.path=/v1/policy/profile/DoS/detection/threshold
|
||||
|
||||
#http请求相关参数
|
||||
#最大连接数
|
||||
http.pool.max.connection=400
|
||||
|
||||
#单路由最大连接数
|
||||
http.pool.max.per.route=80
|
||||
|
||||
#向服务端请求超时时间设置(单位:毫秒)
|
||||
http.pool.request.timeout=60000
|
||||
|
||||
#向服务端连接超时时间设置(单位:毫秒)
|
||||
http.pool.connect.timeout=60000
|
||||
|
||||
#服务端响应超时时间设置(单位:毫秒)
|
||||
http.pool.response.timeout=60000
|
||||
#获取baseline周期,默认7天
|
||||
baseline.threshold.schedule.days=7
|
||||
@@ -1,7 +0,0 @@
|
||||
package com.zdjizhi.common;
|
||||
|
||||
public class HttpTest {
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
}
|
||||
}
|
||||
@@ -1,84 +0,0 @@
|
||||
package com.zdjizhi.common;
|
||||
|
||||
import inet.ipaddr.Address;
|
||||
import inet.ipaddr.AddressStringException;
|
||||
import inet.ipaddr.IPAddress;
|
||||
import inet.ipaddr.IPAddressString;
|
||||
import inet.ipaddr.format.util.AddressTrieMap;
|
||||
import inet.ipaddr.format.util.AssociativeAddressTrie;
|
||||
import inet.ipaddr.ipv4.IPv4Address;
|
||||
import inet.ipaddr.ipv4.IPv4AddressAssociativeTrie;
|
||||
import org.apache.flink.shaded.guava18.com.google.common.collect.Range;
|
||||
import org.apache.flink.shaded.guava18.com.google.common.collect.TreeRangeMap;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
|
||||
public class IpTest {
|
||||
public static void main(String[] args) throws Exception {
|
||||
IPv4AddressAssociativeTrie<Integer> trie = new IPv4AddressAssociativeTrie<>();
|
||||
|
||||
IPAddress str1 = new IPAddressString("1.2.3.4").getAddress();
|
||||
IPAddress str2 = new IPAddressString("10.0.0.0/15").getAddress();
|
||||
IPAddress str3 = new IPAddressString("25.4.2.0/23").getAddress();
|
||||
IPAddress str4 = new IPAddressString("192.168.8.0/21").getAddress();
|
||||
IPAddress str5 = new IPAddressString("240.0.0.0/4").getAddress();
|
||||
IPAddress str6 = new IPAddressString("fc00::0/64").getAddress();
|
||||
IPAddress str7 = new IPAddressString("fc00::10:1").getAddress();
|
||||
|
||||
TreeRangeMap<IPAddress, Object> rangeMap = TreeRangeMap.create();
|
||||
rangeMap.put(Range.closed(str1.getLower(),str1.getUpper()),1);
|
||||
rangeMap.put(Range.closed(str2.getLower(),str2.getUpper()),2);
|
||||
rangeMap.put(Range.closed(str3.getLower(),str3.getUpper()),3);
|
||||
rangeMap.put(Range.closed(str4.getLower(),str4.getUpper()),4);
|
||||
rangeMap.put(Range.closed(str5.getLower(),str5.getUpper()),5);
|
||||
rangeMap.put(Range.closed(str6.getLower(),str6.getUpper()),6);
|
||||
rangeMap.put(Range.closed(str7.getLower(),str7.getUpper()),7);
|
||||
|
||||
IPAddress pv4 = new IPAddressString("255.255.14.255").getAddress();
|
||||
IPAddress pv42 = new IPAddressString("1.2.3.4").getAddress();
|
||||
IPAddress pv43 = new IPAddressString("fc00::").getAddress();
|
||||
IPAddress pv44 = new IPAddressString("fc00::10:1").getAddress();
|
||||
|
||||
System.out.println(rangeMap.get(pv4));
|
||||
System.out.println(rangeMap.get(pv42));
|
||||
System.out.println(rangeMap.get(pv43));
|
||||
System.out.println(rangeMap.get(pv44));
|
||||
|
||||
/*
|
||||
System.out.println(str5.toSequentialRange());
|
||||
// System.out.println(str2.contains(new IPAddressString("10.0.0.2")));
|
||||
// System.out.println(str5.toAddress().toIPv4().toSequentialRange());
|
||||
|
||||
|
||||
trie.put(str1,1);
|
||||
trie.put(str2,2);
|
||||
trie.put(str3,3);
|
||||
trie.put(str4,4);
|
||||
trie.put(str5,5);
|
||||
|
||||
AddressTrieMap<IPv4Address, Integer> trieMap = new AddressTrieMap<>(trie);
|
||||
|
||||
|
||||
|
||||
trieMap.forEach((k,v) -> {
|
||||
System.out.println(k.toString() + "--" + v);
|
||||
});
|
||||
|
||||
System.out.println("-----------------");
|
||||
|
||||
trie.forEach((k) -> System.out.println(k.toString()));
|
||||
|
||||
System.out.println(str5.contains(pv4));
|
||||
System.out.println(trie.contains(pv4));
|
||||
System.out.println(trieMap.get(pv4));
|
||||
System.out.println(trieMap.containsKey(pv4));
|
||||
// System.out.println(trieMap.getRange());
|
||||
// IPAddress str3 = new IPAddressString("fc00::10:1").getAddress();
|
||||
// IPAddress str4 = new IPAddressString("fc00::10:2/64").getAddress();
|
||||
|
||||
// System.out.println(Arrays.toString(str1.mergeToPrefixBlocks(str2,str3,str4)));
|
||||
|
||||
*/
|
||||
}
|
||||
}
|
||||
@@ -1,17 +0,0 @@
|
||||
package com.zdjizhi.common;
|
||||
|
||||
import org.apache.flink.table.functions.TableFunction;
|
||||
import org.apache.flink.types.Row;
|
||||
|
||||
public class UdtfTest extends TableFunction<Row> {
|
||||
|
||||
public void eval(Row[] rows) {
|
||||
for (Row row : rows) {
|
||||
collect(row);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user