Compare commits
57 Commits
| SHA1 |
|---|
| 53c6c267e8 |
| 20e8421900 |
| d01235e092 |
| 12828291a9 |
| f21e814763 |
| 11c2c641bb |
| 59cabb4868 |
| b7c739a955 |
| 1f6ef08a30 |
| 87abd1e2ca |
| 2542a8bfd2 |
| 1c0259a95c |
| 37f49c40d5 |
| beef47df4c |
| 6d77d1c3c0 |
| 4179a0a887 |
| 6ebefc9026 |
| 701019c38a |
| 6ae7fdef06 |
| e277117c6d |
| d54c93b61d |
| 3b06d3dfd5 |
| e718120be1 |
| 1dffb8fb6f |
| 0994219ede |
| 6f915e5162 |
| 35247d7414 |
| 64f19b528e |
| d0c3ebd60f |
| 114c180742 |
| e51d693fa0 |
| 757808d313 |
| e123233af9 |
| ad3072cc7a |
| 617ddab7ab |
| 45891bc734 |
| 19e0bce58f |
| 10ce6cfa07 |
| 9d5d99974b |
| 970977ba3c |
| 732d9f5aa9 |
| 96fa19aea1 |
| 1b7c33d078 |
| b9463f07ac |
| 35e2807a91 |
| 2275f349d1 |
| 1fedfbe4b8 |
| b2f15b3919 |
| 8fc8cc7c2d |
| 98bb843159 |
| edb044596e |
| 557156af79 |
| 32a811fb1c |
| 36cbaebf0c |
| da572f4bd0 |
| 77cdd73f02 |
| 5481a7b9ee |
CHANGELOG.md (new file, +8 lines)
@@ -0,0 +1,8 @@
# Changelog

### Hotfix

- [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) Fixed a NullPointerException caused by IPUtil not null-checking the address before deciding whether it is an IPv6 address

### Feature

- Emit SIP Records
- [GAL-419](https://jira.geedge.net/browse/GAL-419) Added the config option `include.intranet.ip`: whether to correlate records whose SIP-negotiated caller IP or callee IP is an intranet address; records that are not correlated are written to the error Topic instead
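The hotfix entry above corresponds to the null-safe wrapper added to ErrorHandler later in this diff. A minimal sketch of that guard, assuming only that `com.zdjizhi.utils.IPUtil.isIPAddress` behaves as the project dependency does (its source is not part of this diff):

```java
import com.zdjizhi.utils.IPUtil;

final class IpGuardSketch {

    // Reject null up front so a record with a missing address can no longer
    // reach IPUtil's IPv6 handling and trigger a NullPointerException.
    static boolean isIPAddress(final String ipaddr) {
        if (null == ipaddr) {
            return false;
        }
        return IPUtil.isIPAddress(ipaddr);
    }
}
```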
README.md (24 changed lines)
@@ -22,17 +22,19 @@ flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<ve
## Configuration options

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source.kafka.topic | STRING | Y | | Name of the Kafka Topic to read, containing the raw SIP and RTP data |
| source.kafka.props.* | MAP<STRING, STRING> | Y | | Properties of the Kafka cluster to read from |
| sink.kafka.topic | STRING | Y | | Name of the Kafka Topic to which the fused VoIP records and the unmatched RTP records are written |
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | Properties of the output Kafka cluster |
| error.records.output.enable | STRING | N | False | Whether to enable output of error records (IP or Port missing) |
| error.sink.kafka.topic | STRING | N | | Name of the Kafka Topic to which error records are written |
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | Properties of the error-output Kafka cluster |
| sip.state.clear.interval.minutes | INT | N | 1 | Window size for pairing unidirectional SIP streams (unit: minutes) |
| rtp.state.clear.interval.minutes | INT | N | 6 | Window size for correlating SIP and RTP (unit: minutes) |

| Option | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source.kafka.topic | STRING | Y | | Name of the Kafka Topic to read, containing the raw SIP and RTP data |
| source.kafka.props.* | MAP<STRING, STRING> | Y | | Properties of the Kafka cluster to read from |
| sink.kafka.topic | STRING | Y | | Name of the Kafka Topic to which the fused VoIP records and the unmatched RTP records are written |
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | Properties of the output Kafka cluster |
| error.records.output.enable | BOOLEAN | N | False | Whether to enable output of error records (IP or Port missing) |
| include.intranet.ip | BOOLEAN | N | True | Whether to correlate records whose SIP-negotiated caller IP or callee IP is an intranet address |
| error.sink.kafka.topic | STRING | N | | Name of the Kafka Topic to which error records are written |
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | Properties of the error-output Kafka cluster |
| sip.state.clear.interval.minutes | INT | N | 1 | Window size for pairing unidirectional SIP streams (unit: minutes) |
| rtp.state.clear.interval.minutes | INT | N | 6 | Window size for correlating SIP and RTP (unit: minutes) |
| job.name | STRING | N | correlation_sip_rtp_session | Job name |
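To make the mapping between these keys and the typed options concrete, here is a minimal sketch that reads the two new options through the `ConfigOption` constants added to `FusionConfigs` later in this diff. How the job actually fills its `Configuration` (via `FusionConfiguration` and `ParameterTool`) is not fully shown here, so the direct `setBoolean`/`setString` calls below are only a stand-in:

```java
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import org.apache.flink.configuration.Configuration;

public class ConfigReadSketch {

    public static void main(String[] args) {
        // Stand-in for whatever the job loads from its properties.
        final Configuration config = new Configuration();
        config.setBoolean("include.intranet.ip", false); // route intranet SIP legs to the error topic
        config.setString("job.name", "correlation_sip_rtp_session");

        // Typed access through the options defined in FusionConfigs; unset keys
        // fall back to the defaults listed in the table above.
        final boolean includeIntranetIp = config.get(FusionConfigs.INCLUDE_INTRANET_IP);
        final String jobName = config.get(FusionConfigs.JOB_NAME);
        System.out.println(includeIntranetIp + " / " + jobName);
    }
}
```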
pom.xml (28 changed lines)
@@ -7,7 +7,7 @@
    <groupId>com.zdjizhi</groupId>
    <artifactId>sip-rtp-correlation</artifactId>
    <version>1.0-rc1</version>
    <version>1.2.2</version>

    <name>Flink : SIP-RTP : Correlation</name>
@@ -24,6 +24,18 @@
        <jackson.version>2.13.2.20220328</jackson.version>
    </properties>

    <distributionManagement>
        <repository>
            <id>platform-releases</id>
            <url>http://192.168.40.153:8099/content/repositories/platform-release</url>
            <uniqueVersion>true</uniqueVersion>
        </repository>
        <snapshotRepository>
            <id>platform-snapshots</id>
            <url>http://192.168.40.153:8099/content/repositories/platform-snapshot</url>
        </snapshotRepository>
    </distributionManagement>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
@@ -285,6 +297,20 @@
                </executions>
            </plugin>

            <plugin>
                <groupId>io.github.zlika</groupId>
                <artifactId>reproducible-build-maven-plugin</artifactId>
                <version>0.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>strip-jar</goal>
                        </goals>
                        <phase>package</phase>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
CorrelateApp.java
@@ -4,8 +4,8 @@ import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.error.ErrorHandler;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
import com.zdjizhi.flink.voip.functions.*;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
@@ -13,12 +13,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.time.Duration;

import static com.zdjizhi.flink.voip.conf.FusionConfigs.*;

/**
@@ -52,7 +53,13 @@ public class CorrelateApp {
                fusionConfiguration
                        .getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));

        final DataStreamSource<ObjectNode> sourceStream = env.addSource(kafkaConsumer);
        final DataStream<ObjectNode> sourceStream = env.addSource(kafkaConsumer)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
                                        (element, recordTimestamp) ->
                                                element.get("start_timestamp_ms").asLong()));

        final ErrorHandler errorHandler = new ErrorHandler(config);
@@ -92,8 +99,10 @@ public class CorrelateApp {
                new JsonNodeSerializationSchema(),
                fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));

        voIpOperator.addSink(producer);
        voIpOperator
                .union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
                .addSink(producer);

        env.execute("VoIP Fusion Job");
        env.execute(config.get(JOB_NAME));
    }
}
FusionConfigs.java
@@ -56,6 +56,16 @@ public class FusionConfigs {
                    .withDescription("Enable or disable the output of error records. " +
                            "If set to true, the error records will be sent to the specified Kafka topic.");

    /**
     * Configuration option to determine whether to perform data correlate for intranet addresses.
     */
    public static final ConfigOption<Boolean> INCLUDE_INTRANET_IP =
            ConfigOptions.key("include.intranet.ip")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether to perform data correlate for intranet addresses");

    /**
     * Configuration option for specifying the Kafka topic name where the error data will be sent.
     * This configuration option is used when the output of error records is enabled.

@@ -85,4 +95,13 @@
                    .intType()
                    .defaultValue(6)
                    .withDescription("The interval at which RTP state data should be cleared.");

    /**
     * Configuration option for specifying the name of a job.
     */
    public static final ConfigOption<String> JOB_NAME =
            ConfigOptions.key("job.name")
                    .stringType()
                    .defaultValue("correlation_sip_rtp_session")
                    .withDescription("The name of current job.");
}
ErrorHandler.java
@@ -3,12 +3,12 @@ package com.zdjizhi.flink.voip.error;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
import com.zdjizhi.flink.voip.functions.FunctionHelper;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import com.zdjizhi.utils.IPUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

@@ -21,6 +21,8 @@ import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Function;

/**
 * The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
 * It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
@@ -88,35 +90,50 @@ public class ErrorHandler {
 * The MeaninglessAddressProcessFunction is a custom ProcessFunction used to check for records
 * with invalid or meaningless addresses and ports. It separates them into the invalid output tag if necessary.
 */
class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> {
class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> implements FunctionHelper {

    private static final Logger LOG = LoggerFactory.getLogger(MeaninglessAddressProcessFunction.class);

    private transient boolean includeIntranetIp;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        final Configuration config = getGlobalConfiguration();
        includeIntranetIp = config.get(FusionConfigs.INCLUDE_INTRANET_IP);
    }

    @Override
    public void processElement(ObjectNode obj,
                               ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
                               Collector<ObjectNode> out) throws Exception {
        final Record record = new Record(obj);
        // Check for invalid or meaningless addresses and ports
        boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
                StringUtils.isNotBlank(record.getServerIp()) &&
                record.getClientPort() >= 0 &&
                record.getServerPort() >= 0;
        boolean cond1 = isIPAddress(record.getClientIp()) &&
                isIPAddress(record.getServerIp()) &&
                record.getClientPort() > 0 &&
                record.getServerPort() > 0;

        boolean cond8 = null != executeSafely(Record::getStreamDir, record);

        final SIPRecord sipRecord = new SIPRecord(obj);
        boolean cond2 = !IPUtil.isIPAddress(sipRecord.getOriginatorSdpConnectIp())
                || IPUtil.internalIp(sipRecord.getOriginatorSdpConnectIp());
        boolean cond3 = !IPUtil.isIPAddress(sipRecord.getResponderSdpConnectIp())
                || IPUtil.internalIp(sipRecord.getResponderSdpConnectIp());
        boolean cond4 = IPUtil.isIPAddress(sipRecord.getOriginatorSdpConnectIp())
                || IPUtil.isIPAddress(sipRecord.getResponderSdpConnectIp());
        boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
                || isIPAddress(sipRecord.getResponderSdpConnectIp());
        boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
                || (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp()));
        boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
                || (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
        boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
        boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
                IPUtil.internalIp(sipRecord.getResponderSdpConnectIp()) &&
                IPUtil.internalIp(sipRecord.getOriginatorSdpConnectIp());
        boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) &&
                (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
                (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));

        boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) &&
                isIPAddress(sipRecord.getResponderSdpConnectIp()) &&
                sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;

        // Both client and server addresses in the data are valid.
        if (cond1 && (
        if (cond1 && cond8 && (!cond5 || cond7) && (
                // The address in the SIP one-way stream is valid and not an internal network address.
                cond2 && cond3 && cond4 && cond5
                // The coordinating addresses in the SIP double directional stream are valid
@@ -132,4 +149,29 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
            }
        }
    }

    // ======================================================================================
    // ----------------------------------- private helper -----------------------------------

    public static <T, R> R executeSafely(Function<T, R> function, T v) {
        try {
            return function.apply(v);
        } catch (Exception e) {
            return null;
        }
    }

    private static boolean isIPAddress(final String ipaddr) {
        if (null == ipaddr) {
            return false;
        }
        return IPUtil.isIPAddress(ipaddr);
    }

    private static boolean isInternalIp(final String ipaddr) {
        if (!isIPAddress(ipaddr)) {
            return false;
        }
        return IPUtil.internalIp(ipaddr);
    }
}
FunctionHelper.java
@@ -26,7 +26,7 @@ public interface FunctionHelper extends RichFunction {
    }

    default void registerNextFireTimestamp(TimerService timerService, long interval) {
        long current = timerService.currentProcessingTime();
        timerService.registerProcessingTimeTimer(current + interval);
        long current = timerService.currentWatermark();
        timerService.registerEventTimeTimer(current + interval);
    }
}
SIPPairingFunction.java
@@ -7,11 +7,13 @@ import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
@@ -23,6 +25,9 @@ import org.apache.flink.util.Collector;
public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
        implements FunctionHelper {

    public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
            new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class));

    private transient Time fireInterval;

    private transient ValueState<ObjectNode> valueState;
@@ -63,7 +68,7 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
                out.collect(value);
                valueState.clear();
            } else {
                // If the address is not yet in the mapState.
                // If the address is not yet in the valueState.
                valueState.update(value);
            }
        } else {
@@ -77,6 +82,10 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
    public void onTimer(long timestamp,
                        KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
                        Collector<ObjectNode> out) throws Exception {
        final ObjectNode value = valueState.value();
        if (value != null) {
            ctx.output(SIP_OUTPUT_TAG, value);
        }
        valueState.clear();
    }
}
VoIPFusionFunction.java
@@ -1,9 +1,8 @@
package com.zdjizhi.flink.voip.functions;

import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import com.zdjizhi.flink.voip.records.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
@@ -80,9 +79,12 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
                final ObjectNode rtpObj = entry.getValue();
                final Record rtpRecord = new Record(rtpObj);

                completeOriginatorField(rtpRecord, new SIPRecord(sipObj));

                rtpRecord.merge(sipObj)
                        .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
                out.collect(rtpObj);
                iterator.remove();

                switch (entry.getKey()) {
                    case S2C:
@@ -120,6 +122,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
        final StreamDir streamDir = rtpRecord.getStreamDir();
        if (null != info) {

            completeOriginatorField(rtpRecord, new SIPRecord(info.getObj()));

            rtpRecord.merge(info.getObj())
                    .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
            out.collect(rtpObj);
@@ -149,9 +153,28 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
                        KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
                        Collector<ObjectNode> out) throws Exception {
        for (ObjectNode obj : rtpState.values()) {
            final Record rtpRecord = new Record(obj);
            rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
            out.collect(obj);
        }
        rtpState.clear();
        sipState.clear();
    }

    // ======================================================================
    // PRIVATE HELPER
    // ======================================================================

    private void completeOriginatorField(final Record rtpRecord, final SIPRecord sipRecord) {
        if (StringUtils.isNoneBlank(rtpRecord.getClientIp())) {
            if (StringUtils.equals(sipRecord.getOriginatorSdpConnectIp(), rtpRecord.getClientIp())) {
                rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.C2S.getCode());
                return;
            } else if (StringUtils.equals(sipRecord.getResponderSdpConnectIp(), rtpRecord.getClientIp())) {
                rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.S2C.getCode());
                return;
            }
        }
        rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
    }
}
src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java (new file, +27 lines)
@@ -0,0 +1,27 @@
package com.zdjizhi.flink.voip.records;

import lombok.Getter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

public class RTPRecord extends Record {

    public static final String F_ORIGINATOR_DIR = "rtp_originator_dir";

    public RTPRecord(ObjectNode obj) {
        super(obj);
    }

    @Getter
    public enum OriginatorDir {

        UNKNOWN(0),
        C2S(1),
        S2C(2);

        private final int code;

        OriginatorDir(int code) {
            this.code = code;
        }
    }
}
Record.java
@@ -20,31 +20,35 @@ public class Record {
    /**
     * Field name: the vsys the record belongs to
     */
    public static final String F_COMMON_VSYS_ID = "common_vsys_id";
    public static final String F_COMMON_VSYS_ID = "vsys_id";
    /**
     * Field name: the schema type of the record
     */
    public static final String F_COMMON_SCHEMA_TYPE = "common_schema_type";
    public static final String F_COMMON_SCHEMA_TYPE = "decoded_as";
    /**
     * Field name: the stream direction of the record
     */
    public static final String F_COMMON_STREAM_DIR = "common_stream_dir";
    /**
     * Field name: the Flags of the record's stream direction
     */
    public static final String F_FLAGS = "flags";
    /**
     * Field name: the server address in the record
     */
    public static final String F_COMMON_SERVER_IP = "common_server_ip";
    public static final String F_COMMON_SERVER_IP = "server_ip";
    /**
     * Field name: the server port in the record
     */
    public static final String F_COMMON_SERVER_PORT = "common_server_port";
    public static final String F_COMMON_SERVER_PORT = "server_port";
    /**
     * Field name: the client address in the record
     */
    public static final String F_COMMON_CLIENT_IP = "common_client_ip";
    public static final String F_COMMON_CLIENT_IP = "client_ip";
    /**
     * Field name: the client port in the record
     */
    public static final String F_COMMON_CLIENT_PORT = "common_client_port";
    public static final String F_COMMON_CLIENT_PORT = "client_port";

    /**
     * ObjectNode data.
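For illustration only, a record shaped the way the renamed constants above expect it. Every value is invented; only the field names come from the constants (the `flags` field is the one read by `StreamDir.ofFlags` further down), and plain Jackson is used here while the job itself works on the Flink-shaded `ObjectNode`:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class RecordShapeSketch {

    public static void main(String[] args) {
        final ObjectNode obj = new ObjectMapper().createObjectNode()
                .put("vsys_id", 1)
                .put("decoded_as", "SIP")          // value format is an assumption
                .put("flags", 8192 | 16384)        // both direction bits set
                .put("server_ip", "203.0.113.10")
                .put("server_port", 5060)
                .put("client_ip", "198.51.100.7")
                .put("client_port", 35000)
                .put("start_timestamp_ms", 1700000000000L);
        System.out.println(obj);
    }
}
```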
@@ -57,7 +61,8 @@ public class Record {
     * @return The VSys ID as an integer.
     */
    public int getVSysID() {
        return Record.getInt(obj, F_COMMON_VSYS_ID);
        int v = Record.getInt(obj, F_COMMON_VSYS_ID);
        return v == 0 ? 1 : v;
    }

    /**
@@ -75,7 +80,7 @@ public class Record {
     * @return The stream direction.
     */
    public final StreamDir getStreamDir() {
        return StreamDir.of(Record.getInt(obj, F_COMMON_STREAM_DIR));
        return StreamDir.ofFlags(Record.getLong(obj, F_FLAGS));
    }

    /**
@@ -169,6 +174,30 @@ public class Record {
        return getInt(obj, field, 0);
    }

    /**
     * Gets a long value from the specified field in the ObjectNode.
     *
     * @param obj The ObjectNode to get the value from.
     * @param field The name of the field.
     * @param defaultValue The default value to return if the field is not found or is not a long.
     * @return The long value from the field or the default value if the field is not found or is not a long.
     */
    public static long getLong(final ObjectNode obj, final String field, final long defaultValue) {
        final JsonNode node = obj.get(field);
        return node != null && node.isNumber() ? node.asLong() : defaultValue;
    }

    /**
     * Gets a long value from the specified field in the ObjectNode.
     *
     * @param obj The ObjectNode to get the value from.
     * @param field The name of the field.
     * @return The long value from the field or 0 if the field is not found or is not a long.
     */
    private static long getLong(final ObjectNode obj, final String field) {
        return getLong(obj, field, 0L);
    }

    /**
     * Get a string value from the specified field in the ObjectNode.
     *
StreamDir.java
@@ -48,4 +48,22 @@ public enum StreamDir {
        }
        throw new IllegalArgumentException("Unknown StreamDir value '" + value + "'.");
    }

    /**
     * Get the StreamDir enum based on the provided flags value.
     *
     * @param flags The flags.
     * @return The corresponding StreamDir enum.
     * @throws IllegalArgumentException if the provided value does not match any known StreamDir.
     */
    public static StreamDir ofFlags(long flags) {
        int v = 0;
        if ((flags & 8192) == 8192) {
            v += 1;
        }
        if ((flags & 16384) == 16384) {
            v += 2;
        }
        return of(v);
    }
}
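A small worked example of the bit arithmetic in `ofFlags`. Only the masks 8192 (0x2000) and 16384 (0x4000) come from the diff; which constant `of(v)` returns for each value is defined elsewhere in `StreamDir`, so the comments below are assumptions (with `DOUBLE` the likely result for v = 3, given how ErrorHandler compares against it):

```java
public class StreamDirFlagsSketch {

    // Same arithmetic as StreamDir.ofFlags above, kept standalone for illustration.
    static int directionValue(long flags) {
        int v = 0;
        if ((flags & 8192) == 8192) {   // 0x2000: first direction bit
            v += 1;
        }
        if ((flags & 16384) == 16384) { // 0x4000: second direction bit
            v += 2;
        }
        return v;
    }

    public static void main(String[] args) {
        System.out.println(directionValue(0L));             // 0 -> StreamDir.of(0)
        System.out.println(directionValue(8192L));          // 1 -> StreamDir.of(1)
        System.out.println(directionValue(16384L));         // 2 -> StreamDir.of(2)
        System.out.println(directionValue(8192L | 16384L)); // 3 -> StreamDir.of(3), presumably DOUBLE
    }
}
```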
RecordTest.java
@@ -34,10 +34,10 @@ public class RecordTest {
        final ObjectNode obj = mapper.createObjectNode();
        final Record record = new Record(obj);
        record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
        assertEquals(SchemaType.RTP.getValue(), record.getSchemaType());
        assertEquals(SchemaType.RTP, record.getSchemaType());

        obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue()));
        assertEquals(SchemaType.VOIP.getValue(), record.getSchemaType());
        assertEquals(SchemaType.VOIP, record.getSchemaType());
    }

    @Test