1.更新为日志补全11版本
2.完善文件名后缀种类
This commit is contained in:
@@ -1,123 +0,0 @@
|
||||
package com.zdjizhi.utils.functions;
|
||||
|
||||
|
||||
import cn.hutool.log.Log;
|
||||
import cn.hutool.log.LogFactory;
|
||||
import com.alibaba.fastjson.JSONArray;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.zdjizhi.bean.FileMeta;
|
||||
|
||||
|
||||
import com.zdjizhi.bean.SourceList;
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import com.zdjizhi.utils.general.FileEdit;
|
||||
import com.zdjizhi.utils.json.JsonTypeUtils;
|
||||
import org.apache.flink.streaming.api.functions.ProcessFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.apache.flink.util.OutputTag;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
/**
|
||||
* @author wangchengcheng
|
||||
* @Package com.zdjizhi.utils.functions
|
||||
* @Description:
|
||||
* @date 2021/10/14
|
||||
*/
|
||||
public class DealFileProcessFunction extends ProcessFunction<String,String> {
|
||||
private static final Log logger = LogFactory.get();
|
||||
|
||||
private Map<String, Object> jsonMap = null;
|
||||
private String rpUrlValue;
|
||||
private String rqUrlValue;
|
||||
private String emailUrlValue;
|
||||
|
||||
private long cfgId = 0; //= common_policy_id;
|
||||
|
||||
private String sIp = null; // = common_client_ip;
|
||||
private int sPort = 0;// = common_client_port;
|
||||
private String dIp = null;//= common_server_ip;
|
||||
private int dPort = 0;// = common_server_port;
|
||||
private long foundTime = 0;// = common_recv_time;
|
||||
private String account = null;
|
||||
private String domain = null;
|
||||
private String schemaType = null;
|
||||
|
||||
|
||||
//初始化侧输流的标记
|
||||
public static OutputTag<String> metaToKafa = new OutputTag<String>("metaToKafka") {};
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void processElement(String message, Context context, Collector<String> collector) throws Exception {
|
||||
try {
|
||||
if (StringUtil.isNotBlank(message)) {
|
||||
Map<String, Object> map = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
|
||||
jsonMap = JsonTypeUtils.typeTransform(map);
|
||||
rpUrlValue = (String) jsonMap.get("http_response_body");
|
||||
rqUrlValue = (String) jsonMap.get("http_request_body");
|
||||
emailUrlValue = (String) jsonMap.get("mail_eml_file");
|
||||
|
||||
if (StringUtil.isNotBlank(rpUrlValue) || StringUtil.isNotBlank(rqUrlValue) || StringUtil.isNotBlank(emailUrlValue)) {
|
||||
cfgId = (long) jsonMap.get("common_policy_id");
|
||||
sIp = (String) jsonMap.get("common_client_ip");
|
||||
sPort = (int) jsonMap.get("common_client_port");
|
||||
dIp = (String) jsonMap.get("common_server_ip");
|
||||
dPort = (int) jsonMap.get("common_server_port");
|
||||
foundTime = (long) jsonMap.get("common_recv_time");
|
||||
schemaType = (String) jsonMap.get("common_schema_type");
|
||||
|
||||
if (StringUtil.isNotBlank((String) jsonMap.get("http_domain"))) {
|
||||
domain = jsonMap.get("http_domain").toString();
|
||||
} else {
|
||||
domain = "NA";
|
||||
}
|
||||
if (StringUtil.isNotBlank((String) jsonMap.get("common_subscribe_id"))) {
|
||||
account = jsonMap.get("common_subscribe_id").toString();
|
||||
} else {
|
||||
account = "NA";
|
||||
}
|
||||
|
||||
FileMeta fileMeta = new FileMeta();
|
||||
JSONArray jsonarray = new JSONArray();
|
||||
if (StringUtil.isNotBlank(rqUrlValue)) {
|
||||
jsonMap.put("http_request_body", FileEdit.dealFileUrlToPercent(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, "_1"));
|
||||
SourceList request = new SourceList();
|
||||
request.setSource_oss_path(rqUrlValue);
|
||||
request.setDestination_oss_path(FileEdit.dealFileUrlToPercent(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, "_1"));
|
||||
jsonarray.add(request);
|
||||
}
|
||||
if (StringUtil.isNotBlank(rpUrlValue)) {
|
||||
jsonMap.put("http_response_body", FileEdit.dealFileUrlToPercent(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, "_2"));
|
||||
SourceList response = new SourceList();
|
||||
response.setSource_oss_path(rpUrlValue);
|
||||
response.setDestination_oss_path(FileEdit.dealFileUrlToPercent(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, "_2"));
|
||||
jsonarray.add(response);
|
||||
}
|
||||
if (StringUtil.isNotBlank(emailUrlValue)) {
|
||||
jsonMap.put("mail_eml_file", FileEdit.dealFileUrlToPercent(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, "_9"));
|
||||
SourceList emailFile = new SourceList();
|
||||
emailFile.setSource_oss_path(emailUrlValue);
|
||||
emailFile.setDestination_oss_path(FileEdit.dealFileUrlToPercent(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, "_9"));
|
||||
jsonarray.add(emailFile);
|
||||
}
|
||||
fileMeta.setSourceList(jsonarray);
|
||||
fileMeta.setCommon_log_id((long) jsonMap.get("common_log_id"));
|
||||
fileMeta.setCommon_recv_time(Integer.parseInt(jsonMap.get("common_recv_time").toString()));
|
||||
fileMeta.setCommon_schema_type((String) jsonMap.get("common_schema_type"));
|
||||
fileMeta.setProcessing_time((int) (System.currentTimeMillis() / 1000));
|
||||
|
||||
context.output(metaToKafa, JSONObject.toJSONString(fileMeta));
|
||||
}
|
||||
collector.collect(JsonMapper.toJsonString(jsonMap));
|
||||
} else {
|
||||
collector.collect(message);
|
||||
}
|
||||
}catch (RuntimeException e) {
|
||||
logger.error("处理带有非结构结构化字段的日志出错:" + e + "\n" + message);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user