1.适配百分点OSS V3接口(GAL-384)。
2.将FileMeta的属性sourceList改为source_list。
This commit is contained in:
@@ -10,7 +10,6 @@ import com.zdjizhi.bean.SourceList;
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
import com.zdjizhi.utils.StringUtil;
|
||||
import com.zdjizhi.utils.general.FileEdit;
|
||||
import com.zdjizhi.utils.json.JsonTypeUtil;
|
||||
|
||||
import org.apache.flink.streaming.api.functions.ProcessFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
@@ -53,56 +52,50 @@ public class DealFileProcessFunction extends ProcessFunction<Map<String, Object>
|
||||
public void processElement(Map<String, Object> message, Context context, Collector<String> collector) throws Exception {
|
||||
try {
|
||||
if (message.size() > 0) {
|
||||
// jsonMap = (Map<String, Object>) JsonMapper.fromJsonString(message, Map.class);
|
||||
// jsonMap = JsonTypeUtil.typeTransform(map);
|
||||
|
||||
rpUrlValue = (String) message.get("http_response_body");
|
||||
rqUrlValue = (String) message.get("http_request_body");
|
||||
emailUrlValue = (String) message.get("mail_eml_file");
|
||||
|
||||
|
||||
if (StringUtil.isNotBlank(rpUrlValue) || StringUtil.isNotBlank(rqUrlValue) || StringUtil.isNotBlank(emailUrlValue)) {
|
||||
cfgId = (long) message.get("common_policy_id");
|
||||
cfgId = (long) message.getOrDefault("common_policy_id",0L);
|
||||
sIp = (String) message.get("common_client_ip");
|
||||
sPort = (int) message.get("common_client_port");
|
||||
dIp = (String) message.get("common_server_ip");
|
||||
dPort = (int) message.get("common_server_port");
|
||||
foundTime = (long) message.get("common_recv_time");
|
||||
schemaType = (String) message.get("common_schema_type");
|
||||
domain = (String)message.getOrDefault("http_domain","");
|
||||
account = (String)message.getOrDefault("common_subscribe_id","");
|
||||
|
||||
if (StringUtil.isNotBlank((String) message.get("http_domain"))) {
|
||||
domain = message.get("http_domain").toString();
|
||||
} else {
|
||||
domain = "NA";
|
||||
}
|
||||
if (StringUtil.isNotBlank((String) message.get("common_subscribe_id"))) {
|
||||
account = message.get("common_subscribe_id").toString();
|
||||
} else {
|
||||
account = "NA";
|
||||
}
|
||||
FileMeta fileMeta = new FileMeta();
|
||||
JSONArray jsonarray = new JSONArray();
|
||||
if (StringUtil.isNotBlank(rqUrlValue)) {
|
||||
message.put("http_request_body", FileEdit.fileDownloadUrl(rqUrlValue, "_1"));
|
||||
System.out.println(rqUrlValue);
|
||||
String fileId = FileEdit.getFileId(rqUrlValue,"_1");
|
||||
message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId));
|
||||
SourceList request = new SourceList();
|
||||
request.setSource_oss_path(rqUrlValue);
|
||||
request.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, "_1"));
|
||||
request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rqUrlValue, schemaType, fileId));
|
||||
jsonarray.add(request);
|
||||
}
|
||||
if (StringUtil.isNotBlank(rpUrlValue)) {
|
||||
message.put("http_response_body", FileEdit.fileDownloadUrl(rpUrlValue, "_2"));
|
||||
String fileId = FileEdit.getFileId(rpUrlValue,"_2");
|
||||
message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId));
|
||||
SourceList response = new SourceList();
|
||||
response.setSource_oss_path(rpUrlValue);
|
||||
response.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, "_2"));
|
||||
response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, rpUrlValue, schemaType, fileId));
|
||||
jsonarray.add(response);
|
||||
}
|
||||
if (StringUtil.isNotBlank(emailUrlValue)) {
|
||||
message.put("mail_eml_file", FileEdit.fileDownloadUrl(emailUrlValue, "_9"));
|
||||
String fileId = FileEdit.getFileId(emailUrlValue,"_9");
|
||||
message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId));
|
||||
SourceList emailFile = new SourceList();
|
||||
emailFile.setSource_oss_path(emailUrlValue);
|
||||
emailFile.setDestination_oss_path(FileEdit.fileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, "_9"));
|
||||
emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, emailUrlValue, schemaType, fileId));
|
||||
jsonarray.add(emailFile);
|
||||
}
|
||||
fileMeta.setSourceList(jsonarray);
|
||||
fileMeta.setSource_list(jsonarray);
|
||||
fileMeta.setCommon_log_id((long) message.get("common_log_id"));
|
||||
fileMeta.setCommon_recv_time(Integer.parseInt(message.get("common_recv_time").toString()));
|
||||
fileMeta.setCommon_schema_type((String) message.get("common_schema_type"));
|
||||
|
||||
@@ -12,7 +12,7 @@ import static com.zdjizhi.utils.system.FlowWriteConfigurations.judgeFileType;
|
||||
public class FileEdit {
|
||||
|
||||
|
||||
public static String fileUploadUrl(long cfgId,String sIp,int sPort,String dIp,int dPort,long foundTime,String account,String domain, String urlValue,String schemaType,String fileSuffix) throws Exception {
|
||||
public static String getFileUploadUrl(long cfgId,String sIp,int sPort,String dIp,int dPort,long foundTime,String account,String domain, String urlValue,String schemaType,String fileId){
|
||||
String fileType = null;
|
||||
if (judgeFileType(getFileType(urlValue))){
|
||||
fileType = getFileType(urlValue);
|
||||
@@ -24,11 +24,12 @@ public class FileEdit {
|
||||
fileType = "eml";
|
||||
}
|
||||
}
|
||||
return "http://"+ FlowWriteConfig.OOS_SERVERS+"/upload_v2"+"/"+cfgId+"/"+fileType+"/"+sIp+"/"+sPort+"/"+dIp+"/"+dPort+"/"+foundTime+"/"+account+"/"+domain+"/"+getFileName(urlValue,fileSuffix);
|
||||
|
||||
return "http://"+ FlowWriteConfig.OOS_SERVERS+"/v3/upload?cfg_id="+cfgId+"&file_id="+fileId+"&file_type="+fileType+"&found_time="+foundTime+"&s_ip="+sIp+"&s_port="+sPort+"&d_ip="+dIp+"&d_port="+dPort+"&domain="+domain+"&account="+account;
|
||||
}
|
||||
|
||||
public static String fileDownloadUrl( String urlValue,String fileSuffix) throws Exception {
|
||||
return "http://"+ FlowWriteConfig.OOS_SERVERS+"/download_v2"+"/"+getFileName(urlValue,fileSuffix);
|
||||
public static String getFileDownloadUrl(String fileId){
|
||||
return "http://"+ FlowWriteConfig.OOS_SERVERS+"/v3/download?file_id="+fileId;
|
||||
}
|
||||
|
||||
|
||||
@@ -37,7 +38,8 @@ public class FileEdit {
|
||||
return split[split.length-1];
|
||||
}
|
||||
|
||||
public static String getFileName(String url,String fileSuffix) throws Exception {
|
||||
public static String getFileId(String url,String fileSuffix) throws Exception {
|
||||
|
||||
String[] arr = url.split("/");
|
||||
String filename = arr[arr.length-1].substring(0,arr[arr.length-1].lastIndexOf("_"));
|
||||
String prefix = MD5Utils.md5Encode(filename);
|
||||
|
||||
@@ -44,7 +44,7 @@ public class TransFormMap {
|
||||
} catch (RuntimeException e) {
|
||||
logger.error("TransForm logs failed,The exception is :" + e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user