5 Commits

Author SHA1 Message Date
梁超
2542a8bfd2 Merge branch 'bugfix/repeat-voip' into 'main'
chore: update version

See merge request galaxy/tsg_olap/sip-rtp-correlation!21
2023-12-07 09:22:19 +00:00
chaoc
1c0259a95c chore: update version 2023-12-07 17:20:20 +08:00
梁超
37f49c40d5 Merge branch 'bugfix/repeat-voip' into 'main'
fix: repeated voip record

See merge request galaxy/tsg_olap/sip-rtp-correlation!20
2023-12-07 09:19:32 +00:00
chaoc
beef47df4c fix: add ipaddr non-empty check 2023-12-07 17:17:05 +08:00
chaoc
6d77d1c3c0 fix: remove rtp from state when fusion successfully 2023-12-07 17:15:11 +08:00
3 changed files with 9 additions and 5 deletions

View File

@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>1.2-rc2</version>
<version>1.2-rc3</version>
<name>Flink : SIP-RTP : Correlation</name>

View File

@@ -9,7 +9,6 @@ import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import com.zdjizhi.utils.IPUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -108,8 +107,8 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(obj);
// Check for invalid or meaningless addresses and ports
boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
StringUtils.isNotBlank(record.getServerIp()) &&
boolean cond1 = isIPAddress(record.getClientIp()) &&
isIPAddress(record.getServerIp()) &&
record.getClientPort() > 0 &&
record.getServerPort() > 0;
@@ -125,8 +124,12 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) &&
isIPAddress(sipRecord.getResponderSdpConnectIp()) &&
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
// Both client and server addresses in the data are valid.
if (cond1 && (
if (cond1 && (!cond5 || cond7) && (
// The address in the SIP one-way stream is valid and not an internal network address.
cond2 && cond3 && cond4 && cond5
// The coordinating addresses in the SIP double directional stream are valid

View File

@@ -83,6 +83,7 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
rtpRecord.merge(sipObj)
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj);
iterator.remove();
switch (entry.getKey()) {
case S2C: