8 Commits

Author SHA1 Message Date
梁超
45891bc734 Merge branch 'feature/internal-ip-config' into 'main'
feat: add config 'DETERMINE_INTRANET_IP_BE_ABNORMAL'

See merge request galaxy/tsg_olap/sip-rtp-correlation!13
2023-10-13 06:43:38 +00:00
chaoc
19e0bce58f chore: update version 2023-10-13 14:43:08 +08:00
chaoc
10ce6cfa07 feat: add config 'DETERMINE_INTRANET_IP_BE_ABNORMAL' 2023-10-13 14:38:23 +08:00
梁超
9d5d99974b Merge branch 'release/1.0' into 'main'
release: 1.0

See merge request galaxy/tsg_olap/sip-rtp-correlation!12
2023-10-12 06:52:09 +00:00
liang chao
b9463f07ac Merge branch 'release/1.0' into 'main'
merge: 1.0-rc3

See merge request galaxy/tsg_olap/sip-rtp-correlation!10
2023-08-28 03:30:06 +00:00
liang chao
da572f4bd0 Merge branch 'release/1.0' into 'main'
merge: 1.0.rc1

See merge request galaxy/tsg_olap/sip-rtp-correlation!5
2023-08-16 03:05:22 +00:00
liang chao
77cdd73f02 Merge branch 'hotfix/no-collect-expire-data' into 'main'
fix: cannot collect data due to expiration

See merge request galaxy/tsg_olap/sip-rtp-correlation!4
2023-08-11 06:18:46 +00:00
liang chao
5481a7b9ee Merge branch 'feature/address-keyby-impl' into 'main'
feature: develop job using java

See merge request galaxy/tsg_olap/sip-rtp-correlation!3
2023-08-10 09:39:51 +00:00
4 changed files with 29 additions and 9 deletions

View File

@@ -28,7 +28,8 @@ flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<ve
| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 |
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
| error.records.output.enable | STRING | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
| determine.intranet.ip.be.abnormal | BOOLEAN | N | True | SIP 中协商四元组中存在内网 IP 地址时,是否将其判定为异常数据 |
| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |

View File

@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>1.0-rc4</version>
<version>1.1-rc1</version>
<name>Flink : SIP-RTP : Correlation</name>

View File

@@ -56,6 +56,15 @@ public class FusionConfigs {
.withDescription("Enable or disable the output of error records. " +
"If set to true, the error records will be sent to the specified Kafka topic.");
/**
* Configuration option to determine whether intranet IP addresses should be considered abnormal.
*/
public static final ConfigOption<Boolean> DETERMINE_INTRANET_IP_BE_ABNORMAL =
ConfigOptions.key("determine.intranet.ip.be.abnormal")
.booleanType()
.defaultValue(true)
.withDescription("Specifies whether intranet IP addresses should be treated as abnormal.");
/**
* Configuration option for specifying the Kafka topic name where the error data will be sent.
* This configuration option is used when the output of error records is enabled.

View File

@@ -3,6 +3,7 @@ package com.zdjizhi.flink.voip.error;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.conf.FusionConfiguration;
import com.zdjizhi.flink.voip.formats.JsonNodeSerializationSchema;
import com.zdjizhi.flink.voip.functions.FunctionHelper;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
@@ -88,10 +89,19 @@ public class ErrorHandler {
* The MeaninglessAddressProcessFunction is a custom ProcessFunction used to check for records
* with invalid or meaningless addresses and ports. It separates them into the invalid output tag if necessary.
*/
class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> {
class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, ObjectNode> implements FunctionHelper {
private static final Logger LOG = LoggerFactory.getLogger(MeaninglessAddressProcessFunction.class);
private transient boolean determineIntranetIpBeAbnormal;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
final Configuration config = getGlobalConfiguration();
determineIntranetIpBeAbnormal = config.get(FusionConfigs.DETERMINE_INTRANET_IP_BE_ABNORMAL);
}
@Override
public void processElement(ObjectNode obj,
ProcessFunction<ObjectNode, ObjectNode>.Context ctx,
@@ -100,20 +110,20 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
// Check for invalid or meaningless addresses and ports
boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
StringUtils.isNotBlank(record.getServerIp()) &&
record.getClientPort() >= 0 &&
record.getServerPort() >= 0;
record.getClientPort() > 0 &&
record.getServerPort() > 0;
final SIPRecord sipRecord = new SIPRecord(obj);
boolean cond2 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isInternalIp(sipRecord.getOriginatorSdpConnectIp());
|| (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
|| isInternalIp(sipRecord.getResponderSdpConnectIp());
|| (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getResponderSdpConnectIp()));
boolean cond4 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
isInternalIp(sipRecord.getResponderSdpConnectIp()) &&
isInternalIp(sipRecord.getOriginatorSdpConnectIp());
(!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
// Both client and server addresses in the data are valid.
if (cond1 && (