From 7c201a8a3f259618125d7f2f90bca45ff02d238e Mon Sep 17 00:00:00 2001 From: unknown Date: Fri, 16 Dec 2022 16:52:33 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9ENacos=20Namespace=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=EF=BC=8C=E5=88=A0=E9=99=A4=E6=9B=B4=E6=96=B0=E8=87=B3?= =?UTF-8?q?HDFS=E6=97=B6Flush=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/zdjizhi/common/CommonConfig.java | 2 ++ src/main/java/com/zdjizhi/sink/OutputStreamSink.java | 1 + src/main/java/com/zdjizhi/source/SingleHttpSource.java | 2 +- src/main/java/com/zdjizhi/utils/HdfsUtils.java | 2 +- src/main/resources/common.properties | 10 +++++----- 5 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java index 1b5e4ba..9bb50c6 100644 --- a/src/main/java/com/zdjizhi/common/CommonConfig.java +++ b/src/main/java/com/zdjizhi/common/CommonConfig.java @@ -77,10 +77,12 @@ public class CommonConfig { public static final String NACOS_SERVER_ADDR = CommonConfigurations.getStringProperty("nacos.server.addr"); public static final String NACOS_USERNAME = CommonConfigurations.getStringProperty("nacos.username"); public static final String NACOS_PASSWORD = CommonConfigurations.getStringProperty("nacos.password"); + public static final String NACOS_NAMESPACE = CommonConfigurations.getStringProperty("nacos.namespace"); public static final String NACOS_DATA_ID = CommonConfigurations.getStringProperty("nacos.data.id"); public static final String NACOS_GROUP = CommonConfigurations.getStringProperty("nacos.group"); public static final int NACOS_READ_TIMEOUT = CommonConfigurations.getIntProperty("nacos.read.timeout"); + public static final String HOS_TOKEN = CommonConfigurations.getStringProperty("hos.token"); public static final String CLUSTER_OR_SINGLE = CommonConfigurations.getStringProperty("cluster.or.single"); diff --git a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java index c187415..91d3b8e 100644 --- a/src/main/java/com/zdjizhi/sink/OutputStreamSink.java +++ b/src/main/java/com/zdjizhi/sink/OutputStreamSink.java @@ -52,6 +52,7 @@ public class OutputStreamSink { nacosProperties.put(PropertyKeyConst.SERVER_ADDR,CommonConfig.NACOS_SERVER_ADDR); nacosProperties.setProperty(PropertyKeyConst.USERNAME, CommonConfig.NACOS_USERNAME); nacosProperties.setProperty(PropertyKeyConst.PASSWORD, CommonConfig.NACOS_PASSWORD); + nacosProperties.setProperty(PropertyKeyConst.NAMESPACE, CommonConfig.NACOS_NAMESPACE); if ("CLUSTER".equals(CommonConfig.CLUSTER_OR_SINGLE)){ broadcastSource = DosSketchSource.broadcastSource(nacosProperties,CommonConfig.HDFS_PATH); diff --git a/src/main/java/com/zdjizhi/source/SingleHttpSource.java b/src/main/java/com/zdjizhi/source/SingleHttpSource.java index a946b53..a72a82a 100644 --- a/src/main/java/com/zdjizhi/source/SingleHttpSource.java +++ b/src/main/java/com/zdjizhi/source/SingleHttpSource.java @@ -28,7 +28,7 @@ import java.util.concurrent.Executor; public class SingleHttpSource extends RichHttpSourceFunction> { - private static final Logger logger = LoggerFactory.getLogger(HttpSource.class); + private static final Logger logger = LoggerFactory.getLogger(SingleHttpSource.class); private static HashMap knowledgeFileCache; private Properties nacosProperties; diff --git a/src/main/java/com/zdjizhi/utils/HdfsUtils.java b/src/main/java/com/zdjizhi/utils/HdfsUtils.java index c1e4021..4cbc199 100644 --- a/src/main/java/com/zdjizhi/utils/HdfsUtils.java +++ b/src/main/java/com/zdjizhi/utils/HdfsUtils.java @@ -59,7 +59,7 @@ public class HdfsUtils { public static void uploadFileByBytes(String filePath,byte[] bytes) throws IOException { try (FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(filePath), true)) { fsDataOutputStream.write(bytes); - fsDataOutputStream.flush(); +// fsDataOutputStream.flush(); } catch (RuntimeException e) { logger.error("Uploading files to the HDFS is abnormal. Message is :" + e.getMessage()); } catch (IOException e) { diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties index 6d49ff8..083dbba 100644 --- a/src/main/resources/common.properties +++ b/src/main/resources/common.properties @@ -5,7 +5,7 @@ stream.execution.environment.parallelism=1 stream.execution.job.name=DOS-DETECTION-APPLICATION #输入kafka并行度大小 -kafka.input.parallelism=1 +kafka.input.parallelism=3 #输入kafka topic名 kafka.input.topic.name=DOS-SKETCH-RECORD @@ -19,14 +19,14 @@ kafka.input.group.id=dos-detection-job-221125-1 #kafka.input.group.id=dos-detection-job-210813-1 #发送kafka metrics并行度大小 -kafka.output.metric.parallelism=1 +kafka.output.metric.parallelism=3 #发送kafka metrics topic名 #kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS kafka.output.metric.topic.name=test #发送kafka event并行度大小 -kafka.output.event.parallelism=1 +kafka.output.event.parallelism=3 #发送kafka event topic名 #kafka.output.event.topic.name=DOS-EVENT @@ -150,8 +150,8 @@ cluster.or.single=CLUSTER ############################## 集群模式配置文件路径 配置 ###################################### hdfs.path=/test/TEST/ -hdfs.uri.nn1=hdfs://192.168.40.151:9000 -hdfs.uri.nn2=hdfs://192.168.40.152:9000 +hdfs.uri.nn1=192.168.40.151:9000 +hdfs.uri.nn2=192.168.40.152:9000 hdfs.user=dos ############################## 单机模式配置文件下载路径 配置 ######################################