Changed to read the result data computed by ClickHouse

wanglihui
2020-08-31 15:58:36 +08:00
parent 57fd13d053
commit 60d688f289
5 changed files with 78 additions and 63 deletions

View File

@@ -7,10 +7,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.sql.Connection;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Properties;

 public class ClickhouseConnect {
     private static final Logger LOG = LoggerFactory.getLogger(ClickhouseConnect.class);

View File

@@ -13,7 +13,7 @@ spark.read.clickhouse.user=default
 spark.read.clickhouse.password=111111
 spark.read.clickhouse.numPartitions=10
 spark.read.clickhouse.fetchsize=10000
-spark.read.clickhouse.partitionColumn=common_recv_time
+spark.read.clickhouse.partitionColumn=LAST_FOUND_TIME
 spark.write.clickhouse.url=jdbc:clickhouse://192.168.40.194:8123/ip_learning?socket_timeout=3600000
 spark.write.clickhouse.user=default
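The partitionColumn change follows the new read path. A partitioned JDBC read in Spark issues numPartitions range queries over partitionColumn, so the column must exist in the scanned result; the source is now an aggregated subquery that no longer exposes the raw-log field common_recv_time but does expose LAST_FOUND_TIME. A sketch of how these keys typically wire into the reader (the readClickhouse helper and the explicit bounds are illustrative, not from this repo; a partitioned read also requires lowerBound/upperBound, presumably derived from getTimeLimit):

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

def readClickhouse(spark: SparkSession, props: Properties, dbtable: String,
                   lower: Long, upper: Long): DataFrame =
  spark.read.format("jdbc")
    .option("url", props.getProperty("spark.read.clickhouse.url"))
    .option("user", props.getProperty("spark.read.clickhouse.user"))
    .option("password", props.getProperty("spark.read.clickhouse.password"))
    .option("dbtable", dbtable)
    // Spark splits [lower, upper) into numPartitions slices of this column.
    .option("partitionColumn", props.getProperty("spark.read.clickhouse.partitionColumn"))
    .option("numPartitions", props.getProperty("spark.read.clickhouse.numPartitions"))
    .option("fetchsize", props.getProperty("spark.read.clickhouse.fetchsize"))
    .option("lowerBound", lower.toString)
    .option("upperBound", upper.toString)
    .load()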

View File

@@ -11,7 +11,7 @@ object BaseClickhouseData {
 val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60
 private val timeLimit: (Long, Long) = getTimeLimit

-  private def initClickhouseData(sql:String): Unit ={
+  private def initClickhouseData(sql:String): DataFrame ={
     val dataFrame: DataFrame = spark.read.format("jdbc")
       .option("url", ApplicationConfig.SPARK_READ_CLICKHOUSE_URL)
@@ -28,6 +28,7 @@ object BaseClickhouseData {
       .load()
     dataFrame.printSchema()
     dataFrame.createOrReplaceGlobalTempView("dbtable")
+    dataFrame
   }

   def loadConnectionDataFromCk(): Unit ={
@@ -146,6 +147,30 @@ object BaseClickhouseData {
     vertexIpDf
   }

+  def getRelationFqdnLocateIpDf(): DataFrame ={
+    val where = "common_end_time >= " + timeLimit._2 + " AND common_end_time < " + timeLimit._1
+    val sql =
+      s"""
+         |(SELECT * FROM
+         |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+         |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type
+         |FROM tsg_galaxy_v3.connection_record_log
+         |WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip)
+         |UNION ALL
+         |(SELECT http_host AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+         |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type
+         |FROM tsg_galaxy_v3.connection_record_log
+         |WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip))
+         |WHERE FQDN != '') as dbtable
+      """.stripMargin
+    LOG.warn(sql)
+    val frame = initClickhouseData(sql)
+    frame.printSchema()
+    frame
+  }
+
+  /*
   def getRelationFqdnLocateIpDf: DataFrame ={
     loadConnectionDataFromCk()
     val sslSql =
@@ -190,6 +215,7 @@ object BaseClickhouseData {
     relationFqdnLocateIpDf.printSchema()
     relationFqdnLocateIpDf
   }
+  */

   private def getTimeLimit: (Long,Long) ={
     var maxTime = 0L
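Two details in the new getRelationFqdnLocateIpDf are worth spelling out. The "(SELECT ...) as dbtable" wrapper is the standard JDBC-source trick for pushing a whole query down to the database: Spark scans the parenthesized, aliased subquery as if it were a table, so the UNION ALL and GROUP BY run inside ClickHouse. And groupUniqArray(N)(common_client_ip) collects at most N distinct client IPs per group; toString renders that array as a literal like ['1.2.3.4','5.6.7.8'], which mergeDistinctIp in UpdateDocHandler later parses apart. Since initClickhouseData now returns the DataFrame as well as registering the global temp view, callers have two access paths; a hedged usage sketch (spark in scope, names as in the commit):

val frame = BaseClickhouseData.getRelationFqdnLocateIpDf()
frame.select("FQDN", "common_server_ip", "LAST_FOUND_TIME").show(5)

// Global temp views live in Spark's fixed global_temp database.
val viaView = spark.sql("SELECT count(*) FROM global_temp.dbtable")
viaView.show()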

View File

@@ -94,9 +94,23 @@ object UpdateDocHandler {
doc.addAttribute("PROTOCOL_TYPE",protocolTypeBuilder.toString().replaceFirst(",","")) doc.addAttribute("PROTOCOL_TYPE",protocolTypeBuilder.toString().replaceFirst(",",""))
} }
/*
def mergeDistinctIp(distCipRecent:ofRef[ofRef[String]]): Array[String] ={ def mergeDistinctIp(distCipRecent:ofRef[ofRef[String]]): Array[String] ={
distCipRecent.flatten.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray distCipRecent.flatten.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
} }
*/
def mergeDistinctIp(distCipRecent:ofRef[String]): Array[String] ={
distCipRecent.flatMap(str => {
str.replaceAll("\\[", "")
.replaceAll("\\]", "")
.replaceAll("'", "")
.split(",")
}).distinct.toArray
// distCipRecent.flatten.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
}
def putDistinctIp(doc:BaseEdgeDocument,newDistinctIp:Array[String]): Unit ={ def putDistinctIp(doc:BaseEdgeDocument,newDistinctIp:Array[String]): Unit ={
val map = newDistinctIp.map(ip => { val map = newDistinctIp.map(ip => {
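The signature change tracks the new data shape: the old version took an array of string arrays, while the ClickHouse query now delivers one rendered array literal per group, e.g. "['1.1.1.1','2.2.2.2']". A self-contained check of the parsing, with fabricated input values:

val distCipRecent = Array("['1.1.1.1','2.2.2.2']", "['2.2.2.2','3.3.3.3']")
val merged = distCipRecent.flatMap { str =>
  str.replaceAll("\\[", "")  // strip the ClickHouse array brackets
     .replaceAll("\\]", "")
     .replaceAll("'", "")    // strip the quotes around each IP
     .split(",")
}.distinct
// merged == Array("1.1.1.1", "2.2.2.2", "3.3.3.3")

Note that the old .take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM) cap is only commented out: groupUniqArray(N) bounds each group in SQL, but the merged set across groups can still exceed N, so the two versions are not strictly equivalent.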

View File

@@ -23,7 +23,7 @@ object UpdateDocument {
   def update(): Unit = {
     try {
-      updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
+      updateDocument("r_locate_fqdn2ip_local", getRelationFqdnLocIpPstm, mergeRelationFqdnLocateIp)
     } catch {
       case e: Exception => e.printStackTrace()
     } finally {
@@ -31,66 +31,69 @@ object UpdateDocument {
     }
   }

-  private def updateDocument[T <: BaseDocument](collName: String,
-                                                getDocumentRow: Row => T,
-                                                clazz: Class[T],
-                                                getNewDataRdd: () => DataFrame
-                                               ): Unit = {
+  private def updateDocument[T <: BaseDocument](tableName: String,
+                                                setPstm: (Row, PreparedStatement) => PreparedStatement,
+                                                getNewDataRdd: () => DataFrame): Unit = {
     try {
       val start = System.currentTimeMillis()
       val newDataFrame = getNewDataRdd()
       newDataFrame.foreachPartition(iter => {
         val connection: DruidPooledConnection = manger.getConnection
-        val sql = "INSERT INTO ip_learning.r_locate_fqdn2ip_local" +
-          "VALUES(?,?,?,?,?,?,?,?,?)"
-        val pstm: PreparedStatement = connection.prepareStatement(sql)
+        val sql = s"INSERT INTO $tableName VALUES(?,?,?,?,?,?,?,?,?)"
+        var pstm: PreparedStatement = connection.prepareStatement(sql)
         var i = 0
         iter.foreach(row => {
-          val fqdn = row.getAs[String]("FQDN")
-          val serverIp = row.getAs[String]("common_server_ip")
-          val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
-          val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
-          val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
-          val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
-          val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
-          val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
-          val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
-          pstm.setString(1,fqdn)
-          pstm.setString(2,serverIp)
-          pstm.setLong(3,firstFoundTime)
-          pstm.setLong(4,lastFoundTime)
-          pstm.setLong(5,sepAttritubeMap.getOrElse("HTTP",0L))
-          pstm.setLong(6,sepAttritubeMap.getOrElse("TLS",0L))
-          pstm.setLong(7,sepAttritubeMap.getOrElse("DNS",0L))
-          pstm.setArray(8,new ClickHouseArray(1, distinctIp))
-          pstm.setLong(9,currentHour)
+          pstm = setPstm(row,pstm)
           i += 1
           pstm.addBatch()
           if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
             pstm.executeBatch()
             connection.commit()
-            LOG.warn("写入clickhouse数据量:" + i)
+            LOG.warn(s"写入$tableName 数据量:" + i)
             i = 0
           }
         })
         if (i != 0) {
           pstm.executeBatch
           connection.commit()
-          LOG.warn("写入clickhouse数据量:" + i)
+          LOG.warn(s"写入$tableName 数据量:" + i)
         }
         manger.clear(pstm,connection)
       })
       val last = System.currentTimeMillis()
-      LOG.warn(s"更新$collName 时间:${last - start}")
+      LOG.warn(s"更新$tableName 时间:${last - start}")
     } catch {
       case e: Exception => e.printStackTrace()
     }
   }

+  private def getRelationFqdnLocIpPstm(row: Row,pstm: PreparedStatement): PreparedStatement ={
+    val fqdn = row.getAs[String]("FQDN")
+    val serverIp = row.getAs[String]("common_server_ip")
+    val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+    val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+    val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
+    val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
+    val distCipRecent = row.getAs[ofRef[String]]("DIST_CIP_RECENT")
+    val disCips = mergeDistinctIp(distCipRecent)
+    val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
+    pstm.setString(1,fqdn)
+    pstm.setString(2,serverIp)
+    pstm.setLong(3,firstFoundTime)
+    pstm.setLong(4,lastFoundTime)
+    pstm.setLong(5,sepAttritubeMap.getOrElse("HTTP",0L))
+    pstm.setLong(6,sepAttritubeMap.getOrElse("TLS",0L))
+    pstm.setLong(7,sepAttritubeMap.getOrElse("DNS",0L))
+    pstm.setArray(8,new ClickHouseArray(1, disCips))
+    pstm.setLong(9,currentHour)
+    pstm
+  }

   private def getVertexFqdnRow(row: Row, dictionaryMap: ConcurrentHashMap[String, BaseDocument]): BaseDocument = {
     val fqdn = row.getAs[String]("FQDN")
     val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
@@ -139,30 +142,4 @@ object UpdateDocument {
     document
   }

-  private def getRelationFqdnLocateIpRow(row: Row): BaseEdgeDocument = {
-    val fqdn = row.getAs[String]("FQDN")
-    val serverIp = row.getAs[String]("common_server_ip")
-    val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
-    val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
-    val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
-    val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
-    val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
-    val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
-    val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
-    val key = fqdn.concat("-" + serverIp)
-    val document:BaseEdgeDocument = new BaseEdgeDocument()
-    document.setKey(key)
-    document.setFrom("FQDN/" + fqdn)
-    document.setTo("IP/" + serverIp)
-    document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
-    document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
-    putProtocolAttritube(document, sepAttritubeMap)
-    putDistinctIp(document, distinctIp)
-    document
-  }
 }
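Taken together, the refactor turns updateDocument from an ArangoDB-specific writer (a Row => document function plus a Class[T]) into a generic ClickHouse batch writer parameterized by the target table and a row binder. A minimal standalone sketch of that batching skeleton, with hypothetical names (writeBatches, bind) and plain JDBC types in place of the repo's Druid pool and Spark Row:

import java.sql.{Connection, PreparedStatement}

// One PreparedStatement per partition; the binder fills the ? slots per row,
// and batches flush whenever batchSize rows have accumulated.
def writeBatches[R](rows: Iterator[R],
                    connection: Connection,
                    tableName: String,
                    columns: Int,
                    batchSize: Int)
                   (bind: (R, PreparedStatement) => PreparedStatement): Unit = {
  val placeholders = Seq.fill(columns)("?").mkString(",")
  var pstm = connection.prepareStatement(s"INSERT INTO $tableName VALUES($placeholders)")
  var i = 0
  rows.foreach { row =>
    pstm = bind(row, pstm)  // binder sets all columns for this row
    pstm.addBatch()
    i += 1
    if (i >= batchSize) {   // flush a full batch
      pstm.executeBatch()
      connection.commit()
      i = 0
    }
  }
  if (i != 0) {             // flush the remainder
    pstm.executeBatch()
    connection.commit()
  }
  pstm.close()
}

With this shape, adding a new relation only requires a new binder like getRelationFqdnLocIpPstm; the write loop itself stays untouched.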