commit 4929220ca98b643abaace509e80a4b73e5508a87 Author: chenjinsong Date: Thu Sep 27 16:25:51 2018 +0800 initial commit diff --git a/nms_sync/.classpath b/nms_sync/.classpath new file mode 100644 index 0000000..a9c6d9f --- /dev/null +++ b/nms_sync/.classpath @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + diff --git a/nms_sync/.project b/nms_sync/.project new file mode 100644 index 0000000..9f8cd42 --- /dev/null +++ b/nms_sync/.project @@ -0,0 +1,17 @@ + + + nms_sync + + + + + + org.eclipse.jdt.core.javabuilder + + + + + + org.eclipse.jdt.core.javanature + + diff --git a/nms_sync/.settings/org.eclipse.core.resources.prefs b/nms_sync/.settings/org.eclipse.core.resources.prefs new file mode 100644 index 0000000..99f26c0 --- /dev/null +++ b/nms_sync/.settings/org.eclipse.core.resources.prefs @@ -0,0 +1,2 @@ +eclipse.preferences.version=1 +encoding/=UTF-8 diff --git a/nms_sync/.settings/org.eclipse.jdt.core.prefs b/nms_sync/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 0000000..d17b672 --- /dev/null +++ b/nms_sync/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,12 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled +org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7 +org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve +org.eclipse.jdt.core.compiler.compliance=1.7 +org.eclipse.jdt.core.compiler.debug.lineNumber=generate +org.eclipse.jdt.core.compiler.debug.localVariable=generate +org.eclipse.jdt.core.compiler.debug.sourceFile=generate +org.eclipse.jdt.core.compiler.problem.assertIdentifier=error +org.eclipse.jdt.core.compiler.problem.enumIdentifier=error +org.eclipse.jdt.core.compiler.source=1.7 diff --git a/nms_sync/conf/config.properties b/nms_sync/conf/config.properties new file mode 100644 index 0000000..c32ace2 --- /dev/null +++ b/nms_sync/conf/config.properties @@ -0,0 +1,4 @@ +#\u4e3b\u5e93\u540c\u6b65\u5206\u5e93\u6570\u636e\u7ebf\u7a0b\u65f6\u95f4\u5dee +syncMaterToSlaveTime=30000 +#\u4e3b\u5e93\u540c\u6b65\u5206\u5e93\u6570\u636e\u7ebf\u7a0b\u65f6\u95f4\u5dee +syncSlaveToMaterTime=60000 \ No newline at end of file diff --git a/nms_sync/conf/db.properties b/nms_sync/conf/db.properties new file mode 100644 index 0000000..f9730b4 --- /dev/null +++ b/nms_sync/conf/db.properties @@ -0,0 +1,17 @@ +#dbinfo=[{"databaseName":"nms_sync","userName":"nms","isMaster":true,"password":"nms","url":"jdbc:mysql://localhost:3306/nms_sync"}] +#\u6570\u636e\u5e93\u8fde\u63a5\u5730\u5740 +dburl=jdbc:mysql://10.0.6.247:3306/nms-dev +#\u6570\u636e\u5e93\u8d26\u6237\u540d +dbusername=root +#\u6570\u636e\u5e93\u5bc6\u7801 +dbpassword=111111 +#\u6570\u636e\u5e93\u540d\u79f0 +dbname=nms-dev +#\u8fde\u63a5\u6c60\u521d\u59cb\u5316\u5927\u5c0f +dbInitialSize=1 +#\u6700\u5927\u8fde\u63a5\u6570 +dbMaxActive=2 +#\u6700\u5c0f\u8fde\u63a5\u6570 +dbMinIdle=1 +#\u6700\u5927\u7b49\u5f85\u8fde\u63a5\u65f6\u95f4 +dbMaxWait=60000 \ No newline at end of file diff --git a/nms_sync/conf/log4j.properties b/nms_sync/conf/log4j.properties new file mode 100644 index 0000000..10f0a91 --- /dev/null +++ b/nms_sync/conf/log4j.properties @@ -0,0 +1,13 @@ +log4j.rootLogger=DEBUG, stdout, file +#log4j.rootLogger=ERROR, stdout, file +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%n%-d{yyyy-MM-dd HH:mm:ss}%n[%p]-[Thread: %t]-[%C.%M()]: %m%n +log4j.appender.stdout.layout.ConversionPattern=%d %p (%F:%L) [%t] - <%m>%n + +# Output to the File +#log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +#log4j.appender.file.DatePattern='_'yyyy-MM-dd'.log' +#log4j.appender.file.File=./sync.log +#log4j.appender.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.file.layout.ConversionPattern=%n%-d{yyyy-MM-dd HH:mm:ss}%n[%p]-[Thread: %t]-[%C.%M()]: %m%n \ No newline at end of file diff --git a/nms_sync/lib/c3p0-0.9.1.2.jar b/nms_sync/lib/c3p0-0.9.1.2.jar new file mode 100644 index 0000000..0f42d60 Binary files /dev/null and b/nms_sync/lib/c3p0-0.9.1.2.jar differ diff --git a/nms_sync/lib/druid-1.0.29.jar b/nms_sync/lib/druid-1.0.29.jar new file mode 100644 index 0000000..9278cc3 Binary files /dev/null and b/nms_sync/lib/druid-1.0.29.jar differ diff --git a/nms_sync/lib/fastjson-1.2.47.jar b/nms_sync/lib/fastjson-1.2.47.jar new file mode 100644 index 0000000..f342bca Binary files /dev/null and b/nms_sync/lib/fastjson-1.2.47.jar differ diff --git a/nms_sync/lib/jfinal-3.4-bin-with-src.jar b/nms_sync/lib/jfinal-3.4-bin-with-src.jar new file mode 100644 index 0000000..e0f28c5 Binary files /dev/null and b/nms_sync/lib/jfinal-3.4-bin-with-src.jar differ diff --git a/nms_sync/lib/log4j-1.2.15.jar b/nms_sync/lib/log4j-1.2.15.jar new file mode 100644 index 0000000..c930a6a Binary files /dev/null and b/nms_sync/lib/log4j-1.2.15.jar differ diff --git a/nms_sync/lib/mysql-connector-java-5.1.0-bin.jar b/nms_sync/lib/mysql-connector-java-5.1.0-bin.jar new file mode 100644 index 0000000..d848c13 Binary files /dev/null and b/nms_sync/lib/mysql-connector-java-5.1.0-bin.jar differ diff --git a/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java b/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java new file mode 100644 index 0000000..d1c4943 --- /dev/null +++ b/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java @@ -0,0 +1,117 @@ +package com.nms.interceptor; + + +import java.util.List; +import org.apache.log4j.Logger; + +import com.alibaba.fastjson.JSON; +import com.jfinal.aop.Interceptor; +import com.jfinal.aop.Invocation; +import com.jfinal.plugin.activerecord.Db; +import com.jfinal.plugin.activerecord.Record; +import com.nms.model.SyncDbInfo; +import com.nms.thread.SyncThread; + +public class SyncDataInterceptor implements Interceptor{ + private Logger logger =Logger.getLogger(this.getClass()); + + @Override + public void intercept(Invocation inv) { + try{ + logger.info("--------数据同步前 syncDataInterceptor拦截器拦截------------"); + SyncThread target = inv.getTarget(); + SyncDbInfo syncDbInfo = target.getSyncDbInfo(); + String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" + + syncDbInfo.get("database_name"); + logger.info("当前数据库连接为 "+url); + //同步数据前 取出metadata表中最后一次同步的id信息 获取新增的数据信息 方便接下来修改表结构 + Record metadataTableSyncInfo = Db.use("masterDataSource").findFirst("select * from table_sync_info where table_name='metadata' and event=1 and db_id=?",syncDbInfo.getId()); + logger.info("获取metadata表中最后一次同步id的数据信息为 "+JSON.toJSONString(metadataTableSyncInfo)); + //开始执行同步过程 + inv.invoke(); + //处理同步数据结束 + + //判断metadata表 是否有新增数据 如果有执行sql 修改表结构 + if(metadataTableSyncInfo!=null){ + List metadatas = Db.use(url).find("select m.*,cti.table_name from metadata m left join check_type_info cti on m.check_type_id=cti.id where m.id > ? ",metadataTableSyncInfo.getLong("last_id")); + logger.info("metadata表中新增数据信息查询结果为 "+JSON.toJSONString(metadatas)); + if(metadatas!=null && metadatas.size()>0){ + for(Record metadata:metadatas){ + Record isExist = Db.use(url).findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",metadata.getStr("table_name")); + logger.info("判断metadata表新增数据是修改还是新增表操作结果为"+JSON.toJSONString(isExist)); + //向数据库中添加新的字段 + if(isExist.getInt("count")>0){ + StringBuffer sqlString = new StringBuffer("alter table "); + sqlString.append(metadata.getStr("table_name").toUpperCase()); + sqlString.append(" add("); + sqlString.append(metadata.getStr("filed_name")+" "+ toMysqlType(metadata.getStr("filed_type"))+")"); + logger.info("修改metadata表结构 sql语句为"+sqlString.toString()); + //执行添加字段 + int resu =Db.use(url).update(sqlString.toString()); + logger.info("修改表结构结果为 "+resu); + } + } + } + } + //判断check_type_info表 是否有新增数据 如果有执行存储过程 创建新表 + List checkTypeInfos = Db.use(url).find(" select * from check_type_info where crete_state=0 "); + for(Record checkTypeInfo : checkTypeInfos){ + //判断表是否存在 + Record isExist = Db.use(url).findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",checkTypeInfo.getStr("TABLE_NAME")); + logger.info("check_type_info表中有新增数据 判断表是否创建"+JSON.toJSONString(isExist)); + if(isExist.getInt("count")>0){ + continue; + }else{ + //创建表数据 + String filedName =""; + String filedType =""; + StringBuffer sql= new StringBuffer(); + StringBuffer cIndexFileds = new StringBuffer("data_check_time:seq_id:detection_set_info_id:"); + List metadatas2 = Db.use(url).find("select * from metadata where 1=1 and check_type_id=? and state = '0' order by show_num asc",checkTypeInfo.getLong("ID")); + if(metadatas2!=null && metadatas2.size()>0) { + for(int i=0;i syncDbInfos){ + Logger logger = Logger.getLogger(Conn.class); + logger.info("开始创建各分库数据库的连接池"); + for (SyncDbInfo syncDbInfo : syncDbInfos) { + String url="jdbc:mysql://"+syncDbInfo.get("ip")+":"+syncDbInfo.get("port")+"/"+syncDbInfo.get("database_name"); + logger.info("当前创建数据库连接信息为"+url); + //初始化各数据源插件 + DruidPlugin druid=new DruidPlugin(url,(String)syncDbInfo.get("user"),(String)syncDbInfo.get("password")); + druid.setInitialSize(Integer.valueOf(PropKit.get("dbInitialSize"))); + druid.setMaxActive(Integer.valueOf(PropKit.get("dbMaxActive"))); + druid.setMinIdle(Integer.valueOf(PropKit.get("dbMinIdle"))); + druid.setMaxWait(Integer.valueOf(PropKit.get("dbMaxWait"))); + ActiveRecordPlugin arp=new ActiveRecordPlugin(url,druid); + arp.setShowSql(true); + druid.start(); + arp.start(); + } + logger.info("创建各分库数据库的连接池完成"); + } +} diff --git a/nms_sync/src/com/nms/main/SyncData.java b/nms_sync/src/com/nms/main/SyncData.java new file mode 100644 index 0000000..d78727c --- /dev/null +++ b/nms_sync/src/com/nms/main/SyncData.java @@ -0,0 +1,66 @@ +package com.nms.main; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.jfinal.aop.Duang; +import com.jfinal.kit.PropKit; +import com.jfinal.plugin.activerecord.ActiveRecordPlugin; +import com.jfinal.plugin.activerecord.Db; +import com.jfinal.plugin.druid.DruidPlugin; +import com.nms.model.SyncDbInfo; +import com.nms.thread.SyncSlaveToMasterThread; +import com.nms.thread.SyncThread; +/** + * 数据同步主功能 相当于主动推送操作 + * @author Administrator + * + */ +public class SyncData{ + public static void main(String[] args) { + Logger logger = Logger.getLogger(SyncData.class); + logger.info("同步程序开始启动"); + //从配置文件获取数据库连接信息 + PropKit.use("db.properties"); + //创建主数据库数据源 + DruidPlugin masterDruid=new DruidPlugin(PropKit.get("dburl"),PropKit.get("dbusername"),PropKit.get("dbpassword")); + masterDruid.setInitialSize(Integer.valueOf(PropKit.get("dbInitialSize"))); + masterDruid.setMaxActive(Integer.valueOf(PropKit.get("dbMaxActive"))); + masterDruid.setMinIdle(Integer.valueOf(PropKit.get("dbMinIdle"))); + masterDruid.setMaxWait(Integer.valueOf(PropKit.get("dbMaxWait"))); + ActiveRecordPlugin masterArp=new ActiveRecordPlugin("masterDataSource",masterDruid); + masterArp.setShowSql(true); + masterDruid.start(); + masterArp.start(); + logger.info("加载配置文件 设置当前同步 masterDataSource 完成"); + List syncDbInfos = SyncDbInfo.dao.use("masterDataSource").find("select * from sync_db_info"); + logger.info("数据库获取其它分库 数据库连接信息"+JSON.toJSONString(syncDbInfos)); + if(syncDbInfos!=null&&syncDbInfos.size()>0){ + //创建其它数据源的连接 + Conn.createConn(syncDbInfos); + logger.info("分库数据库连接池创建完成"); + // 定时周期执行线程池 用于周期执行线程的运行 + ScheduledExecutorService scheduleService = Executors.newScheduledThreadPool(syncDbInfos.size()); + logger.info("创建线程池完毕 数量大小为"+syncDbInfos.size()); + // 使用scheduleWithFixedDleay在上一个线程任务执行完成后 5分钟执行下一次任务 + for(SyncDbInfo syncDbInfo : syncDbInfos){ + // 主库向分库同步数据 + SyncThread syncThread = Duang.duang(new SyncThread(syncDbInfo)); + logger.info("创建主库同步分库线程执行任务"); + scheduleService.scheduleWithFixedDelay(syncThread, 0, Integer.valueOf(PropKit.use("config.properties").get("syncMaterToSlaveTime")), TimeUnit.MILLISECONDS); + // 分库向主库同步数据 + logger.info("创建分库数据同步到主库线程执行任务"); + //scheduleService.scheduleWithFixedDelay(new SyncSlaveToMasterThread(syncDbInfo), 0, Integer.valueOf(PropKit.use("config.properties").get("syncSlaveToMaterTime")), TimeUnit.MILLISECONDS); + } + }else{ + logger.info("获取同步记录信息失败 请检查数据库数据信息"); + } + } +} diff --git a/nms_sync/src/com/nms/model/SyncDbInfo.java b/nms_sync/src/com/nms/model/SyncDbInfo.java new file mode 100644 index 0000000..dbd2ed0 --- /dev/null +++ b/nms_sync/src/com/nms/model/SyncDbInfo.java @@ -0,0 +1,11 @@ +package com.nms.model; + +import com.nms.model.basemodel.BaseSyncDbInfo; + +/** + * Generated by JFinal. + */ +@SuppressWarnings("serial") +public class SyncDbInfo extends BaseSyncDbInfo { + public static final SyncDbInfo dao = new SyncDbInfo().dao(); +} diff --git a/nms_sync/src/com/nms/model/TableEventLog.java b/nms_sync/src/com/nms/model/TableEventLog.java new file mode 100644 index 0000000..d7ee628 --- /dev/null +++ b/nms_sync/src/com/nms/model/TableEventLog.java @@ -0,0 +1,11 @@ +package com.nms.model; + +import com.nms.model.basemodel.BaseTableEventLog; + +/** + * Generated by JFinal. + */ +@SuppressWarnings("serial") +public class TableEventLog extends BaseTableEventLog { + public static final TableEventLog dao = new TableEventLog().dao(); +} diff --git a/nms_sync/src/com/nms/model/TableSyncInfo.java b/nms_sync/src/com/nms/model/TableSyncInfo.java new file mode 100644 index 0000000..0c7f9e2 --- /dev/null +++ b/nms_sync/src/com/nms/model/TableSyncInfo.java @@ -0,0 +1,11 @@ +package com.nms.model; + +import com.nms.model.basemodel.BaseTableSyncInfo; + +/** + * Generated by JFinal. + */ +@SuppressWarnings("serial") +public class TableSyncInfo extends BaseTableSyncInfo { + public static final TableSyncInfo dao = new TableSyncInfo().dao(); +} diff --git a/nms_sync/src/com/nms/model/basemodel/BaseSyncDbInfo.java b/nms_sync/src/com/nms/model/basemodel/BaseSyncDbInfo.java new file mode 100644 index 0000000..538137b --- /dev/null +++ b/nms_sync/src/com/nms/model/basemodel/BaseSyncDbInfo.java @@ -0,0 +1,60 @@ +package com.nms.model.basemodel; + +import com.jfinal.plugin.activerecord.Model; +import com.jfinal.plugin.activerecord.IBean; + +/** + * Generated by JFinal, do not modify this file. + */ +@SuppressWarnings("serial") +public abstract class BaseSyncDbInfo> extends Model implements IBean { + + public void setId(java.lang.Long id) { + set("id", id); + } + + public java.lang.Long getId() { + return getLong("id"); + } + + public void setIp(java.lang.String ip) { + set("ip", ip); + } + + public java.lang.String getIp() { + return getStr("ip"); + } + + public void setPort(java.lang.Integer port) { + set("port", port); + } + + public java.lang.Integer getPort() { + return getInt("port"); + } + + public void setDatabaseName(java.lang.String databaseName) { + set("database_name", databaseName); + } + + public java.lang.String getDatabaseName() { + return getStr("database_name"); + } + + public void setUser(java.lang.String user) { + set("user", user); + } + + public java.lang.String getUser() { + return getStr("user"); + } + + public void setPassword(java.lang.String password) { + set("password", password); + } + + public java.lang.String getPassword() { + return getStr("password"); + } + +} diff --git a/nms_sync/src/com/nms/model/basemodel/BaseTableEventLog.java b/nms_sync/src/com/nms/model/basemodel/BaseTableEventLog.java new file mode 100644 index 0000000..61f1f9d --- /dev/null +++ b/nms_sync/src/com/nms/model/basemodel/BaseTableEventLog.java @@ -0,0 +1,52 @@ +package com.nms.model.basemodel; + +import com.jfinal.plugin.activerecord.Model; +import com.jfinal.plugin.activerecord.IBean; + +/** + * Generated by JFinal, do not modify this file. + */ +@SuppressWarnings("serial") +public abstract class BaseTableEventLog> extends Model implements IBean { + + public void setId(java.lang.Long id) { + set("id", id); + } + + public java.lang.Long getId() { + return getLong("id"); + } + + public void setTable(java.lang.String table) { + set("table", table); + } + + public java.lang.String getTable() { + return getStr("table"); + } + + public void setEvent(java.lang.Integer event) { + set("event", event); + } + + public java.lang.Integer getEvent() { + return getInt("event"); + } + + public void setTargetId(java.lang.Long targetId) { + set("target_id", targetId); + } + + public java.lang.Long getTargetId() { + return getLong("target_id"); + } + + public void setIds(java.lang.String ids) { + set("ids", ids); + } + + public java.lang.String getIds() { + return getStr("ids"); + } + +} diff --git a/nms_sync/src/com/nms/model/basemodel/BaseTableSyncInfo.java b/nms_sync/src/com/nms/model/basemodel/BaseTableSyncInfo.java new file mode 100644 index 0000000..4557803 --- /dev/null +++ b/nms_sync/src/com/nms/model/basemodel/BaseTableSyncInfo.java @@ -0,0 +1,116 @@ +package com.nms.model.basemodel; + +import com.jfinal.plugin.activerecord.Model; +import com.jfinal.plugin.activerecord.IBean; + +/** + * Generated by JFinal, do not modify this file. + */ +@SuppressWarnings("serial") +public abstract class BaseTableSyncInfo> extends Model implements IBean { + + public void setId(java.lang.Long id) { + set("id", id); + } + + public java.lang.Long getId() { + return getLong("id"); + } + + public void setTableName(java.lang.String tableName) { + set("table_name", tableName); + } + + public java.lang.String getTableName() { + return getStr("table_name"); + } + + public void setEvent(java.lang.Integer event) { + set("event", event); + } + + public java.lang.Integer getEvent() { + return getInt("event"); + } + + public void setIdName(java.lang.String idName) { + set("id_name", idName); + } + + public java.lang.String getIdName() { + return getStr("id_name"); + } + + public void setIdNames(java.lang.String idNames) { + set("id_names", idNames); + } + + public java.lang.String getIdNames() { + return getStr("id_names"); + } + + public void setColumns(java.lang.String columns) { + set("columns", columns); + } + + public java.lang.String getColumns() { + return getStr("columns"); + } + + public void setWhere(java.lang.String where) { + set("where", where); + } + + public java.lang.String getWhere() { + return getStr("where"); + } + + public void setLastId(java.lang.Long lastId) { + set("last_Id", lastId); + } + + public java.lang.Long getLastId() { + return getLong("last_Id"); + } + + public void setLastDate(java.util.Date lastDate) { + set("last_date", lastDate); + } + + public java.util.Date getLastDate() { + return get("last_date"); + } + + public void setDbId(java.lang.Long dbId) { + set("db_id", dbId); + } + + public java.lang.Long getDbId() { + return getLong("db_id"); + } + + public void setMode(java.lang.Integer mode) { + set("mode", mode); + } + + public java.lang.Integer getMode() { + return getInt("mode"); + } + + public void setBatchSize(java.lang.Integer batchSize) { + set("batch_size", batchSize); + } + + public java.lang.Integer getBatchSize() { + return getInt("batch_size"); + } + + public void setInterceptor(java.lang.String interceptor) { + set("interceptor", interceptor); + } + + public java.lang.String getInterceptor() { + return getStr("interceptor"); + } + +} diff --git a/nms_sync/src/com/nms/test/TestClass.java b/nms_sync/src/com/nms/test/TestClass.java new file mode 100644 index 0000000..c9190ed --- /dev/null +++ b/nms_sync/src/com/nms/test/TestClass.java @@ -0,0 +1,304 @@ +package com.nms.test; + +import java.sql.CallableStatement; +import java.sql.Date; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import com.alibaba.fastjson.JSON; +import com.jfinal.kit.PropKit; +import com.jfinal.kit.StrKit; +import com.jfinal.plugin.activerecord.ActiveRecordPlugin; +import com.jfinal.plugin.activerecord.Db; +import com.jfinal.plugin.activerecord.ICallback; +import com.jfinal.plugin.activerecord.Record; +import com.jfinal.plugin.c3p0.C3p0Plugin; +import com.jfinal.plugin.druid.DruidPlugin; +import com.mysql.jdbc.Connection; +import com.mysql.jdbc.PreparedStatement; +import com.mysql.jdbc.Statement; +import com.nms.main.Conn; + +public class TestClass { + private static Connection getConnection(){ + String driver ="com.mysql.jdbc.Driver"; + String url="jdbc:mysql://localhost:3306/nms_sync"; + String username="root"; + String password="root"; + Connection conn=null; + try{ + Class.forName(driver); + conn = (Connection) DriverManager.getConnection(url, username, password); + }catch(Exception e){ + e.printStackTrace(); + } + return conn; + } + + private static Connection getNmsConnection(){ + String driver ="com.mysql.jdbc.Driver"; + String url="jdbc:mysql://10.0.6.247:3306/nms"; + String username="nms"; + String password="nms"; + Connection conn=null; + try{ + Class.forName(driver); + conn = (Connection) DriverManager.getConnection(url, username, password); + }catch(Exception e){ + e.printStackTrace(); + } + return conn; + } + + @Test + public void testA(){ + Connection conn = getConnection(); + System.out.println(conn); + PreparedStatement pstmt=null; + try { + pstmt=(PreparedStatement) conn.prepareStatement("insert into sync_db_info (ip,port,database_name) values (?,?,?)"); + pstmt.setString(1, "10.0.6.247"); + pstmt.setInt(2, 8080); + pstmt.setString(3,"nms"); + int id = pstmt.executeUpdate(); + System.out.println(id); + } catch (SQLException e) { + e.printStackTrace(); + }finally{ + try { + if(pstmt!=null){ + pstmt.close(); + } + } catch (SQLException e) { + e.printStackTrace(); + } + try { + conn.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + } + + @Test + public void importData(){ + Connection nmsConn = getNmsConnection(); + Connection conn = getConnection(); + PreparedStatement pstmt=null; + PreparedStatement pstmt2=null; + try { + pstmt=(PreparedStatement)nmsConn.prepareStatement("select table_name as tn from information_schema.tables where TABLE_SCHEMA='nms'"); + ResultSet resultSet = pstmt.executeQuery(); + pstmt2=(PreparedStatement)conn.prepareStatement("insert into table_sync_info (table_name,event,last_id,last_date,db_id,mode) values (?,?,?,?,?,?)"); + while(resultSet.next()){ + String tableName = resultSet.getString("tn"); + + pstmt2.setString(1, tableName); + pstmt2.setInt(2, 1); + pstmt2.setInt(3, 1); + pstmt2.setDate(4,new Date(System.currentTimeMillis())); + pstmt2.setInt(5, 1); + pstmt2.setInt(6,1); + pstmt2.addBatch(); + + pstmt2.setString(1, tableName); + pstmt2.setInt(2, 2); + pstmt2.setInt(3, 1); + pstmt2.setDate(4,new Date(System.currentTimeMillis())); + pstmt2.setInt(5, 1); + pstmt2.setInt(6,1); + pstmt2.addBatch(); + + pstmt2.setString(1, tableName); + pstmt2.setInt(2, 3); + pstmt2.setInt(3, 1); + pstmt2.setDate(4,new Date(System.currentTimeMillis())); + pstmt2.setInt(5, 1); + pstmt2.setInt(6,1); + pstmt2.addBatch(); + } + pstmt2.executeBatch(); + } catch (Exception e) { + e.printStackTrace(); + }finally{ + try { + if(pstmt!=null){ + pstmt.close(); + } + } catch (SQLException e) { + e.printStackTrace(); + } + try { + if(pstmt2!=null){ + pstmt2.close(); + } + } catch (SQLException e) { + e.printStackTrace(); + } + try { + conn.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + } + + + @Test + public void testDruid(){ + C3p0Plugin c3p0=new C3p0Plugin("jdbc:mysql://localhost:3306/nms_sync","root","root"); + c3p0.setInitialPoolSize(1); + c3p0.setMaxIdleTime(30); + c3p0.setMaxPoolSize(2); + c3p0.setMinPoolSize(1); + ActiveRecordPlugin arp=new ActiveRecordPlugin("c3p0",c3p0); + + c3p0.start(); + arp.start(); + + C3p0Plugin c3p02=new C3p0Plugin("jdbc:mysql://10.0.6.247:3306/nms","nms","nms"); + c3p02.setInitialPoolSize(1); + c3p02.setMaxIdleTime(30); + c3p02.setMaxPoolSize(2); + c3p02.setMinPoolSize(1); + ActiveRecordPlugin arp2=new ActiveRecordPlugin("c3p02",c3p02); + + c3p02.start(); + arp2.start(); + + DruidPlugin druid=new DruidPlugin("jdbc:mysql://10.0.6.247:3306/nms-dev","nms","nms"); + druid.setInitialSize(1); + druid.setMaxActive(2); + druid.setMinIdle(1); + druid.setMaxWait(60000); + ActiveRecordPlugin arp3=new ActiveRecordPlugin("druid",druid); + + druid.start(); + arp3.start(); + + List find = Db.find("select * from table_sync_info"); + for (Record record : find) { + System.out.println(record); + } + + System.out.println("----------------------------"); + + List find2 = Db.use("c3p02").find("select * from node_table"); + for (Record record : find2) { + System.out.println(record); + } + + + System.out.println("----------------------------"); + List find3 = Db.use("druid").find("select * from node_table"); + for (Record record : find3) { + System.out.println(record); + } + + } + + @Test + public void testGetDataSize(){ + DruidPlugin druid=new DruidPlugin("jdbc:mysql://10.0.6.247:3306/nms","nms","nms"); + druid.setInitialSize(1); + druid.setMaxActive(2); + druid.setMinIdle(1); + druid.setMaxWait(60000); + ActiveRecordPlugin arp=new ActiveRecordPlugin(druid); + druid.start(); + arp.start(); + List find = Db.find("select count(*) size from detect_info_cpu"); + System.out.println(find.get(0).getInt("size")); + } + + @Test + public void testBatchDelete(){ + PropKit.use("db.properties"); + DruidPlugin masterDruid=new DruidPlugin(PropKit.get("dburl"),PropKit.get("dbusername"),PropKit.get("dbpassword")); + masterDruid.setInitialSize(1); + masterDruid.setMaxActive(2); + masterDruid.setMinIdle(1); + masterDruid.setMaxWait(600000); + ActiveRecordPlugin masterArp=new ActiveRecordPlugin("masterDataSource",masterDruid); + masterArp.setShowSql(true); + masterDruid.start(); + masterArp.start(); + List ids=new ArrayList(); + ids.add(100026); + ids.add(100027); + List find = Db.find("select * from xt_yh_js_index where id in (?,?)",ids.toArray()); + System.out.println(JSON.toJSON(find)); + Object[] array = ids.toArray(); + System.out.println(array); + } + + @Test + public void testICallBack(){ + DruidPlugin druid=new DruidPlugin("jdbc:mysql://10.0.6.247:3306/nms-synctest2","nms","nms"); + druid.setInitialSize(1); + druid.setMaxActive(2); + druid.setMinIdle(1); + druid.setMaxWait(60000); + ActiveRecordPlugin arp3=new ActiveRecordPlugin("druid",druid); + + druid.start(); + arp3.start(); + Db.execute(new ICallback(){ + @Override + public Object call(java.sql.Connection conn) throws SQLException { + CallableStatement proc=null; + try{ + proc=conn.prepareCall("{call pro_createTable(?,?,?)}"); + proc.setString(1,"di_thtest"); + proc.setString(2,"age bigint,name varchar(11)"); + proc.setString(3, "data_check_time:seq_id:detection_set_info_id:"); + proc.execute(); + } catch (Exception e){ + e.printStackTrace(); + } finally{ + if(conn!=null){ + conn.close(); + } + if(proc!=null){ + proc.close(); + } + } + return null; + } + }); + } + + + @Test + public void addEventRecordLibraryData(){ + PropKit.use("db.properties"); + DruidPlugin masterDruid=new DruidPlugin(PropKit.get("dburl"),PropKit.get("dbusername"),PropKit.get("dbpassword")); + masterDruid.setInitialSize(1); + masterDruid.setMaxActive(2); + masterDruid.setMinIdle(1); + masterDruid.setMaxWait(600000); + ActiveRecordPlugin masterArp=new ActiveRecordPlugin("masterDataSource",masterDruid); + masterArp.setShowSql(true); + masterDruid.start(); + masterArp.start(); + + List find = Db.find("select * from event_record_library"); + List datas=new ArrayList(); + for(Record data:find){ + Record record=new Record(); + record.set("table_name", "event_record_library"); + record.set("event", 1); + record.set("target_id", data.getLong("id")); + datas.add(record); + } + Db.batchSave("table_event_log", datas, 500); + } +} diff --git a/nms_sync/src/com/nms/test/TestExecutors.java b/nms_sync/src/com/nms/test/TestExecutors.java new file mode 100644 index 0000000..30a7c0d --- /dev/null +++ b/nms_sync/src/com/nms/test/TestExecutors.java @@ -0,0 +1,32 @@ +package com.nms.test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.junit.Test; + +public class TestExecutors { + @Test + public void test1(){ + ExecutorService service = Executors.newFixedThreadPool(5); + for(int i=0;i<6;i++){ + service.execute(new TestThread(i)); + } + } + + + class TestThread implements Runnable{ + private int index; + public TestThread(int index){ + this.index=index; + } + + @Override + public void run(){ + if(index==3){ + throw new RuntimeException("error"); + } + System.out.println("test"+index); + } + } +} diff --git a/nms_sync/src/com/nms/test/TestThread.java b/nms_sync/src/com/nms/test/TestThread.java new file mode 100644 index 0000000..061bc72 --- /dev/null +++ b/nms_sync/src/com/nms/test/TestThread.java @@ -0,0 +1,37 @@ +package com.nms.test; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.alibaba.fastjson.JSON; +import com.jfinal.plugin.activerecord.ActiveRecordPlugin; +import com.jfinal.plugin.activerecord.Db; +import com.jfinal.plugin.activerecord.Record; +import com.jfinal.plugin.druid.DruidPlugin; + +public class TestThread implements Runnable{ + static{ + DruidPlugin druid=new DruidPlugin("jdbc:mysql://localhost:3306/nms_sync","nms","nms"); + druid.setInitialSize(1); + druid.setMaxActive(2); + druid.setMinIdle(1); + druid.setMaxWait(60000); + ActiveRecordPlugin arp=new ActiveRecordPlugin(druid); + druid.start(); + arp.start(); + } + @Override + public void run() { + System.out.println("进入线程任务"); + Record find = Db.findFirst(" select * from table_sync_info where id =1 "); + System.out.println(JSON.toJSON(find)); + System.out.println("线程任务结束"); + } + + public static void main(String[] args) { + ScheduledExecutorService service = Executors.newScheduledThreadPool(1); + service.scheduleWithFixedDelay(new TestThread(), 0, 5000, TimeUnit.MILLISECONDS); + } +} diff --git a/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java new file mode 100644 index 0000000..18f1b31 --- /dev/null +++ b/nms_sync/src/com/nms/thread/SyncSlaveToMasterThread.java @@ -0,0 +1,196 @@ +package com.nms.thread; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.log4j.Logger; + +import com.alibaba.fastjson.JSON; +import com.jfinal.aop.Before; +import com.jfinal.plugin.activerecord.Db; +import com.jfinal.plugin.activerecord.IAtom; +import com.jfinal.plugin.activerecord.Record; +import com.jfinal.plugin.activerecord.tx.Tx; +import com.nms.model.SyncDbInfo; + +@Before({Tx.class}) +public class SyncSlaveToMasterThread implements Runnable{ + private Logger logger = Logger.getLogger(this.getClass()); + private SyncDbInfo syncDbInfo; + + public SyncSlaveToMasterThread(SyncDbInfo syncDbInfo) { + this.syncDbInfo = syncDbInfo; + } + + @Override + public void run() { + try { + // 主库向分库同步数据 + logger.info("开始分库数据同步主库"); + // 获取url路径 + final String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" + + syncDbInfo.get("database_name"); + logger.info("当前分库数据库连接为"+url); + List find = Db.use(url).find("select * from table_sync_info"); + logger.info("查询分库需要同步到主库的数据信息"+JSON.toJSONString(find)); + if (find != null && find.size() > 0) { + for (final Record record : find) { + // 循环同步数据标识 + boolean flag = true; + // 判断表中的event事件 1代表insert 2代表update 3代表delete + if (record.getInt("event") == 1) { + if(record.getInt("mode").equals(1)){ + while (flag) { + // 新增操作 取出最后更新id信息 查询增量数据 + final List data = Db.use(url) + .find("select * from " + record.getStr("table_name") + " where " + + record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit " + record.getInt("batch_size"), + record.getInt("last_id")); + logger.info("分库同步到主库新增数据信息为"+JSON.toJSONString(data)); + if (data != null && data.size() > 0) { + Db.use(url).tx(new IAtom() { + @Override + public boolean run() throws SQLException { + return Db.use().tx(new IAtom() { + @Override + public boolean run() throws SQLException { + Db.use("masterDataSource").batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); + // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 + logger.info("主库同步增量更新数据完成"); + Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name")); + logger.info("主库同步最后数据的id信息为"+JSON.toJSONString(lastInsertId)); + record.set("last_id", lastInsertId); + record.set("last_date", new Date()); + Db.use(url).update("table_sync_info", record); + return true; + } + }); + } + }); + logger.info("主库同步增量更新数据完成 修改最后同步ID"); + } else { + flag = false; + } + } + }else if(record.getInt("mode").equals(0)){ + //当数据库表结构主键不是自增时 增量更新的操作步骤 + while (flag) { + // 新增操作 取出最后更新id信息 查询增量数据 + final List data =Db.use(url) + .find("select * from table_event_log where table_name = '" + record.getStr("table_name") + + "' and id > " + record.getLong("last_id") + " and event = " + + record.getInt("event") + " order by id asc limit "+record.getInt("batch_size")); + //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data)); + if (data != null && data.size() > 0) { + //多数据源事务 主数据源嵌套子数据源 + Db.use(url).tx(new IAtom() { + @Override + public boolean run() throws SQLException { + return Db.use().tx(new IAtom() { + @Override + public boolean run() throws SQLException { + List insertIds = new ArrayList(); + StringBuffer insertStr = new StringBuffer(); + for (int i = 0; i < data.size(); i++) { + insertIds.add(data.get(i).getInt("target_id")); + if (i == 0) { + insertStr.append("?"); + } else { + insertStr.append(",?"); + } + } + List insertDatas = Db.use(url) + .find(" select * from " + record.getStr("table_name") + " where " + + record.getStr("id_name") + " in (" + insertStr + ") ", + insertIds.toArray()); + for(Record insertData:insertDatas){ + Record seqData = Db.use("masterDataSource").findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); + insertData.set(record.getStr("id_name"), seqData.getLong("seqId")); + } + Db.use("masterDataSource").batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); + // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 + Object lastInsertId = data.get(data.size() - 1).get("id"); + logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); + record.set("last_id", lastInsertId); + record.set("last_date", new Date()); + Db.use(url).update("table_sync_info", record); + return true; + } + }); + } + }); + logger.info("增量更新数据任务结束"); + } else { + flag = false; + } + } + } + } else if (record.getInt("event") == 2 || record.getInt("event") == 3) { + // table_event_log sync_db_info两表查询获取修改数据信息 进行数据修改 + while (flag) { + final List datas = Db.find( + " select * from table_event_log where table_name = '" + record.getStr("table_name") + + "' and id > " + record.getInt("last_id") + " and event = " + + record.getInt("event") + " order by id asc limit " + record.getInt("batch_size")); + logger.info("分库同步到主库数据的修改或者删除数据信息为"+JSON.toJSONString(datas)); + if (datas != null && datas.size() > 0) { + Db.use(url).tx(new IAtom() { + @Override + public boolean run() throws SQLException { + return Db.use().tx(new IAtom() { + @Override + public boolean run() throws SQLException { + List updateIds = new ArrayList(); + StringBuffer deleteStr = new StringBuffer(); + for (int i = 0; i < datas.size(); i++) { + updateIds.add(datas.get(i).getInt("target_id")); + if (i == 0) { + deleteStr.append("?"); + } else { + deleteStr.append(",?"); + } + } + logger.info("分库同步到主库数据的操作数据的ID信息"+JSON.toJSONString(updateIds)); + if (record.getInt("event") == 2) { + List updateDatas = Db.use(url) + .find(" select * from " + record.getStr("table_name") + " where " + + record.getStr("id_name") + " in (" + deleteStr + ") ", + updateIds.toArray()); + logger.info("分库修改的数据信息为"+JSON.toJSONString(updateDatas)); + if (updateDatas != null && updateDatas.size() > 0) { + Db.use("masterDataSource").batchUpdate(record.getStr("table_name"), record.getStr("id_name"), + updateDatas, record.getInt("batch_size")); + } + logger.info("分库同步主库修改数据任务完成"); + } else if (record.getInt("event") == 3) { + Db.use("masterDataSource").update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in (" + + deleteStr + ") ", updateIds.toArray()); + logger.info("分库同步主库删除数据任务完成"); + } + Object lastUpdateId = datas.get(datas.size() - 1).get("id"); + logger.info("获取最后一次操作数据的数据ID信息为"+lastUpdateId); + record.set("last_id", lastUpdateId); + record.set("last_date", new Date()); + Db.use(url).update("table_sync_info", record); + return true; + } + }); + } + }); + logger.info("修改分库table_sync_info最后操作数据信息 用于下次同步操作完成"); + } else { + flag = false; + } + } + } + } + } + logger.info("分库数据同步主库结束"); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/nms_sync/src/com/nms/thread/SyncThread.java b/nms_sync/src/com/nms/thread/SyncThread.java new file mode 100644 index 0000000..db99c15 --- /dev/null +++ b/nms_sync/src/com/nms/thread/SyncThread.java @@ -0,0 +1,216 @@ +package com.nms.thread; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import org.apache.log4j.Logger; +import com.alibaba.fastjson.JSON; +import com.jfinal.aop.Before; +import com.jfinal.plugin.activerecord.Db; +import com.jfinal.plugin.activerecord.IAtom; +import com.jfinal.plugin.activerecord.Record; +import com.nms.interceptor.SyncDataInterceptor; +import com.nms.model.SyncDbInfo; +import com.jfinal.plugin.activerecord.tx.Tx; +/** + * 数据同步功能线程 + * + * @author Administrator + * + */ +@Before({/*SyncDataInterceptor.class,*/Tx.class}) +public class SyncThread implements Runnable { + private Logger logger = Logger.getLogger(this.getClass()); + private SyncDbInfo syncDbInfo; + + public SyncThread() { + super(); + } + + public SyncThread(SyncDbInfo syncDbInfo) { + super(); + this.syncDbInfo = syncDbInfo; + } + + @Override + public void run() { + try { + logger.info("开始主库数据同步分库任务"); + // 获取url路径 + final String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/" + + syncDbInfo.get("database_name"); + logger.info("获取分库数据库连接信息"+url); + List find = Db.use("masterDataSource").find("select * from table_sync_info where db_id=? ", + syncDbInfo.get("id")); + logger.info("查询主库须向分库同步数据信息"+JSON.toJSONString(find)); + if (find != null && find.size() > 0) { + for (final Record record : find) { + // 循环同步数据标识 + boolean flag = true; + // 判断表中的event事件 1代表insert 2代表update 3代表delete + if (record.getInt("event") == 1) { + //根据mode判断主键产生方式 + if(record.getInt("mode").equals(1)){ + while (flag) { + // 查询增量数据 + final List data =Db.use("masterDataSource") + .find("select * from " + record.getStr("table_name") + " where " + + record.getStr("id_name") + " > ? order by "+record.getStr("id_name")+" asc limit "+record.getInt("batch_size"), + record.getLong("last_id")); + //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data)); + if (data != null && data.size() > 0) { + //多数据源事务 主数据源嵌套子数据源 + Db.use().tx(new IAtom() { + @Override + public boolean run() throws SQLException { + return Db.use(url).tx(new IAtom() { + @Override + public boolean run() throws SQLException { + Db.use(url).batchSave(record.getStr("table_name"), data, record.getInt("batch_size")); + // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 + Object lastInsertId = data.get(data.size() - 1).get(record.getStr("id_name")); + logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); + record.set("last_id", lastInsertId); + record.set("last_date", new Date()); + Db.use("masterDataSource").update("table_sync_info", record); + return true; + } + }); + } + }); + logger.info("增量更新数据任务结束"); + } else { + flag = false; + } + } + }else if(record.getInt("mode").equals(0)){ + //当数据库表结构主键不是自增时 增量更新的操作步骤 + while (flag) { + // 新增操作 取出最后更新id信息 查询增量数据 + final List data =Db.use("masterDataSource") + .find("select * from table_event_log where table_name = '" + record.getStr("table_name") + + "' and id > " + record.getLong("last_id") + " and event = " + + record.getInt("event") + " order by id asc limit "+record.getInt("batch_size")); + //logger.info("主库同步分库新增数据增量更新的数据信息"+JSON.toJSONString(data)); + if (data != null && data.size() > 0) { + //多数据源事务 主数据源嵌套子数据源 + Db.use().tx(new IAtom() { + @Override + public boolean run() throws SQLException { + return Db.use(url).tx(new IAtom() { + @Override + public boolean run() throws SQLException { + List insertIds = new ArrayList(); + StringBuffer insertStr = new StringBuffer(); + for (int i = 0; i < data.size(); i++) { + insertIds.add(data.get(i).getInt("target_id")); + if (i == 0) { + insertStr.append("?"); + } else { + insertStr.append(",?"); + } + } + List insertDatas = Db.use("masterDataSource") + .find(" select * from " + record.getStr("table_name") + " where " + + record.getStr("id_name") + " in (" + insertStr + ") ", + insertIds.toArray()); + for(Record insertData:insertDatas){ + Record seqData = Db.use(url).findFirst("select nextval('seq_"+record.getStr("table_name")+"') seqId from dual"); + insertData.set(record.getStr("id_name"), seqData.getLong("seqId")); + } + Db.use(url).batchSave(record.getStr("table_name"), insertDatas, record.getInt("batch_size")); + // 同步完成后 取出最后一条数据的id 更新到table_sync_info表中 用作下次使用 + Object lastInsertId = data.get(data.size() - 1).get("id"); + logger.info("增量更新结束 获取最后一条更新数据的id信息"+JSON.toJSONString(lastInsertId)); + record.set("last_id", lastInsertId); + record.set("last_date", new Date()); + Db.use("masterDataSource").update("table_sync_info", record); + return true; + } + }); + } + }); + logger.info("增量更新数据任务结束"); + } else { + flag = false; + } + } + } + } else if (record.getInt("event") == 2 || record.getInt("event") == 3) { + // table_event_log sync_db_info两表查询获取修改数据信息 进行数据修改 + while (flag) { + final List datas = Db.find( + " select * from table_event_log where table_name = '" + record.getStr("table_name") + + "' and id > " + record.getLong("last_id") + " and event = " + + record.getInt("event") + " order by id asc limit "+record.getInt("batch_size")); + //logger.info("获取主库删除或者修改数据的数据信息"+JSON.toJSONString(datas)); + if (datas != null && datas.size() > 0) { + //多数据源事务 主数据源嵌套子数据源 + Db.use().tx(new IAtom() { + @Override + public boolean run() throws SQLException { + return Db.use(url).tx(new IAtom() { + @Override + public boolean run() throws SQLException { + List updateIds = new ArrayList(); + StringBuffer deleteStr = new StringBuffer(); + for (int i = 0; i < datas.size(); i++) { + updateIds.add(datas.get(i).getInt("target_id")); + if (i == 0) { + deleteStr.append("?"); + } else { + deleteStr.append(",?"); + } + } + logger.info("获取所有操作的数据id信息为"+JSON.toJSONString(updateIds)); + if (record.getInt("event") == 2) { + List updateDatas = Db.use("masterDataSource") + .find(" select * from " + record.getStr("table_name") + " where " + + record.getStr("id_name") + " in (" + deleteStr + ") ", + updateIds.toArray()); + //logger.info("获取所有修改数据的数据信息为"+JSON.toJSONString(updateDatas)); + if (updateDatas != null && updateDatas.size() > 0) { + Db.use(url).batchUpdate(record.getStr("table_name"), record.getStr("id_name"), + updateDatas, record.getInt("batch_size")); + } + logger.info("分库对主库修改操作的数据同步任务完成"); + } else if (record.getInt("event") == 3) { + Db.use(url).update("delete from " + record.getStr("table_name") + " where "+record.getStr("id_name")+" in (" + + deleteStr + ") ", updateIds.toArray()); + logger.info("分库对主库删除操作的数据同步完成"); + } + Object lastUpdateId = datas.get(datas.size() - 1).get("id"); + logger.info("获取最后一次修改或者删除操作的数据ID信息"+JSON.toJSONString(lastUpdateId)); + record.set("last_id", lastUpdateId); + record.set("last_date", new Date()); + Db.use("masterDataSource").update("table_sync_info", record); + logger.info("修改table_sync_info记录结果 用于下次同步完成"); + return true; + } + }); + } + }); + } else { + flag = false; + } + } + } + } + } + logger.info("主库数据同步分库结束"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public SyncDbInfo getSyncDbInfo() { + return syncDbInfo; + } + + public void setSyncDbInfo(SyncDbInfo syncDbInfo) { + this.syncDbInfo = syncDbInfo; + } + + +} \ No newline at end of file diff --git a/nms_sync/src/com/nms/util/GeneratorUtil.java b/nms_sync/src/com/nms/util/GeneratorUtil.java new file mode 100644 index 0000000..311f213 --- /dev/null +++ b/nms_sync/src/com/nms/util/GeneratorUtil.java @@ -0,0 +1,62 @@ +package com.nms.util; + +import javax.sql.DataSource; + +import com.jfinal.kit.PathKit; +import com.jfinal.plugin.activerecord.generator.Generator; +import com.jfinal.plugin.druid.DruidPlugin; + +/** + * 用于动态生成model以及basemodel生成器 + * 可以在数据库有修改时 同时更新model信息 + * @author Administrator + * + */ +public class GeneratorUtil { + public static void main(String args[]) { + // base model 所使用报名 + String baseModelPackageName="com.nms.model.basemodel"; + // base model 文件保存路径 + String baseModelOutputDir=PathKit.getRootClassPath() + "/../src/com/nms/model/basemodel"; + // model 所使用的报名 + String modelPackageName = "com.nms.model"; + // model 文件保存路径 + String modelOutputDir = baseModelOutputDir+"/.."; + // 创建生成器 + //DruidPlugin druid = new DruidPlugin("jdbc:mysql://localhost:3306/nms_sync","root","root"); + DruidPlugin druid = new DruidPlugin("jdbc:mysql://10.0.6.247:3306/nms","nms","nms"); + druid.start(); + Generator generator = new Generator(druid.getDataSource(),baseModelPackageName,baseModelOutputDir,modelPackageName,modelOutputDir); + + // 设置是否生成链式setter方法 + generator.setGenerateChainSetter(false); + + // 设置不需要生成的表名 + generator.addExcludedTable("di_systeminfo_disk"); + generator.addExcludedTable("di_systeminfo_net"); + generator.addExcludedTable("node_ip_table"); + generator.addExcludedTable("node_lattice_record"); + generator.addExcludedTable("nodegroup_arrow_position"); + generator.addExcludedTable("pro_deltabspace_temp"); + generator.addExcludedTable("set_det_data_con"); + generator.addExcludedTable("sys_data_dictionary_item"); + generator.addExcludedTable("sys_data_dictionary_name"); + /* generator.addExcludedTable("v_detection_set_info"); + generator.addExcludedTable("v_mission_node_group_1"); + generator.addExcludedTable("v_mission_node_group_4"); + generator.addExcludedTable("v_mission_node_group_6"); + generator.addExcludedTable("v_node_table"); +*/ + // 设置是否在Model中生成dao对象 + generator.setGenerateDaoInModel(true); + + // 设置是否生成字典文件 + generator.setGenerateDataDictionary(false); + + // 设置需要被移除的表明前缀用于生成modelName + generator.setRemovedTableNamePrefixes(""); + + // 开始生成 + generator.generate(); + } +}