This commit is contained in:
default
2018-11-20 09:53:10 +08:00
parent d8d7912919
commit 35a0a50ec9

View File

@@ -26,6 +26,7 @@ import com.nms.thread.SyncThread;
* *
*/ */
public class SyncData{ public class SyncData{
private static Logger logger = Logger.getLogger(SyncData.class);
// 线程变量记录任务结果修改的任务id // 线程变量记录任务结果修改的任务id
private static final ThreadLocal<Set<Long>> threadLocalUpdateMissionIds = new ThreadLocal<Set<Long>>(); private static final ThreadLocal<Set<Long>> threadLocalUpdateMissionIds = new ThreadLocal<Set<Long>>();
// 线程变量记录任务结果新增的任务id // 线程变量记录任务结果新增的任务id
@@ -37,46 +38,51 @@ public class SyncData{
public static void main(String[] args) { public static void main(String[] args) {
Logger logger = Logger.getLogger(SyncData.class); try {
logger.info("同步程序开始启动"); Logger logger = Logger.getLogger(SyncData.class);
//从配置文件获取数据库连接信息 logger.info("同步程序开始启动");
PropKit.use("db.properties"); //从配置文件获取数据库连接信息
//创建主数据库数据源 PropKit.use("db.properties");
DruidPlugin masterDruid=new DruidPlugin(PropKit.get("dburl"),PropKit.get("dbusername"),PropKit.get("dbpassword")); //创建主数据库数据源
masterDruid.setInitialSize(Integer.valueOf(PropKit.get("dbInitialSize"))); DruidPlugin masterDruid=new DruidPlugin(PropKit.get("dburl"),PropKit.get("dbusername"),PropKit.get("dbpassword"));
masterDruid.setMaxActive(Integer.valueOf(PropKit.get("dbMaxActive"))); masterDruid.setInitialSize(Integer.valueOf(PropKit.get("dbInitialSize")));
masterDruid.setMinIdle(Integer.valueOf(PropKit.get("dbMinIdle"))); masterDruid.setMaxActive(Integer.valueOf(PropKit.get("dbMaxActive")));
masterDruid.setMaxWait(Integer.valueOf(PropKit.get("dbMaxWait"))); masterDruid.setMinIdle(Integer.valueOf(PropKit.get("dbMinIdle")));
masterDruid.setRemoveAbandoned(true); masterDruid.setMaxWait(Integer.valueOf(PropKit.get("dbMaxWait")));
masterDruid.setRemoveAbandonedTimeoutMillis(Integer.valueOf(PropKit.get("dbRemoveAbandonedTimeout"))); masterDruid.setRemoveAbandoned(true);
ActiveRecordPlugin masterArp=new ActiveRecordPlugin("masterDataSource",masterDruid); masterDruid.setRemoveAbandonedTimeoutMillis(Integer.valueOf(PropKit.get("dbRemoveAbandonedTimeout")));
masterArp.setShowSql(true); ActiveRecordPlugin masterArp=new ActiveRecordPlugin("masterDataSource",masterDruid);
masterDruid.start(); masterArp.setShowSql(true);
masterArp.start(); masterDruid.start();
logger.info("加载配置文件 设置当前同步 masterDataSource 完成"); masterArp.start();
List<SyncDbInfo> syncDbInfos = SyncDbInfo.dao.use("masterDataSource").find("select * from sync_db_info"); logger.info("加载配置文件 设置当前同步 masterDataSource 完成");
logger.info("数据库获取其它分库 数据库连接信息"+JSON.toJSONString(syncDbInfos)); List<SyncDbInfo> syncDbInfos = SyncDbInfo.dao.use("masterDataSource").find("select * from sync_db_info");
if(syncDbInfos!=null&&syncDbInfos.size()>0){ logger.info("数据库获取其它分库 数据库连接信息"+JSON.toJSONString(syncDbInfos));
//创建其它数据源的连接 if(syncDbInfos!=null&&syncDbInfos.size()>0){
Conn.createConn(syncDbInfos); //创建其它数据源的连接
logger.info("分库数据库连接池创建完成"); Conn.createConn(syncDbInfos);
// 定时周期执行线程池 用于周期执行线程的运行 logger.info("分库数据库连接池创建完成");
ScheduledExecutorService scheduleService = Executors.newScheduledThreadPool(syncDbInfos.size()); // 定时周期执行线程池 用于周期执行线程的运行
logger.info("创建线程池完毕 数量大小为"+syncDbInfos.size()); ScheduledExecutorService scheduleService = Executors.newScheduledThreadPool(syncDbInfos.size());
// 使用scheduleWithFixedDleay在上一个线程任务执行完成后 5分钟执行下一次任务 logger.info("创建线程池完毕 数量大小为"+syncDbInfos.size());
for(SyncDbInfo syncDbInfo : syncDbInfos){ // 使用scheduleWithFixedDleay在上一个线程任务执行完成后 5分钟执行下一次任务
// 主库向分库同步数据 for(SyncDbInfo syncDbInfo : syncDbInfos){
SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo)); // 主库向分库同步数据
logger.info("创建主库同步分库线程执行任务"); SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo));
scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS); logger.info("创建主库同步分库线程执行任务");
// 分库向主库同步数据 scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS);
logger.info("创建分库数据同步到主库线程执行任务"); // 分库向主库同步数据
SyncSlaveToMasterThread syncSlaveToMasterThread = Duang.duang(new SyncSlaveToMasterThread(syncDbInfo)); logger.info("创建分库数据同步到主库线程执行任务");
scheduleService.scheduleWithFixedDelay(syncSlaveToMasterThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS); SyncSlaveToMasterThread syncSlaveToMasterThread = Duang.duang(new SyncSlaveToMasterThread(syncDbInfo));
scheduleService.scheduleWithFixedDelay(syncSlaveToMasterThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS);
}
}else{
logger.info("获取同步记录信息失败 请检查数据库数据信息");
logger.error("获取同步记录信息失败 请检查数据库数据信息");
} }
}else{ } catch(Exception e) {
logger.info("获取同步记录信息失败 请检查数据库数据信息"); e.printStackTrace();
logger.error("获取同步记录信息失败 请检查数据库数据信息"); logger.error("数据同步启动发生异常 信息为:"+e.getMessage());
} }
} }