2 Commits

Author SHA1 Message Date
梁超
93ed6bcddc Merge branch 'hotfix/duplicate-voip' into 'release/1.1'
merge: merge 1.2

See merge request galaxy/tsg_olap/sip-rtp-correlation!22
2023-12-08 06:36:00 +00:00
chaoc
4ab96737a1 merge: merge 1.2
fix:
1. Duplicate Voip
2. VSysID default value
3. Output sip when them not pair
2023-12-08 14:35:23 +08:00
6 changed files with 36 additions and 8 deletions

14
pom.xml
View File

@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>1.1-rc2</version>
<version>1.1-rc3</version>
<name>Flink : SIP-RTP : Correlation</name>
@@ -24,6 +24,18 @@
<jackson.version>2.13.2.20220328</jackson.version>
</properties>
<distributionManagement>
<repository>
<id>platform-releases</id>
<url>http://192.168.40.153:8099/content/repositories/platform-release</url>
<uniqueVersion>true</uniqueVersion>
</repository>
<snapshotRepository>
<id>platform-snapshots</id>
<url>http://192.168.40.153:8099/content/repositories/platform-snapshot</url>
</snapshotRepository>
</distributionManagement>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>

View File

@@ -99,7 +99,9 @@ public class CorrelateApp {
new JsonNodeSerializationSchema(),
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
voIpOperator.union(sipDoubleDirOperator).addSink(producer);
voIpOperator
.union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
.addSink(producer);
env.execute("SIP-RTP-CORRELATION");
}

View File

@@ -9,7 +9,6 @@ import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import com.zdjizhi.utils.IPUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -108,8 +107,8 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(obj);
// Check for invalid or meaningless addresses and ports
boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
StringUtils.isNotBlank(record.getServerIp()) &&
boolean cond1 = isIPAddress(record.getClientIp()) &&
isIPAddress(record.getServerIp()) &&
record.getClientPort() > 0 &&
record.getServerPort() > 0;
@@ -125,8 +124,12 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) &&
isIPAddress(sipRecord.getResponderSdpConnectIp()) &&
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
// Both client and server addresses in the data are valid.
if (cond1 && (
if (cond1 && (!cond5 || cond7) && (
// The address in the SIP one-way stream is valid and not an internal network address.
cond2 && cond3 && cond4 && cond5
// The coordinating addresses in the SIP double directional stream are valid

View File

@@ -7,11 +7,13 @@ import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
@@ -23,6 +25,9 @@ import org.apache.flink.util.Collector;
public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
implements FunctionHelper {
public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class));
private transient Time fireInterval;
private transient ValueState<ObjectNode> valueState;
@@ -63,7 +68,7 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
out.collect(value);
valueState.clear();
} else {
// If the address is not yet in the mapState.
// If the address is not yet in the valueState.
valueState.update(value);
}
} else {
@@ -77,6 +82,10 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
public void onTimer(long timestamp,
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
final ObjectNode value = valueState.value();
if (value != null) {
ctx.output(SIP_OUTPUT_TAG, value);
}
valueState.clear();
}
}

View File

@@ -83,6 +83,7 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
rtpRecord.merge(sipObj)
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj);
iterator.remove();
switch (entry.getKey()) {
case S2C:

View File

@@ -57,7 +57,8 @@ public class Record {
* @return The VSys ID as an integer.
*/
public int getVSysID() {
return Record.getInt(obj, F_COMMON_VSYS_ID);
int v = Record.getInt(obj, F_COMMON_VSYS_ID);
return v == 0 ? 1 : v;
}
/**