diff --git a/pom.xml b/pom.xml
index 75c3b5b..506aad1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,8 +62,8 @@
- com.zdjizhi
- galaxy
+ cn.hutool
+ hutool-all
xyz.downgoon
@@ -239,23 +239,9 @@
- com.zdjizhi
- galaxy
- 1.1.3
-
-
- org.slf4j
- slf4j-log4j12
-
-
- log4j
- log4j
-
-
- org.apache.commons
- commons-lang3
-
-
+ cn.hutool
+ hutool-all
+ 5.8.32
@@ -588,7 +574,8 @@
${project.groupId}
easy-stream-common
${project.version}
- ${project.distributionManagement.repository.url}
+ ${project.distributionManagement.repository.url}
+
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java
index d718a14..4e1622a 100644
--- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java
@@ -22,6 +22,7 @@ public class VoipUDFFactory implements UDFFactory {
put("STREAM_DIR", new StreamDir());
put("STREAM_DIR_SET", new StreamDirSet());
put("FIND_NOT_BLANK", new FindNotBlank());
+ put("DISTINCT_CONCAT", new DistinctConcat());
put("SORT_ADDRESS", new SortAddress());
put("SNOWFLAKE_ID", new SnowflakeID());
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/DistinctConcat.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/DistinctConcat.java
new file mode 100644
index 0000000..2c3914a
--- /dev/null
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/DistinctConcat.java
@@ -0,0 +1,18 @@
+package com.geedgenetworks.flink.easy.application.voip.udf;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class DistinctConcat extends ScalarFunction {
+
+ public @DataTypeHint("STRING") String eval(String s1, String s2) {
+ return Stream.of(s1, s2).filter(StringUtils::isNotBlank)
+ .map(StringUtils::trim)
+ .distinct()
+ .collect(Collectors.joining(","));
+ }
+}
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasIpAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasIpAddress.java
index 7dddbc7..93e013b 100644
--- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasIpAddress.java
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/HasIpAddress.java
@@ -1,17 +1,17 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
-import com.zdjizhi.utils.IPUtil;
import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.functions.ScalarFunction;
-public class HasIpAddress extends ScalarFunction {
+public class HasIpAddress extends IpAddressScalarFunction {
public @DataTypeHint("BOOLEAN") Boolean eval(String... ipaddr) {
if (null == ipaddr) {
return false;
}
for (var ip : ipaddr) {
- return ip != null && IPUtil.isIPAddress(ip);
+ if (ip != null && isIpAddress(ip)) {
+ return true;
+ }
}
return false;
}
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IpAddressScalarFunction.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IpAddressScalarFunction.java
new file mode 100644
index 0000000..2f0066f
--- /dev/null
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IpAddressScalarFunction.java
@@ -0,0 +1,25 @@
+package com.geedgenetworks.flink.easy.application.voip.udf;
+
+import cn.hutool.core.lang.Validator;
+import cn.hutool.core.net.NetUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.table.functions.ScalarFunction;
+
+public abstract class IpAddressScalarFunction extends ScalarFunction {
+
+ protected static boolean isIpAddress(String ipaddr) {
+ return isIpv4(ipaddr) || isIpv6(ipaddr);
+ }
+
+ protected static boolean isIpv4(String ipaddr) {
+ return Validator.isIpv4(StringUtils.trim(ipaddr));
+ }
+
+ protected static boolean isIpv6(String ipaddr) {
+ return Validator.isIpv6(StringUtils.trim(ipaddr));
+ }
+
+ protected static boolean isInternalIpAddress(String ipaddr) {
+ return NetUtil.isInnerIP(StringUtils.trim(ipaddr));
+ }
+}
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsExternalIpAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsExternalIpAddress.java
index a2970a6..8e3eeda 100644
--- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsExternalIpAddress.java
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsExternalIpAddress.java
@@ -1,17 +1,13 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
-import com.zdjizhi.utils.IPUtil;
import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.functions.ScalarFunction;
-import static com.zdjizhi.utils.IPUtil.isIPAddress;
-
-public class IsExternalIpAddress extends ScalarFunction {
+public class IsExternalIpAddress extends IpAddressScalarFunction {
public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
- if (ipaddr == null || !isIPAddress(ipaddr)) {
+ if (ipaddr == null || !isIpAddress(ipaddr)) {
return false;
}
- return !IPUtil.internalIp(ipaddr);
+ return !isInternalIpAddress(ipaddr);
}
}
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsInternalIpAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsInternalIpAddress.java
index 55839ba..f485c60 100644
--- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsInternalIpAddress.java
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsInternalIpAddress.java
@@ -1,17 +1,13 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
-import com.zdjizhi.utils.IPUtil;
import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.functions.ScalarFunction;
-import static com.zdjizhi.utils.IPUtil.isIPAddress;
-
-public class IsInternalIpAddress extends ScalarFunction {
+public class IsInternalIpAddress extends IpAddressScalarFunction {
public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
- if (!isIPAddress(ipaddr)) {
+ if (!isIpAddress(ipaddr)) {
return false;
}
- return IPUtil.internalIp(ipaddr);
+ return isInternalIpAddress(ipaddr);
}
}
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsIpAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsIpAddress.java
index cbef834..262f855 100644
--- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsIpAddress.java
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/IsIpAddress.java
@@ -1,15 +1,13 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
-import com.zdjizhi.utils.IPUtil;
import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.functions.ScalarFunction;
-public class IsIpAddress extends ScalarFunction {
+public class IsIpAddress extends IpAddressScalarFunction {
public @DataTypeHint("BOOLEAN") Boolean eval(String ipaddr) {
if (null == ipaddr) {
return false;
}
- return IPUtil.isIPAddress(ipaddr);
+ return isIpAddress(ipaddr);
}
}
diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java
index 13ab4a6..49380be 100644
--- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java
+++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java
@@ -1,13 +1,14 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
-import com.google.common.collect.Lists;
-import com.zdjizhi.utils.IPUtil;
+import cn.hutool.core.net.NetUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.functions.ScalarFunction;
-public class SortAddress extends ScalarFunction {
+import java.util.ArrayList;
+import java.util.List;
+
+public class SortAddress extends IpAddressScalarFunction {
public @DataTypeHint("STRING")
String eval(
@@ -17,14 +18,14 @@ public class SortAddress extends ScalarFunction {
public static String of(
Tuple2 a1, Tuple2 a2) {
- var list = Lists.newArrayList(a1, a2);
- if (a1.f1 == null || a2.f1 == null || StringUtils.isAnyEmpty(a1.f0, a2.f0)) {
- return "";
+ var list = new ArrayList<>(List.of(a1, a2));
+ if (a1.f1 == null || a2.f1 == null || StringUtils.isAnyEmpty(a1.f0, a2.f0)
+ || !isIpAddress(a1.f0) || !isIpAddress(a2.f0)) {
+ return a1.f0 + ":" + a1.f1 + "," + a2.f0 + ":" + a2.f1;
}
list.sort((a, b) -> {
if (a.f1.equals(b.f1)) {
- return Long.compare(
- IPUtil.getIpDesimal(a.f0), IPUtil.getIpDesimal(b.f0));
+ return compareAddress(a.f0, b.f0);
} else {
return a.f1.compareTo(b.f1);
}
@@ -32,4 +33,16 @@ public class SortAddress extends ScalarFunction {
return String.format("%s:%s,%s:%s",
list.get(0).f0, list.get(0).f1, list.get(1).f0, list.get(1).f1);
}
+
+ private static int compareAddress(String ip1, String ip2) {
+ try {
+ var v1 = isIpv4(ip1) ? NetUtil.ipv4ToLong(ip1) :
+ NetUtil.ipv6ToBigInteger(ip1).longValue();
+ var v2 = isIpv4(ip2) ? NetUtil.ipv4ToLong(ip2) :
+ NetUtil.ipv6ToBigInteger(ip2).longValue();
+ return Long.compare(v1, v2);
+ } catch (Exception e) {
+ return 0;
+ }
+ }
}
\ No newline at end of file
diff --git a/src/main/resources/jobs/job.yml b/src/main/resources/jobs/job.yml
index cd4bbdf..46fe2df 100644
--- a/src/main/resources/jobs/job.yml
+++ b/src/main/resources/jobs/job.yml
@@ -16,7 +16,7 @@ source:
group.id: sip-rtp-correlation
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
- sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="example" password="example";
+ sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="EXAMPLE-KAFKA-USERNAME" password="EXAMPLE-KAFKA-PASSWORD";
format: json
schema:
## General
@@ -301,7 +301,7 @@ sink:
bootstrap.servers: localhost:9092
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
- sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="example" password="example";
+ sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="EXAMPLE-KAFKA-USERNAME" password="EXAMPLE-KAFKA-PASSWORD";
format: json
# 关联成功的 VOIP
- name: only-voip-records
@@ -313,7 +313,7 @@ sink:
bootstrap.servers: localhost:9092
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
- sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="example" password="example";
+ sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="EXAMPLE-KAFKA-USERNAME" password="EXAMPLE-KAFKA-PASSWORD";
format: json
# 没有关联成功的 SIP 和 RTP
- name: fusion-fail-records
@@ -325,7 +325,7 @@ sink:
bootstrap.servers: localhost:9092
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
- sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="example" password="example";
+ sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="EXAMPLE-KAFKA-USERNAME" password="EXAMPLE-KAFKA-PASSWORD";
format: json
pipeline:
@@ -345,7 +345,7 @@ pipeline:
- name: error4-records
where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_IP_ADDRESS(sip_responder_sdp_connect_ip) ) )
- ### Notes: If internal IP address correlate is needed, please uncomment the following two items
+ ### Notes: If internal IP address correlate is needed, please comment the following two items
# # Invalid: SIP one-way stream and internal network address
# - name: internal-error1-records
# where: decoded_as == 'SIP' && NOT(HAS_EXTERNAL_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip))
@@ -643,9 +643,17 @@ pipeline:
- if: STREAM_DIR(flags) != 3 && @v1.isNotNull && STREAM_DIR(@v1.$flags) != STREAM_DIR(flags)
then:
- |-
- OUTPUT ok FROM withColumns(recv_time to t_vsys_id),
+ OUTPUT ok FROM withColumns(recv_time to device_tag),
+ DISTINCT_CONCAT(@v1.$data_center, data_center) AS data_center,
+ DISTINCT_CONCAT(@v1.$device_group, device_group) AS device_group,
+ withColumns(sled_ip to t_vsys_id),
STREAM_DIR_SET(flags) AS flags,
- withColumns(flags_identify_info to sip_call_id),
+ withColumns(flags_identify_info to decoded_path),
+ @v1.$sent_pkts + sent_pkts AS sent_pkts,
+ @v1.$received_pkts + received_pkts AS received_pkts,
+ @v1.$sent_bytes + sent_bytes AS sent_bytes,
+ @v1.$received_bytes + received_bytes AS received_bytes ,
+ withColumns(tcp_c2s_ip_fragments to sip_call_id),
FIND_NOT_BLANK(@v1.$sip_originator_description, sip_originator_description) AS sip_originator_description,
FIND_NOT_BLANK(@v1.$sip_responder_description, sip_responder_description) AS sip_responder_description,
FIND_NOT_BLANK(@v1.$sip_user_agent, sip_user_agent) AS sip_user_agent,
@@ -674,7 +682,7 @@ pipeline:
then:
- |-
OUTPUT ok FROM withColumns(recv_time to rtp_originator_dir)
- - SCHEDULING USING EVENT TIME FOR NOW + 60 * 1000
+ - SCHEDULING USING PROCESS TIME FOR NOW + 60 * 1000
schedule:
- if: '@v1.isNotNull'
then:
@@ -1383,14 +1391,14 @@ pipeline:
@i.$out_link_id AS out_link_id,
@i.$in_link_id AS in_link_id,
@i.$device_tag AS device_tag,
- @i.$data_center AS data_center,
- @i.$device_group AS device_group,
+ DISTINCT_CONCAT(@i.$data_center, data_center) AS data_center,
+ DISTINCT_CONCAT(@i.$device_group, device_group) AS device_group,
@i.$sled_ip AS sled_ip,
@i.$address_type AS address_type,
@i.$direction AS direction,
@i.$vsys_id AS vsys_id,
@i.$t_vsys_id AS t_vsys_id,
- @i.$flags AS flags,
+ flags AS flags,
@i.$flags_identify_info AS flags_identify_info,
@i.$c2s_ttl AS c2s_ttl,
@@ -1418,10 +1426,10 @@ pipeline:
@i.$ip_protocol AS ip_protocol,
- @i.$sent_pkts AS sent_pkts,
- @i.$received_pkts AS received_pkts,
- @i.$sent_bytes AS sent_bytes,
- @i.$received_bytes AS received_bytes,
+ @i.$sent_pkts + sent_pkts AS sent_pkts,
+ @i.$received_pkts + received_pkts AS received_pkts,
+ @i.$sent_bytes + sent_bytes AS sent_bytes,
+ @i.$received_bytes + received_bytes AS received_bytes,
withColumns(sip_call_id to sip_bye_reason),
@@ -1462,8 +1470,8 @@ pipeline:
@i.$direction AS direction,
@i.$vsys_id AS vsys_id,
@i.$t_vsys_id AS t_vsys_id,
- @i.$flags AS flags,
- @i.$flags_identify_info AS flags_identify_info,
+ @sip.$flags AS flags,
+ @sip.$flags_identify_info AS flags_identify_info,
@i.$c2s_ttl AS c2s_ttl,
@i.$s2c_ttl AS s2c_ttl,
@@ -1490,10 +1498,10 @@ pipeline:
@i.$ip_protocol AS ip_protocol,
- @i.$sent_pkts AS sent_pkts,
- @i.$received_pkts AS received_pkts,
- @i.$sent_bytes AS sent_bytes,
- @i.$received_bytes AS received_bytes,
+ @i.$sent_pkts + sent_pkts AS sent_pkts,
+ @i.$received_pkts + received_pkts AS received_pkts,
+ @i.$sent_bytes + sent_bytes AS sent_bytes,
+ @i.$received_bytes + received_bytes AS received_bytes,
@sip.$sip_call_id AS sip_call_id,
@sip.$sip_originator_description AS sip_originator_description,
diff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml
index 3647f17..995d482 100644
--- a/tools/maven/spotbugs-exclude.xml
+++ b/tools/maven/spotbugs-exclude.xml
@@ -6,6 +6,9 @@
+
+
+