package com.nms.server.thread.detecData; import java.io.File; import java.sql.PreparedStatement; import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; import java.util.Collections; import java.util.Comparator; import java.util.ConcurrentModificationException; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.Callable; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import com.nms.server.bean.DetectInfo; import com.nms.server.bean.EmailInfo; import com.nms.server.bean.TableColumnsModel; import com.nms.server.common.Common; import com.nms.server.common.Constants; import com.nms.server.common.EmailTypeConstants; import com.nms.server.dao.CommonDao; import com.nms.server.service.CommonService; import com.nms.server.thread.socket.SSLClient; import com.nms.server.util.CSVUtils; import com.nms.server.util.FileUtils; import com.nms.server.util.StringUtil; /** * CSV解析线程 * @author ZGGG3 * */ public class NewDetecDataResoveThread implements Callable { Logger logger = Logger.getLogger(NewDetecDataResoveThread.class); volatile boolean stop = false;//线程是否被取消标志 private String name; // 自定义Thread Name private LinkedList dsbList; private SimpleDateFormat format2 = new SimpleDateFormat(Constants.COMMON_DATE_FORMAT); //Java Date 类型数据格式化格式 private long infoId = -1; private long currentId = -1; private long interval = -1; private int warnResumeNum = 0; private PreparedStatement infoNewInsertStmt; private PreparedStatement infoNewUpdateStmt; private List insertDataTonew; private List updateDataTonew; private PreparedStatement warnUpdateStmt; public NewDetecDataResoveThread(String name,LinkedList dsbList) { this.name = name; this.dsbList = dsbList; } /* * 线程操作 * * 依次解析urlList中仍存在的Files * * 实现了依次解析Files,由于后期实现多线程解析操作 */ public Object call() { // 为当前线程命名 ,用与开发阶段友好输出。 Thread.currentThread().setName(name +"-"+System.currentTimeMillis()); logger.info("配置文件设置批量入库监测数据条数:"+ Constants.DETECTION_INFO_DATA_MAX_ROWS); CommonDao dao = null; try { dao = new CommonDao(); CommonService service = new CommonService(dao); //-- 空数据集合 结束操作 if(dsbList == null || dsbList.size() == 0 ){ logger.debug("缓存监测数据 为空,无需解析"); return null; } //用于判断网络端口 long eNewTime = System.currentTimeMillis(); Map> allPortName= service.getAllPortName();//之后如果数据大了,影响性能了,可以改成获取一个seqid的数据和一个端口的list,查询seqid,得到下标,再从list中获取信息,目前先暂时不改 long portTime = System.currentTimeMillis()-eNewTime; int portTimeM=(int)portTime/1000/60; int portTimeS=(int)((long)(portTime/1000)%60); int portTimeMS=(int)portTime%1000; logger.info("第零步:获取端口表的所有数据,总耗时:"+(portTimeM)+"分 "+(portTimeS)+"秒 "+(portTimeMS)+"毫秒"); logger.debug("缓存监测数据 " + (dsbList.size())+ "条,解析开始"); //-- 遍历解析 Iterator dsbIte = dsbList.iterator(); int totalCount = dsbList.size(); int successCount=0; int falseCount = 0; int zeroByteCount = 0; int count = 1; int resoveCount=1; int allResove = 0; int allSuccess = 0; int allFail = 0; int allZero = 0; /* * 第一步:解析所有数据 * 新格式数据,旧格式同一放入 newDataMap 集合 */ //存放解析之后的数据:[["systemdate",["5",["1602",[{detectInfo1,detectInfo2,...}]],...],...],...] List> dataList = new LinkedList>(); long sTime = System.currentTimeMillis(); while (dsbIte.hasNext()&& !stop) {//线程未被中断 /* 单条解析异常捕捉处理 */ byte [] dsb = null; try { dsb = dsbIte.next(); /* 零字节文件记录和过滤 */ if (dsb != null && dsb.length > 0) { /* 解析非0字节文件内容 */ List strsList = CSVUtils.csvBytesParser(dsb,Constants.COMMON_TEXT_CODING); /* 新旧解析格式判断 */ /** * 新旧解析格式区别: * 区别:第一行基本信息数据 新格式12列固定比旧格式至少15列 * */ if(strsList!=null && strsList.size()>0){ //多行数据判断 Object[] detecInfo = null; String [] detailsStr = strsList.get(0); //取第一行数据 if(detailsStr != null){ if(detailsStr.length > 12){//旧格式 解析 logger.debug("旧格式数据"); detecInfo = service.resoveOldData(strsList,allPortName);//解析数据,不判断状态变更 }else if(detailsStr.length<12){ logger.debug( "基本数据解析长度:"+detailsStr.length+" 小于最小解析标准,不予解析;"); }else{//新格式 解析 logger.debug("新格式数据"); detecInfo = service.resoveNewData(strsList,allPortName);//解析数据,不判断状态变更 } } //将解析之后的数据放入集合,不为空说明解析成功 if(detecInfo != null){ successCount++;//成功解析 detecAddToList(dataList, detecInfo); }else{ falseCount ++;//解析失败 String filePath = getFilePath();//Constants.ERROR_DETEC_FILE_DIR+"/"+Common.getDateDirName()+"/"+Calendar.getInstance().getTimeInMillis()+".csv"; logger.error("Abnormal monitoring data format,file has been saved to "+filePath); FileUtils.wirteBytesToFile(dsb, filePath); } } }else{ zeroByteCount++; } //解析入库一条数据,从list中remove一条数据 dsbIte.remove(); //判断当前线程是否被中断 if (Thread.currentThread().isInterrupted()) { // 线程中断状态,不会改变中断状态的值 logger.info("监测数据解析线程 被中断"); stop = true;//如果中断执行(重启、或者升级),就停止解析数据,之后将数据入库 } }catch (InterruptedException e) { logger.error("Monitoring data parsing threads are interrupted",e); stop = true; }catch (ConcurrentModificationException e) { logger.debug("获取下一条监测数据 异常 停止本轮解析!",e); stop = true;//由于获取next的时候出错,之后的都出错了就,所以此处stop=true,停止循环 }catch (Exception e) { logger.debug("一条监测数据解析失败!",e); } if(count%Constants.BATCH_RESOVE_COUNT==0||stop||count==totalCount){ long resoveCurTime = System.currentTimeMillis(); long resoveTime = resoveCurTime-sTime; int resoveTimeM=(int)resoveTime/1000/60; int resoveTimeS=(int)((long)(resoveTime/1000)%60); int resoveTimeMS=(int)resoveTime%1000; logger.info("第一步:解析完毕,共:"+(resoveCount)+"条[成功:"+successCount+"条,失败:"+falseCount+"条,空数据:"+zeroByteCount+"条]" +" 总耗时:"+(resoveTimeM)+"分 "+(resoveTimeS)+"秒 "+(resoveTimeMS)+"毫秒"); insertDataTonew=new ArrayList(); updateDataTonew=new ArrayList(); warnUpdateStmt=dao.getConn().prepareStatement(DetectInfo.getChangeWarningInfoSql()); judgeStateAndSave(dataList,sTime,resoveCount,successCount,falseCount,zeroByteCount,service, dao); dataList.clear(); allResove = allResove + resoveCount; allSuccess = allSuccess+ successCount; allFail = allFail + falseCount; allZero = allZero + zeroByteCount; warnResumeNum = 0; resoveCount=0; successCount=0; falseCount=0; zeroByteCount=0; } resoveCount++; count++; } dsbList.clear(); //将监测数据入库后,需要检查新入库的数据是否为无效(处理监测数据的有效性标志) long startUpdateTime = System.currentTimeMillis(); updateDetectDataValid(dao); long updateCurTime = System.currentTimeMillis()-startUpdateTime; int updateCurTimeM=(int)updateCurTime/1000/60; int updateCurTimeS=(int)((long)(updateCurTime/1000)%60); int updateCurTimeMS=(int)updateCurTime%1000; logger.info("第四步:更新监测数据有效性完毕,总耗时:"+(updateCurTimeM)+"分 "+(updateCurTimeS)+"秒 "+(updateCurTimeMS)+"毫秒"); long eTime = System.currentTimeMillis(); long curTime = eTime-sTime; int m=(int)curTime/1000/60; int s=(int)((long)(curTime/1000)%60); int ms=(int)curTime%1000; logger.info("第一到四步,整体解析批量入库监测数据全过程执行完毕,共:"+(allResove)+"条[成功:"+allSuccess+"条,失败:"+allFail+"条,空数据:"+allZero+"条]" +" 总耗时:"+(m)+"分 "+(s)+"秒 "+(ms)+"毫秒"); } catch (Exception e) { logger.error("Running exception",e); }finally{ dao.closeCommonBatchStatement(); dao.closeDetailBatchStatement(); if(dao!=null){ dao.close(); dao=null; } //1:由web 主控控制入库 if(Constants.DETECT_INSERT_MODE == 1 ||Constants.DETECT_INSERT_MODE == 3){ //入库完成,通知web 主控释放锁 SSLClient.sengReleaseDetect(); } logger.debug("数据清理 解析完成"); } return null; } public void judgeStateAndSave(List> dataList,long sTime, int resoveCount, int successCount, int falseCount, int zeroByteCount, CommonService service,CommonDao dao) throws Exception{ /** * 第二步:判断监测状态变更 */ long resoveCurTime = System.currentTimeMillis(); if(dataList != null && dataList.size() >0){ for(List checkTypeList : dataList){ int checkTypeSize = checkTypeList.size(); for(int i = 1,j =checkTypeSize; i < j; i ++){ List setIdList = (List) checkTypeList.get(i); String setId = (String) setIdList.get(0);//监测设置id List detectionInfoNewData = null; List seqList = new ArrayList(); int setIdSize = setIdList.size(); /** * 2017年1月9日11:15:21 新增 添加过滤条件查询new表数据(只查询当前解析数据seqid的new表数据) */ for(int h = 1;h seqIdList = (List) setIdList.get(h); String seqId = (String) seqIdList.get(0);//seqId seqList.add(seqId); } /**以上*/ //获取new表中对应监测设置的监测数据 detectionInfoNewData = service.getDetectionInfoNewListBySet(setId,seqList); for(int ii = 1; ii < setIdSize ; ii ++){ List seqIdList = (List) setIdList.get(ii); String seqId = (String) seqIdList.get(0);//seqId List allDeteInfo = (List) seqIdList.get(1); if(allDeteInfo != null && allDeteInfo.size() > 0){ //detection_info_new中的 最新监测数据 Object[] oldDetecInfo = null; if(detectionInfoNewData != null){//detectionInfoNewData:监测设置--节点--监测信息 oldDetecInfo = service.getIndexObj(seqList, seqId,detectionInfoNewData); }else{ oldDetecInfo = null; } //数据库中没有当前监测类别对应的seqid节点的监测信息,不需要判断状态变更 if(oldDetecInfo != null){ if(allDeteInfo.size() <2 ){//只有一条数据直接比较数据库中的数据 //只有一条最新监测 Object[] newDetecInfo = allDeteInfo.get(0); int state = (Integer) newDetecInfo[DetectInfo.STATE]; /* * 入库数据不是延时数据时,与数据库中的最新数据比对判断状态变更 * 数据库中的监测时间小于当前需要入库的监测时间 */ if((Long)oldDetecInfo[DetectInfo.CHECKTIME] < (Long)newDetecInfo[DetectInfo.CHECKTIME]){ /*判断状态变更*/ stateChange(newDetecInfo, oldDetecInfo); }else{//监测时间小于数据库中监测数据的时间,说明当前需要入库的监测为延迟数据 if(state != 1){//延迟数据监测状态没有成功,数据库入库的是正常数据,补录一条报警信息 if((Integer) newDetecInfo[DetectInfo.STATE] == 1 && (Integer)oldDetecInfo[DetectInfo.PLEVEL] == 99){ newDetecInfo[DetectInfo.APPENDWARNINGINFO]= oldDetecInfo; warnResumeNum++; } } } }else{//大于两条数据时,排序之后判断数据库的数据是否为最接近的时间数据 //未入库的数据排序 Collections.sort(allDeteInfo, new Comparator() { @Override public int compare(Object[] o1, Object[] o2) { Long time1 = (Long) o1[DetectInfo.CHECKTIME]; Long time2 = (Long) o2[DetectInfo.CHECKTIME]; return time1 < time2 ? -1 : 1; } }); /*判断状态变更*/ judgeState(allDeteInfo, oldDetecInfo); } judgeAndCollect(allDeteInfo, oldDetecInfo, updateDataTonew,service); }else{//如果new表中没有数据,应为detection_info status_change_time设置初始值-->data_check_time if(allDeteInfo!=null&&allDeteInfo.size()>0){ for (Object[] newInfo : allDeteInfo) { if(newInfo[DetectInfo.STATUSCHANGTIME]==null||(Long)newInfo[DetectInfo.STATUSCHANGTIME]==-1){ newInfo[DetectInfo.STATUSCHANGTIME]= newInfo[DetectInfo.CHECKTIME]==null?newInfo[DetectInfo.STARTTIME]:newInfo[DetectInfo.CHECKTIME]; } } insertDataTonew.addAll(allDeteInfo); } } } } //清理内存 detectionInfoNewData.clear(); detectionInfoNewData = null; } } } long stateCurTime = System.currentTimeMillis(); long stateCheckTime = stateCurTime-resoveCurTime; int stateCheckM=(int)stateCheckTime/1000/60; int stateCheckS=(int)((long)(stateCheckTime/1000)%60); int stateCheck=(int)stateCheckTime%1000; logger.info("第二步:判断监测状态变更完毕(只针对解析成功的),共:"+resoveCount+"条 总耗时:"+(stateCheckM)+"分 "+(stateCheckS)+"秒 "+(stateCheck)+"毫秒"); logger.info(" 第二步-1: 补录告警恢复告警信息:"+warnResumeNum+"条"); /** * 第三步:批量入库 */ //判断入库标示 if(Constants.FLAG_RESOVE_COMMIT_DB != 1){ String s = "file.resove.commitDB.flag 不为1,数据不可保存,请检查配置文件"; logger.warn(s); // return null; return ; } int failBatchInsert=0; String failBatchInsertCheckType = "";//批量入库失败的监测类别信息 if(dataList != null && dataList.size() >0){ int infoCount = 0; int detailCount = 0; int sysinfoCount = 0; int warnningCount = 0; int statusChange = 0; int emailCount = 0; //监测设置对应的联系人email地址 Map> setInfoEmail = null; //未指定的监测设置通过seqid找到对应的联系人 Map> seqIdEmail = null; // List logList = new ArrayList(); dao.setAutoCommit(false); LinkedList dsbBatchList = new LinkedList();//用于批量入库后是否将监测数据存入硬盘的overrun :每次入库,或者存硬盘都需要清空此列表 //迭代器遍历,方便删除数据 Iterator> iteDataList = dataList.iterator(); while(iteDataList.hasNext()){ List checkTypeList = iteDataList.next();//["cpu",["1",["1602",[{detecAllInfo}]]]], String checkType = (String) checkTypeList.get(0);//监测类型 int checkTypeRecordNum=0;//每种监测类别入库的记录数 String insertDetailSql = null;//插入详细参数 List fieldNames = new ArrayList();//detail表的字段名 //详细信息对应的表名 String detailTableName = Common.getInsertTable().get(checkType) == null ? null : Common.getInsertTable().get(checkType).getTableName(); if(detailTableName == null){ logger.error(checkType + " the corresponding detailed table was not found and the data could not be stored"); continue; } String seqId = null; Iterator checkIte = checkTypeList.iterator(); boolean firstKey1 = true; while(checkIte.hasNext()){ if(firstKey1){ checkIte.next(); firstKey1 = false; }else{ List setIdList = (List) checkIte.next();//["1",["1602",[{detecAllInfo}]]] String setInfoId = (String) setIdList.get(0);//监测设置id Iterator setIte = setIdList.iterator(); boolean firstKey2 = true; while(setIte.hasNext()){ if(firstKey2){ firstKey2 = false; setIte.next(); }else{ List seqIdList = (List) setIte.next();//["1602",[{detecAllInfo,detecAllInfo}]] seqId = (String) seqIdList.get(0);//seqid List detecDatas = (List) seqIdList.get(1);//{detecAllInfo,detecAllInfo} Iterator detecIte = detecDatas.iterator(); long statusChangeTime = -1;//状态变更时间 while(detecIte.hasNext()){ Object[] temInfo = detecIte.next(); //将解析之后的数据还原 byte[] dsb = objectToString(temInfo); //将还原之后的数据缓存,出现异常时保存到文件 dsbBatchList.add(dsb); if(temInfo != null){ // 计算 详细表的 外键 时间戳(10) + seqId(5)+ setId (3),主要是为了使用 java 中long 类型 String checkTime = temInfo[DetectInfo.CHECKTIME].toString(); String setId = temInfo[DetectInfo.SETINFOID].toString(); String seqid = temInfo[DetectInfo.SEQID].toString(); String id = DetectInfo.computeId(checkTime, seqId, setId); // currentId = getSequenceId(service); currentId = Long.parseLong(id); //addbatch service.addInfoStmt(currentId,temInfo); infoCount ++; logger.debug(" detection_info表id:" + currentId + ",监测类别:" + checkType +",监测设置id:" + setInfoId + ",seqId:" + seqId); //添加详细信息 if((Boolean) temInfo[DetectInfo.DELYFLAG]){ //当delyFlag 为false时 不插入详细信息 List> details = (List>) temInfo[DetectInfo.DETAILS]; if(details!= null && details.size()>0 ){ for(Map tempMap :details){ //首先组织sql语句 if(insertDetailSql == null || "".equals(insertDetailSql)){ //组织detail表的insert语句 insertDetailSql = createDetailSql(fieldNames,detailTableName,tempMap); logger.debug("监测设置id: " + setInfoId +" ,detail表插入语句:" + insertDetailSql); } dao.setDetailVals(insertDetailSql,fieldNames,tempMap,currentId,detailTableName); checkTypeRecordNum++; detailCount ++; } } } //特殊格式追加信息 if(Constants.DETEC_SYSTEMINFO_STR.equalsIgnoreCase(checkType) ){ List diskInfoList = (List) temInfo[DetectInfo.DISKINFOLIST]; List netInfoList = (List) temInfo[DetectInfo.NETINFOLIST]; if(diskInfoList != null && diskInfoList.size() > 0){//disk for(String[] temDiskInfo : diskInfoList){ if(temDiskInfo != null && temDiskInfo.length >7){ service.addDiskStmt(currentId,temDiskInfo); sysinfoCount ++; } } } if(netInfoList != null && netInfoList.size() > 0){//net for(String[] temNetInfo : netInfoList){ if(temNetInfo != null && temNetInfo.length >7){ service.addNetStmt(currentId,temNetInfo); sysinfoCount ++; } } } } //添加告警信息:告警信息=状态异常+状态改变+推迟入库的 if((Integer)temInfo[DetectInfo.STATE] != 1 || (Boolean)temInfo[DetectInfo.STATECHANGEFLAG]){ service.addWarningStmt(currentId +"",temInfo); // logList.addAll(generateLog(dsb," 第三步-1: 添加一条告警信息,状态发生变更或者数据异常!state="+temInfo[DetectInfo.STATE]+",isChange="+(Boolean)temInfo[DetectInfo.STATECHANGEFLAG])); warnningCount ++; } //延迟监测数据追加告警信息 if(temInfo[DetectInfo.APPENDWARNINGINFO] != null){ Object[] warningInfo = (Object[]) temInfo[DetectInfo.APPENDWARNINGINFO]; service.addWarningStmt((String)warningInfo[DetectInfo.DETECTIONINFOID],warningInfo); // logList.addAll(generateLog(dsb," 第三步-1: 追加一条延迟数据的告警信息!")); warnningCount ++; } //添加邮件信息 if(Constants.flag_email != 1){ logger.info("邮件功能已关闭"); }else{ if((Boolean) temInfo[DetectInfo.SENDEMAILFLAG]){ EmailInfo emailInfo = (EmailInfo) temInfo[DetectInfo.EMAILINFO]; if(emailInfo != null){ if(setInfoEmail == null){ setInfoEmail = service.getDetecSetEmailListOf123(); } if(seqIdEmail == null){ seqIdEmail = service.getDetecEmailListOf4(); } if((setInfoEmail == null || setInfoEmail.size() == 0) && (seqIdEmail == null || seqIdEmail.size() == 0)){ logger.debug("未找到邮件地址!"); }else{ List emailAddress = null; //1.首先通过监测设置的map中查找 if(setInfoEmail != null){ emailAddress = setInfoEmail.get(setInfoId); } if(emailAddress == null){ emailAddress = seqIdEmail.get(seqId); } if(emailAddress != null && emailAddress.size() >0){ for(String tempAddress : emailAddress){ if(StringUtils.isNotEmpty(tempAddress)){ emailCount ++; //addbatch service.addEmailStmt(emailInfo,tempAddress); } } } } } } } /*判断状态变更时间的最大时间,用于更新new表 当监测类型为 nmsclient 时,比较多条相同监测节点数据的状态变更时间,取最大的一个 */ if(checkType.equalsIgnoreCase(Constants.DETEC_NMSC_STR)){ long temChangeTime = (Long) temInfo[DetectInfo.STATUSCHANGTIME]; if(temChangeTime > 0 ){ if(temChangeTime > statusChangeTime){ statusChangeTime = temChangeTime; } } } } //dsbBatchList.add((byte[]) temInfo[27]); //判断infoStmt的数量是否达到最大入库条数 /*if(infoCount != 0 && infoCount % Constants.DETECTION_INFO_DATA_MAX_ROWS == 0){ int dataLen = dsbBatchList.size(); try { //执行批处理 dao.executeBatchAllStmt(false); //提交事务 dao.commit(); } catch (SQLException e){ dao.rollback(); handleException(dsbBatchList, e); failBatchInsert = failBatchInsert + dataLen; failBatchInsertCheckType = failBatchInsertCheckType + checkType+" "; }catch (Exception e) { logger.error("批处理保存失败,数据库错误,共"+dataLen+"条",e); dao.rollback(); //批量入库失败以后,要将失败的监测数据存到硬盘的overrun里,以备下次进行再次入库 Common.saveByteToFile(dsbBatchList); failBatchInsert = failBatchInsert + dataLen; failBatchInsertCheckType = failBatchInsertCheckType + checkType+" "; } dsbBatchList.clear(); }*/ detecIte.remove();//删除,内存优化 } /* * 更新状态变更时间 * 只有当监测类型:nmsclient 时,statusChangeTime才会大于0 */ if(statusChangeTime > 0){ service.addStateChangeStmt(statusChangeTime,seqId); statusChange++; logger.info("状态变更详细信息:seqId="+seqId+" changeTime="+statusChangeTime); } } setIte.remove();//删除监测设置 } } checkIte.remove();//删除监测设置,内存优化 } iteDataList.remove();//删除监测类型,内存优化 int dataLastLen = dsbBatchList.size(); try{ //执行批处理 dao.executeBatchAllStmt(true); service.saveToNew(insertDataTonew, 0, infoNewInsertStmt); service.saveToNew(updateDataTonew, 1, infoNewUpdateStmt); warnUpdateStmt.executeBatch(); //提交事务 dao.commit(); }catch (SQLException e){ logger.error("",e); dao.rollback(); handleException(dsbBatchList, e); failBatchInsert = failBatchInsert + dataLastLen; failBatchInsertCheckType = failBatchInsertCheckType + checkType+" "; }catch(Exception e){ logger.error("Batch save failed, database error, total"+dsbBatchList.size(),e); dao.rollback(); //批量入库失败以后,要将失败的监测数据存到硬盘的overrun里,以备下次进行再次入库 logger.error("Monitoring data batch storage, database error, the file has been saved to the hard disk."); Common.saveByteToFile(dsbBatchList); failBatchInsert = failBatchInsert + dataLastLen; failBatchInsertCheckType = failBatchInsertCheckType + checkType+" "; }finally{ //关闭stmt:此处逻辑不太顺,最后会有一个对dao的整体的关闭,但是本线程中使用的都是自己定义的preparedstatement,不够统一 dao.closeDetailBatchStatement(); dsbBatchList.clear(); } // logList.add(" 第三步-1: 监测类型:" + checkType +" ,批量入库结束,记录数: "+checkTypeRecordNum); } long endTime = System.currentTimeMillis(); long curTime = endTime-stateCurTime; int m=(int)curTime/1000/60; int s=(int)((long)(curTime/1000)%60); int ms=(int)curTime%1000; // logList.add(" 第三步-1: 共"+dataList.size()+" 种详细信息 " ); // logList.add(" 第三步-1: new表状态变更 共"+statusChange+" 条 " ); logger.info("第三步:批量保存入库完毕(只针对解析成功的),共:"+(dataList.size())+"条[共插入detection_info表: " + infoCount +" 条,detail表: " + detailCount +" 条,特殊追加信息: " + sysinfoCount +" 条,warning 表: " +warnningCount +" 条。]" +" 总耗时:"+(m)+"分 "+(s)+"秒 "+(ms)+"毫秒,批量入库失败记录数:"+failBatchInsert+" 具体"+failBatchInsertCheckType); // printLogs(logList); } } public void handleException(LinkedList dsbBatchList, SQLException e) { try { //如果数据过错误是:1、ORA-01861:文字与格式字符串不匹配 // (一般情况下) 2、ORA-01400:不能将空值插入 3、ORA-01722:无效数字 // 4、ORA-00351:时间无效 // 5、ORA-01401: 插入的值过大 // 属于文件内容的错误所以保存到 错误文件夹中 int size = dsbBatchList.size(); if(e.getMessage().indexOf("ORA-01722")>=0 || e.getMessage().indexOf("ORA-01400")>=0 || e.getMessage().indexOf("ORA-01401")>=0 || e.getMessage().indexOf("ORA-01861")>=0 || e.getMessage().indexOf("ORA-00351")>=0){ logger.error("Monitoring data batch storage:Monitor data format exception,a total of"+size+",the file has been saved to "+getParentDirPath(),e); for(int j=0;j> dataList,Object[] detecInfo){ String checkType = (String) detecInfo[DetectInfo.CHECKTYPE]; String setInfoId = (String) detecInfo[DetectInfo.SETINFOID]; String seqId = (String) detecInfo[DetectInfo.SEQID]; /** * 设置默认值 */ if(detecInfo[DetectInfo.STATE] == null){ detecInfo[DetectInfo.STATE] = -1;//state } if(detecInfo[DetectInfo.PLEVEL] == null){ detecInfo[DetectInfo.PLEVEL] = 99;//pLevel } if(detecInfo[DetectInfo.DELYFLAG] == null){ detecInfo[DetectInfo.DELYFLAG] = false;//delyFlag } if(detecInfo[DetectInfo.URGENTLEVEL] == null){ detecInfo[DetectInfo.URGENTLEVEL] = 0;//urgentLevel } if(detecInfo[DetectInfo.SENDEMAILFLAG] == null){ detecInfo[DetectInfo.SENDEMAILFLAG] = false;//sendEmailFlag } if(detecInfo[DetectInfo.STATECHANGEFLAG] == null){ detecInfo[DetectInfo.STATECHANGEFLAG] = false;//stateChangeFlag } if(detecInfo[DetectInfo.STATUSCHANGTIME] == null){ detecInfo[DetectInfo.STATUSCHANGTIME] = -1L;//statusChangeTime } if(dataList.size() == 0){ List detecList = new LinkedList(); detecList.add(detecInfo); List seqList = new LinkedList();//seqid list seqList.add(seqId); seqList.add(detecList); List setIdList = new LinkedList();//监测设置 id list setIdList.add(detecInfo[DetectInfo.SETINFOID]); setIdList.add(seqList); List checkTypeList = new LinkedList(); checkTypeList.add(checkType); checkTypeList.add(setIdList); dataList.add(checkTypeList); }else{ //第一层 checkType for(int i = 0 , j = dataList.size(); i < j ; i ++){ List tempObj = dataList.get(i); String key = (String)tempObj.get(0);//第一个为key if(checkType.equals(key)){//监测类型已存在 for(int ii = 1 , jj = tempObj.size(); ii < jj ; ii ++){ List tempSet = (LinkedList) tempObj.get(ii); String setKey = (String) tempSet.get(0);//监测设置id if(setInfoId.equals(setKey)){ for(int iii = 1 ,jjj = tempSet.size(); iii < jjj; iii ++){ List tempSeq = (LinkedList) tempSet.get(iii); String seqKey = (String) tempSeq.get(0);//监测设置id if(seqId.equals(seqKey)){//seqId 存在 List detecList = (List) tempSeq.get(1); detecList.add(detecInfo); break; }else if(iii == jjj -1){//seqid 不存在 List detecList = new LinkedList(); detecList.add(detecInfo); List seqList = new LinkedList();//seqid list seqList.add(seqId); seqList.add(detecList); tempSet.add(seqList); } } break; }else if(ii == jj-1){//监测设置id不存在 List detecList = new LinkedList(); detecList.add(detecInfo); List seqList = new LinkedList();//seqid list seqList.add(seqId); seqList.add(detecList); List setIdList = new LinkedList();//监测设置 id list setIdList.add(detecInfo[DetectInfo.SETINFOID]); setIdList.add(seqList); tempObj.add(setIdList); } } break;//跳出循环 }else if(i == j-1){//不存在 List detecList = new LinkedList(); detecList.add(detecInfo); List seqList = new LinkedList();//seqid list seqList.add(seqId); seqList.add(detecList); List setIdList = new LinkedList();//监测设置 id list setIdList.add(detecInfo[DetectInfo.SETINFOID]); setIdList.add(seqList); List checkTypeList = new LinkedList(); checkTypeList.add(checkType); checkTypeList.add(setIdList); dataList.add(checkTypeList); } } } } /** * 组织detail表的insert语句 * @param fieldNames * @param detailTableName * @param tempMap * @return */ public String createDetailSql(List fieldNames, String detailTableName, Map tempMap) { String insertDetailSql; StringBuffer preSql = new StringBuffer(); StringBuffer sufSql = new StringBuffer(); preSql.append(",detection_info_id"); sufSql.append(",?"); fieldNames.add("detection_info_id"); for(String tempField : tempMap.keySet()){ fieldNames.add(tempField); preSql.append("," + tempField); String value = tempMap.get(tempField); sufSql.append(",?"); } insertDetailSql = "insert into "+detailTableName +" ( "+ preSql.substring(1) +" ) values (" + sufSql.substring(1) +" )"; return insertDetailSql; } /** * 状态变更判断:只对大于等于new表监测时间的监测数据进行是否告警的判断 * @param allDetectInfo * @param inDbInfo */ public void judgeState(List allDetectInfo , Object[] inDbInfo){ /*判断每一个的状态变更 * 1.监测时间小于已经入库的监测数据时间不做判断 * */ int startIndex = 0; for(int i = 0, j = allDetectInfo.size(); i < j ; i++){ Object[] newDetecInfo = allDetectInfo.get(i); Object[] oldDetecInfo = null;//前一条监测数据 //监测时间小于已经入库的监测数据时间不做判断,只在最后一条数据判断插入告警信息 if( newDetecInfo== null || (Long) newDetecInfo[DetectInfo.CHECKTIME] < (Long)inDbInfo[DetectInfo.CHECKTIME]){ startIndex++; if(i < j -1){ continue; }else{ //集合中的最后一条数据监测时间也小于数据库中的时间时,最后一条数据判断是否插入一条告警信息 if((Integer)newDetecInfo[DetectInfo.STATE] != 1){//延迟数据监测状态没有成功,数据库入库的是正常数据,补录一条报警信息 if((Integer)inDbInfo[DetectInfo.STATE] == 1 && (Integer)inDbInfo[DetectInfo.PLEVEL] == 99){ newDetecInfo[DetectInfo.APPENDWARNINGINFO] = oldDetecInfo; warnResumeNum++; } } } } //判断最新的监测数据 if(i== startIndex){ oldDetecInfo = inDbInfo; }else{ oldDetecInfo = allDetectInfo.get(i-1); } /*判断状态变更*/ stateChange(newDetecInfo, oldDetecInfo); } } /** * 判断状态变更具体实现 * @param newDetecInfo * @param oldDetecInfo */ public void stateChange(Object[] newDetecInfo,Object[] oldDetecInfo ){ /** * 判断状态变更 */ if(newDetecInfo == null || oldDetecInfo == null || ((Long)newDetecInfo[DetectInfo.CHECKTIME] < (Long)oldDetecInfo[DetectInfo.CHECKTIME])){ return; } if(((Integer)oldDetecInfo[DetectInfo.STATE] != (Integer)newDetecInfo[DetectInfo.STATE]) || //状态改变 (((Integer)oldDetecInfo[DetectInfo.STATE] == (Integer)newDetecInfo[DetectInfo.STATE] && (Integer)oldDetecInfo[DetectInfo.PLEVEL]!= (Integer)newDetecInfo[DetectInfo.PLEVEL] ))){//状态未变,告警级别改变 int state = (Integer)newDetecInfo[DetectInfo.STATE]; //邮件告警信息 StringBuffer alarmInfo = new StringBuffer((String)newDetecInfo[DetectInfo.ALARMINFO]);//告警信息 String dsinfo = (String) newDetecInfo[DetectInfo.DSINFO];//状态信息 String checkType = (String) newDetecInfo[DetectInfo.CHECKTYPE];//监测类别 long checkTime = (Long) newDetecInfo[DetectInfo.CHECKTIME]; int actionType = 11; int urgentLevel = EmailTypeConstants.URGENT_LATER; //整理告警数据,拼写邮件信息 if(state == -1){ //执行失败 // alarmInfo.append("监测执行失败\n "); // alarmInfo.append("Failure to monitor execution\n "); alarmInfo.append("i18n_server.DetecDataResoveThread.alarmInfo1_n81i\n "); alarmInfo.append(dsinfo+"\n"); actionType= EmailTypeConstants.TYPE_DETECTION_INFO_EXCEPTION; if(Constants.DETEC_NMSC_STR.equalsIgnoreCase(checkType)) { urgentLevel = EmailTypeConstants.URGENT_IMMEDIATELY; } }else if(state == 0){ //一般异常 alarmInfo.append(" "+dsinfo+"\n"); //如果是握手监测,不添加告警设置字段相关信息 if(!Constants.DETEC_NMSC_STR.equalsIgnoreCase(checkType)) { alarmInfo.append(dsinfo+" \n"); }else { alarmInfo.delete(0,alarmInfo.length());//如果是握手监测,则不添加告警设置字段相关信息 alarmInfo.append(dsinfo+" \n"); urgentLevel = EmailTypeConstants.URGENT_IMMEDIATELY; } actionType= EmailTypeConstants.TYPE_DETECTION_INFO_EXCEPTION; }else if(state == 1){ //恢复正常 // alarmInfo.append("监测恢复正常"); // alarmInfo.append("Monitoring back to normal"); alarmInfo.append("i18n_server.DetecDataResoveThread.alarmInfo1_n81i"); actionType= EmailTypeConstants.TYPE_DETECTION_INFO_RECOVER; urgentLevel = EmailTypeConstants.URGENT_LATER; //更新该节点所有监测时间为当前监测时间 if(checkType.equalsIgnoreCase(Constants.DETEC_NMSC_STR)){ newDetecInfo[DetectInfo.STATUSCHANGTIME] = (checkTime); } } // 报警通知 newDetecInfo[DetectInfo.STATECHANGEFLAG] = (true); newDetecInfo[DetectInfo.ALARMINFO] = (alarmInfo.toString()); newDetecInfo[DetectInfo.SENDEMAILFLAG] = (true); newDetecInfo[DetectInfo.URGENTLEVEL] = (urgentLevel); //目前代码中是否紧急和紧急级别不对应,有的是紧急,但是级别是2,所以此处进行修改,如果是紧急,则级别修改为0级 if(EmailTypeConstants.URGENT_IMMEDIATELY==urgentLevel){ newDetecInfo[DetectInfo.PLEVEL] = Constants.LEVEL_OF_EMERGENCY; } //如果是0级,则设置为紧急,使得紧急状态和级别一致 if(newDetecInfo[DetectInfo.PLEVEL].equals(Constants.LEVEL_OF_EMERGENCY)){ newDetecInfo[DetectInfo.URGENTLEVEL] = EmailTypeConstants.URGENT_IMMEDIATELY; } long seqIdLong = Long.parseLong((String)newDetecInfo[DetectInfo.SEQID]); long setInfoIdLong = Long.parseLong((String)newDetecInfo[DetectInfo.SETINFOID]); String emailContent = alarmInfo.toString(); if(emailContent.getBytes().length>290){ emailContent = StringUtil.substring(emailContent,290); } EmailInfo emailInfo = new EmailInfo(actionType,Common.getSetInfoNameMape().get(setInfoIdLong)+"("+Common.getCheckTypeNameMape().get(checkType)+")",Common.getNodeIpByUUID(seqIdLong), format2.format(new Date(checkTime)),emailContent,EmailTypeConstants.FLAG_SEND_LATER,urgentLevel); newDetecInfo[DetectInfo.EMAILINFO] = (emailInfo); //如果新入数据和new表中的监测状态不同,或者监测状态相同,告警级别不同 ,修改 状态改变时间 为获取新入数据的时间(data_check_time) newDetecInfo[DetectInfo.STATUSCHANGTIME]=newDetecInfo[DetectInfo.CHECKTIME]; } else{//如果新入数据和new表中的监测状态相同,且告警级别相同 ,则状态改变时间更新为new表中的时间 newDetecInfo[DetectInfo.STATUSCHANGTIME]=oldDetecInfo[DetectInfo.STATUSCHANGTIME]; } } /** * id 和 currentId 同时 = -1时,为第一次获取sequence id * result[0] 为currentID * result[1] 为基准id,获取的sequence id * @param id * @param currentId * @return */ public long getSequenceId(CommonService service){ long result = -1L; if( infoId != -1 && currentId != -1 && currentId < (infoId + interval -1) && interval != -1){ currentId ++; result = currentId; }else{ //-- 申请sequences for detection_info long[] temp = service.getDetectionInfoSequenceID(); if(temp[0] > 0){ result = infoId = currentId = temp[0];//获取的id interval = temp[1];//步进 } } return result; } /** * * 获取文件路径,判断当天目录下的文件如果>=500个,则新创建父目录, * 创建父目录的原则是"日期_1"、"日期_2"、"日期_3"累加的形式 * * @author jsj Apr 28, 2013 * @version 1.0 * @return */ private String getFilePath() { String filePath = ""; try { String parentDirPath = getParentDirPath(); filePath = parentDirPath + "/" + Calendar.getInstance().getTimeInMillis() + ".csv"; } catch (Exception e) { logger.error("Getting the error file path exception", e); } return filePath; } /** * *获取文件的父目录 * 获取当天的父母目录,如果父目录不存在,则为第一次写入文件,直接返回当天的目录, 存在则判断目录下的文件是否>-500,>=500则新建日期目录,否则取当前目录 * @author jsj Apr 28, 2013 * @version 1.0 * @return */ private String getParentDirPath() { String parentDirPath = Constants.ERROR_DETEC_FILE_DIR; String currentDirName = Common.getDateDirName(); String dirPath = parentDirPath + "/" + currentDirName; File parentDirFile = new File(parentDirPath); List existDir = new ArrayList(); if (parentDirFile.exists()) { File[] dirFiles = parentDirFile.listFiles(); if (dirFiles.length > 0) { for (File dirFile: dirFiles) { if (dirFile.getName().contains(Common.getDateDirName())) { existDir.add(dirFile); } } } } else { return dirPath; } boolean isGetFile = false; for (File dirFile: existDir) { File[] files = dirFile.listFiles(); if (currentDirName.compareTo(dirFile.getName()) < 0) { currentDirName = dirFile.getName(); } if (files.length < Constants.ERROR_DETEC_FILE_DIR_FILE_SIZES) { dirPath = dirFile.getAbsolutePath(); isGetFile = true; break; } } if (!isGetFile) { String[] currentDirNames = currentDirName.split("_"); if (currentDirNames.length == 1) { currentDirName += "_1"; } else { currentDirName = currentDirNames[0] + "_" + +(Integer.parseInt(currentDirNames[1]) + 1); } dirPath = parentDirPath + "/" + currentDirName; } return dirPath; } //检查入库的监测数据中是否有无效的数据,有则置为无效 public void updateDetectDataValid(CommonDao dao) throws Exception{ CommonService service = new CommonService(dao); Map detectSetSeqIdMap = service.getDetectSetInfo();//查询有效监测数值对应的seqid Map detectDataSeqIdMap = service.getDetectSeq(Constants.DETECTION_INFO_TABLE_NAME);//统计 监测数据 表: 监测设置<->已有有效数据的seqid Iterator detectDateSeqIte = detectDataSeqIdMap.entrySet().iterator(); String invalidSeqIds = ""; List sqls = new ArrayList(); while(detectDateSeqIte.hasNext()) { invalidSeqIds = ""; Entry en = (Entry)detectDateSeqIte.next(); String setId = (String)en.getKey(); String seqIdsValid = detectSetSeqIdMap.get(setId);//得到监测设置当前有效的seqids:1,2,3 if(seqIdsValid!=null) {//如果有效的监测设置内没有此监测设置,则不予处理 seqIdsValid = "," + seqIdsValid + ","; String[] seqIds = (String[])en.getValue(); for(String seqIdTmp:seqIds) { if(seqIdTmp!=null && !"".equals(seqIdTmp)) { if(!seqIdsValid.contains(","+seqIdTmp+",")) {//不在有效范围内,则更新为无效 invalidSeqIds = invalidSeqIds + seqIdTmp + ","; } } } if(invalidSeqIds.endsWith(",")) { invalidSeqIds = invalidSeqIds.substring(0,invalidSeqIds.length()-1); sqls.add("update "+Constants.DETECTION_INFO_TABLE_NAME+" t set valid=0 where t.valid=1 and t.detection_set_info_id="+setId+" and t.seq_id in ("+invalidSeqIds+")"); } } } if(sqls.size()>0){//一般情况下不需要更新,所以,如果不加if判断,则每次都会浪费时间去创建statement dao.dbUpdateByBatch(sqls); } } /** *打印统计的监测数据:监测类别--监测设置--节点--监测信息 */ private void printDetectDataInfo(Map>>> newDataMap) { /** *打印统计的监测数据:监测类别--监测设置--节点--监测信息 */ if(newDataMap != null && newDataMap.size()>0){ logger.debug("解析完毕:共" +newDataMap.size()+ "个监测类型"); for(Map.Entry>>> setInfoIdMap :newDataMap.entrySet()){ logger.debug("监测类型: " + setInfoIdMap.getKey() + " 共有:" + setInfoIdMap.getValue().size() +" 个监测设置项"); if(setInfoIdMap.getValue() != null){ for(Map.Entry>> me : setInfoIdMap.getValue().entrySet() ){ if(me.getValue() != null){ logger.debug("监测设置id: " + me.getKey() + " 共包含:" + me.getValue().size() +" 节点"); for(Map.Entry> seqMe : me.getValue().entrySet()){ if(seqMe.getValue() != null){ logger.debug("seqId : " + seqMe.getKey() + " 共包含:" + seqMe.getValue().size() +" 条监测数据"); } } } } } } } } /** * 将解析之后的监测数据写入到文件 * @param temInfo * @param file * @return */ private byte[] objectToString(Object[] temInfo){ byte[] dsb = null; try { // ------- 总监测数据组织 int index = 0; String[] totalData = new String[12]; totalData[index++] = (String) temInfo[DetectInfo.SEQID];// UUID totalData[index++] = (String) temInfo[DetectInfo.SETINFOID];// 监测设置ID String checkType = totalData[index++] = (String) temInfo[DetectInfo.CHECKTYPE];// 监测类别 totalData[index++] = (String) temInfo[DetectInfo.PROCESS];// 进程名称 totalData[index++] = temInfo[DetectInfo.STARTTIME] +"";// 监测服务启动时间 totalData[index++] = temInfo[DetectInfo.DELAYTIME] +"";// 检测时延(秒) totalData[index++] = temInfo[DetectInfo.CHECKTIME]+ "";// 本次检测时间 totalData[index++] = temInfo[DetectInfo.CURRENTTIMES] + "";// 尝试次数 totalData[index++] = temInfo[DetectInfo.NEXTCHECKTIME]+"";// 下次计划监测时间 totalData[index++] = temInfo[DetectInfo.STATE] + "";// 执行状态是否成功是否正常 totalData[index++] = temInfo[DetectInfo.DSINFO] +"";// 状态信息(描述信息) totalData[index++] = temInfo[DetectInfo.PDATA] +"";// 性能数据 /** * 详细数据前三个告警相关信息 */ List showNumList = (List) temInfo[DetectInfo.SHOWNUMLIST]; List plevelList = (List) temInfo[DetectInfo.PLEVELLIST]; List alarmList = (List) temInfo[DetectInfo.ALARMLIST]; List dataList = new LinkedList(); // 总数据 dataList.add(totalData); //详细信息 List> details = (List>) temInfo[DetectInfo.DETAILS]; if(details != null && details.size() > 0){ Map tableMap = Common.getTableMap().get(checkType); Map fieldMap = fieldNameToLower(tableMap);//将表字段名称转换成小写 // 详细信息 dataList.add(new String[]{"details", details.size()+ ""});// details(解析标识),详细信息条数 List detailDataList = new LinkedList(); int listIndex = 0; for(Map temp : details){ //单条详细信息 String[] tempDetail = new String[tableMap.size() + 4]; tempDetail[0] = showNumList.get(listIndex); tempDetail[1] = plevelList.get(listIndex); tempDetail[2] = alarmList.get(listIndex); for(Map.Entry tempEntry : temp.entrySet()){ String fieldName = tempEntry.getKey().toLowerCase(); if("seq_id".equals(fieldName) || //解析时添加的数据跳过 "detectioned_state".equals(fieldName) || "detection_set_info_id".equals(fieldName) || "data_check_time_digital".equals(fieldName) || "data_check_time".equals(fieldName) || "data_arrive_time_digital".equals(fieldName) || "data_arrive_time".equals(fieldName) ){ continue; } String value = tempEntry.getValue(); int arrIndex = fieldMap.get(fieldName) + 2; tempDetail[arrIndex] = value; } detailDataList.add(tempDetail); listIndex ++; } dataList.addAll(detailDataList);// 具体的详细数据 // 特殊数据定制(目前是针对系统信息监测systeminfo) if ("systeminfo".equalsIgnoreCase(checkType)) { List netList = (List) temInfo[DetectInfo.NETINFOLIST]; if(netList != null && netList.size() > 0){ dataList.add(new String[]{"net", netList.size() + ""});// 解析标识, 行数(当前解析标识指定的类型有多少条数据) dataList.addAll(netList);// 多条数据 } List diskList = (List) temInfo[DetectInfo.DISKINFOLIST]; if(netList != null && diskList.size() > 0){ dataList.add(new String[]{"disk", netList.size() + ""});// 解析标识, 行数(当前解析标识指定的类型有多少条数据) dataList.addAll(diskList);// 多条数据 } } } dsb = CSVUtils.csvBytesPrinter1(dataList, Constants.COMMON_TEXT_CODING); } catch (Exception e) { e.printStackTrace(); logger.error("Conversion from Object[] to byte[] failure",e); } return dsb; } /** * 表字段名称全部转换成小写,并fieldName做key * @param tableMap * @return */ private Map fieldNameToLower(Map tableMap){ Map result = new HashMap(); if(tableMap != null && tableMap.size()>0){ for(Map.Entry temp : tableMap.entrySet()){ TableColumnsModel column = temp.getValue(); if(column != null){ result.put(column.getFiledName().toLowerCase(), temp.getKey()); } } } return result; } private List generateLog(byte[] info,String head) { List warnInfoList = new ArrayList(); try { //byte[] info = (byte[]) temInfo[27]; List strsList = CSVUtils.csvBytesParser(info,Constants.COMMON_TEXT_CODING); String time = System.currentTimeMillis()+""; for(String[] strArr:strsList) { // logger.info(head+" "+time +" "+StringUtils.join(strArr,",")); warnInfoList.add(head+" "+time +" "+StringUtils.join(strArr,",")); } } catch (Exception e){ } return warnInfoList; } private void printLogs(List logList) { for(String log:logList){ logger.info(log); } } private void judgeAndCollect(List allDeteInfo, Object[] oldDetecInfo,List newDataList,CommonService service){ Object[] newDeteInfo = allDeteInfo.get(0); if((Long)newDeteInfo[DetectInfo.CHECKTIME]>(Long)oldDetecInfo[DetectInfo.CHECKTIME]){ newDataList.add(newDeteInfo); } if((Integer)newDeteInfo[DetectInfo.STATE]==1){ Object[] params={newDeteInfo[DetectInfo.SEQID],newDeteInfo[DetectInfo.SETINFOID]}; try { service.addRecordToStatement(warnUpdateStmt, params); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public static void main(String[] args) { } }