更改redis集群连接方式并优化

This commit is contained in:
RenKaiGe-Office
2018-09-12 17:02:19 +08:00
parent 4d04a0cea7
commit 2d98ed2dde
4 changed files with 71 additions and 127 deletions

View File

@@ -1,103 +0,0 @@
package com.nis.util;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.Resource;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
public class JedisClusterFactory implements FactoryBean<JedisCluster>, InitializingBean {
private Resource addressConfig;
private String addressKeyPrefix;
private JedisCluster jedisCluster;
private Integer timeout;
private Integer maxRedirections;
private GenericObjectPoolConfig genericObjectPoolConfig;
private Pattern p = Pattern.compile("^.+[:]\\d{1,5}\\s*$");
@Override
public JedisCluster getObject() throws Exception {
return jedisCluster;
}
@Override
public Class<? extends JedisCluster> getObjectType() {
return (this.jedisCluster != null ? this.jedisCluster.getClass() : JedisCluster.class);
}
@Override
public boolean isSingleton() {
return true;
}
private Set<HostAndPort> parseHostAndPort() throws Exception {
try {
Properties prop = new Properties();
prop.load(this.addressConfig.getInputStream());
Set<HostAndPort> haps = new HashSet<HostAndPort>();
for (Object key : prop.keySet()) {
if (!((String) key).startsWith(addressKeyPrefix)) {
continue;
}
String val = (String) prop.get(key);
boolean isIpPort = p.matcher(val).matches();
if (!isIpPort) {
throw new IllegalArgumentException("ip 或 port 不合法");
}
String[] ipAndPort = val.split(":");
HostAndPort hap = new HostAndPort(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
haps.add(hap);
}
return haps;
} catch (IllegalArgumentException ex) {
throw ex;
} catch (Exception ex) {
throw new Exception("解析 jedis 配置文件失败", ex);
}
}
@Override
public void afterPropertiesSet() throws Exception {
Set<HostAndPort> haps = this.parseHostAndPort();
jedisCluster = new JedisCluster(haps, timeout, maxRedirections, genericObjectPoolConfig);
}
public void setAddressConfig(Resource addressConfig) {
this.addressConfig = addressConfig;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public void setMaxRedirections(int maxRedirections) {
this.maxRedirections = maxRedirections;
}
public void setAddressKeyPrefix(String addressKeyPrefix) {
this.addressKeyPrefix = addressKeyPrefix;
}
public void setGenericObjectPoolConfig(GenericObjectPoolConfig genericObjectPoolConfig) {
this.genericObjectPoolConfig = genericObjectPoolConfig;
}
}

View File

@@ -11,6 +11,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.PropertySource;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -22,6 +23,7 @@ import com.nis.web.service.restful.ConfigJedisServiceimpl;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSentinelPool;
import redis.clients.jedis.Tuple;
import redis.clients.util.JedisClusterCRC16;
@@ -33,13 +35,15 @@ public class SyncRedisToCluster {
@Autowired
private JedisCluster jedisCluster;
// @Autowired
// private JedisSentinelPool jedisSentinelPool;
// @Scheduled(cron = "0/3 * * * * ?")
@Scheduled(cron = "${syncRedisToClusterCron}")
public void syncRedisToCluster() {
try {
// keys("EFFECTIVE_RULE*");
// keys("OBSOLETE_RULE*");
// keys("EFFECTIVE_RULE*");
// keys("OBSOLETE_RULE*");
String clusterMaatVersionStr = jedisCluster.get("MAAT_VERSION");
String redisMaatVersionStr = JedisUtils.get("MAAT_VERSION", redisStatisticsRealDBIndex);
if (clusterMaatVersionStr != null && !clusterMaatVersionStr.trim().equals("")) {
@@ -49,8 +53,8 @@ public class SyncRedisToCluster {
logger.info("redis集群中的MAAT_VERSION为大于配置库中的MAAT_VERSION,开始执行全量同步");
syncAllData(redisMaatVersionStr);
} else if (redisMaatVersion > clusterMaatVersion) {// 获取增量的数据
logger.info("redis集群中的MAAT_VERSION为小于配置库中的MAAT_VERSION,开始执行增量同步,score是" + clusterMaatVersion + "-"
+ redisMaatVersion);
logger.info("redis集群中的MAAT_VERSION为小于配置库中的MAAT_VERSION,开始执行增量同步,score是{}-{}", clusterMaatVersion,
+redisMaatVersion);
syncData(clusterMaatVersion.doubleValue(), redisMaatVersion.doubleValue(), redisMaatVersionStr);
} else {
logger.info("redis集群中的MAAT_VERSION与配置库中的MAAT_VERSION相等,暂不执行配置同步操作");
@@ -60,7 +64,7 @@ public class SyncRedisToCluster {
syncAllData(redisMaatVersionStr);
}
} catch (Exception e) {
logger.error("同步配置库配置到3A-redisCluster失败,失败原因:" + ExceptionUtil.getExceptionMsg(e));
logger.error("同步配置库配置到3A-redisCluster失败,失败原因:{}", ExceptionUtil.getExceptionMsg(e));
}
}
@@ -68,6 +72,7 @@ public class SyncRedisToCluster {
try {
Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
for (Map.Entry<String, JedisPool> entry : clusterNodes.entrySet()) {
Jedis jedis = entry.getValue().getResource();
// 判断非从节点(因为若主从复制,从节点会跟随主节点的变化而变化)
if (!jedis.info("replication").contains("role:slave")) {
@@ -92,7 +97,7 @@ public class SyncRedisToCluster {
.toArray(new String[integerListEntry.getValue().size()]));
// jedisCluster.del(integerListEntry.getValue()
// .toArray(new String[integerListEntry.getValue().size()]));
logger.debug("从redis集群{}删除key={},成功", entry.getKey(),integerListEntry.getValue());
}
}
}
@@ -118,9 +123,9 @@ public class SyncRedisToCluster {
connection.close();// 用完一定要close这个链接
}
}
// for (String string : keys) {
// System.out.println(string);
// }
// for (String string : keys) {
// System.out.println(string);
// }
logger.debug("Keys gotten!");
return keys;
}
@@ -162,11 +167,14 @@ public class SyncRedisToCluster {
String val = JedisUtils.get(key, redisStatisticsRealDBIndex);
if (val != null && !val.trim().equals("")) {
jedisCluster.set(key, val);
logger.debug("向redis集群中插入了一条数据key是{},value是{}", key.toUpperCase(), val);
}
jedisCluster.zadd("MAAT_UPDATE_STATUS", score, zset);
logger.debug("向redis集群中更新了MAAT_UPDATE_STATUS,内容是{},SCORES是{}", zset.toUpperCase(), score);
// jedisCluster.incr("MAAT_VERSION");
}
jedisCluster.set("MAAT_VERSION", verion);
logger.info("更新了redis集群中的MAAT_VERSION,更新后值是{}", verion);
logger.info("向redis集群同步数据成功");
}