Merge branch 'bugfix/repeat-voip' into 'main'

fix: repeated voip record

See merge request galaxy/tsg_olap/sip-rtp-correlation!20
This commit is contained in:
梁超
2023-12-07 09:19:32 +00:00
2 changed files with 8 additions and 4 deletions

View File

@@ -9,7 +9,6 @@ import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import com.zdjizhi.utils.IPUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -108,8 +107,8 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(obj);
// Check for invalid or meaningless addresses and ports
boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
StringUtils.isNotBlank(record.getServerIp()) &&
boolean cond1 = isIPAddress(record.getClientIp()) &&
isIPAddress(record.getServerIp()) &&
record.getClientPort() > 0 &&
record.getServerPort() > 0;
@@ -125,8 +124,12 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) &&
isIPAddress(sipRecord.getResponderSdpConnectIp()) &&
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
// Both client and server addresses in the data are valid.
if (cond1 && (
if (cond1 && (!cond5 || cond7) && (
// The address in the SIP one-way stream is valid and not an internal network address.
cond2 && cond3 && cond4 && cond5
// The coordinating addresses in the SIP double directional stream are valid

View File

@@ -83,6 +83,7 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
rtpRecord.merge(sipObj)
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj);
iterator.remove();
switch (entry.getKey()) {
case S2C: