Format code

wanglihui
2020-09-29 10:26:25 +08:00
parent e7ff669d4c
commit e0f5b20ab6
7 changed files with 47 additions and 66 deletions


@@ -19,6 +19,7 @@ import java.util.concurrent.CountDownLatch;
* @author wlh
* Reads arangoDb history data in full on multiple threads and wraps it into a map
*/
+@SuppressWarnings("unchecked")
public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
public static long currentHour = System.currentTimeMillis() / (60 * 60 * 1000) * 60 * 60;
private static final Logger LOG = LoggerFactory.getLogger(ReadHistoryArangoData.class);
@@ -92,7 +93,7 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
for (String protocol : PROTOCOL_SET) {
String protocolRecent = protocol + "_CNT_RECENT";
ArrayList<Long> cntRecent = (ArrayList<Long>) doc.getAttribute(protocolRecent);
-Long[] cntRecentsSrc = cntRecent.toArray(new Long[cntRecent.size()]);
+Long[] cntRecentsSrc = cntRecent.toArray(new Long[0]);
Long[] cntRecentsDst = new Long[RECENT_COUNT_HOUR];
System.arraycopy(cntRecentsSrc, 0, cntRecentsDst, 1, cntRecentsSrc.length - 1);
cntRecentsDst[0] = 0L;
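The only change in this hunk swaps `toArray(new Long[cntRecent.size()])` for `toArray(new Long[0])`; passing a zero-length array is the idiomatic form and typically no slower on modern JVMs, since the list allocates a correctly sized result itself. A self-contained sketch of the surrounding shift-one-hour rotation, with a made-up `RECENT_COUNT_HOUR` of 4:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RecentCountShiftSketch {
    // Hypothetical window size; the real value comes from configuration.
    private static final int RECENT_COUNT_HOUR = 4;

    public static void main(String[] args) {
        // Per-hour counters, newest first: [current, -1h, -2h, -3h].
        List<Long> cntRecent = new ArrayList<>(Arrays.asList(5L, 3L, 7L, 2L));

        // Preferred idiom: pass a zero-length array and let the list size the result.
        Long[] src = cntRecent.toArray(new Long[0]);

        // Shift every bucket one hour to the right (dropping the oldest) and open a fresh hour.
        Long[] dst = new Long[RECENT_COUNT_HOUR];
        System.arraycopy(src, 0, dst, 1, src.length - 1);
        dst[0] = 0L;

        System.out.println(Arrays.toString(dst)); // [0, 5, 3, 7]
    }
}
```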
@@ -104,6 +105,11 @@ public class ReadHistoryArangoData<T extends BaseDocument> extends Thread {
private void deleteDistinctClientIpByTime(T doc) {
ArrayList<String> distCip = (ArrayList<String>) doc.getAttribute("DIST_CIP");
ArrayList<Long> distCipTs = (ArrayList<Long>) doc.getAttribute("DIST_CIP_TS");
+if (distCip == null || distCip.isEmpty()){
+doc.updateAttribute("DIST_CIP", new String[0]);
+doc.updateAttribute("DIST_CIP_TS", new long[0]);
+return;
+}
distCipTs.add(currentHour - RECENT_COUNT_HOUR * 3600);
Collections.sort(distCipTs);
int index = distCipTs.indexOf(currentHour - RECENT_COUNT_HOUR * 3600);
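The added `distCip == null || distCip.isEmpty()` guard resets both attributes and returns early, so documents without any distinct-client-IP history no longer reach the code below. That code uses a small sentinel trick to find how many timestamps have expired: append the cutoff, sort, and take its index. A standalone sketch of the idea (the values are invented):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CutoffIndexSketch {
    public static void main(String[] args) {
        // Timestamps (seconds); anything older than the cutoff should be dropped.
        List<Long> distCipTs = new ArrayList<>(Arrays.asList(1000L, 5000L, 9000L));
        long cutoff = 6000L; // e.g. currentHour - RECENT_COUNT_HOUR * 3600

        // Sentinel trick: add the cutoff itself, sort, and its index splits old from new.
        distCipTs.add(cutoff);
        Collections.sort(distCipTs);
        int index = distCipTs.indexOf(cutoff);

        // Everything before 'index' is older than the cutoff; keep what follows the sentinel.
        List<Long> kept = distCipTs.subList(index + 1, distCipTs.size());
        System.out.println(kept); // [9000]
    }
}
```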


@@ -70,26 +70,6 @@ public class ArangoDBConnect {
}
}
-@Deprecated
-public <T> void insertAndUpdate(ArrayList<T> docInsert,ArrayList<T> docUpdate,String collectionName){
-ArangoDatabase database = getDatabase();
-try {
-ArangoCollection collection = database.collection(collectionName);
-if (!docInsert.isEmpty()){
-collection.importDocuments(docInsert);
-}
-if (!docUpdate.isEmpty()){
-collection.replaceDocuments(docUpdate);
-}
-}catch (Exception e){
-System.out.println("更新失败");
-e.printStackTrace();
-}finally {
-docInsert.clear();
-docInsert.clear();
-}
-}
public <T> void overwrite(ArrayList<T> docOverwrite,String collectionName){
ArangoDatabase database = getDatabase();
try {
@@ -101,11 +81,11 @@ public class ArangoDBConnect {
MultiDocumentEntity<DocumentCreateEntity<T>> documentCreateEntityMultiDocumentEntity = collection.insertDocuments(docOverwrite, documentCreateOptions);
Collection<ErrorEntity> errors = documentCreateEntityMultiDocumentEntity.getErrors();
for (ErrorEntity errorEntity:errors){
-LOG.warn("写入arangoDB异常"+errorEntity.getErrorMessage());
+LOG.debug("写入arangoDB异常"+errorEntity.getErrorMessage());
}
}
}catch (Exception e){
-System.out.println("更新失败:"+e.toString());
+LOG.error("更新arangoDB失败:"+e.toString());
}finally {
docOverwrite.clear();
}


@@ -8,11 +8,9 @@ spark.serializer=org.apache.spark.serializer.KryoSerializer
master=local[*]
#spark读取clickhouse配置
spark.read.clickhouse.url=jdbc:clickhouse://192.168.44.12:8123/tsg_galaxy_v3
-#spark.read.clickhouse.url=jdbc:clickhouse://192.168.40.186:8123/tsg_galaxy_v3
spark.read.clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
spark.read.clickhouse.user=default
spark.read.clickhouse.password=ceiec2019
-#spark.read.clickhouse.password=111111
spark.read.clickhouse.numPartitions=5
spark.read.clickhouse.fetchsize=10000
spark.read.clickhouse.partitionColumn=LAST_FOUND_TIME
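These keys look like inputs to Spark's generic JDBC reader. A hedged sketch of how they would typically be consumed; the aggregated subquery and the lower/upper bounds are illustrative assumptions (Spark requires bounds whenever `partitionColumn` is set), and the real job builds the larger UNION ALL query aliased `as dbtable` shown further down:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ClickhouseReadSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("ck-read").getOrCreate();

        // Illustrative aggregated subquery exposed as a table so LAST_FOUND_TIME is available.
        String dbtable = "(SELECT common_server_ip AS IP, MAX(common_end_time) AS LAST_FOUND_TIME "
                + "FROM tsg_galaxy_v3.connection_record_log GROUP BY common_server_ip) AS dbtable";

        Dataset<Row> df = spark.read().format("jdbc")
                .option("url", "jdbc:clickhouse://192.168.44.12:8123/tsg_galaxy_v3")
                .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
                .option("user", "default")
                .option("password", "ceiec2019")
                .option("dbtable", dbtable)
                .option("partitionColumn", "LAST_FOUND_TIME")
                .option("numPartitions", "5")
                .option("fetchsize", "10000")
                .option("lowerBound", "0")            // assumed bound
                .option("upperBound", "2000000000")   // assumed bound
                .load();

        df.printSchema();
    }
}
```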


@@ -107,45 +107,35 @@ object BaseClickhouseData {
def getVertexIpDf: DataFrame ={
-loadConnectionDataFromCk()
+val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1
val sql =
-"""
-|SELECT
-| *
-|FROM
-| (
-| (
-| SELECT
-| common_client_ip AS IP,
-| MIN(common_recv_time) AS FIRST_FOUND_TIME,
-| MAX(common_recv_time) AS LAST_FOUND_TIME,
+s"""
+|(SELECT * FROM
+|((SELECT common_client_ip AS IP,MIN(common_end_time) AS FIRST_FOUND_TIME,
+|MAX(common_end_time) AS LAST_FOUND_TIME,
|count(*) as SESSION_COUNT,
-| sum(common_c2s_byte_num) as BYTES_SUM,
+|SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,
+|groupUniqArray(2)(common_link_info_c2s)[2] as common_link_info,
|'client' as ip_type
-| FROM
-| global_temp.dbtable
-| GROUP BY
-| IP
-| )
+|FROM tsg_galaxy_v3.connection_record_log
+|where $where
+|group by common_client_ip)
|UNION ALL
-| (
-| SELECT
-| common_server_ip AS IP,
-| MIN(common_recv_time) AS FIRST_FOUND_TIME,
-| MAX(common_recv_time) AS LAST_FOUND_TIME,
+|(SELECT common_server_ip AS IP,
+|MIN(common_end_time) AS FIRST_FOUND_TIME,
+|MAX(common_end_time) AS LAST_FOUND_TIME,
|count(*) as SESSION_COUNT,
-| sum(common_s2c_byte_num) as BYTES_SUM,
+|SUM(common_c2s_byte_num+common_s2c_byte_num) as BYTES_SUM,
+|groupUniqArray(2)(common_link_info_s2c)[2] as common_link_info,
|'server' as ip_type
-| FROM
-| global_temp.dbtable
-| GROUP BY
-| IP
-| )
-| )
+|FROM tsg_galaxy_v3.connection_record_log
+|where $where
+|group by common_server_ip))) as dbtable
""".stripMargin
-LOG.warn(sql)
-val vertexIpDf = spark.sql(sql)
-vertexIpDf.printSchema()
-vertexIpDf
+val frame = initClickhouseData(sql)
+frame.printSchema()
+frame
}
/*
@@ -196,16 +186,16 @@ object BaseClickhouseData {
*/
def getRelationFqdnLocateIpDf: DataFrame ={
val where = "common_end_time >= " + timeLimit._2 + " AND common_end_time < " + timeLimit._1
val where = "common_recv_time >= " + timeLimit._2 + " AND common_recv_time < " + timeLimit._1
val sql =
s"""
|(SELECT * FROM
-|((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+|((SELECT ssl_sni AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
|toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'TLS' AS schema_type
|FROM tsg_galaxy_v3.connection_record_log
|WHERE $where and common_schema_type = 'SSL' GROUP BY ssl_sni,common_server_ip)
|UNION ALL
-|(SELECT http_host AS FQDN,common_server_ip,MAX(common_end_time) AS LAST_FOUND_TIME,MIN(common_end_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
+|(SELECT http_host AS FQDN,common_server_ip,MAX(common_recv_time) AS LAST_FOUND_TIME,MIN(common_recv_time) AS FIRST_FOUND_TIME,COUNT(*) AS COUNT_TOTAL,
|toString(groupUniqArray(${ApplicationConfig.DISTINCT_CLIENT_IP_NUM})(common_client_ip)) AS DIST_CIP_RECENT,'HTTP' AS schema_type
|FROM tsg_galaxy_v3.connection_record_log
|WHERE $where and common_schema_type = 'HTTP' GROUP BY http_host,common_server_ip))


@@ -38,7 +38,8 @@ object MergeDataFrame {
max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
collect_list("ip_type").alias("ip_type_list")
collect_list("ip_type").alias("ip_type_list"),
last("common_link_info").alias("common_link_info")
)
val values = frame.rdd.map(row => (row.get(0), row))
.partitionBy(new CustomPartitioner(ApplicationConfig.SPARK_SQL_SHUFFLE_PARTITIONS)).values
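The new `last("common_link_info")` aggregation carries one link-info value per key through the merge alongside the collected lists. A rough Java rendering of the same groupBy/agg shape; the grouping key and the `min` line are assumptions, since they sit outside this hunk:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;

public class MergeSketch {
    // 'vertexIpDf' is assumed to carry the columns produced by getVertexIpDf.
    static Dataset<Row> mergeByIp(Dataset<Row> vertexIpDf) {
        return vertexIpDf.groupBy("IP").agg(
                min("FIRST_FOUND_TIME").alias("FIRST_FOUND_TIME"),
                max("LAST_FOUND_TIME").alias("LAST_FOUND_TIME"),
                collect_list("SESSION_COUNT").alias("SESSION_COUNT_LIST"),
                collect_list("BYTES_SUM").alias("BYTES_SUM_LIST"),
                collect_list("ip_type").alias("ip_type_list"),
                last("common_link_info").alias("common_link_info"));
    }
}
```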


@@ -26,6 +26,10 @@ object UpdateDocHandler {
hisDoc.addAttribute(attributeName,newAttribute+hisAttritube)
}
+def replaceAttribute(hisDoc: BaseDocument,newAttribute:String,attributeName:String): Unit ={
+hisDoc.addAttribute(attributeName,newAttribute)
+}
def separateAttributeByIpType(ipTypeList:ofRef[String],
sessionCountList:ofRef[AnyRef],
bytesSumList:ofRef[AnyRef]): (Long,Long,Long,Long) ={


@@ -174,6 +174,7 @@ object UpdateDocument {
val sessionCountList = row.getAs[ofRef[AnyRef]]("SESSION_COUNT_LIST")
val bytesSumList = row.getAs[ofRef[AnyRef]]("BYTES_SUM_LIST")
val ipTypeList = row.getAs[ofRef[String]]("ip_type_list")
+val linkInfo = row.getAs[String]("common_link_info")
val sepAttributeTuple = separateAttributeByIpType(ipTypeList, sessionCountList, bytesSumList)
var document = dictionaryMap.getOrDefault(ip, null)
@@ -183,6 +184,7 @@ object UpdateDocument {
updateSumAttribute(document, sepAttributeTuple._2, "SERVER_BYTES_SUM")
updateSumAttribute(document, sepAttributeTuple._3, "CLIENT_SESSION_COUNT")
updateSumAttribute(document, sepAttributeTuple._4, "CLIENT_BYTES_SUM")
+replaceAttribute(document,linkInfo,"COMMON_LINK_INFO")
} else {
document = new BaseDocument
document.setKey(ip)
@@ -193,7 +195,7 @@ object UpdateDocument {
document.addAttribute("SERVER_BYTES_SUM", sepAttributeTuple._2)
document.addAttribute("CLIENT_SESSION_COUNT", sepAttributeTuple._3)
document.addAttribute("CLIENT_BYTES_SUM", sepAttributeTuple._4)
document.addAttribute("COMMON_LINK_INFO", "")
document.addAttribute("COMMON_LINK_INFO", linkInfo)
}
document
}
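`replaceAttribute` (added in UpdateDocHandler) simply overwrites whatever was stored before, which is why the existing-document branch can now write `linkInfo` directly, while the counter attributes keep accumulating. A small sketch of the two behaviours on an ArangoDB `BaseDocument`; the summing helper is a guess at what `updateSumAttribute` does, not a copy of it:

```java
import com.arangodb.entity.BaseDocument;

public class AttributeUpdateSketch {

    // Overwrite semantics, as in UpdateDocHandler.replaceAttribute.
    static void replaceAttribute(BaseDocument hisDoc, String newAttribute, String attributeName) {
        hisDoc.addAttribute(attributeName, newAttribute);
    }

    // Assumed accumulate semantics for counters such as SERVER_SESSION_COUNT.
    static void updateSumAttribute(BaseDocument hisDoc, long delta, String attributeName) {
        Object old = hisDoc.getAttribute(attributeName);
        long base = (old instanceof Number) ? ((Number) old).longValue() : 0L;
        hisDoc.updateAttribute(attributeName, base + delta);
    }

    public static void main(String[] args) {
        BaseDocument doc = new BaseDocument();
        doc.setKey("10.0.0.1");
        doc.addAttribute("COMMON_LINK_INFO", "");
        doc.addAttribute("SERVER_SESSION_COUNT", 10L);

        replaceAttribute(doc, "eth0|eth1", "COMMON_LINK_INFO"); // hypothetical value; old one is discarded
        updateSumAttribute(doc, 5L, "SERVER_SESSION_COUNT");    // 10 + 5 = 15

        System.out.println(doc.getProperties());
    }
}
```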