完成全量同步配置功能,添加记录各个业务与配置关系的记录

This commit is contained in:
RenKaiGe
2019-04-01 11:42:22 +08:00
parent 19d2236a7c
commit 06cd930380
10 changed files with 348 additions and 106 deletions

View File

@@ -48,6 +48,23 @@ public class JedisClusterUtils {
}
return value;
}
public static void del(String key) {
JedisCluster jedis = null;
try {
jedis = getResource();
if (jedis.exists(key)) {
jedis.del(key);
}
} catch (Exception e) {
throw new ServiceRuntimeException("从redis-cluster中删除" + key + "失败",
RestBusinessCode.KeyNotExistsInRedis.getValue());
} finally {
returnResource(jedis);
}
}
/**
* 获取缓存

View File

@@ -1,7 +1,9 @@
package com.nis.util;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -12,7 +14,7 @@ import com.nis.restful.ServiceRuntimeException;
import com.nis.web.service.SpringContextHolder;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisSentinelPool;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisException;
public class JedisUtils {
@@ -42,6 +44,47 @@ public class JedisUtils {
}
return value;
}
/**
* 获取缓存
*
* @param key 键
* @return 值
*/
public static Set<String> keys(String key, int redisDb) {
Set<String> value = null;
Jedis jedis = null;
try {
jedis = getResource(redisDb);
value = jedis.keys(key);
} catch (Exception e) {
throw new ServiceRuntimeException("" + redisDb + "号redisDB中获取" + key + "对应的值失败",
RestBusinessCode.KeyNotExistsInRedis.getValue());
} finally {
returnResource(jedis);
}
return value;
}
public static Set<String> smembers(String key, int redisDb) {
Set<String> value = new HashSet<String>();
Jedis jedis = null;
try {
jedis = getResource(redisDb);
value = jedis.smembers(key);
} catch (Exception e) {
throw new ServiceRuntimeException("" + redisDb + "号redisDB中获取" + key + "对应的值失败",
RestBusinessCode.KeyNotExistsInRedis.getValue());
} finally {
returnResource(jedis);
}
return value;
}
/**
* 获取缓存
@@ -220,8 +263,8 @@ public class JedisUtils {
* @throws JedisException
*/
public static Jedis getResource(int redisDb) throws JedisException {
JedisSentinelPool jedisSentinelPool = SpringContextHolder.getBean(JedisSentinelPool.class);
// JedisPool jedisSentinelPool = SpringContextHolder.getBean(JedisPool.class);
// JedisSentinelPool jedisSentinelPool = SpringContextHolder.getBean(JedisSentinelPool.class);
JedisPool jedisSentinelPool = SpringContextHolder.getBean(JedisPool.class);
if (jedisSentinelPool == null) {
throw new ServiceRuntimeException("redis连接池为空,请联系管理员检查程序",

View File

@@ -140,9 +140,9 @@ public class ConfigSourcesController extends BaseRestController {
checkOpAction(thread, System.currentTimeMillis() - start, configSourceStartStop.getOpAction(),
Constants.OPACTION_PUT);
configSourcesService.updateConfigSources(thread, start, configSourceStartStop.getConfigCompileStartStopList(),
configSourceStartStop.getOpTime(), sb,true);
configSourcesService.updateConfigSources(thread, start,
configSourceStartStop.getConfigCompileStartStopList(), configSourceStartStop.getOpTime(), sb,
true);
}
} catch (Exception e) {
@@ -192,7 +192,7 @@ public class ConfigSourcesController extends BaseRestController {
}
checkOpAction(thread, System.currentTimeMillis() - start, opAction, 2);
configSourcesService.updateConfigSources(thread, start, configSource.getConfigCompileList(),
configSource.getOpTime(), sb,false);
configSource.getOpTime(), sb, false);
} else {
throw new RestServiceException("Maat规则不能为空" + sb.toString(),
@@ -622,7 +622,6 @@ public class ConfigSourcesController extends BaseRestController {
"MAAT规则分组复用域配置删除成功" + sb.toString(), Constants.IS_DEBUG ? groupReuseSource : null);
}
@RequestMapping(value = "/cfg/v1/getAllKVByCompileId", method = RequestMethod.GET)
@ApiOperation(value = "根据配置id获取对应的编译,组,域等信息", httpMethod = "GET", response = Map.class, notes = "根据配置id获取对应的编译,组,域等信息")
@ApiParam(value = "配置id", name = "getAllKVByCompileId", required = true)
@@ -647,7 +646,7 @@ public class ConfigSourcesController extends BaseRestController {
@RequestMapping(value = "/cfg_batch/v1/configSources", method = RequestMethod.POST, produces = org.springframework.http.MediaType.APPLICATION_JSON_VALUE)
@ApiOperation(value = "全量同步配置接收接口", httpMethod = "POST", response = Map.class, notes = "接收全量同步配置")
public Map receiveConfigSources(@RequestBody String jsonString, HttpServletRequest request,
HttpServletResponse response) {
HttpServletResponse response) {
long start = System.currentTimeMillis();
AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, null);
thread.setContent("全量同步不记录请求内容");
@@ -691,16 +690,18 @@ public class ConfigSourcesController extends BaseRestController {
logger.info("-----------开始存储到json格式数据------->>configType:" + configType + ",serviceId:" + serviceId
+ ",configTable:" + configTable + ",lastCompletedTag:" + lastCompletedTag);
if (jsonString != null && !jsonString.trim().equals("{}")) {// 如果最后的service没有配置不论是maat类还是回调类配置都会传{}+lastCompletedTag(finished)来结束数据传输
if (jsonString != null) {
String key = null;
if ("1".equals(configType)) {
key = "MAAT";
} else {
key = "UNMAAT";
}
key = key + "-" + serviceId + "-" + UUID.randomUUID();
configSourcesService.setRedisClusterKey(key, jsonString);
configSourcesService.setAllServiceKey(key);
key = key + "_" + serviceId + "_" + UUID.randomUUID();
if (!jsonString.trim().equals("{}")) {// 如果service下没有配置不论是maat类还是回调类配置都会传{},有数据的话{}中有内容
configSourcesService.setRedisClusterKey(key, jsonString);
}
configSourcesService.setAllServiceKey(key);//无论service下有没有数据,都会记录当前需要同步的业务,避免后台有数据,但是界面没有数据的情况,这种情况下可以发送空业务来删除后台的数据
}
if (!StringUtil.isEmpty(lastCompletedTag) && lastCompletedTag.trim().toLowerCase().equals("finished")) {
// 设置配置同步状态为接收配置完成
@@ -738,7 +739,7 @@ public class ConfigSourcesController extends BaseRestController {
}
logger.info("-----------配置同步指令下发:" + new Date());
// 设置配置同步状态为开始
//在下次开始同步之前把上次记录的key删除
// 在下次开始同步之前把上次记录的key删除
if (JedisClusterUtils.exists("allConfigSyncKey")) {
JedisClusterUtils.getResource().del("allConfigSyncKey");
}
@@ -821,14 +822,13 @@ public class ConfigSourcesController extends BaseRestController {
return false;
}
@RequestMapping(value = "/cfg/v1/getConfigCount", method = RequestMethod.GET)
@ApiOperation(value = "获取有效无效的配置个数", httpMethod = "GET", response = Map.class, notes = "获取有效无效的配置个数")
public Map getConfigCount(HttpServletRequest request, HttpServletResponse response) {
@RequestMapping(value = "/cfg/v1/getConfigByService", method = RequestMethod.GET)
@ApiOperation(value = "获取有效无效的配置信息", httpMethod = "GET", response = Map.class, notes = "获取有效无效的配置信息")
public Map getConfigCount(@RequestParam int service,HttpServletRequest request, HttpServletResponse response) {
long start = System.currentTimeMillis();
AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_GET, request, null);
Map<String, Integer> allConfigByScan = configSourcesService.getAllConfigByScan();
allConfigByScan.putAll(configSourcesService.getAllConfig());
return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "获取有效无效的配置个数成功",
Map<String, Integer> allConfigByScan = configSourcesService.getConfigByService(service);
return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "获取有效无效的配置信息成功",
// configSourcesService.getAllConfig());
allConfigByScan);
}

View File

@@ -42,7 +42,7 @@ public class ConfigJedisServiceimpl implements ConfigRedisService {
// 用于在实时统计配置后面添加时间,方便读取入库时间
private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
int serviceAndConfigRealDB = Configurations.getIntProperty("serviceAndConfigRealDB", 13);
public boolean saveUnMaatConfig(Map<Integer, List<Map<String, String>>> configMap) {
if (configMap != null && configMap.size() > 0) {
int count = 0;
@@ -126,7 +126,7 @@ public class ConfigJedisServiceimpl implements ConfigRedisService {
}
}
updateMaatInfo(expressionList, maatKey, transaction, maatVersion, redisDBIndex,
false, false, serviceStr);
false, true, serviceStr);
} else {
throw new ServiceRuntimeException(
"无法从applicationConfig-maatRedis.xml配置文件中获取service为" + service
@@ -474,7 +474,7 @@ public class ConfigJedisServiceimpl implements ConfigRedisService {
setConfig(maatConfig, maatXmlConfig, maatVersion, service, transaction,
redisDBIndex);
}
transaction.select(redisDBIndex);
transaction.incrBy("MAAT_VERSION", 1l);
logger.info("向{}号redis数据库更新了MAAT_VERSION,更新后版本是{}", redisDBIndex,
Integer.valueOf(maatVersionStr) + 1);
@@ -837,10 +837,10 @@ public class ConfigJedisServiceimpl implements ConfigRedisService {
if (isInvalid) {// 删除
updateMaatInfo(expressionList, maatKey, transaction, maatVersion,
redisDBIndex, true, false, service + "");
redisDBIndex, true, true, service + "");
} else {// 新增
updateMaatInfo(expressionList, maatKey, transaction, maatVersion,
redisDBIndex, false, false, service + "");
redisDBIndex, false, true, service + "");
}
List<MaatXmlSeq> seqList = maatXmlConfig.getSequenceList();
@@ -2267,17 +2267,19 @@ public class ConfigJedisServiceimpl implements ConfigRedisService {
}
}
}
transaction.select(13);
transaction.select(serviceAndConfigRealDB);
if (isSave) {// 只记录maat类的编译和回调类
if (idDel) {
String key = "OBSOLETE_" + service;
String key = "OBSOLETE_RULE:" + service;
transaction.sadd(key, maatKey);// 失效里面加一个,生效里面删除一个
transaction.srem(key.replace("OBSOLETE", "EFFECTIVE"), maatKey);
transaction.srem(key.replace("OBSOLETE_RULE", "EFFECTIVE_RULE"), maatKey.replace("OBSOLETE_RULE", "EFFECTIVE_RULE"));
} else {
String key = "EFFECTIVE" + service;
transaction.sadd(key, maatKey);// 生效里面加一个
String key = "EFFECTIVE_RULE:" + service;
transaction.sadd(key, maatKey);// 生效里面加一个,失效里面删一个
transaction.srem(key.replace("EFFECTIVE_RULE", "OBSOLETE_RULE"), maatKey.replace("EFFECTIVE_RULE", "OBSOLETE_RULE"));
}
}
transaction.select(redisDBIndex);
}
public Map<Integer, Map<String, String>> getAllKVByCompileId(Long id) {

View File

@@ -47,6 +47,7 @@ import com.nis.util.BasicProvingUtil;
import com.nis.util.CamelUnderlineUtil;
import com.nis.util.CompileVal;
import com.nis.util.Constants;
import com.nis.util.ExceptionUtil;
import com.nis.util.GroupReuseVal;
import com.nis.util.JedisClusterUtils;
import com.nis.util.JedisUtils;
@@ -54,6 +55,9 @@ import com.nis.util.JsonMapper;
import com.nis.util.ReadCommSourceXmlUtil;
import com.nis.util.ServiceAndRDBIndexReal;
import com.zdjizhi.utils.StringUtil;
import it.unimi.dsi.fastutil.Hash;
import com.nis.web.service.AuditLogThread;
import com.nis.web.service.BaseService;
@@ -270,7 +274,7 @@ public class ConfigSourcesService extends BaseService {
logger.info("---------------调用Redis maat配置新增接口---------------------");
long end = System.currentTimeMillis();
logger.warn("执行ConfigSourcesService.saveMaatConfig用时{}毫秒",end-currentTimeMillis);
logger.warn("执行ConfigSourcesService.saveMaatConfig用时{}毫秒", end - currentTimeMillis);
configRedisService.saveMaatConfig(configMap);
}
@@ -328,6 +332,7 @@ public class ConfigSourcesService extends BaseService {
}
}
}
/**
* @Description:
* @author(zdx) @date 2018年12月3日 下午6:48:32
@@ -499,7 +504,7 @@ public class ConfigSourcesService extends BaseService {
}
} else {
throw new ServiceRuntimeException("service为"+service+"的业务写入数据库序号映射关系不存在",
throw new ServiceRuntimeException("service为" + service + "的业务写入数据库序号映射关系不存在",
RestBusinessCode.ServiceNoFoundDBIndex.getValue());
}
}
@@ -569,7 +574,7 @@ public class ConfigSourcesService extends BaseService {
if (!isConfigStartStop) {
// 所有的都删除成功返回true
if (!configRedisService.delMaatConfig(compileMap,false)) {
if (!configRedisService.delMaatConfig(compileMap, false)) {
throw new ServiceRuntimeException("取消MAAT配置时出现异常具体原因不详请联系管理员",
RestBusinessCode.service_runtime_error.getValue());
}
@@ -705,7 +710,7 @@ public class ConfigSourcesService extends BaseService {
maatTableName.substring(maatTableName.lastIndexOf("_") + 1));
dstStr = dstPath.replace("{fileName}", dstStr.substring(dstStr.lastIndexOf("/") + 1));
} else if ("file_id".equals(commonSourceFieldCfg.getDstName())) {
//dstStr = dstStr.substring(dstStr.indexOf("group"));
// dstStr = dstStr.substring(dstStr.indexOf("group"));
}
}
switch (commonSourceFieldCfg.getFieldType()) {
@@ -832,7 +837,6 @@ public class ConfigSourcesService extends BaseService {
configRedisService.saveUnMaatConfig(configMap);
}
public void saveCommonSources(String jsonString) throws Exception {
JsonArray jsonObjectList = null;
try {
@@ -915,7 +919,7 @@ public class ConfigSourcesService extends BaseService {
}
}
} else {
throw new ServiceRuntimeException("service为"+service+"的业务写入数据库序号映射关系不存在",
throw new ServiceRuntimeException("service为" + service + "的业务写入数据库序号映射关系不存在",
RestBusinessCode.ServiceNoFoundDBIndex.getValue());
}
@@ -923,7 +927,6 @@ public class ConfigSourcesService extends BaseService {
configRedisService.saveUnMaatConfig(configMap);
}
/**
*
* @Description:回调类配置状态更新(停/启用)
@@ -946,7 +949,7 @@ public class ConfigSourcesService extends BaseService {
throw new RestServiceException(RestBusinessCode.CBParamFormateError.getErrorReason() + "," + e.getMessage(),
RestBusinessCode.CBParamFormateError.getValue());
}
if (jsonObjectList!=null&&jsonObjectList.size()>Constants.MAX_LIST_SIZE) {
if (jsonObjectList != null && jsonObjectList.size() > Constants.MAX_LIST_SIZE) {
thread.setSaveContentFlag(false);
}
// <service,cfgIdList>
@@ -1180,18 +1183,21 @@ public class ConfigSourcesService extends BaseService {
} else {
maatConfig.getIpRegionMapList().addAll(dstMapList);
}
if ((maatConfig.getStrRegionMapList()!=null&&maatConfig.getStrRegionMapList().size()>Constants.MAX_LIST_SIZE)
||(maatConfig.getStrStrRegionMapList()!=null&&maatConfig.getStrStrRegionMapList().size()>Constants.MAX_LIST_SIZE)
||(maatConfig.getIpRegionMapList()!=null&&maatConfig.getIpRegionMapList().size()>Constants.MAX_LIST_SIZE)
||(maatConfig.getNumRegionMapList()!=null&&maatConfig.getNumRegionMapList().size()>Constants.MAX_LIST_SIZE)) {
if ((maatConfig.getStrRegionMapList() != null
&& maatConfig.getStrRegionMapList().size() > Constants.MAX_LIST_SIZE)
|| (maatConfig.getStrStrRegionMapList() != null
&& maatConfig.getStrStrRegionMapList().size() > Constants.MAX_LIST_SIZE)
|| (maatConfig.getIpRegionMapList() != null
&& maatConfig.getIpRegionMapList().size() > Constants.MAX_LIST_SIZE)
|| (maatConfig.getNumRegionMapList() != null
&& maatConfig.getNumRegionMapList().size() > Constants.MAX_LIST_SIZE)) {
thread.setSaveContentFlag(false);
}
//maatConfig.setService(groupReuse.getService());
// maatConfig.setService(groupReuse.getService());
list.add(maatConfig);
}
// 调用接口入redis
logger.info("---------------调用Redis 分组复用配置新增接口---------------------");
configRedisService.saveGroupReuseConfig(list);
@@ -1378,39 +1384,64 @@ public class ConfigSourcesService extends BaseService {
*/
public void setAllServiceKey(String value) {
JedisCluster resource = JedisClusterUtils.getResource();
resource.append("allConfigSyncKey", value + ";");
resource.sadd("allConfigSyncKey", value);
}
public Map<String, Integer> getAllConfig() {
Jedis resource = JedisUtils.getResource(0);
Set<String> effectiveSet = new HashSet<>();
Set<String> obsoleteSet = new HashSet<>();
for (int i = 2; i < 6; i++) {
resource.select(i);
effectiveSet.addAll(resource.keys("EFFECTIVE_RULE:*_COMPILE*"));
obsoleteSet.addAll(resource.keys("OBSOLETE_RULE:*_COMPILE*"));
public Map<String, Integer> getConfigByService(Integer service) {
Map map = new HashMap<>();
try {
Jedis resource = JedisUtils.getResource(13);
Map<Integer, List<String>> effectiveMap = null;
Map<Integer, List<String>> obsuleteMap = null;
if (service == -1) {
effectiveMap = getAllServiceAndConfigMap(resource, "EFFECTIVE_RULE:*");
obsuleteMap = getAllServiceAndConfigMap(resource, "OBSOLETE_RULE:*");
} else {
effectiveMap = getServiceAndConfigMap(resource, "EFFECTIVE_RULE:"+service);
obsuleteMap = getServiceAndConfigMap(resource, "OBSOLETE_RULE:"+service);
}
map.put("effective", effectiveMap);
int effectiveCount=0;
for (Integer key : effectiveMap.keySet()) {
effectiveCount+=effectiveMap.get(key).size();
}
map.put("effectiveCount", effectiveCount);
map.put("obsolete", obsuleteMap);
int obsoleteCount=0;
for (Integer key : obsuleteMap.keySet()) {
obsoleteCount+=obsuleteMap.get(key).size();
}
map.put("obsoleteCount",obsoleteCount);
JedisUtils.returnBrokenResource(resource);
} catch (Exception e) {
logger.error("获取有效无效的配置信息失败,失败原因{}",ExceptionUtil.getExceptionMsg(e));
throw new RestServiceException("获取有效无效的配置信息失败", RestBusinessCode.syncStatusFailed.getValue());
}
Map<String, Integer> map = new HashMap<>();
map.put("effectiveMaatKeys", effectiveSet.size());
map.put("obsoleteMaatKeys", obsoleteSet.size());
JedisUtils.returnBrokenResource(resource);
return map;
}
public Map<String, Integer> getAllConfigByScan() {
Jedis resource = JedisUtils.getResource(0);
Set<String> effectiveSet = new HashSet<>();
Set<String> obsoleteSet = new HashSet<>();
for (int i = 2; i < 6; i++) {
resource.select(i);
effectiveSet.addAll(getKeyByScan("EFFECTIVE_RULE:*_COMPILE*", resource));
obsoleteSet.addAll(getKeyByScan("OBSOLETE_RULE:*_COMPILE*", resource));
private Map<Integer, List<String>> getAllServiceAndConfigMap(Jedis resource, String keys) {
Map<Integer, List<String>> effectiveMap = new HashMap<>();
Set<String> effectiveKeys = resource.keys(keys);
for (String effectiveKey : effectiveKeys) {
effectiveMap.putAll(getServiceAndConfigMap(resource, effectiveKey));
}
Map<String, Integer> map = new HashMap<>();
map.put("effectiveMaat", effectiveSet.size());
map.put("obsoleteMaat", obsoleteSet.size());
JedisUtils.returnBrokenResource(resource);
return map;
return effectiveMap;
}
private Map<Integer, List<String>> getServiceAndConfigMap(Jedis resource, String key) {
Map<Integer, List<String>> effectiveMap = new HashMap<>();
List<String> list = new ArrayList<>();
Set<String> smembers = resource.smembers(key);
if (smembers.size() > 0) {
for (String configKey : smembers) {
list.add(configKey.substring(configKey.indexOf(",") + 1));
// list.add(configKey);
}
}
effectiveMap.put(Integer.parseInt(key.substring(key.indexOf(":") + 1)), list);
return effectiveMap;
}
public List<String> getKeyByScan(String pattern, Jedis resource) {

View File

@@ -2,8 +2,11 @@ package com.nis.web.task;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.codec.digest.DigestUtils;
@@ -17,15 +20,20 @@ import org.springframework.stereotype.Component;
import com.nis.domain.restful.ConfigSource;
import com.nis.restful.RestBusinessCode;
import com.nis.restful.ServiceRuntimeException;
import com.nis.util.Configurations;
import com.nis.util.Constants;
import com.nis.util.ExceptionUtil;
import com.nis.util.JedisUtils;
import com.nis.util.ServiceAndRDBIndexReal;
import com.nis.web.service.AuditLogThread;
import com.nis.web.service.SpringContextHolder;
import com.nis.web.service.restful.ConfigSourcesService;
import com.zdjizhi.utils.JsonMapper;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.Response;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisException;
@Component
@@ -36,9 +44,16 @@ public class SyncAllConfigTask {
// private JedisCluster jedisCluster;
@Autowired
protected ConfigSourcesService configSourcesService;
// 存放编译,分组,域配置id关系的redis数据库编号
int idRelaRedisDBIndex = Configurations.getIntProperty("idRelaRedisDBIndex", 15);
// 存储各个业务下的配置信息
int serviceAndConfigRealDB = Configurations.getIntProperty("serviceAndConfigRealDB", 13);
// 存储实时统计数据的redisdb
int redisStatisticsRealDBIndex = Configurations.getIntProperty("redisStatisticsRealDBIndex", 14);
/**
* 每次使用获取新连接,避免集群某些机器宕机后影响连接使用,注意需要在applicationContext-redis.xml中修改jedisCluster的scope为 prototype
* 每次使用获取新连接,避免集群某些机器宕机后影响连接使用,注意需要在applicationContext-redis.xml中修改jedisCluster的scope为
* prototype
*
* @return
* @throws JedisException
@@ -57,24 +72,27 @@ public class SyncAllConfigTask {
public void syncRedisToCluster() {
String requestId = UUID.randomUUID().toString();
JedisCluster jedisCluster = getResource();
Map<Integer, Map<String, String>> map = null;
// Map<Integer, Map<String, String>> map = null;
Jedis resource = JedisUtils.getResource(0);
Transaction transaction = resource.multi();
try {
if (lock(jedisCluster, requestId)) {// 避免集群环境下同一秒钟所有的机器都执行这个定时任务
// if (true) {// 避免集群环境下同一秒钟所有的机器都执行这个定时任务
String allConfigSyncStatus = jedisCluster.get("allConfigSyncStatus");
if (allConfigSyncStatus != null) {// 配置初始化完成
if (allConfigSyncStatus.trim().equals("1")) {
map = getAllSeqAndVersion();
// map = getAllSeqAndVersion();
// 设置配置同步状态为正在进行
configSourcesService.setAllConfigSyncStatus("2");
logger.warn("开始执行配置全量导入操作,将allConfigSyncStatus值设置为2正在进行导入操作");
Map<String, String> maatMap = new HashMap<>();
Map<String, String> unMaatMap = new HashMap<>();
String allConfigSyncKey = jedisCluster.get("allConfigSyncKey");
if (allConfigSyncKey != null && !allConfigSyncKey.trim().equals("")) {
String[] split = org.apache.commons.lang.StringUtils.split(allConfigSyncKey, ";");
for (String key : split) {
String val = jedisCluster.get(key);
Set<String> smembers = jedisCluster.smembers("allConfigSyncKey");
Set<Integer> serviceSet = new HashSet<>();
for (String key : smembers) {
serviceSet.add(Integer.parseInt(key.substring(key.indexOf("_") + 1, key.lastIndexOf("_"))));
String val = jedisCluster.get(key);
if (val != null && !val.trim().equals("")) {
String md5 = DigestUtils.md5Hex(val);
if (key.startsWith("UNMAAT")) {
unMaatMap.put(md5, val);
@@ -82,20 +100,27 @@ public class SyncAllConfigTask {
maatMap.put(md5, val);
}
}
flushRedisDb();
addConfigToRedis(maatMap, true);
addConfigToRedis(unMaatMap, false);
logger.warn("执行配置全量导入成功,将allConfigSyncStatus值设置为3,导入成功");
// 设置配置同步状态为写redis成功
configSourcesService.setAllConfigSyncStatus("3");
// 删除存储全量配置key的关系key
jedisCluster.del("allConfigSyncKey");
for (String key : split) {
jedisCluster.del(key);
}
logger.warn("删除allConfigSyncKey,及其中的内容成功");
}
Set<Integer> flushRedisDb = flushRedisDb(serviceSet, transaction);
addConfigToRedis(maatMap, true);
addConfigToRedis(unMaatMap, false);
logger.warn("执行配置全量导入成功,将allConfigSyncStatus值设置为3,导入成功");
// 设置配置同步状态为写redis成功
configSourcesService.setAllConfigSyncStatus("3");
// 删除存储全量配置key的关系key
jedisCluster.del("allConfigSyncKey");
for (String key : smembers) {
jedisCluster.del(key);
}
logger.warn("删除allConfigSyncKey,及其中的内容成功");
for (Integer db : flushRedisDb) {
if (db != idRelaRedisDBIndex && db != serviceAndConfigRealDB) {// 13和15不处理maat_version
JedisUtils.incrBy("MAAT_VERSION", 2, db);
}
}
} else {
logger.info(
"集群中allConfigSyncStatus的值是{}[开始:0(界面下发同步状态),初始化:1(配置接收完成状态),进行中:2(服务写redis),已完成:3(服务写redis完毕),失败:-1(服务写redis失败)],暂不执行全量配置同步操作",
@@ -110,15 +135,19 @@ public class SyncAllConfigTask {
}
} catch (Exception e) {
logger.error("同步界面配置到redis中失败,失败原因:{}", ExceptionUtil.getExceptionMsg(e));
transaction.discard();
// 设置配置同步状态为写redis失败
configSourcesService.setAllConfigSyncStatus("-1");
jedisCluster.del("allConfigSyncKey");
logger.error("执行配置全量导入失败,将allConfigSyncStatus值设置为-1,导入失败");
} finally {
// 释放连接到连接池
JedisUtils.returnResource(resource);
unlock(jedisCluster, requestId);
closeConn(jedisCluster);
if (map != null && map.size() > 0) {
recoverRedisData(map);
}
// if (map != null && map.size() > 0) {
// recoverRedisData(map);
// }
}
}
@@ -263,9 +292,126 @@ public class SyncAllConfigTask {
/**
* 清空配置redis库,不清空0号库
*/
private void flushRedisDb() {// 不清空0号库
for (int i = 1; i < 16; i++) {
JedisUtils.getResource(i).flushDB();
private Set<Integer> flushRedisDb(Set<Integer> serviceSet, Transaction transaction) {// 不清空0号库
// 记录每个库中需要删除的key
Map<Integer, Set<String>> delKeyMap = new HashMap<>();
delKeyMap.put(idRelaRedisDBIndex, new HashSet<String>());// 删除编译,组,域的关系
delKeyMap.put(redisStatisticsRealDBIndex, new HashSet<String>());// 删除实时统计数据
delKeyMap.put(serviceAndConfigRealDB, new HashSet<String>());// 删除业务和配置的对应关系
for (Integer service : serviceSet) {
if (service != -1) {
String key = "EFFECTIVE_RULE:" + service;
Set<String> smembers = JedisUtils.smembers(key, serviceAndConfigRealDB);
if (smembers.size() > 0) {
delKeyMap.get(serviceAndConfigRealDB).add(key);// 删除记录的配置关联关系的key
List<Integer> list = ServiceAndRDBIndexReal.getServiceDBIndexmap().get(service);
if (ServiceAndRDBIndexReal.getUnMaatTableName(service) != null) {
for (Integer db : list) {
if (delKeyMap.containsKey(db)) {
for (String delKey : smembers) {
delKeyMap.get(db).add(delKey);
}
} else {
Set<String> keySet = new HashSet<>();
for (String delKey : smembers) {
keySet.add(delKey);
}
delKeyMap.put(db, keySet);
}
}
} else {
transaction.select(idRelaRedisDBIndex);
String groupTableName = ServiceAndRDBIndexReal.getMaatTableName(service, 11, null);
Set<String> groupIdSet = new HashSet<String>();// 获取当前业务下所有的groupId
for (String delKey : smembers) {// 获取所有的编译id
String compileId = delKey.substring(delKey.indexOf(",") + 1);
String compileGroupKey = "COMPILEGROUP:" + compileId;
delKeyMap.get(idRelaRedisDBIndex).add(compileGroupKey);
delKeyMap.get(redisStatisticsRealDBIndex).add(delKey);
String groupCompileKeyStr = JedisUtils.get(compileGroupKey, idRelaRedisDBIndex);
if (groupCompileKeyStr != null) {
String[] groupCompileSplit = org.apache.commons.lang.StringUtils
.split(groupCompileKeyStr, ";");
for (String groupCompileKey : groupCompileSplit) {
String groupCompileStr = groupCompileKey.substring(0, groupCompileKey.indexOf("-"));
delKeyMap.get(idRelaRedisDBIndex).add(groupCompileStr);
String groupId = groupCompileStr.replace("GROUPCOMPILE:", "");
groupIdSet.add(groupId);
String dbStr = groupCompileKey.substring(groupCompileKey.indexOf("-") + 1);
String[] split = org.apache.commons.lang.StringUtils.split(dbStr, ",");
for (String dbNumStr : split) {
int db = Integer.parseInt(dbNumStr.trim());
if (delKeyMap.containsKey(db)) {
delKeyMap.get(db).add(
"EFFECTIVE_RULE:" + groupTableName + "," + groupId + compileId);
delKeyMap.get(db).add(delKey);
} else {
Set<String> keySet = new HashSet<>();
keySet.add("EFFECTIVE_RULE:" + groupTableName + "," + groupId + compileId);
keySet.add(delKey);
delKeyMap.put(db, keySet);
}
}
}
}
}
for (String groupId : groupIdSet) {
getDelRegionKey(delKeyMap, transaction, "COMMONGROUPREGION:" + groupId);
getDelRegionKey(delKeyMap, transaction, "GROUPREGION:" + groupId);
}
}
// } else {
// throw new ServiceRuntimeException(
// "全量同步时:无法从" + serviceAndConfigRealDB + "号库中获取service=" + service + "对应的规则,请检查业务类型是否正确",
// RestBusinessCode.NotFoundRedisRule.getValue());
}
} else {
Set<String> keys = JedisUtils.keys("EFFECTIVE_RULE:*", idRelaRedisDBIndex);
if(keys!=null&&keys.size()>0) {
delKeyMap.get(idRelaRedisDBIndex).addAll(keys);
}
}
}
for (Integer db : delKeyMap.keySet()) {
transaction.select(db);
for (String delKey : delKeyMap.get(db)) {
transaction.del(delKey);
}
}
transaction.exec();
return delKeyMap.keySet();// 返回哪个db删除了配置,在同步完配置后让这些db中的maat_version+2
}
/**
* 获取需要删除的域信息的key
*
* @param delKeyMap
* @param groupRegionKey
*/
private void getDelRegionKey(Map<Integer, Set<String>> delKeyMap, Transaction transaction, String groupRegionKey) {
String commonGroupRegionStr = JedisUtils.get(groupRegionKey, idRelaRedisDBIndex);
if (commonGroupRegionStr != null) {
delKeyMap.get(idRelaRedisDBIndex).add(groupRegionKey);
String[] groupRegion = org.apache.commons.lang.StringUtils.split(commonGroupRegionStr, ";");
for (String groupRegionStr : groupRegion) {
String regionStr = groupRegionStr.substring(0, groupRegionStr.indexOf("-"));
String regionDbStr = groupRegionStr.substring(groupRegionStr.indexOf("-") + 1);
String[] regionDbSplit = org.apache.commons.lang.StringUtils.split(regionDbStr, ",");
for (String dbNumStr : regionDbSplit) {
int db = Integer.parseInt(dbNumStr.trim());
if (delKeyMap.containsKey(db)) {
delKeyMap.get(db).add(regionStr);
} else {
Set<String> keySet = new HashSet<>();
keySet.add(regionStr);
delKeyMap.put(db, keySet);
}
}
}
transaction.del(groupRegionKey);
}
}

View File

@@ -72,7 +72,7 @@ public class SyncRedisToCluster {
}
}
// @Scheduled(cron = "${syncRedisToClusterCron}")
@Scheduled(cron = "${syncRedisToClusterCron}")
public void syncRedisToCluster() {
JedisCluster jedisCluster = getResource();
String requestId = UUID.randomUUID().toString();

View File

@@ -50,7 +50,7 @@
<!-- </bean> -->
<!-- redis集群 -->
<bean id="jedisCluster" class="redis.clients.jedis.JedisCluster">
<bean id="jedisCluster" class="redis.clients.jedis.JedisCluster" scope="prototype">
<constructor-arg index="0">
<set>
<bean class="redis.clients.jedis.HostAndPort">

View File

@@ -105,11 +105,11 @@ jdbc.clickhouse.password=jOA3tbfJiJlPci6XUHIbVg==
#本地的clickhouse地址,新增的TBS_ODS_NTC_CONN_RECORD_LOG_LOCAL在本地
jdbc.ckLocal.url=jdbc:clickhouse://10.0.8.14:8123/k18_ods?socket_timeout=90000
jdbc.ckLocal.url=jdbc:clickhouse://192.168.10.192:8123/k18_ods?socket_timeout=90000
jdbc.ckLocal.username=default
#实际密码ceiec2018
jdbc.ckLocal.key=p8yBsnjQ2S4qT0XeSTi7lQ==
jdbc.ckLocal.password=obYXo/qhb8fDaQbTpX9slA==
jdbc.ckLocal.key=aUkjs+fcwf6p4rDqHiC+ng==
jdbc.ckLocal.password=jOA3tbfJiJlPci6XUHIbVg==
@@ -176,14 +176,14 @@ minio_bucketName=maat-redis
#华严
#redis.host=192.168.11.243
#元辰鑫外网-开发环境
redis.host=192.168.10.12
redis.host=192.168.10.204
#元辰鑫外网
#redis.host=192.168.10.205
#亦庄测试环境
#redis.host=192.168.10.215
#亦庄演示环境
#redis.host=10.3.34.1
redis.port=6379
redis.port=6380
redis.pass=
#最大空闲连接数
redis.maxIdle=50
@@ -208,3 +208,4 @@ redis.cluster.host2=192.168.10.193
redis.cluster.port1=7001
redis.cluster.port2=7002
redis.cluster.port3=7003

View File

@@ -191,6 +191,8 @@ maxRedisDBIndex=16
idRelaRedisDBIndex=15
#存储分组复用域配置的redisdb
tmpStorageReuseRegionDB=15
#存储各个业务下的配置信息
serviceAndConfigRealDB=13
##实时统计(编译配置)数据表达式
redisStatisticsReal=[COMPILE_ID];\t;[SERVICE];\t;[ACTION];\t;[CONT_TYPE];\t;[ATTR_TYPE];\t;[CONT_LABEL];\t;[TASK_ID];\t;[AFFAIR_ID];\t;[DO_BLACKLIST];\t;[DO_LOG];\t;[EFFECTIVE_RANGE];\t;[START_TIME];\t;[END_TIME];\t;[USER_REGION];\t;[IS_VALID];\t;[GROUP_NUM];\t;[FATHER_CFG_ID];\t;[OP_TIME]