From f12e8079c2efaf69d4277185f9d9995579101587 Mon Sep 17 00:00:00 2001 From: qidaijie Date: Mon, 6 Dec 2021 15:42:44 +0300 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96Subscriber=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E7=BC=93=E5=AD=98=E8=8E=B7=E5=8F=96=E7=AD=96=E7=95=A5=EF=BC=9A?= =?UTF-8?q?=201=EF=BC=9A=E8=A7=84=E8=8C=83=E5=85=B3=E7=B3=BB=E5=91=BD?= =?UTF-8?q?=E5=90=8D=E5=8F=8A=E8=8E=B7=E5=8F=96=E6=96=B9=E5=BC=8F=E3=80=82?= =?UTF-8?q?=202=EF=BC=9A=E6=A0=B9=E6=8D=AEacct=5Fstatus=5Ftype=E7=8A=B6?= =?UTF-8?q?=E6=80=81=E6=B7=BB=E5=8A=A0=E6=88=96=E5=89=94=E9=99=A4=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E5=86=85=E7=9A=84=E5=85=B3=E7=B3=BB=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- properties/default_config.properties | 2 +- properties/service_flow_config.properties | 4 +- .../com/zdjizhi/utils/hbase/HBaseUtils.java | 49 +++++++++++-------- 4 files changed, 33 insertions(+), 24 deletions(-) diff --git a/pom.xml b/pom.xml index f2eb9c1..7d282d6 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi log-completion-schema - 211116-jackson + 211206-radius log-completion-schema http://www.example.com diff --git a/properties/default_config.properties b/properties/default_config.properties index 99d8c79..71f83b6 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -42,7 +42,7 @@ kafka.pin=galaxy2019 #====================Topology Default====================# #hbase table name -hbase.table.name=subscriber_info +hbase.table.name=tsg_galaxy:relation_framedip_account #邮件默认编码 mail.default.charset=UTF-8 diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 23e52db..8acb476 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,7 +1,7 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -source.kafka.servers=10.224.11.14:9094 +source.kafka.servers=10.224.11.11 #管理输出kafka地址 sink.kafka.servers=10.224.11.14:9095,10.224.11.15:9095,10.224.11.16:9095,10.224.11.17:9095,10.224.11.18:9095,10.224.11.19:9095,10.224.11.20:9095,10.224.11.21:9095,10.224.11.22:9095,10.224.11.23:9095 @@ -10,7 +10,7 @@ sink.kafka.servers=10.224.11.14:9095,10.224.11.15:9095,10.224.11.16:9095,10.224. zookeeper.servers=10.224.11.11:2181,10.224.11.12:2181,10.224.11.13:2181 #hbase zookeeper地址 用于连接HBase -hbase.zookeeper.servers=10.231.12.4:2181 +hbase.zookeeper.servers=10.224.11.11:2181,10.224.11.12:2181,10.224.11.13:2181 #--------------------------------HTTP/定位库------------------------------# #定位库地址 diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java index 710e4b9..de5e149 100644 --- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java +++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java @@ -26,13 +26,10 @@ import java.util.concurrent.TimeUnit; public class HBaseUtils { private static final Log logger = LogFactory.get(); - private static Map subIdMap = new ConcurrentHashMap<>(83334); + private static Map subIdMap = new ConcurrentHashMap<>(16); private static Connection connection; private static Long time; - private static String zookeeperIp; - private static String hBaseTable; - private static HBaseUtils hBaseUtils; private static void getInstance() { @@ -44,8 +41,6 @@ public class HBaseUtils { * 构造函数-新 */ private HBaseUtils() { - zookeeperIp = FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS; - hBaseTable = FlowWriteConfig.HBASE_TABLE_NAME; //获取连接 getConnection(); //拉取所有 @@ -59,7 +54,7 @@ public class HBaseUtils { // 管理Hbase的配置信息 Configuration configuration = HBaseConfiguration.create(); // 设置zookeeper节点 - configuration.set("hbase.zookeeper.quorum", zookeeperIp); + configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS); configuration.set("hbase.client.retries.number", "3"); configuration.set("hbase.bulkload.retries.number", "3"); configuration.set("zookeeper.recovery.retry", "3"); @@ -97,21 +92,24 @@ public class HBaseUtils { ResultScanner scanner = null; Scan scan2 = new Scan(); try { - table = connection.getTable(TableName.valueOf("sub:" + hBaseTable)); + table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME)); scan2.setTimeRange(startTime, endTime); scanner = table.getScanner(scan2); for (Result result : scanner) { - Cell[] cells = result.rawCells(); - for (Cell cell : cells) { - String key = Bytes.toString(CellUtil.cloneRow(cell)).trim(); - String value = Bytes.toString(CellUtil.cloneValue(cell)).trim(); - if (subIdMap.containsKey(key)) { - if (!value.equals(subIdMap.get(key))) { - subIdMap.put(key, value); + int acctStatusType = getAcctStatusType(result); + String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))).trim(); + String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))).trim(); + if (acctStatusType == 1) { + if (subIdMap.containsKey(framedIp)) { + boolean same = account.equals(subIdMap.get(framedIp)); + if (!same) { + subIdMap.put(framedIp, account); } } else { - subIdMap.put(key, value); + subIdMap.put(framedIp, account); } + } else if (acctStatusType == 2) { + subIdMap.remove(framedIp); } } Long end = System.currentTimeMillis(); @@ -142,13 +140,15 @@ public class HBaseUtils { private static void getAll() { long begin = System.currentTimeMillis(); try { - Table table = connection.getTable(TableName.valueOf("sub:" + hBaseTable)); + Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME)); Scan scan2 = new Scan(); ResultScanner scanner = table.getScanner(scan2); for (Result result : scanner) { - Cell[] cells = result.rawCells(); - for (Cell cell : cells) { - subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + int acctStatusType = getAcctStatusType(result); + String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))); + String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))); + if (acctStatusType == 1) { + subIdMap.put(framedIp, account); } } logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size()); @@ -198,4 +198,13 @@ public class HBaseUtils { } + private static int getAcctStatusType(Result result) { + boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")); + if (hasType) { + return Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))); + } else { + return 1; + } + } + }