8 Commits

Author SHA1 Message Date
chaoc
b7c739a955 chore: update version 2023-12-15 10:32:17 +08:00
梁超
1f6ef08a30 Merge branch 'hotfix/illegal-flags' into 'release/1.2'
[GAL-444] fix: fix error caused by invalid flags

See merge request galaxy/tsg_olap/sip-rtp-correlation!23
2023-12-15 02:27:43 +00:00
chaoc
87abd1e2ca [GAL-444] fix: fix error caused by invalid flags 2023-12-15 10:24:06 +08:00
梁超
2542a8bfd2 Merge branch 'bugfix/repeat-voip' into 'main'
chore: update version

See merge request galaxy/tsg_olap/sip-rtp-correlation!21
2023-12-07 09:22:19 +00:00
chaoc
1c0259a95c chore: update version 2023-12-07 17:20:20 +08:00
梁超
37f49c40d5 Merge branch 'bugfix/repeat-voip' into 'main'
fix: repeated voip record

See merge request galaxy/tsg_olap/sip-rtp-correlation!20
2023-12-07 09:19:32 +00:00
chaoc
beef47df4c fix: add ipaddr non-empty check 2023-12-07 17:17:05 +08:00
chaoc
6d77d1c3c0 fix: remove rtp from state when fusion successfully 2023-12-07 17:15:11 +08:00
3 changed files with 22 additions and 6 deletions

View File

@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>1.2-rc2</version>
<version>1.2</version>
<name>Flink : SIP-RTP : Correlation</name>

View File

@@ -9,7 +9,6 @@ import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import com.zdjizhi.utils.IPUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -22,6 +21,8 @@ import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Function;
/**
* The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
* It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
@@ -108,11 +109,13 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(obj);
// Check for invalid or meaningless addresses and ports
boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
StringUtils.isNotBlank(record.getServerIp()) &&
boolean cond1 = isIPAddress(record.getClientIp()) &&
isIPAddress(record.getServerIp()) &&
record.getClientPort() > 0 &&
record.getServerPort() > 0;
boolean cond8 = null != executeSafely(Record::getStreamDir, record);
final SIPRecord sipRecord = new SIPRecord(obj);
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
@@ -121,12 +124,16 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) &&
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) &&
isIPAddress(sipRecord.getResponderSdpConnectIp()) &&
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
// Both client and server addresses in the data are valid.
if (cond1 && (
if (cond1 && cond8 && (!cond5 || cond7) && (
// The address in the SIP one-way stream is valid and not an internal network address.
cond2 && cond3 && cond4 && cond5
// The coordinating addresses in the SIP double directional stream are valid
@@ -146,6 +153,14 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
// ======================================================================================
// ----------------------------------- private helper -----------------------------------
public static <T, R> R executeSafely(Function<T, R> function, T v) {
try {
return function.apply(v);
} catch (Exception e) {
return null;
}
}
private static boolean isIPAddress(final String ipaddr) {
if (null == ipaddr) {
return false;

View File

@@ -83,6 +83,7 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
rtpRecord.merge(sipObj)
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj);
iterator.remove();
switch (entry.getKey()) {
case S2C: