优化Subscriber数据缓存获取策略:

1:规范关系命名及获取方式。
2:根据acct_status_type状态添加或剔除缓存内的关系。
This commit is contained in:
qidaijie
2021-12-06 15:42:44 +03:00
parent 150cf4c367
commit f12e8079c2
4 changed files with 33 additions and 24 deletions

View File

@@ -26,13 +26,10 @@ import java.util.concurrent.TimeUnit;
public class HBaseUtils {
private static final Log logger = LogFactory.get();
private static Map<String, String> subIdMap = new ConcurrentHashMap<>(83334);
private static Map<String, String> subIdMap = new ConcurrentHashMap<>(16);
private static Connection connection;
private static Long time;
private static String zookeeperIp;
private static String hBaseTable;
private static HBaseUtils hBaseUtils;
private static void getInstance() {
@@ -44,8 +41,6 @@ public class HBaseUtils {
* 构造函数-新
*/
private HBaseUtils() {
zookeeperIp = FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS;
hBaseTable = FlowWriteConfig.HBASE_TABLE_NAME;
//获取连接
getConnection();
//拉取所有
@@ -59,7 +54,7 @@ public class HBaseUtils {
// 管理Hbase的配置信息
Configuration configuration = HBaseConfiguration.create();
// 设置zookeeper节点
configuration.set("hbase.zookeeper.quorum", zookeeperIp);
configuration.set("hbase.zookeeper.quorum", FlowWriteConfig.HBASE_ZOOKEEPER_SERVERS);
configuration.set("hbase.client.retries.number", "3");
configuration.set("hbase.bulkload.retries.number", "3");
configuration.set("zookeeper.recovery.retry", "3");
@@ -97,21 +92,24 @@ public class HBaseUtils {
ResultScanner scanner = null;
Scan scan2 = new Scan();
try {
table = connection.getTable(TableName.valueOf("sub:" + hBaseTable));
table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME));
scan2.setTimeRange(startTime, endTime);
scanner = table.getScanner(scan2);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
String key = Bytes.toString(CellUtil.cloneRow(cell)).trim();
String value = Bytes.toString(CellUtil.cloneValue(cell)).trim();
if (subIdMap.containsKey(key)) {
if (!value.equals(subIdMap.get(key))) {
subIdMap.put(key, value);
int acctStatusType = getAcctStatusType(result);
String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip"))).trim();
String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account"))).trim();
if (acctStatusType == 1) {
if (subIdMap.containsKey(framedIp)) {
boolean same = account.equals(subIdMap.get(framedIp));
if (!same) {
subIdMap.put(framedIp, account);
}
} else {
subIdMap.put(key, value);
subIdMap.put(framedIp, account);
}
} else if (acctStatusType == 2) {
subIdMap.remove(framedIp);
}
}
Long end = System.currentTimeMillis();
@@ -142,13 +140,15 @@ public class HBaseUtils {
private static void getAll() {
long begin = System.currentTimeMillis();
try {
Table table = connection.getTable(TableName.valueOf("sub:" + hBaseTable));
Table table = connection.getTable(TableName.valueOf(FlowWriteConfig.HBASE_TABLE_NAME));
Scan scan2 = new Scan();
ResultScanner scanner = table.getScanner(scan2);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
subIdMap.put(Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
int acctStatusType = getAcctStatusType(result);
String framedIp = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("framed_ip")));
String account = Bytes.toString(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("account")));
if (acctStatusType == 1) {
subIdMap.put(framedIp, account);
}
}
logger.warn("HBaseUtils Get fullAmount List size->subIdMap.size(): " + subIdMap.size());
@@ -198,4 +198,13 @@ public class HBaseUtils {
}
private static int getAcctStatusType(Result result) {
boolean hasType = result.containsColumn(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type"));
if (hasType) {
return Bytes.toInt(result.getValue(Bytes.toBytes("radius"), Bytes.toBytes("acct_status_type")));
} else {
return 1;
}
}
}