Compare commits
4 Commits
master
...
release/1.
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c3f4f94a8f | ||
|
|
07beccc732 | ||
|
|
93ed6bcddc | ||
|
|
4ab96737a1 |
14
pom.xml
14
pom.xml
@@ -7,7 +7,7 @@
|
|||||||
|
|
||||||
<groupId>com.zdjizhi</groupId>
|
<groupId>com.zdjizhi</groupId>
|
||||||
<artifactId>sip-rtp-correlation</artifactId>
|
<artifactId>sip-rtp-correlation</artifactId>
|
||||||
<version>1.1-rc2</version>
|
<version>1.1</version>
|
||||||
|
|
||||||
<name>Flink : SIP-RTP : Correlation</name>
|
<name>Flink : SIP-RTP : Correlation</name>
|
||||||
|
|
||||||
@@ -24,6 +24,18 @@
|
|||||||
<jackson.version>2.13.2.20220328</jackson.version>
|
<jackson.version>2.13.2.20220328</jackson.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
|
<distributionManagement>
|
||||||
|
<repository>
|
||||||
|
<id>platform-releases</id>
|
||||||
|
<url>http://192.168.40.153:8099/content/repositories/platform-release</url>
|
||||||
|
<uniqueVersion>true</uniqueVersion>
|
||||||
|
</repository>
|
||||||
|
<snapshotRepository>
|
||||||
|
<id>platform-snapshots</id>
|
||||||
|
<url>http://192.168.40.153:8099/content/repositories/platform-snapshot</url>
|
||||||
|
</snapshotRepository>
|
||||||
|
</distributionManagement>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.flink</groupId>
|
<groupId>org.apache.flink</groupId>
|
||||||
|
|||||||
@@ -99,7 +99,9 @@ public class CorrelateApp {
|
|||||||
new JsonNodeSerializationSchema(),
|
new JsonNodeSerializationSchema(),
|
||||||
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
|
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
|
||||||
|
|
||||||
voIpOperator.union(sipDoubleDirOperator).addSink(producer);
|
voIpOperator
|
||||||
|
.union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
|
||||||
|
.addSink(producer);
|
||||||
|
|
||||||
env.execute("SIP-RTP-CORRELATION");
|
env.execute("SIP-RTP-CORRELATION");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ import com.zdjizhi.flink.voip.records.SIPRecord;
|
|||||||
import com.zdjizhi.flink.voip.records.SchemaType;
|
import com.zdjizhi.flink.voip.records.SchemaType;
|
||||||
import com.zdjizhi.flink.voip.records.StreamDir;
|
import com.zdjizhi.flink.voip.records.StreamDir;
|
||||||
import com.zdjizhi.utils.IPUtil;
|
import com.zdjizhi.utils.IPUtil;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||||
import org.apache.flink.configuration.Configuration;
|
import org.apache.flink.configuration.Configuration;
|
||||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
|
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
@@ -22,6 +21,8 @@ import org.apache.flink.util.OutputTag;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
|
* The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
|
||||||
* It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
|
* It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
|
||||||
@@ -108,11 +109,13 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
|
|||||||
Collector<ObjectNode> out) throws Exception {
|
Collector<ObjectNode> out) throws Exception {
|
||||||
final Record record = new Record(obj);
|
final Record record = new Record(obj);
|
||||||
// Check for invalid or meaningless addresses and ports
|
// Check for invalid or meaningless addresses and ports
|
||||||
boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
|
boolean cond1 = isIPAddress(record.getClientIp()) &&
|
||||||
StringUtils.isNotBlank(record.getServerIp()) &&
|
isIPAddress(record.getServerIp()) &&
|
||||||
record.getClientPort() > 0 &&
|
record.getClientPort() > 0 &&
|
||||||
record.getServerPort() > 0;
|
record.getServerPort() > 0;
|
||||||
|
|
||||||
|
boolean cond8 = null != executeSafely(Record::getStreamDir, record);
|
||||||
|
|
||||||
final SIPRecord sipRecord = new SIPRecord(obj);
|
final SIPRecord sipRecord = new SIPRecord(obj);
|
||||||
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|
||||||
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
|
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
|
||||||
@@ -121,12 +124,16 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
|
|||||||
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|
||||||
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
|
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
|
||||||
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
|
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
|
||||||
boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
|
boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) &&
|
||||||
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
|
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
|
||||||
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
|
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
|
||||||
|
|
||||||
|
boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) &&
|
||||||
|
isIPAddress(sipRecord.getResponderSdpConnectIp()) &&
|
||||||
|
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
|
||||||
|
|
||||||
// Both client and server addresses in the data are valid.
|
// Both client and server addresses in the data are valid.
|
||||||
if (cond1 && (
|
if (cond1 && cond8 && (!cond5 || cond7) && (
|
||||||
// The address in the SIP one-way stream is valid and not an internal network address.
|
// The address in the SIP one-way stream is valid and not an internal network address.
|
||||||
cond2 && cond3 && cond4 && cond5
|
cond2 && cond3 && cond4 && cond5
|
||||||
// The coordinating addresses in the SIP double directional stream are valid
|
// The coordinating addresses in the SIP double directional stream are valid
|
||||||
@@ -146,6 +153,14 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
|
|||||||
// ======================================================================================
|
// ======================================================================================
|
||||||
// ----------------------------------- private helper -----------------------------------
|
// ----------------------------------- private helper -----------------------------------
|
||||||
|
|
||||||
|
public static <T, R> R executeSafely(Function<T, R> function, T v) {
|
||||||
|
try {
|
||||||
|
return function.apply(v);
|
||||||
|
} catch (Exception e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static boolean isIPAddress(final String ipaddr) {
|
private static boolean isIPAddress(final String ipaddr) {
|
||||||
if (null == ipaddr) {
|
if (null == ipaddr) {
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@@ -7,11 +7,13 @@ import org.apache.flink.api.common.state.StateTtlConfig;
|
|||||||
import org.apache.flink.api.common.state.ValueState;
|
import org.apache.flink.api.common.state.ValueState;
|
||||||
import org.apache.flink.api.common.state.ValueStateDescriptor;
|
import org.apache.flink.api.common.state.ValueStateDescriptor;
|
||||||
import org.apache.flink.api.common.time.Time;
|
import org.apache.flink.api.common.time.Time;
|
||||||
|
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||||
import org.apache.flink.api.java.tuple.Tuple3;
|
import org.apache.flink.api.java.tuple.Tuple3;
|
||||||
import org.apache.flink.configuration.Configuration;
|
import org.apache.flink.configuration.Configuration;
|
||||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
|
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
|
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
|
||||||
import org.apache.flink.util.Collector;
|
import org.apache.flink.util.Collector;
|
||||||
|
import org.apache.flink.util.OutputTag;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
|
* A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
|
||||||
@@ -23,6 +25,9 @@ import org.apache.flink.util.Collector;
|
|||||||
public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
|
public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
|
||||||
implements FunctionHelper {
|
implements FunctionHelper {
|
||||||
|
|
||||||
|
public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
|
||||||
|
new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class));
|
||||||
|
|
||||||
private transient Time fireInterval;
|
private transient Time fireInterval;
|
||||||
|
|
||||||
private transient ValueState<ObjectNode> valueState;
|
private transient ValueState<ObjectNode> valueState;
|
||||||
@@ -63,7 +68,7 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
|
|||||||
out.collect(value);
|
out.collect(value);
|
||||||
valueState.clear();
|
valueState.clear();
|
||||||
} else {
|
} else {
|
||||||
// If the address is not yet in the mapState.
|
// If the address is not yet in the valueState.
|
||||||
valueState.update(value);
|
valueState.update(value);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -77,6 +82,10 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
|
|||||||
public void onTimer(long timestamp,
|
public void onTimer(long timestamp,
|
||||||
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
|
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
|
||||||
Collector<ObjectNode> out) throws Exception {
|
Collector<ObjectNode> out) throws Exception {
|
||||||
|
final ObjectNode value = valueState.value();
|
||||||
|
if (value != null) {
|
||||||
|
ctx.output(SIP_OUTPUT_TAG, value);
|
||||||
|
}
|
||||||
valueState.clear();
|
valueState.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -83,6 +83,7 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
|
|||||||
rtpRecord.merge(sipObj)
|
rtpRecord.merge(sipObj)
|
||||||
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
|
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
|
||||||
out.collect(rtpObj);
|
out.collect(rtpObj);
|
||||||
|
iterator.remove();
|
||||||
|
|
||||||
switch (entry.getKey()) {
|
switch (entry.getKey()) {
|
||||||
case S2C:
|
case S2C:
|
||||||
|
|||||||
@@ -57,7 +57,8 @@ public class Record {
|
|||||||
* @return The VSys ID as an integer.
|
* @return The VSys ID as an integer.
|
||||||
*/
|
*/
|
||||||
public int getVSysID() {
|
public int getVSysID() {
|
||||||
return Record.getInt(obj, F_COMMON_VSYS_ID);
|
int v = Record.getInt(obj, F_COMMON_VSYS_ID);
|
||||||
|
return v == 0 ? 1 : v;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user