radius 写入hbase初始版本

This commit is contained in:
qidaijie
2019-11-08 11:46:29 +08:00
commit 027908d6cb
17 changed files with 1506 additions and 0 deletions

287
pom.xml Normal file
View File

@@ -0,0 +1,287 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>log-address-hbase</groupId>
<artifactId>log-address-hbase</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>log-address-hbase</name>
<url>http://maven.apache.org</url>
<repositories>
<repository>
<id>nexus</id>
<name>Team Nexus Repository</name>
<url>http://192.168.10.125:8099/content/groups/public</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>cn.ac.iie.topology.LogAddressRedisTopology</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<directory>properties</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/java</directory>
<includes>
<include>log4j.properties</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>1.0.0</kafka.version>
<storm.version>1.0.2</storm.version>
<hbase.version>1.4.9</hbase.version>
<hadoop.version>2.7.1</hadoop.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>${storm.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.nis</groupId>
<artifactId>nis-core</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>info.monitorenter</groupId>
<artifactId>cpdetector</artifactId>
<version>1.0.7</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.mariadb.jdbc/mariadb-java-client -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>com.zdjizhi</groupId>
<artifactId>galaxy</artifactId>
<version>1.0.1</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,41 @@
#管理kafka地址
bootstrap.servers=192.168.40.119:9092,192.168.40.122:9092,192.168.40.123:9092
#从kafka哪里开始读earliest/latest
auto.offset.reset=latest
#hbase zookeeper地址
hbase.zookeeper.servers=192.168.40.203:2186
#hbase tablename
hbase.table.name=subcriber_info
#tick时钟频率
topology.tick.tuple.freq.secs=50
topology.config.max.spout.pending=500000
topology.num.acks=0
#redis过期时间
expiration.time=604800
#用于过滤对准用户名
check.ip.scope=10,100,192
#kafka broker下的topic名称
kafka.topic=RADIUS-LOG
#kafka消费group id
group.id=account-to-hbase
#storm topology workers
topology.workers=1
#storm spout parallelism
spout.parallelism=10
#storm bolt parallelism
format.bolt.parallelism=10
#1=单机逐条写入 2=集群逐条写入 3=集群批量写入
redis.model=1

64
properties/core-site.xml Normal file
View File

@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://ns1</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>file:/opt/hadoop/tmp</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131702</value>
</property>
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.logfile.size</name>
<value>10000000</value>
<description>The max size of each log file</description>
</property>
<property>
<name>hadoop.logfile.count</name>
<value>1</value>
<description>The max number of log files</description>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>192.168.40.202:2181,192.168.40.203:2181,192.168.40.206:2181</value>
</property>
<property>
<name>io.compression.codecs</name>
<value>com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
</configuration>

81
properties/hbase-site.xml Normal file
View File

@@ -0,0 +1,81 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://ns1/hbase/hbase-1.4.9</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>192.168.40.202,192.168.40.203,192.168.40.206</value>
</property>
<property>
<name>hbase.master.info.port</name>
<value>60010</value>
</property>
<!-- 开启启schema支持 对应hbase的namespace -->
<property>
<name>phoenix.schema.isNamespaceMappingEnabled</name>
<value>true</value>
</property>
<property>
<name>phoenix.schema.mapSystemTablesToNamespace</name>
<value>true</value>
</property>
<property>
<name>hbase.client.keyvalue.maxsize</name>
<value>99428800</value>
</property>
<property>
<name>hbase.server.keyvalue.maxsize</name>
<value>99428800</value>
</property>
<property>
<name>hbase.regionserver.wal.codec</name>
<value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
<property>
<name>phoenix.query.timeoutMs</name>
<value>1800000</value>
</property>
<property>
<name>hbase.regionserver.lease.period</name>
<value>1200000</value>
</property>
<property>
<name>hbase.rpc.timeout</name>
<value>1200000</value>
</property>
<property>
<name>hbase.client.scanner.caching</name>
<value>1000</value>
</property>
<property>
<name>hbase.client.scanner.timeout.period</name>
<value>1200000</value>
</property>
</configuration>

116
properties/hdfs-site.xml Normal file
View File

@@ -0,0 +1,116 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/home/bigdata/hadoop/dfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/home/bigdata/hadoop/dfs/data</value>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>192.168.40.202:9001</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>ns1</value>
</property>
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
</property>
<property>
<name>dfs.ha.namenodes.ns1</name>
<value>nn1,nn2</value>
</property>
<!-- nn1的RPC通信地址nn1所在地址 -->
<property>
<name>dfs.namenode.rpc-address.ns1.nn1</name>
<value>192.168.40.202:9000</value>
</property>
<!-- nn1的http通信地址外部访问地址 -->
<property>
<name>dfs.namenode.http-address.ns1.nn1</name>
<value>192.168.40.202:50070</value>
</property>
<!-- nn2的RPC通信地址nn2所在地址 -->
<property>
<name>dfs.namenode.rpc-address.ns1.nn2</name>
<value>192.168.40.203:9000</value>
</property>
<!-- nn2的http通信地址外部访问地址 -->
<property>
<name>dfs.namenode.http-address.ns1.nn2</name>
<value>192.168.40.203:50070</value>
</property>
<!-- 指定NameNode的元数据在JournalNode日志上的存放位置(一般和zookeeper部署在一起) -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://192.168.40.203:8485;192.168.40.206:8485;192.168.40.202:8485/ns1</value>
</property>
<!-- 指定JournalNode在本地磁盘存放数据的位置 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/home/bigdata/hadoop/journal</value>
</property>
<!--客户端通过代理访问namenode访问文件系统HDFS 客户端与Active 节点通信的Java 类使用其确定Active 节点是否活跃 -->
<property>
<name>dfs.client.failover.proxy.provider.ns1</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!--这是配置自动切换的方法,有多种使用方法,具体可以看官网,在文末会给地址,这里是远程登录杀死的方法 -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<!-- 这个是使用sshfence隔离机制时才需要配置ssh免登陆 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
<!-- 配置sshfence隔离机制超时时间这个属性同上如果你是用脚本的方法切换这个应该是可以不配置的 -->
<property>
<name>dfs.ha.fencing.ssh.connect-timeout</name>
<value>30000</value>
</property>
<!-- 这个是开启自动故障转移,如果你没有自动故障转移,这个可以先不配 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
</configuration>

View File

@@ -0,0 +1,159 @@
package cn.ac.iie.bolt;
import cn.ac.iie.common.AddressConfig;
import cn.ac.iie.utils.TupleUtils;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.io.IOException;
import java.util.*;
/**
* @author qidaijie
*/
public class SubcribeIdBolt extends BaseBasicBolt {
private static Logger logger = Logger.getLogger(SubcribeIdBolt.class);
private static Map<String, String> subIdMap;
private List<Put> putList;
private static Connection connection;
static {
// 管理Hbase的配置信息
Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
configuration.set("hbase.zookeeper.quorum", AddressConfig.HBASE_ZOOKEEPER_SERVERS);
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
subIdMap = new HashMap<>(16);
putList = new ArrayList<>();
getAll();
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
if (TupleUtils.isTick(tuple)) {
insertData(putList);
} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
JSONObject jsonObject = JSONObject.parseObject(message);
String ip = jsonObject.getString("framed_ip");
String account = jsonObject.getString("account");
dataValidation(ip, account, putList);
if (putList.size() == AddressConfig.LIST_SIZE_MAX) {
insertData(putList);
}
}
}
} catch (Exception e) {
logger.error("Radius写入Redis出现异常", e);
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
AddressConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
return conf;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
/**
* 获取所有的 key value
*/
private static void getAll() {
try {
Table table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME));
Scan scan2 = new Scan();
ResultScanner scanner = table.getScanner(scan2);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
}
}
scanner.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 写入数据到hbase
*/
private static void insertData(List<Put> putList) {
Table table = null;
try {
table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME));
table.put(putList);
putList.clear();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (table != null) {
table.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 验证数据并与内存中的对比
*
* @param ip framed_ip
* @param account account
*/
private static void dataValidation(String ip, String account, List<Put> putList) {
if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) {
// String s = ip.split("\\.")[0];
// if (!AddressConfig.CHECK_IP_SCOPE.contains(s)) {
if (subIdMap.containsKey(ip)) {
if (!subIdMap.get(ip).equals(account)) {
Put put = new Put(ip.getBytes());
put.addColumn("subscribe_id".getBytes(), "account".getBytes(), account.getBytes());
putList.add(put);
subIdMap.put(ip, account);
}
} else {
Put put = new Put(ip.getBytes());
put.addColumn("subscribe_id".getBytes(), "account".getBytes(), account.getBytes());
putList.add(put);
subIdMap.put(ip, account);
}
// }
}
}
}

View File

@@ -0,0 +1,109 @@
package cn.ac.iie.bolt;
import cn.ac.iie.common.AddressConfig;
import cn.ac.iie.utils.TupleUtils;
import com.zdjizhi.utils.StringUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author qidaijie
*/
public class ToHBaseBolt extends BaseBasicBolt {
private static Logger logger = Logger.getLogger(ToHBaseBolt.class);
private static List<Put> putList;
private static Connection connection;
static {
// 管理Hbase的配置信息
Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
configuration.set("hbase.zookeeper.quorum", AddressConfig.HBASE_ZOOKEEPER_SERVERS);
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
putList = new ArrayList<>();
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
if (TupleUtils.isTick(tuple)) {
insertData();
putList.clear();
} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
String[] split = message.split("-");
Put put = new Put(split[0].getBytes());
put.addColumn("subscribe_id".getBytes(), "account".getBytes(), split[1].getBytes());
putList.add(put);
if (putList.size() == AddressConfig.LIST_SIZE_MAX) {
insertData();
putList.clear();
}
}
}
} catch (Exception e) {
logger.error("Radius写入Redis出现异常", e);
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
AddressConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
return conf;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
/**
* 写入数据到hbase
*/
private static void insertData() {
Table table = null;
try {
table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME));
table.put(putList);
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (table != null) {
table.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

View File

@@ -0,0 +1,38 @@
package cn.ac.iie.common;
import cn.ac.iie.utils.AddressConfigurations;
import java.io.Serializable;
/**
* @author qidaijie
*/
public class AddressConfig implements Serializable {
private static final long serialVersionUID = -8326385159484059324L;
public static final String SEGMENTATION = ",";
public static final int LIST_SIZE_MAX = 5000;
/***
* kafka and system
*/
public static final String BOOTSTRAP_SERVERS = AddressConfigurations.getStringProperty(0, "bootstrap.servers");
public static final Integer SPOUT_PARALLELISM = AddressConfigurations.getIntProperty(0, "spout.parallelism");
public static final Integer FORMAT_BOLT_PARALLELISM = AddressConfigurations.getIntProperty(0, "format.bolt.parallelism");
public static final String GROUP_ID = AddressConfigurations.getStringProperty(0, "group.id");
public static final String KAFKA_TOPIC = AddressConfigurations.getStringProperty(0, "kafka.topic");
public static final String AUTO_OFFSET_RESET = AddressConfigurations.getStringProperty(0, "auto.offset.reset");
public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = AddressConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = AddressConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
public static final Integer TOPOLOGY_NUM_ACKS = AddressConfigurations.getIntProperty(0, "topology.num.acks");
public static final Integer TOPOLOGY_WORKERS = AddressConfigurations.getIntProperty(0, "topology.workers");
public static final Integer EXPIRATION_TIME = AddressConfigurations.getIntProperty(0, "expiration.time");
public static final Integer REDIS_MODEL = AddressConfigurations.getIntProperty(0, "redis.model");
public static final String CHECK_IP_SCOPE = AddressConfigurations.getStringProperty(0, "check.ip.scope");
public static final String HBASE_ZOOKEEPER_SERVERS = AddressConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
public static final String HBASE_TABLE_NAME = AddressConfigurations.getStringProperty(0, "hbase.table.name");
}

View File

@@ -0,0 +1,78 @@
package cn.ac.iie.spout;
import cn.ac.iie.common.AddressConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
/**
* @author qidaijie
*/
public class CustomizedKafkaSpout extends BaseRichSpout {
private static final long serialVersionUID = 2934528972182398950L;
private KafkaConsumer<String, String> consumer;
private SpoutOutputCollector collector = null;
private TopologyContext context = null;
private final static Logger logger = Logger.getLogger(CustomizedKafkaSpout.class);
private static Properties createConsumerConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", AddressConfig.BOOTSTRAP_SERVERS);
props.put("group.id", AddressConfig.GROUP_ID);
props.put("auto.offset.reset", AddressConfig.AUTO_OFFSET_RESET);
props.put("session.timeout.ms", "60000");
props.put("max.poll.records", 3000);
props.put("max.partition.fetch.bytes", 31457280);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
// TODO Auto-generated method stub
this.collector = collector;
this.context = context;
Properties prop = createConsumerConfig();
this.consumer = new KafkaConsumer<>(prop);
this.consumer.subscribe(Arrays.asList(AddressConfig.KAFKA_TOPIC));
}
@Override
public void close() {
consumer.close();
}
@Override
public void nextTuple() {
try {
// TODO Auto-generated method stub
ConsumerRecords<String, String> records = consumer.poll(10000L);
Thread.sleep(300);
for (ConsumerRecord<String, String> record : records) {
this.collector.emit(new Values(record.value()));
}
} catch (Exception e) {
logger.error("kfaka-spout 发送数据出现异常" + e);
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("source"));
}
}

View File

@@ -0,0 +1,83 @@
package cn.ac.iie.topology;
import cn.ac.iie.bolt.SubcribeIdBolt;
import cn.ac.iie.bolt.ToHBaseBolt;
import cn.ac.iie.common.AddressConfig;
import cn.ac.iie.spout.CustomizedKafkaSpout;
import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author qidaijie
*/
public class LogAddressRedisTopology {
private static final Logger logger = LoggerFactory.getLogger(LogAddressRedisTopology.class);
private final String topologyName;
private final Config topologyConfig;
private TopologyBuilder builder;
public LogAddressRedisTopology() {
this(LogAddressRedisTopology.class.getSimpleName());
}
public LogAddressRedisTopology(String topologyName) {
this.topologyName = topologyName;
topologyConfig = createTopologConfig();
}
private Config createTopologConfig() {
Config conf = new Config();
conf.setDebug(false);
conf.setMessageTimeoutSecs(120);
conf.setTopologyWorkerMaxHeapSize(500);
conf.setMaxSpoutPending(AddressConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
if (AddressConfig.TOPOLOGY_NUM_ACKS == 0) {
conf.setNumAckers(0);
}
return conf;
}
public void runLocally() throws InterruptedException {
topologyConfig.setMaxTaskParallelism(1);
StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
}
public void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
topologyConfig.setNumWorkers(AddressConfig.TOPOLOGY_WORKERS);
topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);//设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌
StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
}
private void buildTopology() {
builder = new TopologyBuilder();
builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), AddressConfig.SPOUT_PARALLELISM);
builder.setBolt("SubcribeIdBolt", new SubcribeIdBolt(), AddressConfig.FORMAT_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout");
// builder.setBolt("ToHBaseBolt", new ToHBaseBolt(), 1).localOrShuffleGrouping("SubcribeIdBolt");
}
public static void main(String[] args) throws Exception {
LogAddressRedisTopology csst = null;
boolean runLocally = true;
if (args.length >= 2 && "remote".equalsIgnoreCase(args[1])) {
runLocally = false;
csst = new LogAddressRedisTopology(args[0]);
} else {
csst = new LogAddressRedisTopology();
}
csst.buildTopology();
if (runLocally) {
logger.info("执行本地模式...");
csst.runLocally();
} else {
logger.info("执行远程部署模式...");
csst.runRemotely();
}
}
}

View File

@@ -0,0 +1,32 @@
package cn.ac.iie.topology;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
public final class StormRunner{
private static final int MILLS_IN_SEC = 1000;
private StormRunner() {}
public static void runTopologyLocally(TopologyBuilder builder, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topologyName, conf, builder.createTopology());
Thread.sleep((long) runtimeInSeconds * MILLS_IN_SEC);
localCluster.shutdown();
}
public static void runTopologyRemotely(TopologyBuilder builder, String topologyName, Config conf ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
}
}

View File

@@ -0,0 +1,62 @@
package cn.ac.iie.utils;
import java.util.Properties;
public final class AddressConfigurations {
private static Properties propCommon = new Properties();
// private static Properties propService = new Properties();
public static String getStringProperty(Integer type, String key) {
if (type == 0) {
return propCommon.getProperty(key);
// } else if (type == 1) {
// return propService.getProperty(key);
} else {
return null;
}
}
public static Integer getIntProperty(Integer type, String key) {
if (type == 0) {
return Integer.parseInt(propCommon.getProperty(key));
// } else if (type == 1) {
// return Integer.parseInt(propService.getProperty(key));
} else {
return null;
}
}
public static Long getLongProperty(Integer type, String key) {
if (type == 0) {
return Long.parseLong(propCommon.getProperty(key));
// } else if (type == 1) {
// return Long.parseLong(propService.getProperty(key));
} else {
return null;
}
}
public static Boolean getBooleanProperty(Integer type, String key) {
if (type == 0) {
return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
// } else if (type == 1) {
// return propService.getProperty(key).toLowerCase().trim().equals("true");
} else {
return null;
}
}
static {
try {
propCommon.load(AddressConfigurations.class.getClassLoader().getResourceAsStream("address_routine.properties"));
} catch (Exception e) {
propCommon = null;
System.err.println("配置加载失败");
}
}
}

View File

@@ -0,0 +1,21 @@
package cn.ac.iie.utils;
import org.apache.storm.Constants;
import org.apache.storm.tuple.Tuple;
/**
* @author Administrator
*/
public final class TupleUtils {
/**
* 判断是否系统自动发送的Tuple
*
* @param tuple
* @return
*/
public static boolean isTick(Tuple tuple) {
return tuple != null
&& Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
&& Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}
}

View File

@@ -0,0 +1,25 @@
#Log4j
log4j.rootLogger=info,console,file
# 控制台日志设置
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=error
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] [Thread\:%t] %l %x - <%m>%n
# 文件日志设置
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=error
log4j.appender.file.encoding=UTF-8
log4j.appender.file.Append=true
#路径请用相对路径,做好相关测试输出到应用目下
log4j.appender.file.file=${nis.root}/log/galaxy-name.log
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss} %X{ip} [%t] %5p %c{1} %m%n
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH\:mm\:ss}] [%-5p] %X{ip} [Thread\:%t] %l %x - %m%n
#MyBatis 配置com.nis.web.dao是mybatis接口所在包
log4j.logger.com.nis.web.dao=debug
#bonecp数据源配置
log4j.category.com.jolbox=debug,console

View File

@@ -0,0 +1,161 @@
package cn.ac.iie.test;
import cn.ac.iie.common.AddressConfig;
import cn.ac.iie.utils.TupleUtils;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author qidaijie
*/
public class SubcribeIdBolt extends BaseBasicBolt {
private static Logger logger = Logger.getLogger(SubcribeIdBolt.class);
private static Map<String, String> subIdMap;
private List<Put> putList;
private static Connection connection;
static {
// 管理Hbase的配置信息
Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
configuration.set("hbase.zookeeper.quorum", AddressConfig.HBASE_ZOOKEEPER_SERVERS);
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
subIdMap = new HashMap<>(16);
putList = new ArrayList<>();
getAll();
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
if (TupleUtils.isTick(tuple)) {
insertData(putList);
} else {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
JSONObject jsonObject = JSONObject.parseObject(message);
String ip = jsonObject.getString("framed_ip");
String account = jsonObject.getString("account");
dataValidation(ip, account, putList);
if (putList.size() == AddressConfig.LIST_SIZE_MAX) {
insertData(putList);
}
}
}
} catch (Exception e) {
logger.error("Radius写入Redis出现异常", e);
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>(16);
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
AddressConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
return conf;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
/**
* 获取所有的 key value
*/
private static void getAll() {
try {
Table table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME));
Scan scan2 = new Scan();
ResultScanner scanner = table.getScanner(scan2);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
}
}
scanner.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 写入数据到hbase
*/
private static void insertData(List<Put> putList) {
Table table = null;
try {
table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME));
table.put(putList);
logger.error("写入hbase数目" + putList.size());
putList.clear();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (table != null) {
table.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 验证数据并与内存中的对比
*
* @param ip framed_ip
* @param account account
*/
private static void dataValidation(String ip, String account, List<Put> putList) {
if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) {
// String s = ip.split("\\.")[0];
// if (!AddressConfig.CHECK_IP_SCOPE.contains(s)) {
if (subIdMap.containsKey(ip)) {
if (!subIdMap.get(ip).equals(account)) {
Put put = new Put(ip.getBytes());
put.addColumn("subscribe_id".getBytes(), "account".getBytes(), account.getBytes());
putList.add(put);
subIdMap.put(ip, account);
}
} else {
Put put = new Put(ip.getBytes());
put.addColumn("subscribe_id".getBytes(), "account".getBytes(), account.getBytes());
putList.add(put);
subIdMap.put(ip, account);
}
// }
}
}
}

View File

@@ -0,0 +1,117 @@
package cn.ac.iie.test;
import cn.ac.iie.common.AddressConfig;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* @author qidaijie
*/
public class SubcribeIdBoltone extends BaseBasicBolt {
private static Logger logger = Logger.getLogger(SubcribeIdBoltone.class);
private static Map<String, String> subIdMap;
private static Connection connection;
static {
// 管理Hbase的配置信息
Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
configuration.set("hbase.zookeeper.quorum", AddressConfig.HBASE_ZOOKEEPER_SERVERS);
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
subIdMap = new HashMap<>(16);
getAll();
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
String message = tuple.getString(0);
if (StringUtil.isNotBlank(message)) {
JSONObject jsonObject = JSONObject.parseObject(message);
String ip = jsonObject.getString("framed_ip");
String account = jsonObject.getString("account");
// dataValidation(ip, account, collector);
collector.emit(new Values(ip + "-" + account));
}
} catch (Exception e) {
logger.error("Radius写入Redis出现异常", e);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("connLog"));
}
/**
* 获取所有的 key value
*/
private static void getAll() {
try {
Table table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME));
Scan scan2 = new Scan();
ResultScanner scanner = table.getScanner(scan2);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
}
}
scanner.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 验证数据并与内存中的对比
*
* @param ip framed_ip
* @param account account
*/
private static void dataValidation(String ip, String account, BasicOutputCollector collector) {
if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) {
// String s = ip.split("\\.")[0];
// if (!AddressConfig.CHECK_IP_SCOPE.contains(s)) {
if (subIdMap.containsKey(ip)) {
if (!subIdMap.get(ip).equals(account)) {
subIdMap.put(ip, account);
collector.emit(new Values(ip + "-" + account));
}
} else {
subIdMap.put(ip, account);
collector.emit(new Values(ip + "-" + account));
}
// }
}
}
}

View File

@@ -0,0 +1,32 @@
package cn.ac.iie.test;
import cn.ac.iie.common.AddressConfig;
import com.zdjizhi.utils.StringUtil;
public class subTest {
public static void main(String[] args) {
boolean validation = dataValidation("1.168.40.123", "abcd");
System.out.println(validation);
if ("10,100,192".contains("1")){
System.out.println("yes");
}else{
System.out.println("no");
}
String s = "192.168.40.125-abcd";
System.out.println(s.split("-")[0]);
}
private static boolean dataValidation(String ip, String account) {
if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) {
String s = ip.split("\\.")[0];
return !AddressConfig.CHECK_IP_SCOPE.contains(s);
}
return false;
}
}