diff --git a/src/main/java/com/nis/restful/RestBusinessCode.java b/src/main/java/com/nis/restful/RestBusinessCode.java index 89c5dd6..31b33b2 100644 --- a/src/main/java/com/nis/restful/RestBusinessCode.java +++ b/src/main/java/com/nis/restful/RestBusinessCode.java @@ -667,7 +667,10 @@ public enum RestBusinessCode { /** * 状态更新操作中更新内容Map不能为空 */ - ConfigInfoMapIsNull(5004005,"状态更新操作中更新内容Map不能为空"), + ConfigInfoMapIsNull(5004006,"状态更新操作中更新内容Map不能为空"), + + + existBatchTask(5005001,"有其他任务正在执行批量导入,请稍后再试") ; diff --git a/src/main/java/com/nis/util/Constants.java b/src/main/java/com/nis/util/Constants.java index 464dd42..7489828 100644 --- a/src/main/java/com/nis/util/Constants.java +++ b/src/main/java/com/nis/util/Constants.java @@ -211,12 +211,22 @@ public final class Constants { *是否使用Minio */ public static final Boolean IS_USE_MINIO = Configurations.getBooleanProperty("isUseMinio", true); - public static final int MAXTHREADNUM = Configurations.getIntProperty("maxThreadNum", 10); - public static final int EVERTHREADNUM = Configurations.getIntProperty("everThreadNum", 10000); /** * 保存请求内容时,最大的资源列表size */ public static final int MAX_LIST_SIZE = Configurations.getIntProperty("maxListSize", 10); + + /** + * redis分布式锁超时时间,默认五分钟,3000秒 + */ + public static final Long REDISLOCKTIME=Configurations.getLongProperty("redisLockTime", 3000); + /** + * 获取redis分布式锁失败后的尝试获取锁的次数,每次失败暂停一秒钟后再次尝试 + */ + public static final Long REDISRETRYNUM=Configurations.getLongProperty("redisRetryNum", 5); + + + } diff --git a/src/main/java/com/nis/util/JedisUtils.java b/src/main/java/com/nis/util/JedisUtils.java index 20d8456..54ea353 100644 --- a/src/main/java/com/nis/util/JedisUtils.java +++ b/src/main/java/com/nis/util/JedisUtils.java @@ -1,5 +1,6 @@ package com.nis.util; +import java.util.Collections; import java.util.List; import org.slf4j.Logger; @@ -16,7 +17,8 @@ import com.nis.web.service.SpringContextHolder; public class JedisUtils { private static Logger logger = LoggerFactory.getLogger(JedisUtils.class); - private static JedisPool jedisPool = SpringContextHolder.getBean(JedisPool.class); + private static final JedisPool jedisPool = SpringContextHolder.getBean(JedisPool.class); + /** * 获取缓存 * @@ -232,7 +234,7 @@ public class JedisUtils { jedis.select(redisDb); } catch (JedisException e) { returnBrokenResource(jedis); - logger.error("获取redis连接失败,异常信息:{}" ,ExceptionUtil.getExceptionMsg(e)); + logger.error("获取redis连接失败,异常信息:{}", ExceptionUtil.getExceptionMsg(e)); throw new ServiceRuntimeException("获取redis连接失败,请联系管理员检查程序", RestBusinessCode.CannotConnectionRedis.getValue()); } @@ -298,4 +300,72 @@ public class JedisUtils { return ObjectUtils.unserialize(bytes); } + // 设置成功返回的结果OK + private static final String LOCK_SUCCESS = "OK"; + // NX -- Only set the key if it does not already exist. XX -- Only set the key + // if it already exist + private static final String SET_IF_NOT_EXIST = "NX"; + // 失效单位秒(EX)还是毫秒(PX) + private static final String SET_WITH_EXPIRE_TIME = "EX"; + private static final Long UNLOCK_SUCCESS = 1L; + + /** + * 尝试获取分布式锁,如果没有key就set,有key就不操作 + * + * @param requestId + * 请求标识(UUID.randomUUID().toString()),正常情况下是谁加的锁,谁去解锁不能a加的锁,b去解锁 + * @return 是否获取成功 + */ + public static Boolean lock(String requestId) { + String key = "redisDistributedLock"; + String var1 = getResource(0).set(key, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, + Constants.REDISLOCKTIME); + if (LOCK_SUCCESS.equals(var1)) { + return true; + } + return false; + } + + /** + * 解锁操作 + * + * @param value + * 客户端标识(requestId) + * @return + */ + public static Boolean unLock(String value) { + String key = "redisDistributedLock"; + // 这个字符串是个lua脚本,代表的意思是如果根据key拿到的value跟传入的value相同就执行del,否则就返回0【保证安全性】 + String luaScript = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then return redis.call(\"del\",KEYS[1]) else return 0 end"; + // 这个命令就是去执行lua脚本,KEYS的集合就是第二个参数,ARGV的集合就是第三参数【保证解锁的原子操作】 + Object var2 = getResource(0).eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)); + if (UNLOCK_SUCCESS == var2) { + return true; + } + return false; + } + + /** + * 重试机制 + * + * @param value + * 客户端标识 + * @return + */ + public static Boolean lockRetry(String value) { + Boolean flag = false; + try { + for (int i = 0; i < Constants.REDISRETRYNUM; i++) { + flag = lock(value); + if (flag) { + break; + } + Thread.sleep(1000); + } + } catch (Exception e) { + logger.error("尝试获取redis分布式锁失败,失败原因:{}", ExceptionUtil.getExceptionMsg(e)); + } + return flag; + } + } diff --git a/src/main/java/com/nis/web/controller/BaseRestController.java b/src/main/java/com/nis/web/controller/BaseRestController.java index d7043a3..0e136c5 100644 --- a/src/main/java/com/nis/web/controller/BaseRestController.java +++ b/src/main/java/com/nis/web/controller/BaseRestController.java @@ -31,8 +31,10 @@ import com.nis.restful.RestBusinessCode; import com.nis.restful.RestConstants; import com.nis.restful.RestResult; import com.nis.restful.RestServiceException; +import com.nis.restful.ServiceRuntimeException; import com.nis.util.Constants; import com.nis.util.DateUtils; +import com.nis.util.JedisUtils; import com.nis.web.service.AuditLogThread; import com.nis.web.service.ServicesRequestLogService; import com.zdjizhi.utils.StringUtil; @@ -419,4 +421,35 @@ public class BaseRestController { entity.setSearchFoundEndTime(map.get("endTime")); } } + + /** + * 获取redis分布式锁的状态 + * + * @param requestId + * @return + */ + protected boolean getLock(String requestId) { + boolean lock = false; + if (JedisUtils.lock(requestId)) { + lock = true; + } else { + if (JedisUtils.lockRetry(requestId)) { + lock = true; + } else { + throw new ServiceRuntimeException("有其他任务正在执行批量导入,请稍后再试.如一直提示该信息,请联系管理员检查系统运行状态!", + RestBusinessCode.ConfigSourceIsNull.getValue()); + } + } + return lock; + } + + /** + * 解除redis分布式锁 + * @param requestId + */ + protected void deblocking (String requestId) { + JedisUtils.unLock(requestId); + } + + } diff --git a/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java b/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java index e54ee69..b4e34d5 100644 --- a/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java +++ b/src/main/java/com/nis/web/controller/restful/ConfigSourcesController.java @@ -5,6 +5,7 @@ import java.io.IOException; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.UUID; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -63,28 +64,31 @@ public class ConfigSourcesController extends BaseRestController { @Autowired ConfigRedisService configRedisService; - + @RequestMapping(value = "/cfg/v1/configSources", method = RequestMethod.POST) @ApiOperation(value = "MAAT规则存储接口", httpMethod = "POST", response = Map.class, notes = "接收MAAT规则数据,存储到流量处理平台配置线中") @ApiParam(value = "MAAT规则对象", name = "configSource", required = true) public Map createMaatConfigSource(@RequestBody ConfigSource configSource, HttpServletRequest request, HttpServletResponse response) { - + long start = System.currentTimeMillis(); AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, configSource); - + // 分布式锁的标识,谁加锁,谁解锁,如果中间发生了异常则根据失效时间自动失效,默认五分钟失效 StringBuffer sb = new StringBuffer(); + String requestId = UUID.randomUUID().toString(); try { - if (null != configSource && null != configSource.getConfigCompileList() - && configSource.getConfigCompileList().size() > 0) { - checkOpAction(thread, System.currentTimeMillis() - start, configSource.getOpAction(), Constants.OPACTION_POST); - // 验证配置编译数据 - validateConfigSource(thread, start, configSource); - configSourcesService.saveMaatConfig(thread, start, configSource.getConfigCompileList(), sb); - } else { - throw new RestServiceException("Maat规则不能为空", - RestBusinessCode.ConfigSourceIsNull.getValue()); + if (getLock(requestId)) { + if (null != configSource && null != configSource.getConfigCompileList() + && configSource.getConfigCompileList().size() > 0) { + checkOpAction(thread, System.currentTimeMillis() - start, configSource.getOpAction(), + Constants.OPACTION_POST); + // 验证配置编译数据 + validateConfigSource(thread, start, configSource); + configSourcesService.saveMaatConfig(thread, start, configSource.getConfigCompileList(), sb); + } else { + throw new RestServiceException("Maat规则不能为空", RestBusinessCode.ConfigSourceIsNull.getValue()); + } } } catch (Exception e) { // TODO: handle exception @@ -92,15 +96,17 @@ public class ConfigSourcesController extends BaseRestController { logger.error("Maat 规则存储异常:" + e.getMessage()); if (e instanceof RestServiceException) { throw new RestServiceException(thread, System.currentTimeMillis() - start, - "Maat 规则存储异常:" + e.getMessage(), ((RestServiceException) e).getErrorCode()); - }else if(e instanceof ServiceRuntimeException) { + "Maat 规则存储异常:" + e.getMessage(), ((RestServiceException) e).getErrorCode()); + } else if (e instanceof ServiceRuntimeException) { throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - "Maat 规则存储异常:" + e.getMessage(), ((ServiceRuntimeException) e).getErrorCode()); - }else{ + "Maat 规则存储异常:" + e.getMessage(), ((ServiceRuntimeException) e).getErrorCode()); + } else { throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - "Maat 规则存储异常:" + e.getMessage(), RestBusinessCode.service_runtime_error.getValue()); + "Maat 规则存储异常:" + e.getMessage(), RestBusinessCode.service_runtime_error.getValue()); } + } finally { + deblocking(requestId); } return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, @@ -116,20 +122,23 @@ public class ConfigSourcesController extends BaseRestController { AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_PUT, request, configSource); StringBuffer sb = new StringBuffer(); + String requestId = UUID.randomUUID().toString(); try { - if (null == configSource.getOpTime()) { - configSource.setOpTime(new Date()); - } - if (null != configSource && null != configSource.getConfigCompileList() - && configSource.getConfigCompileList().size() > 0) { - int opAction = configSource.getOpAction(); - checkOpAction(thread, System.currentTimeMillis() - start, opAction, 2); - configSourcesService.updateConfigSources(thread, start, configSource.getConfigCompileList(), - configSource.getOpTime(), sb); - - } else { - throw new RestServiceException("Maat规则不能为空" + sb.toString(), - RestBusinessCode.ConfigSourceIsNull.getValue()); + if (getLock(requestId)) { + if (null == configSource.getOpTime()) { + configSource.setOpTime(new Date()); + } + if (null != configSource && null != configSource.getConfigCompileList() + && configSource.getConfigCompileList().size() > 0) { + int opAction = configSource.getOpAction(); + checkOpAction(thread, System.currentTimeMillis() - start, opAction, 2); + configSourcesService.updateConfigSources(thread, start, configSource.getConfigCompileList(), + configSource.getOpTime(), sb); + + } else { + throw new RestServiceException("Maat规则不能为空" + sb.toString(), + RestBusinessCode.ConfigSourceIsNull.getValue()); + } } } catch (Exception e) { // TODO: handle exception @@ -137,16 +146,18 @@ public class ConfigSourcesController extends BaseRestController { logger.error("MAAT规则状态更新时出现异常:" + e.getMessage()); if (e instanceof RestServiceException) { throw new RestServiceException(thread, System.currentTimeMillis() - start, - "MAAT规则状态更新时出现异常:" + e.getMessage(), ((RestServiceException) e).getErrorCode()); - }else if(e instanceof ServiceRuntimeException) { + "MAAT规则状态更新时出现异常:" + e.getMessage(), ((RestServiceException) e).getErrorCode()); + } else if (e instanceof ServiceRuntimeException) { throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - "MAAT规则状态更新时出现异常:" + e.getMessage(), ((ServiceRuntimeException) e).getErrorCode()); - }else{ + "MAAT规则状态更新时出现异常:" + e.getMessage(), ((ServiceRuntimeException) e).getErrorCode()); + } else { throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - "MAAT规则状态更新时出现异常:" + e.getMessage(), RestBusinessCode.service_runtime_error.getValue()); + "MAAT规则状态更新时出现异常:" + e.getMessage(), RestBusinessCode.service_runtime_error.getValue()); } + } finally { + deblocking(requestId); } - + return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "Maat规则状态更新成功" + sb.toString(), Constants.IS_DEBUG ? configSource : null); } @@ -177,36 +188,41 @@ public class ConfigSourcesController extends BaseRestController { } } - + @RequestMapping(value = "/cfg/v1/commonSources", method = RequestMethod.POST, produces = org.springframework.http.MediaType.APPLICATION_JSON_VALUE) @ApiOperation(value = "回调(通用)规则存储接口", httpMethod = "POST", response = Map.class, notes = "接收回调规则数据,格式为结构化行列式JSON,存储到流量处理平台配置线中") public Map createCommonConfigSource(@RequestBody String jsonString, HttpServletRequest request, HttpServletResponse response) { long start = System.currentTimeMillis(); - AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, - null); + AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, null); + String requestId = UUID.randomUUID().toString(); try { - configSourcesService.saveCommonSources(thread, start, jsonString); + if (getLock(requestId)) { + configSourcesService.saveCommonSources(thread, start, jsonString); + } } catch (Exception e) { // TODO: handle exception thread.setExceptionInfo("回调规则存储异常:" + e.getMessage()); logger.error("回调规则存储异常:" + e.getMessage()); if (e instanceof RestServiceException) { - throw new RestServiceException(thread, System.currentTimeMillis() - start, - "回调规则存储异常:" + e.getMessage(), ((RestServiceException) e).getErrorCode()); - }else if (e instanceof ServiceRuntimeException) { - throw new RestServiceException(thread, System.currentTimeMillis() - start, - "回调规则存储异常:" + e.getMessage(), ((ServiceRuntimeException) e).getErrorCode()); - }else { + throw new RestServiceException(thread, System.currentTimeMillis() - start, "回调规则存储异常:" + e.getMessage(), + ((RestServiceException) e).getErrorCode()); + } else if (e instanceof ServiceRuntimeException) { + throw new RestServiceException(thread, System.currentTimeMillis() - start, "回调规则存储异常:" + e.getMessage(), + ((ServiceRuntimeException) e).getErrorCode()); + } else { throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - "回调规则存储异常:" + e.getMessage(), RestBusinessCode.service_runtime_error.getValue()); + "回调规则存储异常:" + e.getMessage(), RestBusinessCode.service_runtime_error.getValue()); } + } finally { + deblocking(requestId); } - + return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "回调规则下发成功", Constants.IS_DEBUG ? jsonString : null); } + @RequestMapping(value = "/cfg/v1/commonSources", method = RequestMethod.PATCH, produces = org.springframework.http.MediaType.APPLICATION_JSON_VALUE) @ApiOperation(value = "回调(通用)规则内容更新接口", httpMethod = "PATCH", response = Map.class, notes = "接收回调规则数据,格式为结构化行列式JSON,存储到流量处理平台配置线中") public Map patchCommonConfigSource(@RequestBody String jsonString, HttpServletRequest request, @@ -214,27 +230,33 @@ public class ConfigSourcesController extends BaseRestController { long start = System.currentTimeMillis(); AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_PATCH, request, null); + String requestId = UUID.randomUUID().toString(); try { - configSourcesService.saveCommonSources(thread, start, jsonString); + if (getLock(requestId)) { + configSourcesService.saveCommonSources(thread, start, jsonString); + } } catch (Exception e) { // TODO: handle exception thread.setExceptionInfo("回调规则内容更新异常:" + e.getMessage()); logger.error("回调规则内容更新异常:" + e.getMessage()); if (e instanceof RestServiceException) { throw new RestServiceException(thread, System.currentTimeMillis() - start, - "回调规则内容更新异常:" + e.getMessage(), ((RestServiceException) e).getErrorCode()); - }else if (e instanceof ServiceRuntimeException) { + "回调规则内容更新异常:" + e.getMessage(), ((RestServiceException) e).getErrorCode()); + } else if (e instanceof ServiceRuntimeException) { throw new RestServiceException(thread, System.currentTimeMillis() - start, - "回调规则内容更新异常:" + e.getMessage(), ((ServiceRuntimeException) e).getErrorCode()); - }else { + "回调规则内容更新异常:" + e.getMessage(), ((ServiceRuntimeException) e).getErrorCode()); + } else { throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - "回调规则内容更新异常:" + e.getMessage(), RestBusinessCode.service_runtime_error.getValue()); + "回调规则内容更新异常:" + e.getMessage(), RestBusinessCode.service_runtime_error.getValue()); } + } finally { + deblocking(requestId); } - + return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "回调规则内容更新成功", Constants.IS_DEBUG ? jsonString : null); } + @RequestMapping(value = "/cfg/v1/commonSources", method = RequestMethod.PUT) @ApiOperation(value = "回调(通用)规则状态更新接口", httpMethod = "PUT", response = Map.class, notes = "接收回调规则,对其状态置为失效") public Map updateCommonConfigSource(@RequestBody String jsonString, HttpServletRequest request, @@ -243,8 +265,11 @@ public class ConfigSourcesController extends BaseRestController { AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_PUT, request, jsonString); StringBuffer sb = new StringBuffer(); + String requestId = UUID.randomUUID().toString(); try { - configSourcesService.updateCommonSources(thread, start, jsonString, new Date(), sb); + if (getLock(requestId)) { + configSourcesService.updateCommonSources(thread, start, jsonString, new Date(), sb); + } } catch (Exception e) { // TODO: handle exception thread.setExceptionInfo("回调规则状态更新异常:" + e.getMessage()); @@ -252,15 +277,17 @@ public class ConfigSourcesController extends BaseRestController { if (e instanceof RestServiceException) { throw new RestServiceException(thread, System.currentTimeMillis() - start, "回调规则状态更新异常:" + e.getMessage(), ((RestServiceException) e).getErrorCode()); - }else if (e instanceof ServiceRuntimeException) { + } else if (e instanceof ServiceRuntimeException) { throw new RestServiceException(thread, System.currentTimeMillis() - start, "回调规则状态更新异常:" + e.getMessage(), ((ServiceRuntimeException) e).getErrorCode()); - }else { + } else { throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, "回调规则状态更新异常:" + e.getMessage(), RestBusinessCode.service_runtime_error.getValue()); } + } finally { + deblocking(requestId); } - + return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "回调规则状态更新成功", Constants.IS_DEBUG ? jsonString : null); } @@ -271,8 +298,7 @@ public class ConfigSourcesController extends BaseRestController { public Map fileUploadSource(@RequestBody MultipartFile file, HttpServletRequest request, HttpServletResponse response) { long start = System.currentTimeMillis(); - AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, - null); + AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, null); String filePath = ""; try { FileDesc fileDesc = (FileDesc) JSONObject.toBean(JSONObject.fromObject(request.getHeader("File-Desc")), @@ -291,9 +317,10 @@ public class ConfigSourcesController extends BaseRestController { } String ext = file.getOriginalFilename().substring(file.getOriginalFilename().lastIndexOf(".") + 1); logger.info("-----------------调用接口上传文件---------------"); - if(Constants.IS_USE_MINIO){ - filePath = MinioUtil.uploadFile(file.getInputStream(), file.getOriginalFilename(), file.getContentType()); - }else{ + if (Constants.IS_USE_MINIO) { + filePath = MinioUtil.uploadFile(file.getInputStream(), file.getOriginalFilename(), + file.getContentType()); + } else { FastDFSFile fdsfile = new FastDFSFile(file.getBytes(), file.getOriginalFilename(), ext); // NameValuePair[] meta_list = new NameValuePair[5]; // meta_list[0] = new NameValuePair("fileName", file.getOriginalFilename()); @@ -305,27 +332,28 @@ public class ConfigSourcesController extends BaseRestController { filePath = FileManager.upload(fdsfile, null); } } - }catch (IOException e) { + } catch (IOException e) { // TODO Auto-generated catch block - logger.error(RestBusinessCode.FileUploadFailure.getErrorReason()+":"+e.getMessage()); - thread.setExceptionInfo(RestBusinessCode.FileUploadFailure.getErrorReason()+":"+e.getMessage()); + logger.error(RestBusinessCode.FileUploadFailure.getErrorReason() + ":" + e.getMessage()); + thread.setExceptionInfo(RestBusinessCode.FileUploadFailure.getErrorReason() + ":" + e.getMessage()); throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - RestBusinessCode.FileUploadFailure.getErrorReason()+":"+ e.getMessage(), RestBusinessCode.FileUploadFailure.getValue()); - }catch (Exception e) { + RestBusinessCode.FileUploadFailure.getErrorReason() + ":" + e.getMessage(), + RestBusinessCode.FileUploadFailure.getValue()); + } catch (Exception e) { // TODO: handle exception - logger.error("文件上传异常:" +e.getMessage()); + logger.error("文件上传异常:" + e.getMessage()); thread.setExceptionInfo("文件上传异常:" + e.getMessage()); if (e instanceof RestServiceException) { - throw new RestServiceException(thread, System.currentTimeMillis() - start, - e.getMessage(), ((RestServiceException) e).getErrorCode()); - }else if (e instanceof ServiceRuntimeException) { - throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - e.getMessage(), ((ServiceRuntimeException) e).getErrorCode()); - }else { - throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - e.getMessage(), RestBusinessCode.FileUploadFailure.getValue()); + throw new RestServiceException(thread, System.currentTimeMillis() - start, e.getMessage(), + ((RestServiceException) e).getErrorCode()); + } else if (e instanceof ServiceRuntimeException) { + throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, e.getMessage(), + ((ServiceRuntimeException) e).getErrorCode()); + } else { + throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, e.getMessage(), + RestBusinessCode.FileUploadFailure.getValue()); } - + } JSONObject jsonObj = new JSONObject(); // jsonObj.put("accessUrl", filePath.substring(filePath.indexOf("group"))); @@ -339,10 +367,10 @@ public class ConfigSourcesController extends BaseRestController { public Map fileDigestSources(@RequestBody MultipartFile file, HttpServletRequest request, HttpServletResponse response) { long start = System.currentTimeMillis(); - AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, - file, null); + AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, file, + null); JSONObject resultObject = new JSONObject(); - String filePath=""; + String filePath = ""; try { if (file == null) { throw new RestServiceException("请上传获取摘要的文件到file参数", RestBusinessCode.FileIsNull.getValue()); @@ -355,14 +383,14 @@ public class ConfigSourcesController extends BaseRestController { String md5 = DigestUtils.md5Hex(file.getInputStream()); System.out.println("----------------------------MD5:'" + md5 + "'==='" + fileDesc.getChecksum() + "'"); if (!md5.equals(fileDesc.getChecksum())) { - throw new RestServiceException("checksum与文件MD5值不一致", - RestBusinessCode.CheckSumIsWrong.getValue()); + throw new RestServiceException("checksum与文件MD5值不一致", RestBusinessCode.CheckSumIsWrong.getValue()); } String ext = file.getOriginalFilename().substring(file.getOriginalFilename().lastIndexOf(".") + 1); logger.info("-----------------调用接口上传文件---------------"); - if(Constants.IS_USE_MINIO){ - filePath = MinioUtil.uploadFile(file.getInputStream(), file.getOriginalFilename(), file.getContentType()); - }else{ + if (Constants.IS_USE_MINIO) { + filePath = MinioUtil.uploadFile(file.getInputStream(), file.getOriginalFilename(), + file.getContentType()); + } else { FastDFSFile fdsfile = new FastDFSFile(file.getBytes(), file.getOriginalFilename(), ext); // NameValuePair[] meta_list = new NameValuePair[5]; // meta_list[0] = new NameValuePair("fileName", file.getOriginalFilename()); @@ -375,44 +403,46 @@ public class ConfigSourcesController extends BaseRestController { } resultObject.put("accessUrl", filePath); } - }catch (IOException e) { + } catch (IOException e) { // TODO Auto-generated catch block - logger.error(RestBusinessCode.FileUploadFailure.getErrorReason()+":"+e.getMessage()); - thread.setExceptionInfo(RestBusinessCode.FileUploadFailure.getErrorReason()+":"+ e.getMessage()); + logger.error(RestBusinessCode.FileUploadFailure.getErrorReason() + ":" + e.getMessage()); + thread.setExceptionInfo(RestBusinessCode.FileUploadFailure.getErrorReason() + ":" + e.getMessage()); throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - RestBusinessCode.FileUploadFailure.getErrorReason()+":"+ e.getMessage(), RestBusinessCode.FileUploadFailure.getValue()); - }catch (Exception e) { + RestBusinessCode.FileUploadFailure.getErrorReason() + ":" + e.getMessage(), + RestBusinessCode.FileUploadFailure.getValue()); + } catch (Exception e) { // TODO: handle exception - logger.error(RestBusinessCode.FileUploadFailure.getErrorReason()+":"+e.getMessage()); - thread.setExceptionInfo(RestBusinessCode.FileUploadFailure.getErrorReason()+":"+ e.getMessage()); + logger.error(RestBusinessCode.FileUploadFailure.getErrorReason() + ":" + e.getMessage()); + thread.setExceptionInfo(RestBusinessCode.FileUploadFailure.getErrorReason() + ":" + e.getMessage()); if (e instanceof RestServiceException) { - throw new RestServiceException(thread, System.currentTimeMillis() - start, - e.getMessage(), ((RestServiceException) e).getErrorCode()); - }else if (e instanceof ServiceRuntimeException) { - throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - e.getMessage(), ((ServiceRuntimeException) e).getErrorCode()); - }else { - throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - e.getMessage(), RestBusinessCode.FileUploadFailure.getValue()); + throw new RestServiceException(thread, System.currentTimeMillis() - start, e.getMessage(), + ((RestServiceException) e).getErrorCode()); + } else if (e instanceof ServiceRuntimeException) { + throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, e.getMessage(), + ((ServiceRuntimeException) e).getErrorCode()); + } else { + throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, e.getMessage(), + RestBusinessCode.FileUploadFailure.getValue()); } } try { String tempFilePath = request.getRealPath(File.separator) + "upload" + File.separator + (new Date()).getTime() + file.getOriginalFilename(); file.transferTo(new File(tempFilePath)); - //System.out.println("------------" + tempFilePath); + // System.out.println("------------" + tempFilePath); logger.info("摘要获取开始:---------------"); String digestStr = configSourcesService.getDigestGen(request.getRealPath(File.separator), tempFilePath); - logger.info("摘要获取结束:---------------:"+digestStr); - resultObject.put("digest", null==digestStr?"":digestStr); + logger.info("摘要获取结束:---------------:" + digestStr); + resultObject.put("digest", null == digestStr ? "" : digestStr); resultObject.put("rawLen", file.getSize()); FileUtils.deleteFile(tempFilePath); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); - logger.error(RestBusinessCode.GetFileDigestFailure.getValue()+":"+e.getMessage()+",请检查摘要获取工具是否安装成功"); - thread.setExceptionInfo("摘要获取过程中出现异常:"+e.getMessage()+",请检查摘要获取工具是否安装成功"); - throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, RestBusinessCode.GetFileDigestFailure.getErrorReason()+":"+e.getMessage()+",请检查摘要获取工具是否安装成功", + logger.error(RestBusinessCode.GetFileDigestFailure.getValue() + ":" + e.getMessage() + ",请检查摘要获取工具是否安装成功"); + thread.setExceptionInfo("摘要获取过程中出现异常:" + e.getMessage() + ",请检查摘要获取工具是否安装成功"); + throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, + RestBusinessCode.GetFileDigestFailure.getErrorReason() + ":" + e.getMessage() + ",请检查摘要获取工具是否安装成功", RestBusinessCode.GetFileDigestFailure.getValue()); } return serviceResponse(thread, System.currentTimeMillis() - start, request, response, "摘要获取成功", resultObject); @@ -423,22 +453,25 @@ public class ConfigSourcesController extends BaseRestController { @ApiParam(value = "分组复用域配置对象", name = "groupReuseSource", required = true) public Map addGroupReuseSources(@RequestBody GroupReuseSource groupReuseSource, HttpServletRequest request, HttpServletResponse response) { - + long start = System.currentTimeMillis(); AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_POST, request, groupReuseSource); StringBuffer sb = new StringBuffer(); + String requestId = UUID.randomUUID().toString(); try { - if (null != groupReuseSource && null != groupReuseSource.getGroupReuseList() - && groupReuseSource.getGroupReuseList().size() > 0) { - checkOpAction(thread, System.currentTimeMillis() - start, groupReuseSource.getOpAction(), Constants.OPACTION_POST); - // 验证配置编译数据 - validateGroupReuseSource(thread, start, groupReuseSource); - configSourcesService.addGroupReuseSources(thread, start, groupReuseSource.getGroupReuseList(), sb); - } else { - throw new RestServiceException("Maat规则不能为空", - RestBusinessCode.ConfigSourceIsNull.getValue()); + if (getLock(requestId)) { + if (null != groupReuseSource && null != groupReuseSource.getGroupReuseList() + && groupReuseSource.getGroupReuseList().size() > 0) { + checkOpAction(thread, System.currentTimeMillis() - start, groupReuseSource.getOpAction(), + Constants.OPACTION_POST); + // 验证配置编译数据 + validateGroupReuseSource(thread, start, groupReuseSource); + configSourcesService.addGroupReuseSources(thread, start, groupReuseSource.getGroupReuseList(), sb); + } else { + throw new RestServiceException("Maat规则不能为空", RestBusinessCode.ConfigSourceIsNull.getValue()); + } } } catch (Exception e) { // TODO: handle exception @@ -446,21 +479,23 @@ public class ConfigSourcesController extends BaseRestController { logger.error("Maat 分组复用规则存储异常:" + e.getMessage()); if (e instanceof RestServiceException) { throw new RestServiceException(thread, System.currentTimeMillis() - start, - "Maat 分组复用规则存储异常:" + e.getMessage(), ((RestServiceException) e).getErrorCode()); - }else if(e instanceof ServiceRuntimeException) { + "Maat 分组复用规则存储异常:" + e.getMessage(), ((RestServiceException) e).getErrorCode()); + } else if (e instanceof ServiceRuntimeException) { throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - "Maat 分组复用规则存储异常:" + e.getMessage(), ((ServiceRuntimeException) e).getErrorCode()); - }else{ + "Maat 分组复用规则存储异常:" + e.getMessage(), ((ServiceRuntimeException) e).getErrorCode()); + } else { throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - "Maat 分组复用规则存储异常:" + e.getMessage(), RestBusinessCode.service_runtime_error.getValue()); + "Maat 分组复用规则存储异常:" + e.getMessage(), RestBusinessCode.service_runtime_error.getValue()); } + } finally { + deblocking(requestId); } return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, "Maat分组复用规则添加成功" + sb.toString(), Constants.IS_DEBUG ? groupReuseSource : null); } - + @RequestMapping(value = "/cfg/v1/groupReuseSources", method = RequestMethod.PUT) @ApiOperation(value = "分组复用域配置删除接口", httpMethod = "PUT", response = Map.class, notes = "接收分组复用域配置,并从对应分组中删除") @ApiParam(value = "分组复用域配置对象", name = "groupReuseSource", required = true) @@ -470,20 +505,23 @@ public class ConfigSourcesController extends BaseRestController { AuditLogThread thread = super.saveRequestLog(servicesRequestLogService, Constants.OPACTION_PUT, request, groupReuseSource); StringBuffer sb = new StringBuffer(); + String requestId = UUID.randomUUID().toString(); try { - if (null == groupReuseSource.getOpTime()) { - groupReuseSource.setOpTime(new Date()); - } - if (null != groupReuseSource && null != groupReuseSource.getGroupReuseList() - && groupReuseSource.getGroupReuseList().size() > 0) { - int opAction = groupReuseSource.getOpAction(); - checkOpAction(thread, System.currentTimeMillis() - start, opAction, 2); - configSourcesService.deleteGroupReuseSources(thread, start,groupReuseSource.getGroupReuseList(), - groupReuseSource.getOpTime(), sb); - - } else { - throw new RestServiceException("分组复用信息不能为空" + sb.toString(), - RestBusinessCode.ConfigSourceIsNull.getValue()); + if (getLock(requestId)) { + if (null == groupReuseSource.getOpTime()) { + groupReuseSource.setOpTime(new Date()); + } + if (null != groupReuseSource && null != groupReuseSource.getGroupReuseList() + && groupReuseSource.getGroupReuseList().size() > 0) { + int opAction = groupReuseSource.getOpAction(); + checkOpAction(thread, System.currentTimeMillis() - start, opAction, 2); + configSourcesService.deleteGroupReuseSources(thread, start, groupReuseSource.getGroupReuseList(), + groupReuseSource.getOpTime(), sb); + + } else { + throw new RestServiceException("分组复用信息不能为空" + sb.toString(), + RestBusinessCode.ConfigSourceIsNull.getValue()); + } } } catch (Exception e) { // TODO: handle exception @@ -491,20 +529,22 @@ public class ConfigSourcesController extends BaseRestController { logger.error("删除MAAT规则分组复用域配置时出现异常:" + e.getMessage()); if (e instanceof RestServiceException) { throw new RestServiceException(thread, System.currentTimeMillis() - start, - "删除MAAT规则分组复用域配置时出现异常:" + e.getMessage(), ((RestServiceException) e).getErrorCode()); - }else if(e instanceof ServiceRuntimeException) { + "删除MAAT规则分组复用域配置时出现异常:" + e.getMessage(), ((RestServiceException) e).getErrorCode()); + } else if (e instanceof ServiceRuntimeException) { throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - "删除MAAT规则分组复用域配置时出现异常:" + e.getMessage(), ((ServiceRuntimeException) e).getErrorCode()); - }else{ + "删除MAAT规则分组复用域配置时出现异常:" + e.getMessage(), ((ServiceRuntimeException) e).getErrorCode()); + } else { throw new ServiceRuntimeException(thread, System.currentTimeMillis() - start, - "删除MAAT规则分组复用域配置时出现异常:" + e.getMessage(), RestBusinessCode.service_runtime_error.getValue()); + "删除MAAT规则分组复用域配置时出现异常:" + e.getMessage(), RestBusinessCode.service_runtime_error.getValue()); } + } finally { + deblocking(requestId); } - + return compileServiceResponse(thread, System.currentTimeMillis() - start, request, response, - "MAAT规则分组复用域配置删除成功" + sb.toString(), Constants.IS_DEBUG ?groupReuseSource : null); + "MAAT规则分组复用域配置删除成功" + sb.toString(), Constants.IS_DEBUG ? groupReuseSource : null); } - + private void validateGroupReuseSource(AuditLogThread thread, long start, GroupReuseSource groupReuseSource) { String errorInfo = ""; @@ -531,6 +571,7 @@ public class ConfigSourcesController extends BaseRestController { } } + private boolean isBlank(Date datetime) { if (null != datetime) { return true; diff --git a/src/main/resources/nis.properties b/src/main/resources/nis.properties index d0efaf5..04aeaab 100644 --- a/src/main/resources/nis.properties +++ b/src/main/resources/nis.properties @@ -213,9 +213,7 @@ fileProtocol=redis:// #是否开启日志查询count和last功能 isOpenLogCountAndLast=true - -#定义单独添加域与删除域开启的最大线程数 -maxThreadNum=20 -#每个线程处理多少条数据 -everThreadNum=500 - +#redis分布式锁超时时间,默认五分钟,3000秒 +redisLockTime=3000 +#获取redis分布式锁失败后的尝试获取锁的次数,每次失败暂停一秒钟后再次尝试 +redisRetryNum=5 \ No newline at end of file