From 83ebcbea0f2b28b17e7a68b9673e250590b13529 Mon Sep 17 00:00:00 2001 From: default Date: Mon, 5 Nov 2018 16:18:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=91=A8=E6=9C=9F=E4=BB=BB=E5=8A=A1=E7=BB=9F?= =?UTF-8?q?=E8=AE=A1=E5=8A=9F=E8=83=BD=E4=BB=A5=E5=8F=8Abug=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...ncMissionResultStatisticalInterceptor.java | 113 +++++++++++++++++- nms_sync/src/com/nms/main/SyncData.java | 38 ++++++ .../nms/thread/SyncSlaveToMasterThread.java | 53 +++++++- 3 files changed, 201 insertions(+), 3 deletions(-) diff --git a/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java b/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java index d4eabc2..895525c 100644 --- a/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java +++ b/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java @@ -7,6 +7,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; + import org.apache.log4j.Logger; import com.alibaba.fastjson.JSON; import com.jfinal.aop.Interceptor; @@ -185,7 +186,7 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{ * 处理非周期任务的统计功能 */ Set insertMissionIds = SyncData.getThreadlocalInsertMissionIds(); - logger.info("即将进行统计的所有修改任务的mission_id为:"+JSON.toJSONString(insertMissionIds)); + logger.info("即将进行统计的所有新增任务的mission_id为:"+JSON.toJSONString(insertMissionIds)); Set updateMissionIds = SyncData.getThreadlocalUpdateMissionIds(); logger.info("即将进行统计的所有修改任务的mission_id为:"+JSON.toJSONString(updateMissionIds)); @@ -208,11 +209,29 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{ /** * 处理周期任务的统计功能 */ + Set loopInsertCurMissionIds = SyncData.getThreadlocalLoopInsertMissionIds(); + logger.info("即将进行统计的所有周期新增任务的mission_id为:"+JSON.toJSONString(insertMissionIds)); + Set loopUpdateCurMissionIds = SyncData.getThreadlocalLoopUpdateMissionIds(); + logger.info("即将进行统计的所有周期修改任务的mission_id为:"+JSON.toJSONString(updateMissionIds)); + + if(loopInsertCurMissionIds.size()>0) { + for (Long missionId : loopInsertCurMissionIds) { + StatisticalLoopHandle(missionId,true); + } + } + + if(loopUpdateCurMissionIds.size()>0) { + for (Long missionId : loopUpdateCurMissionIds) { + StatisticalLoopHandle(missionId,false); + } + } // 统计结束后 清空threadLocal中的值 SyncData.removeThreadlocalUpdateMissionIds(); SyncData.removeThreadlocalInsertMissionIds(); + SyncData.removeThreadlocalLoopUpdateMissionIds(); + SyncData.removeThreadlocalLoopInsertMissionIds(); logger.info("--------SyncMissionResultStatisticalInterceptor拦截器拦截结束------------"); }catch(Exception e){ e.printStackTrace(); @@ -303,7 +322,97 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{ /** * 处理周期任务的统计功能 */ - public void StatisticalLoopHandle(Long missionId) { + public void StatisticalLoopHandle(Long curMissionId,boolean isInsert) { + Record loopmissionStateTableInfo = Db.use().findFirst("select * from loopmission_state_table where cur_mission_id = ? ",curMissionId); + String missionDesc=""; + Integer status=null; + String autoDesc=""; + Record missionTableInfo =null; + Integer missionState = null; + // 判断任务状态为1 根据同步结果数据 修改为状态2 正在执行 + if(null!=loopmissionStateTableInfo) { + missionTableInfo = Db.use().findFirst("select * from mission_state_table where mission_id = ? ",loopmissionStateTableInfo.getLong("mission_id")); + if(null!=missionTableInfo&&missionTableInfo.getInt("mission_state").equals(1)&&isInsert) { + missionTableInfo.set("mission_state",2); + if(null==missionTableInfo.getStr("auto_desc")) { + missionTableInfo.set("auto_desc", format.format(System.currentTimeMillis())+"i18n_server.LoadNewMissionThread.missionStart_n81i"); + } + Db.update("mission_state_table", missionTableInfo); + } + } + // 修改周期任务的执行状态 + if(null!=loopmissionStateTableInfo) { + Record result = Db.use().findFirst("select lmst.mission_id missionId, t.ok, t.fail, t.total\r\n" + + "from (select mrt.mission_id, \r\n" + + "ifnull(sum(case mrt.result when 0 then 1 else 0 end), 0) ok, \r\n" + + "ifnull(sum(case mrt.result when 1 then 1 when -1 then 1 else 0 end), 0) fail, \r\n" + + "count(mrt.seq_id) total \r\n" + + "from mission_result_table4 mrt \r\n" + + "group by mrt.mission_id) t \r\n" + + "left join (select max(lst.cur_mission_id) cur_mission_id, lst.mission_id \r\n" + + "from mission_result_table4 mrt4 \r\n" + + "left join loopmission_state_table lst on mrt4.mission_id = lst.cur_mission_id \r\n" + + "group by lst.mission_id) lmst \r\n" + + "on lmst.cur_mission_id = t.mission_id \r\n" + + "left join mission_state_table mst \r\n" + + "on mst.mission_id = lmst.mission_id \r\n" + + "where lmst.cur_mission_id is not null and mst.is_loop=1 \r\n" + + "and mst.mission_id = ? ", loopmissionStateTableInfo.getLong("mission_id")); + if(null!=result) { + if(null!=result.get("missionId")&&result.getStr("missionId").length()>0) { + Long ok = result.getLong("ok"); + Long fail =result.getLong("fail"); + Long total = result.getLong("total"); + missionDesc="i18n_server.UpgradeService.sql.cycle_n81i "+total+" i18n_server.UpgradeService.sql.executeNode2_n81i,
"+(total-ok-fail)+" i18n_server.UpgradeService.sql.unexecute_n81i,
"+(ok+fail)+" i18n_server.UpgradeService.sql.execute_n81i【i18n_server.UpgradeService.sql.success_n81i "+ok+" i18n_sserver.UpgradeService.sql.failed_n81i "+fail+" 】"; + } + + } + } + if(null!=loopmissionStateTableInfo&&!isInsert) { + missionState = loopmissionStateTableInfo.getInt("mission_state"); + Integer missionTableInfoState = missionTableInfo.getInt("mission_state"); + switch(missionState) { + case(3): + Record findFirst = Db.use().findFirst("select COUNT(*) count from loopmission_state_table where mission_id = ? and mission_state=0",loopmissionStateTableInfo.getLong("mission_id")); + if(findFirst.getInt("count").equals(0)&&missionTableInfoState!=3) { + status=3; + autoDesc=format.format(System.currentTimeMillis())+"i18n_server.UpgradeService.sql.revoke_n81i"; + } + break; + case(4): + if(missionTableInfoState!=4) { + status=4; + autoDesc=format.format(System.currentTimeMillis())+" Task execution failure "; + } + break; + case(6): + if(missionTableInfoState!=6) { + status=6; + autoDesc=format.format(System.currentTimeMillis())+"Task revocation start execute"; + } + break; + case(7): + if(missionTableInfoState!=7) { + status=7; + autoDesc=format.format(System.currentTimeMillis())+"i18n_server.UpgradeService.sql.revoke_n81i"; + } + break; + } + } + if(null!=loopmissionStateTableInfo) { + Record missionStateTableResult =new Record(); + missionStateTableResult.set("mission_id", loopmissionStateTableInfo.get("mission_id")); + if(status!=null) { + missionStateTableResult.set("mission_state", status); + } + if(missionDesc.length()>0&&null!=missionState&&missionState!=6&&missionState!=7) { + missionStateTableResult.set("mission_state_desc",missionDesc); + } + if(autoDesc.length()>0) { + missionStateTableResult.set("auto_desc",autoDesc); + } + Db.use("masterDataSource").update("mission_state_table","mission_id",missionStateTableResult); + } } } diff --git a/nms_sync/src/com/nms/main/SyncData.java b/nms_sync/src/com/nms/main/SyncData.java index 37a1a03..70ec188 100644 --- a/nms_sync/src/com/nms/main/SyncData.java +++ b/nms_sync/src/com/nms/main/SyncData.java @@ -26,8 +26,14 @@ import com.nms.thread.SyncThread; * */ public class SyncData{ + // 线程变量记录任务结果修改的任务id private static final ThreadLocal> threadLocalUpdateMissionIds = new ThreadLocal>(); + // 线程变量记录任务结果新增的任务id private static final ThreadLocal> threadLocalInsertMissionIds = new ThreadLocal>(); + // 线程变量记录周期任务结果修改的任务id + private static final ThreadLocal> threadLocalLoopUpdateMissionIds = new ThreadLocal>(); + // 线程变量记录周期任务结果新增的任务id + private static final ThreadLocal> threadLocalLoopInsertMissionIds = new ThreadLocal>(); public static void main(String[] args) { @@ -100,4 +106,36 @@ public class SyncData{ public static void removeThreadlocalInsertMissionIds() { threadLocalInsertMissionIds.remove(); } + + + + public static Set getThreadlocalLoopUpdateMissionIds() { + Set set = threadLocalLoopUpdateMissionIds.get(); + if(null==set) { + set=new HashSet(); + } + return set; + } + public static void setThreadlocalLoopUpdateMissionIds(Set set) { + threadLocalLoopUpdateMissionIds.set(set); + } + + public static void removeThreadlocalLoopUpdateMissionIds() { + threadLocalLoopUpdateMissionIds.remove(); + } + + public static Set getThreadlocalLoopInsertMissionIds() { + Set set = threadLocalLoopInsertMissionIds.get(); + if(null==set) { + set=new HashSet(); + } + return set; + } + public static void setThreadlocalLoopInsertMissionIds(Set set) { + threadLocalLoopInsertMissionIds.set(set); + } + + public static void removeThreadlocalLoopInsertMissionIds() { + threadLocalLoopInsertMissionIds.remove(); + } } diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java index edd1782..907f4b3 100644 --- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java +++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java @@ -76,7 +76,7 @@ public class SyncSlaveToMasterThread implements Runnable{ entity.remove(record.getStr("id_name")); } SyncData.setThreadlocalInsertMissionIds(set); - }else { + }else if(record.getInt("mode").equals(2)){ for(Record entity:data) { entity.remove(record.getStr("id_name")); } @@ -121,6 +121,45 @@ public class SyncSlaveToMasterThread implements Runnable{ }); } }); + }else if(record.getStr("table_name").equals("loopmission_state_table")){ + Db.use(url.toString()).tx(new IAtom() { + @Override + public boolean run() throws SQLException { + // TODO Auto-generated method stub + return Db.use("masterDataSource").tx(new IAtom() { + @Override + public boolean run() throws SQLException { + Set set = SyncData.getThreadlocalLoopInsertMissionIds(); + final List insertDatas=new ArrayList(); + final List updateDatas=new ArrayList(); + for(Record entity:data) { + // 循环遍历数据 判断当前数据是新增还是修改 + Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where "+record.getStr("id_name")+"= ? ",entity.get(record.getStr("id_name"))); + if(null!=findFirst) { + updateDatas.add(entity); + }else { + set.add(entity.getLong(record.getStr("id_name"))); + insertDatas.add(entity); + } + } + SyncData.setThreadlocalLoopInsertMissionIds(set); + if(insertDatas.size()>0) { + Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); + } + if(updateDatas.size()>0) { + Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"), + updateDatas, record.getInt("batch_size")); + } + logger.info("分库同步 loopmission_state_table 增量更新数据完成 表名为"+record.getStr("table_name")); + logger.info("分库同步 loopmission_state_table 最后数据的id信息为"+JSON.toJSONString(lastInsertId)); + record.set("last_id", lastInsertId); + record.set("last_date", new Date()); + Db.use(url.toString()).update("table_sync_info", record); + return true; + } + }); + } + }); }else { Db.use(url.toString()).tx(new IAtom() { @Override @@ -268,6 +307,13 @@ public class SyncSlaveToMasterThread implements Runnable{ updateData.set(record.getStr("id_name"),findFirst.getLong(record.getStr("id_name"))); } } + if(record.getStr("table_name").equals("loopmission_state_table")) { + Set missionIds = SyncData.getThreadlocalLoopUpdateMissionIds(); + for(Record updateData:updateDatas) { + missionIds.add(updateData.getLong(record.getStr("id_name"))); + } + SyncData.setThreadlocalLoopUpdateMissionIds(missionIds); + } Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"), updateDatas, record.getInt("batch_size")); } @@ -313,6 +359,11 @@ public class SyncSlaveToMasterThread implements Runnable{ logger.error("分库同步主库数据当前操作的异常表名为:"+errorTableName); logger.error("分库数据同步主库发生错误 异常信息为:"+e.getMessage()); logger.error("分库数据同步主库发生错误 异常信息",e); + // 如果出现异常信息 清楚线程变量 不用进行统计 + SyncData.removeThreadlocalUpdateMissionIds(); + SyncData.removeThreadlocalInsertMissionIds(); + SyncData.removeThreadlocalLoopUpdateMissionIds(); + SyncData.removeThreadlocalLoopInsertMissionIds(); e.printStackTrace(); } }