diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
index 0010b23..b48b95c 100644
--- a/ip-learning-spark/src/main/resources/application.properties
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -27,9 +27,9 @@ arangoDB.ttl=3600
 thread.pool.number=5
 
 # ClickHouse read-window mode: 0 = read the past hour; 1 = use the explicit range below
-clickhouse.time.limit.type=0
-read.clickhouse.max.time=1571245220
-read.clickhouse.min.time=1571245210
+clickhouse.time.limit.type=1
+read.clickhouse.max.time=1598246519
+read.clickhouse.min.time=1597161600
 
 # ArangoDB read mode: 0 = normal read; 1 = use an explicit time range
 arango.time.limit.type=0
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
index 460caed..dd264b0 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
@@ -4,16 +4,21 @@ import java.util.regex.Pattern
 
 import cn.ac.iie.config.ApplicationConfig
 import cn.ac.iie.dao.BaseClickhouseData
+import cn.ac.iie.service.update.UpdateDocHandler.{mergeDistinctIp, separateAttributeByProtocol}
 import cn.ac.iie.spark.partition.CustomPartitioner
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions._
 import org.slf4j.LoggerFactory
 
+import scala.collection.mutable.WrappedArray.ofRef
+
 object MergeDataFrame {
   private val LOG = LoggerFactory.getLogger(MergeDataFrame.getClass)
   private val pattern = Pattern.compile("^[\\d]*$")
 
+  private val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60 // epoch seconds, truncated to the hour
+
   def mergeVertexFqdn(): RDD[Row] ={
     BaseClickhouseData.getVertexFqdnDf
       .rdd.filter(row => isDomain(row.getAs[String](0))).map(row => (row.get(0),row))
@@ -47,9 +52,17 @@ object MergeDataFrame {
     frame.rdd.map(row => {
       val fqdn = row.getAs[String]("FQDN")
       val serverIp = row.getAs[String]("common_server_ip")
-      val key = fqdn.concat("-"+serverIp)
-      (key,row)
-    }).partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values
+      val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+      val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+      val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
+      val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
+      val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
+      val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
+      val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
+
+      Row(fqdn, serverIp, firstFoundTime, lastFoundTime,
+        sepAttritubeMap.getOrElse("DNS", 0L), sepAttritubeMap.getOrElse("TLS", 0L), sepAttritubeMap.getOrElse("HTTP", 0L), distinctIp, currentHour)
+    })
   }
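
The rewritten mapper above stops emitting (key, row) pairs for a custom repartition and instead flattens each ClickHouse row into a plain Row, using two helpers imported from UpdateDocHandler whose bodies are not part of this diff. For review context, here is a plausible sketch of their shapes, inferred only from the call sites in this hunk; the object name UpdateDocHandlerSketch and the exact semantics are assumptions, not code from this change:

    // Sketch inferred from the call sites in MergeDataFrame; the real
    // UpdateDocHandler implementations are not shown in this diff.
    import scala.collection.mutable.WrappedArray.ofRef

    object UpdateDocHandlerSketch {
      // Pairs the parallel protocol-name/count arrays into per-protocol totals,
      // e.g. Map("DNS" -> 42L, "TLS" -> 7L).
      def separateAttributeByProtocol(schemaTypeList: ofRef[AnyRef],
                                      countTotalList: ofRef[AnyRef]): Map[String, Long] =
        schemaTypeList.iterator.zip(countTotalList.iterator).toList
          .groupBy(_._1.toString)
          .map { case (proto, pairs) => proto -> pairs.map(_._2.toString.toLong).sum }

      // Flattens the nested recent-client-IP arrays and removes duplicates.
      def mergeDistinctIp(distCipRecent: ofRef[ofRef[String]]): Array[String] =
        distCipRecent.iterator.flatten.toArray.distinct
    }

Note that the Row field order must match the DataFrame schema declared in UpdateDocument (dns, tls, http), and that plain Long values, not Option[Long], are required for a LongType column; hence getOrElse with a 0L default rather than Map.get.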
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index b7d4875..e386fac 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -1,80 +1,85 @@
 package cn.ac.iie.service.update
 
-import java.util
+import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap
 
 import cn.ac.iie.config.ApplicationConfig
 import cn.ac.iie.dao.BaseArangoData
-import cn.ac.iie.dao.BaseArangoData._
-import cn.ac.iie.service.transform.MergeDataFrame._
 import cn.ac.iie.service.update.UpdateDocHandler._
-import cn.ac.iie.utils.{ArangoDBConnect, ExecutorThreadPool, SparkSessionUtil}
+import cn.ac.iie.utils.SparkSessionUtil
+import cn.ac.iie.service.transform.MergeDataFrame._
 import cn.ac.iie.utils.SparkSessionUtil.spark
 import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
-import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.types._
 import org.slf4j.LoggerFactory
 
 import scala.collection.mutable.WrappedArray.ofRef
 
 object UpdateDocument {
-  private val pool = ExecutorThreadPool.getInstance
-  private val arangoManger: ArangoDBConnect = ArangoDBConnect.getInstance()
   private val LOG = LoggerFactory.getLogger(UpdateDocument.getClass)
-  private val baseArangoData = new BaseArangoData()
 
   def update(): Unit = {
     try {
-      updateDocument("FQDN", historyVertexFqdnMap, getVertexFqdnRow, classOf[BaseDocument], mergeVertexFqdn)
-      updateDocument("IP", historyVertexIpMap, getVertexIpRow, classOf[BaseDocument], mergeVertexIp)
-      updateDocument("R_LOCATE_FQDN2IP", historyRelationFqdnAddressIpMap, getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
+      updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
     } catch {
       case e: Exception => e.printStackTrace()
     } finally {
-      pool.shutdown()
-      arangoManger.clean()
       SparkSessionUtil.closeSpark()
     }
   }
 
   private def updateDocument[T <: BaseDocument](collName: String,
-                                                historyMap: ConcurrentHashMap[Integer, ConcurrentHashMap[String, T]],
-                                                getDocumentRow: (Row, ConcurrentHashMap[String, T]) => T,
+                                                getDocumentRow: Row => T,
                                                 clazz: Class[T],
                                                 getNewDataRdd: () => RDD[Row]
                                                ): Unit = {
-    baseArangoData.readHistoryData(collName, historyMap, clazz)
-    val hisBc = spark.sparkContext.broadcast(historyMap)
     try {
       val start = System.currentTimeMillis()
       val newDataRdd = getNewDataRdd()
+      val schema = StructType(List(
+        StructField("fqdn", StringType),
+        StructField("ip", StringType),
+        StructField("first_found_time", LongType),
+        StructField("last_found_time", LongType),
+        StructField("dns_cnt_total", LongType),
+        StructField("tls_cnt_total", LongType),
+        StructField("http_cnt_total", LongType),
+        StructField("dist_cip", ArrayType(StringType)),
+        StructField("stat_time", LongType)
+      ))
+      val frame = spark.createDataFrame(newDataRdd, schema)
+      /*
       newDataRdd.foreachPartition(iter => {
-        val partitionId: Int = TaskContext.get.partitionId
-        val dictionaryMap: ConcurrentHashMap[String, T] = hisBc.value.get(partitionId)
-        val resultDocumentList = new util.ArrayList[T]
         var i = 0
         iter.foreach(row => {
-          val document = getDocumentRow(row, dictionaryMap)
-          resultDocumentList.add(document)
+          val document = getDocumentRow(row)
           i += 1
           if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
-            arangoManger.overwrite(resultDocumentList, collName)
             LOG.warn(s"updated $collName: " + i)
             i = 0
           }
         })
         if (i != 0) {
-          arangoManger.overwrite(resultDocumentList, collName)
           LOG.warn(s"updated $collName: " + i)
         }
       })
+      */
+      frame.write.mode(SaveMode.Append).format("jdbc").options(
+        Map(
+          "driver" -> "ru.yandex.clickhouse.ClickHouseDriver",
+          "url" -> "jdbc:clickhouse://192.168.40.194:8123/ip_learning",
+          "dbtable" -> "r_locate_fqdn2ip_local",
+          "user" -> "default",
+          "password" -> "111111",
+          "batchsize" -> "10000",
+          "truncate" -> "true")
+      ).save()
       val last = System.currentTimeMillis()
       LOG.warn(s"update $collName took ${last - start} ms")
     } catch {
       case e: Exception => e.printStackTrace()
-    } finally {
-      hisBc.destroy()
     }
   }
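
The new write path materializes the merged RDD as a typed DataFrame and appends it to ClickHouse over JDBC, with the connection details (URL, user, password) hardcoded in the options map above. A minimal sketch of the same write with those settings externalized; the clickhouse.write.* property keys and the ClickhouseWriter object are hypothetical placeholders, not keys this repository already defines:

    // Same JDBC append, but reading connection settings from a Properties
    // object instead of string literals. The clickhouse.write.* keys are
    // hypothetical, not existing entries in application.properties.
    import java.util.Properties
    import org.apache.spark.sql.{DataFrame, SaveMode}

    object ClickhouseWriter {
      def writeRelation(frame: DataFrame, props: Properties): Unit = {
        frame.write.mode(SaveMode.Append).format("jdbc").options(
          Map(
            "driver"    -> "ru.yandex.clickhouse.ClickHouseDriver",
            "url"       -> props.getProperty("clickhouse.write.url"),
            "dbtable"   -> props.getProperty("clickhouse.write.table"),
            "user"      -> props.getProperty("clickhouse.write.user"),
            "password"  -> props.getProperty("clickhouse.write.password"),
            "batchsize" -> props.getProperty("clickhouse.write.batchsize", "10000"))
        ).save()
      }
    }

One Spark detail worth noting: the "truncate" option only takes effect under SaveMode.Overwrite; with SaveMode.Append, as used here, it is ignored.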
@@ -126,7 +131,7 @@ object UpdateDocument {
     document
   }
 
-  private def getRelationFqdnLocateIpRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseEdgeDocument]): BaseEdgeDocument = {
+  private def getRelationFqdnLocateIpRow(row: Row): BaseEdgeDocument = {
     val fqdn = row.getAs[String]("FQDN")
     val serverIp = row.getAs[String]("common_server_ip")
     val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
@@ -139,21 +144,16 @@ object UpdateDocument {
     val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
 
     val key = fqdn.concat("-" + serverIp)
-    var document = dictionaryMap.getOrDefault(key, null)
-    if (document != null) {
-      updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
-      updateProtocolAttritube(document, sepAttritubeMap)
-      updateDistinctIp(document, distinctIp)
-    } else {
-      document = new BaseEdgeDocument()
-      document.setKey(key)
-      document.setFrom("FQDN/" + fqdn)
-      document.setTo("IP/" + serverIp)
-      document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
-      document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
-      putProtocolAttritube(document, sepAttritubeMap)
-      putDistinctIp(document, distinctIp)
-    }
+    val document: BaseEdgeDocument = new BaseEdgeDocument()
+
+    document.setKey(key)
+    document.setFrom("FQDN/" + fqdn)
+    document.setTo("IP/" + serverIp)
+    document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
+    document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
+    putProtocolAttritube(document, sepAttritubeMap)
+    putDistinctIp(document, distinctIp)
+
     document
   }
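
With the history merge gone (getRelationFqdnLocateIpRow no longer consults a broadcast dictionaryMap, and the edge-document path is bypassed in favour of the ClickHouse append), every run appends a fresh row per fqdn/server-ip pair. The old in-place merge semantics (keep the earliest FIRST_FOUND_TIME, take the max LAST_FOUND_TIME, sum the per-protocol counters) can instead be recovered at query time. An illustrative sketch, reusing the connection values from the diff; the RelationRollup object and the aggregation itself are assumptions about intended semantics, not code from this change:

    // Rolls appended per-run rows back up into one record per (fqdn, ip),
    // reproducing what the removed ArangoDB merge path used to compute.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{max, min, sum}

    object RelationRollup {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("relation-rollup").getOrCreate()
        val rows = spark.read.format("jdbc").options(Map(
          "driver"   -> "ru.yandex.clickhouse.ClickHouseDriver",
          "url"      -> "jdbc:clickhouse://192.168.40.194:8123/ip_learning",
          "dbtable"  -> "r_locate_fqdn2ip_local",
          "user"     -> "default",
          "password" -> "111111")).load()

        rows.groupBy("fqdn", "ip")
          .agg(
            min("first_found_time").as("first_found_time"),
            max("last_found_time").as("last_found_time"),
            sum("dns_cnt_total").as("dns_cnt_total"),
            sum("tls_cnt_total").as("tls_cnt_total"),
            sum("http_cnt_total").as("http_cnt_total"))
          .show(20, truncate = false)

        spark.stop()
      }
    }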