|
|
|
|
@@ -9,7 +9,6 @@ import com.zdjizhi.flink.voip.records.SIPRecord;
|
|
|
|
|
import com.zdjizhi.flink.voip.records.SchemaType;
|
|
|
|
|
import com.zdjizhi.flink.voip.records.StreamDir;
|
|
|
|
|
import com.zdjizhi.utils.IPUtil;
|
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
|
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
|
|
|
|
import org.apache.flink.configuration.Configuration;
|
|
|
|
|
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
|
|
|
|
|
@@ -22,6 +21,8 @@ import org.apache.flink.util.OutputTag;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
|
|
import java.util.function.Function;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
|
|
|
|
|
* It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
|
|
|
|
|
@@ -108,11 +109,13 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
|
|
|
|
|
Collector<ObjectNode> out) throws Exception {
|
|
|
|
|
final Record record = new Record(obj);
|
|
|
|
|
// Check for invalid or meaningless addresses and ports
|
|
|
|
|
boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
|
|
|
|
|
StringUtils.isNotBlank(record.getServerIp()) &&
|
|
|
|
|
boolean cond1 = isIPAddress(record.getClientIp()) &&
|
|
|
|
|
isIPAddress(record.getServerIp()) &&
|
|
|
|
|
record.getClientPort() > 0 &&
|
|
|
|
|
record.getServerPort() > 0;
|
|
|
|
|
|
|
|
|
|
boolean cond8 = null != executeSafely(Record::getStreamDir, record);
|
|
|
|
|
|
|
|
|
|
final SIPRecord sipRecord = new SIPRecord(obj);
|
|
|
|
|
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|
|
|
|
|
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
|
|
|
|
|
@@ -121,12 +124,16 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
|
|
|
|
|
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|
|
|
|
|
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
|
|
|
|
|
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
|
|
|
|
|
boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
|
|
|
|
|
boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) &&
|
|
|
|
|
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
|
|
|
|
|
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
|
|
|
|
|
|
|
|
|
|
boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) &&
|
|
|
|
|
isIPAddress(sipRecord.getResponderSdpConnectIp()) &&
|
|
|
|
|
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
|
|
|
|
|
|
|
|
|
|
// Both client and server addresses in the data are valid.
|
|
|
|
|
if (cond1 && (
|
|
|
|
|
if (cond1 && cond8 && (!cond5 || cond7) && (
|
|
|
|
|
// The address in the SIP one-way stream is valid and not an internal network address.
|
|
|
|
|
cond2 && cond3 && cond4 && cond5
|
|
|
|
|
// The coordinating addresses in the SIP double directional stream are valid
|
|
|
|
|
@@ -146,6 +153,14 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
|
|
|
|
|
// ======================================================================================
|
|
|
|
|
// ----------------------------------- private helper -----------------------------------
|
|
|
|
|
|
|
|
|
|
public static <T, R> R executeSafely(Function<T, R> function, T v) {
|
|
|
|
|
try {
|
|
|
|
|
return function.apply(v);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static boolean isIPAddress(final String ipaddr) {
|
|
|
|
|
if (null == ipaddr) {
|
|
|
|
|
return false;
|
|
|
|
|
|