优化Kafka认证方式,删除配置项通过连接端口判断

This commit is contained in:
qidaijie
2022-03-09 10:05:54 +08:00
parent c6f364d451
commit 956811c2d4
7 changed files with 41 additions and 36 deletions

View File

@@ -4,8 +4,8 @@ import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zdjizhi.common.StreamAggregateConfig;
import com.zdjizhi.utils.functions.*;
import com.zdjizhi.utils.kafka.Consumer;
import com.zdjizhi.utils.kafka.Producer;
import com.zdjizhi.utils.kafka.KafkaConsumer;
import com.zdjizhi.utils.kafka.KafkaProducer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -35,7 +35,7 @@ public class StreamAggregateTopology {
//两个输出之间的最大时间 (单位milliseconds)
environment.setBufferTimeout(StreamAggregateConfig.BUFFER_TIMEOUT);
DataStream<String> streamSource = environment.addSource(Consumer.getKafkaConsumer())
DataStream<String> streamSource = environment.addSource(KafkaConsumer.getKafkaConsumer())
.setParallelism(StreamAggregateConfig.SOURCE_PARALLELISM);
SingleOutputStreamOperator<Tuple3<String, String, String>> parseDataMap = streamSource.map(new ParseMapFunction())
@@ -56,7 +56,7 @@ public class StreamAggregateTopology {
.name("SecondCountWindow").setParallelism(StreamAggregateConfig.SECOND_WINDOW_PARALLELISM);
secondCountWindow.flatMap(new ResultFlatMapFunction()).name("ResultFlatMap").setParallelism(StreamAggregateConfig.SINK_PARALLELISM)
.addSink(Producer.getKafkaProducer()).name("LogSinkKafka").setParallelism(StreamAggregateConfig.SINK_PARALLELISM);
.addSink(KafkaProducer.getKafkaProducer()).name("LogSinkKafka").setParallelism(StreamAggregateConfig.SINK_PARALLELISM);
environment.execute(args[0]);
} catch (Exception e) {