4 Commits

Author SHA1 Message Date
chaoc
c3f4f94a8f chore: update version 2023-12-15 10:36:11 +08:00
chaoc
07beccc732 [GAL-444] fix: fix error caused by invalid flags 2023-12-15 10:27:02 +08:00
梁超
93ed6bcddc Merge branch 'hotfix/duplicate-voip' into 'release/1.1'
merge: merge 1.2

See merge request galaxy/tsg_olap/sip-rtp-correlation!22
2023-12-08 06:36:00 +00:00
chaoc
4ab96737a1 merge: merge 1.2
fix:
1. Duplicate Voip
2. VSysID default value
3. Output sip when them not pair
2023-12-08 14:35:23 +08:00
9 changed files with 27 additions and 132 deletions

View File

@@ -22,19 +22,18 @@ flink run -c com.zdjizhi.flink.voip.CorrelateApp path/to/sip-rtp-correlation-<ve
## 配置项说明
| 配置项 | 类型 | 必需 | 默认值 | 描述 |
|----------------------------------| ------------------- | ---------- | ---------------------------------------------------------- |-------------------------------------------|
| source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 |
| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 |
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
| include.intranet.ip | BOOLEAN | N | True | 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联 |
| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |
| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) |
| job.name | STRING | N | correlation_sip_rtp_session | Job 名 |
| 配置项 | 类型 | 必需 | 默认值 | 描述 |
| --------------------------- | ------------------- | ---------- | ----------------------------------------------------------- |--------------------------------------------|
| source.kafka.topic | STRING | Y | | 将要读取的 Kafka Topic 名称,其包含 SIP 和 RTP 原始数据 |
| source.kafka.props.* | MAP<STRING, STRING> | Y | | 将要读取的 Kafka 的 Properties |
| sink.kafka.topic | STRING | Y | | 将合成的 VoIP 及 未关联成功的 RTP 数据写出的 Kafka Topic 名 |
| sink.kafka.props.* | MAP<STRING, STRING> | Y | | 数据输出的 Kafka 的 Properties |
| error.records.output.enable | BOOLEAN | N | False | 是否开启异常数据的输出 【IP 或 Port 为空】 |
| include.intranet.ip | BOOLEAN | N | True | 是否对 SIP 协商主叫 IP 或被叫 IP 为内网地址的数据进行关联 |
| error.sink.kafka.topic | STRING | N | | 异常数据输出到的 Kafka Topic 名 |
| error.sink.kafka.props.* | MAP<STRING, STRING> | N | | 异常数据输出的 Kafka 的 Properties |
| sip.state.clear.interval.minutes | INT | N | 1 | SIP 单向流关联的窗口大小(单位:分钟) |
| rtp.state.clear.interval.minutes | INT | N | 6 | SIP 和 RTP 关联的窗口大小(单位:分钟) |

View File

@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>1.2.2</version>
<version>1.1</version>
<name>Flink : SIP-RTP : Correlation</name>

View File

@@ -59,7 +59,7 @@ public class CorrelateApp {
.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
(element, recordTimestamp) ->
element.get("start_timestamp_ms").asLong()));
element.get("common_start_timestamp_ms").asLong()));
final ErrorHandler errorHandler = new ErrorHandler(config);
@@ -103,6 +103,6 @@ public class CorrelateApp {
.union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
.addSink(producer);
env.execute(config.get(JOB_NAME));
env.execute("SIP-RTP-CORRELATION");
}
}

View File

@@ -95,13 +95,4 @@ public class FusionConfigs {
.intType()
.defaultValue(6)
.withDescription("The interval at which RTP state data should be cleared.");
/**
* Configuration option for specifying the name of a job.
*/
public static final ConfigOption<String> JOB_NAME =
ConfigOptions.key("job.name")
.stringType()
.defaultValue("correlation_sip_rtp_session")
.withDescription("The name of current job.");
}

View File

@@ -1,8 +1,9 @@
package com.zdjizhi.flink.voip.functions;
import com.zdjizhi.flink.voip.conf.FusionConfigs;
import com.zdjizhi.flink.voip.records.*;
import org.apache.commons.lang3.StringUtils;
import com.zdjizhi.flink.voip.records.Record;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
@@ -79,8 +80,6 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
final ObjectNode rtpObj = entry.getValue();
final Record rtpRecord = new Record(rtpObj);
completeOriginatorField(rtpRecord, new SIPRecord(sipObj));
rtpRecord.merge(sipObj)
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj);
@@ -122,8 +121,6 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
final StreamDir streamDir = rtpRecord.getStreamDir();
if (null != info) {
completeOriginatorField(rtpRecord, new SIPRecord(info.getObj()));
rtpRecord.merge(info.getObj())
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj);
@@ -153,28 +150,9 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
KeyedCoProcessFunction<Tuple2<Integer, Address>, ObjectNode, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
for (ObjectNode obj : rtpState.values()) {
final Record rtpRecord = new Record(obj);
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
out.collect(obj);
}
rtpState.clear();
sipState.clear();
}
// ======================================================================
// PRIVATE HELPER
// ======================================================================
private void completeOriginatorField(final Record rtpRecord, final SIPRecord sipRecord) {
if (StringUtils.isNoneBlank(rtpRecord.getClientIp())) {
if (StringUtils.equals(sipRecord.getOriginatorSdpConnectIp(), rtpRecord.getClientIp())) {
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.C2S.getCode());
return;
} else if (StringUtils.equals(sipRecord.getResponderSdpConnectIp(), rtpRecord.getClientIp())) {
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.S2C.getCode());
return;
}
}
rtpRecord.setInt(RTPRecord.F_ORIGINATOR_DIR, RTPRecord.OriginatorDir.UNKNOWN.getCode());
}
}

View File

@@ -1,27 +0,0 @@
package com.zdjizhi.flink.voip.records;
import lombok.Getter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
public class RTPRecord extends Record {
public static final String F_ORIGINATOR_DIR = "rtp_originator_dir";
public RTPRecord(ObjectNode obj) {
super(obj);
}
@Getter
public enum OriginatorDir {
UNKNOWN(0),
C2S(1),
S2C(2);
private final int code;
OriginatorDir(int code) {
this.code = code;
}
}
}

View File

@@ -20,35 +20,31 @@ public class Record {
/**
* 字段名:数据记录中的所属 vsys
*/
public static final String F_COMMON_VSYS_ID = "vsys_id";
public static final String F_COMMON_VSYS_ID = "common_vsys_id";
/**
* 字段名:数据记录中的字段类型
*/
public static final String F_COMMON_SCHEMA_TYPE = "decoded_as";
public static final String F_COMMON_SCHEMA_TYPE = "common_schema_type";
/**
* 字段名:数据记录中的流类型
*/
public static final String F_COMMON_STREAM_DIR = "common_stream_dir";
/**
* 字段名:数据记录中的流类型的 Flags
*/
public static final String F_FLAGS = "flags";
/**
* 字段名:数据记录中的服务端地址
*/
public static final String F_COMMON_SERVER_IP = "server_ip";
public static final String F_COMMON_SERVER_IP = "common_server_ip";
/**
* 字段名:数据记录中的服务端端口
*/
public static final String F_COMMON_SERVER_PORT = "server_port";
public static final String F_COMMON_SERVER_PORT = "common_server_port";
/**
* 字段名:数据记录中的客户端地址
*/
public static final String F_COMMON_CLIENT_IP = "client_ip";
public static final String F_COMMON_CLIENT_IP = "common_client_ip";
/**
* 字段名:数据记录中的客户端端口
*/
public static final String F_COMMON_CLIENT_PORT = "client_port";
public static final String F_COMMON_CLIENT_PORT = "common_client_port";
/**
* ObjectNode data.
@@ -80,7 +76,7 @@ public class Record {
* @return The stream direction.
*/
public final StreamDir getStreamDir() {
return StreamDir.ofFlags(Record.getLong(obj, F_FLAGS));
return StreamDir.of(Record.getInt(obj, F_COMMON_STREAM_DIR));
}
/**
@@ -174,30 +170,6 @@ public class Record {
return getInt(obj, field, 0);
}
/**
* Gets a long value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @param defaultValue The default value to return if the field is not found or is not a long.
* @return The long value from the field or the default value if the field is not found or is not a long.
*/
public static long getLong(final ObjectNode obj, final String field, final long defaultValue) {
final JsonNode node = obj.get(field);
return node != null && node.isNumber() ? node.asLong() : defaultValue;
}
/**
* Gets a long value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @return The long value from the field or 0 if the field is not found or is not a long.
*/
private static long getLong(final ObjectNode obj, final String field) {
return getLong(obj, field, 0L);
}
/**
* Get a string value from the specified field in the ObjectNode.
*

View File

@@ -48,22 +48,4 @@ public enum StreamDir {
}
throw new IllegalArgumentException("Unknown StreamDir value '" + value + "'.");
}
/**
* Get the StreamDir enum based on the provided flags value.
*
* @param flags The flags.
* @return The corresponding StreamDir enum.
* @throws IllegalArgumentException if the provided value does not match any known StreamDir.
*/
public static StreamDir ofFlags(long flags) {
int v = 0;
if ((flags & 8192) == 8192) {
v += 1;
}
if ((flags & 16384) == 16384) {
v += 2;
}
return of(v);
}
}

View File

@@ -34,10 +34,10 @@ public class RecordTest {
final ObjectNode obj = mapper.createObjectNode();
final Record record = new Record(obj);
record.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.RTP.getValue());
assertEquals(SchemaType.RTP, record.getSchemaType());
assertEquals(SchemaType.RTP.getValue(), record.getSchemaType());
obj.set(Record.F_COMMON_SCHEMA_TYPE, TextNode.valueOf(SchemaType.VOIP.getValue()));
assertEquals(SchemaType.VOIP, record.getSchemaType());
assertEquals(SchemaType.VOIP.getValue(), record.getSchemaType());
}
@Test