GAL-352 DoS检测适配Fastjson2序列化库
This commit is contained in:
7
pom.xml
7
pom.xml
@@ -279,6 +279,13 @@
|
|||||||
</exclusions>
|
</exclusions>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.fastjson2</groupId>
|
||||||
|
<artifactId>fastjson2</artifactId>
|
||||||
|
<version>2.0.32</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.alibaba.nacos</groupId>
|
<groupId>com.alibaba.nacos</groupId>
|
||||||
<artifactId>nacos-client</artifactId>
|
<artifactId>nacos-client</artifactId>
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
package com.zdjizhi.sink;
|
package com.zdjizhi.sink;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
import com.zdjizhi.common.CommonConfig;
|
import com.zdjizhi.common.CommonConfig;
|
||||||
import com.zdjizhi.common.DosEventLog;
|
import com.zdjizhi.common.DosEventLog;
|
||||||
import com.zdjizhi.utils.JsonMapper;
|
//import com.zdjizhi.utils.JsonMapper;
|
||||||
import com.zdjizhi.utils.KafkaUtils;
|
import com.zdjizhi.utils.KafkaUtils;
|
||||||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||||
|
|
||||||
@@ -13,7 +14,8 @@ class DosEventSink {
|
|||||||
static void dosEventOutputSink(SingleOutputStreamOperator<DosEventLog> dosEventLogOutputStream){
|
static void dosEventOutputSink(SingleOutputStreamOperator<DosEventLog> dosEventLogOutputStream){
|
||||||
dosEventLogOutputStream
|
dosEventLogOutputStream
|
||||||
.filter(Objects::nonNull)
|
.filter(Objects::nonNull)
|
||||||
.map(JsonMapper::toJsonString)
|
// .map(JsonMapper::toJsonString)
|
||||||
|
.map(JSONObject::toJSONString)
|
||||||
.addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
|
.addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME))
|
||||||
.setParallelism(CommonConfig.KAFKA_OUTPUT_EVENT_PARALLELISM);
|
.setParallelism(CommonConfig.KAFKA_OUTPUT_EVENT_PARALLELISM);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package com.zdjizhi.sink;
|
package com.zdjizhi.sink;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
import com.zdjizhi.common.CommonConfig;
|
import com.zdjizhi.common.CommonConfig;
|
||||||
import com.zdjizhi.common.DosMetricsLog;
|
import com.zdjizhi.common.DosMetricsLog;
|
||||||
import com.zdjizhi.common.DosSketchLog;
|
import com.zdjizhi.common.DosSketchLog;
|
||||||
@@ -14,7 +15,8 @@ class TrafficServerIpMetricsSink {
|
|||||||
|
|
||||||
static void sideOutputMetricsSink(SingleOutputStreamOperator<DosSketchLog> outputStream){
|
static void sideOutputMetricsSink(SingleOutputStreamOperator<DosSketchLog> outputStream){
|
||||||
DataStream<DosMetricsLog> sideOutput = outputStream.getSideOutput(outputTag);
|
DataStream<DosMetricsLog> sideOutput = outputStream.getSideOutput(outputTag);
|
||||||
sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
|
// sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
|
||||||
|
sideOutput.map(JSONObject::toJSONString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME))
|
||||||
.setParallelism(CommonConfig.KAFKA_OUTPUT_METRIC_PARALLELISM);
|
.setParallelism(CommonConfig.KAFKA_OUTPUT_METRIC_PARALLELISM);
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user