diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java deleted file mode 100644 index 6f2e146..0000000 --- a/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ /dev/null @@ -1,129 +0,0 @@ -package cn.ac.iie.dao; - -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.service.read.ReadHistoryArangoData; -import cn.ac.iie.utils.ArangoDBConnect; -import cn.ac.iie.utils.ExecutorThreadPool; -import com.arangodb.ArangoCursor; -import com.arangodb.entity.BaseDocument; -import com.arangodb.entity.BaseEdgeDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; - -/** - * 获取arangoDB历史数据 - * - * @author wlh - */ -public class BaseArangoData { - private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class); - - public static ConcurrentHashMap> historyVertexFqdnMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap> historyVertexIpMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap> historyVertexSubscriberMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap> historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>(); - - private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance(); - - private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); - - public void readHistoryData(String table, - ConcurrentHashMap> historyMap, - Class type) { - try { - LOG.warn("开始更新" + table); - long start = System.currentTimeMillis(); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER(); i++) { - historyMap.put(i, new ConcurrentHashMap<>()); - } - CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER()); -// long[] timeRange = getTimeRange(table); - Long countTotal = getCountTotal(table); - for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER(); i++) { -// String sql = getQuerySql(timeRange, i, table); - String sql = getQuerySql(countTotal, i, table); - ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, historyMap, type, table, countDownLatch); - threadPool.executor(readHistoryArangoData); - } - countDownLatch.await(); - long last = System.currentTimeMillis(); - LOG.warn("读取" + table + " arangoDB 共耗时:" + (last - start)); - } catch (Exception e) { - e.printStackTrace(); - } - } - - private Long getCountTotal(String table){ - long start = System.currentTimeMillis(); - Long cnt = 0L; - String sql = "RETURN LENGTH("+table+")"; - try { - ArangoCursor longs = arangoDBConnect.executorQuery(sql, Long.class); - while (longs.hasNext()){ - cnt = longs.next(); - } - }catch (Exception e){ - LOG.error(sql +"执行异常"); - } - long last = System.currentTimeMillis(); - LOG.info(sql+" 结果:"+cnt+" 执行时间:"+(last-start)); - return cnt; - } - - private String getQuerySql(Long cnt,int threadNumber, String table){ - long sepNum = cnt / ApplicationConfig.THREAD_POOL_NUMBER() + 1; - long offsetNum = threadNumber * sepNum; - return "FOR doc IN " + table + " limit "+offsetNum+","+sepNum+" RETURN doc"; - } - - - private long[] getTimeRange(String table) { - long minTime = 0L; - long maxTime = 0L; - long startTime = System.currentTimeMillis(); - String sql = "LET doc = (FOR doc IN " + table + " RETURN doc) return {max_time:MAX(doc[*].FIRST_FOUND_TIME),min_time:MIN(doc[*].FIRST_FOUND_TIME)}"; - switch (ApplicationConfig.ARANGO_TIME_LIMIT_TYPE()) { - case 0: - ArangoCursor timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class); - try { - if (timeDoc != null) { - while (timeDoc.hasNext()) { - BaseDocument doc = timeDoc.next(); - maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER(); - minTime = Long.parseLong(doc.getAttribute("min_time").toString()); - } - } else { - LOG.warn("获取ArangoDb时间范围为空"); - } - } catch (Exception e) { - e.printStackTrace(); - } - break; - case 1: - maxTime = ApplicationConfig.READ_ARANGO_MAX_TIME(); - minTime = ApplicationConfig.READ_ARANGO_MIN_TIME(); - break; - default: - } - long lastTime = System.currentTimeMillis(); - LOG.warn(sql + "\n查询最大最小时间用时:" + (lastTime - startTime)); - return new long[]{minTime, maxTime}; - - } - - private String getQuerySql(long[] timeRange, int threadNumber, String table) { - long minTime = timeRange[0]; - long maxTime = timeRange[1]; - long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER(); - long maxThreadTime = minTime + (threadNumber + 1) * diffTime; - long minThreadTime = minTime + threadNumber * diffTime; - return "FOR doc IN " + table + " filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " " + ApplicationConfig.ARANGODB_READ_LIMIT() + " RETURN doc"; - } - -} diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java deleted file mode 100644 index 1ca66d7..0000000 --- a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java +++ /dev/null @@ -1,125 +0,0 @@ -package cn.ac.iie.service.read; - -import cn.ac.iie.config.ApplicationConfig; -import cn.ac.iie.utils.ArangoDBConnect; -import com.arangodb.ArangoCursor; -import com.arangodb.entity.BaseDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; - - -/** - * @author wlh - * 多线程全量读取arangoDb历史数据,封装到map - */ -public class ReadHistoryArangoData extends Thread { - public static long currentHour = System.currentTimeMillis() / (60 * 60 * 1000) * 60 * 60; - private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class); - static final Integer RECENT_COUNT_HOUR = ApplicationConfig.RECENT_COUNT_HOUR(); - - public static final HashSet PROTOCOL_SET; - - static { - PROTOCOL_SET = new HashSet<>(); - PROTOCOL_SET.add("HTTP"); - PROTOCOL_SET.add("TLS"); - PROTOCOL_SET.add("DNS"); - } - - private ArangoDBConnect arangoConnect; - private String query; - private ConcurrentHashMap> map; - private Class type; - private String table; - private CountDownLatch countDownLatch; - - public ReadHistoryArangoData(ArangoDBConnect arangoConnect, - String query, - ConcurrentHashMap> map, - Class type, - String table, - CountDownLatch countDownLatch) { - this.arangoConnect = arangoConnect; - this.query = query; - this.map = map; - this.type = type; - this.table = table; - this.countDownLatch = countDownLatch; - } - - @Override - public void run() { - try { - long s = System.currentTimeMillis(); - ArangoCursor docs = arangoConnect.executorQuery(query, type); - if (docs != null) { - List baseDocuments = docs.asListRemaining(); - int i = 0; - for (T doc : baseDocuments) { - String key = doc.getKey(); - switch (table) { - case "R_LOCATE_FQDN2IP": - updateProtocolDocument(doc); - deleteDistinctClientIpByTime(doc); - break; - case "R_VISIT_IP2FQDN": - updateProtocolDocument(doc); - break; - default: - } - int hashCode = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER(); - ConcurrentHashMap tmpMap = map.get(hashCode); - tmpMap.put(key, doc); - i++; - } - long l = System.currentTimeMillis(); - LOG.warn(query + "\n读取" + i + "条数据,运行时间:" + (l - s)); - } - }catch (Exception e){ - e.printStackTrace(); - }finally { - countDownLatch.countDown(); - LOG.warn("本线程读取完毕,剩余线程数量:"+countDownLatch.getCount()); - } - } - - private void updateProtocolDocument(T doc) { - if (doc.getProperties().containsKey("PROTOCOL_TYPE")) { - for (String protocol : PROTOCOL_SET) { - String protocolRecent = protocol + "_CNT_RECENT"; - ArrayList cntRecent = (ArrayList) doc.getAttribute(protocolRecent); - Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]); - Long[] cntRecentsDst = new Long[RECENT_COUNT_HOUR]; - System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1); - cntRecentsDst[0] = 0L; - doc.addAttribute(protocolRecent, cntRecentsDst); - } - } - } - - private void deleteDistinctClientIpByTime(T doc) { - ArrayList distCip = (ArrayList) doc.getAttribute("DIST_CIP"); - ArrayList distCipTs = (ArrayList) doc.getAttribute("DIST_CIP_TS"); - distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600); - Collections.sort(distCipTs); - int index = distCipTs.indexOf(currentHour - RECENT_COUNT_HOUR * 3600); - String[] distCipArr = new String[index]; - long[] disCipTsArr = new long[index]; - if (distCip.size() + 1 == distCipTs.size()){ - for (int i = 0; i < index; i++) { - distCipArr[i] = distCip.get(i); - disCipTsArr[i] = distCipTs.get(i); - } - } - doc.updateAttribute("DIST_CIP", distCipArr); - doc.updateAttribute("DIST_CIP_TS", disCipTsArr); - } - -} diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala index bdf8120..f5b0d42 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala @@ -4,7 +4,6 @@ package cn.ac.iie.service.update import java.lang import cn.ac.iie.config.ApplicationConfig -import cn.ac.iie.service.read.ReadHistoryArangoData import com.arangodb.entity.{BaseDocument, BaseEdgeDocument} import scala.collection.mutable @@ -13,6 +12,8 @@ import scala.collection.mutable.WrappedArray.ofRef object UpdateDocHandler { val PROTOCOL_SET: Set[String] = Set("HTTP","TLS","DNS") + private val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60 + def updateMaxAttribute(hisDoc: BaseDocument,newAttribute:Long,attributeName:String): Unit ={ var hisAttritube = hisDoc.getAttribute(attributeName).toString.toLong if (newAttribute > hisAttritube){ @@ -99,7 +100,7 @@ object UpdateDocHandler { def putDistinctIp(doc:BaseEdgeDocument,newDistinctIp:Array[String]): Unit ={ val map = newDistinctIp.map(ip => { - (ip, ReadHistoryArangoData.currentHour) + (ip, currentHour) }).toMap doc.addAttribute("DIST_CIP",map.keys.toArray) doc.addAttribute("DIST_CIP_TS",map.values.toArray) @@ -112,7 +113,7 @@ object UpdateDocHandler { val distCipToTsMap: Map[String, Long] = hisDistCip.zip(hisDistCipTs).toMap val muDistCipToTsMap: mutable.Map[String, Long] = mutable.Map(distCipToTsMap.toSeq:_*) newDistinctIp.foreach(cip => { - muDistCipToTsMap.put(cip,ReadHistoryArangoData.currentHour) + muDistCipToTsMap.put(cip,currentHour) }) val resultMap = muDistCipToTsMap.toList.sortBy(-_._2).take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toMap hisDoc.addAttribute("DIST_CIP",resultMap.keys.toArray) diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala deleted file mode 100644 index 67590ff..0000000 --- a/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala +++ /dev/null @@ -1,35 +0,0 @@ -package cn.ac.iie.service.update - -import java.util -import java.util.ArrayList -import java.util.concurrent.ConcurrentHashMap - -import cn.ac.iie.dao.BaseArangoData -import cn.ac.iie.dao.BaseArangoData._ -import com.arangodb.entity.{BaseDocument, BaseEdgeDocument} - -import scala.collection.mutable.WrappedArray.ofRef - -object UpdateDocumentTest { - def main(args: Array[String]): Unit = { - val baseArangoData = new BaseArangoData() - baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument]) - - val value = BaseArangoData.historyRelationFqdnAddressIpMap.keys() - while (value.hasMoreElements) { - val integer: Integer = value.nextElement() - val map: ConcurrentHashMap[String, BaseEdgeDocument] = historyRelationFqdnAddressIpMap.get(integer) - val unit = map.keys() - while (unit.hasMoreElements) { - val key = unit.nextElement() - val edgeDocument = map.get(key) - // val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[util.ArrayList[Long]] - // val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[util.ArrayList[String]] - val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[Array[String]] - val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[Array[java.lang.Long]] - println(longs.toString + "---" + strings.toString) - } - } - } - -}