Spark version of the YSP analysis

wanglihui
2020-08-11 15:18:45 +08:00
parent ad6582893b
commit 55879a2c32
7 changed files with 75 additions and 78 deletions

View File

@@ -64,16 +64,6 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
        int i = 0;
        for (T doc : baseDocuments) {
            String key = doc.getKey();
-           switch (table) {
-               case "R_LOCATE_FQDN2IP":
-                   updateProtocolDocument(doc);
-                   deleteDistinctClientIpByTime(doc);
-                   break;
-               case "R_VISIT_IP2FQDN":
-                   updateProtocolDocument(doc);
-                   break;
-               default:
-           }
            int hashCode = Math.abs(key.hashCode()) % ApplicationConfig.THREAD_POOL_NUMBER();
            ConcurrentHashMap<String, T> tmpMap = map.get(hashCode);
            tmpMap.put(key, doc);

View File

@@ -0,0 +1,25 @@
+package cn.ac.iie.utils;
+
+public class TopDomainUtils {
+    /**
+     * General-purpose helper: given a URL, return its domain. The returned domain never
+     * contains a port number; if it still contains ':' it must be an IPv6 address.
+     * @param oriUrl the original URL
+     * @return the domain without the port
+     */
+    public static String getDomainFromUrl(String oriUrl) {
+        String url = oriUrl.split("[?]")[0];
+        url = url.replaceAll("https://", "").replaceAll("http://", "");
+        String domain;
+        if (url.split("/")[0].split(":").length <= 2) {
+            domain = url
+                    .split("/")[0]
+                    .split(":")[0];
+        } else {
+            domain = url.split("/")[0];
+        }
+        return domain;
+    }
+}
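
For reference, a quick sketch of how the new helper behaves on a few representative URLs. The sample URLs and expected results below are illustrative only and are not part of the commit:

import cn.ac.iie.utils.TopDomainUtils

object TopDomainUtilsDemo {
  def main(args: Array[String]): Unit = {
    // The query string and scheme are stripped first; the port is then dropped
    // unless the host segment has more than two ':'-separated parts (bare IPv6).
    println(TopDomainUtils.getDomainFromUrl("https://www.example.com:8443/path?a=1")) // www.example.com
    println(TopDomainUtils.getDomainFromUrl("http://10.1.2.3/index.html"))            // 10.1.2.3
    println(TopDomainUtils.getDomainFromUrl("2001:db8::1/video.ts"))                  // 2001:db8::1
  }
}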

View File

@@ -7,13 +7,14 @@ repartitionNumber=36
spark.serializer=org.apache.spark.serializer.KryoSerializer
master=local[*]
#Spark ClickHouse read configuration
-spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3
+#spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3
+spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.193:8123/tsg_galaxy_zx
spark.read.clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
spark.read.clickhouse.user=default
spark.read.clickhouse.password=111111
spark.read.clickhouse.numPartitions=144
spark.read.clickhouse.fetchsize=10000
-spark.read.clickhouse.partitionColumn=common_start_time
+spark.read.clickhouse.partitionColumn=recv_time
clickhouse.socket.timeout=300000
#ArangoDB configuration
arangoDB.host=192.168.40.182
@@ -27,9 +28,9 @@ arangoDB.ttl=3600
thread.pool.number=5
#ClickHouse read time-range mode: 0 = read the past hour, 1 = use the explicit range below
-clickhouse.time.limit.type=0
-read.clickhouse.max.time=1571245220
-read.clickhouse.min.time=1571245210
+clickhouse.time.limit.type=1
+read.clickhouse.max.time=1571241720
+read.clickhouse.min.time=1571241600
#ArangoDB read time-range mode: 0 = normal read, 1 = use an explicit time range
arango.time.limit.type=0
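
The spark.read.clickhouse.* keys above line up with Spark's standard JDBC options. How ApplicationConfig wires them in is not shown in this diff, so the sketch below is only an assumed mapping, illustrating how these keys would typically drive a partitioned JDBC read and why partitionColumn moves to recv_time along with the min/max bounds:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Assumed mapping of the properties onto a partitioned JDBC read; values mirror the config above.
def readClickhouse(spark: SparkSession, dbtable: String, minTime: Long, maxTime: Long): DataFrame =
  spark.read.format("jdbc")
    .option("url", "jdbc:clickhouse://192.168.40.193:8123/tsg_galaxy_zx") // spark.read.clickhouse.url
    .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
    .option("user", "default")
    .option("password", "111111")
    .option("dbtable", dbtable)              // e.g. "(SELECT ... WHERE recv_time >= ...) as dbtable"
    .option("partitionColumn", "recv_time")  // must be a numeric column of the new source table
    .option("lowerBound", minTime.toString)  // read.clickhouse.min.time
    .option("upperBound", maxTime.toString)  // read.clickhouse.max.time
    .option("numPartitions", "144")
    .option("fetchsize", "10000")
    .load()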

View File

@@ -2,6 +2,7 @@ package cn.ac.iie.dao
import cn.ac.iie.config.ApplicationConfig
import cn.ac.iie.utils.SparkSessionUtil.spark
+import cn.ac.iie.utils.TopDomainUtils
import org.apache.spark.sql.DataFrame
import org.slf4j.LoggerFactory
@@ -31,13 +32,13 @@ object BaseClickhouseData {
  }
  def loadConnectionDataFromCk(): Unit ={
-    val where = "common_start_time >= " + timeLimit._2 + " AND common_start_time < " + timeLimit._1
+    val where = "recv_time >= " + timeLimit._2 + " AND recv_time < " + timeLimit._1
    val sql =
      s"""
        |(SELECT
-        | ssl_sni,http_host,common_client_ip,common_server_ip,common_start_time,common_c2s_byte_num,common_s2c_byte_num,common_schema_type
+        | s1_domain,s1_referer,s1_s_ip,s1_d_ip,recv_time,media_len
        |FROM
-        | connection_record_log
+        | media_expire_patch
        |WHERE $where) as dbtable
      """.stripMargin
@@ -68,28 +69,31 @@ object BaseClickhouseData {
    initClickhouseData(sql)
  }
+
+  def getDomain(url:String): String ={
+    TopDomainUtils.getDomainFromUrl(url)
+  }
  def getVertexFqdnDf: DataFrame ={
    loadConnectionDataFromCk()
+    spark.udf.register("getDomain",TopDomainUtils.getDomainFromUrl _)
    val sql =
      """
        |SELECT
-        | FQDN,MAX( LAST_FOUND_TIME ) AS LAST_FOUND_TIME,MIN( FIRST_FOUND_TIME ) AS FIRST_FOUND_TIME
+        | FQDN,MAX(LAST_FOUND_TIME) AS LAST_FOUND_TIME,MIN(FIRST_FOUND_TIME) AS FIRST_FOUND_TIME
        |FROM
        | (
        | (SELECT
-        | ssl_sni AS FQDN,MAX( common_start_time ) AS LAST_FOUND_TIME,MIN( common_start_time ) AS FIRST_FOUND_TIME
+        | s1_domain AS FQDN,MAX(recv_time) AS LAST_FOUND_TIME,MIN(recv_time) AS FIRST_FOUND_TIME
        | FROM
        | global_temp.dbtable
-        | WHERE
-        | common_schema_type = 'SSL' GROUP BY ssl_sni
+        | GROUP BY s1_domain
        | )
        | UNION ALL
        | (SELECT
-        | http_host AS FQDN,MAX( common_start_time ) AS LAST_FOUND_TIME,MIN( common_start_time ) AS FIRST_FOUND_TIME
+        | getDomain(s1_referer) AS FQDN,MAX(recv_time) AS LAST_FOUND_TIME,MIN(recv_time) AS FIRST_FOUND_TIME
        | FROM
        | global_temp.dbtable
-        | WHERE
-        | common_schema_type = 'HTTP' GROUP BY http_host
+        | GROUP BY getDomain(s1_referer)
        | )
        | )
        |GROUP BY
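
The GROUP BY getDomain(s1_referer) above only works because the helper is registered as a Spark SQL UDF just before the query is built. A minimal, self-contained sketch of that pattern (the session setup and sample referers are illustrative, not from the commit):

import org.apache.spark.sql.SparkSession
import cn.ac.iie.utils.TopDomainUtils

object UdfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("udf-sketch").getOrCreate()
    import spark.implicits._

    // After registration, getDomain(...) can be used inside spark.sql query strings.
    spark.udf.register("getDomain", TopDomainUtils.getDomainFromUrl _)

    Seq("https://cdn.example.com:443/a.ts?x=1", "http://static.example.org/b.m3u8")
      .toDF("s1_referer")
      .createOrReplaceTempView("dbtable")

    spark.sql("SELECT getDomain(s1_referer) AS FQDN, COUNT(*) AS CNT FROM dbtable GROUP BY getDomain(s1_referer)")
      .show(false)

    spark.stop()
  }
}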
@@ -103,6 +107,11 @@ object BaseClickhouseData {
    vertexFqdnDf
  }
+
+  def main(args: Array[String]): Unit = {
+    val df = getRelationFqdnLocateIpDf
+    df.show(10)
+  }
  def getVertexIpDf: DataFrame ={
    loadConnectionDataFromCk()
    val sql =
@@ -113,11 +122,11 @@ object BaseClickhouseData {
        | (
        | (
        | SELECT
-        | common_client_ip AS IP,
-        | MIN(common_start_time) AS FIRST_FOUND_TIME,
-        | MAX(common_start_time) AS LAST_FOUND_TIME,
+        | s1_s_ip AS IP,
+        | MIN(recv_time) AS FIRST_FOUND_TIME,
+        | MAX(recv_time) AS LAST_FOUND_TIME,
        | count(*) as SESSION_COUNT,
-        | sum(common_c2s_byte_num) as BYTES_SUM,
+        | sum(media_len) as BYTES_SUM,
        | 'client' as ip_type
        | FROM
        | global_temp.dbtable
@@ -127,11 +136,11 @@ object BaseClickhouseData {
        | UNION ALL
        | (
        | SELECT
-        | common_server_ip AS IP,
-        | MIN(common_start_time) AS FIRST_FOUND_TIME,
-        | MAX(common_start_time) AS LAST_FOUND_TIME,
+        | s1_d_ip AS IP,
+        | MIN(recv_time) AS FIRST_FOUND_TIME,
+        | MAX(recv_time) AS LAST_FOUND_TIME,
        | count(*) as SESSION_COUNT,
-        | sum(common_s2c_byte_num) as BYTES_SUM,
+        | sum(media_len) as BYTES_SUM,
        | 'server' as ip_type
        | FROM
        | global_temp.dbtable
@@ -148,42 +157,23 @@ object BaseClickhouseData {
  def getRelationFqdnLocateIpDf: DataFrame ={
    loadConnectionDataFromCk()
-    val sslSql =
-      """
-        |SELECT
-        | ssl_sni AS FQDN,
-        | common_server_ip,
-        | MAX(common_start_time) AS LAST_FOUND_TIME,
-        | MIN(common_start_time) AS FIRST_FOUND_TIME,
-        | COUNT(*) AS COUNT_TOTAL,
-        | collect_set(common_client_ip) AS DIST_CIP_RECENT,
-        | 'TLS' AS schema_type
-        |FROM
-        | global_temp.dbtable
-        |WHERE
-        | common_schema_type = 'SSL'
-        |GROUP BY
-        | ssl_sni,common_server_ip
-      """.stripMargin
-    val httpSql =
+    val sql =
      """
        |SELECT
-        | http_host AS FQDN,
-        | common_server_ip,
-        | MAX(common_start_time) AS LAST_FOUND_TIME,
-        | MIN(common_start_time) AS FIRST_FOUND_TIME,
+        | s1_domain AS FQDN,
+        | s1_d_ip AS common_server_ip,
+        | MAX(recv_time) AS LAST_FOUND_TIME,
+        | MIN(recv_time) AS FIRST_FOUND_TIME,
        | COUNT(*) AS COUNT_TOTAL,
-        | collect_set(common_client_ip) AS DIST_CIP_RECENT,
-        | 'HTTP' AS schema_type
+        | collect_set(s1_s_ip) AS DIST_CIP_RECENT
        |FROM
        | global_temp.dbtable
        |WHERE
-        | common_schema_type = 'HTTP'
+        | s1_domain != ''
        |GROUP BY
-        | http_host,common_server_ip
+        | s1_domain,s1_d_ip
      """.stripMargin
-    val sql = s"SELECT * FROM (($sslSql) UNION ALL ($httpSql)) WHERE FQDN != ''"
    LOG.warn(sql)
    val relationFqdnLocateIpDf = spark.sql(sql)

View File

@@ -35,16 +35,9 @@ object MergeDataFrame {
  }
  def mergeRelationFqdnLocateIp(): RDD[Row] ={
-    val frame = BaseClickhouseData.getRelationFqdnLocateIpDf.filter(row => isDomain(row.getAs[String]("FQDN")))
-      .groupBy("FQDN", "common_server_ip")
-      .agg(
-        min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
-        max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
-        collect_list("COUNT_TOTAL").alias("COUNT_TOTAL_LIST"),
-        collect_list("schema_type").alias("schema_type_list"),
-        collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT")
-      )
-    frame.rdd.map(row => {
+    BaseClickhouseData.getRelationFqdnLocateIpDf
+      .filter(row => isDomain(row.getAs[String]("FQDN")))
+      .rdd.map(row => {
      val fqdn = row.getAs[String]("FQDN")
      val serverIp = row.getAs[String]("common_server_ip")
      val key = fqdn.concat("-"+serverIp)

View File

@@ -93,8 +93,8 @@ object UpdateDocHandler {
    doc.addAttribute("PROTOCOL_TYPE",protocolTypeBuilder.toString().replaceFirst(",",""))
  }
-  def mergeDistinctIp(distCipRecent:ofRef[ofRef[String]]): Array[String] ={
-    distCipRecent.flatten.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
+  def mergeDistinctIp(distCipRecent:ofRef[String]): Array[String] ={
+    distCipRecent.distinct.take(ApplicationConfig.DISTINCT_CLIENT_IP_NUM).toArray
  }
  def putDistinctIp(doc:BaseEdgeDocument,newDistinctIp:Array[String]): Unit ={
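
Since getRelationFqdnLocateIpDf now builds DIST_CIP_RECENT from a single collect_set(s1_s_ip) instead of a collect_set of per-protocol arrays, mergeDistinctIp receives a flat array and the flatten step goes away. A tiny illustration of the new input shape (the sample IPs and the cap of 2 are made up; the real cap is ApplicationConfig.DISTINCT_CLIENT_IP_NUM):

import scala.collection.mutable.WrappedArray.ofRef

object DistCipShapeDemo {
  def main(args: Array[String]): Unit = {
    // One flat array of client IPs per (FQDN, server IP) row, as collect_set(s1_s_ip) returns it.
    val distCipRecent: ofRef[String] = new ofRef(Array("10.0.0.1", "10.0.0.2", "10.0.0.1"))
    // Deduplicate and cap, exactly as the updated mergeDistinctIp does (cap of 2 is illustrative).
    val merged: Array[String] = distCipRecent.distinct.take(2).toArray
    println(merged.mkString(","))  // 10.0.0.1,10.0.0.2
  }
}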

View File

@@ -131,18 +131,16 @@ object UpdateDocument {
      val serverIp = row.getAs[String]("common_server_ip")
      val firstFoundTime = row.getAs[Long]("FIRST_FOUND_TIME")
      val lastFoundTime = row.getAs[Long]("LAST_FOUND_TIME")
-      val countTotalList = row.getAs[ofRef[AnyRef]]("COUNT_TOTAL_LIST")
-      val schemaTypeList = row.getAs[ofRef[AnyRef]]("schema_type_list")
-      val distCipRecent = row.getAs[ofRef[ofRef[String]]]("DIST_CIP_RECENT")
-      val sepAttritubeMap: Map[String, Long] = separateAttributeByProtocol(schemaTypeList, countTotalList)
+      val countTotal = row.getAs[Long]("COUNT_TOTAL")
+      val distCipRecent = row.getAs[ofRef[String]]("DIST_CIP_RECENT")
      val distinctIp: Array[String] = mergeDistinctIp(distCipRecent)
      val key = fqdn.concat("-" + serverIp)
      var document = dictionaryMap.getOrDefault(key, null)
      if (document != null) {
        updateMaxAttribute(document, lastFoundTime, "LAST_FOUND_TIME")
-        updateProtocolAttritube(document, sepAttritubeMap)
+        updateSumAttribute(document,countTotal,"CNT_TOTAL")
        updateDistinctIp(document, distinctIp)
      } else {
        document = new BaseEdgeDocument()
@@ -151,7 +149,7 @@ object UpdateDocument {
        document.setTo("IP/" + serverIp)
        document.addAttribute("FIRST_FOUND_TIME", firstFoundTime)
        document.addAttribute("LAST_FOUND_TIME", lastFoundTime)
-        putProtocolAttritube(document, sepAttritubeMap)
+        document.addAttribute("CNT_TOTAL",countTotal)
        putDistinctIp(document, distinctIp)
      }
      document
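
updateSumAttribute itself is not included in this diff. Under the assumption that it simply accumulates a numeric attribute on the ArangoDB edge document, it would look roughly like the sketch below; the method body and null handling are assumptions, only the call sites above come from the commit:

import com.arangodb.entity.BaseEdgeDocument

// Assumed shape: add this batch's COUNT_TOTAL onto whatever CNT_TOTAL the document already stores.
def updateSumAttribute(document: BaseEdgeDocument, value: Long, attrName: String): Unit = {
  val current = Option(document.getAttribute(attrName)) match {
    case Some(n: Number) => n.longValue()
    case _               => 0L
  }
  document.addAttribute(attrName, current + value)
}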