diff --git a/properties/proxy_event_mapping_table.properties b/properties/proxy_event_mapping_table.properties new file mode 100644 index 0000000..a504416 --- /dev/null +++ b/properties/proxy_event_mapping_table.properties @@ -0,0 +1,68 @@ +recv_time=common_recv_time +log_id=common_log_id +start_timestamp_ms=common_start_timestamp_ms +end_timestamp_ms=common_end_timestamp_ms +processing_time=common_processing_time +device_id=common_device_id +data_center=common_data_center +sled_ip=common_sled_ip +device_tag=common_device_tag +client_ip=common_client_ip +client_port=common_client_port +client_asn=common_client_asn +subscriber_id=common_subscriber_id +imei=common_imei +imsi=common_imsi +phone_number=common_phone_number +server_ip=common_server_ip +server_port=common_server_port +server_asn=common_server_asn +address_type=common_address_type +http_url=http_url +http_host=http_host +http_request_line=http_request_line +http_response_line=http_response_line +http_request_body=http_request_body +http_response_body=http_response_body +http_cookie=http_cookie +http_referer=http_referer +http_user_agent=http_user_agent +http_request_content_length=http_request_content_length +http_request_content_type=http_request_content_type +http_response_content_length=http_response_content_length +http_response_content_type=http_response_content_type +http_set_cookie=http_set_cookie +http_version=http_version +http_action_file_size=http_action_file_size +doh_url=doh_url +doh_host=doh_host +doh_cookie=doh_cookie +doh_referer=doh_referer +doh_user_agent=doh_user_agent +doh_version=doh_version +doh_message_id=doh_message_id +doh_qr=doh_qr +doh_opcode=doh_opcode +doh_aa=doh_aa +doh_tc=doh_tc +doh_rd=doh_rd +doh_ra=doh_ra +doh_rcode=doh_rcode +doh_qdcount=doh_qdcount +doh_ancount=doh_ancount +doh_nscount=doh_nscount +doh_arcount=doh_arcount +doh_qname=doh_qname +doh_qtype=doh_qtype +doh_qclass=doh_qclass +doh_cname=doh_cname +doh_sub=doh_sub +doh_rr=doh_rr +client_geolocation=common_client_location +server_geolocation=common_server_location +ip_protocol=common_l4_protocol +sent_bytes=common_c2s_byte_num +received_bytes=common_s2c_byte_num +decoded_as=common_schema_type +proxy_rule_list=proxy_rule_list +session_id=common_stream_trace_id \ No newline at end of file diff --git a/properties/security_event_mapping_table.properties b/properties/security_event_mapping_table.properties new file mode 100644 index 0000000..aab8e11 --- /dev/null +++ b/properties/security_event_mapping_table.properties @@ -0,0 +1,101 @@ +recv_time=common_recv_time +log_id=common_log_id +start_timestamp_ms=common_start_timestamp_ms +end_timestamp_ms=common_end_timestamp_ms +duration_ms=common_con_duration_ms +processing_time=common_processing_time +device_id=common_device_id +data_center=common_data_center +sled_ip=common_sled_ip +device_tag=common_device_tag +client_ip=common_client_ip +client_port=common_client_port +client_asn=common_client_asn +subscriber_id=common_subscriber_id +imei=common_imei +imsi=common_imsi +phone_number=common_phone_number +server_ip=common_server_ip +server_port=common_server_port +server_asn=common_server_asn +address_type=common_address_type +http_url=http_url +http_host=http_host +http_request_body=http_request_body +http_response_body=http_response_body +http_proxy_flag=http_proxy_flag +http_sequence=http_sequence +http_cookie=http_cookie +http_referer=http_referer +http_user_agent=http_user_agent +http_request_content_length=http_request_content_length +http_request_content_type=http_request_content_type +http_response_content_length=http_response_content_length +http_response_content_type=http_response_content_type +http_set_cookie=http_set_cookie +http_version=http_version +http_response_latency_ms=http_response_latency_ms +http_session_duration_ms=http_session_duration_ms +mail_protocol_type=mail_protocol_type +mail_account=mail_account +mail_password=mail_passwd +mail_from_cmd=mail_from_cmd +mail_to_cmd=mail_to_cmd +mail_from=mail_from +mail_to=mail_to +mail_cc=mail_cc +mail_bcc=mail_bcc +mail_subject=mail_subject +mail_subject_charset=mail_subject_charset +mail_attachment_name=mail_attachment_name +mail_attachment_name_charset=mail_attachment_name_charset +mail_eml_file=mail_eml_file +dns_message_id=dns_message_id +dns_qr=dns_qr +dns_opcode=dns_opcode +dns_aa=dns_aa +dns_tc=dns_tc +dns_rd=dns_rd +dns_ra=dns_ra +dns_rcode=dns_rcode +dns_qdcount=dns_qdcount +dns_ancount=dns_ancount +dns_nscount=dns_nscount +dns_arcount=dns_arcount +dns_qname=dns_qname +dns_qtype=dns_qtype +dns_qclass=dns_qclass +dns_cname=dns_cname +dns_sub=dns_sub +dns_rr=dns_rr +quic_version=quic_version +quic_sni=quic_sni +quic_user_agent=quic_user_agent +ftp_account=ftp_account +ftp_url=ftp_url +ftp_link_type=ftp_link_type +out_link_id=common_egress_link_id +in_link_id=common_ingress_link_id +client_geolocation=common_client_location +server_geolocation=common_server_location +app=common_app_label +ip_protocol=common_l4_protocol +sent_pkts=common_c2s_pkt_num +received_pkts=common_s2c_pkt_num +sent_bytes=common_c2s_byte_num +received_bytes=common_s2c_byte_num +tcp_client_isn=common_tcp_client_isn +tcp_server_isn=common_tcp_server_isn +decoded_as=common_schema_type +session_id=common_stream_trace_id +ssl_version=ssl_version +ssl_sni=ssl_sni +ssl_san=ssl_san +ssl_cn=ssl_cn +ssl_handshake_latency_ms=ssl_con_latency_ms +ssl_ja3_hash=ssl_ja3_hash +ssl_cert_issuer=ssl_cert_issuer +ssl_cert_subject=ssl_cert_subject +security_rule_list=security_rule_list +monitor_rule_list=monitor_rule_list +tcp_handshake_latency_ms=common_establish_latency_ms diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index d212526..125e092 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,7 +1,7 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -source.kafka.servers=192.168.44.12:9094 - +#source.kafka.servers=192.168.44.12:9094 +source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094 #百分点输出kafka地址 percent.sink.kafka.servers=192.168.44.12:9094 #文件源数据topic输出kafka地址 @@ -18,17 +18,17 @@ tools.library=D:\\workerspace\\dat\\ #--------------------------------nacos配置------------------------------# #nacos 地址 -nacos.server=192.168.44.67:8848 +nacos.server=192.168.44.12:8848 #schema namespace名称 -nacos.schema.namespace=f507879a-8b1b-4330-913e-83d4fcdc14bb +nacos.schema.namespace=P19 #schema data id名称 -nacos.schema.data.id=session_record.json +nacos.schema.data.id=proxy_event.json #--------------------------------Kafka消费/生产配置------------------------------# #kafka 接收数据topic -source.kafka.topic=test +source.kafka.topic=SESSION-RECORD sink.percent.kafka.topic=PERCENT-RECORD @@ -64,4 +64,16 @@ oos.servers=10.3.45.100:8057 #prometheus-httpserver prometheus.pushgateway.address=192.168.44.12:9091 pushgateway.statistics.time=300 -deal.file.statistics.time=60 \ No newline at end of file +deal.file.statistics.time=60 + +#------------------------------------knowledge配置------------------------------------# +knowledge.execution.minutes=600 +knowledge.base.uri=http://192.168.44.67:9999 +knowledge.base.path=/v1/knowledge_base +ip.user.defined.kd.id=004390bc-3135-4a6f-a492-3662ecb9e289 +ip.builtin.kd.id=64af7077-eb9b-4b8f-80cf-2ceebc89bea9 +asn.builtin.kd.id=f9f6bc91-2142-4673-8249-e097c00fe1ea +hos.url=http://192.168.44.67:9098/hos/traffic_file_bucket/ + + + diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java index 84db558..8ec12a6 100644 --- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -161,4 +161,18 @@ public class FlowWriteConfig { public static final Integer PUSHGATEWAY_STATISTICS_TIME = ConfigurationsUtils.getIntProperty(propService, "pushgateway.statistics.time"); public static final Integer DEAL_FILE_STATISTICS_TIME = ConfigurationsUtils.getIntProperty(propService, "deal.file.statistics.time"); + + public static final String KNOWLEDGE_BASE_URL = ConfigurationsUtils.getStringProperty(propService, "knowledge.base.uri"); + public static final String KNOWLEDGE_BASE_PATH = ConfigurationsUtils.getStringProperty(propService, "knowledge.base.path"); + + + public static final String ASN_BUILTIN_KD_ID = ConfigurationsUtils.getStringProperty(propService, "asn.builtin.kd.id"); + public static final String IP_BUILTIN_KD_ID = ConfigurationsUtils.getStringProperty(propService, "ip.builtin.kd.id"); + public static final String IP_USER_DEFINED_KD_ID = ConfigurationsUtils.getStringProperty(propService, "ip.user.defined.kd.id"); + public static final String HOS_URL = ConfigurationsUtils.getStringProperty(propService, "hos.url"); + + + + + public static final Long KNOWLEDGE_EXECUTION_MINUTES = ConfigurationsUtils.getLongProperty(propService,"knowledge.execution.minutes"); } \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/operator/map/MapCompleted.java b/src/main/java/com/zdjizhi/operator/map/MapCompleted.java index 8a03915..aafc7d5 100644 --- a/src/main/java/com/zdjizhi/operator/map/MapCompleted.java +++ b/src/main/java/com/zdjizhi/operator/map/MapCompleted.java @@ -3,43 +3,63 @@ package com.zdjizhi.operator.map; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.alibaba.fastjson2.JSONObject; -import com.geedgenetworks.utils.IpLookupV2; -import com.zdjizhi.tools.general.IpLookupUtils; + +import com.zdjizhi.tools.general.ConfigurationsUtils; +import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT; import com.zdjizhi.tools.transform.TransForm; import com.zdjizhi.tools.json.MetaUtil; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; + import org.apache.flink.util.Collector; -import java.util.Map; +import java.util.Properties; /** * @author qidaijie * @version 2021/5/27 15:01 */ -public class MapCompleted extends ProcessFunction { +public class MapCompleted extends ProcessFunction { private static final Log logger = LogFactory.get(); + private ConvertRecordToPERCENT securityEvnetConvert; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); + try { + Properties securityProp = new Properties(); + securityProp.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("security_event_mapping_table.properties")); + securityEvnetConvert = new ConvertRecordToPERCENT(securityProp); + logger.info("ecurity_event日志Schema加载成功"); + } catch (Exception e) { + logger.error("security_event日志Schema加载失败,失败原因为:" + e); + } + } @Override - public void processElement(String message, ProcessFunction.Context ctx, Collector out) throws Exception { + public void processElement(String message, ProcessFunction.Context ctx, Collector out) { try { - JSONObject jsonObject = JSONObject.parseObject(message); - jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000); + JSONObject record = JSONObject.parseObject(message); - MetaUtil.dropJsonField(jsonObject); - TransForm.transformLog(jsonObject); - out.collect(jsonObject); + JSONObject jsonObject = null; + if (record.containsKey("security_rule_list") || record.containsKey("monitor_rule_list")) { + jsonObject = securityEvnetConvert.convertToPERCENT(record); + } + + if (jsonObject != null) { + jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000); + + MetaUtil.dropJsonField(jsonObject); + TransForm.transformLog(jsonObject); + out.collect(jsonObject); + } } catch (RuntimeException e) { logger.error("TransForm log failed ( The field type is not verified ),The exception is :{}\n The error Message is:{}", e, message); } } + + } diff --git a/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java b/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java index b0096e3..9bbf95f 100644 --- a/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java +++ b/src/main/java/com/zdjizhi/operator/map/TypeMapCompleted.java @@ -3,12 +3,16 @@ package com.zdjizhi.operator.map; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.alibaba.fastjson2.JSONObject; +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.tools.general.ConfigurationsUtils; import com.zdjizhi.tools.json.MetaUtil; +import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT; import com.zdjizhi.tools.transform.TransForm; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; +import java.util.Properties; /** @@ -17,21 +21,49 @@ import org.apache.flink.util.Collector; */ public class TypeMapCompleted extends ProcessFunction { private static final Log logger = LogFactory.get(); + private ConvertRecordToPERCENT convertRecordToPERCENT; + Properties Prop = new Properties(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); + try { + if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("security_event.json")) { + Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("security_event_mapping_table.properties")); + } else if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("proxy_event.json")) { + Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("proxy_event_mapping_table.properties")); + } + convertRecordToPERCENT = new ConvertRecordToPERCENT(Prop); + logger.info(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + "日志Schema加载成功"); + } catch (Exception e) { + logger.error(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + "日志Schema加载失败,失败原因为:" + e); + } } @Override - public void processElement(String message, ProcessFunction.Context ctx, Collector out) throws Exception { + public void processElement(String message, ProcessFunction.Context ctx, Collector out) { try { - JSONObject jsonObject = JSONObject.parseObject(message); - jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000); + JSONObject record = JSONObject.parseObject(message); + JSONObject jsonObject = null; + if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("security_event.json")) { + if (record.containsKey("security_rule_list") || record.containsKey("monitor_rule_list")) { + jsonObject = convertRecordToPERCENT.convertToPERCENT(record); + } + } - TransForm.transformLog(jsonObject); - MetaUtil.typeTransform(jsonObject); - out.collect(jsonObject); + if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("proxy_event.json")) { + if (record.containsKey("proxy_rule_list")) { + jsonObject = convertRecordToPERCENT.convertToPERCENT(record); + } + } + + if (jsonObject != null) { + jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000); + + TransForm.transformLog(jsonObject); + MetaUtil.typeTransform(jsonObject); + out.collect(jsonObject); + } } catch (RuntimeException e) { logger.error("TransForm logs failed( The field type is verified ),The exception is :{}\n The error Message is:{}", e, message); } diff --git a/src/main/java/com/zdjizhi/operator/process/DealFileProcessFunction.java b/src/main/java/com/zdjizhi/operator/process/DealFileProcessFunction.java index 5e3bb06..7cad645 100644 --- a/src/main/java/com/zdjizhi/operator/process/DealFileProcessFunction.java +++ b/src/main/java/com/zdjizhi/operator/process/DealFileProcessFunction.java @@ -112,8 +112,8 @@ public class DealFileProcessFunction extends ProcessFunction String fileId = FileEdit.getFileId(rqUrlValue, "_1"); message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId)); SourceList request = new SourceList(); - request.setSource_oss_path(rqUrlValue); - request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, fileId)); + request.setSource_oss_path(FlowWriteConfig.HOS_URL+rqUrlValue); + request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId)); jsonarray.add(request); httpRequestCount++; } @@ -122,8 +122,8 @@ public class DealFileProcessFunction extends ProcessFunction String fileId = FileEdit.getFileId(rpUrlValue, "_2"); message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId)); SourceList response = new SourceList(); - response.setSource_oss_path(rpUrlValue); - response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, fileId)); + response.setSource_oss_path(FlowWriteConfig.HOS_URL+rpUrlValue); + response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId)); jsonarray.add(response); httpResponseCount++; } @@ -131,8 +131,8 @@ public class DealFileProcessFunction extends ProcessFunction String fileId = FileEdit.getFileId(emailUrlValue, "_9"); message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId)); SourceList emailFile = new SourceList(); - emailFile.setSource_oss_path(emailUrlValue); - emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, fileId)); + emailFile.setSource_oss_path(FlowWriteConfig.HOS_URL+emailUrlValue); + emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId)); jsonarray.add(emailFile); mailEmlCount++; } @@ -144,7 +144,6 @@ public class DealFileProcessFunction extends ProcessFunction context.output(metaToKafa, JSONObject.toJSONString(fileMeta)); } collector.collect(JSONObject.toJSONString(message)); - } } catch (RuntimeException e) { logger.error("处理带有非结构结构化字段的日志出错:" + e + "\n" + message); diff --git a/src/main/java/com/zdjizhi/tools/connections/http/HttpClientService.java b/src/main/java/com/zdjizhi/tools/connections/http/HttpClientService.java index fa61bf4..2e7dca3 100644 --- a/src/main/java/com/zdjizhi/tools/connections/http/HttpClientService.java +++ b/src/main/java/com/zdjizhi/tools/connections/http/HttpClientService.java @@ -12,6 +12,7 @@ import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.*; import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.client.utils.URIBuilder; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; import org.apache.http.conn.ConnectTimeoutException; @@ -35,10 +36,12 @@ import java.net.*; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.cert.X509Certificate; +import java.util.Map; public class HttpClientService { private static final Log log = LogFactory.get(); + public static final String ERROR_MESSAGE = "-1"; /** * 在调用SSL之前需要重写验证方法,取消检测SSL @@ -254,4 +257,73 @@ public class HttpClientService { return result; } } + + /** + * GET请求 + * + * @param uri 请求地 + * @return message + */ + public String httpGet(URI uri, int socketTimeout, Header... headers) { + String msg = ERROR_MESSAGE; + + // 获取客户端连接对象 + CloseableHttpClient httpClient = getHttpClient(socketTimeout); + CloseableHttpResponse response = null; + + try { + log.info("http get uri {}", uri); + // 创建GET请求对象 + HttpGet httpGet = new HttpGet(uri); + + if (StringUtil.isNotEmpty(headers)) { + for (Header h : headers) { + httpGet.addHeader(h); + log.info("request header : {}", h); + } + } + // 执行请求 + response = httpClient.execute(httpGet); + int statusCode = response.getStatusLine().getStatusCode(); + // 获取响应实体 + HttpEntity entity = response.getEntity(); + // 获取响应信息 + msg = EntityUtils.toString(entity, "UTF-8"); + + if (statusCode != HttpStatus.SC_OK) { + log.error("Http get content is :{}", msg); + } + } catch (ClientProtocolException e) { + log.error("协议错误: {}", e.getMessage()); + } catch (ParseException e) { + log.error("解析错误: {}", e.getMessage()); + } catch (IOException e) { + log.error("IO错误: {}", e.getMessage()); + } finally { + if (null != response) { + try { + EntityUtils.consume(response.getEntity()); + response.close(); + } catch (IOException e) { + log.error("释放链接错误: {}", e.getMessage()); + + } + } + } + + return msg; + } + + public void setUrlWithParams(URIBuilder uriBuilder, String path, Map params) { + try { + uriBuilder.setPath(path); + if (params != null && !params.isEmpty()) { + for (Map.Entry kv : params.entrySet()) { + uriBuilder.setParameter(kv.getKey(), kv.getValue().toString()); + } + } + } catch (Exception e) { + log.error("拼接url出错,uri : {}, path : {},参数: {}", uriBuilder.toString(), path, params); + } + } } diff --git a/src/main/java/com/zdjizhi/tools/general/FileEdit.java b/src/main/java/com/zdjizhi/tools/general/FileEdit.java index 87f5d58..e8cf99a 100644 --- a/src/main/java/com/zdjizhi/tools/general/FileEdit.java +++ b/src/main/java/com/zdjizhi/tools/general/FileEdit.java @@ -12,38 +12,35 @@ import static com.zdjizhi.common.FlowWriteConfig.judgeFileType; public class FileEdit { - public static String getFileUploadUrl(long cfgId,String sIp,int sPort,String dIp,int dPort,long foundTime,String account,String domain, String urlValue,String schemaType,String fileId){ + public static String getFileUploadUrl(long cfgId, String sIp, int sPort, String dIp, int dPort, long foundTime, String account, String domain, String schemaType, String fileId) { String fileType = null; - if (judgeFileType(getFileType(urlValue))){ - fileType = getFileType(urlValue); - }else { - if (schemaType.equals("HTTP")){ - fileType = "html"; - } - if (schemaType.equals("MAIL")){ - fileType = "eml"; - } + if (schemaType.equals("HTTP")) { + fileType = "html"; + } + if (schemaType.equals("MAIL")) { + fileType = "eml"; } - return "http://"+ FlowWriteConfig.OOS_SERVERS+"/v3/upload?cfg_id="+cfgId+"&file_id="+fileId+"&file_type="+fileType+"&found_time="+foundTime+"&s_ip="+sIp+"&s_port="+sPort+"&d_ip="+dIp+"&d_port="+dPort+"&domain="+domain+"&account="+account; + + return "http://" + FlowWriteConfig.OOS_SERVERS + "/v3/upload?cfg_id=" + cfgId + "&file_id=" + fileId + "&file_type=" + fileType + "&found_time=" + foundTime + "&s_ip=" + sIp + "&s_port=" + sPort + "&d_ip=" + dIp + "&d_port=" + dPort + "&domain=" + domain + "&account=" + account; } - public static String getFileDownloadUrl(String fileId){ - return "http://"+ FlowWriteConfig.OOS_SERVERS+"/v3/download?file_id="+fileId; + public static String getFileDownloadUrl(String fileId) { + return "http://" + FlowWriteConfig.OOS_SERVERS + "/v3/download?file_id=" + fileId; } - public static String getFileType(String url){ + public static String getFileType(String url) { String[] split = url.split("\\."); - return split[split.length-1]; + return split[split.length - 1]; } - public static String getFileId(String url,String fileSuffix) throws Exception { + public static String getFileId(String filename, String fileSuffix) throws Exception { - String[] arr = url.split("/"); - String filename = arr[arr.length-1].substring(0,arr[arr.length-1].lastIndexOf("_")); - String prefix = MD5Utils.md5Encode(filename); +// String[] arr = url.split("/"); +// String filename = arr[arr.length-1].substring(0,arr[arr.length-1].lastIndexOf("_")); +// String prefix = MD5Utils.md5Encode(filename); // String suffix = arr[arr.length-1].substring(arr[arr.length-1].lastIndexOf("_"),arr[arr.length-1].lastIndexOf(".")); - return prefix+fileSuffix; + return filename + fileSuffix; } } diff --git a/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java b/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java index 9460425..57db453 100644 --- a/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java +++ b/src/main/java/com/zdjizhi/tools/general/IpLookupUtils.java @@ -4,9 +4,6 @@ import cn.hutool.crypto.digest.DigestUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.alibaba.fastjson2.*; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; -import com.alibaba.nacos.api.exception.NacosException; import com.geedgenetworks.utils.IpLookupV2; import com.geedgenetworks.utils.StringUtil; import com.google.common.base.Joiner; @@ -14,24 +11,26 @@ import com.zdjizhi.common.CommonConfig; import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.common.pojo.KnowlegeBaseMeta; import com.zdjizhi.tools.connections.http.HttpClientService; -import com.zdjizhi.tools.connections.nacos.NacosConnection; +import org.apache.http.client.utils.URIBuilder; import java.io.ByteArrayInputStream; +import java.net.URISyntaxException; import java.util.HashMap; -import java.util.concurrent.Executor; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + /** - * @author qidaijie - * @version 2022/11/16 15:23 + * @author wangchengcheng + * @version 2023/11/10 15:23 */ public class IpLookupUtils { private static final Log logger = LogFactory.get(); - private static final String ipv4BuiltInName = "ip_v4_built_in.mmdb"; - private static final String ipv6BuiltInName = "ip_v6_built_in.mmdb"; - private static final String ipv4UserDefinedName = "ip_v4_user_defined.mmdb"; - private static final String ipv6UserDefinedName = "ip_v6_user_defined.mmdb"; - private static final String asnV4Name = "asn_v4.mmdb"; - private static final String asnV6Name = "asn_v6.mmdb"; + private static final String ipBuiltInName = "ip_builtin.mmdb"; + private static final String ipUserDefinedName = "ip_user_defined.mmdb"; + + private static final String asnName = "asn_builtin.mmdb"; /** * ip定位库 @@ -58,142 +57,156 @@ public class IpLookupUtils { */ private static final HashMap knowledgeMetaCache = new HashMap<>(16); + private static String currentSha256IpUserDefined = ""; + + private static String currentSha256IpBuiltin = ""; + + + private static String currentSha256AsnBuiltin = ""; + static { - JSONPath jsonPath = JSONPath.of(getFilterParameter()); httpClientService = new HttpClientService(); - - NacosConnection nacosConnection = new NacosConnection(); - ConfigService schemaService = nacosConnection.getPublicService(); try { - String configInfo = schemaService.getConfigAndSignListener(FlowWriteConfig.NACOS_KNOWLEDGEBASE_DATA_ID, FlowWriteConfig.NACOS_PUBLIC_GROUP, FlowWriteConfig.NACOS_CONNECTION_TIMEOUT, new Listener() { + stuffKnowledgeMetaCache(); + Timer timer = new Timer(); + timer.schedule(new TimerTask() { @Override - public Executor getExecutor() { - return null; - } + public void run() { + stuffKnowledgeMetaCache(); - @Override - public void receiveConfigInfo(String configInfo) { - if (StringUtil.isNotBlank(configInfo)) { - updateIpLookup(jsonPath, configInfo); - } } - }); - - if (StringUtil.isNotBlank(configInfo)) { - updateIpLookup(jsonPath, configInfo); - } - } catch (NacosException e) { - logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage()); + }, 0, FlowWriteConfig.KNOWLEDGE_EXECUTION_MINUTES * 1000 * 60); + } catch (Exception e) { + logger.error("知识库加载失败,失败原因为:" + e); } } - private static void updateIpLookup(JSONPath jsonPath, String configInfo) { - String extract = jsonPath.extract(JSONReader.of(configInfo)).toString(); - if (StringUtil.isNotBlank(extract)) { - JSONArray jsonArray = JSON.parseArray(extract); - if (jsonArray.size() > 0) { - for (int i = 0; i < jsonArray.size(); i++) { - KnowlegeBaseMeta knowlegeBaseMeta = JSONObject.parseObject(jsonArray.getString(i), KnowlegeBaseMeta.class); - String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(knowlegeBaseMeta.getName(), knowlegeBaseMeta.getFormat()); - knowledgeMetaCache.put(fileName, knowlegeBaseMeta); - } - reloadIpLookup(); - } + + private static void stuffKnowledgeMetaCache() { + final KnowlegeBaseMeta ipBuiltinknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.IP_BUILTIN_KD_ID); + if (!currentSha256IpBuiltin.equals(ipBuiltinknowlegeBaseMeta.getSha256())) { + String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipBuiltinknowlegeBaseMeta.getName(), ipBuiltinknowlegeBaseMeta.getFormat()); + knowledgeMetaCache.put(fileName, ipBuiltinknowlegeBaseMeta); + } + final KnowlegeBaseMeta ipUserDefinedknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.IP_USER_DEFINED_KD_ID); + if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256())) { + String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipUserDefinedknowlegeBaseMeta.getName(), ipUserDefinedknowlegeBaseMeta.getFormat()); + knowledgeMetaCache.put(fileName, ipUserDefinedknowlegeBaseMeta); + } + + final KnowlegeBaseMeta asnBuiltinknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.ASN_BUILTIN_KD_ID); + if (!currentSha256AsnBuiltin.equals(asnBuiltinknowlegeBaseMeta.getSha256())) { + String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(asnBuiltinknowlegeBaseMeta.getName(), asnBuiltinknowlegeBaseMeta.getFormat()); + knowledgeMetaCache.put(fileName, asnBuiltinknowlegeBaseMeta); + } + + if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256()) || !currentSha256IpBuiltin.equals(ipBuiltinknowlegeBaseMeta.getSha256()) || !currentSha256AsnBuiltin.equals(asnBuiltinknowlegeBaseMeta.getSha256())) { + currentSha256IpBuiltin = ipBuiltinknowlegeBaseMeta.getSha256(); + currentSha256IpUserDefined = ipUserDefinedknowlegeBaseMeta.getSha256(); + currentSha256AsnBuiltin = asnBuiltinknowlegeBaseMeta.getSha256(); + reloadIpLookup(); + logger.info("知识库加载成功."); } } /** * 从HDFS下载文件更新IpLookup - * - * @return 更新后的IpLookup */ - public static void reloadIpLookup() { - int retryNum = 0; + private static void reloadIpLookup() { IpLookupV2.Builder builder = new IpLookupV2.Builder(false); for (String fileName : knowledgeMetaCache.keySet()) { + int retryNum = 0; KnowlegeBaseMeta knowlegeBaseMeta = knowledgeMetaCache.get(fileName); String metaSha256 = knowlegeBaseMeta.getSha256(); - do { + while (retryNum < TRY_TIMES) { + System.out.println("download file :" + fileName + ",HOS path :" + knowlegeBaseMeta.getPath()); + Long startTime = System.currentTimeMillis(); byte[] httpGetByte = httpClientService.httpGetByte(knowlegeBaseMeta.getPath(), FlowWriteConfig.HTTP_SOCKET_TIMEOUT); - if (httpGetByte.length > 0) { + if (httpGetByte != null && httpGetByte.length > 0) { String downloadFileSha256 = DigestUtil.sha256Hex(httpGetByte); if (metaSha256.equals(downloadFileSha256)) { ByteArrayInputStream inputStream = new ByteArrayInputStream(httpGetByte); switch (fileName) { - case ipv4BuiltInName: - builder.loadDataFileV4(inputStream); + case ipBuiltInName: + builder.loadDataFile(inputStream); break; - case ipv6BuiltInName: - builder.loadDataFileV6(inputStream); + case ipUserDefinedName: + builder.loadDataFilePrivate(inputStream); break; - case ipv4UserDefinedName: - builder.loadDataFilePrivateV4(inputStream); - break; - case ipv6UserDefinedName: - builder.loadDataFilePrivateV6(inputStream); - break; - case asnV4Name: - builder.loadAsnDataFileV4(inputStream); - break; - case asnV6Name: - builder.loadAsnDataFileV6(inputStream); + case asnName: + builder.loadAsnDataFile(inputStream); break; default: } + System.out.println("update " + fileName + " finished, speed :" + (System.currentTimeMillis() - startTime) + "ms"); retryNum = TRY_TIMES; } else { - logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256, metaSha256, retryNum); + logger.error("通过HOS下载{}的sha256为:{} ,网关内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256, metaSha256, retryNum); retryNum++; } } else { logger.error("通过HOS下载{}的流为空 ,开始第{}次重试下载文件", fileName, retryNum); retryNum++; } - } while (retryNum < TRY_TIMES); + } } ipLookup = builder.build(); } + public static IpLookupV2 getIpLookup() { + return ipLookup; + } + /** * 根据配置组合生成知识库元数据过滤参数 * * @return 过滤参数 */ private static String getFilterParameter() { - String[] typeList = CommonConfig.KNOWLEDGEBASE_TYPE_LIST.split(","); - String[] nameList = CommonConfig.KNOWLEDGEBASE_NAME_LIST.split(","); - String expr = "[?(@.version=='latest')]"; - if (typeList.length > 1) { - StringBuilder typeBuilder = new StringBuilder(); - typeBuilder.append("[?(@.type in ("); - for (int i = 0; i < typeList.length; i++) { - if (i == typeList.length - 1) { - typeBuilder.append("'").append(typeList[i]).append("'))]"); - } else { - typeBuilder.append("'").append(typeList[i]).append("',"); - } - } - expr = expr + typeBuilder; - } - - if (nameList.length > 1) { - StringBuilder nameBuilder = new StringBuilder(); - nameBuilder.append("[?(@.name in ("); - for (int i = 0; i < nameList.length; i++) { - if (i == nameList.length - 1) { - nameBuilder.append("'").append(nameList[i]).append("'))]"); - } else { - nameBuilder.append("'").append(nameList[i]).append("',"); - } - } - expr = expr + nameBuilder; - } + String expr = "[?(@.version=='latest')][?(@.name in ('ip_builtin','ip_user_defined','asn_builtin'))]"; return expr; } - public static IpLookupV2 getIpLookup() { - return ipLookup; + public static String getCountryLookup(String ip) { + return ipLookup.countryLookup(ip); + } + + private static KnowlegeBaseMeta getKnowlegeBaseMeta(String kd_id) { + KnowlegeBaseMeta knowlegeBaseMeta = null; + String knowledgeInfo = null; + try { + URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.KNOWLEDGE_BASE_URL); + HashMap parms = new HashMap<>(); + parms.put("kb_id", kd_id); + httpClientService.setUrlWithParams(uriBuilder, FlowWriteConfig.KNOWLEDGE_BASE_PATH, parms); + knowledgeInfo = httpClientService.httpGet(uriBuilder.build(), FlowWriteConfig.HTTP_SOCKET_TIMEOUT); + if (knowledgeInfo.contains("200")) { + final Map jsonObject = JSONObject.parseObject(knowledgeInfo, Map.class); + JSONPath jsonPath = JSONPath.of(getFilterParameter()); + String extract = jsonPath.extract(JSONReader.of(jsonObject.get("data").toString())).toString(); + if (StringUtil.isNotBlank(extract)) { + JSONArray jsonArray = JSON.parseArray(extract); + if (jsonArray.size() > 0) { + for (int i = 0; i < jsonArray.size(); i++) { + knowlegeBaseMeta = JSONObject.parseObject(jsonArray.getString(i), KnowlegeBaseMeta.class); + } + } + } + } else { + logger.error("获取knowledge_base失败,请求回执为" + knowledgeInfo); + } + } catch (URISyntaxException e) { + logger.error("构造URI异常", e); + } catch (Exception e) { + logger.error("获取knowledge_base失败", e); + } + return knowlegeBaseMeta; + } + + public static void main(String[] args) { + final String countryLookup = IpLookupUtils.getIpLookup().asnLookup("10.64.10.7"); + System.out.println(countryLookup); } } diff --git a/src/main/java/com/zdjizhi/tools/json/MetaUtil.java b/src/main/java/com/zdjizhi/tools/json/MetaUtil.java index a6ace67..e619ff4 100644 --- a/src/main/java/com/zdjizhi/tools/json/MetaUtil.java +++ b/src/main/java/com/zdjizhi/tools/json/MetaUtil.java @@ -143,15 +143,17 @@ public class MetaUtil { for (Map.Entry entry : jsonObject.entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); - if (schemaFieldsTypeMap.containsKey(key)) { - try { - Class schemaFieldClass = schemaFieldsTypeMap.get(key); - if (schemaFieldClass != value.getClass()) { - String simpleName = schemaFieldClass.getSimpleName(); - DataTypeCheck.typeConverter(jsonObject, key, value, simpleName); + if (value != null) { + if (schemaFieldsTypeMap.containsKey(key)) { + try { + Class schemaFieldClass = schemaFieldsTypeMap.get(key); + if (schemaFieldClass != value.getClass()) { + String simpleName = schemaFieldClass.getSimpleName(); + DataTypeCheck.typeConverter(jsonObject, key, value, simpleName); + } + } catch (RuntimeException e) { + logger.error("The {} field type conversion is abnormal! message is:", key, e); } - } catch (RuntimeException e) { - logger.error("The {} field type conversion is abnormal! message is:", key, e); } } } diff --git a/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java b/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java new file mode 100644 index 0000000..e6dc4c9 --- /dev/null +++ b/src/main/java/com/zdjizhi/tools/logtransformation/ConvertRecordToPERCENT.java @@ -0,0 +1,125 @@ +package com.zdjizhi.tools.logtransformation; + +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; + +import java.util.*; + +public class ConvertRecordToPERCENT { + private Properties securityProp; + private HashMap recordSchema; + + public ConvertRecordToPERCENT(Properties securityProp) { + this.securityProp = securityProp; + final HashMap schemaMap = new HashMap(); + for (String key : securityProp.stringPropertyNames()) { + final String schema = securityProp.getProperty(key); + schemaMap.put(key, schema); + } + this.recordSchema = schemaMap; + } + + public JSONObject convertToPERCENT(JSONObject record) { + final JSONObject percent = new JSONObject(); + for (Map.Entry entry : record.entrySet()) { + if (recordSchema.containsKey(entry.getKey())) { + percent.put(recordSchema.get(entry.getKey()), entry.getValue()); + } + } + + if (record.containsKey("security_rule_list")) { + percent.put("common_policy_id", (Integer) JSONArray.from(record.get("security_rule_list")).get(0)); + percent.put("common_action", fillingCommonAction((String) record.get("security_action"))); + } + + if (record.containsKey("monitor_rule_list")) { + percent.put("common_policy_id", (Integer) JSONArray.from(record.get("monitor_rule_list")).get(0)); + percent.put("common_action", 1); + } + + if (record.containsKey("proxy_rule_list")){ + percent.put("common_policy_id", (Integer) JSONArray.from(record.get("proxy_rule_list")).get(0)); + percent.put("common_action", fillingCommonAction((String) record.get("proxy_action"))); + } + + //填充common_start_time、common_end_time + percent.put("common_start_time",record.get("start_timestamp_ms")); + percent.put("common_end_time",record.get("end_timestamp_ms")); + + //填充common_internal_ip、common_external_ip、common_direction、common_stream_dir + if (record.containsKey("flags")) { + final int flags = (int) record.get("flags"); + if (flags > 0) { + if ((8L & flags) == 8L && (16L & flags) != 16L) { + percent.put("common_internal_ip", record.get("common_client_ip")); + percent.put("common_external_ip", record.get("common_server_ip")); + percent.put("common_direction", 69); + } else if ((8L & flags) != 8L && (16L & flags) == 16L) { + percent.put("common_internal_ip", record.get("common_server_ip")); + percent.put("common_external_ip", record.get("common_client_ip")); + percent.put("common_direction", 73); + } + if ((8192L & flags) == 8192L && (16384L & flags) == 16384L) { + percent.put("common_stream_dir", 3); + } else if ((8192L & flags) == 8192L) { + percent.put("common_stream_dir", 1); + } else if ((16384L & flags) == 16384L) { + percent.put("common_stream_dir", 2); + } + } + } + return percent; + } + + + private int fillingCommonAction(String action) { + int number = 0; + switch (action) { + case "none": + number = 0; + break; + case "Monitor": + number = 1; + break; + case "Intercept": + number = 2; + break; + case "No Intercept": + number = 3; + break; + case "Active Defence": + number = 4; + break; + case "WAN NAT": + number = 8; + break; + case "Reject": + case "Deny": + number = 16; + break; + case "Shaping": + number = 32; + break; + case "Manipulate": + number = 48; + break; + case "Service Chaining": + number = 64; + break; + case "Allow": + case "Bypass": + number = 96; + break; + case "Shunt": + number = 128; + break; + case "Statistics": + number = 129; + break; + default: + number = 0; + } + return number; + } + +} diff --git a/src/test/java/com/zdjizhi/schema/SecurityEventSchema.java b/src/test/java/com/zdjizhi/schema/SecurityEventSchema.java new file mode 100644 index 0000000..97a3343 --- /dev/null +++ b/src/test/java/com/zdjizhi/schema/SecurityEventSchema.java @@ -0,0 +1,22 @@ +package com.zdjizhi.schema; + +import com.zdjizhi.tools.general.ConfigurationsUtils; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Properties; + +public class SecurityEventSchema { + + public static void main(String[] args) throws IOException { + Properties prop = new Properties(); + prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("security_event_mapping_table.properties")); + final HashMap securityEventSchema = new HashMap<>(); + + for (String key : prop.stringPropertyNames()) { + final String schema = prop.getProperty(key); + securityEventSchema.put(key,schema); + } + System.out.println(securityEventSchema); + } +} diff --git a/src/test/java/com/zdjizhi/schema/Test.java b/src/test/java/com/zdjizhi/schema/Test.java new file mode 100644 index 0000000..46f837b --- /dev/null +++ b/src/test/java/com/zdjizhi/schema/Test.java @@ -0,0 +1,7 @@ +package com.zdjizhi.schema; + +public class Test { + public static void main(String[] args) { + System.out.println(26286 & 2132321); + } +}