4 Commits

Author SHA1 Message Date
liang chao
b9463f07ac Merge branch 'release/1.0' into 'main'
merge: 1.0-rc3

See merge request galaxy/tsg_olap/sip-rtp-correlation!10
2023-08-28 03:30:06 +00:00
liang chao
da572f4bd0 Merge branch 'release/1.0' into 'main'
merge: 1.0.rc1

See merge request galaxy/tsg_olap/sip-rtp-correlation!5
2023-08-16 03:05:22 +00:00
liang chao
77cdd73f02 Merge branch 'hotfix/no-collect-expire-data' into 'main'
fix: cannot collect data due to expiration

See merge request galaxy/tsg_olap/sip-rtp-correlation!4
2023-08-11 06:18:46 +00:00
liang chao
5481a7b9ee Merge branch 'feature/address-keyby-impl' into 'main'
feature: develop job using java

See merge request galaxy/tsg_olap/sip-rtp-correlation!3
2023-08-10 09:39:51 +00:00
4 changed files with 9 additions and 19 deletions

View File

@@ -1,7 +1,4 @@
# Changelog
### Hotfix
- [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常
### Other
- 输出 SIP Record
- [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常

View File

@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>1.0-rc4</version>
<version>1.0-rc3</version>
<name>Flink : SIP-RTP : Correlation</name>

View File

@@ -4,8 +4,8 @@ import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.error.ErrorHandler;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
import com.zdjizhi.flink.voip.functions.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
@@ -13,13 +13,12 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.time.Duration;
import static com.zdjizhi.flink.voip.conf.FusionConfigs.*;
/**
@@ -53,13 +52,7 @@ public class CorrelateApp {
fusionConfiguration
.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
final DataStream<ObjectNode> sourceStream = env.addSource(kafkaConsumer)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
(element, recordTimestamp) ->
element.get("common_start_timestamp_ms").asLong()));
final DataStreamSource<ObjectNode> sourceStream = env.addSource(kafkaConsumer);
final ErrorHandler errorHandler = new ErrorHandler(config);
@@ -99,7 +92,7 @@ public class CorrelateApp {
new JsonNodeSerializationSchema(),
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
voIpOperator.union(sipDoubleDirOperator).addSink(producer);
voIpOperator.addSink(producer);
env.execute("SIP-RTP-CORRELATION");
}

View File

@@ -26,7 +26,7 @@ public interface FunctionHelper extends RichFunction {
}
default void registerNextFireTimestamp(TimerService timerService, long interval) {
long current = timerService.currentWatermark();
timerService.registerEventTimeTimer(current + interval);
long current = timerService.currentProcessingTime();
timerService.registerProcessingTimeTimer(current + interval);
}
}