diff --git a/src/main/java/com/nis/web/task/SyncRedisToCluster.java b/src/main/java/com/nis/web/task/SyncRedisToCluster.java index d22dda8..8be178f 100644 --- a/src/main/java/com/nis/web/task/SyncRedisToCluster.java +++ b/src/main/java/com/nis/web/task/SyncRedisToCluster.java @@ -25,6 +25,8 @@ import com.nis.web.service.restful.ConfigJedisServiceimpl; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPool; +import redis.clients.jedis.ScanParams; +import redis.clients.jedis.ScanResult; import redis.clients.jedis.Tuple; import redis.clients.util.JedisClusterCRC16; @@ -42,8 +44,8 @@ public class SyncRedisToCluster { String requestId = UUID.randomUUID().toString(); try { if (lock(requestId)) {// 避免集群环境下同一秒钟所有的机器都执行这个定时任务 - // keys("EFFECTIVE_RULE*"); - // keys("OBSOLETE_RULE*"); +// keys("EFFECTIVE_RULE*"); +// keys("OBSOLETE_RULE*"); String clusterMaatVersionStr = jedisCluster.get("MAAT_VERSION"); String redisMaatVersionStr = JedisUtils.get("MAAT_VERSION", redisStatisticsRealDBIndex); if (clusterMaatVersionStr != null && !clusterMaatVersionStr.trim().equals("")) { @@ -93,7 +95,7 @@ public class SyncRedisToCluster { Jedis jedis = entry.getValue().getResource(); // 判断非从节点(因为若主从复制,从节点会跟随主节点的变化而变化) if (!jedis.info("replication").contains("role:slave")) { - Set keys = jedis.keys(redisKeyStartWith + "*"); + List keys = getKeyByScan(redisKeyStartWith + "*", jedis); if (keys.size() > 0) { Map> map = new HashMap<>(); for (String key : keys) { @@ -133,16 +135,16 @@ public class SyncRedisToCluster { JedisPool jp = clusterNodes.get(k); Jedis connection = jp.getResource(); try { - keys.addAll(connection.keys(pattern)); + keys.addAll(getKeyByScan(pattern, connection)); } catch (Exception e) { logger.error("从{}获取{}失败,失败原因{}", k, pattern, e); } finally { connection.close();// 用完一定要close这个链接!!! } } - // for (String string : keys) { - // System.out.println(string); - // } + for (String string : keys) { + System.out.println(string); + } logger.debug("Keys gotten!"); return keys; } @@ -168,14 +170,38 @@ public class SyncRedisToCluster { syncData(null, null, version); } - public void syncData(Double min, Double max, String verion) { + public void syncData(Double min, Double max, String verionStr) { Jedis resource = JedisUtils.getResource(redisStatisticsRealDBIndex); Set zrangeByScoreWithScores = null; if (min == null && max == null) { - zrangeByScoreWithScores = resource.zrangeByScoreWithScores("MAAT_UPDATE_STATUS", "-inf", "+inf");// 获取所有的maat_update_status + int version = Integer.parseInt(verionStr); + int maxVersion = 1000; + int count = version / maxVersion; + if ((version % maxVersion) != 0) { + count++; + } + min = 0D; + for (int i = 0; i < count; i++) { + int start = i * maxVersion + 1; + int end = (i + 1) * maxVersion; + if (end > version) { + end = version; + } + min = Double.parseDouble(start + ""); + max = Double.parseDouble(end + ""); + zrangeByScoreWithScores = resource.zrangeByScoreWithScores("MAAT_UPDATE_STATUS", min, max);// 获取所有的maat_update_status + syncData(zrangeByScoreWithScores); + } } else { zrangeByScoreWithScores = resource.zrangeByScoreWithScores("MAAT_UPDATE_STATUS", min, max);// 获取增量的数据 + syncData(zrangeByScoreWithScores); } + jedisCluster.set("MAAT_VERSION", verionStr); + logger.info("更新了redis集群中的MAAT_VERSION,更新后值是{}", verionStr); + logger.info("向redis集群同步数据成功"); + } + + private void syncData(Set zrangeByScoreWithScores) { for (Tuple tuple : zrangeByScoreWithScores) { String key = tuple.getElement(); String zset = key; @@ -194,9 +220,6 @@ public class SyncRedisToCluster { logger.debug("向redis集群中更新了MAAT_UPDATE_STATUS,内容是{},SCORES是{}", zset.toUpperCase(), score); // jedisCluster.incr("MAAT_VERSION"); } - jedisCluster.set("MAAT_VERSION", verion); - logger.info("更新了redis集群中的MAAT_VERSION,更新后值是{}", verion); - logger.info("向redis集群同步数据成功"); } // 设置成功返回的结果OK @@ -240,4 +263,19 @@ public class SyncRedisToCluster { } return false; } + + public List getKeyByScan(String pattern, Jedis resource) { + List list = new ArrayList<>(); + int count = 1000; + String cursor = ScanParams.SCAN_POINTER_START; + ScanParams scanParams = new ScanParams(); + scanParams.count(count); + scanParams.match(pattern); + do { + ScanResult scanResult = resource.scan(cursor, scanParams); + list.addAll(scanResult.getResult()); + cursor = scanResult.getStringCursor(); + } while (!"0".equals(cursor)); + return list; + } }