1. Use separate thread pools for master-to-slave sync and slave-to-master sync

2. On program startup, do a full update of the detection_info_new ("new") table
3. Improve log messages
fangshunjian
2018-12-21 21:49:35 +06:00
parent e61cf4796d
commit af4f0910ba
7 changed files with 353 additions and 70 deletions

View File

@@ -1,4 +1,8 @@
# Interval (ms) between runs of the master-to-slave sync thread
syncMaterToSlaveTime=30000
# Interval (ms) between runs of the slave-to-master sync thread
syncSlaveToMaterTime=60000
# Thread pool size for slave-to-master sync
slave.to.master.pool.num=3
# Thread pool size for master-to-slave sync
master.to.slave.pool.num=2
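A minimal sketch of how these pool-size keys are consumed, mirroring the PropKit calls added to SyncData below (the demo class name is illustrative only):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import com.jfinal.kit.PropKit;

public class PoolConfigSketch {
    public static void main(String[] args) {
        // PropKit caches config.properties after the first use() call
        int m2s = PropKit.use("config.properties").getInt("master.to.slave.pool.num", 2);
        int s2m = PropKit.use("config.properties").getInt("slave.to.master.pool.num", 3);
        ScheduledExecutorService masterToSlavePool = Executors.newScheduledThreadPool(m2s);
        ScheduledExecutorService slaveToMasterPool = Executors.newScheduledThreadPool(s2m);
        System.out.println("pool sizes: " + m2s + " / " + s2m);
        masterToSlavePool.shutdown();
        slaveToMasterPool.shutdown();
    }
}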

View File

@@ -0,0 +1,30 @@
# Query used to verify that a database connection is valid; must be configured
druid.validationQuery=SELECT 1 from dual
# Initial number of connections
druid.initialSize=1
# Maximum pool size
druid.maxActive=3
# Removed; the corresponding entry is dropped from the config file
#druid.maxIdle=20
# Keep this small; the pool tops itself up automatically when it runs short
druid.minIdle=1
# Timeout for acquiring a connection: 1 minute, in milliseconds
druid.maxWait=60000
# Do not validate connections on borrow; enabling this costs performance
druid.testOnBorrow=false
# Whether to validate a connection when it is returned to the pool
druid.testOnReturn=false
# Safe to enable with no performance impact: on borrow, if a connection has been idle
# longer than timeBetweenEvictionRunsMillis, validationQuery is run to verify it is still valid
druid.testWhileIdle=true
# 1. Interval at which the destroy thread checks connections
# 2. Threshold used by the testWhileIdle check
druid.timeBetweenEvictionRunsMillis=600000
# Minimum idle time before a connection becomes eligible for eviction
druid.minEvictableIdleTimeMillis=600000
# Whether to reclaim connections held longer than the time limit
druid.removeAbandoned=false
# The time limit, in seconds; currently 5 minutes. Raise it if any business operation can take longer.
druid.removeAbandonedTimeout=300
# Log to the console when a connection is reclaimed; true in test environments, false in production (it costs performance)
druid.logAbandoned=false
druid.filters=
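For reference, a standalone sketch of applying the same keys to a plain DruidDataSource via java.util.Properties. The commit itself hands the file path to setConnectionProperties; this loader is an assumed equivalent for illustration, and the class name is hypothetical:

import java.io.InputStream;
import java.util.Properties;
import com.alibaba.druid.pool.DruidDataSource;

public class DruidPropsSketch {
    public static DruidDataSource fromClasspath(String resource) throws Exception {
        Properties p = new Properties();
        try (InputStream in = DruidPropsSketch.class.getClassLoader().getResourceAsStream(resource)) {
            p.load(in); // the druid.* keys listed above
        }
        DruidDataSource ds = new DruidDataSource();
        ds.setValidationQuery(p.getProperty("druid.validationQuery"));
        ds.setInitialSize(Integer.parseInt(p.getProperty("druid.initialSize")));
        ds.setMaxActive(Integer.parseInt(p.getProperty("druid.maxActive")));
        ds.setMinIdle(Integer.parseInt(p.getProperty("druid.minIdle")));
        ds.setMaxWait(Long.parseLong(p.getProperty("druid.maxWait")));
        ds.setTestOnBorrow(Boolean.parseBoolean(p.getProperty("druid.testOnBorrow")));
        ds.setTestOnReturn(Boolean.parseBoolean(p.getProperty("druid.testOnReturn")));
        ds.setTestWhileIdle(Boolean.parseBoolean(p.getProperty("druid.testWhileIdle")));
        ds.setTimeBetweenEvictionRunsMillis(Long.parseLong(p.getProperty("druid.timeBetweenEvictionRunsMillis")));
        ds.setMinEvictableIdleTimeMillis(Long.parseLong(p.getProperty("druid.minEvictableIdleTimeMillis")));
        ds.setRemoveAbandoned(Boolean.parseBoolean(p.getProperty("druid.removeAbandoned")));
        ds.setRemoveAbandonedTimeout(Integer.parseInt(p.getProperty("druid.removeAbandonedTimeout")));
        ds.setLogAbandoned(Boolean.parseBoolean(p.getProperty("druid.logAbandoned")));
        return ds;
    }
}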

View File

@@ -32,10 +32,12 @@ public class Conn {
druid.setMaxWait(Integer.valueOf(PropKit.get("dbMaxWait")));
druid.setRemoveAbandoned(Boolean.valueOf(PropKit.get("dbRemoveAbandoned")));
druid.setRemoveAbandonedTimeoutMillis(Integer.valueOf(PropKit.get("dbRemoveAbandonedTimeout")));
druid.setConnectionProperties(SyncData.DRUID_CONFIG_FILE_PATH);// path to the druid config file
ActiveRecordPlugin arp=new ActiveRecordPlugin(url,druid);
arp.setShowSql(Boolean.valueOf(PropKit.get("dbShowSql")));
druid.start();
arp.start();
logger.debug(String.format("分库数据库连接池创建成功ip: %s", syncDbInfo.get("ip")));
}
logger.info("创建各分库数据库的连接池完成");
}

View File

@@ -1,21 +1,19 @@
package com.nms.main;
import java.net.URL;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.jfinal.aop.Duang;
import com.jfinal.kit.PropKit;
import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.druid.DruidPlugin;
import com.nms.model.SyncDbInfo;
import com.nms.thread.SyncSlaveToMasterThread;
@@ -35,6 +33,19 @@ public class SyncData{
private static final ThreadLocal<Set<Long>> threadLocalLoopUpdateMissionIds = new ThreadLocal<Set<Long>>();
// thread-local set recording mission ids inserted by loop-mission results
private static final ThreadLocal<Set<Long>> threadLocalLoopInsertMissionIds = new ThreadLocal<Set<Long>>();
// path to the druid configuration file
public static final String DRUID_CONFIG_FILE_PATH;
static {
URL urlObj = SyncData.class.getClassLoader().getResource("druid.properties");
if(urlObj==null){
System.err.println("找不到配置文件druid.properties");
logger.error("No configuration file can be found: druid.properties");
System.exit(1);
}
DRUID_CONFIG_FILE_PATH = urlObj.getPath().replaceAll("%20", " ");
logger.debug(String.format("druid配置文件路径", DRUID_CONFIG_FILE_PATH));
}
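// Sketch (an assumption, not part of this commit): replaceAll("%20", " ") above only
// restores spaces. A fuller decode of the classpath location would use java.net.URLDecoder:
//   DRUID_CONFIG_FILE_PATH = java.net.URLDecoder.decode(urlObj.getPath(), "UTF-8");
// URLDecoder.decode(String, String) declares UnsupportedEncodingException, which would
// need handling, though UTF-8 is always supported by the JVM.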
public static void main(String[] args) {
@@ -51,6 +62,7 @@ public class SyncData{
masterDruid.setMaxWait(Integer.valueOf(PropKit.get("dbMaxWait")));
masterDruid.setRemoveAbandoned(Boolean.valueOf(PropKit.get("dbRemoveAbandoned")));
masterDruid.setRemoveAbandonedTimeoutMillis(Integer.valueOf(PropKit.get("dbRemoveAbandonedTimeout")));
masterDruid.setConnectionProperties(DRUID_CONFIG_FILE_PATH);// path to the druid config file
ActiveRecordPlugin masterArp=new ActiveRecordPlugin("masterDataSource",masterDruid);
masterArp.setShowSql(Boolean.valueOf(PropKit.get("dbShowSql")));
masterDruid.start();
@@ -63,26 +75,25 @@ public class SyncData{
Conn.createConn(syncDbInfos);
logger.info("分库数据库连接池创建完成");
// 定时周期执行线程池 用于周期执行线程的运行
ScheduledExecutorService scheduleService = Executors.newScheduledThreadPool(syncDbInfos.size());
ScheduledExecutorService masterToSlavePool = Executors.newScheduledThreadPool(PropKit.use("config.properties").getInt("master.to.slave.pool.num", 2));
ScheduledExecutorService slaveToMasterPool = Executors.newScheduledThreadPool(PropKit.use("config.properties").getInt("slave.to.master.pool.num", 3));
logger.info("创建线程池完毕 数量大小为"+syncDbInfos.size());
// 使用scheduleWithFixedDleay在上一个线程任务执行完成后 5分钟执行下一次任务
for(SyncDbInfo syncDbInfo : syncDbInfos){
// master-to-slave data sync
SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo));
logger.info("创建主库同步分库线程执行任务");
scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS);
masterToSlavePool.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS);
// slave-to-master data sync
logger.info("Scheduling the slave-to-master sync task");
SyncSlaveToMasterThread syncSlaveToMasterThread = Duang.duang(new SyncSlaveToMasterThread(syncDbInfo));
scheduleService.scheduleWithFixedDelay(syncSlaveToMasterThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS);
slaveToMasterPool.scheduleWithFixedDelay(syncSlaveToMasterThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS);
}
}else{
logger.info("获取同步记录信息失败 请检查数据库数据信息");
logger.error("获取同步记录信息失败 请检查数据库数据信息");
}
} catch(Exception e) {
e.printStackTrace();
logger.error("数据同步启动发生异常 信息为:"+e.getMessage());
logger.error("数据同步启动发生异常",e);
}
}
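A minimal standalone sketch (not from this commit) of the scheduleWithFixedDelay semantics relied on above: the delay counts from the end of one run to the start of the next, so a slow sync cycle never overlaps itself on the same task.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelaySketch {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        pool.scheduleWithFixedDelay(() -> {
            System.out.println("run at " + System.currentTimeMillis());
            try {
                Thread.sleep(2000); // simulate a slow sync cycle
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 0, 1000, TimeUnit.MILLISECONDS); // still a full 1 s gap after each run ends
        Thread.sleep(10000);
        pool.shutdown();
    }
}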

View File

@@ -7,22 +7,27 @@ import java.util.List;
import java.util.Set;
import org.apache.log4j.Logger;
import com.nms.interceptor.SyncMissionResultStatisticalInterceptor;
import com.nms.main.SyncData;
import com.alibaba.druid.util.StringUtils;
import com.alibaba.fastjson.JSON;
import com.jfinal.aop.Before;
import com.jfinal.json.Json;
import com.jfinal.kit.Prop;
import com.jfinal.kit.PropKit;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.DbPro;
import com.jfinal.plugin.activerecord.IAtom;
import com.jfinal.plugin.activerecord.Record;
import com.jfinal.plugin.activerecord.tx.Tx;
import com.nms.interceptor.SyncMissionResultStatisticalInterceptor;
import com.nms.main.SyncData;
import com.nms.model.SyncDbInfo;
import com.nms.util.StopWatch;
@Before({SyncMissionResultStatisticalInterceptor.class})
public class SyncSlaveToMasterThread implements Runnable{
private Logger logger = Logger.getLogger(this.getClass());
private SyncDbInfo syncDbInfo;
private boolean firstRun= true;
public SyncSlaveToMasterThread() {
super();
}
@@ -34,8 +39,11 @@ public class SyncSlaveToMasterThread implements Runnable{
@Override
public void run() {
Thread.currentThread().setName(syncDbInfo.getIp()+"->sync-to-master");
String errorTableName=null;
String errorUrl=null;
StopWatch sw = new StopWatch();
sw.start();
try {
// slave-to-master data sync
logger.info("Starting slave-to-master data sync");
@@ -53,6 +61,10 @@ public class SyncSlaveToMasterThread implements Runnable{
//logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find));
if (find != null && find.size() > 0) {
for (final Record record : find) {
long total = 0;
// table sync starts here
sw.tag("s_" +record.getInt("event")+ record.getStr("table_name"));
//logger.info("Slave-to-master sync, current table: "+ record.getStr("table_name"));
errorTableName=record.getStr("table_name");
// dynamic conditions for a few special tables
@@ -67,10 +79,29 @@ public class SyncSlaveToMasterThread implements Runnable{
columns.setLength(0);
columns.append(record.getStr("columns"));
}
final StringBuilder colRelation = new StringBuilder();
// build the column list for the join query
if("*".equalsIgnoreCase(columns.toString())) {
colRelation.append("tt.*");
}else {
String[] cols = columns.toString().split(",");
colRelation.setLength(0);
for(String s : cols) {
if(!StringUtils.isEmpty(s)) {
colRelation.append(",tt.");
colRelation.append(s.trim());
}
}
colRelation.deleteCharAt(0);
}
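// e.g. columns "id, name" -> colRelation "tt.id,tt.name"; "*" -> "tt.*"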
// flag controlling the sync loop
boolean flag = true;
// the table's event column: 1 = insert, 2 = update, 3 = delete
if (record.getInt("event") == 1) {
sw.tag("s_insert"+ record.getStr("table_name"));
if(record.getInt("mode").equals(1)||record.getInt("mode").equals(2)){
while (flag) {
// insert: read the last synced id and query the incremental rows
@@ -80,6 +111,7 @@ public class SyncSlaveToMasterThread implements Runnable{
record.getLong("last_id"));
//logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data));
if (data != null && data.size() > 0) {
total += data.size();
final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
if(record.getInt("mode").equals(2)&&record.getStr("table_name").contains("mission_result_table")) {
Set<Long> set = SyncData.getThreadlocalInsertMissionIds();
@@ -128,11 +160,10 @@ public class SyncSlaveToMasterThread implements Runnable{
masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
updateDatas, record.getInt("batch_size"));
}
logger.info("分库同步 detection_info_new 增量更新数据完成 表名为"+record.getStr("table_name"));
logger.info("分库同步 detection_info_new 最后数据的id信息为"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
slaveDb.update("table_sync_info", record);
logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastInsertId +",num:" + data.size());
return true;
}
});
@@ -147,11 +178,10 @@ public class SyncSlaveToMasterThread implements Runnable{
public boolean run() throws SQLException {
masterDb.batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
// after syncing, store the id of the last row in table_sync_info for the next run
logger.info("Slave sync: incremental insert done, table: "+record.getStr("table_name"));
logger.info("Slave sync: last row id: "+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
slaveDb.update("table_sync_info", record);
logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastInsertId +",num:" + data.size());
return true;
}
});
@@ -209,10 +239,10 @@ public class SyncSlaveToMasterThread implements Runnable{
masterDb.batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
// after syncing, store the id of the last row in table_sync_info for the next run
Object lastInsertId = data.get(data.size() - 1).get("id");
logger.info("分库增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
slaveDb.update("table_sync_info", record);
logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastInsertId +",num:" + insertDatas.size());
return true;
}else{
Object lastInsertId = data.get(data.size() - 1).get("id");
@@ -240,11 +270,10 @@ public class SyncSlaveToMasterThread implements Runnable{
masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
updateDatas, record.getInt("batch_size"));
}
logger.info("分库同步 loopmission_state_table 增量更新数据完成 表名为"+record.getStr("table_name"));
logger.info("分库同步 loopmission_state_table 最后数据的id信息为"+JSON.toJSONString(lastInsertId));
record.set("last_id", lastInsertId);
record.set("last_date", new Date());
slaveDb.update("table_sync_info", record);
logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastInsertId +",num:" + insertDatas.size());
return true;
}
}
@@ -258,37 +287,67 @@ public class SyncSlaveToMasterThread implements Runnable{
}
}
} else if (record.getInt("event") == 2 || record.getInt("event") == 3) {
Boolean allUpdate = PropKit.use("").getBoolean("detection_all_update", false);//是否启用全量同步
//监测信息最新表同步 全量更新
if((firstRun || allUpdate) && "detection_info_new".equalsIgnoreCase(record.getStr("table_name"))) {
long fromId = 0L;//开始id
String sql = "select max(id) last_id from table_event_log where table_name='detection_info_new' and event = 2 and id > "+ record.getLong("last_id");
Record maxIdRec = slaveDb.findFirst(sql);
if(maxIdRec != null) {
while(true) {
String dinSql = "select ID, `DETECTION_SET_INFO_ID`, `CHECK_WAY`, `DETECTION_STATE_INFO`, `PERFORMACE_DATA`, `CURRENT_TIMES`, `START_TIME`, `WAIT_TIME`, `DELAY_TIME`, `NEXT_CHECK_TIME`, `OFF_LINE`, `POLICE_LEVEL`, `DATA_CHECK_TIME`, `DATA_ARRIVE_TIME`, `DETECTIONED_STATE`, `NODE_IP`, `STATUS_CHANGE_TIME`, `DATA_CHECK_TIME_DIGITAL`, `DATA_ARRIVE_TIME_DIGITAL`, `SEQ_ID`, `DETECTION_INFO_ID`, `VALID`, `POLICE_EMERGENT` from detection_info_new din where id > ? order by id limit "+ record.getInt("batch_size") ;
List<Record> dinList = slaveDb.find(dinSql, fromId);
if(dinList!= null && dinList.size()>0) {
fromId = dinList.get(dinList.size()-1).getLong("ID");
for(Record r : dinList) {
r.remove("ID");
}
masterDb.batchUpdate("detection_info_new", "DETECTION_SET_INFO_ID,SEQ_ID", dinList, record.getInt("batch_size"));
logger.debug("分库同步表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,fromId : " + fromId +",num:" + dinList.size());
}else {
Long newLastId = maxIdRec.getLong("last_id");
record.set("last_id", Long.valueOf(newLastId));
record.set("last_date", new Date());
slaveDb.update("table_sync_info", record);
logger.debug(String.format("new表全量更新完成最后update id%s ", newLastId) );
break;
}
}
}
continue;
}
// query table_event_log and sync_db_info to find modified rows, then apply the changes
while (flag) {
final List<Record> datas = slaveDb.find(
" select * from table_event_log where table_name = '" + record.getStr("table_name")
+ "' and id > " + record.getLong("last_id") + " and event = "
+ record.getInt("event") + " order by id asc limit " + record.getInt("batch_size"));
String sql = "select max(id) lastId,count(1) total from ( select id from table_event_log l "
+ " where table_name = '"+record.getStr("table_name")+"' "
+ " and event = "+record.getInt("event")
+ " and id > "+record.getLong("last_id")
+" order by id asc limit " + record.getInt("batch_size") +" ) llll";
Record statRec = slaveDb.findFirst(sql);
final Long num = statRec.getLong("total");// number of rows about to be synced
final String lastId = statRec.getStr("lastId");// max id among the rows about to be synced
//logger.info("Modified/deleted rows to sync from slave to master: "+JSON.toJSONString(datas));
if (datas != null && datas.size() > 0) {
if (num > 0) {
total += num;
slaveDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
return masterDb.tx(new IAtom() {
@Override
public boolean run() throws SQLException {
List<Long> updateIds = new ArrayList<Long>();
List<Record> deleteRecords=new ArrayList<Record>();
StringBuilder handleStr=new StringBuilder();
for (int i = 0; i < datas.size(); i++) {
updateIds.add(datas.get(i).getLong("target_id"));
if(i==0) {
handleStr.append("?");
}else {
handleStr.append(",?");
}
}
logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds));
if (record.getInt("event") == 2) {
List<Record> updateDatas = slaveDb
.find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where "
+ record.getStr("id_name") + " in (" + handleStr + ") ",
updateIds.toArray());
String sql = "select "+colRelation.toString()+" from " + record.getStr("table_name") +" tt "
+ " left join table_event_log log on tt."+record.getStr("id_name")+"=log.target_id "
+ " where log.table_name = '"+record.getStr("table_name")+"'"
+ " and log.event = " +record.getInt("event")
+ " and log.id < " + (Long.valueOf(lastId)+1)
+ " and log.id > "+record.getLong("last_id")
+ " order by log.id asc limit " + record.getInt("batch_size");
logger.debug(String.format("查询sql%s", sql));
List<Record> updateDatas = slaveDb.find(sql);
//logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas));
if (updateDatas != null && updateDatas.size() > 0) {
if(record.getStr("table_name").equals("event_record_library")) {
@@ -312,31 +371,34 @@ public class SyncSlaveToMasterThread implements Runnable{
}
SyncData.setThreadlocalUpdateMissionIds(missionIds);
masterDb.batchUpdate(record.getStr("table_name"), "mission_id,seq_id",updateDatas, record.getInt("batch_size"));
}else {
if(record.getStr("table_name").equals("detection_info_new")) {
for(Record updateData:updateDatas) {
Record findFirst = masterDb.findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",updateData.get("DETECTION_SET_INFO_ID"),updateData.get("SEQ_ID"));
updateData.set(record.getStr("id_name"),findFirst.getLong(record.getStr("id_name")));
}
}else if(record.getStr("table_name").equals("detection_info_new")){
for(Record updateData:updateDatas) {
updateData.remove(record.getStr("id_name"));
}
if(record.getStr("table_name").equals("loopmission_state_table")) {
Set<Long> missionIds = SyncData.getThreadlocalLoopUpdateMissionIds();
for(Record updateData:updateDatas) {
missionIds.add(updateData.getLong(record.getStr("id_name")));
}
SyncData.setThreadlocalLoopUpdateMissionIds(missionIds);
masterDb.batchUpdate(record.getStr("table_name"), "DETECTION_SET_INFO_ID,SEQ_ID",updateDatas, record.getInt("batch_size"));
}else if(record.getStr("table_name").equals("loopmission_state_table")) {
Set<Long> missionIds = SyncData.getThreadlocalLoopUpdateMissionIds();
for(Record updateData:updateDatas) {
missionIds.add(updateData.getLong(record.getStr("id_name")));
}
SyncData.setThreadlocalLoopUpdateMissionIds(missionIds);
masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"),
updateDatas, record.getInt("batch_size"));
}
}
logger.info("分库同步主库修改数据任务完成");
} else if (record.getInt("event") == 3) {
for (int i = 0; i < datas.size(); i++) {
String sql = "select target_id from table_event_log l "
+ " where table_name = '"+record.getStr("table_name")+"' "
+ " and event = "+record.getInt("event")
+ " and id > "+record.getLong("last_id")+" limit " + record.getInt("batch_size");
List<Record> delList = slaveDb.find(sql);
for (int i = 0; i < delList.size(); i++) {
Record deleteRecord=new Record();
deleteRecord.set(record.getStr("id_name"), datas.get(i).getLong("target_id"));
deleteRecord.set(record.getStr("id_name"), delList.get(i).getLong("target_id"));
// only meaningful for event_record_library (used two lines below); otherwise the value is just carried along
deleteRecord.set("old_id", datas.get(i).getLong("target_id"));
deleteRecord.set("old_id", delList.get(i).getLong("target_id"));
deleteRecord.set("db_id", syncDbInfo.get("id"));
deleteRecords.add(deleteRecord);
}
@@ -345,11 +407,9 @@ public class SyncSlaveToMasterThread implements Runnable{
}else {
masterDb.batch("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+"=?",record.getStr("id_name"),deleteRecords,record.getInt("batch_size"));
}
logger.info("分库同步主库删除数据任务完成");
}
Object lastUpdateId = datas.get(datas.size() - 1).get("id");
logger.info("分库同步获取最后一次操作数据的数据ID信息为"+lastUpdateId);
record.set("last_id", lastUpdateId);
logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastId +",num:" + num);
record.set("last_id", Long.valueOf(lastId));
record.set("last_date", new Date());
slaveDb.update("table_sync_info", record);
return true;
@@ -357,20 +417,21 @@ public class SyncSlaveToMasterThread implements Runnable{
});
}
});
logger.info("修改分库table_sync_info最后操作数据信息 用于下次同步操作完成");
} else {
flag = false;
}
}
}
sw.tag("e_" +record.getInt("event")+ record.getStr("table_name"));
logger.debug("分库同步表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" , 耗时 " + sw.toString(sw.between("e_" +record.getInt("event")+ record.getStr("table_name"), "s_" +record.getInt("event")+ record.getStr("table_name")))+" ,totalnum : " + total);
}
}
logger.info("分库数据同步主库结束");
logger.info("##################################################");
sw.end();
logger.info(errorUrl + " 分库数据同步主库结束,耗时:"+sw.toString(sw.total()));
firstRun = false;//修改首次启动标志位
} catch (Exception e) {
logger.error("分库同步主库数据连接为:"+errorUrl);
logger.error("分库同步主库数据当前操作的异常表名为:"+errorTableName);
logger.error("分库数据同步主库发生错误 异常信息为:"+e.getMessage());
logger.error("分库数据同步主库发生错误 异常信息",e);
// 如果出现异常信息 清楚线程变量 不用进行统计
SyncData.removeThreadlocalUpdateMissionIds();
@@ -379,6 +440,7 @@ public class SyncSlaveToMasterThread implements Runnable{
SyncData.removeThreadlocalLoopInsertMissionIds();
e.printStackTrace();
}
}
public SyncDbInfo getSyncDbInfo() {

View File

@@ -14,6 +14,7 @@ import com.jfinal.plugin.activerecord.Record;
import com.nms.interceptor.SyncDataInterceptor;
import com.nms.interceptor.SyncSocketInterceptor;
import com.nms.model.SyncDbInfo;
import com.nms.util.StopWatch;
import com.jfinal.plugin.activerecord.tx.Tx;
/**
* Data sync worker thread
@@ -37,7 +38,10 @@ public class SyncThread implements Runnable {
@Override
public void run() {
Thread.currentThread().setName("主库同步->" + syncDbInfo.getIp());
String errorTableName=null;
StopWatch sw = new StopWatch();
sw.start();
try {
logger.info("开始主库数据同步分库任务");
// 获取url路径
@@ -48,12 +52,16 @@ public class SyncThread implements Runnable {
final DbPro masterDb = Db.use("masterDataSource");
final DbPro slaveDb = Db.use(url.toString());
logger.debug("數據源獲取成功");
List<Record> find = masterDb.find("select * from table_sync_info where db_id=? ",
syncDbInfo.get("id"));
//logger.info("查询主库须向分库同步数据信息"+JSON.toJSONString(find));
if (find != null && find.size() > 0) {
for (final Record record : find) {
logger.debug("主库同步表开始:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +",lastId:" + record.getStr("last_id"));
long total = 0;
// table sync starts here
sw.tag("s_" +record.getInt("event")+ record.getStr("table_name"));
//logger.info("主库数据同步到分库 正在操作的表名为:"+ record.getStr("table_name"));
errorTableName=record.getStr("table_name");
//针对个别特殊表动态条件
@@ -270,13 +278,15 @@ public class SyncThread implements Runnable {
}
}
}
sw.tag("e_" +record.getInt("event")+ record.getStr("table_name"));
logger.debug("主库同步表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" , 耗时 " + sw.toString(sw.between("e_" +record.getInt("event")+ record.getStr("table_name"), "s_" +record.getInt("event")+ record.getStr("table_name")))+" ,totalnum : " + total);
}
}
logger.info("主库数据同步分库结束");
sw.end();
logger.info("主库数据同步分库结束,耗时:"+sw.toString(sw.total()));
logger.info("*****************************************************");
} catch (Exception e) {
logger.error("主库同步分库数据当前操作的异常表名为:"+errorTableName);
logger.error("主库数据同步分库发生错误 异常信息为:"+e.getMessage());
logger.error("主库数据同步分库发生错误 异常信息",e);
e.printStackTrace();
}

View File

@@ -0,0 +1,164 @@
package com.nms.util;
import java.util.LinkedHashMap;
/**
* Stopwatch timer
* @author fang
*
*/
public class StopWatch {
private static final long SEC_MILL = 1000;
private static final long MIN_MILL = 60 * SEC_MILL;
private static final long HOUR_MILL = 60 * MIN_MILL;
private static final long DAY_MILL = 24 * HOUR_MILL;
private long start;
private long end;
private LinkedHashMap<String,Long> tagMap = new LinkedHashMap<String,Long>();
public StopWatch(){
start();
}
public static StopWatch newStopWatch(){
return new StopWatch();
}
/**
* Start the timer
* @return the start timestamp in milliseconds
*/
public long start(){
this.start = System.currentTimeMillis();
return start;
}
/**
* Stop the timer
* @return the end timestamp in milliseconds
*/
public long end(){
this.end = System.currentTimeMillis();
return end;
}
public long tag(String tag){
long l = System.currentTimeMillis();
this.tagMap.put(tag, l);
return l;
}
/**
* Time difference between two tags: returns b minus a, so pass the later tag first
* @param b the later tag
* @param a the earlier tag
* @return b - a in milliseconds, or -1 if either tag is missing
*/
public long between(String b,String a){
Long l1 = this.tagMap.get(b);
Long l2 = this.tagMap.get(a);
if(l1 != null && l2 != null){
return l1-l2;
}
return -1;
}
/**
* Human-readable time difference between two tags
* @param b the later tag
* @param a the earlier tag
* @return the formatted duration
*/
public String timeBetween(String b,String a) {
long between = between(b, a);
return toString(between);
}
public static String toString(long l){
StringBuilder sb = new StringBuilder();
if(l >= DAY_MILL){
sb.append(l/DAY_MILL);
sb.append("d");
l = l % DAY_MILL;
}
if(l >= HOUR_MILL){
sb.append(l/HOUR_MILL);
sb.append("h");
l = l % HOUR_MILL;
}
if(l >= MIN_MILL){
sb.append(l/MIN_MILL);
sb.append("m");
l = l % MIN_MILL;
}
if(l >= SEC_MILL){
sb.append(l/SEC_MILL);
sb.append("s");
l = l % SEC_MILL;
}
sb.append(l);
sb.append("ms");
return sb.toString();
}
public String toString(){
return toString(total());
}
/**
* Total elapsed time from start to end; if end was never set, the current time is used
* @return elapsed milliseconds
*/
public long total(){
long temp = System.currentTimeMillis();
if(this.end < this.start){
this.end = temp;
}
return end - start;
}
public void reset(){
this.tagMap.clear();
this.start();
}
public long getStart() {
return start;
}
public void setStart(long start) {
this.start = start;
}
public long getEnd() {
return end;
}
public void setEnd(long end) {
this.end = end;
}
public LinkedHashMap<String, Long> getTag() {
return tagMap;
}
public void setTag(LinkedHashMap<String, Long> tag) {
this.tagMap = tag;
}
public static void main(String[] args) {
long s = System.currentTimeMillis();
long end = s +2*DAY_MILL+ 12 * MIN_MILL + 30*SEC_MILL + 388;
String string = StopWatch.toString(end -s);
System.out.println(string);
}
}
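A usage sketch for the tag/between API in the style the sync threads use (the table name is hypothetical; the printed units assume the strings fixed in toString(long) above):

public class StopWatchDemo {
    public static void main(String[] args) throws InterruptedException {
        StopWatch sw = new StopWatch();
        sw.start();
        sw.tag("s_1my_table");   // tag before syncing a table
        Thread.sleep(150);       // stand-in for the sync work
        sw.tag("e_1my_table");   // tag after syncing it
        // between(b, a) returns b - a, so the later tag goes first
        System.out.println(sw.timeBetween("e_1my_table", "s_1my_table"));
        sw.end();
        System.out.println("total: " + StopWatch.toString(sw.total()));
    }
}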