diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
deleted file mode 100644
index 1ca66d7..0000000
--- a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
+++ /dev/null
@@ -1,125 +0,0 @@
-package cn.ac.iie.service.read;
-
-import cn.ac.iie.config.ApplicationConfig;
-import cn.ac.iie.utils.ArangoDBConnect;
-import com.arangodb.ArangoCursor;
-import com.arangodb.entity.BaseDocument;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-
-/**
- * @author wlh
- * Reads the full ArangoDB history in multiple threads and packs it into the supplied map.
- */
-public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
-    public static long currentHour = System.currentTimeMillis() / (60 * 60 * 1000) * 60 * 60;
-    private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class);
-    static final Integer RECENT_COUNT_HOUR = ApplicationConfig.RECENT_COUNT_HOUR();
-
-    public static final HashSet<String> PROTOCOL_SET;
-
-    static {
-        PROTOCOL_SET = new HashSet<>();
-        PROTOCOL_SET.add("HTTP");
-        PROTOCOL_SET.add("TLS");
-        PROTOCOL_SET.add("DNS");
-    }
-
-    private ArangoDBConnect arangoConnect;
-    private String query;
-    private ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> map;
-    private Class<T> type;
-    private String table;
-    private CountDownLatch countDownLatch;
-
-    public ReadHistoryArangoData(ArangoDBConnect arangoConnect,
-                                 String query,
-                                 ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> map,
-                                 Class<T> type,
-                                 String table,
-                                 CountDownLatch countDownLatch) {
-        this.arangoConnect = arangoConnect;
-        this.query = query;
-        this.map = map;
-        this.type = type;
-        this.table = table;
-        this.countDownLatch = countDownLatch;
-    }
-
-    @Override
-    public void run() {
-        try {
-            long s = System.currentTimeMillis();
-            ArangoCursor<T> docs = arangoConnect.executorQuery(query, type);
-            if (docs != null) {
-                List<T> baseDocuments = docs.asListRemaining();
-                int i = 0;
-                for (T doc : baseDocuments) {
-                    String key = doc.getKey();
-                    switch (table) {
-                        case "R_LOCATE_FQDN2IP":
-                            updateProtocolDocument(doc);
-                            deleteDistinctClientIpByTime(doc);
-                            break;
-                        case "R_VISIT_IP2FQDN":
-                            updateProtocolDocument(doc);
-                            break;
-                        default:
-                    }
-                    int hashCode = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER();
-                    ConcurrentHashMap<String, T> tmpMap = map.get(hashCode);
-                    tmpMap.put(key, doc);
-                    i++;
-                }
-                long l = System.currentTimeMillis();
-                LOG.warn(query + "\nRead " + i + " records, elapsed: " + (l - s));
-            }
-        }catch (Exception e){
-            e.printStackTrace();
-        }finally {
-            countDownLatch.countDown();
-            LOG.warn("This thread finished reading; remaining threads: " + countDownLatch.getCount());
-        }
-    }
-
-    private void updateProtocolDocument(T doc) {
-        if (doc.getProperties().containsKey("PROTOCOL_TYPE")) {
-            for (String protocol : PROTOCOL_SET) {
-                String protocolRecent = protocol + "_CNT_RECENT";
-                ArrayList<Long> cntRecent = (ArrayList<Long>) doc.getAttribute(protocolRecent);
-                Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]);
-                Long[] cntRecentsDst = new Long[RECENT_COUNT_HOUR];
-                System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1);
-                cntRecentsDst[0] = 0L;
-                doc.addAttribute(protocolRecent, cntRecentsDst);
-            }
-        }
-    }
-
-    private void deleteDistinctClientIpByTime(T doc) {
-        ArrayList<String> distCip = (ArrayList<String>) doc.getAttribute("DIST_CIP");
-        ArrayList<Long> distCipTs = (ArrayList<Long>) doc.getAttribute("DIST_CIP_TS");
-        distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600);
-        Collections.sort(distCipTs);
-        int index = distCipTs.indexOf(currentHour - RECENT_COUNT_HOUR * 3600);
-        String[] distCipArr = new String[index];
-        long[] disCipTsArr = new long[index];
-        if (distCip.size() + 1 == distCipTs.size()){
-            for (int i = 0; i < index; i++) {
-                distCipArr[i] = distCip.get(i);
-                disCipTsArr[i] = distCipTs.get(i);
-            }
-        }
-        doc.updateAttribute("DIST_CIP", distCipArr);
-        doc.updateAttribute("DIST_CIP_TS", disCipTsArr);
-    }
-
-}
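Note on the removed class: the hourly shift that `updateProtocolDocument` applied to each `*_CNT_RECENT` array drops the oldest bucket and prepends a zeroed bucket for the hour now starting. A minimal standalone Scala sketch of that window shift (object and method names are hypothetical; `recentCountHour` stands in for `ApplicationConfig.RECENT_COUNT_HOUR()`):

    object RecentCounterWindow {
      // Index 0 holds the current hour's count; higher indices are older hours.
      // Shifting right by one zeroes the new hour and drops the oldest bucket.
      def shift(cnt: Array[Long], recentCountHour: Int): Array[Long] = {
        val shifted = new Array[Long](recentCountHour) // zero-initialized
        System.arraycopy(cnt, 0, shifted, 1, math.min(cnt.length, recentCountHour - 1))
        shifted // shifted(0) == 0L is the fresh bucket
      }
    }

For example, shift(Array(3L, 5L, 0L, 7L), 4) yields Array(0, 3, 5, 0): the oldest count (7) falls off and slot 0 is reset for the new hour.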
doc.getAttribute("DIST_CIP_TS"); - distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600); - Collections.sort(distCipTs); - int index = distCipTs.indexOf(currentHour - RECENT_COUNT_HOUR * 3600); - String[] distCipArr = new String[index]; - long[] disCipTsArr = new long[index]; - if (distCip.size() + 1 == distCipTs.size()){ - for (int i = 0; i < index; i++) { - distCipArr[i] = distCip.get(i); - disCipTsArr[i] = distCipTs.get(i); - } - } - doc.updateAttribute("DIST_CIP", distCipArr); - doc.updateAttribute("DIST_CIP_TS", disCipTsArr); - } - -} diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala index e70ffea..c33ee14 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala @@ -33,20 +33,21 @@ object BaseClickhouseData { } def getVertexFqdnDf: DataFrame = { + val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1 val sql = - """ + s""" |(SELECT | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME |FROM | ((SELECT | ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME | FROM connection_record_log - | WHERE common_schema_type = 'SSL' GROUP BY ssl_sni + | WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni | )UNION ALL | (SELECT | http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME | FROM connection_record_log - | WHERE common_schema_type = 'HTTP' GROUP BY http_host)) + | WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host)) |GROUP BY FQDN HAVING FQDN != '') as dbtable """.stripMargin LOG.warn(sql) @@ -164,7 +165,8 @@ object BaseClickhouseData { val sql = s""" |( - |SELECT DISTINCT radius_framed_ip,common_recv_time as LAST_FOUND_TIME FROM radius_record_log WHERE $where + |SELECT radius_framed_ip,MAX(common_recv_time) as LAST_FOUND_TIME FROM radius_record_log WHERE $where + |GROUP BY radius_framed_ip |)as dbtable """.stripMargin LOG.warn(sql) diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala index 309c1a7..b936697 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala @@ -18,9 +18,9 @@ object MergeDataFrame { def mergeVertexFqdn(): RDD[(String, (Option[BaseDocument], Row))] = { val fqdnRddRow: RDD[(String, Row)] = BaseClickhouseData.getVertexFqdnDf - .rdd.filter(row => isDomain(row.getAs[String](0))).map(row => { + .repartition().rdd.filter(row => isDomain(row.getAs[String](0))).map(row => { (row.getAs[String]("FQDN"), row) - }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)) + })/*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/ val fqdnRddDoc: ArangoRdd[BaseDocument] = BaseArangoData.loadArangoRdd[BaseDocument]("FQDN") @@ -29,7 +29,7 @@ object MergeDataFrame { def mergeVertexIp(): RDD[(String, (Option[BaseDocument], Row))] = { val vertexIpDf = BaseClickhouseData.getVertexIpDf - val frame = vertexIpDf.groupBy("IP").agg( + val frame = vertexIpDf.repartition().groupBy("IP").agg( min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"), 
max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"), collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"), @@ -39,14 +39,15 @@ object MergeDataFrame { ) val ipRddRow = frame.rdd.map(row => { (row.getAs[String]("IP"), row) - }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)) + })/*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/ val ipRddDoc = BaseArangoData.loadArangoRdd[BaseDocument]("IP") ipRddDoc.map(doc => (doc.getKey, doc)).rightOuterJoin(ipRddRow) } def mergeRelationFqdnLocateIp(): RDD[(String, (Option[BaseEdgeDocument], Row))] = { - val frame = BaseClickhouseData.getRelationFqdnLocateIpDf.filter(row => isDomain(row.getAs[String]("FQDN"))) + val frame = BaseClickhouseData.getRelationFqdnLocateIpDf + .repartition().filter(row => isDomain(row.getAs[String]("FQDN"))) .groupBy("FQDN", "common_server_ip") .agg( min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"), @@ -60,7 +61,7 @@ object MergeDataFrame { val serverIp = row.getAs[String]("common_server_ip") val key = fqdn.concat("-" + serverIp) (key, row) - }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)) + })/*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/ val fqdnLocIpRddDoc = BaseArangoData.loadArangoRdd[BaseEdgeDocument]("R_LOCATE_FQDN2IP") fqdnLocIpRddDoc.map(doc => (doc.getKey, doc)).rightOuterJoin(fqdnLocIpRddRow) diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala index 06d731a..e91ef03 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala @@ -2,10 +2,10 @@ package cn.ac.iie.service.update import java.util -import scala.collection.JavaConversions._ +import scala.collection.JavaConversions._ import cn.ac.iie.config.ApplicationConfig -import cn.ac.iie.service.read.ReadHistoryArangoData +import cn.ac.iie.dao.BaseClickhouseData import com.arangodb.entity.{BaseDocument, BaseEdgeDocument} import scala.collection.mutable @@ -132,7 +132,7 @@ object UpdateDocHandler { def putDistinctIp(doc: BaseEdgeDocument, newDistinctIp: Array[String]): Unit = { val map = newDistinctIp.map(ip => { - (ip, ReadHistoryArangoData.currentHour) + (ip, BaseClickhouseData.currentHour) }).toMap doc.addAttribute("DIST_CIP", map.keys.toArray) doc.addAttribute("DIST_CIP_TS", map.values.toArray) @@ -146,7 +146,7 @@ object UpdateDocHandler { val distCipToTsMap: Map[String, Long] = hisDistCip.zip(hisDistCipTs).toMap val muDistCipToTsMap: mutable.Map[String, Long] = mutable.Map(distCipToTsMap.toSeq: _*) newDistinctIp.foreach(cip => { - muDistCipToTsMap.put(cip, ReadHistoryArangoData.currentHour) + muDistCipToTsMap.put(cip, BaseClickhouseData.currentHour) }) val resultMap = muDistCipToTsMap.toList.sortBy(-_._2).take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toMap hisDoc.addAttribute("DIST_CIP", resultMap.keys.toArray) diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala index dba2b98..febfe4d 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala @@ -21,11 +21,11 @@ object UpdateDocument { try { updateDocument("FQDN", getVertexFqdnRow, 
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index dba2b98..febfe4d 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -21,11 +21,11 @@ object UpdateDocument {
     try {
       updateDocument("FQDN", getVertexFqdnRow, mergeVertexFqdn)
 
-      updateDocument("SUBSCRIBER", getVertexSubidRow, mergeVertexSubid)
+//      updateDocument("SUBSCRIBER", getVertexSubidRow, mergeVertexSubid)
 
-      insertFrameIp()
+//      insertFrameIp()
 
-      updateDocument("R_LOCATE_SUBSCRIBER2IP", getRelationSubidLocateIpRow, mergeRelationSubidLocateIp)
+//      updateDocument("R_LOCATE_SUBSCRIBER2IP", getRelationSubidLocateIpRow, mergeRelationSubidLocateIp)
 
       updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, mergeRelationFqdnLocateIp)
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
index ab77299..7132c19 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
@@ -1,11 +1,15 @@
 package cn.ac.iie.spark.rdd
 
+import java.util
+
 import scala.collection.JavaConverters.asScalaIteratorConverter
 import cn.ac.iie.config.ApplicationConfig
 import cn.ac.iie.service.update.UpdateDocument
 import cn.ac.iie.spark
 import cn.ac.iie.spark.partition.QueryArangoPartition
 import com.arangodb.ArangoCursor
+import com.arangodb.model.AqlQueryOptions
+import com.arangodb.util.MapBuilder
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.slf4j.LoggerFactory
@@ -38,13 +42,15 @@ class ArangoRdd[T: ClassTag](@transient override val sparkContext: SparkContext,
     var arangoCursor:ArangoCursor[T] = null
     val arangoDB = spark.createArangoBuilder(options).build()
+    val bindVars: util.Map[String, AnyRef] = new MapBuilder().get
+    val queryOptions: AqlQueryOptions = new AqlQueryOptions().ttl(ApplicationConfig.ARANGODB_TTL)
     try {
       val offset = split.offset
       val separate = split.separate
       val collection = options.collection
       val sql = s"FOR doc IN $collection limit $offset,$separate RETURN doc"
       LOG.info(sql)
-      arangoCursor = arangoDB.db(options.database).query(sql,clazz.runtimeClass.asInstanceOf[Class[T]])
+      arangoCursor = arangoDB.db(options.database).query(sql,bindVars,queryOptions,clazz.runtimeClass.asInstanceOf[Class[T]])
     }catch {
       case e: Exception => LOG.error(s"Failed to create cursor: ${e.getMessage}")
     }finally {
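The four-argument `query(...)` overload used above is what lets an `AqlQueryOptions` carry a cursor `ttl`; without it, a slow Spark partition can outlive the server-side cursor's default timeout mid-read. A minimal sketch of the same call pattern against the ArangoDB Java driver (endpoint, database name, and TTL value are placeholders):

    import scala.collection.JavaConverters._
    import com.arangodb.ArangoDB
    import com.arangodb.entity.BaseDocument
    import com.arangodb.model.AqlQueryOptions
    import com.arangodb.util.MapBuilder

    object TtlCursorSketch {
      def main(args: Array[String]): Unit = {
        val arango = new ArangoDB.Builder().host("127.0.0.1", 8529).build() // assumed endpoint
        val bindVars = new MapBuilder().get       // empty bind-variable map
        val opts = new AqlQueryOptions().ttl(600) // keep the cursor alive for 600 s
        val cursor = arango.db("ip_learning")     // hypothetical database name
          .query("FOR doc IN FQDN LIMIT 0,10 RETURN doc", bindVars, opts, classOf[BaseDocument])
        cursor.asListRemaining().asScala.foreach(doc => println(doc.getKey))
        arango.shutdown()
      }
    }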