Compare commits
5 Commits
V23.09-239 ... feature/se

| Author | SHA1 | Date |
|---|---|---|
| | d544eb8752 | |
| | 4962a40f97 | |
| | ed07a3dbfb | |
| | 4cda43724d | |
| | 68b4805c4f | |
pom.xml (4 changes)
@@ -5,8 +5,8 @@
     <modelVersion>4.0.0</modelVersion>

     <groupId>com.zdjizhi</groupId>
-    <artifactId>log-completion-schema</artifactId>
-    <version>230907</version>
+    <artifactId>log-stream-doublewrite</artifactId>
+    <version>24.01</version>

     <name>log-completion-schema</name>
     <url>http://www.example.com</url>
properties/action_definition.properties — new file (35 lines)

none=0
Monitor=1
monitor=1
Intercept=2
intercept=2
NoIntercept=3
nointercept=3
ActiveDefence=4
activedefence=4
WANNAT=8
wannat=8
Reject=16
reject=16
Deny=16
deny=16
Shaping=32
shaping=32
Manipulate=48
manipulate=48
ServiceChaining=64
servicechaining=64
Allow=96
allow=96
Bypass=96
bypass=96
Shunt=128
shunt=128
Statistics=129
statistics=129
redirect=48
replace=48
hijack=48
insert=48
edit_element=48
run_script=48
properties/percent_proxy_event.properties — new file (68 lines)

#session-record
tcp_c2s_ip_fragments=common_c2s_ipfrag_num
tcp_s2c_ip_fragments=common_s2c_ipfrag_num
tcp_c2s_o3_pkts=common_c2s_tcp_unorder_num
tcp_s2c_o3_pkts=common_s2c_tcp_unorder_num
http_response_latency_ms=http_response_latency_ms
http_session_duration_ms=http_session_duration_ms
security_rule_list=security_rule_list
monitor_rule_list=monitor_rule_list
tcp_handshake_latency_ms=common_establish_latency_ms
mail_protocol_type=mail_protocol_type
mail_account=mail_account
mail_password=mail_passwd
mail_from_cmd=mail_from_cmd
mail_to_cmd=mail_to_cmd
mail_from=mail_from
mail_to=mail_to
mail_cc=mail_cc
mail_bcc=mail_bcc
mail_subject=mail_subject
mail_subject_charset=mail_subject_charset
mail_attachment_name=mail_attachment_name
mail_attachment_name_charset=mail_attachment_name_charset
mail_eml_file=mail_eml_file
dns_message_id=dns_message_id
dns_qr=dns_qr
dns_opcode=dns_opcode
dns_aa=dns_aa
dns_tc=dns_tc
dns_rd=dns_rd
dns_ra=dns_ra
dns_rcode=dns_rcode
dns_qdcount=dns_qdcount
dns_ancount=dns_ancount
dns_nscount=dns_nscount
dns_arcount=dns_arcount
dns_qname=dns_qname
dns_qtype=dns_qtype
dns_qclass=dns_qclass
dns_cname=dns_cname
dns_sub=dns_sub
dns_rr=dns_rr
ssl_version=ssl_version
ssl_sni=ssl_sni
ssl_san=ssl_san
ssl_cn=ssl_cn
ssl_handshake_latency_ms=ssl_con_latency_ms
ssl_ja3_hash=ssl_ja3_hash
ssl_cert_issuer=ssl_cert_issuer
ssl_cert_subject=ssl_cert_subject
quic_version=quic_version
quic_sni=quic_sni
quic_user_agent=quic_user_agent
ftp_account=ftp_account
ftp_url=ftp_url
ftp_link_type=ftp_link_type
http_proxy_flag=http_proxy_flag
http_sequence=http_sequence
tcp_client_isn=common_tcp_client_isn
tcp_server_isn=common_tcp_server_isn
sent_pkts=common_c2s_pkt_num
received_pkts=common_s2c_pkt_num
app=common_app_label
out_link_id=common_egress_link_id
in_link_id=common_ingress_link_id
duration_ms=common_con_duration_ms
http_request_line=http_request_line
http_response_line=http_response_line
properties/percent_security_event.properties — new file (4 lines)

tcp_c2s_ip_fragments=common_c2s_ipfrag_num
tcp_s2c_ip_fragments=common_s2c_ipfrag_num
tcp_c2s_o3_pkts=common_c2s_tcp_unorder_num
tcp_s2c_o3_pkts=common_s2c_tcp_unorder_num
properties/percent_session_record.properties — new file (42 lines)

#security-event
http_request_body=http_request_body
http_response_body=http_response_body
http_response_latency_ms=http_response_latency_ms
http_session_duration_ms=http_session_duration_ms
security_rule_list=security_rule_list
monitor_rule_list=monitor_rule_list
tcp_handshake_latency_ms=common_establish_latency_ms
#proxy-event
http_action_file_size=http_action_file_size
doh_url=doh_url
doh_host=doh_host
doh_cookie=doh_cookie
doh_referer=doh_referer
doh_user_agent=doh_user_agent
doh_version=doh_version
doh_message_id=doh_message_id
doh_qr=doh_qr
doh_opcode=doh_opcode
doh_aa=doh_aa
doh_tc=doh_tc
doh_rd=doh_rd
doh_ra=doh_ra
doh_rcode=doh_rcode
doh_qdcount=doh_qdcount
doh_ancount=doh_ancount
doh_nscount=doh_nscount
doh_arcount=doh_arcount
doh_qname=doh_qname
doh_qtype=doh_qtype
doh_qclass=doh_qclass
doh_cname=doh_cname
doh_sub=doh_sub
doh_rr=doh_rr
proxy_rule_list=proxy_rule_list
@@ -1,7 +1,7 @@
 #-------------------------------- Address configuration ------------------------------#
 # Management Kafka address
 source.kafka.servers=192.168.44.12:9094
+#source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
 # Percent output Kafka address
 percent.sink.kafka.servers=192.168.44.12:9094
 # Output Kafka address for the file source-data topic
@@ -18,20 +18,22 @@ tools.library=D:\\workerspace\\dat\\

 #-------------------------------- Nacos configuration ------------------------------#
 # Nacos address
-nacos.server=192.168.44.67:8848
+nacos.server=192.168.44.12:8848

 # Schema namespace name
-nacos.schema.namespace=f507879a-8b1b-4330-913e-83d4fcdc14bb
+nacos.schema.namespace=P19

 # Schema data id name
 nacos.schema.data.id=session_record.json

 #-------------------------------- Kafka consumer/producer configuration ------------------------------#
 # Kafka input topic
-source.kafka.topic=test
+source.kafka.topic=SESSION-RECORD

-sink.percent.kafka.topic=PERCENT-RECORD
+sink.percent.kafka.topic.session=PERCENT-SESSION-RECORD
+sink.percent.kafka.topic.security=PERCENT-SECURITY-RECORD
+sink.percent.kafka.topic.proxy=PERCENT-POLICY-RECORD
 sink.file.data.kafka.topic=test-file-data
 # When reading the topic, consumer offsets are stored under this spout id (it can be named after the topology); the stored offset position determines where the next read resumes without duplicates.
@@ -46,22 +48,31 @@ transform.parallelism=1

 deal.file.parallelism=1
 sink.file.data.parallelism=1
-sink.percent.parallelism=1
+sink.percent.session.parallelism=1
+sink.percent.security.parallelism=1
+sink.percent.proxy.parallelism=1
 # Data center id, valid range 0-31
 data.center.id.num=0

 # HBase cache refresh interval; 0 disables cache refresh
 hbase.tick.tuple.freq.secs=180

 #-------------------------------- Default values ------------------------------#
 # Producer compression mode: none or snappy
 producer.kafka.compression.type=snappy

 #------------------------------------ OOS configuration ------------------------------------#
 # OOS address
-oos.servers=10.3.45.100:8057
+oos.servers=172.18.10.153:8058,172.18.10.154:8058,172.18.10.155:8058,172.18.10.156:8058,172.18.10.157:8058
 #prometheus-httpserver
 prometheus.pushgateway.address=192.168.44.12:9091
 pushgateway.statistics.time=300
 deal.file.statistics.time=60
+#------------------------------------ Knowledge configuration ------------------------------------#
+knowledge.execution.minutes=600
+knowledge.base.uri=http://192.168.44.12:9999
+knowledge.base.path=/v1/knowledge_base
+ip.user.defined.kd.id=004390bc-3135-4a6f-a492-3662ecb9e289
+ip.builtin.kd.id=64af7077-eb9b-4b8f-80cf-2ceebc89bea9
+asn.builtin.kd.id=f9f6bc91-2142-4673-8249-e097c00fe1ea
+hos.url=http://192.168.44.12:9098/hos/traffic_file_bucket/
properties/session_record_mapping_table.properties — new file (134 lines)

recv_time=common_recv_time
log_id=common_log_id
start_timestamp_ms=common_start_timestamp_ms
end_timestamp_ms=common_end_timestamp_ms
duration_ms=common_con_duration_ms
processing_time=common_processing_time
device_id=common_device_id
data_center=common_data_center
sled_ip=common_sled_ip
device_tag=common_device_tag
client_ip=common_client_ip
client_port=common_client_port
client_asn=common_client_asn
subscriber_id=common_subscriber_id
imei=common_imei
imsi=common_imsi
phone_number=common_phone_number
server_ip=common_server_ip
server_port=common_server_port
server_asn=common_server_asn
address_type=common_address_type
out_link_id=common_egress_link_id
in_link_id=common_ingress_link_id
client_geolocation=common_client_location
server_geolocation=common_server_location
app=common_app_label
ip_protocol=common_l4_protocol
sent_pkts=common_c2s_pkt_num
received_pkts=common_s2c_pkt_num
sent_bytes=common_c2s_byte_num
received_bytes=common_s2c_byte_num
tcp_client_isn=common_tcp_client_isn
tcp_server_isn=common_tcp_server_isn
decoded_as=common_schema_type
session_id=common_stream_trace_id
http_url=http_url
http_host=http_host
http_proxy_flag=http_proxy_flag
http_sequence=http_sequence
http_cookie=http_cookie
http_referer=http_referer
http_user_agent=http_user_agent
http_request_content_length=http_request_content_length
http_request_content_type=http_request_content_type
http_response_content_length=http_response_content_length
http_response_content_type=http_response_content_type
http_set_cookie=http_set_cookie
http_version=http_version
mail_protocol_type=mail_protocol_type
mail_account=mail_account
mail_password=mail_passwd
mail_from_cmd=mail_from_cmd
mail_to_cmd=mail_to_cmd
mail_from=mail_from
mail_to=mail_to
mail_cc=mail_cc
mail_bcc=mail_bcc
mail_subject=mail_subject
mail_subject_charset=mail_subject_charset
mail_attachment_name=mail_attachment_name
mail_attachment_name_charset=mail_attachment_name_charset
mail_eml_file=mail_eml_file
dns_message_id=dns_message_id
dns_qr=dns_qr
dns_opcode=dns_opcode
dns_aa=dns_aa
dns_tc=dns_tc
dns_rd=dns_rd
dns_ra=dns_ra
dns_rcode=dns_rcode
dns_qdcount=dns_qdcount
dns_ancount=dns_ancount
dns_nscount=dns_nscount
dns_arcount=dns_arcount
dns_qname=dns_qname
dns_qtype=dns_qtype
dns_qclass=dns_qclass
dns_cname=dns_cname
dns_sub=dns_sub
dns_rr=dns_rr
ssl_version=ssl_version
ssl_sni=ssl_sni
ssl_san=ssl_san
ssl_cn=ssl_cn
ssl_handshake_latency_ms=ssl_con_latency_ms
ssl_ja3_hash=ssl_ja3_hash
ssl_cert_issuer=ssl_cert_issuer
ssl_cert_subject=ssl_cert_subject
quic_version=quic_version
quic_sni=quic_sni
quic_user_agent=quic_user_agent
ftp_account=ftp_account
ftp_url=ftp_url
ftp_link_type=ftp_link_type
#security-event
http_request_body=http_request_body
http_response_body=http_response_body
http_response_latency_ms=http_response_latency_ms
http_session_duration_ms=http_session_duration_ms
security_rule_list=security_rule_list
monitor_rule_list=monitor_rule_list
tcp_handshake_latency_ms=common_establish_latency_ms
#proxy-event
http_action_file_size=http_action_file_size
doh_url=doh_url
doh_host=doh_host
doh_cookie=doh_cookie
doh_referer=doh_referer
doh_user_agent=doh_user_agent
doh_version=doh_version
doh_message_id=doh_message_id
doh_qr=doh_qr
doh_opcode=doh_opcode
doh_aa=doh_aa
doh_tc=doh_tc
doh_rd=doh_rd
doh_ra=doh_ra
doh_rcode=doh_rcode
doh_qdcount=doh_qdcount
doh_ancount=doh_ancount
doh_nscount=doh_nscount
doh_arcount=doh_arcount
doh_qname=doh_qname
doh_qtype=doh_qtype
doh_qclass=doh_qclass
doh_cname=doh_cname
doh_sub=doh_sub
doh_rr=doh_rr
proxy_rule_list=proxy_rule_list
#session-record
tcp_c2s_ip_fragments=common_c2s_ipfrag_num
tcp_s2c_ip_fragments=common_s2c_ipfrag_num
tcp_c2s_o3_pkts=common_c2s_tcp_unorder_num
tcp_s2c_o3_pkts=common_s2c_tcp_unorder_num
@@ -83,8 +83,10 @@ public class FlowWriteConfig {
     public static final Integer BUFFER_TIMEOUT = ConfigurationsUtils.getIntProperty(propDefault, "buffer.timeout");
     public static final Integer DEAL_FILE_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "deal.file.parallelism");
     public static final Integer SINK_FILE_DATA_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.file.data.parallelism");
-    public static final Integer SINK_PERCENT_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.parallelism");
+    public static final Integer SINK_PERCENT_SESSION_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.session.parallelism");
+    public static final Integer SINK_PERCENT_SECURITY_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.security.parallelism");
+    public static final Integer SINK_PERCENT_PROXY_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.proxy.parallelism");
     /**
      * HBase
      */
@@ -134,6 +136,11 @@ public class FlowWriteConfig {
     public static final String SINK_PERCENT_KAFKA_TOPIC = ConfigurationsUtils.getStringProperty(propService, "sink.percent.kafka.topic");
     public static final String SINK_FILE_DATA_SINK_KAFKA_TOPIC = ConfigurationsUtils.getStringProperty(propService, "sink.file.data.kafka.topic");
+    public static final String SINK_PERCENT_KAFKA_TOPIC_SESSION = ConfigurationsUtils.getStringProperty(propService, "sink.percent.kafka.topic.session");
+    public static final String SINK_PERCENT_KAFKA_TOPIC_SECURITY = ConfigurationsUtils.getStringProperty(propService, "sink.percent.kafka.topic.security");
+    public static final String SINK_PERCENT_KAFKA_TOPIC_PROXY = ConfigurationsUtils.getStringProperty(propService, "sink.percent.kafka.topic.proxy");
     /**
      * connection kafka
      */
@@ -161,4 +168,18 @@ public class FlowWriteConfig {
     public static final Integer PUSHGATEWAY_STATISTICS_TIME = ConfigurationsUtils.getIntProperty(propService, "pushgateway.statistics.time");
     public static final Integer DEAL_FILE_STATISTICS_TIME = ConfigurationsUtils.getIntProperty(propService, "deal.file.statistics.time");
+    public static final String KNOWLEDGE_BASE_URL = ConfigurationsUtils.getStringProperty(propService, "knowledge.base.uri");
+    public static final String KNOWLEDGE_BASE_PATH = ConfigurationsUtils.getStringProperty(propService, "knowledge.base.path");
+    public static final String ASN_BUILTIN_KD_ID = ConfigurationsUtils.getStringProperty(propService, "asn.builtin.kd.id");
+    public static final String IP_BUILTIN_KD_ID = ConfigurationsUtils.getStringProperty(propService, "ip.builtin.kd.id");
+    public static final String IP_USER_DEFINED_KD_ID = ConfigurationsUtils.getStringProperty(propService, "ip.user.defined.kd.id");
+    public static final String HOS_URL = ConfigurationsUtils.getStringProperty(propService, "hos.url");
+    public static final Long KNOWLEDGE_EXECUTION_MINUTES = ConfigurationsUtils.getLongProperty(propService, "knowledge.execution.minutes");
 }
@@ -6,7 +6,7 @@ import com.zdjizhi.common.FlowWriteConfig;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.exporter.PushGateway;
-import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
@@ -15,13 +15,15 @@ import java.io.IOException;
 import java.util.Timer;
 import java.util.TimerTask;

-public class SendCountProcess extends ProcessFunction<Tuple5<Long, Long, Long, Long, Long>, String> {
+public class SendCountProcess extends ProcessFunction<Tuple7<Long, Long, Long, Long, Long, Long, Long>, String> {
     private static final Log logger = LogFactory.get();
     private long recordCount = 0L;
     private long failedCount = 0L;
     private long httpRequestCount = 0L;
     private long httpResponseCount = 0L;
     private long mailEmlCount = 0L;
+    private long securityCount = 0L;
+    private long proxyCount = 0L;

     static final Gauge recordCountsGauge = Gauge.build()
@@ -37,6 +39,10 @@ public class SendCountProcess extends ProcessFunction<Tuple5<Long, Long, Long, Long, Long>, String> {
             .name("httpResponseCount").labelNames("ServerName", "Duration").help("The general intput log volume, the unit is slip").register();
     static final Gauge mailEmlCountCountGauge = Gauge.build()
             .name("mailEmlCount").labelNames("ServerName", "Duration").help("The general intput log volume, the unit is slip").register();
+    static final Gauge securityCountGauge = Gauge.build()
+            .name("securityCount").labelNames("ServerName", "Duration").help("The general intput log volume, the unit is slip").register();
+    static final Gauge proxyCountGauge = Gauge.build()
+            .name("proxyCount").labelNames("ServerName", "Duration").help("The general intput log volume, the unit is slip").register();

     @Override
     public void open(Configuration parameters) {
@@ -53,6 +59,9 @@ public class SendCountProcess extends ProcessFunction<Tuple5<Long, Long, Long, Long, Long>, String> {
         httpRequestCountGauge.labels("httpRequestCount", String.valueOf(FlowWriteConfig.PUSHGATEWAY_STATISTICS_TIME / 60)).set(httpRequestCount);
         httpResponseCountGauge.labels("httpResponseCount", String.valueOf(FlowWriteConfig.PUSHGATEWAY_STATISTICS_TIME / 60)).set(httpResponseCount);
         mailEmlCountCountGauge.labels("mailEmlCount", String.valueOf(FlowWriteConfig.PUSHGATEWAY_STATISTICS_TIME / 60)).set(mailEmlCount);
+        securityCountGauge.labels("securityCount", String.valueOf(FlowWriteConfig.PUSHGATEWAY_STATISTICS_TIME / 60)).set(securityCount);
+        proxyCountGauge.labels("proxyCount", String.valueOf(FlowWriteConfig.PUSHGATEWAY_STATISTICS_TIME / 60)).set(proxyCount);
         try {
             // Push the metrics to the pushgateway
             push();
@@ -72,13 +81,15 @@ public class SendCountProcess extends ProcessFunction<Tuple5<Long, Long, Long, Long, Long>, String> {
     }

     @Override
-    public void processElement(Tuple5<Long, Long, Long, Long, Long> value, Context ctx, Collector<String> out) {
+    public void processElement(Tuple7<Long, Long, Long, Long, Long, Long, Long> value, Context ctx, Collector<String> out) {
         try {
             recordCount = recordCount + value.f0;
             failedCount = failedCount + value.f1;
             httpRequestCount = httpRequestCount + value.f2;
             httpResponseCount = httpResponseCount + value.f3;
             mailEmlCount = mailEmlCount + value.f4;
+            securityCount = securityCount + value.f5;
+            proxyCount = proxyCount + value.f6;
         } catch (Exception e) {
             logger.error("Failed to process statistics metrics, cause: " + e);
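The metric tuple widens from five to seven slots here, and the slot meaning is purely positional. For reading convenience, the layout implied by the counter updates above is:

// Slot layout of the Tuple7 exchanged between DealFileProcess and SendCountProcess,
// inferred from the assignments in processElement above:
// f0=recordCount, f1=failedCount, f2=httpRequestCount, f3=httpResponseCount,
// f4=mailEmlCount, f5=securityCount, f6=proxyCount
Tuple7.of(recordCount, failedCount, httpRequestCount, httpResponseCount, mailEmlCount, securityCount, proxyCount);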
@@ -3,43 +3,63 @@ package com.zdjizhi.operator.map;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.alibaba.fastjson2.JSONObject;
-import com.geedgenetworks.utils.IpLookupV2;
-import com.zdjizhi.tools.general.IpLookupUtils;
+import com.zdjizhi.tools.general.ConfigurationsUtils;
+import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT;
 import com.zdjizhi.tools.transform.TransForm;
 import com.zdjizhi.tools.json.MetaUtil;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
 import org.apache.flink.util.Collector;

-import java.util.Map;
+import java.util.Properties;

 /**
  * @author qidaijie
  * @version 2021/5/27 15:01
  */
-public class MapCompleted extends ProcessFunction<String,com.alibaba.fastjson2.JSONObject> {
+public class MapCompleted extends ProcessFunction<String, com.alibaba.fastjson2.JSONObject> {
     private static final Log logger = LogFactory.get();
+    private ConvertRecordToPERCENT securityEvnetConvert;

     @Override
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
+        try {
+            Properties securityProp = new Properties();
+            securityProp.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("security_event_mapping_table.properties"));
+            securityEvnetConvert = new ConvertRecordToPERCENT(securityProp);
+            logger.info("security_event log schema loaded successfully");
+        } catch (Exception e) {
+            logger.error("Failed to load security_event log schema, cause: " + e);
+        }
     }

     @Override
-    public void processElement(String message, ProcessFunction<String, com.alibaba.fastjson2.JSONObject>.Context ctx, Collector<com.alibaba.fastjson2.JSONObject> out) throws Exception {
+    public void processElement(String message, ProcessFunction<String, com.alibaba.fastjson2.JSONObject>.Context ctx, Collector<com.alibaba.fastjson2.JSONObject> out) {
         try {
-            JSONObject jsonObject = JSONObject.parseObject(message);
-            jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000);
-            MetaUtil.dropJsonField(jsonObject);
-            TransForm.transformLog(jsonObject);
-            out.collect(jsonObject);
+            JSONObject record = JSONObject.parseObject(message);
+            JSONObject jsonObject = null;
+            if (record.containsKey("security_rule_list") || record.containsKey("monitor_rule_list")) {
+                jsonObject = securityEvnetConvert.convertToPERCENT(record);
+            }
+            if (jsonObject != null) {
+                jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000);
+                MetaUtil.dropJsonField(jsonObject);
+                TransForm.transformLog(jsonObject);
+                out.collect(jsonObject);
+            }
         } catch (RuntimeException e) {
             logger.error("TransForm log failed ( The field type is not verified ),The exception is :{}\n The error Message is:{}", e, message);
         }
     }
 }
@@ -2,13 +2,19 @@ package com.zdjizhi.operator.map;

 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
+import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
+import com.zdjizhi.common.FlowWriteConfig;
+import com.zdjizhi.tools.general.ConfigurationsUtils;
 import com.zdjizhi.tools.json.MetaUtil;
+import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT;
 import com.zdjizhi.tools.transform.TransForm;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;

+import java.util.HashMap;
+import java.util.Properties;

 /**
@@ -17,21 +23,63 @@ import org.apache.flink.util.Collector;
  */
 public class TypeMapCompleted extends ProcessFunction<String, JSONObject> {
     private static final Log logger = LogFactory.get();
+    private ConvertRecordToPERCENT convertRecordToPERCENT;
+    Properties Prop = new Properties();
+    Properties actionProp = new Properties();
+    private HashMap<String, Integer> actionMap = new HashMap<String, Integer>();

     @Override
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
+        try {
+            Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("session_record_mapping_table.properties"));
+            convertRecordToPERCENT = new ConvertRecordToPERCENT(Prop);
+            logger.info("session_record_mapping_table.properties loaded successfully");
+        } catch (Exception e) {
+            logger.error("Failed to load session_record_mapping_table.properties, cause: " + e);
+        }

+        try {
+            actionProp.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("action_definition.properties"));
+            for (String key : actionProp.stringPropertyNames()) {
+                final String action = actionProp.getProperty(key);
+                actionMap.put(key, Integer.valueOf(action));
+            }
+            logger.info(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + " log schema loaded successfully");
+        } catch (Exception e) {
+            logger.error(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + " log schema failed to load, cause: " + e);
+        }
     }

     @Override
-    public void processElement(String message, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {
+    public void processElement(String message, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) {
         try {
-            JSONObject jsonObject = JSONObject.parseObject(message);
-            jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000);
-            TransForm.transformLog(jsonObject);
-            MetaUtil.typeTransform(jsonObject);
-            out.collect(jsonObject);
+            JSONObject record = JSONObject.parseObject(message);
+            JSONObject jsonObject = convertRecordToPERCENT.convertToPERCENT(record);
+
+            if (jsonObject != null) {
+                if (record.containsKey("security_rule_list")) {
+                    jsonObject.put("common_policy_id", JSONArray.from(record.get("security_rule_list")).get(0));
+                    jsonObject.put("common_action", actionMap.get(record.get("security_action").toString().replace(" ", "")));
+                }
+                if (record.containsKey("monitor_rule_list")) {
+                    jsonObject.put("common_policy_id", JSONArray.from(record.get("monitor_rule_list")).get(0));
+                    jsonObject.put("common_action", 1);
+                }
+                if (record.containsKey("proxy_rule_list")) {
+                    jsonObject.put("common_policy_id", JSONArray.from(record.get("proxy_rule_list")).get(0));
+                    jsonObject.put("common_action", actionMap.get(record.get("proxy_action").toString().replace(" ", "")));
+                }
+                jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000);
+                TransForm.transformLog(jsonObject);
+                MetaUtil.typeTransform(jsonObject);
+                out.collect(jsonObject);
+            }
         } catch (RuntimeException e) {
             logger.error("TransForm logs failed( The field type is verified ),The exception is :{}\n The error Message is:{}", e, message);
         }
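A worked example of the new policy enrichment in TypeMapCompleted, using the codes shipped in properties/action_definition.properties (the rule id 1207 and the action string are illustrative, not taken from this diff):

// Hypothetical input: {"security_rule_list": [1207], "security_action": "No Intercept", ...}
// "No Intercept".replace(" ", "") -> "NoIntercept"; actionMap.get("NoIntercept") -> 3
// The converted record therefore gains:
//   common_policy_id = 1207   (first element of security_rule_list)
//   common_action    = 3      (NoIntercept=3)
// A record carrying only monitor_rule_list always gets common_action = 1 (Monitor=1).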
PercentProxyProcess.java — new file (40 lines)

package com.zdjizhi.operator.percent;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.tools.general.ConfigurationsUtils;
import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class PercentProxyProcess extends ProcessFunction<JSONObject, String> {
    private static final Log logger = LogFactory.get();
    Properties prop = new Properties();
    private ConvertRecordToPERCENT convertRecordToPERCENT;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        try {
            prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("percent_proxy_event.properties"));
            convertRecordToPERCENT = new ConvertRecordToPERCENT(prop);
            logger.info("percent_proxy_event.properties loaded successfully");
        } catch (Exception e) {
            logger.error("Failed to load percent_proxy_event.properties, cause: " + e);
        }
    }

    @Override
    public void processElement(JSONObject value, Context ctx, Collector<String> out) {
        try {
            out.collect(JSONObject.toJSONString(convertRecordToPERCENT.removeFields(value)));
        } catch (Exception e) {
            logger.error("Failed to trim proxy_event log fields, cause: {}, data: {}", e, value);
        }
    }
}
PercentSecurityProcess.java — new file (42 lines)

package com.zdjizhi.operator.percent;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.tools.general.ConfigurationsUtils;
import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class PercentSecurityProcess extends ProcessFunction<JSONObject, String> {
    private static final Log logger = LogFactory.get();
    Properties prop = new Properties();
    private ConvertRecordToPERCENT convertRecordToPERCENT;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        try {
            prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("percent_security_event.properties"));
            convertRecordToPERCENT = new ConvertRecordToPERCENT(prop);
            logger.info("percent_security_event.properties loaded successfully");
        } catch (Exception e) {
            logger.error("Failed to load percent_security_event.properties, cause: " + e);
        }
    }

    @Override
    public void processElement(JSONObject value, Context ctx, Collector<String> out) {
        try {
            out.collect(JSONObject.toJSONString(convertRecordToPERCENT.removeFields(value)));
        } catch (Exception e) {
            logger.error("Failed to trim security_event log fields, cause: {}, data: {}", e, value);
        }
    }
}
PercentSessionProcess.java — new file (40 lines)

package com.zdjizhi.operator.percent;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.tools.general.ConfigurationsUtils;
import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class PercentSessionProcess extends ProcessFunction<JSONObject, String> {
    private static final Log logger = LogFactory.get();
    Properties prop = new Properties();
    private ConvertRecordToPERCENT convertRecordToPERCENT;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        try {
            prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("percent_session_record.properties"));
            convertRecordToPERCENT = new ConvertRecordToPERCENT(prop);
            logger.info("percent_session_record.properties loaded successfully");
        } catch (Exception e) {
            logger.error("Failed to load percent_session_record.properties, cause: " + e);
        }
    }

    @Override
    public void processElement(JSONObject value, Context ctx, Collector<String> out) {
        try {
            out.collect(JSONObject.toJSONString(convertRecordToPERCENT.removeFields(value)));
        } catch (Exception e) {
            logger.error("Failed to trim percent_session log fields, cause: {}, data: {}", e, value);
        }
    }
}
@@ -10,12 +10,13 @@ import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.common.pojo.FileMeta;
 import com.zdjizhi.common.pojo.SourceList;
 import com.zdjizhi.tools.general.FileEdit;
-import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;

+import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;

@@ -26,10 +27,16 @@ import java.util.TimerTask;
  * @Description:
  * @date 2023/0928
  */
-public class DealFileProcessFunction extends ProcessFunction<JSONObject, String> {
+public class DealFileProcess extends ProcessFunction<JSONObject, JSONObject> {
     private static final Log logger = LogFactory.get();

-    public static final OutputTag<Tuple5<Long, Long, Long, Long, Long>> dealFileMetircTag = new OutputTag<Tuple5<Long, Long, Long, Long, Long>>("DealFileMetircTag") {
+    public static final OutputTag<Tuple7<Long, Long, Long, Long, Long, Long, Long>> dealFileMetircTag = new OutputTag<Tuple7<Long, Long, Long, Long, Long, Long, Long>>("DealFileMetircTag") {
+    };
+
+    public static final OutputTag<JSONObject> percentSecurityTag = new OutputTag<JSONObject>("percentSecurityTag") {
+    };
+    public static final OutputTag<JSONObject> percentProxyTag = new OutputTag<JSONObject>("percentProxyTag") {
     };

     private String rpUrlValue;
@@ -53,6 +60,9 @@ public class DealFileProcessFunction extends ProcessFunction<JSONObject, String>
     private long httpResponseCount = 0L;
     private long mailEmlCount = 0L;
     private boolean metricSendFlag = true;
+    private long securityCount = 0L;
+    private long proxyCount = 0L;
+    private Random random;

     // Initialize the side-output tags
@@ -62,6 +72,7 @@ public class DealFileProcessFunction extends ProcessFunction<JSONObject, String>
     @Override
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
+        random = new Random();
         Timer timer = new Timer();
         // Register the timer
         timer.schedule(new TimerTask() {
@@ -77,18 +88,20 @@ public class DealFileProcessFunction extends ProcessFunction<JSONObject, String>

     @SuppressWarnings("unchecked")
     @Override
-    public void processElement(JSONObject message, Context context, Collector<String> collector) throws Exception {
+    public void processElement(JSONObject message, Context context, Collector<JSONObject> collector) throws Exception {
         try {
             // Periodically push metrics downstream
             if (metricSendFlag) {
                 metricSendFlag = false;
                 if (recordCount > 0 || failedCount > 0 || httpRequestCount > 0 || httpResponseCount > 0 || mailEmlCount > 0) {
-                    context.output(dealFileMetircTag, Tuple5.of(recordCount, failedCount, httpRequestCount, httpResponseCount, mailEmlCount));
+                    context.output(dealFileMetircTag, Tuple7.of(recordCount, failedCount, httpRequestCount, httpResponseCount, mailEmlCount, securityCount, proxyCount));
                     recordCount = 0L;
                     failedCount = 0L;
                     httpRequestCount = 0;
                     httpResponseCount = 0;
                     mailEmlCount = 0L;
+                    securityCount = 0L;
+                    proxyCount = 0L;
                 }
             }
             recordCount++;
@@ -102,37 +115,42 @@ public class DealFileProcessFunction extends ProcessFunction<JSONObject, String>
             sPort = (int) message.get("common_client_port");
             dIp = (String) message.get("common_server_ip");
             dPort = (int) message.get("common_server_port");
-            foundTime = (long) message.get("common_recv_time");
+            foundTime = (long) message.get("common_end_timestamp_ms");
             schemaType = (String) message.get("common_schema_type");
             domain = (String) message.getOrDefault("common_server_domain", "");
             account = (String) message.getOrDefault("common_subscribe_id", "");
             FileMeta fileMeta = new FileMeta();
             JSONArray jsonarray = new JSONArray();

+            final String[] oosArr = FlowWriteConfig.OOS_SERVERS.split(",");
+            final int i = random.nextInt(oosArr.length);
+
             if (StringUtil.isNotBlank(rqUrlValue)) {
                 String fileId = FileEdit.getFileId(rqUrlValue, "_1");
-                message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId));
+                message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId, oosArr[i]));
                 SourceList request = new SourceList();
-                request.setSource_oss_path(rqUrlValue);
-                request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, fileId));
+                request.setSource_oss_path(FlowWriteConfig.HOS_URL + rqUrlValue);
+                request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId, oosArr[i]));
                 jsonarray.add(request);
                 httpRequestCount++;
             }
             if (StringUtil.isNotBlank(rpUrlValue)) {
                 String fileId = FileEdit.getFileId(rpUrlValue, "_2");
-                message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId));
+                message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId, oosArr[i]));
                 SourceList response = new SourceList();
-                response.setSource_oss_path(rpUrlValue);
-                response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, fileId));
+                response.setSource_oss_path(FlowWriteConfig.HOS_URL + rpUrlValue);
+                response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId, oosArr[i]));
                 jsonarray.add(response);
                 httpResponseCount++;
             }
             if (StringUtil.isNotBlank(emailUrlValue)) {
                 String fileId = FileEdit.getFileId(emailUrlValue, "_9");
-                message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId));
+                message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId, oosArr[i]));
                 SourceList emailFile = new SourceList();
-                emailFile.setSource_oss_path(emailUrlValue);
-                emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, fileId));
+                emailFile.setSource_oss_path(FlowWriteConfig.HOS_URL + emailUrlValue);
+                emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId, oosArr[i]));
                 jsonarray.add(emailFile);
                 mailEmlCount++;
             }
@@ -143,8 +161,15 @@ public class DealFileProcessFunction extends ProcessFunction<JSONObject, String>
                 fileMeta.setProcessing_time((int) (System.currentTimeMillis() / 1000));
                 context.output(metaToKafa, JSONObject.toJSONString(fileMeta));
             }
-            collector.collect(JSONObject.toJSONString(message));
+            if (message.containsKey("security_rule_list") || message.containsKey("monitor_rule_list")) {
+                context.output(percentSecurityTag, message);
+                securityCount++;
+            } else if (message.containsKey("proxy_rule_list")) {
+                context.output(percentProxyTag, message);
+                proxyCount++;
+            }
+            collector.collect(message);
             }
         } catch (RuntimeException e) {
             logger.error("Error while processing a log carrying unstructured fields: " + e + "\n" + message);
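The job class that wires these side outputs is not part of this compare view; below is a minimal sketch of how the two new tags could feed the Percent operators and parameterized Kafka sinks introduced elsewhere in this diff (the stream variable name is an assumption):

// Hypothetical topology wiring; dealFileStream is the output of DealFileProcess.
SingleOutputStreamOperator<JSONObject> dealFileStream = /* ... upstream ... */ null;

dealFileStream.getSideOutput(DealFileProcess.percentSecurityTag)
        .process(new PercentSecurityProcess())
        .setParallelism(FlowWriteConfig.SINK_PERCENT_SECURITY_PARALLELISM)
        .addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SECURITY));

dealFileStream.getSideOutput(DealFileProcess.percentProxyTag)
        .process(new PercentProxyProcess())
        .setParallelism(FlowWriteConfig.SINK_PERCENT_PROXY_PARALLELISM)
        .addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_PROXY));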
@@ -12,6 +12,7 @@ import org.apache.http.client.HttpRequestRetryHandler;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.*;
 import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.config.Registry;
 import org.apache.http.config.RegistryBuilder;
 import org.apache.http.conn.ConnectTimeoutException;
@@ -35,10 +36,12 @@ import java.net.*;
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
 import java.security.cert.X509Certificate;
+import java.util.Map;

 public class HttpClientService {

     private static final Log log = LogFactory.get();
+    public static final String ERROR_MESSAGE = "-1";

     /**
      * The verification method must be overridden before using SSL, to disable SSL certificate checks
@@ -254,4 +257,73 @@ public class HttpClientService {
         return result;
     }

+    /**
+     * GET request
+     *
+     * @param uri request address
+     * @return message
+     */
+    public String httpGet(URI uri, int socketTimeout, Header... headers) {
+        String msg = ERROR_MESSAGE;
+
+        // Obtain the client connection object
+        CloseableHttpClient httpClient = getHttpClient(socketTimeout);
+        CloseableHttpResponse response = null;
+
+        try {
+            log.info("http get uri {}", uri);
+            // Create the GET request object
+            HttpGet httpGet = new HttpGet(uri);
+
+            if (StringUtil.isNotEmpty(headers)) {
+                for (Header h : headers) {
+                    httpGet.addHeader(h);
+                    log.info("request header : {}", h);
+                }
+            }
+            // Execute the request
+            response = httpClient.execute(httpGet);
+            int statusCode = response.getStatusLine().getStatusCode();
+            // Get the response entity
+            HttpEntity entity = response.getEntity();
+            // Read the response body
+            msg = EntityUtils.toString(entity, "UTF-8");
+
+            if (statusCode != HttpStatus.SC_OK) {
+                log.error("Http get content is :{}", msg);
+            }
+        } catch (ClientProtocolException e) {
+            log.error("Protocol error: {}", e.getMessage());
+        } catch (ParseException e) {
+            log.error("Parse error: {}", e.getMessage());
+        } catch (IOException e) {
+            log.error("IO error: {}", e.getMessage());
+        } finally {
+            if (null != response) {
+                try {
+                    EntityUtils.consume(response.getEntity());
+                    response.close();
+                } catch (IOException e) {
+                    log.error("Error releasing the connection: {}", e.getMessage());
+                }
+            }
+        }
+
+        return msg;
+    }
+
+    public void setUrlWithParams(URIBuilder uriBuilder, String path, Map<String, Object> params) {
+        try {
+            uriBuilder.setPath(path);
+            if (params != null && !params.isEmpty()) {
+                for (Map.Entry<String, Object> kv : params.entrySet()) {
+                    uriBuilder.setParameter(kv.getKey(), kv.getValue().toString());
+                }
+            }
+        } catch (Exception e) {
+            log.error("Failed to build url, uri : {}, path : {}, params: {}", uriBuilder.toString(), path, params);
+        }
+    }
 }
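A usage sketch for the two new helpers, pointed at the knowledge-base settings added to the service properties above (the query parameter name "id" is a guess, not taken from this diff):

// Illustrative knowledge-base lookup; URIBuilder(String) may throw URISyntaxException.
String fetchKnowledge(HttpClientService client) throws Exception {
    URIBuilder builder = new URIBuilder(FlowWriteConfig.KNOWLEDGE_BASE_URL);
    Map<String, Object> params = new HashMap<>();
    params.put("id", FlowWriteConfig.ASN_BUILTIN_KD_ID); // hypothetical parameter name
    client.setUrlWithParams(builder, FlowWriteConfig.KNOWLEDGE_BASE_PATH, params);
    return client.httpGet(builder.build(), 5000); // ERROR_MESSAGE ("-1") signals failure
}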
@@ -30,9 +30,9 @@ public class KafkaProducer {
         return properties;
     }

-    public static FlinkKafkaProducer<String> getPercentKafkaProducer() {
+    public static FlinkKafkaProducer<String> getPercentKafkaProducer(String kafkaTopic) {
         FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
-                FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC,
+                kafkaTopic,
                 new SimpleStringSchema(),
                 createProducerConfig(FlowWriteConfig.PERCENT_SINK_KAFKA_SERVERS), Optional.empty());
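With the topic now a parameter, one sink per Percent stream can come from the same helper, fed by the three topic constants added to FlowWriteConfig:

FlinkKafkaProducer<String> sessionSink  = KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SESSION);
FlinkKafkaProducer<String> securitySink = KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SECURITY);
FlinkKafkaProducer<String> proxySink    = KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_PROXY);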
@@ -1,49 +1,36 @@
 package com.zdjizhi.tools.general;

-import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.tools.ordinary.MD5Utils;

-import static com.zdjizhi.common.FlowWriteConfig.judgeFileType;
-
 /**
  * File field manipulation utilities
  */
 public class FileEdit {

-    public static String getFileUploadUrl(long cfgId,String sIp,int sPort,String dIp,int dPort,long foundTime,String account,String domain, String urlValue,String schemaType,String fileId){
+    public static String getFileUploadUrl(long cfgId, String sIp, int sPort, String dIp, int dPort, long foundTime, String account, String domain, String schemaType, String fileId, String oosServer) {
         String fileType = null;
-        if (judgeFileType(getFileType(urlValue))){
-            fileType = getFileType(urlValue);
-        }else {
-            if (schemaType.equals("HTTP")){
-                fileType = "html";
-            }
-            if (schemaType.equals("MAIL")){
-                fileType = "eml";
-            }
+        if (schemaType.equals("HTTP")) {
+            fileType = "html";
+        }
+        if (schemaType.equals("MAIL")) {
+            fileType = "eml";
         }
-        return "http://" + FlowWriteConfig.OOS_SERVERS + "/v3/upload?cfg_id=" + cfgId + "&file_id=" + fileId + "&file_type=" + fileType + "&found_time=" + foundTime + "&s_ip=" + sIp + "&s_port=" + sPort + "&d_ip=" + dIp + "&d_port=" + dPort + "&domain=" + domain + "&account=" + account;
+        return "http://" + oosServer + "/v3/upload?cfg_id=" + cfgId + "&file_id=" + fileId + "&file_type=" + fileType + "&found_time=" + foundTime + "&s_ip=" + sIp + "&s_port=" + sPort + "&d_ip=" + dIp + "&d_port=" + dPort + "&domain=" + domain + "&account=" + account;
     }

-    public static String getFileDownloadUrl(String fileId){
-        return "http://" + FlowWriteConfig.OOS_SERVERS + "/v3/download?file_id=" + fileId;
+    public static String getFileDownloadUrl(String fileId, String oosServer) {
+        return "http://" + oosServer + "/v3/download?file_id=" + fileId;
     }

-    public static String getFileType(String url){
+    public static String getFileType(String url) {
         String[] split = url.split("\\.");
-        return split[split.length-1];
+        return split[split.length - 1];
     }

-    public static String getFileId(String url,String fileSuffix) throws Exception {
-        String[] arr = url.split("/");
-        String filename = arr[arr.length-1].substring(0,arr[arr.length-1].lastIndexOf("_"));
+    public static String getFileId(String filename, String fileSuffix) throws Exception {
         String prefix = MD5Utils.md5Encode(filename);
-//        String suffix = arr[arr.length-1].substring(arr[arr.length-1].lastIndexOf("_"),arr[arr.length-1].lastIndexOf("."));
-        return prefix+fileSuffix;
+        return prefix + fileSuffix;
     }
 }
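A quick composition of the reworked helpers. The OOS endpoint and the connection five-tuple below are hypothetical; the `getFileId` arguments mirror the test class added later in this changeset:

// Sketch only: every literal here is an invented example value.
public class FileEditExample {
    public static void main(String[] args) throws Exception {
        String oosServer = "192.0.2.20:8080"; // hypothetical OOS endpoint, now supplied by the caller
        String fileId = FileEdit.getFileId("4856d031-521e-4426-a03a-42ca4bb96028", "_9");
        String uploadUrl = FileEdit.getFileUploadUrl(1L, "192.0.2.1", 50321, "192.0.2.2", 80,
                1700000000L, "user@example.com", "example.com", "HTTP", fileId, oosServer);
        System.out.println(uploadUrl); // .../v3/upload?...&file_type=html&... for HTTP records
        System.out.println(FileEdit.getFileDownloadUrl(fileId, oosServer));
    }
}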
@@ -4,34 +4,32 @@ import cn.hutool.crypto.digest.DigestUtil;
 import cn.hutool.log.Log;
 import cn.hutool.log.LogFactory;
 import com.alibaba.fastjson2.*;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.exception.NacosException;
 import com.geedgenetworks.utils.IpLookupV2;
 import com.geedgenetworks.utils.StringUtil;
 import com.google.common.base.Joiner;
-import com.zdjizhi.common.CommonConfig;
 import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.common.pojo.KnowlegeBaseMeta;
 import com.zdjizhi.tools.connections.http.HttpClientService;
-import com.zdjizhi.tools.connections.nacos.NacosConnection;
+import org.apache.http.client.utils.URIBuilder;

 import java.io.ByteArrayInputStream;
+import java.net.URISyntaxException;
 import java.util.HashMap;
-import java.util.concurrent.Executor;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;

 /**
- * @author qidaijie
- * @version 2022/11/16 15:23
+ * @author wangchengcheng
+ * @version 2023/11/10 15:23
  */
 public class IpLookupUtils {
     private static final Log logger = LogFactory.get();
-    private static final String ipv4BuiltInName = "ip_v4_built_in.mmdb";
-    private static final String ipv6BuiltInName = "ip_v6_built_in.mmdb";
-    private static final String ipv4UserDefinedName = "ip_v4_user_defined.mmdb";
-    private static final String ipv6UserDefinedName = "ip_v6_user_defined.mmdb";
-    private static final String asnV4Name = "asn_v4.mmdb";
-    private static final String asnV6Name = "asn_v6.mmdb";
+    private static final String ipBuiltInName = "ip_builtin.mmdb";
+    private static final String ipUserDefinedName = "ip_user_defined.mmdb";
+    private static final String asnName = "asn_builtin.mmdb";

     /**
      * IP geolocation database
@@ -58,142 +56,151 @@ public class IpLookupUtils {
      */
     private static final HashMap<String, KnowlegeBaseMeta> knowledgeMetaCache = new HashMap<>(16);

+    private static String currentSha256IpUserDefined = "";
+    private static String currentSha256IpBuiltin = "";
+    private static String currentSha256AsnBuiltin = "";
+
     static {
-        JSONPath jsonPath = JSONPath.of(getFilterParameter());
         httpClientService = new HttpClientService();
-        NacosConnection nacosConnection = new NacosConnection();
-        ConfigService schemaService = nacosConnection.getPublicService();
         try {
-            String configInfo = schemaService.getConfigAndSignListener(FlowWriteConfig.NACOS_KNOWLEDGEBASE_DATA_ID, FlowWriteConfig.NACOS_PUBLIC_GROUP, FlowWriteConfig.NACOS_CONNECTION_TIMEOUT, new Listener() {
-                @Override
-                public Executor getExecutor() {
-                    return null;
-                }
-
-                @Override
-                public void receiveConfigInfo(String configInfo) {
-                    if (StringUtil.isNotBlank(configInfo)) {
-                        updateIpLookup(jsonPath, configInfo);
-                    }
-                }
-            });
-
-            if (StringUtil.isNotBlank(configInfo)) {
-                updateIpLookup(jsonPath, configInfo);
-            }
-        } catch (NacosException e) {
-            logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
+            stuffKnowledgeMetaCache();
+            Timer timer = new Timer();
+            timer.schedule(new TimerTask() {
+                @Override
+                public void run() {
+                    stuffKnowledgeMetaCache();
+                }
+            }, 0, FlowWriteConfig.KNOWLEDGE_EXECUTION_MINUTES * 1000 * 60);
+        } catch (Exception e) {
+            logger.error("knowledge base load failed, cause: " + e);
         }
     }

-    private static void updateIpLookup(JSONPath jsonPath, String configInfo) {
-        String extract = jsonPath.extract(JSONReader.of(configInfo)).toString();
-        if (StringUtil.isNotBlank(extract)) {
-            JSONArray jsonArray = JSON.parseArray(extract);
-            if (jsonArray.size() > 0) {
-                for (int i = 0; i < jsonArray.size(); i++) {
-                    KnowlegeBaseMeta knowlegeBaseMeta = JSONObject.parseObject(jsonArray.getString(i), KnowlegeBaseMeta.class);
-                    String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(knowlegeBaseMeta.getName(), knowlegeBaseMeta.getFormat());
-                    knowledgeMetaCache.put(fileName, knowlegeBaseMeta);
-                }
-                reloadIpLookup();
-            }
-        }
-    }
+    private static void stuffKnowledgeMetaCache() {
+        final KnowlegeBaseMeta ipBuiltinknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.IP_BUILTIN_KD_ID);
+        if (!currentSha256IpBuiltin.equals(ipBuiltinknowlegeBaseMeta.getSha256())) {
+            String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipBuiltinknowlegeBaseMeta.getName(), ipBuiltinknowlegeBaseMeta.getFormat());
+            knowledgeMetaCache.put(fileName, ipBuiltinknowlegeBaseMeta);
+        }
+        final KnowlegeBaseMeta ipUserDefinedknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.IP_USER_DEFINED_KD_ID);
+        if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256())) {
+            String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipUserDefinedknowlegeBaseMeta.getName(), ipUserDefinedknowlegeBaseMeta.getFormat());
+            knowledgeMetaCache.put(fileName, ipUserDefinedknowlegeBaseMeta);
+        }
+
+        final KnowlegeBaseMeta asnBuiltinknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.ASN_BUILTIN_KD_ID);
+        if (!currentSha256AsnBuiltin.equals(asnBuiltinknowlegeBaseMeta.getSha256())) {
+            String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(asnBuiltinknowlegeBaseMeta.getName(), asnBuiltinknowlegeBaseMeta.getFormat());
+            knowledgeMetaCache.put(fileName, asnBuiltinknowlegeBaseMeta);
+        }
+
+        if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256()) || !currentSha256IpBuiltin.equals(ipBuiltinknowlegeBaseMeta.getSha256()) || !currentSha256AsnBuiltin.equals(asnBuiltinknowlegeBaseMeta.getSha256())) {
+            currentSha256IpBuiltin = ipBuiltinknowlegeBaseMeta.getSha256();
+            currentSha256IpUserDefined = ipUserDefinedknowlegeBaseMeta.getSha256();
+            currentSha256AsnBuiltin = asnBuiltinknowlegeBaseMeta.getSha256();
+            reloadIpLookup();
+            logger.info("knowledge base loaded successfully.");
+        }
+    }

     /**
      * Update the IpLookup with downloaded knowledge-base files
-     *
-     * @return the refreshed IpLookup
      */
-    public static void reloadIpLookup() {
-        int retryNum = 0;
+    private static void reloadIpLookup() {
         IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
         for (String fileName : knowledgeMetaCache.keySet()) {
+            int retryNum = 0;
             KnowlegeBaseMeta knowlegeBaseMeta = knowledgeMetaCache.get(fileName);
             String metaSha256 = knowlegeBaseMeta.getSha256();
-            do {
+            while (retryNum < TRY_TIMES) {
+                System.out.println("download file :" + fileName + ",HOS path :" + knowlegeBaseMeta.getPath());
+                Long startTime = System.currentTimeMillis();
                 byte[] httpGetByte = httpClientService.httpGetByte(knowlegeBaseMeta.getPath(), FlowWriteConfig.HTTP_SOCKET_TIMEOUT);
-                if (httpGetByte.length > 0) {
+                if (httpGetByte != null && httpGetByte.length > 0) {
                     String downloadFileSha256 = DigestUtil.sha256Hex(httpGetByte);
                     if (metaSha256.equals(downloadFileSha256)) {
                         ByteArrayInputStream inputStream = new ByteArrayInputStream(httpGetByte);
                         switch (fileName) {
-                            case ipv4BuiltInName:
-                                builder.loadDataFileV4(inputStream);
-                                break;
-                            case ipv6BuiltInName:
-                                builder.loadDataFileV6(inputStream);
-                                break;
-                            case ipv4UserDefinedName:
-                                builder.loadDataFilePrivateV4(inputStream);
-                                break;
-                            case ipv6UserDefinedName:
-                                builder.loadDataFilePrivateV6(inputStream);
-                                break;
-                            case asnV4Name:
-                                builder.loadAsnDataFileV4(inputStream);
-                                break;
-                            case asnV6Name:
-                                builder.loadAsnDataFileV6(inputStream);
+                            case ipBuiltInName:
+                                builder.loadDataFile(inputStream);
+                                break;
+                            case ipUserDefinedName:
+                                builder.loadDataFilePrivate(inputStream);
+                                break;
+                            case asnName:
+                                builder.loadAsnDataFile(inputStream);
                                 break;
                             default:
                         }
+                        System.out.println("update " + fileName + " finished, speed :" + (System.currentTimeMillis() - startTime) + "ms");
                         retryNum = TRY_TIMES;
                     } else {
-                        logger.error("sha256 of {} downloaded via HOS is {}, Nacos records {}; mismatch, starting retry #{}", fileName, downloadFileSha256, metaSha256, retryNum);
+                        logger.error("sha256 of {} downloaded via HOS is {}, gateway records {}; mismatch, starting retry #{}", fileName, downloadFileSha256, metaSha256, retryNum);
                         retryNum++;
                     }
                 } else {
                     logger.error("stream of {} downloaded via HOS is empty, starting retry #{}", fileName, retryNum);
                     retryNum++;
                 }
-            } while (retryNum < TRY_TIMES);
+            }
         }
         ipLookup = builder.build();
     }

+    public static IpLookupV2 getIpLookup() {
+        return ipLookup;
+    }
+
     /**
      * Build the knowledge-base metadata filter parameter from the configured combination
      *
      * @return the filter parameter
      */
     private static String getFilterParameter() {
-        String[] typeList = CommonConfig.KNOWLEDGEBASE_TYPE_LIST.split(",");
-        String[] nameList = CommonConfig.KNOWLEDGEBASE_NAME_LIST.split(",");
-        String expr = "[?(@.version=='latest')]";
-
-        if (typeList.length > 1) {
-            StringBuilder typeBuilder = new StringBuilder();
-            typeBuilder.append("[?(@.type in (");
-            for (int i = 0; i < typeList.length; i++) {
-                if (i == typeList.length - 1) {
-                    typeBuilder.append("'").append(typeList[i]).append("'))]");
-                } else {
-                    typeBuilder.append("'").append(typeList[i]).append("',");
-                }
-            }
-            expr = expr + typeBuilder;
-        }
-
-        if (nameList.length > 1) {
-            StringBuilder nameBuilder = new StringBuilder();
-            nameBuilder.append("[?(@.name in (");
-            for (int i = 0; i < nameList.length; i++) {
-                if (i == nameList.length - 1) {
-                    nameBuilder.append("'").append(nameList[i]).append("'))]");
-                } else {
-                    nameBuilder.append("'").append(nameList[i]).append("',");
-                }
-            }
-            expr = expr + nameBuilder;
-        }
-
+        String expr = "[?(@.version=='latest')][?(@.name in ('ip_builtin','ip_user_defined','asn_builtin'))]";
         return expr;
     }

-    public static IpLookupV2 getIpLookup() {
-        return ipLookup;
+    public static String getCountryLookup(String ip) {
+        return ipLookup.countryLookup(ip);
+    }
+
+    private static KnowlegeBaseMeta getKnowlegeBaseMeta(String kd_id) {
+        KnowlegeBaseMeta knowlegeBaseMeta = null;
+        String knowledgeInfo = null;
+        try {
+            URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.KNOWLEDGE_BASE_URL);
+            HashMap<String, Object> parms = new HashMap<>();
+            parms.put("kb_id", kd_id);
+            httpClientService.setUrlWithParams(uriBuilder, FlowWriteConfig.KNOWLEDGE_BASE_PATH, parms);
+            knowledgeInfo = httpClientService.httpGet(uriBuilder.build(), FlowWriteConfig.HTTP_SOCKET_TIMEOUT);
+            if (knowledgeInfo.contains("200")) {
+                final Map<String, Object> jsonObject = JSONObject.parseObject(knowledgeInfo, Map.class);
+                JSONPath jsonPath = JSONPath.of(getFilterParameter());
+                String extract = jsonPath.extract(JSONReader.of(jsonObject.get("data").toString())).toString();
+                if (StringUtil.isNotBlank(extract)) {
+                    JSONArray jsonArray = JSON.parseArray(extract);
+                    if (jsonArray.size() > 0) {
+                        for (int i = 0; i < jsonArray.size(); i++) {
+                            knowlegeBaseMeta = JSONObject.parseObject(jsonArray.getString(i), KnowlegeBaseMeta.class);
+                        }
+                    }
+                }
+            } else {
+                logger.error("failed to fetch knowledge_base, response: " + knowledgeInfo);
+            }
+        } catch (URISyntaxException e) {
+            logger.error("failed to build URI", e);
+        } catch (Exception e) {
+            logger.error("failed to fetch knowledge_base", e);
+        }
+        return knowlegeBaseMeta;
     }
 }
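The hard-coded filter keeps only the latest ip_builtin / ip_user_defined / asn_builtin entries. A sketch of what the chained JSONPath filters extract from a hypothetical metadata payload (fastjson2 calls match the ones used above):

import com.alibaba.fastjson2.JSONPath;
import com.alibaba.fastjson2.JSONReader;

public class FilterSketch {
    public static void main(String[] args) {
        // Hypothetical knowledge-base metadata, shaped like the "data" field parsed above.
        String data = "[{\"name\":\"ip_builtin\",\"version\":\"latest\",\"format\":\"mmdb\"},"
                + "{\"name\":\"ip_builtin\",\"version\":\"v1\",\"format\":\"mmdb\"},"
                + "{\"name\":\"geo_city\",\"version\":\"latest\",\"format\":\"mmdb\"}]";
        Object hits = JSONPath.of("[?(@.version=='latest')][?(@.name in ('ip_builtin','ip_user_defined','asn_builtin'))]")
                .extract(JSONReader.of(data));
        System.out.println(hits); // only the latest ip_builtin entry survives both filters
    }
}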
@@ -143,15 +143,17 @@ public class MetaUtil {
         for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
             String key = entry.getKey();
             Object value = entry.getValue();
-            if (schemaFieldsTypeMap.containsKey(key)) {
-                try {
-                    Class<?> schemaFieldClass = schemaFieldsTypeMap.get(key);
-                    if (schemaFieldClass != value.getClass()) {
-                        String simpleName = schemaFieldClass.getSimpleName();
-                        DataTypeCheck.typeConverter(jsonObject, key, value, simpleName);
-                    }
-                } catch (RuntimeException e) {
-                    logger.error("The {} field type conversion is abnormal! message is:", key, e);
+            if (value != null) {
+                if (schemaFieldsTypeMap.containsKey(key)) {
+                    try {
+                        Class<?> schemaFieldClass = schemaFieldsTypeMap.get(key);
+                        if (schemaFieldClass != value.getClass()) {
+                            String simpleName = schemaFieldClass.getSimpleName();
+                            DataTypeCheck.typeConverter(jsonObject, key, value, simpleName);
+                        }
+                    } catch (RuntimeException e) {
+                        logger.error("The {} field type conversion is abnormal! message is:", key, e);
+                    }
                 }
             }
         }
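The practical effect of the new outer guard: a null value used to reach `value.getClass()`, throw a NullPointerException, and be logged as a conversion error; now it is skipped before any class comparison. A tiny illustration with hypothetical field names:

import com.alibaba.fastjson2.JSONObject;

public class NullGuardSketch {
    public static void main(String[] args) {
        JSONObject record = new JSONObject();
        record.put("maybe_missing", null); // now skipped outright by the value != null guard
        record.put("present", "123");      // non-null values still reach DataTypeCheck.typeConverter
    }
}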
@@ -0,0 +1,72 @@
+package com.zdjizhi.tools.logtransformation;
+
+import com.alibaba.fastjson2.JSONObject;
+
+import java.util.*;
+
+public class ConvertRecordToPERCENT {
+    private Properties prop;
+    private HashMap<String, String> recordSchema;
+
+    public ConvertRecordToPERCENT(Properties prop) {
+        this.prop = prop;
+        final HashMap<String, String> schemaMap = new HashMap<String, String>();
+        for (String key : prop.stringPropertyNames()) {
+            final String schema = prop.getProperty(key);
+            schemaMap.put(key, schema);
+        }
+        this.recordSchema = schemaMap;
+    }
+
+    public JSONObject convertToPERCENT(JSONObject record) {
+        final JSONObject percent = new JSONObject();
+        for (Map.Entry<String, Object> entry : record.entrySet()) {
+            if (recordSchema.containsKey(entry.getKey())) {
+                percent.put(recordSchema.get(entry.getKey()), entry.getValue());
+            }
+        }
+
+        // fill common_start_time / common_end_time
+        percent.put("common_start_time", (long) record.get("start_timestamp_ms") / 1000);
+        percent.put("common_end_time", (long) record.get("end_timestamp_ms") / 1000);
+
+        // fill common_sessions
+        percent.put("common_sessions", 1);
+
+        // fill common_internal_ip / common_external_ip / common_direction / common_stream_dir
+        if (record.containsKey("flags")) {
+            final int flags = (int) record.get("flags");
+            if (flags > 0) {
+                if ((8L & flags) == 8L && (16L & flags) != 16L) {
+                    percent.put("common_internal_ip", record.get("common_client_ip"));
+                    percent.put("common_external_ip", record.get("common_server_ip"));
+                    percent.put("common_direction", 69);
+                } else if ((8L & flags) != 8L && (16L & flags) == 16L) {
+                    percent.put("common_internal_ip", record.get("common_server_ip"));
+                    percent.put("common_external_ip", record.get("common_client_ip"));
+                    percent.put("common_direction", 73);
+                }
+
+                if ((8192L & flags) == 8192L && (16384L & flags) == 16384L) {
+                    percent.put("common_stream_dir", 3);
+                } else if ((8192L & flags) == 8192L) {
+                    percent.put("common_stream_dir", 1);
+                } else if ((16384L & flags) == 16384L) {
+                    percent.put("common_stream_dir", 2);
+                }
+            }
+        }
+        return percent;
+    }
+
+    public JSONObject removeFields(JSONObject record) {
+        for (Map.Entry<String, String> entry : recordSchema.entrySet()) {
+            if (record.containsKey(entry.getValue())) {
+                record.remove(entry.getValue());
+            }
+        }
+        return record;
+    }
+}
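For reference, the flag bits `convertToPERCENT` inspects, with descriptive constant names of my own; the numeric values come from the code above:

// Bit semantics assumed from the conversion logic; the names are illustrative only.
final class PercentFlagBits {
    static final long CLIENT_INTERNAL = 8L;     // set alone -> common_direction 69, client is the internal side
    static final long SERVER_INTERNAL = 16L;    // set alone -> common_direction 73, server is the internal side
    static final long C2S_STREAM      = 8192L;  // set -> common_stream_dir 1
    static final long S2C_STREAM      = 16384L; // set -> common_stream_dir 2 (both bits set -> 3)
}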
@@ -7,15 +7,16 @@ import com.alibaba.fastjson2.JSONObject;
 import com.zdjizhi.common.FlowWriteConfig;
 import com.zdjizhi.operator.count.SendCountProcess;
 import com.zdjizhi.operator.map.MapCompleted;
+import com.zdjizhi.operator.percent.PercentProxyProcess;
+import com.zdjizhi.operator.percent.PercentSecurityProcess;
 import com.zdjizhi.operator.map.TypeMapCompleted;
-import com.zdjizhi.operator.process.DealFileProcessFunction;
+import com.zdjizhi.operator.percent.PercentSessionProcess;
+import com.zdjizhi.operator.process.DealFileProcess;
 import com.zdjizhi.tools.connections.kafka.KafkaConsumer;
 import com.zdjizhi.tools.connections.kafka.KafkaProducer;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


 public class LogFlowWriteTopology {
     private static final Log logger = LogFactory.get();
@@ -37,12 +38,21 @@ public class LogFlowWriteTopology {
         }
         // process records that carry unstructured file fields
-        SingleOutputStreamOperator<String> dealFileProcessFunction = completedStream.process(new DealFileProcessFunction()).name("DealFileProcessFunction").uid("DealFile-ProcessFunction").setParallelism(FlowWriteConfig.DEAL_FILE_PARALLELISM);
+        SingleOutputStreamOperator<JSONObject> dealFileProcessFunction = completedStream.process(new DealFileProcess()).name("DealFileProcess").uid("DealFile-Process").setParallelism(FlowWriteConfig.DEAL_FILE_PARALLELISM);
         // send the enriched records to Percent's Kafka
-        dealFileProcessFunction.addSink(KafkaProducer.getPercentKafkaProducer()).name("ToPercentKafka").uid("To-Percent-Kafka").setParallelism(FlowWriteConfig.SINK_PERCENT_PARALLELISM);
+        final SingleOutputStreamOperator<String> percentSecurityProcess = dealFileProcessFunction.getSideOutput(DealFileProcess.percentSecurityTag).process(new PercentSecurityProcess()).name("PercentSecurityProcess").uid("Percent-Security-Process").setParallelism(FlowWriteConfig.SINK_PERCENT_SECURITY_PARALLELISM);
+
+        final SingleOutputStreamOperator<String> percentProxyProcess = dealFileProcessFunction.getSideOutput(DealFileProcess.percentProxyTag).process(new PercentProxyProcess()).name("PercentProxyProcess").uid("Percent-Proxy-Process").setParallelism(FlowWriteConfig.SINK_PERCENT_PROXY_PARALLELISM);
+
+        final SingleOutputStreamOperator<String> percentSessionProcess = dealFileProcessFunction.process(new PercentSessionProcess()).name("PercentSessionProcess").uid("Percent-Session-Process").setParallelism(FlowWriteConfig.SINK_PERCENT_SESSION_PARALLELISM);
+
+        percentSecurityProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SECURITY)).name("sendSecurityEvent").uid("send-Security-Event").setParallelism(FlowWriteConfig.SINK_PERCENT_PROXY_PARALLELISM);
+        percentProxyProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_PROXY)).name("sendProxyEvent").uid("send-Proxy-Event").setParallelism(FlowWriteConfig.SINK_PERCENT_SECURITY_PARALLELISM);
+        percentSessionProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SESSION)).name("sendSessionRECORD").uid("send-Session-RECORD").setParallelism(FlowWriteConfig.SINK_PERCENT_SESSION_PARALLELISM);
+
         // send file metadata to TRAFFIC-FILE-METADATA
-        dealFileProcessFunction.getSideOutput(DealFileProcessFunction.metaToKafa).addSink(KafkaProducer.getTrafficFileMetaKafkaProducer()).name("toTrafficFileMeta").uid("to-Traffic-FileMeta").setParallelism(FlowWriteConfig.SINK_FILE_DATA_PARALLELISM);
+        dealFileProcessFunction.getSideOutput(DealFileProcess.metaToKafa).addSink(KafkaProducer.getTrafficFileMetaKafkaProducer()).name("toTrafficFileMeta").uid("to-Traffic-FileMeta").setParallelism(FlowWriteConfig.SINK_FILE_DATA_PARALLELISM);
-        dealFileProcessFunction.getSideOutput(DealFileProcessFunction.dealFileMetircTag).process(new SendCountProcess()).name("SendCountProcess").uid("Send-Count-Process").setParallelism(1);
+        dealFileProcessFunction.getSideOutput(DealFileProcess.dealFileMetircTag).process(new SendCountProcess()).name("SendCountProcess").uid("Send-Count-Process").setParallelism(1);
         try {
             environment.execute(args[0]);
         } catch (Exception e) {
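The wiring above assumes `DealFileProcess` publishes `OutputTag`s alongside its main output. A minimal sketch of that Flink side-output pattern; the routing field `log_type` and the predicate logic are invented purely for illustration:

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Illustrative shape only; the real DealFileProcess also handles file fields and metrics.
public class DealFileProcessSketch extends ProcessFunction<JSONObject, JSONObject> {
    public static final OutputTag<JSONObject> percentSecurityTag =
            new OutputTag<JSONObject>("percent-security") {};
    public static final OutputTag<JSONObject> percentProxyTag =
            new OutputTag<JSONObject>("percent-proxy") {};

    @Override
    public void processElement(JSONObject record, Context ctx, Collector<JSONObject> out) {
        if ("security".equals(record.getString("log_type"))) {     // hypothetical routing field
            ctx.output(percentSecurityTag, record);                // feeds PercentSecurityProcess
        } else if ("proxy".equals(record.getString("log_type"))) { // hypothetical routing field
            ctx.output(percentProxyTag, record);                   // feeds PercentProxyProcess
        }
        out.collect(record);                                       // main stream feeds PercentSessionProcess
    }
}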
@@ -1,75 +0,0 @@
-package com.zdjizhi.hdfs;
-
-import cn.hutool.log.Log;
-import cn.hutool.log.LogFactory;
-import com.zdjizhi.common.FlowWriteConfig;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.IOUtils;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-/**
- * @author qidaijie
- * @Package com.zdjizhi.tools.connections.hadoop
- * @Description:
- * @date 2022/11/217:57
- */
-public class FileUtilsTest {
-    private static final Log logger = LogFactory.get();
-
-    private static FileSystem fileSystem;
-
-    static {
-        Configuration configuration = new Configuration();
-        try {
-            configuration.set("fs.defaultFS", "hdfs://ns1");
-            configuration.set("hadoop.proxyuser.root.hosts", "*");
-            configuration.set("hadoop.proxyuser.root.groups", "*");
-            configuration.set("ha.zookeeper.quorum", "192.168.44.83:2181,192.168.44.84:2181,192.168.44.85:2181");
-            configuration.set("dfs.nameservices", "ns1");
-            configuration.set("dfs.ha.namenodes.ns1", "nn1,nn2");
-            configuration.set("dfs.namenode.rpc-address.ns1.nn1", "192.168.44.85:9000");
-            configuration.set("dfs.namenode.rpc-address.ns1.nn2", "192.168.44.86:9000");
-            configuration.set("dfs.client.failover.proxy.provider.ns1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
-            // create the FileSystem used to connect to HDFS
-            fileSystem = FileSystem.get(configuration);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Test
-    public void mkdir() throws Exception {
-        fileSystem.mkdirs(new Path("/knowledgebase/test"));
-    }
-
-    @Test
-    public void create() throws Exception {
-        FSDataOutputStream outputStream = fileSystem.create(new Path("/knowledgebase/test/test.txt"));
-        outputStream.write("Hello World".getBytes());
-        outputStream.flush();
-        outputStream.close();
-    }
-
-    @Test
-    public void cat() throws Exception {
-        FSDataInputStream inputStream = fileSystem.open(new Path("/knowledgebase/test/test.txt"));
-        IOUtils.copyBytes(inputStream, System.out, 1024);
-        inputStream.close();
-    }
-
-    @Test
-    public void rename() throws Exception {
-        fileSystem.rename(new Path("/knowledgebase/test/test.txt"), new Path("/knowledgebase/test/test1.txt"));
-    }
-
-    @Test
-    public void delete() throws Exception {
-        fileSystem.delete(new Path("/knowledgebase/test"), true); // recursive delete
-    }
-}
22
src/test/java/com/zdjizhi/schema/SecurityEventSchema.java
Normal file
@@ -0,0 +1,22 @@
+package com.zdjizhi.schema;
+
+import com.zdjizhi.tools.general.ConfigurationsUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Properties;
+
+public class SecurityEventSchema {
+
+    public static void main(String[] args) throws IOException {
+        Properties prop = new Properties();
+        prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("security_event_mapping_table.properties"));
+        final HashMap<String, String> securityEventSchema = new HashMap<>();
+
+        for (String key : prop.stringPropertyNames()) {
+            final String schema = prop.getProperty(key);
+            securityEventSchema.put(key, schema);
+        }
+        System.out.println(securityEventSchema);
+    }
+}
10
src/test/java/com/zdjizhi/schema/Test.java
Normal file
@@ -0,0 +1,10 @@
+package com.zdjizhi.schema;
+
+import com.zdjizhi.tools.general.FileEdit;
+import com.zdjizhi.tools.ordinary.MD5Utils;
+
+public class Test {
+    public static void main(String[] args) throws Exception {
+        System.out.println(FileEdit.getFileId("4856d031-521e-4426-a03a-42ca4bb96028", "_9"));
+    }
+}