package com.zdjizhi.utils.functions;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.bean.FileMeta;
import com.zdjizhi.bean.SourceList;
import com.zdjizhi.utils.JsonMapper;
import com.zdjizhi.utils.StringUtil;
import com.zdjizhi.utils.general.FileEdit;
import com.zdjizhi.utils.json.JsonTypeUtils;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Map;

/**
 * Rewrites the file-reference fields of a JSON log record and emits a file-transfer
 * description for every record that carries at least one such reference.
 *
 * <p>For each non-blank input message the function:
 * <ol>
 *   <li>parses the message into a map and passes it through {@link JsonTypeUtils#typeTransform};</li>
 *   <li>if any of {@code http_request_body}, {@code http_response_body} or {@code mail_eml_file}
 *       is non-blank, replaces that field with {@link FileEdit#fileDownloadUrl} and records a
 *       {@link SourceList} entry (original path plus {@link FileEdit#fileUploadUrl}) in a
 *       {@link FileMeta}, which is serialized to the {@link #metaToKafa} side output;</li>
 *   <li>emits the (possibly rewritten) record as JSON on the main output.</li>
 * </ol>
 * Blank messages are forwarded unchanged. A record whose processing throws a
 * {@link RuntimeException} is logged and produces no output on either stream.
 *
 * @author wangchengcheng
 * @date 2021/10/14
 */
public class DealFileProcessFunction extends ProcessFunction<String, String> {

    private static final Log logger = LogFactory.get();

    /** Placeholder used when {@code http_domain} or {@code common_subscribe_id} is blank. */
    private static final String NOT_AVAILABLE = "NA";

    /**
     * Tag of the side output that carries the serialized {@link FileMeta}.
     * Declared as an anonymous subclass with an explicit type argument so the element
     * type is available to Flink. The field name (including its spelling) is kept
     * because code outside this file may reference it.
     */
    public static final OutputTag<String> metaToKafa = new OutputTag<String>("metaToKafka") {
    };

    /**
     * Processes one JSON log message.
     *
     * @param message   raw JSON log line; blank values are forwarded as-is
     * @param context   Flink context, used to write to the {@link #metaToKafa} side output
     * @param collector main output; receives the record re-serialized as JSON
     * @throws Exception declared by the overridden method; runtime failures are caught and logged here
     */
    @SuppressWarnings("unchecked")
    @Override
    public void processElement(String message, Context context, Collector<String> collector) throws Exception {
        try {
            if (!StringUtil.isNotBlank(message)) {
                collector.collect(message);
                return;
            }

            Map<String, Object> parsed = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
            Map<String, Object> jsonMap = JsonTypeUtils.typeTransform(parsed);

            String responseBodyPath = (String) jsonMap.get("http_response_body");
            String requestBodyPath = (String) jsonMap.get("http_request_body");
            String emailFilePath = (String) jsonMap.get("mail_eml_file");

            boolean hasFileReference = StringUtil.isNotBlank(responseBodyPath)
                    || StringUtil.isNotBlank(requestBodyPath)
                    || StringUtil.isNotBlank(emailFilePath);
            if (hasFileReference) {
                FileMeta fileMeta = buildFileMeta(jsonMap, requestBodyPath, responseBodyPath, emailFilePath);
                context.output(metaToKafa, JSONObject.toJSONString(fileMeta));
            }
            collector.collect(JsonMapper.toJsonString(jsonMap));
        } catch (RuntimeException e) {
            // Best effort: a malformed record is logged (with its stack trace) and dropped
            // instead of failing the job.
            logger.error(e, "处理带有非结构结构化字段的日志出错:" + e + "\n" + message);
        }
    }

    /**
     * Builds the {@link FileMeta} for a record and rewrites the record's file fields in place.
     *
     * @param jsonMap          parsed record; its file-reference fields are replaced with download URLs
     * @param requestBodyPath  original value of {@code http_request_body}, may be blank
     * @param responseBodyPath original value of {@code http_response_body}, may be blank
     * @param emailFilePath    original value of {@code mail_eml_file}, may be blank
     * @return metadata listing one source/destination pair per non-blank path
     * @throws IllegalArgumentException if a required numeric field is missing or not a number
     */
    private static FileMeta buildFileMeta(Map<String, Object> jsonMap,
                                          String requestBodyPath,
                                          String responseBodyPath,
                                          String emailFilePath) {
        long cfgId = requireNumber(jsonMap, "common_policy_id").longValue();
        String sIp = (String) jsonMap.get("common_client_ip");
        int sPort = requireNumber(jsonMap, "common_client_port").intValue();
        String dIp = (String) jsonMap.get("common_server_ip");
        int dPort = requireNumber(jsonMap, "common_server_port").intValue();
        long foundTime = requireNumber(jsonMap, "common_recv_time").longValue();
        String schemaType = (String) jsonMap.get("common_schema_type");
        String domain = stringOrNotAvailable(jsonMap, "http_domain");
        String account = stringOrNotAvailable(jsonMap, "common_subscribe_id");

        JSONArray sources = new JSONArray();

        if (StringUtil.isNotBlank(requestBodyPath)) {
            jsonMap.put("http_request_body", FileEdit.fileDownloadUrl(requestBodyPath, "_1"));
            SourceList request = new SourceList();
            request.setSource_oss_path(requestBodyPath);
            request.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort,
                    foundTime, account, domain, requestBodyPath, schemaType, "_1"));
            sources.add(request);
        }

        if (StringUtil.isNotBlank(responseBodyPath)) {
            jsonMap.put("http_response_body", FileEdit.fileDownloadUrl(responseBodyPath, "_2"));
            SourceList response = new SourceList();
            response.setSource_oss_path(responseBodyPath);
            response.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort,
                    foundTime, account, domain, responseBodyPath, schemaType, "_2"));
            sources.add(response);
        }

        if (StringUtil.isNotBlank(emailFilePath)) {
            jsonMap.put("mail_eml_file", FileEdit.fileDownloadUrl(emailFilePath, "_9"));
            SourceList emailFile = new SourceList();
            emailFile.setSource_oss_path(emailFilePath);
            emailFile.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort,
                    foundTime, account, domain, emailFilePath, schemaType, "_9"));
            sources.add(emailFile);
        }

        FileMeta fileMeta = new FileMeta();
        fileMeta.setSourceList(sources);
        fileMeta.setCommon_log_id(requireNumber(jsonMap, "common_log_id").longValue());
        // Parsed from the textual form so a value outside the int range is rejected, not truncated.
        fileMeta.setCommon_recv_time(Integer.parseInt(jsonMap.get("common_recv_time").toString()));
        fileMeta.setCommon_schema_type(schemaType);
        // Processing time in whole seconds since the epoch.
        fileMeta.setProcessing_time((int) (System.currentTimeMillis() / 1000));
        return fileMeta;
    }

    /**
     * Reads a numeric field without depending on the exact boxed type chosen by the JSON parser.
     *
     * @param record parsed record
     * @param key    field name
     * @return the field value as a {@link Number}
     * @throws IllegalArgumentException if the field is absent or not numeric
     */
    private static Number requireNumber(Map<String, Object> record, String key) {
        Object value = record.get(key);
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException("Field '" + key + "' is missing or not numeric: " + value);
        }
        return (Number) value;
    }

    /**
     * Reads a string field, substituting {@value #NOT_AVAILABLE} when it is blank.
     *
     * @param record parsed record
     * @param key    field name
     * @return the field value, or {@value #NOT_AVAILABLE} if it is blank
     */
    private static String stringOrNotAvailable(Map<String, Object> record, String key) {
        String value = (String) record.get(key);
        return StringUtil.isNotBlank(value) ? value : NOT_AVAILABLE;
    }
}