From b4237bb4a9f7d318b432e6b5686b1a4329cf4acd Mon Sep 17 00:00:00 2001
From: wanglihui <949764788@qq.com>
Date: Mon, 6 Sep 2021 16:19:33 +0800
Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Ekafka=20sasl=E8=AE=A4?=
=?UTF-8?q?=E8=AF=81=E6=9C=BA=E5=88=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
pom.xml | 64 ++++++-------------
.../java/com/zdjizhi/common/CommonConfig.java | 3 +
.../com/zdjizhi/source/DosSketchSource.java | 3 +
.../zdjizhi/utils/FlinkEnvironmentUtils.java | 13 +---
.../java/com/zdjizhi/utils/KafkaUtils.java | 10 ++-
src/main/resources/common.properties | 20 +++---
.../java/com/zdjizhi/common/UdtfTest.java | 17 -----
7 files changed, 45 insertions(+), 85 deletions(-)
delete mode 100644 src/test/java/com/zdjizhi/common/UdtfTest.java
diff --git a/pom.xml b/pom.xml
index 0c4e1fd..9567eb7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,44 +116,7 @@
org.apache.flink
- flink-sql-connector-kafka_2.11
- ${flink.version}
- provided
-
-
-
- org.apache.flink
- flink-connector-kafka_2.11
- ${flink.version}
-
-
-
-
-
- org.apache.kafka
- kafka-clients
- 1.0.0
-
-
-
-
- org.apache.flink
- flink-table-api-java
- ${flink.version}
-
-
-
-
-
- org.apache.flink
- flink-table-planner-blink_2.11
- ${flink.version}
-
-
-
-
- org.apache.flink
- flink-table-planner_2.11
+ flink-connector-kafka_2.12
${flink.version}
@@ -161,7 +124,7 @@
org.apache.flink
- flink-clients_2.11
+ flink-clients_2.12
${flink.version}
@@ -181,6 +144,23 @@
+
+
+ org.apache.hbase
+ hbase-client
+ 2.2.3
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+ log4j-over-slf4j
+ org.slf4j
+
+
+
+
org.apache.zookeeper
zookeeper
@@ -203,12 +183,6 @@
4.5.6
-
- org.apache.flink
- flink-connector-hbase-2.2_2.11
- ${flink.version}
-
-
cn.hutool
hutool-all
diff --git a/src/main/java/com/zdjizhi/common/CommonConfig.java b/src/main/java/com/zdjizhi/common/CommonConfig.java
index 2da067b..69d6859 100644
--- a/src/main/java/com/zdjizhi/common/CommonConfig.java
+++ b/src/main/java/com/zdjizhi/common/CommonConfig.java
@@ -61,4 +61,7 @@ public class CommonConfig {
public static final int STATIC_THRESHOLD_SCHEDULE_MINUTES = CommonConfigurations.getIntProperty("static.threshold.schedule.minutes");
public static final int BASELINE_THRESHOLD_SCHEDULE_DAYS = CommonConfigurations.getIntProperty("baseline.threshold.schedule.days");
+ public static final String SASL_JAAS_CONFIG_USER = CommonConfigurations.getStringProperty("sasl.jaas.config.user");
+ public static final String SASL_JAAS_CONFIG_PASSWORD = CommonConfigurations.getStringProperty("sasl.jaas.config.password");
+
}
diff --git a/src/main/java/com/zdjizhi/source/DosSketchSource.java b/src/main/java/com/zdjizhi/source/DosSketchSource.java
index fc86c87..af7f6ed 100644
--- a/src/main/java/com/zdjizhi/source/DosSketchSource.java
+++ b/src/main/java/com/zdjizhi/source/DosSketchSource.java
@@ -20,6 +20,9 @@ public class DosSketchSource {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_INPUT_BOOTSTRAP_SERVERS);
properties.setProperty("group.id", CommonConfig.KAFKA_GROUP_ID);
+ properties.setProperty("security.protocol", "SASL_PLAINTEXT");
+ properties.setProperty("sasl.mechanism", "PLAIN");
+ properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
return streamExeEnv.addSource(new FlinkKafkaConsumer(
CommonConfig.KAFKA_INPUT_TOPIC_NAME,
diff --git a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java
index 6e8e02c..e34ce28 100644
--- a/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java
+++ b/src/main/java/com/zdjizhi/utils/FlinkEnvironmentUtils.java
@@ -2,8 +2,6 @@ package com.zdjizhi.utils;
import com.zdjizhi.common.CommonConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
@@ -12,17 +10,8 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkEnvironmentUtils {
public static StreamExecutionEnvironment streamExeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
- public static StreamTableEnvironment getStreamTableEnv() {
+ static {
streamExeEnv.setParallelism(CommonConfig.STREAM_EXECUTION_ENVIRONMENT_PARALLELISM);
-
- EnvironmentSettings settings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
- .inStreamingMode()
- .build();
-
- return StreamTableEnvironment.create(streamExeEnv, settings);
}
-
}
diff --git a/src/main/java/com/zdjizhi/utils/KafkaUtils.java b/src/main/java/com/zdjizhi/utils/KafkaUtils.java
index 954a406..0f32683 100644
--- a/src/main/java/com/zdjizhi/utils/KafkaUtils.java
+++ b/src/main/java/com/zdjizhi/utils/KafkaUtils.java
@@ -9,10 +9,14 @@ import java.util.Properties;
public class KafkaUtils {
private static Properties getKafkaSinkProperty(){
- Properties propertiesproducer = new Properties();
- propertiesproducer.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
+ Properties properties = new Properties();
+ properties.setProperty("bootstrap.servers", CommonConfig.KAFKA_OUTPUT_BOOTSTRAP_SERVERS);
+ properties.setProperty("security.protocol", "SASL_PLAINTEXT");
+ properties.setProperty("sasl.mechanism", "PLAIN");
+ properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""+CommonConfig.SASL_JAAS_CONFIG_USER+"\" password=\""+CommonConfig.SASL_JAAS_CONFIG_PASSWORD+"\";");
- return propertiesproducer;
+
+ return properties;
}
public static FlinkKafkaProducer getKafkaSink(String topic){
diff --git a/src/main/resources/common.properties b/src/main/resources/common.properties
index 82cea3e..9b19d77 100644
--- a/src/main/resources/common.properties
+++ b/src/main/resources/common.properties
@@ -8,11 +8,11 @@ stream.execution.job.name=DOS-DETECTION-APPLICATION
kafka.input.parallelism=1
#输入kafka topic名
-kafka.input.topic.name=DOS-SKETCH-LOG
+kafka.input.topic.name=DOS-SKETCH-RECORD
#输入kafka地址
-#kafka.input.bootstrap.servers=192.168.44.12:9092
-kafka.input.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
+kafka.input.bootstrap.servers=192.168.44.12:9092
+#kafka.input.bootstrap.servers=192.168.44.11:9092,192.168.44.14:9092,192.168.44.15:9092
#读取kafka group id
kafka.input.group.id=2108231709
@@ -22,15 +22,15 @@ kafka.input.group.id=2108231709
kafka.output.metric.parallelism=1
#发送kafka metrics topic名
-#kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS-LOG
-kafka.output.metric.topic.name=test
+kafka.output.metric.topic.name=TRAFFIC-TOP-DESTINATION-IP-METRICS
+#kafka.output.metric.topic.name=test
#发送kafka event并行度大小
kafka.output.event.parallelism=1
#发送kafka event topic名
-#kafka.output.event.topic.name=DOS-EVENT-LOG
-kafka.output.event.topic.name=test
+kafka.output.event.topic.name=DOS-EVENT
+#kafka.output.event.topic.name=test
#kafka输出地址
kafka.output.bootstrap.servers=192.168.44.12:9092
@@ -118,4 +118,8 @@ http.pool.response.timeout=60000
static.threshold.schedule.minutes=10
#获取baseline周期,默认7天
-baseline.threshold.schedule.days=7
\ No newline at end of file
+baseline.threshold.schedule.days=7
+
+#kafka用户认证配置参数
+sasl.jaas.config.user=admin
+sasl.jaas.config.password=galaxy2019
\ No newline at end of file
diff --git a/src/test/java/com/zdjizhi/common/UdtfTest.java b/src/test/java/com/zdjizhi/common/UdtfTest.java
deleted file mode 100644
index 479febe..0000000
--- a/src/test/java/com/zdjizhi/common/UdtfTest.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.zdjizhi.common;
-
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.types.Row;
-
-public class UdtfTest extends TableFunction {
-
- public void eval(Row[] rows) {
- for (Row row : rows) {
- collect(row);
- }
- }
-
- public static void main(String[] args) {
-
- }
-}