Compare commits
22 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
45891bc734 | ||
|
|
19e0bce58f | ||
|
|
10ce6cfa07 | ||
|
|
9d5d99974b | ||
|
|
970977ba3c | ||
|
|
732d9f5aa9 | ||
|
|
96fa19aea1 | ||
|
|
1b7c33d078 | ||
|
|
b9463f07ac | ||
|
|
35e2807a91 | ||
|
|
2275f349d1 | ||
|
|
1fedfbe4b8 | ||
|
|
b2f15b3919 | ||
|
|
8fc8cc7c2d | ||
|
|
98bb843159 | ||
|
|
edb044596e | ||
|
|
557156af79 | ||
|
|
32a811fb1c | ||
|
|
36cbaebf0c | ||
|
|
da572f4bd0 | ||
|
|
77cdd73f02 | ||
|
|
5481a7b9ee |
7
CHANGELOG.md
Normal file
7
CHANGELOG.md
Normal file
@@ -0,0 +1,7 @@
|
||||
# Changelog
|
||||
|
||||
### Hotfix
|
||||
- [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常
|
||||
|
||||
### Other
|
||||
- 输出 SIP Record
|
||||
@@ -28,7 +28,8 @@ flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<ve
|
||||
| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
|
||||
| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 |
|
||||
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
|
||||
| error.records.output.enable | STRING | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
|
||||
| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
|
||||
| determine.intranet.ip.be.abnormal | BOOLEAN | N | True | SIP 中协商四元组中存在内网 IP 地址时,是否将其判定为异常数据 |
|
||||
| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
|
||||
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
|
||||
| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |
|
||||
|
||||
16
pom.xml
16
pom.xml
@@ -7,7 +7,7 @@
|
||||
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>sip-rtp-correlation</artifactId>
|
||||
<version>1.0-rc1</version>
|
||||
<version>1.1-rc1</version>
|
||||
|
||||
<name>Flink : SIP-RTP : Correlation</name>
|
||||
|
||||
@@ -285,6 +285,20 @@
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>io.github.zlika</groupId>
|
||||
<artifactId>reproducible-build-maven-plugin</artifactId>
|
||||
<version>0.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>strip-jar</goal>
|
||||
</goals>
|
||||
<phase>package</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
|
||||
@@ -4,8 +4,8 @@ import com.zdjizhi.flink.voip.conf.FusionConfiguration;
|
||||
import com.zdjizhi.flink.voip.error.ErrorHandler;
|
||||
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
|
||||
import com.zdjizhi.flink.voip.functions.*;
|
||||
import com.zdjizhi.flink.voip.records.Record;
|
||||
import com.zdjizhi.flink.voip.records.SIPRecord;
|
||||
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
|
||||
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
|
||||
import org.apache.flink.api.java.functions.KeySelector;
|
||||
import org.apache.flink.api.java.tuple.Tuple2;
|
||||
import org.apache.flink.api.java.utils.ParameterTool;
|
||||
@@ -13,12 +13,13 @@ import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
||||
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import static com.zdjizhi.flink.voip.conf.FusionConfigs.*;
|
||||
|
||||
/**
|
||||
@@ -52,7 +53,13 @@ public class CorrelateApp {
|
||||
fusionConfiguration
|
||||
.getProperties(SOURCE_KAFKA_PROPERTIES_PREFIX));
|
||||
|
||||
final DataStreamSource<ObjectNode> sourceStream = env.addSource(kafkaConsumer);
|
||||
final DataStream<ObjectNode> sourceStream = env.addSource(kafkaConsumer)
|
||||
.assignTimestampsAndWatermarks(
|
||||
WatermarkStrategy
|
||||
.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
|
||||
.withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
|
||||
(element, recordTimestamp) ->
|
||||
element.get("common_start_timestamp_ms").asLong()));
|
||||
|
||||
final ErrorHandler errorHandler = new ErrorHandler(config);
|
||||
|
||||
@@ -92,8 +99,8 @@ public class CorrelateApp {
|
||||
new JsonNodeSerializationSchema(),
|
||||
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
|
||||
|
||||
voIpOperator.addSink(producer);
|
||||
voIpOperator.union(sipDoubleDirOperator).addSink(producer);
|
||||
|
||||
env.execute("VoIP Fusion Job");
|
||||
env.execute("SIP-RTP-CORRELATION");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +56,15 @@ public class FusionConfigs {
|
||||
.withDescription("Enable or disable the output of error records. " +
|
||||
"If set to true, the error records will be sent to the specified Kafka topic.");
|
||||
|
||||
/**
|
||||
* Configuration option to determine whether intranet IP addresses should be considered abnormal.
|
||||
*/
|
||||
public static final ConfigOption<Boolean> DETERMINE_INTRANET_IP_BE_ABNORMAL =
|
||||
ConfigOptions.key("determine.intranet.ip.be.abnormal")
|
||||
.booleanType()
|
||||
.defaultValue(true)
|
||||
.withDescription("Specifies whether intranet IP addresses should be treated as abnormal.");
|
||||
|
||||
/**
|
||||
* Configuration option for specifying the Kafka topic name where the error data will be sent.
|
||||
* This configuration option is used when the output of error records is enabled.
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.zdjizhi.flink.voip.error;
|
||||
import com.zdjizhi.flink.voip.conf.FusionConfigs;
|
||||
import com.zdjizhi.flink.voip.conf.FusionConfiguration;
|
||||
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
|
||||
import com.zdjizhi.flink.voip.functions.FunctionHelper;
|
||||
import com.zdjizhi.flink.voip.records.Record;
|
||||
import com.zdjizhi.flink.voip.records.SIPRecord;
|
||||
import com.zdjizhi.flink.voip.records.SchemaType;
|
||||
@@ -88,10 +89,19 @@ public class ErrorHandler {
|
||||
* The MeaninglessAddressProcessFunction is a custom ProcessFunction used to check for records
|
||||
* with invalid or meaningless addresses and ports. It separates them into the invalid output tag if necessary.
|
||||
*/
|
||||
class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> {
|
||||
class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> implements FunctionHelper {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MeaninglessAddressProcessFunction.class);
|
||||
|
||||
private transient boolean determineIntranetIpBeAbnormal;
|
||||
|
||||
@Override
|
||||
public void open(Configuration parameters) throws Exception {
|
||||
super.open(parameters);
|
||||
final Configuration config = getGlobalConfiguration();
|
||||
determineIntranetIpBeAbnormal = config.get(FusionConfigs.DETERMINE_INTRANET_IP_BE_ABNORMAL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processElement(ObjectNode obj,
|
||||
ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
|
||||
@@ -100,20 +110,20 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
|
||||
// Check for invalid or meaningless addresses and ports
|
||||
boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
|
||||
StringUtils.isNotBlank(record.getServerIp()) &&
|
||||
record.getClientPort() >= 0 &&
|
||||
record.getServerPort() >= 0;
|
||||
record.getClientPort() > 0 &&
|
||||
record.getServerPort() > 0;
|
||||
|
||||
final SIPRecord sipRecord = new SIPRecord(obj);
|
||||
boolean cond2 = !IPUtil.isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|
||||
|| IPUtil.internalIp(sipRecord.getOriginatorSdpConnectIp());
|
||||
boolean cond3 = !IPUtil.isIPAddress(sipRecord.getResponderSdpConnectIp())
|
||||
|| IPUtil.internalIp(sipRecord.getResponderSdpConnectIp());
|
||||
boolean cond4 = IPUtil.isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|
||||
|| IPUtil.isIPAddress(sipRecord.getResponderSdpConnectIp());
|
||||
boolean cond2 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|
||||
|| (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
|
||||
boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
|
||||
|| (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getResponderSdpConnectIp()));
|
||||
boolean cond4 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|
||||
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
|
||||
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
|
||||
boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
|
||||
IPUtil.internalIp(sipRecord.getResponderSdpConnectIp()) &&
|
||||
IPUtil.internalIp(sipRecord.getOriginatorSdpConnectIp());
|
||||
(!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
|
||||
(!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
|
||||
|
||||
// Both client and server addresses in the data are valid.
|
||||
if (cond1 && (
|
||||
@@ -132,4 +142,21 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ======================================================================================
|
||||
// ----------------------------------- private helper -----------------------------------
|
||||
|
||||
private static boolean isIPAddress(final String ipaddr) {
|
||||
if (null == ipaddr) {
|
||||
return false;
|
||||
}
|
||||
return IPUtil.isIPAddress(ipaddr);
|
||||
}
|
||||
|
||||
private static boolean isInternalIp(final String ipaddr) {
|
||||
if (!isIPAddress(ipaddr)) {
|
||||
return false;
|
||||
}
|
||||
return IPUtil.internalIp(ipaddr);
|
||||
}
|
||||
}
|
||||
@@ -26,7 +26,7 @@ public interface FunctionHelper extends RichFunction {
|
||||
}
|
||||
|
||||
default void registerNextFireTimestamp(TimerService timerService, long interval) {
|
||||
long current = timerService.currentProcessingTime();
|
||||
timerService.registerProcessingTimeTimer(current + interval);
|
||||
long current = timerService.currentWatermark();
|
||||
timerService.registerEventTimeTimer(current + interval);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user