This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
nms-nmssync/nms_sync/src/com/nms/job/DetectionSetInfoInterceptor.java
fangshunjian c88b984af2 1、重构代码,改用quartz做线程调度,实现HA
2、多个库的 向同一个库的表 同步数据 在同一个线程执行,保证不会造成死锁的情况,同时不会因为一个表同步事件过长而影响同一个库的其它表同步
3、部分特殊表通过 拦截器的方式 实现同步,如detection_set_info,meta_data。拦截器信息已配置到
table_sync_info 表中
4、重新整理nms 分库与主库 初始化语句及 同步配置表,初始化时
node_table.sync_status,event_record_library.db_id 的默认值需要修改为所在库的
id,id保存在sync_db_info 中,确保所有同步库 sync_db_info 表信息一致
2019-01-12 22:21:30 +06:00

239 lines
10 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.nms.job;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
import com.alibaba.fastjson.JSONObject;
import com.jfinal.kit.PropKit;
import com.jfinal.kit.StrKit;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.DbPro;
import com.jfinal.plugin.activerecord.IAtom;
import com.jfinal.plugin.activerecord.Record;
import com.nms.job.SyncDataJob;
import com.nms.model.SetInfo;
import com.nms.model.SyncDbInfo;
import com.nms.model.TableSyncInfo;
import com.nms.socket.SocketClientServeice;
import com.nms.util.SyncCommon;
/**
 * Special-case synchronization for the {@code detection_set_info} table.
 * <p>
 * Handles insert and update events:
 * <ol>
 * <li>rows are copied from the master ("from") database to the sub ("to") database;</li>
 * <li>after each batch commits, the DC is notified over a socket of every changed setting.</li>
 * </ol>
 *
 * @author fang
 */
@SuppressWarnings("all")
public class DetectionSetInfoInterceptor implements Interceptor {
    private static final Logger logger = Logger.getLogger(DetectionSetInfoInterceptor.class);

    /** Command name of the "monitoring setting changed" notification request. */
    private static final String WEB_NOTICE_SET_INFO_ALERT = "char:setInfoAlert";

    /** Event code that selects the insert synchronization path. */
    private static final int EVENT_INSERT = 1;

    /** Event code that selects the update synchronization path. */
    private static final int EVENT_UPDATE = 2;

    /** Column carrying the event-log id; stripped from every row before it is written to the target. */
    private static final String EVENT_LOG_ID_COLUMN = "tableeventlogid";

    /** Query (up to the opening parenthesis of the IN list) that loads settings joined with their check type. */
    private static final String SET_INFO_BY_IDS_SQL_PREFIX = "select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id in (";

    private TableSyncInfo tableInfo;
    private SyncDbInfo from;
    private SyncDbInfo to;

    /**
     * Stores the synchronization context on this instance.
     *
     * @param tableInfo synchronization configuration of the table
     * @param from      source database
     * @param to        target database
     */
    public void init(final TableSyncInfo tableInfo, final SyncDbInfo from, final SyncDbInfo to) {
        this.tableInfo = tableInfo;
        this.from = from;
        this.to = to;
    }

    /**
     * Synchronizes pending rows from {@code from} to {@code to} according to the event
     * configured on {@code tableInfo} (1 = insert, 2 = update). Any other event, including
     * {@code null}, is logged as unknown and ignored.
     *
     * @param tableInfo synchronization configuration of the table
     * @param from      source database
     * @param to        target database
     */
    @Override
    public void exce(final TableSyncInfo tableInfo, final SyncDbInfo from, final SyncDbInfo to) {
        init(tableInfo, from, to);
        final Integer event = tableInfo.getEvent();
        if (event != null && event == EVENT_INSERT) {
            // Mode 2 pages by the table's own id column; every other mode pages by the event-log id.
            final boolean modeTwo = tableInfo.getMode() == 2;
            final StringBuilder selectSql = (modeTwo ? SyncCommon.insertModeTwoSql(tableInfo) : SyncCommon.getSelectSqlLeftJoinEventLog(tableInfo));
            final String lastIdColumn = modeTwo ? tableInfo.getIdName() : EVENT_LOG_ID_COLUMN;
            syncBatches(tableInfo, from, to, selectSql.toString(), lastIdColumn, null);
        } else if (event != null && event == EVENT_UPDATE) {
            final String idName = tableInfo.getIdName();
            if (StrKit.notBlank(idName)) {
                // Single primary key: changed rows are found through the event-log join.
                // Update keys may be comma separated; they default to the id column when not configured.
                final String updateIds = StrKit.notBlank(tableInfo.getUpdateIds()) ? tableInfo.getUpdateIds() : idName;
                final String selectSql = SyncCommon.getSelectSqlLeftJoinEventLog(tableInfo).toString();
                syncBatches(tableInfo, from, to, selectSql, EVENT_LOG_ID_COLUMN, updateIds);
            }
            // Updates without a single id column would need a per-row lookup; not implemented.
        } else {
            logger.warn(String.format("unknown event %s", event));
        }
    }

    /**
     * Repeatedly fetches a batch from the source, writes it to the target inside nested
     * transactions on both databases, and notifies the DC after each committed batch.
     * Stops when the source query returns no rows.
     *
     * @param selectSql    paging query; receives the last synchronized id and the batch size as parameters
     * @param lastIdColumn column of the last fetched row that supplies the next paging id
     * @param updateIds    key column(s) for {@code batchUpdate}; {@code null} selects {@code batchSave} (insert)
     */
    private void syncBatches(final TableSyncInfo tableInfo, final SyncDbInfo from, final SyncDbInfo to,
            final String selectSql, final String lastIdColumn, final String updateIds) {
        final String idName = tableInfo.getIdName();
        final String tableName = tableInfo.getTableName();
        final String excludeNames = tableInfo.getExcludeNames();
        final String[] excludeNameArr = StrKit.notBlank(excludeNames) ? excludeNames.split(",") : null;
        final DbPro fromDbPro = Db.use(from.getDbKey());
        final DbPro toDbPro = Db.use(to.getDbKey());
        final Integer batchSize = tableInfo.getBatchSize();
        Long lastId = tableInfo.getLastId();
        while (true) {
            final List<Record> batch = fromDbPro.find(selectSql, lastId, batchSize);
            if (batch == null || batch.isEmpty()) {
                return; // nothing left to synchronize
            }
            // Read the paging id before the bookkeeping/excluded columns are stripped below.
            lastId = batch.get(batch.size() - 1).getLong(lastIdColumn);
            tableInfo.setLastId(lastId);
            final Set<Long> ids = stripAndCollectIds(batch, idName, excludeNameArr);
            // For updates, capture the target rows as they were before the batch is applied.
            final List<Record> before = (updateIds == null) ? null : findSetInfoByIds(toDbPro, ids);
            final boolean committed = fromDbPro.tx(new IAtom() {
                @Override
                public boolean run() throws SQLException {
                    return toDbPro.tx(new IAtom() {
                        @Override
                        public boolean run() throws SQLException {
                            if (updateIds == null) {
                                toDbPro.batchSave(tableName, batch, batchSize);
                            } else {
                                toDbPro.batchUpdate(tableName, updateIds, batch, batchSize);
                            }
                            tableInfo.setLastDate(new Date());
                            fromDbPro.update("table_sync_info", tableInfo.toRecord());
                            return true;
                        }
                    });
                }
            });
            if (committed) {
                noticDcSetInfo(findSetInfoByIds(toDbPro, ids), before, tableInfo, from, to);
            }
        }
    }

    /**
     * Collects the id of every row and removes the columns that must not be written to the
     * target: the event-log id and any configured excluded columns.
     *
     * @return the distinct ids of the rows in {@code batch}
     */
    private static Set<Long> stripAndCollectIds(List<Record> batch, String idName, String[] excludeNameArr) {
        final Set<Long> ids = new HashSet<Long>();
        for (Record row : batch) {
            ids.add(row.getLong(idName)); // read the id before any column is removed
            row.remove(EVENT_LOG_ID_COLUMN);
            if (excludeNameArr != null) {
                for (String exclude : excludeNameArr) {
                    row.remove(exclude);
                }
            }
        }
        return ids;
    }

    /**
     * Loads the settings with the given ids, joined with their check type.
     * The number of placeholders is derived from the id set itself so it always matches
     * the number of bound parameters, even when a batch contained the same id twice.
     */
    private static List<Record> findSetInfoByIds(DbPro db, Set<Long> ids) {
        final StringBuilder sql = new StringBuilder(SET_INFO_BY_IDS_SQL_PREFIX);
        for (int i = 0; i < ids.size(); i++) {
            if (i > 0) {
                sql.append(',');
            }
            sql.append('?');
        }
        sql.append(')');
        return db.find(sql.toString(), ids.toArray());
    }

    /**
     * Notifies the DC of every setting in {@code newSetInfo}, each on its own thread.
     * Every message carries the new state and, when a row with the same id is present in
     * {@code oldSetInfo}, the previous state. A notice that cannot be delivered is stored
     * in {@code event_record_library} of the slave database.
     *
     * @param newSetInfo settings after synchronization; nothing is done when null or empty
     * @param oldSetInfo settings before synchronization; may be null (e.g. for inserts)
     * @param tableInfo  not used by this method
     * @param masterDb   not used by this method
     * @param slaveDb    database whose IP selects the DC address and which stores failed notices
     */
    public void noticDcSetInfo(List<Record> newSetInfo, List<Record> oldSetInfo, TableSyncInfo tableInfo, SyncDbInfo masterDb, final SyncDbInfo slaveDb) {
        if (newSetInfo == null || newSetInfo.isEmpty()) {
            return;
        }
        final Map<Long, Record> oldById = new HashMap<Long, Record>();
        if (oldSetInfo != null) {
            for (Record previous : oldSetInfo) {
                oldById.put(previous.getLong("ID"), previous);
            }
        }
        // Look up the IP of the DC attached to this sub database.
        final String serverIp = PropKit.use("socket.properties").get("db" + slaveDb.getIp());
        logger.info("获取socket连接ip信息为" + serverIp);
        // Resolve the server entry for that IP; it may be absent, so the id is read null-safely.
        final Record serverTableInfo = Db.findFirst("select * from server_table where server_ip = ? and server_state=0 ", serverIp);
        Object serverId = null;
        if (serverTableInfo != null) {
            serverId = serverTableInfo.get("id");
        }
        final Object nmsServerId = serverId;
        for (Record current : newSetInfo) {
            final Record previous = oldById.get(current.getLong("ID"));
            final JSONObject message = new JSONObject();
            message.put("old", previous == null ? null : toSetInfo(previous));
            message.put("new", toSetInfo(current));
            sendAsync(serverIp, message, nmsServerId, slaveDb);
        }
    }

    /** Maps a joined {@code detection_set_info} row to the {@link SetInfo} sent to the DC. */
    private static SetInfo toSetInfo(Record row) {
        final SetInfo info = new SetInfo();
        info.setId(row.getLong("ID"));
        info.setCheckTypeId(row.getLong("CHECK_TYPE_INFO_ID"));
        // Key spelled exactly like the SQL alias so it also resolves with a case-sensitive Record.
        info.setCheckTypeName(row.getStr("CHECK_TYPE_INFO_NAME"));
        info.setProcessIden(row.getStr("PROCESS_IDEN"));
        info.setNodeGroupsId(row.getStr("NODE_GROUPS_ID"));
        info.setNodeIpsId(row.getStr("NODE_IPS_ID"));
        info.setCheckWay(row.getStr("CHECK_WAY"));
        return info;
    }

    /** Sends one notice on a new thread; a failed send is logged and then recorded for later handling. */
    private void sendAsync(final String serverIp, final JSONObject message, final Object nmsServerId, final SyncDbInfo slaveDb) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                final String payload = message.toString();
                try {
                    new SocketClientServeice(serverIp).sendInfoToServer(WEB_NOTICE_SET_INFO_ALERT, payload);
                    logger.info(String.format("监测设置变更通知成功data: %s", payload));
                } catch (Exception e) {
                    // Log first so the send failure is visible even if recording it fails too.
                    logger.warn("Monitoring setting to change communication anomalies,", e);
                    recordFailedNotice(payload, nmsServerId, slaveDb);
                }
            }
        }).start();
    }

    /** Stores an undelivered notice in {@code event_record_library}; a failure to store is logged, not thrown. */
    private void recordFailedNotice(String payload, Object nmsServerId, SyncDbInfo slaveDb) {
        try {
            final Record eventRecordLibrary = new Record();
            eventRecordLibrary.set("record_command", WEB_NOTICE_SET_INFO_ALERT);
            eventRecordLibrary.set("record_content", payload);
            eventRecordLibrary.set("record_type", "W2S");
            eventRecordLibrary.set("state", 1L);
            eventRecordLibrary.set("nmsserver_id", nmsServerId);
            eventRecordLibrary.set("create_time", new Date());
            Db.use(slaveDb.getDbKey()).save("event_record_library", eventRecordLibrary);
        } catch (Exception e) {
            logger.error("Failed to record undelivered monitoring setting notice", e);
        }
    }
}