From 57f8111c942d2f3919d4f0879926d960c245c88e Mon Sep 17 00:00:00 2001 From: default Date: Sat, 29 Sep 2018 11:38:05 +0800 Subject: [PATCH] update --- nms_sync/bin/db.properties | 2 +- nms_sync/conf/db.properties | 2 +- nms_sync/src/com/nms/main/SyncData.java | 4 +-- nms_sync/src/com/nms/test/TestClass.java | 19 ++++++++----- .../nms/thread/SyncSlaveToMasterThread.java | 27 ++++++++++--------- nms_sync/src/com/nms/thread/SyncThread.java | 23 ++++++++-------- 6 files changed, 42 insertions(+), 35 deletions(-) diff --git a/nms_sync/bin/db.properties b/nms_sync/bin/db.properties index f9730b4..2ab0803 100644 --- a/nms_sync/bin/db.properties +++ b/nms_sync/bin/db.properties @@ -1,6 +1,6 @@ #dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}] #\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740 -dburl=jdbc:mysql://10.0.6.247:3306/nms-dev +dburl=jdbc:mysql://192.168.10.126:3306/nms-dev #\u6570\u636e\u5e93\u8d26\u6237\u540d dbusername=root #\u6570\u636e\u5e93\u5bc6\u7801 diff --git a/nms_sync/conf/db.properties b/nms_sync/conf/db.properties index f9730b4..2ab0803 100644 --- a/nms_sync/conf/db.properties +++ b/nms_sync/conf/db.properties @@ -1,6 +1,6 @@ #dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}] #\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740 -dburl=jdbc:mysql://10.0.6.247:3306/nms-dev +dburl=jdbc:mysql://192.168.10.126:3306/nms-dev #\u6570\u636e\u5e93\u8d26\u6237\u540d dbusername=root #\u6570\u636e\u5e93\u5bc6\u7801 diff --git a/nms_sync/src/com/nms/main/SyncData.java b/nms_sync/src/com/nms/main/SyncData.java index d78727c..a2970ad 100644 --- a/nms_sync/src/com/nms/main/SyncData.java +++ b/nms_sync/src/com/nms/main/SyncData.java @@ -54,10 +54,10 @@ public class SyncData{ // 主库向分库同步数据 SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo)); logger.info("创建主库同步分库线程执行任务"); - scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS); + //scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS); // 分库向主库同步数据 logger.info("创建分库数据同步到主库线程执行任务"); - //scheduleService.scheduleWithFixedDelay(new SyncSlaveToMasterThread(syncDbInfo), 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS); + scheduleService.scheduleWithFixedDelay(new SyncSlaveToMasterThread(syncDbInfo), 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS); } }else{ logger.info("获取同步记录信息失败 请检查数据库数据信息"); diff --git a/nms_sync/src/com/nms/test/TestClass.java b/nms_sync/src/com/nms/test/TestClass.java index c9190ed..b07f5ce 100644 --- a/nms_sync/src/com/nms/test/TestClass.java +++ b/nms_sync/src/com/nms/test/TestClass.java @@ -45,9 +45,9 @@ public class TestClass { private static Connection getNmsConnection(){ String driver ="com.mysql.jdbc.Driver"; - String url="jdbc:mysql://10.0.6.247:3306/nms"; - String username="nms"; - String password="nms"; + String url="jdbc:mysql://192.168.10.126:3306/nms-dev"; + String username="root"; + String password="111111"; Connection conn=null; try{ Class.forName(driver); @@ -91,16 +91,21 @@ public class TestClass { @Test public void importData(){ Connection nmsConn = getNmsConnection(); - Connection conn = getConnection(); PreparedStatement pstmt=null; PreparedStatement pstmt2=null; try { pstmt=(PreparedStatement)nmsConn.prepareStatement("select table_name as tn from information_schema.tables where TABLE_SCHEMA='nms'"); ResultSet resultSet = pstmt.executeQuery(); - pstmt2=(PreparedStatement)conn.prepareStatement("insert into table_sync_info (table_name,event,last_id,last_date,db_id,mode) values (?,?,?,?,?,?)"); + pstmt2=(PreparedStatement)nmsConn.prepareStatement("insert into table_sync_info (table_name,event,last_id,last_date,db_id,mode) values (?,?,?,?,?,?)"); while(resultSet.next()){ String tableName = resultSet.getString("tn"); - + if(tableName.indexOf("d")==0) { + continue; + }else if(tableName.indexOf("m")==0) { + continue; + }else if(tableName.indexOf("v_")==0) { + continue; + } pstmt2.setString(1, tableName); pstmt2.setInt(2, 1); pstmt2.setInt(3, 1); @@ -144,7 +149,7 @@ public class TestClass { e.printStackTrace(); } try { - conn.close(); + nmsConn.close(); } catch (SQLException e) { e.printStackTrace(); } diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java index fcf5f5d..f07bba5 100644 --- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java +++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java @@ -30,10 +30,11 @@ public class SyncSlaveToMasterThread implements Runnable{ // 主库向分库同步数据 logger.info("开始分库数据同步主库"); // 获取url路径 - final String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" - + syncDbInfo.get("database_name"); + final StringBuffer url=new StringBuffer(); + url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" + + syncDbInfo.get("database_name")); logger.info("当前分库数据库连接为"+url); - List find = Db.use(url).find("select * from table_sync_info"); + List find = Db.use(url.toString()).find("select * from table_sync_info"); logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find)); if (find != null && find.size() > 0) { for (final Record record : find) { @@ -44,13 +45,13 @@ public class SyncSlaveToMasterThread implements Runnable{ if(record.getInt("mode").equals(1)){ while (flag) { // 新增操作 取出最后更新id信息 查询增量数据 - final List data = Db.use(url) + final List data = Db.use(url.toString()) .find("select * from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"), record.getInt("last_id")); logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data)); if (data != null && data.size() > 0) { - Db.use(url).tx(new IAtom() { + Db.use(url.toString()).tx(new IAtom() { @Override public boolean run() throws SQLException { return Db.use().tx(new IAtom() { @@ -63,7 +64,7 @@ public class SyncSlaveToMasterThread implements Runnable{ logger.info("主库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); - Db.use(url).update("table_sync_info", record); + Db.use(url.toString()).update("table_sync_info", record); return true; } }); @@ -78,14 +79,14 @@ public class SyncSlaveToMasterThread implements Runnable{ //当数据库表结构主键不是自增时 增量更新的操作步骤 while (flag) { // 新增操作 取出最后更新id信息 查询增量数据 - final List data =Db.use(url) + final List data =Db.use(url.toString()) .find("select * from table_event_log where table_name = '" + record.getStr("table_name") + "' and id > " + record.getLong("last_id") + " and event = " + record.getInt("event") + " order by id asc limit "+record.getInt("batch_size")); //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data)); if (data != null && data.size() > 0) { //多数据源事务 主数据源嵌套子数据源 - Db.use(url).tx(new IAtom() { + Db.use(url.toString()).tx(new IAtom() { @Override public boolean run() throws SQLException { return Db.use().tx(new IAtom() { @@ -101,7 +102,7 @@ public class SyncSlaveToMasterThread implements Runnable{ insertStr.append(",?"); } } - List insertDatas = Db.use(url) + List insertDatas = Db.use(url.toString()) .find(" select * from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " in (" + insertStr + ") ", insertIds.toArray()); @@ -118,7 +119,7 @@ public class SyncSlaveToMasterThread implements Runnable{ logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); - Db.use(url).update("table_sync_info", record); + Db.use(url.toString()).update("table_sync_info", record); return true; } }); @@ -139,7 +140,7 @@ public class SyncSlaveToMasterThread implements Runnable{ + record.getInt("event") + " order by id asc limit " + record.getInt("batch_size")); logger.info("分库同步到主库数据的修改或者删除数据信息为"+JSON.toJSONString(datas)); if (datas != null && datas.size() > 0) { - Db.use(url).tx(new IAtom() { + Db.use(url.toString()).tx(new IAtom() { @Override public boolean run() throws SQLException { return Db.use().tx(new IAtom() { @@ -157,7 +158,7 @@ public class SyncSlaveToMasterThread implements Runnable{ } logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds)); if (record.getInt("event") == 2) { - List updateDatas = Db.use(url) + List updateDatas = Db.use(url.toString()) .find(" select * from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " in (" + deleteStr + ") ", updateIds.toArray()); @@ -176,7 +177,7 @@ public class SyncSlaveToMasterThread implements Runnable{ logger.info("获取最后一次操作数据的数据ID信息为"+lastUpdateId); record.set("last_id", lastUpdateId); record.set("last_date", new Date()); - Db.use(url).update("table_sync_info", record); + Db.use(url.toString()).update("table_sync_info", record); return true; } }); diff --git a/nms_sync/src/com/nms/thread/SyncThread.java b/nms_sync/src/com/nms/thread/SyncThread.java index 378ed13..5081eb4 100644 --- a/nms_sync/src/com/nms/thread/SyncThread.java +++ b/nms_sync/src/com/nms/thread/SyncThread.java @@ -19,7 +19,7 @@ import com.jfinal.plugin.activerecord.tx.Tx; * @author Administrator * */ -@Before({/*SyncDataInterceptor.class,*/Tx.class}) +@Before({SyncDataInterceptor.class,Tx.class}) public class SyncThread implements Runnable { private Logger logger = Logger.getLogger(this.getClass()); private SyncDbInfo syncDbInfo; @@ -38,8 +38,9 @@ public class SyncThread implements Runnable { try { logger.info("开始主库数据同步分库任务"); // 获取url路径 - final String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" - + syncDbInfo.get("database_name"); + final StringBuffer url=new StringBuffer(); + url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" + + syncDbInfo.get("database_name")); logger.info("获取分库数据库连接信息"+url); List find = Db.use("masterDataSource").find("select * from table_sync_info where db_id=? ", syncDbInfo.get("id")); @@ -64,10 +65,10 @@ public class SyncThread implements Runnable { Db.use().tx(new IAtom() { @Override public boolean run() throws SQLException { - return Db.use(url).tx(new IAtom() { + return Db.use(url.toString()).tx(new IAtom() { @Override public boolean run() throws SQLException { - Db.use(url).batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); + Db.use(url.toString()).batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name")); logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); @@ -98,7 +99,7 @@ public class SyncThread implements Runnable { Db.use().tx(new IAtom() { @Override public boolean run() throws SQLException { - return Db.use(url).tx(new IAtom() { + return Db.use(url.toString()).tx(new IAtom() { @Override public boolean run() throws SQLException { List insertIds = new ArrayList(); @@ -116,13 +117,13 @@ public class SyncThread implements Runnable { + record.getStr("id_name") + " in (" + insertStr + ") ", insertIds.toArray()); for(Record insertData:insertDatas){ - Record seqData = Db.use(url).findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); + Record seqData = Db.use(url.toString()).findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); insertData.set(record.getStr("id_name"), seqData.getLong("seqId")); if(record.getStr("table_name").equals("event_record_library")) { insertData.set("sync_status",1); } } - Db.use(url).batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); + Db.use(url.toString()).batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 Object lastInsertId = data.get(data.size() - 1).get("id"); logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); @@ -153,7 +154,7 @@ public class SyncThread implements Runnable { Db.use().tx(new IAtom() { @Override public boolean run() throws SQLException { - return Db.use(url).tx(new IAtom() { + return Db.use(url.toString()).tx(new IAtom() { @Override public boolean run() throws SQLException { List updateIds = new ArrayList(); @@ -174,12 +175,12 @@ public class SyncThread implements Runnable { updateIds.toArray()); //logger.info("获取所有修改数据的数据信息为"+JSON.toJSONString(updateDatas)); if (updateDatas != null && updateDatas.size() > 0) { - Db.use(url).batchUpdate(record.getStr("table_name"), record.getStr("id_name"), + Db.use(url.toString()).batchUpdate(record.getStr("table_name"), record.getStr("id_name"), updateDatas, record.getInt("batch_size")); } logger.info("分库对主库修改操作的数据同步任务完成"); } else if (record.getInt("event") == 3) { - Db.use(url).update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in (" + Db.use(url.toString()).update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in (" + deleteStr + ") ", updateIds.toArray()); logger.info("分库对主库删除操作的数据同步完成"); }