[TSG-22767] fix: fix function exception

This commit is contained in:
chaochaoc
2024-10-21 14:37:26 +08:00
parent 4ef6c25e69
commit 16d71addda
6 changed files with 33 additions and 9 deletions

View File

@@ -7,7 +7,7 @@
<groupId>com.geedgenetworks.application</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>2.0-rc7</version>
<version>2.0-rc8</version>
<name>Flink : SIP-RTP : Correlation</name>

View File

@@ -20,6 +20,7 @@ public class VoipUDFFactory implements UDFFactory {
put("HAS_EXTERNAL_IP_ADDRESS", new HasExternalIpAddress());
put("STREAM_DIR", new StreamDir());
put("STREAM_DIR_SET", new StreamDirSet());
put("FIND_NOT_BLANK", new FindNotBlank());
put("SORT_ADDRESS", new SortAddress());

View File

@@ -2,6 +2,7 @@ package com.geedgenetworks.flink.easy.application.voip.udf;
import com.google.common.collect.Lists;
import com.zdjizhi.utils.IPUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
@@ -17,6 +18,9 @@ public class SortAddress extends ScalarFunction {
public static String of(
Tuple2<String, Integer> a1, Tuple2<String, Integer> a2) {
var list = Lists.newArrayList(a1, a2);
if (a1.f1 == null || a2.f1 == null || StringUtils.isAnyEmpty(a1.f0, a2.f0)) {
return "";
}
list.sort((a, b) -> {
if (a.f1.equals(b.f1)) {
return Long.compare(

View File

@@ -18,9 +18,4 @@ public class StreamDir extends ScalarFunction {
}
return v;
}
public static void main(String[] args) {
System.out.println(8192L + 16384L);
System.out.println(new StreamDir().eval(8192L + 16384L));
}
}

View File

@@ -0,0 +1,22 @@
package com.geedgenetworks.flink.easy.application.voip.udf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
public class StreamDirSet extends ScalarFunction {
public @DataTypeHint("BIGINT") Long eval(Long flags) {
if (flags == null) {
return 8192 + 16384L;
}
Long r = null;
if ((flags & 8192) == 0) {
r += 8192;
}
if ((flags & 16384) == 0) {
r += 16384;
}
return r;
}
}

View File

@@ -335,13 +335,13 @@ pipeline:
splits:
# Invalid ip or port
- name: error1-records
where: NOT(IS_IP_ADDRESS(client_ip)) || NOT(IS_IP_ADDRESS(server_ip)) || client_port <= 0 || server_port <= 0
where: NOT(IS_IP_ADDRESS(client_ip)) || NOT(IS_IP_ADDRESS(server_ip)) || client_port.isNull || client_port <= 0 || server_port.isNull || server_port <= 0
# Invalid stream dir
- name: error2-records
where: decoded_as == 'SIP' &&STREAM_DIR(flags) != 1 && STREAM_DIR(flags) != 2 && STREAM_DIR(flags) != 3
# Invalid: SIP one-way stream and has invalid network address
- name: error3-records
where: decoded_as == 'SIP' && ( NOT(HAS_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) || sip_originator_sdp_media_port <= 0 || sip_responder_sdp_media_port <= 0 )
where: decoded_as == 'SIP' && ( NOT(HAS_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) || sip_originator_sdp_media_port.isNull || sip_originator_sdp_media_port <= 0 || sip_responder_sdp_media_port.isNull && sip_responder_sdp_media_port <= 0 )
- name: error4-records
where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_IP_ADDRESS(sip_responder_sdp_connect_ip) ) )
@@ -643,7 +643,9 @@ pipeline:
- if: STREAM_DIR(flags) != 3 && @v1.isNotNull && STREAM_DIR(@v1.$flags) != STREAM_DIR(flags)
then:
- |-
OUTPUT ok FROM withColumns(recv_time to sip_call_id),
OUTPUT ok FROM withColumns(recv_time to t_vsys_id),
STREAM_DIR_SET(flags) AS flags,
withColumns(flags_identify_info to sip_call_id),
FIND_NOT_BLANK(@v1.$sip_originator_description, sip_originator_description) AS sip_originator_description,
FIND_NOT_BLANK(@v1.$sip_responder_description, sip_responder_description) AS sip_responder_description,
FIND_NOT_BLANK(@v1.$sip_user_agent, sip_user_agent) AS sip_user_agent,