diff --git a/pom.xml b/pom.xml
index ea52841..89029f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -141,6 +141,7 @@
org.apache.flink
flink-json
${flink.version}
+ provided
diff --git a/src/main/java/com/zdjizhi/etl/DosDetection.java b/src/main/java/com/zdjizhi/etl/DosDetection.java
index fe30c70..1107377 100644
--- a/src/main/java/com/zdjizhi/etl/DosDetection.java
+++ b/src/main/java/com/zdjizhi/etl/DosDetection.java
@@ -7,6 +7,9 @@ import com.zdjizhi.sink.OutputStreamSink;
import com.zdjizhi.utils.IpUtils;
import com.zdjizhi.utils.SnowflakeId;
import org.apache.commons.lang.StringUtils;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
@@ -29,6 +32,10 @@ public class DosDetection extends BroadcastProcessFunction>>> descriptor = new MapStateDescriptor<>("boradcast-state",
+ Types.STRING,
+ new MapTypeInfo<>(String.class, new MapTypeInfo<>(String.class, (Class>) (Class>) List.class).getTypeClass()));
+
private final static NumberFormat PERCENT_INSTANCE = NumberFormat.getPercentInstance();
@Override
@@ -39,7 +46,7 @@ public class DosDetection extends BroadcastProcessFunction out) throws Exception {
try {
- Map>> broadcast = ctx.getBroadcastState(OutputStreamSink.descriptor).get("broadcast-state");
+ Map>> broadcast = ctx.getBroadcastState(descriptor).get("broadcast-state");
String destinationIp = value.getDestination_ip();
String attackType = value.getAttack_type();
logger.info("当前判断IP:{}, 类型: {}",destinationIp,attackType);
@@ -72,8 +79,12 @@ public class DosDetection extends BroadcastProcessFunction>> value, Context ctx, Collector out) throws Exception {
- ctx.getBroadcastState(OutputStreamSink.descriptor).put("broadcast-state", value);
+ public void processBroadcastElement(Map>> value, Context ctx, Collector out) {
+ try {
+ ctx.getBroadcastState(descriptor).put("broadcast-state", value);
+ }catch (Exception e){
+ logger.error("更新广播状态失败 {}",e);
+ }
}
public static void main(String[] args) {
@@ -86,8 +97,8 @@ public class DosDetection extends BroadcastProcessFunction flatSketchSource(){
- return DosSketchSource.createDosSketchSource().flatMap(new flatSketchLog());
+ return DosSketchSource.createDosSketchSource().flatMap(new FlatSketchLog());
}
private static WatermarkStrategy createWatermarkStrategy(){
@@ -35,7 +35,7 @@ public class ParseSketchLog {
.withTimestampAssigner((event, timestamp) -> event.getSketch_start_time() * 1000);
}
- private static class flatSketchLog implements FlatMapFunction {
+ private static class FlatSketchLog implements FlatMapFunction {
@Override
public void flatMap(String s, Collector collector) throws Exception {
try {
diff --git a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
index 755d9bf..a8fc74c 100644
--- a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
+++ b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java
@@ -34,7 +34,7 @@ public class OutputStreamSink {
public static OutputTag outputTag = new OutputTag("traffic server ip metrics"){};
- public static MapStateDescriptor>>> descriptor = new MapStateDescriptor<>("boradcast-state",
+ private static MapStateDescriptor>>> descriptor = new MapStateDescriptor<>("boradcast-state",
Types.STRING,
new MapTypeInfo<>(String.class, new MapTypeInfo<>(String.class, (Class>) (Class>) List.class).getTypeClass()));
@@ -46,7 +46,7 @@ public class OutputStreamSink {
TrafficServerIpMetricsSink.sideOutputMetricsSink(middleStream);
FlinkEnvironmentUtils.streamExeEnv.execute(CommonConfig.STREAM_EXECUTION_JOB_NAME);
} catch (Exception e) {
- logger.error("");
+ logger.error("任务启动失败 {}",e);
}
}
diff --git a/src/main/java/com/zdjizhi/source/BaselineSource.java b/src/main/java/com/zdjizhi/source/BaselineSource.java
index 9998dc3..a6d0429 100644
--- a/src/main/java/com/zdjizhi/source/BaselineSource.java
+++ b/src/main/java/com/zdjizhi/source/BaselineSource.java
@@ -63,6 +63,7 @@ public class BaselineSource extends RichSourceFunction