23 Commits

Author SHA1 Message Date
梁超
93ed6bcddc Merge branch 'hotfix/duplicate-voip' into 'release/1.1'
merge: merge 1.2

See merge request galaxy/tsg_olap/sip-rtp-correlation!22
2023-12-08 06:36:00 +00:00
chaoc
4ab96737a1 merge: merge 1.2
fix:
1. Duplicate VoIP
2. VSysID default value
3. Output SIP records when they are not paired
2023-12-08 14:35:23 +08:00
梁超
e51d693fa0 Merge branch 'release/1.1' into 'main'
chore: update version

See merge request galaxy/tsg_olap/sip-rtp-correlation!15
2023-10-25 08:43:28 +00:00
chaoc
757808d313 chore: update version 2023-10-25 16:42:31 +08:00
梁超
e123233af9 Merge branch 'feature/config-key' into 'main'
[GAL-419] feat: modify config option name

See merge request galaxy/tsg_olap/sip-rtp-correlation!14
2023-10-25 08:40:51 +00:00
chaoc
ad3072cc7a docs: modify config option name 2023-10-25 16:35:30 +08:00
chaoc
617ddab7ab feat: modify config option name 2023-10-25 16:30:27 +08:00
梁超
45891bc734 Merge branch 'feature/internal-ip-config' into 'main'
feat: add config 'DETERMINE_INTRANET_IP_BE_ABNORMAL'

See merge request galaxy/tsg_olap/sip-rtp-correlation!13
2023-10-13 06:43:38 +00:00
chaoc
19e0bce58f chore: update version 2023-10-13 14:43:08 +08:00
chaoc
10ce6cfa07 feat: add config 'DETERMINE_INTRANET_IP_BE_ABNORMAL' 2023-10-13 14:38:23 +08:00
梁超
9d5d99974b Merge branch 'release/1.0' into 'main'
release: 1.0

See merge request galaxy/tsg_olap/sip-rtp-correlation!12
2023-10-12 06:52:09 +00:00
梁超
970977ba3c Merge branch 'hotfix/output-sip' into 'release/1.0'
hotfix: add sip record output

See merge request galaxy/tsg_olap/sip-rtp-correlation!11
2023-10-12 02:44:32 +00:00
chaoc
732d9f5aa9 style: update version 2023-10-12 10:42:48 +08:00
chaoc
96fa19aea1 fix: use event time timer 2023-10-12 10:36:44 +08:00
chaoc
1b7c33d078 fix: output sip record 2023-10-12 10:36:29 +08:00
liang chao
b9463f07ac Merge branch 'release/1.0' into 'main'
merge: 1.0-rc3

See merge request galaxy/tsg_olap/sip-rtp-correlation!10
2023-08-28 03:30:06 +00:00
liang chao
35e2807a91 Merge branch 'hotfix/rename' into 'release/1.0'
style: rename job name

See merge request galaxy/tsg_olap/sip-rtp-correlation!9
2023-08-28 03:29:10 +00:00
chaoc
2275f349d1 Merge remote-tracking branch 'origin/release/1.0' into hotfix/rename 2023-08-28 11:27:23 +08:00
chaoc
1fedfbe4b8 style: add plugin reproducible 2023-08-28 11:26:25 +08:00
chaoc
b2f15b3919 style: modify job name 2023-08-28 11:11:02 +08:00
liang chao
da572f4bd0 Merge branch 'release/1.0' into 'main'
merge: 1.0.rc1

See merge request galaxy/tsg_olap/sip-rtp-correlation!5
2023-08-16 03:05:22 +00:00
liang chao
77cdd73f02 Merge branch 'hotfix/no-collect-expire-data' into 'main'
fix: cannot collect data due to expiration

See merge request galaxy/tsg_olap/sip-rtp-correlation!4
2023-08-11 06:18:46 +00:00
liang chao
5481a7b9ee Merge branch 'feature/address-keyby-impl' into 'main'
feature: develop job using Java

See merge request galaxy/tsg_olap/sip-rtp-correlation!3
2023-08-10 09:39:51 +00:00
10 changed files with 110 additions and 36 deletions


@@ -2,3 +2,7 @@
  ### Hotfix
  - [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) Fixed a NullPointerException caused by IPUtil not null-checking when validating IPv6 addresses
+ ### Feature
+ - Output SIP Records
+ - [GAL-419](https://jira.geedge.net/browse/GAL-419) Added config option `include.intranet.ip`: whether to correlate data whose SIP-negotiated caller or callee IP is an intranet address; data that is not correlated is output to the error Topic.


@@ -23,12 +23,13 @@ flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<ve
## Configuration Options

| Option | Type | Required | Default | Description |
| --------------------------- | ------------------- | ---------- | ------- | ----------------------------------------------------------- |
| source.kafka.topic | STRING | Y | | Kafka Topic to read from, containing raw SIP and RTP data |
| source.kafka.props.* | MAP<STRING, STRING> | Y | | Properties of the source Kafka |
| sink.kafka.topic | STRING | Y | | Kafka Topic to which correlated VoIP and unmatched RTP data are written |
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | Properties of the sink Kafka |
| error.records.output.enable | BOOLEAN | N | False | Whether to enable output of error records (IP or Port is empty) |
| include.intranet.ip | BOOLEAN | N | True | Whether to correlate data whose SIP-negotiated caller or callee IP is an intranet address |
| error.sink.kafka.topic | STRING | N | | Kafka Topic to which error data is written |
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | Properties of the error-data Kafka |
| sip.state.clear.interval.minutes | INT | N | 1 | Window size for SIP one-way stream correlation (unit: minutes) |
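For reference, an illustrative job submission combining these options follows. The topic names, broker address, and jar path are placeholders chosen for this example, not values from the repository; `ParameterTool`-style `--key value` arguments are assumed, matching the run command shown above.

```shell
flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<version>.jar \
    --source.kafka.topic sip-rtp-raw \
    --source.kafka.props.bootstrap.servers kafka:9092 \
    --sink.kafka.topic voip-correlated \
    --sink.kafka.props.bootstrap.servers kafka:9092 \
    --include.intranet.ip false \
    --error.records.output.enable true \
    --error.sink.kafka.topic voip-error \
    --error.sink.kafka.props.bootstrap.servers kafka:9092
```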

pom.xml

@@ -7,7 +7,7 @@
  <groupId>com.zdjizhi</groupId>
  <artifactId>sip-rtp-correlation</artifactId>
- <version>1.0-rc2</version>
+ <version>1.1-rc3</version>
  <name>Flink : SIP-RTP : Correlation</name>
@@ -24,6 +24,18 @@
  <jackson.version>2.13.2.20220328</jackson.version>
  </properties>
+ <distributionManagement>
+   <repository>
+     <id>platform-releases</id>
+     <url>http://192.168.40.153:8099/content/repositories/platform-release</url>
+     <uniqueVersion>true</uniqueVersion>
+   </repository>
+   <snapshotRepository>
+     <id>platform-snapshots</id>
+     <url>http://192.168.40.153:8099/content/repositories/platform-snapshot</url>
+   </snapshotRepository>
+ </distributionManagement>
  <dependencies>
  <dependency>
  <groupId>org.apache.flink</groupId>
@@ -285,6 +297,20 @@
  </executions>
  </plugin>
+ <plugin>
+   <groupId>io.github.zlika</groupId>
+   <artifactId>reproducible-build-maven-plugin</artifactId>
+   <version>0.2</version>
+   <executions>
+     <execution>
+       <goals>
+         <goal>strip-jar</goal>
+       </goals>
+       <phase>package</phase>
+     </execution>
+   </executions>
+ </plugin>
  <plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>


@@ -4,8 +4,8 @@ import com.zdjizhi.flink.voip.conf.FusionConfiguration;
  import com.zdjizhi.flink.voip.error.ErrorHandler;
  import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
  import com.zdjizhi.flink.voip.functions.*;
- import com.zdjizhi.flink.voip.records.Record;
- import com.zdjizhi.flink.voip.records.SIPRecord;
+ import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  import org.apache.flink.api.java.functions.KeySelector;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.api.java.utils.ParameterTool;
@@ -13,12 +13,13 @@ import org.apache.flink.configuration.Configuration;
  import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
  import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
  import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
  import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+ import java.time.Duration;
  import static com.zdjizhi.flink.voip.conf.FusionConfigs.*;
  /**
@@ -52,7 +53,13 @@ public class CorrelateApp {
  fusionConfiguration
      .getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
- final DataStreamSource<ObjectNode> sourceStream = env.addSource(kafkaConsumer);
+ final DataStream<ObjectNode> sourceStream = env.addSource(kafkaConsumer)
+     .assignTimestampsAndWatermarks(
+         WatermarkStrategy
+             .<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
+             .withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
+                 (element, recordTimestamp) ->
+                     element.get("common_start_timestamp_ms").asLong()));
  final ErrorHandler errorHandler = new ErrorHandler(config);
@@ -92,8 +99,10 @@ public class CorrelateApp {
  new JsonNodeSerializationSchema(),
  fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
- voIpOperator.addSink(producer);
+ voIpOperator
+     .union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
+     .addSink(producer);
- env.execute("VoIP Fusion Job");
+ env.execute("SIP-RTP-CORRELATION");
  }
  }
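The `WatermarkStrategy` added above lets watermarks trail the highest event timestamp seen so far by 5 seconds of allowed out-of-orderness. The arithmetic can be sketched with a minimal stdlib-only model (this is not Flink's actual `BoundedOutOfOrdernessWatermarks` class, only a mirror of its behavior):

```java
// Minimal stdlib-only sketch of bounded-out-of-orderness watermarking.
// The watermark trails the highest timestamp seen so far by the configured
// delay, minus 1 ms following Flink's convention that a watermark t means
// "no further elements with timestamp <= t are expected".
public class BoundedOutOfOrdernessSketch {
    private final long delayMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Record an element's event timestamp, e.g. common_start_timestamp_ms. */
    public void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    /** Current watermark: late elements never move it backwards. */
    public long currentWatermark() {
        return maxTimestamp - delayMillis - 1;
    }
}
```

A late element (timestamp below the current maximum) leaves the watermark unchanged, which is exactly what allows it to still be correlated within the 5-second bound.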


@@ -56,6 +56,16 @@ public class FusionConfigs {
  .withDescription("Enable or disable the output of error records. " +
      "If set to true, the error records will be sent to the specified Kafka topic.");
+ /**
+  * Configuration option to determine whether to perform data correlate for intranet addresses.
+  */
+ public static final ConfigOption<Boolean> INCLUDE_INTRANET_IP =
+     ConfigOptions.key("include.intranet.ip")
+         .booleanType()
+         .defaultValue(true)
+         .withDescription("Whether to perform data correlate for intranet addresses");
  /**
   * Configuration option for specifying the Kafka topic name where the error data will be sent.
   * This configuration option is used when the output of error records is enabled.
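The new option relies on Flink's `ConfigOptions` builder to supply `true` when `include.intranet.ip` is absent. That lookup-with-default behavior can be modeled in plain Java (a hypothetical minimal stand-in, not the Flink API):

```java
import java.util.Map;

// Hypothetical stand-in for a defaulted boolean option such as
// include.intranet.ip: an absent key falls back to the declared default.
public class BoolOption {
    private final String key;
    private final boolean defaultValue;

    public BoolOption(String key, boolean defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    /** Read the option from a flat string config, applying the default. */
    public boolean get(Map<String, String> config) {
        String raw = config.get(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw);
    }
}
```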


@@ -3,12 +3,12 @@ package com.zdjizhi.flink.voip.error;
  import com.zdjizhi.flink.voip.conf.FusionConfigs;
  import com.zdjizhi.flink.voip.conf.FusionConfiguration;
  import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
+ import com.zdjizhi.flink.voip.functions.FunctionHelper;
  import com.zdjizhi.flink.voip.records.Record;
  import com.zdjizhi.flink.voip.records.SIPRecord;
  import com.zdjizhi.flink.voip.records.SchemaType;
  import com.zdjizhi.flink.voip.records.StreamDir;
  import com.zdjizhi.utils.IPUtil;
- import org.apache.commons.lang3.StringUtils;
  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -88,35 +88,48 @@ public class ErrorHandler {
   * The MeaninglessAddressProcessFunction is a custom ProcessFunction used to check for records
   * with invalid or meaningless addresses and ports. It separates them into the invalid output tag if necessary.
   */
- class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> {
+ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> implements FunctionHelper {
  private static final Logger LOG = LoggerFactory.getLogger(MeaninglessAddressProcessFunction.class);
+ private transient boolean includeIntranetIp;
+ @Override
+ public void open(Configuration parameters) throws Exception {
+     super.open(parameters);
+     final Configuration config = getGlobalConfiguration();
+     includeIntranetIp = config.get(FusionConfigs.INCLUDE_INTRANET_IP);
+ }
  @Override
  public void processElement(ObjectNode obj,
                             ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
                             Collector<ObjectNode> out) throws Exception {
  final Record record = new Record(obj);
  // Check for invalid or meaningless addresses and ports
- boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
-         StringUtils.isNotBlank(record.getServerIp()) &&
-         record.getClientPort() >= 0 &&
-         record.getServerPort() >= 0;
+ boolean cond1 = isIPAddress(record.getClientIp()) &&
+         isIPAddress(record.getServerIp()) &&
+         record.getClientPort() > 0 &&
+         record.getServerPort() > 0;
  final SIPRecord sipRecord = new SIPRecord(obj);
- boolean cond2 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
-         || isInternalIp(sipRecord.getOriginatorSdpConnectIp());
- boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
-         || isInternalIp(sipRecord.getResponderSdpConnectIp());
- boolean cond4 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
-         || isIPAddress(sipRecord.getResponderSdpConnectIp());
+ boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
+         || isIPAddress(sipRecord.getResponderSdpConnectIp());
+ boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
+         || (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp()));
+ boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
+         || (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
  boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
  boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
-         isInternalIp(sipRecord.getResponderSdpConnectIp()) &&
-         isInternalIp(sipRecord.getOriginatorSdpConnectIp());
+         (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
+         (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
+ boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) &&
+         isIPAddress(sipRecord.getResponderSdpConnectIp()) &&
+         sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
  // Both client and server addresses in the data are valid.
- if (cond1 && (
+ if (cond1 && (!cond5 || cond7) && (
  // The address in the SIP one-way stream is valid and not an internal network address.
  cond2 && cond3 && cond4 && cond5
  // The coordinating addresses in the SIP double directional stream are valid
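The diff above calls `isInternalIp` but does not show its implementation. A plausible stdlib-only sketch follows; the project's actual `IPUtil` may differ (for example, in IPv6 or CGNAT handling), so treat this as an assumption:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Plausible stdlib sketch of an intranet-address check; the project's IPUtil
// may implement this differently. isSiteLocalAddress() covers the RFC 1918
// ranges 10.0.0.0/8, 172.16.0.0/12 and 192.168.0.0/16.
public class IntranetCheck {
    public static boolean isInternalIp(String ip) {
        try {
            InetAddress addr = InetAddress.getByName(ip);
            return addr.isSiteLocalAddress() || addr.isLoopbackAddress();
        } catch (UnknownHostException e) {
            return false; // not a parseable address literal
        }
    }
}
```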


@@ -26,7 +26,7 @@ public interface FunctionHelper extends RichFunction {
  }
  default void registerNextFireTimestamp(TimerService timerService, long interval) {
-     long current = timerService.currentProcessingTime();
-     timerService.registerProcessingTimeTimer(current + interval);
+     long current = timerService.currentWatermark();
+     timerService.registerEventTimeTimer(current + interval);
  }
  }
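This hunk is the "use event time timer" fix: state cleanup now fires as the watermark advances rather than as wall-clock time passes, so a backlogged or replayed stream cannot expire records prematurely. A stdlib-only model of an event-time timer queue (not Flink's `TimerService`) illustrates the semantics:

```java
import java.util.TreeSet;

// Stdlib-only model of an event-time timer queue: timers fire only when the
// watermark reaches them, regardless of how much wall-clock time has passed.
public class EventTimeTimers {
    private final TreeSet<Long> timers = new TreeSet<>();
    private int fired = 0;

    public void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advance the watermark and fire every timer at or before it. */
    public void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            timers.pollFirst();
            fired++;
        }
    }

    public int firedCount() {
        return fired;
    }
}
```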


@@ -7,11 +7,13 @@ import org.apache.flink.api.common.state.StateTtlConfig;
  import org.apache.flink.api.common.state.ValueState;
  import org.apache.flink.api.common.state.ValueStateDescriptor;
  import org.apache.flink.api.common.time.Time;
+ import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.api.java.tuple.Tuple3;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
  import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  import org.apache.flink.util.Collector;
+ import org.apache.flink.util.OutputTag;
  /**
   * A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
@@ -23,6 +25,9 @@ import org.apache.flink.util.Collector;
  public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
          implements FunctionHelper {
+ public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
+     new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class));
  private transient Time fireInterval;
  private transient ValueState<ObjectNode> valueState;
@@ -63,7 +68,7 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
  out.collect(value);
  valueState.clear();
  } else {
-     // If the address is not yet in the mapState.
+     // If the address is not yet in the valueState.
      valueState.update(value);
  }
  } else {
@@ -77,6 +82,10 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
  public void onTimer(long timestamp,
                      KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
                      Collector<ObjectNode> out) throws Exception {
+ final ObjectNode value = valueState.value();
+ if (value != null) {
+     ctx.output(SIP_OUTPUT_TAG, value);
+ }
  valueState.clear();
  }
  }
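The added `SIP_OUTPUT_TAG` implements the "output SIP when not paired" fix: a buffered one-way SIP record whose timer fires is now emitted to a side output instead of being silently cleared. A simplified stdlib model of that pair-or-flush logic (hypothetical sketch, ignoring keying, watermarks, and Flink state):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified pair-or-flush model of SIPPairingFunction: a record either meets
// its opposite-direction partner and is emitted as a pair, or is flushed to
// the side output when the timer fires. Not the actual Flink operator.
public class PairOrFlush {
    private final Map<String, String> pending = new HashMap<>();
    public final List<String> paired = new ArrayList<>();
    public final List<String> unmatchedSideOutput = new ArrayList<>();

    /** key models Tuple3<vsysId, callId, Address>; value is the record. */
    public void process(String key, String value) {
        String partner = pending.remove(key);
        if (partner != null) {
            paired.add(partner + "+" + value); // both directions seen
        } else {
            pending.put(key, value);           // buffer until partner or timer
        }
    }

    /** Timer fired: route still-unmatched records to the side output. */
    public void onTimer() {
        unmatchedSideOutput.addAll(pending.values());
        pending.clear();
    }
}
```

In `CorrelateApp`, this side output is unioned back into the main sink, so unmatched SIP records still reach the output topic.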


@@ -83,6 +83,7 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
  rtpRecord.merge(sipObj)
      .setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
  out.collect(rtpObj);
+ iterator.remove();
  switch (entry.getKey()) {
  case S2C:
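The added `iterator.remove()` is the "Duplicate VoIP" fix: once a buffered RTP entry has been merged and emitted, it must leave the map so a later pass cannot emit it again. A minimal stdlib illustration of emit-once-then-remove during iteration:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Minimal illustration of the duplicate-VoIP fix: emit each buffered entry
// once, removing it during iteration so a second pass emits nothing.
public class EmitOnce {
    public static List<String> correlate(Map<String, String> bufferedRtp) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<String, String>> iterator = bufferedRtp.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, String> entry = iterator.next();
            out.add(entry.getValue());
            iterator.remove(); // the fix: consumed entries must not survive
        }
        return out;
    }
}
```

Using `Iterator.remove()` (rather than `Map.remove()` inside the loop) is also what avoids a `ConcurrentModificationException` while iterating the entry set.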


@@ -57,7 +57,8 @@ public class Record {
   * @return The VSys ID as an integer.
   */
  public int getVSysID() {
-     return Record.getInt(obj, F_COMMON_VSYS_ID);
+     int v = Record.getInt(obj, F_COMMON_VSYS_ID);
+     return v == 0 ? 1 : v;
  }
  /**
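This last hunk is the "VSysID default value" fix: a missing field (which `getInt` presumably reads as 0 here) now maps to virtual-system ID 1 instead of 0. The normalization in isolation:

```java
// Standalone version of the getVSysID fix: a missing/zero vsys id defaults
// to 1, so downstream grouping never sees the sentinel value 0.
public class VSysId {
    public static int normalize(int raw) {
        return raw == 0 ? 1 : raw;
    }
}
```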