Initial version of the OLAP pre-aggregation code
pom.xml (new file, 292 lines)
@@ -0,0 +1,292 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.ac.iie</groupId>
  <artifactId>log-stream-aggregation</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>log-stream-aggregation</name>
  <url>http://maven.apache.org</url>

  <repositories>
    <repository>
      <id>nexus</id>
      <name>Team Nexus Repository</name>
      <url>http://192.168.40.125:8099/content/groups/public</url>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.2</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>cn.ac.iie.topology.LogFlowWriteTopology</mainClass>
                </transformer>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>META-INF/spring.handlers</resource>
                </transformer>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>META-INF/spring.schemas</resource>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
    <resources>
      <resource>
        <directory>properties</directory>
        <includes>
          <include>**/*.properties</include>
        </includes>
        <filtering>false</filtering>
      </resource>
      <resource>
        <directory>properties</directory>
        <includes>
          <include>log4j.properties</include>
        </includes>
        <filtering>false</filtering>
      </resource>
    </resources>
  </build>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <kafka.version>1.0.0</kafka.version>
    <storm.version>1.0.2</storm.version>
    <hbase.version>2.2.1</hbase.version>
    <hadoop.version>2.7.1</hadoop.version>
  </properties>

  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>1.0.0</version>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-log4j12</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>${storm.version}</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-log4j12</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
          <artifactId>log4j-over-slf4j</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.influxdb</groupId>
      <artifactId>influxdb-java</artifactId>
      <version>2.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>${storm.version}</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.59</version>
    </dependency>

    <dependency>
      <groupId>cglib</groupId>
      <artifactId>cglib-nodep</artifactId>
      <version>3.2.4</version>
    </dependency>

    <dependency>
      <groupId>com.zdjizhi</groupId>
      <artifactId>galaxy</artifactId>
      <version>1.0.3</version>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-log4j12</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
          <artifactId>log4j-over-slf4j</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.9</version>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-log4j12</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
          <artifactId>log4j-over-slf4j</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-log4j12</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
          <artifactId>log4j-over-slf4j</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-log4j12</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
          <artifactId>log4j-over-slf4j</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-log4j12</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
          <artifactId>log4j-over-slf4j</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-log4j12</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
          <artifactId>log4j-over-slf4j</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-log4j12</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
          <artifactId>log4j-over-slf4j</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
      <version>5.3.2</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.5.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore</artifactId>
      <version>4.4.1</version>
    </dependency>

  </dependencies>
</project>
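With the maven-shade-plugin bound to the package phase above, a plain Maven build should be enough to produce the runnable fat jar whose manifest main class is cn.ac.iie.topology.LogFlowWriteTopology. A sketch of the assumed invocation (the output jar name is inferred from artifactId/version, not documented here):

    mvn clean package
    # expected shaded artifact: target/log-stream-aggregation-0.0.1-SNAPSHOT.jar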
properties/service_flow_config.properties (new file, 87 lines)
@@ -0,0 +1,87 @@
# Kafka broker addresses
#bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
bootstrap.servers=192.168.40.127:9093

# ZooKeeper addresses
zookeeper.servers=192.168.40.127:2182
#zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181

# HBase ZooKeeper addresses
#hbase.zookeeper.servers=192.168.40.119:2181,192.168.40.122:2181,192.168.40.123:2181
hbase.zookeeper.servers=192.168.40.203:2181

# HBase table name
hbase.table.name=subscriber_info

# latest/earliest
auto.offset.reset=latest

# Compression mode: none or snappy
kafka.compression.type=none

# Source topic on the Kafka broker
#kafka.topic=SECURITY-EVENT-LOG
kafka.topic=CONNECTION-RECORD-LOG

# Consumer group for the source topic; the offsets consumed under this spout id are stored against it
# (name it per topology), which determines where the next read resumes so data is not re-read
group.id=lxk-200512

# Output topic
results.output.topic=CONNECTION-RECORD-COMPLETED-LOG
#results.output.topic=SECURITY-EVENT-COMPLETED-LOG

# Storm topology workers
topology.workers=1

# Spout parallelism; recommended to equal the number of Kafka partitions
spout.parallelism=1

# Parallelism of the completion bolt, as a multiple of the worker count
datacenter.bolt.parallelism=1

# Parallelism of the bolt that writes to Kafka
kafka.bolt.parallelism=1

# IP location library path
#ip.library=/home/ceiec/topology/dat/
#ip.library=D:\\workerSpace\\K18-Phase2\\3.0.2019115\\log-stream-completion\\
ip.library=/dat/

# Kafka batch size (number of records)
batch.insert.num=2000

# Gateway schema URL
schema.http=http://192.168.40.224:9999/metadata/schema/v1/fields/connection_record_log

# Data center (UID)
data.center.id.num=15

# Tick tuple frequency (seconds)
topology.tick.tuple.freq.secs=5

# HBase refresh interval (seconds)
hbase.tick.tuple.freq.secs=60

# Throttles the spout when the bolts are the bottleneck; in theory only effective when acking is enabled
topology.config.max.spout.pending=150000

# Ack setting: 1 enables acking, 0 disables it
topology.num.acks=0

# Spout receive sleep time (ms)
topology.spout.sleep.time=1

# Maximum number of failed Kafka sends allowed
max.failure.num=20

# Default mail charset
mail.default.charset=UTF-8

# InfluxDB address
influx.ip=http://192.168.40.151:8086

# InfluxDB username
influx.username=admin

# InfluxDB password
influx.password=admin
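This file is copied onto the classpath by the <resources> section of pom.xml and is read through FlowWriteConfigurations (shown further down). A minimal sketch of resolving a couple of these keys, assuming the file is on the runtime classpath; the class name ConfigPeek is illustrative only:

    import java.util.Properties;

    public class ConfigPeek {
        public static void main(String[] args) throws Exception {
            // mirrors the loading done in FlowWriteConfigurations below
            Properties props = new Properties();
            props.load(ConfigPeek.class.getClassLoader()
                    .getResourceAsStream("service_flow_config.properties"));
            System.out.println(props.getProperty("kafka.topic"));       // CONNECTION-RECORD-LOG
            System.out.println(props.getProperty("batch.insert.num"));  // 2000
        }
    }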
src/java/cn/ac/iie/bean/KeyTuple.java (new file, 138 lines)
@@ -0,0 +1,138 @@
package cn.ac.iie.bean;

/**
 * @ClassName KeyTuple
 * @Author lixkvip@126.com
 * @Date 2020/5/27 16:18
 * @Version V1.0
 **/
public class KeyTuple {

    private int common_policy_id;
    private int common_action;
    private String common_sub_action;
    private String common_client_ip;
    private String common_client_location;
    private String common_sled_ip;
    private String common_device_id;
    private String common_subscriber_id;
    private String common_server_ip;
    private String common_server_location;
    private int common_server_port;
    private String common_l4_protocol;
    private String http_domain;
    private String ssl_sni;

    public int getCommon_policy_id() { return common_policy_id; }
    public void setCommon_policy_id(int common_policy_id) { this.common_policy_id = common_policy_id; }

    public int getCommon_action() { return common_action; }
    public void setCommon_action(int common_action) { this.common_action = common_action; }

    public String getCommon_sub_action() { return common_sub_action; }
    public void setCommon_sub_action(String common_sub_action) { this.common_sub_action = common_sub_action; }

    public String getCommon_client_ip() { return common_client_ip; }
    public void setCommon_client_ip(String common_client_ip) { this.common_client_ip = common_client_ip; }

    public String getCommon_client_location() { return common_client_location; }
    public void setCommon_client_location(String common_client_location) { this.common_client_location = common_client_location; }

    public String getCommon_sled_ip() { return common_sled_ip; }
    public void setCommon_sled_ip(String common_sled_ip) { this.common_sled_ip = common_sled_ip; }

    public String getCommon_device_id() { return common_device_id; }
    public void setCommon_device_id(String common_device_id) { this.common_device_id = common_device_id; }

    public String getCommon_subscriber_id() { return common_subscriber_id; }
    public void setCommon_subscriber_id(String common_subscriber_id) { this.common_subscriber_id = common_subscriber_id; }

    public String getCommon_server_ip() { return common_server_ip; }
    public void setCommon_server_ip(String common_server_ip) { this.common_server_ip = common_server_ip; }

    public String getCommon_server_location() { return common_server_location; }
    public void setCommon_server_location(String common_server_location) { this.common_server_location = common_server_location; }

    public int getCommon_server_port() { return common_server_port; }
    public void setCommon_server_port(int common_server_port) { this.common_server_port = common_server_port; }

    public String getCommon_l4_protocol() { return common_l4_protocol; }
    public void setCommon_l4_protocol(String common_l4_protocol) { this.common_l4_protocol = common_l4_protocol; }

    public String getHttp_domain() { return http_domain; }
    public void setHttp_domain(String http_domain) { this.http_domain = http_domain; }

    public String getSsl_sni() { return ssl_sni; }
    public void setSsl_sni(String ssl_sni) { this.ssl_sni = ssl_sni; }
}
src/java/cn/ac/iie/bean/ValueTuple.java (new file, 67 lines)
@@ -0,0 +1,67 @@
package cn.ac.iie.bean;

/**
 * @ClassName ValueTuple
 * @Author lixkvip@126.com
 * @Date 2020/5/27 16:18
 * @Version V1.0
 **/
public class ValueTuple {

    private int common_sessions;
    private int common_c2s_pkt_num;
    private int common_s2c_pkt_num;
    private int common_c2s_byte_num;
    private int common_s2c_byte_num;

    public int getCommon_sessions() { return common_sessions; }
    public void setCommon_sessions(int common_sessions) { this.common_sessions = common_sessions; }

    public int getCommon_c2s_pkt_num() { return common_c2s_pkt_num; }
    public void setCommon_c2s_pkt_num(int common_c2s_pkt_num) { this.common_c2s_pkt_num = common_c2s_pkt_num; }

    public int getCommon_s2c_pkt_num() { return common_s2c_pkt_num; }
    public void setCommon_s2c_pkt_num(int common_s2c_pkt_num) { this.common_s2c_pkt_num = common_s2c_pkt_num; }

    public int getCommon_c2s_byte_num() { return common_c2s_byte_num; }
    public void setCommon_c2s_byte_num(int common_c2s_byte_num) { this.common_c2s_byte_num = common_c2s_byte_num; }

    public int getCommon_s2c_byte_num() { return common_s2c_byte_num; }
    public void setCommon_s2c_byte_num(int common_s2c_byte_num) { this.common_s2c_byte_num = common_s2c_byte_num; }

    @Override
    public String toString() {
        return "ValueTuple{" +
                "common_sessions=" + common_sessions +
                ", common_c2s_pkt_num=" + common_c2s_pkt_num +
                ", common_s2c_pkt_num=" + common_s2c_pkt_num +
                ", common_c2s_byte_num=" + common_c2s_byte_num +
                ", common_s2c_byte_num=" + common_s2c_byte_num +
                '}';
    }
}
src/java/cn/ac/iie/bolt/CompletionBolt.java (new file, 54 lines)
@@ -0,0 +1,54 @@
package cn.ac.iie.bolt;

import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.system.TupleUtils;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class CompletionBolt extends BaseBasicBolt {
    private final static Logger logger = Logger.getLogger(CompletionBolt.class);
    private static final long serialVersionUID = 9006119186526123734L;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @SuppressWarnings("Duplicates")
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        try {
            String message = tuple.getString(0);
            if (StringUtil.isNotBlank(message)) {
                basicOutputCollector.emit(new Values(message));
            }
        } catch (Exception e) {
            logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while receiving/parsing the message");
            e.printStackTrace();
        }
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>(16);
        conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
                FlowWriteConfig.HBASE_TICK_TUPLE_FREQ_SECS);
        return conf;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("connLog"));
    }

}
src/java/cn/ac/iie/bolt/NtcLogSendBolt.java (new file, 71 lines)
@@ -0,0 +1,71 @@
package cn.ac.iie.bolt;

import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.kafka.KafkaLogNtc;
import cn.ac.iie.utils.system.TupleUtils;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * @author qidaijie
 * @date 2018/8/14
 */
public class NtcLogSendBolt extends BaseBasicBolt {
    private static final long serialVersionUID = -3663610927224396615L;
    private static Logger logger = Logger.getLogger(NtcLogSendBolt.class);
    private List<String> list;
    private KafkaLogNtc kafkaLogNtc;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        list = new LinkedList<>();
        kafkaLogNtc = KafkaLogNtc.getInstance();
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        try {
            if (TupleUtils.isTick(tuple)) {
                // tick tuple: flush whatever has been buffered so far
                if (list.size() != 0) {
                    kafkaLogNtc.sendMessage(list);
                    list.clear();
                }
            } else {
                String message = tuple.getString(0);
                if (StringUtil.isNotBlank(message)) {
                    list.add(message);
                }
                // flush once the batch size is reached
                if (list.size() == FlowWriteConfig.BATCH_INSERT_NUM) {
                    kafkaLogNtc.sendMessage(list);
                    list.clear();
                }
            }
        } catch (Exception e) {
            logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while sending logs to Kafka");
            e.printStackTrace();
        }
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<>(16);
        conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, FlowWriteConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
        return conf;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

}
src/java/cn/ac/iie/bolt/radius/RadiusCompletionBolt.java (new file, 51 lines)
@@ -0,0 +1,51 @@
package cn.ac.iie.bolt.radius;

import cn.ac.iie.common.FlowWriteConfig;
import com.zdjizhi.utils.StringUtil;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Connection-relation log completion
 *
 * @author qidaijie
 */
public class RadiusCompletionBolt extends BaseBasicBolt {

    private final static Logger logger = Logger.getLogger(RadiusCompletionBolt.class);
    private static final long serialVersionUID = -3657802387129063952L;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {

    }

    @SuppressWarnings("Duplicates")
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        try {
            String message = tuple.getString(0);
            if (StringUtil.isNotBlank(message)) {
                basicOutputCollector.emit(new Values(message));
            }
        } catch (Exception e) {
            logger.error(FlowWriteConfig.KAFKA_TOPIC + " exception while receiving/parsing the message");
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("connLog"));
    }

}
src/java/cn/ac/iie/common/FlowWriteConfig.java (new file, 51 lines)
@@ -0,0 +1,51 @@
package cn.ac.iie.common;

import cn.ac.iie.utils.system.FlowWriteConfigurations;

/**
 * @author Administrator
 */
public class FlowWriteConfig {

    public static final int IPV4_TYPE = 1;
    public static final int IPV6_TYPE = 2;
    public static final String FORMAT_SPLITTER = ",";

    /**
     * System
     */
    public static final Integer SPOUT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "spout.parallelism");
    public static final Integer DATACENTER_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "datacenter.bolt.parallelism");
    public static final Integer TOPOLOGY_WORKERS = FlowWriteConfigurations.getIntProperty(0, "topology.workers");
    public static final Integer KAFKA_BOLT_PARALLELISM = FlowWriteConfigurations.getIntProperty(0, "kafka.bolt.parallelism");
    public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
    public static final Integer HBASE_TICK_TUPLE_FREQ_SECS = FlowWriteConfigurations.getIntProperty(0, "hbase.tick.tuple.freq.secs");
    public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = FlowWriteConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
    public static final Integer TOPOLOGY_NUM_ACKS = FlowWriteConfigurations.getIntProperty(0, "topology.num.acks");
    public static final Integer TOPOLOGY_SPOUT_SLEEP_TIME = FlowWriteConfigurations.getIntProperty(0, "topology.spout.sleep.time");
    public static final Integer BATCH_INSERT_NUM = FlowWriteConfigurations.getIntProperty(0, "batch.insert.num");
    public static final Integer DATA_CENTER_ID_NUM = FlowWriteConfigurations.getIntProperty(0, "data.center.id.num");
    public static final Integer MAX_FAILURE_NUM = FlowWriteConfigurations.getIntProperty(0, "max.failure.num");
    public static final String MAIL_DEFAULT_CHARSET = FlowWriteConfigurations.getStringProperty(0, "mail.default.charset");

    /**
     * kafka
     */
    public static final String BOOTSTRAP_SERVERS = FlowWriteConfigurations.getStringProperty(0, "bootstrap.servers");
    public static final String ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "zookeeper.servers");
    public static final String HBASE_ZOOKEEPER_SERVERS = FlowWriteConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
    public static final String HBASE_TABLE_NAME = FlowWriteConfigurations.getStringProperty(0, "hbase.table.name");
    public static final String GROUP_ID = FlowWriteConfigurations.getStringProperty(0, "group.id");
    public static final String RESULTS_OUTPUT_TOPIC = FlowWriteConfigurations.getStringProperty(0, "results.output.topic");
    public static final String KAFKA_TOPIC = FlowWriteConfigurations.getStringProperty(0, "kafka.topic");
    public static final String AUTO_OFFSET_RESET = FlowWriteConfigurations.getStringProperty(0, "auto.offset.reset");
    public static final String KAFKA_COMPRESSION_TYPE = FlowWriteConfigurations.getStringProperty(0, "kafka.compression.type");

    public static final String IP_LIBRARY = FlowWriteConfigurations.getStringProperty(0, "ip.library");

    /**
     * http
     */
    public static final String SCHEMA_HTTP = FlowWriteConfigurations.getStringProperty(0, "schema.http");

}
src/java/cn/ac/iie/spout/CustomizedKafkaSpout.java (new file, 81 lines)
@@ -0,0 +1,81 @@
package cn.ac.iie.spout;

import cn.ac.iie.common.FlowWriteConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka spout
 *
 * @author Administrator
 */
public class CustomizedKafkaSpout extends BaseRichSpout {
    private static final long serialVersionUID = -3363788553406229592L;
    private KafkaConsumer<String, String> consumer;
    private SpoutOutputCollector collector = null;
    private TopologyContext context = null;
    private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class);

    private static Properties createConsumerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS);
        props.put("group.id", FlowWriteConfig.GROUP_ID);
        props.put("session.timeout.ms", "60000");
        props.put("max.poll.records", 3000);
        props.put("max.partition.fetch.bytes", 31457280);
        props.put("auto.offset.reset", FlowWriteConfig.AUTO_OFFSET_RESET);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;
        Properties prop = createConsumerConfig();
        this.consumer = new KafkaConsumer<>(prop);
        this.consumer.subscribe(Collections.singletonList(FlowWriteConfig.KAFKA_TOPIC));
    }

    @Override
    public void close() {
        consumer.close();
    }

    @Override
    public void nextTuple() {
        try {
            ConsumerRecords<String, String> records = consumer.poll(10000L);
            Thread.sleep(FlowWriteConfig.TOPOLOGY_SPOUT_SLEEP_TIME);
            for (ConsumerRecord<String, String> record : records) {
                this.collector.emit(new Values(record.value()));
            }
        } catch (Exception e) {
            logger.error("Exception while the KafkaSpout was emitting messages!", e);
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("source"));
    }
}
src/java/cn/ac/iie/topology/LogFlowWriteTopology.java (new file, 92 lines)
@@ -0,0 +1,92 @@
package cn.ac.iie.topology;

import cn.ac.iie.bolt.CompletionBolt;
import cn.ac.iie.bolt.NtcLogSendBolt;
import cn.ac.iie.bolt.radius.RadiusCompletionBolt;

import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.spout.CustomizedKafkaSpout;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

/**
 * Main class of the Storm program
 *
 * @author Administrator
 */
public class LogFlowWriteTopology {
    private static Logger logger = Logger.getLogger(LogFlowWriteTopology.class);
    private final String topologyName;
    private final Config topologyConfig;
    private TopologyBuilder builder;

    private LogFlowWriteTopology() {
        this(LogFlowWriteTopology.class.getSimpleName());
    }

    private LogFlowWriteTopology(String topologyName) {
        this.topologyName = topologyName;
        topologyConfig = createTopologConfig();
    }

    private Config createTopologConfig() {
        Config conf = new Config();
        conf.setDebug(false);
        conf.setMessageTimeoutSecs(60);
        conf.setMaxSpoutPending(FlowWriteConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
        conf.setNumAckers(FlowWriteConfig.TOPOLOGY_NUM_ACKS);
        return conf;
    }

    private void runLocally() throws InterruptedException {
        topologyConfig.setMaxTaskParallelism(1);
        StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
    }

    private void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        topologyConfig.setNumWorkers(FlowWriteConfig.TOPOLOGY_WORKERS);
        // Setting this too high causes many problems, e.g. starved heartbeat threads and a sharp drop in throughput
        topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
        StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
    }

    private void buildTopology() {
        builder = new TopologyBuilder();
        builder.setSpout("LogFlowWriteSpout", new CustomizedKafkaSpout(), FlowWriteConfig.SPOUT_PARALLELISM);

        builder.setBolt(FlowWriteConfig.KAFKA_TOPIC, new CompletionBolt(), FlowWriteConfig.DATACENTER_BOLT_PARALLELISM).localOrShuffleGrouping("LogFlowWriteSpout");

        builder.setBolt("NtcLogSendBolt", new NtcLogSendBolt(), FlowWriteConfig.KAFKA_BOLT_PARALLELISM).localOrShuffleGrouping(FlowWriteConfig.KAFKA_TOPIC);

    }

    public static void main(String[] args) throws Exception {
        LogFlowWriteTopology csst = null;
        boolean runLocally = true;
        String parameter = "remote";
        int size = 2;
        if (args.length >= size && parameter.equalsIgnoreCase(args[1])) {
            runLocally = false;
            csst = new LogFlowWriteTopology(args[0]);
        } else {
            csst = new LogFlowWriteTopology();
        }

        csst.buildTopology();

        if (runLocally) {
            logger.info("Running in local mode...");
            csst.runLocally();
        } else {
            logger.info("Running in remote deployment mode...");
            csst.runRemotely();
        }
    }
}
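Given the argument handling in main (args[0] is the topology name and args[1] must equal "remote" for a cluster deployment; anything else falls back to local mode), a submission would look roughly like the line below. The jar path and topology name are illustrative, not taken from the repository:

    storm jar target/log-stream-aggregation-0.0.1-SNAPSHOT.jar cn.ac.iie.topology.LogFlowWriteTopology log-stream-aggregation remote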
src/java/cn/ac/iie/topology/StormRunner.java (new file, 35 lines)
@@ -0,0 +1,35 @@
package cn.ac.iie.topology;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

/**
 * @author Administrator
 */
public final class StormRunner {
    private static final int MILLS_IN_SEC = 1000;

    private StormRunner() {}

    public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(topologyName, conf, builder.createTopology());
        Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC);
        localCluster.shutdown();
    }

    public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
    }

}
src/java/cn/ac/iie/utils/general/Aggregate.java (new file, 152 lines)
@@ -0,0 +1,152 @@
package cn.ac.iie.utils.general;

import cn.ac.iie.bean.ValueTuple;
import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.json.JsonParseUtil;
import cn.ac.iie.utils.tuple.TwoTuple;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;

import java.util.HashMap;

public class Aggregate {

    /**
     * Map held in memory and used to build the reflected class
     */
    private static HashMap<String, Class> map = JsonParseUtil.getMapFromHttp(FlowWriteConfig.SCHEMA_HTTP);

    /**
     * Object generated from that map via reflection
     */
    private static Object mapObject = JsonParseUtil.generateObject(map);

    private static String key;

    private static HashMap<String, ValueTuple> resultMap = new HashMap<>();

    private static Object conn;

    private static ValueTuple valueTuple = new ValueTuple();

    private static String test = "{\"bgp_as_num\":\"100\",\"bgp_route\":\"192.168.222.0/24\",\"bgp_type\":1,\"common_action\":4,\"common_address_list\":\"\",\"common_address_type\":4,\"common_app_id\":0,\"common_app_label\":\"\",\"common_c2s_byte_num\":650,\"common_c2s_pkt_num\":7,\"common_client_asn\":\"9198\",\"common_client_ip\":\"95.56.198.87\",\"common_client_location\":\"Pervomayskiy,Almaty oblysy,哈萨克斯坦\",\"common_client_port\":13555,\"common_con_duration_ms\":154122,\"common_device_id\":\"2506398\",\"common_direction\":0,\"common_encapsulation\":0,\"common_end_time\":1590388545,\"common_entrance_id\":20,\"common_has_dup_traffic\":1,\"common_isp\":\"CMCC\",\"common_l4_protocol\":\"VLAN\",\"common_link_id\":1,\"common_log_id\":126995036504993794,\"common_policy_id\":0,\"common_protocol_id\":0,\"common_recv_time\":1590388694,\"common_s2c_byte_num\":9921,\"common_s2c_pkt_num\":21,\"common_schema_type\":\"SSL\",\"common_server_asn\":\"12876\",\"common_server_ip\":\"62.210.101.44\",\"common_server_location\":\"法国\",\"common_server_port\":22,\"common_service\":7,\"common_sled_ip\":\"192.168.10.36\",\"common_start_time\":1590388490,\"common_stream_dir\":2,\"common_stream_error\":\"\",\"common_stream_trace_id\":6193492085736674541,\"common_user_region\":\"prysUgOCWSmUYcGRL5rcUvVc8zbI9MOtlb9KOvH8rZCMRVqnIEyQVtQfBp94IIIjha24tGQ4x33qtC3jSx8udADkuezGGzrTrxCB\",\"common_user_tags\":\"9PD3v4GaIgS97l4wQnRtahW00YBi3933RDQg8PpiB8R9ftXhELHploJ0Ocg1Pj0xH06svaPbY2Tp1Chb0usQPttRqhpNbHTkW3En\",\"dns_aa\":0,\"dns_ancount\":64,\"dns_arcount\":22,\"dns_cname\":\"bFh2JvWJMWTCNcVEyuroMimLhoNM3O4effDDiNA9SRlCFdzaev10\",\"dns_message_id\":744559,\"dns_nscount\":59,\"dns_opcode\":0,\"dns_qclass\":2,\"dns_qdcount\":26,\"dns_qname\":\"kankanews.com12041281\",\"dns_qr\":1,\"dns_qtype\":5,\"dns_ra\":0,\"dns_rcode\":9,\"dns_rd\":0,\"dns_rr\":\"{\\\"aEWseVK\\\":\\\"UEUZ4qlk8qOjJBZ4\\\",\\\"9jGNxy0\\\":\\\"s075dZOXDXZ\\\",\\\"yyNXYD9G\\\":\\\"EEKxB99FuYDHH2E6NrV\\\",\\\"al23zn\\\":\\\"4dX1qd5L0A\\\"}\",\"dns_sub\":1,\"dns_tc\":0,\"ftp_account\":\"JXU2RDRCJXU4QkQ1\",\"ftp_content\":\"JXU2RDRCJXU4QkQ1\",\"ftp_url\":\"ftp://test:test@76.95.92.196/soft/list.txt\",\"http_content_length\":\"37339\",\"http_content_type\":\"application/x-javascript\",\"http_cookie\":\"BD_UPN=12314753\",\"http_domain\":\"163.com\",\"http_host\":\"v.163.com\",\"http_proxy_flag\":1,\"http_referer\":\"https://www.baidu.com/\",\"http_request_body\":\"\",\"http_request_body_key\":\"\",\"http_request_header\":\"\",\"http_request_line\":\"GET www.baidu.com/ HTTP/1.1\",\"http_response_body\":\"\",\"http_response_body_key\":\"\",\"http_response_header\":\"\",\"http_response_line\":\"HTTP/1.1 200 OK\",\"http_sequence\":9,\"http_set_cookie\":\"delPer=0; path=/; domain=.baidu.com\",\"http_snapshot\":\"\",\"http_url\":\"http://v.163.com/movie/2011/7/0/3/M7B9K1R60_M7BAANT03.html\",\"http_user_agent\":\"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.87 Safari/537.36\",\"http_version\":\"http1\",\"mail_account\":\"123456789\",\"mail_attachment_content\":\"\",\"mail_attachment_name\":\"%u6D4B%u8BD5%u90AE%u4EF6\",\"mail_attachment_name_charset\":\"UTF-8\",\"mail_bcc\":\"\",\"mail_cc\":\"0yrj459uw4c@msn.com\",\"mail_content\":\"\",\"mail_content_charset\":\"\",\"mail_eml_file\":\"\",\"mail_from\":\"jcgkljyfkx@msn.com\",\"mail_from_cmd\":\"5pk@163.net\",\"mail_protocol_type\":\"POP3\",\"mail_snapshot\":\"\",\"mail_subject\":\"%u6D4B%u8BD5%u90AE%u4EF6\",\"mail_subject_charset\":\"UTF-8\",\"mail_to\":\"cbs@yahoo.com\",\"mail_to_cmd\":\"xlk7nj@hotmail.com\",\"ssl_cert_verify\":0,\"ssl_client_side_latency\":5237,\"ssl_client_side_version\":\"SSLv3\",\"ssl_cn\":\"\",\"ssl_con_latency_ms\":3,\"ssl_error\":\"\",\"ssl_intercept_state\":0,\"ssl_pinningst\":1,\"ssl_san\":\"\",\"ssl_server_side_latency\":4644,\"ssl_server_side_version\":\"TLSv1.1\",\"ssl_sni\":\"cztv.com11547021\",\"ssl_version\":\"V3\",\"streaming_media_protocol\":\"RTP\",\"streaming_media_url\":\"http://home.sogua.com/lujingai/mv/play.aspx?id=30195689\",\"voip_called_account\":\"13307536537\",\"voip_called_number\":\"15301710004\",\"voip_calling_account\":\"15901848931\",\"voip_calling_number\":\"13908208553\"}";

    public static void main(String[] args) {

        resultMap = aggregateJsonToMap(resultMap, test);

        System.out.println("After aggregating once: " + resultMap.get(key));

        resultMap = aggregateJsonToMap(resultMap, test);

        System.out.println("After aggregating twice: " + resultMap.get(key));

    }

    /**
     * Accumulate one new record into the HashMap
     *
     * @param map     aggregation map keyed by the dimension key
     * @param message raw JSON log line
     * @return the updated map
     */
    public static HashMap<String, ValueTuple> aggregateJsonToMap(HashMap<String, ValueTuple> map, String message) {

        ValueTuple valueTuple = JSON.parseObject(message, ValueTuple.class);

        key = getKey(message);

        map.put(key, addTuple(map.get(key), valueTuple));

        return map;
    }

    /**
     * Aggregate the corresponding counters of two ValueTuple objects
     *
     * @param result  the accumulated tuple (may be null)
     * @param message the tuple parsed from the new record
     * @return the accumulated tuple
     */
    public static ValueTuple addTuple(ValueTuple result, ValueTuple message) {

        if (result == null) {
            result = new ValueTuple();
        }

        result.setCommon_s2c_byte_num(result.getCommon_s2c_byte_num() + message.getCommon_s2c_byte_num());
        result.setCommon_c2s_byte_num(result.getCommon_c2s_byte_num() + message.getCommon_c2s_byte_num());
        result.setCommon_s2c_pkt_num(result.getCommon_s2c_pkt_num() + message.getCommon_s2c_pkt_num());
        result.setCommon_c2s_pkt_num(result.getCommon_c2s_pkt_num() + message.getCommon_c2s_pkt_num());
        result.setCommon_sessions(result.getCommon_sessions() + message.getCommon_sessions());

        return result;
    }

    public static String getKey(String message) {
        Object conn = JSONObject.parseObject(message, mapObject.getClass());
        //TODO key
        Object common_policy_id = JsonParseUtil.getValue(conn, "common_policy_id");
        Object common_action = JsonParseUtil.getValue(conn, "common_action");
        Object common_sub_action = JsonParseUtil.getValue(conn, "common_sub_action");
        Object common_client_ip = JsonParseUtil.getValue(conn, "common_client_ip");
        Object common_client_location = JsonParseUtil.getValue(conn, "common_client_location");
        Object common_sled_ip = JsonParseUtil.getValue(conn, "common_sled_ip");
        Object common_device_id = JsonParseUtil.getValue(conn, "common_device_id");
        Object common_subscriber_id = JsonParseUtil.getValue(conn, "common_subscriber_id");
        Object common_server_ip = JsonParseUtil.getValue(conn, "common_server_ip");
        Object common_server_location = JsonParseUtil.getValue(conn, "common_server_location");
        Object common_server_port = JsonParseUtil.getValue(conn, "common_server_port");
        Object common_l4_protocol = JsonParseUtil.getValue(conn, "common_l4_protocol");
        Object http_domain = JsonParseUtil.getValue(conn, "http_domain");
        Object ssl_sni = JsonParseUtil.getValue(conn, "ssl_sni");

        StringBuilder builder = new StringBuilder();
        builder.append(common_policy_id).append("_")
                .append(common_action).append("_")
                .append(common_sub_action).append("_")
                .append(common_client_ip).append("_")
                .append(common_client_location).append("_")
                .append(common_sled_ip).append("_")
                .append(common_device_id).append("_")
                .append(common_subscriber_id).append("_")
                .append(common_server_ip).append("_")
                .append(common_server_location).append("_")
                .append(common_server_port).append("_")
                .append(common_l4_protocol).append("_")
                .append(http_domain).append("_")
                .append(ssl_sni);

        return builder.toString();
    }

    /* public static ValueTuple getValueTuple(String message){

        conn = JSONObject.parseObject(message, mapObject.getClass());
        Object common_sessions = JsonParseUtil.getValue(conn, "common_sessions");

        if (StringUtil.isEmpty(common_sessions)) {
            common_sessions = 0;
        }
        Object common_c2s_pkt_num = JsonParseUtil.getValue(conn, "common_c2s_pkt_num");
        Object common_s2c_pkt_num = JsonParseUtil.getValue(conn, "common_s2c_pkt_num");
        Object common_c2s_byte_num = JsonParseUtil.getValue(conn, "common_c2s_byte_num");
        Object common_s2c_byte_num = JsonParseUtil.getValue(conn, "common_s2c_byte_num");

        valueTuple.setCommon_sessions((int) common_sessions);
        valueTuple.setCommon_c2s_pkt_num((int) common_c2s_pkt_num);
        valueTuple.setCommon_s2c_pkt_num((int) common_s2c_pkt_num);
        valueTuple.setCommon_c2s_byte_num((int) common_c2s_byte_num);
        valueTuple.setCommon_s2c_byte_num((int) common_s2c_byte_num);

        return valueTuple;

    }*/
}
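A quick sanity check of the additive merge in addTuple, as a sketch: JUnit 4 is already declared in pom.xml with test scope, and the class/method names below are illustrative. Note that loading Aggregate triggers the schema fetch in its static initializers, so this only runs where schema.http is reachable.

    package cn.ac.iie.utils.general;

    import cn.ac.iie.bean.ValueTuple;
    import org.junit.Assert;
    import org.junit.Test;

    public class AggregateAddTupleTest {

        @Test
        public void addTupleSumsAllCounters() {
            // accumulated value so far
            ValueTuple acc = new ValueTuple();
            acc.setCommon_sessions(1);
            acc.setCommon_c2s_byte_num(100);

            // counters from the next record
            ValueTuple next = new ValueTuple();
            next.setCommon_sessions(2);
            next.setCommon_c2s_byte_num(50);

            ValueTuple merged = Aggregate.addTuple(acc, next);
            Assert.assertEquals(3, merged.getCommon_sessions());
            Assert.assertEquals(150, merged.getCommon_c2s_byte_num());
        }
    }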
src/java/cn/ac/iie/utils/http/HttpClientUtil.java (new file, 55 lines)
@@ -0,0 +1,55 @@
package cn.ac.iie.utils.http;

import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * Utility class for fetching the gateway schema
 *
 * @author qidaijie
 */
public class HttpClientUtil {

    /**
     * Request the gateway and fetch the schema
     *
     * @param http gateway URL
     * @return schema
     */
    public static String requestByGetMethod(String http) {
        CloseableHttpClient httpClient = HttpClients.createDefault();
        // initialized up front so the final toString() cannot throw when the request itself fails
        StringBuilder entityStringBuilder = new StringBuilder();
        try {
            HttpGet get = new HttpGet(http);
            try (CloseableHttpResponse httpResponse = httpClient.execute(get)) {
                HttpEntity entity = httpResponse.getEntity();
                if (null != entity) {
                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent(), "UTF-8"), 8 * 1024);
                    String line = null;
                    while ((line = bufferedReader.readLine()) != null) {
                        entityStringBuilder.append(line);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (httpClient != null) {
                    httpClient.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return entityStringBuilder.toString();
    }

}
src/java/cn/ac/iie/utils/json/JsonParseUtil.java (new file, 204 lines)
@@ -0,0 +1,204 @@
package cn.ac.iie.utils.json;

import cn.ac.iie.common.FlowWriteConfig;
import cn.ac.iie.utils.http.HttpClientUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
import net.sf.cglib.beans.BeanGenerator;
import net.sf.cglib.beans.BeanMap;

import java.util.*;

/**
 * Utility class that parses JSON with FastJson
 *
 * @author qidaijie
 */
public class JsonParseUtil {

    /**
     * Pattern match: return a class type for a given type-name string
     *
     * @param type type name
     * @return class type
     */
    private static Class getClassName(String type) {
        Class clazz;

        switch (type) {
            case "int":
                clazz = Integer.class;
                break;
            case "String":
                clazz = String.class;
                break;
            case "long":
                clazz = long.class;
                break;
            case "Integer":
                clazz = Integer.class;
                break;
            case "double":
                clazz = double.class;
                break;
            case "float":
                clazz = float.class;
                break;
            case "char":
                clazz = char.class;
                break;
            case "byte":
                clazz = byte.class;
                break;
            case "boolean":
                clazz = boolean.class;
                break;
            case "short":
                clazz = short.class;
                break;
            default:
                clazz = String.class;
        }
        return clazz;
    }

    /**
     * Generate an object via reflection
     *
     * @param properties map used to build the reflected class
     * @return the generated Object
     */
    public static Object generateObject(Map properties) {
        BeanGenerator generator = new BeanGenerator();
        Set keySet = properties.keySet();
        for (Object aKeySet : keySet) {
            String key = (String) aKeySet;
            generator.addProperty(key, (Class) properties.get(key));
        }
        return generator.create();
    }

    /**
     * Read a property value
     *
     * @param obj      the object
     * @param property key
     * @return the property value
     */
    public static Object getValue(Object obj, String property) {
        BeanMap beanMap = BeanMap.create(obj);
        return beanMap.get(property);
    }

    /**
     * Update a property value
     *
     * @param obj      the object
     * @param property the key to update
     * @param value    the new value
     */
    public static void setValue(Object obj, String property, Object value) {
        BeanMap beanMap = BeanMap.create(obj);
        beanMap.put(property, value);
    }

    /**
     * Fetch the gateway schema from the given URL and build the map used to reflectively generate an Object
     *
     * @param http gateway schema URL
     * @return map used to reflectively generate an object of the schema type
     */
    public static HashMap<String, Class> getMapFromHttp(String http) {
        HashMap<String, Class> map = new HashMap<>();

        String schema = HttpClientUtil.requestByGetMethod(http);
        Object data = JSON.parseObject(schema).get("data");

        // get "fields" and turn it into an array; each element carries a name, doc and type
        JSONObject schemaJson = JSON.parseObject(data.toString());
        JSONArray fields = (JSONArray) schemaJson.get("fields");

        for (Object field : fields) {
            String name = JSON.parseObject(field.toString()).get("name").toString();
            String type = JSON.parseObject(field.toString()).get("type").toString();
            // assemble the map used to generate the entity class
            map.put(name, getClassName(type));
        }

        return map;
    }

    /**
     * Fetch the schema from the given URL, parse it, and return a job list (useList toList funcList paramList)
     *
     * @param http gateway URL
     * @return job list
     */
    public static ArrayList<String[]> getJobListFromHttp(String http) {
        ArrayList<String[]> list = new ArrayList<>();

        String schema = HttpClientUtil.requestByGetMethod(http);
        // parse "data"
        Object data = JSON.parseObject(schema).get("data");

        // get "fields" and turn it into an array; each element carries a name, doc and type
        JSONObject schemaJson = JSON.parseObject(data.toString());
        JSONArray fields = (JSONArray) schemaJson.get("fields");

        for (Object field : fields) {

            if (JSON.parseObject(field.toString()).containsKey("doc")) {
                Object doc = JSON.parseObject(field.toString()).get("doc");

                if (JSON.parseObject(doc.toString()).containsKey("format")) {
                    String name = JSON.parseObject(field.toString()).get("name").toString();
                    Object format = JSON.parseObject(doc.toString()).get("format");
                    JSONObject formatObject = JSON.parseObject(format.toString());

                    String functions = formatObject.get("functions").toString();
                    String appendTo = null;
                    String params = null;

                    if (formatObject.containsKey("appendTo")) {
                        appendTo = formatObject.get("appendTo").toString();
                    }

                    if (formatObject.containsKey("param")) {
                        params = formatObject.get("param").toString();
                    }

                    if (StringUtil.isNotBlank(appendTo) && StringUtil.isBlank(params)) {
                        String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
                        String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);

                        for (int i = 0; i < functionArray.length; i++) {
                            list.add(new String[]{name, appendToArray[i], functionArray[i], null});
                        }

                    } else if (StringUtil.isNotBlank(appendTo) && StringUtil.isNotBlank(params)) {
                        String[] functionArray = functions.split(FlowWriteConfig.FORMAT_SPLITTER);
                        String[] appendToArray = appendTo.split(FlowWriteConfig.FORMAT_SPLITTER);
                        String[] paramArray = params.split(FlowWriteConfig.FORMAT_SPLITTER);

                        for (int i = 0; i < functionArray.length; i++) {
                            list.add(new String[]{name, appendToArray[i], functionArray[i], paramArray[i]});
                        }
                    } else {
                        list.add(new String[]{name, name, functions, params});
                    }

                }
            }

        }
        return list;
    }

}
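From the way getMapFromHttp and getJobListFromHttp walk the response, the gateway payload is evidently expected to have roughly the shape below (an inferred sketch, not the gateway's documented contract; the field names and function/param values are illustrative):

    {
      "data": {
        "fields": [
          {"name": "common_client_ip", "type": "String",
           "doc": {"format": {"functions": "f1,f2", "appendTo": "a,b", "param": "p1,p2"}}},
          {"name": "common_c2s_byte_num", "type": "long"}
        ]
      }
    }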
80
src/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java
Normal file
80
src/java/cn/ac/iie/utils/kafka/KafkaLogNtc.java
Normal file
@@ -0,0 +1,80 @@
|
||||
package cn.ac.iie.utils.kafka;

import cn.ac.iie.common.FlowWriteConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.log4j.Logger;

import java.util.List;
import java.util.Properties;

/**
 * Writes logs produced by the NTC system configuration to the national data center.
 *
 * @author Administrator
 * @create 2018-08-13 15:11
 */
public class KafkaLogNtc {

    private static Logger logger = Logger.getLogger(KafkaLogNtc.class);

    /**
     * Kafka producer used to send messages to Kafka.
     */
    private static Producer<String, String> kafkaProducer;

    /**
     * Singleton adapter that delegates message sending to the Kafka producer.
     */
    private static KafkaLogNtc kafkaLogNtc;

    private KafkaLogNtc() {
        initKafkaProducer();
    }

    public static synchronized KafkaLogNtc getInstance() {
        if (kafkaLogNtc == null) {
            kafkaLogNtc = new KafkaLogNtc();
        }
        return kafkaLogNtc;
    }

    public void sendMessage(List<String> list) {
        final int[] errorSum = {0};
        for (String value : list) {
            kafkaProducer.send(new ProducerRecord<>(FlowWriteConfig.RESULTS_OUTPUT_TOPIC, value), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        logger.error("Exception while writing to " + FlowWriteConfig.RESULTS_OUTPUT_TOPIC, exception);
                        errorSum[0]++;
                    }
                }
            });
            if (errorSum[0] > FlowWriteConfig.MAX_FAILURE_NUM) {
                // too many failures: drop the remaining messages and stop iterating
                list.clear();
                break;
            }
        }
        kafkaProducer.flush();
        logger.debug("Log sent to National Center successfully!!!!!");
    }

    /**
     * Initializes the Kafka message producer from the producer configuration; runs only once.
     */
    private void initKafkaProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", FlowWriteConfig.BOOTSTRAP_SERVERS);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("acks", "1");
        properties.put("linger.ms", "2");
        properties.put("request.timeout.ms", 30000);
        properties.put("batch.size", 262144);
        properties.put("buffer.memory", 33554432);
        properties.put("compression.type", FlowWriteConfig.KAFKA_COMPRESSION_TYPE);
        kafkaProducer = new KafkaProducer<>(properties);
    }

}
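A minimal usage sketch, not part of this commit: a caller collects serialized results and hands them to the singleton sender. The caller class and the payload string are illustrative assumptions; only getInstance() and sendMessage() come from the class above, and the target topic and brokers are taken from FlowWriteConfig at runtime.

import cn.ac.iie.utils.kafka.KafkaLogNtc;

import java.util.ArrayList;
import java.util.List;

public class KafkaLogNtcExample {
    public static void main(String[] args) {
        List<String> results = new ArrayList<>();
        // assumed payload shape; the real records are the aggregated JSON results
        results.add("{\"cip\":\"192.168.40.101\",\"sum\":10}");
        KafkaLogNtc.getInstance().sendMessage(results);
    }
}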
65
src/java/cn/ac/iie/utils/system/FlowWriteConfigurations.java
Normal file
@@ -0,0 +1,65 @@
package cn.ac.iie.utils.system;

import java.util.Properties;

/**
 * Typed accessors for the service configuration loaded from service_flow_config.properties.
 *
 * @author Administrator
 */
public final class FlowWriteConfigurations {

//    private static Properties propCommon = new Properties();
    private static Properties propService = new Properties();

    public static String getStringProperty(Integer type, String key) {
        if (type == 0) {
            return propService.getProperty(key);
//        } else if (type == 1) {
//            return propCommon.getProperty(key);
        } else {
            return null;
        }
    }

    public static Integer getIntProperty(Integer type, String key) {
        if (type == 0) {
            return Integer.parseInt(propService.getProperty(key));
//        } else if (type == 1) {
//            return Integer.parseInt(propCommon.getProperty(key));
        } else {
            return null;
        }
    }

    public static Long getLongProperty(Integer type, String key) {
        if (type == 0) {
            return Long.parseLong(propService.getProperty(key));
//        } else if (type == 1) {
//            return Long.parseLong(propCommon.getProperty(key));
        } else {
            return null;
        }
    }

    public static Boolean getBooleanProperty(Integer type, String key) {
        if (type == 0) {
            return "true".equals(propService.getProperty(key).toLowerCase().trim());
//        } else if (type == 1) {
//            return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
        } else {
            return null;
        }
    }

    static {
        // load the service configuration from the classpath once, at class-load time
        try {
            propService.load(FlowWriteConfigurations.class.getClassLoader().getResourceAsStream("service_flow_config.properties"));
        } catch (Exception e) {
//            propCommon = null;
            propService = null;
        }
    }
}
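A usage sketch, assuming service_flow_config.properties defines the keys shown; the key names are illustrative and not taken from this commit. Passing any type other than 0 returns null.

import cn.ac.iie.utils.system.FlowWriteConfigurations;

public class ConfigExample {
    public static void main(String[] args) {
        // type 0 selects the service properties file
        String servers = FlowWriteConfigurations.getStringProperty(0, "bootstrap.servers");
        Integer batchSize = FlowWriteConfigurations.getIntProperty(0, "batch.size");
        Boolean enabled = FlowWriteConfigurations.getBooleanProperty(0, "aggregation.enabled");
        System.out.println(servers + " / " + batchSize + " / " + enabled);
    }
}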
23
src/java/cn/ac/iie/utils/system/TupleUtils.java
Normal file
@@ -0,0 +1,23 @@
package cn.ac.iie.utils.system;

import org.apache.storm.Constants;
import org.apache.storm.tuple.Tuple;

/**
 * Utility for detecting tuples emitted by the system itself.
 *
 * @author Administrator
 */
public final class TupleUtils {
    /**
     * Checks whether the tuple is a tick tuple emitted automatically by Storm.
     *
     * @param tuple the tuple to check
     * @return true or false
     */
    public static boolean isTick(Tuple tuple) {
        return tuple != null
                && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }
}
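A sketch of the usual tick-tuple pattern in a bolt; the bolt itself is an assumption and not part of this commit, only TupleUtils.isTick() is. In practice the bolt would also override getComponentConfiguration() and set Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS so that Storm emits tick tuples at all.

import cn.ac.iie.utils.system.TupleUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

public class TickAwareBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {
            // periodic tick: flush whatever has been aggregated so far
        } else {
            // ordinary data tuple: fold it into the in-memory aggregation state
        }
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no downstream fields declared in this sketch
    }
}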
20
src/java/cn/ac/iie/utils/tuple/ThreeTuple.java
Normal file
@@ -0,0 +1,20 @@
package cn.ac.iie.utils.tuple;

/**
 * Value side of an aggregation entry: (name, time, sum).
 */
public class ThreeTuple<A, B, C> {

    public String first;

    public long second;

    public int third;

    public ThreeTuple(String name, long time, int sum) {
        first = name;
        second = time;
        third = sum;
    }

    public String toString() {
        return "[" + first + ", " + second + ", " + third + "]";
    }
}
195
src/java/cn/ac/iie/utils/tuple/TupleAggregate.java
Normal file
@@ -0,0 +1,195 @@
package cn.ac.iie.utils.tuple;

import com.alibaba.fastjson.JSON;

import java.util.HashMap;

public class TupleAggregate {

    private static TwoTuple<String, Integer> a = new TwoTuple<>("192.168.40.101", 1);
    private static TwoTuple<String, Integer> b = new TwoTuple<>("192.168.40.101", 1);

    private static ThreeTuple<String, Long, Integer> a1 = new ThreeTuple<>("lxk", 30L, 2);
    private static ThreeTuple<String, Long, Integer> b1 = new ThreeTuple<>("lxk", 20L, 2);

    public static TwoTuple<TwoTuple, ThreeTuple> parseJsonToTuple(String json) {

        CONN conn = JSON.parseObject(json, CONN.class);
        // two-tuple key
        TwoTuple key = new TwoTuple<>(conn.getCip(), conn.getNum());
        // three-tuple value
        ThreeTuple value = new ThreeTuple<>(conn.getName(), conn.getTime(), conn.getSum());

        return new TwoTuple<>(key, value);
    }

    /**
     * Aggregates two three-tuples by adding their time and sum fields.
     *
     * @param tuple1 accumulator, updated in place and returned
     * @param tuple2 increment
     * @return tuple1
     */
    public static ThreeTuple addTuple(ThreeTuple tuple1, ThreeTuple tuple2) {
        tuple1.second += tuple2.second;
        tuple1.third += tuple2.third;
        return tuple1;
    }

    /**
     * Accumulates one new JSON record into the HashMap.
     *
     * @param map  in-memory aggregation state
     * @param json one log record
     * @return map
     */
    public static HashMap aggregate(HashMap<TwoTuple, ThreeTuple> map, String json) {

        // TODO: parse the JSON into an object, build the key tuple, and merge it with the
        // entry in the HashMap that has the same key

        /*
         * Known issues:
         * 1. The key is an object, so keys with equal values only match if hashCode() and
         *    equals() are overridden (done in TwoTuple).
         * 2. map.get(key) returns null for a key that is not in the map yet, so the value
         *    must be null-checked before aggregating, otherwise a NullPointerException is thrown.
         */

        // one log record => a (key, value) pair of tuples
        TwoTuple<TwoTuple, ThreeTuple> tuple = parseJsonToTuple(json);
        // extract the key
        TwoTuple key = tuple.first;
        // look up the value with the same key in the in-memory HashMap
        ThreeTuple value = map.get(key);
        // aggregate the two values, or take the new value when the key is not present yet
        value = (value == null) ? tuple.second : addTuple(value, tuple.second);
        // put the aggregated result back into the in-memory HashMap
        map.put(key, value);

        return map;
    }

    public static void main(String[] args) {

//        HashMap<TwoTuple, ThreeTuple> map1 = new HashMap<>();
//        a1 = addTuple(a1, b1);
//        System.out.println("aggregated: " + a1);

        CONN conn1 = new CONN();
        CONN conn2 = new CONN();

        conn1.setCip("192.168.40.101");
        conn2.setCip("192.168.40.101");

        conn1.setNum(1);
        conn2.setNum(1);

        conn1.setName("lxk");
        conn2.setName("lxk");

        conn1.setTime(100L);
        conn2.setTime(200L);

        conn1.setSum(10);
        conn2.setSum(20);

        System.out.println("conn1" + conn1);
        System.out.println("conn2" + conn2);

        String json1 = JSON.toJSONString(conn1);
        String json2 = JSON.toJSONString(conn2);

        HashMap map = new HashMap<>();

        map.put(a, a1);

        System.out.println("map before: " + map);

        map = aggregate(map, json1);

        System.out.println("map after: " + map);

        System.out.println("hashCode of a: " + a.hashCode());
        System.out.println("hashCode of b: " + b.hashCode());
    }

}

class CONN {

    // fields used as the two-tuple key
    private String cip;
    private int num;

    // fields used as the three-tuple value
    private String name;
    private long time;
    private int sum;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    public int getSum() {
        return sum;
    }

    public void setSum(int sum) {
        this.sum = sum;
    }

    public String getCip() {
        return cip;
    }

    public void setCip(String cip) {
        this.cip = cip;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public String toString() {
        return "CONN{" +
                "cip='" + cip + '\'' +
                ", num=" + num +
                ", name='" + name + '\'' +
                ", time=" + time +
                ", sum=" + sum +
                '}';
    }
}
32
src/java/cn/ac/iie/utils/tuple/TwoTuple.java
Normal file
@@ -0,0 +1,32 @@
package cn.ac.iie.utils.tuple;

public class TwoTuple<A, B> {

    public A first;

    public B second;

    public TwoTuple() {
    }

    public TwoTuple(A cip, B num) {
        first = cip;
        second = num;
    }

    public String toString() {
        return "[" + first + ", " + second + "]";
    }

    @Override
    public int hashCode() {
        // hash both components through their string form so that equal values give equal hashes
        return (first != null && second != null) ? (first.toString() + second).hashCode() : 0;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TwoTuple)) {
            return false;
        }
        TwoTuple<?, ?> other = (TwoTuple<?, ?>) o;
        // compare both components by their string form, consistent with hashCode()
        return String.valueOf(first).equals(String.valueOf(other.first))
                && String.valueOf(second).equals(String.valueOf(other.second));
    }
}
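Because TwoTuple hashes and compares by the string form of both components, two keys built from equal values address the same HashMap entry. A small check, illustrative only and not part of this commit:

import cn.ac.iie.utils.tuple.ThreeTuple;
import cn.ac.iie.utils.tuple.TwoTuple;

import java.util.HashMap;

public class TwoTupleKeyExample {
    public static void main(String[] args) {
        HashMap<TwoTuple<String, Integer>, ThreeTuple<String, Long, Integer>> map = new HashMap<>();
        map.put(new TwoTuple<>("192.168.40.101", 1), new ThreeTuple<>("lxk", 30L, 2));

        // a second key with the same values hashes and compares equal, so it finds the same entry
        ThreeTuple<String, Long, Integer> hit = map.get(new TwoTuple<>("192.168.40.101", 1));
        System.out.println(hit);   // [lxk, 30, 2]
    }
}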
23
src/java/log4j.properties
Normal file
@@ -0,0 +1,23 @@
#Log4j
log4j.rootLogger=info,console,file
# Console appender settings
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=info
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n

# File appender settings
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=error
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
# Use a relative path so that, after testing, output lands in the application directory
log4j.appender.file.file=storm-topology.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
# MyBatis configuration; com.nis.web.dao is the package containing the MyBatis mapper interfaces
log4j.logger.com.nis.web.dao=debug
# BoneCP data source configuration
log4j.category.com.jolbox=debug,console