From d01235e092820c8c962935d4bab87ee4e74bb1bd Mon Sep 17 00:00:00 2001 From: chaochaoc <13051077615@126.com> Date: Fri, 10 May 2024 11:58:35 +0800 Subject: [PATCH 1/2] [GAL-568] fix: add field 'rtp_originator_dir' --- .../voip/functions/VoIPFusionFunction.java | 28 +++++++++++++++++-- .../zdjizhi/flink/voip/records/RTPRecord.java | 27 ++++++++++++++++++ .../flink/voip/records/RecordTest.java | 4 +-- 3 files changed, 54 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java index a08f86b..c8e32b7 100644 --- a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java +++ b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java @@ -1,9 +1,8 @@ package com.zdjizhi.flink.voip.functions; import com.zdjizhi.flink.voip.conf.FusionConfigs; -import com.zdjizhi.flink.voip.records.Record; -import com.zdjizhi.flink.voip.records.SchemaType; -import com.zdjizhi.flink.voip.records.StreamDir; +import com.zdjizhi.flink.voip.records.*; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.time.Time; @@ -80,6 +79,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx, Collector out) throws Exception { for (ObjectNode obj : rtpState.values()) { + final Record rtpRecord = new Record(obj); + rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode()); out.collect(obj); } rtpState.clear(); sipState.clear(); } + + // ====================================================================== + // PRIVATE HELPER + // ====================================================================== + + private void completeOriginatorField(final Record rtpRecord, final SIPRecord sipRecord) { + if (StringUtils.isNoneBlank(rtpRecord.getClientIp())) { + if (StringUtils.equals(sipRecord.getOriginatorSdpConnectIp(), rtpRecord.getClientIp())) { + rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.C2S.getCode()); + return; + } else if (StringUtils.equals(sipRecord.getResponderSdpConnectIp(), rtpRecord.getClientIp())) { + rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.S2C.getCode()); + return; + } + } + rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode()); + } } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java b/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java new file mode 100644 index 0000000..c8df7db --- /dev/null +++ b/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java @@ -0,0 +1,27 @@ +package com.zdjizhi.flink.voip.records; + +import lombok.Getter; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; + +public class RTPRecord extends Record { + + public static final String F_ORIGINATOR_DIR = "rtp_originator_dir"; + + public RTPRecord(ObjectNode obj) { + super(obj); + } + + @Getter + public enum OriginatorDir { + + UNKNOWN(0), + C2S(1), + S2C(2); + + private final int code; + + OriginatorDir(int code) { + this.code = code; + } + } +} diff --git a/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java b/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java index 829f5f5..0f22986 100644 --- a/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java +++ b/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java @@ -34,10 +34,10 @@ public class RecordTest { final ObjectNode obj = mapper.createObjectNode(); final Record record = new Record(obj); record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue()); - assertEquals(SchemaType.RTP.getValue(), record.getSchemaType()); + assertEquals(SchemaType.RTP, record.getSchemaType()); obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue())); - assertEquals(SchemaType.VOIP.getValue(), record.getSchemaType()); + assertEquals(SchemaType.VOIP, record.getSchemaType()); } @Test From 20e842190079d0394ff93f6b2479d20a5bba058f Mon Sep 17 00:00:00 2001 From: chaochaoc <13051077615@126.com> Date: Fri, 10 May 2024 13:49:37 +0800 Subject: [PATCH 2/2] chore: update versions --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index df6185c..8c50e8c 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.zdjizhi sip-rtp-correlation - 1.2.1 + 1.2.2 Flink : SIP-RTP : Correlation