Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
970977ba3c | ||
|
|
732d9f5aa9 | ||
|
|
96fa19aea1 | ||
|
|
1b7c33d078 |
@@ -1,4 +1,7 @@
|
||||
# Changelog
|
||||
|
||||
### Hotfix
|
||||
- [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常
|
||||
- [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常
|
||||
|
||||
### Other
|
||||
- 输出 SIP Record
|
||||
2
pom.xml
2
pom.xml
@@ -7,7 +7,7 @@
|
||||
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>sip-rtp-correlation</artifactId>
|
||||
<version>1.0-rc3</version>
|
||||
<version>1.0-rc4</version>
|
||||
|
||||
<name>Flink : SIP-RTP : Correlation</name>
|
||||
|
||||
|
||||
@@ -4,8 +4,8 @@ import com.zdjizhi.flink.voip.conf.FusionConfiguration;
|
||||
import com.zdjizhi.flink.voip.error.ErrorHandler;
|
||||
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
|
||||
import com.zdjizhi.flink.voip.functions.*;
|
||||
import com.zdjizhi.flink.voip.records.Record;
|
||||
import com.zdjizhi.flink.voip.records.SIPRecord;
|
||||
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
|
||||
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
|
||||
import org.apache.flink.api.java.functions.KeySelector;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.api.java.utils.ParameterTool;
|
||||
@@ -13,12 +13,13 @@ import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
||||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import static com.zdjizhi.flink.voip.conf.FusionConfigs.*;
|
||||
|
||||
/**
|
||||
@@ -52,7 +53,13 @@ public class CorrelateApp {
|
||||
fusionConfiguration
|
||||
.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
|
||||
|
||||
final DataStreamSource<ObjectNode> sourceStream = env.addSource(kafkaConsumer);
|
||||
final DataStream<ObjectNode> sourceStream = env.addSource(kafkaConsumer)
|
||||
.assignTimestampsAndWatermarks(
|
||||
WatermarkStrategy
|
||||
.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
|
||||
.withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
|
||||
(element, recordTimestamp) ->
|
||||
element.get("common_start_timestamp_ms").asLong()));
|
||||
|
||||
final ErrorHandler errorHandler = new ErrorHandler(config);
|
||||
|
||||
@@ -92,7 +99,7 @@ public class CorrelateApp {
|
||||
new JsonNodeSerializationSchema(),
|
||||
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
|
||||
|
||||
voIpOperator.addSink(producer);
|
||||
voIpOperator.union(sipDoubleDirOperator).addSink(producer);
|
||||
|
||||
env.execute("SIP-RTP-CORRELATION");
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ public interface FunctionHelper extends RichFunction {
|
||||
}
|
||||
|
||||
default void registerNextFireTimestamp(TimerService timerService, long interval) {
|
||||
long current = timerService.currentProcessingTime();
|
||||
timerService.registerProcessingTimeTimer(current + interval);
|
||||
long current = timerService.currentWatermark();
|
||||
timerService.registerEventTimeTimer(current + interval);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user