diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java
index cff990b..b6df1fe 100644
--- a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java
@@ -7,10 +7,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Properties;
 
 public class ClickhouseConnect {
     private static final Logger LOG = LoggerFactory.getLogger(ClickhouseConnect.class);
diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
index 544f01d..eec7e44 100644
--- a/ip-learning-spark/src/main/resources/application.properties
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -13,7 +13,7 @@ spark.read.clickhouse.user=default
 spark.read.clickhouse.password=111111
 spark.read.clickhouse.numPartitions=10
 spark.read.clickhouse.fetchsize=10000
-spark.read.clickhouse.partitionColumn=common_recv_time
+spark.read.clickhouse.partitionColumn=LAST_FOUND_TIME
 
 spark.write.clickhouse.url=jdbc:clickhouse://192.168.40.194:8123/ip_learning?socket_timeout=3600000
 spark.write.clickhouse.user=default
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
index 48bbd9a..326aaab 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -11,7 +11,7 @@ object BaseClickhouseData {
   val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60
   private val timeLimit: (Long, Long) = getTimeLimit
 
-  private def initClickhouseData(sql:String): Unit ={
+  private def initClickhouseData(sql:String): DataFrame ={
 
     val dataFrame: DataFrame = spark.read.format("jdbc")
       .option("url", ApplicationConfig.SPARK_READ_CLICKHOUSE_URL)
@@ -28,6 +28,7 @@
       .load()
     dataFrame.printSchema()
     dataFrame.createOrReplaceGlobalTempView("dbtable")
+    dataFrame
   }
 
   def loadConnectionDataFromCk(): Unit ={
@@ -146,6 +147,30 @@
     vertexIpDf
   }
 
+  def getRelationFqdnLocateIpDf(): DataFrame ={
+    val where = "common_end_time >= " + timeLimit._2 + " AND common_end_time < " + timeLimit._1
+    val sql =
+      s"""
+        |(SELECT * FROM
+        |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+        |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type
+        |FROM tsg_galaxy_v3.connection_record_log
+        |WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip)
+        |UNION ALL
+        |(SELECT http_host AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+        |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type
+        |FROM tsg_galaxy_v3.connection_record_log
+        |WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip))
+        |WHERE FQDN != '') as dbtable
+      """.stripMargin
+    LOG.warn(sql)
+    val frame = initClickhouseData(sql)
+    frame.printSchema()
+    frame
+  }
+
+  /*
   def getRelationFqdnLocateIpDf: DataFrame ={
     loadConnectionDataFromCk()
     val sslSql =
@@ -190,6 +215,7 @@
     relationFqdnLocateIpDf.printSchema()
     relationFqdnLocateIpDf
   }
+  */
 
   private def getTimeLimit: (Long,Long) ={
     var maxTime = 0L
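The read-side changes above hang together: the new pushed-down subquery aliases `MAX(common_end_time)` as `LAST_FOUND_TIME`, and `spark.read.clickhouse.partitionColumn` switches from `common_recv_time` (which the subquery does not expose) to that alias. Below is a minimal sketch of the partitioned JDBC read this configures; the host name, time bounds, and trimmed query are illustrative assumptions, not values from this patch:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PartitionedReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partitioned-read-sketch")
      .master("local[*]")
      .getOrCreate()

    // The wrapped subquery must expose the partition column; this is a
    // trimmed stand-in for the query built in getRelationFqdnLocateIpDf.
    val dbtable =
      """(SELECT ssl_sni AS FQDN, common_server_ip,
        |        MAX(common_end_time) AS LAST_FOUND_TIME
        | FROM tsg_galaxy_v3.connection_record_log
        | GROUP BY ssl_sni, common_server_ip) AS dbtable""".stripMargin

    // Spark issues numPartitions parallel queries, each adding a range
    // predicate such as: WHERE LAST_FOUND_TIME >= x AND LAST_FOUND_TIME < y.
    val df: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:clickhouse://clickhouse-host:8123/tsg_galaxy_v3") // placeholder host
      .option("user", "default")
      .option("dbtable", dbtable)
      .option("partitionColumn", "LAST_FOUND_TIME")
      .option("lowerBound", "1546300800") // assumed window start, epoch seconds
      .option("upperBound", "1546387200") // assumed window end, epoch seconds
      .option("numPartitions", "10")
      .option("fetchsize", "10000")
      .load()

    df.printSchema()
    spark.stop()
  }
}
```

Because those range predicates are applied to the wrapped `dbtable` subquery rather than to the underlying log table, the old `common_recv_time` setting would no longer resolve once the aggregation moved into the subquery.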
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
index f5b0d42..958508c 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
@@ -94,9 +94,23 @@
     doc.addAttribute("PROTOCOL_TYPE",protocolTypeBuilder.toString().replaceFirst(",",""))
   }
 
+  /*
   def mergeDistinctIp(distCipRecent:ofRef[ofRef[String]]): Array[String] ={
     distCipRecent.flatten.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
   }
+  */
+
+  def mergeDistinctIp(distCipRecent:ofRef[String]): Array[String] ={
+    distCipRecent.flatMap(str => {
+      str.replaceAll("\\[", "")
+        .replaceAll("\\]", "")
+        .replaceAll("'", "")
+        .split(",")
+    }).distinct.toArray
+    // distCipRecent.flatten.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
+  }
+
+
   def putDistinctIp(doc:BaseEdgeDocument,newDistinctIp:Array[String]): Unit ={
     val map = newDistinctIp.map(ip => {
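For context on the `mergeDistinctIp` rewrite: `DIST_CIP_RECENT` now arrives as ClickHouse's `toString(groupUniqArray(N)(...))` output, i.e. one string per row shaped like `['1.1.1.1','2.2.2.2']`, so the method strips the brackets and quotes before splitting and deduplicating. A self-contained sketch of the same logic with assumed sample inputs (`Seq[String]` stands in for the `ofRef[String]` wrapper used above):

```scala
object MergeDistinctIpSketch {
  // Same parsing as the new mergeDistinctIp: unwrap the stringified
  // ClickHouse arrays, split on commas, and deduplicate across rows.
  def mergeDistinctIp(distCipRecent: Seq[String]): Array[String] =
    distCipRecent.flatMap { str =>
      str.replaceAll("\\[", "")
        .replaceAll("\\]", "")
        .replaceAll("'", "")
        .split(",")
    }.distinct.toArray

  def main(args: Array[String]): Unit = {
    val rows = Seq("['1.1.1.1','2.2.2.2']", "['2.2.2.2','3.3.3.3']")
    println(mergeDistinctIp(rows).mkString(","))
    // prints: 1.1.1.1,2.2.2.2,3.3.3.3
  }
}
```

Note that the `.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM)` cap from the old version survives only as a comment, so the merged array is no longer bounded; if the bound is still wanted, it would need to be reinstated before `.toArray`.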
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index 0143204..3c9c258 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -23,7 +23,7 @@
   def update(): Unit = {
     try {
-      updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
+      updateDocument("r_locate_fqdn2ip_local", getRelationFqdnLocIpPstm, mergeRelationFqdnLocateIp)
     } catch {
       case e: Exception => e.printStackTrace()
     } finally {
@@ -31,66 +31,69 @@
     }
   }
 
-  private def updateDocument[T <: BaseDocument](collName: String,
-                                                getDocumentRow: Row => T,
-                                                clazz: Class[T],
-                                                getNewDataRdd: () => DataFrame
-                                               ): Unit = {
+  private def updateDocument[T <: BaseDocument](tableName: String,
+                                                setPstm: (Row, PreparedStatement) => PreparedStatement,
+                                                getNewDataRdd: () => DataFrame): Unit = {
     try {
       val start = System.currentTimeMillis()
       val newDataFrame = getNewDataRdd()
       newDataFrame.foreachPartition(iter => {
         val connection: DruidPooledConnection = manger.getConnection
-        val sql = "INSERT INTO ip_learning.r_locate_fqdn2ip_local" +
-          "VALUES(?,?,?,?,?,?,?,?,?)"
-        val pstm: PreparedStatement = connection.prepareStatement(sql)
+        val sql = s"INSERT INTO $tableName VALUES(?,?,?,?,?,?,?,?,?)"
+        var pstm: PreparedStatement = connection.prepareStatement(sql)
         var i = 0
         iter.foreach(row => {
-          val fqdn = row.getAs[String]("FQDN")
-          val serverIp = row.getAs[String]("common_server_ip")
-          val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
-          val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
-          val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
-          val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
-          val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
-          val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
-          val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
-
-          pstm.setString(1,fqdn)
-          pstm.setString(2,serverIp)
-          pstm.setLong(3,firstFoundTime)
-          pstm.setLong(4,lastFoundTime)
-          pstm.setLong(5,sepAttritubeMap.getOrElse("HTTP",0L))
-          pstm.setLong(6,sepAttritubeMap.getOrElse("TLS",0L))
-          pstm.setLong(7,sepAttritubeMap.getOrElse("DNS",0L))
-          pstm.setArray(8,new ClickHouseArray(1, distinctIp))
-          pstm.setLong(9,currentHour)
-
+          pstm = setPstm(row,pstm)
          i += 1
          pstm.addBatch()
-
          if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
            pstm.executeBatch()
            connection.commit()
-            LOG.warn("rows written to clickhouse: " + i)
+            LOG.warn(s"rows written to $tableName: " + i)
            i = 0
          }
        })
        if (i != 0) {
          pstm.executeBatch
          connection.commit()
-          LOG.warn("rows written to clickhouse: " + i)
+          LOG.warn(s"rows written to $tableName: " + i)
        }
        manger.clear(pstm,connection)
      })
      val last = System.currentTimeMillis()
-      LOG.warn(s"time to update $collName: ${last - start}")
+      LOG.warn(s"time to update $tableName: ${last - start}")
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
 
+  private def getRelationFqdnLocIpPstm(row: Row,pstm: PreparedStatement): PreparedStatement ={
+    val fqdn = row.getAs[String]("FQDN")
+    val serverIp = row.getAs[String]("common_server_ip")
+    val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+    val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+    val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
+    val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
+
+    val distCipRecent = row.getAs[ofRef[String]]("DIST_CIP_RECENT")
+    val disCips = mergeDistinctIp(distCipRecent)
+
+    val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
+
+    pstm.setString(1,fqdn)
+    pstm.setString(2,serverIp)
+    pstm.setLong(3,firstFoundTime)
+    pstm.setLong(4,lastFoundTime)
+    pstm.setLong(5,sepAttritubeMap.getOrElse("HTTP",0L))
+    pstm.setLong(6,sepAttritubeMap.getOrElse("TLS",0L))
+    pstm.setLong(7,sepAttritubeMap.getOrElse("DNS",0L))
+    pstm.setArray(8,new ClickHouseArray(1, disCips))
+    pstm.setLong(9,currentHour)
+
+    pstm
+  }
+
   private def getVertexFqdnRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = {
     val fqdn = row.getAs[String]("FQDN")
     val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
@@ -139,30 +142,4 @@
     document
   }
 
-  private def getRelationFqdnLocateIpRow(row: Row): BaseEdgeDocument = {
-    val fqdn = row.getAs[String]("FQDN")
-    val serverIp = row.getAs[String]("common_server_ip")
-    val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
-    val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
-    val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
-    val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
-    val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
-
-    val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
-    val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
-
-    val key = fqdn.concat("-" + serverIp)
-    val document:BaseEdgeDocument = new BaseEdgeDocument()
-
-    document.setKey(key)
-    document.setFrom("FQDN/" + fqdn)
-    document.setTo("IP/" + serverIp)
-    document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
-    document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
-    putProtocolAttritube(document, sepAttritubeMap)
-    putDistinctIp(document, distinctIp)
-
-    document
-  }
-
 }
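The net effect of the UpdateDocument changes is that the ArangoDB edge-document path (`BaseEdgeDocument`, `setFrom`/`setTo`) is replaced by a direct ClickHouse insert, and the writer is now generic: it takes a table name plus a `(Row, PreparedStatement) => PreparedStatement` binder. A compilable sketch of how a second table could plug in; the table name `v_fqdn_local`, its two columns, and `getVertexFqdnDf` are hypothetical stand-ins, not part of this patch:

```scala
import java.sql.PreparedStatement
import org.apache.spark.sql.Row

object BinderSketch {
  // Same shape as the setPstm parameter of updateDocument,
  // mirroring getRelationFqdnLocIpPstm but for a two-column table.
  def setVertexFqdnPstm(row: Row, pstm: PreparedStatement): PreparedStatement = {
    pstm.setString(1, row.getAs[String]("FQDN"))
    pstm.setLong(2, row.getAs[Long]("LAST_FOUND_TIME"))
    pstm
  }

  def main(args: Array[String]): Unit = {
    // updateDocument derives its INSERT statement from the table name:
    val tableName = "v_fqdn_local"
    println(s"INSERT INTO $tableName VALUES(?,?)")
    // The call site would then mirror update():
    //   updateDocument("v_fqdn_local", setVertexFqdnPstm, getVertexFqdnDf)
  }
}
```

Two loose ends worth flagging: the `VALUES(?,?,?,?,?,?,?,?,?)` placeholder list inside `updateDocument` is still hard-coded to nine columns, so any table with a different arity would need that string parameterized as well, and the type parameter `T <: BaseDocument` no longer constrains anything in the new signature and could be dropped.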