diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
index 1206494..3931078 100644
--- a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
@@ -83,7 +83,7 @@ public class ArangoDBConnect {
                 collection.replaceDocuments(docUpdate);
             }
         } catch (Exception e) {
-            LOG.error("update failure:" + e.toString());
+            LOG.error("update failure: " + e.toString());
         } finally {
             docInsert.clear();
-            docInsert.clear();
+            docUpdate.clear();
@@ -101,11 +101,11 @@ public class ArangoDBConnect {
             MultiDocumentEntity<DocumentCreateEntity> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions);
             Collection<ErrorEntity> errors = documentCreateEntityMultiDocumentEntity.getErrors();
             for (ErrorEntity errorEntity : errors) {
-                LOG.warn("write arangoDB error:" + errorEntity.getErrorMessage());
+                LOG.warn("write arangoDB error: " + errorEntity.getErrorMessage());
             }
         }
     } catch (Exception e) {
-        LOG.error("update failure:" + e.toString());
+        LOG.error("update failure: " + e.toString());
     } finally {
         docOverwrite.clear();
     }
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
index 00f71df..51ac9e6 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -99,13 +99,13 @@ object BaseClickhouseData {
     val sql =
       s"""
          |(SELECT * FROM
-         |((SELECT ssl_sni AS FQDN,server_ip,MAX(recv_time) AS LAST_FOUND_TIME,MIN(recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
-         |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(client_ip)) AS DIST_CIP_RECENT,'TLS' AS decoded_as_list, vsys_id AS VSYS_ID
+         |((SELECT ssl_sni AS FQDN,server_ip AS destination_ip,MAX(recv_time) AS LAST_FOUND_TIME,MIN(recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+         |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(source_ip)) AS DIST_CIP_RECENT,'TLS' AS decoded_as_list, vsys_id AS VSYS_ID
          |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
          |WHERE $where and decoded_as = 'SSL' GROUP BY ssl_sni,server_ip,vsys_id)
          |UNION ALL
-         |(SELECT http_host AS FQDN,server_ip,MAX(recv_time) AS LAST_FOUND_TIME,MIN(recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
-         |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(client_ip)) AS DIST_CIP_RECENT,'HTTP' AS decoded_as_list,vsys_id AS VSYS_ID
+         |(SELECT http_host AS FQDN,server_ip AS destination_ip,MAX(recv_time) AS LAST_FOUND_TIME,MIN(recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+         |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(source_ip)) AS DIST_CIP_RECENT,'HTTP' AS decoded_as_list,vsys_id AS VSYS_ID
          |FROM ${ApplicationConfig.SPARK_READ_CLICKHOUSE_SESSION_TABLE}
          |WHERE $where and decoded_as = 'HTTP' GROUP BY http_host,server_ip,vsys_id))
          |WHERE FQDN != '') as dbtable
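The query above is wrapped as `(...) as dbtable` so Spark's JDBC source can push it down as a subquery. A minimal sketch of how such a subquery is typically consumed, assuming a ClickHouse JDBC endpoint; the helper name, URL, and driver class below are illustrative assumptions, not this repository's configuration:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical consumer of the "(...) as dbtable" subquery built above.
// The url and driver values are assumptions for illustration only.
def readClickhouseSubquery(spark: SparkSession, subquery: String): DataFrame =
  spark.read
    .format("jdbc")
    .option("url", "jdbc:clickhouse://127.0.0.1:8123/default") // assumed endpoint
    .option("driver", "ru.yandex.clickhouse.ClickHouseDriver") // assumed driver class
    .option("dbtable", subquery)                               // e.g. the sql string above
    .load()
```

Note that the outer SELECT now exposes `destination_ip` rather than `server_ip`, so any consumer of this query must read the new column name.
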
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
index 958736b..47623d0 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
@@ -50,8 +50,10 @@ object MergeDataFrame {
   def mergeRelationFqdnLocateIp():
       RDD[(String, (Option[BaseEdgeDocument], Row))] = {
     val frame = BaseClickhouseData.getRelationFqdnLocateIpDf
-      .repartition()
-      .filter(row => isDomain(row.getAs[String]("FQDN")))
+      .repartition(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)
+      .filter(row => isDomain(row.getAs[String]("FQDN")) &&
+        row.getAs[String]("server_ip") != null &&
+        row.getAs[Integer]("VSYS_ID") != null)
       .groupBy("FQDN", "server_ip", "VSYS_ID")
       .agg(
         min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
@@ -61,14 +63,7 @@ object MergeDataFrame {
         collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT")
       )
 
-    val fqdnLocIpRddRow = frame.rdd
-      .filter(row => {
-        // check that server_ip and VSYS_ID are not null
-        val serverIp = row.getAs[String]("server_ip")
-        val vsysId = row.getAs[Integer]("VSYS_ID")
-        serverIp != null && vsysId != null
-      })
-      .map(row => {
+    val fqdnLocIpRddRow = frame.rdd.map(row => {
       val fqdn = row.getAs[String]("FQDN")
       val serverIp = row.getAs[String]("server_ip")
       val vsysId = row.getAs[Integer]("VSYS_ID").toLong
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index 82ff420..5319812 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -72,7 +72,6 @@ object UpdateDocument {
         val document: T = getDocumentRow(row)
         if (document != null) {
           fqdnAccmu.add(1)
-//          println(document)
           resultDocumentList.add(document)
         }
         i += 1
@@ -91,7 +90,7 @@ object UpdateDocument {
       LOG.warn(s"update $collName rows: ${fqdnAccmu.value}")
 
       val last = System.currentTimeMillis()
-      LOG.warn(s"update $collName time: ${last - start}")
+      LOG.warn(s"update $collName time: ${last - start}ms")
     } catch {
       case e: Exception => e.printStackTrace()
     }
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
index b498df8..1c065d4 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/spark/rdd/ArangoRdd.scala
@@ -49,10 +49,10 @@ class ArangoRdd[T: ClassTag](@transient override val sparkContext: SparkContext,
       val separate = split.separate
       val collection = options.collection
       val sql = s"FOR doc IN $collection limit $offset,$separate RETURN doc"
-      LOG.info(sql)
+      LOG.info(s"Executing query: $sql")
       arangoCursor = arangoDB.db(options.database).query(sql,bindVars,queryOptions,clazz.runtimeClass.asInstanceOf[Class[T]])
     }catch {
-      case e: Exception => LOG.error(s"创建Cursor异常:${e.getMessage}")
+      case e: Exception => LOG.error(s"create Cursor error: ${e.getMessage}")
     }finally {
       arangoDB.shutdown()
     }
@@ -81,7 +81,7 @@ class ArangoRdd[T: ClassTag](@transient override val sparkContext: SparkContext,
         cnt = ApplicationConfig.ARANGODB_TOTAL_NUM
       }
     } catch {
-      case e: Exception => LOG.error(sql + s"执行异常:${e.getMessage}")
+      case e: Exception => LOG.error(s"$sql execute error: ${e.getMessage}")
     }finally {
       arangoDB.shutdown()
    }
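The cursor query pages through the collection with `limit $offset,$separate`, one page per Spark partition. A minimal sketch of how such offset/size pairs could be planned from a total document count, assuming a fixed page size; `ArangoPage` and `planPages` are hypothetical names, not this repository's actual split logic:

```scala
// Illustrative partition planning for AQL "limit offset,count" paging.
// Names are hypothetical; the repo's real split computation is not shown here.
case class ArangoPage(offset: Long, separate: Long)

def planPages(totalDocs: Long, pageSize: Long): Seq[ArangoPage] =
  (0L until totalDocs by pageSize).map { off =>
    // Last page may be shorter than pageSize.
    ArangoPage(off, math.min(pageSize, totalDocs - off))
  }

// e.g. planPages(ApplicationConfig.ARANGODB_TOTAL_NUM, 10000)
// yields one (offset, separate) pair per RDD partition.
```

One caveat with this style of paging: a plain `LIMIT offset, count` in AQL still walks past the skipped documents, so the page size trades the number of cursors against per-query scan cost.
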
s"${ApplicationConfig.ARANGODB_HOST}:${ApplicationConfig.ARANGODB_PORT}") .config("arangodb.user", ApplicationConfig.ARANGODB_USER) .config("arangodb.password", ApplicationConfig.ARANGODB_PASSWORD) + .config("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", Integer.MAX_VALUE) .master(ApplicationConfig.MASTER) .getOrCreate() LOG.warn("spark session start success!!!")