Merge branch 'hotfix/field-completion' into 'main'
[GAL-568] fix: add field 'rtp_originator_dir' See merge request galaxy/tsg_olap/sip-rtp-correlation!25
This commit is contained in:
2
pom.xml
2
pom.xml
@@ -7,7 +7,7 @@
|
|||||||
|
|
||||||
<groupId>com.zdjizhi</groupId>
|
<groupId>com.zdjizhi</groupId>
|
||||||
<artifactId>sip-rtp-correlation</artifactId>
|
<artifactId>sip-rtp-correlation</artifactId>
|
||||||
<version>1.2.1</version>
|
<version>1.2.2</version>
|
||||||
|
|
||||||
<name>Flink : SIP-RTP : Correlation</name>
|
<name>Flink : SIP-RTP : Correlation</name>
|
||||||
|
|
||||||
|
|||||||
@@ -1,9 +1,8 @@
|
|||||||
package com.zdjizhi.flink.voip.functions;
|
package com.zdjizhi.flink.voip.functions;
|
||||||
|
|
||||||
import com.zdjizhi.flink.voip.conf.FusionConfigs;
|
import com.zdjizhi.flink.voip.conf.FusionConfigs;
|
||||||
import com.zdjizhi.flink.voip.records.Record;
|
import com.zdjizhi.flink.voip.records.*;
|
||||||
import com.zdjizhi.flink.voip.records.SchemaType;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import com.zdjizhi.flink.voip.records.StreamDir;
|
|
||||||
import org.apache.flink.api.common.functions.RuntimeContext;
|
import org.apache.flink.api.common.functions.RuntimeContext;
|
||||||
import org.apache.flink.api.common.state.*;
|
import org.apache.flink.api.common.state.*;
|
||||||
import org.apache.flink.api.common.time.Time;
|
import org.apache.flink.api.common.time.Time;
|
||||||
@@ -80,6 +79,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
|
|||||||
final ObjectNode rtpObj = entry.getValue();
|
final ObjectNode rtpObj = entry.getValue();
|
||||||
final Record rtpRecord = new Record(rtpObj);
|
final Record rtpRecord = new Record(rtpObj);
|
||||||
|
|
||||||
|
completeOriginatorField(rtpRecord, new SIPRecord(sipObj));
|
||||||
|
|
||||||
rtpRecord.merge(sipObj)
|
rtpRecord.merge(sipObj)
|
||||||
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
|
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
|
||||||
out.collect(rtpObj);
|
out.collect(rtpObj);
|
||||||
@@ -121,6 +122,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
|
|||||||
final StreamDir streamDir = rtpRecord.getStreamDir();
|
final StreamDir streamDir = rtpRecord.getStreamDir();
|
||||||
if (null != info) {
|
if (null != info) {
|
||||||
|
|
||||||
|
completeOriginatorField(rtpRecord, new SIPRecord(info.getObj()));
|
||||||
|
|
||||||
rtpRecord.merge(info.getObj())
|
rtpRecord.merge(info.getObj())
|
||||||
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
|
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
|
||||||
out.collect(rtpObj);
|
out.collect(rtpObj);
|
||||||
@@ -150,9 +153,28 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
|
|||||||
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
|
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
|
||||||
Collector<ObjectNode> out) throws Exception {
|
Collector<ObjectNode> out) throws Exception {
|
||||||
for (ObjectNode obj : rtpState.values()) {
|
for (ObjectNode obj : rtpState.values()) {
|
||||||
|
final Record rtpRecord = new Record(obj);
|
||||||
|
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
|
||||||
out.collect(obj);
|
out.collect(obj);
|
||||||
}
|
}
|
||||||
rtpState.clear();
|
rtpState.clear();
|
||||||
sipState.clear();
|
sipState.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ======================================================================
|
||||||
|
// PRIVATE HELPER
|
||||||
|
// ======================================================================
|
||||||
|
|
||||||
|
private void completeOriginatorField(final Record rtpRecord, final SIPRecord sipRecord) {
|
||||||
|
if (StringUtils.isNoneBlank(rtpRecord.getClientIp())) {
|
||||||
|
if (StringUtils.equals(sipRecord.getOriginatorSdpConnectIp(), rtpRecord.getClientIp())) {
|
||||||
|
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.C2S.getCode());
|
||||||
|
return;
|
||||||
|
} else if (StringUtils.equals(sipRecord.getResponderSdpConnectIp(), rtpRecord.getClientIp())) {
|
||||||
|
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.S2C.getCode());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
27
src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java
Normal file
27
src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
package com.zdjizhi.flink.voip.records;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
|
|
||||||
|
public class RTPRecord extends Record {
|
||||||
|
|
||||||
|
public static final String F_ORIGINATOR_DIR = "rtp_originator_dir";
|
||||||
|
|
||||||
|
public RTPRecord(ObjectNode obj) {
|
||||||
|
super(obj);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
public enum OriginatorDir {
|
||||||
|
|
||||||
|
UNKNOWN(0),
|
||||||
|
C2S(1),
|
||||||
|
S2C(2);
|
||||||
|
|
||||||
|
private final int code;
|
||||||
|
|
||||||
|
OriginatorDir(int code) {
|
||||||
|
this.code = code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -34,10 +34,10 @@ public class RecordTest {
|
|||||||
final ObjectNode obj = mapper.createObjectNode();
|
final ObjectNode obj = mapper.createObjectNode();
|
||||||
final Record record = new Record(obj);
|
final Record record = new Record(obj);
|
||||||
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
|
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
|
||||||
assertEquals(SchemaType.RTP.getValue(), record.getSchemaType());
|
assertEquals(SchemaType.RTP, record.getSchemaType());
|
||||||
|
|
||||||
obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue()));
|
obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue()));
|
||||||
assertEquals(SchemaType.VOIP.getValue(), record.getSchemaType());
|
assertEquals(SchemaType.VOIP, record.getSchemaType());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
Reference in New Issue
Block a user