[GAL-602] refactor: add sink fail records

This commit is contained in:
chaochaoc
2024-06-28 10:21:02 +08:00
parent eab517c44f
commit 6164546dbb
2 changed files with 68 additions and 25 deletions

View File

@@ -111,6 +111,11 @@
<artifactId>easy-stream-correlate-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>
<artifactId>easy-stream-union-pipeline</artifactId>
<version>${easy.stream.version}</version>
</dependency>
<dependency>
<groupId>com.geedgenetworks.flink</groupId>

View File

@@ -1,25 +1,22 @@
job:
name: A Stream Example
parallelism: 1
name: correlation_sip_rtp_session
parallelism: 10
active-pipeline:
- console
- only-voip-records
- fusion-fail-records
- all-errors-records
source:
- name: session-records
type: kafka
option:
topic: SESSION-RECORD
topic: VOIP-RECORD
properties:
bootstrap.servers: 192.168.44.12:9092
group.id: easy-stream-tester9
client.id: easy-stream-tester9
# type: file
# option:
# path: E:\java-workspace\sip-rtp-correlation\feature\easy-refactor\src\main\resources\session-records.txt
# type: socket
# option:
# hostname: localhost
# port: 9999
bootstrap.servers: 192.168.44.12:9094
group.id: sip-rtp-correlation
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
format: json
schema:
## General
@@ -290,12 +287,45 @@ source:
data-type: STRING
- name: rtp_originator_dir
data-type: INT
pipeline:
- name: console
category: PRINT
# on: sip-records
sink:
- name: all-errors-records
type: kafka
on: errors-records
option:
topic: VOIP-CONVERSATTON-RECORD
properties:
bootstrap.servers: 192.168.44.12:9094
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
format: json
# 关联成功的 VOIP
- name: only-voip-records
on: voip-fusion.ok
use-err: true
type: kafka
option:
topic: VOIP-CONVERSATTON-RECORD
properties:
bootstrap.servers: 192.168.44.12:9094
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
format: json
# 没有关联成功的 SIP 和 RTP
- name: fusion-fail-records
on: cannot-fusion-records
type: kafka
option:
topic: VOIP-CONVERSATTON-RECORD
properties:
bootstrap.servers: 192.168.44.12:9094
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
format: json
pipeline:
- name: split-for-valid
category: SPLIT
on: session-records
@@ -1399,8 +1429,7 @@ pipeline:
@i.$rtp_payload_type_c2s AS rtp_payload_type_c2s,
@i.$rtp_payload_type_s2c AS rtp_payload_type_s2c,
@i.$rtp_pcap_path AS rtp_pcap_path,
( @i.$client_ip == sip_originator_sdp_connect_ip).?(1, (@i.$client_ip == sip_responder_sdp_connect_ip).?(2, 0) ),
@i.$rtp_originator_dir AS rtp_originator_dir
( @i.$client_ip == sip_originator_sdp_connect_ip).?(1, (@i.$client_ip == sip_responder_sdp_connect_ip).?(2, 0) ) AS rtp_originator_dir
- TRUNCATE rtp
- SCHEDULING USING EVENT TIME FOR NOW + 6 * 60 * 1000
- on: rtp-records
@@ -1486,8 +1515,7 @@ pipeline:
@i.$rtp_payload_type_c2s AS rtp_payload_type_c2s,
@i.$rtp_payload_type_s2c AS rtp_payload_type_s2c,
@i.$rtp_pcap_path AS rtp_pcap_path,
( @i.$client_ip == sip_originator_sdp_connect_ip).?(1, (@i.$client_ip == sip_responder_sdp_connect_ip).?(2, 0) ),
@i.$rtp_originator_dir AS rtp_originator_dir
( @i.$client_ip == sip_originator_sdp_connect_ip).?(1, (@i.$client_ip == sip_responder_sdp_connect_ip).?(2, 0) ) AS rtp_originator_dir
- SCHEDULING USING EVENT TIME FOR NOW + 6 * 60 * 1000
schedule:
- if: '@rtp.isNotNull && @rtp.cardinality > 0'
@@ -1754,5 +1782,15 @@ pipeline:
@sip.$rtp_pcap_path AS rtp_pcap_path,
@sip.$rtp_originator_dir AS rtp_originator_dir
- TRUNCATE sip
- name: cannot-fusion-records
category: UNION
on:
- sip-double-way-records.fail # 没有双向关联成功的 SIP 单向流日志
- voip-fusion.fail # 没有关联上 SIP 的 RTP 日志 & 没关联上 RTP 的 DOUBLE SIP 日志
- name: errors-records
category: UNION
on:
- error1-records
- error2-records
- error3-records
- error4-records