delete directory ip-learning

This commit is contained in:
wanglihui
2020-08-06 16:36:40 +08:00
parent 95603676bb
commit a034238679
34 changed files with 0 additions and 3310 deletions

View File

@@ -1,9 +0,0 @@
# Created by .ignore support plugin (hsz.mobi)
### Example user template template
### Example user template
# IntelliJ project files
.idea
*.iml
target
spark-warehouse/

View File

@@ -1,211 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.ac.iie</groupId>
<artifactId>ip-learning</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.6</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.1.54</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-xml</artifactId>
<version>2.11.0-M4</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.orientechnologies/orientdb-graphdb -->
<dependency>
<groupId>com.orientechnologies</groupId>
<artifactId>orientdb-graphdb</artifactId>
<version>3.0.31</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.orientechnologies/orientdb-client -->
<dependency>
<groupId>com.orientechnologies</groupId>
<artifactId>orientdb-client</artifactId>
<version>3.0.31</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.orientechnologies/orientdb-core -->
<dependency>
<groupId>com.orientechnologies</groupId>
<artifactId>orientdb-core</artifactId>
<version>3.0.31</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.orientechnologies/orientdb-server -->
<dependency>
<groupId>com.orientechnologies</groupId>
<artifactId>orientdb-server</artifactId>
<version>3.0.31</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.orientechnologies/orientdb-jdbc -->
<dependency>
<groupId>com.orientechnologies</groupId>
<artifactId>orientdb-jdbc</artifactId>
<version>3.0.31</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.tinkerpop.blueprints/blueprints-orient-graph -->
<dependency>
<groupId>com.tinkerpop.blueprints</groupId>
<artifactId>blueprints-orient-graph</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.arangodb</groupId>
<artifactId>arangodb-java-driver</artifactId>
<version>4.2.2</version>
</dependency>
<dependency>
<groupId>com.arangodb</groupId>
<artifactId>arangodb-spark-connector</artifactId>
<version>1.0.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.swoop/spark-alchemy -->
<dependency>
<groupId>com.swoop</groupId>
<artifactId>spark-alchemy_2.11</artifactId>
<version>0.3.28</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<archive>
<manifest>
<mainClass>cn.ac.iie.main.IPLearningApplication</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,26 +0,0 @@
#spark任务配置
spark.sql.shuffle.partitions=144
spark.sql.read.fetchsize="10000"
spark.executor.memory="120g"
spark.app.name="test"
spark.network.timeout="300s"
repartitionNumber=36
spark.serializer="org.apache.spark.serializer.KryoSerializer"
master="local[*]"
#spark读取clickhouse配置
numPartitions="144"
maxPoolSize=40
minTime="1571245199"
maxTime="1571284799"
clickhouse.socket.timeout=300000
#arangoDB配置
arangoDB.host="192.168.40.127"
arangoDB.port=8529
arangoDB.user="root"
arangoDB.password="111111"
arangoDB.DB.name="insert_iplearn_index"
arangoDB.batch=100000
arangoDB.ttl=3600
thread.pool.number=10

View File

@@ -1,25 +0,0 @@
#spark任务配置
spark.sql.shuffle.partitions=144
spark.sql.read.fetchsize=10000
spark.executor.memory=120g
spark.app.name=test
spark.network.timeout=300s
repartitionNumber=36
spark.serializer=org.apache.spark.serializer.KryoSerializer
master=local[*]
#spark读取clickhouse配置
numPartitions=144
maxPoolSize=40
minTime=1571245199
maxTime=1571284799
clickhouse.socket.timeout=300000
#arangoDB配置
arangoDB.host=192.168.40.127
arangoDB.port=8529
arangoDB.user=root
arangoDB.password=111111
arangoDB.DB.name=insert_iplearn_index
arangoDB.batch=100000
arangoDB.ttl=3600
thread.pool.number=10

View File

@@ -1,40 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

View File

@@ -1,71 +0,0 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.
# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos
# Options read in YARN client mode
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G)
# - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G)
# Options for the daemons used in the standalone deploy mode
# - SPARK_MASTER_HOST, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_DAEMON_CLASSPATH, to set the classpath for all daemons
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
# Generic options for the daemons used in the standalone deploy mode
# - SPARK_CONF_DIR Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - SPARK_LOG_DIR Where log files are stored. (Default: ${SPARK_HOME}/logs)
# - SPARK_PID_DIR Where the pid file is stored. (Default: /tmp)
# - SPARK_IDENT_STRING A string representing this instance of spark. (Default: $USER)
# - SPARK_NICENESS The scheduling priority for daemons. (Default: 0)
# - SPARK_NO_DAEMONIZE Run the proposed command in the foreground. It will not output a PID file.
export SPARK_MASTER_IP=bigdata-119
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=4
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=3g
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_73
export SCALA_HOME=/home/ceiec/scala-2.11.7

View File

@@ -1,125 +0,0 @@
package cn.ac.iie.dao
import cn.ac.iie.test.Config
import org.apache.spark.sql.{DataFrame, SparkSession}
object BaseMediaDataLoad {
def loadMediaDate(spark: SparkSession): Unit = {
val mediaDataFrame: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:clickhouse://192.168.40.193:8123")
.option("dbtable", s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", "default")
.option("password", "111111")
.option("numPartitions", Config.NUMPARTITIONS)
.option("partitionColumn", "recv_time")
.option("lowerBound", Config.MINTIME)
.option("upperBound", Config.MAXTIME)
.option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
.load()
mediaDataFrame.printSchema()
mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
}
def getFQDNVertexFromMedia(spark: SparkSession): DataFrame = {
val v_FQDN_DF = spark.sql(
"""
|SELECT
| media_domain AS new_fqdn_name,
| MIN( recv_time ) AS new_fqdn_first_found_time,
| MAX( recv_time ) AS new_fqdn_last_found_time,
| COUNT( * ) AS new_fqdn_count_total
|FROM
| global_temp.media_expire_patch
|WHERE
| media_domain != ''
|GROUP BY
| media_domain
""".stripMargin
)
v_FQDN_DF
}
def getIPVertexFromMedia(spark: SparkSession): DataFrame = {
val s_IP_DF = spark.sql(
"""
select
s1_s_ip as new_ip,
s1_s_location_region as new_location,
MIN( recv_time ) AS new_ip_first_found_time,
MAX( recv_time ) AS new_ip_last_found_time,
COUNT( * ) AS new_ip_count_total
from global_temp.media_expire_patch
GROUP BY
s1_s_ip,
s1_s_location_region
""".stripMargin)
val d_IP_DF = spark.sql(
"""
select
s1_d_ip as new_ip,
s1_d_location_region as new_location,
| MIN( recv_time ) AS new_ip_first_found_time,
MAX( recv_time ) AS new_ip_last_found_time,
COUNT( * ) AS new_ip_count_total
from global_temp.media_expire_patch
GROUP BY
s1_d_ip,
s1_d_location_region
""".stripMargin)
import org.apache.spark.sql.functions._
val v_IP_DF = s_IP_DF.union(d_IP_DF).groupBy("new_ip", "new_location").agg(
min("new_ip_first_found_time").as("new_ip_first_found_time"),
max("new_ip_last_found_time").as("new_ip_last_found_time"),
count("new_ip").as("new_ip_count_total")
)
v_IP_DF
}
def getFQDNAddressIPEdgeFromMedia(spark: SparkSession): DataFrame = {
val e_Address_v_FQDN_to_v_IP_DF = spark.sql(
"""
|SELECT
| media_domain AS new_fqdn,
| s1_d_ip AS new_ip,
| MIN( recv_time ) AS new_first_found_time,
| MAX( recv_time ) AS new_last_found_time,
| COUNT( * ) AS new_count_total,
| CONCAT_WS('-',media_domain,s1_d_ip) AS new_key
|FROM
| global_temp.media_expire_patch
|WHERE
| ( media_domain != '' )
| AND ( s1_d_ip != '' )
|GROUP BY
| s1_d_ip,
| media_domain
""".stripMargin)
e_Address_v_FQDN_to_v_IP_DF
}
def getIPVisitFQDNEdgeFromMedia(spark: SparkSession): DataFrame = {
val e_Visit_v_IP_to_v_FQDN_DF = spark.sql(
"""
|SELECT
| s1_s_ip AS new_ip,
| media_domain AS new_fqdn,
| MIN( recv_time ) AS new_first_found_time,
| MAX( recv_time ) AS new_last_found_time,
| COUNT( * ) AS new_count_total,
| CONCAT_WS('-',s1_s_ip,media_domain) as new_key
|FROM
| global_temp.media_expire_patch
|WHERE
| ( s1_s_ip != '' )
| AND ( media_domain != '' )
|GROUP BY
| s1_s_ip,
| media_domain
""".stripMargin)
e_Visit_v_IP_to_v_FQDN_DF
}
}

View File

@@ -1,177 +0,0 @@
package cn.ac.iie.dao
import cn.ac.iie.test.ArangoDbTest.arangoDB
import cn.ac.iie.test.Config
import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
import org.apache.spark.sql.DataFrame
import scala.util.Try
object UpdateArangoGraph {
/**
* 更新FQDN点
*/
def updateFQDNVertex(v_FQDN_DF:DataFrame): Unit ={
v_FQDN_DF.printSchema()
v_FQDN_DF.foreachPartition(iter => {
val v_FQDN_Coll = arangoDB.db("insert_iplearn_index").collection("V_FQDN")
val docs_Insert = new java.util.ArrayList[BaseDocument]()
val docs_Update = new java.util.ArrayList[BaseDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("FQDN_NAME")
val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
if (v_FQDN_Coll.documentExists(fqdn)) {
val document: BaseDocument = v_FQDN_Coll.getDocument(fqdn, classOf[BaseDocument])
val fqdn_Cnt = Try(document.getAttribute("FQDN_COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("FQDN_COUNT_TOTAL", fqdn_Cnt)
document.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
docs_Update.add(document)
} else {
val baseDocument: BaseDocument = new BaseDocument()
baseDocument.setKey(fqdn)
baseDocument.addAttribute("FQDN_NAME", fqdn)
baseDocument.addAttribute("FQDN_FIRST_FOUND_TIME", v_Fqdn_First)
baseDocument.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
baseDocument.addAttribute("FQDN_COUNT_TOTAL", v_Fqdn_Cnt)
docs_Insert.add(baseDocument)
}
i+=1
})
Try(v_FQDN_Coll.importDocuments(docs_Insert))
Try(v_FQDN_Coll.updateDocuments(docs_Update))
})
}
/**
* 更新IP点
*/
def updateIPVertex(v_IP_DF:DataFrame): Unit ={
v_IP_DF.printSchema()
v_IP_DF.foreachPartition(iter => {
val v_IP_Coll = arangoDB.db("insert_iplearn_index").collection("V_IP")
val docs_Insert: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
val docs_Update: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
var i = 0
iter.foreach(row => {
val ip = row.getAs[String]("IP")
val location = row.getAs[String]("location")
val v_IP_First = row.getAs[Long]("IP_FIRST_FOUND_TIME")
val v_IP_Last = row.getAs[Long]("IP_LAST_FOUND_TIME")
val v_IP_Cnt = row.getAs[Long]("IP_COUNT_TOTAL")
if (v_IP_Coll.documentExists(ip)) {
val document: BaseDocument = v_IP_Coll.getDocument(ip, classOf[BaseDocument])
val ip_Cnt = Try(document.getAttribute("IP_APPEAR_COUNT")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", v_IP_Last)
document.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt+ip_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseDocument = new BaseDocument()
baseDocument.setKey(ip)
baseDocument.addAttribute("IP", ip)
baseDocument.addAttribute("IP_LOCATION", location)
baseDocument.addAttribute("FIRST_FOUND_TIME", v_IP_First)
baseDocument.addAttribute("LAST_FOUND_TIME", v_IP_Last)
baseDocument.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt)
docs_Insert.add(baseDocument)
}
i+=1
})
Try(v_IP_Coll.importDocuments(docs_Insert))
Try(v_IP_Coll.updateDocuments(docs_Update))
})
}
/**
* 统计e_Address_Fqdn_to_IP
*/
def updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF:DataFrame): Unit ={
e_Address_v_FQDN_to_v_IP_DF.printSchema()
e_Address_v_FQDN_to_v_IP_DF.foreachPartition(iter => {
val e_Add_Fqdn_to_IP_Coll = arangoDB.db("insert_iplearn_index").collection("E_ADDRESS_V_FQDN_TO_V_IP")
val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
if (e_Add_Fqdn_to_IP_Coll.documentExists(fqdn+"-"+ip)) {
val document: BaseEdgeDocument = e_Add_Fqdn_to_IP_Coll.getDocument(fqdn+"-"+ip, classOf[BaseEdgeDocument])
val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", e_Last)
document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
baseDocument.setKey(fqdn+"-"+ip)
baseDocument.setFrom(s"V_FQDN/$fqdn")
baseDocument.setTo(s"V_IP/$ip")
baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
docs_Insert.add(baseDocument)
}
// println(fqdn+"-"+ip)
i+=1
})
Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Add_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
})
}
/**
* 统计e_Visit_v_IP_to_v_FQDN
*/
def updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF:DataFrame): Unit ={
e_Visit_v_IP_to_v_FQDN_DF.printSchema()
e_Visit_v_IP_to_v_FQDN_DF.foreachPartition(iter => {
val e_Visit_Fqdn_to_IP_Coll = arangoDB.db("insert_iplearn_index").collection("E_VISIT_V_IP_TO_V_FQDN")
val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
if (e_Visit_Fqdn_to_IP_Coll.documentExists(ip+"-"+fqdn)) {
val document: BaseEdgeDocument = e_Visit_Fqdn_to_IP_Coll.getDocument(ip+"-"+fqdn, classOf[BaseEdgeDocument])
val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", e_Last)
document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
baseDocument.setKey(ip+"-"+fqdn)
baseDocument.setFrom(s"V_IP/$ip")
baseDocument.setTo(s"V_FQDN/$fqdn")
baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
docs_Insert.add(baseDocument)
}
i+=1
})
Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Visit_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
})
}
}

View File

@@ -1,237 +0,0 @@
package cn.ac.iie.dao
import cn.ac.iie.etl.CursorTransform
import cn.ac.iie.pojo.{BaseEdgeIPVisitFqdn, BaseEgdeFqdnAddressIP, BaseVertexFqdn, BaseVertexIP}
import cn.ac.iie.test.Config
import cn.ac.iie.utils.{ConfigUtils, InitArangoDBPool}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Try
object UpdateArangoGraphByArangoSpark {
/**
* 更新FQDN点
*/
def updateFQDNVertex(v_FQDN_DF:DataFrame,v_FQDN_Cursor_DF: DataFrame): Unit ={
v_FQDN_DF.printSchema()
v_FQDN_Cursor_DF.printSchema()
val v_Fqdn_Join_Df = v_FQDN_DF
.join(v_FQDN_Cursor_DF,v_FQDN_DF("new_fqdn_name")===v_FQDN_Cursor_DF("key"),"fullouter")
v_Fqdn_Join_Df.printSchema()
v_Fqdn_Join_Df.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val v_FQDN_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_FQDN")
val docs_Replace = new java.util.ArrayList[BaseVertexFqdn]()
val docs_Insert = new java.util.ArrayList[BaseVertexFqdn]()
iter.foreach(row => {
val new_fqdn_name = row.getAs[String]("new_fqdn_name")
val new_fqdn_first_found_time = row.getAs[Long]("new_fqdn_first_found_time")
val new_fqdn_last_found_time = row.getAs[Long]("new_fqdn_last_found_time")
val new_fqdn_count_total = row.getAs[Long]("new_fqdn_count_total")
val fqdn = row.getAs[String]("key")
val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
if (fqdn != null) {
val document: BaseVertexFqdn = new BaseVertexFqdn()
document.setKey(new_fqdn_name)
document.setFQDN_NAME(new_fqdn_name)
document.setFQDN_FIRST_FOUND_TIME(v_Fqdn_First)
document.setFQDN_LAST_FOUND_TIME(new_fqdn_last_found_time)
document.setFQDN_COUNT_TOTAL(v_Fqdn_Cnt+new_fqdn_count_total)
docs_Replace.add(document)
} else {
val baseDocument: BaseVertexFqdn = new BaseVertexFqdn()
baseDocument.setKey(new_fqdn_name)
baseDocument.setFQDN_NAME(new_fqdn_name)
baseDocument.setFQDN_FIRST_FOUND_TIME(new_fqdn_first_found_time)
baseDocument.setFQDN_LAST_FOUND_TIME(new_fqdn_last_found_time)
baseDocument.setFQDN_COUNT_TOTAL(new_fqdn_count_total)
docs_Insert.add(baseDocument)
}
})
Try(v_FQDN_Coll.replaceDocuments(docs_Replace))
Try(v_FQDN_Coll.importDocuments(docs_Insert))
})
}
/**
* 更新IP点
*/
def updateIPVertex(v_IP_DF:DataFrame,v_IP_Cursor_DF: DataFrame): Unit ={
v_IP_DF.printSchema()
v_IP_Cursor_DF.printSchema()
val v_IP_Join_DF = v_IP_DF.join(v_IP_Cursor_DF,v_IP_DF("new_ip")===v_IP_Cursor_DF("key"),"fullouter")
v_IP_Join_DF.printSchema()
v_IP_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val v_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_IP")
val docs_Insert: java.util.ArrayList[BaseVertexIP] = new java.util.ArrayList[BaseVertexIP]()
val docs_Replace: java.util.ArrayList[BaseVertexIP] = new java.util.ArrayList[BaseVertexIP]()
iter.foreach(row => {
val new_Ip = row.getAs[String]("new_ip")
val new_Location = row.getAs[String]("new_location")
val new_Ip_First_Found_Time = row.getAs[Long]("new_ip_first_found_time")
val new_Ip_Last_Found_Time = row.getAs[Long]("new_ip_last_found_time")
val new_Ip_Count_Total = row.getAs[Long]("new_ip_count_total")
val key = row.getAs[String]("key")
val location = row.getAs[String]("IP_LOCATION")
val v_IP_First = row.getAs[Long]("FIRST_FOUND_TIME")
val v_IP_Cnt = row.getAs[Long]("IP_APPEAR_COUNT")
if (key != null) {
val document = new BaseVertexIP()
document.setKey(key)
document.setIP(key)
document.setLAST_FOUND_TIME(new_Ip_Last_Found_Time)
document.setIP_APPEAR_COUNT(v_IP_Cnt+new_Ip_Count_Total)
document.setFIRST_FOUND_TIME(v_IP_First)
document.setIP_LOCATION(location)
docs_Replace.add(document)
} else {
val baseDocument = new BaseVertexIP()
baseDocument.setKey(new_Ip)
baseDocument.setIP(new_Ip)
baseDocument.setLAST_FOUND_TIME(new_Ip_Last_Found_Time)
baseDocument.setIP_APPEAR_COUNT(new_Ip_Count_Total)
baseDocument.setFIRST_FOUND_TIME(new_Ip_First_Found_Time)
baseDocument.setIP_LOCATION(new_Location)
docs_Insert.add(baseDocument)
}
})
Try(v_IP_Coll.importDocuments(docs_Insert))
Try(v_IP_Coll.updateDocuments(docs_Replace))
})
}
/**
* 统计e_Address_Fqdn_to_IP
*/
def updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF:DataFrame,e_Fqdn_Address_IP_Cursor_DF: DataFrame): Unit ={
e_Address_v_FQDN_to_v_IP_DF.printSchema()
e_Fqdn_Address_IP_Cursor_DF.printSchema()
e_Fqdn_Address_IP_Cursor_DF.printSchema()
val e_Address_v_FQDN_to_v_IP_Join_DF = e_Address_v_FQDN_to_v_IP_DF
.join(e_Fqdn_Address_IP_Cursor_DF,
e_Address_v_FQDN_to_v_IP_DF("new_key")===e_Fqdn_Address_IP_Cursor_DF("key"),
"fullouter")
e_Address_v_FQDN_to_v_IP_Join_DF.printSchema()
e_Address_v_FQDN_to_v_IP_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val e_Add_Fqdn_to_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_ADDRESS_V_FQDN_TO_V_IP")
val docs_Insert: java.util.ArrayList[BaseEgdeFqdnAddressIP] = new java.util.ArrayList[BaseEgdeFqdnAddressIP]()
val docs_Replace: java.util.ArrayList[BaseEgdeFqdnAddressIP] = new java.util.ArrayList[BaseEgdeFqdnAddressIP]()
iter.foreach(row => {
val new_Fqdn = row.getAs[String]("new_fqdn")
val new_IP = row.getAs[String]("new_ip")
val new_Key = row.getAs[String]("new_key")
val new_First_Found_Time = row.getAs[Long]("new_first_found_time")
val new_Last_Found_Time = row.getAs[Long]("new_last_found_time")
val new_Count_Total = row.getAs[Long]("new_count_total")
val from = row.getAs[String]("from")
val to = row.getAs[String]("to")
val key = row.getAs[String]("key")
val e_First_Time = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Count_Total = row.getAs[Long]("COUNT_TOTAL")
if (key != null) {
val document = new BaseEgdeFqdnAddressIP()
document.setKey(key)
document.setFrom(from)
document.setTo(to)
document.setLAST_FOUND_TIME(new_Last_Found_Time)
document.setFIRST_FOUND_TIME(e_First_Time)
document.setCOUNT_TOTAL(new_Count_Total+e_Count_Total)
docs_Replace.add(document)
} else {
val baseDocument: BaseEgdeFqdnAddressIP = new BaseEgdeFqdnAddressIP()
baseDocument.setKey(new_Key)
baseDocument.setFrom(s"V_FQDN/$new_Fqdn")
baseDocument.setTo(s"V_IP/$new_IP")
baseDocument.setLAST_FOUND_TIME(new_Last_Found_Time)
baseDocument.setFIRST_FOUND_TIME(new_First_Found_Time)
baseDocument.setCOUNT_TOTAL(new_Count_Total)
docs_Insert.add(baseDocument)
}
})
Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Add_Fqdn_to_IP_Coll.replaceDocuments(docs_Replace))
})
}
/**
* 统计e_Visit_v_IP_to_v_FQDN
*/
def updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF:DataFrame,e_IP_Visit_FQDN_Cursor_DF: DataFrame): Unit = {
e_Visit_v_IP_to_v_FQDN_DF.printSchema()
e_IP_Visit_FQDN_Cursor_DF.printSchema()
e_IP_Visit_FQDN_Cursor_DF.printSchema()
val e_Visit_v_IP_to_v_FQDN_Join_DF = e_Visit_v_IP_to_v_FQDN_DF
.join(e_IP_Visit_FQDN_Cursor_DF, e_Visit_v_IP_to_v_FQDN_DF("new_key") === e_IP_Visit_FQDN_Cursor_DF("key"), "fullouter")
e_Visit_v_IP_to_v_FQDN_Join_DF.printSchema()
e_Visit_v_IP_to_v_FQDN_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val e_Visit_Fqdn_to_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_VISIT_V_IP_TO_V_FQDN")
val docs_Insert: java.util.ArrayList[BaseEdgeIPVisitFqdn] = new java.util.ArrayList[BaseEdgeIPVisitFqdn]()
val docs_Replace: java.util.ArrayList[BaseEdgeIPVisitFqdn] = new java.util.ArrayList[BaseEdgeIPVisitFqdn]()
iter.foreach(row => {
val new_Fqdn = row.getAs[String]("new_fqdn")
val new_IP = row.getAs[String]("new_ip")
val new_Key = row.getAs[String]("new_key")
val new_First_Found_Time = row.getAs[Long]("new_first_found_time")
val new_Last_Found_Time = row.getAs[Long]("new_last_found_time")
val new_Count_Total = row.getAs[Long]("new_count_total")
val to = row.getAs[String]("to")
val from = row.getAs[String]("from")
val key = row.getAs[String]("key")
val e_First_Time = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Count_Total = row.getAs[Long]("COUNT_TOTAL")
if (key != null) {
val document = new BaseEdgeIPVisitFqdn()
document.setKey(key)
document.setFrom(from)
document.setTo(to)
document.setLAST_FOUND_TIME(new_Last_Found_Time)
document.setFIRST_FOUND_TIME(e_First_Time)
document.setCOUNT_TOTAL(new_Count_Total+e_Count_Total)
docs_Replace.add(document)
} else {
val baseDocument: BaseEdgeIPVisitFqdn = new BaseEdgeIPVisitFqdn()
baseDocument.setKey(new_Key)
baseDocument.setFrom(s"V_FQDN/$new_Fqdn")
baseDocument.setTo(s"V_IP/$new_IP")
baseDocument.setLAST_FOUND_TIME(new_Last_Found_Time)
baseDocument.setFIRST_FOUND_TIME(new_First_Found_Time)
baseDocument.setCOUNT_TOTAL(new_Count_Total)
docs_Insert.add(baseDocument)
}
})
Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Visit_Fqdn_to_IP_Coll.replaceDocuments(docs_Replace))
})
}
}

View File

@@ -1,250 +0,0 @@
package cn.ac.iie.dao
import cn.ac.iie.etl.CursorTransform
import cn.ac.iie.pojo.{BaseEdgeIPVisitFqdn, BaseEgdeFqdnAddressIP, BaseVertexFqdn, BaseVertexIP}
import cn.ac.iie.test.Config
import cn.ac.iie.utils.{ConfigUtils, InitArangoDBPool}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Try
object UpdateArangoGraphByDF {
/**
* 更新FQDN点
* @param v_FQDN_DF //读取clickhouse结果集
* @param spark //sparkSession引擎
*/
def updateFQDNVertex(v_FQDN_DF:DataFrame,spark:SparkSession): Unit ={
v_FQDN_DF.printSchema()
val v_FQDN_Cursor_DF = CursorTransform.cursorToDataFrame("V_FQDN",classOf[BaseVertexFqdn],spark)
val v_Fqdn_Join_Df = v_FQDN_DF
.join(v_FQDN_Cursor_DF,v_FQDN_DF("new_fqdn_name")===v_FQDN_Cursor_DF("key"),"fullouter")
v_Fqdn_Join_Df.printSchema()
v_Fqdn_Join_Df.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val v_FQDN_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_FQDN")
val docs_Replace = new java.util.ArrayList[BaseVertexFqdn]()
val docs_Insert = new java.util.ArrayList[BaseVertexFqdn]()
iter.foreach(row => {
val new_fqdn_name = row.getAs[String]("new_fqdn_name")
val new_fqdn_first_found_time = row.getAs[Long]("new_fqdn_first_found_time")
val new_fqdn_last_found_time = row.getAs[Long]("new_fqdn_last_found_time")
val new_fqdn_count_total = row.getAs[Long]("new_fqdn_count_total")
val fqdn = row.getAs[String]("key")
val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
if (fqdn != null) {
val document: BaseVertexFqdn = new BaseVertexFqdn()
document.setKey(new_fqdn_name)
document.setFQDN_NAME(new_fqdn_name)
document.setFQDN_FIRST_FOUND_TIME(v_Fqdn_First)
document.setFQDN_LAST_FOUND_TIME(new_fqdn_last_found_time)
document.setFQDN_COUNT_TOTAL(v_Fqdn_Cnt+new_fqdn_count_total)
docs_Replace.add(document)
} else {
val baseDocument: BaseVertexFqdn = new BaseVertexFqdn()
baseDocument.setKey(new_fqdn_name)
baseDocument.setFQDN_NAME(new_fqdn_name)
baseDocument.setFQDN_FIRST_FOUND_TIME(new_fqdn_first_found_time)
baseDocument.setFQDN_LAST_FOUND_TIME(new_fqdn_last_found_time)
baseDocument.setFQDN_COUNT_TOTAL(new_fqdn_count_total)
docs_Insert.add(baseDocument)
}
})
Try(v_FQDN_Coll.replaceDocuments(docs_Replace))
Try(v_FQDN_Coll.importDocuments(docs_Insert))
})
}
/**
* 更新IP点
* @param v_IP_DF //读取clickhouse结果集
* @param spark //sparkSession引擎
*/
def updateIPVertex(v_IP_DF:DataFrame,spark:SparkSession): Unit ={
v_IP_DF.printSchema()
val v_IP_Cursor_DF = CursorTransform.cursorToDataFrame("V_IP",classOf[BaseVertexIP],spark)
val v_IP_Join_DF = v_IP_DF.join(v_IP_Cursor_DF,v_IP_DF("new_ip")===v_IP_Cursor_DF("key"),"fullouter")
v_IP_Join_DF.printSchema()
v_IP_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val v_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_IP")
val docs_Insert: java.util.ArrayList[BaseVertexIP] = new java.util.ArrayList[BaseVertexIP]()
val docs_Replace: java.util.ArrayList[BaseVertexIP] = new java.util.ArrayList[BaseVertexIP]()
iter.foreach(row => {
val new_Ip = row.getAs[String]("new_ip")
val new_Location = row.getAs[String]("new_location")
val new_Ip_First_Found_Time = row.getAs[Long]("new_ip_first_found_time")
val new_Ip_Last_Found_Time = row.getAs[Long]("new_ip_last_found_time")
val new_Ip_Count_Total = row.getAs[Long]("new_ip_count_total")
val key = row.getAs[String]("key")
val location = row.getAs[String]("IP_LOCATION")
val v_IP_First = row.getAs[Long]("FIRST_FOUND_TIME")
val v_IP_Cnt = row.getAs[Long]("IP_APPEAR_COUNT")
if (key != null) {
val document = new BaseVertexIP()
document.setKey(key)
document.setIP(key)
document.setLAST_FOUND_TIME(new_Ip_Last_Found_Time)
document.setIP_APPEAR_COUNT(v_IP_Cnt+new_Ip_Count_Total)
document.setFIRST_FOUND_TIME(v_IP_First)
document.setIP_LOCATION(location)
docs_Replace.add(document)
} else {
val baseDocument = new BaseVertexIP()
baseDocument.setKey(new_Ip)
baseDocument.setIP(new_Ip)
baseDocument.setLAST_FOUND_TIME(new_Ip_Last_Found_Time)
baseDocument.setIP_APPEAR_COUNT(new_Ip_Count_Total)
baseDocument.setFIRST_FOUND_TIME(new_Ip_First_Found_Time)
baseDocument.setIP_LOCATION(new_Location)
docs_Insert.add(baseDocument)
}
})
Try(v_IP_Coll.importDocuments(docs_Insert))
Try(v_IP_Coll.updateDocuments(docs_Replace))
})
}
/**
* 统计e_Address_Fqdn_to_IP
* @param e_Address_v_FQDN_to_v_IP_DF //读取clickhouse结果集
* @param spark //sparkSession引擎
*/
def updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF:DataFrame,spark:SparkSession): Unit ={
e_Address_v_FQDN_to_v_IP_DF.printSchema()
val e_Fqdn_Address_IP_Cursor_DF = CursorTransform
.cursorToDataFrame("E_ADDRESS_V_FQDN_TO_V_IP",classOf[BaseEgdeFqdnAddressIP],spark)
e_Fqdn_Address_IP_Cursor_DF.printSchema()
val e_Address_v_FQDN_to_v_IP_Join_DF = e_Address_v_FQDN_to_v_IP_DF
.join(e_Fqdn_Address_IP_Cursor_DF,
e_Address_v_FQDN_to_v_IP_DF("new_key")===e_Fqdn_Address_IP_Cursor_DF("key"),
"fullouter")
e_Address_v_FQDN_to_v_IP_Join_DF.printSchema()
e_Address_v_FQDN_to_v_IP_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val e_Add_Fqdn_to_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_ADDRESS_V_FQDN_TO_V_IP")
val docs_Insert: java.util.ArrayList[BaseEgdeFqdnAddressIP] = new java.util.ArrayList[BaseEgdeFqdnAddressIP]()
val docs_Replace: java.util.ArrayList[BaseEgdeFqdnAddressIP] = new java.util.ArrayList[BaseEgdeFqdnAddressIP]()
iter.foreach(row => {
val new_Fqdn = row.getAs[String]("new_fqdn")
val new_IP = row.getAs[String]("new_ip")
val new_Key = row.getAs[String]("new_key")
val new_First_Found_Time = row.getAs[Long]("new_first_found_time")
val new_Last_Found_Time = row.getAs[Long]("new_last_found_time")
val new_Count_Total = row.getAs[Long]("new_count_total")
val from = row.getAs[String]("from")
val to = row.getAs[String]("to")
val key = row.getAs[String]("key")
val e_First_Time = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Count_Total = row.getAs[Long]("COUNT_TOTAL")
if (key != null) {
val document = new BaseEgdeFqdnAddressIP()
document.setKey(key)
document.setFrom(from)
document.setTo(to)
document.setLAST_FOUND_TIME(new_Last_Found_Time)
document.setFIRST_FOUND_TIME(e_First_Time)
document.setCOUNT_TOTAL(new_Count_Total+e_Count_Total)
docs_Replace.add(document)
} else {
val baseDocument: BaseEgdeFqdnAddressIP = new BaseEgdeFqdnAddressIP()
baseDocument.setKey(new_Key)
baseDocument.setFrom(s"V_FQDN/$new_Fqdn")
baseDocument.setTo(s"V_IP/$new_IP")
baseDocument.setLAST_FOUND_TIME(new_Last_Found_Time)
baseDocument.setFIRST_FOUND_TIME(new_First_Found_Time)
baseDocument.setCOUNT_TOTAL(new_Count_Total)
docs_Insert.add(baseDocument)
}
})
Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Add_Fqdn_to_IP_Coll.replaceDocuments(docs_Replace))
})
}
/**
* 统计e_Visit_v_IP_to_v_FQDN
* @param e_Visit_v_IP_to_v_FQDN_DF //读取clickhouse结果集
* @param spark //sparkSession引擎
*/
def updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF:DataFrame,spark:SparkSession): Unit = {
e_Visit_v_IP_to_v_FQDN_DF.printSchema()
val e_IP_Visit_FQDN_Cursor_DF = CursorTransform
.cursorToDataFrame("E_VISIT_V_IP_TO_V_FQDN",classOf[BaseEdgeIPVisitFqdn],spark)
e_IP_Visit_FQDN_Cursor_DF.printSchema()
val e_Visit_v_IP_to_v_FQDN_Join_DF = e_Visit_v_IP_to_v_FQDN_DF
.join(e_IP_Visit_FQDN_Cursor_DF, e_Visit_v_IP_to_v_FQDN_DF("new_key") === e_IP_Visit_FQDN_Cursor_DF("key"), "fullouter")
e_Visit_v_IP_to_v_FQDN_Join_DF.printSchema()
e_Visit_v_IP_to_v_FQDN_Join_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val e_Visit_Fqdn_to_IP_Coll = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_VISIT_V_IP_TO_V_FQDN")
val docs_Insert: java.util.ArrayList[BaseEdgeIPVisitFqdn] = new java.util.ArrayList[BaseEdgeIPVisitFqdn]()
val docs_Replace: java.util.ArrayList[BaseEdgeIPVisitFqdn] = new java.util.ArrayList[BaseEdgeIPVisitFqdn]()
iter.foreach(row => {
val new_Fqdn = row.getAs[String]("new_fqdn")
val new_IP = row.getAs[String]("new_ip")
val new_Key = row.getAs[String]("new_key")
val new_First_Found_Time = row.getAs[Long]("new_first_found_time")
val new_Last_Found_Time = row.getAs[Long]("new_last_found_time")
val new_Count_Total = row.getAs[Long]("new_count_total")
val to = row.getAs[String]("to")
val from = row.getAs[String]("from")
val key = row.getAs[String]("key")
val e_First_Time = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Count_Total = row.getAs[Long]("COUNT_TOTAL")
if (key != null) {
val document = new BaseEdgeIPVisitFqdn()
document.setKey(key)
document.setFrom(from)
document.setTo(to)
document.setLAST_FOUND_TIME(new_Last_Found_Time)
document.setFIRST_FOUND_TIME(e_First_Time)
document.setCOUNT_TOTAL(new_Count_Total+e_Count_Total)
docs_Replace.add(document)
} else {
val baseDocument: BaseEdgeIPVisitFqdn = new BaseEdgeIPVisitFqdn()
baseDocument.setKey(new_Key)
baseDocument.setFrom(s"V_FQDN/$new_Fqdn")
baseDocument.setTo(s"V_IP/$new_IP")
baseDocument.setLAST_FOUND_TIME(new_Last_Found_Time)
baseDocument.setFIRST_FOUND_TIME(new_First_Found_Time)
baseDocument.setCOUNT_TOTAL(new_Count_Total)
docs_Insert.add(baseDocument)
}
})
Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Visit_Fqdn_to_IP_Coll.replaceDocuments(docs_Replace))
})
}
}

View File

@@ -1,33 +0,0 @@
package cn.ac.iie.etl
import cn.ac.iie.utils.{ConfigUtils, InitArangoDBPool}
import com.arangodb.ArangoCursor
import com.arangodb.entity.BaseDocument
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.reflect.ClassTag
object CursorTransform {
/**
* 将查询ArangoDB的结果集转换为DataFrame
* @param collection_Name //查询的collection
* @param class_Type //转换的pojo类对象
* @param spark / /sparkSession引擎
* @tparam T
* @return
*/
def cursorToDataFrame[T:ClassTag](collection_Name:String,class_Type: Class[T],spark:SparkSession): DataFrame ={
val query = s"FOR doc IN $collection_Name RETURN doc"
println(query)
val cursor: ArangoCursor[T] = InitArangoDBPool.arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
.query(query, InitArangoDBPool.bindVars, InitArangoDBPool.options, class_Type)
val cursor_DF = spark.createDataFrame(cursor.asListRemaining(),class_Type)
cursor_DF.printSchema()
cursor_DF
}
}

View File

@@ -1,29 +0,0 @@
package cn.ac.iie.main
import cn.ac.iie.test.Config
import cn.ac.iie.dao.BaseMediaDataLoad
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
object IPLearningApplication {
private val logger: Logger = LoggerFactory.getLogger(IPLearningApplication.getClass)
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// .config("spark.kryo.classesToRegister","com.tinkerpop.blueprints.impls.orient.OrientGraphFactory")
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.master(Config.MASTER)
.getOrCreate()
logger.warn("sparkession获取成功")
BaseMediaDataLoad.loadMediaDate(spark)
// BaseMediaDataLoad.
}
}

View File

@@ -1,34 +0,0 @@
package cn.ac.iie.pojo
import com.arangodb.entity.DocumentField
import com.arangodb.entity.DocumentField.Type
import scala.beans.BeanProperty
class BaseEdgeIPVisitFqdn {
@BeanProperty
@DocumentField(Type.FROM)
var from: String=""
@BeanProperty
@DocumentField(Type.TO)
var to: String=""
@BeanProperty
@DocumentField(Type.KEY)
var key: String=""
@BeanProperty
@DocumentField(Type.ID)
var id: String=""
@BeanProperty
var FIRST_FOUND_TIME:Long = 0
@BeanProperty
var LAST_FOUND_TIME:Long = 0
@BeanProperty
var COUNT_TOTAL:Long = 0
}

View File

@@ -1,34 +0,0 @@
package cn.ac.iie.pojo
import com.arangodb.entity.DocumentField
import com.arangodb.entity.DocumentField.Type
import scala.beans.BeanProperty
class BaseEgdeFqdnAddressIP {
@BeanProperty
@DocumentField(Type.FROM)
var from: String=""
@BeanProperty
@DocumentField(Type.TO)
var to: String=""
@BeanProperty
@DocumentField(Type.KEY)
var key: String=""
@BeanProperty
@DocumentField(Type.ID)
var id: String=""
@BeanProperty
var FIRST_FOUND_TIME:Long = 0
@BeanProperty
var LAST_FOUND_TIME:Long = 0
@BeanProperty
var COUNT_TOTAL:Long = 0
}

View File

@@ -1,30 +0,0 @@
package cn.ac.iie.pojo
import com.arangodb.entity.DocumentField
import com.arangodb.entity.DocumentField.Type
import scala.beans.BeanProperty
class BaseVertexFqdn {
@BeanProperty
@DocumentField(Type.KEY)
var key: String=""
@BeanProperty
@DocumentField(Type.ID)
var id: String=""
@BeanProperty
var FQDN_FIRST_FOUND_TIME:Long = 0
@BeanProperty
var FQDN_LAST_FOUND_TIME:Long = 0
@BeanProperty
var FQDN_COUNT_TOTAL:Long = 0
@BeanProperty
var FQDN_NAME:String = ""
}

View File

@@ -1,32 +0,0 @@
package cn.ac.iie.pojo
import com.arangodb.entity.DocumentField
import com.arangodb.entity.DocumentField.Type
import scala.beans.BeanProperty
class BaseVertexIP {
@BeanProperty
@DocumentField(Type.KEY)
var key: String=""
@BeanProperty
@DocumentField(Type.ID)
var id: String=""
@BeanProperty
var FIRST_FOUND_TIME:Long = 0
@BeanProperty
var LAST_FOUND_TIME:Long = 0
@BeanProperty
var IP_APPEAR_COUNT:Long = 0
@BeanProperty
var IP:String = ""
@BeanProperty
var IP_LOCATION:String = ""
}

View File

@@ -1,52 +0,0 @@
package cn.ac.iie.test
import cn.ac.iie.dao.{BaseMediaDataLoad, UpdateArangoGraphByArangoSpark}
import cn.ac.iie.pojo.{BaseEdgeIPVisitFqdn, BaseEgdeFqdnAddressIP, BaseVertexFqdn, BaseVertexIP}
import cn.ac.iie.utils.ConfigUtils
import com.arangodb.spark.rdd.ArangoRDD
import com.arangodb.spark.{ArangoSpark, ReadOptions}
import org.apache.spark.sql.{DataFrame, SparkSession}
object ArangoDBSparkTest {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName(ConfigUtils.SPARK_APP_NAME)
.config("spark.serializer", ConfigUtils.SPARK_SERIALIZER)
.config("spark.network.timeout", ConfigUtils.SPARK_NETWORK_TIMEOUT)
.config("spark.sql.shuffle.partitions", ConfigUtils.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", ConfigUtils.SPARK_EXECUTOR_MEMORY)
.config("arangodb.hosts", "192.168.40.127:8529")
.config("arangodb.user", ConfigUtils.ARANGODB_USER)
.config("arangodb.password", ConfigUtils.ARANGODB_PASSWORD)
.config("arangodb.maxConnections",ConfigUtils.MAXPOOLSIZE)
.master(ConfigUtils.MASTER)
.getOrCreate()
BaseMediaDataLoad.loadMediaDate(spark)
val v_FQDN_DF = BaseMediaDataLoad.getFQDNVertexFromMedia(spark)
val v_IP_DF = BaseMediaDataLoad.getIPVertexFromMedia(spark)
val e_Address_v_FQDN_to_v_IP_DF = BaseMediaDataLoad.getFQDNAddressIPEdgeFromMedia(spark)
val e_Visit_v_IP_to_v_FQDN_DF= BaseMediaDataLoad.getIPVisitFQDNEdgeFromMedia(spark)
val v_FQDN_Cursor_Rdd: ArangoRDD[BaseVertexFqdn] = ArangoSpark.load[BaseVertexFqdn](spark.sparkContext, "V_FQDN", ReadOptions(ConfigUtils.ARANGODB_DB_NAME))
val v_FQDN_Cursor_DF: DataFrame = spark.createDataFrame(v_FQDN_Cursor_Rdd,classOf[BaseVertexFqdn])
val v_IP_Cursor_Rdd: ArangoRDD[BaseVertexIP] = ArangoSpark.load[BaseVertexIP](spark.sparkContext, "V_IP", ReadOptions(ConfigUtils.ARANGODB_DB_NAME))
val v_IP_Cursor_DF: DataFrame = spark.createDataFrame(v_IP_Cursor_Rdd,classOf[BaseVertexIP])
val e_Fqdn_Address_IP_Cursor_Rdd: ArangoRDD[BaseEgdeFqdnAddressIP] = ArangoSpark.load[BaseEgdeFqdnAddressIP](spark.sparkContext, "E_ADDRESS_V_FQDN_TO_V_IP", ReadOptions(ConfigUtils.ARANGODB_DB_NAME))
val e_Fqdn_Address_IP_Cursor_DF: DataFrame = spark.createDataFrame(e_Fqdn_Address_IP_Cursor_Rdd,classOf[BaseEgdeFqdnAddressIP])
val e_IP_Visit_FQDN_Cursor_Rdd: ArangoRDD[BaseEdgeIPVisitFqdn] = ArangoSpark.load[BaseEdgeIPVisitFqdn](spark.sparkContext, "E_VISIT_V_IP_TO_V_FQDN", ReadOptions(ConfigUtils.ARANGODB_DB_NAME))
val e_IP_Visit_FQDN_Cursor_DF: DataFrame = spark.createDataFrame(e_IP_Visit_FQDN_Cursor_Rdd,classOf[BaseEdgeIPVisitFqdn])
UpdateArangoGraphByArangoSpark.updateFQDNVertex(v_FQDN_DF,v_FQDN_Cursor_DF)
UpdateArangoGraphByArangoSpark.updateIPVertex(v_IP_DF,v_IP_Cursor_DF)
UpdateArangoGraphByArangoSpark.updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF,e_Fqdn_Address_IP_Cursor_DF)
UpdateArangoGraphByArangoSpark.updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF,e_IP_Visit_FQDN_Cursor_DF)
spark.close()
System.exit(1)
}
}

View File

@@ -1,37 +0,0 @@
package cn.ac.iie.test
import com.arangodb.entity.BaseDocument
import com.arangodb.model.AqlQueryOptions
import com.arangodb.util.MapBuilder
import com.arangodb.{ArangoCursor, ArangoDB}
object ArangoDbReadV_IPTest {
@transient
var arangoDB: ArangoDB = _
def main(args: Array[String]): Unit = {
arangoDB = new ArangoDB.Builder()
.maxConnections(Config.MAXPOOLSIZE)
.host("192.168.40.127", 8529)
.user("root")
.password("111111")
.build
val bindVars = new MapBuilder().get
val options = new AqlQueryOptions()
.ttl(Config.ARANGODB_TTL)
val v_IP_Mutabal_Map = new java.util.HashMap[String,BaseDocument](16048576,0.9f)
val v_IP_Query = "FOR doc IN V_IP RETURN doc"
val v_IP_Cursor: ArangoCursor[BaseDocument] = arangoDB.db("insert_iplearn_index")
.query(v_IP_Query, bindVars, options, classOf[BaseDocument])
while (v_IP_Cursor.hasNext){
val document = v_IP_Cursor.next()
v_IP_Mutabal_Map.put(document.getKey ,document)
}
println(v_IP_Mutabal_Map.size())
arangoDB.shutdown()
}
}

View File

@@ -1,314 +0,0 @@
package cn.ac.iie.test
import com.arangodb.ArangoDB
import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.util.Try
object ArangoDbTest {
private val logger: Logger = LoggerFactory.getLogger(ReadClickhouseTest.getClass)
@transient
var arangoDB: ArangoDB = _
def main(args: Array[String]): Unit = {
// val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", Config.SPARK_SERIALIZER)
// .config("spark.kryo.classesToRegister","com.tinkerpop.blueprints.impls.orient.OrientGraphFactory")
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.master(Config.MASTER)
/*
.config("spark.driver.host", "192.168.41.79")
.config("spark.jars", "D:\\GITREPO\\ip-learning\\target\\ip-learning-1.0-SNAPSHOT-jar-with-dependencies.jar")
.master("spark://192.168.40.119:7077")
*/
.getOrCreate()
logger.warn("sparkession获取成功")
// val sql = "(select * from av_miner.media_expire_patch_local limit 1000)"
val mediaDataFrame: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:clickhouse://192.168.40.193:8123")
// .option("dbtable", "av_miner.media_expire_patch")
// .option("dbtable", "(select * from av_miner.media_expire_patch limit 10)")
// .option("dbtable","(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where media_domain not LIKE '%\\n%')")
.option("dbtable", s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
// .option("dbtable","(select media_domain,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region,min(recv_time) as min_recv_time,max(recv_time) as max_recv_time from av_miner.media_expire_patch group by media_domain,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region limit 10)")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", "default")
.option("password", "111111")
.option("numPartitions", Config.NUMPARTITIONS)
.option("partitionColumn", "recv_time")
.option("lowerBound", Config.MINTIME)
.option("upperBound", Config.MAXTIME)
.option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
.load()
// mediaDataFrame.printSchema()
mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
// val mediaDataGlobalView = spark.sql("select * from global_temp.media_expire_patch limit 10")
// mediaDataGlobalView.show()
val v_FQDN_DF = spark.sql(
"""
|SELECT
| media_domain AS FQDN_NAME,
| MIN( recv_time ) AS FQDN_FIRST_FOUND_TIME,
| MAX( recv_time ) AS FQDN_LAST_FOUND_TIME,
| COUNT( * ) AS FQDN_COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| media_domain != ''
|GROUP BY
| media_domain
""".stripMargin
)
val s_IP_DF = spark.sql(
"""
select
s1_s_ip as IP,
s1_s_location_region as location,
MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
MAX( recv_time ) AS IP_LAST_FOUND_TIME,
COUNT( * ) AS IP_COUNT_TOTAL
from global_temp.media_expire_patch
GROUP BY
IP,
location
""".stripMargin)
val d_IP_DF = spark.sql(
"""
select
s1_d_ip as IP,
s1_d_location_region as location,
MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
MAX( recv_time ) AS IP_LAST_FOUND_TIME,
COUNT( * ) AS IP_COUNT_TOTAL
from global_temp.media_expire_patch
GROUP BY
IP,
location
""".stripMargin)
import org.apache.spark.sql.functions._
val v_IP_DF = s_IP_DF.union(d_IP_DF).groupBy("IP", "location").agg(
min("IP_FIRST_FOUND_TIME").as("IP_FIRST_FOUND_TIME"),
max("IP_LAST_FOUND_TIME").as("IP_LAST_FOUND_TIME"),
count("IP").as("IP_COUNT_TOTAL")
)
val e_Address_v_FQDN_to_v_IP_DF = spark.sql(
"""
|SELECT
| media_domain AS V_FQDN,
| s1_d_ip AS V_IP,
| MIN( recv_time ) AS FIRST_FOUND_TIME,
| MAX( recv_time ) AS LAST_FOUND_TIME,
| COUNT( * ) AS COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| ( media_domain != '' )
| AND ( s1_d_ip != '' )
|GROUP BY
| s1_d_ip,
| media_domain
""".stripMargin)
val e_Visit_v_IP_to_v_FQDN_DF = spark.sql(
"""
|SELECT
| s1_s_ip AS V_IP,
| media_domain AS V_FQDN,
| MIN( recv_time ) AS FIRST_FOUND_TIME,
| MAX( recv_time ) AS LAST_FOUND_TIME,
| COUNT( * ) AS COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| ( s1_s_ip != '' )
| AND ( media_domain != '' )
|GROUP BY
| s1_s_ip,
| media_domain
""".stripMargin)
/**
* 获取数据库连接
*/
arangoDB = new ArangoDB.Builder()
.maxConnections(Config.MAXPOOLSIZE)
.host("192.168.40.127", 8529)
.user("root")
.password("111111")
.build
/**
* 更新FQDN点
*/
v_FQDN_DF.printSchema()
v_FQDN_DF.foreachPartition(iter => {
val v_FQDN_Coll = arangoDB.db("insert_iplearn_index").collection("V_FQDN")
val docs_Insert = new java.util.ArrayList[BaseDocument]()
val docs_Update = new java.util.ArrayList[BaseDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("FQDN_NAME")
val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
if (v_FQDN_Coll.documentExists(fqdn)) {
val document: BaseDocument = v_FQDN_Coll.getDocument(fqdn, classOf[BaseDocument])
val fqdn_Cnt = Try(document.getAttribute("FQDN_COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("FQDN_COUNT_TOTAL", fqdn_Cnt)
document.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
docs_Update.add(document)
} else {
val baseDocument: BaseDocument = new BaseDocument()
baseDocument.setKey(fqdn)
baseDocument.addAttribute("FQDN_NAME", fqdn)
baseDocument.addAttribute("FQDN_FIRST_FOUND_TIME", v_Fqdn_First)
baseDocument.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
baseDocument.addAttribute("FQDN_COUNT_TOTAL", v_Fqdn_Cnt)
docs_Insert.add(baseDocument)
}
i+=1
})
Try(v_FQDN_Coll.importDocuments(docs_Insert))
Try(v_FQDN_Coll.updateDocuments(docs_Update))
})
/**
* 更新IP点
*/
v_IP_DF.printSchema()
v_IP_DF.foreachPartition(iter => {
val v_IP_Coll = arangoDB.db("insert_iplearn_index").collection("V_IP")
val docs_Insert: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
val docs_Update: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
var i = 0
iter.foreach(row => {
val ip = row.getAs[String]("IP")
val location = row.getAs[String]("location")
val v_IP_First = row.getAs[Long]("IP_FIRST_FOUND_TIME")
val v_IP_Last = row.getAs[Long]("IP_LAST_FOUND_TIME")
val v_IP_Cnt = row.getAs[Long]("IP_COUNT_TOTAL")
if (v_IP_Coll.documentExists(ip)) {
val document: BaseDocument = v_IP_Coll.getDocument(ip, classOf[BaseDocument])
val ip_Cnt = Try(document.getAttribute("IP_APPEAR_COUNT")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", v_IP_Last)
document.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt+ip_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseDocument = new BaseDocument()
baseDocument.setKey(ip)
baseDocument.addAttribute("IP", ip)
baseDocument.addAttribute("IP_LOCATION", location)
baseDocument.addAttribute("FIRST_FOUND_TIME", v_IP_First)
baseDocument.addAttribute("LAST_FOUND_TIME", v_IP_Last)
baseDocument.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt)
docs_Insert.add(baseDocument)
}
i+=1
})
Try(v_IP_Coll.importDocuments(docs_Insert))
Try(v_IP_Coll.updateDocuments(docs_Update))
})
/**
* 统计e_Address_Fqdn_to_IP
*/
e_Address_v_FQDN_to_v_IP_DF.printSchema()
e_Address_v_FQDN_to_v_IP_DF.foreachPartition(iter => {
val e_Add_Fqdn_to_IP_Coll = arangoDB.db("insert_iplearn_index").collection("E_ADDRESS_V_FQDN_TO_V_IP")
val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
if (e_Add_Fqdn_to_IP_Coll.documentExists(fqdn+"-"+ip)) {
val document: BaseEdgeDocument = e_Add_Fqdn_to_IP_Coll.getDocument(fqdn+"-"+ip, classOf[BaseEdgeDocument])
val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", e_Last)
document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
baseDocument.setKey(fqdn+"-"+ip)
baseDocument.setFrom(s"V_FQDN/$fqdn")
baseDocument.setTo(s"V_IP/$ip")
baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
docs_Insert.add(baseDocument)
}
// println(fqdn+"-"+ip)
})
Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Add_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
})
/**
* 统计e_Visit_v_IP_to_v_FQDN
*/
e_Visit_v_IP_to_v_FQDN_DF.printSchema()
e_Visit_v_IP_to_v_FQDN_DF.foreachPartition(iter => {
val e_Visit_Fqdn_to_IP_Coll = arangoDB.db("insert_iplearn_index").collection("E_VISIT_V_IP_TO_V_FQDN")
val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
if (e_Visit_Fqdn_to_IP_Coll.documentExists(ip+"-"+fqdn)) {
val document: BaseEdgeDocument = e_Visit_Fqdn_to_IP_Coll.getDocument(ip+"-"+fqdn, classOf[BaseEdgeDocument])
val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", e_Last)
document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
baseDocument.setKey(ip+"-"+fqdn)
baseDocument.setFrom(s"V_IP/$ip")
baseDocument.setTo(s"V_FQDN/$fqdn")
baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
docs_Insert.add(baseDocument)
}
i+=1
})
Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Visit_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
})
arangoDB.shutdown()
}
}

View File

@@ -1,355 +0,0 @@
package cn.ac.iie.test
import cn.ac.iie.utils.ConfigUtils
import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
import com.arangodb.model.AqlQueryOptions
import com.arangodb.util.MapBuilder
import com.arangodb.{ArangoCursor, ArangoDB}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.util.Try
object ArangoDbTestMemory {
private val logger: Logger = LoggerFactory.getLogger(ArangoDbTestMemory.getClass)
@transient
var arangoDB: ArangoDB = _
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", Config.SPARK_SERIALIZER)
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.master(Config.MASTER)
.getOrCreate()
logger.warn("sparkession获取成功")
val mediaDataFrame: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:clickhouse://192.168.40.193:8123")
.option("dbtable", s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", "default")
.option("password", "111111")
.option("numPartitions", Config.NUMPARTITIONS)
.option("partitionColumn", "recv_time")
.option("lowerBound", Config.MINTIME)
.option("upperBound", Config.MAXTIME)
.option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
.option("socket_timeout",Config.CLICKHOUSE_SOCKET_TIMEOUT)
.load()
mediaDataFrame.printSchema()
mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
val v_FQDN_DF = spark.sql(
"""
|SELECT
| media_domain AS FQDN_NAME,
| MIN( recv_time ) AS FQDN_FIRST_FOUND_TIME,
| MAX( recv_time ) AS FQDN_LAST_FOUND_TIME,
| COUNT( * ) AS FQDN_COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| media_domain != ''
|GROUP BY
| media_domain
""".stripMargin
)
val s_IP_DF = spark.sql(
"""
select
s1_s_ip as IP,
s1_s_location_region as location,
MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
MAX( recv_time ) AS IP_LAST_FOUND_TIME,
COUNT( * ) AS IP_COUNT_TOTAL
from global_temp.media_expire_patch
GROUP BY
IP,
location
""".stripMargin)
val d_IP_DF = spark.sql(
"""
select
s1_d_ip as IP,
s1_d_location_region as location,
MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
MAX( recv_time ) AS IP_LAST_FOUND_TIME,
COUNT( * ) AS IP_COUNT_TOTAL
from global_temp.media_expire_patch
GROUP BY
IP,
location
""".stripMargin)
import org.apache.spark.sql.functions._
val v_IP_DF = s_IP_DF.union(d_IP_DF).groupBy("IP", "location").agg(
min("IP_FIRST_FOUND_TIME").as("IP_FIRST_FOUND_TIME"),
max("IP_LAST_FOUND_TIME").as("IP_LAST_FOUND_TIME"),
count("IP").as("IP_COUNT_TOTAL")
)
val e_Address_v_FQDN_to_v_IP_DF: DataFrame = spark.sql(
"""
|SELECT
| media_domain AS V_FQDN,
| s1_d_ip AS V_IP,
| MIN( recv_time ) AS FIRST_FOUND_TIME,
| MAX( recv_time ) AS LAST_FOUND_TIME,
| COUNT( * ) AS COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| ( media_domain != '' )
| AND ( s1_d_ip != '' )
|GROUP BY
| s1_d_ip,
| media_domain
""".stripMargin)
val e_Visit_v_IP_to_v_FQDN_DF = spark.sql(
"""
|SELECT
| s1_s_ip AS V_IP,
| media_domain AS V_FQDN,
| MIN( recv_time ) AS FIRST_FOUND_TIME,
| MAX( recv_time ) AS LAST_FOUND_TIME,
| COUNT( * ) AS COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| ( s1_s_ip != '' )
| AND ( media_domain != '' )
|GROUP BY
| s1_s_ip,
| media_domain
""".stripMargin)
/**
* 获取数据库连接
*/
arangoDB = new ArangoDB.Builder()
.maxConnections(Config.MAXPOOLSIZE)
.host(ConfigUtils.ARANGODB_HOST, ConfigUtils.ARANGODB_PORT)
.user(ConfigUtils.ARANGODB_USER)
.password(ConfigUtils.ARANGODB_PASSWORD)
.build
val bindVars = new MapBuilder().get
val options = new AqlQueryOptions()
.ttl(Config.ARANGODB_TTL)
val v_FQDN_Mutabal_Map = new java.util.HashMap[String,BaseDocument](1048576,0.9f)
val v_IP_Mutabal_Map = new java.util.HashMap[String,BaseDocument](16048576,0.9f)
val e_FQDN_Address_IP_Mutabal_Map = new java.util.HashMap[String,BaseEdgeDocument](1048576,0.9f)
val e_IP_Visit_FQDN_Mutabal_Map = new java.util.HashMap[String,BaseEdgeDocument](30408576,0.9f)
/**
* 更新FQDN点
*/
val v_FQDN_Query = "FOR doc IN V_FQDN RETURN doc"
val v_FQDN_Cursor: ArangoCursor[BaseDocument] = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
.query(v_FQDN_Query, bindVars, options, classOf[BaseDocument])
while (v_FQDN_Cursor.hasNext){
val document = v_FQDN_Cursor.next()
v_FQDN_Mutabal_Map.put(document.getKey,document)
}
val v_FQDN_Map= spark.sparkContext.broadcast(v_FQDN_Mutabal_Map)
v_FQDN_Mutabal_Map.clear()
v_FQDN_DF.show(20)
v_FQDN_DF.printSchema()
v_FQDN_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val v_FQDN_Coll = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_FQDN")
val docs_Insert = new java.util.ArrayList[BaseDocument]()
val docs_Update = new java.util.ArrayList[BaseDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("FQDN_NAME")
val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
val doc = v_FQDN_Map.value.getOrDefault(fqdn, null)
if (doc != null) {
val document: BaseDocument = doc
val fqdn_Cnt = Try(document.getAttribute("FQDN_COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("FQDN_COUNT_TOTAL", fqdn_Cnt)
document.addAttribute("LAST_FOUND_TIME", v_Fqdn_Last)
docs_Update.add(document)
} else {
val baseDocument: BaseDocument = new BaseDocument()
baseDocument.setKey(fqdn)
baseDocument.addAttribute("FQDN_NAME", fqdn)
baseDocument.addAttribute("FIRST_FOUND_TIME", v_Fqdn_First)
baseDocument.addAttribute("LAST_FOUND_TIME", v_Fqdn_Last)
baseDocument.addAttribute("FQDN_COUNT_TOTAL", v_Fqdn_Cnt)
docs_Insert.add(baseDocument)
}
})
// Try(v_FQDN_Coll.importDocuments(docs_Insert))
v_FQDN_Coll.importDocuments(docs_Insert)
Try(v_FQDN_Coll.updateDocuments(docs_Update))
})
v_FQDN_Map.destroy()
/**
* 更新IP点
*/
val v_IP_Query = "FOR doc IN V_IP RETURN doc"
val v_IP_Cursor: ArangoCursor[BaseDocument] = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
.query(v_IP_Query, bindVars, options, classOf[BaseDocument])
while (v_IP_Cursor.hasNext){
val document = v_IP_Cursor.next()
v_IP_Mutabal_Map.put(document.getKey ,document)
}
val v_IP_Map = spark.sparkContext.broadcast(v_IP_Mutabal_Map)
// val v_IP_Map = v_IP_Mutabal_Map.toMap
v_IP_Mutabal_Map.clear()
v_IP_DF.printSchema()
v_IP_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val v_IP_Coll = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("V_IP")
val docs_Insert: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
val docs_Update: java.util.ArrayList[BaseDocument] = new java.util.ArrayList[BaseDocument]()
var i = 0
iter.foreach(row => {
val ip = row.getAs[String]("IP")
val location = row.getAs[String]("location")
val v_IP_First = row.getAs[Long]("IP_FIRST_FOUND_TIME")
val v_IP_Last = row.getAs[Long]("IP_LAST_FOUND_TIME")
val v_IP_Cnt = row.getAs[Long]("IP_COUNT_TOTAL")
val doc = v_IP_Map.value.getOrDefault(ip, null)
if (doc != null) {
val document: BaseDocument = doc
val ip_Cnt = Try(document.getAttribute("IP_APPEAR_COUNT")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", v_IP_Last)
document.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt+ip_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseDocument = new BaseDocument()
baseDocument.setKey(ip)
baseDocument.addAttribute("IP", ip)
baseDocument.addAttribute("IP_LOCATION", location)
baseDocument.addAttribute("FIRST_FOUND_TIME", v_IP_First)
baseDocument.addAttribute("LAST_FOUND_TIME", v_IP_Last)
baseDocument.addAttribute("IP_APPEAR_COUNT", v_IP_Cnt)
docs_Insert.add(baseDocument)
}
})
Try(v_IP_Coll.importDocuments(docs_Insert))
Try(v_IP_Coll.updateDocuments(docs_Update))
})
v_IP_Map.destroy()
/**
* 统计e_Address_Fqdn_to_IP
*/
val e_FQDN_Address_IP_Query = "FOR doc IN E_ADDRESS_V_FQDN_TO_V_IP RETURN doc"
val e_FQDN_Address_IP_Cursor: ArangoCursor[BaseEdgeDocument] = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
.query(e_FQDN_Address_IP_Query, bindVars, options, classOf[BaseEdgeDocument])
while (e_FQDN_Address_IP_Cursor.hasNext){
val document = e_FQDN_Address_IP_Cursor.next()
e_FQDN_Address_IP_Mutabal_Map.put(document.getKey ,document)
}
val e_FQDN_Address_IP_Map = spark.sparkContext.broadcast(e_FQDN_Address_IP_Mutabal_Map)
e_FQDN_Address_IP_Mutabal_Map.clear()
e_Address_v_FQDN_to_v_IP_DF.printSchema()
e_Address_v_FQDN_to_v_IP_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val e_Add_Fqdn_to_IP_Coll = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_ADDRESS_V_FQDN_TO_V_IP")
val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
val doc = e_FQDN_Address_IP_Map.value.getOrDefault(fqdn+"-"+ip, null)
if (doc != null) {
val document: BaseEdgeDocument = doc
val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
document.setFrom(s"V_FQDN/$fqdn")
document.setTo(s"V_IP/$ip")
document.addAttribute("LAST_FOUND_TIME", e_Last)
document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
baseDocument.setKey(fqdn+"-"+ip)
baseDocument.setFrom(s"V_FQDN/$fqdn")
baseDocument.setTo(s"V_IP/$ip")
baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
docs_Insert.add(baseDocument)
}
})
Try(e_Add_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Add_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
})
e_FQDN_Address_IP_Map.destroy()
/**
* 统计e_Visit_v_IP_to_v_FQDN
*/
val e_IP_Visit_FQDN_Query = "FOR doc IN E_VISIT_V_IP_TO_V_FQDN RETURN doc"
val e_IP_Visit_FQDN_Cursor: ArangoCursor[BaseEdgeDocument] = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME)
.query(e_IP_Visit_FQDN_Query, bindVars, options, classOf[BaseEdgeDocument])
while (e_IP_Visit_FQDN_Cursor.hasNext){
val document = e_IP_Visit_FQDN_Cursor.next()
e_IP_Visit_FQDN_Mutabal_Map.put(document.getKey ,document)
}
val e_IP_Visit_FQDN_Map = spark.sparkContext.broadcast(e_IP_Visit_FQDN_Mutabal_Map)
e_IP_Visit_FQDN_Mutabal_Map.clear()
e_Visit_v_IP_to_v_FQDN_DF.printSchema()
e_Visit_v_IP_to_v_FQDN_DF.coalesce(Config.REPARTITION_NUMBER).foreachPartition(iter => {
val e_Visit_Fqdn_to_IP_Coll = arangoDB.db(ConfigUtils.ARANGODB_DB_NAME).collection("E_VISIT_V_IP_TO_V_FQDN")
val docs_Insert: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
val docs_Update: java.util.ArrayList[BaseEdgeDocument] = new java.util.ArrayList[BaseEdgeDocument]()
var i = 0
iter.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
val doc = e_IP_Visit_FQDN_Map.value.getOrDefault(ip+"-"+fqdn, null)
if (doc != null) {
val document: BaseEdgeDocument = doc
val e_new_Cnt = Try(document.getAttribute("COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("LAST_FOUND_TIME", e_Last)
document.addAttribute("COUNT_TOTAL", e_new_Cnt+e_Cnt)
docs_Update.add(document)
} else {
val baseDocument: BaseEdgeDocument = new BaseEdgeDocument()
baseDocument.setKey(ip+"-"+fqdn)
baseDocument.setFrom("V_IP/"+ip)
baseDocument.setTo("V_FQDN/"+fqdn)
baseDocument.addAttribute("COUNT_TOTAL",e_Cnt)
baseDocument.addAttribute("FIRST_FOUND_TIME",e_First)
baseDocument.addAttribute("LAST_FOUND_TIME",e_Last)
docs_Insert.add(baseDocument)
}
})
Try(e_Visit_Fqdn_to_IP_Coll.importDocuments(docs_Insert))
Try(e_Visit_Fqdn_to_IP_Coll.updateDocuments(docs_Update))
})
e_IP_Visit_FQDN_Map.destroy()
arangoDB.shutdown()
spark.close()
}
}

View File

@@ -1,40 +0,0 @@
package cn.ac.iie.test
import cn.ac.iie.dao.{BaseMediaDataLoad, UpdateArangoGraphByDF}
import cn.ac.iie.utils.{ConfigUtils, InitArangoDBPool}
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
object ArangoDbTestMemoryGroupBy {
private val logger: Logger = LoggerFactory.getLogger(ArangoDbTestMemoryGroupBy.getClass)
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName(ConfigUtils.SPARK_APP_NAME)
.config("spark.serializer", ConfigUtils.SPARK_SERIALIZER)
.config("spark.network.timeout", ConfigUtils.SPARK_NETWORK_TIMEOUT)
.config("spark.sql.shuffle.partitions", ConfigUtils.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", ConfigUtils.SPARK_EXECUTOR_MEMORY)
.master(ConfigUtils.MASTER)
.getOrCreate()
logger.warn("sparkession获取成功")
BaseMediaDataLoad.loadMediaDate(spark)
val v_FQDN_DF = BaseMediaDataLoad.getFQDNVertexFromMedia(spark)
val v_IP_DF = BaseMediaDataLoad.getIPVertexFromMedia(spark)
val e_Address_v_FQDN_to_v_IP_DF = BaseMediaDataLoad.getFQDNAddressIPEdgeFromMedia(spark)
val e_Visit_v_IP_to_v_FQDN_DF= BaseMediaDataLoad.getIPVisitFQDNEdgeFromMedia(spark)
UpdateArangoGraphByDF.updateFQDNVertex(v_FQDN_DF,spark)
UpdateArangoGraphByDF.updateIPVertex(v_IP_DF,spark)
UpdateArangoGraphByDF.updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF,spark)
UpdateArangoGraphByDF.updateIPVisitFQDNEdge(e_Visit_v_IP_to_v_FQDN_DF,spark)
InitArangoDBPool.arangoDB.shutdown()
spark.close()
}
}

View File

@@ -1,22 +0,0 @@
package cn.ac.iie.test
import com.typesafe.config.{Config, ConfigFactory}
object Config {
private lazy val config: Config = ConfigFactory.load()
val SPARK_SQL_SHUFFLE_PARTITIONS: String = config.getString("spark.sql.shuffle.partitions")
val SPARK_SQL_READ_FETCHSIZE: String = config.getString("spark.sql.read.fetchsize")
val SPARK_EXECUTOR_MEMORY: String = config.getString("spark.executor.memory")
val NUMPARTITIONS: String = config.getString("numPartitions")
val MASTER: String = config.getString("master")
val MAXPOOLSIZE: Int = config.getInt("maxPoolSize")
val MINTIME: String = config.getString("minTime")
val MAXTIME: String = config.getString("maxTime")
val REPARTITION_NUMBER: Int = config.getInt("repartitionNumber")
val ARANGODB_BATCH: Int = config.getInt("arangoDB.batch")
val ARANGODB_TTL: Int = config.getInt("arangoDB.ttl")
val CLICKHOUSE_SOCKET_TIMEOUT: Int = config.getInt("clickhouse.socket.timeout")
val SPARK_SERIALIZER: String = config.getString("spark.serializer")
}

View File

@@ -1,447 +0,0 @@
package cn.ac.iie.test
import com.orientechnologies.orient.core.db.{ODatabasePool, OPartitionedDatabasePool}
import com.orientechnologies.orient.core.sql.OCommandSQL
import com.tinkerpop.blueprints.impls.orient.{OrientGraph, OrientGraphFactory}
import com.tinkerpop.blueprints.{Direction, Edge, Vertex}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.util.Try
object ReadClickhouseTest {
private val logger: Logger = LoggerFactory.getLogger(ReadClickhouseTest.getClass)
@transient
var factory: OrientGraphFactory = _
@transient
var pool: ODatabasePool = _
def main(args: Array[String]): Unit = {
// val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// .config("spark.kryo.classesToRegister","com.tinkerpop.blueprints.impls.orient.OrientGraphFactory")
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.master(Config.MASTER)
/*
.config("spark.driver.host", "192.168.41.79")
.config("spark.jars", "D:\\GITREPO\\ip-learning\\target\\ip-learning-1.0-SNAPSHOT-jar-with-dependencies.jar")
.master("spark://192.168.40.119:7077")
*/
.getOrCreate()
logger.warn("sparkession获取成功")
// val sql = "(select * from av_miner.media_expire_patch_local limit 1000)"
val mediaDataFrame: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:clickhouse://192.168.40.193:8123")
// .option("dbtable", "av_miner.media_expire_patch")
// .option("dbtable", "(select * from av_miner.media_expire_patch limit 10)")
// .option("dbtable","(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where media_domain not LIKE '%\\n%')")
.option("dbtable",s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
// .option("dbtable","(select media_domain,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region,min(recv_time) as min_recv_time,max(recv_time) as max_recv_time from av_miner.media_expire_patch group by media_domain,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region limit 10)")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", "default")
.option("password", "111111")
.option("numPartitions", Config.NUMPARTITIONS)
.option("partitionColumn", "recv_time")
.option("lowerBound", Config.MINTIME)
.option("upperBound", Config.MAXTIME)
.option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
.load()
// mediaDataFrame.printSchema()
mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
// val mediaDataGlobalView = spark.sql("select * from global_temp.media_expire_patch limit 10")
// mediaDataGlobalView.show()
val v_FQDN_DF = spark.sql(
"""
|SELECT
| media_domain AS FQDN_NAME,
| MIN( recv_time ) AS FQDN_FIRST_FOUND_TIME,
| MAX( recv_time ) AS FQDN_LAST_FOUND_TIME,
| COUNT( * ) AS FQDN_COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| media_domain != ''
|GROUP BY
| media_domain
""".stripMargin
)
val s_IP_DF = spark.sql(
"""
select
s1_s_ip as IP,
s1_s_location_region as location,
MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
MAX( recv_time ) AS IP_LAST_FOUND_TIME,
COUNT( * ) AS IP_COUNT_TOTAL
from global_temp.media_expire_patch
GROUP BY
IP,
location
""".stripMargin)
val d_IP_DF = spark.sql(
"""
select
s1_d_ip as IP,
s1_d_location_region as location,
MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
MAX( recv_time ) AS IP_LAST_FOUND_TIME,
COUNT( * ) AS IP_COUNT_TOTAL
from global_temp.media_expire_patch
GROUP BY
IP,
location
""".stripMargin)
import org.apache.spark.sql.functions._
val v_IP_DF = s_IP_DF.union(d_IP_DF).groupBy("IP","location").agg(
min("IP_FIRST_FOUND_TIME") .as("IP_FIRST_FOUND_TIME"),
max("IP_LAST_FOUND_TIME").as("IP_LAST_FOUND_TIME"),
count("IP").as("IP_COUNT_TOTAL")
)
/*
val v_IP_DF = spark.sql(
"""
|SELECT
| IP,
| location,
| MIN( recv_time ) AS IP_FIRST_FOUND_TIME,
| MAX( recv_time ) AS IP_LAST_FOUND_TIME,
| COUNT( * ) AS IP_COUNT_TOTAL
|FROM
| (
| ( SELECT s1_s_ip AS IP, s1_s_location_region AS location, recv_time FROM global_temp.media_expire_patch ) UNION ALL
| ( SELECT s1_d_ip AS IP, s1_d_location_region AS location, recv_time FROM global_temp.media_expire_patch )
| )
|GROUP BY
| IP,
| location
""".stripMargin)
*/
val e_Address_v_FQDN_to_v_IP_DF = spark.sql(
"""
|SELECT
| media_domain AS V_FQDN,
| s1_d_ip AS V_IP,
| MIN( recv_time ) AS FIRST_FOUND_TIME,
| MAX( recv_time ) AS LAST_FOUND_TIME,
| COUNT( * ) AS COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| ( media_domain != '' )
| AND ( s1_d_ip != '' )
|GROUP BY
| s1_d_ip,
| media_domain
""".stripMargin)
val e_Visit_v_IP_to_v_FQDN_DF = spark.sql(
"""
|SELECT
| s1_s_ip AS V_IP,
| media_domain AS V_FQDN,
| MIN( recv_time ) AS FIRST_FOUND_TIME,
| MAX( recv_time ) AS LAST_FOUND_TIME,
| COUNT( * ) AS COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| ( s1_s_ip != '' )
| AND ( media_domain != '' )
|GROUP BY
| s1_s_ip,
| media_domain
""".stripMargin)
/**
* 获取数据库连接
*/
val uri: String = "remote:192.168.40.127/iplearning-insert"
// val uri: String = "remote:192.168.40.207/iplearing-test"
// val uri: String = "remote:192.168.40.152:2424;192.168.40.151:2424:192.168.40.153:2424/iplearing-test"
val pool = new OPartitionedDatabasePool(uri, "root", "111111", Config.MAXPOOLSIZE, Config.MAXPOOLSIZE)
factory = new OrientGraphFactory(uri, "root", "111111", pool)
factory.setConnectionStrategy("ROUND_ROBIN_CONNECT")
/**
* 更新FQDN点
*/
v_FQDN_DF.printSchema()
v_FQDN_DF.foreach(row => {
val fqdn = row.getAs[String]("FQDN_NAME")
val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
// val graph = factory.getNoTx
val graph: OrientGraph = factory.getTx
var v_Fqdn_Obj: Vertex = null
import scala.collection.JavaConversions._
if (graph.getVertices("v_FQDN.FQDN_NAME", fqdn).nonEmpty) {
for (v: Vertex <- graph.getVertices("v_FQDN.FQDN_NAME", fqdn)) {
val update_Fqdn_Last = v.getProperty[Long]("LAST_FOUND_TIME")
val update_Fqdn_Cnt = v.getProperty[Long]("FQDN_APPEAR_COUNT")
val sqlComm = new OCommandSQL(
s"UPDATE v_FQDN SET LAST_FOUND_TIME = $update_Fqdn_Last,FQDN_APPEAR_COUNT = ${update_Fqdn_Cnt + v_Fqdn_Cnt} WHERE FQDN_NAME == '$fqdn'")
Try(graph.command(sqlComm).execute())
println("update fqdn:"+fqdn)
v_Fqdn_Obj = v
}
} else {
v_Fqdn_Obj = graph.addVertex("class:v_FQDN", Nil: _*)
v_Fqdn_Obj.setProperty("FQDN_NAME", fqdn)
v_Fqdn_Obj.setProperty("FIRST_FOUND_TIME", v_Fqdn_First)
v_Fqdn_Obj.setProperty("LAST_FOUND_TIME", v_Fqdn_Last)
v_Fqdn_Obj.setProperty("FQDN_APPEAR_COUNT", v_Fqdn_Cnt)
println("insert fqdn:"+fqdn)
}
var i = 0
i = i+1
if (i == 10000){
graph.commit()
}
})
factory.getTx.commit()
/**
* 更新IP点
*/
v_IP_DF.printSchema()
v_IP_DF.foreach(row => {
val ip = row.getAs[String]("IP")
val location = row.getAs[String]("location")
val v_IP_First = row.getAs[Long]("IP_FIRST_FOUND_TIME")
val v_IP_Last = row.getAs[Long]("IP_LAST_FOUND_TIME")
val v_IP_Cnt = row.getAs[Long]("IP_COUNT_TOTAL")
// val graph = factory.getNoTx
val graph = factory.getTx
var v_IP_Obj: Vertex = null
import scala.collection.JavaConversions._
if (graph.getVertices("v_IP.IP", ip).nonEmpty) {
for (v: Vertex <- graph.getVertices("v_IP.IP", ip)) {
val update_IP_Last = v.getProperty[Long]("LAST_FOUND_TIME")
val update_IP_Cnt = v.getProperty[Long]("IP_APPEAR_COUNT")
val sqlComm = new OCommandSQL(
s"UPDATE v_IP SET LAST_FOUND_TIME = $update_IP_Last,FQDN_APPEAR_COUNT = ${update_IP_Cnt + v_IP_Cnt} "
+ s"WHERE IP == '$ip'")
Try(graph.command(sqlComm).execute())
println("update ip:"+ip)
v_IP_Obj = v
}
} else {
v_IP_Obj = graph.addVertex("class:v_IP", Nil: _*)
v_IP_Obj.setProperty("IP", ip)
v_IP_Obj.setProperty("IP_LOCATION", location)
v_IP_Obj.setProperty("FIRST_FOUND_TIME", v_IP_First)
v_IP_Obj.setProperty("LAST_FOUND_TIME", v_IP_Last)
v_IP_Obj.setProperty("IP_APPEAR_COUNT", v_IP_Cnt)
println("insert ip:"+ip)
}
var i = 0
i = i+1
if (i == 10000){
graph.commit()
}
})
factory.getTx.commit()
/**
* 统计e_Address_Fqdn_to_IP
*/
e_Address_v_FQDN_to_v_IP_DF.printSchema()
e_Address_v_FQDN_to_v_IP_DF.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
val graph = factory.getNoTx
var v_Fqdn_Obj: Vertex = null
var v_IP_Obj: Vertex = null
var e_Edge_Obj:Edge = null
import scala.collection.JavaConversions._
//获取fqdn点
if (graph.getVertices("v_FQDN.FQDN_NAME", fqdn).nonEmpty) {
for (v: Vertex <- graph.getVertices("v_FQDN.FQDN_NAME", fqdn)) {
v_Fqdn_Obj = v
}
} else {
v_Fqdn_Obj = graph.addVertex("class:v_FQDN", Nil: _*)
v_Fqdn_Obj.setProperty("FQDN_NAME", fqdn)
v_Fqdn_Obj.setProperty("FIRST_FOUND_TIME", 0)
v_Fqdn_Obj.setProperty("LAST_FOUND_TIME", 0)
v_Fqdn_Obj.setProperty("FQDN_APPEAR_COUNT", 0)
}
//获取IP点
if (graph.getVertices("v_IP.IP", ip).nonEmpty) {
for (v: Vertex <- graph.getVertices("v_IP.IP", ip)) {
v_IP_Obj = v
}
} else {
v_IP_Obj = graph.addVertex("class:v_IP", Nil: _*)
v_IP_Obj.setProperty("IP", ip)
v_IP_Obj.setProperty("IP_LOCATION", "")
v_IP_Obj.setProperty("FIRST_FOUND_TIME", 0)
v_IP_Obj.setProperty("LAST_FOUND_TIME", 0)
v_IP_Obj.setProperty("IP_APPEAR_COUNT", 0)
}
// println("e_address_egde:"+v_Fqdn_Obj.getProperty[String]("FQDN_NAME")+"-"+v_IP_Obj.getProperty[String]("IP"))
//添加或更新边
for (e: Edge <- v_Fqdn_Obj.getEdges(Direction.OUT)) {
if (e.getVertex(Direction.IN).getProperty[String]("IP") == ip){
val cnt = e.getProperty[Long]("COUNT_TOTAL")
e.setProperty("COUNT_TOTAL",e_Cnt+cnt)
e.setProperty("LAST_FOUND_TIME",e_Last)
println("update e_address_egde:"+fqdn+"-"+ip)
e_Edge_Obj = e
}
}
if (e_Edge_Obj != null){
val newEdge = graph.addEdge(null, v_Fqdn_Obj, v_IP_Obj, "E_ADDRESS_V_FQDN_TO_V_IP")
newEdge.setProperty("COUNT_TOTAL",e_Cnt)
newEdge.setProperty("FIRST_FOUND_TIME",e_First)
newEdge.setProperty("LAST_FOUND_TIME",e_Last)
println("insert e_address_egde:"+fqdn+"-"+ip)
}
})
/**
* 统计e_Visit_v_IP_to_v_FQDN
*/
e_Visit_v_IP_to_v_FQDN_DF.printSchema()
e_Visit_v_IP_to_v_FQDN_DF.foreach(row => {
val fqdn = row.getAs[String]("V_FQDN")
val ip = row.getAs[String]("V_IP")
val e_First = row.getAs[Long]("FIRST_FOUND_TIME")
val e_Last = row.getAs[Long]("LAST_FOUND_TIME")
val e_Cnt = row.getAs[Long]("COUNT_TOTAL")
val graph = factory.getNoTx
var v_Fqdn_Obj: Vertex = null
var v_IP_Obj: Vertex = null
var e_Edge_Obj:Edge = null
import scala.collection.JavaConversions._
//添加或更新fqdn点
if (graph.getVertices("v_FQDN.FQDN_NAME", fqdn).nonEmpty) {
for (v: Vertex <- graph.getVertices("v_FQDN.FQDN_NAME", fqdn)) {
v_Fqdn_Obj = v
}
} else {
v_Fqdn_Obj = graph.addVertex("class:v_FQDN", Nil: _*)
v_Fqdn_Obj.setProperty("IP", ip)
v_Fqdn_Obj.setProperty("FIRST_FOUND_TIME", 0)
v_Fqdn_Obj.setProperty("LAST_FOUND_TIME", 0)
v_Fqdn_Obj.setProperty("FQDN_APPEAR_COUNT", 0)
}
//添加或更新IP点
if (graph.getVertices("v_IP.IP", ip).nonEmpty) {
for (v: Vertex <- graph.getVertices("v_IP.IP", ip)) {
v_IP_Obj = v
}
} else {
v_IP_Obj = graph.addVertex("class:v_IP", Nil: _*)
v_IP_Obj.setProperty("FQDN_NAME", fqdn)
v_IP_Obj.setProperty("IP_LOCATION", "")
v_IP_Obj.setProperty("FIRST_FOUND_TIME", 0)
v_IP_Obj.setProperty("LAST_FOUND_TIME", 0)
v_IP_Obj.setProperty("IP_APPEAR_COUNT", 0)
}
// println("e_visit_egde:"+v_Fqdn_Obj.getProperty[String]("FQDN_NAME")+"-"+v_IP_Obj.getProperty[String]("IP"))
//添加或更新边
for (e: Edge <- v_IP_Obj.getEdges(Direction.OUT)) {
if (e.getVertex(Direction.IN).getProperty[String]("FQDN_NAME") == fqdn){
val cnt = e.getProperty[Long]("COUNT_TOTAL")
e.setProperty("COUNT_TOTAL",e_Cnt+cnt)
e.setProperty("LAST_FOUND_TIME",e_Last)
println("update e_visit_egde:"+fqdn+"-"+ip)
e_Edge_Obj = e
}
}
if (e_Edge_Obj != null){
val newEdge = graph.addEdge(null, v_Fqdn_Obj, v_IP_Obj, "E_VISIT_V_IP_TO_V_FQDN")
newEdge.setProperty("COUNT_TOTAL",e_Cnt)
newEdge.setProperty("FIRST_FOUND_TIME",e_First)
newEdge.setProperty("LAST_FOUND_TIME",e_Last)
println("insert e_visit_egde:"+fqdn+"-"+ip)
}
// graph.commit()
})
/*
v_FQDN_DF.printSchema()
v_FQDN_DF.coalesce(20).foreach(row => {
val fqdn = row.getAs[String](0)
val first = row.getAs[Long](1)
val last = row.getAs[Long](2)
val count = row.getAs[Long](3)
val session = pool.acquire()
val vertex = session.newVertex("v_FQDN")
vertex.setProperty("FQDN_NAME",fqdn)
vertex.setProperty("FIRST_FOUND_TIME", first)
vertex.setProperty("LAST_FOUND_TIME", last)
vertex.setProperty("FQDN_APPEAR_COUNT", count)
vertex
})
v_IP_DF.printSchema()
v_IP_DF.coalesce(20).foreach(row => {
val ip = row.getAs[String](0)
val first = row.getAs[Long](2)
val last = row.getAs[Long](3)
val count = row.getAs[Long](4)
val tx: OrientGraph = factory.getTx
val vertex = tx.addVertex("class:v_FQDN",Nil: _*)
vertex.setProperties("FQDN_NAME",ip)
vertex.setProperty("FIRST_FOUND_TIME", first)
vertex.setProperty("LAST_FOUND_TIME", last)
vertex.setProperty("FQDN_APPEAR_COUNT", count)
tx.commit()
})
e_Address_v_FQDN_to_v_IP_DF.printSchema()
e_Address_v_FQDN_to_v_IP_DF.foreach(row => {
val fqdn = row.getAs[String](0)
val ip = row.getAs[String](2)
val first = row.getAs[Long](3)
val last = row.getAs[Long](4)
val count = row.getAs[Long](5)
val session = pool.acquire()
val tx: OrientGraph = factory.getTx
tx.getFeatures.supportsVertexProperties
val vertex: OrientVertex = tx.getVertex()
tx.addEdge(null,vertex,vertex,"")
})
*/
}
}

View File

@@ -1,29 +0,0 @@
package cn.ac.iie.test
import com.arangodb.entity.DocumentField
import com.arangodb.entity.DocumentField.Type
import scala.beans.BeanProperty
class TestBaseEdgeDocument {
@BeanProperty
@DocumentField(Type.FROM)
var from: String=""
@BeanProperty
@DocumentField(Type.TO)
var to: String=""
@BeanProperty
@DocumentField(Type.KEY)
var key: String=""
@BeanProperty
@DocumentField(Type.ID)
var id: String=""
@BeanProperty
var FIRST_FOUND_TIME:Long = 0
@BeanProperty
var LAST_FOUND_TIME:Long = 0
@BeanProperty
var COUNT_TOTAL:Long = 0
}

View File

@@ -1,35 +0,0 @@
package cn.ac.iie.test
import cn.ac.iie.dao.{BaseMediaDataLoad, UpdateArangoGraphByDF}
import cn.ac.iie.utils.InitArangoDBPool
import com.arangodb.entity.BaseEdgeDocument
import com.arangodb.util.MapBuilder
import com.arangodb.{ArangoCursor, ArangoDB}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object TestBaseEdgeDocumentDataFrame {
@transient
var arangoDB: ArangoDB = _
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.master(Config.MASTER)
.getOrCreate()
BaseMediaDataLoad.loadMediaDate(spark)
val e_Address_v_FQDN_to_v_IP_DF = BaseMediaDataLoad.getFQDNAddressIPEdgeFromMedia(spark)
UpdateArangoGraphByDF.updateFQDNAddressIPEdge(e_Address_v_FQDN_to_v_IP_DF,spark)
InitArangoDBPool.arangoDB.shutdown()
spark.close()
}
}

View File

@@ -1,219 +0,0 @@
package cn.ac.iie.test
import com.arangodb.entity.{BaseDocument, BaseEdgeDocument}
import com.arangodb.util.MapBuilder
import com.arangodb.{ArangoCursor, ArangoDB}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Try
object TestIndices {
@transient
var arangoDB: ArangoDB = _
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.master(Config.MASTER)
.getOrCreate()
val mediaDataFrame: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:clickhouse://192.168.40.193:8123")
.option("dbtable", s"(select media_domain,recv_time,s1_s_ip,s1_d_ip,s1_s_location_region,s1_d_location_region from av_miner.media_expire_patch where recv_time>=${Config.MINTIME} and recv_time<=${Config.MAXTIME})")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", "default")
.option("password", "111111")
.option("numPartitions", Config.NUMPARTITIONS)
.option("partitionColumn", "recv_time")
.option("lowerBound", Config.MINTIME)
.option("upperBound", Config.MAXTIME)
.option("fetchsize", Config.SPARK_SQL_READ_FETCHSIZE)
.load()
mediaDataFrame.printSchema()
mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
val v_FQDN_DF = spark.sql(
"""
|SELECT
| media_domain AS FQDN_NAME,
| MIN( recv_time ) AS FQDN_FIRST_FOUND_TIME,
| MAX( recv_time ) AS FQDN_LAST_FOUND_TIME,
| COUNT( * ) AS FQDN_COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| media_domain != ''
|GROUP BY
| media_domain
""".stripMargin
)
val time1 = System.currentTimeMillis()
arangoDB = new ArangoDB.Builder()
.maxConnections(Config.MAXPOOLSIZE)
.host("192.168.40.127", 8529)
.user("root")
.password("111111")
.build
val dbName = "insert_iplearn_index"
val collectionName = "V_FQDN"
val query = "FOR doc IN " + collectionName + " RETURN doc"
val bindVars = new MapBuilder().get
val cursor: ArangoCursor[BaseEdgeDocument] = arangoDB.db(dbName).query(query, bindVars, null, classOf[BaseEdgeDocument])
var cursor_Map = scala.collection.mutable.HashMap[String,BaseEdgeDocument]()
while (cursor.hasNext){
val document = cursor.next()
cursor_Map += (document.getKey -> document)
}
val time2 = System.currentTimeMillis()
println((time2 - time1)/1000)
val docs_Insert = new java.util.ArrayList[BaseDocument]()
val docs_Update = new java.util.ArrayList[BaseDocument]()
v_FQDN_DF.foreach(row => {
val fqdn = row.getAs[String]("FQDN_NAME")
val v_Fqdn_First = row.getAs[Long]("FQDN_FIRST_FOUND_TIME")
val v_Fqdn_Last = row.getAs[Long]("FQDN_LAST_FOUND_TIME")
val v_Fqdn_Cnt = row.getAs[Long]("FQDN_COUNT_TOTAL")
val doc = cursor_Map.getOrElse(fqdn, null)
if (doc != null) {
val document: BaseDocument = doc
val fqdn_Cnt = Try(document.getAttribute("FQDN_COUNT_TOTAL")).getOrElse(0).toString.toInt
document.addAttribute("FQDN_COUNT_TOTAL", fqdn_Cnt)
document.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
docs_Update.add(document)
} else {
val baseDocument: BaseDocument = new BaseDocument()
baseDocument.setKey(fqdn)
baseDocument.addAttribute("FQDN_NAME", fqdn)
baseDocument.addAttribute("FQDN_FIRST_FOUND_TIME", v_Fqdn_First)
baseDocument.addAttribute("FQDN_LAST_FOUND_TIME", v_Fqdn_Last)
baseDocument.addAttribute("FQDN_COUNT_TOTAL", v_Fqdn_Cnt)
docs_Insert.add(baseDocument)
}
})
// Try(v_FQDN_Coll.importDocuments(docs_Insert))
// Try(v_FQDN_Coll.updateDocuments(docs_Update))
/*
val db = arangoDB.db("insert_iplearn_index")
val coll = db.collection("E_ADDRESS_V_FQDN_TO_V_IP")
val docs = new java.util.ArrayList[BaseEdgeDocument]
val baseEdgeDocument2 = new BaseEdgeDocument
baseEdgeDocument2.setKey("test_edge_2.com")
baseEdgeDocument2.setFrom("V_FQDN/test_edge_2_from")
baseEdgeDocument2.setTo("V_IP/test_edge_2_to")
baseEdgeDocument2.addAttribute("e_add_test_str", "1Two3")
baseEdgeDocument2.addAttribute("e_add_test_num", 4321)
docs.add(baseEdgeDocument2)
coll.importDocuments(docs)
arangoDB.shutdown()
*/
/*
val uri: String = "remote:192.168.40.127/iplearning-insert"
val pool = new OPartitionedDatabasePool(uri, "root", "111111", 5, 5)
factory = new OrientGraphFactory(uri, "root", "111111", pool)
val graph = factory.getNoTx
val ip = "23.224.224.163"
import scala.collection.JavaConversions._
/*
for (v: Vertex <- graph.getVertices("v_IP.IP", ip)) {
val update_IP_Last = v.getProperty[Long]("LAST_FOUND_TIME")
val update_IP_Cnt = v.getProperty[Long]("IP_APPEAR_COUNT")
val sqlComm = new OCommandSQL(
s"UPDATE v_IP SET LAST_FOUND_TIME = $update_IP_Last,FQDN_APPEAR_COUNT = 100 "
+ s"WHERE IP == '$ip'")
Try(graph.command(sqlComm).execute())
println("update ip:" + ip)
}
*/
val v_IP_Obj = graph.addVertex("class:v_IP", Nil: _*)
v_IP_Obj.setProperty("IP", ip)
v_IP_Obj.setProperty("IP_LOCATION", "fas")
v_IP_Obj.setProperty("FIRST_FOUND_TIME", 1)
v_IP_Obj.setProperty("LAST_FOUND_TIME", 1)
v_IP_Obj.setProperty("IP_APPEAR_COUNT", 1)
*/
/*
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.config("arangodb.hosts", "192.168.40.127:8529")
.config("arangodb.user", "root")
.config("arangodb.password", "111111")
.master(Config.MASTER)
.getOrCreate()
val value: ArangoRDD[BaseDocument] = ArangoSpark
.load[BaseDocument](spark.sparkContext,"V_FQDN",ReadOptions("insert_iplearn_index"))
// var stringToDocument: Map[String, BaseDocument] = Map[String,BaseDocument]()
val lstBuffer: ListBuffer[(String, BaseDocument)] = ListBuffer[(String, BaseDocument)]()
val map: Map[String, BaseDocument] = value.map(doc => (doc.getKey,doc)).collect().toMap
println(map.size)
spark.close()
*/
/*
arangoDB = new ArangoDB.Builder()
.maxConnections(10)
.host("192.168.40.127", 8529)
.user("root")
.password("111111")
.build
val db = arangoDB.db("insert_iplearn_index")
// db.createCollection("V_FQDN")
// db.createCollection("V_IP")
// db.createCollection("E_ADDRESS_V_FQDN_TO_V_IP")
// db.createCollection("E_VISIT_V_IP_TO_V_FQDN")
val v_FQDN_Coll = db.collection("E_VISIT_V_IP_TO_V_FQDN")
*/
// val coll: ArangoCollection = db.collection("V_FQDN")
// val value = coll.getDocument("test1.com",classOf[BaseDocument])
// val str = value.getAttribute("v_fqdn_test_str")
// val num: Int = value.getAttribute("v_fqdn_test_num").toString.toInt
// println(str+"-"+num)
/*
val docs = new util.ArrayList[BaseDocument]
val baseDocument1 = new BaseDocument
baseDocument1.setKey("test1.com")
baseDocument1.addAttribute("v_fqdn_test_str", "one2three")
baseDocument1.addAttribute("v_fqdn_test_num", 1234)
docs.add(baseDocument1)
val baseDocument2 = new BaseDocument
baseDocument2.setKey("test2.com")
baseDocument2.addAttribute("v_fqdn_test_str", "1Two3")
baseDocument2.addAttribute("v_fqdn_test_num", 4321)
docs.add(baseDocument2)
coll.importDocuments(docs)
*/
// arangoDB.shutdown()
}
}

View File

@@ -1,56 +0,0 @@
package cn.ac.iie.test
import cn.ac.iie.dao.BaseMediaDataLoad
import cn.ac.iie.etl.CursorTransform
import cn.ac.iie.pojo.BaseVertexFqdn
import cn.ac.iie.utils.InitArangoDBPool
import com.arangodb.ArangoCursor
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
object TestSparkJoin {
private val logger: Logger = LoggerFactory.getLogger(TestSparkJoin.getClass)
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", Config.SPARK_SERIALIZER)
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", Config.SPARK_SQL_SHUFFLE_PARTITIONS)
.config("spark.executor.memory", Config.SPARK_EXECUTOR_MEMORY)
.master(Config.MASTER)
.getOrCreate()
logger.warn("sparkession获取成功")
BaseMediaDataLoad.loadMediaDate(spark)
// val v_FQDN_DF = BaseMediaDataLoad.getFQDNVertexFromMedia(spark)
BaseMediaDataLoad.getFQDNAddressIPEdgeFromMedia(spark).show(10)
// v_FQDN_DF.printSchema()
/*
val arangoDB = InitArangoDBPool.arangoDB
val options = InitArangoDBPool.options
val bindVars = InitArangoDBPool.bindVars
// val v_FQDN_Query = "FOR doc IN V_FQDN limit 100 RETURN doc"
// val v_FQDN_Query = "FOR doc IN V_FQDN RETURN doc"
// val v_FQDN_Cursor: ArangoCursor[BaseVertexFqdn] = arangoDB.db("insert_iplearn_noindex")
// .query(v_FQDN_Query, bindVars, options, classOf[BaseVertexFqdn])
// val v_FQDN_Curson_DF: DataFrame = spark.createDataFrame(v_FQDN_Cursor.asListRemaining(),classOf[BaseVertexFqdn])
// v_FQDN_Curson_DF.printSchema()
//
val v_FQDN_Curson_DF = CursorTransform.cursorToDataFrame("V_FQDN",classOf[BaseVertexFqdn],spark)
val v_Fqdn_Join_Df = v_FQDN_DF.join(v_FQDN_Curson_DF,v_FQDN_DF("new_fqdn_name")===v_FQDN_Curson_DF("key"),"fullouter")
v_Fqdn_Join_Df.printSchema()
v_Fqdn_Join_Df
// .filter(row => row.getAs[String]("new_fqdn_name")!=null)
// .filter(row => row.getAs[String]("new_fqdn_name")!=null)
.show(300)
arangoDB.shutdown()
spark.close()
*/
}
}

View File

@@ -1,34 +0,0 @@
package cn.ac.iie.utils
import com.typesafe.config.{Config, ConfigFactory}
object ConfigUtils {
private lazy val config: Config = ConfigFactory.load()
val SPARK_SQL_SHUFFLE_PARTITIONS: String = config.getString("spark.sql.shuffle.partitions")
val SPARK_SQL_READ_FETCHSIZE: String = config.getString("spark.sql.read.fetchsize")
val SPARK_EXECUTOR_MEMORY: String = config.getString("spark.executor.memory")
val SPARK_APP_NAME: String = config.getString("spark.app.name")
val SPARK_NETWORK_TIMEOUT: String = config.getString("spark.network.timeout")
val REPARTITION_NUMBER: Int = config.getInt("repartitionNumber")
val SPARK_SERIALIZER: String = config.getString("spark.serializer")
val NUMPARTITIONS: String = config.getString("numPartitions")
val MASTER: String = config.getString("master")
val MAXPOOLSIZE: Int = config.getInt("maxPoolSize")
val MINTIME: String = config.getString("minTime")
val MAXTIME: String = config.getString("maxTime")
val ARANGODB_HOST: String= config.getString("arangoDB.host")
val ARANGODB_PORT: Int = config.getInt("arangoDB.port")
val ARANGODB_USER: String= config.getString("arangoDB.user")
val ARANGODB_PASSWORD:String= config.getString("arangoDB.password")
val ARANGODB_BATCH: Int = config.getInt("arangoDB.batch")
val ARANGODB_DB_NAME:String= config.getString("arangoDB.DB.name")
val ARANGODB_TTL: Int = config.getInt("arangoDB.ttl")
val CLICKHOUSE_SOCKET_TIMEOUT: Int = config.getInt("clickhouse.socket.timeout")
val THREAD_POOL_NUMBER: Int = config.getInt("thread.pool.number")
}

View File

@@ -1,5 +0,0 @@
package cn.ac.iie.utils
object DateTimeUtils {
}

View File

@@ -1,24 +0,0 @@
package cn.ac.iie.utils
import java.util
import com.arangodb.ArangoDB
import com.arangodb.model.AqlQueryOptions
import com.arangodb.util.MapBuilder
object InitArangoDBPool {
@transient
lazy val arangoDB: ArangoDB = new ArangoDB.Builder()
.maxConnections(ConfigUtils.MAXPOOLSIZE)
.host(ConfigUtils.ARANGODB_HOST, ConfigUtils.ARANGODB_PORT)
.user(ConfigUtils.ARANGODB_USER)
.password(ConfigUtils.ARANGODB_PASSWORD)
.build
val bindVars: util.Map[String, AnyRef] = new MapBuilder().get
val options: AqlQueryOptions = new AqlQueryOptions()
.ttl(ConfigUtils.ARANGODB_TTL)
}

View File

@@ -1,32 +0,0 @@
package cn.ac.iie;
import com.tinkerpop.blueprints.impls.orient.OrientGraph;
public class CreateObjectJavaTest {
public static void main(String[] args) {
String orientDbProtocol = "remote";
String orientDbHost = "192.168.40.127";
String orientDbDatabase= "iplearning-insert";
String orientDbUsername= "root";
String orientDbPassword= "111111";
// String orientDbUri = "${orientDbProtocol}:${orientDbHost}/${orientDbDatabase}";
String orientDbUri = orientDbProtocol+":"+orientDbHost+"/"+orientDbDatabase;
OrientGraph dblpOrientDbGraph = new OrientGraph(orientDbUri, orientDbUsername, orientDbPassword);
// val orientDB: OrientGraph = new OrientGraph(orientDbUri,orientDbUsername, orientDbPassword)
try {
System.out.println((dblpOrientDbGraph.isAutoStartTx()));
dblpOrientDbGraph.dropEdgeType("author_of ");
dblpOrientDbGraph.commit();
}catch (Exception ex){
//catching errors & print out
System.out.println(ex.getMessage());
}
finally {
//close the current OrientDb's connection
dblpOrientDbGraph.shutdown();
}
}
}

View File

@@ -1,157 +0,0 @@
package cn.ac.iie
import java.lang
import com.orientechnologies.orient.core.command.OCommandRequest
import com.orientechnologies.orient.core.config.OGlobalConfiguration
import com.orientechnologies.orient.core.db._
import com.orientechnologies.orient.core.metadata.schema.{OClass, OType}
import com.orientechnologies.orient.core.sql.OCommandSQL
import com.tinkerpop.blueprints.Vertex
import com.tinkerpop.blueprints.impls.orient.{OrientDynaElementIterable, OrientGraph, OrientGraphFactory}
object CreateObjectTest {
def main(args: Array[String]): Unit = {
/*
val orientDbProtocol: String = "remote"
val orientDbHost: String = "192.168.40.127"
val orientDbDatabase: String = "iplearning-insert"
val orientDbUsername: String = "root"
val orientDbPassword: String = "111111"
val orientDbUri: String = s"${orientDbProtocol}:${orientDbHost}/${orientDbDatabase}"
val orientDB: OrientGraph = new OrientGraph(orientDbUri, orientDbUsername, orientDbPassword)
try {
println("---------------")
println(orientDB.isAutoStartTx)
orientDB.dropEdgeType("author-paper ")
orientDB.commit()
}catch {
//catching errors & print out
case ex: Exception => println(ex.getMessage)
}
finally {
//close the current OrientDb's connection
orientDB.shutdown()
}
*/
/*
val poolCfg = OrientDBConfig.builder
poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MIN, 5)
poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MAX, 10)
val pool: ODatabasePool = new ODatabasePool(orientDB, "iplearning-insert", "root", "111111", poolCfg.build)
val session: ODatabaseSession = pool.acquire()
println(session.isValidationEnabled)
session.close()
*/
/*
val info = new Properties()
info.put("user", "root")
info.put("password", "111111")
info.put("db.usePool", "true"); // USE THE POOL
info.put("db.pool.min", "3"); // MINIMUM POOL SIZE
val conn: OrientJdbcConnection = DriverManager.getConnection("jdbc:orient:remote:192.168.40.127/iplearing-index", info).asInstanceOf[OrientJdbcConnection]
println(conn.isValid(1))
*/
val uri: String = "remote:192.168.40.127/iplearning-insert"
val pool = new OPartitionedDatabasePool(uri,"root","111111",20,20)
val factory: OrientGraphFactory = new OrientGraphFactory(uri,"root","111111",pool)
// val graph: OrientGraph = factory.getTx()
val graph = factory.getNoTx
/*
val v_FQDN = graph.createVertexType("v_FQDN")
v_FQDN.createProperty("FQDN_NAME", OType.STRING)
v_FQDN.createProperty("FIRST_FOUND_TIME", OType.LONG)
v_FQDN.createProperty("LAST_FOUND_TIME", OType.LONG)
v_FQDN.createProperty("FQDN_APPEAR_COUNT", OType.LONG)
val v_IP = graph.createVertexType("v_IP")
v_IP.createProperty("IP", OType.STRING)
v_IP.createProperty("FIRST_FOUND_TIME", OType.LONG)
v_IP.createProperty("LAST_FOUND_TIME", OType.LONG)
v_IP.createProperty("IP_APPEAR_COUNT", OType.LONG)
val e_FQDN_to_IP = graph.createEdgeType("E_ADDRESS_V_FQDN_TO_V_IP")
e_FQDN_to_IP.createProperty("FIRST_FOUND_TIME", OType.LONG)
e_FQDN_to_IP.createProperty("LAST_FOUND_TIME", OType.LONG)
e_FQDN_to_IP.createProperty("COUNT_TOTAL", OType.LONG)
val e_IP_to_FQDN = graph.createEdgeType("E_VISIT_V_IP_TO_V_FQDN")
e_IP_to_FQDN.createProperty("FIRST_FOUND_TIME", OType.LONG)
e_IP_to_FQDN.createProperty("LAST_FOUND_TIME", OType.LONG)
e_IP_to_FQDN.createProperty("COUNT_TOTAL", OType.LONG)
*/
/*
val orientDB = new OrientDB("remote:192.168.40.127", "root", "111111", OrientDBConfig.defaultConfig)
val poolCfg = OrientDBConfig.builder
poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MIN, 5)
poolCfg.addConfig(OGlobalConfiguration.DB_POOL_MAX, 10)
val pool: ODatabasePool = new ODatabasePool(orientDB, "iplearning-insert", "root", "111111", poolCfg.build)
val session: ODatabaseSession = pool.acquire()
println(session.isClosed)
*/
/*
val vertex = graph.addVertex("class:v_FQDN",Nil: _*)
vertex.setProperties("FQDN_NAME","123com1234")
vertex.setProperty("FIRST_FOUND_TIME", 1571241623)
vertex.setProperty("LAST_FOUND_TIME", 1571241570)
vertex.setProperty("FQDN_APPEAR_COUNT", 5)
val v_IP = graph.addVertex("class:v_IP",Nil: _*)
v_IP.setProperty("IP","3.3.3.3")
v_IP.setProperty("IP_LOCATION","cn")
v_IP.setProperty("FIRST_FOUND_TIME", 1571241623)
v_IP.setProperty("LAST_FOUND_TIME", 1571241570)
v_IP.setProperty("IP_APPEAR_COUNT", 6543)
graph.addEdge(null,vertex,v_IP,"E_ADDRESS_V_FQDN_TO_V_IP")
*/
val sqlComm = new OCommandSQL(
"UPDATE E_ADDRESS_V_FQDN_TO_V_IP SET LAST_FOUND_TIME = 1412,FQDN_APPEAR_COUNT = 1244"
+ "UPSERT WHERE 'v5-dy.ixigua.com' IN FQDN_NAME")
graph.command(sqlComm).execute()
import scala.collection.JavaConversions._
/*
if (graph.getVertices("v_FQDN.FQDN_NAME","v5-dy.ixigua.com111111").nonEmpty){
for (v <- graph.getVertices("v_FQDN.FQDN_NAME","v5-dy.ixigua.com1")) {
println(v == null)
}
}
val sqlComm = new OCommandSQL(
"UPDATE v_FQDN SET LAST_FOUND_TIME = 1412,FQDN_APPEAR_COUNT = 1244"
+ "WHERE 'v5-dy.ixigua.com' IN FQDN_NAME")
graph.command(sqlComm).execute()
*/
/*
println(graphFactory.exists())
val tx = graphFactory.getTx
val v_FQDN = tx.createVertexType("V_FQDN")
v_FQDN.createProperty("FQDN_NAME",OType.STRING)
v_FQDN.createProperty("FIRST_FOUND_TIME",OType.LONG)
v_FQDN.createProperty("LAST_FOUND_TIME",OType.LONG)
*/
}
}

View File

@@ -1,78 +0,0 @@
package cn.ac.iie
import org.apache.spark.sql.{DataFrame, SparkSession}
object HiveUnionTest {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("test")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.network.timeout", "300s")
.config("spark.sql.shuffle.partitions", 50)
.master("local[*]")
/*
.config("spark.executor.memory", "30g")
.config("spark.driver.host", "192.168.41.79")
.config("spark.jars", "D:\\GITREPO\\ip-learning\\target\\ip-learning-1.0-SNAPSHOT-jar-with-dependencies.jar")
.master("spark://192.168.40.119:7077")
*/
.getOrCreate()
val sql =
"""
|(SELECT s1_s_ip AS IP, s1_s_location_region AS location,recv_time FROM av_miner.media_expire_patch limit 100
|UNION ALL
|SELECT s1_d_ip AS IP, s1_d_location_region AS location, recv_time FROM av_miner.media_expire_patch limit 100)
""".stripMargin
val mediaDataFrame: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:clickhouse://192.168.40.193:8123")
.option("dbtable","(select * from av_miner.media_expire_patch limit 100) as media_expire_patch")
// .option("dbtable", "av_miner.media_expire_patch")
// .option("dbtable", sql + " as a")
.option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.option("user", "default")
.option("password", "111111")
.option("numPartitions", "40")
.option("fetchsize", "1000000")
.load()
mediaDataFrame.printSchema()
mediaDataFrame.createOrReplaceGlobalTempView("media_expire_patch")
val frame = spark.sql(
"""
select IP,location,MIN(recv_time) AS FIRST_FOUND_TIME, MAX(recv_time) AS LAST_FOUND_TIME, COUNT(*) AS COUNT_TOTAL from (
(SELECT s1_s_ip AS IP, s1_s_location_region AS location,recv_time FROM global_temp.media_expire_patch limit 100)
UNION ALL
(SELECT s1_d_ip AS IP, s1_d_location_region AS location, recv_time FROM global_temp.media_expire_patch limit 100)
) group by IP,location
""".stripMargin)
val e_Address_v_FQDN_to_v_IP_DF = spark.sql(
"""
|SELECT
| media_domain AS V_FQDN,
| media_type,
| s1_d_ip AS V_IP,
| MIN( recv_time ) AS FIRST_FOUND_TIME,
| MAX( recv_time ) AS LAST_FOUND_TIME,
| COUNT( * ) AS COUNT_TOTAL
|FROM
| global_temp.media_expire_patch
|WHERE
| ( media_domain != '' )
| AND ( s1_d_ip != '' )
|GROUP BY
| s1_d_ip,
| media_domain,
| media_type
""".stripMargin)
e_Address_v_FQDN_to_v_IP_DF.printSchema()
e_Address_v_FQDN_to_v_IP_DF.show(200)
// mediaDataFrame.show(20)
}
}

View File

@@ -1,11 +0,0 @@
package cn.ac.iie
object TestMap {
def main(args: Array[String]): Unit = {
var mapTest: Map[String, Int] = Map[String,Int]()
mapTest += ("1" -> 1)
println(mapTest.size)
}
}