1:为实时统计配置中jediscluster改为每次使用都获取一个新的
2:添加根据配置id获取编译,组,域等信息的接口 3:修改获取或释放redis分布式锁后,将redis连接释放 4:解决冲突
This commit is contained in:
@@ -11,15 +11,17 @@ import java.util.UUID;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.PropertySource;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.nis.restful.RestBusinessCode;
|
||||
import com.nis.restful.ServiceRuntimeException;
|
||||
import com.nis.util.Configurations;
|
||||
import com.nis.util.Constants;
|
||||
import com.nis.util.ExceptionUtil;
|
||||
import com.nis.util.JedisUtils;
|
||||
import com.nis.web.service.SpringContextHolder;
|
||||
import com.nis.web.service.restful.ConfigJedisServiceimpl;
|
||||
|
||||
import redis.clients.jedis.Jedis;
|
||||
@@ -28,6 +30,7 @@ import redis.clients.jedis.JedisPool;
|
||||
import redis.clients.jedis.ScanParams;
|
||||
import redis.clients.jedis.ScanResult;
|
||||
import redis.clients.jedis.Tuple;
|
||||
import redis.clients.jedis.exceptions.JedisException;
|
||||
import redis.clients.util.JedisClusterCRC16;
|
||||
|
||||
@Component
|
||||
@@ -36,14 +39,46 @@ public class SyncRedisToCluster {
|
||||
private static Logger logger = LoggerFactory.getLogger(ConfigJedisServiceimpl.class);
|
||||
private static int redisStatisticsRealDBIndex = Configurations.getIntProperty("redisStatisticsRealDBIndex", 14);
|
||||
|
||||
@Autowired
|
||||
private JedisCluster jedisCluster;
|
||||
/**
|
||||
* 每次使用获取新连接,避免集群某些机器宕机后影响连接使用,注意需要在applicationContext-redis.xml中修改jedisCluster的scope为
|
||||
* prototype
|
||||
*
|
||||
* @return
|
||||
* @throws JedisException
|
||||
*/
|
||||
public JedisCluster getResource() throws JedisException {
|
||||
JedisCluster jedisCluster = SpringContextHolder.getBean(JedisCluster.class);
|
||||
if (jedisCluster == null) {
|
||||
throw new ServiceRuntimeException("无法获取redis-cluster连接,请联系管理员检查程序",
|
||||
RestBusinessCode.CannotConnectionRedis.getValue());
|
||||
}
|
||||
return jedisCluster;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭集群连接
|
||||
*
|
||||
* @param jedisCluster
|
||||
*/
|
||||
private void closeConn(JedisCluster jedisCluster) {
|
||||
if (jedisCluster != null) {
|
||||
try {
|
||||
jedisCluster.close();
|
||||
} catch (Exception e) {
|
||||
throw new ServiceRuntimeException("释放redis-cluster连接失败",
|
||||
RestBusinessCode.CannotConnectionRedis.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(cron = "${syncRedisToClusterCron}")
|
||||
public void syncRedisToCluster() {
|
||||
JedisCluster jedisCluster = getResource();
|
||||
System.out.println(jedisCluster);
|
||||
String requestId = UUID.randomUUID().toString();
|
||||
try {
|
||||
if (lock(requestId)) {// 避免集群环境下同一秒钟所有的机器都执行这个定时任务
|
||||
if (lock(jedisCluster, requestId)) {// 避免集群环境下同一秒钟所有的机器都执行这个定时任务
|
||||
// keys("EFFECTIVE_RULE*");
|
||||
// keys("OBSOLETE_RULE*");
|
||||
String clusterMaatVersionStr = jedisCluster.get("MAAT_VERSION");
|
||||
@@ -54,11 +89,11 @@ public class SyncRedisToCluster {
|
||||
Integer redisMaatVersion = Integer.valueOf(redisMaatVersionStr);
|
||||
if (redisMaatVersion < clusterMaatVersion) {// 如果主从库比集群库的版本号小则下发全量
|
||||
logger.info("redis集群中的MAAT_VERSION为大于配置库中的MAAT_VERSION,开始执行全量同步");
|
||||
syncAllData(redisMaatVersionStr);
|
||||
syncAllData(jedisCluster, redisMaatVersionStr);
|
||||
} else if (redisMaatVersion > clusterMaatVersion) {// 获取增量的数据
|
||||
logger.info("redis集群中的MAAT_VERSION为小于配置库中的MAAT_VERSION,开始执行增量同步,score是{}-{}",
|
||||
clusterMaatVersion, +redisMaatVersion);
|
||||
syncData(clusterMaatVersion.doubleValue(), redisMaatVersion.doubleValue(),
|
||||
syncData(jedisCluster, clusterMaatVersion.doubleValue(), redisMaatVersion.doubleValue(),
|
||||
redisMaatVersionStr);
|
||||
} else {
|
||||
logger.info("redis集群中的MAAT_VERSION与配置库中的MAAT_VERSION相等,暂不执行配置同步操作");
|
||||
@@ -66,13 +101,13 @@ public class SyncRedisToCluster {
|
||||
} else {
|
||||
logger.info("redis配置库中MAAT_VERSION为null,但是redis集群中的MAAT_VERSION为{},集群与配置库的数据不同步,开始删除集群中的配置",
|
||||
clusterMaatVersionStr);
|
||||
delClusterData();
|
||||
delClusterData(jedisCluster);
|
||||
}
|
||||
} else {
|
||||
if (redisMaatVersionStr != null && !redisMaatVersionStr.trim().equals("")) {
|
||||
logger.info("redis配置库中的MAAT_VERSION为{},redis集群中的MAAT_VERSION为null,开始执行全量同步",
|
||||
redisMaatVersionStr);
|
||||
syncAllData(redisMaatVersionStr);
|
||||
syncAllData(jedisCluster, redisMaatVersionStr);
|
||||
} else {
|
||||
logger.info("redis配置库中和redis集群中的MAAT_VERSION都为null,暂时不执行全量同步");
|
||||
}
|
||||
@@ -83,12 +118,13 @@ public class SyncRedisToCluster {
|
||||
} catch (Exception e) {
|
||||
logger.error("同步配置库配置到3A-redisCluster失败,失败原因:{}", ExceptionUtil.getExceptionMsg(e));
|
||||
} finally {
|
||||
unlock(requestId);
|
||||
unlock(jedisCluster, requestId);
|
||||
closeConn(jedisCluster);
|
||||
}
|
||||
}
|
||||
|
||||
// 删除之前的key
|
||||
public void deleteRedisKeyStartWith(String redisKeyStartWith) {
|
||||
public void deleteRedisKeyStartWith(JedisCluster jedisCluster, String redisKeyStartWith) {
|
||||
try {
|
||||
Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
|
||||
for (Map.Entry<String, JedisPool> entry : clusterNodes.entrySet()) {
|
||||
@@ -126,7 +162,7 @@ public class SyncRedisToCluster {
|
||||
}
|
||||
}
|
||||
|
||||
public TreeSet<String> keys(String pattern) {
|
||||
public TreeSet<String> keys(JedisCluster jedisCluster, String pattern) {
|
||||
TreeSet<String> keys = new TreeSet<>();
|
||||
// 获取所有的节点
|
||||
Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
|
||||
@@ -150,7 +186,7 @@ public class SyncRedisToCluster {
|
||||
return keys;
|
||||
}
|
||||
|
||||
public void delClusterData() {
|
||||
public void delClusterData(JedisCluster jedisCluster) {
|
||||
// 清空集群库配置数据
|
||||
if (jedisCluster.exists("MAAT_UPDATE_STATUS")) {
|
||||
jedisCluster.del("MAAT_UPDATE_STATUS");
|
||||
@@ -160,18 +196,18 @@ public class SyncRedisToCluster {
|
||||
jedisCluster.del("MAAT_VERSION");
|
||||
logger.info("删除MAAT_VERSION成功");
|
||||
}
|
||||
deleteRedisKeyStartWith("EFFECTIVE_RULE*");
|
||||
deleteRedisKeyStartWith(jedisCluster, "EFFECTIVE_RULE*");
|
||||
logger.info("删除EFFECTIVE_RULE*成功");
|
||||
deleteRedisKeyStartWith("OBSOLETE_RULE*");
|
||||
deleteRedisKeyStartWith(jedisCluster, "OBSOLETE_RULE*");
|
||||
logger.info("删除OBSOLETE_RULE*成功");
|
||||
}
|
||||
|
||||
public void syncAllData(String version) {
|
||||
delClusterData();
|
||||
syncData(null, null, version);
|
||||
public void syncAllData(JedisCluster jedisCluster, String version) {
|
||||
delClusterData(jedisCluster);
|
||||
syncData(jedisCluster, null, null, version);
|
||||
}
|
||||
|
||||
public void syncData(Double min, Double max, String verionStr) {
|
||||
public void syncData(JedisCluster jedisCluster, Double min, Double max, String verionStr) {
|
||||
Jedis resource = JedisUtils.getResource(redisStatisticsRealDBIndex);
|
||||
Set<Tuple> zrangeByScoreWithScores = null;
|
||||
if (min == null && max == null) {
|
||||
@@ -190,20 +226,20 @@ public class SyncRedisToCluster {
|
||||
}
|
||||
min = Double.parseDouble(start + "");
|
||||
max = Double.parseDouble(end + "");
|
||||
//分批获取,避免一次性获取太多,造成redis阻塞
|
||||
// 分批获取,避免一次性获取太多,造成redis阻塞
|
||||
zrangeByScoreWithScores = resource.zrangeByScoreWithScores("MAAT_UPDATE_STATUS", min, max);// 获取所有的maat_update_status
|
||||
syncData(zrangeByScoreWithScores);
|
||||
syncData(jedisCluster, zrangeByScoreWithScores);
|
||||
}
|
||||
} else {
|
||||
zrangeByScoreWithScores = resource.zrangeByScoreWithScores("MAAT_UPDATE_STATUS", min, max);// 获取增量的数据
|
||||
syncData(zrangeByScoreWithScores);
|
||||
syncData(jedisCluster, zrangeByScoreWithScores);
|
||||
}
|
||||
jedisCluster.set("MAAT_VERSION", verionStr);
|
||||
logger.info("更新了redis集群中的MAAT_VERSION,更新后值是{}", verionStr);
|
||||
logger.info("向redis集群同步数据成功");
|
||||
}
|
||||
|
||||
private void syncData(Set<Tuple> zrangeByScoreWithScores) {
|
||||
private void syncData(JedisCluster jedisCluster, Set<Tuple> zrangeByScoreWithScores) {
|
||||
for (Tuple tuple : zrangeByScoreWithScores) {
|
||||
String key = tuple.getElement();
|
||||
String zset = key;
|
||||
@@ -239,7 +275,7 @@ public class SyncRedisToCluster {
|
||||
* @param requestId 请求标识(UUID.randomUUID().toString()),正常情况下是谁加的锁,谁去解锁不能a加的锁,b去解锁
|
||||
* @return 是否获取成功
|
||||
*/
|
||||
public Boolean lock(String requestId) {
|
||||
public Boolean lock(JedisCluster jedisCluster, String requestId) {
|
||||
String key = "configSyncDistributedLock";
|
||||
String var1 = jedisCluster.set(key, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, Constants.REDISLOCKTIME);
|
||||
if (LOCK_SUCCESS.equals(var1)) {
|
||||
@@ -253,7 +289,7 @@ public class SyncRedisToCluster {
|
||||
*
|
||||
* @param requestId
|
||||
*/
|
||||
protected boolean unlock(String requestId) {
|
||||
protected boolean unlock(JedisCluster jedisCluster, String requestId) {
|
||||
String key = "configSyncDistributedLock";
|
||||
// 这个字符串是个lua脚本,代表的意思是如果根据key拿到的value跟传入的value相同就执行del,否则就返回0【保证安全性】
|
||||
String luaScript = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 0 end";
|
||||
|
||||
Reference in New Issue
Block a user