Compare commits
30 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
53c6c267e8 | ||
|
|
20e8421900 | ||
|
|
d01235e092 | ||
|
|
12828291a9 | ||
|
|
f21e814763 | ||
|
|
11c2c641bb | ||
|
|
59cabb4868 | ||
|
|
b7c739a955 | ||
|
|
1f6ef08a30 | ||
|
|
87abd1e2ca | ||
|
|
2542a8bfd2 | ||
|
|
1c0259a95c | ||
|
|
37f49c40d5 | ||
|
|
beef47df4c | ||
|
|
6d77d1c3c0 | ||
|
|
4179a0a887 | ||
|
|
6ebefc9026 | ||
|
|
701019c38a | ||
|
|
6ae7fdef06 | ||
|
|
e277117c6d | ||
|
|
d54c93b61d | ||
|
|
3b06d3dfd5 | ||
|
|
e718120be1 | ||
|
|
1dffb8fb6f | ||
|
|
0994219ede | ||
|
|
6f915e5162 | ||
|
|
35247d7414 | ||
|
|
64f19b528e | ||
|
|
d0c3ebd60f | ||
|
|
114c180742 |
25
README.md
25
README.md
@@ -22,18 +22,19 @@ flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<ve
|
||||
|
||||
## 配置项说明
|
||||
|
||||
| 配置项 | 类型 | 必需 | 默认值 | 描述 |
|
||||
| --------------------------- | ------------------- | ---------- | ----------------------------------------------------------- |--------------------------------------------|
|
||||
| source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 |
|
||||
| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
|
||||
| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 |
|
||||
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
|
||||
| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
|
||||
| include.intranet.ip | BOOLEAN | N | True | 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联 |
|
||||
| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
|
||||
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
|
||||
| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |
|
||||
| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) |
|
||||
| 配置项 | 类型 | 必需 | 默认值 | 描述 |
|
||||
|----------------------------------| ------------------- | ---------- | ---------------------------------------------------------- |-------------------------------------------|
|
||||
| source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 |
|
||||
| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
|
||||
| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 |
|
||||
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
|
||||
| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
|
||||
| include.intranet.ip | BOOLEAN | N | True | 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联 |
|
||||
| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
|
||||
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
|
||||
| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |
|
||||
| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) |
|
||||
| job.name | STRING | N | correlation_sip_rtp_session | Job 名 |
|
||||
|
||||
|
||||
|
||||
|
||||
2
pom.xml
2
pom.xml
@@ -7,7 +7,7 @@
|
||||
|
||||
<groupId>com.zdjizhi</groupId>
|
||||
<artifactId>sip-rtp-correlation</artifactId>
|
||||
<version>1.1</version>
|
||||
<version>1.2.2</version>
|
||||
|
||||
<name>Flink : SIP-RTP : Correlation</name>
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ public class CorrelateApp {
|
||||
.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
|
||||
.withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
|
||||
(element, recordTimestamp) ->
|
||||
element.get("common_start_timestamp_ms").asLong()));
|
||||
element.get("start_timestamp_ms").asLong()));
|
||||
|
||||
final ErrorHandler errorHandler = new ErrorHandler(config);
|
||||
|
||||
@@ -103,6 +103,6 @@ public class CorrelateApp {
|
||||
.union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
|
||||
.addSink(producer);
|
||||
|
||||
env.execute("SIP-RTP-CORRELATION");
|
||||
env.execute(config.get(JOB_NAME));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -95,4 +95,13 @@ public class FusionConfigs {
|
||||
.intType()
|
||||
.defaultValue(6)
|
||||
.withDescription("The interval at which RTP state data should be cleared.");
|
||||
|
||||
/**
|
||||
* Configuration option for specifying the name of a job.
|
||||
*/
|
||||
public static final ConfigOption<String> JOB_NAME =
|
||||
ConfigOptions.key("job.name")
|
||||
.stringType()
|
||||
.defaultValue("correlation_sip_rtp_session")
|
||||
.withDescription("The name of current job.");
|
||||
}
|
||||
@@ -1,9 +1,8 @@
|
||||
package com.zdjizhi.flink.voip.functions;
|
||||
|
||||
import com.zdjizhi.flink.voip.conf.FusionConfigs;
|
||||
import com.zdjizhi.flink.voip.records.Record;
|
||||
import com.zdjizhi.flink.voip.records.SchemaType;
|
||||
import com.zdjizhi.flink.voip.records.StreamDir;
|
||||
import com.zdjizhi.flink.voip.records.*;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.flink.api.common.functions.RuntimeContext;
|
||||
import org.apache.flink.api.common.state.*;
|
||||
import org.apache.flink.api.common.time.Time;
|
||||
@@ -80,6 +79,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
|
||||
final ObjectNode rtpObj = entry.getValue();
|
||||
final Record rtpRecord = new Record(rtpObj);
|
||||
|
||||
completeOriginatorField(rtpRecord, new SIPRecord(sipObj));
|
||||
|
||||
rtpRecord.merge(sipObj)
|
||||
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
|
||||
out.collect(rtpObj);
|
||||
@@ -121,6 +122,8 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
|
||||
final StreamDir streamDir = rtpRecord.getStreamDir();
|
||||
if (null != info) {
|
||||
|
||||
completeOriginatorField(rtpRecord, new SIPRecord(info.getObj()));
|
||||
|
||||
rtpRecord.merge(info.getObj())
|
||||
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
|
||||
out.collect(rtpObj);
|
||||
@@ -150,9 +153,28 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
|
||||
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
|
||||
Collector<ObjectNode> out) throws Exception {
|
||||
for (ObjectNode obj : rtpState.values()) {
|
||||
final Record rtpRecord = new Record(obj);
|
||||
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
|
||||
out.collect(obj);
|
||||
}
|
||||
rtpState.clear();
|
||||
sipState.clear();
|
||||
}
|
||||
|
||||
// ======================================================================
|
||||
// PRIVATE HELPER
|
||||
// ======================================================================
|
||||
|
||||
private void completeOriginatorField(final Record rtpRecord, final SIPRecord sipRecord) {
|
||||
if (StringUtils.isNoneBlank(rtpRecord.getClientIp())) {
|
||||
if (StringUtils.equals(sipRecord.getOriginatorSdpConnectIp(), rtpRecord.getClientIp())) {
|
||||
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.C2S.getCode());
|
||||
return;
|
||||
} else if (StringUtils.equals(sipRecord.getResponderSdpConnectIp(), rtpRecord.getClientIp())) {
|
||||
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.S2C.getCode());
|
||||
return;
|
||||
}
|
||||
}
|
||||
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
|
||||
}
|
||||
}
|
||||
27
src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java
Normal file
27
src/main/java/com/zdjizhi/flink/voip/records/RTPRecord.java
Normal file
@@ -0,0 +1,27 @@
|
||||
package com.zdjizhi.flink.voip.records;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
|
||||
public class RTPRecord extends Record {
|
||||
|
||||
public static final String F_ORIGINATOR_DIR = "rtp_originator_dir";
|
||||
|
||||
public RTPRecord(ObjectNode obj) {
|
||||
super(obj);
|
||||
}
|
||||
|
||||
@Getter
|
||||
public enum OriginatorDir {
|
||||
|
||||
UNKNOWN(0),
|
||||
C2S(1),
|
||||
S2C(2);
|
||||
|
||||
private final int code;
|
||||
|
||||
OriginatorDir(int code) {
|
||||
this.code = code;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -20,31 +20,35 @@ public class Record {
|
||||
/**
|
||||
* 字段名:数据记录中的所属 vsys
|
||||
*/
|
||||
public static final String F_COMMON_VSYS_ID = "common_vsys_id";
|
||||
public static final String F_COMMON_VSYS_ID = "vsys_id";
|
||||
/**
|
||||
* 字段名:数据记录中的字段类型
|
||||
*/
|
||||
public static final String F_COMMON_SCHEMA_TYPE = "common_schema_type";
|
||||
public static final String F_COMMON_SCHEMA_TYPE = "decoded_as";
|
||||
/**
|
||||
* 字段名:数据记录中的流类型
|
||||
*/
|
||||
public static final String F_COMMON_STREAM_DIR = "common_stream_dir";
|
||||
/**
|
||||
* 字段名:数据记录中的流类型的 Flags
|
||||
*/
|
||||
public static final String F_FLAGS = "flags";
|
||||
/**
|
||||
* 字段名:数据记录中的服务端地址
|
||||
*/
|
||||
public static final String F_COMMON_SERVER_IP = "common_server_ip";
|
||||
public static final String F_COMMON_SERVER_IP = "server_ip";
|
||||
/**
|
||||
* 字段名:数据记录中的服务端端口
|
||||
*/
|
||||
public static final String F_COMMON_SERVER_PORT = "common_server_port";
|
||||
public static final String F_COMMON_SERVER_PORT = "server_port";
|
||||
/**
|
||||
* 字段名:数据记录中的客户端地址
|
||||
*/
|
||||
public static final String F_COMMON_CLIENT_IP = "common_client_ip";
|
||||
public static final String F_COMMON_CLIENT_IP = "client_ip";
|
||||
/**
|
||||
* 字段名:数据记录中的客户端端口
|
||||
*/
|
||||
public static final String F_COMMON_CLIENT_PORT = "common_client_port";
|
||||
public static final String F_COMMON_CLIENT_PORT = "client_port";
|
||||
|
||||
/**
|
||||
* ObjectNode data.
|
||||
@@ -76,7 +80,7 @@ public class Record {
|
||||
* @return The stream direction.
|
||||
*/
|
||||
public final StreamDir getStreamDir() {
|
||||
return StreamDir.of(Record.getInt(obj, F_COMMON_STREAM_DIR));
|
||||
return StreamDir.ofFlags(Record.getLong(obj, F_FLAGS));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -170,6 +174,30 @@ public class Record {
|
||||
return getInt(obj, field, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a long value from the specified field in the ObjectNode.
|
||||
*
|
||||
* @param obj The ObjectNode to get the value from.
|
||||
* @param field The name of the field.
|
||||
* @param defaultValue The default value to return if the field is not found or is not a long.
|
||||
* @return The long value from the field or the default value if the field is not found or is not a long.
|
||||
*/
|
||||
public static long getLong(final ObjectNode obj, final String field, final long defaultValue) {
|
||||
final JsonNode node = obj.get(field);
|
||||
return node != null && node.isNumber() ? node.asLong() : defaultValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a long value from the specified field in the ObjectNode.
|
||||
*
|
||||
* @param obj The ObjectNode to get the value from.
|
||||
* @param field The name of the field.
|
||||
* @return The long value from the field or 0 if the field is not found or is not a long.
|
||||
*/
|
||||
private static long getLong(final ObjectNode obj, final String field) {
|
||||
return getLong(obj, field, 0L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a string value from the specified field in the ObjectNode.
|
||||
*
|
||||
|
||||
@@ -48,4 +48,22 @@ public enum StreamDir {
|
||||
}
|
||||
throw new IllegalArgumentException("Unknown StreamDir value '" + value + "'.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the StreamDir enum based on the provided flags value.
|
||||
*
|
||||
* @param flags The flags.
|
||||
* @return The corresponding StreamDir enum.
|
||||
* @throws IllegalArgumentException if the provided value does not match any known StreamDir.
|
||||
*/
|
||||
public static StreamDir ofFlags(long flags) {
|
||||
int v = 0;
|
||||
if ((flags & 8192) == 8192) {
|
||||
v += 1;
|
||||
}
|
||||
if ((flags & 16384) == 16384) {
|
||||
v += 2;
|
||||
}
|
||||
return of(v);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,10 +34,10 @@ public class RecordTest {
|
||||
final ObjectNode obj = mapper.createObjectNode();
|
||||
final Record record = new Record(obj);
|
||||
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
|
||||
assertEquals(SchemaType.RTP.getValue(), record.getSchemaType());
|
||||
assertEquals(SchemaType.RTP, record.getSchemaType());
|
||||
|
||||
obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue()));
|
||||
assertEquals(SchemaType.VOIP.getValue(), record.getSchemaType());
|
||||
assertEquals(SchemaType.VOIP, record.getSchemaType());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user