2 Commits

Author SHA1 Message Date
梁超
93ed6bcddc Merge branch 'hotfix/duplicate-voip' into 'release/1.1'
merge: merge 1.2

See merge request galaxy/tsg_olap/sip-rtp-correlation!22
2023-12-08 06:36:00 +00:00
chaoc
4ab96737a1 merge: merge 1.2
fix:
1. Duplicate Voip
2. VSysID default value
3. Output sip when them not pair
2023-12-08 14:35:23 +08:00
5 changed files with 11 additions and 69 deletions

View File

@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>1.2</version>
<version>1.1-rc3</version>
<name>Flink : SIP-RTP : Correlation</name>

View File

@@ -59,7 +59,7 @@ public class CorrelateApp {
.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
(element, recordTimestamp) ->
element.get("start_timestamp_ms").asLong()));
element.get("common_start_timestamp_ms").asLong()));
final ErrorHandler errorHandler = new ErrorHandler(config);

View File

@@ -21,8 +21,6 @@ import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Function;
/**
* The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
* It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
@@ -114,8 +112,6 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
record.getClientPort() > 0 &&
record.getServerPort() > 0;
boolean cond8 = null != executeSafely(Record::getStreamDir, record);
final SIPRecord sipRecord = new SIPRecord(obj);
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
@@ -124,7 +120,7 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) &&
boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
@@ -133,7 +129,7 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
// Both client and server addresses in the data are valid.
if (cond1 && cond8 && (!cond5 || cond7) && (
if (cond1 && (!cond5 || cond7) && (
// The address in the SIP one-way stream is valid and not an internal network address.
cond2 && cond3 && cond4 && cond5
// The coordinating addresses in the SIP double directional stream are valid
@@ -153,14 +149,6 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
// ======================================================================================
// ----------------------------------- private helper -----------------------------------
public static <T, R> R executeSafely(Function<T, R> function, T v) {
try {
return function.apply(v);
} catch (Exception e) {
return null;
}
}
private static boolean isIPAddress(final String ipaddr) {
if (null == ipaddr) {
return false;

View File

@@ -20,35 +20,31 @@ public class Record {
/**
* 字段名:数据记录中的所属 vsys
*/
public static final String F_COMMON_VSYS_ID = "vsys_id";
public static final String F_COMMON_VSYS_ID = "common_vsys_id";
/**
* 字段名:数据记录中的字段类型
*/
public static final String F_COMMON_SCHEMA_TYPE = "decoded_as";
public static final String F_COMMON_SCHEMA_TYPE = "common_schema_type";
/**
* 字段名:数据记录中的流类型
*/
public static final String F_COMMON_STREAM_DIR = "common_stream_dir";
/**
* 字段名:数据记录中的流类型的 Flags
*/
public static final String F_FLAGS = "flags";
/**
* 字段名:数据记录中的服务端地址
*/
public static final String F_COMMON_SERVER_IP = "server_ip";
public static final String F_COMMON_SERVER_IP = "common_server_ip";
/**
* 字段名:数据记录中的服务端端口
*/
public static final String F_COMMON_SERVER_PORT = "server_port";
public static final String F_COMMON_SERVER_PORT = "common_server_port";
/**
* 字段名:数据记录中的客户端地址
*/
public static final String F_COMMON_CLIENT_IP = "client_ip";
public static final String F_COMMON_CLIENT_IP = "common_client_ip";
/**
* 字段名:数据记录中的客户端端口
*/
public static final String F_COMMON_CLIENT_PORT = "client_port";
public static final String F_COMMON_CLIENT_PORT = "common_client_port";
/**
* ObjectNode data.
@@ -80,7 +76,7 @@ public class Record {
* @return The stream direction.
*/
public final StreamDir getStreamDir() {
return StreamDir.ofFlags(Record.getLong(obj, F_FLAGS));
return StreamDir.of(Record.getInt(obj, F_COMMON_STREAM_DIR));
}
/**
@@ -174,30 +170,6 @@ public class Record {
return getInt(obj, field, 0);
}
/**
* Gets a long value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @param defaultValue The default value to return if the field is not found or is not a long.
* @return The long value from the field or the default value if the field is not found or is not a long.
*/
public static long getLong(final ObjectNode obj, final String field, final long defaultValue) {
final JsonNode node = obj.get(field);
return node != null && node.isNumber() ? node.asLong() : defaultValue;
}
/**
* Gets a long value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @return The long value from the field or 0 if the field is not found or is not a long.
*/
private static long getLong(final ObjectNode obj, final String field) {
return getLong(obj, field, 0L);
}
/**
* Get a string value from the specified field in the ObjectNode.
*

View File

@@ -48,22 +48,4 @@ public enum StreamDir {
}
throw new IllegalArgumentException("Unknown StreamDir value '" + value + "'.");
}
/**
* Get the StreamDir enum based on the provided flags value.
*
* @param flags The flags.
* @return The corresponding StreamDir enum.
* @throws IllegalArgumentException if the provided value does not match any known StreamDir.
*/
public static StreamDir ofFlags(long flags) {
int v = 0;
if ((flags & 8192) == 8192) {
v += 1;
}
if ((flags & 16384) == 16384) {
v += 2;
}
return of(v);
}
}