update
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
#dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}]
|
#dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}]
|
||||||
#\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740
|
#\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740
|
||||||
dburl=jdbc:mysql://192.168.11.153:3306/nms
|
dburl=jdbc:mysql://192.168.11.153:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true
|
||||||
#\u6570\u636e\u5e93\u8d26\u6237\u540d
|
#\u6570\u636e\u5e93\u8d26\u6237\u540d
|
||||||
dbusername=nms
|
dbusername=nms
|
||||||
#\u6570\u636e\u5e93\u5bc6\u7801
|
#\u6570\u636e\u5e93\u5bc6\u7801
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
#dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}]
|
#dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}]
|
||||||
#\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740
|
#\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740
|
||||||
dburl=jdbc:mysql://192.168.11.153:3306/nms
|
dburl=jdbc:mysql://192.168.11.153:3306/nms?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true
|
||||||
#\u6570\u636e\u5e93\u8d26\u6237\u540d
|
#\u6570\u636e\u5e93\u8d26\u6237\u540d
|
||||||
dbusername=nms
|
dbusername=nms
|
||||||
#\u6570\u636e\u5e93\u5bc6\u7801
|
#\u6570\u636e\u5e93\u5bc6\u7801
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package com.nms.interceptor;
|
package com.nms.interceptor;
|
||||||
|
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
@@ -22,7 +23,7 @@ public class SyncDataInterceptor implements Interceptor{
|
|||||||
SyncThread target = inv.getTarget();
|
SyncThread target = inv.getTarget();
|
||||||
SyncDbInfo syncDbInfo = target.getSyncDbInfo();
|
SyncDbInfo syncDbInfo = target.getSyncDbInfo();
|
||||||
String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
|
String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
|
||||||
+ syncDbInfo.get("database_name");
|
+ syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true";
|
||||||
logger.info("当前数据库连接为 "+url);
|
logger.info("当前数据库连接为 "+url);
|
||||||
//同步数据前 取出metadata表中最后一次同步的id信息 获取新增的数据信息 方便接下来修改表结构
|
//同步数据前 取出metadata表中最后一次同步的id信息 获取新增的数据信息 方便接下来修改表结构
|
||||||
Record metadataTableSyncInfo = Db.use("masterDataSource").findFirst("select * from table_sync_info where table_name='metadata' and event=1 and db_id=?",syncDbInfo.getId());
|
Record metadataTableSyncInfo = Db.use("masterDataSource").findFirst("select * from table_sync_info where table_name='metadata' and event=1 and db_id=?",syncDbInfo.getId());
|
||||||
@@ -91,6 +92,17 @@ public class SyncDataInterceptor implements Interceptor{
|
|||||||
SyncStoredProcedure syncStoreProcedure=new SyncStoredProcedure("pro_createTable",checkTypeInfo.getStr("TABLE_NAME"), sql.toString(),cIndexFileds.toString());
|
SyncStoredProcedure syncStoreProcedure=new SyncStoredProcedure("pro_createTable",checkTypeInfo.getStr("TABLE_NAME"), sql.toString(),cIndexFileds.toString());
|
||||||
Db.use(url).execute(syncStoreProcedure);
|
Db.use(url).execute(syncStoreProcedure);
|
||||||
logger.info("创建新表操作完成");
|
logger.info("创建新表操作完成");
|
||||||
|
|
||||||
|
//创建完新表结构以后 将同步信息添加到同步表 使新配置的监测数据同步到中心
|
||||||
|
Record record = new Record();
|
||||||
|
record.set("table_name", checkTypeInfo.getStr("TABLE_NAME"));
|
||||||
|
record.set("mode", 2);
|
||||||
|
record.set("id_name", "ID");
|
||||||
|
record.set("event",1);
|
||||||
|
record.set("last_id", -1);
|
||||||
|
record.set("db_id", -1);
|
||||||
|
record.set("last_date", new Date());
|
||||||
|
Db.use(url).save("table_sync_info", record);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ public class Conn {
|
|||||||
Logger logger = Logger.getLogger(Conn.class);
|
Logger logger = Logger.getLogger(Conn.class);
|
||||||
logger.info("开始创建各分库数据库的连接池");
|
logger.info("开始创建各分库数据库的连接池");
|
||||||
for (SyncDbInfo syncDbInfo : syncDbInfos) {
|
for (SyncDbInfo syncDbInfo : syncDbInfos) {
|
||||||
String url="jdbc:mysql://"+syncDbInfo.get("ip")+":"+syncDbInfo.get("port")+"/"+syncDbInfo.get("database_name");
|
String url="jdbc:mysql://"+syncDbInfo.get("ip")+":"+syncDbInfo.get("port")+"/"+syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true";
|
||||||
logger.info("当前创建数据库连接信息为"+url);
|
logger.info("当前创建数据库连接信息为"+url);
|
||||||
//初始化各数据源插件
|
//初始化各数据源插件
|
||||||
DruidPlugin druid=new DruidPlugin(url,(String)syncDbInfo.get("user"),(String)syncDbInfo.get("password"));
|
DruidPlugin druid=new DruidPlugin(url,(String)syncDbInfo.get("user"),(String)syncDbInfo.get("password"));
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ public class SyncData{
|
|||||||
// 主库向分库同步数据
|
// 主库向分库同步数据
|
||||||
SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo));
|
SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo));
|
||||||
logger.info("创建主库同步分库线程执行任务");
|
logger.info("创建主库同步分库线程执行任务");
|
||||||
scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS);
|
//scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS);
|
||||||
// 分库向主库同步数据
|
// 分库向主库同步数据
|
||||||
logger.info("创建分库数据同步到主库线程执行任务");
|
logger.info("创建分库数据同步到主库线程执行任务");
|
||||||
SyncSlaveToMasterThread syncSlaveToMasterThread = Duang.duang(new SyncSlaveToMasterThread(syncDbInfo));
|
SyncSlaveToMasterThread syncSlaveToMasterThread = Duang.duang(new SyncSlaveToMasterThread(syncDbInfo));
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
// 获取url路径
|
// 获取url路径
|
||||||
final StringBuffer url=new StringBuffer();
|
final StringBuffer url=new StringBuffer();
|
||||||
url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
|
url.append("jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
|
||||||
+ syncDbInfo.get("database_name"));
|
+ syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true");
|
||||||
logger.info("当前分库数据库连接为"+url);
|
logger.info("当前分库数据库连接为"+url);
|
||||||
List<Record> find = Db.use(url.toString()).find("select * from table_sync_info");
|
List<Record> find = Db.use(url.toString()).find("select * from table_sync_info");
|
||||||
logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find));
|
logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find));
|
||||||
@@ -47,7 +47,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
boolean flag = true;
|
boolean flag = true;
|
||||||
// 判断表中的event事件 1代表insert 2代表update 3代表delete
|
// 判断表中的event事件 1代表insert 2代表update 3代表delete
|
||||||
if (record.getInt("event") == 1) {
|
if (record.getInt("event") == 1) {
|
||||||
if(record.getInt("mode").equals(1)){
|
if(record.getInt("mode").equals(1)||record.getInt("mode").equals(2)){
|
||||||
while (flag) {
|
while (flag) {
|
||||||
// 新增操作 取出最后更新id信息 查询增量数据
|
// 新增操作 取出最后更新id信息 查询增量数据
|
||||||
final List<Record> data = Db.use(url.toString())
|
final List<Record> data = Db.use(url.toString())
|
||||||
@@ -57,19 +57,30 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data));
|
logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data));
|
||||||
if (data != null && data.size() > 0) {
|
if (data != null && data.size() > 0) {
|
||||||
final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
|
final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
|
||||||
for(Record entity:data) {
|
if(record.getInt("mode").equals(2)) {
|
||||||
entity.remove(record.getStr("id_name"));
|
for(Record entity:data) {
|
||||||
|
entity.remove(record.getStr("id_name"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 针对监测结果表的id值 自动生成处理
|
||||||
|
if("detection_info_new".equals(record.getStr("table_name"))) {
|
||||||
|
for(Record entity:data) {
|
||||||
|
Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual");
|
||||||
|
entity.set("old_id",entity.getLong(record.getStr("id_name")));
|
||||||
|
entity.set("db_id", syncDbInfo.getInt("id"));
|
||||||
|
entity.set(record.getStr("id_name"), seqData.getLong("seqId"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Db.use(url.toString()).tx(new IAtom() {
|
Db.use(url.toString()).tx(new IAtom() {
|
||||||
@Override
|
@Override
|
||||||
public boolean run() throws SQLException {
|
public boolean run() throws SQLException {
|
||||||
return Db.use().tx(new IAtom() {
|
return Db.use("masterDataSource").tx(new IAtom() {
|
||||||
@Override
|
@Override
|
||||||
public boolean run() throws SQLException {
|
public boolean run() throws SQLException {
|
||||||
Db.use("masterDataSource").batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
|
Db.use("masterDataSource").batchSave(record.getStr("table_name"), data, record.getInt("batch_size"));
|
||||||
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
|
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
|
||||||
logger.info("主库同步增量更新数据完成");
|
logger.info("分库同步增量更新数据完成");
|
||||||
logger.info("主库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId));
|
logger.info("分库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId));
|
||||||
record.set("last_id", lastInsertId);
|
record.set("last_id", lastInsertId);
|
||||||
record.set("last_date", new Date());
|
record.set("last_date", new Date());
|
||||||
Db.use(url.toString()).update("table_sync_info", record);
|
Db.use(url.toString()).update("table_sync_info", record);
|
||||||
@@ -78,7 +89,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
logger.info("主库同步增量更新数据完成 修改最后同步ID");
|
logger.info("分库同步增量更新数据完成 修改最后同步ID");
|
||||||
} else {
|
} else {
|
||||||
flag = false;
|
flag = false;
|
||||||
}
|
}
|
||||||
@@ -97,7 +108,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
Db.use(url.toString()).tx(new IAtom() {
|
Db.use(url.toString()).tx(new IAtom() {
|
||||||
@Override
|
@Override
|
||||||
public boolean run() throws SQLException {
|
public boolean run() throws SQLException {
|
||||||
return Db.use().tx(new IAtom() {
|
return Db.use("masterDataSource").tx(new IAtom() {
|
||||||
@Override
|
@Override
|
||||||
public boolean run() throws SQLException {
|
public boolean run() throws SQLException {
|
||||||
List<Integer> insertIds = new ArrayList<Integer>();
|
List<Integer> insertIds = new ArrayList<Integer>();
|
||||||
@@ -128,7 +139,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
|
Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size"));
|
||||||
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
|
// 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用
|
||||||
Object lastInsertId = data.get(data.size() - 1).get("id");
|
Object lastInsertId = data.get(data.size() - 1).get("id");
|
||||||
logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
|
logger.info("分库增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId));
|
||||||
record.set("last_id", lastInsertId);
|
record.set("last_id", lastInsertId);
|
||||||
record.set("last_date", new Date());
|
record.set("last_date", new Date());
|
||||||
Db.use(url.toString()).update("table_sync_info", record);
|
Db.use(url.toString()).update("table_sync_info", record);
|
||||||
@@ -137,7 +148,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
logger.info("增量更新数据任务结束");
|
logger.info("分库增量更新数据任务结束");
|
||||||
} else {
|
} else {
|
||||||
flag = false;
|
flag = false;
|
||||||
}
|
}
|
||||||
@@ -155,7 +166,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
Db.use(url.toString()).tx(new IAtom() {
|
Db.use(url.toString()).tx(new IAtom() {
|
||||||
@Override
|
@Override
|
||||||
public boolean run() throws SQLException {
|
public boolean run() throws SQLException {
|
||||||
return Db.use().tx(new IAtom() {
|
return Db.use("masterDataSource").tx(new IAtom() {
|
||||||
@Override
|
@Override
|
||||||
public boolean run() throws SQLException {
|
public boolean run() throws SQLException {
|
||||||
List<Integer> updateIds = new ArrayList<Integer>();
|
List<Integer> updateIds = new ArrayList<Integer>();
|
||||||
@@ -177,7 +188,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
updateIds.toArray());
|
updateIds.toArray());
|
||||||
logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas));
|
logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas));
|
||||||
if (updateDatas != null && updateDatas.size() > 0) {
|
if (updateDatas != null && updateDatas.size() > 0) {
|
||||||
if(record.getStr("table_name").equals("event_record_library")) {
|
if(record.getStr("table_name").equals("event_record_library")||record.getStr("table_name").equals("detection_info_new")) {
|
||||||
for(Record updateData:updateDatas) {
|
for(Record updateData:updateDatas) {
|
||||||
updateData.set("old_id",updateData.getLong("id"));
|
updateData.set("old_id",updateData.getLong("id"));
|
||||||
updateData.set("db_id", syncDbInfo.get("id"));
|
updateData.set("db_id", syncDbInfo.get("id"));
|
||||||
@@ -207,7 +218,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
logger.info("分库同步主库删除数据任务完成");
|
logger.info("分库同步主库删除数据任务完成");
|
||||||
}
|
}
|
||||||
Object lastUpdateId = datas.get(datas.size() - 1).get("id");
|
Object lastUpdateId = datas.get(datas.size() - 1).get("id");
|
||||||
logger.info("获取最后一次操作数据的数据ID信息为"+lastUpdateId);
|
logger.info("分库同步获取最后一次操作数据的数据ID信息为"+lastUpdateId);
|
||||||
record.set("last_id", lastUpdateId);
|
record.set("last_id", lastUpdateId);
|
||||||
record.set("last_date", new Date());
|
record.set("last_date", new Date());
|
||||||
Db.use(url.toString()).update("table_sync_info", record);
|
Db.use(url.toString()).update("table_sync_info", record);
|
||||||
@@ -225,6 +236,7 @@ public class SyncSlaveToMasterThread implements Runnable{
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.info("分库数据同步主库结束");
|
logger.info("分库数据同步主库结束");
|
||||||
|
logger.info("##################################################");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ public class SyncThread implements Runnable {
|
|||||||
// 判断表中的event事件 1代表insert 2代表update 3代表delete
|
// 判断表中的event事件 1代表insert 2代表update 3代表delete
|
||||||
if (record.getInt("event") == 1) {
|
if (record.getInt("event") == 1) {
|
||||||
//根据mode判断主键产生方式
|
//根据mode判断主键产生方式
|
||||||
if(record.getInt("mode").equals(1)){
|
if(record.getInt("mode").equals(1)||record.getInt("mode").equals(2)){
|
||||||
while (flag) {
|
while (flag) {
|
||||||
// 查询增量数据
|
// 查询增量数据
|
||||||
final List<Record> data =Db.use("masterDataSource")
|
final List<Record> data =Db.use("masterDataSource")
|
||||||
@@ -62,8 +62,10 @@ public class SyncThread implements Runnable {
|
|||||||
//logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
|
//logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data));
|
||||||
if (data != null && data.size() > 0) {
|
if (data != null && data.size() > 0) {
|
||||||
final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
|
final Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name"));
|
||||||
for(Record entity:data) {
|
if(record.getInt("mode").equals(2)) {
|
||||||
entity.remove(record.getStr("id_name"));
|
for(Record entity:data) {
|
||||||
|
entity.remove(record.getStr("id_name"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
//多数据源事务 主数据源嵌套子数据源
|
//多数据源事务 主数据源嵌套子数据源
|
||||||
Db.use().tx(new IAtom() {
|
Db.use().tx(new IAtom() {
|
||||||
@@ -231,6 +233,7 @@ public class SyncThread implements Runnable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.info("主库数据同步分库结束");
|
logger.info("主库数据同步分库结束");
|
||||||
|
logger.info("*****************************************************");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user