update
This commit is contained in:
1
nms_sync/.gitignore
vendored
Normal file
1
nms_sync/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
/bin/
|
||||
12
nms_sync/conf/socket.properties
Normal file
12
nms_sync/conf/socket.properties
Normal file
@@ -0,0 +1,12 @@
|
||||
# socket\u901a\u4fe1\u7aef\u53e3\u53f7
|
||||
socket.port=50702
|
||||
|
||||
# \u516c\u6709\u5bc6\u5319\u5e93 \u5b58\u653e\u516c\u94a5\uff08\u4e0e\u5176\u4ed6\u4e3b\u673a\u52a0\u5bc6\u901a\u8baf\u5bf9\u5e94\u7684\u5bc6\u5319\uff09
|
||||
ssl.ks = F:/ssl/client_ks
|
||||
# \u79c1\u6709\u5bc6\u5319\u5e93 \u5b58\u653e\u79c1\u94a5(\u5411\u5176\u4ed6\u4e3b\u673a\u53d1\u5e03\u4fe1\u606f\u7684\u4f7f\u7528\u7684\u52a0\u5bc6\u5bc6\u5319)
|
||||
ssl.ts = F:/ssl/client_ts
|
||||
|
||||
|
||||
# db\u5173\u8054dc ip\u5730\u5740\u4fe1\u606f
|
||||
db192.168.10.186=192.168.10.186
|
||||
db192.168.10.204=192.168.10.204
|
||||
BIN
nms_sync/lib/mysql-connector-java-5.1.13.jar
Normal file
BIN
nms_sync/lib/mysql-connector-java-5.1.13.jar
Normal file
Binary file not shown.
@@ -0,0 +1,264 @@
|
||||
package com.nms.interceptor;
|
||||
|
||||
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import org.apache.log4j.Logger;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.jfinal.aop.Interceptor;
|
||||
import com.jfinal.aop.Invocation;
|
||||
import com.jfinal.plugin.activerecord.Db;
|
||||
import com.jfinal.plugin.activerecord.Record;
|
||||
import com.nms.model.SyncDbInfo;
|
||||
import com.nms.thread.SyncSlaveToMasterThread;
|
||||
|
||||
|
||||
public class SyncMissionResultStatisticalInterceptor implements Interceptor{
|
||||
private Logger logger =Logger.getLogger(this.getClass());
|
||||
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //Java Date 类型数据格式化格式
|
||||
|
||||
@Override
|
||||
public void intercept(Invocation inv) {
|
||||
try{
|
||||
logger.info("---------SyncMissionResultStatisticalInterceptor拦截器拦截开始------------");
|
||||
|
||||
//创建一个任务id集合 存储任务结果改变的任务id 这些任务统一走新的统计方法 修改mission_state_table状态
|
||||
|
||||
//新增或者修改的结果可能是多条 但是任务id相同 用set进行去重
|
||||
Set<Long> missionIds =new HashSet<Long>();
|
||||
//同步前 查询出最后任务结果数据信息ID 用于查出新任务结果数据或者修改的数据信息
|
||||
SyncSlaveToMasterThread target = inv.getTarget();
|
||||
SyncDbInfo syncDbInfo = target.getSyncDbInfo();
|
||||
String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
|
||||
+ syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true";
|
||||
logger.info("当前数据库连接为 "+url);
|
||||
|
||||
Record beforeInsertMissionResultTable1 = Db.use(url.toString()).findFirst("select * from table_sync_info where table_name='mission_result_table1' and event=1 and db_id=-1");
|
||||
logger.info("获取mission_result_table1表中最后一次同步id的数据信息为 "+JSON.toJSONString(beforeInsertMissionResultTable1));
|
||||
|
||||
Record beforeInsertMissionResultTable4 = Db.use(url.toString()).findFirst("select * from table_sync_info where table_name='mission_result_table4' and event=1 and db_id=-1");
|
||||
logger.info("获取mission_result_table1表中最后一次同步id的数据信息为 "+JSON.toJSONString(beforeInsertMissionResultTable4));
|
||||
|
||||
Record beforeInsertMissionResultTable6 = Db.use(url.toString()).findFirst("select * from table_sync_info where table_name='mission_result_table6' and event=1 and db_id=-1");
|
||||
logger.info("获取mission_result_table1表中最后一次同步id的数据信息为 "+JSON.toJSONString(beforeInsertMissionResultTable6));
|
||||
|
||||
|
||||
Record beforeUpdateMissionResultTable1 = Db.use(url.toString()).findFirst("select * from table_sync_info where table_name='mission_result_table1' and event=2 and db_id=-1");
|
||||
logger.info("获取mission_result_table1表中最后一次同步id的数据信息为 "+JSON.toJSONString(beforeUpdateMissionResultTable1));
|
||||
|
||||
Record beforeUpdateMissionResultTable4 = Db.use(url.toString()).findFirst("select * from table_sync_info where table_name='mission_result_table4' and event=2 and db_id=-1");
|
||||
logger.info("获取mission_result_table1表中最后一次同步id的数据信息为 "+JSON.toJSONString(beforeUpdateMissionResultTable4));
|
||||
|
||||
Record beforeUpdateMissionResultTable6 = Db.use(url.toString()).findFirst("select * from table_sync_info where table_name='mission_result_table6' and event=2 and db_id=-1");
|
||||
logger.info("获取mission_result_table1表中最后一次同步id的数据信息为 "+JSON.toJSONString(beforeUpdateMissionResultTable6));
|
||||
|
||||
|
||||
inv.invoke();
|
||||
|
||||
|
||||
Record afterInsertMissionResultTable1 = Db.use(url.toString()).findFirst("select * from table_sync_info where table_name='mission_result_table1' and event=1 and db_id=-1");
|
||||
logger.info("获取mission_result_table1表中最后一次同步id的数据信息为 "+JSON.toJSONString(afterInsertMissionResultTable1));
|
||||
|
||||
Record afterInsertMissionResultTable4 = Db.use(url.toString()).findFirst("select * from table_sync_info where table_name='mission_result_table4' and event=1 and db_id=-1");
|
||||
logger.info("获取mission_result_table1表中最后一次同步id的数据信息为 "+JSON.toJSONString(afterInsertMissionResultTable4));
|
||||
|
||||
Record afterInsertMissionResultTable6 = Db.use(url.toString()).findFirst("select * from table_sync_info where table_name='mission_result_table6' and event=1 and db_id=-1");
|
||||
logger.info("获取mission_result_table1表中最后一次同步id的数据信息为 "+JSON.toJSONString(afterInsertMissionResultTable6));
|
||||
|
||||
if(null!=beforeInsertMissionResultTable1&&null!=afterInsertMissionResultTable1) {
|
||||
List<Record> find = Db.use(url.toString()).find("select * from mission_result_table1 where id>= ? and id <= ? ",beforeInsertMissionResultTable1.get("last_id"),afterInsertMissionResultTable1.get("last_id"));
|
||||
if(null!=find && find.size()>0) {
|
||||
for (Record record : find) {
|
||||
missionIds.add(record.getLong("mission_id"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(null!=beforeInsertMissionResultTable4&&null!=afterInsertMissionResultTable4) {
|
||||
List<Record> find = Db.use(url.toString()).find("select * from mission_result_table4 where id>= ? and id <= ? ",beforeInsertMissionResultTable4.get("last_id"),afterInsertMissionResultTable4.get("last_id"));
|
||||
if(null!=find && find.size()>0) {
|
||||
for (Record record : find) {
|
||||
missionIds.add(record.getLong("mission_id"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(null!=beforeInsertMissionResultTable6&&null!=afterInsertMissionResultTable6) {
|
||||
List<Record> find = Db.use(url.toString()).find("select * from mission_result_table6 where id>= ? and id <= ? ",beforeInsertMissionResultTable6.get("last_id"),afterInsertMissionResultTable6.get("last_id"));
|
||||
if(null!=find && find.size()>0) {
|
||||
for (Record record : find) {
|
||||
missionIds.add(record.getLong("mission_id"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Record afterUpdateMissionResultTable1 = Db.use(url.toString()).findFirst("select * from table_sync_info where table_name='mission_result_table1' and event=2 and db_id=-1");
|
||||
logger.info("获取mission_result_table1表中最后一次同步id的数据信息为 "+JSON.toJSONString(afterInsertMissionResultTable1));
|
||||
|
||||
Record afterUpdateMissionResultTable4 = Db.use(url.toString()).findFirst("select * from table_sync_info where table_name='mission_result_table4' and event=2 and db_id=-1");
|
||||
logger.info("获取mission_result_table1表中最后一次同步id的数据信息为 "+JSON.toJSONString(afterInsertMissionResultTable4));
|
||||
|
||||
Record afterUpdateMissionResultTable6 = Db.use(url.toString()).findFirst("select * from table_sync_info where table_name='mission_result_table6' and event=2 and db_id=-1");
|
||||
logger.info("获取mission_result_table1表中最后一次同步id的数据信息为 "+JSON.toJSONString(afterInsertMissionResultTable6));
|
||||
|
||||
|
||||
if(null!=beforeUpdateMissionResultTable1&&null!=afterUpdateMissionResultTable1) {
|
||||
List<Long> updateIds = new ArrayList<Long>();
|
||||
List<Record> updateDatas = Db.use(url.toString()).find("select * from table_event_log where table_name='mission_result_table1' and id>= ? and id <= ?",beforeUpdateMissionResultTable1.get("last_id"),afterUpdateMissionResultTable1.get("last_id"));
|
||||
StringBuilder handleStr=new StringBuilder();
|
||||
if(null!=updateDatas && updateDatas.size()>0) {
|
||||
for (int i=0;i<updateDatas.size();i++) {
|
||||
updateIds.add(updateDatas.get(i).getLong("target_id"));
|
||||
if(i==0) {
|
||||
handleStr.append("?");
|
||||
}else {
|
||||
handleStr.append(",?");
|
||||
}
|
||||
}
|
||||
List<Record> updateDataInfos = Db.use(url.toString())
|
||||
.find(" select mission_id from mission_result_table1 where id in (" + handleStr + ") ",
|
||||
updateIds.toArray());
|
||||
if (updateDataInfos != null && updateDataInfos.size() > 0) {
|
||||
for (Record record : updateDataInfos) {
|
||||
missionIds.add(record.getLong("mission_id"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(null!=beforeUpdateMissionResultTable4&&null!=afterUpdateMissionResultTable4) {
|
||||
List<Long> updateIds = new ArrayList<Long>();
|
||||
List<Record> updateDatas = Db.use(url.toString()).find("select * from table_event_log where table_name='mission_result_table4' and id>= ? and id <= ?",beforeUpdateMissionResultTable4.get("last_id"),afterUpdateMissionResultTable4.get("last_id"));
|
||||
StringBuilder handleStr=new StringBuilder();
|
||||
if(null!=updateDatas && updateDatas.size()>0) {
|
||||
for (int i=0;i<updateDatas.size();i++) {
|
||||
updateIds.add(updateDatas.get(i).getLong("target_id"));
|
||||
if(i==0) {
|
||||
handleStr.append("?");
|
||||
}else {
|
||||
handleStr.append(",?");
|
||||
}
|
||||
}
|
||||
List<Record> updateDataInfos = Db.use(url.toString())
|
||||
.find(" select mission_id from mission_result_table4 where id in (" + handleStr + ") ",
|
||||
updateIds.toArray());
|
||||
if (updateDataInfos != null && updateDataInfos.size() > 0) {
|
||||
for (Record record : updateDataInfos) {
|
||||
missionIds.add(record.getLong("mission_id"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(null!=beforeUpdateMissionResultTable6&&null!=afterUpdateMissionResultTable6) {
|
||||
List<Long> updateIds = new ArrayList<Long>();
|
||||
List<Record> updateDatas = Db.use(url.toString()).find("select * from table_event_log where table_name='mission_result_table6' and id>= ? and id <= ?",beforeUpdateMissionResultTable1.get("last_id"),afterUpdateMissionResultTable1.get("last_id"));
|
||||
StringBuilder handleStr=new StringBuilder();
|
||||
if(null!=updateDatas && updateDatas.size()>0) {
|
||||
for (int i=0;i<updateDatas.size();i++) {
|
||||
updateIds.add(updateDatas.get(i).getLong("target_id"));
|
||||
if(i==0) {
|
||||
handleStr.append("?");
|
||||
}else {
|
||||
handleStr.append(",?");
|
||||
}
|
||||
}
|
||||
List<Record> updateDataInfos = Db.use(url.toString())
|
||||
.find(" select mission_id from mission_result_table6 where id in (" + handleStr + ") ",
|
||||
updateIds.toArray());
|
||||
if (updateDataInfos != null && updateDataInfos.size() > 0) {
|
||||
for (Record record : updateDataInfos) {
|
||||
missionIds.add(record.getLong("mission_id"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("即将进行统计的所有任务的mission_id为:"+JSON.toJSONString(missionIds));
|
||||
|
||||
//根据统计结果更新mission_state_talbe表对应任务的状态
|
||||
if(missionIds.size()>0) {
|
||||
for (Long missionId : missionIds) {
|
||||
StatisticalHandle(missionId);
|
||||
}
|
||||
}
|
||||
logger.info("--------SyncMissionResultStatisticalInterceptor拦截器拦截结束------------");
|
||||
}catch(Exception e){
|
||||
e.printStackTrace();
|
||||
logger.error("SyncMissionResultStatisticalInterceptor拦截器内部程序出现异常信息",e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void StatisticalHandle(Long missionId) {
|
||||
logger.info("根据当前任务id为:"+missionId+"开始统计");
|
||||
// mission_state_table 状态值
|
||||
Integer status = null;
|
||||
List<Record> results = Db.use().find("select result from (\r\n" +
|
||||
"(select result,mission_id from mission_result_table1 mrt) union all \r\n" +
|
||||
"(select result,mission_id from mission_result_table4 mrt) union all \r\n" +
|
||||
"(select result,mission_id from mission_result_table6 mrt)\r\n" +
|
||||
") t \r\n" +
|
||||
"left join mission_state_table mst on mst.mission_id = t.mission_id \r\n" +
|
||||
"where mst.is_loop = 0 and mst.mission_id=?",missionId);
|
||||
boolean noThree=true;
|
||||
if(null!=results&&results.size()>0) {
|
||||
for (Record record : results) {
|
||||
if(record.getInt("result")==3) {
|
||||
noThree=false;
|
||||
}
|
||||
}
|
||||
}
|
||||
// 判断任务结果有没有状态值为3的 如果有 则任务状态为在下发
|
||||
Record result = Db.use().findFirst("select t.mission_id,t.ok,t.fail,t.total from (\r\n" +
|
||||
"(select mrt.mission_id,sum(CASE mrt.result when 0 THEN 1 ELSE 0 end) ok,sum(CASE mrt.result when 1 then 1 when -1 then 1 else 0 end) fail,count(mrt.seq_id) total from mission_result_table1 mrt group by mrt.mission_id) union all \r\n" +
|
||||
"(select mrt.mission_id,sum(CASE mrt.result when 0 THEN 1 ELSE 0 end) ok,sum(CASE mrt.result when 1 then 1 when -1 then 1 else 0 end) fail,count(mrt.seq_id) total from mission_result_table4 mrt group by mrt.mission_id) union all \r\n" +
|
||||
"(select mrt.mission_id,sum(CASE mrt.result when 0 THEN 1 ELSE 0 end) ok,sum(CASE mrt.result when 1 then 1 when -1 then 1 else 0 end) fail,count(mrt.seq_id) total from mission_result_table6 mrt group by mrt.mission_id)\r\n" +
|
||||
") t \r\n" +
|
||||
"left join mission_state_table mst on mst.mission_id = t.mission_id \r\n" +
|
||||
"where mst.is_loop = 0 and t.mission_id = ?",missionId);
|
||||
if(null!=result) {
|
||||
Integer okCount = result.getInt("ok");
|
||||
Integer failCount =result.getInt("fail");
|
||||
Integer total =result.getInt("total");
|
||||
|
||||
|
||||
if(okCount+failCount==total) {
|
||||
if(failCount==0) {
|
||||
status=30;
|
||||
}else if(okCount==0) {
|
||||
status=31;
|
||||
}else {
|
||||
status=32;
|
||||
}
|
||||
}
|
||||
|
||||
if(!noThree) {
|
||||
status=2;
|
||||
}
|
||||
|
||||
logger.info("统计完成 修改mission_state_table状态为:? 总个数:?执行数:? 成功:? 失败:?");
|
||||
|
||||
String missionStateAutoDesc=null;
|
||||
if(okCount+failCount==total) {
|
||||
missionStateAutoDesc=format.format(System.currentTimeMillis())+" i18n_sserver.UpgradeService.sql.complate_n81i "+total+" i18n_sserver.UpgradeService.sql.executeNode_n81i "+okCount+" i18n_sserver.UpgradeService.sql.failed_n81i "+failCount;
|
||||
}
|
||||
String missionStateDesc="\r\n" +
|
||||
"i18n_server.UpgradeService.sql.total_n81i "+total+" i18n_server.UpgradeService.sql.executeNode2_n81i,</br> "+(total-okCount-failCount)+" i18n_server.UpgradeService.sql.unexecute_n81i,</br>"+(okCount+failCount)+" i18n_server.UpgradeService.sql.execute_n81i【i18n_server.UpgradeService.sql.success_n81i "+okCount+" i18n_sserver.UpgradeService.sql.failed_n81i "+failCount+"】";
|
||||
|
||||
Record missionStateTableResult =new Record();
|
||||
missionStateTableResult.set("mission_id", missionId);
|
||||
missionStateTableResult.set("mission_state", status);
|
||||
missionStateTableResult.set("mission_state_desc", missionStateDesc);
|
||||
if(missionStateAutoDesc!=null) {
|
||||
missionStateTableResult.set("auto_desc", missionStateAutoDesc);
|
||||
}
|
||||
Db.use("masterDataSource").update("mission_state_table","mission_id",missionStateTableResult);
|
||||
}
|
||||
logger.info("修改mission_state_table信息完成");
|
||||
}
|
||||
}
|
||||
187
nms_sync/src/com/nms/interceptor/SyncSocketInterceptor.java
Normal file
187
nms_sync/src/com/nms/interceptor/SyncSocketInterceptor.java
Normal file
@@ -0,0 +1,187 @@
|
||||
package com.nms.interceptor;
|
||||
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.jfinal.aop.Interceptor;
|
||||
import com.jfinal.aop.Invocation;
|
||||
import com.jfinal.kit.PropKit;
|
||||
import com.jfinal.plugin.activerecord.Db;
|
||||
import com.jfinal.plugin.activerecord.Record;
|
||||
import com.nms.model.SetInfo;
|
||||
import com.nms.model.SyncDbInfo;
|
||||
import com.nms.socket.SocketClientServeice;
|
||||
import com.nms.thread.SyncThread;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 数据同步完成后针对新监测数据 进行dc socket通讯
|
||||
* @author default
|
||||
*
|
||||
*/
|
||||
public class SyncSocketInterceptor implements Interceptor{
|
||||
private Logger logger =Logger.getLogger(this.getClass());
|
||||
/**
|
||||
* 监控设置信息变更请求命令
|
||||
*/
|
||||
private String WEB_NOTICE_SET_INFO_ALERT = "char:setInfoAlert";
|
||||
|
||||
@Override
|
||||
public void intercept(Invocation inv) {
|
||||
try{
|
||||
logger.info("--------数据同步前 SyncSocketInterceptor拦截器拦截------------");
|
||||
//同步前 查询出最后监测数据信息ID 用于查出新监测数据或者修改的数据信息
|
||||
SyncThread target = inv.getTarget();
|
||||
SyncDbInfo syncDbInfo = target.getSyncDbInfo();
|
||||
String url = "jdbc:mysql://" + syncDbInfo.get("ip") + ":" + syncDbInfo.get("port") + "/"
|
||||
+ syncDbInfo.get("database_name")+"?useUnicode=true&characterEncoding=utf-8&useOldAliasMetadataBehavior=true&rewriteBatchedStatements=true";
|
||||
logger.info("当前数据库连接为 "+url);
|
||||
Record detectionSetInfoTableInfo = Db.use("masterDataSource").findFirst("select * from table_sync_info where table_name='detection_set_info' and event=1 and db_id=?",syncDbInfo.getId());
|
||||
logger.info("获取detectionSetInfo表中最后一次同步id的数据信息为 "+JSON.toJSONString(detectionSetInfoTableInfo));
|
||||
|
||||
Record detectionSetInfoTableUpdateInfo = Db.use("masterDataSource").findFirst("select * from table_sync_info where table_name='detection_set_info' and event=2 and db_id=?",syncDbInfo.getId());
|
||||
logger.info("获取detectionSetInfo表中最后一次同步修改id的数据信息为 "+JSON.toJSONString(detectionSetInfoTableUpdateInfo));
|
||||
|
||||
//查询所有修改数据信息
|
||||
List<Record> datas = Db.use("masterDataSource")
|
||||
.find("select * from table_event_log where table_name = '" + detectionSetInfoTableUpdateInfo.getStr("table_name")
|
||||
+ "' and id > " + detectionSetInfoTableUpdateInfo.getLong("last_id") + " and event = "
|
||||
+ detectionSetInfoTableUpdateInfo.getInt("event") + " order by id asc ");
|
||||
List<Long> updateIds = new ArrayList<Long>();
|
||||
StringBuilder handleStr=new StringBuilder();
|
||||
if (datas != null && datas.size() > 0) {
|
||||
for (int i = 0; i < datas.size(); i++) {
|
||||
updateIds.add(datas.get(i).getLong("target_id"));
|
||||
if(i==0) {
|
||||
handleStr.append("?");
|
||||
}else {
|
||||
handleStr.append(",?");
|
||||
}
|
||||
}
|
||||
}
|
||||
List<Record> detectionSetUpdateInfos=null;
|
||||
Map<Long,Record> map=new HashMap<Long,Record>();
|
||||
//查询修改前数据的信息
|
||||
if(updateIds.size()>0) {
|
||||
detectionSetUpdateInfos= Db.use(url).find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id in ("+handleStr.toString()+")",updateIds.toArray());
|
||||
if(null!=detectionSetUpdateInfos&&detectionSetUpdateInfos.size()>0) {
|
||||
for(Record detectionSetInfo:detectionSetUpdateInfos) {
|
||||
map.put(detectionSetInfo.getLong("ID"),detectionSetInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
inv.invoke();
|
||||
|
||||
//获取当前分库连接的dc的IP信息
|
||||
final String serverIp = PropKit.use("socket.properties").get("db"+syncDbInfo.get("ip"));
|
||||
logger.info("获取socket连接ip信息为:"+serverIp);
|
||||
//根据数据库分库dc ip信息获取数据库相关实体信息
|
||||
final Record serverTableInfo = Db.findFirst("select * from server_table where server_ip = ? and server_state=0 ",serverIp);
|
||||
|
||||
// 查询出 修改或者更新的监测配置信息 如果没有的话不进行任何操作
|
||||
if(detectionSetInfoTableInfo!=null) {
|
||||
List<Record> detectionSetInfos = Db.use(url).find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id > ? ",detectionSetInfoTableInfo.getLong("last_id"));
|
||||
if(null!=detectionSetInfos&&detectionSetInfos.size()>0) {
|
||||
for(Record detectionSetInfo:detectionSetInfos) {
|
||||
SetInfo o=null;
|
||||
SetInfo n = new SetInfo();
|
||||
n.setId(detectionSetInfo.getLong("ID"));
|
||||
n.setCheckTypeId(detectionSetInfo.getLong("CHECK_TYPE_INFO_ID"));
|
||||
n.setCheckTypeName(detectionSetInfo.getStr("CHECK_TYPE_INFO_Name"));
|
||||
n.setProcessIden(detectionSetInfo.getStr("PROCESS_IDEN"));
|
||||
n.setNodeGroupsId(detectionSetInfo.getStr("NODE_GROUPS_ID"));
|
||||
n.setNodeIpsId(detectionSetInfo.getStr("NODE_IPS_ID"));
|
||||
n.setCheckWay(detectionSetInfo.getStr("CHECK_WAY"));
|
||||
final JSONObject jObject = new JSONObject();
|
||||
jObject.put("old", o);
|
||||
jObject.put("new", n);
|
||||
new Thread(new Runnable(){
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
new SocketClientServeice(serverIp).sendInfoToServer(WEB_NOTICE_SET_INFO_ALERT,jObject.toString());
|
||||
logger.info("…………………………………………socket通信发送修改监测数据完毕………………………………………………");
|
||||
} catch (Exception e) {
|
||||
Record eventRecordLibrary=new Record();
|
||||
eventRecordLibrary.set("record_command", WEB_NOTICE_SET_INFO_ALERT);
|
||||
eventRecordLibrary.set("record_content", jObject.toString());
|
||||
eventRecordLibrary.set("record_type", "W2S");
|
||||
eventRecordLibrary.set("state", 1l);
|
||||
eventRecordLibrary.set("nmsserver_id", serverTableInfo.get("id"));
|
||||
eventRecordLibrary.set("create_time", new Date());
|
||||
Db.save("event_record_library", eventRecordLibrary);
|
||||
logger.error("Monitoring setting to change communication anomalies: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(null!=detectionSetUpdateInfos&&detectionSetUpdateInfos.size()>0) {
|
||||
//查询出已经修改的监测配置数据
|
||||
List<Record> detectionSetInfos = Db.use(url).find("select m.*,cti.id CHECK_TYPE_INFO_ID,cti.check_type_name CHECK_TYPE_INFO_NAME from detection_set_info m left join check_type_info cti on m.check_type_id=cti.id where m.id in ("+handleStr.toString()+")",updateIds.toArray());
|
||||
if(null!=detectionSetInfos&&detectionSetInfos.size()>0) {
|
||||
for (Record detectionSetInfo : detectionSetInfos) {
|
||||
SetInfo o=null;
|
||||
Record old = map.get(detectionSetInfo.get("ID"));
|
||||
if(old!=null) {
|
||||
o = new SetInfo();
|
||||
o.setId(detectionSetInfo.getLong("ID"));
|
||||
o.setCheckTypeId(detectionSetInfo.getLong("CHECK_TYPE_INFO_ID"));
|
||||
o.setCheckTypeName(detectionSetInfo.getStr("CHECK_TYPE_INFO_Name"));
|
||||
o.setProcessIden(detectionSetInfo.getStr("PROCESS_IDEN"));
|
||||
o.setNodeGroupsId(detectionSetInfo.getStr("NODE_GROUPS_ID"));
|
||||
o.setNodeIpsId(detectionSetInfo.getStr("NODE_IPS_ID"));
|
||||
o.setCheckWay(detectionSetInfo.getStr("CHECK_WAY"));
|
||||
}
|
||||
SetInfo n = new SetInfo();
|
||||
n.setId(detectionSetInfo.getLong("ID"));
|
||||
n.setCheckTypeId(detectionSetInfo.getLong("CHECK_TYPE_INFO_ID"));
|
||||
n.setCheckTypeName(detectionSetInfo.getStr("CHECK_TYPE_INFO_Name"));
|
||||
n.setProcessIden(detectionSetInfo.getStr("PROCESS_IDEN"));
|
||||
n.setNodeGroupsId(detectionSetInfo.getStr("NODE_GROUPS_ID"));
|
||||
n.setNodeIpsId(detectionSetInfo.getStr("NODE_IPS_ID"));
|
||||
n.setCheckWay(detectionSetInfo.getStr("CHECK_WAY"));
|
||||
final JSONObject jObject = new JSONObject();
|
||||
jObject.put("old", o);
|
||||
jObject.put("new", n);
|
||||
new Thread(new Runnable(){
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
new SocketClientServeice(serverIp).sendInfoToServer(WEB_NOTICE_SET_INFO_ALERT,jObject.toString());
|
||||
logger.info("…………………………………………socket通信发送修改监测数据完毕………………………………………………");
|
||||
} catch (Exception e) {
|
||||
Record eventRecordLibrary=new Record();
|
||||
eventRecordLibrary.set("record_command", WEB_NOTICE_SET_INFO_ALERT);
|
||||
eventRecordLibrary.set("record_content", jObject.toString());
|
||||
eventRecordLibrary.set("record_type", "W2S");
|
||||
eventRecordLibrary.set("state", 1l);
|
||||
eventRecordLibrary.set("nmsserver_id", serverTableInfo.get("id"));
|
||||
eventRecordLibrary.set("create_time", new Date());
|
||||
Db.save("event_record_library", eventRecordLibrary);
|
||||
logger.error("Monitoring setting to change communication anomalies: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.info("--------数据同步前 SyncSocketInterceptor拦截器拦截完毕------------");
|
||||
}catch(Exception e){
|
||||
e.printStackTrace();
|
||||
logger.error("syncDataInterceptor socket通信拦截器内部程序出现异常信息"+e.getMessage());
|
||||
logger.error("syncDataInterceptor socket通信拦截器内部程序出现异常信息",e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.nms.main;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
@@ -24,6 +25,9 @@ import com.nms.thread.SyncThread;
|
||||
*
|
||||
*/
|
||||
public class SyncData{
|
||||
|
||||
private static final ThreadLocal<Set<Long>> threadLocal = new ThreadLocal<Set<Long>>();
|
||||
|
||||
public static void main(String[] args) {
|
||||
Logger logger = Logger.getLogger(SyncData.class);
|
||||
logger.info("同步程序开始启动");
|
||||
|
||||
230
nms_sync/src/com/nms/model/SetInfo.java
Normal file
230
nms_sync/src/com/nms/model/SetInfo.java
Normal file
@@ -0,0 +1,230 @@
|
||||
package com.nms.model;
|
||||
|
||||
/**
|
||||
* 客户端用到的监测设置信息实体
|
||||
*
|
||||
*/
|
||||
public class SetInfo {
|
||||
/**
|
||||
* 监测设置信息ID
|
||||
*/
|
||||
private Long id;
|
||||
/**
|
||||
* 检测类型
|
||||
*/
|
||||
private String checkTypeName;//如:CPU、DISK等
|
||||
private String checkTypeName1;//如:CPU、DISK等
|
||||
private Long checkTypeId;//检测类型的ID,预留
|
||||
/**
|
||||
* 目标IP
|
||||
*/
|
||||
// private String nodeIp;//
|
||||
/**
|
||||
* 最大测试次数
|
||||
*/
|
||||
private Long checkMaxTimes;
|
||||
/**
|
||||
* 节点组ID
|
||||
*/
|
||||
// private Long groupId;
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private String nodeGroupsId;
|
||||
/**
|
||||
* 最大测试次数
|
||||
*/
|
||||
private String nodeIpsId;
|
||||
/**
|
||||
* 时间间隔(单位:分钟)
|
||||
*/
|
||||
private Long checkGap;
|
||||
/**
|
||||
* 超时时间(单位:秒)
|
||||
*/
|
||||
private Long checkOutTime;
|
||||
/**
|
||||
* 监测状态:0无效;1有效
|
||||
*/
|
||||
private String checkState;
|
||||
/**
|
||||
* 监测方式:0主动,1被动
|
||||
*/
|
||||
private String checkWay;
|
||||
/**
|
||||
* 进程标志
|
||||
*/
|
||||
private String processIden;
|
||||
private String processIdenName;
|
||||
/**
|
||||
* 进程执行文件
|
||||
*/
|
||||
private String processFile;
|
||||
/**
|
||||
* 进程执行文件路径
|
||||
*/
|
||||
private String processPath;
|
||||
/**
|
||||
* 是否系统启动(NMSAgent启动/第三方自己启动);默认0自启动;1NMSAgent启动
|
||||
*/
|
||||
private String IsControlStart;
|
||||
/**
|
||||
* 控制启动时间
|
||||
*/
|
||||
private Long controlStartTime;
|
||||
/**
|
||||
*上传数据时间间隔单位分钟:不能为空,默认15分钟。监测数据上传到NMSServer周期。
|
||||
*/
|
||||
private Long uploadGap;
|
||||
|
||||
/**
|
||||
* 计划检测时间:针对当前配置信息首次执行时间
|
||||
*/
|
||||
private Long planCheckTime;
|
||||
|
||||
/**
|
||||
* 是否预置监测
|
||||
* 监测类型: 0预置监测类型,1三方监测类型,2页面不显示(如snmp_trap)
|
||||
*/
|
||||
private String isSchedule;
|
||||
|
||||
private String singleNodeId;
|
||||
|
||||
/**
|
||||
* 是否SNMP检测 0:objectSNMP 1:SNMP4j
|
||||
*/
|
||||
private Long isSNMP;
|
||||
public Long getId() {
|
||||
return id;
|
||||
}
|
||||
public void setId(Long id) {
|
||||
this.id = id;
|
||||
}
|
||||
public String getCheckTypeName() {
|
||||
return checkTypeName;
|
||||
}
|
||||
public void setCheckTypeName(String checkTypeName) {
|
||||
this.checkTypeName = checkTypeName;
|
||||
}
|
||||
public Long getCheckTypeId() {
|
||||
return checkTypeId;
|
||||
}
|
||||
public Long getCheckMaxTimes() {
|
||||
return checkMaxTimes;
|
||||
}
|
||||
public void setCheckMaxTimes(Long checkMaxTimes) {
|
||||
this.checkMaxTimes = checkMaxTimes;
|
||||
}
|
||||
public Long getCheckGap() {
|
||||
return checkGap;
|
||||
}
|
||||
public void setCheckGap(Long checkGap) {
|
||||
this.checkGap = checkGap;
|
||||
}
|
||||
public Long getCheckOutTime() {
|
||||
return checkOutTime;
|
||||
}
|
||||
public void setCheckOutTime(Long checkOutTime) {
|
||||
this.checkOutTime = checkOutTime;
|
||||
}
|
||||
public String getCheckState() {
|
||||
return checkState;
|
||||
}
|
||||
public void setCheckState(String checkState) {
|
||||
this.checkState = checkState;
|
||||
}
|
||||
public String getProcessIden() {
|
||||
return processIden;
|
||||
}
|
||||
public void setProcessIden(String processIden) {
|
||||
this.processIden = processIden;
|
||||
}
|
||||
public String getProcessFile() {
|
||||
return processFile;
|
||||
}
|
||||
public void setProcessFile(String processFile) {
|
||||
this.processFile = processFile;
|
||||
}
|
||||
public String getProcessPath() {
|
||||
return processPath;
|
||||
}
|
||||
public void setProcessPath(String processPath) {
|
||||
this.processPath = processPath;
|
||||
}
|
||||
public void setCheckTypeId(Long checkTypeId) {
|
||||
this.checkTypeId = checkTypeId;
|
||||
}
|
||||
public String getCheckWay() {
|
||||
return checkWay;
|
||||
}
|
||||
public void setCheckWay(String checkWay) {
|
||||
this.checkWay = checkWay;
|
||||
}
|
||||
|
||||
public String getIsControlStart() {
|
||||
return IsControlStart;
|
||||
}
|
||||
public void setIsControlStart(String isControlStart) {
|
||||
IsControlStart = isControlStart;
|
||||
}
|
||||
public Long getControlStartTime() {
|
||||
return controlStartTime;
|
||||
}
|
||||
public void setControlStartTime(Long controlStartTime) {
|
||||
this.controlStartTime = controlStartTime;
|
||||
}
|
||||
public Long getPlanCheckTime() {
|
||||
return planCheckTime;
|
||||
}
|
||||
public void setPlanCheckTime(Long planCheckTime) {
|
||||
this.planCheckTime = planCheckTime;
|
||||
}
|
||||
public Long getUploadGap() {
|
||||
return uploadGap;
|
||||
}
|
||||
public void setUploadGap(Long uploadGap) {
|
||||
this.uploadGap = uploadGap;
|
||||
}
|
||||
public String getIsSchedule() {
|
||||
return isSchedule;
|
||||
}
|
||||
public void setIsSchedule(String isSchedule) {
|
||||
this.isSchedule = isSchedule;
|
||||
}
|
||||
public Long getIsSNMP() {
|
||||
return isSNMP;
|
||||
}
|
||||
public void setIsSNMP(Long isSNMP) {
|
||||
this.isSNMP = isSNMP;
|
||||
}
|
||||
public String getNodeGroupsId() {
|
||||
return nodeGroupsId;
|
||||
}
|
||||
public void setNodeGroupsId(String nodeGroupsId) {
|
||||
this.nodeGroupsId = nodeGroupsId;
|
||||
}
|
||||
public String getNodeIpsId() {
|
||||
return nodeIpsId;
|
||||
}
|
||||
public void setNodeIpsId(String nodeIpsId) {
|
||||
this.nodeIpsId = nodeIpsId;
|
||||
}
|
||||
public String getSingleNodeId() {
|
||||
return singleNodeId;
|
||||
}
|
||||
public void setSingleNodeId(String singleNodeId) {
|
||||
this.singleNodeId = singleNodeId;
|
||||
}
|
||||
public String getCheckTypeName1() {
|
||||
return checkTypeName1;
|
||||
}
|
||||
public void setCheckTypeName1(String checkTypeName1) {
|
||||
this.checkTypeName1 = checkTypeName1;
|
||||
}
|
||||
public String getProcessIdenName() {
|
||||
return processIdenName;
|
||||
}
|
||||
public void setProcessIdenName(String processIdenName) {
|
||||
this.processIdenName = processIdenName;
|
||||
}
|
||||
}
|
||||
83
nms_sync/src/com/nms/socket/SocketClientServeice.java
Normal file
83
nms_sync/src/com/nms/socket/SocketClientServeice.java
Normal file
@@ -0,0 +1,83 @@
|
||||
package com.nms.socket;
|
||||
|
||||
import javax.net.ssl.SSLSocket;
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
|
||||
import com.jfinal.kit.PropKit;
|
||||
|
||||
|
||||
public class SocketClientServeice extends SocketUtils{
|
||||
|
||||
public SocketClientServeice(String ip)throws Exception {
|
||||
super(ip,Integer.parseInt(PropKit.use("socket.properties").get("socket.port")));
|
||||
}
|
||||
public SocketClientServeice(String ip, int port)throws Exception {
|
||||
super(ip, port);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建通讯
|
||||
*
|
||||
* @time Feb 29, 2012-5:39:01 PM
|
||||
*/
|
||||
private void init() throws Exception {
|
||||
logger.debug("目标通讯:>" + ip + " 创建开始" );
|
||||
try {
|
||||
|
||||
// -- create SocketFactory
|
||||
SSLSocketFactory ssf = sSLContext.getSocketFactory();
|
||||
|
||||
// -- create socket
|
||||
socket = (SSLSocket) ssf.createSocket(ip, port);
|
||||
this.in = socket.getInputStream();
|
||||
this.out = socket.getOutputStream();
|
||||
logger.info("create socket success.");
|
||||
|
||||
//2014-1-23 hyx 如果建立socket成功,但是startHandshake握手失败,且未设置超时时间时,则会一直阻塞
|
||||
socket.setSoTimeout(1000*1000); //1000秒
|
||||
|
||||
// -- handshake 握手
|
||||
((SSLSocket) socket).startHandshake();
|
||||
logger.info("handshake success.");
|
||||
|
||||
} catch (Exception e) {
|
||||
|
||||
logger.error("Target communication:>" + ip + " create failure" + e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public void sendInfoToServer(String cmd,String str) throws Exception{
|
||||
try {logger.info("sendInfoToServer begin:"+str );
|
||||
init();
|
||||
sendMessage(cmd);
|
||||
receiveMessage();
|
||||
sendMessage(str);
|
||||
receiveMessage();
|
||||
logger.info("sendInfoToServer end:"+str );
|
||||
// } catch (Exception e) {
|
||||
// logger.debug("sendInfoToServer 异常:"+str );
|
||||
// throw e;
|
||||
} finally {
|
||||
close();
|
||||
}
|
||||
}
|
||||
public String sendInfoToServer2(String cmd,String str) throws Exception{
|
||||
try {logger.info("sendInfoToServer begin:"+str );
|
||||
init();
|
||||
sendMessage(cmd);
|
||||
receiveMessage();
|
||||
sendMessage(str);
|
||||
String result = receiveMessage();
|
||||
sendMessage(SUCCESS);
|
||||
logger.info("sendInfoToServer end:"+str );
|
||||
return result;
|
||||
// } catch (Exception e) {
|
||||
// logger.debug("sendInfoToServer 异常:"+str );
|
||||
// throw e;
|
||||
} finally {
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
536
nms_sync/src/com/nms/socket/SocketUtils.java
Normal file
536
nms_sync/src/com/nms/socket/SocketUtils.java
Normal file
@@ -0,0 +1,536 @@
|
||||
package com.nms.socket;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.Socket;
|
||||
import java.net.URL;
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.KeyStore;
|
||||
import java.security.KeyStoreException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.SecureRandom;
|
||||
import java.security.UnrecoverableKeyException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.GregorianCalendar;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import javax.net.ssl.KeyManagerFactory;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.TrustManagerFactory;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.jfinal.kit.PropKit;
|
||||
|
||||
|
||||
/**
|
||||
* SSL 通讯 工具类
|
||||
* @date Feb 29, 2012 10:05:50 AM
|
||||
* @author ZhangGang
|
||||
*
|
||||
*/
|
||||
public abstract class SocketUtils{
|
||||
//文件传输 临时文件命名后缀
|
||||
private static final String TEMP_SUFFIX = ".tp";
|
||||
|
||||
//SSLContext 重置标识
|
||||
private static boolean resetSSLContextFlag = false;
|
||||
//通讯编码方式
|
||||
private static final String DEFAULT_ENCODING = "utf-8";
|
||||
|
||||
//公有密匙库 存放公钥(与其他主机加密通讯对应的密匙)
|
||||
private static final String SERVER_TRUST =PropKit.use("socket.properties").get("ssl.ts"); //System.getProperty("user.dir")+File.separator+"src\\conf\\ssl" + File.separator + "client_ts";
|
||||
|
||||
//私有密匙库 存放私钥(向其他主机发布信息的使用的加密密匙)
|
||||
private static final String SERVER_STORE = PropKit.use("socket.properties").get("ssl.ks"); //System.getProperty("user.dir")+File.separator+"src\\conf\\ssl" + File.separator + "client_ks";
|
||||
|
||||
//密匙库 类型
|
||||
private static final String KEYSTORE_TYPE = "jceks";
|
||||
|
||||
//私有密匙库 密码
|
||||
private static final String SERVER_STORE_PSW = "client";
|
||||
|
||||
//公有密匙库 密码
|
||||
private static final String SERVER_TRUST_PSW = "client";
|
||||
|
||||
//私有密匙 密码
|
||||
private static final String SERVER_KEY_PSW = "123456";
|
||||
|
||||
//加密上下位 类型
|
||||
private static final String SSL_CONTEXT_TYPE = "TLS";
|
||||
|
||||
//日志组件对象
|
||||
protected static Logger logger = Logger.getLogger(SocketUtils.class);
|
||||
|
||||
//Socket 通讯
|
||||
protected Socket socket = null; //Socket
|
||||
|
||||
//字节输出流
|
||||
protected OutputStream out = null;
|
||||
|
||||
//字节输入流
|
||||
protected InputStream in = null; //读取字符流
|
||||
|
||||
//通讯目标主机IP
|
||||
protected String ip = null;
|
||||
|
||||
//通讯目标主机端口
|
||||
protected Integer port = null ;
|
||||
|
||||
//SSL通讯上下文对象
|
||||
protected static SSLContext sSLContext = getSSLContext();
|
||||
|
||||
//缓存字节长度
|
||||
protected static final int BUFF_SIZE = 1024;
|
||||
|
||||
/**
|
||||
* 通讯正常标识
|
||||
*/
|
||||
protected static final String SUCCESS ="success"; //通信操作正常
|
||||
|
||||
/**
|
||||
* 通讯异常或终止标识
|
||||
*/
|
||||
protected static final String FAIL ="fail"; //通信操作异常或终止
|
||||
|
||||
/**
|
||||
* 通讯创建
|
||||
* @param ip 目标主机IP
|
||||
* @param port 目标主机端口
|
||||
* @throws Exception
|
||||
*/
|
||||
public SocketUtils(String ip,Integer port){
|
||||
logger.info("客户端通讯建立 TO:> "+ip);
|
||||
this.ip = ip;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
/**
|
||||
* 通讯创建
|
||||
* @param client 目标通讯实例
|
||||
*/
|
||||
public SocketUtils(Socket client) {
|
||||
socket = client;
|
||||
logger.info("服务端通讯建立 From:> "+socket.getInetAddress().getHostAddress());
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建SSLContext方法
|
||||
* @time Feb 29, 2012-11:40:24 AM
|
||||
* @return
|
||||
*/
|
||||
public static SSLContext getSSLContext(){
|
||||
SSLContext ctx = null ;
|
||||
|
||||
//- 创建 新的sSLContext 校验
|
||||
//- reCreateSSLContextFlag 为false 且 sSLContext 不为空时无需创建
|
||||
if(!resetSSLContextFlag && sSLContext !=null){
|
||||
return sSLContext;
|
||||
}
|
||||
|
||||
//- 创建 新的sSLContext
|
||||
try {
|
||||
System.setProperty("javax.net.ssl.trustStore", SERVER_TRUST);
|
||||
//-- 初始化私钥证书库
|
||||
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
|
||||
KeyStore ks = KeyStore.getInstance(KEYSTORE_TYPE);
|
||||
ks.load(new FileInputStream(SERVER_STORE), SERVER_STORE_PSW.toCharArray());//载入keystore
|
||||
kmf.init(ks, SERVER_KEY_PSW.toCharArray());
|
||||
//-- 初始化公钥证书库
|
||||
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
|
||||
KeyStore tks = KeyStore.getInstance(KEYSTORE_TYPE);
|
||||
tks.load(new FileInputStream(SERVER_TRUST), SERVER_TRUST_PSW.toCharArray());//载入keystore
|
||||
tmf.init(tks);
|
||||
//-- 初始化SSL通讯上下文对象 和 SSL通讯工厂
|
||||
ctx = SSLContext.getInstance(SSL_CONTEXT_TYPE);
|
||||
ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(),new SecureRandom());
|
||||
logger.info("证书库载入成功(load keystore success.)");
|
||||
resetSSLContextFlag = false;
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
logger.error("",e);
|
||||
} catch (CertificateException e) {
|
||||
logger.error("",e);
|
||||
} catch (FileNotFoundException e) {
|
||||
logger.error("",e);
|
||||
} catch (IOException e) {
|
||||
logger.error("",e);
|
||||
} catch (KeyStoreException e) {
|
||||
logger.error("",e);
|
||||
} catch (UnrecoverableKeyException e) {
|
||||
logger.error("",e);
|
||||
} catch (KeyManagementException e) {
|
||||
logger.error("",e);
|
||||
}finally{
|
||||
}
|
||||
return ctx;
|
||||
}
|
||||
|
||||
/**
|
||||
* 字符流 接收信息
|
||||
**/
|
||||
protected void sendMessage(String msg) throws UnsupportedEncodingException {
|
||||
|
||||
PrintWriter pw = new PrintWriter(new OutputStreamWriter(out,DEFAULT_ENCODING));
|
||||
pw.println(msg);
|
||||
pw.flush();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 字符流 发送信息
|
||||
*/
|
||||
protected String receiveMessage()throws UnsupportedEncodingException ,IOException{
|
||||
|
||||
BufferedReader br = new BufferedReader(new InputStreamReader(in,DEFAULT_ENCODING));
|
||||
return br.readLine();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Object 形式 发送信息
|
||||
*/
|
||||
protected void sendObject(Object object) throws IOException{
|
||||
|
||||
ObjectOutputStream oos = new ObjectOutputStream(out);
|
||||
oos.writeObject(object);
|
||||
oos.flush();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Object 形式 接收信息
|
||||
*/
|
||||
protected Object receiveObject() throws ClassNotFoundException,IOException{
|
||||
|
||||
ObjectInputStream ois = new ObjectInputStream(in);
|
||||
return ois.readObject();
|
||||
}
|
||||
|
||||
/**
|
||||
* 字节流 发送单个文件
|
||||
**/
|
||||
public void sendFile(File file) throws IOException{
|
||||
|
||||
if(file==null ||file.length()==0){
|
||||
return;
|
||||
}
|
||||
|
||||
FileInputStream fis = null;
|
||||
|
||||
try {
|
||||
|
||||
//发送文件大小
|
||||
sendMessage(file.length() + "");
|
||||
|
||||
//发送文件内容
|
||||
int len;
|
||||
byte[] buff = new byte[BUFF_SIZE];
|
||||
fis = new FileInputStream(file);
|
||||
|
||||
while ((len = fis.read(buff)) != -1) {
|
||||
|
||||
//将读取的内容写入文件
|
||||
out.write(buff, 0, len);
|
||||
|
||||
}
|
||||
|
||||
out.flush();
|
||||
// } catch (IOException e) {
|
||||
// logger.error("单个发送文件失败!\n"+ExceptionPrintUtils.printExceptionStack(e));
|
||||
} finally{
|
||||
if(fis!=null){
|
||||
//try {
|
||||
fis.close();
|
||||
fis=null;
|
||||
//} catch (IOException e) {
|
||||
//logger.error("",e);
|
||||
//}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 字节流 接收单个文件 并保存
|
||||
*/
|
||||
protected void receiveFile(String filePath) throws IOException{
|
||||
|
||||
FileOutputStream fos = null;
|
||||
|
||||
File file = new File(filePath);
|
||||
if(!file.getParentFile().exists()){
|
||||
file.getParentFile().mkdirs();
|
||||
}
|
||||
|
||||
if(!file.exists()){
|
||||
file.createNewFile();
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
//接收文件大小
|
||||
long fileSize = Long.parseLong(receiveMessage());
|
||||
|
||||
//接收文件内容
|
||||
byte[] buff = new byte[BUFF_SIZE];
|
||||
fos = new FileOutputStream(filePath);
|
||||
int nRead = 0;
|
||||
|
||||
//单个文件循环读取
|
||||
rfile:while ((nRead = in.read(buff, 0, (int)(BUFF_SIZE<fileSize?BUFF_SIZE:fileSize))) > 0) {
|
||||
fos.write(buff,0,nRead);
|
||||
fos.flush();
|
||||
fileSize -= nRead;
|
||||
if(fileSize<=0){
|
||||
break rfile;
|
||||
}
|
||||
}
|
||||
fos.close();
|
||||
|
||||
// } catch (IOException e) {
|
||||
// logger.error("接收文件失败!",e);
|
||||
}finally{
|
||||
if(fos!=null){
|
||||
//try {
|
||||
fos.close();
|
||||
fos = null;
|
||||
//} catch (IOException e) {
|
||||
// logger.error("",e);
|
||||
//}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 批量上传文件
|
||||
* @param dir 本地文件集合根目录绝对路径
|
||||
* @param fileList 上传的文件列表
|
||||
* (DC未使用)
|
||||
*/
|
||||
protected void sendFileByBath(String dir, List<File> fileList) {
|
||||
ObjectOutputStream oos = null;
|
||||
FileInputStream fis = null;
|
||||
|
||||
try {
|
||||
// 第一步发送本地根目录地址(用于地址截取)保证fileList的目录结构完整性
|
||||
this.sendMessage(dir);
|
||||
String result = this.receiveMessage();
|
||||
logger.debug("根目录路径通信状态: " + result);
|
||||
// 第二步 用ObjectOutputStream工具类 发送file对象信息 用于文件名,文件目录,文件大小的获取
|
||||
oos = new ObjectOutputStream(out);
|
||||
List<String[]> fileStrList = new ArrayList<String[]>();
|
||||
for(File f : fileList){
|
||||
String[] tmpArr = new String[]{
|
||||
f.getAbsolutePath(), f.length() + ""
|
||||
};
|
||||
fileStrList.add(tmpArr);
|
||||
}
|
||||
oos.writeObject(fileStrList);
|
||||
// 第三部,发送文件
|
||||
byte[] buff = new byte[BUFF_SIZE];
|
||||
int len = 0;
|
||||
// 循环上传文件
|
||||
for (File file : fileList) {
|
||||
fis = new FileInputStream(file);
|
||||
while ((len = fis.read(buff)) != -1) {// 将读取的内容输出流
|
||||
out.write(buff, 0, len);
|
||||
}
|
||||
out.flush();
|
||||
fis.close();
|
||||
fis = null;
|
||||
}
|
||||
logger.debug("多文件上传结束,共 "+(fileList==null ? 0 : fileList.size())+ "个文件");
|
||||
} catch (IOException e) {
|
||||
logger.error("Fail to send file",e);
|
||||
} finally {
|
||||
try {
|
||||
if (fis != null) {
|
||||
fis.close();
|
||||
fis = null;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("",e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量接收文件 保存为 List<byte[]>形式
|
||||
* @param newDir
|
||||
* (DC未使用)
|
||||
*/
|
||||
protected LinkedList<byte []> receiveFileBytesByBath() {
|
||||
LinkedList<byte []> bsList = new LinkedList<byte []>();
|
||||
ObjectInputStream ois = null;
|
||||
try {
|
||||
//获取集合文件路径
|
||||
String oldDir = this.receiveMessage();
|
||||
this.sendMessage(SUCCESS);
|
||||
ois = new ObjectInputStream(in);
|
||||
List<String[]> fileList = (List<String[]>)ois.readObject();
|
||||
|
||||
//循环读取多个文件
|
||||
if(fileList != null && fileList.size()>0){
|
||||
for(String[] arr : fileList){
|
||||
|
||||
int fileLength = Integer.parseInt(arr[1]); //大小
|
||||
byte[] buff0 = new byte[fileLength];
|
||||
byte[] buff = new byte[BUFF_SIZE];
|
||||
int nRead = 0;
|
||||
int j = 0;
|
||||
|
||||
//单个文件循环读取
|
||||
rfile:while ((nRead = in.read(buff, 0, (int)(BUFF_SIZE<fileLength?BUFF_SIZE:fileLength))) > 0) {
|
||||
|
||||
//将数据存入集合
|
||||
for(int i = 0 ; i < nRead ; i++){
|
||||
buff0[j] = buff[i];
|
||||
j++;
|
||||
}
|
||||
|
||||
logger.debug(j+" "+buff0.length);
|
||||
|
||||
fileLength -= nRead;
|
||||
if(fileLength<=0){
|
||||
break rfile;
|
||||
}
|
||||
}
|
||||
|
||||
bsList.add(buff0);
|
||||
}
|
||||
}
|
||||
logger.debug("共接收 "+(fileList==null ? 0 : fileList.size())+ "个文件 存入内存");
|
||||
} catch (IOException e) {
|
||||
logger.error("",e);
|
||||
bsList.clear();
|
||||
} catch (ClassNotFoundException e) {
|
||||
logger.error("",e);
|
||||
bsList.clear();
|
||||
}
|
||||
return bsList;
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭通讯
|
||||
* @time Aug 28, 2011-8:35:21 PM
|
||||
*/
|
||||
protected void close(){
|
||||
try {
|
||||
if(in!=null){in.close();in=null;}
|
||||
if(out!=null){out.close();out=null;}
|
||||
if(socket!=null){socket.close();socket=null;}
|
||||
} catch (IOException e) {
|
||||
logger.error("",e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @time Mar 12, 2012-11:08:43 AM
|
||||
* @param url
|
||||
* @return -1文件不存在 0文件长度为0 N文件长度
|
||||
*/
|
||||
public static long getRemoteFileSize(String url) {
|
||||
|
||||
long size = -1;
|
||||
|
||||
try {
|
||||
HttpURLConnection conn = (HttpURLConnection) (new URL(url)).openConnection();
|
||||
|
||||
//请求状态 大于等于400 均为 represent access error
|
||||
if(conn.getResponseCode() >= 400){
|
||||
logger.error("HttpURLConnection Error Code:"+conn.getResponseCode());
|
||||
return -2;
|
||||
}
|
||||
|
||||
//获取ContentLength 并 关闭连接
|
||||
size = conn.getContentLength();
|
||||
conn.disconnect();
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("",e);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
/**
|
||||
* 上传文件时,判断该文件是否已存在,如存在,则在后面加入时间戳
|
||||
*
|
||||
* @param fileName
|
||||
* 单纯的文件名
|
||||
*/
|
||||
public static String addTimeTagForFileName(String fileName,boolean isDirectory) {
|
||||
|
||||
Calendar calendar = new GregorianCalendar();
|
||||
long timestamp = calendar.getTimeInMillis();
|
||||
|
||||
// 去掉后缀的文件名
|
||||
String fielType = "";
|
||||
|
||||
if (!isDirectory && fileName.lastIndexOf(".") != -1) {
|
||||
fielType = fileName.substring(fileName.lastIndexOf("."));
|
||||
fileName = fileName.substring(0, fileName.lastIndexOf("."));
|
||||
}
|
||||
|
||||
fileName += "_" + timestamp+""+((int)(Math.random()*1000));
|
||||
fileName += fielType;
|
||||
|
||||
return fileName;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取 断点续传 文件参数信息
|
||||
* @time Apr 17, 2012-9:56:56 AM
|
||||
* @param filePath
|
||||
* @return
|
||||
*/
|
||||
// public static FileComment getFileParams(String filePath) throws Exception{
|
||||
// File file = new File(filePath);
|
||||
// FileComment fileParam = new FileComment(file.getName(),file.length(),0l,file.exists()?MD5Util.getFileMD5String(file):null);
|
||||
// if(!file.exists()){
|
||||
// file = new File(filePath+TEMP_SUFFIX);
|
||||
// fileParam.setStart(file.length());
|
||||
// }
|
||||
// return fileParam;
|
||||
// }
|
||||
|
||||
public static void pl(Object object){
|
||||
System.out.println(object==null?null:object.toString());
|
||||
}
|
||||
}
|
||||
|
||||
class TempFile {
|
||||
private String fileName = null;
|
||||
private File File = null;
|
||||
public String getFileName() {
|
||||
return fileName;
|
||||
}
|
||||
public void setFileName(String fileName) {
|
||||
this.fileName = fileName;
|
||||
}
|
||||
public File getFile() {
|
||||
return File;
|
||||
}
|
||||
public void setFile(File file) {
|
||||
File = file;
|
||||
}
|
||||
}
|
||||
BIN
nms_sync/ssl/client_ks
Normal file
BIN
nms_sync/ssl/client_ks
Normal file
Binary file not shown.
BIN
nms_sync/ssl/client_ts
Normal file
BIN
nms_sync/ssl/client_ts
Normal file
Binary file not shown.
Reference in New Issue
Block a user