fix: support a comma-separated list of OOS server addresses

This commit is contained in:
wangchengcheng
2024-01-31 22:52:22 +08:00
parent ed07a3dbfb
commit 4962a40f97
7 changed files with 33 additions and 37 deletions

View File

@@ -48,7 +48,9 @@ transform.parallelism=1
deal.file.parallelism=1
sink.file.data.parallelism=1
sink.percent.parallelism=1
sink.percent.session.parallelism=1
sink.percent.security.parallelism=1
sink.percent.proxy.parallelism=1
#数据中心,取值范围(0-31)
data.center.id.num=0
#hbase 更新时间如填写0则不更新缓存
@@ -58,7 +60,7 @@ hbase.tick.tuple.freq.secs=180
producer.kafka.compression.type=snappy
#------------------------------------OOS配置------------------------------------#
#oos地址
oos.servers=10.3.45.100:8057
oos.servers=172.18.10.153:8058,172.18.10.154:8058,172.18.10.155:8058,172.18.10.156:8058,172.18.10.157:8058
#prometheus-httpserver
prometheus.pushgateway.address=192.168.44.12:9091
pushgateway.statistics.time=300

View File

@@ -83,8 +83,10 @@ public class FlowWriteConfig {
public static final Integer BUFFER_TIMEOUT = ConfigurationsUtils.getIntProperty(propDefault, "buffer.timeout");
public static final Integer DEAL_FILE_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "deal.file.parallelism");
public static final Integer SINK_FILE_DATA_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.file.data.parallelism");
public static final Integer SINK_PERCENT_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.parallelism");
public static final Integer SINK_PERCENT_SESSION_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.session.parallelism");
public static final Integer SINK_PERCENT_SECURITY_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.security.parallelism");
public static final Integer SINK_PERCENT_PROXY_PARALLELISM = ConfigurationsUtils.getIntProperty(propService, "sink.percent.proxy.parallelism");
/**
* HBase
*/

View File

@@ -73,8 +73,7 @@ public class SendCountProcess extends ProcessFunction<Tuple7<Long, Long, Long, L
httpRequestCount = 0;
httpResponseCount = 0;
mailEmlCount = 0;
securityCount = 0;
proxyCount = 0;
}
}
}, 0, FlowWriteConfig.PUSHGATEWAY_STATISTICS_TIME * 1000);

View File

@@ -27,7 +27,6 @@ public class PercentSessionProcess extends ProcessFunction<JSONObject, String> {
} catch (Exception e) {
logger.error("percent_session_record.properties日志加载失败,失败原因为:" + e);
}
}
@Override

View File

@@ -16,6 +16,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
@@ -61,6 +62,7 @@ public class DealFileProcess extends ProcessFunction<JSONObject, JSONObject> {
private boolean metricSendFlag = true;
private long securityCount = 0L;
private long proxyCount = 0L;
private Random random;
//初始化侧输流的标记
@@ -70,6 +72,7 @@ public class DealFileProcess extends ProcessFunction<JSONObject, JSONObject> {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
random = new Random();
Timer timer = new Timer();
//注册定时器
timer.schedule(new TimerTask() {
@@ -118,31 +121,37 @@ public class DealFileProcess extends ProcessFunction<JSONObject, JSONObject> {
account = (String) message.getOrDefault("common_subscribe_id", "");
FileMeta fileMeta = new FileMeta();
JSONArray jsonarray = new JSONArray();
// oos.servers is a comma-separated list of host:port entries. Tolerate whitespace
// around the commas (e.g. "a:8058, b:8058") so no entry carries stray spaces into
// the generated upload/download URLs.
final String[] oosArr = FlowWriteConfig.OOS_SERVERS.trim().split("\\s*,\\s*");
// Pick one server at random per record; the same index is reused for every URL
// built for this record so its download and upload URLs point at the same server.
final int i = random.nextInt(oosArr.length);
if (StringUtil.isNotBlank(rqUrlValue)) {
String fileId = FileEdit.getFileId(rqUrlValue, "_1");
message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId));
message.put("http_request_body", FileEdit.getFileDownloadUrl(fileId,oosArr[i]));
SourceList request = new SourceList();
request.setSource_oss_path(FlowWriteConfig.HOS_URL + rqUrlValue);
request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId));
request.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId,oosArr[i]));
jsonarray.add(request);
httpRequestCount++;
}
if (StringUtil.isNotBlank(rpUrlValue)) {
String fileId = FileEdit.getFileId(rpUrlValue, "_2");
message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId));
message.put("http_response_body", FileEdit.getFileDownloadUrl(fileId,oosArr[i]));
SourceList response = new SourceList();
response.setSource_oss_path(FlowWriteConfig.HOS_URL + rpUrlValue);
response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId));
response.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId,oosArr[i]));
jsonarray.add(response);
httpResponseCount++;
}
if (StringUtil.isNotBlank(emailUrlValue)) {
String fileId = FileEdit.getFileId(emailUrlValue, "_9");
message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId));
message.put("mail_eml_file", FileEdit.getFileDownloadUrl(fileId,oosArr[i]));
SourceList emailFile = new SourceList();
emailFile.setSource_oss_path(FlowWriteConfig.HOS_URL + emailUrlValue);
emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId));
emailFile.setDestination_oss_path(FileEdit.getFileUploadUrl(cfgId, sIp, sPort, dIp, dPort, foundTime, account, domain, schemaType, fileId,oosArr[i]));
jsonarray.add(emailFile);
mailEmlCount++;
}

View File

@@ -1,18 +1,12 @@
package com.zdjizhi.tools.general;
import com.zdjizhi.common.FlowWriteConfig;
import com.zdjizhi.tools.ordinary.MD5Utils;
import static com.zdjizhi.common.FlowWriteConfig.judgeFileType;
/**
* 文件字段操作工具
*/
public class FileEdit {
public static String getFileUploadUrl(long cfgId, String sIp, int sPort, String dIp, int dPort, long foundTime, String account, String domain, String schemaType, String fileId) {
public static String getFileUploadUrl(long cfgId, String sIp, int sPort, String dIp, int dPort, long foundTime, String account, String domain, String schemaType, String fileId,String oosServer) {
String fileType = null;
if (schemaType.equals("HTTP")) {
fileType = "html";
@@ -21,26 +15,19 @@ public class FileEdit {
fileType = "eml";
}
return "http://" + FlowWriteConfig.OOS_SERVERS + "/v3/upload?cfg_id=" + cfgId + "&file_id=" + fileId + "&file_type=" + fileType + "&found_time=" + foundTime + "&s_ip=" + sIp + "&s_port=" + sPort + "&d_ip=" + dIp + "&d_port=" + dPort + "&domain=" + domain + "&account=" + account;
return "http://" + oosServer + "/v3/upload?cfg_id=" + cfgId + "&file_id=" + fileId + "&file_type=" + fileType + "&found_time=" + foundTime + "&s_ip=" + sIp + "&s_port=" + sPort + "&d_ip=" + dIp + "&d_port=" + dPort + "&domain=" + domain + "&account=" + account;
}
public static String getFileDownloadUrl(String fileId) {
return "http://" + FlowWriteConfig.OOS_SERVERS + "/v3/download?file_id=" + fileId;
/**
 * Builds the download URL of a file on the given OOS server.
 *
 * @param fileId    identifier of the file, appended as the {@code file_id} query parameter
 * @param oosServer server address placed directly after {@code http://}
 * @return the download URL as a plain string (no URL-encoding is applied)
 */
public static String getFileDownloadUrl(String fileId,String oosServer) {
    final String serverBase = "http://" + oosServer;
    return serverBase + "/v3/download?file_id=" + fileId;
}
/**
 * Returns the text after the last dot of {@code url}.
 *
 * <p>If {@code url} contains no dot, the whole string is returned. Because
 * {@link String#split(String)} drops trailing empty strings, a trailing dot is
 * ignored ({@code "a.b."} yields {@code "b"}).
 *
 * @param url the URL or file name to inspect; must not be {@code null}
 * @return the last dot-separated segment, or an empty string when {@code url}
 *         consists only of dots
 */
public static String getFileType(String url) {
    String[] segments = url.split("\\.");
    // An input made up solely of dots splits into an empty array; return an
    // empty type instead of failing with ArrayIndexOutOfBoundsException.
    if (segments.length == 0) {
        return "";
    }
    return segments[segments.length - 1];
}
/**
 * Builds a file id by appending {@code fileSuffix} to {@code filename}.
 *
 * @param filename   base name of the file
 * @param fileSuffix suffix to append (callers in this project pass values such as {@code "_1"})
 * @return {@code filename} immediately followed by {@code fileSuffix}
 * @throws Exception declared for callers' compatibility; the current body does not throw it
 */
public static String getFileId(String filename, String fileSuffix) throws Exception {
    final String fileId = filename + fileSuffix;
    return fileId;
}
}

View File

@@ -16,8 +16,6 @@ import com.zdjizhi.tools.connections.kafka.KafkaConsumer;
import com.zdjizhi.tools.connections.kafka.KafkaProducer;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.descriptors.Kafka;
public class LogFlowWriteTopology {
private static final Log logger = LogFactory.get();
@@ -42,15 +40,15 @@ public class LogFlowWriteTopology {
//处理带有非结构化文件字段的数据
SingleOutputStreamOperator<JSONObject> dealFileProcessFunction = completedStream.process(new DealFileProcess()).name("DealFileProcess").uid("DealFile-Process").setParallelism(FlowWriteConfig.DEAL_FILE_PARALLELISM);
//补全后的数据发送至百分点的kafka
final SingleOutputStreamOperator<String> percentSecurityProcess = dealFileProcessFunction.getSideOutput(DealFileProcess.percentSecurityTag).process(new PercentSecurityProcess()).name("PercentSecurityProcess").uid("Percent-Security-Process").setParallelism(1);
final SingleOutputStreamOperator<String> percentSecurityProcess = dealFileProcessFunction.getSideOutput(DealFileProcess.percentSecurityTag).process(new PercentSecurityProcess()).name("PercentSecurityProcess").uid("Percent-Security-Process").setParallelism(FlowWriteConfig.SINK_PERCENT_SECURITY_PARALLELISM);
final SingleOutputStreamOperator<String> percentProxyProcess = dealFileProcessFunction.getSideOutput(DealFileProcess.percentProxyTag).process(new PercentProxyProcess()).name("PercentProxyProcess").uid("Percent-Proxy-Process").setParallelism(1);
final SingleOutputStreamOperator<String> percentProxyProcess = dealFileProcessFunction.getSideOutput(DealFileProcess.percentProxyTag).process(new PercentProxyProcess()).name("PercentProxyProcess").uid("Percent-Proxy-Process").setParallelism(FlowWriteConfig.SINK_PERCENT_PROXY_PARALLELISM);
final SingleOutputStreamOperator<String> percentSessionProcess = dealFileProcessFunction.process(new PercentSessionProcess()).name("PercentSessionProcess").uid("Percent-Session-Process").setParallelism(1);
final SingleOutputStreamOperator<String> percentSessionProcess = dealFileProcessFunction.process(new PercentSessionProcess()).name("PercentSessionProcess").uid("Percent-Session-Process").setParallelism(FlowWriteConfig.SINK_PERCENT_SESSION_PARALLELISM);
percentSecurityProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SECURITY)).name("sendSecurityEvent").uid("send-Security-Event").setParallelism(FlowWriteConfig.SINK_PERCENT_PARALLELISM);
percentProxyProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_PROXY)).name("sendProxyEvent").uid("send-Proxy-Event").setParallelism(FlowWriteConfig.SINK_PERCENT_PARALLELISM);
percentSessionProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SESSION)).name("sendSessionRECORD").uid("send-Session-RECORD").setParallelism(FlowWriteConfig.SINK_PERCENT_PARALLELISM);
// Each sink uses the parallelism setting of its own stream: the security sink follows
// sink.percent.security.parallelism and the proxy sink follows sink.percent.proxy.parallelism
// (previously the two constants were swapped).
percentSecurityProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SECURITY)).name("sendSecurityEvent").uid("send-Security-Event").setParallelism(FlowWriteConfig.SINK_PERCENT_SECURITY_PARALLELISM);
percentProxyProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_PROXY)).name("sendProxyEvent").uid("send-Proxy-Event").setParallelism(FlowWriteConfig.SINK_PERCENT_PROXY_PARALLELISM);
percentSessionProcess.addSink(KafkaProducer.getPercentKafkaProducer(FlowWriteConfig.SINK_PERCENT_KAFKA_TOPIC_SESSION)).name("sendSessionRECORD").uid("send-Session-RECORD").setParallelism(FlowWriteConfig.SINK_PERCENT_SESSION_PARALLELISM);
//文件元数据发送至TRAFFIC-FILE-METADATA
dealFileProcessFunction.getSideOutput(DealFileProcess.metaToKafa).addSink(KafkaProducer.getTrafficFileMetaKafkaProducer()).name("toTrafficFileMeta").uid("to-Traffic-FileMeta").setParallelism(FlowWriteConfig.SINK_FILE_DATA_PARALLELISM);