1:修改全量配置同步,controller对状态的验证
2:添加从集群中恢复数据的功能并记录相关的maat_version 3:添加无验证的保存maat和unmaat类的方法 4:修改集群中保存同步数据的实效时间为24小时 5:修改集群获取连接的方式为每次使用获取,使用完关闭,避免连接出现问题
This commit is contained in:
@@ -29,6 +29,7 @@ import com.nis.restful.RestServiceException;
|
||||
import com.nis.restful.ServiceRuntimeException;
|
||||
import com.nis.util.Constants;
|
||||
import com.nis.util.FileUtils;
|
||||
import com.nis.util.JedisClusterUtils;
|
||||
import com.nis.util.MinioUtil;
|
||||
import com.nis.util.StringUtils;
|
||||
import com.nis.web.controller.BaseRestController;
|
||||
@@ -549,8 +550,17 @@ public class ConfigSourcesController extends BaseRestController {
|
||||
long start = System.currentTimeMillis();
|
||||
AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, null);
|
||||
thread.setContent("全量同步不记录请求内容");
|
||||
String allConfigSyncStatus = configSourcesService.getAllConfigSyncStatus();
|
||||
if (allConfigSyncStatus != null) {
|
||||
if (!allConfigSyncStatus.equals("0")) {
|
||||
throw new RestServiceException("后台同步指令为" + allConfigSyncStatus + ",请先下发配置同步指令,或等待后台数据同步进程完成后再次下发配置同步指令",
|
||||
RestBusinessCode.config_integrity_error.getValue());
|
||||
}
|
||||
} else {
|
||||
throw new RestServiceException("没有获取到同步请求标志,请先下发配置同步指令",
|
||||
RestBusinessCode.config_integrity_error.getValue());
|
||||
}
|
||||
try {
|
||||
|
||||
String configType = request.getHeader("Config-Type");
|
||||
String serviceId = request.getHeader("Service-Id");
|
||||
String configTable = request.getHeader("Config-Table");
|
||||
@@ -579,16 +589,19 @@ public class ConfigSourcesController extends BaseRestController {
|
||||
}
|
||||
logger.info("-----------开始存储到json格式数据------->>configType:" + configType + ",serviceId:" + serviceId
|
||||
+ ",configTable:" + configTable + ",lastCompletedTag:" + lastCompletedTag);
|
||||
String key = null;
|
||||
if ("1".equals(configType)) {
|
||||
key = "MAAT";
|
||||
} else {
|
||||
key = "UNMAAT";
|
||||
|
||||
if (jsonString != null && !jsonString.trim().equals("")) {// 张薇说body可能为空,有的service没有数据也会下发一次,譬如最后一次提交时,lastCompletedTag=finished,但是body可能是空的
|
||||
String key = null;
|
||||
if ("1".equals(configType)) {
|
||||
key = "MAAT";
|
||||
} else {
|
||||
key = "UNMAAT";
|
||||
}
|
||||
key = key + "-" + serviceId + "-" + UUID.randomUUID();
|
||||
configSourcesService.setRedisClusterKey(key, jsonString);
|
||||
configSourcesService.setAllServiceKey(key);
|
||||
}
|
||||
key = key + "-" + serviceId + "-" + UUID.randomUUID();
|
||||
configSourcesService.setRedisClusterKey(key, jsonString);
|
||||
configSourcesService.setAllServiceKey(key);
|
||||
if (!StringUtil.isEmpty(lastCompletedTag)) {
|
||||
if (!StringUtil.isEmpty(lastCompletedTag) && lastCompletedTag.trim().toLowerCase().equals("finished")) {
|
||||
// 设置配置同步状态为接收配置完成
|
||||
configSourcesService.setAllConfigSyncStatus("1");
|
||||
logger.info("接收全量同步配置:FINISHED");
|
||||
@@ -610,11 +623,35 @@ public class ConfigSourcesController extends BaseRestController {
|
||||
public Map acceptStatus(@RequestBody String jsonString, HttpServletRequest request, HttpServletResponse response) {
|
||||
long start = System.currentTimeMillis();
|
||||
AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, null);
|
||||
|
||||
JSONObject obj = JSONObject.fromObject(jsonString);
|
||||
if (!StringUtil.isEmpty(obj.get("syncStatus")) && "1".equals(obj.get("syncStatus").toString())) {
|
||||
logger.info("-----------配置同步指令下发:" + new Date());
|
||||
// 设置配置同步状态为开始
|
||||
configSourcesService.setAllConfigSyncStatus("0");
|
||||
if (!StringUtil.isEmpty(obj.get("syncStatus"))) {
|
||||
String allConfigSyncStatus = configSourcesService.getAllConfigSyncStatus();
|
||||
if ("1".equals(obj.get("syncStatus").toString())) {
|
||||
if (allConfigSyncStatus != null) {
|
||||
if (allConfigSyncStatus.equals("0") || allConfigSyncStatus.equals("1")
|
||||
|| allConfigSyncStatus.equals("2")) {
|
||||
throw new RestServiceException("已经下发过配置同步指令,请等待后台数据同步进程完成后再次下发配置同步指令",
|
||||
RestBusinessCode.config_integrity_error.getValue());
|
||||
}
|
||||
}
|
||||
logger.info("-----------配置同步指令下发:" + new Date());
|
||||
// 设置配置同步状态为开始
|
||||
configSourcesService.setAllConfigSyncStatus("0");
|
||||
} else if ("0".equals(obj.get("syncStatus").toString())) {// 取消同步指令
|
||||
if (allConfigSyncStatus != null
|
||||
&& (allConfigSyncStatus.equals("0") || allConfigSyncStatus.equals("1"))) {// 只有在没有完全接收配置之前可以取消,否则不允许取消,因为接收完配置之后会把redis清空,所以这个时候不允许取消了
|
||||
// 设置配置同步状态为完成,并把之前记录的key删除
|
||||
configSourcesService.setAllConfigSyncStatus("3");
|
||||
if (JedisClusterUtils.exists("allConfigSyncKey")) {
|
||||
JedisClusterUtils.getResource().del("allConfigSyncKey");
|
||||
}
|
||||
} else {
|
||||
throw new RestServiceException(
|
||||
"配置同步指令状态为" + allConfigSyncStatus + ",不可执行取消操作了请等待后台数据同步进程完成后再次下发配置同步指令",
|
||||
RestBusinessCode.config_integrity_error.getValue());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.error("未获取到同步状态码");
|
||||
thread.setBusinessCode(RestBusinessCode.syncStatusFailed.getValue());
|
||||
@@ -633,7 +670,12 @@ public class ConfigSourcesController extends BaseRestController {
|
||||
List<JSONObject> list = new ArrayList<JSONObject>();
|
||||
JSONObject obj = new JSONObject();
|
||||
obj.put("service", "ntc");
|
||||
obj.put("status", configSourcesService.getAllConfigSyncStatus());
|
||||
String allConfigSyncStatus = configSourcesService.getAllConfigSyncStatus();
|
||||
if (allConfigSyncStatus == null || allConfigSyncStatus.trim().equals("")) {
|
||||
obj.put("status", "-1");
|
||||
} else {
|
||||
obj.put("status", configSourcesService.getAllConfigSyncStatus());
|
||||
}
|
||||
obj.put("opTime", "2018-11-23 08:31:27");
|
||||
list.add(obj);
|
||||
return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "获取全量同步状态成功",
|
||||
|
||||
@@ -307,6 +307,185 @@ public class ConfigSourcesService extends BaseService {
|
||||
configRedisService.saveMaatConfig(configMap);
|
||||
}
|
||||
|
||||
/**
|
||||
* @Description:
|
||||
* @author(zdx) @date 2018年12月3日 下午6:48:32
|
||||
* @param configCompileList
|
||||
* @throws Exception
|
||||
*/
|
||||
|
||||
public void saveMaatConfig(List<ConfigCompile> configCompileList) throws Exception {
|
||||
Map<Integer, List<MaatConfig>> maatMap = new HashMap<Integer, List<MaatConfig>>();
|
||||
Map<Integer, List<MaatConfig>> configMap = new HashMap<Integer, List<MaatConfig>>();
|
||||
|
||||
for (ConfigCompile configCompile : configCompileList) {
|
||||
Integer service = Integer.valueOf(configCompile.getService().toString());
|
||||
MaatConfig maatConfig = new MaatConfig();
|
||||
|
||||
maatConfig.setService(service);
|
||||
// 编译
|
||||
maatConfig.setCompileMap(convertObjectToMap(configCompile, ConfigCompile.class));
|
||||
// 分组
|
||||
List<Map<String, String>> dstMaplList = null;
|
||||
if (!StringUtil.isEmpty(configCompile.getGroupRelationList())) {
|
||||
dstMaplList = new ArrayList<Map<String, String>>();
|
||||
for (ConfigGroupRelation group : configCompile.getGroupRelationList()) {
|
||||
dstMaplList.add(convertObjectToMap(group, ConfigGroupRelation.class));
|
||||
}
|
||||
}
|
||||
maatConfig.setGroupMapList(dstMaplList);
|
||||
// 字符串域
|
||||
dstMaplList = null;
|
||||
List<Map<String, String>> strongMapList = null;
|
||||
if (!StringUtil.isEmpty(configCompile.getStrRegionList())) {
|
||||
dstMaplList = new ArrayList<Map<String, String>>();
|
||||
for (StrRegion region : configCompile.getStrRegionList()) {
|
||||
if (StringUtil.isEmpty(region.getDistrict())) {
|
||||
dstMaplList.add(convertObjectToMap(region, StrRegion.class));
|
||||
} else {
|
||||
if (StringUtil.isEmpty(strongMapList)) {
|
||||
strongMapList = new ArrayList<Map<String, String>>();
|
||||
}
|
||||
strongMapList.add(convertObjectToMap(region, StrRegion.class));
|
||||
}
|
||||
}
|
||||
}
|
||||
maatConfig.setStrRegionMapList(dstMaplList);
|
||||
// 增强字符串域
|
||||
if (!StringUtil.isEmpty(strongMapList) && strongMapList.size() > 0) {
|
||||
maatConfig.setStrStrRegionMapList((strongMapList));
|
||||
}
|
||||
// 数值域
|
||||
dstMaplList = null;
|
||||
if (!StringUtil.isEmpty(configCompile.getNumRegionList())) {
|
||||
dstMaplList = new ArrayList<Map<String, String>>();
|
||||
for (NumRegion region : configCompile.getNumRegionList()) {
|
||||
dstMaplList.add(convertObjectToMap(region, NumRegion.class));
|
||||
}
|
||||
}
|
||||
maatConfig.setNumRegionMapList(dstMaplList);
|
||||
|
||||
// Ip域
|
||||
dstMaplList = null;
|
||||
if (!StringUtil.isEmpty(configCompile.getIpRegionList())) {
|
||||
dstMaplList = new ArrayList<Map<String, String>>();
|
||||
for (IpRegion region : configCompile.getIpRegionList()) {
|
||||
dstMaplList.add(convertObjectToMap(region, IpRegion.class));
|
||||
}
|
||||
}
|
||||
maatConfig.setIpRegionMapList(dstMaplList);
|
||||
|
||||
// 摘要类域
|
||||
dstMaplList = null;
|
||||
if (!StringUtil.isEmpty(configCompile.getDigestRegionList())) {
|
||||
dstMaplList = new ArrayList<Map<String, String>>();
|
||||
for (DigestRegion region : configCompile.getDigestRegionList()) {
|
||||
dstMaplList.add(convertObjectToMap(region, DigestRegion.class));
|
||||
}
|
||||
}
|
||||
|
||||
maatConfig.setFileDigestRegionMapList(dstMaplList);
|
||||
|
||||
// 文本相似性域
|
||||
// dstMaplList = null;
|
||||
// maatConfig.setFileLikeRegionMapList(dstMaplList);
|
||||
|
||||
// 生效范围IP域
|
||||
dstMaplList = null;
|
||||
if (!StringUtil.isEmpty(configCompile.getIpClientRangeList())) {
|
||||
dstMaplList = new ArrayList<Map<String, String>>();
|
||||
for (IpRegion region : configCompile.getIpClientRangeList()) {
|
||||
dstMaplList.add(convertObjectToMap(region, IpRegion.class));
|
||||
}
|
||||
}
|
||||
maatConfig.setIpClientRangeMapList(dstMaplList);
|
||||
|
||||
if (maatMap.containsKey(service)) {
|
||||
maatMap.get(service).add(maatConfig);
|
||||
} else {
|
||||
List<MaatConfig> maatCfgList = new ArrayList<MaatConfig>();
|
||||
maatCfgList.add(maatConfig);
|
||||
maatMap.put(service, maatCfgList);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// 调用接口入redis
|
||||
|
||||
Iterator serviceIterator = maatMap.keySet().iterator();
|
||||
while (serviceIterator.hasNext()) {
|
||||
Integer service = Integer.valueOf(serviceIterator.next().toString());
|
||||
List<Integer> dbIndexList = ServiceAndRDBIndexReal.getRedisDBByService(service);
|
||||
if (!StringUtil.isEmpty(dbIndexList) && dbIndexList.size() > 0) {
|
||||
for (Integer dbIndex : dbIndexList) {
|
||||
// 分发到阀门有些业务需要添加编译属性到域配置
|
||||
List<MaatConfig> newMaatConfigList = new ArrayList<MaatConfig>();
|
||||
newMaatConfigList.addAll(maatMap.get(service));
|
||||
if (dbIndex.intValue() == ServiceAndRDBIndexReal.getValveDBIndex().intValue()) {
|
||||
Map<Integer, Map<String, String[]>> maatToValueMap = ServiceAndRDBIndexReal.getMaatToValveMap();
|
||||
if (maatToValueMap.containsKey(service)) {
|
||||
|
||||
Map<String, String[]> regionAndFiledMap = maatToValueMap.get(service);
|
||||
for (int i = 0; i < newMaatConfigList.size(); i++) {
|
||||
MaatConfig maatConfig = newMaatConfigList.get(i);
|
||||
MaatConfig newMaatConfig = (MaatConfig) JsonMapper
|
||||
.fromJsonString(JsonMapper.toJsonString(maatConfig), MaatConfig.class);
|
||||
Iterator iterator = regionAndFiledMap.keySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
String regionName = iterator.next().toString();
|
||||
PropertyDescriptor pd;
|
||||
try {
|
||||
pd = new PropertyDescriptor(regionName + "MapList", MaatConfig.class);
|
||||
Method method = pd.getReadMethod();
|
||||
Object object = method.invoke(newMaatConfig);
|
||||
|
||||
if (object != null) {
|
||||
|
||||
List<Map<String, String>> listMaps = new ArrayList<Map<String, String>>();
|
||||
listMaps.addAll((List<Map<String, String>>) object);
|
||||
String[] fields = regionAndFiledMap.get(regionName);
|
||||
for (String fieldName : fields) {
|
||||
String value = newMaatConfig.getCompileMap()
|
||||
.get(fieldName.toLowerCase());
|
||||
if (!StringUtil.isEmpty(value)) {
|
||||
for (Map<String, String> map : listMaps) {
|
||||
map.put(fieldName.toLowerCase(), value);
|
||||
}
|
||||
}
|
||||
}
|
||||
method = pd.getWriteMethod();
|
||||
method.invoke(newMaatConfig, listMaps);
|
||||
}
|
||||
newMaatConfigList.set(i, newMaatConfig);
|
||||
} catch (Exception e) {
|
||||
// TODO Auto-generated catch block
|
||||
throw new RestServiceException("未找到域列表,请检查配置文件中域类型是否正确!:" + e.getMessage(),
|
||||
RestBusinessCode.service_runtime_error.getValue());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (configMap.containsKey(dbIndex)) {
|
||||
configMap.get(dbIndex).addAll(newMaatConfigList);
|
||||
} else {
|
||||
List<MaatConfig> list = new ArrayList<MaatConfig>();
|
||||
list.addAll(newMaatConfigList);
|
||||
configMap.put(dbIndex, list);
|
||||
}
|
||||
|
||||
}
|
||||
} else {
|
||||
throw new ServiceRuntimeException(RestBusinessCode.ServiceNoFoundDBIndex.getErrorReason(),
|
||||
RestBusinessCode.ServiceNoFoundDBIndex.getValue());
|
||||
}
|
||||
}
|
||||
logger.info("---------------调用Redis maat配置新增接口---------------------");
|
||||
configRedisService.saveMaatConfig(configMap);
|
||||
}
|
||||
|
||||
private Map<String, String> convertObjectToMap(Object obj, Class clazz) throws Exception {
|
||||
Map<String, String> dstMap = new HashMap<String, String>();
|
||||
Field[] fields = obj.getClass().getDeclaredFields();
|
||||
@@ -630,6 +809,96 @@ public class ConfigSourcesService extends BaseService {
|
||||
configRedisService.saveUnMaatConfig(configMap);
|
||||
}
|
||||
|
||||
public void saveCommonSources(String jsonString) throws Exception {
|
||||
JsonArray jsonObjectList = null;
|
||||
try {
|
||||
jsonObjectList = new JsonParser().parse(jsonString).getAsJsonArray();
|
||||
} catch (Exception e) {
|
||||
// TODO: handle exception
|
||||
throw new RestServiceException(RestBusinessCode.CBParamFormateError.getErrorReason() + "," + e.getMessage(),
|
||||
RestBusinessCode.CBParamFormateError.getValue());
|
||||
}
|
||||
Map<Integer, List<Map<String, String>>> dstMaps = new HashMap<Integer, List<Map<String, String>>>();
|
||||
for (int i = 0; i < jsonObjectList.size(); i++) {
|
||||
JsonObject jsonObj = (JsonObject) jsonObjectList.get(i);
|
||||
Map<String, Object> srcMap = JSONObject.fromObject(JSONObject.fromObject((jsonObj.toString())));
|
||||
if (srcMap.containsKey("service")) {
|
||||
Map<String, String> dstMap = new HashMap<String, String>();
|
||||
List<CommonSourceFieldCfg> commonSourceFieldCfgList = ReadCommSourceXmlUtil
|
||||
.getCommonSourceCfgByService(srcMap.get("service").toString().trim());
|
||||
|
||||
// 获取IP类型
|
||||
Integer ipType = null;
|
||||
String ipTypeName = "";
|
||||
|
||||
for (CommonSourceFieldCfg commonSourceFieldCfg : commonSourceFieldCfgList) {
|
||||
if (commonSourceFieldCfg.getDstName().equals("addr_type")) {
|
||||
String dstVal = srcMap.get(commonSourceFieldCfg.getSrcName()).toString();
|
||||
ipType = Integer.parseInt(dstVal);
|
||||
}
|
||||
}
|
||||
if (ipType == null) {
|
||||
ipType = 4;
|
||||
}
|
||||
for (CommonSourceFieldCfg commonSourceFieldCfg : commonSourceFieldCfgList) {
|
||||
// 字段类型 String Number Date Ip Port
|
||||
String dstStr = StringUtil.isEmpty(srcMap.get(commonSourceFieldCfg.getSrcName()))
|
||||
? commonSourceFieldCfg.getDefaultVal()
|
||||
: srcMap.get(commonSourceFieldCfg.getSrcName()).toString();
|
||||
if (!StringUtil.isEmpty(dstStr) && dstStr.startsWith("[") && dstStr.endsWith("]")) {
|
||||
dstStr = srcMap.get(dstStr.substring(1, dstStr.length() - 1)).toString();
|
||||
}
|
||||
|
||||
if ("dstFile".equals(commonSourceFieldCfg.getSrcName())) {
|
||||
if ("dst_file".equals(commonSourceFieldCfg.getDstName())) {
|
||||
String maatTableName = ServiceAndRDBIndexReal
|
||||
.getUnMaatTableName(Integer.valueOf(srcMap.get("service").toString().trim()));
|
||||
String dstPath = Constants.MM_SAMPLE_DST_PATH.replace("{tableType}",
|
||||
maatTableName.substring(maatTableName.lastIndexOf("_") + 1));
|
||||
dstStr = dstPath.replace("{fileName}", dstStr.substring(dstStr.lastIndexOf("/") + 1));
|
||||
} else if ("file_id".equals(commonSourceFieldCfg.getDstName())) {
|
||||
// dstStr = dstStr.substring(dstStr.indexOf("group"));
|
||||
}
|
||||
}
|
||||
dstMap.put(commonSourceFieldCfg.getDstName(), dstStr);
|
||||
}
|
||||
if (StringUtil.isEmpty(dstMaps.get(Integer.valueOf(srcMap.get("service").toString())))) {
|
||||
List<Map<String, String>> list = new ArrayList<Map<String, String>>();
|
||||
list.add(dstMap);
|
||||
dstMaps.put(Integer.valueOf(srcMap.get("service").toString()), list);
|
||||
} else {
|
||||
List<Map<String, String>> list = dstMaps.get(Integer.valueOf(srcMap.get("service").toString()));
|
||||
list.add(dstMap);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.info("------------------调用非maat配置新增接口-------------------");
|
||||
// 按service分库
|
||||
Map<Integer, List<Map<String, String>>> configMap = new HashMap<Integer, List<Map<String, String>>>();
|
||||
Iterator serviceIterator = dstMaps.keySet().iterator();
|
||||
while (serviceIterator.hasNext()) {
|
||||
Integer service = Integer.valueOf(serviceIterator.next().toString());
|
||||
List<Integer> dbIndexList = ServiceAndRDBIndexReal.getRedisDBByService(service);
|
||||
if (!StringUtil.isEmpty(dbIndexList) && dbIndexList.size() > 0) {
|
||||
for (Integer dbIndex : dbIndexList) {
|
||||
if (configMap.containsKey(dbIndex)) {
|
||||
configMap.get(dbIndex).addAll(dstMaps.get(service));
|
||||
} else {
|
||||
List<Map<String, String>> list = new ArrayList<Map<String, String>>();
|
||||
list.addAll(dstMaps.get(service));
|
||||
configMap.put(dbIndex, list);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new ServiceRuntimeException("service与写入数据库序号映射关系不存在",
|
||||
RestBusinessCode.ServiceNoFoundDBIndex.getValue());
|
||||
}
|
||||
|
||||
}
|
||||
configRedisService.saveUnMaatConfig(configMap);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @Description:回调类配置状态更新(停/启用)
|
||||
@@ -1077,7 +1346,7 @@ public class ConfigSourcesService extends BaseService {
|
||||
* @param value
|
||||
*/
|
||||
public void setRedisClusterKey(String key, String value) {
|
||||
JedisClusterUtils.set(key, value, 0);
|
||||
JedisClusterUtils.set(key, value, 86400);// 24小时超时
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -19,31 +19,52 @@ import com.nis.restful.RestBusinessCode;
|
||||
import com.nis.restful.ServiceRuntimeException;
|
||||
import com.nis.util.Constants;
|
||||
import com.nis.util.ExceptionUtil;
|
||||
import com.nis.util.JedisUtils;
|
||||
import com.nis.web.service.AuditLogThread;
|
||||
import com.nis.web.service.SpringContextHolder;
|
||||
import com.nis.web.service.restful.ConfigSourcesService;
|
||||
import com.zdjizhi.utils.JsonMapper;
|
||||
|
||||
import redis.clients.jedis.JedisCluster;
|
||||
import redis.clients.jedis.exceptions.JedisException;
|
||||
|
||||
@Component
|
||||
@PropertySource(value = { "classpath:nis.properties" })
|
||||
public class SyncAllConfigTask {
|
||||
private static Logger logger = LoggerFactory.getLogger(SyncAllConfigTask.class);
|
||||
|
||||
@Autowired
|
||||
private JedisCluster jedisCluster;
|
||||
// @Autowired
|
||||
// private JedisCluster jedisCluster;
|
||||
@Autowired
|
||||
protected ConfigSourcesService configSourcesService;
|
||||
// @Scheduled(cron = "${syncUiAndServiceConfigCron}")
|
||||
|
||||
/**
|
||||
* 每次使用获取新连接,避免集群某些机器宕机后影响连接使用,注意需要在applicationContext-redis.xml中修改jedisCluster的scope为 prototype
|
||||
*
|
||||
* @return
|
||||
* @throws JedisException
|
||||
*/
|
||||
public static JedisCluster getResource() throws JedisException {
|
||||
JedisCluster jedisCluster = SpringContextHolder.getBean(JedisCluster.class);
|
||||
if (jedisCluster == null) {
|
||||
throw new ServiceRuntimeException("无法获取redis-cluster连接,请联系管理员检查程序",
|
||||
RestBusinessCode.CannotConnectionRedis.getValue());
|
||||
}
|
||||
return jedisCluster;
|
||||
|
||||
}
|
||||
|
||||
@Scheduled(cron = "${syncUiAndServiceConfigCron}")
|
||||
public void syncRedisToCluster() {
|
||||
String requestId = UUID.randomUUID().toString();
|
||||
JedisCluster jedisCluster = getResource();
|
||||
Map<Integer, Map<String, String>> map = null;
|
||||
try {
|
||||
//
|
||||
if (lock(requestId)) {// 避免集群环境下同一秒钟所有的机器都执行这个定时任务
|
||||
if (lock(jedisCluster, requestId)) {// 避免集群环境下同一秒钟所有的机器都执行这个定时任务
|
||||
// if (true) {// 避免集群环境下同一秒钟所有的机器都执行这个定时任务
|
||||
String allConfigSyncStatus = jedisCluster.get("allConfigSyncStatus");
|
||||
if (allConfigSyncStatus != null) {// 配置初始化完成
|
||||
if (allConfigSyncStatus.trim().equals("1")) {
|
||||
logger.warn("");
|
||||
map = getAllSeqAndVersion();
|
||||
// 设置配置同步状态为正在进行
|
||||
configSourcesService.setAllConfigSyncStatus("2");
|
||||
logger.warn("开始执行配置全量导入操作,将allConfigSyncStatus值设置为2正在进行导入操作");
|
||||
@@ -61,12 +82,13 @@ public class SyncAllConfigTask {
|
||||
maatMap.put(md5, val);
|
||||
}
|
||||
}
|
||||
addConfigToRedis(maatMap,true);
|
||||
addConfigToRedis(unMaatMap,false);
|
||||
flushRedisDb();
|
||||
addConfigToRedis(maatMap, true);
|
||||
addConfigToRedis(unMaatMap, false);
|
||||
logger.warn("执行配置全量导入成功,将allConfigSyncStatus值设置为3,导入成功");
|
||||
// 设置配置同步状态为写redis成功
|
||||
configSourcesService.setAllConfigSyncStatus("3");
|
||||
|
||||
|
||||
// 删除存储全量配置key的关系key
|
||||
jedisCluster.del("allConfigSyncKey");
|
||||
for (String key : split) {
|
||||
@@ -92,12 +114,22 @@ public class SyncAllConfigTask {
|
||||
configSourcesService.setAllConfigSyncStatus("-1");
|
||||
logger.error("执行配置全量导入失败,将allConfigSyncStatus值设置为-1,导入失败");
|
||||
} finally {
|
||||
unlock(requestId);
|
||||
closeConn();
|
||||
unlock(jedisCluster, requestId);
|
||||
closeConn(jedisCluster);
|
||||
if (map != null && map.size() > 0) {
|
||||
recoverRedisData(map);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void addConfigToRedis(Map<String, String> map, boolean isMaat) throws Exception {
|
||||
/**
|
||||
* 保存数据入库,有验证逻辑
|
||||
*
|
||||
* @param map
|
||||
* @param isMaat
|
||||
* @throws Exception
|
||||
*/
|
||||
public void addConfigToRedisYZ(Map<String, String> map, boolean isMaat) throws Exception {
|
||||
long time = System.currentTimeMillis();
|
||||
StringBuffer sb = new StringBuffer();
|
||||
if (isMaat) {
|
||||
@@ -116,7 +148,35 @@ public class SyncAllConfigTask {
|
||||
|
||||
}
|
||||
|
||||
public void closeConn() {
|
||||
/**
|
||||
* 保存数据入库,无验证逻辑
|
||||
*
|
||||
* @param map
|
||||
* @param isMaat
|
||||
* @throws Exception
|
||||
*/
|
||||
private void addConfigToRedis(Map<String, String> map, boolean isMaat) throws Exception {
|
||||
if (isMaat) {
|
||||
for (Entry<String, String> entry : map.entrySet()) {
|
||||
ConfigSource configSource = new JsonMapper().fromJson(entry.getValue(), ConfigSource.class);
|
||||
configSourcesService.saveMaatConfig(configSource.getConfigCompileList());
|
||||
}
|
||||
} else {
|
||||
for (Entry<String, String> entry : map.entrySet()) {
|
||||
String value = entry.getValue();
|
||||
configSourcesService.saveCommonSources(value);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭集群连接
|
||||
*
|
||||
* @param jedisCluster
|
||||
*/
|
||||
private void closeConn(JedisCluster jedisCluster) {
|
||||
if (jedisCluster != null) {
|
||||
try {
|
||||
jedisCluster.close();
|
||||
@@ -142,7 +202,7 @@ public class SyncAllConfigTask {
|
||||
* @param requestId 请求标识(UUID.randomUUID().toString()),正常情况下是谁加的锁,谁去解锁不能a加的锁,b去解锁
|
||||
* @return 是否获取成功
|
||||
*/
|
||||
public Boolean lock(String requestId) {
|
||||
private Boolean lock(JedisCluster jedisCluster, String requestId) {
|
||||
String key = "allConfigSyncDistributedLock";
|
||||
String var1 = jedisCluster.set(key, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME,
|
||||
Constants.CONFIGSYNCLOCKTIME);
|
||||
@@ -157,7 +217,7 @@ public class SyncAllConfigTask {
|
||||
*
|
||||
* @param requestId
|
||||
*/
|
||||
protected boolean unlock(String requestId) {
|
||||
private boolean unlock(JedisCluster jedisCluster, String requestId) {
|
||||
String key = "allConfigSyncDistributedLock";
|
||||
// 这个字符串是个lua脚本,代表的意思是如果根据key拿到的value跟传入的value相同就执行del,否则就返回0【保证安全性】
|
||||
String luaScript = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 0 end";
|
||||
@@ -169,4 +229,58 @@ public class SyncAllConfigTask {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 从配置redis库中获取没个redisdb的maat_version,0,14,15号库不会有maat_version所以就不获取了
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
private Map<Integer, Map<String, String>> getAllSeqAndVersion() {
|
||||
// 第一个key是redisdb,第二个key是redis的
|
||||
Map<Integer, Map<String, String>> map = new HashMap<>();
|
||||
for (int i = 1; i < 14; i++) {
|
||||
String maatVersionStr = JedisUtils.get("MAAT_VERSION", i);
|
||||
if (!map.containsKey(i)) {
|
||||
Map<String, String> keyValMap = new HashMap<>();
|
||||
if (maatVersionStr != null) {
|
||||
keyValMap.put("MAAT_VERSION", maatVersionStr);
|
||||
map.put(i, keyValMap);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
// String seqCompileid = JedisUtils.get("SEQ_COMPILEID", 0);
|
||||
// String seqGroupid = JedisUtils.get("SEQ_GROUPID", 0);
|
||||
// String seqRegionid = JedisUtils.get("SEQ_REGIONID", 0);
|
||||
// Map<String, String> keyValMap = new HashMap<>();
|
||||
// keyValMap.put("SEQ_COMPILEID", seqCompileid);
|
||||
// keyValMap.put("SEQ_GROUPID", seqGroupid);
|
||||
// keyValMap.put("SEQ_REGIONID", seqRegionid);
|
||||
// map.put(0, keyValMap);
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* 清空配置redis库,不清空0号库
|
||||
*/
|
||||
private void flushRedisDb() {// 不清空0号库
|
||||
for (int i = 1; i < 16; i++) {
|
||||
JedisUtils.getResource(i).flushDB();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 恢复配置redis库中各个索引的maat_version
|
||||
*
|
||||
* @param map
|
||||
*/
|
||||
private void recoverRedisData(Map<Integer, Map<String, String>> map) {
|
||||
for (Integer redisDB : map.keySet()) {
|
||||
Map<String, String> keyValMap = map.get(redisDB);
|
||||
for (String redisKey : keyValMap.keySet()) {
|
||||
JedisUtils.set(redisKey, String.valueOf(Long.parseLong(keyValMap.get(redisKey)) + 2l), 0, redisDB);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user