diff --git a/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java b/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java index d9c2e56..d4eabc2 100644 --- a/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java +++ b/nms_sync/src/com/nms/interceptor/SyncMissionResultStatisticalInterceptor.java @@ -13,6 +13,7 @@ import com.jfinal.aop.Interceptor; import com.jfinal.aop.Invocation; import com.jfinal.plugin.activerecord.Db; import com.jfinal.plugin.activerecord.Record; +import com.nms.main.SyncData; import com.nms.model.SyncDbInfo; import com.nms.thread.SyncSlaveToMasterThread; @@ -24,8 +25,8 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{ @Override public void intercept(Invocation inv) { try{ - logger.info("---------SyncMissionResultStatisticalInterceptor拦截器拦截开始------------"); + /* //创建一个任务id集合 存储任务结果改变的任务id 这些任务统一走新的统计方法 修改mission_state_table状态 //新增或者修改的结果可能是多条 但是任务id相同 用set进行去重 @@ -55,10 +56,9 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{ Record beforeUpdateMissionResultTable6 = Db.use(url.toString()).findFirst("select * from table_sync_info where table_name='mission_result_table6' and event=2 and db_id=-1"); logger.info("获取mission_result_table1表中最后一次同步id的数据信息为 "+JSON.toJSONString(beforeUpdateMissionResultTable6)); + */ - - inv.invoke(); - + /* Record afterInsertMissionResultTable1 = Db.use(url.toString()).findFirst("select * from table_sync_info where table_name='mission_result_table1' and event=1 and db_id=-1"); logger.info("获取mission_result_table1表中最后一次同步id的数据信息为 "+JSON.toJSONString(afterInsertMissionResultTable1)); @@ -176,16 +176,43 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{ } } } - } + }*/ + + logger.info("---------SyncMissionResultStatisticalInterceptor拦截器拦截开始------------"); + inv.invoke(); + + /** + * 处理非周期任务的统计功能 + */ + Set insertMissionIds = SyncData.getThreadlocalInsertMissionIds(); + logger.info("即将进行统计的所有修改任务的mission_id为:"+JSON.toJSONString(insertMissionIds)); + + Set updateMissionIds = SyncData.getThreadlocalUpdateMissionIds(); + logger.info("即将进行统计的所有修改任务的mission_id为:"+JSON.toJSONString(updateMissionIds)); + - logger.info("即将进行统计的所有任务的mission_id为:"+JSON.toJSONString(missionIds)); //根据统计结果更新mission_state_talbe表对应任务的状态 - if(missionIds.size()>0) { - for (Long missionId : missionIds) { - StatisticalHandle(missionId); + if(insertMissionIds.size()>0) { + for (Long missionId : insertMissionIds) { + StatisticalHandle(missionId,true); } } + + if(updateMissionIds.size()>0) { + for (Long missionId : updateMissionIds) { + StatisticalHandle(missionId,false); + } + } + + /** + * 处理周期任务的统计功能 + */ + + + // 统计结束后 清空threadLocal中的值 + SyncData.removeThreadlocalUpdateMissionIds(); + SyncData.removeThreadlocalInsertMissionIds(); logger.info("--------SyncMissionResultStatisticalInterceptor拦截器拦截结束------------"); }catch(Exception e){ e.printStackTrace(); @@ -194,7 +221,10 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{ } - public void StatisticalHandle(Long missionId) { + /** + * 处理非周期任务的统计功能 + */ + public void StatisticalHandle(Long missionId,boolean isInsert) { logger.info("根据当前任务id为:"+missionId+"开始统计"); // mission_state_table 状态值 Integer status = null; @@ -244,6 +274,13 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{ logger.info("统计完成 修改mission_state_table状态为:? 总个数:?执行数:? 成功:? 失败:?"); String missionStateAutoDesc=null; + if(isInsert) { + Record missionStateTable = Db.use().findFirst("select * from mission_state_table where mission_id = ?",missionId); + if(null!=missionStateTable&&null!=missionStateTable.get("auto_desc")&&missionStateTable.getStr("auto_desc").length()>0) { + return; + } + missionStateAutoDesc=format.format(System.currentTimeMillis())+"i18n_server.MissionConstants.NOTICE_TASK_RUNNING_n81i
"; + } if(okCount+failCount==total) { missionStateAutoDesc=format.format(System.currentTimeMillis())+" i18n_sserver.UpgradeService.sql.complate_n81i "+total+" i18n_sserver.UpgradeService.sql.executeNode_n81i "+okCount+" i18n_sserver.UpgradeService.sql.failed_n81i "+failCount; } @@ -261,4 +298,12 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{ } logger.info("修改mission_state_table信息完成"); } + + + /** + * 处理周期任务的统计功能 + */ + public void StatisticalLoopHandle(Long missionId) { + + } } diff --git a/nms_sync/src/com/nms/main/SyncData.java b/nms_sync/src/com/nms/main/SyncData.java index 8589302..37a1a03 100644 --- a/nms_sync/src/com/nms/main/SyncData.java +++ b/nms_sync/src/com/nms/main/SyncData.java @@ -1,5 +1,6 @@ package com.nms.main; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -25,8 +26,9 @@ import com.nms.thread.SyncThread; * */ public class SyncData{ + private static final ThreadLocal> threadLocalUpdateMissionIds = new ThreadLocal>(); + private static final ThreadLocal> threadLocalInsertMissionIds = new ThreadLocal>(); - private static final ThreadLocal> threadLocal = new ThreadLocal>(); public static void main(String[] args) { Logger logger = Logger.getLogger(SyncData.class); @@ -68,4 +70,34 @@ public class SyncData{ logger.info("获取同步记录信息失败 请检查数据库数据信息"); } } + + public static Set getThreadlocalUpdateMissionIds() { + Set set = threadLocalUpdateMissionIds.get(); + if(null==set) { + set=new HashSet(); + } + return set; + } + public static void setThreadlocalUpdateMissionIds(Set set) { + threadLocalUpdateMissionIds.set(set); + } + + public static void removeThreadlocalUpdateMissionIds() { + threadLocalUpdateMissionIds.remove(); + } + + public static Set getThreadlocalInsertMissionIds() { + Set set = threadLocalInsertMissionIds.get(); + if(null==set) { + set=new HashSet(); + } + return set; + } + public static void setThreadlocalInsertMissionIds(Set set) { + threadLocalInsertMissionIds.set(set); + } + + public static void removeThreadlocalInsertMissionIds() { + threadLocalInsertMissionIds.remove(); + } } diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java index de50297..edd1782 100644 --- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java +++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java @@ -4,9 +4,11 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.Set; import org.apache.log4j.Logger; import com.nms.interceptor.SyncMissionResultStatisticalInterceptor; +import com.nms.main.SyncData; import com.alibaba.fastjson.JSON; import com.jfinal.aop.Before; import com.jfinal.plugin.activerecord.Db; @@ -67,7 +69,14 @@ public class SyncSlaveToMasterThread implements Runnable{ //logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data)); if (data != null && data.size() > 0) { final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name")); - if(record.getInt("mode").equals(2)) { + if(record.getInt("mode").equals(2)&&record.getStr("table_name").contains("mission_result_table")) { + Set set = SyncData.getThreadlocalInsertMissionIds(); + for(Record entity:data) { + set.add(entity.getLong("mission_id")); + entity.remove(record.getStr("id_name")); + } + SyncData.setThreadlocalInsertMissionIds(set); + }else { for(Record entity:data) { entity.remove(record.getStr("id_name")); } @@ -245,9 +254,12 @@ public class SyncSlaveToMasterThread implements Runnable{ } Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), "DETECTION_SET_INFO_ID,SEQ_ID",updateDatas, record.getInt("batch_size")); }else if(record.getStr("table_name").contains("mission_result_table")){ + Set missionIds = SyncData.getThreadlocalUpdateMissionIds(); for(Record updateData:updateDatas) { + missionIds.add(updateData.getLong("mission_id")); updateData.remove(record.getStr("id_name")); } + SyncData.setThreadlocalUpdateMissionIds(missionIds); Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), "mission_id,seq_id",updateDatas, record.getInt("batch_size")); }else { if(record.getStr("table_name").equals("detection_info_new")) {