5 Commits

Author SHA1 Message Date
梁超
e51d693fa0 Merge branch 'release/1.1' into 'main'
chore: update version

See merge request galaxy/tsg_olap/sip-rtp-correlation!15
2023-10-25 08:43:28 +00:00
chaoc
757808d313 chore: update version 2023-10-25 16:42:31 +08:00
梁超
e123233af9 Merge branch 'feature/config-key' into 'main'
[GAL-419] feat: modify config option name

See merge request galaxy/tsg_olap/sip-rtp-correlation!14
2023-10-25 08:40:51 +00:00
chaoc
ad3072cc7a docs: modify config option name 2023-10-25 16:35:30 +08:00
chaoc
617ddab7ab feat: modify config option name 2023-10-25 16:30:27 +08:00
5 changed files with 29 additions and 27 deletions

View File

@@ -3,5 +3,6 @@
### Hotfix
- [#5](https://git.mesalab.cn/galaxy/tsg_olap/sip-rtp-correlation/-/issues/5) 修复了由于 IPUtil 在判断 Ipv6 地址没有判空而引起的空指针异常
### Other
- 输出 SIP Record
### Feature
- 输出 SIP Record
- [GAL-419](https://jira.geedge.net/browse/GAL-419) 增加配置项 `include.intranet.ip`, 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联,不关联则输出到异常 Topic 中。

View File

@@ -22,18 +22,18 @@ flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<ve
## 配置项说明
| 配置项 | 类型 | 必需 | 默认值 | 描述 |
| --------------------------- | ------------------- | ---------- | ----------------------------------------------------------- | ----------------------------------------------------------- |
| source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 |
| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
| 配置项 | 类型 | 必需 | 默认值 | 描述 |
| --------------------------- | ------------------- | ---------- | ----------------------------------------------------------- |--------------------------------------------|
| source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 |
| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 |
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
| determine.intranet.ip.be.abnormal | BOOLEAN | N | True | SIP 协商四元组中存在内网 IP 地址时,是否将其判定为异常数据 |
| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |
| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) |
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
| include.intranet.ip | BOOLEAN | N | True | 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联 |
| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |
| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) |

View File

@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>1.1-rc1</version>
<version>1.1-rc2</version>
<name>Flink : SIP-RTP : Correlation</name>

View File

@@ -57,13 +57,14 @@ public class FusionConfigs {
"If set to true, the error records will be sent to the specified Kafka topic.");
/**
* Configuration option to determine whether intranet IP addresses should be considered abnormal.
* Configuration option to determine whether to perform data correlate for intranet addresses.
*/
public static final ConfigOption<Boolean> DETERMINE_INTRANET_IP_BE_ABNORMAL =
ConfigOptions.key("determine.intranet.ip.be.abnormal")
public static final ConfigOption<Boolean> INCLUDE_INTRANET_IP =
ConfigOptions.key("include.intranet.ip")
.booleanType()
.defaultValue(true)
.withDescription("Specifies whether intranet IP addresses should be treated as abnormal.");
.withDescription("Whether to perform data correlate for intranet addresses");
/**
* Configuration option for specifying the Kafka topic name where the error data will be sent.

View File

@@ -93,13 +93,13 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
private static final Logger LOG = LoggerFactory.getLogger(MeaninglessAddressProcessFunction.class);
private transient boolean determineIntranetIpBeAbnormal;
private transient boolean includeIntranetIp;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
final Configuration config = getGlobalConfiguration();
determineIntranetIpBeAbnormal = config.get(FusionConfigs.DETERMINE_INTRANET_IP_BE_ABNORMAL);
includeIntranetIp = config.get(FusionConfigs.INCLUDE_INTRANET_IP);
}
@Override
@@ -114,16 +114,16 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
record.getServerPort() > 0;
final SIPRecord sipRecord = new SIPRecord(obj);
boolean cond2 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
|| (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getResponderSdpConnectIp()));
boolean cond4 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
|| (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp()));
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
(!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
// Both client and server addresses in the data are valid.
if (cond1 && (