This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
nms-nmssync/nms_sync/src/com/nms/interceptor/SyncDataInterceptor.java
2018-10-23 19:34:15 +08:00

130 lines
6.3 KiB
Java

package com.nms.interceptor;
import java.util.Date;
import java.util.List;
import org.apache.log4j.Logger;
import com.alibaba.fastjson.JSON;
import com.jfinal.aop.Interceptor;
import com.jfinal.aop.Invocation;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
import com.nms.model.SyncDbInfo;
import com.nms.thread.SyncThread;
public class SyncDataInterceptor implements Interceptor{
private Logger logger =Logger.getLogger(this.getClass());
@Override
public void intercept(Invocation inv) {
try{
logger.info("--------数据同步前 syncDataInterceptor拦截器拦截------------");
SyncThread target = inv.getTarget();
SyncDbInfo syncDbInfo = target.getSyncDbInfo();
String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
+ syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true";
logger.info("当前数据库连接为 "+url);
//同步数据前 取出metadata表中最后一次同步的id信息 获取新增的数据信息 方便接下来修改表结构
Record metadataTableSyncInfo = Db.use("masterDataSource").findFirst("select * from table_sync_info where table_name='metadata' and event=1 and db_id=?",syncDbInfo.getId());
logger.info("获取metadata表中最后一次同步id的数据信息为 "+JSON.toJSONString(metadataTableSyncInfo));
//开始执行同步过程
inv.invoke();
//处理同步数据结束
//判断metadata表 是否有新增数据 如果有执行sql 修改表结构
if(metadataTableSyncInfo!=null){
List<Record> metadatas = Db.use(url).find("select m.*,cti.table_name from metadata m left join check_type_info cti on m.check_type_id=cti.id where m.id > ? ",metadataTableSyncInfo.getLong("last_id"));
//logger.info("metadata表中新增数据信息查询结果为 "+JSON.toJSONString(metadatas));
if(metadatas!=null && metadatas.size()>0){
for(Record metadata:metadatas){
Record isExist = Db.use(url).findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",metadata.getStr("table_name"));
logger.info("判断metadata表新增数据是修改还是新增表操作结果为"+JSON.toJSONString(isExist));
//向数据库中添加新的字段
if(isExist.getInt("count")>0){
StringBuffer sqlString = new StringBuffer("alter table ");
sqlString.append(metadata.getStr("table_name").toUpperCase());
sqlString.append(" add(");
sqlString.append(metadata.getStr("filed_name")+" "+ toMysqlType(metadata.getStr("filed_type"))+")");
logger.info("修改metadata表结构 sql语句为"+sqlString.toString());
//执行添加字段
int resu =Db.use(url).update(sqlString.toString());
logger.info("修改表结构结果为 "+resu);
}
}
}
}
//判断check_type_info表 是否有新增数据 如果有执行存储过程 创建新表
List<Record> checkTypeInfos = Db.use(url).find(" select * from check_type_info where crete_state=0 ");
for(Record checkTypeInfo : checkTypeInfos){
//判断表是否存在
Record isExist = Db.use(url).findFirst("select COUNT(1) count from information_schema.`TABLES` WHERE TABLE_SCHEMA = '"+syncDbInfo.getStr("database_name")+"' and UPPER(TABLE_NAME) = ?",checkTypeInfo.getStr("TABLE_NAME"));
logger.info("check_type_info表中有新增数据 判断表是否创建"+JSON.toJSONString(isExist));
if(isExist.getInt("count")>0){
continue;
}else{
//创建表数据
String filedName ="";
String filedType ="";
StringBuffer sql= new StringBuffer();
StringBuffer cIndexFileds = new StringBuffer("data_check_time:seq_id:detection_set_info_id:");
List<Record> metadatas2 = Db.use(url).find("select * from metadata where 1=1 and check_type_id=? and state = '0' order by show_num asc",checkTypeInfo.getLong("ID"));
if(metadatas2!=null && metadatas2.size()>0) {
for(int i=0;i<metadatas2.size();i++){
filedName = metadatas2.get(i).getStr("filed_name");
sql.append(filedName+" ");
filedType = metadatas2.get(i).getStr("filed_type");
if(i != metadatas2.size()-1){
sql.append(toMysqlType(filedType)+",");
}else{
sql.append(toMysqlType(filedType));
}
//判断是否为统计
if(metadatas2.get(i).getStr("chart_state").equals("0")){
cIndexFileds.append(metadatas2.get(i).getStr("chart_state")+":");
}
}
}
logger.info("check_type_info新增数据创建表结构 参数信息 调用存储过程名称 pro_createTable");
logger.info("check_type_info新增数据创建表结构 参数信息 表名"+checkTypeInfo.getStr("TABLE_NAME"));
logger.info("check_type_info新增数据创建表结构 参数信息 sql"+sql.toString());
logger.info("check_type_info新增数据创建表结构 参数信息 字段名称"+cIndexFileds.toString());
SyncStoredProcedure syncStoreProcedure=new SyncStoredProcedure("pro_createTable",checkTypeInfo.getStr("TABLE_NAME"), sql.toString(),cIndexFileds.toString());
Db.use(url).execute(syncStoreProcedure);
logger.info("创建新表操作完成");
//创建完新表结构以后 将同步信息添加到同步表 使新配置的监测数据同步到中心
Record record = new Record();
record.set("table_name", checkTypeInfo.getStr("TABLE_NAME"));
record.set("mode", 2);
record.set("id_name", "ID");
record.set("event",1);
record.set("last_id", -1);
record.set("db_id", -1);
record.set("last_date", new Date());
Db.use(url).save("table_sync_info", record);
}
}
logger.info("--------数据同步前 syncDataInterceptor拦截器拦截结束------------");
}catch(Exception e){
e.printStackTrace();
logger.error("syncDataInterceptor拦截器内部程序出现异常信息"+e.getMessage());
}
}
private static String toMysqlType(String type){
type = type.trim().toLowerCase();
if(type.startsWith("date")){
type = type.replaceAll("date", "datetime");
}else if(type.startsWith("number")){
type = type.replaceAll("number", "bigint");
}else if(type.startsWith("varchar")){
type = type.replaceAll("varchar2", "varchar");
}
return type;
}
}