From 114c1807421ddc57d22dc2230420543fa3e3309e Mon Sep 17 00:00:00 2001 From: chaoc Date: Tue, 21 Nov 2023 09:59:44 +0800 Subject: [PATCH 1/6] feature: field update --- .../java/com/zdjizhi/flink/voip/records/Record.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/zdjizhi/flink/voip/records/Record.java b/src/main/java/com/zdjizhi/flink/voip/records/Record.java index 8e57e17..fab7299 100644 --- a/src/main/java/com/zdjizhi/flink/voip/records/Record.java +++ b/src/main/java/com/zdjizhi/flink/voip/records/Record.java @@ -20,31 +20,32 @@ public class Record { /** * 字段名:数据记录中的所属 vsys */ - public static final String F_COMMON_VSYS_ID = "common_vsys_id"; + public static final String F_COMMON_VSYS_ID = "vsys_id"; /** * 字段名:数据记录中的字段类型 */ - public static final String F_COMMON_SCHEMA_TYPE = "common_schema_type"; + public static final String F_COMMON_SCHEMA_TYPE = "decoded_as"; /** * 字段名:数据记录中的流类型 */ + // TODO public static final String F_COMMON_STREAM_DIR = "common_stream_dir"; /** * 字段名:数据记录中的服务端地址 */ - public static final String F_COMMON_SERVER_IP = "common_server_ip"; + public static final String F_COMMON_SERVER_IP = "server_ip"; /** * 字段名:数据记录中的服务端端口 */ - public static final String F_COMMON_SERVER_PORT = "common_server_port"; + public static final String F_COMMON_SERVER_PORT = "server_port"; /** * 字段名:数据记录中的客户端地址 */ - public static final String F_COMMON_CLIENT_IP = "common_client_ip"; + public static final String F_COMMON_CLIENT_IP = "client_ip"; /** * 字段名:数据记录中的客户端端口 */ - public static final String F_COMMON_CLIENT_PORT = "common_client_port"; + public static final String F_COMMON_CLIENT_PORT = "client_port"; /** * ObjectNode data. From d0c3ebd60fdf5c37c651f458e2f4d23ed42a44b2 Mon Sep 17 00:00:00 2001 From: chaoc Date: Mon, 4 Dec 2023 10:27:48 +0800 Subject: [PATCH 2/6] feat: update stream dir field read method --- .../zdjizhi/flink/voip/records/Record.java | 31 +++++++++++++++++-- .../zdjizhi/flink/voip/records/StreamDir.java | 21 +++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/zdjizhi/flink/voip/records/Record.java b/src/main/java/com/zdjizhi/flink/voip/records/Record.java index fab7299..555db18 100644 --- a/src/main/java/com/zdjizhi/flink/voip/records/Record.java +++ b/src/main/java/com/zdjizhi/flink/voip/records/Record.java @@ -28,8 +28,11 @@ public class Record { /** * 字段名:数据记录中的流类型 */ - // TODO public static final String F_COMMON_STREAM_DIR = "common_stream_dir"; + /** + * 字段名:数据记录中的流类型的 Flags + */ + public static final String F_FLAGS = "flags"; /** * 字段名:数据记录中的服务端地址 */ @@ -76,7 +79,7 @@ public class Record { * @return The stream direction. */ public final StreamDir getStreamDir() { - return StreamDir.of(Record.getInt(obj, F_COMMON_STREAM_DIR)); + return StreamDir.ofFlags(Record.getLong(obj, F_FLAGS)); } /** @@ -170,6 +173,30 @@ public class Record { return getInt(obj, field, 0); } + /** + * Gets a long value from the specified field in the ObjectNode. + * + * @param obj The ObjectNode to get the value from. + * @param field The name of the field. + * @param defaultValue The default value to return if the field is not found or is not a long. + * @return The long value from the field or the default value if the field is not found or is not a long. + */ + public static long getLong(final ObjectNode obj, final String field, final long defaultValue) { + final JsonNode node = obj.get(field); + return node != null && node.isLong() ? node.asLong(defaultValue) : defaultValue; + } + + /** + * Gets a long value from the specified field in the ObjectNode. + * + * @param obj The ObjectNode to get the value from. + * @param field The name of the field. + * @return The long value from the field or 0 if the field is not found or is not a long. + */ + private static long getLong(final ObjectNode obj, final String field) { + return getLong(obj, field, 0L); + } + /** * Get a string value from the specified field in the ObjectNode. * diff --git a/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java b/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java index 84418c8..19b8a7e 100644 --- a/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java +++ b/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java @@ -48,4 +48,25 @@ public enum StreamDir { } throw new IllegalArgumentException("Unknown StreamDir value '" + value + "'."); } + + /** + * Get the StreamDir enum based on the provided flags value. + * + * @param flags The flags. + * @return The corresponding StreamDir enum. + * @throws IllegalArgumentException if the provided value does not match any known StreamDir. + */ + public static StreamDir ofFlags(long flags) { + int v = 0; + if ((flags & 8192) == 8192) { + v += 1; + } + if ((flags & 16384) == 16384) { + v += 2; + } + if ((flags & 32768) == 32768) { + v = 3; + } + return of(v); + } } From 64f19b528e76cb6dce272ef4caa5219870974fb8 Mon Sep 17 00:00:00 2001 From: chaoc Date: Mon, 4 Dec 2023 10:29:04 +0800 Subject: [PATCH 3/6] chore: update version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 460e46d..fb408bf 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.zdjizhi sip-rtp-correlation - 1.1-rc2 + 1.1-rc3-SNAPSHOT Flink : SIP-RTP : Correlation From 35247d7414cf5eb83fefe44806716c2647031e53 Mon Sep 17 00:00:00 2001 From: chaoc Date: Mon, 4 Dec 2023 17:08:53 +0800 Subject: [PATCH 4/6] fix: update field name --- src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java index 5b699f3..8701891 100644 --- a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java +++ b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java @@ -59,7 +59,7 @@ public class CorrelateApp { .forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((SerializableTimestampAssigner) (element, recordTimestamp) -> - element.get("common_start_timestamp_ms").asLong())); + element.get("start_timestamp_ms").asLong())); final ErrorHandler errorHandler = new ErrorHandler(config); From 6f915e5162b8500f80c6203d4793f5f507e50988 Mon Sep 17 00:00:00 2001 From: chaoc Date: Mon, 4 Dec 2023 17:19:32 +0800 Subject: [PATCH 5/6] fix: update flag --- src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java b/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java index 19b8a7e..3732058 100644 --- a/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java +++ b/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java @@ -64,9 +64,6 @@ public enum StreamDir { if ((flags & 16384) == 16384) { v += 2; } - if ((flags & 32768) == 32768) { - v = 3; - } return of(v); } } From 0994219ede78ac18ff3b594d4456cacccf98bc8f Mon Sep 17 00:00:00 2001 From: chaoc Date: Tue, 5 Dec 2023 17:43:32 +0800 Subject: [PATCH 6/6] fix: long value reader --- src/main/java/com/zdjizhi/flink/voip/records/Record.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/zdjizhi/flink/voip/records/Record.java b/src/main/java/com/zdjizhi/flink/voip/records/Record.java index 555db18..41560e4 100644 --- a/src/main/java/com/zdjizhi/flink/voip/records/Record.java +++ b/src/main/java/com/zdjizhi/flink/voip/records/Record.java @@ -183,7 +183,7 @@ public class Record { */ public static long getLong(final ObjectNode obj, final String field, final long defaultValue) { final JsonNode node = obj.get(field); - return node != null && node.isLong() ? node.asLong(defaultValue) : defaultValue; + return node != null && node.isNumber() ? node.asLong() : defaultValue; } /**