OLAP预聚合代码更新

This commit is contained in:
lee
2020-06-08 15:32:26 +08:00
parent 36b04e3fea
commit cb4ee7544e
12 changed files with 728 additions and 133 deletions

125
dependency-reduced-pom.xml Normal file
View File

@@ -0,0 +1,125 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.ac.iie</groupId>
<artifactId>log-stream-aggregation</artifactId>
<name>log-stream-aggregation</name>
<version>0.0.1-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<build>
<resources>
<resource>
<directory>properties</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
<resource>
<directory>properties</directory>
<includes>
<include>log4j.properties</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer>
<mainClass>cn.ac.iie.trident.aggregate.topology.LogFlowWriteTopology</mainClass>
</transformer>
<transformer>
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer>
<resource>META-INF/spring.schemas</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
<url>http://192.168.40.125:8099/content/groups/public</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.2</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>kryo</artifactId>
<groupId>com.esotericsoftware</groupId>
</exclusion>
<exclusion>
<artifactId>clojure</artifactId>
<groupId>org.clojure</groupId>
</exclusion>
<exclusion>
<artifactId>disruptor</artifactId>
<groupId>com.lmax</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-core</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-slf4j-impl</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>hamcrest-core</artifactId>
<groupId>org.hamcrest</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@@ -0,0 +1,4 @@
path.variable.kotlin_bundled=F\:\\tools\\ideaIU-2018.1.4\\plugins\\Kotlin\\kotlinc
path.variable.maven_repository=C\:\\Users\\lixikang\\.m2\\repository
jdk.home.1.8=C\:/developer_tools/jdk1.8.0_131
javac2.instrumentation.includeJavaRuntime=false

318
log-stream-aggregation.xml Normal file
View File

@@ -0,0 +1,318 @@
<?xml version="1.0" encoding="UTF-8"?>
<project name="log-stream-aggregation" default="all">
<property file="log-stream-aggregation.properties"/>
<!-- Uncomment the following property if no tests compilation is needed -->
<!--
<property name="skip.tests" value="true"/>
-->
<!-- Compiler options -->
<property name="compiler.debug" value="on"/>
<property name="compiler.generate.no.warnings" value="off"/>
<property name="compiler.args" value=""/>
<property name="compiler.max.memory" value="700m"/>
<patternset id="ignored.files">
<exclude name="**/*.hprof/**"/>
<exclude name="**/*.pyc/**"/>
<exclude name="**/*.pyo/**"/>
<exclude name="**/*.rbc/**"/>
<exclude name="**/*.yarb/**"/>
<exclude name="**/*~/**"/>
<exclude name="**/.DS_Store/**"/>
<exclude name="**/.git/**"/>
<exclude name="**/.hg/**"/>
<exclude name="**/.svn/**"/>
<exclude name="**/CVS/**"/>
<exclude name="**/__pycache__/**"/>
<exclude name="**/_svn/**"/>
<exclude name="**/vssver.scc/**"/>
<exclude name="**/vssver2.scc/**"/>
</patternset>
<patternset id="library.patterns">
<include name="*.egg"/>
<include name="*.jar"/>
<include name="*.ear"/>
<include name="*.swc"/>
<include name="*.war"/>
<include name="*.zip"/>
<include name="*.ane"/>
</patternset>
<patternset id="compiler.resources">
<exclude name="**/?*.java"/>
<exclude name="**/?*.form"/>
<exclude name="**/?*.class"/>
<exclude name="**/?*.groovy"/>
<exclude name="**/?*.scala"/>
<exclude name="**/?*.flex"/>
<exclude name="**/?*.kt"/>
<exclude name="**/?*.clj"/>
<exclude name="**/?*.aj"/>
</patternset>
<!-- JDK definitions -->
<property name="jdk.bin.1.8" value="${jdk.home.1.8}/bin"/>
<path id="jdk.classpath.1.8">
<fileset dir="${jdk.home.1.8}">
<include name="jre/lib/charsets.jar"/>
<include name="jre/lib/deploy.jar"/>
<include name="jre/lib/ext/access-bridge-64.jar"/>
<include name="jre/lib/ext/cldrdata.jar"/>
<include name="jre/lib/ext/dnsns.jar"/>
<include name="jre/lib/ext/jaccess.jar"/>
<include name="jre/lib/ext/jfxrt.jar"/>
<include name="jre/lib/ext/localedata.jar"/>
<include name="jre/lib/ext/nashorn.jar"/>
<include name="jre/lib/ext/sunec.jar"/>
<include name="jre/lib/ext/sunjce_provider.jar"/>
<include name="jre/lib/ext/sunmscapi.jar"/>
<include name="jre/lib/ext/sunpkcs11.jar"/>
<include name="jre/lib/ext/zipfs.jar"/>
<include name="jre/lib/javaws.jar"/>
<include name="jre/lib/jce.jar"/>
<include name="jre/lib/jfr.jar"/>
<include name="jre/lib/jfxswt.jar"/>
<include name="jre/lib/jsse.jar"/>
<include name="jre/lib/management-agent.jar"/>
<include name="jre/lib/plugin.jar"/>
<include name="jre/lib/resources.jar"/>
<include name="jre/lib/rt.jar"/>
</fileset>
</path>
<property name="project.jdk.home" value="${jdk.home.1.8}"/>
<property name="project.jdk.bin" value="${jdk.bin.1.8}"/>
<property name="project.jdk.classpath" value="jdk.classpath.1.8"/>
<!-- Project Libraries -->
<path id="library.maven:_com.101tec:zkclient:0.10.classpath">
<pathelement location="${path.variable.maven_repository}/com/101tec/zkclient/0.10/zkclient-0.10.jar"/>
</path>
<path id="library.maven:_com.alibaba:fastjson:1.2.59.classpath">
<pathelement location="${path.variable.maven_repository}/com/alibaba/fastjson/1.2.59/fastjson-1.2.59.jar"/>
</path>
<path id="library.maven:_com.esotericsoftware:kryo:3.0.3.classpath">
<pathelement location="${path.variable.maven_repository}/com/esotericsoftware/kryo/3.0.3/kryo-3.0.3.jar"/>
</path>
<path id="library.maven:_com.esotericsoftware:minlog:1.3.0.classpath">
<pathelement location="${path.variable.maven_repository}/com/esotericsoftware/minlog/1.3.0/minlog-1.3.0.jar"/>
</path>
<path id="library.maven:_com.esotericsoftware:reflectasm:1.10.1.classpath">
<pathelement location="${path.variable.maven_repository}/com/esotericsoftware/reflectasm/1.10.1/reflectasm-1.10.1.jar"/>
</path>
<path id="library.maven:_com.fasterxml.jackson.core:jackson-annotations:2.9.5.classpath">
<pathelement location="${path.variable.maven_repository}/com/fasterxml/jackson/core/jackson-annotations/2.9.5/jackson-annotations-2.9.5.jar"/>
</path>
<path id="library.maven:_com.fasterxml.jackson.core:jackson-core:2.9.5.classpath">
<pathelement location="${path.variable.maven_repository}/com/fasterxml/jackson/core/jackson-core/2.9.5/jackson-core-2.9.5.jar"/>
</path>
<path id="library.maven:_com.fasterxml.jackson.core:jackson-databind:2.9.1.classpath">
<pathelement location="${path.variable.maven_repository}/com/fasterxml/jackson/core/jackson-databind/2.9.1/jackson-databind-2.9.1.jar"/>
</path>
<path id="library.maven:_com.google.guava:guava:16.0.1.classpath">
<pathelement location="${path.variable.maven_repository}/com/google/guava/guava/16.0.1/guava-16.0.1.jar"/>
</path>
<path id="library.maven:_com.googlecode.json-simple:json-simple:1.1.classpath">
<pathelement location="${path.variable.maven_repository}/com/googlecode/json-simple/json-simple/1.1/json-simple-1.1.jar"/>
</path>
<path id="library.maven:_com.lmax:disruptor:3.3.2.classpath">
<pathelement location="${path.variable.maven_repository}/com/lmax/disruptor/3.3.2/disruptor-3.3.2.jar"/>
</path>
<path id="library.maven:_com.maxmind.db:maxmind-db:1.2.2.classpath">
<pathelement location="${path.variable.maven_repository}/com/maxmind/db/maxmind-db/1.2.2/maxmind-db-1.2.2.jar"/>
</path>
<path id="library.maven:_com.maxmind.geoip2:geoip2:2.12.0.classpath">
<pathelement location="${path.variable.maven_repository}/com/maxmind/geoip2/geoip2/2.12.0/geoip2-2.12.0.jar"/>
</path>
<path id="library.maven:_com.maxmind.geoip:geoip-api:1.3.1.classpath">
<pathelement location="${path.variable.maven_repository}/com/maxmind/geoip/geoip-api/1.3.1/geoip-api-1.3.1.jar"/>
</path>
<path id="library.maven:_com.yammer.metrics:metrics-core:2.2.0.classpath">
<pathelement location="${path.variable.maven_repository}/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar"/>
</path>
<path id="library.maven:_com.zdjizhi:galaxy:1.0.3.classpath">
<pathelement location="${path.variable.maven_repository}/com/zdjizhi/galaxy/1.0.3/galaxy-1.0.3.jar"/>
</path>
<path id="library.maven:_commons-codec:commons-codec:1.10.classpath">
<pathelement location="${path.variable.maven_repository}/commons-codec/commons-codec/1.10/commons-codec-1.10.jar"/>
</path>
<path id="library.maven:_commons-io:commons-io:2.5.classpath">
<pathelement location="${path.variable.maven_repository}/commons-io/commons-io/2.5/commons-io-2.5.jar"/>
</path>
<path id="library.maven:_commons-lang:commons-lang:2.5.classpath">
<pathelement location="${path.variable.maven_repository}/commons-lang/commons-lang/2.5/commons-lang-2.5.jar"/>
</path>
<path id="library.maven:_commons-logging:commons-logging:1.2.classpath">
<pathelement location="${path.variable.maven_repository}/commons-logging/commons-logging/1.2/commons-logging-1.2.jar"/>
</path>
<path id="library.maven:_io.netty:netty:3.10.5.final.classpath">
<pathelement location="${path.variable.maven_repository}/io/netty/netty/3.10.5.Final/netty-3.10.5.Final.jar"/>
</path>
<path id="library.maven:_javax.servlet:servlet-api:2.5.classpath">
<pathelement location="${path.variable.maven_repository}/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar"/>
</path>
<path id="library.maven:_jline:jline:0.9.94.classpath">
<pathelement location="${path.variable.maven_repository}/jline/jline/0.9.94/jline-0.9.94.jar"/>
</path>
<path id="library.maven:_joda-time:joda-time:2.10.classpath">
<pathelement location="${path.variable.maven_repository}/joda-time/joda-time/2.10/joda-time-2.10.jar"/>
</path>
<path id="library.maven:_junit:junit:4.11.classpath">
<pathelement location="${path.variable.maven_repository}/junit/junit/4.11/junit-4.11.jar"/>
</path>
<path id="library.maven:_log4j:log4j:1.2.14.classpath">
<pathelement location="${path.variable.maven_repository}/log4j/log4j/1.2.14/log4j-1.2.14.jar"/>
</path>
<path id="library.maven:_net.sf.jopt-simple:jopt-simple:5.0.4.classpath">
<pathelement location="${path.variable.maven_repository}/net/sf/jopt-simple/jopt-simple/5.0.4/jopt-simple-5.0.4.jar"/>
</path>
<path id="library.maven:_org.apache.curator:curator-client:2.10.0.classpath">
<pathelement location="${path.variable.maven_repository}/org/apache/curator/curator-client/2.10.0/curator-client-2.10.0.jar"/>
</path>
<path id="library.maven:_org.apache.curator:curator-framework:2.10.0.classpath">
<pathelement location="${path.variable.maven_repository}/org/apache/curator/curator-framework/2.10.0/curator-framework-2.10.0.jar"/>
</path>
<path id="library.maven:_org.apache.httpcomponents:httpclient:4.5.5.classpath">
<pathelement location="${path.variable.maven_repository}/org/apache/httpcomponents/httpclient/4.5.5/httpclient-4.5.5.jar"/>
</path>
<path id="library.maven:_org.apache.httpcomponents:httpcore:4.4.9.classpath">
<pathelement location="${path.variable.maven_repository}/org/apache/httpcomponents/httpcore/4.4.9/httpcore-4.4.9.jar"/>
</path>
<path id="library.maven:_org.apache.kafka:kafka-clients:1.0.0.classpath">
<pathelement location="${path.variable.maven_repository}/org/apache/kafka/kafka-clients/1.0.0/kafka-clients-1.0.0.jar"/>
</path>
<path id="library.maven:_org.apache.kafka:kafka_2.11:1.0.0.classpath">
<pathelement location="${path.variable.maven_repository}/org/apache/kafka/kafka_2.11/1.0.0/kafka_2.11-1.0.0.jar"/>
</path>
<path id="library.maven:_org.apache.logging.log4j:log4j-api:2.1.classpath">
<pathelement location="${path.variable.maven_repository}/org/apache/logging/log4j/log4j-api/2.1/log4j-api-2.1.jar"/>
</path>
<path id="library.maven:_org.apache.logging.log4j:log4j-core:2.1.classpath">
<pathelement location="${path.variable.maven_repository}/org/apache/logging/log4j/log4j-core/2.1/log4j-core-2.1.jar"/>
</path>
<path id="library.maven:_org.apache.logging.log4j:log4j-slf4j-impl:2.1.classpath">
<pathelement location="${path.variable.maven_repository}/org/apache/logging/log4j/log4j-slf4j-impl/2.1/log4j-slf4j-impl-2.1.jar"/>
</path>
<path id="library.maven:_org.apache.storm:storm-core:1.0.2.classpath">
<pathelement location="${path.variable.maven_repository}/org/apache/storm/storm-core/1.0.2/storm-core-1.0.2.jar"/>
</path>
<path id="library.maven:_org.apache.storm:storm-kafka:1.0.2.classpath">
<pathelement location="${path.variable.maven_repository}/org/apache/storm/storm-kafka/1.0.2/storm-kafka-1.0.2.jar"/>
</path>
<path id="library.maven:_org.apache.zookeeper:zookeeper:3.4.9.classpath">
<pathelement location="${path.variable.maven_repository}/org/apache/zookeeper/zookeeper/3.4.9/zookeeper-3.4.9.jar"/>
</path>
<path id="library.maven:_org.clojure:clojure:1.7.0.classpath">
<pathelement location="${path.variable.maven_repository}/org/clojure/clojure/1.7.0/clojure-1.7.0.jar"/>
</path>
<path id="library.maven:_org.hamcrest:hamcrest-core:1.3.classpath">
<pathelement location="${path.variable.maven_repository}/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar"/>
</path>
<path id="library.maven:_org.lz4:lz4-java:1.4.classpath">
<pathelement location="${path.variable.maven_repository}/org/lz4/lz4-java/1.4/lz4-java-1.4.jar"/>
</path>
<path id="library.maven:_org.objenesis:objenesis:2.1.classpath">
<pathelement location="${path.variable.maven_repository}/org/objenesis/objenesis/2.1/objenesis-2.1.jar"/>
</path>
<path id="library.maven:_org.ow2.asm:asm:5.0.3.classpath">
<pathelement location="${path.variable.maven_repository}/org/ow2/asm/asm/5.0.3/asm-5.0.3.jar"/>
</path>
<path id="library.maven:_org.scala-lang:scala-library:2.11.11.classpath">
<pathelement location="${path.variable.maven_repository}/org/scala-lang/scala-library/2.11.11/scala-library-2.11.11.jar"/>
</path>
<path id="library.maven:_org.slf4j:log4j-over-slf4j:1.6.6.classpath">
<pathelement location="${path.variable.maven_repository}/org/slf4j/log4j-over-slf4j/1.6.6/log4j-over-slf4j-1.6.6.jar"/>
</path>
<path id="library.maven:_org.slf4j:slf4j-api:1.7.7.classpath">
<pathelement location="${path.variable.maven_repository}/org/slf4j/slf4j-api/1.7.7/slf4j-api-1.7.7.jar"/>
</path>
<path id="library.maven:_org.xerial.snappy:snappy-java:1.1.4.classpath">
<pathelement location="${path.variable.maven_repository}/org/xerial/snappy/snappy-java/1.1.4/snappy-java-1.1.4.jar"/>
</path>
<!-- Global Libraries -->
<!-- Application Server Libraries -->
<!-- Register Custom Compiler Taskdefs -->
<property name="javac2.home" value="${idea.home}/lib"/>
<path id="javac2.classpath">
<fileset dir="${javac2.home}">
<include name="javac2.jar"/>
<include name="jdom.jar"/>
<include name="asm-all*.jar"/>
<include name="jgoodies-forms.jar"/>
</fileset>
</path>
<target name="register.custom.compilers">
<taskdef name="javac2" classname="com.intellij.ant.Javac2" classpathref="javac2.classpath"/>
<taskdef name="instrumentIdeaExtensions" classname="com.intellij.ant.InstrumentIdeaExtensions" classpathref="javac2.classpath"/>
</target>
<!-- Modules -->
<import file="${basedir}/.idea/module_log-stream-aggregation.xml"/>
<target name="init" description="Build initialization">
<!-- Perform any build initialization in this target -->
</target>
<target name="clean" depends="clean.module.log-stream-aggregation" description="cleanup all"/>
<target name="build.modules" depends="init, clean, compile.module.log-stream-aggregation" description="build all modules"/>
<target name="all" depends="build.modules" description="build all"/>
</project>

143
pom.xml
View File

@@ -12,27 +12,80 @@
<url>http://maven.apache.org</url>
<repositories>
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
<url>http://192.168.40.125:8099/content/groups/public</url>
</repository>
</repositories>
<!--<repositories>-->
<!--<repository>-->
<!--<id>nexus</id>-->
<!--<name>Team Nexus Repository</name>-->
<!--<url>http://192.168.40.125:8099/content/groups/public</url>-->
<!--</repository>-->
<!--</repositories>-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>cn.ac.iie.trident.aggregate.topology.LogFlowWriteTopology</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>8</source>
<target>8</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<directory>properties</directory>
<includes>
<include>**/*.properties</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>properties</directory>
<includes>
<include>log4j.properties</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
@@ -42,25 +95,35 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.0.3</version>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
@@ -69,26 +132,22 @@
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.1</version>
<version>1.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
@@ -99,20 +158,6 @@
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.thrift/libthrift -->
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.10.0</version>
<type>pom</type>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.thrift.tools/maven-thrift-plugin -->
<dependency>
<groupId>org.apache.thrift.tools</groupId>
<artifactId>maven-thrift-plugin</artifactId>
<version>0.1.11</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
@@ -120,37 +165,5 @@
<version>1.2.59</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<!--&lt;!&ndash; https://mvnrepository.com/artifact/org.jgrapht/jgrapht-core &ndash;&gt;-->
<!--<dependency>-->
<!--<groupId>org.jgrapht</groupId>-->
<!--<artifactId>jgrapht-core</artifactId>-->
<!--<version>1.1.0</version>-->
<!--</dependency>-->
<!-- https://mvnrepository.com/artifact/org.jgrapht/jgrapht-dist -->
<dependency>
<groupId>org.jgrapht</groupId>
<artifactId>jgrapht-dist</artifactId>
<version>1.0.1</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.0.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@@ -30,7 +30,8 @@ group.id=lxk-200512
#输出topic
results.output.topic=agg_test
#results.output.topic=SECURITY-EVENT-COMPLETED-LOG
#聚合时间,单位秒
agg.time=30
#storm topology workers
topology.workers=1

View File

@@ -22,6 +22,9 @@ import java.util.concurrent.TimeUnit;
**/
public class AggregateTopology {
public static void main(String[] args) {
//TODO 创建一个topo任务
TridentTopology topology = new TridentTopology();
@@ -29,16 +32,26 @@ public class AggregateTopology {
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();
topology.newStream("kafka2storm_opaqueTrident", opaqueTridentKafkaSpout)
.parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM)
.name("one")
.parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) //spout的并行度
.name("two")
.each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
.parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM)
.slidingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
.each(new Fields("map"), new KafkaBolt(), new Fields());
.parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) //处理数据的并行度
.name("three")
.slidingWindow(new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS), new InMemoryWindowsStoreFactory(), new Fields("key", "value"), new AggCount(), new Fields("map"))
.name("four")
.each(new Fields("map"), new KafkaBolt(), new Fields())
.name("five")
.parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) //写到kafka的并行度
.name("six");
Config config = new Config();
// config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
config.setDebug(false);
config.setNumWorkers(5);
config.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS); //worker的数量
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trident-wordcount", config, topology.build());
// StormSubmitter.submitTopology("kafka2storm_opaqueTrident_topology", config,topology.build());
}

View File

@@ -29,6 +29,7 @@ public class TridentKafkaSpout {
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, FlowWriteConfig.KAFKA_TOPIC);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.startOffsetTime = -1L;
kafkaConfig.socketTimeoutMs=60000;
//不透明事务型Spout
opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

View File

@@ -0,0 +1,111 @@
package cn.ac.iie.trident.aggregate.topology;
import cn.ac.iie.trident.aggregate.AggCount;
import cn.ac.iie.trident.aggregate.ParseJson2KV;
import cn.ac.iie.trident.aggregate.bolt.KafkaBolt;
import cn.ac.iie.trident.aggregate.spout.TridentKafkaSpout;
import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
import org.apache.storm.tuple.Fields;
import java.util.concurrent.TimeUnit;
/**
 * Main class of the Storm log pre-aggregation program.
 *
 * <p>Assembles a Trident topology that reads from the spout returned by
 * {@link TridentKafkaSpout#getInstance()}, runs {@link ParseJson2KV} on every record,
 * aggregates with {@link AggCount} over a window of {@code FlowWriteConfig.AGG_TIME}
 * seconds and passes the aggregated result to {@link KafkaBolt}.
 *
 * <p>Without command-line arguments the topology runs in a {@link LocalCluster} for a
 * fixed time; otherwise {@code args[0]} is used as the topology name and the topology is
 * submitted through {@link StormSubmitter}.
 *
 * @author Administrator
 */
public class LogFlowWriteTopology {
    private static final Logger logger = Logger.getLogger(LogFlowWriteTopology.class);

    /** Transaction id passed to {@code newStream} for the Kafka spout. */
    private static final String SPOUT_STREAM_ID = "kafka2storm_opaqueTrident";

    /** Topology name used when running in local mode. */
    private static final String LOCAL_TOPOLOGY_NAME = "trident-function";

    /** How long the local cluster is kept running before it is shut down. */
    private static final long LOCAL_RUN_MILLIS = 100000L;

    /**
     * Creates the topology configuration: debug off, 60 second message timeout and the
     * configured number of workers.
     *
     * @return a freshly populated Storm configuration
     */
    private static Config createTopologConfig() {
        Config conf = new Config();
        conf.setDebug(false);
        conf.setMessageTimeoutSecs(60);
        // number of workers
        conf.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
        return conf;
    }

    /**
     * Builds the aggregation pipeline on a new {@link TridentTopology}.
     *
     * <p>The topology object is created locally; the previous version dereferenced a
     * static field that was never assigned.
     *
     * @return the Storm topology produced by {@link TridentTopology#build()}
     */
    private static StormTopology buildTopology() {
        // create the topology
        TridentTopology topology = new TridentTopology();
        // bind the spout to the topology
        OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = TridentKafkaSpout.getInstance();

        // Window length and sliding interval are both AGG_TIME seconds.
        BaseWindowedBolt.Duration windowLength =
                new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS);
        BaseWindowedBolt.Duration slidingInterval =
                new BaseWindowedBolt.Duration(FlowWriteConfig.AGG_TIME, TimeUnit.SECONDS);

        topology.newStream(SPOUT_STREAM_ID, opaqueTridentKafkaSpout)
                .name("one")
                .parallelismHint(FlowWriteConfig.SPOUT_PARALLELISM) // spout parallelism
                .name("two")
                .each(new Fields("str"), new ParseJson2KV(), new Fields("key", "value"))
                .parallelismHint(FlowWriteConfig.DATACENTER_BOLT_PARALLELISM) // data-processing parallelism
                .name("three")
                .slidingWindow(windowLength, slidingInterval, new InMemoryWindowsStoreFactory(),
                        new Fields("key", "value"), new AggCount(), new Fields("map"))
                .name("four")
                .each(new Fields("map"), new KafkaBolt(), new Fields())
                .name("five")
                .parallelismHint(FlowWriteConfig.KAFKA_BOLT_PARALLELISM) // parallelism for writing to Kafka
                .name("six");

        return topology.build();
    }

    /**
     * Runs the topology locally (no arguments) or submits it to a cluster.
     *
     * @param args optional; {@code args[0]} is the topology name for cluster submission
     * @throws Exception if submission fails or the local run is interrupted
     */
    public static void main(String[] args) throws Exception {
        Config conf = createTopologConfig();
        StormTopology stormTopology = buildTopology();

        if (args.length == 0) {
            // local mode
            logger.info("Running topology " + LOCAL_TOPOLOGY_NAME + " in local mode");
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, stormTopology);
                Thread.sleep(LOCAL_RUN_MILLIS);
            } finally {
                // shut the local cluster down even when submission fails or the sleep is interrupted
                cluster.shutdown();
            }
        } else {
            // cluster mode
            logger.info("Submitting topology " + args[0] + " to the cluster");
            StormSubmitter.submitTopology(args[0], conf, stormTopology);
        }
    }
}

View File

@@ -0,0 +1,35 @@
package cn.ac.iie.trident.aggregate.topology;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
/**
 * Static helpers that launch a topology either in a {@link LocalCluster} for a limited
 * time or on a remote cluster through {@link StormSubmitter}.
 *
 * @author Administrator
 */
public final class StormRunner {
    private static final int MILLS_IN_SEC = 1000;

    /** Utility class; not instantiable. */
    private StormRunner() {}

    /**
     * Submits the topology to a new {@link LocalCluster}, waits for the given time and
     * shuts the cluster down.
     *
     * @param builder          builder whose {@code createTopology()} result is submitted
     * @param topologyName     name under which the topology is submitted
     * @param conf             topology configuration
     * @param runtimeInSeconds how long to keep the local cluster running, in seconds
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
        LocalCluster localCluster = new LocalCluster();
        try {
            localCluster.submitTopology(topologyName, conf, builder.createTopology());
            // widen to long before multiplying so large runtimes do not overflow int
            Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC);
        } finally {
            // always release the local cluster, including on submit failure or interruption
            localCluster.shutdown();
        }
    }

    /**
     * Submits the topology through {@link StormSubmitter#submitTopologyWithProgressBar}.
     *
     * @param builder      builder whose {@code createTopology()} result is submitted
     * @param topologyName name under which the topology is submitted
     * @param conf         topology configuration
     * @throws AlreadyAliveException    propagated from the submitter
     * @throws InvalidTopologyException propagated from the submitter
     * @throws AuthorizationException   propagated from the submitter
     */
    public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
    }
}

View File

@@ -23,6 +23,10 @@ public class FlowWriteConfig {
public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num");
public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");
public static final Integer AGG_TIME = FlowWriteConfigurations.getIntProperty(0, "agg.time");
/**
* kafka
*/
@@ -36,8 +40,6 @@ public class FlowWriteConfig {
public static final String AUTO_OFFSET_RESET = FlowWriteConfigurations.getStringProperty(0, "auto.offset.reset");
public static final String KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "kafka.compression.type");
public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library");
/**
* http
*/

View File

@@ -7,78 +7,50 @@ import cn.ac.iie.trident.aggregate.utils.FlowWriteConfig;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.builtin.Debug;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
* Unit test for simple App.
*/
public class AppTest
extends TestCase
{
/**
* Create the test case
*
* @param testName name of the test case
*/
public AppTest( String testName )
{
super( testName );
}
public class AppTest{
/**
* @return the suite of tests being tested
*/
public static Test suite()
{
return new TestSuite( AppTest.class );
}
/**
* Rigourous Test :-)
*/
public void testApp()
{
assertTrue( true );
}
private static ValueBean valueBean;
@org.junit.Test
public void test(){
System.out.println(valueBean == null);
Config conf = new Config();
// conf.setDebug(false);
conf.setMessageTimeoutSecs(60);
conf.setNumWorkers(1);
FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
new Values("nickt1", 4),
new Values("nickt2", 7),
new Values("nickt3", 8),
new Values("nickt4", 9),
new Values("nickt5", 7),
new Values("nickt6", 11),
new Values("nickt7", 5)
);
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
.batchGlobal()
.each(new Fields("user"),new Debug("print:"))
.parallelismHint(5);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trident-function", conf, topology.build());
}
static class Demo{
private String a;
private String b;
private String c;
public String getA() {
return a;
}
public void setA(String a) {
this.a = a;
}
public String getB() {
return b;
}
public void setB(String b) {
this.b = b;
}
public String getC() {
return c;
}
public void setC(String c) {
this.c = c;
}
}
}

0
storm-topology.log Normal file
View File