// NOTE: stray page-scrape metadata ("197 lines / 8.7 KiB / Java", duplicated) removed —
// it was not part of the source and no text may precede the package declaration.
package com.nms.thread;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.log4j.Logger;

import com.alibaba.fastjson.JSON;
import com.jfinal.aop.Before;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.IAtom;
import com.jfinal.plugin.activerecord.Record;
import com.jfinal.plugin.activerecord.tx.Tx;
import com.nms.model.SyncDbInfo;
@Before({Tx.class})
|
|
public class SyncSlaveToMasterThread implements Runnable{
|
|
private Logger logger = Logger.getLogger(this.getClass());
|
|
private SyncDbInfo syncDbInfo;
|
|
|
|
public SyncSlaveToMasterThread(SyncDbInfo syncDbInfo) {
|
|
this.syncDbInfo = syncDbInfo;
|
|
}
|
|
|
|
@Override
|
|
public void run() {
|
|
try {
|
|
// 主库向分库同步数据
|
|
logger.info("开始分库数据同步主库");
|
|
// 获取url路径
|
|
final String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
|
|
+ syncDbInfo.get("database_name");
|
|
logger.info("当前分库数据库连接为"+url);
|
|
List<Record> find = Db.use(url).find("select * from table_sync_info");
|
|
logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find));
|
|
if (find != null && find.size() > 0) {
|
|
for (final Record record : find) {
|
|
// 循环同步数据标识
|
|
boolean flag = true;
|
|
// 判断表中的event事件 1代表insert 2代表update 3代表delete
|
|
if (record.getInt("event") == 1) {
|
|
if(record.getInt("mode").equals(1)){
|
|
while (flag) {
|
|
// 新增操作 取出最后更新id信息 查询增量数据
|
|
final List<Record> data = Db.use(url)
|
|
.find("select * from " + record.getStr("table_name") + " where "
|
|
+ record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"),
|
|
record.getInt("last_id"));
|
|
logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data));
|
|
if (data != null && data.size() > 0) {
|
|
Db.use(url).tx(new IAtom() {
|
|
@Override
|
|
public boolean run() throws SQLException {
|
|
return Db.use().tx(new IAtom() {
|
|
@Override
|
|
public boolean run() throws SQLException {
|
|
Db.use("masterDataSource").batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
|
|
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
|
|
logger.info("主库同步增量更新数据完成");
|
|
Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
|
|
logger.info("主库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId));
|
|
record.set("last_id", lastInsertId);
|
|
record.set("last_date", new Date());
|
|
Db.use(url).update("table_sync_info", record);
|
|
return true;
|
|
}
|
|
});
|
|
}
|
|
});
|
|
logger.info("主库同步增量更新数据完成 修改最后同步ID");
|
|
} else {
|
|
flag = false;
|
|
}
|
|
}
|
|
}else if(record.getInt("mode").equals(0)){
|
|
//当数据库表结构主键不是自增时 增量更新的操作步骤
|
|
while (flag) {
|
|
// 新增操作 取出最后更新id信息 查询增量数据
|
|
final List<Record> data =Db.use(url)
|
|
.find("select * from table_event_log where table_name = '" + record.getStr("table_name")
|
|
+ "' and id > " + record.getLong("last_id") + " and event = "
|
|
+ record.getInt("event") + " order by id asc limit "+record.getInt("batch_size"));
|
|
//logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
|
|
if (data != null && data.size() > 0) {
|
|
//多数据源事务 主数据源嵌套子数据源
|
|
Db.use(url).tx(new IAtom() {
|
|
@Override
|
|
public boolean run() throws SQLException {
|
|
return Db.use().tx(new IAtom() {
|
|
@Override
|
|
public boolean run() throws SQLException {
|
|
List<Integer> insertIds = new ArrayList<Integer>();
|
|
StringBuffer insertStr = new StringBuffer();
|
|
for (int i = 0; i < data.size(); i++) {
|
|
insertIds.add(data.get(i).getInt("target_id"));
|
|
if (i == 0) {
|
|
insertStr.append("?");
|
|
} else {
|
|
insertStr.append(",?");
|
|
}
|
|
}
|
|
List<Record> insertDatas = Db.use(url)
|
|
.find(" select * from " + record.getStr("table_name") + " where "
|
|
+ record.getStr("id_name") + " in (" + insertStr + ") ",
|
|
insertIds.toArray());
|
|
for(Record insertData:insertDatas){
|
|
Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
|
|
insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
|
|
}
|
|
Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
|
|
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
|
|
Object lastInsertId = data.get(data.size() - 1).get("id");
|
|
logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
|
|
record.set("last_id", lastInsertId);
|
|
record.set("last_date", new Date());
|
|
Db.use(url).update("table_sync_info", record);
|
|
return true;
|
|
}
|
|
});
|
|
}
|
|
});
|
|
logger.info("增量更新数据任务结束");
|
|
} else {
|
|
flag = false;
|
|
}
|
|
}
|
|
}
|
|
} else if (record.getInt("event") == 2 || record.getInt("event") == 3) {
|
|
// table_event_log sync_db_info两表查询获取修改数据信息 进行数据修改
|
|
while (flag) {
|
|
final List<Record> datas = Db.find(
|
|
" select * from table_event_log where table_name = '" + record.getStr("table_name")
|
|
+ "' and id > " + record.getInt("last_id") + " and event = "
|
|
+ record.getInt("event") + " order by id asc limit " + record.getInt("batch_size"));
|
|
logger.info("分库同步到主库数据的修改或者删除数据信息为"+JSON.toJSONString(datas));
|
|
if (datas != null && datas.size() > 0) {
|
|
Db.use(url).tx(new IAtom() {
|
|
@Override
|
|
public boolean run() throws SQLException {
|
|
return Db.use().tx(new IAtom() {
|
|
@Override
|
|
public boolean run() throws SQLException {
|
|
List<Integer> updateIds = new ArrayList<Integer>();
|
|
StringBuffer deleteStr = new StringBuffer();
|
|
for (int i = 0; i < datas.size(); i++) {
|
|
updateIds.add(datas.get(i).getInt("target_id"));
|
|
if (i == 0) {
|
|
deleteStr.append("?");
|
|
} else {
|
|
deleteStr.append(",?");
|
|
}
|
|
}
|
|
logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds));
|
|
if (record.getInt("event") == 2) {
|
|
List<Record> updateDatas = Db.use(url)
|
|
.find(" select * from " + record.getStr("table_name") + " where "
|
|
+ record.getStr("id_name") + " in (" + deleteStr + ") ",
|
|
updateIds.toArray());
|
|
logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas));
|
|
if (updateDatas != null && updateDatas.size() > 0) {
|
|
Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
|
|
updateDatas, record.getInt("batch_size"));
|
|
}
|
|
logger.info("分库同步主库修改数据任务完成");
|
|
} else if (record.getInt("event") == 3) {
|
|
Db.use("masterDataSource").update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in ("
|
|
+ deleteStr + ") ", updateIds.toArray());
|
|
logger.info("分库同步主库删除数据任务完成");
|
|
}
|
|
Object lastUpdateId = datas.get(datas.size() - 1).get("id");
|
|
logger.info("获取最后一次操作数据的数据ID信息为"+lastUpdateId);
|
|
record.set("last_id", lastUpdateId);
|
|
record.set("last_date", new Date());
|
|
Db.use(url).update("table_sync_info", record);
|
|
return true;
|
|
}
|
|
});
|
|
}
|
|
});
|
|
logger.info("修改分库table_sync_info最后操作数据信息 用于下次同步操作完成");
|
|
} else {
|
|
flag = false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
logger.info("分库数据同步主库结束");
|
|
} catch (Exception e) {
|
|
e.printStackTrace();
|
|
}
|
|
}
|
|
|
|
}
|