feat:Adapt to version 24.01 session-record

This commit is contained in:
wangchengcheng
2024-01-17 20:02:27 +08:00
parent 5c0a108393
commit 68b4805c4f
14 changed files with 642 additions and 158 deletions

View File

@@ -0,0 +1,68 @@
recv_time=common_recv_time
log_id=common_log_id
start_timestamp_ms=common_start_timestamp_ms
end_timestamp_ms=common_end_timestamp_ms
processing_time=common_processing_time
device_id=common_device_id
data_center=common_data_center
sled_ip=common_sled_ip
device_tag=common_device_tag
client_ip=common_client_ip
client_port=common_client_port
client_asn=common_client_asn
subscriber_id=common_subscriber_id
imei=common_imei
imsi=common_imsi
phone_number=common_phone_number
server_ip=common_server_ip
server_port=common_server_port
server_asn=common_server_asn
address_type=common_address_type
http_url=http_url
http_host=http_host
http_request_line=http_request_line
http_response_line=http_response_line
http_request_body=http_request_body
http_response_body=http_response_body
http_cookie=http_cookie
http_referer=http_referer
http_user_agent=http_user_agent
http_request_content_length=http_request_content_length
http_request_content_type=http_request_content_type
http_response_content_length=http_response_content_length
http_response_content_type=http_response_content_type
http_set_cookie=http_set_cookie
http_version=http_version
http_action_file_size=http_action_file_size
doh_url=doh_url
doh_host=doh_host
doh_cookie=doh_cookie
doh_referer=doh_referer
doh_user_agent=doh_user_agent
doh_version=doh_version
doh_message_id=doh_message_id
doh_qr=doh_qr
doh_opcode=doh_opcode
doh_aa=doh_aa
doh_tc=doh_tc
doh_rd=doh_rd
doh_ra=doh_ra
doh_rcode=doh_rcode
doh_qdcount=doh_qdcount
doh_ancount=doh_ancount
doh_nscount=doh_nscount
doh_arcount=doh_arcount
doh_qname=doh_qname
doh_qtype=doh_qtype
doh_qclass=doh_qclass
doh_cname=doh_cname
doh_sub=doh_sub
doh_rr=doh_rr
client_geolocation=common_client_location
server_geolocation=common_server_location
ip_protocol=common_l4_protocol
sent_bytes=common_c2s_byte_num
received_bytes=common_s2c_byte_num
decoded_as=common_schema_type
proxy_rule_list=proxy_rule_list
session_id=common_stream_trace_id

View File

@@ -0,0 +1,101 @@
recv_time=common_recv_time
log_id=common_log_id
start_timestamp_ms=common_start_timestamp_ms
end_timestamp_ms=common_end_timestamp_ms
duration_ms=common_con_duration_ms
processing_time=common_processing_time
device_id=common_device_id
data_center=common_data_center
sled_ip=common_sled_ip
device_tag=common_device_tag
client_ip=common_client_ip
client_port=common_client_port
client_asn=common_client_asn
subscriber_id=common_subscriber_id
imei=common_imei
imsi=common_imsi
phone_number=common_phone_number
server_ip=common_server_ip
server_port=common_server_port
server_asn=common_server_asn
address_type=common_address_type
http_url=http_url
http_host=http_host
http_request_body=http_request_body
http_response_body=http_response_body
http_proxy_flag=http_proxy_flag
http_sequence=http_sequence
http_cookie=http_cookie
http_referer=http_referer
http_user_agent=http_user_agent
http_request_content_length=http_request_content_length
http_request_content_type=http_request_content_type
http_response_content_length=http_response_content_length
http_response_content_type=http_response_content_type
http_set_cookie=http_set_cookie
http_version=http_version
http_response_latency_ms=http_response_latency_ms
http_session_duration_ms=http_session_duration_ms
mail_protocol_type=mail_protocol_type
mail_account=mail_account
mail_password=mail_passwd
mail_from_cmd=mail_from_cmd
mail_to_cmd=mail_to_cmd
mail_from=mail_from
mail_to=mail_to
mail_cc=mail_cc
mail_bcc=mail_bcc
mail_subject=mail_subject
mail_subject_charset=mail_subject_charset
mail_attachment_name=mail_attachment_name
mail_attachment_name_charset=mail_attachment_name_charset
mail_eml_file=mail_eml_file
dns_message_id=dns_message_id
dns_qr=dns_qr
dns_opcode=dns_opcode
dns_aa=dns_aa
dns_tc=dns_tc
dns_rd=dns_rd
dns_ra=dns_ra
dns_rcode=dns_rcode
dns_qdcount=dns_qdcount
dns_ancount=dns_ancount
dns_nscount=dns_nscount
dns_arcount=dns_arcount
dns_qname=dns_qname
dns_qtype=dns_qtype
dns_qclass=dns_qclass
dns_cname=dns_cname
dns_sub=dns_sub
dns_rr=dns_rr
quic_version=quic_version
quic_sni=quic_sni
quic_user_agent=quic_user_agent
ftp_account=ftp_account
ftp_url=ftp_url
ftp_link_type=ftp_link_type
out_link_id=common_egress_link_id
in_link_id=common_ingress_link_id
client_geolocation=common_client_location
server_geolocation=common_server_location
app=common_app_label
ip_protocol=common_l4_protocol
sent_pkts=common_c2s_pkt_num
received_pkts=common_s2c_pkt_num
sent_bytes=common_c2s_byte_num
received_bytes=common_s2c_byte_num
tcp_client_isn=common_tcp_client_isn
tcp_server_isn=common_tcp_server_isn
decoded_as=common_schema_type
session_id=common_stream_trace_id
ssl_version=ssl_version
ssl_sni=ssl_sni
ssl_san=ssl_san
ssl_cn=ssl_cn
ssl_handshake_latency_ms=ssl_con_latency_ms
ssl_ja3_hash=ssl_ja3_hash
ssl_cert_issuer=ssl_cert_issuer
ssl_cert_subject=ssl_cert_subject
security_rule_list=security_rule_list
monitor_rule_list=monitor_rule_list
tcp_handshake_latency_ms=common_establish_latency_ms

View File

@@ -1,7 +1,7 @@
#--------------------------------地址配置------------------------------# #--------------------------------地址配置------------------------------#
#管理kafka地址 #管理kafka地址
source.kafka.servers=192.168.44.12:9094 #source.kafka.servers=192.168.44.12:9094
source.kafka.servers=192.168.44.11:9094,192.168.44.14:9094,192.168.44.15:9094
#百分点输出kafka地址 #百分点输出kafka地址
percent.sink.kafka.servers=192.168.44.12:9094 percent.sink.kafka.servers=192.168.44.12:9094
#文件源数据topic输出kafka地址 #文件源数据topic输出kafka地址
@@ -18,17 +18,17 @@ tools.library=D:\\workerspace\\dat\\
#--------------------------------nacos配置------------------------------# #--------------------------------nacos配置------------------------------#
#nacos 地址 #nacos 地址
nacos.server=192.168.44.67:8848 nacos.server=192.168.44.12:8848
#schema namespace名称 #schema namespace名称
nacos.schema.namespace=f507879a-8b1b-4330-913e-83d4fcdc14bb nacos.schema.namespace=P19
#schema data id名称 #schema data id名称
nacos.schema.data.id=session_record.json nacos.schema.data.id=proxy_event.json
#--------------------------------Kafka消费/生产配置------------------------------# #--------------------------------Kafka消费/生产配置------------------------------#
#kafka 接收数据topic #kafka 接收数据topic
source.kafka.topic=test source.kafka.topic=SESSION-RECORD
sink.percent.kafka.topic=PERCENT-RECORD sink.percent.kafka.topic=PERCENT-RECORD
@@ -64,4 +64,16 @@ oos.servers=10.3.45.100:8057
#prometheus-httpserver #prometheus-httpserver
prometheus.pushgateway.address=192.168.44.12:9091 prometheus.pushgateway.address=192.168.44.12:9091
pushgateway.statistics.time=300 pushgateway.statistics.time=300
deal.file.statistics.time=60 deal.file.statistics.time=60
#------------------------------------knowledge配置------------------------------------#
knowledge.execution.minutes=600
knowledge.base.uri=http://192.168.44.67:9999
knowledge.base.path=/v1/knowledge_base
ip.user.defined.kd.id=004390bc-3135-4a6f-a492-3662ecb9e289
ip.builtin.kd.id=64af7077-eb9b-4b8f-80cf-2ceebc89bea9
asn.builtin.kd.id=f9f6bc91-2142-4673-8249-e097c00fe1ea
hos.url=http://192.168.44.67:9098/hos/traffic_file_bucket/

View File

@@ -161,4 +161,18 @@ public class FlowWriteConfig {
public static final Integer PUSHGATEWAY_STATISTICS_TIME = ConfigurationsUtils.getIntProperty(propService, "pushgateway.statistics.time"); public static final Integer PUSHGATEWAY_STATISTICS_TIME = ConfigurationsUtils.getIntProperty(propService, "pushgateway.statistics.time");
public static final Integer DEAL_FILE_STATISTICS_TIME = ConfigurationsUtils.getIntProperty(propService, "deal.file.statistics.time"); public static final Integer DEAL_FILE_STATISTICS_TIME = ConfigurationsUtils.getIntProperty(propService, "deal.file.statistics.time");
public static final String KNOWLEDGE_BASE_URL = ConfigurationsUtils.getStringProperty(propService, "knowledge.base.uri");
public static final String KNOWLEDGE_BASE_PATH = ConfigurationsUtils.getStringProperty(propService, "knowledge.base.path");
public static final String ASN_BUILTIN_KD_ID = ConfigurationsUtils.getStringProperty(propService, "asn.builtin.kd.id");
public static final String IP_BUILTIN_KD_ID = ConfigurationsUtils.getStringProperty(propService, "ip.builtin.kd.id");
public static final String IP_USER_DEFINED_KD_ID = ConfigurationsUtils.getStringProperty(propService, "ip.user.defined.kd.id");
public static final String HOS_URL = ConfigurationsUtils.getStringProperty(propService, "hos.url");
public static final Long KNOWLEDGE_EXECUTION_MINUTES = ConfigurationsUtils.getLongProperty(propService,"knowledge.execution.minutes");
} }

View File

@@ -3,43 +3,63 @@ package com.zdjizhi.operator.map;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.geedgenetworks.utils.IpLookupV2;
import com.zdjizhi.tools.general.IpLookupUtils; import com.zdjizhi.tools.general.ConfigurationsUtils;
import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT;
import com.zdjizhi.tools.transform.TransForm; import com.zdjizhi.tools.transform.TransForm;
import com.zdjizhi.tools.json.MetaUtil; import com.zdjizhi.tools.json.MetaUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import java.util.Map; import java.util.Properties;
/** /**
* @author qidaijie * @author qidaijie
* @version 2021/5/27 15:01 * @version 2021/5/27 15:01
*/ */
public class MapCompleted extends ProcessFunction<String,com.alibaba.fastjson2.JSONObject> { public class MapCompleted extends ProcessFunction<String, com.alibaba.fastjson2.JSONObject> {
private static final Log logger = LogFactory.get(); private static final Log logger = LogFactory.get();
private ConvertRecordToPERCENT securityEvnetConvert;
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
super.open(parameters); super.open(parameters);
try {
Properties securityProp = new Properties();
securityProp.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("security_event_mapping_table.properties"));
securityEvnetConvert = new ConvertRecordToPERCENT(securityProp);
logger.info("ecurity_event日志Schema加载成功");
} catch (Exception e) {
logger.error("security_event日志Schema加载失败,失败原因为:" + e);
}
} }
@Override @Override
public void processElement(String message, ProcessFunction<String, com.alibaba.fastjson2.JSONObject>.Context ctx, Collector<com.alibaba.fastjson2.JSONObject> out) throws Exception { public void processElement(String message, ProcessFunction<String, com.alibaba.fastjson2.JSONObject>.Context ctx, Collector<com.alibaba.fastjson2.JSONObject> out) {
try { try {
JSONObject jsonObject = JSONObject.parseObject(message); JSONObject record = JSONObject.parseObject(message);
jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000);
MetaUtil.dropJsonField(jsonObject); JSONObject jsonObject = null;
TransForm.transformLog(jsonObject); if (record.containsKey("security_rule_list") || record.containsKey("monitor_rule_list")) {
out.collect(jsonObject); jsonObject = securityEvnetConvert.convertToPERCENT(record);
}
if (jsonObject != null) {
jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000);
MetaUtil.dropJsonField(jsonObject);
TransForm.transformLog(jsonObject);
out.collect(jsonObject);
}
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("TransForm log failed ( The field type is not verified ),The exception is :{}\n The error Message is:{}", e, message); logger.error("TransForm log failed ( The field type is not verified ),The exception is :{}\n The error Message is:{}", e, message);
} }
} }
} }

View File

@@ -3,12 +3,16 @@ package com.zdjizhi.operator.map;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.tools.general.ConfigurationsUtils;
import com.zdjizhi.tools.json.MetaUtil; import com.zdjizhi.tools.json.MetaUtil;
import com.zdjizhi.tools.logtransformation.ConvertRecordToPERCENT;
import com.zdjizhi.tools.transform.TransForm; import com.zdjizhi.tools.transform.TransForm;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import java.util.Properties;
/** /**
@@ -17,21 +21,49 @@ import org.apache.flink.util.Collector;
*/ */
public class TypeMapCompleted extends ProcessFunction<String, JSONObject> { public class TypeMapCompleted extends ProcessFunction<String, JSONObject> {
private static final Log logger = LogFactory.get(); private static final Log logger = LogFactory.get();
private ConvertRecordToPERCENT convertRecordToPERCENT;
Properties Prop = new Properties();
@Override @Override
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
super.open(parameters); super.open(parameters);
try {
if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("security_event.json")) {
Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("security_event_mapping_table.properties"));
} else if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("proxy_event.json")) {
Prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("proxy_event_mapping_table.properties"));
}
convertRecordToPERCENT = new ConvertRecordToPERCENT(Prop);
logger.info(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + "日志Schema加载成功");
} catch (Exception e) {
logger.error(FlowWriteConfig.NACOS_SCHEMA_DATA_ID + "日志Schema加载失败,失败原因为:" + e);
}
} }
@Override @Override
public void processElement(String message, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception { public void processElement(String message, ProcessFunction<String, JSONObject>.Context ctx, Collector<JSONObject> out) {
try { try {
JSONObject jsonObject = JSONObject.parseObject(message); JSONObject record = JSONObject.parseObject(message);
jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000); JSONObject jsonObject = null;
if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("security_event.json")) {
if (record.containsKey("security_rule_list") || record.containsKey("monitor_rule_list")) {
jsonObject = convertRecordToPERCENT.convertToPERCENT(record);
}
}
TransForm.transformLog(jsonObject); if (FlowWriteConfig.NACOS_SCHEMA_DATA_ID.equals("proxy_event.json")) {
MetaUtil.typeTransform(jsonObject); if (record.containsKey("proxy_rule_list")) {
out.collect(jsonObject); jsonObject = convertRecordToPERCENT.convertToPERCENT(record);
}
}
if (jsonObject != null) {
jsonObject.put("common_ingestion_time", ctx.timestamp() / 1000);
TransForm.transformLog(jsonObject);
MetaUtil.typeTransform(jsonObject);
out.collect(jsonObject);
}
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("TransForm logs failed( The field type is verified ),The exception is :{}\n The error Message is:{}", e, message); logger.error("TransForm logs failed( The field type is verified ),The exception is :{}\n The error Message is:{}", e, message);
} }

View File

@@ -112,8 +112,8 @@ public class DealFileProcessFunction extends ProcessFunction<JSONObject, String>
String fileId = FileEdit.getFileId(rqUrlValue, "_1"); String fileId = FileEdit.getFileId(rqUrlValue, "_1");
message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId)); message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId));
SourceList request = new SourceList(); SourceList request = new SourceList();
request.setSource_oss_path(rqUrlValue); request.setSource_oss_path(FlowWriteConfig.HOS_URL+rqUrlValue);
request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, fileId)); request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId));
jsonarray.add(request); jsonarray.add(request);
httpRequestCount++; httpRequestCount++;
} }
@@ -122,8 +122,8 @@ public class DealFileProcessFunction extends ProcessFunction<JSONObject, String>
String fileId = FileEdit.getFileId(rpUrlValue, "_2"); String fileId = FileEdit.getFileId(rpUrlValue, "_2");
message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId)); message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId));
SourceList response = new SourceList(); SourceList response = new SourceList();
response.setSource_oss_path(rpUrlValue); response.setSource_oss_path(FlowWriteConfig.HOS_URL+rpUrlValue);
response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, fileId)); response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId));
jsonarray.add(response); jsonarray.add(response);
httpResponseCount++; httpResponseCount++;
} }
@@ -131,8 +131,8 @@ public class DealFileProcessFunction extends ProcessFunction<JSONObject, String>
String fileId = FileEdit.getFileId(emailUrlValue, "_9"); String fileId = FileEdit.getFileId(emailUrlValue, "_9");
message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId)); message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId));
SourceList emailFile = new SourceList(); SourceList emailFile = new SourceList();
emailFile.setSource_oss_path(emailUrlValue); emailFile.setSource_oss_path(FlowWriteConfig.HOS_URL+emailUrlValue);
emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, fileId)); emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId));
jsonarray.add(emailFile); jsonarray.add(emailFile);
mailEmlCount++; mailEmlCount++;
} }
@@ -144,7 +144,6 @@ public class DealFileProcessFunction extends ProcessFunction<JSONObject, String>
context.output(metaToKafa, JSONObject.toJSONString(fileMeta)); context.output(metaToKafa, JSONObject.toJSONString(fileMeta));
} }
collector.collect(JSONObject.toJSONString(message)); collector.collect(JSONObject.toJSONString(message));
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("处理带有非结构结构化字段的日志出错:" + e + "\n" + message); logger.error("处理带有非结构结构化字段的日志出错:" + e + "\n" + message);

View File

@@ -12,6 +12,7 @@ import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig; import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*; import org.apache.http.client.methods.*;
import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.config.Registry; import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder; import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ConnectTimeoutException; import org.apache.http.conn.ConnectTimeoutException;
@@ -35,10 +36,12 @@ import java.net.*;
import java.security.KeyManagementException; import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
import java.util.Map;
public class HttpClientService { public class HttpClientService {
private static final Log log = LogFactory.get(); private static final Log log = LogFactory.get();
public static final String ERROR_MESSAGE = "-1";
/** /**
* 在调用SSL之前需要重写验证方法取消检测SSL * 在调用SSL之前需要重写验证方法取消检测SSL
@@ -254,4 +257,73 @@ public class HttpClientService {
return result; return result;
} }
} }
/**
* GET请求
*
* @param uri 请求地
* @return message
*/
public String httpGet(URI uri, int socketTimeout, Header... headers) {
String msg = ERROR_MESSAGE;
// 获取客户端连接对象
CloseableHttpClient httpClient = getHttpClient(socketTimeout);
CloseableHttpResponse response = null;
try {
log.info("http get uri {}", uri);
// 创建GET请求对象
HttpGet httpGet = new HttpGet(uri);
if (StringUtil.isNotEmpty(headers)) {
for (Header h : headers) {
httpGet.addHeader(h);
log.info("request header : {}", h);
}
}
// 执行请求
response = httpClient.execute(httpGet);
int statusCode = response.getStatusLine().getStatusCode();
// 获取响应实体
HttpEntity entity = response.getEntity();
// 获取响应信息
msg = EntityUtils.toString(entity, "UTF-8");
if (statusCode != HttpStatus.SC_OK) {
log.error("Http get content is :{}", msg);
}
} catch (ClientProtocolException e) {
log.error("协议错误: {}", e.getMessage());
} catch (ParseException e) {
log.error("解析错误: {}", e.getMessage());
} catch (IOException e) {
log.error("IO错误: {}", e.getMessage());
} finally {
if (null != response) {
try {
EntityUtils.consume(response.getEntity());
response.close();
} catch (IOException e) {
log.error("释放链接错误: {}", e.getMessage());
}
}
}
return msg;
}
public void setUrlWithParams(URIBuilder uriBuilder, String path, Map<String, Object> params) {
try {
uriBuilder.setPath(path);
if (params != null && !params.isEmpty()) {
for (Map.Entry<String, Object> kv : params.entrySet()) {
uriBuilder.setParameter(kv.getKey(), kv.getValue().toString());
}
}
} catch (Exception e) {
log.error("拼接url出错,uri : {}, path : {},参数: {}", uriBuilder.toString(), path, params);
}
}
} }

View File

@@ -12,38 +12,35 @@ import static com.zdjizhi.common.FlowWriteConfig.judgeFileType;
public class FileEdit { public class FileEdit {
public static String getFileUploadUrl(long cfgId,String sIp,int sPort,String dIp,int dPort,long foundTime,String account,String domain, String urlValue,String schemaType,String fileId){ public static String getFileUploadUrl(long cfgId, String sIp, int sPort, String dIp, int dPort, long foundTime, String account, String domain, String schemaType, String fileId) {
String fileType = null; String fileType = null;
if (judgeFileType(getFileType(urlValue))){ if (schemaType.equals("HTTP")) {
fileType = getFileType(urlValue); fileType = "html";
}else { }
if (schemaType.equals("HTTP")){ if (schemaType.equals("MAIL")) {
fileType = "html"; fileType = "eml";
}
if (schemaType.equals("MAIL")){
fileType = "eml";
}
} }
return "http://"+ FlowWriteConfig.OOS_SERVERS+"/v3/upload?cfg_id="+cfgId+"&file_id="+fileId+"&file_type="+fileType+"&found_time="+foundTime+"&s_ip="+sIp+"&s_port="+sPort+"&d_ip="+dIp+"&d_port="+dPort+"&domain="+domain+"&account="+account;
return "http://" + FlowWriteConfig.OOS_SERVERS + "/v3/upload?cfg_id=" + cfgId + "&file_id=" + fileId + "&file_type=" + fileType + "&found_time=" + foundTime + "&s_ip=" + sIp + "&s_port=" + sPort + "&d_ip=" + dIp + "&d_port=" + dPort + "&domain=" + domain + "&account=" + account;
} }
public static String getFileDownloadUrl(String fileId){ public static String getFileDownloadUrl(String fileId) {
return "http://"+ FlowWriteConfig.OOS_SERVERS+"/v3/download?file_id="+fileId; return "http://" + FlowWriteConfig.OOS_SERVERS + "/v3/download?file_id=" + fileId;
} }
public static String getFileType(String url){ public static String getFileType(String url) {
String[] split = url.split("\\."); String[] split = url.split("\\.");
return split[split.length-1]; return split[split.length - 1];
} }
public static String getFileId(String url,String fileSuffix) throws Exception { public static String getFileId(String filename, String fileSuffix) throws Exception {
String[] arr = url.split("/"); // String[] arr = url.split("/");
String filename = arr[arr.length-1].substring(0,arr[arr.length-1].lastIndexOf("_")); // String filename = arr[arr.length-1].substring(0,arr[arr.length-1].lastIndexOf("_"));
String prefix = MD5Utils.md5Encode(filename); // String prefix = MD5Utils.md5Encode(filename);
// String suffix = arr[arr.length-1].substring(arr[arr.length-1].lastIndexOf("_"),arr[arr.length-1].lastIndexOf(".")); // String suffix = arr[arr.length-1].substring(arr[arr.length-1].lastIndexOf("_"),arr[arr.length-1].lastIndexOf("."));
return prefix+fileSuffix; return filename + fileSuffix;
} }
} }

View File

@@ -4,9 +4,6 @@ import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.*; import com.alibaba.fastjson2.*;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.geedgenetworks.utils.IpLookupV2; import com.geedgenetworks.utils.IpLookupV2;
import com.geedgenetworks.utils.StringUtil; import com.geedgenetworks.utils.StringUtil;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
@@ -14,24 +11,26 @@ import com.zdjizhi.common.CommonConfig;
import com.zdjizhi.common.FlowWriteConfig; import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.common.pojo.KnowlegeBaseMeta; import com.zdjizhi.common.pojo.KnowlegeBaseMeta;
import com.zdjizhi.tools.connections.http.HttpClientService; import com.zdjizhi.tools.connections.http.HttpClientService;
import com.zdjizhi.tools.connections.nacos.NacosConnection; import org.apache.http.client.utils.URIBuilder;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.net.URISyntaxException;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.Executor; import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
/** /**
* @author qidaijie * @author wangchengcheng
* @version 2022/11/16 15:23 * @version 2023/11/10 15:23
*/ */
public class IpLookupUtils { public class IpLookupUtils {
private static final Log logger = LogFactory.get(); private static final Log logger = LogFactory.get();
private static final String ipv4BuiltInName = "ip_v4_built_in.mmdb"; private static final String ipBuiltInName = "ip_builtin.mmdb";
private static final String ipv6BuiltInName = "ip_v6_built_in.mmdb"; private static final String ipUserDefinedName = "ip_user_defined.mmdb";
private static final String ipv4UserDefinedName = "ip_v4_user_defined.mmdb";
private static final String ipv6UserDefinedName = "ip_v6_user_defined.mmdb"; private static final String asnName = "asn_builtin.mmdb";
private static final String asnV4Name = "asn_v4.mmdb";
private static final String asnV6Name = "asn_v6.mmdb";
/** /**
* ip定位库 * ip定位库
@@ -58,142 +57,156 @@ public class IpLookupUtils {
*/ */
private static final HashMap<String, KnowlegeBaseMeta> knowledgeMetaCache = new HashMap<>(16); private static final HashMap<String, KnowlegeBaseMeta> knowledgeMetaCache = new HashMap<>(16);
private static String currentSha256IpUserDefined = "";
private static String currentSha256IpBuiltin = "";
private static String currentSha256AsnBuiltin = "";
static { static {
JSONPath jsonPath = JSONPath.of(getFilterParameter());
httpClientService = new HttpClientService(); httpClientService = new HttpClientService();
NacosConnection nacosConnection = new NacosConnection();
ConfigService schemaService = nacosConnection.getPublicService();
try { try {
String configInfo = schemaService.getConfigAndSignListener(FlowWriteConfig.NACOS_KNOWLEDGEBASE_DATA_ID, FlowWriteConfig.NACOS_PUBLIC_GROUP, FlowWriteConfig.NACOS_CONNECTION_TIMEOUT, new Listener() { stuffKnowledgeMetaCache();
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override @Override
public Executor getExecutor() { public void run() {
return null; stuffKnowledgeMetaCache();
}
@Override
public void receiveConfigInfo(String configInfo) {
if (StringUtil.isNotBlank(configInfo)) {
updateIpLookup(jsonPath, configInfo);
}
} }
}); }, 0, FlowWriteConfig.KNOWLEDGE_EXECUTION_MINUTES * 1000 * 60);
} catch (Exception e) {
if (StringUtil.isNotBlank(configInfo)) { logger.error("知识库加载失败,失败原因为:" + e);
updateIpLookup(jsonPath, configInfo);
}
} catch (NacosException e) {
logger.error("Get Schema config from Nacos error,The exception message is :" + e.getMessage());
} }
} }
private static void updateIpLookup(JSONPath jsonPath, String configInfo) {
String extract = jsonPath.extract(JSONReader.of(configInfo)).toString(); private static void stuffKnowledgeMetaCache() {
if (StringUtil.isNotBlank(extract)) { final KnowlegeBaseMeta ipBuiltinknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.IP_BUILTIN_KD_ID);
JSONArray jsonArray = JSON.parseArray(extract); if (!currentSha256IpBuiltin.equals(ipBuiltinknowlegeBaseMeta.getSha256())) {
if (jsonArray.size() > 0) { String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipBuiltinknowlegeBaseMeta.getName(), ipBuiltinknowlegeBaseMeta.getFormat());
for (int i = 0; i < jsonArray.size(); i++) { knowledgeMetaCache.put(fileName, ipBuiltinknowlegeBaseMeta);
KnowlegeBaseMeta knowlegeBaseMeta = JSONObject.parseObject(jsonArray.getString(i), KnowlegeBaseMeta.class); }
String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(knowlegeBaseMeta.getName(), knowlegeBaseMeta.getFormat()); final KnowlegeBaseMeta ipUserDefinedknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.IP_USER_DEFINED_KD_ID);
knowledgeMetaCache.put(fileName, knowlegeBaseMeta); if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256())) {
} String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(ipUserDefinedknowlegeBaseMeta.getName(), ipUserDefinedknowlegeBaseMeta.getFormat());
reloadIpLookup(); knowledgeMetaCache.put(fileName, ipUserDefinedknowlegeBaseMeta);
} }
final KnowlegeBaseMeta asnBuiltinknowlegeBaseMeta = getKnowlegeBaseMeta(FlowWriteConfig.ASN_BUILTIN_KD_ID);
if (!currentSha256AsnBuiltin.equals(asnBuiltinknowlegeBaseMeta.getSha256())) {
String fileName = Joiner.on(LOCATION_SEPARATOR).useForNull("").join(asnBuiltinknowlegeBaseMeta.getName(), asnBuiltinknowlegeBaseMeta.getFormat());
knowledgeMetaCache.put(fileName, asnBuiltinknowlegeBaseMeta);
}
if (!currentSha256IpUserDefined.equals(ipUserDefinedknowlegeBaseMeta.getSha256()) || !currentSha256IpBuiltin.equals(ipBuiltinknowlegeBaseMeta.getSha256()) || !currentSha256AsnBuiltin.equals(asnBuiltinknowlegeBaseMeta.getSha256())) {
currentSha256IpBuiltin = ipBuiltinknowlegeBaseMeta.getSha256();
currentSha256IpUserDefined = ipUserDefinedknowlegeBaseMeta.getSha256();
currentSha256AsnBuiltin = asnBuiltinknowlegeBaseMeta.getSha256();
reloadIpLookup();
logger.info("知识库加载成功.");
} }
} }
/** /**
* 从HDFS下载文件更新IpLookup * 从HDFS下载文件更新IpLookup
*
* @return 更新后的IpLookup
*/ */
public static void reloadIpLookup() { private static void reloadIpLookup() {
int retryNum = 0;
IpLookupV2.Builder builder = new IpLookupV2.Builder(false); IpLookupV2.Builder builder = new IpLookupV2.Builder(false);
for (String fileName : knowledgeMetaCache.keySet()) { for (String fileName : knowledgeMetaCache.keySet()) {
int retryNum = 0;
KnowlegeBaseMeta knowlegeBaseMeta = knowledgeMetaCache.get(fileName); KnowlegeBaseMeta knowlegeBaseMeta = knowledgeMetaCache.get(fileName);
String metaSha256 = knowlegeBaseMeta.getSha256(); String metaSha256 = knowlegeBaseMeta.getSha256();
do { while (retryNum < TRY_TIMES) {
System.out.println("download file " + fileName + ",HOS path :" + knowlegeBaseMeta.getPath());
Long startTime = System.currentTimeMillis();
byte[] httpGetByte = httpClientService.httpGetByte(knowlegeBaseMeta.getPath(), FlowWriteConfig.HTTP_SOCKET_TIMEOUT); byte[] httpGetByte = httpClientService.httpGetByte(knowlegeBaseMeta.getPath(), FlowWriteConfig.HTTP_SOCKET_TIMEOUT);
if (httpGetByte.length > 0) { if (httpGetByte != null && httpGetByte.length > 0) {
String downloadFileSha256 = DigestUtil.sha256Hex(httpGetByte); String downloadFileSha256 = DigestUtil.sha256Hex(httpGetByte);
if (metaSha256.equals(downloadFileSha256)) { if (metaSha256.equals(downloadFileSha256)) {
ByteArrayInputStream inputStream = new ByteArrayInputStream(httpGetByte); ByteArrayInputStream inputStream = new ByteArrayInputStream(httpGetByte);
switch (fileName) { switch (fileName) {
case ipv4BuiltInName: case ipBuiltInName:
builder.loadDataFileV4(inputStream); builder.loadDataFile(inputStream);
break; break;
case ipv6BuiltInName: case ipUserDefinedName:
builder.loadDataFileV6(inputStream); builder.loadDataFilePrivate(inputStream);
break; break;
case ipv4UserDefinedName: case asnName:
builder.loadDataFilePrivateV4(inputStream); builder.loadAsnDataFile(inputStream);
break;
case ipv6UserDefinedName:
builder.loadDataFilePrivateV6(inputStream);
break;
case asnV4Name:
builder.loadAsnDataFileV4(inputStream);
break;
case asnV6Name:
builder.loadAsnDataFileV6(inputStream);
break; break;
default: default:
} }
System.out.println("update " + fileName + " finished, speed :" + (System.currentTimeMillis() - startTime) + "ms");
retryNum = TRY_TIMES; retryNum = TRY_TIMES;
} else { } else {
logger.error("通过HOS下载{}的sha256为:{} ,Nacos内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256, metaSha256, retryNum); logger.error("通过HOS下载{}的sha256为:{} ,网关内记录为:{} ,sha256不相等 开始第{}次重试下载文件", fileName, downloadFileSha256, metaSha256, retryNum);
retryNum++; retryNum++;
} }
} else { } else {
logger.error("通过HOS下载{}的流为空 ,开始第{}次重试下载文件", fileName, retryNum); logger.error("通过HOS下载{}的流为空 ,开始第{}次重试下载文件", fileName, retryNum);
retryNum++; retryNum++;
} }
} while (retryNum < TRY_TIMES); }
} }
ipLookup = builder.build(); ipLookup = builder.build();
} }
public static IpLookupV2 getIpLookup() {
return ipLookup;
}
/** /**
* 根据配置组合生成知识库元数据过滤参数 * 根据配置组合生成知识库元数据过滤参数
* *
* @return 过滤参数 * @return 过滤参数
*/ */
private static String getFilterParameter() { private static String getFilterParameter() {
String[] typeList = CommonConfig.KNOWLEDGEBASE_TYPE_LIST.split(",");
String[] nameList = CommonConfig.KNOWLEDGEBASE_NAME_LIST.split(",");
String expr = "[?(@.version=='latest')]";
if (typeList.length > 1) { String expr = "[?(@.version=='latest')][?(@.name in ('ip_builtin','ip_user_defined','asn_builtin'))]";
StringBuilder typeBuilder = new StringBuilder();
typeBuilder.append("[?(@.type in (");
for (int i = 0; i < typeList.length; i++) {
if (i == typeList.length - 1) {
typeBuilder.append("'").append(typeList[i]).append("'))]");
} else {
typeBuilder.append("'").append(typeList[i]).append("',");
}
}
expr = expr + typeBuilder;
}
if (nameList.length > 1) {
StringBuilder nameBuilder = new StringBuilder();
nameBuilder.append("[?(@.name in (");
for (int i = 0; i < nameList.length; i++) {
if (i == nameList.length - 1) {
nameBuilder.append("'").append(nameList[i]).append("'))]");
} else {
nameBuilder.append("'").append(nameList[i]).append("',");
}
}
expr = expr + nameBuilder;
}
return expr; return expr;
} }
public static IpLookupV2 getIpLookup() { public static String getCountryLookup(String ip) {
return ipLookup; return ipLookup.countryLookup(ip);
}
private static KnowlegeBaseMeta getKnowlegeBaseMeta(String kd_id) {
KnowlegeBaseMeta knowlegeBaseMeta = null;
String knowledgeInfo = null;
try {
URIBuilder uriBuilder = new URIBuilder(FlowWriteConfig.KNOWLEDGE_BASE_URL);
HashMap<String, Object> parms = new HashMap<>();
parms.put("kb_id", kd_id);
httpClientService.setUrlWithParams(uriBuilder, FlowWriteConfig.KNOWLEDGE_BASE_PATH, parms);
knowledgeInfo = httpClientService.httpGet(uriBuilder.build(), FlowWriteConfig.HTTP_SOCKET_TIMEOUT);
if (knowledgeInfo.contains("200")) {
final Map<String, Object> jsonObject = JSONObject.parseObject(knowledgeInfo, Map.class);
JSONPath jsonPath = JSONPath.of(getFilterParameter());
String extract = jsonPath.extract(JSONReader.of(jsonObject.get("data").toString())).toString();
if (StringUtil.isNotBlank(extract)) {
JSONArray jsonArray = JSON.parseArray(extract);
if (jsonArray.size() > 0) {
for (int i = 0; i < jsonArray.size(); i++) {
knowlegeBaseMeta = JSONObject.parseObject(jsonArray.getString(i), KnowlegeBaseMeta.class);
}
}
}
} else {
logger.error("获取knowledge_base失败,请求回执为" + knowledgeInfo);
}
} catch (URISyntaxException e) {
logger.error("构造URI异常", e);
} catch (Exception e) {
logger.error("获取knowledge_base失败", e);
}
return knowlegeBaseMeta;
}
public static void main(String[] args) {
final String countryLookup = IpLookupUtils.getIpLookup().asnLookup("10.64.10.7");
System.out.println(countryLookup);
} }
} }

View File

@@ -143,15 +143,17 @@ public class MetaUtil {
for (Map.Entry<String, Object> entry : jsonObject.entrySet()) { for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
String key = entry.getKey(); String key = entry.getKey();
Object value = entry.getValue(); Object value = entry.getValue();
if (schemaFieldsTypeMap.containsKey(key)) { if (value != null) {
try { if (schemaFieldsTypeMap.containsKey(key)) {
Class<?> schemaFieldClass = schemaFieldsTypeMap.get(key); try {
if (schemaFieldClass != value.getClass()) { Class<?> schemaFieldClass = schemaFieldsTypeMap.get(key);
String simpleName = schemaFieldClass.getSimpleName(); if (schemaFieldClass != value.getClass()) {
DataTypeCheck.typeConverter(jsonObject, key, value, simpleName); String simpleName = schemaFieldClass.getSimpleName();
DataTypeCheck.typeConverter(jsonObject, key, value, simpleName);
}
} catch (RuntimeException e) {
logger.error("The {} field type conversion is abnormal! message is:", key, e);
} }
} catch (RuntimeException e) {
logger.error("The {} field type conversion is abnormal! message is:", key, e);
} }
} }
} }

View File

@@ -0,0 +1,125 @@
package com.zdjizhi.tools.logtransformation;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import java.util.*;
public class ConvertRecordToPERCENT {
private Properties securityProp;
private HashMap<String, String> recordSchema;
public ConvertRecordToPERCENT(Properties securityProp) {
this.securityProp = securityProp;
final HashMap<String, String> schemaMap = new HashMap<String, String>();
for (String key : securityProp.stringPropertyNames()) {
final String schema = securityProp.getProperty(key);
schemaMap.put(key, schema);
}
this.recordSchema = schemaMap;
}
public JSONObject convertToPERCENT(JSONObject record) {
final JSONObject percent = new JSONObject();
for (Map.Entry<String, Object> entry : record.entrySet()) {
if (recordSchema.containsKey(entry.getKey())) {
percent.put(recordSchema.get(entry.getKey()), entry.getValue());
}
}
if (record.containsKey("security_rule_list")) {
percent.put("common_policy_id", (Integer) JSONArray.from(record.get("security_rule_list")).get(0));
percent.put("common_action", fillingCommonAction((String) record.get("security_action")));
}
if (record.containsKey("monitor_rule_list")) {
percent.put("common_policy_id", (Integer) JSONArray.from(record.get("monitor_rule_list")).get(0));
percent.put("common_action", 1);
}
if (record.containsKey("proxy_rule_list")){
percent.put("common_policy_id", (Integer) JSONArray.from(record.get("proxy_rule_list")).get(0));
percent.put("common_action", fillingCommonAction((String) record.get("proxy_action")));
}
//填充common_start_time、common_end_time
percent.put("common_start_time",record.get("start_timestamp_ms"));
percent.put("common_end_time",record.get("end_timestamp_ms"));
//填充common_internal_ip、common_external_ip、common_direction、common_stream_dir
if (record.containsKey("flags")) {
final int flags = (int) record.get("flags");
if (flags > 0) {
if ((8L & flags) == 8L && (16L & flags) != 16L) {
percent.put("common_internal_ip", record.get("common_client_ip"));
percent.put("common_external_ip", record.get("common_server_ip"));
percent.put("common_direction", 69);
} else if ((8L & flags) != 8L && (16L & flags) == 16L) {
percent.put("common_internal_ip", record.get("common_server_ip"));
percent.put("common_external_ip", record.get("common_client_ip"));
percent.put("common_direction", 73);
}
if ((8192L & flags) == 8192L && (16384L & flags) == 16384L) {
percent.put("common_stream_dir", 3);
} else if ((8192L & flags) == 8192L) {
percent.put("common_stream_dir", 1);
} else if ((16384L & flags) == 16384L) {
percent.put("common_stream_dir", 2);
}
}
}
return percent;
}
private int fillingCommonAction(String action) {
int number = 0;
switch (action) {
case "none":
number = 0;
break;
case "Monitor":
number = 1;
break;
case "Intercept":
number = 2;
break;
case "No Intercept":
number = 3;
break;
case "Active Defence":
number = 4;
break;
case "WAN NAT":
number = 8;
break;
case "Reject":
case "Deny":
number = 16;
break;
case "Shaping":
number = 32;
break;
case "Manipulate":
number = 48;
break;
case "Service Chaining":
number = 64;
break;
case "Allow":
case "Bypass":
number = 96;
break;
case "Shunt":
number = 128;
break;
case "Statistics":
number = 129;
break;
default:
number = 0;
}
return number;
}
}

View File

@@ -0,0 +1,22 @@
package com.zdjizhi.schema;
import com.zdjizhi.tools.general.ConfigurationsUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Properties;
public class SecurityEventSchema {
public static void main(String[] args) throws IOException {
Properties prop = new Properties();
prop.load(ConfigurationsUtils.class.getClassLoader().getResourceAsStream("security_event_mapping_table.properties"));
final HashMap<String, String> securityEventSchema = new HashMap<>();
for (String key : prop.stringPropertyNames()) {
final String schema = prop.getProperty(key);
securityEventSchema.put(key,schema);
}
System.out.println(securityEventSchema);
}
}

View File

@@ -0,0 +1,7 @@
package com.zdjizhi.schema;
public class Test {
public static void main(String[] args) {
System.out.println(26286 & 2132321);
}
}