diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
index b3c0c15..1206494 100644
--- a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
@@ -83,8 +83,7 @@ public class ArangoDBConnect {
                 collection.replaceDocuments(docUpdate);
             }
         } catch (Exception e) {
-            System.out.println("更新失败");
-            e.printStackTrace();
+            LOG.error("update failed: " + e.toString());
         } finally {
             docInsert.clear();
             docUpdate.clear();
@@ -102,11 +101,11 @@ public class ArangoDBConnect {
                 MultiDocumentEntity<DocumentCreateEntity> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions);
                 Collection<ErrorEntity> errors = documentCreateEntityMultiDocumentEntity.getErrors();
                 for (ErrorEntity errorEntity : errors) {
-                    LOG.warn("写入arangoDB异常:" + errorEntity.getErrorMessage());
+                    LOG.warn("write to arangoDB failed: " + errorEntity.getErrorMessage());
                 }
             }
         } catch (Exception e) {
-            LOG.error("更新失败:" + e.toString());
+            LOG.error("update failed: " + e.toString());
         } finally {
             docOverwrite.clear();
         }
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseArangoData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseArangoData.scala
index 8a38062..7342fa0 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseArangoData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseArangoData.scala
@@ -15,7 +15,7 @@ object BaseArangoData {

   def loadArangoRdd[T: ClassTag](name:String): ArangoRdd[T] ={
     val value = ArangoSpark.load[T](sparkContext, name, options)
-    LOG.warn(s"读取$name arangoDb:${value.count()}")
+    LOG.warn(s"read $name from arangoDb: ${value.count()}")
     value
   }

diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
index b8e91f7..00f71df 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -1,13 +1,13 @@
 package cn.ac.iie.dao

-import java.util.Date
-
 import cn.ac.iie.config.ApplicationConfig
 import cn.ac.iie.utils.SparkSessionUtil.spark
 import com.zdjizhi.utils.DateUtils
 import org.apache.spark.sql.DataFrame
 import org.slf4j.LoggerFactory

+import java.util.Date
+
 object BaseClickhouseData {

   private val LOG = LoggerFactory.getLogger(BaseClickhouseData.getClass)
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
index bbce2cf..958736b 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
@@ -50,7 +50,8 @@ object MergeDataFrame {

   def mergeRelationFqdnLocateIp(): RDD[(String, (Option[BaseEdgeDocument], Row))] = {
     val frame = BaseClickhouseData.getRelationFqdnLocateIpDf
-      .repartition().filter(row => isDomain(row.getAs[String]("FQDN")))
+      .repartition()
+      .filter(row => isDomain(row.getAs[String]("FQDN")))
       .groupBy("FQDN", "server_ip", "VSYS_ID")
       .agg(
         min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
@@ -59,13 +60,21 @@ object MergeDataFrame {
         collect_list("decoded_as_list").alias("decoded_as_list"),
         collect_set("DIST_CIP_RECENT").alias("DIST_CIP_RECENT")
       )
-    val fqdnLocIpRddRow = frame.rdd.map(row => {
-      val fqdn = row.getAs[String]("FQDN")
-      val serverIp = row.getAs[String]("server_ip")
-      val vsysId = row.getAs[Integer]("VSYS_ID").toLong
-      val key = fqdn.concat("-" + serverIp + "-" + vsysId)
-      (key, row)
-    }) /*.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS))*/
+
+    val fqdnLocIpRddRow = frame.rdd
+      .filter(row => {
+        // skip rows whose server_ip or VSYS_ID is null
+        val serverIp = row.getAs[String]("server_ip")
+        val vsysId = row.getAs[Integer]("VSYS_ID")
+        serverIp != null && vsysId != null
+      })
+      .map(row => {
+        val fqdn = row.getAs[String]("FQDN")
+        val serverIp = row.getAs[String]("server_ip")
+        val vsysId = row.getAs[Integer]("VSYS_ID").toLong
+        val key = fqdn.concat("-" + serverIp + "-" + vsysId)
+        (key, row)
+      })

     val fqdnLocIpRddDoc = BaseArangoData.loadArangoRdd[BaseEdgeDocument]("R_LOCATE_FQDN2IP")
     fqdnLocIpRddDoc.map(doc => (doc.getKey, doc)).rightOuterJoin(fqdnLocIpRddRow)
@@ -133,7 +142,7 @@ object MergeDataFrame {
       }
     } catch {
       case e: Exception =>
-        LOG.error("解析域名 " + fqdn + " 失败:\n" + e.toString)
+        LOG.error("failed to parse domain " + fqdn + ":\n" + e.toString)
     }
     false
   }
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index 96b3dcd..82ff420 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -78,20 +78,20 @@ object UpdateDocument {
           i += 1
           if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
             arangoManger.overwrite(resultDocumentList, collName)
-            LOG.warn(s"更新:$collName" + i)
+            LOG.warn(s"update $collName: " + i)
             i = 0
           }
         })
         if (i != 0) {
           arangoManger.overwrite(resultDocumentList, collName)
-          LOG.warn(s"更新$collName:" + i)
+          LOG.warn(s"update $collName: " + i)
         }
       })

-      LOG.warn(s"更新$collName 条数:${fqdnAccmu.value}")
+      LOG.warn(s"update $collName rows: ${fqdnAccmu.value}")
       val last = System.currentTimeMillis()
-      LOG.warn(s"更新$collName 时间:${last - start}")
+      LOG.warn(s"update $collName time: ${last - start} ms")
     } catch {
       case e: Exception => e.printStackTrace()
     }
   }
@@ -107,13 +107,13 @@ object UpdateDocument {
         i += 1
         if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
           arangoManger.overwrite(resultDocumentList, "IP")
-          LOG.warn(s"更新:IP" + i)
+          LOG.warn(s"update IP: " + i)
           i = 0
         }
       })
       if (i != 0) {
         arangoManger.overwrite(resultDocumentList, "IP")
-        LOG.warn(s"更新IP:" + i)
+        LOG.warn(s"update IP: " + i)
       }
     })
   }
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala
index cce388e..65737c6 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/utils/SparkSessionUtil.scala
@@ -29,7 +29,7 @@ object SparkSessionUtil {
       .config("arangodb.password", ApplicationConfig.ARANGODB_PASSWORD)
       .master(ApplicationConfig.MASTER)
       .getOrCreate()
-    LOG.warn("sparkession获取成功!!!")
+    LOG.warn("spark session started successfully")
     spark
   }
 }
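Review note: the new filter in mergeRelationFqdnLocateIp guards against an unboxing NPE, not just malformed keys. row.getAs[Integer]("VSYS_ID") returns null for a null column, and only the later .toLong throws. A minimal sketch of the failure mode the guard prevents; the Row is built by hand and the object name is hypothetical, not part of this patch:

```scala
import org.apache.spark.sql.Row

object NullVsysSketch {
  def main(args: Array[String]): Unit = {
    // Hand-built row standing in for one ClickHouse result: VSYS_ID is null.
    val row = Row("example.com", "10.0.0.1", null)
    val vsysRaw = row.getAs[Integer](2) // returns null; nothing throws yet
    if (vsysRaw != null) {
      // Safe path, as in the patched .map: unbox only after the null check.
      val key = "example.com".concat("-" + "10.0.0.1" + "-" + vsysRaw.toLong)
      println(key)
    } else {
      // Without the patch's .filter, vsysRaw.toLong would throw a
      // NullPointerException while unboxing java.lang.Integer.
      println("row skipped: VSYS_ID is null")
    }
  }
}
```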
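The UpdateDocument hunks keep the existing flush-at-batch-size pattern: append to resultDocumentList, flush every ApplicationConfig.UPDATE_ARANGO_BATCH documents, and flush the remainder after the loop (the overwrite in ArangoDBConnect clears the passed list in its finally block). A generic sketch of that pattern, with flush, BATCH, and the toy data as illustrative stand-ins:

```scala
import scala.collection.mutable.ArrayBuffer

object BatchFlushSketch {
  val BATCH = 3 // stand-in for ApplicationConfig.UPDATE_ARANGO_BATCH

  // Stand-in for arangoManger.overwrite; like it, clears the buffer afterwards.
  def flush(buf: ArrayBuffer[String]): Unit = {
    println(s"flushed ${buf.size} docs")
    buf.clear()
  }

  def main(args: Array[String]): Unit = {
    val buf = ArrayBuffer.empty[String]
    var i = 0
    for (doc <- Seq("a", "b", "c", "d", "e")) {
      buf += doc
      i += 1
      if (i >= BATCH) { flush(buf); i = 0 } // full batch: 3 docs
    }
    if (i != 0) flush(buf) // trailing partial batch: 2 docs
  }
}
```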
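For context on the join that consumes fqdnLocIpRddRow: rightOuterJoin keeps every ClickHouse row and wraps the ArangoDB side in Option, so None marks keys with no existing R_LOCATE_FQDN2IP document (the insert path) and Some(doc) marks updates. A self-contained sketch with toy string pairs standing in for documents and rows:

```scala
import org.apache.spark.sql.SparkSession

object RightOuterJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext
    val existingDocs = sc.parallelize(Seq(("k1", "doc1")))              // ArangoDB side
    val freshRows = sc.parallelize(Seq(("k1", "row1"), ("k2", "row2"))) // ClickHouse side
    // Prints (k1,(Some(doc1),row1)) -> update and (k2,(None,row2)) -> insert.
    existingDocs.rightOuterJoin(freshRows).collect().foreach(println)
    spark.stop()
  }
}
```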