commit d8d7912919 (parent 84a84760ff), 2018-11-20 09:29:38 +08:00
Contained in: default
6 changed files with 109 additions and 87 deletions

View File

@@ -1,6 +1,6 @@
#dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}]
#Database connection URL
dburl=jdbc:mysql://192.168.10.182:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true
dburl=jdbc:mysql://192.168.11.67:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true
#Database username
dbusername=nms
#Database password
@@ -8,9 +8,9 @@ dbpassword=nms
#Database name
dbname=nms
#Connection pool initial size
dbInitialSize=1
dbInitialSize=3
#Maximum number of connections
dbMaxActive=2
dbMaxActive=6
#Minimum number of connections
dbMinIdle=1
#Maximum wait time for a connection

View File

@@ -1,6 +1,6 @@
#dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}]
#Database connection URL
dburl=jdbc:mysql://192.168.10.182:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true
dburl=jdbc:mysql://192.168.11.67:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true
#Database username
dbusername=nms
#Database password
@@ -8,9 +8,9 @@ dbpassword=nms
#Database name
dbname=nms
#Connection pool initial size
dbInitialSize=1
dbInitialSize=3
#Maximum number of connections
dbMaxActive=2
dbMaxActive=6
#Minimum number of connections
dbMinIdle=1
#Maximum wait time for a connection
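The two identical hunks above touch two copies of the same properties file: dburl moves from 192.168.10.182 to 192.168.11.67, and the pool grows from 1 initial / 2 max connections to 3 / 6. A minimal sketch of how these keys would typically be consumed, assuming JFinal's PropKit and a Druid pool; the file name and the wiring are illustrative, not taken from this commit:

```java
import com.jfinal.kit.PropKit;
import com.jfinal.plugin.druid.DruidPlugin;

public class PoolConfigSketch {
    public static DruidPlugin buildPool() {
        PropKit.use("config.properties");            // assumed properties file name
        DruidPlugin druid = new DruidPlugin(
                PropKit.get("dburl"),
                PropKit.get("dbusername"),
                PropKit.get("dbpassword"));
        // Pool sizing read from the keys diffed above:
        // dbInitialSize 1 -> 3, dbMinIdle stays 1, dbMaxActive 2 -> 6.
        druid.set(PropKit.getInt("dbInitialSize"),
                  PropKit.getInt("dbMinIdle"),
                  PropKit.getInt("dbMaxActive"));
        return druid;
    }
}
```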

View File

@@ -9,6 +9,7 @@ import com.alibaba.fastjson.JSON;
import com.jfinal.aop.Interceptor;
import com.jfinal.aop.Invocation;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.DbPro;
import com.jfinal.plugin.activerecord.Record;
import com.nms.model.SyncDbInfo;
import com.nms.thread.SyncThread;
@@ -25,8 +26,12 @@ public class SyncDataInterceptor implements Interceptor{
String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
+ syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true";
logger.info("当前数据库连接为 "+url);
DbPro masterDb = Db.use("masterDataSource");
DbPro slaveDb = Db.use(url.toString());
//Before syncing, read the last synced id for the metadata table from table_sync_info and fetch the newly added rows, so the table structure can be altered next
Record metadataTableSyncInfo = Db.use("masterDataSource").findFirst("select * from table_sync_info where table_name='metadata' and event=1 and db_id=?",syncDbInfo.getId());
Record metadataTableSyncInfo = masterDb.findFirst("select * from table_sync_info where table_name='metadata' and event=1 and db_id=?",syncDbInfo.getId());
logger.info("获取metadata表中最后一次同步id的数据信息为 "+JSON.toJSONString(metadataTableSyncInfo));
//Start the sync process
inv.invoke();
@@ -34,11 +39,11 @@ public class SyncDataInterceptor implements Interceptor{
//Check whether the metadata table has new rows; if so, run SQL to alter the table structure
if(metadataTableSyncInfo!=null){
List<Record> metadatas = Db.use(url).find("select m.*,cti.table_name from metadata m left join check_type_info cti on m.check_type_id=cti.id where m.id > ? ",metadataTableSyncInfo.getLong("last_id"));
List<Record> metadatas = slaveDb.find("select m.*,cti.table_name from metadata m left join check_type_info cti on m.check_type_id=cti.id where m.id > ? ",metadataTableSyncInfo.getLong("last_id"));
//logger.info("metadata表中新增数据信息查询结果为 "+JSON.toJSONString(metadatas));
if(metadatas!=null && metadatas.size()>0){
for(Record metadata:metadatas){
Record isExist = Db.use(url).findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",metadata.getStr("table_name"));
Record isExist = slaveDb.findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",metadata.getStr("table_name"));
logger.info("判断metadata表新增数据是修改还是新增表操作结果为"+JSON.toJSONString(isExist));
//Add the new column to the database table
if(isExist.getInt("count")>0){
@@ -48,17 +53,17 @@ public class SyncDataInterceptor implements Interceptor{
sqlString.append(metadata.getStr("filed_name")+" "+ toMysqlType(metadata.getStr("filed_type"))+")");
logger.info("修改metadata表结构 sql语句为"+sqlString.toString());
//Execute the column addition
int resu =Db.use(url).update(sqlString.toString());
int resu =slaveDb.update(sqlString.toString());
logger.info("修改表结构结果为 "+resu);
}
}
}
}
//Check whether check_type_info has new rows; if so, run the stored procedure to create the new table
List<Record> checkTypeInfos = Db.use(url).find(" select * from check_type_info where crete_state=0 ");
List<Record> checkTypeInfos = slaveDb.find(" select * from check_type_info where crete_state=0 ");
for(Record checkTypeInfo : checkTypeInfos){
//Check whether the table already exists
Record isExist = Db.use(url).findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",checkTypeInfo.getStr("TABLE_NAME"));
Record isExist = slaveDb.findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",checkTypeInfo.getStr("TABLE_NAME"));
logger.info("check_type_info表中有新增数据 判断表是否创建"+JSON.toJSONString(isExist));
if(isExist.getInt("count")>0){
continue;
@@ -68,7 +73,7 @@ public class SyncDataInterceptor implements Interceptor{
String filedType ="";
StringBuffer sql= new StringBuffer();
StringBuffer cIndexFileds = new StringBuffer("data_check_time:seq_id:detection_set_info_id:");
List<Record> metadatas2 = Db.use(url).find("select * from metadata where 1=1 and check_type_id=? and state = '0' order by show_num asc",checkTypeInfo.getLong("ID"));
List<Record> metadatas2 = slaveDb.find("select * from metadata where 1=1 and check_type_id=? and state = '0' order by show_num asc",checkTypeInfo.getLong("ID"));
if(metadatas2!=null && metadatas2.size()>0) {
for(int i=0;i<metadatas2.size();i++){
filedName = metadatas2.get(i).getStr("filed_name");
@@ -90,19 +95,22 @@ public class SyncDataInterceptor implements Interceptor{
logger.info("check_type_info新增数据创建表结构 参数信息 sql"+sql.toString());
logger.info("check_type_info新增数据创建表结构 参数信息 字段名称"+cIndexFileds.toString());
SyncStoredProcedure syncStoreProcedure=new SyncStoredProcedure("pro_createTable",checkTypeInfo.getStr("TABLE_NAME"), sql.toString(),cIndexFileds.toString());
Object execute = Db.use(url).execute(syncStoreProcedure);
logger.info("创建新表操作完成"+execute);
Db.use(url).execute(syncStoreProcedure);
logger.info("创建新表操作完成");
//After the new table is created, add a sync entry to the sync table so the newly configured monitoring data is synced to the center
Record record = new Record();
record.set("table_name", checkTypeInfo.getStr("TABLE_NAME"));
record.set("mode", 2);
record.set("id_name", "ID");
record.set("event",1);
record.set("last_id", -1);
record.set("db_id", -1);
record.set("last_date", new Date());
Db.use(url).save("table_sync_info", record);
Record hadExist = slaveDb.findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",checkTypeInfo.getStr("TABLE_NAME"));
if(hadExist.getInt("count")>0) {
Record record = new Record();
record.set("table_name", checkTypeInfo.getStr("TABLE_NAME"));
record.set("mode", 2);
record.set("id_name", "ID");
record.set("event",1);
record.set("last_id", -1);
record.set("db_id", -1);
record.set("last_date", new Date());
slaveDb.save("table_sync_info", record);
}
}
}
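The change running through SyncDataInterceptor replaces repeated Db.use("masterDataSource") / Db.use(url) lookups with two DbPro handles resolved once at the top of intercept() and reused for every query. A stripped-down sketch of that shape; the class name, the literal URL, and the example db_id are illustrative, not from the commit:

```java
import com.jfinal.aop.Interceptor;
import com.jfinal.aop.Invocation;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.DbPro;
import com.jfinal.plugin.activerecord.Record;

public class DbProLookupSketch implements Interceptor {
    @Override
    public void intercept(Invocation inv) {
        // Assumed slave JDBC URL; the real interceptor builds it from sync_db_info.
        String url = "jdbc:mysql://192.168.11.67:3306/nms?useUnicode=true&characterEncoding=utf-8";

        // Resolve each named config once instead of calling Db.use(...) per query.
        DbPro masterDb = Db.use("masterDataSource");
        DbPro slaveDb = Db.use(url);   // works only if a config was registered under this name

        Record lastSync = masterDb.findFirst(
                "select * from table_sync_info where table_name='metadata' and event=1 and db_id=?",
                1L);                   // example db_id

        inv.invoke();                  // run the actual sync step

        if (lastSync != null) {
            // Incremental read on the slave, driven by the master's bookmark.
            slaveDb.find("select * from metadata where id > ?", lastSync.getLong("last_id"));
        }
    }
}
```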

View File

@@ -14,6 +14,7 @@ import com.jfinal.aop.Interceptor;
import com.jfinal.aop.Invocation;
import com.jfinal.kit.PropKit;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.DbPro;
import com.jfinal.plugin.activerecord.Record;
import com.nms.model.SetInfo;
import com.nms.model.SyncDbInfo;
@@ -44,14 +45,18 @@ public class SyncSocketInterceptor implements Interceptor{
String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
+ syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true";
logger.info("当前数据库连接为 "+url);
Record detectionSetInfoTableInfo = Db.use("masterDataSource").findFirst("select * from table_sync_info where table_name='detection_set_info' and event=1 and db_id=?",syncDbInfo.getId());
DbPro masterDb = Db.use("masterDataSource");
DbPro slaveDb = Db.use(url.toString());
Record detectionSetInfoTableInfo = masterDb.findFirst("select * from table_sync_info where table_name='detection_set_info' and event=1 and db_id=?",syncDbInfo.getId());
logger.info("获取detectionSetInfo表中最后一次同步id的数据信息为 "+JSON.toJSONString(detectionSetInfoTableInfo));
Record detectionSetInfoTableUpdateInfo = Db.use("masterDataSource").findFirst("select * from table_sync_info where table_name='detection_set_info' and event=2 and db_id=?",syncDbInfo.getId());
Record detectionSetInfoTableUpdateInfo = masterDb.findFirst("select * from table_sync_info where table_name='detection_set_info' and event=2 and db_id=?",syncDbInfo.getId());
logger.info("获取detectionSetInfo表中最后一次同步修改id的数据信息为 "+JSON.toJSONString(detectionSetInfoTableUpdateInfo));
//Query all modified data
List<Record> datas = Db.use("masterDataSource")
List<Record> datas = masterDb
.find("select * from table_event_log where table_name = '" + detectionSetInfoTableUpdateInfo.getStr("table_name")
+ "' and id > " + detectionSetInfoTableUpdateInfo.getLong("last_id") + " and event = "
+ detectionSetInfoTableUpdateInfo.getInt("event") + " order by id asc ");
@@ -71,7 +76,7 @@ public class SyncSocketInterceptor implements Interceptor{
Map<Long,Record> map=new HashMap<Long,Record>();
//Query the data as it was before modification
if(updateIds.size()>0) {
detectionSetUpdateInfos= Db.use(url).find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id in ("+handleStr.toString()+")",updateIds.toArray());
detectionSetUpdateInfos= slaveDb.find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id in ("+handleStr.toString()+")",updateIds.toArray());
if(null!=detectionSetUpdateInfos&&detectionSetUpdateInfos.size()>0) {
for(Record detectionSetInfo:detectionSetUpdateInfos) {
map.put(detectionSetInfo.getLong("ID"),detectionSetInfo);
@@ -88,7 +93,7 @@ public class SyncSocketInterceptor implements Interceptor{
// Fetch the added or updated monitoring configuration; if there is none, do nothing
if(detectionSetInfoTableInfo!=null) {
List<Record> detectionSetInfos = Db.use(url).find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id > ? ",detectionSetInfoTableInfo.getLong("last_id"));
List<Record> detectionSetInfos = slaveDb.find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id > ? ",detectionSetInfoTableInfo.getLong("last_id"));
if(null!=detectionSetInfos&&detectionSetInfos.size()>0) {
for(Record detectionSetInfo:detectionSetInfos) {
SetInfo o=null;
@@ -128,7 +133,7 @@ public class SyncSocketInterceptor implements Interceptor{
if(null!=detectionSetUpdateInfos&&detectionSetUpdateInfos.size()>0) {
//Fetch the monitoring configuration rows that have been modified
List<Record> detectionSetInfos = Db.use(url).find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id in ("+handleStr.toString()+")",updateIds.toArray());
List<Record> detectionSetInfos = slaveDb.find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id in ("+handleStr.toString()+")",updateIds.toArray());
if(null!=detectionSetInfos&&detectionSetInfos.size()>0) {
for (Record detectionSetInfo : detectionSetInfos) {
SetInfo o=null;
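Both "in (...)" queries above bind updateIds against a placeholder list kept in handleStr ("?,?,…"). A small hedged helper showing how such a list is usually assembled and bound with JFinal's DbPro; the helper, the config name, and the example ids are illustrative, not part of this commit:

```java
import java.util.Arrays;
import java.util.List;

import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.DbPro;
import com.jfinal.plugin.activerecord.Record;

public class InClauseSketch {
    // Build "?,?,?" with one placeholder per id, so ids are passed as bind parameters.
    static String placeholders(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            sb.append(i == 0 ? "?" : ",?");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Long> updateIds = Arrays.asList(3L, 7L, 11L);   // example ids
        DbPro slaveDb = Db.use("slaveDataSource");           // assumed config name
        List<Record> rows = slaveDb.find(
                "select * from detection_set_info where id in (" + placeholders(updateIds.size()) + ")",
                updateIds.toArray());
        System.out.println(rows.size() + " rows loaded");
    }
}
```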

View File

@@ -12,12 +12,13 @@ import com.nms.main.SyncData;
import com.alibaba.fastjson.JSON;
import com.jfinal.aop.Before;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.DbPro;
import com.jfinal.plugin.activerecord.IAtom;
import com.jfinal.plugin.activerecord.Record;
import com.jfinal.plugin.activerecord.tx.Tx;
import com.nms.model.SyncDbInfo;
@Before({SyncMissionResultStatisticalInterceptor.class,Tx.class})
@Before({SyncMissionResultStatisticalInterceptor.class})
public class SyncSlaveToMasterThread implements Runnable{
private Logger logger = Logger.getLogger(this.getClass());
private SyncDbInfo syncDbInfo;
@@ -44,7 +45,11 @@ public class SyncSlaveToMasterThread implements Runnable{
url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
+ syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true");
logger.info("当前分库数据库连接为"+url);
List<Record> find = Db.use(url.toString()).find("select * from table_sync_info");
final DbPro masterDb = Db.use("masterDataSource");
final DbPro slaveDb = Db.use(url.toString());
List<Record> find = slaveDb.find("select * from table_sync_info");
//logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find));
if (find != null && find.size() > 0) {
for (final Record record : find) {
@@ -69,7 +74,7 @@ public class SyncSlaveToMasterThread implements Runnable{
if(record.getInt("mode").equals(1)||record.getInt("mode").equals(2)){
while (flag) {
// Insert case: read the last synced id and query the incremental data
final List<Record> data = Db.use(url.toString())
final List<Record> data = slaveDb
.find("select "+columns.toString()+" from " + record.getStr("table_name") + " where "
+ record.getStr("id_name") + " > ? "+special+" order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"),
record.getLong("last_id"));
@@ -96,58 +101,57 @@ public class SyncSlaveToMasterThread implements Runnable{
}
// Auto-generate id values for the detection result table
if("detection_info_new".equals(record.getStr("table_name"))) {
Db.use(url.toString()).tx(new IAtom() {
slaveDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
// TODO Auto-generated method stub
return Db.use("masterDataSource").tx(new IAtom() {
return masterDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
final List<Record> insertDatas=new ArrayList<Record>();
final List<Record> updateDatas=new ArrayList<Record>();
for(Record entity:data) {
// Iterate over the rows and decide whether each one is an insert or an update
Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",entity.get("DETECTION_SET_INFO_ID"),entity.get("SEQ_ID"));
Record findFirst = masterDb.findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",entity.get("DETECTION_SET_INFO_ID"),entity.get("SEQ_ID"));
if(null!=findFirst) {
entity.set(record.getStr("id_name"),findFirst.getStr("id_name"));
updateDatas.add(entity);
}else {
Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
Record seqData = masterDb.findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
entity.set(record.getStr("id_name"), seqData.getLong("seqId"));
insertDatas.add(entity);
}
}
if(insertDatas.size()>0) {
Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
masterDb.batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
}
if(updateDatas.size()>0) {
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
updateDatas, record.getInt("batch_size"));
}
logger.info("分库同步 detection_info_new 增量更新数据完成 表名为"+record.getStr("table_name"));
logger.info("分库同步 detection_info_new 最后数据的id信息为"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
Db.use(url.toString()).update("table_sync_info", record);
slaveDb.update("table_sync_info", record);
return true;
}
});
}
});
}else {
Db.use(url.toString()).tx(new IAtom() {
slaveDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
return Db.use("masterDataSource").tx(new IAtom() {
return masterDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
Db.use("masterDataSource").batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
masterDb.batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
// After syncing, write the id of the last row back to table_sync_info for the next run
logger.info("分库同步增量更新数据完成 表名为"+record.getStr("table_name"));
logger.info("分库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
Db.use(url.toString()).update("table_sync_info", record);
slaveDb.update("table_sync_info", record);
return true;
}
});
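The detection_info_new branch above wraps the whole batch in two nested DbPro.tx(IAtom) calls, so the slave-side bookmark update and the master-side writes commit or roll back together, to the extent two separate local transactions can (this is not a true distributed/XA transaction). A reduced sketch of that nesting; the table and column names come from the diff, the surrounding setup is assumed:

```java
import java.sql.SQLException;
import java.util.Date;
import java.util.List;

import com.jfinal.plugin.activerecord.DbPro;
import com.jfinal.plugin.activerecord.IAtom;
import com.jfinal.plugin.activerecord.Record;

public class NestedTxSketch {
    public static void syncBatch(final DbPro slaveDb, final DbPro masterDb,
                                 final List<Record> data, final Record bookmark,
                                 final Object lastInsertId) {
        // Outer transaction on the slave connection...
        slaveDb.tx(new IAtom() {
            @Override
            public boolean run() throws SQLException {
                // ...inner transaction on the master connection. Returning false
                // from either IAtom rolls that datasource's work back.
                return masterDb.tx(new IAtom() {
                    @Override
                    public boolean run() throws SQLException {
                        masterDb.batchSave(bookmark.getStr("table_name"), data,
                                bookmark.getInt("batch_size"));
                        // Advance the bookmark on the slave only after the master writes succeed.
                        bookmark.set("last_id", lastInsertId);
                        bookmark.set("last_date", new Date());
                        slaveDb.update("table_sync_info", bookmark);
                        return true;
                    }
                });
            }
        });
    }
}
```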
@@ -163,17 +167,17 @@ public class SyncSlaveToMasterThread implements Runnable{
//Incremental update steps when the table's primary key is not auto-increment
while (flag) {
// Insert case: read the last synced id and query the incremental data
final List<Record> data =Db.use(url.toString())
final List<Record> data =slaveDb
.find("select * from table_event_log where table_name = '" + record.getStr("table_name")
+ "' and id > " + record.getLong("last_id") + " and event = "
+ record.getInt("event") + " order by id asc limit "+record.getInt("batch_size"));
//logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
if (data != null && data.size() > 0) {
//Multi-datasource transaction: the master datasource transaction is nested inside the slave's
Db.use(url.toString()).tx(new IAtom() {
slaveDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
return Db.use("masterDataSource").tx(new IAtom() {
return masterDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
List<Long> insertIds = new ArrayList<Long>();
@@ -186,14 +190,14 @@ public class SyncSlaveToMasterThread implements Runnable{
insertStr.append(",?");
}
}
List<Record> insertDatas = Db.use(url.toString())
List<Record> insertDatas = slaveDb
.find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
+ record.getStr("id_name") + " in (" + insertStr + ") ",
insertIds.toArray());
if(!record.getStr("table_name").equals("loopmission_state_table")) {
for(Record insertData:insertDatas){
if(record.getStr("table_name").equals("event_record_library")) {
Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
Record seqData = masterDb.findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
//Mark the row as synced data
insertData.set("sync_status",1);
//Record the source database id and the original row id, used for updates and deletes; -1 means the center/master
@@ -202,13 +206,13 @@ public class SyncSlaveToMasterThread implements Runnable{
insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
}
}
Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
masterDb.batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
// After syncing, write the id of the last row back to table_sync_info for the next run
Object lastInsertId = data.get(data.size() - 1).get("id");
logger.info("分库增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
Db.use(url.toString()).update("table_sync_info", record);
slaveDb.update("table_sync_info", record);
return true;
}else{
Object lastInsertId = data.get(data.size() - 1).get("id");
@@ -218,7 +222,7 @@ public class SyncSlaveToMasterThread implements Runnable{
final List<Record> updateDatas=new ArrayList<Record>();
for(Record entity:insertDatas) {
// Iterate over the rows and decide whether each one is an insert or an update
Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where "+record.getStr("id_name")+"= ? ",entity.get(record.getStr("id_name")));
Record findFirst = masterDb.findFirst("select * from " + record.getStr("table_name")+" where "+record.getStr("id_name")+"= ? ",entity.get(record.getStr("id_name")));
if(null!=findFirst) {
updateDatas.add(entity);
loopUpdateSet.add(entity.getLong(record.getStr("id_name")));
@@ -230,17 +234,17 @@ public class SyncSlaveToMasterThread implements Runnable{
SyncData.setThreadlocalLoopUpdateMissionIds(loopUpdateSet);
SyncData.setThreadlocalLoopInsertMissionIds(set);
if(insertDatas2.size()>0) {
Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas2, record.getInt("batch_size"));
masterDb.batchSave(record.getStr("table_name"), insertDatas2, record.getInt("batch_size"));
}
if(updateDatas.size()>0) {
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
updateDatas, record.getInt("batch_size"));
}
logger.info("分库同步 loopmission_state_table 增量更新数据完成 表名为"+record.getStr("table_name"));
logger.info("分库同步 loopmission_state_table 最后数据的id信息为"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
Db.use(url.toString()).update("table_sync_info", record);
slaveDb.update("table_sync_info", record);
return true;
}
}
@@ -256,16 +260,16 @@ public class SyncSlaveToMasterThread implements Runnable{
} else if (record.getInt("event") == 2 || record.getInt("event") == 3) {
// Query table_event_log and sync_db_info to get the modified rows, then apply the changes
while (flag) {
final List<Record> datas = Db.use(url.toString()).find(
final List<Record> datas = slaveDb.find(
" select * from table_event_log where table_name = '" + record.getStr("table_name")
+ "' and id > " + record.getLong("last_id") + " and event = "
+ record.getInt("event") + " order by id asc limit " + record.getInt("batch_size"));
//logger.info("分库同步到主库数据的修改或者删除数据信息为"+JSON.toJSONString(datas));
if (datas != null && datas.size() > 0) {
Db.use(url.toString()).tx(new IAtom() {
slaveDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
return Db.use("masterDataSource").tx(new IAtom() {
return masterDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
List<Long> updateIds = new ArrayList<Long>();
@@ -281,7 +285,7 @@ public class SyncSlaveToMasterThread implements Runnable{
}
logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds));
if (record.getInt("event") == 2) {
List<Record> updateDatas = Db.use(url.toString())
List<Record> updateDatas = slaveDb
.find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
+ record.getStr("id_name") + " in (" + handleStr + ") ",
updateIds.toArray());
@@ -294,12 +298,12 @@ public class SyncSlaveToMasterThread implements Runnable{
updateData.remove(record.getStr("id_name"));
updateData.remove("sync_status");
}
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size"));
masterDb.batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size"));
}else if(record.getStr("table_name").equals("detection_info_warning")){
for(Record updateData:updateDatas) {
updateData.remove(record.getStr("id_name"));
}
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), "DETECTION_SET_INFO_ID,SEQ_ID",updateDatas, record.getInt("batch_size"));
masterDb.batchUpdate(record.getStr("table_name"), "DETECTION_SET_INFO_ID,SEQ_ID",updateDatas, record.getInt("batch_size"));
}else if(record.getStr("table_name").contains("mission_result_table")){
Set<Long> missionIds = SyncData.getThreadlocalUpdateMissionIds();
for(Record updateData:updateDatas) {
@@ -307,11 +311,11 @@ public class SyncSlaveToMasterThread implements Runnable{
updateData.remove(record.getStr("id_name"));
}
SyncData.setThreadlocalUpdateMissionIds(missionIds);
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), "mission_id,seq_id",updateDatas, record.getInt("batch_size"));
masterDb.batchUpdate(record.getStr("table_name"), "mission_id,seq_id",updateDatas, record.getInt("batch_size"));
}else {
if(record.getStr("table_name").equals("detection_info_new")) {
for(Record updateData:updateDatas) {
Record findFirst = Db.use("masterDataSource").findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",updateData.get("DETECTION_SET_INFO_ID"),updateData.get("SEQ_ID"));
Record findFirst = masterDb.findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",updateData.get("DETECTION_SET_INFO_ID"),updateData.get("SEQ_ID"));
updateData.set(record.getStr("id_name"),findFirst.getLong(record.getStr("id_name")));
}
}
@@ -322,7 +326,7 @@ public class SyncSlaveToMasterThread implements Runnable{
}
SyncData.setThreadlocalLoopUpdateMissionIds(missionIds);
}
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
updateDatas, record.getInt("batch_size"));
}
}
@@ -337,9 +341,9 @@ public class SyncSlaveToMasterThread implements Runnable{
deleteRecords.add(deleteRecord);
}
if(record.getStr("table_name").equals("event_record_library")) {
Db.use("masterDataSource").batch("delete from event_record_library where old_id=? and db_id=?","old_id,db_id",deleteRecords,record.getInt("batch_size"));
masterDb.batch("delete from event_record_library where old_id=? and db_id=?","old_id,db_id",deleteRecords,record.getInt("batch_size"));
}else {
Db.use("masterDataSource").batch("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+"=?",record.getStr("id_name"),deleteRecords,record.getInt("batch_size"));
masterDb.batch("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+"=?",record.getStr("id_name"),deleteRecords,record.getInt("batch_size"));
}
logger.info("分库同步主库删除数据任务完成");
}
@@ -347,7 +351,7 @@ public class SyncSlaveToMasterThread implements Runnable{
logger.info("分库同步获取最后一次操作数据的数据ID信息为"+lastUpdateId);
record.set("last_id", lastUpdateId);
record.set("last_date", new Date());
Db.use(url.toString()).update("table_sync_info", record);
slaveDb.update("table_sync_info", record);
return true;
}
});
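For tables whose primary key is not generated on the master, this thread decides per row whether it is an insert or an update by probing the master, and pulls a new id via select nextval('seq_<table>') from dual (nextval is presumably a user-defined function in this schema, not a MySQL built-in). A condensed sketch of that split under those assumptions; the method and class names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

import com.jfinal.plugin.activerecord.DbPro;
import com.jfinal.plugin.activerecord.Record;

public class UpsertSplitSketch {
    public static void upsert(DbPro masterDb, String tableName, String idName,
                              List<Record> rows, int batchSize) {
        List<Record> inserts = new ArrayList<Record>();
        List<Record> updates = new ArrayList<Record>();
        for (Record row : rows) {
            Record existing = masterDb.findFirst(
                    "select * from " + tableName + " where " + idName + " = ?", row.get(idName));
            if (existing != null) {
                updates.add(row);                       // already on the master: update in place
            } else {
                // Assumed: the schema provides a nextval() function and a seq_<table> counter.
                Record seq = masterDb.findFirst(
                        "select nextval('seq_" + tableName + "') seqId from dual");
                row.set(idName, seq.getLong("seqId"));  // assign a master-side id before inserting
                inserts.add(row);
            }
        }
        if (!inserts.isEmpty()) {
            masterDb.batchSave(tableName, inserts, batchSize);
        }
        if (!updates.isEmpty()) {
            masterDb.batchUpdate(tableName, idName, updates, batchSize);
        }
    }
}
```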

View File

@@ -8,6 +8,7 @@ import org.apache.log4j.Logger;
import com.alibaba.fastjson.JSON;
import com.jfinal.aop.Before;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.DbPro;
import com.jfinal.plugin.activerecord.IAtom;
import com.jfinal.plugin.activerecord.Record;
import com.nms.interceptor.SyncDataInterceptor;
@@ -20,7 +21,7 @@ import com.jfinal.plugin.activerecord.tx.Tx;
* @author Administrator
*
*/
@Before({SyncDataInterceptor.class,SyncSocketInterceptor.class,Tx.class})
@Before({SyncDataInterceptor.class,SyncSocketInterceptor.class})
public class SyncThread implements Runnable {
private Logger logger = Logger.getLogger(this.getClass());
private SyncDbInfo syncDbInfo;
@@ -44,7 +45,11 @@ public class SyncThread implements Runnable {
url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
+ syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true");
logger.info("获取分库数据库连接信息"+url);
List<Record> find = Db.use("masterDataSource").find("select * from table_sync_info where db_id=? ",
final DbPro masterDb = Db.use("masterDataSource");
final DbPro slaveDb = Db.use(url.toString());
List<Record> find = masterDb.find("select * from table_sync_info where db_id=? ",
syncDbInfo.get("id"));
//logger.info("查询主库须向分库同步数据信息"+JSON.toJSONString(find));
if (find != null && find.size() > 0) {
@@ -71,7 +76,7 @@ public class SyncThread implements Runnable {
if(record.getInt("mode").equals(1)||record.getInt("mode").equals(2)){
while (flag) {
// Query the incremental data
final List<Record> data =Db.use("masterDataSource")
final List<Record> data =masterDb
.find("select " +columns.toString()+ " from " + record.getStr("table_name") + " where "
+ record.getStr("id_name") + " > ? "+special+" order by "+record.getStr("id_name")+" asc limit "+record.getInt("batch_size"),
record.getLong("last_id"));
@@ -92,15 +97,15 @@ public class SyncThread implements Runnable {
Db.use().tx(new IAtom() {
@Override
public boolean run() throws SQLException {
return Db.use(url.toString()).tx(new IAtom() {
return slaveDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
Db.use(url.toString()).batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
slaveDb.batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
// After syncing, write the id of the last row back to table_sync_info for the next run
logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
Db.use("masterDataSource").update("table_sync_info", record);
masterDb.update("table_sync_info", record);
return true;
}
});
@@ -115,7 +120,7 @@ public class SyncThread implements Runnable {
//Incremental update steps when the table's primary key is not auto-increment
while (flag) {
// Insert case: read the last synced id and query the incremental data
final List<Record> data =Db.use("masterDataSource")
final List<Record> data =masterDb
.find("select * from table_event_log where table_name = '" + record.getStr("table_name")
+ "' and id > " + record.getLong("last_id") + " and event = "
+ record.getInt("event") + " order by id asc limit "+record.getInt("batch_size"));
@@ -125,7 +130,7 @@ public class SyncThread implements Runnable {
Db.use().tx(new IAtom() {
@Override
public boolean run() throws SQLException {
return Db.use(url.toString()).tx(new IAtom() {
return slaveDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
List<Long> insertIds = new ArrayList<Long>();
@@ -138,12 +143,12 @@ public class SyncThread implements Runnable {
insertStr.append(",?");
}
}
List<Record> insertDatas = Db.use("masterDataSource")
List<Record> insertDatas = masterDb
.find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
+ record.getStr("id_name") + " in (" + insertStr + ") ",
insertIds.toArray());
for(Record insertData:insertDatas){
Record seqData = Db.use(url.toString()).findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
Record seqData = slaveDb.findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
if(record.getStr("table_name").equals("event_record_library")) {
//Mark the row as synced data
insertData.set("sync_status",1);
@@ -153,13 +158,13 @@ public class SyncThread implements Runnable {
}
insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
}
Db.use(url.toString()).batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
slaveDb.batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
// After syncing, write the id of the last row back to table_sync_info for the next run
Object lastInsertId = data.get(data.size() - 1).get("id");
logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
Db.use("masterDataSource").update("table_sync_info", record);
masterDb.update("table_sync_info", record);
return true;
}
});
@@ -185,7 +190,7 @@ public class SyncThread implements Runnable {
Db.use().tx(new IAtom() {
@Override
public boolean run() throws SQLException {
return Db.use(url.toString()).tx(new IAtom() {
return slaveDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
List<Long> updateIds = new ArrayList<Long>();
@@ -201,7 +206,7 @@ public class SyncThread implements Runnable {
}
logger.info("获取所有操作的数据id信息为"+JSON.toJSONString(updateIds));
if (record.getInt("event") == 2) {
List<Record> updateDatas = Db.use("masterDataSource")
List<Record> updateDatas = masterDb
.find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
+ record.getStr("id_name") + " in (" + handleStr + ") ",
updateIds.toArray());
@@ -214,7 +219,7 @@ public class SyncThread implements Runnable {
updateData.remove(record.getStr("id_name"));
updateData.remove("sync_status");
}
Db.use(url.toString()).batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size"));
slaveDb.batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size"));
}else {
if(record.getStr("table_name").equals("node_table")) {
for(Record updateData:updateDatas) {
@@ -228,7 +233,7 @@ public class SyncThread implements Runnable {
}
}
}
Db.use(url.toString()).batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
slaveDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
updateDatas, record.getInt("batch_size"));
}
}
@@ -243,9 +248,9 @@ public class SyncThread implements Runnable {
deleteRecords.add(deleteRecord);
}
if(record.getStr("table_name").equals("event_record_library")) {
Db.use(url.toString()).batch("delete from event_record_library where old_id=? and db_id=?","old_id,db_id",deleteRecords,record.getInt("batch_size"));
slaveDb.batch("delete from event_record_library where old_id=? and db_id=?","old_id,db_id",deleteRecords,record.getInt("batch_size"));
}else {
Db.use(url.toString()).batch("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+"=?",record.getStr("id_name"),deleteRecords,record.getInt("batch_size"));
slaveDb.batch("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+"=?",record.getStr("id_name"),deleteRecords,record.getInt("batch_size"));
}
logger.info("分库对主库删除操作的数据同步完成");
}
@@ -253,7 +258,7 @@ public class SyncThread implements Runnable {
logger.info("获取最后一次修改或者删除操作的数据ID信息"+JSON.toJSONString(lastUpdateId));
record.set("last_id", lastUpdateId);
record.set("last_date", new Date());
Db.use("masterDataSource").update("table_sync_info", record);
masterDb.update("table_sync_info", record);
logger.info("修改table_sync_info记录结果 用于下次同步完成");
return true;
}
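All of these threads rely on every datasource having been registered up front under the name later handed to Db.use(...): the master under "masterDataSource" and, judging by the Db.use(url.toString()) calls, each slave under its full JDBC URL string. A hedged sketch of that registration with JFinal's ActiveRecordPlugin; the method names and the place this would be called from are assumptions, only the config-name convention is taken from the diff:

```java
import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.jfinal.plugin.druid.DruidPlugin;

public class DataSourceRegistrySketch {
    // Register the central database under the fixed name used by Db.use("masterDataSource").
    public static void registerMaster(String url, String user, String password) {
        DruidPlugin druid = new DruidPlugin(url, user, password);
        druid.start();
        new ActiveRecordPlugin("masterDataSource", druid).start();
    }

    // Register one slave under its JDBC URL, matching the Db.use(url.toString()) calls above.
    public static void registerSlave(String jdbcUrl, String user, String password) {
        DruidPlugin druid = new DruidPlugin(jdbcUrl, user, password);
        druid.start();
        new ActiveRecordPlugin(jdbcUrl, druid).start();
    }
}
```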