diff --git a/pom.xml b/pom.xml index 1ac6d36..75c3b5b 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.geedgenetworks.application sip-rtp-correlation - 2.0-rc7 + 2.0-rc8 Flink : SIP-RTP : Correlation diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java index 20d0746..d718a14 100644 --- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/VoipUDFFactory.java @@ -20,6 +20,7 @@ public class VoipUDFFactory implements UDFFactory { put("HAS_EXTERNAL_IP_ADDRESS", new HasExternalIpAddress()); put("STREAM_DIR", new StreamDir()); + put("STREAM_DIR_SET", new StreamDirSet()); put("FIND_NOT_BLANK", new FindNotBlank()); put("SORT_ADDRESS", new SortAddress()); diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java index ab422b7..13ab4a6 100644 --- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/SortAddress.java @@ -2,6 +2,7 @@ package com.geedgenetworks.flink.easy.application.voip.udf; import com.google.common.collect.Lists; import com.zdjizhi.utils.IPUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.functions.ScalarFunction; @@ -17,6 +18,9 @@ public class SortAddress extends ScalarFunction { public static String of( Tuple2 a1, Tuple2 a2) { var list = Lists.newArrayList(a1, a2); + if (a1.f1 == null || a2.f1 == null || StringUtils.isAnyEmpty(a1.f0, a2.f0)) { + return ""; + } list.sort((a, b) -> { if (a.f1.equals(b.f1)) { return Long.compare( diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java index 3f5166f..e571cf4 100644 --- a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDir.java @@ -18,9 +18,4 @@ public class StreamDir extends ScalarFunction { } return v; } - - public static void main(String[] args) { - System.out.println(8192L + 16384L); - System.out.println(new StreamDir().eval(8192L + 16384L)); - } } \ No newline at end of file diff --git a/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDirSet.java b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDirSet.java new file mode 100644 index 0000000..de1d727 --- /dev/null +++ b/src/main/java/com/geedgenetworks/flink/easy/application/voip/udf/StreamDirSet.java @@ -0,0 +1,22 @@ +package com.geedgenetworks.flink.easy.application.voip.udf; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; + +public class StreamDirSet extends ScalarFunction { + + public @DataTypeHint("BIGINT") Long eval(Long flags) { + if (flags == null) { + return 8192 + 16384L; + } + Long r = null; + if ((flags & 8192) == 0) { + r += 8192; + } + if ((flags & 16384) == 0) { + r += 16384; + } + return r; + } + +} diff --git a/src/main/resources/jobs/job.yml b/src/main/resources/jobs/job.yml index 2789c11..cd4bbdf 100644 --- a/src/main/resources/jobs/job.yml +++ b/src/main/resources/jobs/job.yml @@ -335,13 +335,13 @@ pipeline: splits: # Invalid ip or port - name: error1-records - where: NOT(IS_IP_ADDRESS(client_ip)) || NOT(IS_IP_ADDRESS(server_ip)) || client_port <= 0 || server_port <= 0 + where: NOT(IS_IP_ADDRESS(client_ip)) || NOT(IS_IP_ADDRESS(server_ip)) || client_port.isNull || client_port <= 0 || server_port.isNull || server_port <= 0 # Invalid stream dir - name: error2-records where: decoded_as == 'SIP' &&STREAM_DIR(flags) != 1 && STREAM_DIR(flags) != 2 && STREAM_DIR(flags) != 3 # Invalid: SIP one-way stream and has invalid network address - name: error3-records - where: decoded_as == 'SIP' && ( NOT(HAS_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) || sip_originator_sdp_media_port <= 0 || sip_responder_sdp_media_port <= 0 ) + where: decoded_as == 'SIP' && ( NOT(HAS_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) || sip_originator_sdp_media_port.isNull || sip_originator_sdp_media_port <= 0 || sip_responder_sdp_media_port.isNull && sip_responder_sdp_media_port <= 0 ) - name: error4-records where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_IP_ADDRESS(sip_responder_sdp_connect_ip) ) ) @@ -643,7 +643,9 @@ pipeline: - if: STREAM_DIR(flags) != 3 && @v1.isNotNull && STREAM_DIR(@v1.$flags) != STREAM_DIR(flags) then: - |- - OUTPUT ok FROM withColumns(recv_time to sip_call_id), + OUTPUT ok FROM withColumns(recv_time to t_vsys_id), + STREAM_DIR_SET(flags) AS flags, + withColumns(flags_identify_info to sip_call_id), FIND_NOT_BLANK(@v1.$sip_originator_description, sip_originator_description) AS sip_originator_description, FIND_NOT_BLANK(@v1.$sip_responder_description, sip_responder_description) AS sip_responder_description, FIND_NOT_BLANK(@v1.$sip_user_agent, sip_user_agent) AS sip_user_agent,