From ea505405634aeb193737fb5342ca6e4f9b4846cb Mon Sep 17 00:00:00 2001 From: default Date: Wed, 17 Oct 2018 18:59:31 +0800 Subject: [PATCH] =?UTF-8?q?update=20=20=E6=8C=87=E5=AE=9A=E5=AD=97?= =?UTF-8?q?=E6=AE=B5=E5=90=8D=E6=96=B0=E5=A2=9E=E6=88=96=E8=80=85=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/nms/thread/SyncSlaveToMasterThread.java | 13 ++++++++++--- nms_sync/src/com/nms/thread/SyncThread.java | 14 +++++++++++--- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java index f0be702..131905a 100644 --- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java +++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java @@ -43,6 +43,13 @@ public class SyncSlaveToMasterThread implements Runnable{ //logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find)); if (find != null && find.size() > 0) { for (final Record record : find) { + //如果设定指定字段 则只操作指定字段数据 无则操作全部 + final StringBuffer columns=new StringBuffer(); + columns.append("*"); + if(null!=record.getStr("columns")&&!"".equals(record.getStr("columns"))) { + columns.setLength(0); + columns.append(record.getStr("columns")); + } // 循环同步数据标识 boolean flag = true; // 判断表中的event事件 1代表insert 2代表update 3代表delete @@ -51,7 +58,7 @@ public class SyncSlaveToMasterThread implements Runnable{ while (flag) { // 新增操作 取出最后更新id信息 查询增量数据 final List data = Db.use(url.toString()) - .find("select * from " + record.getStr("table_name") + " where " + .find("select "+columns.toString()+" from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"), record.getInt("last_id")); //logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data)); @@ -122,7 +129,7 @@ public class SyncSlaveToMasterThread implements Runnable{ } } List insertDatas = Db.use(url.toString()) - .find(" select * from " + record.getStr("table_name") + " where " + .find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " in (" + insertStr + ") ", insertIds.toArray()); for(Record insertData:insertDatas){ @@ -183,7 +190,7 @@ public class SyncSlaveToMasterThread implements Runnable{ logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds)); if (record.getInt("event") == 2) { List updateDatas = Db.use(url.toString()) - .find(" select * from " + record.getStr("table_name") + " where " + .find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " in (" + handleStr + ") ", updateIds.toArray()); //logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas)); diff --git a/nms_sync/src/com/nms/thread/SyncThread.java b/nms_sync/src/com/nms/thread/SyncThread.java index 2014ce5..5e11dd7 100644 --- a/nms_sync/src/com/nms/thread/SyncThread.java +++ b/nms_sync/src/com/nms/thread/SyncThread.java @@ -47,6 +47,13 @@ public class SyncThread implements Runnable { //logger.info("查询主库须向分库同步数据信息"+JSON.toJSONString(find)); if (find != null && find.size() > 0) { for (final Record record : find) { + //如果设定指定字段 则只操作指定字段数据 无则操作全部 + final StringBuffer columns=new StringBuffer(); + columns.append("*"); + if(null!=record.getStr("columns")&&!"".equals(record.getStr("columns"))) { + columns.setLength(0); + columns.append(record.getStr("columns")); + } // 循环同步数据标识 boolean flag = true; // 判断表中的event事件 1代表insert 2代表update 3代表delete @@ -56,7 +63,7 @@ public class SyncThread implements Runnable { while (flag) { // 查询增量数据 final List data =Db.use("masterDataSource") - .find("select * from " + record.getStr("table_name") + " where " + .find("select " +columns.toString()+ " from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit "+record.getInt("batch_size"), record.getLong("last_id")); //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data)); @@ -118,7 +125,7 @@ public class SyncThread implements Runnable { } } List insertDatas = Db.use("masterDataSource") - .find(" select * from " + record.getStr("table_name") + " where " + .find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " in (" + insertStr + ") ", insertIds.toArray()); for(Record insertData:insertDatas){ @@ -151,6 +158,7 @@ public class SyncThread implements Runnable { } } } else if (record.getInt("event") == 2 || record.getInt("event") == 3) { + // table_event_log sync_db_info两表查询获取修改数据信息 进行数据修改 while (flag) { final List datas = Db.find( @@ -180,7 +188,7 @@ public class SyncThread implements Runnable { logger.info("获取所有操作的数据id信息为"+JSON.toJSONString(updateIds)); if (record.getInt("event") == 2) { List updateDatas = Db.use("masterDataSource") - .find(" select * from " + record.getStr("table_name") + " where " + .find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " in (" + handleStr + ") ", updateIds.toArray()); //logger.info("获取所有修改数据的数据信息为"+JSON.toJSONString(updateDatas));