14 Commits

Author SHA1 Message Date
梁超
970977ba3c Merge branch 'hotfix/output-sip' into 'release/1.0'
hotfix: add sip record output

See merge request galaxy/tsg_olap/sip-rtp-correlation!11
2023-10-12 02:44:32 +00:00
chaoc
732d9f5aa9 style: update version 2023-10-12 10:42:48 +08:00
chaoc
96fa19aea1 fix: use event time timer 2023-10-12 10:36:44 +08:00
chaoc
1b7c33d078 fix: output sip record 2023-10-12 10:36:29 +08:00
liang chao
35e2807a91 Merge branch 'hotfix/rename' into 'release/1.0'
style: rename job name

See merge request galaxy/tsg_olap/sip-rtp-correlation!9
2023-08-28 03:29:10 +00:00
chaoc
2275f349d1 Merge remote-tracking branch 'origin/release/1.0' into hotfix/rename 2023-08-28 11:27:23 +08:00
chaoc
1fedfbe4b8 style: add plugin reproducible 2023-08-28 11:26:25 +08:00
chaoc
b2f15b3919 style: modify job name 2023-08-28 11:11:02 +08:00
liang chao
8fc8cc7c2d Merge branch 'hotfix/null-point-err' into 'release/1.0'
style: update versions

See merge request galaxy/tsg_olap/sip-rtp-correlation!8
2023-08-21 09:09:25 +00:00
chaoc
98bb843159 style: update versions 2023-08-21 17:08:51 +08:00
liang chao
edb044596e Merge branch 'hotfix/null-point-err' into 'release/1.0'
docs: add hotfix changelog

See merge request galaxy/tsg_olap/sip-rtp-correlation!7
2023-08-21 09:06:49 +00:00
chaoc
557156af79 docs: add hotfix changelog 2023-08-21 17:06:22 +08:00
liang chao
32a811fb1c Merge branch 'hotfix/null-point-err' into 'release/1.0'
fix(utils): fix null point err

See merge request galaxy/tsg_olap/sip-rtp-correlation!6
2023-08-21 08:59:11 +00:00
chaoc
36cbaebf0c fix(utils): fix null point err 2023-08-21 16:57:13 +08:00
5 changed files with 62 additions and 17 deletions

7
CHANGELOG.md Normal file
View File

@@ -0,0 +1,7 @@
# Changelog
### Hotfix
- [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常
### Other
- 输出 SIP Record

16
pom.xml
View File

@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>1.0-rc1</version>
<version>1.0-rc4</version>
<name>Flink : SIP-RTP : Correlation</name>
@@ -285,6 +285,20 @@
</executions>
</plugin>
<plugin>
<groupId>io.github.zlika</groupId>
<artifactId>reproducible-build-maven-plugin</artifactId>
<version>0.2</version>
<executions>
<execution>
<goals>
<goal>strip-jar</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>

View File

@@ -4,8 +4,8 @@ import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.error.ErrorHandler;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
import com.zdjizhi.flink.voip.functions.*;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
@@ -13,12 +13,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.time.Duration;
import static com.zdjizhi.flink.voip.conf.FusionConfigs.*;
/**
@@ -52,7 +53,13 @@ public class CorrelateApp {
fusionConfiguration
.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
final DataStreamSource<ObjectNode> sourceStream = env.addSource(kafkaConsumer);
final DataStream<ObjectNode> sourceStream = env.addSource(kafkaConsumer)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
(element, recordTimestamp) ->
element.get("common_start_timestamp_ms").asLong()));
final ErrorHandler errorHandler = new ErrorHandler(config);
@@ -92,8 +99,8 @@ public class CorrelateApp {
new JsonNodeSerializationSchema(),
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
voIpOperator.addSink(producer);
voIpOperator.union(sipDoubleDirOperator).addSink(producer);
env.execute("VoIP Fusion Job");
env.execute("SIP-RTP-CORRELATION");
}
}

View File

@@ -104,16 +104,16 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
record.getServerPort() >= 0;
final SIPRecord sipRecord = new SIPRecord(obj);
boolean cond2 = !IPUtil.isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| IPUtil.internalIp(sipRecord.getOriginatorSdpConnectIp());
boolean cond3 = !IPUtil.isIPAddress(sipRecord.getResponderSdpConnectIp())
|| IPUtil.internalIp(sipRecord.getResponderSdpConnectIp());
boolean cond4 = IPUtil.isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| IPUtil.isIPAddress(sipRecord.getResponderSdpConnectIp());
boolean cond2 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isInternalIp(sipRecord.getOriginatorSdpConnectIp());
boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
|| isInternalIp(sipRecord.getResponderSdpConnectIp());
boolean cond4 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
IPUtil.internalIp(sipRecord.getResponderSdpConnectIp()) &&
IPUtil.internalIp(sipRecord.getOriginatorSdpConnectIp());
isInternalIp(sipRecord.getResponderSdpConnectIp()) &&
isInternalIp(sipRecord.getOriginatorSdpConnectIp());
// Both client and server addresses in the data are valid.
if (cond1 && (
@@ -132,4 +132,21 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
}
}
}
// ======================================================================================
// ----------------------------------- private helper -----------------------------------
private static boolean isIPAddress(final String ipaddr) {
if (null == ipaddr) {
return false;
}
return IPUtil.isIPAddress(ipaddr);
}
private static boolean isInternalIp(final String ipaddr) {
if (!isIPAddress(ipaddr)) {
return false;
}
return IPUtil.internalIp(ipaddr);
}
}

View File

@@ -26,7 +26,7 @@ public interface FunctionHelper extends RichFunction {
}
default void registerNextFireTimestamp(TimerService timerService, long interval) {
long current = timerService.currentProcessingTime();
timerService.registerProcessingTimeTimer(current + interval);
long current = timerService.currentWatermark();
timerService.registerEventTimeTimer(current + interval);
}
}