From 94f6853323dac772778320aefaf1ae530795f82a Mon Sep 17 00:00:00 2001 From: default Date: Mon, 8 Oct 2018 15:44:24 +0800 Subject: [PATCH] update --- nms_sync/src/com/nms/main/SyncData.java | 4 +- .../nms/thread/SyncSlaveToMasterThread.java | 46 +++++++++++++++---- nms_sync/src/com/nms/thread/SyncThread.java | 46 +++++++++++++++---- 3 files changed, 76 insertions(+), 20 deletions(-) diff --git a/nms_sync/src/com/nms/main/SyncData.java b/nms_sync/src/com/nms/main/SyncData.java index a2970ad..d78727c 100644 --- a/nms_sync/src/com/nms/main/SyncData.java +++ b/nms_sync/src/com/nms/main/SyncData.java @@ -54,10 +54,10 @@ public class SyncData{ // 主库向分库同步数据 SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo)); logger.info("创建主库同步分库线程执行任务"); - //scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS); + scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS); // 分库向主库同步数据 logger.info("创建分库数据同步到主库线程执行任务"); - scheduleService.scheduleWithFixedDelay(new SyncSlaveToMasterThread(syncDbInfo), 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS); + //scheduleService.scheduleWithFixedDelay(new SyncSlaveToMasterThread(syncDbInfo), 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS); } }else{ logger.info("获取同步记录信息失败 请检查数据库数据信息"); diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java index f07bba5..603b76f 100644 --- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java +++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java @@ -108,10 +108,14 @@ public class SyncSlaveToMasterThread implements Runnable{ insertIds.toArray()); for(Record insertData:insertDatas){ Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); - insertData.set(record.getStr("id_name"), seqData.getLong("seqId")); if(record.getStr("table_name").equals("event_record_library")) { + //设置数据状态为同步数据 insertData.set("sync_status",1); + //设置同步数据所在数据库的id以及所在原来表的id 用于修改和删除 -1为中心或主库 + insertData.set("old_id",insertData.getLong(record.getStr("id_name"))); + insertData.set("db_id", syncDbInfo.getInt("id")); } + insertData.set(record.getStr("id_name"), seqData.getLong("seqId")); } Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 @@ -147,30 +151,54 @@ public class SyncSlaveToMasterThread implements Runnable{ @Override public boolean run() throws SQLException { List updateIds = new ArrayList(); - StringBuffer deleteStr = new StringBuffer(); + StringBuffer handleStr = new StringBuffer(); for (int i = 0; i < datas.size(); i++) { updateIds.add(datas.get(i).getInt("target_id")); if (i == 0) { - deleteStr.append("?"); + handleStr.append("?"); } else { - deleteStr.append(",?"); + handleStr.append(",?"); } } logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds)); if (record.getInt("event") == 2) { List updateDatas = Db.use(url.toString()) .find(" select * from " + record.getStr("table_name") + " where " - + record.getStr("id_name") + " in (" + deleteStr + ") ", + + record.getStr("id_name") + " in (" + handleStr + ") ", updateIds.toArray()); logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas)); if (updateDatas != null && updateDatas.size() > 0) { - Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"), - updateDatas, record.getInt("batch_size")); + if(record.getStr("table_name").equals("event_record_library")) { + for(Record updateData:updateDatas) { + updateData.set("old_id",updateData.getLong("id")); + updateData.set("db_id", syncDbInfo.get("id")); + updateData.remove("id"); + } + Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size")); + }else { + Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"), + updateDatas, record.getInt("batch_size")); + } } logger.info("分库同步主库修改数据任务完成"); } else if (record.getInt("event") == 3) { - Db.use("masterDataSource").update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in (" - + deleteStr + ") ", updateIds.toArray()); + if(record.getStr("table_name").equals("event_record_library")) { + updateIds.clear(); + handleStr.delete(0,handleStr.length()); + for (int i = 0; i < datas.size(); i++) { + updateIds.add(datas.get(i).getInt("target_id")); + updateIds.add(syncDbInfo.getInt("id")); + if (i == 0) { + handleStr.append("(?,?)"); + } else { + handleStr.append(",(?,?)"); + } + } + Db.use("masterDataSource").update("delete from event_record_library where (old_id,db_id) in ("+handleStr+")",updateIds.toArray()); + }else { + Db.use("masterDataSource").update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in (" + + handleStr + ") ", updateIds.toArray()); + } logger.info("分库同步主库删除数据任务完成"); } Object lastUpdateId = datas.get(datas.size() - 1).get("id"); diff --git a/nms_sync/src/com/nms/thread/SyncThread.java b/nms_sync/src/com/nms/thread/SyncThread.java index 5081eb4..0141f0f 100644 --- a/nms_sync/src/com/nms/thread/SyncThread.java +++ b/nms_sync/src/com/nms/thread/SyncThread.java @@ -118,10 +118,14 @@ public class SyncThread implements Runnable { insertIds.toArray()); for(Record insertData:insertDatas){ Record seqData = Db.use(url.toString()).findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); - insertData.set(record.getStr("id_name"), seqData.getLong("seqId")); if(record.getStr("table_name").equals("event_record_library")) { + //设置数据状态为同步数据 insertData.set("sync_status",1); + //设置同步数据所在数据库的id以及所在原来表的id 用于修改和删除 -1为中心或主库 + insertData.set("old_id",insertData.getLong(record.getStr("id_name"))); + insertData.set("db_id", -1); } + insertData.set(record.getStr("id_name"), seqData.getLong("seqId")); } Db.use(url.toString()).batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 @@ -158,30 +162,54 @@ public class SyncThread implements Runnable { @Override public boolean run() throws SQLException { List updateIds = new ArrayList(); - StringBuffer deleteStr = new StringBuffer(); + StringBuffer handleStr = new StringBuffer(); for (int i = 0; i < datas.size(); i++) { updateIds.add(datas.get(i).getInt("target_id")); if (i == 0) { - deleteStr.append("?"); + handleStr.append("?"); } else { - deleteStr.append(",?"); + handleStr.append(",?"); } } logger.info("获取所有操作的数据id信息为"+JSON.toJSONString(updateIds)); if (record.getInt("event") == 2) { List updateDatas = Db.use("masterDataSource") .find(" select * from " + record.getStr("table_name") + " where " - + record.getStr("id_name") + " in (" + deleteStr + ") ", + + record.getStr("id_name") + " in (" + handleStr + ") ", updateIds.toArray()); //logger.info("获取所有修改数据的数据信息为"+JSON.toJSONString(updateDatas)); if (updateDatas != null && updateDatas.size() > 0) { - Db.use(url.toString()).batchUpdate(record.getStr("table_name"), record.getStr("id_name"), - updateDatas, record.getInt("batch_size")); + if(record.getStr("table_name").equals("event_record_library")) { + for(Record updateData:updateDatas) { + updateData.set("old_id",updateData.getLong("id")); + updateData.set("db_id", -1); + updateData.remove("id"); + } + Db.use(url.toString()).batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size")); + }else { + Db.use(url.toString()).batchUpdate(record.getStr("table_name"), record.getStr("id_name"), + updateDatas, record.getInt("batch_size")); + } } logger.info("分库对主库修改操作的数据同步任务完成"); } else if (record.getInt("event") == 3) { - Db.use(url.toString()).update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in (" - + deleteStr + ") ", updateIds.toArray()); + if(record.getStr("table_name").equals("event_record_library")) { + updateIds.clear(); + handleStr.delete(0,handleStr.length()); + for (int i = 0; i < datas.size(); i++) { + updateIds.add(datas.get(i).getInt("target_id")); + updateIds.add(-1); + if (i == 0) { + handleStr.append("(?,?)"); + } else { + handleStr.append(",(?,?)"); + } + } + Db.use(url.toString()).update("delete from event_record_library where (old_id,db_id) in ("+handleStr+")",updateIds.toArray()); + }else { + Db.use(url.toString()).update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in (" + + handleStr + ") ", updateIds.toArray()); + } logger.info("分库对主库删除操作的数据同步完成"); } Object lastUpdateId = datas.get(datas.size() - 1).get("id");