update 指定字段名新增或者修改
This commit is contained in:
@@ -43,6 +43,13 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
//logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find));
|
//logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find));
|
||||||
if (find != null && find.size() > 0) {
|
if (find != null && find.size() > 0) {
|
||||||
for (final Record record : find) {
|
for (final Record record : find) {
|
||||||
|
//如果设定指定字段 则只操作指定字段数据 无则操作全部
|
||||||
|
final StringBuffer columns=new StringBuffer();
|
||||||
|
columns.append("*");
|
||||||
|
if(null!=record.getStr("columns")&&!"".equals(record.getStr("columns"))) {
|
||||||
|
columns.setLength(0);
|
||||||
|
columns.append(record.getStr("columns"));
|
||||||
|
}
|
||||||
// 循环同步数据标识
|
// 循环同步数据标识
|
||||||
boolean flag = true;
|
boolean flag = true;
|
||||||
// 判断表中的event事件 1代表insert 2代表update 3代表delete
|
// 判断表中的event事件 1代表insert 2代表update 3代表delete
|
||||||
@@ -51,7 +58,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
while (flag) {
|
while (flag) {
|
||||||
// 新增操作 取出最后更新id信息 查询增量数据
|
// 新增操作 取出最后更新id信息 查询增量数据
|
||||||
final List<Record> data = Db.use(url.toString())
|
final List<Record> data = Db.use(url.toString())
|
||||||
.find("select * from " + record.getStr("table_name") + " where "
|
.find("select "+columns.toString()+" from " + record.getStr("table_name") + " where "
|
||||||
+ record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"),
|
+ record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"),
|
||||||
record.getInt("last_id"));
|
record.getInt("last_id"));
|
||||||
//logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data));
|
//logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data));
|
||||||
@@ -122,7 +129,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
List<Record> insertDatas = Db.use(url.toString())
|
List<Record> insertDatas = Db.use(url.toString())
|
||||||
.find(" select * from " + record.getStr("table_name") + " where "
|
.find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
|
||||||
+ record.getStr("id_name") + " in (" + insertStr + ") ",
|
+ record.getStr("id_name") + " in (" + insertStr + ") ",
|
||||||
insertIds.toArray());
|
insertIds.toArray());
|
||||||
for(Record insertData:insertDatas){
|
for(Record insertData:insertDatas){
|
||||||
@@ -183,7 +190,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds));
|
logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds));
|
||||||
if (record.getInt("event") == 2) {
|
if (record.getInt("event") == 2) {
|
||||||
List<Record> updateDatas = Db.use(url.toString())
|
List<Record> updateDatas = Db.use(url.toString())
|
||||||
.find(" select * from " + record.getStr("table_name") + " where "
|
.find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
|
||||||
+ record.getStr("id_name") + " in (" + handleStr + ") ",
|
+ record.getStr("id_name") + " in (" + handleStr + ") ",
|
||||||
updateIds.toArray());
|
updateIds.toArray());
|
||||||
//logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas));
|
//logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas));
|
||||||
|
|||||||
@@ -47,6 +47,13 @@ public class SyncThread implements Runnable {
|
|||||||
//logger.info("查询主库须向分库同步数据信息"+JSON.toJSONString(find));
|
//logger.info("查询主库须向分库同步数据信息"+JSON.toJSONString(find));
|
||||||
if (find != null && find.size() > 0) {
|
if (find != null && find.size() > 0) {
|
||||||
for (final Record record : find) {
|
for (final Record record : find) {
|
||||||
|
//如果设定指定字段 则只操作指定字段数据 无则操作全部
|
||||||
|
final StringBuffer columns=new StringBuffer();
|
||||||
|
columns.append("*");
|
||||||
|
if(null!=record.getStr("columns")&&!"".equals(record.getStr("columns"))) {
|
||||||
|
columns.setLength(0);
|
||||||
|
columns.append(record.getStr("columns"));
|
||||||
|
}
|
||||||
// 循环同步数据标识
|
// 循环同步数据标识
|
||||||
boolean flag = true;
|
boolean flag = true;
|
||||||
// 判断表中的event事件 1代表insert 2代表update 3代表delete
|
// 判断表中的event事件 1代表insert 2代表update 3代表delete
|
||||||
@@ -56,7 +63,7 @@ public class SyncThread implements Runnable {
|
|||||||
while (flag) {
|
while (flag) {
|
||||||
// 查询增量数据
|
// 查询增量数据
|
||||||
final List<Record> data =Db.use("masterDataSource")
|
final List<Record> data =Db.use("masterDataSource")
|
||||||
.find("select * from " + record.getStr("table_name") + " where "
|
.find("select " +columns.toString()+ " from " + record.getStr("table_name") + " where "
|
||||||
+ record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit "+record.getInt("batch_size"),
|
+ record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit "+record.getInt("batch_size"),
|
||||||
record.getLong("last_id"));
|
record.getLong("last_id"));
|
||||||
//logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
|
//logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
|
||||||
@@ -118,7 +125,7 @@ public class SyncThread implements Runnable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
List<Record> insertDatas = Db.use("masterDataSource")
|
List<Record> insertDatas = Db.use("masterDataSource")
|
||||||
.find(" select * from " + record.getStr("table_name") + " where "
|
.find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
|
||||||
+ record.getStr("id_name") + " in (" + insertStr + ") ",
|
+ record.getStr("id_name") + " in (" + insertStr + ") ",
|
||||||
insertIds.toArray());
|
insertIds.toArray());
|
||||||
for(Record insertData:insertDatas){
|
for(Record insertData:insertDatas){
|
||||||
@@ -151,6 +158,7 @@ public class SyncThread implements Runnable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (record.getInt("event") == 2 || record.getInt("event") == 3) {
|
} else if (record.getInt("event") == 2 || record.getInt("event") == 3) {
|
||||||
|
|
||||||
// table_event_log sync_db_info两表查询获取修改数据信息 进行数据修改
|
// table_event_log sync_db_info两表查询获取修改数据信息 进行数据修改
|
||||||
while (flag) {
|
while (flag) {
|
||||||
final List<Record> datas = Db.find(
|
final List<Record> datas = Db.find(
|
||||||
@@ -180,7 +188,7 @@ public class SyncThread implements Runnable {
|
|||||||
logger.info("获取所有操作的数据id信息为"+JSON.toJSONString(updateIds));
|
logger.info("获取所有操作的数据id信息为"+JSON.toJSONString(updateIds));
|
||||||
if (record.getInt("event") == 2) {
|
if (record.getInt("event") == 2) {
|
||||||
List<Record> updateDatas = Db.use("masterDataSource")
|
List<Record> updateDatas = Db.use("masterDataSource")
|
||||||
.find(" select * from " + record.getStr("table_name") + " where "
|
.find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
|
||||||
+ record.getStr("id_name") + " in (" + handleStr + ") ",
|
+ record.getStr("id_name") + " in (" + handleStr + ") ",
|
||||||
updateIds.toArray());
|
updateIds.toArray());
|
||||||
//logger.info("获取所有修改数据的数据信息为"+JSON.toJSONString(updateDatas));
|
//logger.info("获取所有修改数据的数据信息为"+JSON.toJSONString(updateDatas));
|
||||||
|
|||||||
Reference in New Issue
Block a user