diff --git a/pom.xml b/pom.xml
index df6185c..8c50e8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
com.zdjizhi
sip-rtp-correlation
- 1.2.1
+ 1.2.2
Flink : SIP-RTP : Correlation
diff --git a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java
index a08f86b..c8e32b7 100644
--- a/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java
+++ b/src/main/java/com/zdjizhi/flink/voip/functions/VoIPFusionFunction.java
@@ -1,9 +1,8 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
-import com.zdjizhi.flink.voip.records.Record;
-import com.zdjizhi.flink.voip.records.SchemaType;
-import com.zdjizhi.flink.voip.records.StreamDir;
+import com.zdjizhi.flink.voip.records.*;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
@@ -80,6 +79,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector out) throws Exception {
for (ObjectNode obj : rtpState.values()) {
+ final Record rtpRecord = new Record(obj);
+ rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
out.collect(obj);
}
rtpState.clear();
sipState.clear();
}
+
+ // ======================================================================
+ // PRIVATE HELPER
+ // ======================================================================
+
+ private void completeOriginatorField(final Record rtpRecord, final SIPRecord sipRecord) {
+ if (StringUtils.isNoneBlank(rtpRecord.getClientIp())) {
+ if (StringUtils.equals(sipRecord.getOriginatorSdpConnectIp(), rtpRecord.getClientIp())) {
+ rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.C2S.getCode());
+ return;
+ } else if (StringUtils.equals(sipRecord.getResponderSdpConnectIp(), rtpRecord.getClientIp())) {
+ rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.S2C.getCode());
+ return;
+ }
+ }
+ rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
+ }
}
\ No newline at end of file
diff --git a/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java b/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java
new file mode 100644
index 0000000..c8df7db
--- /dev/null
+++ b/src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java
@@ -0,0 +1,27 @@
+package com.zdjizhi.flink.voip.records;
+
+import lombok.Getter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class RTPRecord extends Record {
+
+ public static final String F_ORIGINATOR_DIR = "rtp_originator_dir";
+
+ public RTPRecord(ObjectNode obj) {
+ super(obj);
+ }
+
+ @Getter
+ public enum OriginatorDir {
+
+ UNKNOWN(0),
+ C2S(1),
+ S2C(2);
+
+ private final int code;
+
+ OriginatorDir(int code) {
+ this.code = code;
+ }
+ }
+}
diff --git a/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java b/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java
index 829f5f5..0f22986 100644
--- a/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java
+++ b/src/test/java/com/zdjizhi/flink/voip/records/RecordTest.java
@@ -34,10 +34,10 @@ public class RecordTest {
final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj);
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
- assertEquals(SchemaType.RTP.getValue(), record.getSchemaType());
+ assertEquals(SchemaType.RTP, record.getSchemaType());
obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue()));
- assertEquals(SchemaType.VOIP.getValue(), record.getSchemaType());
+ assertEquals(SchemaType.VOIP, record.getSchemaType());
}
@Test