From 4962a40f97ba75ff976f788fa089a4e5cc5b7335 Mon Sep 17 00:00:00 2001 From: wangchengcheng Date: Wed, 31 Jan 2024 22:52:22 +0800 Subject: [PATCH] fix:support oos address list --- properties/service_flow_config.properties | 6 ++++-- .../com/zdjizhi/common/FlowWriteConfig.java | 4 +++- .../operator/count/SendCountProcess.java | 3 +-- .../percent/PercentSessionProcess.java | 1 - .../operator/process/DealFileProcess.java | 21 +++++++++++++------ .../com/zdjizhi/tools/general/FileEdit.java | 21 ++++--------------- .../topology/LogFlowWriteTopology.java | 14 ++++++------- 7 files changed, 33 insertions(+), 37 deletions(-) diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 3afe10d..19c11eb 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -48,7 +48,9 @@ transform.parallelism=1 deal.file.parallelism=1 sink.file.data.parallelism=1 -sink.percent.parallelism=1 +sink.percent.session.parallelism=1 +sink.percent.security.parallelism=1 +sink.percent.proxy.parallelism=1 #数据中心,取值范围(0-31) data.center.id.num=0 #hbase 更新时间,如填写0则不更新缓存 @@ -58,7 +60,7 @@ hbase.tick.tuple.freq.secs=180 producer.kafka.compression.type=snappy #------------------------------------OOS配置------------------------------------# #oos地址 -oos.servers=10.3.45.100:8057 +oos.servers=172.18.10.153:8058,172.18.10.154:8058,172.18.10.155:8058,172.18.10.156:8058,172.18.10.157:8058 #prometheus-httpserver prometheus.pushgateway.address=192.168.44.12:9091 pushgateway.statistics.time=300 diff --git a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java index cd3ca43..20b6dde 100644 --- a/src/main/java/com/zdjizhi/common/FlowWriteConfig.java +++ b/src/main/java/com/zdjizhi/common/FlowWriteConfig.java @@ -83,8 +83,10 @@ public class FlowWriteConfig { public static final Integer BUFFER_TIMEOUT = ConfigurationsUtils.getIntProperty(propDefault, "buffer.timeout"); public static final Integer DEAL_FILE_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "deal.file.parallelism"); public static final Integer SINK_FILE_DATA_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.file.data.parallelism"); - public static final Integer SINK_PERCENT_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.parallelism"); + public static final Integer SINK_PERCENT_SESSION_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.session.parallelism"); + public static final Integer SINK_PERCENT_SECURITY_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.security.parallelism"); + public static final Integer SINK_PERCENT_PROXY_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.proxy.parallelism"); /** * HBase */ diff --git a/src/main/java/com/zdjizhi/operator/count/SendCountProcess.java b/src/main/java/com/zdjizhi/operator/count/SendCountProcess.java index 009345f..7ca1eb7 100644 --- a/src/main/java/com/zdjizhi/operator/count/SendCountProcess.java +++ b/src/main/java/com/zdjizhi/operator/count/SendCountProcess.java @@ -73,8 +73,7 @@ public class SendCountProcess extends ProcessFunction { } catch (Exception e) { logger.error("percent_session_record.properties日志加载失败,失败原因为:" + e); } - } @Override diff --git a/src/main/java/com/zdjizhi/operator/process/DealFileProcess.java b/src/main/java/com/zdjizhi/operator/process/DealFileProcess.java index 3bba57d..0e2ac5d 100644 --- a/src/main/java/com/zdjizhi/operator/process/DealFileProcess.java +++ b/src/main/java/com/zdjizhi/operator/process/DealFileProcess.java @@ -16,6 +16,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; +import java.util.Random; import java.util.Timer; import java.util.TimerTask; @@ -61,6 +62,7 @@ public class DealFileProcess extends ProcessFunction { private boolean metricSendFlag = true; private long securityCount = 0L; private long proxyCount = 0L; + private Random random; //初始化侧输流的标记 @@ -70,6 +72,7 @@ public class DealFileProcess extends ProcessFunction { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); + random = new Random(); Timer timer = new Timer(); //注册定时器 timer.schedule(new TimerTask() { @@ -118,31 +121,37 @@ public class DealFileProcess extends ProcessFunction { account = (String) message.getOrDefault("common_subscribe_id", ""); FileMeta fileMeta = new FileMeta(); JSONArray jsonarray = new JSONArray(); + + final String[] oosArr = FlowWriteConfig.OOS_SERVERS.split(","); + + final int i = random.nextInt(oosArr.length); + + if (StringUtil.isNotBlank(rqUrlValue)) { String fileId = FileEdit.getFileId(rqUrlValue, "_1"); - message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId)); + message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId,oosArr[i])); SourceList request = new SourceList(); request.setSource_oss_path(FlowWriteConfig.HOS_URL + rqUrlValue); - request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId)); + request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId,oosArr[i])); jsonarray.add(request); httpRequestCount++; } if (StringUtil.isNotBlank(rpUrlValue)) { String fileId = FileEdit.getFileId(rpUrlValue, "_2"); - message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId)); + message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId,oosArr[i])); SourceList response = new SourceList(); response.setSource_oss_path(FlowWriteConfig.HOS_URL + rpUrlValue); - response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId)); + response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId,oosArr[i])); jsonarray.add(response); httpResponseCount++; } if (StringUtil.isNotBlank(emailUrlValue)) { String fileId = FileEdit.getFileId(emailUrlValue, "_9"); - message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId)); + message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId,oosArr[i])); SourceList emailFile = new SourceList(); emailFile.setSource_oss_path(FlowWriteConfig.HOS_URL + emailUrlValue); - emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId)); + emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId,oosArr[i])); jsonarray.add(emailFile); mailEmlCount++; } diff --git a/src/main/java/com/zdjizhi/tools/general/FileEdit.java b/src/main/java/com/zdjizhi/tools/general/FileEdit.java index e8cf99a..ba78ed9 100644 --- a/src/main/java/com/zdjizhi/tools/general/FileEdit.java +++ b/src/main/java/com/zdjizhi/tools/general/FileEdit.java @@ -1,18 +1,12 @@ package com.zdjizhi.tools.general; -import com.zdjizhi.common.FlowWriteConfig; -import com.zdjizhi.tools.ordinary.MD5Utils; - -import static com.zdjizhi.common.FlowWriteConfig.judgeFileType; - - /** * 文件字段操作工具 */ public class FileEdit { - public static String getFileUploadUrl(long cfgId, String sIp, int sPort, String dIp, int dPort, long foundTime, String account, String domain, String schemaType, String fileId) { + public static String getFileUploadUrl(long cfgId, String sIp, int sPort, String dIp, int dPort, long foundTime, String account, String domain, String schemaType, String fileId,String oosServer) { String fileType = null; if (schemaType.equals("HTTP")) { fileType = "html"; @@ -21,26 +15,19 @@ public class FileEdit { fileType = "eml"; } - - return "http://" + FlowWriteConfig.OOS_SERVERS + "/v3/upload?cfg_id=" + cfgId + "&file_id=" + fileId + "&file_type=" + fileType + "&found_time=" + foundTime + "&s_ip=" + sIp + "&s_port=" + sPort + "&d_ip=" + dIp + "&d_port=" + dPort + "&domain=" + domain + "&account=" + account; + return "http://" + oosServer + "/v3/upload?cfg_id=" + cfgId + "&file_id=" + fileId + "&file_type=" + fileType + "&found_time=" + foundTime + "&s_ip=" + sIp + "&s_port=" + sPort + "&d_ip=" + dIp + "&d_port=" + dPort + "&domain=" + domain + "&account=" + account; } - public static String getFileDownloadUrl(String fileId) { - return "http://" + FlowWriteConfig.OOS_SERVERS + "/v3/download?file_id=" + fileId; + public static String getFileDownloadUrl(String fileId,String oosServer) { + return "http://" + oosServer + "/v3/download?file_id=" + fileId; } - public static String getFileType(String url) { String[] split = url.split("\\."); return split[split.length - 1]; } public static String getFileId(String filename, String fileSuffix) throws Exception { - -// String[] arr = url.split("/"); -// String filename = arr[arr.length-1].substring(0,arr[arr.length-1].lastIndexOf("_")); -// String prefix = MD5Utils.md5Encode(filename); -// String suffix = arr[arr.length-1].substring(arr[arr.length-1].lastIndexOf("_"),arr[arr.length-1].lastIndexOf(".")); return filename + fileSuffix; } } diff --git a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java index 7dbf5de..f2189c8 100644 --- a/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java +++ b/src/main/java/com/zdjizhi/topology/LogFlowWriteTopology.java @@ -16,8 +16,6 @@ import com.zdjizhi.tools.connections.kafka.KafkaConsumer; import com.zdjizhi.tools.connections.kafka.KafkaProducer; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.descriptors.Kafka; - public class LogFlowWriteTopology { private static final Log logger = LogFactory.get(); @@ -42,15 +40,15 @@ public class LogFlowWriteTopology { //处理带有非结构化文件字段的数据 SingleOutputStreamOperator dealFileProcessFunction = completedStream.process(new DealFileProcess()).name("DealFileProcess").uid("DealFile-Process").setParallelism(FlowWriteConfig.DEAL_FILE_PARALLELISM); //补全后的数据发送至百分点的kafka - final SingleOutputStreamOperator percentSecurityProcess = dealFileProcessFunction.getSideOutput(DealFileProcess.percentSecurityTag).process(new PercentSecurityProcess()).name("PercentSecurityProcess").uid("Percent-Security-Process").setParallelism(1); + final SingleOutputStreamOperator percentSecurityProcess = dealFileProcessFunction.getSideOutput(DealFileProcess.percentSecurityTag).process(new PercentSecurityProcess()).name("PercentSecurityProcess").uid("Percent-Security-Process").setParallelism(FlowWriteConfig.SINK_PERCENT_SECURITY_PARALLELISM); - final SingleOutputStreamOperator percentProxyProcess = dealFileProcessFunction.getSideOutput(DealFileProcess.percentProxyTag).process(new PercentProxyProcess()).name("PercentProxyProcess").uid("Percent-Proxy-Process").setParallelism(1); + final SingleOutputStreamOperator percentProxyProcess = dealFileProcessFunction.getSideOutput(DealFileProcess.percentProxyTag).process(new PercentProxyProcess()).name("PercentProxyProcess").uid("Percent-Proxy-Process").setParallelism(FlowWriteConfig.SINK_PERCENT_PROXY_PARALLELISM); - final SingleOutputStreamOperator percentSessionProcess = dealFileProcessFunction.process(new PercentSessionProcess()).name("PercentSessionProcess").uid("Percent-Session-Process").setParallelism(1); + final SingleOutputStreamOperator percentSessionProcess = dealFileProcessFunction.process(new PercentSessionProcess()).name("PercentSessionProcess").uid("Percent-Session-Process").setParallelism(FlowWriteConfig.SINK_PERCENT_SESSION_PARALLELISM); - percentSecurityProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SECURITY)).name("sendSecurityEvent").uid("send-Security-Event").setParallelism(FlowWriteConfig.SINK_PERCENT_PARALLELISM); - percentProxyProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_PROXY)).name("sendProxyEvent").uid("send-Proxy-Event").setParallelism(FlowWriteConfig.SINK_PERCENT_PARALLELISM); - percentSessionProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SESSION)).name("sendSessionRECORD").uid("send-Session-RECORD").setParallelism(FlowWriteConfig.SINK_PERCENT_PARALLELISM); + percentSecurityProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SECURITY)).name("sendSecurityEvent").uid("send-Security-Event").setParallelism(FlowWriteConfig.SINK_PERCENT_PROXY_PARALLELISM); + percentProxyProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_PROXY)).name("sendProxyEvent").uid("send-Proxy-Event").setParallelism(FlowWriteConfig.SINK_PERCENT_SECURITY_PARALLELISM); + percentSessionProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SESSION)).name("sendSessionRECORD").uid("send-Session-RECORD").setParallelism(FlowWriteConfig.SINK_PERCENT_SESSION_PARALLELISM); //文件元数据发送至TRAFFIC-FILE-METADATA dealFileProcessFunction.getSideOutput(DealFileProcess.metaToKafa).addSink(KafkaProducer.getTrafficFileMetaKafkaProducer()).name("toTrafficFileMeta").uid("to-Traffic-FileMeta").setParallelism(FlowWriteConfig.SINK_FILE_DATA_PARALLELISM);