From 3ecda50e80fde917d6fddee4113032fe00d348be Mon Sep 17 00:00:00 2001 From: zhangdongxu Date: Mon, 26 Nov 2018 14:20:19 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=85=A8=E9=87=8F?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/nis/restful/RestBusinessCode.java | 10 ++++ .../web/controller/BaseRestController.java | 9 ++- .../restful/ConfigSourcesController.java | 56 +++++++++++++++++++ 3 files changed, 73 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/nis/restful/RestBusinessCode.java b/src/main/java/com/nis/restful/RestBusinessCode.java index 89c5dd6..2c5135a 100644 --- a/src/main/java/com/nis/restful/RestBusinessCode.java +++ b/src/main/java/com/nis/restful/RestBusinessCode.java @@ -29,6 +29,16 @@ public enum RestBusinessCode { */ missing_args (1002,"缺少必要的参数信息"), + /** + * 已接收,响应成功 + */ + syncStatusSuccessed(2021000,"已接收,响应成功"), + + /** + * 已接收,响应失败(原因可能正在忙,指令不规范等) + */ + syncStatusFailed(2021010,"已接收,响应失败(原因可能正在忙,指令不规范等)"), + /** * 操作行为错误,1-插入2-更新 3-删除4-查询 ,插入时选择了删除这种错误返回该异常代码 */ diff --git a/src/main/java/com/nis/web/controller/BaseRestController.java b/src/main/java/com/nis/web/controller/BaseRestController.java index d7043a3..dec2eae 100644 --- a/src/main/java/com/nis/web/controller/BaseRestController.java +++ b/src/main/java/com/nis/web/controller/BaseRestController.java @@ -207,8 +207,13 @@ public class BaseRestController { restResult.setStatus(HttpStatus.CREATED); restResult.setBusinessCode(RestBusinessCode.update_success); } else if (requestMethod.equals(RequestMethod.POST.name())) { - restResult.setStatus(HttpStatus.CREATED); - restResult.setBusinessCode(RestBusinessCode.add_success); + if (thread.getBusinessCode()!=0&&(HttpStatus.ACCEPTED+"").equals((thread.getBusinessCode()+"").substring(0, 3))) { + restResult.setStatus(HttpStatus.ACCEPTED); + restResult.setBusinessCode(RestBusinessCode.valueOf(thread.getBusinessCode())); + }else{ + restResult.setStatus(HttpStatus.CREATED); + restResult.setBusinessCode(RestBusinessCode.add_success); + } } else if (requestMethod.equals(RequestMethod.PATCH.name())) { restResult.setStatus(HttpStatus.CREATED); restResult.setBusinessCode(RestBusinessCode.update_success); diff --git a/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java b/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java index e54ee69..61f56f4 100644 --- a/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java +++ b/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java @@ -504,6 +504,62 @@ public class ConfigSourcesController extends BaseRestController { return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "MAAT规则分组复用域配置删除成功" + sb.toString(), Constants.IS_DEBUG ?groupReuseSource : null); } + @RequestMapping(value = "/cfg_batch/v1/configSources", method = RequestMethod.POST, produces = org.springframework.http.MediaType.APPLICATION_JSON_VALUE) + @ApiOperation(value = "全量同步配置接收接口", httpMethod = "POST", response = Map.class, notes = "接收全量同步配置") + public Map receiveConfigSources(@RequestBody String jsonString, HttpServletRequest request, + HttpServletResponse response) { + long start = System.currentTimeMillis(); + AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, + null); + try { + JSONObject obj= JSONObject.fromObject(jsonString); + logger.info("-----------接收到json格式数据"+new Date()); + } catch (Exception e) { + // TODO: handle exception + throw new RestServiceException(RestBusinessCode.config_integrity_error.getErrorReason() + "," + e.getMessage(), + RestBusinessCode.config_integrity_error.getValue()); + } + + return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "已接收全量同步配置信息", + Constants.IS_DEBUG ? jsonString : null); + } + @RequestMapping(value = "/cfg_batch/v1/status", method = RequestMethod.POST, produces = org.springframework.http.MediaType.APPLICATION_JSON_VALUE) + @ApiOperation(value = "配置全量同步指令下发接口", httpMethod = "POST", response = Map.class, notes = "下发配置同步指令") + public Map acceptStatus(@RequestBody String jsonString, HttpServletRequest request, + HttpServletResponse response) { + long start = System.currentTimeMillis(); + AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, + null); + JSONObject obj= JSONObject.fromObject(jsonString); + + if (!StringUtil.isEmpty(obj.get("syncStatus"))&&"1".equals(obj.get("syncStatus").toString())) { + logger.info("-----------配置同步指令下发:"+new Date()); + }else{ + logger.error("未获取到同步状态码"); + thread.setBusinessCode(RestBusinessCode.syncStatusFailed.getValue()); + throw new RestServiceException("未获取到同步状态码", RestBusinessCode.syncStatusFailed.getValue()); + } + + thread.setBusinessCode(RestBusinessCode.syncStatusSuccessed.getValue()); + return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "配置同步指令下发成功", + Constants.IS_DEBUG ? jsonString : null); + } + + @RequestMapping(value = "/cfg_batch/v1/status", method = RequestMethod.GET) + @ApiOperation(value = "获取全量同步状态服务接口", httpMethod = "GET", response = Map.class, notes = "获取全量同步状态服务") + public Map getSyncStatus(HttpServletRequest request, + HttpServletResponse response) { + long start = System.currentTimeMillis(); + AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_GET, request, + null); + + JSONObject obj = new JSONObject(); + obj.put("service", "ntc"); + obj.put("status", 0); + obj.put("opTime", "2018-11-23 08:31:27"); + return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "获取全量同步状态成功", + obj); + } private void validateGroupReuseSource(AuditLogThread thread, long start, GroupReuseSource groupReuseSource) { String errorInfo = ""; From 94d41754b62e87db60efe9c69ffc6f32272b1b27 Mon Sep 17 00:00:00 2001 From: zhangdongxu Date: Tue, 27 Nov 2018 17:45:33 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E5=85=A8=E9=87=8F=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E5=86=85=E5=AE=B9=E4=BF=AE=E6=94=B9=EF=BC=8C?= =?UTF-8?q?post=E8=AF=B7=E6=B1=82=E8=BF=94=E5=9B=9E=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E7=A0=81status=E6=94=B9=E4=B8=BA201?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../web/controller/BaseRestController.java | 5 +-- .../restful/ConfigSourcesController.java | 42 ++++++++++++++++--- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/nis/web/controller/BaseRestController.java b/src/main/java/com/nis/web/controller/BaseRestController.java index 832b59a..fefc09c 100644 --- a/src/main/java/com/nis/web/controller/BaseRestController.java +++ b/src/main/java/com/nis/web/controller/BaseRestController.java @@ -209,11 +209,10 @@ public class BaseRestController { restResult.setStatus(HttpStatus.CREATED); restResult.setBusinessCode(RestBusinessCode.update_success); } else if (requestMethod.equals(RequestMethod.POST.name())) { - if (thread.getBusinessCode()!=0&&(HttpStatus.ACCEPTED+"").equals((thread.getBusinessCode()+"").substring(0, 3))) { - restResult.setStatus(HttpStatus.ACCEPTED); + restResult.setStatus(HttpStatus.CREATED); + if (thread.getBusinessCode()!=0) { restResult.setBusinessCode(RestBusinessCode.valueOf(thread.getBusinessCode())); }else{ - restResult.setStatus(HttpStatus.CREATED); restResult.setBusinessCode(RestBusinessCode.add_success); } } else if (requestMethod.equals(RequestMethod.PATCH.name())) { diff --git a/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java b/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java index 616d252..7c43803 100644 --- a/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java +++ b/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java @@ -2,6 +2,7 @@ package com.nis.web.controller.restful; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; @@ -10,7 +11,6 @@ import java.util.UUID; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import com.zdjizhi.utils.StringUtil; import net.sf.json.JSONObject; import org.apache.commons.codec.digest.DigestUtils; @@ -43,6 +43,7 @@ import com.nis.web.service.restful.ConfigSourcesService; import com.wordnik.swagger.annotations.Api; import com.wordnik.swagger.annotations.ApiOperation; import com.wordnik.swagger.annotations.ApiParam; +import com.zdjizhi.utils.StringUtil; /** * @ClassName: ConfigSourcesController @@ -552,11 +553,41 @@ public class ConfigSourcesController extends BaseRestController { long start = System.currentTimeMillis(); AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, null); + thread.setContent("全量同步不记录请求内容"); try { - JSONObject obj= JSONObject.fromObject(jsonString); - logger.info("-----------接收到json格式数据"+new Date()); + + String configType = request.getHeader("Config-Type"); + String serviceId = request.getHeader("Service-Id"); + String configTable = request.getHeader("Config-Table"); + String lastCompletedTag = request.getHeader("Last-Completed-Tag"); + logger.info(new Date()+"-----------接收到json格式数据:"+jsonString+"-------:"); + if(StringUtil.isEmpty(serviceId)){ + logger.error("未在请求头中获取到serviceId"); + throw new RestServiceException(RestBusinessCode.config_integrity_error.getErrorReason() + "," + "未在请求头中获取到serviceId", + RestBusinessCode.config_integrity_error.getValue()); + } + if (!StringUtil.isEmpty(lastCompletedTag)) { + System.out.println("接收全量同步配置:FINISHED"); + logger.info("接收全量同步配置:FINISHED"); + } + if(StringUtil.isEmpty(configType)){ + logger.error("未在请求头中获取到Config-Type"); + throw new RestServiceException(RestBusinessCode.config_integrity_error.getErrorReason() + "," + "未在请求头中获取到Config-Type", + RestBusinessCode.config_integrity_error.getValue()); + }else{ + if (!("1".equals(configType)||"2".equals(configType))) { + logger.error("Config-Type的值只能是1(maat)和2(回调)"); + throw new RestServiceException(RestBusinessCode.config_integrity_error.getErrorReason() + "," + "Config-Type的值只能是1(maat)和2(回调)", + RestBusinessCode.config_integrity_error.getValue()); + + } + } + + logger.info("-----------开始存储到json格式数据------->>configType:"+configType+",serviceId:"+serviceId+",configTable:"+configTable+",lastCompletedTag:"+lastCompletedTag); +// } catch (Exception e) { // TODO: handle exception + logger.error(e.getMessage()); throw new RestServiceException(RestBusinessCode.config_integrity_error.getErrorReason() + "," + e.getMessage(), RestBusinessCode.config_integrity_error.getValue()); } @@ -593,13 +624,14 @@ public class ConfigSourcesController extends BaseRestController { long start = System.currentTimeMillis(); AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_GET, request, null); - + List list = new ArrayList(); JSONObject obj = new JSONObject(); obj.put("service", "ntc"); obj.put("status", 0); obj.put("opTime", "2018-11-23 08:31:27"); + list.add(obj); return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "获取全量同步状态成功", - obj); + list); } private void validateGroupReuseSource(AuditLogThread thread, long start, GroupReuseSource groupReuseSource) { From ab0d5575ffe817dd18fa0b352c03cce42a145db9 Mon Sep 17 00:00:00 2001 From: renkaige Date: Sun, 2 Dec 2018 15:29:45 +0600 Subject: [PATCH 3/5] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E7=95=8C=E9=9D=A2?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E5=85=A8=E9=87=8F=E9=85=8D=E7=BD=AE=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/nis/util/Constants.java | 8 +- .../java/com/nis/util/JedisClusterUtils.java | 361 ++++++++++++++++++ src/main/java/com/nis/util/JedisUtils.java | 8 +- .../restful/ConfigSourcesController.java | 111 +++--- .../service/restful/ConfigSourcesService.java | 129 ++++++- .../com/nis/web/task/SyncAllConfigTask.java | 172 +++++++++ src/main/resources/nis.properties | 12 +- 7 files changed, 732 insertions(+), 69 deletions(-) create mode 100644 src/main/java/com/nis/util/JedisClusterUtils.java create mode 100644 src/main/java/com/nis/web/task/SyncAllConfigTask.java diff --git a/src/main/java/com/nis/util/Constants.java b/src/main/java/com/nis/util/Constants.java index 7489828..e2a4ed7 100644 --- a/src/main/java/com/nis/util/Constants.java +++ b/src/main/java/com/nis/util/Constants.java @@ -219,9 +219,13 @@ public final class Constants { /** - * redis分布式锁超时时间,默认五分钟,3000秒 + * redis分布式锁超时时间,默认五分钟,300秒 */ - public static final Long REDISLOCKTIME=Configurations.getLongProperty("redisLockTime", 3000); + public static final Long REDISLOCKTIME=Configurations.getLongProperty("redisLockTime", 300); + /** + * 全量数据同步分布式锁时间,默认两个小时 + */ + public static final Long CONFIGSYNCLOCKTIME=Configurations.getLongProperty("configSyncLockTime", 7200); /** * 获取redis分布式锁失败后的尝试获取锁的次数,每次失败暂停一秒钟后再次尝试 */ diff --git a/src/main/java/com/nis/util/JedisClusterUtils.java b/src/main/java/com/nis/util/JedisClusterUtils.java new file mode 100644 index 0000000..e5d1044 --- /dev/null +++ b/src/main/java/com/nis/util/JedisClusterUtils.java @@ -0,0 +1,361 @@ +package com.nis.util; + +import java.util.Collections; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.nis.restful.RestBusinessCode; +import com.nis.restful.ServiceRuntimeException; +import com.nis.web.service.SpringContextHolder; + +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.exceptions.JedisException; + +/** + * @ClassName: JedisClusterUtils + * @Description: redis-cluster库工具类 + * @author (rkg) + * @date 2018年12月01日 上午10:31:00 + * @version V1.0 + */ +public class JedisClusterUtils { + private static Logger logger = LoggerFactory.getLogger(JedisClusterUtils.class); + + /** + * 获取缓存 + * + * @param key 键 + * @return 值 + */ + public static String get(String key) { + String value = null; + JedisCluster jedis = null; + try { + jedis = getResource(); + if (jedis.exists(key)) { + value = jedis.get(key); + value = StringUtils.isNotBlank(value) && !"nil".equalsIgnoreCase(value) ? value : null; + logger.debug("get {} = {}", key, value); + } + } catch (Exception e) { + throw new ServiceRuntimeException("从redis-cluster中获取" + key + "对应的值失败", + RestBusinessCode.KeyNotExistsInRedis.getValue()); + } finally { + returnResource(jedis); + } + return value; + } + + /** + * 获取缓存 + * + * @param key 键 + * @return 值 + */ + public static Object getObject(String key) { + Object value = null; + JedisCluster jedis = null; + try { + jedis = getResource(); + if (jedis.exists(getBytesKey(key))) { + value = toObject(jedis.get(getBytesKey(key))); + logger.debug("getObject {} = {}", key, value); + } + } catch (Exception e) { + throw new ServiceRuntimeException("从redis-cluster中获取" + key + "对应的值失败", + RestBusinessCode.KeyNotExistsInRedis.getValue()); + } finally { + returnResource(jedis); + } + return value; + } + + /** + * 可以作为获取唯一id的方法
+ * 将key对应的value加上指定的值,只有value可以转为数字时该方法才可用 + * + * @param String key + * @param long number 要减去的值 + * @return long 相加后的值 + */ + public static long incrBy(String key, long number) { + JedisCluster jedis = getResource(); + long len = jedis.incrBy(key, number); + returnResource(jedis); + return len; + } + + /** + * 设置缓存 + * + * @param key 键 + * @param value 值 + * @param cacheSeconds 超时时间,0为不超时 + * @return + */ + public static String set(String key, String value, int cacheSeconds) { + String result = null; + JedisCluster jedis = null; + try { + jedis = getResource(); + result = jedis.set(key, value); + if (cacheSeconds != 0) { + jedis.expire(key, cacheSeconds); + } + logger.debug("set {} = {}", key, value); + } catch (Exception e) { + throw new ServiceRuntimeException("向redis-cluster中设置zset失败,key=" + key + ",value=" + value, + RestBusinessCode.ZsetFailed.getValue()); + } finally { + returnResource(jedis); + } + return result; + } + + /** + * 获取List缓存 + * + * @param key 键 + * @return 值 + */ + public static List getList(String key) { + List value = null; + JedisCluster jedis = null; + try { + jedis = getResource(); + if (jedis.exists(key)) { + value = jedis.lrange(key, 0, -1); + logger.debug("getList {} = {}", key, value); + } + } catch (Exception e) { + throw new ServiceRuntimeException("从redis-cluster中获取" + key + "对应的值失败", + RestBusinessCode.KeyNotExistsInRedis.getValue()); + } finally { + returnResource(jedis); + } + return value; + } + + /** + * 获取List缓存 + * + * @param key 键 + * @return 值 + */ + public static List getObjectList(String key) { + List value = null; + JedisCluster jedis = null; + try { + jedis = getResource(); + if (jedis.exists(getBytesKey(key))) { + List list = jedis.lrange(getBytesKey(key), 0, -1); + value = Lists.newArrayList(); + for (byte[] bs : list) { + value.add(toObject(bs)); + } + logger.debug("getObjectList {} = {}", key, value); + } + } catch (Exception e) { + throw new ServiceRuntimeException("从redis-cluster中获取" + key + "对应的值失败", + RestBusinessCode.KeyNotExistsInRedis.getValue()); + } finally { + returnResource(jedis); + } + return value; + } + + /** + * 缓存是否存在 + * + * @param key 键 + * @return + */ + public static boolean exists(String key) { + boolean result = false; + JedisCluster jedis = null; + try { + jedis = getResource(); + result = jedis.exists(key); + logger.debug("exists {}", key); + } catch (Exception e) { + throw new ServiceRuntimeException("从redis-cluster中判断" + key + "是否存在失败", + RestBusinessCode.ExistsKeyFailed.getValue()); + } finally { + returnResource(jedis); + } + return result; + } + + /** + * 缓存是否存在 + * + * @param key 键 + * @return + */ + public static boolean existsObject(String key) { + boolean result = false; + JedisCluster jedis = null; + try { + jedis = getResource(); + result = jedis.exists(getBytesKey(key)); + logger.debug("existsObject {}", key); + } catch (Exception e) { + throw new ServiceRuntimeException("从redis-cluster中判断" + key + "是否存在失败", + RestBusinessCode.ExistsKeyFailed.getValue()); + } finally { + returnResource(jedis); + } + return result; + } + + /** + * 获取资源 + * + * @return + * @throws JedisException + */ + public static JedisCluster getResource() throws JedisException { + JedisCluster jedisCluster = SpringContextHolder.getBean(JedisCluster.class); + if (jedisCluster == null) { + throw new ServiceRuntimeException("无法获取redis-cluster连接,请联系管理员检查程序", + RestBusinessCode.CannotConnectionRedis.getValue()); + } + return jedisCluster; + + } + + /** + * 释放资源 + * + * @param jedis + * @param isBroken + */ + public static void returnBrokenResource(JedisCluster jedis) { + if (jedis != null) { + try { + jedis.close(); + } catch (Exception e) { + throw new ServiceRuntimeException("释放redis-cluster连接失败", RestBusinessCode.CannotConnectionRedis.getValue()); + } + } + } + + /** + * 释放资源 + * + * @param jedis + * @param isBroken + */ + public static void returnResource(JedisCluster jedis) { + if (jedis != null) { + try { + jedis.close(); + } catch (Exception e) { + throw new ServiceRuntimeException("释放redis-cluster连接失败", RestBusinessCode.CannotConnectionRedis.getValue()); + } + } + } + + /** + * 获取byte[]类型Key + * + * @param key + * @return + */ + public static byte[] getBytesKey(Object object) { + if (object instanceof String) { + return StringUtils.getBytes((String) object); + } else { + return ObjectUtils.serialize(object); + } + } + + /** + * Object转换byte[]类型 + * + * @param key + * @return + */ + public static byte[] toBytes(Object object) { + return ObjectUtils.serialize(object); + } + + /** + * byte[]型转换Object + * + * @param key + * @return + */ + public static Object toObject(byte[] bytes) { + return ObjectUtils.unserialize(bytes); + } + + // 设置成功返回的结果OK + private static final String LOCK_SUCCESS = "OK"; + // NX -- Only set the key if it does not already exist. XX -- Only set the key + // if it already exist + private static final String SET_IF_NOT_EXIST = "NX"; + // 失效单位秒(EX)还是毫秒(PX) + private static final String SET_WITH_EXPIRE_TIME = "EX"; + private static final Long UNLOCK_SUCCESS = 1L; + + /** + * 尝试获取分布式锁,如果没有key就set,有key就不操作 + * + * @param requestId 请求标识(UUID.randomUUID().toString()),正常情况下是谁加的锁,谁去解锁不能a加的锁,b去解锁 + * @return 是否获取成功 + */ + public static Boolean lock(String requestId) { + String key = "uiAndServiceConfigSyncLock"; + String var1 = getResource().set(key, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, + Constants.REDISLOCKTIME); + if (LOCK_SUCCESS.equals(var1)) { + return true; + } + return false; + } + + /** + * 解锁操作 + * + * @param value 客户端标识(requestId) + * @return + */ + public static Boolean unLock(String value) { + String key = "uiAndServiceConfigSyncLock"; + // 这个字符串是个lua脚本,代表的意思是如果根据key拿到的value跟传入的value相同就执行del,否则就返回0【保证安全性】 + String luaScript = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 0 end"; + // 这个命令就是去执行lua脚本,KEYS的集合就是第二个参数,ARGV的集合就是第三参数【保证解锁的原子操作】 + Object var2 = getResource().eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)); + if (UNLOCK_SUCCESS == var2) { + return true; + } + return false; + } + + /** + * 重试机制 + * + * @param value 客户端标识 + * @return + */ + public static Boolean lockRetry(String value) { + Boolean flag = false; + try { + for (int i = 0; i < Constants.REDISRETRYNUM; i++) { + flag = lock(value); + if (flag) { + break; + } + Thread.sleep(1000); + } + } catch (Exception e) { + logger.error("尝试获取redis分布式锁失败,失败原因:{}", ExceptionUtil.getExceptionMsg(e)); + } + return flag; + } + +} diff --git a/src/main/java/com/nis/util/JedisUtils.java b/src/main/java/com/nis/util/JedisUtils.java index 54ea353..1fc18b5 100644 --- a/src/main/java/com/nis/util/JedisUtils.java +++ b/src/main/java/com/nis/util/JedisUtils.java @@ -14,7 +14,13 @@ import com.google.common.collect.Lists; import com.nis.restful.RestBusinessCode; import com.nis.restful.ServiceRuntimeException; import com.nis.web.service.SpringContextHolder; - +/** + * @ClassName: JedisUtils + * @Description: redis工具类 + * @author (rkg) + * @date 2018年03月5日 上午10:20:33 + * @version V1.0 + */ public class JedisUtils { private static Logger logger = LoggerFactory.getLogger(JedisUtils.class); private static final JedisPool jedisPool = SpringContextHolder.getBean(JedisPool.class); diff --git a/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java b/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java index 7c43803..140cd23 100644 --- a/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java +++ b/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java @@ -11,8 +11,6 @@ import java.util.UUID; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import net.sf.json.JSONObject; - import org.apache.commons.codec.digest.DigestUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestBody; @@ -38,13 +36,14 @@ import com.nis.web.service.AuditLogThread; import com.nis.web.service.ServicesRequestLogService; import com.nis.web.service.fdfs.FastDFSFile; import com.nis.web.service.fdfs.FileManager; -import com.nis.web.service.restful.ConfigRedisService; import com.nis.web.service.restful.ConfigSourcesService; import com.wordnik.swagger.annotations.Api; import com.wordnik.swagger.annotations.ApiOperation; import com.wordnik.swagger.annotations.ApiParam; import com.zdjizhi.utils.StringUtil; +import net.sf.json.JSONObject; + /** * @ClassName: ConfigSourcesController * @Description: 配置存储服务 @@ -63,9 +62,6 @@ public class ConfigSourcesController extends BaseRestController { @Autowired protected ServicesRequestLogService servicesRequestLogService; - @Autowired - ConfigRedisService configRedisService; - @RequestMapping(value = "/cfg/v1/configSources", method = RequestMethod.POST) @ApiOperation(value = "MAAT规则存储接口", httpMethod = "POST", response = Map.class, notes = "接收MAAT规则数据,存储到流量处理平台配置线中") @ApiParam(value = "MAAT规则对象", name = "configSource", required = true) @@ -549,69 +545,81 @@ public class ConfigSourcesController extends BaseRestController { @RequestMapping(value = "/cfg_batch/v1/configSources", method = RequestMethod.POST, produces = org.springframework.http.MediaType.APPLICATION_JSON_VALUE) @ApiOperation(value = "全量同步配置接收接口", httpMethod = "POST", response = Map.class, notes = "接收全量同步配置") public Map receiveConfigSources(@RequestBody String jsonString, HttpServletRequest request, - HttpServletResponse response) { + HttpServletResponse response) { long start = System.currentTimeMillis(); - AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, - null); + AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, null); thread.setContent("全量同步不记录请求内容"); try { - + String configType = request.getHeader("Config-Type"); String serviceId = request.getHeader("Service-Id"); String configTable = request.getHeader("Config-Table"); String lastCompletedTag = request.getHeader("Last-Completed-Tag"); - logger.info(new Date()+"-----------接收到json格式数据:"+jsonString+"-------:"); - if(StringUtil.isEmpty(serviceId)){ + logger.info(new Date() + "-----------接收到json格式数据:" + jsonString + "-------:"); + if (StringUtil.isEmpty(serviceId)) { logger.error("未在请求头中获取到serviceId"); - throw new RestServiceException(RestBusinessCode.config_integrity_error.getErrorReason() + "," + "未在请求头中获取到serviceId", - RestBusinessCode.config_integrity_error.getValue()); + throw new RestServiceException( + RestBusinessCode.config_integrity_error.getErrorReason() + "," + "未在请求头中获取到serviceId", + RestBusinessCode.config_integrity_error.getValue()); } + if (StringUtil.isEmpty(configType)) { + logger.error("未在请求头中获取到Config-Type"); + throw new RestServiceException( + RestBusinessCode.config_integrity_error.getErrorReason() + "," + "未在请求头中获取到Config-Type", + RestBusinessCode.config_integrity_error.getValue()); + } else { + if (!("1".equals(configType) || "2".equals(configType))) { + logger.error("Config-Type的值只能是1(maat)和2(回调)"); + throw new RestServiceException( + RestBusinessCode.config_integrity_error.getErrorReason() + "," + + "Config-Type的值只能是1(maat)和2(回调)", + RestBusinessCode.config_integrity_error.getValue()); + + } + } + logger.info("-----------开始存储到json格式数据------->>configType:" + configType + ",serviceId:" + serviceId + + ",configTable:" + configTable + ",lastCompletedTag:" + lastCompletedTag); + String key = null; + if ("1".equals(configType)) { + key = "MAAT"; + } else { + key = "UNMAAT"; + } + key = key + "-" + serviceId + "-" + UUID.randomUUID(); + configSourcesService.setRedisClusterKey(key, jsonString); + configSourcesService.setAllServiceKey(key); if (!StringUtil.isEmpty(lastCompletedTag)) { - System.out.println("接收全量同步配置:FINISHED"); + // 设置配置同步状态为接收配置完成 + configSourcesService.setAllConfigSyncStatus("1"); logger.info("接收全量同步配置:FINISHED"); } - if(StringUtil.isEmpty(configType)){ - logger.error("未在请求头中获取到Config-Type"); - throw new RestServiceException(RestBusinessCode.config_integrity_error.getErrorReason() + "," + "未在请求头中获取到Config-Type", - RestBusinessCode.config_integrity_error.getValue()); - }else{ - if (!("1".equals(configType)||"2".equals(configType))) { - logger.error("Config-Type的值只能是1(maat)和2(回调)"); - throw new RestServiceException(RestBusinessCode.config_integrity_error.getErrorReason() + "," + "Config-Type的值只能是1(maat)和2(回调)", - RestBusinessCode.config_integrity_error.getValue()); - - } - } - - logger.info("-----------开始存储到json格式数据------->>configType:"+configType+",serviceId:"+serviceId+",configTable:"+configTable+",lastCompletedTag:"+lastCompletedTag); -// } catch (Exception e) { // TODO: handle exception logger.error(e.getMessage()); - throw new RestServiceException(RestBusinessCode.config_integrity_error.getErrorReason() + "," + e.getMessage(), + throw new RestServiceException( + RestBusinessCode.config_integrity_error.getErrorReason() + "," + e.getMessage(), RestBusinessCode.config_integrity_error.getValue()); } return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "已接收全量同步配置信息", Constants.IS_DEBUG ? jsonString : null); } + @RequestMapping(value = "/cfg_batch/v1/status", method = RequestMethod.POST, produces = org.springframework.http.MediaType.APPLICATION_JSON_VALUE) @ApiOperation(value = "配置全量同步指令下发接口", httpMethod = "POST", response = Map.class, notes = "下发配置同步指令") - public Map acceptStatus(@RequestBody String jsonString, HttpServletRequest request, - HttpServletResponse response) { + public Map acceptStatus(@RequestBody String jsonString, HttpServletRequest request, HttpServletResponse response) { long start = System.currentTimeMillis(); - AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, - null); - JSONObject obj= JSONObject.fromObject(jsonString); - - if (!StringUtil.isEmpty(obj.get("syncStatus"))&&"1".equals(obj.get("syncStatus").toString())) { - logger.info("-----------配置同步指令下发:"+new Date()); - }else{ + AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, null); + JSONObject obj = JSONObject.fromObject(jsonString); + if (!StringUtil.isEmpty(obj.get("syncStatus")) && "1".equals(obj.get("syncStatus").toString())) { + logger.info("-----------配置同步指令下发:" + new Date()); + // 设置配置同步状态为开始 + configSourcesService.setAllConfigSyncStatus("0"); + } else { logger.error("未获取到同步状态码"); thread.setBusinessCode(RestBusinessCode.syncStatusFailed.getValue()); throw new RestServiceException("未获取到同步状态码", RestBusinessCode.syncStatusFailed.getValue()); } - thread.setBusinessCode(RestBusinessCode.syncStatusSuccessed.getValue()); return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "配置同步指令下发成功", Constants.IS_DEBUG ? jsonString : null); @@ -619,21 +627,19 @@ public class ConfigSourcesController extends BaseRestController { @RequestMapping(value = "/cfg_batch/v1/status", method = RequestMethod.GET) @ApiOperation(value = "获取全量同步状态服务接口", httpMethod = "GET", response = Map.class, notes = "获取全量同步状态服务") - public Map getSyncStatus(HttpServletRequest request, - HttpServletResponse response) { + public Map getSyncStatus(HttpServletRequest request, HttpServletResponse response) { long start = System.currentTimeMillis(); - AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_GET, request, - null); + AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_GET, request, null); List list = new ArrayList(); JSONObject obj = new JSONObject(); obj.put("service", "ntc"); - obj.put("status", 0); + obj.put("status", configSourcesService.getAllConfigSyncStatus()); obj.put("opTime", "2018-11-23 08:31:27"); list.add(obj); return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "获取全量同步状态成功", list); } - + private void validateGroupReuseSource(AuditLogThread thread, long start, GroupReuseSource groupReuseSource) { String errorInfo = ""; @@ -668,4 +674,15 @@ public class ConfigSourcesController extends BaseRestController { return false; } + @RequestMapping(value = "/cfg/v1/getConfigCount", method = RequestMethod.GET) + @ApiOperation(value = "获取有效无效的配置个数", httpMethod = "GET", response = Map.class, notes = "获取有效无效的配置个数") + public Map getConfigCount(HttpServletRequest request, HttpServletResponse response) { + long start = System.currentTimeMillis(); + AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_GET, request, null); + Map allConfigByScan = configSourcesService.getAllConfigByScan(); + allConfigByScan.putAll(configSourcesService.getAllConfig()); + return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "获取有效无效的配置个数成功", +// configSourcesService.getAllConfig()); + allConfigByScan); + } } diff --git a/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java b/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java index 1758443..453e386 100644 --- a/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java +++ b/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java @@ -8,16 +8,22 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; import net.sf.json.JSONObject; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.ScanParams; +import redis.clients.jedis.ScanResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -42,6 +48,8 @@ import com.nis.util.CamelUnderlineUtil; import com.nis.util.CompileVal; import com.nis.util.Constants; import com.nis.util.GroupReuseVal; +import com.nis.util.JedisClusterUtils; +import com.nis.util.JedisUtils; import com.nis.util.JsonMapper; import com.nis.util.ReadCommSourceXmlUtil; import com.nis.util.ServiceAndRDBIndexReal; @@ -126,8 +134,8 @@ public class ConfigSourcesService extends BaseService { StringBuffer sb) throws Exception { Map> maatMap = new HashMap>(); Map> configMap = new HashMap>(); - - if (configCompileList!=null&&configCompileList.size()>Constants.MAX_LIST_SIZE) { + + if (configCompileList != null && configCompileList.size() > Constants.MAX_LIST_SIZE) { thread.setSaveContentFlag(false); } for (ConfigCompile configCompile : configCompileList) { @@ -326,7 +334,7 @@ public class ConfigSourcesService extends BaseService { public void updateConfigSources(AuditLogThread thread, long start, List compileList, Date opTime, StringBuffer sb) throws Exception { Map> compileMap = new HashMap>(); - if (compileList!=null&&compileList.size()>Constants.MAX_LIST_SIZE) { + if (compileList != null && compileList.size() > Constants.MAX_LIST_SIZE) { thread.setSaveContentFlag(false); } if (null != compileList && compileList.size() > 0) { @@ -422,7 +430,7 @@ public class ConfigSourcesService extends BaseService { throw new RestServiceException(RestBusinessCode.CBParamFormateError.getErrorReason() + "," + e.getMessage(), RestBusinessCode.CBParamFormateError.getValue()); } - if (jsonObjectList!=null&&jsonObjectList.size()>Constants.MAX_LIST_SIZE) { + if (jsonObjectList != null && jsonObjectList.size() > Constants.MAX_LIST_SIZE) { thread.setSaveContentFlag(false); } Map>> dstMaps = new HashMap>>(); @@ -495,7 +503,7 @@ public class ConfigSourcesService extends BaseService { maatTableName.substring(maatTableName.lastIndexOf("_") + 1)); dstStr = dstPath.replace("{fileName}", dstStr.substring(dstStr.lastIndexOf("/") + 1)); } else if ("file_id".equals(commonSourceFieldCfg.getDstName())) { - //dstStr = dstStr.substring(dstStr.indexOf("group")); + // dstStr = dstStr.substring(dstStr.indexOf("group")); } } switch (commonSourceFieldCfg.getFieldType()) { @@ -644,7 +652,7 @@ public class ConfigSourcesService extends BaseService { throw new RestServiceException(RestBusinessCode.CBParamFormateError.getErrorReason() + "," + e.getMessage(), RestBusinessCode.CBParamFormateError.getValue()); } - if (jsonObjectList!=null&&jsonObjectList.size()>Constants.MAX_LIST_SIZE) { + if (jsonObjectList != null && jsonObjectList.size() > Constants.MAX_LIST_SIZE) { thread.setSaveContentFlag(false); } // @@ -878,18 +886,21 @@ public class ConfigSourcesService extends BaseService { } else { maatConfig.getIpRegionMapList().addAll(dstMapList); } - - if ((maatConfig.getStrRegionMapList()!=null&&maatConfig.getStrRegionMapList().size()>Constants.MAX_LIST_SIZE) - ||(maatConfig.getStrStrRegionMapList()!=null&&maatConfig.getStrStrRegionMapList().size()>Constants.MAX_LIST_SIZE) - ||(maatConfig.getIpRegionMapList()!=null&&maatConfig.getIpRegionMapList().size()>Constants.MAX_LIST_SIZE) - ||(maatConfig.getNumRegionMapList()!=null&&maatConfig.getNumRegionMapList().size()>Constants.MAX_LIST_SIZE)) { + + if ((maatConfig.getStrRegionMapList() != null + && maatConfig.getStrRegionMapList().size() > Constants.MAX_LIST_SIZE) + || (maatConfig.getStrStrRegionMapList() != null + && maatConfig.getStrStrRegionMapList().size() > Constants.MAX_LIST_SIZE) + || (maatConfig.getIpRegionMapList() != null + && maatConfig.getIpRegionMapList().size() > Constants.MAX_LIST_SIZE) + || (maatConfig.getNumRegionMapList() != null + && maatConfig.getNumRegionMapList().size() > Constants.MAX_LIST_SIZE)) { thread.setSaveContentFlag(false); } - //maatConfig.setService(groupReuse.getService()); + // maatConfig.setService(groupReuse.getService()); list.add(maatConfig); } - - + // 调用接口入redis logger.info("---------------调用Redis 分组复用配置新增接口---------------------"); configRedisService.saveGroupReuseConfig(list); @@ -957,9 +968,9 @@ public class ConfigSourcesService extends BaseService { } } logger.info("调用接口删除Redis中分组复用的域配置接口"); - if (reuseMap!=null&&reuseMap.size()>Constants.MAX_LIST_SIZE) { + if (reuseMap != null && reuseMap.size() > Constants.MAX_LIST_SIZE) { thread.setSaveContentFlag(false); - } + } // 所有的都删除成功返回true if (!configRedisService.delGroupReuseConfig(reuseMap)) { // if (!configRedisService.delGroupReuseConfigByPipeline(reuseMap)) { @@ -1040,4 +1051,90 @@ public class ConfigSourcesService extends BaseService { RestBusinessCode.ReuseRegionIsNull.getValue()); } } + + /** + * 设置配置全量同步状态 + * + * @param value + */ + public void setAllConfigSyncStatus(String value) { + JedisClusterUtils.set("allConfigSyncStatus", value, Constants.CONFIGSYNCLOCKTIME.intValue()); + } + + /** + * 获取配置全量同步状态 + * + * @return + */ + public String getAllConfigSyncStatus() { + return JedisClusterUtils.get("allConfigSyncStatus"); + } + + /** + * 将界面发过来的数据存储到rediscluster中 + * + * @param key + * @param value + */ + public void setRedisClusterKey(String key, String value) { + JedisClusterUtils.set(key, value, 0); + } + + /** + * 将所有业务的配置key记录下来,方便读取 + * + * @param value + */ + public void setAllServiceKey(String value) { + JedisCluster resource = JedisClusterUtils.getResource(); + resource.append("allConfigSyncKey", value + ";"); + } + + public Map getAllConfig() { + Jedis resource = JedisUtils.getResource(0); + Set effectiveSet = new HashSet<>(); + Set obsoleteSet = new HashSet<>(); + for (int i = 2; i < 6; i++) { + resource.select(i); + effectiveSet.addAll(resource.keys("EFFECTIVE_RULE:*_COMPILE*")); + obsoleteSet.addAll(resource.keys("OBSOLETE_RULE:*_COMPILE*")); + } + Map map = new HashMap<>(); + map.put("effectiveMaatKeys", effectiveSet.size()); + map.put("obsoleteMaatKeys", obsoleteSet.size()); + JedisUtils.returnBrokenResource(resource); + return map; + } + + public Map getAllConfigByScan() { + Jedis resource = JedisUtils.getResource(0); + Set effectiveSet = new HashSet<>(); + Set obsoleteSet = new HashSet<>(); + for (int i = 2; i < 6; i++) { + resource.select(i); + effectiveSet.addAll(getKeyByScan("EFFECTIVE_RULE:*_COMPILE*", resource)); + obsoleteSet.addAll(getKeyByScan("OBSOLETE_RULE:*_COMPILE*", resource)); + } + Map map = new HashMap<>(); + map.put("effectiveMaat", effectiveSet.size()); + map.put("obsoleteMaat", obsoleteSet.size()); + JedisUtils.returnBrokenResource(resource); + return map; + } + + public List getKeyByScan(String pattern, Jedis resource) { + List list = new ArrayList<>(); + int count = 1000; + String cursor = ScanParams.SCAN_POINTER_START; + ScanParams scanParams = new ScanParams(); + scanParams.count(count); + scanParams.match(pattern); + + do { + ScanResult scanResult = resource.scan(cursor, scanParams); + list.addAll(scanResult.getResult()); + cursor = scanResult.getStringCursor(); + } while (!"0".equals(cursor)); + return list; + } } diff --git a/src/main/java/com/nis/web/task/SyncAllConfigTask.java b/src/main/java/com/nis/web/task/SyncAllConfigTask.java new file mode 100644 index 0000000..7b69bf8 --- /dev/null +++ b/src/main/java/com/nis/web/task/SyncAllConfigTask.java @@ -0,0 +1,172 @@ +package com.nis.web.task; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; + +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.PropertySource; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import com.nis.domain.restful.ConfigSource; +import com.nis.restful.RestBusinessCode; +import com.nis.restful.ServiceRuntimeException; +import com.nis.util.Constants; +import com.nis.util.ExceptionUtil; +import com.nis.web.service.AuditLogThread; +import com.nis.web.service.restful.ConfigSourcesService; +import com.zdjizhi.utils.JsonMapper; + +import redis.clients.jedis.JedisCluster; + +@Component +@PropertySource(value = { "classpath:nis.properties" }) +public class SyncAllConfigTask { + private static Logger logger = LoggerFactory.getLogger(SyncAllConfigTask.class); + + @Autowired + private JedisCluster jedisCluster; + @Autowired + protected ConfigSourcesService configSourcesService; +// @Scheduled(cron = "${syncUiAndServiceConfigCron}") + public void syncRedisToCluster() { + String requestId = UUID.randomUUID().toString(); + try { +// + if (lock(requestId)) {// 避免集群环境下同一秒钟所有的机器都执行这个定时任务 + String allConfigSyncStatus = jedisCluster.get("allConfigSyncStatus"); + if (allConfigSyncStatus != null) {// 配置初始化完成 + if (allConfigSyncStatus.trim().equals("1")) { + logger.warn(""); + // 设置配置同步状态为正在进行 + configSourcesService.setAllConfigSyncStatus("2"); + logger.warn("开始执行配置全量导入操作,将allConfigSyncStatus值设置为2正在进行导入操作"); + Map maatMap = new HashMap<>(); + Map unMaatMap = new HashMap<>(); + String allConfigSyncKey = jedisCluster.get("allConfigSyncKey"); + if (allConfigSyncKey != null && !allConfigSyncKey.trim().equals("")) { + String[] split = org.apache.commons.lang.StringUtils.split(allConfigSyncKey, ";"); + for (String key : split) { + String val = jedisCluster.get(key); + String md5 = DigestUtils.md5Hex(val); + if (key.startsWith("UNMAAT")) { + unMaatMap.put(md5, val); + } else if (key.startsWith("MAAT")) { + maatMap.put(md5, val); + } + } + addConfigToRedis(maatMap,true); + addConfigToRedis(unMaatMap,false); + logger.warn("执行配置全量导入成功,将allConfigSyncStatus值设置为3,导入成功"); + // 设置配置同步状态为写redis成功 + configSourcesService.setAllConfigSyncStatus("3"); + + // 删除存储全量配置key的关系key + jedisCluster.del("allConfigSyncKey"); + for (String key : split) { + jedisCluster.del(key); + } + logger.warn("删除allConfigSyncKey,及其中的内容成功"); + } + } else { + logger.info( + "集群中allConfigSyncStatus的值是{}[开始:0(界面下发同步状态),初始化:1(配置接收完成状态),进行中:2(服务写redis),已完成:3(服务写redis完毕),失败:-1(服务写redis失败)],暂不执行全量配置同步操作", + allConfigSyncStatus); + } + } else { + logger.info("未从集群中获取到allConfigSyncStatus的值,暂不执行全量配置同步操作"); + } + + } else { + logger.info("没有从rediscluster中获取到allConfigSyncDistributedLock分布式锁,暂时不执行数据同步!"); + } + } catch (Exception e) { + logger.error("同步界面配置到redis中失败,失败原因:{}", ExceptionUtil.getExceptionMsg(e)); + // 设置配置同步状态为写redis失败 + configSourcesService.setAllConfigSyncStatus("-1"); + logger.error("执行配置全量导入失败,将allConfigSyncStatus值设置为-1,导入失败"); + } finally { + unlock(requestId); + closeConn(); + } + } + + public void addConfigToRedis(Map map, boolean isMaat) throws Exception { + long time = System.currentTimeMillis(); + StringBuffer sb = new StringBuffer(); + if (isMaat) { + for (Entry entry : map.entrySet()) { + ConfigSource configSource = new JsonMapper().fromJson(entry.getValue(), ConfigSource.class); + configSourcesService.saveMaatConfig(new AuditLogThread(), time, configSource.getConfigCompileList(), + sb); + } + } else { + for (Entry entry : map.entrySet()) { + String value = entry.getValue(); + configSourcesService.saveCommonSources(new AuditLogThread(), time, value); + } + + } + + } + + public void closeConn() { + if (jedisCluster != null) { + try { + jedisCluster.close(); + } catch (Exception e) { + throw new ServiceRuntimeException("释放redis-cluster连接失败", + RestBusinessCode.CannotConnectionRedis.getValue()); + } + } + } + +// 设置成功返回的结果OK + private static final String LOCK_SUCCESS = "OK"; +// NX -- Only set the key if it does not already exist. XX -- Only set the key +// if it already exist + private static final String SET_IF_NOT_EXIST = "NX"; +// 失效单位秒(EX)还是毫秒(PX) + private static final String SET_WITH_EXPIRE_TIME = "EX"; + private static final Long UNLOCK_SUCCESS = 1L; + + /** + * 尝试获取分布式锁,如果没有key就set,有key就不操作 + * + * @param requestId 请求标识(UUID.randomUUID().toString()),正常情况下是谁加的锁,谁去解锁不能a加的锁,b去解锁 + * @return 是否获取成功 + */ + public Boolean lock(String requestId) { + String key = "allConfigSyncDistributedLock"; + String var1 = jedisCluster.set(key, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, + Constants.CONFIGSYNCLOCKTIME); + if (LOCK_SUCCESS.equals(var1)) { + return true; + } + return false; + } + + /** + * 解除redis分布式锁 + * + * @param requestId + */ + protected boolean unlock(String requestId) { + String key = "allConfigSyncDistributedLock"; + // 这个字符串是个lua脚本,代表的意思是如果根据key拿到的value跟传入的value相同就执行del,否则就返回0【保证安全性】 + String luaScript = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 0 end"; + // 这个命令就是去执行lua脚本,KEYS的集合就是第二个参数,ARGV的集合就是第三参数【保证解锁的原子操作】 + Object var2 = jedisCluster.eval(luaScript, Collections.singletonList(key), + Collections.singletonList(requestId)); + if (UNLOCK_SUCCESS == var2) { + return true; + } + return false; + } +} diff --git a/src/main/resources/nis.properties b/src/main/resources/nis.properties index 04aeaab..b34bc84 100644 --- a/src/main/resources/nis.properties +++ b/src/main/resources/nis.properties @@ -213,7 +213,13 @@ fileProtocol=redis:// #是否开启日志查询count和last功能 isOpenLogCountAndLast=true -#redis分布式锁超时时间,默认五分钟,3000秒 -redisLockTime=3000 +#redis分布式锁超时时间,默认五分钟,300秒 +redisLockTime=300 #获取redis分布式锁失败后的尝试获取锁的次数,每次失败暂停一秒钟后再次尝试 -redisRetryNum=5 \ No newline at end of file +redisRetryNum=5 + +#定时检测界面和服务的配置是否需要同步的定时任务间隔 +syncUiAndServiceConfigCron=0/10 * * * * ? +#全量数据同步分布式锁时间,默认两个小时 +configSyncLockTime=7200 + From 6340ea3f9e9a67a7c21805e47e59cfc67bcdc964 Mon Sep 17 00:00:00 2001 From: renkaige Date: Mon, 3 Dec 2018 18:09:12 +0600 Subject: [PATCH 4/5] =?UTF-8?q?1:=E4=BF=AE=E6=94=B9=E5=85=A8=E9=87=8F?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E5=90=8C=E6=AD=A5,controller=E5=AF=B9?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E7=9A=84=E9=AA=8C=E8=AF=81=202:=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E4=BB=8E=E9=9B=86=E7=BE=A4=E4=B8=AD=E6=81=A2=E5=A4=8D?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E7=9A=84=E5=8A=9F=E8=83=BD=E5=B9=B6=E8=AE=B0?= =?UTF-8?q?=E5=BD=95=E7=9B=B8=E5=85=B3=E7=9A=84maat=5Fversion=203:?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=97=A0=E9=AA=8C=E8=AF=81=E7=9A=84=E4=BF=9D?= =?UTF-8?q?=E5=AD=98maat=E5=92=8Cunmaat=E7=B1=BB=E7=9A=84=E6=96=B9?= =?UTF-8?q?=E6=B3=95=204:=E4=BF=AE=E6=94=B9=E9=9B=86=E7=BE=A4=E4=B8=AD?= =?UTF-8?q?=E4=BF=9D=E5=AD=98=E5=90=8C=E6=AD=A5=E6=95=B0=E6=8D=AE=E7=9A=84?= =?UTF-8?q?=E5=AE=9E=E6=95=88=E6=97=B6=E9=97=B4=E4=B8=BA24=E5=B0=8F?= =?UTF-8?q?=E6=97=B6=205:=E4=BF=AE=E6=94=B9=E9=9B=86=E7=BE=A4=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E8=BF=9E=E6=8E=A5=E7=9A=84=E6=96=B9=E5=BC=8F=E4=B8=BA?= =?UTF-8?q?=E6=AF=8F=E6=AC=A1=E4=BD=BF=E7=94=A8=E8=8E=B7=E5=8F=96,?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E5=AE=8C=E5=85=B3=E9=97=AD,=E9=81=BF?= =?UTF-8?q?=E5=85=8D=E8=BF=9E=E6=8E=A5=E5=87=BA=E7=8E=B0=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../restful/ConfigSourcesController.java | 72 ++++- .../service/restful/ConfigSourcesService.java | 271 +++++++++++++++++- .../com/nis/web/task/SyncAllConfigTask.java | 146 ++++++++-- 3 files changed, 457 insertions(+), 32 deletions(-) diff --git a/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java b/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java index 140cd23..7b0fb1f 100644 --- a/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java +++ b/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java @@ -29,6 +29,7 @@ import com.nis.restful.RestServiceException; import com.nis.restful.ServiceRuntimeException; import com.nis.util.Constants; import com.nis.util.FileUtils; +import com.nis.util.JedisClusterUtils; import com.nis.util.MinioUtil; import com.nis.util.StringUtils; import com.nis.web.controller.BaseRestController; @@ -549,8 +550,17 @@ public class ConfigSourcesController extends BaseRestController { long start = System.currentTimeMillis(); AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, null); thread.setContent("全量同步不记录请求内容"); + String allConfigSyncStatus = configSourcesService.getAllConfigSyncStatus(); + if (allConfigSyncStatus != null) { + if (!allConfigSyncStatus.equals("0")) { + throw new RestServiceException("后台同步指令为" + allConfigSyncStatus + ",请先下发配置同步指令,或等待后台数据同步进程完成后再次下发配置同步指令", + RestBusinessCode.config_integrity_error.getValue()); + } + } else { + throw new RestServiceException("没有获取到同步请求标志,请先下发配置同步指令", + RestBusinessCode.config_integrity_error.getValue()); + } try { - String configType = request.getHeader("Config-Type"); String serviceId = request.getHeader("Service-Id"); String configTable = request.getHeader("Config-Table"); @@ -579,16 +589,19 @@ public class ConfigSourcesController extends BaseRestController { } logger.info("-----------开始存储到json格式数据------->>configType:" + configType + ",serviceId:" + serviceId + ",configTable:" + configTable + ",lastCompletedTag:" + lastCompletedTag); - String key = null; - if ("1".equals(configType)) { - key = "MAAT"; - } else { - key = "UNMAAT"; + + if (jsonString != null && !jsonString.trim().equals("")) {// 张薇说body可能为空,有的service没有数据也会下发一次,譬如最后一次提交时,lastCompletedTag=finished,但是body可能是空的 + String key = null; + if ("1".equals(configType)) { + key = "MAAT"; + } else { + key = "UNMAAT"; + } + key = key + "-" + serviceId + "-" + UUID.randomUUID(); + configSourcesService.setRedisClusterKey(key, jsonString); + configSourcesService.setAllServiceKey(key); } - key = key + "-" + serviceId + "-" + UUID.randomUUID(); - configSourcesService.setRedisClusterKey(key, jsonString); - configSourcesService.setAllServiceKey(key); - if (!StringUtil.isEmpty(lastCompletedTag)) { + if (!StringUtil.isEmpty(lastCompletedTag) && lastCompletedTag.trim().toLowerCase().equals("finished")) { // 设置配置同步状态为接收配置完成 configSourcesService.setAllConfigSyncStatus("1"); logger.info("接收全量同步配置:FINISHED"); @@ -610,11 +623,35 @@ public class ConfigSourcesController extends BaseRestController { public Map acceptStatus(@RequestBody String jsonString, HttpServletRequest request, HttpServletResponse response) { long start = System.currentTimeMillis(); AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, null); + JSONObject obj = JSONObject.fromObject(jsonString); - if (!StringUtil.isEmpty(obj.get("syncStatus")) && "1".equals(obj.get("syncStatus").toString())) { - logger.info("-----------配置同步指令下发:" + new Date()); - // 设置配置同步状态为开始 - configSourcesService.setAllConfigSyncStatus("0"); + if (!StringUtil.isEmpty(obj.get("syncStatus"))) { + String allConfigSyncStatus = configSourcesService.getAllConfigSyncStatus(); + if ("1".equals(obj.get("syncStatus").toString())) { + if (allConfigSyncStatus != null) { + if (allConfigSyncStatus.equals("0") || allConfigSyncStatus.equals("1") + || allConfigSyncStatus.equals("2")) { + throw new RestServiceException("已经下发过配置同步指令,请等待后台数据同步进程完成后再次下发配置同步指令", + RestBusinessCode.config_integrity_error.getValue()); + } + } + logger.info("-----------配置同步指令下发:" + new Date()); + // 设置配置同步状态为开始 + configSourcesService.setAllConfigSyncStatus("0"); + } else if ("0".equals(obj.get("syncStatus").toString())) {// 取消同步指令 + if (allConfigSyncStatus != null + && (allConfigSyncStatus.equals("0") || allConfigSyncStatus.equals("1"))) {// 只有在没有完全接收配置之前可以取消,否则不允许取消,因为接收完配置之后会把redis清空,所以这个时候不允许取消了 + // 设置配置同步状态为完成,并把之前记录的key删除 + configSourcesService.setAllConfigSyncStatus("3"); + if (JedisClusterUtils.exists("allConfigSyncKey")) { + JedisClusterUtils.getResource().del("allConfigSyncKey"); + } + } else { + throw new RestServiceException( + "配置同步指令状态为" + allConfigSyncStatus + ",不可执行取消操作了请等待后台数据同步进程完成后再次下发配置同步指令", + RestBusinessCode.config_integrity_error.getValue()); + } + } } else { logger.error("未获取到同步状态码"); thread.setBusinessCode(RestBusinessCode.syncStatusFailed.getValue()); @@ -633,7 +670,12 @@ public class ConfigSourcesController extends BaseRestController { List list = new ArrayList(); JSONObject obj = new JSONObject(); obj.put("service", "ntc"); - obj.put("status", configSourcesService.getAllConfigSyncStatus()); + String allConfigSyncStatus = configSourcesService.getAllConfigSyncStatus(); + if (allConfigSyncStatus == null || allConfigSyncStatus.trim().equals("")) { + obj.put("status", "-1"); + } else { + obj.put("status", configSourcesService.getAllConfigSyncStatus()); + } obj.put("opTime", "2018-11-23 08:31:27"); list.add(obj); return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "获取全量同步状态成功", diff --git a/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java b/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java index 453e386..4791ba8 100644 --- a/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java +++ b/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java @@ -307,6 +307,185 @@ public class ConfigSourcesService extends BaseService { configRedisService.saveMaatConfig(configMap); } + /** + * @Description: + * @author(zdx) @date 2018年12月3日 下午6:48:32 + * @param configCompileList + * @throws Exception + */ + + public void saveMaatConfig(List configCompileList) throws Exception { + Map> maatMap = new HashMap>(); + Map> configMap = new HashMap>(); + + for (ConfigCompile configCompile : configCompileList) { + Integer service = Integer.valueOf(configCompile.getService().toString()); + MaatConfig maatConfig = new MaatConfig(); + + maatConfig.setService(service); + // 编译 + maatConfig.setCompileMap(convertObjectToMap(configCompile, ConfigCompile.class)); + // 分组 + List> dstMaplList = null; + if (!StringUtil.isEmpty(configCompile.getGroupRelationList())) { + dstMaplList = new ArrayList>(); + for (ConfigGroupRelation group : configCompile.getGroupRelationList()) { + dstMaplList.add(convertObjectToMap(group, ConfigGroupRelation.class)); + } + } + maatConfig.setGroupMapList(dstMaplList); + // 字符串域 + dstMaplList = null; + List> strongMapList = null; + if (!StringUtil.isEmpty(configCompile.getStrRegionList())) { + dstMaplList = new ArrayList>(); + for (StrRegion region : configCompile.getStrRegionList()) { + if (StringUtil.isEmpty(region.getDistrict())) { + dstMaplList.add(convertObjectToMap(region, StrRegion.class)); + } else { + if (StringUtil.isEmpty(strongMapList)) { + strongMapList = new ArrayList>(); + } + strongMapList.add(convertObjectToMap(region, StrRegion.class)); + } + } + } + maatConfig.setStrRegionMapList(dstMaplList); + // 增强字符串域 + if (!StringUtil.isEmpty(strongMapList) && strongMapList.size() > 0) { + maatConfig.setStrStrRegionMapList((strongMapList)); + } + // 数值域 + dstMaplList = null; + if (!StringUtil.isEmpty(configCompile.getNumRegionList())) { + dstMaplList = new ArrayList>(); + for (NumRegion region : configCompile.getNumRegionList()) { + dstMaplList.add(convertObjectToMap(region, NumRegion.class)); + } + } + maatConfig.setNumRegionMapList(dstMaplList); + + // Ip域 + dstMaplList = null; + if (!StringUtil.isEmpty(configCompile.getIpRegionList())) { + dstMaplList = new ArrayList>(); + for (IpRegion region : configCompile.getIpRegionList()) { + dstMaplList.add(convertObjectToMap(region, IpRegion.class)); + } + } + maatConfig.setIpRegionMapList(dstMaplList); + + // 摘要类域 + dstMaplList = null; + if (!StringUtil.isEmpty(configCompile.getDigestRegionList())) { + dstMaplList = new ArrayList>(); + for (DigestRegion region : configCompile.getDigestRegionList()) { + dstMaplList.add(convertObjectToMap(region, DigestRegion.class)); + } + } + + maatConfig.setFileDigestRegionMapList(dstMaplList); + + // 文本相似性域 + // dstMaplList = null; + // maatConfig.setFileLikeRegionMapList(dstMaplList); + + // 生效范围IP域 + dstMaplList = null; + if (!StringUtil.isEmpty(configCompile.getIpClientRangeList())) { + dstMaplList = new ArrayList>(); + for (IpRegion region : configCompile.getIpClientRangeList()) { + dstMaplList.add(convertObjectToMap(region, IpRegion.class)); + } + } + maatConfig.setIpClientRangeMapList(dstMaplList); + + if (maatMap.containsKey(service)) { + maatMap.get(service).add(maatConfig); + } else { + List maatCfgList = new ArrayList(); + maatCfgList.add(maatConfig); + maatMap.put(service, maatCfgList); + + } + } + + // 调用接口入redis + + Iterator serviceIterator = maatMap.keySet().iterator(); + while (serviceIterator.hasNext()) { + Integer service = Integer.valueOf(serviceIterator.next().toString()); + List dbIndexList = ServiceAndRDBIndexReal.getRedisDBByService(service); + if (!StringUtil.isEmpty(dbIndexList) && dbIndexList.size() > 0) { + for (Integer dbIndex : dbIndexList) { + // 分发到阀门有些业务需要添加编译属性到域配置 + List newMaatConfigList = new ArrayList(); + newMaatConfigList.addAll(maatMap.get(service)); + if (dbIndex.intValue() == ServiceAndRDBIndexReal.getValveDBIndex().intValue()) { + Map> maatToValueMap = ServiceAndRDBIndexReal.getMaatToValveMap(); + if (maatToValueMap.containsKey(service)) { + + Map regionAndFiledMap = maatToValueMap.get(service); + for (int i = 0; i < newMaatConfigList.size(); i++) { + MaatConfig maatConfig = newMaatConfigList.get(i); + MaatConfig newMaatConfig = (MaatConfig) JsonMapper + .fromJsonString(JsonMapper.toJsonString(maatConfig), MaatConfig.class); + Iterator iterator = regionAndFiledMap.keySet().iterator(); + while (iterator.hasNext()) { + String regionName = iterator.next().toString(); + PropertyDescriptor pd; + try { + pd = new PropertyDescriptor(regionName + "MapList", MaatConfig.class); + Method method = pd.getReadMethod(); + Object object = method.invoke(newMaatConfig); + + if (object != null) { + + List> listMaps = new ArrayList>(); + listMaps.addAll((List>) object); + String[] fields = regionAndFiledMap.get(regionName); + for (String fieldName : fields) { + String value = newMaatConfig.getCompileMap() + .get(fieldName.toLowerCase()); + if (!StringUtil.isEmpty(value)) { + for (Map map : listMaps) { + map.put(fieldName.toLowerCase(), value); + } + } + } + method = pd.getWriteMethod(); + method.invoke(newMaatConfig, listMaps); + } + newMaatConfigList.set(i, newMaatConfig); + } catch (Exception e) { + // TODO Auto-generated catch block + throw new RestServiceException("未找到域列表,请检查配置文件中域类型是否正确!:" + e.getMessage(), + RestBusinessCode.service_runtime_error.getValue()); + } + + } + } + } + } + + if (configMap.containsKey(dbIndex)) { + configMap.get(dbIndex).addAll(newMaatConfigList); + } else { + List list = new ArrayList(); + list.addAll(newMaatConfigList); + configMap.put(dbIndex, list); + } + + } + } else { + throw new ServiceRuntimeException(RestBusinessCode.ServiceNoFoundDBIndex.getErrorReason(), + RestBusinessCode.ServiceNoFoundDBIndex.getValue()); + } + } + logger.info("---------------调用Redis maat配置新增接口---------------------"); + configRedisService.saveMaatConfig(configMap); + } + private Map convertObjectToMap(Object obj, Class clazz) throws Exception { Map dstMap = new HashMap(); Field[] fields = obj.getClass().getDeclaredFields(); @@ -630,6 +809,96 @@ public class ConfigSourcesService extends BaseService { configRedisService.saveUnMaatConfig(configMap); } + public void saveCommonSources(String jsonString) throws Exception { + JsonArray jsonObjectList = null; + try { + jsonObjectList = new JsonParser().parse(jsonString).getAsJsonArray(); + } catch (Exception e) { + // TODO: handle exception + throw new RestServiceException(RestBusinessCode.CBParamFormateError.getErrorReason() + "," + e.getMessage(), + RestBusinessCode.CBParamFormateError.getValue()); + } + Map>> dstMaps = new HashMap>>(); + for (int i = 0; i < jsonObjectList.size(); i++) { + JsonObject jsonObj = (JsonObject) jsonObjectList.get(i); + Map srcMap = JSONObject.fromObject(JSONObject.fromObject((jsonObj.toString()))); + if (srcMap.containsKey("service")) { + Map dstMap = new HashMap(); + List commonSourceFieldCfgList = ReadCommSourceXmlUtil + .getCommonSourceCfgByService(srcMap.get("service").toString().trim()); + + // 获取IP类型 + Integer ipType = null; + String ipTypeName = ""; + + for (CommonSourceFieldCfg commonSourceFieldCfg : commonSourceFieldCfgList) { + if (commonSourceFieldCfg.getDstName().equals("addr_type")) { + String dstVal = srcMap.get(commonSourceFieldCfg.getSrcName()).toString(); + ipType = Integer.parseInt(dstVal); + } + } + if (ipType == null) { + ipType = 4; + } + for (CommonSourceFieldCfg commonSourceFieldCfg : commonSourceFieldCfgList) { + // 字段类型 String Number Date Ip Port + String dstStr = StringUtil.isEmpty(srcMap.get(commonSourceFieldCfg.getSrcName())) + ? commonSourceFieldCfg.getDefaultVal() + : srcMap.get(commonSourceFieldCfg.getSrcName()).toString(); + if (!StringUtil.isEmpty(dstStr) && dstStr.startsWith("[") && dstStr.endsWith("]")) { + dstStr = srcMap.get(dstStr.substring(1, dstStr.length() - 1)).toString(); + } + + if ("dstFile".equals(commonSourceFieldCfg.getSrcName())) { + if ("dst_file".equals(commonSourceFieldCfg.getDstName())) { + String maatTableName = ServiceAndRDBIndexReal + .getUnMaatTableName(Integer.valueOf(srcMap.get("service").toString().trim())); + String dstPath = Constants.MM_SAMPLE_DST_PATH.replace("{tableType}", + maatTableName.substring(maatTableName.lastIndexOf("_") + 1)); + dstStr = dstPath.replace("{fileName}", dstStr.substring(dstStr.lastIndexOf("/") + 1)); + } else if ("file_id".equals(commonSourceFieldCfg.getDstName())) { + // dstStr = dstStr.substring(dstStr.indexOf("group")); + } + } + dstMap.put(commonSourceFieldCfg.getDstName(), dstStr); + } + if (StringUtil.isEmpty(dstMaps.get(Integer.valueOf(srcMap.get("service").toString())))) { + List> list = new ArrayList>(); + list.add(dstMap); + dstMaps.put(Integer.valueOf(srcMap.get("service").toString()), list); + } else { + List> list = dstMaps.get(Integer.valueOf(srcMap.get("service").toString())); + list.add(dstMap); + + } + } + } + logger.info("------------------调用非maat配置新增接口-------------------"); + // 按service分库 + Map>> configMap = new HashMap>>(); + Iterator serviceIterator = dstMaps.keySet().iterator(); + while (serviceIterator.hasNext()) { + Integer service = Integer.valueOf(serviceIterator.next().toString()); + List dbIndexList = ServiceAndRDBIndexReal.getRedisDBByService(service); + if (!StringUtil.isEmpty(dbIndexList) && dbIndexList.size() > 0) { + for (Integer dbIndex : dbIndexList) { + if (configMap.containsKey(dbIndex)) { + configMap.get(dbIndex).addAll(dstMaps.get(service)); + } else { + List> list = new ArrayList>(); + list.addAll(dstMaps.get(service)); + configMap.put(dbIndex, list); + } + } + } else { + throw new ServiceRuntimeException("service与写入数据库序号映射关系不存在", + RestBusinessCode.ServiceNoFoundDBIndex.getValue()); + } + + } + configRedisService.saveUnMaatConfig(configMap); + } + /** * * @Description:回调类配置状态更新(停/启用) @@ -1077,7 +1346,7 @@ public class ConfigSourcesService extends BaseService { * @param value */ public void setRedisClusterKey(String key, String value) { - JedisClusterUtils.set(key, value, 0); + JedisClusterUtils.set(key, value, 86400);// 24小时超时 } /** diff --git a/src/main/java/com/nis/web/task/SyncAllConfigTask.java b/src/main/java/com/nis/web/task/SyncAllConfigTask.java index 7b69bf8..ec9b288 100644 --- a/src/main/java/com/nis/web/task/SyncAllConfigTask.java +++ b/src/main/java/com/nis/web/task/SyncAllConfigTask.java @@ -19,31 +19,52 @@ import com.nis.restful.RestBusinessCode; import com.nis.restful.ServiceRuntimeException; import com.nis.util.Constants; import com.nis.util.ExceptionUtil; +import com.nis.util.JedisUtils; import com.nis.web.service.AuditLogThread; +import com.nis.web.service.SpringContextHolder; import com.nis.web.service.restful.ConfigSourcesService; import com.zdjizhi.utils.JsonMapper; import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.exceptions.JedisException; @Component @PropertySource(value = { "classpath:nis.properties" }) public class SyncAllConfigTask { private static Logger logger = LoggerFactory.getLogger(SyncAllConfigTask.class); - - @Autowired - private JedisCluster jedisCluster; +// @Autowired +// private JedisCluster jedisCluster; @Autowired protected ConfigSourcesService configSourcesService; -// @Scheduled(cron = "${syncUiAndServiceConfigCron}") + + /** + * 每次使用获取新连接,避免集群某些机器宕机后影响连接使用,注意需要在applicationContext-redis.xml中修改jedisCluster的scope为 prototype + * + * @return + * @throws JedisException + */ + public static JedisCluster getResource() throws JedisException { + JedisCluster jedisCluster = SpringContextHolder.getBean(JedisCluster.class); + if (jedisCluster == null) { + throw new ServiceRuntimeException("无法获取redis-cluster连接,请联系管理员检查程序", + RestBusinessCode.CannotConnectionRedis.getValue()); + } + return jedisCluster; + + } + + @Scheduled(cron = "${syncUiAndServiceConfigCron}") public void syncRedisToCluster() { String requestId = UUID.randomUUID().toString(); + JedisCluster jedisCluster = getResource(); + Map> map = null; try { -// - if (lock(requestId)) {// 避免集群环境下同一秒钟所有的机器都执行这个定时任务 + if (lock(jedisCluster, requestId)) {// 避免集群环境下同一秒钟所有的机器都执行这个定时任务 +// if (true) {// 避免集群环境下同一秒钟所有的机器都执行这个定时任务 String allConfigSyncStatus = jedisCluster.get("allConfigSyncStatus"); if (allConfigSyncStatus != null) {// 配置初始化完成 if (allConfigSyncStatus.trim().equals("1")) { - logger.warn(""); + map = getAllSeqAndVersion(); // 设置配置同步状态为正在进行 configSourcesService.setAllConfigSyncStatus("2"); logger.warn("开始执行配置全量导入操作,将allConfigSyncStatus值设置为2正在进行导入操作"); @@ -61,12 +82,13 @@ public class SyncAllConfigTask { maatMap.put(md5, val); } } - addConfigToRedis(maatMap,true); - addConfigToRedis(unMaatMap,false); + flushRedisDb(); + addConfigToRedis(maatMap, true); + addConfigToRedis(unMaatMap, false); logger.warn("执行配置全量导入成功,将allConfigSyncStatus值设置为3,导入成功"); // 设置配置同步状态为写redis成功 configSourcesService.setAllConfigSyncStatus("3"); - + // 删除存储全量配置key的关系key jedisCluster.del("allConfigSyncKey"); for (String key : split) { @@ -92,12 +114,22 @@ public class SyncAllConfigTask { configSourcesService.setAllConfigSyncStatus("-1"); logger.error("执行配置全量导入失败,将allConfigSyncStatus值设置为-1,导入失败"); } finally { - unlock(requestId); - closeConn(); + unlock(jedisCluster, requestId); + closeConn(jedisCluster); + if (map != null && map.size() > 0) { + recoverRedisData(map); + } } } - public void addConfigToRedis(Map map, boolean isMaat) throws Exception { + /** + * 保存数据入库,有验证逻辑 + * + * @param map + * @param isMaat + * @throws Exception + */ + public void addConfigToRedisYZ(Map map, boolean isMaat) throws Exception { long time = System.currentTimeMillis(); StringBuffer sb = new StringBuffer(); if (isMaat) { @@ -116,7 +148,35 @@ public class SyncAllConfigTask { } - public void closeConn() { + /** + * 保存数据入库,无验证逻辑 + * + * @param map + * @param isMaat + * @throws Exception + */ + private void addConfigToRedis(Map map, boolean isMaat) throws Exception { + if (isMaat) { + for (Entry entry : map.entrySet()) { + ConfigSource configSource = new JsonMapper().fromJson(entry.getValue(), ConfigSource.class); + configSourcesService.saveMaatConfig(configSource.getConfigCompileList()); + } + } else { + for (Entry entry : map.entrySet()) { + String value = entry.getValue(); + configSourcesService.saveCommonSources(value); + } + + } + + } + + /** + * 关闭集群连接 + * + * @param jedisCluster + */ + private void closeConn(JedisCluster jedisCluster) { if (jedisCluster != null) { try { jedisCluster.close(); @@ -142,7 +202,7 @@ public class SyncAllConfigTask { * @param requestId 请求标识(UUID.randomUUID().toString()),正常情况下是谁加的锁,谁去解锁不能a加的锁,b去解锁 * @return 是否获取成功 */ - public Boolean lock(String requestId) { + private Boolean lock(JedisCluster jedisCluster, String requestId) { String key = "allConfigSyncDistributedLock"; String var1 = jedisCluster.set(key, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, Constants.CONFIGSYNCLOCKTIME); @@ -157,7 +217,7 @@ public class SyncAllConfigTask { * * @param requestId */ - protected boolean unlock(String requestId) { + private boolean unlock(JedisCluster jedisCluster, String requestId) { String key = "allConfigSyncDistributedLock"; // 这个字符串是个lua脚本,代表的意思是如果根据key拿到的value跟传入的value相同就执行del,否则就返回0【保证安全性】 String luaScript = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 0 end"; @@ -169,4 +229,58 @@ public class SyncAllConfigTask { } return false; } + + /** + * 从配置redis库中获取没个redisdb的maat_version,0,14,15号库不会有maat_version所以就不获取了 + * + * @return + */ + private Map> getAllSeqAndVersion() { + // 第一个key是redisdb,第二个key是redis的 + Map> map = new HashMap<>(); + for (int i = 1; i < 14; i++) { + String maatVersionStr = JedisUtils.get("MAAT_VERSION", i); + if (!map.containsKey(i)) { + Map keyValMap = new HashMap<>(); + if (maatVersionStr != null) { + keyValMap.put("MAAT_VERSION", maatVersionStr); + map.put(i, keyValMap); + } + + } + } +// String seqCompileid = JedisUtils.get("SEQ_COMPILEID", 0); +// String seqGroupid = JedisUtils.get("SEQ_GROUPID", 0); +// String seqRegionid = JedisUtils.get("SEQ_REGIONID", 0); +// Map keyValMap = new HashMap<>(); +// keyValMap.put("SEQ_COMPILEID", seqCompileid); +// keyValMap.put("SEQ_GROUPID", seqGroupid); +// keyValMap.put("SEQ_REGIONID", seqRegionid); +// map.put(0, keyValMap); + return map; + } + + /** + * 清空配置redis库,不清空0号库 + */ + private void flushRedisDb() {// 不清空0号库 + for (int i = 1; i < 16; i++) { + JedisUtils.getResource(i).flushDB(); + } + } + + /** + * 恢复配置redis库中各个索引的maat_version + * + * @param map + */ + private void recoverRedisData(Map> map) { + for (Integer redisDB : map.keySet()) { + Map keyValMap = map.get(redisDB); + for (String redisKey : keyValMap.keySet()) { + JedisUtils.set(redisKey, String.valueOf(Long.parseLong(keyValMap.get(redisKey)) + 2l), 0, redisDB); + } + } + } + } From 536e5b887a206908ae2edeb093f564ba2dc1362d Mon Sep 17 00:00:00 2001 From: zhangdongxu Date: Wed, 5 Dec 2018 17:11:12 +0800 Subject: [PATCH 5/5] =?UTF-8?q?1=E3=80=81=E4=BF=AE=E6=94=B9getMaatTableNam?= =?UTF-8?q?e=E6=96=B9=E6=B3=95=E4=B8=AD=E8=8E=B7=E5=8F=96=E4=B8=8D?= =?UTF-8?q?=E5=88=B0=E8=A1=A8=E5=90=8D=E7=9A=84=E5=BC=82=E5=B8=B8=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=EF=BC=9B=202=E3=80=81=E5=A6=82=E6=9E=9C=E6=9C=80?= =?UTF-8?q?=E5=90=8E=E7=9A=84service=E6=B2=A1=E6=9C=89=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=EF=BC=8C=E4=B8=8D=E8=AE=BA=E6=98=AFmaat=E7=B1=BB=E8=BF=98?= =?UTF-8?q?=E6=98=AF=E5=9B=9E=E8=B0=83=E7=B1=BB=E9=85=8D=E7=BD=AE=EF=BC=8C?= =?UTF-8?q?=E9=83=BD=E4=BC=9A=E4=BC=A0{}+lastCompletedTag(finished)?= =?UTF-8?q?=E6=9D=A5=E7=BB=93=E6=9D=9F=E6=95=B0=E6=8D=AE=E4=BC=A0=E8=BE=93?= =?UTF-8?q?=EF=BC=9B=203=E3=80=81=E6=89=BE=E4=B8=8D=E5=88=B0=E4=B8=9A?= =?UTF-8?q?=E5=8A=A1=E5=85=A5=E5=BA=93=E5=BA=8F=E5=8F=B7=E6=97=B6=EF=BC=8C?= =?UTF-8?q?=E6=8F=90=E7=A4=BAXX=E4=B8=9A=E5=8A=A1=E5=86=99=E5=85=A5?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E5=BA=8F=E5=8F=B7=E6=98=A0=E5=B0=84?= =?UTF-8?q?=E5=85=B3=E7=B3=BB=E4=B8=8D=E5=AD=98=E5=9C=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/nis/util/ServiceAndRDBIndexReal.java | 2 +- .../controller/restful/ConfigSourcesController.java | 8 ++++++-- .../nis/web/service/restful/ConfigSourcesService.java | 10 +++++----- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/nis/util/ServiceAndRDBIndexReal.java b/src/main/java/com/nis/util/ServiceAndRDBIndexReal.java index b7427c1..578e4e2 100644 --- a/src/main/java/com/nis/util/ServiceAndRDBIndexReal.java +++ b/src/main/java/com/nis/util/ServiceAndRDBIndexReal.java @@ -311,7 +311,7 @@ public class ServiceAndRDBIndexReal { return tableList.get(index); } else { logger.error("未从业务类型和表对应关系中,找到业务类型:{},配置类型:{}表名:{}对应的真实表名", service, type, tableName); - throw new ServiceRuntimeException("无法从applicationConfig-rule.properties配置文件中,找到回调类配置service为" + throw new ServiceRuntimeException("无法从applicationConfig-rule.properties配置文件中,找到service为" + service + ",配置类型:" + type + "对应的真实表名", RestBusinessCode.NotFoundTableName.getValue()); } diff --git a/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java b/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java index 7b0fb1f..121d90f 100644 --- a/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java +++ b/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java @@ -565,7 +565,7 @@ public class ConfigSourcesController extends BaseRestController { String serviceId = request.getHeader("Service-Id"); String configTable = request.getHeader("Config-Table"); String lastCompletedTag = request.getHeader("Last-Completed-Tag"); - logger.info(new Date() + "-----------接收到json格式数据:" + jsonString + "-------:"); +// logger.info(new Date() + "-----------接收到json格式数据:" + jsonString + "-------:"); if (StringUtil.isEmpty(serviceId)) { logger.error("未在请求头中获取到serviceId"); throw new RestServiceException( @@ -590,7 +590,7 @@ public class ConfigSourcesController extends BaseRestController { logger.info("-----------开始存储到json格式数据------->>configType:" + configType + ",serviceId:" + serviceId + ",configTable:" + configTable + ",lastCompletedTag:" + lastCompletedTag); - if (jsonString != null && !jsonString.trim().equals("")) {// 张薇说body可能为空,有的service没有数据也会下发一次,譬如最后一次提交时,lastCompletedTag=finished,但是body可能是空的 + if (jsonString != null && !jsonString.trim().equals("{}")) {// 如果最后的service没有配置,不论是maat类还是回调类配置,都会传{}+lastCompletedTag(finished)来结束数据传输 String key = null; if ("1".equals(configType)) { key = "MAAT"; @@ -637,6 +637,10 @@ public class ConfigSourcesController extends BaseRestController { } logger.info("-----------配置同步指令下发:" + new Date()); // 设置配置同步状态为开始 + //在下次开始同步之前把上次记录的key删除 + if (JedisClusterUtils.exists("allConfigSyncKey")) { + JedisClusterUtils.getResource().del("allConfigSyncKey"); + } configSourcesService.setAllConfigSyncStatus("0"); } else if ("0".equals(obj.get("syncStatus").toString())) {// 取消同步指令 if (allConfigSyncStatus != null diff --git a/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java b/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java index 4791ba8..eb4416d 100644 --- a/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java +++ b/src/main/java/com/nis/web/service/restful/ConfigSourcesService.java @@ -299,7 +299,7 @@ public class ConfigSourcesService extends BaseService { } } else { - throw new ServiceRuntimeException(RestBusinessCode.ServiceNoFoundDBIndex.getErrorReason(), + throw new ServiceRuntimeException("service为"+service+"的业务写入数据库序号映射关系不存在", RestBusinessCode.ServiceNoFoundDBIndex.getValue()); } } @@ -478,7 +478,7 @@ public class ConfigSourcesService extends BaseService { } } else { - throw new ServiceRuntimeException(RestBusinessCode.ServiceNoFoundDBIndex.getErrorReason(), + throw new ServiceRuntimeException("service为"+service+"的业务写入数据库序号映射关系不存在", RestBusinessCode.ServiceNoFoundDBIndex.getValue()); } } @@ -801,7 +801,7 @@ public class ConfigSourcesService extends BaseService { } } } else { - throw new ServiceRuntimeException("service与写入数据库序号映射关系不存在", + throw new ServiceRuntimeException("service为"+service+"的业务写入数据库序号映射关系不存在", RestBusinessCode.ServiceNoFoundDBIndex.getValue()); } @@ -891,7 +891,7 @@ public class ConfigSourcesService extends BaseService { } } } else { - throw new ServiceRuntimeException("service与写入数据库序号映射关系不存在", + throw new ServiceRuntimeException("service为"+service+"的业务写入数据库序号映射关系不存在", RestBusinessCode.ServiceNoFoundDBIndex.getValue()); } @@ -972,7 +972,7 @@ public class ConfigSourcesService extends BaseService { } } } else { - throw new ServiceRuntimeException("service与写入数据库序号映射关系不存在", + throw new ServiceRuntimeException("service为"+service+"的业务写入数据库序号映射关系不存在", RestBusinessCode.ServiceNoFoundDBIndex.getValue()); } }