diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml new file mode 100644 index 0000000..cc9ded2 --- /dev/null +++ b/dependency-reduced-pom.xml @@ -0,0 +1,125 @@ + + + 4.0.0 + cn.ac.iie + log-stream-aggregation + log-stream-aggregation + 0.0.1-SNAPSHOT + http://maven.apache.org + + + + properties + + **/*.properties + + + + properties + + log4j.properties + + + + + + maven-shade-plugin + 2.4.2 + + + package + + shade + + + + + cn.ac.iie.trident.aggregate.topology.LogFlowWriteTopology + + + META-INF/spring.handlers + + + META-INF/spring.schemas + + + + + + + + maven-compiler-plugin + 2.3.2 + + 1.8 + 1.8 + + + + + + + nexus + Team Nexus Repository + http://192.168.40.125:8099/content/groups/public + + + + + org.apache.storm + storm-core + 1.0.2 + provided + + + kryo + com.esotericsoftware + + + clojure + org.clojure + + + disruptor + com.lmax + + + log4j-api + org.apache.logging.log4j + + + log4j-core + org.apache.logging.log4j + + + log4j-slf4j-impl + org.apache.logging.log4j + + + log4j-over-slf4j + org.slf4j + + + servlet-api + javax.servlet + + + + + junit + junit + 4.11 + test + + + hamcrest-core + org.hamcrest + + + + + + UTF-8 + + + diff --git a/log-stream-aggregation.properties b/log-stream-aggregation.properties new file mode 100644 index 0000000..c77a9da --- /dev/null +++ b/log-stream-aggregation.properties @@ -0,0 +1,4 @@ +path.variable.kotlin_bundled=F\:\\tools\\ideaIU-2018.1.4\\plugins\\Kotlin\\kotlinc +path.variable.maven_repository=C\:\\Users\\lixikang\\.m2\\repository +jdk.home.1.8=C\:/developer_tools/jdk1.8.0_131 +javac2.instrumentation.includeJavaRuntime=false \ No newline at end of file diff --git a/log-stream-aggregation.xml b/log-stream-aggregation.xml new file mode 100644 index 0000000..197b825 --- /dev/null +++ b/log-stream-aggregation.xml @@ -0,0 +1,318 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index f6c2779..8b12d9e 100644 --- a/pom.xml +++ b/pom.xml @@ -12,27 +12,80 @@ http://maven.apache.org + + + nexus + Team Nexus Repository + http://192.168.40.125:8099/content/groups/public + + + - - - - - - - + + + org.apache.maven.plugins + maven-shade-plugin + 2.4.2 + + + package + + shade + + + + + cn.ac.iie.trident.aggregate.topology.LogFlowWriteTopology + + + META-INF/spring.handlers + + + META-INF/spring.schemas + + + + + + + + + org.apache.maven.plugins maven-compiler-plugin + 2.3.2 - 8 - 8 + 1.8 + 1.8 + + + properties + + **/*.properties + + false + + + properties + + log4j.properties + + false + + + + UTF-8 @@ -42,25 +95,35 @@ org.apache.storm storm-core 1.0.2 + provided org.apache.storm storm-kafka 1.0.2 + org.apache.kafka kafka-clients - com.zdjizhi galaxy 1.0.3 + + + org.apache.commons + commons-lang3 + + + commons-io + commons-io + slf4j-log4j12 org.slf4j @@ -69,26 +132,22 @@ log4j-over-slf4j org.slf4j + + guava + com.google.guava + org.apache.kafka kafka_2.11 - 0.10.0.1 + 1.0.0 - - org.apache.zookeeper - zookeeper - org.slf4j slf4j-log4j12 - - log4j - log4j - @@ -99,20 +158,6 @@ test - - - org.apache.thrift - libthrift - 0.10.0 - pom - - - - - org.apache.thrift.tools - maven-thrift-plugin - 0.1.11 - com.alibaba @@ -120,37 +165,5 @@ 1.2.59 - - commons-collections - commons-collections - 3.2.2 - - - commons-codec - commons-codec - 1.10 - - - - - - - - - - - org.jgrapht - jgrapht-dist - 1.0.1 - pom - - - org.junit.jupiter - junit-jupiter-api - 5.0.0 - compile - - - diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index ce08511..b3eb718 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -30,7 +30,8 @@ group.id=lxk-200512 #输出topic results.output.topic=agg_test #results.output.topic=SECURITY-EVENT-COMPLETED-LOG - +#聚合时间,单位秒 +agg.time=30 #storm topology workers topology.workers=1 diff --git a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java index cd179cd..f63884d 100644 --- a/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java +++ b/src/main/java/cn/ac/iie/trident/aggregate/AggregateTopology.java @@ -22,6 +22,9 @@ import java.util.concurrent.TimeUnit; **/ public class AggregateTopology { + + + public static void main(String[] args) { //TODO 创建一个topo任务 TridentTopology topology = new TridentTopology(); @@ -29,16 +32,26 @@ public class AggregateTopology { OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance(); topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout) - .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) + .name("one") + .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout的并行度 + .name("two") .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value")) - .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) - .slidingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) - .each(new Fields("map"), new KafkaBolt(), new Fields()); + .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //处理数据的并行度 + .name("three") + .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) + .name("four") + .each(new Fields("map"), new KafkaBolt(), new Fields()) + .name("five") + .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //写到kafka的并行度 + .name("six"); Config config = new Config(); +// config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10); config.setDebug(false); - config.setNumWorkers(5); + config.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); //worker的数量 LocalCluster cluster = new LocalCluster(); + + cluster.submitTopology("trident-wordcount", config, topology.build()); // StormSubmitter.submitTopology("kafka2storm_opaqueTrident_topology", config,topology.build()); } diff --git a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java index 21ce8d3..2583b5b 100644 --- a/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java +++ b/src/main/java/cn/ac/iie/trident/aggregate/spout/TridentKafkaSpout.java @@ -29,6 +29,7 @@ public class TridentKafkaSpout { TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, FlowWriteConfig.KAFKA_TOPIC); kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); kafkaConfig.startOffsetTime = -1L; + kafkaConfig.socketTimeoutMs=60000; //不透明事务型Spout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig); diff --git a/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java b/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java new file mode 100644 index 0000000..d981fe1 --- /dev/null +++ b/src/main/java/cn/ac/iie/trident/aggregate/topology/LogFlowWriteTopology.java @@ -0,0 +1,111 @@ +package cn.ac.iie.trident.aggregate.topology; + + +import cn.ac.iie.trident.aggregate.AggCount; +import cn.ac.iie.trident.aggregate.ParseJson2KV; +import cn.ac.iie.trident.aggregate.bolt.KafkaBolt; +import cn.ac.iie.trident.aggregate.spout.TridentKafkaSpout; +import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig; +import org.apache.log4j.Logger; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory; +import org.apache.storm.tuple.Fields; + +import java.util.concurrent.TimeUnit; + +/** + * Storm程序主类 + * + * @author Administrator + */ + +public class LogFlowWriteTopology { + private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class); + + private TopologyBuilder builder; + private static TridentTopology tridentTopology; + + + + + + private static Config createTopologConfig() { + Config conf = new Config(); + conf.setDebug(false); + conf.setMessageTimeoutSecs(60); + conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); + return conf; + } + + + private static StormTopology buildTopology() { + + OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance(); + + tridentTopology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout) + .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) + .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value")) + .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) + .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) + .each(new Fields("map"), new KafkaBolt(), new Fields()); + return tridentTopology.build(); + } + + public static void main(String[] args) throws Exception { + + + Config conf = new Config(); + conf.setDebug(false); + conf.setMessageTimeoutSecs(60); + conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); + + //TODO 创建一个topo任务 + TridentTopology topology = new TridentTopology(); + //TODO 为Topo绑定Spout + OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance(); + + /* topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout) + .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)//6 + .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value")) + .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)//9 + .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) + .each(new Fields("map"), new KafkaBolt(), new Fields());*/ + + + topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout) + .name("one") + .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout的并行度 + .name("two") + .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value")) + .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //处理数据的并行度 + .name("three") + .slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map")) + .name("four") + .each(new Fields("map"), new KafkaBolt(), new Fields()) + .name("five") + .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //写到kafka的并行度 + .name("six"); + + if(args.length == 0){//本地模式运行 + + LocalCluster cluster = new LocalCluster(); + + cluster.submitTopology("trident-function", conf, topology.build()); + Thread.sleep(100000); + cluster.shutdown(); + }else{//集群模式运行 + StormSubmitter.submitTopology(args[0], conf, topology.build()); + } + + } +} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java b/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java new file mode 100644 index 0000000..708f77c --- /dev/null +++ b/src/main/java/cn/ac/iie/trident/aggregate/topology/StormRunner.java @@ -0,0 +1,35 @@ +package cn.ac.iie.trident.aggregate.topology; + + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.topology.TopologyBuilder; + +/** + * @author Administrator + */ +public final class StormRunner { + private static final int MILLS_IN_SEC = 1000; + + private StormRunner() {} + + public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException { + + LocalCluster localCluster = new LocalCluster(); + localCluster.submitTopology(topologyName, conf, builder.createTopology()); + Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC); + localCluster.shutdown(); + + } + + public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { + + StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); + } + + +} diff --git a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java index 0ab90a5..3905580 100644 --- a/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java +++ b/src/main/java/cn/ac/iie/trident/aggregate/utils/FlowWriteConfig.java @@ -23,6 +23,10 @@ public class FlowWriteConfig { public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num"); public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset"); + + public static final Integer AGG_TIME = FlowWriteConfigurations.getIntProperty(0, "agg.time"); + + /** * kafka */ @@ -36,8 +40,6 @@ public class FlowWriteConfig { public static final String AUTO_OFFSET_RESET = FlowWriteConfigurations.getStringProperty(0, "auto.offset.reset"); public static final String KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "kafka.compression.type"); - public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library"); - /** * http */ diff --git a/src/test/java/com/wp/AppTest.java b/src/test/java/com/wp/AppTest.java index 3202adb..77c826f 100644 --- a/src/test/java/com/wp/AppTest.java +++ b/src/test/java/com/wp/AppTest.java @@ -7,78 +7,50 @@ import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig; import junit.framework.Test; import junit.framework.TestCase; import junit.framework.TestSuite; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.builtin.Debug; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; /** * Unit test for simple App. */ -public class AppTest - extends TestCase -{ - /** - * Create the test case - * - * @param testName name of the test case - */ - public AppTest( String testName ) - { - super( testName ); - } +public class AppTest{ - /** - * @return the suite of tests being tested - */ - public static Test suite() - { - return new TestSuite( AppTest.class ); - } - - /** - * Rigourous Test :-) - */ - public void testApp() - { - assertTrue( true ); - } - - - private static ValueBean valueBean; @org.junit.Test public void test(){ - System.out.println(valueBean == null); + Config conf = new Config(); +// conf.setDebug(false); + conf.setMessageTimeoutSecs(60); + conf.setNumWorkers(1); + + FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3, + new Values("nickt1", 4), + new Values("nickt2", 7), + new Values("nickt3", 8), + new Values("nickt4", 9), + new Values("nickt5", 7), + new Values("nickt6", 11), + new Values("nickt7", 5) + ); + spout.setCycle(true); + TridentTopology topology = new TridentTopology(); + topology.newStream("spout1", spout) + .batchGlobal() + .each(new Fields("user"),new Debug("print:")) + .parallelismHint(5); + + LocalCluster cluster = new LocalCluster(); + + cluster.submitTopology("trident-function", conf, topology.build()); } - static class Demo{ - private String a; - private String b; - private String c; - - public String getA() { - return a; - } - - public void setA(String a) { - this.a = a; - } - - public String getB() { - return b; - } - - public void setB(String b) { - this.b = b; - } - - public String getC() { - return c; - } - - public void setC(String c) { - this.c = c; - } - } } diff --git a/storm-topology.log b/storm-topology.log new file mode 100644 index 0000000..e69de29