OLAP预聚合代码初始版本

This commit is contained in:
lee
2020-06-04 15:55:12 +08:00
parent a34f8c4df6
commit 36b04e3fea

View File

@@ -0,0 +1,39 @@
package cn.ac.iie.trident.aggregate.spout;
import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.spout.SchemeAsMultiScheme;
/**
* @ClassNameKafkaSpout
* @Author lixkvip@126.com
* @Date2020/6/4 11:55
* @Version V1.0
**/
public class TridentKafkaSpout {
/**
* kafka生产者适配器单例用来代理kafka生产者发送消息
*/
private static OpaqueTridentKafkaSpout opaqueTridentKafkaSpout;
public static OpaqueTridentKafkaSpout getInstance() {
if (opaqueTridentKafkaSpout == null) {
BrokerHosts zkHosts = new ZkHosts(FlowWriteConfig.ZOOKEEPER_SERVERS);
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, FlowWriteConfig.KAFKA_TOPIC);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.startOffsetTime = -1L;
//不透明事务型Spout
opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
}
return opaqueTridentKafkaSpout;
}
}