diff --git a/pom.xml b/pom.xml
index d3bc95b..04639d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,6 +111,11 @@
easy-stream-correlate-pipeline
${easy.stream.version}
+
+ com.geedgenetworks.flink
+ easy-stream-union-pipeline
+ ${easy.stream.version}
+
com.geedgenetworks.flink
diff --git a/src/main/resources/job.yml b/src/main/resources/job.yml
index f44f17d..ffadcf2 100644
--- a/src/main/resources/job.yml
+++ b/src/main/resources/job.yml
@@ -1,25 +1,22 @@
job:
- name: A Stream Example
- parallelism: 1
+ name: correlation_sip_rtp_session
+ parallelism: 10
active-pipeline:
- - console
+ - only-voip-records
+ - fusion-fail-records
+ - all-errors-records
source:
- name: session-records
type: kafka
option:
- topic: SESSION-RECORD
+ topic: VOIP-RECORD
properties:
- bootstrap.servers: 192.168.44.12:9092
- group.id: easy-stream-tester9
- client.id: easy-stream-tester9
- # type: file
- # option:
- # path: E:\java-workspace\sip-rtp-correlation\feature\easy-refactor\src\main\resources\session-records.txt
-# type: socket
-# option:
-# hostname: localhost
-# port: 9999
+ bootstrap.servers: 192.168.44.12:9094
+ group.id: sip-rtp-correlation
+ security.protocol: SASL_PLAINTEXT
+ sasl.mechanism: PLAIN
+ sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
format: json
schema:
## General
@@ -290,12 +287,45 @@ source:
data-type: STRING
- name: rtp_originator_dir
data-type: INT
-pipeline:
- - name: console
- category: PRINT
- # on: sip-records
+
+sink:
+ - name: all-errors-records
+ type: kafka
+ on: errors-records
+ option:
+ topic: VOIP-CONVERSATTON-RECORD
+ properties:
+ bootstrap.servers: 192.168.44.12:9094
+ security.protocol: SASL_PLAINTEXT
+ sasl.mechanism: PLAIN
+ sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+ format: json
+ # 关联成功的 VOIP
+ - name: only-voip-records
on: voip-fusion.ok
- use-err: true
+ type: kafka
+ option:
+ topic: VOIP-CONVERSATTON-RECORD
+ properties:
+ bootstrap.servers: 192.168.44.12:9094
+ security.protocol: SASL_PLAINTEXT
+ sasl.mechanism: PLAIN
+ sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+ format: json
+ # 没有关联成功的 SIP 和 RTP
+ - name: fusion-fail-records
+ on: cannot-fusion-records
+ type: kafka
+ option:
+ topic: VOIP-CONVERSATTON-RECORD
+ properties:
+ bootstrap.servers: 192.168.44.12:9094
+ security.protocol: SASL_PLAINTEXT
+ sasl.mechanism: PLAIN
+ sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="galaxy2019";
+ format: json
+
+pipeline:
- name: split-for-valid
category: SPLIT
on: session-records
@@ -1399,8 +1429,7 @@ pipeline:
@i.$rtp_payload_type_c2s AS rtp_payload_type_c2s,
@i.$rtp_payload_type_s2c AS rtp_payload_type_s2c,
@i.$rtp_pcap_path AS rtp_pcap_path,
- ( @i.$client_ip == sip_originator_sdp_connect_ip).?(1, (@i.$client_ip == sip_responder_sdp_connect_ip).?(2, 0) ),
- @i.$rtp_originator_dir AS rtp_originator_dir
+ ( @i.$client_ip == sip_originator_sdp_connect_ip).?(1, (@i.$client_ip == sip_responder_sdp_connect_ip).?(2, 0) ) AS rtp_originator_dir
- TRUNCATE rtp
- SCHEDULING USING EVENT TIME FOR NOW + 6 * 60 * 1000
- on: rtp-records
@@ -1486,8 +1515,7 @@ pipeline:
@i.$rtp_payload_type_c2s AS rtp_payload_type_c2s,
@i.$rtp_payload_type_s2c AS rtp_payload_type_s2c,
@i.$rtp_pcap_path AS rtp_pcap_path,
- ( @i.$client_ip == sip_originator_sdp_connect_ip).?(1, (@i.$client_ip == sip_responder_sdp_connect_ip).?(2, 0) ),
- @i.$rtp_originator_dir AS rtp_originator_dir
+ ( @i.$client_ip == sip_originator_sdp_connect_ip).?(1, (@i.$client_ip == sip_responder_sdp_connect_ip).?(2, 0) ) AS rtp_originator_dir
- SCHEDULING USING EVENT TIME FOR NOW + 6 * 60 * 1000
schedule:
- if: '@rtp.isNotNull && @rtp.cardinality > 0'
@@ -1754,5 +1782,15 @@ pipeline:
@sip.$rtp_pcap_path AS rtp_pcap_path,
@sip.$rtp_originator_dir AS rtp_originator_dir
- TRUNCATE sip
-
-
+ - name: cannot-fusion-records
+ category: UNION
+ on:
+ - sip-double-way-records.fail # 没有双向关联成功的 SIP 单向流日志
+ - voip-fusion.fail # 没有关联上 SIP 的 RTP 日志 & 没关联上 RTP 的 DOUBLE SIP 日志
+ - name: errors-records
+ category: UNION
+ on:
+ - error1-records
+ - error2-records
+ - error3-records
+ - error4-records
\ No newline at end of file