修改周期任务相关数据缺失问题
This commit is contained in:
@@ -188,7 +188,27 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 修改周期任务的执行状态
|
// 修改周期任务的执行状态
|
||||||
Record result = Db.use().findFirst("select lmst.mission_id missionId, t.ok, t.fail, t.total\r\n" +
|
Record result = null;
|
||||||
|
// 周期任务时进行统计
|
||||||
|
if(loopmissionStateTableInfo.getInt("mission_state").equals(7)) {
|
||||||
|
result=Db.use().findFirst("select lmst.mission_id missionId, t.ok, t.fail, t.total\r\n" +
|
||||||
|
"from (select mrt.mission_id, \r\n" +
|
||||||
|
"ifnull(sum(case mrt.result when 7 then 1 else 0 end), 0) ok, \r\n" +
|
||||||
|
"ifnull(sum(case mrt.result when 4 then 1 else 0 end), 0) fail, \r\n" +
|
||||||
|
"count(mrt.seq_id) total \r\n" +
|
||||||
|
"from mission_result_table4 mrt \r\n" +
|
||||||
|
"group by mrt.mission_id) t \r\n" +
|
||||||
|
"left join (select max(lst.cur_mission_id) cur_mission_id, lst.mission_id \r\n" +
|
||||||
|
"from mission_result_table4 mrt4 \r\n" +
|
||||||
|
"left join loopmission_state_table lst on mrt4.mission_id = lst.cur_mission_id \r\n" +
|
||||||
|
"group by lst.mission_id) lmst \r\n" +
|
||||||
|
"on lmst.cur_mission_id = t.mission_id \r\n" +
|
||||||
|
"left join mission_state_table mst \r\n" +
|
||||||
|
"on mst.mission_id = lmst.mission_id \r\n" +
|
||||||
|
"where lmst.cur_mission_id is not null and mst.is_loop=1 \r\n" +
|
||||||
|
"and mst.mission_id = ? ", loopmissionStateTableInfo.getLong("mission_id"));
|
||||||
|
}else {
|
||||||
|
result=Db.use().findFirst("select lmst.mission_id missionId, t.ok, t.fail, t.total\r\n" +
|
||||||
"from (select mrt.mission_id, \r\n" +
|
"from (select mrt.mission_id, \r\n" +
|
||||||
"ifnull(sum(case mrt.result when 0 then 1 else 0 end), 0) ok, \r\n" +
|
"ifnull(sum(case mrt.result when 0 then 1 else 0 end), 0) ok, \r\n" +
|
||||||
"ifnull(sum(case mrt.result when 1 then 1 when -1 then 1 else 0 end), 0) fail, \r\n" +
|
"ifnull(sum(case mrt.result when 1 then 1 when -1 then 1 else 0 end), 0) fail, \r\n" +
|
||||||
@@ -204,6 +224,7 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{
|
|||||||
"on mst.mission_id = lmst.mission_id \r\n" +
|
"on mst.mission_id = lmst.mission_id \r\n" +
|
||||||
"where lmst.cur_mission_id is not null and mst.is_loop=1 \r\n" +
|
"where lmst.cur_mission_id is not null and mst.is_loop=1 \r\n" +
|
||||||
"and mst.mission_id = ? ", loopmissionStateTableInfo.getLong("mission_id"));
|
"and mst.mission_id = ? ", loopmissionStateTableInfo.getLong("mission_id"));
|
||||||
|
}
|
||||||
if(null!=result) {
|
if(null!=result) {
|
||||||
if(null!=result.get("missionId")&&result.getStr("missionId").length()>0) {
|
if(null!=result.get("missionId")&&result.getStr("missionId").length()>0) {
|
||||||
Long ok = result.getLong("ok");
|
Long ok = result.getLong("ok");
|
||||||
@@ -244,6 +265,18 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
}else {
|
||||||
|
missionState = loopmissionStateTableInfo.getInt("mission_state");
|
||||||
|
Integer missionTableInfoState = missionTableInfo.getInt("mission_state");
|
||||||
|
switch(missionState) {
|
||||||
|
case(3):
|
||||||
|
Record findFirst = Db.use().findFirst("select COUNT(*) count from loopmission_state_table where mission_id = ? and mission_state=0",loopmissionStateTableInfo.getLong("mission_id"));
|
||||||
|
if(findFirst.getInt("count").equals(0)&&missionTableInfoState!=3) {
|
||||||
|
status=3;
|
||||||
|
autoDesc=format.format(System.currentTimeMillis())+"i18n_server.UpgradeService.sql.revoke_n81i";
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Record missionStateTableResult =new Record();
|
Record missionStateTableResult =new Record();
|
||||||
|
|||||||
@@ -132,45 +132,6 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}else if(record.getStr("table_name").equals("loopmission_state_table")){
|
|
||||||
Db.use(url.toString()).tx(new IAtom() {
|
|
||||||
@Override
|
|
||||||
public boolean run() throws SQLException {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return Db.use("masterDataSource").tx(new IAtom() {
|
|
||||||
@Override
|
|
||||||
public boolean run() throws SQLException {
|
|
||||||
Set<Long> set = SyncData.getThreadlocalLoopInsertMissionIds();
|
|
||||||
final List<Record> insertDatas=new ArrayList<Record>();
|
|
||||||
final List<Record> updateDatas=new ArrayList<Record>();
|
|
||||||
for(Record entity:data) {
|
|
||||||
// 循环遍历数据 判断当前数据是新增还是修改
|
|
||||||
Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where "+record.getStr("id_name")+"= ? ",entity.get(record.getStr("id_name")));
|
|
||||||
if(null!=findFirst) {
|
|
||||||
updateDatas.add(entity);
|
|
||||||
}else {
|
|
||||||
set.add(entity.getLong(record.getStr("id_name")));
|
|
||||||
insertDatas.add(entity);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
SyncData.setThreadlocalLoopInsertMissionIds(set);
|
|
||||||
if(insertDatas.size()>0) {
|
|
||||||
Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
|
|
||||||
}
|
|
||||||
if(updateDatas.size()>0) {
|
|
||||||
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
|
|
||||||
updateDatas, record.getInt("batch_size"));
|
|
||||||
}
|
|
||||||
logger.info("分库同步 loopmission_state_table 增量更新数据完成 表名为"+record.getStr("table_name"));
|
|
||||||
logger.info("分库同步 loopmission_state_table 最后数据的id信息为"+JSON.toJSONString(lastInsertId));
|
|
||||||
record.set("last_id", lastInsertId);
|
|
||||||
record.set("last_date", new Date());
|
|
||||||
Db.use(url.toString()).update("table_sync_info", record);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}else {
|
}else {
|
||||||
Db.use(url.toString()).tx(new IAtom() {
|
Db.use(url.toString()).tx(new IAtom() {
|
||||||
@Override
|
@Override
|
||||||
@@ -227,17 +188,18 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
.find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
|
.find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
|
||||||
+ record.getStr("id_name") + " in (" + insertStr + ") ",
|
+ record.getStr("id_name") + " in (" + insertStr + ") ",
|
||||||
insertIds.toArray());
|
insertIds.toArray());
|
||||||
|
if(!record.getStr("table_name").equals("loopmission_state_table")) {
|
||||||
for(Record insertData:insertDatas){
|
for(Record insertData:insertDatas){
|
||||||
Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
|
|
||||||
if(record.getStr("table_name").equals("event_record_library")) {
|
if(record.getStr("table_name").equals("event_record_library")) {
|
||||||
|
Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
|
||||||
//设置数据状态为同步数据
|
//设置数据状态为同步数据
|
||||||
insertData.set("sync_status",1);
|
insertData.set("sync_status",1);
|
||||||
//设置同步数据所在数据库的id以及所在原来表的id 用于修改和删除 -1为中心或主库
|
//设置同步数据所在数据库的id以及所在原来表的id 用于修改和删除 -1为中心或主库
|
||||||
insertData.set("old_id",insertData.getLong(record.getStr("id_name")));
|
insertData.set("old_id",insertData.getLong(record.getStr("id_name")));
|
||||||
insertData.set("db_id", syncDbInfo.getLong("id"));
|
insertData.set("db_id", syncDbInfo.getLong("id"));
|
||||||
}
|
|
||||||
insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
|
insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
|
Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
|
||||||
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
|
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
|
||||||
Object lastInsertId = data.get(data.size() - 1).get("id");
|
Object lastInsertId = data.get(data.size() - 1).get("id");
|
||||||
@@ -246,6 +208,36 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
record.set("last_date", new Date());
|
record.set("last_date", new Date());
|
||||||
Db.use(url.toString()).update("table_sync_info", record);
|
Db.use(url.toString()).update("table_sync_info", record);
|
||||||
return true;
|
return true;
|
||||||
|
}else{
|
||||||
|
Object lastInsertId = data.get(data.size() - 1).get("id");
|
||||||
|
Set<Long> set = SyncData.getThreadlocalLoopInsertMissionIds();
|
||||||
|
final List<Record> insertDatas2=new ArrayList<Record>();
|
||||||
|
final List<Record> updateDatas=new ArrayList<Record>();
|
||||||
|
for(Record entity:insertDatas) {
|
||||||
|
// 循环遍历数据 判断当前数据是新增还是修改
|
||||||
|
Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where "+record.getStr("id_name")+"= ? ",entity.get(record.getStr("id_name")));
|
||||||
|
if(null!=findFirst) {
|
||||||
|
updateDatas.add(entity);
|
||||||
|
}else {
|
||||||
|
set.add(entity.getLong(record.getStr("id_name")));
|
||||||
|
insertDatas2.add(entity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SyncData.setThreadlocalLoopInsertMissionIds(set);
|
||||||
|
if(insertDatas2.size()>0) {
|
||||||
|
Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
|
||||||
|
}
|
||||||
|
if(updateDatas.size()>0) {
|
||||||
|
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
|
||||||
|
updateDatas, record.getInt("batch_size"));
|
||||||
|
}
|
||||||
|
logger.info("分库同步 loopmission_state_table 增量更新数据完成 表名为"+record.getStr("table_name"));
|
||||||
|
logger.info("分库同步 loopmission_state_table 最后数据的id信息为"+JSON.toJSONString(lastInsertId));
|
||||||
|
record.set("last_id", lastInsertId);
|
||||||
|
record.set("last_date", new Date());
|
||||||
|
Db.use(url.toString()).update("table_sync_info", record);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user