From 058a1c090b3190441f9b4822d53df51d804d1516 Mon Sep 17 00:00:00 2001 From: default Date: Mon, 12 Nov 2018 11:00:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=91=A8=E6=9C=9F=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=9B=B8=E5=85=B3=E6=95=B0=E6=8D=AE=E7=BC=BA=E5=A4=B1?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...ncMissionResultStatisticalInterceptor.java | 35 +++++- .../nms/thread/SyncSlaveToMasterThread.java | 104 ++++++++---------- 2 files changed, 82 insertions(+), 57 deletions(-) diff --git a/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java b/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java index a6d74ff..ae6043f 100644 --- a/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java +++ b/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java @@ -188,7 +188,27 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{ } // 修改周期任务的执行状态 - Record result = Db.use().findFirst("select lmst.mission_id missionId, t.ok, t.fail, t.total\r\n" + + Record result = null; + // 周期任务时进行统计 + if(loopmissionStateTableInfo.getInt("mission_state").equals(7)) { + result=Db.use().findFirst("select lmst.mission_id missionId, t.ok, t.fail, t.total\r\n" + + "from (select mrt.mission_id, \r\n" + + "ifnull(sum(case mrt.result when 7 then 1 else 0 end), 0) ok, \r\n" + + "ifnull(sum(case mrt.result when 4 then 1 else 0 end), 0) fail, \r\n" + + "count(mrt.seq_id) total \r\n" + + "from mission_result_table4 mrt \r\n" + + "group by mrt.mission_id) t \r\n" + + "left join (select max(lst.cur_mission_id) cur_mission_id, lst.mission_id \r\n" + + "from mission_result_table4 mrt4 \r\n" + + "left join loopmission_state_table lst on mrt4.mission_id = lst.cur_mission_id \r\n" + + "group by lst.mission_id) lmst \r\n" + + "on lmst.cur_mission_id = t.mission_id \r\n" + + "left join mission_state_table mst \r\n" + + "on mst.mission_id = lmst.mission_id \r\n" + + "where lmst.cur_mission_id is not null and mst.is_loop=1 \r\n" + + "and mst.mission_id = ? ", loopmissionStateTableInfo.getLong("mission_id")); + }else { + result=Db.use().findFirst("select lmst.mission_id missionId, t.ok, t.fail, t.total\r\n" + "from (select mrt.mission_id, \r\n" + "ifnull(sum(case mrt.result when 0 then 1 else 0 end), 0) ok, \r\n" + "ifnull(sum(case mrt.result when 1 then 1 when -1 then 1 else 0 end), 0) fail, \r\n" + @@ -204,6 +224,7 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{ "on mst.mission_id = lmst.mission_id \r\n" + "where lmst.cur_mission_id is not null and mst.is_loop=1 \r\n" + "and mst.mission_id = ? ", loopmissionStateTableInfo.getLong("mission_id")); + } if(null!=result) { if(null!=result.get("missionId")&&result.getStr("missionId").length()>0) { Long ok = result.getLong("ok"); @@ -244,6 +265,18 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{ } break; } + }else { + missionState = loopmissionStateTableInfo.getInt("mission_state"); + Integer missionTableInfoState = missionTableInfo.getInt("mission_state"); + switch(missionState) { + case(3): + Record findFirst = Db.use().findFirst("select COUNT(*) count from loopmission_state_table where mission_id = ? and mission_state=0",loopmissionStateTableInfo.getLong("mission_id")); + if(findFirst.getInt("count").equals(0)&&missionTableInfoState!=3) { + status=3; + autoDesc=format.format(System.currentTimeMillis())+"i18n_server.UpgradeService.sql.revoke_n81i"; + } + break; + } } Record missionStateTableResult =new Record(); diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java index 93f31d1..b655ccc 100644 --- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java +++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java @@ -132,45 +132,6 @@ public class SyncSlaveToMasterThread implements Runnable{ }); } }); - }else if(record.getStr("table_name").equals("loopmission_state_table")){ - Db.use(url.toString()).tx(new IAtom() { - @Override - public boolean run() throws SQLException { - // TODO Auto-generated method stub - return Db.use("masterDataSource").tx(new IAtom() { - @Override - public boolean run() throws SQLException { - Set set = SyncData.getThreadlocalLoopInsertMissionIds(); - final List insertDatas=new ArrayList(); - final List updateDatas=new ArrayList(); - for(Record entity:data) { - // 循环遍历数据 判断当前数据是新增还是修改 - Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where "+record.getStr("id_name")+"= ? ",entity.get(record.getStr("id_name"))); - if(null!=findFirst) { - updateDatas.add(entity); - }else { - set.add(entity.getLong(record.getStr("id_name"))); - insertDatas.add(entity); - } - } - SyncData.setThreadlocalLoopInsertMissionIds(set); - if(insertDatas.size()>0) { - Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); - } - if(updateDatas.size()>0) { - Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"), - updateDatas, record.getInt("batch_size")); - } - logger.info("分库同步 loopmission_state_table 增量更新数据完成 表名为"+record.getStr("table_name")); - logger.info("分库同步 loopmission_state_table 最后数据的id信息为"+JSON.toJSONString(lastInsertId)); - record.set("last_id", lastInsertId); - record.set("last_date", new Date()); - Db.use(url.toString()).update("table_sync_info", record); - return true; - } - }); - } - }); }else { Db.use(url.toString()).tx(new IAtom() { @Override @@ -227,25 +188,56 @@ public class SyncSlaveToMasterThread implements Runnable{ .find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " in (" + insertStr + ") ", insertIds.toArray()); - for(Record insertData:insertDatas){ - Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); - if(record.getStr("table_name").equals("event_record_library")) { - //设置数据状态为同步数据 - insertData.set("sync_status",1); - //设置同步数据所在数据库的id以及所在原来表的id 用于修改和删除 -1为中心或主库 - insertData.set("old_id",insertData.getLong(record.getStr("id_name"))); - insertData.set("db_id", syncDbInfo.getLong("id")); + if(!record.getStr("table_name").equals("loopmission_state_table")) { + for(Record insertData:insertDatas){ + if(record.getStr("table_name").equals("event_record_library")) { + Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); + //设置数据状态为同步数据 + insertData.set("sync_status",1); + //设置同步数据所在数据库的id以及所在原来表的id 用于修改和删除 -1为中心或主库 + insertData.set("old_id",insertData.getLong(record.getStr("id_name"))); + insertData.set("db_id", syncDbInfo.getLong("id")); + insertData.set(record.getStr("id_name"), seqData.getLong("seqId")); + } } - insertData.set(record.getStr("id_name"), seqData.getLong("seqId")); + Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); + // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 + Object lastInsertId = data.get(data.size() - 1).get("id"); + logger.info("分库增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); + record.set("last_id", lastInsertId); + record.set("last_date", new Date()); + Db.use(url.toString()).update("table_sync_info", record); + return true; + }else{ + Object lastInsertId = data.get(data.size() - 1).get("id"); + Set set = SyncData.getThreadlocalLoopInsertMissionIds(); + final List insertDatas2=new ArrayList(); + final List updateDatas=new ArrayList(); + for(Record entity:insertDatas) { + // 循环遍历数据 判断当前数据是新增还是修改 + Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where "+record.getStr("id_name")+"= ? ",entity.get(record.getStr("id_name"))); + if(null!=findFirst) { + updateDatas.add(entity); + }else { + set.add(entity.getLong(record.getStr("id_name"))); + insertDatas2.add(entity); + } + } + SyncData.setThreadlocalLoopInsertMissionIds(set); + if(insertDatas2.size()>0) { + Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); + } + if(updateDatas.size()>0) { + Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"), + updateDatas, record.getInt("batch_size")); + } + logger.info("分库同步 loopmission_state_table 增量更新数据完成 表名为"+record.getStr("table_name")); + logger.info("分库同步 loopmission_state_table 最后数据的id信息为"+JSON.toJSONString(lastInsertId)); + record.set("last_id", lastInsertId); + record.set("last_date", new Date()); + Db.use(url.toString()).update("table_sync_info", record); + return true; } - Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); - // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 - Object lastInsertId = data.get(data.size() - 1).get("id"); - logger.info("分库增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); - record.set("last_id", lastInsertId); - record.set("last_date", new Date()); - Db.use(url.toString()).update("table_sync_info", record); - return true; } }); }