diff --git a/pom.xml b/pom.xml
index 35ee297..11862a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2,20 +2,35 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- log-address-hbase
- log-address-hbase
+ log-subscriber-hbase
+ log-subscriber-hbase
0.0.1-SNAPSHOT
jar
- log-address-hbase
+ log-subscriber-hbase
http://maven.apache.org
nexus
Team Nexus Repository
- http://192.168.10.125:8099/content/groups/public
+ http://192.168.40.125:8099/content/groups/public
+
+
+
+ maven-ali
+ http://maven.aliyun.com/nexus/content/groups/public/
+
+ true
+
+
+ true
+ always
+ fail
+
+
+
@@ -35,7 +50,7 @@
- cn.ac.iie.topology.LogAddressRedisTopology
+ cn.ac.iie.topology.LogSubscriberTopology
@@ -85,7 +100,7 @@
UTF-8
1.0.0
1.0.2
- 1.4.9
+ 2.2.3
2.7.1
@@ -108,6 +123,16 @@
storm-core
${storm.version}
provided
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+ log4j-over-slf4j
+ org.slf4j
+
+
@@ -126,34 +151,11 @@
-
- com.nis
- nis-core
- 1.0
-
-
-
- info.monitorenter
- cpdetector
- 1.0.7
-
-
-
- com.google.guava
- guava
- 18.0
-
-
-
- redis.clients
- jedis
- 2.8.1
-
junit
junit
- 3.8.1
+ 4.12
test
@@ -161,7 +163,7 @@
com.alibaba
fastjson
- 1.2.47
+ 1.2.59
diff --git a/properties/subscriber-config.properties b/properties/subscriber-config.properties
new file mode 100644
index 0000000..153a905
--- /dev/null
+++ b/properties/subscriber-config.properties
@@ -0,0 +1,32 @@
+#管理kafka地址
+bootstrap.servers=192.168.40.186:9092
+#从kafka哪里开始读:earliest/latest
+auto.offset.reset=latest
+
+#hbase zookeeper地址
+hbase.zookeeper.servers=192.168.40.186:2181
+
+#hbase table name
+hbase.table.name=subscriber_info
+
+#tick时钟频率
+topology.tick.tuple.freq.secs=30
+
+topology.config.max.spout.pending=500000
+
+topology.num.acks=0
+
+#kafka broker下的topic名称
+kafka.topic=RADIUS-RECORD-LOG
+
+#kafka消费group id
+group.id=account-to-hbase-a
+
+#storm topology workers
+topology.workers=1
+
+#storm spout parallelism
+spout.parallelism=1
+
+#storm bolt parallelism
+format.bolt.parallelism=1
diff --git a/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java b/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java
new file mode 100644
index 0000000..af08f87
--- /dev/null
+++ b/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java
@@ -0,0 +1,169 @@
+package cn.ac.iie.bolt;
+
+import cn.ac.iie.common.SubscriberConfig;
+import cn.ac.iie.utils.TupleUtils;
+import com.alibaba.fastjson.JSONObject;
+import com.zdjizhi.utils.StringUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Bolt that extracts the framed-IP -&gt; account binding from RADIUS
+ * accounting-start records and batch-writes changed bindings to HBase.
+ *
+ * <p>The whole HBase table is mirrored in an in-memory map so that only rows
+ * whose account actually changed are written back. Batches are flushed either
+ * when they reach {@link SubscriberConfig#LIST_SIZE_MAX} entries or on every
+ * Storm tick tuple.
+ *
+ * <p>NOTE(review): {@code subIdMap} and {@code connection} are static and thus
+ * shared by every task of this bolt inside one worker JVM, while access is
+ * unsynchronized — presumably the topology runs a single executor per worker;
+ * confirm before raising {@code format.bolt.parallelism}.
+ *
+ * @author qidaijie
+ */
+public class SubscriberIdBolt extends BaseBasicBolt {
+    private static Logger logger = Logger.getLogger(SubscriberIdBolt.class);
+    /** In-memory mirror of the HBase table: framed_ip -> account. */
+    private static Map<String, String> subIdMap;
+    /** Pending Puts, flushed on tick or when the batch limit is reached. */
+    private List<Put> putList;
+    private static Connection connection;
+
+    static {
+        // HBase client configuration; retry counts are capped so an
+        // unreachable cluster fails fast instead of stalling the topology.
+        Configuration configuration = HBaseConfiguration.create();
+        configuration.set("hbase.zookeeper.quorum", SubscriberConfig.HBASE_ZOOKEEPER_SERVERS);
+        configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        configuration.set("hbase.client.retries.number", "3");
+        configuration.set("hbase.bulkload.retries.number", "3");
+        configuration.set("zookeeper.recovery.retry", "3");
+        try {
+            connection = ConnectionFactory.createConnection(configuration);
+        } catch (IOException e) {
+            // Log the stack trace through the logger instead of
+            // printStackTrace so it reaches the worker log files.
+            logger.error("Subscriber写入HBase程序连接HBase异常", e);
+        }
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context) {
+        // Pre-sized to hold several million mappings without rehashing.
+        subIdMap = new HashMap<>(3333334);
+        putList = new ArrayList<>();
+        getAll();
+    }
+
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+        try {
+            if (TupleUtils.isTick(tuple)) {
+                // Tick tuple: flush whatever is pending, even a partial batch.
+                insertData(putList);
+            } else {
+                String message = tuple.getString(0);
+                if (StringUtil.isNotBlank(message)) {
+                    JSONObject jsonObject = JSONObject.parseObject(message);
+                    if (jsonObject.containsKey(SubscriberConfig.PACKET_TYPE) && jsonObject.containsKey(SubscriberConfig.STATUS_TYPE)) {
+                        // Only Accounting-Request (4) with status Start (1)
+                        // carries a fresh ip/account binding.
+                        if (SubscriberConfig.ACCOUNTING_REQUEST == jsonObject.getInteger(SubscriberConfig.PACKET_TYPE)
+                                && SubscriberConfig.START_BILLING == jsonObject.getInteger(SubscriberConfig.STATUS_TYPE)) {
+                            String framedIp = jsonObject.getString("radius_framed_ip");
+                            String account = jsonObject.getString("radius_account");
+                            dataValidation(framedIp, account, putList);
+                        }
+                        // >= instead of ==: safe even if the list ever grows
+                        // past the threshold between checks.
+                        if (putList.size() >= SubscriberConfig.LIST_SIZE_MAX) {
+                            insertData(putList);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Radius 日志解析/更新HBase 失败!", e);
+        }
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        // Ask Storm for a tick tuple every TOPOLOGY_TICK_TUPLE_FREQ_SECS so
+        // the pending batch is flushed even under low traffic.
+        Map<String, Object> conf = new HashMap<>(16);
+        conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
+                SubscriberConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+        return conf;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // Terminal bolt: nothing is emitted downstream.
+    }
+
+    /**
+     * Loads every row of the subscriber table into {@link #subIdMap} so that
+     * later records can be diffed against it in memory.
+     * Fix: the original leaked the {@code Table}; both the table and the
+     * scanner are now closed via try-with-resources.
+     */
+    private static void getAll() {
+        try (Table table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME));
+             ResultScanner scanner = table.getScanner(new Scan())) {
+            for (Result result : scanner) {
+                for (Cell cell : result.rawCells()) {
+                    subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
+                }
+            }
+            logger.warn("初始化内存数据成功--初始化map大小->(" + subIdMap.size() + ")");
+        } catch (IOException e) {
+            logger.error("初始化内存数据异常", e);
+        }
+    }
+
+    /**
+     * Writes the pending Puts to HBase and clears the batch.
+     *
+     * <p>Bug fix: the success log used to read {@code putList.size()} AFTER
+     * {@code clear()}, so it always reported 0 updated rows; the size is now
+     * captured before the batch is cleared. The table is closed via
+     * try-with-resources.
+     *
+     * @param putList pending puts; emptied on success
+     */
+    private static void insertData(List<Put> putList) {
+        if (putList.isEmpty()) {
+            // Nothing pending (e.g. an idle tick) — skip the HBase round trip.
+            return;
+        }
+        int batchSize = putList.size();
+        try (Table table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME))) {
+            table.put(putList);
+            putList.clear();
+            logger.warn("更新HBase成功,更新条数:" + batchSize);
+        } catch (IOException e) {
+            logger.error("更新数据写入HBase失败", e);
+        }
+    }
+
+    /**
+     * Queues an update when the ip/account pair differs from the in-memory
+     * snapshot. A single comparison covers both "new ip" and "account changed"
+     * (a missing key yields {@code null}, which never equals the account).
+     *
+     * @param ip      radius framed ip, used as the HBase row key
+     * @param account radius account name, stored as the cell value
+     * @param putList batch the update is appended to
+     */
+    private static void dataValidation(String ip, String account, List<Put> putList) {
+        // Blank guard (matches the sibling test bolt): malformed records must
+        // not become empty row keys or NPE on getBytes().
+        if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)
+                && !account.equals(subIdMap.get(ip))) {
+            Put put = new Put(ip.getBytes());
+            put.addColumn("subscriber_id".getBytes(), "account".getBytes(), account.getBytes());
+            putList.add(put);
+            subIdMap.put(ip, account);
+        }
+    }
+}
diff --git a/src/main/java/cn/ac/iie/common/SubscriberConfig.java b/src/main/java/cn/ac/iie/common/SubscriberConfig.java
new file mode 100644
index 0000000..45040ee
--- /dev/null
+++ b/src/main/java/cn/ac/iie/common/SubscriberConfig.java
@@ -0,0 +1,52 @@
+package cn.ac.iie.common;
+
+import cn.ac.iie.utils.SubscriberConfigurations;
+
+import java.io.Serializable;
+
+/**
+ * Central configuration constants for the subscriber topology. Runtime values
+ * are read once (at class load) from subscriber-config.properties via
+ * {@link SubscriberConfigurations}.
+ *
+ * @author qidaijie
+ */
+public class SubscriberConfig implements Serializable {
+    private static final long serialVersionUID = -8326385159484059324L;
+
+    public static final String SEGMENTATION = ",";
+    /** Maximum number of queued Puts before a batch is flushed to HBase. */
+    public static final int LIST_SIZE_MAX = 5000;
+    /**
+     * RADIUS packet type 4 - Accounting-Request.
+     */
+    public static final int ACCOUNTING_REQUEST = 4;
+    /**
+     * JSON key holding the RADIUS packet type: radius_packet_type.
+     */
+    public static final String PACKET_TYPE = "radius_packet_type";
+    /**
+     * Accounting status 1 - Start (billing begins).
+     */
+    public static final int START_BILLING = 1;
+    /**
+     * JSON key holding the accounting status: radius_acct_status_type.
+     */
+    public static final String STATUS_TYPE = "radius_acct_status_type";
+
+
+    /**
+     * Kafka and topology settings (type 0 = common properties file).
+     */
+    public static final String BOOTSTRAP_SERVERS = SubscriberConfigurations.getStringProperty(0, "bootstrap.servers");
+    public static final Integer SPOUT_PARALLELISM = SubscriberConfigurations.getIntProperty(0, "spout.parallelism");
+    public static final Integer FORMAT_BOLT_PARALLELISM = SubscriberConfigurations.getIntProperty(0, "format.bolt.parallelism");
+    public static final String GROUP_ID = SubscriberConfigurations.getStringProperty(0, "group.id");
+    public static final String KAFKA_TOPIC = SubscriberConfigurations.getStringProperty(0, "kafka.topic");
+    public static final String AUTO_OFFSET_RESET = SubscriberConfigurations.getStringProperty(0, "auto.offset.reset");
+    public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = SubscriberConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs");
+    public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = SubscriberConfigurations.getIntProperty(0, "topology.config.max.spout.pending");
+    public static final Integer TOPOLOGY_NUM_ACKS = SubscriberConfigurations.getIntProperty(0, "topology.num.acks");
+
+    public static final Integer TOPOLOGY_WORKERS = SubscriberConfigurations.getIntProperty(0, "topology.workers");
+
+    // NOTE(review): "check.ip.scope" is not present in the shipped
+    // subscriber-config.properties, so this is null unless added — only the
+    // commented-out code in the test bolts references it.
+    public static final String CHECK_IP_SCOPE = SubscriberConfigurations.getStringProperty(0, "check.ip.scope");
+
+    public static final String HBASE_ZOOKEEPER_SERVERS = SubscriberConfigurations.getStringProperty(0, "hbase.zookeeper.servers");
+    public static final String HBASE_TABLE_NAME = SubscriberConfigurations.getStringProperty(0, "hbase.table.name");
+}
\ No newline at end of file
diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
index 96c8330..6c9dadc 100644
--- a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
+++ b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java
@@ -1,6 +1,6 @@
package cn.ac.iie.spout;
-import cn.ac.iie.common.AddressConfig;
+import cn.ac.iie.common.SubscriberConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -29,9 +29,9 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
private static Properties createConsumerConfig() {
Properties props = new Properties();
- props.put("bootstrap.servers", AddressConfig.BOOTSTRAP_SERVERS);
- props.put("group.id", AddressConfig.GROUP_ID);
- props.put("auto.offset.reset", AddressConfig.AUTO_OFFSET_RESET);
+ props.put("bootstrap.servers", SubscriberConfig.BOOTSTRAP_SERVERS);
+ props.put("group.id", SubscriberConfig.GROUP_ID);
+ props.put("auto.offset.reset", SubscriberConfig.AUTO_OFFSET_RESET);
props.put("session.timeout.ms", "60000");
props.put("max.poll.records", 3000);
props.put("max.partition.fetch.bytes", 31457280);
@@ -47,7 +47,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout {
this.context = context;
Properties prop = createConsumerConfig();
this.consumer = new KafkaConsumer<>(prop);
- this.consumer.subscribe(Arrays.asList(AddressConfig.KAFKA_TOPIC));
+ this.consumer.subscribe(Arrays.asList(SubscriberConfig.KAFKA_TOPIC));
}
@Override
diff --git a/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java b/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java
new file mode 100644
index 0000000..5a0c903
--- /dev/null
+++ b/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java
@@ -0,0 +1,81 @@
+package cn.ac.iie.topology;
+
+import cn.ac.iie.bolt.SubscriberIdBolt;
+import cn.ac.iie.common.SubscriberConfig;
+import cn.ac.iie.spout.CustomizedKafkaSpout;
+import org.apache.storm.Config;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point that wires the Kafka spout to the subscriber-id bolt and
+ * submits the topology either locally or to a remote cluster.
+ *
+ * <p>Usage: {@code LogSubscriberTopology [name [remote]]} — only when a name
+ * is given AND the second argument is "remote" does it deploy to a cluster;
+ * otherwise it runs a 600-second local test topology.
+ *
+ * @author qidaijie
+ */
+public class LogSubscriberTopology {
+    private static final Logger logger = LoggerFactory.getLogger(LogSubscriberTopology.class);
+    private final String topologyName;
+    private final Config topologyConfig;
+    private TopologyBuilder builder;
+
+    public LogSubscriberTopology() {
+        this(LogSubscriberTopology.class.getSimpleName());
+    }
+
+    public LogSubscriberTopology(String topologyName) {
+        this.topologyName = topologyName;
+        topologyConfig = createTopologyConfig();
+    }
+
+    /**
+     * Builds the Storm configuration shared by local and remote runs.
+     * (Fix: method name typo "createTopologConfig" corrected; the method is
+     * private, so no caller outside this class is affected.)
+     */
+    private Config createTopologyConfig() {
+        Config conf = new Config();
+        conf.setDebug(false);
+        conf.setMessageTimeoutSecs(120);
+        conf.setTopologyWorkerMaxHeapSize(500);
+        conf.setMaxSpoutPending(SubscriberConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING);
+        // With 0 ackers tuples are fire-and-forget (at-most-once delivery).
+        if (SubscriberConfig.TOPOLOGY_NUM_ACKS == 0) {
+            conf.setNumAckers(0);
+        }
+        return conf;
+    }
+
+    /** Runs the topology in-process for 600 seconds (development mode). */
+    public void runLocally() throws InterruptedException {
+        topologyConfig.setMaxTaskParallelism(1);
+        StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600);
+    }
+
+    /** Submits the topology to the configured remote cluster. */
+    public void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+        topologyConfig.setNumWorkers(SubscriberConfig.TOPOLOGY_WORKERS);
+        // Setting this too high starves heartbeat threads and hurts throughput.
+        topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8);
+        StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig);
+    }
+
+    /** Wires spout -> bolt with localOrShuffle to avoid cross-worker hops. */
+    private void buildTopology() {
+        builder = new TopologyBuilder();
+        builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), SubscriberConfig.SPOUT_PARALLELISM);
+        builder.setBolt("SubscriberIdBolt", new SubscriberIdBolt(), SubscriberConfig.FORMAT_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout");
+    }
+
+    public static void main(String[] args) throws Exception {
+        LogSubscriberTopology csst;
+        boolean runLocally = true;
+        if (args.length >= 2 && "remote".equalsIgnoreCase(args[1])) {
+            runLocally = false;
+            csst = new LogSubscriberTopology(args[0]);
+        } else {
+            csst = new LogSubscriberTopology();
+        }
+        csst.buildTopology();
+
+        if (runLocally) {
+            logger.info("执行本地模式...");
+            csst.runLocally();
+        } else {
+            logger.info("执行远程部署模式...");
+            csst.runRemotely();
+        }
+    }
+}
diff --git a/src/main/java/cn/ac/iie/topology/StormRunner.java b/src/main/java/cn/ac/iie/topology/StormRunner.java
index d2d4ab9..4d5be53 100644
--- a/src/main/java/cn/ac/iie/topology/StormRunner.java
+++ b/src/main/java/cn/ac/iie/topology/StormRunner.java
@@ -9,6 +9,9 @@ import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
+/**
+ * @author qidaijie
+ */
public final class StormRunner{
private static final int MILLS_IN_SEC = 1000;
diff --git a/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java b/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java
new file mode 100644
index 0000000..95dd847
--- /dev/null
+++ b/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java
@@ -0,0 +1,62 @@
+package cn.ac.iie.utils;
+
+import java.util.Properties;
+
+
+
+public final class SubscriberConfigurations {
+
+ private static Properties propCommon = new Properties();
+// private static Properties propService = new Properties();
+
+
+ public static String getStringProperty(Integer type, String key) {
+ if (type == 0) {
+ return propCommon.getProperty(key);
+// } else if (type == 1) {
+// return propService.getProperty(key);
+ } else {
+ return null;
+ }
+ }
+
+
+ public static Integer getIntProperty(Integer type, String key) {
+ if (type == 0) {
+ return Integer.parseInt(propCommon.getProperty(key));
+// } else if (type == 1) {
+// return Integer.parseInt(propService.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Long getLongProperty(Integer type, String key) {
+ if (type == 0) {
+ return Long.parseLong(propCommon.getProperty(key));
+// } else if (type == 1) {
+// return Long.parseLong(propService.getProperty(key));
+ } else {
+ return null;
+ }
+ }
+
+ public static Boolean getBooleanProperty(Integer type, String key) {
+ if (type == 0) {
+ return "true".equals(propCommon.getProperty(key).toLowerCase().trim());
+// } else if (type == 1) {
+// return propService.getProperty(key).toLowerCase().trim().equals("true");
+ } else {
+ return null;
+ }
+ }
+
+ static {
+ try {
+ propCommon.load(SubscriberConfigurations.class.getClassLoader().getResourceAsStream("subscriber-config.properties"));
+ } catch (Exception e) {
+ propCommon = null;
+ System.err.println("配置加载失败");
+ }
+ }
+}
diff --git a/src/test/java/cn/ac/iie/test/LoggerTest.java b/src/test/java/cn/ac/iie/test/LoggerTest.java
new file mode 100644
index 0000000..71e6d2a
--- /dev/null
+++ b/src/test/java/cn/ac/iie/test/LoggerTest.java
@@ -0,0 +1,21 @@
+package cn.ac.iie.test;
+
+import cn.ac.iie.common.SubscriberConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ad-hoc check that parameterized SLF4J logging fills both placeholders and
+ * still attaches the trailing Throwable's stack trace.
+ */
+public class LoggerTest {
+    private static final Logger logger = LoggerFactory.getLogger(LoggerTest.class);
+
+    public static void main(String[] args) {
+        int b = 20;
+        try {
+            throw new Exception();
+        } catch (Exception e) {
+            // SLF4J treats the final Throwable argument specially: it is not
+            // consumed by a "{}" placeholder, its stack trace is printed.
+            logger.error("id is {} aaaaaaaaaa {}.", SubscriberConfig.KAFKA_TOPIC, b, e);
+        }
+    }
+}
diff --git a/src/test/java/cn/ac/iie/test/SubcribeIdBolt.java b/src/test/java/cn/ac/iie/test/SubcribeIdBolt.java
index add969a..d4051a3 100644
--- a/src/test/java/cn/ac/iie/test/SubcribeIdBolt.java
+++ b/src/test/java/cn/ac/iie/test/SubcribeIdBolt.java
@@ -1,6 +1,6 @@
package cn.ac.iie.test;
-import cn.ac.iie.common.AddressConfig;
+import cn.ac.iie.common.SubscriberConfig;
import cn.ac.iie.utils.TupleUtils;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
@@ -37,7 +37,7 @@ public class SubcribeIdBolt extends BaseBasicBolt {
// 管理Hbase的配置信息
Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
- configuration.set("hbase.zookeeper.quorum", AddressConfig.HBASE_ZOOKEEPER_SERVERS);
+ configuration.set("hbase.zookeeper.quorum", SubscriberConfig.HBASE_ZOOKEEPER_SERVERS);
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
@@ -64,7 +64,7 @@ public class SubcribeIdBolt extends BaseBasicBolt {
String ip = jsonObject.getString("framed_ip");
String account = jsonObject.getString("account");
dataValidation(ip, account, putList);
- if (putList.size() == AddressConfig.LIST_SIZE_MAX) {
+ if (putList.size() == SubscriberConfig.LIST_SIZE_MAX) {
insertData(putList);
}
}
@@ -79,7 +79,7 @@ public class SubcribeIdBolt extends BaseBasicBolt {
public Map getComponentConfiguration() {
Map conf = new HashMap(16);
conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
- AddressConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+ SubscriberConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
return conf;
}
@@ -93,7 +93,7 @@ public class SubcribeIdBolt extends BaseBasicBolt {
*/
private static void getAll() {
try {
- Table table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME));
+ Table table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME));
Scan scan2 = new Scan();
ResultScanner scanner = table.getScanner(scan2);
for (Result result : scanner) {
@@ -114,7 +114,7 @@ public class SubcribeIdBolt extends BaseBasicBolt {
private static void insertData(List putList) {
Table table = null;
try {
- table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME));
+ table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME));
table.put(putList);
logger.error("写入hbase数目:" + putList.size());
putList.clear();
@@ -141,7 +141,7 @@ public class SubcribeIdBolt extends BaseBasicBolt {
private static void dataValidation(String ip, String account, List putList) {
if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) {
// String s = ip.split("\\.")[0];
-// if (!AddressConfig.CHECK_IP_SCOPE.contains(s)) {
+// if (!SubscriberConfig.CHECK_IP_SCOPE.contains(s)) {
if (subIdMap.containsKey(ip)) {
if (!subIdMap.get(ip).equals(account)) {
Put put = new Put(ip.getBytes());
diff --git a/src/test/java/cn/ac/iie/test/SubcribeIdBoltone.java b/src/test/java/cn/ac/iie/test/SubcribeIdBoltone.java
index 00b54a8..558a141 100644
--- a/src/test/java/cn/ac/iie/test/SubcribeIdBoltone.java
+++ b/src/test/java/cn/ac/iie/test/SubcribeIdBoltone.java
@@ -1,6 +1,6 @@
package cn.ac.iie.test;
-import cn.ac.iie.common.AddressConfig;
+import cn.ac.iie.common.SubscriberConfig;
import com.alibaba.fastjson.JSONObject;
import com.zdjizhi.utils.StringUtil;
import org.apache.hadoop.conf.Configuration;
@@ -35,7 +35,7 @@ public class SubcribeIdBoltone extends BaseBasicBolt {
// 管理Hbase的配置信息
Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
- configuration.set("hbase.zookeeper.quorum", AddressConfig.HBASE_ZOOKEEPER_SERVERS);
+ configuration.set("hbase.zookeeper.quorum", SubscriberConfig.HBASE_ZOOKEEPER_SERVERS);
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
@@ -76,7 +76,7 @@ public class SubcribeIdBoltone extends BaseBasicBolt {
*/
private static void getAll() {
try {
- Table table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME));
+ Table table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME));
Scan scan2 = new Scan();
ResultScanner scanner = table.getScanner(scan2);
for (Result result : scanner) {
@@ -101,7 +101,7 @@ public class SubcribeIdBoltone extends BaseBasicBolt {
private static void dataValidation(String ip, String account, BasicOutputCollector collector) {
if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) {
// String s = ip.split("\\.")[0];
-// if (!AddressConfig.CHECK_IP_SCOPE.contains(s)) {
+// if (!SubscriberConfig.CHECK_IP_SCOPE.contains(s)) {
if (subIdMap.containsKey(ip)) {
if (!subIdMap.get(ip).equals(account)) {
subIdMap.put(ip, account);
diff --git a/src/test/java/cn/ac/iie/test/subTest.java b/src/test/java/cn/ac/iie/test/subTest.java
index 468517a..b0a9814 100644
--- a/src/test/java/cn/ac/iie/test/subTest.java
+++ b/src/test/java/cn/ac/iie/test/subTest.java
@@ -1,6 +1,6 @@
package cn.ac.iie.test;
-import cn.ac.iie.common.AddressConfig;
+import cn.ac.iie.common.SubscriberConfig;
import com.zdjizhi.utils.StringUtil;
public class subTest {
@@ -24,7 +24,7 @@ public class subTest {
private static boolean dataValidation(String ip, String account) {
if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) {
String s = ip.split("\\.")[0];
- return !AddressConfig.CHECK_IP_SCOPE.contains(s);
+ return !SubscriberConfig.CHECK_IP_SCOPE.contains(s);
}
return false;
}