diff --git a/pom.xml b/pom.xml index 77588e3..df61106 100644 --- a/pom.xml +++ b/pom.xml @@ -279,6 +279,13 @@ + + + com.alibaba.fastjson2 + fastjson2 + 2.0.32 + + com.alibaba.nacos nacos-client diff --git a/src/main/java/com/zdjizhi/sink/DosEventSink.java b/src/main/java/com/zdjizhi/sink/DosEventSink.java index 2501883..18694ba 100644 --- a/src/main/java/com/zdjizhi/sink/DosEventSink.java +++ b/src/main/java/com/zdjizhi/sink/DosEventSink.java @@ -1,8 +1,9 @@ package com.zdjizhi.sink; +import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.DosEventLog; -import com.zdjizhi.utils.JsonMapper; +//import com.zdjizhi.utils.JsonMapper; import com.zdjizhi.utils.KafkaUtils; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -13,7 +14,8 @@ class DosEventSink { static void dosEventOutputSink(SingleOutputStreamOperator dosEventLogOutputStream){ dosEventLogOutputStream .filter(Objects::nonNull) - .map(JsonMapper::toJsonString) +// .map(JsonMapper::toJsonString) + .map(JSONObject::toJSONString) .addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_EVENT_TOPIC_NAME)) .setParallelism(CommonConfig.KAFKA_OUTPUT_EVENT_PARALLELISM); } diff --git a/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java b/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java index 5f4a235..7c5faa4 100644 --- a/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java +++ b/src/main/java/com/zdjizhi/sink/TrafficServerIpMetricsSink.java @@ -1,5 +1,6 @@ package com.zdjizhi.sink; +import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.DosMetricsLog; import com.zdjizhi.common.DosSketchLog; @@ -14,7 +15,8 @@ class TrafficServerIpMetricsSink { static void sideOutputMetricsSink(SingleOutputStreamOperator outputStream){ DataStream sideOutput = outputStream.getSideOutput(outputTag); - sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME)) +// sideOutput.map(JsonMapper::toJsonString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME)) + sideOutput.map(JSONObject::toJSONString).addSink(KafkaUtils.getKafkaSink(CommonConfig.KAFKA_OUTPUT_METRIC_TOPIC_NAME)) .setParallelism(CommonConfig.KAFKA_OUTPUT_METRIC_PARALLELISM);