nms-nmssync/nms_sync/src/com/nms/thread/SyncThread.java

package com.nms.thread;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.log4j.Logger;
import com.alibaba.fastjson.JSON;
import com.jfinal.aop.Before;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.IAtom;
import com.jfinal.plugin.activerecord.Record;
import com.nms.interceptor.SyncDataInterceptor;
import com.nms.model.SyncDbInfo;
import com.jfinal.plugin.activerecord.tx.Tx;
/**
* Worker thread that synchronizes incremental data from the master database to a shard database.
*
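* <p>Minimal usage sketch (the scheduling code and the {@code SyncDbInfo} lookup below are
* illustrative assumptions, not part of this class):</p>
* <pre>
* SyncDbInfo info = SyncDbInfo.dao.findById(1);  // hypothetical lookup of one shard's config
* new Thread(new SyncThread(info)).start();      // run one sync pass for that shard
* </pre>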
* @author Administrator
*
*/
@Before({SyncDataInterceptor.class,Tx.class})
public class SyncThread implements Runnable {
private Logger logger = Logger.getLogger(this.getClass());
private SyncDbInfo syncDbInfo;
public SyncThread() {
super();
}
public SyncThread(SyncDbInfo syncDbInfo) {
super();
this.syncDbInfo = syncDbInfo;
}
@Override
public void run() {
try {
logger.info("开始主库数据同步分库任务");
// 获取url路径
final StringBuffer url=new StringBuffer();
url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
+ syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true");
logger.info("获取分库数据库连接信息"+url);
List<Record> find = Db.use("masterDataSource").find("select * from table_sync_info where db_id=? ",
syncDbInfo.get("id"));
//logger.info("查询主库须向分库同步数据信息"+JSON.toJSONString(find));
if (find != null && find.size() > 0) {
for (final Record record : find) {
logger.info("主库数据同步到分库 正在操作的表名为:"+ record.getStr("table_name"));
//如果设定指定字段 则只操作指定字段数据 无则操作全部
final StringBuffer columns=new StringBuffer();
columns.append("*");
if(null!=record.getStr("columns")&&!"".equals(record.getStr("columns"))) {
columns.setLength(0);
columns.append(record.getStr("columns"));
}
// Loop flag: keep fetching batches until no more incremental rows are found
boolean flag = true;
// Event type configured for this table: 1 = insert, 2 = update, 3 = delete
if (record.getInt("event") == 1) {
// mode selects the primary-key strategy: 1 = copy the master id, 2 = drop the id so the shard
// generates it, 0 = non auto-increment key handled via table_event_log and a shard-side sequence
if(record.getInt("mode").equals(1)||record.getInt("mode").equals(2)){
while (flag) {
// Fetch the next batch of rows with an id greater than the last synchronized id
final List<Record> data =Db.use("masterDataSource")
.find("select " +columns.toString()+ " from " + record.getStr("table_name") + " where "
+ record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit "+record.getInt("batch_size"),
record.getLong("last_id"));
//logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
if (data != null && data.size() > 0) {
final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
if(record.getInt("mode").equals(2)) {
for(Record entity:data) {
entity.remove(record.getStr("id_name"));
}
}
// Nested multi-datasource transaction: the master datasource transaction wraps the shard datasource transaction
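// Note: Db.use(url.toString()) looks up a datasource by config name, so this assumes a datasource
// was registered elsewhere under the full JDBC URL as its config name; that registration is not
// part of this class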
Db.use().tx(new IAtom() {
@Override
public boolean run() throws SQLException {
return Db.use(url.toString()).tx(new IAtom() {
@Override
public boolean run() throws SQLException {
Db.use(url.toString()).batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
// After the batch is saved, store the id of the last synced row in table_sync_info for the next run
logger.info("Batch insert finished, last synced id: " + JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
Db.use("masterDataSource").update("table_sync_info", record);
return true;
}
});
}
});
logger.info("增量更新数据任务结束");
} else {
flag = false;
}
}
}else if(record.getInt("mode").equals(0)){
// Incremental insert handling for tables whose primary key is not auto-increment
while (flag) {
// Read insert events recorded in table_event_log after the last processed event id
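// table_event_log is the change-capture table: each row records the affected table_name, the
// event type and the target_id of the changed master row (how it is populated, e.g. by
// database triggers, is outside this class)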
final List<Record> data =Db.use("masterDataSource")
.find("select * from table_event_log where table_name = '" + record.getStr("table_name")
+ "' and id > " + record.getLong("last_id") + " and event = "
+ record.getInt("event") + " order by id asc limit "+record.getInt("batch_size"));
//logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
if (data != null && data.size() > 0) {
// Nested multi-datasource transaction: the master datasource transaction wraps the shard datasource transaction
Db.use().tx(new IAtom() {
@Override
public boolean run() throws SQLException {
return Db.use(url.toString()).tx(new IAtom() {
@Override
public boolean run() throws SQLException {
List<Long> insertIds = new ArrayList<Long>();
StringBuffer insertStr = new StringBuffer();
for (int i = 0; i < data.size(); i++) {
insertIds.add(data.get(i).getLong("target_id"));
if (i == 0) {
insertStr.append("?");
} else {
insertStr.append(",?");
}
}
List<Record> insertDatas = Db.use("masterDataSource")
.find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
+ record.getStr("id_name") + " in (" + insertStr + ") ",
insertIds.toArray());
for(Record insertData:insertDatas){
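// Assumption: the shard database provides a user-defined nextval() function backed by a
// sequence named seq_<table_name> (MySQL has no native sequences); each synced row gets a
// new shard-local primary key from it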
Record seqData = Db.use(url.toString()).findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
if(record.getStr("table_name").equals("event_record_library")) {
// Mark the row as synchronized data
insertData.set("sync_status",1);
// Record the original row id and the source database id, used later for updates and deletes; -1 means the center/master database
insertData.set("old_id",insertData.getLong(record.getStr("id_name")));
insertData.set("db_id", -1);
}
insertData.set(record.getStr("id_name"), seqData.getLong("seqId"));
}
Db.use(url.toString()).batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
// After the batch is saved, store the id of the last processed event in table_sync_info for the next run
Object lastInsertId = data.get(data.size() - 1).get("id");
logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
Db.use("masterDataSource").update("table_sync_info", record);
return true;
}
});
}
});
logger.info("增量更新数据任务结束");
} else {
flag = false;
}
}
}
} else if (record.getInt("event") == 2 || record.getInt("event") == 3) {
// Query table_event_log (per the sync_db_info configuration) for update/delete events and apply them to the shard
while (flag) {
final List<Record> datas = Db.find(
" select * from table_event_log where table_name = '" + record.getStr("table_name")
+ "' and id > " + record.getLong("last_id") + " and event = "
+ record.getInt("event") + " order by id asc limit "+record.getInt("batch_size"));
//logger.info("获取主库删除或者修改数据的数据信息"+JSON.toJSONString(datas));
if (datas != null && datas.size() > 0) {
// Nested multi-datasource transaction: the master datasource transaction wraps the shard datasource transaction
Db.use().tx(new IAtom() {
@Override
public boolean run() throws SQLException {
return Db.use(url.toString()).tx(new IAtom() {
@Override
public boolean run() throws SQLException {
List<Long> updateIds = new ArrayList<Long>();
List<Record> deleteRecords=new ArrayList<Record>();
StringBuilder handleStr=new StringBuilder();
for (int i = 0; i < datas.size(); i++) {
updateIds.add(datas.get(i).getLong("target_id"));
if(i==0) {
handleStr.append("?");
}else {
handleStr.append(",?");
}
}
logger.info("获取所有操作的数据id信息为"+JSON.toJSONString(updateIds));
if (record.getInt("event") == 2) {
List<Record> updateDatas = Db.use("masterDataSource")
.find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
+ record.getStr("id_name") + " in (" + handleStr + ") ",
updateIds.toArray());
//logger.info("获取所有修改数据的数据信息为"+JSON.toJSONString(updateDatas));
if (updateDatas != null && updateDatas.size() > 0) {
if(record.getStr("table_name").equals("event_record_library")) {
for(Record updateData:updateDatas) {
updateData.set("old_id",updateData.getLong(record.getStr("id_name")));
updateData.set("db_id", -1);
updateData.remove(record.getStr("id_name"));
updateData.remove("sync_status");
}
Db.use(url.toString()).batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size"));
}else {
Db.use(url.toString()).batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
updateDatas, record.getInt("batch_size"));
}
}
logger.info("分库对主库修改操作的数据同步任务完成");
} else if (record.getInt("event") == 3) {
for (int i = 0; i < datas.size(); i++) {
Record deleteRecord=new Record();
deleteRecord.set(record.getStr("id_name"), datas.get(i).getLong("target_id"));
// The next two fields are only meaningful for event_record_library; for other tables they are set but not used
deleteRecord.set("old_id", datas.get(i).getLong("target_id"));
deleteRecord.set("db_id", -1);
deleteRecords.add(deleteRecord);
}
if(record.getStr("table_name").equals("event_record_library")) {
Db.use(url.toString()).batch("delete from event_record_library where old_id=? and db_id=?","old_id,db_id",deleteRecords,record.getInt("batch_size"));
}else {
Db.use(url.toString()).batch("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+"=?",record.getStr("id_name"),deleteRecords,record.getInt("batch_size"));
}
logger.info("分库对主库删除操作的数据同步完成");
}
Object lastUpdateId = datas.get(datas.size() - 1).get("id");
logger.info("获取最后一次修改或者删除操作的数据ID信息"+JSON.toJSONString(lastUpdateId));
record.set("last_id", lastUpdateId);
record.set("last_date", new Date());
Db.use("masterDataSource").update("table_sync_info", record);
logger.info("修改table_sync_info记录结果 用于下次同步完成");
return true;
}
});
}
});
} else {
flag = false;
}
}
}
}
}
logger.info("主库数据同步分库结束");
logger.info("*****************************************************");
} catch (Exception e) {
logger.error("主库数据同步分库发生错误 异常信息为:"+e.getMessage());
e.printStackTrace();
}
}
public SyncDbInfo getSyncDbInfo() {
return syncDbInfo;
}
public void setSyncDbInfo(SyncDbInfo syncDbInfo) {
this.syncDbInfo = syncDbInfo;
}
}