diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
index 9b1f6c2..2b6fcef 100644
--- a/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/service/read/ReadHistoryArangoData.java
@@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch;
  * @author wlh
  * 多线程全量读取arangoDb历史数据,封装到map
  */
+@SuppressWarnings("unchecked")
 public class ReadHistoryArangoData extends Thread {
     public static long currentHour = System.currentTimeMillis() / (60 * 60 * 1000) * 60 * 60;
     private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class);
@@ -92,7 +93,7 @@ public class ReadHistoryArangoData extends Thread {
         for (String protocol : PROTOCOL_SET) {
             String protocolRecent = protocol + "_CNT_RECENT";
             ArrayList cntRecent = (ArrayList) doc.getAttribute(protocolRecent);
-            Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]);
+            Long[] cntRecentsSrc = cntRecent.toArray(new Long[0]);
             Long[] cntRecentsDst = new Long[RECENT_COUNT_HOUR];
             System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1);
             cntRecentsDst[0] = 0L;
@@ -104,6 +105,11 @@ public class ReadHistoryArangoData extends Thread {
     private void deleteDistinctClientIpByTime(T doc) {
         ArrayList distCip = (ArrayList) doc.getAttribute("DIST_CIP");
         ArrayList distCipTs = (ArrayList) doc.getAttribute("DIST_CIP_TS");
+        if (distCip == null || distCip.isEmpty()){
+            doc.updateAttribute("DIST_CIP", new String[0]);
+            doc.updateAttribute("DIST_CIP_TS", new long[0]);
+            return;
+        }
         distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600);
         Collections.sort(distCipTs);
         int index = distCipTs.indexOf(currentHour - RECENT_COUNT_HOUR * 3600);
diff --git a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
index d5fb1b8..f6d3c5f 100644
--- a/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
+++ b/ip-learning-spark/src/main/java/cn/ac/iie/utils/ArangoDBConnect.java
@@ -70,26 +70,6 @@ public class ArangoDBConnect {
         }
     }
 
-    @Deprecated
-    public void insertAndUpdate(ArrayList docInsert,ArrayList docUpdate,String collectionName){
-        ArangoDatabase database = getDatabase();
-        try {
-            ArangoCollection collection = database.collection(collectionName);
-            if (!docInsert.isEmpty()){
-                collection.importDocuments(docInsert);
-            }
-            if (!docUpdate.isEmpty()){
-                collection.replaceDocuments(docUpdate);
-            }
-        }catch (Exception e){
-            System.out.println("更新失败");
-            e.printStackTrace();
-        }finally {
-            docInsert.clear();
-            docInsert.clear();
-        }
-    }
-
     public void overwrite(ArrayList docOverwrite,String collectionName){
         ArangoDatabase database = getDatabase();
         try {
@@ -101,11 +81,11 @@ public class ArangoDBConnect {
             MultiDocumentEntity> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions);
             Collection errors = documentCreateEntityMultiDocumentEntity.getErrors();
             for (ErrorEntity errorEntity:errors){
-                LOG.warn("写入arangoDB异常:"+errorEntity.getErrorMessage());
+                LOG.debug("写入arangoDB异常:"+errorEntity.getErrorMessage());
             }
            }
        }catch (Exception e){
-            System.out.println("更新失败:"+e.toString());
+            LOG.error("更新arangoDB失败:"+e.toString());
        }finally {
            docOverwrite.clear();
        }
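
Note on the ReadHistoryArangoData hunks above: the per-protocol *_CNT_RECENT counters are shifted one slot each run (System.arraycopy into index 1, slot 0 reset to zero), and the new guard skips the DIST_CIP trimming when a document carries no client-IP history. A minimal, runnable sketch of that shift, assuming a RECENT_COUNT_HOUR-sized window (the value 24 below is an assumption, not taken from the repo):

    object RotateRecentCounts {
      val RECENT_COUNT_HOUR = 24 // assumed window size; the real value comes from the project's config

      // Shift the hourly window one slot: src(0..n-2) moves to dst(1..n-1) and slot 0 restarts
      // at zero (presumably the slot for the new current hour), mirroring the arraycopy above.
      def rotate(src: Array[Long]): Array[Long] = {
        val dst = new Array[Long](RECENT_COUNT_HOUR)
        System.arraycopy(src, 0, dst, 1, src.length - 1)
        dst(0) = 0L
        dst
      }

      def main(args: Array[String]): Unit =
        println(rotate(Array.fill(RECENT_COUNT_HOUR)(1L)).mkString(",")) // 0,1,1,...,1
    }
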
diff --git a/ip-learning-spark/src/main/resources/application.properties b/ip-learning-spark/src/main/resources/application.properties
index 77006b8..9a61792 100644
--- a/ip-learning-spark/src/main/resources/application.properties
+++ b/ip-learning-spark/src/main/resources/application.properties
@@ -8,11 +8,9 @@ spark.serializer=org.apache.spark.serializer.KryoSerializer
 master=local[*]
 #spark读取clickhouse配置
 spark.read.clickhouse.url=jdbc:clickhouse://192.168.44.12:8123/tsg_galaxy_v3
-#spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3
 spark.read.clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
 spark.read.clickhouse.user=default
 spark.read.clickhouse.password=ceiec2019
-#spark.read.clickhouse.password=111111
 spark.read.clickhouse.numPartitions=5
 spark.read.clickhouse.fetchsize=10000
 spark.read.clickhouse.partitionColumn=LAST_FOUND_TIME
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
index e37b959..4d66deb 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/dao/BaseClickhouseData.scala
@@ -107,45 +107,35 @@ object BaseClickhouseData {
 
   def getVertexIpDf: DataFrame ={
     loadConnectionDataFromCk()
+    val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1
     val sql =
-      """
-        |SELECT
-        |  *
-        |FROM
-        |  (
-        |    (
-        |      SELECT
-        |        common_client_ip AS IP,
-        |        MIN(common_recv_time) AS FIRST_FOUND_TIME,
-        |        MAX(common_recv_time) AS LAST_FOUND_TIME,
-        |        count(*) as SESSION_COUNT,
-        |        sum(common_c2s_byte_num) as BYTES_SUM,
-        |        'client' as ip_type
-        |      FROM
-        |        global_temp.dbtable
-        |      GROUP BY
-        |        IP
-        |    )
-        |    UNION ALL
-        |    (
-        |      SELECT
-        |        common_server_ip AS IP,
-        |        MIN(common_recv_time) AS FIRST_FOUND_TIME,
-        |        MAX(common_recv_time) AS LAST_FOUND_TIME,
-        |        count(*) as SESSION_COUNT,
-        |        sum(common_s2c_byte_num) as BYTES_SUM,
-        |        'server' as ip_type
-        |      FROM
-        |        global_temp.dbtable
-        |      GROUP BY
-        |        IP
-        |    )
-        |  )
+      s"""
+        |(SELECT * FROM
+        |((SELECT common_client_ip AS IP,MIN(common_end_time) AS FIRST_FOUND_TIME,
+        |MAX(common_end_time) AS LAST_FOUND_TIME,
+        |count(*) as SESSION_COUNT,
+        |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,
+        |groupUniqArray(2)(common_link_info_c2s)[2] as common_link_info,
+        |'client' as ip_type
+        |FROM tsg_galaxy_v3.connection_record_log
+        |where $where
+        |group by common_client_ip)
+        |UNION ALL
+        |(SELECT common_server_ip AS IP,
+        |MIN(common_end_time) AS FIRST_FOUND_TIME,
+        |MAX(common_end_time) AS LAST_FOUND_TIME,
+        |count(*) as SESSION_COUNT,
+        |SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,
+        |groupUniqArray(2)(common_link_info_s2c)[2] as common_link_info,
+        |'server' as ip_type
+        |FROM tsg_galaxy_v3.connection_record_log
+        |where $where
+        |group by common_server_ip))) as dbtable
       """.stripMargin
     LOG.warn(sql)
-    val vertexIpDf = spark.sql(sql)
-    vertexIpDf.printSchema()
-    vertexIpDf
+    val frame = initClickhouseData(sql)
+    frame.printSchema()
+    frame
   }
 
 /*
@@ -196,16 +186,16 @@ object BaseClickhouseData {
  */
 
   def getRelationFqdnLocateIpDf: DataFrame ={
-    val where = "common_end_time >= " + timeLimit._2 + " AND common_end_time < " + timeLimit._1
+    val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1
     val sql =
       s"""
        |(SELECT * FROM
-        |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+        |((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
        |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type
        |FROM tsg_galaxy_v3.connection_record_log
        |WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip)
        |UNION ALL
-        |(SELECT http_host AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+        |(SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
        |toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type
        |FROM tsg_galaxy_v3.connection_record_log
        |WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip))
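
Note on the BaseClickhouseData changes above: getVertexIpDf no longer aggregates the global_temp.dbtable view with Spark SQL; the whole UNION ALL aggregation is built as a parenthesized ClickHouse subquery aliased `as dbtable` and handed to initClickhouseData. The sketch below shows how such a subquery can be pushed through Spark's JDBC reader; it only illustrates the idea and assumes initClickhouseData wraps something similar, with connection values taken from application.properties above:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object ClickhouseSubqueryReader {
      // Hypothetical helper: hand a "(SELECT ...) as dbtable" subquery to the JDBC source so
      // ClickHouse runs the GROUP BY / UNION ALL and Spark only receives the result rows.
      def readClickhouseSubquery(spark: SparkSession, subquery: String): DataFrame =
        spark.read
          .format("jdbc")
          .option("url", "jdbc:clickhouse://192.168.44.12:8123/tsg_galaxy_v3")
          .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
          .option("user", "default")
          .option("password", "ceiec2019")
          .option("fetchsize", "10000")
          .option("dbtable", subquery) // e.g. the "(SELECT * FROM (...)) as dbtable" string built above
          .load()
    }
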
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
index 7ae7f30..0a55ed8 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/transform/MergeDataFrame.scala
@@ -38,7 +38,8 @@ object MergeDataFrame {
       max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
       collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
       collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
-      collect_list("ip_type").alias("ip_type_list")
+      collect_list("ip_type").alias("ip_type_list"),
+      last("common_link_info").alias("common_link_info")
     )
     val values = frame.rdd.map(row => (row.get(0), row))
       .partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values
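
Note on the MergeDataFrame hunk above: the rows grouped per IP (one per ip_type) now also carry a link-info value, picked with last("common_link_info"). The self-contained sketch below shows that aggregation on two hypothetical rows; last() without an explicit ordering is not deterministic, so either grouped value may be kept:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object MergeLinkInfoSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("merge-sketch").getOrCreate()
        import spark.implicits._

        // Hypothetical vertex rows: (IP, SESSION_COUNT, BYTES_SUM, ip_type, common_link_info)
        val df = Seq(
          ("10.0.0.1", 3L, 1200L, "client", "linkA"),
          ("10.0.0.1", 5L, 800L, "server", "linkB")
        ).toDF("IP", "SESSION_COUNT", "BYTES_SUM", "ip_type", "common_link_info")

        df.groupBy("IP").agg(
          collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
          collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
          collect_list("ip_type").alias("ip_type_list"),
          last("common_link_info").alias("common_link_info") // non-deterministic without an ordering
        ).show(false)

        spark.stop()
      }
    }
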
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
index ddaf145..f56f6f2 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocHandler.scala
@@ -26,6 +26,10 @@ object UpdateDocHandler {
     hisDoc.addAttribute(attributeName,newAttribute+hisAttritube)
   }
 
+  def replaceAttribute(hisDoc: BaseDocument,newAttribute:String,attributeName:String): Unit ={
+    hisDoc.addAttribute(attributeName,newAttribute)
+  }
+
   def separateAttributeByIpType(ipTypeList:ofRef[String],
                                 sessionCountList:ofRef[AnyRef],
                                 bytesSumList:ofRef[AnyRef]): (Long,Long,Long,Long) ={
diff --git a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
index b3719fb..1c8dd91 100644
--- a/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
+++ b/ip-learning-spark/src/main/scala/cn/ac/iie/service/update/UpdateDocument.scala
@@ -174,6 +174,7 @@ object UpdateDocument {
        val sessionCountList = row.getAs[ofRef[AnyRef]]("SESSION_COUNT_LIST")
        val bytesSumList = row.getAs[ofRef[AnyRef]]("BYTES_SUM_LIST")
        val ipTypeList = row.getAs[ofRef[String]]("ip_type_list")
+        val linkInfo = row.getAs[String]("common_link_info")
        val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList)
        var document = dictionaryMap.getOrDefault(ip, null)
 
@@ -183,6 +184,7 @@ object UpdateDocument {
          updateSumAttribute(document, sepAttributeTuple._2, "SERVER_BYTES_SUM")
          updateSumAttribute(document, sepAttributeTuple._3, "CLIENT_SESSION_COUNT")
          updateSumAttribute(document, sepAttributeTuple._4, "CLIENT_BYTES_SUM")
+          replaceAttribute(document,linkInfo,"COMMON_LINK_INFO")
        } else {
          document = new BaseDocument
          document.setKey(ip)
@@ -193,7 +195,7 @@ object UpdateDocument {
          document.addAttribute("SERVER_BYTES_SUM", sepAttributeTuple._2)
          document.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3)
          document.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4)
-          document.addAttribute("COMMON_LINK_INFO", "")
+          document.addAttribute("COMMON_LINK_INFO", linkInfo)
        }
        document
      }
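
Note on the UpdateDocHandler/UpdateDocument hunks above: the merged common_link_info now reaches the ArangoDB document on both paths, via the new replaceAttribute helper for existing documents and directly on insert instead of the old empty string. A minimal sketch of the two paths with the ArangoDB Java driver's BaseDocument (key and link value are hypothetical):

    import com.arangodb.entity.BaseDocument

    object LinkInfoUpdateSketch {
      // Same body as the new UpdateDocHandler.replaceAttribute: addAttribute stores into the
      // document's property map, so writing the same key again replaces the previous value.
      def replaceAttribute(hisDoc: BaseDocument, newAttribute: String, attributeName: String): Unit =
        hisDoc.addAttribute(attributeName, newAttribute)

      def main(args: Array[String]): Unit = {
        val doc = new BaseDocument()
        doc.setKey("10.0.0.1")                               // hypothetical IP key
        doc.addAttribute("COMMON_LINK_INFO", "")             // old insert path wrote an empty string
        replaceAttribute(doc, "link-info-sample", "COMMON_LINK_INFO") // update path now overwrites it
        println(doc.getAttribute("COMMON_LINK_INFO"))        // link-info-sample
      }
    }
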