update
This commit is contained in:
@@ -61,15 +61,18 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
final List<Record> data = Db.use(url.toString())
|
final List<Record> data = Db.use(url.toString())
|
||||||
.find("select "+columns.toString()+" from " + record.getStr("table_name") + " where "
|
.find("select "+columns.toString()+" from " + record.getStr("table_name") + " where "
|
||||||
+ record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"),
|
+ record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"),
|
||||||
record.getInt("last_id"));
|
record.getLong("last_id"));
|
||||||
//logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data));
|
//logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data));
|
||||||
if (data != null && data.size() > 0) {
|
if (data != null && data.size() > 0) {
|
||||||
final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
|
final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
|
||||||
|
StringBuffer ssss=new StringBuffer();
|
||||||
if(record.getInt("mode").equals(2)) {
|
if(record.getInt("mode").equals(2)) {
|
||||||
for(Record entity:data) {
|
for(Record entity:data) {
|
||||||
|
ssss.append(entity.get("ID")+",");
|
||||||
entity.remove(record.getStr("id_name"));
|
entity.remove(record.getStr("id_name"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
System.out.println(ssss.toString());
|
||||||
// 针对监测结果表的id值 自动生成处理
|
// 针对监测结果表的id值 自动生成处理
|
||||||
if("detection_info_new".equals(record.getStr("table_name"))) {
|
if("detection_info_new".equals(record.getStr("table_name"))) {
|
||||||
Db.use(url.toString()).tx(new IAtom() {
|
Db.use(url.toString()).tx(new IAtom() {
|
||||||
@@ -152,10 +155,10 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
return Db.use("masterDataSource").tx(new IAtom() {
|
return Db.use("masterDataSource").tx(new IAtom() {
|
||||||
@Override
|
@Override
|
||||||
public boolean run() throws SQLException {
|
public boolean run() throws SQLException {
|
||||||
List<Integer> insertIds = new ArrayList<Integer>();
|
List<Long> insertIds = new ArrayList<Long>();
|
||||||
StringBuffer insertStr = new StringBuffer();
|
StringBuffer insertStr = new StringBuffer();
|
||||||
for (int i = 0; i < data.size(); i++) {
|
for (int i = 0; i < data.size(); i++) {
|
||||||
insertIds.add(data.get(i).getInt("target_id"));
|
insertIds.add(data.get(i).getLong("target_id"));
|
||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
insertStr.append("?");
|
insertStr.append("?");
|
||||||
} else {
|
} else {
|
||||||
@@ -173,7 +176,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
insertData.set("sync_status",1);
|
insertData.set("sync_status",1);
|
||||||
//设置同步数据所在数据库的id以及所在原来表的id 用于修改和删除 -1为中心或主库
|
//设置同步数据所在数据库的id以及所在原来表的id 用于修改和删除 -1为中心或主库
|
||||||
insertData.set("old_id",insertData.getLong(record.getStr("id_name")));
|
insertData.set("old_id",insertData.getLong(record.getStr("id_name")));
|
||||||
insertData.set("db_id", syncDbInfo.getInt("id"));
|
insertData.set("db_id", syncDbInfo.getLong("id"));
|
||||||
}
|
}
|
||||||
insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
|
insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
|
||||||
}
|
}
|
||||||
@@ -200,7 +203,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
while (flag) {
|
while (flag) {
|
||||||
final List<Record> datas = Db.use(url.toString()).find(
|
final List<Record> datas = Db.use(url.toString()).find(
|
||||||
" select * from table_event_log where table_name = '" + record.getStr("table_name")
|
" select * from table_event_log where table_name = '" + record.getStr("table_name")
|
||||||
+ "' and id > " + record.getInt("last_id") + " and event = "
|
+ "' and id > " + record.getLong("last_id") + " and event = "
|
||||||
+ record.getInt("event") + " order by id asc limit " + record.getInt("batch_size"));
|
+ record.getInt("event") + " order by id asc limit " + record.getInt("batch_size"));
|
||||||
//logger.info("分库同步到主库数据的修改或者删除数据信息为"+JSON.toJSONString(datas));
|
//logger.info("分库同步到主库数据的修改或者删除数据信息为"+JSON.toJSONString(datas));
|
||||||
if (datas != null && datas.size() > 0) {
|
if (datas != null && datas.size() > 0) {
|
||||||
@@ -210,11 +213,11 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
return Db.use("masterDataSource").tx(new IAtom() {
|
return Db.use("masterDataSource").tx(new IAtom() {
|
||||||
@Override
|
@Override
|
||||||
public boolean run() throws SQLException {
|
public boolean run() throws SQLException {
|
||||||
List<Integer> updateIds = new ArrayList<Integer>();
|
List<Long> updateIds = new ArrayList<Long>();
|
||||||
List<Record> deleteRecords=new ArrayList<Record>();
|
List<Record> deleteRecords=new ArrayList<Record>();
|
||||||
StringBuilder handleStr=new StringBuilder();
|
StringBuilder handleStr=new StringBuilder();
|
||||||
for (int i = 0; i < datas.size(); i++) {
|
for (int i = 0; i < datas.size(); i++) {
|
||||||
updateIds.add(datas.get(i).getInt("target_id"));
|
updateIds.add(datas.get(i).getLong("target_id"));
|
||||||
if(i==0) {
|
if(i==0) {
|
||||||
handleStr.append("?");
|
handleStr.append("?");
|
||||||
}else {
|
}else {
|
||||||
@@ -252,9 +255,9 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
} else if (record.getInt("event") == 3) {
|
} else if (record.getInt("event") == 3) {
|
||||||
for (int i = 0; i < datas.size(); i++) {
|
for (int i = 0; i < datas.size(); i++) {
|
||||||
Record deleteRecord=new Record();
|
Record deleteRecord=new Record();
|
||||||
deleteRecord.set(record.getStr("id_name"), datas.get(i).getInt("target_id"));
|
deleteRecord.set(record.getStr("id_name"), datas.get(i).getLong("target_id"));
|
||||||
//如果是针对 event_record_library 下两行数据使用 不是则仅仅赋值 无意义
|
//如果是针对 event_record_library 下两行数据使用 不是则仅仅赋值 无意义
|
||||||
deleteRecord.set("old_id", datas.get(i).getInt("target_id"));
|
deleteRecord.set("old_id", datas.get(i).getLong("target_id"));
|
||||||
deleteRecord.set("db_id", syncDbInfo.get("id"));
|
deleteRecord.set("db_id", syncDbInfo.get("id"));
|
||||||
deleteRecords.add(deleteRecord);
|
deleteRecords.add(deleteRecord);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -114,10 +114,10 @@ public class SyncThread implements Runnable {
|
|||||||
return Db.use(url.toString()).tx(new IAtom() {
|
return Db.use(url.toString()).tx(new IAtom() {
|
||||||
@Override
|
@Override
|
||||||
public boolean run() throws SQLException {
|
public boolean run() throws SQLException {
|
||||||
List<Integer> insertIds = new ArrayList<Integer>();
|
List<Long> insertIds = new ArrayList<Long>();
|
||||||
StringBuffer insertStr = new StringBuffer();
|
StringBuffer insertStr = new StringBuffer();
|
||||||
for (int i = 0; i < data.size(); i++) {
|
for (int i = 0; i < data.size(); i++) {
|
||||||
insertIds.add(data.get(i).getInt("target_id"));
|
insertIds.add(data.get(i).getLong("target_id"));
|
||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
insertStr.append("?");
|
insertStr.append("?");
|
||||||
} else {
|
} else {
|
||||||
@@ -174,11 +174,11 @@ public class SyncThread implements Runnable {
|
|||||||
return Db.use(url.toString()).tx(new IAtom() {
|
return Db.use(url.toString()).tx(new IAtom() {
|
||||||
@Override
|
@Override
|
||||||
public boolean run() throws SQLException {
|
public boolean run() throws SQLException {
|
||||||
List<Integer> updateIds = new ArrayList<Integer>();
|
List<Long> updateIds = new ArrayList<Long>();
|
||||||
List<Record> deleteRecords=new ArrayList<Record>();
|
List<Record> deleteRecords=new ArrayList<Record>();
|
||||||
StringBuilder handleStr=new StringBuilder();
|
StringBuilder handleStr=new StringBuilder();
|
||||||
for (int i = 0; i < datas.size(); i++) {
|
for (int i = 0; i < datas.size(); i++) {
|
||||||
updateIds.add(datas.get(i).getInt("target_id"));
|
updateIds.add(datas.get(i).getLong("target_id"));
|
||||||
if(i==0) {
|
if(i==0) {
|
||||||
handleStr.append("?");
|
handleStr.append("?");
|
||||||
}else {
|
}else {
|
||||||
@@ -210,9 +210,9 @@ public class SyncThread implements Runnable {
|
|||||||
} else if (record.getInt("event") == 3) {
|
} else if (record.getInt("event") == 3) {
|
||||||
for (int i = 0; i < datas.size(); i++) {
|
for (int i = 0; i < datas.size(); i++) {
|
||||||
Record deleteRecord=new Record();
|
Record deleteRecord=new Record();
|
||||||
deleteRecord.set(record.getStr("id_name"), datas.get(i).getInt("target_id"));
|
deleteRecord.set(record.getStr("id_name"), datas.get(i).getLong("target_id"));
|
||||||
//如果是针对 event_record_library 下两行数据使用 不是则仅仅赋值 无意义
|
//如果是针对 event_record_library 下两行数据使用 不是则仅仅赋值 无意义
|
||||||
deleteRecord.set("old_id", datas.get(i).getInt("target_id"));
|
deleteRecord.set("old_id", datas.get(i).getLong("target_id"));
|
||||||
deleteRecord.set("db_id", -1);
|
deleteRecord.set("db_id", -1);
|
||||||
deleteRecords.add(deleteRecord);
|
deleteRecords.add(deleteRecord);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user