feat: modify config option name

This commit is contained in:
chaoc
2023-10-25 16:30:27 +08:00
parent 45891bc734
commit 617ddab7ab
2 changed files with 14 additions and 13 deletions

View File

@@ -57,13 +57,14 @@ public class FusionConfigs {
"If set to true, the error records will be sent to the specified Kafka topic.");
/**
* Configuration option to determine whether intranet IP addresses should be considered abnormal.
* Configuration option to determine whether to perform data correlate for intranet addresses.
*/
public static final ConfigOption<Boolean> DETERMINE_INTRANET_IP_BE_ABNORMAL =
ConfigOptions.key("determine.intranet.ip.be.abnormal")
public static final ConfigOption<Boolean> INCLUDE_INTRANET_IP =
ConfigOptions.key("include.intranet.ip")
.booleanType()
.defaultValue(true)
.withDescription("Specifies whether intranet IP addresses should be treated as abnormal.");
.withDescription("Whether to perform data correlate for intranet addresses");
/**
* Configuration option for specifying the Kafka topic name where the error data will be sent.

View File

@@ -93,13 +93,13 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
private static final Logger LOG = LoggerFactory.getLogger(MeaninglessAddressProcessFunction.class);
private transient boolean determineIntranetIpBeAbnormal;
private transient boolean includeIntranetIp;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
final Configuration config = getGlobalConfiguration();
determineIntranetIpBeAbnormal = config.get(FusionConfigs.DETERMINE_INTRANET_IP_BE_ABNORMAL);
includeIntranetIp = config.get(FusionConfigs.INCLUDE_INTRANET_IP);
}
@Override
@@ -114,16 +114,16 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
record.getServerPort() > 0;
final SIPRecord sipRecord = new SIPRecord(obj);
boolean cond2 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
|| (!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getResponderSdpConnectIp()));
boolean cond4 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
boolean cond3 = !isIPAddress(sipRecord.getResponderSdpConnectIp())
|| (includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp()));
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
(!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(!determineIntranetIpBeAbnormal || isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
// Both client and server addresses in the data are valid.
if (cond1 && (