IP Learning report version

wanglihui
2020-08-27 09:19:41 +08:00
parent 6265bb5e90
commit 33c0d826ab
4 changed files with 48 additions and 29 deletions


@@ -44,14 +44,14 @@ public class UpdateGraphData {
         long start = System.currentTimeMillis();
         try {
-            updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class,
-                    ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
-            updateDocument(newVertexIpMap,historyVertexIpMap,"IP", Ip.class,BaseDocument.class,
-                    ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
-            updateDocument(newVertexSubscriberMap,historyVertexSubscriberMap,"SUBSCRIBER", Subscriber.class,BaseDocument.class,
-                    ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument);
+//            updateDocument(newVertexFqdnMap, historyVertexFqdnMap, "FQDN", Fqdn.class,BaseDocument.class,
+//                    ReadClickhouseData::getVertexFqdnSql,ReadClickhouseData::getVertexFqdnDocument);
+//
+//            updateDocument(newVertexIpMap,historyVertexIpMap,"IP", Ip.class,BaseDocument.class,
+//                    ReadClickhouseData::getVertexIpSql,ReadClickhouseData::getVertexIpDocument);
+//
+//            updateDocument(newVertexSubscriberMap,historyVertexSubscriberMap,"SUBSCRIBER", Subscriber.class,BaseDocument.class,
+//                    ReadClickhouseData::getVertexSubscriberSql,ReadClickhouseData::getVertexSubscriberDocument);
             updateDocument(newRelationFqdnAddressIpMap,historyRelationFqdnAddressIpMap,"R_LOCATE_FQDN2IP", LocateFqdn2Ip.class,BaseEdgeDocument.class,
                     ReadClickhouseData::getRelationshipFqdnAddressIpSql,ReadClickhouseData::getRelationFqdnAddressIpDocument);
@@ -60,9 +60,9 @@ public class UpdateGraphData {
 //                    VisitIp2Fqdn.class,BaseEdgeDocument.class,
 //                    ReadClickhouseData::getRelationshipIpVisitFqdnSql,ReadClickhouseData::getRelationIpVisitFqdnDocument);
-            updateDocument(newRelationSubsciberLocateIpMap,historyRelationSubsciberLocateIpMap,"R_LOCATE_SUBSCRIBER2IP",
-                    LocateSubscriber2Ip.class,BaseEdgeDocument.class,
-                    ReadClickhouseData::getRelationshipSubsciberLocateIpSql,ReadClickhouseData::getRelationshipSubsciberLocateIpDocument);
+//            updateDocument(newRelationSubsciberLocateIpMap,historyRelationSubsciberLocateIpMap,"R_LOCATE_SUBSCRIBER2IP",
+//                    LocateSubscriber2Ip.class,BaseEdgeDocument.class,
+//                    ReadClickhouseData::getRelationshipSubsciberLocateIpSql,ReadClickhouseData::getRelationshipSubsciberLocateIpDocument);
             long last = System.currentTimeMillis();
@@ -86,7 +86,7 @@ public class UpdateGraphData {
     ) {
         try {
-            baseArangoData.readHistoryData(collection,historyMap,docmentType);
+//            baseArangoData.readHistoryData(collection,historyMap,docmentType);
             LOG.info(collection+" 读取clickhouse,封装结果集");
             baseClickhouseData.baseDocumentFromClickhouse(newMap, getSqlSupplier,formatResultFunc);
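
Taken together, these three hunks disable every update except the FQDN-to-IP edge: the vertex collections (FQDN, IP, SUBSCRIBER), the R_LOCATE_SUBSCRIBER2IP edge, and the read-back of history from ArangoDB are all commented out, so only R_LOCATE_FQDN2IP is still rebuilt from ClickHouse (the surviving log line reads, roughly, "reading clickhouse, assembling result set"). Every call goes through one generic updateDocument helper that takes the SQL as a supplier and the per-row mapping as a method reference. A minimal, self-contained sketch of that pattern; the names and shapes below are illustrative, not the project's:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.function.Function;
    import java.util.function.Supplier;

    public class UpdateDocumentSketch {
        // Generic driver: each vertex/edge type contributes only an SQL supplier
        // and a row formatter, which is the shape of the call sites above.
        static <D> void updateDocument(HashMap<String, ArrayList<D>> newMap,
                                       String collection,
                                       Supplier<String> getSqlSupplier,
                                       Function<String, D> formatResultFunc) {
            String sql = getSqlSupplier.get();   // in the real class this is sent to ClickHouse
            String fakeRow = "row-from:" + sql;  // stand-in for a JDBC ResultSet row
            newMap.computeIfAbsent("demo-!!-key", k -> new ArrayList<>())
                  .add(formatResultFunc.apply(fakeRow));
        }

        public static void main(String[] args) {
            HashMap<String, ArrayList<String>> edges = new HashMap<>();
            updateDocument(edges, "R_LOCATE_FQDN2IP",
                    () -> "SELECT ...",          // cf. ReadClickhouseData::getRelationshipFqdnAddressIpSql
                    row -> "edge(" + row + ")"); // cf. ReadClickhouseData::getRelationFqdnAddressIpDocument
            System.out.println(edges);
        }
    }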


@@ -153,7 +153,7 @@ public class ReadClickhouseData {
                     clientIpTs[i] = currentHour;
                 }
-                String key = vFqdn + "-" + vIp;
+                String key = vFqdn + "-!!-" + vIp;
                 newDoc = new BaseEdgeDocument();
                 newDoc.setKey(key);
                 newDoc.setFrom("FQDN/" + vFqdn);
@@ -177,7 +177,7 @@ public class ReadClickhouseData {
             String vFqdn = resultSet.getString("FQDN");
             if (isDomain(vFqdn)) {
                 String vIp = resultSet.getString("common_client_ip");
-                String key = vIp + "-" + vFqdn;
+                String key = vIp + "-!!-" + vFqdn;
                 long firstFoundTime = resultSet.getLong("FIRST_FOUND_TIME");
                 long lastFoundTime = resultSet.getLong("LAST_FOUND_TIME");
                 long countTotal = resultSet.getLong("COUNT_TOTAL");
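
Both hunks make the same fix: the composite edge key's separator changes from a bare "-" to the sentinel "-!!-". A single hyphen is ambiguous because FQDNs routinely contain hyphens themselves, and the key is later split back into its two halves by Document.run() (further down) via key.split("-!!-"). A round-trip sketch with hypothetical values:

    public class KeySplitSketch {
        public static void main(String[] args) {
            String vFqdn = "my-cdn.example.com";   // a hyphenated FQDN would break key.split("-")
            String vIp = "10.1.2.3";
            String key = vFqdn + "-!!-" + vIp;     // same construction as the hunk above
            String[] splitKey = key.split("-!!-"); // same split as in Document.run() below
            System.out.println(splitKey[0]);       // my-cdn.example.com
            System.out.println(splitKey[1]);       // 10.1.2.3
        }
    }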
@@ -214,12 +214,8 @@ public class ReadClickhouseData {
         if (fqdn == null || fqdn.length() == 0){
             return false;
         }
-        if (fqdn.contains(":")){
-            String s = fqdn.split(":")[0];
-            if (s.contains(":")){
-                return false;
-            }
-        }
+        fqdn = fqdn.split(":")[0];
         String[] fqdnArr = fqdn.split("\\.");
         if (fqdnArr.length < 4 || fqdnArr.length > 4) {
             return true;
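
The removed block was effectively dead code: s = fqdn.split(":")[0] is the substring before the first colon, so s.contains(":") could never be true, and s was discarded anyway, leaving the later fqdn.split("\\.") to run on the unstripped string. The replacement one-liner actually removes everything from the first colon onward (for example a :port suffix) before the four-label check. A sketch with hypothetical inputs:

    public class PortStripSketch {
        public static void main(String[] args) {
            String[] inputs = {"cache.example.com:8080", "10.0.0.1:443", "example.com"};
            for (String fqdn : inputs) {
                String stripped = fqdn.split(":")[0];      // new behaviour: drop any :port suffix
                int labels = stripped.split("\\.").length; // feeds the length != 4 check above
                System.out.println(fqdn + " -> " + stripped + " (" + labels + " labels)");
            }
        }
    }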


@@ -2,16 +2,23 @@ package cn.ac.iie.service.update;
 import cn.ac.iie.config.ApplicationConfig;
 import cn.ac.iie.utils.ArangoDBConnect;
+import cn.ac.iie.utils.ClickhouseConnect;
+import com.alibaba.druid.pool.DruidPooledConnection;
 import com.arangodb.entity.BaseDocument;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import ru.yandex.clickhouse.ClickHouseArray;
+import ru.yandex.clickhouse.domain.ClickHouseDataType;
+import java.sql.PreparedStatement;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import static cn.ac.iie.service.ingestion.ReadClickhouseData.currentHour;
 public class Document<T extends BaseDocument> extends Thread{
     private static final Logger LOG = LoggerFactory.getLogger(Document.class);
     private HashMap<String, ArrayList<T>> newDocumentMap;
@@ -20,6 +27,8 @@ public class Document<T extends BaseDocument> extends Thread{
     private ConcurrentHashMap<String, T> historyDocumentMap;
     private CountDownLatch countDownLatch;
+    private ClickhouseConnect manger = ClickhouseConnect.getInstance();
+
     Document(HashMap<String, ArrayList<T>> newDocumentMap,
              ArangoDBConnect arangoManger,
              String collectionName,
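
The new manger field (the spelling follows the existing arangoManger) gives each writer thread a handle to a pooled ClickHouse connection. ClickhouseConnect itself is not part of this diff; given the Druid and ClickHouse JDBC imports added above, a minimal singleton of that shape might look like the following sketch, where the URL and settings are placeholders and not the project's:

    import com.alibaba.druid.pool.DruidDataSource;
    import com.alibaba.druid.pool.DruidPooledConnection;
    import java.sql.SQLException;

    public final class ClickhouseConnectSketch {
        private static final ClickhouseConnectSketch INSTANCE = new ClickhouseConnectSketch();
        private final DruidDataSource dataSource = new DruidDataSource();

        private ClickhouseConnectSketch() {
            dataSource.setDriverClassName("ru.yandex.clickhouse.ClickHouseDriver");
            dataSource.setUrl("jdbc:clickhouse://localhost:8123/ip_learning"); // placeholder
        }

        public static ClickhouseConnectSketch getInstance() { return INSTANCE; }

        public DruidPooledConnection getConnection() throws SQLException {
            return dataSource.getConnection();  // Druid hands back a pooled wrapper
        }
    }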
@@ -39,25 +48,39 @@ public class Document<T extends BaseDocument> extends Thread{
         LOG.info("新读取数据"+newDocumentMap.size()+"条,历史数据"+historyDocumentMap.size()+"");
         try {
             Set<String> keySet = newDocumentMap.keySet();
-            ArrayList<T> resultDocumentList = new ArrayList<>();
+            DruidPooledConnection connection = manger.getConnection();
+            String sql = "INSERT INTO ip_learning.r_locate_fqdn2ip_local VALUES(?,?,?,?,?,?,?,?,?)";
+            PreparedStatement pstm = connection.prepareStatement(sql);
             int i = 0;
             for (String key : keySet) {
                 ArrayList<T> newDocumentSchemaList = newDocumentMap.getOrDefault(key, null);
                 if (newDocumentSchemaList != null) {
                     T newDocument = mergeDocument(newDocumentSchemaList);
+                    String[] splitKey = key.split("-!!-");
+                    pstm.setString(1,splitKey[0]);
+                    pstm.setString(2,splitKey[1]);
+                    pstm.setLong(3,Long.parseLong(newDocument.getAttribute("FIRST_FOUND_TIME").toString()));
+                    pstm.setLong(4,Long.parseLong(newDocument.getAttribute("LAST_FOUND_TIME").toString()));
+                    pstm.setLong(5,Long.parseLong(newDocument.getAttribute("DNS_CNT_TOTAL").toString()));
+                    pstm.setLong(6,Long.parseLong(newDocument.getAttribute("TLS_CNT_TOTAL").toString()));
+                    pstm.setLong(7,Long.parseLong(newDocument.getAttribute("HTTP_CNT_TOTAL").toString()));
+                    Object[] distCips = (Object[]) newDocument.getAttribute("DIST_CIP");
+                    pstm.setArray(8,new ClickHouseArray(ClickHouseDataType.Int64, distCips));
+                    pstm.setLong(9,currentHour);
                     i += 1;
-                    T historyDocument = historyDocumentMap.getOrDefault(key, null);
-                    updateDocument(newDocument,historyDocument,resultDocumentList);
+                    pstm.addBatch();
                     if (i >= ApplicationConfig.UPDATE_ARANGO_BATCH) {
-                        arangoManger.overwrite(resultDocumentList, collectionName);
-                        LOG.info("更新"+collectionName+":" + i);
+                        pstm.executeBatch();
+                        connection.commit();
+                        LOG.warn("写入clickhouse数据量" + i);
                         i = 0;
                     }
                 }
             }
             if (i != 0) {
-                arangoManger.overwrite(resultDocumentList, collectionName);
-                LOG.info("更新"+collectionName+":" + i);
+                pstm.executeBatch();
+                connection.commit();
+                LOG.warn("写入clickhouse数据量" + i);
             }
         } catch (Exception e) {
             e.printStackTrace();
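
This hunk carries the behavior change behind the commit title: run() no longer merges each document against ArangoDB history and overwrites the Arango collection; it now writes every merged document straight to ClickHouse in JDBC batches of UPDATE_ARANGO_BATCH rows (the constant keeps its old name, and the new warn line reads, roughly, "rows written to clickhouse": count). The INSERT is positional, so only the nine value types are visible in the diff: the two key strings from the "-!!-" split, two timestamps, three protocol counters, an Int64 array, and the current hour. A table shape compatible with that INSERT might look like the sketch below; the column names are assumptions, and the _local suffix usually denotes the per-shard table behind a ClickHouse Distributed table, though the cluster setup is not shown here:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CreateTableSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical DDL matching the nine positional parameters above.
            String ddl =
                "CREATE TABLE IF NOT EXISTS ip_learning.r_locate_fqdn2ip_local (" +
                "  fqdn String, ip String," +
                "  first_found_time Int64, last_found_time Int64," +
                "  dns_cnt_total Int64, tls_cnt_total Int64, http_cnt_total Int64," +
                "  dist_cip Array(Int64)," +
                "  stat_hour Int64" +
                ") ENGINE = MergeTree() ORDER BY (fqdn, ip)";
            try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/ip_learning");
                 Statement stmt = conn.createStatement()) {
                stmt.execute(ddl);
            }
        }
    }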


@@ -15,8 +15,8 @@ thread.await.termination.time=10
 #读取clickhouse时间范围方式0读取过去一小时1指定时间范围
-time.limit.type=0
-read.clickhouse.max.time=1598323368
+time.limit.type=1
+read.clickhouse.max.time=1598433621
 read.clickhouse.min.time=1597222501
 update.interval=3600
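
Per the file's own comment, time.limit.type selects how the ClickHouse read window is chosen: 0 means "the past hour", 1 means "use the explicit range below". The commit switches to an explicit window and moves the upper bound forward by about 30 hours (1598323368 is 2020-08-25 10:42:48 +08:00; 1598433621 is 2020-08-26 17:20:21 +08:00). A quick way to decode the configured bounds, using the +08:00 zone from the commit header:

    import java.time.Instant;
    import java.time.ZoneOffset;

    public class WindowSketch {
        public static void main(String[] args) {
            long min = 1597222501L, max = 1598433621L; // values from the config above
            ZoneOffset cst = ZoneOffset.ofHours(8);    // zone taken from the commit header
            System.out.println(Instant.ofEpochSecond(min).atOffset(cst)); // 2020-08-12T16:55:01+08:00
            System.out.println(Instant.ofEpochSecond(max).atOffset(cst)); // 2020-08-26T17:20:21+08:00
        }
    }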