This commit is contained in:
default
2018-09-29 11:38:05 +08:00
parent 9ae6f5addc
commit 57f8111c94
6 changed files with 42 additions and 35 deletions

View File

@@ -1,6 +1,6 @@
#dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}] #dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}]
#\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740 #\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740
dburl=jdbc:mysql://10.0.6.247:3306/nms-dev dburl=jdbc:mysql://192.168.10.126:3306/nms-dev
#\u6570\u636e\u5e93\u8d26\u6237\u540d #\u6570\u636e\u5e93\u8d26\u6237\u540d
dbusername=root dbusername=root
#\u6570\u636e\u5e93\u5bc6\u7801 #\u6570\u636e\u5e93\u5bc6\u7801

View File

@@ -1,6 +1,6 @@
#dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}] #dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}]
#\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740 #\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740
dburl=jdbc:mysql://10.0.6.247:3306/nms-dev dburl=jdbc:mysql://192.168.10.126:3306/nms-dev
#\u6570\u636e\u5e93\u8d26\u6237\u540d #\u6570\u636e\u5e93\u8d26\u6237\u540d
dbusername=root dbusername=root
#\u6570\u636e\u5e93\u5bc6\u7801 #\u6570\u636e\u5e93\u5bc6\u7801

View File

@@ -54,10 +54,10 @@ public class SyncData{
// 主库向分库同步数据 // 主库向分库同步数据
SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo)); SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo));
logger.info("创建主库同步分库线程执行任务"); logger.info("创建主库同步分库线程执行任务");
scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS); //scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS);
// 分库向主库同步数据 // 分库向主库同步数据
logger.info("创建分库数据同步到主库线程执行任务"); logger.info("创建分库数据同步到主库线程执行任务");
//scheduleService.scheduleWithFixedDelay(new SyncSlaveToMasterThread(syncDbInfo), 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS); scheduleService.scheduleWithFixedDelay(new SyncSlaveToMasterThread(syncDbInfo), 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS);
} }
}else{ }else{
logger.info("获取同步记录信息失败 请检查数据库数据信息"); logger.info("获取同步记录信息失败 请检查数据库数据信息");

View File

@@ -45,9 +45,9 @@ public class TestClass {
private static Connection getNmsConnection(){ private static Connection getNmsConnection(){
String driver ="com.mysql.jdbc.Driver"; String driver ="com.mysql.jdbc.Driver";
String url="jdbc:mysql://10.0.6.247:3306/nms"; String url="jdbc:mysql://192.168.10.126:3306/nms-dev";
String username="nms"; String username="root";
String password="nms"; String password="111111";
Connection conn=null; Connection conn=null;
try{ try{
Class.forName(driver); Class.forName(driver);
@@ -91,16 +91,21 @@ public class TestClass {
@Test @Test
public void importData(){ public void importData(){
Connection nmsConn = getNmsConnection(); Connection nmsConn = getNmsConnection();
Connection conn = getConnection();
PreparedStatement pstmt=null; PreparedStatement pstmt=null;
PreparedStatement pstmt2=null; PreparedStatement pstmt2=null;
try { try {
pstmt=(PreparedStatement)nmsConn.prepareStatement("select table_name as tn from information_schema.tables where TABLE_SCHEMA='nms'"); pstmt=(PreparedStatement)nmsConn.prepareStatement("select table_name as tn from information_schema.tables where TABLE_SCHEMA='nms'");
ResultSet resultSet = pstmt.executeQuery(); ResultSet resultSet = pstmt.executeQuery();
pstmt2=(PreparedStatement)conn.prepareStatement("insert into table_sync_info (table_name,event,last_id,last_date,db_id,mode) values (?,?,?,?,?,?)"); pstmt2=(PreparedStatement)nmsConn.prepareStatement("insert into table_sync_info (table_name,event,last_id,last_date,db_id,mode) values (?,?,?,?,?,?)");
while(resultSet.next()){ while(resultSet.next()){
String tableName = resultSet.getString("tn"); String tableName = resultSet.getString("tn");
if(tableName.indexOf("d")==0) {
continue;
}else if(tableName.indexOf("m")==0) {
continue;
}else if(tableName.indexOf("v_")==0) {
continue;
}
pstmt2.setString(1, tableName); pstmt2.setString(1, tableName);
pstmt2.setInt(2, 1); pstmt2.setInt(2, 1);
pstmt2.setInt(3, 1); pstmt2.setInt(3, 1);
@@ -144,7 +149,7 @@ public class TestClass {
e.printStackTrace(); e.printStackTrace();
} }
try { try {
conn.close(); nmsConn.close();
} catch (SQLException e) { } catch (SQLException e) {
e.printStackTrace(); e.printStackTrace();
} }

View File

@@ -30,10 +30,11 @@ public class SyncSlaveToMasterThread implements Runnable{
// 主库向分库同步数据 // 主库向分库同步数据
logger.info("开始分库数据同步主库"); logger.info("开始分库数据同步主库");
// 获取url路径 // 获取url路径
final String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" final StringBuffer url=new StringBuffer();
+ syncDbInfo.get("database_name"); url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
+ syncDbInfo.get("database_name"));
logger.info("当前分库数据库连接为"+url); logger.info("当前分库数据库连接为"+url);
List<Record> find = Db.use(url).find("select * from table_sync_info"); List<Record> find = Db.use(url.toString()).find("select * from table_sync_info");
logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find)); logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find));
if (find != null && find.size() > 0) { if (find != null && find.size() > 0) {
for (final Record record : find) { for (final Record record : find) {
@@ -44,13 +45,13 @@ public class SyncSlaveToMasterThread implements Runnable{
if(record.getInt("mode").equals(1)){ if(record.getInt("mode").equals(1)){
while (flag) { while (flag) {
// 新增操作 取出最后更新id信息 查询增量数据 // 新增操作 取出最后更新id信息 查询增量数据
final List<Record> data = Db.use(url) final List<Record> data = Db.use(url.toString())
.find("select * from " + record.getStr("table_name") + " where " .find("select * from " + record.getStr("table_name") + " where "
+ record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"), + record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"),
record.getInt("last_id")); record.getInt("last_id"));
logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data)); logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data));
if (data != null && data.size() > 0) { if (data != null && data.size() > 0) {
Db.use(url).tx(new IAtom() { Db.use(url.toString()).tx(new IAtom() {
@Override @Override
public boolean run() throws SQLException { public boolean run() throws SQLException {
return Db.use().tx(new IAtom() { return Db.use().tx(new IAtom() {
@@ -63,7 +64,7 @@ public class SyncSlaveToMasterThread implements Runnable{
logger.info("主库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId)); logger.info("主库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId); record.set("last_id", lastInsertId);
record.set("last_date", new Date()); record.set("last_date", new Date());
Db.use(url).update("table_sync_info", record); Db.use(url.toString()).update("table_sync_info", record);
return true; return true;
} }
}); });
@@ -78,14 +79,14 @@ public class SyncSlaveToMasterThread implements Runnable{
//当数据库表结构主键不是自增时 增量更新的操作步骤 //当数据库表结构主键不是自增时 增量更新的操作步骤
while (flag) { while (flag) {
// 新增操作 取出最后更新id信息 查询增量数据 // 新增操作 取出最后更新id信息 查询增量数据
final List<Record> data =Db.use(url) final List<Record> data =Db.use(url.toString())
.find("select * from table_event_log where table_name = '" + record.getStr("table_name") .find("select * from table_event_log where table_name = '" + record.getStr("table_name")
+ "' and id > " + record.getLong("last_id") + " and event = " + "' and id > " + record.getLong("last_id") + " and event = "
+ record.getInt("event") + " order by id asc limit "+record.getInt("batch_size")); + record.getInt("event") + " order by id asc limit "+record.getInt("batch_size"));
//logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data)); //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
if (data != null && data.size() > 0) { if (data != null && data.size() > 0) {
//多数据源事务 主数据源嵌套子数据源 //多数据源事务 主数据源嵌套子数据源
Db.use(url).tx(new IAtom() { Db.use(url.toString()).tx(new IAtom() {
@Override @Override
public boolean run() throws SQLException { public boolean run() throws SQLException {
return Db.use().tx(new IAtom() { return Db.use().tx(new IAtom() {
@@ -101,7 +102,7 @@ public class SyncSlaveToMasterThread implements Runnable{
insertStr.append(",?"); insertStr.append(",?");
} }
} }
List<Record> insertDatas = Db.use(url) List<Record> insertDatas = Db.use(url.toString())
.find(" select * from " + record.getStr("table_name") + " where " .find(" select * from " + record.getStr("table_name") + " where "
+ record.getStr("id_name") + " in (" + insertStr + ") ", + record.getStr("id_name") + " in (" + insertStr + ") ",
insertIds.toArray()); insertIds.toArray());
@@ -118,7 +119,7 @@ public class SyncSlaveToMasterThread implements Runnable{
logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId); record.set("last_id", lastInsertId);
record.set("last_date", new Date()); record.set("last_date", new Date());
Db.use(url).update("table_sync_info", record); Db.use(url.toString()).update("table_sync_info", record);
return true; return true;
} }
}); });
@@ -139,7 +140,7 @@ public class SyncSlaveToMasterThread implements Runnable{
+ record.getInt("event") + " order by id asc limit " + record.getInt("batch_size")); + record.getInt("event") + " order by id asc limit " + record.getInt("batch_size"));
logger.info("分库同步到主库数据的修改或者删除数据信息为"+JSON.toJSONString(datas)); logger.info("分库同步到主库数据的修改或者删除数据信息为"+JSON.toJSONString(datas));
if (datas != null && datas.size() > 0) { if (datas != null && datas.size() > 0) {
Db.use(url).tx(new IAtom() { Db.use(url.toString()).tx(new IAtom() {
@Override @Override
public boolean run() throws SQLException { public boolean run() throws SQLException {
return Db.use().tx(new IAtom() { return Db.use().tx(new IAtom() {
@@ -157,7 +158,7 @@ public class SyncSlaveToMasterThread implements Runnable{
} }
logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds)); logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds));
if (record.getInt("event") == 2) { if (record.getInt("event") == 2) {
List<Record> updateDatas = Db.use(url) List<Record> updateDatas = Db.use(url.toString())
.find(" select * from " + record.getStr("table_name") + " where " .find(" select * from " + record.getStr("table_name") + " where "
+ record.getStr("id_name") + " in (" + deleteStr + ") ", + record.getStr("id_name") + " in (" + deleteStr + ") ",
updateIds.toArray()); updateIds.toArray());
@@ -176,7 +177,7 @@ public class SyncSlaveToMasterThread implements Runnable{
logger.info("获取最后一次操作数据的数据ID信息为"+lastUpdateId); logger.info("获取最后一次操作数据的数据ID信息为"+lastUpdateId);
record.set("last_id", lastUpdateId); record.set("last_id", lastUpdateId);
record.set("last_date", new Date()); record.set("last_date", new Date());
Db.use(url).update("table_sync_info", record); Db.use(url.toString()).update("table_sync_info", record);
return true; return true;
} }
}); });

View File

@@ -19,7 +19,7 @@ import com.jfinal.plugin.activerecord.tx.Tx;
* @author Administrator * @author Administrator
* *
*/ */
@Before({/*SyncDataInterceptor.class,*/Tx.class}) @Before({SyncDataInterceptor.class,Tx.class})
public class SyncThread implements Runnable { public class SyncThread implements Runnable {
private Logger logger = Logger.getLogger(this.getClass()); private Logger logger = Logger.getLogger(this.getClass());
private SyncDbInfo syncDbInfo; private SyncDbInfo syncDbInfo;
@@ -38,8 +38,9 @@ public class SyncThread implements Runnable {
try { try {
logger.info("开始主库数据同步分库任务"); logger.info("开始主库数据同步分库任务");
// 获取url路径 // 获取url路径
final String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" final StringBuffer url=new StringBuffer();
+ syncDbInfo.get("database_name"); url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
+ syncDbInfo.get("database_name"));
logger.info("获取分库数据库连接信息"+url); logger.info("获取分库数据库连接信息"+url);
List<Record> find = Db.use("masterDataSource").find("select * from table_sync_info where db_id=? ", List<Record> find = Db.use("masterDataSource").find("select * from table_sync_info where db_id=? ",
syncDbInfo.get("id")); syncDbInfo.get("id"));
@@ -64,10 +65,10 @@ public class SyncThread implements Runnable {
Db.use().tx(new IAtom() { Db.use().tx(new IAtom() {
@Override @Override
public boolean run() throws SQLException { public boolean run() throws SQLException {
return Db.use(url).tx(new IAtom() { return Db.use(url.toString()).tx(new IAtom() {
@Override @Override
public boolean run() throws SQLException { public boolean run() throws SQLException {
Db.use(url).batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); Db.use(url.toString()).batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name")); Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
@@ -98,7 +99,7 @@ public class SyncThread implements Runnable {
Db.use().tx(new IAtom() { Db.use().tx(new IAtom() {
@Override @Override
public boolean run() throws SQLException { public boolean run() throws SQLException {
return Db.use(url).tx(new IAtom() { return Db.use(url.toString()).tx(new IAtom() {
@Override @Override
public boolean run() throws SQLException { public boolean run() throws SQLException {
List<Integer> insertIds = new ArrayList<Integer>(); List<Integer> insertIds = new ArrayList<Integer>();
@@ -116,13 +117,13 @@ public class SyncThread implements Runnable {
+ record.getStr("id_name") + " in (" + insertStr + ") ", + record.getStr("id_name") + " in (" + insertStr + ") ",
insertIds.toArray()); insertIds.toArray());
for(Record insertData:insertDatas){ for(Record insertData:insertDatas){
Record seqData = Db.use(url).findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); Record seqData = Db.use(url.toString()).findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
insertData.set(record.getStr("id_name"), seqData.getLong("seqId")); insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
if(record.getStr("table_name").equals("event_record_library")) { if(record.getStr("table_name").equals("event_record_library")) {
insertData.set("sync_status",1); insertData.set("sync_status",1);
} }
} }
Db.use(url).batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); Db.use(url.toString()).batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
Object lastInsertId = data.get(data.size() - 1).get("id"); Object lastInsertId = data.get(data.size() - 1).get("id");
logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
@@ -153,7 +154,7 @@ public class SyncThread implements Runnable {
Db.use().tx(new IAtom() { Db.use().tx(new IAtom() {
@Override @Override
public boolean run() throws SQLException { public boolean run() throws SQLException {
return Db.use(url).tx(new IAtom() { return Db.use(url.toString()).tx(new IAtom() {
@Override @Override
public boolean run() throws SQLException { public boolean run() throws SQLException {
List<Integer> updateIds = new ArrayList<Integer>(); List<Integer> updateIds = new ArrayList<Integer>();
@@ -174,12 +175,12 @@ public class SyncThread implements Runnable {
updateIds.toArray()); updateIds.toArray());
//logger.info("获取所有修改数据的数据信息为"+JSON.toJSONString(updateDatas)); //logger.info("获取所有修改数据的数据信息为"+JSON.toJSONString(updateDatas));
if (updateDatas != null && updateDatas.size() > 0) { if (updateDatas != null && updateDatas.size() > 0) {
Db.use(url).batchUpdate(record.getStr("table_name"), record.getStr("id_name"), Db.use(url.toString()).batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
updateDatas, record.getInt("batch_size")); updateDatas, record.getInt("batch_size"));
} }
logger.info("分库对主库修改操作的数据同步任务完成"); logger.info("分库对主库修改操作的数据同步任务完成");
} else if (record.getInt("event") == 3) { } else if (record.getInt("event") == 3) {
Db.use(url).update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in (" Db.use(url.toString()).update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in ("
+ deleteStr + ") ", updateIds.toArray()); + deleteStr + ") ", updateIds.toArray());
logger.info("分库对主库删除操作的数据同步完成"); logger.info("分库对主库删除操作的数据同步完成");
} }