Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2542a8bfd2 | ||
|
|
1c0259a95c | ||
|
|
37f49c40d5 | ||
|
|
beef47df4c | ||
|
|
6d77d1c3c0 |
2
pom.xml
2
pom.xml
@@ -7,7 +7,7 @@
|
||||
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>sip-rtp-correlation</artifactId>
|
||||
<version>1.2-rc2</version>
|
||||
<version>1.2-rc3</version>
|
||||
|
||||
<name>Flink : SIP-RTP : Correlation</name>
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ import com.zdjizhi.flink.voip.records.SIPRecord;
|
||||
import com.zdjizhi.flink.voip.records.SchemaType;
|
||||
import com.zdjizhi.flink.voip.records.StreamDir;
|
||||
import com.zdjizhi.utils.IPUtil;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
@@ -108,8 +107,8 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
|
||||
Collector<ObjectNode> out) throws Exception {
|
||||
final Record record = new Record(obj);
|
||||
// Check for invalid or meaningless addresses and ports
|
||||
boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
|
||||
StringUtils.isNotBlank(record.getServerIp()) &&
|
||||
boolean cond1 = isIPAddress(record.getClientIp()) &&
|
||||
isIPAddress(record.getServerIp()) &&
|
||||
record.getClientPort() > 0 &&
|
||||
record.getServerPort() > 0;
|
||||
|
||||
@@ -125,8 +124,12 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
|
||||
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
|
||||
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
|
||||
|
||||
boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) &&
|
||||
isIPAddress(sipRecord.getResponderSdpConnectIp()) &&
|
||||
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
|
||||
|
||||
// Both client and server addresses in the data are valid.
|
||||
if (cond1 && (
|
||||
if (cond1 && (!cond5 || cond7) && (
|
||||
// The address in the SIP one-way stream is valid and not an internal network address.
|
||||
cond2 && cond3 && cond4 && cond5
|
||||
// The coordinating addresses in the SIP double directional stream are valid
|
||||
|
||||
@@ -83,6 +83,7 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
|
||||
rtpRecord.merge(sipObj)
|
||||
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
|
||||
out.collect(rtpObj);
|
||||
iterator.remove();
|
||||
|
||||
switch (entry.getKey()) {
|
||||
case S2C:
|
||||
|
||||
Reference in New Issue
Block a user