项目初始导入
This commit is contained in:
235
src/main/java/com/nis/util/redis/JedisClusterPipeline.java
Normal file
235
src/main/java/com/nis/util/redis/JedisClusterPipeline.java
Normal file
@@ -0,0 +1,235 @@
|
||||
/**
|
||||
* Copyright: Copyright (c) 2015
|
||||
*
|
||||
* @author youaremoon
|
||||
* @date 2016年6月25日
|
||||
* @version V1.0
|
||||
*/
|
||||
package com.nis.util.redis;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
|
||||
import redis.clients.jedis.BinaryJedisCluster;
|
||||
import redis.clients.jedis.Client;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.JedisCluster;
|
||||
import redis.clients.jedis.JedisClusterConnectionHandler;
|
||||
import redis.clients.jedis.JedisClusterInfoCache;
|
||||
import redis.clients.jedis.JedisPool;
|
||||
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
|
||||
import redis.clients.jedis.PipelineBase;
|
||||
import redis.clients.jedis.exceptions.JedisMovedDataException;
|
||||
import redis.clients.jedis.exceptions.JedisRedirectionException;
|
||||
import redis.clients.util.JedisClusterCRC16;
|
||||
import redis.clients.util.SafeEncoder;
|
||||
|
||||
/**
|
||||
* 在集群模式下提供批量操作的功能。 <br/>
|
||||
* 由于集群模式存在节点的动态添加删除,且client不能实时感知(只有在执行命令时才可能知道集群发生变更),
|
||||
* 因此,该实现不保证一定成功,建议在批量操作之前调用 refreshCluster() 方法重新获取集群信息。<br />
|
||||
* 应用需要保证不论成功还是失败都会调用close() 方法,否则可能会造成泄露。<br/>
|
||||
* 如果失败需要应用自己去重试,因此每个批次执行的命令数量需要控制。防止失败后重试的数量过多。<br />
|
||||
* 基于以上说明,建议在集群环境较稳定(增减节点不会过于频繁)的情况下使用,且允许失败或有对应的重试策略。<br />
|
||||
*
|
||||
* 该类非线程安全
|
||||
*
|
||||
* @author youaremoon
|
||||
* @version
|
||||
* @since Ver 1.1
|
||||
*/
|
||||
public class JedisClusterPipeline extends PipelineBase implements Closeable {
|
||||
// private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class);
|
||||
|
||||
// 部分字段没有对应的获取方法,只能采用反射来做
|
||||
// 你也可以去继承JedisCluster和JedisSlotBasedConnectionHandler来提供访问接口
|
||||
private static final Field FIELD_CONNECTION_HANDLER;
|
||||
private static final Field FIELD_CACHE;
|
||||
static {
|
||||
FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
|
||||
FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
|
||||
}
|
||||
|
||||
private JedisSlotBasedConnectionHandler connectionHandler;
|
||||
private JedisClusterInfoCache clusterInfoCache;
|
||||
private Queue<Client> clients = new LinkedList<Client>(); // 根据顺序存储每个命令对应的Client
|
||||
private Map<JedisPool, Jedis> jedisMap = new HashMap<>(); // 用于缓存连接
|
||||
private boolean hasDataInBuf = false; // 是否有数据在缓存区
|
||||
|
||||
/**
|
||||
* 根据jedisCluster实例生成对应的JedisClusterPipeline
|
||||
* @param
|
||||
* @return
|
||||
*/
|
||||
public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
|
||||
JedisClusterPipeline pipeline = new JedisClusterPipeline();
|
||||
pipeline.setJedisCluster(jedisCluster);
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
public JedisClusterPipeline() {
|
||||
}
|
||||
|
||||
public void setJedisCluster(JedisCluster jedis) {
|
||||
connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
|
||||
clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
|
||||
}
|
||||
|
||||
/**
|
||||
* 刷新集群信息,当集群信息发生变更时调用
|
||||
* @param
|
||||
* @return
|
||||
*/
|
||||
public void refreshCluster() {
|
||||
connectionHandler.renewSlotCache();
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步读取所有数据. 与syncAndReturnAll()相比,sync()只是没有对数据做反序列化
|
||||
*/
|
||||
public void sync() {
|
||||
innerSync(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步读取所有数据 并按命令顺序返回一个列表
|
||||
*
|
||||
* @return 按照命令的顺序返回所有的数据
|
||||
*/
|
||||
public List<Object> syncAndReturnAll() {
|
||||
List<Object> responseList = new ArrayList<Object>();
|
||||
|
||||
innerSync(responseList);
|
||||
|
||||
return responseList;
|
||||
}
|
||||
|
||||
private void innerSync(List<Object> formatted) {
|
||||
HashSet<Client> clientSet = new HashSet<Client>();
|
||||
|
||||
try {
|
||||
for (Client client : clients) {
|
||||
// 在sync()调用时其实是不需要解析结果数据的,但是如果不调用get方法,发生了JedisMovedDataException这样的错误应用是不知道的,因此需要调用get()来触发错误。
|
||||
// 其实如果Response的data属性可以直接获取,可以省掉解析数据的时间,然而它并没有提供对应方法,要获取data属性就得用反射,不想再反射了,所以就这样了
|
||||
Object data = generateResponse(client.getOne()).get();
|
||||
if (null != formatted) {
|
||||
formatted.add(data);
|
||||
}
|
||||
|
||||
// size相同说明所有的client都已经添加,就不用再调用add方法了
|
||||
if (clientSet.size() != jedisMap.size()) {
|
||||
clientSet.add(client);
|
||||
}
|
||||
}
|
||||
} catch (JedisRedirectionException jre) {
|
||||
if (jre instanceof JedisMovedDataException) {
|
||||
// if MOVED redirection occurred, rebuilds cluster's slot cache,
|
||||
// recommended by Redis cluster specification
|
||||
refreshCluster();
|
||||
}
|
||||
|
||||
throw jre;
|
||||
} finally {
|
||||
if (clientSet.size() != jedisMap.size()) {
|
||||
// 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染
|
||||
for (Jedis jedis : jedisMap.values()) {
|
||||
if (clientSet.contains(jedis.getClient())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
flushCachedData(jedis);
|
||||
}
|
||||
}
|
||||
|
||||
hasDataInBuf = false;
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
clean();
|
||||
|
||||
clients.clear();
|
||||
|
||||
for (Jedis jedis : jedisMap.values()) {
|
||||
if (hasDataInBuf) {
|
||||
flushCachedData(jedis);
|
||||
}
|
||||
|
||||
jedis.close();
|
||||
}
|
||||
|
||||
jedisMap.clear();
|
||||
|
||||
hasDataInBuf = false;
|
||||
}
|
||||
|
||||
private void flushCachedData(Jedis jedis) {
|
||||
try {
|
||||
jedis.getClient().getAll();
|
||||
} catch (RuntimeException ex) {
|
||||
// 其中一个client出问题,后面出问题的几率较大
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Client getClient(String key) {
|
||||
byte[] bKey = SafeEncoder.encode(key);
|
||||
|
||||
return getClient(bKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Client getClient(byte[] key) {
|
||||
Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));
|
||||
|
||||
Client client = jedis.getClient();
|
||||
clients.add(client);
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
private Jedis getJedis(int slot) {
|
||||
JedisPool pool = clusterInfoCache.getSlotPool(slot);
|
||||
|
||||
// 根据pool从缓存中获取Jedis
|
||||
Jedis jedis = jedisMap.get(pool);
|
||||
if (null == jedis) {
|
||||
jedis = pool.getResource();
|
||||
jedisMap.put(pool, jedis);
|
||||
}
|
||||
|
||||
hasDataInBuf = true;
|
||||
return jedis;
|
||||
}
|
||||
|
||||
private static Field getField(Class<?> cls, String fieldName) {
|
||||
try {
|
||||
Field field = cls.getDeclaredField(fieldName);
|
||||
field.setAccessible(true);
|
||||
|
||||
return field;
|
||||
} catch (NoSuchFieldException | SecurityException e) {
|
||||
throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked" })
|
||||
private static <T> T getValue(Object obj, Field field) {
|
||||
try {
|
||||
return (T)field.get(obj);
|
||||
} catch (IllegalArgumentException | IllegalAccessException e) {
|
||||
// LOGGER.error("get value fail", e);
|
||||
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
458
src/main/java/com/nis/util/redis/RedisDao.java
Normal file
458
src/main/java/com/nis/util/redis/RedisDao.java
Normal file
@@ -0,0 +1,458 @@
|
||||
/**
|
||||
*@Title: RedisUtil.java
|
||||
*@Package com.nis.demo
|
||||
*@Description TODO
|
||||
*@author dell
|
||||
*@date 2016年9月13日 下午3:05:45
|
||||
*@version 版本号
|
||||
*/
|
||||
package com.nis.util.redis;
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
|
||||
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.nis.util.BeanHelper;
|
||||
import com.nis.util.Configurations;
|
||||
import com.nis.util.Constants;
|
||||
|
||||
import redis.clients.jedis.HostAndPort;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.JedisCluster;
|
||||
import redis.clients.jedis.JedisPool;
|
||||
|
||||
/**
|
||||
* @ClassName: RedisUtil.java
|
||||
* @Description: TODO
|
||||
* @author (wx)
|
||||
* @date 2016年9月13日 下午3:05:45
|
||||
* @version V1.0
|
||||
*/
|
||||
public class RedisDao {
|
||||
protected final Logger logger = Logger.getLogger(RedisDao.class);
|
||||
private JedisCluster cluster=null;
|
||||
private JedisClusterPipeline jcp=null;
|
||||
|
||||
public void initCluster(int type) throws Exception{
|
||||
if(cluster==null && Constants.IS_OPEN_REDIS){
|
||||
//初始化连接池配置
|
||||
GenericObjectPoolConfig config= getPoolConfig();
|
||||
//初始化节点信息
|
||||
Set<HostAndPort> jedisHostAndNodeSet= initNodes();
|
||||
int connectionTimeout=Configurations.getIntProperty("redis.cluster.connectiontimeout", 100);
|
||||
int soTimeout=Configurations.getIntProperty("redis.cluster.sotimeout", 100);
|
||||
int maxAttempts=Configurations.getIntProperty("redis.cluster.maxattempts", 3);
|
||||
//构造单例集群实例,其内部由多个节点构成,每个节点都有自己的连接池
|
||||
cluster=new JedisCluster(jedisHostAndNodeSet,connectionTimeout,soTimeout,maxAttempts,config);
|
||||
}
|
||||
logger.info("--------------------init redis cluster-----------");
|
||||
}
|
||||
/**
|
||||
* getJedisCluster(初始化redis的cluster)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @return
|
||||
* @throws Exception
|
||||
*JedisCluster
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
@PostConstruct
|
||||
public JedisCluster getJedisCluster() throws Exception{
|
||||
initCluster(0);
|
||||
return cluster;
|
||||
}
|
||||
/**
|
||||
*
|
||||
* closeJedisCluster(关闭redis cluster的方法,在系统终止时使用)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @throws IOException
|
||||
*void
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
@PreDestroy
|
||||
public void closeJedisCluster() throws IOException{
|
||||
if(cluster!=null)
|
||||
cluster.close();
|
||||
}
|
||||
/**
|
||||
*
|
||||
* getPoolConfig(初始化连接池的配置,这里可以设置很多参数的,不过目前没加)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @return
|
||||
*GenericObjectPoolConfig
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
private GenericObjectPoolConfig getPoolConfig(){
|
||||
GenericObjectPoolConfig config=new GenericObjectPoolConfig();
|
||||
config.setMaxTotal(Configurations.getIntProperty("redis.pool.maxtotal", 500));//整个池的最大值
|
||||
config.setMaxIdle(Configurations.getIntProperty("redis.pool.maxidle", 100));//最大空闲
|
||||
config.setMaxWaitMillis(Configurations.getIntProperty("redis.pool.maxwaitmillis", -1));//获取不到永远等待
|
||||
config.setBlockWhenExhausted(Configurations.getBooleanProperty("redis.pool.blockwhenexhausted", true));
|
||||
config.setNumTestsPerEvictionRun(Configurations.getIntProperty("redis.pool.numtestsperevictionrun", Integer.MAX_VALUE));//always test all idle object
|
||||
config.setTestOnBorrow(Configurations.getBooleanProperty("redis.pool.testonborrow", true));
|
||||
config.setTestOnReturn(Configurations.getBooleanProperty("redis.pool.testonreturn", false));
|
||||
config.setTestWhileIdle(Configurations.getBooleanProperty("redis.pool.testwhileidle", true));//发呆过长时间是否先test一下
|
||||
config.setTimeBetweenEvictionRunsMillis(Configurations.getLongProperty("redis.pool.timebetweenevictionrunsmillis", 60000L));//-1不启动,默认1min一次
|
||||
config.setMinEvictableIdleTimeMillis(Configurations.getLongProperty("redis.pool.minevictableidletimemillis", 60000L));//可发呆的时间,10mins
|
||||
return config;
|
||||
}
|
||||
/**
|
||||
*
|
||||
* initNodes(初始化节点信息)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @return
|
||||
* @throws Exception
|
||||
*Set<HostAndPort>
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
private Set<HostAndPort> initNodes() throws Exception{
|
||||
String hostAndPorts=Configurations.getStringProperty("redis.cluster.host_port", null);
|
||||
if(hostAndPorts==null)
|
||||
throw new RuntimeException("配置文件中redis.cluster.host_port为空!");
|
||||
String[] hostAndPort=hostAndPorts.split(",");
|
||||
Set<HostAndPort> jedisClusterNodes=new HashSet<HostAndPort>();
|
||||
for(String host_port:hostAndPort){
|
||||
String [] _host_port=host_port.split(":");
|
||||
if(_host_port.length!=2)
|
||||
throw new RuntimeException("配置文件中redis.cluster.host_port格式不正确!");
|
||||
HostAndPort node=new HostAndPort(_host_port[0],Integer.parseInt(_host_port[1]));
|
||||
jedisClusterNodes.add(node);
|
||||
}
|
||||
return jedisClusterNodes;
|
||||
}
|
||||
/**
|
||||
*
|
||||
* getClusterNodes(获取redis集群中的所有节点,每个节点都是一个连接池)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @return
|
||||
*Map<String,JedisPool>
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
public Map<String, JedisPool> getClusterNodes(){
|
||||
return cluster==null?null:cluster.getClusterNodes();
|
||||
}
|
||||
/**
|
||||
*
|
||||
* getJedisClusterPipeline(获取redis cluster的 pipline.由于jedis没有实现,这里是用来一个第三方的pipline)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @return
|
||||
* @throws Exception
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
public void getJedisClusterPipeline() throws Exception{
|
||||
if(cluster==null) cluster=getJedisCluster();;
|
||||
jcp=JedisClusterPipeline.pipelined(cluster);
|
||||
jcp.refreshCluster();//刷新集群状态
|
||||
}
|
||||
/**
|
||||
*
|
||||
* getJedisClusterPipeline(获取redis cluster的 pipline.由于jedis没有实现,这里是用来一个第三方的pipline)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @return
|
||||
*JedisClusterPipeline
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
// public JedisClusterPipeline getJedisClusterPipeline(JedisCluster cluster){
|
||||
// JedisClusterPipeline jcp=JedisClusterPipeline.pipelined(cluster);
|
||||
// jcp.refreshCluster();
|
||||
// return jcp;
|
||||
// }
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* closeJedisClusterPipeline(关闭pipeline)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @param jcp
|
||||
*void
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
public void closeJedisClusterPipeline(){
|
||||
if(jcp!=null)
|
||||
jcp.close();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* closeJedisClusterPipeline(关闭pipeline)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @param jcp
|
||||
*void
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
// public void closeJedisClusterPipeline(JedisClusterPipeline jcp){
|
||||
// if(jcp!=null)
|
||||
// jcp.close();
|
||||
// }
|
||||
/**
|
||||
*
|
||||
* * saveMaps(将对象转换成的Map保存到redis中)
|
||||
* (注意Date转换成long类型的时间戳然后以字符串的形式保存,蛋疼的jedis有问题,除了字符串跟byte[]数组就没别的保存类型了)
|
||||
* @param jcp//pipeLine
|
||||
* @param dataList//数据集合
|
||||
* @param tableName//实体类对应的表或者类名,用于生成唯一的hash key
|
||||
* @param keyField //实体类字段中表示唯一字段的字段名,其值用于生成唯一的hash key
|
||||
* @param clazz //类名
|
||||
* @throws Exception
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
public void saveMaps(List<? extends Object> dataList ,String tableName,Class clazz,String keyField) throws Exception{
|
||||
logger.info("save maps start");
|
||||
if(cluster==null)cluster=getJedisCluster();
|
||||
if(jcp==null)getJedisClusterPipeline();
|
||||
long start=System.currentTimeMillis();
|
||||
// try{
|
||||
for(Object data:dataList){
|
||||
Map<String,String> dataMap=BeanHelper.transportBean2Map(clazz, data);
|
||||
jcp.hmset((tableName+"_"+dataMap.get(keyField)), dataMap);
|
||||
}
|
||||
jcp.sync();
|
||||
// }catch (Exception e) {
|
||||
// // TODO: handle exception
|
||||
// e.printStackTrace();
|
||||
// }
|
||||
long end=System.currentTimeMillis();
|
||||
jcp.close();
|
||||
jcp=null;
|
||||
logger.info("save maps end, cost:"+(end-start));
|
||||
}
|
||||
/**
|
||||
*
|
||||
* updateMaps(更新redis中的对象)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @param jcp
|
||||
* @param dataList
|
||||
* @param tableName
|
||||
* @param clazz
|
||||
* @param keyField
|
||||
* @throws Exception
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
public void updateMaps(List<? extends Object> dataList ,String tableName,Class clazz,String keyField) throws Exception{
|
||||
logger.info("update maps start");
|
||||
if(cluster==null)cluster=getJedisCluster();
|
||||
if(jcp==null)getJedisClusterPipeline();
|
||||
long start=System.currentTimeMillis();
|
||||
// try{
|
||||
for(Object data:dataList){
|
||||
Map<String,String> dataMap=BeanHelper.transportBean2Map(clazz, data);
|
||||
jcp.hmset((tableName+"_"+dataMap.get(keyField)), dataMap);
|
||||
}
|
||||
jcp.sync();
|
||||
// }catch (Exception e) {
|
||||
// // TODO: handle exception
|
||||
// e.printStackTrace();
|
||||
// }
|
||||
long end=System.currentTimeMillis();
|
||||
jcp.close();
|
||||
jcp=null;
|
||||
logger.info("update maps end, cost:"+(end-start));
|
||||
}
|
||||
/**
|
||||
*
|
||||
* getMapById(获取Id 为xxxx的对象,返回一个Map)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @param tableName
|
||||
* @param id
|
||||
* @return
|
||||
*Map<String,String>
|
||||
* @throws Exception
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
public Map<String,String> getMapById(String tableName,long id) throws Exception{
|
||||
if(cluster==null){
|
||||
cluster=getJedisCluster();
|
||||
}
|
||||
Map<String, String> result=cluster.hgetAll(tableName+"_"+id);
|
||||
return result;
|
||||
}
|
||||
/**
|
||||
*
|
||||
* getMapById(获取Id 为xxxx的对象,返回一个Map)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @param cluster
|
||||
* @param tableName
|
||||
* @param id
|
||||
* @return
|
||||
*Map<String,String>
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
public Map<String,String> getMapById(JedisCluster cluster,String tableName,long id){
|
||||
Map<String, String> result=cluster.hgetAll(tableName+"_"+id);
|
||||
return result;
|
||||
}
|
||||
|
||||
public void saveList(List<String> jsonList,String sql,int expire) throws Exception{
|
||||
if(cluster==null)cluster=getJedisCluster();
|
||||
if(jcp==null)getJedisClusterPipeline();
|
||||
for(String json :jsonList){
|
||||
jcp.rpush(sql, json);
|
||||
}
|
||||
jcp.expire(sql, expire);
|
||||
jcp.sync();
|
||||
jcp.close();
|
||||
jcp=null;
|
||||
}
|
||||
/**
|
||||
*
|
||||
* getSet(获取redis中的一个set的全部成员)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @param key
|
||||
* @return
|
||||
* @throws Exception
|
||||
*Set<String>
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
public Set<String> getSet(String key) throws Exception{
|
||||
if(cluster==null)cluster=getJedisCluster();
|
||||
return cluster.smembers(key);
|
||||
}
|
||||
/**
|
||||
*
|
||||
* saveSet(向一个set中添加成员)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @param setName
|
||||
* @param values
|
||||
* @throws Exception
|
||||
*void
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
public void saveSet(String setName,String ...values) throws Exception{
|
||||
if(cluster==null)cluster=getJedisCluster();
|
||||
if(jcp==null)getJedisClusterPipeline();
|
||||
for(String val:values){
|
||||
jcp.sadd(setName, val);
|
||||
}
|
||||
jcp.sync();
|
||||
jcp.close();
|
||||
jcp=null;
|
||||
}
|
||||
/**
|
||||
*
|
||||
* isExistsInSet(判断一个字符串是否是set的成员,可用于去重,验证唯一性)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @param setName
|
||||
* @param value
|
||||
* @return
|
||||
* @throws Exception
|
||||
*boolean
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
public boolean isExistsInSet(String setName,String value) throws Exception{
|
||||
if(cluster==null)cluster=getJedisCluster();
|
||||
return cluster.sismember(setName, value);
|
||||
}
|
||||
public List<String> getList(String sql,long start,long end) throws Exception{
|
||||
if(cluster==null){
|
||||
cluster=getJedisCluster();
|
||||
}
|
||||
List<String> data=cluster.lrange(sql, start, end);
|
||||
return data;
|
||||
}
|
||||
public void saveString(String key,String value,int expire) throws Exception{
|
||||
logger.info("save String start");
|
||||
long start=System.currentTimeMillis();
|
||||
if(cluster==null){
|
||||
cluster=getJedisCluster();
|
||||
}
|
||||
cluster.set(key, value);
|
||||
cluster.expire(key, expire);
|
||||
long end=System.currentTimeMillis();
|
||||
logger.info("save String end,cost:"+(end-start));
|
||||
}
|
||||
public String getString(String key) throws Exception{
|
||||
if(cluster==null){
|
||||
cluster=getJedisCluster();
|
||||
}
|
||||
return cluster.get(key);
|
||||
}
|
||||
/**
|
||||
*
|
||||
* del(删除key)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @param keys
|
||||
* @throws Exception
|
||||
*void
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
public void del(String... keys) throws Exception{
|
||||
if(cluster==null){
|
||||
cluster=getJedisCluster();
|
||||
}
|
||||
cluster.del(keys);
|
||||
}
|
||||
/**
|
||||
*
|
||||
* clearData(清空数据)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @throws Exception
|
||||
*void
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
public void clearData() throws Exception{
|
||||
if(cluster==null){
|
||||
cluster=getJedisCluster();
|
||||
}
|
||||
logger.info("clear data start");
|
||||
long start=System.currentTimeMillis();
|
||||
Map<String, JedisPool> nodesMap=cluster.getClusterNodes();
|
||||
if(nodesMap==null){
|
||||
logger.error("empty redis nodes");
|
||||
return;
|
||||
}
|
||||
for(Entry<String, JedisPool> jedis:nodesMap.entrySet()){
|
||||
Jedis client=jedis.getValue().getResource();
|
||||
// only master node can flush db
|
||||
if(client.clusterInfo().indexOf("master")!=-1)
|
||||
client.flushAll();
|
||||
client.close();
|
||||
}
|
||||
long time=System.currentTimeMillis()-start;
|
||||
logger.info("clear data finish cost: "+time);
|
||||
}
|
||||
/**
|
||||
*
|
||||
* exists(键是否存在)
|
||||
* (这里描述这个方法适用条件 – 可选)
|
||||
* @param key
|
||||
* @return
|
||||
* @throws Exception
|
||||
*boolean
|
||||
* @exception
|
||||
* @since 1.0.0
|
||||
*/
|
||||
public boolean exists(String key) throws Exception{
|
||||
if(cluster==null){
|
||||
cluster=getJedisCluster();
|
||||
}
|
||||
return cluster.exists(key);
|
||||
}
|
||||
public static void main(String[] args) {
|
||||
|
||||
}
|
||||
}
|
||||
39
src/main/java/com/nis/util/redis/SaveRedisListThread.java
Normal file
39
src/main/java/com/nis/util/redis/SaveRedisListThread.java
Normal file
@@ -0,0 +1,39 @@
|
||||
package com.nis.util.redis;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.nis.web.service.SpringContextHolder;
|
||||
|
||||
public class SaveRedisListThread extends Thread {
|
||||
private static RedisDao redisDao = SpringContextHolder.getBean(RedisDao.class);
|
||||
protected final Logger logger = Logger.getLogger(this.getClass());
|
||||
|
||||
private String key;
|
||||
private List<String> value;
|
||||
private int expire;
|
||||
|
||||
public SaveRedisListThread(String key, List<String> value, int expire) {
|
||||
super(SaveRedisListThread.class.getSimpleName());
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
this.expire = expire;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
redisDao.saveList(value, key, expire);
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage());
|
||||
try {
|
||||
if (redisDao.exists(key)) {
|
||||
redisDao.del(key);
|
||||
}
|
||||
} catch (Exception e1) {
|
||||
logger.error(e1.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
42
src/main/java/com/nis/util/redis/SaveRedisThread.java
Normal file
42
src/main/java/com/nis/util/redis/SaveRedisThread.java
Normal file
@@ -0,0 +1,42 @@
|
||||
package com.nis.util.redis;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.nis.util.Constants;
|
||||
import com.nis.util.JsonMapper;
|
||||
import com.nis.util.redis.RedisDao;
|
||||
import com.nis.web.service.SpringContextHolder;
|
||||
|
||||
public class SaveRedisThread extends Thread {
|
||||
private static RedisDao redisDao = SpringContextHolder.getBean(RedisDao.class);
|
||||
protected final Logger logger = Logger.getLogger(this.getClass());
|
||||
|
||||
private String key;
|
||||
private Object value;
|
||||
private int expire;
|
||||
|
||||
public SaveRedisThread( String key, Object value,int expire){
|
||||
super(SaveRedisThread.class.getSimpleName());
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
this.expire = expire;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
logger.info("saveRedis开始--"+System.currentTimeMillis());
|
||||
redisDao.saveString(key, JsonMapper.toJsonString(value), expire);
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage());
|
||||
try {
|
||||
if(redisDao.exists(key)){
|
||||
redisDao.del(key);
|
||||
}
|
||||
} catch (Exception e1) {
|
||||
logger.error(e1.getMessage());
|
||||
}
|
||||
}finally {
|
||||
logger.info("saveRedis结束--"+System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user