This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
galaxy-tsg-olap-storm-log-s…/src/main/java/cn/ac/iie/topology/LogFlowWriteTopology.java

95 lines
3.5 KiB
Java
Raw Normal View History

2019-08-05 17:50:15 +08:00
package cn.ac.iie.topology;
2020-12-25 17:32:54 +08:00
import cn.ac.iie.bolt.CompletionBolt;
import cn.ac.iie.bolt.kafka.LogSendBolt;
2019-08-05 17:50:15 +08:00
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.spout.CustomizedKafkaSpout;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
/**
* Storm程序主类
*
* @author Administrator
*/
public class LogFlowWriteTopology {
private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class);
private final String topologyName;
private final Config topologyConfig;
private TopologyBuilder builder;
private LogFlowWriteTopology() {
this(LogFlowWriteTopology.class.getSimpleName());
}
private LogFlowWriteTopology(String topologyName) {
this.topologyName = topologyName;
2020-12-25 17:32:54 +08:00
topologyConfig = createTopologyConfig();
2019-08-05 17:50:15 +08:00
}
2020-12-25 17:32:54 +08:00
private Config createTopologyConfig() {
2019-08-05 17:50:15 +08:00
Config conf = new Config();
conf.setDebug(false);
conf.setMessageTimeoutSecs(60);
conf.setMaxSpoutPending(FlowWriteConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
conf.setNumAckers(FlowWriteConfig.TOPOLOGY_NUM_ACKS);
return conf;
}
private void runLocally() throws InterruptedException {
topologyConfig.setMaxTaskParallelism(1);
StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
}
private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
//设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌
topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
}
private void buildTopology() {
2020-12-25 17:32:54 +08:00
String need = "yes";
2019-08-05 17:50:15 +08:00
builder = new TopologyBuilder();
builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);
2020-12-25 17:32:54 +08:00
if (need.equals(FlowWriteConfig.LOG_NEED_COMPLETE)) {
builder.setBolt("LogCompletionBolt", new CompletionBolt(),
FlowWriteConfig.COMPLETION_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
builder.setBolt("CompletionLogSendBolt", new LogSendBolt(),
FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogCompletionBolt");
} else {
builder.setBolt("LogSendBolt", new LogSendBolt(),
FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");
}
2019-11-12 16:02:50 +08:00
}
2019-08-05 17:50:15 +08:00
public static void main(String[] args) throws Exception {
2020-12-25 17:32:54 +08:00
LogFlowWriteTopology flowWriteTopology;
2019-08-05 17:50:15 +08:00
boolean runLocally = true;
String parameter = "remote";
int size = 2;
if (args.length >= size && parameter.equalsIgnoreCase(args[1])) {
runLocally = false;
2020-12-25 17:32:54 +08:00
flowWriteTopology = new LogFlowWriteTopology(args[0]);
2019-08-05 17:50:15 +08:00
} else {
2020-12-25 17:32:54 +08:00
flowWriteTopology = new LogFlowWriteTopology();
2019-08-05 17:50:15 +08:00
}
2020-12-25 17:32:54 +08:00
flowWriteTopology.buildTopology();
2019-08-05 17:50:15 +08:00
if (runLocally) {
logger.info("执行本地模式...");
2020-12-25 17:32:54 +08:00
flowWriteTopology.runLocally();
2019-08-05 17:50:15 +08:00
} else {
logger.info("执行远程部署模式...");
2020-12-25 17:32:54 +08:00
flowWriteTopology.runRemotely();
2019-08-05 17:50:15 +08:00
}
}
}