修复自动注册问题
This commit is contained in:
@@ -48,6 +48,11 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
for (final Record record : find) {
|
for (final Record record : find) {
|
||||||
//logger.info("分库数据同步到主库 正在操作的表名为:"+ record.getStr("table_name"));
|
//logger.info("分库数据同步到主库 正在操作的表名为:"+ record.getStr("table_name"));
|
||||||
errorTableName=record.getStr("table_name");
|
errorTableName=record.getStr("table_name");
|
||||||
|
//针对个别特殊表动态条件
|
||||||
|
String special="";
|
||||||
|
if(record.getStr("table_name").equals("node_table")) {
|
||||||
|
special=" and sync_status!=1 ";
|
||||||
|
}
|
||||||
//如果设定指定字段 则只操作指定字段数据 无则操作全部
|
//如果设定指定字段 则只操作指定字段数据 无则操作全部
|
||||||
final StringBuffer columns=new StringBuffer();
|
final StringBuffer columns=new StringBuffer();
|
||||||
columns.append("*");
|
columns.append("*");
|
||||||
@@ -64,7 +69,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
// 新增操作 取出最后更新id信息 查询增量数据
|
// 新增操作 取出最后更新id信息 查询增量数据
|
||||||
final List<Record> data = Db.use(url.toString())
|
final List<Record> data = Db.use(url.toString())
|
||||||
.find("select "+columns.toString()+" from " + record.getStr("table_name") + " where "
|
.find("select "+columns.toString()+" from " + record.getStr("table_name") + " where "
|
||||||
+ record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"),
|
+ record.getStr("id_name") + " > ? "+special+" order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"),
|
||||||
record.getLong("last_id"));
|
record.getLong("last_id"));
|
||||||
//logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data));
|
//logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data));
|
||||||
if (data != null && data.size() > 0) {
|
if (data != null && data.size() > 0) {
|
||||||
@@ -81,6 +86,12 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
entity.remove(record.getStr("id_name"));
|
entity.remove(record.getStr("id_name"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(record.getStr("table_name").equals("node_table")) {
|
||||||
|
for(Record entity:data) {
|
||||||
|
entity.set("sync_status",1);
|
||||||
|
}
|
||||||
|
}
|
||||||
// 针对监测结果表的id值 自动生成处理
|
// 针对监测结果表的id值 自动生成处理
|
||||||
if("detection_info_new".equals(record.getStr("table_name"))) {
|
if("detection_info_new".equals(record.getStr("table_name"))) {
|
||||||
Db.use(url.toString()).tx(new IAtom() {
|
Db.use(url.toString()).tx(new IAtom() {
|
||||||
|
|||||||
@@ -51,6 +51,11 @@ public class SyncThread implements Runnable {
|
|||||||
for (final Record record : find) {
|
for (final Record record : find) {
|
||||||
//logger.info("主库数据同步到分库 正在操作的表名为:"+ record.getStr("table_name"));
|
//logger.info("主库数据同步到分库 正在操作的表名为:"+ record.getStr("table_name"));
|
||||||
errorTableName=record.getStr("table_name");
|
errorTableName=record.getStr("table_name");
|
||||||
|
//针对个别特殊表动态条件
|
||||||
|
String special="";
|
||||||
|
if(record.getStr("table_name").equals("node_table")) {
|
||||||
|
special=" and sync_status!=1 ";
|
||||||
|
}
|
||||||
//如果设定指定字段 则只操作指定字段数据 无则操作全部
|
//如果设定指定字段 则只操作指定字段数据 无则操作全部
|
||||||
final StringBuffer columns=new StringBuffer();
|
final StringBuffer columns=new StringBuffer();
|
||||||
columns.append("*");
|
columns.append("*");
|
||||||
@@ -68,7 +73,7 @@ public class SyncThread implements Runnable {
|
|||||||
// 查询增量数据
|
// 查询增量数据
|
||||||
final List<Record> data =Db.use("masterDataSource")
|
final List<Record> data =Db.use("masterDataSource")
|
||||||
.find("select " +columns.toString()+ " from " + record.getStr("table_name") + " where "
|
.find("select " +columns.toString()+ " from " + record.getStr("table_name") + " where "
|
||||||
+ record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit "+record.getInt("batch_size"),
|
+ record.getStr("id_name") + " > ? "+special+" order by "+record.getStr("id_name")+" asc limit "+record.getInt("batch_size"),
|
||||||
record.getLong("last_id"));
|
record.getLong("last_id"));
|
||||||
//logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
|
//logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
|
||||||
if (data != null && data.size() > 0) {
|
if (data != null && data.size() > 0) {
|
||||||
@@ -78,6 +83,11 @@ public class SyncThread implements Runnable {
|
|||||||
entity.remove(record.getStr("id_name"));
|
entity.remove(record.getStr("id_name"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if(record.getStr("table_name").equals("node_table")) {
|
||||||
|
for(Record entity:data) {
|
||||||
|
entity.set("sync_status",1);
|
||||||
|
}
|
||||||
|
}
|
||||||
//多数据源事务 主数据源嵌套子数据源
|
//多数据源事务 主数据源嵌套子数据源
|
||||||
Db.use().tx(new IAtom() {
|
Db.use().tx(new IAtom() {
|
||||||
@Override
|
@Override
|
||||||
@@ -206,6 +216,18 @@ public class SyncThread implements Runnable {
|
|||||||
}
|
}
|
||||||
Db.use(url.toString()).batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size"));
|
Db.use(url.toString()).batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size"));
|
||||||
}else {
|
}else {
|
||||||
|
if(record.getStr("table_name").equals("node_table")) {
|
||||||
|
for(Record updateData:updateDatas) {
|
||||||
|
updateData.remove("sync_status");
|
||||||
|
}
|
||||||
|
}else if(record.getStr("table_name").equals("mission_state_table")) {
|
||||||
|
// 主库同步任务状态表 如果状态修改不是周期停用的话 将状态字段的修改过滤
|
||||||
|
for(Record updateData:updateDatas) {
|
||||||
|
if(!(updateData.getInt("mission_state").equals(5)||updateData.getInt("mission_state").equals(1))) {
|
||||||
|
updateData.remove("mission_state");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Db.use(url.toString()).batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
|
Db.use(url.toString()).batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
|
||||||
updateDatas, record.getInt("batch_size"));
|
updateDatas, record.getInt("batch_size"));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user