diff --git a/pom.xml b/pom.xml index 460e46d..98af14f 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.zdjizhi sip-rtp-correlation - 1.1-rc2 + 1.1-rc3 Flink : SIP-RTP : Correlation @@ -24,6 +24,18 @@ 2.13.2.20220328 + + + platform-releases + http://192.168.40.153:8099/content/repositories/platform-release + true + + + platform-snapshots + http://192.168.40.153:8099/content/repositories/platform-snapshot + + + org.apache.flink diff --git a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java index 5b699f3..ecad56f 100644 --- a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java +++ b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java @@ -99,7 +99,9 @@ public class CorrelateApp { new JsonNodeSerializationSchema(), fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX)); - voIpOperator.union(sipDoubleDirOperator).addSink(producer); + voIpOperator + .union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG)) + .addSink(producer); env.execute("SIP-RTP-CORRELATION"); } diff --git a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java index 2f1b767..98bbbe0 100644 --- a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java +++ b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java @@ -9,7 +9,6 @@ import com.zdjizhi.flink.voip.records.SIPRecord; import com.zdjizhi.flink.voip.records.SchemaType; import com.zdjizhi.flink.voip.records.StreamDir; import com.zdjizhi.utils.IPUtil; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; @@ -108,8 +107,8 @@ class MeaninglessAddressProcessFunction extends ProcessFunction out) throws Exception { final Record record = new Record(obj); // Check for invalid or meaningless addresses and ports - boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) && - StringUtils.isNotBlank(record.getServerIp()) && + boolean cond1 = isIPAddress(record.getClientIp()) && + isIPAddress(record.getServerIp()) && record.getClientPort() > 0 && record.getServerPort() > 0; @@ -125,8 +124,12 @@ class MeaninglessAddressProcessFunction extends ProcessFunction 0 && sipRecord.getOriginatorSdpMediaPort() > 0; + // Both client and server addresses in the data are valid. - if (cond1 && ( + if (cond1 && (!cond5 || cond7) && ( // The address in the SIP one-way stream is valid and not an internal network address. cond2 && cond3 && cond4 && cond5 // The coordinating addresses in the SIP double directional stream are valid diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java index 6b9f8d2..274da5d 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/SIPPairingFunction.java @@ -7,11 +7,13 @@ import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; /** * A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction. @@ -23,6 +25,9 @@ import org.apache.flink.util.Collector; public class SIPPairingFunction extends KeyedProcessFunction, ObjectNode, ObjectNode> implements FunctionHelper { + public static final OutputTag SIP_OUTPUT_TAG = + new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class)); + private transient Time fireInterval; private transient ValueState valueState; @@ -63,7 +68,7 @@ public class SIPPairingFunction extends KeyedProcessFunction, ObjectNode, ObjectNode>.OnTimerContext ctx, Collector out) throws Exception { + final ObjectNode value = valueState.value(); + if (value != null) { + ctx.output(SIP_OUTPUT_TAG, value); + } valueState.clear(); } } diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java index 19dc44e..a08f86b 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java @@ -83,6 +83,7 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction