根据04版补全程序更新P19双写程序。

This commit is contained in:
wangchengcheng
2022-06-17 16:53:16 +08:00
parent a862f38b6d
commit 935dcfa702
6 changed files with 308 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
txt
html
eml
jpg
png

View File

@@ -0,0 +1,51 @@
package com.zdjizhi.bean;
import com.alibaba.fastjson.JSONArray;
/**
 * Bean describing the files referenced by one log record.
 *
 * <p>Carries the record's log id, receive time and schema type together with
 * the list of source/destination path pairs and the time the record was
 * processed. Property names deliberately use snake_case so that bean-based
 * JSON serialization produces matching keys.
 */
public class FileMeta {

    /** Identifier of the log record this metadata belongs to. */
    private long common_log_id;

    /** Receive time of the log record. */
    protected int common_recv_time;

    /** Schema type of the log record. */
    private String common_schema_type;

    /** Source/destination path entries for the files of this record. */
    private JSONArray sourceList;

    /** Time at which this metadata was produced. */
    private int processing_time;

    /** @return the log id */
    public long getCommon_log_id() {
        return this.common_log_id;
    }

    /** @param common_log_id the log id to store */
    public void setCommon_log_id(long common_log_id) {
        this.common_log_id = common_log_id;
    }

    /** @return the receive time */
    public int getCommon_recv_time() {
        return this.common_recv_time;
    }

    /** @param common_recv_time the receive time to store */
    public void setCommon_recv_time(int common_recv_time) {
        this.common_recv_time = common_recv_time;
    }

    /** @return the schema type */
    public String getCommon_schema_type() {
        return this.common_schema_type;
    }

    /** @param common_schema_type the schema type to store */
    public void setCommon_schema_type(String common_schema_type) {
        this.common_schema_type = common_schema_type;
    }

    /** @return the source/destination path entries */
    public JSONArray getSourceList() {
        return this.sourceList;
    }

    /** @param sourceList the source/destination path entries to store */
    public void setSourceList(JSONArray sourceList) {
        this.sourceList = sourceList;
    }

    /** @return the processing time */
    public int getProcessing_time() {
        return this.processing_time;
    }

    /** @param processing_time the processing time to store */
    public void setProcessing_time(int processing_time) {
        this.processing_time = processing_time;
    }
}

View File

@@ -0,0 +1,22 @@
package com.zdjizhi.bean;
/**
 * Bean pairing the original storage path of a file with the path it should
 * be written to.
 *
 * <p>Property names deliberately use snake_case so that bean-based JSON
 * serialization produces matching keys.
 */
public class SourceList {

    /** Path the file is to be written to. */
    private String destination_oss_path;

    /** Path the file currently lives at. */
    private String source_oss_path;

    /** @return the destination path, or {@code null} if none was set */
    public String getDestination_oss_path() {
        return this.destination_oss_path;
    }

    /** @param destination_oss_path the destination path to store */
    public void setDestination_oss_path(String destination_oss_path) {
        this.destination_oss_path = destination_oss_path;
    }

    /** @return the source path, or {@code null} if none was set */
    public String getSource_oss_path() {
        return this.source_oss_path;
    }

    /** @param source_oss_path the source path to store */
    public void setSource_oss_path(String source_oss_path) {
        this.source_oss_path = source_oss_path;
    }
}

View File

@@ -0,0 +1,119 @@
package com.zdjizhi.utils.functions;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.bean.FileMeta;
import com.zdjizhi.bean.SourceList;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.general.FileEdit;
import com.zdjizhi.utils.json.JsonTypeUtil;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.Map;
/**
 * Flink process function that handles the file-reference fields of a log
 * record ({@code http_request_body}, {@code http_response_body} and
 * {@code mail_eml_file}).
 *
 * <p>For every non-blank file field the value in the record is replaced with
 * a download URL, and a {@link SourceList} entry (original path plus upload
 * URL) is collected into a {@link FileMeta}. The {@link FileMeta} is emitted
 * as JSON on the {@link #metaToKafa} side output; the (possibly rewritten)
 * record itself is emitted as JSON on the main output.
 *
 * @author wangchengcheng
 * @Package com.zdjizhi.utils.functions
 * @date 2021/10/14
 */
public class DealFileProcessFunction extends ProcessFunction<Map<String, Object>, String> {
    private static final Log logger = LogFactory.get();

    /** Placeholder used when the domain or account field is blank. */
    private static final String NOT_AVAILABLE = "NA";

    /** Tag of the side output that receives the serialized {@link FileMeta}. */
    public static final OutputTag<String> metaToKafa = new OutputTag<String>("metaToKafka") {
    };

    /**
     * Rewrites the file fields of one record and emits the record plus, when
     * it references at least one file, its file metadata.
     *
     * <p>Empty records are ignored. A record whose processing raises a
     * {@link RuntimeException} is logged and dropped (nothing is emitted for
     * it); checked exceptions propagate to the caller.
     *
     * @param message   the log record; its file fields are modified in place
     * @param context   used to emit the file metadata on the side output
     * @param collector receives the record serialized as JSON
     * @throws Exception if a URL helper throws a checked exception
     */
    @Override
    public void processElement(Map<String, Object> message, Context context, Collector<String> collector) throws Exception {
        try {
            if (message.size() > 0) {
                // All per-record values are locals so nothing leaks from one record into the next.
                String rpUrlValue = (String) message.get("http_response_body");
                String rqUrlValue = (String) message.get("http_request_body");
                String emailUrlValue = (String) message.get("mail_eml_file");
                if (StringUtil.isNotBlank(rpUrlValue) || StringUtil.isNotBlank(rqUrlValue) || StringUtil.isNotBlank(emailUrlValue)) {
                    // Numeric fields are read through Number so that an Integer/Long
                    // mismatch from the upstream parser does not discard the record.
                    long cfgId = requireLong(message, "common_policy_id");
                    String sIp = (String) message.get("common_client_ip");
                    int sPort = requireInt(message, "common_client_port");
                    String dIp = (String) message.get("common_server_ip");
                    int dPort = requireInt(message, "common_server_port");
                    long foundTime = requireLong(message, "common_recv_time");
                    String schemaType = (String) message.get("common_schema_type");
                    String domain = textOrNa(message, "http_domain");
                    String account = textOrNa(message, "common_subscribe_id");

                    JSONArray jsonarray = new JSONArray();
                    if (StringUtil.isNotBlank(rqUrlValue)) {
                        message.put("http_request_body", FileEdit.fileDownloadUrl(rqUrlValue, "_1"));
                        jsonarray.add(newSource(rqUrlValue,
                                FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, "_1")));
                    }
                    if (StringUtil.isNotBlank(rpUrlValue)) {
                        message.put("http_response_body", FileEdit.fileDownloadUrl(rpUrlValue, "_2"));
                        jsonarray.add(newSource(rpUrlValue,
                                FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, "_2")));
                    }
                    if (StringUtil.isNotBlank(emailUrlValue)) {
                        message.put("mail_eml_file", FileEdit.fileDownloadUrl(emailUrlValue, "_9"));
                        jsonarray.add(newSource(emailUrlValue,
                                FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, "_9")));
                    }

                    FileMeta fileMeta = new FileMeta();
                    fileMeta.setSourceList(jsonarray);
                    fileMeta.setCommon_log_id(requireLong(message, "common_log_id"));
                    fileMeta.setCommon_recv_time(requireInt(message, "common_recv_time"));
                    fileMeta.setCommon_schema_type(schemaType);
                    // Current wall-clock time in whole seconds.
                    fileMeta.setProcessing_time((int) (System.currentTimeMillis() / 1000));
                    context.output(metaToKafa, JSONObject.toJSONString(fileMeta));
                }
                collector.collect(JsonMapper.toJsonString(message));
            }
        } catch (RuntimeException e) {
            // Pass the throwable itself so the stack trace is kept in the log.
            logger.error(e, "处理带有非结构结构化字段的日志出错:{}\n{}", e, message);
        }
    }

    /**
     * Reads a mandatory numeric field as a {@code long}, accepting any
     * {@link Number} or a value whose string form parses as a long.
     *
     * @throws IllegalArgumentException if the field is absent
     * @throws NumberFormatException    if the value is not numeric
     */
    private static long requireLong(Map<String, Object> message, String key) {
        Object value = message.get(key);
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        if (value == null) {
            throw new IllegalArgumentException("missing numeric field: " + key);
        }
        return Long.parseLong(value.toString().trim());
    }

    /**
     * Reads a mandatory numeric field as an {@code int}.
     *
     * @throws ArithmeticException if the value does not fit into an int
     */
    private static int requireInt(Map<String, Object> message, String key) {
        return Math.toIntExact(requireLong(message, key));
    }

    /** Returns the string field {@code key}, or {@code "NA"} when it is blank. */
    private static String textOrNa(Map<String, Object> message, String key) {
        String value = (String) message.get(key);
        return StringUtil.isNotBlank(value) ? value : NOT_AVAILABLE;
    }

    /** Builds a {@link SourceList} entry from a source path and its destination URL. */
    private static SourceList newSource(String sourcePath, String destinationPath) {
        SourceList entry = new SourceList();
        entry.setSource_oss_path(sourcePath);
        entry.setDestination_oss_path(destinationPath);
        return entry;
    }
}

View File

@@ -0,0 +1,47 @@
package com.zdjizhi.utils.general;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.utils.ordinary.MD5Utils;
import static com.zdjizhi.utils.system.FlowWriteConfigurations.judgeFileType;
/**
 * Utility methods for deriving upload/download URLs and file names from the
 * file path stored in a log record.
 */
public class FileEdit {

    /**
     * Builds the upload URL for a file.
     *
     * <p>The file-type path segment is the extension of {@code urlValue} when
     * {@code judgeFileType} accepts it; otherwise it is {@code "html"} for
     * schema type {@code "HTTP"} and {@code "eml"} for {@code "MAIL"}. For any
     * other schema type (including {@code null}) no type is chosen and the
     * segment is rendered as the literal text {@code null}.
     *
     * @param cfgId      policy id placed in the URL path
     * @param sIp        client IP placed in the URL path
     * @param sPort      client port placed in the URL path
     * @param dIp        server IP placed in the URL path
     * @param dPort      server port placed in the URL path
     * @param foundTime  receive time placed in the URL path
     * @param account    account placed in the URL path
     * @param domain     domain placed in the URL path
     * @param urlValue   original file path
     * @param schemaType schema type of the record, may be {@code null}
     * @param fileSuffix suffix appended to the generated file name
     * @return the upload URL
     * @throws IllegalArgumentException if {@code urlValue} has no {@code '_'}
     *                                  in its last path segment
     */
    public static String fileUploadUrl(long cfgId, String sIp, int sPort, String dIp, int dPort, long foundTime, String account, String domain, String urlValue, String schemaType, String fileSuffix) throws Exception {
        String extension = getFileType(urlValue);
        String fileType = null;
        if (judgeFileType(extension)) {
            fileType = extension;
        } else if ("HTTP".equals(schemaType)) {
            // Constant-first comparison: a null schema type must not raise a NullPointerException.
            fileType = "html";
        } else if ("MAIL".equals(schemaType)) {
            fileType = "eml";
        }
        return "http://" + FlowWriteConfig.OOS_SERVERS + "/upload_v2" + "/" + cfgId + "/" + fileType + "/" + sIp + "/" + sPort + "/" + dIp + "/" + dPort + "/" + foundTime + "/" + account + "/" + domain + "/" + getFileName(urlValue, fileSuffix);
    }

    /**
     * Builds the download URL for a file.
     *
     * @param urlValue   original file path
     * @param fileSuffix suffix appended to the generated file name
     * @return the download URL
     * @throws IllegalArgumentException if {@code urlValue} has no {@code '_'}
     *                                  in its last path segment
     */
    public static String fileDownloadUrl(String urlValue, String fileSuffix) throws Exception {
        return "http://" + FlowWriteConfig.OOS_SERVERS + "/download_v2" + "/" + getFileName(urlValue, fileSuffix);
    }

    /**
     * Returns the text after the last {@code '.'} of {@code url}, or the whole
     * string when it contains no dot.
     *
     * @param url file path or URL
     * @return the last dot-separated piece; empty when {@code url} consists
     *         only of dots
     */
    public static String getFileType(String url) {
        String[] pieces = url.split("\\.");
        // split() drops trailing empty strings, so an input such as "." yields an empty array.
        return pieces.length == 0 ? "" : pieces[pieces.length - 1];
    }

    /**
     * Derives the stored file name: the MD5 hex digest of the last path
     * segment up to (not including) its last {@code '_'}, followed by
     * {@code fileSuffix}.
     *
     * @param url        original file path
     * @param fileSuffix suffix appended to the digest
     * @return digest plus suffix
     * @throws IllegalArgumentException if the last path segment contains no
     *                                  {@code '_'}
     */
    public static String getFileName(String url, String fileSuffix) throws Exception {
        String[] segments = url.split("/");
        // An input made only of slashes splits into an empty array.
        String lastSegment = segments.length == 0 ? "" : segments[segments.length - 1];
        int underscore = lastSegment.lastIndexOf('_');
        if (underscore < 0) {
            throw new IllegalArgumentException("file path has no '_' in its last segment: " + url);
        }
        String prefix = MD5Utils.md5Encode(lastSegment.substring(0, underscore));
        return prefix + fileSuffix;
    }
}

View File

@@ -0,0 +1,64 @@
package com.zdjizhi.utils.ordinary;
import org.apache.log4j.Logger;
import java.security.MessageDigest;
/**
 * Utility class for computing MD5 digests as hexadecimal strings.
 *
 * @author Administrator
 * @create 2018-08-13 15:11
 */
public class MD5Utils {
    private static final Logger logger = Logger.getLogger(MD5Utils.class);

    /**
     * Computes the MD5 digest of the UTF-8 bytes of {@code msg}.
     *
     * <p>Any failure (for example a {@code null} argument) is logged and an
     * empty string is returned instead of propagating the error.
     *
     * @param msg text to hash
     * @return lower-case hex digest (32 characters), or {@code ""} on failure
     * @throws Exception declared for compatibility; failures are handled internally
     */
    public static String md5Encode(String msg) throws Exception {
        try {
            byte[] msgBytes = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            // digest(bytes) is update(bytes) followed by digest().
            return byteArr2hexString(md5.digest(msgBytes));
        } catch (Exception e) {
            // Log the exception itself so the cause is not lost.
            logger.error("Error in conversion MD5! " + msg, e);
            return "";
        }
    }

    /**
     * Converts a byte array to its lower-case hexadecimal representation,
     * two characters per byte.
     *
     * @param bys bytes to convert
     * @return hex string of length {@code 2 * bys.length}
     */
    public static String byteArr2hexString(byte[] bys) {
        StringBuilder hexVal = new StringBuilder(bys.length * 2);
        for (byte by : bys) {
            // Mask with 0xff so negative bytes map to 128..255 instead of sign-extending.
            int val = by & 0xff;
            if (val < 16) {
                hexVal.append('0');
            }
            hexVal.append(Integer.toHexString(val));
        }
        return hexVal.toString();
    }
}