package com.nms.interceptor;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.log4j.Logger;

import com.alibaba.fastjson.JSON;
import com.jfinal.aop.Interceptor;
import com.jfinal.aop.Invocation;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
import com.nms.main.SyncData;
import com.nms.model.SyncDbInfo;
import com.nms.thread.SyncSlaveToMasterThread;

/**
 * Interceptor that refreshes {@code mission_state_table} after the intercepted
 * invocation has run.
 *
 * <p>The mission ids to recompute are read from the two thread-local sets exposed
 * by {@link SyncData} (inserted results and updated results). For every id the
 * result tables are aggregated and the matching {@code mission_state_table} row is
 * updated.
 *
 * <p>History: an earlier, commented-out implementation derived the affected
 * mission ids by reading {@code table_sync_info.last_id} before and after the
 * invocation and scanning the result tables / {@code table_event_log} in between.
 * That dead code has been removed; only the thread-local based flow remains.
 */
public class SyncMissionResultStatisticalInterceptor implements Interceptor {

    /** Pattern used for the timestamp prefix of {@code auto_desc}. */
    private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Line separator embedded in the generated descriptions.
     * NOTE(review): the source text contained a raw line break inside the string
     * literals at these positions; it was presumably an HTML line-break tag lost
     * during extraction. Confirm the intended token and adjust this constant.
     */
    private static final String LINE_BREAK = "\n";

    /** Message logged whenever the interceptor catches an exception. */
    private static final String ERROR_MESSAGE =
            "SyncMissionResultStatisticalInterceptor拦截器内部程序出现异常信息";

    /** Selects every result value recorded for one non-loop mission. */
    private static final String RESULT_VALUES_SQL = "select result from (\r\n"
            + "(select result,mission_id from mission_result_table1 mrt) union all \r\n"
            + "(select result,mission_id from mission_result_table4 mrt) union all \r\n"
            + "(select result,mission_id from mission_result_table6 mrt)\r\n"
            + ") t \r\n"
            + "left join mission_state_table mst on mst.mission_id = t.mission_id \r\n"
            + "where mst.is_loop = 0 and mst.mission_id=?";

    /** Aggregates ok / fail / total counts per mission for one non-loop mission. */
    private static final String RESULT_COUNTS_SQL = "select t.mission_id,t.ok,t.fail,t.total from (\r\n"
            + "(select mrt.mission_id,sum(CASE mrt.result when 0 THEN 1 ELSE 0 end) ok,sum(CASE mrt.result when 1 then 1 when -1 then 1 else 0 end) fail,count(mrt.seq_id) total from mission_result_table1 mrt group by mrt.mission_id) union all \r\n"
            + "(select mrt.mission_id,sum(CASE mrt.result when 0 THEN 1 ELSE 0 end) ok,sum(CASE mrt.result when 1 then 1 when -1 then 1 else 0 end) fail,count(mrt.seq_id) total from mission_result_table4 mrt group by mrt.mission_id) union all \r\n"
            + "(select mrt.mission_id,sum(CASE mrt.result when 0 THEN 1 ELSE 0 end) ok,sum(CASE mrt.result when 1 then 1 when -1 then 1 else 0 end) fail,count(mrt.seq_id) total from mission_result_table6 mrt group by mrt.mission_id)\r\n"
            + ") t \r\n"
            + "left join mission_state_table mst on mst.mission_id = t.mission_id \r\n"
            + "where mst.is_loop = 0 and t.mission_id = ?";

    private final Logger logger = Logger.getLogger(this.getClass());

    /**
     * Runs the intercepted invocation, then recomputes the state of every mission
     * whose results were inserted or updated during it.
     *
     * <p>No exception escapes this method: failures are logged. The thread-local id
     * sets are always cleared in a {@code finally} block so that ids (including one
     * that makes the statistics fail) cannot leak into the next invocation on the
     * same thread. Because of that, statistics are also attempted when the
     * invocation itself failed, so ids collected before the failure are not lost.
     *
     * @param inv the invocation being intercepted
     */
    @Override
    public void intercept(Invocation inv) {
        logger.info("---------SyncMissionResultStatisticalInterceptor拦截器拦截开始------------");
        try {
            try {
                inv.invoke();
            } catch (Exception e) {
                logger.error(ERROR_MESSAGE, e);
            }

            // Statistics for non-loop missions.
            Set<Long> insertMissionIds = SyncData.getThreadlocalInsertMissionIds();
            logger.info("即将进行统计的所有新增任务的mission_id为:" + JSON.toJSONString(insertMissionIds));
            Set<Long> updateMissionIds = SyncData.getThreadlocalUpdateMissionIds();
            logger.info("即将进行统计的所有修改任务的mission_id为:" + JSON.toJSONString(updateMissionIds));

            // Update mission_state_table for every affected mission.
            refreshMissionStates(insertMissionIds, true);
            refreshMissionStates(updateMissionIds, false);

            // Statistics for loop missions are not implemented yet (see StatisticalLoopHandle).
        } catch (Exception e) {
            logger.error(ERROR_MESSAGE, e);
        } finally {
            // Always clear the thread-local id sets once the statistics pass is over.
            SyncData.removeThreadlocalUpdateMissionIds();
            SyncData.removeThreadlocalInsertMissionIds();
        }
        logger.info("--------SyncMissionResultStatisticalInterceptor拦截器拦截结束------------");
    }

    /**
     * Recomputes the state of each mission in {@code missionIds}; a failure for one
     * mission is logged and does not prevent the remaining missions from being processed.
     */
    private void refreshMissionStates(Set<Long> missionIds, boolean isInsert) {
        if (missionIds == null || missionIds.isEmpty()) {
            return;
        }
        for (Long missionId : missionIds) {
            try {
                StatisticalHandle(missionId, isInsert);
            } catch (Exception e) {
                logger.error(ERROR_MESSAGE + " mission_id:" + missionId, e);
            }
        }
    }

    /**
     * Returns the current time formatted with {@link #DATE_PATTERN}. A new formatter
     * is created per call because {@link SimpleDateFormat} is not thread-safe.
     */
    private static String currentTimestamp() {
        return new SimpleDateFormat(DATE_PATTERN).format(System.currentTimeMillis());
    }

    /**
     * Recomputes and stores the state of one non-loop mission.
     *
     * <p>State rules, as implemented below: when every counted result is either ok
     * (result 0) or failed (result 1 or -1) the state is 30 (no failures), 31 (no
     * successes) or 32 (mixed); if any result row has the value 3 the state is
     * forced to 2. Otherwise the state stays {@code null}.
     *
     * <p>When {@code isInsert} is true and the mission already has a non-empty
     * {@code auto_desc}, the method returns without updating anything.
     *
     * @param missionId id of the mission to recompute
     * @param isInsert  {@code true} when triggered by newly inserted results,
     *                  {@code false} when triggered by updated results
     */
    public void StatisticalHandle(Long missionId, boolean isInsert) {
        logger.info("根据当前任务id为:" + missionId + "开始统计");

        // Value that will be written to mission_state_table.mission_state.
        Integer status = null;

        // Does any result row of this mission carry the value 3?
        boolean noThree = true;
        List<Record> results = Db.use().find(RESULT_VALUES_SQL, missionId);
        if (results != null) {
            for (Record record : results) {
                Integer value = record.getInt("result");
                if (value != null && value == 3) {
                    noThree = false;
                    break;
                }
            }
        }

        // NOTE(review): the query unions three per-table aggregates but only the
        // first row is used; confirm a mission never has rows in more than one table.
        Record result = Db.use().findFirst(RESULT_COUNTS_SQL, missionId);
        if (result != null) {
            int okCount = result.getInt("ok");
            int failCount = result.getInt("fail");
            int total = result.getInt("total");
            boolean allExecuted = okCount + failCount == total;

            if (allExecuted) {
                if (failCount == 0) {
                    status = 30;
                } else if (okCount == 0) {
                    status = 31;
                } else {
                    status = 32;
                }
            }
            // A result value of 3 overrides the computed state.
            if (!noThree) {
                status = 2;
            }
            logger.info("统计完成 修改mission_state_table状态为:" + status
                    + " 总个数:" + total
                    + " 执行数:" + (okCount + failCount)
                    + " 成功:" + okCount
                    + " 失败:" + failCount);

            String missionStateAutoDesc = null;
            if (isInsert) {
                Record missionStateTable = Db.use()
                        .findFirst("select * from mission_state_table where mission_id = ?", missionId);
                boolean hasAutoDesc = missionStateTable != null
                        && missionStateTable.get("auto_desc") != null
                        && missionStateTable.getStr("auto_desc").length() > 0;
                if (hasAutoDesc) {
                    // auto_desc was already written for this mission: leave the row untouched.
                    return;
                }
                missionStateAutoDesc = currentTimestamp()
                        + "i18n_server.MissionConstants.NOTICE_TASK_RUNNING_n81i" + LINE_BREAK;
            }
            if (allExecuted) {
                missionStateAutoDesc = currentTimestamp()
                        + " i18n_sserver.UpgradeService.sql.complate_n81i " + total
                        + " i18n_sserver.UpgradeService.sql.executeNode_n81i " + okCount
                        + " i18n_sserver.UpgradeService.sql.failed_n81i " + failCount;
            }

            String missionStateDesc = "\r\n"
                    + "i18n_server.UpgradeService.sql.total_n81i " + total
                    + " i18n_server.UpgradeService.sql.executeNode2_n81i," + LINE_BREAK
                    + (total - okCount - failCount)
                    + " i18n_server.UpgradeService.sql.unexecute_n81i," + LINE_BREAK
                    + (okCount + failCount)
                    + " i18n_server.UpgradeService.sql.execute_n81i【i18n_server.UpgradeService.sql.success_n81i "
                    + okCount
                    + " i18n_sserver.UpgradeService.sql.failed_n81i " + failCount + "】";

            Record missionStateTableResult = new Record();
            missionStateTableResult.set("mission_id", missionId);
            // NOTE(review): status may still be null here (not all results final and
            // no result 3), in which case mission_state is written as null - confirm
            // that this is intended.
            missionStateTableResult.set("mission_state", status);
            missionStateTableResult.set("mission_state_desc", missionStateDesc);
            if (missionStateAutoDesc != null) {
                missionStateTableResult.set("auto_desc", missionStateAutoDesc);
            }
            Db.use("masterDataSource").update("mission_state_table", "mission_id", missionStateTableResult);
        }
        logger.info("修改mission_state_table信息完成");
    }

    /**
     * Statistics for loop (periodic) missions. Not implemented: the body is empty.
     *
     * @param missionId id of the loop mission
     */
    public void StatisticalLoopHandle(Long missionId) {
    }
}