[GAL-602] refactor: impl sip doube-ways correlate

This commit is contained in:
chaochaoc
2024-06-27 15:31:14 +08:00
parent 1ff8c985c7
commit c077c16a3a

View File

@@ -3,16 +3,24 @@ job:
parallelism: 1
active-pipeline:
- console
- console1
source:
- name: session-records
type: kafka
# type: kafka
# option:
# topic: SESSION-RECORD
# properties:
# bootstrap.servers: 192.168.44.12:9092
# group.id: easy-stream-tester9
# client.id: easy-stream-tester9
# type: file
# option:
# path: E:\java-workspace\sip-rtp-correlation\feature\easy-refactor\src\main\resources\session-records.txt
type: socket
option:
topic: SESSION-RECORD
properties:
bootstrap.servers: 192.168.44.12:9092
group.id: easy-stream-tester9
client.id: easy-stream-tester9
hostname: localhost
port: 9999
format: json
schema:
- name: session_id
@@ -49,6 +57,8 @@ source:
data-type: INT
- name: direction
data-type: INT
- name: vsys_id
data-type: BIGINT
- name: t_vsys_id
data-type: BIGINT
- name: flags
@@ -212,21 +222,380 @@ source:
pipeline:
- name: console
category: PRINT
on: split-for-error
- name: split-for-error
# on: sip-records
on: sip-double-way-records.ok
use-err: true
- name: console1
category: PRINT
on: sip-records
use-err: true
- name: split-for-valid
category: SPLIT
on: session-records
splits:
# Invalid stream dir
- name: error1-records
where: STREAM_DIR(flags) != 1 && STREAM_DIR(flags) != 2 && STREAM_DIR(flags) != 3
# Invalid ip or port
- name: error2-records
- name: error1-records
where: NOT(IS_IP_ADDRESS(client_ip)) || NOT(IS_IP_ADDRESS(server_ip)) || client_port <= 0 || server_port <= 0
# - name: split-by-protocol
# category: SPLIT
# splits:
# - name: rtp-records
# where: "decoded_as == 'RTP'"
# - name: sip-records
# where: "decoded_as == 'SIP'"
# Invalid stream dir
- name: error2-records
where: decoded_as == 'SIP' &&STREAM_DIR(flags) != 1 && STREAM_DIR(flags) != 2 && STREAM_DIR(flags) != 3
# Invalid: SIP one-way stream and has invalid network address
- name: error3-records
where: decoded_as == 'SIP' && ( NOT(HAS_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip)) || sip_originator_sdp_media_port <= 0 || sip_responder_sdp_media_port <= 0 )
- name: error4-records
where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_IP_ADDRESS(sip_responder_sdp_connect_ip) ) )
### Notes: If internal IP address correlate is needed, please uncomment the following two items
# # Invalid: SIP one-way stream and internal network address
# - name: internal-error1-records
# where: decoded_as == 'SIP' && NOT(HAS_EXTERNAL_IP_ADDRESS(sip_originator_sdp_connect_ip, sip_responder_sdp_connect_ip))
# # Invalid: SIP double-way stream and internal network address
# - name: internal-error2-records
# where: decoded_as == 'SIP' && STREAM_DIR(flags) == 3 && ( NOT( IS_EXTERNAL_IP_ADDRESS(sip_originator_sdp_connect_ip) ) || NOT( IS_EXTERNAL_IP_ADDRESS(sip_responder_sdp_connect_ip) ) )
- name: split-by-protocol
category: SPLIT
on: split-for-valid
splits:
- name: rtp-records
where: decoded_as == 'RTP'
- name: sip-records
where: decoded_as == 'SIP'
- name: sip-double-way-records
category: CORRELATE
on: sip-records
cache:
- name: v1
type: VALUE
ttl: 1 minute
schema:
- name: session_id
data-type: BIGINT NOT NULL
- name: start_timestamp_ms
data-type: BIGINT NOT NULL
- name: start_timestamp
data-type: TIMESTAMP_LTZ(3)
- name: end_timestamp_ms
data-type: BIGINT NOT NULL
- name: decoded_as
data-type: STRING NOT NULL
- name: duration_ms
data-type: BIGINT NOT NULL
- name: tcp_handshake_latency_ms
data-type: BIGINT
- name: device_id
data-type: STRING NOT NULL
- name: out_link_id
data-type: BIGINT
- name: in_link_id
data-type: BIGINT
- name: device_tag
data-type: STRING
- name: data_center
data-type: STRING
- name: device_group
data-type: STRING
- name: sled_ip
data-type: STRING
- name: address_type
data-type: INT
- name: direction
data-type: INT
- name: vsys_id
data-type: BIGINT
- name: t_vsys_id
data-type: BIGINT
- name: flags
data-type: BIGINT
- name: flags_identify_info
data-type: STRING
## Treatment
- name: security_rule_list
data-type: ARRAY<BIGINT>
- name: security_action
data-type: STRING
- name: monitor_rule_list
data-type: ARRAY<BIGINT>
- name: shaping_rule_list
data-type: ARRAY<BIGINT>
- name: sc_rule_list
data-type: ARRAY<BIGINT>
- name: statistics_rule_list
data-type: ARRAY<BIGINT>
- name: sc_rsp_raw
data-type: ARRAY<BIGINT>
- name: sc_rsp_decrypted
data-type: ARRAY<BIGINT>
- name: proxy_rule_list
data-type: ARRAY<BIGINT>
- name: proxy_action
data-type: STRING
- name: proxy_pinning_status
data-type: INT
- name: proxy_intercept_status
data-type: INT
- name: proxy_passthrough_reason
data-type: STRING
- name: proxy_client_side_latency_ms
data-type: BIGINT
- name: proxy_server_side_latency_ms
data-type: BIGINT
- name: proxy_client_side_version
data-type: STRING
- name: proxy_server_side_version
data-type: STRING
- name: proxy_cert_verify
data-type: INT
- name: proxy_intercept_error
data-type: STRING
- name: monitor_mirrored_pkts
data-type: INT
- name: monitor_mirrored_bytes
data-type: INT
## Source
- name: client_ip
data-type: STRING
- name: client_port
data-type: INT
- name: client_os_desc
data-type: STRING
- name: client_geolocation
data-type: STRING
- name: client_country
data-type: STRING
- name: client_super_administrative_area
data-type: STRING
- name: client_administrative_area
data-type: STRING
- name: client_sub_administrative_area
data-type: STRING
- name: client_asn
data-type: BIGINT
- name: subscriber_id
data-type: STRING
- name: imei
data-type: STRING
- name: imsi
data-type: STRING
- name: phone_number
data-type: STRING
- name: apn
data-type: STRING
## Destination
- name: server_ip
data-type: STRING
- name: server_port
data-type: INT
- name: server_os_desc
data-type: STRING
- name: server_geolocation
data-type: STRING
- name: server_country
data-type: STRING
- name: server_super_administrative_area
data-type: STRING
- name: server_administrative_area
data-type: STRING
- name: server_sub_administrative_area
data-type: STRING
- name: server_asn
data-type: BIGINT
- name: server_fqdn
data-type: STRING
- name: server_domain
data-type: STRING
- name: fqdn_category_list
data-type: ARRAY<BIGINT>
## Application
- name: app_transition
data-type: STRING
- name: app
data-type: STRING
- name: app_debug_info
data-type: STRING
- name: app_content
data-type: STRING
- name: app_extra_info
data-type: STRING
## Protocol
- name: ip_protocol
data-type: STRING
- name: decoded_path
data-type: STRING
## SIP
- name: sip_call_id
data-type: STRING
- name: sip_originator_description
data-type: STRING
- name: sip_responder_description
data-type: STRING
- name: sip_user_agent
data-type: STRING
- name: sip_server
data-type: STRING
- name: sip_originator_sdp_connect_ip
data-type: STRING
- name: sip_originator_sdp_media_port
data-type: INT
- name: sip_originator_sdp_media_type
data-type: STRING
- name: sip_originator_sdp_content
data-type: STRING
- name: sip_responder_sdp_connect_ip
data-type: STRING
- name: sip_responder_sdp_media_port
data-type: INT
- name: sip_responder_sdp_media_type
data-type: STRING
- name: sip_responder_sdp_content
data-type: STRING
- name: sip_duration_s
data-type: INT
- name: sip_bye
data-type: STRING
## RTP
- name: rtp_payload_type_c2s
data-type: INT
- name: rtp_payload_type_s2c
data-type: INT
- name: rtp_pcap_path
data-type: STRING
- name: rtp_originator_dir
data-type: INT
where:
- on: sip-records
key-by: vsys_id, sip_call_id, SORT_ADDRESS( client_ip, client_port, server_ip, server_port )
process:
- if: STREAM_DIR(flags) != 3 && @v1.isNotNull && STREAM_DIR(@v1.$flags) != STREAM_DIR(flags)
then:
- |-
OUTPUT ok FROM session_id,
start_timestamp_ms,
withColumns(end_timestamp_ms to sip_call_id),
FIND_NOT_BLANK(@v1.$sip_originator_description, sip_originator_description) AS sip_originator_description,
FIND_NOT_BLANK(@v1.$sip_responder_description, sip_responder_description) AS sip_responder_description,
FIND_NOT_BLANK(@v1.$sip_user_agent, sip_user_agent) AS sip_user_agent,
FIND_NOT_BLANK(@v1.$sip_server, sip_server) AS sip_server,
FIND_NOT_BLANK(@v1.$sip_originator_sdp_connect_ip, sip_originator_sdp_connect_ip) AS sip_originator_sdp_connect_ip,
(@v1.$sip_originator_sdp_media_port > 0).?(@v1.$sip_originator_sdp_media_port, sip_originator_sdp_media_port) AS sip_originator_sdp_media_port,
FIND_NOT_BLANK(@v1.$sip_originator_sdp_media_type, sip_originator_sdp_media_type) AS sip_originator_sdp_media_type,
FIND_NOT_BLANK(@v1.$sip_originator_sdp_content, sip_originator_sdp_content) AS sip_originator_sdp_content,
FIND_NOT_BLANK(@v1.$sip_responder_sdp_connect_ip, sip_responder_sdp_connect_ip) AS sip_responder_sdp_connect_ip,
(@v1.$sip_responder_sdp_media_port > 0).?(@v1.$sip_responder_sdp_media_port, sip_responder_sdp_media_port) AS sip_responder_sdp_media_port,
FIND_NOT_BLANK(@v1.$sip_responder_sdp_media_type, sip_responder_sdp_media_type) AS sip_responder_sdp_media_type,
FIND_NOT_BLANK(@v1.$sip_responder_sdp_content, sip_responder_sdp_content) AS sip_responder_sdp_content,
@v1.$sip_duration_s + sip_duration_s AS sip_duration_s,
FIND_NOT_BLANK(@v1.$sip_bye, sip_bye) AS sip_bye,
rtp_payload_type_c2s,
rtp_payload_type_s2c,
rtp_pcap_path,
rtp_originator_dir
- TRUNCATE v1
- if: STREAM_DIR(flags) != 3 && @v1.isNull
then:
- |-
SET v1 FROM withColumns(session_id to rtp_originator_dir)
- if: STREAM_DIR(flags) == 3
then:
- |-
OUTPUT ok FROM session_id,
start_timestamp_ms, withColumns(end_timestamp_ms to rtp_originator_dir)
- SCHEDULING USING EVENT TIME FOR NOW + 60 * 1000
schedule:
- if: '@v1.isNotNull'
then:
- |-
OUTPUT fail FROM @v1.$session_id AS session_id,
@v1.$start_timestamp_ms AS start_timestamp_ms,
@v1.$end_timestamp_ms AS end_timestamp_ms,
@v1.$decoded_as AS decoded_as,
@v1.$duration_ms AS duration_ms,
@v1.$tcp_handshake_latency_ms AS tcp_handshake_latency_ms,
@v1.$device_id AS device_id,
@v1.$out_link_id AS out_link_id,
@v1.$in_link_id AS in_link_id,
@v1.$device_tag AS device_tag,
@v1.$data_center AS data_center,
@v1.$device_group AS device_group,
@v1.$sled_ip AS sled_ip,
@v1.$address_type AS address_type,
@v1.$direction AS direction,
@v1.$vsys_id AS vsys_id,
@v1.$t_vsys_id AS t_vsys_id,
@v1.$flags AS flags,
@v1.$flags_identify_info AS flags_identify_info,
@v1.$security_rule_list AS security_rule_list,
@v1.$security_action AS security_action,
@v1.$monitor_rule_list AS monitor_rule_list,
@v1.$shaping_rule_list AS shaping_rule_list,
@v1.$sc_rule_list AS sc_rule_list,
@v1.$statistics_rule_list AS statistics_rule_list,
@v1.$sc_rsp_raw AS sc_rsp_raw,
@v1.$sc_rsp_decrypted AS sc_rsp_decrypted,
@v1.$proxy_rule_list AS proxy_rule_list,
@v1.$proxy_action AS proxy_action,
@v1.$proxy_pinning_status AS proxy_pinning_status,
@v1.$proxy_intercept_status AS proxy_intercept_status,
@v1.$proxy_passthrough_reason AS proxy_passthrough_reason,
@v1.$proxy_client_side_latency_ms AS proxy_client_side_latency_ms,
@v1.$proxy_server_side_latency_ms AS proxy_server_side_latency_ms,
@v1.$proxy_client_side_version AS proxy_client_side_version,
@v1.$proxy_server_side_version AS proxy_server_side_version,
@v1.$proxy_cert_verify AS proxy_cert_verify,
@v1.$proxy_intercept_error AS proxy_intercept_error,
@v1.$monitor_mirrored_pkts AS monitor_mirrored_pkts,
@v1.$monitor_mirrored_bytes AS monitor_mirrored_bytes,
@v1.$client_ip AS client_ip,
@v1.$client_port AS client_port,
@v1.$client_os_desc AS client_os_desc,
@v1.$client_geolocation AS client_geolocation,
@v1.$client_country AS client_country,
@v1.$client_super_administrative_area AS client_super_administrative_area,
@v1.$client_administrative_area AS client_administrative_area,
@v1.$client_sub_administrative_area AS client_sub_administrative_area,
@v1.$client_asn AS client_asn,
@v1.$subscriber_id AS subscriber_id,
@v1.$imei AS imei,
@v1.$imsi AS imsi,
@v1.$phone_number AS phone_number,
@v1.$apn AS apn,
@v1.$server_ip AS server_ip,
@v1.$server_port AS server_port,
@v1.$server_os_desc AS server_os_desc,
@v1.$server_geolocation AS server_geolocation,
@v1.$server_country AS server_country,
@v1.$server_super_administrative_area AS server_super_administrative_area,
@v1.$server_administrative_area AS server_administrative_area,
@v1.$server_sub_administrative_area AS server_sub_administrative_area,
@v1.$server_asn AS server_asn,
@v1.$server_fqdn AS server_fqdn,
@v1.$server_domain AS server_domain,
@v1.$fqdn_category_list AS fqdn_category_list,
@v1.$app_transition AS app_transition,
@v1.$app AS app,
@v1.$app_debug_info AS app_debug_info,
@v1.$app_content AS app_content,
@v1.$app_extra_info AS app_extra_info,
@v1.$ip_protocol AS ip_protocol,
@v1.$decoded_path AS decoded_path,
@v1.$sip_call_id AS sip_call_id,
@v1.$sip_originator_description AS sip_originator_description,
@v1.$sip_responder_description AS sip_responder_description,
@v1.$sip_user_agent AS sip_user_agent,
@v1.$sip_server AS sip_server,
@v1.$sip_originator_sdp_connect_ip AS sip_originator_sdp_connect_ip,
@v1.$sip_originator_sdp_media_port AS sip_originator_sdp_media_port,
@v1.$sip_originator_sdp_media_type AS sip_originator_sdp_media_type,
@v1.$sip_originator_sdp_content AS sip_originator_sdp_content,
@v1.$sip_responder_sdp_connect_ip AS sip_responder_sdp_connect_ip,
@v1.$sip_responder_sdp_media_port AS sip_responder_sdp_media_port,
@v1.$sip_responder_sdp_media_type AS sip_responder_sdp_media_type,
@v1.$sip_responder_sdp_content AS sip_responder_sdp_content,
@v1.$sip_duration_s AS sip_duration_s,
@v1.$sip_bye AS sip_bye,
@v1.$rtp_payload_type_c2s AS rtp_payload_type_c2s,
@v1.$rtp_payload_type_s2c AS rtp_payload_type_s2c,
@v1.$rtp_pcap_path AS rtp_pcap_path,
@v1.$rtp_originator_dir AS rtp_originator_dir
- TRUNCATE v1