diff --git a/CHANGELOG.md b/CHANGELOG.md index 22339a4..c098da0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ # Changelog ### Hotfix - - [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常 \ No newline at end of file + - [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常 + +### Other + - 输出 SIP Record \ No newline at end of file diff --git a/pom.xml b/pom.xml index 9ad26c3..eceeb27 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.zdjizhi sip-rtp-correlation - 1.0-rc3 + 1.0-rc4 Flink : SIP-RTP : Correlation diff --git a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java index 58b5e53..5b699f3 100644 --- a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java +++ b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java @@ -4,8 +4,8 @@ import com.zdjizhi.flink.voip.conf.FusionConfiguration; import com.zdjizhi.flink.voip.error.ErrorHandler; import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema; import com.zdjizhi.flink.voip.functions.*; -import com.zdjizhi.flink.voip.records.Record; -import com.zdjizhi.flink.voip.records.SIPRecord; +import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; @@ -13,12 +13,13 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.formats.json.JsonNodeDeserializationSchema; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import java.time.Duration; + import static com.zdjizhi.flink.voip.conf.FusionConfigs.*; /** @@ -52,7 +53,13 @@ public class CorrelateApp { fusionConfiguration .getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX)); - final DataStreamSource sourceStream = env.addSource(kafkaConsumer); + final DataStream sourceStream = env.addSource(kafkaConsumer) + .assignTimestampsAndWatermarks( + WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(5)) + .withTimestampAssigner((SerializableTimestampAssigner) + (element, recordTimestamp) -> + element.get("common_start_timestamp_ms").asLong())); final ErrorHandler errorHandler = new ErrorHandler(config); @@ -92,7 +99,7 @@ public class CorrelateApp { new JsonNodeSerializationSchema(), fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)); - voIpOperator.addSink(producer); + voIpOperator.union(sipDoubleDirOperator).addSink(producer); env.execute("SIP-RTP-CORRELATION"); } diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java b/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java index c055db0..216e283 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/FunctionHelper.java @@ -26,7 +26,7 @@ public interface FunctionHelper extends RichFunction { } default void registerNextFireTimestamp(TimerService timerService, long interval) { - long current = timerService.currentProcessingTime(); - timerService.registerProcessingTimeTimer(current + interval); + long current = timerService.currentWatermark(); + timerService.registerEventTimeTimer(current + interval); } }