From 0a5c6f53b7710a76d0ff1ed70a6387d5c557b8ee Mon Sep 17 00:00:00 2001 From: default Date: Fri, 9 Nov 2018 14:05:25 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=87=AA=E5=8A=A8=E6=B3=A8?= =?UTF-8?q?=E5=86=8C=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../nms/thread/SyncSlaveToMasterThread.java | 13 +++++++++- nms_sync/src/com/nms/thread/SyncThread.java | 24 ++++++++++++++++++- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java index 907f4b3..93f31d1 100644 --- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java +++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java @@ -48,6 +48,11 @@ public class SyncSlaveToMasterThread implements Runnable{ for (final Record record : find) { //logger.info("分库数据同步到主库 正在操作的表名为:"+ record.getStr("table_name")); errorTableName=record.getStr("table_name"); + //针对个别特殊表动态条件 + String special=""; + if(record.getStr("table_name").equals("node_table")) { + special=" and sync_status!=1 "; + } //如果设定指定字段 则只操作指定字段数据 无则操作全部 final StringBuffer columns=new StringBuffer(); columns.append("*"); @@ -64,7 +69,7 @@ public class SyncSlaveToMasterThread implements Runnable{ // 新增操作 取出最后更新id信息 查询增量数据 final List data = Db.use(url.toString()) .find("select "+columns.toString()+" from " + record.getStr("table_name") + " where " - + record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"), + + record.getStr("id_name") + " > ? "+special+" order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"), record.getLong("last_id")); //logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data)); if (data != null && data.size() > 0) { @@ -81,6 +86,12 @@ public class SyncSlaveToMasterThread implements Runnable{ entity.remove(record.getStr("id_name")); } } + + if(record.getStr("table_name").equals("node_table")) { + for(Record entity:data) { + entity.set("sync_status",1); + } + } // 针对监测结果表的id值 自动生成处理 if("detection_info_new".equals(record.getStr("table_name"))) { Db.use(url.toString()).tx(new IAtom() { diff --git a/nms_sync/src/com/nms/thread/SyncThread.java b/nms_sync/src/com/nms/thread/SyncThread.java index a08717e..19f0bdb 100644 --- a/nms_sync/src/com/nms/thread/SyncThread.java +++ b/nms_sync/src/com/nms/thread/SyncThread.java @@ -51,6 +51,11 @@ public class SyncThread implements Runnable { for (final Record record : find) { //logger.info("主库数据同步到分库 正在操作的表名为:"+ record.getStr("table_name")); errorTableName=record.getStr("table_name"); + //针对个别特殊表动态条件 + String special=""; + if(record.getStr("table_name").equals("node_table")) { + special=" and sync_status!=1 "; + } //如果设定指定字段 则只操作指定字段数据 无则操作全部 final StringBuffer columns=new StringBuffer(); columns.append("*"); @@ -68,7 +73,7 @@ public class SyncThread implements Runnable { // 查询增量数据 final List data =Db.use("masterDataSource") .find("select " +columns.toString()+ " from " + record.getStr("table_name") + " where " - + record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit "+record.getInt("batch_size"), + + record.getStr("id_name") + " > ? "+special+" order by "+record.getStr("id_name")+" asc limit "+record.getInt("batch_size"), record.getLong("last_id")); //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data)); if (data != null && data.size() > 0) { @@ -78,6 +83,11 @@ public class SyncThread implements Runnable { entity.remove(record.getStr("id_name")); } } + if(record.getStr("table_name").equals("node_table")) { + for(Record entity:data) { + entity.set("sync_status",1); + } + } //多数据源事务 主数据源嵌套子数据源 Db.use().tx(new IAtom() { @Override @@ -206,6 +216,18 @@ public class SyncThread implements Runnable { } Db.use(url.toString()).batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size")); }else { + if(record.getStr("table_name").equals("node_table")) { + for(Record updateData:updateDatas) { + updateData.remove("sync_status"); + } + }else if(record.getStr("table_name").equals("mission_state_table")) { + // 主库同步任务状态表 如果状态修改不是周期停用的话 将状态字段的修改过滤 + for(Record updateData:updateDatas) { + if(!(updateData.getInt("mission_state").equals(5)||updateData.getInt("mission_state").equals(1))) { + updateData.remove("mission_state"); + } + } + } Db.use(url.toString()).batchUpdate(record.getStr("table_name"), record.getStr("id_name"), updateDatas, record.getInt("batch_size")); }