Storm version liveCharts feature code update

李玺康
2020-10-27 14:51:16 +08:00
commit 5260718fba
25 changed files with 1825 additions and 0 deletions

16
.idea/compiler.xml generated Normal file

@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<annotationProcessing>
<profile name="Maven default annotation processors profile" enabled="true">
<sourceOutputDir name="target/generated-sources/annotations" />
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<module name="storm-olap-aggregation" />
</profile>
</annotationProcessing>
<bytecodeTargetLevel>
<module name="storm-olap-aggregation" target="1.8" />
</bytecodeTargetLevel>
</component>
</project>

6
.idea/encodings.xml generated Normal file

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$" charset="UTF-8" />
</component>
</project>

14
.idea/misc.xml generated Normal file

@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="JavaScriptSettings">
<option name="languageLevel" value="ES6" />
</component>
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="1.8" project-jdk-type="JavaSDK" />
</project>

72
.idea/storm-olap-aggregation.iml generated Normal file

@@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/properties" type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: org.apache.kafka:kafka_2.11:1.0.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.kafka:kafka-clients:1.0.0" level="project" />
<orderEntry type="library" name="Maven: org.lz4:lz4-java:1.4" level="project" />
<orderEntry type="library" name="Maven: org.xerial.snappy:snappy-java:1.1.4" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.9.1" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.9.0" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.9.1" level="project" />
<orderEntry type="library" name="Maven: net.sf.jopt-simple:jopt-simple:5.0.4" level="project" />
<orderEntry type="library" name="Maven: com.yammer.metrics:metrics-core:2.2.0" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-library:2.11.11" level="project" />
<orderEntry type="library" name="Maven: com.101tec:zkclient:0.10" level="project" />
<orderEntry type="library" name="Maven: org.apache.storm:storm-core:1.0.2" level="project" />
<orderEntry type="library" name="Maven: com.esotericsoftware:kryo:3.0.3" level="project" />
<orderEntry type="library" name="Maven: com.esotericsoftware:reflectasm:1.10.1" level="project" />
<orderEntry type="library" name="Maven: org.ow2.asm:asm:5.0.3" level="project" />
<orderEntry type="library" name="Maven: com.esotericsoftware:minlog:1.3.0" level="project" />
<orderEntry type="library" name="Maven: org.objenesis:objenesis:2.1" level="project" />
<orderEntry type="library" name="Maven: org.clojure:clojure:1.7.0" level="project" />
<orderEntry type="library" name="Maven: com.lmax:disruptor:3.3.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-api:2.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-core:2.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-slf4j-impl:2.1" level="project" />
<orderEntry type="library" name="Maven: javax.servlet:servlet-api:2.5" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.7" level="project" />
<orderEntry type="library" name="Maven: org.apache.storm:storm-kafka:1.0.2" level="project" />
<orderEntry type="library" name="Maven: commons-io:commons-io:2.5" level="project" />
<orderEntry type="library" name="Maven: org.apache.curator:curator-framework:2.10.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.curator:curator-client:2.10.0" level="project" />
<orderEntry type="library" name="Maven: commons-lang:commons-lang:2.5" level="project" />
<orderEntry type="library" name="Maven: com.googlecode.json-simple:json-simple:1.1" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.12" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
<orderEntry type="library" name="Maven: com.alibaba:fastjson:1.2.70" level="project" />
<orderEntry type="library" name="Maven: cglib:cglib-nodep:3.2.4" level="project" />
<orderEntry type="library" name="Maven: com.zdjizhi:galaxy:1.0.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.4" level="project" />
<orderEntry type="library" name="Maven: log4j:log4j:1.2.14" level="project" />
<orderEntry type="library" name="Maven: joda-time:joda-time:2.10" level="project" />
<orderEntry type="library" name="Maven: com.maxmind.geoip:geoip-api:1.3.1" level="project" />
<orderEntry type="library" name="Maven: com.maxmind.geoip2:geoip2:2.12.0" level="project" />
<orderEntry type="library" name="Maven: com.maxmind.db:maxmind-db:1.2.2" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:guava:23.0" level="project" />
<orderEntry type="library" name="Maven: com.google.code.findbugs:jsr305:1.3.9" level="project" />
<orderEntry type="library" name="Maven: com.google.errorprone:error_prone_annotations:2.0.18" level="project" />
<orderEntry type="library" name="Maven: com.google.j2objc:j2objc-annotations:1.1" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.mojo:animal-sniffer-annotations:1.14" level="project" />
<orderEntry type="library" name="Maven: org.apache.zookeeper:zookeeper:3.4.9" level="project" />
<orderEntry type="library" name="Maven: jline:jline:0.9.94" level="project" />
<orderEntry type="library" name="Maven: io.netty:netty:3.10.5.Final" level="project" />
<orderEntry type="library" name="Maven: org.junit.jupiter:junit-jupiter-api:5.3.2" level="project" />
<orderEntry type="library" name="Maven: org.apiguardian:apiguardian-api:1.0.0" level="project" />
<orderEntry type="library" name="Maven: org.opentest4j:opentest4j:1.1.1" level="project" />
<orderEntry type="library" name="Maven: org.junit.platform:junit-platform-commons:1.3.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.httpcomponents:httpclient:4.5.2" level="project" />
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" />
<orderEntry type="library" name="Maven: commons-codec:commons-codec:1.9" level="project" />
<orderEntry type="library" name="Maven: org.apache.httpcomponents:httpcore:4.4.1" level="project" />
</component>
</module>

6
.idea/vcs.xml generated Normal file

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

234
pom.xml Normal file

@@ -0,0 +1,234 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.ac.iie</groupId>
<artifactId>storm-olap-aggregation</artifactId>
<version>v3.20.09.23</version>
<packaging>jar</packaging>
<name>storm-olap-aggregation</name>
<url>http://maven.apache.org</url>
<repositories>
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
<url>http://192.168.40.125:8099/content/groups/public</url>
</repository>
<repository>
<id>maven-ali</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>cn.ac.iie.storm.topology.StreamAggregateTopology</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<directory>properties</directory>
<includes>
<include>**/*.properties</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>properties</directory>
<includes>
<include>log4j.properties</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>1.0.0</kafka.version>
<storm.version>1.0.2</storm.version>
<hbase.version>2.2.1</hbase.version>
<hadoop.version>2.7.1</hadoop.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<!--<scope>provided</scope>-->
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.70</version>
</dependency>
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
<version>3.2.4</version>
</dependency>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.0.3</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.3.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.1</version>
</dependency>
<!--&lt;!&ndash; https://mvnrepository.com/artifact/org.apache.druid.extensions/druid-datasketches &ndash;&gt;-->
<!--<dependency>-->
<!--<groupId>org.apache.druid.extensions</groupId>-->
<!--<artifactId>druid-datasketches</artifactId>-->
<!--<version>0.16.0-incubating</version>-->
<!--</dependency>-->
<!--<dependency>-->
<!--<groupId>com.jayway.jsonpath</groupId>-->
<!--<artifactId>json-path</artifactId>-->
<!--<version>2.4.0</version>-->
<!--</dependency>-->
</dependencies>
</project>


@@ -0,0 +1,60 @@
# Source Kafka bootstrap address
bootstrap.servers=192.168.44.12:9092
# latest/earliest
auto.offset.reset=latest
# Compression mode: none or snappy
kafka.compression.type=none
# Topic name on the Kafka broker
kafka.topic=CONNECTION-RECORD-COMPLETED-LOG
# Consumer group for reading the topic; the offsets stored for this spout id can be named after the topology, and the stored offset position ensures the next read does not re-consume data
group.id=test-20201026
# Output Kafka bootstrap address
results.bootstrap.servers=192.168.44.12:9092
# Output topic
results.output.topic=TRAFFIC-PROTOCOL-STAT-LOG
# Aggregation window, in seconds
agg.time=15
# APP-ID refresh interval
update.app.id.time=60
# Storm topology workers
topology.workers=1
# Spout parallelism; recommended to equal the number of Kafka partitions
spout.parallelism=1
# Parallelism of the enrichment bolt, as a multiple of the worker count
datacenter.bolt.parallelism=1
# Parallelism of the Kafka writer bolt
kafka.bolt.parallelism=1
# Kafka batch size, in records
batch.insert.num=2000
# Gateway schema URL
schema.http=http://192.168.44.67:9999/metadata/schema/v1/fields/liveChart
# Gateway APP-ID dictionary URL
app.id.http=http://192.168.44.67:9999/open-api/appDicList
# Data center UID
data.center.id.num=15
# Ack setting: 1 enables acking, 0 disables it
topology.num.acks=0
# Spout receive sleep time
topology.spout.sleep.time=1
# Maximum allowed number of failed Kafka sends
max.failure.num=20
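
For reference, a minimal sketch of how one of these keys is read at runtime (assuming the file ships on the classpath as service_flow_config.properties, the name that StreamAggregateConfigurations loads below; the ConfigDemo class itself is hypothetical):

import java.util.Properties;

public class ConfigDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // "properties" is declared as a resource directory in pom.xml, so the file is on the classpath
        props.load(ConfigDemo.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
        // agg.time drives AggregateBolt's tick-tuple frequency, in seconds
        int aggTime = Integer.parseInt(props.getProperty("agg.time"));
        System.out.println("aggregation window = " + aggTime + "s");
    }
}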


@@ -0,0 +1,177 @@
package cn.ac.iie.storm.bolt;
import cn.ac.iie.storm.utils.combine.AggregateUtils;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import cn.ac.iie.storm.utils.http.HttpClientUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.TupleUtils;
import java.util.HashMap;
import java.util.Map;
/**
 * @ClassName AggregateBolt
 * @Author lixkvip@126.com
 * @Date 2020/6/24 13:39
 * @Version V1.0
 **/
public class AggregateBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(AggregateBolt.class);
private static final long serialVersionUID = -7666031217706448622L;
private HashMap<String, JSONObject> metricsMap;
private HashMap<String, String[]> actionMap;
private HashMap<String, JSONObject> cacheMap;
private static String timestamp;
/**
 * Executed only once, when the bolt is initialized
 *
 * @param stormConf
 * @param context
 */
@Override
public void prepare(Map stormConf, TopologyContext context) {
// timestampValue = System.currentTimeMillis() / 1000;
String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
timestamp = AggregateUtils.getTimeMetric(schema);
cacheMap = new HashMap<>(32);
// TODO: fetch the contents of the action labels
actionMap = AggregateUtils.getActionMap(schema);
metricsMap = AggregateUtils.getMetricsMap(schema);
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
try {
if (TupleUtils.isTick(input)) {
long timestampValue = System.currentTimeMillis() / 1000;
for (String s : cacheMap.keySet()) {
JSONObject result = JSONObject.parseObject(s);
result.put(timestamp, timestampValue);
result.putAll(cacheMap.get(s));
collector.emit(new Values(result.toString()));
}
cacheMap.clear();
} else {
String label = input.getStringByField("label");
System.out.println("recv label==================" + label);
//all functions for this protocol's action entry; fall back to Default if absent
String[] metrics = actionMap.getOrDefault(label, actionMap.get("Default"));
String dimensions = input.getStringByField("dimensions");
System.out.println("recv dimensions==============" + dimensions);
String message = input.getStringByField("message");
System.out.println("recv message===================" + message);
//one record
JSONObject event = JSONObject.parseObject(message);
//the record's key fields (protocol,device_id,isp)
//the cached value for this key may be null; if so, create a new object by default
JSONObject cacheMessage = cacheMap.getOrDefault(dimensions, new JSONObject());
//TODO look up each function in the metrics map and execute it
for (String metric : metrics) {
String name = metricsMap.get(metric).getString("name");
//may be null
String fieldName = metricsMap.get(name).getString("fieldName");
String nameValue = cacheMessage.getString(name);
//value currently held in the cache
nameValue = (nameValue == null) ? "0" : nameValue;
String fieldNameValue = event.getString(fieldName);
//value in the incoming record
fieldNameValue = (fieldNameValue == null) ? "0" : fieldNameValue;
//TODO this code must be updated whenever a new function is added
functionSet(name, cacheMessage, nameValue, fieldNameValue);
}
cacheMap.put(dimensions, cacheMessage);
}
} catch (Exception e) {
logger.error("Aggregate bolt exception: " + e);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("message"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, StreamAggregateConfig.AGG_TIME);
return conf;
}
/**
 * Set of functions applied to fields according to the schema description
 *
 * @param name function name
 * @param cacheMessage result set
 * @param nameValue current value
 * @param fieldNameValue newly added value
 */
private static void functionSet(String name, JSONObject cacheMessage, String nameValue, String fieldNameValue) {
switch (name) {
case "sessions":
case "c2s_byte_num":
case "s2c_byte_num":
case "c2s_pkt_num":
case "s2c_pkt_num":
case "c2s_ipfrag_num":
case "s2c_ipfrag_num":
case "s2c_tcp_lostlen":
case "c2s_tcp_lostlen":
case "c2s_tcp_unorder_num":
case "s2c_tcp_unorder_num":
//all cumulative counters share the same aggregation: Long sum of the cached and incoming values
cacheMessage.put(name, AggregateUtils.longSum(Long.parseLong(nameValue), Long.parseLong(fieldNameValue)));
break;
case "unique_sip_num":
//TODO distinct count of server IPs, not yet implemented
//cacheMessage.put(name, AggregateUtils.)
break;
case "unique_cip_num":
//TODO distinct count of client IPs, not yet implemented
break;
default:
break;
}
}
}
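
To make the caching semantics concrete, here is a minimal standalone sketch (hypothetical AggregateDemo class, not part of the commit) of what execute() does between two tick tuples: merge events that share a dimensions key, then flush each key with its aggregated metrics and a timestamp (stat_time is the timestamp name used in the sample schema later in this commit):

import cn.ac.iie.storm.utils.combine.AggregateUtils;
import com.alibaba.fastjson.JSONObject;
import java.util.HashMap;

public class AggregateDemo {
    public static void main(String[] args) {
        HashMap<String, JSONObject> cacheMap = new HashMap<>(32);
        String dimensions = "{\"protocol_id\":\"HTTP\",\"device_id\":\"1\",\"isp\":\"Unicom\"}";
        // Two events with the same dimensions key, each carrying one session
        for (int i = 0; i < 2; i++) {
            JSONObject cached = cacheMap.getOrDefault(dimensions, new JSONObject());
            String nameValue = cached.getString("sessions");
            long current = (nameValue == null) ? 0L : Long.parseLong(nameValue);
            cached.put("sessions", AggregateUtils.longSum(current, 1L));
            cacheMap.put(dimensions, cached);
        }
        // Tick-tuple flush: dimensions key + timestamp + aggregated metrics
        for (String key : cacheMap.keySet()) {
            JSONObject result = JSONObject.parseObject(key);
            result.put("stat_time", System.currentTimeMillis() / 1000);
            result.putAll(cacheMap.get(key));
            System.out.println(result); // {"protocol_id":"HTTP",...,"sessions":2}
        }
    }
}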


@@ -0,0 +1,168 @@
package cn.ac.iie.storm.bolt;
import cn.ac.iie.storm.utils.combine.AggregateUtils;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import cn.ac.iie.storm.utils.http.HttpClientUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.TupleUtils;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static cn.ac.iie.storm.utils.combine.AggregateUtils.transDimensions;
import static cn.ac.iie.storm.utils.combine.AggregateUtils.updateAppRelation;
/**
 * @ClassName ParseKvBolt
 * @Author lixkvip@126.com
 * @Date 2020/6/9 14:45
 * @Version V1.0
 **/
public class ParseKvBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(ParseKvBolt.class);
private static final long serialVersionUID = -999382396035310355L;
private JSONArray transforms;
private JSONArray dimensions;
private static HashMap<Long, String> appMap = new HashMap<>(32);
/**
 * Executed only once, at startup, to initialize the bolt
 *
 * @param stormConf Map
 * @param context TopologyContext
 */
@Override
public void prepare(Map stormConf, TopologyContext context) {
String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
JSONObject jsonObject = JSONObject.parseObject(schema);
String data = JSONObject.parseObject(jsonObject.getString("data")).getString("doc");
//TODO parse the schema
transforms = JSONObject.parseArray(JSONObject.parseObject(data).getString("transforms"));
//TODO fetch the dimensions
dimensions = JSONObject.parseArray(JSONObject.parseObject(data).getString("dimensions"));
updateAppRelation(appMap);
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
if (TupleUtils.isTick(tuple)) {
updateAppRelation(appMap);
} else {
//TODO parse the tuple's message
JSONObject message = JSONObject.parseObject(tuple.getStringByField("source"));
//TODO build a new JSON object for the dimensions
JSONObject dimensionsObj = transDimensions(dimensions, message);
for (Object transform : transforms) {
JSONObject transformObj = JSONObject.parseObject(transform.toString());
String function = transformObj.getString("function");
String name = transformObj.getString("name");
String fieldName = transformObj.getString("fieldName");
String parameters = transformObj.getString("parameters");
switch (function) {
case "alignment":
if (StringUtil.isNotBlank(parameters)) {
if (message.containsKey(fieldName)) {
alignmentUtils(parameters, message, name, fieldName);
}
}
break;
case "combination":
if (StringUtil.isNotBlank(parameters)) {
combinationUtils(parameters, message, name, fieldName, dimensionsObj);
}
break;
case "hierarchy":
String hierarchyValue = message.getString(fieldName);
//TODO run the split logic
if (StringUtil.isNotBlank(hierarchyValue) && StringUtil.isNotBlank(parameters)) {
String[] hierarchyPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
String[] dimensionsArr = hierarchyValue.split(hierarchyPars[0]);
//TODO recursively assemble the tuple and emit it
AggregateUtils.reSend(1, dimensionsArr, "", collector, message, dimensionsObj, name);
}
break;
default:
//emit the record unchanged
collector.emit(new Values(null, null, message.toString()));
break;
}
}
}
} catch (Exception e) {
logger.error("Error while parsing the raw log / assembling the aggregation log upstream: " + e);
e.printStackTrace();
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<>(16);
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, StreamAggregateConfig.UPDATE_APP_ID_TIME);
return conf;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("label", "dimensions", "message"));
}
/**
 * alignment: ID replacement operation
 *
 * @param parameters parameter set
 * @param message raw log
 * @param name column name in the result log
 * @param fieldName column name in the raw log
 */
private static void alignmentUtils(String parameters, JSONObject message, String name, String fieldName) {
String[] alignmentPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
String data = message.getString(fieldName);
System.out.println("alignmentPars=" + Arrays.toString(alignmentPars) + " data=" + data);
int subscript = Integer.parseInt(alignmentPars[0]);
String[] fieldSplit = data.split(alignmentPars[1]);
Long appID = Long.valueOf(fieldSplit[subscript]);
int length = fieldSplit[subscript].length();
StringBuilder sb = new StringBuilder(data);
message.put(name, sb.replace(0, length, appMap.get(appID)));
}
/**
 * combination: concatenation operation
 *
 * @param parameters parameter set
 * @param message raw log
 * @param name column name in the result log
 * @param fieldName column name in the raw log
 * @param dimensionsObj result dimension set
 */
private static void combinationUtils(String parameters, JSONObject message, String name, String fieldName, JSONObject dimensionsObj) {
String[] combinationPars = parameters.split(StreamAggregateConfig.FORMAT_SPLITTER);
String parameter0Value = message.getString(combinationPars[0]);
if (StringUtil.isNotBlank(parameter0Value)) {
String combinationValue = parameter0Value + combinationPars[1] + message.getString(fieldName);
message.put(fieldName, combinationValue);
dimensionsObj.put(name, combinationValue);
}
}
}
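
As a worked example of the combination transform above (a standalone sketch; the parameters string "common_app_label,/" comes from the sample schema in the FilterBolt test class later in this commit, and the CombinationDemo class is hypothetical):

import com.alibaba.fastjson.JSONObject;

public class CombinationDemo {
    public static void main(String[] args) {
        JSONObject message = new JSONObject();
        message.put("common_app_label", "QQ");
        message.put("common_protocol_label", "HTTP");
        JSONObject dimensionsObj = new JSONObject();
        // parameters = "common_app_label,/": the prefix field name, then the separator
        String[] combinationPars = "common_app_label,/".split(",");
        String combined = message.getString(combinationPars[0]) + combinationPars[1]
                + message.getString("common_protocol_label");
        message.put("common_protocol_label", combined);
        dimensionsObj.put("protocol_id", combined);
        System.out.println(dimensionsObj); // {"protocol_id":"QQ/HTTP"}
    }
}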


@@ -0,0 +1,44 @@
package cn.ac.iie.storm.bolt;
import cn.ac.iie.storm.utils.common.LogSendKafka;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import java.util.Map;
/**
 * Bolt that sends results to Kafka
 *
 * @author qidaijie
 */
public class ResultSendBolt extends BaseBasicBolt {
private static final long serialVersionUID = 3237813470939823159L;
private static Logger logger = Logger.getLogger(ResultSendBolt.class);
private LogSendKafka logSendKafka;
@Override
public void prepare(Map stormConf, TopologyContext context) {
logSendKafka = LogSendKafka.getInstance();
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
try {
logSendKafka.sendMessage(tuple.getStringByField("message"));
} catch (Exception e) {
logger.error(StreamAggregateConfig.RESULTS_OUTPUT_TOPIC + " log send to Kafka failed: " + e);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}


@@ -0,0 +1,68 @@
package cn.ac.iie.storm.bolt.change;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import cn.ac.iie.storm.utils.http.HttpClientUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
 * @ClassName FilterBolt
 * @Author lixkvip@126.com
 * @Date 2020/7/1 12:02
 * @Version V1.0
 **/
public class FilterBolt extends BaseBasicBolt {
private String filters;
@Override
public void prepare(Map stormConf, TopologyContext context) {
//the schema string was an empty placeholder here; fetch it from the gateway once at startup, as the other bolts do
String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.SCHEMA_HTTP);
String data = JSONObject.parseObject(schema).getString("data");
filters = JSONObject.parseObject(data).getString("filters");
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
JSONObject source = JSONObject.parseObject(input.getStringByField("source"));
boolean flag = true;
String type = JSONObject.parseObject(filters).getString("type");
JSONArray fieldsArr = JSONObject.parseArray(JSONObject.parseObject(filters).getString("fields"));
if ("and".equals(type)) {
for (int i = 0; i < fieldsArr.size(); i++) {
JSONObject field = JSONObject.parseObject(fieldsArr.get(i).toString());
String name = field.getString("fieldName");
String fieldType = field.getString("type");
Object values = field.get("values");
Object nameValue = source.get(name);
System.out.println("nameValue ========" + nameValue);
if ("not".equals(fieldType)) {
if (nameValue == values) {
//filter condition matched
flag = false;
}
} else if ("in".equals(fieldType)) {
if (!values.toString().contains(nameValue.toString())) {
//filter condition matched
flag = false;
}
}
}
}
//only records that pass every filter are forwarded
if (flag) {
collector.emit(new Values(source));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("filter"));
}
}


@@ -0,0 +1,40 @@
package cn.ac.iie.storm.bolt.print;
import org.apache.log4j.Logger;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
/**
 * @ClassName PrintBolt
 * @Author lixkvip@126.com
 * @Date 2020/6/28 15:34
 * @Version V1.0
 **/
public class PrintBolt extends BaseBasicBolt {
private final static Logger logger = Logger.getLogger(PrintBolt.class);
private static long a;
private long b;
public static long c;
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
logger.error("==================================one batch of data=========================");
a = System.currentTimeMillis();
b = System.currentTimeMillis();
c = System.currentTimeMillis();
logger.error(Thread.currentThread() + "private static long a======:" + a);
logger.error(Thread.currentThread() + "private long b======:" + b);
logger.error(Thread.currentThread() + "public static long c======:" + c);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}


@@ -0,0 +1,80 @@
package cn.ac.iie.storm.spout;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
* kafkaSpout
*
* @author Administrator
*/
public class CustomizedKafkaSpout extends BaseRichSpout {
private static final long serialVersionUID = -3363788553406229592L;
private KafkaConsumer<String, String> consumer;
private SpoutOutputCollector collector = null;
private TopologyContext context = null;
private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class);
private static Properties createConsumerConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", StreamAggregateConfig.BOOTSTRAP_SERVERS);
props.put("group.id", StreamAggregateConfig.GROUP_ID);
props.put("session.timeout.ms", "60000");
props.put("max.poll.records", 3000);
props.put("max.partition.fetch.bytes", 31457280);
props.put("auto.offset.reset", StreamAggregateConfig.AUTO_OFFSET_RESET);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
this.context = context;
Properties prop = createConsumerConfig();
this.consumer = new KafkaConsumer<>(prop);
this.consumer.subscribe(Collections.singletonList(StreamAggregateConfig.KAFKA_TOPIC));
}
@Override
public void close() {
consumer.close();
}
@Override
public void nextTuple() {
try {
ConsumerRecords<String, String> records = consumer.poll(10000L);
Thread.sleep(StreamAggregateConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
for (ConsumerRecord<String, String> record : records) {
this.collector.emit(new Values(record.value()));
}
} catch (Exception e) {
logger.error("KafkaSpout failed to emit messages!", e);
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("source"));
}
}


@@ -0,0 +1,35 @@
package cn.ac.iie.storm.topology;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
/**
* @author Administrator
*/
public final class StormRunner {
private static final int MILLS_IN_SEC = 1000;
private StormRunner() {}
public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topologyName, conf, builder.createTopology());
Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC);
localCluster.shutdown();
}
public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
}
}


@@ -0,0 +1,103 @@
package cn.ac.iie.storm.topology;
import cn.ac.iie.storm.bolt.AggregateBolt;
import cn.ac.iie.storm.bolt.ResultSendBolt;
import cn.ac.iie.storm.bolt.ParseKvBolt;
import cn.ac.iie.storm.spout.CustomizedKafkaSpout;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
 * @ClassName StreamAggregateTopology
 * @Author lixkvip@126.com
 * @Date 2020/6/24 10:14
 * @Version V1.0
 **/
public class StreamAggregateTopology {
private static Logger logger = Logger.getLogger(StreamAggregateTopology.class);
private final String topologyName;
private final Config topologyConfig;
private TopologyBuilder builder;
private StreamAggregateTopology() {
this(StreamAggregateTopology.class.getSimpleName());
}
private StreamAggregateTopology(String topologyName) {
this.topologyName = topologyName;
topologyConfig = createTopologyConfig();
}
/**
 * Test configuration:
 * conf.setTopologyWorkerMaxHeapSize(6144);
 * conf.put(Config.WORKER_CHILDOPTS, "-Xmx4G -Xms2G");
 */
private Config createTopologyConfig() {
Config conf = new Config();
conf.setDebug(false);
conf.setMessageTimeoutSecs(60);
conf.setMaxSpoutPending(StreamAggregateConfig.SPOUT_PARALLELISM);
conf.setNumAckers(StreamAggregateConfig.TOPOLOGY_NUM_ACKS);
return conf;
}
private void runLocally() throws InterruptedException {
topologyConfig.setMaxTaskParallelism(1);
StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 6000);
}
private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
topologyConfig.setNumWorkers(StreamAggregateConfig.TOPOLOGY_WORKERS);
//setting this too high causes many problems, e.g. heartbeat-thread starvation and a sharp drop in throughput
topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
}
private void buildTopology() {
builder = new TopologyBuilder();
builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), StreamAggregateConfig.SPOUT_PARALLELISM);
builder.setBolt("ParseKvBolt", new ParseKvBolt(), StreamAggregateConfig.DATACENTER_BOLT_PARALLELISM)
.localOrShuffleGrouping("CustomizedKafkaSpout");
builder.setBolt("AggregateBolt", new AggregateBolt(), StreamAggregateConfig.DATACENTER_BOLT_PARALLELISM)
.fieldsGrouping("ParseKvBolt", new Fields("dimensions"));
builder.setBolt("ResultSendBolt", new ResultSendBolt(), StreamAggregateConfig.KAFKA_BOLT_PARALLELISM)
.localOrShuffleGrouping("AggregateBolt");
// builder.setBolt("PrintBolt", new PrintBolt(), 3).localOrShuffleGrouping("LogFlowWriteSpout");
}
public static void main(String[] args) throws Exception {
StreamAggregateTopology csst = null;
boolean runLocally = true;
String parameter = "remote";
int size = 2;
if (args.length >= size && parameter.equalsIgnoreCase(args[1])) {
runLocally = false;
csst = new StreamAggregateTopology(args[0]);
} else {
csst = new StreamAggregateTopology();
}
csst.buildTopology();
if (runLocally) {
logger.info("Running in local mode...");
csst.runLocally();
} else {
logger.info("Running in remote deployment mode...");
csst.runRemotely();
}
}
}
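
Launch note: main() treats args[0] as the topology name and args[1] as the mode flag, so a remote deployment looks roughly like storm jar storm-olap-aggregation-v3.20.09.23.jar cn.ac.iie.storm.topology.StreamAggregateTopology my-topology remote (the jar name is assumed from the shade build in pom.xml); with no arguments the topology runs in local mode for 6000 seconds.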


@@ -0,0 +1,208 @@
package cn.ac.iie.storm.utils.combine;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import cn.ac.iie.storm.utils.http.HttpClientUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.log4j.Logger;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
/**
 * @ClassName AggregateUtils
 * @Author lixkvip@126.com
 * @Date 2020/6/23 14:04
 * @Version V1.0
 **/
public class AggregateUtils {
private final static Logger logger = Logger.getLogger(AggregateUtils.class);
/**
 * Sum of two Long values
 *
 * @param value1 first value
 * @param value2 second value
 * @return value1 + value2
 */
public static Long longSum(Long value1, Long value2) {
return value1 + value2;
}
/**
 * Increment a count
 *
 * @param count current count value
 * @return count + 1
 */
public static Long count(Long count) {
return count + 1;
}
/**
 * Returns the map of metric columns
 *
 * @param schema dynamically fetched schema
 * @return map of metric columns, e.g.
 * c2s_byte_len, { "function" : "c2s_byte_sum", "name" : "c2s_byte_len", "fieldName" : "common_c2s_byte_num" }
 */
public static HashMap<String, JSONObject> getMetricsMap(String schema) {
HashMap<String, JSONObject> metricsMap = new HashMap<>(16);
JSONObject jsonObject = JSONObject.parseObject(schema);
String data = jsonObject.getString("data");
JSONArray metrics = JSONObject.parseArray(JSONObject.parseObject(JSONObject.parseObject(data).getString("doc")).getString("metrics"));
for (Object metric : metrics) {
JSONObject json = JSONObject.parseObject(metric.toString());
String name = json.getString("name");
metricsMap.put(name, json);
}
return metricsMap;
}
/**
 * Recursively emit tuples
 *
 * @param headIndex recursion depth, counted from the tail of splitArr (starts at 1)
 * @param splitArr the protocol path split into levels
 * @param initStr accumulated path string, "" on the first call
 * @param collector Storm collector
 * @param message raw log
 * @param dimesionsObj result dimension set
 * @param name result column name
 */
public static void reSend(int headIndex, String[] splitArr, String initStr, BasicOutputCollector collector, JSONObject message, JSONObject dimesionsObj, String name) {
//recursively concatenate the string
if (splitArr.length != headIndex - 1) {
//core of the recursion
if ("".equals(initStr)) {
initStr = splitArr[splitArr.length - headIndex];
} else {
initStr = initStr + "/" + splitArr[splitArr.length - headIndex];
}
dimesionsObj.put(name, initStr);
System.out.println("before emit================================");
collector.emit(new Values(splitArr[splitArr.length - headIndex], dimesionsObj.toString(), message.toString()));
System.out.println("send label ===============" + splitArr[splitArr.length - headIndex]);
System.out.println("send dimesion=============" + dimesionsObj.toString());
reSend(headIndex + 1, splitArr, initStr, collector, message, dimesionsObj, name);
}
}
/**
 * Returns the map for the action section
 *
 * @param schema dynamically fetched schema
 * @return map from label to its metrics array
 */
public static HashMap<String, String[]> getActionMap(String schema) {
JSONObject jsonObject = JSONObject.parseObject(schema);
String data = jsonObject.getString("data");
JSONArray actions = JSONObject.parseArray(JSONObject.parseObject(JSONObject.parseObject(data).getString("doc")).getString("action"));
HashMap<String, String[]> map = new HashMap<>(16);
for (Object action : actions) {
JSONObject json = JSONObject.parseObject(action.toString());
String label = json.getString("label");
String[] metrics = json.getString("metrics").split(",");
map.put(label, metrics);
}
return map;
}
/**
 * Returns the timestamp column name
 *
 * @param schema dynamically fetched schema
 * @return timestamp column name
 */
public static String getTimeMetric(String schema) {
JSONObject jsonObject = JSONObject.parseObject(schema);
String data = jsonObject.getString("data");
return JSONObject.parseObject(JSONObject.parseObject(JSONObject.parseObject(data)
.getString("doc"))
.getString("timestamp"))
.getString("name");
}
/**
 * Update the cached APP-ID mapping
 *
 * @param hashMap the currently cached mapping
 */
public static void updateAppRelation(HashMap<Long, String> hashMap) {
try {
Long begin = System.currentTimeMillis();
String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
String data = JSONObject.parseObject(schema).getString("data");
JSONArray objects = JSONArray.parseArray(data);
for (Object object : objects) {
JSONArray jsonArray = JSONArray.parseArray(object.toString());
Long key = jsonArray.getLong(0);
String value = jsonArray.getString(1);
if (hashMap.containsKey(key)) {
if (!value.equals(hashMap.get(key))) {
hashMap.put(key, value);
}
} else {
hashMap.put(key, value);
}
}
logger.warn("Updated the cached APP mapping; fetched " + objects.size() + " entries in " + (System.currentTimeMillis() - begin) + " ms");
} catch (Exception e) {
logger.error("Failed to update the cached APP-ID mapping: " + e);
}
}
/**
 * Parse the dimensions field set
 *
 * @param dimensions dimension set
 * @param message raw log
 * @return result dimension set
 */
public static JSONObject transDimensions(JSONArray dimensions, JSONObject message) {
JSONObject dimensionsObj = new JSONObject();
for (Object dimension : dimensions) {
String fieldName = JSONObject.parseObject(dimension.toString()).getString("fieldName");
String name = JSONObject.parseObject(dimension.toString()).getString("name");
dimensionsObj.put(name, message.get(fieldName));
}
return dimensionsObj;
}
}
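
A standalone sketch of the reSend recursion (hypothetical HierarchyDemo class; the Storm collector is replaced with a println) showing the labels and cumulative paths emitted for a three-level protocol value such as ETHERNET/IPv4/HTTP:

public class HierarchyDemo {
    public static void main(String[] args) {
        reSend(1, "ETHERNET/IPv4/HTTP".split("/"), "");
        // prints: label=HTTP dimension=HTTP
        //         label=IPv4 dimension=HTTP/IPv4
        //         label=ETHERNET dimension=HTTP/IPv4/ETHERNET
    }

    private static void reSend(int headIndex, String[] splitArr, String initStr) {
        if (splitArr.length != headIndex - 1) {
            initStr = "".equals(initStr)
                    ? splitArr[splitArr.length - headIndex]
                    : initStr + "/" + splitArr[splitArr.length - headIndex];
            System.out.println("label=" + splitArr[splitArr.length - headIndex] + " dimension=" + initStr);
            reSend(headIndex + 1, splitArr, initStr);
        }
    }
}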


@@ -0,0 +1,76 @@
package cn.ac.iie.storm.utils.common;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.log4j.Logger;
import java.util.Properties;
/**
 * Writes logs produced by the NTC system configuration into the data center
 *
 * @author Administrator
 * @create 2018-08-13 15:11
 */
public class LogSendKafka {
private static Logger logger = Logger.getLogger(LogSendKafka.class);
/**
 * Kafka producer used to send messages to Kafka
 */
private static Producer<String, String> kafkaProducer;
/**
 * Kafka producer adapter singleton that proxies the producer for sending messages
 */
private static LogSendKafka logSendKafka;
private LogSendKafka() {
initKafkaProducer();
}
public static synchronized LogSendKafka getInstance() {
if (logSendKafka == null) {
logSendKafka = new LogSendKafka();
}
return logSendKafka;
}
public void sendMessage(String message) {
final int[] errorSum = {0};
kafkaProducer.send(new ProducerRecord<>(StreamAggregateConfig.RESULTS_OUTPUT_TOPIC, message), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
logger.error("Exception while writing to " + StreamAggregateConfig.RESULTS_OUTPUT_TOPIC, exception);
errorSum[0]++;
}
}
});
kafkaProducer.flush();
logger.debug("Log sent to National Center successfully!!!!!");
}
/**
 * Initialize the Kafka message producer from the producer configuration; runs only once
 */
private void initKafkaProducer() {
Properties properties = new Properties();
properties.put("bootstrap.servers", StreamAggregateConfig.RESULTS_BOOTSTRAP_SERVERS);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "1");
properties.put("linger.ms", "2");
properties.put("request.timeout.ms", 30000);
properties.put("batch.size", 262144);
properties.put("buffer.memory", 33554432);
properties.put("compression.type", StreamAggregateConfig.KAFKA_COMPRESSION_TYPE);
kafkaProducer = new KafkaProducer<>(properties);
}
}
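
For an end-to-end check during testing, the aggregated records can be inspected with the stock Kafka console consumer, e.g. kafka-console-consumer.sh --bootstrap-server 192.168.44.12:9092 --topic TRAFFIC-PROTOCOL-STAT-LOG (broker address and topic taken from the properties file above).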


@@ -0,0 +1,45 @@
package cn.ac.iie.storm.utils.file;
/**
* @author Administrator
*/
public class StreamAggregateConfig {
public static final String FORMAT_SPLITTER = ",";
/**
* System
*/
public static final Integer SPOUT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "spout.parallelism");
public static final Integer DATACENTER_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "datacenter.bolt.parallelism");
public static final Integer TOPOLOGY_WORKERS = StreamAggregateConfigurations.getIntProperty(0, "topology.workers");
public static final Integer KAFKA_BOLT_PARALLELISM = StreamAggregateConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
public static final Integer TOPOLOGY_NUM_ACKS = StreamAggregateConfigurations.getIntProperty(0, "topology.num.acks");
public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = StreamAggregateConfigurations.getIntProperty(0, "topology.spout.sleep.time");
public static final Integer BATCH_INSERT_NUM = StreamAggregateConfigurations.getIntProperty(0, "batch.insert.num");
public static final Integer DATA_CENTER_ID_NUM = StreamAggregateConfigurations.getIntProperty(0, "data.center.id.num");
public static final Integer MAX_FAILURE_NUM = StreamAggregateConfigurations.getIntProperty(0, "max.failure.num");
public static final Integer AGG_TIME = StreamAggregateConfigurations.getIntProperty(0, "agg.time");
public static final Integer UPDATE_APP_ID_TIME = StreamAggregateConfigurations.getIntProperty(0, "update.app.id.time");
/**
* kafka
*/
public static final String BOOTSTRAP_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "bootstrap.servers");
public static final String RESULTS_BOOTSTRAP_SERVERS = StreamAggregateConfigurations.getStringProperty(0, "results.bootstrap.servers");
public static final String GROUP_ID = StreamAggregateConfigurations.getStringProperty(0, "group.id");
public static final String RESULTS_OUTPUT_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "results.output.topic");
public static final String KAFKA_TOPIC = StreamAggregateConfigurations.getStringProperty(0, "kafka.topic");
public static final String AUTO_OFFSET_RESET = StreamAggregateConfigurations.getStringProperty(0, "auto.offset.reset");
public static final String KAFKA_COMPRESSION_TYPE = StreamAggregateConfigurations.getStringProperty(0, "kafka.compression.type");
/**
* http
*/
public static final String SCHEMA_HTTP = StreamAggregateConfigurations.getStringProperty(0, "schema.http");
public static final String APP_ID_HTTP = StreamAggregateConfigurations.getStringProperty(0, "app.id.http");
}


@@ -0,0 +1,65 @@
package cn.ac.iie.storm.utils.file;
import java.util.Properties;
/**
* @author Administrator
*/
public final class StreamAggregateConfigurations {
// private static Properties propCommon = new Properties();
private static Properties propService = new Properties();
public static String getStringProperty(Integer type, String key) {
if (type == 0) {
return propService.getProperty(key);
// } else if (type == 1) {
// return propCommon.getProperty(key);
} else {
return null;
}
}
public static Integer getIntProperty(Integer type, String key) {
if (type == 0) {
return Integer.parseInt(propService.getProperty(key));
// } else if (type == 1) {
// return Integer.parseInt(propCommon.getProperty(key));
} else {
return null;
}
}
public static Long getLongProperty(Integer type, String key) {
if (type == 0) {
return Long.parseLong(propService.getProperty(key));
// } else if (type == 1) {
// return Long.parseLong(propCommon.getProperty(key));
} else {
return null;
}
}
public static Boolean getBooleanProperty(Integer type, String key) {
if (type == 0) {
return "true".equals(propService.getProperty(key).toLowerCase().trim());
// } else if (type == 1) {
// return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
} else {
return null;
}
}
static {
try {
propService.load(StreamAggregateConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
} catch (Exception e) {
// propCommon = null;
propService = null;
}
}
}


@@ -0,0 +1,55 @@
package cn.ac.iie.storm.utils.http;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * Utility class for fetching the gateway schema
 *
 * @author qidaijie
 */
public class HttpClientUtil {
/**
 * Issue a GET request to the gateway and return the schema
 * @param http gateway url
 * @return schema
 */
public static String requestByGetMethod(String http) {
CloseableHttpClient httpClient = HttpClients.createDefault();
//initialized up front so the final toString() cannot throw NPE when the request fails
StringBuilder entityStringBuilder = new StringBuilder();
try {
HttpGet get = new HttpGet(http);
try (CloseableHttpResponse httpResponse = httpClient.execute(get)) {
HttpEntity entity = httpResponse.getEntity();
if (null != entity) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
String line;
while ((line = bufferedReader.readLine()) != null) {
entityStringBuilder.append(line);
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (httpClient != null) {
httpClient.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return entityStringBuilder.toString();
}
}

11
src/main/main.iml Normal file

@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/java" isTestSource="false" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>


@@ -0,0 +1,54 @@
package com.wp;
import cn.ac.iie.storm.utils.file.StreamAggregateConfig;
import cn.ac.iie.storm.utils.http.HttpClientUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
/**
 * @author qidaijie
 * @Package com.wp
 * @Description:
 * @date 2020/9/22 15:09
 */
public class AppIdTest {
@Test
public void appTest() {
//http://192.168.44.12:9999/open-api/appDicList
String schema = HttpClientUtil.requestByGetMethod(StreamAggregateConfig.APP_ID_HTTP);
JSONObject jsonObject = JSONObject.parseObject(schema);
String data = jsonObject.getString("data");
HashMap<Long, String> map = new HashMap<>(16);
JSONArray objects = JSONArray.parseArray(data);
for (Object object : objects) {
JSONArray jsonArray = JSONArray.parseArray(object.toString());
map.put(jsonArray.getLong(0), jsonArray.getString(1));
// System.out.println(object);
}
System.out.println(map.toString());
System.out.println(map.size());
}
@Test
public void changeApp() {
String a = "QQ";
String[] alignmentPars = "0,/".split(StreamAggregateConfig.FORMAT_SPLITTER);
String data = "100/HTTP";
int subscript = Integer.parseInt(alignmentPars[0]);
String[] fieldSplit = data.split(alignmentPars[1]);
Long appID = Long.valueOf(fieldSplit[subscript]);
int length = fieldSplit[subscript].length();
StringBuilder sb = new StringBuilder(data);
System.out.println(sb.replace(0, length, a));
}
}


@@ -0,0 +1,133 @@
package com.wp;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
/**
 * @ClassName FilterBolt
 * @Author lixkvip@126.com
 * @Date 2020/7/1 14:53
 * @Version V1.0
 **/
public class FilterBolt {
@SuppressWarnings("all")
public static void main(String[] args) {
JSONObject source = new JSONObject();
String schema = "{\n" +
" \"task\": \"Application-Protocol-Distribution\",\n" +
" \"in\": \"CONNECTION-SKETCH-COMPLETED\",\n" +
" \"out\": \"TRAFFIC-PROTOCOL-STAT-LOG\",\n" +
" \"data\": {\n" +
" \"timestamp\": {\n" +
" \"name\": \"stat_time\"\n" +
" },\n" +
" \"dimensions\": [\n" +
" {\n" +
" \"name\": \"protocol_id\",\n" +
" \"fieldName\": \"common_protocol_label\",\n" +
" \"type\": \"String\"\n" +
" },\n" +
" {\n" +
" \"name\": \"device_id\",\n" +
" \"fieldName\": \"common_device_id\",\n" +
" \"type\": \"String\"\n" +
" },\n" +
" {\n" +
" \"name\": \"isp\",\n" +
" \"fieldName\": \"common_isp\",\n" +
" \"type\": \"String\"\n" +
" }\n" +
" ],\n" +
" \"metrics\": [\n" +
" { \"function\" : \"sessions_count\", \"name\" : \"sessions\"},\n" +
" { \"function\" : \"c2s_byte_sum\", \"name\" : \"c2s_byte_len\", \"fieldName\" : \"common_c2s_byte_num\" },\n" +
" { \"function\" : \"s2c_byte_sum\", \"name\" : \"s2c_byte_len\", \"fieldName\" : \"common_s2c_byte_num\" },\n" +
" { \"function\" : \"c2s_pkt_sum\", \"name\" : \"c2s_pkt_num\", \"fieldName\" : \"common_c2s_pkt_num\" },\n" +
" { \"function\" : \"s2c_pkt_sum\", \"name\" : \"s2c_pkt_num\", \"fieldName\" : \"common_s2c_pkt_num\" },\n" +
" { \"function\" : \"sip_disCount\", \"name\" : \"unique_sip_num\", \"fieldName\" : \"common_server_ip\" },\n" +
" { \"function\" : \"cip_disCount\", \"name\" : \"unique_cip_num\", \"fieldName\" : \"common_client_ip\" }\n" +
" ],\n" +
" \"filters\": {\n" +
" \"type\": \"and\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"fieldName\": \"common_device_id\",\n" +
" \"type\": \"not\",\n" +
" \"values\": null\n" +
" },\n" +
" {\n" +
" \"fieldName\": \"common_protocol_label\",\n" +
" \"type\": \"not\",\n" +
" \"values\": null\n" +
" },\n" +
" {\n" +
" \"fieldName\": \"common_isp\",\n" +
" \"type\": \"not\",\n" +
" \"values\": null\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"transforms\":[\n" +
" {\"function\":\"combination\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\": \"common_app_label,/\"},\n" +
" {\"function\":\"hierarchy\",\"name\":\"protocol_id\",\"fieldName\":\"common_protocol_label\",\"parameters\": \"/\"}\n" +
" ],\n" +
" \"action\":[\n" +
" {\"label\": \"Default\", \"metrics\": \"sessions,c2s_byte_len,s2c_byte_len,c2s_pkt_num,s2c_pkt_num\"},\n" +
" {\"label\": \"HTTP\", \"metrics\": \"sessions,c2s_byte_len,s2c_byte_len,c2s_pkt_num,s2c_pkt_num,unique_sip_num,unique_cip_num\"}\n" +
" ],\n" +
" \"granularity\":{\n" +
" \"type\": \"period\",\n" +
" \"period\": \"5M\"\n" +
" }\n" +
" }\n" +
"}";
source.put("common_protocol_label", "HTTP");
source.put("common_isp", "Unicom");
source.put("common_device_id", "1");
String data = JSONObject.parseObject(schema).getString("data");
String filters = JSONObject.parseObject(data).getString("filters");
boolean flag = true;
String type = JSONObject.parseObject(filters).getString("type");
JSONArray fieldsArr = JSONObject.parseArray(JSONObject.parseObject(filters).getString("fields"));
if ("and".equals(type)) {
for (int i = 0; i < fieldsArr.size(); i++) {
JSONObject field = JSONObject.parseObject(fieldsArr.get(i).toString());
String name = field.getString("fieldName");
String fieldType = field.getString("type");
Object values = field.get("values");
Object nameValue = source.get(name);
if ("not".equals(fieldType)) {
if (nameValue == values) {
//满足过滤条件
flag = false;
}
} else if ("in".equals(fieldType)) {
if (!values.toString().contains(nameValue.toString())) {
//满足过滤条件
flag = false;
}
}
}
if (flag){
System.out.println("输出到下个Bolt");
}else {
System.out.println("此条消息被过滤掉");
}
}
}
}


@@ -0,0 +1,43 @@
package com.wp;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
/**
 * @ClassName SchemaTest
 * @Author lixkvip@126.com
 * @Date 2020/6/28 10:11
 * @Version V1.0
 **/
public class SchemaTest {
static String str = "";
public static void main(String[] args) {
String str1 = null;
String str2 = " ";
System.out.println(StringUtil.isNotBlank(str1));
System.out.println(StringUtil.isNotEmpty(str1));
System.out.println(StringUtil.isNotBlank(str2));
System.out.println(StringUtil.isNotEmpty(str2));
}
public static void reAdd(int m, String[] split, String str) {
//recursively concatenate the string
if (0 != m) {
//core of the recursion
str = str + split[m - 1] + "/";
reAdd(m - 1, split, str);
}
}
}

12
src/test/test.iml Normal file

@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/java" isTestSource="true" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="main" />
</component>
</module>