From 896e99e3788fd1dc83c6d96eb645b0b519087b8b Mon Sep 17 00:00:00 2001 From: qidaijie Date: Mon, 1 Feb 2021 11:39:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E7=BA=BF=E4=B8=8A=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 64 +++---- properties/subscriber-config.properties | 32 ++++ .../java/cn/ac/iie/bolt/SubscriberIdBolt.java | 169 ++++++++++++++++++ .../cn/ac/iie/common/SubscriberConfig.java | 52 ++++++ .../cn/ac/iie/spout/CustomizedKafkaSpout.java | 10 +- .../iie/topology/LogSubscriberTopology.java | 81 +++++++++ .../java/cn/ac/iie/topology/StormRunner.java | 3 + .../iie/utils/SubscriberConfigurations.java | 62 +++++++ src/test/java/cn/ac/iie/test/LoggerTest.java | 21 +++ .../java/cn/ac/iie/test/SubcribeIdBolt.java | 14 +- .../cn/ac/iie/test/SubcribeIdBoltone.java | 8 +- src/test/java/cn/ac/iie/test/subTest.java | 4 +- 12 files changed, 471 insertions(+), 49 deletions(-) create mode 100644 properties/subscriber-config.properties create mode 100644 src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java create mode 100644 src/main/java/cn/ac/iie/common/SubscriberConfig.java create mode 100644 src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java create mode 100644 src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java create mode 100644 src/test/java/cn/ac/iie/test/LoggerTest.java diff --git a/pom.xml b/pom.xml index 35ee297..11862a4 100644 --- a/pom.xml +++ b/pom.xml @@ -2,20 +2,35 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - log-address-hbase - log-address-hbase + log-subscriber-hbase + log-subscriber-hbase 0.0.1-SNAPSHOT jar - log-address-hbase + log-subscriber-hbase http://maven.apache.org nexus Team Nexus Repository - http://192.168.10.125:8099/content/groups/public + http://192.168.40.125:8099/content/groups/public + + + + maven-ali + http://maven.aliyun.com/nexus/content/groups/public/ + + true + + + true + always + fail + + + @@ -35,7 +50,7 @@ - cn.ac.iie.topology.LogAddressRedisTopology + cn.ac.iie.topology.LogSubscriberTopology @@ -85,7 +100,7 @@ UTF-8 1.0.0 1.0.2 - 1.4.9 + 2.2.3 2.7.1 @@ -108,6 +123,16 @@ storm-core ${storm.version} provided + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + @@ -126,34 +151,11 @@ - - com.nis - nis-core - 1.0 - - - - info.monitorenter - cpdetector - 1.0.7 - - - - com.google.guava - guava - 18.0 - - - - redis.clients - jedis - 2.8.1 - junit junit - 3.8.1 + 4.12 test @@ -161,7 +163,7 @@ com.alibaba fastjson - 1.2.47 + 1.2.59 diff --git a/properties/subscriber-config.properties b/properties/subscriber-config.properties new file mode 100644 index 0000000..153a905 --- /dev/null +++ b/properties/subscriber-config.properties @@ -0,0 +1,32 @@ +#管理kafka地址 +bootstrap.servers=192.168.40.186:9092 +#从kafka哪里开始读:earliest/latest +auto.offset.reset=latest + +#hbase zookeeper地址 +hbase.zookeeper.servers=192.168.40.186:2181 + +#hbase table name +hbase.table.name=subscriber_info + +#tick时钟频率 +topology.tick.tuple.freq.secs=30 + +topology.config.max.spout.pending=500000 + +topology.num.acks=0 + +#kafka broker下的topic名称 +kafka.topic=RADIUS-RECORD-LOG + +#kafka消费group id +group.id=account-to-hbase-a + +#storm topology workers +topology.workers=1 + +#storm spout parallelism +spout.parallelism=1 + +#storm bolt parallelism +format.bolt.parallelism=1 diff --git a/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java b/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java new file mode 100644 index 0000000..af08f87 --- /dev/null +++ b/src/main/java/cn/ac/iie/bolt/SubscriberIdBolt.java @@ -0,0 +1,169 @@ +package cn.ac.iie.bolt; + +import cn.ac.iie.common.SubscriberConfig; +import cn.ac.iie.utils.TupleUtils; +import com.alibaba.fastjson.JSONObject; +import com.zdjizhi.utils.StringUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + +import java.io.IOException; +import java.util.*; + +/** + * @author qidaijie + */ +public class SubscriberIdBolt extends BaseBasicBolt { + private static Logger logger = Logger.getLogger(SubscriberIdBolt.class); + private static Map subIdMap; + private List putList; + private static Connection connection; + + static { + // 管理Hbase的配置信息 + Configuration configuration = HBaseConfiguration.create(); + // 设置zookeeper节点 + configuration.set("hbase.zookeeper.quorum", SubscriberConfig.HBASE_ZOOKEEPER_SERVERS); + configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); + configuration.set("hbase.client.retries.number", "3"); + configuration.set("hbase.bulkload.retries.number", "3"); + configuration.set("zookeeper.recovery.retry", "3"); + try { + connection = ConnectionFactory.createConnection(configuration); + } catch (IOException e) { + logger.error("Subscriber写入HBase程序连接HBase异常"); + e.printStackTrace(); + } + } + + @Override + public void prepare(Map stormConf, TopologyContext context) { + subIdMap = new HashMap<>(3333334); + putList = new ArrayList<>(); + getAll(); + } + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + try { + if (TupleUtils.isTick(tuple)) { + insertData(putList); + } else { + String message = tuple.getString(0); + if (StringUtil.isNotBlank(message)) { + JSONObject jsonObject = JSONObject.parseObject(message); + if (jsonObject.containsKey(SubscriberConfig.PACKET_TYPE) && jsonObject.containsKey(SubscriberConfig.STATUS_TYPE)) { + if (SubscriberConfig.ACCOUNTING_REQUEST == jsonObject.getInteger(SubscriberConfig.PACKET_TYPE) + && SubscriberConfig.START_BILLING == jsonObject.getInteger(SubscriberConfig.STATUS_TYPE)) { + String framedIp = jsonObject.getString("radius_framed_ip"); + String account = jsonObject.getString("radius_account"); + dataValidation(framedIp, account, putList); + } + if (putList.size() == SubscriberConfig.LIST_SIZE_MAX) { + insertData(putList); + } + } + } + } + } catch (Exception e) { + logger.error("Radius 日志解析/更新HBase 失败!"); + e.printStackTrace(); + } + } + + @Override + public Map getComponentConfiguration() { + Map conf = new HashMap(16); + conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, + SubscriberConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + return conf; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + /** + * 获取所有的 key value + */ + private static void getAll() { + try { + Table table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME)); + Scan scan = new Scan(); + ResultScanner scanner = table.getScanner(scan); + for (Result result : scanner) { + Cell[] cells = result.rawCells(); + for (Cell cell : cells) { + subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + } + } + logger.warn("初始化内存数据成功--初始化map大小->(" + subIdMap.size() + ")"); + scanner.close(); + } catch (IOException e) { + logger.error("初始化内存数据异常"); + e.printStackTrace(); + } + } + + /** + * 写入数据到HBase + * + * @param putList puts list + */ + private static void insertData(List putList) { + Table table = null; + try { + table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME)); + table.put(putList); + putList.clear(); + logger.warn("更新HBase成功,更新条数:" + putList.size()); + } catch (IOException e) { + logger.error("更新数据写入HBase失败"); + e.printStackTrace(); + } finally { + try { + if (table != null) { + table.close(); + } + } catch (IOException e) { + logger.error("HBase表关闭异常"); + e.printStackTrace(); + } + } + + } + + /** + * 验证数据并与内存中的对比 + * + * @param ip framed_ip + * @param account account + */ + private static void dataValidation(String ip, String account, List putList) { + if (subIdMap.containsKey(ip)) { + if (!subIdMap.get(ip).equals(account)) { + Put put = new Put(ip.getBytes()); + put.addColumn("subscriber_id".getBytes(), "account".getBytes(), account.getBytes()); + putList.add(put); + subIdMap.put(ip, account); + } + } else { + Put put = new Put(ip.getBytes()); + put.addColumn("subscriber_id".getBytes(), "account".getBytes(), account.getBytes()); + putList.add(put); + subIdMap.put(ip, account); + } + } +} diff --git a/src/main/java/cn/ac/iie/common/SubscriberConfig.java b/src/main/java/cn/ac/iie/common/SubscriberConfig.java new file mode 100644 index 0000000..45040ee --- /dev/null +++ b/src/main/java/cn/ac/iie/common/SubscriberConfig.java @@ -0,0 +1,52 @@ +package cn.ac.iie.common; + +import cn.ac.iie.utils.SubscriberConfigurations; + +import java.io.Serializable; + +/** + * @author qidaijie + */ +public class SubscriberConfig implements Serializable { + private static final long serialVersionUID = -8326385159484059324L; + + public static final String SEGMENTATION = ","; + public static final int LIST_SIZE_MAX = 5000; + /** + * 4- Accounting-Request(账户授权) + */ + public static final int ACCOUNTING_REQUEST = 4; + /** + * radius_packet_type + */ + public static final String PACKET_TYPE = "radius_packet_type"; + /** + * 1、开始计费 + */ + public static final int START_BILLING = 1; + /** + * radius_acct_status_type + */ + public static final String STATUS_TYPE = "radius_acct_status_type"; + + + /*** + * kafka and system + */ + public static final String BOOTSTRAP_SERVERS = SubscriberConfigurations.getStringProperty(0, "bootstrap.servers"); + public static final Integer SPOUT_PARALLELISM = SubscriberConfigurations.getIntProperty(0, "spout.parallelism"); + public static final Integer FORMAT_BOLT_PARALLELISM = SubscriberConfigurations.getIntProperty(0, "format.bolt.parallelism"); + public static final String GROUP_ID = SubscriberConfigurations.getStringProperty(0, "group.id"); + public static final String KAFKA_TOPIC = SubscriberConfigurations.getStringProperty(0, "kafka.topic"); + public static final String AUTO_OFFSET_RESET = SubscriberConfigurations.getStringProperty(0, "auto.offset.reset"); + public static final Integer TOPOLOGY_TICK_TUPLE_FREQ_SECS = SubscriberConfigurations.getIntProperty(0, "topology.tick.tuple.freq.secs"); + public static final Integer TOPOLOGY_CONFIG_MAX_SPOUT_PENDING = SubscriberConfigurations.getIntProperty(0, "topology.config.max.spout.pending"); + public static final Integer TOPOLOGY_NUM_ACKS = SubscriberConfigurations.getIntProperty(0, "topology.num.acks"); + + public static final Integer TOPOLOGY_WORKERS = SubscriberConfigurations.getIntProperty(0, "topology.workers"); + + public static final String CHECK_IP_SCOPE = SubscriberConfigurations.getStringProperty(0, "check.ip.scope"); + + public static final String HBASE_ZOOKEEPER_SERVERS = SubscriberConfigurations.getStringProperty(0, "hbase.zookeeper.servers"); + public static final String HBASE_TABLE_NAME = SubscriberConfigurations.getStringProperty(0, "hbase.table.name"); +} \ No newline at end of file diff --git a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java index 96c8330..6c9dadc 100644 --- a/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java +++ b/src/main/java/cn/ac/iie/spout/CustomizedKafkaSpout.java @@ -1,6 +1,6 @@ package cn.ac.iie.spout; -import cn.ac.iie.common.AddressConfig; +import cn.ac.iie.common.SubscriberConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -29,9 +29,9 @@ public class CustomizedKafkaSpout extends BaseRichSpout { private static Properties createConsumerConfig() { Properties props = new Properties(); - props.put("bootstrap.servers", AddressConfig.BOOTSTRAP_SERVERS); - props.put("group.id", AddressConfig.GROUP_ID); - props.put("auto.offset.reset", AddressConfig.AUTO_OFFSET_RESET); + props.put("bootstrap.servers", SubscriberConfig.BOOTSTRAP_SERVERS); + props.put("group.id", SubscriberConfig.GROUP_ID); + props.put("auto.offset.reset", SubscriberConfig.AUTO_OFFSET_RESET); props.put("session.timeout.ms", "60000"); props.put("max.poll.records", 3000); props.put("max.partition.fetch.bytes", 31457280); @@ -47,7 +47,7 @@ public class CustomizedKafkaSpout extends BaseRichSpout { this.context = context; Properties prop = createConsumerConfig(); this.consumer = new KafkaConsumer<>(prop); - this.consumer.subscribe(Arrays.asList(AddressConfig.KAFKA_TOPIC)); + this.consumer.subscribe(Arrays.asList(SubscriberConfig.KAFKA_TOPIC)); } @Override diff --git a/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java b/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java new file mode 100644 index 0000000..5a0c903 --- /dev/null +++ b/src/main/java/cn/ac/iie/topology/LogSubscriberTopology.java @@ -0,0 +1,81 @@ +package cn.ac.iie.topology; + +import cn.ac.iie.bolt.SubscriberIdBolt; +import cn.ac.iie.common.SubscriberConfig; +import cn.ac.iie.spout.CustomizedKafkaSpout; +import org.apache.storm.Config; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.topology.TopologyBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author qidaijie + */ +public class LogSubscriberTopology { + private static final Logger logger = LoggerFactory.getLogger(LogSubscriberTopology.class); + private final String topologyName; + private final Config topologyConfig; + private TopologyBuilder builder; + + public LogSubscriberTopology() { + this(LogSubscriberTopology.class.getSimpleName()); + } + + public LogSubscriberTopology(String topologyName) { + this.topologyName = topologyName; + topologyConfig = createTopologConfig(); + } + + private Config createTopologConfig() { + Config conf = new Config(); + conf.setDebug(false); + conf.setMessageTimeoutSecs(120); + conf.setTopologyWorkerMaxHeapSize(500); + conf.setMaxSpoutPending(SubscriberConfig.TOPOLOGY_CONFIG_MAX_SPOUT_PENDING); + if (SubscriberConfig.TOPOLOGY_NUM_ACKS == 0) { + conf.setNumAckers(0); + } + return conf; + } + + public void runLocally() throws InterruptedException { + topologyConfig.setMaxTaskParallelism(1); + StormRunner.runTopologyLocally(builder, topologyName, topologyConfig, 600); + } + + public void runRemotely() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException { + topologyConfig.setNumWorkers(SubscriberConfig.TOPOLOGY_WORKERS); + //设置过高会导致很多问题,如心跳线程饿死、吞吐量大幅下跌 + topologyConfig.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 8); + StormRunner.runTopologyRemotely(builder, topologyName, topologyConfig); + } + + private void buildTopology() { + builder = new TopologyBuilder(); + builder.setSpout("CustomizedKafkaSpout", new CustomizedKafkaSpout(), SubscriberConfig.SPOUT_PARALLELISM); + builder.setBolt("SubscriberIdBolt", new SubscriberIdBolt(), SubscriberConfig.FORMAT_BOLT_PARALLELISM).localOrShuffleGrouping("CustomizedKafkaSpout"); + } + + public static void main(String[] args) throws Exception { + LogSubscriberTopology csst = null; + boolean runLocally = true; + if (args.length >= 2 && "remote".equalsIgnoreCase(args[1])) { + runLocally = false; + csst = new LogSubscriberTopology(args[0]); + } else { + csst = new LogSubscriberTopology(); + } + csst.buildTopology(); + + if (runLocally) { + logger.info("执行本地模式..."); + csst.runLocally(); + } else { + logger.info("执行远程部署模式..."); + csst.runRemotely(); + } + } +} diff --git a/src/main/java/cn/ac/iie/topology/StormRunner.java b/src/main/java/cn/ac/iie/topology/StormRunner.java index d2d4ab9..4d5be53 100644 --- a/src/main/java/cn/ac/iie/topology/StormRunner.java +++ b/src/main/java/cn/ac/iie/topology/StormRunner.java @@ -9,6 +9,9 @@ import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; +/** + * @author qidaijie + */ public final class StormRunner{ private static final int MILLS_IN_SEC = 1000; diff --git a/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java b/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java new file mode 100644 index 0000000..95dd847 --- /dev/null +++ b/src/main/java/cn/ac/iie/utils/SubscriberConfigurations.java @@ -0,0 +1,62 @@ +package cn.ac.iie.utils; + +import java.util.Properties; + + + +public final class SubscriberConfigurations { + + private static Properties propCommon = new Properties(); +// private static Properties propService = new Properties(); + + + public static String getStringProperty(Integer type, String key) { + if (type == 0) { + return propCommon.getProperty(key); +// } else if (type == 1) { +// return propService.getProperty(key); + } else { + return null; + } + } + + + public static Integer getIntProperty(Integer type, String key) { + if (type == 0) { + return Integer.parseInt(propCommon.getProperty(key)); +// } else if (type == 1) { +// return Integer.parseInt(propService.getProperty(key)); + } else { + return null; + } + } + + public static Long getLongProperty(Integer type, String key) { + if (type == 0) { + return Long.parseLong(propCommon.getProperty(key)); +// } else if (type == 1) { +// return Long.parseLong(propService.getProperty(key)); + } else { + return null; + } + } + + public static Boolean getBooleanProperty(Integer type, String key) { + if (type == 0) { + return "true".equals(propCommon.getProperty(key).toLowerCase().trim()); +// } else if (type == 1) { +// return propService.getProperty(key).toLowerCase().trim().equals("true"); + } else { + return null; + } + } + + static { + try { + propCommon.load(SubscriberConfigurations.class.getClassLoader().getResourceAsStream("subscriber-config.properties")); + } catch (Exception e) { + propCommon = null; + System.err.println("配置加载失败"); + } + } +} diff --git a/src/test/java/cn/ac/iie/test/LoggerTest.java b/src/test/java/cn/ac/iie/test/LoggerTest.java new file mode 100644 index 0000000..71e6d2a --- /dev/null +++ b/src/test/java/cn/ac/iie/test/LoggerTest.java @@ -0,0 +1,21 @@ +package cn.ac.iie.test; + +import cn.ac.iie.common.SubscriberConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LoggerTest { + private static final Logger logger = LoggerFactory.getLogger(LoggerTest.class); + + public static void main(String[] args) { + try{ + int a = 1; + throw new Exception(); + }catch (Exception e){ + int b = 20; + logger.error("id is {} aaaaaaaaaa {}.",SubscriberConfig.KAFKA_TOPIC,b,e); + } + + + } +} diff --git a/src/test/java/cn/ac/iie/test/SubcribeIdBolt.java b/src/test/java/cn/ac/iie/test/SubcribeIdBolt.java index add969a..d4051a3 100644 --- a/src/test/java/cn/ac/iie/test/SubcribeIdBolt.java +++ b/src/test/java/cn/ac/iie/test/SubcribeIdBolt.java @@ -1,6 +1,6 @@ package cn.ac.iie.test; -import cn.ac.iie.common.AddressConfig; +import cn.ac.iie.common.SubscriberConfig; import cn.ac.iie.utils.TupleUtils; import com.alibaba.fastjson.JSONObject; import com.zdjizhi.utils.StringUtil; @@ -37,7 +37,7 @@ public class SubcribeIdBolt extends BaseBasicBolt { // 管理Hbase的配置信息 Configuration configuration = HBaseConfiguration.create(); // 设置zookeeper节点 - configuration.set("hbase.zookeeper.quorum", AddressConfig.HBASE_ZOOKEEPER_SERVERS); + configuration.set("hbase.zookeeper.quorum", SubscriberConfig.HBASE_ZOOKEEPER_SERVERS); try { connection = ConnectionFactory.createConnection(configuration); } catch (IOException e) { @@ -64,7 +64,7 @@ public class SubcribeIdBolt extends BaseBasicBolt { String ip = jsonObject.getString("framed_ip"); String account = jsonObject.getString("account"); dataValidation(ip, account, putList); - if (putList.size() == AddressConfig.LIST_SIZE_MAX) { + if (putList.size() == SubscriberConfig.LIST_SIZE_MAX) { insertData(putList); } } @@ -79,7 +79,7 @@ public class SubcribeIdBolt extends BaseBasicBolt { public Map getComponentConfiguration() { Map conf = new HashMap(16); conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, - AddressConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS); + SubscriberConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS); return conf; } @@ -93,7 +93,7 @@ public class SubcribeIdBolt extends BaseBasicBolt { */ private static void getAll() { try { - Table table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME)); + Table table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME)); Scan scan2 = new Scan(); ResultScanner scanner = table.getScanner(scan2); for (Result result : scanner) { @@ -114,7 +114,7 @@ public class SubcribeIdBolt extends BaseBasicBolt { private static void insertData(List putList) { Table table = null; try { - table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME)); + table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME)); table.put(putList); logger.error("写入hbase数目:" + putList.size()); putList.clear(); @@ -141,7 +141,7 @@ public class SubcribeIdBolt extends BaseBasicBolt { private static void dataValidation(String ip, String account, List putList) { if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) { // String s = ip.split("\\.")[0]; -// if (!AddressConfig.CHECK_IP_SCOPE.contains(s)) { +// if (!SubscriberConfig.CHECK_IP_SCOPE.contains(s)) { if (subIdMap.containsKey(ip)) { if (!subIdMap.get(ip).equals(account)) { Put put = new Put(ip.getBytes()); diff --git a/src/test/java/cn/ac/iie/test/SubcribeIdBoltone.java b/src/test/java/cn/ac/iie/test/SubcribeIdBoltone.java index 00b54a8..558a141 100644 --- a/src/test/java/cn/ac/iie/test/SubcribeIdBoltone.java +++ b/src/test/java/cn/ac/iie/test/SubcribeIdBoltone.java @@ -1,6 +1,6 @@ package cn.ac.iie.test; -import cn.ac.iie.common.AddressConfig; +import cn.ac.iie.common.SubscriberConfig; import com.alibaba.fastjson.JSONObject; import com.zdjizhi.utils.StringUtil; import org.apache.hadoop.conf.Configuration; @@ -35,7 +35,7 @@ public class SubcribeIdBoltone extends BaseBasicBolt { // 管理Hbase的配置信息 Configuration configuration = HBaseConfiguration.create(); // 设置zookeeper节点 - configuration.set("hbase.zookeeper.quorum", AddressConfig.HBASE_ZOOKEEPER_SERVERS); + configuration.set("hbase.zookeeper.quorum", SubscriberConfig.HBASE_ZOOKEEPER_SERVERS); try { connection = ConnectionFactory.createConnection(configuration); } catch (IOException e) { @@ -76,7 +76,7 @@ public class SubcribeIdBoltone extends BaseBasicBolt { */ private static void getAll() { try { - Table table = connection.getTable(TableName.valueOf("sub:" + AddressConfig.HBASE_TABLE_NAME)); + Table table = connection.getTable(TableName.valueOf("sub:" + SubscriberConfig.HBASE_TABLE_NAME)); Scan scan2 = new Scan(); ResultScanner scanner = table.getScanner(scan2); for (Result result : scanner) { @@ -101,7 +101,7 @@ public class SubcribeIdBoltone extends BaseBasicBolt { private static void dataValidation(String ip, String account, BasicOutputCollector collector) { if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) { // String s = ip.split("\\.")[0]; -// if (!AddressConfig.CHECK_IP_SCOPE.contains(s)) { +// if (!SubscriberConfig.CHECK_IP_SCOPE.contains(s)) { if (subIdMap.containsKey(ip)) { if (!subIdMap.get(ip).equals(account)) { subIdMap.put(ip, account); diff --git a/src/test/java/cn/ac/iie/test/subTest.java b/src/test/java/cn/ac/iie/test/subTest.java index 468517a..b0a9814 100644 --- a/src/test/java/cn/ac/iie/test/subTest.java +++ b/src/test/java/cn/ac/iie/test/subTest.java @@ -1,6 +1,6 @@ package cn.ac.iie.test; -import cn.ac.iie.common.AddressConfig; +import cn.ac.iie.common.SubscriberConfig; import com.zdjizhi.utils.StringUtil; public class subTest { @@ -24,7 +24,7 @@ public class subTest { private static boolean dataValidation(String ip, String account) { if (StringUtil.isNotBlank(ip) && StringUtil.isNotBlank(account)) { String s = ip.split("\\.")[0]; - return !AddressConfig.CHECK_IP_SCOPE.contains(s); + return !SubscriberConfig.CHECK_IP_SCOPE.contains(s); } return false; }