package com.nms.thread; import java.sql.SQLException; import java.util.ArrayList; import java.util.Date; import java.util.List; import org.apache.log4j.Logger; import com.alibaba.fastjson.JSON; import com.jfinal.aop.Before; import com.jfinal.plugin.activerecord.Db; import com.jfinal.plugin.activerecord.IAtom; import com.jfinal.plugin.activerecord.Record; import com.nms.interceptor.SyncDataInterceptor; import com.nms.model.SyncDbInfo; import com.jfinal.plugin.activerecord.tx.Tx; /** * 数据同步功能线程 * * @author Administrator * */ @Before({SyncDataInterceptor.class,Tx.class}) public class SyncThread implements Runnable { private Logger logger = Logger.getLogger(this.getClass()); private SyncDbInfo syncDbInfo; public SyncThread() { super(); } public SyncThread(SyncDbInfo syncDbInfo) { super(); this.syncDbInfo = syncDbInfo; } @Override public void run() { try { logger.info("开始主库数据同步分库任务"); // 获取url路径 final StringBuffer url=new StringBuffer(); url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" + syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true"); logger.info("获取分库数据库连接信息"+url); List find = Db.use("masterDataSource").find("select * from table_sync_info where db_id=? ", syncDbInfo.get("id")); //logger.info("查询主库须向分库同步数据信息"+JSON.toJSONString(find)); if (find != null && find.size() > 0) { for (final Record record : find) { // 循环同步数据标识 boolean flag = true; // 判断表中的event事件 1代表insert 2代表update 3代表delete if (record.getInt("event") == 1) { //根据mode判断主键产生方式 if(record.getInt("mode").equals(1)||record.getInt("mode").equals(2)){ while (flag) { // 查询增量数据 final List data =Db.use("masterDataSource") .find("select * from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit "+record.getInt("batch_size"), record.getLong("last_id")); //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data)); if (data != null && data.size() > 0) { final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name")); if(record.getInt("mode").equals(2)) { for(Record entity:data) { entity.remove(record.getStr("id_name")); } } //多数据源事务 主数据源嵌套子数据源 Db.use().tx(new IAtom() { @Override public boolean run() throws SQLException { return Db.use(url.toString()).tx(new IAtom() { @Override public boolean run() throws SQLException { Db.use(url.toString()).batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); Db.use("masterDataSource").update("table_sync_info", record); return true; } }); } }); logger.info("增量更新数据任务结束"); } else { flag = false; } } }else if(record.getInt("mode").equals(0)){ //当数据库表结构主键不是自增时 增量更新的操作步骤 while (flag) { // 新增操作 取出最后更新id信息 查询增量数据 final List data =Db.use("masterDataSource") .find("select * from table_event_log where table_name = '" + record.getStr("table_name") + "' and id > " + record.getLong("last_id") + " and event = " + record.getInt("event") + " order by id asc limit "+record.getInt("batch_size")); //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data)); if (data != null && data.size() > 0) { //多数据源事务 主数据源嵌套子数据源 Db.use().tx(new IAtom() { @Override public boolean run() throws SQLException { return Db.use(url.toString()).tx(new IAtom() { @Override public boolean run() throws SQLException { List insertIds = new ArrayList(); StringBuffer insertStr = new StringBuffer(); for (int i = 0; i < data.size(); i++) { insertIds.add(data.get(i).getInt("target_id")); if (i == 0) { insertStr.append("?"); } else { insertStr.append(",?"); } } List insertDatas = Db.use("masterDataSource") .find(" select * from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " in (" + insertStr + ") ", insertIds.toArray()); for(Record insertData:insertDatas){ Record seqData = Db.use(url.toString()).findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); if(record.getStr("table_name").equals("event_record_library")) { //设置数据状态为同步数据 insertData.set("sync_status",1); //设置同步数据所在数据库的id以及所在原来表的id 用于修改和删除 -1为中心或主库 insertData.set("old_id",insertData.getLong(record.getStr("id_name"))); insertData.set("db_id", -1); } insertData.set(record.getStr("id_name"), seqData.getLong("seqId")); } Db.use(url.toString()).batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 Object lastInsertId = data.get(data.size() - 1).get("id"); logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); Db.use("masterDataSource").update("table_sync_info", record); return true; } }); } }); logger.info("增量更新数据任务结束"); } else { flag = false; } } } } else if (record.getInt("event") == 2 || record.getInt("event") == 3) { // table_event_log sync_db_info两表查询获取修改数据信息 进行数据修改 while (flag) { final List datas = Db.find( " select * from table_event_log where table_name = '" + record.getStr("table_name") + "' and id > " + record.getLong("last_id") + " and event = " + record.getInt("event") + " order by id asc limit "+record.getInt("batch_size")); //logger.info("获取主库删除或者修改数据的数据信息"+JSON.toJSONString(datas)); if (datas != null && datas.size() > 0) { //多数据源事务 主数据源嵌套子数据源 Db.use().tx(new IAtom() { @Override public boolean run() throws SQLException { return Db.use(url.toString()).tx(new IAtom() { @Override public boolean run() throws SQLException { List updateIds = new ArrayList(); List deleteRecords=new ArrayList(); StringBuilder handleStr=new StringBuilder(); for (int i = 0; i < datas.size(); i++) { updateIds.add(datas.get(i).getInt("target_id")); if(i==0) { handleStr.append("?"); }else { handleStr.append(",?"); } } logger.info("获取所有操作的数据id信息为"+JSON.toJSONString(updateIds)); if (record.getInt("event") == 2) { List updateDatas = Db.use("masterDataSource") .find(" select * from " + record.getStr("table_name") + " where " + record.getStr("id_name") + " in (" + handleStr + ") ", updateIds.toArray()); //logger.info("获取所有修改数据的数据信息为"+JSON.toJSONString(updateDatas)); if (updateDatas != null && updateDatas.size() > 0) { if(record.getStr("table_name").equals("event_record_library")) { for(Record updateData:updateDatas) { updateData.set("old_id",updateData.getLong(record.getStr("id_name"))); updateData.set("db_id", -1); updateData.remove("id"); updateData.remove("sync_status"); } Db.use(url.toString()).batchUpdate(record.getStr("table_name"), "old_id,db_id",updateDatas, record.getInt("batch_size")); }else { Db.use(url.toString()).batchUpdate(record.getStr("table_name"), record.getStr("id_name"), updateDatas, record.getInt("batch_size")); } } logger.info("分库对主库修改操作的数据同步任务完成"); } else if (record.getInt("event") == 3) { for (int i = 0; i < datas.size(); i++) { Record deleteRecord=new Record(); deleteRecord.set(record.getStr("id_name"), datas.get(i).getInt("target_id")); //如果是针对 event_record_library 下两行数据使用 不是则仅仅赋值 无意义 deleteRecord.set("old_id", datas.get(i).getInt("target_id")); deleteRecord.set("db_id", -1); deleteRecords.add(deleteRecord); } if(record.getStr("table_name").equals("event_record_library")) { Db.use(url.toString()).batch("delete from event_record_library where old_id=? and db_id=?","old_id,db_id",deleteRecords,record.getInt("batch_size")); }else { Db.use(url.toString()).batch("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+"=?",record.getStr("id_name"),deleteRecords,record.getInt("batch_size")); } logger.info("分库对主库删除操作的数据同步完成"); } Object lastUpdateId = datas.get(datas.size() - 1).get("id"); logger.info("获取最后一次修改或者删除操作的数据ID信息"+JSON.toJSONString(lastUpdateId)); record.set("last_id", lastUpdateId); record.set("last_date", new Date()); Db.use("masterDataSource").update("table_sync_info", record); logger.info("修改table_sync_info记录结果 用于下次同步完成"); return true; } }); } }); } else { flag = false; } } } } } logger.info("主库数据同步分库结束"); logger.info("*****************************************************"); } catch (Exception e) { e.printStackTrace(); } } public SyncDbInfo getSyncDbInfo() { return syncDbInfo; } public void setSyncDbInfo(SyncDbInfo syncDbInfo) { this.syncDbInfo = syncDbInfo; } }