diff --git a/properties/file_type.properties b/properties/file_type.properties new file mode 100644 index 0000000..8ffc908 --- /dev/null +++ b/properties/file_type.properties @@ -0,0 +1,5 @@ +txt +html +eml +jpg +png \ No newline at end of file diff --git a/src/main/java/com/zdjizhi/bean/FileMeta.java b/src/main/java/com/zdjizhi/bean/FileMeta.java new file mode 100644 index 0000000..e24e0b4 --- /dev/null +++ b/src/main/java/com/zdjizhi/bean/FileMeta.java @@ -0,0 +1,51 @@ +package com.zdjizhi.bean; + +import com.alibaba.fastjson.JSONArray; + +public class FileMeta { + private long common_log_id; + protected int common_recv_time; + private String common_schema_type; + private JSONArray sourceList; + private int processing_time; + + public long getCommon_log_id() { + return common_log_id; + } + + public void setCommon_log_id(long common_log_id) { + this.common_log_id = common_log_id; + } + + public int getCommon_recv_time() { + return common_recv_time; + } + + public void setCommon_recv_time(int common_recv_time) { + this.common_recv_time = common_recv_time; + } + + public String getCommon_schema_type() { + return common_schema_type; + } + + public void setCommon_schema_type(String common_schema_type) { + this.common_schema_type = common_schema_type; + } + + public JSONArray getSourceList() { + return sourceList; + } + + public void setSourceList(JSONArray sourceList) { + this.sourceList = sourceList; + } + + public int getProcessing_time() { + return processing_time; + } + + public void setProcessing_time(int processing_time) { + this.processing_time = processing_time; + } +} diff --git a/src/main/java/com/zdjizhi/bean/SourceList.java b/src/main/java/com/zdjizhi/bean/SourceList.java new file mode 100644 index 0000000..8fba85d --- /dev/null +++ b/src/main/java/com/zdjizhi/bean/SourceList.java @@ -0,0 +1,22 @@ +package com.zdjizhi.bean; + +public class SourceList { + private String destination_oss_path; + private String source_oss_path; + + public String getDestination_oss_path() { + return destination_oss_path; + } + + public void setDestination_oss_path(String destination_oss_path) { + this.destination_oss_path = destination_oss_path; + } + + public String getSource_oss_path() { + return source_oss_path; + } + + public void setSource_oss_path(String source_oss_path) { + this.source_oss_path = source_oss_path; + } +} diff --git a/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java b/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java new file mode 100644 index 0000000..a90f2f2 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/functions/DealFileProcessFunction.java @@ -0,0 +1,119 @@ +package com.zdjizhi.utils.functions; + + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.bean.FileMeta; +import com.zdjizhi.bean.SourceList; +import com.zdjizhi.utils.JsonMapper; +import com.zdjizhi.utils.StringUtil; +import com.zdjizhi.utils.general.FileEdit; +import com.zdjizhi.utils.json.JsonTypeUtil; + +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import java.util.Map; + + +/** + * @author wangchengcheng + * @Package com.zdjizhi.utils.functions + * @Description: + * @date 2021/10/14 + */ +public class DealFileProcessFunction extends ProcessFunction, String> { + private static final Log logger = LogFactory.get(); + + private String rpUrlValue; + private String rqUrlValue; + private String emailUrlValue; + + private long cfgId = 0; //= common_policy_id; + + private String sIp = null; // = common_client_ip; + private int sPort = 0;// = common_client_port; + private String dIp = null;//= common_server_ip; + private int dPort = 0;// = common_server_port; + private long foundTime = 0;// = common_recv_time; + private String account = null; + private String domain = null; + private String schemaType = null; + + + //初始化侧输流的标记 + public static OutputTag metaToKafa = new OutputTag("metaToKafka") { + }; + + @SuppressWarnings("unchecked") + @Override + public void processElement(Map message, Context context, Collector collector) throws Exception { + try { + if (message.size() > 0) { +// jsonMap = (Map) JsonMapper.fromJsonString(message, Map.class); +// jsonMap = JsonTypeUtil.typeTransform(map); + rpUrlValue = (String) message.get("http_response_body"); + rqUrlValue = (String) message.get("http_request_body"); + emailUrlValue = (String) message.get("mail_eml_file"); + + + if (StringUtil.isNotBlank(rpUrlValue) || StringUtil.isNotBlank(rqUrlValue) || StringUtil.isNotBlank(emailUrlValue)) { + cfgId = (long) message.get("common_policy_id"); + sIp = (String) message.get("common_client_ip"); + sPort = (int) message.get("common_client_port"); + dIp = (String) message.get("common_server_ip"); + dPort = (int) message.get("common_server_port"); + foundTime = (long) message.get("common_recv_time"); + schemaType = (String) message.get("common_schema_type"); + + if (StringUtil.isNotBlank((String) message.get("http_domain"))) { + domain = message.get("http_domain").toString(); + } else { + domain = "NA"; + } + if (StringUtil.isNotBlank((String) message.get("common_subscribe_id"))) { + account = message.get("common_subscribe_id").toString(); + } else { + account = "NA"; + } + FileMeta fileMeta = new FileMeta(); + JSONArray jsonarray = new JSONArray(); + if (StringUtil.isNotBlank(rqUrlValue)) { + message.put("http_request_body", FileEdit.fileDownloadUrl(rqUrlValue, "_1")); + SourceList request = new SourceList(); + request.setSource_oss_path(rqUrlValue); + request.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, "_1")); + jsonarray.add(request); + } + if (StringUtil.isNotBlank(rpUrlValue)) { + message.put("http_response_body", FileEdit.fileDownloadUrl(rpUrlValue, "_2")); + SourceList response = new SourceList(); + response.setSource_oss_path(rpUrlValue); + response.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, "_2")); + jsonarray.add(response); + } + if (StringUtil.isNotBlank(emailUrlValue)) { + message.put("mail_eml_file", FileEdit.fileDownloadUrl(emailUrlValue, "_9")); + SourceList emailFile = new SourceList(); + emailFile.setSource_oss_path(emailUrlValue); + emailFile.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, "_9")); + jsonarray.add(emailFile); + } + fileMeta.setSourceList(jsonarray); + fileMeta.setCommon_log_id((long) message.get("common_log_id")); + fileMeta.setCommon_recv_time(Integer.parseInt(message.get("common_recv_time").toString())); + fileMeta.setCommon_schema_type((String) message.get("common_schema_type")); + fileMeta.setProcessing_time((int) (System.currentTimeMillis() / 1000)); + + context.output(metaToKafa, JSONObject.toJSONString(fileMeta)); + } + collector.collect(JsonMapper.toJsonString(message)); + } + } catch (RuntimeException e) { + logger.error("处理带有非结构结构化字段的日志出错:" + e + "\n" + message); + } + } +} diff --git a/src/main/java/com/zdjizhi/utils/general/FileEdit.java b/src/main/java/com/zdjizhi/utils/general/FileEdit.java new file mode 100644 index 0000000..8c3da79 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/general/FileEdit.java @@ -0,0 +1,47 @@ +package com.zdjizhi.utils.general; + +import com.zdjizhi.common.FlowWriteConfig; +import com.zdjizhi.utils.ordinary.MD5Utils; + +import static com.zdjizhi.utils.system.FlowWriteConfigurations.judgeFileType; + + +/** + * 文件字段操作工具 + */ +public class FileEdit { + + + public static String fileUploadUrl(long cfgId,String sIp,int sPort,String dIp,int dPort,long foundTime,String account,String domain, String urlValue,String schemaType,String fileSuffix) throws Exception { + String fileType = null; + if (judgeFileType(getFileType(urlValue))){ + fileType = getFileType(urlValue); + }else { + if (schemaType.equals("HTTP")){ + fileType = "html"; + } + if (schemaType.equals("MAIL")){ + fileType = "eml"; + } + } + return "http://"+ FlowWriteConfig.OOS_SERVERS+"/upload_v2"+"/"+cfgId+"/"+fileType+"/"+sIp+"/"+sPort+"/"+dIp+"/"+dPort+"/"+foundTime+"/"+account+"/"+domain+"/"+getFileName(urlValue,fileSuffix); + } + + public static String fileDownloadUrl( String urlValue,String fileSuffix) throws Exception { + return "http://"+ FlowWriteConfig.OOS_SERVERS+"/download_v2"+"/"+getFileName(urlValue,fileSuffix); + } + + + public static String getFileType(String url){ + String[] split = url.split("\\."); + return split[split.length-1]; + } + + public static String getFileName(String url,String fileSuffix) throws Exception { + String[] arr = url.split("/"); + String filename = arr[arr.length-1].substring(0,arr[arr.length-1].lastIndexOf("_")); + String prefix = MD5Utils.md5Encode(filename); +// String suffix = arr[arr.length-1].substring(arr[arr.length-1].lastIndexOf("_"),arr[arr.length-1].lastIndexOf(".")); + return prefix+fileSuffix; + } +} diff --git a/src/main/java/com/zdjizhi/utils/ordinary/MD5Utils.java b/src/main/java/com/zdjizhi/utils/ordinary/MD5Utils.java new file mode 100644 index 0000000..aa55951 --- /dev/null +++ b/src/main/java/com/zdjizhi/utils/ordinary/MD5Utils.java @@ -0,0 +1,64 @@ +package com.zdjizhi.utils.ordinary; + +import org.apache.log4j.Logger; + +import java.security.MessageDigest; + +/** + * 描述:转换MD5工具类 + * + * @author Administrator + * @create 2018-08-13 15:11 + */ +public class MD5Utils { + private static Logger logger = Logger.getLogger(MD5Utils.class); + + public static String md5Encode(String msg) throws Exception { + try { + byte[] msgBytes = msg.getBytes("utf-8"); + /* + * 声明使用Md5算法,获得MessaDigest对象 + */ + MessageDigest md5 = MessageDigest.getInstance("MD5"); + /* + * 使用指定的字节更新摘要 + */ + md5.update(msgBytes); + /* + * 完成哈希计算,获得密文 + */ + byte[] digest = md5.digest(); + /* + * 以上两行代码等同于 + * byte[] digest = md5.digest(msgBytes); + */ + return byteArr2hexString(digest); + } catch (Exception e) { + logger.error("Error in conversion MD5! " + msg); +// e.printStackTrace(); + return ""; + } + } + + /** + * 将byte数组转化为16进制字符串形式 + * + * @param bys 字节数组 + * @return 字符串 + */ + public static String byteArr2hexString(byte[] bys) { + StringBuffer hexVal = new StringBuffer(); + int val = 0; + for (byte by : bys) { + //将byte转化为int 如果byte是一个负数就必须要和16进制的0xff做一次与运算 + val = ((int) by) & 0xff; + if (val < 16) { + hexVal.append("0"); + } + hexVal.append(Integer.toHexString(val)); + } + + return hexVal.toString(); + + } +}