diff --git a/ip-learning-spark/pom.xml b/ip-learning-spark/pom.xml index 7ea3c38..0048991 100644 --- a/ip-learning-spark/pom.xml +++ b/ip-learning-spark/pom.xml @@ -7,99 +7,263 @@ cn.ac.iie ip-learning-spark 1.0-SNAPSHOT - - - javax.servlet - javax.servlet-api - 3.0.1 - - - org.apache.httpcomponents - httpclient - 4.5.2 - + + + nexus + Team Nexus Repository + http://192.168.40.125:8099/content/groups/public + - - - org.apache.httpcomponents - httpcore - 4.4.6 - + + ebi + www.ebi.ac.uk + http://www.ebi.ac.uk/intact/maven/nexus/content/groups/public/ + - - com.google.guava - guava - 19.0 - + + maven-ali + http://maven.aliyun.com/nexus/content/groups/public/ + + true + + + true + always + fail + + - - org.apache.spark - spark-core_2.11 - 2.2.3 - + - - org.apache.spark - spark-sql_2.11 - 2.2.3 - + - - ru.yandex.clickhouse - clickhouse-jdbc - 0.1.54 - + + com.zdjizhi + galaxy + 1.0.6 + + + slf4j-log4j12 + org.slf4j + + + log4j-over-slf4j + org.slf4j + + + com.google.guava + guava + + + - - com.typesafe - config - 1.2.1 - + + org.slf4j + slf4j-log4j12 + 1.7.25 + - - com.arangodb - arangodb-java-driver - 6.6.3 - + + javax.servlet + javax.servlet-api + 3.0.1 + - - com.arangodb - velocypack-module-jdk8 - 1.1.0 - + + org.apache.httpcomponents + httpclient + 4.5.2 + + + httpcore + org.apache.httpcomponents + + + - - com.arangodb - velocypack-module-scala_2.11 - 1.2.0 - + + + org.apache.httpcomponents + httpcore + 4.4.6 + - - org.scala-lang - scala-library - 2.11.8 - + + com.google.guava + guava + 19.0 + - - net.alchim31.maven - scala-maven-plugin - 3.2.0 - + + org.apache.spark + spark-core_2.11 + 2.2.3 + + + guava + com.google.guava + + + netty + io.netty + + + javax.servlet-api + javax.servlet + + + jaxb-api + javax.xml.bind + + + log4j + log4j + + + httpclient + org.apache.httpcomponents + + + httpcore + org.apache.httpcomponents + + + zookeeper + org.apache.zookeeper + + + slf4j-api + org.slf4j + + + slf4j-log4j12 + org.slf4j + + + - - org.scala-lang.modules - scala-xml_2.11 - 1.0.4 - + + org.apache.spark + spark-sql_2.11 + 2.2.3 + - - org.scala-tools - maven-scala-plugin - 2.15.2 - + + ru.yandex.clickhouse + clickhouse-jdbc + 0.1.54 + + + jackson-databind + com.fasterxml.jackson.core + + + slf4j-api + org.slf4j + + + - + + com.typesafe + config + 1.2.1 + + + + com.arangodb + arangodb-java-driver + 6.6.3 + + + + com.arangodb + velocypack-module-jdk8 + 1.1.0 + + + + com.arangodb + velocypack-module-scala_2.11 + 1.2.0 + + + + org.scala-lang + scala-library + 2.11.8 + + + + net.alchim31.maven + scala-maven-plugin + 3.2.0 + + + maven-artifact + org.apache.maven + + + maven-core + org.apache.maven + + + maven-model + org.apache.maven + + + maven-plugin-api + org.apache.maven + + + maven-repository-metadata + org.apache.maven + + + maven-settings + org.apache.maven + + + doxia-sink-api + org.apache.maven.doxia + + + plexus-container-default + org.codehaus.plexus + + + plexus-interpolation + org.codehaus.plexus + + + plexus-utils + org.codehaus.plexus + + + scala-library + org.scala-lang + + + scala-reflect + org.scala-lang + + + + + + org.scala-lang.modules + scala-xml_2.11 + 1.0.4 + + + + org.scala-tools + maven-scala-plugin + 2.15.2 + + + diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties index 3365c33..5952237 100644 --- a/ip-learning-spark/src/main/resources/application.properties +++ b/ip-learning-spark/src/main/resources/application.properties @@ -1,6 +1,8 @@ #spark任务配置 spark.sql.shuffle.partitions=10 
spark.executor.memory=4g +spark.executor.cores=1 +spark.cores.max=10 spark.app.name=test spark.network.timeout=300s spark.serializer=org.apache.spark.serializer.KryoSerializer @@ -17,22 +19,19 @@ spark.read.clickhouse.session.table=session_record spark.read.clickhouse.radius.table=radius_record clickhouse.socket.timeout=300000 #arangoDB配置 -#arangoDB.host=192.168.40.223 arangoDB.host=192.168.44.12 arangoDB.port=8529 arangoDB.user=root -#arangoDB.password=galaxy_2019 arangoDB.password=ceiec2019 arangoDB.DB.name=tsg_galaxy_v3_test -#arangoDB.DB.name=iplearn_media_domain arangoDB.ttl=3600 thread.pool.number=10 #读取clickhouse时间范围方式,0:读取过去一小时;1:指定时间范围 -clickhouse.time.limit.type=0 -read.clickhouse.max.time=1608518990 -read.clickhouse.min.time=1604851201 +clickhouse.time.limit.type=1 +read.clickhouse.max.time=1634902508 +read.clickhouse.min.time=1631759985 update.arango.batch=10000 @@ -40,5 +39,6 @@ distinct.client.ip.num=10000 recent.count.hour=24 update.interval=3600 - arangodb.total.num=20000000 +#读取radius时间范围,与radius任务执行周期一致,单位:分钟 +read.radius.granularity=-30 diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala index 983921f..29eb786 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/config/ApplicationConfig.scala @@ -7,6 +7,8 @@ object ApplicationConfig { val SPARK_SQL_SHUFFLE_PARTITIONS: Int = config.getInt("spark.sql.shuffle.partitions") val SPARK_EXECUTOR_MEMORY: String = config.getString("spark.executor.memory") + val SPARK_EXECUTOR_CORES: String = config.getString("spark.executor.cores") + val SPARK_CORES_MAX: String = config.getString("spark.cores.max") val SPARK_APP_NAME: String = config.getString("spark.app.name") val SPARK_NETWORK_TIMEOUT: String = config.getString("spark.network.timeout") // val REPARTITION_NUMBER: Int = config.getInt("repartitionNumber") @@ -46,4 +48,6 @@ object ApplicationConfig { val ARANGODB_TOTAL_NUM: Long = config.getLong("arangodb.total.num") + val READ_RADIUS_GRANULARITY: Int = config.getInt("read.radius.granularity") + } diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala index 9e0bf44..b568e52 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala @@ -1,7 +1,10 @@ package cn.ac.iie.dao +import java.util.Date + import cn.ac.iie.config.ApplicationConfig import cn.ac.iie.utils.SparkSessionUtil.spark +import com.zdjizhi.utils.DateUtils import org.apache.spark.sql.DataFrame import org.slf4j.LoggerFactory @@ -36,19 +39,19 @@ object BaseClickhouseData { val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1 val sql = s""" - |(SELECT - | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME - |FROM - | ((SELECT - | ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME - | FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} - | WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni - | )UNION ALL - | (SELECT - | http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME - | FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} - | WHERE $where and common_schema_type = 
'HTTP' GROUP BY http_host)) - |GROUP BY FQDN HAVING FQDN != '') as dbtable + |(SELECT + | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME + |FROM + | ((SELECT + | ssl_sni AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME + | FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} + | WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni + | )UNION ALL + | (SELECT + | http_host AS FQDN,MAX( common_recv_time ) AS LAST_FOUND_TIME,MIN( common_recv_time ) AS FIRST_FOUND_TIME + | FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE} + | WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host)) + |GROUP BY FQDN HAVING FQDN != '') as dbtable """.stripMargin LOG.warn(sql) val frame = initClickhouseData(sql) @@ -114,10 +117,12 @@ object BaseClickhouseData { def getRelationSubidLocateIpDf: DataFrame = { val where = s""" - | common_recv_time >= ${timeLimit._2} - | AND common_recv_time < ${timeLimit._1} + | common_recv_time >= ${getRadiusTimeRange._2} + | AND common_recv_time < ${getRadiusTimeRange._1} | AND common_subscriber_id != '' | AND radius_framed_ip != '' + | AND radius_packet_type = 4 + | AND radius_acct_status_type = 1 """.stripMargin val sql = s""" @@ -136,8 +141,8 @@ object BaseClickhouseData { def getVertexSubidDf: DataFrame = { val where = s""" - | common_recv_time >= ${timeLimit._2} - | AND common_recv_time < ${timeLimit._1} + | common_recv_time >= ${getRadiusTimeRange._2} + | AND common_recv_time < ${getRadiusTimeRange._1} | AND common_subscriber_id != '' | AND radius_framed_ip != '' | AND radius_packet_type = 4 @@ -159,8 +164,8 @@ object BaseClickhouseData { def getVertexFramedIpDf: DataFrame = { val where = s""" - | common_recv_time >= ${timeLimit._2} - | AND common_recv_time < ${timeLimit._1} + | common_recv_time >= ${getRadiusTimeRange._2} + | AND common_recv_time < ${getRadiusTimeRange._1} | AND common_subscriber_id != '' | AND radius_framed_ip != '' | AND radius_packet_type = 4 @@ -180,6 +185,17 @@ object BaseClickhouseData { frame } + private def getRadiusTimeRange: (Long, Long) = { + val date = DateUtils.getTimeFloor(new Date(System.currentTimeMillis()), "PT1M") + val max = date.getTime / 1000 + val min = DateUtils.getSomeMinute(date, ApplicationConfig.READ_RADIUS_GRANULARITY).getTime / 1000 + (max, min) + } + + def main(args: Array[String]): Unit = { + println(getRadiusTimeRange) + println(getRadiusTimeRange._2 - getRadiusTimeRange._1) + } private def getTimeLimit: (Long, Long) = { var maxTime = 0L diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala index b190ad9..1ed2fa0 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpLearningApplication.scala @@ -5,6 +5,6 @@ import cn.ac.iie.service.update.UpdateDocument object IpLearningApplication { def main(args: Array[String]): Unit = { - UpdateDocument.update() + UpdateDocument.ipLearning() } } diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpRecommendApplication.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpRecommendApplication.scala new file mode 100644 index 0000000..7fa6d62 --- /dev/null +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/main/IpRecommendApplication.scala @@ -0,0 +1,11 @@ +package cn.ac.iie.main + +import cn.ac.iie.service.update.UpdateDocument + +object 
IpRecommendApplication { + + def main(args: Array[String]): Unit = { + UpdateDocument.ipRecommend() + } + +} diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/main/SubscriberRecommendApplication.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/main/SubscriberRecommendApplication.scala new file mode 100644 index 0000000..04cbec4 --- /dev/null +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/main/SubscriberRecommendApplication.scala @@ -0,0 +1,11 @@ +package cn.ac.iie.main + +import cn.ac.iie.service.update.UpdateDocument + +object SubscriberRecommendApplication { + + def main(args: Array[String]): Unit = { + UpdateDocument.subscriberRecommend() + } + +} diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala index 487e9c0..d6b0a51 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala @@ -17,20 +17,35 @@ object UpdateDocument { private val arangoManger: ArangoDBConnect = ArangoDBConnect.getInstance() private val LOG = LoggerFactory.getLogger(UpdateDocument.getClass) - def update(): Unit = { + def ipLearning(): Unit = { try { -// updateDocument("FQDN", getVertexFqdnRow, mergeVertexFqdn) - - updateDocument("SUBSCRIBER", getVertexSubidRow, mergeVertexSubid) - - insertFrameIp() - - updateDocument("R_LOCATE_SUBSCRIBER2IP", getRelationSubidLocateIpRow, mergeRelationSubidLocateIp) - updateDocument("R_LOCATE_FQDN2IP", getRelationFqdnLocateIpRow, mergeRelationFqdnLocateIp) + } catch { + case e: Exception => e.printStackTrace() + } finally { + arangoManger.clean() + SparkSessionUtil.closeSpark() + System.exit(0) + } + } + def subscriberRecommend(): Unit = { + try { + updateDocument("SUBSCRIBER", getVertexSubidRow, mergeVertexSubid) + insertFrameIp() + updateDocument("R_LOCATE_SUBSCRIBER2IP", getRelationSubidLocateIpRow, mergeRelationSubidLocateIp) + } catch { + case e: Exception => e.printStackTrace() + } finally { + arangoManger.clean() + SparkSessionUtil.closeSpark() + System.exit(0) + } + } + + def ipRecommend(): Unit = { + try { updateDocument("IP", getVertexIpRow, mergeVertexIp) - } catch { case e: Exception => e.printStackTrace() } finally { @@ -57,7 +72,7 @@ object UpdateDocument { val document: T = getDocumentRow(row) if (document != null) { fqdnAccmu.add(1) - +// println(document) resultDocumentList.add(document) } i += 1 @@ -118,14 +133,7 @@ object UpdateDocument { case Some(doc) => doc case None => null } - val subidLocIpRow = joinRow._2._2 - - // val subidLocIpRow = subidLocIpRowOpt match { - // case Some(r) => r - // case None => null - // } - if (subidLocIpRow != null) { val subId = subidLocIpRow.getAs[String]("common_subscriber_id") val ip = subidLocIpRow.getAs[String]("radius_framed_ip") @@ -155,18 +163,12 @@ object UpdateDocument { case Some(doc) => doc case None => null } - val subidRow = joinRow._2._2 - - // val subidRow = subidRowOpt match { - // case Some(r) => r - // case None => null - // } - if (subidRow != null) { val subId = subidRow.getAs[String]("common_subscriber_id") val subLastFoundTime = subidRow.getAs[Long]("LAST_FOUND_TIME") val subFirstFoundTime = subidRow.getAs[Long]("FIRST_FOUND_TIME") + if (subidDoc != null) { updateMaxAttribute(subidDoc, subLastFoundTime, "LAST_FOUND_TIME") } else { @@ -177,7 +179,6 @@ object UpdateDocument { subidDoc.addAttribute("LAST_FOUND_TIME", subLastFoundTime) } } - subidDoc } @@ -187,18 +188,12 
@@ object UpdateDocument { case Some(doc) => doc case None => null } - val fqdnRow: Row = joinRow._2._2 - - // val fqdnRow = fqdnRowOpt match { - // case Some(r) => r - // case None => null - // } - if (fqdnRow != null) { val fqdn = fqdnRow.getAs[String]("FQDN") val lastFoundTime = fqdnRow.getAs[Long]("LAST_FOUND_TIME") val firstFoundTime = fqdnRow.getAs[Long]("FIRST_FOUND_TIME") + if (fqdnDoc != null) { updateMaxAttribute(fqdnDoc, lastFoundTime, "LAST_FOUND_TIME") } else { @@ -209,7 +204,6 @@ object UpdateDocument { fqdnDoc.addAttribute("LAST_FOUND_TIME", lastFoundTime) } } - fqdnDoc } @@ -219,14 +213,7 @@ object UpdateDocument { case Some(doc) => doc case None => null } - val ipRow = joinRow._2._2 - - // val ipRow = ipRowOpt match { - // case Some(r) => r - // case None => null - // } - if (ipRow != null) { val ip = ipRow.getAs[String]("IP") val firstFoundTime = ipRow.getAs[Long]("FIRST_FOUND_TIME") @@ -257,7 +244,6 @@ object UpdateDocument { ipDoc.addAttribute("COMMON_LINK_INFO", "") } } - ipDoc } @@ -268,18 +254,10 @@ object UpdateDocument { case Some(doc) => doc case None => null } - val fqdnLocIpRow = joinRow._2._2 - - // val fqdnLocIpRow = fqdnLocIpRowOpt match { - // case Some(r) => r - // case None => null - // } - if (fqdnLocIpDoc != null) { updateProtocolDocument(fqdnLocIpDoc) } - if (fqdnLocIpRow != null) { val fqdn = fqdnLocIpRow.getAs[String]("FQDN") val serverIp = fqdnLocIpRow.getAs[String]("common_server_ip") @@ -291,9 +269,7 @@ object UpdateDocument { val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList) val distinctIp: Array[String] = mergeDistinctIp(distCipRecent) - val key = fqdn.concat("-" + serverIp) - if (fqdnLocIpDoc != null) { updateMaxAttribute(fqdnLocIpDoc, lastFoundTime, "LAST_FOUND_TIME") updateProtocolAttritube(fqdnLocIpDoc, sepAttritubeMap) @@ -309,7 +285,6 @@ object UpdateDocument { putDistinctIp(fqdnLocIpDoc, distinctIp) } } - fqdnLocIpDoc } diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala index 36c114a..f96862a 100644 --- a/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala +++ b/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala @@ -21,6 +21,8 @@ object SparkSessionUtil { .config("spark.network.timeout", ApplicationConfig.SPARK_NETWORK_TIMEOUT) .config("spark.sql.shuffle.partitions", ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS) .config("spark.executor.memory", ApplicationConfig.SPARK_EXECUTOR_MEMORY) + .config("spark.executor.cores",ApplicationConfig.SPARK_EXECUTOR_CORES) + .config("spark.cores.max",ApplicationConfig.SPARK_CORES_MAX) .config("arangodb.hosts", s"${ApplicationConfig.ARANGODB_HOST}:${ApplicationConfig.ARANGODB_PORT}") .config("arangodb.user", ApplicationConfig.ARANGODB_USER) .config("arangodb.password", ApplicationConfig.ARANGODB_PASSWORD)
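
The application.properties and SparkSessionUtil changes add spark.executor.cores=1 and spark.cores.max=10 to the builder; on a standalone cluster this caps the job at roughly cores.max / executor.cores = 10 single-core executors. A minimal, self-contained builder sketch under those assumptions (the master URL and app name below are placeholders, not values from the project):

import org.apache.spark.sql.SparkSession

object SparkSessionSketch {
  // Mirrors the options SparkSessionUtil now sets; values are the ones from application.properties.
  // The master URL and app name are placeholders for illustration only.
  def build(): SparkSession =
    SparkSession.builder()
      .appName("ip-learning")
      .master("spark://spark-master:7077")
      .config("spark.executor.memory", "4g")          // spark.executor.memory
      .config("spark.executor.cores", "1")            // new: cores per executor
      .config("spark.cores.max", "10")                // new: total cores for the application
      .config("spark.sql.shuffle.partitions", "10")
      .config("spark.network.timeout", "300s")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
}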
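
The new getRadiusTimeRange in BaseClickhouseData floors the current time to the minute with DateUtils.getTimeFloor(date, "PT1M") and steps back read.radius.granularity minutes (configured as -30, matching the radius job period) with DateUtils.getSomeMinute, yielding the (max, min) epoch-second window used in the radius WHERE clauses. A self-contained sketch of the same arithmetic using only java.time, assuming the com.zdjizhi DateUtils helpers behave as the diff suggests (this is not that library):

import java.time.{Instant, ZoneId, ZonedDateTime}
import java.time.temporal.ChronoUnit

object RadiusTimeWindowSketch {
  // granularityMinutes is negative (e.g. -30, read.radius.granularity) to look back from "now".
  def radiusTimeRange(granularityMinutes: Int, now: Instant = Instant.now()): (Long, Long) = {
    // floor the current time to the whole minute, like DateUtils.getTimeFloor(date, "PT1M")
    val max: ZonedDateTime = now.atZone(ZoneId.systemDefault()).truncatedTo(ChronoUnit.MINUTES)
    // step back |granularity| minutes, like DateUtils.getSomeMinute(date, granularity)
    val min: ZonedDateTime = max.plusMinutes(granularityMinutes.toLong)
    (max.toEpochSecond, min.toEpochSecond)
  }

  def main(args: Array[String]): Unit = {
    val (maxTime, minTime) = radiusTimeRange(-30)
    println(s"common_recv_time >= $minTime AND common_recv_time < $maxTime") // an 1800-second window
  }
}

For granularity -30 the window is exactly 1800 seconds wide, which is what the temporary main method in the diff prints (as min - max, hence -1800).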
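
The FQDN aggregation is still pushed down to ClickHouse as a parenthesised subquery aliased as dbtable; only the indentation of the interpolated SQL changed. The radius queries now additionally require radius_packet_type = 4 and radius_acct_status_type = 1 (in RADIUS terms, Accounting-Request packets with Acct-Status-Type Start), so only session-start accounting records map subscriber IDs to framed IPs. The diff does not show initClickhouseData, so the following JDBC read is a sketch with assumed URL, table name and option values rather than the project's actual helper:

import org.apache.spark.sql.{DataFrame, SparkSession}

object ClickhouseReadSketch {
  // Pushes the FQDN aggregation down to ClickHouse as a parenthesised subquery, as the diff does.
  // The JDBC url and table name are placeholders; the project reads them from application.properties.
  def readFqdn(spark: SparkSession, minTime: Long, maxTime: Long): DataFrame = {
    val where = s"common_recv_time >= $minTime AND common_recv_time < $maxTime"
    val dbtable =
      s"""(SELECT FQDN, MAX(LAST_FOUND_TIME) AS LAST_FOUND_TIME, MIN(FIRST_FOUND_TIME) AS FIRST_FOUND_TIME
         | FROM ((SELECT ssl_sni AS FQDN, MAX(common_recv_time) AS LAST_FOUND_TIME, MIN(common_recv_time) AS FIRST_FOUND_TIME
         |          FROM session_record WHERE $where AND common_schema_type = 'SSL' GROUP BY ssl_sni)
         |        UNION ALL
         |        (SELECT http_host AS FQDN, MAX(common_recv_time) AS LAST_FOUND_TIME, MIN(common_recv_time) AS FIRST_FOUND_TIME
         |          FROM session_record WHERE $where AND common_schema_type = 'HTTP' GROUP BY http_host))
         | GROUP BY FQDN HAVING FQDN != '') AS dbtable""".stripMargin

    spark.read
      .format("jdbc")
      .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
      .option("url", "jdbc:clickhouse://127.0.0.1:8123/default") // placeholder endpoint
      .option("dbtable", dbtable)
      .option("socket_timeout", "300000")                        // clickhouse.socket.timeout
      .load()
  }
}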
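
UpdateDocument.update() is split into three entry points — ipLearning (the R_LOCATE_FQDN2IP edges), subscriberRecommend (SUBSCRIBER vertices, framed IPs and R_LOCATE_SUBSCRIBER2IP edges) and ipRecommend (IP vertices) — each repeating the same try/catch/finally teardown: clean the ArangoDB connection, stop Spark, then System.exit(0). A hedged sketch of factoring that shared teardown into one helper; withGraphUpdate and its parameters are illustrative names, not part of the project:

object GraphJobRunnerSketch {
  // Shared wrapper for the pattern the three entry points repeat: run the update work,
  // print any failure, then always release the ArangoDB connection, stop Spark and exit.
  def withGraphUpdate(cleanArango: () => Unit, closeSpark: () => Unit)(work: => Unit): Unit = {
    try {
      work
    } catch {
      case e: Exception => e.printStackTrace() // the diff prints the stack trace on failure
    } finally {
      cleanArango()  // e.g. arangoManger.clean()
      closeSpark()   // e.g. SparkSessionUtil.closeSpark()
      System.exit(0) // all three jobs exit explicitly when finished
    }
  }
}

ipRecommend, for example, would then reduce to withGraphUpdate(() => arangoManger.clean(), () => SparkSessionUtil.closeSpark()) { updateDocument("IP", getVertexIpRow, mergeVertexIp) }.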