IP Learning report version
@@ -27,6 +27,12 @@
             <version>4.4.6</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.1.10</version>
+        </dependency>
+
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
@@ -75,6 +81,12 @@
             <version>3.2.0</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.scala-lang.modules</groupId>
+            <artifactId>scala-xml_2.11</artifactId>
+            <version>1.0.4</version>
+        </dependency>
+
         <dependency>
             <groupId>org.scala-tools</groupId>
@@ -0,0 +1,95 @@
+package cn.ac.iie.utils;
+
+import cn.ac.iie.config.ApplicationConfig;
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.druid.pool.DruidPooledConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+public class ClickhouseConnect {
+    private static final Logger LOG = LoggerFactory.getLogger(ClickhouseConnect.class);
+    private static DruidDataSource dataSource = null;
+    private static ClickhouseConnect dbConnect = null;
+
+    static {
+        getDbConnect();
+    }
+
+    private static void getDbConnect() {
+        try {
+            if (dataSource == null) {
+                dataSource = new DruidDataSource();
+                // connection parameters
+                dataSource.setUrl(ApplicationConfig.SPARK_WRITE_CLICKHOUSE_URL());
+                dataSource.setDriverClassName(ApplicationConfig.SPARK_READ_CLICKHOUSE_DRIVER());
+                dataSource.setUsername(ApplicationConfig.SPARK_READ_CLICKHOUSE_USER());
+                dataSource.setPassword(ApplicationConfig.SPARK_READ_CLICKHOUSE_PASSWORD());
+                // initial, minimum and maximum pool sizes
+                dataSource.setInitialSize(ApplicationConfig.SPARK_WRITE_CLICKHOUSE_INITIALSIZE());
+                dataSource.setMinIdle(ApplicationConfig.SPARK_WRITE_CLICKHOUSE_MINIDLE());
+                dataSource.setMaxActive(ApplicationConfig.SPARK_WRITE_CLICKHOUSE_MAXACTIVE());
+                // maximum wait (ms) when acquiring a connection
+                dataSource.setMaxWait(30000);
+                // interval (ms) between eviction runs that close idle connections
+                dataSource.setTimeBetweenEvictionRunsMillis(2000);
+                // keep idle connections from going stale
+                dataSource.setValidationQuery("SELECT 1");
+                dataSource.setTestWhileIdle(true);
+                dataSource.setTestOnBorrow(true);
+                dataSource.setKeepAlive(true);
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage());
+        }
+    }
+
+    /**
+     * Singleton accessor for the connection pool.
+     *
+     * @return dbConnect
+     */
+    public static synchronized ClickhouseConnect getInstance() {
+        if (null == dbConnect) {
+            dbConnect = new ClickhouseConnect();
+        }
+        return dbConnect;
+    }
+
+    /**
+     * Return a Druid pooled connection.
+     *
+     * @return connection
+     * @throws SQLException SQL exception
+     */
+    public DruidPooledConnection getConnection() throws SQLException {
+        return dataSource.getConnection();
+    }
+
+    /**
+     * Close the PreparedStatement and Connection objects if they are defined.
+     *
+     * @param pstmt      the PreparedStatement
+     * @param connection the Connection
+     */
+    public void clear(Statement pstmt, Connection connection) {
+        try {
+            if (pstmt != null) {
+                pstmt.close();
+            }
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (SQLException e) {
+            LOG.error(e.getMessage());
+        }
+    }
+}
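Note: a minimal usage sketch of the pool above, written the way the Scala side below calls it; the table and SQL here are placeholders, not the real schema.

    // Borrow a pooled connection, batch a couple of rows, then return everything via clear().
    val manager = ClickhouseConnect.getInstance()
    val connection = manager.getConnection
    val pstm = connection.prepareStatement("INSERT INTO some_db.some_table VALUES(?,?)") // placeholder SQL
    try {
      pstm.setString(1, "example")
      pstm.setLong(2, 42L)
      pstm.addBatch()
      pstm.executeBatch()
    } finally {
      manager.clear(pstm, connection) // closes the statement and hands the connection back to the pool
    }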
@@ -11,9 +11,17 @@ spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3
 spark.read.clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
 spark.read.clickhouse.user=default
 spark.read.clickhouse.password=111111
-spark.read.clickhouse.numPartitions=144
+spark.read.clickhouse.numPartitions=10
 spark.read.clickhouse.fetchsize=10000
 spark.read.clickhouse.partitionColumn=common_recv_time
+
+spark.write.clickhouse.url=jdbc:clickhouse://192.168.40.194:8123/ip_learning?socket_timeout=3600000
+spark.write.clickhouse.user=default
+spark.write.clickhouse.password=111111
+spark.write.clickhouse.initialsize=1
+spark.write.clickhouse.minidle=1
+spark.write.clickhouse.maxactive=50
+
 clickhouse.socket.timeout=300000
 # arangoDB configuration
 arangoDB.host=192.168.40.182
@@ -21,6 +21,13 @@ object ApplicationConfig {
   val SPARK_READ_CLICKHOUSE_FETCHSIZE: String = config.getString("spark.read.clickhouse.fetchsize")
   val SPARK_READ_CLICKHOUSE_PARTITIONCOLUMN: String = config.getString("spark.read.clickhouse.partitionColumn")
 
+  val SPARK_WRITE_CLICKHOUSE_URL: String = config.getString("spark.write.clickhouse.url")
+  val SPARK_WRITE_CLICKHOUSE_USER: String = config.getString("spark.write.clickhouse.user")
+  val SPARK_WRITE_CLICKHOUSE_PASSWORD: String = config.getString("spark.write.clickhouse.password")
+  val SPARK_WRITE_CLICKHOUSE_INITIALSIZE: Int = config.getInt("spark.write.clickhouse.initialsize")
+  val SPARK_WRITE_CLICKHOUSE_MINIDLE: Int = config.getInt("spark.write.clickhouse.minidle")
+  val SPARK_WRITE_CLICKHOUSE_MAXACTIVE: Int = config.getInt("spark.write.clickhouse.maxactive")
+
   val ARANGODB_HOST: String = config.getString("arangoDB.host")
   val ARANGODB_PORT: Int = config.getInt("arangoDB.port")
   val ARANGODB_USER: String = config.getString("arangoDB.user")
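Note: the `config` handle these vals read from is defined outside this hunk; a plausible sketch of how it could be wired to the properties file above, assuming Typesafe Config (the loader call itself is not part of this commit).

    import com.typesafe.config.{Config, ConfigFactory}

    // Assumption: application.properties sits on the classpath and is parsed in properties format.
    val config: Config = ConfigFactory.parseResources("application.properties")
    val writeUrl  = config.getString("spark.write.clickhouse.url")
    val maxActive = config.getInt("spark.write.clickhouse.maxactive")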
@@ -4,14 +4,12 @@ import java.util.regex.Pattern
 
 import cn.ac.iie.config.ApplicationConfig
 import cn.ac.iie.dao.BaseClickhouseData
-import cn.ac.iie.service.update.UpdateDocHandler.{mergeDistinctIp, separateAttributeByProtocol}
 import cn.ac.iie.spark.partition.CustomPartitioner
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.functions._
 import org.slf4j.LoggerFactory
 
-import scala.collection.mutable.WrappedArray.ofRef
 
 object MergeDataFrame {
   private val LOG = LoggerFactory.getLogger(MergeDataFrame.getClass)
@@ -39,7 +37,7 @@ object MergeDataFrame {
     values
   }
 
-  def mergeRelationFqdnLocateIp(): RDD[Row] = {
+  def mergeRelationFqdnLocateIp(): DataFrame = {
     val frame = BaseClickhouseData.getRelationFqdnLocateIpDf.filter(row => isDomain(row.getAs[String]("FQDN")))
       .groupBy("FQDN", "common_server_ip")
       .agg(
@@ -49,21 +47,7 @@ object MergeDataFrame {
         collect_list("schema_type").alias("schema_type_list"),
         collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT")
       )
-    frame.rdd.map(row => {
-      val fqdn = row.getAs[String]("FQDN")
-      val serverIp = row.getAs[String]("common_server_ip")
-      val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
-      val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
-      val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
-      val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
-      val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
-      val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
-      val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
-
-      Row(fqdn, serverIp, firstFoundTime, lastFoundTime,
-        sepAttritubeMap.get("HTTP"), sepAttritubeMap.get("TLS"), sepAttritubeMap.get("DNS"), distinctIp, currentHour)
-    })
+    frame
   }
 
   private def isDomain(fqdn: String): Boolean = {
@@ -71,13 +55,9 @@ object MergeDataFrame {
     if (fqdn == null || fqdn.length == 0) {
      return false
     }
-    if (fqdn.contains(":")) {
-      val s = fqdn.split(":")(0)
-      if (s.contains(":")) {
-        return false
-      }
-    }
-    val fqdnArr = fqdn.split("\\.")
+    val domain = fqdn.split(":")(0)
+    val fqdnArr = domain.split("\\.")
     if (fqdnArr.length < 4 || fqdnArr.length > 4) {
       return true
     }
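Note: after this change, anything before the first ':' is split on dots, and only a value with exactly four labels falls through to the remaining checks; everything else is treated as a domain. A quick illustration of the split behaviour being relied on (example inputs, not from the code):

    assert("1.2.3.4:8123".split(":")(0).split("\\.").length == 4)    // IPv4-looking: continues to the checks below
    assert("www.example.com".split(":")(0).split("\\.").length == 3) // not four labels: isDomain returns true here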
@@ -1,25 +1,26 @@
 package cn.ac.iie.service.update
 
-import java.util.Properties
+import java.sql.PreparedStatement
 import java.util.concurrent.ConcurrentHashMap
 
 import cn.ac.iie.config.ApplicationConfig
-import cn.ac.iie.dao.BaseArangoData
 import cn.ac.iie.service.update.UpdateDocHandler._
-import cn.ac.iie.utils.SparkSessionUtil
+import cn.ac.iie.utils.{ClickhouseConnect, SparkSessionUtil}
 import cn.ac.iie.service.transform.MergeDataFrame._
-import cn.ac.iie.utils.SparkSessionUtil.spark
+import com.alibaba.druid.pool.DruidPooledConnection
 import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SaveMode}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row}
 import org.slf4j.LoggerFactory
+import ru.yandex.clickhouse.ClickHouseArray
 
 import scala.collection.mutable.WrappedArray.ofRef
 
 object UpdateDocument {
   private val LOG = LoggerFactory.getLogger(UpdateDocument.getClass)
 
+  private val manger: ClickhouseConnect = ClickhouseConnect.getInstance()
+  private val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60
+
   def update(): Unit = {
     try {
       updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
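Note: `currentHour` divides the millisecond clock by 3,600,000 to get whole hours, then multiplies by 3,600, i.e. the current hour expressed in epoch seconds, which is what gets bound as the ninth insert parameter below. A worked example with an arbitrary timestamp:

    val nowMs = 1700003599123L                       // some instant partway through an hour
    val hour  = nowMs / (60 * 60 * 1000) * 60 * 60   // 472223 whole hours -> 1700002800
    // hour == 1700002800L, the top of that hour in epoch seconds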
@@ -33,49 +34,56 @@
   private def updateDocument[T <: BaseDocument](collName: String,
                                                 getDocumentRow: Row => T,
                                                 clazz: Class[T],
-                                                getNewDataRdd: () => RDD[Row]
+                                                getNewDataRdd: () => DataFrame
                                                ): Unit = {
     try {
       val start = System.currentTimeMillis()
-      val newDataRdd = getNewDataRdd()
-      val schema = StructType(List(
-        StructField("fqdn", StringType),
-        StructField("ip", StringType),
-        StructField("first_found_time", LongType),
-        StructField("last_found_time", LongType),
-        StructField("dns_cnt_total", LongType),
-        StructField("tls_cnt_total", LongType),
-        StructField("http_cnt_total", LongType),
-        StructField("dist_cip", ArrayType(StringType)),
-        StructField("stat_time", LongType)
-      ))
-      val frame = spark.createDataFrame(newDataRdd, schema)
-      /*
-      newDataRdd.foreachPartition(iter => {
+      val newDataFrame = getNewDataRdd()
+
+      newDataFrame.foreachPartition(iter => {
+        val connection: DruidPooledConnection = manger.getConnection
+        val sql = "INSERT INTO ip_learning.r_locate_fqdn2ip_local " +
+          "VALUES(?,?,?,?,?,?,?,?,?)"
+        val pstm: PreparedStatement = connection.prepareStatement(sql)
+
         var i = 0
         iter.foreach(row => {
-          val document = getDocumentRow(row)
+          val fqdn = row.getAs[String]("FQDN")
+          val serverIp = row.getAs[String]("common_server_ip")
+          val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+          val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+          val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
+          val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
+          val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
+          val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
+          val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
+
+          pstm.setString(1, fqdn)
+          pstm.setString(2, serverIp)
+          pstm.setLong(3, firstFoundTime)
+          pstm.setLong(4, lastFoundTime)
+          pstm.setLong(5, sepAttritubeMap.getOrElse("HTTP", 0L))
+          pstm.setLong(6, sepAttritubeMap.getOrElse("TLS", 0L))
+          pstm.setLong(7, sepAttritubeMap.getOrElse("DNS", 0L))
+          pstm.setArray(8, new ClickHouseArray(1, distinctIp))
+          pstm.setLong(9, currentHour)
+
           i += 1
+          pstm.addBatch()
+
           if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
-            LOG.warn(s"updating $collName: " + i)
+            pstm.executeBatch()
+            connection.commit()
+            LOG.warn("rows written to clickhouse: " + i)
             i = 0
           }
         })
         if (i != 0) {
-          LOG.warn(s"updated $collName: " + i)
+          pstm.executeBatch()
+          connection.commit()
+          LOG.warn("rows written to clickhouse: " + i)
         }
+        manger.clear(pstm, connection)
       })
-      */
-      frame.write.mode(SaveMode.Append).format("jdbc").options(
-        Map(
-          "driver" -> "ru.yandex.clickhouse.ClickHouseDriver",
-          "url" -> "jdbc:clickhouse://192.168.40.194:8123/ip_learning",
-          "dbtable" -> "r_locate_fqdn2ip_local",
-          "user" -> "default",
-          "password" -> "111111",
-          "batchsize" -> "10000",
-          "truncate" -> "true")
-      ).save()
       val last = System.currentTimeMillis()
       LOG.warn(s"updating $collName took: ${last - start}")
     } catch {
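Note: stripped of the per-column extraction, the write path introduced in this hunk is a per-partition batch insert; a condensed sketch of that pattern, reusing the names from the diff (not an independent implementation):

    newDataFrame.foreachPartition { iter: Iterator[Row] =>
      val connection = manger.getConnection                // one pooled connection per Spark partition
      val pstm = connection.prepareStatement(
        "INSERT INTO ip_learning.r_locate_fqdn2ip_local VALUES(?,?,?,?,?,?,?,?,?)")
      var i = 0
      iter.foreach { row =>
        // ... bind the nine parameters from `row` exactly as above ...
        pstm.addBatch()
        i += 1
        if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {  // flush once the batch threshold is hit
          pstm.executeBatch(); connection.commit(); i = 0
        }
      }
      if (i != 0) { pstm.executeBatch(); connection.commit() } // flush the partition's tail
      manger.clear(pstm, connection)                       // close the statement, return the connection
    }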