10 Commits

Author SHA1 Message Date
梁超
53c6c267e8 Merge branch 'hotfix/field-completion' into 'main'
[GAL-568] fix: add field 'rtp_originator_dir'

See merge request galaxy/tsg_olap/sip-rtp-correlation!25
2024-05-10 05:50:10 +00:00
chaochaoc
20e8421900 chore: update versions 2024-05-10 13:49:37 +08:00
chaochaoc
d01235e092 [GAL-568] fix: add field 'rtp_originator_dir' 2024-05-10 11:58:35 +08:00
梁超
12828291a9 Merge branch 'hotfix/app-name' into 'main'
fix: modify job name

See merge request galaxy/tsg_olap/sip-rtp-correlation!24
2023-12-18 07:52:56 +00:00
chaoc
f21e814763 chore: update versions 2023-12-18 15:47:41 +08:00
chaoc
11c2c641bb docs: add job name config 2023-12-18 15:47:30 +08:00
chaoc
59cabb4868 fix: add job name config 2023-12-18 15:47:23 +08:00
chaoc
b7c739a955 chore: update version 2023-12-15 10:32:17 +08:00
梁超
1f6ef08a30 Merge branch 'hotfix/illegal-flags' into 'release/1.2'
[GAL-444] fix: fix error caused by invalid flags

See merge request galaxy/tsg_olap/sip-rtp-correlation!23
2023-12-15 02:27:43 +00:00
chaoc
87abd1e2ca [GAL-444] fix: fix error caused by invalid flags 2023-12-15 10:24:06 +08:00
8 changed files with 92 additions and 21 deletions

View File

@@ -22,18 +22,19 @@ flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<ve
## 配置项说明 ## 配置项说明
| 配置项 | 类型 | 必需 | 默认值 | 描述 | | 配置项 | 类型 | 必需 | 默认值 | 描述 |
| --------------------------- | ------------------- | ---------- | ----------------------------------------------------------- |--------------------------------------------| |----------------------------------| ------------------- | ---------- | ---------------------------------------------------------- |-------------------------------------------|
| source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 | | source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 |
| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties | | source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 | | sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 |
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties | | sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 | | error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
| include.intranet.ip | BOOLEAN | N | True | 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联 | | include.intranet.ip | BOOLEAN | N | True | 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联 |
| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 | | error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties | | error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) | | sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |
| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) | | rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) |
| job.name | STRING | N | correlation_sip_rtp_session | Job 名 |

View File

@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId> <groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId> <artifactId>sip-rtp-correlation</artifactId>
<version>1.2-rc3</version> <version>1.2.2</version>
<name>Flink : SIP-RTP : Correlation</name> <name>Flink : SIP-RTP : Correlation</name>

View File

@@ -103,6 +103,6 @@ public class CorrelateApp {
.union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG)) .union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
.addSink(producer); .addSink(producer);
env.execute("SIP-RTP-CORRELATION"); env.execute(config.get(JOB_NAME));
} }
} }

View File

@@ -95,4 +95,13 @@ public class FusionConfigs {
.intType() .intType()
.defaultValue(6) .defaultValue(6)
.withDescription("The interval at which RTP state data should be cleared."); .withDescription("The interval at which RTP state data should be cleared.");
/**
* Configuration option for specifying the name of a job.
*/
public static final ConfigOption<String> JOB_NAME =
ConfigOptions.key("job.name")
.stringType()
.defaultValue("correlation_sip_rtp_session")
.withDescription("The name of current job.");
} }

View File

@@ -21,6 +21,8 @@ import org.apache.flink.util.OutputTag;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.function.Function;
/** /**
* The ErrorHandler class is responsible for handling and filtering error records from the input data stream. * The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
* It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled. * It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
@@ -112,6 +114,8 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
record.getClientPort() > 0 && record.getClientPort() > 0 &&
record.getServerPort() > 0; record.getServerPort() > 0;
boolean cond8 = null != executeSafely(Record::getStreamDir, record);
final SIPRecord sipRecord = new SIPRecord(obj); final SIPRecord sipRecord = new SIPRecord(obj);
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isIPAddress(sipRecord.getResponderSdpConnectIp()); || isIPAddress(sipRecord.getResponderSdpConnectIp());
@@ -120,7 +124,7 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp()) boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp())); || (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType()); boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() && boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) &&
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) && (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp())); (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
@@ -129,7 +133,7 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0; sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
// Both client and server addresses in the data are valid. // Both client and server addresses in the data are valid.
if (cond1 && (!cond5 || cond7) && ( if (cond1 && cond8 && (!cond5 || cond7) && (
// The address in the SIP one-way stream is valid and not an internal network address. // The address in the SIP one-way stream is valid and not an internal network address.
cond2 && cond3 && cond4 && cond5 cond2 && cond3 && cond4 && cond5
// The coordinating addresses in the SIP double directional stream are valid // The coordinating addresses in the SIP double directional stream are valid
@@ -149,6 +153,14 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
// ====================================================================================== // ======================================================================================
// ----------------------------------- private helper ----------------------------------- // ----------------------------------- private helper -----------------------------------
public static <T, R> R executeSafely(Function<T, R> function, T v) {
try {
return function.apply(v);
} catch (Exception e) {
return null;
}
}
private static boolean isIPAddress(final String ipaddr) { private static boolean isIPAddress(final String ipaddr) {
if (null == ipaddr) { if (null == ipaddr) {
return false; return false;

View File

@@ -1,9 +1,8 @@
package com.zdjizhi.flink.voip.functions; package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.FusionConfigs; import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.records.Record; import com.zdjizhi.flink.voip.records.*;
import com.zdjizhi.flink.voip.records.SchemaType; import org.apache.commons.lang3.StringUtils;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
@@ -80,6 +79,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
final ObjectNode rtpObj = entry.getValue(); final ObjectNode rtpObj = entry.getValue();
final Record rtpRecord = new Record(rtpObj); final Record rtpRecord = new Record(rtpObj);
completeOriginatorField(rtpRecord, new SIPRecord(sipObj));
rtpRecord.merge(sipObj) rtpRecord.merge(sipObj)
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj); out.collect(rtpObj);
@@ -121,6 +122,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
final StreamDir streamDir = rtpRecord.getStreamDir(); final StreamDir streamDir = rtpRecord.getStreamDir();
if (null != info) { if (null != info) {
completeOriginatorField(rtpRecord, new SIPRecord(info.getObj()));
rtpRecord.merge(info.getObj()) rtpRecord.merge(info.getObj())
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue()); .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj); out.collect(rtpObj);
@@ -150,9 +153,28 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx, KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception { Collector<ObjectNode> out) throws Exception {
for (ObjectNode obj : rtpState.values()) { for (ObjectNode obj : rtpState.values()) {
final Record rtpRecord = new Record(obj);
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
out.collect(obj); out.collect(obj);
} }
rtpState.clear(); rtpState.clear();
sipState.clear(); sipState.clear();
} }
// ======================================================================
// PRIVATE HELPER
// ======================================================================
private void completeOriginatorField(final Record rtpRecord, final SIPRecord sipRecord) {
if (StringUtils.isNoneBlank(rtpRecord.getClientIp())) {
if (StringUtils.equals(sipRecord.getOriginatorSdpConnectIp(), rtpRecord.getClientIp())) {
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.C2S.getCode());
return;
} else if (StringUtils.equals(sipRecord.getResponderSdpConnectIp(), rtpRecord.getClientIp())) {
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.S2C.getCode());
return;
}
}
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
}
} }

View File

@@ -0,0 +1,27 @@
package com.zdjizhi.flink.voip.records;
import lombok.Getter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
public class RTPRecord extends Record {
public static final String F_ORIGINATOR_DIR = "rtp_originator_dir";
public RTPRecord(ObjectNode obj) {
super(obj);
}
@Getter
public enum OriginatorDir {
UNKNOWN(0),
C2S(1),
S2C(2);
private final int code;
OriginatorDir(int code) {
this.code = code;
}
}
}

View File

@@ -34,10 +34,10 @@ public class RecordTest {
final ObjectNode obj = mapper.createObjectNode(); final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj); final Record record = new Record(obj);
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue()); record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
assertEquals(SchemaType.RTP.getValue(), record.getSchemaType()); assertEquals(SchemaType.RTP, record.getSchemaType());
obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue())); obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue()));
assertEquals(SchemaType.VOIP.getValue(), record.getSchemaType()); assertEquals(SchemaType.VOIP, record.getSchemaType());
} }
@Test @Test