From 36b04e3feaaf1f1c5af1a005af20d2eb28804bc7 Mon Sep 17 00:00:00 2001 From: lee Date: Thu, 4 Jun 2020 15:55:12 +0800 Subject: [PATCH] =?UTF-8?q?OLAP=E9=A2=84=E8=81=9A=E5=90=88=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E5=88=9D=E5=A7=8B=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../aggregate/spout/TridentKafkaSpout.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java diff --git a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java new file mode 100644 index 0000000..21ce8d3 --- /dev/null +++ b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java @@ -0,0 +1,39 @@ +package cn.ac.iie.trident.aggregate.spout; + +import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig; +import org.apache.storm.kafka.BrokerHosts; +import org.apache.storm.kafka.StringScheme; +import org.apache.storm.kafka.ZkHosts; +import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout; +import org.apache.storm.kafka.trident.TridentKafkaConfig; +import org.apache.storm.spout.SchemeAsMultiScheme; + +/** + * @ClassNameKafkaSpout + * @Author lixkvip@126.com + * @Date2020/6/4 11:55 + * @Version V1.0 + **/ +public class TridentKafkaSpout { + + + /** + * kafka生产者适配器(单例),用来代理kafka生产者发送消息 + */ + private static OpaqueTridentKafkaSpout opaqueTridentKafkaSpout; + + public static OpaqueTridentKafkaSpout getInstance() { + if (opaqueTridentKafkaSpout == null) { + + BrokerHosts zkHosts = new ZkHosts(FlowWriteConfig.ZOOKEEPER_SERVERS); + TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, FlowWriteConfig.KAFKA_TOPIC); + kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); + kafkaConfig.startOffsetTime = -1L; + + //不透明事务型Spout + opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig); + } + return opaqueTridentKafkaSpout; + } + +}