This commit is contained in:
default
2018-10-17 09:47:57 +08:00
parent 79ab895191
commit ad5fcec365
3 changed files with 6 additions and 4 deletions

View File

@@ -54,7 +54,7 @@ public class SyncData{
// 主库向分库同步数据 // 主库向分库同步数据
SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo)); SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo));
logger.info("创建主库同步分库线程执行任务"); logger.info("创建主库同步分库线程执行任务");
//scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS); scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS);
// 分库向主库同步数据 // 分库向主库同步数据
logger.info("创建分库数据同步到主库线程执行任务"); logger.info("创建分库数据同步到主库线程执行任务");
SyncSlaveToMasterThread syncSlaveToMasterThread = Duang.duang(new SyncSlaveToMasterThread(syncDbInfo)); SyncSlaveToMasterThread syncSlaveToMasterThread = Duang.duang(new SyncSlaveToMasterThread(syncDbInfo));

View File

@@ -157,7 +157,7 @@ public class SyncSlaveToMasterThread implements Runnable{
} else if (record.getInt("event") == 2 || record.getInt("event") == 3) { } else if (record.getInt("event") == 2 || record.getInt("event") == 3) {
// table_event_log sync_db_info两表查询获取修改数据信息 进行数据修改 // table_event_log sync_db_info两表查询获取修改数据信息 进行数据修改
while (flag) { while (flag) {
final List<Record> datas = Db.find( final List<Record> datas = Db.use(url.toString()).find(
" select * from table_event_log where table_name = '" + record.getStr("table_name") " select * from table_event_log where table_name = '" + record.getStr("table_name")
+ "' and id > " + record.getInt("last_id") + " and event = " + "' and id > " + record.getInt("last_id") + " and event = "
+ record.getInt("event") + " order by id asc limit " + record.getInt("batch_size")); + record.getInt("event") + " order by id asc limit " + record.getInt("batch_size"));
@@ -193,6 +193,7 @@ public class SyncSlaveToMasterThread implements Runnable{
updateData.set("old_id",updateData.getLong("id")); updateData.set("old_id",updateData.getLong("id"));
updateData.set("db_id", syncDbInfo.get("id")); updateData.set("db_id", syncDbInfo.get("id"));
updateData.remove("id"); updateData.remove("id");
updateData.remove("sync_status");
} }
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size")); Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size"));
}else { }else {
@@ -207,7 +208,7 @@ public class SyncSlaveToMasterThread implements Runnable{
deleteRecord.set(record.getStr("id_name"), datas.get(i).getInt("target_id")); deleteRecord.set(record.getStr("id_name"), datas.get(i).getInt("target_id"));
//如果是针对 event_record_library 下两行数据使用 不是则仅仅赋值 无意义 //如果是针对 event_record_library 下两行数据使用 不是则仅仅赋值 无意义
deleteRecord.set("old_id", datas.get(i).getInt("target_id")); deleteRecord.set("old_id", datas.get(i).getInt("target_id"));
deleteRecord.set("db_id", -1); deleteRecord.set("db_id", syncDbInfo.get("id"));
deleteRecords.add(deleteRecord); deleteRecords.add(deleteRecord);
} }
if(record.getStr("table_name").equals("event_record_library")) { if(record.getStr("table_name").equals("event_record_library")) {

View File

@@ -40,7 +40,7 @@ public class SyncThread implements Runnable {
// 获取url路径 // 获取url路径
final StringBuffer url=new StringBuffer(); final StringBuffer url=new StringBuffer();
url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
+ syncDbInfo.get("database_name")); + syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true");
logger.info("获取分库数据库连接信息"+url); logger.info("获取分库数据库连接信息"+url);
List<Record> find = Db.use("masterDataSource").find("select * from table_sync_info where db_id=? ", List<Record> find = Db.use("masterDataSource").find("select * from table_sync_info where db_id=? ",
syncDbInfo.get("id")); syncDbInfo.get("id"));
@@ -190,6 +190,7 @@ public class SyncThread implements Runnable {
updateData.set("old_id",updateData.getLong("id")); updateData.set("old_id",updateData.getLong("id"));
updateData.set("db_id", -1); updateData.set("db_id", -1);
updateData.remove("id"); updateData.remove("id");
updateData.remove("sync_status");
} }
Db.use(url.toString()).batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size")); Db.use(url.toString()).batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size"));
}else { }else {