Merge branch 'hotfix/duplicate-data' into 'master'

hotfix: handle duplicate data

See merge request galaxy/tsg_olap/sip-rtp-correlation!39
This commit is contained in:
梁超
2024-11-18 03:04:24 +00:00
4 changed files with 69547 additions and 6 deletions

View File

@@ -7,7 +7,7 @@
<groupId>com.geedgenetworks.application</groupId>
<artifactId>sip-rtp-correlation</artifactId>
<version>2.1</version>
<version>2.1.1</version>
<name>Flink : SIP-RTP : Correlation</name>

View File

@@ -825,6 +825,12 @@ pipeline:
- name: voip-fusion
category: CORRELATE
cache:
- name: sip_status
type: VALUE
ttl: 10 minute
schema:
- name: be_used
data-type: BOOLEAN NOT NULL
- name: sip
type: VALUE
ttl: 7 minute
@@ -1380,7 +1386,7 @@ pipeline:
- if: '@sip.isNotNull'
then:
- |-
OUTPUT fail FROM @sip.$recv_time AS recv_time,
OUTPUT fail-sip-because-duplicate FROM @sip.$recv_time AS recv_time,
@sip.$log_id AS log_id,
@sip.$decoded_as AS decoded_as,
@sip.$session_id AS session_id,
@@ -1575,6 +1581,7 @@ pipeline:
@i.$rtp_payload_type_s2c AS rtp_payload_type_s2c,
@i.$rtp_pcap_path AS rtp_pcap_path,
( @i.$client_ip == sip_originator_sdp_connect_ip).?(1, (@i.$client_ip == sip_responder_sdp_connect_ip).?(2, 0) ) AS rtp_originator_dir
- SET sip_status FROM true AS be_used
- TRUNCATE rtp
# TODO USE EVENT
- SCHEDULING USING PROCESS TIME FOR NOW + 6 * 60 * 1000
@@ -1662,12 +1669,14 @@ pipeline:
@i.$rtp_payload_type_s2c AS rtp_payload_type_s2c,
@i.$rtp_pcap_path AS rtp_pcap_path,
( @i.$client_ip == @sip.$sip_originator_sdp_connect_ip).?(1, (@i.$client_ip == @sip.$sip_responder_sdp_connect_ip).?(2, 0) ) AS rtp_originator_dir
- SET sip_status FROM true AS be_used
- TRUNCATE rtp
- SCHEDULING USING PROCESS TIME FOR NOW + 6 * 60 * 1000
schedule:
- if: '@rtp.isNotNull && @rtp.cardinality > 0'
then:
- |-
FLAT OUTPUT fail FOR i IN rtp FROM @i.$recv_time AS recv_time,
FLAT OUTPUT fail-rtp FOR i IN rtp FROM @i.$recv_time AS recv_time,
@i.$log_id AS log_id,
@i.$decoded_as AS decoded_as,
@i.$session_id AS session_id,
@@ -1797,10 +1806,10 @@ pipeline:
@i.$rtp_pcap_path AS rtp_pcap_path,
@i.$rtp_originator_dir AS rtp_originator_dir
- TRUNCATE rtp
- if: '@sip.isNotNull'
- if: '@sip.isNotNull && ( @sip_status.isNull || !@sip_status.$be_used )'
then:
- |-
OUTPUT fail FROM @sip.$recv_time AS recv_time,
OUTPUT fail-sip FROM @sip.$recv_time AS recv_time,
@sip.$log_id AS log_id,
@sip.$decoded_as AS decoded_as,
@sip.$session_id AS session_id,
@@ -1934,7 +1943,10 @@ pipeline:
category: UNION
on:
- sip-double-way-records.fail # One-way SIP flow logs that were not successfully correlated in both directions
- voip-fusion.fail # 没有关联上 SIP 的 RTP 日志 & 没关联上 RTP 的 DOUBLE SIP 日志
# RTP logs that were not correlated with a SIP record & DOUBLE SIP logs that were not correlated with RTP
- voip-fusion.fail-sip
- voip-fusion.fail-rtp
- voip-fusion.fail-sip-because-duplicate
- name: errors-records
category: UNION
on:

View File

@@ -0,0 +1,3 @@
#!/bin/bash
# Submits the SIP-RTP correlation job to YARN in per-job mode as a detached
# application named "voip-fusion", with parallelism 1 and one task slot per
# TaskManager. The entry class receives the job.yml pipeline definition as its
# only program argument.
#
# NOTE(review): the jar path below pins version 2.1 -- confirm it matches the
# artifact version that is actually deployed alongside this script.
flink run \
  -t yarn-per-job \
  -Djobmanager.memory.process.size=1024m \
  -Dtaskmanager.memory.process.size=4G \
  -Dtaskmanager.numberOfTaskSlots=1 \
  -Dtaskmanager.memory.framework.off-heap.size=256m \
  -Dtaskmanager.memory.jvm-metaspace.size=256m \
  -Dtaskmanager.memory.network.max=256m \
  -Dyarn.application.name=voip-fusion \
  -Drest.flamegraph.enabled=true \
  -p 1 \
  -d \
  -c com.geedgenetworks.flink.easy.core.Runner \
  /home/tsg/olap/flink/topology/sip-rtp-correlation/sip-rtp-correlation-2.1.jar \
  /home/tsg/olap/flink/topology/sip-rtp-correlation/job.yml

File diff suppressed because it is too large Load Diff