264 lines
12 KiB
Java
264 lines
12 KiB
Java
package com.nms.interceptor;
|
||
|
||
|
||
|
||
import java.text.SimpleDateFormat;
|
||
import java.util.ArrayList;
|
||
import java.util.HashSet;
|
||
import java.util.List;
|
||
import java.util.Set;
|
||
|
||
import org.apache.log4j.Logger;
|
||
import com.alibaba.fastjson.JSON;
|
||
import com.jfinal.aop.Interceptor;
|
||
import com.jfinal.aop.Invocation;
|
||
import com.jfinal.plugin.activerecord.Db;
|
||
import com.jfinal.plugin.activerecord.Record;
|
||
import com.nms.main.SyncData;
|
||
import com.nms.model.SyncDbInfo;
|
||
import com.nms.thread.SyncSlaveToMasterThread;
|
||
|
||
|
||
public class SyncMissionResultStatisticalInterceptor implements Interceptor{
|
||
private Logger logger =Logger.getLogger(this.getClass());
|
||
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //Java Date 类型数据格式化格式
|
||
|
||
@Override
|
||
public void intercept(Invocation inv) {
|
||
try{
|
||
logger.info("---------SyncMissionResultStatisticalInterceptor拦截器拦截开始------------");
|
||
inv.invoke();
|
||
|
||
/**
|
||
* 处理非周期任务的统计功能
|
||
*/
|
||
Set<Long> insertMissionIds = SyncData.getThreadlocalInsertMissionIds();
|
||
logger.info("即将进行统计的所有新增任务的mission_id为:"+JSON.toJSONString(insertMissionIds));
|
||
|
||
Set<Long> updateMissionIds = SyncData.getThreadlocalUpdateMissionIds();
|
||
logger.info("即将进行统计的所有修改任务的mission_id为:"+JSON.toJSONString(updateMissionIds));
|
||
|
||
|
||
|
||
//根据统计结果更新mission_state_talbe表对应任务的状态
|
||
if(insertMissionIds.size()>0) {
|
||
for (Long missionId : insertMissionIds) {
|
||
StatisticalHandle(missionId,true);
|
||
}
|
||
}
|
||
|
||
if(updateMissionIds.size()>0) {
|
||
for (Long missionId : updateMissionIds) {
|
||
StatisticalHandle(missionId,false);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 处理周期任务的统计功能
|
||
*/
|
||
Set<Long> loopInsertCurMissionIds = SyncData.getThreadlocalLoopInsertMissionIds();
|
||
logger.info("即将进行统计的所有周期新增任务的mission_id为:"+JSON.toJSONString(insertMissionIds));
|
||
|
||
Set<Long> loopUpdateCurMissionIds = SyncData.getThreadlocalLoopUpdateMissionIds();
|
||
logger.info("即将进行统计的所有周期修改任务的mission_id为:"+JSON.toJSONString(updateMissionIds));
|
||
|
||
if(loopInsertCurMissionIds.size()>0) {
|
||
for (Long missionId : loopInsertCurMissionIds) {
|
||
StatisticalLoopHandle(missionId,true);
|
||
}
|
||
}
|
||
|
||
if(loopUpdateCurMissionIds.size()>0) {
|
||
for (Long missionId : loopUpdateCurMissionIds) {
|
||
StatisticalLoopHandle(missionId,false);
|
||
}
|
||
}
|
||
|
||
// 统计结束后 清空threadLocal中的值
|
||
SyncData.removeThreadlocalUpdateMissionIds();
|
||
SyncData.removeThreadlocalInsertMissionIds();
|
||
SyncData.removeThreadlocalLoopUpdateMissionIds();
|
||
SyncData.removeThreadlocalLoopInsertMissionIds();
|
||
logger.info("--------SyncMissionResultStatisticalInterceptor拦截器拦截结束------------");
|
||
}catch(Exception e){
|
||
e.printStackTrace();
|
||
logger.error("SyncMissionResultStatisticalInterceptor拦截器内部程序出现异常信息",e);
|
||
}
|
||
|
||
}
|
||
|
||
/**
|
||
* 处理非周期任务的统计功能
|
||
*/
|
||
public void StatisticalHandle(Long missionId,boolean isInsert) {
|
||
logger.info("根据当前任务id为:"+missionId+"开始统计");
|
||
// mission_state_table 状态值
|
||
Integer status = null;
|
||
List<Record> results = Db.use().find("select result from (\r\n" +
|
||
"(select result,mission_id from mission_result_table1 mrt) union all \r\n" +
|
||
"(select result,mission_id from mission_result_table4 mrt) union all \r\n" +
|
||
"(select result,mission_id from mission_result_table6 mrt)\r\n" +
|
||
") t \r\n" +
|
||
"left join mission_state_table mst on mst.mission_id = t.mission_id \r\n" +
|
||
"where mst.is_loop = 0 and mst.mission_id=?",missionId);
|
||
boolean noThree=true;
|
||
if(null!=results&&results.size()>0) {
|
||
for (Record record : results) {
|
||
if(record.getInt("result")==3) {
|
||
noThree=false;
|
||
}
|
||
}
|
||
}
|
||
// 判断任务结果有没有状态值为3的 如果有 则任务状态为在下发
|
||
Record result = Db.use().findFirst("select t.mission_id,t.ok,t.fail,t.total from (\r\n" +
|
||
"(select mrt.mission_id,sum(CASE mrt.result when 0 THEN 1 ELSE 0 end) ok,sum(CASE mrt.result when 1 then 1 when -1 then 1 else 0 end) fail,count(mrt.seq_id) total from mission_result_table1 mrt group by mrt.mission_id) union all \r\n" +
|
||
"(select mrt.mission_id,sum(CASE mrt.result when 0 THEN 1 ELSE 0 end) ok,sum(CASE mrt.result when 1 then 1 when -1 then 1 else 0 end) fail,count(mrt.seq_id) total from mission_result_table4 mrt group by mrt.mission_id) union all \r\n" +
|
||
"(select mrt.mission_id,sum(CASE mrt.result when 0 THEN 1 ELSE 0 end) ok,sum(CASE mrt.result when 1 then 1 when -1 then 1 else 0 end) fail,count(mrt.seq_id) total from mission_result_table6 mrt group by mrt.mission_id)\r\n" +
|
||
") t \r\n" +
|
||
"left join mission_state_table mst on mst.mission_id = t.mission_id \r\n" +
|
||
"where mst.is_loop = 0 and t.mission_id = ?",missionId);
|
||
if(null!=result) {
|
||
Integer okCount = result.getInt("ok");
|
||
Integer failCount =result.getInt("fail");
|
||
Integer total =result.getInt("total");
|
||
|
||
|
||
if(okCount+failCount==total) {
|
||
if(failCount==0) {
|
||
status=30;
|
||
}else if(okCount==0) {
|
||
status=31;
|
||
}else {
|
||
status=32;
|
||
}
|
||
}
|
||
|
||
if(!noThree) {
|
||
status=2;
|
||
}
|
||
|
||
logger.info("统计完成 修改mission_state_table状态为:? 总个数:?执行数:? 成功:? 失败:?");
|
||
|
||
String missionStateAutoDesc=null;
|
||
if(isInsert) {
|
||
Record missionStateTable = Db.use().findFirst("select * from mission_state_table where mission_id = ?",missionId);
|
||
if(null!=missionStateTable&&null!=missionStateTable.get("auto_desc")&&missionStateTable.getStr("auto_desc").length()>0) {
|
||
return;
|
||
}
|
||
missionStateAutoDesc=format.format(System.currentTimeMillis())+"i18n_server.MissionConstants.NOTICE_TASK_RUNNING_n81i </br>";
|
||
}
|
||
if(okCount+failCount==total) {
|
||
missionStateAutoDesc=format.format(System.currentTimeMillis())+" i18n_sserver.UpgradeService.sql.complate_n81i "+total+" i18n_sserver.UpgradeService.sql.executeNode_n81i "+okCount+" i18n_sserver.UpgradeService.sql.failed_n81i "+failCount;
|
||
}
|
||
String missionStateDesc="\r\n" +
|
||
"i18n_server.UpgradeService.sql.total_n81i "+total+" i18n_server.UpgradeService.sql.executeNode2_n81i,</br> "+(total-okCount-failCount)+" i18n_server.UpgradeService.sql.unexecute_n81i,</br>"+(okCount+failCount)+" i18n_server.UpgradeService.sql.execute_n81i【i18n_server.UpgradeService.sql.success_n81i "+okCount+" i18n_sserver.UpgradeService.sql.failed_n81i "+failCount+"】";
|
||
|
||
Record missionStateTableResult =new Record();
|
||
missionStateTableResult.set("mission_id", missionId);
|
||
missionStateTableResult.set("mission_state", status);
|
||
missionStateTableResult.set("mission_state_desc", missionStateDesc);
|
||
if(missionStateAutoDesc!=null) {
|
||
missionStateTableResult.set("auto_desc", missionStateAutoDesc);
|
||
}
|
||
Db.use("masterDataSource").update("mission_state_table","mission_id",missionStateTableResult);
|
||
}
|
||
logger.info("修改mission_state_table信息完成");
|
||
}
|
||
|
||
|
||
/**
|
||
* 处理周期任务的统计功能
|
||
*/
|
||
public void StatisticalLoopHandle(Long curMissionId,boolean isInsert) {
|
||
Record loopmissionStateTableInfo = Db.use().findFirst("select * from loopmission_state_table where cur_mission_id = ? ",curMissionId);
|
||
String missionDesc="";
|
||
Integer status=null;
|
||
String autoDesc="";
|
||
Record missionTableInfo =null;
|
||
Integer missionState = null;
|
||
// 判断任务状态为1 根据同步结果数据 修改为状态2 正在执行
|
||
if(null!=loopmissionStateTableInfo) {
|
||
missionTableInfo = Db.use().findFirst("select * from mission_state_table where mission_id = ? ",loopmissionStateTableInfo.getLong("mission_id"));
|
||
if(null!=missionTableInfo&&missionTableInfo.getInt("mission_state").equals(1)&&isInsert) {
|
||
missionTableInfo.set("mission_state",2);
|
||
if(null==missionTableInfo.getStr("auto_desc")) {
|
||
missionTableInfo.set("auto_desc", format.format(System.currentTimeMillis())+"i18n_server.LoadNewMissionThread.missionStart_n81i");
|
||
}
|
||
Db.update("mission_state_table", missionTableInfo);
|
||
}
|
||
|
||
// 修改周期任务的执行状态
|
||
Record result = Db.use().findFirst("select lmst.mission_id missionId, t.ok, t.fail, t.total\r\n" +
|
||
"from (select mrt.mission_id, \r\n" +
|
||
"ifnull(sum(case mrt.result when 0 then 1 else 0 end), 0) ok, \r\n" +
|
||
"ifnull(sum(case mrt.result when 1 then 1 when -1 then 1 else 0 end), 0) fail, \r\n" +
|
||
"count(mrt.seq_id) total \r\n" +
|
||
"from mission_result_table4 mrt \r\n" +
|
||
"group by mrt.mission_id) t \r\n" +
|
||
"left join (select max(lst.cur_mission_id) cur_mission_id, lst.mission_id \r\n" +
|
||
"from mission_result_table4 mrt4 \r\n" +
|
||
"left join loopmission_state_table lst on mrt4.mission_id = lst.cur_mission_id \r\n" +
|
||
"group by lst.mission_id) lmst \r\n" +
|
||
"on lmst.cur_mission_id = t.mission_id \r\n" +
|
||
"left join mission_state_table mst \r\n" +
|
||
"on mst.mission_id = lmst.mission_id \r\n" +
|
||
"where lmst.cur_mission_id is not null and mst.is_loop=1 \r\n" +
|
||
"and mst.mission_id = ? ", loopmissionStateTableInfo.getLong("mission_id"));
|
||
if(null!=result) {
|
||
if(null!=result.get("missionId")&&result.getStr("missionId").length()>0) {
|
||
Long ok = result.getLong("ok");
|
||
Long fail =result.getLong("fail");
|
||
Long total = result.getLong("total");
|
||
missionDesc="i18n_server.UpgradeService.sql.cycle_n81i "+total+" i18n_server.UpgradeService.sql.executeNode2_n81i,</br>"+(total-ok-fail)+" i18n_server.UpgradeService.sql.unexecute_n81i,</br>"+(ok+fail)+" i18n_server.UpgradeService.sql.execute_n81i【i18n_server.UpgradeService.sql.success_n81i "+ok+" i18n_sserver.UpgradeService.sql.failed_n81i "+fail+" 】";
|
||
}
|
||
|
||
}
|
||
|
||
if(isInsert) {
|
||
missionState = loopmissionStateTableInfo.getInt("mission_state");
|
||
Integer missionTableInfoState = missionTableInfo.getInt("mission_state");
|
||
switch(missionState) {
|
||
case(3):
|
||
Record findFirst = Db.use().findFirst("select COUNT(*) count from loopmission_state_table where mission_id = ? and mission_state=0",loopmissionStateTableInfo.getLong("mission_id"));
|
||
if(findFirst.getInt("count").equals(0)&&missionTableInfoState!=3) {
|
||
status=3;
|
||
autoDesc=format.format(System.currentTimeMillis())+"i18n_server.UpgradeService.sql.revoke_n81i";
|
||
}
|
||
break;
|
||
case(4):
|
||
if(missionTableInfoState!=4) {
|
||
status=4;
|
||
autoDesc=format.format(System.currentTimeMillis())+" Task execution failure ";
|
||
}
|
||
break;
|
||
case(6):
|
||
if(missionTableInfoState!=6) {
|
||
status=6;
|
||
autoDesc=format.format(System.currentTimeMillis())+"Task revocation start execute";
|
||
}
|
||
break;
|
||
case(7):
|
||
if(missionTableInfoState!=7) {
|
||
status=7;
|
||
autoDesc=format.format(System.currentTimeMillis())+"i18n_server.UpgradeService.sql.revoke_n81i";
|
||
}
|
||
break;
|
||
}
|
||
}
|
||
|
||
Record missionStateTableResult =new Record();
|
||
missionStateTableResult.set("mission_id", loopmissionStateTableInfo.get("mission_id"));
|
||
if(status!=null) {
|
||
missionStateTableResult.set("mission_state", status);
|
||
}
|
||
if(missionDesc.length()>0&&null!=missionState&&missionState!=6&&missionState!=7) {
|
||
missionStateTableResult.set("mission_state_desc",missionDesc);
|
||
}
|
||
if(autoDesc.length()>0) {
|
||
missionStateTableResult.set("auto_desc",autoDesc);
|
||
}
|
||
Db.use("masterDataSource").update("mission_state_table","mission_id",missionStateTableResult);
|
||
}
|
||
}
|
||
}
|