diff --git a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java index 2f1b767..98bbbe0 100644 --- a/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java +++ b/src/main/java/com/zdjizhi/flink/voip/error/ErrorHandler.java @@ -9,7 +9,6 @@ import com.zdjizhi.flink.voip.records.SIPRecord; import com.zdjizhi.flink.voip.records.SchemaType; import com.zdjizhi.flink.voip.records.StreamDir; import com.zdjizhi.utils.IPUtil; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; @@ -108,8 +107,8 @@ class MeaninglessAddressProcessFunction extends ProcessFunction out) throws Exception { final Record record = new Record(obj); // Check for invalid or meaningless addresses and ports - boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) && - StringUtils.isNotBlank(record.getServerIp()) && + boolean cond1 = isIPAddress(record.getClientIp()) && + isIPAddress(record.getServerIp()) && record.getClientPort() > 0 && record.getServerPort() > 0; @@ -125,8 +124,12 @@ class MeaninglessAddressProcessFunction extends ProcessFunction 0 && sipRecord.getOriginatorSdpMediaPort() > 0; + // Both client and server addresses in the data are valid. - if (cond1 && ( + if (cond1 && (!cond5 || cond7) && ( // The address in the SIP one-way stream is valid and not an internal network address. cond2 && cond3 && cond4 && cond5 // The coordinating addresses in the SIP double directional stream are valid diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java index 19dc44e..a08f86b 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java @@ -83,6 +83,7 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction