diff --git a/pom.xml b/pom.xml index f2eb9c1..7d282d6 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.zdjizhi log-completion-schema - 211116-jackson + 211206-radius log-completion-schema http://www.example.com diff --git a/properties/default_config.properties b/properties/default_config.properties index 99d8c79..71f83b6 100644 --- a/properties/default_config.properties +++ b/properties/default_config.properties @@ -42,7 +42,7 @@ kafka.pin=galaxy2019 #====================Topology Default====================# #hbase table name -hbase.table.name=subscriber_info +hbase.table.name=tsg_galaxy:relation_framedip_account #邮件默认编码 mail.default.charset=UTF-8 diff --git a/properties/service_flow_config.properties b/properties/service_flow_config.properties index 23e52db..8acb476 100644 --- a/properties/service_flow_config.properties +++ b/properties/service_flow_config.properties @@ -1,7 +1,7 @@ #--------------------------------地址配置------------------------------# #管理kafka地址 -source.kafka.servers=10.224.11.14:9094 +source.kafka.servers=10.224.11.11 #管理输出kafka地址 sink.kafka.servers=10.224.11.14:9095,10.224.11.15:9095,10.224.11.16:9095,10.224.11.17:9095,10.224.11.18:9095,10.224.11.19:9095,10.224.11.20:9095,10.224.11.21:9095,10.224.11.22:9095,10.224.11.23:9095 @@ -10,7 +10,7 @@ sink.kafka.servers=10.224.11.14:9095,10.224.11.15:9095,10.224.11.16:9095,10.224. zookeeper.servers=10.224.11.11:2181,10.224.11.12:2181,10.224.11.13:2181 #hbase zookeeper地址 用于连接HBase -hbase.zookeeper.servers=10.231.12.4:2181 +hbase.zookeeper.servers=10.224.11.11:2181,10.224.11.12:2181,10.224.11.13:2181 #--------------------------------HTTP/定位库------------------------------# #定位库地址 diff --git a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java index 710e4b9..de5e149 100644 --- a/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java +++ b/src/main/java/com/zdjizhi/utils/hbase/HBaseUtils.java @@ -26,13 +26,10 @@ import java.util.concurrent.TimeUnit; public class HBaseUtils { private static final Log logger = LogFactory.get(); - private static Map subIdMap = new ConcurrentHashMap<>(83334); + private static Map subIdMap = new ConcurrentHashMap<>(16); private static Connection connection; private static Long time; - private static String zookeeperIp; - private static String hBaseTable; - private static HBaseUtils hBaseUtils; private static void getInstance() { @@ -44,8 +41,6 @@ public class HBaseUtils { * 构造函数-新 */ private HBaseUtils() { - zookeeperIp = FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS; - hBaseTable = FlowWriteConfig.HBASE_TABLE_NAME; //获取连接 getConnection(); //拉取所有 @@ -59,7 +54,7 @@ public class HBaseUtils { // 管理Hbase的配置信息 Configuration configuration = HBaseConfiguration.create(); // 设置zookeeper节点 - configuration.set("hbase.zookeeper.quorum", zookeeperIp); + configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS); configuration.set("hbase.client.retries.number", "3"); configuration.set("hbase.bulkload.retries.number", "3"); configuration.set("zookeeper.recovery.retry", "3"); @@ -97,21 +92,24 @@ public class HBaseUtils { ResultScanner scanner = null; Scan scan2 = new Scan(); try { - table = connection.getTable(TableName.valueOf("sub:" + hBaseTable)); + table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME)); scan2.setTimeRange(startTime, endTime); scanner = table.getScanner(scan2); for (Result result : scanner) { - Cell[] cells = result.rawCells(); - for (Cell cell : cells) { - String key = Bytes.toString(CellUtil.cloneRow(cell)).trim(); - String value = Bytes.toString(CellUtil.cloneValue(cell)).trim(); - if (subIdMap.containsKey(key)) { - if (!value.equals(subIdMap.get(key))) { - subIdMap.put(key, value); + int acctStatusType = getAcctStatusType(result); + String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))).trim(); + String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))).trim(); + if (acctStatusType == 1) { + if (subIdMap.containsKey(framedIp)) { + boolean same = account.equals(subIdMap.get(framedIp)); + if (!same) { + subIdMap.put(framedIp, account); } } else { - subIdMap.put(key, value); + subIdMap.put(framedIp, account); } + } else if (acctStatusType == 2) { + subIdMap.remove(framedIp); } } Long end = System.currentTimeMillis(); @@ -142,13 +140,15 @@ public class HBaseUtils { private static void getAll() { long begin = System.currentTimeMillis(); try { - Table table = connection.getTable(TableName.valueOf("sub:" + hBaseTable)); + Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME)); Scan scan2 = new Scan(); ResultScanner scanner = table.getScanner(scan2); for (Result result : scanner) { - Cell[] cells = result.rawCells(); - for (Cell cell : cells) { - subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell))); + int acctStatusType = getAcctStatusType(result); + String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))); + String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))); + if (acctStatusType == 1) { + subIdMap.put(framedIp, account); } } logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size()); @@ -198,4 +198,13 @@ public class HBaseUtils { } + private static int getAcctStatusType(Result result) { + boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")); + if (hasType) { + return Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"))); + } else { + return 1; + } + } + }