diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java index 6f2e146..1e67c11 100644 --- a/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java +++ b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java @@ -6,7 +6,6 @@ import cn.ac.iie.utils.ArangoDBConnect; import cn.ac.iie.utils.ExecutorThreadPool; import com.arangodb.ArangoCursor; import com.arangodb.entity.BaseDocument; -import com.arangodb.entity.BaseEdgeDocument; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,22 +19,12 @@ import java.util.concurrent.CountDownLatch; */ public class BaseArangoData { private static final Logger LOG = LoggerFactory.getLogger(BaseArangoData.class); - - public static ConcurrentHashMap> historyVertexFqdnMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap> historyVertexIpMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap> historyVertexSubscriberMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap> historyRelationFqdnAddressIpMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap> historyRelationIpVisitFqdnMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap> historyRelationFqdnSameFqdnMap = new ConcurrentHashMap<>(); - public static ConcurrentHashMap> historyRelationSubsciberLocateIpMap = new ConcurrentHashMap<>(); - private static ArangoDBConnect arangoDBConnect = ArangoDBConnect.getInstance(); private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance(); - public void readHistoryData(String table, - ConcurrentHashMap> historyMap, - Class type) { + public ConcurrentHashMap> readHistoryData(String table, Class type) { + ConcurrentHashMap> historyMap = new ConcurrentHashMap<>(); try { LOG.warn("开始更新" + table); long start = System.currentTimeMillis(); @@ -43,10 +32,8 @@ public class BaseArangoData { historyMap.put(i, new ConcurrentHashMap<>()); } CountDownLatch countDownLatch = new CountDownLatch(ApplicationConfig.THREAD_POOL_NUMBER()); -// long[] timeRange = getTimeRange(table); Long countTotal = getCountTotal(table); for (int i = 0; i < ApplicationConfig.THREAD_POOL_NUMBER(); i++) { -// String sql = getQuerySql(timeRange, i, table); String sql = getQuerySql(countTotal, i, table); ReadHistoryArangoData readHistoryArangoData = new ReadHistoryArangoData<>(arangoDBConnect, sql, historyMap, type, table, countDownLatch); threadPool.executor(readHistoryArangoData); @@ -57,6 +44,7 @@ public class BaseArangoData { } catch (Exception e) { e.printStackTrace(); } + return historyMap; } private Long getCountTotal(String table){ @@ -72,58 +60,18 @@ public class BaseArangoData { LOG.error(sql +"执行异常"); } long last = System.currentTimeMillis(); - LOG.info(sql+" 结果:"+cnt+" 执行时间:"+(last-start)); + LOG.warn(sql+" 结果:"+cnt+" 执行时间:"+(last-start)); return cnt; } private String getQuerySql(Long cnt,int threadNumber, String table){ long sepNum = cnt / ApplicationConfig.THREAD_POOL_NUMBER() + 1; long offsetNum = threadNumber * sepNum; + if (sepNum >= ApplicationConfig.ARANGODB_READ_LIMIT() * 10000){ + sepNum = ApplicationConfig.ARANGODB_READ_LIMIT() * 10000; + } return "FOR doc IN " + table + " limit "+offsetNum+","+sepNum+" RETURN doc"; } - private long[] getTimeRange(String table) { - long minTime = 0L; - long maxTime = 0L; - long startTime = System.currentTimeMillis(); - String sql = "LET doc = (FOR doc IN " + table + " RETURN doc) return 
{max_time:MAX(doc[*].FIRST_FOUND_TIME),min_time:MIN(doc[*].FIRST_FOUND_TIME)}"; - switch (ApplicationConfig.ARANGO_TIME_LIMIT_TYPE()) { - case 0: - ArangoCursor timeDoc = arangoDBConnect.executorQuery(sql, BaseDocument.class); - try { - if (timeDoc != null) { - while (timeDoc.hasNext()) { - BaseDocument doc = timeDoc.next(); - maxTime = Long.parseLong(doc.getAttribute("max_time").toString()) + ApplicationConfig.THREAD_POOL_NUMBER(); - minTime = Long.parseLong(doc.getAttribute("min_time").toString()); - } - } else { - LOG.warn("获取ArangoDb时间范围为空"); - } - } catch (Exception e) { - e.printStackTrace(); - } - break; - case 1: - maxTime = ApplicationConfig.READ_ARANGO_MAX_TIME(); - minTime = ApplicationConfig.READ_ARANGO_MIN_TIME(); - break; - default: - } - long lastTime = System.currentTimeMillis(); - LOG.warn(sql + "\n查询最大最小时间用时:" + (lastTime - startTime)); - return new long[]{minTime, maxTime}; - - } - - private String getQuerySql(long[] timeRange, int threadNumber, String table) { - long minTime = timeRange[0]; - long maxTime = timeRange[1]; - long diffTime = (maxTime - minTime) / ApplicationConfig.THREAD_POOL_NUMBER(); - long maxThreadTime = minTime + (threadNumber + 1) * diffTime; - long minThreadTime = minTime + threadNumber * diffTime; - return "FOR doc IN " + table + " filter doc.FIRST_FOUND_TIME >= " + minThreadTime + " and doc.FIRST_FOUND_TIME <= " + maxThreadTime + " " + ApplicationConfig.ARANGODB_READ_LIMIT() + " RETURN doc"; - } - } diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java index 1ca66d7..9b1f6c2 100644 --- a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java +++ b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java @@ -22,9 +22,9 @@ import java.util.concurrent.CountDownLatch; public class ReadHistoryArangoData extends Thread { public static long currentHour = System.currentTimeMillis() / (60 * 60 * 1000) * 60 * 60; private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class); - static final Integer RECENT_COUNT_HOUR = ApplicationConfig.RECENT_COUNT_HOUR(); + private static final Integer RECENT_COUNT_HOUR = ApplicationConfig.RECENT_COUNT_HOUR(); - public static final HashSet PROTOCOL_SET; + private static final HashSet PROTOCOL_SET; static { PROTOCOL_SET = new HashSet<>(); @@ -69,9 +69,6 @@ public class ReadHistoryArangoData extends Thread { updateProtocolDocument(doc); deleteDistinctClientIpByTime(doc); break; - case "R_VISIT_IP2FQDN": - updateProtocolDocument(doc); - break; default: } int hashCode = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER(); diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties index 0010b23..77006b8 100644 --- a/ip-learning-spark/src/main/resources/application.properties +++ b/ip-learning-spark/src/main/resources/application.properties @@ -1,5 +1,5 @@ #spark任务配置 -spark.sql.shuffle.partitions=5 +spark.sql.shuffle.partitions=10 spark.executor.memory=4g spark.app.name=test spark.network.timeout=300s @@ -7,13 +7,15 @@ repartitionNumber=36 spark.serializer=org.apache.spark.serializer.KryoSerializer master=local[*] #spark读取clickhouse配置 -spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3 +spark.read.clickhouse.url=jdbc:clickhouse://192.168.44.12:8123/tsg_galaxy_v3 
+#spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3 spark.read.clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver spark.read.clickhouse.user=default -spark.read.clickhouse.password=111111 -spark.read.clickhouse.numPartitions=144 +spark.read.clickhouse.password=ceiec2019 +#spark.read.clickhouse.password=111111 +spark.read.clickhouse.numPartitions=5 spark.read.clickhouse.fetchsize=10000 -spark.read.clickhouse.partitionColumn=common_recv_time +spark.read.clickhouse.partitionColumn=LAST_FOUND_TIME clickhouse.socket.timeout=300000 #arangoDB配置 arangoDB.host=192.168.40.182 @@ -22,24 +24,20 @@ arangoDB.user=upsert arangoDB.password=ceiec2018 #arangoDB.DB.name=insert_iplearn_index arangoDB.DB.name=ip-learning-test-0 +#arangoDB.DB.name=ip-learning-test arangoDB.ttl=3600 -thread.pool.number=5 +thread.pool.number=10 #读取clickhouse时间范围方式,0:读取过去一小时;1:指定时间范围 clickhouse.time.limit.type=0 -read.clickhouse.max.time=1571245220 -read.clickhouse.min.time=1571245210 +read.clickhouse.max.time=1600246160 +read.clickhouse.min.time=1597197469 -#读取arangoDB时间范围方式,0:正常读;1:指定时间范围 -arango.time.limit.type=0 -read.arango.max.time=1571245320 -read.arango.min.time=1571245200 - -arangoDB.read.limit= +arangoDB.read.sepNum=10 update.arango.batch=10000 distinct.client.ip.num=10000 recent.count.hour=24 -update.interval=10800 +update.interval=3600 diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala index 395ea6b..da926ff 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala @@ -36,12 +36,7 @@ object ApplicationConfig { val READ_CLICKHOUSE_MAX_TIME: Long = config.getLong("read.clickhouse.max.time") val READ_CLICKHOUSE_MIN_TIME: Long = config.getLong("read.clickhouse.min.time") - val ARANGO_TIME_LIMIT_TYPE: Int = config.getInt("arango.time.limit.type") - - val READ_ARANGO_MAX_TIME: Long = config.getLong("read.arango.max.time") - val READ_ARANGO_MIN_TIME: Long = config.getLong("read.arango.min.time") - - val ARANGODB_READ_LIMIT: String = config.getString("arangoDB.read.limit") + val ARANGODB_READ_LIMIT: Long = config.getLong("arangoDB.read.sepNum") val UPDATE_ARANGO_BATCH: Int = config.getInt("update.arango.batch") val RECENT_COUNT_HOUR: Int = config.getInt("recent.count.hour") val DISTINCT_CLIENT_IP_NUM: Int = config.getInt("distinct.client.ip.num") diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala index 48bbd9a..e37b959 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala @@ -11,7 +11,7 @@ object BaseClickhouseData { val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60 private val timeLimit: (Long, Long) = getTimeLimit - private def initClickhouseData(sql:String): Unit ={ + private def initClickhouseData(sql:String): DataFrame ={ val dataFrame: DataFrame = spark.read.format("jdbc") .option("url", ApplicationConfig.SPARK_READ_CLICKHOUSE_URL) @@ -28,6 +28,8 @@ object BaseClickhouseData { .load() dataFrame.printSchema() dataFrame.createOrReplaceGlobalTempView("dbtable") + + dataFrame } def loadConnectionDataFromCk(): Unit ={ @@ -146,6 +148,7 @@ object BaseClickhouseData { vertexIpDf } + /* def getRelationFqdnLocateIpDf: DataFrame ={ 
loadConnectionDataFromCk() val sslSql = @@ -190,6 +193,92 @@ object BaseClickhouseData { relationFqdnLocateIpDf.printSchema() relationFqdnLocateIpDf } + */ + + def getRelationFqdnLocateIpDf: DataFrame ={ + val where = "common_end_time >= " + timeLimit._2 + " AND common_end_time < " + timeLimit._1 + val sql = + s""" + |(SELECT * FROM + |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL, + |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type + |FROM tsg_galaxy_v3.connection_record_log + |WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip) + |UNION ALL + |(SELECT http_host AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL, + |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type + |FROM tsg_galaxy_v3.connection_record_log + |WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip)) + |WHERE FQDN != '') as dbtable + """.stripMargin + LOG.warn(sql) + val frame = initClickhouseData(sql) + frame.printSchema() + frame + } + + def getRelationSubidLocateIpDf: DataFrame ={ + val where = + s""" + | common_recv_time >= ${timeLimit._2} + | AND common_recv_time < ${timeLimit._1} + | AND common_subscriber_id != '' + | AND radius_framed_ip != '' + """.stripMargin + val sql = + s""" + |( + |SELECT common_subscriber_id,radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME + |FROM radius_record_log + |WHERE $where GROUP BY common_subscriber_id,radius_framed_ip + |) as dbtable + """.stripMargin + LOG.warn(sql) + val frame = initClickhouseData(sql) + frame.printSchema() + frame + } + + def getVertexSubidDf: DataFrame ={ + val where = + s""" + | common_recv_time >= ${timeLimit._2} + | AND common_recv_time < ${timeLimit._1} + | AND common_subscriber_id != '' + | AND radius_framed_ip != '' + """.stripMargin + val sql = + s""" + |( + |SELECT common_subscriber_id,MAX(common_recv_time) as LAST_FOUND_TIME,MIN(common_recv_time) as FIRST_FOUND_TIME FROM radius_record_log + |WHERE $where GROUP BY common_subscriber_id + |)as dbtable + """.stripMargin + LOG.warn(sql) + val frame = initClickhouseData(sql) + frame.printSchema() + frame + } + + def getVertexFramedIpDf: DataFrame ={ + val where = + s""" + | common_recv_time >= ${timeLimit._2} + | AND common_recv_time < ${timeLimit._1} + | AND common_subscriber_id != '' + | AND radius_framed_ip != '' + """.stripMargin + val sql = + s""" + |( + |SELECT DISTINCT radius_framed_ip,common_recv_time as LAST_FOUND_TIME FROM radius_record_log WHERE $where + |)as dbtable + """.stripMargin + LOG.warn(sql) + val frame = initClickhouseData(sql) + frame.printSchema() + frame + } private def getTimeLimit: (Long,Long) ={ var maxTime = 0L diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala index 460caed..7ae7f30 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala @@ -20,6 +20,17 @@ object MergeDataFrame { .partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values } + def mergeVertexFrameIp: 
RDD[Row] ={ + val values = BaseClickhouseData.getVertexFramedIpDf + .repartition(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS) + .rdd.map(row => { + val ip = row.getAs[String]("radius_framed_ip") + (ip, row) + }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values + LOG.warn(s"读取R_LOCATE_SUBSCRIBER2IP clickhouse成功,共:${values.count()} 条") + values + } + def mergeVertexIp(): RDD[Row]={ val vertexIpDf = BaseClickhouseData.getVertexIpDf val frame = vertexIpDf.groupBy("IP").agg( @@ -35,7 +46,9 @@ object MergeDataFrame { } def mergeRelationFqdnLocateIp(): RDD[Row] ={ - val frame = BaseClickhouseData.getRelationFqdnLocateIpDf.filter(row => isDomain(row.getAs[String]("FQDN"))) + val frame = BaseClickhouseData.getRelationFqdnLocateIpDf + .repartition(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS) + .filter(row => isDomain(row.getAs[String]("FQDN"))) .groupBy("FQDN", "common_server_ip") .agg( min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"), @@ -44,28 +57,50 @@ object MergeDataFrame { collect_list("schema_type").alias("schema_type_list"), collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT") ) - frame.rdd.map(row => { + val values = frame.rdd.map(row => { val fqdn = row.getAs[String]("FQDN") val serverIp = row.getAs[String]("common_server_ip") - val key = fqdn.concat("-"+serverIp) - (key,row) + val key = fqdn.concat("-" + serverIp) + (key, row) }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values + LOG.warn(s"读取R_LOCATE_FQDN2IP clickhouse成功,共:${values.count()} 条") + values } + def mergeRelationSubidLocateIp(): RDD[Row] ={ + val values = BaseClickhouseData.getRelationSubidLocateIpDf + .repartition(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS) + .rdd.map(row => { + val commonSubscriberId = row.getAs[String]("common_subscriber_id") + val ip = row.getAs[String]("radius_framed_ip") + val key = commonSubscriberId.concat("-" + ip) + (key, row) + }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values + LOG.warn(s"读取R_LOCATE_SUBSCRIBER2IP clickhouse成功,共:${values.count()} 条") + values + } + + def mergeVertexSubid(): RDD[Row] ={ + val values = BaseClickhouseData.getVertexSubidDf + .repartition(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS) + .rdd.map(row => { + val commonSubscriberId = row.getAs[String]("common_subscriber_id") + (commonSubscriberId, row) + }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values + LOG.warn(s"读取SUBSCRIBER clickhouse成功,共:${values.count()} 条") + values + } + private def isDomain(fqdn: String): Boolean = { try { if (fqdn == null || fqdn.length == 0) { return false } - if (fqdn.contains(":")) { - val s = fqdn.split(":")(0) - if (s.contains(":")){ - return false - } - } - val fqdnArr = fqdn.split("\\.") - if (fqdnArr.length < 4 || fqdnArr.length > 4){ + + val fqdnArr = fqdn.split(":")(0).split("\\.") + + if (fqdnArr.length != 4){ return true } for (f <- fqdnArr) { diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala index bdf8120..ddaf145 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala @@ -93,8 +93,13 @@ object UpdateDocHandler { doc.addAttribute("PROTOCOL_TYPE",protocolTypeBuilder.toString().replaceFirst(",","")) } - def 
mergeDistinctIp(distCipRecent:ofRef[ofRef[String]]): Array[String] ={ - distCipRecent.flatten.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray + def mergeDistinctIp(distCipRecent:ofRef[String]): Array[String] ={ + distCipRecent.flatMap(str => { + str.replaceAll("\\[","") + .replaceAll("\\]","") + .replaceAll("\\'","") + .split(",") + }).distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray } def putDistinctIp(doc:BaseEdgeDocument,newDistinctIp:Array[String]): Unit ={ diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala index b7d4875..b3719fb 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala @@ -5,7 +5,6 @@ import java.util.concurrent.ConcurrentHashMap import cn.ac.iie.config.ApplicationConfig import cn.ac.iie.dao.BaseArangoData -import cn.ac.iie.dao.BaseArangoData._ import cn.ac.iie.service.transform.MergeDataFrame._ import cn.ac.iie.service.update.UpdateDocHandler._ import cn.ac.iie.utils.{ArangoDBConnect, ExecutorThreadPool, SparkSessionUtil} @@ -26,9 +25,12 @@ object UpdateDocument { def update(): Unit = { try { - updateDocument("FQDN", historyVertexFqdnMap, getVertexFqdnRow, classOf[BaseDocument], mergeVertexFqdn) - updateDocument("IP", historyVertexIpMap, getVertexIpRow, classOf[BaseDocument], mergeVertexIp) - updateDocument("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp) +// updateDocument("FQDN", getVertexFqdnRow, classOf[BaseDocument], mergeVertexFqdn) +// updateDocument("IP", getVertexIpRow, classOf[BaseDocument], mergeVertexIp) + updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp) + updateDocument("SUBSCRIBER",getVertexSubidRow,classOf[BaseDocument],mergeVertexSubid) + insertFrameIp() + updateDocument("R_LOCATE_SUBSCRIBER2IP",getRelationSubidLocateIpRow,classOf[BaseEdgeDocument],mergeRelationSubidLocateIp) } catch { case e: Exception => e.printStackTrace() } finally { @@ -38,13 +40,33 @@ object UpdateDocument { } } + private def insertFrameIp(): Unit ={ + mergeVertexFrameIp.foreachPartition(iter => { + val resultDocumentList = new util.ArrayList[BaseDocument] + var i = 0 + iter.foreach(row => { + val document = getVertexFrameipRow(row) + resultDocumentList.add(document) + i += 1 + if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) { + arangoManger.overwrite(resultDocumentList, "IP") + LOG.warn(s"更新:IP" + i) + i = 0 + } + }) + if (i != 0) { + arangoManger.overwrite(resultDocumentList, "IP") + LOG.warn(s"更新IP:" + i) + } + }) + } + private def updateDocument[T <: BaseDocument](collName: String, - historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]], getDocumentRow: (Row, ConcurrentHashMap[String, T]) => T, clazz: Class[T], getNewDataRdd: () => RDD[Row] ): Unit = { - baseArangoData.readHistoryData(collName, historyMap, clazz) + val historyMap = baseArangoData.readHistoryData(collName, clazz) val hisBc = spark.sparkContext.broadcast(historyMap) try { val start = System.currentTimeMillis() @@ -95,6 +117,56 @@ object UpdateDocument { document } + private def getRelationSubidLocateIpRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseEdgeDocument]): BaseEdgeDocument ={ + val subId = row.getAs[String]("common_subscriber_id") + val ip = 
row.getAs[String]("radius_framed_ip") + val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME") + val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME") + + val key = subId.concat("-"+ip) + var document = dictionaryMap.getOrDefault(key,null) + if (document != null){ + updateMaxAttribute(document,lastFoundTime,"LAST_FOUND_TIME") + } else { + document = new BaseEdgeDocument() + document.setKey(key) + document.setFrom("SUBSCRIBER/" + subId) + document.setTo("IP/" + ip) + document.addAttribute("SUBSCRIBER",subId) + document.addAttribute("IP",ip) + document.addAttribute("FIRST_FOUND_TIME",firstFoundTime) + document.addAttribute("LAST_FOUND_TIME",lastFoundTime) + } + + document + } + + private def getVertexFrameipRow(row: Row): BaseDocument ={ + val ip = row.getAs[String]("radius_framed_ip") + val document = new BaseDocument() + document.setKey(ip) + document.addAttribute("IP",ip) + document + } + + private def getVertexSubidRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument ={ + val subId = row.getAs[String]("common_subscriber_id") + val subLastFoundTime = row.getAs[Long]("LAST_FOUND_TIME") + val subFirstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME") + var document = dictionaryMap.getOrDefault(subId,null) + if (document != null){ + updateMaxAttribute(document,subLastFoundTime,"LAST_FOUND_TIME") + } else { + document = new BaseDocument() + document.setKey(subId) + document.addAttribute("SUBSCRIBER",subId) + document.addAttribute("FIRST_FOUND_TIME",subFirstFoundTime) + document.addAttribute("LAST_FOUND_TIME",subLastFoundTime) + } + + document + } + private def getVertexIpRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = { val ip = row.getAs[String]("IP") val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME") @@ -133,7 +205,7 @@ object UpdateDocument { val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME") val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST") val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list") - val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT") + val distCipRecent = row.getAs[ofRef[String]]("DIST_CIP_RECENT") val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList) val distinctIp: Array[String] = mergeDistinctIp(distCipRecent) diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala index 67590ff..76bf35f 100644 --- a/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala +++ b/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala @@ -1,35 +1,35 @@ -package cn.ac.iie.service.update - -import java.util -import java.util.ArrayList -import java.util.concurrent.ConcurrentHashMap - -import cn.ac.iie.dao.BaseArangoData -import cn.ac.iie.dao.BaseArangoData._ -import com.arangodb.entity.{BaseDocument, BaseEdgeDocument} - -import scala.collection.mutable.WrappedArray.ofRef - -object UpdateDocumentTest { - def main(args: Array[String]): Unit = { - val baseArangoData = new BaseArangoData() - baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument]) - - val value = BaseArangoData.historyRelationFqdnAddressIpMap.keys() - while (value.hasMoreElements) { - val integer: Integer = value.nextElement() - val map: ConcurrentHashMap[String, BaseEdgeDocument] = historyRelationFqdnAddressIpMap.get(integer) - val 
unit = map.keys() - while (unit.hasMoreElements) { - val key = unit.nextElement() - val edgeDocument = map.get(key) - // val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[util.ArrayList[Long]] - // val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[util.ArrayList[String]] - val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[Array[String]] - val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[Array[java.lang.Long]] - println(longs.toString + "---" + strings.toString) - } - } - } - -} +//package cn.ac.iie.service.update +// +//import java.util +//import java.util.ArrayList +//import java.util.concurrent.ConcurrentHashMap +// +//import cn.ac.iie.dao.BaseArangoData +//import cn.ac.iie.dao.BaseArangoData._ +//import com.arangodb.entity.{BaseDocument, BaseEdgeDocument} +// +//import scala.collection.mutable.WrappedArray.ofRef +// +//object UpdateDocumentTest { +// def main(args: Array[String]): Unit = { +// val baseArangoData = new BaseArangoData() +// baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument]) +// +// val value = BaseArangoData.historyRelationFqdnAddressIpMap.keys() +// while (value.hasMoreElements) { +// val integer: Integer = value.nextElement() +// val map: ConcurrentHashMap[String, BaseEdgeDocument] = historyRelationFqdnAddressIpMap.get(integer) +// val unit = map.keys() +// while (unit.hasMoreElements) { +// val key = unit.nextElement() +// val edgeDocument = map.get(key) +// // val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[util.ArrayList[Long]] +// // val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[util.ArrayList[String]] +// val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[Array[String]] +// val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[Array[java.lang.Long]] +// println(longs.toString + "---" + strings.toString) +// } +// } +// } +// +//}
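
For reference, below is a minimal standalone Scala sketch (not part of the patch) of the two behaviours introduced here that are easiest to misread: the LIMIT-based split of ArangoDB history reads in BaseArangoData.getQuerySql, and the parsing of the flattened DIST_CIP_RECENT strings (the toString(groupUniqArray(...)) output from ClickHouse) in UpdateDocHandler.mergeDistinctIp. The parameters threadPoolNumber, readLimit and distinctClientIpNum stand in for ApplicationConfig.THREAD_POOL_NUMBER(), ApplicationConfig.ARANGODB_READ_LIMIT() and ApplicationConfig.DISTINCT_CLIENT_IP_NUM, and the sample values in main are made up.

// Sketch only: mirrors the pagination and DIST_CIP_RECENT parsing logic of this patch,
// with ApplicationConfig values replaced by plain parameters.
object PatchSketch {

  // Per-thread AQL as built by BaseArangoData.getQuerySql after the change:
  // the collection is split by "LIMIT offset,count" instead of FIRST_FOUND_TIME ranges,
  // and each thread's slice is capped at readLimit * 10000 documents.
  def querySql(table: String, countTotal: Long, threadNumber: Int,
               threadPoolNumber: Int, readLimit: Long): String = {
    var sepNum = countTotal / threadPoolNumber + 1
    val offset = threadNumber * sepNum   // offset uses the uncapped slice size
    if (sepNum >= readLimit * 10000) sepNum = readLimit * 10000
    s"FOR doc IN $table limit $offset,$sepNum RETURN doc"
  }

  // DIST_CIP_RECENT now arrives as one string per schema_type, e.g. "['1.1.1.1','2.2.2.2']";
  // as in UpdateDocHandler.mergeDistinctIp, strip brackets and quotes, split on commas,
  // deduplicate and keep at most distinctClientIpNum client IPs.
  def mergeDistinctIp(distCipRecent: Seq[String], distinctClientIpNum: Int): Array[String] =
    distCipRecent
      .flatMap(_.replaceAll("\\[", "").replaceAll("\\]", "").replaceAll("'", "").split(","))
      .distinct
      .take(distinctClientIpNum)
      .toArray

  def main(args: Array[String]): Unit = {
    // Ten worker threads over 1,234,567 documents with readLimit = 10 (100,000 docs per slice).
    (0 until 3).foreach(i => println(querySql("R_LOCATE_FQDN2IP", 1234567L, i, 10, 10L)))
    println(mergeDistinctIp(Seq("['1.1.1.1','2.2.2.2']", "['2.2.2.2','3.3.3.3']"), 10000).mkString(","))
  }
}

Note that in this scheme the offset is computed from the uncapped sepNum, so when the readLimit cap applies the per-thread slices are no longer contiguous over the collection; whether that is intended depends on how ARANGODB_READ_LIMIT is configured relative to the collection size.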