修复同步主键自增冲突bug

This commit is contained in:
default
2018-10-15 09:53:52 +08:00
parent 789b033b12
commit 039a8df261
3 changed files with 15 additions and 4 deletions

View File

@@ -57,8 +57,8 @@ public class SyncData{
scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS); scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS);
// 分库向主库同步数据 // 分库向主库同步数据
logger.info("创建分库数据同步到主库线程执行任务"); logger.info("创建分库数据同步到主库线程执行任务");
//SyncThread syncSlaveToMasterThread = Duang.duang(new SyncSlaveToMasterThread(syncDbInfo)); SyncSlaveToMasterThread syncSlaveToMasterThread = Duang.duang(new SyncSlaveToMasterThread(syncDbInfo));
//scheduleService.scheduleWithFixedDelay(syncSlaveToMasterThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS); scheduleService.scheduleWithFixedDelay(syncSlaveToMasterThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS);
} }
}else{ }else{
logger.info("获取同步记录信息失败 请检查数据库数据信息"); logger.info("获取同步记录信息失败 请检查数据库数据信息");

View File

@@ -20,7 +20,12 @@ public class SyncSlaveToMasterThread implements Runnable{
private Logger logger = Logger.getLogger(this.getClass()); private Logger logger = Logger.getLogger(this.getClass());
private SyncDbInfo syncDbInfo; private SyncDbInfo syncDbInfo;
public SyncSlaveToMasterThread() {
super();
}
public SyncSlaveToMasterThread(SyncDbInfo syncDbInfo) { public SyncSlaveToMasterThread(SyncDbInfo syncDbInfo) {
super();
this.syncDbInfo = syncDbInfo; this.syncDbInfo = syncDbInfo;
} }
@@ -51,6 +56,10 @@ public class SyncSlaveToMasterThread implements Runnable{
record.getInt("last_id")); record.getInt("last_id"));
logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data)); logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data));
if (data != null && data.size() > 0) { if (data != null && data.size() > 0) {
final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
for(Record entity:data) {
entity.remove(record.getStr("id_name"));
}
Db.use(url.toString()).tx(new IAtom() { Db.use(url.toString()).tx(new IAtom() {
@Override @Override
public boolean run() throws SQLException { public boolean run() throws SQLException {
@@ -60,7 +69,6 @@ public class SyncSlaveToMasterThread implements Runnable{
Db.use("masterDataSource").batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); Db.use("masterDataSource").batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
logger.info("主库同步增量更新数据完成"); logger.info("主库同步增量更新数据完成");
Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
logger.info("主库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId)); logger.info("主库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId); record.set("last_id", lastInsertId);
record.set("last_date", new Date()); record.set("last_date", new Date());

View File

@@ -61,6 +61,10 @@ public class SyncThread implements Runnable {
record.getLong("last_id")); record.getLong("last_id"));
//logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data)); //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
if (data != null && data.size() > 0) { if (data != null && data.size() > 0) {
final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
for(Record entity:data) {
entity.remove(record.getStr("id_name"));
}
//多数据源事务 主数据源嵌套子数据源 //多数据源事务 主数据源嵌套子数据源
Db.use().tx(new IAtom() { Db.use().tx(new IAtom() {
@Override @Override
@@ -70,7 +74,6 @@ public class SyncThread implements Runnable {
public boolean run() throws SQLException { public boolean run() throws SQLException {
Db.use(url.toString()).batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); Db.use(url.toString()).batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId); record.set("last_id", lastInsertId);
record.set("last_date", new Date()); record.set("last_date", new Date());