initial commit

This commit is contained in:
chenjinsong
2018-09-27 16:21:05 +08:00
commit dc91c4c987
2011 changed files with 408920 additions and 0 deletions

View File

@@ -0,0 +1,44 @@
package com.nms.thread;
import java.util.concurrent.Future;
import org.apache.log4j.Logger;
import com.nms.thread.common.ThreadPoolCommon;
import com.nms.thread.pool.ThreadPoolConfig;
/**
* DC心跳线程启动控制器
* @date Jan 21, 2013 4:28:00 PM
* @author ZhangGang
*
*/
public class DCHandShakeManagerThread implements Runnable{
private Logger logger = Logger.getLogger(DCHandShakeManagerThread.class);
private DCHandShakeThread handShakeThread = new DCHandShakeThread();
public void run() {
// Thread.currentThread().setName("DC心跳监测管理线程");
Thread.currentThread().setName("DC Heartbeat Monitoring Management Thread");
//将线程运行程序尽可能的catch捕获异常
try {
//- 检查线程运行状态 运行中无操作
Future<?> future = ThreadPoolCommon.threadManagerMap.get(ThreadPoolConfig.DC_HANDSHAKE);
if(future != null && !future.isCancelled() && !future.isDone()){ //运行中
logger.debug("DC心跳监测线程 运行中 不再启动");
return ;
}else{
future = ThreadPoolCommon.service.submit(handShakeThread);
ThreadPoolCommon.threadManagerMap.put(ThreadPoolConfig.DC_HANDSHAKE, future);
logger.debug("DC心跳监测线程 空闲中 再次启动");
}
} catch (Exception e) {
logger.error("",e);
}finally{
logger.info("执行结束");
}
}
}

View File

@@ -0,0 +1,506 @@
package com.nms.thread;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import nis.nms.domains.NmsErrorInfo;
import nis.nms.domains.ServerTable;
import nis.nms.util.ConnectionOracle;
import nis.nms.util.Constant;
import nis.nms.util.IpCovert;
import nis.nms.util.LocalAddress;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import com.nms.thread.common.ThreadPoolCommon;
import com.nms.thread.pool.ThreadConstants;
import com.nms.thread.socket.SSLClient;
/**
* DC握手状态及管理节点数据集合解析
* Map<dcip,Integer[]{state,nodeNum}>
* dcip DC通讯IP
* state DC状态值1 通讯正常、0通讯失败、-1Ping失败
* NodeNum 管理节点数量 0表示未分配节点其他表示已分配节点
*
* 业务操作:
* Web启动时查询数据库 对数据库信息进行统计(未完成)
* 对DC进行定期通讯握手测试最多socket尝试三次ping一次已完成
* 情景处理:(已完成)
* DC通讯成功【1】 比较集合中state值比较并变更 [1:变更为1不作操作][0:变更为1做系统告警恢复通讯正常][-1:变更为1做系统告警恢复通讯正常]
* DC通讯失败ping成功【0】 比较集合中state值并变更 [1:变更为0做系统告警异常通讯失败][0:不做操作][-1:变更为0做系统告警异常Ping恢复通讯失败]
* DC通讯失败ping失败【-1】 比较集合中state值并变更 [1:变更为-1做系统告警异常Ping失败][0:变更为-1做系统告警异常Ping失败][-1:不做操作]
* 告警信息及告警恢复入库
*
* @date Apr 17, 2013 10:32:39 AM
* @author ZhangGang
*
*/
public class DCHandShakeThread implements Callable<Object>{
private Logger logger = Logger.getLogger(DCHandShakeThread.class);
private Map<String,Long> DCIPIDMap = new HashMap<String,Long>();
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private List<NmsErrorInfo> errorInfoList = new LinkedList<NmsErrorInfo>();
@Override
public Object call() throws Exception {
Thread.currentThread().setName("DCHandShakeThread");
logger.debug("开始执行");
ConnectionOracle conn = null;
try {
conn = ConnectionOracle.getConnection();
/* 获取所有符合握手操作业务的DC
* 业务: 1、查询所有DC
* 2、对失效DC查询异常校验是否需要再次握手
* */
List<ServerTable> dclist = getHandShakeDCList(conn);
/* 循环DC并握手通讯 尝试次数3 间隔20秒 */
int totalCount = dclist.size();
for(int i=1; i<4; i++){
CountDownLatch downLatch = new CountDownLatch(dclist.size());
handShakeForDC(dclist,downLatch);
downLatch.await();
pl(""+i+"次 握手 有效数:"+(totalCount-dclist.size())+" 无效数:"+dclist.size());
//判断 最多尝试3次
if(dclist.size()>0){
Thread.sleep(20000);
}else{
break;
}
}
/* 剩余失败DC 进行Ping 握手 判断主机是否存在 */
CountDownLatch downLatch = new CountDownLatch(dclist.size());
pingHandshake(dclist,downLatch);
downLatch.await();
/* 异常DC告警分析 */
if(errorInfoList!= null && errorInfoList.size()>0){
String sql = "insert into nms_error_info (error_code,error_time,state_update_time,errort_getip,errort_ip,error_state,error_des) values (?,to_date(?,'yyyy-mm-dd hh24:mi:ss'),to_date(?,'yyyy-mm-dd hh24:mi:ss'),?,?,?,?)";
List<String[]> paramsList = new LinkedList<String[]>();
for(NmsErrorInfo nei : errorInfoList){
String[] params = new String[] {
nei.getErrorCode(),
format.format(nei.getErrorTime()),
format.format(nei.getStateUpdateTime()),
nei.getErrortGetip(),
nei.getErrortIp(),
(nei.getErrorState() == null ? 0 : nei.getErrorState().intValue())+ "",
nei.getErrorDesc()};
paramsList.add(params);
}
conn.dbUpdateByBatch(sql, paramsList);
}
// if(dclist!= null && dclist.size()>0){
// for(ServerTable st : dclist){
// Integer [] stData = ThreadPoolCommon.DCStatusMap.get(st.getServerIp());
/*
int newState =
* createErrorInfo(dcIp, newState, oldState);
errorInfoList.add();
Map<String, String> error = new HashMap<String, String>();
// error.put("ERROR_CODE", ""); //异常code非空
error.put("ERROR_TIME", format.format(Calendar.getInstance().getTime())); //异常产生时间:非空
error.put("ERRORT_GETIP", LocalAddress.getRealIp()); //异常信息提供IP非空
error.put("ERRORT_IP", st.getServerIp()); //异常信息产生IP非空
error.put("ERROR_STATE", "1"); //非空 异常状态0已解决1未解决
*/
//目标主机不存在
/*if(stData[0]== ThreadConstants.HAND_SHAKE_RESULT_PING_FAILED){ //ping Failed
error.put("ERROR_CODE", Constant.ERROR_NET_WORK_ERROR); //异常code非空
}
else if(stData[0]==null || stData[0]== ThreadConstants.HAND_SHAKE_RESULT_SOCKET_FAILED || stData[0]== ThreadConstants.HAND_SHAKE_RESULT_PING_SUCCESS){ //ping OK socket Down
error.put("ERROR_CODE", Constant.HANDSHAKE_ERROR); //异常code非空
}
else if(stData[0]== ThreadConstants.HAND_SHAKE_RESULT_SOCKET_SUCCESS){ //socket success正常
error.put("ERROR_CODE", Constant.); //异常code非空
}
else if(stData[0]== ThreadConstants.HAND_SHAKE_RESULT_UNKNOWN_FAILED){ //其他异常
error.put("ERROR_CODE", Constant.ERROR_UNKNOWN_ERROR); //异常code非空
}
conn.insertObj("nms_error_info", error);*/
// }
// }
//分析告警DC Down
// NmsErrorInfo info = new NmsErrorInfo(errorCode, errorTime,
// errortGetip, errortIp, errorState,
// stateUpdateTime, stateUpdateUserid);
/* 计算DC变更IP */
// updateIPSegment(dclist,conn);
/* 将变更后的信息重新发送到 */
} catch (Exception e) {
logger.error("", e);
}finally{
errorInfoList.clear();//清理集合数据
try {
if(conn != null){
conn.close();
}
} catch (Exception e2) {
}
}
return null;
}
public void pl(Object obj){
System.out.println(obj==null?null:obj.toString());
}
/**
* 查询符合握手操作的DCList
* 业务: 1、查询所有DC
* 2、对失效DC查询异常校验是否需要再次握手
* @time Jan 22, 2013-3:23:57 PM
* @param serverList
* @return
*/
public List<ServerTable> getHandShakeDCList(ConnectionOracle conn) throws Exception{
List<ServerTable> dclist = new ArrayList<ServerTable>();
String selectSql = "select st.id,st.server_ip,st.server_state from server_table st where st.server_state =0";
ArrayList<String> fields = new ArrayList<String>();
fields.add("id");
fields.add("server_ip");
fields.add("server_state");
ArrayList<Map<String, String>> mapsList = conn.dbSelect(selectSql,fields);
StringBuffer falseDCId = new StringBuffer("0");
if(mapsList!= null && mapsList.size()>0){
for(Map<String, String> map : mapsList){
ServerTable serverTable = new ServerTable();
serverTable.setId(StringUtils.isBlank(map.get("id"))?null:Long.parseLong(map.get("id")));
serverTable.setServerIp(map.get("server_ip"));
serverTable.setServerState(StringUtils.isBlank(map.get("server_state"))?null:Long.parseLong(map.get("server_state")));
dclist.add(serverTable);
if(serverTable.getServerState()==1l){//失效DC
falseDCId.append(","+serverTable.getId());
}
DCIPIDMap.put(serverTable.getServerIp(), serverTable.getId());
}
//查询校验失效DC 的异常信息,判断是否做再次通讯处理(待完善)
}
return dclist;
}
/**
*
* @time Jan 22, 2013-10:54:57 AM
* @param ipList
*/
public int handShakeForDC(List<ServerTable> serverList,CountDownLatch downLatch) {
if(serverList!= null && serverList.size()>0){
//单线程操作
Semaphore handShakeSemaphore = new Semaphore(10,true);
for(int i = 0;i<serverList.size();i++){
ServerTable st = serverList.get(i);
try {
handShakeSemaphore.acquire(); //申请监测信号
// pl("handshake for:"+st.getServerIp());
SSLClient client = new SSLClient(st.getServerIp(),SSLClient.HAND_SHAKE_DC,null);
//-- 启动新线程
final Future<?> future = ThreadPoolCommon.service.submit(client);
//-- 获取结果
Integer val = (Integer) future.get();
//-- 结果保存
Integer[] data = ThreadPoolCommon.DCStatusMap.get(st.getServerIp());
if(data==null){ //新DC第一次握手结果不做告警处理
data = new Integer[]{val,0};
}else{
//val值为null 或1 1为通讯正常作通讯恢复判断
if(val!=null && val.intValue()==1){
NmsErrorInfo nei = createErrorInfo(st.getServerIp(),val,data[0]);
if(nei!=null){
errorInfoList.add(nei);
}
data[0] = val;
}
}
ThreadPoolCommon.DCStatusMap.put(st.getServerIp(), data);
//-- 队列处理
if(val == ThreadConstants.HAND_SHAKE_RESULT_SOCKET_SUCCESS){
//删除该DC无需再次轮询
serverList.remove(i);
i--;
}
} catch (Exception e) {
logger.error("",e);
}finally{
handShakeSemaphore.release();
downLatch.countDown();
}
}
}
return 1;
}
private NmsErrorInfo createErrorInfo(String dcIp,Integer newState,Integer oldState) throws Exception{
if(dcIp==null
|| oldState == null
|| (newState!=null && newState.intValue() == oldState.intValue())){
return null;
}
NmsErrorInfo nei = new NmsErrorInfo();
nei.setErrorCode(Constant.HANDSHAKE_ERROR);
nei.setErrorTime(Calendar.getInstance().getTime());
nei.setStateUpdateTime(Calendar.getInstance().getTime());
//nei.setErrortGetip(LocalAddress.getRealIp());
nei.setErrortGetip(LocalAddress.getLocalIp());
nei.setErrortIp(dcIp);
nei.setErrorDesc("WEB与DC握手");
if(newState!=null && newState == 1){ //正常
if(oldState != 1){
nei.setErrorState(2l); //非空 异常状态0已解决1未解决2已恢复
}
}else{ //异常
if(oldState == 1){
nei.setErrorState(1l); //非空 异常状态0已解决1未解决2已恢复
}
}
return nei;
}
/**
* 根据比较新旧DC握手状态
* 获取告警描述信息
* @time Apr 17, 2013-10:38:15 AM
* @param newState
* @param oldState
* @return
*/
private String getErrorInfoDesc(int newState,int oldState){
if(newState == oldState){
return null;
}
// 情景处理:
// DC通讯成功【1】 比较集合中state值比较并变更 [1:变更为1不作操作][0:变更为1做系统告警恢复通讯恢复][-1:变更为1做系统告警恢复通讯恢复]
// DC通讯失败ping成功【0】 比较集合中state值并变更 [1:变更为0做系统告警异常Ping正常通讯失败][0:不做操作][-1:变更为0做系统告警异常Ping恢复通讯失败]
// DC通讯失败ping失败【-1】 比较集合中state值并变更 [1:变更为-1做系统告警异常Ping失败][0:变更为-1做系统告警异常Ping失败][-1:不做操作]
switch (newState) {
case -1:
return "Ping失败";
case 0:
if (oldState == 1)
return "Ping正常通讯失败";
else if(oldState ==-1)
return "Ping恢复通讯失败";
break;
case 1:
return "通讯恢复";
default:
return null;
}
return null;
}
public int pingHandshake(List<ServerTable> serverList,CountDownLatch downLatch) {
if(serverList!= null && serverList.size()>0){
//单线程操作
Semaphore handShakeSemaphore = new Semaphore(5,true);
for(int i = 0;i<serverList.size();i++){
ServerTable st = serverList.get(i);
try {
handShakeSemaphore.acquire(); //申请监测信号
pl("ping for:"+st.getServerIp());
PingThread pingThread = new PingThread(st.getServerIp());
//-- 启动新线程
final Future<?> future = ThreadPoolCommon.service.submit(pingThread);
//-- 获取结果
Integer val = (Integer) future.get();
//-- 结果处理
Integer[] data = ThreadPoolCommon.DCStatusMap.get(st.getServerIp());
if(data==null){
data = new Integer[]{val,0};
}else{
//val值为null 或3 或4 3为Ping正常通讯失败null 或 4为通讯失败 对节点做最终状态处理
val = (val==null || val.intValue()==ThreadConstants.HAND_SHAKE_RESULT_PING_FAILED)?-1:0;
if(val!=null && data[0]!=null){
NmsErrorInfo nei = createErrorInfo(st.getServerIp(),val,data[0]);
if(nei!=null){
errorInfoList.add(nei);
}
}
data[0] = val;
}
ThreadPoolCommon.DCStatusMap.put(st.getServerIp(), data);
//-- 队列处理
/*if(val == ThreadConstants.HAND_SHAKE_RESULT_PING_SUCCESS){
//删除该DC无需再次轮询
serverList.remove(i);
i--;
}*/
} catch (Exception e) {
logger.error("",e);
}finally{
handShakeSemaphore.release();
downLatch.countDown();
}
}
}
return 1;
}
/**
* 更新各DC管理的IP段
* @time Jan 22, 2013-10:54:57 AM
* @param ipList
*/
public int updateIPSegment(List<ServerTable> errorDCList,ConnectionOracle conn) throws Exception{
//--查询节点类型
String selectSql = "select distinct nt.node_type from node_table nt where nt.node_type is not null";
ArrayList<String> fields = new ArrayList<String>();
fields.add("node_type");
ArrayList<Map<String, String>> nTypeMapsList = conn.dbSelect(selectSql,fields);
StringBuffer falseDCId = new StringBuffer("0");
LinkedList<String> nTypeList = new LinkedList<String>();
if(nTypeMapsList!= null && nTypeMapsList.size()>0){
for(Map<String, String> map : nTypeMapsList){
nTypeList.add(map.get("node_type"));
}
}
//--统计DC状态
int OKCount = 0; //有效数
List<String> OKDCIpList = new ArrayList<String>(); //有效Ip集合
Iterator<Entry<String, Integer[]>> ite = ThreadPoolCommon.DCStatusMap.entrySet().iterator();
while (ite.hasNext()) {
Map.Entry<java.lang.String,Integer[]> entry = (Map.Entry<java.lang.String,Integer[]>) ite.next();
if(entry.getValue()[0] == 1){ //有效
OKCount++;
OKDCIpList.add(entry.getKey());
}
}
//--正序排序
Collections.sort(OKDCIpList,new Comparator(){
@Override
public int compare(Object o1, Object o2) {
return o1.toString().compareTo(o2.toString());
}
});
//展示
for(String ip :OKDCIpList){
pl("ip>"+ip);
}
//--查询有效节点,条件节点类型,节点IP有小到大排序
if(nTypeList.size()>0){
for(String nType : nTypeList){
String nSql = "select distinct nt.ipn from node_table nt where nt.node_state=0 and nt.node_type=? order by nt.ipn asc";
ArrayList<String> ipnList = conn.dbSelectSingleColumn(nSql,nType);
int segmentSize = ipnList.size()%OKCount==0?ipnList.size()%OKCount:(ipnList.size()%OKCount)+1;
//-- 分配操作
long minIpn=0,maxipn=0,segmentCount=0;
int i = 0;
for(String dcIp : OKDCIpList){
ipnF:for(;i<ipnList.size();i++){
long ipn = Long.parseLong(ipnList.get(i));
//最小值
if(minIpn == 0){
minIpn = ipn;
}
//最大值
if(maxipn<ipn){
maxipn = ipn;
}
segmentCount++;
//跳出判断
if(segmentCount == segmentSize){
minIpn = 0;
maxipn = 0;
segmentCount = 0;
break ipnF;
}
}
//更新或保存
saveOrUpdateIPSegment(conn,DCIPIDMap.get(dcIp),dcIp,nType,minIpn,maxipn,segmentCount);
}
}
}
return 1;
}
/**
* 插入或保存IPSegment操作
* @time Jan 22, 2013-3:49:45 PM
* @param conn
* @param dcIp
* @param nType
* @param minIpn
* @param maxipn
* @param segmentCount
*/
public void saveOrUpdateIPSegment(ConnectionOracle conn,Long dcId,String dcIp,String nType,Long minIpn,Long maxipn,Long segmentCount) throws Exception{
//查询是否存在
// pl(dcId);
String selectSql = "select sis.id from server_ip_segment sis where sis.SERVER_ID=? and sis.node_type=?";
ArrayList<String> fields = new ArrayList<String>();
fields.add("node_type");
ArrayList<String> sisIdList = conn.dbSelectSingleColumn(selectSql,dcId,nType);
if(sisIdList!= null && sisIdList.size()>0){ //update
conn.dbUpdate("update server_ip_segment set start_ip=? ,start_ipn=?,end_ip=?,end_ipn=?,node_count=? where id=? ", IpCovert.longToIP(minIpn),minIpn,IpCovert.longToIP(maxipn),maxipn,segmentCount,sisIdList.get(0));
}else{ //insert
Map<String, String> obj = new HashMap<String, String>();
obj.put("SERVER_ID", dcId+"");
obj.put("start_ip", IpCovert.longToIP(minIpn));
obj.put("start_ipn", minIpn+"");
obj.put("end_ip", IpCovert.longToIP(maxipn));
obj.put("end_ipn", maxipn+"");
obj.put("segment_state", "1");
obj.put("node_type", nType+"");
obj.put("node_count", segmentCount+"");
conn.insertObj("server_ip_segment", obj);
}
}
/**
* @time Jan 21, 2013-4:27:33 PM
* @param args
*/
public static void main(String[] args) {
ThreadPoolCommon.scheduled.submit(new DCHandShakeThread());
// ThreadPoolCommon.scheduled.scheduleAtFixedRate(new DCHandShakeManagerThread(),1,10,TimeUnit.SECONDS);
}
}

View File

@@ -0,0 +1,109 @@
package com.nms.thread;
import java.io.FileInputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import com.nms.thread.service.ThreadService;
import nis.nms.bean.SetInfo;
import nis.nms.util.ConnectionOracle;
public class DetectDatasTimeoutAlarmThread implements Runnable{
private Logger logger = Logger.getLogger(DetectDatasTimeoutAlarmThread.class);
private Date startTime = Calendar.getInstance().getTime();
@Override
public void run() {
// Thread.currentThread().setName("监测数据超时告警线程");
Thread.currentThread().setName("Monitoring Data Timeout Alarm Thread");
ConnectionOracle conn = null;
try {
conn = ConnectionOracle.getConnection();
ThreadService service = new ThreadService(conn);
//先检查握手监测是否正常,若正常再检查其他监测,若超周期无数据,则无需检查其他监测(因为握手如果异常,其他监测也会异常)
FileInputStream inStream = null;
URL url2 = ThreadService.class.getResource("/myconfig.properties");
Properties properties = new Properties();
inStream = new FileInputStream(url2.getPath().replace("%20", " "));
properties.load(inStream);
String checkTypeName = properties.getProperty("detec.nmsclient.str");
// String checkTypeName = Constants.NMS_CLIENT_CHECKTYPENAME;
if(StringUtils.isEmpty(checkTypeName)){
checkTypeName = "NMSClient";
}
SetInfo nmsClientSet = service.getSetInfoByCheckName(1,checkTypeName); //查询握手监测相关信息
List<String[]> alarm = new ArrayList<String[]>();
List<String []> nmsClientAlarmList = service.detectDatasTimeoutCheck(nmsClientSet,startTime,null);//监测当前监测是否超时无数据
String errorSeqIds = "-1";//握手监测异常的节点
if(nmsClientAlarmList != null && nmsClientAlarmList.size()>0){//握手监测异常:有异常的节点(此次检查产生超周期告警信息)
alarm.addAll(nmsClientAlarmList);
for(String [] datas: nmsClientAlarmList){
try {
//service.resoveAlarms(datas);
if(StringUtils.isNotBlank(datas[1])) {//seqId和ip不为空
errorSeqIds = errorSeqIds + "," + datas[1];
}
logger.info("握手监测超时告警:"+datas[9]);
} catch (Exception e) {
logger.error("Parsing handshake monitoring timeout anomaly information ", e);
}
}
}else {
logger.info("当前时间 所有节点握手监测均正常");
}
// //握手监测正常,查询握手最新时间的监测是否有异常的
List<String> seqIds = new ArrayList<String>();
seqIds = service.searchTimeoutNmsClient(nmsClientSet);
for(String seqId:seqIds) {
if(StringUtils.isNotBlank(seqId) && !(","+errorSeqIds+",").contains((","+seqId+","))) {
errorSeqIds = errorSeqIds + "," + seqId;
}
}
logger.info("超时无握手监测信息节点的seqId"+errorSeqIds);
List<SetInfo> setInfoList = service.getAllSetInfo(1,null); //查询有效SetInfo信息
for(SetInfo setInfo : setInfoList){
List<String []> alarmInfoList = service.detectDatasTimeoutCheck(setInfo,startTime,errorSeqIds);//监测当前监测是否超时无数据
if(alarmInfoList != null && alarmInfoList.size()>0){
alarm.addAll(alarmInfoList);
/*for(String [] datas: alarmInfoList){
try {
// System.out.println("--"+JSONArray.fromObject(datas));
service.resoveAlarms(datas);
} catch (Exception e) {
logger.error("Parsing the abnormity of the timeout anomaly information of the monitoring data", e);
}
}*/
}
}
int size = alarm.size();
logger.debug("监测超时告警:共 " + size +" 条,开始批量入库");
boolean r = service.resoveAlarmsBatch(alarm);
logger.debug("监测告警批量入库 -> "+ (r?"成功":"失败"));
} catch (Exception e) {
logger.error("Monitoring data timeout anomaly",e);
}finally{
try {
if(conn != null){
conn.close();
}
} catch (Exception e2) {
}
}
}
public static void main(String [] args){
new Thread(new DetectDatasTimeoutAlarmThread()).start();
}
}

View File

@@ -0,0 +1,112 @@
package com.nms.thread;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import com.nms.thread.common.ThreadPoolCommon;
import com.nms.thread.pool.ThreadPoolConfig;
import com.nms.thread.socket.SSLClient;
import nis.nms.util.BaseAction;
/**
* 监测数据入库 控制 线程主动与dc 通信通知dc 入库监测数据
* @author dell
*
*/
public class DetectInsertThread implements Runnable{
private static final Logger logger = Logger.getLogger(DetectInsertThread.class);
public static boolean flag = true;//当前时间是否可以入库
public static String currentDc;//当前入库的dc
private static Semaphore insertSem = new Semaphore(1);
//通知dc 入库,超过 此值 继续下一个 dc 入库,同时 尝试通知 超时的dc 暂停 入库单位s
private static long timeout = Integer.parseInt(BaseAction.rb.getString("detect.insert.timeout"));//入库超时时间
/**
* 通知超时时间,3s
*/
public static long INSERT_DETECT_SSL_TIMEOUT = 3000l;
/**
* 释放锁
*/
public static void release(){
DetectInsertTimeOutThread.stop();
insertSem.release();
currentDc = null;
logger.debug("监测入库 release");
}
@Override
public void run() {
Thread.currentThread().setName("DC监测数据入库控制线程");
long start = System.currentTimeMillis();
long now = start;
while(true){
now = System.currentTimeMillis();
if((now - start) > ThreadPoolConfig.DETECT_INSERT_CONTROL_PERIOD*1000){
start = now;
Set<Entry<String, Integer[]>> es = ThreadPoolCommon.DCStatusMap.entrySet();
for(Entry<String, Integer[]> en : es){
String ip = null;
try {
ip = en.getKey();
Integer[] value = en.getValue();
Integer status = value[0];
logger.debug(ip +"状态:"+ status);
if(status == null || status != 1){
logger.error(ip + " 通信异常,不做监测入库通信");
continue;
}
insertSem.acquire();
currentDc = ip;
SSLClient client = new SSLClient(ip, SSLClient.DC_PORT, SSLClient.INSERT_DETECT);
Future<Object> future = ThreadPoolCommon.service.submit(client);
Object r = future.get(INSERT_DETECT_SSL_TIMEOUT, TimeUnit.MILLISECONDS);
Integer result = Integer.valueOf(r+"");
logger.debug(ip+ " 通知下发结果 " + result);
if(result.intValue() > 0){
//启动超时判断线程
ThreadPoolCommon.service.submit(new DetectInsertTimeOutThread(ip, SSLClient.DC_PORT, timeout));
logger.info("入库超时监控线程创建成功");
}else if(result.intValue() ==0){
insertSem.release();//释放锁
logger.debug(ip + "监测数据为 0");
}else if(result.intValue() == -1){
insertSem.release();//释放锁
logger.debug(ip + "通知下发响应异常");
}else if(result.intValue() == -2){//
insertSem.release();//释放锁
logger.debug(ip + " 监测数据上传web");
}else if(result.intValue() == -4){
insertSem.release();//释放锁
logger.debug(ip + " DC 主动入库");
}else{
insertSem.release();//释放锁
logger.debug("未知响应");
}
} catch (Exception e) {
logger.error(ip,e);
}
}
}else{
try {
Thread.sleep(now -start);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 获取当前入库的dc
* @return
*/
public static String getCurrentDc(){
return currentDc;
}
}

View File

@@ -0,0 +1,48 @@
package com.nms.thread;
import org.apache.log4j.Logger;
import com.nms.thread.common.ThreadPoolCommon;
import com.nms.thread.socket.SSLClient;
public class DetectInsertTimeOutThread implements Runnable{
private static final Logger logger = Logger.getLogger(DetectDatasTimeoutAlarmThread.class);
private String ip;
private int port;
private long timeout;
private long start = System.currentTimeMillis();
private static boolean stop = false;
public DetectInsertTimeOutThread(String ip ,int port,long timeout) {
this.ip = ip;
this.port = port;
this.timeout = timeout*1000;
stop = false;
}
@Override
public void run() {
Thread.currentThread().setName(ip + "-监测数据入库超时监控线程-"+ System.currentTimeMillis());
logger.debug("线程开始");
try {
long now;
while(!stop){
now = System.currentTimeMillis();
if(now - start > timeout){
SSLClient sc = new SSLClient(ip, port, SSLClient.RELEASE_DETECT);
ThreadPoolCommon.service.submit(sc);
DetectInsertThread.release();
stop = true;
logger.warn("监测数据入库超时释放锁");
}
}
} catch (Exception e) {
logger.error("",e);
DetectInsertThread.release();
}
logger.debug("线程结束");
}
public static void stop() {
stop = true;
}
}

View File

@@ -0,0 +1,128 @@
package com.nms.thread;
import java.util.List;
import java.util.ResourceBundle;
import java.util.concurrent.Future;
import nis.nms.util.BaseAction;
import nis.nms.util.ConnectionOracle;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import com.nms.thread.common.ThreadPoolCommon;
import com.nms.thread.pool.ThreadPoolConfig;
/**
*
* 邮件发送管理器
* @date Mar 29, 2013 10:37:27 AM
* @author ZhangGang
*
*/
public class MailingManagerThread implements Runnable{
private Logger logger = Logger.getLogger(MailingManagerThread.class);
private boolean emailConfigSuccess = true;
private boolean dbErrorInfoFlag = true;
private boolean dbInitFlag = true;
public void run() {
//将线程运行程序尽可能的catch捕获异常
// Thread.currentThread().setName("邮件发送管理线程");
Thread.currentThread().setName("Mail Send Management Thread");
ConnectionOracle dao = null;
try {
dao = ConnectionOracle.getConnection();
List<String> flaglist = dao.dbSelectSingleColumn("select t.type_state from type_table t where t.type_identity='emailflag'");//type_state:0启用1停用
if(flaglist!=null&&flaglist.size()>0){//未配置emailflag则默认为发送邮件
String flag = flaglist.get(0);
if(!"0".equals(flag)){
logger.info("邮件功能未开启");
return;
}
}
ResourceBundle rb = BaseAction.rb;
String address = rb.getString("email.address");
String userName = rb.getString("email.userName");
String password = rb.getString("email.password");
String host = rb.getString("email.host");
String errorInfo = null;
if(StringUtils.isBlank(address)){
emailConfigSuccess = false;
// errorInfo += "缺少参数 email.address;";
// logger.error("邮件功能已开启但缺少参数 email.address");
errorInfo += "Lack of parameters email.address;";
logger.error("The Email function has been opened,but the parameter email.address is missing");
}
if(StringUtils.isBlank(userName)){
emailConfigSuccess = false;
// errorInfo += "缺少参数 email.userName;";
// logger.error("邮件功能已开启但缺少参数 email.userName");
errorInfo += "Lack of parameters email.userName;";
logger.error("The Email function has been opened,but the parameter email.userName is missing");
}
if(StringUtils.isBlank(password)){
emailConfigSuccess = false;
// errorInfo += "缺少参数 email.password;";
// logger.error("邮件功能已开启但缺少参数 email.password");
errorInfo += "Lack of parameters email.password;";
logger.error("The Email function has been opened,but the parameter email.password is missing");
}
if(StringUtils.isBlank(host)){
emailConfigSuccess = false;
// errorInfo += "缺少参数 email.host;";
// logger.error("邮件功能已开启但缺少参数 email.host");
errorInfo += "Lack of parameters email.host;";
logger.error("The Email function has been opened,but the parameter email.host is missing");
}
if(!emailConfigSuccess){
logger.error("Mail function can not be started");
return;
}
//- 检查线程运行状态 运行中无操作
Future<?> future = ThreadPoolCommon.threadManagerMap.get(ThreadPoolConfig.MAILING_THREAD);
if(future != null && !future.isCancelled() && !future.isDone()){ //运行中
logger.info("邮件发送线程 运行中 不再启动新线程");
}
//- 非升级操作判断
logger.info("邮件发送线程 空闲中 启动新解析线程");
//-- 获取线程执行 需进行主动告警和邮件通知等相关操作,待考虑
future = ThreadPoolCommon.service.submit(new MailingThread());
//注册
ThreadPoolCommon.threadManagerMap.put(ThreadPoolConfig.MAILING_THREAD, future);
logger.debug("执行结束");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if(dao != null){
dao.close();
}
} catch (Exception e2) {
}
}
}
public static void main(String [] args) {
Thread thread = new Thread(new MailingManagerThread());
thread.start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.exit(0);
}
}

View File

@@ -0,0 +1,141 @@
package com.nms.thread;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import nis.nms.bean.EmailInfo;
import nis.nms.util.ConnectionOracle;
import org.apache.log4j.Logger;
import com.nms.thread.common.ThreadPoolCommon;
import com.nms.thread.service.EmailService;
/**
* CSV解析线程
*
* @author ZGGG3
*
*/
public class MailingThread implements Callable<Object> {
Logger logger = Logger.getLogger(MailingThread.class);
volatile boolean stop = false;// 线程是否被取消标志
private static long startTime;
Long waitTime = 30 * 60 * 1000l;
// 初始化starttime 在重启服务器的时候
static {
ConnectionOracle dao = null;
try {
dao = ConnectionOracle.getConnection();
List<String> emaillist = dao.dbSelectSingleColumn(
"select to_char(create_time,'yyyy-mm-dd hh24:mi:ss') create_time from email_table where send_flag='0' and send_level='1' order by create_time asc");
if (emaillist != null && emaillist.size() > 0) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if (emaillist.get(0) != null && !emaillist.get(0).equals("")) {
Date d = sdf.parse(emaillist.get(0));
startTime = d.getTime();
} else {
startTime = System.currentTimeMillis();
}
} else {
startTime = System.currentTimeMillis();
}
System.out.println("====static===,startTime=" + startTime);
System.out.println("====static===,startTime=" + new Date(startTime));
} catch (Exception e) {
e.printStackTrace();
}finally{
try {
if(dao != null){
dao.close();
}
} catch (Exception e2) {
}
}
}
/*
* 线程操作
*
* 依次解析urlList中仍存在的Files
*
* 实现了依次解析Files由于后期实现多线程解析操作
*/
public Object call() {
// 为当前线程命名 ,用与开发阶段友好输出。
// Thread.currentThread().setName("邮件发送线程");
Thread.currentThread().setName("Mail Sending Thread");
logger.info("**************startTime=" + startTime + "startTimeStr=" + new Date(startTime) + ",waitTime=" + waitTime);
ConnectionOracle dao = null;
try {
dao = ConnectionOracle.getConnection();
boolean sendflag = true;// 发送标识 若非紧急状态下延时时间为-1 则不发送非紧急邮件
List<String> timelist = dao.dbSelectSingleColumn(
"select t.delay_time from option_table t where t.type_identity='emergent' and t.type_code='1'");
if (timelist != null && timelist.size() > 0) {
logger.info("delayTime:" + timelist.get(0));
if (timelist.get(0).equals("-1")) {
sendflag = false;
}
try {
waitTime = Long.parseLong(timelist.get(0)) * 60 * 1000;
} catch (Exception e) {
logger.error("Non emergency time setting error", e);
}
}
EmailService service = new EmailService(dao);
boolean flag = false;// 时间标识 若等待时间大于等于设定周期 则发送非紧急邮件 否则不发送
System.out.println("当前时间:" + new Date() + ",=" + System.currentTimeMillis());
System.out.println("startTime=" + startTime);
System.out.println("waitTime=" + waitTime);
System.out.println("startTimeStr=" + new Date(startTime));
System.out.println("startTimeWaitTimeStr=" + new Date(startTime + waitTime));
logger.info("当前时间:" + new Date() + ",=" + System.currentTimeMillis());
logger.info("startTime=" + startTime);
logger.info("waitTime=" + waitTime);
logger.info("startTimeStr=" + new Date(startTime));
logger.info("startTimeWaitTimeStr=" + new Date(startTime + waitTime));
if (startTime + waitTime < System.currentTimeMillis()) {
flag = true;
startTime = startTime + waitTime;// 每个周期结束后重新设定起始时间,起始时间增加值为设定周期时间
logger.info(new Date(startTime));
}
logger.info("----------" + new Date(startTime) + "延时设置:" + sendflag + ",到时,发送非紧急邮件:" + flag);
// 发送标识和时间标识皆为true时 系统发送非紧急邮件 (注:预防设定非紧急状态下的延时时间为-1时
// 系统按照默认30分钟发送非紧急邮件
List<EmailInfo> eiList = service.getEmailInfoList(sendflag, flag);
// -- 空数据集合 结束操作
if (eiList == null || eiList.size() == 0) {
return null;
}
long sTime = System.currentTimeMillis();
service.sendEmailNew(eiList);
long eTime = System.currentTimeMillis();
logger.debug("本次邮件发送耗时: " + (sTime - eTime) + " ms");
} catch (Exception e) {
logger.error("Running exception", e);
} finally {
try {
if(dao != null){
dao.close();
}
} catch (Exception e2) {
}
logger.debug("线程结束");
}
return null;
}
public static void main(String[] args) {
ThreadPoolCommon.service.submit(new MailingThread());
}
}

View File

@@ -0,0 +1,42 @@
package com.nms.thread;
import java.util.ArrayList;
import java.util.Date;
import java.util.Map;
import nis.nms.util.BaseAction;
import nis.nms.util.ConnectionOracle;
import org.apache.log4j.Logger;
import com.nms.thread.service.NmsReportService;
public class NmsPortThread implements Runnable {
private Logger logger = Logger.getLogger(NmsPortThread.class);
@Override
public void run() {
Date now = new Date();
Long nowLong = now.getTime();
Integer interval = null;
try {
interval = Integer.parseInt(BaseAction.rb.getString("nms.report.interval"));
} catch (Exception e) {
interval = 300;
}
ConnectionOracle connection = null;
try {
connection = ConnectionOracle.getConnection();
NmsReportService service = new NmsReportService(connection);
//ArrayList<Map<String, String>> nmsRuleInfo = service.getNmsPortInfo(nowLong, nowLong-interval);
} catch (Exception e) {
logger.error(e);
} finally {
if (connection != null) {
connection.close();
}
}
}
}

View File

@@ -0,0 +1,43 @@
package com.nms.thread;
import java.util.ArrayList;
import java.util.Date;
import java.util.Map;
import nis.nms.util.BaseAction;
import nis.nms.util.ConnectionOracle;
import nis.nms.util.DateUtil;
import org.apache.log4j.Logger;
import com.nms.thread.service.NmsReportService;
public class NmsRuleThread implements Runnable {
private Logger logger = Logger.getLogger(NmsRuleThread.class);
@Override
public void run() {
Date now = new Date();
Long nowLong = now.getTime();
Integer interval = null;
try {
interval = Integer.parseInt(BaseAction.rb.getString("nms.report.interval"));
} catch (Exception e) {
interval = 300;
}
ConnectionOracle connection = null;
try {
connection = ConnectionOracle.getConnection();
NmsReportService service = new NmsReportService(connection);
//ArrayList<Map<String, String>> nmsRuleInfo = service.getNmsRuleInfo(nowLong, nowLong-interval);
} catch (Exception e) {
logger.error(e);
} finally {
if (connection != null) {
connection.close();
}
}
}
}

View File

@@ -0,0 +1,104 @@
package com.nms.thread;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import com.nis.util.StringUtil;
import com.nms.thread.service.NmsReportService;
import net.sf.json.JSONObject;
import nis.nms.util.BaseAction;
import nis.nms.util.ConnectionOracle;
import nis.nms.util.DateUtil;
import nis.nms.util.HttpClientUtil;
public class NmsStatusThread implements Runnable {
private Logger logger = Logger.getLogger(NmsStatusThread.class);
@Override
public void run() {
String now = DateUtil.format.format(new Date());
List<Map> results = new ArrayList<Map>();
String setId = null;
try {
setId = BaseAction.rb.getString("nms.status.setId");
} catch (Exception e) {
setId = "7";
}
ConnectionOracle connection = null;
try {
connection = ConnectionOracle.getConnection();
NmsReportService service = new NmsReportService(connection);
ArrayList<Map<String, String>> nmsReportInfo = service.getNmsStatusInfo(setId);
Map<String, List<Map<String, String>>> tmp = new HashMap<String, List<Map<String, String>>>();
tmp.put("unknown", new ArrayList<Map<String, String>>());
for (Map<String, String> info : nmsReportInfo) {
if (!StringUtil.isBlank(info.get("system_name"))) {
if (tmp.containsKey(info.get("system_name"))) {
tmp.get(info.get("system_name")).add(info);
} else {
List<Map<String, String>> l = new ArrayList<Map<String, String>>();
l.add(info);
tmp.put(info.get("system_name"), l);
}
} else {
tmp.get("unknown").add(info);
}
}
for (String area : tmp.keySet()) {
List<Map<String, String>> l = tmp.get(area);
if (l.size() > 0) {
Map result = new HashMap();
result.put("area", area);
result.put("commitTime", now);
result.put("total", l.size());
List<Map<String, String>> abnormalList = new ArrayList<Map<String, String>>();
int normal = 0;
for (Map<String, String> m : l) {
if ("1".equals(m.get("detectioned_state"))) {
normal++;
} else {
Map<String, String> abnormal = new HashMap<String, String>();
abnormal.put("hostName", m.get("host_name"));
abnormal.put("ip", m.get("node_ip"));
abnormalList.add(abnormal);
}
}
result.put("normal", normal);
result.put("abnormal", l.size()-normal);
if (abnormalList.size() > 0) {
result.put("abnormalMachineList", abnormalList);
}
results.add(result);
}
}
Map<String, List<Map>> map = new HashMap<String, List<Map>>();
map.put("trafficNmsServerList", results);
HttpClientUtil httpUtil = new HttpClientUtil();
JSONObject fromObject = JSONObject.fromObject(map);
httpUtil.post(BaseAction.rb.getString("nms.status.url"), fromObject.toString());
} catch (Exception e) {
logger.error(e);
} finally {
if (connection != null) {
connection.close();
}
}
}
}

View File

@@ -0,0 +1,79 @@
package com.nms.thread;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.Callable;
import org.apache.log4j.Logger;
import com.nms.thread.pool.ThreadConstants;
public class PingThread implements Callable<Object> {
private Logger logger = Logger.getLogger(DCHandShakeThread.class);
private String ip;
public PingThread(String ip){
this.ip = ip;
}
@Override
public Object call() throws Exception {
String command = ""; // 命令语句
int snum = 0, fnum = 0; // 发包成功和失败数
Process process = null;
BufferedReader in = null; // 读取 Ping命令返回的信息
try {
// 判断系统类型 win or Linux
String system = (String) (System.getProperty("os.name")).toLowerCase();
if (system.toLowerCase().indexOf("win") != -1) {
command += "ping -n 4 " + ip;
} else if (system.toLowerCase().indexOf("linux") != -1) {
command += "ping -c 4 " + ip;
} else {
command += "ping -w 4 " + ip;
}
process = Runtime.getRuntime().exec(command);
in = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line = null;
long count = 4 + 10;
// 最多多读10行
while ((line = in.readLine()) != null && count != 0) {
if ("".equals(line)) {
continue;
} // 空串跳过
line = line.toLowerCase();
logger.debug("line:"+line);
if (line.indexOf("ttl") > 0) { // 获得成功响应的数据
count--; // 计数器自减1
snum++; // 成功接收次数加1
} else if (line.split(" ").length < 4) {
count--; // 计数器自减1
fnum++; // 失败接收次数加1
}
}
} catch (Exception e) {
logger.error("",e);
}finally{
if(in!=null){try {
in.close();
} catch (IOException e) {
logger.error("",e);
}}
if(process!= null)process.destroy();
process = null;
}
if(snum>0){
return ThreadConstants.HAND_SHAKE_RESULT_PING_SUCCESS;
}else{
return ThreadConstants.HAND_SHAKE_RESULT_PING_FAILED;
}
}
}

View File

@@ -0,0 +1,44 @@
package com.nms.thread.common;
public class EmailTypeConstants {
// private static final Object[][] type =new Object[][]{
// {10,"监测信息恢复"},
// {11,"监测信息异常"},
// {12,"监测信息超时"},
// {20,"主动告警异常"},
// {21,"主动告警恢复"},
// {31,"节点结果失败"},
// {32,"任务状态变更"},
// {40,"系统运行异常"},
// {41,"系统运行恢复"},
// };
public static final int FLAG_SEND_LATER = 0;
public static final int FLAG_SEND_ALLREADY = 1;
public static final int FLAG_SEND_IMMEDIATELY = 2;
public static final int URGENT_IMMEDIATELY = 0;
public static final int URGENT_LATER = 1;
public static final int TYPE_DETECTION_INFO_RECOVER = 10;
public static final String DESC_DETECTION_INFO_RECOVER = "i18n_EmailTypeConstants.content.DESC_DETECTION_INFO_RECOVER_n81i";
public static final int TYPE_DETECTION_INFO_EXCEPTION = 11;
public static final String DESC_DETECTION_INFO_EXCEPTION = "i18n_EmailTypeConstants.content.DESC_DETECTION_INFO_EXCEPTION_n81i";
public static final int TYPE_DETECTION_INFO_TIMEOUT = 12;
public static final String DESC_DETECTION_INFO_TIMEOUT = "i18n_EmailTypeConstants.content.DESC_DETECTION_INFO_TIMEOUT_n81i";
public static final int TYPE_ALARM_INFO_EXCEPTION = 20;
public static final String DESC_ALARM_INFO_EXCEPTION = "i18n_EmailTypeConstants.content.DESC_ALARM_INFO_EXCEPTION_n81i";
public static final int TYPE_ALARM_INFO_RECOVER = 21;
public static final String DESC_ALARM_INFO_RECOVER = "i18n_EmailTypeConstants.content.DESC_ALARM_INFO_RECOVER_n81i";
public static final int TYPE_TASK_NODE_RESULT_ERROR = 31;
public static final String DESC_TASK_NODE_RESULT_ERROR = "i18n_EmailTypeConstants.content.DESC_TASK_NODE_RESULT_ERROR_n81i";
public static final int TYPE_TASK_STATE_CHANGE = 32;
public static final String DESC_TASK_STATE_CHANGE = "i18n_EmailTypeConstants.content.DESC_TASK_STATE_CHANGE_n81i";
public static final int TYPE_SYSTEM_RUNNING_EXCEPTION = 40;
public static final String DESC_SYSTEM_RUNNING_EXCEPTION = "i18n_EmailTypeConstants.content.DESC_SYSTEM_RUNNING_EXCEPTION_n81i";
public static final int TYPE_SYSTEM_RUNNING_RECOVER = 41;
public static final String DESC_SYSTEM_RUNNING_RECOVER = "i18n_EmailTypeConstants.content.DESC_SYSTEM_RUNNING_RECOVER_n81i";
}

View File

@@ -0,0 +1,47 @@
package com.nms.thread.common;
import java.util.ListResourceBundle;
public class TaskResources extends ListResourceBundle{
static final Object[][] contents = new String[][]{
// { "ms_1", "已创建" },
// { "ms_2", "进行中" },
// { "ms_3", "已完成" },
// { "ms_30", "全部成功" },
// { "ms_31", "全部失败" },
// { "ms_32", "部分成功" },
// { "ms_4", "未能执行" },
// { "ms_5", "撤销准备" },
// { "ms_6", "撤销开始" },
// { "ms_7", "撤销完成" },
// { "mt_1", "推送文件" },
// { "mt_4", "命令执行" },
// { "mt_6", "升级部署" },
// { "loop_0", "非周期任务" },
// { "loop_1", "周期任务" },
// { "ec_1", "任务开始下发" },
// { "ec_4", "任务开始下发" },
// { "ec_6", "任务开始下发"}};
{ "ms_1", "i18n_TaskResources.contents.ms_1_n81i" },
{ "ms_2", "i18n_TaskResources.contents.ms_2_n81i" },
{ "ms_3", "i18n_TaskResources.contents.ms_3_n81i" },
{ "ms_30", "i18n_TaskResources.contents.ms_30_n81i" },
{ "ms_31", "i18n_TaskResources.contents.ms_31_n81i" },
{ "ms_32", "i18n_TaskResources.contents.ms_32_n81i" },
{ "ms_4", "i18n_TaskResources.contents.ms_4_n81i" },
{ "ms_5", "i18n_TaskResources.contents.ms_5_n81i" },
{ "ms_6", "i18n_TaskResources.contents.ms_6_n81i" },
{ "ms_7", "i18n_TaskResources.contents.ms_7_n81i" },
{ "mt_1", "i18n_TaskResources.contents.mt_1_n81i" },
{ "mt_4", "i18n_TaskResources.contents.mt_4_n81i" },
{ "mt_6", "i18n_TaskResources.contents.mt_6_n81i" },
{ "loop_0", "i18n_TaskResources.contents.loop_0_n81i" },
{ "loop_1", "i18n_TaskResources.contents.loop_1_n81i" },
{ "ec_1", "i18n_TaskResources.contents.ec_1_n81i" },
{ "ec_4", "i18n_TaskResources.contents.ec_4_n81i" },
{ "ec_6", "i18n_TaskResources.contents.ec_6_n81i"}};
public Object[][] getContents() {
return contents;
}
}

View File

@@ -0,0 +1,191 @@
package com.nms.thread.common;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import nis.nms.domains.NmsErrorInfo;
import org.apache.log4j.Logger;
import com.nms.thread.pool.ThreadPoolConfig;
public class ThreadPoolCommon {
private static Logger logger = Logger.getLogger(ThreadPoolCommon.class);
public static ExecutorService service = Executors.newFixedThreadPool(ThreadPoolConfig.EXECUTOR_SOCKET_THREAD_SIZE); //非周期执行线程池
public static ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(ThreadPoolConfig.EXECUTOR_SCHEDULED_THREAD_SIZE); //定时周期执行线程池
public static Map<String, Future<?>> threadManagerMap = new HashMap<String, Future<?>>();
public static Map<String,Integer[]> DCStatusMap = new HashMap<String,Integer[]>(); //
// Map<dcip,int[]{state,nodeNum}> stateDC状态值1 通讯正常、0通讯失败、-1Ping失败、2通讯恢复[从非1状态恢复] NodeNum管理节点数量 0表示未分配节点其他表示已分配节点
private static final Semaphore handShakeSemaphore = new Semaphore(5,true); //主动监测线程最大并发数
// public static List<NmsErrorInfo> nelist = new LinkedList<NmsErrorInfo>();
private static boolean alarmDataFlag = true;
private static final LinkedList<NmsErrorInfo> nmsErrorList1 = new LinkedList<NmsErrorInfo>();
private static final LinkedList<NmsErrorInfo> nmsErrorList2 = new LinkedList<NmsErrorInfo>();
private static final Byte[] ALARM_DATA_LOCK = new Byte[0]; //告警数据锁
/**
* 改变存放监测数据的数据集合
* @time Mar 7, 2012-2:15:38 PM
*/
public static void chengeNmsErrorFlag() {
alarmDataFlag = alarmDataFlag ? false : true;
logger.info("监测数据缓存集合变更为集合"+(alarmDataFlag?"1":"2"));
}
/**
* 获得 非存放状态的数据集合
* @time Mar 7, 2012-3:18:11 PM
* @return
*/
public static LinkedList<NmsErrorInfo> getNmsErrorList() {
if(!alarmDataFlag){logger.info("取到集合1的缓存告警数据 共"+nmsErrorList1.size());
return nmsErrorList1;}
else{logger.info("取到集合2的缓存告警数据 共"+nmsErrorList2.size());
return nmsErrorList2;}
}
/**
* 将监测数据存入存放状态的数据集合中
* @time Mar 7, 2012-3:18:58 PM
* @param dsb
*/
public static void addNmsError(NmsErrorInfo nei) {
synchronized (ALARM_DATA_LOCK) {
if(nei==null){
logger.debug("告警数据 字节长度0 无需添加到监测数据集合");
return;
}
if(alarmDataFlag){logger.debug("告警数据 添加到 集合1");
nmsErrorList1.add(nei);}
else{logger.debug("告警数据 添加到 集合2");
nmsErrorList2.add(nei); }
}
}
/**
* 将监测数据存入存放状态的数据集合中
* @time Mar 7, 2012-3:18:58 PM
* @param seqId
* @param dsb
*/
public static void addAllAlarmDataList(List<NmsErrorInfo> neiList) {
synchronized (ALARM_DATA_LOCK) {
if(neiList==null || neiList.size()==0l){
logger.debug("告警数据 个数0 无需添加到监测数据集合");
return;
}
if(alarmDataFlag){logger.debug("告警数据 添加到 集合1");
nmsErrorList1.addAll(neiList);}
else{logger.debug("告警数据 添加到 集合2");
nmsErrorList2.addAll(neiList);}
}
}
/**
* 清空非存放状态的数据集合
* @time Mar 7, 2012-3:19:30 PM
* @param seqId
* @param dsb
*/
public static void clearAlarmDataList() {
synchronized (ALARM_DATA_LOCK) {
getNmsErrorList().clear();
logger.info("清空该集合");
}
}
/**
* 获取握手信号
* @time Nov 23, 2011-3:49:38 PM
*/
public static void acquireHandShakeSemaphore() {
// synchronized (changeSemaphore) {
try {
handShakeSemaphore.acquire();
logger.debug("变更锁 已申请 剩余可用许可:> "+handShakeSemaphore.availablePermits());
} catch (Exception e) {
logger.warn("Changed lock thread failed to apply for ,and it has been interrupted"+Thread.currentThread().interrupted(),e);
}
// }
}
/**
* 释放握手信号
* @time Nov 25, 2011-1:24:08 PM
*/
public static void releaseHandShakeSemaphore() {
// synchronized (changeSemaphore) {
handShakeSemaphore.release();
logger.debug("变更锁 已释放 当前可用许可:> "+handShakeSemaphore.availablePermits());
// }
}
/**
* 执行握手线程
* 引入 申请信号,和释放信号 操作
* @time Sep 12, 2012-4:28:45 PM
* @param runnable
*/
public static void runHandShakeRunnable(final Object runnable){
try {
//-- 针对节点 进行文件推送
acquireHandShakeSemaphore(); //申请监测信号
//-- 启动新线程 推送文件和任务信息
final Future<?> future;
if(runnable instanceof Runnable){
future = service.submit((Runnable)runnable);
}else if(runnable instanceof Callable){
future = service.submit((Callable)runnable);
}else {
future = null;
}
final String threadName = Thread.currentThread().getName();
service.submit(new Runnable() {
public void run() {
Thread.currentThread().setName(threadName);
try {
future.get();
} catch (Exception e) {
logger.error("",e);
}finally{
releaseHandShakeSemaphore();
}
}
});
} catch (Exception e) {
logger.error("",e);
releaseHandShakeSemaphore();
}finally{
}
}
static{
// System.out.println("公共信息类加载");
}
}

View File

@@ -0,0 +1,9 @@
package com.nms.thread.pool;
public class ThreadConstants {
public static final Integer HAND_SHAKE_RESULT_SOCKET_SUCCESS = 1;
public static final Integer HAND_SHAKE_RESULT_SOCKET_FAILED = null;
public static final Integer HAND_SHAKE_RESULT_PING_SUCCESS = 2;
public static final Integer HAND_SHAKE_RESULT_PING_FAILED = 3;
public static final Integer HAND_SHAKE_RESULT_UNKNOWN_FAILED = 4;
}

View File

@@ -0,0 +1,30 @@
package com.nms.thread.pool;
import org.apache.commons.lang.StringUtils;
import nis.nms.util.BaseAction;
public class ThreadPoolConfig {
public static Integer EXECUTOR_SOCKET_THREAD_SIZE = StringUtils.isNotBlank(BaseAction.rb.getString("executor_socket_thread_size"))? Integer.parseInt(BaseAction.rb.getString("executor_socket_thread_size")): 10 ;// 线程池
public static Integer EXECUTOR_SCHEDULED_THREAD_SIZE = StringUtils.isNotBlank(BaseAction.rb.getString("executor_scheduled_thread_size"))? Integer.parseInt(BaseAction.rb.getString("executor_scheduled_thread_size")): 5 ;// 线程池
public static final String DC_HANDSHAKE_MANAGER = "DC_HANDSHAKE_MANAGER";
public static final String DC_HANDSHAKE = "DC_HANDSHAKE";
public static final Integer DC_HANDSHAKE_START = StringUtils.isNotBlank(BaseAction.rb.getString("dc.handshake.start"))? Integer.parseInt(BaseAction.rb.getString("dc.handshake.start")):30;
public static final Integer DC_HANDSHAKE_PERIOD = StringUtils.isNotBlank(BaseAction.rb.getString("dc.handshake.period"))? Integer.parseInt(BaseAction.rb.getString("dc.handshake.period")):60*5;
public static final String DETEC_TIMEOUT_CHECK_MANAGER = "DETEC_TIMEOUT_CHECK_MANAGER";
public static final Integer DETEC_TIMEOUT_ALARM_PERIOD_TIMES =2;
public static final Integer DETEC_TIMEOUT_CHECK_PERIOD = StringUtils.isNotBlank(BaseAction.rb.getString("detec.timeout.check.period"))? Integer.parseInt(BaseAction.rb.getString("detec.timeout.check.period")): 15*60;
public static final Integer FLAG_DETEC_TIMEOUT = StringUtils.isNotBlank(BaseAction.rb.getString("flag_detec_timeout"))? Integer.parseInt(BaseAction.rb.getString("flag_detec_timeout")): 1;
public static final String MAILING_MANAGER = "errorInfoResoveManager"; //错误信息解析入库管理线程 标识, futureMap 中的Key值
public static final String MAILING_THREAD = "errorInfoResove"; //错误信息解析入库操作线程 标识, futureMap 中的Key值
public static final Integer MAILING_PERIOD = StringUtils.isNotBlank(BaseAction.rb.getString("mailing_period"))? Integer.parseInt(BaseAction.rb.getString("mailing_period")): 2*60; //邮件发送周期
/**
* 是否启用 监测入库控制线程
*/
public static final Integer FLAG_DETECT_INSERT_CONTROL = StringUtils.isNotBlank(BaseAction.rb.getString("flag.detect.insert.control"))? Integer.parseInt(BaseAction.rb.getString("flag.detect.insert.control")): 0;
/**
* 监测入库 控制轮询间隔
*/
public static final Integer DETECT_INSERT_CONTROL_PERIOD = StringUtils.isNotBlank(BaseAction.rb.getString("detect.insert.control.period"))? Integer.parseInt(BaseAction.rb.getString("detect.insert.control.period")): 60;
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,87 @@
package com.nms.thread.service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import nis.nms.util.ConnectionOracle;
import org.apache.log4j.Logger;
public class NmsReportService {
private final Logger logger = Logger.getLogger(NmsReportService.class);
private ConnectionOracle dao = null;
public NmsReportService(ConnectionOracle dao){
this.dao = dao;
}
public ArrayList<Map<String, String>> getNmsStatusInfo(String setId) {
//detectioned_state=1时正常
String sql = "select nt.node_ip, ds.host_name, din.detectioned_state, st.system_name "
+ "from node_table nt "
+ "left join detection_info_new din on din.SEQ_ID=nt.seq_id "
+ "left join di_systeminfo ds on nt.SEQ_ID=ds.SEQ_ID "
+ "left join system_table st ON nt.system_id=st.system_id "
+ "where nt.node_state=0 AND din.DETECTION_SET_INFO_ID=" + setId + " "
+ "group by nt.node_ip";
ArrayList<String> fields = new ArrayList<String>();
fields.add("node_ip");
fields.add("host_name");
fields.add("detectioned_state");
fields.add("system_name");
try {
ArrayList<Map<String, String>> dbSelect = dao.dbSelect(sql, fields);
return dbSelect;
} catch (Exception e) {
logger.error(e);
return null;
}
}
public ArrayList<Map<String, String>> getNmsRuleInfo(Long end, Long start) {
String sql = "SELECT nt.node_ip, dr.ServiceIndex, dr.ServiceCode, dr.ServiceDesc, dr.agedTime, dr.ClientNum, dr.RefluxPort, dr.RuleNumber, dr.usedRuleNum, dr.leftRuleNum, dr.HitTotalNum, dr.DETECTIONED_STATE "
+ "FROM di_rule dr "
+ "LEFT JOIN node_table nt ON nt.seq_id=dr.seq_id "
+ "WHERE nt.node_state=0 AND dr.data_check_time_digital<" + end + " AND dr.data_check_time_digital>=" + start;
ArrayList<String> fields = new ArrayList<String>();
fields.add("node_ip");
fields.add("ServiceIndex");
fields.add("ServiceCode");
fields.add("ServiceDesc");
fields.add("agedTime");
fields.add("ClientNum");
fields.add("RefluxPort");
fields.add("RuleNumber");
fields.add("usedRuleNum");
fields.add("leftRuleNum");
fields.add("HitTotalNum");
fields.add("DETECTIONED_STATE");
try {
ArrayList<Map<String, String>> dbSelect = dao.dbSelect(sql, fields);
return dbSelect;
} catch (Exception e) {
logger.error(e);
return null;
}
}
public ArrayList<Map<String, String>> getNmsPortInfo(Long end, Long start) {
String sql = "SELECT nt.node_ip, ds.* "
+ "FROM di_switchport ds "
+ "LEFT JOIN node_table nt ON nt.seq_id=ds.seq_id"
+ "WHERE nt.node_state=0 AND ds.data_check_time_digital<" + end + " AND ds.data_check_time_digital>=" + start;
ArrayList<String> fields = new ArrayList<String>();
fields.add("node_ip");
try {
ArrayList<Map<String, String>> dbSelect = dao.dbSelect(sql, fields);
return dbSelect;
} catch (Exception e) {
logger.error(e);
return null;
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,80 @@
package com.nms.thread.socket;
import nis.nms.util.BaseAction;
import org.apache.log4j.Logger;
import com.nms.thread.common.ThreadPoolCommon;
import com.nms.thread.pool.ThreadConstants;
import com.nms.thread.utils.ssl.SSLSocketCallable;
/**
* 安全通讯的客户端
*/
public class SSLClient extends SSLSocketCallable {
private Logger logger = Logger.getLogger(SSLClient.class);
private String cmd = null;
private String content = null;
public static final String HAND_SHAKE_DC = "char:handshake";
public static final String UPDATE_CONFIG_DC = "char:updateConfig";
public static final String INSERT_DETECT = "char:insertDetect";
public static final String RELEASE_DETECT = "char:releaseDetect";
//dc server 端口
public static final Integer DC_PORT = Integer.parseInt(BaseAction.rb.getString("common.single.socket.port"));
public SSLClient(String ip,String cmd,String content){
super(ip,Integer.parseInt(BaseAction.rb.getString("common.single.socket.port")));
this.cmd = cmd;
this.content = content;
}
public SSLClient(String ip,int port,String cmd){
super(ip, port);
this.cmd = cmd;
}
public SSLClient(String ip, int port)throws Exception {
super(ip, port);
}
@Override
protected Object toDo() throws Exception {
// Thread.currentThread().setName("通信线程 TO:>"+ip);
Thread.currentThread().setName("Communication Thread TO:>"+ip);
logger.debug("通讯命令:>"+cmd);
if(HAND_SHAKE_DC.equals(cmd)){
/*通讯内容 */
sendMessage(cmd);
String str = receiveMessage();
return ThreadConstants.HAND_SHAKE_RESULT_SOCKET_SUCCESS;
}else if(UPDATE_CONFIG_DC.equals(cmd)){
sendMessage(cmd);
receiveMessage();
sendMessage(content);
receiveMessage();
return 1;
}else if(INSERT_DETECT.equalsIgnoreCase(cmd)){
/**
* 通知dc 入库 监测数据
*/
sendMessage(cmd);
String msg = receiveMessage();//当前缓存 监测内容 条数
Integer size = Integer.valueOf(msg);//-1:dc升级中0 :没有监测数据,>0监测数据条数-2监测数据上传web -4 dc主动入库
return size;
}else if(RELEASE_DETECT.equalsIgnoreCase(cmd)){
/**
* 通知 dc 取消 入库监测数据
*/
sendMessage(cmd);
String msg = receiveMessage();
logger.debug(RELEASE_DETECT + " -> " + msg);
}
return null;
}
public static void main(String [] args){
SSLClient client = new SSLClient("10.0.6.113",HAND_SHAKE_DC,null);
ThreadPoolCommon.service.submit(client);
}
}

View File

@@ -0,0 +1,154 @@
package com.nms.thread.socket;
import java.io.File;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import nis.nms.util.BaseAction;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FalseFileFilter;
import org.apache.commons.io.filefilter.PrefixFileFilter;
import org.apache.commons.lang.StringUtils;
import com.nms.thread.DetectInsertThread;
import com.nms.thread.utils.ssl.SSLSocketRunnable;
import com.socket.utils.FileComment;
public class SSLServer extends SSLSocketRunnable{
private String uploadPath ;
private String snmpDir ;
public static final String TYPE_MISSION = "1";
public static final String TYPE_SNMP_CLASS = "2";
public static final String TYPE_SNMP_MIB = "3";
public SSLServer(Socket client,String uploadPath,String snmpDir) {
super(client);
this.uploadPath = uploadPath;
this.snmpDir = snmpDir;
}
@SuppressWarnings("unchecked")
@Override
protected void toDo() throws Exception {
String cmd = this.receiveMessage();
System.out.println("cmd "+cmd);
String uploadPath = this.uploadPath;
//- 断点下载MissionFiles
// if("byte:bpDownloadFile".equals(cmd)){
if(StringUtils.isNotEmpty(cmd) && cmd.startsWith("byte:bpDownloadFile")){
this.sendMessage(SUCCESS);
String[] cmds = cmd.split(":");
if(cmds.length>2){
if(cmds[2].equals(TYPE_MISSION)){ //download MissionFile
uploadPath = this.uploadPath;
}else if(cmds[2].equals(TYPE_SNMP_CLASS)){ //download snmp jarFile
uploadPath = this.snmpDir;
}else if(cmds[2].equals(TYPE_SNMP_MIB)){ //download snmp mibFile
uploadPath = this.snmpDir;
}
}
List<FileComment> fileList = (List<FileComment>) this.receiveObject();
for(FileComment fileInfo : fileList){
fileInfo.setFileName(uploadPath+ fileInfo.getFileName());
pl("DownloadPath "+fileInfo.getFileName());
}
bpSendFileByBathMD5(fileList);
this.receiveMessage();
}
//- 断点上传
else if("byte:bpUploadFiles".equals(cmd)){
this.sendMessage(SUCCESS);
bpReceiveFileByBath(uploadPath);
this.sendMessage(SUCCESS);
}
//- server握手请求
else if("char:getLocalIp".equals(cmd)){
this.sendMessage(socket.getInetAddress().getHostAddress());
pl(socket.getInetAddress().getHostAddress());
this.receiveMessage();
}
/**
* server请求下载第三方监测脚本
* 根据脚本文件名下发单个脚本
*/
else if("char:downloadPluginScript".equalsIgnoreCase(cmd)) {
this.sendMessage(SUCCESS);
String prefixNames = this.receiveMessage();
String uploadFilePath = new String(BaseAction.rb.getString("uploadServerPath"));
File pluginScriptDir = new File(uploadFilePath, "pluginDetecScript");
Collection<?> files = FileUtils.listFiles(pluginScriptDir,
new PrefixFileFilter(prefixNames.split(",")), FalseFileFilter.FALSE);
if(files.isEmpty()) {
this.sendMessage(FAIL);
} else {
this.sendMessage(SUCCESS);
this.receiveMessage();
List<File> fileList = new ArrayList<File>();
fileList.addAll((Collection<? extends File>) files);
this.bpSendFileByBath(fileList, pluginScriptDir.getCanonicalPath());
}
}
/**
* dc 释放 监测数据入库锁
*/
else if("char:releaseDetect".equalsIgnoreCase(cmd)){
this.sendMessage(SUCCESS);
String ip = socket.getInetAddress().getHostAddress();
String cip = DetectInsertThread.getCurrentDc();
if(cip != null && ip.equalsIgnoreCase(cip)){
DetectInsertThread.release();
logger.info(ip + " char:releaseDetect 监测数据入库完成");
}else{
logger.debug("监测数据入库锁不一致currentDc: "+ cip + ",requestIp : " + ip);
}
}
}
/*
private boolean bpUpLoadFiles(){
boolean flag = true;
//- 获取上传文件参数 fileNames 字符串数组 Json信息
String params = this.receiveMessage();
System.out.println(""+params);
//- JSON解析 并循环
List<String> upFName = new LinkedList<String>();
JSONArray fileNames = JSONArray.fromObject(params);
if(fileNames!= null && fileNames.size()>0 ){
for(int i = 0; i < fileNames.size() ; i++){
String fileName = (String)fileNames.get(i);
File file = new File(uploadPath+fileName);
if(!file.exists()){
upFName.add(fileName);
}
}
}
this.sendMessage(JSONArray.fromObject(upFName).toString());
String msg = this.receiveMessage();
try {
while (SUCCESS.equals(msg)) {
this.sendMessage(SUCCESS);
String fn = this.receiveMessage();
this.sendMessage(SUCCESS);
pl("" + fn + "文件接收开始");
logger.debug("" + fn + "文件接收开始");
bpReceiveFile(uploadPath+fn);
this.sendMessage(SUCCESS);
msg = this.receiveMessage();
}
}catch (Exception e) {
logger.error(ExceptionPrintUtils.printExceptionStack(e));
}
return flag;
}*/
}

View File

@@ -0,0 +1,106 @@
package com.nms.thread.socket;
import java.io.File;
import java.util.List;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import com.nms.thread.utils.ssl.SocketUtils;
import nis.nms.util.BaseAction;
import nis.nms.util.ExceptionPrintUtils;
/**
* 安全通讯的客户端
*/
public class SocketClientServeice extends SocketUtils {
public SocketClientServeice(String ip)throws Exception {
super(ip,Integer.parseInt(BaseAction.rb.getString("common.single.socket.port")));
}
public SocketClientServeice(String ip, int port)throws Exception {
super(ip, port);
}
/**
* 创建通讯
*
* @time Feb 29, 2012-5:39:01 PM
*/
private void init() throws Exception {
logger.debug("目标通讯:>" + ip + " 创建开始" );
try {
// -- create SocketFactory
SSLSocketFactory ssf = sSLContext.getSocketFactory();
// -- create socket
socket = (SSLSocket) ssf.createSocket(ip, port);
this.in = socket.getInputStream();
this.out = socket.getOutputStream();
logger.debug("create socket success.");
//2014-1-23 hyx 如果建立socket成功但是startHandshake握手失败且未设置超时时间时则会一直阻塞
socket.setSoTimeout(1000*1000); //1000秒
// -- handshake 握手
((SSLSocket) socket).startHandshake();
logger.debug("handshake success.");
} catch (Exception e) {
logger.warn("Target communication:>" + ip + " create failure" + ExceptionPrintUtils.printExceptionStack(e));
throw e;
}
}
public void sendInfoToServer(String cmd,String str) throws Exception{
try {logger.debug("sendInfoToServer begin"+str );
init();
sendMessage(cmd);
receiveMessage();
sendMessage(str);
receiveMessage();
logger.debug("sendInfoToServer end"+str );
// } catch (Exception e) {
// logger.debug("sendInfoToServer 异常:"+str );
// throw e;
} finally {
close();
}
}
public String sendInfoToServer2(String cmd,String str) throws Exception{
try {logger.debug("sendInfoToServer begin"+str );
init();
sendMessage(cmd);
receiveMessage();
sendMessage(str);
String result = receiveMessage();
sendMessage(SUCCESS);
logger.debug("sendInfoToServer end"+str );
return result;
// } catch (Exception e) {
// logger.debug("sendInfoToServer 异常:"+str );
// throw e;
} finally {
close();
}
}
public String sendFilesToServer(String cmd, List<File> files) throws Exception {
try {
logger.debug("sendFilesToServer begin");
init();
sendMessage(cmd);
receiveMessage();
this.bpSendFileByBath(files, files.get(0).getParent());
String result = receiveMessage();
sendMessage(SUCCESS);
logger.debug("sendFilesToServer end");
return result;
} finally {
close();
}
}
}

View File

@@ -0,0 +1,330 @@
package com.nms.thread.utils;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class SQLExecuteTimeoutException extends SQLException implements Iterable<Throwable>{
/**
* Constructs a <code>SQLExecuteTimeoutExeception</code> object with a given
* <code>reason</code>, <code>SQLState</code> and
* <code>vendorCode</code>.
*
* The <code>cause</code> is not initialized, and may subsequently be
* initialized by a call to the
* {@link Throwable#initCause(java.lang.Throwable)} method.
* <p>
* @param reason a description of the exception
* @param SQLState an XOPEN or SQL:2003 code identifying the exception
* @param vendorCode a database vendor-specific exception code
*/
public SQLExecuteTimeoutException(String reason, String SQLState, int vendorCode) {
super(reason);
this.SQLState = SQLState;
this.vendorCode = vendorCode;
// if (!(this instanceof SQLWarning)) {
if (DriverManager.getLogWriter() != null) {
DriverManager.println("SQLState(" + SQLState +
") vendor code(" + vendorCode + ")");
printStackTrace(DriverManager.getLogWriter());
}
// }
}
/**
* Constructs a <code>SQLExecuteTimeoutExeception</code> object with a given
* <code>reason</code> and <code>SQLState</code>.
*
* The <code>cause</code> is not initialized, and may subsequently be
* initialized by a call to the
* {@link Throwable#initCause(java.lang.Throwable)} method. The vendor code
* is initialized to 0.
* <p>
* @param reason a description of the exception
* @param SQLState an XOPEN or SQL:2003 code identifying the exception
*/
public SQLExecuteTimeoutException(String reason, String SQLState) {
super(reason);
this.SQLState = SQLState;
this.vendorCode = 0;
// if (!(this instanceof SQLWarning)) {
if (DriverManager.getLogWriter() != null) {
printStackTrace(DriverManager.getLogWriter());
DriverManager.println("SQLExecuteTimeoutExeception: SQLState(" + SQLState + ")");
}
// }
}
/**
* Constructs a <code>SQLExecuteTimeoutExeception</code> object with a given
* <code>reason</code>. The <code>SQLState</code> is initialized to
* <code>null</code> and the vender code is initialized to 0.
*
* The <code>cause</code> is not initialized, and may subsequently be
* initialized by a call to the
* {@link Throwable#initCause(java.lang.Throwable)} method.
* <p>
* @param reason a description of the exception
*/
public SQLExecuteTimeoutException(String reason) {
super(reason);
this.SQLState = null;
this.vendorCode = 0;
// if (!(this instanceof SQLWarning)) {
if (DriverManager.getLogWriter() != null) {
printStackTrace(DriverManager.getLogWriter());
}
// }
}
/**
* Constructs a <code>SQLExecuteTimeoutExeception</code> object.
* The <code>reason</code>, <code>SQLState</code> are initialized
* to <code>null</code> and the vendor code is initialized to 0.
*
* The <code>cause</code> is not initialized, and may subsequently be
* initialized by a call to the
* {@link Throwable#initCause(java.lang.Throwable)} method.
* <p>
*/
public SQLExecuteTimeoutException() {
super();
this.SQLState = null;
this.vendorCode = 0;
// if (!(this instanceof SQLWarning)) {
if (DriverManager.getLogWriter() != null) {
printStackTrace(DriverManager.getLogWriter());
}
// }
}
/**
* Constructs a <code>SQLExecuteTimeoutExeception</code> object with a given
* <code>cause</code>.
* The <code>SQLState</code> is initialized
* to <code>null</code> and the vendor code is initialized to 0.
* The <code>reason</code> is initialized to <code>null</code> if
* <code>cause==null</code> or to <code>cause.toString()</code> if
* <code>cause!=null</code>.
* <p>
* @param cause the underlying reason for this <code>SQLExecuteTimeoutExeception</code>
* (which is saved for later retrieval by the <code>getCause()</code> method);
* may be null indicating the cause is non-existent or unknown.
* @since 1.6
*/
public SQLExecuteTimeoutException(Throwable cause) {
super(cause);
// if (!(this instanceof SQLWarning)) {
if (DriverManager.getLogWriter() != null) {
printStackTrace(DriverManager.getLogWriter());
}
// }
}
/**
* Constructs a <code>SQLExecuteTimeoutExeception</code> object with a given
* <code>reason</code> and <code>cause</code>.
* The <code>SQLState</code> is initialized to <code>null</code>
* and the vendor code is initialized to 0.
* <p>
* @param reason a description of the exception.
* @param cause the underlying reason for this <code>SQLExecuteTimeoutExeception</code>
* (which is saved for later retrieval by the <code>getCause()</code> method);
* may be null indicating the cause is non-existent or unknown.
* @since 1.6
*/
public SQLExecuteTimeoutException(String reason, Throwable cause) {
super(reason,cause);
// if (!(this instanceof SQLWarning)) {
if (DriverManager.getLogWriter() != null) {
printStackTrace(DriverManager.getLogWriter());
}
// }
}
/**
* Constructs a <code>SQLExecuteTimeoutExeception</code> object with a given
* <code>reason</code>, <code>SQLState</code> and <code>cause</code>.
* The vendor code is initialized to 0.
* <p>
* @param reason a description of the exception.
* @param sqlState an XOPEN or SQL:2003 code identifying the exception
* @param cause the underlying reason for this <code>SQLExecuteTimeoutExeception</code>
* (which is saved for later retrieval by the
* <code>getCause()</code> method); may be null indicating
* the cause is non-existent or unknown.
* @since 1.6
*/
public SQLExecuteTimeoutException(String reason, String sqlState, Throwable cause) {
super(reason,cause);
this.SQLState = sqlState;
this.vendorCode = 0;
// if (!(this instanceof SQLWarning)) {
if (DriverManager.getLogWriter() != null) {
printStackTrace(DriverManager.getLogWriter());
DriverManager.println("SQLState(" + SQLState + ")");
}
// }
}
/**
* Constructs a <code>SQLExecuteTimeoutExeception</code> object with a given
* <code>reason</code>, <code>SQLState</code>, <code>vendorCode</code>
* and <code>cause</code>.
* <p>
* @param reason a description of the exception
* @param sqlState an XOPEN or SQL:2003 code identifying the exception
* @param vendorCode a database vendor-specific exception code
* @param cause the underlying reason for this <code>SQLExecuteTimeoutExeception</code>
* (which is saved for later retrieval by the <code>getCause()</code> method);
* may be null indicating the cause is non-existent or unknown.
* @since 1.6
*/
public SQLExecuteTimeoutException(String reason, String sqlState, int vendorCode, Throwable cause) {
super(reason,cause);
this.SQLState = sqlState;
this.vendorCode = vendorCode;
// if (!(this instanceof SQLWarning)) {
if (DriverManager.getLogWriter() != null) {
DriverManager.println("SQLState(" + SQLState +
") vendor code(" + vendorCode + ")");
printStackTrace(DriverManager.getLogWriter());
}
// }
}
/**
* Retrieves the SQLState for this <code>SQLExecuteTimeoutExeception</code> object.
*
* @return the SQLState value
*/
public String getSQLState() {
return (SQLState);
}
/**
* Retrieves the vendor-specific exception code
* for this <code>SQLExecuteTimeoutExeception</code> object.
*
* @return the vendor's error code
*/
public int getErrorCode() {
return (vendorCode);
}
/**
* Retrieves the exception chained to this
* <code>SQLExecuteTimeoutExeception</code> object by setNextException(SQLExecuteTimeoutExeception ex).
*
* @return the next <code>SQLExecuteTimeoutExeception</code> object in the chain;
* <code>null</code> if there are none
* @see #setNextException
*/
public SQLExecuteTimeoutException getNextException() {
return (next);
}
/**
* Adds an <code>SQLExecuteTimeoutExeception</code> object to the end of the chain.
*
* @param ex the new exception that will be added to the end of
* the <code>SQLExecuteTimeoutExeception</code> chain
* @see #getNextException
*/
public void setNextException(SQLExecuteTimeoutException ex) {
SQLExecuteTimeoutException current = this;
for(;;) {
SQLExecuteTimeoutException next=current.next;
if (next != null) {
current = next;
continue;
}
if (nextUpdater.compareAndSet(current,null,ex)) {
return;
}
current=current.next;
}
}
/**
* Returns an iterator over the chained SQLExecuteTimeoutExeceptions. The iterator will
* be used to iterate over each SQLExecuteTimeoutExeception and its underlying cause
* (if any).
*
* @return an iterator over the chained SQLExecuteTimeoutExeceptions and causes in the proper
* order
*
* @since 1.6
*/
public Iterator<Throwable> iterator() {
return new Iterator<Throwable>() {
SQLExecuteTimeoutException firstException = SQLExecuteTimeoutException.this;
SQLExecuteTimeoutException nextException = firstException.getNextException();
Throwable cause = firstException.getCause();
public boolean hasNext() {
if(firstException != null || nextException != null || cause != null)
return true;
return false;
}
public Throwable next() {
Throwable throwable = null;
if(firstException != null){
throwable = firstException;
firstException = null;
}
else if(cause != null){
throwable = cause;
cause = cause.getCause();
}
else if(nextException != null){
throwable = nextException;
cause = nextException.getCause();
nextException = nextException.getNextException();
}
else
throw new NoSuchElementException();
return throwable;
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
/**
* @serial
*/
private String SQLState;
/**
* @serial
*/
private int vendorCode;
/**
* @serial
*/
private volatile SQLExecuteTimeoutException next;
private static final AtomicReferenceFieldUpdater<SQLExecuteTimeoutException,SQLExecuteTimeoutException> nextUpdater =
AtomicReferenceFieldUpdater.newUpdater(SQLExecuteTimeoutException.class,SQLExecuteTimeoutException.class,"next");
private static final long serialVersionUID = 2135244094396331484L;
}

View File

@@ -0,0 +1,112 @@
package com.nms.thread.utils.ssl;
import java.net.Socket;
import java.util.concurrent.Callable;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import nis.nms.util.ExceptionPrintUtils;
import org.apache.commons.lang.StringUtils;
/**
* SSL 通讯 工具类
* @date Feb 29, 2012 10:05:50 AM
* @author ZhangGang
*
*/
public abstract class SSLSocketCallable extends SocketUtils implements Callable<Object>{
/**
* 通讯创建
* @param ip 目标主机IP
* @param port 目标主机端口
* @throws Exception
*/
public SSLSocketCallable(String ip,Integer port){
super(ip, port);
}
/**
* 通讯创建
* @param client 目标通讯实例
*/
public SSLSocketCallable(Socket client) {
super(client);
}
/**
* 通讯线程执行方法 默认格式
* @return
* @throws Exception
*/
public Object call(){
Object obj = null; //返回对象
//- 校验 是否创建新通讯连接
if(socket==null && (StringUtils.isNotEmpty(ip) && port != null)){
try {
//-- create SocketFactory
SSLSocketFactory ssf = sSLContext.getSocketFactory();
//-- create socket
socket=(SSLSocket)ssf.createSocket(ip,port);
logger.debug("create socket success.");
//2014-1-23 hyx 如果建立socket成功但是startHandshake握手失败且未设置超时时间时则会一直阻塞
socket.setSoTimeout(1000*1000); //1000秒
//-- handshake 握手
((SSLSocket) socket).startHandshake();
logger.debug("handshake success.");
} catch (Exception e) {
logger.warn("Target communication:>"+ip+" create failure "+e.getMessage());
close();
return obj;
}
}
//- socket 不为空 执行通讯操作
if(socket!=null){
try {
//-- 获取通讯IO流
out = socket.getOutputStream();
in = socket.getInputStream();
socket.setSoTimeout(1000*1000); //1000秒
//-- 自定义通讯操作
obj = toDo();
}catch (Exception e) {
logger.error("Communicating Exception "+e.getMessage());
} finally {
logger.info("--- 通信关闭 ---");
close();
}
}
return obj;
}
/**
* 待实现的通信操作
* @time Aug 28, 2011-9:04:46 PM
* @param out
* @param in
* @throws Exception
*/
protected abstract Object toDo()throws Exception;
}

View File

@@ -0,0 +1,107 @@
package com.nms.thread.utils.ssl;
import java.net.Socket;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import nis.nms.util.ExceptionPrintUtils;
import org.apache.commons.lang.StringUtils;
/**
* SSL 通讯 工具类
* @date Feb 29, 2012 10:05:50 AM
* @author ZhangGang
*
*/
public abstract class SSLSocketRunnable extends SocketUtils implements Runnable{
/**
* 通讯成功标识
*/
protected static final String SOCKET_SUCCESS_FLAG ="success"; //成功通信
/**
* 通讯失败标识
*/
protected static final String SOCKET_FAIL_FLAG ="fail"; //失败通信
/**
* 通讯创建
* @param client 目标通讯实例
*/
public SSLSocketRunnable(Socket client) {
super(client);
}
/**
* 通讯线程执行方法 默认格式
* @return
* @throws Exception
*/
@Override
public void run() {
//- 校验 是否创建新通讯连接
if(socket==null && (StringUtils.isNotEmpty(ip) && port != null)){
try {
//-- create SocketFactory
SSLSocketFactory ssf = sSLContext.getSocketFactory();
//-- create socket
socket=(SSLSocket)ssf.createSocket(ip,port);
logger.debug("create socket success.");
//2014-1-23 hyx 如果建立socket成功但是startHandshake握手失败且未设置超时时间时则会一直阻塞
socket.setSoTimeout(1000*1000); //1000秒
//-- handshake 握手
((SSLSocket) socket).startHandshake();
logger.debug("handshake success.");
} catch (Exception e) {
logger.warn("Target communication:>"+ip+" create failure"+ExceptionPrintUtils.printExceptionStack(e));
close();
return ;
}
}
//- socket 不为空 执行通讯操作
if(socket!=null){
try {
//-- 获取通讯IO流
out = socket.getOutputStream();
in = socket.getInputStream();
//-- 自定义通讯操作
toDo();
}catch (Exception e) {
logger.error(ExceptionPrintUtils.printExceptionStack(e));
} finally {
logger.info("--- 通信关闭 ---");
close();
}
}
return ;
}
/**
* 待实现的通信操作
* @time Aug 28, 2011-9:04:46 PM
* @param out
* @param in
* @throws Exception
*/
protected abstract void toDo()throws Exception;
}

File diff suppressed because it is too large Load Diff