周期任务统计功能以及bug修复
This commit is contained in:
@@ -7,6 +7,7 @@ import java.util.ArrayList;
|
|||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
import com.jfinal.aop.Interceptor;
|
import com.jfinal.aop.Interceptor;
|
||||||
@@ -185,7 +186,7 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{
|
|||||||
* 处理非周期任务的统计功能
|
* 处理非周期任务的统计功能
|
||||||
*/
|
*/
|
||||||
Set<Long> insertMissionIds = SyncData.getThreadlocalInsertMissionIds();
|
Set<Long> insertMissionIds = SyncData.getThreadlocalInsertMissionIds();
|
||||||
logger.info("即将进行统计的所有修改任务的mission_id为:"+JSON.toJSONString(insertMissionIds));
|
logger.info("即将进行统计的所有新增任务的mission_id为:"+JSON.toJSONString(insertMissionIds));
|
||||||
|
|
||||||
Set<Long> updateMissionIds = SyncData.getThreadlocalUpdateMissionIds();
|
Set<Long> updateMissionIds = SyncData.getThreadlocalUpdateMissionIds();
|
||||||
logger.info("即将进行统计的所有修改任务的mission_id为:"+JSON.toJSONString(updateMissionIds));
|
logger.info("即将进行统计的所有修改任务的mission_id为:"+JSON.toJSONString(updateMissionIds));
|
||||||
@@ -208,11 +209,29 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{
|
|||||||
/**
|
/**
|
||||||
* 处理周期任务的统计功能
|
* 处理周期任务的统计功能
|
||||||
*/
|
*/
|
||||||
|
Set<Long> loopInsertCurMissionIds = SyncData.getThreadlocalLoopInsertMissionIds();
|
||||||
|
logger.info("即将进行统计的所有周期新增任务的mission_id为:"+JSON.toJSONString(insertMissionIds));
|
||||||
|
|
||||||
|
Set<Long> loopUpdateCurMissionIds = SyncData.getThreadlocalLoopUpdateMissionIds();
|
||||||
|
logger.info("即将进行统计的所有周期修改任务的mission_id为:"+JSON.toJSONString(updateMissionIds));
|
||||||
|
|
||||||
|
if(loopInsertCurMissionIds.size()>0) {
|
||||||
|
for (Long missionId : loopInsertCurMissionIds) {
|
||||||
|
StatisticalLoopHandle(missionId,true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(loopUpdateCurMissionIds.size()>0) {
|
||||||
|
for (Long missionId : loopUpdateCurMissionIds) {
|
||||||
|
StatisticalLoopHandle(missionId,false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 统计结束后 清空threadLocal中的值
|
// 统计结束后 清空threadLocal中的值
|
||||||
SyncData.removeThreadlocalUpdateMissionIds();
|
SyncData.removeThreadlocalUpdateMissionIds();
|
||||||
SyncData.removeThreadlocalInsertMissionIds();
|
SyncData.removeThreadlocalInsertMissionIds();
|
||||||
|
SyncData.removeThreadlocalLoopUpdateMissionIds();
|
||||||
|
SyncData.removeThreadlocalLoopInsertMissionIds();
|
||||||
logger.info("--------SyncMissionResultStatisticalInterceptor拦截器拦截结束------------");
|
logger.info("--------SyncMissionResultStatisticalInterceptor拦截器拦截结束------------");
|
||||||
}catch(Exception e){
|
}catch(Exception e){
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
@@ -303,7 +322,97 @@ public class SyncMissionResultStatisticalInterceptor implements Interceptor{
|
|||||||
/**
|
/**
|
||||||
* 处理周期任务的统计功能
|
* 处理周期任务的统计功能
|
||||||
*/
|
*/
|
||||||
public void StatisticalLoopHandle(Long missionId) {
|
public void StatisticalLoopHandle(Long curMissionId,boolean isInsert) {
|
||||||
|
Record loopmissionStateTableInfo = Db.use().findFirst("select * from loopmission_state_table where cur_mission_id = ? ",curMissionId);
|
||||||
|
String missionDesc="";
|
||||||
|
Integer status=null;
|
||||||
|
String autoDesc="";
|
||||||
|
Record missionTableInfo =null;
|
||||||
|
Integer missionState = null;
|
||||||
|
// 判断任务状态为1 根据同步结果数据 修改为状态2 正在执行
|
||||||
|
if(null!=loopmissionStateTableInfo) {
|
||||||
|
missionTableInfo = Db.use().findFirst("select * from mission_state_table where mission_id = ? ",loopmissionStateTableInfo.getLong("mission_id"));
|
||||||
|
if(null!=missionTableInfo&&missionTableInfo.getInt("mission_state").equals(1)&&isInsert) {
|
||||||
|
missionTableInfo.set("mission_state",2);
|
||||||
|
if(null==missionTableInfo.getStr("auto_desc")) {
|
||||||
|
missionTableInfo.set("auto_desc", format.format(System.currentTimeMillis())+"i18n_server.LoadNewMissionThread.missionStart_n81i");
|
||||||
|
}
|
||||||
|
Db.update("mission_state_table", missionTableInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 修改周期任务的执行状态
|
||||||
|
if(null!=loopmissionStateTableInfo) {
|
||||||
|
Record result = Db.use().findFirst("select lmst.mission_id missionId, t.ok, t.fail, t.total\r\n" +
|
||||||
|
"from (select mrt.mission_id, \r\n" +
|
||||||
|
"ifnull(sum(case mrt.result when 0 then 1 else 0 end), 0) ok, \r\n" +
|
||||||
|
"ifnull(sum(case mrt.result when 1 then 1 when -1 then 1 else 0 end), 0) fail, \r\n" +
|
||||||
|
"count(mrt.seq_id) total \r\n" +
|
||||||
|
"from mission_result_table4 mrt \r\n" +
|
||||||
|
"group by mrt.mission_id) t \r\n" +
|
||||||
|
"left join (select max(lst.cur_mission_id) cur_mission_id, lst.mission_id \r\n" +
|
||||||
|
"from mission_result_table4 mrt4 \r\n" +
|
||||||
|
"left join loopmission_state_table lst on mrt4.mission_id = lst.cur_mission_id \r\n" +
|
||||||
|
"group by lst.mission_id) lmst \r\n" +
|
||||||
|
"on lmst.cur_mission_id = t.mission_id \r\n" +
|
||||||
|
"left join mission_state_table mst \r\n" +
|
||||||
|
"on mst.mission_id = lmst.mission_id \r\n" +
|
||||||
|
"where lmst.cur_mission_id is not null and mst.is_loop=1 \r\n" +
|
||||||
|
"and mst.mission_id = ? ", loopmissionStateTableInfo.getLong("mission_id"));
|
||||||
|
if(null!=result) {
|
||||||
|
if(null!=result.get("missionId")&&result.getStr("missionId").length()>0) {
|
||||||
|
Long ok = result.getLong("ok");
|
||||||
|
Long fail =result.getLong("fail");
|
||||||
|
Long total = result.getLong("total");
|
||||||
|
missionDesc="i18n_server.UpgradeService.sql.cycle_n81i "+total+" i18n_server.UpgradeService.sql.executeNode2_n81i,</br>"+(total-ok-fail)+" i18n_server.UpgradeService.sql.unexecute_n81i,</br>"+(ok+fail)+" i18n_server.UpgradeService.sql.execute_n81i【i18n_server.UpgradeService.sql.success_n81i "+ok+" i18n_sserver.UpgradeService.sql.failed_n81i "+fail+" 】";
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(null!=loopmissionStateTableInfo&&!isInsert) {
|
||||||
|
missionState = loopmissionStateTableInfo.getInt("mission_state");
|
||||||
|
Integer missionTableInfoState = missionTableInfo.getInt("mission_state");
|
||||||
|
switch(missionState) {
|
||||||
|
case(3):
|
||||||
|
Record findFirst = Db.use().findFirst("select COUNT(*) count from loopmission_state_table where mission_id = ? and mission_state=0",loopmissionStateTableInfo.getLong("mission_id"));
|
||||||
|
if(findFirst.getInt("count").equals(0)&&missionTableInfoState!=3) {
|
||||||
|
status=3;
|
||||||
|
autoDesc=format.format(System.currentTimeMillis())+"i18n_server.UpgradeService.sql.revoke_n81i";
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case(4):
|
||||||
|
if(missionTableInfoState!=4) {
|
||||||
|
status=4;
|
||||||
|
autoDesc=format.format(System.currentTimeMillis())+" Task execution failure ";
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case(6):
|
||||||
|
if(missionTableInfoState!=6) {
|
||||||
|
status=6;
|
||||||
|
autoDesc=format.format(System.currentTimeMillis())+"Task revocation start execute";
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case(7):
|
||||||
|
if(missionTableInfoState!=7) {
|
||||||
|
status=7;
|
||||||
|
autoDesc=format.format(System.currentTimeMillis())+"i18n_server.UpgradeService.sql.revoke_n81i";
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(null!=loopmissionStateTableInfo) {
|
||||||
|
Record missionStateTableResult =new Record();
|
||||||
|
missionStateTableResult.set("mission_id", loopmissionStateTableInfo.get("mission_id"));
|
||||||
|
if(status!=null) {
|
||||||
|
missionStateTableResult.set("mission_state", status);
|
||||||
|
}
|
||||||
|
if(missionDesc.length()>0&&null!=missionState&&missionState!=6&&missionState!=7) {
|
||||||
|
missionStateTableResult.set("mission_state_desc",missionDesc);
|
||||||
|
}
|
||||||
|
if(autoDesc.length()>0) {
|
||||||
|
missionStateTableResult.set("auto_desc",autoDesc);
|
||||||
|
}
|
||||||
|
Db.use("masterDataSource").update("mission_state_table","mission_id",missionStateTableResult);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,8 +26,14 @@ import com.nms.thread.SyncThread;
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class SyncData{
|
public class SyncData{
|
||||||
|
// 线程变量记录任务结果修改的任务id
|
||||||
private static final ThreadLocal<Set<Long>> threadLocalUpdateMissionIds = new ThreadLocal<Set<Long>>();
|
private static final ThreadLocal<Set<Long>> threadLocalUpdateMissionIds = new ThreadLocal<Set<Long>>();
|
||||||
|
// 线程变量记录任务结果新增的任务id
|
||||||
private static final ThreadLocal<Set<Long>> threadLocalInsertMissionIds = new ThreadLocal<Set<Long>>();
|
private static final ThreadLocal<Set<Long>> threadLocalInsertMissionIds = new ThreadLocal<Set<Long>>();
|
||||||
|
// 线程变量记录周期任务结果修改的任务id
|
||||||
|
private static final ThreadLocal<Set<Long>> threadLocalLoopUpdateMissionIds = new ThreadLocal<Set<Long>>();
|
||||||
|
// 线程变量记录周期任务结果新增的任务id
|
||||||
|
private static final ThreadLocal<Set<Long>> threadLocalLoopInsertMissionIds = new ThreadLocal<Set<Long>>();
|
||||||
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
@@ -100,4 +106,36 @@ public class SyncData{
|
|||||||
public static void removeThreadlocalInsertMissionIds() {
|
public static void removeThreadlocalInsertMissionIds() {
|
||||||
threadLocalInsertMissionIds.remove();
|
threadLocalInsertMissionIds.remove();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public static Set<Long> getThreadlocalLoopUpdateMissionIds() {
|
||||||
|
Set<Long> set = threadLocalLoopUpdateMissionIds.get();
|
||||||
|
if(null==set) {
|
||||||
|
set=new HashSet<Long>();
|
||||||
|
}
|
||||||
|
return set;
|
||||||
|
}
|
||||||
|
public static void setThreadlocalLoopUpdateMissionIds(Set<Long> set) {
|
||||||
|
threadLocalLoopUpdateMissionIds.set(set);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void removeThreadlocalLoopUpdateMissionIds() {
|
||||||
|
threadLocalLoopUpdateMissionIds.remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Set<Long> getThreadlocalLoopInsertMissionIds() {
|
||||||
|
Set<Long> set = threadLocalLoopInsertMissionIds.get();
|
||||||
|
if(null==set) {
|
||||||
|
set=new HashSet<Long>();
|
||||||
|
}
|
||||||
|
return set;
|
||||||
|
}
|
||||||
|
public static void setThreadlocalLoopInsertMissionIds(Set<Long> set) {
|
||||||
|
threadLocalLoopInsertMissionIds.set(set);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void removeThreadlocalLoopInsertMissionIds() {
|
||||||
|
threadLocalLoopInsertMissionIds.remove();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
entity.remove(record.getStr("id_name"));
|
entity.remove(record.getStr("id_name"));
|
||||||
}
|
}
|
||||||
SyncData.setThreadlocalInsertMissionIds(set);
|
SyncData.setThreadlocalInsertMissionIds(set);
|
||||||
}else {
|
}else if(record.getInt("mode").equals(2)){
|
||||||
for(Record entity:data) {
|
for(Record entity:data) {
|
||||||
entity.remove(record.getStr("id_name"));
|
entity.remove(record.getStr("id_name"));
|
||||||
}
|
}
|
||||||
@@ -121,6 +121,45 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
}else if(record.getStr("table_name").equals("loopmission_state_table")){
|
||||||
|
Db.use(url.toString()).tx(new IAtom() {
|
||||||
|
@Override
|
||||||
|
public boolean run() throws SQLException {
|
||||||
|
// TODO Auto-generated method stub
|
||||||
|
return Db.use("masterDataSource").tx(new IAtom() {
|
||||||
|
@Override
|
||||||
|
public boolean run() throws SQLException {
|
||||||
|
Set<Long> set = SyncData.getThreadlocalLoopInsertMissionIds();
|
||||||
|
final List<Record> insertDatas=new ArrayList<Record>();
|
||||||
|
final List<Record> updateDatas=new ArrayList<Record>();
|
||||||
|
for(Record entity:data) {
|
||||||
|
// 循环遍历数据 判断当前数据是新增还是修改
|
||||||
|
Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where "+record.getStr("id_name")+"= ? ",entity.get(record.getStr("id_name")));
|
||||||
|
if(null!=findFirst) {
|
||||||
|
updateDatas.add(entity);
|
||||||
|
}else {
|
||||||
|
set.add(entity.getLong(record.getStr("id_name")));
|
||||||
|
insertDatas.add(entity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SyncData.setThreadlocalLoopInsertMissionIds(set);
|
||||||
|
if(insertDatas.size()>0) {
|
||||||
|
Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
|
||||||
|
}
|
||||||
|
if(updateDatas.size()>0) {
|
||||||
|
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
|
||||||
|
updateDatas, record.getInt("batch_size"));
|
||||||
|
}
|
||||||
|
logger.info("分库同步 loopmission_state_table 增量更新数据完成 表名为"+record.getStr("table_name"));
|
||||||
|
logger.info("分库同步 loopmission_state_table 最后数据的id信息为"+JSON.toJSONString(lastInsertId));
|
||||||
|
record.set("last_id", lastInsertId);
|
||||||
|
record.set("last_date", new Date());
|
||||||
|
Db.use(url.toString()).update("table_sync_info", record);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
}else {
|
}else {
|
||||||
Db.use(url.toString()).tx(new IAtom() {
|
Db.use(url.toString()).tx(new IAtom() {
|
||||||
@Override
|
@Override
|
||||||
@@ -268,6 +307,13 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
updateData.set(record.getStr("id_name"),findFirst.getLong(record.getStr("id_name")));
|
updateData.set(record.getStr("id_name"),findFirst.getLong(record.getStr("id_name")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if(record.getStr("table_name").equals("loopmission_state_table")) {
|
||||||
|
Set<Long> missionIds = SyncData.getThreadlocalLoopUpdateMissionIds();
|
||||||
|
for(Record updateData:updateDatas) {
|
||||||
|
missionIds.add(updateData.getLong(record.getStr("id_name")));
|
||||||
|
}
|
||||||
|
SyncData.setThreadlocalLoopUpdateMissionIds(missionIds);
|
||||||
|
}
|
||||||
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
|
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
|
||||||
updateDatas, record.getInt("batch_size"));
|
updateDatas, record.getInt("batch_size"));
|
||||||
}
|
}
|
||||||
@@ -313,6 +359,11 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
logger.error("分库同步主库数据当前操作的异常表名为:"+errorTableName);
|
logger.error("分库同步主库数据当前操作的异常表名为:"+errorTableName);
|
||||||
logger.error("分库数据同步主库发生错误 异常信息为:"+e.getMessage());
|
logger.error("分库数据同步主库发生错误 异常信息为:"+e.getMessage());
|
||||||
logger.error("分库数据同步主库发生错误 异常信息",e);
|
logger.error("分库数据同步主库发生错误 异常信息",e);
|
||||||
|
// 如果出现异常信息 清楚线程变量 不用进行统计
|
||||||
|
SyncData.removeThreadlocalUpdateMissionIds();
|
||||||
|
SyncData.removeThreadlocalInsertMissionIds();
|
||||||
|
SyncData.removeThreadlocalLoopUpdateMissionIds();
|
||||||
|
SyncData.removeThreadlocalLoopInsertMissionIds();
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user