diff --git a/nms_sync/bin/db.properties b/nms_sync/bin/db.properties index 7b577b9..69071e9 100644 --- a/nms_sync/bin/db.properties +++ b/nms_sync/bin/db.properties @@ -1,6 +1,6 @@ #dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}] #\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740 -dburl=jdbc:mysql://192.168.11.153:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true +dburl=jdbc:mysql://192.168.10.182:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true #\u6570\u636e\u5e93\u8d26\u6237\u540d dbusername=nms #\u6570\u636e\u5e93\u5bc6\u7801 diff --git a/nms_sync/conf/db.properties b/nms_sync/conf/db.properties index 7b577b9..69071e9 100644 --- a/nms_sync/conf/db.properties +++ b/nms_sync/conf/db.properties @@ -1,6 +1,6 @@ #dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}] #\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740 -dburl=jdbc:mysql://192.168.11.153:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true +dburl=jdbc:mysql://192.168.10.182:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true #\u6570\u636e\u5e93\u8d26\u6237\u540d dbusername=nms #\u6570\u636e\u5e93\u5bc6\u7801 diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java index e1664cc..91949c7 100644 --- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java +++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java @@ -43,6 +43,7 @@ public class SyncSlaveToMasterThread implements Runnable{ //logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find)); if (find != null && find.size() > 0) { for (final Record record : find) { + logger.info("分库数据同步到主库 正在操作的表名为:"+ record.getStr("table_name")); //如果设定指定字段 则只操作指定字段数据 无则操作全部 final StringBuffer columns=new StringBuffer(); columns.append("*"); @@ -71,32 +72,65 @@ public class SyncSlaveToMasterThread implements Runnable{ } // 针对监测结果表的id值 自动生成处理 if("detection_info_new".equals(record.getStr("table_name"))) { - for(Record entity:data) { - Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); - entity.set("old_id",entity.getLong(record.getStr("id_name"))); - entity.set("db_id", syncDbInfo.getInt("id")); - entity.set(record.getStr("id_name"), seqData.getLong("seqId")); - } + Db.use(url.toString()).tx(new IAtom() { + @Override + public boolean run() throws SQLException { + // TODO Auto-generated method stub + return Db.use("masterDataSource").tx(new IAtom() { + @Override + public boolean run() throws SQLException { + final List insertDatas=new ArrayList(); + final List updateDatas=new ArrayList(); + for(Record entity:data) { + // 循环遍历数据 判断当前数据是新增还是修改 + Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",entity.get("DETECTION_SET_INFO_ID"),entity.get("SEQ_ID")); + if(null!=findFirst) { + entity.set(record.getStr("id_name"),findFirst.getStr("id_name")); + updateDatas.add(entity); + }else { + Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); + entity.set(record.getStr("id_name"), seqData.getLong("seqId")); + insertDatas.add(entity); + } + } + if(insertDatas.size()>0) { + Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); + } + if(updateDatas.size()>0) { + Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"), + updateDatas, record.getInt("batch_size")); + } + logger.info("分库同步 detection_info_new 增量更新数据完成 表名为"+record.getStr("table_name")); + logger.info("分库同步 detection_info_new 最后数据的id信息为"+JSON.toJSONString(lastInsertId)); + record.set("last_id", lastInsertId); + record.set("last_date", new Date()); + Db.use(url.toString()).update("table_sync_info", record); + return true; + } + }); + } + }); + }else { + Db.use(url.toString()).tx(new IAtom() { + @Override + public boolean run() throws SQLException { + return Db.use("masterDataSource").tx(new IAtom() { + @Override + public boolean run() throws SQLException { + Db.use("masterDataSource").batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); + // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 + logger.info("分库同步增量更新数据完成 表名为"+record.getStr("table_name")); + logger.info("分库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId)); + record.set("last_id", lastInsertId); + record.set("last_date", new Date()); + Db.use(url.toString()).update("table_sync_info", record); + return true; + } + }); + } + }); + logger.info("分库同步增量更新数据完成 修改最后同步ID"); } - Db.use(url.toString()).tx(new IAtom() { - @Override - public boolean run() throws SQLException { - return Db.use("masterDataSource").tx(new IAtom() { - @Override - public boolean run() throws SQLException { - Db.use("masterDataSource").batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); - // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 - logger.info("分库同步增量更新数据完成 表名为"+record.getStr("table_name")); - logger.info("分库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId)); - record.set("last_id", lastInsertId); - record.set("last_date", new Date()); - Db.use(url.toString()).update("table_sync_info", record); - return true; - } - }); - } - }); - logger.info("分库同步增量更新数据完成 修改最后同步ID"); } else { flag = false; } @@ -195,7 +229,7 @@ public class SyncSlaveToMasterThread implements Runnable{ updateIds.toArray()); //logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas)); if (updateDatas != null && updateDatas.size() > 0) { - if(record.getStr("table_name").equals("event_record_library")||record.getStr("table_name").equals("detection_info_new")) { + if(record.getStr("table_name").equals("event_record_library")) { for(Record updateData:updateDatas) { updateData.set("old_id",updateData.getLong(record.getStr("id_name"))); updateData.set("db_id", syncDbInfo.get("id")); @@ -204,6 +238,12 @@ public class SyncSlaveToMasterThread implements Runnable{ } Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size")); }else { + if(record.getStr("table_name").equals("detection_info_new")) { + for(Record updateData:updateDatas) { + Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",updateData.get("DETECTION_SET_INFO_ID"),updateData.get("SEQ_ID")); + updateData.set(record.getStr("id_name"),findFirst.getLong(record.getStr("id_name"))); + } + } Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"), updateDatas, record.getInt("batch_size")); }