From 23f608778db1270774305389dbf426b873279d99 Mon Sep 17 00:00:00 2001 From: default Date: Tue, 16 Oct 2018 16:25:30 +0800 Subject: [PATCH] update --- nms_sync/bin/db.properties | 2 +- nms_sync/conf/db.properties | 2 +- .../nms/interceptor/SyncDataInterceptor.java | 14 ++++++- nms_sync/src/com/nms/main/Conn.java | 2 +- nms_sync/src/com/nms/main/SyncData.java | 2 +- .../nms/thread/SyncSlaveToMasterThread.java | 40 ++++++++++++------- nms_sync/src/com/nms/thread/SyncThread.java | 9 +++-- 7 files changed, 49 insertions(+), 22 deletions(-) diff --git a/nms_sync/bin/db.properties b/nms_sync/bin/db.properties index 14d18f2..7b577b9 100644 --- a/nms_sync/bin/db.properties +++ b/nms_sync/bin/db.properties @@ -1,6 +1,6 @@ #dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}] #\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740 -dburl=jdbc:mysql://192.168.11.153:3306/nms +dburl=jdbc:mysql://192.168.11.153:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true #\u6570\u636e\u5e93\u8d26\u6237\u540d dbusername=nms #\u6570\u636e\u5e93\u5bc6\u7801 diff --git a/nms_sync/conf/db.properties b/nms_sync/conf/db.properties index 14d18f2..7b577b9 100644 --- a/nms_sync/conf/db.properties +++ b/nms_sync/conf/db.properties @@ -1,6 +1,6 @@ #dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}] #\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740 -dburl=jdbc:mysql://192.168.11.153:3306/nms +dburl=jdbc:mysql://192.168.11.153:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true #\u6570\u636e\u5e93\u8d26\u6237\u540d dbusername=nms #\u6570\u636e\u5e93\u5bc6\u7801 diff --git a/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java b/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java index d1c4943..8f6324d 100644 --- a/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java +++ b/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java @@ -1,6 +1,7 @@ package com.nms.interceptor; +import java.util.Date; import java.util.List; import org.apache.log4j.Logger; @@ -22,7 +23,7 @@ public class SyncDataInterceptor implements Interceptor{ SyncThread target = inv.getTarget(); SyncDbInfo syncDbInfo = target.getSyncDbInfo(); String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" - + syncDbInfo.get("database_name"); + + syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true"; logger.info("当前数据库连接为 "+url); //同步数据前 取出metadata表中最后一次同步的id信息 获取新增的数据信息 方便接下来修改表结构 Record metadataTableSyncInfo = Db.use("masterDataSource").findFirst("select * from table_sync_info where table_name='metadata' and event=1 and db_id=?",syncDbInfo.getId()); @@ -91,6 +92,17 @@ public class SyncDataInterceptor implements Interceptor{ SyncStoredProcedure syncStoreProcedure=new SyncStoredProcedure("pro_createTable",checkTypeInfo.getStr("TABLE_NAME"), sql.toString(),cIndexFileds.toString()); Db.use(url).execute(syncStoreProcedure); logger.info("创建新表操作完成"); + + //创建完新表结构以后 将同步信息添加到同步表 使新配置的监测数据同步到中心 + Record record = new Record(); + record.set("table_name", checkTypeInfo.getStr("TABLE_NAME")); + record.set("mode", 2); + record.set("id_name", "ID"); + record.set("event",1); + record.set("last_id", -1); + record.set("db_id", -1); + record.set("last_date", new Date()); + Db.use(url).save("table_sync_info", record); } } diff --git a/nms_sync/src/com/nms/main/Conn.java b/nms_sync/src/com/nms/main/Conn.java index f9c0151..ae9ef6f 100644 --- a/nms_sync/src/com/nms/main/Conn.java +++ b/nms_sync/src/com/nms/main/Conn.java @@ -22,7 +22,7 @@ public class Conn { Logger logger = Logger.getLogger(Conn.class); logger.info("开始创建各分库数据库的连接池"); for (SyncDbInfo syncDbInfo : syncDbInfos) { - String url="jdbc:mysql://"+syncDbInfo.get("ip")+":"+syncDbInfo.get("port")+"/"+syncDbInfo.get("database_name"); + String url="jdbc:mysql://"+syncDbInfo.get("ip")+":"+syncDbInfo.get("port")+"/"+syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true"; logger.info("当前创建数据库连接信息为"+url); //初始化各数据源插件 DruidPlugin druid=new DruidPlugin(url,(String)syncDbInfo.get("user"),(String)syncDbInfo.get("password")); diff --git a/nms_sync/src/com/nms/main/SyncData.java b/nms_sync/src/com/nms/main/SyncData.java index 01f7487..aed5029 100644 --- a/nms_sync/src/com/nms/main/SyncData.java +++ b/nms_sync/src/com/nms/main/SyncData.java @@ -54,7 +54,7 @@ public class SyncData{ // 主库向分库同步数据 SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo)); logger.info("创建主库同步分库线程执行任务"); - scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS); + //scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS); // 分库向主库同步数据 logger.info("创建分库数据同步到主库线程执行任务"); SyncSlaveToMasterThread syncSlaveToMasterThread = Duang.duang(new SyncSlaveToMasterThread(syncDbInfo)); diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java index 02c8af6..ea64aaf 100644 --- a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java +++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java @@ -37,7 +37,7 @@ public class SyncSlaveToMasterThread implements Runnable{ // 获取url路径 final StringBuffer url=new StringBuffer(); url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" - + syncDbInfo.get("database_name")); + + syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true"); logger.info("当前分库数据库连接为"+url); List find = Db.use(url.toString()).find("select * from table_sync_info"); logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find)); @@ -47,7 +47,7 @@ public class SyncSlaveToMasterThread implements Runnable{ boolean flag = true; // 判断表中的event事件 1代表insert 2代表update 3代表delete if (record.getInt("event") == 1) { - if(record.getInt("mode").equals(1)){ + if(record.getInt("mode").equals(1)||record.getInt("mode").equals(2)){ while (flag) { // 新增操作 取出最后更新id信息 查询增量数据 final List data = Db.use(url.toString()) @@ -57,19 +57,30 @@ public class SyncSlaveToMasterThread implements Runnable{ logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data)); if (data != null && data.size() > 0) { final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name")); - for(Record entity:data) { - entity.remove(record.getStr("id_name")); + if(record.getInt("mode").equals(2)) { + for(Record entity:data) { + entity.remove(record.getStr("id_name")); + } + } + // 针对监测结果表的id值 自动生成处理 + if("detection_info_new".equals(record.getStr("table_name"))) { + for(Record entity:data) { + Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); + entity.set("old_id",entity.getLong(record.getStr("id_name"))); + entity.set("db_id", syncDbInfo.getInt("id")); + entity.set(record.getStr("id_name"), seqData.getLong("seqId")); + } } Db.use(url.toString()).tx(new IAtom() { @Override public boolean run() throws SQLException { - return Db.use().tx(new IAtom() { + return Db.use("masterDataSource").tx(new IAtom() { @Override public boolean run() throws SQLException { Db.use("masterDataSource").batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 - logger.info("主库同步增量更新数据完成"); - logger.info("主库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId)); + logger.info("分库同步增量更新数据完成"); + logger.info("分库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); Db.use(url.toString()).update("table_sync_info", record); @@ -78,7 +89,7 @@ public class SyncSlaveToMasterThread implements Runnable{ }); } }); - logger.info("主库同步增量更新数据完成 修改最后同步ID"); + logger.info("分库同步增量更新数据完成 修改最后同步ID"); } else { flag = false; } @@ -97,7 +108,7 @@ public class SyncSlaveToMasterThread implements Runnable{ Db.use(url.toString()).tx(new IAtom() { @Override public boolean run() throws SQLException { - return Db.use().tx(new IAtom() { + return Db.use("masterDataSource").tx(new IAtom() { @Override public boolean run() throws SQLException { List insertIds = new ArrayList(); @@ -128,7 +139,7 @@ public class SyncSlaveToMasterThread implements Runnable{ Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 Object lastInsertId = data.get(data.size() - 1).get("id"); - logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); + logger.info("分库增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); record.set("last_id", lastInsertId); record.set("last_date", new Date()); Db.use(url.toString()).update("table_sync_info", record); @@ -137,7 +148,7 @@ public class SyncSlaveToMasterThread implements Runnable{ }); } }); - logger.info("增量更新数据任务结束"); + logger.info("分库增量更新数据任务结束"); } else { flag = false; } @@ -155,7 +166,7 @@ public class SyncSlaveToMasterThread implements Runnable{ Db.use(url.toString()).tx(new IAtom() { @Override public boolean run() throws SQLException { - return Db.use().tx(new IAtom() { + return Db.use("masterDataSource").tx(new IAtom() { @Override public boolean run() throws SQLException { List updateIds = new ArrayList(); @@ -177,7 +188,7 @@ public class SyncSlaveToMasterThread implements Runnable{ updateIds.toArray()); logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas)); if (updateDatas != null && updateDatas.size() > 0) { - if(record.getStr("table_name").equals("event_record_library")) { + if(record.getStr("table_name").equals("event_record_library")||record.getStr("table_name").equals("detection_info_new")) { for(Record updateData:updateDatas) { updateData.set("old_id",updateData.getLong("id")); updateData.set("db_id", syncDbInfo.get("id")); @@ -207,7 +218,7 @@ public class SyncSlaveToMasterThread implements Runnable{ logger.info("分库同步主库删除数据任务完成"); } Object lastUpdateId = datas.get(datas.size() - 1).get("id"); - logger.info("获取最后一次操作数据的数据ID信息为"+lastUpdateId); + logger.info("分库同步获取最后一次操作数据的数据ID信息为"+lastUpdateId); record.set("last_id", lastUpdateId); record.set("last_date", new Date()); Db.use(url.toString()).update("table_sync_info", record); @@ -225,6 +236,7 @@ public class SyncSlaveToMasterThread implements Runnable{ } } logger.info("分库数据同步主库结束"); + logger.info("##################################################"); } catch (Exception e) { e.printStackTrace(); } diff --git a/nms_sync/src/com/nms/thread/SyncThread.java b/nms_sync/src/com/nms/thread/SyncThread.java index 9dc684c..5e9a8e3 100644 --- a/nms_sync/src/com/nms/thread/SyncThread.java +++ b/nms_sync/src/com/nms/thread/SyncThread.java @@ -52,7 +52,7 @@ public class SyncThread implements Runnable { // 判断表中的event事件 1代表insert 2代表update 3代表delete if (record.getInt("event") == 1) { //根据mode判断主键产生方式 - if(record.getInt("mode").equals(1)){ + if(record.getInt("mode").equals(1)||record.getInt("mode").equals(2)){ while (flag) { // 查询增量数据 final List data =Db.use("masterDataSource") @@ -62,8 +62,10 @@ public class SyncThread implements Runnable { //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data)); if (data != null && data.size() > 0) { final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name")); - for(Record entity:data) { - entity.remove(record.getStr("id_name")); + if(record.getInt("mode").equals(2)) { + for(Record entity:data) { + entity.remove(record.getStr("id_name")); + } } //多数据源事务 主数据源嵌套子数据源 Db.use().tx(new IAtom() { @@ -231,6 +233,7 @@ public class SyncThread implements Runnable { } } logger.info("主库数据同步分库结束"); + logger.info("*****************************************************"); } catch (Exception e) { e.printStackTrace(); }