diff --git a/pom.xml b/pom.xml
index 460e46d..fb408bf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
com.zdjizhi
sip-rtp-correlation
- 1.1-rc2
+ 1.1-rc3-SNAPSHOT
Flink : SIP-RTP : Correlation
diff --git a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java
index 5b699f3..8701891 100644
--- a/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java
+++ b/src/main/java/com/zdjizhi/flink/voip/CorrelateApp.java
@@ -59,7 +59,7 @@ public class CorrelateApp {
.forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((SerializableTimestampAssigner)
(element, recordTimestamp) ->
- element.get("common_start_timestamp_ms").asLong()));
+ element.get("start_timestamp_ms").asLong()));
final ErrorHandler errorHandler = new ErrorHandler(config);
diff --git a/src/main/java/com/zdjizhi/flink/voip/records/Record.java b/src/main/java/com/zdjizhi/flink/voip/records/Record.java
index 8e57e17..41560e4 100644
--- a/src/main/java/com/zdjizhi/flink/voip/records/Record.java
+++ b/src/main/java/com/zdjizhi/flink/voip/records/Record.java
@@ -20,31 +20,35 @@ public class Record {
/**
* 字段名:数据记录中的所属 vsys
*/
- public static final String F_COMMON_VSYS_ID = "common_vsys_id";
+ public static final String F_COMMON_VSYS_ID = "vsys_id";
/**
* 字段名:数据记录中的字段类型
*/
- public static final String F_COMMON_SCHEMA_TYPE = "common_schema_type";
+ public static final String F_COMMON_SCHEMA_TYPE = "decoded_as";
/**
* 字段名:数据记录中的流类型
*/
public static final String F_COMMON_STREAM_DIR = "common_stream_dir";
+ /**
+ * 字段名:数据记录中的流类型的 Flags
+ */
+ public static final String F_FLAGS = "flags";
/**
* 字段名:数据记录中的服务端地址
*/
- public static final String F_COMMON_SERVER_IP = "common_server_ip";
+ public static final String F_COMMON_SERVER_IP = "server_ip";
/**
* 字段名:数据记录中的服务端端口
*/
- public static final String F_COMMON_SERVER_PORT = "common_server_port";
+ public static final String F_COMMON_SERVER_PORT = "server_port";
/**
* 字段名:数据记录中的客户端地址
*/
- public static final String F_COMMON_CLIENT_IP = "common_client_ip";
+ public static final String F_COMMON_CLIENT_IP = "client_ip";
/**
* 字段名:数据记录中的客户端端口
*/
- public static final String F_COMMON_CLIENT_PORT = "common_client_port";
+ public static final String F_COMMON_CLIENT_PORT = "client_port";
/**
* ObjectNode data.
@@ -75,7 +79,7 @@ public class Record {
* @return The stream direction.
*/
public final StreamDir getStreamDir() {
- return StreamDir.of(Record.getInt(obj, F_COMMON_STREAM_DIR));
+ return StreamDir.ofFlags(Record.getLong(obj, F_FLAGS));
}
/**
@@ -169,6 +173,30 @@ public class Record {
return getInt(obj, field, 0);
}
+ /**
+ * Gets a long value from the specified field in the ObjectNode.
+ *
+ * @param obj The ObjectNode to get the value from.
+ * @param field The name of the field.
+ * @param defaultValue The default value to return if the field is not found or is not a long.
+ * @return The long value from the field or the default value if the field is not found or is not a long.
+ */
+ public static long getLong(final ObjectNode obj, final String field, final long defaultValue) {
+ final JsonNode node = obj.get(field);
+ return node != null && node.isNumber() ? node.asLong() : defaultValue;
+ }
+
+ /**
+ * Gets a long value from the specified field in the ObjectNode.
+ *
+ * @param obj The ObjectNode to get the value from.
+ * @param field The name of the field.
+ * @return The long value from the field or 0 if the field is not found or is not a long.
+ */
+ private static long getLong(final ObjectNode obj, final String field) {
+ return getLong(obj, field, 0L);
+ }
+
/**
* Get a string value from the specified field in the ObjectNode.
*
diff --git a/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java b/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java
index 84418c8..3732058 100644
--- a/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java
+++ b/src/main/java/com/zdjizhi/flink/voip/records/StreamDir.java
@@ -48,4 +48,22 @@ public enum StreamDir {
}
throw new IllegalArgumentException("Unknown StreamDir value '" + value + "'.");
}
+
+ /**
+ * Get the StreamDir enum based on the provided flags value.
+ *
+ * @param flags The flags.
+ * @return The corresponding StreamDir enum.
+ * @throws IllegalArgumentException if the provided value does not match any known StreamDir.
+ */
+ public static StreamDir ofFlags(long flags) {
+ int v = 0;
+ if ((flags & 8192) == 8192) {
+ v += 1;
+ }
+ if ((flags & 16384) == 16384) {
+ v += 2;
+ }
+ return of(v);
+ }
}