Merge branch 'tsg-22.06' of git.mesalab.cn:bigdata/tsg/flink-dos-detection
This commit is contained in:
@@ -64,6 +64,8 @@ public class CommonConfig {
|
||||
public static final String BIFANG_SERVER_LOGIN_PATH = CommonConfigurations.getStringProperty("bifang.server.login.path");
|
||||
public static final String BIFANG_SERVER_POLICY_THRESHOLD_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.threshold.path");
|
||||
|
||||
public static final String BIFANG_SERVER_POLICY_VSYSID_PATH = CommonConfigurations.getStringProperty("bifang.server.policy.vaysid.path");
|
||||
|
||||
public static final int HTTP_POOL_MAX_CONNECTION = CommonConfigurations.getIntProperty("http.pool.max.connection");
|
||||
public static final int HTTP_POOL_MAX_PER_ROUTE = CommonConfigurations.getIntProperty("http.pool.max.per.route");
|
||||
public static final int HTTP_POOL_REQUEST_TIMEOUT = CommonConfigurations.getIntProperty("http.pool.request.timeout");
|
||||
|
||||
22
src/main/java/com/zdjizhi/common/DosVsysId.java
Normal file
22
src/main/java/com/zdjizhi/common/DosVsysId.java
Normal file
@@ -0,0 +1,22 @@
|
||||
package com.zdjizhi.common;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class DosVsysId {
|
||||
private int vsysId;
|
||||
|
||||
public int getVsysId() {
|
||||
return vsysId;
|
||||
}
|
||||
|
||||
public void setVsysId(int vsysId) {
|
||||
this.vsysId = vsysId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DosVsysId{" +
|
||||
"vsysId=" + vsysId +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -193,7 +193,7 @@ public class DosDetection extends RichMapFunction<DosSketchLog, DosEventLog> {
|
||||
switch (type) {
|
||||
case STATIC_CONDITION_TYPE:
|
||||
return new StrBuilder()
|
||||
.append(tag).append(" > ")
|
||||
.append("Rate > ")
|
||||
.append(base).append(" ")
|
||||
.append(tag).append("/s")
|
||||
.toString();
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.zdjizhi.etl;
|
||||
import com.fasterxml.jackson.databind.JavaType;
|
||||
import com.zdjizhi.common.CommonConfig;
|
||||
import com.zdjizhi.common.DosDetectionThreshold;
|
||||
import com.zdjizhi.common.DosVsysId;
|
||||
import com.zdjizhi.utils.HttpClientUtils;
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import inet.ipaddr.IPAddress;
|
||||
@@ -29,6 +30,7 @@ public class ParseStaticThreshold {
|
||||
private static JsonMapper jsonMapperInstance = JsonMapper.getInstance();
|
||||
private static JavaType hashmapJsonType = jsonMapperInstance.createCollectionType(HashMap.class, String.class, Object.class);
|
||||
private static JavaType thresholdType = jsonMapperInstance.createCollectionType(ArrayList.class, DosDetectionThreshold.class);
|
||||
private static JavaType vsysIDType = jsonMapperInstance.createCollectionType(ArrayList.class, DosVsysId.class);
|
||||
|
||||
static {
|
||||
//加载加密登录密码
|
||||
@@ -99,19 +101,18 @@ public class ParseStaticThreshold {
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取静态阈值配置列表
|
||||
* 获取vsysId配置列表
|
||||
*
|
||||
* @return thresholds
|
||||
* @return vsysIdList
|
||||
*/
|
||||
private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() {
|
||||
ArrayList<DosDetectionThreshold> thresholds = null;
|
||||
private static ArrayList<DosVsysId> getVsysId() {
|
||||
ArrayList<DosVsysId> vsysIdList = null;
|
||||
try {
|
||||
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
|
||||
HashMap<String, Object> parms = new HashMap<>();
|
||||
parms.put("pageSize", -1);
|
||||
parms.put("orderBy", "profileId asc");
|
||||
parms.put("isValid", 1);
|
||||
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
|
||||
parms.put("orderBy", "vsysId desc");
|
||||
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_VSYSID_PATH, parms);
|
||||
String token = CommonConfig.BIFANG_SERVER_TOKEN;
|
||||
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
|
||||
BasicHeader authorization = new BasicHeader("Authorization", token);
|
||||
@@ -125,19 +126,70 @@ public class ParseStaticThreshold {
|
||||
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
|
||||
Object list = data.get("list");
|
||||
if (list != null) {
|
||||
thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
|
||||
logger.info("获取到静态阈值配置{}条", thresholds.size());
|
||||
vsysIdList = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), vsysIDType);
|
||||
logger.info("获取到vsysId{}条", vsysIdList.size());
|
||||
} else {
|
||||
logger.warn("静态阈值配置为空");
|
||||
logger.warn("vsysIdList为空");
|
||||
}
|
||||
} else {
|
||||
logger.error(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("获取vsysId失败,请检查bifang服务或登录配置信息 ", e);
|
||||
}
|
||||
return vsysIdList;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据vsysId获取静态阈值配置列表
|
||||
*
|
||||
* @return thresholds
|
||||
*/
|
||||
private static ArrayList<DosDetectionThreshold> getDosDetectionThreshold() {
|
||||
ArrayList<DosDetectionThreshold> thresholds = null;
|
||||
// ArrayList<DosVsysId> vsysId = getVsysId();
|
||||
try {
|
||||
// if (vsysId != null){
|
||||
// for (DosVsysId dosVsysId : vsysId) {
|
||||
URIBuilder uriBuilder = new URIBuilder(CommonConfig.BIFANG_SERVER_URI);
|
||||
HashMap<String, Object> parms = new HashMap<>();
|
||||
parms.put("pageSize", -1);
|
||||
parms.put("orderBy", "profileId asc");
|
||||
parms.put("isValid", 1);
|
||||
// parms.put("vsysId", dosVsysId.getVsysId());
|
||||
parms.put("vsysId", 1);
|
||||
HttpClientUtils.setUrlWithParams(uriBuilder, CommonConfig.BIFANG_SERVER_POLICY_THRESHOLD_PATH, parms);
|
||||
String token = CommonConfig.BIFANG_SERVER_TOKEN;
|
||||
if (!HttpClientUtils.ERROR_MESSAGE.equals(token)) {
|
||||
BasicHeader authorization = new BasicHeader("Authorization", token);
|
||||
BasicHeader authorization1 = new BasicHeader("Content-Type", "application/x-www-form-urlencoded");
|
||||
String resposeJsonStr = HttpClientUtils.httpGet(uriBuilder.build(), authorization, authorization1);
|
||||
if (!HttpClientUtils.ERROR_MESSAGE.equals(resposeJsonStr)) {
|
||||
HashMap<String, Object> resposeMap = jsonMapperInstance.fromJson(resposeJsonStr, hashmapJsonType);
|
||||
boolean success = (boolean) resposeMap.get("success");
|
||||
String msg = resposeMap.get("msg").toString();
|
||||
if (success) {
|
||||
HashMap<String, Object> data = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(resposeMap.get("data")), hashmapJsonType);
|
||||
Object list = data.get("list");
|
||||
if (list != null) {
|
||||
thresholds = jsonMapperInstance.fromJson(jsonMapperInstance.toJson(list), thresholdType);
|
||||
logger.info("获取到静态阈值配置{}条", thresholds.size());
|
||||
} else {
|
||||
logger.warn("静态阈值配置为空");
|
||||
}
|
||||
} else {
|
||||
logger.error(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
// }
|
||||
// }
|
||||
} catch (Exception e) {
|
||||
logger.error("获取静态阈值配置失败,请检查bifang服务或登录配置信息 ", e);
|
||||
}
|
||||
|
||||
return thresholds;
|
||||
}
|
||||
|
||||
@@ -196,7 +248,6 @@ public class ParseStaticThreshold {
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
ArrayList<DosDetectionThreshold> dosDetectionThreshold = getDosDetectionThreshold();
|
||||
dosDetectionThreshold.forEach(System.out::println);
|
||||
|
||||
@@ -214,7 +265,8 @@ public class ParseStaticThreshold {
|
||||
}
|
||||
System.out.println("------------------------");
|
||||
}
|
||||
|
||||
// String s = loginBifangServer();
|
||||
// System.out.println(s);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package com.zdjizhi.utils;
|
||||
|
||||
import com.zdjizhi.common.CommonConfig;
|
||||
import org.apache.flink.streaming.api.CheckpointingMode;
|
||||
import org.apache.flink.streaming.api.environment.CheckpointConfig;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
|
||||
|
||||
@@ -12,6 +14,33 @@ public class FlinkEnvironmentUtils {
|
||||
|
||||
static {
|
||||
streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
|
||||
|
||||
/*
|
||||
// 每 1000ms 开始一次 checkpoint
|
||||
streamExeEnv.enableCheckpointing(CommonConfig.FLINK_WINDOW_MAX_TIME * 1000);
|
||||
|
||||
// 设置模式为精确一次 (这是默认值)
|
||||
streamExeEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
|
||||
|
||||
// 确认 checkpoints 之间的时间会进行 500 ms
|
||||
streamExeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
|
||||
|
||||
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
|
||||
streamExeEnv.getCheckpointConfig().setCheckpointTimeout(60000);
|
||||
|
||||
// 允许两个连续的 checkpoint 错误
|
||||
streamExeEnv.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
|
||||
|
||||
// 同一时间只允许一个 checkpoint 进行
|
||||
streamExeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
|
||||
|
||||
// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
|
||||
streamExeEnv.getCheckpointConfig().enableExternalizedCheckpoints(
|
||||
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
|
||||
|
||||
// 开启实验性的 unaligned checkpoints
|
||||
streamExeEnv.getCheckpointConfig().enableUnalignedCheckpoints();
|
||||
*/
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -22,12 +22,14 @@ public class KafkaUtils {
|
||||
}
|
||||
|
||||
public static FlinkKafkaProducer<String> getKafkaSink(String topic){
|
||||
return new FlinkKafkaProducer<String>(
|
||||
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
|
||||
topic,
|
||||
new SimpleStringSchema(),
|
||||
getKafkaSinkProperty(),
|
||||
Optional.empty()
|
||||
);
|
||||
kafkaProducer.setLogFailuresOnly(true);
|
||||
return kafkaProducer;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -15,23 +15,25 @@ kafka.input.topic.name=DOS-SKETCH-RECORD
|
||||
kafka.input.bootstrap.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
|
||||
|
||||
#读取kafka group id
|
||||
kafka.input.group.id=2203241552
|
||||
kafka.input.group.id=2112080949
|
||||
#kafka.input.group.id=dos-detection-job-210813-1
|
||||
|
||||
#发送kafka metrics并行度大小
|
||||
kafka.output.metric.parallelism=1
|
||||
|
||||
#发送kafka metrics topic名
|
||||
kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
|
||||
#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
|
||||
kafka.output.metric.topic.name=test
|
||||
|
||||
#发送kafka event并行度大小
|
||||
kafka.output.event.parallelism=1
|
||||
|
||||
#发送kafka event topic名
|
||||
kafka.output.event.topic.name=DOS-EVENT
|
||||
#kafka.output.event.topic.name=DOS-EVENT
|
||||
kafka.output.event.topic.name=storm-dos-test
|
||||
|
||||
#kafka输出地址
|
||||
kafka.output.bootstrap.servers=192.168.40.223:9094
|
||||
kafka.output.bootstrap.servers=192.168.44.12:9094
|
||||
#kafka.output.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
|
||||
|
||||
#zookeeper地址
|
||||
@@ -102,6 +104,9 @@ bifang.server.encryptpwd.path=/v1/user/encryptpwd
|
||||
#登录bifang服务路径信息
|
||||
bifang.server.login.path=/v1/user/login
|
||||
|
||||
#获取vaysId路径信息
|
||||
bifang.server.policy.vaysid.path=/v1/system/vsys/
|
||||
|
||||
#获取静态阈值路径信息
|
||||
bifang.server.policy.threshold.path=/v1/policy/profile/DoS/detection/threshold
|
||||
|
||||
@@ -130,6 +135,7 @@ baseline.threshold.schedule.days=1
|
||||
#kafka用户认证配置参数
|
||||
sasl.jaas.config.user=admin
|
||||
#sasl.jaas.config.password=galaxy2019
|
||||
#sasl.jaas.config.password=ENC(6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ)
|
||||
sasl.jaas.config.password=6MleDyA3Z73HSaXiKsDJ2k7Ys8YWLhEJ
|
||||
|
||||
#是否开启kafka用户认证配置,1:是;0:否
|
||||
|
||||
Reference in New Issue
Block a user