From d8d79129191548793ad2a59ff4c36c5bee7b41ed Mon Sep 17 00:00:00 2001 From: default Date: Tue, 20 Nov 2018 09:29:38 +0800 Subject: [PATCH] update --- nms_sync/bin/db.properties | 6 +- nms_sync/conf/db.properties | 6 +- .../nms/interceptor/SyncDataInterceptor.java | 44 +++++----- .../interceptor/SyncSocketInterceptor.java | 17 ++-- .../nms/thread/SyncSlaveToMasterThread.java | 80 ++++++++++--------- nms_sync/src/com/nms/thread/SyncThread.java | 43 +++++----- 6 files changed, 109 insertions(+), 87 deletions(-) diff --git a/nms_sync/bin/db.properties b/nms_sync/bin/db.properties index 5d43533..84039a1 100644 --- a/nms_sync/bin/db.properties +++ b/nms_sync/bin/db.properties @@ -1,6 +1,6 @@ #dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}] #\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740 -dburl=jdbc:mysql://192.168.10.182:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true +dburl=jdbc:mysql://192.168.11.67:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true #\u6570\u636e\u5e93\u8d26\u6237\u540d dbusername=nms #\u6570\u636e\u5e93\u5bc6\u7801 @@ -8,9 +8,9 @@ dbpassword=nms #\u6570\u636e\u5e93\u540d\u79f0 dbname=nms #\u8fde\u63a5\u6c60\u521d\u59cb\u5316\u5927\u5c0f -dbInitialSize=1 +dbInitialSize=3 #\u6700\u5927\u8fde\u63a5\u6570 -dbMaxActive=2 +dbMaxActive=6 #\u6700\u5c0f\u8fde\u63a5\u6570 dbMinIdle=1 #\u6700\u5927\u7b49\u5f85\u8fde\u63a5\u65f6\u95f4 diff --git a/nms_sync/conf/db.properties b/nms_sync/conf/db.properties index 5d43533..84039a1 100644 --- a/nms_sync/conf/db.properties +++ b/nms_sync/conf/db.properties @@ -1,6 +1,6 @@ #dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}] #\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740 -dburl=jdbc:mysql://192.168.10.182:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true +dburl=jdbc:mysql://192.168.11.67:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true #\u6570\u636e\u5e93\u8d26\u6237\u540d dbusername=nms #\u6570\u636e\u5e93\u5bc6\u7801 @@ -8,9 +8,9 @@ dbpassword=nms #\u6570\u636e\u5e93\u540d\u79f0 dbname=nms #\u8fde\u63a5\u6c60\u521d\u59cb\u5316\u5927\u5c0f -dbInitialSize=1 +dbInitialSize=3 #\u6700\u5927\u8fde\u63a5\u6570 -dbMaxActive=2 +dbMaxActive=6 #\u6700\u5c0f\u8fde\u63a5\u6570 dbMinIdle=1 #\u6700\u5927\u7b49\u5f85\u8fde\u63a5\u65f6\u95f4 diff --git a/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java b/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java index e47a404..233fde9 100644 --- a/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java +++ b/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java @@ -9,6 +9,7 @@ import com.alibaba.fastjson.JSON; import com.jfinal.aop.Interceptor; import com.jfinal.aop.Invocation; import com.jfinal.plugin.activerecord.Db; +import com.jfinal.plugin.activerecord.DbPro; import com.jfinal.plugin.activerecord.Record; import com.nms.model.SyncDbInfo; import com.nms.thread.SyncThread; @@ -25,8 +26,12 @@ public class SyncDataInterceptor implements Interceptor{ String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" + syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true"; logger.info("当前数据库连接为 "+url); + + DbPro masterDb = Db.use("masterDataSource"); + DbPro slaveDb = Db.use(url.toString()); + //同步数据前 取出metadata表中最后一次同步的id信息 获取新增的数据信息 方便接下来修改表结构 - Record metadataTableSyncInfo = Db.use("masterDataSource").findFirst("select * from table_sync_info where table_name='metadata' and event=1 and db_id=?",syncDbInfo.getId()); + Record metadataTableSyncInfo = masterDb.findFirst("select * from table_sync_info where table_name='metadata' and event=1 and db_id=?",syncDbInfo.getId()); logger.info("获取metadata表中最后一次同步id的数据信息为 "+JSON.toJSONString(metadataTableSyncInfo)); //开始执行同步过程 inv.invoke(); @@ -34,11 +39,11 @@ public class SyncDataInterceptor implements Interceptor{ //判断metadata表 是否有新增数据 如果有执行sql 修改表结构 if(metadataTableSyncInfo!=null){ - List metadatas = Db.use(url).find("select m.*,cti.table_name from metadata m left join check_type_info cti on m.check_type_id=cti.id where m.id > ? ",metadataTableSyncInfo.getLong("last_id")); + List metadatas = slaveDb.find("select m.*,cti.table_name from metadata m left join check_type_info cti on m.check_type_id=cti.id where m.id > ? ",metadataTableSyncInfo.getLong("last_id")); //logger.info("metadata表中新增数据信息查询结果为 "+JSON.toJSONString(metadatas)); if(metadatas!=null && metadatas.size()>0){ for(Record metadata:metadatas){ - Record isExist = Db.use(url).findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",metadata.getStr("table_name")); + Record isExist = slaveDb.findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",metadata.getStr("table_name")); logger.info("判断metadata表新增数据是修改还是新增表操作结果为"+JSON.toJSONString(isExist)); //向数据库中添加新的字段 if(isExist.getInt("count")>0){ @@ -48,17 +53,17 @@ public class SyncDataInterceptor implements Interceptor{ sqlString.append(metadata.getStr("filed_name")+" "+ toMysqlType(metadata.getStr("filed_type"))+")"); logger.info("修改metadata表结构 sql语句为"+sqlString.toString()); //执行添加字段 - int resu =Db.use(url).update(sqlString.toString()); + int resu =slaveDb.update(sqlString.toString()); logger.info("修改表结构结果为 "+resu); } } } } //判断check_type_info表 是否有新增数据 如果有执行存储过程 创建新表 - List checkTypeInfos = Db.use(url).find(" select * from check_type_info where crete_state=0 "); + List checkTypeInfos = slaveDb.find(" select * from check_type_info where crete_state=0 "); for(Record checkTypeInfo : checkTypeInfos){ //判断表是否存在 - Record isExist = Db.use(url).findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",checkTypeInfo.getStr("TABLE_NAME")); + Record isExist = slaveDb.findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",checkTypeInfo.getStr("TABLE_NAME")); logger.info("check_type_info表中有新增数据 判断表是否创建"+JSON.toJSONString(isExist)); if(isExist.getInt("count")>0){ continue; @@ -68,7 +73,7 @@ public class SyncDataInterceptor implements Interceptor{ String filedType =""; StringBuffer sql= new StringBuffer(); StringBuffer cIndexFileds = new StringBuffer("data_check_time:seq_id:detection_set_info_id:"); - List metadatas2 = Db.use(url).find("select * from metadata where 1=1 and check_type_id=? and state = '0' order by show_num asc",checkTypeInfo.getLong("ID")); + List metadatas2 = slaveDb.find("select * from metadata where 1=1 and check_type_id=? and state = '0' order by show_num asc",checkTypeInfo.getLong("ID")); if(metadatas2!=null && metadatas2.size()>0) { for(int i=0;i0) { + Record record = new Record(); + record.set("table_name", checkTypeInfo.getStr("TABLE_NAME")); + record.set("mode", 2); + record.set("id_name", "ID"); + record.set("event",1); + record.set("last_id", -1); + record.set("db_id", -1); + record.set("last_date", new Date()); + slaveDb.save("table_sync_info", record); + } } } diff --git a/nms_sync/src/com/nms/interceptor/SyncSocketInterceptor.java b/nms_sync/src/com/nms/interceptor/SyncSocketInterceptor.java index 32d6694..0f8dbee 100644 --- a/nms_sync/src/com/nms/interceptor/SyncSocketInterceptor.java +++ b/nms_sync/src/com/nms/interceptor/SyncSocketInterceptor.java @@ -14,6 +14,7 @@ import com.jfinal.aop.Interceptor; import com.jfinal.aop.Invocation; import com.jfinal.kit.PropKit; import com.jfinal.plugin.activerecord.Db; +import com.jfinal.plugin.activerecord.DbPro; import com.jfinal.plugin.activerecord.Record; import com.nms.model.SetInfo; import com.nms.model.SyncDbInfo; @@ -44,14 +45,18 @@ public class SyncSocketInterceptor implements Interceptor{ String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" + syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true"; logger.info("当前数据库连接为 "+url); - Record detectionSetInfoTableInfo = Db.use("masterDataSource").findFirst("select * from table_sync_info where table_name='detection_set_info' and event=1 and db_id=?",syncDbInfo.getId()); + + DbPro masterDb = Db.use("masterDataSource"); + DbPro slaveDb = Db.use(url.toString()); + + Record detectionSetInfoTableInfo = masterDb.findFirst("select * from table_sync_info where table_name='detection_set_info' and event=1 and db_id=?",syncDbInfo.getId()); logger.info("获取detectionSetInfo表中最后一次同步id的数据信息为 "+JSON.toJSONString(detectionSetInfoTableInfo)); - Record detectionSetInfoTableUpdateInfo = Db.use("masterDataSource").findFirst("select * from table_sync_info where table_name='detection_set_info' and event=2 and db_id=?",syncDbInfo.getId()); + Record detectionSetInfoTableUpdateInfo = masterDb.findFirst("select * from table_sync_info where table_name='detection_set_info' and event=2 and db_id=?",syncDbInfo.getId()); logger.info("获取detectionSetInfo表中最后一次同步修改id的数据信息为 "+JSON.toJSONString(detectionSetInfoTableUpdateInfo)); //查询所有修改数据信息 - List datas = Db.use("masterDataSource") + List datas = masterDb .find("select * from table_event_log where table_name = '" + detectionSetInfoTableUpdateInfo.getStr("table_name") + "' and id > " + detectionSetInfoTableUpdateInfo.getLong("last_id") + " and event = " + detectionSetInfoTableUpdateInfo.getInt("event") + " order by id asc "); @@ -71,7 +76,7 @@ public class SyncSocketInterceptor implements Interceptor{ Map map=new HashMap(); //查询修改前数据的信息 if(updateIds.size()>0) { - detectionSetUpdateInfos= Db.use(url).find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id in ("+handleStr.toString()+")",updateIds.toArray()); + detectionSetUpdateInfos= slaveDb.find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id in ("+handleStr.toString()+")",updateIds.toArray()); if(null!=detectionSetUpdateInfos&&detectionSetUpdateInfos.size()>0) { for(Record detectionSetInfo:detectionSetUpdateInfos) { map.put(detectionSetInfo.getLong("ID"),detectionSetInfo); @@ -88,7 +93,7 @@ public class SyncSocketInterceptor implements Interceptor{ // 查询出 修改或者更新的监测配置信息 如果没有的话不进行任何操作 if(detectionSetInfoTableInfo!=null) { - List detectionSetInfos = Db.use(url).find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id > ? ",detectionSetInfoTableInfo.getLong("last_id")); + List detectionSetInfos = slaveDb.find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id > ? ",detectionSetInfoTableInfo.getLong("last_id")); if(null!=detectionSetInfos&&detectionSetInfos.size()>0) { for(Record detectionSetInfo:detectionSetInfos) { SetInfo o=null; @@ -128,7 +133,7 @@ public class SyncSocketInterceptor implements Interceptor{ if(null!=detectionSetUpdateInfos&&detectionSetUpdateInfos.size()>0) { //查询出已经修改的监测配置数据 - List detectionSetInfos = Db.use(url).find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id in ("+handleStr.toString()+")",updateIds.toArray()); + List detectionSetInfos = slaveDb.find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id in ("+handleStr.toString()+")",updateIds.toArray()); if(null!=detectionSetInfos&&detectionSetInfos.size()>0) { for (Record detectionSetInfo : detectionSetInfos) { SetInfo o=null; diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java index 124bf4b..d3b0656 100644 --- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java +++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java @@ -12,12 +12,13 @@ import com.nms.main.SyncData; import com.alibaba.fastjson.JSON; import com.jfinal.aop.Before; import com.jfinal.plugin.activerecord.Db; +import com.jfinal.plugin.activerecord.DbPro; import com.jfinal.plugin.activerecord.IAtom; import com.jfinal.plugin.activerecord.Record; import com.jfinal.plugin.activerecord.tx.Tx; import com.nms.model.SyncDbInfo; -@Before({SyncMissionResultStatisticalInterceptor.class,Tx.class}) +@Before({SyncMissionResultStatisticalInterceptor.class}) public class SyncSlaveToMasterThread implements Runnable{ private Logger logger = Logger.getLogger(this.getClass()); private SyncDbInfo syncDbInfo; @@ -44,7 +45,11 @@ public class SyncSlaveToMasterThread implements Runnable{ url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" + syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true"); logger.info("当前分库数据库连接为"+url); - List find = Db.use(url.toString()).find("select * from table_sync_info"); + + final DbPro masterDb = Db.use("masterDataSource"); + final DbPro slaveDb = Db.use(url.toString()); + + List find = slaveDb.find("select * from table_sync_info"); //logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find)); if (find != null && find.size() > 0) { for (final Record record : find) { @@ -69,7 +74,7 @@ public class SyncSlaveToMasterThread implements Runnable{ if(record.getInt("mode").equals(1)||record.getInt("mode").equals(2)){ while (flag) { // 新增操作 取出最后更新id信息 查询增量数据 - final List data = Db.use(url.toString()) + final List data = slaveDb .find("select "+columns.toString()+" from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " > ? "+special+" order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"), record.getLong("last_id")); @@ -96,58 +101,57 @@ public class SyncSlaveToMasterThread implements Runnable{ } // 针对监测结果表的id值 自动生成处理 if("detection_info_new".equals(record.getStr("table_name"))) { - Db.use(url.toString()).tx(new IAtom() { + slaveDb.tx(new IAtom() { @Override public boolean run() throws SQLException { - // TODO Auto-generated method stub - return Db.use("masterDataSource").tx(new IAtom() { + return masterDb.tx(new IAtom() { @Override public boolean run() throws SQLException { final List insertDatas=new ArrayList(); final List updateDatas=new ArrayList(); for(Record entity:data) { // 循环遍历数据 判断当前数据是新增还是修改 - Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",entity.get("DETECTION_SET_INFO_ID"),entity.get("SEQ_ID")); + Record findFirst = masterDb.findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",entity.get("DETECTION_SET_INFO_ID"),entity.get("SEQ_ID")); if(null!=findFirst) { entity.set(record.getStr("id_name"),findFirst.getStr("id_name")); updateDatas.add(entity); }else { - Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); + Record seqData = masterDb.findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); entity.set(record.getStr("id_name"), seqData.getLong("seqId")); insertDatas.add(entity); } } if(insertDatas.size()>0) { - Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); + masterDb.batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); } if(updateDatas.size()>0) { - Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"), + masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"), updateDatas, record.getInt("batch_size")); } logger.info("分库同步 detection_info_new 增量更新数据完成 表名为"+record.getStr("table_name")); logger.info("分库同步 detection_info_new 最后数据的id信息为"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); - Db.use(url.toString()).update("table_sync_info", record); + slaveDb.update("table_sync_info", record); return true; } }); } }); }else { - Db.use(url.toString()).tx(new IAtom() { + slaveDb.tx(new IAtom() { @Override public boolean run() throws SQLException { - return Db.use("masterDataSource").tx(new IAtom() { + return masterDb.tx(new IAtom() { @Override public boolean run() throws SQLException { - Db.use("masterDataSource").batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); + masterDb.batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 logger.info("分库同步增量更新数据完成 表名为"+record.getStr("table_name")); logger.info("分库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); - Db.use(url.toString()).update("table_sync_info", record); + slaveDb.update("table_sync_info", record); return true; } }); @@ -163,17 +167,17 @@ public class SyncSlaveToMasterThread implements Runnable{ //当数据库表结构主键不是自增时 增量更新的操作步骤 while (flag) { // 新增操作 取出最后更新id信息 查询增量数据 - final List data =Db.use(url.toString()) + final List data =slaveDb .find("select * from table_event_log where table_name = '" + record.getStr("table_name") + "' and id > " + record.getLong("last_id") + " and event = " + record.getInt("event") + " order by id asc limit "+record.getInt("batch_size")); //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data)); if (data != null && data.size() > 0) { //多数据源事务 主数据源嵌套子数据源 - Db.use(url.toString()).tx(new IAtom() { + slaveDb.tx(new IAtom() { @Override public boolean run() throws SQLException { - return Db.use("masterDataSource").tx(new IAtom() { + return masterDb.tx(new IAtom() { @Override public boolean run() throws SQLException { List insertIds = new ArrayList(); @@ -186,14 +190,14 @@ public class SyncSlaveToMasterThread implements Runnable{ insertStr.append(",?"); } } - List insertDatas = Db.use(url.toString()) + List insertDatas = slaveDb .find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " in (" + insertStr + ") ", insertIds.toArray()); if(!record.getStr("table_name").equals("loopmission_state_table")) { for(Record insertData:insertDatas){ if(record.getStr("table_name").equals("event_record_library")) { - Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); + Record seqData = masterDb.findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); //设置数据状态为同步数据 insertData.set("sync_status",1); //设置同步数据所在数据库的id以及所在原来表的id 用于修改和删除 -1为中心或主库 @@ -202,13 +206,13 @@ public class SyncSlaveToMasterThread implements Runnable{ insertData.set(record.getStr("id_name"), seqData.getLong("seqId")); } } - Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); + masterDb.batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 Object lastInsertId = data.get(data.size() - 1).get("id"); logger.info("分库增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); - Db.use(url.toString()).update("table_sync_info", record); + slaveDb.update("table_sync_info", record); return true; }else{ Object lastInsertId = data.get(data.size() - 1).get("id"); @@ -218,7 +222,7 @@ public class SyncSlaveToMasterThread implements Runnable{ final List updateDatas=new ArrayList(); for(Record entity:insertDatas) { // 循环遍历数据 判断当前数据是新增还是修改 - Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where "+record.getStr("id_name")+"= ? ",entity.get(record.getStr("id_name"))); + Record findFirst = masterDb.findFirst("select * from " + record.getStr("table_name")+" where "+record.getStr("id_name")+"= ? ",entity.get(record.getStr("id_name"))); if(null!=findFirst) { updateDatas.add(entity); loopUpdateSet.add(entity.getLong(record.getStr("id_name"))); @@ -230,17 +234,17 @@ public class SyncSlaveToMasterThread implements Runnable{ SyncData.setThreadlocalLoopUpdateMissionIds(loopUpdateSet); SyncData.setThreadlocalLoopInsertMissionIds(set); if(insertDatas2.size()>0) { - Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas2, record.getInt("batch_size")); + masterDb.batchSave(record.getStr("table_name"), insertDatas2, record.getInt("batch_size")); } if(updateDatas.size()>0) { - Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"), + masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"), updateDatas, record.getInt("batch_size")); } logger.info("分库同步 loopmission_state_table 增量更新数据完成 表名为"+record.getStr("table_name")); logger.info("分库同步 loopmission_state_table 最后数据的id信息为"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); - Db.use(url.toString()).update("table_sync_info", record); + slaveDb.update("table_sync_info", record); return true; } } @@ -256,16 +260,16 @@ public class SyncSlaveToMasterThread implements Runnable{ } else if (record.getInt("event") == 2 || record.getInt("event") == 3) { // table_event_log sync_db_info两表查询获取修改数据信息 进行数据修改 while (flag) { - final List datas = Db.use(url.toString()).find( + final List datas = slaveDb.find( " select * from table_event_log where table_name = '" + record.getStr("table_name") + "' and id > " + record.getLong("last_id") + " and event = " + record.getInt("event") + " order by id asc limit " + record.getInt("batch_size")); //logger.info("分库同步到主库数据的修改或者删除数据信息为"+JSON.toJSONString(datas)); if (datas != null && datas.size() > 0) { - Db.use(url.toString()).tx(new IAtom() { + slaveDb.tx(new IAtom() { @Override public boolean run() throws SQLException { - return Db.use("masterDataSource").tx(new IAtom() { + return masterDb.tx(new IAtom() { @Override public boolean run() throws SQLException { List updateIds = new ArrayList(); @@ -281,7 +285,7 @@ public class SyncSlaveToMasterThread implements Runnable{ } logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds)); if (record.getInt("event") == 2) { - List updateDatas = Db.use(url.toString()) + List updateDatas = slaveDb .find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " in (" + handleStr + ") ", updateIds.toArray()); @@ -294,12 +298,12 @@ public class SyncSlaveToMasterThread implements Runnable{ updateData.remove(record.getStr("id_name")); updateData.remove("sync_status"); } - Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size")); + masterDb.batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size")); }else if(record.getStr("table_name").equals("detection_info_warning")){ for(Record updateData:updateDatas) { updateData.remove(record.getStr("id_name")); } - Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), "DETECTION_SET_INFO_ID,SEQ_ID",updateDatas, record.getInt("batch_size")); + masterDb.batchUpdate(record.getStr("table_name"), "DETECTION_SET_INFO_ID,SEQ_ID",updateDatas, record.getInt("batch_size")); }else if(record.getStr("table_name").contains("mission_result_table")){ Set missionIds = SyncData.getThreadlocalUpdateMissionIds(); for(Record updateData:updateDatas) { @@ -307,11 +311,11 @@ public class SyncSlaveToMasterThread implements Runnable{ updateData.remove(record.getStr("id_name")); } SyncData.setThreadlocalUpdateMissionIds(missionIds); - Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), "mission_id,seq_id",updateDatas, record.getInt("batch_size")); + masterDb.batchUpdate(record.getStr("table_name"), "mission_id,seq_id",updateDatas, record.getInt("batch_size")); }else { if(record.getStr("table_name").equals("detection_info_new")) { for(Record updateData:updateDatas) { - Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",updateData.get("DETECTION_SET_INFO_ID"),updateData.get("SEQ_ID")); + Record findFirst = masterDb.findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",updateData.get("DETECTION_SET_INFO_ID"),updateData.get("SEQ_ID")); updateData.set(record.getStr("id_name"),findFirst.getLong(record.getStr("id_name"))); } } @@ -322,7 +326,7 @@ public class SyncSlaveToMasterThread implements Runnable{ } SyncData.setThreadlocalLoopUpdateMissionIds(missionIds); } - Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"), + masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"), updateDatas, record.getInt("batch_size")); } } @@ -337,9 +341,9 @@ public class SyncSlaveToMasterThread implements Runnable{ deleteRecords.add(deleteRecord); } if(record.getStr("table_name").equals("event_record_library")) { - Db.use("masterDataSource").batch("delete from event_record_library where old_id=? and db_id=?","old_id,db_id",deleteRecords,record.getInt("batch_size")); + masterDb.batch("delete from event_record_library where old_id=? and db_id=?","old_id,db_id",deleteRecords,record.getInt("batch_size")); }else { - Db.use("masterDataSource").batch("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+"=?",record.getStr("id_name"),deleteRecords,record.getInt("batch_size")); + masterDb.batch("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+"=?",record.getStr("id_name"),deleteRecords,record.getInt("batch_size")); } logger.info("分库同步主库删除数据任务完成"); } @@ -347,7 +351,7 @@ public class SyncSlaveToMasterThread implements Runnable{ logger.info("分库同步获取最后一次操作数据的数据ID信息为"+lastUpdateId); record.set("last_id", lastUpdateId); record.set("last_date", new Date()); - Db.use(url.toString()).update("table_sync_info", record); + slaveDb.update("table_sync_info", record); return true; } }); diff --git a/nms_sync/src/com/nms/thread/SyncThread.java b/nms_sync/src/com/nms/thread/SyncThread.java index 19f0bdb..49647fc 100644 --- a/nms_sync/src/com/nms/thread/SyncThread.java +++ b/nms_sync/src/com/nms/thread/SyncThread.java @@ -8,6 +8,7 @@ import org.apache.log4j.Logger; import com.alibaba.fastjson.JSON; import com.jfinal.aop.Before; import com.jfinal.plugin.activerecord.Db; +import com.jfinal.plugin.activerecord.DbPro; import com.jfinal.plugin.activerecord.IAtom; import com.jfinal.plugin.activerecord.Record; import com.nms.interceptor.SyncDataInterceptor; @@ -20,7 +21,7 @@ import com.jfinal.plugin.activerecord.tx.Tx; * @author Administrator * */ -@Before({SyncDataInterceptor.class,SyncSocketInterceptor.class,Tx.class}) +@Before({SyncDataInterceptor.class,SyncSocketInterceptor.class}) public class SyncThread implements Runnable { private Logger logger = Logger.getLogger(this.getClass()); private SyncDbInfo syncDbInfo; @@ -44,7 +45,11 @@ public class SyncThread implements Runnable { url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" + syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true"); logger.info("获取分库数据库连接信息"+url); - List find = Db.use("masterDataSource").find("select * from table_sync_info where db_id=? ", + + final DbPro masterDb = Db.use("masterDataSource"); + final DbPro slaveDb = Db.use(url.toString()); + + List find = masterDb.find("select * from table_sync_info where db_id=? ", syncDbInfo.get("id")); //logger.info("查询主库须向分库同步数据信息"+JSON.toJSONString(find)); if (find != null && find.size() > 0) { @@ -71,7 +76,7 @@ public class SyncThread implements Runnable { if(record.getInt("mode").equals(1)||record.getInt("mode").equals(2)){ while (flag) { // 查询增量数据 - final List data =Db.use("masterDataSource") + final List data =masterDb .find("select " +columns.toString()+ " from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " > ? "+special+" order by "+record.getStr("id_name")+" asc limit "+record.getInt("batch_size"), record.getLong("last_id")); @@ -92,15 +97,15 @@ public class SyncThread implements Runnable { Db.use().tx(new IAtom() { @Override public boolean run() throws SQLException { - return Db.use(url.toString()).tx(new IAtom() { + return slaveDb.tx(new IAtom() { @Override public boolean run() throws SQLException { - Db.use(url.toString()).batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); + slaveDb.batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); - Db.use("masterDataSource").update("table_sync_info", record); + masterDb.update("table_sync_info", record); return true; } }); @@ -115,7 +120,7 @@ public class SyncThread implements Runnable { //当数据库表结构主键不是自增时 增量更新的操作步骤 while (flag) { // 新增操作 取出最后更新id信息 查询增量数据 - final List data =Db.use("masterDataSource") + final List data =masterDb .find("select * from table_event_log where table_name = '" + record.getStr("table_name") + "' and id > " + record.getLong("last_id") + " and event = " + record.getInt("event") + " order by id asc limit "+record.getInt("batch_size")); @@ -125,7 +130,7 @@ public class SyncThread implements Runnable { Db.use().tx(new IAtom() { @Override public boolean run() throws SQLException { - return Db.use(url.toString()).tx(new IAtom() { + return slaveDb.tx(new IAtom() { @Override public boolean run() throws SQLException { List insertIds = new ArrayList(); @@ -138,12 +143,12 @@ public class SyncThread implements Runnable { insertStr.append(",?"); } } - List insertDatas = Db.use("masterDataSource") + List insertDatas = masterDb .find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " in (" + insertStr + ") ", insertIds.toArray()); for(Record insertData:insertDatas){ - Record seqData = Db.use(url.toString()).findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); + Record seqData = slaveDb.findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); if(record.getStr("table_name").equals("event_record_library")) { //设置数据状态为同步数据 insertData.set("sync_status",1); @@ -153,13 +158,13 @@ public class SyncThread implements Runnable { } insertData.set(record.getStr("id_name"), seqData.getLong("seqId")); } - Db.use(url.toString()).batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); + slaveDb.batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 Object lastInsertId = data.get(data.size() - 1).get("id"); logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); - Db.use("masterDataSource").update("table_sync_info", record); + masterDb.update("table_sync_info", record); return true; } }); @@ -185,7 +190,7 @@ public class SyncThread implements Runnable { Db.use().tx(new IAtom() { @Override public boolean run() throws SQLException { - return Db.use(url.toString()).tx(new IAtom() { + return slaveDb.tx(new IAtom() { @Override public boolean run() throws SQLException { List updateIds = new ArrayList(); @@ -201,7 +206,7 @@ public class SyncThread implements Runnable { } logger.info("获取所有操作的数据id信息为"+JSON.toJSONString(updateIds)); if (record.getInt("event") == 2) { - List updateDatas = Db.use("masterDataSource") + List updateDatas = masterDb .find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " in (" + handleStr + ") ", updateIds.toArray()); @@ -214,7 +219,7 @@ public class SyncThread implements Runnable { updateData.remove(record.getStr("id_name")); updateData.remove("sync_status"); } - Db.use(url.toString()).batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size")); + slaveDb.batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size")); }else { if(record.getStr("table_name").equals("node_table")) { for(Record updateData:updateDatas) { @@ -228,7 +233,7 @@ public class SyncThread implements Runnable { } } } - Db.use(url.toString()).batchUpdate(record.getStr("table_name"), record.getStr("id_name"), + slaveDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"), updateDatas, record.getInt("batch_size")); } } @@ -243,9 +248,9 @@ public class SyncThread implements Runnable { deleteRecords.add(deleteRecord); } if(record.getStr("table_name").equals("event_record_library")) { - Db.use(url.toString()).batch("delete from event_record_library where old_id=? and db_id=?","old_id,db_id",deleteRecords,record.getInt("batch_size")); + slaveDb.batch("delete from event_record_library where old_id=? and db_id=?","old_id,db_id",deleteRecords,record.getInt("batch_size")); }else { - Db.use(url.toString()).batch("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+"=?",record.getStr("id_name"),deleteRecords,record.getInt("batch_size")); + slaveDb.batch("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+"=?",record.getStr("id_name"),deleteRecords,record.getInt("batch_size")); } logger.info("分库对主库删除操作的数据同步完成"); } @@ -253,7 +258,7 @@ public class SyncThread implements Runnable { logger.info("获取最后一次修改或者删除操作的数据ID信息"+JSON.toJSONString(lastUpdateId)); record.set("last_id", lastUpdateId); record.set("last_date", new Date()); - Db.use("masterDataSource").update("table_sync_info", record); + masterDb.update("table_sync_info", record); logger.info("修改table_sync_info记录结果 用于下次同步完成"); return true; }