diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java
index 2bb9879..f4cb662 100644
--- a/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/dao/BaseArangoData.java
@@ -33,9 +33,8 @@ public class BaseArangoData {
 
     private ExecutorThreadPool threadPool = ExecutorThreadPool.getInstance();
 
-    public <T extends BaseDocument> void readHistoryData(String table,
-                                                          ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyMap,
-                                                          Class<T> type) {
+    public <T extends BaseDocument> ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> readHistoryData(String table, Class<T> type) {
+        ConcurrentHashMap<Integer, ConcurrentHashMap<String, T>> historyMap = new ConcurrentHashMap<>();
         try {
             LOG.warn("开始更新" + table);
             long start = System.currentTimeMillis();
@@ -57,6 +56,7 @@ public class BaseArangoData {
         } catch (Exception e) {
             e.printStackTrace();
         }
+        return historyMap;
     }
 
     private Long getCountTotal(String table){
@@ -72,7 +72,7 @@ public class BaseArangoData {
             LOG.error(sql +"执行异常");
         }
         long last = System.currentTimeMillis();
-        LOG.info(sql+" 结果:"+cnt+" 执行时间:"+(last-start));
+        LOG.warn(sql+" 结果:"+cnt+" 执行时间:"+(last-start));
         return cnt;
     }
 
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
index 2fdd582..ada64da 100644
--- a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
@@ -58,6 +58,7 @@ public class ReadHistoryArangoData extends Thread {
     public void run() {
         try {
             long s = System.currentTimeMillis();
+            LOG.warn(query+" \n 开始查询");
             ArangoCursor docs = arangoConnect.executorQuery(query, type);
             if (docs != null) {
                 List baseDocuments = docs.asListRemaining();
@@ -67,11 +68,11 @@ public class ReadHistoryArangoData extends Thread {
                     switch (table) {
                         case "R_LOCATE_FQDN2IP":
                             updateProtocolDocument(doc);
-//                            deleteDistinctClientIpByTime(doc);
-                            break;
-                        case "R_VISIT_IP2FQDN":
-                            updateProtocolDocument(doc);
+                            deleteDistinctClientIpByTime(doc);
                             break;
+//                        case "R_VISIT_IP2FQDN":
+//                            updateProtocolDocument(doc);
+//                            break;
                         default:
                     }
                     int hashCode = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER();
diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
index 3dcda7e..3b1c633 100644
--- a/ip-learning-spark/src/main/resources/application.properties
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -7,13 +7,16 @@ repartitionNumber=36
 spark.serializer=org.apache.spark.serializer.KryoSerializer
 master=local[*]
 #spark读取clickhouse配置
-spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3
+#spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3
+spark.read.clickhouse.url=jdbc:clickhouse://192.168.44.12:8123/tsg_galaxy_v3
 spark.read.clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
 spark.read.clickhouse.user=default
-spark.read.clickhouse.password=111111
+#spark.read.clickhouse.password=111111
+spark.read.clickhouse.password=ceiec2019
 spark.read.clickhouse.numPartitions=144
 spark.read.clickhouse.fetchsize=10000
-spark.read.clickhouse.partitionColumn=common_end_time
+#spark.read.clickhouse.partitionColumn=common_end_time
+spark.read.clickhouse.partitionColumn=FIRST_FOUND_TIME
 clickhouse.socket.timeout=300000
 #arangoDB配置
 arangoDB.host=192.168.40.182
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
index 310eaa4..ac5221f 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -11,7 +11,7 @@ object BaseClickhouseData {
   val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60
   private val timeLimit: (Long, Long) = getTimeLimit
 
-  private def initClickhouseData(sql:String): Unit ={
+  private def initClickhouseData(sql:String): DataFrame ={
 
     val dataFrame: DataFrame = spark.read.format("jdbc")
       .option("url", ApplicationConfig.SPARK_READ_CLICKHOUSE_URL)
@@ -28,6 +28,7 @@ object BaseClickhouseData {
       .load()
     dataFrame.printSchema()
     dataFrame.createOrReplaceGlobalTempView("dbtable")
+    dataFrame
   }
 
   def loadConnectionDataFromCk(): Unit ={
@@ -45,6 +46,28 @@ object BaseClickhouseData {
     initClickhouseData(sql)
   }
 
+  def getRelationFqdnLocateIpDf(): DataFrame ={
+    val where = "common_end_time >= " + timeLimit._2 + " AND common_end_time < " + timeLimit._1 + " and common_schema_type != 'BASE'"
+    val sql =
+      s"""
+         |(SELECT * FROM
+         |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+         |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type
+         |FROM tsg_galaxy_v3.connection_record_log
+         |WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip)
+         |UNION ALL
+         |(SELECT http_host AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+         |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type
+         |FROM tsg_galaxy_v3.connection_record_log
+         |WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip))
+         |WHERE FQDN != '') as dbtable
+      """.stripMargin
+    LOG.warn(sql)
+    val frame = initClickhouseData(sql)
+    frame.printSchema()
+    frame
+  }
+
   private def loadRadiusDataFromCk(): Unit ={
     val where =
       s"""
@@ -146,6 +169,7 @@ object BaseClickhouseData {
     vertexIpDf
   }
 
+  /*
   def getRelationFqdnLocateIpDf: DataFrame ={
     loadConnectionDataFromCk()
     val sslSql =
@@ -188,6 +212,7 @@ object BaseClickhouseData {
     relationFqdnLocateIpDf.printSchema()
     relationFqdnLocateIpDf
   }
+  */
 
   private def getTimeLimit: (Long,Long) ={
     var maxTime = 0L
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
index f94195e..ac1f281 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
@@ -35,13 +35,14 @@ object MergeDataFrame {
   }
 
   def mergeRelationFqdnLocateIp(): RDD[Row] ={
-    val frame = BaseClickhouseData.getRelationFqdnLocateIpDf.filter(row => isDomain(row.getAs[String]("FQDN")))
+    val frame = BaseClickhouseData.getRelationFqdnLocateIpDf().filter(row => isDomain(row.getAs[String]("FQDN")))
       .groupBy("FQDN", "common_server_ip")
       .agg(
         min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
         max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
         collect_list("COUNT_TOTAL").alias("COUNT_TOTAL_LIST"),
-        collect_list("schema_type").alias("schema_type_list")
+        collect_list("schema_type").alias("schema_type_list"),
+        collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT")
       )
     frame.rdd.map(row => {
       val fqdn = row.getAs[String]("FQDN")
@@ -57,13 +58,8 @@ object MergeDataFrame {
     if (fqdn == null || fqdn.length == 0) {
       return false
     }
-    if (fqdn.contains(":")) {
-      val s = fqdn.split(":")(0)
-      if (s.contains(":")){
-        return false
-      }
-    }
-    val fqdnArr = fqdn.split("\\.")
+    val domain = fqdn.split(":")(0)
+    val fqdnArr = domain.split("\\.")
     if (fqdnArr.length < 4 || fqdnArr.length > 4){
       return true
     }
@@ -79,7 +75,7 @@ object MergeDataFrame {
       }
     } catch {
       case e: Exception =>
-        LOG.error("解析域名 " + fqdn + " 失败:\n" + e.toString)
+        LOG.warn("解析域名 " + fqdn + " 失败:\n" + e.toString)
     }
     false
   }
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
index bdf8120..474a691 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
@@ -93,8 +93,14 @@ object UpdateDocHandler {
     doc.addAttribute("PROTOCOL_TYPE",protocolTypeBuilder.toString().replaceFirst(",",""))
   }
 
-  def mergeDistinctIp(distCipRecent:ofRef[ofRef[String]]): Array[String] ={
-    distCipRecent.flatten.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
+  def mergeDistinctIp(distCipRecent:ofRef[String]): Array[String] ={
+    distCipRecent.flatMap(str => {
+      str.replaceAll("\\[", "")
+        .replaceAll("\\]", "")
+        .replaceAll("'", "")
+        .split(",")
+    }).distinct.toArray
+//    distCipRecent.flatten.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
   }
 
   def putDistinctIp(doc:BaseEdgeDocument,newDistinctIp:Array[String]): Unit ={
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index d3e4af5..2a5e12c 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -26,9 +26,7 @@ object UpdateDocument {
 
   def update(): Unit = {
     try {
-//      updateDocument("FQDN", historyVertexFqdnMap, getVertexFqdnRow, classOf[BaseDocument], mergeVertexFqdn)
-//      updateDocument("IP", historyVertexIpMap, getVertexIpRow, classOf[BaseDocument], mergeVertexIp)
-      updateDocument("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
+      updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
     } catch {
       case e: Exception => e.printStackTrace()
     } finally {
@@ -39,13 +37,13 @@ object UpdateDocument {
   }
 
   private def updateDocument[T <: BaseDocument](collName: String,
-                                                historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]],
                                                 getDocumentRow: (Row, ConcurrentHashMap[String, T]) => T,
                                                 clazz: Class[T],
                                                 getNewDataRdd: () => RDD[Row]
                                                ): Unit = {
-    baseArangoData.readHistoryData(collName, historyMap, clazz)
+    val historyMap = baseArangoData.readHistoryData(collName, clazz)
     val hisBc = spark.sparkContext.broadcast(historyMap)
+    LOG.warn("广播变量发送完毕")
     try {
       val start = System.currentTimeMillis()
       val newDataRdd = getNewDataRdd()
@@ -134,6 +132,9 @@ object UpdateDocument {
     val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
     val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
 
+    val distCipRecent = row.getAs[ofRef[String]]("DIST_CIP_RECENT")
+    val disCips = mergeDistinctIp(distCipRecent)
+
     val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
 
     val key = fqdn.concat("-" + serverIp)
@@ -141,6 +142,7 @@ object UpdateDocument {
     if (document != null) {
       updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
       updateProtocolAttritube(document, sepAttritubeMap)
+      updateDistinctIp(document,disCips)
     } else {
       document = new BaseEdgeDocument()
       document.setKey(key)
@@ -148,6 +150,7 @@ object UpdateDocument {
       document.setTo("IP/" + serverIp)
      document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
       document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
+      putDistinctIp(document,disCips)
       putProtocolAttritube(document, sepAttritubeMap)
     }
     document
diff --git a/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala b/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala
deleted file mode 100644
index 67590ff..0000000
--- a/ip-learning-spark/src/test/scala/cn/ac/iie/service/update/UpdateDocumentTest.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-package cn.ac.iie.service.update
-
-import java.util
-import java.util.ArrayList
-import java.util.concurrent.ConcurrentHashMap
-
-import cn.ac.iie.dao.BaseArangoData
-import cn.ac.iie.dao.BaseArangoData._
-import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
-
-import scala.collection.mutable.WrappedArray.ofRef
-
-object UpdateDocumentTest {
-  def main(args: Array[String]): Unit = {
-    val baseArangoData = new BaseArangoData()
-    baseArangoData.readHistoryData("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, classOf[BaseEdgeDocument])
-
-    val value = BaseArangoData.historyRelationFqdnAddressIpMap.keys()
-    while (value.hasMoreElements) {
-      val integer: Integer = value.nextElement()
-      val map: ConcurrentHashMap[String, BaseEdgeDocument] = historyRelationFqdnAddressIpMap.get(integer)
-      val unit = map.keys()
-      while (unit.hasMoreElements) {
-        val key = unit.nextElement()
-        val edgeDocument = map.get(key)
-//        val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[util.ArrayList[Long]]
-//        val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[util.ArrayList[String]]
-        val strings = edgeDocument.getAttribute("DIST_CIP").asInstanceOf[Array[String]]
-        val longs = edgeDocument.getAttribute("DNS_CNT_RECENT").asInstanceOf[Array[java.lang.Long]]
-        println(longs.toString + "---" + strings.toString)
-      }
-    }
-  }
-
-}