23 Commits

Author SHA1 Message Date
chaoc
b7c739a955 chore: update version 2023-12-15 10:32:17 +08:00
梁超
1f6ef08a30 Merge branch 'hotfix/illegal-flags' into 'release/1.2'
[GAL-444] fix: fix error caused by invalid flags

See merge request galaxy/tsg_olap/sip-rtp-correlation!23
2023-12-15 02:27:43 +00:00
chaoc
87abd1e2ca [GAL-444] fix: fix error caused by invalid flags 2023-12-15 10:24:06 +08:00
梁超
2542a8bfd2 Merge branch 'bugfix/repeat-voip' into 'main'
chore: update version

See merge request galaxy/tsg_olap/sip-rtp-correlation!21
2023-12-07 09:22:19 +00:00
chaoc
1c0259a95c chore: update version 2023-12-07 17:20:20 +08:00
梁超
37f49c40d5 Merge branch 'bugfix/repeat-voip' into 'main'
fix: repeated voip record

See merge request galaxy/tsg_olap/sip-rtp-correlation!20
2023-12-07 09:19:32 +00:00
chaoc
beef47df4c fix: add ipaddr non-empty check 2023-12-07 17:17:05 +08:00
chaoc
6d77d1c3c0 fix: remove rtp from state when fusion successfully 2023-12-07 17:15:11 +08:00
梁超
4179a0a887 Merge branch 'bugfix/some-err' into 'main'
[GAL-444] fix: output unmatched sip record

See merge request galaxy/tsg_olap/sip-rtp-correlation!19
2023-12-06 10:17:02 +00:00
chaoc
6ebefc9026 chore: update version 2023-12-06 18:15:37 +08:00
chaoc
701019c38a [GAL-444] fix: output unmatched sip record 2023-12-06 18:14:52 +08:00
chaoc
6ae7fdef06 [GAL-444] fix: use default '1' instead of '0' 2023-12-06 18:14:33 +08:00
梁超
e277117c6d Merge branch 'release/1.2' into 'main'
chore: add distribution management

See merge request galaxy/tsg_olap/sip-rtp-correlation!18
2023-12-05 09:53:19 +00:00
chaoc
d54c93b61d chore: add distribution management 2023-12-05 17:52:49 +08:00
梁超
3b06d3dfd5 Merge branch 'release/1.2' into 'main'
chore: update version

See merge request galaxy/tsg_olap/sip-rtp-correlation!17
2023-12-05 09:51:35 +00:00
chaoc
e718120be1 chore: update version 2023-12-05 17:47:51 +08:00
梁超
1dffb8fb6f Merge branch 'feature/field-update' into 'main'
[GAL-444] feat: field update

See merge request galaxy/tsg_olap/sip-rtp-correlation!16
2023-12-05 09:45:50 +00:00
chaoc
0994219ede fix: long value reader 2023-12-05 17:43:32 +08:00
chaoc
6f915e5162 fix: update flag 2023-12-04 17:19:32 +08:00
chaoc
35247d7414 fix: update field name 2023-12-04 17:08:53 +08:00
chaoc
64f19b528e chore: update version 2023-12-04 10:29:04 +08:00
chaoc
d0c3ebd60f feat: update stream dir field read method 2023-12-04 10:28:03 +08:00
chaoc
114c180742 feature: field update 2023-11-21 09:59:44 +08:00
7 changed files with 103 additions and 17 deletions

14
pom.xml
View File

@@ -7,7 +7,7 @@
<groupId>com.zdjizhi</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>1.1-rc2</version>
<version>1.2</version>
<name>Flink : SIP-RTP : Correlation</name>
@@ -24,6 +24,18 @@
<jackson.version>2.13.2.20220328</jackson.version>
</properties>
<distributionManagement>
<repository>
<id>platform-releases</id>
<url>http://192.168.40.153:8099/content/repositories/platform-release</url>
<uniqueVersion>true</uniqueVersion>
</repository>
<snapshotRepository>
<id>platform-snapshots</id>
<url>http://192.168.40.153:8099/content/repositories/platform-snapshot</url>
</snapshotRepository>
</distributionManagement>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>

View File

@@ -59,7 +59,7 @@ public class CorrelateApp {
.<ObjectNode>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((SerializableTimestampAssigner<ObjectNode>)
(element, recordTimestamp) ->
element.get("common_start_timestamp_ms").asLong()));
element.get("start_timestamp_ms").asLong()));
final ErrorHandler errorHandler = new ErrorHandler(config);
@@ -99,7 +99,9 @@ public class CorrelateApp {
new JsonNodeSerializationSchema(),
fusionConfiguration.getProperties(SINK_KAFKA_PROPERTIES_PREFIX));
voIpOperator.union(sipDoubleDirOperator).addSink(producer);
voIpOperator
.union(sipDoubleDirOperator.getSideOutput(SIPPairingFunction.SIP_OUTPUT_TAG))
.addSink(producer);
env.execute("SIP-RTP-CORRELATION");
}

View File

@@ -9,7 +9,6 @@ import com.zdjizhi.flink.voip.records.SIPRecord;
import com.zdjizhi.flink.voip.records.SchemaType;
import com.zdjizhi.flink.voip.records.StreamDir;
import com.zdjizhi.utils.IPUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -22,6 +21,8 @@ import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Function;
/**
* The ErrorHandler class is responsible for handling and filtering error records from the input data stream.
* It checks for records that have invalid or meaningless addresses and ports, and outputs them to a separate stream if enabled.
@@ -108,11 +109,13 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
Collector<ObjectNode> out) throws Exception {
final Record record = new Record(obj);
// Check for invalid or meaningless addresses and ports
boolean cond1 = StringUtils.isNotBlank(record.getClientIp()) &&
StringUtils.isNotBlank(record.getServerIp()) &&
boolean cond1 = isIPAddress(record.getClientIp()) &&
isIPAddress(record.getServerIp()) &&
record.getClientPort() > 0 &&
record.getServerPort() > 0;
boolean cond8 = null != executeSafely(Record::getStreamDir, record);
final SIPRecord sipRecord = new SIPRecord(obj);
boolean cond2 = isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| isIPAddress(sipRecord.getResponderSdpConnectIp());
@@ -121,12 +124,16 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
boolean cond4 = !isIPAddress(sipRecord.getOriginatorSdpConnectIp())
|| (includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond5 = SchemaType.SIP.equals(sipRecord.getSchemaType());
boolean cond6 = StreamDir.DOUBLE == sipRecord.getStreamDir() &&
boolean cond6 = StreamDir.DOUBLE == executeSafely(Record::getStreamDir, sipRecord) &&
(includeIntranetIp || !isInternalIp(sipRecord.getResponderSdpConnectIp())) &&
(includeIntranetIp || !isInternalIp(sipRecord.getOriginatorSdpConnectIp()));
boolean cond7 = isIPAddress(sipRecord.getOriginatorSdpConnectIp()) &&
isIPAddress(sipRecord.getResponderSdpConnectIp()) &&
sipRecord.getResponderSdpMediaPort() > 0 && sipRecord.getOriginatorSdpMediaPort() > 0;
// Both client and server addresses in the data are valid.
if (cond1 && (
if (cond1 && cond8 && (!cond5 || cond7) && (
// The address in the SIP one-way stream is valid and not an internal network address.
cond2 && cond3 && cond4 && cond5
// The coordinating addresses in the SIP double directional stream are valid
@@ -146,6 +153,14 @@ class MeaninglessAddressProcessFunction extends ProcessFunction<ObjectNode, Obje
// ======================================================================================
// ----------------------------------- private helper -----------------------------------
public static <T, R> R executeSafely(Function<T, R> function, T v) {
try {
return function.apply(v);
} catch (Exception e) {
return null;
}
}
private static boolean isIPAddress(final String ipaddr) {
if (null == ipaddr) {
return false;

View File

@@ -7,11 +7,13 @@ import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* A KeyedProcessFunction that pairs SIP records based on their addresses and stream direction.
@@ -23,6 +25,9 @@ import org.apache.flink.util.Collector;
public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>
implements FunctionHelper {
public static final OutputTag<ObjectNode> SIP_OUTPUT_TAG =
new OutputTag<>("unmatched-sip", TypeInformation.of(ObjectNode.class));
private transient Time fireInterval;
private transient ValueState<ObjectNode> valueState;
@@ -63,7 +68,7 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
out.collect(value);
valueState.clear();
} else {
// If the address is not yet in the mapState.
// If the address is not yet in the valueState.
valueState.update(value);
}
} else {
@@ -77,6 +82,10 @@ public class SIPPairingFunction extends KeyedProcessFunction<Tuple3<Integer, Str
public void onTimer(long timestamp,
KeyedProcessFunction<Tuple3<Integer, String, Address>, ObjectNode, ObjectNode>.OnTimerContext ctx,
Collector<ObjectNode> out) throws Exception {
final ObjectNode value = valueState.value();
if (value != null) {
ctx.output(SIP_OUTPUT_TAG, value);
}
valueState.clear();
}
}

View File

@@ -83,6 +83,7 @@ public class VoIPFusionFunction extends KeyedCoProcessFunction<Tuple2<Integer, A
rtpRecord.merge(sipObj)
.setString(Record.F_COMMON_SCHEMA_TYPE, SchemaType.VOIP.getValue());
out.collect(rtpObj);
iterator.remove();
switch (entry.getKey()) {
case S2C:

View File

@@ -20,31 +20,35 @@ public class Record {
/**
* 字段名:数据记录中的所属 vsys
*/
public static final String F_COMMON_VSYS_ID = "common_vsys_id";
public static final String F_COMMON_VSYS_ID = "vsys_id";
/**
* 字段名:数据记录中的字段类型
*/
public static final String F_COMMON_SCHEMA_TYPE = "common_schema_type";
public static final String F_COMMON_SCHEMA_TYPE = "decoded_as";
/**
* 字段名:数据记录中的流类型
*/
public static final String F_COMMON_STREAM_DIR = "common_stream_dir";
/**
* 字段名:数据记录中的流类型的 Flags
*/
public static final String F_FLAGS = "flags";
/**
* 字段名:数据记录中的服务端地址
*/
public static final String F_COMMON_SERVER_IP = "common_server_ip";
public static final String F_COMMON_SERVER_IP = "server_ip";
/**
* 字段名:数据记录中的服务端端口
*/
public static final String F_COMMON_SERVER_PORT = "common_server_port";
public static final String F_COMMON_SERVER_PORT = "server_port";
/**
* 字段名:数据记录中的客户端地址
*/
public static final String F_COMMON_CLIENT_IP = "common_client_ip";
public static final String F_COMMON_CLIENT_IP = "client_ip";
/**
* 字段名:数据记录中的客户端端口
*/
public static final String F_COMMON_CLIENT_PORT = "common_client_port";
public static final String F_COMMON_CLIENT_PORT = "client_port";
/**
* ObjectNode data.
@@ -57,7 +61,8 @@ public class Record {
* @return The VSys ID as an integer.
*/
public int getVSysID() {
return Record.getInt(obj, F_COMMON_VSYS_ID);
int v = Record.getInt(obj, F_COMMON_VSYS_ID);
return v == 0 ? 1 : v;
}
/**
@@ -75,7 +80,7 @@ public class Record {
* @return The stream direction.
*/
public final StreamDir getStreamDir() {
return StreamDir.of(Record.getInt(obj, F_COMMON_STREAM_DIR));
return StreamDir.ofFlags(Record.getLong(obj, F_FLAGS));
}
/**
@@ -169,6 +174,30 @@ public class Record {
return getInt(obj, field, 0);
}
/**
* Gets a long value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @param defaultValue The default value to return if the field is not found or is not a long.
* @return The long value from the field or the default value if the field is not found or is not a long.
*/
public static long getLong(final ObjectNode obj, final String field, final long defaultValue) {
final JsonNode node = obj.get(field);
return node != null && node.isNumber() ? node.asLong() : defaultValue;
}
/**
* Gets a long value from the specified field in the ObjectNode.
*
* @param obj The ObjectNode to get the value from.
* @param field The name of the field.
* @return The long value from the field or 0 if the field is not found or is not a long.
*/
private static long getLong(final ObjectNode obj, final String field) {
return getLong(obj, field, 0L);
}
/**
* Get a string value from the specified field in the ObjectNode.
*

View File

@@ -48,4 +48,22 @@ public enum StreamDir {
}
throw new IllegalArgumentException("Unknown StreamDir value '" + value + "'.");
}
/**
* Get the StreamDir enum based on the provided flags value.
*
* @param flags The flags.
* @return The corresponding StreamDir enum.
* @throws IllegalArgumentException if the provided value does not match any known StreamDir.
*/
public static StreamDir ofFlags(long flags) {
int v = 0;
if ((flags & 8192) == 8192) {
v += 1;
}
if ((flags & 16384) == 16384) {
v += 2;
}
return of(v);
}
}