From 6265bb5e90d7809913f838f81794c7a90d883419 Mon Sep 17 00:00:00 2001
From: wanglihui <949764788@qq.com>
Date: Thu, 27 Aug 2020 09:19:15 +0800
Subject: [PATCH] IP Learning report version
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 ip-learning-spark/pom.xml                            | 12 +++
 .../cn/ac/iie/utils/ClickhouseConnect.java           | 95 +++++++++++++++++++
 .../src/main/resources/application.properties        | 10 +-
 .../cn/ac/iie/config/ApplicationConfig.scala         |  7 ++
 .../service/transform/MergeDataFrame.scala           | 32 ++-----
 .../iie/service/update/UpdateDocument.scala          | 82 ++++++++--------
 6 files changed, 174 insertions(+), 64 deletions(-)
 create mode 100644 ip-learning-spark/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java

diff --git a/ip-learning-spark/pom.xml b/ip-learning-spark/pom.xml
index 204fa68..4b9a1be 100644
--- a/ip-learning-spark/pom.xml
+++ b/ip-learning-spark/pom.xml
@@ -27,6 +27,12 @@
             <version>4.4.6</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.1.10</version>
+        </dependency>
+
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
@@ -75,6 +81,12 @@
             <version>3.2.0</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.scala-lang.modules</groupId>
+            <artifactId>scala-xml_2.11</artifactId>
+            <version>1.0.4</version>
+        </dependency>
+
     
 
     
         org.scala-tools
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java
new file mode 100644
index 0000000..cff990b
--- /dev/null
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ClickhouseConnect.java
@@ -0,0 +1,95 @@
+package cn.ac.iie.utils;
+
+import cn.ac.iie.config.ApplicationConfig;
+import com.alibaba.druid.pool.DruidDataSource;
+import com.alibaba.druid.pool.DruidPooledConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+public class ClickhouseConnect {
+    private static final Logger LOG = LoggerFactory.getLogger(ClickhouseConnect.class);
+    private static DruidDataSource dataSource = null;
+    private static ClickhouseConnect dbConnect = null;
+
+    static {
+        getDbConnect();
+    }
+
+    private static void getDbConnect() {
+        try {
+            if (dataSource == null) {
+                dataSource = new DruidDataSource();
+                // Connection parameters
+                dataSource.setUrl(ApplicationConfig.SPARK_WRITE_CLICKHOUSE_URL());
+                dataSource.setDriverClassName(ApplicationConfig.SPARK_READ_CLICKHOUSE_DRIVER());
+                dataSource.setUsername(ApplicationConfig.SPARK_READ_CLICKHOUSE_USER());
+                dataSource.setPassword(ApplicationConfig.SPARK_READ_CLICKHOUSE_PASSWORD());
+                // Pool sizing: initial size, minimum idle and maximum active connections
+                dataSource.setInitialSize(ApplicationConfig.SPARK_WRITE_CLICKHOUSE_INITIALSIZE());
+                dataSource.setMinIdle(ApplicationConfig.SPARK_WRITE_CLICKHOUSE_MINIDLE());
+                dataSource.setMaxActive(ApplicationConfig.SPARK_WRITE_CLICKHOUSE_MAXACTIVE());
+                // Maximum time (ms) to wait when borrowing a connection
+                dataSource.setMaxWait(30000);
+                // Interval (ms) between eviction runs that close idle connections
+                dataSource.setTimeBetweenEvictionRunsMillis(2000);
+                // Validate connections so they do not go stale
+                dataSource.setValidationQuery("SELECT 1");
+                dataSource.setTestWhileIdle(true);
+                dataSource.setTestOnBorrow(true);
+                dataSource.setKeepAlive(true);
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to initialize the ClickHouse Druid data source", e);
+
+        }
+    }
+
+    /**
+     * Singleton accessor for the database connection pool.
+     *
+     * @return dbConnect
+     */
+    public static synchronized ClickhouseConnect getInstance() {
+        if (null == dbConnect) {
+            dbConnect = new ClickhouseConnect();
+        }
+        return dbConnect;
+    }
+
+    /**
+     * Returns a pooled Druid database connection.
+     *
+     * @return a pooled connection
+     * @throws SQLException if a connection cannot be obtained
+     */
+    public DruidPooledConnection getConnection() throws SQLException {
+        return dataSource.getConnection();
+    }
+
+    /**
+     * Closes the PreparedStatement and Connection objects if they are non-null.
+     *
+     * @param pstmt      the statement to close
+     * @param connection the connection to close
+     */
+    public void clear(Statement pstmt, Connection connection) {
+        try {
+            if (pstmt != null) {
+                pstmt.close();
+            }
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (SQLException e) {
+            LOG.error("Failed to close the ClickHouse statement or connection", e);
+        }
+
+    }
+
+}
diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
index b48b95c..544f01d 100644
--- a/ip-learning-spark/src/main/resources/application.properties
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -11,9 +11,17 @@ spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3
 spark.read.clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
 spark.read.clickhouse.user=default
 spark.read.clickhouse.password=111111
-spark.read.clickhouse.numPartitions=144
+spark.read.clickhouse.numPartitions=10
 spark.read.clickhouse.fetchsize=10000
 spark.read.clickhouse.partitionColumn=common_recv_time
+
+spark.write.clickhouse.url=jdbc:clickhouse://192.168.40.194:8123/ip_learning?socket_timeout=3600000
+spark.write.clickhouse.user=default
+spark.write.clickhouse.password=111111
+spark.write.clickhouse.initialsize=1
+spark.write.clickhouse.minidle=1
+spark.write.clickhouse.maxactive=50
+
 clickhouse.socket.timeout=300000
 #arangoDB configuration
 arangoDB.host=192.168.40.182
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
index 395ea6b..d8cdb21 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala
@@ -21,6 +21,13 @@
   val SPARK_READ_CLICKHOUSE_FETCHSIZE: String = config.getString("spark.read.clickhouse.fetchsize")
   val SPARK_READ_CLICKHOUSE_PARTITIONCOLUMN: String = config.getString("spark.read.clickhouse.partitionColumn")
 
+  val SPARK_WRITE_CLICKHOUSE_URL: String = config.getString("spark.write.clickhouse.url")
+  val SPARK_WRITE_CLICKHOUSE_USER: String = config.getString("spark.write.clickhouse.user")
+  val SPARK_WRITE_CLICKHOUSE_PASSWORD: String = config.getString("spark.write.clickhouse.password")
+  val SPARK_WRITE_CLICKHOUSE_INITIALSIZE: Int = config.getInt("spark.write.clickhouse.initialsize")
+  val SPARK_WRITE_CLICKHOUSE_MINIDLE: Int = config.getInt("spark.write.clickhouse.minidle")
+  val SPARK_WRITE_CLICKHOUSE_MAXACTIVE: Int = config.getInt("spark.write.clickhouse.maxactive")
+
   val ARANGODB_HOST: String= config.getString("arangoDB.host")
   val ARANGODB_PORT: Int = config.getInt("arangoDB.port")
   val ARANGODB_USER: String= config.getString("arangoDB.user")
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
index dd264b0..6cf6a4a 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
@@ -4,14 +4,12 @@ import java.util.regex.Pattern
 
 import cn.ac.iie.config.ApplicationConfig
 import cn.ac.iie.dao.BaseClickhouseData
-import cn.ac.iie.service.update.UpdateDocHandler.{mergeDistinctIp, separateAttributeByProtocol}
 import cn.ac.iie.spark.partition.CustomPartitioner
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.functions._
 import org.slf4j.LoggerFactory
 
-import scala.collection.mutable.WrappedArray.ofRef
 
 object MergeDataFrame {
   private val LOG = LoggerFactory.getLogger(MergeDataFrame.getClass)
@@ -39,7 +37,7 @@
     values
   }
 
-  def mergeRelationFqdnLocateIp(): RDD[Row] ={
+  def mergeRelationFqdnLocateIp(): DataFrame = {
     val frame = BaseClickhouseData.getRelationFqdnLocateIpDf.filter(row => isDomain(row.getAs[String]("FQDN")))
       .groupBy("FQDN", "common_server_ip")
       .agg(
@@ -49,21 +47,7 @@
         collect_list("schema_type").alias("schema_type_list"),
         collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT")
       )
-    frame.rdd.map(row => {
-      val fqdn = row.getAs[String]("FQDN")
-      val serverIp = row.getAs[String]("common_server_ip")
-      val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
-      val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
-      val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
-      val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
-      val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
-      val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
-      val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
-
-      Row(fqdn,serverIp,firstFoundTime,lastFoundTime,
-        sepAttritubeMap.get("HTTP"),sepAttritubeMap.get("TLS"),sepAttritubeMap.get("DNS"),distinctIp,currentHour)
-    })
-
+    frame
   }
 
   private def isDomain(fqdn: String): Boolean = {
@@ -71,13 +55,9 @@
     if (fqdn == null || fqdn.length == 0) {
       return false
     }
-    if (fqdn.contains(":")) {
-      val s = fqdn.split(":")(0)
-      if (s.contains(":")){
-        return false
-      }
-    }
-    val fqdnArr = fqdn.split("\\.")
+    val domain = fqdn.split(":")(0)
+
+    val fqdnArr = domain.split("\\.")
     if (fqdnArr.length < 4 || fqdnArr.length > 4){
       return true
     }
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index e386fac..0143204 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -1,25 +1,26 @@
 package cn.ac.iie.service.update
 
-import java.util.Properties
+import java.sql.PreparedStatement
 import java.util.concurrent.ConcurrentHashMap
 
 import cn.ac.iie.config.ApplicationConfig
-import cn.ac.iie.dao.BaseArangoData
 import cn.ac.iie.service.update.UpdateDocHandler._
-import cn.ac.iie.utils.SparkSessionUtil
+import cn.ac.iie.utils.{ClickhouseConnect, SparkSessionUtil}
 import cn.ac.iie.service.transform.MergeDataFrame._
-import cn.ac.iie.utils.SparkSessionUtil.spark
+import com.alibaba.druid.pool.DruidPooledConnection
 import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SaveMode}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row}
 import org.slf4j.LoggerFactory
+import ru.yandex.clickhouse.ClickHouseArray
 
 import scala.collection.mutable.WrappedArray.ofRef
 
 object UpdateDocument {
   private val LOG = LoggerFactory.getLogger(UpdateDocument.getClass)
 
+  private val manager: ClickhouseConnect = ClickhouseConnect.getInstance()
+
+  private val currentHour: Long = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60
+
   def update(): Unit = {
     try {
       updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, classOf[BaseEdgeDocument], mergeRelationFqdnLocateIp)
@@ -33,49 +34,56 @@ private def updateDocument[T <: BaseDocument](collName: String,
                                                          getDocumentRow: Row => T,
                                                          clazz: Class[T],
-                                                         getNewDataRdd: () => RDD[Row]
+                                                         getNewDataRdd: () => DataFrame
                                                         ): Unit = {
     try {
       val start = System.currentTimeMillis()
-      val newDataRdd = getNewDataRdd()
-      val schema = StructType(List(
-        StructField("fqdn",StringType),
-        StructField("ip",StringType),
-        StructField("first_found_time",LongType),
-        StructField("last_found_time",LongType),
-        StructField("dns_cnt_total",LongType),
-        StructField("tls_cnt_total",LongType),
-        StructField("http_cnt_total",LongType),
-        StructField("dist_cip",ArrayType(StringType)),
-        StructField("stat_time",LongType)
-      ))
-      val frame = spark.createDataFrame(newDataRdd,schema)
-      /*
-      newDataRdd.foreachPartition(iter => {
+      val newDataFrame = getNewDataRdd()
+
+      newDataFrame.foreachPartition(iter => {
+        val connection: DruidPooledConnection = manager.getConnection
+        val sql = "INSERT INTO ip_learning.r_locate_fqdn2ip_local " +
+          "VALUES(?,?,?,?,?,?,?,?,?)"
+        val pstm: PreparedStatement = connection.prepareStatement(sql)
         var i = 0
         iter.foreach(row => {
-          val document = getDocumentRow(row)
+          val fqdn = row.getAs[String]("FQDN")
+          val serverIp = row.getAs[String]("common_server_ip")
+          val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
+          val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
+          val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
+          val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
+          val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
+          val sepAttributeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
+          val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
+
+          pstm.setString(1,fqdn)
+          pstm.setString(2,serverIp)
+          pstm.setLong(3,firstFoundTime)
+          pstm.setLong(4,lastFoundTime)
+          pstm.setLong(5,sepAttributeMap.getOrElse("HTTP",0L))
+          pstm.setLong(6,sepAttributeMap.getOrElse("TLS",0L))
+          pstm.setLong(7,sepAttributeMap.getOrElse("DNS",0L))
+          pstm.setArray(8,new ClickHouseArray(1, distinctIp))
+          pstm.setLong(9,currentHour)
+          i += 1
+          pstm.addBatch()
+
           if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
-            LOG.warn(s"Updating $collName: " + i)
+            pstm.executeBatch()
+            connection.commit()
+            LOG.warn("Rows written to ClickHouse: " + i)
             i = 0
           }
         })
         if (i != 0) {
-          LOG.warn(s"Updating $collName: " + i)
+          pstm.executeBatch()
+          connection.commit()
+          LOG.warn("Rows written to ClickHouse: " + i)
         }
+        manager.clear(pstm,connection)
       })
-      */
-      frame.write.mode(SaveMode.Append).format("jdbc").options(
-        Map(
-          "driver" -> "ru.yandex.clickhouse.ClickHouseDriver",
-          "url" -> "jdbc:clickhouse://192.168.40.194:8123/ip_learning",
-          "dbtable" -> "r_locate_fqdn2ip_local",
-          "user" -> "default",
-          "password" -> "111111",
-          "batchsize" -> "10000",
-          "truncate" -> "true")
-      ).save()
       val last = System.currentTimeMillis()
       LOG.warn(s"Updating $collName took ${last - start} ms")
     } catch {
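
A minimal, hypothetical smoke test for the new write path follows; it is not part of the patch. It assumes application.properties is on the classpath so ClickhouseConnect can build its Druid pool, and that ip_learning.r_locate_fqdn2ip_local has the nine columns in exactly the order the job binds them. The INSERT statement and the ClickHouseArray element-type code (1) are copied from UpdateDocument; the object name and sample values are illustrative only.

import java.sql.PreparedStatement

import cn.ac.iie.utils.ClickhouseConnect
import com.alibaba.druid.pool.DruidPooledConnection
import ru.yandex.clickhouse.ClickHouseArray

object ClickhouseWriteSmokeTest {
  def main(args: Array[String]): Unit = {
    // Borrow a pooled connection; ClickhouseConnect reads application.properties.
    val pool = ClickhouseConnect.getInstance()
    val connection: DruidPooledConnection = pool.getConnection
    // Same parameterised statement the Spark job prepares per partition
    // (note the space before VALUES).
    val sql = "INSERT INTO ip_learning.r_locate_fqdn2ip_local " +
      "VALUES(?,?,?,?,?,?,?,?,?)"
    val pstm: PreparedStatement = connection.prepareStatement(sql)
    // Hour-aligned timestamp, as in UpdateDocument.currentHour.
    val statTime = System.currentTimeMillis / (60 * 60 * 1000) * 60 * 60

    pstm.setString(1, "example.com")                            // FQDN
    pstm.setString(2, "1.2.3.4")                                // server IP
    pstm.setLong(3, statTime)                                   // first found time
    pstm.setLong(4, statTime)                                   // last found time
    pstm.setLong(5, 1L)                                         // HTTP count
    pstm.setLong(6, 0L)                                         // TLS count
    pstm.setLong(7, 0L)                                         // DNS count
    pstm.setArray(8, new ClickHouseArray(1, Array("5.6.7.8")))  // distinct client IPs
    pstm.setLong(9, statTime)                                   // stat time
    pstm.addBatch()

    pstm.executeBatch()
    // Return the statement and connection to the pool.
    pool.clear(pstm, connection)
  }
}

Preparing one statement per partition and flushing it with addBatch/executeBatch keeps the write path at one round trip per batch instead of one INSERT per row.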