Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
53c6c267e8 | ||
|
|
20e8421900 | ||
|
|
d01235e092 | ||
|
|
12828291a9 | ||
|
|
f21e814763 | ||
|
|
11c2c641bb | ||
|
|
59cabb4868 | ||
|
|
b7c739a955 | ||
|
|
1f6ef08a30 | ||
|
|
87abd1e2ca |
25
README.md
25
README.md
@@ -22,18 +22,19 @@ flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<ve
|
|||||||
|
|
||||||
## 配置项说明
|
## 配置项说明
|
||||||
|
|
||||||
| 配置项 | 类型 | 必需 | 默认值 | 描述 |
|
| 配置项 | 类型 | 必需 | 默认值 | 描述 |
|
||||||
| --------------------------- | ------------------- | ---------- | ----------------------------------------------------------- |--------------------------------------------|
|
|----------------------------------| ------------------- | ---------- | ---------------------------------------------------------- |-------------------------------------------|
|
||||||
| source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 |
|
| source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 |
|
||||||
| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
|
| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
|
||||||
| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 |
|
| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 |
|
||||||
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
|
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
|
||||||
| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
|
| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
|
||||||
| include.intranet.ip | BOOLEAN | N | True | 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联 |
|
| include.intranet.ip | BOOLEAN | N | True | 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联 |
|
||||||
| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
|
| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
|
||||||
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
|
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
|
||||||
| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |
|
| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |
|
||||||
| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) |
|
| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) |
|
||||||
|
| job.name | STRING | N | correlation_sip_rtp_session | Job 名 |
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
2
pom.xml
2
pom.xml
@@ -7,7 +7,7 @@
|
|||||||
|
|
||||||
<groupId>com.zdjizhi</groupId>
|
<groupId>com.zdjizhi</groupId>
|
||||||
<artifactId>sip-rtp-correlation</artifactId>
|
<artifactId>sip-rtp-correlation</artifactId>
|
||||||
<version>1.2-rc3</version>
|
<version>1.2.2</version>
|
||||||
|
|
||||||
<name>Flink : SIP-RTP : Correlation</name>
|
<name>Flink : SIP-RTP : Correlation</name>
|
||||||
|
|
||||||
|
|||||||
@@ -103,6 +103,6 @@ public class CorrelateApp {
|
|||||||
.union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
|
.union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
|
||||||
.addSink(producer);
|
.addSink(producer);
|
||||||
|
|
||||||
env.execute("SIP-RTP-CORRELATION");
|
env.execute(config.get(JOB_NAME));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -95,4 +95,13 @@ public class FusionConfigs {
|
|||||||
.intType()
|
.intType()
|
||||||
.defaultValue(6)
|
.defaultValue(6)
|
||||||
.withDescription("The interval at which RTP state data should be cleared.");
|
.withDescription("The interval at which RTP state data should be cleared.");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration option for specifying the name of a job.
|
||||||
|
*/
|
||||||
|
public static final ConfigOption<String> JOB_NAME =
|
||||||
|
ConfigOptions.key("job.name")
|
||||||
|
.stringType()
|
||||||
|
.defaultValue("correlation_sip_rtp_session")
|
||||||
|
.withDescription("The name of current job.");
|
||||||
}
|
}
|
||||||
@@ -21,6 +21,8 @@ import org.apache.flink.util.OutputTag;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
|
* The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
|
||||||
* It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
|
* It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
|
||||||
@@ -112,6 +114,8 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
|
|||||||
record.getClientPort() > 0 &&
|
record.getClientPort() > 0 &&
|
||||||
record.getServerPort() > 0;
|
record.getServerPort() > 0;
|
||||||
|
|
||||||
|
boolean cond8 = null != executeSafely(Record::getStreamDir, record);
|
||||||
|
|
||||||
final SIPRecord sipRecord = new SIPRecord(obj);
|
final SIPRecord sipRecord = new SIPRecord(obj);
|
||||||
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|
||||||
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
|
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
|
||||||
@@ -120,7 +124,7 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
|
|||||||
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|
||||||
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
|
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
|
||||||
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
|
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
|
||||||
boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
|
boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) &&
|
||||||
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
|
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
|
||||||
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
|
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
|
||||||
|
|
||||||
@@ -129,7 +133,7 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
|
|||||||
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
|
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
|
||||||
|
|
||||||
// Both client and server addresses in the data are valid.
|
// Both client and server addresses in the data are valid.
|
||||||
if (cond1 && (!cond5 || cond7) && (
|
if (cond1 && cond8 && (!cond5 || cond7) && (
|
||||||
// The address in the SIP one-way stream is valid and not an internal network address.
|
// The address in the SIP one-way stream is valid and not an internal network address.
|
||||||
cond2 && cond3 && cond4 && cond5
|
cond2 && cond3 && cond4 && cond5
|
||||||
// The coordinating addresses in the SIP double directional stream are valid
|
// The coordinating addresses in the SIP double directional stream are valid
|
||||||
@@ -149,6 +153,14 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
|
|||||||
// ======================================================================================
|
// ======================================================================================
|
||||||
// ----------------------------------- private helper -----------------------------------
|
// ----------------------------------- private helper -----------------------------------
|
||||||
|
|
||||||
|
public static <T, R> R executeSafely(Function<T, R> function, T v) {
|
||||||
|
try {
|
||||||
|
return function.apply(v);
|
||||||
|
} catch (Exception e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static boolean isIPAddress(final String ipaddr) {
|
private static boolean isIPAddress(final String ipaddr) {
|
||||||
if (null == ipaddr) {
|
if (null == ipaddr) {
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@@ -1,9 +1,8 @@
|
|||||||
package com.zdjizhi.flink.voip.functions;
|
package com.zdjizhi.flink.voip.functions;
|
||||||
|
|
||||||
import com.zdjizhi.flink.voip.conf.FusionConfigs;
|
import com.zdjizhi.flink.voip.conf.FusionConfigs;
|
||||||
import com.zdjizhi.flink.voip.records.Record;
|
import com.zdjizhi.flink.voip.records.*;
|
||||||
import com.zdjizhi.flink.voip.records.SchemaType;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import com.zdjizhi.flink.voip.records.StreamDir;
|
|
||||||
import org.apache.flink.api.common.functions.RuntimeContext;
|
import org.apache.flink.api.common.functions.RuntimeContext;
|
||||||
import org.apache.flink.api.common.state.*;
|
import org.apache.flink.api.common.state.*;
|
||||||
import org.apache.flink.api.common.time.Time;
|
import org.apache.flink.api.common.time.Time;
|
||||||
@@ -80,6 +79,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
|
|||||||
final ObjectNode rtpObj = entry.getValue();
|
final ObjectNode rtpObj = entry.getValue();
|
||||||
final Record rtpRecord = new Record(rtpObj);
|
final Record rtpRecord = new Record(rtpObj);
|
||||||
|
|
||||||
|
completeOriginatorField(rtpRecord, new SIPRecord(sipObj));
|
||||||
|
|
||||||
rtpRecord.merge(sipObj)
|
rtpRecord.merge(sipObj)
|
||||||
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
|
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
|
||||||
out.collect(rtpObj);
|
out.collect(rtpObj);
|
||||||
@@ -121,6 +122,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
|
|||||||
final StreamDir streamDir = rtpRecord.getStreamDir();
|
final StreamDir streamDir = rtpRecord.getStreamDir();
|
||||||
if (null != info) {
|
if (null != info) {
|
||||||
|
|
||||||
|
completeOriginatorField(rtpRecord, new SIPRecord(info.getObj()));
|
||||||
|
|
||||||
rtpRecord.merge(info.getObj())
|
rtpRecord.merge(info.getObj())
|
||||||
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
|
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
|
||||||
out.collect(rtpObj);
|
out.collect(rtpObj);
|
||||||
@@ -150,9 +153,28 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
|
|||||||
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
|
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
|
||||||
Collector<ObjectNode> out) throws Exception {
|
Collector<ObjectNode> out) throws Exception {
|
||||||
for (ObjectNode obj : rtpState.values()) {
|
for (ObjectNode obj : rtpState.values()) {
|
||||||
|
final Record rtpRecord = new Record(obj);
|
||||||
|
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
|
||||||
out.collect(obj);
|
out.collect(obj);
|
||||||
}
|
}
|
||||||
rtpState.clear();
|
rtpState.clear();
|
||||||
sipState.clear();
|
sipState.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ======================================================================
|
||||||
|
// PRIVATE HELPER
|
||||||
|
// ======================================================================
|
||||||
|
|
||||||
|
private void completeOriginatorField(final Record rtpRecord, final SIPRecord sipRecord) {
|
||||||
|
if (StringUtils.isNoneBlank(rtpRecord.getClientIp())) {
|
||||||
|
if (StringUtils.equals(sipRecord.getOriginatorSdpConnectIp(), rtpRecord.getClientIp())) {
|
||||||
|
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.C2S.getCode());
|
||||||
|
return;
|
||||||
|
} else if (StringUtils.equals(sipRecord.getResponderSdpConnectIp(), rtpRecord.getClientIp())) {
|
||||||
|
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.S2C.getCode());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
27
src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java
Normal file
27
src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
package com.zdjizhi.flink.voip.records;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
|
|
||||||
|
public class RTPRecord extends Record {
|
||||||
|
|
||||||
|
public static final String F_ORIGINATOR_DIR = "rtp_originator_dir";
|
||||||
|
|
||||||
|
public RTPRecord(ObjectNode obj) {
|
||||||
|
super(obj);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
public enum OriginatorDir {
|
||||||
|
|
||||||
|
UNKNOWN(0),
|
||||||
|
C2S(1),
|
||||||
|
S2C(2);
|
||||||
|
|
||||||
|
private final int code;
|
||||||
|
|
||||||
|
OriginatorDir(int code) {
|
||||||
|
this.code = code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -34,10 +34,10 @@ public class RecordTest {
|
|||||||
final ObjectNode obj = mapper.createObjectNode();
|
final ObjectNode obj = mapper.createObjectNode();
|
||||||
final Record record = new Record(obj);
|
final Record record = new Record(obj);
|
||||||
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
|
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
|
||||||
assertEquals(SchemaType.RTP.getValue(), record.getSchemaType());
|
assertEquals(SchemaType.RTP, record.getSchemaType());
|
||||||
|
|
||||||
obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue()));
|
obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue()));
|
||||||
assertEquals(SchemaType.VOIP.getValue(), record.getSchemaType());
|
assertEquals(SchemaType.VOIP, record.getSchemaType());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
Reference in New Issue
Block a user