定时任务任务表增加type:1-正常任务;2-全量同步执行时未执行的任务。

增加全量同步时未执行任务的扫描
全量同步时,业务无数据也需要下发{}至服务端。
This commit is contained in:
段冬梅
2019-04-09 09:10:40 +08:00
parent 08dd0f3868
commit bf00ccd875
13 changed files with 846 additions and 77 deletions

View File

@@ -331,6 +331,7 @@ public class ScheduleUpdateInterceptor extends BaseInterceptor{
if(schedule != null ) {
BeanUtils.copyProperties(baseCfg, schedule,new String[]{"userRegion1","userRegion2","userRegion3","userRegion4","userRegion5"});
schedule.setTableName(tableName);
schedule.setType(1);
}
return schedule;
}

View File

@@ -87,7 +87,7 @@ public class ScheduleCfgJob implements Job {
List<ScheduleCfg> newlyCfg = null;
int totalNum = 0;
do {
newlyCfg = dao.findNewlyCfg(scheduleCfgId, limit);
newlyCfg = dao.findNewlyCfg(scheduleCfgId, limit,1);
if(newlyCfg != null && newlyCfg.size() > 0) {
totalNum += newlyCfg.size();
for(ScheduleCfg cfg : newlyCfg) {//先取消之前的定时配置

View File

@@ -1,12 +1,20 @@
package com.nis.quartz;
import java.util.Date;
import org.apache.log4j.Logger;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import com.nis.domain.ScheduleCfg;
import com.nis.util.Constants;
import com.nis.util.DateUtil;
import com.nis.util.DictUtils;
import com.nis.web.dao.SchedulerDao;
import com.nis.web.service.ScheduleService;
import com.nis.web.service.SpringContextHolder;
@@ -23,6 +31,33 @@ public class ScheduleStatusJob implements Job{
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
//全量同步状态
String currentStatus = DictUtils.getDictLabel("currrent_sync_status", "status","0");
//0:start:开始 1:init:初始化 2:doing:进行中
//全量同步过程中未执行的任务进入另一个job中等待全量同步完成开始执行
if(currentStatus.equals("0") || currentStatus.equals("1")||currentStatus.equals("2")) {
//配置下发,并修改 配置表的状态,保存下发记录等
SchedulerDao schedulerDao = SpringContextHolder.getBean(SchedulerDao.class);
//从trigger中 获取 配置信息
JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
boolean isValid = jobDataMap.getBoolean("isValid");
ScheduleCfg cfg = (ScheduleCfg)jobDataMap.get("cfg");
cfg.setId(null);//新入库一个任务
cfg.setType(2);//全量同步中未执行的任务
cfg.setUserRegion1("single"); //只执行一次
if(isValid) {
cfg.setCronValid("1900-01-01 01:01:01"); //无实际效果,仅仅为填充值
cfg.setCronInvalid("");
}else {
cfg.setCronValid("");
cfg.setCronInvalid("1900-01-01 01:01:01");//无实际效果,仅仅为填充值
}
schedulerDao.insert(cfg);
}else {
//从trigger中 获取 配置信息
JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
boolean isValid = jobDataMap.getBoolean("isValid");
@@ -35,4 +70,6 @@ public class ScheduleStatusJob implements Job{
log.debug(String.format("任务开始执行compileId:%s,isValid:%s",compileId,isValid ));
}
}
}

View File

@@ -0,0 +1,548 @@
package com.nis.quartz;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
import org.quartz.CalendarIntervalScheduleBuilder;
import org.quartz.CalendarIntervalTrigger;
import org.quartz.CronScheduleBuilder;
import org.quartz.DateBuilder;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.PersistJobDataAfterExecution;
import org.quartz.ScheduleBuilder;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.SimpleTrigger;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.impl.triggers.CalendarIntervalTriggerImpl;
import org.quartz.impl.triggers.CronTriggerImpl;
import org.quartz.spi.MutableTrigger;
import com.nis.domain.ScheduleCfg;
import com.nis.util.Constants;
import com.nis.util.DateUtil;
import com.nis.util.DateUtils;
import com.nis.util.DictUtils;
import com.nis.util.StringUtils;
import com.nis.web.dao.SchedulerDao;
import com.nis.web.service.SpringContextHolder;
/**
* 定时任务 配置全量同步时未执行的定时任务加载
* 1、每 n s 执行一次,每次读取 schedule_cfg 最新的数据
* 2、删除或新增 定时任务
* 3、单线程执行
* @author ddm
*
*/
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public class ScheduleSyncCfgJob implements Job {
SimpleDateFormat sdf=new SimpleDateFormat(Constants.COMMON_DATE_FORMAT);
private static final Logger log = Logger.getLogger(ScheduleSyncCfgJob.class);
/**
* 状态组格式statusGroup-compileId
*/
private static final String STATUS_GROUP = "syncGroup-";
private static final String STATUS_JOB = "SYNC-JOB";
/**
* 生效标识valid-cronexp
*/
private static final String VALID_KEY = "valid-";
/**
* 失效标识invalid-cronexp
*/
private static final String INVALID_KEY = "invalid-";
private static final JobKey STATUS_JOBKEY = JobKey.jobKey(STATUS_JOB, "syncGroup");
private static final JobDetail STATUS_JOBDETAIL = JobBuilder.newJob(ScheduleStatusJob.class)
.withIdentity(STATUS_JOBKEY)
.storeDurably(true)
.build();
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
Scheduler scheduler = context.getScheduler();
SchedulerDao dao = SpringContextHolder.getBean(SchedulerDao.class);
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
long scheduleCfgId = dataMap.get("scheduleCfgId") == null ? 0:dataMap.getLong("scheduleCfgId");
long limit = dataMap.get("limit") == null ? 1000:dataMap.getLong("limit");
log.info(String.format("Sync定时配置任务开始扫描scheduleCfgId:%s,limit:%s",scheduleCfgId,limit ));
List<ScheduleCfg> newlyCfg = null;
int totalNum = 0;
//全量同步当前状态 0:start:开始 1:init:初始化 2:doing:进行中
String currentStatus = DictUtils.getDictLabel("currrent_sync_status", "status","0");
if(!(currentStatus.equals("0") || currentStatus.equals("1") || currentStatus.equals("2"))) {
do {
newlyCfg = dao.findNewlyCfg(scheduleCfgId, limit,2);
if(newlyCfg != null && newlyCfg.size() > 0) {
totalNum += newlyCfg.size();
for(ScheduleCfg cfg : newlyCfg) {//先取消之前的定时配置
Integer compileId = cfg.getCompileId();
try {
//取消之前所有的 trigger
GroupMatcher<TriggerKey> groupMatcher= GroupMatcher.triggerGroupEquals(STATUS_GROUP + compileId);
Set<TriggerKey> triggerKeys = scheduler.getTriggerKeys(groupMatcher);
if(triggerKeys != null && triggerKeys.size() > 0) {
for(TriggerKey tk : triggerKeys) {
scheduler.unscheduleJob(tk);
}
log.info(String.format("Sync定时任务取消成功compile:%s", compileId));
}
} catch (Exception e) {
log.error(String.format("Sync定时任务取消异常compileId:%s", compileId),e);
}
}
int index=0;
for(ScheduleCfg cfg : newlyCfg) {
Integer compileId = cfg.getCompileId();
try {
//判断状态,重新添加最新的 trigger
Integer isValid = cfg.getIsValid();
Integer isAudit = cfg.getIsAudit();
//添加定时任务的条件
if((isValid == 1 && isAudit == 1) || (isValid == 0 && isAudit == 0)) {
//添加定时任务包括valid 和 invalid
addJob(scheduler, cfg,index);
log.info(String.format("Sync定时任务添加成功compile:%s", compileId));
}
} catch (Exception e) {
log.error(String.format("Sync定时任务更新异常compileId:%s", compileId),e);
}
index++;
}
//最后 保存此次 最后的id
ScheduleCfg lastCfg = newlyCfg.get(newlyCfg.size() -1);
scheduleCfgId = lastCfg.getId();
dataMap.put("scheduleCfgId", scheduleCfgId);
log.info(String.format("Sync加载定时任务total num :%s", newlyCfg.size()));
}
} while (newlyCfg != null && newlyCfg.size() > 0);
log.info(String.format("Sync定时配置任务结束执行total num:%s",totalNum));
}
log.info("全量同步中缓存的定时配置正在扫描currentStatus"+currentStatus+"");
}
/**
* 将定时任务信息添加到 定时器框架中调度
* @param scheduler
* @param cfg
* @throws SchedulerException
*/
/*public static void addJob(Scheduler scheduler,ScheduleCfg cfg) throws SchedulerException {
Integer compileId = cfg.getCompileId();
String cronValid = cfg.getCronValid();
String cronInvalid = cfg.getCronInvalid();
Trigger validTrigger = createTrigger(cronValid, compileId, true, cfg);
Trigger invalidTrigger = createTrigger(cronInvalid, compileId, false, cfg);
boolean jobExist = scheduler.checkExists(STATUS_JOBKEY);
if(!jobExist) {//判断 job 是否存在,不存在添加
scheduler.addJob(STATUS_JOBDETAIL, false);
}
boolean checkExists = scheduler.checkExists(validTrigger.getKey());
if(!checkExists) {//判断 valid trigger 是否存在,不存在添加
scheduler.scheduleJob(validTrigger);
}else {
log.warn(String.format("Trigger already exists:%s ", validTrigger.getKey().toString()));
}
checkExists = scheduler.checkExists(invalidTrigger.getKey());
if(!checkExists) {//判断 invalid trigger 是否存在,不存在添加
scheduler.scheduleJob(invalidTrigger);
}else {
log.warn(String.format("Trigger already exists:%s ", invalidTrigger.getKey().toString()));
}
}*/
public static void addJob(Scheduler scheduler,ScheduleCfg cfg,int expire) throws SchedulerException {
List<Trigger> triList = createTrigger(cfg,expire);
boolean jobExist = scheduler.checkExists(STATUS_JOBKEY);
if(!jobExist) {//判断 job 是否存在,不存在添加
scheduler.addJob(STATUS_JOBDETAIL, false);
}
for(Trigger tri : triList) {
boolean checkExists = scheduler.checkExists(tri.getKey());
if(!checkExists) {//判断 valid trigger 是否存在,不存在添加
log.debug(String.format("Sync定时任务添加%s", tri.getKey()));
scheduler.scheduleJob(tri);
log.info(String.format("Sync定时任务添加成功%s", tri.getKey()));
}else {
log.warn(String.format("Triggersync already exists:%s ", tri.getKey().toString()));
}
}
}
/**
* 将页面配置的内容 转换成 trigger
* @param cfg
* @return
*/
public static List<Trigger> createTrigger(ScheduleCfg cfg,int expire){
String mode = cfg.getUserRegion1().toUpperCase();//定时任务运行模式:一次,每天,每周,每月
List<Trigger> triList = null;
switch (mode) {
case "ALWAYS"://单次运行,但只创建单次生效触发器
triList = createSimpleTrigger(cfg,expire);
break;
case "SINGLE"://单次运行
triList = createSimpleTrigger(cfg,expire);
break;
case "EVERYDAY"://每天运行 0 0 0 2/1 * ? ,不符合要求,定义每天都执行,然后在 代码判断 间隔时间
triList = createCalendarIntervalTrigger(cfg);
break;
case "EVERYWEEK"://每周运行
triList = createCalendarIntervalTrigger(cfg);
break;
case "EVERYMONTH"://每月运行
triList = createEveryMonthTrigger(cfg);
break;
default:
log.warn(String.format("unknown mode : %s ", mode));
break;
}
return triList;
}
/**
* 将时间转换成 时分秒
* @param time
* @return
*/
public static List<Integer> parseTime(String time) {
if(StringUtils.isNoneBlank(time)) {
String[] split = time.split(":");
List<Integer> tl = new ArrayList<Integer>(3);
for(String s : split) {
tl.add(Integer.valueOf(s));
}
return tl;
}
return null;
}
public static Trigger createCronTrigger(String cron,Integer compileId,boolean isValid,ScheduleCfg cfg) {
String triggerName = isValid ? (VALID_KEY + cron) : (INVALID_KEY + cron);
JobDataMap dataMap = new JobDataMap();
dataMap.put("isValid", isValid);
dataMap.put("cfg", cfg);
return TriggerBuilder.newTrigger()
.withIdentity(createTiggerKey(triggerName, STATUS_GROUP+compileId))
.withSchedule(CronScheduleBuilder.cronSchedule(cron))
.usingJobData(dataMap)
.forJob(STATUS_JOBDETAIL)
.build();
}
/**
* 创建全量同步时未执行的任务,每个任务均为单词任务
* @param cfg
* @return
*/
public static List<Trigger> createSimpleTrigger(ScheduleCfg cfg,int expire){
List<Trigger> triList = new ArrayList<Trigger>();
Integer compileId = cfg.getCompileId();
String cronValid = cfg.getCronValid();
String cronInvalid = cfg.getCronInvalid();
Date validDate = null;
Date invalidDate = null;
if(StringUtils.isNotBlank(cronValid)){
Calendar c = Calendar.getInstance();
c.add(Calendar.MINUTE, (expire+1));
validDate = c.getTime();
}
if(StringUtils.isNotBlank(cronInvalid)){
Calendar c = Calendar.getInstance();
c.add(Calendar.MINUTE, (expire+2));
invalidDate = c.getTime();
}
JobDataMap dataMap = new JobDataMap();
if(validDate!=null){//生效时间如果不为空,则创建定时生效触发器
dataMap.put("isValid", true);
dataMap.put("cfg", cfg);
String triName = VALID_KEY + cfg.getUserRegion1() + "_" + cronValid;
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(createTiggerKey(triName, STATUS_GROUP+compileId))
.withSchedule(SimpleScheduleBuilder.simpleSchedule())
.usingJobData(dataMap)
.forJob(STATUS_JOBDETAIL)
.startAt(validDate)
.build();
triList.add(trigger);
}
if(invalidDate!=null){//失效时间如果不为空,则创建定时失效触发器
dataMap = new JobDataMap();
dataMap.put("isValid", false);
dataMap.put("cfg", cfg);
String triName = INVALID_KEY + cfg.getUserRegion1() + "_" + cronInvalid;
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(createTiggerKey(triName, STATUS_GROUP+compileId))
.withSchedule(SimpleScheduleBuilder.simpleSchedule())
.usingJobData(dataMap)
.forJob(STATUS_JOBDETAIL)
.startAt(invalidDate)
.build();
triList.add(trigger);
}
return triList;
}
/**
* 间隔 n 天 或 n 周执行
* @param cfg
* @return
*/
public static List<Trigger> createCalendarIntervalTrigger(ScheduleCfg cfg) {
List<Trigger> triList = new ArrayList<Trigger>();
Integer compileId = cfg.getCompileId();
String cronValid = cfg.getCronValid();
String cronInvalid = cfg.getCronInvalid();
String dayOrWeek = cfg.getUserRegion1();
Integer interval = Integer.valueOf(cfg.getUserRegion2());
List<Integer> validList = parseTime(cronValid);
List<Integer> invalidList = parseTime(cronInvalid);
Date validStartTime = DateBuilder.todayAt(validList.get(0), validList.get(1), validList.get(2));
Date invalidTime = DateBuilder.todayAt(invalidList.get(0), invalidList.get(1), invalidList.get(2));
CalendarIntervalScheduleBuilder intervalBuilder = null;
if("EVERYDAY".equalsIgnoreCase(dayOrWeek)) {
intervalBuilder = CalendarIntervalScheduleBuilder.calendarIntervalSchedule().withIntervalInDays(interval);
//valid
JobDataMap dataMap = new JobDataMap();
dataMap.put("isValid", true);
dataMap.put("cfg", cfg);
String triName = VALID_KEY + dayOrWeek+"("+interval+")" + "_" + DateUtils.formatDate(validStartTime, Constants.COMMON_DATE_FORMAT);
Trigger validTri = TriggerBuilder.newTrigger()
.withIdentity(createTiggerKey(triName, STATUS_GROUP+compileId))
.withSchedule(intervalBuilder)
.usingJobData(dataMap)
.forJob(STATUS_JOBDETAIL)
.startAt(validStartTime)
.build();
triList.add(validTri);
//invalid
dataMap = new JobDataMap();
dataMap.put("isValid", false);
dataMap.put("cfg", cfg);
triName = INVALID_KEY + dayOrWeek +"("+interval+")" + "_" + DateUtils.formatDate(invalidTime, Constants.COMMON_DATE_FORMAT);
validTri = TriggerBuilder.newTrigger()
.withIdentity(createTiggerKey(triName, STATUS_GROUP+compileId))
.withSchedule(intervalBuilder)
.usingJobData(dataMap)
.forJob(STATUS_JOBDETAIL)
.startAt(invalidTime)
.build();
triList.add(validTri);
}else if("EVERYWEEK".equalsIgnoreCase(dayOrWeek)) {
intervalBuilder = CalendarIntervalScheduleBuilder.calendarIntervalSchedule().withIntervalInWeeks(interval);
String[] weeks = cfg.getUserRegion3().split(",");
for(String week : weeks) {
if(StringUtils.isNoneBlank(week)) {
Date temp = closestAfterWeek(validStartTime, Integer.valueOf(week));
JobDataMap dataMap = new JobDataMap();
dataMap.put("isValid", true);
dataMap.put("cfg", cfg);
String triName = VALID_KEY + dayOrWeek +week+"("+interval+")" + "_" + DateUtils.formatDate(temp, Constants.COMMON_DATE_FORMAT);
Trigger validTri = TriggerBuilder.newTrigger()
.withIdentity(createTiggerKey(triName, STATUS_GROUP+compileId))
.withSchedule(intervalBuilder)
.usingJobData(dataMap)
.forJob(STATUS_JOBDETAIL)
.startAt(temp)
.build();
triList.add(validTri);
//invalid
dataMap = new JobDataMap();
dataMap.put("isValid", false);
dataMap.put("cfg", cfg);
temp = closestAfterWeek(invalidTime, Integer.valueOf(week));
triName = INVALID_KEY + dayOrWeek +week+"("+interval+")" + "_" + DateUtils.formatDate(temp, Constants.COMMON_DATE_FORMAT);
validTri = TriggerBuilder.newTrigger()
.withIdentity(createTiggerKey(triName, STATUS_GROUP+compileId))
.withSchedule(intervalBuilder)
.usingJobData(dataMap)
.forJob(STATUS_JOBDETAIL)
.startAt(temp)
.build();
triList.add(validTri);
}
}
}
return triList;
}
/**
* 每月 执行
* @param cfg
* @return
*/
public static List<Trigger> createEveryMonthTrigger(ScheduleCfg cfg){
String dayWeek = cfg.getUserRegion3();
String cronInvalid = cfg.getCronInvalid();
String cronValid = cfg.getCronValid();
StringBuilder cronSb = new StringBuilder();
Trigger trigger = null;
List<Integer> validList = parseTime(cronValid);//time 转换
List<Integer> invalidList = parseTime(cronInvalid);//time 转换
List<Trigger> triList = new ArrayList<Trigger>();
String userRegion4 = cfg.getUserRegion4().toUpperCase();
if("day".equalsIgnoreCase(dayWeek)) {//指定天
boolean hasL = userRegion4.contains("L");
StringBuilder chooseSb = new StringBuilder();
for(String str : userRegion4.split(",")) {
if(!"L".equalsIgnoreCase(str.trim())) {
chooseSb.append(",").append(str);
}
}
chooseSb.deleteCharAt(0);
cronSb.append(validList.get(2)).append(" ")//秒
.append(validList.get(1)).append(" ")//分
.append(validList.get(0)).append(" ")//小时
.append(chooseSb.toString()).append(" ")//日
.append(cfg.getUserRegion2()).append(" ")//月
.append("?").append(" ");//周
trigger = createCronTrigger(cronSb.toString(), cfg.getCompileId(), true, cfg);
triList.add(trigger);
cronSb.setLength(0);
cronSb.append(invalidList.get(2)).append(" ")//秒
.append(invalidList.get(1)).append(" ")//分
.append(invalidList.get(0)).append(" ")//小时
.append(chooseSb.toString()).append(" ")//日
.append(cfg.getUserRegion2()).append(" ")//月
.append("?").append(" ");//周
trigger = createCronTrigger(cronSb.toString(), cfg.getCompileId(), false, cfg);
triList.add(trigger);
if(hasL) {// 月的最后一天quartz 不支持 1,L 这种指定,所以 L单独处理一下
cronSb.setLength(0);
cronSb.append(validList.get(2)).append(" ")//秒
.append(validList.get(1)).append(" ")//分
.append(validList.get(0)).append(" ")//小时
.append("L").append(" ")//日
.append(cfg.getUserRegion2()).append(" ")//月
.append("?").append(" ");//周
trigger = createCronTrigger(cronSb.toString(), cfg.getCompileId(), true, cfg);
triList.add(trigger);
cronSb.setLength(0);
cronSb.append(invalidList.get(2)).append(" ")//秒
.append(invalidList.get(1)).append(" ")//分
.append(invalidList.get(0)).append(" ")//小时
.append("L").append(" ")//日
.append(cfg.getUserRegion2()).append(" ")//月
.append("?").append(" ");//周
trigger = createCronTrigger(cronSb.toString(), cfg.getCompileId(), false, cfg);
triList.add(trigger);
}
}else if ("week".equalsIgnoreCase(dayWeek)) {//指定周1#2: 第一周的周二4L:最后一周的周四
for(String nthWeek : userRegion4.split(",")) {//第几周
for(String week : cfg.getUserRegion5().split(",")) {//星期几
cronSb.setLength(0);
cronSb.append(validList.get(2)).append(" ")//秒
.append(validList.get(1)).append(" ")//分
.append(validList.get(0)).append(" ")//小时
.append("?").append(" ")//日
.append(cfg.getUserRegion2()).append(" ");//月
if("L".equalsIgnoreCase(nthWeek)) {
cronSb.append(week).append("L");//周
}else {
cronSb.append(week).append("#").append(nthWeek);//周
}
trigger = createCronTrigger(cronSb.toString(), cfg.getCompileId(), true, cfg);
triList.add(trigger);
cronSb.setLength(0);
cronSb.append(invalidList.get(2)).append(" ")//秒
.append(invalidList.get(1)).append(" ")//分
.append(invalidList.get(0)).append(" ")//小时
.append("?").append(" ")//日
.append(cfg.getUserRegion2()).append(" ");//月
if("L".equalsIgnoreCase(nthWeek)) {
cronSb.append(week).append("L");//周
}else {
cronSb.append(week).append("#").append(nthWeek);//周
}
trigger = createCronTrigger(cronSb.toString(), cfg.getCompileId(), false, cfg);
triList.add(trigger);
}
}
}
return triList;
}
/**
* 查找最近的 星期几 ,包括今天
* @param date
* @param w 周一开始 1 -7
* @return
*/
public static Date closestAfterWeek(Date date,int w) {
Calendar cal = Calendar.getInstance();
cal.setTime(date);
int i = cal.get(Calendar.DAY_OF_WEEK);//周日开始 1-7
i = (i==1)? 7: i-1;//转换为 周一到 周日 1-7
cal.add(Calendar.DAY_OF_MONTH, (i>w)?(7-(i-w)) : (w-i));
return cal.getTime();
}
public static TriggerKey createTiggerKey(String name,String group) {
TriggerKey key = new TriggerKey(name, group);
return key;
}
/**
* jquery cron 生成的cron 表达式quartz 不能直接使用,需要做些修改
* @param cron
* @return
*/
public static String modifyCronExp(String cron) {
String[] cronArr = cron.split("\\s");
if("*".equals(cronArr[4])) {
cronArr[4] = "?";
}else {
cronArr[3] = "*";
cronArr[2] = "?";
}
return "0 " + StringUtils.join(cronArr, " ");
}
public static void main(String[] args) {
CronTriggerImpl cron = new CronTriggerImpl();
try {
String exp = "0 0 0 ? 1,2 1#4";
cron.setCronExpression(exp);
System.out.println(cron);
} catch (ParseException e) {
e.printStackTrace();
}
}
}

View File

@@ -852,6 +852,7 @@ public class ConfigServiceUtil {
throw new MaatConvertException("<spring:message code=\"request_service_failed\"/>:"+result);
}
} catch (Exception e) {
e.printStackTrace();
throw e;
}finally {
if (response != null) {
@@ -898,6 +899,7 @@ public class ConfigServiceUtil {
throw new MaatConvertException("<spring:message code=\"request_service_failed\"/>:"+result);
}
} catch (Exception e) {
e.printStackTrace();
throw e;
}finally {
if (response != null) {

View File

@@ -72,6 +72,13 @@ public class ConfigSynchronizationController extends BaseController {
dictService.updateDictItem(item);
//删除字典缓存
CacheUtils.remove(Constants.CACHE_DICT_MAP);
//全量同步之前等待一段时间,避免定时任务还未扫描到当前的同步状态。
try{
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
send(serviceId,model,request,response);

View File

@@ -20,7 +20,7 @@ public interface SchedulerDao extends CrudDao<ScheduleCfg> {
* @param size
* @return
*/
List<ScheduleCfg> findNewlyCfg(@Param("id")Long id,@Param("limit")Long limit);
List<ScheduleCfg> findNewlyCfg(@Param("id")Long id,@Param("limit")Long limit,@Param("type")int type);
/**
* 更新 del_flag 字段为删除标识

View File

@@ -135,6 +135,7 @@
a.CFG_ID,
a.IS_VALID,
a.IS_AUDIT,
a.function_id,
a.CREATOR_ID,
a.CREATE_TIME,
a.EDITOR_ID,
@@ -149,6 +150,7 @@
<where>
del_Flag = 1
and id > #{id}
and type=#{type}
</where>
order by a.id
limit #{limit}
@@ -176,7 +178,8 @@
user_region2,
user_region3,
user_region4,
user_region5
user_region5,
type
) values (
#{name,jdbcType=VARCHAR},
#{cronValid,jdbcType=VARCHAR},
@@ -196,7 +199,8 @@
#{userRegion2,jdbcType=VARCHAR},
#{userRegion3,jdbcType=VARCHAR},
#{userRegion4,jdbcType=VARCHAR},
#{userRegion5,jdbcType=VARCHAR}
#{userRegion5,jdbcType=VARCHAR},
#{type,jdbcType=INTEGER}
)
</insert>
@@ -236,7 +240,19 @@
<!-- 修改配置表状态 -->
<update id="updateCfgTableStatus">
update ${tableName} set is_valid = #{isValid} ,is_audit = 1 where compile_id = #{compileId}
update ${tableName}
<set>
is_valid = #{isValid} ,
is_audit = 1,
<!-- <if test="isValid == 0 ">
is_audit = 3,
</if>
<if test="isValid == 1 ">
is_audit = 1,
</if> -->
audit_time=now(),
</set>
where compile_id = #{compileId}
</update>
<!-- 查询最新的配置状态 -->

View File

@@ -40,37 +40,22 @@ public class ScheduleService extends BaseService{
ScheduleExceInfo exceNew = dao.findScheduleExceNew(se);
//2、如果已经下发直接下发状态否则下发配置
Integer issueResult = 1;
//是否下发内容
Integer isIssueContent = 1;
String errorInfo = null;
String tableName = cfg.getTableName();
SchedulerTaskUtil scheduler = new SchedulerTaskUtil();
boolean udpateConfigStatus = false;
try {
if(isValid == 1 && (exceNew == null || exceNew.getIsIssue() == 1)) {//生效配置需要下发
udpateConfigStatus = scheduler.updateConfigByServiceAndCompile(cfg.getServiceId(), compileId, isValid, 1,configSynchronizationDao);
logger.info(String.format("定时器下发配置内容compileId:%s,isValid:%s,issueResult:%s,errorInfo:%s",compileId,isValid,issueResult,errorInfo));
}else{//只需修改状态
udpateConfigStatus = scheduler.updateConfigByServiceAndCompile(cfg.getServiceId(), compileId, isValid, 0,configSynchronizationDao);
logger.info(String.format("定时器修改配置状态compileId:%s,isValid:%s,issueResult:%s,errorInfo:%s",compileId,isValid,issueResult,errorInfo));
//生效配置需要下发
if(isValid == 1 && (exceNew == null || exceNew.getIsIssue() == 1)) {
isIssueContent=1;
}else {
//之下发状态
isIssueContent=0;
}
} catch (NoSuchFieldException e) {
udpateConfigStatus = false;
e.printStackTrace();
} catch (SecurityException e) {
udpateConfigStatus = false;
e.printStackTrace();
} catch (IllegalArgumentException e) {
udpateConfigStatus = false;
e.printStackTrace();
} catch (IllegalAccessException e) {
udpateConfigStatus = false;
e.printStackTrace();
}
// logger.info(String.format("配置状态更新compileId:%s,isValid:%s,issueResult:%s,errorInfo:%s",compileId,isValid,issueResult,errorInfo));
if(udpateConfigStatus){//配置更新成功
//首先对数据库操作
if(exceNew == null) {
//新增exce_new 表状态
exceNew = new ScheduleExceInfo();
@@ -108,7 +93,27 @@ public class ScheduleService extends BaseService{
//3、更新 配置表的 isValid 字段,添加 exce_log 记录
dao.insertScheduleExceLog(exceNew);
dao.updateCfgTableStatus(tableName, compileId, isValid);
//数据库操作完成后与服务端交互
try {
udpateConfigStatus = scheduler.updateConfigByServiceAndCompile(cfg.getServiceId(), compileId, isValid, isIssueContent,configSynchronizationDao);
logger.info(String.format("定时器下发配置内容compileId:%s,isValid:%s,issueResult:%s,errorInfo:%s",compileId,isValid,issueResult,errorInfo));
} catch (NoSuchFieldException e) {
udpateConfigStatus = false;
e.printStackTrace();
} catch (SecurityException e) {
udpateConfigStatus = false;
e.printStackTrace();
} catch (IllegalArgumentException e) {
udpateConfigStatus = false;
e.printStackTrace();
} catch (IllegalAccessException e) {
udpateConfigStatus = false;
e.printStackTrace();
}
// logger.info(String.format("配置状态更新compileId:%s,isValid:%s,issueResult:%s,errorInfo:%s",compileId,isValid,issueResult,errorInfo));
}

View File

@@ -155,8 +155,8 @@ public class ConfigSynchronizationService extends BaseService{
Page<BaseCfg> page=new Page<BaseCfg>(request,response,Constants.MAAT_JSON_SEND_SIZE,"a");
handleCallbackData(className,page,entity,request,response,false);
}
//如果业务没有配置数据,并且为最后一个业务,需要发送给服务接口一个结束标识
if(!isFinished && lastServiceTag){
//最后一个业务,发送一个结束标识
if(lastServiceTag){
String json = "{}";
//如果是所有业务全量同步需要发送一个service=-1的请求有助于服务端删除分组复用配置
if(StringUtil.isEmpty(serviceIdCondition)){
@@ -477,7 +477,7 @@ public class ConfigSynchronizationService extends BaseService{
//调用服务接口配置全量更新
isFinished = ((!hasData)&&lastServiceTag)?true:false;
FileUtils.writeToFile("/home/ceiec/configSync/"+DateUtils.getDate("yyyy-MM-dd")+"/"+entity.getServiceId()+"_"+page.getPageNo()+"_"+System.currentTimeMillis()+".json", json, false);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,isFinished?"FINISHED":null);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,null);
logger.info("全量下发响应信息:"+result.toString());
}
}
@@ -649,7 +649,7 @@ public class ConfigSynchronizationService extends BaseService{
//调用服务接口配置全量更新
isFinished = ((!hasData)&&lastServiceTag)?true:false;
FileUtils.writeToFile("/home/ceiec/configSync/"+DateUtils.getDate("yyyy-MM-dd")+"/"+entity.getServiceId()+"_"+page.getPageNo()+"_"+System.currentTimeMillis()+".json", json, false);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,isFinished?"FINISHED":null);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,null);
logger.info("全量下发响应信息:"+result.toString());
}
}
@@ -839,7 +839,7 @@ public class ConfigSynchronizationService extends BaseService{
//调用服务接口配置全量更新
isFinished = ((!hasData)&&lastServiceTag)?true:false;
FileUtils.writeToFile("/home/ceiec/configSync/"+DateUtils.getDate("yyyy-MM-dd")+"/"+entity.getServiceId()+"_"+page.getPageNo()+"_"+System.currentTimeMillis()+".json", json, false);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,isFinished?"FINISHED":null);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,null);
logger.info("全量下发响应信息:"+result.toString());
}
}
@@ -1010,7 +1010,7 @@ public class ConfigSynchronizationService extends BaseService{
//调用服务接口配置全量更新
isFinished = ((!hasData)&&lastServiceTag)?true:false;
FileUtils.writeToFile("/home/ceiec/configSync/"+DateUtils.getDate("yyyy-MM-dd")+"/"+entity.getServiceId()+"_"+page.getPageNo()+"_"+System.currentTimeMillis()+".json", json, false);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,isFinished?"FINISHED":null);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,null);
logger.info("全量下发响应信息:"+result.toString());
}
}
@@ -1339,7 +1339,7 @@ public class ConfigSynchronizationService extends BaseService{
//调用服务接口配置全量更新
isFinished = ((!hasData)&&lastServiceTag)?true:false;
FileUtils.writeToFile("/home/ceiec/configSync/"+DateUtils.getDate("yyyy-MM-dd")+"/"+entity.getServiceId()+"_"+page.getPageNo()+"_"+System.currentTimeMillis()+".json", json, false);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,isFinished?"FINISHED":null);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,null);
logger.info("全量下发响应信息:"+result.toString());
}
@@ -1548,7 +1548,7 @@ public class ConfigSynchronizationService extends BaseService{
//调用服务接口配置全量更新
isFinished = ((!hasData)&&lastServiceTag)?true:false;
FileUtils.writeToFile("/home/ceiec/configSync/"+DateUtils.getDate("yyyy-MM-dd")+"/"+entity.getServiceId()+"_"+page.getPageNo()+"_"+System.currentTimeMillis()+".json", json, false);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,isFinished?"FINISHED":null);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,null);
logger.info("全量下发响应信息:"+result.toString());
}
@@ -1703,7 +1703,7 @@ public class ConfigSynchronizationService extends BaseService{
//调用服务接口配置全量更新
isFinished = ((!hasData)&&lastServiceTag)?true:false;
FileUtils.writeToFile("/home/ceiec/configSync/"+DateUtils.getDate("yyyy-MM-dd")+"/"+entity.getServiceId()+"_"+compileId+"_"+page.getPageNo()+"_"+System.currentTimeMillis()+".json", json, false);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,isFinished?"FINISHED":null);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,null);
logger.info("全量下发响应信息:"+result.toString());
}
@@ -1859,7 +1859,7 @@ public class ConfigSynchronizationService extends BaseService{
//调用服务接口配置全量更新
isFinished = ((!hasData)&&lastServiceTag)?true:false;
FileUtils.writeToFile("/home/ceiec/configSync/"+DateUtils.getDate("yyyy-MM-dd")+"/"+entity.getServiceId()+"_"+compileId+"_"+page.getPageNo()+"_"+System.currentTimeMillis()+".json", json, false);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,isFinished?"FINISHED":null);
JSONObject result = ConfigServiceUtil.configSync(json,1,entity.getServiceId(),null,null);
logger.info("全量下发响应信息:"+result.toString());
}
@@ -2021,7 +2021,7 @@ public class ConfigSynchronizationService extends BaseService{
FileUtils.writeToFile("/home/ceiec/configSync/"+DateUtils.getDate("yyyy-MM-dd")+"/"+entity.getServiceId()+"_"+page.getPageNo()+"_"+System.currentTimeMillis()+".json", json, false);
//调用服务接口同步回调类配置
isFinished = ((!hasData)&&lastServiceTag)?true:false;
JSONObject result = ConfigServiceUtil.configSync(json,2,entity.getServiceId(),entity.getTableName(),isFinished?"FINISHED":null);
JSONObject result = ConfigServiceUtil.configSync(json,2,entity.getServiceId(),entity.getTableName(),null);
logger.info("全量下发响应信息:"+result.toString());
}
}

View File

@@ -23,7 +23,7 @@
<property name="group" value="cfg" />
</bean>
<!-- 定时检查 schedule_cfg 表 -->
<!-- 定时检查 schedule_cfg(正常任务)-->
<bean id="scheduleCfgSimpleTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean">
<property name="jobDetail">
<ref bean="scheduleCfgJobDetail" />
@@ -35,6 +35,24 @@
</bean>
<!-- 配置加载 任务执行类 -->
<bean id="scheduleSyncCfgJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<property name="jobClass" value="com.nis.quartz.ScheduleSyncCfgJob"/>
<property name="name" value="scheduleSyncCfgJobDetail" />
<property name="group" value="cfg" />
</bean>
<!-- 定时检查 schedule_cfg(全量同步未执行任务) 表 -->
<bean id="scheduleSyncCfgSimpleTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean">
<property name="jobDetail">
<ref bean="scheduleSyncCfgJobDetail" />
</property>
<property name="repeatInterval" value="60000" /><!-- 间隔 60s 执行一次 -->
<property name="startDelay" value="60000" /><!-- 延迟 60s 启动 -->
<property name="name" value="scheduleSyncCfgTri" />
<property name="group" value="syncCfg" />
</bean>
<!-- 定时任务 -->
<bean id="NTCScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean" >
<property name="dataSource" ref="ProductDataSource" /><!-- 配置数据源 -->
@@ -45,6 +63,7 @@
<property name="triggers">
<list>
<ref bean="scheduleCfgSimpleTrigger" />
<ref bean="scheduleSyncCfgSimpleTrigger" />
</list>
</property>
</bean>

View File

@@ -0,0 +1,133 @@
CREATE DEFINER=`root`@`%` PROCEDURE `proc_statistics_config`()
BEGIN
DECLARE ntime VARCHAR(40);/*当前时间*/
DECLARE otime VARCHAR(40);/*上次统计时间*/
DECLARE nRow VARCHAR(40);/*本次统计条数*/
DECLARE tabName VARCHAR(500);
DECLARE description VARCHAR(500);
DECLARE deleteSql VARCHAR(500);
DECLARE done INT;/*游标标识*/
DECLARE flag INT;/*循环标识*/
DECLARE t_error INT;/*错误标识*/
DECLARE proc_log_table VARCHAR(100);/*存储过程日志表*/
DECLARE proc_name VARCHAR(100);/*存储过程名称*/
DECLARE icursor CURSOR FOR SELECT tab_name FROM statistics_tables where is_valid=1;
DECLARE CONTINUE HANDLER FOR NOT found SET done=1;
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION set t_error=1;
select max(statistic_time) into otime from cfg_num_statistics;
SET done=0;
SET t_error=0;
SET proc_log_table='proc_exec_log';
SET proc_name='proc_statistics_config';
SET ntime=DATE_FORMAT(SYSDATE(),'%Y-%m-%d %H:%i:%S');
OPEN icursor;
loop_iloop:LOOP
FETCH icursor INTO tabName;
SET description=tabName;
set @descriptionStart=concat(description,'表统计start');
/*统计当前配置表数据到统计表中start*/
set @v_log_sql1 := concat('insert into ',proc_log_table,'(proc_name,table_name,log_time,description) values(?,?,?,?)');
PREPARE execs FROM @v_log_sql1;
EXECUTE execs using proc_name,proc_log_table,ntime,@descriptionStart;
DEALLOCATE PREPARE execs;
COMMIT;
#没有定时任务前的统计is_audit=1 && is_valid=1为approved
#set @insert_statistics_sql := concat('insert into cfg_num_statistics(statistic_time,audit_time,function_id,service_id,action,compile_id,cfg_state) select ','''',ntime,'''',',','audit_time,function_id,service_id,action,compile_id,if(is_audit=3,3,if(is_audit=2,2,if(is_audit=1,1,if(is_valid=0,0,if(is_valid,-1,-1))))) cfg_state from ',tabName);
#20190328 增加定时任务后is_audit=1 && (is_valid=1 || is_valid=0) 即为approved 为与界面显示保持一致更改统计条件此时的结果中approved并不代表所有有效的配置总数
set @insert_statistics_sql := concat('insert into cfg_num_statistics(statistic_time,function_id,service_id,action,compile_id,cfg_state) select ','''',ntime,'''',',','function_id,service_id,action,compile_id,if(is_valid=-1,-1,if(is_valid=1,1,if(is_valid=0 && is_audit=0,0,if(is_valid=0 && is_audit=1,1,if(is_valid=0 && is_audit=2,2,if(is_valid=0 && is_audit=3,3,0)))))) cfg_state from ',tabName);
PREPARE execs FROM @insert_statistics_sql;
EXECUTE execs;
DEALLOCATE PREPARE execs;
COMMIT;
set @descriptionEnd=concat(description,'表统计end');
set @v_log_sql2 := concat('insert into ',proc_log_table,'(proc_name,table_name,log_time,description) values(?,?,?,?)');
PREPARE execs FROM @v_log_sql2;
EXECUTE execs using proc_name,proc_log_table,ntime,@descriptionEnd;
DEALLOCATE PREPARE execs;
COMMIT;
/*异常退出loop*/
IF t_error=1 THEN
LEAVE loop_iloop;
END IF;
/*循环结束退出loop*/
IF done=1 THEN
LEAVE loop_iloop;
ELSE
SET flag=0;
END IF;
IF flag=0 THEN
SET done=0;
END IF;
END LOOP loop_iloop;
CLOSE icursor;
/*取出本次统计条数*/
SELECT count(statistic_time) INTO nRow from cfg_num_statistics where statistic_time=ntime;
IF t_error=1 THEN /*如果异常清楚本次数据*/
delete from cfg_num_statistics where statistic_time=ntime;
COMMIT;
ELSEIF nRow > 0 THEN /*判断本次统计是否有数据录入,如果有则删除上次统计数据,如果没有则不清除上次统计数据*/
delete from cfg_num_statistics where statistic_time=otime;
COMMIT;
END IF;
COMMIT;
END

View File

@@ -0,0 +1 @@
ALTER TABLE schedule_cfg ADD type int(1) DEFAULT 1 COMMENT '1:定时任务2全量同步时未执行的任务';