diff --git a/nms_sync/conf/config.properties b/nms_sync/conf/config.properties index c32ace2..33a36dd 100644 --- a/nms_sync/conf/config.properties +++ b/nms_sync/conf/config.properties @@ -1,4 +1,8 @@ #\u4e3b\u5e93\u540c\u6b65\u5206\u5e93\u6570\u636e\u7ebf\u7a0b\u65f6\u95f4\u5dee syncMaterToSlaveTime=30000 #\u4e3b\u5e93\u540c\u6b65\u5206\u5e93\u6570\u636e\u7ebf\u7a0b\u65f6\u95f4\u5dee -syncSlaveToMaterTime=60000 \ No newline at end of file +syncSlaveToMaterTime=60000 +#从库向主库同步的线程池数量 +slave.to.master.pool.num=3 +#主库向从库同步的线程池数量 +master.to.slave.pool.num=2 diff --git a/nms_sync/conf/druid.properties b/nms_sync/conf/druid.properties new file mode 100644 index 0000000..0073e99 --- /dev/null +++ b/nms_sync/conf/druid.properties @@ -0,0 +1,30 @@ +#检测数据库链接是否有效,必须配置 +druid.validationQuery=SELECT 1 from dual +#初始连接数 +druid.initialSize=1 +#最大连接池数量 +druid.maxActive=3 +#去掉,配置文件对应去掉 +#druid.maxIdle=20 +#配置0,当线程池数量不足,自动补充。 +druid.minIdle=1 +#获取链接超时时间为1分钟,单位为毫秒。 +druid.maxWait=60000 +#获取链接的时候,不校验是否可用,开启会有损性能。 +druid.testOnBorrow=false +#归还链接到连接池的时候校验链接是否可用。 +druid.testOnReturn=false +#此项配置为true即可,不影响性能,并且保证安全性。意义为:申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。 +druid.testWhileIdle=true +#1.Destroy线程会检测连接的间隔时间 +#2.testWhileIdle的判断依据 +druid.timeBetweenEvictionRunsMillis=600000 +#一个链接生存的时间 +druid.minEvictableIdleTimeMillis=600000 +#链接使用超过时间限制是否回收 +druid.removeAbandoned=false +#超过时间限制时间(单位秒),目前为5分钟,如果有业务处理时间超过5分钟,可以适当调整。 +druid.removeAbandonedTimeout=300 +#链接回收的时候控制台打印信息,测试环境可以加上true,线上环境false。会影响性能。 +druid.logAbandoned=false +druid.filters= \ No newline at end of file diff --git a/nms_sync/src/com/nms/main/Conn.java b/nms_sync/src/com/nms/main/Conn.java index 80793c9..54b2ddd 100644 --- a/nms_sync/src/com/nms/main/Conn.java +++ b/nms_sync/src/com/nms/main/Conn.java @@ -32,10 +32,12 @@ public class Conn { druid.setMaxWait(Integer.valueOf(PropKit.get("dbMaxWait"))); druid.setRemoveAbandoned(Boolean.valueOf(PropKit.get("dbRemoveAbandoned"))); druid.setRemoveAbandonedTimeoutMillis(Integer.valueOf(PropKit.get("dbRemoveAbandonedTimeout"))); + druid.setConnectionProperties(SyncData.DRUID_CONFIG_FILE_PATH);//druid 配置文件路径 ActiveRecordPlugin arp=new ActiveRecordPlugin(url,druid); arp.setShowSql(Boolean.valueOf(PropKit.get("dbShowSql"))); druid.start(); arp.start(); + logger.debug(String.format("分库数据库连接池创建成功,ip: %s", syncDbInfo.get("ip"))); } logger.info("创建各分库数据库的连接池完成"); } diff --git a/nms_sync/src/com/nms/main/SyncData.java b/nms_sync/src/com/nms/main/SyncData.java index a722153..d2c559d 100644 --- a/nms_sync/src/com/nms/main/SyncData.java +++ b/nms_sync/src/com/nms/main/SyncData.java @@ -1,21 +1,19 @@ package com.nms.main; +import java.net.URL; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; + import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; import com.jfinal.aop.Duang; import com.jfinal.kit.PropKit; import com.jfinal.plugin.activerecord.ActiveRecordPlugin; -import com.jfinal.plugin.activerecord.Db; import com.jfinal.plugin.druid.DruidPlugin; import com.nms.model.SyncDbInfo; import com.nms.thread.SyncSlaveToMasterThread; @@ -35,6 +33,19 @@ public class SyncData{ private static final ThreadLocal> threadLocalLoopUpdateMissionIds = new ThreadLocal>(); // 线程变量记录周期任务结果新增的任务id private static final ThreadLocal> threadLocalLoopInsertMissionIds = new ThreadLocal>(); + //druid 配置文件路径 + public static final String DRUID_CONFIG_FILE_PATH; + + static { + URL urlObj = SyncData.class.getClassLoader().getResource("druid.properties"); + if(urlObj==null){ + System.err.println("找不到配置文件:druid.properties"); + logger.error("No configuration file can be found: druid.properties"); + System.exit(1); + } + DRUID_CONFIG_FILE_PATH = urlObj.getPath().replaceAll("%20", " "); + logger.debug(String.format("druid配置文件路径:", DRUID_CONFIG_FILE_PATH)); + } public static void main(String[] args) { @@ -51,6 +62,7 @@ public class SyncData{ masterDruid.setMaxWait(Integer.valueOf(PropKit.get("dbMaxWait"))); masterDruid.setRemoveAbandoned(Boolean.valueOf(PropKit.get("dbRemoveAbandoned"))); masterDruid.setRemoveAbandonedTimeoutMillis(Integer.valueOf(PropKit.get("dbRemoveAbandonedTimeout"))); + masterDruid.setConnectionProperties(DRUID_CONFIG_FILE_PATH);//druid 配置文件路径 ActiveRecordPlugin masterArp=new ActiveRecordPlugin("masterDataSource",masterDruid); masterArp.setShowSql(Boolean.valueOf(PropKit.get("dbShowSql"))); masterDruid.start(); @@ -63,26 +75,25 @@ public class SyncData{ Conn.createConn(syncDbInfos); logger.info("分库数据库连接池创建完成"); // 定时周期执行线程池 用于周期执行线程的运行 - ScheduledExecutorService scheduleService = Executors.newScheduledThreadPool(syncDbInfos.size()); + ScheduledExecutorService masterToSlavePool = Executors.newScheduledThreadPool(PropKit.use("config.properties").getInt("master.to.slave.pool.num", 2)); + ScheduledExecutorService slaveToMasterPool = Executors.newScheduledThreadPool(PropKit.use("config.properties").getInt("slave.to.master.pool.num", 3)); logger.info("创建线程池完毕 数量大小为"+syncDbInfos.size()); // 使用scheduleWithFixedDleay在上一个线程任务执行完成后 5分钟执行下一次任务 for(SyncDbInfo syncDbInfo : syncDbInfos){ // 主库向分库同步数据 SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo)); logger.info("创建主库同步分库线程执行任务"); - scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS); + masterToSlavePool.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS); // 分库向主库同步数据 logger.info("创建分库数据同步到主库线程执行任务"); SyncSlaveToMasterThread syncSlaveToMasterThread = Duang.duang(new SyncSlaveToMasterThread(syncDbInfo)); - scheduleService.scheduleWithFixedDelay(syncSlaveToMasterThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS); + slaveToMasterPool.scheduleWithFixedDelay(syncSlaveToMasterThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS); } }else{ - logger.info("获取同步记录信息失败 请检查数据库数据信息"); logger.error("获取同步记录信息失败 请检查数据库数据信息"); } } catch(Exception e) { - e.printStackTrace(); - logger.error("数据同步启动发生异常 信息为:"+e.getMessage()); + logger.error("数据同步启动发生异常",e); } } diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java index d3b0656..7955c92 100644 --- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java +++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java @@ -7,22 +7,27 @@ import java.util.List; import java.util.Set; import org.apache.log4j.Logger; -import com.nms.interceptor.SyncMissionResultStatisticalInterceptor; -import com.nms.main.SyncData; + +import com.alibaba.druid.util.StringUtils; import com.alibaba.fastjson.JSON; import com.jfinal.aop.Before; +import com.jfinal.json.Json; +import com.jfinal.kit.Prop; +import com.jfinal.kit.PropKit; import com.jfinal.plugin.activerecord.Db; import com.jfinal.plugin.activerecord.DbPro; import com.jfinal.plugin.activerecord.IAtom; import com.jfinal.plugin.activerecord.Record; -import com.jfinal.plugin.activerecord.tx.Tx; +import com.nms.interceptor.SyncMissionResultStatisticalInterceptor; +import com.nms.main.SyncData; import com.nms.model.SyncDbInfo; +import com.nms.util.StopWatch; @Before({SyncMissionResultStatisticalInterceptor.class}) public class SyncSlaveToMasterThread implements Runnable{ private Logger logger = Logger.getLogger(this.getClass()); private SyncDbInfo syncDbInfo; - + private boolean firstRun= true; public SyncSlaveToMasterThread() { super(); } @@ -34,8 +39,11 @@ public class SyncSlaveToMasterThread implements Runnable{ @Override public void run() { + Thread.currentThread().setName(syncDbInfo.getIp()+"->同步主库"); String errorTableName=null; String errorUrl=null; + StopWatch sw = new StopWatch(); + sw.start(); try { // 主库向分库同步数据 logger.info("开始分库数据同步主库"); @@ -53,6 +61,10 @@ public class SyncSlaveToMasterThread implements Runnable{ //logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find)); if (find != null && find.size() > 0) { for (final Record record : find) { + long total = 0; + //同步 表 開始 + sw.tag("s_" +record.getInt("event")+ record.getStr("table_name")); + //logger.info("分库数据同步到主库 正在操作的表名为:"+ record.getStr("table_name")); errorTableName=record.getStr("table_name"); //针对个别特殊表动态条件 @@ -67,10 +79,29 @@ public class SyncSlaveToMasterThread implements Runnable{ columns.setLength(0); columns.append(record.getStr("columns")); } + + + final StringBuilder colRelation = new StringBuilder(); + //整理关联查询的 column + if("*".equalsIgnoreCase(columns.toString())) { + colRelation.append("tt.*"); + }else { + String[] cols = columns.toString().split(","); + colRelation.setLength(0); + for(String s : cols) { + if(!StringUtils.isEmpty(s)) { + colRelation.append(",tt."); + colRelation.append(s.trim()); + } + } + colRelation.deleteCharAt(0); + } + // 循环同步数据标识 boolean flag = true; // 判断表中的event事件 1代表insert 2代表update 3代表delete if (record.getInt("event") == 1) { + sw.tag("s_insert"+ record.getStr("table_name")); if(record.getInt("mode").equals(1)||record.getInt("mode").equals(2)){ while (flag) { // 新增操作 取出最后更新id信息 查询增量数据 @@ -80,6 +111,7 @@ public class SyncSlaveToMasterThread implements Runnable{ record.getLong("last_id")); //logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data)); if (data != null && data.size() > 0) { + total += data.size(); final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name")); if(record.getInt("mode").equals(2)&&record.getStr("table_name").contains("mission_result_table")) { Set set = SyncData.getThreadlocalInsertMissionIds(); @@ -128,11 +160,10 @@ public class SyncSlaveToMasterThread implements Runnable{ masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"), updateDatas, record.getInt("batch_size")); } - logger.info("分库同步 detection_info_new 增量更新数据完成 表名为"+record.getStr("table_name")); - logger.info("分库同步 detection_info_new 最后数据的id信息为"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); slaveDb.update("table_sync_info", record); + logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastInsertId +",num:" + data.size()); return true; } }); @@ -147,11 +178,10 @@ public class SyncSlaveToMasterThread implements Runnable{ public boolean run() throws SQLException { masterDb.batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 - logger.info("分库同步增量更新数据完成 表名为"+record.getStr("table_name")); - logger.info("分库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); slaveDb.update("table_sync_info", record); + logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastInsertId +",num:" + data.size()); return true; } }); @@ -209,10 +239,10 @@ public class SyncSlaveToMasterThread implements Runnable{ masterDb.batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 Object lastInsertId = data.get(data.size() - 1).get("id"); - logger.info("分库增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); slaveDb.update("table_sync_info", record); + logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastInsertId +",num:" + insertDatas.size()); return true; }else{ Object lastInsertId = data.get(data.size() - 1).get("id"); @@ -240,11 +270,10 @@ public class SyncSlaveToMasterThread implements Runnable{ masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"), updateDatas, record.getInt("batch_size")); } - logger.info("分库同步 loopmission_state_table 增量更新数据完成 表名为"+record.getStr("table_name")); - logger.info("分库同步 loopmission_state_table 最后数据的id信息为"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); slaveDb.update("table_sync_info", record); + logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastInsertId +",num:" + insertDatas.size()); return true; } } @@ -258,37 +287,67 @@ public class SyncSlaveToMasterThread implements Runnable{ } } } else if (record.getInt("event") == 2 || record.getInt("event") == 3) { + Boolean allUpdate = PropKit.use("").getBoolean("detection_all_update", false);//是否启用全量同步 + //监测信息最新表同步 全量更新 + if((firstRun || allUpdate) && "detection_info_new".equalsIgnoreCase(record.getStr("table_name"))) { + long fromId = 0L;//开始id + String sql = "select max(id) last_id from table_event_log where table_name='detection_info_new' and event = 2 and id > "+ record.getLong("last_id"); + Record maxIdRec = slaveDb.findFirst(sql); + if(maxIdRec != null) { + while(true) { + String dinSql = "select ID, `DETECTION_SET_INFO_ID`, `CHECK_WAY`, `DETECTION_STATE_INFO`, `PERFORMACE_DATA`, `CURRENT_TIMES`, `START_TIME`, `WAIT_TIME`, `DELAY_TIME`, `NEXT_CHECK_TIME`, `OFF_LINE`, `POLICE_LEVEL`, `DATA_CHECK_TIME`, `DATA_ARRIVE_TIME`, `DETECTIONED_STATE`, `NODE_IP`, `STATUS_CHANGE_TIME`, `DATA_CHECK_TIME_DIGITAL`, `DATA_ARRIVE_TIME_DIGITAL`, `SEQ_ID`, `DETECTION_INFO_ID`, `VALID`, `POLICE_EMERGENT` from detection_info_new din where id > ? order by id limit "+ record.getInt("batch_size") ; + List dinList = slaveDb.find(dinSql, fromId); + if(dinList!= null && dinList.size()>0) { + fromId = dinList.get(dinList.size()-1).getLong("ID"); + for(Record r : dinList) { + r.remove("ID"); + } + masterDb.batchUpdate("detection_info_new", "DETECTION_SET_INFO_ID,SEQ_ID", dinList, record.getInt("batch_size")); + logger.debug("分库同步表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,fromId : " + fromId +",num:" + dinList.size()); + }else { + Long newLastId = maxIdRec.getLong("last_id"); + record.set("last_id", Long.valueOf(newLastId)); + record.set("last_date", new Date()); + slaveDb.update("table_sync_info", record); + logger.debug(String.format("new表全量更新完成,最后update id:%s ", newLastId) ); + break; + } + } + } + continue; + } // table_event_log sync_db_info两表查询获取修改数据信息 进行数据修改 while (flag) { - final List datas = slaveDb.find( - " select * from table_event_log where table_name = '" + record.getStr("table_name") - + "' and id > " + record.getLong("last_id") + " and event = " - + record.getInt("event") + " order by id asc limit " + record.getInt("batch_size")); + String sql = "select max(id) lastId,count(1) total from ( select id from table_event_log l " + + " where table_name = '"+record.getStr("table_name")+"' " + + " and event = "+record.getInt("event") + + " and id > "+record.getLong("last_id") + +" order by id asc limit " + record.getInt("batch_size") +" ) llll"; + + Record statRec = slaveDb.findFirst(sql); + final Long num = statRec.getLong("total");//将要同步的数据量 + final String lastId = statRec.getStr("lastId");//将要同步的最大id //logger.info("分库同步到主库数据的修改或者删除数据信息为"+JSON.toJSONString(datas)); - if (datas != null && datas.size() > 0) { + if (num > 0) { + total += num; slaveDb.tx(new IAtom() { @Override public boolean run() throws SQLException { return masterDb.tx(new IAtom() { @Override public boolean run() throws SQLException { - List updateIds = new ArrayList(); List deleteRecords=new ArrayList(); - StringBuilder handleStr=new StringBuilder(); - for (int i = 0; i < datas.size(); i++) { - updateIds.add(datas.get(i).getLong("target_id")); - if(i==0) { - handleStr.append("?"); - }else { - handleStr.append(",?"); - } - } - logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds)); + if (record.getInt("event") == 2) { - List updateDatas = slaveDb - .find(" select "+columns.toString()+" from " + record.getStr("table_name") + " where " - + record.getStr("id_name") + " in (" + handleStr + ") ", - updateIds.toArray()); + String sql = "select "+colRelation.toString()+" from " + record.getStr("table_name") +" tt " + + " left join table_event_log log on tt."+record.getStr("id_name")+"=log.target_id " + + " where log.table_name = '"+record.getStr("table_name")+"'" + + " and log.event = " +record.getInt("event") + + " and log.id < " + (Long.valueOf(lastId)+1) + + " and log.id > "+record.getLong("last_id") + + " order by log.id asc limit " + record.getInt("batch_size"); + logger.debug(String.format("查询sql:%s", sql)); + List updateDatas = slaveDb.find(sql); //logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas)); if (updateDatas != null && updateDatas.size() > 0) { if(record.getStr("table_name").equals("event_record_library")) { @@ -312,31 +371,34 @@ public class SyncSlaveToMasterThread implements Runnable{ } SyncData.setThreadlocalUpdateMissionIds(missionIds); masterDb.batchUpdate(record.getStr("table_name"), "mission_id,seq_id",updateDatas, record.getInt("batch_size")); - }else { - if(record.getStr("table_name").equals("detection_info_new")) { - for(Record updateData:updateDatas) { - Record findFirst = masterDb.findFirst("select * from " + record.getStr("table_name")+" where DETECTION_SET_INFO_ID = ? and SEQ_ID = ? ",updateData.get("DETECTION_SET_INFO_ID"),updateData.get("SEQ_ID")); - updateData.set(record.getStr("id_name"),findFirst.getLong(record.getStr("id_name"))); - } + }else if(record.getStr("table_name").equals("detection_info_new")){ + for(Record updateData:updateDatas) { + updateData.remove(record.getStr("id_name")); } - if(record.getStr("table_name").equals("loopmission_state_table")) { - Set missionIds = SyncData.getThreadlocalLoopUpdateMissionIds(); - for(Record updateData:updateDatas) { - missionIds.add(updateData.getLong(record.getStr("id_name"))); - } - SyncData.setThreadlocalLoopUpdateMissionIds(missionIds); + masterDb.batchUpdate(record.getStr("table_name"), "DETECTION_SET_INFO_ID,SEQ_ID",updateDatas, record.getInt("batch_size")); + + }else if(record.getStr("table_name").equals("loopmission_state_table")) { + Set missionIds = SyncData.getThreadlocalLoopUpdateMissionIds(); + for(Record updateData:updateDatas) { + missionIds.add(updateData.getLong(record.getStr("id_name"))); } + SyncData.setThreadlocalLoopUpdateMissionIds(missionIds); masterDb.batchUpdate(record.getStr("table_name"), record.getStr("id_name"), updateDatas, record.getInt("batch_size")); } } - logger.info("分库同步主库修改数据任务完成"); } else if (record.getInt("event") == 3) { - for (int i = 0; i < datas.size(); i++) { + + String sql = "select target_id from table_event_log l " + + " where table_name = '"+record.getStr("table_name")+"' " + + " and event = "+record.getInt("event") + + " and id > "+record.getLong("last_id")+" limit " + record.getInt("batch_size"); + List delList = slaveDb.find(sql); + for (int i = 0; i < delList.size(); i++) { Record deleteRecord=new Record(); - deleteRecord.set(record.getStr("id_name"), datas.get(i).getLong("target_id")); + deleteRecord.set(record.getStr("id_name"), delList.get(i).getLong("target_id")); //如果是针对 event_record_library 下两行数据使用 不是则仅仅赋值 无意义 - deleteRecord.set("old_id", datas.get(i).getLong("target_id")); + deleteRecord.set("old_id", delList.get(i).getLong("target_id")); deleteRecord.set("db_id", syncDbInfo.get("id")); deleteRecords.add(deleteRecord); } @@ -345,11 +407,9 @@ public class SyncSlaveToMasterThread implements Runnable{ }else { masterDb.batch("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+"=?",record.getStr("id_name"),deleteRecords,record.getInt("batch_size")); } - logger.info("分库同步主库删除数据任务完成"); } - Object lastUpdateId = datas.get(datas.size() - 1).get("id"); - logger.info("分库同步获取最后一次操作数据的数据ID信息为"+lastUpdateId); - record.set("last_id", lastUpdateId); + logger.debug("表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" ,lastId : " + lastId +",num:" + num); + record.set("last_id", Long.valueOf(lastId)); record.set("last_date", new Date()); slaveDb.update("table_sync_info", record); return true; @@ -357,20 +417,21 @@ public class SyncSlaveToMasterThread implements Runnable{ }); } }); - logger.info("修改分库table_sync_info最后操作数据信息 用于下次同步操作完成"); } else { flag = false; } } + } + sw.tag("e_" +record.getInt("event")+ record.getStr("table_name")); + logger.debug("分库同步表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" , 耗时 : " + sw.toString(sw.between("e_" +record.getInt("event")+ record.getStr("table_name"), "s_" +record.getInt("event")+ record.getStr("table_name")))+" ,totalnum : " + total); } } - logger.info("分库数据同步主库结束"); - logger.info("##################################################"); + sw.end(); + logger.info(errorUrl + " 分库数据同步主库结束,耗时:"+sw.toString(sw.total())); + firstRun = false;//修改首次启动标志位 } catch (Exception e) { - logger.error("分库同步主库数据连接为:"+errorUrl); logger.error("分库同步主库数据当前操作的异常表名为:"+errorTableName); - logger.error("分库数据同步主库发生错误 异常信息为:"+e.getMessage()); logger.error("分库数据同步主库发生错误 异常信息",e); // 如果出现异常信息 清楚线程变量 不用进行统计 SyncData.removeThreadlocalUpdateMissionIds(); @@ -379,6 +440,7 @@ public class SyncSlaveToMasterThread implements Runnable{ SyncData.removeThreadlocalLoopInsertMissionIds(); e.printStackTrace(); } + } public SyncDbInfo getSyncDbInfo() { diff --git a/nms_sync/src/com/nms/thread/SyncThread.java b/nms_sync/src/com/nms/thread/SyncThread.java index 49647fc..4b64e6c 100644 --- a/nms_sync/src/com/nms/thread/SyncThread.java +++ b/nms_sync/src/com/nms/thread/SyncThread.java @@ -14,6 +14,7 @@ import com.jfinal.plugin.activerecord.Record; import com.nms.interceptor.SyncDataInterceptor; import com.nms.interceptor.SyncSocketInterceptor; import com.nms.model.SyncDbInfo; +import com.nms.util.StopWatch; import com.jfinal.plugin.activerecord.tx.Tx; /** * 数据同步功能线程 @@ -37,7 +38,10 @@ public class SyncThread implements Runnable { @Override public void run() { + Thread.currentThread().setName("主库同步->" + syncDbInfo.getIp()); String errorTableName=null; + StopWatch sw = new StopWatch(); + sw.start(); try { logger.info("开始主库数据同步分库任务"); // 获取url路径 @@ -48,12 +52,16 @@ public class SyncThread implements Runnable { final DbPro masterDb = Db.use("masterDataSource"); final DbPro slaveDb = Db.use(url.toString()); - + logger.debug("數據源獲取成功"); List find = masterDb.find("select * from table_sync_info where db_id=? ", syncDbInfo.get("id")); //logger.info("查询主库须向分库同步数据信息"+JSON.toJSONString(find)); if (find != null && find.size() > 0) { for (final Record record : find) { + logger.debug("主库同步表开始:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +",lastId:" + record.getStr("last_id")); + long total = 0; + //同步 表 開始 + sw.tag("s_" +record.getInt("event")+ record.getStr("table_name")); //logger.info("主库数据同步到分库 正在操作的表名为:"+ record.getStr("table_name")); errorTableName=record.getStr("table_name"); //针对个别特殊表动态条件 @@ -270,13 +278,15 @@ public class SyncThread implements Runnable { } } } + sw.tag("e_" +record.getInt("event")+ record.getStr("table_name")); + logger.debug("主库同步表:" + record.getStr("table_name")+",event : " +(record.getInt("event") == 1? "insert":(record.getInt("event")==2?"update":"delete")) +" , 耗时 : " + sw.toString(sw.between("e_" +record.getInt("event")+ record.getStr("table_name"), "s_" +record.getInt("event")+ record.getStr("table_name")))+" ,totalnum : " + total); } } - logger.info("主库数据同步分库结束"); + sw.end(); + logger.info("主库数据同步分库结束,耗时:"+sw.toString(sw.total())); logger.info("*****************************************************"); } catch (Exception e) { logger.error("主库同步分库数据当前操作的异常表名为:"+errorTableName); - logger.error("主库数据同步分库发生错误 异常信息为:"+e.getMessage()); logger.error("主库数据同步分库发生错误 异常信息",e); e.printStackTrace(); } diff --git a/nms_sync/src/com/nms/util/StopWatch.java b/nms_sync/src/com/nms/util/StopWatch.java new file mode 100644 index 0000000..895f9ac --- /dev/null +++ b/nms_sync/src/com/nms/util/StopWatch.java @@ -0,0 +1,164 @@ +package com.nms.util; + +import java.util.LinkedHashMap; + +/** + * 秒表计时器 + * @author fang + * + */ +public class StopWatch { + private static final long SEC_MILL = 1000; + private static final long MIN_MILL = 60 * SEC_MILL; + private static final long HOUR_MILL = 60 * MIN_MILL; + private static final long DAY_MILL = 24 * HOUR_MILL; + private long start; + private long end; + private LinkedHashMap tagMap = new LinkedHashMap(); + + public StopWatch(){ + start(); + } + + public static StopWatch newStopWacth(){ + return new StopWatch(); + } + + /** + * 计时器开始 + * @return + */ + public long start(){ + this.start = System.currentTimeMillis(); + return start; + } + + /** + * 计时器结束 + * @return + */ + public long end(){ + this.end = System.currentTimeMillis(); + return end; + } + + + public long tag(String tag){ + long l = System.currentTimeMillis(); + this.tagMap.put(tag, l); + return l; + } + + /** + * 计算两个 tag 之间的时间差 + * @param b + * @param a + * @return + */ + public long between(String b,String a){ + Long l1 = this.tagMap.get(b); + Long l2 = this.tagMap.get(a); + if(l1 != null && l2 != null){ + return l1-l2; + } + return -1; + } + + /** + * 兩個 tag之間的時間差 + * @param b + * @param a + * @return + */ + public String timeBetween(String b,String a) { + long between = between(b, a); + return toString(between); + } + + + public static String toString(long l){ + StringBuilder sb = new StringBuilder(); + if(l >= DAY_MILL){ + sb.append((l/DAY_MILL)); + sb.append( "天"); + l = l % DAY_MILL; + } + if(l >= HOUR_MILL){ + sb.append((l/HOUR_MILL)); + sb.append( "小时"); + l = l % HOUR_MILL; + } + if(l >= MIN_MILL){ + sb.append((l/MIN_MILL)); + sb.append( "分"); + l = l % MIN_MILL; + } + if(l >= SEC_MILL){ + sb.append((l/SEC_MILL)); + sb.append( "秒"); + l = l % SEC_MILL; + } + + sb.append((l)); + sb.append( "毫秒"); + + return sb.toString(); + } + + public String toString(){ + + return ""; + } + + /** + * 从开始到结束总耗时 + * @return + */ + public long total(){ + long temp = System.currentTimeMillis(); + if(this.end < this.start){ + this.end = temp; + } + return end - start; + } + + + public void reset(){ + this.tagMap.clear(); + this.start(); + } + + public long getStart() { + return start; + } + + public void setStart(long start) { + this.start = start; + } + + public long getEnd() { + return end; + } + + public void setEnd(long end) { + this.end = end; + } + + public LinkedHashMap getTag() { + return tagMap; + } + + public void LinkedHashMap(LinkedHashMap tag) { + this.tagMap = tag; + } + + public static void main(String[] args) { + long s = System.currentTimeMillis(); + long end = s +2*DAY_MILL+ 12 * MIN_MILL + 30*SEC_MILL + 388; + + String string = StopWatch.toString(end -s); + System.out.println(string); + + } + +}