package com.nis.quartz; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.log4j.Logger; import org.quartz.CalendarIntervalScheduleBuilder; import org.quartz.CalendarIntervalTrigger; import org.quartz.CronScheduleBuilder; import org.quartz.DateBuilder; import org.quartz.DisallowConcurrentExecution; import org.quartz.Job; import org.quartz.JobBuilder; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.JobKey; import org.quartz.PersistJobDataAfterExecution; import org.quartz.ScheduleBuilder; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.SimpleScheduleBuilder; import org.quartz.SimpleTrigger; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.TriggerKey; import org.quartz.impl.matchers.GroupMatcher; import org.quartz.impl.triggers.CalendarIntervalTriggerImpl; import org.quartz.impl.triggers.CronTriggerImpl; import org.quartz.spi.MutableTrigger; import com.nis.domain.ScheduleCfg; import com.nis.util.Constants; import com.nis.util.DateUtil; import com.nis.util.DateUtils; import com.nis.util.DictUtils; import com.nis.util.StringUtils; import com.nis.web.dao.SchedulerDao; import com.nis.web.service.SpringContextHolder; /** * 定时任务 配置全量同步时未执行的定时任务加载 * 1、每 n s 执行一次,每次读取 schedule_cfg 最新的数据 * 2、删除或新增 定时任务 * 3、单线程执行 * @author ddm * */ @DisallowConcurrentExecution @PersistJobDataAfterExecution public class ScheduleSyncCfgJob implements Job { SimpleDateFormat sdf=new SimpleDateFormat(Constants.COMMON_DATE_FORMAT); private static final Logger log = Logger.getLogger(ScheduleSyncCfgJob.class); /** * 状态组,格式:statusGroup-compileId */ private static final String STATUS_GROUP = "syncGroup-"; private static final String STATUS_JOB = "SYNC-JOB"; /** * 生效标识:valid-cronexp */ private static final String VALID_KEY = "valid-"; /** * 失效标识:invalid-cronexp */ private static final String INVALID_KEY = "invalid-"; private static final JobKey STATUS_JOBKEY = JobKey.jobKey(STATUS_JOB, "syncGroup"); private static final JobDetail STATUS_JOBDETAIL = JobBuilder.newJob(ScheduleStatusJob.class) .withIdentity(STATUS_JOBKEY) .storeDurably(true) .build(); @Override public void execute(JobExecutionContext context) throws JobExecutionException { Scheduler scheduler = context.getScheduler(); SchedulerDao dao = SpringContextHolder.getBean(SchedulerDao.class); JobDataMap dataMap = context.getJobDetail().getJobDataMap(); long scheduleCfgId = dataMap.get("scheduleCfgId") == null ? 0:dataMap.getLong("scheduleCfgId"); long limit = dataMap.get("limit") == null ? Constants.MAAT_JSON_SEND_SIZE:dataMap.getLong("limit"); log.info(String.format("Sync定时配置任务开始扫描,scheduleCfgId:%s,limit:%s",scheduleCfgId,limit )); List newlyCfg = null; int totalNum = 0; //全量同步当前状态 0:start:开始 1:init:初始化 2:doing:进行中 String currentStatus = DictUtils.getDictLabel("currrent_sync_status", "status","0"); if(!(currentStatus.equals("0") || currentStatus.equals("1") || currentStatus.equals("2"))) { do { newlyCfg = dao.findNewlyCfg(scheduleCfgId, limit,2,null); if(newlyCfg != null && newlyCfg.size() > 0) { totalNum += newlyCfg.size(); for(ScheduleCfg cfg : newlyCfg) {//先取消之前的定时配置 Integer compileId = cfg.getCompileId(); try { //取消之前所有的 trigger GroupMatcher groupMatcher= GroupMatcher.triggerGroupEquals(STATUS_GROUP + compileId); Set triggerKeys = scheduler.getTriggerKeys(groupMatcher); if(triggerKeys != null && triggerKeys.size() > 0) { for(TriggerKey tk : triggerKeys) { scheduler.unscheduleJob(tk); } log.info(String.format("Sync定时任务取消成功,compile:%s", compileId)); } } catch (Exception e) { log.error(String.format("Sync定时任务取消异常,compileId:%s", compileId),e); } } int index=0; for(ScheduleCfg cfg : newlyCfg) { Integer compileId = cfg.getCompileId(); try { //判断状态,重新添加最新的 trigger Integer isValid = cfg.getIsValid(); Integer isAudit = cfg.getIsAudit(); //添加定时任务的条件 if((isValid == 1 && isAudit == 1) || (isValid == 0 && isAudit == 0)) { //添加定时任务,包括valid 和 invalid addJob(scheduler, cfg,index); log.info(String.format("Sync定时任务添加成功,compile:%s", compileId)); } } catch (Exception e) { log.error(String.format("Sync定时任务更新异常,compileId:%s", compileId),e); } index++; } //最后 保存此次 最后的id ScheduleCfg lastCfg = newlyCfg.get(newlyCfg.size() -1); scheduleCfgId = lastCfg.getId(); dataMap.put("scheduleCfgId", scheduleCfgId); log.info(String.format("Sync加载定时任务,total num :%s", newlyCfg.size())); } } while (newlyCfg != null && newlyCfg.size() > 0); log.info(String.format("Sync定时配置任务结束执行,total num:%s",totalNum)); } log.info("全量同步中缓存的定时配置正在扫描(currentStatus:"+currentStatus+")"); } /** * 将定时任务信息添加到 定时器框架中调度 * @param scheduler * @param cfg * @throws SchedulerException */ /*public static void addJob(Scheduler scheduler,ScheduleCfg cfg) throws SchedulerException { Integer compileId = cfg.getCompileId(); String cronValid = cfg.getCronValid(); String cronInvalid = cfg.getCronInvalid(); Trigger validTrigger = createTrigger(cronValid, compileId, true, cfg); Trigger invalidTrigger = createTrigger(cronInvalid, compileId, false, cfg); boolean jobExist = scheduler.checkExists(STATUS_JOBKEY); if(!jobExist) {//判断 job 是否存在,不存在添加 scheduler.addJob(STATUS_JOBDETAIL, false); } boolean checkExists = scheduler.checkExists(validTrigger.getKey()); if(!checkExists) {//判断 valid trigger 是否存在,不存在添加 scheduler.scheduleJob(validTrigger); }else { log.warn(String.format("Trigger already exists:%s ", validTrigger.getKey().toString())); } checkExists = scheduler.checkExists(invalidTrigger.getKey()); if(!checkExists) {//判断 invalid trigger 是否存在,不存在添加 scheduler.scheduleJob(invalidTrigger); }else { log.warn(String.format("Trigger already exists:%s ", invalidTrigger.getKey().toString())); } }*/ public static void addJob(Scheduler scheduler,ScheduleCfg cfg,int expire) throws SchedulerException { List triList = createTrigger(cfg,expire); boolean jobExist = scheduler.checkExists(STATUS_JOBKEY); if(!jobExist) {//判断 job 是否存在,不存在添加 scheduler.addJob(STATUS_JOBDETAIL, false); } for(Trigger tri : triList) { boolean checkExists = scheduler.checkExists(tri.getKey()); if(!checkExists) {//判断 valid trigger 是否存在,不存在添加 log.debug(String.format("Sync定时任务添加,%s", tri.getKey())); scheduler.scheduleJob(tri); log.info(String.format("Sync定时任务添加成功,%s", tri.getKey())); }else { log.warn(String.format("Trigger(sync) already exists:%s ", tri.getKey().toString())); } } } /** * 将页面配置的内容 转换成 trigger * @param cfg * @return */ public static List createTrigger(ScheduleCfg cfg,int expire){ String mode = cfg.getUserRegion1().toUpperCase();//定时任务运行模式:一次,每天,每周,每月 List triList = null; switch (mode) { case "ALWAYS"://单次运行,但只创建单次生效触发器 triList = createSimpleTrigger(cfg,expire); break; case "SINGLE"://单次运行 triList = createSimpleTrigger(cfg,expire); break; case "EVERYDAY"://每天运行 0 0 0 2/1 * ? ,不符合要求,定义每天都执行,然后在 代码判断 间隔时间 triList = createCalendarIntervalTrigger(cfg); break; case "EVERYWEEK"://每周运行 triList = createCalendarIntervalTrigger(cfg); break; case "EVERYMONTH"://每月运行 triList = createEveryMonthTrigger(cfg); break; default: log.warn(String.format("unknown mode : %s ", mode)); break; } return triList; } /** * 将时间转换成 时分秒 * @param time * @return */ public static List parseTime(String time) { if(StringUtils.isNoneBlank(time)) { String[] split = time.split(":"); List tl = new ArrayList(3); for(String s : split) { tl.add(Integer.valueOf(s)); } return tl; } return null; } public static Trigger createCronTrigger(String cron,Integer compileId,boolean isValid,ScheduleCfg cfg) { String triggerName = isValid ? (VALID_KEY + cron) : (INVALID_KEY + cron); JobDataMap dataMap = new JobDataMap(); dataMap.put("isValid", isValid); dataMap.put("cfg", cfg); return TriggerBuilder.newTrigger() .withIdentity(createTiggerKey(triggerName, STATUS_GROUP+compileId)) .withSchedule(CronScheduleBuilder.cronSchedule(cron)) .usingJobData(dataMap) .forJob(STATUS_JOBDETAIL) .build(); } /** * 创建全量同步时未执行的任务,每个任务均为单词任务 * @param cfg * @return */ public static List createSimpleTrigger(ScheduleCfg cfg,int expire){ List triList = new ArrayList(); Integer compileId = cfg.getCompileId(); String cronValid = cfg.getCronValid(); String cronInvalid = cfg.getCronInvalid(); Date validDate = null; Date invalidDate = null; if(StringUtils.isNotBlank(cronValid)){ Calendar c = Calendar.getInstance(); c.add(Calendar.MINUTE, (expire+1)); validDate = c.getTime(); } if(StringUtils.isNotBlank(cronInvalid)){ Calendar c = Calendar.getInstance(); c.add(Calendar.MINUTE, (expire+2)); invalidDate = c.getTime(); } JobDataMap dataMap = new JobDataMap(); if(validDate!=null){//生效时间如果不为空,则创建定时生效触发器 dataMap.put("isValid", true); dataMap.put("cfg", cfg); String triName = VALID_KEY + cfg.getUserRegion1() + "_" + cronValid; Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(createTiggerKey(triName, STATUS_GROUP+compileId)) .withSchedule(SimpleScheduleBuilder.simpleSchedule()) .usingJobData(dataMap) .forJob(STATUS_JOBDETAIL) .startAt(validDate) .build(); triList.add(trigger); } if(invalidDate!=null){//失效时间如果不为空,则创建定时失效触发器 dataMap = new JobDataMap(); dataMap.put("isValid", false); dataMap.put("cfg", cfg); String triName = INVALID_KEY + cfg.getUserRegion1() + "_" + cronInvalid; Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(createTiggerKey(triName, STATUS_GROUP+compileId)) .withSchedule(SimpleScheduleBuilder.simpleSchedule()) .usingJobData(dataMap) .forJob(STATUS_JOBDETAIL) .startAt(invalidDate) .build(); triList.add(trigger); } return triList; } /** * 间隔 n 天 或 n 周执行 * @param cfg * @return */ public static List createCalendarIntervalTrigger(ScheduleCfg cfg) { List triList = new ArrayList(); Integer compileId = cfg.getCompileId(); String cronValid = cfg.getCronValid(); String cronInvalid = cfg.getCronInvalid(); String dayOrWeek = cfg.getUserRegion1(); Integer interval = Integer.valueOf(cfg.getUserRegion2()); List validList = parseTime(cronValid); List invalidList = parseTime(cronInvalid); Date validStartTime = DateBuilder.todayAt(validList.get(0), validList.get(1), validList.get(2)); Date invalidTime = DateBuilder.todayAt(invalidList.get(0), invalidList.get(1), invalidList.get(2)); CalendarIntervalScheduleBuilder intervalBuilder = null; if("EVERYDAY".equalsIgnoreCase(dayOrWeek)) { intervalBuilder = CalendarIntervalScheduleBuilder.calendarIntervalSchedule().withIntervalInDays(interval); //valid JobDataMap dataMap = new JobDataMap(); dataMap.put("isValid", true); dataMap.put("cfg", cfg); String triName = VALID_KEY + dayOrWeek+"("+interval+")" + "_" + DateUtils.formatDate(validStartTime, Constants.COMMON_DATE_FORMAT); Trigger validTri = TriggerBuilder.newTrigger() .withIdentity(createTiggerKey(triName, STATUS_GROUP+compileId)) .withSchedule(intervalBuilder) .usingJobData(dataMap) .forJob(STATUS_JOBDETAIL) .startAt(validStartTime) .build(); triList.add(validTri); //invalid dataMap = new JobDataMap(); dataMap.put("isValid", false); dataMap.put("cfg", cfg); triName = INVALID_KEY + dayOrWeek +"("+interval+")" + "_" + DateUtils.formatDate(invalidTime, Constants.COMMON_DATE_FORMAT); validTri = TriggerBuilder.newTrigger() .withIdentity(createTiggerKey(triName, STATUS_GROUP+compileId)) .withSchedule(intervalBuilder) .usingJobData(dataMap) .forJob(STATUS_JOBDETAIL) .startAt(invalidTime) .build(); triList.add(validTri); }else if("EVERYWEEK".equalsIgnoreCase(dayOrWeek)) { intervalBuilder = CalendarIntervalScheduleBuilder.calendarIntervalSchedule().withIntervalInWeeks(interval); String[] weeks = cfg.getUserRegion3().split(","); for(String week : weeks) { if(StringUtils.isNoneBlank(week)) { Date temp = closestAfterWeek(validStartTime, Integer.valueOf(week)); JobDataMap dataMap = new JobDataMap(); dataMap.put("isValid", true); dataMap.put("cfg", cfg); String triName = VALID_KEY + dayOrWeek +week+"("+interval+")" + "_" + DateUtils.formatDate(temp, Constants.COMMON_DATE_FORMAT); Trigger validTri = TriggerBuilder.newTrigger() .withIdentity(createTiggerKey(triName, STATUS_GROUP+compileId)) .withSchedule(intervalBuilder) .usingJobData(dataMap) .forJob(STATUS_JOBDETAIL) .startAt(temp) .build(); triList.add(validTri); //invalid dataMap = new JobDataMap(); dataMap.put("isValid", false); dataMap.put("cfg", cfg); temp = closestAfterWeek(invalidTime, Integer.valueOf(week)); triName = INVALID_KEY + dayOrWeek +week+"("+interval+")" + "_" + DateUtils.formatDate(temp, Constants.COMMON_DATE_FORMAT); validTri = TriggerBuilder.newTrigger() .withIdentity(createTiggerKey(triName, STATUS_GROUP+compileId)) .withSchedule(intervalBuilder) .usingJobData(dataMap) .forJob(STATUS_JOBDETAIL) .startAt(temp) .build(); triList.add(validTri); } } } return triList; } /** * 每月 执行 * @param cfg * @return */ public static List createEveryMonthTrigger(ScheduleCfg cfg){ String dayWeek = cfg.getUserRegion3(); String cronInvalid = cfg.getCronInvalid(); String cronValid = cfg.getCronValid(); StringBuilder cronSb = new StringBuilder(); Trigger trigger = null; List validList = parseTime(cronValid);//time 转换 List invalidList = parseTime(cronInvalid);//time 转换 List triList = new ArrayList(); String userRegion4 = cfg.getUserRegion4().toUpperCase(); if("day".equalsIgnoreCase(dayWeek)) {//指定天 boolean hasL = userRegion4.contains("L"); StringBuilder chooseSb = new StringBuilder(); for(String str : userRegion4.split(",")) { if(!"L".equalsIgnoreCase(str.trim())) { chooseSb.append(",").append(str); } } chooseSb.deleteCharAt(0); cronSb.append(validList.get(2)).append(" ")//秒 .append(validList.get(1)).append(" ")//分 .append(validList.get(0)).append(" ")//小时 .append(chooseSb.toString()).append(" ")//日 .append(cfg.getUserRegion2()).append(" ")//月 .append("?").append(" ");//周 trigger = createCronTrigger(cronSb.toString(), cfg.getCompileId(), true, cfg); triList.add(trigger); cronSb.setLength(0); cronSb.append(invalidList.get(2)).append(" ")//秒 .append(invalidList.get(1)).append(" ")//分 .append(invalidList.get(0)).append(" ")//小时 .append(chooseSb.toString()).append(" ")//日 .append(cfg.getUserRegion2()).append(" ")//月 .append("?").append(" ");//周 trigger = createCronTrigger(cronSb.toString(), cfg.getCompileId(), false, cfg); triList.add(trigger); if(hasL) {// 月的最后一天,quartz 不支持 1,L 这种指定,所以 L单独处理一下 cronSb.setLength(0); cronSb.append(validList.get(2)).append(" ")//秒 .append(validList.get(1)).append(" ")//分 .append(validList.get(0)).append(" ")//小时 .append("L").append(" ")//日 .append(cfg.getUserRegion2()).append(" ")//月 .append("?").append(" ");//周 trigger = createCronTrigger(cronSb.toString(), cfg.getCompileId(), true, cfg); triList.add(trigger); cronSb.setLength(0); cronSb.append(invalidList.get(2)).append(" ")//秒 .append(invalidList.get(1)).append(" ")//分 .append(invalidList.get(0)).append(" ")//小时 .append("L").append(" ")//日 .append(cfg.getUserRegion2()).append(" ")//月 .append("?").append(" ");//周 trigger = createCronTrigger(cronSb.toString(), cfg.getCompileId(), false, cfg); triList.add(trigger); } }else if ("week".equalsIgnoreCase(dayWeek)) {//指定周,1#2: 第一周的周二,4L:最后一周的周四 for(String nthWeek : userRegion4.split(",")) {//第几周 for(String week : cfg.getUserRegion5().split(",")) {//星期几 cronSb.setLength(0); cronSb.append(validList.get(2)).append(" ")//秒 .append(validList.get(1)).append(" ")//分 .append(validList.get(0)).append(" ")//小时 .append("?").append(" ")//日 .append(cfg.getUserRegion2()).append(" ");//月 if("L".equalsIgnoreCase(nthWeek)) { cronSb.append(week).append("L");//周 }else { cronSb.append(week).append("#").append(nthWeek);//周 } trigger = createCronTrigger(cronSb.toString(), cfg.getCompileId(), true, cfg); triList.add(trigger); cronSb.setLength(0); cronSb.append(invalidList.get(2)).append(" ")//秒 .append(invalidList.get(1)).append(" ")//分 .append(invalidList.get(0)).append(" ")//小时 .append("?").append(" ")//日 .append(cfg.getUserRegion2()).append(" ");//月 if("L".equalsIgnoreCase(nthWeek)) { cronSb.append(week).append("L");//周 }else { cronSb.append(week).append("#").append(nthWeek);//周 } trigger = createCronTrigger(cronSb.toString(), cfg.getCompileId(), false, cfg); triList.add(trigger); } } } return triList; } /** * 查找最近的 星期几 ,包括今天 * @param date * @param w 周一开始 1 -7 * @return */ public static Date closestAfterWeek(Date date,int w) { Calendar cal = Calendar.getInstance(); cal.setTime(date); int i = cal.get(Calendar.DAY_OF_WEEK);//周日开始 1-7 i = (i==1)? 7: i-1;//转换为 周一到 周日 1-7 cal.add(Calendar.DAY_OF_MONTH, (i>w)?(7-(i-w)) : (w-i)); return cal.getTime(); } public static TriggerKey createTiggerKey(String name,String group) { TriggerKey key = new TriggerKey(name, group); return key; } /** * jquery cron 生成的cron 表达式,quartz 不能直接使用,需要做些修改 * @param cron * @return */ public static String modifyCronExp(String cron) { String[] cronArr = cron.split("\\s"); if("*".equals(cronArr[4])) { cronArr[4] = "?"; }else { cronArr[3] = "*"; cronArr[2] = "?"; } return "0 " + StringUtils.join(cronArr, " "); } public static void main(String[] args) { CronTriggerImpl cron = new CronTriggerImpl(); try { String exp = "0 0 0 ? 1,2 1#4"; cron.setCronExpression(exp); System.out.println(cron); } catch (ParseException e) { e.printStackTrace(); } } }