Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e277117c6d | ||
|
|
d54c93b61d | ||
|
|
3b06d3dfd5 | ||
|
|
e718120be1 | ||
|
|
1dffb8fb6f | ||
|
|
0994219ede | ||
|
|
6f915e5162 | ||
|
|
35247d7414 | ||
|
|
64f19b528e | ||
|
|
d0c3ebd60f | ||
|
|
114c180742 |
14
pom.xml
14
pom.xml
@@ -7,7 +7,7 @@
|
||||
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>sip-rtp-correlation</artifactId>
|
||||
<version>1.1-rc2</version>
|
||||
<version>1.2-rc1</version>
|
||||
|
||||
<name>Flink : SIP-RTP : Correlation</name>
|
||||
|
||||
@@ -24,6 +24,18 @@
|
||||
<jackson.version>2.13.2.20220328</jackson.version>
|
||||
</properties>
|
||||
|
||||
<distributionManagement>
|
||||
<repository>
|
||||
<id>platform-releases</id>
|
||||
<url>http://192.168.40.153:8099/content/repositories/platform-release</url>
|
||||
<uniqueVersion>true</uniqueVersion>
|
||||
</repository>
|
||||
<snapshotRepository>
|
||||
<id>platform-snapshots</id>
|
||||
<url>http://192.168.40.153:8099/content/repositories/platform-snapshot</url>
|
||||
</snapshotRepository>
|
||||
</distributionManagement>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
|
||||
@@ -59,7 +59,7 @@ public class CorrelateApp {
|
||||
.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
|
||||
.withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
|
||||
(element, recordTimestamp) ->
|
||||
element.get("common_start_timestamp_ms").asLong()));
|
||||
element.get("start_timestamp_ms").asLong()));
|
||||
|
||||
final ErrorHandler errorHandler = new ErrorHandler(config);
|
||||
|
||||
|
||||
@@ -20,31 +20,35 @@ public class Record {
|
||||
/**
|
||||
* 字段名:数据记录中的所属 vsys
|
||||
*/
|
||||
public static final String F_COMMON_VSYS_ID = "common_vsys_id";
|
||||
public static final String F_COMMON_VSYS_ID = "vsys_id";
|
||||
/**
|
||||
* 字段名:数据记录中的字段类型
|
||||
*/
|
||||
public static final String F_COMMON_SCHEMA_TYPE = "common_schema_type";
|
||||
public static final String F_COMMON_SCHEMA_TYPE = "decoded_as";
|
||||
/**
|
||||
* 字段名:数据记录中的流类型
|
||||
*/
|
||||
public static final String F_COMMON_STREAM_DIR = "common_stream_dir";
|
||||
/**
|
||||
* 字段名:数据记录中的流类型的 Flags
|
||||
*/
|
||||
public static final String F_FLAGS = "flags";
|
||||
/**
|
||||
* 字段名:数据记录中的服务端地址
|
||||
*/
|
||||
public static final String F_COMMON_SERVER_IP = "common_server_ip";
|
||||
public static final String F_COMMON_SERVER_IP = "server_ip";
|
||||
/**
|
||||
* 字段名:数据记录中的服务端端口
|
||||
*/
|
||||
public static final String F_COMMON_SERVER_PORT = "common_server_port";
|
||||
public static final String F_COMMON_SERVER_PORT = "server_port";
|
||||
/**
|
||||
* 字段名:数据记录中的客户端地址
|
||||
*/
|
||||
public static final String F_COMMON_CLIENT_IP = "common_client_ip";
|
||||
public static final String F_COMMON_CLIENT_IP = "client_ip";
|
||||
/**
|
||||
* 字段名:数据记录中的客户端端口
|
||||
*/
|
||||
public static final String F_COMMON_CLIENT_PORT = "common_client_port";
|
||||
public static final String F_COMMON_CLIENT_PORT = "client_port";
|
||||
|
||||
/**
|
||||
* ObjectNode data.
|
||||
@@ -75,7 +79,7 @@ public class Record {
|
||||
* @return The stream direction.
|
||||
*/
|
||||
public final StreamDir getStreamDir() {
|
||||
return StreamDir.of(Record.getInt(obj, F_COMMON_STREAM_DIR));
|
||||
return StreamDir.ofFlags(Record.getLong(obj, F_FLAGS));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -169,6 +173,30 @@ public class Record {
|
||||
return getInt(obj, field, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a long value from the specified field in the ObjectNode.
|
||||
*
|
||||
* @param obj The ObjectNode to get the value from.
|
||||
* @param field The name of the field.
|
||||
* @param defaultValue The default value to return if the field is not found or is not a long.
|
||||
* @return The long value from the field or the default value if the field is not found or is not a long.
|
||||
*/
|
||||
public static long getLong(final ObjectNode obj, final String field, final long defaultValue) {
|
||||
final JsonNode node = obj.get(field);
|
||||
return node != null && node.isNumber() ? node.asLong() : defaultValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a long value from the specified field in the ObjectNode.
|
||||
*
|
||||
* @param obj The ObjectNode to get the value from.
|
||||
* @param field The name of the field.
|
||||
* @return The long value from the field or 0 if the field is not found or is not a long.
|
||||
*/
|
||||
private static long getLong(final ObjectNode obj, final String field) {
|
||||
return getLong(obj, field, 0L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a string value from the specified field in the ObjectNode.
|
||||
*
|
||||
|
||||
@@ -48,4 +48,22 @@ public enum StreamDir {
|
||||
}
|
||||
throw new IllegalArgumentException("Unknown StreamDir value '" + value + "'.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the StreamDir enum based on the provided flags value.
|
||||
*
|
||||
* @param flags The flags.
|
||||
* @return The corresponding StreamDir enum.
|
||||
* @throws IllegalArgumentException if the provided value does not match any known StreamDir.
|
||||
*/
|
||||
public static StreamDir ofFlags(long flags) {
|
||||
int v = 0;
|
||||
if ((flags & 8192) == 8192) {
|
||||
v += 1;
|
||||
}
|
||||
if ((flags & 16384) == 16384) {
|
||||
v += 2;
|
||||
}
|
||||
return of(v);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user