diff --git a/properties/action_definition.properties b/properties/action_definition.properties new file mode 100644 index 0000000..2a3f59e --- /dev/null +++ b/properties/action_definition.properties @@ -0,0 +1,35 @@ +none=0 +Monitor=1 +monitor=1 +Intercept=2 +intercept=2 +NoIntercept=3 +nointercept=3 +ActiveDefence=4 +activedefence=4 +WANNAT=8 +wannat=8 +Reject=16 +reject=16 +Deny=16 +deny=16 +Shaping=32 +shaping=32 +Manipulate=48 +manipulate=48 +ServiceChaining=64 +servicechaining=64 +Allow=96 +allow=96 +Bypass=96 +bypass=96 +Shunt=128 +shunt=128 +Statistics=129 +statistics=129 +redirect=48 +replace=48 +hijack=48 +insert=48 +edit_element=48 +run_script=48 \ No newline at end of file diff --git a/properties/percent_proxy_event.properties b/properties/percent_proxy_event.properties new file mode 100644 index 0000000..6347654 --- /dev/null +++ b/properties/percent_proxy_event.properties @@ -0,0 +1,68 @@ +#session-record +tcp_c2s_ip_fragments=common_c2s_ipfrag_num +tcp_s2c_ip_fragments=common_s2c_ipfrag_num +tcp_c2s_o3_pkts=common_c2s_tcp_unorder_num +tcp_s2c_o3_pkts=common_s2c_tcp_unorder_num +http_response_latency_ms=http_response_latency_ms +http_session_duration_ms=http_session_duration_ms +security_rule_list=security_rule_list +monitor_rule_list=monitor_rule_list +tcp_handshake_latency_ms=common_establish_latency_ms +mail_protocol_type=mail_protocol_type +mail_account=mail_account +mail_password=mail_passwd +mail_from_cmd=mail_from_cmd +mail_to_cmd=mail_to_cmd +mail_from=mail_from +mail_to=mail_to +mail_cc=mail_cc +mail_bcc=mail_bcc +mail_subject=mail_subject +mail_subject_charset=mail_subject_charset +mail_attachment_name=mail_attachment_name +mail_attachment_name_charset=mail_attachment_name_charset +mail_eml_file=mail_eml_file +dns_message_id=dns_message_id +dns_qr=dns_qr +dns_opcode=dns_opcode +dns_aa=dns_aa +dns_tc=dns_tc +dns_rd=dns_rd +dns_ra=dns_ra +dns_rcode=dns_rcode +dns_qdcount=dns_qdcount +dns_ancount=dns_ancount +dns_nscount=dns_nscount +dns_arcount=dns_arcount +dns_qname=dns_qname +dns_qtype=dns_qtype +dns_qclass=dns_qclass +dns_cname=dns_cname +dns_sub=dns_sub +dns_rr=dns_rr +ssl_version=ssl_version +ssl_sni=ssl_sni +ssl_san=ssl_san +ssl_cn=ssl_cn +ssl_handshake_latency_ms=ssl_con_latency_ms +ssl_ja3_hash=ssl_ja3_hash +ssl_cert_issuer=ssl_cert_issuer +ssl_cert_subject=ssl_cert_subject +quic_version=quic_version +quic_sni=quic_sni +quic_user_agent=quic_user_agent +ftp_account=ftp_account +ftp_url=ftp_url +ftp_link_type=ftp_link_type +http_proxy_flag=http_proxy_flag +http_sequence=http_sequence +tcp_client_isn=common_tcp_client_isn +tcp_server_isn=common_tcp_server_isn +sent_pkts=common_c2s_pkt_num +received_pkts=common_s2c_pkt_num +app=common_app_label +out_link_id=common_egress_link_id +in_link_id=common_ingress_link_id +duration_ms=common_con_duration_ms +http_request_line=http_request_line +http_response_line=http_response_line \ No newline at end of file diff --git a/properties/percent_security_event.properties b/properties/percent_security_event.properties new file mode 100644 index 0000000..8d67770 --- /dev/null +++ b/properties/percent_security_event.properties @@ -0,0 +1,4 @@ +tcp_c2s_ip_fragments=common_c2s_ipfrag_num +tcp_s2c_ip_fragments=common_s2c_ipfrag_num +tcp_c2s_o3_pkts=common_c2s_tcp_unorder_num +tcp_s2c_o3_pkts=common_s2c_tcp_unorder_num \ No newline at end of file diff --git a/properties/percent_session_record.properties b/properties/percent_session_record.properties new file mode 100644 index 0000000..1c84db1 --- /dev/null +++ b/properties/percent_session_record.properties @@ -0,0 +1,42 @@ +#security-event +http_request_body=http_request_body +http_response_body=http_response_body +http_response_latency_ms=http_response_latency_ms +http_session_duration_ms=http_session_duration_ms +security_rule_list=security_rule_list +monitor_rule_list=monitor_rule_list +tcp_handshake_latency_ms=common_establish_latency_ms +#proxy-event +http_action_file_size=http_action_file_size +doh_url=doh_url +doh_host=doh_host +doh_cookie=doh_cookie +doh_referer=doh_referer +doh_user_agent=doh_user_agent +doh_version=doh_version +doh_message_id=doh_message_id +doh_qr=doh_qr +doh_opcode=doh_opcode +doh_aa=doh_aa +doh_tc=doh_tc +doh_rd=doh_rd +doh_ra=doh_ra +doh_rcode=doh_rcode +doh_qdcount=doh_qdcount +doh_ancount=doh_ancount +doh_nscount=doh_nscount +doh_arcount=doh_arcount +doh_qname=doh_qname +doh_qtype=doh_qtype +doh_qclass=doh_qclass +doh_cname=doh_cname +doh_sub=doh_sub +doh_rr=doh_rr +proxy_rule_list=proxy_rule_list + + + + + + + diff --git a/properties/proxy_event_mapping_table.properties b/properties/proxy_event_mapping_table.properties deleted file mode 100644 index a504416..0000000 --- a/properties/proxy_event_mapping_table.properties +++ /dev/null @@ -1,68 +0,0 @@ -recv_time=common_recv_time -log_id=common_log_id -start_timestamp_ms=common_start_timestamp_ms -end_timestamp_ms=common_end_timestamp_ms -processing_time=common_processing_time -device_id=common_device_id -data_center=common_data_center -sled_ip=common_sled_ip -device_tag=common_device_tag -client_ip=common_client_ip -client_port=common_client_port -client_asn=common_client_asn -subscriber_id=common_subscriber_id -imei=common_imei -imsi=common_imsi -phone_number=common_phone_number -server_ip=common_server_ip -server_port=common_server_port -server_asn=common_server_asn -address_type=common_address_type -http_url=http_url -http_host=http_host -http_request_line=http_request_line -http_response_line=http_response_line -http_request_body=http_request_body -http_response_body=http_response_body -http_cookie=http_cookie -http_referer=http_referer -http_user_agent=http_user_agent -http_request_content_length=http_request_content_length -http_request_content_type=http_request_content_type -http_response_content_length=http_response_content_length -http_response_content_type=http_response_content_type -http_set_cookie=http_set_cookie -http_version=http_version -http_action_file_size=http_action_file_size -doh_url=doh_url -doh_host=doh_host -doh_cookie=doh_cookie -doh_referer=doh_referer -doh_user_agent=doh_user_agent -doh_version=doh_version -doh_message_id=doh_message_id -doh_qr=doh_qr -doh_opcode=doh_opcode -doh_aa=doh_aa -doh_tc=doh_tc -doh_rd=doh_rd -doh_ra=doh_ra -doh_rcode=doh_rcode -doh_qdcount=doh_qdcount -doh_ancount=doh_ancount -doh_nscount=doh_nscount -doh_arcount=doh_arcount -doh_qname=doh_qname -doh_qtype=doh_qtype -doh_qclass=doh_qclass -doh_cname=doh_cname -doh_sub=doh_sub -doh_rr=doh_rr -client_geolocation=common_client_location -server_geolocation=common_server_location -ip_protocol=common_l4_protocol -sent_bytes=common_c2s_byte_num -received_bytes=common_s2c_byte_num -decoded_as=common_schema_type -proxy_rule_list=proxy_rule_list -session_id=common_stream_trace_id \ No newline at end of file diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 125e092..3afe10d 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,7 +1,7 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -#source.kafka.servers=192.168.44.12:9094 -source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 +source.kafka.servers=192.168.44.12:9094 +#source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 #百分点输出kafka地址 percent.sink.kafka.servers=192.168.44.12:9094 #文件源数据topic输出kafka地址 @@ -24,14 +24,16 @@ nacos.server=192.168.44.12:8848 nacos.schema.namespace=P19 #schema data id名称 -nacos.schema.data.id=proxy_event.json +nacos.schema.data.id=session_record.json #--------------------------------Kafka消费/生产配置------------------------------# #kafka 接收数据topic source.kafka.topic=SESSION-RECORD -sink.percent.kafka.topic=PERCENT-RECORD +sink.percent.kafka.topic.session=PERCENT-SESSION-RECORD +sink.percent.kafka.topic.security=PERCENT-SECURITY-RECORD +sink.percent.kafka.topic.proxy=PERCENT-POLICY-RECORD sink.file.data.kafka.topic=test-file-data #读取topic,存储该spout id的消费offset信息,可通过该拓扑命名;具体存储offset的位置,确定下次读取不重复的数据; @@ -47,17 +49,13 @@ transform.parallelism=1 deal.file.parallelism=1 sink.file.data.parallelism=1 sink.percent.parallelism=1 - #数据中心,取值范围(0-31) data.center.id.num=0 - #hbase 更新时间,如填写0则不更新缓存 hbase.tick.tuple.freq.secs=180 - #--------------------------------默认值配置------------------------------# #生产者压缩模式 none or snappy producer.kafka.compression.type=snappy - #------------------------------------OOS配置------------------------------------# #oos地址 oos.servers=10.3.45.100:8057 @@ -65,15 +63,14 @@ oos.servers=10.3.45.100:8057 prometheus.pushgateway.address=192.168.44.12:9091 pushgateway.statistics.time=300 deal.file.statistics.time=60 - #------------------------------------knowledge配置------------------------------------# knowledge.execution.minutes=600 -knowledge.base.uri=http://192.168.44.67:9999 +knowledge.base.uri=http://192.168.44.12:9999 knowledge.base.path=/v1/knowledge_base ip.user.defined.kd.id=004390bc-3135-4a6f-a492-3662ecb9e289 ip.builtin.kd.id=64af7077-eb9b-4b8f-80cf-2ceebc89bea9 asn.builtin.kd.id=f9f6bc91-2142-4673-8249-e097c00fe1ea -hos.url=http://192.168.44.67:9098/hos/traffic_file_bucket/ +hos.url=http://192.168.44.12:9098/hos/traffic_file_bucket/ diff --git a/properties/security_event_mapping_table.properties b/properties/session_record_mapping_table.properties similarity index 79% rename from properties/security_event_mapping_table.properties rename to properties/session_record_mapping_table.properties index aab8e11..6c5f9a5 100644 --- a/properties/security_event_mapping_table.properties +++ b/properties/session_record_mapping_table.properties @@ -19,10 +19,22 @@ server_ip=common_server_ip server_port=common_server_port server_asn=common_server_asn address_type=common_address_type +out_link_id=common_egress_link_id +in_link_id=common_ingress_link_id +client_geolocation=common_client_location +server_geolocation=common_server_location +app=common_app_label +ip_protocol=common_l4_protocol +sent_pkts=common_c2s_pkt_num +received_pkts=common_s2c_pkt_num +sent_bytes=common_c2s_byte_num +received_bytes=common_s2c_byte_num +tcp_client_isn=common_tcp_client_isn +tcp_server_isn=common_tcp_server_isn +decoded_as=common_schema_type +session_id=common_stream_trace_id http_url=http_url http_host=http_host -http_request_body=http_request_body -http_response_body=http_response_body http_proxy_flag=http_proxy_flag http_sequence=http_sequence http_cookie=http_cookie @@ -34,8 +46,6 @@ http_response_content_length=http_response_content_length http_response_content_type=http_response_content_type http_set_cookie=http_set_cookie http_version=http_version -http_response_latency_ms=http_response_latency_ms -http_session_duration_ms=http_session_duration_ms mail_protocol_type=mail_protocol_type mail_account=mail_account mail_password=mail_passwd @@ -68,26 +78,6 @@ dns_qclass=dns_qclass dns_cname=dns_cname dns_sub=dns_sub dns_rr=dns_rr -quic_version=quic_version -quic_sni=quic_sni -quic_user_agent=quic_user_agent -ftp_account=ftp_account -ftp_url=ftp_url -ftp_link_type=ftp_link_type -out_link_id=common_egress_link_id -in_link_id=common_ingress_link_id -client_geolocation=common_client_location -server_geolocation=common_server_location -app=common_app_label -ip_protocol=common_l4_protocol -sent_pkts=common_c2s_pkt_num -received_pkts=common_s2c_pkt_num -sent_bytes=common_c2s_byte_num -received_bytes=common_s2c_byte_num -tcp_client_isn=common_tcp_client_isn -tcp_server_isn=common_tcp_server_isn -decoded_as=common_schema_type -session_id=common_stream_trace_id ssl_version=ssl_version ssl_sni=ssl_sni ssl_san=ssl_san @@ -96,6 +86,49 @@ ssl_handshake_latency_ms=ssl_con_latency_ms ssl_ja3_hash=ssl_ja3_hash ssl_cert_issuer=ssl_cert_issuer ssl_cert_subject=ssl_cert_subject +quic_version=quic_version +quic_sni=quic_sni +quic_user_agent=quic_user_agent +ftp_account=ftp_account +ftp_url=ftp_url +ftp_link_type=ftp_link_type +#security-event +http_request_body=http_request_body +http_response_body=http_response_body +http_response_latency_ms=http_response_latency_ms +http_session_duration_ms=http_session_duration_ms security_rule_list=security_rule_list monitor_rule_list=monitor_rule_list tcp_handshake_latency_ms=common_establish_latency_ms +#proxy-event +http_action_file_size=http_action_file_size +doh_url=doh_url +doh_host=doh_host +doh_cookie=doh_cookie +doh_referer=doh_referer +doh_user_agent=doh_user_agent +doh_version=doh_version +doh_message_id=doh_message_id +doh_qr=doh_qr +doh_opcode=doh_opcode +doh_aa=doh_aa +doh_tc=doh_tc +doh_rd=doh_rd +doh_ra=doh_ra +doh_rcode=doh_rcode +doh_qdcount=doh_qdcount +doh_ancount=doh_ancount +doh_nscount=doh_nscount +doh_arcount=doh_arcount +doh_qname=doh_qname +doh_qtype=doh_qtype +doh_qclass=doh_qclass +doh_cname=doh_cname +doh_sub=doh_sub +doh_rr=doh_rr +proxy_rule_list=proxy_rule_list +#session-record +tcp_c2s_ip_fragments=common_c2s_ipfrag_num +tcp_s2c_ip_fragments=common_s2c_ipfrag_num +tcp_c2s_o3_pkts=common_c2s_tcp_unorder_num +tcp_s2c_o3_pkts=common_s2c_tcp_unorder_num \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java index 8ec12a6..cd3ca43 100644 --- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -134,6 +134,11 @@ public class FlowWriteConfig { public static final String SINK_PERCENT_KAFKA_TOPIC = ConfigurationsUtils.getStringProperty(propService, "sink.percent.kafka.topic"); public static final String SINK_FILE_DATA_SINK_KAFKA_TOPIC = ConfigurationsUtils.getStringProperty(propService, "sink.file.data.kafka.topic"); + + public static final String SINK_PERCENT_KAFKA_TOPIC_SESSION = ConfigurationsUtils.getStringProperty(propService, "sink.percent.kafka.topic.session"); + public static final String SINK_PERCENT_KAFKA_TOPIC_SECURITY = ConfigurationsUtils.getStringProperty(propService, "sink.percent.kafka.topic.security"); + public static final String SINK_PERCENT_KAFKA_TOPIC_PROXY = ConfigurationsUtils.getStringProperty(propService, "sink.percent.kafka.topic.proxy"); + /** * connection kafka */ diff --git a/src/main/java/com/zdjizhi/operator/count/SendCountProcess.java b/src/main/java/com/zdjizhi/operator/count/SendCountProcess.java index 06543e4..009345f 100644 --- a/src/main/java/com/zdjizhi/operator/count/SendCountProcess.java +++ b/src/main/java/com/zdjizhi/operator/count/SendCountProcess.java @@ -6,7 +6,7 @@ import com.zdjizhi.common.FlowWriteConfig; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Gauge; import io.prometheus.client.exporter.PushGateway; -import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; @@ -15,13 +15,15 @@ import java.io.IOException; import java.util.Timer; import java.util.TimerTask; -public class SendCountProcess extends ProcessFunction, String> { +public class SendCountProcess extends ProcessFunction, String> { private static final Log logger = LogFactory.get(); private long recordCount = 0L; private long failedCount = 0L; private long httpRequestCount = 0L; private long httpResponseCount = 0L; private long mailEmlCount = 0L; + private long securityCount = 0L; + private long proxyCount = 0L; static final Gauge recordCountsGauge = Gauge.build() @@ -37,6 +39,10 @@ public class SendCountProcess extends ProcessFunction value, Context ctx, Collector out) { + public void processElement(Tuple7 value, Context ctx, Collector out) { try { recordCount = recordCount + value.f0; failedCount = failedCount + value.f1; httpRequestCount = httpRequestCount + value.f2; httpResponseCount = httpResponseCount + value.f3; mailEmlCount = mailEmlCount + value.f4; + securityCount = securityCount + value.f5; + proxyCount = proxyCount + value.f6; } catch (Exception e) { logger.error("统计指标处理失败,原因为" + e); diff --git a/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java b/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java index 9bbf95f..f88aeb1 100644 --- a/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java +++ b/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java @@ -2,6 +2,7 @@ package com.zdjizhi.operator.map; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.tools.general.ConfigurationsUtils; @@ -12,6 +13,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; +import java.util.HashMap; import java.util.Properties; @@ -23,17 +25,27 @@ public class TypeMapCompleted extends ProcessFunction { private static final Log logger = LogFactory.get(); private ConvertRecordToPERCENT convertRecordToPERCENT; Properties Prop = new Properties(); + Properties actionProp = new Properties(); + private HashMap actionMap = new HashMap(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); try { - if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("security_event.json")) { - Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("security_event_mapping_table.properties")); - } else if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("proxy_event.json")) { - Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("proxy_event_mapping_table.properties")); - } + Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("session_record_mapping_table.properties")); convertRecordToPERCENT = new ConvertRecordToPERCENT(Prop); + logger.info("session_record_mapping_table.properties日志加载成功"); + } catch (Exception e) { + logger.error("session_record_mapping_table.properties日志加载失败,失败原因为:" + e); + } + + try { + actionProp.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("action_definition.properties")); + for (String key : actionProp.stringPropertyNames()) { + final String action = actionProp.getProperty(key); + actionMap.put(key, Integer.valueOf(action)); + } + logger.info(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + "日志Schema加载成功"); } catch (Exception e) { logger.error(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + "日志Schema加载失败,失败原因为:" + e); @@ -44,22 +56,27 @@ public class TypeMapCompleted extends ProcessFunction { public void processElement(String message, ProcessFunction.Context ctx, Collector out) { try { JSONObject record = JSONObject.parseObject(message); - JSONObject jsonObject = null; - if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("security_event.json")) { - if (record.containsKey("security_rule_list") || record.containsKey("monitor_rule_list")) { - jsonObject = convertRecordToPERCENT.convertToPERCENT(record); - } - } - - if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("proxy_event.json")) { - if (record.containsKey("proxy_rule_list")) { - jsonObject = convertRecordToPERCENT.convertToPERCENT(record); - } - } + JSONObject jsonObject = convertRecordToPERCENT.convertToPERCENT(record); if (jsonObject != null) { - jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000); + if (record.containsKey("security_rule_list")) { + jsonObject.put("common_policy_id", JSONArray.from(record.get("security_rule_list")).get(0)); + jsonObject.put("common_action", actionMap.get(record.get("security_action").toString().replace(" ",""))); + } + + if (record.containsKey("monitor_rule_list")) { + jsonObject.put("common_policy_id", JSONArray.from(record.get("monitor_rule_list")).get(0)); + jsonObject.put("common_action", 1); + } + + if (record.containsKey("proxy_rule_list")) { + jsonObject.put("common_policy_id", JSONArray.from(record.get("proxy_rule_list")).get(0)); + jsonObject.put("common_action", actionMap.get(record.get("proxy_action").toString().replace(" ",""))); + } + + + jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000); TransForm.transformLog(jsonObject); MetaUtil.typeTransform(jsonObject); out.collect(jsonObject); diff --git a/src/main/java/com/zdjizhi/operator/percent/PercentProxyProcess.java b/src/main/java/com/zdjizhi/operator/percent/PercentProxyProcess.java new file mode 100644 index 0000000..d100f2b --- /dev/null +++ b/src/main/java/com/zdjizhi/operator/percent/PercentProxyProcess.java @@ -0,0 +1,40 @@ +package com.zdjizhi.operator.percent; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSONObject; +import com.zdjizhi.tools.general.ConfigurationsUtils; +import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.Properties; + + +public class PercentProxyProcess extends ProcessFunction { + private static final Log logger = LogFactory.get(); + Properties prop = new Properties(); + private ConvertRecordToPERCENT convertRecordToPERCENT; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + try { + prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("percent_proxy_event.properties")); + convertRecordToPERCENT = new ConvertRecordToPERCENT(prop); + logger.info("percent_proxy_event.properties日志加载成功"); + } catch (Exception e) { + logger.error("percent_proxy_event.properties日志加载失败,失败原因为:" + e); + } + } + + @Override + public void processElement(JSONObject value, Context ctx, Collector out) { + try { + out.collect(JSONObject.toJSONString(convertRecordToPERCENT.removeFields(value))); + } catch (Exception e) { + logger.error("删减proxy_event日志字段失败,失败原因为:{},数据为:{}", e, value); + } + } +} diff --git a/src/main/java/com/zdjizhi/operator/percent/PercentSecurityProcess.java b/src/main/java/com/zdjizhi/operator/percent/PercentSecurityProcess.java new file mode 100644 index 0000000..632c1d8 --- /dev/null +++ b/src/main/java/com/zdjizhi/operator/percent/PercentSecurityProcess.java @@ -0,0 +1,42 @@ +package com.zdjizhi.operator.percent; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; + +import com.alibaba.fastjson2.JSONObject; +import com.zdjizhi.tools.general.ConfigurationsUtils; +import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.Properties; + + +public class PercentSecurityProcess extends ProcessFunction { + private static final Log logger = LogFactory.get(); + Properties prop = new Properties(); + private ConvertRecordToPERCENT convertRecordToPERCENT; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + try { + prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("percent_security_event.properties")); + convertRecordToPERCENT = new ConvertRecordToPERCENT(prop); + logger.info("percent_security_event.properties日志加载成功"); + } catch (Exception e) { + logger.error("percent_security_event.properties日志加载失败,失败原因为:" + e); + } + + } + + @Override + public void processElement(JSONObject value, Context ctx, Collector out) { + try { + out.collect(JSONObject.toJSONString(convertRecordToPERCENT.removeFields(value))); + } catch (Exception e) { + logger.error("删减security_event日志字段失败,失败原因为:{},数据为:{}", e, value); + } + } +} diff --git a/src/main/java/com/zdjizhi/operator/percent/PercentSessionProcess.java b/src/main/java/com/zdjizhi/operator/percent/PercentSessionProcess.java new file mode 100644 index 0000000..6606e9a --- /dev/null +++ b/src/main/java/com/zdjizhi/operator/percent/PercentSessionProcess.java @@ -0,0 +1,41 @@ +package com.zdjizhi.operator.percent; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson2.JSONObject; +import com.zdjizhi.tools.general.ConfigurationsUtils; +import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.Properties; + + +public class PercentSessionProcess extends ProcessFunction { + private static final Log logger = LogFactory.get(); + Properties prop = new Properties(); + private ConvertRecordToPERCENT convertRecordToPERCENT; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + try { + prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("percent_session_record.properties")); + convertRecordToPERCENT = new ConvertRecordToPERCENT(prop); + logger.info("percent_session_record.properties日志加载成功"); + } catch (Exception e) { + logger.error("percent_session_record.properties日志加载失败,失败原因为:" + e); + } + + } + + @Override + public void processElement(JSONObject value, Context ctx, Collector out) { + try { + out.collect(JSONObject.toJSONString(convertRecordToPERCENT.removeFields(value))); + } catch (Exception e) { + logger.error("删减percent_session日志字段失败,失败原因为:{},数据为:{}", e, value); + } + } +} diff --git a/src/main/java/com/zdjizhi/operator/process/DealFileProcessFunction.java b/src/main/java/com/zdjizhi/operator/process/DealFileProcess.java similarity index 82% rename from src/main/java/com/zdjizhi/operator/process/DealFileProcessFunction.java rename to src/main/java/com/zdjizhi/operator/process/DealFileProcess.java index 7cad645..d893c78 100644 --- a/src/main/java/com/zdjizhi/operator/process/DealFileProcessFunction.java +++ b/src/main/java/com/zdjizhi/operator/process/DealFileProcess.java @@ -11,6 +11,7 @@ import com.zdjizhi.common.pojo.FileMeta; import com.zdjizhi.common.pojo.SourceList; import com.zdjizhi.tools.general.FileEdit; import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; @@ -26,10 +27,16 @@ import java.util.TimerTask; * @Description: * @date 2023/0928 */ -public class DealFileProcessFunction extends ProcessFunction { +public class DealFileProcess extends ProcessFunction { private static final Log logger = LogFactory.get(); - public static final OutputTag> dealFileMetircTag = new OutputTag>("DealFileMetircTag") { + public static final OutputTag> dealFileMetircTag = new OutputTag>("DealFileMetircTag") { + }; + + + public static final OutputTag percentSecurityTag = new OutputTag("percentSecurityTag") { + }; + public static final OutputTag percentProxyTag = new OutputTag("percentProxyTag") { }; private String rpUrlValue; @@ -53,6 +60,8 @@ public class DealFileProcessFunction extends ProcessFunction private long httpResponseCount = 0L; private long mailEmlCount = 0L; private boolean metricSendFlag = true; + private long securityCount = 0L; + private long proxyCount = 0L; //初始化侧输流的标记 @@ -77,18 +86,20 @@ public class DealFileProcessFunction extends ProcessFunction @SuppressWarnings("unchecked") @Override - public void processElement(JSONObject message, Context context, Collector collector) throws Exception { + public void processElement(JSONObject message, Context context, Collector collector) throws Exception { try { //定时向下游推送指标 if (metricSendFlag) { metricSendFlag = false; if (recordCount > 0 || failedCount > 0 || httpRequestCount > 0 || httpResponseCount > 0 || mailEmlCount > 0) { - context.output(dealFileMetircTag, Tuple5.of(recordCount, failedCount, httpRequestCount, httpResponseCount, mailEmlCount)); + context.output(dealFileMetircTag, Tuple7.of(recordCount, failedCount, httpRequestCount, httpResponseCount, mailEmlCount, securityCount, proxyCount)); recordCount = 0L; failedCount = 0L; httpRequestCount = 0; httpResponseCount = 0; mailEmlCount = 0L; + securityCount = 0L; + proxyCount = 0L; } } recordCount++; @@ -112,7 +123,7 @@ public class DealFileProcessFunction extends ProcessFunction String fileId = FileEdit.getFileId(rqUrlValue, "_1"); message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId)); SourceList request = new SourceList(); - request.setSource_oss_path(FlowWriteConfig.HOS_URL+rqUrlValue); + request.setSource_oss_path(FlowWriteConfig.HOS_URL + rqUrlValue); request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId)); jsonarray.add(request); httpRequestCount++; @@ -122,7 +133,7 @@ public class DealFileProcessFunction extends ProcessFunction String fileId = FileEdit.getFileId(rpUrlValue, "_2"); message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId)); SourceList response = new SourceList(); - response.setSource_oss_path(FlowWriteConfig.HOS_URL+rpUrlValue); + response.setSource_oss_path(FlowWriteConfig.HOS_URL + rpUrlValue); response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId)); jsonarray.add(response); httpResponseCount++; @@ -131,7 +142,7 @@ public class DealFileProcessFunction extends ProcessFunction String fileId = FileEdit.getFileId(emailUrlValue, "_9"); message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId)); SourceList emailFile = new SourceList(); - emailFile.setSource_oss_path(FlowWriteConfig.HOS_URL+emailUrlValue); + emailFile.setSource_oss_path(FlowWriteConfig.HOS_URL + emailUrlValue); emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId)); jsonarray.add(emailFile); mailEmlCount++; @@ -143,7 +154,15 @@ public class DealFileProcessFunction extends ProcessFunction fileMeta.setProcessing_time((int) (System.currentTimeMillis() / 1000)); context.output(metaToKafa, JSONObject.toJSONString(fileMeta)); } - collector.collect(JSONObject.toJSONString(message)); + + if (message.containsKey("security_rule_list") || message.containsKey("monitor_rule_list")) { + context.output(percentSecurityTag, message); + securityCount++; + } else if (message.containsKey("proxy_rule_list")) { + context.output(percentProxyTag, message); + proxyCount++; + } + collector.collect(message); } } catch (RuntimeException e) { logger.error("处理带有非结构结构化字段的日志出错:" + e + "\n" + message); diff --git a/src/main/java/com/zdjizhi/tools/connections/kafka/KafkaProducer.java b/src/main/java/com/zdjizhi/tools/connections/kafka/KafkaProducer.java index c8a4d35..08392e8 100644 --- a/src/main/java/com/zdjizhi/tools/connections/kafka/KafkaProducer.java +++ b/src/main/java/com/zdjizhi/tools/connections/kafka/KafkaProducer.java @@ -30,9 +30,9 @@ public class KafkaProducer { return properties; } - public static FlinkKafkaProducer getPercentKafkaProducer() { + public static FlinkKafkaProducer getPercentKafkaProducer(String kafkaTopic) { FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer( - FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC, + kafkaTopic, new SimpleStringSchema(), createProducerConfig(FlowWriteConfig.PERCENT_SINK_KAFKA_SERVERS), Optional.empty()); diff --git a/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java b/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java index e6dc4c9..439f77d 100644 --- a/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java +++ b/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java @@ -6,14 +6,14 @@ import com.alibaba.fastjson2.JSONObject; import java.util.*; public class ConvertRecordToPERCENT { - private Properties securityProp; + private Properties prop; private HashMap recordSchema; - public ConvertRecordToPERCENT(Properties securityProp) { - this.securityProp = securityProp; + public ConvertRecordToPERCENT(Properties prop) { + this.prop = prop; final HashMap schemaMap = new HashMap(); - for (String key : securityProp.stringPropertyNames()) { - final String schema = securityProp.getProperty(key); + for (String key : prop.stringPropertyNames()) { + final String schema = prop.getProperty(key); schemaMap.put(key, schema); } this.recordSchema = schemaMap; @@ -27,6 +27,12 @@ public class ConvertRecordToPERCENT { } } + + //填充common_start_time、common_end_time + percent.put("common_start_time", (long) record.get("start_timestamp_ms") / 1000); + percent.put("common_end_time", (long) record.get("end_timestamp_ms") / 1000); + + if (record.containsKey("security_rule_list")) { percent.put("common_policy_id", (Integer) JSONArray.from(record.get("security_rule_list")).get(0)); percent.put("common_action", fillingCommonAction((String) record.get("security_action"))); @@ -37,20 +43,19 @@ public class ConvertRecordToPERCENT { percent.put("common_action", 1); } - if (record.containsKey("proxy_rule_list")){ + if (record.containsKey("proxy_rule_list")) { percent.put("common_policy_id", (Integer) JSONArray.from(record.get("proxy_rule_list")).get(0)); percent.put("common_action", fillingCommonAction((String) record.get("proxy_action"))); } - //填充common_start_time、common_end_time - percent.put("common_start_time",record.get("start_timestamp_ms")); - percent.put("common_end_time",record.get("end_timestamp_ms")); + //填充common_sessions + percent.put("common_sessions", 1); //填充common_internal_ip、common_external_ip、common_direction、common_stream_dir if (record.containsKey("flags")) { final int flags = (int) record.get("flags"); if (flags > 0) { - if ((8L & flags) == 8L && (16L & flags) != 16L) { + if ((8L & flags) == 8L && (16L & flags) != 16L) { percent.put("common_internal_ip", record.get("common_client_ip")); percent.put("common_external_ip", record.get("common_server_ip")); percent.put("common_direction", 69); @@ -59,6 +64,7 @@ public class ConvertRecordToPERCENT { percent.put("common_external_ip", record.get("common_client_ip")); percent.put("common_direction", 73); } + if ((8192L & flags) == 8192L && (16384L & flags) == 16384L) { percent.put("common_stream_dir", 3); } else if ((8192L & flags) == 8192L) { @@ -71,17 +77,18 @@ public class ConvertRecordToPERCENT { return percent; } - private int fillingCommonAction(String action) { - int number = 0; + int number; switch (action) { case "none": number = 0; break; case "Monitor": + case "monitor": number = 1; break; case "Intercept": + case "intercept": number = 2; break; case "No Intercept": @@ -122,4 +129,13 @@ public class ConvertRecordToPERCENT { return number; } + public JSONObject removeFields(JSONObject record) { + for (Map.Entry entry : recordSchema.entrySet()) { + if (record.containsKey(entry.getValue())) { + record.remove(entry.getValue()); + } + } + return record; + } + } diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index e473df6..7dbf5de 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -7,13 +7,16 @@ import com.alibaba.fastjson2.JSONObject; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.operator.count.SendCountProcess; import com.zdjizhi.operator.map.MapCompleted; +import com.zdjizhi.operator.percent.PercentProxyProcess; +import com.zdjizhi.operator.percent.PercentSecurityProcess; import com.zdjizhi.operator.map.TypeMapCompleted; -import com.zdjizhi.operator.process.DealFileProcessFunction; +import com.zdjizhi.operator.percent.PercentSessionProcess; +import com.zdjizhi.operator.process.DealFileProcess; import com.zdjizhi.tools.connections.kafka.KafkaConsumer; import com.zdjizhi.tools.connections.kafka.KafkaProducer; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - +import org.apache.flink.table.descriptors.Kafka; public class LogFlowWriteTopology { @@ -37,12 +40,21 @@ public class LogFlowWriteTopology { } //处理带有非结构化文件字段的数据 - SingleOutputStreamOperator dealFileProcessFunction = completedStream.process(new DealFileProcessFunction()).name("DealFileProcessFunction").uid("DealFile-ProcessFunction").setParallelism(FlowWriteConfig.DEAL_FILE_PARALLELISM); + SingleOutputStreamOperator dealFileProcessFunction = completedStream.process(new DealFileProcess()).name("DealFileProcess").uid("DealFile-Process").setParallelism(FlowWriteConfig.DEAL_FILE_PARALLELISM); //补全后的数据发送至百分点的kafka - dealFileProcessFunction.addSink(KafkaProducer.getPercentKafkaProducer()).name("ToPercentKafka").uid("To-Percent-Kafka").setParallelism(FlowWriteConfig.SINK_PERCENT_PARALLELISM); + final SingleOutputStreamOperator percentSecurityProcess = dealFileProcessFunction.getSideOutput(DealFileProcess.percentSecurityTag).process(new PercentSecurityProcess()).name("PercentSecurityProcess").uid("Percent-Security-Process").setParallelism(1); + + final SingleOutputStreamOperator percentProxyProcess = dealFileProcessFunction.getSideOutput(DealFileProcess.percentProxyTag).process(new PercentProxyProcess()).name("PercentProxyProcess").uid("Percent-Proxy-Process").setParallelism(1); + + final SingleOutputStreamOperator percentSessionProcess = dealFileProcessFunction.process(new PercentSessionProcess()).name("PercentSessionProcess").uid("Percent-Session-Process").setParallelism(1); + + percentSecurityProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SECURITY)).name("sendSecurityEvent").uid("send-Security-Event").setParallelism(FlowWriteConfig.SINK_PERCENT_PARALLELISM); + percentProxyProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_PROXY)).name("sendProxyEvent").uid("send-Proxy-Event").setParallelism(FlowWriteConfig.SINK_PERCENT_PARALLELISM); + percentSessionProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SESSION)).name("sendSessionRECORD").uid("send-Session-RECORD").setParallelism(FlowWriteConfig.SINK_PERCENT_PARALLELISM); + //文件元数据发送至TRAFFIC-FILE-METADATA - dealFileProcessFunction.getSideOutput(DealFileProcessFunction.metaToKafa).addSink(KafkaProducer.getTrafficFileMetaKafkaProducer()).name("toTrafficFileMeta").uid("to-Traffic-FileMeta").setParallelism(FlowWriteConfig.SINK_FILE_DATA_PARALLELISM); - dealFileProcessFunction.getSideOutput(DealFileProcessFunction.dealFileMetircTag).process(new SendCountProcess()).name("SendCountProcess").uid("Send-Count-Process").setParallelism(1); + dealFileProcessFunction.getSideOutput(DealFileProcess.metaToKafa).addSink(KafkaProducer.getTrafficFileMetaKafkaProducer()).name("toTrafficFileMeta").uid("to-Traffic-FileMeta").setParallelism(FlowWriteConfig.SINK_FILE_DATA_PARALLELISM); + dealFileProcessFunction.getSideOutput(DealFileProcess.dealFileMetircTag).process(new SendCountProcess()).name("SendCountProcess").uid("Send-Count-Process").setParallelism(1); try { environment.execute(args[0]); } catch (Exception e) {